2017-08-02 3 views
0

Ich möchte K-Means in Spark auf Daten aus einer MongoDB ausführen. Ich habe ein Arbeitsbeispiel, der gegen eine Flatfile wirkt:Wie man MongoDB Daten in Spark für kmeans abbildet?

sc = SparkContext(appName="KMeansExample") # SparkContext 
data = sc.textFile("/home/mhoeller/kmeans_data.txt") 
parsedData = data.map(lambda line: array([int(x) for x in line.split(' ')])) 
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random") 

Dies ist das Format des Flatfile wird:

Jetzt möchte ich die flat~~POS=TRUNC mit MongoDB ersetzen:

spark = SparkSession \ 
.builder \ 
.appName("myApp") \ 
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/ycsb.usertable") \ 
.config("spark.mongodb.output.uri", "mongodb:/127.0.0.1/ycsb.usertable") \ 
.getOrCreate() 

df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri","mongodb://127.0.0.1/ycsb.usertable").load() 

# <<<< Here I am missing the parsing >>>>> 

clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random") 

Ich mag es zu verstehen, wie Daten aus dem df zugeordnet werden, so dass es als Eingabe für Kmeans verwendet werden kann.

"Layout" der Datenbank ist:
root
| - _id: string (nullable = true)
| - Field0: binary (nullable = true)
| - field1: binär (nullable = true)
| - field2: binary (nullable = true)
| - field3: binary (nullable = true)
| - field4: binary (nullable = true)
| - field5: binary (nullable = true)
| - fiel d6: binary (nullable = true)
| - Feld7: binary (nullable = true)
| - field8: binary (nullable = true)
| - field9: binary (nullable = true)

Antwort

1

Ich mag es zu verstehen, wie Daten aus dem df zugeordnet werden, so dass es als Eingabe für Kmeans verwendet werden kann.

Basierend auf Ihrem Snippet ging ich davon aus, dass Sie PySpark verwenden.

Wenn man sich in clustering.KMeans Python API doc, können Sie sehen, dass der erste Parameter RDD of Vector or convertible sequence types

Nachdem Sie unter Code ausgeführt werden muss, die Ladedaten von MongoDB mit MongoDB Spark Connector

df = spark.read.format("com.mongodb.spark.sql.DefaultSource") 
       .option("uri","mongodb://127.0.0.1/ycsb.usertable") 
       .load() 

Was haben Sie in df ist ein DataFrame, also müssen wir es in etwas wandeln, umwandeln zu einem Vektortyp.

Da Sie in Ihrem Textdateibeispiel numpy.array verwenden, können wir diesen Array-Typ weiterhin verwenden, um den Übergang zu lernen.

Basierend auf der bereitgestellten layout, zuerst müssen wir die _id Spalte entfernen, da es nicht für das Clustering-Training benötigt wird. Siehe auch Vector Datentyp für weitere Informationen.

Mit den oben genannten Informationen, lassen Sie uns in sie erhalten:

# Drop _id column and get RDD representation of the DataFrame 
rowRDD = df.drop("_id").rdd 

# Convert RDD of Row into RDD of numpy.array 
parsedRdd = rowRDD.map(lambda row: array([int(x) for x in row])) 

# Feed into KMeans 
clusters = KMeans.train(parsedRdd, 2, maxIterations=10, initializationMode="random") 

Wenn Sie den booleschen Wert (True/False) statt integer (1/0) behalten möchten, dann können Sie die int entfernen Teil. Wie weiter unten:

parsedRdd = rowRDD.map(lambda row: array([x for x in row])) 

Setzen sie alle zusammen:

from numpy import array 
from pyspark.mllib.clustering import KMeans 
import org.apache.spark.sql.SparkSession 

spark = SparkSession \ 
.builder \ 
.appName("myApp") \ 
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/ycsb.usertable") \ 
.config("spark.mongodb.output.uri", "mongodb:/127.0.0.1/ycsb.usertable") \ 
.getOrCreate() 

df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").load() 

rowRDD = df.drop("_id").rdd 
parsedRdd = rowRDD.map(lambda row: array([int(x) for x in row])) 

clusters = KMeans.train(parsedRdd, 2, maxIterations=10, initializationMode="random") 
clusters.clusterCenters 
Verwandte Themen