Ich möchte K-Means in Spark auf Daten aus einer MongoDB ausführen. Ich habe ein Arbeitsbeispiel, der gegen eine Flatfile wirkt:Wie man MongoDB Daten in Spark für kmeans abbildet?
sc = SparkContext(appName="KMeansExample") # SparkContext
data = sc.textFile("/home/mhoeller/kmeans_data.txt")
parsedData = data.map(lambda line: array([int(x) for x in line.split(' ')]))
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")
Dies ist das Format des Flatfile wird:
Jetzt möchte ich die flat~~POS=TRUNC mit MongoDB ersetzen:
spark = SparkSession \
.builder \
.appName("myApp") \
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/ycsb.usertable") \
.config("spark.mongodb.output.uri", "mongodb:/127.0.0.1/ycsb.usertable") \
.getOrCreate()
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri","mongodb://127.0.0.1/ycsb.usertable").load()
# <<<< Here I am missing the parsing >>>>>
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")
Ich mag es zu verstehen, wie Daten aus dem df zugeordnet werden, so dass es als Eingabe für Kmeans verwendet werden kann.
"Layout" der Datenbank ist:
root
| - _id: string (nullable = true)
| - Field0: binary (nullable = true)
| - field1: binär (nullable = true)
| - field2: binary (nullable = true)
| - field3: binary (nullable = true)
| - field4: binary (nullable = true)
| - field5: binary (nullable = true)
| - fiel d6: binary (nullable = true)
| - Feld7: binary (nullable = true)
| - field8: binary (nullable = true)
| - field9: binary (nullable = true)