Ich habe ein Leistungsproblem mit einem Code, den ich revisioniere, wird jedes Mal eine OOM
während einer Zählung geben. Ich denke, ich fand das Problem, im Grunde nach keyBy
Transformation, wird ausgeführt aggregateByKey.
Das Problem liegt an der Tatsache, dass fast 98% der RDD-Elemente hat den gleichen Schlüssel, so AggregationByKey, erzeugen shuffle, setzen fast alle Datensätze in die gleiche Partition, unterste Zeile: nur wenige Executoren funktioniert, und hat viel Speicherdruck.Vermeiden Sie Partitionen unsymmetrisch Spark
Dies ist der Code:
val rddAnomaliesByProcess : RDD[AnomalyPO] = rddAnomalies
.keyBy(po => po.getProcessCreator.name)
.aggregateByKey(List[AnomalyPO]())((list,value) => value +: list,_++_)
.map {case(name,list) =>
val groupByKeys = list.groupBy(po => (po.getPodId, po.getAnomalyCode, po.getAnomalyReason, po.getAnomalyDate, po.getMeasureUUID))
val lastOfGroupByKeys = groupByKeys.map{po => (po._1, List(po._2.sortBy { po => po.getProcessDate.getMillis }.last))}
lastOfGroupByKeys.flatMap(f => f._2)
}
.flatMap(f => f)
log.info("not duplicated Anomalies: " + rddAnomaliesByProcess.count)
Ich würde eine Art und Weise den Betrieb in einer mehr parallelen Art und Weise durchzuführen, so dass alle Zieher nahezu gleich arbeiten. Wie kann ich das machen?
Muss ich einen benutzerdefinierten Partitionierer verwenden?
* „Das Problem liegt darin, dass fast 98% der RDD-Elemente den gleichen Schlüssel hat“ * Gibt es einen Grund, so viele Elemente haben den gleichen Schlüssel? Ist das eine geschäftliche Anforderung? –
Eigentlich weiß ich nicht, ich habe keine funktionalen Kenntnisse, ich bin nur auf der Suche nach Performance-Engpass. Ich muss bedenken, dass sie darüber nachgedacht haben und die Partitionierung stimmt. – Giorgio
Wenn die Schlüsselgenerierung besser und idealerweise einheitlich wäre, hätten Sie wahrscheinlich kein Problem, wenn eine einzelne Partition so groß ist. –