Ich werde auf die tatsächliche Frage kommen, aber bitte mit meinem Anwendungsfall zuerst. Ich habe den folgenden Anwendungsfall, sage ich bekam rddStud
von irgendwo:Spark (Streaming) RDD foreachPartitionAsync Funktionalität/Arbeit
val rddStud: RDD[(String,Student)] = ???
Wo 'String' - einige zufällige Zeichenfolge und 'Student' - case class Student(name: String, id: String, arrivalTime: Long, classId: String)
ich Schüler bin mit nur als Ein Beispiel - die eigentliche Geschäftslogik hat eine sehr unterschiedliche komplizierte Klasse mit vielen Feldern.
Was ich erreichen will ist - Studenten mit dem gleichen id
müssen in ansteigender Reihenfolge ihrer arrivalTime
verarbeitet werden.
Für diese hier ist, was ich tue:
//Get RDD from Student.id -> Student
val studMapRdd: RDD[(String,Student)] = rddStud.map(tuple => {
val student = tuple._2
(student.id,student)
})
//Make sure all students with same student.id are in same partition.
//I can potentially use groupByKey/combineByKey.... etc, but I don't see much performance difference
val studPartitionRdd: RDD[(String,Student)] = studMapRdd.partitionBy(new HashPartitioner(studMapRdd.getNumPartitions))
val studSortedRdd: RDD[(String,Student)] = studPartitionRdd.sortBy({ case(studentId,student} =>
student.arrivalTime
}, ascending = true)
studSortedRdd.foreachPartition(itr =>{
itr.foreach{ case (studentId, student) => {
val studentName = student.name
val time = student.arrivalTime
//send for additional processing studentName and time combination
}
})
Meine Fragen sind:
- Wenn ich foreachPartitionAsync verwenden - wird es alle Partitionen parallel verarbeiten, aber die Elemente in jeder Partition, um? Wenn nicht, was ist der Unterschied zwischen foreachPartitionAsync und foreachAsync?
- Scheint der Ansatz der Sortierung nach Repartitionierung sinnvoll? Oder wenn Sie irgendwelche Optimierungen in der obigen Logik vorschlagen könnten?
Sehr geschätzt.
Warum Hash-Partition nur im nächsten Schritt zu sortieren. Es macht überhaupt keinen Sinn. 'foreachPartition' verwendet genau den gleichen Mechanismus wie 'foreach' mit partitionsweiser Parallelität. – zero323
Say RDD hat 3 Partitionen, wo Ereignisse von Schüler mit ID = 1 sind verteilt über die 3. Jetzt Hashpartitioning wird sicherstellen, dass alle Ereignisse von ID = 1 wird in der gleichen Partition sagen p1, aber es wird nicht sichergestellt, dass sie nach AnkunftTime sortiert sind - So möchte ich sie aufgrund einiger Geschäftsanforderungen verarbeiten. Ist irgendetwas falsch in meinem Verständnis? –