Mise à jour du groupe de processus NiFi, du processeur, du service de contrôleur à l'aide du descripteur de connecteur Ingest

Les processeurs NiFi sont les éléments structurels de base du pipeline de flux de données version 9.1. Les processeurs effectuent des tâches spécifiques dans le pipeline, telles que l'écoute des données entrantes, l'acquisition de données à partir de sources externes et le routage, la transformation ou l'extraction d'informations à partir de fichiers de flux. Les processeurs sont regroupés en groupes de processus. Vous pouvez utiliser une API simple pour mettre à jour les processeurs, les groupes et les services associés existants.

Le nœud final REST pour cette API est :

  • PUT /connectors

  • BODY connectorEntity Json

Processeurs

Vous pouvez mettre à jour ou modifier les configurations de processeur, les propriétés du processeur, la connexion et les configurations de connexion selon vos besoins métier. Vous spécifiez des modifications de processeur en incluant la propriété suivante dans les entrées de canal > propriétés de canal > portée > type = PROCESSOR

La personnalisation se produit dans le processeur spécifié dans les propriétés de canal > portée > name = ProcessGroupName.Processor, où ProcessGroupName est le nom du groupe de processus qui contient le processeur lui-même.

Cette API peut également personnaliser tous les processeurs imbriqués dans le groupe de processus spécifié si ''*" est utilisé dans le nom de portée. Par exemple : "ProcessGroupName*".

Mettre à jour les configurations de processeur :

vous pouvez mettre à jour ou modifier les éléments de configuration de processeur suivants :

Nom de l'élément de configuration Nom d'élément de configuration correspondant dans JSON Description Valeur acceptée
Tâches simultanées ConcurrentlySchedulableTaskCount Nombre de tâches qui doivent être programmées simultanément pour le processeur spécifié. n
Note: n est un entier.
Exécuter un planning SchedulingPeriod Nombre minimal de secondes qui doivent s'écouler entre l'exécution de la tâche n sec
Note: n est un entier.
Exécution ExecutionNode Les nœuds sur lesquels l'exécution de ce processeur sera programmée. L'une des valeurs suivantes :
  • Tout
  • PRIMAIRE
Durée de la pénalité PenaltyDuration Durée utilisée lorsque ce processeur pénalise un FlowFile. n sec
Note: n est un entier.
Durée du rendement YieldDuration Lorsqu'un processeur cède, il ne sera pas reprogrammé tant que cette durée n'est pas écoulée. n sec
Note: n est un entier.
Niveau de bulletin BulletinLevel Niveau auquel ce processeur générera des bulletins. L'une des valeurs suivantes :
  • AUCUN
  • DEBUG
  • INFO
  • WARN
  • ERREUR
Stratégie de planification SchedulingStrategy Stratégie utilisée pour planifier ce processeur. L'une des valeurs suivantes :
  • TIMER_DRIVEN
  • EVENT_DRIVEN
  • CRON_DRIVEN
Commentaires Commentaires Chaîne
Exemples
  • Mettez à jour la valeur des tâches simultanées pour le processeur "Transformer le document - Rechercher des attributs depuis la base de données" dans le groupe de processus "Rechercher des attributs depuis la base de données" sous le canal DatabaseProductStage1c.
    {
        "name": "auth.reindex",
        "pipes": [
            {
                "name": "DatabaseProductStage1c",
                "properties": [
                    {
                        "name": "ConcurrentlySchedulableTaskCount",
                        "value": "5",
                        "scope": {
                            "name": "Find Attributes from Database.Transform Document - Find Attributes From Database",
                            "type": "PROCESSOR"
                        }
                    }
                ]
            }
        ]
    }
    }
  • Mettez à jour la valeur des tâches simultanées pour tous les processeurs dans le groupe de processus "SCROLL SQL" sous le canal DatabaseProductStage1c.
    {
        "name": "auth.reindex",
        "pipes": [
            {
                "name": "DatabaseProductStage1c",
                "properties": [
                        {
                        "name": "ConcurrentlySchedulableTaskCount",
                        "value": "3",
                        "scope": {
                            "name": "SCROLL SQL.*",
                            "type": "PROCESSOR"
                        }
                    }
                ]
            }
        ]
    }

Mettre à jour les propriétés du processeur :

Outre les configurations de processeur susmentionnées, vous pouvez également modifier les propriétés du processeur. Utilisez l'API suivante pour obtenir la liste des propriétés que vous pouvez modifier pour le processeur :
http://nifiHost:nifiPort/nifi-api/processors/{processorId}

Vous pouvez afficher dans la réponse JSON ( component > config > properties) les propriétés qui peuvent être modifiées.

Le corps JSON utilisé pour mettre à jour les propriétés est similaire à la mise à jour de la configuration, à l'exception de la façon dont vous spécifiez le nom de la propriété. Il est modifié comme suit :
"name": "properties.{property name to change}"

Exemple :

Mettez à jour la propriété "Temps d'attente max" pour le processeur "Exécuter SQL - Rechercher une catégorie parent".
{
    "name": "auth.reindex",
    "pipes": [
        {
            "name": "DatabaseProductStage1c",
            "properties": [
                {
                    "name": "properties.Max Wait Time",
                    "value": "25 seconds",
                    "scope": {
                        "name": "SCROLL SQL.Execute SQL - Find Parent Category",
                        "type": "PROCESSOR"
                    }
                }
            ]
        }
    ]
}

Connexion

La nouvelle version de l'API prend également en charge la mise à jour des connexions et des configurations de connexion. Le corps d'une mise à jour de connexions est différent d'une mise à jour de processeur ou de groupe de processus. Contrairement au processeur et aux groupes de processus, les connexions n'ont pas de nom identifiable de manière unique et il n'est donc pas possible d'obtenir un ID de connexion basé sur le nom de connexion. Pour résoudre ce problème, ces nouvelles zones de la portée ont été ajoutées :
  • sourceName : Nom du processeur/groupe de processus d'où provient la connexion (réception des fichiers de flux).
  • destinationName : Nom du processeur/groupe de processus vers qui se dirige la connexion (envoi des fichiers de flux).
Le nom dans la portée est le nom du groupe de processus immédiat qui contient la connexion.

Exemple :

{
    "name": "BackPressureObjectThreshold",
    "value": "600",
    "scope": {
        "name": "SCROLL SQL",
        "sourceName": "Execute SQL - Find Parent Category",
        "destinationName": "Analyze Successful SQL Response",
        "type": "CONNECTION"
    }
}

Mettre à jour les configurations de connexion :

Vous pouvez mettre à jour ou modifier les éléments de configuration de connexion suivants :

Nom de l'élément de configuration Nom d'élément de configuration correspondant dans JSON Description Valeur acceptée
Seuil d'objet de pression arrière BackPressureObjectThreshold Nombre maximal d'objets qui peuvent être mis en file d'attente avant l'application de pression arrière. n
Note: n est un entier.
Seuil de taille BackPressureDataSizeThreshold Taille maximale des données des objets qui peuvent être mis en file d'attente avant l'application de la backpressure. n Go
Note: n est un entier.
Expiration de FlowFile FlowFileExpiration

Durée maximale d'un objet dans le flux avant qu'il ne soit retiré du flux.

.
n sec
Note: n est un entier.

Exemple :

Mettez à jour une connexion entre le processeur "Analyser une réponse SQL fructueuse" et le port de sortie "SUCCESS" dans le groupe "SCROLL SQL" sous "DatabaseProductStage1c".

{
    "name": "auth.reindex",
    "pipes": [
        {
            "name": "DatabaseProductStage1c",
            "properties": [
                {
                    "name": "BackPressureObjectThreshold",
                    "value": "200",
                    "scope": {
                        "name": "SCROLL SQL",
                        "sourceName": "Analyze Successful SQL Response",
                        "destinationName": "SUCCESS",
                        "type": "CONNECTION"
                    }
                }
            ]
        }
    ]
}

Groupes de processus

Personnalisez vos groupes de processus en incluant la propriété suivante dans les les entrées de canal > propriétés de canal > portée > type = PROCESS_GROUP.

Cela personnalise les variables au niveau du groupe de processus et uniquement sur le groupe de processus spécifié. Pour personnaliser plusieurs groupes de processus, chacun d'eux aura besoin de sa propre entrée de propriétés de canal.

Exemple :

{
    "name": "auth.reindex",
    "pipes": [
        {
            "name": "DatabaseProductStage1c",
            "properties": [
                    {
                        "name": "matchmaker.proximity",
                        "value": "0.1",
                        "scope": {
                            "name": "Rollup Attributes",
                            "type": "PROCESS_GROUP"
                        }
                    },
                    {
                        "name": "matchmaker.proximity",
                        "value": "0.4",
                        "scope": {
                            "name": "Find Attributes",
                            "type": "PROCESS_GROUP"
                        }
                    }
            ]
        }
    ]
}

Services du contrôleur

Personnalisez les services du contrôleur en incluant la propriété suivante dans les entrées de canal > propriétés de canal > portée > type = CONTROLLER_SERVICE.

Cette propriété permet la personnalisation des variables au niveau du canal principal et uniquement sur le service de contrôleur spécifié. Pour personnaliser plusieurs services de contrôleur, chacun d'eux aura besoin de sa propre entrée de propriétés de canal.

Exemple :

{
    "name": "auth.reindex",
    "pipes": [
        {
            "name": "DatabaseProductStage1c",
            "properties": [
                {
                    "name": "Max Total Connections",
                    "value": "20",
                    "scope": {
                        "name": "Database Connection Pool",
                        "type": "CONTROLLER_SERVICE"
                    }
                }
            ]
        }
    ]
}

Personnalisations de canaux multiples

Cette API prend en charge la personnalisation de plusieurs canaux dans un connecteur. Pour ce faire, vous pouvez inclure plusieurs entrées de canal.

Exemple :

{
    "name": "auth.reindex",
    "pipes": [
        {
            "name": "DatabaseProductStage1c",
            "properties": [
                {
                    "name": "concurrent.tasks",
                    "value": "1",
                    "scope": {
                        "name": "Find Attributes from Database.Transform Document - Find Attributes From Database",
                        "type": "PROCESSOR"
                    }
                },
                {
                    "name": "matchmaker.proximity",
                    "value": "0.6",
                    "scope": {
                        "name": "Rollup Attributes",
                        "type": "PROCESS_GROUP"
                    }
                },
                {
                    "name": "Max Total Connections",
                    "value": "20",
                    "scope": {
                        "name": "Database Connection Pool",
                        "type": "CONTROLLER_SERVICE"
                    }
                }
            ]
        },
        {
            "name": "WaitLink",
            "label": "WaitLink - Product Stage 1 - 3",
            "properties": [
                {
                    "name": "connector.stage.name",
                    "value": "TESTING",
                    "scope": {
                        "name": "Wait for Completion",
                        "type": "PROCESS_GROUP"
                    }
                },
                {
                    "name": "concurrent.tasks",
                    "value": "5",
                    "scope": {
                        "name": "Wait for Completion.Get Status",
                        "type": "PROCESSOR"
                    }
                }
            ]
        }
    ]
}
HCL Commerce Version 9.1.5.0 or later
Important:
  • Les personnalisations effectuées avec cette API ne sont pas enregistrées dans ZooKeeper. Par conséquent, si le connecteur est supprimé et créé à nouveau, vous pouvez extraire le descripteur de connecteur personnalisé à l'aide de la requête GET http://{IngestHost}:{IngestPort}/connectors/{connectorName}.

  • Vous pouvez importer un connecteur personnalisé à l'aide de la requête POST http://{IngestHost}:{IngestPort}/connectors/{connectorName}. A l'aide de cette requête, vous pouvez importer le connecteur personnalisé de l'une des deux manières suivantes :
    • Avec un corps vide si ZooKeeper contient le descripteur de connecteur personnalisé

      OR

    • Avec le descripteur de connecteur personnalisé extrait
  • Enregistrez le descripteur de connecteur personnalisé, car les données dans ZooKeeper peuvent être écrasées à partir d'Ingest. La sauvegarde du descripteur de connecteur personnalisé vous offre de la flexibilité en vous déplaçant facilement entre les différentes versions, car le descripteur personnalisé enregistré peut être fusionné manuellement avec le descripteur de connecteur par défaut.