2017-02-21 3 views
1

Ich benutze Spark Datasets API, um Beinahe-Duplikate zu entfernen. Ich versuche, die doppelten Zeilen zu gruppieren, um nur eine Zeile von jeder Gruppe zu belassen, aber mit einer Spalte, die die Anzahl der Zeilen angibt, die in dieser Zeile zusammengefasst wurden.Wie werden einige Operationen an einem Spark-Dataset ausgeführt, ohne dass sich dies auf die Datenstruktur auswirkt?

Betrachten Sie das folgende Beispiel. Ich habe folgende Daten, wobei das letzte Feld die Zeilen spezifiziert in dieser Zeile kollabierte:

  • A, B, C, 5
  • A, D, G, 1

An diesem Punkt, Ich möchte die Daten nach dem ersten Feld gruppieren, den Rest der Felder der Zeile, in der die meisten Zeilen eingeklappt sind, beibehalten und die Anzahl der Zeilen, die in der zweiten Zeile enthalten sind, zur ersten hinzufügen. So wäre das Ergebnis:

  • A, B, C, 6

ich schon implementiert es und das Problem ist, über das Format der resultierenden Daten.

Hier ist mein Code:

val sameTitleArticlesCollapsed = articlesCollapsed.groupByKey(_.TITLE).reduceGroups((a,b) => if(a.TIMES_COLLAPSED > b.TIMES_COLLAPSED) a.copy(TIMES_COLLAPSED = a.TIMES_COLLAPSED + b.TIMES_COLLAPSED) else b.copy(TIMES_COLLAPSED = a.TIMES_COLLAPSED + b.TIMES_COLLAPSED)).toDF("key", "data") 

Wenn ich printSchema auf sameTitleArticlesCollapsed ausführen, ist die Ausgabe:

root 
|-- key: string (nullable = true) 
|-- data: struct (nullable = true) 
| |-- CODE: string (nullable = true) 
| |-- TITLE: string (nullable = true) 
| |-- NAUTHORS: string (nullable = true) 
| |-- AUTHORS: string (nullable = true) 
| |-- TIMES_COLLAPSED: decimal(38,0) (nullable = true) 

ich über die key Spalte ist es egal, und was würde ich bin gerne Extrahieren Sie die Daten innerhalb der Spalte data, um sie im selben Format wie vor dem Anwenden der groupByKey - reduceGroups zu halten.

root 
|-- CODE: string (nullable = true) 
|-- TITLE: string (nullable = true) 
|-- NAUTHORS: string (nullable = true) 
|-- AUTHORS: string (nullable = true) 
|-- TIMES_COLLAPSED: long (nullable = false) 

Wie könnte ich das tun? Gibt es einen besseren Weg, diesen Prozess zu machen?

Vielen Dank!

Antwort

2

Sie eine Karte am Ende wie unten hinzufügen könnte das ursprüngliche Schema

val sameTitleArticlesCollapsed = articlesCollapsed.groupByKey(_.title).reduceGroups((a,b) => if(a.times_collapsed > b.times_collapsed) a.copy(times_collapsed = a.times_collapsed + b.times_collapsed) else b.copy(times_collapsed = a.times_collapsed + b.times_collapsed)) 

val result = sameTitleArticlesCollapsed.map({case (_,value) => value}).toDF 

result.printSchema 
root 
|-- code: string (nullable = true) 
|-- title: string (nullable = true) 
|-- nauthors: string (nullable = true) 
|-- authors: string (nullable = true) 
|-- times_collapsed: long (nullable = true) 
behalten
Verwandte Themen