2015-06-26 11 views
9

Ich habe derzeit ein Problem, die Bearbeitung von Listen in Elixier parallel. Der Grund für die Parallelität ist, dass ich die Ergebnisse in einer API speichere und wenn ich sie alle auf einmal in die Luft sprenge, wird es DDOS'd und wird heruntergefahren.Elixir lang Bearbeitungslisten parallel

Der folgende Code soll das Ergebnis einer SQL Abfrage aufteilen und jede Zeile in einer separaten Aufgabe verarbeiten und wenn alle Aufgaben abgeschlossen sind, sollte es beendet werden.

Was passiert ist, dass die erste Aufgabe nach dem Absetzen einer Nachricht das Skript beendet. Ich habe Antworten gesehen, wo sie den Empfang in eine Funktion setzen und diese Funktion ruft sich immer wieder auf, aber ich habe das Gefühl, dass es noch einen besseren Weg geben muss, damit umzugehen.

results = Enum.chunk(results, 500) 

# Give this process a name 
Process.register(self(), :core) 

# Loop over the chunks making a process for each 
Enum.each results, fn(result) -> 
    task = Task.async(fn -> Person.App.process(result, "Test", "1") end) 
end 

# And listen for messages 
receive do 
    {:hello, msg} -> IO.inspect msg 
    {:world, _} -> "won't match" 
end 

Antwort

12

Wenn Task.async mit es am zweckmäßigsten, das Ergebnis mit Task.await zu erhalten:

results 
|> Enum.map(fn result -> Task.async(fn -> Person.App.process(result, "Test", "1") end) end) 
|> Enum.map(&Task.await/1) 
|> Enum.each(&IO.inspect/1) 

In der Tat, wenn Sie für das Ergebnis der async nicht await tun es immer noch an den Prozess gesendet werden, dass genannt async und in seiner Mailbox gespeichert, wodurch möglicherweise ein Speicherleck! Wenn Sie eine Task erstellen wollen, in der Sie sich nicht um das Ergebnis kümmern, verwenden Sie stattdessen Task.start oder Task.start_link.

+1

Wenn ich Task.async in Task.start_link ändere, wird der Hauptprozess beendet, bevor die Zeilen die Verarbeitung beenden. Wie behalte ich das Skript am Leben, bis alle Aufgaben beendet sind? –

+0

Ich denke, Sie sollten die Methode in der Antwort verwenden - verwenden Sie 'Task.async' und dann' Task.await' für alle Aufgaben (das ist, was 'Enum.map (& Task.Await/1)' tut). Das Ergebnis dieser Zeile werden alle Ergebnisse der Aufgaben in einer Liste gesammelt - Sie können sie dann ausdrucken ("Enum.each (& IO.inspect/1)" in der Antwort) oder etwas anderes mit ihnen tun. –

+0

Ok cool das hat für mich funktioniert! Ich musste die Aufgabe ändern, um ein Timeout zu nehmen, da manchmal die Aufgaben 10-20 Sekunden dauern Enum.map (& Task.await (& 1, 20000)) –