
Ruby Threading

A translation of the multithreading chapter of Programming Ruby: The Pragmatic Programmers' Guide, Second Edition, by David Thomas and Andrew Hunt.

Often the simplest way to do two things at once is to use Ruby threads. They are in-process, implemented within the Ruby interpreter. That makes Ruby threads completely portable, with no reliance on the operating system. At the same time, you get none of the benefits of native threads. What does this mean?

You may experience thread starvation (a low-priority thread never gets a chance to run). If your threads manage to deadlock, the whole process may grind to a halt. If a thread makes an operating system call that takes a long time to complete, all threads hang until the interpreter gets control back. Finally, if your machine has more than one processor, Ruby threads will not take advantage of it: because they run in one process, in a single native thread, they can run on only one processor at a time.
All this sounds scary. In practice, however, the benefits of threads often far outweigh any potential problems. Ruby threads are an efficient and lightweight way to achieve concurrency in your code. You just need to understand the underlying implementation issues and design accordingly.

Creating Ruby Threads


Creating a new thread is fairly straightforward. The following code is a simple example that downloads a set of web pages in parallel. For each URL it is asked to download, the code creates a separate thread that handles the HTTP transaction.

require 'net/http'

pages = %w( www.rubycentral.com slashdot.org www.google.com )
threads = []
for page_to_fetch in pages
  # Pass the URL as a thread argument so each thread works with its own variable.
  threads << Thread.new(page_to_fetch) do |url|
    h = Net::HTTP.new(url, 80)
    puts "Fetching: #{url}"
    resp = h.get('/', nil)
    puts "Got #{url}: #{resp.message}"
  end
end
# Wait for every download to finish before the program exits.
threads.each { |thr| thr.join }


Result:
  Fetching: www.rubycentral.com
 Fetching: slashdot.org
 Fetching: www.google.com
 Got www.google.com: OK
 Got www.rubycentral.com: OK
 Got slashdot.org: OK 


Let's look at this code in more detail, as there are a few subtle points. New threads are created with a call to Thread.new, which is given a block containing the code to run in the new thread. In our case, the block uses the net/http library to fetch the top page from each of the specified sites. The trace clearly shows that these fetches run in parallel.

When we create the thread, we pass the required URL as a parameter. This parameter is passed to the block as the variable url. Why do we do this rather than simply using the value of the page_to_fetch variable inside the block?

A thread shares all global, instance, and local variables that exist at the time the thread starts. As anyone with a younger brother can tell you, sharing is not always a good thing. In this case, all three threads would share the variable page_to_fetch. When the first thread starts, page_to_fetch is set to "www.rubycentral.com". Meanwhile, the loop creating the threads is still running. The next time around, page_to_fetch is set to "slashdot.org". If the first thread has not yet finished using page_to_fetch, it will suddenly start using the new value. Bugs of this kind are very difficult to track down.

However, local variables created inside a thread's block are truly local to that thread: each thread has its own copy of the page address. You can pass any number of arguments into the block through Thread.new.
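
As a rough illustration of passing several arguments, the sketch below hands each thread two values through Thread.new and collects the results with Thread#value. The word list and the variable names are invented for this example and are not part of the original program.

```ruby
# Hypothetical data, chosen only for illustration.
words = %w(alpha beta gamma)
threads = []
for word in words
  # The two arguments are bound to the block parameters w and len, so later
  # changes to the shared loop variable `word` cannot affect this thread.
  threads << Thread.new(word, word.length) do |w, len|
    sleep 0.01        # give the loop time to move on to the next word
    "#{w}:#{len}"     # the block's last expression becomes the thread's value
  end
end

# Thread#value waits for the thread to finish and returns the block's result.
labels = threads.map { |t| t.value }
puts labels.inspect   # prints ["alpha:5", "beta:4", "gamma:5"]
```

Had the block read `word` directly instead of `w`, a thread could observe whatever value the loop had moved on to by then.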

Manipulating Threads

Another subtlety occurs on the last line of our program. Why do we call join for each thread we create?

When a Ruby program terminates, all threads are killed, regardless of their state. However, you can wait for a particular thread to finish by calling Thread#join on it. The calling thread blocks until the given thread has finished. By calling join on each of the threads, you can be sure that all three requests have completed before the main program ends. If you do not want to block forever, you can pass join a time limit: if the limit expires before the thread terminates, the call to join returns nil. A variant of join is the Thread#value method, which returns the value of the last statement executed by the thread.
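
The difference between the three calls can be sketched as follows. The half-second sleep stands in for real work, and the variable names are made up for this example.

```ruby
worker = Thread.new do
  sleep 0.5      # simulate slow work
  :finished      # last expression; this is what Thread#value returns
end

early  = worker.join(0.05)  # the time limit expires first, so join returns nil
final  = worker.join        # no limit: blocks until the thread ends, returns the thread
result = worker.value       # the thread has already ended; returns :finished

puts early.inspect          # prints nil
puts final.equal?(worker)   # prints true
puts result.inspect         # prints :finished
```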

In addition to join, a few other handy methods are used to manipulate threads. The current thread is always accessible through Thread.current. You can get a list of all threads with Thread.list, which returns all Thread objects that are runnable or stopped. To determine the status of a particular thread, you can use Thread#status and Thread#alive?.
Additionally, you can adjust the priority of a thread with Thread#priority=. Higher-priority threads run before lower-priority threads. We will talk a little later about thread scheduling, and about starting and stopping threads.
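
A small sketch of these query methods, assuming a current Ruby interpreter. Thread#kill is used here only to end the example thread; it is not covered in the text above.

```ruby
sleeper = Thread.new { sleep }   # sleeps until it is woken or killed

# Yield to other threads until the new thread has actually gone to sleep.
Thread.pass until sleeper.status == "sleep"

listed       = Thread.list.include?(sleeper)         # sleeping threads are listed
self_listed  = Thread.list.include?(Thread.current)  # so is the thread running this code
alive_before = sleeper.alive?                        # true while the thread exists

sleeper.kill                     # terminate the thread (not part of the original text)
sleeper.join                     # wait until it is really gone

status_after = sleeper.status    # false: the thread ended without an exception
alive_after  = sleeper.alive?    # false

puts [listed, self_listed, alive_before, status_after, alive_after].inspect
```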

Thread Variables

A thread can normally access any variables that are in scope when the thread is created. Variables local to the block containing the thread code are local to the thread and are not shared.

But what if you need per-thread variables that can be accessed by other threads, including the main thread? The Thread class has a facility that lets you create thread-local variables and access them by name. You simply treat the thread object as a hash, writing elements with []= and reading them with []. In the following example, each thread records the current value of the counter in a thread-local variable with the key mycount. To do this, the code uses the string "mycount" as an index into the thread object.

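count = 0
threads = []
10.times do |i|
  threads[i] = Thread.new do
    sleep(rand(0.1))
    # Record the shared counter under the key "mycount" in this thread's own storage.
    Thread.current["mycount"] = count
    count += 1
  end
end
# Read each thread's variable from the main thread once that thread has finished.
threads.each { |t| t.join; print t["mycount"], ", " }
puts "count = #{count}"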


Result:
  4, 1, 0, 8, 7, 9, 5, 6, 3, 2, count = 10 


The main thread waits for the other threads to finish and then prints the counter value captured by each one. Just to make it more interesting, each thread waits a random time before recording the value.
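
To make the per-thread nature of these variables concrete, here is a minimal sketch in which the same key holds different values in two threads. The key "label" and the variable names are invented for this example.

```ruby
Thread.current["label"] = "main"        # thread-local variable of the current thread

worker = Thread.new do
  inherited = Thread.current["label"]   # nil: a new thread starts without thread-locals
  Thread.current["label"] = "worker"    # set this thread's own value for the same key
  inherited
end

seen_in_worker = worker.value           # waits for the worker and returns nil
worker_label   = worker["label"]        # read the worker's variable from outside
current_label  = Thread.current["label"]  # untouched by the worker

puts seen_in_worker.inspect             # prints nil
puts worker_label                       # prints worker
puts current_label                      # prints main
```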

Threads and Exceptions


What happens if a thread raises an unhandled exception? It depends on the setting of the abort_on_exception flag and on the setting of the interpreter's debug flag.
If abort_on_exception is false and the debug flag is not enabled (the default state), an unhandled exception simply kills the current thread, and all the others continue to run. In fact, you do not even hear about the exception until join is called on the thread that raised it.

In the following example, thread 2 blows up and fails to output anything. However, you can still see the trace from the other threads.

threads = []
4.times do |number|
  threads << Thread.new(number) do |i|
    raise "Boom!" if i == 2
    print "#{i}\n"
  end
end
threads.each { |t| t.join }


Result:
 0
 1
 3
 prog.rb:4: Boom! (RuntimeError)
 from prog.rb:8:in `join'
 from prog.rb:8
 from prog.rb:8:in `each'
 from prog.rb:8


We can catch an exception at join time.
threads = []
4.times do |number|
  threads << Thread.new(number) do |i|
    raise "Boom!" if i == 2
    print "#{i}\n"
  end
end
threads.each do |t|
  begin
    t.join
  rescue RuntimeError => e
    puts "Failed: #{e.message}"
  end
end


Result:
 0
 1
 3
 Failed: Boom!
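
The same behavior can be checked without relying on printed output. In the sketch below, the exception raised inside the thread resurfaces in the thread that calls join, and Thread#status then reports nil for the dead thread. The report_on_exception line is an assumption about recent Ruby versions, which print such exceptions to standard error by default; it only keeps the output quiet and is skipped where the method does not exist.

```ruby
failing = Thread.new do
  # Assumption: recent Rubies offer Thread#report_on_exception=; skip it otherwise.
  if Thread.current.respond_to?(:report_on_exception=)
    Thread.current.report_on_exception = false
  end
  raise ArgumentError, "bad input"
end

outcome =
  begin
    failing.join                 # the thread's exception is re-raised here
    "finished cleanly"
  rescue ArgumentError => e
    "join re-raised: #{e.message}"
  end

status = failing.status          # nil: the thread was terminated by an exception

puts outcome                     # prints join re-raised: bad input
puts status.inspect              # prints nil
```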

However, if you set abort_on_exception to true or use -d to enable the debug flag, an unhandled exception kills all running threads. Once thread 2 dies, no more output is produced.

Thread.abort_on_exception = true
threads = []
4.times do |number|
  threads << Thread.new(number) do |i|
    raise "Boom!" if i == 2
    print "#{i}\n"
  end
end
threads.each { |t| t.join }


Result:
 0
 1
 prog.rb:5: Boom! (RuntimeError)
 from prog.rb:4:in `initialize'
 from prog.rb:4:in `new'
 from prog.rb:4
 from prog.rb:3:in `times'
 from prog.rb:3


This example also illustrates a gotcha. Inside the loop, the threads use print to write out the number rather than puts. Why? Because behind the scenes puts splits its work into two steps: it writes its argument and then writes a newline. Between the two, another thread may get scheduled, and the output will be interleaved. Calling print with a single string that already contains the newline gets around the problem.

Controlling the Thread Scheduler


In a well-designed application, you normally just let the threads do their work. Building timing dependencies into a multithreaded application is generally considered bad form, because it makes the code much harder to read and also prevents the thread scheduler from optimizing the execution of your program.

However, sometimes you will need to control threads explicitly. Perhaps a jukebox has a thread that runs a light show, and we need to pause it when the music stops. Or you may have two threads in a classic producer-consumer relationship, where the consumer has to pause if the producer falls behind on its orders.

The Thread class provides a number of methods for controlling the thread scheduler. A call to Thread.stop stops the current thread, and a call to Thread#run arranges for a particular thread to be run. Thread.pass deschedules the current thread, allowing other threads to run, and Thread#join and Thread#value suspend the calling thread until the given thread finishes.
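
A minimal sketch of the stop and run pair, with names invented for this example. The main thread uses Thread.pass together with Thread#stop? (true once a thread is sleeping or dead) to wait until the worker has really parked itself before waking it.

```ruby
log = []

worker = Thread.new do
  log << "parked"
  Thread.stop          # suspend this thread until another thread calls run on it
  log << "resumed"
end

# Yield to other threads until the worker has reached Thread.stop.
Thread.pass until worker.stop?
snapshot = log.dup     # taken while the worker is suspended

worker.run             # wake the worker up
worker.join            # wait for it to finish

puts snapshot.inspect  # prints ["parked"]
puts log.inspect       # prints ["parked", "resumed"]
```

Waiting for the worker to park first matters: calling run on a thread that has not yet reached Thread.stop has no lasting effect, and the thread would then stop with nobody left to wake it.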

We can demonstrate these primitives in the following, totally pointless program. It creates two child threads, each of which runs an instance of the class Chaser. The chase method increments a counter but does not let it get more than two ahead of the counter in the other thread. To hold the increment back, the method calls Thread.pass, which gives the chase method in the other thread a chance to run. To make it more interesting, the threads suspend themselves as soon as they start, and we then start them in random order.

class Chaser
  attr_reader :count

  def initialize(name)
    @name = name
    @count = 0
  end

  def chase(other)
    while @count < 5
      # Yield to other threads while this counter is too far ahead.
      while @count - other.count > 1
        Thread.pass
      end
      @count += 1
      print "#@name: #{count}\n"
    end
  end
end

c1 = Chaser.new("A")
c2 = Chaser.new("B")
threads = [
  Thread.new { Thread.stop; c1.chase(c2) },
  Thread.new { Thread.stop; c2.chase(c1) }
]
# Wake the two suspended threads in random order.
start_index = rand(2)
threads[start_index].run
threads[1 - start_index].run
threads.each { |t| t.join }


Result:
  B: 1
 B: 2
 A: 1
 B: 3
 A: 2
 B: 4
 A: 3
 B: 5
 A: 4
 A: 5 


However, using these primitives to achieve synchronization in real code is not easy: race conditions will constantly haunt you. And when you work with shared data, race conditions pretty much guarantee long and frustrating debugging sessions. In fact, the previous example contains a bug: a thread can increment its counter, but before it prints the value, the other thread can be scheduled and print its own counter. The resulting output will be out of order.

Fortunately, threads have one additional facility: the idea of mutual exclusion. Using it, we can build secure synchronization schemes.
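
As a hedged preview of what mutual exclusion can look like, the sketch below guards a shared counter with Ruby's Mutex class. The text above does not name this class, so treat its use here as an assumption about current Ruby versions; the thread and iteration counts are arbitrary.

```ruby
counter = 0
lock = Mutex.new   # assumption: Mutex is available without a require on current Rubies

threads = 10.times.map do
  Thread.new do
    1000.times do
      # Only one thread at a time may run the block passed to synchronize,
      # so the read-modify-write of counter cannot be interrupted halfway.
      lock.synchronize { counter += 1 }
    end
  end
end
threads.each { |t| t.join }

puts counter       # prints 10000
```

The point of the design is that every access to the shared data goes through the same lock, rather than relying on hand-placed Thread.pass calls.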

Source: https://habr.com/ru/post/94574/

