Pipeline d'index Stock Ingest

Pour plus d'informations sur la conception du schéma d'index Produit, voir Schéma d'index Produit Ingest. Pour plus d'informations sur l'appel du service Ingest, voir Rechercher l'API de service Ingest. Pour une liste complète des zones et des paramètres d'index Elasticsearch, voir Types de zones d'index Elasticsearch.

Mappage de la zone de stock à partir de la base de données

La séquence d'étapes suivante ​illustre le pipeline d'indexation Inventaire implémenté dans Apache NiFi. Le flux se compose principalement de deux étapes :

Etape 1 : Associer les stocks parent au document Produit
Cette étape décrit comment les données Stock peuvent être transformées et chargées dans l'index Produit.
Il commence par exécuter l'instruction SQL suivante pour récupérer les données Stock depuis la base de données Commerce :
 SELECT A.CATENTRY_ID_PARENT,
             LISTAGG(A.FFMCENTER_ID, ', ') WITHIN GROUP (ORDER BY A.FFMCENTER_ID) FFMCENTER_ID,
	     LISTAGG(TO_CHAR(A.QUANTITY), ', ') WITHIN GROUP (ORDER BY A.FFMCENTER_ID) QUANTITY,
	     SUM(A.QUANTITY) TOTAL
       FROM (SELECT M.CATENTRY_ID_PARENT, M.FFMCENTER_ID, SUM(M.QUANTITY) QUANTITY
		    FROM (SELECT L.CATENTRY_ID_PARENT, L.CATENTRY_ID_CHILD, I.FFMCENTER_ID, I.QUANTITY
					FROM CATENTREL L, CATGPENREL R, CATENTRY C, INVENTORY I
				    WHERE R.CATALOG_ID = ${param.catalogId}
				      AND R.CATALOG_ID IN
				          (SELECT CATALOG_ID FROM STORECAT
				            WHERE STOREENT_ID IN
					              (SELECT RELATEDSTORE_ID FROM STOREREL WHERE STATE = 1 AND STRELTYP_ID = -4 AND STORE_ID = ${param.storeId}))
					  AND R.CATENTRY_ID = C.CATENTRY_ID AND C.MARKFORDELETE = 0 AND C.CATENTRY_ID = L.CATENTRY_ID_PARENT
					  AND C.CATENTTYPE_ID NOT IN ('ItemBean')
					  AND L.CATRELTYPE_ID NOT IN ('BUNDLE_COMPONENT', 'PACKAGE_COMPONENT')
					  AND L.CATENTRY_ID_CHILD = I.CATENTRY_ID AND I.STORE_ID = ${param.storeId}
				    UNION
				   SELECT CATENTRY_ID_PARENT, KITCOMP.CATENTRY_ID_CHILD, KITCOMP.FFMCENTER_ID, KITCOMP.QUANTITY
				     FROM (SELECT L.CATENTRY_ID_PARENT, L.CATENTRY_ID_CHILD, I.FFMCENTER_ID, I.QUANTITY
				            FROM CATENTREL L, CATGPENREL R, CATENTRY C, INVENTORY I
				           WHERE R.CATALOG_ID = ${param.catalogId}
				             AND R.CATALOG_ID IN (SELECT CATALOG_ID FROM STORECAT
				                                   WHERE STOREENT_ID IN
					                                     (SELECT RELATEDSTORE_ID FROM STOREREL
				                                           WHERE STATE = 1 AND STRELTYP_ID = -4 AND STORE_ID = ${param.storeId}))
					         AND R.CATENTRY_ID = C.CATENTRY_ID AND C.MARKFORDELETE = 0 AND C.CATENTRY_ID = L.CATENTRY_ID_PARENT
						     AND CATRELTYPE_ID in ('BUNDLE_COMPONENT','PACKAGE_COMPONENT')
						     AND L.CATENTRY_ID_CHILD = I.CATENTRY_ID AND I.STORE_ID = ${param.storeId} ) KITCOMP) M
		   GROUP BY M.CATENTRY_ID_PARENT, M.FFMCENTER_ID) A
       GROUP BY A.CATENTRY_ID_PARENT
       ORDER BY A.CATENTRY_ID_PARENT
       OFFSET ${param.offset} ROWS FETCH NEXT ${param.pageSize} ROWS ONLY 	     
Ensuite, le jeu de résultats est transmis au processeur FindParentInventoriesFromDatabase​ pour transformation, à l'aide du tableau suivant pour ​mapper la zone de base de données renvoyée depuis l'instruction SQL ci-dessus vers une zone d'index dans l'index Produit :​
Nom de zone d'index Type de zone d'index ​​Description
Propriétés
​​inventories/<fulfillment>/quantity float Quantité du stock actuel ; mappé à INVENTORY.QUANTITY
inventories/<fulfillment>/id id_string​ L'ID du centre de distribution du stock actuel ; mappé à INVENTORY.FFMCENTER_ID
inventories/total float Somme de la quantité provenant de INVENTORY. QUANTITY de tous les centres de distribution pour l'entrée de catalogue.
Pour un exemple de code, voir les exemples de l'étape 1
Etape 2 : Associer les stocks enfant au document Produit
Cette étape décrit comment les données du stock enfant peuvent être transformées et chargées dans l'index Produit. Il commence par exécuter l'instruction SQL suivante pour récupérer les données Stock depuis la base de données Commerce :
SELECT CATENTRY.CATENTRY_ID,
	     		LISTAGG(INVENTORY.FFMCENTER_ID, ', ') WITHIN GROUP (ORDER BY INVENTORY.FFMCENTER_ID) FFMCENTER_ID,
	     		LISTAGG(TO_CHAR(INVENTORY.QUANTITY), ', ') WITHIN GROUP (ORDER BY INVENTORY.FFMCENTER_ID) QUANTITY,
	     		SUM(INVENTORY.QUANTITY) TOTAL
	     FROM INVENTORY, CATENTRY, CATGPENREL
	    WHERE CATGPENREL.CATALOG_ID = ${param.catalogId}
	      AND CATENTRY.CATENTRY_ID = CATGPENREL.CATENTRY_ID
	      AND CATENTRY.CATENTTYPE_ID = 'ItemBean'
	      AND CATENTRY.MARKFORDELETE = 0
	      AND CATENTRY.CATENTRY_ID = INVENTORY.CATENTRY_ID
	      AND INVENTORY.STORE_ID = ${param.storeId}
	    GROUP BY CATENTRY.CATENTRY_ID
            ORDER BY CATENTRY.CATENTRY_ID
            OFFSET ${param.offset} ROWS FETCH NEXT ${param.pageSize} ROWS ONLY
Ensuite, le jeu de résultats est transmis au processeur FindChildInventoriesFromDatabase​ pour transformation, à l'aide du tableau suivant pour ​mapper la zone de base de données renvoyée depuis l'instruction SQL ci-dessus vers une zone d'index dans l'index Produit :​
Nom de zone d'index Type de zone d'index ​​Description
Propriétés
​​inventories/<fulfillment>/quantity float Quantité du stock actuel ; mappé à INVENTORY.QUANTITY
inventories/<fulfillment>/id id_string​ L'ID du centre de distribution du stock actuel ; mappé à INVENTORY.FFMCENTER_ID
inventories/total float Somme de la quantité provenant de INVENTORY. QUANTITY de tous les centres de distribution pour l'entrée de catalogue.
Pour un exemple de code, voir les exemples de l'étape 2
Exemples de l'étape 1
Le code suivant est un exemple des données d'entrée pour le processeur FindParentInventoriesFromDatabase​ :
{
  "CATENTRY_ID_PARENT": 13698,
  "FFMCENTER_ID": "10501, 11501",
  "QUANTITY": "606, 600",
  "TOTAL": 1206.0
}
Le processeur FindParentInventoriesFromDatabase​ transforme les données d'entrée en données de sortie suivantes :
{ "update": { "_id": "1--1-10001-13698", "_index": ".auth.1.product.202006160325", "retry_on_conflict": 5, "_source": false } }
{ "doc": {"inventories":{"total":{"quantity":1206.0},"10501":{"quantity":606.0,"id":"10501"},"11501":{"quantity":600.0,"id":"11501"}},"__meta":{"modified":"2020-07-27T13:49:59.403Z"}} }
Exemples de l'étape 2
Le code suivant est un exemple des données d'entrée pour le processeur FindChildInventoriesFromDatabase​ :
{
  "CATENTRY_ID": 11778,
  "FFMCENTER_ID": "10501, 11501, 11502, 11503, 11504, 11505, 11506, 11507, 11508, 11509, 11510, 11511, 11512, 11513, 11514, 11515, 11516, 11517, 11518, 11519, 11520, 11521, 11522, 11523, 11524, 11525, 11526, 11527, 11528, 11529, 11530, 11531, 11532, 11533, 11534, 11535, 11536, 11537, 11538, 11539, 11540, 11541, 11542, 11543, 11544, 11545, 11546, 11547, 11548, 11549, 11550, 11551, 11552, 11553, 11554, 11555",
  "QUANTITY": "101, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100",
  "TOTAL": 5601.0
}
Le processeur FindChildInventoriesFromDatabase​ transforme les données d'entrée en données de sortie suivantes :
{ "update": { "_id": "1--1-10001-11778", "_index": ".auth.1.product.202006160325", "retry_on_conflict": 5, "_source": false } }
{
  "doc": {
    "inventories": {
      "10501": {
        "quantity": 101,
        "id": "10501"
      },
      "11501": {
        "quantity": 100,
        "id": "11501"
      },
      "11502": {
        "quantity": 100,
        "id": "11502"
      },
      "11503": {
        "quantity": 100,
        "id": "11503"
      },
      "11504": {
        "quantity": 100,
        "id": "11504"
      },
      "11505": {
        "quantity": 100,
        "id": "11505"
      },
      "11506": {
        "quantity": 100,
        "id": "11506"
      },
      "11507": {
        "quantity": 100,
        "id": "11507"
      },
      "11508": {
        "quantity": 100,
        "id": "11508"
      },
      "11509": {
        "quantity": 100,
        "id": "11509"
      },
      "11510": {
        "quantity": 100,
        "id": "11510"
      },
      "11511": {
        "quantity": 100,
        "id": "11511"
      },
      "11512": {
        "quantity": 100,
        "id": "11512"
      },
      "11513": {
        "quantity": 100,
        "id": "11513"
      },
      "11514": {
        "quantity": 100,
        "id": "11514"
      },
      "11515": {
        "quantity": 100,
        "id": "11515"
      },
      "11516": {
        "quantity": 100,
        "id": "11516"
      },
      "11517": {
        "quantity": 100,
        "id": "11517"
      },
      "11518": {
        "quantity": 100,
        "id": "11518"
      },
      "11519": {
        "quantity": 100,
        "id": "11519"
      },
      "11520": {
        "quantity": 100,
        "id": "11520"
      },
      "11521": {
        "quantity": 100,
        "id": "11521"
      },
      "11522": {
        "quantity": 100,
        "id": "11522"
      },
      "11523": {
        "quantity": 100,
        "id": "11523"
      },
      "11524": {
        "quantity": 100,
        "id": "11524"
      },
      "11525": {
        "quantity": 100,
        "id": "11525"
      },
      "11526": {
        "quantity": 100,
        "id": "11526"
      },
      "11527": {
        "quantity": 100,
        "id": "11527"
      },
      "11528": {
        "quantity": 100,
        "id": "11528"
      },
      "11529": {
        "quantity": 100,
        "id": "11529"
      },
      "11530": {
        "quantity": 100,
        "id": "11530"
      },
      "11531": {
        "quantity": 100,
        "id": "11531"
      },
      "11532": {
        "quantity": 100,
        "id": "11532"
      },
      "11533": {
        "quantity": 100,
        "id": "11533"
      },
      "11534": {
        "quantity": 100,
        "id": "11534"
      },
      "11535": {
        "quantity": 100,
        "id": "11535"
      },
      "11536": {
        "quantity": 100,
        "id": "11536"
      },
      "11537": {
        "quantity": 100,
        "id": "11537"
      },
      "11538": {
        "quantity": 100,
        "id": "11538"
      },
      "11539": {
        "quantity": 100,
        "id": "11539"
      },
      "11540": {
        "quantity": 100,
        "id": "11540"
      },
      "11541": {
        "quantity": 100,
        "id": "11541"
      },
      "11542": {
        "quantity": 100,
        "id": "11542"
      },
      "11543": {
        "quantity": 100,
        "id": "11543"
      },
      "11544": {
        "quantity": 100,
        "id": "11544"
      },
      "11545": {
        "quantity": 100,
        "id": "11545"
      },
      "11546": {
        "quantity": 100,
        "id": "11546"
      },
      "11547": {
        "quantity": 100,
        "id": "11547"
      },
      "11548": {
        "quantity": 100,
        "id": "11548"
      },
      "11549": {
        "quantity": 100,
        "id": "11549"
      },
      "11550": {
        "quantity": 100,
        "id": "11550"
      },
      "11551": {
        "quantity": 100,
        "id": "11551"
      },
      "11552": {
        "quantity": 100,
        "id": "11552"
      },
      "11553": {
        "quantity": 100,
        "id": "11553"
      },
      "11554": {
        "quantity": 100,
        "id": "11554"
      },
      "11555": {
        "quantity": 100,
        "id": "11555"
      },
      "total": {
        "quantity": 5601
      }
    },
    "__meta": {
      "modified": "2020-07-27T15:12:15.138Z"
    }
  }
}