2008-09-17 23 views
13

Ich konnte keine anständige ThreadPool-Implementierung für Ruby finden, also schrieb ich meins (teilweise basierend auf Code von hier: http://snippets.dzone.com/posts/show/3276, aber geändert, um zu warten/Signal und andere Implementierung für ThreadPool Shutdown. Allerdings nach einiger Zeit laufen (mit 100 Threads und Handhabung etwa 1300 Aufgaben), es mit Deadlock auf Linie stirbt. 25 - es gibt einen neuen Job wartet Irgendwelche Ideen, warum kann es passieren,Deadlock in ThreadPool

require 'thread' 
begin 
    require 'fastthread' 
rescue LoadError 
    $stderr.puts "Using the ruby-core thread implementation" 
end 

class ThreadPool 
    class Worker 
    def initialize(callback) 
     @mutex = Mutex.new 
     @cv = ConditionVariable.new 
     @callback = callback 
     @mutex.synchronize {@running = true} 
     @thread = Thread.new do 
     while @mutex.synchronize {@running} 
      block = get_block 
      if block 
      block.call 
      reset_block 
      # Signal the ThreadPool that this worker is ready for another job 
      @callback.signal 
      else 
      # Wait for a new job 
      @mutex.synchronize {@cv.wait(@mutex)} # <=== Is this line 25? 
      end 
     end 
     end 
    end 

    def name 
     @thread.inspect 
    end 

    def get_block 
     @mutex.synchronize {@block} 
    end 

    def set_block(block) 
     @mutex.synchronize do 
     raise RuntimeError, "Thread already busy." if @block 
     @block = block 
     # Signal the thread in this class, that there's a job to be done 
     @cv.signal 
     end 
    end 

    def reset_block 
     @mutex.synchronize {@block = nil} 
    end 

    def busy? 
     @mutex.synchronize {[email protected]?} 
    end 

    def stop 
     @mutex.synchronize {@running = false} 
     # Signal the thread not to wait for a new job 
     @cv.signal 
     @thread.join 
    end 
    end 

    attr_accessor :max_size 

    def initialize(max_size = 10) 
    @max_size = max_size 
    @workers = [] 
    @mutex = Mutex.new 
    @cv = ConditionVariable.new 
    end 

    def size 
    @mutex.synchronize {@workers.size} 
    end 

    def busy? 
    @mutex.synchronize {@workers.any? {|w| w.busy?}} 
    end 

    def shutdown 
    @mutex.synchronize {@workers.each {|w| w.stop}} 
    end 
    alias :join :shutdown 

    def process(block=nil,&blk) 
    block = blk if block_given? 
    while true 
     @mutex.synchronize do 
     worker = get_worker 
     if worker 
      return worker.set_block(block) 
     else 
      # Wait for a free worker 
      @cv.wait(@mutex) 
     end 
     end 
    end 
    end 

    # Used by workers to report ready status 
    def signal 
    @cv.signal 
    end 

    private 
    def get_worker 
    free_worker || create_worker 
    end 

    def free_worker 
    @workers.each {|w| return w unless w.busy?}; nil 
    end 

    def create_worker 
    return nil if @workers.size >= @max_size 
    worker = Worker.new(self) 
    @workers << worker 
    worker 
    end 
end 

Antwort

10

Ok, so ist das Hauptproblem bei der Implementierung: Wie kann man sicherstellen, dass kein Signal verloren geht und Deadlocks vermieden werden?

Meiner Erfahrung nach ist das mit Zustandsvariablen und Mutex wirklich schwer zu erreichen, aber einfach mit Semaphoren. Es kommt vor, dass Ruby ein Objekt namens Queue (oder SizedQueue) implementiert, das das Problem lösen soll.Hier ist meine vorgeschlagene Umsetzung:

require 'thread' 
begin 
    require 'fasttread' 
rescue LoadError 
    $stderr.puts "Using the ruby-core thread implementation" 
end 

class ThreadPool 
    class Worker 
    def initialize(thread_queue) 
     @mutex = Mutex.new 
     @cv = ConditionVariable.new 
     @queue = thread_queue 
     @running = true 
     @thread = Thread.new do 
     @mutex.synchronize do 
      while @running 
      @cv.wait(@mutex) 
      block = get_block 
      if block 
       @mutex.unlock 
       block.call 
       @mutex.lock 
       reset_block 
      end 
      @queue << self 
      end 
     end 
     end 
    end 

    def name 
     @thread.inspect 
    end 

    def get_block 
     @block 
    end 

    def set_block(block) 
     @mutex.synchronize do 
     raise RuntimeError, "Thread already busy." if @block 
     @block = block 
     # Signal the thread in this class, that there's a job to be done 
     @cv.signal 
     end 
    end 

    def reset_block 
     @block = nil 
    end 

    def busy? 
     @mutex.synchronize { [email protected]? } 
    end 

    def stop 
     @mutex.synchronize do 
     @running = false 
     @cv.signal 
     end 
     @thread.join 
    end 
    end 

    attr_accessor :max_size 

    def initialize(max_size = 10) 
    @max_size = max_size 
    @queue = Queue.new 
    @workers = [] 
    end 

    def size 
    @workers.size 
    end 

    def busy? 
    @queue.size < @workers.size 
    end 

    def shutdown 
    @workers.each { |w| w.stop } 
    @workers = [] 
    end 

    alias :join :shutdown 

    def process(block=nil,&blk) 
    block = blk if block_given? 
    worker = get_worker 
    worker.set_block(block) 
    end 

    private 

    def get_worker 
    if [email protected]? or @workers.size == @max_size 
     return @queue.pop 
    else 
     worker = Worker.new(@queue) 
     @workers << worker 
     worker 
    end 
    end 

end 

Und hier ist ein einfacher Test Code:

tp = ThreadPool.new 500 
(1..1000).each { |i| tp.process { (2..10).inject(1) { |memo,val| sleep(0.1); memo*val }; print "Computation #{i} done. Nb of tasks: #{tp.size}\n" } } 
tp.shutdown 
+0

1. Sollte der Zugriff auf @workers nicht synchronisiert werden? 2. Warum muss im Worker-Thread immer noch gesperrt und entsperrt werden? – Roman

+0

Der Zugriff auf den Worker erfolgt immer vom selben Thread ... also wird keine Synchronisation benötigt. Was die Sperre im Worker-Thread betrifft, müssen Sie den Thread sicher aufwecken. – PierreBdR

+0

Es gibt immer noch ein Problem damit - es besteht die Chance für einen Deadlock - wenn sich der Arbeitsthread selbst zur Warteschlange hinzufügt, kann der ThreadPool ihn aus der Warteschlange nehmen und eine Aufgabe zuweisen. In diesem Fall wird ein Signal gesendet. Wenn der Worker-Thread jedoch nicht auf einen CV wartet, wird das Signal verloren gehen. – Roman

1

ich hier etwas voreingenommen,? aber ich würde vorschlagen, dies in einer Prozesssprache zu modellieren und es zu überprüfen.Frei verfügbare Werkzeuge sind zum Beispiel das MCRL2-Toolset (unter Verwendungeiner ACP-basierten Sprache), die Mobility Workbench (Pi-Kalkül) und Spin (PROMELA).

Andernfalls würde ich empfehlen, jedes Bit des Codes zu entfernen, das für das Problem nicht wesentlich ist, und einen minimalen Fall zu finden, in dem der Deadlock auftritt. Ich bezweifle, dass es die 100 Threads und 1300 Tasks essentiell sind, um einen Deadlock zu bekommen. Bei einem kleineren Gehäuse können Sie wahrscheinlich einige Debug-Drucke hinzufügen, die genügend Informationen liefern, um das Problem zu lösen.

+0

Der fragliche Code konnte nur 1300 Aufgaben von 180000 verarbeiten, konnte nicht mit einem kleineren Satz reproduzieren, leider ... – Roman

1

Ok, das Problem scheint in Ihrer ThreadPool # -Signal-Methode zu sein. Was kann passieren, ist:

1 - Alle Ihre Arbeiter sind damit beschäftigt, und Sie versuchen, einen neuen Job zu verarbeiten

2 - Linie bekommt 90 eine Null Arbeiter

3 - ein Arbeiter es bekommen befreit und Signale, aber das Signal ist verloren, da der ThreadPool nicht darauf wartet

4 - Sie fallen auf Leitung 95, warten, obwohl es einen freien Arbeiter gibt.

Der Fehler hier ist, dass Sie einen freien Arbeiter signalisieren können, selbst wenn niemand zuhört. Diese ThreadPool # -Signalmethode sollte lauten:

Und das Problem ist das gleiche in dem Worker-Objekt. Was kann passieren, ist:

1 - Der Arbeiter gerade eine Job

2 - Es wird überprüft (Linie 17), wenn es einen Job warten: es nicht

3 - Der Thread-Pool send ein neuer Job und Signale, die es ... aber das Signal

verloren

4 - der Arbeiter wartet auf ein Signal, auch wenn es

als besetzt markiert ist, sollten Sie Ihre Methode initialize wie gesagt:

def initialize(callback) 
    @mutex = Mutex.new 
    @cv = ConditionVariable.new 
    @callback = callback 
    @mutex.synchronize {@running = true} 
    @thread = Thread.new do 
    @mutex.synchronize do 
     while @running 
     block = get_block 
     if block 
      @mutex.unlock 
      block.call 
      @mutex.lock 
      reset_block 
      # Signal the ThreadPool that this worker is ready for another job 
      @callback.signal 
     else 
      # Wait for a new job 
      @cv.wait(@mutex) 
     end 
     end 
    end 
    end 
end 

Als nächstes sollten die Worker # get_block und Worker # reset_block Methoden nicht mehr synchronisiert werden. Auf diese Weise können Sie einem Worker zwischen dem Test für einen Block und dem Warten auf ein Signal keinen Block zuweisen.

+0

Ich denke, dass Sie Recht haben! Ich werde das sofort testen, danke! – Roman

+0

Hmm .. jetzt gibt es einen Deadlock, wenn ich darauf warte, dass Threads abgeschlossen werden (z. B. Aufruf von Join für den ThreadPool). Ich versuche herauszufinden, warum. – Roman

8

Sie können versuchen, das work_queue Juwel, entworfen Arbeit zwischen einem Hersteller zu koordinieren und zu einem Pool von Arbeitsthreads.