2017-01-16 4 views
0

Ich habe ein Leistungsproblem mit einem Code, den ich revisioniere, wird jedes Mal eine OOM während einer Zählung geben. Ich denke, ich fand das Problem, im Grunde nach keyBy Transformation, wird ausgeführt aggregateByKey. Das Problem liegt an der Tatsache, dass fast 98% der RDD-Elemente hat den gleichen Schlüssel, so AggregationByKey, erzeugen shuffle, setzen fast alle Datensätze in die gleiche Partition, unterste Zeile: nur wenige Executoren funktioniert, und hat viel Speicherdruck.Vermeiden Sie Partitionen unsymmetrisch Spark

Dies ist der Code:

val rddAnomaliesByProcess : RDD[AnomalyPO] = rddAnomalies 
    .keyBy(po => po.getProcessCreator.name) 
    .aggregateByKey(List[AnomalyPO]())((list,value) => value +: list,_++_) 
    .map {case(name,list) => 
     val groupByKeys = list.groupBy(po => (po.getPodId, po.getAnomalyCode, po.getAnomalyReason, po.getAnomalyDate, po.getMeasureUUID)) 
     val lastOfGroupByKeys = groupByKeys.map{po => (po._1, List(po._2.sortBy { po => po.getProcessDate.getMillis }.last))} 
     lastOfGroupByKeys.flatMap(f => f._2) 
    } 
    .flatMap(f => f) 

log.info("not duplicated Anomalies: " + rddAnomaliesByProcess.count) 

Ich würde eine Art und Weise den Betrieb in einer mehr parallelen Art und Weise durchzuführen, so dass alle Zieher nahezu gleich arbeiten. Wie kann ich das machen?

Muss ich einen benutzerdefinierten Partitionierer verwenden?

+0

* „Das Problem liegt darin, dass fast 98% der RDD-Elemente den gleichen Schlüssel hat“ * Gibt es einen Grund, so viele Elemente haben den gleichen Schlüssel? Ist das eine geschäftliche Anforderung? –

+1

Eigentlich weiß ich nicht, ich habe keine funktionalen Kenntnisse, ich bin nur auf der Suche nach Performance-Engpass. Ich muss bedenken, dass sie darüber nachgedacht haben und die Partitionierung stimmt. – Giorgio

+0

Wenn die Schlüsselgenerierung besser und idealerweise einheitlich wäre, hätten Sie wahrscheinlich kein Problem, wenn eine einzelne Partition so groß ist. –

Antwort

1

Wenn Ihre Beobachtung ist richtig und

98% der RDD-Elemente hat die gleichen Schlüssel

dann Wechsel Partitionierungs werden Ihnen nicht helfen. Durch die Definition des Partitioners müssen 98% der Daten von einem einzelnen Executor verarbeitet werden.

Glücklicherweise ist schlechter Code hier wohl das größere Problem als der Skew. Überspringen:

.aggregateByKey(List[AnomalyPO]())((list,value) => value +: list,_++_) 

, die nur eine Volksmagie ist es wie die ganze Pipeline aussieht, kann als einfaches reuceByKey neu geschrieben werden. Pseudo-Code:

  • Kombinieren name und lokale Schlüssel in einen einzigen Schlüssel:

    def key(po: AnomalyPO) = (
        // "major" key 
        po.getProcessCreator.name, 
        // "minor" key 
        po.getPodId, po.getAnomalyCode, 
        po.getAnomalyReason, po.getAnomalyDate, po.getMeasureUUID 
    ) 
    

    Schlüssel mit Namen, Datum und zusätzliche Felder sollten viel höher Mächtigkeit als die allein Namen haben.

  • Karte zu Paaren und reduzieren durch Schlüssel:

    rddAnomalies 
        .map(po => (key(po), po)) 
        .reduceByKey((x, y) => 
        if(x.getProcessDate.getMillis > y.getProcessDate.getMillis) x else y 
    ) 
    
Verwandte Themen