2017-01-03 2 views
3

Ich bin der Prozess unten zu stimmen versuchen, weil ich eine sehr Java heap space error.Tuning Spark-Job

Mit Blick auf die Spark-UI mit bin, gibt es eine cogroup, die in einer sehr seltsamen Art und Weise verhält. Vor dieser Phase scheint alles sehr ausgeglichen zu sein (im Moment habe ich die Anzahl der Partitionen festgeschrieben, 48). Innerhalb der Methode loadParentMPoint gibt es die cogroup trasformation und im Grunde, wenn ich die nächste Zählung ausführen werde, wird die cogroup berechnet und im Grunde sind 48 Aufgaben geplant, aber 47 von ihnen enden sofort (scheint nichts zu verarbeiten), außer einer, die shuffling zu tun beginnen, bis es Heap-Speicher füllen und Ausnahme ausgelöst wird.

Ich habe einige Male den Prozess mit dem gleichen Datensatz gestartet und das Ende ist immer das gleiche. Everytime Es funktioniert nur ein Executor., Während zuvor ist gut ausbalanciert.

Warum habe ich dieses Verhalten?Vielleicht vermisse ich etwas? Ich versuchte repartition Daten vor Cogroup, weil ich vermutete, dass es unausgewogen war, aber es funktioniert nicht, das gleiche, wenn ich versuchte, partitionBy zu verwenden.

Dies ist der Code Auszug:

class BillingOrderGeneratorProcess extends SparkApplicationErrorHandler { 

    implicit val ctx = sc 
    val log = LoggerFactory.getLogger(classOf[BillingOrderGeneratorProcess]) 
    val ipc = new Handler[ConsumptionComputationBigDataIPC] 
    val billingOrderDao = new Handler[BillingOrderDao] 
    val mPointDao = new Handler[MeasurementPointDAO] 
    val billingOrderBDao = new Handler[BillingOrderBDAO] 
    val ccmDiscardBdao = new Handler[CCMDiscardBDAO] 
    val ccmService = new Handler[ConsumptionComputationBillingService] 
    val registry = new Handler[IncrementalRegistryTableData] 
    val podTimeZoneHelper = new Handler[PodDateTimeUtils] 
    val billingPodStatusDao = new Handler[BillingPodStatusBDAO] 
    val config = new Handler[PropertyManager] 
    val paramFacade = new Handler[ConsumptionParameterFacade] 
    val consumptionMethods = new Handler[ConsumptionMethods] 
    val partitions = config.get.defaultPartitions() 
    val appName = sc.appName 
    val appId = sc.applicationId 
    val now = new DateTime 

    val extracted = ctx.accumulator(0l, "Extracted from planning") 
    val generated = ctx.accumulator(0l, "Billing orders generated") 
    val discarded = ctx.accumulator(0l, "Billing orders discarded") 

    // initialize staging 
    val staging = new TxStagingTable(config.get().billingOrderGeneratorStagingArea()) 
    staging.prepareReading 

    val rddExtractedFromPlanning = staging 
     .read[ExtractedPO]() 
     .repartition(48) 
     .setName("rddExtractedFromPlanning") 
     .cache 

    val rddExtracted = rddExtractedFromPlanning 
     .filter { x => 
     extracted += 1 
     (x.getExtracted == EExtractedType.EXTRACTED || 
     x.getExtracted == EExtractedType.EXTRACTED_BY_USER || 
     x.getExtracted == EExtractedType.EXTRACTED_BY_TDC) 
     } 
     .map { x => 
     log.info("1:extracted>{}", x) 
     val bo = MapperUtil.mapExtractedPOtoBO(x) 
     bo 
     } 

    val podWithExtractedAndLastBillingOrderPO = rddExtracted.map { e => 
     val billOrdr = CCMIDGenerator.newIdentifier(CCMIDGenerator.Context.GENERATOR, e.getPod, e.getCycle(), e.getExtractionDate()) 
     val last = billingOrderDao.get.getLastByPodExcludedActual(e.getPod, billOrdr) 
     log.info("2:last Billing order>{}", last); 
     (e.getPod, e, last) 
    } 
     .setName("podWithExtractedAndLastBillingOrderPO") 
     .cache() 

    val podWithExtractedAndLastBillingOrder = podWithExtractedAndLastBillingOrderPO.map(e => (e._1, (e._2, MapperUtil.mapBillingOrderPOtoBO(e._3)))) 

    val rddRegistryFactoryKeys = podWithExtractedAndLastBillingOrderPO 
     .map(e => (e._1,1)) 
     .reduceByKey(_+_) 
     .keys 

    val rddRegistryFactory = registry.get().createIncrementalRegistryFromPods(rddRegistryFactoryKeys, List()) 

    val rddExtractedWithMPoint = ConsumptionComputationUtil 
     .groupPodWithMPoint(podWithExtractedAndLastBillingOrder, rddRegistryFactory) 
     .filter{ e => 
     val mPoint = e._3 
     val condition = mPoint != null 
     condition match { 
      case false => log.error("MPoint is NULL for POD -> " + e._1) 
      case true => 
     } 
     condition 
     } 
     .setName("rddExtractedWithMPoint") 
     .cache 

    rddExtractedWithMPoint.count 

    val rddExtractedWithMPointWithParent = ConsumptionComputationUtil 
     .groupWithParent(rddExtractedWithMPoint) 
     .map{ 
     case (pod, extracted, measurementPoint, billOrder, parentMpointId, factory) => 
      if (!parentMpointId.isEmpty) { 
      val mPointParent = mPointDao.get.findByMPoint(parentMpointId.get) 
      log.info("2.1:parentMpoin>Mpoint=" + parentMpointId + " parent for pod -> " + pod) 
      (pod, extracted, measurementPoint, billOrder, mPointParent.getPod, factory) 
      } else { 
      log.info("2.1:parentMpoin>Mpoint=null parent for pod -> " + pod) 
      (pod, extracted, measurementPoint, billOrder, null, factory) 
      } 
     } 
     .setName("rddExtractedWithMPointWithParent") 
     .cache() 

    rddExtractedWithMPointWithParent.count 

    val rddRegistryFactoryParentKeys = rddExtractedWithMPointWithParent 
     .filter(e => Option(e._5).isDefined) 
     .map(e => (e._5,1)) 
     .reduceByKey(_+_) 
     .keys 

    rddRegistryFactoryParentKeys.count 

    val rddRegistryFactoryParent = registry.get().createIncrementalRegistryFromPods(rddRegistryFactoryParentKeys, List()) 

    rddRegistryFactoryParent.count 

    val imprb = new Handler[IncrementalMeasurementPointRegistryBuilder] 

    val rddNew = rddExtractedWithMPointWithParent.map({ 
     case (pod, extracted, measurementPoint, billingOrder, parentPod, factory) => 
     (parentPod, (pod, extracted, measurementPoint, billingOrder, factory)) 
    }) 
    rddNew.count 

    val p = rddNew.cogroup(rddRegistryFactoryParent) 
    p.count 

    val rddExtractedWithMPointWithMpointParent = p.filter{ case (pod, (inputs, mpFactories)) => inputs.nonEmpty } 
    .flatMap{ case (pod, (inputs, mpFactories)) => 
     val factory = mpFactories.headOption //eventually one or none factory 
     val results = inputs.map{e => 
      val measurementPointTupla = factory.flatMap{f => 
      Option(imprb.get.buildSparkDecorator(new MeasurementPointFactoryAdapter(f)).getMeasurementPointByDate(e._2.getRequestDate), f) 
     } 
      val tupla = measurementPointTupla.getOrElse(null) 
      val toBeBilled = if(tupla!=null && tupla._1!=null) false else true 
      val m = if(tupla!=null && tupla._1!=null) tupla._1 else null 
      val f = if(tupla!=null && tupla._2!=null) tupla._2 else null 
      (e._1, e._2, e._3, e._4, m, toBeBilled, e._5 , f) 
     } 
     results 
    } 
    .setName("rddExtractedWithMPointWithMpointParent") 
    .cache() 

    rddExtractedWithMPointWithMpointParent.foreach({ e => 
     log.info("2.2:parentMpoint>MpointComplete=" + e._5 + " parent for pod -> " + e._1) 
    }) 
} 

Dies sind die Stufen für die beiden in die cogroup Operation beteiligt RDDs, rddNew:

enter image description here

rddRegistryFactory:

enter image description here

und dies ist die Stufe des cogroup:

enter image description here

dies ist die Lagersituation:

enter image description here

dies ist die Ausführenden tabs :

enter image description here

N. B. Ich habe die Count-Aktion nur zum Debuggen hinzugefügt.

UPDATE:

  • ich entfernt Cache versucht adn den Prozess erneut starten, jetzt jeder Vollstrecker hat rund 100 Millionen für die Speicherung von Daten verwendet werden, aber das Verhalten ist das gleiche: Shuffle Lese geschieht nur für eine Vollstrecker.
  • Ich habe auch versucht, eine Join-Operation zwischen den gleichen zwei RDDs vor der Cogroup zu machen, nur um zu wissen, ob das Problem nur mit der Cogroup zusammenhängt oder auf alle großen Transformationen und auch auf den Join ausgedehnt wird. Das Verhalten ist genau das gleiche.
+0

scheint, als würde Ihr 'Cache' Speicherdruck erzeugen. Warum Cache ist hier erforderlich? hast du es mit cache ausprobiert? –

+0

Ich habe zwei weitere Bilder hinzugefügt, die die Situation von Storage und Executors darstellen. Vielleicht gibt es ja ein wenig Heap-Druck, aber das Verhalten ist seltsam, kann dies nur durch Caching-Missbrauch verursacht werden? – Giorgio

+0

gibt es verschiedene Faktoren nicht eine, pls Cache zu entfernen und zu sehen –

Antwort

2

Ich löste es, das Problem wurde über partitioning bezogen. Grundlegend Daten in die rdd Aufruf cogroup Operation hatte alle Schlüssel auf dem gleichen Wert, so wenn cogroup passiert, Spark versuchte Hash-Partitionierung beide RDDs bringen Schlüssel der beiden RDD auf dem gleichen Executor, um sie zu cogroup.

+0

also wie hast du es gelöst? –

+0

Eigentlich war das Problem nicht die Cogroup selbst, aber im Moment der Ausführung der Cogroup waren die Daten komplett unpartitioniert, tatsächlich entdeckte ich einen Bug, der alle Daten in die gleiche Partition brachte, also fixierte die Daten einfach in die Cogroup. – Giorgio

+0

@Giorgio, Hi Ich bin mit Speicherproblemen konfrontiert, während ich 'cogroup' benutze, alle Vorschläge und Tipps wären großartig-voll, da ich neu bin, um zu funken und nicht weiß, wie ich es lösen kann.Hier finden Sie die Frage [https://stackoverflow.com/questions/47180307/how-to-use-cogroup-for-large-datasets] – Vignesh

2
  • Ich glaube stark, diese Java heap space error wegen gecached RDDs ist, die basierend auf dem letzten Screenshot wie nicht notwendig erscheint, die Registerkarte Speicher ist.

enter image description here

Je nachdem, wie oft der Datensatz zugegriffen wird und die Menge der dabei beteiligten Arbeit so kann erneut Berechnung schneller sein als die durch den erhöhten Speicherdruck gezahlten Preis.

Es sollte selbstverständlich sein, dass, wenn Sie nur ein Dataset einmal lesen, es keinen Sinn hat, es zwischenzuspeichern, wird es tatsächlich Ihre Arbeit langsamer machen.

  • Zum Zählen für Debugzwecke können Sie countApprox() anstelle von count verwenden. Sobald das Testen abgeschlossen ist, können Sie es für die reale Verwendung Ihres Jobs entfernen.
  • +0

    Ich habe es ausgeführt, aber das Verhalten ist das gleiche, macht Shuffle lesen nur von einem Executor. Jetzt haben Executoren Speicher frei (nur ungefähr 100M benutzt jedes). Irgendwelche anderen Vorschläge? – Giorgio

    +0

    ok Heap Space Fehler kommt wieder? –

    +0

    Ja, kommt genau das gleiche wie vorher, bitte schauen Sie sich die Post an, ich habe sie mit den nächsten Tests aktualisiert, die ich gemacht habe – Giorgio