Use the follow steps to create your new custom NiFi processor.
Procedure
-
Create the custom processor classes and test cases.
- Classes for processors are created within the
/src/main/java/ directory;
- Corresponding processor test cases are created within the
/src/test/java/ directory.
-
Create a class under src/main/java and extend it
with org.apache.nifi.processor.AbstractProcessor.
Alternatively, if you want to extend the default classes, extend it
using
com.hcl.software.data.ingest.processors.AbstractCommerceBaseProcessor
.
-
Provide appropriate tags and attributes using the following
annotations.
@Tags({"example"})
@CapabilityDescription("Provide a descrption")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
-
To organize imports, right click the Java file and select .
-
Implement the missing inherited method.
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException
{
// TODO Auto-generated method stub
}
You are now ready to start writing the logic for your custom
processor.
-
Create the custom logic for your NiFi processor. The following steps illustrate
how logic that you create in Java affects the NiFi processor, or displays on the
NiFi user interface.
Refer to the NiFi Developer’s Guidefor further
information on how to create a custom NiFi processor, and for thorough
explanation of the APIs that are used to develop extensions.
-
Declare property fields using
org.apache.nifi.components
. The PropertyDescriptor
data type appears on the PROPERTIES tab of the
custom processor.
For example:
public static final PropertyDescriptor SCROLL_DURATION = new PropertyDescriptor.Builder().name("Scroll Duration")
.description("The scroll duration is how long each search context is kept in memory")
.required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true).build();
-
Declare Relationship fields using
org.apache.nifi.processor
. The Relationship data
type appears on the SETTINGS tab of the custom
processor.
For example:
public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder().name("success")
.description("The flow file with the specified content was successfully transferred").build();
public static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder().name("failure")
.description("The flow file with the specified content has encountered an error during the transfer").build();
public static final Relationship RELATIONSHIP_NEXT = new Relationship.Builder().name("next")
.description("The flow file with the specified content for the next iteration").build();
-
Add the declared properties and relationships within the
init()
method.
@Override
protected void init(final ProcessorInitializationContext context) {
getSupportedPropertyDescriptors().add(SCROLL_DURATION);
getRelationships().add(RELATIONSHIP_SUCCESS);
getRelationships().add(RELATIONSHIP_FAILURE);
getRelationships().add(RELATIONSHIP_NEXT);
-
Override the
onTrigger()
method to perform the
following operations.
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final String duration = context.getProperty(SCROLL_DURATION).evaluateAttributeExpressions().getValue();
try {
- Get the FlowFile and its contents from the
ProcessSession.
final FlowFile flowFile = session.get();
final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
session.exportTo(flowFile, bytes);
bytes.close();
final String content = bytes.toString();
- Perform the operations that the processor is supposed to do with
the
content.
final Map<String, Object> json = (Map<String, Object>) new JsonSlurper().parseText(content);
final String scroll = (String) json.get("_scroll_id");
final Map<String, Object> hits = (Map<String, Object>) json.get(FIELD_HITS);
final List<Object> docs = (List<Object>) hits.get(FIELD_HITS);
if (docs.size() > 0) {
final String scrollBody = String.format("{ \"scroll\": \"%s\", \"scroll_id\": \"%s\" }", duration, scroll);
FlowFile nextFlowFile = session.create(flowFile);
session.putAttribute(nextFlowFile, "index.scroll.uri", "_search/scroll");
- Write the result back into the
FlowFile.
nextFlowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream outputStream) throws IOException {
outputStream.write(scrollBody.getBytes());
outputStream.flush();
}
});
session.getProvenanceReporter().modifyContent(nextFlowFile);
- Transfer the FlowFile to the desired
relationship.
session.getProvenanceReporter().route(flowFile, RELATIONSHIP_SUCCESS);
session.transfer(flowFile, RELATIONSHIP_SUCCESS);
-
In org.apache.nifi.processor.Processor, create an entry
containing the fully qualified classname of your newly created custom processor.
If you created a custom controller service, create an entry in
org.apache.nifi.controller.ControllerService.
For example, in the Project Explorer, expand and click on
org.apache.nifi.processor.Processor
. Enter your classname in
the text entry pane on the right side of the window.
Results
Your custom processor is created.