Managing connectors in the Ingest service
How to manage your descriptors and connectors, so as to optimize your search indexing for Elasticsearch.
Connectors are used to ingest data into the HCL Commerce Search service. You use a connector descriptor to tell NiFi how to combine the various pipes to make the connector. The relationship is akin to the concepts of ‘class’ and ‘object’ in Java. In this case, a connector is like an object and a descriptor is like a class.
For information on calling the Ingest service, see Search Ingest Service API.
Viewing all connectors
Call the GET /connectors endpoint to get the descriptors of all existing connectors in NiFi.
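The Python sketch below shows this call. It uses the standard-library urllib module to build the request. The base URL (host and port) and the function names are hypothetical placeholders. The sketch also assumes the response body is a JSON array of descriptor objects, each with an id field. Confirm the actual response shape in the Search Ingest Service API.

```python
import json
import urllib.request

# Hypothetical placeholder: substitute the host and port of your Ingest service.
INGEST_BASE_URL = "http://ingest.example.com:30800"


def list_connectors_request(base_url: str = INGEST_BASE_URL) -> urllib.request.Request:
    """Build (without sending) a GET /connectors request."""
    return urllib.request.Request(
        f"{base_url}/connectors",
        headers={"Accept": "application/json"},
        method="GET",
    )


def fetch_connector_ids(base_url: str = INGEST_BASE_URL) -> list:
    """Send GET /connectors and return the id of every descriptor.

    Assumption: the body is a JSON array of descriptor objects with an "id" key.
    """
    with urllib.request.urlopen(list_connectors_request(base_url)) as resp:
        descriptors = json.loads(resp.read().decode("utf-8"))
    return [d["id"] for d in descriptors]
```

Only fetch_connector_ids actually contacts the service. Keeping request construction in its own function lets you inspect the request before sending it.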
Viewing a specific connector
Call the GET /connectors/{id} endpoint to get the descriptor of a specific connector by its connector ID. If you do not know the connector ID, call the GET /connectors endpoint to return the descriptors of all connectors. Then look through the list to find the ID of the connector you want.
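The following sketch finds a connector ID in the list returned by GET /connectors and then builds the request for that connector's descriptor. The helper names and base URL are hypothetical. Matching on either the id or the name field is an assumption based on the descriptor examples in this topic, where both fields hold the connector name.

```python
import urllib.parse
import urllib.request

# Hypothetical placeholder: substitute the host and port of your Ingest service.
INGEST_BASE_URL = "http://ingest.example.com:30800"


def find_connector_id(descriptors: list, name: str):
    """Return the id of the first descriptor whose id or name matches, else None."""
    for descriptor in descriptors:
        if name in (descriptor.get("id"), descriptor.get("name")):
            return descriptor.get("id")
    return None


def get_connector_request(connector_id: str,
                          base_url: str = INGEST_BASE_URL) -> urllib.request.Request:
    """Build (without sending) a GET /connectors/{id} request."""
    # Percent-encode the ID so it is treated as a single path segment.
    path_id = urllib.parse.quote(connector_id, safe="")
    return urllib.request.Request(f"{base_url}/connectors/{path_id}", method="GET")
```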
Creating a connector
For detailed instructions on creating connectors, including examples and illustrations, see Creating a NiFi service connector.
Enabling a connector
Call the POST /connectors/{id}/enable endpoint to start all the pipes contained in the specified connector.
Disabling a connector
Call the POST /connectors/{id}/disable endpoint to stop all the pipes contained in the specified connector.
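The enable and disable endpoints differ only in the last path segment, so a single hypothetical helper can build both POST requests. This is a sketch with a placeholder base URL. It sends no request body, because this topic does not describe one for these endpoints.

```python
import urllib.parse
import urllib.request

# Hypothetical placeholder: substitute the host and port of your Ingest service.
INGEST_BASE_URL = "http://ingest.example.com:30800"

ACTIONS = ("enable", "disable")


def connector_action_request(connector_id: str, action: str,
                             base_url: str = INGEST_BASE_URL) -> urllib.request.Request:
    """Build (without sending) POST /connectors/{id}/enable or /disable."""
    if action not in ACTIONS:
        raise ValueError(f"action must be one of {ACTIONS}, got {action!r}")
    # Percent-encode the ID so it is treated as a single path segment.
    path_id = urllib.parse.quote(connector_id, safe="")
    return urllib.request.Request(
        f"{base_url}/connectors/{path_id}/{action}", method="POST"
    )
```

To send the request, pass it to urllib.request.urlopen, for example urlopen(connector_action_request("auth.price", "enable")).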
Upgrading a connector
Call the POST /connectors/{id}/upgrade endpoint to upgrade an individual connector. If you do not know the ID of the connector to upgrade, call the GET /connectors endpoint to get all descriptors. Then find the connector you want and note its ID.
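Here is a minimal sketch of an upgrade call with an empty body. As described under Customize connectors, an empty body makes the service recreate the connector from the ConnectorDescriptorJson already stored in ZooKeeper. The base URL and function name are hypothetical.

```python
import urllib.parse
import urllib.request

# Hypothetical placeholder: substitute the host and port of your Ingest service.
INGEST_BASE_URL = "http://ingest.example.com:30800"


def upgrade_request(connector_id: str,
                    base_url: str = INGEST_BASE_URL) -> urllib.request.Request:
    """Build (without sending) POST /connectors/{id}/upgrade with no body.

    With no body, the stored ConnectorDescriptorJson is used to rebuild the connector.
    """
    path_id = urllib.parse.quote(connector_id, safe="")
    return urllib.request.Request(
        f"{base_url}/connectors/{path_id}/upgrade", method="POST"
    )
```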
If the upgrade API is called while data is queued or Ingest service processes are running, and the NiFi UI status bar indicates invalid components, the upgrade fails. To clear the queue, locate the Process Groups in the pipeline that have queued flowfile data and stop them. For more information, see Stopping a component.
Example: POST /connectors/auth.reindex/upgrade
Upgrade the current connector with the newest version of the pipes
In some cases a connector is already defined, but the pipes in that connector have changed. The connector has the same pipes, but the contents of those pipes have been updated in the Registry. In this case, call the POST /connectors/{id}/upgrade endpoint to update the current connector with the newest version of the pipes.
Recreate the connectors based on the new registry
You can use this endpoint to recreate connectors based on a new registry. If persistent volume is enabled for NiFi, NiFi stores the connectors in a flow.xml file, with the registryId and flowId linked to that version of the registry. When you change versions, a new registry is pulled and deployed, and the connectors must be recreated based on it. When NiFi starts up, it creates the connectors from flow.xml. If the pipes within those connectors do not have a registryId and flowId that match the new registry, NiFi cannot run the index properly. To avoid this, the pipes must be properly linked to the new registry so that the upgrade API can rebuild the connector.
Customize connectors
You can also use this endpoint to customize connectors. When the upgrade API is called with a connector ID or name (for example, auth.reindex), it deletes the existing connector in NiFi and recreates it from a ConnectorDescriptorJson. If the request body is empty, the ConnectorDescriptorJson stored in ZooKeeper is used. If a body is provided, it is used to create the connector and is then stored in ZooKeeper. You can therefore customize the default ConnectorDescriptorJson by adding or removing pipes and changing the structure of the connector. This approach is recommended if you want to keep your customizations across versions. For more information, see Extending the Ingest service.
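Example: adding two "UploadPriceStage1" pipes to the existing auth.price connector.
The first descriptor below is the existing auth.price connector. The second is the customized descriptor, which adds two more UploadPriceStage1 pipes before the Terminal pipe.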
{
  "id": "auth.price",
  "name": "auth.price",
  "description": "This is the connector for the contract price indexing pipeline to perform incremental updates. This operation involves only the Product indexing pipeline.",
  "created": "2020-11-17T12:55:36.949",
  "modified": "2020-11-17T12:55:36.949",
  "pipes": [
    {
      "name": "NRTLink",
      "properties": [
        {
          "name": "connector.name",
          "value": "auth.price",
          "scope": {
            "name": "NRTLink",
            "type": "PROCESS_GROUP"
          }
        },
        {
          "name": "pipe.name",
          "value": "NRTLink",
          "scope": {
            "name": "NRTLink",
            "type": "PROCESS_GROUP"
          }
        }
      ]
    },
    {
      "name": "UploadPriceStage1",
      "properties": [
        {
          "name": "connector.name",
          "value": "auth.price",
          "scope": {
            "name": "UploadPriceStage1",
            "type": "PROCESS_GROUP"
          }
        },
        {
          "name": "pipe.name",
          "value": "UploadPriceStage1",
          "scope": {
            "name": "UploadPriceStage1",
            "type": "PROCESS_GROUP"
          }
        }
      ]
    },
    {
      "name": "Terminal",
      "properties": [
        {
          "name": "connector.name",
          "value": "auth.price",
          "scope": {
            "name": "Terminal",
            "type": "PROCESS_GROUP"
          }
        },
        {
          "name": "pipe.name",
          "value": "Terminal",
          "scope": {
            "name": "Terminal",
            "type": "PROCESS_GROUP"
          }
        }
      ]
    }
  ]
}
{
  "id": "auth.price",
  "name": "auth.price",
  "description": "This is the connector for the contract price indexing pipeline to perform incremental updates. This operation involves only the Product indexing pipeline.",
  "pipes": [
    {
      "name": "NRTLink",
      "properties": [
        {
          "name": "connector.name",
          "value": "auth.price",
          "scope": {
            "name": "NRTLink",
            "type": "PROCESS_GROUP"
          }
        },
        {
          "name": "pipe.name",
          "value": "NRTLink",
          "scope": {
            "name": "NRTLink",
            "type": "PROCESS_GROUP"
          }
        }
      ]
    },
    {
      "name": "UploadPriceStage1",
      "properties": [
        {
          "name": "connector.name",
          "value": "auth.price",
          "scope": {
            "name": "UploadPriceStage1",
            "type": "PROCESS_GROUP"
          }
        },
        {
          "name": "pipe.name",
          "value": "UploadPriceStage1",
          "scope": {
            "name": "UploadPriceStage1",
            "type": "PROCESS_GROUP"
          }
        }
      ]
    },
    {
      "name": "UploadPriceStage1",
      "properties": [
        {
          "name": "connector.name",
          "value": "auth.price",
          "scope": {
            "name": "UploadPriceStage1",
            "type": "PROCESS_GROUP"
          }
        },
        {
          "name": "pipe.name",
          "value": "UploadPriceStage1",
          "scope": {
            "name": "UploadPriceStage1",
            "type": "PROCESS_GROUP"
          }
        }
      ]
    },
    {
      "name": "UploadPriceStage1",
      "properties": [
        {
          "name": "connector.name",
          "value": "auth.price",
          "scope": {
            "name": "UploadPriceStage1",
            "type": "PROCESS_GROUP"
          }
        },
        {
          "name": "pipe.name",
          "value": "UploadPriceStage1",
          "scope": {
            "name": "UploadPriceStage1",
            "type": "PROCESS_GROUP"
          }
        }
      ]
    },
    {
      "name": "Terminal",
      "properties": [
        {
          "name": "connector.name",
          "value": "auth.price",
          "scope": {
            "name": "Terminal",
            "type": "PROCESS_GROUP"
          }
        },
        {
          "name": "pipe.name",
          "value": "Terminal",
          "scope": {
            "name": "Terminal",
            "type": "PROCESS_GROUP"
          }
        }
      ]
    }
  ]
}
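You can also produce a customized descriptor like this programmatically instead of editing it by hand. The sketch below uses a hypothetical base URL. It copies an existing descriptor, inserts extra pipe entries (in the shape the example uses) before the Terminal pipe, and builds an upgrade request with the result as a JSON body. Dropping the created and modified fields only mirrors the example. This topic does not say whether the service requires or ignores them. All helper names are hypothetical.

```python
import copy
import json
import urllib.parse
import urllib.request

# Hypothetical placeholder: substitute the host and port of your Ingest service.
INGEST_BASE_URL = "http://ingest.example.com:30800"


def make_pipe(connector: str, pipe: str) -> dict:
    """Build a pipe entry in the same shape as the auth.price example."""
    return {
        "name": pipe,
        "properties": [
            {"name": "connector.name", "value": connector,
             "scope": {"name": pipe, "type": "PROCESS_GROUP"}},
            {"name": "pipe.name", "value": pipe,
             "scope": {"name": pipe, "type": "PROCESS_GROUP"}},
        ],
    }


def add_pipes_before(descriptor: dict, pipe: str, count: int,
                     before: str = "Terminal") -> dict:
    """Return a copy with `count` extra `pipe` entries inserted before `before`."""
    custom = copy.deepcopy(descriptor)  # leave the caller's descriptor untouched
    custom.pop("created", None)         # mirrors the customized example body
    custom.pop("modified", None)
    names = [p["name"] for p in custom["pipes"]]
    index = names.index(before)         # ValueError if the anchor pipe is missing
    custom["pipes"][index:index] = [make_pipe(custom["id"], pipe) for _ in range(count)]
    return custom


def upgrade_with_body_request(descriptor: dict,
                              base_url: str = INGEST_BASE_URL) -> urllib.request.Request:
    """Build (without sending) POST /connectors/{id}/upgrade with a JSON descriptor body."""
    path_id = urllib.parse.quote(descriptor["id"], safe="")
    return urllib.request.Request(
        f"{base_url}/connectors/{path_id}/upgrade",
        data=json.dumps(descriptor).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

Calling add_pipes_before(existing, "UploadPriceStage1", 2) on the first descriptor above yields the same pipe order as the second one: NRTLink, three UploadPriceStage1 pipes, then Terminal.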
Upgrading batch version for pipes
To use the latest version of your Ingest components, the preferred approach is to upgrade your pipelines using the Version context menu in the NiFi editor. If you want to update all of your connectors, see Upgrading batch version for pipes.
Cancelling a connector
To cancel a connector, call the POST /connectors/{id}/cancel endpoint. This clears out all Ingest operations that are currently in progress. It is also useful when an index run gets stuck partway through the connector and you want to restart it. The endpoint stops the specified connector and the routing service, identifying the connector by the provided ID (the connector name, for example, auth.reindex). It then goes through each pipe in the connector and inspects all of its connections. For each connection with queued data, it calls a drop request API to clear the queue in that connection.
Deleting a connector
You can delete a connector by calling the DELETE /connectors/{id} endpoint. If you do not know the ID of the connector to delete, call the GET /connectors endpoint to list all descriptors. You can then locate the desired connector and read its ID.
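The cancel and delete calls described in the last two sections can be sketched as follows, again with a hypothetical base URL and helper names. The connections_to_drain function is only an illustrative model of the cancel walk over a hypothetical in-memory shape, where each connection has an id and a queued count. It is not NiFi's data model or the service's implementation.

```python
import urllib.parse
import urllib.request

# Hypothetical placeholder: substitute the host and port of your Ingest service.
INGEST_BASE_URL = "http://ingest.example.com:30800"


def _connector_url(connector_id: str, suffix: str, base_url: str) -> str:
    """Join the base URL, the percent-encoded connector ID, and an optional suffix."""
    path_id = urllib.parse.quote(connector_id, safe="")
    return f"{base_url}/connectors/{path_id}{suffix}"


def cancel_request(connector_id: str, base_url: str = INGEST_BASE_URL) -> urllib.request.Request:
    """Build (without sending) POST /connectors/{id}/cancel."""
    return urllib.request.Request(_connector_url(connector_id, "/cancel", base_url), method="POST")


def delete_request(connector_id: str, base_url: str = INGEST_BASE_URL) -> urllib.request.Request:
    """Build (without sending) DELETE /connectors/{id}."""
    return urllib.request.Request(_connector_url(connector_id, "", base_url), method="DELETE")


def connections_to_drain(pipes: list) -> list:
    """Illustrative model only: (pipe name, connection id) pairs with queued data,
    in the order a pipe-by-pipe, connection-by-connection walk would visit them.

    Hypothetical shape: [{"name": ..., "connections": [{"id": ..., "queued": int}]}]
    """
    return [
        (pipe["name"], conn["id"])
        for pipe in pipes
        for conn in pipe.get("connections", [])
        if conn.get("queued", 0) > 0
    ]
```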