Updating NiFi process groups, processors, and controller services using the Ingest connector descriptor
NiFi processors are the basic building blocks of the Version 9.1 dataflow pipeline. Processors perform specific tasks within the pipeline such as listening for incoming data, pulling data from external sources, and routing, transforming, or extracting information from flow files. Processors are grouped into process groups. You can use a simple API to update existing processors, groups, and their associated services.
The REST endpoint for this API is:
- PUT /connectors
- Body: connectorEntity JSON
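The endpoint above can be called from a short script. The following sketch, which uses only the Python standard library, builds a minimal connector descriptor and shows how it could be sent with a PUT request. The host and port in the commented call are placeholders, and `update_connector` is an illustrative helper, not part of the API:

```python
import json
import urllib.request

def update_connector(ingest_url: str, descriptor: dict) -> int:
    """PUT the connector descriptor JSON to the /connectors endpoint
    and return the HTTP status code."""
    body = json.dumps(descriptor).encode("utf-8")
    req = urllib.request.Request(
        url=f"{ingest_url}/connectors",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req) as resp:  # raises on HTTP errors
        return resp.status

# A minimal descriptor body, matching the examples later in this section;
# the "pipes" array holds the per-pipe customizations.
descriptor = {
    "name": "auth.reindex",
    "pipes": [],
}
# update_connector("http://ingestHost:ingestPort", descriptor)  # uncomment with a real host and port
```

The request is left commented so the snippet runs without a live Ingest service.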
Processors
You can update or change processor configurations, processor properties, connections, and connection configurations as your business requires. You specify processor changes by including a property with a scope of type PROCESSOR in the pipe's properties. The customization occurs in the processor that is specified in the scope name "ProcessGroupName.ProcessorName", where ProcessGroupName is the name of the process group that contains the processor. This API can also customize all processors that are nested in the specified process group if "*" is used in the scope name, for example: "ProcessGroupName.*".
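Because the scope name encodes both the process group and, optionally, the processor, a small helper can make the convention explicit. This is an illustrative sketch, not part of the API, and the `processor_scope` name is hypothetical:

```python
def processor_scope(process_group: str, processor: str = "*") -> dict:
    """Build a PROCESSOR scope object.

    'ProcessGroupName.ProcessorName' targets a single processor;
    'ProcessGroupName.*' targets every processor nested in the group.
    """
    return {"name": f"{process_group}.{processor}", "type": "PROCESSOR"}

# Target a single processor:
one = processor_scope("Find Attributes from Database",
                      "Transform Document - Find Attributes From Database")
# Target all processors in a group:
everything = processor_scope("SCROLL SQL")
```

Both results match the scope objects shown in the examples that follow.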
Update processor configurations:
You can update or change the following processor configuration items:
Configuration item name | Corresponding configuration item name in JSON | Description | Accepted value |
---|---|---|---|
Concurrent Tasks | ConcurrentlySchedulableTaskCount | The number of tasks that should be concurrently scheduled for the specified processor. | n Note: n is an integer. |
Run Schedule | SchedulingPeriod | The minimum number of seconds that should elapse between task executions. | n sec Note: n is an integer. |
Execution | ExecutionNode | The nodes that this processor is scheduled to run on. | Any one of the following values: ALL, PRIMARY |
Penalty Duration | PenaltyDuration | The amount of time used when this processor penalizes a flow file. | n sec Note: n is an integer. |
Yield Duration | YieldDuration | When a processor yields, it is not rescheduled again until this amount of time has elapsed. | n sec Note: n is an integer. |
Bulletin Level | BulletinLevel | The level at which this processor generates bulletins. | Any one of the following values: DEBUG, INFO, WARN, ERROR, NONE |
Scheduling Strategy | SchedulingStrategy | The strategy used to schedule this processor. | Any one of the following values: TIMER_DRIVEN, CRON_DRIVEN, EVENT_DRIVEN |
Comments | Comments | Comments about the processor. | String |
- Update the concurrent tasks value for the processor "Transform Document - Find Attributes From Database" in the "Find Attributes from Database" process group under the DatabaseProductStage1c pipe.
{
  "name": "auth.reindex",
  "pipes": [
    {
      "name": "DatabaseProductStage1c",
      "properties": [
        {
          "name": "ConcurrentlySchedulableTaskCount",
          "value": "5",
          "scope": {
            "name": "Find Attributes from Database.Transform Document - Find Attributes From Database",
            "type": "PROCESSOR"
          }
        }
      ]
    }
  ]
}
- Update the concurrent tasks value for all processors in the "SCROLL SQL" process group under the DatabaseProductStage1c pipe.
{
  "name": "auth.reindex",
  "pipes": [
    {
      "name": "DatabaseProductStage1c",
      "properties": [
        {
          "name": "ConcurrentlySchedulableTaskCount",
          "value": "3",
          "scope": {
            "name": "SCROLL SQL.*",
            "type": "PROCESSOR"
          }
        }
      ]
    }
  ]
}
Update processor properties:
You can view the properties that can be changed in the JSON response of the request
GET http://nifiHost:nifiPort/nifi-api/processors/{processorId}
To change a property, specify "name": "properties.{property name to change}".
Example:
{
"name": "auth.reindex",
"pipes": [
{
"name": "DatabaseProductStage1c",
"properties": [
{
"name": "properties.Max Wait Time",
"value": "25 seconds",
"scope": {
"name": "SCROLL SQL.Execute SQL - Find Parent Category",
"type": "PROCESSOR"
}
}
]
}
]
}
Connection
You update a connection by using a scope of type CONNECTION with the following fields:
- sourceName: The name of the processor/process group the connection comes from (sending the flow files).
- destinationName: The name of the processor/process group the connection goes to (receiving the flow files).
Example:
{
"name": "BackPressureObjectThreshold",
"value": "600",
"scope": {
"name": "SCROLL SQL",
"sourceName": "Execute SQL - Find Parent Category",
"destinationName": "Analyze Successful SQL Response",
"type": "CONNECTION"
}
}
Update connection configurations:
You can update or change the following connection configuration items:
Configuration item name | Corresponding configuration item name in JSON | Description | Accepted value |
---|---|---|---|
Back Pressure Object Threshold | BackPressureObjectThreshold | The maximum number of objects that can be queued before backpressure is applied. | n Note: n is an integer. |
Size Threshold | BackPressureDataSizeThreshold | The maximum data size of objects that can be queued before backpressure is applied. | n GB Note: n is an integer. |
FlowFile Expiration | FlowFileExpiration | The maximum amount of time an object may be in the flow before it is aged out of the flow. | n sec Note: n is an integer. |
Example:
Update a connection between "Analyze Successful SQL Response" processor and "SUCCESS" output port in the "SCROLL SQL" group under "DatabaseProductStage1c".
{
"name": "auth.reindex",
"pipes": [
{
"name": "DatabaseProductStage1c",
"properties": [
{
"name": "BackPressureObjectThreshold",
"value": "200",
"scope": {
"name": "SCROLL SQL",
"sourceName": "Analyze Successful SQL Response",
"destinationName": "SUCCESS",
"type": "CONNECTION"
}
}
]
}
]
}
Process groups
Customize your process groups by including a property with a scope of type PROCESS_GROUP in the pipe's properties. This customizes variables at the process group level and only on the process group specified. To customize multiple process groups, each needs its own pipe properties entry.
Example:
{
  "name": "auth.reindex",
  "pipes": [
    {
      "name": "DatabaseProductStage1c",
      "properties": [
        {
          "name": "matchmaker.proximity",
          "value": "0.1",
          "scope": {
            "name": "Rollup Attributes",
            "type": "PROCESS_GROUP"
          }
        },
        {
          "name": "matchmaker.proximity",
          "value": "0.4",
          "scope": {
            "name": "Find Attributes",
            "type": "PROCESS_GROUP"
          }
        }
      ]
    }
  ]
}
Controller services
Customize controller services by including a property with a scope of type CONTROLLER_SERVICE in the pipe's properties. This permits customization of variables at the main pipe level and only on the controller service specified. To customize multiple controller services, each needs its own pipe properties entry.
Example:
{
  "name": "auth.reindex",
  "pipes": [
    {
      "name": "DatabaseProductStage1c",
      "properties": [
        {
          "name": "Max Total Connections",
          "value": "20",
          "scope": {
            "name": "Database Connection Pool",
            "type": "CONTROLLER_SERVICE"
          }
        }
      ]
    }
  ]
}
Multiple pipe customizations
This API supports customizing multiple pipes in one connector. To do so, include multiple pipe entries in the descriptor.
Example:
{
  "name": "auth.reindex",
  "pipes": [
    {
      "name": "DatabaseProductStage1c",
      "properties": [
        {
          "name": "concurrent.tasks",
          "value": "1",
          "scope": {
            "name": "Find Attributes from Database.Transform Document - Find Attributes From Database",
            "type": "PROCESSOR"
          }
        },
        {
          "name": "matchmaker.proximity",
          "value": "0.6",
          "scope": {
            "name": "Rollup Attributes",
            "type": "PROCESS_GROUP"
          }
        },
        {
          "name": "Max Total Connections",
          "value": "20",
          "scope": {
            "name": "Database Connection Pool",
            "type": "CONTROLLER_SERVICE"
          }
        }
      ]
    },
    {
      "name": "WaitLink",
      "label": "WaitLink - Product Stage 1 - 3",
      "properties": [
        {
          "name": "connector.stage.name",
          "value": "TESTING",
          "scope": {
            "name": "Wait for Completion",
            "type": "PROCESS_GROUP"
          }
        },
        {
          "name": "concurrent.tasks",
          "value": "5",
          "scope": {
            "name": "Wait for Completion.Get Status",
            "type": "PROCESSOR"
          }
        }
      ]
    }
  ]
}
- Customizations made with this API are saved in ZooKeeper. Consequently, if the connector is deleted and created again, you can retrieve the customized connector descriptor with the request GET http://{IngestHost}:{IngestPort}/connectors/{connectorName}.
- You can import a customized connector with the request POST http://{IngestHost}:{IngestPort}/connectors/{connectorName}, in either of the following two ways:
  - With an empty body, if ZooKeeper contains the customized connector descriptor.
  - With the extracted customized connector descriptor as the body.
- Save the customized connector descriptor, because the data in ZooKeeper can be overwritten by Ingest. A saved customized descriptor can be manually merged with the default connector descriptor, which gives you the flexibility to move easily across versions.
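As a concrete sketch of the retrieve-and-save workflow above: the snippet below fetches the customized descriptor, writes it to a file, and reloads it for a later merge or import. The host and port are placeholders, the helper names are illustrative, and the live GET request is left commented so the file round-trip can be seen on its own:

```python
import json
import urllib.request
from pathlib import Path

BASE = "http://ingestHost:ingestPort"  # placeholder Ingest host and port
CONNECTOR = "auth.reindex"

def fetch_descriptor(base: str, name: str) -> dict:
    """GET /connectors/{connectorName} and return the descriptor JSON."""
    with urllib.request.urlopen(f"{base}/connectors/{name}") as resp:
        return json.load(resp)

def save_descriptor(descriptor: dict, path: Path) -> None:
    """Save the customized descriptor so it survives ZooKeeper overwrites."""
    path.write_text(json.dumps(descriptor, indent=2))

def load_descriptor(path: Path) -> dict:
    """Reload a saved descriptor, e.g. to merge it with a default one
    or to POST it back to /connectors/{connectorName}."""
    return json.loads(path.read_text())

# descriptor = fetch_descriptor(BASE, CONNECTOR)  # uncomment with a live Ingest service
descriptor = {"name": CONNECTOR, "pipes": []}     # stand-in for the fetched descriptor
save_descriptor(descriptor, Path("auth.reindex.json"))
```

The saved file is plain JSON, so it can be diffed and merged against a default connector descriptor with ordinary tools.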