diff --git a/.github/workflows/smoke-nifi-services.yml b/.github/workflows/smoke-nifi-services.yml index 782a7c8dd..f59664aff 100644 --- a/.github/workflows/smoke-nifi-services.yml +++ b/.github/workflows/smoke-nifi-services.yml @@ -33,18 +33,16 @@ jobs: - name: Start NiFi services (build) run: | set -euo pipefail - source deploy/export_env_vars.sh - set -euo pipefail - docker compose -f deploy/services.dev.yml up -d --build nifi nifi-nginx nifi-registry-flow + make -C deploy start-nifi-dev-build - name: Smoke tests run: | set -euo pipefail - echo "Running smoke checks against NiFi, NiFi Registry, and nginx." + echo "Running smoke checks against NiFi and nginx." retries=30 delay=15 for attempt in $(seq 1 $retries); do - if ./scripts/smoke_nifi_services.sh; then + if ./scripts/tests/smoke_nifi_services.sh; then exit 0 fi echo "Attempt ${attempt}/${retries} failed. Sleeping ${delay}s..." @@ -60,7 +58,7 @@ jobs: source deploy/export_env_vars.sh set -euo pipefail docker compose -f deploy/services.dev.yml ps - docker compose -f deploy/services.dev.yml logs --no-color nifi nifi-nginx nifi-registry-flow + docker compose -f deploy/services.dev.yml logs --no-color nifi nifi-nginx - name: Shutdown stack if: always() diff --git a/deploy/Makefile b/deploy/Makefile index 367f1fe43..e6a57ac95 100644 --- a/deploy/Makefile +++ b/deploy/Makefile @@ -25,17 +25,23 @@ load-env: show-env: ${WITH_ENV} >/dev/null 2>&1; printenv | sort + +fix-nifi-registry-perms: + $(WITH_ENV) SKIP_EXPORT_ENV=1 ../nifi/fix_nifi_registry_perms.sh $(COMPOSE_FILE) # start services start-nifi: - $(WITH_ENV) docker compose -f services.yml $(DC_START_CMD) nifi nifi-nginx nifi-registry-flow + $(WITH_ENV) SKIP_EXPORT_ENV=1 ../nifi/fix_nifi_registry_perms.sh services.yml; \ + docker compose -f services.yml $(DC_START_CMD) nifi nifi-nginx start-nifi-dev: - $(WITH_ENV) docker compose -f services.dev.yml $(DC_START_CMD) nifi nifi-nginx nifi-registry-flow + $(WITH_ENV) SKIP_EXPORT_ENV=1 ../nifi/fix_nifi_registry_perms.sh services.dev.yml; \ + docker compose -f services.dev.yml $(DC_START_CMD) nifi nifi-nginx start-nifi-dev-build: - $(WITH_ENV) docker compose -f services.dev.yml up -d --build nifi nifi-nginx nifi-registry-flow + $(WITH_ENV) SKIP_EXPORT_ENV=1 ../nifi/fix_nifi_registry_perms.sh services.dev.yml; \ + docker compose -f services.dev.yml up -d --build nifi nifi-nginx start-elastic: $(WITH_ENV) docker compose -f services.yml $(DC_START_CMD) elasticsearch-1 elasticsearch-2 kibana @@ -101,16 +107,38 @@ start-data-infra: start-nifi start-elastic start-samples start-all: start-data-infra start-jupyter start-medcat-service start-ocr-services -.PHONY: start-all start-data-infra start-nifi start-nifi-dev start-nifi-dev-build start-elastic start-samples start-jupyter +.PHONY: start-all start-data-infra start-nifi start-nifi-dev start-nifi-dev-build start-elastic start-samples start-jupyter fix-nifi-registry-perms # stop services # stop-nifi: - $(WITH_ENV) docker compose -f services.yml $(DC_STOP_CMD) nifi nifi-nginx nifi-registry-flow + $(WITH_ENV) docker compose -f services.yml $(DC_STOP_CMD) nifi nifi-nginx stop-nifi-dev: - $(WITH_ENV) docker compose -f services.dev.yml $(DC_STOP_CMD) nifi nifi-nginx nifi-registry-flow + $(WITH_ENV) docker compose -f services.dev.yml $(DC_STOP_CMD) nifi nifi-nginx + +delete-nifi-containers: + $(WITH_ENV) docker compose -f services.yml rm -f -s nifi nifi-nginx + +delete-nifi-dev-containers: + $(WITH_ENV) docker compose -f services.dev.yml rm -f -s nifi nifi-nginx + +delete-nifi-images: + $(WITH_ENV) 
images="$$(docker compose -f services.yml config --images nifi nifi-nginx | sort -u)"; \ + if [ -n "$$images" ]; then \ + docker image rm -f $$images; \ + else \ + echo "No NiFi images found in services.yml"; \ + fi + +delete-nifi-dev-images: + $(WITH_ENV) images="$$(docker compose -f services.dev.yml config --images nifi nifi-nginx | sort -u)"; \ + if [ -n "$$images" ]; then \ + docker image rm -f $$images; \ + else \ + echo "No NiFi images found in services.dev.yml"; \ + fi stop-elastic: $(WITH_ENV) docker compose -f services.yml $(DC_STOP_CMD) elasticsearch-1 elasticsearch-2 kibana @@ -173,7 +201,7 @@ stop-data-infra: stop-nifi stop-elastic stop-samples stop-all: stop-data-infra stop-jupyter stop-medcat-service stop-ocr-services -.PHONY: stop-data-infra stop-nifi stop-nifi-dev stop-elastic stop-samples stop-jupyter +.PHONY: stop-data-infra stop-nifi stop-nifi-dev delete-nifi-containers delete-nifi-dev-containers delete-nifi-images delete-nifi-dev-images stop-elastic stop-samples stop-jupyter # cleanup diff --git a/deploy/elasticsearch.env b/deploy/elasticsearch.env index 9b3616f55..0c7519e06 100644 --- a/deploy/elasticsearch.env +++ b/deploy/elasticsearch.env @@ -7,7 +7,7 @@ ELASTICSEARCH_VERSION=opensearch # possible values : -# - elasticsearch : docker.elastic.co/elasticsearch/elasticsearch:8.18.2 +# - elasticsearch : docker.elastic.co/elasticsearch/elasticsearch:8.19.11 # - elasticsearch (custom cogstack image) : cogstacksystems/cogstack-elasticsearch:latest # - opensearch : opensearchproject/opensearch:3.4.0 # the custom cogstack image is always based on the last image of ES native @@ -89,6 +89,8 @@ ELASTICSEARCH_SECURITY_DIR=../security/certificates/elastic/ # MEMORY CONFIG ELASTICSEARCH_JAVA_OPTS="-Xms512m -Xmx512m -Des.failure_store_feature_flag_enabled=true" +ES_JAVA_OPTS=$ELASTICSEARCH_JAVA_OPTS +OPENSEARCH_JAVA_OPTS=$ELASTICSEARCH_JAVA_OPTS ELASTICSEARCH_DOCKER_CPU_MIN=1 ELASTICSEARCH_DOCKER_CPU_MAX=1 @@ -163,7 +165,7 @@ KIBANA_VERSION=opensearch-dashboards KIBANA_CONFIG_FILE_VERSION=opensearch_dashboards # possible values: -# - elasticsearch : docker.elastic.co/kibana/kibana:8.18.2 +# - elasticsearch : docker.elastic.co/kibana/kibana:8.19.11 # - elasticsearch (custom cogstack image) : cogstacksystems/cogstack-kibana:latest # - opensearch : opensearchproject/opensearch-dashboards:3.4.0 # the custom cogstack image is always based on the last image of ES native @@ -205,7 +207,7 @@ ELASTICSEARCH_XPACK_SECURITY_REPORTING_ENCRYPTION_KEY="e0Y1gTxHWOopIWMTtpjQsDS6K ######################################################################### METRICBEAT Env vars ########################################################################## -METRICBEAT_IMAGE="docker.elastic.co/beats/metricbeat:8.18.2" +METRICBEAT_IMAGE="docker.elastic.co/beats/metricbeat:8.19.11" METRICBEAT_DOCKER_SHM=512m METRICBEAT_DOCKER_CPU_MIN=1 @@ -222,7 +224,7 @@ FILEBEAT_STARTUP_COMMAND="-e --strict.perms=false" FILEBEAT_HOST="https://elasticsearch-1:9200" -FILEBEAT_IMAGE="docker.elastic.co/beats/filebeat:8.18.2" +FILEBEAT_IMAGE="docker.elastic.co/beats/filebeat:8.19.11" FILEBEAT_DOCKER_SHM=512m diff --git a/deploy/export_env_vars.sh b/deploy/export_env_vars.sh index 2ee8a95cf..c68b4f1b3 100755 --- a/deploy/export_env_vars.sh +++ b/deploy/export_env_vars.sh @@ -41,22 +41,6 @@ env_files=( "$SERVICES_DIR/cogstack-nlp/medcat-service/env/medcat.env" ) -LINT_SCRIPT="$SCRIPT_DIR/../nifi/user_scripts/utils/lint_env.py" - -if [ -e "$LINT_SCRIPT" ]; then - chmod +x $LINT_SCRIPT -fi - -if [ -x "$LINT_SCRIPT" ]; then - echo "🔍 
Validating env files..." - if ! python3 "$LINT_SCRIPT" "${env_files[@]}"; then - echo "❌ Env validation failed. Fix the errors above before continuing." - exit 1 - fi -else - echo "⚠️ Skipping env validation; $LINT_SCRIPT not found or not executable." -fi - for env_file in "${env_files[@]}"; do if [ -f "$env_file" ]; then echo "✅ Sourcing $env_file" diff --git a/deploy/network_settings.env b/deploy/network_settings.env index ee2357969..5ae69fcbb 100644 --- a/deploy/network_settings.env +++ b/deploy/network_settings.env @@ -10,7 +10,6 @@ ELASTICSEARCH_2_HOST_NAME="test-2:0.0.0.0" ELASTICSEARCH_3_HOST_NAME="test-3:0.0.0.0" KIBANA_HOST_NAME="test-4:0.0.0.0" NIFI_HOST_NAME="test-5:0.0.0.0" -NIFI_REGISTRY_HOST_NAME="test-6:0.0.0.0" # general network settings HTTPS_PROXY="" @@ -18,4 +17,4 @@ HTTP_PROXY="" NO_PROXY="" no_proxy="" http_proxy="" -https_proxy="" \ No newline at end of file +https_proxy="" diff --git a/deploy/nifi.env b/deploy/nifi.env index c9d007d7a..ddc5fa713 100644 --- a/deploy/nifi.env +++ b/deploy/nifi.env @@ -9,27 +9,20 @@ NIFI_JVM_HEAP_MAX=1g NIFI_DOCKER_SHM_SIZE=1g -NIFI_DOCKER_REGISTRY_SHM_SIZE=1g NIFI_DOCKER_CPU_MIN=1 NIFI_DOCKER_CPU_MAX=1 NIFI_DOCKER_RAM=1g -NIFI_REGISTRY_DOCKER_CPU_MIN=1 -NIFI_REGISTRY_DOCKER_CPU_MAX=1 -NIFI_REGISTRY_DOCKER_RAM=1g - NIFI_DOCKER_LOG_SIZE_PER_FILE="250m" NIFI_DOCKER_LOG_NUM_FILES=10 ############################################################################################################################## NIFI_VERSION="2.7.2" -NIFI_REGISTRY_VERSION=$NIFI_VERSION -# NiFi/NiFi Registry Docker image +# NiFi Docker image NIFI_DOCKER_IMAGE="cogstacksystems/cogstack-nifi:latest" -NIFI_REGISTRY_DOCKER_IMAGE="apache/nifi-registry:${NIFI_REGISTRY_VERSION:-2.7.2}" ############################################################################################################################## @@ -41,6 +34,9 @@ NIFI_DATA_PATH="../data/" NIFI_TOOLKIT_VERSION=$NIFI_VERSION +# this is to mount medcat models (optional) +NIFI_MEDCAT_SERVICE_MODEL_PRODUCTION_PATH="../services/cogstack-nlp/medcat-service/models/" + #### Port and network settings NIFI_WEB_PROXY_CONTEXT_PATH="/nifi" @@ -92,20 +88,3 @@ NIFI_TRUSTSTORE_TYPE=JKS # this is from ./security/certificates_nifi.env, NIFI_SUBJ_LINE_CERTIFICATE_CN NIFI_INITIAL_ADMIN_IDENTITY="cogstack" - -############################################################################################################################## -# NIFI REGISTRY FLOW SECTION -############################################################################################################################## -NIFI_REGISTRY_DB_DIR=/opt/nifi-registry/nifi-registry-current/database -NIFI_REGISTRY_FLOW_PROVIDER=file -NIFI_REGISTRY_FLOW_STORAGE_DIR=/opt/nifi-registry/nifi-registry-current/flow_storage - -NIFI_REGISTRY_FLOW_OUTPUT_PORT=8083 -NIFI_REGISTRY_FLOW_INPUT_PORT=18443 - -NIFI_REGISTRY_EXTERNAL_PORT_NGINX=18443 -NIFI_REGISTRY_INTERNAL_PORT_NGINX=18443 - -NIFI_REGISTRY_KEYSTORE_PATH="/security/certificates/nifi/nifi-keystore.jks" -NIFI_REGISTRY_TRUSTSTORE_PATH="/security/certificates/nifi/nifi-truststore.jks" - diff --git a/deploy/services.dev.yml b/deploy/services.dev.yml index e1970b449..d89b9169b 100644 --- a/deploy/services.dev.yml +++ b/deploy/services.dev.yml @@ -41,7 +41,6 @@ x-common-hosts: &common-hosts - ${ELASTICSEARCH_3_HOST_NAME:-test-3:0.0.0.0} - ${KIBANA_HOST_NAME:-test-4:0.0.0.0} - ${NIFI_HOST_NAME:-test-5:0.0.0.0} - - ${NIFI_REGISTRY_HOST_NAME:-test-6:0.0.0.0} x-common-ulimits: &common-ulimits ulimits: @@ -63,11 +62,11 @@ 
x-nifi-common: &nifi-common x-nifi-volumes: &nifi-volumes # Drivers - - ../nifi/drivers:/opt/nifi/drivers + - ../nifi/drivers:/opt/nifi/drivers:ro # User overrides bundled in the image - ../nifi/user_scripts:/opt/nifi/user_scripts:rw - - ../nifi/user_schemas:/opt/nifi/user_schemas:rw + - ../nifi/user_schemas:/opt/nifi/user_schemas:ro # Python processors (NiFi 2.x) - ../nifi/user_python_extensions:/opt/nifi/nifi-current/python_extensions:rw @@ -84,9 +83,6 @@ x-nifi-volumes: &nifi-volumes # Ingest data directory - ./${NIFI_DATA_PATH:-../data/}:/data/:rw - # DB schemas - - ../services/cogstack-db/:/opt/cogstack-db/:rw - # MedCAT models - ./${RES_MEDCAT_SERVICE_MODEL_PRODUCTION_PATH:-../services/cogstack-nlp/medcat-service/models/}:/opt/models:rw @@ -101,19 +97,6 @@ x-nifi-volumes: &nifi-volumes # Flowfile error output - nifi-vol-errors:/opt/nifi/pipeline/flowfile-errors -x-nifi-registry-volumes: &nifi-registry-volumes - # Registry configuration - - ../nifi/nifi-registry/:/opt/nifi-registry/nifi-registry-current/conf/:rw - - # Security certificates - - ../security:/security:ro - - # Registry persistence - - nifi-registry-vol-database:/opt/nifi-registry/nifi-registry-current/database - - nifi-registry-vol-flow-storage:/opt/nifi-registry/nifi-registry-current/flow_storage - - nifi-registry-vol-work:/opt/nifi-registry/nifi-registry-current/work - - nifi-registry-vol-logs:/opt/nifi-registry/nifi-registry-current/logs - #---------------------------------------------------------------------------# # Used services # #---------------------------------------------------------------------------# @@ -127,21 +110,10 @@ services: build: context: .. dockerfile: nifi/Dockerfile - args: - HTTP_PROXY: $HTTP_PROXY - HTTPS_PROXY: $HTTPS_PROXY - no_proxy: $no_proxy container_name: cogstack-nifi hostname: nifi shm_size: ${NIFI_DOCKER_SHM_SIZE:-"1g"} environment: - - USER_ID=${NIFI_UID:-1000} - - GROUP_ID=${NIFI_GID:-1000} - - NIFI_WEB_PROXY_HOST=${NIFI_WEB_PROXY_HOST:-"localhost:8443"} - - NIFI_WEB_PROXY_CONTEXT_PATH=${NIFI_WEB_PROXY_CONTEXT_PATH:-"/nifi"} - - NIFI_INTERNAL_PORT=${NIFI_INTERNAL_PORT:-8443} - - NIFI_OUTPUT_PORT=${NIFI_OUTPUT_PORT:-8082} - - NIFI_INPUT_SOCKET_PORT=${NIFI_INPUT_SOCKET_PORT:-10000} - PYTHONPATH=${NIFI_PYTHONPATH:-/opt/nifi/nifi-current/python/framework} - JVM_OPTS="${NIFI_JVM_OPTS:--XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:+ParallelRefProcEnabled -Djava.security.egd=file:/dev/./urandom}" deploy: @@ -163,52 +135,6 @@ services: - "${NIFI_OUTPUT_PORT:-8082}:${NIFI_INTERNAL_PORT:-8443}" - "${NIFI_INPUT_SOCKET_PORT:-10000}" logging: *nifi-logging-common - - nifi-registry-flow: - <<: *nifi-common - image: ${NIFI_REGISTRY_DOCKER_IMAGE:-apache/nifi-registry:${NIFI_REGISTRY_VERSION:-latest}} - hostname: nifi-registry - container_name: cogstack-nifi-registry-flow - shm_size: ${NIFI_DOCKER_REGISTRY_SHM_SIZE:-1g} - user: root - environment: - - http_proxy=$HTTP_PROXY - - https_proxy=$HTTPS_PROXY - - no_proxy=$no_proxy - - USER_ID=${NIFI_UID:-1000} - - GROUP_ID=${NIFI_GID:-1000} - - KEYSTORE_PATH=${NIFI_REGISTRY_KEYSTORE_PATH:-/security/certificates/nifi/nifi-keystore.jks} - - KEYSTORE_TYPE=${NIFI_KEYSTORE_TYPE:-jks} - - KEYSTORE_PASSWORD=${NIFI_KEYSTORE_PASSWORD:-"cogstackNifi"} - - TRUSTSTORE_PASSWORD=${NIFI_TRUSTSTORE_PASSWORD:-"cogstackNifi"} - - TRUSTSTORE_PATH=${NIFI_REGISTRY_TRUSTSTORE_PATH:-/security/certificates/nifi/nifi-truststore.jks} - - - TRUSTSTORE_TYPE=${NIFI_TRUSTSTORE_TYPE:-jks} - - INITIAL_ADMIN_IDENTITY=${NIFI_INITIAL_ADMIN_IDENTITY:-"cogstack"} - - AUTH=${NIFI_AUTH:-"tls"} - - 
NIFI_REGISTRY_DB_DIR=${NIFI_REGISTRY_DB_DIR:-/opt/nifi-registry/nifi-registry-current/database} - #- NIFI_REGISTRY_FLOW_PROVIDER=${NIFI_REGISTRY_FLOW_PROVIDER:-file} - - NIFI_REGISTRY_FLOW_STORAGE_DIR=${NIFI_REGISTRY_FLOW_STORAGE_DIR:-/opt/nifi-registry/nifi-registry-current/flow_storage} - deploy: - resources: - limits: - cpus: "${NIFI_REGISTRY_DOCKER_CPU_MAX}" - memory: "${NIFI_REGISTRY_DOCKER_RAM}" - reservations: - cpus: "${NIFI_REGISTRY_DOCKER_CPU_MIN}" - memory: "${NIFI_REGISTRY_DOCKER_RAM}" - volumes: *nifi-registry-volumes - extra_hosts: *common-hosts - tty: true - ports: - - "${NIFI_REGISTRY_FLOW_OUTPUT_PORT:-8083}:${NIFI_REGISTRY_FLOW_INPUT_PORT:-18443}" - - entrypoint: bash -c "chown -R nifi:nifi /opt/nifi-registry/nifi-registry-current/database && \ - chown -R nifi:nifi /opt/nifi-registry/nifi-registry-current/flow_storage && \ - chown -R nifi:nifi /opt/nifi-registry/nifi-registry-current/work && \ - chown -R nifi:nifi /opt/nifi-registry/nifi-registry-current/logs && \ - bash /opt/nifi-registry/scripts/start.sh" - logging: *nifi-logging-common nifi-nginx: build: @@ -232,7 +158,6 @@ services: - ../security/certificates:/certificates:ro ports: - "${NIFI_EXTERNAL_PORT_NGINX:-8443}:${NIFI_INTERNAL_PORT_NGINX:-8443}" - - "${NIFI_REGISTRY_EXTERNAL_PORT_NGINX:-18443}:${NIFI_REGISTRY_INTERNAL_PORT_NGINX:-18443}" networks: - cognet extra_hosts: *common-hosts @@ -264,15 +189,6 @@ volumes: nifi-vol-errors: driver: local - nifi-registry-vol-database: - driver: local - nifi-registry-vol-flow-storage: - driver: local - nifi-registry-vol-work: - driver: local - nifi-registry-vol-logs: - driver: local - #---------------------------------------------------------------------------# # Docker networks. # #---------------------------------------------------------------------------# diff --git a/deploy/services.yml b/deploy/services.yml index a0901553c..d0b16037f 100644 --- a/deploy/services.yml +++ b/deploy/services.yml @@ -52,7 +52,6 @@ x-common-hosts: &common-hosts - ${ELASTICSEARCH_3_HOST_NAME:-test-3:0.0.0.0} - ${KIBANA_HOST_NAME:-test-4:0.0.0.0} - ${NIFI_HOST_NAME:-test-5:0.0.0.0} - - ${NIFI_REGISTRY_HOST_NAME:-test-6:0.0.0.0} x-common-ulimits: &common-ulimits ulimits: @@ -74,11 +73,11 @@ x-nifi-common: &nifi-common x-nifi-volumes: &nifi-volumes # Drivers - - ../nifi/drivers:/opt/nifi/drivers + - ../nifi/drivers:/opt/nifi/drivers:ro # User overrides bundled in the image - ../nifi/user_scripts:/opt/nifi/user_scripts:rw - - ../nifi/user_schemas:/opt/nifi/user_schemas:rw + - ../nifi/user_schemas:/opt/nifi/user_schemas:ro # Python processors (NiFi 2.x) - ../nifi/user_python_extensions:/opt/nifi/nifi-current/python_extensions:rw @@ -95,11 +94,8 @@ x-nifi-volumes: &nifi-volumes # Ingest data directory - ./${NIFI_DATA_PATH:-../data/}:/data/:rw - # DB schemas - - ../services/cogstack-db/:/opt/cogstack-db/:rw - # MedCAT models - - ./${RES_MEDCAT_SERVICE_MODEL_PRODUCTION_PATH:-../services/cogstack-nlp/medcat-service/models/}:/opt/models:rw + - ./${NIFI_MEDCAT_SERVICE_MODEL_PRODUCTION_PATH:-../services/cogstack-nlp/medcat-service/models/}:/opt/models:rw # NiFi repositories/state - nifi-vol-logs:/opt/nifi/nifi-current/logs @@ -112,19 +108,6 @@ x-nifi-volumes: &nifi-volumes # Flowfile error output - nifi-vol-errors:/opt/nifi/pipeline/flowfile-errors -x-nifi-registry-volumes: &nifi-registry-volumes - # Registry configuration - - ../nifi/nifi-registry/:/opt/nifi-registry/nifi-registry-current/conf/:rw - - # Security certificates - - ../security:/security:ro - - # Registry persistence - - 
nifi-registry-vol-database:/opt/nifi-registry/nifi-registry-current/database - - nifi-registry-vol-flow-storage:/opt/nifi-registry/nifi-registry-current/flow_storage - - nifi-registry-vol-work:/opt/nifi-registry/nifi-registry-current/work - - nifi-registry-vol-logs:/opt/nifi-registry/nifi-registry-current/logs - x-db-common: &db-common <<: *common-ulimits shm_size: ${DATABASE_DOCKER_SHM_SIZE:-"1g"} @@ -145,7 +128,6 @@ x-es-common-volumes: &es-common-volumes - ../services/elasticsearch/config/log4j2_${ELASTICSEARCH_VERSION:-opensearch}.properties:/usr/share/${ELASTICSEARCH_VERSION:-opensearch}/config/log4j2.properties:ro # Shared root CA + admin certs - ../security/certificates/elastic/${ELASTICSEARCH_VERSION:-opensearch}/elastic-stack-ca.crt.pem:/usr/share/${ELASTICSEARCH_VERSION:-opensearch}/config/root-ca.crt:ro - - ../security/certificates/elastic/${ELASTICSEARCH_VERSION:-opensearch}/elastic-stack-ca.key.pem:/usr/share/${ELASTICSEARCH_VERSION:-opensearch}/config/root-ca.key:ro # OPENSEARCH specific (always mounted even if unused) - ../security/certificates/elastic/opensearch/admin.crt:/usr/share/${ELASTICSEARCH_VERSION:-opensearch}/config/admin.crt:ro - ../security/certificates/elastic/opensearch/admin.key.pem:/usr/share/${ELASTICSEARCH_VERSION:-opensearch}/config/admin.key.pem:ro @@ -171,9 +153,6 @@ x-es-common: &es-common networks: - cognet extra_hosts: *common-hosts - environment: - ES_JAVA_OPTS: ${ELASTICSEARCH_JAVA_OPTS:--Xms2048m -Xmx2048m -Des.failure_store_feature_flag_enabled=true} - OPENSEARCH_JAVA_OPTS: ${ELASTICSEARCH_JAVA_OPTS:--Xms2048m -Xmx2048m -Des.failure_store_feature_flag_enabled=true} logging: *es-logging-common deploy: resources: @@ -204,7 +183,6 @@ x-metricbeat-common: &metricbeat-common volumes: - ../services/metricbeat/metricbeat.yml:/usr/share/metricbeat/metricbeat.yml:ro - ../security/certificates/elastic/elasticsearch/elastic-stack-ca.crt.pem:/usr/share/metricbeat/root-ca.crt:ro - - ../security/certificates/elastic/elasticsearch/elastic-stack-ca.key.pem:/usr/share/metricbeat/root-ca.key:ro networks: - cognet extra_hosts: *common-hosts @@ -219,11 +197,6 @@ x-filebeat-common: &filebeat-common env_file: - ./elasticsearch.env - ../security/env/users_elasticsearch.env - environment: - - ELASTICSEARCH_HOSTS=${ELASTICSEARCH_HOSTS:-["https://elasticsearch-1:9200","https://elasticsearch-2:9200"]} - - FILEBEAT_USER=${FILEBEAT_USER:-elastic} - - FILEBEAT_PASSWORD=${FILEBEAT_PASSWORD:-kibanaserver} - - KIBANA_HOST=${KIBANA_HOST:-"https://kibana:5601"} deploy: resources: limits: @@ -233,9 +206,8 @@ x-filebeat-common: &filebeat-common cpus: "${FILEBEAT_DOCKER_CPU_MIN}" memory: "${FILEBEAT_DOCKER_RAM}" volumes: - - ../services/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml:rw + - ../services/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml:ro - ../security/certificates/elastic/elasticsearch/elastic-stack-ca.crt.pem:/etc/pki/root/root-ca.crt:ro - - ../security/certificates/elastic/elasticsearch/elastic-stack-ca.key.pem:/etc/pki/root/root-ca.key:ro networks: - cognet extra_hosts: *common-hosts @@ -251,7 +223,7 @@ services: #---------------------------------------------------------------------------# samples-db: <<: *db-common - image: postgres:17.7-alpine + image: postgres:18.1-trixie container_name: cogstack-samples-db platform: linux/amd64 environment: @@ -264,7 +236,7 @@ services: - ../services/pgsamples/schemas:/data/schemas:rw - ../services/pgsamples/init_db.sh:/docker-entrypoint-initdb.d/init_db.sh:ro # data persistence - - 
samples-vol:/var/lib/postgresql/data + - samples-vol:/var/lib/postgresql command: postgres -c "max_connections=${POSTGRES_DB_MAX_CONNECTIONS:-100}" ports: - 5554:5432 @@ -278,7 +250,7 @@ services: #---------------------------------------------------------------------------# cogstack-databank-db: <<: *db-common - image: postgres:17.7-alpine + image: postgres:18.1-trixie container_name: cogstack-production-databank-db platform: linux/amd64 environment: @@ -290,7 +262,7 @@ services: - ../services/cogstack-db/pgsql/schemas:/data/:ro - ../services/cogstack-db/pgsql/init_db.sh:/docker-entrypoint-initdb.d/init_db.sh:ro # data persistence - - databank-vol:/var/lib/postgresql/data + - databank-vol:/var/lib/postgresql command: postgres -c "max_connections=${POSTGRES_DB_MAX_CONNECTIONS:-100}" ports: - 5558:5432 @@ -298,15 +270,13 @@ services: - 5432 networks: - cognet - + cogstack-databank-db-mssql: <<: *db-common image: mcr.microsoft.com/mssql/server:2019-latest container_name: cogstack-production-databank-db-mssql environment: - ACCEPT_EULA=y - - MSSQL_SA_USER=${MSSQL_SA_USER:-sa} - - MSSQL_SA_PASSWORD=${MSSQL_SA_PASSWORD:-admin!COGSTACK2022} volumes: # mapping postgres data dump and initialization - ../services/cogstack-db/mssql/schemas:/data/:ro @@ -481,7 +451,6 @@ services: # Security certificates, general - ../security/certificates/elastic/${ELASTICSEARCH_VERSION:-opensearch}/elastic-stack-ca.crt.pem:/usr/share/${KIBANA_VERSION:-opensearch-dashboards}/config/root-ca.crt:ro - - ../security/certificates/elastic/${ELASTICSEARCH_VERSION:-opensearch}/elastic-stack-ca.key.pem:/usr/share/${KIBANA_VERSION:-opensearch-dashboards}/config/root-ca.key:ro - ../security/certificates/elastic/${ELASTICSEARCH_VERSION:-opensearch}/elastic-stack-ca.p12:/usr/share/${KIBANA_VERSION:-opensearch-dashboards}/config/root-ca.p12:ro - ../security/certificates/elastic/${ELASTICSEARCH_VERSION:-opensearch}/elasticsearch/${ES_INSTANCE_NAME_1:-elasticsearch-1}/${ES_INSTANCE_NAME_1:-elasticsearch-1}.crt:/usr/share/${KIBANA_VERSION:-opensearch-dashboards}/config/esnode1.crt:ro @@ -509,13 +478,6 @@ services: hostname: nifi shm_size: ${NIFI_DOCKER_SHM_SIZE:-"1g"} environment: - - USER_ID=${NIFI_UID:-1000} - - GROUP_ID=${NIFI_GID:-1000} - - NIFI_WEB_PROXY_HOST=${NIFI_WEB_PROXY_HOST:-"localhost:8443"} - - NIFI_WEB_PROXY_CONTEXT_PATH=${NIFI_WEB_PROXY_CONTEXT_PATH:-"/nifi"} - - NIFI_INTERNAL_PORT=${NIFI_INTERNAL_PORT:-8443} - - NIFI_OUTPUT_PORT=${NIFI_OUTPUT_PORT:-8082} - - NIFI_INPUT_SOCKET_PORT=${NIFI_INPUT_SOCKET_PORT:-10000} - PYTHONPATH=${NIFI_PYTHONPATH:-/opt/nifi/nifi-current/python/framework} - JVM_OPTS="${NIFI_JVM_OPTS:--XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:+ParallelRefProcEnabled -Djava.security.egd=file:/dev/./urandom}" deploy: @@ -537,53 +499,7 @@ services: - "${NIFI_OUTPUT_PORT:-8082}:${NIFI_INTERNAL_PORT:-8443}" - "${NIFI_INPUT_SOCKET_PORT:-10000}" logging: *nifi-logging-common - - nifi-registry-flow: - <<: *nifi-common - image: ${NIFI_REGISTRY_DOCKER_IMAGE:-apache/nifi-registry:${NIFI_REGISTRY_VERSION:-latest}} - hostname: nifi-registry - container_name: cogstack-nifi-registry-flow - shm_size: ${NIFI_DOCKER_REGISTRY_SHM_SIZE:-1g} - user: root - environment: - - http_proxy=$HTTP_PROXY - - https_proxy=$HTTPS_PROXY - - no_proxy=$no_proxy - - USER_ID=${NIFI_UID:-1000} - - GROUP_ID=${NIFI_GID:-1000} - - KEYSTORE_PATH=${NIFI_REGISTRY_KEYSTORE_PATH:-/security/certificates/nifi/nifi-keystore.jks} - - KEYSTORE_TYPE=${NIFI_KEYSTORE_TYPE:-jks} - - KEYSTORE_PASSWORD=${NIFI_KEYSTORE_PASSWORD:-"cogstackNifi"} - - 
TRUSTSTORE_PASSWORD=${NIFI_TRUSTSTORE_PASSWORD:-"cogstackNifi"} - - TRUSTSTORE_PATH=${NIFI_REGISTRY_TRUSTSTORE_PATH:-/security/certificates/nifi/nifi-truststore.jks} - - - TRUSTSTORE_TYPE=${NIFI_TRUSTSTORE_TYPE:-jks} - - INITIAL_ADMIN_IDENTITY=${NIFI_INITIAL_ADMIN_IDENTITY:-"cogstack"} - - AUTH=${NIFI_AUTH:-"tls"} - - NIFI_REGISTRY_DB_DIR=${NIFI_REGISTRY_DB_DIR:-/opt/nifi-registry/nifi-registry-current/database} - #- NIFI_REGISTRY_FLOW_PROVIDER=${NIFI_REGISTRY_FLOW_PROVIDER:-file} - - NIFI_REGISTRY_FLOW_STORAGE_DIR=${NIFI_REGISTRY_FLOW_STORAGE_DIR:-/opt/nifi-registry/nifi-registry-current/flow_storage} - deploy: - resources: - limits: - cpus: "${NIFI_REGISTRY_DOCKER_CPU_MAX}" - memory: "${NIFI_REGISTRY_DOCKER_RAM}" - reservations: - cpus: "${NIFI_REGISTRY_DOCKER_CPU_MIN}" - memory: "${NIFI_REGISTRY_DOCKER_RAM}" - volumes: *nifi-registry-volumes - extra_hosts: *common-hosts - tty: true - ports: - - "${NIFI_REGISTRY_FLOW_OUTPUT_PORT:-8083}:${NIFI_REGISTRY_FLOW_INPUT_PORT:-18443}" - - entrypoint: bash -c "chown -R nifi:nifi /opt/nifi-registry/nifi-registry-current/database && \ - chown -R nifi:nifi /opt/nifi-registry/nifi-registry-current/flow_storage && \ - chown -R nifi:nifi /opt/nifi-registry/nifi-registry-current/work && \ - chown -R nifi:nifi /opt/nifi-registry/nifi-registry-current/logs && \ - bash /opt/nifi-registry/scripts/start.sh" - logging: *nifi-logging-common - + nifi-nginx: image: cogstacksystems/nifi-nginx:latest container_name: cogstack-nifi-nginx @@ -604,7 +520,6 @@ services: - ../security/certificates:/certificates:ro ports: - "${NIFI_EXTERNAL_PORT_NGINX:-8443}:${NIFI_INTERNAL_PORT_NGINX:-8443}" - - "${NIFI_REGISTRY_EXTERNAL_PORT_NGINX:-18443}:${NIFI_REGISTRY_INTERNAL_PORT_NGINX:-18443}" networks: - cognet extra_hosts: *common-hosts @@ -638,10 +553,6 @@ services: image: gitea/gitea:1.23-rootless shm_size: ${GITEA_DOCKER_SHM_SIZE:-"1g"} restart: always - environment: - - http_proxy=$HTTP_PROXY - - https_proxy=$HTTPS_PROXY - - no_proxy=$no_proxy deploy: resources: limits: @@ -724,15 +635,6 @@ volumes: nifi-vol-errors: driver: local - nifi-registry-vol-database: - driver: local - nifi-registry-vol-flow-storage: - driver: local - nifi-registry-vol-work: - driver: local - nifi-registry-vol-logs: - driver: local - # Gitea gitea-lib-vol: driver: local diff --git a/docs/deploy/deployment.md b/docs/deploy/deployment.md index 1a0bd7440..65b5c5347 100644 --- a/docs/deploy/deployment.md +++ b/docs/deploy/deployment.md @@ -17,6 +17,14 @@ Make sure you have read the [Prerequisites](./main.md) section before proceeding These variables configure NiFi, Elasticsearch/OpenSearch, Kibana, Jupyter, Metricbeat, the sample DB, etc. +> **Important:** If you run `docker compose` directly (instead of `make`), first load the envs with: +> +> ```bash +> source ./deploy/export_env_vars.sh +> ``` +> +> The Makefile targets already do this for you. 
+ ## 🧩 Modular service design (important) This repository follows a **modular deployment model**: diff --git a/docs/nifi/main.md b/docs/nifi/main.md index 82d899b85..9c0b8b90b 100644 --- a/docs/nifi/main.md +++ b/docs/nifi/main.md @@ -140,6 +140,16 @@ You should check if the env vars have been set after running the script: echo $NIFI_GID ``` +### NiFi Registry permissions helper + +If NiFi Registry fails to start due to permission issues on its persistent volumes, run the helper script once to fix ownership: + + ```bash + ./nifi/fix_nifi_registry_perms.sh + ``` + +This script runs the registry container as root only long enough to `chown` the registry `database`, `flow_storage`, `work`, and `logs` directories, then exits. Subsequent starts can run as the default non-root user. + If the above command prints some numbers then it means that the `export_env_vars.sh` script worked. Otherwise, if you don't see anything, or just blank lines, then you need to execute the following: ```bash @@ -172,7 +182,7 @@ Then execute the `recreate_nifi_docker_image.sh` script located in the `./nifi` bash recreate_nifi_docker_image.sh ``` -Remember that the above export script and/or command are only visible in the current shell, so every time you restart your shell terminal you must execute the `./deploy/export_env_vars.sh` so that the variables will be visible by docker at runtime, because it uses the GID/UID in the `services.yml` file , specifying in the service definition `user: "${USER_ID:-${NIFI_UID:-1000}}:${GROUP_ID:-${NIFI_GID:-1000}}"`. +Remember that the above export script and/or command are only visible in the current shell, so every time you restart your shell terminal you must `source ./deploy/export_env_vars.sh` so the variables are visible to Docker at runtime. If you're using the `deploy/Makefile` targets, it handles this for you. ### `{bootstrap.conf}` diff --git a/nifi/fix_nifi_registry_perms.sh b/nifi/fix_nifi_registry_perms.sh new file mode 100755 index 000000000..7e7e8c27c --- /dev/null +++ b/nifi/fix_nifi_registry_perms.sh @@ -0,0 +1,32 @@ +#!/usr/bin/env bash + +set -euo pipefail + +# Support being run from any directory. +SCRIPT_SOURCE="${BASH_SOURCE[0]-$0}" +SCRIPT_DIR="$(cd "$(dirname "$SCRIPT_SOURCE")" && pwd)" +REPO_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)" + +COMPOSE_FILE="${1:-services.yml}" +if [[ "$COMPOSE_FILE" != /* ]]; then + if [[ "$COMPOSE_FILE" == services*.yml ]]; then + COMPOSE_FILE="deploy/$COMPOSE_FILE" + fi + COMPOSE_PATH="$REPO_ROOT/$COMPOSE_FILE" +else + COMPOSE_PATH="$COMPOSE_FILE" +fi + +if [ ! 
-f "$COMPOSE_PATH" ]; then + echo "Compose file not found: $COMPOSE_PATH" >&2 + exit 1 +fi + +if [ "${SKIP_EXPORT_ENV:-}" != "1" ]; then + set -a + source "$REPO_ROOT/deploy/export_env_vars.sh" + set +a +fi + +docker compose -f "$COMPOSE_PATH" run --rm --no-deps --user root --entrypoint bash -T nifi-registry-flow \ + -c 'chown -R nifi:nifi /opt/nifi-registry/nifi-registry-current/{database,flow_storage,work,logs}' diff --git a/nifi/nifi-registry/authorizations.xml b/nifi/nifi-registry/authorizations.xml deleted file mode 100644 index 34a92ca3b..000000000 --- a/nifi/nifi-registry/authorizations.xml +++ /dev/null @@ -1,284 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/nifi/nifi-registry/authorizers.xml b/nifi/nifi-registry/authorizers.xml deleted file mode 100644 index 31555617f..000000000 --- a/nifi/nifi-registry/authorizers.xml +++ /dev/null @@ -1,334 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - file-user-group-provider - org.apache.nifi.registry.security.authorization.file.FileUserGroupProvider - ./conf/users.xml - - cogstack - C=UK, ST=London, L=UK, O=cogstack, OU=cogstack, CN=cogstack - admin - - - - file-access-policy-provider - org.apache.nifi.registry.security.authorization.file.FileAccessPolicyProvider - file-user-group-provider - ./conf/authorizations.xml - cogstack - cogstack - C=UK, ST=London, L=UK, O=cogstack, OU=cogstack, CN=cogstack - admin - - - - managed-authorizer - org.apache.nifi.registry.security.authorization.StandardManagedAuthorizer - file-access-policy-provider - - - diff --git a/nifi/nifi-registry/bootstrap-aws.conf b/nifi/nifi-registry/bootstrap-aws.conf deleted file mode 100644 index f624dec9b..000000000 --- a/nifi/nifi-registry/bootstrap-aws.conf +++ /dev/null @@ -1,27 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# AWS KMS Key ID is required to be configured for AWS KMS Sensitive Property Provider -aws.kms.key.id= - -# NiFi uses the following properties when authentication to AWS when all values are provided. 
-# NiFi uses the default AWS credentials provider chain when one or more or the following properties are blank -# AWS SDK documentation describes the default credential retrieval order: -# https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html#credentials-chain -aws.access.key.id= -aws.secret.access.key= -aws.region= diff --git a/nifi/nifi-registry/bootstrap-azure.conf b/nifi/nifi-registry/bootstrap-azure.conf deleted file mode 100644 index 49e318eaa..000000000 --- a/nifi/nifi-registry/bootstrap-azure.conf +++ /dev/null @@ -1,24 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# Key Identifier for Azure Key Vault Key Sensitive Property Provider -azure.keyvault.key.id= -# Encryption Algorithm for Azure Key Vault Key Sensitive Property Provider -azure.keyvault.encryption.algorithm= - -# Vault URI for Azure Key Vault Secret Sensitive Property Provider -azure.keyvault.uri= diff --git a/nifi/nifi-registry/bootstrap-gcp.conf b/nifi/nifi-registry/bootstrap-gcp.conf deleted file mode 100644 index 440dad236..000000000 --- a/nifi/nifi-registry/bootstrap-gcp.conf +++ /dev/null @@ -1,22 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# These GCP KMS settings must all be configured in order to use the GCP KMS Sensitive Property Provider -gcp.kms.project= -gcp.kms.location= -gcp.kms.keyring= -gcp.kms.key= \ No newline at end of file diff --git a/nifi/nifi-registry/bootstrap-hashicorp-vault.conf b/nifi/nifi-registry/bootstrap-hashicorp-vault.conf deleted file mode 100644 index bf6975b96..000000000 --- a/nifi/nifi-registry/bootstrap-hashicorp-vault.conf +++ /dev/null @@ -1,53 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# HTTP or HTTPS URI for HashiCorp Vault is required to enable the Sensitive Properties Provider -vault.uri= - -# Transit Path is required to enable the Sensitive Properties Provider Protection Scheme 'hashicorp/vault/transit/{path}' -vault.transit.path= - -# Key/Value Path is required to enable the Sensitive Properties Provider Protection Scheme 'hashicorp/vault/kv/{path}' -vault.kv.path= -# Key/Value Secrets Engine version may be 1 or 2, and defaults to 1 -# vault.kv.version=1 - -# Token Authentication example properties -# vault.authentication=TOKEN -# vault.token= - -# Optional file supports authentication properties described in the Spring Vault Environment Configuration -# https://docs.spring.io/spring-vault/docs/2.3.x/reference/html/#vault.core.environment-vault-configuration -# -# All authentication properties must be included in bootstrap-hashicorp-vault.conf when this property is not specified. -# Properties in bootstrap-hashicorp-vault.conf take precedence when the same values are defined in both files. -# Token Authentication is the default when the 'vault.authentication' property is not specified. -vault.authentication.properties.file= - -# Optional Timeout properties -vault.connection.timeout=5 secs -vault.read.timeout=15 secs - -# Optional TLS properties -vault.ssl.enabledCipherSuites= -vault.ssl.enabledProtocols= -vault.ssl.key-store= -vault.ssl.key-store-type= -vault.ssl.key-store-password= -vault.ssl.trust-store= -vault.ssl.trust-store-type= -vault.ssl.trust-store-password= diff --git a/nifi/nifi-registry/bootstrap.conf b/nifi/nifi-registry/bootstrap.conf deleted file mode 100644 index f10c0fa5c..000000000 --- a/nifi/nifi-registry/bootstrap.conf +++ /dev/null @@ -1,50 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# Java command to use when running nifi-registry -java=java - -# Username to use when running nifi-registry. This value will be ignored on Windows. 
-run.as= - -# Configure the working directory for launching the NiFi Registry process -# If not specified, the working directory will fall back to using the NIFI_REGISTRY_HOME env variable -# If the environment variable is not specified, the working directory will fall back to the parent of this file's parent -working.dir= - -# Configure where nifi-registry's lib and conf directories live -lib.dir=./lib -conf.dir=./conf -docs.dir=./docs - -# How long to wait after telling nifi-registry to shutdown before explicitly killing the Process -graceful.shutdown.seconds=20 - -# Disable JSR 199 so that we can use JSP's without running a JDK -java.arg.1=-Dorg.apache.jasper.compiler.disablejsr199=true - -# JVM memory settings -java.arg.2=-Xms512m -java.arg.3=-Xmx512m - -# Enable Remote Debugging -#java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000 - -# allowRestrictedHeaders is required for Cluster/Node communications to work properly -java.arg.5=-Dsun.net.http.allowRestrictedHeaders=true -java.arg.6=-Djava.protocol.handler.pkgs=sun.net.www.protocol - diff --git a/nifi/nifi-registry/groups.xml b/nifi/nifi-registry/groups.xml deleted file mode 100644 index 005f58e20..000000000 --- a/nifi/nifi-registry/groups.xml +++ /dev/null @@ -1,4 +0,0 @@ - - - - \ No newline at end of file diff --git a/nifi/nifi-registry/identity-providers.xml b/nifi/nifi-registry/identity-providers.xml deleted file mode 100644 index 656d3ae5d..000000000 --- a/nifi/nifi-registry/identity-providers.xml +++ /dev/null @@ -1,111 +0,0 @@ - - - - - - - - - - - file-identity-provider - org.apache.nifi.registry.security.identity.provider.FileIdentityProvider - ./conf/users.xml - - - \ No newline at end of file diff --git a/nifi/nifi-registry/logback.xml b/nifi/nifi-registry/logback.xml deleted file mode 100644 index 3044c9366..000000000 --- a/nifi/nifi-registry/logback.xml +++ /dev/null @@ -1,133 +0,0 @@ - - - - - true - - - - ${org.apache.nifi.registry.bootstrap.config.log.dir}/nifi-registry-app.log - - - ${org.apache.nifi.registry.bootstrap.config.log.dir}/nifi-registry-app_%d{yyyy-MM-dd_HH}.%i.log - - 100MB - - 30 - - 3GB - - true - - true - - %date %level [%thread] %logger{40} %msg%n - - - - - ${org.apache.nifi.registry.bootstrap.config.log.dir}/nifi-registry-bootstrap.log - - - ${org.apache.nifi.registry.bootstrap.config.log.dir}/nifi-registry-bootstrap_%d.log - - 30 - - 3GB - - true - - - %date %level [%thread] %logger{40} %msg%n - - - - - ${org.apache.nifi.registry.bootstrap.config.log.dir}/nifi-registry-event.log - - - ${org.apache.nifi.registry.bootstrap.config.log.dir}/nifi-registry-event_%d.log - - 30 - - 3GB - - true - - - %date ## %msg%n - - - - - - %date %level [%thread] %logger{40} %msg%n - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/nifi/nifi-registry/nifi-registry.properties b/nifi/nifi-registry/nifi-registry.properties deleted file mode 100644 index 40bff0249..000000000 --- a/nifi/nifi-registry/nifi-registry.properties +++ /dev/null @@ -1,127 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# web properties # -nifi.registry.web.war.directory=./lib -nifi.registry.web.http.host= -nifi.registry.web.http.port= -nifi.registry.web.https.host=nifi-registry -nifi.registry.web.https.port=18443 -nifi.registry.web.jetty.http2.idle.timeout=300000 -nifi.registry.web.jetty.idle.timeout=300000 -nifi.registry.web.https.network.interface.default= -nifi.registry.web.https.application.protocols=h2 http/1.1 -nifi.registry.web.jetty.working.directory=./work/jetty -nifi.registry.web.jetty.threads=200 -nifi.registry.web.should.send.server.version=true -nifi.registry.web.jetty.use.http2=false - -nifi.registry.web.context.path=/nifi-registry -nifi.registry.web.proxy.context.path=/nifi-registry -nifi.registry.web.proxy.host=localhost:18443,nifi-registry:18443,nifi-registry-flow:18443,cogstack-nifi-registry-flow:18443,cogstack-nifi-registry:18443,nginx.local:18443 - -# security properties # -nifi.registry.security.keystore=/security/certificates/nifi/nifi-keystore.jks -nifi.registry.security.keystoreType=JKS -nifi.registry.security.keystorePasswd=cogstackNifi -nifi.registry.security.keyPasswd=cogstackNifi -nifi.registry.security.truststore=/security/certificates/nifi/nifi-truststore.jks -nifi.registry.security.truststoreType=JKS -nifi.registry.security.truststorePasswd=cogstackNifi -nifi.registry.security.needClientAuth=false -nifi.registry.security.authorizers.configuration.file=./conf/authorizers.xml -nifi.registry.security.authorizer=managed-authorizer -nifi.registry.security.identity.providers.configuration.file=./conf/identity-providers.xml -nifi.registry.security.identity.provider= - -# providers properties # -nifi.registry.providers.configuration.file=./conf/providers.xml - -# registry alias properties # -nifi.registry.registry.alias.configuration.file=./conf/registry-aliases.xml - -# extensions working dir # -nifi.registry.extensions.working.directory=./work/extensions - -# legacy database properties, used to migrate data from original DB to new DB below -# NOTE: Users upgrading from 0.1.0 should leave these populated, but new installs after 0.1.0 should leave these empty -nifi.registry.db.directory= -nifi.registry.db.url.append= - -# database properties -nifi.registry.db.url=jdbc:h2:./database/nifi-registry-primary;AUTOCOMMIT=OFF;DB_CLOSE_ON_EXIT=FALSE;LOCK_MODE=3;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE -nifi.registry.db.driver.class=org.h2.Driver -nifi.registry.db.driver.directory=/opt/nifi-registry/nifi-registry-current/database -nifi.registry.db.username=nifireg -nifi.registry.db.password=nifireg -nifi.registry.db.maxConnections=5 -nifi.registry.db.sql.debug=false - -# extension directories # -# Each property beginning with "nifi.registry.extension.dir." 
will be treated as location for an extension, -# and a class loader will be created for each location, with the system class loader as the parent -# -#nifi.registry.extension.dir.1=/path/to/extension1 -#nifi.registry.extension.dir.2=/path/to/extension2 - -nifi.registry.extension.dir.aws=./ext/aws/lib - -# Identity Mapping Properties # -# These properties allow normalizing user identities such that identities coming from different identity providers -# (certificates, LDAP, Kerberos) can be treated the same internally in NiFi. The following example demonstrates normalizing -# DNs from certificates and principals from Kerberos into a common identity string: - -nifi.registry.security.identity.mapping.pattern.dn=^CN=(.*?),.*$ -nifi.registry.security.identity.mapping.value.dn=$1 -nifi.registry.security.identity.mapping.transform.dn=NONE - -# nifi.registry.security.identity.mapping.pattern.dn=^CN=(.*?), OU=(.*?), O=(.*?), L=(.*?), ST=(.*?), C=(.*?)$ -# nifi.registry.security.identity.mapping.value.dn=$1@$2 -# nifi.registry.security.identity.mapping.transform.dn=NONE - -# nifi.registry.security.identity.mapping.pattern.kerb=^(.*?)/instance@(.*?)$ -# nifi.registry.security.identity.mapping.value.kerb=$1@$2 -# nifi.registry.security.identity.mapping.transform.kerb=UPPER - -# Group Mapping Properties # -# These properties allow normalizing group names coming from external sources like LDAP. The following example -# lowercases any group name. -# -# nifi.registry.security.group.mapping.pattern.anygroup=^(.*)$ -# nifi.registry.security.group.mapping.value.anygroup=$1 -# nifi.registry.security.group.mapping.transform.anygroup=LOWER - -# User authentication -nifi.registry.security.user.authorizer=managed-authorizer - - -# kerberos properties # -# nifi.registry.kerberos.krb5.file= -# nifi.registry.kerberos.spnego.principal= -# nifi.registry.kerberos.spnego.keytab.location= -# nifi.registry.kerberos.spnego.authentication.expiration=12 hours - -# OIDC # -# nifi.registry.security.user.oidc.discovery.url= -# nifi.registry.security.user.oidc.connect.timeout= -# nifi.registry.security.user.oidc.read.timeout= -# nifi.registry.security.user.oidc.client.id= -# nifi.registry.security.user.oidc.client.secret= -# nifi.registry.security.user.oidc.preferred.jwsalgorithm= -# nifi.registry.security.user.oidc.claim.groups=groups - -# revision management # -# This feature should remain disabled until a future NiFi release that supports the revision API changes -nifi.registry.revisions.enabled=false diff --git a/nifi/nifi-registry/providers.xml b/nifi/nifi-registry/providers.xml deleted file mode 100644 index 63ba86d46..000000000 --- a/nifi/nifi-registry/providers.xml +++ /dev/null @@ -1,91 +0,0 @@ - - - - - - org.apache.nifi.registry.provider.flow.FileSystemFlowPersistenceProvider - /opt/nifi-registry/nifi-registry-current/flow_storage - - - - - - - - - - org.apache.nifi.registry.provider.extension.FileSystemBundlePersistenceProvider - ./extension_bundles - - - - diff --git a/nifi/nifi-registry/registry-aliases.xml b/nifi/nifi-registry/registry-aliases.xml deleted file mode 100644 index 9bd1b2d85..000000000 --- a/nifi/nifi-registry/registry-aliases.xml +++ /dev/null @@ -1,23 +0,0 @@ - - - - - \ No newline at end of file diff --git a/nifi/nifi-registry/users.xml b/nifi/nifi-registry/users.xml deleted file mode 100644 index d2488ff3a..000000000 --- a/nifi/nifi-registry/users.xml +++ /dev/null @@ -1,10 +0,0 @@ - - - - - - - - - - diff --git a/nifi/user_python_extensions/convert_avro_binary_field_to_base64.py 
b/nifi/user_python_extensions/convert_avro_binary_field_to_base64.py index 17829a55e..fadee638e 100644 --- a/nifi/user_python_extensions/convert_avro_binary_field_to_base64.py +++ b/nifi/user_python_extensions/convert_avro_binary_field_to_base64.py @@ -2,7 +2,6 @@ import copy import io import json -import traceback from avro.datafile import DataFileReader, DataFileWriter from avro.io import DatumReader, DatumWriter @@ -14,7 +13,6 @@ StandardValidators, ) from nifiapi.relationship import Relationship -from overrides import overrides from py4j.java_gateway import JavaObject, JVMView from nifi.user_scripts.utils.nifi.base_nifi_processor import BaseNiFiProcessor @@ -83,10 +81,9 @@ def __init__(self, jvm: JVMView): self.descriptors: list[PropertyDescriptor] = self._properties self.relationships: list[Relationship] = self._relationships - @overrides - def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: + def process(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: """ - Transforms an Avro flow file by converting a specified binary field to a base64-encoded string. + Processes an Avro flow file by converting a specified binary field to a base64-encoded string. Args: context (ProcessContext): The process context containing processor properties. @@ -100,69 +97,64 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr FlowFileTransformResult: The result containing the transformed flow file, updated attributes, and relationship. """ - try: - self.process_context = context - self.set_properties(context.getProperties()) - - # read avro record - input_raw_bytes: bytes = flowFile.getContentsAsBytes() - input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes) - reader: DataFileReader = DataFileReader(input_byte_buffer, DatumReader()) - - schema: Schema | None = reader.datum_reader.writers_schema - - # change the datatype of the binary field from bytes to string - # (avoids headaches later on when converting avro to json) - # because if we dont change the schema the native NiFi converter will convert bytes to an array of integers. 
- output_schema = None - if schema is not None and isinstance(schema, RecordSchema): - schema_dict = copy.deepcopy(schema.to_json()) - for field in schema_dict["fields"]: # type: ignore - self.logger.info(str(field)) - if field["name"] == self.binary_field_name: - field["type"] = ["null", "string"] - break - output_schema = parse(json.dumps(schema_dict)) - - # Write them to a binary avro stream - output_byte_buffer = io.BytesIO() - writer = DataFileWriter(output_byte_buffer, DatumWriter(), output_schema) - - for record in reader: - if type(record) is dict: - record_document_binary_data = record.get(str(self.binary_field_name), None) - - if record_document_binary_data is not None: - if self.operation_mode == "base64": - record_document_binary_data = base64.b64encode(record_document_binary_data).decode() - else: - self.logger.info("No binary data found in record, using empty content") + # read avro record + input_raw_bytes: bytes = flowFile.getContentsAsBytes() + input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes) + reader: DataFileReader = DataFileReader(input_byte_buffer, DatumReader()) + + schema: Schema | None = reader.datum_reader.writers_schema + + # change the datatype of the binary field from bytes to string + # (avoids headaches later on when converting avro to json) + # because if we dont change the schema the native NiFi converter will convert bytes to an array of integers. + output_schema = None + if schema is not None and isinstance(schema, RecordSchema): + schema_dict = copy.deepcopy(schema.to_json()) + for field in schema_dict["fields"]: # type: ignore + self.logger.info(str(field)) + if field["name"] == self.binary_field_name: + field["type"] = ["null", "string"] + break + output_schema = parse(json.dumps(schema_dict)) + + # Write them to a binary avro stream + output_byte_buffer = io.BytesIO() + writer = DataFileWriter(output_byte_buffer, DatumWriter(), output_schema) + + for record in reader: + if type(record) is dict: + record_document_binary_data = record.get(str(self.binary_field_name), None) + + if record_document_binary_data is not None: + if self.operation_mode == "base64": + record_document_binary_data = base64.b64encode(record_document_binary_data).decode() else: - raise TypeError("Expected Avro record to be a dictionary, but got: " + str(type(record))) - - _tmp_record = {} - _tmp_record[str(self.binary_field_name)] = record_document_binary_data - - for k, v in record.items(): - if k != str(self.binary_field_name): - _tmp_record[k] = v - - writer.append(_tmp_record) - - input_byte_buffer.close() - reader.close() - writer.flush() - output_byte_buffer.seek(0) - - attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} - attributes["document_id_field_name"] = str(self.document_id_field_name) - attributes["binary_field"] = str(self.binary_field_name) - attributes["operation_mode"] = str(self.operation_mode) - attributes["mime.type"] = "application/avro-binary" - - return FlowFileTransformResult(relationship="success", - attributes=attributes, - contents=output_byte_buffer.getvalue()) - except Exception as exception: - self.logger.error("Exception during Avro processing: " + traceback.format_exc()) - raise exception \ No newline at end of file + self.logger.info("No binary data found in record, using empty content") + else: + raise TypeError("Expected Avro record to be a dictionary, but got: " + str(type(record))) + + _tmp_record = {} + _tmp_record[str(self.binary_field_name)] = record_document_binary_data + + for k, v in record.items(): + if 
k != str(self.binary_field_name): + _tmp_record[k] = v + + writer.append(_tmp_record) + + input_byte_buffer.close() + reader.close() + writer.flush() + output_byte_buffer.seek(0) + + attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} + attributes["document_id_field_name"] = str(self.document_id_field_name) + attributes["binary_field"] = str(self.binary_field_name) + attributes["operation_mode"] = str(self.operation_mode) + attributes["mime.type"] = "application/avro-binary" + + return FlowFileTransformResult( + relationship="success", + attributes=attributes, + contents=output_byte_buffer.getvalue(), + ) diff --git a/nifi/user_python_extensions/convert_json_record_schema.py b/nifi/user_python_extensions/convert_json_record_schema.py index ac29e842e..ed313d8f4 100644 --- a/nifi/user_python_extensions/convert_json_record_schema.py +++ b/nifi/user_python_extensions/convert_json_record_schema.py @@ -1,12 +1,10 @@ import json -import traceback from collections import defaultdict from typing import Any from nifiapi.flowfiletransform import FlowFileTransformResult from nifiapi.properties import ProcessContext, PropertyDescriptor, StandardValidators from nifiapi.relationship import Relationship -from overrides import overrides from py4j.java_gateway import JavaObject, JVMView from nifi.user_scripts.utils.nifi.base_nifi_processor import BaseNiFiProcessor @@ -322,35 +320,29 @@ def map_record(self, record: dict, json_mapper_schema: dict) -> dict: return new_record - @overrides - def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: + def process(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: output_contents: list[dict[Any, Any]] = [] - try: - self.process_context: ProcessContext = context - self.set_properties(context.getProperties()) + # read avro record + input_raw_bytes: bytes = flowFile.getContentsAsBytes() + records: dict | list[dict] = json.loads(input_raw_bytes.decode("utf-8")) - # read avro record - input_raw_bytes: bytes = flowFile.getContentsAsBytes() - records: dict | list[dict] = json.loads(input_raw_bytes.decode("utf-8")) + if isinstance(records, dict): + records = [records] - if isinstance(records, dict): - records = [records] + json_mapper_schema: dict = {} + with open(self.json_mapper_schema_path) as file: + json_mapper_schema = json.load(file) - json_mapper_schema: dict = {} - with open(self.json_mapper_schema_path) as file: - json_mapper_schema = json.load(file) + for record in records: + output_contents.append(self.map_record(record, json_mapper_schema)) - for record in records: - output_contents.append(self.map_record(record, json_mapper_schema)) + attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} + attributes["json_mapper_schema_path"] = str(self.json_mapper_schema_path) + attributes["mime.type"] = "application/json" - attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} - attributes["json_mapper_schema_path"] = str(self.json_mapper_schema_path) - attributes["mime.type"] = "application/json" - - return FlowFileTransformResult(relationship="success", - attributes=attributes, - contents=json.dumps(output_contents).encode('utf-8')) - except Exception as exception: - self.logger.error("Exception during flowfile processing: " + traceback.format_exc()) - raise exception \ No newline at end of file + return FlowFileTransformResult( + relationship="success", + attributes=attributes, + contents=json.dumps(output_contents).encode("utf-8"), + ) 
diff --git a/nifi/user_python_extensions/convert_json_to_attribute.py b/nifi/user_python_extensions/convert_json_to_attribute.py index 231663be8..43a11bf7a 100644 --- a/nifi/user_python_extensions/convert_json_to_attribute.py +++ b/nifi/user_python_extensions/convert_json_to_attribute.py @@ -1,10 +1,8 @@ import json import re -import traceback from nifiapi.flowfiletransform import FlowFileTransformResult from nifiapi.properties import ProcessContext, PropertyDescriptor, StandardValidators -from overrides import overrides from py4j.java_gateway import JavaObject, JVMView from nifi.user_scripts.utils.nifi.base_nifi_processor import BaseNiFiProcessor @@ -44,46 +42,39 @@ def __init__(self, jvm: JVMView): self.descriptors: list[PropertyDescriptor] = self._properties - @overrides - def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: + def process(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: DIGITS = re.compile(r"^\d+$") + + # read avro record + input_raw_bytes: bytes = flowFile.getContentsAsBytes() + text = (input_raw_bytes.decode("utf-8", errors="replace").strip() if input_raw_bytes else "[]") + try: - self.process_context = context - self.set_properties(context.getProperties()) - - # read avro record - input_raw_bytes: bytes = flowFile.getContentsAsBytes() - text = (input_raw_bytes.decode("utf-8", errors="replace").strip() if input_raw_bytes else "[]") - - try: - parsed = json.loads(text) if text else [] - except Exception: - parsed = [] - - records = parsed if isinstance(parsed, list) else parsed.get("records", []) - if not isinstance(records, list): - records = [] - - ids = [] - for r in records: - if not isinstance(r, dict): - continue - v = r.get(self.field_name) - if v is None: - continue - s = str(v).strip() - if DIGITS.match(s): - ids.append(s) - - ids_csv = ",".join(ids) - return FlowFileTransformResult( - relationship="success", - attributes={ - "ids_csv": ids_csv, - "ids_count": str(len(ids)), - "ids_len": str(len(ids_csv)), - }, - ) - except Exception as exception: - self.logger.error("Exception during Avro processing: " + traceback.format_exc()) - raise exception \ No newline at end of file + parsed = json.loads(text) if text else [] + except Exception: + parsed = [] + + records = parsed if isinstance(parsed, list) else parsed.get("records", []) + if not isinstance(records, list): + records = [] + + ids = [] + for r in records: + if not isinstance(r, dict): + continue + v = r.get(self.field_name) + if v is None: + continue + s = str(v).strip() + if DIGITS.match(s): + ids.append(s) + + ids_csv = ",".join(ids) + return FlowFileTransformResult( + relationship="success", + attributes={ + "ids_csv": ids_csv, + "ids_count": str(len(ids)), + "ids_len": str(len(ids_csv)), + }, + ) diff --git a/nifi/user_python_extensions/convert_record_parquet_to_json.py b/nifi/user_python_extensions/convert_record_parquet_to_json.py index cde065aee..bb2f49d83 100644 --- a/nifi/user_python_extensions/convert_record_parquet_to_json.py +++ b/nifi/user_python_extensions/convert_record_parquet_to_json.py @@ -1,10 +1,8 @@ import io import json -import traceback from nifiapi.flowfiletransform import FlowFileTransformResult from nifiapi.properties import ProcessContext, PropertyDescriptor -from overrides import overrides from py4j.java_gateway import JavaObject, JVMView from pyarrow import parquet @@ -30,49 +28,43 @@ def __init__(self, jvm: JVMView): self.descriptors: list[PropertyDescriptor] = self._properties - @overrides - def 
transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: + def process(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: """ """ - try: - self.process_context = context - self.set_properties(context.getProperties()) + # read avro record + input_raw_bytes: bytes = flowFile.getContentsAsBytes() + input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes) - # read avro record - input_raw_bytes: bytes = flowFile.getContentsAsBytes() - input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes) + parquet_file = parquet.ParquetFile(input_byte_buffer) - parquet_file = parquet.ParquetFile(input_byte_buffer) + output_buffer: io.BytesIO = io.BytesIO() + record_count: int = 0 - output_buffer: io.BytesIO = io.BytesIO() - record_count: int = 0 + for batch in parquet_file.iter_batches(batch_size=10000): + records: list[dict] = batch.to_pylist() - for batch in parquet_file.iter_batches(batch_size=10000): - records: list[dict] = batch.to_pylist() + for record in records: + json_record = json.dumps( + record, + ensure_ascii=False, + separators=(",", ":"), + default=parquet_json_data_type_convert, + ) - for record in records: - json_record = json.dumps( - record, - ensure_ascii=False, - separators=(",", ":"), - default=parquet_json_data_type_convert, - ) + output_buffer.write(json_record.encode("utf-8")) + output_buffer.write(b"\n") + record_count += len(records) - output_buffer.write(json_record.encode("utf-8")) - output_buffer.write(b"\n") - record_count += len(records) + input_byte_buffer.close() - input_byte_buffer.close() + attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} + attributes["mime.type"] = "application/x-ndjson" + attributes["record.count"] = str(record_count) - attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} - attributes["mime.type"] = "application/x-ndjson" - attributes["record.count"] = str(record_count) - - return FlowFileTransformResult(relationship="success", - attributes=attributes, - contents=output_buffer.getvalue()) - except Exception as exception: - self.logger.error("Exception during Avro processing: " + traceback.format_exc()) - raise exception \ No newline at end of file + return FlowFileTransformResult( + relationship="success", + attributes=attributes, + contents=output_buffer.getvalue(), + ) diff --git a/nifi/user_python_extensions/parse_service_response.py b/nifi/user_python_extensions/parse_service_response.py index e4eddf8c3..d8ea0a779 100644 --- a/nifi/user_python_extensions/parse_service_response.py +++ b/nifi/user_python_extensions/parse_service_response.py @@ -1,5 +1,4 @@ import json -import traceback from nifiapi.flowfiletransform import FlowFileTransformResult from nifiapi.properties import ( @@ -8,7 +7,6 @@ StandardValidators, ) from nifiapi.relationship import Relationship -from overrides import overrides from py4j.java_gateway import JavaObject, JVMView from nifi.user_scripts.utils.nifi.base_nifi_processor import BaseNiFiProcessor @@ -95,10 +93,9 @@ def __init__(self, jvm: JVMView): self.descriptors: list[PropertyDescriptor] = self._properties self.relationships: list[Relationship] = self._relationships - @overrides - def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: + def process(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: """ - Transforms the input FlowFile by parsing the service response and extracting relevant fields. 
+ Processes the input FlowFile by parsing the service response and extracting relevant fields. Args: context (ProcessContext): The process context containing processor properties. @@ -113,87 +110,83 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr output_contents: list = [] - try: - self.process_context: ProcessContext = context - self.set_properties(context.getProperties()) + # read avro record + input_raw_bytes: bytes = flowFile.getContentsAsBytes() - # read avro record - input_raw_bytes: bytes = flowFile.getContentsAsBytes() + records: dict | list[dict] = json.loads(input_raw_bytes.decode("utf-8")) - records: dict | list[dict] = json.loads(input_raw_bytes.decode("utf-8")) + if isinstance(records, dict): + records = [records] - if isinstance(records, dict): - records = [records] + if self.service_message_type == "ocr": + for record in records: + result = record.get("result", {}) - if self.service_message_type == "ocr": - for record in records: - result = record.get("result", {}) + _record = {} + _record["metadata"] = result.get("metadata", {}) + _record[self.output_text_field_name] = result.get("text", "") + _record["success"] = result.get("success", False) + _record["timestamp"] = result.get("timestamp", None) - _record = {} - _record["metadata"] = result.get("metadata", {}) - _record[self.output_text_field_name] = result.get("text", "") - _record["success"] = result.get("success", False) - _record["timestamp"] = result.get("timestamp", None) + if "footer" in result: + for k, v in result["footer"].items(): + _record[k] = v - if "footer" in result: - for k, v in result["footer"].items(): - _record[k] = v + output_contents.append(_record) - output_contents.append(_record) + elif self.service_message_type == "medcat" and "result" in records[0]: + result = records[0].get("result", []) + medcat_info = records[0].get("medcat_info", {}) - elif self.service_message_type == "medcat" and "result" in records[0]: - result = records[0].get("result", []) - medcat_info = records[0].get("medcat_info", {}) + if isinstance(result, dict): + result = [result] - if isinstance(result, dict): - result = [result] + for annotated_record in result: + annotations = annotated_record.get("annotations", []) + annotations = annotations[0] if len(annotations) > 0 else annotations + footer = annotated_record.get("footer", {}) - for annotated_record in result: - annotations = annotated_record.get("annotations", []) - annotations = annotations[0] if len(annotations) > 0 else annotations - footer = annotated_record.get("footer", {}) + if self.medcat_output_mode == "deid": + _output_annotated_record = {} + _output_annotated_record["service_model"] = medcat_info + _output_annotated_record["timestamp"] = annotated_record.get("timestamp", None) + _output_annotated_record[self.output_text_field_name] = annotated_record.get("text", "") - if self.medcat_output_mode == "deid": - _output_annotated_record = {} - _output_annotated_record["service_model"] = medcat_info - _output_annotated_record["timestamp"] = annotated_record.get("timestamp", None) - _output_annotated_record[self.output_text_field_name] = annotated_record.get("text", "") + if self.medcat_deid_keep_annotations is True: + _output_annotated_record["annotations"] = annotations + else: + _output_annotated_record["annotations"] = {} - if self.medcat_deid_keep_annotations is True: - _output_annotated_record["annotations"] = annotations - else: - _output_annotated_record["annotations"] = {} + for k, v in footer.items(): + 
_output_annotated_record[k] = v + output_contents.append(_output_annotated_record) - for k, v in footer.items(): - _output_annotated_record[k] = v - output_contents.append(_output_annotated_record) + else: + for annotation_id, annotation_data in annotations.items(): + _output_annotated_record = {} + _output_annotated_record["service_model"] = medcat_info + _output_annotated_record["timestamp"] = annotated_record.get("timestamp", None) - else: - for annotation_id, annotation_data in annotations.items(): - _output_annotated_record = {} - _output_annotated_record["service_model"] = medcat_info - _output_annotated_record["timestamp"] = annotated_record.get("timestamp", None) + for k, v in annotation_data.items(): + _output_annotated_record[k] = v - for k, v in annotation_data.items(): - _output_annotated_record[k] = v + for k, v in footer.items(): + _output_annotated_record[k] = v - for k, v in footer.items(): - _output_annotated_record[k] = v + if self.document_id_field_name in footer: + _output_annotated_record["annotation_id"] = ( + str(footer[self.document_id_field_name]) + "_" + str(annotation_id) + ) - if self.document_id_field_name in footer: - _output_annotated_record["annotation_id"] = \ - str(footer[self.document_id_field_name]) + "_" + str(annotation_id) + output_contents.append(_output_annotated_record) - output_contents.append(_output_annotated_record) + # add properties to flowfile attributes + attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} + attributes["output_text_field_name"] = str(self.output_text_field_name) + attributes["mime.type"] = "application/json" - # add properties to flowfile attributes - attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} - attributes["output_text_field_name"] = str(self.output_text_field_name) - attributes["mime.type"] = "application/json" - - return FlowFileTransformResult(relationship="success", - attributes=attributes, - contents=json.dumps(output_contents).encode('utf-8')) - except Exception as exception: - self.logger.error("Exception during flowfile processing: " + traceback.format_exc()) - raise exception \ No newline at end of file + return FlowFileTransformResult( + relationship="success", + attributes=attributes, + contents=json.dumps(output_contents).encode("utf-8"), + ) diff --git a/nifi/user_python_extensions/prepare_record_for_nlp.py b/nifi/user_python_extensions/prepare_record_for_nlp.py index 443de6b2b..40f7f70a7 100644 --- a/nifi/user_python_extensions/prepare_record_for_nlp.py +++ b/nifi/user_python_extensions/prepare_record_for_nlp.py @@ -1,6 +1,5 @@ import io import json -import traceback from typing import Any from avro.datafile import DataFileReader @@ -11,7 +10,6 @@ PropertyDescriptor, StandardValidators, ) -from overrides import overrides from py4j.java_gateway import JavaObject, JVMView from nifi.user_scripts.utils.nifi.base_nifi_processor import BaseNiFiProcessor @@ -55,8 +53,7 @@ def __init__(self, jvm: JVMView): self.descriptors: list[PropertyDescriptor] = self._properties - @overrides - def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: + def process(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: """_summary_ Args: @@ -73,49 +70,46 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr output_contents: list = [] - try: - self.process_context = context - self.set_properties(context.getProperties()) + self.process_flow_file_type = 
str(self.process_flow_file_type).lower() - self.process_flow_file_type = str(self.process_flow_file_type).lower() + # read avro record + input_raw_bytes: bytes = flowFile.getContentsAsBytes() + input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes) - # read avro record - input_raw_bytes: bytes = flowFile.getContentsAsBytes() - input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes) + reader: DataFileReader | (list[dict[str, Any]] | list[Any]) - reader: DataFileReader | (list[dict[str, Any]] | list[Any]) + if self.process_flow_file_type == "avro": + reader = DataFileReader(input_byte_buffer, DatumReader()) + else: + json_obj = json.loads(input_byte_buffer.read().decode("utf-8")) + reader = [json_obj] if isinstance(json_obj, dict) else json_obj if isinstance(json_obj, list) else [] - if self.process_flow_file_type == "avro": - reader = DataFileReader(input_byte_buffer, DatumReader()) + for record in reader: + if type(record) is dict: + record_document_text = record.get(str(self.document_text_field_name), "") else: - json_obj = json.loads(input_byte_buffer.read().decode("utf-8")) - reader = [json_obj] if isinstance(json_obj, dict) else json_obj if isinstance(json_obj, list) else [] + raise TypeError("Expected record to be a dictionary, but got: " + str(type(record))) - for record in reader: - if type(record) is dict: - record_document_text = record.get(str(self.document_text_field_name), "") - else: - raise TypeError("Expected record to be a dictionary, but got: " + str(type(record))) - - output_contents.append({ + output_contents.append( + { "text": record_document_text, - "footer": {k: v for k, v in record.items() if k != str(self.document_text_field_name)} - }) + "footer": {k: v for k, v in record.items() if k != str(self.document_text_field_name)}, + } + ) - input_byte_buffer.close() + input_byte_buffer.close() - if isinstance(reader, DataFileReader): - reader.close() + if isinstance(reader, DataFileReader): + reader.close() - attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} - attributes["document_id_field_name"] = str(self.document_id_field_name) - attributes["mime.type"] = "application/json" + attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} + attributes["document_id_field_name"] = str(self.document_id_field_name) + attributes["mime.type"] = "application/json" - output_contents = output_contents[0] if len(output_contents) == 1 else output_contents + output_contents = output_contents[0] if len(output_contents) == 1 else output_contents - return FlowFileTransformResult(relationship="success", - attributes=attributes, - contents=json.dumps({"content": output_contents}).encode("utf-8")) - except Exception as exception: - self.logger.error("Exception during flowfile processing: " + traceback.format_exc()) - raise exception \ No newline at end of file + return FlowFileTransformResult( + relationship="success", + attributes=attributes, + contents=json.dumps({"content": output_contents}).encode("utf-8"), + ) diff --git a/nifi/user_python_extensions/prepare_record_for_ocr.py b/nifi/user_python_extensions/prepare_record_for_ocr.py index e6c41e5ca..709c185d6 100644 --- a/nifi/user_python_extensions/prepare_record_for_ocr.py +++ b/nifi/user_python_extensions/prepare_record_for_ocr.py @@ -1,7 +1,6 @@ import base64 import io import json -import traceback from typing import Any from avro.datafile import DataFileReader @@ -13,7 +12,6 @@ StandardValidators, ) from nifiapi.relationship import Relationship -from overrides import overrides from 
py4j.java_gateway import JavaObject, JVMView from nifi.user_scripts.utils.nifi.base_nifi_processor import BaseNiFiProcessor @@ -78,68 +76,65 @@ def __init__(self, jvm: JVMView): self.descriptors: list[PropertyDescriptor] = self._properties self.relationships: list[Relationship] = self._relationships - @overrides - def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: + def process(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: output_contents: list = [] - try: - self.process_context = context - self.set_properties(context.getProperties()) - - self.process_flow_file_type = str(self.process_flow_file_type).lower() - - # read avro record - input_raw_bytes: bytes = flowFile.getContentsAsBytes() - input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes) - - reader: DataFileReader | (list[dict[str, Any]] | list[Any]) - - if self.process_flow_file_type == "avro": - reader = DataFileReader(input_byte_buffer, DatumReader()) - elif self.process_flow_file_type == "ndjson": - json_lines = input_byte_buffer.read().decode("utf-8").splitlines() - reader = [json.loads(line) for line in json_lines if line.strip()] - else: - json_obj = json.loads(input_byte_buffer.read().decode("utf-8")) - reader = [json_obj] if isinstance(json_obj, dict) else json_obj if isinstance(json_obj, list) else [] - - for record in reader: - if type(record) is dict: - record_document_binary_data = record.get(str(self.binary_field_name), None) - if record_document_binary_data is not None: - if self.operation_mode == "base64": - record_document_binary_data = base64.b64encode(record_document_binary_data).decode() - else: - self.logger.info("No binary data found in record, using empty content") + self.process_flow_file_type = str(self.process_flow_file_type).lower() + + # read avro record + input_raw_bytes: bytes = flowFile.getContentsAsBytes() + input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes) + + reader: DataFileReader | (list[dict[str, Any]] | list[Any]) + + if self.process_flow_file_type == "avro": + reader = DataFileReader(input_byte_buffer, DatumReader()) + elif self.process_flow_file_type == "ndjson": + json_lines = input_byte_buffer.read().decode("utf-8").splitlines() + reader = [json.loads(line) for line in json_lines if line.strip()] + else: + json_obj = json.loads(input_byte_buffer.read().decode("utf-8")) + reader = [json_obj] if isinstance(json_obj, dict) else json_obj if isinstance(json_obj, list) else [] + + for record in reader: + if type(record) is dict: + record_document_binary_data = record.get(str(self.binary_field_name), None) + if record_document_binary_data is not None: + if self.operation_mode == "base64": + record_document_binary_data = base64.b64encode(record_document_binary_data).decode() else: - raise TypeError("Expected record to be a dictionary, but got: " + str(type(record))) + self.logger.info("No binary data found in record, using empty content") + else: + raise TypeError("Expected record to be a dictionary, but got: " + str(type(record))) - output_contents.append({ + output_contents.append( + { "binary_data": record_document_binary_data, - "footer": {k: v for k, v in record.items() if k != str(self.binary_field_name)} - }) + "footer": {k: v for k, v in record.items() if k != str(self.binary_field_name)}, + } + ) - input_byte_buffer.close() + input_byte_buffer.close() - if isinstance(reader, DataFileReader): - reader.close() + if isinstance(reader, DataFileReader): + reader.close() - attributes: dict = {k: str(v) for 
k, v in flowFile.getAttributes().items()} - attributes["document_id_field_name"] = str(self.document_id_field_name) - attributes["binary_field"] = str(self.binary_field_name) - attributes["output_text_field_name"] = str(self.output_text_field_name) - attributes["mime.type"] = "application/json" + attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} + attributes["document_id_field_name"] = str(self.document_id_field_name) + attributes["binary_field"] = str(self.binary_field_name) + attributes["output_text_field_name"] = str(self.output_text_field_name) + attributes["mime.type"] = "application/json" - if self.process_flow_file_type == "avro": - return FlowFileTransformResult(relationship="success", - attributes=attributes, - contents=json.dumps(output_contents, cls=AvroJSONEncoder).encode("utf-8") - ) - else: - return FlowFileTransformResult(relationship="success", - attributes=attributes, - contents=json.dumps(output_contents).encode("utf-8")) - except Exception as exception: - self.logger.error("Exception during flowfile processing: " + traceback.format_exc()) - raise exception + if self.process_flow_file_type == "avro": + return FlowFileTransformResult( + relationship="success", + attributes=attributes, + contents=json.dumps(output_contents, cls=AvroJSONEncoder).encode("utf-8"), + ) + + return FlowFileTransformResult( + relationship="success", + attributes=attributes, + contents=json.dumps(output_contents).encode("utf-8"), + ) diff --git a/nifi/user_python_extensions/record_add_geolocation.py b/nifi/user_python_extensions/record_add_geolocation.py index 2a027937f..dce48bbe5 100644 --- a/nifi/user_python_extensions/record_add_geolocation.py +++ b/nifi/user_python_extensions/record_add_geolocation.py @@ -11,7 +11,6 @@ PropertyDescriptor, StandardValidators, ) -from overrides import overrides from py4j.java_gateway import JavaObject, JVMView from nifi.user_scripts.utils.generic import download_file_from_url, safe_delete_paths @@ -88,7 +87,6 @@ def __init__(self, jvm: JVMView): self.descriptors: list[PropertyDescriptor] = self._properties - @overrides def onScheduled(self, context: ProcessContext) -> None: """ Initializes processor resources when scheduled. Args: @@ -155,9 +153,8 @@ def _check_geolocation_lookup_datafile(self) -> bool: return file_found - @overrides - def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: - """ Transforms the input FlowFile by adding geolocation data based on postcode lookup. + def process(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: + """ Processes the input FlowFile by adding geolocation data based on postcode lookup. Args: context (ProcessContext): The process context. flowFile (JavaObject): The input FlowFile to be transformed. @@ -171,54 +168,45 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr Use SplitRecord processor to split large files into smaller chunks before processing. 
""" - try: - self.process_context: ProcessContext = context - self.set_properties(context.getProperties()) - - input_raw_bytes: bytes = flowFile.getContentsAsBytes() - - records: dict | list[dict] = json.loads(input_raw_bytes.decode("utf-8")) - - valid_records: list[dict] = [] - error_records: list[dict] = [] - - if isinstance(records, dict): - records = [records] - - if self.postcode_lookup_index: - for record in records: - if self.postcode_field_name in record: - _postcode = str(record[self.postcode_field_name]).replace(" ", "") - _data_col_row_idx = self.postcode_lookup_index.get(_postcode, -1) - - if _data_col_row_idx != -1: - _selected_row = self.loaded_csv_file_rows[_data_col_row_idx] - _lat, _long = str(_selected_row[7]).strip(), str(_selected_row[8]).strip() - try: - record[self.geolocation_field_name] = { - "lat": float(_lat), - "lon": float(_long) - } - except ValueError: - self.logger.debug(f"invalid lat/long values for postcode {_postcode}: {_lat}, {_long}") - error_records.append(record) - valid_records.append(record) - else: - raise FileNotFoundError("geolocation lookup datafile is not available and data was not loaded, " \ - "please check URLs") - - attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} - attributes["mime.type"] = "application/json" - - if error_records: - attributes["record.count.errors"] = str(len(error_records)) - attributes["record.count"] = str(len(valid_records)) - - return FlowFileTransformResult( - relationship="success", - attributes=attributes, - contents=json.dumps(valid_records).encode("utf-8"), + input_raw_bytes: bytes = flowFile.getContentsAsBytes() + + records: dict | list[dict] = json.loads(input_raw_bytes.decode("utf-8")) + + valid_records: list[dict] = [] + error_records: list[dict] = [] + + if isinstance(records, dict): + records = [records] + + if self.postcode_lookup_index: + for record in records: + if self.postcode_field_name in record: + _postcode = str(record[self.postcode_field_name]).replace(" ", "") + _data_col_row_idx = self.postcode_lookup_index.get(_postcode, -1) + + if _data_col_row_idx != -1: + _selected_row = self.loaded_csv_file_rows[_data_col_row_idx] + _lat, _long = str(_selected_row[7]).strip(), str(_selected_row[8]).strip() + try: + record[self.geolocation_field_name] = {"lat": float(_lat), "lon": float(_long)} + except ValueError: + self.logger.debug(f"invalid lat/long values for postcode {_postcode}: {_lat}, {_long}") + error_records.append(record) + valid_records.append(record) + else: + raise FileNotFoundError( + "geolocation lookup datafile is not available and data was not loaded, please check URLs" ) - except Exception as exception: - self.logger.error("Exception during flowfile processing:\n" + traceback.format_exc()) - return self.build_failure_result(flowFile, exception) \ No newline at end of file + + attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} + attributes["mime.type"] = "application/json" + + if error_records: + attributes["record.count.errors"] = str(len(error_records)) + attributes["record.count"] = str(len(valid_records)) + + return FlowFileTransformResult( + relationship="success", + attributes=attributes, + contents=json.dumps(valid_records).encode("utf-8"), + ) diff --git a/nifi/user_python_extensions/record_decompress_cerner_blob.py b/nifi/user_python_extensions/record_decompress_cerner_blob.py index 718a8a14b..a26917a2f 100644 --- a/nifi/user_python_extensions/record_decompress_cerner_blob.py +++ 
b/nifi/user_python_extensions/record_decompress_cerner_blob.py @@ -1,6 +1,5 @@ import base64 import json -import traceback from nifiapi.flowfiletransform import FlowFileTransformResult from nifiapi.properties import ( @@ -8,7 +7,6 @@ PropertyDescriptor, StandardValidators, ) -from overrides import overrides from py4j.java_gateway import JavaObject, JVMView from nifi.user_scripts.utils.codecs.cerner_blob import DecompressLzwCernerBlob @@ -95,8 +93,24 @@ def __init__(self, jvm: JVMView): self.descriptors: list[PropertyDescriptor] = self._properties - @overrides - def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: + def _load_json_records(self, input_raw_bytes: bytes | bytearray) -> list | dict: + try: + return json.loads(input_raw_bytes.decode()) + except json.JSONDecodeError as exc: + self.logger.error(f"Error decoding JSON: {exc} \nAttempting to decode as {self.input_charset}") + try: + return json.loads(input_raw_bytes.decode(self.input_charset)) + except json.JSONDecodeError as exc2: + self.logger.error(f"Error decoding JSON: {exc2} \nAttempting to decode as windows-1252") + try: + return json.loads(input_raw_bytes.decode("windows-1252")) + except json.JSONDecodeError as exc3: + raise ValueError( + "Error decoding JSON after trying utf-8, " + f"{self.input_charset}, and windows-1252: {exc3}" + ) from exc3 + + def process(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: """ Transforms the input FlowFile by decompressing Cerner blob data from JSON records. @@ -114,198 +128,117 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr output_contents: list = [] attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} - try: - self.process_context = context - self.set_properties(context.getProperties()) + # read avro record + input_raw_bytes: bytes | bytearray = flowFile.getContentsAsBytes() - # read avro record - input_raw_bytes: bytes | bytearray = flowFile.getContentsAsBytes() + records: list | dict = self._load_json_records(input_raw_bytes) - records: list | dict = [] + if not isinstance(records, list): + records = [records] - try: - records = json.loads(input_raw_bytes.decode()) - except json.JSONDecodeError as e: - self.logger.error(f"Error decoding JSON: {str(e)} \nAttempting to decode as {self.input_charset}") - try: - records = json.loads(input_raw_bytes.decode(self.input_charset)) - except json.JSONDecodeError as e: - self.logger.error(f"Error decoding JSON: {str(e)} \nAttempting to decode as windows-1252") - try: - records = json.loads(input_raw_bytes.decode("windows-1252")) - except json.JSONDecodeError as e: - return self.build_failure_result( - flowFile, - ValueError(f"Error decoding JSON: {str(e)} \n with windows-1252"), - attributes=attributes, - contents=input_raw_bytes, - ) - - if not isinstance(records, list): - records = [records] - - if not records: - return self.build_failure_result( - flowFile, - ValueError("No records found in JSON input"), - attributes=attributes, - contents=input_raw_bytes, - ) + if not records: + raise ValueError("No records found in JSON input") - # sanity check: blobs are from the same document_id - doc_ids: set = {str(r.get(self.document_id_field_name, "")) for r in records} - if len(doc_ids) > 1: - return self.build_failure_result( - flowFile, - ValueError(f"Multiple document IDs in one FlowFile: {list(doc_ids)}"), - attributes=attributes, - contents=input_raw_bytes, - ) + # sanity check: blobs are from the same document_id + 
doc_ids: set = {str(r.get(self.document_id_field_name, "")) for r in records} + if len(doc_ids) > 1: + raise ValueError(f"Multiple document IDs in one FlowFile: {list(doc_ids)}") + + concatenated_blob_sequence_order: dict = {} + output_merged_record: dict = {} + + have_any_sequence: bool = any(self.blob_sequence_order_field_name in record for record in records) + have_any_no_sequence: bool = any(self.blob_sequence_order_field_name not in record for record in records) - concatenated_blob_sequence_order: dict = {} - output_merged_record: dict = {} - - have_any_sequence: bool = any(self.blob_sequence_order_field_name in record for record in records) - have_any_no_sequence: bool = any(self.blob_sequence_order_field_name not in record for record in records) - - if have_any_sequence and have_any_no_sequence: - return self.build_failure_result( - flowFile, - ValueError( - f"Mixed records: some have '{self.blob_sequence_order_field_name}', some don't. " - "Cannot safely reconstruct blob stream." - ), - attributes=attributes, - contents=input_raw_bytes, + if have_any_sequence and have_any_no_sequence: + raise ValueError( + f"Mixed records: some have '{self.blob_sequence_order_field_name}', some don't. " + "Cannot safely reconstruct blob stream." + ) + + for record in records: + if self.binary_field_name not in record or record[self.binary_field_name] in (None, ""): + raise ValueError(f"Missing '{self.binary_field_name}' in a record") + + if have_any_sequence: + seq = int(record[self.blob_sequence_order_field_name]) + if seq in concatenated_blob_sequence_order: + raise ValueError(f"Duplicate {self.blob_sequence_order_field_name}: {seq}") + + concatenated_blob_sequence_order[seq] = record[self.binary_field_name] + else: + # no sequence anywhere: preserve record order (0..n-1) + seq = len(concatenated_blob_sequence_order) + concatenated_blob_sequence_order[seq] = record[self.binary_field_name] + + # take fields from the first record, doesn't matter which one, + # as they are expected to be the same except for the binary data field + for k, v in records[0].items(): + if k not in output_merged_record and k != self.binary_field_name: + output_merged_record[k] = v + + full_compressed_blob = bytearray() + + # double check to make sure there is no gap in the blob sequence, i.e missing blob. 
+ order_of_blobs_keys = sorted(concatenated_blob_sequence_order.keys()) + for i in range(1, len(order_of_blobs_keys)): + if order_of_blobs_keys[i] != order_of_blobs_keys[i-1] + 1: + raise ValueError( + f"Sequence gap: missing {order_of_blobs_keys[i-1] + 1} " + f"(have {order_of_blobs_keys[i-1]} then {order_of_blobs_keys[i]})" ) - for record in records: - if self.binary_field_name not in record or record[self.binary_field_name] in (None, ""): - return self.build_failure_result( - flowFile, - ValueError(f"Missing '{self.binary_field_name}' in a record"), - attributes=attributes, - contents=input_raw_bytes, - ) + for k in order_of_blobs_keys: + v = concatenated_blob_sequence_order[k] - if have_any_sequence: - seq = int(record[self.blob_sequence_order_field_name]) - if seq in concatenated_blob_sequence_order: - return self.build_failure_result( - flowFile, - ValueError(f"Duplicate {self.blob_sequence_order_field_name}: {seq}"), - attributes=attributes, - contents=input_raw_bytes, - ) - - concatenated_blob_sequence_order[seq] = record[self.binary_field_name] - else: - # no sequence anywhere: preserve record order (0..n-1) - seq = len(concatenated_blob_sequence_order) - concatenated_blob_sequence_order[seq] = record[self.binary_field_name] - - # take fields from the first record, doesn't matter which one, - # as they are expected to be the same except for the binary data field - for k, v in records[0].items(): - if k not in output_merged_record and k != self.binary_field_name: - output_merged_record[k] = v - - full_compressed_blob = bytearray() - - # double check to make sure there is no gap in the blob sequence, i.e missing blob. - order_of_blobs_keys = sorted(concatenated_blob_sequence_order.keys()) - for i in range(1, len(order_of_blobs_keys)): - if order_of_blobs_keys[i] != order_of_blobs_keys[i-1] + 1: - return self.build_failure_result( - flowFile, - ValueError( - f"Sequence gap: missing {order_of_blobs_keys[i-1] + 1} " - f"(have {order_of_blobs_keys[i-1]} then {order_of_blobs_keys[i]})" - ), - attributes=attributes, - contents=input_raw_bytes, - ) + temporary_blob: bytes = b"" - for k in order_of_blobs_keys: - v = concatenated_blob_sequence_order[k] - - temporary_blob: bytes = b"" - - if self.binary_field_source_encoding == "base64": - if not isinstance(v, str): - return self.build_failure_result( - flowFile, - ValueError( - f"Expected base64 string in {self.binary_field_name} for part {k}, got {type(v)}" - ), - attributes=attributes, - contents=input_raw_bytes, - ) - try: - temporary_blob = base64.b64decode(v, validate=True) - except Exception as e: - return self.build_failure_result( - flowFile, - ValueError(f"Error decoding base64 blob part {k}: {e}"), - attributes=attributes, - contents=input_raw_bytes, - ) + if self.binary_field_source_encoding == "base64": + if not isinstance(v, str): + raise ValueError( + f"Expected base64 string in {self.binary_field_name} for part {k}, got {type(v)}" + ) + try: + temporary_blob = base64.b64decode(v, validate=True) + except Exception as exc: + raise ValueError(f"Error decoding base64 blob part {k}: {exc}") from exc + else: + # raw bytes path + if isinstance(v, (bytes, bytearray)): + temporary_blob = v else: - # raw bytes path - if isinstance(v, (bytes, bytearray)): - temporary_blob = v - else: - return self.build_failure_result( - flowFile, - ValueError( - f"Expected bytes in {self.binary_field_name} for part {k}, got {type(v)}" - ), - attributes=attributes, - contents=input_raw_bytes, - ) - - full_compressed_blob.extend(temporary_blob) - - # 
build / add new attributes to dict before doing anything else to have some trace. - attributes["document_id_field_name"] = str(self.document_id_field_name) - attributes["document_id"] = str(output_merged_record.get(self.document_id_field_name, "")) - attributes["binary_field"] = str(self.binary_field_name) - attributes["output_text_field_name"] = str(self.output_text_field_name) - attributes["mime.type"] = "application/json" - attributes["blob_parts"] = str(len(order_of_blobs_keys)) - attributes["blob_seq_min"] = str(order_of_blobs_keys[0]) if order_of_blobs_keys else "" - attributes["blob_seq_max"] = str(order_of_blobs_keys[-1]) if order_of_blobs_keys else "" - attributes["compressed_len"] = str(len(full_compressed_blob)) - attributes["compressed_head_hex"] = bytes(full_compressed_blob[:16]).hex() + raise ValueError( + f"Expected bytes in {self.binary_field_name} for part {k}, got {type(v)}" + ) - try: - decompress_blob = DecompressLzwCernerBlob() - decompress_blob.decompress(full_compressed_blob) - output_merged_record[self.binary_field_name] = bytes(decompress_blob.output_stream) - except Exception as exception: - return self.build_failure_result( - flowFile, - exception=exception, - attributes=attributes, - include_flowfile_attributes=False, - contents=input_raw_bytes - ) + full_compressed_blob.extend(temporary_blob) - if self.output_mode == "base64": - output_merged_record[self.binary_field_name] = \ - base64.b64encode(output_merged_record[self.binary_field_name]).decode(self.output_charset) + # build / add new attributes to dict before doing anything else to have some trace. + attributes["document_id_field_name"] = str(self.document_id_field_name) + attributes["document_id"] = str(output_merged_record.get(self.document_id_field_name, "")) + attributes["binary_field"] = str(self.binary_field_name) + attributes["output_text_field_name"] = str(self.output_text_field_name) + attributes["mime.type"] = "application/json" + attributes["blob_parts"] = str(len(order_of_blobs_keys)) + attributes["blob_seq_min"] = str(order_of_blobs_keys[0]) if order_of_blobs_keys else "" + attributes["blob_seq_max"] = str(order_of_blobs_keys[-1]) if order_of_blobs_keys else "" + attributes["compressed_len"] = str(len(full_compressed_blob)) + attributes["compressed_head_hex"] = bytes(full_compressed_blob[:16]).hex() - output_contents.append(output_merged_record) + try: + decompress_blob = DecompressLzwCernerBlob() + decompress_blob.decompress(full_compressed_blob) + output_merged_record[self.binary_field_name] = bytes(decompress_blob.output_stream) - return FlowFileTransformResult(relationship=self.REL_SUCCESS, - attributes=attributes, - contents=json.dumps(output_contents).encode("utf-8")) except Exception as exception: - self.logger.error("Exception during flowfile processing: " + traceback.format_exc()) - return self.build_failure_result( - flowFile, - exception, - attributes=attributes, - contents=locals().get("input_raw_bytes", flowFile.getContentsAsBytes()), - include_flowfile_attributes=False - ) + raise RuntimeError("Error decompressing Cerner LZW blob") from exception + + if self.output_mode == "base64": + output_merged_record[self.binary_field_name] = \ + base64.b64encode(output_merged_record[self.binary_field_name]).decode(self.output_charset) + + output_contents.append(output_merged_record) + + return FlowFileTransformResult(relationship=self.REL_SUCCESS, + attributes=attributes, + contents=json.dumps(output_contents).encode("utf-8")) diff --git a/nifi/user_python_extensions/sample_processor.py 
b/nifi/user_python_extensions/sample_processor.py index a92a05430..73320e635 100644 --- a/nifi/user_python_extensions/sample_processor.py +++ b/nifi/user_python_extensions/sample_processor.py @@ -1,6 +1,5 @@ import io import json -import traceback from typing import Any from avro.datafile import DataFileReader, DataFileWriter @@ -92,7 +91,7 @@ def onScheduled(self, context: ProcessContext) -> None: """ pass - def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: + def process(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: """ NOTE: This is a sample method meant to be overridden and reimplemented by subclasses. @@ -115,48 +114,42 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr output_contents: list[Any] = [] - try: - self.process_context: ProcessContext = context - self.set_properties(context.getProperties()) - # add properties to flowfile attributes - attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} - self.logger.info("Successfully transformed Avro content for OCR") - - input_raw_bytes: bytes = flowFile.getContentsAsBytes() - - # read avro record - self.logger.debug("Reading flowfile content as bytes") - input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes) - reader: DataFileReader = DataFileReader(input_byte_buffer, DatumReader()) - - # below is an example of how to handle avro records, each record - schema: Schema | None = reader.datum_reader.writers_schema - - for record in reader: - #do stuff - pass - - # streams need to be closed - input_byte_buffer.close() - reader.close() - - # Write them to a binary avro stre - output_byte_buffer = io.BytesIO() - writer = DataFileWriter(output_byte_buffer, DatumWriter(), schema) - - writer.flush() - writer.close() - output_byte_buffer.seek(0) - - # add properties to flowfile attributes - attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} - attributes["sample_property_one"] = str(self.sample_property_one) - attributes["sample_property_two"] = str(self.sample_property_two) - attributes["sample_property_three"] = str(self.sample_property_three) - - return FlowFileTransformResult(relationship="success", - attributes=attributes, - contents=json.dumps(output_contents)) - except Exception as exception: - self.logger.error("Exception during Avro processing: " + traceback.format_exc()) - raise exception \ No newline at end of file + # add properties to flowfile attributes + attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} + self.logger.info("Successfully transformed Avro content for OCR") + + input_raw_bytes: bytes = flowFile.getContentsAsBytes() + + # read avro record + self.logger.debug("Reading flowfile content as bytes") + input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes) + reader: DataFileReader = DataFileReader(input_byte_buffer, DatumReader()) + + # below is an example of how to handle the avro records + schema: Schema | None = reader.datum_reader.writers_schema + + for record in reader: + # process each record here + pass + + # streams need to be closed + input_byte_buffer.close() + reader.close() + + # Write the records back to a binary avro stream + output_byte_buffer = io.BytesIO() + writer = DataFileWriter(output_byte_buffer, DatumWriter(), schema) + + writer.flush() + writer.close() + output_byte_buffer.seek(0) + + # add properties to flowfile attributes + attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} + 
attributes["sample_property_one"] = str(self.sample_property_one) + attributes["sample_property_two"] = str(self.sample_property_two) + attributes["sample_property_three"] = str(self.sample_property_three) + + return FlowFileTransformResult(relationship="success", + attributes=attributes, + contents=json.dumps(output_contents)) diff --git a/nifi/user_scripts/processors/record_decompress_cerner_blob.py b/nifi/user_scripts/processors/record_decompress_cerner_blob.py new file mode 100644 index 000000000..1b93487d7 --- /dev/null +++ b/nifi/user_scripts/processors/record_decompress_cerner_blob.py @@ -0,0 +1,160 @@ +import base64 +import json +import os +import sys + +try: + from nifi.user_scripts.utils.codecs.cerner_blob import DecompressLzwCernerBlob +except ModuleNotFoundError: + # Fallback for direct script execution when PYTHONPATH does not include repository root. + script_dir = os.path.dirname(os.path.abspath(__file__)) + repo_root = os.path.abspath(os.path.join(script_dir, "..", "..", "..")) + if repo_root not in sys.path: + sys.path.insert(0, repo_root) + from nifi.user_scripts.utils.codecs.cerner_blob import DecompressLzwCernerBlob + +BINARY_FIELD_NAME = "binarydoc" +OUTPUT_TEXT_FIELD_NAME = "text" +DOCUMENT_ID_FIELD_NAME = "id" +INPUT_CHARSET = "utf-8" +OUTPUT_CHARSET = "utf-8" +OUTPUT_MODE = "base64" +BINARY_FIELD_SOURCE_ENCODING = "base64" +BLOB_SEQUENCE_ORDER_FIELD_NAME = "blob_sequence_num" +OPERATION_MODE = "base64" + +for arg in sys.argv: + _arg = arg.split("=", 1) + if _arg[0] == "binary_field_name": + BINARY_FIELD_NAME = _arg[1] + elif _arg[0] == "output_text_field_name": + OUTPUT_TEXT_FIELD_NAME = _arg[1] + elif _arg[0] == "document_id_field_name": + DOCUMENT_ID_FIELD_NAME = _arg[1] + elif _arg[0] == "input_charset": + INPUT_CHARSET = _arg[1] + elif _arg[0] == "output_charset": + OUTPUT_CHARSET = _arg[1] + elif _arg[0] == "output_mode": + OUTPUT_MODE = _arg[1] + elif _arg[0] == "binary_field_source_encoding": + BINARY_FIELD_SOURCE_ENCODING = _arg[1] + elif _arg[0] == "blob_sequence_order_field_name": + BLOB_SEQUENCE_ORDER_FIELD_NAME = _arg[1] + elif _arg[0] == "operation_mode": + OPERATION_MODE = _arg[1] + + +def load_json_records(input_raw_bytes): + try: + return json.loads(input_raw_bytes.decode()) + except (UnicodeDecodeError, json.JSONDecodeError): + try: + return json.loads(input_raw_bytes.decode(INPUT_CHARSET)) + except (UnicodeDecodeError, json.JSONDecodeError): + try: + return json.loads(input_raw_bytes.decode("windows-1252")) + except (UnicodeDecodeError, json.JSONDecodeError) as exc: + raise ValueError( + "Error decoding JSON after trying utf-8, " + + INPUT_CHARSET + + ", and windows-1252" + ) from exc + + +def decode_blob_part(value, blob_part): + if BINARY_FIELD_SOURCE_ENCODING == "base64": + if not isinstance(value, str): + raise ValueError( + f"Expected base64 string in {BINARY_FIELD_NAME} for part {blob_part}, got {type(value)}" + ) + + try: + return base64.b64decode(value, validate=True) + except Exception as exc: + raise ValueError(f"Error decoding base64 blob part {blob_part}: {exc}") from exc + + if isinstance(value, str): + return value.encode(INPUT_CHARSET) + + if isinstance(value, list) and all(isinstance(v, int) and 0 <= v <= 255 for v in value): + return bytes(value) + + if isinstance(value, (bytes, bytearray)): + return bytes(value) + + raise ValueError( + f"Expected bytes-like data in {BINARY_FIELD_NAME} for part {blob_part}, got {type(value)}" + ) + + +records = load_json_records(sys.stdin.buffer.read()) + +if isinstance(records, dict): + records = 
[records] + +if not records: + raise ValueError("No records found in JSON input") + +# keep the same sanity check as the extension: one flowfile should carry one document +doc_ids = {str(record.get(DOCUMENT_ID_FIELD_NAME, "")) for record in records} +if len(doc_ids) > 1: + raise ValueError(f"Multiple document IDs in one FlowFile: {list(doc_ids)}") + +concatenated_blob_sequence_order = {} +output_merged_record = {} + +have_any_sequence = any(BLOB_SEQUENCE_ORDER_FIELD_NAME in record for record in records) +have_any_no_sequence = any(BLOB_SEQUENCE_ORDER_FIELD_NAME not in record for record in records) + +if have_any_sequence and have_any_no_sequence: + raise ValueError( + f"Mixed records: some have '{BLOB_SEQUENCE_ORDER_FIELD_NAME}', some don't. " + "Cannot safely reconstruct blob stream." + ) + +for record in records: + if BINARY_FIELD_NAME not in record or record[BINARY_FIELD_NAME] in (None, ""): + raise ValueError(f"Missing '{BINARY_FIELD_NAME}' in a record") + + if have_any_sequence: + sequence_number = int(record[BLOB_SEQUENCE_ORDER_FIELD_NAME]) + if sequence_number in concatenated_blob_sequence_order: + raise ValueError(f"Duplicate {BLOB_SEQUENCE_ORDER_FIELD_NAME}: {sequence_number}") + concatenated_blob_sequence_order[sequence_number] = record[BINARY_FIELD_NAME] + else: + sequence_number = len(concatenated_blob_sequence_order) + concatenated_blob_sequence_order[sequence_number] = record[BINARY_FIELD_NAME] + +# copy all non-binary fields from the first input record +for k, v in records[0].items(): + if k != BINARY_FIELD_NAME and k not in output_merged_record: + output_merged_record[k] = v + +full_compressed_blob = bytearray() +blob_sequence_keys = sorted(concatenated_blob_sequence_order.keys()) + +for i in range(1, len(blob_sequence_keys)): + if blob_sequence_keys[i] != blob_sequence_keys[i - 1] + 1: + raise ValueError( + f"Sequence gap: missing {blob_sequence_keys[i - 1] + 1} " + f"(have {blob_sequence_keys[i - 1]} then {blob_sequence_keys[i]})" + ) + +for blob_part in blob_sequence_keys: + full_compressed_blob.extend( + decode_blob_part(concatenated_blob_sequence_order[blob_part], blob_part) + ) + +decompress_blob = DecompressLzwCernerBlob() +decompress_blob.decompress(full_compressed_blob) +decompressed_blob = bytes(decompress_blob.output_stream) + +if OUTPUT_MODE == "base64": + output_merged_record[BINARY_FIELD_NAME] = base64.b64encode(decompressed_blob).decode(OUTPUT_CHARSET) +elif OUTPUT_MODE == "raw": + output_merged_record[BINARY_FIELD_NAME] = decompressed_blob.decode(OUTPUT_CHARSET, errors="replace") +else: + raise ValueError(f"Unsupported output_mode: {OUTPUT_MODE}") + +sys.stdout.buffer.write(json.dumps([output_merged_record], ensure_ascii=False).encode("utf-8")) diff --git a/nifi/user_scripts/utils/nifi/base_nifi_processor.py b/nifi/user_scripts/utils/nifi/base_nifi_processor.py index f29478cf1..d5915bff9 100644 --- a/nifi/user_scripts/utils/nifi/base_nifi_processor.py +++ b/nifi/user_scripts/utils/nifi/base_nifi_processor.py @@ -219,11 +219,11 @@ def onScheduled(self, context: ProcessContext) -> None: """ pass - def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: + def process(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: """ Process a FlowFile and return a FlowFileTransformResult. - Subclasses must override this method to implement processor logic. + Subclasses should implement this method with processor logic. Args: context: The NiFi ProcessContext for this invocation. 
@@ -233,3 +233,30 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr NotImplementedError: Always, until overridden by a subclass. """ raise NotImplementedError + + def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult: + """ + NiFi entrypoint. Calls process() and builds a failure result on exceptions. + + Args: + context: The NiFi ProcessContext for this invocation. + flowFile: The FlowFile being processed. + """ + self.process_context = context + + try: + self.set_properties(context.getProperties()) + result = self.process(context, flowFile) + if not isinstance(result, FlowFileTransformResult): + raise TypeError( + f"{self.__class__.__name__}.process() must return FlowFileTransformResult, " + f"got {type(result).__name__}" + ) + return result + except Exception as exception: + self.logger.error("Exception during flowfile processing", exc_info=True) + return self.build_failure_result( + flowFile, + exception, + include_flowfile_attributes=True, + ) diff --git a/scripts/smoke_nifi_services.sh b/scripts/tests/smoke_nifi_services.sh similarity index 79% rename from scripts/smoke_nifi_services.sh rename to scripts/tests/smoke_nifi_services.sh index 0d2c79853..5835dbe65 100755 --- a/scripts/smoke_nifi_services.sh +++ b/scripts/tests/smoke_nifi_services.sh @@ -1,9 +1,9 @@ #!/usr/bin/env bash -# Smoke checks for NiFi, NiFi Registry, and the nginx reverse proxy. +# Smoke checks for NiFi and the nginx reverse proxy. set -euo pipefail -ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)" ENV_LOADER="${ROOT_DIR}/deploy/export_env_vars.sh" if [[ -f "$ENV_LOADER" ]]; then @@ -18,7 +18,6 @@ fi HOST="${NIFI_SMOKE_HOST:-localhost}" NIFI_PORT="${NIFI_EXTERNAL_PORT_NGINX:-8443}" -REGISTRY_PORT="${NIFI_REGISTRY_EXTERNAL_PORT_NGINX:-18443}" ALLOWED_CODES=(200 301 302 303 307 308 401 403) @@ -45,5 +44,4 @@ check_url() { } check_url "nifi" "https://${HOST}:${NIFI_PORT}/nifi/" -check_url "nifi-registry" "https://${HOST}:${REGISTRY_PORT}/nifi-registry/" check_url "nifi-nginx" "https://${HOST}:${NIFI_PORT}/" diff --git a/services/nginx/config/nginx.conf.template b/services/nginx/config/nginx.conf.template index 8b13fe656..a046069a9 100644 --- a/services/nginx/config/nginx.conf.template +++ b/services/nginx/config/nginx.conf.template @@ -39,10 +39,6 @@ http { upstream nifi { server nifi:8443; } - - upstream nifi-registry { - server nifi-registry:18443; - } include /etc/nginx/sites-enabled/*.conf; @@ -81,60 +77,6 @@ http { # return 301 https://$host$request_uri; #} - server { - listen 18443 ssl; - server_name nginx.local; - - ssl_certificate /certificates/nifi/nifi.pem; - ssl_certificate_key /certificates/nifi/nifi.key; - ssl_client_certificate /certificates/root/root-ca.pem; - ssl_trusted_certificate /certificates/root/root-ca.pem; - - location / { - proxy_ssl_certificate /certificates/nifi/nifi.pem; - proxy_ssl_certificate_key /certificates/nifi/nifi.key; - proxy_ssl_trusted_certificate /certificates/root/root-ca.pem; - proxy_set_header Host nifi-registry; - proxy_set_header X-Real-IP $remote_addr; - proxy_set_header X-ProxyHost $host; - proxy_set_header X-ProxyPort 18443; - proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; - proxy_set_header X-ProxyScheme $scheme; - - proxy_pass https://nifi-registry; - } - - location ^~ /nifi-registry/ { - proxy_ssl_certificate /certificates/nifi/nifi.pem; - proxy_ssl_certificate_key /certificates/nifi/nifi.key; - 
proxy_ssl_trusted_certificate /certificates/root/root-ca.pem; - proxy_set_header Host nifi-registry; - proxy_set_header X-Real-IP $remote_addr; - proxy_set_header X-ProxyHost $host; - proxy_set_header X-ProxyPort 18443; - proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; - proxy_set_header X-ProxyScheme $scheme; - - proxy_pass https://nifi-registry/nifi-registry/; - } - - location ^~ /nifi-registry-api/ { - proxy_ssl_certificate /certificates/nifi/nifi.pem; - proxy_ssl_certificate_key /certificates/nifi/nifi.key; - proxy_ssl_trusted_certificate /certificates/root/root-ca.pem; - - proxy_set_header Host nifi-registry; - proxy_set_header X-Real-IP $remote_addr; - proxy_set_header X-ProxyHost $host; - proxy_set_header X-ProxyPort 18443; - proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; - proxy_set_header X-ProxyScheme $scheme; - - proxy_pass https://nifi-registry/nifi-registry-api/; - } - - } - server { listen 8443 ssl; server_name nginx.local;