Der Kontext
Zwei Dokumente eines Mongodb auf Schienen/Mongoid-Klassen abgebildet. Die beiden Klassen sind Task
und Subscription
. Aus Leistungsgründen speichert eine Task::CurrentTask
, die eine Teilmenge der Attribute einer Task
enthält, aber die tatsächliche aktuelle Aufgabe, die eine Subskription entspricht, ist die mit der höchsten Task#pos
für eine gegebene Task#subscription_id
.Wie kann man mit Mongoid aggregieren, um Daten aus zwei verschiedenen Feldern zu aggregieren?
Das Problem
Einige Unstimmigkeiten aus dem Subscription.current_task
zwischen einigen Attributen erschien und das sollte Task
, insbesondere die state
Feld werden übereinstimmen.
Tor
Auflistung aller aktuellen Aufgaben von Subscription
s, welche die letzte Aufgabe für dieses Abonnement entspricht.
Lösung richtet
Zuerst Karte/reduzieren Task
die letzte für jedes Abonnement zu erhalten und diese in eine temporäre Sammlung speichern. Drittens wird mit dieser temporären Sammlung unter Subscription
fortgefahren, um für jede Subskription ein Objekt zu erhalten, das sowohl die tatsächliche letzte Aufgabe als auch die aktuelle eingebettete Teilmengenkopie enthält. Drittens erstellen Sie den Bericht für Elemente, bei denen tatsächliche und kopierte Aufgaben nicht übereinstimmen.
Schwierigkeit
Während des official mongodb und mangoid documentation und anderes Beispiel in misc gelesen zu haben. blog wie MongoDB Map Re-Reduce and joins – performance tuning und MongoDB, Mongoid, MapReduce and Embedded Documents., ich bin immer noch nicht in der Lage, mit einer funktionierenden Lösung für den Weiterverarbeitungsschritt zu kommen.
Die nicht-funktionelle Lösung schrieb bisher:
# map/reduce of tasks to get the last one of each subscripton
last_task_map = %Q{
function() {
var key = this.subscription_id;
var value = {
task: {
pos: this.pos,
task_id: this._id,
state: this.state
},
current_task: null
};
emit(key, value);
}
}
last_task_reduce = %Q{
function(key, tasks) {
var last_task = tasks[0];
for (var i=1; i < tasks.length; i++) {
if(tasks[i].pos > last_task.pos) {
last_task = tasks[i];
}
}
var value = {
task: {pos: last_task.pos, task_id: last_task.task_id, state: last_task.state},
current_task: null
};
return value;
}
}
# map/reduce of `current_task`s to merged with previous results
subscription_map = %Q{
function() {
if(!this.current_task) {
return;
}
var key = this._id;
var value = {
task: null,
current_task: {
pos: this.current_task.pos,
task_id: this.current_task.task_id,
state: this.current_task.state,
source: 'current_task',
}
};
emit(key, value);
};
}
reduce = %Q{
function(key, tasks) {
if(tasks[0].current_task == nill) {
return {task: tasks[0].task, current_task: tasks[1].current_task};
}
return {task: tasks[1].task, current_task: tasks[0].current_task};
}
}
buffer = 'current_task_consistency'
# temporary collection seems unremoved when serially calling the script with
# `load` in a `rails c` prompt, so we drop it to avoid unwanted glitch merge
Mongoid.default_client[buffer].drop
t = Task.map_reduce(last_task_map, last_task_reduce).out(replace: buffer)
s = Subscription.map_reduce(subscription_map, reduce).out(reduce: buffer)
t.each{ |e| puts e } # ok: `{"_id"=>BSON::ObjectId('592dd603e138236671587b04'), "value"=>{"task"=>{"pos"=>0.0, "task_id"=>BSON::ObjectId('592dd604e138236671587b0f'), "state"=>40.0}, "current_task"=>nil}}`
puts t.counts # ok: {"input"=>83900, "emit"=>83900, "reduce"=>36115, "output"=>28625}
s.each{ |e| puts e } # ko: {"_id"=>BSON::ObjectId('592dd603e138236671587b04'), "value"=>{"task"=>nil, "current_task"=>{"pos"=>0.0, "task_id"=>BSON::ObjectId('592dd604e138236671587b0f'), "state"=>40.0, "source"=>"current_task"}}}
puts s.counts # ko: {"input"=>28632, "emit"=>28624, "reduce"=>0, "output"=>28624}
Das erwartete Ergebnis für die zweite Karte/reduziert, ist eine Zusammenführung der current_task_consistency
und die subscription_map
Ergebnisse, die alle innerhalb der verringern passieren sollen, wenn keine durchgeführt wird gemäß zu counts
, und tatsächlich zeigt die Ausgabe von s
Elementen, dass kein task
Schlüssel mit dem current_task_consistency
Wert zugewiesen wurde.
Fragen an die exponierte Problem im Zusammenhang
- Was die Umsetzung Seen tut?
- Soweit ich undestand, diese Lösung
merge
Funktionen liefern, die indempotent sind und den Ausgang im Einklang mit der entsprechendenmatch
Funktion zurückkehrt bietet. Was kann ich missverstehen, wie derout
Parameter funktioniert und wie die redudeuce Eingabe/Ausgabe verwaltet werden sollte?
Zusätzliche Bemerkungen
Der dritte Schritt, um den Bericht zu generieren, soll als finalize
Funktion auf der zweiten Map/Reduce angewendet umgesetzt werden.Aber vielleicht ist eine dritte Map/Reduce ein besserer Weg oder nicht. Insgesamt ist die Implementierung, zumindest unter dem Gesichtspunkt der Performance, schlecht strukturiert, und Feedback ist auch in diesem Punkt willkommen.
Wenn ich richtig verstehe, dann erhalten Sie zuerst Daten von 'Task', die auf' subscription_id' gruppiert ist und die übereinstimmenden Daten für den "größten" Wert von "pos" zurückgibt. Dann willst du die Abonnements durchgehen und einige Daten mit den Ergebnissen der ersten Ausgabe verbinden? Werden Daten von "Subskriptionen" für Schlüssel zurückgegeben, die nicht von der Ausgabe "Aufgaben" erzeugt werden? Weil, wo es keine neuen Schlüssel geben würde, sollten Sie wahrscheinlich Aggregat und '$ lookup' verwenden, anstatt einen zweistufigen mapReduce zu machen. –
Es scheint, dass Sie das Ziel gut verstanden haben, und mir war die Möglichkeit des $ Lookup nicht bewusst. Ich werde untersuchen, dass, sobald ich mit der Lösung map/reduce/finalize fertig bin, es für mich auch eine gute Übung ist, herauszufinden, wie man so etwas in Mongo abfragt. Vielen Dank. – psychoslave
siehe [Aggregationsdokumentation] (https://docs.mongodb.com/manual/aggregation/) und [$ lookup-Dokumentation] (https://docs.mongodb.com/manual/reference/operator/aggregation/lookup/#) pipe._S_lookup) – psychoslave