Groupes de processus de lien NiFi
Les groupes de processus de lien sont des groupes de processus NiFi utilitaires qui exécutent des tâches dans des pipelines NiFi HCL Commerce.
Les groupes de processus de lien suivants vous permettent de contrôler le flux de pipeline, d'effectuer ou de démarrer le branchement ou le fractionnement des processus de flux de données ou d'exécuter d'autres fonctions de gestion des pipelines. Si vous devez modifier un lien, vous pouvez personnaliser les modèles de processeur de liens par le biais d'un descripteur de connecteur via le service Ingest. Pour plus d'informations, voir Création d'un connecteur de service NiFi.
Lien d'alias
Le canal Lien d'alias crée un alias pour l'index donné en effectuant les tâches suivantes :
- Extrayez une liste d'alias existants d'Elasticsearch.
- Recherchez le nom d'index interne de l'index donné spécifié via la variable index.alias.name.
- Le format du nom d'index interne est : . “environment name” . “store id” . “index name” . “time id.
- Recherchez le nom d'alias s'il est disponible pour cet index.
- Recréez un alias pour cet index à l'aide du modèle : environment name” . “store id” . “index name.
- Supprimez l'index plus ancien s'il existe.
Syntaxe :
Pour permettre au service Ingest de gérer et de mettre à niveau ce modèle, personnalisez ce modèle par le biais d'un descripteur de connecteur via le service Ingest. Pour plus d'informations, voir Gestion des connecteurs dans le service Ingest.
Fournissez le chargement suivant avec le descripteur de connecteur :
- Utilisez environment.alias.name pour définir un nom d'environnement de l'index de recherche à permuter avec un alias.
- Utilisez index.alias.name pour définir le nom de l'index de recherche à permuter avec un alias.
- Utilisez keep.backup pour conserver la copie de l'index permuté le plus récemment. Cet index récemment permuté peut être conservé à des fins de reprise. Le système ne peut au plus conserver qu'une copie supplémentaire et supprime toutes les copies plus anciennes pour conserver le stockage.
{
"// Activate Catalog index with an environment alias": {
"environment.alias.name": "environment name of the search index to be swapped with an alias",
"index.alias.name": "name of the search index to be swapped with an alias"
},
"name": "AliasLink",
"label": "AliasLink - Catalog",
"properties": [
{
"name": "environment.alias.name",
"value": "auth",
"scope": {
"name": "Alias Index",
"type": "PROCESS_GROUP"
}
},
{
"name": "index.alias.name",
"value": "catalog",
"scope": {
"name": "Alias Index",
"type": "PROCESS_GROUP"
}
}
]
}Lien de cache
Ce canal Lien de cache permet d'effacer tout le cache interne spécifié des services du contrôleur :
- Le service du contrôleur de magasin met en cache les langues prises en charge, les valeurs par défaut au niveau du magasin, le chemin d'accès au magasin associé.
- Le service de contrôleur de configuration met en cache la configuration du composant de recherche.
- Service du contrôleur de catalogue - met en cache la hiérarchie de catalogues et les chemins de navigation pour le magasin donné.
- Le service de contrôleur Matchmaker met en cache les configurations des matchmakers Couleur et Dimension, Unités de mesure.
- Le service de contrôleur NLP met en cache les phrases lemmatisées utilisées pour le traitement du langage naturel (NLP).
- Le service du contrôleur de profil met en cache la configuration de profil Ingest pour la personnalisation.
Syntaxe :
Pour permettre au service Ingest de gérer et de mettre à niveau ce modèle, personnalisez ce modèle par le biais d'un descripteur de connecteur via le service Ingest. Pour plus d'informations, voir Gestion des connecteurs dans le service Ingest.
Fournissez l'ul suivant lorsqu'il est utilisé avec le descripteur de connecteur :
- Utiliser Clear pour effacer tout le contenu du cache du service de contrôleur correspondant
- Utiliser Refresh pour effacer et régénérer le contenu du cache à partir du service de contrôleur correspondant
- Utiliser Skip pour conserver le contenu actuel du cache du service de contrôleur correspondant
Cloner le lien
Le canal Cloner le lien effectue une copie identique de l'index donné.
- Il recherche le nom d'index interne de l'index donné spécifié via les variables index.source.name et index.target.name.
- Il appelle le nœud final Elasticsearch /_clone pour effectuer une opération de clonage d'index.
Syntaxe :
Pour permettre au service Ingest de gérer et de mettre à niveau ce modèle, personnalisez ce modèle par le biais d'un descripteur de connecteur via le service Ingest. Pour plus d'informations, voir Gestion des connecteurs dans le service Ingest.
Fournissez le chargement suivant avec le descripteur de connecteur :
- Utilisez environment.source.name pour définir le nom d'environnement de l'index source à cloner.
- Utilisez index.source.name comme nom de l'index source à cloner.
- Utilisez environment.target.name pour définir le nom d'environnement de l'index cible.
- Utilisez index.target.name comme nom de l'index cible.
- Utilisez wait.timeout pour spécifier la durée maximale d'attente de la fin de l'opération de clonage.
Continuer le lien
Le canal Continuer le lien est utilisé avec le contrôle de flux WaitLink comme marqueur de fin de flux. Il achemine le flux vers le service de terminal, qui à son tour envoie un signal au processeur de contrôle de flux pour libérer le flux suivant. Lorsqu'il n'y a plus de flux, le flux en cours passe à l'étape suivante.
Aucune propriété n'est requise pour ce canal.
Copier le lien
Le canal Copier le lien est un contrôleur de flux de données qui copie un groupe de documents indexés d'un index à un autre. Cette opération de copie est effectuée de manière "intelligente", de sorte qu'elle copie uniquement dans les zones de l'index cible qui ont des valeurs différentes. Même si la zone d'index source a été mise à jour, si la valeur de zone de l'index cible est la même, aucune opération ne sera effectuée. Après chaque mise à jour de l'index cible, une invalidation conditionnelle du cache peut être émise en fonction d'un modèle personnalisé et d'un nom de cache fourni.
Syntaxe :
- Utilisez Replacement Value dans un processeur Générer une requête de SCROLL Elasticsearch pour définir la requête Elasticsearch.
- Utilisez Entity Identifier dans le document source d'extraction pour définir la clé _id du document d'index cible. Cet identificateur peut être exprimé à l'aide de variables de fichier de flux et de registre, ainsi qu'à l'aide du nom de zone d'index dans la réponse de recherche entre crochets [ ].
- Utilisez cache.channel.name pour définir le nom du cache local à utiliser pour stocker temporairement les données extraites.
- Utilisez la variable environment.source.name pour définir le nom de l'environnement d'où provient l'index source.
- Utilisez la variable environment.target.name pour définir le nom de l'environnement dans lequel copier l'index.
- Utilisez la variable index.source.name pour définir le nom de l'index source à copier.
- Utilisez la variable index.target.name pour définir le nom de l'index cible à copier.
- Utilisez les variables scroll.bucket.size et scroll.page.size pour contrôler la taille des lots lors de la lecture et de l'écriture dans l'index de recherche. Utilisez la même valeur pour les deux variables.
- Utilisez scroll.duration pour définir la durée que le moteur de recherche doit utiliser pour conserver les données des résultats de recherche pour le défilement.
- Utilisez Cache Invalidation Strategy dans
Update Target Documentpour définir quand émettre l'invalidation. - Utilisez Cache Invalidation Template dans
Update Target Documentpour définir l'identificateur de dépendance à utiliser pour l'invalidation. Cet identificateur peut être exprimé à l'aide de variables de fichier de flux et de registre, ainsi qu'à l'aide du nom de zone d'index dans la réponse de recherche entre crochets[ ]. - Utilisez Cache Name pour définir le nom du cache auquel les invalidations seront envoyées.
- Utilisez Field Name pour définir un modèle de correspondance avec le nom de zone qui sera utilisé pour l'application de la stratégie d'invalidation.
Limitation
Cette fonction de copie ne fonctionne qu'avec des zones à valeur unique.
{
"// Copying all inventory counts from Inventory index back to Product index": {
"environment.source.name": "the name of the environment where the source index comes from",
"environment.target.name": "the name of the environment to copy the index to",
"cache.channel.name": "the name of the local cache to be used to temporarily store the extracted data",
"index.source.name": "the name of the source index for copying",
"index.target.name": "the name of the target index for copying",
"scroll.page.size": "search query scroll page size",
"scroll.bucket.size": "size of the batch response to be received within each scroll page",
"scroll.duration": "the time period to retain the search context for scrolling"
},
"name": "CopyLink",
"label": "CopyLink - Inventory (Copy To Product)",
"properties": [
{
"name": "connector.stage.name",
"value": "Inventory Stage 2, Copy Inventories",
"scope": {
"name": "Prepare Copy",
"type": "PROCESS_GROUP"
}
},
{
"name": "cache.channel.name",
"value": "services/cache/nifi/Inventory",
"scope": {
"name": "Copy Document",
"type": "PROCESS_GROUP"
}
},
{
"name": "cache.invalidation.template",
"value": "storeId:productId:[id.store]:[id.catentry]",
"scope": {
"name": "Copy Document",
"type": "PROCESS_GROUP"
}
},
{
"name": "cache.invalidation.channel",
"value": "baseCache",
"scope": {
"name": "Copy Document",
"type": "PROCESS_GROUP"
}
},
{
"name": "cache.invalidation.field",
"value": "inventories.*.quantity",
"scope": {
"name": "Copy Document",
"type": "PROCESS_GROUP"
}
},
{
"name": "environment.source.name",
"value": "live",
"scope": {
"name": "Copy Document",
"type": "PROCESS_GROUP"
}
},
{
"name": "environment.target.name",
"value": "live",
"scope": {
"name": "Copy Document",
"type": "PROCESS_GROUP"
}
},
{
"name": "index.source.name",
"value": "live.inventory",
"scope": {
"name": "Prepare Copy",
"type": "PROCESS_GROUP"
}
},
{
"name": "scroll.page.size",
"value": "6000",
"scope": {
"name": "Copy Document",
"type": "PROCESS_GROUP"
}
},
{
"name": "scroll.bucket.size",
"value": "2000",
"scope": {
"name": "Copy Document",
"type": "PROCESS_GROUP"
}
},
{
"name": "scroll.duration",
"value": "30s",
"scope": {
"name": "Copy Document",
"type": "PROCESS_GROUP"
}
},
{
"name": "properties.Route on default language",
"value": "allow",
"scope": {
"name": "Extract Source Document.Route On Language",
"type": "PROCESSOR"
}
},
{
"name": "properties.Route on other supported languages",
"value": "allow",
"scope": {
"name": "Extract Source Document.Route On Language",
"type": "PROCESSOR"
}
},
{
"name": "properties.Entry Identifier",
"value": "${param.storeId}-${param.langId}-[id.catalog]-[id.catentry]",
"scope": {
"name": "Extract Source Document.Extract Source Document",
"type": "PROCESSOR"
}
},
{
"name": "properties.Replacement Value",
"value": "{\"stored_fields\":[\"id.*\",\"inventories.*\"],\"size\":${es.pageSize},\"_source\":false,\"query\":{\"bool\":{\"must\":{\"match_all\":{}},\"filter\":[{\"term\":{\"id.store\":\"${param.storeId}\"}},{\"term\":{\"id.catalog\":\"${param.catalogId}\"}} ${extCatentryES} ${extDataloadES} ]}}}",
"scope": {
"name": "SCROLL Elasticsearch.Generate query",
"type": "PROCESSOR"
}
}
]
}Actualiser le lien
Le canal Actualiser le lien actualise explicitement le moteur de recherche d'index spécifié pour récupérer les dernières mises à jour.
- Il recherche le nom d'index interne de l'index donné spécifié via la variable index.refresh.name.
- Il détermine le format de nom d'index à utiliser selon que l'index en cours est Magasin ou est utilisé pour le traitement en quasi-temps réel (NRT).
- Format de l'index Magasin :
“environment name” . “index name”. - Format pour les utilisations de NRT :
“environment name” . “store id” . “index name”. - Format pour la réindexation complète :
. “environment name” . “store id” . “index name” . “time id”.
- Format de l'index Magasin :
- Appelez Elasticsearch pour effectuer une opération d'actualisation d'index
Syntaxe :
Pour permettre au service Ingest de gérer et de mettre à niveau ce modèle, personnalisez ce modèle par le biais d'un descripteur de connecteur via le service Ingest. Pour plus d'informations, voir Gestion des connecteurs dans le service Ingest.
Fournissez l'ul suivant lorsqu'il est utilisé avec le descripteur de connecteur :
- Utiliser index.refresh.name pour définir le nom de l'index de recherche à actualiser
{
"// Explicitly refresh the Description index searcher to pick up the latest updates": {
"index.refresh.name": "name of the search index to be refreshed"
},
"name": "RefreshLink",
"label": "RefreshLink - Description",
"properties": [
{
"name": "index.refresh.name",
"value": "description",
"scope": {
"name": "Refresh Index",
"type": "PROCESS_GROUP"
}
}
]
}Verrouiller le lien
Le canal Verrouiller le lien modifie l'état de verrouillage d'un index donné. Lorsqu'un index est verrouillé, aucune opération d'écriture n'est autorisée.
- Il recherche le nom d'index interne de l'index donné spécifié via les variables environment.lock.name et index.lock.name.
- Il appelle le nœud final Elasticsearch /_settings pour effectuer l'opération de verrouillage ou de déverrouillage d'index.
Syntaxe :
Pour permettre au service Ingest de gérer et de mettre à niveau ce modèle, personnalisez ce modèle par le biais d'un descripteur de connecteur via le service Ingest. Pour plus d'informations, voir Gestion des connecteurs dans le service Ingest.
Fournissez l'ul suivant lorsqu'il est utilisé avec le descripteur de connecteur :
- Utiliser environment.lock.name comme nom d'environnement de l'index de recherche à verrouiller
- Utiliser index.lock.name pour définir le nom de l'index de recherche à verrouiller
- Utiliser enable.lock pour bloquer toutes les opérations d'écriture à indexer lorsqu'elles sont définies sur false
Fusionner le lien
Le canal Fusionner le lien effectue une opération explicite de fusion de fragments d'index par rapport à l'index donné :
- Recherchez le nom d'index interne de l'index donné spécifié via la variable index.merge.name.
- Appelle le nœud final Elasticsearch /_forcemerge pour effectuer l'opération de fusion d'index de manière synchrone.
Syntaxe :
Pour permettre au service Ingest de gérer et de mettre à niveau ce modèle, personnalisez ce modèle par le biais d'un descripteur de connecteur via le service Ingest. Pour plus d'informations, voir Gestion des connecteurs dans le service Ingest.
Fournissez l'ul suivant lorsqu'il est utilisé avec le descripteur de connecteur :
- Utiliser index.merge.name pour définir le nom de l'index de recherche à fusionner
- Utiliser index.merge.segment pour définir le nombre de segments à fusionner. Pour fusionner complètement les index, définissez cette valeur sur 1.
Lien de réplication
Le canal Lien de réplication active les répliques d'index spécifiées et l'intervalle d'actualisation automatique de l'index :
- Recherchez le nom d'index interne de l'index donné spécifié via la variable index.refresh.name.
- Appelle Elasticsearch pour mettre à jour les paramètres d'index à l'aide des répliques nouvellement définies et de l'intervalle d'actualisation.
Syntaxe :
Pour permettre au service Ingest de gérer et de mettre à niveau ce modèle, personnalisez ce modèle par le biais d'un descripteur de connecteur via le service Ingest. Pour plus d'informations, voir Gestion des connecteurs dans le service Ingest.
Fournissez l'ul suivant lorsqu'il est utilisé avec le descripteur de connecteur :
- Utiliser index.refresh.name pour définir le nom de l'index de recherche à répliquer
- Utiliser index.refresh.interval pour définir le temps d'attente entre chaque actualisation automatique de l'index
- Utiliser index.replicas pour définir le nombre de répliques d'index souhaitées à utiliser pour l'index de recherche donné
{
"// Enabling back Description index replicas and automatic index refresh interval": {
"index.refresh.name": "name of the search index affected",
"index.refresh.interval": "amount of wait time between each index refresh",
"index.replicas": "number of desired index replicas to be used"
},
"name": "ReplicateLink",
"label": "ReplicateLink - Description",
"properties": [
{
"name": "index.refresh.name",
"value": "description",
"scope": {
"name": "Update Refresh Interval",
"type": "PROCESS_GROUP"
}
},
{
"name": "index.refresh.interval",
"value": "1s",
"scope": {
"name": "Update Refresh Interval",
"type": "PROCESS_GROUP"
}
},
{
"name": "index.replicas",
"value": "0",
"scope": {
"name": "Update Refresh Interval",
"type": "PROCESS_GROUP"
}
}
]
}Réinitialiser le lien
Le canal Réinitialiser le lien doit être le premier canal de chaque connecteur. Il réinitialise simplement tous les compteurs NiFi associés pour le magasin actuel. Aucune propriété n'est requise pour ce canal.
Enregistrer le lien
Le canal Enregistrer le lien doit être la troisième section de canal en partant de la fin de chaque connecteur. Il enregistre simplement tous les compteurs de progression associés à l'intérieur de NiFi pour cette opération d'indexation en cours dans Elasticsearch. Aucune propriété n'est requise pour ce canal.
Terminal
Le canal Terminal doit être la section du dernier canal de chaque connecteur. Il achemine simplement le flux vers le service de terminal pour déclencher la génération d'un rapport récapitulatif pour l'opération d'exécution en cours. Aucune propriété n'est requise pour ce canal.
Lien de magasin
Le canal Lien de magasin divise la demande d'indexation de niveau Magasin unique en plusieurs requêtes individuelles, en fonction du nombre de catalogues et de langues pris en charge dans le magasin donné. Ce canal doit être utilisé au tout début d'un connecteur pour initialiser les flux spécifiques à la langue et au catalogue pour chaque magasin.
Outre le fractionnement du flux entrant par catalogue et par langue du magasin donné, ce canal initialise également les attributs de fichier de flux suivants pour chacun de ses flux sortants générés :
- master.catalog est l'ID du catalogue principal du magasin actuel.
- default.catalog est l'ID du catalogue par défaut du magasin actuel.
- default.language est l'ID de la langue par défaut du magasin actuel.
- default.contract est l'ID du contrat par défaut du magasin actuel.
- default.currency est la devise par défaut du magasin actuel.
- param.storeId est l'ID du magasin actuel.
- param.catalogId est l'ID du catalogue utilisé pour le flux actuel.
- param.langId est l'ID de la langue utilisé pour le flux actuel.
- flow.lf définit si le flux en cours utilise ou non la commutation de langue
Lien NRT
Le canal Lien NRT divise une seule demande d'indexation en quasi-temps réel (NRT) en plusieurs requêtes Magasin associées individuelles, en fonction du nombre de catalogues et de langues pris en charge pour tous ses magasins associés. Ce canal doit être utilisé au tout début d'un connecteur NRT pour initialiser les flux spécifiques à la langue et au catalogue pour chaque magasin associé.
Outre le fractionnement du flux entrant par catalogue et par langue de chaque magasin associé, ce canal initialise également les attributs de fichier de flux suivants pour chacun de ses flux sortants générés :
- master.catalog est l'ID du catalogue principal du magasin actuel.
- default.catalog est l'ID du catalogue par défaut du magasin actuel.
- default.language est l'ID de la langue par défaut du magasin actuel.
- default.contract est l'ID du contrat par défaut du magasin actuel.
- default.currency est la devise par défaut du magasin actuel.
- param.storeId est l'ID du magasin actuel.
- param.catalogId est l'ID du catalogue utilisé pour le flux actuel.
- param.langId est l'ID de la langue utilisé pour le flux actuel.
- flow.lf définit si le flux en cours utilise ou non la commutation de langue
Selon le type de pipeline utilisé avec ce connecteur NRT, différents ensembles de variables seront définis en tant qu'attributs de fichier de flux supplémentaires pour la composition de SQL NRT en aval. Pour plus de détails, consultez le chargement associé à partir du processeur UpdateAttribute suivant :
- Pipeline de chargement de données : Processeur NiFi Prepare Dataload Join SQL
extDataloadES- TI_DELTA_CG_FACET_JOIN_QUERY
- TI_DELTA_CG_JOIN_QUERY
- TI_DELTA_CG_URL_1C_JOIN_QUERY
- TI_DELTA_CG_URL_JOIN_QUERY
- TI_DELTA_JOIN_QUERY
- TI_DELTA_JOIN_QUERY_ATTCH
- TI_DELTA_JOIN_QUERY_ATTR
- TI_DELTA_JOIN_QUERY_BUNDLE
- TI_DELTA_JOIN_QUERY_DYNKIT
- TI_DELTA_JOIN_QUERY_FIND_PARENT
- TI_DELTA_JOIN_QUERY_MASSOCCECE
- TI_DELTA_JOIN_QUERY_ROLLUP_ATTR
- Pipeline d'attribut : Processeur NiFi Prepare Attribute NRT SQL extension
- extAttributeAnd
- Pipeline du produit : Processeur NiFi Prepare Product NRT SQL extension
extCatentryAndSQLextCatentryAndSQL1aextCatentryAndSQL1bextCatentryESextCatentryIdFromAndSQLextCatentryIdWhereParentOrChildextCatentryParentIdWhereSQLextCatentryURLESextCatentryWhereSQLextDataCatentryIdextDataCatentryIdParent
- Pipeline de catégorie : Processeur NiFi Prepare Category NRT SQL extension
extCatgroupAndSQLextCatgroupAndSQL1aextCatgroupAndSQL1bextCatgroupESextCatgroupURLES
Lien de chargement de données
Le canal Dataload Link divise une seule demande d'indexation Dataload en plusieurs requêtes Magasin associées individuelles, en fonction du nombre de catalogues et de langues pris en charge pour tous ses magasins associés. Ce canal doit être utilisé au tout début d'un connecteur Dataload pour initialiser les flux spécifiques à la langue et au catalogue pour chaque magasin associé.
Outre le fractionnement du flux entrant par catalogue et par langue de chaque magasin associé, ce canal initialise également les attributs de fichier de flux suivants pour chacun de ses flux sortants générés :
- master.catalog est l'ID du catalogue principal du magasin actuel.
- default.catalog est l'ID du catalogue par défaut du magasin actuel.
- default.language est l'ID de la langue par défaut du magasin actuel.
- default.contract est l'ID du contrat par défaut du magasin actuel.
- default.currency est la devise par défaut du magasin actuel.
- param.storeId est l'ID du magasin actuel.
- param.catalogId est l'ID du catalogue utilisé pour le flux actuel.
- param.langId est l'ID de la langue utilisé pour le flux actuel.
- flow.lf définit si le flux en cours utilise ou non la commutation de langue
Selon le type de pipeline utilisé avec ce connecteur Dataload, différents ensembles de variables seront définies en tant qu'attributs de fichier de flux supplémentaires pour la composition du chargement de données SQL en aval. Pour plus de détails, consultez les processeurs UpdateAttribute suivants :
extDataloadES- TI_DELTA_CG_FACET_JOIN_QUERY
- TI_DELTA_CG_JOIN_QUERY
- TI_DELTA_CG_URL_1C_JOIN_QUERY
- TI_DELTA_CG_URL_JOIN_QUERY
- TI_DELTA_JOIN_QUERY
- TI_DELTA_JOIN_QUERY_ATTCH
- TI_DELTA_JOIN_QUERY_ATTR
- TI_DELTA_JOIN_QUERY_BUNDLE
- TI_DELTA_JOIN_QUERY_DYNKIT
- TI_DELTA_JOIN_QUERY_FIND_PARENT
- TI_DELTA_JOIN_QUERY_MASSOCCECE
- TI_DELTA_JOIN_QUERY_ROLLUP_ATTR
Lien de réindexation
Le canal Reindex Link prépare tous les attributs de fichier de flux requis pour effectuer une demande de réindexation complète pour le magasin donné. Ce canal doit être utilisé au tout début d'un connecteur de réindexation complet.
Syntaxe :
Pour permettre au service Ingest de gérer et de mettre à niveau ce modèle, personnalisez ce modèle par le biais d'un descripteur de connecteur via le service Ingest. Pour plus d'informations, voir Gestion des connecteurs dans le service Ingest.
Fournissez l'ul suivant lorsqu'il est utilisé avec le descripteur de connecteur :
- Utiliser flow.language.fallback pour définir s'il faut ou non effectuer une opération de commutation de langue lors de l'indexation
Lien de fractionnement
Le canal Lien de fractionnement peut être utilisé pour brancher le flux de données en cours afin de lancer un nouveau flux asynchrone par rapport à un connecteur distinct. Un rapport récapitulatif distinct sera généré immédiatement après l'achèvement du flux de données dans ce connecteur.
Syntaxe :
Pour permettre au service Ingest de gérer et de mettre à niveau ce modèle, il est recommandé de personnaliser ce modèle par le biais d'un descripteur de connecteur via le service Ingest. Pour plus d'informations, voir Extension des connecteurs Ingest.
Utilisez la variable split.connector.name pour définir le nom du connecteur afin de transférer le flux de données en cours
{
"// Launch the post-index connector to perform indexing as a background task": {
"split.connector.name": "the name of the Connector to send the event to"
},
"name": "SplitLink",
"label": "SplitLink - Post-Index",
"properties": [
{
"name": "split.connector.name",
"value": "auth.postindex",
"scope": {
"name": "Launch Connector",
"type": "PROCESS_GROUP"
}
}
]
}
Lien d'attente
Le canal Lien d'attente est un contrôleur de flux de données qui bloque un flux de données NiFi jusqu'à ce qu'une certaine condition soit remplie :
- Atteindre la fin de certains pipelines dans un connecteur. Lorsque ce canal est configuré en tant que contrôleur de flux de données au niveau du connecteur, il limite uniquement un nombre défini de flux simultanés s'exécutant dans le même connecteur.
- Achèvement du traitement de tous les FlowFiles NiFi d'un canal donné et des requêtes d'indexation en masse associées envoyées à Elasticsearch. Alternativement, lorsque ce canal est configuré en tant que programme d'écoute d'événement qui attend uniquement un certain type d'événements entrants, en fonction de la "Méthode d'attente" suivante choisie :
- Méthode d'événement : écoute le canal Redis défini dans la propriété Channel Name du processeur NiFi Subscribe Redis. Utilisez
Wait Strategypour contrôler la portée de blocage etWait Linkne se débloque que lorsqu'une clé de message correspondante a été reçue de ce canal Redis :- Verrouillage au niveau du connecteur
- A utiliser comme contrôle de flux. La clé de message est au format suivant :
Son contenu de message JSON possède les propriétés suivantes :<connector.name> - <stage.name>catalog, connector, environment, flow, language, run, status, store, wait
- Verrouillage au niveau du magasin
- A utiliser comme contrôle de flux. La clé de message est au format suivant :
Son contenu de message JSON possède les propriétés suivantes :<connector.name> - <stage.name> - <store.id>catalog, connector, environment, flow, language, run, status, store, wait
- Verrouillage au niveau de l'exécution
- Pour être utilisé en tant que contrôle de flux et la clé de message est au format suivant :
Son contenu de message JSON possède les propriétés suivantes :<connector.name> - <stage.name> - <run.id>wait, environment, run, store, language, catalog, connector, flow, status
- Verrouillage au niveau de la phase
- A utiliser avec
WaitLink. La clé de message est au format suivant :
Son contenu du message JSON possède les propriétés suivantes :<connector.name> - <stage.name> - <store.id> - <language.id> - <catalog.id> - <run.id>wait, environment, run, store, language, catalog, connector, flow, status
Pour illustrer avec un exemple de
WaitLinkà lProduct Stage 1adu connecteur auth.reindex, supposez un flux de données avec 11 store.id, -1 language.id, 10001 catalog.id, pour exécuter9d310aba-756b-4690-adae-856528d70f10. Lorsqu'unWaitLinkest mis en place pour bloquer à cette étape 1a du produit, le flux de données est libéré uniquement lorsqu'un message JSON avec la clé“auth.reindex-DatabaseProductStage1a-11--1-10001-9d310aba-756b-4690-adae-856528d70f10”est reçu du canal Redis services/nifi/wait pour l'environnement de création. - Méthode d'attente : met en pause le flux de données en cours pendant la durée spécifiée dans la "Durée de la pénalité".
- Méthode d'examen : surveille la taille de requête de niveau supérieur du canal spécifié pour attendre que le compteur revienne à zéro avant que le flux de données ne soit libéré pour un traitement en aval ultérieur.
- Méthode d'événement : écoute le canal Redis défini dans la propriété Channel Name du processeur NiFi Subscribe Redis. Utilisez
Syntaxe :
Pour permettre au service Ingest de gérer et de mettre à niveau ce modèle, personnalisez ce modèle par le biais d'un descripteur de connecteur via le service Ingest. Pour plus d'informations, voir Gestion des connecteurs dans le service Ingest.
Fournissez l'ul suivant lorsqu'il est utilisé avec le descripteur de connecteur :
En tant que contrôleur de flux :
- Définissez connector.flow.control sur true.
- Utilisez connector.flow.limit pour définir le nombre maximal de flux parallèles que le connecteur doit exécuter à tout moment.
- Utilisez wait.strategy pour contrôler le niveau de verrouillage :
connector,storeourun. - Assurez-vous que la variable wait.flow.strategy est définie sur l'une des valeurs suivantes, en fonction du niveau de verrouillage défini ci-dessus :
"Connector" = "connector.id" "Store" = "store.id" "Run" = "run.id"
En tant que programme d'écoute d'événement :
- Si vous le souhaitez, définissez connector.flow.control sur false.
- Utiliser connector.stage.name pour définir le nom du groupe de processus NiFi à surveiller
- Utiliser wait.connector.name pour définir le nom d'un autre connecteur à surveiller (facultatif)
- Utiliser wait.error.strategy pour définir s'il faut continuer ou arrêter immédiatement en cas d'erreur
- Utiliser wait.error.limit pour définir le niveau de tolérance aux erreurs avant d'appliquer la stratégie d'erreur
- Utiliser wait.method pour définir s'il faut utiliser un déclencheur basé sur des événements ou analyser la taille de la file d'attente
WaitLink.{
"// Wait for completion of all Product documents before proceeding to the next stage": {
"connector.stage.name": "name of the process group to monitor",
"wait.error.limit": "define the error limit to stop or empty for no limit",
"wait.method": "define whether to use event based trigger or to scan queue size"
},
"name": "WaitLink",
"label": "WaitLink - Product Stage 1a",
"properties": [
{
"name": "connector.stage.name",
"value": "DatabaseProductStage1a",
"scope": {
"name": "Wait for Completion",
"type": "PROCESS_GROUP"
}
},
{
"name": "wait.error.limit",
"value": "0",
"scope": {
"name": "Wait for Completion",
"type": "PROCESS_GROUP"
}
},
{
"name": "wait.method",
"value": "event",
"scope": {
"name": "Wait for Completion",
"type": "PROCESS_GROUP"
}
}
]
}