2017-06-08 5 views
0

Der Kontext

Zwei Dokumente eines Mongodb auf Schienen/Mongoid-Klassen abgebildet. Die beiden Klassen sind Task und Subscription. Aus Leistungsgründen speichert eine Task::CurrentTask, die eine Teilmenge der Attribute einer Task enthält, aber die tatsächliche aktuelle Aufgabe, die eine Subskription entspricht, ist die mit der höchsten Task#pos für eine gegebene Task#subscription_id.Wie kann man mit Mongoid aggregieren, um Daten aus zwei verschiedenen Feldern zu aggregieren?

Das Problem

Einige Unstimmigkeiten aus dem Subscription.current_task zwischen einigen Attributen erschien und das sollte Task, insbesondere die state Feld werden übereinstimmen.

Tor

Auflistung aller aktuellen Aufgaben von Subscription s, welche die letzte Aufgabe für dieses Abonnement entspricht.

Lösung richtet

Zuerst Karte/reduzieren Task die letzte für jedes Abonnement zu erhalten und diese in eine temporäre Sammlung speichern. Drittens wird mit dieser temporären Sammlung unter Subscription fortgefahren, um für jede Subskription ein Objekt zu erhalten, das sowohl die tatsächliche letzte Aufgabe als auch die aktuelle eingebettete Teilmengenkopie enthält. Drittens erstellen Sie den Bericht für Elemente, bei denen tatsächliche und kopierte Aufgaben nicht übereinstimmen.

Schwierigkeit

Während des official mongodb und mangoid documentation und anderes Beispiel in misc gelesen zu haben. blog wie MongoDB Map Re-Reduce and joins – performance tuning und MongoDB, Mongoid, MapReduce and Embedded Documents., ich bin immer noch nicht in der Lage, mit einer funktionierenden Lösung für den Weiterverarbeitungsschritt zu kommen.

Die nicht-funktionelle Lösung schrieb bisher:

# map/reduce of tasks to get the last one of each subscripton 
last_task_map = %Q{ 
    function() { 
    var key = this.subscription_id; 
    var value = { 
     task: { 
      pos: this.pos, 
      task_id: this._id, 
      state: this.state 
     }, 
     current_task: null 
    }; 
    emit(key, value); 
    } 
} 
last_task_reduce = %Q{ 
    function(key, tasks) { 
    var last_task = tasks[0]; 
    for (var i=1; i < tasks.length; i++) { 
     if(tasks[i].pos > last_task.pos) { 
     last_task = tasks[i]; 
     } 
    } 

    var value = { 
     task: {pos: last_task.pos, task_id: last_task.task_id, state: last_task.state}, 
     current_task: null 
    }; 
    return value; 
    } 
} 

# map/reduce of `current_task`s to merged with previous results 
subscription_map = %Q{ 
    function() { 
    if(!this.current_task) { 
     return; 
    } 
    var key = this._id; 
    var value = { 
     task: null, 
     current_task: { 
     pos: this.current_task.pos, 
     task_id: this.current_task.task_id, 
     state: this.current_task.state, 
     source: 'current_task', 
     } 
    }; 
    emit(key, value); 
    }; 
} 

reduce = %Q{ 
    function(key, tasks) { 
    if(tasks[0].current_task == nill) { 
     return {task: tasks[0].task, current_task: tasks[1].current_task}; 
    } 
    return {task: tasks[1].task, current_task: tasks[0].current_task}; 
    } 
} 


buffer = 'current_task_consistency' 
# temporary collection seems unremoved when serially calling the script with 
# `load` in a `rails c` prompt, so we drop it to avoid unwanted glitch merge 
Mongoid.default_client[buffer].drop 
t = Task.map_reduce(last_task_map, last_task_reduce).out(replace: buffer) 
s = Subscription.map_reduce(subscription_map, reduce).out(reduce: buffer) 
t.each{ |e| puts e } # ok: `{"_id"=>BSON::ObjectId('592dd603e138236671587b04'), "value"=>{"task"=>{"pos"=>0.0, "task_id"=>BSON::ObjectId('592dd604e138236671587b0f'), "state"=>40.0}, "current_task"=>nil}}` 
puts t.counts # ok: {"input"=>83900, "emit"=>83900, "reduce"=>36115, "output"=>28625} 
s.each{ |e| puts e } # ko: {"_id"=>BSON::ObjectId('592dd603e138236671587b04'), "value"=>{"task"=>nil, "current_task"=>{"pos"=>0.0, "task_id"=>BSON::ObjectId('592dd604e138236671587b0f'), "state"=>40.0, "source"=>"current_task"}}} 
puts s.counts # ko: {"input"=>28632, "emit"=>28624, "reduce"=>0, "output"=>28624} 

Das erwartete Ergebnis für die zweite Karte/reduziert, ist eine Zusammenführung der current_task_consistency und die subscription_map Ergebnisse, die alle innerhalb der verringern passieren sollen, wenn keine durchgeführt wird gemäß zu counts, und tatsächlich zeigt die Ausgabe von s Elementen, dass kein task Schlüssel mit dem current_task_consistency Wert zugewiesen wurde.

Fragen an die exponierte Problem im Zusammenhang

  • Was die Umsetzung Seen tut?
  • Soweit ich undestand, diese Lösung merge Funktionen liefern, die indempotent sind und den Ausgang im Einklang mit der entsprechenden match Funktion zurückkehrt bietet. Was kann ich missverstehen, wie der out Parameter funktioniert und wie die redudeuce Eingabe/Ausgabe verwaltet werden sollte?

Zusätzliche Bemerkungen

Der dritte Schritt, um den Bericht zu generieren, soll als finalize Funktion auf der zweiten Map/Reduce angewendet umgesetzt werden.Aber vielleicht ist eine dritte Map/Reduce ein besserer Weg oder nicht. Insgesamt ist die Implementierung, zumindest unter dem Gesichtspunkt der Performance, schlecht strukturiert, und Feedback ist auch in diesem Punkt willkommen.

+1

Wenn ich richtig verstehe, dann erhalten Sie zuerst Daten von 'Task', die auf' subscription_id' gruppiert ist und die übereinstimmenden Daten für den "größten" Wert von "pos" zurückgibt. Dann willst du die Abonnements durchgehen und einige Daten mit den Ergebnissen der ersten Ausgabe verbinden? Werden Daten von "Subskriptionen" für Schlüssel zurückgegeben, die nicht von der Ausgabe "Aufgaben" erzeugt werden? Weil, wo es keine neuen Schlüssel geben würde, sollten Sie wahrscheinlich Aggregat und '$ lookup' verwenden, anstatt einen zweistufigen mapReduce zu machen. –

+0

Es scheint, dass Sie das Ziel gut verstanden haben, und mir war die Möglichkeit des $ Lookup nicht bewusst. Ich werde untersuchen, dass, sobald ich mit der Lösung map/reduce/finalize fertig bin, es für mich auch eine gute Übung ist, herauszufinden, wie man so etwas in Mongo abfragt. Vielen Dank. – psychoslave

+0

siehe [Aggregationsdokumentation] (https://docs.mongodb.com/manual/aggregation/) und [$ lookup-Dokumentation] (https://docs.mongodb.com/manual/reference/operator/aggregation/lookup/#) pipe._S_lookup) – psychoslave

Antwort

0

Ein erstes Problem der vorgeschlagenen Lösung war einfach ein Ruby/Js-Syntax-Mix, nill anstelle von null. Leider scheiterte das Skript still, zumindest in der Pry-Konsole, wo ich load current_task_consistency.rb lief.

Hier ist eine funktionierende Lösung mit zwei map/reduce und einer Abfrage der resultierenden temporären Sammlung.

# map/reduce of tasks to get the last one of each subscripton 
last_task_map = %Q{ 
    function() { 
    var key = this.subscription_id; 
    var value = { 
     task: { 
      pos: this.pos, 
      task_id: this._id, 
      state: this.state 
     }, 
     current_task: null 
    }; 
    emit(key, value); 
    } 
} 
last_task_reduce = %Q{ 
    function(key, tasks) { 
    var last_task = tasks[0]; 
    for (var i=1; i < tasks.length; i++) { 
     if(tasks[i].pos > last_task.pos) { 
     last_task = tasks[i]; 
     } 
    } 

    var value = { 
     task: {pos: last_task.pos, task_id: last_task.task_id, state: last_task.state}, 
     current_task: null 
    }; 
    return value; 
    } 
} 

# map/reduce of `current_task`s merged side by side with the corresponding 
# subscription last task 
subscription_map = %Q{ 
    function() { 
    if(!this.current_task) { 
     return; 
    } 
    var key = this._id; 
    var value = { 
     task: null, 
     current_task: { 
     pos: this.current_task.pos, 
     task_id: this.current_task.task_id, 
     state: this.current_task.state, 
     } 
    }; 
    emit(key, value); 
    }; 
} 

subscription_reduce = %Q{ 
    function(key, tasks) { 
    if(tasks[0].current_task == null) { 
     return {task: tasks[0].task, current_task: tasks[1].current_task}; 
    } 
    return {task: tasks[1].task, current_task: tasks[0].current_task}; 
    } 
} 

buffer = 'current_task_consistency' 
# temporary collection seems unremoved when serially calling the script with 
# `load` in a `rails c` prompt, so we drop it to avoid unwanted merge glitch 
Mongoid.default_client[buffer].drop 

Task.map_reduce(last_task_map, last_task_reduce). 
    out(replace: buffer). 
    execute 

Subscription. 
    map_reduce(subscription_map, subscription_reduce). 
    out(reduce: buffer). 
    execute 

ascertain_inconsistency = %Q{ 
    this.value.current_task == null || 
    this.value.current_task.state != this.value.task.state 
} 

inconsistencies = Mongoid.default_client['current_task_consistency']. 
    find({ "$where": ascertain_inconsistency }) 
Verwandte Themen