Ingest Inventory index pipeline
For information about the Product index schema design, see Ingest Product index schema. For information on calling the Ingest service, see Search Ingest Service API. For a complete listing of Elasticsearch index fields and parameters, see Elasticsearch index field types.
Inventory field mapping from database
The following sequence of steps illustrates the Inventory indexing pipeline implemented in Apache NiFi. The flow consists of mainly two stages:
- Stage 1: Associate parent inventories to Product document
- This stage describes how the Inventory data can be transformed and loaded into the Product index.
- Stage 2: Associate child inventories to Product document
- This stage describes how the child inventory data can be transformed and loaded into
the Product index. It starts with running the following SQL to retrieve inventory data
from the Commerce database:
Next, the result set is passed to the FindChildInventoriesFromDatabase processor for transformation, using the following table to map the database field returned from the SQL above to an index field in the Product index:SELECT CATENTRY.CATENTRY_ID, LISTAGG(INVENTORY.FFMCENTER_ID, ', ') WITHIN GROUP (ORDER BY INVENTORY.FFMCENTER_ID) FFMCENTER_ID, LISTAGG(TO_CHAR(INVENTORY.QUANTITY), ', ') WITHIN GROUP (ORDER BY INVENTORY.FFMCENTER_ID) QUANTITY, SUM(INVENTORY.QUANTITY) TOTAL FROM INVENTORY, CATENTRY, CATGPENREL WHERE CATGPENREL.CATALOG_ID = ${param.catalogId} AND CATENTRY.CATENTRY_ID = CATGPENREL.CATENTRY_ID AND CATENTRY.CATENTTYPE_ID = 'ItemBean' AND CATENTRY.MARKFORDELETE = 0 AND CATENTRY.CATENTRY_ID = INVENTORY.CATENTRY_ID AND INVENTORY.STORE_ID = ${param.storeId} GROUP BY CATENTRY.CATENTRY_ID ORDER BY CATENTRY.CATENTRY_ID OFFSET ${param.offset} ROWS FETCH NEXT ${param.pageSize} ROWS ONLY
For example code, see stage 2 samplesIndex Field Name Index Field Type Description Properties inventories/<fulfillment>/quantity float Quantity of the current inventory; mapped to INVENTORY.QUANTITY inventories/<fulfillment>/id id_string Fulfillment center id of the current inventory; mapped to INVENTORY.FFMCENTER_ID inventories/total float Sum of the quantity from the INVENTORY.QUANTITY from all the fulfillment centers for the catalog entry.
- Stage 1 samples
- The following code is an example of the input data for the
FindParentInventoriesFromDatabase
processor:
{ "CATENTRY_ID_PARENT": 13698, "FFMCENTER_ID": "10501, 11501", "QUANTITY": "606, 600", "TOTAL": 1206.0 }
- Stage 2 samples
- The following code is an example of the input data for the
FindChildInventoriesFromDatabase
processor:
{ "CATENTRY_ID": 11778, "FFMCENTER_ID": "10501, 11501, 11502, 11503, 11504, 11505, 11506, 11507, 11508, 11509, 11510, 11511, 11512, 11513, 11514, 11515, 11516, 11517, 11518, 11519, 11520, 11521, 11522, 11523, 11524, 11525, 11526, 11527, 11528, 11529, 11530, 11531, 11532, 11533, 11534, 11535, 11536, 11537, 11538, 11539, 11540, 11541, 11542, 11543, 11544, 11545, 11546, 11547, 11548, 11549, 11550, 11551, 11552, 11553, 11554, 11555", "QUANTITY": "101, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100", "TOTAL": 5601.0 }