2015-07-26 14 views
19

Ich habe eine Tabelle mit zwei String-Typ Spalten (Benutzername, Freund) und für jeden Benutzernamen möchte ich alle seine Freunde in einer Zeile, verkettet als Strings ('username1', "Freunde1, Freunde2, Freunde3"). Ich weiß, MySql tut dies durch GROUP_CONCAT, gibt es eine Möglichkeit, dies mit SPARK SQL zu tun?SPARK SQL-Ersatz für mysql GROUP_CONCAT Aggregatfunktion

Dank

Antwort

32

Bevor Sie fortfahren: Diese Operationen ist noch eine weitere andere groupByKey. Während es mehrere legitime Anwendungen hat, ist es relativ teuer, also verwenden Sie es nur bei Bedarf.


Nicht gerade kurz oder effiziente Lösung, aber Sie können UserDefinedAggregateFunction in Spark-1.5.0 eingeführt verwenden:

object GroupConcat extends UserDefinedAggregateFunction { 
    def inputSchema = new StructType().add("x", StringType) 
    def bufferSchema = new StructType().add("buff", ArrayType(StringType)) 
    def dataType = StringType 
    def deterministic = true 

    def initialize(buffer: MutableAggregationBuffer) = { 
     buffer.update(0, ArrayBuffer.empty[String]) 
    } 

    def update(buffer: MutableAggregationBuffer, input: Row) = { 
     if (!input.isNullAt(0)) 
     buffer.update(0, buffer.getSeq[String](0) :+ input.getString(0)) 
    } 

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = { 
     buffer1.update(0, buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0)) 
    } 

    def evaluate(buffer: Row) = UTF8String.fromString(
     buffer.getSeq[String](0).mkString(",")) 
} 

Beispiel Nutzung:

val df = sc.parallelize(Seq(
    ("username1", "friend1"), 
    ("username1", "friend2"), 
    ("username2", "friend1"), 
    ("username2", "friend3") 
)).toDF("username", "friend") 

df.groupBy($"username").agg(GroupConcat($"friend")).show 

## +---------+---------------+ 
## | username|  friends| 
## +---------+---------------+ 
## |username1|friend1,friend2| 
## |username2|friend1,friend3| 
## +---------+---------------+ 

Sie können auch einen Python-Wrapper erstellen, wie gezeigt in Spark: How to map Python with Scala or Java User Defined Functions?

In der Praxis kann es sein Fas um RDD, groupByKey, mkString zu extrahieren und DataFrame neu zu erstellen.

Sie können einen ähnlichen Effekt durch die Kombination von collect_list Funktion (Spark> = 1.6.0) mit concat_ws:

import org.apache.spark.sql.functions.{collect_list, udf, lit} 

df.groupBy($"username") 
    .agg(concat_ws(",", collect_list($"friend")).alias("friends")) 
+0

What If Ich möchte es in SQL verwenden Wie kann ich diese UDF in Spark SQL registrieren? –

+0

@MurtazaKanchwala [Es gibt eine 'register' Methode, die UDAFS akzeptiert] (https://github.com/apache/spark/blob/37c617e4f580482b59e1abbe3c0c27c7125cf605/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration .scala # L63-L69), so sollte es als Standard-UDF funktionieren. – zero323

+0

@ Zero323 jeder Ansatz, um das selbe in Spark sql 1.4.1 –

2

Eine Möglichkeit, es mit pyspark zu tun < 1.6, die leider nicht vom Benutzer nicht unterstützt definiert Aggregatfunktion:

byUsername = df.rdd.reduceByKey(lambda x, y: x + ", " + y) 

und wenn Sie wollen, dass es einen Datenrahmen machen wieder:

sqlContext.createDataFrame(byUsername, ["username", "friends"]) 

Ab 1.6 können Sie collect_list verwenden und dann die erstellte Liste ein:

from pyspark.sql import functions as F 
from pyspark.sql.types import StringType 
join_ = F.udf(lambda x: ", ".join(x), StringType()) 
df.groupBy("username").agg(join_(F.collect_list("friend").alias("friends")) 
10

können Sie versuchen, die collect_list Funktion

sqlContext.sql("select A, collect_list(B), collect_list(C) from Table1 group by A 

Oder Sie können ein UDF so etwas wie

sqlContext.udf.register("myzip",(a:Long,b:Long)=>(a+","+b)) 
regieter

und Sie können diese Funktion in der Abfrage

verwenden
+1

Ich habe das versucht, aber es funktioniert nur mit HiveContext –

2

Sprache: Scala Spark-Version: 1.5.2

hatte ich das gleiche Problem und habe auch versucht, es zu lösen udfs verwenden, aber leider dies zu mehr Problemen geführt hat später im Code aufgrund von Typinkonsistenzen.Ich war in der Lage, indem man zuerst um diesen den Weg zur Arbeit von die DF zu einer RDD dann Gruppierung Umwandlung und die Daten in der gewünschten Art und Weise zu manipulieren und dann zu einem DF die RDD zurück Umwandlung wie folge:

val df = sc 
    .parallelize(Seq(
     ("username1", "friend1"), 
     ("username1", "friend2"), 
     ("username2", "friend1"), 
     ("username2", "friend3"))) 
    .toDF("username", "friend") 

+---------+-------+ 
| username| friend| 
+---------+-------+ 
|username1|friend1| 
|username1|friend2| 
|username2|friend1| 
|username2|friend3| 
+---------+-------+ 

val dfGRPD = df.map(Row => (Row(0), Row(1))) 
    .groupByKey() 
    .map{ case(username:String, groupOfFriends:Iterable[String]) => (username, groupOfFriends.mkString(","))} 
    .toDF("username", "groupOfFriends") 

+---------+---------------+ 
| username| groupOfFriends| 
+---------+---------------+ 
|username1|friend2,friend1| 
|username2|friend3,friend1| 
+---------+---------------+