2016-03-25 12 views
1

Ich möchte die Ausgabe von Nachrichten ansammeln, die von verschiedenen Prozessen in einer bestimmten Reihenfolge gesendet wurden.Akkumulieren von Nachrichtenausgaben in einer angegebenen Reihenfolge

Zum Beispiel habe ich eine Liste von pids [pid0, pid1]. Wenn ich zuerst pid0 bekomme, dann gut. Wenn ich zuerst pid1 bekomme, passiert nichts, bis ich pid0 bekomme.

Ich weiß, dass dies durch die Verwendung von Karten oder Stichwortsuchen und Bestellung gelöst werden kann, nachdem alle Nachrichten empfangen wurden. Ist dies jedoch durch Mustervergleich möglich?

Zum Beispiel so etwas wie dieses Schreiben zu garantieren es nur versucht, eine Nachricht input_pid (die erste Nachricht in der Liste) zu erhalten: Die

defp loop([input_pid | remaining_input_pids], acc) do 
    receive do 
    {:forward, ^input_pid, output} -> loop(remaining_input_pids, [output | acc]) 
    end 
end 

Antwort

2

Let zunächst ein nicht-funktionierendes Beispiel sehen und dann einen Weg finden um es zu beheben:

caller = self 

pids = for x <- 1..100 do 
    spawn fn -> 
    :random.seed(:erlang.monotonic_time) 
    :timer.sleep(:random.uniform(1000)) 
    send(caller, x) 
    end 
end 

for _ <- pids do 
    receive do 
    x -> IO.inspect(x) 
    end 
end 

Diese 100 Prozesse laicht, wo jeder für ein unbestimmte Zeit schläft vor x zurück an den Anrufer zu senden. Später werden die Ergebnisse in der Reihenfolge empfangen, in der sie zurückgesendet werden, was zufällig sein wird.

Wenn wir die Ergebnisse in der Reihenfolge erhalten wollen, in der die Prozesse erzeugt wurden, müssen wir dem Empfangsprozess einen Hinweis geben. Wie Sie richtig gesehen haben, können wir den erzeugten Prozess 'pid verwenden, um das zu tun. Wir erhalten die pid in dem Anrufer als Rückgabewert von spawn, aber wir können auch die pid eines erzeugten Prozess erhalten, indem self innerhalb des Prozesses aufrufen und dann an den Aufrufer zurückschicken:

caller = self 

pids = for x <- 1..100 do 
    spawn fn -> 
    :random.seed(:erlang.monotonic_time) 
    :timer.sleep(:random.uniform(1000)) 
    send(caller, {self, x}) 
    end 
end 

for pid <- pids do 
    receive do 
    {^pid, x} -> IO.inspect(x) 
    end 
end 

Just bewusst sein, Das könnte den Prozess-Posteingang des Aufrufers überfluten. Im schlimmsten Fall könnte der zuerst erzeugte Prozess zuletzt empfangen werden, und dann bleiben alle anderen Nachrichten bis zum Ende erhalten.

Nun, ob Sie einen Schritt weiter gehen müssen, hängt von der Anzahl der Prozesse ab und Sie sollten die Ergebnisse für Ihren speziellen Fall benchmarken. Das Problem mit einer großen Anzahl von Prozessen ist, wie das Postfach verarbeitet wird: Jedes Mal, wenn Sie receive aufrufen, wird es durch alle Nachrichten durchlaufen, bis eine Übereinstimmung gefunden wird. Das heißt im schlimmsten Fall iterieren Sie über die Mailbox n + (n-1) + (n-2) + ... + 1 = n*(n+1)/2 mal (quadratische Zeitkomplexität O(n²)), was zu einem Engpass werden kann.

In einem solchen Fall könnte es eine bessere Option sein, die Nachrichten sofort zu empfangen, sie in einer Karte zu speichern und sie dann in der richtigen Reihenfolge auszulesen. Der Nachteil davon ist, dass Sie immer warten müssen, bis alle Nachrichten empfangen werden. Bei der ersten Annäherung werden die Nachrichten verarbeitet, sobald die nächste in der Reihenfolge ankommt. Hier ist ein Beispiel, wie man das macht. In diesem Fall habe ich 100.000 Prozesse, die bereits sehr langsam mit dem anderen Ansatz wird:

caller = self 

pids = for x <- 1..100_000 do 
    spawn fn -> 
    :random.seed(:erlang.monotonic_time) 
    :timer.sleep(:random.uniform(1000)) 
    send(caller, {self, x}) 
    end 
end 

results = Enum.reduce pids, %{}, fn(_, results) -> 
    receive do 
    {pid, x} -> Map.put(results, pid, x) 
    end 
end 

for pid <- pids do 
    IO.inspect(results[pid]) 
end 
+0

Dank für die eingehende Antwort. Ich bin mit einer anderen Version gegangen, die einen geraden Patten-Abgleich verwendet. Aber weil es so interessant war, werde ich deine Antwort als richtig markieren. –

0

@ patrick-oscity eine wirklich gründliche Antwort gegeben hat, und ich mag es sehr. Ich bin jedoch mit einer Version gegangen, die dies nur mit dem Mustervergleich mit dem Pin-Operator tun kann.

Für Kontext, ich schreibe einen Akku, der die Ergebnisse aller seiner Eingaben an einen Aktor weiterleitet, wenn alle Eingänge empfangen wurden.Ich möchte auch, dass die akkumulierte Ausgabe, die eine Liste ist, eine vordefinierte Reihenfolge der Prozesse einhält, die ursprünglich ihre Ausgaben gesendet haben.

Danach beginnt der Akku wieder auf Nachrichten zu warten.

Der Akkumulator sieht wie folgt aus:

def start_link(actuator, actuator_pid) do 
    Task.start_link(fn -> loop(actuator, actuator_pid, actuator.inputs, []) end) 
end 

defp loop(actuator, actuator_pid, [], acc) do 
    result = acc |> Enum.reverse 
    send actuator_pid, {:forward, self(), result} 
    loop(actuator, actuator_pid, actuator.inputs, []) 
end 

defp loop(actuator, actuator_pid, [input | remaining_inputs], acc) do 
    receive do 
    {:forward, ^input, output} -> 
     loop(actuator, actuator_pid, remaining_inputs, [output | acc]) 
    end 
end 

Um zu beweisen, es tut, was ich will, denn dies ist die Tests sind wie folgt:

test "When I recieve the outputs from all my inputs, then I return the accumulated result in the order of my actuator's inputs" do 

{_, pid0} = Task.start_link(fn -> test_loop end) 
{_, pid1} = Task.start_link(fn -> test_loop end) 
{_, pid2} = Task.start_link(fn -> test_loop end) 

actuator = %Cerebrum.Actuator{ 
    inputs: [pid0, pid1, pid2] 
} 

actuator_pid = self() 
{_, pid} = start_link(actuator, actuator_pid) 

send pid, {:forward, pid0, 0} 
refute_receive {:forward, pid, [0]} 

send pid, {:forward, pid1, 1} 
refute_receive {:forward, pid, [0, 1]} 

send pid, {:forward, pid2, 2} 
assert_receive {:forward, pid, [0, 1, 2]} 

send pid, {:forward, pid0, 0} 
send pid, {:forward, pid1, 1} 
send pid, {:forward, pid2, 2} 
assert_receive {:forward, pid, [0, 1, 2]} 

send pid, {:forward, pid0, 0} 
send pid, {:forward, pid2, 2} 
send pid, {:forward, pid1, 1} 
assert_receive {:forward, pid, [0, 1, 2]} 

send pid, {:forward, pid1, 1} 
send pid, {:forward, pid0, 0} 
send pid, {:forward, pid2, 2} 
assert_receive {:forward, pid, [0, 1, 2]} 

send pid, {:forward, pid1, 1} 
send pid, {:forward, pid2, 2} 
send pid, {:forward, pid0, 0} 
assert_receive {:forward, pid, [0, 1, 2]} 

send pid, {:forward, pid2, 2} 
send pid, {:forward, pid1, 1} 
send pid, {:forward, pid0, 0} 
assert_receive {:forward, pid, [0, 1, 2]} 

send pid, {:forward, pid2, 2} 
send pid, {:forward, pid1, 1} 
send pid, {:forward, pid0, 0} 
assert_receive {:forward, pid, [0, 1, 2]} 

end 

defp test_loop do 
    test_loop 
end 
+0

Ich denke, dass dies im Grunde auf meine erste Annäherung läuft, scheint aus meiner Sicht gut! –

Verwandte Themen