Pipeline d'index Catalogue Ingest

Les mappages de données complets à partir de la spécification, de la base de données et du schéma sont affichés pour la catégorie Catalogue.

Mappage de la zone d'index Catalogue à partir de la base de données

La séquence d'étapes suivante illustre le pipeline d'indexation Catalogue implémenté dans Apache NiFi. Le flux se compose principalement de deux étapes :
  1. Création d'un document Catalogue
  2. Association de filtres de catalogue

Etape 1 : Création d'un document Catalogue

Cette étape décrit comment les données Catalogue peuvent être transformées et chargées dans l'index Catalogue. Il commence par exécuter l'instruction SQL suivante pour récupérer les données Catalogue depuis la base de données Commerce.
SELECT C.CATALOG_ID, C.IDENTIFIER, C.MEMBER_ID, D.NAME, D.SHORTDESCRIPTION, 
            COALESCE(D.LANGUAGE_ID, L.LANGUAGE_ID) LANGUAGE_ID, 
            L.LOCALENAME, T.MASTERCATALOG, F.STOREENT_ID
		  FROM LANGUAGE L, STORELANG F, STORECAT T, CATALOG C
          LEFT OUTER JOIN CATALOGDSC D ON(C.CATALOG_ID = D.CATALOG_ID AND D.LANGUAGE_ID = ${param.langId})
		 WHERE T.STOREENT_ID IN (SELECT RELATEDSTORE_ID FROM STOREREL WHERE STATE = 1 AND STRELTYP_ID = -4 AND STORE_ID = ${param.storeId})
            AND T.CATALOG_ID = C.CATALOG_ID  AND C.CATALOG_ID = ${param.catalogId}
		    AND F.STOREENT_ID = ${param.storeId} AND F.LANGUAGE_ID = L.LANGUAGE_ID AND L.LANGUAGE_ID = ${param.langId}	

Ensuite, le jeu de résultats est transmis au processeur CreateCatalogDocumentFromDatabase pour transformation, à l'aide du tableau suivant pour ​mapper la zone de base de données renvoyée depuis l'instruction SQL ci-dessus vers une zone d'index dans l'index Catalogue :​

Nom de zone d'index Type de zone d'index ​​Description
Identificateur du document.
​id/catalog id_​string ID interne du catalogue de vente opérationnelle ; mappé à CATALOG.CATALOG_ID
​id/member id_string​ Le numéro de référence interne qui identifie le propriétaire du catalogue ; mappé à CATALOG.MEMBER_ID
​id/​language id_string Langue utilisée pour toutes les données spécifiques à la langue dans ce document pour le magasin actuel ; mappé à CATALOGDSC.LANGUAGE_ID
​identifier/specification id_​string Définir sur "catalogue"
identifier/​catalog id_​string Une chaîne qui identifie de manière unique catalogue de possession ; mappé à CATALOG.IDENTIFIER
​identifier/language id_​string La chaîne linguistique de cette langue prise en charge ; mappé à CATALOGDSC​.LANGUAGE_ID et LANGUAGE.LANGUAGE
​​Données sensibles à la langue​​​
​name/raw ​​raw Le nom dépendant de la langue de ce groupe de catalogues ; mappé à CATALOGDSC​​.NAME
​​name/normalized normalized Identique à ci-dessus
​​description/raw ​​raw Une courte description de ce groupe de catalogue ; mappé à CATALOGDSC.SHORTDESCRIPTION
Propriétés
type id_​string "maître" comme catalogue principal ou "ventes" comme catalogue de vente ; mappé à STORECAT.MASTERCATALOG
Pour un exemple de code, voir Exemples de l'étape 1.

Etape 2 : Association de filtres de catalogue

Cette étape décrit comment les données du filtre du catalogue peuvent être transformées et chargées dans l'index Attribut. Il commence par exécuter l'instruction SQL suivante pour récupérer les données Catalogue depuis la base de données Commerce :
SELECT F.STOREENT_ID, F.CATALOG_ID, F.USAGE, F.CATFILTER_ID, F.IDENTIFIER, X.EXPRESSION_ID, X.MEMBER_ID,
	       CAST(X.QUERY AS VARCHAR) QUERY, X.TRADING_ID
		  FROM EXPRESSION X, CATFILTER F
	WHERE X.CATFILTER_ID = F.CATFILTER_ID
	AND F.CATALOG_ID IN (SELECT C.CATALOG_ID FROM STORECAT C​ WHERE C.MASTERCATALOG = 1
		                           AND C.STOREENT_ID IN (SELECT RELATEDSTORE_ID FROM STOREREL
			                                              WHERE STATE = 1 AND STRELTYP_ID = -4 AND STORE_ID = ${param.storeId}))
        AND F.STOREENT_ID IN ( SELECT RELATEDSTORE_ID FROM STOREREL WHERE STATE = 1 AND STRELTYP_ID = -32 AND STORE_ID = ${param.storeId} )
​    
Ensuite, le jeu de résultats est transmis au processeur FindFiltersFromDatabase pour transformation, à l'aide du tableau suivant pour ​mapper la zone de base de données renvoyée depuis l'instruction SQL ci-dessus vers une zone d'index dans l'index Catalogue :​
Nom de zone d'index Type de zone d'index ​​Description
Filtres
​filters/id/filter id_string​ L'identificateur interne du filtre du catalogue qui s'applique au catalogue actuel
​filters/id/contract id_string L'identificateur interne du contrat associé à ce filtre de catalogue pour le catalogue actuel
​filters/id/member id_string L'identificateur interne du propriétaire associé à ce filtre de catalogue pour le catalogue actuel
​​filters/id/expression​ raw L'identificateur interne de l'expression de ce filtre de catalogue
​filters/identifier ​​id_string L'identificateur externe de ce filtre de catalogue
​filters/query ​​raw La chaîne de requête pour représenter ce filtre de catalogue
​filters/usage raw L'utilisation de ce filtre de catalogue
Pour un exemple de code, voir Exemples de l'étape 2.

Exemples de l'étape 1

Le code suivant est un exemple des données d'entrée pour le processeur CreateCatalogDocumentFromDatabase :

{
  "CATALOG_ID": 10001,
  "IDENTIFIER": "Extended Sites Catalog Asset Store",
  "MEMBER_ID": 7000000000000001000,
  "NAME": "Extended Sites Catalog Asset Store",
  "SHORTDESCRIPTION": null,
  "LANGUAGE_ID": -1,
  "LOCALENAME": "en_US           ",
  "MASTERCATALOG": "1",
  "STOREENT_ID": 1
}
Le processeur CreateCatalogDocumentFromDatabase transforme les données d'entrée en données de sortie suivantes :

{ "update": { "_id": "1--1-10001", "_index": ".auth.1.catalog.202006160325", 
"retry_on_conflict": 5, "_source": false } }
{
  "doc": {
    "identifier": {
      "catalog": "Extended Sites Catalog Asset Store",
      "specification": "catalog",
      "language": "en_US"
    },
    "name": {
      "normalized": "Extended Sites Catalog Asset Store",
      "raw": "Extended Sites Catalog Asset Store"
    },
    "id": {
      "catalog": "10001",
      "member": "7000000000000001000",
      "language": "-1",
      "store": "1"
    },
    "type": "master",
    "__meta": {
      "created": "2020-07-28T18:42:27.926Z",
      "modified": "2020-07-28T18:42:27.942Z",
      "version": {
        "min": 0,
        "max": 0
      }
    }
  },
  "doc_as_upsert": true
}

Exemples de l'étape 2

Le code suivant est un exemple des données d'entrée pour le processeur FindFiltersFromDatabase :
{
  "STOREENT_ID": 1,
  "CATALOG_ID": 10001,
  "USAGE": null,
  "CATFILTER_ID": 3074457345616679000,
  "IDENTIFIER": "TestCatalogFilter",
  "EXPRESSION_ID": 3074457345618259500,
  "MEMBER_ID": null,
  "QUERY": "( +*:* -parentCatgroup_id_search:\"10001_10006\")",
  "TRADING_ID": null
}

Le processeur FindFiltersFromDatabase transforme les données d'entrée avec store id, language id et catalog id transmis à partir de la classe NiFi FlowFile en données de sortie suivantes :


{ "update": { "_id": "1--1-10001", "_index": ".auth.1.catalog.202006160325", 
"retry_on_conflict": 5, "_source": false } }
{
  "doc": {
    "filters": [
      {
        "identifier": "TestCatalogFilter",
        "usage": "",
        "query": "( +*:* -path.tree:\"10006\")",
        "id": {
          "filter": "3074457345616679000",
          "expression": "3074457345618259500",
          "contract": "",
          "member": ""
        }
      }
    ],
    "__meta": {
      "modified": "2020-07-28T18:54:46.440Z"
    }
  }
}