Spark-2.x
Es ist möglich, aber recht teuer. Mit den Daten Sie zur Verfügung gestellt haben:
case class Record(
visitorId: String, trackingIds: Array[String], emailIds: Array[String])
val df = Seq(
Record("a158", Array("666b"), Array("12")),
Record("7g21", Array("c0b5"), Array("45")),
Record("7g21", Array("c0b4"), Array("87")),
Record("a158", Array("666b", "777c"), Array.empty[String])).toDF
und eine Hilfsfunktion:
import org.apache.spark.sql.functions.udf
val flatten = udf((xs: Seq[Seq[String]]) => xs.flatten)
können wir die Rohlinge mit Platzhalter füllen:
import org.apache.spark.sql.functions.{array, lit, when}
val dfWithPlaceholders = df.withColumn(
"emailIds",
when(size($"emailIds") === 0, array(lit(""))).otherwise($"emailIds"))
collect_lists
und flatten
:
import org.apache.spark.sql.functions.{array, collect_listn}
val emailIds = flatten(collect_list($"emailIds")).alias("emailIds")
val trackingIds = flatten(collect_list($"trackingIds")).alias("trackingIds")
df
.groupBy($"visitorId")
.agg(trackingIds, emailIds)
// +---------+------------------+--------+
// |visitorId| trackingIds|emailIds|
// +---------+------------------+--------+
// | a158|[666b, 666b, 777c]| [12, ]|
// | 7g21| [c0b5, c0b4]|[45, 87]|
// +---------+------------------+--------+
Mit getippt statisch Dataset
:
df.as[Record]
.groupByKey(_.visitorId)
.mapGroups { case (key, vs) =>
vs.map(v => (v.trackingIds, v.emailIds)).toArray.unzip match {
case (trackingIds, emailIds) =>
Record(key, trackingIds.flatten, emailIds.flatten)
}}
// +---------+------------------+--------+
// |visitorId| trackingIds|emailIds|
// +---------+------------------+--------+
// | a158|[666b, 666b, 777c]| [12, ]|
// | 7g21| [c0b5, c0b4]|[45, 87]|
// +---------+------------------+--------+
Spark-1.x
Sie nach RDD und Gruppe umwandeln kann
import org.apache.spark.sql.Row
dfWithPlaceholders.rdd
.map {
case Row(id: String,
trcks: Seq[String @ unchecked],
emails: Seq[String @ unchecked]) => (id, (trcks, emails))
}
.groupByKey
.map {case (key, vs) => vs.toArray.unzip match {
case (trackingIds, emailIds) =>
Record(key, trackingIds.flatten, emailIds.flatten)
}}
.toDF
// +---------+------------------+--------+
// |visitorId| trackingIds|emailIds|
// +---------+------------------+--------+
// | 7g21| [c0b5, c0b4]|[45, 87]|
// | a158|[666b, 666b, 777c]| [12, ]|
// +---------+------------------+--------+
, was diese Methode nicht abflachen genau tun? – xXxpRoGrAmmErxXx
Was, wenn wir Dubletten in 'trackingIds' entfernen müssen? – puru