Ich konnte keine anständige ThreadPool-Implementierung für Ruby finden, also schrieb ich meins (teilweise basierend auf Code von hier: http://snippets.dzone.com/posts/show/3276, aber geändert, um zu warten/Signal und andere Implementierung für ThreadPool Shutdown. Allerdings nach einiger Zeit laufen (mit 100 Threads und Handhabung etwa 1300 Aufgaben), es mit Deadlock auf Linie stirbt. 25 - es gibt einen neuen Job wartet Irgendwelche Ideen, warum kann es passieren,Deadlock in ThreadPool
require 'thread'
begin
require 'fastthread'
rescue LoadError
$stderr.puts "Using the ruby-core thread implementation"
end
class ThreadPool
class Worker
def initialize(callback)
@mutex = Mutex.new
@cv = ConditionVariable.new
@callback = callback
@mutex.synchronize {@running = true}
@thread = Thread.new do
while @mutex.synchronize {@running}
block = get_block
if block
block.call
reset_block
# Signal the ThreadPool that this worker is ready for another job
@callback.signal
else
# Wait for a new job
@mutex.synchronize {@cv.wait(@mutex)} # <=== Is this line 25?
end
end
end
end
def name
@thread.inspect
end
def get_block
@mutex.synchronize {@block}
end
def set_block(block)
@mutex.synchronize do
raise RuntimeError, "Thread already busy." if @block
@block = block
# Signal the thread in this class, that there's a job to be done
@cv.signal
end
end
def reset_block
@mutex.synchronize {@block = nil}
end
def busy?
@mutex.synchronize {[email protected]?}
end
def stop
@mutex.synchronize {@running = false}
# Signal the thread not to wait for a new job
@cv.signal
@thread.join
end
end
attr_accessor :max_size
def initialize(max_size = 10)
@max_size = max_size
@workers = []
@mutex = Mutex.new
@cv = ConditionVariable.new
end
def size
@mutex.synchronize {@workers.size}
end
def busy?
@mutex.synchronize {@workers.any? {|w| w.busy?}}
end
def shutdown
@mutex.synchronize {@workers.each {|w| w.stop}}
end
alias :join :shutdown
def process(block=nil,&blk)
block = blk if block_given?
while true
@mutex.synchronize do
worker = get_worker
if worker
return worker.set_block(block)
else
# Wait for a free worker
@cv.wait(@mutex)
end
end
end
end
# Used by workers to report ready status
def signal
@cv.signal
end
private
def get_worker
free_worker || create_worker
end
def free_worker
@workers.each {|w| return w unless w.busy?}; nil
end
def create_worker
return nil if @workers.size >= @max_size
worker = Worker.new(self)
@workers << worker
worker
end
end
1. Sollte der Zugriff auf @workers nicht synchronisiert werden? 2. Warum muss im Worker-Thread immer noch gesperrt und entsperrt werden? – Roman
Der Zugriff auf den Worker erfolgt immer vom selben Thread ... also wird keine Synchronisation benötigt. Was die Sperre im Worker-Thread betrifft, müssen Sie den Thread sicher aufwecken. – PierreBdR
Es gibt immer noch ein Problem damit - es besteht die Chance für einen Deadlock - wenn sich der Arbeitsthread selbst zur Warteschlange hinzufügt, kann der ThreadPool ihn aus der Warteschlange nehmen und eine Aufgabe zuweisen. In diesem Fall wird ein Signal gesendet. Wenn der Worker-Thread jedoch nicht auf einen CV wartet, wird das Signal verloren gehen. – Roman