2016-03-09 15 views
5

Ich erstelle eine RDD aus einer Liste von URLs und versuche dann, Daten mit einem asynchronen http-Aufruf abzurufen. Ich brauche alle Ergebnisse, bevor ich andere Berechnungen anstelle. Im Idealfall muss ich die HTTP-Aufrufe auf verschiedenen Knoten für Skalierungsüberlegungen vornehmen.Spark-Job mit Async-HTTP-Aufruf

Ich habe so etwas wie dies:

//init spark 
val sparkContext = new SparkContext(conf) 
val datas = Seq[String]("url1", "url2") 

//create rdd 
val rdd = sparkContext.parallelize[String](datas) 

//httpCall return Future[String] 
val requests = rdd.map((url: String) => httpCall(url)) 

//await all results (Future.sequence may be better) 
val responses = requests.map(r => Await.result(r, 10.seconds)) 

//print responses 
response.collect().foreach((s: String) => println(s)) 

//stop spark 
sparkContext.stop() 

Diese Arbeit, aber Spark-Job beenden nie!

Ich frage mich, was sind die besten Praktiken für den Umgang mit Zukunft mit Spark (oder Future [RDD]).

Ich denke, dass dieser Anwendungsfall ziemlich häufig aussieht, aber noch keine Antwort gefunden hat.

Mit freundlichen Grüßen

Antwort

4

dieser Anwendungsfall sieht ziemlich häufig

Nicht wirklich, weil es einfach nicht, wie Sie (wahrscheinlich) erwarten nicht funktioniert. Da jede Task auf der Standard-Scala Iterators ausgeführt wird, werden diese Operationen zusammen gequetscht. Es bedeutet, dass alle Operationen in der Praxis blockiert werden. Angenommen, Sie drei URLs haben [ „x“, „y“, „z“] Sie Code in einer folgenden Reihenfolge ausgeführt werden:

Await.result(httpCall("x", 10.seconds)) 
Await.result(httpCall("y", 10.seconds)) 
Await.result(httpCall("z", 10.seconds)) 

Sie können ganz einfach das gleiche Verhalten lokal reproduzieren. Wenn Sie Ihren Code ausführen möchten asynchron sollten Sie damit umgehen explizit mit mapPartitions:

rdd.mapPartitions(iter => { 
    ??? // Submit requests 
    ??? // Wait until all requests completed and return Iterator of results 
}) 

aber das ist relativ schwierig. Es gibt keine Garantie, dass alle Daten für eine bestimmte Partition in den Speicher passen, daher benötigen Sie wahrscheinlich auch einen Batch-Mechanismus.

All dies gesagt, dass ich das beschriebene Problem nicht reproduzieren konnte, kann ein Konfigurationsproblem oder ein Problem mit httpCall selbst sein.

Auf einer Seitennotiz, die eine einzige Zeitüberschreitung ermöglicht, um ganze Aufgabe zu töten, sieht nicht wie eine gute Idee aus.

1

Das wird nicht funktionieren.

Sie können nicht erwarten, dass die Anforderungsobjekte verteilt und Antworten von anderen Knoten über einen Cluster gesammelt werden. Wenn Sie das tun, wird der Funke für die Zukunft nie enden. Die Futures werden in diesem Fall niemals funktionieren.

Wenn Ihre map() sync (http) -Aufrufe durchführt, sammeln Sie bitte Antworten innerhalb des gleichen Aktions-/Umwandlungsaufrufs und unterziehen die Ergebnisse (Antworten) weiteren Karten-/Reduzierungs-/anderen Aufrufen.

In Ihrem Fall, bitte umschreiben Logik sammeln Sie die Antworten für jeden Anruf synchron und entfernen Sie die Vorstellung von Futures dann sollte alles in Ordnung sein.

+0

Problem ist, es sollte keine Datenbewegung zwischen "Anfragen" und "Antworten" geben, so dass beide Transformationen in der gleichen Phase ausgeführt werden sollten, daher die gleichen Executoren und Kontexte. – zero323

1

Ich habe es endlich mit Scalaj-http anstelle von Dispatch gemacht. Anruf ist synchron, aber das passt zu meinem Anwendungsfall.

Ich denke, der Spark-Job beendet nie Dispatch, weil die HTTP-Verbindung nicht ordnungsgemäß geschlossen wurde.

Mit freundlichen Grüßen

1

Ich konnte nicht einen einfachen Weg finden, dies zu erreichen. Aber nach mehreren Iterationen von Wiederholungen ist dies das, was ich getan habe, und es arbeitet für eine riesige Liste von Abfragen. Im Grunde genommen haben wir damit eine Batch-Operation für eine große Abfrage in mehrere Unterabfragen durchgeführt.

// Break down your huge workload into smaller chunks, in this case huge query string is broken 
// down to a small set of subqueries 
// Here if needed to optimize further down, you can provide an optimal partition when parallelizing 
val queries = sqlContext.sparkContext.parallelize[String](subQueryList.toSeq) 

// Then map each one those to a Spark Task, in this case its a Future that returns a string 
val tasks: RDD[Future[String]] = queries.map(query => { 
    val task = makeHttpCall(query) // Method returns http call response as a Future[String] 
    task.recover { 
     case ex => logger.error("recover: " + ex.printStackTrace()) } 
    task onFailure { 
     case t => logger.error("execution failed: " + t.getMessage) } 
    task 
}) 

// Note:: Http call is still not invoked, you are including this as part of the lineage 

// Then in each partition you combine all Futures (means there could be several tasks in each partition) and sequence it 
// And Await for the result, in this way you making it to block untill all the future in that sequence is resolved 

val contentRdd = tasks.mapPartitions[String] { f: Iterator[Future[String]] => 
    val searchFuture: Future[Iterator[String]] = Future sequence f 
    Await.result(searchFuture, threadWaitTime.seconds) 
} 

// Note: At this point, you can do any transformations on this rdd and it will be appended to the lineage. 
// When you perform any action on that Rdd, then at that point, 
// those mapPartition process will be evaluated to find the tasks and the subqueries to perform a full parallel http requests and 
// collect those data in a single rdd. 

Wenn Sie möchten nicht auf den Inhalt jeder Transformation durchzuführen, wie die Antwort-Payload Parsen usw. Dann könnten Sie foreachPartition anstelle von mapPartitions an alle diejenigen führen http sofort aufruft.