2016-08-17 3 views
5

Code:Pyspark Column.isin() für einen großen Satz

views = sdf \ 
    .where(sdf['PRODUCT_ID'].isin(PRODUCTS)) \ 
    .rdd \ 
    .groupBy(lambda x: x['SESSION_ID']) \ 
    .toLocalIterator() 

for sess_id, rows in views: 
    # do something 

PRODUCTS ist ein set. Es ist groß, etwa 10000 Artikel.

Der Code schlägt mit:

--> 9 for sess_id, rows in views: 

/usr/local/spark/python/pyspark/rdd.py in _load_from_socket(port, serializer) 
--> 142   for item in serializer.load_stream(rf): 

/usr/local/spark/python/pyspark/serializers.py in load_stream(self, stream) 
--> 139     yield self._read_with_length(stream) 

/usr/local/spark/python/pyspark/serializers.py in _read_with_length(self, stream) 
--> 156   length = read_int(stream) 

/usr/local/spark/python/pyspark/serializers.py in read_int(stream) 
--> 543  length = stream.read(4) 

/opt/conda/lib/python3.5/socket.py in readinto(self, b) 
    574    try: 
--> 575     return self._sock.recv_into(b) 
    576    except timeout: 
    577     self._timeout_occurred = True 

timeout: timed out 

Aber wenn ich PRODUCTS Satz kleiner machen alles ist in Ordnung. Ich habe versucht, einige Zeitüberschreitungswerte in der Spark-Konfiguration zu ändern. Es hat nicht geholfen. Wie kann man solche Abstürze vermeiden?

UPDATE

PRODUCTS = sdf.sort(['TIMESTAMP']).select('PRODUCT_ID').limit(10000).drop_duplicates() 

views = sdf \ 
    .join(PRODUCTS, 'PRODUCT_ID', 'inner') \ 
    .rdd \ 
    .groupBy(lambda x: x['SESSION_ID']) \ 
    .toLocalIterator() 

for sess_id, rows in views: 
    # do ... 

Jetzt PRODUCTS ist ein Datenrahmen. Und ich benutze join. Haben Sie den gleichen Fehler ..

UPDATE 2

diese Lösung Versuch:

views = sdf \ 
    .join(PRODUCTS, 'PRODUCT_ID', 'inner') \ 
    .rdd \ 
    .groupBy(lambda x: x['SESSION_ID']) 
views.cache() 

for sess_id, rows in views.toLocalIterator(): 
    pass 

Nach einiger Zeit eine sehr lange Fehlermeldung anzeigt:

Py4JJavaError: An error occurred while calling o289.javaToPython. 
: org.apache.spark.SparkException: Exception thrown in awaitResult: 
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194) 
.... 

Dieser Fehler trat nur einmal ! Jetzt bekomme ich die gleichen Timeout-Ausnahmen!

+1

isin für eine große Datenmenge nutzlos Komplexität klug ist besser, ein Join ausführen oder sogar etwas wie versuchen, das Element zu erhalten dann wahr sonst falsch – eliasah

+0

können Sie führen eine cou statt "toLocalIterator"? Ich bin mir nicht sicher, was Sie damit erreichen wollen. – eliasah

+0

Was ich versuche zu tun ist, 'SDF' Datenrahmen durch' SESSION_ID' Spalte zu gruppieren und über diese Gruppen zu iterieren. Aber ich muss nur jene Zeilen auswählen, für die 'PRODUCT_ID' in einem vordefinierten Set ist. Und 'count' statt' toLocalIterator' funktioniert normal. – Leonid

Antwort

0

Wie @eliasah in seinem Kommentar sagte. Sie sollten versuchen, beiden DataFrames beizutreten, um auszuschließen, was nicht in Ihrer PRODUCTS-Tabelle enthalten ist.

views = sdf \ 
    .join(PRODUCTS) \ 
    .where(sdf['PRODUCT_ID']) \ 
    .rdd \ 
    .groupBy(lambda x: x['SESSION_ID']) \ 
    .toLocalIterator() 
+0

Willkommen bei SO! Aber bitte machen Sie sich etwas Mühe, um Ihren Code zu erklären und ihn zu formatieren. – eliasah

1

Ich glaube, dies im Grunde einen Fehler bei der Umsetzung von toLocalIterator() in pyspark 2.0.2 zurückzuführen ist. Sie können mehr hier lesen: [SPARK-18281][SQL][PySpark] Remove timeout for reading data through socket for local iterator.

Es scheint, dass das Update im nächsten Update nach 2.0.2 und im 2.1.x Release verfügbar sein wird. Wenn Sie sich wünschen, es zu beheben vorübergehend, die Änderungen aus der Ausgabe oben anwenden können:

diese ersetzen um die Linie 138 von rdd.py (auf dem tatsächlichen Funken Cluster, es scheint, dass Sie die rdd.py innen pyspark.zip aktualisieren müssen:

try: 
    rf = sock.makefile("rb", 65536) 
    for item in serializer.load_stream(rf): 
     yield item 
finally: 
    sock.close() 

mit diesem:..

sock.settimeout(None) # << this is they key line that disables timeout after the initial connection 
return serializer.load_stream(sock.makefile("rb", 65536)) 
+1

Sieht so aus, als ob das Update veröffentlicht wurde. Ich hatte das gleiche Problem und nach dem Update auf 2.1.0 ging es weg. – amustafa