Multi-threading in Python
Programming with threads is one of the more difficult tasks in programming. The Python threading and Queue modules make this significantly easier, but it still takes some deliberation to use threads in an efficient way.
Python’s thread and threading libraries are built on native operating system threads (POSIX threads on Unix-like systems). The threading library is the higher level of the two and is therefore the one to use in your typical programming tasks. The Queue module provides a thread-safe mechanism for communicating between threads, like a combination of a list and a semaphore.
Threads are relatively expensive to create, so it takes a little planning to know when to use them. Generally, the best uses of threads are for multiple tasks that cause side effects, such as output, but do not depend on the state of other threads. An example is a program that writes out a large number of files with data from a database, or a large data migration. The rest of this article builds a threading class that encapsulates the actions to be taken.
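As a minimal sketch of the Queue behaviour described above, here is one thread handing items to another without any explicit locking. It is written for Python 3, where the module is named queue rather than Queue, and the names worker, inbox, outbox, and the None sentinel are illustrative assumptions rather than part of this article's code.

```python
import queue
import threading

def worker(inbox, outbox):
    """Square each number taken from inbox until the None sentinel arrives."""
    while True:
        item = inbox.get()          # blocks until an item is available
        if item is None:            # sentinel: no more work
            break
        outbox.put(item * item)

inbox = queue.Queue()
outbox = queue.Queue()
t = threading.Thread(target=worker, args=(inbox, outbox))
t.start()

for n in [1, 2, 3]:
    inbox.put(n)
inbox.put(None)                     # tell the worker to stop
t.join()                            # wait until the worker has finished

# The worker is done, so draining the queue here is safe.
squares = []
while not outbox.empty():
    squares.append(outbox.get())
print(squares)
```

Because there is a single worker and the queue is FIFO, the results come out in the order the inputs went in.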
Assume we have around 500 XML documents to download off of a remote server (via HTTP). Each is a large enough file that it warrants downloading several at a time. The server and the network can take a fair load, but we don’t want to simulate a botnet attack or overload our local network connection, which would slow down each download to a crawl and drastically increase collisions and errors while downloading. Let’s limit the number of files we download at once to 4. We start off with a function to download the files:
import urllib

def get_file(url):
    try:
        f = urllib.urlopen(url)
        contents = f.read()
        f.close()
        return contents
    except IOError:
        print "Could not open document: %s" % url
So much for error handling, but you get the idea. Assuming our url is stored in a variable with the name url, to execute this function in another thread, we run:
import threading

thread = threading.Thread(target=get_file, args=(url,))
thread.start()  # creating the Thread object alone does not run it
We can make that a little simpler in two ways. The object-oriented way is to subclass threading.Thread. We would then put the call get_file(url) in our run() method. This is useful when the result of the function is required for later processing. If the results are not needed, we can simplify using a functional style and partial application:
from functools import partial
import threading

thread = threading.Thread(target=partial(get_file, url))
While that method is more fun, let’s use the OO method (no pun intended) since we want to do something with this data. Remember, we are downloading the file and storing it as a string, rather than simply downloading the file to the local file system. That implies we have more work to do after the download.
import urllib, threading

class FileGetter(threading.Thread):
    def __init__(self, url):
        self.url = url
        self.result = None
        threading.Thread.__init__(self)

    def get_result(self):
        return self.result

    def run(self):
        # Use self.url here: run() receives no arguments of its own.
        try:
            f = urllib.urlopen(self.url)
            contents = f.read()
            f.close()
            self.result = contents
        except IOError:
            print "Could not open document: %s" % self.url
Now we have our Thread implementation. Note that creating an instance of FileGetter does not cause the thread to start. That is done with the start() method. However, we don’t want all of the threads running at the same time, so we need to use the Queue module and a couple of helper functions to manage our list of files.
import threading
from Queue import Queue

def get_files(files):
    def producer(q, files):
        for url in files:  # avoid shadowing the builtin name "file"
            thread = FileGetter(url)
            thread.start()
            q.put(thread, True)

    finished = []

    def consumer(q, total_files):
        while len(finished) < total_files:
            thread = q.get(True)
            thread.join()
            finished.append(thread.get_result())

    q = Queue(3)
    prod_thread = threading.Thread(target=producer, args=(q, files))
    cons_thread = threading.Thread(target=consumer, args=(q, len(files)))
    prod_thread.start()
    cons_thread.start()
    prod_thread.join()
    cons_thread.join()
    return finished
Let’s take a look at what we did here. The first function, producer, accepts the queue and the list of files. For each file, it starts a new FileGetter thread. The last line is significant: we add the thread to the queue. The second parameter, boolean True, tells the put() method to block until a slot is available. Note that the thread starts before put() blocks. This means that even if the queue is full, the thread will already be running. Because of this, we reduce our queue size to 3. Be aware that the thread the consumer is currently joining has also left the queue, so up to two more threads than the queue size (here, five) can be alive at once; use a queue size of 2 if four is a hard limit.
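The blocking behaviour of put() can be seen in isolation with a bounded queue. This is a small sketch for Python 3 (module name queue); the variable names are illustrative only. It uses a non-blocking put to show the point at which a blocking put would have had to wait.

```python
import queue

q = queue.Queue(maxsize=3)
for n in range(3):
    q.put(n, True)              # returns immediately: a slot is free

is_full = q.full()              # all three slots are now taken

try:
    q.put(3, False)             # non-blocking put on a full queue
    overflowed = False
except queue.Full:
    overflowed = True           # a blocking put would have waited here

first = q.get(True)             # removes the oldest item and frees a slot
q.put(3, True)                  # now succeeds without waiting
```

In the producer above, the wait happens only after thread.start() has already been called, which is why the number of running threads can exceed the queue size.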
The second function, the consumer, reads items out of the queue, blocking until an item is available in the queue. Then comes the important part, thread.join(). This causes the consumer to block until the thread completes its execution. This line is what keeps the queue from emptying before the next thread has completed execution (and therefore keeps the producer from starting more threads). The consumer uses the list finished, defined in the enclosing get_files function, to store the results of each thread’s execution.
Last, we create a thread for the producer and one for the consumer, start them, and then block until both have completed. Here is the complete code:
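To illustrate what join() guarantees, here is a minimal sketch for Python 3. SlowTask is a hypothetical stand-in for FileGetter that sleeps instead of downloading, so it runs without a network connection.

```python
import threading
import time

class SlowTask(threading.Thread):
    def __init__(self, delay):
        threading.Thread.__init__(self)
        self.delay = delay
        self.result = None

    def run(self):
        time.sleep(self.delay)      # stand-in for a slow download
        self.result = "done"

task = SlowTask(0.2)
alive_before_start = task.is_alive()   # creating the object does not start it
task.start()
task.join()                             # blocks until run() has returned
alive_after_join = task.is_alive()
result = task.result                    # safe to read once join() returns
```

Reading task.result before join() returns could still give None, which is why the consumer joins each thread before calling get_result().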
import urllib, threading
from Queue import Queue

class FileGetter(threading.Thread):
    def __init__(self, url):
        self.url = url
        self.result = None
        threading.Thread.__init__(self)

    def get_result(self):
        return self.result

    def run(self):
        try:
            f = urllib.urlopen(self.url)
            contents = f.read()
            f.close()
            self.result = contents
        except IOError:
            print "Could not open document: %s" % self.url

def get_files(files):
    def producer(q, files):
        for url in files:
            thread = FileGetter(url)
            thread.start()
            q.put(thread, True)

    finished = []

    def consumer(q, total_files):
        while len(finished) < total_files:
            thread = q.get(True)
            thread.join()
            finished.append(thread.get_result())

    q = Queue(3)
    prod_thread = threading.Thread(target=producer, args=(q, files))
    cons_thread = threading.Thread(target=consumer, args=(q, len(files)))
    prod_thread.start()
    cons_thread.start()
    prod_thread.join()
    cons_thread.join()
    return finished
Of course, this approach is not perfect. A queue is FIFO (first in, first out), and the consumer joins the threads in that order. If one of the threads currently executing finishes before the thread ahead of it, its slot stays occupied until the consumer reaches it, so we lose efficiency: fewer files are downloading at a time than the limit allows. However, the solution to that is a complex one and outside the scope of this article.
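One possible alternative, which is not the approach taken in this article, is a fixed pool of worker threads that each pick up the next URL as soon as they are free. The sketch below assumes Python 3 and its concurrent.futures module; fake_fetch, the example.com URLs, and the counters active and peak are hypothetical stand-ins so the example runs without a network connection.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_WORKERS = 4
active = 0          # downloads currently in progress
peak = 0            # highest value "active" ever reached
lock = threading.Lock()

def fake_fetch(url):
    """Hypothetical stand-in for a download; sleeps instead of using the network."""
    global active, peak
    with lock:
        active += 1
        peak = max(peak, active)
    time.sleep(0.01)
    with lock:
        active -= 1
    return "contents of %s" % url

urls = ["http://example.com/doc%d.xml" % i for i in range(20)]

with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
    # map() returns results in the same order as the input urls
    results = list(pool.map(fake_fetch, urls))
```

Because only four worker threads ever exist, a finished download frees its worker immediately, regardless of the order in which the downloads complete.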
Edit 02/19/2009: Fixed: FileGetter was not setting self.result in its run method. Thanks to tgray for pointing out the problem.
I’m confused as to how self.result gets updated, would you please explain this?
On a similar note, I was under the impression that a “return” from the run() method doesn’t do anything since run() is called from Thread.start(). Does this somehow **magic** “contents” into “self.result”?
Thanks for pointing out the bug. I don’t know how that went unnoticed for so long :).
Hi, thanks for this helpful tutorial. Please correct me if I’m wrong, but I believe you forgot calling the init :
class FileGetter(threading.Thread):
    def __init__(self, url):
        self.url = url
        self.result = None
        threading.Thread.__init__(self)
Thanks again BTW.
Best Regards,
Denis
Denis – thanks for noticing that. I’ve updated the code.
Aloha,
in the run() parts it should be “self.url” instead of just “url”, or?
Else a very nice example, thanks for it.
greetinx
kristall
Aloha,
also imho “get_files(files)” should “return finished”. And even its inside a def imho one should not use “file” as a variable since there is a builtin with that name.
Also ‘&lt;’ does not show what it should.
kristall
Hey mate ! Thanks for the tutorial. Allowed me to get my head around threads again ! Hadn’t done it for years! Dam python makes it easier these days … /me remembers doing it in C++ …
Great tutorial. Thanks
i guess u forgot “)” after
‘cons_thread = threading.Thread(target=consumer, args=(q, len(files))’
:P
awesome … probably the most detailed one so far
You know this is simply doing co-operative multitasking, not actual concurrency because of the GIL. To get the files all downloading at the same time, you need the multiprocessing module (or similar). See the video of David Beazley’s talk at PyCon 2010.
You’re missing a bracket:
cons_thread = threading.Thread(target=consumer, args=(q, len(files))
should read
cons_thread = threading.Thread(target=consumer, args=(q, len(files)))
Thanks for the helpful article.
Regards, Faheem.
Some more errors:
1) &lt; should be <
2) You need self.url in two places in run().
With these changes the code works.
Regards, Faheem.
Awesome, thanks!
You claim it is a complete code. Do you need a main program to drive this? Are you missing a statement such as get_files(list of URLs) to start it? Thanks.
It works. However, there are two more threads running in parallel than the number in the Queue() call. Do not know why. Nevertheless, I can live with the difference. Many thanks for the example.
Thanks for the example. Suggestion: test code before posting on the internet (missing closing parentheses, url should be self.url, etc.)
note to self:
clear all code before submitting immensely helpful tutorials. really, very nice explanation, and the corrections (!) submitted are helpful, just wanted to say thank you.
Correct me if i am wrong: i think there can be 5 FileGetter-threads active (instead of 4), right?
This is how i think that it works:
When starting the thread calling the producer-function, 4 FileGetter-threads will be started while 3 will be added to the queue. The 4th call to put will wait because the Queue is full.
When the thread calling the consumer-function is now started, one of the 3 threads is taken from the queue and the current thread waits for it (because of the join).
But now the producer-thread can finish its put-call (where it was waiting) and start the 5th FileGetter-thread.
So the number of parallel downloads is limited to a maximum of 5, not 4, right?
Moreover i have another question. :)
When waiting for the first FileGetter-thread (on the join-statement) other FileGetter-threads (e.g. the second and third) may finish while we are still waiting on the join-statement. So there may be unused queue-slots while waiting for the first thread which may take a long time.
Or do i understand something wrong?
Best Regards,
Spookie
This time I tried a host-to-host application over a TCP/IP connection using Python, but sometimes a segmentation fault occurs during the process, when it queries or inserts into the table. Should the process use threading? There are 2 processes on the same connection. Process A runs every 5 minutes or so, and I also use process B, but its response came too late and hit a timeout; after 5 hours a late response was sent, but it went through process A. I also tried keeping process B from timing out, but it still went through process A. What should I do?
My email
regard
riz
I get an error when running this that claims __init__ is never called. I believe you need threading.Thread.__init__(self) to appear first in your def __init__ before anything else, otherwise self.url, etc. doesn’t know what to reference.
Am I wrong?
You have errors in your script.
Change
“for file in files:” to
“for ufile in files:
    thread = FileGetter(ufile)”
and use
urllib.urlopen(self.url)
and in
cons_thread = threading.Thread(target=consumer, args=(q, len(files))
a “)” is missing, so add it.
Perhaps I am missing something but why are you putting the threads themselves in a queue? This seems like a very strange use of queues. Also you end up creating one thread per task which is very wasteful.
Personally I would stick the jobs to be performed (in this case URLs to be downloaded) in a queue then create n (in this case 4) threads to work through that queue. If necessary, these threads can place the results in another queue to be processed by some other number of consumer threads.
This solution would have the huge advantage that jobs can be added as you go along, which is absolutely crucial if you want to chain a few of these things together.
I also do not understand the point of creating a Thread class to replace the get_files function. Ostensibly you do this to make things simpler, but actually I think it’s more complicated (at least for illustration purposes). Potentially you might want your tasks to be objects so your pool of threads would take task objects from a queue, do something to those objects and put the “finished” objects into another queue for further processing. But I can’t see why you need your tasks to be thread objects.
I agree with Alastair.
class Worker(threading.Thread):
    def __init__(self, taskQueue, resultQueue):
        self.taskQueue = taskQueue
        self.resultQueue = resultQueue
        super(Worker, self).__init__()

    def run(self):
        while True:  # until end marker found
            try:
                url = self.taskQueue.get(True)
                if url is None:  # end-marker?
                    self.resultQueue.put(self)  # report being finished
                    break
                f = urllib.urlopen(url)
                contents = f.read()
                f.close()
                self.resultQueue.put(contents)
            except Exception, e:
                self.resultQueue.put(e)

def get_files(urls):
    taskQueue = Queue()
    for url in urls:
        taskQueue.put(url)
    resultQueue = Queue()
    workers = set([Worker(taskQueue, resultQueue) for i in range(4)])
    for worker in workers:
        worker.start()
        taskQueue.put(None)  # one end marker for each worker thread
    while workers:
        result = resultQueue.get(True)
        if result in workers:
            workers.remove(result)
        else:
            yield result
Hmpf. Sorry for spamming :-(
The blog does not allow removing old posts and does not handle hard spaces properly at all places. I hope you can decipher my code nevertheless.
Alfe