Ich benutze Spark-Streaming auf Google Cloud Dataproc für die Ausführung eines Framework (in Python geschrieben), die aus mehreren kontinuierlichen Pipelines, die jeweils einen einzigen Job auf Dataproc, die grundsätzlich lesen aus Kafka-Warteschlangen und schreibe die transformierte Ausgabe nach Bigtable. Alle Pipelines verarbeiten mehrere Gigabytes Daten pro Tag über 2 Cluster, einen mit 3 Workern und einen mit 4.Spark Streaming Daten Pipelines auf Dataproc plötzliche häufige Socket Timeouts
Dieses Spark-Streaming-Framework auf Dataproc läuft bis Anfang Mai (3. Mai) ziemlich stabil um genau zu sein): Wir haben häufige Socket-Timeout-Ausnahmen erlebt, die unsere Pipelines beenden. Es scheint nicht mit der Belastung des Clusters zu tun zu haben, da es nicht wesentlich zugenommen hat. Es geschieht auch ziemlich zufällig während des Tages und ich habe möglicherweise verwandte Codeänderungen überprüft, aber ich konnte keine finden. Darüber hinaus scheint dies nur auf dem Cluster mit 4 Arbeiterknoten zu geschehen, während die Pipelines auf dem Cluster mit 3 Knoten sehr ähnlich sind und überhaupt keine Zeitüberschreitungen erfahren. Ich habe den Cluster bereits zweimal neu erstellt, aber das Problem bleibt bestehen und betrifft alle Pipelines, die auf diesem Dataproc-Cluster ausgeführt werden. Cluster mit 3 Knoten ist ein n1-standard-4
Maschinentyp, während der problematische Cluster mit 4 Knoten ein n1-standard-8
Maschinentyp ist, außer dass ihre Konfiguration identisch ist.
Beispiel Ausgang einer Pipeline Jobausführung, wenn das Problem auftritt, und der Job beendet:
java.net.SocketTimeoutException: Accept timed out
at java.net.PlainSocketImpl.socketAccept(Native Method)
at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
at java.net.ServerSocket.implAccept(ServerSocket.java:545)
at java.net.ServerSocket.accept(ServerSocket.java:513)
at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:645)
16/05/23 14:45:45 ERROR org.apache.spark.streaming.scheduler.JobScheduler: Error running job streaming job 1464014740000 ms.0
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 65, in call
r = self.func(t, *rdds)
File "/tmp/b85990ba-e152-4d5b-8977-fb38915e78c4/transformfwpythonfiles.zip/transformationsframework/StreamManager.py", line 138, in process_kafka_rdd
.foreach(lambda *args: None)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 747, in foreach
self.mapPartitions(processPartition).count() # Force evaluation
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1004, in count
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 995, in sum
return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 869, in fold
vals = self.mapPartitions(func).collect()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 772, in collect
return list(_load_from_socket(port, self._jrdd_deserializer))
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 142, in _load_from_socket
for item in serializer.load_stream(rf):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in load_stream
yield self._read_with_length(stream)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 156, in _read_with_length
length = read_int(stream)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 543, in read_int
length = stream.read(4)
File "/usr/lib/python2.7/socket.py", line 380, in read
data = self._sock.recv(left)
timeout: timed out
Der Start des Stacktrace ist in unserem StreamManager
Modul, Methode process_kafka_rdd: eine einzelne diskrete RDD Prozesse innerhalb des direkten Strom von Kafka-Nachrichten. Unsere Integration von Kafka mit Spark-Streaming basiert auf dem "direkten Ansatz", der auf http://spark.apache.org/docs/latest/streaming-kafka-integration.html
Wie viele Verbraucher und Partitionen haben Sie bis zu diesem Fehler? –