53

Ich habe einen Spark DataFrame (mit PySpark 1.5.1) und möchte eine neue Spalte hinzufügen.Wie füge ich einem Spark DataFrame (mit PySpark) eine neue Spalte hinzu?

Ich habe versucht, die folgenden ohne Erfolg:

type(randomed_hours) # => list 

# Create in Python and transform to RDD 

new_col = pd.DataFrame(randomed_hours, columns=['new_col']) 

spark_new_col = sqlContext.createDataFrame(new_col) 

my_df_spark.withColumn("hours", spark_new_col["new_col"]) 

bekam auch einen Fehler mit dieser:

my_df_spark.withColumn("hours", sc.parallelize(randomed_hours)) 

Wie füge ich eine neue Spalte (basierend auf Python Vektor) zu ein bestehender DataFrame mit PySpark?

Antwort

101

Sie können keine beliebige Spalte zu einem DataFrame in Spark hinzufügen.

from pyspark.sql.functions import exp 

df_with_x5 = df_with_x4.withColumn("x5", exp("x3")) 
df_with_x5.show() 

## +---+---+-----+---+--------------------+ 
## | x1| x2| x3| x4|     x5| 
## +---+---+-----+---+--------------------+ 
## | 1| a| 23.0| 0| 9.744803446248903E9| 
## | 3| B|-23.0| 0|1.026187963170189...| 
## +---+---+-----+---+--------------------+ 

enthalten mit join:

from pyspark.sql.functions import exp 

lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v")) 
df_with_x6 = (df_with_x5 
    .join(lookup, col("x1") == col("k"), "leftouter") 
    .drop("k") 
    .withColumnRenamed("v", "x6")) 

## +---+---+-----+---+--------------------+----+ 
## | x1| x2| x3| x4|     x5| x6| 
## +---+---+-----+---+--------------------+----+ 
## | 1| a| 23.0| 0| 9.744803446248903E9| foo| 
## | 3| B|-23.0| 0|1.026187963170189...|null| 
## +---+---+-----+---+--------------------+----+ 

oder erzeugt neue Spalten können nur von Literalen mit (anderen wörtlichen Typen werden beschrieben in How to add a constant column in a Spark DataFrame?)

from pyspark.sql.functions import lit 

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3")) 

df_with_x4 = df.withColumn("x4", lit(0)) 
df_with_x4.show() 

## +---+---+-----+---+ 
## | x1| x2| x3| x4| 
## +---+---+-----+---+ 
## | 1| a| 23.0| 0| 
## | 3| B|-23.0| 0| 
## +---+---+-----+---+ 

Umwandlung eines bestehenden Spalte erstellt werden mit Funktion/udf:

from pyspark.sql.functions import rand 

df_with_x7 = df_with_x6.withColumn("x7", rand()) 
df_with_x7.show() 

## +---+---+-----+---+--------------------+----+-------------------+ 
## | x1| x2| x3| x4|     x5| x6|     x7| 
## +---+---+-----+---+--------------------+----+-------------------+ 
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|0.41930610446846617| 
## | 3| B|-23.0| 0|1.026187963170189...|null|0.37801881545497873| 
## +---+---+-----+---+--------------------+----+-------------------+ 

Leistungsmäßig integrierte Funktionen (pyspark.sql.functions), die Catalyst-Ausdruck zugeordnet werden, werden normalerweise gegenüber benutzerdefinierten Python-Funktionen bevorzugt.

Wenn Sie Inhalte einer beliebigen RDD als Spalte hinzufügen möchten, können Sie

+0

"Neue Spalten können nur mithilfe von Literalen erstellt werden" Was genau bedeuten Literale in diesem Kontext? – timbram

35

Um eine Spalte mit einem UDF hinzufügen:

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3")) 

from pyspark.sql.functions import udf 
from pyspark.sql.types import * 

def valueToCategory(value): 
    if value == 1: return 'cat1' 
    elif value == 2: return 'cat2' 
    ... 
    else: return 'n/a' 

# NOTE: it seems that calls to udf() must be after SparkContext() is called 
udfValueToCategory = udf(valueToCategory, StringType()) 
df_with_cat = df.withColumn("category", udfValueToCategory("x1")) 
df_with_cat.show() 

## +---+---+-----+---------+ 
## | x1| x2| x3| category| 
## +---+---+-----+---------+ 
## | 1| a| 23.0|  cat1| 
## | 3| B|-23.0|  n/a| 
## +---+---+-----+---------+ 
13

Für Spark 2.0

# assumes schema has 'age' column 
df.select('*', (df.age + 10).alias('agePlusTen')) 
+1

Muss df.select sein ('*', (df.age + 10) .alias ('agePlusTen')) –

+0

Danke, und wenn Sie 'df = df.select ('*', (df.age + 10) .alias ('agePlusTen')) 'du fügst effektiv eine arbitrary column_ als @ zero323 warnte uns oben war unmöglich, es sei denn, es ist etwas falsch mit dies in Spark zu tun, in Pandas ist es der Standard Weg .. – cardamom

+0

Gibt es a Version davon für pySpark? – Tagar

-1

Sie können eine neues udf definieren, wenn ein column_name Zugabe:

u_f = F.udf(lambda :yourstring,StringType()) 
a.select(u_f().alias('column_name') 
-1
from pyspark.sql.functions import udf 
from pyspark.sql.types import * 
func_name = udf(
    lambda val: val, # do sth to val 
    StringType() 
) 
df.withColumn('new_col', func_name(df.old_col)) 
+0

Sie müssen 'StringType()' aufrufen. – gberger

0

Ich mag ein allgemeines Beispiel für einen sehr ähnlichen Anwendungsfall bieten:

Anwendungsfall: Ich habe eine CSV bestehend aus:

First|Third|Fifth 
data|data|data 
data|data|data 
...billion more lines 

Ich brauche einige Umwandlungen durchführen und die endgültige csv muss aussehen

First|Second|Third|Fourth|Fifth 
data|null|data|null|data 
data|null|data|null|data 
...billion more lines 

Ich muss dies tun, weil dies das Schema durch ein Modell, und ich brauche für meine endgültige Daten definiert ist zu sein interoperabel mit SQL Bulk Inserts und solchen Dingen.

so:

1) las ich die ursprüngliche csv spark.read verwenden und es "df" nennen.

2) Ich mache etwas mit den Daten.

3) ich die Null-Spalten mit diesem Skript hinzufügen:

outcols = [] 
for column in MY_COLUMN_LIST: 
    if column in df.columns: 
     outcols.append(column) 
    else: 
     outcols.append(lit(None).cast(StringType()).alias('{0}'.format(column))) 

df = df.select(outcols) 

Auf diese Weise Sie Ihr Schema nach dem Laden einer CSV-Struktur kann (würde auch für Spalten Neuordnungs arbeiten, wenn Sie dies für viele zu tun haben, Tabellen).

Verwandte Themen