2016-05-19 22 views
9

Ich versuche, zwei PySpark Datenrahmen mit einigen Spalten zu verketten, die nur auf jeden von ihnen sind:Concatenate zwei PySpark Datenrahmen

from pyspark.sql.functions import randn, rand 

df_1 = sqlContext.range(0, 10) 

+--+ 
|id| 
+--+ 
| 0| 
| 1| 
| 2| 
| 3| 
| 4| 
| 5| 
| 6| 
| 7| 
| 8| 
| 9| 
+--+ 

df_2 = sqlContext.range(11, 20) 

+--+ 
|id| 
+--+ 
| 10| 
| 11| 
| 12| 
| 13| 
| 14| 
| 15| 
| 16| 
| 17| 
| 18| 
| 19| 
+--+ 

df_1 = df_1.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal")) 
df_2 = df_2.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal_2")) 

und jetzt will ich eine dritte Datenrahmen erzeugen. Ich möchte so etwas wie Pandas concat:

df_1.show() 
+---+--------------------+--------------------+ 
| id|    uniform|    normal| 
+---+--------------------+--------------------+ 
| 0| 0.8122802274304282| 1.2423430583597714| 
| 1| 0.8642043127063618| 0.3900018344856156| 
| 2| 0.8292577771850476| 1.8077401259195247| 
| 3| 0.198558705368724| -0.4270585782850261| 
| 4|0.012661361966674889| 0.702634599720141| 
| 5| 0.8535692890157796|-0.42355804115129153| 
| 6| 0.3723296190171911| 1.3789648582622995| 
| 7| 0.9529794127670571| 0.16238718777444605| 
| 8| 0.9746632635918108| 0.02448061333761742| 
| 9| 0.513622008243935| 0.7626741803250845| 
+---+--------------------+--------------------+ 

df_2.show() 
+---+--------------------+--------------------+ 
| id|    uniform|   normal_2| 
+---+--------------------+--------------------+ 
| 11| 0.3221262660507942| 1.0269298899109824| 
| 12| 0.4030672316912547| 1.285648175568798| 
| 13| 0.9690555459609131|-0.22986601831364423| 
| 14|0.011913836266515876| -0.678915153834693| 
| 15| 0.9359607054250594|-0.16557488664743034| 
| 16| 0.45680471157575453| -0.3885563551710555| 
| 17| 0.6411908952297819| 0.9161177183227823| 
| 18| 0.5669232696934479| 0.7270125277020573| 
| 19| 0.513622008243935| 0.7626741803250845| 
+---+--------------------+--------------------+ 

#do some concatenation here, how? 

df_concat.show() 

| id|    uniform|    normal| normal_2 | 
+---+--------------------+--------------------+------------+ 
| 0| 0.8122802274304282| 1.2423430583597714| None  | 
| 1| 0.8642043127063618| 0.3900018344856156| None  | 
| 2| 0.8292577771850476| 1.8077401259195247| None  | 
| 3| 0.198558705368724| -0.4270585782850261| None  | 
| 4|0.012661361966674889| 0.702634599720141| None  | 
| 5| 0.8535692890157796|-0.42355804115129153| None  | 
| 6| 0.3723296190171911| 1.3789648582622995| None  | 
| 7| 0.9529794127670571| 0.16238718777444605| None  | 
| 8| 0.9746632635918108| 0.02448061333761742| None  | 
| 9| 0.513622008243935| 0.7626741803250845| None  | 
| 11| 0.3221262660507942| None    | 0.123  | 
| 12| 0.4030672316912547| None    |0.12323  | 
| 13| 0.9690555459609131| None    |0.123  | 
| 14|0.011913836266515876| None    |0.18923  | 
| 15| 0.9359607054250594| None    |0.99123  | 
| 16| 0.45680471157575453| None    |0.123  | 
| 17| 0.6411908952297819| None    |1.123  | 
| 18| 0.5669232696934479| None    |0.10023  | 
| 19| 0.513622008243935| None    |0.916332123 | 
+---+--------------------+--------------------+------------+ 

Ist das möglich?

Antwort

10
df_concat = df_1.union(df_2) 

Die Datenrahmen können identische Spalten haben müssen, in welchem ​​Fall Sie withColumn() verwenden können normal_1 zu erstellen und normal_2

+0

Dank haben. Das Problem ist, wie ich oben sagte, dass die Spalten zwischen den beiden Datenrahmen nicht identisch sind. – Ivan

15

Vielleicht können Sie versuchen, die unexisting Erstellen von Spalten und union (unionAll für Spark 1.6 oder niedriger Aufruf):

cols = ['id', 'uniform', 'normal', 'normal_2']  

df_1_new = df_1.withColumn("normal_2", lit(None)).select(cols) 
df_2_new = df_2.withColumn("normal", lit(None)).select(cols) 

result = df_1_new.union(df_2_new) 
+1

'unionAll()' ist in Spark 2.0 veraltet. Verwenden Sie stattdessen 'union()' anstelle von –

+0

, die Sie mit 'withColumnRenamed' https://spark.apache.org/docs/latest/api/python/pypark.sql.html?highlight=selectexpr#pypark.sql.DataFrame.withColumnRenamed umbenennen können – Hunle

3

Hier ist eine Möglichkeit, es zu tun, falls es noch nützlich: ich laufe dies in pyspark Shell, Python-Version 2.7.12 und meine Funken installieren war Version 2.0.1.

PS: Ich denke, Sie wollten verschiedene Samen für die df_1 df_2 verwenden und der folgende Code spiegelt das wider.

from pyspark.sql.types import FloatType 
from pyspark.sql.functions import randn, rand 
import pyspark.sql.functions as F 

df_1 = sqlContext.range(0, 10) 
df_2 = sqlContext.range(11, 20) 
df_1 = df_1.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal")) 
df_2 = df_2.select("id", rand(seed=11).alias("uniform"), randn(seed=28).alias("normal_2")) 

def get_uniform(df1_uniform, df2_uniform): 
    if df1_uniform: 
     return df1_uniform 
    if df2_uniform: 
     return df2_uniform 

u_get_uniform = F.udf(get_uniform, FloatType()) 

df_3 = df_1.join(df_2, on = "id", how = 'outer').select("id", u_get_uniform(df_1["uniform"], df_2["uniform"]).alias("uniform"), "normal", "normal_2").orderBy(F.col("id")) 

Hier sind die Ausgänge I erhalten:

df_1.show() 
+---+-------------------+--------------------+ 
| id|   uniform|    normal| 
+---+-------------------+--------------------+ 
| 0|0.41371264720975787| 0.5888539012978773| 
| 1| 0.7311719281896606| 0.8645537008427937| 
| 2| 0.1982919638208397| 0.06157382353970104| 
| 3|0.12714181165849525| 0.3623040918178586| 
| 4| 0.7604318153406678|-0.49575204523675975| 
| 5|0.12030715258495939| 1.0854146699817222| 
| 6|0.12131363910425985| -0.5284523629183004| 
| 7|0.44292918521277047| -0.4798519469521663| 
| 8| 0.8898784253886249| -0.8820294772950535| 
| 9|0.03650707717266999| -2.1591956435415334| 
+---+-------------------+--------------------+ 

df_2.show() 
+---+-------------------+--------------------+ 
| id|   uniform|   normal_2| 
+---+-------------------+--------------------+ 
| 11| 0.1982919638208397| 0.06157382353970104| 
| 12|0.12714181165849525| 0.3623040918178586| 
| 13|0.12030715258495939| 1.0854146699817222| 
| 14|0.12131363910425985| -0.5284523629183004| 
| 15|0.44292918521277047| -0.4798519469521663| 
| 16| 0.8898784253886249| -0.8820294772950535| 
| 17| 0.2731073068483362|-0.15116027592854422| 
| 18| 0.7784518091224375| -0.3785563841011868| 
| 19|0.43776394586845413| 0.47700719174464357| 
+---+-------------------+--------------------+ 

df_3.show() 
+---+-----------+--------------------+--------------------+      
| id| uniform|    normal|   normal_2| 
+---+-----------+--------------------+--------------------+ 
| 0| 0.41371265| 0.5888539012978773|    null| 
| 1| 0.7311719| 0.8645537008427937|    null| 
| 2| 0.19829196| 0.06157382353970104|    null| 
| 3| 0.12714182| 0.3623040918178586|    null| 
| 4| 0.7604318|-0.49575204523675975|    null| 
| 5|0.120307155| 1.0854146699817222|    null| 
| 6| 0.12131364| -0.5284523629183004|    null| 
| 7| 0.44292918| -0.4798519469521663|    null| 
| 8| 0.88987845| -0.8820294772950535|    null| 
| 9|0.036507078| -2.1591956435415334|    null| 
| 11| 0.19829196|    null| 0.06157382353970104| 
| 12| 0.12714182|    null| 0.3623040918178586| 
| 13|0.120307155|    null| 1.0854146699817222| 
| 14| 0.12131364|    null| -0.5284523629183004| 
| 15| 0.44292918|    null| -0.4798519469521663| 
| 16| 0.88987845|    null| -0.8820294772950535| 
| 17| 0.27310732|    null|-0.15116027592854422| 
| 18| 0.7784518|    null| -0.3785563841011868| 
| 19| 0.43776396|    null| 0.47700719174464357| 
+---+-----------+--------------------+--------------------+ 
0

Dies sollte es für Sie tun ...

from pyspark.sql.types import FloatType 
from pyspark.sql.functions import randn, rand, lit, coalesce, col 
import pyspark.sql.functions as F 

df_1 = sqlContext.range(0, 6) 
df_2 = sqlContext.range(3, 10) 
df_1 = df_1.select("id", lit("old").alias("source")) 
df_2 = df_2.select("id") 

df_1.show() 
df_2.show() 
df_3 = df_1.alias("df_1").join(df_2.alias("df_2"), df_1.id == df_2.id, "outer")\ 
    .select(\ 
    [coalesce(df_1.id, df_2.id).alias("id")] +\ 
    [col("df_1." + c) for c in df_1.columns if c != "id"])\ 
    .sort("id") 
df_3.show() 
0

Above Antworten sind sehr elegant. Ich habe diese Funktion lange zurück geschrieben, wo ich auch kämpfte, um zwei Datenrahmen mit verschiedenen Spalten zu verketten.

Angenommen, Sie Datenrahmen sdf1 und sdf2

from pyspark.sql import functions as F 
from pyspark.sql.types import * 

def unequal_union_sdf(sdf1, sdf2): 
    s_df1_schema = set((x.name, x.dataType) for x in sdf1.schema) 
    s_df2_schema = set((x.name, x.dataType) for x in sdf2.schema) 

    for i,j in s_df2_schema.difference(s_df1_schema): 
     sdf1 = sdf1.withColumn(i,F.lit(None).cast(j)) 

    for i,j in s_df1_schema.difference(s_df2_schema): 
     sdf2 = sdf2.withColumn(i,F.lit(None).cast(j)) 

    common_schema_colnames = sdf1.columns 
    sdk = \ 
     sdf1.select(common_schema_colnames).union(sdf2.select(common_schema_colnames)) 
    return sdk 

sdf_concat = unequal_union_sdf(sdf1, sdf2) 
Verwandte Themen