Development Tip

Pure-Ruby 동시 해시

yourdevel 2020. 12. 5. 10:47
반응형

Pure-Ruby 동시 해시


여러 스레드에서 수정할 수 있지만 잠금 수가 가장 적은 해시를 구현하는 가장 좋은 방법은 무엇입니까? 이 질문의 목적을 위해 해시가 읽기가 많을 것이라고 가정 할 수 있습니다. JRuby와 같이 진정한 동시 방식으로 작동하는 구현을 포함하여 모든 Ruby 구현에서 스레드로부터 안전해야하며, 순수 Ruby로 작성되어야합니다 (C 또는 Java 허용 안 됨).

항상 잠기는 순진한 솔루션을 제출해도되지만 이것이 최선의 솔루션은 아닐 것입니다. 우아함이 중요하지만 잠금 가능성이 적을수록 코드가 작아집니다.


이제 'threadsafe'의 실제 의미를 지정 했으므로 다음과 같은 두 가지 잠재적 구현이 있습니다. 다음 코드는 MRI 및 JRuby에서 영원히 실행됩니다. 잠금없는 구현은 마스터가 유동적 일 경우 각 스레드가 자체 해시 뷰를 사용하는 최종 일관성 모델을 따릅니다. 스레드에 모든 정보를 저장하는 것이 메모리 누수를 방지하는 데 약간의 트릭이 필요하지만, 처리되고 테스트됩니다.이 코드를 실행하면 프로세스 크기가 커지지 않습니다. 두 구현 모두 '완전'하려면 더 많은 작업이 필요합니다. 즉, 삭제, 업데이트 등은 약간의 생각이 필요하지만 아래 두 개념 중 하나가 요구 사항을 충족합니다.

이 글을 읽는 사람들은 전체 문제가 JRuby에만 국한된다는 것을 깨닫는 것이 매우 중요합니다. MRI에서는 내장 된 해시로 충분합니다.

module Cash
  def Cash.new(*args, &block)
    env = ENV['CASH_IMPL']
    impl = env ? Cash.const_get(env) : LocklessImpl
    klass = defined?(JRUBY_VERSION) ? impl : ::Hash
    klass.new(*args)
  end

  class LocklessImpl
    def initialize
      @hash = {}
    end

    def thread_hash
      thread = Thread.current
      thread[:cash] ||= {}
      hash = thread[:cash][thread_key]
      if hash
        hash
      else
        hash = thread[:cash][thread_key] = {}
        ObjectSpace.define_finalizer(self){ thread[:cash].delete(thread_key) }
        hash
      end
    end

    def thread_key
      [Thread.current.object_id, object_id]
    end

    def []=(key, val)
      time = Time.now.to_f
      tuple = [time, val]
      @hash[key] = tuple
      thread_hash[key] = tuple
      val
    end

    def [](key)
    # check the master value
    #
      val = @hash[key]

    # someone else is either writing the key or it has never been set.  we
    # need to invalidate our own copy in either case
    #
      if val.nil?
        thread_val = thread_hash.delete(key)
        return(thread_val ? thread_val.last : nil)
      end

    # check our own thread local value
    #
      thread_val = thread_hash[key]

    # in this case someone else has written a value that we have never seen so
    # simply return it
    #
      if thread_val.nil?
        return(val.last)
      end

    # in this case there is a master *and* a thread local value, if the master
    # is newer juke our own cached copy
    #
      if val.first > thread_val.first
        thread_hash.delete(key)
        return val.last
      else
        return thread_val.last
      end
    end
  end

  class LockingImpl < ::Hash
    require 'sync'

    def initialize(*args, &block)
      super
    ensure
      extend Sync_m
    end

    def sync(*args, &block)
      sync_synchronize(*args, &block)
    end

    def [](key)
      sync(:SH){ super }
    end

    def []=(key, val)
      sync(:EX){ super }
    end
  end
end



if $0 == __FILE__
  iteration = 0

  loop do
    n = 42
    hash = Cash.new

    threads =
      Array.new(10) {
        Thread.new do
          Thread.current.abort_on_exception = true
          n.times do |key|
            hash[key] = key
            raise "#{ key }=nil" if hash[key].nil?
          end
        end
      }

    threads.map{|thread| thread.join}

    puts "THREADSAFE: #{ iteration += 1 }"
  end
end

내 Stack Overflow 신용을 높이기 위해 기본 / 순진한 솔루션 게시 :

require 'thread'

class ConcurrentHash < Hash
  def initialize
    super
    @mutex = Mutex.new
  end

  def [](*args)
    @mutex.synchronize { super }
  end

  def []=(*args)
    @mutex.synchronize { super }
  end
end

Yehuda, ivar 설정이 원자 적이라고 언급 한 것 같아요? 그러면 간단한 복사 및 교체는 어떻습니까?

require 'thread'

class ConcurrentHash
  def initialize
    @reader, @writer = {}, {}
    @lock = Mutex.new
  end

  def [](key)
    @reader[key]
  end

  def []=(key, value)
    @lock.synchronize {
      @writer[key] = value
      @reader, @writer = @writer, @reader
      @writer[key] = value
    }
  end
end

이것은 동시 판독기를 허용하지만 다른 모든 유형의 액세스 (반복 읽기 포함)에 대해 사물을 잠그는 Hash 주변의 래퍼 클래스입니다.

class LockedHash
  def initialize
    @hash = Hash.new
    @lock = ThreadAwareLock.new()
    @reader_count = 0
  end

  def [](key)
    @lock.lock_read
    ret = @hash[key]
    @lock.unlock_read
    ret
  end

  def []=(key, value)
    @lock.lock_write
    @hash[key] = value
    @lock.unlock_write
  end

  def method_missing(method_sym, *arguments, &block)
    if @hash.respond_to? method_sym
      @lock.lock_block
      val = lambda{@hash.send(method_sym,*arguments, &block)}.call
      @lock.unlock_block
      return val
    end
    super
  end
end

사용하는 잠금 코드는 다음과 같습니다.

class RWLock
  def initialize
    @outer = Mutex.new
    @inner = Mutex.new
    @reader_count = 0
  end
  def lock_read
    @outer.synchronize{@inner.synchronize{@reader_count += 1}}
  end
  def unlock_read
    @inner.synchronize{@reader_count -= 1}
  end
  def lock_write
    @outer.lock
    while @reader_count > 0 ;end
  end
  def unlock_write
    @outer.unlock
  end
end

class ThreadAwareLock < RWLock
  def initialize
    @owner = nil
    super
  end
  def lock_block
    lock_write
    @owner = Thread.current.object_id
  end
  def unlock_block
    @owner = nil
    unlock_write
  end
  def lock_read
    super unless my_block?
  end
  def unlock_read
    super unless my_block?
  end
  def lock_write
    super unless my_block?
  end
  def unlock_write
    super unless my_block?
  end
  def my_block?
    @owner == Thread.current.object_id
  end
end

스레드 인식 잠금은 클래스를 한 번 잠근 다음 일반적으로 잠글 수있는 메서드를 호출하고 잠그지 않도록하는 것입니다. 일부 메서드 내부의 블록으로 양보하고 해당 블록이 개체에 대한 잠금 메서드를 호출 할 수 있고 교착 상태 또는 이중 잠금 오류를 원하지 않기 때문에이 기능이 필요합니다. 대신 계수 잠금을 사용할 수 있습니다.

다음은 버킷 수준 읽기-쓰기 잠금을 구현하려는 시도입니다.

class SafeBucket
  def initialize
    @lock = RWLock.new()
    @value_pairs = []
  end

  def get(key)
    @lock.lock_read
    pair = @value_pairs.select{|p| p[0] == key}
    unless pair && pair.size > 0
      @lock.unlock_read
      return nil
    end
    ret = pair[0][1]
    @lock.unlock_read
    ret
  end

  def set(key, value)
    @lock.lock_write
    pair = @value_pairs.select{|p| p[0] == key}
    if pair && pair.size > 0
      pair[0][1] = value
      @lock.unlock_write
      return
    end
    @value_pairs.push [key, value]
    @lock.unlock_write
    value
  end

  def each
    @value_pairs.each{|p| yield p[0],p[1]}
  end

end

class MikeConcurrentHash
  def initialize
    @buckets = []
    100.times {@buckets.push SafeBucket.new}
  end

  def [](key)
    bucket(key).get(key)
  end

  def []=(key, value)
    bucket(key).set(key, value)
  end

  def each
    @buckets.each{|b| b.each{|key, value| yield key, value}}
  end

  def bucket(key)
    @buckets[key.hash % 100]
  end
end

너무 느리기 때문에 각 방법이 안전하지 않고 (반복 중에 다른 스레드에 의한 변형 허용) 대부분의 해시 메서드를 지원하지 않기 때문에 작업을 중단했습니다.

다음은 동시 해시에 대한 테스트 도구입니다.

require 'thread'
class HashHarness
  Keys = [:a, :basic, :test, :harness, :for, :concurrent, :testing, :of, :hashes,
          :that, :tries, :to, :provide, :a, :framework, :for, :designing, :a, :good, :ConcurrentHash,
          :for, :all, :ruby, :implementations]

  def self.go
    h = new
    r = h.writiness_range(20, 10000, 0, 0)
    r.each{|k, v| p k + ' ' + v.map{|p| p[1]}.join(' ')}
    return
  end
  def initialize(classes = [MikeConcurrentHash, JoshConcurrentHash, JoshConcurrentHash2, PaulConcurrentHash, LockedHash, Hash])
    @classes = classes
  end
  def writiness_range(basic_threads, ops, each_threads, loops)
    result = {}
    @classes.each do |hash_class|
      res = []
      0.upto 10 do |i|
        writiness = i.to_f / 10
        res.push [writiness,test_one(hash_class, basic_threads, ops, each_threads, loops, writiness)]
      end
      result[hash_class.name] = res
    end
    result
  end
  def test_one(hash_class, basic_threads, ops, each_threads, loops, writiness)
    time = Time.now
    threads = []
    hash = hash_class.new
    populate_hash(hash)
    begin
    basic_threads.times do
      threads.push Thread.new{run_basic_test(hash, writiness, ops)}
    end
    each_threads.times do
      threads.push Thread.new{run_each_test(hash, writiness, loops)}
    end
    threads.each{|t| t.join}
    rescue ThreadError => e
      p [e.message, hash_class.name, basic_threads, ops, each_threads, loops, writiness].join(' ')
      return -1
    end
    p [hash_class.name, basic_threads, ops, each_threads, loops, writiness, Time.now - time].join(' ')
    return Time.now - time
  end
  def run_basic_test(hash, writiness, ops)
    ops.times do
      rand < writiness ? hash[choose_key]= rand : hash[choose_key]
    end
  end
  def run_each_test(hash, writiness, loops)
    loops.times do
      hash.each do |k, v|
        if rand < writiness
          each_write_work(hash, k, v)
        else
          each_read_work(k, v)
        end
      end
    end
  end
  def each_write_work(hash, key, value)
    hash[key] = rand
  end
  def each_read_work(key, value)
    key.to_s + ": " + value.to_s
  end
  def choose_key
    Keys[rand(Keys.size)]
  end
  def populate_hash(hash)
    Keys.each{|key| hash[key]=rand}  
  end
end

숫자 : Jruby

Writiness      0.0   0.1   0.2   0.3   0.4   0.5   0.6   0.7   0.8   0.9   1.0
ConcurrentHash 2.098 3.179 2.971 3.083 2.731 2.941 2.564 2.480 2.369 1.862 1.881
LockedHash     1.873 1.896 2.085 2.058 2.001 2.055 1.904 1.921 1.873 1.841 1.630
Hash           0.530 0.672 0.685 0.822 0.719 0.877 0.901 0.931 0.942 0.950 1.001

그리고 MRI

Writiness      0.0    0.1    0.2    0.3    0.4    0.5    0.6    0.7    0.8    0.9    1.0
ConcurrentHash  9.214  9.913  9.064 10.112 10.240 10.574 10.566 11.027 11.323 11.837 13.036
LockedHash     19.593 17.712 16.998 17.045 16.687 16.609 16.647 15.307 14.464 13.931 14.146
Hash            0.535  0.537  0.534  0.599  0.594  0.676  0.635  0.650  0.654  0.661  0.692

MRI 수치는 꽤 놀랍습니다. MRI를 잠그는 것은 정말 짜증납니다.


이것은 햄스터 보석 의 사용 사례 일 수 있습니다.

Hamster 는 순수한 Ruby에서 HAMT (Hash Array Mapped Tries) 및 기타 영구 데이터 구조를 구현 합니다.

영구 데이터 구조는 변경할 수 없으며 해시에서 키-값 쌍을 추가하거나 대체하는 것과 같이 구조를 변경 (변경)하는 대신 변경 사항이 포함 된 새 데이터 구조를 반환합니다. 영구적 인 불변 데이터 구조의 트릭은 새로 반환 된 데이터 구조가 가능한 한 많은 선행 데이터 구조를 재사용한다는 것입니다.

햄스터를 사용하여 구현하려면 변경 가능한 해시 래퍼를 사용합니다.이 래퍼는 모든 읽기를 영구적 인 불변 해시의 현재 값 (즉, 빠르다)으로 전달하는 동시에 모든 쓰기를 뮤텍스로 보호하고 새 값으로 스와핑합니다. 쓰기 후 영구 불변 해시의.

예를 들면 :

require 'hamster'
require 'hamster/experimental/mutable_hash'    
hsh = Hamster.mutable_hash(:name => "Simon", :gender => :male)

# reading goes directly to hash
puts hsh[:name] # Simon

# writing is actually swapping to new value of underlying persistent data structure
hsh.put(:name, "Joe")
puts hsh[:name] # Joe

따라서 설명 된 것과 유사한 유형의 문제에 이것을 사용하겠습니다.

( 여기에 요점 )

require 'hamster'
require 'hamster/experimental/mutable_hash'

# a bunch of threads with a read/write ratio of 10:1
num_threads = 100
num_reads_per_write = 10
num_loops = 100 
hsh = Hamster.mutable_hash

puts RUBY_DESCRIPTION
puts "#{num_threads} threads x #{num_loops} loops, #{num_reads_per_write}:1 R/W ratio"

t0 = Time.now
Thread.abort_on_exception = true
threads = (0...num_threads).map do |n|
  Thread.new do
    write_key = n % num_reads_per_write
    read_keys = (0...num_reads_per_write).to_a.shuffle # random order
    last_read = nil

    num_loops.times do
      read_keys.each do |k|
        # Reads
        last_read = hsh[k]

        Thread.pass

        # Atomic increments in the correct ratio to reads
        hsh.put(k) { |v| (v || 0) + 1 } if k == write_key
      end
    end
  end
end

threads.map { |t| t.join }
t1 = Time.now

puts "Error in keys" unless (0...num_reads_per_write).to_a == hsh.keys.sort.to_a
puts "Error in values" unless hsh.values.all? { |v| v == (num_loops * num_threads) / num_reads_per_write }
puts "Time elapsed: #{t1 - t0} s"

다음과 같은 출력이 표시됩니다.

ruby 1.9.2p320 (2012-04-20 revision 35421) [x86_64-linux]
100 threads x 100 loops, 10:1 R/W ratio
Time elapsed: 5.763414627 s

jruby 1.7.0 (1.9.3p203) 2012-10-22 ff1ebbe on Java HotSpot(TM) 64-Bit Server VM 1.6.0_26-b03 [linux-amd64]
100 threads x 100 loops, 10:1 R/W ratio
Time elapsed: 1.697 s

이것에 대해 어떻게 생각하십니까?

이 솔루션은 Scala 또는 Clojure에서이 문제를 해결하는 방법과 더 유사하지만, 해당 언어에서는 구현되는 원자 비교 및 ​​스왑 작업에 대해 저수준 CPU 지원이있는 소프트웨어 트랜잭션 메모리를 사용할 가능성이 더 큽니다.

편집 : 햄스터 구현이 빠른 이유 중 하나는 잠금없는 읽기 경로를 제공한다는 것 입니다. 그것에 대해 질문이 있거나 작동 방식에 대해 의견을 보내주십시오.


이 ( video , pdf )는 자바로 구현 된 잠금없는 해시 테이블에 관한 것입니다.

스포일러 : 원자 CAS (Compare-And-Swap) 작업을 사용합니다. Ruby에서 사용할 수없는 경우 잠금으로 에뮬레이트 할 수 있습니다. 간단한 잠금 보호 해시 테이블보다 이점이 있는지 확실하지 않습니다.


테스트되지 않았으며 읽기 최적화에 순진합니다. 대부분의 경우 값이 잠기지 않는다고 가정합니다. 그렇다면 타이트 루프는 그럴 때까지 시도합니다. Thread.critical쓰기가 완료 될 때까지 읽기 스레드가 실행되지 않도록하기 위해 여기에 넣었 습니다. 중요한 부분이 필요한지 확실하지 않은 경우 읽기가 얼마나 많은지에 따라 다르므로 벤치마킹이 필요합니다.

class ConcurrentHash < Hash

  def initialize(*args)
    @semaphore = Mutex.new
    super
  end

  def []=(k,v)
    begin
      old_crit = Thread.critical
      Thread.critical = true unless old_crit
      @semaphore.synchronize { super }
    ensure
      Thread.critical = old_crit
    end
  end

  def [](k)
    while(true)
      return super unless @semaphore.locked?
    end
  end

end

@semaphore 잠금을 확인하는 데 필요한 몇 가지 다른 읽기 메서드가있을 수 있습니다. 다른 모든 것이 # [] 측면에서 구현되었는지는 모르겠습니다.


나는 이것의 의미에 대해 꽤 불분명합니다. 가장 간단한 구현은 단순히

Hash

즉, 기본 제공 루비 해시 스레드 세이프에 의해 스레드 세이프가 1 개 이상의 스레드가 액세스를 시도해도 폭발하지 않는다는 의미라면 스레드 세이프입니다. 이 코드는 영원히 안전하게 실행됩니다.

n = 4242
hash = {}

loop do
  a =
    Thread.new do
      n.times do
        hash[:key] = :val
      end
    end

  b =
    Thread.new do
      n.times do
        hash.delete(:key)
      end
    end

  c =
    Thread.new do
      n.times do
        val = hash[:key]
        raise val.inspect unless [nil, :val].include?(val)
      end
    end

  a.join
  b.join
  c.join
  p :THREADSAFE
end

스레드 안전에 의해 정말 ACID를 의미한다고 생각합니다. 예를 들어 hash [: key] = : val과 같은 쓰기와 has [: key]가 : val을 반환하면 읽기가 이어집니다. 그러나 락킹에 대한 어떠한 속임수도 그것을 제공 할 수 없습니다.-라스트 인은 항상 이길 것입니다. 예를 들어, 스레드 세이프 해시를 업데이트하는 모든 스레드가 42 개라고 가정합니다. 43'rd에서 읽어야하는 값은 무엇입니까 ?? 확실히 threasafe에 의해 쓰기에 대한 일종의 총 순서를 의미하지는 않습니다. 따라서 42 개의 스레드가 적극적으로 '올바른'값을 쓰고 있다면 어떤 권리가 있습니까? 하지만 루비의 내장 Hash는 이런 식으로 작동합니다.

아마도 당신은 같은 것을 의미합니다

hash.each do ...

한 스레드에서

hash.delete(key)

서로 간섭하지 않겠습니까? 스레드 안전을 원하지만 MRI 루비 를 사용하는 단일 스레드 에서도 안전하지 않습니다 (분명히 반복하는 동안 해시를 수정할 수 없습니다)

그래서 당신은 'threadsafe'가 의미하는 바에 대해 더 구체적으로 말할 수 있습니까?

ACID 의미 체계를 제공하는 유일한 방법은 전체 잠금입니다 (블록을 ​​사용하는 방법 일 수 있지만 여전히 외부 잠금).

루비의 스레드 스케줄러는 임의의 c 함수 (내장 된 해시 영역 설정 메소드와 같은)의 중간에 스레드 스맥을 예약하는 것이 아니므로 효과적으로 스레드로부터 안전합니다.


안타깝게도 Michael Sofaer가 소개하는 답변에 댓글을 추가 할 수 없습니다 : class RWLock 및 class LockedHash with @reader_count 등 (아직 충분한 카르마가 없습니다)

이 솔루션은 작동하지 않습니다. 오류가 발생합니다. in`unlock ': 잠겨 있지 않은 뮤텍스를 잠금 해제하려고합니다 (ThreadError).

논리적 버그로 인해 : 잠금을 해제 할 때가되면 잠금 해제가 1 회 추가로 발생합니다 (check my_block? (). 잠금 해제 된 음소거는 예외를 발생시킵니다. (이 게시물 끝에이 오류를 재현하는 방법에 대한 전체 코드를 붙여 넣겠습니다).

또한 Michael은 "각 메서드는 안전하지 않습니다 (반복하는 동안 다른 스레드에 의한 변형 허용)"라고 언급했습니다. 이것은 저에게 매우 중요했습니다. 그래서 저는이 단순화 된 솔루션을 사용하여 모든 사용 사례에서 작동하고 단순히 모든 호출에 대해 뮤텍스를 잠급니다. 다른 스레드에서 호출 된 모든 해시 메서드 (잠금을 소유 한 동일한 스레드에서 호출이 교착 상태를 방지하기 위해 차단되지 않음) :

#
# This TrulyThreadSafeHash works!
#
# Note if one thread iterating the hash by #each method
# then the hash will be locked for all other threads (they will not be 
# able to even read from it)
#
class TrulyThreadSafeHash
  def initialize
    @mutex = Mutex.new
    @hash = Hash.new
  end

  def method_missing(method_sym, *arguments, &block)

    if !@mutex.owned?  # Returns true if this lock is currently held by current thread
        # We're trying to lock only if mutex is not owned by the current thread (is not locked or is locked by some other thread).
        # Following call will be blocking if mutex locked by other thread:
        @mutex.synchronize{
            return lambda{@hash.send(method_sym,*arguments, &block)}.call
        }
    end

    # We already own the lock (from current thread perspective).
    # We don't even check if @hash.respond_to?(method_sym), let's make Hash
    # respond properly on all calls (including bad calls (example: wrong method names))
    lambda{@hash.send(method_sym,*arguments, &block)}.call
  end

  # since we're tyring to mimic Hash we'll pretend to respond as Hash would
  def self.respond_to?(method_sym, include_private = false)
    Hash.respond_to(method_sym, include_private)
  end

  # override Object's to_s because our method_missing won't be called for to_s
  def to_s(*arguments)
      @mutex.synchronize{
        return @hash.to_s
      }
  end

  # And for those, who want to run extra mile:
  # to make our class json-friendly we shoud require 'json' and uncomment this:
  #def to_json(*options)
  #    @mutex.synchronize{
  #        return @hash.to_json(*options)
  #    }
  #end

end

이제 Michael Sofaer의 솔루션에서 이중 잠금 해제 오류를 시연 / 재현하는 전체 예제 :

#!/usr/bin/env ruby

# ======= unchanged copy-paste part from Michael Sofaer answer (begin) =======

class LockedHash
  def initialize
    @hash = Hash.new
    @lock = ThreadAwareLock.new()
    @reader_count = 0
  end

  def [](key)
    @lock.lock_read
    ret = @hash[key]
    @lock.unlock_read
    ret
  end

  def []=(key, value)
    @lock.lock_write
    @hash[key] = value
    @lock.unlock_write
  end

  def method_missing(method_sym, *arguments, &block)
    if @hash.respond_to? method_sym
      @lock.lock_block
      val = lambda{@hash.send(method_sym,*arguments, &block)}.call
      @lock.unlock_block
      return val
    end
    super
  end
end



class RWLock
  def initialize
    @outer = Mutex.new
    @inner = Mutex.new
    @reader_count = 0
  end
  def lock_read
    @outer.synchronize{@inner.synchronize{@reader_count += 1}}
  end
  def unlock_read
    @inner.synchronize{@reader_count -= 1}
  end
  def lock_write
    @outer.lock
    while @reader_count > 0 ;end
  end
  def unlock_write
    @outer.unlock
  end
end

class ThreadAwareLock < RWLock
  def initialize
    @owner = nil
    super
  end
  def lock_block
    lock_write
    @owner = Thread.current.object_id
  end
  def unlock_block
    @owner = nil
    unlock_write
  end
  def lock_read
    super unless my_block?
  end
  def unlock_read
    super unless my_block?
  end
  def lock_write
    super unless my_block?
  end
  def unlock_write
    super unless my_block?
  end
  def my_block?
    @owner == Thread.current.object_id
  end
end

# ======= unchanged copy-paste part from Michael Sofaer answer (end) =======


# global hash object, which will be 'shared' across threads
$h = LockedHash.new

# hash_reader is just iterating through the 'shared' hash $h
# and prints specified delimeter (capitalized when last hash item read)
def hash_reader(delim)
    loop{
        count = 0
        $h.each{
            count += 1
            if count != $h.size
                $stderr.print delim
            else
                $stderr.puts delim.upcase
            end
        }
    }
end

# fill hash with 10 items
10.times{|i|
    $h[i] = i
}

# create a thread which will read $h hash
t1 = Thread.new(){
    hash_reader("o")
}

t1.join  # will never happen, but for completeness

, 다음 오류가 발생합니다.

./LockedHash_fails_to_unlock.rb
oooooooooO
./LockedHash_fails_to_unlock.rb:55:in `unlock': Attempt to unlock a mutex which is not locked (ThreadError)
        from ./LockedHash_fails_to_unlock.rb:55:in `unlock_write'
        from ./LockedHash_fails_to_unlock.rb:82:in `unlock_write'
        from ./LockedHash_fails_to_unlock.rb:70:in `unlock_block'
        from ./LockedHash_fails_to_unlock.rb:29:in `method_missing'
        from ./LockedHash_fails_to_unlock.rb:100:in `block in hash_reader'
        from ./LockedHash_fails_to_unlock.rb:98:in `loop'
        from ./LockedHash_fails_to_unlock.rb:98:in `hash_reader'
        from ./LockedHash_fails_to_unlock.rb:119:in `block in <main>'

해시가 읽기 무겁다 고 언급 했으므로 하나의 뮤텍스가 읽기와 쓰기를 모두 잠그면 읽기에 의해 가장 많이 발생하는 경쟁 조건이 발생합니다. 괜찮다면 대답을 무시하십시오.

쓰기에 우선 순위를 부여하려면 읽기-쓰기 잠금이 도움이 될 것입니다. 다음 코드는 운영 체제 클래스에 대한 오래된 C ++ 할당을 기반으로하므로 최상의 품질은 아니지만 일반적인 아이디어를 제공합니다.

require 'thread'

class ReadWriteLock
  def initialize
    @critical_section = Mutex.new
    @are_writers_finished = ConditionVariable.new
    @are_readers_finished = ConditionVariable.new
    @readers = 0
    @writers = 0
    @writer_locked = false
  end

  def read
    begin
      start_read
      yield
    ensure
      end_read
    end
  end

  def start_read
    @critical_section.lock
    while (@writers != 0 || @writer_locked)
      @are_writers_finished.wait(@critical_section)
    end
    @readers += 1
    @critical_section.unlock
  end

  def end_read
    @critical_section.lock
    if (@readers -= 1) == 0
      @are_readers_finished.broadcast
    end
    @critical_section.unlock
  end

  def write
    begin
      start_write
      yield
    ensure
      end_write
    end
  end

  def start_write
    @critical_section.lock
    @writers += 1
    while @readers > 0
      @are_readers_finished.wait(@critical_section)
    end
    while @writer_locked
      @are_writers_finished.wait(@critical_section)
    end
    @writers -= 1
    @writer_locked = true
    @critical_section.unlock
  end

  def end_write
    @critical_section.lock
    @writer_locked = false
    @are_writers_finished.broadcast
    @critical_section.unlock
  end
end

그런 다음 [] = 및 []를 lock.write 및 lock.read로 감 쌉니다. 성능에 영향을 미칠 수 있지만 쓰기가 읽기를 '통과'하도록 보장합니다. 이것의 유용성은 실제로 읽는 것이 얼마나 무거운 지에 달려 있습니다.

참고 URL : https://stackoverflow.com/questions/1080993/pure-ruby-concurrent-hash

반응형