2017-06-26 3 views
0

Ich bin in Scala, Apache Spark Welt und ich versuche zu verstehen, wie man eine "Pipeline" erstellen, die einen DataFrame basierend auf den Ereignissen generieren wird, die ich erhalte.Update Scala DF basierend auf Ereignissen

Zum Beispiel ist die Idee, dass, wenn ich ein bestimmtes Protokoll/Ereignis erhalte, ich eine Zeile in den DF einfügen/aktualisieren muss.

Lassen Sie uns ein echtes Beispiel machen. Ich möchte einen DataFrame erstellen, der den Status der Benutzer in meiner Datenbank (Postgres, Mongo was auch immer) darstellt. Wenn ich sage Zustand, ich meine den aktuellen Zustand des Benutzers (AKTIV, UNVOLLSTÄNDIG, BLOCKIERT, etc). Diese Statusänderung hängt von der Benutzeraktivität ab, daher erhalte ich Logs (JSON) mit dem Schlüssel "status": "ACTIVE" und so weiter.

So bekomme ich zum Beispiel Protokolle von einem Kafka-Thema .. irgendwann bekomme ich ein Protokoll, das mich interessiert, weil es nützliche Informationen über den Benutzer definiert (den Status etc ..) Ich nehme das log, und ich erstelle ein DF mit diesem Protokoll darin. Dann erhalte ich das zweite Protokoll, aber dieses wurde vom selben Benutzer ausgeführt, also muss die Zeile aktualisiert werden (wenn sich der Status natürlich ändert!) Also keine neue Zeile sondern die vorhandene aktualisieren. Drittes Protokoll, neuer Benutzer, neue Information, also als neue Zeile in der vorhandenen DF speichern und so weiter. Am Ende dieses Prozesses/Pipeline, sollte ich einen DF mit den Informationen aller Benutzer in meinem db und ihre "status" haben, so kann ich sagen: "Oh, sieh dir das an, es gibt 43 Benutzer, die blocked und 13 sind sind active! Erstaunlich! "

Dies ist die Idee .. der Prozess muss in Echtzeit sein.

Bis jetzt habe ich dies mit Dateien versucht, die nicht mit einem Kafka-Thema verbinden. Zum Beispiel habe ich rote Datei wie folgt:

val DF = mysession.read.json("/FileStore/tables/bm2ube021498209258980/exampleLog_dp_api-fac53.json","/FileStore/tables/zed9y2s11498229410434/exampleLog_dp_api-fac53.json") 

, die innerhalb eines DF mit 2 Reihen mit allem, was generats.

+--------------------+-----------------+------+--------------------+-----+ 
|     _id|   _index|_score|    _source|_type| 
+--------------------+-----------------+------+--------------------+-----+ 
|AVzO9dqvoaL5S78GvkQU|dp_api-2017.06.22|  1|[2017-06-22T08:40...|DPAPI| 
| AVzO9dq5S78GvkQU|dp_api-2017.06.22|  1|[null,null,[Wrapp...|DPAPI| 
+--------------------+-----------------+------+--------------------+-----+ 

in _source gibt es alle verschachtelten Dinge (die status ich erwähnt ist hier!).

Dann habe ich einige nützliche Informationen ausgewählt wie

DF.select("_id", "_source.request.user_ip","_source.request.aw", "_type").show(false) 

+--------------------+------------+------------------------------------+-----+ 
|_id     |user_ip  |aw         |_type| 
+--------------------+------------+------------------------------------+-----+ 
|AVzO9dqvoaL5S78GvkQU|111.11.11.12|285d5034-dfd6-44ad-9fb7-ba06a516cdbf|DPAPI| 
|AVzO9dq5S78GvkQU |111.11.11.82|null        |DPAPI| 
+--------------------+------------+------------------------------------+-----+ 

wieder, ist die Idee, diese DF mit den Protokollen von einem kafka Thema ankommen zu erstellen und das Protokoll in diesem DF UPSERT. Hoffe ich erklärte gut, ich möchte keine "Code" -Lösung Ich würde Tipps oder Beispiel, wie Sie dieses Ergebnis erreichen möchten. Danke.

Antwort

0

Wie Sie nach Ressourcen suchen würde ich vorschlagen, die folgenden: Werfen Sie einen Blick auf die Spark-Streaming Programming Guide (https://spark.apache.org/docs/latest/streaming-programming-guide.html) und die Spark-Streaming + Kafka Integration Guide (https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html).

Verwenden Sie das Spark Streaming + Kafka-Integrationshandbuch, um zu erfahren, wie Sie einen Stream mit Ihrem Kafka-Inhalt öffnen können. Dann werfen Sie einen Blick auf die möglichen Transformationen, die Sie mit Spark Streaming durchführen können, im Kapitel "Transformationen auf DStreams" im Spark Streaming Programmierhandbuch Sobald Sie den Stream so transformiert haben, dass Sie eine letzte Operation ausführen können einen Blick auf "Ausgabeoperationen auf DStreams" im Spark Streaming Programming Guide.Ich denke, vor allem .forEachRDD könnte sein, was Sie suchen - wie Sie eine Operation (wie überprüfen, ob ein bestimmtes Schlüsselwort in Ihrer Zeichenfolge ist und auf dieser Grundlage einen Datenbankaufruf) für jedes Element des Streams.

Verwandte Themen