
Threading Basics in Python

Foreword


I started writing this article after an increasing number of questions, both on the forum and over ICQ, on the topic of multithreading in CPython. The problem of the people asking them stems mainly from ignorance or misunderstanding of the basic principles of multi-threaded applications. At least, this applies to the multithreading model I use, which is called the Thread Pool. Often there is another problem: people lack the basic skills for working with the standard CPython modules. In this article I will try to give examples of such ignorance without dwelling on personalities, since in my humble opinion that does not matter. Given the conditions in which this article is written, we will also touch on working through proxy servers (not to be confused with SOCKS).

Details


The article was written at a time when the latest versions of CPython were 2.6.2 for the second branch and 3.1.1 for the third. The article uses the with statements that appeared in CPython 2.6, so if you want to use this code with earlier versions you will have to rework it at your discretion. Only standard modules available "out of the box" were used while writing this article. Also, since I am not a professional programmer but self-taught, I apologize to the respected audience for possible inaccuracies in the interpretation of certain concepts. I therefore invite you to ask questions, which I will answer as best I can.

So, let's start. What I am about to describe was first shown to me by the respected lorien from python.su (although in his example the Queue was processed in a separate thread :)). I am not sure he is the author of the concept he demonstrated, but I first saw it published by him. Strictly speaking, it is not even a Thread Pool but rather a Task Pool (although I may be wrong in my interpretation of this term).
What is a multi-threaded application? It is an application in which a certain number of threads perform certain tasks. The trouble for many is that they do not fully grasp the fact that the threads act separately from each other and keep working only as long as the main thread is active. Personally, I try to write in such a way that this does not get in their way, but more on that later. Another problem is the so-called "Hindu code": code simply and thoughtlessly copied from somewhere, with the program brought only to the level of "it just barely works". Gentlemen, once and for all: if you do not understand how a given section of your program works, rewrite it so that it is clear to YOU. If in the future you grow to understand the things you used to copy without thinking, you will be able to use that code without any problems. The main thing is YOUR understanding of how your creation works.
Let us address the problem of threads working separately. Gentlemen, the interaction of threads should be thought out before you start writing the application, not after you have already written it. In principle, if you follow a few rules when working with the application's source code, converting a program from single-threaded to multi-threaded happens easily, painlessly and quickly.
Regarding the activity of the main thread: when, as it seems to you, you start ONE thread, TWO threads are actually running. You need to understand that the number of currently active threads equals the number of threads started by you, plus one thread in which the main body of the application runs. Personally, I try to write in a way that clearly separates the main thread from the ones I start. If you do not do this, the application may shut down prematurely (as it seems to you), although in fact it will work exactly as you wrote it.
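To see this with your own eyes, here is a minimal sketch of mine (not part of the parser we will build):

import threading
import time

def sleeper():
    time.sleep(2)
    # keep the thread alive for a couple of seconds

print threading.active_count()
# prints 1: only the main thread is running
threading.Thread(target=sleeper).start()
print threading.active_count()
# prints 2: the main thread plus the one we just started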
It seems clear enough in words, so now let us move to practice. In practice, CPython has such a peculiarity as the GIL (Global Interpreter Lock). This means a global lock on the interpreter, taken whenever a thread of your application executes on the processor. In fact, at any given moment only one thread is working on the processor. Incidentally, the maximum number of threads that can generally be started in a standard CPython fluctuates around 350.
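As an illustration (again my own sketch, not from the parser): a CPU-bound job split across several threads takes roughly as long as running the same amount of work in one thread, because the GIL lets only one thread execute Python bytecode at a time. I/O-bound jobs like our HTTP requests, on the other hand, release the GIL while waiting on the network, which is why the parser still benefits from threads.

import threading
import time

def burn():
    # purely CPU-bound work: the GIL is held the whole time
    n = 0
    for _ in xrange(10 ** 7):
        n += 1

start = time.time()
threads = [threading.Thread(target=burn) for _ in xrange(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print "4 threads: %.2f s" % (time.time() - start)
# about as slow as running burn() four times in a row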
As an example, an attempt will be made to implement a multi-threaded parser of www.google.com. As I wrote above, only standard modules will be used; to complete the task we will need the urllib2, urllib, Queue, threading, re and time modules.

In order:
#==================< Imports >==================
import urllib2
# works with HTTP requests
import urllib
# also HTTP-related; unlike urllib2, we need it here
# for functions like urllib.quote (URL-escaping)
from Queue import Queue
# the queue will serve as our task "pool" from which
# the worker threads take their jobs
import threading
# gives us threads; we will use
# threading.active_count, threading.Thread, threading.Thread.start,
# threading.RLock
import re
# regular expressions, used to parse the fetched pages
import time
# used for time.sleep
queue = Queue()
# create the task queue (note that in Python 3 the
# Queue module was renamed to queue)
#==================</ Imports >=================

#==============================< Settings >==============================
PROXY = "10.10.31.103:3128"
# the address of the HTTP proxy server through which the
# requests will go; if you want to work without a proxy,
# set PROXY to None
HEADERS = {"User-Agent": "Opera/9.64 (Windows NT 5.1; U; en) Presto/2.1.1",
           "Accept": "text/html, application/xml;q=0.9, application/xhtml+xml, image/png, image/jpeg, image/gif, image/x-xbitmap, */*;q=0.1",
           "Accept-Language": "ru,uk-UA;q=0.9,uk;q=0.8,en;q=0.7",
           "Accept-Charset": "iso-8859-1, utf-8, utf-16, *;q=0.1",
           "Accept-Encoding": "identity, *;q=0",
           "Connection": "Keep-Alive"}
# the headers sent to www.google.com with every request;
# HEADERS imitates the Opera browser; note that
# Accept-Encoding asks for an uncompressed reply
# (identity), otherwise we would receive zlib compressed
# data and would have to unpack it ourselves...
THREADS_COUNT = 10
# how many worker threads to start
DEEP = 30
# how deep to dig into the search results for each query;
# Google shows 10 results per page, so the offsets will
# be 0, 10, 20 and so on up to DEEP
ENCODING = "UTF-8"
# the encoding of the file with the search queries
# (requests.txt)
#==============================</ Settings >===================================

LOCK = threading.RLock()
# a re-entrant lock from the threading module; it will
# guard writes to the output file so that the threads do
# not interleave their lines.
# After acquire() is called on it, threading.RLock (unlike
# threading.Lock) can be acquired again by the same thread
# without blocking, whereas threading.Lock would block
# even the thread that already owns it.

The main principle for painlessly implementing multithreading is, I think, modular code. You do not have to move the functions you use into separate files or classes; it is enough that they are at least separate functions (please forgive the pun).

Now I will put the work of a thread into a separate function; treat it as a kind of abstract unit of work. At this stage some points will still be unclear, but later it will all line up into a clear algorithm.

def worker():
    # the function that every worker thread executes
    global queue
    # the queue is global, so all the threads share one
    # and the same task pool (!)
    while True:
        # loop until the queue runs dry
        try:
            # get_nowait raises Queue.Empty when the queue
            # is empty, so we wrap it in try/except and use
            # that as the signal to stop
            target_link = queue.get_nowait()
            # take the next search-page URL out of queue
        except Exception, error:
            # the queue is empty: nothing left to do, the
            # thread terminates
            return
        parsed_data = get_and_parse_page(target_link)
        # hand the link to the handler function, which
        # returns the harvested links and titles
        if parsed_data != "ERROR":
            # the page was fetched and parsed successfully
            write_to_file(parsed_data)
            # append the results to the output file
        else:
            queue.put(target_link)
            # the request failed, so put the link back into
            # queue for another attempt

The main thing to understand clearly is the algorithm of the thread itself, and that the threads must do their work independently of each other. In total, a thread's tasks are very simple: take a link to a search page, pass it to the handler function, which returns the links to the found sites and the titles of those sites, and then write the links and titles to the file (all of this arrives in parsed_data).

As the work progresses, the tasks are implemented from the simplest to the most complex, so at these stages multithreading issues are barely touched. Next came the turn of the function responsible for writing to the file.

def write_to_file(parsed_data):
    # writes the results to disk; parsed_data is a list
    # of (link, title) pairs
    global LOCK
    global ENCODING
    LOCK.acquire()
    # "grab" the lock so that only one thread at a time
    # can write to the file
    with open("parsed_data.txt", "a") as out:
        # a with statement: open parsed_data.txt for
        # appending ("a") and bind the file object to out
        # (it is closed automatically at the end of the block)
        for site in parsed_data:
            # walk through parsed_data; each element is
            # bound to site
            link, title = site[0], site[1]
            # unpack the link and the title from site
            title = title.replace("<em>", "").replace("</em>", "").replace("<b>", "").replace("</b>", "")
            # .replace strips the HTML tags that Google
            # inserts into the titles
            out.write(u"{link}|{title}\n".format(link=link, title=title).encode("cp1251"))
            # write a line of the form link|title, using
            # .format instead of the % operator (the output
            # file is written in cp1251)
    LOCK.release()
    # "release" the lock so the other threads can write;
    # without the lock several threads could append to
    # parsed_data.txt at once and their lines would get
    # interleaved
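By the way, since we are already using with statements: the lock objects from the threading module also support the context-manager protocol, so the acquire()/release() pair can be replaced with a with block, which releases the lock even if an exception flies out of the body. A sketch of the same function written this way:

def write_to_file(parsed_data):
    # identical to the version above, but the lock is held
    # via a with statement and is released automatically,
    # even on an exception
    with LOCK:
        with open("parsed_data.txt", "a") as out:
            for site in parsed_data:
                link, title = site[0], site[1]
                title = title.replace("<em>", "").replace("</em>", "")
                title = title.replace("<b>", "").replace("</b>", "")
                out.write(u"{link}|{title}\n".format(link=link, title=title).encode("cp1251"))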

Next comes the implementation of the get_and_parse_page function:
def get_and_parse_page(target_link):
    # fetches and parses one search page;
    # target_link is the URL of that page
    global PROXY
    # the proxy address comes from the global PROXY
    global HEADERS
    # the request headers come from the global HEADERS
    if PROXY is not None:
        # only set up a proxy if PROXY is configured
        proxy_handler = urllib2.ProxyHandler({"http": "http://" + PROXY + "/"})
        # a handler that routes HTTP traffic through the proxy
        opener = urllib2.build_opener(proxy_handler)
        # build an opener with that proxy handler
        urllib2.install_opener(opener)
        # install the opener globally, so from now on every
        # urllib2 request goes through the proxy given in PROXY
    page_request = urllib2.Request(url=target_link, headers=HEADERS)
    # a Request instance describing a GET request with our
    # headers; nothing is sent over the network yet...
    try:
        # the network call can fail for many reasons
        # (timeout, dead proxy and so on), so wrap it
        page = urllib2.urlopen(url=page_request).read().decode("UTF-8", "replace")
        # fetch the page and decode it from UTF-8 into
        # unicode (www.google.com replies in UTF-8); remember
        # that in Python 2.6 unicode and str are different
        # types (!)
    except Exception, error:
        # the request failed; the reason sits in error
        print str(error)
        # print the error (handy for debugging)
        return "ERROR"
        # tell the caller that this link must be retried
    harvested_data = re.findall(r'''\<li\ class\=g\>\<h3\ class\=r\>\<a\ href\=\"(.*?)".*?>(.*?)\<\/a\>\<\/h3\>''', page)
    # pull (link, title) pairs out of the results page with
    # a regular expression; the pattern is tied to the HTML
    # Google produced at the time of writing
    harvested_data = [data for data in harvested_data
                      if not data[0].startswith("/") and ".google.com" not in data[0]]
    # throw away Google's own service results: relative
    # links (they start with /) and links to .google.com
    return harvested_data
    # hand the cleaned list back to the worker

Finally, it is the turn of the main body of the application:
def main():
    # the main body of the application
    print "STARTED"
    # report that the work has begun
    global THREADS_COUNT
    global DEEP
    global ENCODING
    # the settings are taken from the globals defined at
    # the top of the file
    with open("requests.txt") as requests:
        # open the file with the search queries
        for request in requests:
            # iterating over a file object yields its
            # lines, one search query per line :)
            request = request.translate(None, "\r\n").decode(ENCODING, "replace")
            # strip the line endings and decode the query
            # into unicode (using the ENCODING setting)
            empty_link = "http://www.google.com/search?hl=ru&client=opera&rls=ru&hs=67v&q={request}&start={N}&sa=N"
            # a URL template; the query and the result
            # offset will be substituted into it
            for i in xrange(0, DEEP, 10):
                # walk from 0 to DEEP in steps of 10:
                # Google shows 10 results per page, so the
                # offsets go 0, 10, 20, 30 and so on
                queue.put(empty_link.format(request=request.encode("UTF-8"), N=i))
                # put the finished URL into the queue (the
                # query goes into the URL encoded as UTF-8)
    for _ in xrange(THREADS_COUNT):
        # start the worker threads
        thread_ = threading.Thread(target=worker)
        # target is the function the thread will execute
        thread_.start()
        # start() actually launches the thread
    while threading.active_count() > 1:
        # wait while more than 1 thread is alive (the one
        # that remains is the main thread itself)
        time.sleep(1)
        # check once a second
    print "FINISHED"
    # report that the work is done

main()
# kick everything off
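A note on the shutdown logic: polling threading.active_count() works, but it also counts threads you did not start (some libraries run their own service threads). A more direct alternative, sketched below, is to keep references to the started threads and join() each one; join() blocks until that thread finishes:

threads = []
for _ in xrange(THREADS_COUNT):
    thread_ = threading.Thread(target=worker)
    thread_.start()
    threads.append(thread_)
for thread_ in threads:
    thread_.join()
    # returns only when this worker has finished
print "FINISHED"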

As a result, we get a perfectly working multithreaded parser. Naturally it has many drawbacks, but it is neatly written, and I did not skimp on the comments.

Code:
This article + source : sendspace.com/file/mw0pac
Code with russian comments: dumpz.org/15202
Code with Ukrainian comments: dumpz.org/15201

P.S. Yes, I know that to some this example will seem an irrational use of the Queue (hello, cr0w). But error handling is easiest to organize with it.
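If you worry that a permanently dead link would circulate in the queue forever, the same approach extends naturally to a bounded retry: put (link, attempts) tuples into the queue instead of bare links (main would then do queue.put((url, 0))). A hypothetical sketch of such a worker; the MAX_RETRIES limit is my addition, not part of the original code:

MAX_RETRIES = 3
# how many times to retry one link; my own choice

def worker():
    while True:
        try:
            target_link, attempts = queue.get_nowait()
        except Exception:
            return
        parsed_data = get_and_parse_page(target_link)
        if parsed_data != "ERROR":
            write_to_file(parsed_data)
        elif attempts < MAX_RETRIES:
            queue.put((target_link, attempts + 1))
            # put it back with the attempt counter increased
        # otherwise the link is dropped for good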
P.P.S. The material does not claim to be infallible. Naturally, someone will say it is 100% bydlocode (trash code), that I do not understand what I am describing, that I misuse terms, that I am a hack, and so on and so forth. But there is one thing you cannot scold: it WORKS, and it works exactly as expected of it; the code is clear and commented so that even a baby will understand it. I hope it helps someone...

Β© login999
uasc.org.ua

Source: https://habr.com/ru/post/78267/

