Upgrading batch version for pipes
To use the latest version of your Ingest components, the preferred approach is to upgrade your pipelines using the Version context menu in the NiFi editor.
About this task
NiFi pipelines are constructed out of pipes, with each pipe containing components
that determine how the incoming flow is to be processed. Pipes are often used in
multiple connectors and sometimes issues can occur with these pipes. In such cases
they must be updated throughout NiFi. The upgrade process described here applies to
pipes throughout the entirety of NiFi; it will find all occurances of a pipe and
attempt to upgrade their version. It is possible to use the
http://Ingest_HOST:Ingest_PORT/upgrade
API to upgrade individual pipes, however this is much longer process because this
API is connector-specific. It will not upgrade pipes across all of NiFi.
Procedure
To upgrade your ingest pipeline components across all of NiFi:
-
Add a new version of your pipe. You can do this in two ways.
- Right-click on your chosen pipe. In the menu that pops up, select An asterisk (* symbol) will appear in the title bar of the pipe. This indicates that there are local changes in that pipe.
- Alternatively, you can upload a modified or updated pipe descriptor to the registry. For instructions on how to do this, see the Apache NiFi Registry System Administrator’s Guide.
Note: The status bar under the pipe's title shows useful information about your upgrade. An upward-pointing arrow in a red circle displays the number of pipes that can be updated using http://Ingest_HOST:Ingest_PORT/pipe/{id}/upgradeVersion. An exclamation mark in a red circle shows the number of pipes that have local changes and so will have to use the http://Ingest_HOST:Ingest_PORT/pipe/{id}/revertVersion API and then http://Ingest_HOST:Ingest_PORT/pipe/{id}/upgradeVersion. -
Upgrade the pipe version, by right clicking on the pipe and selecting
. Three responses are possible:
- If the action is completely successful, the REST API will return a 200 code.
- A partly successful or incomplete result returns a 409 code.
- An unsuccessful result returns a 409, along with a detailed list of the errors encountered.
If a request has partly failed or fully failed, issue the REST call POST http://Ingest_HOST:Ingest_PORT//pipe/{pipe.name}/resetAndUpgrade. This call reverts the pipe to its base version by removing all the local changes, and auto upgrades the pipes if possible.When this call returns a 200 (success) reponse, note down the detailed message in the response body. Each line describes a pipe that contained local changes and therefore must be updated using the PUT http://Ingest_HOST:Ingest_PORT/connector API. This will reapply pipe configuration changes.
-
In the case of a 409 result, reapply your local changes. This is needed because
local changes are made not only manually but also during the Ingest startup
process. Those startup local changes contain vital modifications to ensure that
connector logic does not break for the different environments (auth, live).
Examine the body content from your initial REST calls to locate duplicate
instances of connectorId, as those updates can be grouped
into a single request. For example,
"pipeId:f53eb8e1-017a-1000-2179-f7f4c528f0ec, connectorId:auth.reindex, pipeName:DatabaseURLStage1f, pipeLabelName:URL Stage 1f (Content URL)" "pipeId:f53da399-017a-1000-bfaf-0a1e2104cc99, connectorId:auth.content, pipeName:DatabaseURLStage1f, pipeLabelName:URL Stage 1f (Content URL)" "pipeId:f544d053-017a-1000-394b-a19ed82ba8c4, connectorId:live.reindex, pipeName:DatabaseURLStage1f, pipeLabelName:URL Stage 1f (Content URL)" "pipeId:f542fb64-017a-1000-eef0-2a643ac00868, connectorId:live.content, pipeName:DatabaseURLStage1f, pipeLabelName:URL Stage 1f (Content URL)" "pipeId:f542fb64-017a-1000-eef0-2a643ac00868, connectorId:live.content, pipeName:DatabaseURLStage1f, pipeLabelName:URL Stage 1f (Content URL)"
- Find your local changes. For each connectorId, call GET http://Ingest_HOST:Ingest_PORT/connectors/{connectorId}. Parse through the response to find pipe entries with name {pipeName} and label: {pipeLabelName}.
-
Create an update body by copy the entire pipe entry with all the properties
except for:
"name": "connector.name"
"name": "pipe.name"
For example,{ "name": {connectorId}, "pipes": [ //PIPES ] }
{ "name": "auth.reindex", "pipes": [ { "name": "DatabaseURLStage1f", "label": "URL Stage 1f (Content URL)", "properties": [ { "name": "scroll.bucket.size", "value": "1500", "scope": { "name": "Create Content URL Document", "type": "PROCESS_GROUP" } }, { "name": "scroll.page.size", "value": "100000", "scope": { "name": "Create Content URL Document", "type": "PROCESS_GROUP" } }, { "name": "Database User", "value": "${AUTH_JDBC_USER_NAME}", "scope": { "name": "Database Connection Pool", "type": "CONTROLLER_SERVICE" } }, ] } ] }
- Apply the updates by copying the update body and sending a request with that content to PUT http://Ingest_HOST:Ingest_PORT/connectors.