1

Ich habe einige Probleme bei der Änderung des Schemas, das ich für eine Zeitreihendatenbank verwende, die ich mit Mongo DB erstellt habe. Derzeit habe ich Aufzeichnungen wie unten gezeigt:Aggregation Abwickeln von Dokumentschlüsseln als neue Dokumente

{ 
    "_id" : 20, 
    "name" : "Bob, 
    "location" : "London", 
    "01/01/1993" : { 
     "height" : "110cm", 
     "weight" : "60kg", 
    }, 
    "02/01/1993" : { 
     "height" : "112cm", 
     "weight" : "61kg", 
    } 

}

Ich wünsche dem Aggregations Framework verwenden mehrere Datensätze für jede „Person“, eine für jeden „Zeitwert“ Subdokument zu schaffen, in der ursprüngliche Datensatz:

{ 
    "_id" : 20, 
    "name" : "Bob, 
    "date" : "01/01/1993" 
    "location" : "London", 
    "height" : "110cm", 
    "weight" : "60kg", 
}, 

{ 
    "_id" : 20, 
    "name" : "Bob, 
    "date" : "02/01/1993" 
    "location" : "London", 
    "height" : "112cm", 
    "weight" : "61kg", 
} 

die neue Regelung sollte viel effizienter sein, wenn zu jedem Datensatz eine große Anzahl von Zeitreihen Werte addiert und ich sollte nicht in eine max Dokumentgröße Fehler laufen!

Jede Hilfe, wie Sie dies mit der Mongo DB-Aggregationspipeline tun können, wäre sehr willkommen!

Antwort

1

Zwar gibt es Funktionen in modernen Versionen des Aggregation Framework, die Ihnen erlauben können, dies zu tun, aber die Laufleistung kann davon abweichen, ob es tatsächlich die beste Lösung dafür ist.

Im Wesentlichen können Sie ein Array von Einträgen erstellen, die aus den Dokumentschlüsseln bestehen, die die anderen Schlüssel der obersten Ebene nicht enthalten, die dann in dem Dokument enthalten wären.

db.getCollection('input').aggregate([ 
    { "$project": { 
    "name": 1, 
    "location": 1, 
    "data": { 
     "$filter": { 
     "input": { "$objectToArray": "$$ROOT" }, 
     "as": "d", 
     "cond": { 
      "$not": { "$in": [ "$$d.k", ["_id","name","location"] ] }  
     } 
     } 
    } 
    }}, 
    { "$unwind": "$data" }, 
    { "$replaceRoot": { 
    "newRoot": { 
     "$arrayToObject": { 
     "$concatArrays": [ 
      [{ "k": "id", "v": "$_id" }, 
      { "k": "name", "v": "$name" }, 
      { "k": "location", "v": "$location" }, 
      { "k": "date", "v": "$data.k" }], 
      { "$objectToArray": "$data.v" } 
     ] 
     } 
    } 
    }}, 
    { "$out": "output" } 
]) 

oder abwechselnd alles tun, die Umbildung in der Anfangs $project erzeugt innerhalb der Array-Elemente: Sie

db.getCollection('input').aggregate([ 
    { "$project": { 
    "_id": 0, 
    "data": { 
     "$map": { 
     "input": { 
      "$filter": { 
      "input": { "$objectToArray": "$$ROOT" }, 
      "as": "d", 
      "cond": { 
       "$not": { "$in": [ "$$d.k", ["_id", "name", "location"] ] }  
      } 
      } 
     }, 
     "as": "d", 
     "in": { 
      "$arrayToObject": { 
      "$concatArrays": [ 
       { "$filter": { 
       "input": { "$objectToArray": "$$ROOT" }, 
       "as": "r", 
       "cond": { "$in": [ "$$r.k", ["_id", "name", "location"] ] } 
       }}, 
       [{ "k": "date", "v": "$$d.k" }], 
       { "$objectToArray": "$$d.v" } 
      ] 
      } 
     } 
     } 
    } 
    }}, 
    { "$unwind": "$data" }, 
    { "$replaceRoot": { "newRoot": "$data" } }, 
    { "$out": "output" } 
]) 

So verwenden Das Array kann dann mit $unwind und das gesamte Ergebnis umgestaltet in neue Dokumente verarbeitet werden $objectToArray und $filter, um ein Array aus den Schlüsseln zu machen, die tatsächlich die Datenpunkte für jedes Datum enthalten.

Nach $unwind wenden wir grundsätzlich $arrayToObject auf einem Satz von benannten Tasten in dem „Array-Format“, um die für newRoot$replaceRoot schreiben und dann an die neue Sammlung zu konstruieren, als ein neues Dokument für jeden Datenschlüssel $out verwenden.

Das kann Sie nur einen Teil der Weg, obwohl Sie wirklich die "date" Daten zu einem BSON-Datum ändern sollten. Es benötigt viel weniger Speicherplatz und ist auch einfacher abzufragen.

var updates = []; 
db.getCollection('output').find().forEach(d => { 
    updates.push({ 
    "updateOne": { 
     "filter": { "_id": d._id }, 
     "update": { 
     "$set": { 
      "date": new Date(
      Date.UTC.apply(null, 
       d.date.split('/') 
       .reverse().map((e,i) => (i == 1) ? parseInt(e)-1: parseInt(e)) 
      ) 
     ) 
     } 
     } 
    } 
    }); 
    if (updates.length >= 500) { 
    db.getCollection('output').bulkWrite(updates); 
    updates = []; 
    } 
}) 

if (updates.length != 0) { 
    db.getCollection('output').bulkWrite(updates); 
    updates = []; 
} 

Natürlich, wenn Ihr MongoDB-Server-Funktionen, diese Aggregation fehlt, dann sind Sie besser dran, nur die Ausgabe in eine neue Kollektion Schreiben durch die Schleife in erster Linie iterieren:

var output = []; 

db.getCollection('input').find().forEach(d => { 
    output = [ 
    ...output, 
    ...Object.keys(d) 
     .filter(k => ['_id','name','location'].indexOf(k) === -1) 
     .map(k => Object.assign(
     { 
      id: d._id, 
      name: d.name, 
      location: d.location, 
      date: new Date(
      Date.UTC.apply(null, 
       k.split('/') 
       .reverse().map((e,i) => (i == 1) ? parseInt(e)-1: parseInt(e)) 
      ) 
     ) 
     }, 
     d[k] 
    )) 
    ]; 

    if (output.length >= 500) { 
    db.getCollection('output').insertMany(output); 
    output = [];  
    } 
}) 

if (output.length != 0) { 
    db.getCollection('output').insertMany(output); 
    output = []; 
} 

In jedem die Fälle, die wir auf die umgekehrten String-Elemente anwenden wollen, verwenden wir Date.UTC aus dem vorhandenen "string" -basierten Datum und erhalten einen Wert, der in ein BSON-Datum umgewandelt werden kann.

Die Aggregation Framework selbst nicht Gießen von Typen erlaubt so die nur Lösung für diesen Teil (und es ist ein notwendiger Teil) ist, um tatsächlich Schleife und zu aktualisieren, aber die Formulare unter Verwendung zumindest macht es effizient Schleife und aktualisieren.

Entweder Fall gibt Ihnen die gleiche Ende Ausgabe:

/* 1 */ 
{ 
    "_id" : ObjectId("599275b1e38f41729f1d64fe"), 
    "id" : 20.0, 
    "name" : "Bob", 
    "location" : "London", 
    "date" : ISODate("1993-01-01T00:00:00.000Z"), 
    "height" : "110cm", 
    "weight" : "60kg" 
} 

/* 2 */ 
{ 
    "_id" : ObjectId("599275b1e38f41729f1d64ff"), 
    "id" : 20.0, 
    "name" : "Bob", 
    "location" : "London", 
    "date" : ISODate("1993-01-02T00:00:00.000Z"), 
    "height" : "112cm", 
    "weight" : "61kg" 
} 
+0

Große Antwort. Im ersten Teil (Aggregationspipeline) verwenden Sie folgendes: "$ not": {"$ in": ["$$ d.k", ["_id", "name", "location"]]}. Worauf bezieht sich der $$ d.k? Ich verstehe, dass dies verwendet wird, um das Objekt zu einem Array-Befehl für die Felder _id, name und location zu "überspringen", aber unsicher bezüglich der genauen Methodik! – Ctrp

+0

Ich kann einfach keinen Hinweis auf 'k' in dem Code vor dieser Anweisung finden – Ctrp

+0

@Ctrp Sie müssen sich die Dokumentationsverknüpfungen in der Antwort anschauen, insbesondere bei '$ objectToArray'. Was es tut, nimmt jeden "Schlüssel" und "Wert" der Eingabe "Objekt" und erzeugt ein "Array" mit Einträgen wie '[{" k ":" 01/01/1993 "," v ": {" height " ":" 110cm "," Gewicht ":" 60kg "}},]'. Das ist also das, was "k" und "v" während der Auflistung bedeuten. –

Verwandte Themen