2016-05-24 5 views
3

Ich benutze Spark-Streaming auf Google Cloud Dataproc für die Ausführung eines Framework (in Python geschrieben), die aus mehreren kontinuierlichen Pipelines, die jeweils einen einzigen Job auf Dataproc, die grundsätzlich lesen aus Kafka-Warteschlangen und schreibe die transformierte Ausgabe nach Bigtable. Alle Pipelines verarbeiten mehrere Gigabytes Daten pro Tag über 2 Cluster, einen mit 3 Workern und einen mit 4.Spark Streaming Daten Pipelines auf Dataproc plötzliche häufige Socket Timeouts

Dieses Spark-Streaming-Framework auf Dataproc läuft bis Anfang Mai (3. Mai) ziemlich stabil um genau zu sein): Wir haben häufige Socket-Timeout-Ausnahmen erlebt, die unsere Pipelines beenden. Es scheint nicht mit der Belastung des Clusters zu tun zu haben, da es nicht wesentlich zugenommen hat. Es geschieht auch ziemlich zufällig während des Tages und ich habe möglicherweise verwandte Codeänderungen überprüft, aber ich konnte keine finden. Darüber hinaus scheint dies nur auf dem Cluster mit 4 Arbeiterknoten zu geschehen, während die Pipelines auf dem Cluster mit 3 Knoten sehr ähnlich sind und überhaupt keine Zeitüberschreitungen erfahren. Ich habe den Cluster bereits zweimal neu erstellt, aber das Problem bleibt bestehen und betrifft alle Pipelines, die auf diesem Dataproc-Cluster ausgeführt werden. Cluster mit 3 Knoten ist ein n1-standard-4 Maschinentyp, während der problematische Cluster mit 4 Knoten ein n1-standard-8 Maschinentyp ist, außer dass ihre Konfiguration identisch ist.

Beispiel Ausgang einer Pipeline Jobausführung, wenn das Problem auftritt, und der Job beendet:

java.net.SocketTimeoutException: Accept timed out 
    at java.net.PlainSocketImpl.socketAccept(Native Method) 
    at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409) 
    at java.net.ServerSocket.implAccept(ServerSocket.java:545) 
    at java.net.ServerSocket.accept(ServerSocket.java:513) 
    at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:645) 
16/05/23 14:45:45 ERROR org.apache.spark.streaming.scheduler.JobScheduler: Error running job streaming job 1464014740000 ms.0 
org.apache.spark.SparkException: An exception was raised by Python: 
Traceback (most recent call last): 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 65, in call 
    r = self.func(t, *rdds) 
    File "/tmp/b85990ba-e152-4d5b-8977-fb38915e78c4/transformfwpythonfiles.zip/transformationsframework/StreamManager.py", line 138, in process_kafka_rdd 
    .foreach(lambda *args: None) 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 747, in foreach 
    self.mapPartitions(processPartition).count() # Force evaluation 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1004, in count 
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 995, in sum 
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add) 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 869, in fold 
    vals = self.mapPartitions(func).collect() 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 772, in collect 
    return list(_load_from_socket(port, self._jrdd_deserializer)) 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 142, in _load_from_socket 
    for item in serializer.load_stream(rf): 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in load_stream 
    yield self._read_with_length(stream) 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 156, in _read_with_length 
    length = read_int(stream) 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 543, in read_int 
    length = stream.read(4) 
    File "/usr/lib/python2.7/socket.py", line 380, in read 
    data = self._sock.recv(left) 
timeout: timed out 

Der Start des Stacktrace ist in unserem StreamManager Modul, Methode process_kafka_rdd: eine einzelne diskrete RDD Prozesse innerhalb des direkten Strom von Kafka-Nachrichten. Unsere Integration von Kafka mit Spark-Streaming basiert auf dem "direkten Ansatz", der auf http://spark.apache.org/docs/latest/streaming-kafka-integration.html

+0

Wie viele Verbraucher und Partitionen haben Sie bis zu diesem Fehler? –

Antwort

1

Meine Erfahrung mit Spark-und Socket-Fehler ist, dass einige Executor plötzlich gestorben ist. Ein anderer Executor, der gerade mit ihm kommuniziert, erhöht den Socket-Fehler.

Nach meiner Erfahrung ist die Ursache für den unerwarteten Tod eines Executors ein Mangel an Ressourcen, normalerweise ein Mangel an Speicher.

(Es ist wichtig, die Größe des Speichers Testamentsvollstrecker abzustimmen verwenden können. Die Standardeinstellungen sind in der Regel viel zu niedrig. Aber ich vermute, Sie sind bereits bewusst.)

Ich gehe davon aus Spark oben auf Garn läuft? Unglücklicherweise hat Spark nach meiner Erfahrung einen schlechten Job, die Ursache des Problems zu melden, wenn es in den Gedärmen vorkommt. Leider muss man in die Garnlisten graben, um herauszufinden, was den plötzlichen Tod des Executors tatsächlich verursacht hat. Die Ausführenden laufen jeweils in einem Garn- "Container"; Irgendwo in den Garnstämmen sollte ein Umschlag eines Containers stehen.