2017-06-20 10 views
0

Ich bin noch neu bei Spark/PySpark und habe folgende Frage. Ich habe eine verschachtelte Liste mit ID's drin:Spark/PySpark: Gruppiere nach jedem Element der geschachtelten Liste

result = [[411, 44, 61], [42, 33], [1, 100], [44, 42]] 

Die Sache bin ich zu erreichen versuchen, ist, dass, wenn ein Element der Unterliste ein Element in einem anderen Teilliste entspricht die beiden zusammengeführt werden sollen. Das Ergebnis sollte wie folgt aussehen:

merged_result = [[411, 44, 61, 42, 33, 44, 42], [1,100]] 

Die erste Liste in "Ergebnis" stimmt mit der vierten Liste überein. Die vierte Liste stimmt mit der zweiten überein, daher sollten alle 3 zu einer Liste zusammengeführt werden. Die dritte Liste passt zu keiner anderen Liste, also bleibt sie gleich.

Ich könnte dies erreichen, indem Sie Schleifen mit Python schreiben.

result_after_matching = [] 
for i in result: 
    new_list = i 
    for s in result: 
     if any(x in i for x in s): 
      new_list = new_list + s 
    result_after_matching.append(set(new_list)) 

#merged_result = [[411, 44, 61, 42], [42,33,44], [1, 100], [44,42,33,411,61]] 

Da dies nicht die gewünschte Ausgabe müsste ich die Schleife und tun einen anderen Satz() offenkundigen der „merged_result“)

set([[411,44,61,42,33], [42,33,44,411,61],[1,100], [44,42,33,411,61]]) 
-> [[411, 44, 61, 42, 33], [1,100]] 

Da die Liste der Listen wiederholen, und die Teil-Listen bekommt größer und größer nach der Zeit, wenn neue Daten eingehen werden, wird dies nicht die zu verwendende Funktion sein.

Kann mir jemand sagen, ob es eine Funktion in Spark/Pyspark gibt, um diese verschachtelten Listen viel einfacher und schneller zusammenzuführen/zusammenzuführen/zu gruppieren?

Vielen Dank im Voraus! MG

Antwort

2

Die meisten SDD- oder Datenrahmen-basierten Lösungen werden wahrscheinlich ziemlich ineffizient sein. Dies liegt daran, dass die Art Ihres Problems erfordert, dass jedes Element Ihres Datensatzes möglicherweise mehrfach mit jedem anderen Element verglichen wird. Dies macht es so, dass die Verteilung der Arbeit über einen Cluster bestenfalls ineffizient ist.

Vielleicht wäre eine andere Möglichkeit, dies als Graphproblem neu zu formulieren. Wenn Sie jedes Element in einer Liste als Knoten in einem Diagramm und jede Liste als Unterdiagramm behandeln, sind die verbundenen Komponenten eines übergeordneten Diagramms, das aus den Untergraphen erstellt wird, das gewünschte Ergebnis. Hier ist ein Beispiel für das NetworkX Paket in Python mit:

import networkx as nx 

result = [[411, 44, 61], [42, 33], [1, 100], [44, 42]] 

g = nx.DiGraph() 
for subgraph in result: 
    g.add_path(subgraph) 

u = g.to_undirected() 
output=[] 
for component in nx.connected_component_subgraphs(u): 
    output.append(component.nodes()) 

print(output) 
# [[33, 42, 411, 44, 61], [1, 100]] 

Dies sollte ziemlich effizient sein, aber wenn Sie Ihre Daten sehr groß sind, wird es sinnvoll sein, ein skalierbare Graphanalyse-Tool zu verwenden. Funke hat eine grafische Darstellung Verarbeitungs-Bibliothek namens GraphX:

https://spark.apache.org/docs/latest/graphx-programming-guide.html

Leider ist die pyspark Umsetzung ist hinter etwas hinterher. Also, wenn Sie etwas so verwenden möchten, könnten Sie gerade für scala-spark oder einen anderen Rahmen völlig für festhalten.

+0

Ihre Lösung funktioniert ziemlich schnell! Sogar mit 3k + Listen. Die einzige Sache, die nicht am realen Beispiel arbeitete, war, dass einzelne Wertlisten (z. B. [57]), die nicht als die Ausgabe enthalten sind, haben Sie irgendwelche Erklärungen ?! Einzelner Wert bedeutet, dass diese ID NICHT in einer anderen Liste enthalten ist. Also habe ich die Liste in zwei Teile geteilt, bevor ich den Code benutzt habe und sie danach wieder zusammengefügt habe. – mgruber

+0

Außerdem habe ich versucht, GraphX ​​zu verwenden, aber es funktioniert nicht in Python. Irgendwie kann ich nur "Python" -basierte Skripte in unserer Distribution verwenden. Ich werde mit unserem Vertriebsarchitekten sprechen. – mgruber

1

Ich denke, Sie können aggregate Aktion von RDD verwenden. Im Folgenden stelle ich eine Beispielimplementierung in Scala vor. Bitte beachten Sie, dass ich Rekursion verwendet habe, um es lesbarer zu machen, aber um die Leistung zu verbessern, ist es eine gute Idee, diese Funktionen neu zu implementieren.

def overlap(s1: Seq[Int], s2: Seq[Int]): Boolean = 
    s1.exists(e => s2.contains(e)) 

def mergeSeq(s1: Seq[Int], s2: Seq[Int]): Seq[Int] = 
    s1.union(s2).distinct 

def mergeSeqWithSeqSeq(s: Seq[Int], ss: Seq[Seq[Int]]): Seq[Seq[Int]] = ss match { 
    case Nil => Seq(s) 
    case h +: tail => 
     if(overlap(h, s)) mergeSeqWithSeqSeq(mergeSeq(h, s), tail) 
     else h +: mergeSeqWithSeqSeq(s, tail) 
} 

def mergeSeqSeqWithSeqSeq(s1: Seq[Seq[Int]], s2: Seq[Seq[Int]]): Seq[Seq[Int]] = s1 match { 
    case Nil => s2 
    case h +: tail => mergeSeqWithSeqSeq(h, mergeSeqSeqWithSeqSeq(tail, s2)) 
} 

val result = rdd 
    .aggregate(Seq.empty[Seq[Int]]) (
     {case (ss, s) => mergeSeqWithSeqSeq(s, ss)}, 
     {case (s1, s2) => mergeSeqSeqWithSeqSeq(s1, s2)} 
    ) 
Verwandte Themen