Hier ist eine Lösung, die nicht unbedingt die effizienteste (ich gebe zu ich nicht einen Weg finden könnte optimieren Sie es), und ein bisschen lang, aber funktioniert.
ich den Eingang gehe davon aus Aufzeichnungen von diesem Fall Klasse dargestellt zusammen:
case class Record(id: Int, productId: String)
Wo die id
die Reihenfolge definiert.
Wir werden zwei Berechnungen durchführen:
- für jeden Datensatz die Minimum finden
id
jeder nachfolgenden Datensatz mit einem verschiedenenproductId
- Gruppe von diesem Wert (das ist ein repräsentiert Gruppe von aufeinander folgenden Datensätzen mit dem gleichen
productId
, und dann zipWithIndex, um die eindeutige ID zu erstellen, die wir interessiert sind
Ich Misch RDD-Operationen (für # 2) und SQL (für # 1) vor allem für die Bequemlichkeit, gehe davon aus ich beiden Operationen in jeder API durchgeführt werden können (obwohl ich nicht ausprobiert hätte):
val input = sqlContext.createDataFrame(Seq(
Record(1, "Prod_1"),
Record(2, "Prod_1"),
Record(3, "Prod_1"),
Record(4, "Prod_2"),
Record(5, "Prod_3"),
Record(6, "Prod_3"),
Record(7, "Prod_2"),
Record(8, "Prod_3"),
Record(9, "Prod_1"),
Record(10, "Prod_1"),
Record(11, "Prod_4")
))
input.registerTempTable("input")
// Step 1: find "nextShiftId" for each record
val withBlockId = sqlContext.sql(
"""
|SELECT FIRST(a.id) AS id, FIRST(a.productId) AS productId, MIN(b.id) AS nextShiftId
|FROM input a
|LEFT JOIN input b ON a.productId != b.productId AND a.id < b.id
|GROUP BY a.id
""".stripMargin)
withBlockId.show()
// prints:
// +---+---------+-----------+
// | id|productId|nextShiftId|
// +---+---------+-----------+
// | 1| Prod_1| 4|
// | 2| Prod_1| 4|
// | 3| Prod_1| 4|
// | 4| Prod_2| 5|
// | 5| Prod_3| 7|
// | 6| Prod_3| 7|
// | 7| Prod_2| 8|
// | 8| Prod_3| 9|
// | 9| Prod_1| 11|
// | 10| Prod_1| 11|
// | 11| Prod_4| null|
// +---+---------+-----------+
// Step 2: group by "productId" and "nextShiftId"
val resultRdd = withBlockId.rdd
.groupBy(r => (r.getAs[String]("productId"), r.getAs[Int]("nextShiftId")))
// sort by nextShiftId to get the order right before adding index
.sortBy {
case ((prodId, 0), v) => Long.MaxValue // to handle the last batch where nextShiftId is null
case ((prodId, nextShiftId), v) => nextShiftId
}
// zip with index (which would be the "unique id") and flatMap to just what we need:
.values
.zipWithIndex()
.flatMap { case (records, index) => records.map(r => (r.getAs[String]("productId"), index+1))}
// transform back into DataFrame:
val result = sqlContext.createDataFrame(resultRdd)
result.show()
// prints:
// +------+---+
// | _1| _2|
// +------+---+
// |Prod_1| 1|
// |Prod_1| 1|
// |Prod_1| 1|
// |Prod_2| 2|
// |Prod_3| 3|
// |Prod_3| 3|
// |Prod_2| 4|
// |Prod_3| 5|
// |Prod_1| 6|
// |Prod_1| 6|
// |Prod_4| 7|
// +------+---+
Zuerst müssen Sie erklären, wie Sie eine Reihenfolge der Zeile definieren. – zero323