2016-12-07 6 views
11

Ich habe ein Datenframe, das eine Zeile und mehrere Spalten hat. Einige der Spalten sind Einzelwerte und andere sind Listen. Alle Listenspalten haben die gleiche Länge. Ich möchte jede Listenspalte in eine separate Zeile aufteilen und dabei jede Nicht-Listenspalte beibehalten.Pyspark: Split mehrere Array-Spalten in Zeilen

Probe DF:

df = sqlc.createDataFrame([Row(a=1, b=[1,2,3],c=[7,8,9], d='foo')]) 
# +---+---------+---------+---+ 
# | a|  b|  c| d| 
# +---+---------+---------+---+ 
# | 1|[1, 2, 3]|[7, 8, 9]|foo| 
# +---+---------+---------+---+ 

Was ich will:

+---+---+----+------+ 
| a| b| c | d | 
+---+---+----+------+ 
| 1| 1| 7 | foo | 
| 1| 2| 8 | foo | 
| 1| 3| 9 | foo | 
+---+---+----+------+ 

Wenn ich nur eine Liste Spalte hatte, dies einfach sein würde, nur um ein explode tun:

df_exploded = df.withColumn('b', explode('b')) 
# >>> df_exploded.show() 
# +---+---+---------+---+ 
# | a| b|  c| d| 
# +---+---+---------+---+ 
# | 1| 1|[7, 8, 9]|foo| 
# | 1| 2|[7, 8, 9]|foo| 
# | 1| 3|[7, 8, 9]|foo| 
# +---+---+---------+---+ 

Wenn ich jedoch auch versuchen, explode die c Spalte, ich am Ende mit einem Dataf Rame mit einer Länge auf den Platz, was ich will:

df_exploded_again = df_exploded.withColumn('c', explode('c')) 
# >>> df_exploded_again.show() 
# +---+---+---+---+ 
# | a| b| c| d| 
# +---+---+---+---+ 
# | 1| 1| 7|foo| 
# | 1| 1| 8|foo| 
# | 1| 1| 9|foo| 
# | 1| 2| 7|foo| 
# | 1| 2| 8|foo| 
# | 1| 2| 9|foo| 
# | 1| 3| 7|foo| 
# | 1| 3| 8|foo| 
# | 1| 3| 9|foo| 
# +---+---+---+---+ 

Was ich will, ist - für jede Spalte, nehmen Sie die n-te Element des Feldes in dieser Spalte und fügen hinzu, dass auf eine neue Zeile. Ich habe versucht, eine quer durch alle Spalten in dem Datenrahmen explodiert Abbildung, aber das scheint nicht zu funktionieren:

df_split = df.rdd.map(lambda col: df.withColumn(col, explode(col))).toDF() 

Antwort

13

Mit DataFrames und UDF:

from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType 
from pyspark.sql.functions import col, udf 

zip_ = udf(
    lambda x, y: list(zip(x, y)), 
    ArrayType(StructType([ 
     # Adjust types to reflect data types 
     StructField("first", IntegerType()), 
     StructField("second", IntegerType()) 
    ])) 
) 

(df 
    .withColumn("tmp", zip_("b", "c")) 
    # UDF output cannot be directly passed to explode 
    .withColumn("tmp", explode("tmp")) 
    .select("a", col("tmp.first").alias("b"), col("tmp.second").alias("c"), "d")) 

Mit RDDs:

(df 
    .rdd 
    .flatMap(lambda row: [(row.a, b, c, row.d) for b, c in zip(row.b, row.c)]) 
    .toDF(["a", "b", "c", "d"])) 

Beide Lösungen sind aufgrund des Python-Kommunikationsaufwands ineffizient. Wenn die Datengröße festgelegt ist, können Sie etwas tun:

from functools import reduce 
from pyspark.sql import DataFrame 

# Length of array 
n = 3 

# For legacy Python you'll need a separate function 
# in place of method accessor 
reduce(
    DataFrame.unionAll, 
    (df.select("a", col("b").getItem(i), col("c").getItem(i), "d") 
     for i in range(n)) 
).toDF("a", "b", "c", "d") 

oder sogar:

from pyspark.sql.functions import array, struct 

# SQL level zip of arrays of known size 
# followed by explode 
tmp = explode(array(*[ 
    struct(col("b").getItem(i).alias("b"), col("c").getItem(i).alias("c")) 
    for i in range(n) 
])) 

(df 
    .withColumn("tmp", tmp) 
    .select("a", col("tmp").getItem("b"), col("tmp").getItem("c"), "d")) 

Diese deutlich schneller im Vergleich zu UDF oder RDD werden soll. Generali eine beliebige Anzahl von Spalten zu unterstützen:

# This uses keyword only arguments 
# If you use legacy Python you'll have to change signature 
# Body of the function can stay the same 
def zip_and_explode(*colnames, n): 
    return explode(array(*[ 
     struct(*[col(c).getItem(i).alias(c) for c in colnames]) 
     for i in range(n) 
    ])) 

df.withColumn("tmp", zip_and_explode("b", "c", n=3)) 
4

Sie bräuchten flatMap verwenden, nicht map wie Sie mehrere Ausgabezeilen aus jeder Eingabezeile vornehmen möchten.

from pyspark.sql import Row 
def dualExplode(r): 
    rowDict = r.asDict() 
    bList = rowDict.pop('b') 
    cList = rowDict.pop('c') 
    for b,c in zip(bList, cList): 
     newDict = dict(rowDict) 
     newDict['b'] = b 
     newDict['c'] = c 
     yield Row(**newDict) 

df_split = sqlContext.createDataFrame(df.rdd.flatMap(dualExplode))