2017-06-26 5 views
1

Ich habe eine CSV-Datei und die Daten für customer_id sieht wie folgt aus.Wie teile ich Datensätze auf?

CUSTID,LOC,PRODNAME,UNITS,TYPE,PURCHASE_DATE 
123,"SA","PROD1",1000,"PAY","20-DEC-2016" 
123,"SA","PROD2",500,"REC","31-AUG-2016" 

Und das hat umgewandelt in drei Datensätze ein als übergeordnete Datensatz und die beiden anderen als Kind Aufzeichnungen über die Spalte TYPE, wie unten zu stützen. Dies muss schließlich in eine andere CSV-Datei gehen.

ROWTYPE,CUSTID,LOC,TYPE,PRODUCT_NAME,UNITS,PURCHASE_DATE 
PARENT,123,"SA" 
CHILD,123,"SA","PAY","PROD1","20-DEC-2016" 
CHILD,123,"SA","REC","PROD2","31-AUG-2016" 

Zwei weitere Dinge auf die gleiche Aufgabe.

  1. Meine Ausgabe CSV-Datei hat nur eine Kopfzeile nur für den übergeordneten Datensatz und untergeordnete Datensätze können mehr oder weniger Felder als der übergeordnete Datensatz haben. Aus diesem Grund kann ich UNION Eltern und Kinder nicht, also habe ich gerade alle Dataframes in RDD umgewandelt und eine Union auf diesem gemacht. Also nur zu wissen, wenn dies mit Dataframes getan werden muss, wie würde ich es tun?

  2. Dann muss ich schließlich eine CSV-Datei in einer bestimmten Reihenfolge basierend auf CUSTID und TYPE-Feld erstellen. Ich weiß, dass dies einfach bei Datenrahmen ist, aber seit ich sie in RDDs umgewandelt habe, habe ich Folgendes getan, weiß aber nicht, ob es ein optimaler Ansatz ist.

     
    val rdd: RDD[((String,Int),Row)] = solution.map(row => ({ 
        val custId: String = row.getString(1) 
        val transType: Int = row.getString(0) match { 
        case "PAY" => 1 
        case "REC" => 2 
        case _ => 0 
        } 
        (custId,transType)},row)) 
    
    implicit val caseInsensitiveOrdering = new Ordering[String] { 
        override def compare(a: String, b: String) = a.toLowerCase().compare(b.toLowerCase()) 
    } 
    

Auch dies funktioniert nicht, wenn ich die Ausgabe CSV-Datei neu partitionieren und fusioniert, wie die Bestellung für einen Wurf geht.

+0

Können Sie erklären, was _ "basierend auf der Spalte TYPE wie folgt" _ bedeutet? –

Antwort

1

Getting PARENT,123,"SA" scheint einfach durch eine einfache Abfrage mit groupBy.

val parents = customers. 
    select(lit("PARENT") as "ROWTYPE", $"custid", $"loc"). 
    dropDuplicates 
scala> parents.show 
+-------+------+---+ 
|ROWTYPE|custid|loc| 
+-------+------+---+ 
| PARENT| 123| SA| 
+-------+------+---+ 

Damit Sie union es mit dem Rest der endgültigen PARENT und CHILD Aufzeichnungen zu bekommen.

Da union kann nur mit der gleichen Anzahl von Spalten in Tabellen durchgeführt werden, und customers 6 Spalten und parents hat drei Spalten, müssen Sie die Datensätze übereinstimmen.

val fullParents = parents. 
    withColumn("PRODUCT_NAME", lit("")). 
    withColumn("UNITS", lit("")). 
    withColumn("TYPE", lit("")). 
    withColumn("PURCHASE_DATE", lit("")) 
scala> .show 
+-------+------+---+------------+-----+----+-------------+ 
|ROWTYPE|custid|loc|PRODUCT_NAME|UNITS|TYPE|PURCHASE_DATE| 
+-------+------+---+------------+-----+----+-------------+ 
| PARENT| 123| SA|   |  | |    | 
+-------+------+---+------------+-----+----+-------------+ 

Lassen Sie uns die ROWTYPE als CHILD zu customers hinzufügen.

val rowtypedCustomers = customers. 
    select(lit("CHILD") as "ROWTYPE", customers("*")). 
    withColumnRenamed("PRODNAME", "PRODUCT_NAME") 
scala> rowtypedCustomers.show 
+-------+------+---+------------+-----+----+-------------+ 
|ROWTYPE|CUSTID|LOC|PRODUCT_NAME|UNITS|TYPE|PURCHASE_DATE| 
+-------+------+---+------------+-----+----+-------------+ 
| CHILD| 123| SA|  PROD1| 1000| PAY| 20-DEC-2016| 
| CHILD| 123| SA|  PROD2| 500| REC| 31-AUG-2016| 
+-------+------+---+------------+-----+----+-------------+ 

val solution = fullParents.union(rowtypedCustomers) 
scala> .show 
+-------+------+---+------------+-----+----+-------------+ 
|ROWTYPE|custid|loc|PRODUCT_NAME|UNITS|TYPE|PURCHASE_DATE| 
+-------+------+---+------------+-----+----+-------------+ 
| PARENT| 123| SA|   |  | |    | 
| CHILD| 123| SA|  PROD1| 1000| PAY| 20-DEC-2016| 
| CHILD| 123| SA|  PROD2| 500| REC| 31-AUG-2016| 
+-------+------+---+------------+-----+----+-------------+ 

als CSV-Schreiben ist so einfach wie die folgende Abfrage:

solution.write.csv("solution.csv") 

Erledigt. Herzlichen Glückwunsch!

+0

Wenn die Antwort hilfreich ist, akzeptiere sie bitte (und möglicherweise auch die Einladung). Vielen Dank! –

+0

Hallo Jacek, Nur zwei weitere Dinge, 1. Entschuldigung, die Anzahl der Spalten zwischen Eltern und Kind Datensätze sind in meinem Fall anders, so dass ich Eltern-Kind-Datenrahmen nicht verbinden kann. Gibt es einen Weg, damit umzugehen? 2. Im Fall, wenn ich einige generische mathematische Berechnungen für die untergeordneten Datensätze durchführen muss, die tatsächlich auf den übergeordneten Spalten basiert. Wie führe ich diese Berechnungen durch? Danke – 1pluszara

+0

Re 1. Deshalb habe ich 'withColumn' verwendet, um' fullParents' zu erstellen, um die Anzahl der Spalten in 'customers' zu erreichen. Zu 2. Das ist eine separate Frage, die ich Ihnen als separate Frage zu StackOverflow empfehlen möchte. Bitte akzeptieren Sie die Antwort, wenn die Antwort geholfen hat. –