2016-11-15 2 views
1

Ich bin neu in Python und PySpark. Ich habe einen Datenrahmen in PySpark wie die folgenden:Erstellen Sie eine Spalte in einem PySpark-Datenframe mit einer Liste, deren Indizes in einer Spalte des Datenrahmens vorhanden sind

## +---+---+------+ 
## | x1| x2| x3 | 
## +---+---+------+ 
## | 0| a | 13.0| 
## | 2| B | -33.0| 
## | 1| B | -63.0| 
## +---+---+------+ 

Ich habe ein Array: arr = [10, 12, 13]

Ich möchte eine Spalte x4 in dem Datenrahmen erstellen, so dass es sollte habe die entsprechenden Werte aus der Liste basierend auf den Werten von x1 als Indizes. Der letzte Dataset sollte wie folgt aussehen:

## +---+---+------+-----+ 
## | x1| x2| x3 | x4 | 
## +---+---+------+-----+ 
## | 0| a | 13.0| 10 | 
## | 2| B | -33.0| 13 | 
## | 1| B | -63.0| 12 | 
## +---+---+------+-----+ 

Ich habe versucht, den folgenden Code verwenden, um zu erreichen:

df.withColumn("x4", lit(arr[col('x1')])).show()

aber ich erhalte eine Fehlermeldung:

IndexError: only integers, slices (`:`), ellipsis (`...`), numpy.newaxis (`None`) and integer or boolean arrays are valid indices 

Ist Kann ich das irgendwie effizient erreichen?

Antwort

0

Während Sie eine Verbindung zwischen den Indizes Ihres Arrays und Ihrem ursprünglichen DataFrame herstellen, wäre ein Ansatz, Ihr Array in einen DataFrame zu konvertieren, den rownumber()-1 (der zu Ihren Indizes wird) zu generieren und dann die beiden DataFrames zusammenzufügen .

from pyspark.sql import Row 

# Create original DataFrame `df` 
df = sqlContext.createDataFrame(
    [(0, "a", 13.0), (2, "B", -33.0), (1, "B", -63.0)], ("x1", "x2", "x3")) 
df.createOrReplaceTempView("df") 

# Create column "x4" 
row = Row("x4") 

# Take the array 
arr = [10, 12, 13] 

# Convert Array to RDD, and then create DataFrame 
rdd = sc.parallelize(arr) 
df2 = rdd.map(row).toDF() 
df2.createOrReplaceTempView("df2") 

# Create indices via row number 
df3 = spark.sql("SELECT (row_number() OVER (ORDER by x4))-1 as indices, * FROM df2") 
df3.createOrReplaceTempView("df3") 

Jetzt haben Sie die beiden Datenrahmen: df und df3, können Sie die SQL-Abfrage unter laufen, um die beiden Datenrahmen zu verbinden.

select a.x1, a.x2, a.x3, b.x4 from df a join df3 b on b.indices = a.x1 

Hinweis, hier ist auch eine gute Referenzantwort auf die adding columns to DataFrames.

Verwandte Themen