diff --git a/.github/workflows/smoke-nifi-services.yml b/.github/workflows/smoke-nifi-services.yml
index 782a7c8dd..f59664aff 100644
--- a/.github/workflows/smoke-nifi-services.yml
+++ b/.github/workflows/smoke-nifi-services.yml
@@ -33,18 +33,16 @@ jobs:
- name: Start NiFi services (build)
run: |
set -euo pipefail
- source deploy/export_env_vars.sh
- set -euo pipefail
- docker compose -f deploy/services.dev.yml up -d --build nifi nifi-nginx nifi-registry-flow
+ make -C deploy start-nifi-dev-build
- name: Smoke tests
run: |
set -euo pipefail
- echo "Running smoke checks against NiFi, NiFi Registry, and nginx."
+ echo "Running smoke checks against NiFi and nginx."
retries=30
delay=15
for attempt in $(seq 1 $retries); do
- if ./scripts/smoke_nifi_services.sh; then
+ if ./scripts/tests/smoke_nifi_services.sh; then
exit 0
fi
echo "Attempt ${attempt}/${retries} failed. Sleeping ${delay}s..."
@@ -60,7 +58,7 @@ jobs:
source deploy/export_env_vars.sh
set -euo pipefail
docker compose -f deploy/services.dev.yml ps
- docker compose -f deploy/services.dev.yml logs --no-color nifi nifi-nginx nifi-registry-flow
+ docker compose -f deploy/services.dev.yml logs --no-color nifi nifi-nginx
- name: Shutdown stack
if: always()
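
For local debugging, the flow this workflow exercises can be approximated directly from the repository root (a sketch using the make target and script path referenced above):

```bash
# Build and start the NiFi dev stack the same way the workflow does, then run the smoke script once.
make -C deploy start-nifi-dev-build
./scripts/tests/smoke_nifi_services.sh
```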
diff --git a/deploy/Makefile b/deploy/Makefile
index 367f1fe43..e6a57ac95 100644
--- a/deploy/Makefile
+++ b/deploy/Makefile
@@ -25,17 +25,23 @@ load-env:
show-env:
${WITH_ENV} >/dev/null 2>&1; printenv | sort
+
+fix-nifi-registry-perms:
+ $(WITH_ENV) SKIP_EXPORT_ENV=1 ../nifi/fix_nifi_registry_perms.sh $(COMPOSE_FILE)
# start services
start-nifi:
- $(WITH_ENV) docker compose -f services.yml $(DC_START_CMD) nifi nifi-nginx nifi-registry-flow
+ $(WITH_ENV) SKIP_EXPORT_ENV=1 ../nifi/fix_nifi_registry_perms.sh services.yml; \
+ docker compose -f services.yml $(DC_START_CMD) nifi nifi-nginx
start-nifi-dev:
- $(WITH_ENV) docker compose -f services.dev.yml $(DC_START_CMD) nifi nifi-nginx nifi-registry-flow
+ $(WITH_ENV) SKIP_EXPORT_ENV=1 ../nifi/fix_nifi_registry_perms.sh services.dev.yml; \
+ docker compose -f services.dev.yml $(DC_START_CMD) nifi nifi-nginx
start-nifi-dev-build:
- $(WITH_ENV) docker compose -f services.dev.yml up -d --build nifi nifi-nginx nifi-registry-flow
+ $(WITH_ENV) SKIP_EXPORT_ENV=1 ../nifi/fix_nifi_registry_perms.sh services.dev.yml; \
+ docker compose -f services.dev.yml up -d --build nifi nifi-nginx
start-elastic:
$(WITH_ENV) docker compose -f services.yml $(DC_START_CMD) elasticsearch-1 elasticsearch-2 kibana
@@ -101,16 +107,38 @@ start-data-infra: start-nifi start-elastic start-samples
start-all: start-data-infra start-jupyter start-medcat-service start-ocr-services
-.PHONY: start-all start-data-infra start-nifi start-nifi-dev start-nifi-dev-build start-elastic start-samples start-jupyter
+.PHONY: start-all start-data-infra start-nifi start-nifi-dev start-nifi-dev-build start-elastic start-samples start-jupyter fix-nifi-registry-perms
# stop services
#
stop-nifi:
- $(WITH_ENV) docker compose -f services.yml $(DC_STOP_CMD) nifi nifi-nginx nifi-registry-flow
+ $(WITH_ENV) docker compose -f services.yml $(DC_STOP_CMD) nifi nifi-nginx
stop-nifi-dev:
- $(WITH_ENV) docker compose -f services.dev.yml $(DC_STOP_CMD) nifi nifi-nginx nifi-registry-flow
+ $(WITH_ENV) docker compose -f services.dev.yml $(DC_STOP_CMD) nifi nifi-nginx
+
+delete-nifi-containers:
+ $(WITH_ENV) docker compose -f services.yml rm -f -s nifi nifi-nginx
+
+delete-nifi-dev-containers:
+ $(WITH_ENV) docker compose -f services.dev.yml rm -f -s nifi nifi-nginx
+
+delete-nifi-images:
+ $(WITH_ENV) images="$$(docker compose -f services.yml config --images nifi nifi-nginx | sort -u)"; \
+ if [ -n "$$images" ]; then \
+ docker image rm -f $$images; \
+ else \
+ echo "No NiFi images found in services.yml"; \
+ fi
+
+delete-nifi-dev-images:
+ $(WITH_ENV) images="$$(docker compose -f services.dev.yml config --images nifi nifi-nginx | sort -u)"; \
+ if [ -n "$$images" ]; then \
+ docker image rm -f $$images; \
+ else \
+ echo "No NiFi images found in services.dev.yml"; \
+ fi
stop-elastic:
$(WITH_ENV) docker compose -f services.yml $(DC_STOP_CMD) elasticsearch-1 elasticsearch-2 kibana
@@ -173,7 +201,7 @@ stop-data-infra: stop-nifi stop-elastic stop-samples
stop-all: stop-data-infra stop-jupyter stop-medcat-service stop-ocr-services
-.PHONY: stop-data-infra stop-nifi stop-nifi-dev stop-elastic stop-samples stop-jupyter
+.PHONY: stop-data-infra stop-nifi stop-nifi-dev delete-nifi-containers delete-nifi-dev-containers delete-nifi-images delete-nifi-dev-images stop-elastic stop-samples stop-jupyter
# cleanup
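
Illustrative invocations of the new and updated targets from the repository root (a sketch; all targets are defined in the Makefile hunks above):

```bash
make -C deploy fix-nifi-registry-perms      # one-off chown of the NiFi Registry volumes
make -C deploy start-nifi-dev-build         # fix perms, then build and start nifi + nifi-nginx
make -C deploy stop-nifi-dev                # stop the dev NiFi containers
make -C deploy delete-nifi-dev-containers   # remove the dev NiFi containers
make -C deploy delete-nifi-dev-images       # remove the images referenced by services.dev.yml
```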
diff --git a/deploy/elasticsearch.env b/deploy/elasticsearch.env
index 9b3616f55..0c7519e06 100644
--- a/deploy/elasticsearch.env
+++ b/deploy/elasticsearch.env
@@ -7,7 +7,7 @@
ELASTICSEARCH_VERSION=opensearch
# possible values :
-# - elasticsearch : docker.elastic.co/elasticsearch/elasticsearch:8.18.2
+# - elasticsearch : docker.elastic.co/elasticsearch/elasticsearch:8.19.11
# - elasticsearch (custom cogstack image) : cogstacksystems/cogstack-elasticsearch:latest
# - opensearch : opensearchproject/opensearch:3.4.0
# the custom cogstack image is always based on the last image of ES native
@@ -89,6 +89,8 @@ ELASTICSEARCH_SECURITY_DIR=../security/certificates/elastic/
# MEMORY CONFIG
ELASTICSEARCH_JAVA_OPTS="-Xms512m -Xmx512m -Des.failure_store_feature_flag_enabled=true"
+ES_JAVA_OPTS=$ELASTICSEARCH_JAVA_OPTS
+OPENSEARCH_JAVA_OPTS=$ELASTICSEARCH_JAVA_OPTS
ELASTICSEARCH_DOCKER_CPU_MIN=1
ELASTICSEARCH_DOCKER_CPU_MAX=1
@@ -163,7 +165,7 @@ KIBANA_VERSION=opensearch-dashboards
KIBANA_CONFIG_FILE_VERSION=opensearch_dashboards
# possible values:
-# - elasticsearch : docker.elastic.co/kibana/kibana:8.18.2
+# - elasticsearch : docker.elastic.co/kibana/kibana:8.19.11
# - elasticsearch (custom cogstack image) : cogstacksystems/cogstack-kibana:latest
# - opensearch : opensearchproject/opensearch-dashboards:3.4.0
# the custom cogstack image is always based on the last image of ES native
@@ -205,7 +207,7 @@ ELASTICSEARCH_XPACK_SECURITY_REPORTING_ENCRYPTION_KEY="e0Y1gTxHWOopIWMTtpjQsDS6K
######################################################################### METRICBEAT Env vars ##########################################################################
-METRICBEAT_IMAGE="docker.elastic.co/beats/metricbeat:8.18.2"
+METRICBEAT_IMAGE="docker.elastic.co/beats/metricbeat:8.19.11"
METRICBEAT_DOCKER_SHM=512m
METRICBEAT_DOCKER_CPU_MIN=1
@@ -222,7 +224,7 @@ FILEBEAT_STARTUP_COMMAND="-e --strict.perms=false"
FILEBEAT_HOST="https://elasticsearch-1:9200"
-FILEBEAT_IMAGE="docker.elastic.co/beats/filebeat:8.18.2"
+FILEBEAT_IMAGE="docker.elastic.co/beats/filebeat:8.19.11"
FILEBEAT_DOCKER_SHM=512m
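
With the explicit ES_JAVA_OPTS/OPENSEARCH_JAVA_OPTS entries removed from the compose environment (see the services.yml hunk later in this diff), the heap settings now come from this env file via the export script. A quick sanity check, assuming a bash shell at the repository root:

```bash
source deploy/export_env_vars.sh
echo "$ES_JAVA_OPTS"          # expected to mirror ELASTICSEARCH_JAVA_OPTS, e.g. -Xms512m -Xmx512m ...
echo "$OPENSEARCH_JAVA_OPTS"  # same value, used by the OpenSearch images
```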
diff --git a/deploy/export_env_vars.sh b/deploy/export_env_vars.sh
index 2ee8a95cf..c68b4f1b3 100755
--- a/deploy/export_env_vars.sh
+++ b/deploy/export_env_vars.sh
@@ -41,22 +41,6 @@ env_files=(
"$SERVICES_DIR/cogstack-nlp/medcat-service/env/medcat.env"
)
-LINT_SCRIPT="$SCRIPT_DIR/../nifi/user_scripts/utils/lint_env.py"
-
-if [ -e "$LINT_SCRIPT" ]; then
- chmod +x $LINT_SCRIPT
-fi
-
-if [ -x "$LINT_SCRIPT" ]; then
- echo "🔍 Validating env files..."
- if ! python3 "$LINT_SCRIPT" "${env_files[@]}"; then
- echo "❌ Env validation failed. Fix the errors above before continuing."
- exit 1
- fi
-else
- echo "⚠️ Skipping env validation; $LINT_SCRIPT not found or not executable."
-fi
-
for env_file in "${env_files[@]}"; do
if [ -f "$env_file" ]; then
echo "✅ Sourcing $env_file"
diff --git a/deploy/network_settings.env b/deploy/network_settings.env
index ee2357969..5ae69fcbb 100644
--- a/deploy/network_settings.env
+++ b/deploy/network_settings.env
@@ -10,7 +10,6 @@ ELASTICSEARCH_2_HOST_NAME="test-2:0.0.0.0"
ELASTICSEARCH_3_HOST_NAME="test-3:0.0.0.0"
KIBANA_HOST_NAME="test-4:0.0.0.0"
NIFI_HOST_NAME="test-5:0.0.0.0"
-NIFI_REGISTRY_HOST_NAME="test-6:0.0.0.0"
# general network settings
HTTPS_PROXY=""
@@ -18,4 +17,4 @@ HTTP_PROXY=""
NO_PROXY=""
no_proxy=""
http_proxy=""
-https_proxy=""
\ No newline at end of file
+https_proxy=""
diff --git a/deploy/nifi.env b/deploy/nifi.env
index c9d007d7a..ddc5fa713 100644
--- a/deploy/nifi.env
+++ b/deploy/nifi.env
@@ -9,27 +9,20 @@ NIFI_JVM_HEAP_MAX=1g
NIFI_DOCKER_SHM_SIZE=1g
-NIFI_DOCKER_REGISTRY_SHM_SIZE=1g
NIFI_DOCKER_CPU_MIN=1
NIFI_DOCKER_CPU_MAX=1
NIFI_DOCKER_RAM=1g
-NIFI_REGISTRY_DOCKER_CPU_MIN=1
-NIFI_REGISTRY_DOCKER_CPU_MAX=1
-NIFI_REGISTRY_DOCKER_RAM=1g
-
NIFI_DOCKER_LOG_SIZE_PER_FILE="250m"
NIFI_DOCKER_LOG_NUM_FILES=10
##############################################################################################################################
NIFI_VERSION="2.7.2"
-NIFI_REGISTRY_VERSION=$NIFI_VERSION
-# NiFi/NiFi Registry Docker image
+# NiFi Docker image
NIFI_DOCKER_IMAGE="cogstacksystems/cogstack-nifi:latest"
-NIFI_REGISTRY_DOCKER_IMAGE="apache/nifi-registry:${NIFI_REGISTRY_VERSION:-2.7.2}"
##############################################################################################################################
@@ -41,6 +34,9 @@ NIFI_DATA_PATH="../data/"
NIFI_TOOLKIT_VERSION=$NIFI_VERSION
+# Optional: host path for MedCAT models, mounted into the NiFi container at /opt/models
+NIFI_MEDCAT_SERVICE_MODEL_PRODUCTION_PATH="../services/cogstack-nlp/medcat-service/models/"
+
#### Port and network settings
NIFI_WEB_PROXY_CONTEXT_PATH="/nifi"
@@ -92,20 +88,3 @@ NIFI_TRUSTSTORE_TYPE=JKS
# this is from ./security/certificates_nifi.env, NIFI_SUBJ_LINE_CERTIFICATE_CN
NIFI_INITIAL_ADMIN_IDENTITY="cogstack"
-
-##############################################################################################################################
-# NIFI REGISTRY FLOW SECTION
-##############################################################################################################################
-NIFI_REGISTRY_DB_DIR=/opt/nifi-registry/nifi-registry-current/database
-NIFI_REGISTRY_FLOW_PROVIDER=file
-NIFI_REGISTRY_FLOW_STORAGE_DIR=/opt/nifi-registry/nifi-registry-current/flow_storage
-
-NIFI_REGISTRY_FLOW_OUTPUT_PORT=8083
-NIFI_REGISTRY_FLOW_INPUT_PORT=18443
-
-NIFI_REGISTRY_EXTERNAL_PORT_NGINX=18443
-NIFI_REGISTRY_INTERNAL_PORT_NGINX=18443
-
-NIFI_REGISTRY_KEYSTORE_PATH="/security/certificates/nifi/nifi-keystore.jks"
-NIFI_REGISTRY_TRUSTSTORE_PATH="/security/certificates/nifi/nifi-truststore.jks"
-
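
The new NIFI_MEDCAT_SERVICE_MODEL_PRODUCTION_PATH variable feeds the /opt/models mount in services.yml, and the host path is resolved relative to the deploy/ directory. To point NiFi at a different model directory (hypothetical path below), change the value in deploy/nifi.env and restart NiFi:

```bash
# deploy/nifi.env (hypothetical override, resolved relative to deploy/):
#   NIFI_MEDCAT_SERVICE_MODEL_PRODUCTION_PATH="../my-medcat-models/"
make -C deploy stop-nifi
make -C deploy start-nifi
```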
diff --git a/deploy/services.dev.yml b/deploy/services.dev.yml
index e1970b449..d89b9169b 100644
--- a/deploy/services.dev.yml
+++ b/deploy/services.dev.yml
@@ -41,7 +41,6 @@ x-common-hosts: &common-hosts
- ${ELASTICSEARCH_3_HOST_NAME:-test-3:0.0.0.0}
- ${KIBANA_HOST_NAME:-test-4:0.0.0.0}
- ${NIFI_HOST_NAME:-test-5:0.0.0.0}
- - ${NIFI_REGISTRY_HOST_NAME:-test-6:0.0.0.0}
x-common-ulimits: &common-ulimits
ulimits:
@@ -63,11 +62,11 @@ x-nifi-common: &nifi-common
x-nifi-volumes: &nifi-volumes
# Drivers
- - ../nifi/drivers:/opt/nifi/drivers
+ - ../nifi/drivers:/opt/nifi/drivers:ro
# User overrides bundled in the image
- ../nifi/user_scripts:/opt/nifi/user_scripts:rw
- - ../nifi/user_schemas:/opt/nifi/user_schemas:rw
+ - ../nifi/user_schemas:/opt/nifi/user_schemas:ro
# Python processors (NiFi 2.x)
- ../nifi/user_python_extensions:/opt/nifi/nifi-current/python_extensions:rw
@@ -84,9 +83,6 @@ x-nifi-volumes: &nifi-volumes
# Ingest data directory
- ./${NIFI_DATA_PATH:-../data/}:/data/:rw
- # DB schemas
- - ../services/cogstack-db/:/opt/cogstack-db/:rw
-
# MedCAT models
- ./${RES_MEDCAT_SERVICE_MODEL_PRODUCTION_PATH:-../services/cogstack-nlp/medcat-service/models/}:/opt/models:rw
@@ -101,19 +97,6 @@ x-nifi-volumes: &nifi-volumes
# Flowfile error output
- nifi-vol-errors:/opt/nifi/pipeline/flowfile-errors
-x-nifi-registry-volumes: &nifi-registry-volumes
- # Registry configuration
- - ../nifi/nifi-registry/:/opt/nifi-registry/nifi-registry-current/conf/:rw
-
- # Security certificates
- - ../security:/security:ro
-
- # Registry persistence
- - nifi-registry-vol-database:/opt/nifi-registry/nifi-registry-current/database
- - nifi-registry-vol-flow-storage:/opt/nifi-registry/nifi-registry-current/flow_storage
- - nifi-registry-vol-work:/opt/nifi-registry/nifi-registry-current/work
- - nifi-registry-vol-logs:/opt/nifi-registry/nifi-registry-current/logs
-
#---------------------------------------------------------------------------#
# Used services #
#---------------------------------------------------------------------------#
@@ -127,21 +110,10 @@ services:
build:
context: ..
dockerfile: nifi/Dockerfile
- args:
- HTTP_PROXY: $HTTP_PROXY
- HTTPS_PROXY: $HTTPS_PROXY
- no_proxy: $no_proxy
container_name: cogstack-nifi
hostname: nifi
shm_size: ${NIFI_DOCKER_SHM_SIZE:-"1g"}
environment:
- - USER_ID=${NIFI_UID:-1000}
- - GROUP_ID=${NIFI_GID:-1000}
- - NIFI_WEB_PROXY_HOST=${NIFI_WEB_PROXY_HOST:-"localhost:8443"}
- - NIFI_WEB_PROXY_CONTEXT_PATH=${NIFI_WEB_PROXY_CONTEXT_PATH:-"/nifi"}
- - NIFI_INTERNAL_PORT=${NIFI_INTERNAL_PORT:-8443}
- - NIFI_OUTPUT_PORT=${NIFI_OUTPUT_PORT:-8082}
- - NIFI_INPUT_SOCKET_PORT=${NIFI_INPUT_SOCKET_PORT:-10000}
- PYTHONPATH=${NIFI_PYTHONPATH:-/opt/nifi/nifi-current/python/framework}
- JVM_OPTS="${NIFI_JVM_OPTS:--XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:+ParallelRefProcEnabled -Djava.security.egd=file:/dev/./urandom}"
deploy:
@@ -163,52 +135,6 @@ services:
- "${NIFI_OUTPUT_PORT:-8082}:${NIFI_INTERNAL_PORT:-8443}"
- "${NIFI_INPUT_SOCKET_PORT:-10000}"
logging: *nifi-logging-common
-
- nifi-registry-flow:
- <<: *nifi-common
- image: ${NIFI_REGISTRY_DOCKER_IMAGE:-apache/nifi-registry:${NIFI_REGISTRY_VERSION:-latest}}
- hostname: nifi-registry
- container_name: cogstack-nifi-registry-flow
- shm_size: ${NIFI_DOCKER_REGISTRY_SHM_SIZE:-1g}
- user: root
- environment:
- - http_proxy=$HTTP_PROXY
- - https_proxy=$HTTPS_PROXY
- - no_proxy=$no_proxy
- - USER_ID=${NIFI_UID:-1000}
- - GROUP_ID=${NIFI_GID:-1000}
- - KEYSTORE_PATH=${NIFI_REGISTRY_KEYSTORE_PATH:-/security/certificates/nifi/nifi-keystore.jks}
- - KEYSTORE_TYPE=${NIFI_KEYSTORE_TYPE:-jks}
- - KEYSTORE_PASSWORD=${NIFI_KEYSTORE_PASSWORD:-"cogstackNifi"}
- - TRUSTSTORE_PASSWORD=${NIFI_TRUSTSTORE_PASSWORD:-"cogstackNifi"}
- - TRUSTSTORE_PATH=${NIFI_REGISTRY_TRUSTSTORE_PATH:-/security/certificates/nifi/nifi-truststore.jks}
-
- - TRUSTSTORE_TYPE=${NIFI_TRUSTSTORE_TYPE:-jks}
- - INITIAL_ADMIN_IDENTITY=${NIFI_INITIAL_ADMIN_IDENTITY:-"cogstack"}
- - AUTH=${NIFI_AUTH:-"tls"}
- - NIFI_REGISTRY_DB_DIR=${NIFI_REGISTRY_DB_DIR:-/opt/nifi-registry/nifi-registry-current/database}
- #- NIFI_REGISTRY_FLOW_PROVIDER=${NIFI_REGISTRY_FLOW_PROVIDER:-file}
- - NIFI_REGISTRY_FLOW_STORAGE_DIR=${NIFI_REGISTRY_FLOW_STORAGE_DIR:-/opt/nifi-registry/nifi-registry-current/flow_storage}
- deploy:
- resources:
- limits:
- cpus: "${NIFI_REGISTRY_DOCKER_CPU_MAX}"
- memory: "${NIFI_REGISTRY_DOCKER_RAM}"
- reservations:
- cpus: "${NIFI_REGISTRY_DOCKER_CPU_MIN}"
- memory: "${NIFI_REGISTRY_DOCKER_RAM}"
- volumes: *nifi-registry-volumes
- extra_hosts: *common-hosts
- tty: true
- ports:
- - "${NIFI_REGISTRY_FLOW_OUTPUT_PORT:-8083}:${NIFI_REGISTRY_FLOW_INPUT_PORT:-18443}"
-
- entrypoint: bash -c "chown -R nifi:nifi /opt/nifi-registry/nifi-registry-current/database && \
- chown -R nifi:nifi /opt/nifi-registry/nifi-registry-current/flow_storage && \
- chown -R nifi:nifi /opt/nifi-registry/nifi-registry-current/work && \
- chown -R nifi:nifi /opt/nifi-registry/nifi-registry-current/logs && \
- bash /opt/nifi-registry/scripts/start.sh"
- logging: *nifi-logging-common
nifi-nginx:
build:
@@ -232,7 +158,6 @@ services:
- ../security/certificates:/certificates:ro
ports:
- "${NIFI_EXTERNAL_PORT_NGINX:-8443}:${NIFI_INTERNAL_PORT_NGINX:-8443}"
- - "${NIFI_REGISTRY_EXTERNAL_PORT_NGINX:-18443}:${NIFI_REGISTRY_INTERNAL_PORT_NGINX:-18443}"
networks:
- cognet
extra_hosts: *common-hosts
@@ -264,15 +189,6 @@ volumes:
nifi-vol-errors:
driver: local
- nifi-registry-vol-database:
- driver: local
- nifi-registry-vol-flow-storage:
- driver: local
- nifi-registry-vol-work:
- driver: local
- nifi-registry-vol-logs:
- driver: local
-
#---------------------------------------------------------------------------#
# Docker networks. #
#---------------------------------------------------------------------------#
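
After the registry removal, a quick way to confirm the dev compose file no longer defines the service (run from the repository root with the env loaded):

```bash
source deploy/export_env_vars.sh
docker compose -f deploy/services.dev.yml config --services   # nifi and nifi-nginx should be listed; nifi-registry-flow should be gone
```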
diff --git a/deploy/services.yml b/deploy/services.yml
index a0901553c..d0b16037f 100644
--- a/deploy/services.yml
+++ b/deploy/services.yml
@@ -52,7 +52,6 @@ x-common-hosts: &common-hosts
- ${ELASTICSEARCH_3_HOST_NAME:-test-3:0.0.0.0}
- ${KIBANA_HOST_NAME:-test-4:0.0.0.0}
- ${NIFI_HOST_NAME:-test-5:0.0.0.0}
- - ${NIFI_REGISTRY_HOST_NAME:-test-6:0.0.0.0}
x-common-ulimits: &common-ulimits
ulimits:
@@ -74,11 +73,11 @@ x-nifi-common: &nifi-common
x-nifi-volumes: &nifi-volumes
# Drivers
- - ../nifi/drivers:/opt/nifi/drivers
+ - ../nifi/drivers:/opt/nifi/drivers:ro
# User overrides bundled in the image
- ../nifi/user_scripts:/opt/nifi/user_scripts:rw
- - ../nifi/user_schemas:/opt/nifi/user_schemas:rw
+ - ../nifi/user_schemas:/opt/nifi/user_schemas:ro
# Python processors (NiFi 2.x)
- ../nifi/user_python_extensions:/opt/nifi/nifi-current/python_extensions:rw
@@ -95,11 +94,8 @@ x-nifi-volumes: &nifi-volumes
# Ingest data directory
- ./${NIFI_DATA_PATH:-../data/}:/data/:rw
- # DB schemas
- - ../services/cogstack-db/:/opt/cogstack-db/:rw
-
# MedCAT models
- - ./${RES_MEDCAT_SERVICE_MODEL_PRODUCTION_PATH:-../services/cogstack-nlp/medcat-service/models/}:/opt/models:rw
+ - ./${NIFI_MEDCAT_SERVICE_MODEL_PRODUCTION_PATH:-../services/cogstack-nlp/medcat-service/models/}:/opt/models:rw
# NiFi repositories/state
- nifi-vol-logs:/opt/nifi/nifi-current/logs
@@ -112,19 +108,6 @@ x-nifi-volumes: &nifi-volumes
# Flowfile error output
- nifi-vol-errors:/opt/nifi/pipeline/flowfile-errors
-x-nifi-registry-volumes: &nifi-registry-volumes
- # Registry configuration
- - ../nifi/nifi-registry/:/opt/nifi-registry/nifi-registry-current/conf/:rw
-
- # Security certificates
- - ../security:/security:ro
-
- # Registry persistence
- - nifi-registry-vol-database:/opt/nifi-registry/nifi-registry-current/database
- - nifi-registry-vol-flow-storage:/opt/nifi-registry/nifi-registry-current/flow_storage
- - nifi-registry-vol-work:/opt/nifi-registry/nifi-registry-current/work
- - nifi-registry-vol-logs:/opt/nifi-registry/nifi-registry-current/logs
-
x-db-common: &db-common
<<: *common-ulimits
shm_size: ${DATABASE_DOCKER_SHM_SIZE:-"1g"}
@@ -145,7 +128,6 @@ x-es-common-volumes: &es-common-volumes
- ../services/elasticsearch/config/log4j2_${ELASTICSEARCH_VERSION:-opensearch}.properties:/usr/share/${ELASTICSEARCH_VERSION:-opensearch}/config/log4j2.properties:ro
# Shared root CA + admin certs
- ../security/certificates/elastic/${ELASTICSEARCH_VERSION:-opensearch}/elastic-stack-ca.crt.pem:/usr/share/${ELASTICSEARCH_VERSION:-opensearch}/config/root-ca.crt:ro
- - ../security/certificates/elastic/${ELASTICSEARCH_VERSION:-opensearch}/elastic-stack-ca.key.pem:/usr/share/${ELASTICSEARCH_VERSION:-opensearch}/config/root-ca.key:ro
# OPENSEARCH specific (always mounted even if unused)
- ../security/certificates/elastic/opensearch/admin.crt:/usr/share/${ELASTICSEARCH_VERSION:-opensearch}/config/admin.crt:ro
- ../security/certificates/elastic/opensearch/admin.key.pem:/usr/share/${ELASTICSEARCH_VERSION:-opensearch}/config/admin.key.pem:ro
@@ -171,9 +153,6 @@ x-es-common: &es-common
networks:
- cognet
extra_hosts: *common-hosts
- environment:
- ES_JAVA_OPTS: ${ELASTICSEARCH_JAVA_OPTS:--Xms2048m -Xmx2048m -Des.failure_store_feature_flag_enabled=true}
- OPENSEARCH_JAVA_OPTS: ${ELASTICSEARCH_JAVA_OPTS:--Xms2048m -Xmx2048m -Des.failure_store_feature_flag_enabled=true}
logging: *es-logging-common
deploy:
resources:
@@ -204,7 +183,6 @@ x-metricbeat-common: &metricbeat-common
volumes:
- ../services/metricbeat/metricbeat.yml:/usr/share/metricbeat/metricbeat.yml:ro
- ../security/certificates/elastic/elasticsearch/elastic-stack-ca.crt.pem:/usr/share/metricbeat/root-ca.crt:ro
- - ../security/certificates/elastic/elasticsearch/elastic-stack-ca.key.pem:/usr/share/metricbeat/root-ca.key:ro
networks:
- cognet
extra_hosts: *common-hosts
@@ -219,11 +197,6 @@ x-filebeat-common: &filebeat-common
env_file:
- ./elasticsearch.env
- ../security/env/users_elasticsearch.env
- environment:
- - ELASTICSEARCH_HOSTS=${ELASTICSEARCH_HOSTS:-["https://elasticsearch-1:9200","https://elasticsearch-2:9200"]}
- - FILEBEAT_USER=${FILEBEAT_USER:-elastic}
- - FILEBEAT_PASSWORD=${FILEBEAT_PASSWORD:-kibanaserver}
- - KIBANA_HOST=${KIBANA_HOST:-"https://kibana:5601"}
deploy:
resources:
limits:
@@ -233,9 +206,8 @@ x-filebeat-common: &filebeat-common
cpus: "${FILEBEAT_DOCKER_CPU_MIN}"
memory: "${FILEBEAT_DOCKER_RAM}"
volumes:
- - ../services/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml:rw
+ - ../services/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml:ro
- ../security/certificates/elastic/elasticsearch/elastic-stack-ca.crt.pem:/etc/pki/root/root-ca.crt:ro
- - ../security/certificates/elastic/elasticsearch/elastic-stack-ca.key.pem:/etc/pki/root/root-ca.key:ro
networks:
- cognet
extra_hosts: *common-hosts
@@ -251,7 +223,7 @@ services:
#---------------------------------------------------------------------------#
samples-db:
<<: *db-common
- image: postgres:17.7-alpine
+ image: postgres:18.1-trixie
container_name: cogstack-samples-db
platform: linux/amd64
environment:
@@ -264,7 +236,7 @@ services:
- ../services/pgsamples/schemas:/data/schemas:rw
- ../services/pgsamples/init_db.sh:/docker-entrypoint-initdb.d/init_db.sh:ro
# data persistence
- - samples-vol:/var/lib/postgresql/data
+ - samples-vol:/var/lib/postgresql
command: postgres -c "max_connections=${POSTGRES_DB_MAX_CONNECTIONS:-100}"
ports:
- 5554:5432
@@ -278,7 +250,7 @@ services:
#---------------------------------------------------------------------------#
cogstack-databank-db:
<<: *db-common
- image: postgres:17.7-alpine
+ image: postgres:18.1-trixie
container_name: cogstack-production-databank-db
platform: linux/amd64
environment:
@@ -290,7 +262,7 @@ services:
- ../services/cogstack-db/pgsql/schemas:/data/:ro
- ../services/cogstack-db/pgsql/init_db.sh:/docker-entrypoint-initdb.d/init_db.sh:ro
# data persistence
- - databank-vol:/var/lib/postgresql/data
+ - databank-vol:/var/lib/postgresql
command: postgres -c "max_connections=${POSTGRES_DB_MAX_CONNECTIONS:-100}"
ports:
- 5558:5432
@@ -298,15 +270,13 @@ services:
- 5432
networks:
- cognet
-
+
cogstack-databank-db-mssql:
<<: *db-common
image: mcr.microsoft.com/mssql/server:2019-latest
container_name: cogstack-production-databank-db-mssql
environment:
- ACCEPT_EULA=y
- - MSSQL_SA_USER=${MSSQL_SA_USER:-sa}
- - MSSQL_SA_PASSWORD=${MSSQL_SA_PASSWORD:-admin!COGSTACK2022}
volumes:
# mapping postgres data dump and initialization
- ../services/cogstack-db/mssql/schemas:/data/:ro
@@ -481,7 +451,6 @@ services:
# Security certificates, general
- ../security/certificates/elastic/${ELASTICSEARCH_VERSION:-opensearch}/elastic-stack-ca.crt.pem:/usr/share/${KIBANA_VERSION:-opensearch-dashboards}/config/root-ca.crt:ro
- - ../security/certificates/elastic/${ELASTICSEARCH_VERSION:-opensearch}/elastic-stack-ca.key.pem:/usr/share/${KIBANA_VERSION:-opensearch-dashboards}/config/root-ca.key:ro
- ../security/certificates/elastic/${ELASTICSEARCH_VERSION:-opensearch}/elastic-stack-ca.p12:/usr/share/${KIBANA_VERSION:-opensearch-dashboards}/config/root-ca.p12:ro
- ../security/certificates/elastic/${ELASTICSEARCH_VERSION:-opensearch}/elasticsearch/${ES_INSTANCE_NAME_1:-elasticsearch-1}/${ES_INSTANCE_NAME_1:-elasticsearch-1}.crt:/usr/share/${KIBANA_VERSION:-opensearch-dashboards}/config/esnode1.crt:ro
@@ -509,13 +478,6 @@ services:
hostname: nifi
shm_size: ${NIFI_DOCKER_SHM_SIZE:-"1g"}
environment:
- - USER_ID=${NIFI_UID:-1000}
- - GROUP_ID=${NIFI_GID:-1000}
- - NIFI_WEB_PROXY_HOST=${NIFI_WEB_PROXY_HOST:-"localhost:8443"}
- - NIFI_WEB_PROXY_CONTEXT_PATH=${NIFI_WEB_PROXY_CONTEXT_PATH:-"/nifi"}
- - NIFI_INTERNAL_PORT=${NIFI_INTERNAL_PORT:-8443}
- - NIFI_OUTPUT_PORT=${NIFI_OUTPUT_PORT:-8082}
- - NIFI_INPUT_SOCKET_PORT=${NIFI_INPUT_SOCKET_PORT:-10000}
- PYTHONPATH=${NIFI_PYTHONPATH:-/opt/nifi/nifi-current/python/framework}
- JVM_OPTS="${NIFI_JVM_OPTS:--XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:+ParallelRefProcEnabled -Djava.security.egd=file:/dev/./urandom}"
deploy:
@@ -537,53 +499,7 @@ services:
- "${NIFI_OUTPUT_PORT:-8082}:${NIFI_INTERNAL_PORT:-8443}"
- "${NIFI_INPUT_SOCKET_PORT:-10000}"
logging: *nifi-logging-common
-
- nifi-registry-flow:
- <<: *nifi-common
- image: ${NIFI_REGISTRY_DOCKER_IMAGE:-apache/nifi-registry:${NIFI_REGISTRY_VERSION:-latest}}
- hostname: nifi-registry
- container_name: cogstack-nifi-registry-flow
- shm_size: ${NIFI_DOCKER_REGISTRY_SHM_SIZE:-1g}
- user: root
- environment:
- - http_proxy=$HTTP_PROXY
- - https_proxy=$HTTPS_PROXY
- - no_proxy=$no_proxy
- - USER_ID=${NIFI_UID:-1000}
- - GROUP_ID=${NIFI_GID:-1000}
- - KEYSTORE_PATH=${NIFI_REGISTRY_KEYSTORE_PATH:-/security/certificates/nifi/nifi-keystore.jks}
- - KEYSTORE_TYPE=${NIFI_KEYSTORE_TYPE:-jks}
- - KEYSTORE_PASSWORD=${NIFI_KEYSTORE_PASSWORD:-"cogstackNifi"}
- - TRUSTSTORE_PASSWORD=${NIFI_TRUSTSTORE_PASSWORD:-"cogstackNifi"}
- - TRUSTSTORE_PATH=${NIFI_REGISTRY_TRUSTSTORE_PATH:-/security/certificates/nifi/nifi-truststore.jks}
-
- - TRUSTSTORE_TYPE=${NIFI_TRUSTSTORE_TYPE:-jks}
- - INITIAL_ADMIN_IDENTITY=${NIFI_INITIAL_ADMIN_IDENTITY:-"cogstack"}
- - AUTH=${NIFI_AUTH:-"tls"}
- - NIFI_REGISTRY_DB_DIR=${NIFI_REGISTRY_DB_DIR:-/opt/nifi-registry/nifi-registry-current/database}
- #- NIFI_REGISTRY_FLOW_PROVIDER=${NIFI_REGISTRY_FLOW_PROVIDER:-file}
- - NIFI_REGISTRY_FLOW_STORAGE_DIR=${NIFI_REGISTRY_FLOW_STORAGE_DIR:-/opt/nifi-registry/nifi-registry-current/flow_storage}
- deploy:
- resources:
- limits:
- cpus: "${NIFI_REGISTRY_DOCKER_CPU_MAX}"
- memory: "${NIFI_REGISTRY_DOCKER_RAM}"
- reservations:
- cpus: "${NIFI_REGISTRY_DOCKER_CPU_MIN}"
- memory: "${NIFI_REGISTRY_DOCKER_RAM}"
- volumes: *nifi-registry-volumes
- extra_hosts: *common-hosts
- tty: true
- ports:
- - "${NIFI_REGISTRY_FLOW_OUTPUT_PORT:-8083}:${NIFI_REGISTRY_FLOW_INPUT_PORT:-18443}"
-
- entrypoint: bash -c "chown -R nifi:nifi /opt/nifi-registry/nifi-registry-current/database && \
- chown -R nifi:nifi /opt/nifi-registry/nifi-registry-current/flow_storage && \
- chown -R nifi:nifi /opt/nifi-registry/nifi-registry-current/work && \
- chown -R nifi:nifi /opt/nifi-registry/nifi-registry-current/logs && \
- bash /opt/nifi-registry/scripts/start.sh"
- logging: *nifi-logging-common
-
+
nifi-nginx:
image: cogstacksystems/nifi-nginx:latest
container_name: cogstack-nifi-nginx
@@ -604,7 +520,6 @@ services:
- ../security/certificates:/certificates:ro
ports:
- "${NIFI_EXTERNAL_PORT_NGINX:-8443}:${NIFI_INTERNAL_PORT_NGINX:-8443}"
- - "${NIFI_REGISTRY_EXTERNAL_PORT_NGINX:-18443}:${NIFI_REGISTRY_INTERNAL_PORT_NGINX:-18443}"
networks:
- cognet
extra_hosts: *common-hosts
@@ -638,10 +553,6 @@ services:
image: gitea/gitea:1.23-rootless
shm_size: ${GITEA_DOCKER_SHM_SIZE:-"1g"}
restart: always
- environment:
- - http_proxy=$HTTP_PROXY
- - https_proxy=$HTTPS_PROXY
- - no_proxy=$no_proxy
deploy:
resources:
limits:
@@ -724,15 +635,6 @@ volumes:
nifi-vol-errors:
driver: local
- nifi-registry-vol-database:
- driver: local
- nifi-registry-vol-flow-storage:
- driver: local
- nifi-registry-vol-work:
- driver: local
- nifi-registry-vol-logs:
- driver: local
-
# Gitea
gitea-lib-vol:
driver: local
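
The samples and databank databases now mount their volumes at /var/lib/postgresql rather than /var/lib/postgresql/data, in line with the volume layout the postgres:18 images appear to expect. A quick check of where the data actually lands once a container is up (container name taken from the service definition above):

```bash
docker exec cogstack-samples-db printenv PGDATA        # data directory used by the postgres:18 image
docker exec cogstack-samples-db ls /var/lib/postgresql # should contain the versioned data directory
```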
diff --git a/docs/deploy/deployment.md b/docs/deploy/deployment.md
index 1a0bd7440..65b5c5347 100644
--- a/docs/deploy/deployment.md
+++ b/docs/deploy/deployment.md
@@ -17,6 +17,14 @@ Make sure you have read the [Prerequisites](./main.md) section before proceeding
These variables configure NiFi, Elasticsearch/OpenSearch, Kibana, Jupyter, Metricbeat, the sample DB, etc.
+> **Important:** If you run `docker compose` directly (instead of the `make` targets), first load the environment variables with:
+>
+> ```bash
+> source ./deploy/export_env_vars.sh
+> ```
+>
+> The Makefile targets already do this for you.
+
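
As a companion to the note added above, both entry points side by side (a sketch; the Makefile path sources the env for you):

```bash
# Either load the env vars and run docker compose yourself...
source ./deploy/export_env_vars.sh
docker compose -f deploy/services.dev.yml up -d nifi nifi-nginx
# ...or let the Makefile target handle it:
make -C deploy start-nifi-dev
```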
## 🧩 Modular service design (important)
This repository follows a **modular deployment model**:
diff --git a/docs/nifi/main.md b/docs/nifi/main.md
index 82d899b85..9c0b8b90b 100644
--- a/docs/nifi/main.md
+++ b/docs/nifi/main.md
@@ -140,6 +140,16 @@ You should check if the env vars have been set after running the script:
echo $NIFI_GID
```
+### NiFi Registry permissions helper
+
+If NiFi Registry fails to start due to permission issues on its persistent volumes, run the helper script once to fix ownership:
+
+```bash
+./nifi/fix_nifi_registry_perms.sh
+```
+
+This script runs the registry container as root only long enough to `chown` the registry `database`, `flow_storage`, `work`, and `logs` directories, then exits. Subsequent starts can run as the default non-root user.
+
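
The helper also accepts an optional compose file argument; bare services*.yml names are resolved under deploy/ (see the new script added later in this diff). For example:

```bash
./nifi/fix_nifi_registry_perms.sh                    # defaults to deploy/services.yml
./nifi/fix_nifi_registry_perms.sh services.dev.yml   # resolved to deploy/services.dev.yml
```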
If the above command prints some numbers then it means that the `export_env_vars.sh` script worked. Otherwise, if you don't see anything, or just blank lines, then you need to execute the following:
```bash
@@ -172,7 +182,7 @@ Then execute the `recreate_nifi_docker_image.sh` script located in the `./nifi`
bash recreate_nifi_docker_image.sh
```
-Remember that the above export script and/or command are only visible in the current shell, so every time you restart your shell terminal you must execute the `./deploy/export_env_vars.sh` so that the variables will be visible by docker at runtime, because it uses the GID/UID in the `services.yml` file , specifying in the service definition `user: "${USER_ID:-${NIFI_UID:-1000}}:${GROUP_ID:-${NIFI_GID:-1000}}"`.
+Remember that the variables exported by the above script and/or commands are only visible in the current shell, so every time you restart your terminal you must `source ./deploy/export_env_vars.sh` again so that the variables are visible to Docker at runtime. If you use the `deploy/Makefile` targets, they handle this for you.
### `{bootstrap.conf}`
diff --git a/nifi/fix_nifi_registry_perms.sh b/nifi/fix_nifi_registry_perms.sh
new file mode 100755
index 000000000..7e7e8c27c
--- /dev/null
+++ b/nifi/fix_nifi_registry_perms.sh
@@ -0,0 +1,32 @@
+#!/usr/bin/env bash
+
+set -euo pipefail
+
+# Support being run from any directory.
+SCRIPT_SOURCE="${BASH_SOURCE[0]-$0}"
+SCRIPT_DIR="$(cd "$(dirname "$SCRIPT_SOURCE")" && pwd)"
+REPO_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
+
+COMPOSE_FILE="${1:-services.yml}"
+if [[ "$COMPOSE_FILE" != /* ]]; then
+ if [[ "$COMPOSE_FILE" == services*.yml ]]; then
+ COMPOSE_FILE="deploy/$COMPOSE_FILE"
+ fi
+ COMPOSE_PATH="$REPO_ROOT/$COMPOSE_FILE"
+else
+ COMPOSE_PATH="$COMPOSE_FILE"
+fi
+
+if [ ! -f "$COMPOSE_PATH" ]; then
+ echo "Compose file not found: $COMPOSE_PATH" >&2
+ exit 1
+fi
+
+if [ "${SKIP_EXPORT_ENV:-}" != "1" ]; then
+ set -a
+ source "$REPO_ROOT/deploy/export_env_vars.sh"
+ set +a
+fi
+
+docker compose -f "$COMPOSE_PATH" run --rm --no-deps --user root --entrypoint bash -T nifi-registry-flow \
+ -c 'chown -R nifi:nifi /opt/nifi-registry/nifi-registry-current/{database,flow_storage,work,logs}'
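
A usage sketch for the script above: when the caller has already sourced the env, as the Makefile targets do, SKIP_EXPORT_ENV=1 skips sourcing it a second time:

```bash
source deploy/export_env_vars.sh
SKIP_EXPORT_ENV=1 ./nifi/fix_nifi_registry_perms.sh services.dev.yml
```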
diff --git a/nifi/nifi-registry/authorizations.xml b/nifi/nifi-registry/authorizations.xml
deleted file mode 100644
index 34a92ca3b..000000000
--- a/nifi/nifi-registry/authorizations.xml
+++ /dev/null
@@ -1,284 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/nifi/nifi-registry/authorizers.xml b/nifi/nifi-registry/authorizers.xml
deleted file mode 100644
index 31555617f..000000000
--- a/nifi/nifi-registry/authorizers.xml
+++ /dev/null
@@ -1,334 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- file-user-group-provider
- org.apache.nifi.registry.security.authorization.file.FileUserGroupProvider
- ./conf/users.xml
-
- cogstack
- C=UK, ST=London, L=UK, O=cogstack, OU=cogstack, CN=cogstack
- admin
-
-
-
- file-access-policy-provider
- org.apache.nifi.registry.security.authorization.file.FileAccessPolicyProvider
- file-user-group-provider
- ./conf/authorizations.xml
- cogstack
- cogstack
- C=UK, ST=London, L=UK, O=cogstack, OU=cogstack, CN=cogstack
- admin
-
-
-
- managed-authorizer
- org.apache.nifi.registry.security.authorization.StandardManagedAuthorizer
- file-access-policy-provider
-
-
-
diff --git a/nifi/nifi-registry/bootstrap-aws.conf b/nifi/nifi-registry/bootstrap-aws.conf
deleted file mode 100644
index f624dec9b..000000000
--- a/nifi/nifi-registry/bootstrap-aws.conf
+++ /dev/null
@@ -1,27 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# AWS KMS Key ID is required to be configured for AWS KMS Sensitive Property Provider
-aws.kms.key.id=
-
-# NiFi uses the following properties when authentication to AWS when all values are provided.
-# NiFi uses the default AWS credentials provider chain when one or more or the following properties are blank
-# AWS SDK documentation describes the default credential retrieval order:
-# https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html#credentials-chain
-aws.access.key.id=
-aws.secret.access.key=
-aws.region=
diff --git a/nifi/nifi-registry/bootstrap-azure.conf b/nifi/nifi-registry/bootstrap-azure.conf
deleted file mode 100644
index 49e318eaa..000000000
--- a/nifi/nifi-registry/bootstrap-azure.conf
+++ /dev/null
@@ -1,24 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Key Identifier for Azure Key Vault Key Sensitive Property Provider
-azure.keyvault.key.id=
-# Encryption Algorithm for Azure Key Vault Key Sensitive Property Provider
-azure.keyvault.encryption.algorithm=
-
-# Vault URI for Azure Key Vault Secret Sensitive Property Provider
-azure.keyvault.uri=
diff --git a/nifi/nifi-registry/bootstrap-gcp.conf b/nifi/nifi-registry/bootstrap-gcp.conf
deleted file mode 100644
index 440dad236..000000000
--- a/nifi/nifi-registry/bootstrap-gcp.conf
+++ /dev/null
@@ -1,22 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# These GCP KMS settings must all be configured in order to use the GCP KMS Sensitive Property Provider
-gcp.kms.project=
-gcp.kms.location=
-gcp.kms.keyring=
-gcp.kms.key=
\ No newline at end of file
diff --git a/nifi/nifi-registry/bootstrap-hashicorp-vault.conf b/nifi/nifi-registry/bootstrap-hashicorp-vault.conf
deleted file mode 100644
index bf6975b96..000000000
--- a/nifi/nifi-registry/bootstrap-hashicorp-vault.conf
+++ /dev/null
@@ -1,53 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# HTTP or HTTPS URI for HashiCorp Vault is required to enable the Sensitive Properties Provider
-vault.uri=
-
-# Transit Path is required to enable the Sensitive Properties Provider Protection Scheme 'hashicorp/vault/transit/{path}'
-vault.transit.path=
-
-# Key/Value Path is required to enable the Sensitive Properties Provider Protection Scheme 'hashicorp/vault/kv/{path}'
-vault.kv.path=
-# Key/Value Secrets Engine version may be 1 or 2, and defaults to 1
-# vault.kv.version=1
-
-# Token Authentication example properties
-# vault.authentication=TOKEN
-# vault.token=
-
-# Optional file supports authentication properties described in the Spring Vault Environment Configuration
-# https://docs.spring.io/spring-vault/docs/2.3.x/reference/html/#vault.core.environment-vault-configuration
-#
-# All authentication properties must be included in bootstrap-hashicorp-vault.conf when this property is not specified.
-# Properties in bootstrap-hashicorp-vault.conf take precedence when the same values are defined in both files.
-# Token Authentication is the default when the 'vault.authentication' property is not specified.
-vault.authentication.properties.file=
-
-# Optional Timeout properties
-vault.connection.timeout=5 secs
-vault.read.timeout=15 secs
-
-# Optional TLS properties
-vault.ssl.enabledCipherSuites=
-vault.ssl.enabledProtocols=
-vault.ssl.key-store=
-vault.ssl.key-store-type=
-vault.ssl.key-store-password=
-vault.ssl.trust-store=
-vault.ssl.trust-store-type=
-vault.ssl.trust-store-password=
diff --git a/nifi/nifi-registry/bootstrap.conf b/nifi/nifi-registry/bootstrap.conf
deleted file mode 100644
index f10c0fa5c..000000000
--- a/nifi/nifi-registry/bootstrap.conf
+++ /dev/null
@@ -1,50 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Java command to use when running nifi-registry
-java=java
-
-# Username to use when running nifi-registry. This value will be ignored on Windows.
-run.as=
-
-# Configure the working directory for launching the NiFi Registry process
-# If not specified, the working directory will fall back to using the NIFI_REGISTRY_HOME env variable
-# If the environment variable is not specified, the working directory will fall back to the parent of this file's parent
-working.dir=
-
-# Configure where nifi-registry's lib and conf directories live
-lib.dir=./lib
-conf.dir=./conf
-docs.dir=./docs
-
-# How long to wait after telling nifi-registry to shutdown before explicitly killing the Process
-graceful.shutdown.seconds=20
-
-# Disable JSR 199 so that we can use JSP's without running a JDK
-java.arg.1=-Dorg.apache.jasper.compiler.disablejsr199=true
-
-# JVM memory settings
-java.arg.2=-Xms512m
-java.arg.3=-Xmx512m
-
-# Enable Remote Debugging
-#java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000
-
-# allowRestrictedHeaders is required for Cluster/Node communications to work properly
-java.arg.5=-Dsun.net.http.allowRestrictedHeaders=true
-java.arg.6=-Djava.protocol.handler.pkgs=sun.net.www.protocol
-
diff --git a/nifi/nifi-registry/groups.xml b/nifi/nifi-registry/groups.xml
deleted file mode 100644
index 005f58e20..000000000
--- a/nifi/nifi-registry/groups.xml
+++ /dev/null
@@ -1,4 +0,0 @@
-
-
-
-
\ No newline at end of file
diff --git a/nifi/nifi-registry/identity-providers.xml b/nifi/nifi-registry/identity-providers.xml
deleted file mode 100644
index 656d3ae5d..000000000
--- a/nifi/nifi-registry/identity-providers.xml
+++ /dev/null
@@ -1,111 +0,0 @@
-
-
-
-
-
-
-
-
-
-
- file-identity-provider
- org.apache.nifi.registry.security.identity.provider.FileIdentityProvider
- ./conf/users.xml
-
-
-
\ No newline at end of file
diff --git a/nifi/nifi-registry/logback.xml b/nifi/nifi-registry/logback.xml
deleted file mode 100644
index 3044c9366..000000000
--- a/nifi/nifi-registry/logback.xml
+++ /dev/null
@@ -1,133 +0,0 @@
-
-
-
-
- true
-
-
-
- ${org.apache.nifi.registry.bootstrap.config.log.dir}/nifi-registry-app.log
-
-
- ${org.apache.nifi.registry.bootstrap.config.log.dir}/nifi-registry-app_%d{yyyy-MM-dd_HH}.%i.log
-
- 100MB
-
- 30
-
- 3GB
-
- true
-
- true
-
- %date %level [%thread] %logger{40} %msg%n
-
-
-
-
- ${org.apache.nifi.registry.bootstrap.config.log.dir}/nifi-registry-bootstrap.log
-
-
- ${org.apache.nifi.registry.bootstrap.config.log.dir}/nifi-registry-bootstrap_%d.log
-
- 30
-
- 3GB
-
- true
-
-
- %date %level [%thread] %logger{40} %msg%n
-
-
-
-
- ${org.apache.nifi.registry.bootstrap.config.log.dir}/nifi-registry-event.log
-
-
- ${org.apache.nifi.registry.bootstrap.config.log.dir}/nifi-registry-event_%d.log
-
- 30
-
- 3GB
-
- true
-
-
- %date ## %msg%n
-
-
-
-
-
- %date %level [%thread] %logger{40} %msg%n
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/nifi/nifi-registry/nifi-registry.properties b/nifi/nifi-registry/nifi-registry.properties
deleted file mode 100644
index 40bff0249..000000000
--- a/nifi/nifi-registry/nifi-registry.properties
+++ /dev/null
@@ -1,127 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# web properties #
-nifi.registry.web.war.directory=./lib
-nifi.registry.web.http.host=
-nifi.registry.web.http.port=
-nifi.registry.web.https.host=nifi-registry
-nifi.registry.web.https.port=18443
-nifi.registry.web.jetty.http2.idle.timeout=300000
-nifi.registry.web.jetty.idle.timeout=300000
-nifi.registry.web.https.network.interface.default=
-nifi.registry.web.https.application.protocols=h2 http/1.1
-nifi.registry.web.jetty.working.directory=./work/jetty
-nifi.registry.web.jetty.threads=200
-nifi.registry.web.should.send.server.version=true
-nifi.registry.web.jetty.use.http2=false
-
-nifi.registry.web.context.path=/nifi-registry
-nifi.registry.web.proxy.context.path=/nifi-registry
-nifi.registry.web.proxy.host=localhost:18443,nifi-registry:18443,nifi-registry-flow:18443,cogstack-nifi-registry-flow:18443,cogstack-nifi-registry:18443,nginx.local:18443
-
-# security properties #
-nifi.registry.security.keystore=/security/certificates/nifi/nifi-keystore.jks
-nifi.registry.security.keystoreType=JKS
-nifi.registry.security.keystorePasswd=cogstackNifi
-nifi.registry.security.keyPasswd=cogstackNifi
-nifi.registry.security.truststore=/security/certificates/nifi/nifi-truststore.jks
-nifi.registry.security.truststoreType=JKS
-nifi.registry.security.truststorePasswd=cogstackNifi
-nifi.registry.security.needClientAuth=false
-nifi.registry.security.authorizers.configuration.file=./conf/authorizers.xml
-nifi.registry.security.authorizer=managed-authorizer
-nifi.registry.security.identity.providers.configuration.file=./conf/identity-providers.xml
-nifi.registry.security.identity.provider=
-
-# providers properties #
-nifi.registry.providers.configuration.file=./conf/providers.xml
-
-# registry alias properties #
-nifi.registry.registry.alias.configuration.file=./conf/registry-aliases.xml
-
-# extensions working dir #
-nifi.registry.extensions.working.directory=./work/extensions
-
-# legacy database properties, used to migrate data from original DB to new DB below
-# NOTE: Users upgrading from 0.1.0 should leave these populated, but new installs after 0.1.0 should leave these empty
-nifi.registry.db.directory=
-nifi.registry.db.url.append=
-
-# database properties
-nifi.registry.db.url=jdbc:h2:./database/nifi-registry-primary;AUTOCOMMIT=OFF;DB_CLOSE_ON_EXIT=FALSE;LOCK_MODE=3;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
-nifi.registry.db.driver.class=org.h2.Driver
-nifi.registry.db.driver.directory=/opt/nifi-registry/nifi-registry-current/database
-nifi.registry.db.username=nifireg
-nifi.registry.db.password=nifireg
-nifi.registry.db.maxConnections=5
-nifi.registry.db.sql.debug=false
-
-# extension directories #
-# Each property beginning with "nifi.registry.extension.dir." will be treated as location for an extension,
-# and a class loader will be created for each location, with the system class loader as the parent
-#
-#nifi.registry.extension.dir.1=/path/to/extension1
-#nifi.registry.extension.dir.2=/path/to/extension2
-
-nifi.registry.extension.dir.aws=./ext/aws/lib
-
-# Identity Mapping Properties #
-# These properties allow normalizing user identities such that identities coming from different identity providers
-# (certificates, LDAP, Kerberos) can be treated the same internally in NiFi. The following example demonstrates normalizing
-# DNs from certificates and principals from Kerberos into a common identity string:
-
-nifi.registry.security.identity.mapping.pattern.dn=^CN=(.*?),.*$
-nifi.registry.security.identity.mapping.value.dn=$1
-nifi.registry.security.identity.mapping.transform.dn=NONE
-
-# nifi.registry.security.identity.mapping.pattern.dn=^CN=(.*?), OU=(.*?), O=(.*?), L=(.*?), ST=(.*?), C=(.*?)$
-# nifi.registry.security.identity.mapping.value.dn=$1@$2
-# nifi.registry.security.identity.mapping.transform.dn=NONE
-
-# nifi.registry.security.identity.mapping.pattern.kerb=^(.*?)/instance@(.*?)$
-# nifi.registry.security.identity.mapping.value.kerb=$1@$2
-# nifi.registry.security.identity.mapping.transform.kerb=UPPER
-
-# Group Mapping Properties #
-# These properties allow normalizing group names coming from external sources like LDAP. The following example
-# lowercases any group name.
-#
-# nifi.registry.security.group.mapping.pattern.anygroup=^(.*)$
-# nifi.registry.security.group.mapping.value.anygroup=$1
-# nifi.registry.security.group.mapping.transform.anygroup=LOWER
-
-# User authentication
-nifi.registry.security.user.authorizer=managed-authorizer
-
-
-# kerberos properties #
-# nifi.registry.kerberos.krb5.file=
-# nifi.registry.kerberos.spnego.principal=
-# nifi.registry.kerberos.spnego.keytab.location=
-# nifi.registry.kerberos.spnego.authentication.expiration=12 hours
-
-# OIDC #
-# nifi.registry.security.user.oidc.discovery.url=
-# nifi.registry.security.user.oidc.connect.timeout=
-# nifi.registry.security.user.oidc.read.timeout=
-# nifi.registry.security.user.oidc.client.id=
-# nifi.registry.security.user.oidc.client.secret=
-# nifi.registry.security.user.oidc.preferred.jwsalgorithm=
-# nifi.registry.security.user.oidc.claim.groups=groups
-
-# revision management #
-# This feature should remain disabled until a future NiFi release that supports the revision API changes
-nifi.registry.revisions.enabled=false
diff --git a/nifi/nifi-registry/providers.xml b/nifi/nifi-registry/providers.xml
deleted file mode 100644
index 63ba86d46..000000000
--- a/nifi/nifi-registry/providers.xml
+++ /dev/null
@@ -1,91 +0,0 @@
-
-
-
-
-
- org.apache.nifi.registry.provider.flow.FileSystemFlowPersistenceProvider
- /opt/nifi-registry/nifi-registry-current/flow_storage
-
-
-
-
-
-
-
-
-
- org.apache.nifi.registry.provider.extension.FileSystemBundlePersistenceProvider
- ./extension_bundles
-
-
-
-
diff --git a/nifi/nifi-registry/registry-aliases.xml b/nifi/nifi-registry/registry-aliases.xml
deleted file mode 100644
index 9bd1b2d85..000000000
--- a/nifi/nifi-registry/registry-aliases.xml
+++ /dev/null
@@ -1,23 +0,0 @@
-
-
-
-
-
\ No newline at end of file
diff --git a/nifi/nifi-registry/users.xml b/nifi/nifi-registry/users.xml
deleted file mode 100644
index d2488ff3a..000000000
--- a/nifi/nifi-registry/users.xml
+++ /dev/null
@@ -1,10 +0,0 @@
-
-
-
-
-
-
-
-
-
-
diff --git a/nifi/user_python_extensions/convert_avro_binary_field_to_base64.py b/nifi/user_python_extensions/convert_avro_binary_field_to_base64.py
index 17829a55e..fadee638e 100644
--- a/nifi/user_python_extensions/convert_avro_binary_field_to_base64.py
+++ b/nifi/user_python_extensions/convert_avro_binary_field_to_base64.py
@@ -2,7 +2,6 @@
import copy
import io
import json
-import traceback
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
@@ -14,7 +13,6 @@
StandardValidators,
)
from nifiapi.relationship import Relationship
-from overrides import overrides
from py4j.java_gateway import JavaObject, JVMView
from nifi.user_scripts.utils.nifi.base_nifi_processor import BaseNiFiProcessor
@@ -83,10 +81,9 @@ def __init__(self, jvm: JVMView):
self.descriptors: list[PropertyDescriptor] = self._properties
self.relationships: list[Relationship] = self._relationships
- @overrides
- def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult:
+ def process(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult:
"""
- Transforms an Avro flow file by converting a specified binary field to a base64-encoded string.
+ Processes an Avro flow file by converting a specified binary field to a base64-encoded string.
Args:
context (ProcessContext): The process context containing processor properties.
@@ -100,69 +97,64 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr
FlowFileTransformResult: The result containing the transformed flow file, updated attributes,
and relationship.
"""
- try:
- self.process_context = context
- self.set_properties(context.getProperties())
-
- # read avro record
- input_raw_bytes: bytes = flowFile.getContentsAsBytes()
- input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes)
- reader: DataFileReader = DataFileReader(input_byte_buffer, DatumReader())
-
- schema: Schema | None = reader.datum_reader.writers_schema
-
- # change the datatype of the binary field from bytes to string
- # (avoids headaches later on when converting avro to json)
- # because if we dont change the schema the native NiFi converter will convert bytes to an array of integers.
- output_schema = None
- if schema is not None and isinstance(schema, RecordSchema):
- schema_dict = copy.deepcopy(schema.to_json())
- for field in schema_dict["fields"]: # type: ignore
- self.logger.info(str(field))
- if field["name"] == self.binary_field_name:
- field["type"] = ["null", "string"]
- break
- output_schema = parse(json.dumps(schema_dict))
-
- # Write them to a binary avro stream
- output_byte_buffer = io.BytesIO()
- writer = DataFileWriter(output_byte_buffer, DatumWriter(), output_schema)
-
- for record in reader:
- if type(record) is dict:
- record_document_binary_data = record.get(str(self.binary_field_name), None)
-
- if record_document_binary_data is not None:
- if self.operation_mode == "base64":
- record_document_binary_data = base64.b64encode(record_document_binary_data).decode()
- else:
- self.logger.info("No binary data found in record, using empty content")
+ # read avro record
+ input_raw_bytes: bytes = flowFile.getContentsAsBytes()
+ input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes)
+ reader: DataFileReader = DataFileReader(input_byte_buffer, DatumReader())
+
+ schema: Schema | None = reader.datum_reader.writers_schema
+
+ # change the datatype of the binary field from bytes to string
+ # (avoids headaches later on when converting avro to json)
+ # because if we don't change the schema the native NiFi converter will convert bytes to an array of integers.
+ output_schema = None
+ if schema is not None and isinstance(schema, RecordSchema):
+ schema_dict = copy.deepcopy(schema.to_json())
+ for field in schema_dict["fields"]: # type: ignore
+ self.logger.info(str(field))
+ if field["name"] == self.binary_field_name:
+ field["type"] = ["null", "string"]
+ break
+ output_schema = parse(json.dumps(schema_dict))
+
+ # Write them to a binary avro stream
+ output_byte_buffer = io.BytesIO()
+ writer = DataFileWriter(output_byte_buffer, DatumWriter(), output_schema)
+
+ for record in reader:
+ if type(record) is dict:
+ record_document_binary_data = record.get(str(self.binary_field_name), None)
+
+ if record_document_binary_data is not None:
+ if self.operation_mode == "base64":
+ record_document_binary_data = base64.b64encode(record_document_binary_data).decode()
else:
- raise TypeError("Expected Avro record to be a dictionary, but got: " + str(type(record)))
-
- _tmp_record = {}
- _tmp_record[str(self.binary_field_name)] = record_document_binary_data
-
- for k, v in record.items():
- if k != str(self.binary_field_name):
- _tmp_record[k] = v
-
- writer.append(_tmp_record)
-
- input_byte_buffer.close()
- reader.close()
- writer.flush()
- output_byte_buffer.seek(0)
-
- attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()}
- attributes["document_id_field_name"] = str(self.document_id_field_name)
- attributes["binary_field"] = str(self.binary_field_name)
- attributes["operation_mode"] = str(self.operation_mode)
- attributes["mime.type"] = "application/avro-binary"
-
- return FlowFileTransformResult(relationship="success",
- attributes=attributes,
- contents=output_byte_buffer.getvalue())
- except Exception as exception:
- self.logger.error("Exception during Avro processing: " + traceback.format_exc())
- raise exception
\ No newline at end of file
+ self.logger.info("No binary data found in record, using empty content")
+ else:
+ raise TypeError("Expected Avro record to be a dictionary, but got: " + str(type(record)))
+
+ _tmp_record = {}
+ _tmp_record[str(self.binary_field_name)] = record_document_binary_data
+
+ for k, v in record.items():
+ if k != str(self.binary_field_name):
+ _tmp_record[k] = v
+
+ writer.append(_tmp_record)
+
+ input_byte_buffer.close()
+ reader.close()
+ writer.flush()
+ output_byte_buffer.seek(0)
+
+ attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()}
+ attributes["document_id_field_name"] = str(self.document_id_field_name)
+ attributes["binary_field"] = str(self.binary_field_name)
+ attributes["operation_mode"] = str(self.operation_mode)
+ attributes["mime.type"] = "application/avro-binary"
+
+ return FlowFileTransformResult(
+ relationship="success",
+ attributes=attributes,
+ contents=output_byte_buffer.getvalue(),
+ )
diff --git a/nifi/user_python_extensions/convert_json_record_schema.py b/nifi/user_python_extensions/convert_json_record_schema.py
index ac29e842e..ed313d8f4 100644
--- a/nifi/user_python_extensions/convert_json_record_schema.py
+++ b/nifi/user_python_extensions/convert_json_record_schema.py
@@ -1,12 +1,10 @@
import json
-import traceback
from collections import defaultdict
from typing import Any
from nifiapi.flowfiletransform import FlowFileTransformResult
from nifiapi.properties import ProcessContext, PropertyDescriptor, StandardValidators
from nifiapi.relationship import Relationship
-from overrides import overrides
from py4j.java_gateway import JavaObject, JVMView
from nifi.user_scripts.utils.nifi.base_nifi_processor import BaseNiFiProcessor
@@ -322,35 +320,29 @@ def map_record(self, record: dict, json_mapper_schema: dict) -> dict:
return new_record
- @overrides
- def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult:
+ def process(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult:
output_contents: list[dict[Any, Any]] = []
- try:
- self.process_context: ProcessContext = context
- self.set_properties(context.getProperties())
+        # read json record(s) from the flowfile
+ input_raw_bytes: bytes = flowFile.getContentsAsBytes()
+ records: dict | list[dict] = json.loads(input_raw_bytes.decode("utf-8"))
- # read avro record
- input_raw_bytes: bytes = flowFile.getContentsAsBytes()
- records: dict | list[dict] = json.loads(input_raw_bytes.decode("utf-8"))
+ if isinstance(records, dict):
+ records = [records]
- if isinstance(records, dict):
- records = [records]
+ json_mapper_schema: dict = {}
+ with open(self.json_mapper_schema_path) as file:
+ json_mapper_schema = json.load(file)
- json_mapper_schema: dict = {}
- with open(self.json_mapper_schema_path) as file:
- json_mapper_schema = json.load(file)
+ for record in records:
+ output_contents.append(self.map_record(record, json_mapper_schema))
- for record in records:
- output_contents.append(self.map_record(record, json_mapper_schema))
+ attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()}
+ attributes["json_mapper_schema_path"] = str(self.json_mapper_schema_path)
+ attributes["mime.type"] = "application/json"
- attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()}
- attributes["json_mapper_schema_path"] = str(self.json_mapper_schema_path)
- attributes["mime.type"] = "application/json"
-
- return FlowFileTransformResult(relationship="success",
- attributes=attributes,
- contents=json.dumps(output_contents).encode('utf-8'))
- except Exception as exception:
- self.logger.error("Exception during flowfile processing: " + traceback.format_exc())
- raise exception
\ No newline at end of file
+ return FlowFileTransformResult(
+ relationship="success",
+ attributes=attributes,
+ contents=json.dumps(output_contents).encode("utf-8"),
+ )
diff --git a/nifi/user_python_extensions/convert_json_to_attribute.py b/nifi/user_python_extensions/convert_json_to_attribute.py
index 231663be8..43a11bf7a 100644
--- a/nifi/user_python_extensions/convert_json_to_attribute.py
+++ b/nifi/user_python_extensions/convert_json_to_attribute.py
@@ -1,10 +1,8 @@
import json
import re
-import traceback
from nifiapi.flowfiletransform import FlowFileTransformResult
from nifiapi.properties import ProcessContext, PropertyDescriptor, StandardValidators
-from overrides import overrides
from py4j.java_gateway import JavaObject, JVMView
from nifi.user_scripts.utils.nifi.base_nifi_processor import BaseNiFiProcessor
@@ -44,46 +42,39 @@ def __init__(self, jvm: JVMView):
self.descriptors: list[PropertyDescriptor] = self._properties
- @overrides
- def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult:
+ def process(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult:
DIGITS = re.compile(r"^\d+$")
+
+        # read json content from the flowfile
+ input_raw_bytes: bytes = flowFile.getContentsAsBytes()
+ text = (input_raw_bytes.decode("utf-8", errors="replace").strip() if input_raw_bytes else "[]")
+
try:
- self.process_context = context
- self.set_properties(context.getProperties())
-
- # read avro record
- input_raw_bytes: bytes = flowFile.getContentsAsBytes()
- text = (input_raw_bytes.decode("utf-8", errors="replace").strip() if input_raw_bytes else "[]")
-
- try:
- parsed = json.loads(text) if text else []
- except Exception:
- parsed = []
-
- records = parsed if isinstance(parsed, list) else parsed.get("records", [])
- if not isinstance(records, list):
- records = []
-
- ids = []
- for r in records:
- if not isinstance(r, dict):
- continue
- v = r.get(self.field_name)
- if v is None:
- continue
- s = str(v).strip()
- if DIGITS.match(s):
- ids.append(s)
-
- ids_csv = ",".join(ids)
- return FlowFileTransformResult(
- relationship="success",
- attributes={
- "ids_csv": ids_csv,
- "ids_count": str(len(ids)),
- "ids_len": str(len(ids_csv)),
- },
- )
- except Exception as exception:
- self.logger.error("Exception during Avro processing: " + traceback.format_exc())
- raise exception
\ No newline at end of file
+ parsed = json.loads(text) if text else []
+ except Exception:
+ parsed = []
+
+ records = parsed if isinstance(parsed, list) else parsed.get("records", [])
+ if not isinstance(records, list):
+ records = []
+
+ ids = []
+ for r in records:
+ if not isinstance(r, dict):
+ continue
+ v = r.get(self.field_name)
+ if v is None:
+ continue
+ s = str(v).strip()
+ if DIGITS.match(s):
+ ids.append(s)
+
+ ids_csv = ",".join(ids)
+ return FlowFileTransformResult(
+ relationship="success",
+ attributes={
+ "ids_csv": ids_csv,
+ "ids_count": str(len(ids)),
+ "ids_len": str(len(ids_csv)),
+ },
+ )
diff --git a/nifi/user_python_extensions/convert_record_parquet_to_json.py b/nifi/user_python_extensions/convert_record_parquet_to_json.py
index cde065aee..bb2f49d83 100644
--- a/nifi/user_python_extensions/convert_record_parquet_to_json.py
+++ b/nifi/user_python_extensions/convert_record_parquet_to_json.py
@@ -1,10 +1,8 @@
import io
import json
-import traceback
from nifiapi.flowfiletransform import FlowFileTransformResult
from nifiapi.properties import ProcessContext, PropertyDescriptor
-from overrides import overrides
from py4j.java_gateway import JavaObject, JVMView
from pyarrow import parquet
@@ -30,49 +28,43 @@ def __init__(self, jvm: JVMView):
self.descriptors: list[PropertyDescriptor] = self._properties
- @overrides
- def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult:
+ def process(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult:
"""
"""
- try:
- self.process_context = context
- self.set_properties(context.getProperties())
+        # read parquet content from the flowfile
+ input_raw_bytes: bytes = flowFile.getContentsAsBytes()
+ input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes)
- # read avro record
- input_raw_bytes: bytes = flowFile.getContentsAsBytes()
- input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes)
+ parquet_file = parquet.ParquetFile(input_byte_buffer)
- parquet_file = parquet.ParquetFile(input_byte_buffer)
+ output_buffer: io.BytesIO = io.BytesIO()
+ record_count: int = 0
- output_buffer: io.BytesIO = io.BytesIO()
- record_count: int = 0
+ for batch in parquet_file.iter_batches(batch_size=10000):
+ records: list[dict] = batch.to_pylist()
- for batch in parquet_file.iter_batches(batch_size=10000):
- records: list[dict] = batch.to_pylist()
+ for record in records:
+ json_record = json.dumps(
+ record,
+ ensure_ascii=False,
+ separators=(",", ":"),
+ default=parquet_json_data_type_convert,
+ )
- for record in records:
- json_record = json.dumps(
- record,
- ensure_ascii=False,
- separators=(",", ":"),
- default=parquet_json_data_type_convert,
- )
+ output_buffer.write(json_record.encode("utf-8"))
+ output_buffer.write(b"\n")
+ record_count += len(records)
- output_buffer.write(json_record.encode("utf-8"))
- output_buffer.write(b"\n")
- record_count += len(records)
+ input_byte_buffer.close()
- input_byte_buffer.close()
+ attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()}
+ attributes["mime.type"] = "application/x-ndjson"
+ attributes["record.count"] = str(record_count)
- attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()}
- attributes["mime.type"] = "application/x-ndjson"
- attributes["record.count"] = str(record_count)
-
- return FlowFileTransformResult(relationship="success",
- attributes=attributes,
- contents=output_buffer.getvalue())
- except Exception as exception:
- self.logger.error("Exception during Avro processing: " + traceback.format_exc())
- raise exception
\ No newline at end of file
+ return FlowFileTransformResult(
+ relationship="success",
+ attributes=attributes,
+ contents=output_buffer.getvalue(),
+ )
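
A standalone sketch of the same Parquet-to-NDJSON pattern, assuming `pyarrow`; the sample table is illustrative, and the `default=` converter used by the processor is omitted here because these values are already JSON-serialisable.

```python
import io
import json

import pyarrow as pa
from pyarrow import parquet

# build a small illustrative parquet payload in memory
table = pa.Table.from_pylist([{"id": 1, "text": "hello"}, {"id": 2, "text": "world"}])
buffer = io.BytesIO()
parquet.write_table(table, buffer)
buffer.seek(0)

# same pattern as the processor: stream batches and emit one JSON object per line
out = io.BytesIO()
parquet_file = parquet.ParquetFile(buffer)
for batch in parquet_file.iter_batches(batch_size=10000):
    for record in batch.to_pylist():
        out.write(json.dumps(record, ensure_ascii=False, separators=(",", ":")).encode("utf-8"))
        out.write(b"\n")

print(out.getvalue().decode("utf-8"))
```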
diff --git a/nifi/user_python_extensions/parse_service_response.py b/nifi/user_python_extensions/parse_service_response.py
index e4eddf8c3..d8ea0a779 100644
--- a/nifi/user_python_extensions/parse_service_response.py
+++ b/nifi/user_python_extensions/parse_service_response.py
@@ -1,5 +1,4 @@
import json
-import traceback
from nifiapi.flowfiletransform import FlowFileTransformResult
from nifiapi.properties import (
@@ -8,7 +7,6 @@
StandardValidators,
)
from nifiapi.relationship import Relationship
-from overrides import overrides
from py4j.java_gateway import JavaObject, JVMView
from nifi.user_scripts.utils.nifi.base_nifi_processor import BaseNiFiProcessor
@@ -95,10 +93,9 @@ def __init__(self, jvm: JVMView):
self.descriptors: list[PropertyDescriptor] = self._properties
self.relationships: list[Relationship] = self._relationships
- @overrides
- def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult:
+ def process(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult:
"""
- Transforms the input FlowFile by parsing the service response and extracting relevant fields.
+ Processes the input FlowFile by parsing the service response and extracting relevant fields.
Args:
context (ProcessContext): The process context containing processor properties.
@@ -113,87 +110,83 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr
output_contents: list = []
- try:
- self.process_context: ProcessContext = context
- self.set_properties(context.getProperties())
+        # read json record(s) from the flowfile
+ input_raw_bytes: bytes = flowFile.getContentsAsBytes()
- # read avro record
- input_raw_bytes: bytes = flowFile.getContentsAsBytes()
+ records: dict | list[dict] = json.loads(input_raw_bytes.decode("utf-8"))
- records: dict | list[dict] = json.loads(input_raw_bytes.decode("utf-8"))
+ if isinstance(records, dict):
+ records = [records]
- if isinstance(records, dict):
- records = [records]
+ if self.service_message_type == "ocr":
+ for record in records:
+ result = record.get("result", {})
- if self.service_message_type == "ocr":
- for record in records:
- result = record.get("result", {})
+ _record = {}
+ _record["metadata"] = result.get("metadata", {})
+ _record[self.output_text_field_name] = result.get("text", "")
+ _record["success"] = result.get("success", False)
+ _record["timestamp"] = result.get("timestamp", None)
- _record = {}
- _record["metadata"] = result.get("metadata", {})
- _record[self.output_text_field_name] = result.get("text", "")
- _record["success"] = result.get("success", False)
- _record["timestamp"] = result.get("timestamp", None)
+ if "footer" in result:
+ for k, v in result["footer"].items():
+ _record[k] = v
- if "footer" in result:
- for k, v in result["footer"].items():
- _record[k] = v
+ output_contents.append(_record)
- output_contents.append(_record)
+ elif self.service_message_type == "medcat" and "result" in records[0]:
+ result = records[0].get("result", [])
+ medcat_info = records[0].get("medcat_info", {})
- elif self.service_message_type == "medcat" and "result" in records[0]:
- result = records[0].get("result", [])
- medcat_info = records[0].get("medcat_info", {})
+ if isinstance(result, dict):
+ result = [result]
- if isinstance(result, dict):
- result = [result]
+ for annotated_record in result:
+ annotations = annotated_record.get("annotations", [])
+ annotations = annotations[0] if len(annotations) > 0 else annotations
+ footer = annotated_record.get("footer", {})
- for annotated_record in result:
- annotations = annotated_record.get("annotations", [])
- annotations = annotations[0] if len(annotations) > 0 else annotations
- footer = annotated_record.get("footer", {})
+ if self.medcat_output_mode == "deid":
+ _output_annotated_record = {}
+ _output_annotated_record["service_model"] = medcat_info
+ _output_annotated_record["timestamp"] = annotated_record.get("timestamp", None)
+ _output_annotated_record[self.output_text_field_name] = annotated_record.get("text", "")
- if self.medcat_output_mode == "deid":
- _output_annotated_record = {}
- _output_annotated_record["service_model"] = medcat_info
- _output_annotated_record["timestamp"] = annotated_record.get("timestamp", None)
- _output_annotated_record[self.output_text_field_name] = annotated_record.get("text", "")
+ if self.medcat_deid_keep_annotations is True:
+ _output_annotated_record["annotations"] = annotations
+ else:
+ _output_annotated_record["annotations"] = {}
- if self.medcat_deid_keep_annotations is True:
- _output_annotated_record["annotations"] = annotations
- else:
- _output_annotated_record["annotations"] = {}
+ for k, v in footer.items():
+ _output_annotated_record[k] = v
+ output_contents.append(_output_annotated_record)
- for k, v in footer.items():
- _output_annotated_record[k] = v
- output_contents.append(_output_annotated_record)
+ else:
+ for annotation_id, annotation_data in annotations.items():
+ _output_annotated_record = {}
+ _output_annotated_record["service_model"] = medcat_info
+ _output_annotated_record["timestamp"] = annotated_record.get("timestamp", None)
- else:
- for annotation_id, annotation_data in annotations.items():
- _output_annotated_record = {}
- _output_annotated_record["service_model"] = medcat_info
- _output_annotated_record["timestamp"] = annotated_record.get("timestamp", None)
+ for k, v in annotation_data.items():
+ _output_annotated_record[k] = v
- for k, v in annotation_data.items():
- _output_annotated_record[k] = v
+ for k, v in footer.items():
+ _output_annotated_record[k] = v
- for k, v in footer.items():
- _output_annotated_record[k] = v
+ if self.document_id_field_name in footer:
+ _output_annotated_record["annotation_id"] = (
+ str(footer[self.document_id_field_name]) + "_" + str(annotation_id)
+ )
- if self.document_id_field_name in footer:
- _output_annotated_record["annotation_id"] = \
- str(footer[self.document_id_field_name]) + "_" + str(annotation_id)
+ output_contents.append(_output_annotated_record)
- output_contents.append(_output_annotated_record)
+ # add properties to flowfile attributes
+ attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()}
+ attributes["output_text_field_name"] = str(self.output_text_field_name)
+ attributes["mime.type"] = "application/json"
- # add properties to flowfile attributes
- attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()}
- attributes["output_text_field_name"] = str(self.output_text_field_name)
- attributes["mime.type"] = "application/json"
-
- return FlowFileTransformResult(relationship="success",
- attributes=attributes,
- contents=json.dumps(output_contents).encode('utf-8'))
- except Exception as exception:
- self.logger.error("Exception during flowfile processing: " + traceback.format_exc())
- raise exception
\ No newline at end of file
+ return FlowFileTransformResult(
+ relationship="success",
+ attributes=attributes,
+ contents=json.dumps(output_contents).encode("utf-8"),
+ )
diff --git a/nifi/user_python_extensions/prepare_record_for_nlp.py b/nifi/user_python_extensions/prepare_record_for_nlp.py
index 443de6b2b..40f7f70a7 100644
--- a/nifi/user_python_extensions/prepare_record_for_nlp.py
+++ b/nifi/user_python_extensions/prepare_record_for_nlp.py
@@ -1,6 +1,5 @@
import io
import json
-import traceback
from typing import Any
from avro.datafile import DataFileReader
@@ -11,7 +10,6 @@
PropertyDescriptor,
StandardValidators,
)
-from overrides import overrides
from py4j.java_gateway import JavaObject, JVMView
from nifi.user_scripts.utils.nifi.base_nifi_processor import BaseNiFiProcessor
@@ -55,8 +53,7 @@ def __init__(self, jvm: JVMView):
self.descriptors: list[PropertyDescriptor] = self._properties
- @overrides
- def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult:
+ def process(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult:
"""_summary_
Args:
@@ -73,49 +70,46 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr
output_contents: list = []
- try:
- self.process_context = context
- self.set_properties(context.getProperties())
+ self.process_flow_file_type = str(self.process_flow_file_type).lower()
- self.process_flow_file_type = str(self.process_flow_file_type).lower()
+        # read flowfile content (avro or json)
+ input_raw_bytes: bytes = flowFile.getContentsAsBytes()
+ input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes)
- # read avro record
- input_raw_bytes: bytes = flowFile.getContentsAsBytes()
- input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes)
+ reader: DataFileReader | (list[dict[str, Any]] | list[Any])
- reader: DataFileReader | (list[dict[str, Any]] | list[Any])
+ if self.process_flow_file_type == "avro":
+ reader = DataFileReader(input_byte_buffer, DatumReader())
+ else:
+ json_obj = json.loads(input_byte_buffer.read().decode("utf-8"))
+ reader = [json_obj] if isinstance(json_obj, dict) else json_obj if isinstance(json_obj, list) else []
- if self.process_flow_file_type == "avro":
- reader = DataFileReader(input_byte_buffer, DatumReader())
+ for record in reader:
+ if type(record) is dict:
+ record_document_text = record.get(str(self.document_text_field_name), "")
else:
- json_obj = json.loads(input_byte_buffer.read().decode("utf-8"))
- reader = [json_obj] if isinstance(json_obj, dict) else json_obj if isinstance(json_obj, list) else []
+ raise TypeError("Expected record to be a dictionary, but got: " + str(type(record)))
- for record in reader:
- if type(record) is dict:
- record_document_text = record.get(str(self.document_text_field_name), "")
- else:
- raise TypeError("Expected record to be a dictionary, but got: " + str(type(record)))
-
- output_contents.append({
+ output_contents.append(
+ {
"text": record_document_text,
- "footer": {k: v for k, v in record.items() if k != str(self.document_text_field_name)}
- })
+ "footer": {k: v for k, v in record.items() if k != str(self.document_text_field_name)},
+ }
+ )
- input_byte_buffer.close()
+ input_byte_buffer.close()
- if isinstance(reader, DataFileReader):
- reader.close()
+ if isinstance(reader, DataFileReader):
+ reader.close()
- attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()}
- attributes["document_id_field_name"] = str(self.document_id_field_name)
- attributes["mime.type"] = "application/json"
+ attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()}
+ attributes["document_id_field_name"] = str(self.document_id_field_name)
+ attributes["mime.type"] = "application/json"
- output_contents = output_contents[0] if len(output_contents) == 1 else output_contents
+ output_contents = output_contents[0] if len(output_contents) == 1 else output_contents
- return FlowFileTransformResult(relationship="success",
- attributes=attributes,
- contents=json.dumps({"content": output_contents}).encode("utf-8"))
- except Exception as exception:
- self.logger.error("Exception during flowfile processing: " + traceback.format_exc())
- raise exception
\ No newline at end of file
+ return FlowFileTransformResult(
+ relationship="success",
+ attributes=attributes,
+ contents=json.dumps({"content": output_contents}).encode("utf-8"),
+ )
diff --git a/nifi/user_python_extensions/prepare_record_for_ocr.py b/nifi/user_python_extensions/prepare_record_for_ocr.py
index e6c41e5ca..709c185d6 100644
--- a/nifi/user_python_extensions/prepare_record_for_ocr.py
+++ b/nifi/user_python_extensions/prepare_record_for_ocr.py
@@ -1,7 +1,6 @@
import base64
import io
import json
-import traceback
from typing import Any
from avro.datafile import DataFileReader
@@ -13,7 +12,6 @@
StandardValidators,
)
from nifiapi.relationship import Relationship
-from overrides import overrides
from py4j.java_gateway import JavaObject, JVMView
from nifi.user_scripts.utils.nifi.base_nifi_processor import BaseNiFiProcessor
@@ -78,68 +76,65 @@ def __init__(self, jvm: JVMView):
self.descriptors: list[PropertyDescriptor] = self._properties
self.relationships: list[Relationship] = self._relationships
- @overrides
- def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult:
+ def process(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult:
output_contents: list = []
- try:
- self.process_context = context
- self.set_properties(context.getProperties())
-
- self.process_flow_file_type = str(self.process_flow_file_type).lower()
-
- # read avro record
- input_raw_bytes: bytes = flowFile.getContentsAsBytes()
- input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes)
-
- reader: DataFileReader | (list[dict[str, Any]] | list[Any])
-
- if self.process_flow_file_type == "avro":
- reader = DataFileReader(input_byte_buffer, DatumReader())
- elif self.process_flow_file_type == "ndjson":
- json_lines = input_byte_buffer.read().decode("utf-8").splitlines()
- reader = [json.loads(line) for line in json_lines if line.strip()]
- else:
- json_obj = json.loads(input_byte_buffer.read().decode("utf-8"))
- reader = [json_obj] if isinstance(json_obj, dict) else json_obj if isinstance(json_obj, list) else []
-
- for record in reader:
- if type(record) is dict:
- record_document_binary_data = record.get(str(self.binary_field_name), None)
- if record_document_binary_data is not None:
- if self.operation_mode == "base64":
- record_document_binary_data = base64.b64encode(record_document_binary_data).decode()
- else:
- self.logger.info("No binary data found in record, using empty content")
+ self.process_flow_file_type = str(self.process_flow_file_type).lower()
+
+        # read flowfile content (avro, ndjson, or json)
+ input_raw_bytes: bytes = flowFile.getContentsAsBytes()
+ input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes)
+
+ reader: DataFileReader | (list[dict[str, Any]] | list[Any])
+
+ if self.process_flow_file_type == "avro":
+ reader = DataFileReader(input_byte_buffer, DatumReader())
+ elif self.process_flow_file_type == "ndjson":
+ json_lines = input_byte_buffer.read().decode("utf-8").splitlines()
+ reader = [json.loads(line) for line in json_lines if line.strip()]
+ else:
+ json_obj = json.loads(input_byte_buffer.read().decode("utf-8"))
+ reader = [json_obj] if isinstance(json_obj, dict) else json_obj if isinstance(json_obj, list) else []
+
+ for record in reader:
+ if type(record) is dict:
+ record_document_binary_data = record.get(str(self.binary_field_name), None)
+ if record_document_binary_data is not None:
+ if self.operation_mode == "base64":
+ record_document_binary_data = base64.b64encode(record_document_binary_data).decode()
else:
- raise TypeError("Expected record to be a dictionary, but got: " + str(type(record)))
+ self.logger.info("No binary data found in record, using empty content")
+ else:
+ raise TypeError("Expected record to be a dictionary, but got: " + str(type(record)))
- output_contents.append({
+ output_contents.append(
+ {
"binary_data": record_document_binary_data,
- "footer": {k: v for k, v in record.items() if k != str(self.binary_field_name)}
- })
+ "footer": {k: v for k, v in record.items() if k != str(self.binary_field_name)},
+ }
+ )
- input_byte_buffer.close()
+ input_byte_buffer.close()
- if isinstance(reader, DataFileReader):
- reader.close()
+ if isinstance(reader, DataFileReader):
+ reader.close()
- attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()}
- attributes["document_id_field_name"] = str(self.document_id_field_name)
- attributes["binary_field"] = str(self.binary_field_name)
- attributes["output_text_field_name"] = str(self.output_text_field_name)
- attributes["mime.type"] = "application/json"
+ attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()}
+ attributes["document_id_field_name"] = str(self.document_id_field_name)
+ attributes["binary_field"] = str(self.binary_field_name)
+ attributes["output_text_field_name"] = str(self.output_text_field_name)
+ attributes["mime.type"] = "application/json"
- if self.process_flow_file_type == "avro":
- return FlowFileTransformResult(relationship="success",
- attributes=attributes,
- contents=json.dumps(output_contents, cls=AvroJSONEncoder).encode("utf-8")
- )
- else:
- return FlowFileTransformResult(relationship="success",
- attributes=attributes,
- contents=json.dumps(output_contents).encode("utf-8"))
- except Exception as exception:
- self.logger.error("Exception during flowfile processing: " + traceback.format_exc())
- raise exception
+ if self.process_flow_file_type == "avro":
+ return FlowFileTransformResult(
+ relationship="success",
+ attributes=attributes,
+ contents=json.dumps(output_contents, cls=AvroJSONEncoder).encode("utf-8"),
+ )
+
+ return FlowFileTransformResult(
+ relationship="success",
+ attributes=attributes,
+ contents=json.dumps(output_contents).encode("utf-8"),
+ )
diff --git a/nifi/user_python_extensions/record_add_geolocation.py b/nifi/user_python_extensions/record_add_geolocation.py
index 2a027937f..dce48bbe5 100644
--- a/nifi/user_python_extensions/record_add_geolocation.py
+++ b/nifi/user_python_extensions/record_add_geolocation.py
@@ -11,7 +11,6 @@
PropertyDescriptor,
StandardValidators,
)
-from overrides import overrides
from py4j.java_gateway import JavaObject, JVMView
from nifi.user_scripts.utils.generic import download_file_from_url, safe_delete_paths
@@ -88,7 +87,6 @@ def __init__(self, jvm: JVMView):
self.descriptors: list[PropertyDescriptor] = self._properties
- @overrides
def onScheduled(self, context: ProcessContext) -> None:
""" Initializes processor resources when scheduled.
Args:
@@ -155,9 +153,8 @@ def _check_geolocation_lookup_datafile(self) -> bool:
return file_found
- @overrides
- def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult:
- """ Transforms the input FlowFile by adding geolocation data based on postcode lookup.
+ def process(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult:
+ """ Processes the input FlowFile by adding geolocation data based on postcode lookup.
Args:
context (ProcessContext): The process context.
flowFile (JavaObject): The input FlowFile to be transformed.
@@ -171,54 +168,45 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr
Use SplitRecord processor to split large files into smaller chunks before processing.
"""
- try:
- self.process_context: ProcessContext = context
- self.set_properties(context.getProperties())
-
- input_raw_bytes: bytes = flowFile.getContentsAsBytes()
-
- records: dict | list[dict] = json.loads(input_raw_bytes.decode("utf-8"))
-
- valid_records: list[dict] = []
- error_records: list[dict] = []
-
- if isinstance(records, dict):
- records = [records]
-
- if self.postcode_lookup_index:
- for record in records:
- if self.postcode_field_name in record:
- _postcode = str(record[self.postcode_field_name]).replace(" ", "")
- _data_col_row_idx = self.postcode_lookup_index.get(_postcode, -1)
-
- if _data_col_row_idx != -1:
- _selected_row = self.loaded_csv_file_rows[_data_col_row_idx]
- _lat, _long = str(_selected_row[7]).strip(), str(_selected_row[8]).strip()
- try:
- record[self.geolocation_field_name] = {
- "lat": float(_lat),
- "lon": float(_long)
- }
- except ValueError:
- self.logger.debug(f"invalid lat/long values for postcode {_postcode}: {_lat}, {_long}")
- error_records.append(record)
- valid_records.append(record)
- else:
- raise FileNotFoundError("geolocation lookup datafile is not available and data was not loaded, " \
- "please check URLs")
-
- attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()}
- attributes["mime.type"] = "application/json"
-
- if error_records:
- attributes["record.count.errors"] = str(len(error_records))
- attributes["record.count"] = str(len(valid_records))
-
- return FlowFileTransformResult(
- relationship="success",
- attributes=attributes,
- contents=json.dumps(valid_records).encode("utf-8"),
+ input_raw_bytes: bytes = flowFile.getContentsAsBytes()
+
+ records: dict | list[dict] = json.loads(input_raw_bytes.decode("utf-8"))
+
+ valid_records: list[dict] = []
+ error_records: list[dict] = []
+
+ if isinstance(records, dict):
+ records = [records]
+
+ if self.postcode_lookup_index:
+ for record in records:
+ if self.postcode_field_name in record:
+ _postcode = str(record[self.postcode_field_name]).replace(" ", "")
+ _data_col_row_idx = self.postcode_lookup_index.get(_postcode, -1)
+
+ if _data_col_row_idx != -1:
+ _selected_row = self.loaded_csv_file_rows[_data_col_row_idx]
+ _lat, _long = str(_selected_row[7]).strip(), str(_selected_row[8]).strip()
+ try:
+ record[self.geolocation_field_name] = {"lat": float(_lat), "lon": float(_long)}
+ except ValueError:
+ self.logger.debug(f"invalid lat/long values for postcode {_postcode}: {_lat}, {_long}")
+ error_records.append(record)
+ valid_records.append(record)
+ else:
+ raise FileNotFoundError(
+ "geolocation lookup datafile is not available and data was not loaded, please check URLs"
)
- except Exception as exception:
- self.logger.error("Exception during flowfile processing:\n" + traceback.format_exc())
- return self.build_failure_result(flowFile, exception)
\ No newline at end of file
+
+ attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()}
+ attributes["mime.type"] = "application/json"
+
+ if error_records:
+ attributes["record.count.errors"] = str(len(error_records))
+ attributes["record.count"] = str(len(valid_records))
+
+ return FlowFileTransformResult(
+ relationship="success",
+ attributes=attributes,
+ contents=json.dumps(valid_records).encode("utf-8"),
+ )
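
To make the lookup concrete, a small sketch with hypothetical stand-ins for the processor's loaded CSV rows and postcode index (only columns 7 and 8, latitude and longitude, matter here); the field names `postcode` and `geolocation` are placeholders for the configured property values.

```python
# hypothetical stand-ins for the processor's loaded CSV state
loaded_csv_file_rows = [
    ["AB1 0AA", "", "", "", "", "", "", "57.1497", "-2.0943"],  # cols 7/8 = lat/long
]
postcode_lookup_index = {"AB10AA": 0}  # postcode with spaces stripped -> row index

record = {"id": "doc-1", "postcode": "AB1 0AA"}
postcode = str(record["postcode"]).replace(" ", "")
row_idx = postcode_lookup_index.get(postcode, -1)
if row_idx != -1:
    row = loaded_csv_file_rows[row_idx]
    lat, lon = str(row[7]).strip(), str(row[8]).strip()
    try:
        record["geolocation"] = {"lat": float(lat), "lon": float(lon)}
    except ValueError:
        pass  # the processor logs this and keeps the record without geolocation

print(record)
```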
diff --git a/nifi/user_python_extensions/record_decompress_cerner_blob.py b/nifi/user_python_extensions/record_decompress_cerner_blob.py
index 718a8a14b..a26917a2f 100644
--- a/nifi/user_python_extensions/record_decompress_cerner_blob.py
+++ b/nifi/user_python_extensions/record_decompress_cerner_blob.py
@@ -1,6 +1,5 @@
import base64
import json
-import traceback
from nifiapi.flowfiletransform import FlowFileTransformResult
from nifiapi.properties import (
@@ -8,7 +7,6 @@
PropertyDescriptor,
StandardValidators,
)
-from overrides import overrides
from py4j.java_gateway import JavaObject, JVMView
from nifi.user_scripts.utils.codecs.cerner_blob import DecompressLzwCernerBlob
@@ -95,8 +93,24 @@ def __init__(self, jvm: JVMView):
self.descriptors: list[PropertyDescriptor] = self._properties
- @overrides
- def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult:
+ def _load_json_records(self, input_raw_bytes: bytes | bytearray) -> list | dict:
+ try:
+ return json.loads(input_raw_bytes.decode())
+ except json.JSONDecodeError as exc:
+ self.logger.error(f"Error decoding JSON: {exc} \nAttempting to decode as {self.input_charset}")
+ try:
+ return json.loads(input_raw_bytes.decode(self.input_charset))
+ except json.JSONDecodeError as exc2:
+ self.logger.error(f"Error decoding JSON: {exc2} \nAttempting to decode as windows-1252")
+ try:
+ return json.loads(input_raw_bytes.decode("windows-1252"))
+ except json.JSONDecodeError as exc3:
+ raise ValueError(
+ "Error decoding JSON after trying utf-8, "
+ f"{self.input_charset}, and windows-1252: {exc3}"
+ ) from exc3
+
+ def process(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult:
"""
Transforms the input FlowFile by decompressing Cerner blob data from JSON records.
@@ -114,198 +128,117 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr
output_contents: list = []
attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()}
- try:
- self.process_context = context
- self.set_properties(context.getProperties())
+        # read json record(s) from the flowfile
+ input_raw_bytes: bytes | bytearray = flowFile.getContentsAsBytes()
- # read avro record
- input_raw_bytes: bytes | bytearray = flowFile.getContentsAsBytes()
+ records: list | dict = self._load_json_records(input_raw_bytes)
- records: list | dict = []
+ if not isinstance(records, list):
+ records = [records]
- try:
- records = json.loads(input_raw_bytes.decode())
- except json.JSONDecodeError as e:
- self.logger.error(f"Error decoding JSON: {str(e)} \nAttempting to decode as {self.input_charset}")
- try:
- records = json.loads(input_raw_bytes.decode(self.input_charset))
- except json.JSONDecodeError as e:
- self.logger.error(f"Error decoding JSON: {str(e)} \nAttempting to decode as windows-1252")
- try:
- records = json.loads(input_raw_bytes.decode("windows-1252"))
- except json.JSONDecodeError as e:
- return self.build_failure_result(
- flowFile,
- ValueError(f"Error decoding JSON: {str(e)} \n with windows-1252"),
- attributes=attributes,
- contents=input_raw_bytes,
- )
-
- if not isinstance(records, list):
- records = [records]
-
- if not records:
- return self.build_failure_result(
- flowFile,
- ValueError("No records found in JSON input"),
- attributes=attributes,
- contents=input_raw_bytes,
- )
+ if not records:
+ raise ValueError("No records found in JSON input")
- # sanity check: blobs are from the same document_id
- doc_ids: set = {str(r.get(self.document_id_field_name, "")) for r in records}
- if len(doc_ids) > 1:
- return self.build_failure_result(
- flowFile,
- ValueError(f"Multiple document IDs in one FlowFile: {list(doc_ids)}"),
- attributes=attributes,
- contents=input_raw_bytes,
- )
+ # sanity check: blobs are from the same document_id
+ doc_ids: set = {str(r.get(self.document_id_field_name, "")) for r in records}
+ if len(doc_ids) > 1:
+ raise ValueError(f"Multiple document IDs in one FlowFile: {list(doc_ids)}")
+
+ concatenated_blob_sequence_order: dict = {}
+ output_merged_record: dict = {}
+
+ have_any_sequence: bool = any(self.blob_sequence_order_field_name in record for record in records)
+ have_any_no_sequence: bool = any(self.blob_sequence_order_field_name not in record for record in records)
- concatenated_blob_sequence_order: dict = {}
- output_merged_record: dict = {}
-
- have_any_sequence: bool = any(self.blob_sequence_order_field_name in record for record in records)
- have_any_no_sequence: bool = any(self.blob_sequence_order_field_name not in record for record in records)
-
- if have_any_sequence and have_any_no_sequence:
- return self.build_failure_result(
- flowFile,
- ValueError(
- f"Mixed records: some have '{self.blob_sequence_order_field_name}', some don't. "
- "Cannot safely reconstruct blob stream."
- ),
- attributes=attributes,
- contents=input_raw_bytes,
+ if have_any_sequence and have_any_no_sequence:
+ raise ValueError(
+ f"Mixed records: some have '{self.blob_sequence_order_field_name}', some don't. "
+ "Cannot safely reconstruct blob stream."
+ )
+
+ for record in records:
+ if self.binary_field_name not in record or record[self.binary_field_name] in (None, ""):
+ raise ValueError(f"Missing '{self.binary_field_name}' in a record")
+
+ if have_any_sequence:
+ seq = int(record[self.blob_sequence_order_field_name])
+ if seq in concatenated_blob_sequence_order:
+ raise ValueError(f"Duplicate {self.blob_sequence_order_field_name}: {seq}")
+
+ concatenated_blob_sequence_order[seq] = record[self.binary_field_name]
+ else:
+ # no sequence anywhere: preserve record order (0..n-1)
+ seq = len(concatenated_blob_sequence_order)
+ concatenated_blob_sequence_order[seq] = record[self.binary_field_name]
+
+        # copy fields from the first record; apart from the binary data field,
+        # all records are expected to carry the same values
+ for k, v in records[0].items():
+ if k not in output_merged_record and k != self.binary_field_name:
+ output_merged_record[k] = v
+
+ full_compressed_blob = bytearray()
+
+        # verify there is no gap in the blob sequence, i.e. no missing blob part
+ order_of_blobs_keys = sorted(concatenated_blob_sequence_order.keys())
+ for i in range(1, len(order_of_blobs_keys)):
+ if order_of_blobs_keys[i] != order_of_blobs_keys[i-1] + 1:
+ raise ValueError(
+ f"Sequence gap: missing {order_of_blobs_keys[i-1] + 1} "
+ f"(have {order_of_blobs_keys[i-1]} then {order_of_blobs_keys[i]})"
)
- for record in records:
- if self.binary_field_name not in record or record[self.binary_field_name] in (None, ""):
- return self.build_failure_result(
- flowFile,
- ValueError(f"Missing '{self.binary_field_name}' in a record"),
- attributes=attributes,
- contents=input_raw_bytes,
- )
+ for k in order_of_blobs_keys:
+ v = concatenated_blob_sequence_order[k]
- if have_any_sequence:
- seq = int(record[self.blob_sequence_order_field_name])
- if seq in concatenated_blob_sequence_order:
- return self.build_failure_result(
- flowFile,
- ValueError(f"Duplicate {self.blob_sequence_order_field_name}: {seq}"),
- attributes=attributes,
- contents=input_raw_bytes,
- )
-
- concatenated_blob_sequence_order[seq] = record[self.binary_field_name]
- else:
- # no sequence anywhere: preserve record order (0..n-1)
- seq = len(concatenated_blob_sequence_order)
- concatenated_blob_sequence_order[seq] = record[self.binary_field_name]
-
- # take fields from the first record, doesn't matter which one,
- # as they are expected to be the same except for the binary data field
- for k, v in records[0].items():
- if k not in output_merged_record and k != self.binary_field_name:
- output_merged_record[k] = v
-
- full_compressed_blob = bytearray()
-
- # double check to make sure there is no gap in the blob sequence, i.e missing blob.
- order_of_blobs_keys = sorted(concatenated_blob_sequence_order.keys())
- for i in range(1, len(order_of_blobs_keys)):
- if order_of_blobs_keys[i] != order_of_blobs_keys[i-1] + 1:
- return self.build_failure_result(
- flowFile,
- ValueError(
- f"Sequence gap: missing {order_of_blobs_keys[i-1] + 1} "
- f"(have {order_of_blobs_keys[i-1]} then {order_of_blobs_keys[i]})"
- ),
- attributes=attributes,
- contents=input_raw_bytes,
- )
+ temporary_blob: bytes = b""
- for k in order_of_blobs_keys:
- v = concatenated_blob_sequence_order[k]
-
- temporary_blob: bytes = b""
-
- if self.binary_field_source_encoding == "base64":
- if not isinstance(v, str):
- return self.build_failure_result(
- flowFile,
- ValueError(
- f"Expected base64 string in {self.binary_field_name} for part {k}, got {type(v)}"
- ),
- attributes=attributes,
- contents=input_raw_bytes,
- )
- try:
- temporary_blob = base64.b64decode(v, validate=True)
- except Exception as e:
- return self.build_failure_result(
- flowFile,
- ValueError(f"Error decoding base64 blob part {k}: {e}"),
- attributes=attributes,
- contents=input_raw_bytes,
- )
+ if self.binary_field_source_encoding == "base64":
+ if not isinstance(v, str):
+ raise ValueError(
+ f"Expected base64 string in {self.binary_field_name} for part {k}, got {type(v)}"
+ )
+ try:
+ temporary_blob = base64.b64decode(v, validate=True)
+ except Exception as exc:
+ raise ValueError(f"Error decoding base64 blob part {k}: {exc}") from exc
+ else:
+ # raw bytes path
+ if isinstance(v, (bytes, bytearray)):
+ temporary_blob = v
else:
- # raw bytes path
- if isinstance(v, (bytes, bytearray)):
- temporary_blob = v
- else:
- return self.build_failure_result(
- flowFile,
- ValueError(
- f"Expected bytes in {self.binary_field_name} for part {k}, got {type(v)}"
- ),
- attributes=attributes,
- contents=input_raw_bytes,
- )
-
- full_compressed_blob.extend(temporary_blob)
-
- # build / add new attributes to dict before doing anything else to have some trace.
- attributes["document_id_field_name"] = str(self.document_id_field_name)
- attributes["document_id"] = str(output_merged_record.get(self.document_id_field_name, ""))
- attributes["binary_field"] = str(self.binary_field_name)
- attributes["output_text_field_name"] = str(self.output_text_field_name)
- attributes["mime.type"] = "application/json"
- attributes["blob_parts"] = str(len(order_of_blobs_keys))
- attributes["blob_seq_min"] = str(order_of_blobs_keys[0]) if order_of_blobs_keys else ""
- attributes["blob_seq_max"] = str(order_of_blobs_keys[-1]) if order_of_blobs_keys else ""
- attributes["compressed_len"] = str(len(full_compressed_blob))
- attributes["compressed_head_hex"] = bytes(full_compressed_blob[:16]).hex()
+ raise ValueError(
+ f"Expected bytes in {self.binary_field_name} for part {k}, got {type(v)}"
+ )
- try:
- decompress_blob = DecompressLzwCernerBlob()
- decompress_blob.decompress(full_compressed_blob)
- output_merged_record[self.binary_field_name] = bytes(decompress_blob.output_stream)
- except Exception as exception:
- return self.build_failure_result(
- flowFile,
- exception=exception,
- attributes=attributes,
- include_flowfile_attributes=False,
- contents=input_raw_bytes
- )
+ full_compressed_blob.extend(temporary_blob)
- if self.output_mode == "base64":
- output_merged_record[self.binary_field_name] = \
- base64.b64encode(output_merged_record[self.binary_field_name]).decode(self.output_charset)
+        # populate attributes before decompressing so a failure still leaves a trace
+ attributes["document_id_field_name"] = str(self.document_id_field_name)
+ attributes["document_id"] = str(output_merged_record.get(self.document_id_field_name, ""))
+ attributes["binary_field"] = str(self.binary_field_name)
+ attributes["output_text_field_name"] = str(self.output_text_field_name)
+ attributes["mime.type"] = "application/json"
+ attributes["blob_parts"] = str(len(order_of_blobs_keys))
+ attributes["blob_seq_min"] = str(order_of_blobs_keys[0]) if order_of_blobs_keys else ""
+ attributes["blob_seq_max"] = str(order_of_blobs_keys[-1]) if order_of_blobs_keys else ""
+ attributes["compressed_len"] = str(len(full_compressed_blob))
+ attributes["compressed_head_hex"] = bytes(full_compressed_blob[:16]).hex()
- output_contents.append(output_merged_record)
+ try:
+ decompress_blob = DecompressLzwCernerBlob()
+ decompress_blob.decompress(full_compressed_blob)
+ output_merged_record[self.binary_field_name] = bytes(decompress_blob.output_stream)
- return FlowFileTransformResult(relationship=self.REL_SUCCESS,
- attributes=attributes,
- contents=json.dumps(output_contents).encode("utf-8"))
except Exception as exception:
- self.logger.error("Exception during flowfile processing: " + traceback.format_exc())
- return self.build_failure_result(
- flowFile,
- exception,
- attributes=attributes,
- contents=locals().get("input_raw_bytes", flowFile.getContentsAsBytes()),
- include_flowfile_attributes=False
- )
+ raise RuntimeError("Error decompressing Cerner LZW blob") from exception
+
+ if self.output_mode == "base64":
+ output_merged_record[self.binary_field_name] = \
+ base64.b64encode(output_merged_record[self.binary_field_name]).decode(self.output_charset)
+
+ output_contents.append(output_merged_record)
+
+ return FlowFileTransformResult(relationship=self.REL_SUCCESS,
+ attributes=attributes,
+ contents=json.dumps(output_contents).encode("utf-8"))
diff --git a/nifi/user_python_extensions/sample_processor.py b/nifi/user_python_extensions/sample_processor.py
index a92a05430..73320e635 100644
--- a/nifi/user_python_extensions/sample_processor.py
+++ b/nifi/user_python_extensions/sample_processor.py
@@ -1,6 +1,5 @@
import io
import json
-import traceback
from typing import Any
from avro.datafile import DataFileReader, DataFileWriter
@@ -92,7 +91,7 @@ def onScheduled(self, context: ProcessContext) -> None:
"""
pass
- def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult:
+ def process(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult:
"""
NOTE: This is a sample method meant to be overridden and reimplemented by subclasses.
@@ -115,48 +114,42 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr
output_contents: list[Any] = []
- try:
- self.process_context: ProcessContext = context
- self.set_properties(context.getProperties())
- # add properties to flowfile attributes
- attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()}
- self.logger.info("Successfully transformed Avro content for OCR")
-
- input_raw_bytes: bytes = flowFile.getContentsAsBytes()
-
- # read avro record
- self.logger.debug("Reading flowfile content as bytes")
- input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes)
- reader: DataFileReader = DataFileReader(input_byte_buffer, DatumReader())
-
- # below is an example of how to handle avro records, each record
- schema: Schema | None = reader.datum_reader.writers_schema
-
- for record in reader:
- #do stuff
- pass
-
- # streams need to be closed
- input_byte_buffer.close()
- reader.close()
-
- # Write them to a binary avro stre
- output_byte_buffer = io.BytesIO()
- writer = DataFileWriter(output_byte_buffer, DatumWriter(), schema)
-
- writer.flush()
- writer.close()
- output_byte_buffer.seek(0)
-
- # add properties to flowfile attributes
- attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()}
- attributes["sample_property_one"] = str(self.sample_property_one)
- attributes["sample_property_two"] = str(self.sample_property_two)
- attributes["sample_property_three"] = str(self.sample_property_three)
-
- return FlowFileTransformResult(relationship="success",
- attributes=attributes,
- contents=json.dumps(output_contents))
- except Exception as exception:
- self.logger.error("Exception during Avro processing: " + traceback.format_exc())
- raise exception
\ No newline at end of file
+ # add properties to flowfile attributes
+ attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()}
+        self.logger.info("Running sample Avro transform")
+
+ input_raw_bytes: bytes = flowFile.getContentsAsBytes()
+
+ # read avro record
+ self.logger.debug("Reading flowfile content as bytes")
+ input_byte_buffer: io.BytesIO = io.BytesIO(input_raw_bytes)
+ reader: DataFileReader = DataFileReader(input_byte_buffer, DatumReader())
+
+        # below is an example of how to iterate over the avro records
+ schema: Schema | None = reader.datum_reader.writers_schema
+
+ for record in reader:
+            # do stuff
+ pass
+
+ # streams need to be closed
+ input_byte_buffer.close()
+ reader.close()
+
+        # Write the records to a binary avro stream
+ output_byte_buffer = io.BytesIO()
+ writer = DataFileWriter(output_byte_buffer, DatumWriter(), schema)
+
+ writer.flush()
+ writer.close()
+ output_byte_buffer.seek(0)
+
+ # add properties to flowfile attributes
+ attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()}
+ attributes["sample_property_one"] = str(self.sample_property_one)
+ attributes["sample_property_two"] = str(self.sample_property_two)
+ attributes["sample_property_three"] = str(self.sample_property_three)
+
+ return FlowFileTransformResult(relationship="success",
+ attributes=attributes,
+ contents=json.dumps(output_contents))
diff --git a/nifi/user_scripts/processors/record_decompress_cerner_blob.py b/nifi/user_scripts/processors/record_decompress_cerner_blob.py
new file mode 100644
index 000000000..1b93487d7
--- /dev/null
+++ b/nifi/user_scripts/processors/record_decompress_cerner_blob.py
@@ -0,0 +1,160 @@
+import base64
+import json
+import os
+import sys
+
+try:
+ from nifi.user_scripts.utils.codecs.cerner_blob import DecompressLzwCernerBlob
+except ModuleNotFoundError:
+ # Fallback for direct script execution when PYTHONPATH does not include repository root.
+ script_dir = os.path.dirname(os.path.abspath(__file__))
+ repo_root = os.path.abspath(os.path.join(script_dir, "..", "..", ".."))
+ if repo_root not in sys.path:
+ sys.path.insert(0, repo_root)
+ from nifi.user_scripts.utils.codecs.cerner_blob import DecompressLzwCernerBlob
+
+BINARY_FIELD_NAME = "binarydoc"
+OUTPUT_TEXT_FIELD_NAME = "text"
+DOCUMENT_ID_FIELD_NAME = "id"
+INPUT_CHARSET = "utf-8"
+OUTPUT_CHARSET = "utf-8"
+OUTPUT_MODE = "base64"
+BINARY_FIELD_SOURCE_ENCODING = "base64"
+BLOB_SEQUENCE_ORDER_FIELD_NAME = "blob_sequence_num"
+OPERATION_MODE = "base64"
+
+for arg in sys.argv:
+ _arg = arg.split("=", 1)
+ if _arg[0] == "binary_field_name":
+ BINARY_FIELD_NAME = _arg[1]
+ elif _arg[0] == "output_text_field_name":
+ OUTPUT_TEXT_FIELD_NAME = _arg[1]
+ elif _arg[0] == "document_id_field_name":
+ DOCUMENT_ID_FIELD_NAME = _arg[1]
+ elif _arg[0] == "input_charset":
+ INPUT_CHARSET = _arg[1]
+ elif _arg[0] == "output_charset":
+ OUTPUT_CHARSET = _arg[1]
+ elif _arg[0] == "output_mode":
+ OUTPUT_MODE = _arg[1]
+ elif _arg[0] == "binary_field_source_encoding":
+ BINARY_FIELD_SOURCE_ENCODING = _arg[1]
+ elif _arg[0] == "blob_sequence_order_field_name":
+ BLOB_SEQUENCE_ORDER_FIELD_NAME = _arg[1]
+ elif _arg[0] == "operation_mode":
+ OPERATION_MODE = _arg[1]
+
+
+def load_json_records(input_raw_bytes):
+ try:
+ return json.loads(input_raw_bytes.decode())
+ except (UnicodeDecodeError, json.JSONDecodeError):
+ try:
+ return json.loads(input_raw_bytes.decode(INPUT_CHARSET))
+ except (UnicodeDecodeError, json.JSONDecodeError):
+ try:
+ return json.loads(input_raw_bytes.decode("windows-1252"))
+ except (UnicodeDecodeError, json.JSONDecodeError) as exc:
+ raise ValueError(
+ "Error decoding JSON after trying utf-8, "
+ + INPUT_CHARSET
+ + ", and windows-1252"
+ ) from exc
+
+
+def decode_blob_part(value, blob_part):
+ if BINARY_FIELD_SOURCE_ENCODING == "base64":
+ if not isinstance(value, str):
+ raise ValueError(
+ f"Expected base64 string in {BINARY_FIELD_NAME} for part {blob_part}, got {type(value)}"
+ )
+
+ try:
+ return base64.b64decode(value, validate=True)
+ except Exception as exc:
+ raise ValueError(f"Error decoding base64 blob part {blob_part}: {exc}") from exc
+
+ if isinstance(value, str):
+ return value.encode(INPUT_CHARSET)
+
+ if isinstance(value, list) and all(isinstance(v, int) and 0 <= v <= 255 for v in value):
+ return bytes(value)
+
+ if isinstance(value, (bytes, bytearray)):
+ return bytes(value)
+
+ raise ValueError(
+ f"Expected bytes-like data in {BINARY_FIELD_NAME} for part {blob_part}, got {type(value)}"
+ )
+
+
+records = load_json_records(sys.stdin.buffer.read())
+
+if isinstance(records, dict):
+ records = [records]
+
+if not records:
+ raise ValueError("No records found in JSON input")
+
+# keep the same sanity check as the extension: one flowfile should carry one document
+doc_ids = {str(record.get(DOCUMENT_ID_FIELD_NAME, "")) for record in records}
+if len(doc_ids) > 1:
+ raise ValueError(f"Multiple document IDs in one FlowFile: {list(doc_ids)}")
+
+concatenated_blob_sequence_order = {}
+output_merged_record = {}
+
+have_any_sequence = any(BLOB_SEQUENCE_ORDER_FIELD_NAME in record for record in records)
+have_any_no_sequence = any(BLOB_SEQUENCE_ORDER_FIELD_NAME not in record for record in records)
+
+if have_any_sequence and have_any_no_sequence:
+ raise ValueError(
+ f"Mixed records: some have '{BLOB_SEQUENCE_ORDER_FIELD_NAME}', some don't. "
+ "Cannot safely reconstruct blob stream."
+ )
+
+for record in records:
+ if BINARY_FIELD_NAME not in record or record[BINARY_FIELD_NAME] in (None, ""):
+ raise ValueError(f"Missing '{BINARY_FIELD_NAME}' in a record")
+
+ if have_any_sequence:
+ sequence_number = int(record[BLOB_SEQUENCE_ORDER_FIELD_NAME])
+ if sequence_number in concatenated_blob_sequence_order:
+ raise ValueError(f"Duplicate {BLOB_SEQUENCE_ORDER_FIELD_NAME}: {sequence_number}")
+ concatenated_blob_sequence_order[sequence_number] = record[BINARY_FIELD_NAME]
+ else:
+ sequence_number = len(concatenated_blob_sequence_order)
+ concatenated_blob_sequence_order[sequence_number] = record[BINARY_FIELD_NAME]
+
+# copy all non-binary fields from the first input record
+for k, v in records[0].items():
+ if k != BINARY_FIELD_NAME and k not in output_merged_record:
+ output_merged_record[k] = v
+
+full_compressed_blob = bytearray()
+blob_sequence_keys = sorted(concatenated_blob_sequence_order.keys())
+
+for i in range(1, len(blob_sequence_keys)):
+ if blob_sequence_keys[i] != blob_sequence_keys[i - 1] + 1:
+ raise ValueError(
+ f"Sequence gap: missing {blob_sequence_keys[i - 1] + 1} "
+ f"(have {blob_sequence_keys[i - 1]} then {blob_sequence_keys[i]})"
+ )
+
+for blob_part in blob_sequence_keys:
+ full_compressed_blob.extend(
+ decode_blob_part(concatenated_blob_sequence_order[blob_part], blob_part)
+ )
+
+decompress_blob = DecompressLzwCernerBlob()
+decompress_blob.decompress(full_compressed_blob)
+decompressed_blob = bytes(decompress_blob.output_stream)
+
+if OUTPUT_MODE == "base64":
+ output_merged_record[BINARY_FIELD_NAME] = base64.b64encode(decompressed_blob).decode(OUTPUT_CHARSET)
+elif OUTPUT_MODE == "raw":
+ output_merged_record[BINARY_FIELD_NAME] = decompressed_blob.decode(OUTPUT_CHARSET, errors="replace")
+else:
+ raise ValueError(f"Unsupported output_mode: {OUTPUT_MODE}")
+
+sys.stdout.buffer.write(json.dumps([output_merged_record], ensure_ascii=False).encode("utf-8"))
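
A hedged usage sketch for the standalone script: it reads a JSON array on stdin, takes `key=value` arguments, and writes the merged record list to stdout. The blob parts below are placeholders, so a real run needs genuine Cerner LZW-compressed chunks; the field values shown are illustrative.

```python
import base64
import json
import subprocess

# two-part payload for one document; the base64 chunks are placeholders only
payload = [
    {"id": "doc-1", "blob_sequence_num": 1,
     "binarydoc": base64.b64encode(b"<compressed part 1>").decode()},
    {"id": "doc-1", "blob_sequence_num": 2,
     "binarydoc": base64.b64encode(b"<compressed part 2>").decode()},
]

completed = subprocess.run(
    ["python", "nifi/user_scripts/processors/record_decompress_cerner_blob.py",
     "binary_field_name=binarydoc", "output_mode=base64"],
    input=json.dumps(payload).encode("utf-8"),
    capture_output=True,
    check=True,
)
merged = json.loads(completed.stdout)  # a single-element list with the merged record
```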
diff --git a/nifi/user_scripts/utils/nifi/base_nifi_processor.py b/nifi/user_scripts/utils/nifi/base_nifi_processor.py
index f29478cf1..d5915bff9 100644
--- a/nifi/user_scripts/utils/nifi/base_nifi_processor.py
+++ b/nifi/user_scripts/utils/nifi/base_nifi_processor.py
@@ -219,11 +219,11 @@ def onScheduled(self, context: ProcessContext) -> None:
"""
pass
- def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult:
+ def process(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult:
"""
Process a FlowFile and return a FlowFileTransformResult.
- Subclasses must override this method to implement processor logic.
+ Subclasses should implement this method with processor logic.
Args:
context: The NiFi ProcessContext for this invocation.
@@ -233,3 +233,30 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr
NotImplementedError: Always, until overridden by a subclass.
"""
raise NotImplementedError
+
+ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult:
+ """
+ NiFi entrypoint. Calls process() and builds a failure result on exceptions.
+
+ Args:
+ context: The NiFi ProcessContext for this invocation.
+ flowFile: The FlowFile being processed.
+ """
+ self.process_context = context
+
+ try:
+ self.set_properties(context.getProperties())
+ result = self.process(context, flowFile)
+ if not isinstance(result, FlowFileTransformResult):
+ raise TypeError(
+ f"{self.__class__.__name__}.process() must return FlowFileTransformResult, "
+ f"got {type(result).__name__}"
+ )
+ return result
+ except Exception as exception:
+ self.logger.error("Exception during flowfile processing", exc_info=True)
+ return self.build_failure_result(
+ flowFile,
+ exception,
+ include_flowfile_attributes=True,
+ )
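
With the wrapper above, a new processor only implements `process()`; the inherited `transform()` loads the properties and routes exceptions through `build_failure_result()`. A minimal sketch follows (not a complete, registerable processor definition; the class name and field handling are illustrative).

```python
import json

from nifiapi.flowfiletransform import FlowFileTransformResult
from nifiapi.properties import ProcessContext
from py4j.java_gateway import JavaObject

from nifi.user_scripts.utils.nifi.base_nifi_processor import BaseNiFiProcessor


class UppercaseTextProcessor(BaseNiFiProcessor):
    """Illustrative subclass: uppercases a JSON "text" field."""

    def process(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTransformResult:
        # only process() is implemented; the inherited transform() loads properties,
        # validates the return type, and converts exceptions into a failure result
        record: dict = json.loads(flowFile.getContentsAsBytes().decode("utf-8"))
        record["text"] = str(record.get("text", "")).upper()

        attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()}
        attributes["mime.type"] = "application/json"

        return FlowFileTransformResult(
            relationship="success",
            attributes=attributes,
            contents=json.dumps(record).encode("utf-8"),
        )
```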
diff --git a/scripts/smoke_nifi_services.sh b/scripts/tests/smoke_nifi_services.sh
similarity index 79%
rename from scripts/smoke_nifi_services.sh
rename to scripts/tests/smoke_nifi_services.sh
index 0d2c79853..5835dbe65 100755
--- a/scripts/smoke_nifi_services.sh
+++ b/scripts/tests/smoke_nifi_services.sh
@@ -1,9 +1,9 @@
#!/usr/bin/env bash
-# Smoke checks for NiFi, NiFi Registry, and the nginx reverse proxy.
+# Smoke checks for NiFi and the nginx reverse proxy.
set -euo pipefail
-ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
+ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)"
ENV_LOADER="${ROOT_DIR}/deploy/export_env_vars.sh"
if [[ -f "$ENV_LOADER" ]]; then
@@ -18,7 +18,6 @@ fi
HOST="${NIFI_SMOKE_HOST:-localhost}"
NIFI_PORT="${NIFI_EXTERNAL_PORT_NGINX:-8443}"
-REGISTRY_PORT="${NIFI_REGISTRY_EXTERNAL_PORT_NGINX:-18443}"
ALLOWED_CODES=(200 301 302 303 307 308 401 403)
@@ -45,5 +44,4 @@ check_url() {
}
check_url "nifi" "https://${HOST}:${NIFI_PORT}/nifi/"
-check_url "nifi-registry" "https://${HOST}:${REGISTRY_PORT}/nifi-registry/"
check_url "nifi-nginx" "https://${HOST}:${NIFI_PORT}/"
diff --git a/services/nginx/config/nginx.conf.template b/services/nginx/config/nginx.conf.template
index 8b13fe656..a046069a9 100644
--- a/services/nginx/config/nginx.conf.template
+++ b/services/nginx/config/nginx.conf.template
@@ -39,10 +39,6 @@ http {
upstream nifi {
server nifi:8443;
}
-
- upstream nifi-registry {
- server nifi-registry:18443;
- }
include /etc/nginx/sites-enabled/*.conf;
@@ -81,60 +77,6 @@ http {
# return 301 https://$host$request_uri;
#}
- server {
- listen 18443 ssl;
- server_name nginx.local;
-
- ssl_certificate /certificates/nifi/nifi.pem;
- ssl_certificate_key /certificates/nifi/nifi.key;
- ssl_client_certificate /certificates/root/root-ca.pem;
- ssl_trusted_certificate /certificates/root/root-ca.pem;
-
- location / {
- proxy_ssl_certificate /certificates/nifi/nifi.pem;
- proxy_ssl_certificate_key /certificates/nifi/nifi.key;
- proxy_ssl_trusted_certificate /certificates/root/root-ca.pem;
- proxy_set_header Host nifi-registry;
- proxy_set_header X-Real-IP $remote_addr;
- proxy_set_header X-ProxyHost $host;
- proxy_set_header X-ProxyPort 18443;
- proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
- proxy_set_header X-ProxyScheme $scheme;
-
- proxy_pass https://nifi-registry;
- }
-
- location ^~ /nifi-registry/ {
- proxy_ssl_certificate /certificates/nifi/nifi.pem;
- proxy_ssl_certificate_key /certificates/nifi/nifi.key;
- proxy_ssl_trusted_certificate /certificates/root/root-ca.pem;
- proxy_set_header Host nifi-registry;
- proxy_set_header X-Real-IP $remote_addr;
- proxy_set_header X-ProxyHost $host;
- proxy_set_header X-ProxyPort 18443;
- proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
- proxy_set_header X-ProxyScheme $scheme;
-
- proxy_pass https://nifi-registry/nifi-registry/;
- }
-
- location ^~ /nifi-registry-api/ {
- proxy_ssl_certificate /certificates/nifi/nifi.pem;
- proxy_ssl_certificate_key /certificates/nifi/nifi.key;
- proxy_ssl_trusted_certificate /certificates/root/root-ca.pem;
-
- proxy_set_header Host nifi-registry;
- proxy_set_header X-Real-IP $remote_addr;
- proxy_set_header X-ProxyHost $host;
- proxy_set_header X-ProxyPort 18443;
- proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
- proxy_set_header X-ProxyScheme $scheme;
-
- proxy_pass https://nifi-registry/nifi-registry-api/;
- }
-
- }
-
server {
listen 8443 ssl;
server_name nginx.local;