Mise à jour du groupe de processus NiFi, du processeur, du service de contrôleur à l'aide du descripteur de connecteur Ingest
Les processeurs NiFi sont les éléments structurels de base du pipeline de flux de données version 9.1. Les processeurs effectuent des tâches spécifiques dans le pipeline, telles que l'écoute des données entrantes, l'acquisition de données à partir de sources externes et le routage, la transformation ou l'extraction d'informations à partir de fichiers de flux. Les processeurs sont regroupés en groupes de processus. Vous pouvez utiliser une API simple pour mettre à jour les processeurs, les groupes et les services associés existants.
Le nœud final REST pour cette API est :
-
PUT /connectors
-
BODY connectorEntity Json
Processeurs
Vous pouvez mettre à jour ou modifier les configurations de processeur, les propriétés du processeur, la connexion et les configurations de connexion selon vos besoins métier. Vous spécifiez des modifications de processeur en incluant la propriété suivante dans les
La personnalisation se produit dans le processeur spécifié dans les , où ProcessGroupName est le nom du groupe de processus qui contient le processeur lui-même.
Cette API peut également personnaliser tous les processeurs imbriqués dans le groupe de processus spécifié si ''*" est utilisé dans le nom de portée. Par exemple : "ProcessGroupName*".
Mettre à jour les configurations de processeur :
vous pouvez mettre à jour ou modifier les éléments de configuration de processeur suivants :
| Nom de l'élément de configuration | Nom d'élément de configuration correspondant dans JSON | Description | Valeur acceptée |
|---|---|---|---|
| Tâches simultanées | ConcurrentlySchedulableTaskCount | Nombre de tâches qui doivent être programmées simultanément pour le processeur spécifié. | n Note: n est un entier. |
| Exécuter un planning | SchedulingPeriod | Nombre minimal de secondes qui doivent s'écouler entre l'exécution de la tâche | n sec Note: n est un entier. |
| Exécution | ExecutionNode | Les nœuds sur lesquels l'exécution de ce processeur sera programmée. | L'une des valeurs suivantes :
|
| Durée de la pénalité | PenaltyDuration | Durée utilisée lorsque ce processeur pénalise un FlowFile. | n sec Note: n est un entier. |
| Durée du rendement | YieldDuration | Lorsqu'un processeur cède, il ne sera pas reprogrammé tant que cette durée n'est pas écoulée. | n sec Note: n est un entier. |
| Niveau de bulletin | BulletinLevel | Niveau auquel ce processeur générera des bulletins. | L'une des valeurs suivantes :
|
| Stratégie de planification | SchedulingStrategy | Stratégie utilisée pour planifier ce processeur. | L'une des valeurs suivantes :
|
| Commentaires | Commentaires | Chaîne |
- Mettez à jour la valeur des tâches simultanées pour le processeur "Transformer le document - Rechercher des attributs depuis la base de données" dans le groupe de processus "Rechercher des attributs depuis la base de données" sous le canal DatabaseProductStage1c.
{ "name": "auth.reindex", "pipes": [ { "name": "DatabaseProductStage1c", "properties": [ { "name": "ConcurrentlySchedulableTaskCount", "value": "5", "scope": { "name": "Find Attributes from Database.Transform Document - Find Attributes From Database", "type": "PROCESSOR" } } ] } ] } } - Mettez à jour la valeur des tâches simultanées pour tous les processeurs dans le groupe de processus "SCROLL SQL" sous le canal DatabaseProductStage1c.
{ "name": "auth.reindex", "pipes": [ { "name": "DatabaseProductStage1c", "properties": [ { "name": "ConcurrentlySchedulableTaskCount", "value": "3", "scope": { "name": "SCROLL SQL.*", "type": "PROCESSOR" } } ] } ] }
Mettre à jour les propriétés du processeur :
http://nifiHost:nifiPort/nifi-api/processors/{processorId}Vous pouvez afficher dans la réponse JSON ( ) les propriétés qui peuvent être modifiées.
"name": "properties.{property name to change}"Exemple :
{
"name": "auth.reindex",
"pipes": [
{
"name": "DatabaseProductStage1c",
"properties": [
{
"name": "properties.Max Wait Time",
"value": "25 seconds",
"scope": {
"name": "SCROLL SQL.Execute SQL - Find Parent Category",
"type": "PROCESSOR"
}
}
]
}
]
}Connexion
- sourceName : Nom du processeur/groupe de processus d'où provient la connexion (réception des fichiers de flux).
- destinationName : Nom du processeur/groupe de processus vers qui se dirige la connexion (envoi des fichiers de flux).
Exemple :
{
"name": "BackPressureObjectThreshold",
"value": "600",
"scope": {
"name": "SCROLL SQL",
"sourceName": "Execute SQL - Find Parent Category",
"destinationName": "Analyze Successful SQL Response",
"type": "CONNECTION"
}
}
Mettre à jour les configurations de connexion :
Vous pouvez mettre à jour ou modifier les éléments de configuration de connexion suivants :
| Nom de l'élément de configuration | Nom d'élément de configuration correspondant dans JSON | Description | Valeur acceptée |
|---|---|---|---|
| Seuil d'objet de pression arrière | BackPressureObjectThreshold | Nombre maximal d'objets qui peuvent être mis en file d'attente avant l'application de pression arrière. | n Note: n est un entier. |
| Seuil de taille | BackPressureDataSizeThreshold | Taille maximale des données des objets qui peuvent être mis en file d'attente avant l'application de la backpressure. | n Go Note: n est un entier. |
| Expiration de FlowFile | FlowFileExpiration | Durée maximale d'un objet dans le flux avant qu'il ne soit retiré du flux. . |
n sec Note: n est un entier. |
Exemple :
Mettez à jour une connexion entre le processeur "Analyser une réponse SQL fructueuse" et le port de sortie "SUCCESS" dans le groupe "SCROLL SQL" sous "DatabaseProductStage1c".
{
"name": "auth.reindex",
"pipes": [
{
"name": "DatabaseProductStage1c",
"properties": [
{
"name": "BackPressureObjectThreshold",
"value": "200",
"scope": {
"name": "SCROLL SQL",
"sourceName": "Analyze Successful SQL Response",
"destinationName": "SUCCESS",
"type": "CONNECTION"
}
}
]
}
]
}
Groupes de processus
Personnalisez vos groupes de processus en incluant la propriété suivante dans les les .
Cela personnalise les variables au niveau du groupe de processus et uniquement sur le groupe de processus spécifié. Pour personnaliser plusieurs groupes de processus, chacun d'eux aura besoin de sa propre entrée de propriétés de canal.
Exemple :
{
"name": "auth.reindex",
"pipes": [
{
"name": "DatabaseProductStage1c",
"properties": [
{
"name": "matchmaker.proximity",
"value": "0.1",
"scope": {
"name": "Rollup Attributes",
"type": "PROCESS_GROUP"
}
},
{
"name": "matchmaker.proximity",
"value": "0.4",
"scope": {
"name": "Find Attributes",
"type": "PROCESS_GROUP"
}
}
]
}
]
}
Services du contrôleur
Personnalisez les services du contrôleur en incluant la propriété suivante dans les .
Cette propriété permet la personnalisation des variables au niveau du canal principal et uniquement sur le service de contrôleur spécifié. Pour personnaliser plusieurs services de contrôleur, chacun d'eux aura besoin de sa propre entrée de propriétés de canal.
Exemple :
{
"name": "auth.reindex",
"pipes": [
{
"name": "DatabaseProductStage1c",
"properties": [
{
"name": "Max Total Connections",
"value": "20",
"scope": {
"name": "Database Connection Pool",
"type": "CONTROLLER_SERVICE"
}
}
]
}
]
}
Personnalisations de canaux multiples
Cette API prend en charge la personnalisation de plusieurs canaux dans un connecteur. Pour ce faire, vous pouvez inclure plusieurs entrées de canal.
Exemple :
{
"name": "auth.reindex",
"pipes": [
{
"name": "DatabaseProductStage1c",
"properties": [
{
"name": "concurrent.tasks",
"value": "1",
"scope": {
"name": "Find Attributes from Database.Transform Document - Find Attributes From Database",
"type": "PROCESSOR"
}
},
{
"name": "matchmaker.proximity",
"value": "0.6",
"scope": {
"name": "Rollup Attributes",
"type": "PROCESS_GROUP"
}
},
{
"name": "Max Total Connections",
"value": "20",
"scope": {
"name": "Database Connection Pool",
"type": "CONTROLLER_SERVICE"
}
}
]
},
{
"name": "WaitLink",
"label": "WaitLink - Product Stage 1 - 3",
"properties": [
{
"name": "connector.stage.name",
"value": "TESTING",
"scope": {
"name": "Wait for Completion",
"type": "PROCESS_GROUP"
}
},
{
"name": "concurrent.tasks",
"value": "5",
"scope": {
"name": "Wait for Completion.Get Status",
"type": "PROCESSOR"
}
}
]
}
]
}
-
Les personnalisations effectuées avec cette API ne sont pas enregistrées dans ZooKeeper. Par conséquent, si le connecteur est supprimé et créé à nouveau, vous pouvez extraire le descripteur de connecteur personnalisé à l'aide de la requête
GET http://{IngestHost}:{IngestPort}/connectors/{connectorName}. -
Vous pouvez importer un connecteur personnalisé à l'aide de la requête
POST http://{IngestHost}:{IngestPort}/connectors/{connectorName}. A l'aide de cette requête, vous pouvez importer le connecteur personnalisé de l'une des deux manières suivantes :- Avec un corps vide si ZooKeeper contient le descripteur de connecteur personnalisé
OR
- Avec le descripteur de connecteur personnalisé extrait
- Avec un corps vide si ZooKeeper contient le descripteur de connecteur personnalisé
- Enregistrez le descripteur de connecteur personnalisé, car les données dans ZooKeeper peuvent être écrasées à partir d'Ingest. La sauvegarde du descripteur de connecteur personnalisé vous offre de la flexibilité en vous déplaçant facilement entre les différentes versions, car le descripteur personnalisé enregistré peut être fusionné manuellement avec le descripteur de connecteur par défaut.