Apache NiFi is a powerful tool for automating data flows with a large number of built-in processors. However, sometimes there is a need for specific processors to handle unique requirements and data stores. In such cases, creating custom processors becomes a necessity. In this article, we will go through the steps to create a very simple custom processor for Apache NiFi.

Setup and sources

gradle setup

dependencies {
    compile "org.apache.nifi:nifi-api:1.9.2"
    compile "org.apache.nifi:nifi-utils:1.9.2"
    testCompile "org.apache.nifi:nifi-mock:1.9.2"
}

source of the custom processor

import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.*;

/**
 * Example processor that builds the greeting {@code "Hello " + name}, stores it in the
 * {@code result} attribute and writes it as the FlowFile content.
 */
@Tags({"example"})
@CapabilityDescription("Hello World to output")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute = "", description = "")})
@WritesAttributes({@WritesAttribute(attribute = "result", description = "The greeting that is also written as the FlowFile content")})
public class MyProcessor extends AbstractProcessor {

    /** Required "name" property; evaluated against the FlowFile's attributes. */
    public static final PropertyDescriptor propertyDescriptor = new PropertyDescriptor.Builder()
            .name("name")
            .displayName("name")
            .description("Name to print")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .build();

    /** Destination for FlowFiles whose greeting was written. */
    public static final Relationship successRelations = new Relationship.Builder()
            .name("success")
            .description("If everything is ok")
            .build();

    /** Destination for FlowFiles whose processing raised a {@link ProcessException}. */
    public static final Relationship failureRelations = new Relationship.Builder()
            .name("failure")
            .description("If something is wrong")
            .build();

    private List<PropertyDescriptor> descriptorList;
    private Set<Relationship> relationshipSet;

    /** Builds the immutable property and relationship collections. */
    @Override
    protected void init(final ProcessorInitializationContext context) {
        this.descriptorList = Collections.singletonList(propertyDescriptor);

        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(successRelations);
        relationships.add(failureRelations);
        this.relationshipSet = Collections.unmodifiableSet(relationships);
    }

    /**
     * Returns the relationships this processor can route to.
     * This must override {@code getRelationships()}: a differently named getter is
     * never consulted by the framework, leaving the processor without relationships.
     */
    @Override
    public Set<Relationship> getRelationships() {
        return this.relationshipSet;
    }

    /**
     * @return the same set as {@link #getRelationships()}
     * @deprecated kept only for existing callers; use {@link #getRelationships()}.
     */
    @Deprecated
    public Set<Relationship> getRelationshipSet() {
        return getRelationships();
    }

    /** Returns the single supported property, {@link #propertyDescriptor}. */
    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptorList;
    }

    /** Lifecycle hook; nothing needs to be prepared before scheduling. */
    @OnScheduled
    public void onScheduled(final ProcessContext context) {
    }

    /**
     * Takes one FlowFile, sets its {@code result} attribute and content to the greeting,
     * and routes it to success; a {@link ProcessException} routes it to failure.
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        try {
            final String name = context.getProperty(propertyDescriptor)
                    .evaluateAttributeExpressions(flowFile)
                    .getValue();
            final String result = "Hello " + name;
            final byte[] content = result.getBytes(StandardCharsets.UTF_8);

            // FlowFiles are immutable: keep the reference returned by each session call.
            flowFile = session.putAttribute(flowFile, "result", result);
            // Callback form: the session owns the stream and reports I/O problems as
            // unchecked exceptions, so no checked IOException has to be caught here.
            flowFile = session.write(flowFile, out -> out.write(content));

            session.transfer(flowFile, successRelations);
        } catch (final ProcessException e) {
            getLogger().error("Processing error", e);
            session.transfer(flowFile, failureRelations);
        }
    }
}

Build and deploy

Build the project with Gradle: gradle build. Find the generated .nar file in the target folder of your module. Copy the .nar file to the lib folder of your NiFi installation.

Screenshot of how the custom processor integrates seamlessly

I hope this article was useful for you! Creating a custom processor for Apache NiFi allows you to customize it to meet the specific requirements of your project. By following the steps above, you will be able to develop and integrate a custom processor that will enhance the processing capabilities of your system.

Apache NiFi is a powerful tool for automating data flows with a large number of built-in processors. However, sometimes there is a need for specific processors to handle unique requirements and data stores. In such cases, creating custom processors becomes a necessity. In this article, we will go through the steps to create a very simple custom processor for Apache NiFi.

Setup and sources

gradle setup
gradle setup

dependencies {
    compile "org.apache.nifi:nifi-api:1.9.2"
    compile "org.apache.nifi:nifi-utils:1.9.2"
    testCompile "org.apache.nifi:nifi-mock:1.9.2"
}

dependencies {
    compile "org.apache.nifi:nifi-api:1.9.2"
    compile "org.apache.nifi:nifi-utils:1.9.2"
    testCompile "org.apache.nifi:nifi-mock:1.9.2"
}

source of the custom processor
source of the custom processor

import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import
org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.*;

/**
 * Example processor that builds the greeting {@code "Hello " + name}, stores it in the
 * {@code result} attribute and writes it as the FlowFile content.
 */
@Tags({"example"})
@CapabilityDescription("Hello World to output")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute = "", description = "")})
@WritesAttributes({@WritesAttribute(attribute = "result", description = "The greeting that is also written as the FlowFile content")})
public class MyProcessor extends AbstractProcessor {

    /** Required "name" property; evaluated against the FlowFile's attributes. */
    public static final PropertyDescriptor propertyDescriptor = new PropertyDescriptor.Builder()
            .name("name")
            .displayName("name")
            .description("Name to print")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .build();

    /** Destination for FlowFiles whose greeting was written. */
    public static final Relationship successRelations = new Relationship.Builder()
            .name("success")
            .description("If everything is ok")
            .build();

    /** Destination for FlowFiles whose processing raised a {@link ProcessException}. */
    public static final Relationship failureRelations = new Relationship.Builder()
            .name("failure")
            .description("If something is wrong")
            .build();

    private List<PropertyDescriptor> descriptorList;
    private Set<Relationship> relationshipSet;

    /** Builds the immutable property and relationship collections. */
    @Override
    protected void init(final ProcessorInitializationContext context) {
        this.descriptorList = Collections.singletonList(propertyDescriptor);

        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(successRelations);
        relationships.add(failureRelations);
        this.relationshipSet = Collections.unmodifiableSet(relationships);
    }

    /**
     * Returns the relationships this processor can route to.
     * This must override {@code getRelationships()}: a differently named getter is
     * never consulted by the framework, leaving the processor without relationships.
     */
    @Override
    public Set<Relationship> getRelationships() {
        return this.relationshipSet;
    }

    /**
     * @return the same set as {@link #getRelationships()}
     * @deprecated kept only for existing callers; use {@link #getRelationships()}.
     */
    @Deprecated
    public Set<Relationship> getRelationshipSet() {
        return getRelationships();
    }

    /** Returns the single supported property, {@link #propertyDescriptor}. */
    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptorList;
    }

    /** Lifecycle hook; nothing needs to be prepared before scheduling. */
    @OnScheduled
    public void onScheduled(final ProcessContext context) {
    }

    /**
     * Takes one FlowFile, sets its {@code result} attribute and content to the greeting,
     * and routes it to success; a {@link ProcessException} routes it to failure.
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        try {
            final String name = context.getProperty(propertyDescriptor)
                    .evaluateAttributeExpressions(flowFile)
                    .getValue();
            final String result = "Hello " + name;
            final byte[] content = result.getBytes(StandardCharsets.UTF_8);

            // FlowFiles are immutable: keep the reference returned by each session call.
            flowFile = session.putAttribute(flowFile, "result", result);
            // Callback form: the session owns the stream and reports I/O problems as
            // unchecked exceptions, so no checked IOException has to be caught here.
            flowFile = session.write(flowFile, out -> out.write(content));

            session.transfer(flowFile, successRelations);
        } catch (final ProcessException e) {
            getLogger().error("Processing error", e);
            session.transfer(flowFile, failureRelations);
        }
    }
}

import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.*;

/**
 * Example processor that builds the greeting {@code "Hello " + name}, stores it in the
 * {@code result} attribute and writes it as the FlowFile content.
 */
@Tags({"example"})
@CapabilityDescription("Hello World to output")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute = "", description = "")})
@WritesAttributes({@WritesAttribute(attribute = "result", description = "The greeting that is also written as the FlowFile content")})
public class MyProcessor extends AbstractProcessor {

    /** Required "name" property; evaluated against the FlowFile's attributes. */
    public static final PropertyDescriptor propertyDescriptor = new PropertyDescriptor.Builder()
            .name("name")
            .displayName("name")
            .description("Name to print")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .build();

    /** Destination for FlowFiles whose greeting was written. */
    public static final Relationship successRelations = new Relationship.Builder()
            .name("success")
            .description("If everything is ok")
            .build();

    /** Destination for FlowFiles whose processing raised a {@link ProcessException}. */
    public static final Relationship failureRelations = new Relationship.Builder()
            .name("failure")
            .description("If something is wrong")
            .build();

    private List<PropertyDescriptor> descriptorList;
    private Set<Relationship> relationshipSet;

    /** Builds the immutable property and relationship collections. */
    @Override
    protected void init(final ProcessorInitializationContext context) {
        this.descriptorList = Collections.singletonList(propertyDescriptor);

        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(successRelations);
        relationships.add(failureRelations);
        this.relationshipSet = Collections.unmodifiableSet(relationships);
    }

    /**
     * Returns the relationships this processor can route to.
     * This must override {@code getRelationships()}: a differently named getter is
     * never consulted by the framework, leaving the processor without relationships.
     */
    @Override
    public Set<Relationship> getRelationships() {
        return this.relationshipSet;
    }

    /**
     * @return the same set as {@link #getRelationships()}
     * @deprecated kept only for existing callers; use {@link #getRelationships()}.
     */
    @Deprecated
    public Set<Relationship> getRelationshipSet() {
        return getRelationships();
    }

    /** Returns the single supported property, {@link #propertyDescriptor}. */
    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptorList;
    }

    /** Lifecycle hook; nothing needs to be prepared before scheduling. */
    @OnScheduled
    public void onScheduled(final ProcessContext context) {
    }

    /**
     * Takes one FlowFile, sets its {@code result} attribute and content to the greeting,
     * and routes it to success; a {@link ProcessException} routes it to failure.
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        try {
            final String name = context.getProperty(propertyDescriptor)
                    .evaluateAttributeExpressions(flowFile)
                    .getValue();
            final String result = "Hello " + name;
            final byte[] content = result.getBytes(StandardCharsets.UTF_8);

            // FlowFiles are immutable: keep the reference returned by each session call.
            flowFile = session.putAttribute(flowFile, "result", result);
            // Callback form: the session owns the stream and reports I/O problems as
            // unchecked exceptions, so no checked IOException has to be caught here.
            flowFile = session.write(flowFile, out -> out.write(content));

            session.transfer(flowFile, successRelations);
        } catch (final ProcessException e) {
            getLogger().error("Processing error", e);
            session.transfer(flowFile, failureRelations);
        }
    }
}

Build and deploy
Build and deploy

Build the project with Gradle: gradle build. Find the generated .nar file in the target folder of your module. Copy the .nar file to the lib folder of your NiFi installation.

Build the project with Gradle: gradle build . gradle build Find the generated .nar file in the target folder of your module. .nar Copy the .nar file to the lib folder of your NiFi installation. .nar lib

Screenshot of how the custom processor integrates seamlessly
Screenshot of how the custom processor integrates seamlessly

I hope this article was useful for you! Creating a custom processor for Apache NiFi allows you to customize it to meet the specific requirements of your project. By following the steps above, you will be able to develop and integrate a custom processor that will enhance the processing capabilities of your system.