2017-03-28 4 views
0

Ich habe das Gefühl, Spark ist schlauer als ich und Neuordnung (oder zumindest im Vergleich zu dem geschriebenen Code), was auf Executors usw. läuft.Speichert Spark meine UDF nur für Datensätze, die angezeigt werden?

Angenommen, ich habe eine sehr einfache Spark Anfrage in Scala wie folgt.

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) 
val rawData = sqlContext.sql("FROM mytable SELECT *") 

I eine neue Spalte einige Funktionen in einem UDF dann erstellen verwendet wird, ist diese Funktion nicht leicht (oder zumindest einen Teil der Zeit) und stützt sich auf mehrere Spalten in den Daten. Grob gesagt sieht meine UDF ähnlich aus, obwohl die Verarbeitung nur ein Beispiel ist.

def method1(s1:String, s2:String):String = { 
    List(s1, s2).mkString(" ") 
} 

val method1UDF = udf(method1 _) 

val dataWithCol = rawData 
        .withColumn("newcol", method1UDF($"c1",$"c2")) 

dataWithCol.show(100) 

Meine Frage dreht sich tatsächlich um die letzte Anweisung, oder zumindest glaube ich es tut.

Wenn meine Datenmenge 1 Milliarde Datensätze enthält, wendet Spark tatsächlich nur meine withColumn auf 100 Datensätze an, oder wendet es sie auf alle 1 Millionen Datensätze an und gibt dann nur die ersten 100 zurück?

In Hive nehme ich das Äquivalent wäre:

SELECT t.c1, t.c2, CONCAT_WS(" ",t.c1,t.c2) as newCol from (
    SELECT c1,c2 as newCol FROM mytable limit 100 
) t 

Auch in Code obwohl es, wie ich das Äquivalent der Abfrage

SELECT * from (
    SELECT c1,c2, CONCAT_WS(" ",c1,c2) as newCol FROM mytable 
) t limit 100 
folgende geschrieben haben, sieht

ich vermute, dass es das tut früher, da das Hinzufügen eines Filters auf der neuen Spalte die Operation drastisch verlangsamt. Wenn ich die letzte Zeile zu ändern:

dataWithCol.filter($"newCol" === "H i").show(100) 

Dies wird nun mit der Funktion zu viel mehr Daten anzuwenden (vermutlich der gesamte Datensatz), bevor es funktioniert die Grenze von 100, ähnlich der folgenden Hive Abfrage:

SELECT * from (
    SELECT c1,c2, CONCAT_WS(" ",c1,c2) as newCol FROM mytable 
) t where t.newCol == "H i" limit 100 

Bin ich auf dem richtigen Weg mit dem, was Spark im Hintergrund macht? Wird meine Abfrage optimiert, indem nur die Verarbeitung von Datensätzen angewendet wird, die am Ende angezeigt werden?

+0

Hinweis: Überprüfen Sie "erklären" Ergebnis :) –

Antwort

1

Wenn Sie nicht sicher sind, euch alle Tage ein Experiment machen:

Spark context available as 'sc' (master = local[*], app id = local-1490732267478). 
Spark session available as 'spark'. 
Welcome to 
     ____    __ 
    /__/__ ___ _____/ /__ 
    _\ \/ _ \/ _ `/ __/ '_/ 
    /___/ .__/\_,_/_/ /_/\_\ version 2.1.0 
     /_/ 

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_121) 
Type in expressions to have them evaluated. 
Type :help for more information. 

scala> :paste 
// Entering paste mode (ctrl-D to finish) 

val rawData = spark.range(0, 1000000000, 1, 1000) 
    .toDF("id") 
    .select(
    $"id".cast("string").alias("s1"), 
    $"id".cast("string").alias("s2")) 

val counter = sc.longAccumulator("counter") 

def f = udf((s1: String, s2: String) => { 
    counter.add(1) 
    s"$s1 $s2" 
}) 

rawData.select(f($"s1", $"s2")).show(10) 



// Exiting paste mode, now interpreting. 
+-----------+ 
|UDF(s1, s2)| 
+-----------+ 
|  0 0| 
|  1 1| 
|  2 2| 
|  3 3| 
|  4 4| 
|  5 5| 
|  6 6| 
|  7 7| 
|  8 8| 
|  9 9| 
+-----------+ 
only showing top 10 rows 

rawData: org.apache.spark.sql.DataFrame = [s1: string, s2: string] 
counter: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(counter), value: 12) 
f: org.apache.spark.sql.expressions.UserDefinedFunction 

scala> counter.value 
res1: Long = 12 

Wie Sie Funken sehen begrenzt die Anzahl der Datensätze verarbeitet werden, aber es ist nicht genau das präzise. Sie sollten auch daran denken, dass diese Ergebnisse versions- und abfrageabhängig sind.

Zum Beispiel frühere Spark-Version, wo ziemlich begrenzt bei der Anwendung von Optimierungen auf UDF-Aufrufe. Auch die Upstream-Wide-Transformation kann dieses Verhalten beeinflussen und dazu führen, dass mehr (oder sogar alle) Datensätze verarbeitet werden.

1

Spark wendet etwas an, das als "faule Ausführung" bekannt ist. Dies bedeutet, dass Aktionen nur bei Bedarf ausgewertet werden. Es macht also etwas zwischen den beiden Aussagen, die du geschrieben hast. Der Ausführungsplaner ist clever genug, um herauszufinden, was zu tun ist und was nicht. Um mehr Details zu sehen, gehen Sie zu localhost: 4040 (erhöhen Sie den Port für jeden laufenden Kontext um 1).

+0

Danke für Ihre Antwort, beide half mir herauszufinden, was vor sich ging, aber kann nur eine als Antwort auswählen. –

Verwandte Themen