2016-04-17 11 views
1

Hier ist ein Beispiel meiner Daten haben kann:Wie ich eine verschachtelte Transformation in PySpark

data1 = [[ 'red blue hi you red' ], 
    [ 'ball green ball go four ball'], 
    [ 'nice red start nice' ], 
    [ 'ball no kill tree go go' ]] 

Abrufen den folgenden von den vorherigen Daten:

data2 = 
[[[ 'red', 2 ], [ 'blue', 1 ], [ 'hi', 1 ], [ 'you', 1 ]], 
[[ 'green', 1 ], [ 'go', 1 ], [ 'four', 1 ], [ 'ball', 3 ]], 
[[ 'red, 1 ], [ 'start', 1 ], [ 'nice', 2 ]], 
[[ 'ball', 1 ], [ 'no', 1 ], [ 'kill', 1 ], [ 'tree', 1 ], [ 'go', 2 ]]] 

Hinweis: Beachten Sie, dass die RDD Daten2 hat verschachtelte Listen enthält die Häufigkeit, mit der das Wort in jedem Element in den RDD-Daten erwähnt wird. 1 Was ich möchte, ist die Anwendung des folgenden Codes:

data3 = data2.map(lambda x: [data1.filter(lambda z: y[0] in z) for y in x]) 

Die Ausgabe sollte die Listen oder die Elemente von data1 sein, die das angegebene Wort enthalten. Zum Beispiel: Wenn das Wort ‚rot‘ ging dann an das Schleifenfilter, sollte es mir 2 Listen von data1 geben, die sind:

[ 'red blue hi you red' ] 
[ 'nice red start nice' ] 

aber es hält die folgenden Fehler geben:

Ausnahme: Es scheint, dass Sie versuchen, eine RDD zu senden oder eine RDD von einer Aktion oder Transformation zu referenzieren. RDD-Transformationen und -Aktionen können nur vom Treiber und nicht innerhalb anderer Transformationen aufgerufen werden. Beispielsweise ist rdd1.map (Lambda x: rdd2.values.count() * x) ungültig, da die Werteumwandlung und die Zählaktion nicht innerhalb der Umwandlung von rdd1.map ausgeführt werden können. Weitere Informationen finden Sie unter SPARK-5063.

Ich habe versucht, eine andere Art und Weise zu tun, die eine Funktion dann ist die Definition passieren innerhalb der Transformationskarte, wie:

def func(y) 
    data1.filter(lambda z: y[0] in z) 
data3 = data2.map(lambda x: [ func(y) for y in x]) 

Aber es ist immer noch die gleichen Fehler, offenbar versucht, schlau sein funktioniert nicht tho : 3 Was kann ich tun? Danke im Voraus.

Antwort

1

Die Antwort ist kurz und ziemlich definitiv: Sie können nicht. Verschachtelte Operationen auf verteilten Datenstrukturen werden in Spark nicht unterstützt und werden höchstwahrscheinlich auch nicht unterstützt. Je nach Kontext können Sie diese durch join oder map durch lokale (optional übermittelte) Datenstruktur ersetzen.

+0

also, was kann ich mit meinem Code tun? Ich möchte das Ergebnis finden. – Kale

Verwandte Themen