2017-02-13 6 views
0

Dies ist eine Follow-up-Frage aufPyspark - Transferkontrolle von Spark-Session (sc)

Pyspark filter operation on Dstream

eine Zählung zu halten, wie viele Fehlermeldungen/Warnmeldungen seit etwa einem Tag kamen durch, Stunde - wie gestaltet man den Job?

Was habe ich versucht:

from __future__ import print_function 

import sys 

from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 


    def counts(): 
      counter += 1 
      print(counter.value) 

    if __name__ == "__main__": 

      if len(sys.argv) != 3: 
        print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr) 
      exit(-1) 


      sc = SparkContext(appName="PythonStreamingNetworkWordCount") 
      ssc = StreamingContext(sc, 5) 
      counter = sc.accumulator(0) 

      lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) 
      errors = lines.filter(lambda l: "error" in l.lower()) 
      errors.foreachRDD(lambda e : e.foreach(counts)) 
      errors.pprint() 

      ssc.start() 
      ssc.awaitTermination() 

dies hat jedoch mehrere Probleme, mit Druck starten funktioniert nicht (keine Ausgabe auf stdout, ich habe darüber gelesen, das Beste, was ich hier verwenden können, ist Protokollierung). Kann ich die Ausgabe dieser Funktion in einer Textdatei speichern und stattdessen diese Datei ankreuzen?

Ich bin nicht sicher, warum das Programm gerade herauskommt, gibt es keine Fehler/irgendwo Dump weiter zu schauen (Funken 1.6.2)

Wie funktioniert ein Staat erhalten? Was ich versuche, zu aggregieren Protokolle, die von Server und Schwere, ist ein weiterer Anwendungsfall zu zählen, wie viele Transaktionen durch die Suche nach bestimmten Schlüsselwörtern verarbeitet wurden

Pseudo-Code für das, was ich versuchen wollen:

foreachRDD(Dstream): 
    if RDD.contains("keyword1 | keyword2 | keyword3"): 
    dictionary[keyword] = dictionary.get(keyword,0) + 1 //add the keyword if not present and increase the counter 
    print dictionary //or send this dictionary to else where 

Die Der letzte Teil des Wörterbuchs zum Senden oder Drucken erfordert das Austauschen des Spark-Streaming-Kontexts - Kann jemand das Konzept erklären?

Antwort

0

Druck funktioniert nicht

ich die design patterns section der Spark-Dokumentation zu lesen empfehlen würde. Ich denke, dass in etwa, was Sie wollen, so etwas wie dieses:

def _process(iter): 
    for item in iter: 
     print item 

lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) 
errors = lines.filter(lambda l: "error" in l.lower()) 
errors.foreachRDD(lambda e : e.foreachPartition(_process)) 

Dies wird Ihren Anruf print zu arbeiten (obwohl es ist erwähnenswert, dass die print-Anweisung auf die Arbeiter ausgeführt wird und nicht die Treiber, wenn Sie also Wenn Sie diesen Code auf einem Cluster ausführen, wird er nur in den Arbeitsprotokollen angezeigt.

Allerdings wird es nicht Ihr zweites Problem lösen:

Wie funktioniert ein Staat erhalten?

Schauen Sie sich hierzu updateStateByKey und die related example an.

+0

Wäre es Print-Artikel oder Drucken (Artikel) - Aus irgendeinem Grund, wenn ich "Drucken" irgendwo im Programm verwendet, kommt es ohne irgendwelche Fehler - eher wie "Absturz" ohne irgendwelche Debugging-Informationen/Fehler – GreenThumb

Verwandte Themen