2017-09-28 2 views
5

Ich habe eine Java-Anwendung, die eine große Menge an Daten aus MongoDB 3.2 lesen und in Hadoop übertragen muss.Lesen Sie große mongodb Daten

Diese Batch-Anwendung wird alle 4 Stunden 6 Mal am Tag ausgeführt.

Daten Spezifikationen:

  • Dokumente: 80000 zu einem Zeitpunkt (alle 4 Stunden)
  • Größe:
3GB

Derzeit verwende ich MongoTemplate und Morphia in um auf MongoDB zuzugreifen. Jedoch habe ich eine OOM Ausnahme erhalten, wenn diese Datenverarbeitung die folgende Verwendung:

List<MYClass> datalist = datasource.getCollection("mycollection").find().asList(); 

Was ist der beste Weg, um diese Daten zu lesen und zu Hadoop bevölkern?

  • MongoTemplate::Stream() und schreiben Sie auf Hadoop eins nach dem anderen?
  • batchSize(someLimit) und schreiben Sie die gesamte Charge auf Hadoop?
  • Cursor.batch() und schreiben zu hdfs eins nach dem anderen?

Antwort

1

Ihr Problem liegt auf den asList() Anruf

Dieser den Fahrer zwingt, durch den gesamten Cursor (80000 docs paar Gigs) zu durchlaufen, die alle im Speicher zu halten.

batchSize(someLimit) und Cursor.batch() Hilfe hier, wie Sie den gesamten Cursor durchlaufen, egal, welche Batch-Größe ist.

Stattdessen können Sie:

1) Iterate die Cursor: List<MYClass> datalist = datasource.getCollection("mycollection").find()

2) Lesen Sie ein Dokument gleichzeitig und füttern die Dokumente in einen Puffer (lassen Sie sich eine Liste sagen)

3) Für alle 1000 Dokumente (sagen wir) rufen Sie die Hadoop API auf, löschen Sie den Puffer und starten Sie ihn erneut.

0

Der Aufruf asList() versucht, die gesamte Mongodb-Sammlung in den Speicher zu laden. Versuchen, ein in-Memory-Listenobjekt größer als 3 GB zu machen.

Das Iterieren der Sammlung mit einem Cursor wird dieses Problem beheben. Sie können dies mit der Datenquellenklasse tun, aber ich bevorzuge die Art von sicheren Abstraktionen, die Morphia mit den Klassen DAO anbietet:

class Dao extends BasicDAO<Order, String> { 
    Dao(Datastore ds) { 
     super(Order.class, ds); 
    } 
    } 

    Datastore ds = morphia.createDatastore(mongoClient, DB_NAME); 
    Dao dao = new Dao(ds); 

    Iterator<> iterator = dao.find().fetch(); 
    while (iterator.hasNext()) { 
     Order order = iterator.next; 
     hadoopStrategy.add(order); 
    }