2017-05-05 2 views
0

i nifi Flow (Standalone) wieWie man Flowfile Count in Nifi Warteschlange?

executestreamprocessor(hive script) -> executestreamprocessor(hadoop script). 

für jede und jeden eingehenden Flowfile haben, läuft hive Skript mit dem Befehl INSERT..INTO..SELECT..FROM und hadoop Skript löscht die einzelnen Dateien aus dem Speicher Bereich.

Manchmal wird das Hive-Skript fehlgeschlagen, wenn gleichzeitig das Hadoop-Skript den Befehl ausführt.

Ich bekomme maximal 4 Anzahl von Dateien für jede Stunde. Also plante ich, den Kontrollprozessor zwischen den Hive- und Hadoop-Prozessoren zu verwenden. Ich setze die Bedingung, wenn die Zählung der Warteschlange 4 Flowfile erreicht, dann sollte ein Hadoop-Skript ausgeführt werden. Aber Kontrolle hat die Eigenschaft, nur für maximale Rate einzustellen. Es hat keinen Mindestsatz.

Gibt es eine mögliche Lösung zu erreichen? oder irgendeine andere Lösung?

Antwort

2

Sie sollten in der Lage sein ExecuteScript dafür zu verwenden, versuchen Sie diese Groovy Skript:

def flowFiles = session.get(4) 
if(!flowFiles || flowFiles.size() < 4) { 
    session.rollback() 
} else { 
    session.transfer(flowFiles, REL_SUCCESS) 
} 

Wenn Sie nur einmal die Abströmung auslösen wollen, dann können Sie ein Kind Flow-Datei von den Eltern zu schaffen (und berichten eine Herkunft Ereignis JOIN):

def flowFiles = session.get(4) 
if(!flowFiles || flowFiles.size() < 4) { 
    session.rollback() 
} else { 
    def flowFile = session.create(flowFiles) 
    session.provenanceReporter.join(flowFiles, flowFile) 
    session.remove(flowFiles) 
    session.transfer(flowFile, REL_SUCCESS) 
} 

gesagt haben, dass, wenn Sie nicht über die Flow-Datei Inhalte kümmern (dh Sie eine Flow-Datei als Trigger) verwenden, können Sie MergeContent mit einem Minimum verwenden könnte und Maximale Anzahl der Einträge = 4.

+0

Ich bekomme 4 Flowfile in einem Ort. Ich werde jede dieser Dateien nehmen, weitergeleitet an Hive-Skripte (executivestreamprocessor) und hadoop-Skripte (executivestreamprocessor). Im Grunde werde ich jede Datei über Hive-Skripte und Hadoop-Skript verarbeiten, um alle Dateien zu löschen. Aber ich möchte die Dateien nicht einzeln löschen. hier denke ich nicht, daß execscript (groovy) und mergecontent hineinpassen. bitte erleuchte mehr. –

+0

Kennt das Skript bereits die zu löschenden Dateien (wie alle aus einem Ordner)? Wenn ja, sollte das zweite Skript funktionieren, nach den Hive-Skripten und vor dem Hadoop executestream-Befehl, dann erhalten Sie 4 Hive-Befehle, dann ein Hadoop delete – mattyb

+0

ist es möglich, den Zustand des zweiten Skripts für die andere Beziehung zu überprüfen? –

Verwandte Themen