2016-07-18 4 views
0

Ich verwende Funken: 1.6.2 und MongoDB: 3.2.8Spark DB Insertion unter 10 Stunden für 60Gb Daten mongo

Ich habe einen Datenrahmen mit 8-Säule und 1 Milliarde Zeilen. shuffle schreiben für Datenrahmen ist 60 GB.

Ich werde diesen Datenrahmen in Mongodb mit Mongo-Spark-Connector (Mongo-Spark-Connector_2.10) einfügen.

MongoSpark.write(sourceValueDf).options(mongoDbOptions).mode(SaveMode.Append).save(); 

Für Einfügungen dauert es mehr als 10 Stunden.

Wie kann ich die Leistung erhöhen?

+0

können Sie weitere Einzelheiten zu Ihren Optionen angeben? Wie schreibst du es? in Masse oder je 1 Dokument? – somallg

Antwort

4

gibt es nicht viel mit zu gehen:

MongoSpark.write(sourceValueDf).options(mongoDbOptions).mode(SaveMode.Append).save()

Aber unabhängig von mongoDBOptions die Abstimmung müssen sein zweifach und Performance-Engpässe benötigen in Funken- und MongoDB fixiert werden. Der Schlüssel zum Erfolg besteht darin, zu verstehen, was passiert, wenn Sie den obigen Code ausführen, nur dann können Sie den besten Weg zur Verbesserung der Leistung bestimmen.

Funken

Ich habe einen Datenrahmen mit 8-Säule und 1 Milliarde Zeilen. shuffle write für Datenrahmen ist 60GB.

Es gibt keine Informationen über die sourceValueDf aber Sie müssen die Quelle konfigurieren und verstehen, dass der Flaschenhals? In der Dokumentation Spark Monitoring erfahren Sie mehr darüber, was in Ihren Spark-Jobs passiert.

Im Allgemeinen sind die wichtigsten Punkte für Spark Tuning; Partitionierung, Caching, Serialisierung und die shuffle Operation. Weitere Informationen finden Sie in diesem tollen Blogbeitrag von Cloudera: Working with Apache Spark: Or, How I Learned to Stop Worrying and Love the Shuffle. Es gibt viele Möglichkeiten, um Spark-Jobs zu verbessern.

MongoDB

die bei Werfen sie einen Blick, was die MongoDB Funkenstecker mit tun:

MongoSpark.write(sourceValueDf).options(mongoDbOptions).mode(SaveMode.Append).save()

Hier ist der Anschluss nimmt die darunter liegende RDD und speichert die Daten als Dokumente in einer vorhandenen Datenbank mit dem folgenden Logik:

rdd.foreachPartition(iter => if (iter.nonEmpty) { 
    mongoConnector.withCollectionDo(writeConfig, { collection: MongoCollection[D] => 
    iter.grouped(DefaultMaxBatchSize).foreach(batch => collection.insertMany(batch.toList.asJava)) 
    }) 
}) 

Für jede Partition wird es die Schreibvorgänge in insertMany mit 512 Dokumenten pro Batch Batch (die zugrunde liegende Java-Treiber-Batchgröße). Eine geringe Anzahl von Partitionen in der sourceValueDf könnte sich negativ auf die Speicherleistung auswirken. Die Erhöhung der Anzahl der Partitionen kann die Parallelisierbarkeit dieser Methode für die Spark-Mitarbeiter erhöhen und somit die Durchgängigkeit erhöhen.

Es gibt andere allgemeine Methoden, um die Schreibleistung für Massenvorgänge zu MongoDB zu verbessern:

  • Netzwerk

    die Spark-Arbeitnehmer zu gewährleisten und die MongoDB-Instanz (en) sind colocated oder als Standard klein wie ein Netzwerk-Hop. Sie können Physik nicht schlagen.

  • Sharding

    Erhöhen Sie die Parallelisierbarkeit von Schreibvorgängen durch zu einer sharded Sammlung Einsetzen besonders, wenn Daten vorsortiert auf der Scherbe Schlüssel einsetzen. Co-location von Spark-Mitarbeitern mit Shaded MongoDs kann das schnellstmögliche Szenario für Schreibvorgänge bieten. Weitere Informationen zu Konfigurationsoptionen finden Sie im Abschnitt how can I achieve data locality in der Connector-Dokumentation.

  • Indizes

    Entfernen Indizes, bevor die Daten einfügen und sie nach dem Umbau. Beim Einfügen großer Datenmengen haben Benutzer Leistungsverbesserungen gefunden, indem sie Indizes zu Beginn des Prozesses entfernt und sie am Ende nur einmal erstellt haben. Zum Beispiel:

    val writeConfig = WriteConfig(mongoDbOptions) 
    MongoConnector(writeConfig.asOptions).withCollectionDo(writeConfig, { 
        coll: MongoCollection[Document] => coll.dropIndex("index") 
    }) 
    
    MongoSpark.write(sourceValueDf) 
          .options(writeConfig.asOptions) 
          .mode(SaveMode.Append) 
          .save() 
    
    MongoConnector(writeConfig.asOptions).withCollectionDo(writeConfig, { 
        coll: MongoCollection[Document] => coll.createIndex(...) 
    }) 
    
  • schreiben Sorge

    auf dem primären Knoten Schreiben nur und nicht für die Replikation warten verbessert auf Kosten der Redundanz Geschwindigkeit. Dies kann über die WriteConfig/mongoDbOptions konfiguriert werden. Siehe die Output configuration documentation.

Wie hoch ist die Belastung der MongoDB-Maschine (n), wenn Sie diesen Job ausführen? Ist es der Flaschenhals? Systeme wie MongoDB Cloud Manager bieten vollständige Sichtbarkeit und Überwachung der Leistung, damit Sie besser verstehen, was auf der MongoDB-Ebene passiert.

Verbesserung MongoDB und Funkenleistung

Kurz gesagt gibt es kein Allheilmittel oder magische Konfigurationsoption, um die Leistung zu verbessern. Es erfordert Debugging, Verständnis für das vorliegende Problem und möglicherweise in Betracht gezogene Konfiguration von Spark- und MongoDB-Clustern. Zusammen haben sie bereits gezeigt, very fast compute and storage zur Verfügung zu stellen, aber es hängt von der Verwendung ab und jedes System arbeitet zusammen.

Die ersten Schritte bestehen darin, die verfügbaren Überwachungstools zu verwenden, um zu verstehen, wo sich die Engpässe befinden.

+0

Ross ist richtig, du solltest einen Blick auf den Cloud Manager oder sogar auf Mongostat/Mongotop werfen, um zu sehen, ob Mongodb maximiert ist oder einfach im Leerlauf ist. Dann können Sie sich ein Bild davon machen, ob Sie die Softwarekonfiguration optimieren müssen oder ob Sie die Hardware-/Architekturkonfiguration anpassen müssen. – sweaves