October 19

User defined delimiters in Ruby

I came across a few alternative ways of writing everyday Ruby literals, some of which let you choose your own delimiters:

Different ways to enclose a multi-line string

data = <<-EOF
 Some multiline
 text here
EOF

data = " 
 Some multiline
 text here
" 

%q{
 Some multiline
 text here
}

# With %q you can use almost any non-alphanumeric character as the delimiter
%q| some text |
%q+ some text +

There are a few variations on the %q theme:

  • %q is equivalent to a string with single quotes
  • %Q is equivalent to a string with double quotes
  • %x is equivalent to using the backticks
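
A small sketch of the difference between the three. The variable `name` is made up for illustration, and the `%x` line assumes a shell with an `echo` command is available:

```ruby
name = "Ruby"

single = %q{Hello #{name}}   # like 'Hello #{name}': no interpolation
double = %Q{Hello #{name}}   # like "Hello #{name}": interpolation happens
output = %x{echo hello}      # like `echo hello`: runs the command, returns its output

puts single
puts double
puts output
```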

Another similar literal is:

  • %w{apple computers are sexy}, which produces an array of strings from the words provided, e.g. ["apple", "computers", "are", "sexy"]. You can choose your own delimiters in the same way as with %q above.
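
A short sketch of `%w` with different delimiters. The `%W` form is not mentioned above; it is added here as the interpolating counterpart, in the same way that `%Q` relates to `%q`. The variable names are illustrative only:

```ruby
fruits = %w{apple banana cherry}   # braces as delimiters
same   = %w|apple banana cherry|   # pipes work just as well

count = 3
mixed = %W(#{count} items)         # %W interpolates each word like a double-quoted string

puts fruits.inspect
puts same.inspect
puts mixed.inspect
```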

Posted by kingsleyh | Filed in Ruby |

October 19

Creating a countdown latch in Ruby

Recently at work I needed to synchronise a number of threads running in the Ruby-based automated test tool that we have written. To test for concurrency issues in our application, we wanted the threads running each of our test scenarios to collect and wait until they had all reached a specific point, and then carry on. This ensures that in every test we can execute business process transactions concurrently.

I borrowed the solution from Java, which implements a countdown latch. Essentially there is a count, and each thread that reaches the designated point decrements it; when the count reaches zero, all the waiting threads are released.
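
For orientation, here is a minimal sketch of the idea on its own, outside any thread pool. Java's CountDownLatch keeps counting down and waiting as two separate operations, and this sketch mirrors that split. The class name `SimpleLatch` and its method names are illustrative assumptions, not the implementation used in the rest of this post:

```ruby
require 'thread'  # needed on Ruby 1.8; harmless on later versions

class SimpleLatch
  def initialize(count)
    @count = count
    @mutex = Mutex.new
    @released = ConditionVariable.new
  end

  # Decrement the count and wake every waiter once it reaches zero.
  def count_down
    @mutex.synchronize do
      @count -= 1 if @count > 0
      @released.broadcast if @count.zero?
    end
  end

  # Block the calling thread until the count has reached zero.
  def await
    @mutex.synchronize do
      @released.wait(@mutex) while @count > 0
    end
  end
end

latch = SimpleLatch.new(3)
arrived = []
guard = Mutex.new

workers = 3.times.map do |i|
  Thread.new do
    guard.synchronize { arrived << i }
    latch.count_down
    latch.await
    guard.synchronize { arrived.size }  # number of arrivals seen after release
  end
end

seen = workers.map(&:value)
puts seen.inspect
```

Every thread records its arrival before counting down, so once a thread gets past `await` it should see all three arrivals.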

I could have gone for a cyclic barrier, but that was a bit more complicated than I needed. To understand how I implemented the countdown latch, let's set up some threading: a simple thread pool consisting of worker threads. We can then add jobs to the thread pool and let it handle creating worker threads (or reusing ones that are free) to do the actual work.

Here is the thread pool including the countdown latch:

require 'thread'
require File.expand_path(File.dirname(__FILE__) + '/worker_thread')

class CountDownLatch

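  def initialize(count)
    @count = count
    @mutex = Mutex.new
    @conditional = ConditionVariable.new
  end

  # Decrement the count, then block until every expected thread has arrived.
  def countdown_and_await
    @mutex.synchronize do
      @count -= 1
      # The loop re-checks the count after every wakeup.
      while @count > 0
        p "waiting in latch"
        @conditional.wait(@mutex)
      end
      # The last thread to arrive wakes the waiting threads.
      @conditional.broadcast
    end
  end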

end

class ThreadPool

  attr_accessor :maximum_threads, :timeout
  attr_reader :worker_threads

  def initialize(settings={})
    settings = default_settings.merge(settings)
    @maximum_threads, @timeout = settings[:maximum_threads], settings[:timeout]
    @worker_threads = []
    @mutex = Mutex.new
  end

  def add_job(description=nil, &block)
   assign_job(description,block)
  end

  def add_synchronised_job(description,block)
     assign_job(description,block)
  end

  def wait_until_finished
    start_time = Time.now
    while busy?
      raise_ruckus if (Time.now - start_time) > @timeout
      allow_time_to_process
    end
  end

  def thread_pool_busy
    busy?
  end

  private

  def assign_job(description, block)
    loop do
      @mutex.synchronize do
        worker_thread = worker_thread_available?
        return worker_thread.set_block(block, description) if worker_thread
      end
      allow_time_to_process
    end
  end

  def default_settings
    {
      :maximum_threads => 10,
      :timeout => 300
    }
  end

  def allow_time_to_process
    sleep 0.01
  end

  def worker_thread_available?
    free_worker_thread? || create_new_worker_thread
  end

  def free_worker_thread?
    @worker_threads.find { |worker_thread| !worker_thread.busy? }
  end

  def clean_up_idle_threads
    @worker_threads.each{|worker_thread| worker_thread.exit}
  end

  def create_new_worker_thread
    return nil if @worker_threads.size >= @maximum_threads
    worker_thread = WorkerThread.new
    @worker_threads << worker_thread
    worker_thread
  end

  def busy?
    @mutex.synchronize {@worker_threads.any? {|worker_thread| worker_thread.alive? and worker_thread.busy?}}
  end

  def raise_ruckus
    puts "**** blocking threads: " 
    @worker_threads.each do |t|
      puts "    " + (t.description || "(WorkerThread #{t.__id__})") if t.alive? && t.busy?
    end
    raise "PANIC: thread pool wouldn't exit after #{@timeout} seconds!" 
  end
end

This is the worker thread that is used in the thread pool:

class WorkerThread

  attr_accessor :description

  def initialize
    @mutex = Mutex.new
    @thread = Thread.new {
      while true
        sleep 0.05
        block = get_block
        if block
          block.call
          reset_block
        end
      end
    }
  end

  def get_block
    @mutex.synchronize{@block}
  end

  def set_block(block, description)
    @mutex.synchronize{
      raise WorkerThreadBusy, "Thread already busy." if @block
      @block, @description = block, description
    }
  end

  def reset_block
    @mutex.synchronize {@block = @description = nil}
  end

  def busy?
    @mutex.synchronize {!@block.nil?}
  end

  def exit
    @mutex.synchronize {@thread.exit}
  end

  def alive?
    @mutex.synchronize {@thread.alive?}
  end

end

class WorkerThreadBusy < StandardError
end

Now that we have a basic threading model, let's see how we can use the latch:

require 'thread_pool'

thread_pool = ThreadPool.new(:maximum_threads => 2)

def login(item)
  puts "Login #{item}" 
end

def bind_latch(latch, data)
  Proc.new {
    latch.countdown_and_await
    login(data)
  }
end

login_latch = nil
login_count = 2

(0...100).each_with_index do |item, index|
  data = "data_#{item}"

  # Start a fresh latch for every group of login_count jobs.
  login_latch = CountDownLatch.new(login_count) if index % login_count == 0
  thread_pool.add_synchronised_job("Login #{data}", bind_latch(login_latch, data))
end

thread_pool.wait_until_finished 

We loop over 0 to 99 (100 iterations), and each time around the loop we add a job to the thread pool that executes a login function. But we don't want every thread to go off and log in on its own; instead we want them to synchronise in pairs and then log in. So we create a new countdown latch for every 2 jobs, which ensures that 2 threads always run the login function concurrently.

If we had more points to synchronise at, we would just create more latches and drop them in before the functions we want to execute. The login_count above is important to get right, as it's the count that the latch counts down from. You always need at least 2 threads for the above scenario to work (more generally, at least as many worker threads as login_count), since one thread will wait inside the latch until another thread reaches the same point, decrements the counter to zero and signals the waiting threads to be released.
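
For comparison, the cyclic barrier mentioned earlier can be reused across several synchronisation points instead of creating a new latch each time. The following is only a rough sketch of that alternative, not something the test tool uses; the class name `SimpleBarrier` and the generation counter are assumptions made for illustration:

```ruby
require 'thread'  # needed on Ruby 1.8; harmless on later versions

class SimpleBarrier
  def initialize(parties)
    @parties = parties
    @waiting = 0
    @generation = 0
    @mutex = Mutex.new
    @all_arrived = ConditionVariable.new
  end

  # Block until @parties threads have called await, then reset for reuse.
  def await
    @mutex.synchronize do
      generation = @generation
      @waiting += 1
      if @waiting == @parties
        # Last thread to arrive: start a new generation and release the rest.
        @waiting = 0
        @generation += 1
        @all_arrived.broadcast
      else
        @all_arrived.wait(@mutex) while generation == @generation
      end
    end
  end
end

barrier = SimpleBarrier.new(2)
log = []
guard = Mutex.new

threads = 2.times.map do
  Thread.new do
    3.times do |round|
      guard.synchronize { log << round }
      barrier.await
    end
  end
end

threads.each(&:join)
puts log.inspect
```

Neither thread can start a round until both have finished logging the previous one, so the logged round numbers come out in non-decreasing order.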

Also, because of the way the thread pool is implemented, we need to bind the latch and the function to be executed before we add the job to the thread pool; otherwise the first thread will wait forever in the latch.
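
One plausible reading of why the binding step matters: a Ruby block captures variables rather than their values, so a block that referred to the loop's latch variable directly would see whatever latch is assigned by the time a worker finally runs it. The sketch below shows only that language behaviour, with made-up names (`bind`, `current`) standing in for the latch handling:

```ruby
# Returns a Proc that closes over the method's own local variable `value`.
def bind(value)
  Proc.new { value }
end

current = "latch A"
late  = Proc.new { current }  # captures the variable `current`, not its value
early = bind(current)         # captures what `current` held at this moment

current = "latch B"           # stands in for the loop moving on to the next latch

puts late.call
puts early.call
```

Here `late` follows the reassignment while `early` keeps the original value, which is the effect `bind_latch` relies on.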

Posted by kingsleyh | Filed in Ruby |

October 14

MySQL on Snow Leopard

I recently moved to Snow Leopard on my Mac Pro and went through a bit of pain getting Rails and MySQL to work. It turns out that Ruby runs in 64-bit mode by default on Snow Leopard, and all my previously installed gems had been compiled against a 32-bit Ruby. So I had to reinstall a number of gems. Luckily I came across this handy Ruby script to show me what needed doing:

#!/usr/bin/env ruby 
puts "looking for the gems to upgrade..." 
gem_info = Struct.new(:name, :version)
to_reinstall = []
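Dir.glob('/Library/Ruby/Gems/**/*.bundle').each do |path|
  # Pull the gem name and version out of the path; skip anything that does not match.
  next unless path =~ /.*1\.8\/gems\/(.*)-(.*?)\/.*/
  name, version = $1, $2
  # Ask the file command which architectures the compiled bundle contains.
  bundle_info = `file "#{path}"`
  to_reinstall << gem_info.new(name, version) unless bundle_info =~ /bundle x86_64/
end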

gemnames = to_reinstall.map{|ginfo| ginfo.name}.uniq.delete_if{|name| name =~ /mysql|passenger/}
puts "***" 
puts "Please reinstall:" 
gemnames.each do |name|
  gems = to_reinstall.select{|ginfo| ginfo.name == name}
  puts "#{name} versions: #{gems.map{|ginfo| ginfo.version}.join(', ')}" 
end 

puts "or uninstall all gems that need to be reinstalled:\n" 
puts "$ sudo gem uninstall #{gemnames.join(' ')}" 
puts " " 
puts "and reinstall them:\n" 
puts "$ sudo gem install #{gemnames.join(' ')}" 
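
To confirm which mode the interpreter itself is running in, a quick check along these lines may help. It is a sketch rather than part of the script above, and the variable names are illustrative; it reads the build architecture from RbConfig and measures the size of a native pointer:

```ruby
require 'rbconfig'

arch = RbConfig::CONFIG['arch']          # architecture string Ruby was built for
pointer_bits = ['x'].pack('p').size * 8  # size of a native pointer, in bits

puts "Ruby #{RUBY_VERSION} built for #{arch} (#{pointer_bits}-bit pointers)"
```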

But before I even got to running that script, I was trying to run rake db:create:all and kept getting this annoying error:

Couldn't create database for {"username"=>"my_username", "adapter"=>"mysql", "database"=>"my_database", "password"=>"my_password"}, charset: utf8, collation: utf8_general_ci (if you set the charset manually, make sure you have a matching collation)

A bit more digging showed me that, to avoid weird errors with MySQL, I should move to the 64-bit version of MySQL. So I uninstalled the 32-bit version by doing this:

sudo rm /usr/local/mysql
sudo rm -rf /usr/local/mysql*
sudo rm -rf /Library/StartupItems/MySQLCOM
sudo rm -rf /Library/PreferencePanes/My*
# edit /etc/hostconfig and remove the line MYSQLCOM=-YES-
rm -rf ~/Library/PreferencePanes/My*
sudo rm -rf /Library/Receipts/mysql*
sudo rm -rf /Library/Receipts/MySQL*

Then I installed the 64-bit version of MySQL, uninstalled the mysql gem and installed it again:

sudo env ARCHFLAGS="-arch x86_64" gem install mysql -- --with-mysql-config=/usr/local/mysql/bin/mysql_config

Posted by kingsleyh | Filed in |