Ich benutze Spark 1.6.0 mit Spark-Streaming und ein Problem mit Wide-Operationen.Anwendung hängt, wenn ich für PipelinedRDD und RDD von DStream verbinden
Codebeispiel: Es gibt RDD namens "a" mit dem Typ: class 'pyspark.rdd.PipelainedRDD'.
"a" wurde erhalten als:
# Load a text file and convert each line to a Row.
lines = sc.textFile(filename)
parts = lines.map(lambda l: l.split(","))
clients = parts.map(lambda p: Row(client_id=int(p[0]), clientname=p[1] ...))
# Infer the schema, and register the DataFrame as a table.
schemaPeople = sqlContext.createDataFrame(clients)
schemaPeople.registerTempTable("clients")
client_list = sqlContext.sql("SELECT * FROM clients")
und nach:
a = client_list.map(lambda entry: (int(entry[1]), value_from_entry(entry)))
Es zweitem Teil "B" ist mit Typ-Klasse 'pyspark.streaming.dstream.TransformedDStream'. Ich bin "b" von Flume Empfang:
DStreamB = flumeStream.map(lambda tup: function_for_map(tup[1].encode('ascii','ignore')))
und
b = DStreamB.map(lambda event: (int(event[2]), value_from_event(event)))
Problem ist: Wenn ich als versuchen beitreten:
mult = b.transform(lambda rdd: rdd.join(a))
meine Anwendung in dieser Phase hängt (Jetzt zeige ich den Bildschirm nach b.pprint() und vor der Bühne .join())
01 Aberwenn ich hinzufügen:
Deklarieren RDD "test":
test = sc.parallelize(range(1, 100000)).map(lambda k: (k, 'value'))
und zu tun:
mult0 = a.join(test) mult = b.transform(lambda rdd: rdd.join(mult0))`
Dann funktioniert es (!!):
Auch kann ich tun:
mult0 = b.transform(lambda rdd: rdd.join(test))
So:
Ich habe RDDs "a" und "Test". DStream "b". Und ich multiplizieren kann:
- a * Test * b
- b * test
Aber ich kann 'b * a' nicht.
Jede Hilfe ist willkommen! Vielen Dank!