NiFi link process groups
Link process groups are utility NiFi Process Groups that perform tasks in HCL Commerce NiFi pipelines.
The following Link process groups allow you to control pipeline flow, perform or initiate branching or splitting dataflow processes, or perform other pipeline housekeeping functions. If you need to modify a Link, you can customize the Link processor templates through a connector descriptor via the Ingest service. For more information, see Creating a NiFi service connector.
Alias Link
The Alias Link pipe creates an alias for the given index by performing the following tasks:
- Retrieve a list of existing aliases from Elasticsearch.
- Scan for the internal index name of the given index specified through the index.alias.name variable.
- The format of the internal index name is: . “environment name” . “store id” . “index name” . “time id”.
- Locate the alias name if available for this index.
- Recreate an alias for this index using the pattern: “environment name” . “store id” . “index name”.
- Delete the older index if it exists.
Usage
To allow the Ingest service to manage and upgrade this template, customize this template through a connector descriptor via the Ingest service. For more information, see Managing connectors in the Ingest service.
Provide the following properties with the connector descriptor:
- Use environment.alias.name to define an environment name of the search index to be swapped with an alias
- Use index.alias.name to define the search index name to be swapped with an alias
- Use keep.backup to retain a copy of the most recently swapped-out index for recovery purposes. The system keeps at most one extra copy and deletes all older copies to conserve storage.
{
"// Activate Catalog index with an environment alias": {
"environment.alias.name": "environment name of the search index to be swapped with an alias",
"index.alias.name": "name of the search index to be swapped with an alias"
},
"name": "AliasLink",
"label": "AliasLink - Catalog",
"properties": [
{
"name": "environment.alias.name",
"value": "auth",
"scope": {
"name": "Alias Index",
"type": "PROCESS_GROUP"
}
},
{
"name": "index.alias.name",
"value": "catalog",
"scope": {
"name": "Alias Index",
"type": "PROCESS_GROUP"
}
}
]
}
Cache Link
This Cache Link pipe clears out all specified internal caches from the controller services:
- Store controller service caches supported languages, store-level defaults, and related store paths.
- Configuration controller service caches Search component configuration.
- Catalog controller service caches catalog hierarchy and navigation paths for the given store.
- Matchmaker controller service caches configurations for Color and Dimension matchmakers, units of measure.
- NLP controller service caches lemmatized phrases used for Natural Language Processing (NLP).
- Profile controller service caches Ingest profile configuration for customization.
Usage
To allow the Ingest service to manage and upgrade this template, customize this template through a connector descriptor via the Ingest service. For more information, see Managing connectors in the Ingest service.
Provide the following properties when used with the connector descriptor:
- Use Clear to erase all cache contents from the corresponding controller service
- Use Refresh to erase and rebuild the cache contents from the corresponding controller service
- Use Skip to keep the current cache contents of the corresponding controller service
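For reference, a CacheLink entry in a connector descriptor could follow the same shape as the other Link examples in this topic. This is only a sketch: the scope name Clear Cache and the property name properties.Store Cache Action below are illustrative assumptions, not confirmed values from the CacheLink template.

```json
{
  "// Refresh the Store controller service cache after indexing (names below are assumptions)": {
    "properties.Store Cache Action": "Clear, Refresh, or Skip for the Store controller service"
  },
  "name": "CacheLink",
  "label": "CacheLink - Post-Index",
  "properties": [
    {
      "name": "properties.Store Cache Action",
      "value": "Refresh",
      "scope": {
        "name": "Clear Cache",
        "type": "PROCESS_GROUP"
      }
    }
  ]
}
```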
Clone Link
The Clone Link pipe makes an identical copy of the given index.
- It scans for the internal index names of the given indices specified through the index.source.name and index.target.name variables.
- It calls the Elasticsearch /_clone endpoint to perform an index clone operation.
Usage
To allow the Ingest service to manage and upgrade this template, customize this template through a connector descriptor via the Ingest service. For more information, see Managing connectors in the Ingest service.
Provide the following properties with the connector descriptor:
- Use environment.source.name to define the environment name of the source index to be cloned.
- Use index.source.name as the name of the source index to be cloned.
- Use environment.target.name to define the environment name of the target index.
- Use index.target.name as the name of the target index.
- Use wait.timeout to specify the maximum amount of time to wait for the clone operation to finish.
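Using the variables above, a CloneLink descriptor might look like the following sketch, modeled on the AliasLink example. The process-group scope name Clone Index and the sample values (auth, live, catalog, 300s) are assumptions for illustration:

```json
{
  "// Clone the authoring Catalog index into the live environment (scope name and values are assumptions)": {
    "index.source.name": "name of the source index to be cloned",
    "index.target.name": "name of the target index"
  },
  "name": "CloneLink",
  "label": "CloneLink - Catalog",
  "properties": [
    {
      "name": "environment.source.name",
      "value": "auth",
      "scope": { "name": "Clone Index", "type": "PROCESS_GROUP" }
    },
    {
      "name": "index.source.name",
      "value": "catalog",
      "scope": { "name": "Clone Index", "type": "PROCESS_GROUP" }
    },
    {
      "name": "environment.target.name",
      "value": "live",
      "scope": { "name": "Clone Index", "type": "PROCESS_GROUP" }
    },
    {
      "name": "index.target.name",
      "value": "catalog",
      "scope": { "name": "Clone Index", "type": "PROCESS_GROUP" }
    },
    {
      "name": "wait.timeout",
      "value": "300s",
      "scope": { "name": "Clone Index", "type": "PROCESS_GROUP" }
    }
  ]
}
```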
Continue Link
The Continue Link pipe is used together with the Flow Control WaitLink as the end-of-flow marker. It routes the flow to the terminal service, which in turn sends a signal to the flow control processor to release the next flow. When there is no more flow left, the current flow then continues to the next stage.
No property is needed for this pipe.
Copy Link
The Copy Link pipe is a dataflow controller that copies a group of indexed documents from one index to another. This copy operation is done in a "smart" way: only fields in the target index that have different values are copied. Even if the source index field has been updated, no operation is performed when the field value in the target index is the same. After each update on the target index, a conditional cache invalidation can be issued, based on a provided custom pattern and cache name.
Usage
- Use Replacement Value in a Generate Query processor of SCROLL Elasticsearch to define the Elasticsearch query.
- Use Entry Identifier in the extract source document to define the _id key of the target index document; this identifier can be expressed using flowfile and registry variables, as well as index field names in the search response surrounded by square brackets [ ].
- Use cache.channel.name to define the name of the local cache to be used to temporarily store the extracted data.
- Use the environment.source.name variable to define the name of the environment where the source index comes from.
- Use the environment.target.name variable to define the name of the environment to which to copy the index.
- Use the index.source.name variable to define the name of the source index for copying.
- Use the index.target.name variable to define the name of the target index for copying.
- Use the scroll.bucket.size and scroll.page.size variables to control the batch size when reading and writing to the search index. Use the same value for both variables.
- Use scroll.duration to define the amount of time the search engine should use to retain the search result data for scrolling.
- Use Cache Invalidation Strategy in Update Target Document to define when to issue invalidation.
- Use Cache Invalidation Template in Update Target Document to define the dependency identifier to be used for invalidation. This identifier can be expressed using flowfile and registry variables, as well as index field names in the search response surrounded by square brackets [ ].
- Use Cache Name to define the name of the cache to which invalidations will be sent.
- Use Field Name to define a pattern for matching the field name that will be used for applying the invalidation strategy.
Limitation
This copy function only works with single-value fields.
{
"// Copying all inventory counts from Inventory index back to Product index": {
"environment.source.name": "the name of the environment where the source index comes from",
"environment.target.name": "the name of the environment to copy the index to",
"cache.channel.name": "the name of the local cache to be used to temporarily store the extracted data",
"index.source.name": "the name of the source index for copying",
"index.target.name": "the name of the target index for copying",
"scroll.page.size": "search query scroll page size",
"scroll.bucket.size": "size of the batch response to be received within each scroll page",
"scroll.duration": "the time period to retain the search context for scrolling"
},
"name": "CopyLink",
"label": "CopyLink - Inventory (Copy To Product)",
"properties": [
{
"name": "connector.stage.name",
"value": "Inventory Stage 2, Copy Inventories",
"scope": {
"name": "Prepare Copy",
"type": "PROCESS_GROUP"
}
},
{
"name": "cache.channel.name",
"value": "services/cache/nifi/Inventory",
"scope": {
"name": "Copy Document",
"type": "PROCESS_GROUP"
}
},
{
"name": "cache.invalidation.template",
"value": "storeId:productId:[id.store]:[id.catentry]",
"scope": {
"name": "Copy Document",
"type": "PROCESS_GROUP"
}
},
{
"name": "cache.invalidation.channel",
"value": "baseCache",
"scope": {
"name": "Copy Document",
"type": "PROCESS_GROUP"
}
},
{
"name": "cache.invalidation.field",
"value": "inventories.*.quantity",
"scope": {
"name": "Copy Document",
"type": "PROCESS_GROUP"
}
},
{
"name": "environment.source.name",
"value": "live",
"scope": {
"name": "Copy Document",
"type": "PROCESS_GROUP"
}
},
{
"name": "environment.target.name",
"value": "live",
"scope": {
"name": "Copy Document",
"type": "PROCESS_GROUP"
}
},
{
"name": "index.source.name",
"value": "live.inventory",
"scope": {
"name": "Prepare Copy",
"type": "PROCESS_GROUP"
}
},
{
"name": "scroll.page.size",
"value": "6000",
"scope": {
"name": "Copy Document",
"type": "PROCESS_GROUP"
}
},
{
"name": "scroll.bucket.size",
"value": "2000",
"scope": {
"name": "Copy Document",
"type": "PROCESS_GROUP"
}
},
{
"name": "scroll.duration",
"value": "30s",
"scope": {
"name": "Copy Document",
"type": "PROCESS_GROUP"
}
},
{
"name": "properties.Route on default language",
"value": "allow",
"scope": {
"name": "Extract Source Document.Route On Language",
"type": "PROCESSOR"
}
},
{
"name": "properties.Route on other supported languages",
"value": "allow",
"scope": {
"name": "Extract Source Document.Route On Language",
"type": "PROCESSOR"
}
},
{
"name": "properties.Entry Identifier",
"value": "${param.storeId}-${param.langId}-[id.catalog]-[id.catentry]",
"scope": {
"name": "Extract Source Document.Extract Source Document",
"type": "PROCESSOR"
}
},
{
"name": "properties.Replacement Value",
"value": "{\"stored_fields\":[\"id.*\",\"inventories.*\"],\"size\":${es.pageSize},\"_source\":false,\"query\":{\"bool\":{\"must\":{\"match_all\":{}},\"filter\":[{\"term\":{\"id.store\":\"${param.storeId}\"}},{\"term\":{\"id.catalog\":\"${param.catalogId}\"}} ${extCatentryES} ${extDataloadES} ]}}}",
"scope": {
"name": "SCROLL Elasticsearch.Generate query",
"type": "PROCESSOR"
}
}
]
}
Refresh Link
The Refresh Link pipe explicitly refreshes the specified index searcher to pick up the latest updates.
- It scans for the internal index name of the given index specified through the index.refresh.name variable.
- It determines which index name format to use, based on whether the current index is a Store index or is used for Near Real-Time (NRT) processing:
  - Format for the Store index: “environment name” . “index name”
  - Format for NRT usages: “environment name” . “store id” . “index name”
  - Format for full re-indexing: . “environment name” . “store id” . “index name” . “time id”
- It calls Elasticsearch to perform an index refresh operation.
Usage
To allow the Ingest service to manage and upgrade this template, customize this template through a connector descriptor via the Ingest service. For more information, see Managing connectors in the Ingest service.
Provide the following properties when used with the connector descriptor:
- Use index.refresh.name to define the name of the search index to be refreshed
{
"// Explicitly refresh the Description index searcher to pick up the latest updates": {
"index.refresh.name": "name of the search index to be refreshed"
},
"name": "RefreshLink",
"label": "RefreshLink - Description",
"properties": [
{
"name": "index.refresh.name",
"value": "description",
"scope": {
"name": "Refresh Index",
"type": "PROCESS_GROUP"
}
}
]
}
Lock Link
The Lock Link pipe changes the lock state of a given index. When an index is locked, no write operation is allowed.
- Scans for the internal index name of the given index specified through the environment.lock.name and index.lock.name variables.
- Calls the Elasticsearch /_settings endpoint to perform the index lock or unlock operation.
Usage
To allow the Ingest service to manage and upgrade this template, customize this template through a connector descriptor via the Ingest service. For more information, see Managing connectors in the Ingest service.
Provide the following properties when used with the connector descriptor:
- Use environment.lock.name as the environment name of the search index to be locked
- Use index.lock.name to define the name of the search index to be locked
- Use enable.lock to block all write operations to the index when set to false
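Using the variables above, a LockLink descriptor might look like the following sketch. The process-group scope name Lock Index and the sample values are assumptions for illustration:

```json
{
  "// Change the lock state of the Catalog index before maintenance (scope name and values are assumptions)": {
    "environment.lock.name": "environment name of the search index to be locked",
    "index.lock.name": "name of the search index to be locked",
    "enable.lock": "lock state toggle for the index"
  },
  "name": "LockLink",
  "label": "LockLink - Catalog",
  "properties": [
    {
      "name": "environment.lock.name",
      "value": "auth",
      "scope": { "name": "Lock Index", "type": "PROCESS_GROUP" }
    },
    {
      "name": "index.lock.name",
      "value": "catalog",
      "scope": { "name": "Lock Index", "type": "PROCESS_GROUP" }
    },
    {
      "name": "enable.lock",
      "value": "true",
      "scope": { "name": "Lock Index", "type": "PROCESS_GROUP" }
    }
  ]
}
```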
Merge Link
The Merge Link pipe performs an explicit index shard merge operation against the given index:
- Scans for the internal index name of the given index specified through the index.merge.name variable.
- Calls the Elasticsearch /_forcemerge endpoint to perform the index merge operation synchronously.
Usage
To allow the Ingest service to manage and upgrade this template, customize this template through a connector descriptor via the Ingest service. For more information, see Managing connectors in the Ingest service.
Provide the following properties when used with the connector descriptor:
- Use index.merge.name to define the name of the search index to be merged
- Use index.merge.segment to define the number of segments to merge to; to fully merge indices, set this to 1
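Using the variables above, a MergeLink descriptor might look like the following sketch. The process-group scope name Merge Index and the sample values are assumptions for illustration; index.merge.segment is set to 1 here to fully merge the index, as described above:

```json
{
  "// Fully merge the Description index shards after re-indexing (scope name and values are assumptions)": {
    "index.merge.name": "name of the search index to be merged",
    "index.merge.segment": "number of segments to merge to"
  },
  "name": "MergeLink",
  "label": "MergeLink - Description",
  "properties": [
    {
      "name": "index.merge.name",
      "value": "description",
      "scope": { "name": "Merge Index", "type": "PROCESS_GROUP" }
    },
    {
      "name": "index.merge.segment",
      "value": "1",
      "scope": { "name": "Merge Index", "type": "PROCESS_GROUP" }
    }
  ]
}
```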
Replicate Link
The Replicate Link pipe re-enables the specified index replicas and the automatic index refresh interval:
- Scans for the internal index name of the given index specified through the index.refresh.name variable.
- Calls Elasticsearch to update index settings using the newly defined replicas and refresh interval.
Usage
To allow the Ingest service to manage and upgrade this template, customize this template through a connector descriptor via the Ingest service. For more information, see Managing connectors in the Ingest service.
Provide the following properties when used with the connector descriptor:
- Use index.refresh.name to define the name of the search index to be replicated
- Use index.refresh.interval to define the amount of wait time between each automatic index refresh
- Use index.replicas to define the number of desired index replicas to be used for the given search index
{
"// Enabling back Description index replicas and automatic index refresh interval": {
"index.refresh.name": "name of the search index affected",
"index.refresh.interval": "amount of wait time between each index refresh",
"index.replicas": "number of desired index replicas to be used"
},
"name": "ReplicateLink",
"label": "ReplicateLink - Description",
"properties": [
{
"name": "index.refresh.name",
"value": "description",
"scope": {
"name": "Update Refresh Interval",
"type": "PROCESS_GROUP"
}
},
{
"name": "index.refresh.interval",
"value": "1s",
"scope": {
"name": "Update Refresh Interval",
"type": "PROCESS_GROUP"
}
},
{
"name": "index.replicas",
"value": "0",
"scope": {
"name": "Update Refresh Interval",
"type": "PROCESS_GROUP"
}
}
]
}
Reset Link
The Reset Link pipe should be the first pipe section of each connector. It simply resets all related NiFi counters for the current store. No property is needed for this pipe.
Save Link
The Save Link pipe should be the second-to-last pipe section of each connector. It simply saves all related progress counters inside of NiFi for the current indexing operation into Elasticsearch. No property is needed for this pipe.
Terminal
The Terminal pipe should be the last pipe section of each connector. It simply routes the flow to the Terminal Service to trigger a summary report to be generated for the current run operation. No property is needed for this pipe.
If an x-lock is detected in the cache for any exclusive operation (such as Indexing or PTL), the SynchronizationHelper, a custom processor, is employed in UNLOCK mode to release the x-lock at this stage of the PTL or Indexing pipeline.
Store Link
The Store Link pipe splits up the single Store level indexing request into multiple individual requests, depending on the number of catalogs and languages supported in the given store. This pipe should be used at the very beginning of a connector to initialize language and catalog specific flows for each store.
In addition to splitting up the incoming flow per catalog per language of the given store, this pipe also initializes the following flowfile attributes for each of its outbound flows generated:
- master.catalog is the master catalog ID for the current store.
- default.catalog is the default catalog ID for the current store.
- default.language is the default language ID for the current store.
- default.contract is the default contract ID for the current store.
- default.currency is the default currency for the current store.
- param.storeId is the current store ID.
- param.catalogId is the catalog ID used for the current flow.
- param.langId is the language ID used for the current flow.
- flow.lf defines whether or not the current flow uses language fallback.
NRT Link
The NRT Link pipe splits up a single Near Real-Time (NRT) indexing request into multiple individual related Store requests, depending on the number of catalogs and languages supported for all its related stores. This pipe should be used at the very beginning of an NRT connector to initialize language and catalog specific flows for each related store.
In addition to splitting up the incoming flow per catalog per language of each related store, this pipe also initializes the following flowfile attributes for each of its outbound flows generated:
- master.catalog is the master catalog ID for the current store.
- default.catalog is the default catalog ID for the current store.
- default.language is the default language ID for the current store.
- default.contract is the default contract ID for the current store.
- default.currency is the default currency for the current store.
- param.storeId is the current store ID.
- param.catalogId is the catalog ID used for the current flow.
- param.langId is the language ID used for the current flow.
- flow.lf defines whether or not the current flow uses language fallback.
Moreover, this pipe prevents the concurrent execution of NRT while any exclusive operation (Indexing or Push-to-live) is in progress. The SynchronizationHelper, a custom processor, operates in the "VALIDATE" mode to monitor ongoing exclusive operations. It examines the Redis cache for an x-lock entry. If an x-lock is detected, the flowfile will be placed in a wait queue until the x-lock is released or the specified time in the "wait-time" parameter elapses. It is important to adjust this parameter according to the specific environment.
Depending on the kind of pipeline being used with this NRT connector, different sets of variables will be set as additional flowfile attributes for downstream NRT SQL composition. See the variables set by the following UpdateAttribute processors for details:
- Dataload pipeline: Prepare Dataload Join SQL NiFi processor, which sets extDataloadES from the following join queries:
- TI_DELTA_CG_FACET_JOIN_QUERY
- TI_DELTA_CG_JOIN_QUERY
- TI_DELTA_CG_URL_1C_JOIN_QUERY
- TI_DELTA_CG_URL_JOIN_QUERY
- TI_DELTA_JOIN_QUERY
- TI_DELTA_JOIN_QUERY_ATTCH
- TI_DELTA_JOIN_QUERY_ATTR
- TI_DELTA_JOIN_QUERY_BUNDLE
- TI_DELTA_JOIN_QUERY_DYNKIT
- TI_DELTA_JOIN_QUERY_FIND_PARENT
- TI_DELTA_JOIN_QUERY_MASSOCCECE
- TI_DELTA_JOIN_QUERY_ROLLUP_ATTR
- Attribute pipeline: Prepare Attribute NRT SQL extension NiFi processor
  - extAttributeAnd
- Product pipeline: Prepare Product NRT SQL extension NiFi processor
  - extCatentryAndSQL
  - extCatentryAndSQL1a
  - extCatentryAndSQL1b
  - extCatentryES
  - extCatentryIdFromAndSQL
  - extCatentryIdWhereParentOrChild
  - extCatentryParentIdWhereSQL
  - extCatentryURLES
  - extCatentryWhereSQL
  - extDataCatentryId
  - extDataCatentryIdParent
- Category pipeline: Prepare Category NRT SQL extension NiFi processor
  - extCatgroupAndSQL
  - extCatgroupAndSQL1a
  - extCatgroupAndSQL1b
  - extCatgroupES
  - extCatgroupURLES
Dataload Link
The Dataload Link pipe splits up a single Dataload indexing request into multiple individual related Store requests, depending on the number of catalogs and languages supported for all its related stores. This pipe should be used at the very beginning of a Dataload connector to initialize language and catalog specific flows for each related store.
In addition to splitting up the incoming flow per catalog per language of each related store, this pipe also initializes the following flowfile attributes for each of its outbound flows generated:
- master.catalog is the master catalog ID for the current store.
- default.catalog is the default catalog ID for the current store.
- default.language is the default language ID for the current store.
- default.contract is the default contract ID for the current store.
- default.currency is the default currency for the current store.
- param.storeId is the current store ID.
- param.catalogId is the catalog ID used for the current flow.
- param.langId is the language ID used for the current flow.
- flow.lf defines whether or not the current flow uses language fallback.
Depending on the kind of pipeline being used with this dataload connector, different sets of variables will be set as additional flowfile attributes for the downstream composition of dataload SQL.
This pipe prevents the concurrent execution of Dataload while any exclusive operation (Indexing or Push-to-live) is in progress. The SynchronizationHelper, a custom processor, operates in the "VALIDATE" mode to monitor ongoing exclusive operations. It examines the Redis cache for an x-lock entry. If an x-lock is detected, the flowfile will be placed in a wait queue until the x-lock is released or the specified time in the "wait-time" parameter elapses. It is important to adjust this parameter according to the specific environment.
See the following UpdateAttribute processors for details:
extDataloadES:
- TI_DELTA_CG_FACET_JOIN_QUERY
- TI_DELTA_CG_JOIN_QUERY
- TI_DELTA_CG_URL_1C_JOIN_QUERY
- TI_DELTA_CG_URL_JOIN_QUERY
- TI_DELTA_JOIN_QUERY
- TI_DELTA_JOIN_QUERY_ATTCH
- TI_DELTA_JOIN_QUERY_ATTR
- TI_DELTA_JOIN_QUERY_BUNDLE
- TI_DELTA_JOIN_QUERY_DYNKIT
- TI_DELTA_JOIN_QUERY_FIND_PARENT
- TI_DELTA_JOIN_QUERY_MASSOCCECE
- TI_DELTA_JOIN_QUERY_ROLLUP_ATTR
Reindex Link
The Reindex Link pipe prepares all required flowfile attributes to perform a full re-indexing request for the given store. This pipe should be used at the very beginning of a full re-index connector.
- From version 9.1.15.2
- This pipe prevents concurrent execution while this exclusive operation is running. The Wait for No Activity process group is responsible for implementing the wait+lock mechanism, with the operation mode set to LOCK, the lock level set to LOCAL, and parameters including wait-time and cacheTTL. The behavior of this mechanism is determined by the values specified for the wait-time and cacheTTL parameters in the custom processors WaitForNoActivity and SynchronizationHelper; therefore, adjust these parameters based on the specific environment. It is advised to avoid concurrent execution of PTL and Indexing. If PTL relies on indexing, the indexing status should be monitored until completion. Once the indexing is finished, PTL can be initiated.
Note: For detailed instructions on adjusting the wait-time and cacheTTL parameters, refer to the documentation available on the NiFi User Interface.
- From version 9.1.16.0
- This pipe prevents concurrent execution while this exclusive operation is running. The validate and lock process group is responsible for implementing the wait+lock mechanism, with the operation mode set to LOCK, the lock level set to LOCAL, and parameters including wait-time and cacheTTL. The behavior of this mechanism is determined by the values specified for the wait-time and cacheTTL parameters in the custom processor SynchronizationHelper; therefore, adjust these parameters based on the specific environment. Avoid concurrent execution of PTL and Indexing. If PTL relies on indexing, the indexing status should be monitored until completion. Once the indexing is finished, PTL can be initiated.
Note: For detailed instructions on adjusting the wait-time and cacheTTL parameters, refer to the documentation available on the NiFi User Interface.
- Usage
- To allow the Ingest service to manage and upgrade this template, customize this template through a connector descriptor via the Ingest service. For more information, see Managing connectors in the Ingest service.
Provide the following properties when used with the connector descriptor:
- Use flow.language.fallback to define whether or not to perform language fallback operation during indexing time
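A minimal ReindexLink descriptor entry might look like the following sketch. The process-group scope name Prepare Reindex and the sample value are assumptions for illustration:

```json
{
  "// Enable language fallback during full re-indexing (scope name and value are assumptions)": {
    "flow.language.fallback": "whether to perform language fallback during indexing"
  },
  "name": "ReindexLink",
  "label": "ReindexLink - Full Reindex",
  "properties": [
    {
      "name": "flow.language.fallback",
      "value": "true",
      "scope": { "name": "Prepare Reindex", "type": "PROCESS_GROUP" }
    }
  ]
}
```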
Split Link
The Split Link pipe can be used to branch off the current dataflow in order to launch a new asynchronous flow against a separate connector. A separate summary report is generated immediately following the completion of the dataflow in that connector.
Usage
To allow the Ingest service to manage and upgrade this template, it is recommended to customize this template through a connector descriptor via the Ingest service. For more information, see Extending Ingest connectors.
Use the split.connector.name variable to define the name of the connector to which the current dataflow is forwarded.
{
"// Launch the post-index connector to perform indexing as a background task": {
"split.connector.name": "the name of the Connector to send the event to"
},
"name": "SplitLink",
"label": "SplitLink - Post-Index",
"properties": [
{
"name": "split.connector.name",
"value": "auth.postindex",
"scope": {
"name": "Launch Connector",
"type": "PROCESS_GROUP"
}
}
]
}

This addition does not affect existing customizations to NiFi. Existing customizations are kept in separate connectors, and you can call these connectors from SplitLink.
Wait Link
The Wait Link pipe is a dataflow controller that blocks a NiFi dataflow until a certain condition has been satisfied:
- Reaching the end of a certain pipeline within a connector. When this pipe is configured as a connector-level dataflow controller, it limits the number of concurrent flows running within the same connector to a defined maximum.
- Completion of processing all NiFi flowfiles of a given pipe and its associated bulk indexing requests sent to Elasticsearch. Alternatively, this pipe can be set up as an event listener that waits only for a certain type of incoming event, based on which of the following Wait Methods has been chosen:
  - Event method: listens to the Redis channel defined in the Channel Name property of the Subscribe Redis NiFi processor. Use Wait Strategy to control the blocking scope; Wait Link only unblocks when a matching message key has been received from this Redis channel:
    - Connector level locking: to be used as Flow Control. The message key is in this format: <connector.name> - <stage.name>. Its JSON message content has these properties: catalog, connector, environment, flow, language, run, status, store, wait.
    - Store level locking: to be used as Flow Control. The message key is in this format: <connector.name> - <stage.name> - <store.id>. Its JSON message content has these properties: catalog, connector, environment, flow, language, run, status, store, wait.
    - Run level locking: to be used as Flow Control. The message key is in this format: <connector.name> - <stage.name> - <run.id>. Its JSON message content has these properties: wait, environment, run, store, language, catalog, connector, flow, status.
    - Stage level locking: to be used with WaitLink. The message key is in this format: <connector.name> - <stage.name> - <store.id> - <language.id> - <catalog.id> - <run.id>. Its JSON message content has these properties: wait, environment, run, store, language, catalog, connector, flow, status.
    To illustrate with a WaitLink example at Product Stage 1a of the auth.reindex connector, assume a dataflow with store.id 11, language.id -1, and catalog.id 10001, for run 9d310aba-756b-4690-adae-856528d70f10. When a WaitLink is set up to block at this Product Stage 1a, the dataflow is only released when a JSON message with key “auth.reindex-DatabaseProductStage1a-11--1-10001-9d310aba-756b-4690-adae-856528d70f10” is received from the services/nifi/wait Redis channel for the Authoring environment.
  - Wait method: pauses the current dataflow for the amount of time specified in the Penalty Duration.
  - Scan method: monitors the top-level queue size of the specified pipe and waits until the counter falls to zero before the dataflow is released for further downstream processing.
Usage
To allow the Ingest service to manage and upgrade this template, customize this template through a connector descriptor via the Ingest service. For more information, see Managing connectors in the Ingest service.
Provide the following properties when used with the connector descriptor:
As a Flow Controller:
- Set connector.flow.control to true.
- Use connector.flow.limit to define the maximum number of parallel flows to be performed by the connector at any time.
- Use wait.strategy to control the level of locking: connector, store, or run.
- Ensure that the wait.flow.strategy variable is set to one of the following values, based on the level of locking set above:
  - Connector: connector.id
  - Store: store.id
  - Run: run.id
As an Event Listener:
- Optionally set connector.flow.control to false.
- Use connector.stage.name to define the name of the NiFi process group to monitor.
- Use wait.connector.name to define the name of another connector to monitor (optional).
- Use wait.error.strategy to define whether to continue or stop immediately when an error happens.
- Use wait.error.limit to define the level of error tolerance before applying the error strategy.
- Use wait.method to define whether to use an event-based trigger or to scan the queue size of the WaitLink pipe.
{
"// Wait for completion of all Product documents before proceeding to the next stage": {
"connector.stage.name": "name of the process group to monitor",
"wait.error.limit": "define the error limit to stop or empty for no limit",
"wait.method": "define whether to use event based trigger or to scan queue size"
},
"name": "WaitLink",
"label": "WaitLink - Product Stage 1a",
"properties": [
{
"name": "connector.stage.name",
"value": "DatabaseProductStage1a",
"scope": {
"name": "Wait for Completion",
"type": "PROCESS_GROUP"
}
},
{
"name": "wait.error.limit",
"value": "0",
"scope": {
"name": "Wait for Completion",
"type": "PROCESS_GROUP"
}
},
{
"name": "wait.method",
"value": "event",
"scope": {
"name": "Wait for Completion",
"type": "PROCESS_GROUP"
}
}
]
}
PTLLink
The NiFi push-to-live pipeline's objective is to use the index clone method to move indices from the authoring environment to the Live environment. If you do not want to execute a full reindex for Live indices, you can use push-to-live to shift the authoring indices to the Live environment.
This pipe prevents concurrent execution while this exclusive operation is running. The Wait for No Activity process group is responsible for implementing the wait+lock mechanism, with the operation mode set to LOCK, the lock level set to GLOBAL, and parameters including wait-time and cacheTTL. The behavior of this mechanism is determined by the values specified for the wait-time and cacheTTL parameters in the custom processors WaitForNoActivity and SynchronizationHelper. Therefore, these parameters should be adjusted based on the specific environment.