2017-04-20 2 views
0

Ich bin nicht sicher, ob wir RDDs in Spark testen können.So testen Sie Spark RDD

Ich stieß auf einen Artikel, wo es heißt, eine RDD Mocking ist keine gute Idee. Gibt es eine andere Methode oder Best Practice für das Testen von RDDs?

+1

Haben Sie sich Holden [Spark-Test-Base] (https://github.com/holdenk/spark-testing-base) schon angesehen? – Pushkr

Antwort

0

Es gibt zwei Methoden zum Testen von Spark RDD/Applications. Sie sind wie folgt:

Zum Beispiel:

Einheit Test:

import org.apache.spark.SparkContext 
import org.apache.spark.rdd.RDD 

class WordCount { 
    def get(url: String, sc: SparkContext): RDD[(String, Int)] = { 
    val lines = sc.textFile(url) lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) 
    } 
} 

Jetzt Methode 1 zu Test ist wie folgt:

import org.scalatest.{ BeforeAndAfterAll, FunSuite } 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkConf 

class WordCountTest extends FunSuite with BeforeAndAfterAll { 
    private var sparkConf: SparkConf = _ 
    private var sc: SparkContext = _ 

    override def beforeAll() { 
    sparkConf = new SparkConf().setAppName("unit-testing").setMaster("local") 
    sc = new SparkContext(sparkConf) 
    } 

    private val wordCount = new WordCount 

    test("get word count rdd") { 
    val result = wordCount.get("file.txt", sc) 
    assert(result.take(10).length === 10) 
    } 

    override def afterAll() { 
    sc.stop() 
    } 
} 

In Methode 1 verspotten wir RDD nicht. Wir überprüfen nur das Verhalten unserer WordCount Klasse. Aber hier müssen wir Sparkcontext selbst erstellen und zerstören. Also, wenn Sie zusätzlichen Code nicht für die schreiben wollen, dann können Sie spark-testing-base verwenden, wie folgt aus:

Methode 2:

import org.scalatest.FunSuite 
import com.holdenkarau.spark.testing.SharedSparkContext 

class WordCountTest extends FunSuite with SharedSparkContext { 
    private val wordCount = new WordCount 

    test("get word count rdd") { 
    val result = wordCount.get("file.txt", sc) 
    assert(result.take(10).length === 10) 
    } 
} 

Oder

import org.scalatest.FunSuite 
import com.holdenkarau.spark.testing.SharedSparkContext 
import com.holdenkarau.spark.testing.RDDComparisons 

class WordCountTest extends FunSuite with SharedSparkContext { 
    private val wordCount = new WordCount 

    test("get word count rdd with comparison") { 
    val expected = sc.textFile("file.txt") 
        .flatMap(_.split(" ")) 
        .map((_, 1)) 
        .reduceByKey(_ + _) 

    val result = wordCount.get("file.txt", sc) 

    assert(RDDComparisons.compare(expected, result).isEmpty) 
    } 
} 

Für mehr Details über Funken RDD-Prüfung beziehen sich das - KnolX: Unit Testing of Spark Applications

+0

Mine ist nur ein kleines Programm, also versuche ich, Methode zu verwenden: 1, Aber was Sie (himanshu) in Methode 1 gezeigt haben, vergleicht nicht die RDD. Sie führen eine Aktion auf dieser RDD aus und versuchen dann, den Integer-Wert gleichzusetzen. Was ich will, ist 2 RDDs zu vergleichen ... Sagen wir RDD [myClass] === RDD [myClass] – AJm

+0

Zum Vergleich RDD (s) sollten Sie 'RDDComparisons', in Methode 2 erwähnt. – himanshuIIITian

+0

Aber das ist durch Verwenden einer von Some entwickelten benutzerdefinierten Bibliothek, die sich noch in der Entwicklung befindet und nicht unter einem großen Umbrella wie Apache. Möglicherweise ist es auch nicht produktionsbereit. – AJm

1

Danke Sie für diese offene Frage da draußen. Aus irgendeinem Grund, wenn es um Spark geht, sind alle so in der Analytik gefangen, dass sie die großartigen Softwareentwicklungspraktiken vergessen, die in den letzten 15 Jahren entstanden sind. Deshalb legen wir Wert darauf, Tests und kontinuierliche Integration (ua DevOps) in unserem Kurs zu diskutieren.

A Quick Abgesehen auf Terminologie

Bevor ich auf, ich habe eine kleine Meinungsverschiedenheit mit der KnolX Präsentation zum Ausdruck bringt @himanshuIIITian zitiert. Ein true Komponententest bedeutet, dass Sie die vollständige Kontrolle über alle Komponenten im Test haben. Es kann keine Interaktion mit Datenbanken, REST-Aufrufen, Dateisystemen oder sogar der Systemuhr geben; alles muss "verdoppelt" werden (z. B. verspottet, gestempelt usw.), wie Gerard Mezaros es in xUnit Test Patterns ausdrückt. Ich weiß, das scheint Semantik zu sein, aber es ist wirklich wichtig. Wenn Sie dies nicht verstehen, ist dies einer der Hauptgründe dafür, dass bei der kontinuierlichen Integration unterbrochene Testfehler auftreten.

Wir können noch Unit Test

So dieses Verständnis gegeben, Unit-Tests ein RDD unmöglich ist. Bei der Entwicklung von Analysen ist jedoch noch ein Platz für Unit-Tests vorhanden.

(Hinweis: Ich werde Scala für die Beispiele verwenden, aber die Konzepte überspannen Sprachen und Frameworks.)

eine einfache Bedienung vor:

rdd.map(foo).map(bar) 

Hier foo und bar sind einfache Funktionen. Diese können normal getestet werden, und sie sollten mit so vielen Ecken Fällen, wie Sie aufbringen können. Warum kümmert es sie, woher sie ihre Eingaben bekommen, ob es ein Testgerät oder ein ist?

Vergessen Sie nicht den Funken Shell

Dies ist nicht die Prüfung per se, aber in diesen frühen Phasen sollten Sie auch in der Spark-Shell experimentieren Ihre Transformationen, um herauszufinden, und vor allem die Folgen Ihres Ansatzes. Sie können beispielsweise physische und logische Abfragepläne, Partitionierungsstrategien und -konservierungen sowie den Status Ihrer Daten mit vielen verschiedenen Funktionen wie toDebugString, explain, glom, show, printSchema und so weiter untersuchen. Ich werde dich diese erkunden lassen.

Sie können Ihren Master auch auf local[2] in der Spark-Shell und in Ihren Tests festlegen, um mögliche Probleme zu identifizieren, die erst auftreten können, wenn Sie mit der Arbeit beginnen.

Integrationstests mit Funken

Nun zum spaßigen Teil.

Um Integrationstest Funken, nachdem Sie in der Qualität Ihrer Helferfunktionen und RDD/DataFrame Transformationslogik sicher fühlen, ist es wichtig, ein paar Dinge (unabhängig von Build-Tool und Test-Framework) zu tun:

  • Erhöhen Sie den JVM-Speicher.
  • Aktivieren Sie Forking, deaktivieren Sie jedoch die parallele Ausführung.
  • Verwenden Sie Ihr Testframework, um Ihre Spark-Integrationstests in Suites zusammenzufassen, und initialisieren Sie das SparkContext vor allen Tests und stoppen Sie es nach allen Tests.

Es gibt mehrere Möglichkeiten, diese letzte zu tun. Eine ist verfügbar von der spark-testing-base, die sowohl von @Pushkr als auch von der KnolX-Präsentation zitiert wird, die durch @himanshuIIITian verbunden ist.

Das Loan-Muster

Ein weiterer Ansatz ist die Loan Pattern zu verwenden.

Zum Beispiel (unter Verwendung von ScalaTest):

class MySpec extends WordSpec with Matchers with SparkContextSetup { 
    "My analytics" should { 
    "calculate the right thing" in withSparkContext { (sparkContext) => 
     val data = Seq(...) 
     val rdd = sparkContext.parallelize(data) 
     val total = rdd.map(...).filter(...).map(...).reduce(_ + _) 

     total shouldBe 1000 
    } 
    } 
} 

trait SparkContextSetup { 
    def withSparkContext(testMethod: (SparkContext) => Any) { 
    val conf = new SparkConf() 
     .setMaster("local") 
     .setAppName("Spark test") 
    val sparkContext = new SparkContext(conf) 
    try { 
     testMethod(sparkContext) 
    } 
    finally sparkContext.stop() 
    } 
} 

Wie Sie sehen können, macht das Darlehen Muster Verwendung von Funktionen höherer Ordnung auf „Darlehen“ die SparkContext auf die Probe und dann darüber zu verfügen, nachdem es erledigt.

Suffering-Oriented Programming (Danke, Nathan)

Es ist völlig eine Frage der Präferenz, aber ich ziehe die Loan-Muster und Draht Dinge selbst zu verwenden, so lange, wie ich kann, bevor bringen ein anderer Rahmen. Abgesehen von dem Versuch, leicht zu bleiben, fügen Frameworks manchmal eine Menge "Magie" hinzu, die das Debuggen von Testfehlern schwierig macht. Also nehme ich einen Suffering-Oriented Programming Ansatz - wo ich es vermeide, ein neues Framework hinzuzufügen, bis der Schmerz, es nicht zu haben, zu viel ist. Aber das liegt wieder bei Ihnen.

nun ein Ort, an dem Funkentestbasis wirklich glänzt, ist mit den Hadoop-basierten Helfer wie HDFSClusterLike und YARNClusterLike. Das Mischen dieser Eigenschaften kann dir sehr viel Rüstungsschmerzen ersparen. Ein anderer Ort, wo es scheint, ist mit den Scalacheck-ähnliche Eigenschaften und Generatoren. Aber noch einmal, ich würde persönlich darauf verzichten, es zu benutzen, bis meine Analytik und meine Tests dieses Niveau der Raffinesse erreichen.

Integrationstests mit Spark-Streaming

Schließlich würde Ich mag nur einen Ausschnitt aus präsentieren, was ein SparkStreaming Integration Testaufbau mit Werten im Speicher aussehen würde:

val sparkContext: SparkContext = ... 
val data: Seq[(String, String)] = Seq(("a", "1"), ("b", "2"), ("c", "3")) 
val rdd: RDD[(String, String)] = sparkContext.parallelize(data) 
val strings: mutable.Queue[RDD[(String, String)]] = mutable.Queue.empty[RDD[(String, String)]] 
val streamingContext = new StreamingContext(sparkContext, Seconds(1)) 
val dStream: InputDStream = streamingContext.queueStream(strings) 
strings += rdd 

Das ist einfacher als es aussieht. Es verwandelt wirklich nur eine Folge von Daten in eine Warteschlange, um die DStream zu versorgen. Der größte Teil davon ist wirklich nur ein Setup, das mit den Spark-APIs funktioniert.

Dies könnte meine längste Post jemals sein, also werde ich es hier lassen. Ich hoffe, dass andere mit anderen Ideen zusammenspielen, um die Qualität unserer Analysen mit den gleichen agilen Softwareentwicklungspraktiken zu verbessern, die alle anderen Anwendungsentwicklungen verbessert haben.

Und mit Entschuldigungen für die schamlose Stecker, können Sie unseren Kurs Analytics with Apache Spark, wo wir eine Menge dieser Ideen und vieles mehr. Wir hoffen, bald eine Online-Version zu haben.