Multi-threading in Python

Programming with threads is one of the more difficult tasks in programming. The Python threading and Queue modules make this significantly easier, but it still takes some deliberation to use threads in an efficient way.

Python’s thread and threading libraries are built on native operating system threads (POSIX threads on Unix-like systems). The threading library is the higher level of the two and is therefore the one to use in your typical programming tasks. The Queue module provides a thread-safe mechanism for communicating between threads, like a combination of a list and a semaphore.
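As a rough illustration of what the Queue module is for, here is a small sketch that hands items from one thread to another. It is written for Python 3, where the module is named queue (the rest of this article uses the Python 2 name, Queue). The worker function and the None sentinel are made up for this example only.

```python
import queue      # named "Queue" in Python 2
import threading

def worker(inbox, results):
    """Take numbers from inbox until the None sentinel arrives."""
    while True:
        item = inbox.get()        # blocks until an item is available
        if item is None:          # sentinel: no more work
            break
        results.append(item * 2)

inbox = queue.Queue()
results = []
t = threading.Thread(target=worker, args=(inbox, results))
t.start()

for n in [1, 2, 3]:
    inbox.put(n)                  # safe to call from any thread
inbox.put(None)                   # tell the worker to stop
t.join()

print(results)                    # [2, 4, 6]
```

The queue does the locking for us: neither thread touches a lock or a semaphore directly.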

POSIX threads are expensive, so it takes a little planning to know when to use them. Generally, the best uses of threads are for multiple tasks that cause side effects, such as output, but do not depend on the state of other threads. An example is a program that writes out a large number of files with data from a database, or a large data migration. The example below builds up a threading class that encapsulates the actions to be taken.

Assume we have around 500 XML documents to download from a remote server (via HTTP). Each is a large enough file that it warrants downloading several at a time. The server and the network can take a fair load, but we don’t want to simulate a botnet attack or overload our local network connection, which would slow each download to a crawl and drastically increase collisions and errors. Let’s limit the number of files we download at once to 4. We start off with a function to download a file:

import urllib

def get_file(url):
    # Download url and return its contents as a string (None on failure).
    try:
        f = urllib.urlopen(url)
        contents = f.read()
        f.close()
        return contents
    except IOError:
        print "Could not open document: %s" % url

So much for error handling, but you get the idea. Assuming our URL is stored in a variable named url, we execute this function in another thread like this:

import threading

thread = threading.Thread(target=get_file, args=(url,))
thread.start()

We can make that a little simpler in two ways. The object-oriented way is to subclass threading.Thread and put the call to get_file(url) in its run() method. This is useful when the result of the function is required for later processing. If the results are not needed, we can take the functional programming route and use partial application:

import threading
from functools import partial

thread = threading.Thread(target=partial(get_file, url))
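To make the difference concrete, here is a minimal sketch (Python 3 syntax) using a hypothetical fake_get_file that returns a string instead of touching the network. It shows that the args form and the partial form do the same thing, and that Thread throws away whatever the target returns, so a result has to be stored somewhere explicitly.

```python
import threading
from functools import partial

def fake_get_file(url):
    # Stand-in for get_file(); returns a string instead of downloading.
    return "contents of %s" % url

url = "http://example.com/doc.xml"   # placeholder URL

# These two threads are equivalent: both call fake_get_file(url),
# and in both cases the return value is discarded.
t1 = threading.Thread(target=fake_get_file, args=(url,))
t2 = threading.Thread(target=partial(fake_get_file, url))

results = []
# To keep the result, the target has to store it somewhere itself.
t3 = threading.Thread(target=lambda: results.append(fake_get_file(url)))

for t in (t1, t2, t3):
    t.start()
for t in (t1, t2, t3):
    t.join()

print(results)   # only t3 left anything behind
```

Storing the result on the thread object itself, as a Thread subclass can do, is just a tidier version of the third variant.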

While that method is more fun, let’s use the OO method (no pun intended) since we want to do something with this data. Remember, we are downloading the file and storing it as a string, rather than simply downloading the file to the local file system. That implies we have more work to do after the download.

import urllib, threading

class FileGetter(threading.Thread):
    def __init__(self, url):
        threading.Thread.__init__(self)
        self.url = url
        self.result = None

    def get_result(self):
        return self.result

    def run(self):
        # Runs in the new thread once start() is called.
        try:
            f = urllib.urlopen(self.url)
            contents = f.read()
            f.close()
            self.result = contents
        except IOError:
            print "Could not open document: %s" % self.url

Now we have our Thread implementation. Note that creating an instance of FileGetter does not cause the thread to start. That is done with the start() method. However, we don’t want all of the threads running at the same time, so we need to use the Queue module and a couple of helper functions to manage our list of files.

import threading
from Queue import Queue

def get_files(files):
    def producer(q, files):
        for url in files:
            thread = FileGetter(url)
            thread.start()
            # Blocks while the queue is full; the thread is already running.
            q.put(thread, True)

    finished = []
    def consumer(q, total_files):
        while len(finished) < total_files:
            thread = q.get(True)
            thread.join()  # wait for this download to complete
            finished.append(thread.get_result())

    q = Queue(3)
    prod_thread = threading.Thread(target=producer, args=(q, files))
    cons_thread = threading.Thread(target=consumer, args=(q, len(files)))
    prod_thread.start()
    cons_thread.start()
    prod_thread.join()
    cons_thread.join()
    return finished

Let’s take a look at what we did here. The first function, producer, accepts the queue and the list of files. For each file, it starts a new FileGetter thread. The last line is significant: we add the thread to the queue. The second parameter, boolean True, tells the put() method to block until a slot is available. Note that the thread starts before the blocking does. This means that even if the queue is full, the thread will have started. Because of this, we reduce our queue size to 3. (As readers point out in the comments, the consumer also holds one thread that it has already taken off the queue, so with a queue size of 3 up to five downloads can be active at once; a queue size of 2 gives a strict limit of four.)
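The blocking behaviour of put() is easy to try out on its own. This is a small sketch in Python 3 (module name queue rather than Queue). Since a blocking put() on a full queue would wait forever in a single-threaded script, the sketch passes a short timeout as the third argument and catches the resulting queue.Full exception.

```python
import queue   # "Queue" in Python 2

q = queue.Queue(3)          # at most 3 items
for n in range(3):
    q.put(n, True)          # succeeds immediately: slots are free

print(q.full())             # True: the queue now holds 3 items

try:
    # With block=True and no timeout this call would wait forever here,
    # so the sketch uses a 0.1 second timeout to show the effect.
    q.put(99, True, 0.1)
    blocked = False
except queue.Full:
    blocked = True

print(blocked)              # True: no slot became free in time

q.get()                     # free one slot
q.put(99, True)             # now the put succeeds right away
```

In the producer above there is no timeout, so it simply waits until the consumer takes something off the queue.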

The second function, the consumer, reads items out of the queue, blocking until an item is available. Then comes the important part, thread.join(). This causes the consumer to block until that thread completes its execution. This line is what keeps the consumer from emptying the queue (and thereby letting the producer start more threads) before the thread at the head of the queue has completed. The consumer appends the result of each thread to the finished list, which is defined in the enclosing get_files function.
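If you want to check how many threads this pattern really runs at once, one option is to replace the downloads with a stand-in that counts itself. The following is only a sketch in Python 3; the task function, the counters and run_all are invented for the experiment and are not part of the article’s code. At any moment the running tasks are at most the ones sitting in the queue, plus the one the consumer is joining, plus the one the producer has started but not yet managed to put, so the peak can never exceed queue_size + 2.

```python
import queue
import threading
import time

active = 0          # number of task threads currently running
peak = 0            # highest value of active seen so far
lock = threading.Lock()

def task():
    """Stand-in for a download: counts itself, sleeps, then leaves."""
    global active, peak
    with lock:
        active += 1
        peak = max(peak, active)
    time.sleep(0.05)
    with lock:
        active -= 1

def run_all(total, queue_size):
    q = queue.Queue(queue_size)

    def producer():
        for _ in range(total):
            t = threading.Thread(target=task)
            t.start()           # started before put() can block
            q.put(t, True)

    def consumer():
        for _ in range(total):
            t = q.get(True)
            t.join()            # wait for the oldest thread to finish

    p = threading.Thread(target=producer)
    c = threading.Thread(target=consumer)
    p.start()
    c.start()
    p.join()
    c.join()

run_all(total=20, queue_size=3)
print("peak concurrent tasks:", peak)
```

The exact peak depends on thread scheduling, which is why no expected output is shown.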

Last, we begin a thread for the producer and the consumer, start them, and then block until they have completed. Here is the complete code:

import urllib, threading
from Queue import Queue

class FileGetter(threading.Thread):
    def __init__(self, url):
        threading.Thread.__init__(self)
        self.url = url
        self.result = None

    def get_result(self):
        return self.result

    def run(self):
        # Runs in the new thread once start() is called.
        try:
            f = urllib.urlopen(self.url)
            contents = f.read()
            f.close()
            self.result = contents
        except IOError:
            print "Could not open document: %s" % self.url

def get_files(files):
    def producer(q, files):
        for url in files:
            thread = FileGetter(url)
            thread.start()
            # Blocks while the queue is full; the thread is already running.
            q.put(thread, True)

    finished = []
    def consumer(q, total_files):
        while len(finished) < total_files:
            thread = q.get(True)
            thread.join()  # wait for this download to complete
            finished.append(thread.get_result())

    q = Queue(3)
    prod_thread = threading.Thread(target=producer, args=(q, files))
    cons_thread = threading.Thread(target=consumer, args=(q, len(files)))
    prod_thread.start()
    cons_thread.start()
    prod_thread.join()
    cons_thread.join()
    return finished

Of course, this approach is not perfect. A queue is FIFO – first in, first out. If one of the threads currently executing finishes before the thread ahead of it, we lose efficiency: the consumer is still waiting on the earlier thread, so no new download starts and fewer files are downloading than our limit allows. However, the solution to that is a complex one and outside the scope of this article.
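For readers on Python 3, one way around the idle-slot problem is a fixed pool of worker threads that each pick up the next URL as soon as they are free. This is a different technique from the one used in this article. The sketch below uses the standard concurrent.futures module and a hypothetical fake_get_file in place of a real download so that it runs without a network; the URLs are placeholders.

```python
from concurrent.futures import ThreadPoolExecutor

def fake_get_file(url):
    # Stand-in for a real download so the sketch runs without a network.
    return "contents of %s" % url

def get_files(urls, max_workers=4):
    # The pool keeps at most max_workers calls running; a worker that
    # finishes early immediately picks up the next URL.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() returns results in the same order as urls.
        return list(pool.map(fake_get_file, urls))

urls = ["http://example.com/%d.xml" % i for i in range(10)]   # placeholders
contents = get_files(urls)
print(len(contents))   # 10
```

Here only four threads are ever created, rather than one per file, and the limit of four concurrent downloads is exact.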

Edit 02/19/2009: Fixed: FileGetter was not setting self.result in its run method. Thanks to tgray for pointing out the problem.

Nov 30th, 2007 | Posted in Programming
  1. tgray
    Feb 18th, 2009 at 15:38 | #1

    I’m confused as to how self.result gets updated, would you please explain this?

    On a similar note, I was under the impression that a “return” from the run() method doesn’t do anything since run() is called from Thread.start(). Does this somehow **magic** “contents” into “self.result”?

  2. Jeff
    Feb 18th, 2009 at 15:52 | #2

    Thanks for pointing out the bug. I don’t know how that went unnoticed for so long :).

  3. denis
    May 23rd, 2009 at 17:24 | #3

    Hi, thanks for this helpful tutorial. Please correct me if I’m wrong, but I believe you forgot calling the init :

    class FileGetter(threading.Thread):
        def __init__(self, url):
            self.url = url
            self.result = None
            threading.Thread.__init__(self)

    Thanks again BTW.

    Best Regards,

    Denis

  4. Jeff
    May 29th, 2009 at 06:39 | #4

    Denis – thanks for noticing that. I’ve updated the code.

  5. kristall
    Jul 15th, 2009 at 20:18 | #5

    Aloha,

    in the run() parts it should be “self.url” instead of just “url”, or?

    Else a very nice example, thanks for it.

    greetinx
    kristall

  6. kristall
    Jul 15th, 2009 at 21:53 | #6

    Aloha,

    also imho “get_files(files)” should “return finished”. And even its inside a def imho one should not use “file” as a variable since there is a builtin with that name.

    Also ‘&lt;’ does not show what it should.

    kristall

  7. Jan 7th, 2010 at 00:19 | #7

    Hey mate ! Thanks for the tutorial. Allowed me to get my head around threads again ! Hadn’t done it for years! Dam python makes it easier these days … /me remembers doing it in C++ …

  8. bora
    Jan 20th, 2010 at 14:50 | #8

    Great tutorial. Thanks

  9. Salman
    Jan 21st, 2010 at 10:35 | #9

    i guess u forgot “)” after
    ‘cons_thread = threading.Thread(target=consumer, args=(q, len(files))’
    :P

  10. Feb 10th, 2010 at 14:56 | #10

    awesome … probably the most detailed one so far

  11. Brandon
    Mar 9th, 2010 at 14:00 | #11

    You know this is simply doing co-operative multitasking, not actual concurrency because of the GIL. To get the files all downloading at the same time, you need the multiprocessing module (or similar). See the video of David Beazley’s talk at PyCon 2010.

  12. Faheem Mitha
    Jul 24th, 2010 at 11:24 | #12

    You’re missing a bracket:

    cons_thread = threading.Thread(target=consumer, args=(q, len(files))

    should read

    cons_thread = threading.Thread(target=consumer, args=(q, len(files)))

    Thanks for the helpful article.

    Regards, Faheem.

  13. Faheem Mitha
    Jul 24th, 2010 at 15:23 | #13

    Some more errors:

    1) &lt; should be <

    2) You need self.url in two places in run().

    With these changes the code works.

    Regards, Faheem.

  14. Martin
    Nov 1st, 2010 at 15:05 | #14

    Awesome, thanks!

  15. jaslin
    Dec 21st, 2010 at 03:21 | #15

    You claim it is a complete code. Do you need a main program to drive this? Are you missing a statement such as get_files(list of URLs) to start it? Thanks.

  16. jaslin
    Dec 21st, 2010 at 17:29 | #16

    It works. However, there are two more threads running in parallel than the number in the Queue() call. Do not know why. Nevertheless, I can live with the difference. Many thanks for the example.

  17. Reggie
    Jan 19th, 2011 at 19:28 | #17

    Thanks for the example. Suggestion: test code before posting on the internet (missing closing parentheses, url should be self.url, etc.)

  18. neil
    Feb 24th, 2011 at 01:04 | #18

    note to self:

    clear all code before submitting immensely helpful tutorials. really, very nice explanation, and the corrections (!) submitted are helpful, just wanted to say thank you.

  19. Spookie
    Feb 28th, 2011 at 14:12 | #19

    Correct me if i am wrong: i think there can be 5 FileGetter-threads active (instead of 4), right?

    This is how i think that it works:
    When starting the thread calling the producer-function, 4 FileGetter-threads will be started while 3 will be added to the queue. The 4th call to put will wait because the Queue is full.
    When the thread calling the consumer-function is now started, one of the 3 threads is taken from the queue and the current thread waits for it (because of the join).
    But now the producer-thread can finish its put-call (where it was waiting) and start the 5th FileGetter-thread.
    So the number of parallel downloads is limited to a maximum of 5, not 4, right?

    Moreover i have another question. :)
    When waiting for the first FileGetter-thread (on the join-statement) other FileGetter-threads (e.g. the second and third) may finish while we are still waiting on the join-statement. So there may be unused queue-slots while waiting for the first thread which may take a long time.
    Or do i understand something wrong?

    Best Regards,
    Spookie

  20. riz
    Mar 22nd, 2011 at 23:51 | #20

    This time I tried the application host to host, with TCP / IP connection using python, but sometimes during the same process occurs foult segmentation, it is when the query or insert into the table. whether the process should be the threading? and there are 2 processes in the same connection. A process runs every 5 minutes or so, and I make use of process B, but the response was too late and have been hit by a timeout, after 5 hours and then sent a late response but it did go through the process A. I try also to process B is not in time out but it is just as incurred through the process A. What should I do?

    My email

    regard
    riz

  21. Scott
    Nov 9th, 2011 at 15:00 | #21

    I get an error when running this that claims __init__ is never called. I believe you need threading.Thread.__init__(self) to appear first in your def __init__ before anything else, otherwise self.url, etc. doesn’t know what to reference.

    Am I wrong?

  22. Dec 21st, 2011 at 02:45 | #22

    You Have Error On Your Script…
    Change
    “for file in files:” to
    “for ufile in files:
    thread = FileGetter(ufile)”

    and

    urllib.urlopen(self.url)

    • Dec 21st, 2011 at 02:48 | #23

      cons_thread = threading.Thread( target=consumer, args=(q, len(files)) “)” is missing so put it

  23. Dec 21st, 2011 at 02:47 | #24

    and
    cons_thread = threading.Thread( target=consumer, args=(q, len(files)) “)” This is Missing on The Script So Put it

  24. Alastair
    Dec 21st, 2011 at 15:06 | #25

    Perhaps I am missing something but why are you putting the threads themselves in a queue? This seems like a very strange use of queues. Also you end up creating one thread per task which is very wasteful.

    Personally I would stick the jobs to be performed (in this case URLs to be downloaded) in a queue then create n (in this case 4) threads to work through that queue. If necessary, these threads can place the results in another queue to be processed by some other number of consumer threads.

    This solution would have the huge advantage that jobs can be added as you go along, which is absolutely crucial if you want to chain a few of these things together.

    I also do not understand the point of creating a Thread class to replace the get_files function. Ostensibly you do this to make things simpler, but actually I think it’s more complicated (at least for illustration purposes). Potentially you might want your tasks to be objects so your pool of threads would take task objects from a queue, do something to those objects and put the “finished” objects into another queue for further processing. But I can’t see why you need your tasks to be thread objects.

  25. Jan 23rd, 2012 at 09:21 | #26

    I agree with Alastair.

    class Worker(threading.Thread):
        def __init__(self, taskQueue, resultQueue):
            super(Worker, self).__init__()
            self.taskQueue = taskQueue
            self.resultQueue = resultQueue

        def run(self):
            while True:  # until end marker found
                try:
                    url = self.taskQueue.get(True)
                    if url is None:  # end-marker?
                        self.resultQueue.put(self)  # report being finished
                        return
                    f = urllib.urlopen(url)
                    contents = f.read()
                    f.close()
                    self.resultQueue.put(contents)
                except Exception, e:
                    self.resultQueue.put(e)

    def get_files(urls):
        taskQueue = Queue()
        for url in urls:
            taskQueue.put(url)
        resultQueue = Queue()
        workers = set([Worker(taskQueue, resultQueue) for i in range(4)])
        for worker in workers:
            worker.start()
            taskQueue.put(None)  # one end marker for each worker thread
        while workers:
            result = resultQueue.get(True)
            if result in workers:
                workers.remove(result)
            else:
                yield result

  27. Jan 23rd, 2012 at 09:26 | #28

    Hmpf. Sorry for spamming :-(

    The blog does not allow removing old posts and does not handle hard spaces properly at all places. I hope you can decipher my code nevertheless.

    Alfe
