2016-06-28 7 views
2

Ich werde auf die tatsächliche Frage kommen, aber bitte mit meinem Anwendungsfall zuerst. Ich habe den folgenden Anwendungsfall, sage ich bekam rddStud von irgendwo:Spark (Streaming) RDD foreachPartitionAsync Funktionalität/Arbeit

val rddStud: RDD[(String,Student)] = ???

Wo 'String' - einige zufällige Zeichenfolge und 'Student' - case class Student(name: String, id: String, arrivalTime: Long, classId: String)

ich Schüler bin mit nur als Ein Beispiel - die eigentliche Geschäftslogik hat eine sehr unterschiedliche komplizierte Klasse mit vielen Feldern.

Was ich erreichen will ist - Studenten mit dem gleichen id müssen in ansteigender Reihenfolge ihrer arrivalTime verarbeitet werden.

Für diese hier ist, was ich tue:

//Get RDD from Student.id -> Student 
val studMapRdd: RDD[(String,Student)] = rddStud.map(tuple => { 
    val student = tuple._2 
    (student.id,student) 
}) 

//Make sure all students with same student.id are in same partition. 
//I can potentially use groupByKey/combineByKey.... etc, but I don't see much performance difference  
val studPartitionRdd: RDD[(String,Student)] = studMapRdd.partitionBy(new HashPartitioner(studMapRdd.getNumPartitions)) 

val studSortedRdd: RDD[(String,Student)] = studPartitionRdd.sortBy({ case(studentId,student} => 
    student.arrivalTime 
}, ascending = true) 

studSortedRdd.foreachPartition(itr =>{ 
    itr.foreach{ case (studentId, student) => { 
     val studentName = student.name 
     val time = student.arrivalTime 
     //send for additional processing studentName and time combination 
    } 
}) 

Meine Fragen sind:

  1. Wenn ich foreachPartitionAsync verwenden - wird es alle Partitionen parallel verarbeiten, aber die Elemente in jeder Partition, um? Wenn nicht, was ist der Unterschied zwischen foreachPartitionAsync und foreachAsync?
  2. Scheint der Ansatz der Sortierung nach Repartitionierung sinnvoll? Oder wenn Sie irgendwelche Optimierungen in der obigen Logik vorschlagen könnten?

Sehr geschätzt.

+0

Warum Hash-Partition nur im nächsten Schritt zu sortieren. Es macht überhaupt keinen Sinn. 'foreachPartition' verwendet genau den gleichen Mechanismus wie 'foreach' mit partitionsweiser Parallelität. – zero323

+0

Say RDD hat 3 Partitionen, wo Ereignisse von Schüler mit ID = 1 sind verteilt über die 3. Jetzt Hashpartitioning wird sicherstellen, dass alle Ereignisse von ID = 1 wird in der gleichen Partition sagen p1, aber es wird nicht sichergestellt, dass sie nach AnkunftTime sortiert sind - So möchte ich sie aufgrund einiger Geschäftsanforderungen verarbeiten. Ist irgendetwas falsch in meinem Verständnis? –

Antwort

2

Keine Wahl zwischen synchroner (foreach(Partition)) und asynchroner (foreach(Partition)Async) Einreichung oder Wahl zwischen elementweisem und partitionsweisem Zugriff beeinflusst die Ausführungsreihenfolge. Im ersten Fall ist die wichtige Unterscheidung zwischen Blockierung und nicht blockierender Ausführung, im zweiten Fall die Art und Weise, in der Daten offengelegt werden, aber der tatsächliche Ausführungsmechanismus mehr oder weniger der gleiche ist.

Sortierung nach Neupartitionierung ist kein gültiger Ansatz. sortBy wird den vollständigen Shuffle auslösen und die vorhandene Datenverteilung nicht beibehalten. Wenn Sie das bestehende Datenlayout erhalten möchten, können Sie entweder innerhalb der folgenden mapPartitions Phase sortieren oder besser noch repartitionAndSortWithinPartitions verwenden.

class StudentIdPartitioner[V](n: Int) extends org.apache.spark.Partitioner { 
    def numPartitions: Int = n 
    def getPartition(key: Any): Int = { 
    val x = key.asInstanceOf[Student].id.hashCode % n 
    x + (if (x < 0) n else 0) 
    } 
} 

val rddStud: RDD[Student] = ??? 
val partitioner = new StudentIdPartitioner(rddStud.getNumPartitions) 
val arrTimeOrdering = scala.math.Ordering.by[Student, Long](_.arrivalTime) 


{ 
    implicit val ord = arrTimeOrdering 
    rddStud.map((_, null)).repartitionAndSortWithinPartitions(partitioner) 
} 
+0

SO schlägt vor, mehr Diskussion verschoben werden hier https://chat.stackoverflow.com/rooms/115889/discussion-between-kp-and-zero323 –

+0

Danke für die Klärung bearbeiten ... auch, wahrscheinlich eine eigene Frage wert, wir schauen wir uns eine foreach-aktion an, die nur auf einem prozess läuft, sie aktualisiert ein akkumulierbares mit einem angepassten "add", wir müssen es auf allen prozessoren ausführen, ist das was foreachAsync tut? Gibt es einen Grund, foreachAsyncPartition zu verwenden? Lassen Sie mich wissen, wenn separate Frage wünschenswert ist danke @ Zero323 – JimLohse

+0

@JimLohse Natürlich, danke für das Hinzeigen. Ich bin mir nicht sicher, ob ich Ihren Anwendungsfall verstehe. Was meinst du mit _einem Prozess_? Einzelner Executor? – zero323