2015-07-16 8 views

Antwort

69

Mit rohen SQL können Sie CONCAT verwenden:

  • In Python

    df = sqlContext.createDataFrame([("foo", 1), ("bar", 2)], ("k", "v")) 
    df.registerTempTable("df") 
    sqlContext.sql("SELECT CONCAT(k, ' ', v) FROM df") 
    
  • In Scala

    import sqlContext.implicits._ 
    
    val df = sc.parallelize(Seq(("foo", 1), ("bar", 2))).toDF("k", "v") 
    df.registerTempTable("df") 
    sqlContext.sql("SELECT CONCAT(k, ' ', v) FROM df") 
    

Da Spark-1.5.0 können Sie concat verwenden Funktion Witz h Datenrahmen API:

  • In Python:

    from pyspark.sql.functions import concat, col, lit 
    
    df.select(concat(col("k"), lit(" "), col("v"))) 
    
  • In Scala:

    import org.apache.spark.sql.functions.{concat, lit} 
    
    df.select(concat($"k", lit(" "), $"v")) 
    

Es gibt auch concat_ws Funktion, die einen String-Separator als erstes Argument annimmt.

+0

Was ist, wenn der Datenrahmen Nullwert hat? wie folgt df = sqlContext.createDataFrame ([("foo", 1), ("bar", 2), ("check", null)], ("k", "v")) –

+1

@TarunKumar Meinst du? etwas wie [das] (http://stackoverflow.com/a/33152113/1560062)? – zero323

+0

das ist was ich wollte. danke –

14

Wenn Sie DF verwenden möchten, können Sie mithilfe von udf eine neue Spalte basierend auf vorhandenen Spalten hinzufügen.

val sqlContext = new SQLContext(sc) 
case class MyDf(col1: String, col2: String) 

//here is our dataframe 
val df = sqlContext.createDataFrame(sc.parallelize(
    Array(MyDf("A", "B"), MyDf("C", "D"), MyDf("E", "F")) 
)) 

//Define a udf to concatenate two passed in string values 
val getConcatenated = udf((first: String, second: String) => { first + " " + second }) 

//use withColumn method to add a new column called newColName 
df.withColumn("newColName", getConcatenated($"col1", $"col2")).select("newColName", "col1", "col2").show() 
+0

Gibt es eine Möglichkeit, um dynamisch die Spalten von einem Eingang String zu verketten? – ashK

+0

Dies ist nicht optimal, im Vergleich zu DataFrame.concat_ws, da Funken nicht udfs Optimierung sehr gut auf allen /. Natürlich, in dem Moment, in dem Sie benutzerdefinierte Logik in Ihrer Verkettung benötigen, können Sie das UDF nicht vermeiden. –

4

Hier ist eine weitere Möglichkeit, dies für pyspark tun:

#import concat and lit functions from pyspark.sql.functions 
from pyspark.sql.functions import concat, lit 

#Create your data frame 
countryDF = sqlContext.createDataFrame([('Ethiopia',), ('Kenya',), ('Uganda',), ('Rwanda',)], ['East Africa']) 

#Use select, concat, and lit functions to do the concatenation 
personDF = countryDF.select(concat(countryDF['East Africa'], lit('n')).alias('East African')) 

#Show the new data frame 
personDF.show() 

----------RESULT------------------------- 

84 
+------------+ 
|East African| 
+------------+ 
| Ethiopian| 
|  Kenyan| 
|  Ugandan| 
|  Rwandan| 
+------------+ 
0

Ein anderer Weg, um es in pySpark zu tun SqlContext ...

#Suppose we have a dataframe: 
df = sqlContext.createDataFrame([('row1_1','row1_2')], ['colname1', 'colname2']) 

# Now we can concatenate columns and assign the new column a name 
df = df.select(concat(df.colname1, df.colname2).alias('joined_colname')) 
13

hier mit, wie Sie tun können benutzerdefinierte Benennung

import pyspark 
from pyspark.sql import functions as sf 
sc = pyspark.SparkContext() 
sqlc = pyspark.SQLContext(sc) 
df = sqlc.createDataFrame([('row11','row12'), ('row21','row22')], ['colname1', 'colname2']) 
df.show() 

gibt,

+--------+--------+ 
|colname1|colname2| 
+--------+--------+ 
| row11| row12| 
| row21| row22| 
+--------+--------+ 

neue Spalte erstellen, indem Sie verketten:

df = df.withColumn('joined_column', 
        sf.concat(sf.col('colname1'),sf.lit('_'), sf.col('colname2'))) 
df.show() 

+--------+--------+-------------+ 
|colname1|colname2|joined_column| 
+--------+--------+-------------+ 
| row11| row12| row11_row12| 
| row21| row22| row21_row22| 
+--------+--------+-------------+ 
+0

Warum nennst du 'sf.lit ('_')' und nicht nur ' '_''? –

+2

'lit' erzeugt eine Spalte von' _' – muon

3

Hier ein Vorschlag für ist, wenn Sie nicht wissen, die Nummer oder den Namen der Spalten in der Datenrahmen.

val dfResults = dfSource.select(concat_ws(",",dfSource.columns.map(c => col(c)): _*)) 
Verwandte Themen