2015-06-15 3 views
5

Ich habe etwas harte Zeit, die Idee hinter Fibers \ coroutines und die Implementierung in Crystal zu lernen.Crystal konvertiert die Idee hinter Thread Pool zu Fibers/spawn

Ich hoffe, dass dies der richtige Ort, um dies zu fragen, ich werde total eine „nicht hier“ Antwort akzeptieren :)

Dies ist meine übliche Art und Weise in Ruby Multi-Threading Handhabung:

threads = [] 
max_threads = 10 

loop do 
    begin 
    threads << Thread.new do 
     helper_method(1,2,3,4) 
    end 
    rescue Exception => e 
    puts "Error Starting thread" 
    end 

    begin 
    threads = threads.select { |t| t.alive? ? true : (t.join; false) } 
    while threads.size >= max_threads 
     puts 'Got Maximum threads' 
     sleep 1 
     threads = threads.select { |t| t.alive? ? true : (t.join; false) } 
    end 
    rescue Exception => e 
    puts e 
    end 
end 

Auf diese Weise öffne ich einen neuen Thread, in der Regel von einer eingehenden Verbindung oder eine andere Sache, den Thread zu einem Thread-Array hinzufügen, und dann überprüfen, dass ich nicht mehr Threads habe, was ich wollte.

Was wäre eine gute Möglichkeit, etwas Ähnliches in Crystal mit spawn \ channels \ fibers usw. zu implementieren?

Antwort

12

Etwas wie folgt aus:

require "socket" 

ch = Channel(TCPSocket).new 

10.times do 
    spawn do 
    loop do 
     socket = ch.receive 
     socket.puts "Hi!" 
     socket.close 
    end 
    end 
end 

server = TCPServer.new(1234) 
loop do 
    socket = server.accept 
    ch.send socket 
end 

Dieser Code wird Laich vor 10 Fasern, um die Fragen und Wünsche an. Der Kanal ist ungepuffert, so dass die Verbindungen nicht in Warteschlangen stehen, wenn sie von keiner Faser bedient werden können.

+1

Genau das, was ich gesucht habe, danke! – Ba7a7chy

5

Sie können die Funktionsweise von Threads nicht replizieren. spawn gibt kein Coroutine-Objekt zurück, und es gibt keinen Weg zu join Korutinen.

Dennoch können wir einen Kanal für die Kommunikation zwischen den Coroutines und dem Pool-Manager öffnen. Dieser Manager kann innerhalb seiner eigenen Coroutine oder als Haupt-Coroutine ausgeführt werden, wodurch verhindert wird, dass der Prozess beendet wird. Hier

ist ein funktionierendes Beispiel mit einer worker(&block) Methode, die eine Koroutine spawnen, und öffnen Sie einen Kanal seinen Status zurückzukehren (es fehlgeschlagen ist oder es beendet), und ein pool(&block) Verfahren, die einen Pool dieser Arbeitnehmer halten und lesen Von den Ergebniskanälen können Sie den Status der Coroutines kennenlernen und neue generieren.

def worker(&block) 
    result = UnbufferedChannel(Exception?).new 

    ::spawn do 
    begin 
     block.call 
    rescue ex 
     result.send(ex) 
    else 
     result.send(nil) 
    end 
    end 

    result 
end 

def pool(size, &block) 
    counter = 0 
    results = [] of UnbufferedChannel(Exception?) 

    loop do 
    while counter < size 
     counter += 1 
     puts "spawning worker" 
     results << worker(&block) 
    end 

    result = Channel.select(results) 
    counter -= 1 
    results.delete(result) 

    if ex = result.receive 
     puts "ERROR: #{ex.message}" 
    else 
     puts "worker terminated" 
    end 
    end 
end 

pool(5) do 
    loop { helper_method(1, 2, 3, 4) } 
end 
Verwandte Themen