Ich bin neu in Spark und Spark Streaming. Ich arbeite an Twitter-Streaming-Daten. Meine Aufgabe besteht darin, jeden Tweet unabhängig zu behandeln, z. B. die Anzahl der Wörter in jedem Tweet zu zählen. Von dem, was ich gelesen habe, jede Eingabe Batch bildet auf RDD in Spark Streaming. Wenn ich also ein Stapelintervall von 2 Sekunden gebe, dann enthält die neue RDD alle Tweets für zwei Sekunden und jede angewendete Transformation wird auf ganze zwei Sekunden Daten angewendet, und es gibt keine Möglichkeit, innerhalb dieser zwei Sekunden mit einzelnen Tweets umzugehen. Ist mein Verständnis richtig? oder bildet jeder Tweet eine neue RDD? Ich bin irgendwie verwirrt ...Batchgröße in Spark Streaming
5
A
Antwort
1
In einer Charge haben Sie eine RDD mit allen Status, die in 2 Sekunden Intervall kam. Dann können Sie diese Status einzeln verarbeiten. Hier ein kurzes Beispiel:
JavaDStream<Status> inputDStream = TwitterUtils.createStream(ctx, new OAuthAuthorization(builder.build()), filters);
inputDStream.foreach(new Function2<JavaRDD<Status>,Time,Void>(){
@Override
public Void call(JavaRDD<Status> status, Time time) throws Exception {
List<Status> statuses=status.collect();
for(Status st:statuses){
System.out.println("STATUS:"+st.getText()+" user:"+st.getUser().getId());
//Process and store status somewhere
}
return null;
}});
ctx.start();
ctx.awaitTermination();
}
Ich hoffe, ich habe Ihre Frage nicht falsch verstanden.
Zoran
Verwandte Themen
- 1. Spark-Streaming-Streaming aktiv Zählung
- 2. RDD-Partitionierung in Spark Streaming
- 3. Parsen json in spark-streaming
- 4. Kafka Spark-Streaming-Integration
- 5. Zählen mit Spark Streaming
- 6. Spark Streaming Akkumulierte Wortzählung
- 7. Spark Streaming MYSql
- 8. Spark Streaming benutzerdefinierte Metriken
- 9. Spark Streaming Replay
- 10. Pause Spark Streaming Job
- 11. Spark-Streaming UpdateStateByKey
- 12. Spark Streaming Kafka Gegendruck
- 13. Spark-Streaming Elasticsearch Abhängigkeiten
- 14. Spark Streaming Model Overwrite
- 15. Spark Streaming - Filter dynamisch
- 16. Spark streaming rawSocketStream
- 17. Spark Streaming Kafka Stream
- 18. Streaming Kmeans Spark JAVA
- 19. Spark Streaming historischen Zustand
- 20. NoClassDefFound Ausnahme Spark-Streaming
- 21. Persisting Spark-Streaming Ausgabe
- 22. Spark-Streaming-Fenster-Operation
- 23. Spark-Streaming-Schema
- 24. Spark Streaming verlieren SparkContext
- 25. Herunterfahren von Spark Streaming
- 26. In-Order-Verarbeitung in Spark Streaming
- 27. Spark-Streaming-Checkpoints für DStreams
- 28. Spark Streaming - Verarbeitung binärer Datendatei
- 29. Spark Streaming für Fitbit-Daten
- 30. Kombinieren von Spark Streaming + MLlib
Vielen Dank. Wenn ich Status einzeln in einer Liste speichern kann, kann ich alle RDD-Transformationen oder Aktionen wie reduceByKey(), countByValue auf der Liste anwenden? Obwohl ich neu in Scala bin, muss ich es in Scala machen. – Naren
Ich habe Ihnen nur ein Beispiel mit einer Liste gegeben, um Ihnen zu zeigen, dass Sie auf einzelne Status zugreifen können, aber wenn Sie funken verwenden möchten, um es weiter zu verarbeiten, sollten Sie keine Status zum Auflisten sammeln. Beispielsweise können Sie die Funktion inputDStream.mapToPair implementieren, die Status von einigen Schlüsseln zurückgibt, z. Benutzer-ID oder was auch immer Sie brauchen. Dann können SieByKey reduzieren. Leider habe ich nur Grundkenntnisse in Scala und kann Ihnen kein Beispiel nennen, aber alles, was Sie in Java tun können, können Sie auch in Scala tun. –
Ich dachte, möglicherweise kann ich die Status eines bestimmten Stapels in einer Liste speichern und diese Liste mithilfe von parallelize() in RDD konvertieren, so dass ich Spark-Transformationen und Aktionen anwenden kann. – Naren