2015-09-23 8 views
12

Ich habe eine RDD mit einem Tupel von Werten (String, sparsevector) und ich möchte ein Dataframe mit dem RDD erstellen. Um ein (label: string, features: vector) zu erhalten, ist DataFrame das Schema, das von den meisten Bibliotheken des ml-Algorithmus benötigt wird. Ich weiß, dass es getan werden kann, weil HashingTF ml Bibliothek einen Vektor ausgibt, wenn eine Feature-Spalte eines DataFrame gegeben wird.Wie konvertiere ich eine RDD mit einer sparsevector Spalte zu einem Datenrahmen mit einer Spalte als Vektor

temp_df = sqlContext.createDataFrame(temp_rdd, StructType([ 
     StructField("label", DoubleType(), False), 
     StructField("tokens", ArrayType(StringType()), False) 
    ])) 

#assumming there is an RDD (double,array(strings)) 

hashingTF = HashingTF(numFeatures=COMBINATIONS, inputCol="tokens", outputCol="features") 

ndf = hashingTF.transform(temp_df) 
ndf.printSchema() 

#outputs 
#root 
#|-- label: double (nullable = false) 
#|-- tokens: array (nullable = false) 
#| |-- element: string (containsNull = true) 
#|-- features: vector (nullable = true) 

Also meine Frage ist, kann ich irgendwie ein RDD von (String, sparsevector) mit wandelt es in einen Dataframe von (String, Vektor). Ich versuchte es mit dem üblichen sqlContext.createDataFrame, aber es gibt keine DataType, die die Bedürfnisse ich habe.

df = sqlContext.createDataFrame(rdd,StructType([ 
     StructField("label" , StringType(),True), 
     StructField("features" , ?Type(),True) 
    ])) 

Antwort

17

Sie haben VectorUDT hier verwenden:

# In Spark 1.x 
# from pyspark.mllib.linalg import SparseVector, VectorUDT 
from pyspark.ml.linalg import SparseVector, VectorUDT 

temp_rdd = sc.parallelize([ 
    (0.0, SparseVector(4, {1: 1.0, 3: 5.5})), 
    (1.0, SparseVector(4, {0: -1.0, 2: 0.5}))]) 

schema = StructType([ 
    StructField("label", DoubleType(), True), 
    StructField("features", VectorUDT(), True) 
]) 

temp_rdd.toDF(schema).printSchema() 

## root 
## |-- label: double (nullable = true) 
## |-- features: vector (nullable = true) 

Nur der Vollständigkeit halber Scala-Äquivalent:

import org.apache.spark.sql.Row 
import org.apache.spark.rdd.RDD 
import org.apache.spark.sql.types.{DoubleType, StructType} 
// In Spark 1x. 
// import org.apache.spark.mllib.linalg.{Vectors, VectorUDT} 
import org.apache.spark.ml.linalg.Vectors 
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType 

val schema = new StructType() 
    .add("label", DoubleType) 
    // In Spark 1.x 
    //.add("features", new VectorUDT()) 
    .add("features",VectorType) 

val temp_rdd: RDD[Row] = sc.parallelize(Seq(
    Row(0.0, Vectors.sparse(4, Seq((1, 1.0), (3, 5.5)))), 
    Row(1.0, Vectors.sparse(4, Seq((0, -1.0), (2, 0.5)))) 
)) 

spark.createDataFrame(temp_rdd, schema).printSchema 

// root 
// |-- label: double (nullable = true) 
// |-- features: vector (nullable = true) 
+2

Wow, ich habe das im Laufe der Jahre gesucht! fast weinen vor Glück:,) +1 –

+1

Das hat funktioniert! vielen Dank! kannst du mir sagen wo in der Dokumentation das ist? kann keine VectorUDT auf linalg Apache Funken Docs –

+0

@OrangelMarquez vielleicht eine Pull-Anfrage erforderlich ist –

4

Während @ zero323 antworten https://stackoverflow.com/a/32745924/1333621 Sinn macht, und ich wünsche es für mich gearbeitet - die dem Dataframe zugrunde liegende rdd, sqlContext.createDataFrame (temp_rdd, schema), die noch enthaltenen SparseVectors-Typen hatte ich folgendes zu tun, um DenseVector Typen zu konvertieren - wenn jemand eine kürzere/bessere Möglichkeit hat ich will

temp_rdd = sc.parallelize([ 
    (0.0, SparseVector(4, {1: 1.0, 3: 5.5})), 
    (1.0, SparseVector(4, {0: -1.0, 2: 0.5}))]) 

schema = StructType([ 
    StructField("label", DoubleType(), True), 
    StructField("features", VectorUDT(), True) 
]) 

temp_rdd.toDF(schema).printSchema() 
df_w_ftr = temp_rdd.toDF(schema) 

print 'original convertion method: ',df_w_ftr.take(5) 
print('\n') 
temp_rdd_dense = temp_rdd.map(lambda x: Row(label=x[0],features=DenseVector(x[1].toArray()))) 
print type(temp_rdd_dense), type(temp_rdd) 
print 'using map and toArray:', temp_rdd_dense.take(5) 

temp_rdd_dense.toDF().show() 

root 
|-- label: double (nullable = true) 
|-- features: vector (nullable = true) 

original convertion method: [Row(label=0.0, features=SparseVector(4, {1: 1.0, 3: 5.5})), Row(label=1.0, features=SparseVector(4, {0: -1.0, 2: 0.5}))] 


<class 'pyspark.rdd.PipelinedRDD'> <class 'pyspark.rdd.RDD'> 
using map and toArray: [Row(features=DenseVector([0.0, 1.0, 0.0, 5.5]), label=0.0), Row(features=DenseVector([-1.0, 0.0, 0.5, 0.0]), label=1.0)] 

+------------------+-----+ 
|   features|label| 
+------------------+-----+ 
| [0.0,1.0,0.0,5.5]| 0.0| 
|[-1.0,0.0,0.5,0.0]| 1.0| 
+------------------+-----+ 
1

dies ist ein Beispiel in scala wissen für die Funken 2,1

import org.apache.spark.ml.linalg.Vector 

def featuresRDD2DataFrame(features: RDD[Vector]): DataFrame = { 
    import sparkSession.implicits._ 
    val rdd: RDD[(Double, Vector)] = features.map(x => (0.0, x)) 
    val df = rdd.toDF("label","features").select("features") 
    df 
    } 

die toDF() wurde nicht vom Compiler auf die Funktionen erkannt rdd

Verwandte Themen