Wir versuchen, Deltas aus einer Hive-Tabelle zu Kafka zu veröffentlichen. Die fragliche Tabelle ist eine einzelne Partition, einzelne Blockdatei von 244 MB. Unser Cluster ist für eine Blockgröße von 256 MByte konfiguriert, sodass wir in diesem Fall für eine einzelne Datei nur die maximale Größe erreichen.Spark Dataframe leftanti Beitreten fehlgeschlagen
Jedes Mal, wenn diese Tabelle aktualisiert wird, wird eine Kopie archiviert, und dann wird der Delta-Prozess ausgeführt.
In der folgenden Funktion haben wir die verschiedenen Joins isoliert und bestätigt, dass der innere Join akzeptabel ist (etwa 3 Minuten), aber die beiden Anti-Join-Datenrahmen werden nicht vervollständigt - wir werfen immer mehr Ressourcen auf den Spark-Job, aber sehen weiterhin die Fehler unten.
Gibt es eine praktische Grenze für Datenrahmengrößen für diese Art von Join?
private class DeltaColumnPublisher(spark: SparkSession, sink: KafkaSink, source: RegisteredDataset)
extends BasePublisher(spark, sink, source) with Serializable {
val deltaColumn = "hadoop_update_ts" // TODO: move to the dataset object
def publishDeltaRun(dataLocation: String, archiveLocation: String): (Long, Long) = {
val current = spark.read.parquet(dataLocation)
val previous = spark.read.parquet(archiveLocation)
val inserts = current.join(previous, keys, "leftanti")
val updates = current.join(previous, keys).where(current.col(deltaColumn) =!= previous.col(deltaColumn))
val deletes = previous.join(current, keys, "leftanti")
val upsertCounter = spark.sparkContext.longAccumulator("upserts")
val deleteCounter = spark.sparkContext.longAccumulator("deletes")
logInfo("sending inserts to kafka")
sink.sendDeltasToKafka(inserts, "U", upsertCounter)
logInfo("sending updates to kafka")
sink.sendDeltasToKafka(updates, "U", upsertCounter)
logInfo("sending deletes to kafka")
sink.sendDeltasToKafka(deletes, "D", deleteCounter)
(upsertCounter.value, deleteCounter.value)
}
}
Die Fehler, die wir scheinen sind zu sehen, um anzuzeigen, dass der Fahrer Kontakt mit den Vollstreckern verliert. Wir haben den Executor-Speicher auf bis zu 24 G erhöht und das Netzwerk-Timeout auf 900 und das Heartbeat-Intervall auf 120 s erhöht.
17/11/27 20:36:18 WARN netty.NettyRpcEndpointRef: Error sending message [message = Heartbeat(1,[Lscala.Tuple2;@596e3aa6,BlockManagerId(1, server, 46292, None))] in 2 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
at ...
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
at ...
Später in den Protokollen:
17/11/27 20:42:37 WARN netty.NettyRpcEndpointRef: Error sending message [message = Heartbeat(1,[Lscala.Tuple2;@25d1bd5f,BlockManagerId(1, server, 46292, None))] in 3 attempts
org.apache.spark.SparkException: Exception thrown in awaitResult
at ...
Caused by: java.lang.RuntimeException: org.apache.spark.SparkException: Could not find HeartbeatReceiver.
Die Konfigurationsschalter wir (ohne Erfolg) manipuliert worden sind --executor-memory 24G --conf spark.network.timeout=900s --conf spark.executor.heartbeatInterval=120s