Deploy Feed Application
This page explains how to add a new feed application to Detect. Feed applications ingest data, process it through parsing, aggregation, and enrichment stages, and provide structured inputs for event detection.
A feed application enables Detect to consume and transform data from sources such as Kafka topics or files. Each feed application defines its schema, parsing logic, and configuration for data aggregation and enrichment. To deploy a feed application in Detect, follow the steps below.
Define feed application
Define the feed name, data models and key attributes for the feed application in the
application.json file. The sample application.json file of Recharge Feed is located
at
$INSTALLER_ROOT/acme/etc/model/applications/recharge/application.json.
{
  "feedApplication": {
    "dataModels": ["Identity", "Recharge"],
    "keyAttribute": "MSISDN",
    "name": "Recharge"
  }
}
- dataModels: This array lists the data models, such as Identity, used by the feed.
- keyAttribute: This attribute represents the primary key for identifying records. Here, the key attribute is MSISDN.
- name: The name of the feed.
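As a quick sanity check before deployment, the three documented keys can be verified with a few lines of Python. This is a minimal sketch, not part of Detect: the function name validate_feed_application and the rule that dataModels must be non-empty are assumptions made for illustration.

```python
import json

# The three keys described above for the feedApplication object.
REQUIRED_KEYS = {"name", "dataModels", "keyAttribute"}


def validate_feed_application(text: str) -> dict:
    """Parse application.json content and check the documented keys.

    Hypothetical helper for illustration; Detect may validate differently.
    """
    config = json.loads(text)
    feed = config["feedApplication"]
    missing = REQUIRED_KEYS - feed.keys()
    if missing:
        raise ValueError(f"missing keys: {sorted(missing)}")
    # Assumption: a feed needs at least one data model.
    if not isinstance(feed["dataModels"], list) or not feed["dataModels"]:
        raise ValueError("dataModels must be a non-empty list")
    return feed


# Same content as the Recharge sample shown above.
SAMPLE = """
{
  "feedApplication": {
    "dataModels": ["Identity", "Recharge"],
    "keyAttribute": "MSISDN",
    "name": "Recharge"
  }
}
"""

feed = validate_feed_application(SAMPLE)
print(feed["name"], feed["keyAttribute"])
```

Running the check against a copy of the file before restarting services can catch a missing key early.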
Update Schema of Feed Application
Add the feed's attributes and profiles to the data_model.json file to
define the schema for the feed application. Refer to the Recharge feed's
data_model.json file for structure guidance. The Recharge feed's data_model.json is
located at
$INSTALLER_ROOT/acme/etc/model/feed_data_models/recharge/data_model.json.
Implement Feed Logic in main.py
Implement the feed logic in the main.py file.
- Define the flow composition using create_flow() to connect the Kafka source and parser operators: source >> parser.
- Select the appropriate source operator based on the data type:
  - For ingesting real-time data, use the Kafka-based source operator. Reference implementation in the Recharge feed's main.py: $INSTALLER_ROOT/acme/python/acme/application/recharge/main.py.
  - For ingesting batch-processed data, use the file-based source operator. Reference implementation in the Customer Profile Refresh feed's main.py: $INSTALLER_ROOT/acme/python/acme/application/customer_profile_refresh/main.py.
- The parser operator receives raw string messages from the source operator.
- The parser reads the data in its native format and creates attributes for the output tuple.
- In _get_parser_schema(), describe the attributes to extract from the raw data for Detect to process events.
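The source >> parser composition can be pictured with a small standalone sketch. Everything below is hypothetical and only illustrates the operator-chaining idea: the Operator, ListSource, and JsonParser classes, and this create_flow() signature, are assumptions and are not the Detect classes. Use the reference main.py files for the actual API.

```python
import json


class Operator:
    """Hypothetical base operator; `a >> b` links a's output to b."""

    def __init__(self, name):
        self.name = name
        self.downstream = None

    def __rshift__(self, other):
        self.downstream = other
        return other  # returning the right operand allows a >> b >> c

    def emit(self, item):
        # Forward an item to the next operator, if one is connected.
        if self.downstream is not None:
            self.downstream.process(item)

    def process(self, item):
        # Default behavior: pass the item through unchanged.
        self.emit(item)


class ListSource(Operator):
    """Stand-in for a Kafka or file source: replays in-memory strings."""

    def __init__(self, messages):
        super().__init__("source")
        self.messages = messages

    def run(self):
        for message in self.messages:
            self.emit(message)


class JsonParser(Operator):
    """Stand-in parser: turns raw JSON strings into dict tuples."""

    def __init__(self):
        super().__init__("parser")
        self.tuples = []

    def process(self, item):
        self.tuples.append(json.loads(item))


def create_flow(messages):
    """Hypothetical flow composition mirroring `source >> parser`."""
    source = ListSource(messages)
    parser = JsonParser()
    source >> parser
    return source, parser


source, parser = create_flow(['{"MSISDN": "9876543210"}'])
source.run()
print(parser.tuples)
```

In this sketch the source only knows its downstream operator, so swapping a Kafka-style source for a file-style one would not require changes to the parser.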
Define Parser in parser.py
The parser processes the data ingested from the source operator and converts it into structured data.
- Kafka-based parser
  The process() method receives raw string data from the source operator and passes it to process_impl(). The process_impl() method converts the incoming JSON string into a structured record, extracts required fields such as MSISDN and transactionId, and applies transformations like timestamp parsing using DateTimeParser. Customizations can be made based on the data in the Kafka topic. The reference implementation for the Recharge feed is located at $INSTALLER_ROOT/acme/python/acme/parsers/message_queue/recharge/parser.py.
- File-based parser
  The _parse_row() method processes each line from the file and implements the parsing logic for each row based on the data format in the file. The reference implementation is located at $INSTALLER_PATH/acme/python/acme/parsers/file/customer_profile_refresh/parser.py.
Update drive.json for Feed Registration
Add the feed details under the feedApplications[] and feedDataModels[] sections of the drive.json file to register the feed application and its data model in the overall system configuration.
- feedApplications: Registers the new feed application "Recharge" with its associated data models and key attribute (MSISDN).
- feedDataModels: Defines the data model for the "Recharge" feed, specifying its key attribute MSISDN.
Reference path: $INSTALLER_ROOT/acme/etc/drive.json
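The registration step amounts to adding one entry to each of the two sections. The sketch below shows that idea on an in-memory dictionary. The entry field names (name, dataModels, keyAttribute) are assumptions borrowed from application.json, and register_feed is a hypothetical helper; check the reference drive.json for the actual entry layout before editing the real file.

```python
import json


def register_feed(drive: dict, name: str, data_models: list, key_attribute: str) -> dict:
    """Add a feed to both sections of a drive.json-like dict, once only.

    Hypothetical helper; entry field names are assumed, not confirmed.
    """
    apps = drive.setdefault("feedApplications", [])
    models = drive.setdefault("feedDataModels", [])

    # Skip the append if an entry with the same name already exists.
    if not any(app.get("name") == name for app in apps):
        apps.append(
            {"name": name, "dataModels": list(data_models), "keyAttribute": key_attribute}
        )
    if not any(model.get("name") == name for model in models):
        models.append({"name": name, "keyAttribute": key_attribute})
    return drive


drive = {"feedApplications": [], "feedDataModels": []}
register_feed(drive, "Recharge", ["Identity", "Recharge"], "MSISDN")
register_feed(drive, "Recharge", ["Identity", "Recharge"], "MSISDN")  # no duplicate
print(json.dumps(drive, indent=2))
```

Making the helper idempotent means rerunning it after a partial edit does not register the feed twice.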
Add Aggregates and Enrichments
Configure aggregates and enrichments to enhance data flowing through the feed application.
- Configure enrichments to enrich the data with profile data, aggregated values, simple transformed attributes, or derived attributes, and add the resulting data to the tuple record. Reference file path:
  $INSTALLER_PATH/acme/etc/model/applications/recharge/enrichments.json
- Add aggregates to calculate values based on the configured rules. You can calculate count, sum, average, and so on, based on any tuple record attribute. Reference file path:
  $INSTALLER_PATH/acme/etc/model/applications/recharge/aggregations.json
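To make the two concepts concrete, the sketch below computes count, sum, and average per key and then enriches a record with profile data. It is plain Python written for illustration only: in Detect these are configured in aggregations.json and enrichments.json rather than coded, and the amount and segment attributes and both function names are hypothetical.

```python
from collections import defaultdict


def aggregate(records, key, value):
    """Compute count, sum, and average of `value`, grouped by `key`."""
    stats = defaultdict(lambda: {"count": 0, "sum": 0.0})
    for record in records:
        entry = stats[record[key]]
        entry["count"] += 1
        entry["sum"] += record[value]
    return {
        group: {
            "count": entry["count"],
            "sum": entry["sum"],
            "average": entry["sum"] / entry["count"],
        }
        for group, entry in stats.items()
    }


def enrich(record, profiles, aggregates, key="MSISDN"):
    """Return a copy of the record with profile and aggregate attributes added."""
    enriched = dict(record)
    enriched.update(profiles.get(record[key], {}))        # profile data
    enriched["rechargeCount"] = aggregates[record[key]]["count"]  # aggregated value
    return enriched


records = [
    {"MSISDN": "A", "amount": 10},
    {"MSISDN": "A", "amount": 30},
    {"MSISDN": "B", "amount": 5},
]
totals = aggregate(records, key="MSISDN", value="amount")
profiles = {"A": {"segment": "gold"}}
print(totals["A"])
print(enrich(records[0], profiles, totals))
```

The enrichment step leaves the input record untouched and returns a new one, which keeps each stage's output independent of the others.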
Restart all services
- Restart all services for the changes to take effect.