Deploy Feed Application

This page explains how to add a new feed application to Detect. Feed applications ingest data, process it through parsing, aggregation, and enrichment stages, and provide structured inputs for event detection.

A feed application enables Detect to consume and transform data from sources such as Kafka topics or files. Each feed application defines its schema, parsing logic, and configuration for data aggregation and enrichment. To deploy a feed application in Detect, follow the steps below.

Define Feed Application

Define the feed name, data models, and key attribute for the feed application in the application.json file. The sample application.json file for the Recharge feed is located at $INSTALLER_ROOT/acme/etc/model/applications/recharge/application.json.

For the Recharge feed, the attributes are configured as follows:
{
    "feedApplication": {
        "dataModels": ["Identity", "Recharge"],
        "keyAttribute": "MSISDN",
        "name": "Recharge"
    }
}
  • dataModels: The list of data models used by the feed, such as Identity and Recharge.
  • keyAttribute: The primary key used to identify records. For the Recharge feed, the key attribute is MSISDN.
  • name: The name of the feed.
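Before deploying, it can help to check that application.json has the three fields described above. The following is a minimal sketch, assuming the file contains only the feedApplication object shown; the helper validate_feed_application is hypothetical and is not part of Detect.

```python
import json


# Hypothetical helper (not part of Detect): checks that a parsed
# application.json carries the three feedApplication fields described above.
def validate_feed_application(config: dict) -> list[str]:
    errors = []
    app = config.get("feedApplication")
    if not isinstance(app, dict):
        return ["missing 'feedApplication' object"]

    name = app.get("name")
    if not isinstance(name, str) or not name:
        errors.append("'name' must be a non-empty string")

    key = app.get("keyAttribute")
    if not isinstance(key, str) or not key:
        errors.append("'keyAttribute' must be a non-empty string")

    models = app.get("dataModels")
    if not isinstance(models, list) or not models or not all(isinstance(m, str) for m in models):
        errors.append("'dataModels' must be a non-empty list of strings")

    return errors


recharge = json.loads("""
{
    "feedApplication": {
        "dataModels": ["Identity", "Recharge"],
        "keyAttribute": "MSISDN",
        "name": "Recharge"
    }
}
""")
print(validate_feed_application(recharge))  # []
```

To check the installed file, pass the output of json.load() for $INSTALLER_ROOT/acme/etc/model/applications/recharge/application.json to the same function; an empty list means all three fields are present and well-typed.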

Update Schema of Feed Application

Add the feed's attributes and profiles to the data_model.json file to define the schema for the feed application. Use the Recharge feed's data_model.json file as a structural reference; it is located at $INSTALLER_ROOT/acme/etc/model/feed_data_models/recharge/data_model.json.

Implement Feed Logic in main.py

Define the feed processing logic in the main.py file.
  1. Define the flow composition using create_flow() to connect the source operator (for example, a Kafka source) to the parser operator: source >> parser.
  2. Select the appropriate source operator based on the data type:
    • To ingest real-time data, use the Kafka-based source operator. Reference implementation: the Recharge feed's main.py at $INSTALLER_ROOT/acme/python/acme/application/recharge/main.py.
    • To ingest batch data, use the file-based source operator. Reference implementation: the Customer Profile Refresh feed's main.py at $INSTALLER_ROOT/acme/python/acme/application/customer_profile_refresh/main.py.
  3. The parser operator receives raw string messages from the source operator.
  4. The parser reads the data in its native format and creates attributes for the output tuple.
  5. In _get_parser_schema(), define the attributes to extract from the raw data so that Detect can process events.
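The source >> parser composition in step 1 and the schema-driven parsing in steps 3 to 5 can be illustrated with a toy pipeline. This is a minimal sketch of how ">>" chaining typically works in Python (through the __rshift__ operator); the classes Operator, ListSource, and CsvParser are hypothetical stand-ins, not Detect's operator API, and create_flow() in the real framework may wire operators differently. The _get_parser_schema() name mirrors step 5, but its signature and return value here are assumptions.

```python
# Toy stand-ins for Detect operators; names and behavior are hypothetical.
class Operator:
    def __init__(self):
        self.downstream = None

    def __rshift__(self, other):
        # "a >> b" wires a's output into b and returns b,
        # so chains such as a >> b >> c read left to right.
        self.downstream = other
        return other

    def emit(self, item):
        if self.downstream is not None:
            self.downstream.receive(item)

    def receive(self, item):
        raise NotImplementedError


class ListSource(Operator):
    """Emits raw string messages, standing in for a Kafka or file source."""

    def __init__(self, messages):
        super().__init__()
        self.messages = messages

    def run(self):
        for msg in self.messages:
            self.emit(msg)


class CsvParser(Operator):
    """Turns each raw string into a tuple record using a declared schema."""

    def __init__(self):
        super().__init__()
        self.records = []

    def _get_parser_schema(self):
        # Attributes to extract from the raw data (hypothetical field list).
        return ["MSISDN", "amount"]

    def receive(self, raw):
        values = raw.split(",")
        record = dict(zip(self._get_parser_schema(), values))
        self.records.append(record)
        self.emit(record)  # pass the record on to any downstream operator


source = ListSource(["15551234567,50", "15557654321,20"])
parser = CsvParser()
source >> parser
source.run()
print(parser.records)
# [{'MSISDN': '15551234567', 'amount': '50'}, {'MSISDN': '15557654321', 'amount': '20'}]
```

Returning the right-hand operator from __rshift__ is what lets longer chains (for example, a source, a parser, and a later enrichment stage) be written on one line.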

Define Parser in parser.py

The parser processes the data ingested from the source operator and converts it into structured data.

  • Kafka-based parser

    The process() method receives raw string data from the source operator and passes it to process_impl(). The process_impl() method converts the incoming JSON string into a structured record, extracts required fields such as MSISDN and transactionId, and applies transformations like timestamp parsing using DateTimeParser.

    Customize this logic to match the message format of your Kafka topic. The reference implementation for the Recharge feed is located at $INSTALLER_ROOT/acme/python/acme/parsers/message_queue/recharge/parser.py.

  • File-based parser

    The _parse_row() method processes each line of the file and implements the parsing logic for each row based on the file's data format. The reference implementation is located at $INSTALLER_PATH/acme/python/acme/parsers/file/customer_profile_refresh/parser.py.
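The two parser styles above can be sketched side by side. This is a minimal sketch under stated assumptions: the class names are hypothetical, the "timestamp" field name and its format are assumed, datetime.strptime stands in for Detect's DateTimeParser, and the pipe-delimited column layout for the file parser is invented for illustration. Only MSISDN, transactionId, process(), process_impl(), and _parse_row() come from the text above.

```python
import csv
import json
from datetime import datetime

# Hypothetical timestamp format; Detect's DateTimeParser may accept others.
TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"


class KafkaRechargeParserSketch:
    """Mirrors the process() -> process_impl() split described above."""

    def process(self, raw: str) -> dict:
        return self.process_impl(raw)

    def process_impl(self, raw: str) -> dict:
        message = json.loads(raw)  # JSON string -> dict
        return {
            "MSISDN": message["MSISDN"],
            "transactionId": message["transactionId"],
            # Stand-in for DateTimeParser: string -> datetime.
            "timestamp": datetime.strptime(message["timestamp"], TIMESTAMP_FORMAT),
        }


class FileRowParserSketch:
    """Mirrors _parse_row(): one line of the file in, one record out."""

    # Hypothetical column layout for a pipe-delimited profile file.
    COLUMNS = ["MSISDN", "firstName", "segment"]

    def _parse_row(self, line: str) -> dict:
        values = next(csv.reader([line], delimiter="|"))
        if len(values) != len(self.COLUMNS):
            raise ValueError(f"expected {len(self.COLUMNS)} fields, got {len(values)}")
        return dict(zip(self.COLUMNS, values))


kafka_record = KafkaRechargeParserSketch().process(
    '{"MSISDN": "15551234567", "transactionId": "tx-001", "timestamp": "2024-01-15 10:30:00"}'
)
file_record = FileRowParserSketch()._parse_row("15551234567|Ana|prepaid")
print(kafka_record["timestamp"].hour, file_record["segment"])  # 10 prepaid
```

Using csv.reader for a single line, rather than str.split, keeps quoted fields that contain the delimiter intact.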

Update drive.json for Feed Registration

To register the new feed application and its data model in the overall system configuration, add the feed details to the feedApplications[] and feedDataModels[] sections of the drive.json file.

  • feedApplications: Registers the new feed application "Recharge" with its associated data models and key attribute (MSISDN).
  • feedDataModels: Defines the data model for the "Recharge" feed, specifying its key attribute MSISDN.

Reference path: $INSTALLER_ROOT/acme/etc/drive.json
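The registration step can be scripted so it is safe to repeat. This is a minimal sketch, assuming that entries in feedApplications[] and feedDataModels[] use the same name, dataModels, and keyAttribute fields as application.json; the real drive.json entries may carry other fields, so compare against an existing entry before relying on it. The helper register_feed is hypothetical.

```python
import json


def register_feed(drive: dict, name: str, data_models: list, key_attribute: str) -> dict:
    """Add a feed to feedApplications[] and feedDataModels[] if not already present.

    Entry shapes are assumptions modeled on application.json.
    """
    apps = drive.setdefault("feedApplications", [])
    if not any(a.get("name") == name for a in apps):
        apps.append({"name": name, "dataModels": data_models, "keyAttribute": key_attribute})

    models = drive.setdefault("feedDataModels", [])
    if not any(m.get("name") == name for m in models):
        models.append({"name": name, "keyAttribute": key_attribute})

    return drive


drive = {"feedApplications": [], "feedDataModels": []}
register_feed(drive, "Recharge", ["Identity", "Recharge"], "MSISDN")
register_feed(drive, "Recharge", ["Identity", "Recharge"], "MSISDN")  # second call adds nothing
print(json.dumps(drive, indent=2))
```

To apply it to the installed configuration, read $INSTALLER_ROOT/acme/etc/drive.json with json.load(), call register_feed(), and write the result back with json.dump(); the duplicate check by name keeps repeated runs from adding the feed twice.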

Add Aggregates and Enrichments

Configure aggregates and enrichments to enhance data flowing through the feed application.

  • Configure enrichments to add profile data, aggregated values, simple transformed attributes, or derived attributes to the tuple record. Reference file path: $INSTALLER_PATH/acme/etc/model/applications/recharge/enrichments.json
  • Configure aggregates to calculate values such as count, sum, and average over any tuple record attribute, based on the configured rules. Reference file path: $INSTALLER_PATH/acme/etc/model/applications/recharge/aggregations.json
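Conceptually, an aggregate keeps running values per key, and an enrichment copies those values, plus profile data and derived attributes, onto each tuple record. The following is a minimal sketch of that idea in plain Python, not of the enrichments.json or aggregations.json formats; the class RunningAggregate, the function enrich, the profile store, the attribute names rechargeCount, rechargeAvg, and isHighValue, and the threshold of 100 are all hypothetical.

```python
from collections import defaultdict


class RunningAggregate:
    """Per-key count, sum, and average of one numeric attribute (hypothetical)."""

    def __init__(self, key_attr: str, value_attr: str):
        self.key_attr = key_attr
        self.value_attr = value_attr
        self.count = defaultdict(int)
        self.total = defaultdict(float)

    def update(self, record: dict) -> None:
        key = record[self.key_attr]
        self.count[key] += 1
        self.total[key] += float(record[self.value_attr])

    def average(self, key) -> float:
        return self.total[key] / self.count[key] if self.count[key] else 0.0


def enrich(record: dict, agg: RunningAggregate, profiles: dict) -> dict:
    """Return a copy of the record with profile, aggregated, and derived attributes."""
    enriched = dict(record)
    key = record[agg.key_attr]
    # Profile data: lookup in a hypothetical profile store keyed by MSISDN.
    enriched["segment"] = profiles.get(key, {}).get("segment", "unknown")
    # Aggregated values for this key, including the current record.
    enriched["rechargeCount"] = agg.count[key]
    enriched["rechargeAvg"] = agg.average(key)
    # Derived attribute (hypothetical threshold).
    enriched["isHighValue"] = float(record["amount"]) >= 100
    return enriched


profiles = {"15551234567": {"segment": "prepaid"}}
agg = RunningAggregate("MSISDN", "amount")
out = []
for rec in [{"MSISDN": "15551234567", "amount": "50"}, {"MSISDN": "15551234567", "amount": "150"}]:
    agg.update(rec)
    out.append(enrich(rec, agg, profiles))
print(out[-1]["rechargeCount"], out[-1]["rechargeAvg"], out[-1]["isHighValue"])  # 2 100.0 True
```

Updating the aggregate before enriching means each record sees totals that include itself; a rule that should only look at earlier records would enrich first and update afterwards.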

Restart All Services

  • Restart all services so that the changes take effect.