Procédez comme suit pour créer votre processeur NiFi personnalisé.
Procedure
-
Créez les classes de processeur et les cas de test personnalisés.
- Les classes pour les processeurs sont créées dans le répertoire /src/main/java/ ;
- Les cas de test de processeur correspondants sont créés dans le répertoire /src/test/java/.
-
Créez une classe sous src/main/java et étendez-la avec org.apache.nifi.processor.AbstractProcessor. Si vous souhaitez étendre les classes par défaut, étendez-les à l'aide de com.hcl.software.data.ingest.processors.AbstractCommerceBaseProcessor.
-
Fournissez les balises et les attributs appropriés à l'aide des annotations suivantes.
@Tags({"example"})
@CapabilityDescription("Provide a descrption")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
-
Pour organiser les importations, cliquez avec le bouton droit de la souris sur le fichier Java et sélectionnez .
-
Implémentez la méthode héritée manquante.
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException
{
// TODO Auto-generated method stub
}
Vous êtes maintenant prêt à commencer à écrire la logique de votre processeur personnalisé.
-
Créez la logique personnalisée pour votre processeur NiFi. Les étapes suivantes illustrent comment la logique que vous créez dans Java affecte le processeur NiFi ou s'affiche dans l'interface utilisateur NiFi.
Reportez-vous au Guide du développeur NiFi pour obtenir plus de détails sur la création d'un processeur NiFi personnalisé et pour une explication approfondie des API utilisées pour développer des extensions.
-
Déclarez les zones de propriété à l'aide de
org.apache.nifi.components. Le type de données PropertyDescriptor apparaît dans l'onglet PROPRIETES du processeur personnalisé.
Par exemple :
public static final PropertyDescriptor SCROLL_DURATION = new PropertyDescriptor.Builder().name("Scroll Duration")
.description("The scroll duration is how long each search context is kept in memory")
.required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true).build();
-
Déclarez les zones de relation à l'aide de
org.apache.nifi.processor. Le type de données de relation apparaît dans l'onglet PARAMETRES du processeur personnalisé.
Par exemple :
public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder().name("success")
.description("The flow file with the specified content was successfully transferred").build();
public static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder().name("failure")
.description("The flow file with the specified content has encountered an error during the transfer").build();
public static final Relationship RELATIONSHIP_NEXT = new Relationship.Builder().name("next")
.description("The flow file with the specified content for the next iteration").build();
-
Ajoutez les propriétés et relations déclarées dans la méthode init
init().
@Override
protected void init(final ProcessorInitializationContext context) {
getSupportedPropertyDescriptors().add(SCROLL_DURATION);
getRelationships().add(RELATIONSHIP_SUCCESS);
getRelationships().add(RELATIONSHIP_FAILURE);
getRelationships().add(RELATIONSHIP_NEXT);
-
Remplacez la méthode
onTrigger() pour effectuer les opérations suivantes :
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final String duration = context.getProperty(SCROLL_DURATION).evaluateAttributeExpressions().getValue();
try {
- Obtenez le FlowFile et son contenu à partir de ProcessSession.
final FlowFile flowFile = session.get();
final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
session.exportTo(flowFile, bytes);
bytes.close();
final String content = bytes.toString();
- Effectuez les opérations que le processeur est censé effectuer avec le contenu.
final Map<String, Object> json = (Map<String, Object>) new JsonSlurper().parseText(content);
final String scroll = (String) json.get("_scroll_id");
final Map<String, Object> hits = (Map<String, Object>) json.get(FIELD_HITS);
final List<Object> docs = (List<Object>) hits.get(FIELD_HITS);
if (docs.size() > 0) {
final String scrollBody = String.format("{ \"scroll\": \"%s\", \"scroll_id\": \"%s\" }", duration, scroll);
FlowFile nextFlowFile = session.create(flowFile);
session.putAttribute(nextFlowFile, "index.scroll.uri", "_search/scroll");
- Ecrivez à nouveau le résultat dans le FlowFile.
nextFlowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream outputStream) throws IOException {
outputStream.write(scrollBody.getBytes());
outputStream.flush();
}
});
session.getProvenanceReporter().modifyContent(nextFlowFile);
- Transférez le FlowFile vers la relation souhaitée.
session.getProvenanceReporter().route(flowFile, RELATIONSHIP_SUCCESS);
session.transfer(flowFile, RELATIONSHIP_SUCCESS);
-
Dans org.apache.nifi.processor.Processor, créez une entrée contenant le nom de classe qualifié complet de votre processeur personnalisé récemment créé. Si vous avez créé un service de contrôleur personnalisé, créez une entrée dans org.apache.nifi.controller.ControllerService.
Par exemple, dans l'explorateur de projets, développez et cliquez sur org.apache.nifi.processor.Processor. Entrez votre nom de classe dans le volet de saisie de texte sur le côté droit de la fenêtre.
Results
Votre processeur personnalisé est créé.