2015-09-25 4 views
42

Also, ich habe viele Jahre in einer objektorientierten Welt mit Code-Wiederverwendung, Design Patterns und Best Practices immer berücksichtigt, ich finde mich etwas mit Code-Organisation und Code-Wiederverwendung in der Welt von Spark kämpfen.Spark Code Organisation und Best Practices

Wenn ich versuche, Code auf wiederverwendbare Weise zu schreiben, kommt es fast immer zu Leistungseinbußen und am Ende schreibe ich alles neu, was für meinen speziellen Anwendungsfall optimal ist. Diese Konstante "schreibe, was für diesen speziellen Anwendungsfall optimal ist" wirkt sich auch auf die Code-Organisation aus, da das Teilen von Code in verschiedene Objekte oder Module schwierig ist, wenn "alles wirklich zusammengehört" und ich somit sehr wenige "Gott" -Objekte länge Ketten komplexer Transformationen. Tatsächlich denke ich häufig, dass ich, wenn ich mir den Großteil des Spark-Codes, den ich gerade schreibe, während ich in der objektorientierten Welt arbeitete, angeschaut hätte und ihn als "Spaghetti-Code" abgetan hätte.

Ich habe im Internet gesurft und versucht, eine Art Entsprechung zu den Best Practices der objektorientierten Welt zu finden, aber ohne viel Glück. Ich kann einige "Best Practices" für die funktionale Programmierung finden, aber Spark fügt nur eine zusätzliche Ebene hinzu, weil die Leistung hier eine wichtige Rolle spielt.

Also meine Frage an Sie ist, habe jemand von euch Spark Gurus gefunden einige Best Practices zum Schreiben von Spark-Code, den Sie empfehlen können?

EDIT

Wie in einem Kommentar geschrieben, ich hatte nicht erwartet, tatsächlich jemand eine Antwort auf schreiben, wie dieses Problem zu lösen, sondern ich hatte gehofft, dass jemand in dieser Gemeinschaft über einige gekommen war Martin Fowler, der irgendwo in Artikeln oder Blogposts geschrieben hatte, wie man Probleme mit der Code-Organisation in der Welt von Spark angehen kann.

@DanielDarabos schlug vor, dass ich ein Beispiel für eine Situation geben könnte, in der Code-Organisation und Leistung widersprüchlich sind. Während ich feststelle, dass ich in meiner täglichen Arbeit häufig Probleme damit habe, finde ich es ein bisschen schwierig, es auf ein gutes minimales Beispiel zu reduzieren;) aber ich werde es versuchen.

In der objektorientierten Welt bin ich ein großer Fan des Single-Responsibility-Prinzips, also würde ich sicherstellen, dass meine Methoden nur für eine Sache verantwortlich sind. Es macht sie wiederverwendbar und leicht testbar. Wenn ich zum Beispiel die Summe einiger Zahlen in einer Liste berechnen müsste (die einige Kriterien erfüllt) und ich den Durchschnitt derselben Zahl berechnen müsste, würde ich definitiv zwei Methoden erstellen - eine, die die Summe berechnet und eine berechnete den Durchschnitt. Wie folgt aus:

def main(implicit args: Array[String]): Unit = { 
    val list = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5)) 

    println("Summed weights for DK = " + summedWeights(list, "DK") 
    println("Averaged weights for DK = " + averagedWeights(list, "DK") 
} 

def summedWeights(list: List, country: String): Double = { 
    list.filter(_._1 == country).map(_._2).sum 
} 

def averagedWeights(list: List, country: String): Double = { 
    val filteredByCountry = list.filter(_._1 == country) 
    filteredByCountry.map(_._2).sum/ filteredByCountry.length 
} 

Ich kann natürlich auch weiterhin SRP in Funken zu ehren:

def main(implicit args: Array[String]): Unit = { 
    val df = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5)).toDF("country", "weight") 

    println("Summed weights for DK = " + summedWeights(df, "DK") 
    println("Averaged weights for DK = " + averagedWeights(df, "DK") 
} 


def avgWeights(df: DataFrame, country: String, sqlContext: SQLContext): Double = { 
    import org.apache.spark.sql.functions._ 
    import sqlContext.implicits._ 

    val countrySpecific = df.filter('country === country) 
    val summedWeight = countrySpecific.agg(avg('weight)) 

    summedWeight.first().getDouble(0) 
} 

def summedWeights(df: DataFrame, country: String, sqlContext: SQLContext): Double = { 
    import org.apache.spark.sql.functions._ 
    import sqlContext.implicits._ 

    val countrySpecific = df.filter('country === country) 
    val summedWeight = countrySpecific.agg(sum('weight)) 

    summedWeight.first().getDouble(0) 
} 

Aber weil meine df Milliarden Zeilen enthalten würde ich lieber nicht die filter zweimal ausführen müssen. Tatsächlich ist die Leistung direkt an die EMR-Kosten gekoppelt, also will ich das wirklich nicht. Um das zu beheben, ich entscheiden, was zu SRP zu verletzen und einfach gesagt, die beiden Funktionen in ein und stellen Sie sicher, ich rufe beharren auf dem Land gefilterten DataFrame, wie folgt aus:

def summedAndAveragedWeights(df: DataFrame, country: String, sqlContext: SQLContext): (Double, Double) = { 
    import org.apache.spark.sql.functions._ 
    import sqlContext.implicits._ 

    val countrySpecific = df.filter('country === country).persist(StorageLevel.MEMORY_AND_DISK_SER) 
    val summedWeights = countrySpecific.agg(sum('weight)).first().getDouble(0) 
    val averagedWeights = summedWeights/countrySpecific.count() 

    (summedWeights, averagedWeights) 
} 

nun dieses Beispiel wenn natürlich eine riesige Vereinfachung dessen, was im wirklichen Leben angetroffen wird.Hier könnte ich es einfach durch Filtern und Persistent dfvor Übergabe an die Summe und avg Funktionen (die auch mehr SRP wäre) lösen, aber im wirklichen Leben gibt es eine Reihe von Zwischenberechnungen gehen, die wieder und benötigt werden nochmal. Mit anderen Worten, die filter Funktion hier ist nur ein Versuch, ein Beispiel für etwas zu machen, das davon profitieren wird, dauerhaft zu sein. In der Tat denke ich, Anrufe an persist ist ein Schlüsselwort hier. Wenn ich persist anrufe, wird das meinen Job enorm beschleunigen, aber die Kosten sind, dass ich den gesamten Code, der von der persistenten DataFrame abhängt, fest koppeln muss - auch wenn sie logisch getrennt sind.

+5

Beliebig Sprache im Besonderen? Ich bin überhaupt kein Guru, aber für Java und Scala glaube ich nicht, dass es einen Grund gibt, Ihren Code nicht nach eigenen Standards zu strukturieren. Databricks Reference Apps (https://github.com/databricks/reference-apps/tree/master/timeseries) sind ein wirklich guter Anfang, um Spark-Projekte zu strukturieren. Ich hoffe es hilft! – Marco

+0

Ich untersuche verschiedene Methoden und habe den Spaghetti-Code, über den du sprichst, wenn ich zum ersten Mal einen Datensatz erfahre. Dann denke ich darüber nach, wie ich die Daten klassifizieren soll, mit denen ich arbeite. Wie mutiert es usw. Von da an funktionieren die klassischen Software-Designmuster für mich. –

+1

Außerdem sehe ich keinen Branchenkonsens darüber, wie effizienter, skalierbarer Code in einer verteilten Umgebung geschrieben werden kann, die wiederverwendbar ist. Die Muster sind in der Regel stark mit Daten gekoppelt, so dass Sie fleißig arbeiten müssen, um Daten mit vereinbarten Standards zu erstellen. Für einige Probleme wird dies niemals effizient genug sein. –

Antwort

7
+4

Ich möchte Ihnen für die umfassende Liste danken und ich stimme Ihnen zu, dass es viele Lektionen gibt, die von anderen gelernt werden können (warum habe ich die Frage überhaupt geschrieben ;-)). Ich habe jedoch das Gefühl, dass ich ziemlich gut verstehe, wie Spark funktioniert - es geht eher darum, Code so zu schreiben, dass er nicht nur in Bezug auf die Leistung optimal ist, sondern auch die Verwendung von guten Programmierpraktiken, wie z Bedenken. Zumindest finde ich, dass die beiden Konzepte sich mehr oder weniger gegenseitig ausschließen. –