Ingest Catalog index pipeline
The complete data mappings from specification, database and schema are shown for the Catalog category.
Catalog index field mapping from database
The following sequence of steps illustrates the Catalog indexing pipeline
implemented in Apache NiFi. The flow consists mainly of two stages:
- Creating a Catalog document
- Associating Catalog filters
Stage 1: Creating a Catalog document
This stage describes how to transform the Catalog data and load it into the Catalog
index. It starts with running the following SQL to retrieve Catalog data from the
Commerce database:
SELECT C.CATALOG_ID, C.IDENTIFIER, C.MEMBER_ID, D.NAME, D.SHORTDESCRIPTION,
COALESCE(D.LANGUAGE_ID, L.LANGUAGE_ID) LANGUAGE_ID,
L.LOCALENAME, T.MASTERCATALOG, F.STOREENT_ID
FROM LANGUAGE L, STORELANG F, STORECAT T, CATALOG C
LEFT OUTER JOIN CATALOGDSC D ON(C.CATALOG_ID = D.CATALOG_ID AND D.LANGUAGE_ID = ${param.langId})
WHERE T.STOREENT_ID IN (SELECT RELATEDSTORE_ID FROM STOREREL WHERE STATE = 1 AND STRELTYP_ID = -4 AND STORE_ID = ${param.storeId})
AND T.CATALOG_ID = C.CATALOG_ID AND C.CATALOG_ID = ${param.catalogId}
AND F.STOREENT_ID = ${param.storeId} AND F.LANGUAGE_ID = L.LANGUAGE_ID AND L.LANGUAGE_ID = ${param.langId}
Next, the result set is passed to the CreateCatalogDocumentFromDatabase processor for transformation. The processor uses the following table to map each database field returned by the SQL above to an index field in the Catalog index.
Index Field Name | Index Field Type | Description |
---|---|---|
Document Identifier | ||
id/catalog | id_string | Internal id of the operational sales catalog; mapped to CATALOG.CATALOG_ID |
id/member | id_string | The internal reference number that identifies the owner of the catalog; mapped to CATALOG.MEMBER_ID |
id/language | id_string | The language used for all language specific data in this document for the current store; mapped to CATALOGDSC.LANGUAGE_ID |
identifier/specification | id_string | Set to "catalog" |
identifier/catalog | id_string | A string that uniquely identifies the owning catalog; mapped to CATALOG.IDENTIFIER |
identifier/language | id_string | The locale string of this supported language, for example en_US; mapped to CATALOGDSC.LANGUAGE_ID and LANGUAGE.LOCALENAME |
Language Sensitive Data | ||
name/raw | raw | The language-dependent name of this catalog; mapped to CATALOGDSC.NAME |
name/normalized | normalized | Same as above |
description/raw | raw | A short description of this catalog; mapped to CATALOGDSC.SHORTDESCRIPTION |
Properties | ||
type | id_string | "master" as the master catalog, or "sales" as the sales catalog; mapped to STORECAT.MASTERCATALOG |
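
The mapping in the table above can be sketched in a few lines of Python. This is only an illustration of the field mapping, not the actual CreateCatalogDocumentFromDatabase implementation. The function name `create_catalog_document` is hypothetical. The sketch also makes three assumptions: identifier/language is read from the LOCALENAME column that the Stage 1 SQL returns, any MASTERCATALOG value other than 1 means a sales catalog, and id/store is included because it appears in the Stage 1 samples even though the table does not list it.

```python
def create_catalog_document(row: dict) -> dict:
    """Map one Stage 1 result row to Catalog index fields (illustrative only)."""

    def as_id(value):
        # id_string fields are stored as trimmed strings; NULL stays None.
        return None if value is None else str(value).strip()

    doc = {
        "id": {
            "catalog": as_id(row["CATALOG_ID"]),
            "member": as_id(row["MEMBER_ID"]),
            "language": as_id(row["LANGUAGE_ID"]),
            # Not listed in the table; taken from the Stage 1 sample output.
            "store": as_id(row["STOREENT_ID"]),
        },
        "identifier": {
            "specification": "catalog",  # constant value per the table
            "catalog": row["IDENTIFIER"],
            # Assumption: the locale string comes from LOCALENAME (padded CHAR).
            "language": as_id(row["LOCALENAME"]),
        },
        # Assumption: "1" marks the master catalog, anything else a sales catalog.
        "type": "master" if as_id(row["MASTERCATALOG"]) == "1" else "sales",
    }
    # Language-sensitive fields are only emitted when a description row exists.
    if row.get("NAME") is not None:
        doc["name"] = {"raw": row["NAME"], "normalized": row["NAME"]}
    if row.get("SHORTDESCRIPTION") is not None:
        doc["description"] = {"raw": row["SHORTDESCRIPTION"]}
    return doc
```

Applied to the input row shown under Stage 1 samples, this sketch yields the same `id`, `identifier`, `name`, and `type` values as the sample output; the `__meta` block is not modeled here.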
Stage 2: Associating Catalog filters
This stage describes how to transform the catalog filter data and load it into the Catalog index. It starts with running the following SQL to retrieve catalog filter data from the Commerce database:
SELECT F.STOREENT_ID, F.CATALOG_ID, F.USAGE, F.CATFILTER_ID, F.IDENTIFIER, X.EXPRESSION_ID, X.MEMBER_ID,
CAST(X.QUERY AS VARCHAR) QUERY, X.TRADING_ID
FROM EXPRESSION X, CATFILTER F
WHERE X.CATFILTER_ID = F.CATFILTER_ID
AND F.CATALOG_ID IN (SELECT C.CATALOG_ID FROM STORECAT C WHERE C.MASTERCATALOG = 1
AND C.STOREENT_ID IN (SELECT RELATEDSTORE_ID FROM STOREREL
WHERE STATE = 1 AND STRELTYP_ID = -4 AND STORE_ID = ${param.storeId}))
AND F.STOREENT_ID IN ( SELECT RELATEDSTORE_ID FROM STOREREL WHERE STATE = 1 AND STRELTYP_ID = -32 AND STORE_ID = ${param.storeId} )
Next, the result set is passed to the FindFiltersFromDatabase processor for transformation, using the following table to map the database field returned from the SQL above to an index field in the Catalog index. For example code, see Stage 2 samples.
Index Field Name | Index Field Type | Description |
---|---|---|
Filters | ||
filters/id/filter | id_string | The internal identifier of the catalog filter that applies to the current catalog |
filters/id/contract | id_string | The internal identifier of the contract that is associated with this catalog filter for the current catalog |
filters/id/member | id_string | The internal identifier of the owner that is associated with this catalog filter for the current catalog |
filters/id/expression | raw | The internal identifier of the expression of this catalog filter |
filters/identifier | id_string | The external identifier of this catalog filter |
filters/query | raw | The query string for representing this catalog filter |
filters/usage | raw | The usage of this catalog filter |
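
As a rough illustration of the filter mapping above, the following Python sketch turns one Stage 2 result row into one entry of the `filters` array. It is not the FindFiltersFromDatabase implementation. The function name `to_filter_entry` is hypothetical, and the column-to-field pairing is an assumption inferred from the column names and the Stage 2 samples (in particular, TRADING_ID is assumed to feed filters/id/contract, and NULL columns are assumed to become empty strings). The sketch copies the query text unchanged, whereas the sample output shows that the real processor also rewrites it.

```python
def to_filter_entry(row: dict) -> dict:
    """Map one Stage 2 result row to a filters[] entry (illustrative only)."""

    def text(value):
        # Assumption from the Stage 2 sample: NULL columns become "".
        return "" if value is None else str(value)

    return {
        "identifier": text(row["IDENTIFIER"]),
        "usage": text(row["USAGE"]),
        # Copied as-is; the real processor rewrites the query expression.
        "query": text(row["QUERY"]),
        "id": {
            "filter": text(row["CATFILTER_ID"]),
            "expression": text(row["EXPRESSION_ID"]),
            # Assumption: TRADING_ID identifies the contract.
            "contract": text(row["TRADING_ID"]),
            "member": text(row["MEMBER_ID"]),
        },
    }


def to_filters_doc(rows: list) -> dict:
    """Collect all filter rows of one catalog into a partial document."""
    return {"filters": [to_filter_entry(r) for r in rows]}
```

Collecting the entries into a single `filters` array matches the shape of the sample output, where the partial document is applied to the existing Catalog document created in Stage 1.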
Stage 1 samples
The following code is an example of the input data for the CreateCatalogDocumentFromDatabase processor:
{
"CATALOG_ID": 10001,
"IDENTIFIER": "Extended Sites Catalog Asset Store",
"MEMBER_ID": 7000000000000001000,
"NAME": "Extended Sites Catalog Asset Store",
"SHORTDESCRIPTION": null,
"LANGUAGE_ID": -1,
"LOCALENAME": "en_US ",
"MASTERCATALOG": "1",
"STOREENT_ID": 1
}
The CreateCatalogDocumentFromDatabase processor transforms the input data into the following output data:
{ "update": { "_id": "1--1-10001", "_index": ".auth.1.catalog.202006160325",
"retry_on_conflict": 5, "_source": false } }
{
"doc": {
"identifier": {
"catalog": "Extended Sites Catalog Asset Store",
"specification": "catalog",
"language": "en_US"
},
"name": {
"normalized": "Extended Sites Catalog Asset Store",
"raw": "Extended Sites Catalog Asset Store"
},
"id": {
"catalog": "10001",
"member": "7000000000000001000",
"language": "-1",
"store": "1"
},
"type": "master",
"__meta": {
"created": "2020-07-28T18:42:27.926Z",
"modified": "2020-07-28T18:42:27.942Z",
"version": {
"min": 0,
"max": 0
}
}
},
"doc_as_upsert": true
}
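
The output above is a two-line bulk update request: an action line followed by the partial document. The Python sketch below shows one way such a pair could be assembled. The function name `bulk_upsert_lines` is hypothetical, and the `_id` layout of store id, language id, and catalog id joined by hyphens is an assumption inferred from the sample value `1--1-10001`, not a documented rule.

```python
import json


def bulk_upsert_lines(index: str, store_id, lang_id, catalog_id, doc: dict) -> str:
    """Build the action line and document line of one bulk upsert (sketch)."""
    action = {
        "update": {
            # Assumption: "<store>-<language>-<catalog>", as in "1--1-10001".
            "_id": f"{store_id}-{lang_id}-{catalog_id}",
            "_index": index,
            "retry_on_conflict": 5,
            "_source": False,
        }
    }
    # doc_as_upsert creates the document when it does not exist yet.
    body = {"doc": doc, "doc_as_upsert": True}
    # Bulk requests are newline-delimited JSON; each line ends with "\n".
    return json.dumps(action) + "\n" + json.dumps(body) + "\n"
```

For example, `bulk_upsert_lines(".auth.1.catalog.202006160325", 1, -1, 10001, doc)` produces an action line whose `_id` matches the sample above.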
Stage 2 samples
The following code is an example of the input data for the FindFiltersFromDatabase processor:
{
"STOREENT_ID": 1,
"CATALOG_ID": 10001,
"USAGE": null,
"CATFILTER_ID": 3074457345616679000,
"IDENTIFIER": "TestCatalogFilter",
"EXPRESSION_ID": 3074457345618259500,
"MEMBER_ID": null,
"QUERY": "( +*:* -parentCatgroup_id_search:\"10001_10006\")",
"TRADING_ID": null
}
The FindFiltersFromDatabase processor transforms the input data with the store id, language id, and catalog id passed in from the NiFi FlowFile class into the following output data:
{ "update": { "_id": "1--1-10001", "_index": ".auth.1.catalog.202006160325",
"retry_on_conflict": 5, "_source": false } }
{
"doc": {
"filters": [
{
"identifier": "TestCatalogFilter",
"usage": "",
"query": "( +*:* -path.tree:\"10006\")",
"id": {
"filter": "3074457345616679000",
"expression": "3074457345618259500",
"contract": "",
"member": ""
}
}
],
"__meta": {
"modified": "2020-07-28T18:54:46.440Z"
}
}
}
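
Comparing the sample input and output shows that the filter query is rewritten: `parentCatgroup_id_search:"10001_10006"` becomes `path.tree:"10006"`. The Python sketch below reproduces only this one rewrite. It is a guess based on the single sample shown here: the function name `rewrite_filter_query` is hypothetical, and the rule (drop the catalog id prefix, keep the category id) is an assumption. The actual processor may handle other fields and forms.

```python
import re

# Matches parentCatgroup_id_search:"<catalogId>_<categoryId>".
_PARENT_CATGROUP = re.compile(r'parentCatgroup_id_search:"(-?\d+)_(-?\d+)"')


def rewrite_filter_query(query: str) -> str:
    """Rewrite parentCatgroup_id_search clauses to path.tree clauses (sketch)."""
    # Keep only the category id (second group) in the rewritten clause.
    return _PARENT_CATGROUP.sub(lambda m: f'path.tree:"{m.group(2)}"', query)
```

Clauses that do not match the pattern are left unchanged, so the function is safe to apply to queries that contain no `parentCatgroup_id_search` term.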