Python Notes (0.14.0)

17. Threading

Some definitions before starting:

  • A thread is the smallest unit of code that can be executed.
  • A program that has more than one thread is multithreaded.
  • A process is an executing instance of a program.
  • A process as a complete set of data and variables while one or more threads may share the same data.

Note that for simple multiprocessing, you may also use the os module with fork() or spawn() function families (See the The os module (and sys, and path) page). However, the issue with fork is that it has to copy everything in the program to the new program and the system must have enough resources to handle that. Another issue with fork is that sometimes if several processes need to communicate, it may be difficult using multiple processes. In complex situation, the concept of thread is more appropriate.

The interest of threads is that there are multiplatform unlike subprocesses (unix only).

The thread module provides the functionalities for working with threads including locking, which is a synchronised mechanism. As we will see, only one thread can acquire a lock at a time. Before being too technical, let us start with a quick example.

17.1. Quick Start

The thread module provides simple functionalities and higher level interface is provided within the threading module that should be used instead.

Let us start with the thread module though. It will give us some insights into threading. As a first example, let us run 2 functions in 2 different threads:

>>> import thread
>>> import time
>>> def func1():
...    for i in range(0,3):
...        print("Inside func1")
...        time.sleep(2)
>>> def func2():
...    for i in range(0,3):
...        print("Inside func2")
...        time.sleep(1)
>>> thread.start_new_thread(func1, ())
>>> thread.start_new_thread(func2, ())

Inside func1
Inside func2
Inside func2
Inside func2
Inside func1
Inside func1

The output shows that indeed the 2 threads are running at the same time: func2(), which is faster, ends up before the end func1().

The function thread.start_new_thread() first argument is the function to call and its second argument is a tuple containing the positional list of arguments. Even though we have no arguments in this example, an empty tuple must be provided. Keyword arguments can also be provided.

17.2. Locking

The previous example is rather simple. The 2 threads run simultanously and do not need to communicate or wait for each other.

Let us use now consider another example where you want to fetch several URLs and write the responses into a file, which is a shared resource. The main issue is that access to the file may be random or in the orst case occuring at the same time. To solve this issue, we need a lock. An instance of lock can be created as follows:

lock = thread.allocate_lock()

Locks have 2 states: locked and unlocked. The state can be cheched with the thread.allocate_lock.locked() function:

>>> lock.locked()
False

2 additional methods are used to manipulate them: thread.allocate_lock.acquire() and thread.allocate_lock.release(). The rules linking the lock states and these functions are:

  • if the state is unlocked: a call to acquire() changes the state to locked.
  • if the state is locked: a call to acquire() blocks until another thread calls release().
  • if the state is unlocked: a call to release() raises a RuntimeError exception.
  • if the state is locked: a call to release() changes the state to unlocked().

Note

a lock is not owned by the thread that locked it; another thread may unlock it.

Here is now an example of multi threading with locking to fetch several URL and save the results in a file:

import thread
import time
import urllib2

# This function fetch the data from a URL
def getData(url):
    try:
        f = urllib2.urlopen(url)
        data = f.read()
        f.close()
    except urllib2.URLError:
        data = "Not found. URLError"
    return data

# This one tries to save the data from a URL into a file.
# it is a common functionto shared resource (the file) and
# therefore requires a lock
def fetchURL(url, filename, lock):
    data = getData(url)
    print("%s was requested and returned" % url)
    lock.acquire()
    try:
        f = open(filename, "a")
        f.write("Data from %s fetched\n" % url)
        f.close()
    finally:
        lock.release()
    return data

lock = thread.allocate_lock()

thread.start_new_thread(fetchURL, ("http://www.yahoo.fr","test.txt",lock))
thread.start_new_thread(fetchURL, ("http://www.lemonde.fr","test.txt",lock))
thread.start_new_thread(fetchURL, ("http://www.youtube.fr","test.txt",lock))

17.3. Re-Entrant locks

The previous example shows how to use what is called simple locking. This does not work in all cases. consider a file that you want to read partially with a function that read the first part and another that reads the second part.

def getPart1():
    lock.acquire()
    try:
        ... get first part of the data
    finally:
        lock.release()

def getPart2():
    lock.acquire()
    try:
        ... get first part of the data
    finally:
        lock.release()

We use locking to mqke sure that the file is not changed while reading it. Now, the problem is that a function that reads both part is in trouble:

def getAll():
    first = getPart1()
    second = getPart2()
    return first, second

Indeed, another thread may modify the file in between. A solution could be something like:

def getAll():
    lock.acquire()
    try:
        first = getPart1()
        second = getPart2()
    except:
        lock.release()
    return first, second

However, the getAll function calls acquire so it blocks further acquire calls in getPart1 and 2. This can be soveld using flags but it starts to be complicated. A neat solution is provided in the threading module called RLock (for Re-Entrant locks) that we will start to use now. You would replace:

lock = threading.Lock()
lock.acquire()
lock.acquire() # this blocks the lock

by:

lock = threading.RLock()
lock.acquire()
lock.acquire() # this won't block !!

and the getAll() function above should take care of the recursive call to acquire/release functions.

17.4. Using the Threqding module

Let us revisit the previous examples with the threading module and its threading.Thread class. The Thread class has a run() method that needs to be defined by the user. Once set, you can call it using the start method. The example from the quick start section could be written:

>>> import threading
>>> import time
>>> class ThreadExample(threading.Thread):
>>>    def __init__(self, Id, dt):
...        super(ThreadExample, self).__init__()
...        self.id = Id
...        self.dt = dt
>>>    def run(self):
...        for i in range(0,3):
...            print("Inside func %s" % self.id)
...            time.sleep(self.dt)
>>> t1 = ThreadExample("1", 1)
>>> t2 = ThreadExample("2", 2)
>>> t1.start(); t2.start()
Inside func 1
Inside func 2
Inside func 1
Inside func 1
Inside func 2
Inside func 2

17.5. Semaphore

You can use semaphore, which is a more advanced lock mechanism that has an internal counter. It only blocks if more than a given number of threads have attempted to hold the semaphore. This allows multiple threads to access the same code section simultaneously:

semaphore = threading.BoundedSemaphore()
semaphore.acquire() # decrements the counter
... access the shared resource
semaphore.release() # increments the counter

If the counter reaches zero when acquired, the acquiring thread will block. Semaphores are used to limit access to resource with limited capacity, such as a network connection or a database server. Just initialize the counter to the maximum number, and the semaphore implementation will take care of the rest:

semaphore = threading.BoundedSemaphore(10)

Python’s threading module provides two semaphore implementations; the Semaphore class provides an unlimited semaphore which allows you to call release any number of times to increment the counter. To avoid simple programming errors, it’s usually better to use the BoundedSemaphore class, which considers it to be an error to call release more often than you’ve called acquire.

17.6. Synchronisation between threads

17.6.1. Events

In the threading module, you can also find a class threading.Event that is a simple synchronization object. The event represents an internal flag, and threads can wait for the flag to be set or unset. A server thread can wait for the flag to be set:

>>> e = threading.Event()
>>> e.wait()

wile the client manipulates the event as follows:

>>> e = threading.Event()
>>> e.isSet()
False
>>> e.set()
>>> e.isSet()
True
>>> e.clear()
>>> e.isSet()
False

If the flag is set, the wait method doesn’t do anything. If the flag is cleared, wait will block until it becomes set again. Any number of threads may wait for the same event.

17.6.2. Conditions

A condition is a more advanced version of the event object. It represents a state change in the application, and a thread can wait for a given condition, or signal that the condition has happened. First, you need a condition object:

# represents the addition of an item to a resource
condition = threading.Condition()

The producing thread needs to acquire the condition before it can notify the consumers that a new item is available:

# producer thread
... generate item
condition.acquire()
... add item to resource
condition.notify() # signal that a new item is available
condition.release()

The consumers must acquire the condition (and thus the related lock), and can then attempt to fetch items from the resource:

# consumer thread
condition.acquire()
while True:
    ... get item from resource
    if item:
        break
    condition.wait() # sleep until item becomes available
condition.release()
... process item

The wait method releases the lock, blocks the current thread until another thread calls notify or notifyAll on the same condition, and then reacquires the lock. If multiple threads are waiting, the notify method only wakes up one of the threads, while notifyAll always wakes them all up.

To avoid blocking in wait, you can pass in a timeout value, as a floating-point value in seconds. If given, the method will return after the given time, even if notify hasn’t been called. If you use a timeout, you must inspect the resource to see if something actually happened.

Note that the condition object is associated with a lock, and that lock must be held before you can access the condition. Likewise, the condition lock must be released when you’re done accessing the condition. In production code, you should use try-finally or with, as shown earlier.

To associate the condition with an existing lock, pass the lock to the Condition constructor. This is also useful if you want to use several conditions for a single resource:

lock = threading.RLock()
condition_1 = threading.Condition(lock)
condition_2 = threading.Condition(lock)

See also

mod:Queue

17.7. Another Threading module example

The following example shows how to use lock and threading to fetch several URLs at the same time

class FetchUrls(threading.Thread):
    def __init__(self, urls, output, lock):
        self.lock = lock

    def run(self):
        while self.urls:
            self.lock.acquire()
            print 'lock acquired by %s' % self.name
            self.output.write(d.read())
            print 'write done by %s' % self.name
            print 'lock released by %s' % self.name
            self.lock.release()

def main():
    lock = threading.Lock()
    t1 = FetchUrls("http://www.yahoo.fr", f, lock)
    t2 = FetchUrls("http://www.youtube.fr", f, lock)

17.8. Threading

Here is another example from [Norton] that illustrates the usage of the threading.Thread class:

import math
from threading import Thread
import time

class SquareRootCalculator(object):
    """This class spawns a separate thread to calculate a bunch of
    roots, and checks in it once a second until it finishes."""
    def __init__(self, target):
        """Turn on the calculator thread and, while waiting for it
        finish, periodically monitor its progress."""
        self.results = []
        counter = self.CalculatorThread(self, target)
        print "Turning on the calculator thread..."
        counter.start()

        while len(self.results) < target:
            print "%d square roots calculated so far." % len(self.results)
            time.sleep(1)
        print "Calculated %s square root(s); the last one is sqrt(%d)=%f" % \
            (target, len(self.results), self.results[-1])
    class CalculatorThread(Thread):
        """A separate thread which actually does the calculations."""
        def __init__(self, controller, target):
             """Set up this thread, including making it a daemon thread
             so that the script can end without waiting for this thread to
             finish."""
             Thread.__init__(self)
             self.controller = controller
             self.target = target
             self.setDaemon(True)
        def run(self):
             """Calculate square roots for all numbers between 1 and the target,
             inclusive."""
             for i in range(1, self.target+1):
                 self.controller.results.append(math.sqrt(i))
if __name__ == '__main__':
    import sys
    limit = None
    if len(sys.argv) > 1:
        limit = sys.argv[1]
        try:
            limit = int(limit)
        except ValueError:
            print "Usage: %s [number of square roots to calculate]" \
               % sys.argv[0]
    SquareRootCalculator(limit)

17.9. Others

The threading.Thread class has some attributes htat can be useful. Each thread has a name attribute. It can be retrieved with :name: and be be set with the function threading.Thread.setName().

You can check is a thread is alive:

t.is_alive()
t.isAlive()

You can check if the thread is a Daemon.

t.isDaemon()
t.daemon()

or set it with setDaemon

17.9.1. threading

threading.Timer