Code:Pyspark Column.isin() für einen großen Satz
views = sdf \
.where(sdf['PRODUCT_ID'].isin(PRODUCTS)) \
.rdd \
.groupBy(lambda x: x['SESSION_ID']) \
.toLocalIterator()
for sess_id, rows in views:
# do something
PRODUCTS
ist ein set
. Es ist groß, etwa 10000 Artikel.
Der Code schlägt mit:
--> 9 for sess_id, rows in views:
/usr/local/spark/python/pyspark/rdd.py in _load_from_socket(port, serializer)
--> 142 for item in serializer.load_stream(rf):
/usr/local/spark/python/pyspark/serializers.py in load_stream(self, stream)
--> 139 yield self._read_with_length(stream)
/usr/local/spark/python/pyspark/serializers.py in _read_with_length(self, stream)
--> 156 length = read_int(stream)
/usr/local/spark/python/pyspark/serializers.py in read_int(stream)
--> 543 length = stream.read(4)
/opt/conda/lib/python3.5/socket.py in readinto(self, b)
574 try:
--> 575 return self._sock.recv_into(b)
576 except timeout:
577 self._timeout_occurred = True
timeout: timed out
Aber wenn ich PRODUCTS
Satz kleiner machen alles ist in Ordnung. Ich habe versucht, einige Zeitüberschreitungswerte in der Spark-Konfiguration zu ändern. Es hat nicht geholfen. Wie kann man solche Abstürze vermeiden?
UPDATE
PRODUCTS = sdf.sort(['TIMESTAMP']).select('PRODUCT_ID').limit(10000).drop_duplicates()
views = sdf \
.join(PRODUCTS, 'PRODUCT_ID', 'inner') \
.rdd \
.groupBy(lambda x: x['SESSION_ID']) \
.toLocalIterator()
for sess_id, rows in views:
# do ...
Jetzt PRODUCTS
ist ein Datenrahmen. Und ich benutze join
. Haben Sie den gleichen Fehler ..
UPDATE 2
diese Lösung Versuch:
views = sdf \
.join(PRODUCTS, 'PRODUCT_ID', 'inner') \
.rdd \
.groupBy(lambda x: x['SESSION_ID'])
views.cache()
for sess_id, rows in views.toLocalIterator():
pass
Nach einiger Zeit eine sehr lange Fehlermeldung anzeigt:
Py4JJavaError: An error occurred while calling o289.javaToPython.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
....
Dieser Fehler trat nur einmal ! Jetzt bekomme ich die gleichen Timeout-Ausnahmen!
isin für eine große Datenmenge nutzlos Komplexität klug ist besser, ein Join ausführen oder sogar etwas wie versuchen, das Element zu erhalten dann wahr sonst falsch – eliasah
können Sie führen eine cou statt "toLocalIterator"? Ich bin mir nicht sicher, was Sie damit erreichen wollen. – eliasah
Was ich versuche zu tun ist, 'SDF' Datenrahmen durch' SESSION_ID' Spalte zu gruppieren und über diese Gruppen zu iterieren. Aber ich muss nur jene Zeilen auswählen, für die 'PRODUCT_ID' in einem vordefinierten Set ist. Und 'count' statt' toLocalIterator' funktioniert normal. – Leonid