2016-04-13 16 views
0

Ich habe ein Programm in Spark Streaming, das eingehende Dateien in HDFS erkennt, was ich tun möchte, ist jede Datei analysieren und testen das Vorhandensein von zwei Wörtern in jeder Datei, und an jedem Punkt wissen, wie viele Dateien enthalten diese beiden Wörter. Was ich versucht zu tun:Zählen mit Spark Streaming

val recherche1 = lines.map(x => (x.split(":")(0),x.split(":")(1))).filter(x => x._2 == "mot1") 
    val recherche2 = lines.map(x => (x.split(":")(0),x.split(":")(1))).filter(x => x._2 == "mot2") 
    val n1 = recherche1.count() 
    val n2 = recherche1.count() 
    val p = n1.foreachRDD(rdd => {cont1 = rdd.count() 
    if (cont1 > 0) 
    {n2.foreachRDD(rdd => {cont2 = rdd.count() 
     if (cont2 > 0) 
     {number = number + 1} 
     else 
     {number = number} 
    })} 
    }) 

, was ich will, ist der Wert der Variablen „Zahl“ zu verwenden, so da ich es nicht in Spark-Streaming drucken können, versuche ich es in HBase zu setzen, leider es nicht funktioniert und wenn ich das Skript in funken einreichen starten, gibt es den Fehler:

adding new inputs transformations and output operations after starting a context is not supported 

weiß jemand, was ich falsch mache oder kann mir sagen, wie dies zu tun?

Vielen Dank im Voraus

+0

Der Grund für diesen Fehler liegt darin, dass einige 'DStream'-Operationen aufgerufen werden, nachdem Sie' streamingContext.start' aufgerufen haben. Daher muss das Problem außerhalb des Bereichs des hier gezeigten Code-Snippets liegen. – maasg

+0

Ein Seitenkommentar zum Problem: Es sollte möglich sein, die Zuordnung über die Eingabe zweimal zu vermeiden, um die beiden Wörter zu finden. Lassen Sie uns zuerst über den aktuellen Fehler hinausgehen und dann über die Optimierung dieses Prozesses nachdenken. – maasg

+0

@ maasg, wenn ich dieses Code-Snippet lösche, verschwindet der Fehler, deshalb nahm ich an, dass es die Ursache des Problems ist – Jean

Antwort

0

ich die ganze Struktur des in der Frage nach der Antwort auf Ihre Frage angezeigten Code geändert: filter the lines by two words Spark Streaming und es hat funktioniert, vielleicht diese Struktur für Spark ist nicht geeignet Streaming ..