Introduction

Hello, my name is Temirlan, and I'm a Staff Software Engineer in the GovTech domain. My daily work involves streamlining complex workflows and improving system efficiency, and graphical user interfaces (GUIs) are sometimes more of a hindrance than a help, especially in environments where precision, automation, and scalability are key.

In my experience, while GUIs can be visually appealing and user-friendly for general tasks, they often lack the flexibility needed for more sophisticated operations. This leads to inefficiency and frustration with tasks like setting up data streams, which are central to our field. That's why I prefer a code-first approach, using APIs that align more naturally with my work as a software engineer. APIs offer the precision and programmability that GUIs often fail to deliver, allowing for more controlled and customizable management of technology.

In this article, I'll share how I use NiPyAPI to manage Apache NiFi data flows programmatically, bypassing the GUI entirely. This approach not only saves time but also makes data flow setups more reliable and reproducible.

Use Case Overview

In this use case, we will explore how NiFi processors can automate the retrieval of weather data from an online source. The goal is to demonstrate the power and flexibility of NiPyAPI for handling data streams without a graphical user interface. The scenario involves the following steps:

1. Data Retrieval: A NiFi processor connects to a specified weather API and fetches current weather data, which could include temperature, humidity, wind speed, and other meteorological metrics.
2. Data Processing: Once the data is retrieved, it is processed and formatted appropriately. This might involve parsing JSON into a more usable shape, filtering out unnecessary details, or transforming the data into a structure that downstream applications can use.
3. Data Transmission to Kafka: After processing, the transformed data is sent to a Kafka topic. This integrates the weather data into a larger data ecosystem, making it available for real-time analytics or for consumption by other applications within the organization.
4. Logging Actions: To ensure traceability and aid debugging, the flow also logs each action it performs: the time of data retrieval, any errors encountered, and successful data integration events.

Before we wire this up in NiFi, it's worth a quick look at what the weather endpoint actually returns; a short sketch follows.
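This snippet is not part of the NiFi flow itself; it's a minimal sketch, assuming the `requests` library is installed, that previews the JSON the Open-Meteo endpoint returns for the same query the flow will make:

```python
# Minimal sketch: preview the Open-Meteo payload outside of NiFi.
# Assumes the 'requests' library; not part of the flow itself.
import datetime
import requests

today = datetime.date.today().isoformat()  # Open-Meteo expects yyyy-MM-dd
resp = requests.get(
    "https://api.open-meteo.com/v1/bom",
    params={
        "latitude": 51.1801,
        "longitude": 71.446,
        "daily": "temperature_2m_max",
        "timezone": "auto",
        "start_date": today,
        "end_date": today,
    },
    timeout=10,
)
resp.raise_for_status()
print(resp.json())  # e.g. {'daily': {'time': [...], 'temperature_2m_max': [...]}, ...}
```

Knowing the payload shape up front makes it easier to decide what the transformation step should do.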
Let’s Go and See How it Looks in Our Code!

```python
"""
This script creates a data flow that fetches data from a weather site, logs it,
adds a processor for a simple transformation, and then publishes the data to a
Kafka topic.

Modules:
--------
nipyapi: Python client for the NiFi API

Functions:
----------
nipyapi.config.nifi_config.host: Set the host for the NiFi API
nipyapi.canvas.get_root_pg_id: Get the root process group id
nipyapi.canvas.get_process_group: Get a process group
nipyapi.canvas.create_process_group: Create a new process group
nipyapi.canvas.get_processor: Get a processor
nipyapi.canvas.create_processor: Create a new processor
nipyapi.canvas.list_all_connections: List all connections
nipyapi.canvas.create_connection: Create a new connection

Variables:
----------
root_id: The root process group id
root_process_group: The root process group
pg_name: The name of the process group
existing_pg: The existing process group
my_pg: The process group
processor_name_gff: The name of the GenerateFlowFile processor
existing_processors_gff: The existing GenerateFlowFile processor
generate_flowfile_processor: The GenerateFlowFile processor
http_processor_name: The name of the GetHTTP processor
existing_processors_http: The existing GetHTTP processor
get_http_processor: The GetHTTP processor
log_processor_name: The name of the Logger processor
existing_processors_log: The existing Logger processor
get_logger_processor: The Logger processor
existing_connections: The existing connections
connection_ff_to_http_name: The name of the connection from FlowFile to HTTP
is_connection_ff_to_http: The connection from FlowFile to HTTP
connection_http_to_logger: The name of the connection from HTTP to logger
is_connection_http_to_logger: The connection from HTTP to logger
replace_text_processor_name: The name of the ReplaceText processor
existing_processors_replace_text: The existing ReplaceText processor
replace_text_processor: The ReplaceText processor
publish_kafka_processor_name: The name of the PublishKafka processor
existing_processors_publish_kafka: The existing PublishKafka processor
publish_kafka_processor: The PublishKafka processor
connection_http_to_replace_text: The name of the connection from GetHTTP to ReplaceText
is_connection_http_to_replace_text: The connection from GetHTTP to ReplaceText
connection_replace_text_to_kafka: The name of the connection from ReplaceText to Kafka
is_connection_replace_text_to_kafka: The connection from ReplaceText to Kafka
"""
import nipyapi

# Point the client at the NiFi REST API.
nipyapi.config.nifi_config.host = 'http://localhost:8888/nifi-api'

root_id = nipyapi.canvas.get_root_pg_id()
root_process_group = nipyapi.canvas.get_process_group(root_id, 'id')

# Create the process group only if it does not already exist,
# so the script can be re-run safely.
pg_name = 'weather-python'
existing_pg = nipyapi.canvas.get_process_group(pg_name, identifier_type='name')
if existing_pg is None:
    my_pg = nipyapi.canvas.create_process_group(
        parent_pg=root_process_group,
        new_pg_name=pg_name,
        location=(400.0, 100.0)
    )
    print(f"Created new process group: {pg_name}")
else:
    my_pg = existing_pg
    print(f"Using existing process group: {pg_name}")

# GenerateFlowFile acts as the trigger: every 10 seconds it emits a flow file
# carrying a 'time' attribute used to parameterise the HTTP request below.
processor_name_gff = 'GenerateFlowFile'
existing_processors_gff = nipyapi.canvas.get_processor(processor_name_gff)
if existing_processors_gff is None:
    generate_flowfile_processor = nipyapi.canvas.create_processor(
        parent_pg=my_pg,
        processor=nipyapi.canvas.get_processor_type('GenerateFlowFile'),
        name=processor_name_gff,
        location=(500.0, 100.0),
        config=nipyapi.nifi.ProcessorConfigDTO(
            properties={
                'Custom Text': 'Example text',
                'File Size': '1 KB',
                # Open-Meteo expects dates as yyyy-MM-dd; ${now()} is the
                # Expression Language function for the current timestamp.
                'time': '${now():format("yyyy-MM-dd")}'
            },
            scheduling_period='10 sec',
            scheduling_strategy='TIMER_DRIVEN'
        )
    )
    print(f"Created new processor: {processor_name_gff}")
else:
    generate_flowfile_processor = existing_processors_gff
    print(f"Using existing processor: {processor_name_gff}")

# The processor is named 'GetHTTP' for display purposes, but it is backed by
# InvokeHTTP, which performs the GET against the Open-Meteo API.
http_processor_name = 'GetHTTP'
existing_processors_http = nipyapi.canvas.get_processor(http_processor_name)
if existing_processors_http is None:
    get_http_processor = nipyapi.canvas.create_processor(
        parent_pg=my_pg,
        processor=nipyapi.canvas.get_processor_type('InvokeHTTP'),
        name=http_processor_name,
        location=(500.0, 400.0),
        config=nipyapi.nifi.ProcessorConfigDTO(
            properties={
                'HTTP Method': 'GET',
                'Remote URL': (
                    'https://api.open-meteo.com/v1/bom'
                    '?latitude=51.1801&longitude=71.446'
                    '&daily=temperature_2m_max&timezone=auto'
                    '&start_date=${time}&end_date=${time}'
                )
            },
            # Only the 'Response' relationship is routed onwards; the rest
            # are auto-terminated to keep the demo flow simple.
            auto_terminated_relationships=['Retry', 'No Retry', 'Original', 'Failure']
        )
    )
    print(f"Created processor: {http_processor_name}")
else:
    get_http_processor = existing_processors_http
    print(f"Using existing processor: {http_processor_name}")

log_processor_name = 'Logger'
existing_processors_log = nipyapi.canvas.get_processor(log_processor_name)
if existing_processors_log is None:
    get_logger_processor = nipyapi.canvas.create_processor(
        parent_pg=my_pg,
        processor=nipyapi.canvas.get_processor_type('LogMessage'),
        name=log_processor_name,
        location=(1200.0, 400.0)
    )
    print(f"Created processor: {log_processor_name}")
else:
    get_logger_processor = existing_processors_log
    print(f"Using existing processor: {log_processor_name}")

# Connections are also created idempotently, keyed by their names.
existing_connections = nipyapi.canvas.list_all_connections(my_pg.id)

connection_ff_to_http_name = 'FlowFile to HTTP'
is_connection_ff_to_http = next(
    (conn for conn in existing_connections
     if conn.component.name == connection_ff_to_http_name),
    None
)
if is_connection_ff_to_http is None:
    nipyapi.canvas.create_connection(
        source=generate_flowfile_processor,
        target=get_http_processor,
        relationships=['success'],
        name=connection_ff_to_http_name
    )
    print(f"Connected {processor_name_gff} to {http_processor_name}")
else:
    print(f"Connection '{connection_ff_to_http_name}' already exists")

connection_http_to_logger = 'HTTP to logger'
is_connection_http_to_logger = next(
    (conn for conn in existing_connections
     if conn.component.name == connection_http_to_logger),
    None
)
if is_connection_http_to_logger is None:
    nipyapi.canvas.create_connection(
        source=get_http_processor,
        target=get_logger_processor,
        relationships=['Response'],
        name=connection_http_to_logger
    )
    print(f"Connected {http_processor_name} to {log_processor_name}")
else:
    print(f"Connection '{connection_http_to_logger}' already exists")

replace_text_processor_name = 'ReplaceText'
existing_processors_replace_text = nipyapi.canvas.get_processor(replace_text_processor_name)
if existing_processors_replace_text is None:
    replace_text_processor = nipyapi.canvas.create_processor(
        parent_pg=my_pg,
        processor=nipyapi.canvas.get_processor_type('ReplaceText'),
        name=replace_text_processor_name,
        location=(800.0, 400.0),
        config=nipyapi.nifi.ProcessorConfigDTO(
            properties={
                'Replacement Strategy': 'Regex Replace',
                # Placeholders: substitute a real pattern and replacement
                # for whatever reshaping your downstream consumers need.
                'Search Value': 'regex_to_search',
                'Replacement Value': 'replacement_value'
            }
        )
    )
    print(f"Created processor: {replace_text_processor_name}")
else:
    replace_text_processor = existing_processors_replace_text
    print(f"Using existing processor: {replace_text_processor_name}")

publish_kafka_processor_name = 'PublishKafka'
existing_processors_publish_kafka = nipyapi.canvas.get_processor(publish_kafka_processor_name)
if existing_processors_publish_kafka is None:
    publish_kafka_processor = nipyapi.canvas.create_processor(
        parent_pg=my_pg,
        processor=nipyapi.canvas.get_processor_type('PublishKafka'),
        name=publish_kafka_processor_name,
        location=(1100.0, 400.0),
        config=nipyapi.nifi.ProcessorConfigDTO(
            properties={
                'Kafka Brokers': 'kafka:9092',
                'Topic Name': 'my_awesome_topic',
            }
        )
    )
    print(f"Created processor: {publish_kafka_processor_name}")
else:
    publish_kafka_processor = existing_processors_publish_kafka
    print(f"Using existing processor: {publish_kafka_processor_name}")

connection_http_to_replace_text = 'HTTP to ReplaceText'
is_connection_http_to_replace_text = next(
    (conn for conn in existing_connections
     if conn.component.name == connection_http_to_replace_text),
    None
)
if is_connection_http_to_replace_text is None:
    nipyapi.canvas.create_connection(
        source=get_http_processor,
        target=replace_text_processor,
        relationships=['Response'],
        name=connection_http_to_replace_text
    )
    print(f"Connected {http_processor_name} to {replace_text_processor_name}")
else:
    print(f"Connection '{connection_http_to_replace_text}' already exists")

connection_replace_text_to_kafka = 'ReplaceText to Kafka'
is_connection_replace_text_to_kafka = next(
    (conn for conn in existing_connections
     if conn.component.name == connection_replace_text_to_kafka),
    None
)
if is_connection_replace_text_to_kafka is None:
    nipyapi.canvas.create_connection(
        source=replace_text_processor,
        target=publish_kafka_processor,
        relationships=['success'],
        name=connection_replace_text_to_kafka
    )
    print(f"Connected {replace_text_processor_name} to {publish_kafka_processor_name}")
else:
    print(f"Connection '{connection_replace_text_to_kafka}' already exists")
```
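The script above only builds the flow; NiFi creates the processors in a stopped state. nipyapi can start (and stop) the whole process group programmatically as well. A minimal sketch, reusing the my_pg handle from the script:

```python
# Start every component in the process group; pass scheduled=False to stop.
nipyapi.canvas.schedule_process_group(my_pg.id, scheduled=True)
```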
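To confirm records are landing in Kafka end to end, you can tail the topic from outside NiFi. This is a minimal sketch assuming the kafka-python package and a broker reachable at localhost:9092; the kafka:9092 address used in the flow typically resolves only inside the container network, so adjust it to your setup:

```python
# Minimal sketch: tail the topic to verify the flow publishes records.
# Assumes the 'kafka-python' package; adjust bootstrap_servers as needed.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my_awesome_topic',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
)
for message in consumer:
    print(message.value.decode('utf-8'))
```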
Summary

In this article, we explored how to build a lightweight ETL (Extract, Transform, Load) process with NiPyAPI, a Python client for Apache NiFi, taking a no-UI, fully programmatic approach. We walked through a practical use case in which NiFi processors were configured to fetch, process, and transmit weather data to a Kafka topic for further use within an enterprise ecosystem. The process involved:

- Fetching real-time weather data from an online source without manual intervention.
- Transforming the raw data into a structured format suitable for analysis and aligned with downstream system requirements.
- Transmitting the processed data to Kafka, demonstrating how NiFi integrates into broader data architectures.
- Logging all operations to maintain a clear audit trail and support debugging and operational transparency.

By employing a code-first approach with NiPyAPI, software engineers can increase automation, reduce reliance on graphical user interfaces, and improve the scalability and reliability of their data flows. This is especially valuable in fields like GovTech, where accuracy, efficiency, and reliability are paramount. The techniques discussed here provide a foundation for engineers looking to implement similar ETL workflows in their own projects.
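One last convenience of the code-first approach: cleaning up is a single call as well. A minimal teardown sketch, assuming you want to remove the demo flow entirely; nipyapi's delete_process_group with force=True stops the components and purges queues before deletion:

```python
# Tear down the demo flow: stop everything in the group and delete it.
pg = nipyapi.canvas.get_process_group('weather-python', identifier_type='name')
if pg is not None:
    nipyapi.canvas.delete_process_group(pg, force=True)
```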