2016-03-24 15 views
2

Ich habe einen Spark-Streaming-Job. Ich möchte Filter auf meine Eingabe RDD anwenden.Spark Streaming - Filter dynamisch

Ich möchte Filterkriterien jedes Mal dynamisch von Hbase während jeder Spark-Streaming-Batch abrufen.

Wie erreiche ich das?

Ich kann Verbindungsobjekt einmal mit Map-Partitionen erstellen.

Aber mit in Funkenfilter wie erreiche ich das gleiche?

+0

Abhängig von Ihren Filterkriterien können Sie dies möglicherweise mit einem 'Join' erreichen. Sie müssten ein vollständigeres Beispiel dafür geben, was Sie zu tun versuchen, aber wenn die linke Seite der Verknüpfung Ihr Spark-Stream ist, wäre die rechte Seite eine Reihe von Kriterien. Wenn keines der Kriterien zutrifft, führt der Join zu keinen Zeilen - er filtert sie. –

Antwort

0

Ich denke, der richtige Ansatz eine Filterfunktion des eigenen (Pseudo-Code) schreibt:

DStream<Integer> intDstream= someIntegerIntoDStream; 
intDstream.foreachPartition{ 
    create HBase connection here if you need it for a batch 
    while(arg0.hasNext()){ //here you have an iterator 
      Integer current = arg0.next(); 
      create HBase connection here if you need it for each element 
      //Here is your filter function: 
      if(current meets your condition) 
       arg0.remove(); 

Also, was passiert ist, dass Sie auf Ihrem Testamentsvollstrecker ausgeführt werden und Sie manuell jedes Element Kommissionierung, Anlegen einer Bedingung dazu und entfernen Sie es, wenn es Ihren Kriterien entspricht.