2016-04-20 9 views
0

Ich habe schon viel recherchiert, konnte aber keine Lösung finden. Die nächste Frage, die ich hier finden konnte, ist Why my SPARK works very slowly with mongoDB.Wie effizient Daten von Mongodb lesen und in Dataframe von Spark konvertieren?

Ich versuche, eine Mongodb-Sammlung in Funken DataFrame mit Mongo-Hadoop-Anschluss zu laden. Hier ist ein Ausschnitt des entsprechenden Code:

connection_string = 'mongodb://%s:%s/randdb.%s'%(dbhost, dbport, collection_name) 
trainrdd = sc.mongoRDD(connection_string, config=config) 
#  traindf = sqlcontext.createDataFrame(trainrdd) 
#  traindf = sqlcontext.read.json(trainrdd) 
traindf = sqlcontext.jsonRDD(trainrdd) 

Hier 'sc' ist das SparkContext Objekt. Ich habe auch die Varianten ausprobiert, die im Code auskommentiert sind. Aber alle sind gleichermaßen langsam. Für eine Sammlung von 2 GB Größe (100000 Zeilen und 1000 Spalten) dauert es ungefähr 6 Stunden (holy moly: /) auf einem Cluster von 3 Maschinen mit jeweils 12 Kernen und 72 GB RAM (unter Verwendung aller Kerne in diesem Funkencluster). Der Mongodb-Server läuft ebenfalls auf einer dieser Maschinen.

Ich bin mir nicht sicher, ob ich es richtig mache. Hinweise zur Optimierung dieses Codes wären sehr hilfreich.

+0

Sie sind auf die Verwendung von 'jsonRDD' bezieht, die langsam ist? Können Sie versuchen, die RDD auf andere Weise in DataFrame zu konvertieren? –

+0

Hallo Wan Danke für die Antwort. Ja, die eigentliche Aktion beginnt, wenn Sie 'sqlcontext.jsonRDD (trainrdd)' aufrufen. Dies löst das mongodb-Lesen aus, wobei mongodb-Protokolle angeben, dass Verbindungen hergestellt und gelöscht werden. Ich habe andere Methoden ausprobiert (im obigen Code kommentiert), die ebenso langsam sind. Kürzlich habe ich sqlcontext.read.json in einer json-Datei aus der mongodb-Sammlung exportiert. Dies funktionierte relativ schnell vergleichsweise. – bitspersecond

+0

Welche Glasversion des [mongodb mongo-hadoop-Zündkerzensteckers] (https://github.com/mongodb/mongo-hadoop/blob/master/spark/src/main/python/README.rst) benutzt du? Können Sie versuchen, den MongoDB-Server von Spark-Knoten zu trennen? –

Antwort

2

Standardmäßig wird pyspark.sql.SQLContext.jsonRDD das Schema des angegebenen JSON-Datasets dynamisch ableiten. Spalten werden hinzugefügt, wenn neue JSON-Felder gefunden werden. Dies kann langsam sein, da jedes JSON-Attribut überprüft wird. Vor allem, wenn Sie 1000 Kolumnen haben.

Sie könnten das Schema stattdessen explizit definieren, wenn die Daten bekannt sind oder nur eine bestimmte Gruppe von Feldern benötigt wird.

Zusätzlich zu ObjectId Problem in HADOOP-277 beschrieben müssen Sie entweder Felder entfernen, die solche inkompatible Typen enthalten, oder in andere Typen konvertieren. das heißt str(ObjectId(...))

Zum Beispiel:

from pyspark import SparkContext, SparkConf 
from pyspark.sql import SQLContext 
from pyspark.sql.types import StructType, StructField, StringType 
import pymongo_spark 
pymongo_spark.activate() 
data_rdd = sc.mongoRDD("mongodb://localhost:27017/database.collection") 
sqlcontext = SQLContext(sc) 

# Define your schema explicitly 
schema = StructType([StructField("firstname", StringType()), 
        StructField("lastname", StringType()), 
        StructField("description", StringType())]) 

# Create a mapper function to return only the fields wanted, or to convert. 
def project(doc): 
    return {"firstname": str(doc["firstname"]), 
      "lastname": str(doc["lastname"]), 
      "description": str(doc["description"])} 

projected_rdd = data_rdd.map(project) 
train_df = sqlcontext.jsonRDD(projected_rdd, schema) 
train_df.first() 

Die obige Snippet in Umgebung getestet wurde: Spark v1.6.1, mongo-hadoop spark v1.5.2

+0

Hallo Wan, gut zu wissen über die Verwendung von Schema zu. Ich habe es versucht und für 100 Spalten hat es keinen Unterschied gemacht. Ich werde es auch für 1000 Spalten versuchen und werde die Ergebnisse hier veröffentlichen. Vielen Dank. – bitspersecond

Verwandte Themen