2015-02-01 17 views
8

ich zur Zeit der Suche in Spark-Streaming mit in Logfile-ähnlichen Einträgen zu nehmen und einige Berechnung auf sich aus statistischen Gründen zu tun.Spark-Streaming mit einer dynamischen Lookup-Tabelle

Es gibt Datensätze auf HDFS, auf die von HBase und Hive aus zugegriffen werden kann. Diese werden benötigt, um einige Daten zu suchen und zu transformieren, z. B. Mappings zwischen IPs und Maschinennamen und Maschinenbesitzern.

Die Funke-Anwendung wird Tag für Tag auf unserem Cluster laufen, ohne Neustart. Diese Referenztabellen werden jedoch alle paar Stunden aktualisiert.

Es ist in Ordnung, wenn die Daten etwas alt sind, aber es ist nicht in Ordnung, dass die Daten zwei Wochen alt sind. Daher möchte ich wissen, wie ich Daten für Transformationen und Anreicherungen in meiner Map suchen und Phasen reduzieren kann. Ich hatte ein paar Ideen.

  1. Broadcast-Variablen können den Datensatz einlesen und effizient weiterleiten. Sobald jedoch eine Broadcast-Variable festgelegt wurde, kann sie nicht mehr geändert werden, und das erneute Abrufen der Daten in der Treiberklasse führt dazu, dass das Nichtbestehen und Übertragen des neuen nicht funktioniert, da die Worker-Zeiger alle auf das alte Dataset zeigen. Ich weiß nicht, ob es eine Möglichkeit gibt, das zu umgehen.

  2. HBase get() Abfragen können gemacht werden. Wenn die Daten basierend auf dem Schlüssel der Suche zu Reduzierern geleitet werden, kann jeder Reduzierer einen Cache einer Teilmenge des Gesamtdatensatzes halten und kann seinen eigenen lokalen Cache halten. HBase sollte eine minimale Latenz beim Abrufen einzelner Datensätze haben.

  3. Etwas anderes?

Antwort

3

Sie haben hier zwei Möglichkeiten.

Zunächst ist foreachRDD Transformation auf Ihre DSTREAM zu verwenden. foreachRDD wird auf der Treiberseite ausgeführt, dh Sie können dort eine neue RDD erstellen. Sie können den Zeitzähler speichern und die Datei alle 10 bis 15 Minuten erneut aus HDFS lesen

Zweitens ist eine Datei in der transform Transformation über den DStream lesen und speichern Sie die Ergebnisse davon im Speicher. Mit diesem Ansatz Sie die gesamte Lookup-Tabelle von jedem der Testamentsvollstrecker zu lesen, die

nicht effizient ist, würde ich Ihnen empfehlen, den ersten Ansatz zu verwenden. Um noch genauer zu sein, können Sie das Flag bei der letzten Aktualisierung der Daten speichern und in Ihrer Spark-Anwendung speichern. Bei jeder Iteration prüfen Sie den Wert dieses Flags (z. B. in HBase oder Zookeeper gespeichert) und vergleichen ihn mit dem lokal gespeicherten - wenn es anders ist, dann lesen Sie die Nachschlagetabelle erneut, falls nicht - führen Sie den Vorgang mit der alte

+0

Ich habe eine verwandte Frage. Meine Nachschlagetabelle ist ungefähr 2 Millionen Zeilen und ist statisch. Key ist eine Zeichenfolge mit etwa 100 Zeichen und der Wert eine Zeichenfolge mit etwa 10 Zeichen. Im Moment habe ich diese Daten in einer indizierten mongo db Sammlung gespeichert und suche während eines Transformationsschritts. Ich mache meinen Anruf im Batch, so dass ich nur einen Treffer pro Transformation erhalte, aber es ist immer noch ein Netzwerkanruf. Macht es Sinn, eine Lookup-Tabelle so groß wie eine Spark-Broadcast-Var zu machen? –

+1

2 Millionen Datensätze 110 Bytes sind jeweils nur 220 MB Daten - nicht so viel für Broadcast-Variable. Wenn Sie 1 Executor pro HW-Knoten haben, sollten Sie sicherstellen, dass die erforderliche Mindestmenge an Kopien dieser 220 MB in Ihrem Cluster gespeichert wird. Wenn es statisch ist, können Sie es zu Beginn Ihrer Verarbeitung in den Speicher laden und später verwenden. Ich empfehle Ihnen nicht, eine zentralisierte Sache wie MongoDB zu verwenden, da der Cluster wächst, wird es Ihr Flaschenhals sein.Wenn die Daten vollständig statisch sind, können Sie in Erwägung ziehen, die Daten in einer Datei auf jedem der Knoten oder in einem lokalen Speicher auf jedem Knoten zu speichern. – 0x0FFF

Verwandte Themen