2015-06-28 11 views
5

Ich bin neu in Spark und Spark Streaming. Ich arbeite an Twitter-Streaming-Daten. Meine Aufgabe besteht darin, jeden Tweet unabhängig zu behandeln, z. B. die Anzahl der Wörter in jedem Tweet zu zählen. Von dem, was ich gelesen habe, jede Eingabe Batch bildet auf RDD in Spark Streaming. Wenn ich also ein Stapelintervall von 2 Sekunden gebe, dann enthält die neue RDD alle Tweets für zwei Sekunden und jede angewendete Transformation wird auf ganze zwei Sekunden Daten angewendet, und es gibt keine Möglichkeit, innerhalb dieser zwei Sekunden mit einzelnen Tweets umzugehen. Ist mein Verständnis richtig? oder bildet jeder Tweet eine neue RDD? Ich bin irgendwie verwirrt ...Batchgröße in Spark Streaming

Antwort

1

In einer Charge haben Sie eine RDD mit allen Status, die in 2 Sekunden Intervall kam. Dann können Sie diese Status einzeln verarbeiten. Hier ein kurzes Beispiel:

JavaDStream<Status> inputDStream = TwitterUtils.createStream(ctx, new OAuthAuthorization(builder.build()), filters); 

     inputDStream.foreach(new Function2<JavaRDD<Status>,Time,Void>(){ 
      @Override 
      public Void call(JavaRDD<Status> status, Time time) throws Exception { 
       List<Status> statuses=status.collect(); 
       for(Status st:statuses){ 
        System.out.println("STATUS:"+st.getText()+" user:"+st.getUser().getId());      
       //Process and store status somewhere 
       } 
       return null; 
      }});   
    ctx.start(); 
     ctx.awaitTermination();  
} 

Ich hoffe, ich habe Ihre Frage nicht falsch verstanden.

Zoran

+0

Vielen Dank. Wenn ich Status einzeln in einer Liste speichern kann, kann ich alle RDD-Transformationen oder Aktionen wie reduceByKey(), countByValue auf der Liste anwenden? Obwohl ich neu in Scala bin, muss ich es in Scala machen. – Naren

+0

Ich habe Ihnen nur ein Beispiel mit einer Liste gegeben, um Ihnen zu zeigen, dass Sie auf einzelne Status zugreifen können, aber wenn Sie funken verwenden möchten, um es weiter zu verarbeiten, sollten Sie keine Status zum Auflisten sammeln. Beispielsweise können Sie die Funktion inputDStream.mapToPair implementieren, die Status von einigen Schlüsseln zurückgibt, z. Benutzer-ID oder was auch immer Sie brauchen. Dann können SieByKey reduzieren. Leider habe ich nur Grundkenntnisse in Scala und kann Ihnen kein Beispiel nennen, aber alles, was Sie in Java tun können, können Sie auch in Scala tun. –

+0

Ich dachte, möglicherweise kann ich die Status eines bestimmten Stapels in einer Liste speichern und diese Liste mithilfe von parallelize() in RDD konvertieren, so dass ich Spark-Transformationen und Aktionen anwenden kann. – Naren