Recently at work I needed to synchronise a number of threads running in the Ruby-based automated test tool that we have written. To test for concurrency issues in our application, we wanted the threads running each of our test scenarios to gather and wait until they had all reached a specific point, and then carry on. This ensures that in every test we can execute business process transactions concurrently.
I borrowed the solution from Java, which provides a countdown latch. Essentially there is a count, and each thread that reaches the designated point decrements it; once the count reaches zero, all the waiting threads are released.
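To make the behaviour concrete, here is a minimal sketch that uses plain threads and no thread pool. SketchLatch is a hypothetical stand-in written for this example, and the event log exists only to show the ordering: no thread is released until every thread has arrived.

```ruby
require 'thread'

# Hypothetical stand-in for a countdown latch, for illustration only.
class SketchLatch
  def initialize(count)
    @count = count
    @mutex = Mutex.new
    @released = ConditionVariable.new
  end

  # Decrement the count, then block until it has reached zero.
  def countdown_and_await
    @mutex.synchronize do
      @count -= 1
      @released.wait(@mutex) while @count > 0
      @released.broadcast
    end
  end
end

latch = SketchLatch.new(3)
events = Queue.new # thread-safe FIFO used as an event log

threads = 3.times.map do |i|
  Thread.new do
    sleep(0.01 * i)            # stagger the arrivals
    events << [:arrived, i]
    latch.countdown_and_await  # blocks until all three have arrived
    events << [:released, i]
  end
end
threads.each { |t| t.join }

log = []
log << events.pop until events.empty?
p log.map { |kind, _| kind }
```

Every :arrived entry is logged before any :released entry, because the count can only reach zero after all three threads have called countdown_and_await.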
I could have gone for a cyclic barrier, but that was a bit more complicated than I needed. To understand how I implemented the countdown latch, let's first set up some threading: a simple thread pool which consists of worker threads. We can then add jobs to the thread pool and let it handle creating worker threads (or re-using ones that are free) to do the actual work.
Here is the thread pool including the countdown latch:
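require 'thread'
require File.expand_path(File.dirname(__FILE__) + '/worker_thread')

class CountDownLatch
  def initialize(count)
    @count = count
    @mutex = Mutex.new
    @conditional = ConditionVariable.new
  end

  # Decrements the count, then blocks until the count has reached zero.
  def countdown_and_await
    @mutex.synchronize do
      @count -= 1
      # Re-check the count after every wake-up before carrying on.
      while @count > 0
        p "waiting in latch"
        @conditional.wait(@mutex)
      end
      # Wake every thread that is still waiting on the latch.
      @conditional.broadcast
    end
  end
end
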
class ThreadPool
  attr_accessor :maximum_threads, :timeout
  attr_reader :worker_threads

  def initialize(settings={})
    settings = default_settings.merge(settings)
    @maximum_threads, @timeout = settings[:maximum_threads], settings[:timeout]
    @worker_threads = []
    @mutex = Mutex.new
  end

  # Hands a job, given as a block, to the next available worker.
  def add_job(description=nil, &block)
    assign_job(description, block)
  end

  # Hands a job, given as a ready-made Proc (e.g. one already bound to a
  # latch), to the next available worker.
  def add_synchronised_job(description, block)
    assign_job(description, block)
  end

  # Polls until no worker is busy; raises if that takes longer than the timeout.
  def wait_until_finished
    start_time = Time.now
    while busy?
      raise_ruckus if (Time.now - start_time) > @timeout
      allow_time_to_process
    end
  end

  def thread_pool_busy
    busy?
  end

  private

  # Blocks until a worker is free (or a new one can be created), then gives
  # it the job.
  def assign_job(description, block)
    while true
      @mutex.synchronize do
        worker_thread = worker_thread_available?
        return worker_thread.set_block(block, description) if worker_thread
      end
      allow_time_to_process
    end
  end

  def default_settings
    {
      :maximum_threads => 10,
      :timeout => 300
    }
  end

  def allow_time_to_process
    sleep 0.01
  end

  # Returns an idle worker, a newly created worker, or nil if the pool is full.
  def worker_thread_available?
    free_worker_thread? || create_new_worker_thread
  end

  def free_worker_thread?
    @worker_threads.find { |worker_thread| !worker_thread.busy? }
  end

  def clean_up_idle_threads
    @worker_threads.each { |worker_thread| worker_thread.exit }
  end

  def create_new_worker_thread
    return nil if @worker_threads.size >= @maximum_threads
    worker_thread = WorkerThread.new
    @worker_threads << worker_thread
    worker_thread
  end

  def busy?
    @mutex.synchronize do
      @worker_threads.any? { |worker_thread| worker_thread.alive? && worker_thread.busy? }
    end
  end

  def raise_ruckus
    puts "**** blocking threads: "
    @worker_threads.each do |t|
      puts " " + (t.description || "(WorkerThread #{t.__id__})") if t.alive? && t.busy?
    end
    raise "PANIC: thread pool wouldn't exit after #{@timeout} seconds!"
  end
end
This is the worker thread that is used in the thread pool:
class WorkerThread
  attr_accessor :description

  def initialize
    @mutex = Mutex.new
    # Poll for a job every 50ms, run it, then mark this worker as free again.
    @thread = Thread.new {
      while true
        sleep 0.05
        block = get_block
        if block
          block.call
          reset_block
        end
      end
    }
  end

  def get_block
    @mutex.synchronize { @block }
  end

  # Assigns a job to this worker; raises if it already has one.
  def set_block(block, description)
    @mutex.synchronize {
      raise WorkerThreadBusy, "Thread already busy." if @block
      @block, @description = block, description
    }
  end

  def reset_block
    @mutex.synchronize { @block = @description = nil }
  end

  def busy?
    @mutex.synchronize { !@block.nil? }
  end

  def exit
    @mutex.synchronize { @thread.exit }
  end

  def alive?
    @mutex.synchronize { @thread.alive? }
  end
end

# Inherits from StandardError so that a plain `rescue` can catch it.
class WorkerThreadBusy < StandardError
end
Now that we have a basic threading model, let's see how we can use the latch:
require File.expand_path(File.dirname(__FILE__) + '/thread_pool')

thread_pool = ThreadPool.new(:maximum_threads => 2)

def login(item)
  puts "Login #{item}"
end

# Binds a specific latch and data item into the job before it is queued.
def bind_latch(latch, data)
  Proc.new {
    latch.countdown_and_await
    login(data)
  }
end

login_latch = nil
login_count = 2

(0...100).each_with_index do |item, index|
  data = "data_#{item}"
  # Start a fresh latch for every group of login_count jobs.
  login_latch = CountDownLatch.new(login_count) if index % login_count == 0
  thread_pool.add_synchronised_job("Login #{data}", bind_latch(login_latch, data))
end

thread_pool.wait_until_finished
The loop runs 100 times (0 to 99), and on each iteration we add a job to the thread pool that executes a login function. But we don't want each thread to go off and log in on its own; instead we want them to synchronise in pairs and then log in. So we create a new countdown latch for every two jobs, which ensures that two threads always run the login function concurrently.
If we had more points at which we wanted to synchronise, we would just create more latches and place one before each function we want to execute. The login_count above is important to get right, as it's the count that the latch counts down from. The pool also needs at least as many worker threads as the latch count (two in the scenario above): the first thread to arrive waits on the latch's condition variable, and another thread must reach the same point to decrement the counter to zero and signal the waiting threads to be released.
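As a rough illustration of chaining two synchronisation points, the sketch below uses plain threads rather than the thread pool. SketchLatch is a hypothetical stand-in for the latch, and the second step, "search", is a name invented for the example; neither comes from the test tool itself.

```ruby
require 'thread'

# Hypothetical stand-in for a countdown latch, for illustration only.
class SketchLatch
  def initialize(count)
    @count = count
    @mutex = Mutex.new
    @released = ConditionVariable.new
  end

  def countdown_and_await
    @mutex.synchronize do
      @count -= 1
      @released.wait(@mutex) while @count > 0
      @released.broadcast
    end
  end
end

# One latch in front of each step we want the threads to enter together.
def bind_steps(login_latch, search_latch, log, name)
  Proc.new do
    login_latch.countdown_and_await
    log << "login #{name}"
    search_latch.countdown_and_await
    log << "search #{name}" # hypothetical second step
  end
end

login_latch = SketchLatch.new(2)
search_latch = SketchLatch.new(2)
log = Queue.new

threads = %w[a b].map do |name|
  Thread.new(&bind_steps(login_latch, search_latch, log, name))
end
threads.each { |t| t.join }

steps = []
steps << log.pop until log.empty?
p steps
```

Both logins are logged before either search, because neither thread can pass the second latch until both have finished the first step.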
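Also, because of the way the thread pool is implemented, we need to bind the latch and the function to be executed before we add the job to the thread pool; otherwise the first thread will wait forever in the latch. A worker picks up its job some time after the job is added, and login_latch is declared outside the loop, so a block that referred to login_latch directly could end up using a latch assigned on a later iteration.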
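The difference between binding early and binding late can be shown without any threads. This is a small sketch of Ruby's closure behaviour in general, not of the thread pool; bind_value is a hypothetical helper that plays the role of bind_latch, and the strings stand in for latch objects.

```ruby
# Hypothetical helper: the parameter is a fresh local variable, so the
# returned Proc keeps the value that was passed in at this moment.
def bind_value(value)
  Proc.new { value }
end

current = nil # declared outside the loop, like login_latch
late_bound = []
early_bound = []

%w[latch_a latch_b].each do |name|
  current = name
  late_bound << Proc.new { current }  # captures the shared variable
  early_bound << bind_value(current)  # captures this iteration's value
end

# The jobs run only after the loop has finished, as they might in a pool.
late_results = late_bound.map { |job| job.call }
early_results = early_bound.map { |job| job.call }

p late_results  # => ["latch_b", "latch_b"]
p early_results # => ["latch_a", "latch_b"]
```

The late-bound blocks all see whatever the shared variable holds when they finally run, whereas each early-bound Proc keeps the value it was given when it was created.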