2016-04-02 4 views
1

Ich möchte eindeutige IDs als Wertänderungen aus der vorherigen Zeile in der angegebenen Spalte generieren. Ich habe Datenframe in Spark Scala und möchte Unique_ID Spalte zu bestehenden Datenrahmen hinzufügen. Ich kann Zeilennummer nicht über Partitionen oder groupBy verwenden, da die gleichen Product_IDs mehrmals vorkommen und Unique_ID jedes Mal, wenn sie in die Spalte kommen, möchten.Willst du eindeutige IDs als Wertänderungen aus der vorherigen Zeile mit Hilfe von Scala generieren

Product_IDs Unique_ID 
Prod_1   1 
Prod_1   1 
Prod_1   1 
Prod_2   2 
Prod_3   3 
Prod_3   3 
Prod_2   4 
Prod_3   5 
Prod_1   6 
Prod_1   6 
Prod_4   7 

Ich brauche diesen Datenrahmen mit Spark Scala.

+1

Zuerst müssen Sie erklären, wie Sie eine Reihenfolge der Zeile definieren. – zero323

Antwort

0

Hier ist eine Lösung, die nicht unbedingt die effizienteste (ich gebe zu ich nicht einen Weg finden könnte optimieren Sie es), und ein bisschen lang, aber funktioniert.

ich den Eingang gehe davon aus Aufzeichnungen von diesem Fall Klasse dargestellt zusammen:

case class Record(id: Int, productId: String) 

Wo die id die Reihenfolge definiert.

Wir werden zwei Berechnungen durchführen:

  1. für jeden Datensatz die Minimum finden id jeder nachfolgenden Datensatz mit einem verschiedenenproductId
  2. Gruppe von diesem Wert (das ist ein repräsentiert Gruppe von aufeinander folgenden Datensätzen mit dem gleichen productId, und dann zipWithIndex, um die eindeutige ID zu erstellen, die wir interessiert sind

Ich Misch RDD-Operationen (für # 2) und SQL (für # 1) vor allem für die Bequemlichkeit, gehe davon aus ich beiden Operationen in jeder API durchgeführt werden können (obwohl ich nicht ausprobiert hätte):

val input = sqlContext.createDataFrame(Seq(
    Record(1, "Prod_1"), 
    Record(2, "Prod_1"), 
    Record(3, "Prod_1"), 
    Record(4, "Prod_2"), 
    Record(5, "Prod_3"), 
    Record(6, "Prod_3"), 
    Record(7, "Prod_2"), 
    Record(8, "Prod_3"), 
    Record(9, "Prod_1"), 
    Record(10, "Prod_1"), 
    Record(11, "Prod_4") 
)) 

input.registerTempTable("input") 

// Step 1: find "nextShiftId" for each record 
val withBlockId = sqlContext.sql(
    """ 
    |SELECT FIRST(a.id) AS id, FIRST(a.productId) AS productId, MIN(b.id) AS nextShiftId 
    |FROM input a 
    |LEFT JOIN input b ON a.productId != b.productId AND a.id < b.id 
    |GROUP BY a.id 
    """.stripMargin) 

withBlockId.show() 
// prints: 
// +---+---------+-----------+ 
// | id|productId|nextShiftId| 
// +---+---------+-----------+ 
// | 1| Prod_1|   4| 
// | 2| Prod_1|   4| 
// | 3| Prod_1|   4| 
// | 4| Prod_2|   5| 
// | 5| Prod_3|   7| 
// | 6| Prod_3|   7| 
// | 7| Prod_2|   8| 
// | 8| Prod_3|   9| 
// | 9| Prod_1|   11| 
// | 10| Prod_1|   11| 
// | 11| Prod_4|  null| 
// +---+---------+-----------+ 

// Step 2: group by "productId" and "nextShiftId" 
val resultRdd = withBlockId.rdd 
    .groupBy(r => (r.getAs[String]("productId"), r.getAs[Int]("nextShiftId"))) 
    // sort by nextShiftId to get the order right before adding index 
    .sortBy { 
    case ((prodId, 0), v) => Long.MaxValue // to handle the last batch where nextShiftId is null 
    case ((prodId, nextShiftId), v) => nextShiftId 
    } 
    // zip with index (which would be the "unique id") and flatMap to just what we need: 
    .values 
    .zipWithIndex() 
    .flatMap { case (records, index) => records.map(r => (r.getAs[String]("productId"), index+1))} 

// transform back into DataFrame: 
val result = sqlContext.createDataFrame(resultRdd) 

result.show() 
// prints: 
// +------+---+ 
// | _1| _2| 
// +------+---+ 
// |Prod_1| 1| 
// |Prod_1| 1| 
// |Prod_1| 1| 
// |Prod_2| 2| 
// |Prod_3| 3| 
// |Prod_3| 3| 
// |Prod_2| 4| 
// |Prod_3| 5| 
// |Prod_1| 6| 
// |Prod_1| 6| 
// |Prod_4| 7| 
// +------+---+ 
+0

@ Tzach Sohar vielen Dank, das ist eine fantastische Lösung und hat sehr gut für mich gearbeitet. – Nikhil

0

Es gibt Möglichkeiten, eine Spalte mit eindeutigen IDs hinzuzufügen, an die ich gerade denken kann. Eine davon ist die Verwendung zipWithUniqueId:

val rows = df.rdd.zipWithUniqueId().map { 
    case (r: Row, id: Long) => Row.fromSeq(r.toSeq :+ id) 
} 

val newDf = sqlContext.createDataFrame(rows, StructType(df.schema.fields :+ StructField("uniqueIdColumn", LongType, false))) 

ein anderer ist MonotonicallyIncreasingId Funktion zu verwenden:

import org.apache.spark.sql.functions.monotonicallyIncreasingId 
val newDf = df.withColumn("uniqueIdColumn", monotonicallyIncreasingId) 
+0

@Tzach Sohar 1., danke für deine Antworten. Beide oben genannten Funktionen werden mir nur die Reihenfolge erhöhen. Aber ich möchte Unique_ID Nummer als Produkt IDs von oben genannten Zeile generieren generieren. zum Beispiel für Prod_1 ist seine eindeutige_id 1, Prod_2 ist 2, Prod_3 ist 3, wieder Prod_2 ist um 1 erhöht, wie 4 und so weiter. also, wenn product_id nicht gleich product ids in der obigen Zeile ist, dann erhöhe die Unique_Id um 1. – Nikhil

+0

@Nikhil das ist nicht meine Antwort, ich habe es nur ein bisschen für die Lesbarkeit bearbeitet :) –

+0

@TzachZohar, hast du eine Lösung für diese Frage? – Nikhil

Verwandte Themen