2017-11-27 17 views
0

Wir versuchen, Deltas aus einer Hive-Tabelle zu Kafka zu veröffentlichen. Die fragliche Tabelle ist eine einzelne Partition, einzelne Blockdatei von 244 MB. Unser Cluster ist für eine Blockgröße von 256 MByte konfiguriert, sodass wir in diesem Fall für eine einzelne Datei nur die maximale Größe erreichen.Spark Dataframe leftanti Beitreten fehlgeschlagen

Jedes Mal, wenn diese Tabelle aktualisiert wird, wird eine Kopie archiviert, und dann wird der Delta-Prozess ausgeführt.

In der folgenden Funktion haben wir die verschiedenen Joins isoliert und bestätigt, dass der innere Join akzeptabel ist (etwa 3 Minuten), aber die beiden Anti-Join-Datenrahmen werden nicht vervollständigt - wir werfen immer mehr Ressourcen auf den Spark-Job, aber sehen weiterhin die Fehler unten.

Gibt es eine praktische Grenze für Datenrahmengrößen für diese Art von Join?

private class DeltaColumnPublisher(spark: SparkSession, sink: KafkaSink, source: RegisteredDataset) 
    extends BasePublisher(spark, sink, source) with Serializable { 

    val deltaColumn = "hadoop_update_ts" // TODO: move to the dataset object 

    def publishDeltaRun(dataLocation: String, archiveLocation: String): (Long, Long) = { 

     val current = spark.read.parquet(dataLocation) 
     val previous = spark.read.parquet(archiveLocation) 

     val inserts = current.join(previous, keys, "leftanti") 
     val updates = current.join(previous, keys).where(current.col(deltaColumn) =!= previous.col(deltaColumn)) 
     val deletes = previous.join(current, keys, "leftanti") 

     val upsertCounter = spark.sparkContext.longAccumulator("upserts") 
     val deleteCounter = spark.sparkContext.longAccumulator("deletes") 

     logInfo("sending inserts to kafka") 
     sink.sendDeltasToKafka(inserts, "U", upsertCounter) 

     logInfo("sending updates to kafka") 
     sink.sendDeltasToKafka(updates, "U", upsertCounter) 

     logInfo("sending deletes to kafka") 
     sink.sendDeltasToKafka(deletes, "D", deleteCounter) 

     (upsertCounter.value, deleteCounter.value) 
    } 
    } 

Die Fehler, die wir scheinen sind zu sehen, um anzuzeigen, dass der Fahrer Kontakt mit den Vollstreckern verliert. Wir haben den Executor-Speicher auf bis zu 24 G erhöht und das Netzwerk-Timeout auf 900 und das Heartbeat-Intervall auf 120 s erhöht.

17/11/27 20:36:18 WARN netty.NettyRpcEndpointRef: Error sending message [message = Heartbeat(1,[Lscala.Tuple2;@596e3aa6,BlockManagerId(1, server, 46292, None))] in 2 attempts 
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.executor.heartbeatInterval 
    at ... 
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds] 
    at ... 

Später in den Protokollen:

17/11/27 20:42:37 WARN netty.NettyRpcEndpointRef: Error sending message [message = Heartbeat(1,[Lscala.Tuple2;@25d1bd5f,BlockManagerId(1, server, 46292, None))] in 3 attempts 
org.apache.spark.SparkException: Exception thrown in awaitResult 
    at ... 
Caused by: java.lang.RuntimeException: org.apache.spark.SparkException: Could not find HeartbeatReceiver. 

Die Konfigurationsschalter wir (ohne Erfolg) manipuliert worden sind --executor-memory 24G --conf spark.network.timeout=900s --conf spark.executor.heartbeatInterval=120s

Antwort

0

Die Option, die ich zu prüfen, gescheitert ist mein Fahrer Ressourcen zu erhöhen. Ich fügte --driver-memory 4G und --driver-cores 2 hinzu und sah meine Arbeit in ungefähr 9 Minuten abgeschlossen.

Es scheint, dass ein innerer Join dieser beiden Dateien (oder die Verwendung der integrierten Methode except()) den Executoren Speicherdruck auferlegt. Das Partitionieren auf einer der Schlüsselspalten scheint den Speicherdruck zu verringern, erhöht jedoch die Gesamtzeit, da mehr Mischvorgänge erforderlich sind.

Um den linken Anti-Join zwischen diesen beiden Dateien zu erreichen, müssen wir mehr Treiberressourcen haben. Ich habe das nicht erwartet.