2017-12-27 15 views
1

ich eine Anwendung entwickle, wo ich mit dem gleichen Schlüssel in RDD für jedes Paar von Zeilen Berechnungen durchzuführen, ist hier die RDD Struktur:Was geschieht, wenn ich rdd.join nennen (RDD)

List<Tuple2<String, Tuple2<Integer, Integer>>> dat2 = new ArrayList<>(); 
dat2.add(new Tuple2<String, Tuple2<Integer, Integer>>("Alice", new Tuple2<Integer, Integer>(1, 1))); 
dat2.add(new Tuple2<String, Tuple2<Integer, Integer>>("Alice", new Tuple2<Integer, Integer>(2, 5))); 
dat2.add(new Tuple2<String, Tuple2<Integer, Integer>>("Alice", new Tuple2<Integer, Integer>(3, 78))); 
dat2.add(new Tuple2<String, Tuple2<Integer, Integer>>("Bob", new Tuple2<Integer, Integer>(1, 6))); 
dat2.add(new Tuple2<String, Tuple2<Integer, Integer>>("Bob", new Tuple2<Integer, Integer>(2, 11))); 
JavaRDD<Tuple2<String, Tuple2<Integer, Integer>>> y2 = sc.parallelize(dat2); 

Jetzt können Daten für jede Person wie angezeigt werden: (Zeitstempel, Wert). Ich möchte für jede Zeile wissen, wie viele Werte in + -1 Zeitmarken vorkommen. (Ich bin mir bewusst, das sieht aus wie Schiebefenster, aber ich möchte Ereignisebene Granularität)

y2.join(y2); 
resultOfJoin.filter(t -> t._2()._1()._1() - t._2()._2()._1() <= 1 && t._2()._1()._1() - t._2()._2()._1() >= -1) 

Die beste Lösung, die ich in diesem Fall kam, war das RDD mit sich selbst zu verbinden, wodurch k^2 Reihen für jede Person, wobei k ist die Anzahl der Zeilen, die dieser Person zugeordnet sind.

jetzt weiß ich, das ist ein vollständige Katastrophe. Ich verstehe, dass dies ein Shuffle verursachen wird (und Shuffles sind schlecht m'key), aber ich konnte nichts Besseres mitbringen.

Ich habe 3 Fragen:

  1. Da ich direkt nach der Join-Filter, wird die Belastung, die durch die Join verursacht bewirken (mit anderen Worten, es wird irgendwelche Optimierungen sein)?
  2. Wie viele Zeilen wurden im Netzwerk übertragen? (Ich bin mir bewusst, dass das RDD im schlimmsten Fall n^2 Zeilen haben wird) werden die im Netzwerk gesendeten Zeilen #workers n (Senden nur einer Kopie und Duplizieren auf Worker) oder #workers n^2 (sendende Zeile für jede 2-Zeilen-Kombination auf dem Ergebnis-Arbeiter)?
  3. Wenn ich mit Dataset arbeitete, könnte ich mit Filter verbinden. Ich verstehe, Datasets haben zusätzliche Optimierung für die Berechnung Grafik. Wie viel Verbesserung, wenn überhaupt, sollte ich erwarten, wenn ich zu Datasets übergehe?

Antwort

1

Da ich richtig filtern nach dem Join, wird die Belastung, die durch die Verbindung verursacht bewirken (in anderen Worten, es Optimierungen sein)?

Nein, es wird keine Optimierungen geben.

Wie viele Zeilen wurden im Netzwerk übertragen?

O (N) (genauer gesagt jeder Datensatz wird zweimal neu gemischt werden, einmal für jeden Elternteil) Sie von Join-Schlüssel, so dass jeder Punkt geht an einen, und nur eine Partition.

Wenn ich von Dataset arbeitete, könnte ich mit Filter verbinden. Ich verstehe, Datasets haben zusätzliche Optimierung für die Berechnung Grafik. Wie viel Verbesserung, wenn überhaupt, sollte ich erwarten, wenn ich zu Datasets übergehe?

Der Mischvorgang ist besser optimiert, aber ansonsten können keine fallspezifischen Optimierungen erwartet werden.

möchte für jede Zeile die Anzahl der Werte in + -1 Zeitstempel wissen.

Try Fensterfunktionen:

import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.expressions._ 

val w = Window.partitionBy("id").ordetBy("timestamp") 

rdd.toDF("id", "data") 
    .select($"id", $"data._1" as "timestamp", $"data._2" as "value")) 
    .withColumn("lead", lead($"value", 1).over(w)) 
    .withColumn("lag", lag($"value", 1).over(w)) 
+0

Zunächst vielen Dank für die Antwort. Ich habe Ihre Antwort akzeptiert, aber es gibt eine kleine Sache, die dort notiert werden sollte (in meinem OP wurde dies nicht ausdrücklich erwähnt, das ist der Grund, warum ich akzeptiert habe). Fenster, Verzögerung und Vorlauf lassen Sie das nächste Element in der geordneten Liste betrachten, das bedeutet, dass diese Berechnung falsch ist für Fälle, in denen: 1. Nicht alle Zeitstempel Daten 2 haben. Mehrere Zeilen können innerhalb des Fensters vorhanden sein (z. B. um zu aggregieren) alle Daten in 1 Sekunde Fenster und Reihen haben Nanosekunden Granularität). –

Verwandte Themen