
Давно интересовался как можно распараллелить выполнение ресурсов при написании кода на Python. Года 2 назад, даже пробовал что-то написать, но для меня это было сложно т.к не имел большого опыта на данном языке программировании. Все жизни меняется, прогресс не стоит на месте, так получилось — я тоже не сидел на месте и изучал питон. Накапливал опыт с каждым написанным скриптом и его логикой. Совсем недавно, я сменил работу и прийдя на новое место, познакомился с одним из очень крутых чуваком на проекте. Он всячески мотивировал меня в разных отраслях. как-то он написал скрипт, который умел параллелить ресурсы. Когда я поглядел код, — он был простой и понятный мне. Мне тоже захотелось написать подобное! Вот тут и начал писать распараллеливание.
На самом то деле, есть несколько способов сделать это. Сейчас расскажу как можно достичь желаемого результата на примере, скачивания файлов с интернета.
В общем, многозадачность — это способность выполнять несколько задач одновременно, с технической точки зрения, многозадачность означает способность операционной системы выполнять разные задачи одновременно.
Например, вы загружаете что-то на свой компьютер и слушаете песни и одновременно играете в игру, все это выполняется одной и той же операционной системой (ОС), это не что иное, как многозадачность. В ОС есть два типа многозадачности:
- На основе потоков (Thread-Based): единый процесс, состоящий из отдельных задач. Пример: игра Vice-City состоит из различных потоков.
- На основе процессов (Process-Based): несколько потоков одновременно работают в одной ОС. Пример: загрузка, прослушивание песен и игра в игру.
Ну что, приступим к примерам…
Multithreading на Python в Unix/Linux
Что такое потоки (threads)?
thread — это так называемые независимые потоком выполнения какой-то из программ. Т.е один процесс может состоять из нескольких потоков, каждый поток в программе выполняет определенную задачу.
Пример: В игре Vice-City игра в целом представляет собой единый процесс, но состоит из нескольких потоков, отвечающих за воспроизведение музыки, принимающих ввод от пользователя.

Многопоточность в Python может быть достигнута путем импорта модуля потоковой передачи, но перед импортом модуля вам необходимо установить этот модуль в соответствующей среде IDE.
Ниже я приведу куски готовых программ и вы наглядно сможете увидеть пример работы.
Прежде чем перейти к созданию потоков, позвольте мне сказать вам, когда использовать многопоточность в Python?
Многопоточность очень полезна для экономии времени и повышения производительности, но не может применяться везде. В предыдущем примере Vice-City музыкальные потоки независимы, это поток, который получал ввод от пользователя. В случае, если эти потоки были взаимозависимыми, многопоточность использовать нельзя.
В реальной жизни вы можете вызывать веб запрос с помощью API или ждать пакета в сетевом сокете, поэтому в это время вы ждете, а ваш процессор ничего не делает. Многопоточность пытается использовать это время простоя, и во время этого простоя вы хотите, чтобы ваш процессор выполнял некоторую работу.
Итак, давайте воспользуемся многопоточностью, чтобы сократить время, и программа будет выполняться намного быстрее.
Как создать многопоточность в Python3?
Вот приведу пример кода:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import os
import argparse
import threading
from urllib.request import urlopen
from threading import Thread, ThreadError
class Bgcolors:
def __init__(self):
self.get = {
'HEADER': '\033[95m',
'OKBLUE': '\033[94m',
'OKGREEN': '\033[92m',
'WARNING': '\033[93m',
'FAIL': '\033[91m',
'ENDC': '\033[0m',
'BOLD': '\033[1m',
'UNDERLINE': '\033[4m'
}
class StartThreads(Thread):
def __init__(self, name, daemon, url):
"""Initialize the thread"""
try:
Thread.__init__(self)
self.name = name
self.daemon = daemon
self.url = url
self.setDaemon(daemon)
self.start()
except ThreadError as error:
print('Error: Unable to initialize the {0} thread: \n{1}'.format(
threading.currentThread().getName(),
error)
)
def run(self):
"""Run the thread"""
handle = urlopen(self.url)
fname = os.path.basename(self.url)
with open(fname, "wb") as f_handler:
while True:
chunk = handle.read(1024)
if not chunk:
break
f_handler.write(chunk)
print("{0} is starting".format(threading.currentThread().getName()))
msg = "{0} has finished downloading {1}!".format(self.name, self.url)
print(msg)
@staticmethod
def thread_active_count():
print("Active threads count: ", threading.activeCount())
def threads(urls):
try:
for item, url in enumerate(urls):
thread = StartThreads(name="Thread-{0}".format(item + 1), daemon=True, url=url)
StartThreads.thread_active_count()
if thread.daemon:
thread.join(timeout=1)
print("Threading is done")
except ThreadError as error:
print("Error: Unable to start thread: \n{0}".format(error))
return threads
def main():
start__time = time.time()
parser = argparse.ArgumentParser(prog='python3 script_name.py -h',
usage='python3 script_name.py {ARGS}',
add_help=True,
prefix_chars='--/',
epilog='''created by Vitalii Natarov''')
parser.add_argument('--version', action='version', version='v0.1.0')
urls = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"
]
threads(urls)
end__time = round(time.time() - start__time, 2)
print("--- %s seconds ---" % end__time)
print(
Bgcolors().get['OKGREEN'], "============================================================",
Bgcolors().get['ENDC'])
print(
Bgcolors().get['OKGREEN'], "==========================FINISHED==========================",
Bgcolors().get['ENDC'])
print(
Bgcolors().get['OKGREEN'], "============================================================",
Bgcolors().get['ENDC'])
if __name__ == '__main__':
main()
Многопоточность существует в одном процессе. Потоки будут совместно использовать адресное пространство, у них есть свои собственные наборы инструкций, каждый поток выполняет определенные задачи.
Что такое GIL? Почему это важно?
GIL (Global Interpreter Lock). Python не был разработан давно и в то время использовались только одноядерные процессоры и тогда не возникало никаких проблем (не предполагалось мулитипроцессорность), поэтому GIL необходим, потому что Python не является потокобезопасным, и существует глобальная блокировка при доступе к объекту Python. Что мы можем сделать?
Многопроцессорность позволяет создавать программы, которые могут выполняться одновременно (в обход GIL) и использовать все ядро вашего процессора. Библиотека многопроцессорной обработки предоставляет каждому процессу свой собственный интерпретатор Python и каждому свой собственный GIL.
Из-за этого обычные проблемы, связанные с потоками (например, повреждение данных и взаимоблокировки), больше не являются проблемой. Поскольку процессы не используют общую память, они не могут изменять одну и ту же память одновременно.
Глобальная блокировка интерпретатора Python
CPython (стандартная реализация Python) имеет нечто, называемое GIL (Global Interpreter Lock); GIL предотвращает одновременное выполнение двух потоков в одной программе. Однако два потока могут выполняться одновременно, и один может запускать код, а другой может ждать. GIL из коробки ограничивает параллельное программирование на Python.
Можно использовать локи, например вот так:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import os
import argparse
import logging
import threading
from urllib.request import urlopen
from threading import Thread, Lock, ThreadError
logging.basicConfig(level=logging.DEBUG,
format='[%(levelname)s] (%(ThreadName)-1s) %(message)s',
datefmt="%H:%M:%S"
)
class Bgcolors:
def __init__(self):
self.get = {
'HEADER': '\033[95m',
'OKBLUE': '\033[94m',
'OKGREEN': '\033[92m',
'WARNING': '\033[93m',
'FAIL': '\033[91m',
'ENDC': '\033[0m',
'BOLD': '\033[1m',
'UNDERLINE': '\033[4m'
}
class StartThreads(Thread):
def __init__(self, name, daemon, url):
"""Initialize the thread"""
try:
Thread.__init__(self)
self.name = name
self.daemon = daemon
self.url = url
self.lock = Lock()
self.setDaemon(daemon)
self.start()
except ThreadError as error:
print('Error: Unable to initialize the {0} thread: \n{1}'.format(
threading.currentThread().getName(),
error)
)
def run(self):
"""Run the thread"""
handle = urlopen(self.url)
fname = os.path.basename(self.url)
logging.debug('Waiting for a lock for {0}'.format(self.name))
self.lock.acquire()
try:
with open(fname, "wb") as f_handler:
while True:
chunk = handle.read(1024)
if not chunk:
break
f_handler.write(chunk)
print("{0} is starting".format(threading.currentThread().getName()))
msg = "{0} has finished downloading {1}!".format(self.name, self.url)
print(msg)
except Exception as error:
print('Error: Got issue with {0} thread: \n{1}'.format(self.name, error))
finally:
logging.debug('Released a lock for {0}'.format(threading.currentThread().getName()))
self.lock.release()
@staticmethod
def enumerate():
for thread in threading.enumerate():
print("Thread name is %s." % thread.getName())
@staticmethod
def thread_active_count():
print("Active threads count: ", threading.activeCount())
def threads(urls):
try:
print("TIMEOUT_MAX: ", threading.TIMEOUT_MAX)
for item, url in enumerate(urls):
thread = StartThreads(name="Thread-{0}".format(item + 1), daemon=True, url=url)
StartThreads.thread_active_count()
if thread.daemon:
thread.join(timeout=1)
print("Threading is done")
except ThreadError as error:
print("Error: Unable to start thread: \n{0}".format(error))
return threads
def main():
start__time = time.time()
parser = argparse.ArgumentParser(prog='python3 script_name.py -h',
usage='python3 script_name.py {ARGS}',
add_help=True,
prefix_chars='--/',
epilog='''created by Vitalii Natarov''')
parser.add_argument('--version', action='version', version='v0.1.0')
urls = [
# "https://www.irs.gov/pub/irs-pdf/p3.pdf",
# "https://www.irs.gov/pub/irs-pdf/p15.pdf",
# "https://www.irs.gov/pub/irs-pdf/p15a.pdf",
# "https://www.irs.gov/pub/irs-pdf/p15b.pdf",
# "https://www.irs.gov/pub/irs-pdf/p15t.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"
]
threads(urls)
end__time = round(time.time() - start__time, 2)
print("--- %s seconds ---" % end__time)
print(
Bgcolors().get['OKGREEN'], "============================================================",
Bgcolors().get['ENDC'])
print(
Bgcolors().get['OKGREEN'], "==========================FINISHED==========================",
Bgcolors().get['ENDC'])
print(
Bgcolors().get['OKGREEN'], "============================================================",
Bgcolors().get['ENDC'])
if __name__ == '__main__':
main()
Приведу примеры использования очереди:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import os
import argparse
import logging
import threading
from urllib.request import urlopen
from threading import Thread, ThreadError
from queue import Queue
logging.basicConfig(level=logging.DEBUG,
format='[%(levelname)s] (%(threadName)-1s) %(message)s',
datefmt="%H:%M:%S"
)
class Bgcolors:
def __init__(self):
self.get = {
'HEADER': '\033[95m',
'OKBLUE': '\033[94m',
'OKGREEN': '\033[92m',
'WARNING': '\033[93m',
'FAIL': '\033[91m',
'ENDC': '\033[0m',
'BOLD': '\033[1m',
'UNDERLINE': '\033[4m'
}
class StartThreads(Thread):
def __init__(self, name, daemon, queue):
"""Initialize the thread"""
try:
Thread.__init__(self)
self.name = name
self.daemon = daemon
self.queue = queue
self.setDaemon(daemon)
self.start()
except ThreadError as error:
print('Error: Unable to initialize the {0} thread: \n{1}'.format(
threading.currentThread().getName(),
error)
)
def run(self):
"""Run the thread"""
while True:
# Get the work from the queue and expand the tuple
directory, link = self.queue.get()
try:
fname = os.path.basename(link)
handle = urlopen(link)
file_dir = os.path.abspath(directory) + "/" + fname
print("File in directory ===== ", file_dir)
with open(file_dir, "wb") as f_handler:
while True:
chunk = handle.read(1024)
if not chunk:
break
f_handler.write(chunk)
print("{0} is starting".format(threading.currentThread().getName()))
msg = "{0} has finished downloading {1}!".format(self.name, link)
print(msg)
except Exception as error:
print('Error: Got issue with {0} thread: \n{1}'.format(self.name, error))
finally:
self.queue.task_done()
@staticmethod
def thread_active_count():
print("Active threads count: ", threading.activeCount())
def workers(urls, thread_counts):
file_dir = "dir"
if not os.path.exists(file_dir):
os.makedirs(file_dir)
try:
# Create a queue to communicate with the worker threads
queue = Queue()
# Create (thread_counts) worker threads
for item in range(thread_counts):
StartThreads(name="Thread-{0}".format(item + 1), daemon=True, queue=queue)
StartThreads.thread_active_count()
# Put the tasks into the queue as a tuple
for url in urls:
logging.info('Queueing {}'.format(url))
queue.put((file_dir, url))
# Causes the main thread to wait for the queue to finish processing all the tasks
queue.join()
# print("queue.empty() ------: ", queue.empty())
# StartThreads.thread_active_count()
print("Threading is done")
except ThreadError as error:
print("Error: Unable to start thread: \n{0}".format(error))
return workers
def main():
start__time = time.time()
parser = argparse.ArgumentParser(prog='python3 script_name.py -h',
usage='python3 script_name.py {ARGS}',
add_help=True,
prefix_chars='--/',
epilog='''created by Vitalii Natarov''')
parser.add_argument('--version', action='version', version='v0.1.0')
parser.add_argument('--threads', dest='thread_counts', help='Set thread counts', default=10)
results = parser.parse_args()
thread_counts = results.thread_counts
urls = [
"https://www.irs.gov/pub/irs-pdf/p3.pdf",
"https://www.irs.gov/pub/irs-pdf/p15.pdf",
"https://www.irs.gov/pub/irs-pdf/p15a.pdf",
"https://www.irs.gov/pub/irs-pdf/p15b.pdf",
"https://www.irs.gov/pub/irs-pdf/p15t.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"
]
workers(urls, thread_counts)
end__time = round(time.time() - start__time, 2)
print("--- %s seconds ---" % end__time)
print(
Bgcolors().get['OKGREEN'], "============================================================",
Bgcolors().get['ENDC'])
print(
Bgcolors().get['OKGREEN'], "==========================FINISHED==========================",
Bgcolors().get['ENDC'])
print(
Bgcolors().get['OKGREEN'], "============================================================",
Bgcolors().get['ENDC'])
if __name__ == '__main__':
main()
Еще один вариант кода:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import time
import argparse
import threading
from urllib.request import urlopen
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures import as_completed
from concurrent.futures.process import BrokenProcessPool
class Bgcolors:
def __init__(self):
self.get = {
'HEADER': '\033[95m',
'OKBLUE': '\033[94m',
'OKGREEN': '\033[92m',
'WARNING': '\033[93m',
'FAIL': '\033[91m',
'ENDC': '\033[0m',
'BOLD': '\033[1m',
'UNDERLINE': '\033[4m'
}
def download_file(url, directory):
print('[{0}] is starting.....'.format(threading.current_thread().name))
handle = urlopen(url)
fname = os.path.basename(url)
file_dir = os.path.abspath(directory) + "/" + fname
print("File in directory ===== ", file_dir)
try:
with open(file_dir, "wb") as f_handler:
while True:
chunk = handle.read(1024)
if not chunk:
break
f_handler.write(chunk)
return "The {0} has been downloaded!".format(fname)
except Exception as error:
print("Woops: {0}".format(error))
return "Woops: {0}".format(error)
def threads_pool_executing(workers_count, file_dir, urls):
print('[{0}] is starting.....'.format(threading.current_thread().name))
try:
with ThreadPoolExecutor(max_workers=workers_count, thread_name_prefix='thread') as ex:
# future_to_url = {ex.submit(download_file, url, file_dir) for url in urls}
# future_to_url = ex.map(download_file, url, file_dir) for url in urls
future_to_url = dict((ex.submit(download_file, url, file_dir), url)
for url in urls)
try:
for future in as_completed(future_to_url):
url = future_to_url[future]
if future.exception() is not None:
print('%r generated an exception: %s' % (url,
future.exception()))
else:
print('%r page is %d bytes' % (url, len(future.result())))
except BrokenProcessPool as error:
print('could not start new tasks: {}'.format(error))
except Exception as error:
print("Woops: {0}".format(error))
return threads_pool_executing
def main():
start__time = time.time()
parser = argparse.ArgumentParser(prog='python3 script_name.py -h',
usage='python3 script_name.py {ARGS}',
add_help=True,
prefix_chars='--/',
epilog='''created by Vitalii Natarov''')
parser.add_argument('--version', action='version', version='v0.1.0')
parser.add_argument('--workers', dest='workers_counts', help='Set workers counts for pool executing', default=10)
results = parser.parse_args()
workers_counts = results.workers_counts
urls = [
"https://www.irs.gov/pub/irs-pdf/p3.pdf",
"https://www.irs.gov/pub/irs-pdf/p15.pdf",
"https://www.irs.gov/pub/irs-pdf/p15a.pdf",
"https://www.irs.gov/pub/irs-pdf/p15b.pdf",
"https://www.irs.gov/pub/irs-pdf/p15t.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"
] * 100
file_dir = "dir"
if not os.path.exists(file_dir):
os.makedirs(file_dir)
threads_pool_executing(workers_counts, file_dir, urls)
end__time = round(time.time() - start__time, 2)
print("--- %s seconds ---" % end__time)
print(
Bgcolors().get['OKGREEN'], "============================================================",
Bgcolors().get['ENDC'])
print(
Bgcolors().get['OKGREEN'], "==========================FINISHED==========================",
Bgcolors().get['ENDC'])
print(
Bgcolors().get['OKGREEN'], "============================================================",
Bgcolors().get['ENDC'])
if __name__ == '__main__':
main()
Я потратил неделю времени на данные примеры и разобраться как работает разный из примеров. Но зато, было увлекательно!
Multiprocessing на Python в Unix/Linux
Что такое multiprocessing?
Многопроцессорность позволяет системе поддерживать более одного процессора одновременно. При многопроцессорной обработке процессы порождаются путем создания объекта Process и последующего вызова его start() метода.
Мультиобработка — это разные программы или процессы, запущенные на вашем компьютере. У процесса есть собственная виртуальная память или адресное пространство, теперь он может создавать внутри себя несколько потоков. Если процессы должны взаимодействовать друг с другом, они используют методы межпроцессного взаимодействия, такие как файл на диске, общая память (очередь) или канал сообщений.
Обмен данными между процессами можно доститьс помощью очереди. Поскольку мы знаем, что у нескольких процессов есть собственное адресное пространство, они не разделяют адресное пространство, что приводит к проблеме, поэтому для обмена данными между процессами вам необходимо использовать некоторые методы:

Приведу простой пример мультипроцессора:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import os
import argparse
import threading
from urllib.request import urlopen
from multiprocessing import Process, ProcessError
class Bgcolors:
def __init__(self):
self.get = {
'HEADER': '\033[95m',
'OKBLUE': '\033[94m',
'OKGREEN': '\033[92m',
'WARNING': '\033[93m',
'FAIL': '\033[91m',
'ENDC': '\033[0m',
'BOLD': '\033[1m',
'UNDERLINE': '\033[4m'
}
class StartProcesses(Process):
def __init__(self, name, daemon, directory, url):
"""Initialize the thread"""
try:
Process.__init__(self)
self.name = name
self.url = url
self.directory = directory
self.daemon = daemon
self.start()
except ProcessError as error:
print('Error: Unable to initialize the {0} thread: \n{1}'.format(
threading.currentThread().getName(),
error)
)
def run(self):
"""Run the thread"""
handle = urlopen(self.url)
fname = os.path.basename(self.url)
file_dir = os.path.abspath(self.directory) + "/" + fname
print("File in directory ===== ", file_dir)
with open(file_dir, "wb") as f_handler:
while True:
chunk = handle.read(1024)
if not chunk:
break
f_handler.write(chunk)
print("{0} is starting".format(multiprocessing.current_process().name))
msg = "{0} has finished downloading {1}!".format(self.name, self.url)
print(msg)
@staticmethod
def thread_active_count():
print("Active threads count: ", threading.activeCount())
def processes(file_dir, urls):
try:
for item, url in enumerate(urls):
process = StartProcesses(name="Process-{0}".format(item + 1), daemon=False, directory=file_dir, url=url)
print("process: ", process)
if process.daemon:
process.join()
StartProcesses.thread_active_count()
print("StartProcesses is done")
except ProcessError as error:
print("Error: Unable to start a process: \n{0}".format(error))
return processes
def main():
start__time = time.time()
parser = argparse.ArgumentParser(prog='python3 script_name.py -h',
usage='python3 script_name.py {ARGS}',
add_help=True,
prefix_chars='--/',
epilog='''created by Vitalii Natarov''')
parser.add_argument('--version', action='version', version='v0.1.0')
urls = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"
]
file_dir = "dir"
if not os.path.exists(file_dir):
os.makedirs(file_dir)
processes(file_dir, urls)
end__time = round(time.time() - start__time, 2)
print("--- %s seconds ---" % end__time)
print(
Bgcolors().get['OKGREEN'], "============================================================",
Bgcolors().get['ENDC'])
print(
Bgcolors().get['OKGREEN'], "==========================FINISHED==========================",
Bgcolors().get['ENDC'])
print(
Bgcolors().get['OKGREEN'], "============================================================",
Bgcolors().get['ENDC'])
if __name__ == '__main__':
main()
Пишем код, но с очередями:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import os
import argparse
import logging
import threading
import multiprocessing
from urllib.request import urlopen
from multiprocessing import Process, ProcessError, JoinableQueue
class Bgcolors:
def __init__(self):
self.get = {
'HEADER': '\033[95m',
'OKBLUE': '\033[94m',
'OKGREEN': '\033[92m',
'WARNING': '\033[93m',
'FAIL': '\033[91m',
'ENDC': '\033[0m',
'BOLD': '\033[1m',
'UNDERLINE': '\033[4m'
}
class StartProcesses(Process):
def __init__(self, name, daemon, queue):
"""Initialize the thread"""
try:
Process.__init__(self)
self.name = name
self.queue = queue
self.event = multiprocessing.Event()
self.daemon = daemon
self.start()
except ProcessError as error:
print('Error: Unable to initialize the {0} thread: \n{1}'.format(
multiprocessing.current_process().name,
error)
)
def run(self):
"""Run the thread"""
while True:
# Get the work from the queue and expand the tuple
directory, link = self.queue.get(timeout=60)
try:
fname = os.path.basename(link)
handle = urlopen(link)
file_dir = os.path.abspath(directory) + "/" + fname
print("File in directory ===== ", file_dir)
with open(file_dir, "wb") as f_handler:
while True:
chunk = handle.read(1024)
if not chunk:
break
f_handler.write(chunk)
print("{0} is starting".format(multiprocessing.current_process().name))
msg = "{0} has finished downloading {1}!".format(self.name, link)
print(msg)
except Exception as error:
print('Error: Got issue with {0} thread: \n{1}'.format(self.name, error))
# finally:
# # print(self.event)
# # self.stop()
# # self.queue.task_done()
@staticmethod
def thread_active_count():
print("Active threads count: ", threading.activeCount())
@staticmethod
def current_process_pid():
print("Active process count: ", multiprocessing.current_process().pid)
def processes(cores, file_dir, urls):
try:
if cores is None:
cpu_cores = multiprocessing.cpu_count()
else:
cpu_cores = cores
all_processes = []
# Create a queue to communicate with the worker threads
queue = JoinableQueue()
for item in range(cpu_cores):
process = StartProcesses(name="Process-{0}".format(item + 1), daemon=True, queue=queue)
all_processes.append(process)
print("process: ", process)
StartProcesses.thread_active_count()
# Put the tasks into the queue as a tuple
for url in urls:
logging.info('Queueing {}'.format(url))
queue.put((file_dir, url))
# Causes the main thread to wait for the queue to finish processing all the tasks
for pr in all_processes:
pr.join(timeout=1)
print("StartProcesses is done")
except ProcessError as error:
print("Error: Unable to start a process: \n{0}".format(error))
return processes
def main():
start__time = time.time()
parser = argparse.ArgumentParser(prog='python3 script_name.py -h',
usage='python3 script_name.py {ARGS}',
add_help=True,
prefix_chars='--/',
epilog='''created by Vitalii Natarov''')
parser.add_argument('--version', action='version', version='v0.1.0')
parser.add_argument('--dir', dest='file_dir', help='Set directory for files', default="dir")
parser.add_argument('--cores', dest='cpu_cores',
help='Set cores for work. If None - will use cores count from host',
default=None)
results = parser.parse_args()
file_dir = results.file_dir
cores = results.cpu_cores
urls = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"
] * 10
if not os.path.exists(file_dir):
os.makedirs(file_dir)
processes(cores, file_dir, urls)
end__time = round(time.time() - start__time, 2)
print("--- %s seconds ---" % end__time)
print(
Bgcolors().get['OKGREEN'], "============================================================",
Bgcolors().get['ENDC'])
print(
Bgcolors().get['OKGREEN'], "==========================FINISHED==========================",
Bgcolors().get['ENDC'])
print(
Bgcolors().get['OKGREEN'], "============================================================",
Bgcolors().get['ENDC'])
if __name__ == '__main__':
main()
Можно добавить локи:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import os
import argparse
import logging
import threading
import multiprocessing
from urllib.request import urlopen
from multiprocessing import Process, ProcessError, Lock
logging.basicConfig(level=logging.DEBUG,
format='[%(levelname)s] (%(processName)-1s) %(message)s',
datefmt="%H:%M:%S"
)
class Bgcolors:
def __init__(self):
self.get = {
'HEADER': '\033[95m',
'OKBLUE': '\033[94m',
'OKGREEN': '\033[92m',
'WARNING': '\033[93m',
'FAIL': '\033[91m',
'ENDC': '\033[0m',
'BOLD': '\033[1m',
'UNDERLINE': '\033[4m'
}
class StartProcesses(Process):
def __init__(self, name, daemon, directory, url):
"""Initialize the thread"""
try:
Process.__init__(self)
self.name = name
self.directory = directory
self.url = url
self.daemon = daemon
self.lock = Lock()
self.start()
except ProcessError as error:
print('Error: Unable to initialize the {0} thread: \n{1}'.format(
multiprocessing.current_process().name,
error)
)
def run(self):
"""Run the thread"""
handle = urlopen(self.url)
fname = os.path.basename(self.url)
file_dir = os.path.abspath(self.directory) + "/" + fname
print("File in directory ===== ", file_dir)
logging.debug('Waiting for a lock for {0}'.format(self.name))
self.lock.acquire()
try:
with open(file_dir, "wb") as f_handler:
while True:
chunk = handle.read(1024)
if not chunk:
break
f_handler.write(chunk)
print("{0} is starting".format(self.name))
msg = "{0} has finished downloading {1}!".format(self.name, self.url)
print(msg)
except Exception as error:
print('Error: Got issue with {0} thread: \n{1}'.format(self.name, error))
finally:
logging.debug('Released a lock for {0}'.format(multiprocessing.current_process().name))
self.lock.release()
@staticmethod
def thread_active_count():
print("Active threads count: ", threading.activeCount())
@staticmethod
def current_process_pid():
print("Active process count: ", multiprocessing.current_process().pid)
def processes(file_dir, urls):
try:
for item, url in enumerate(urls):
process = StartProcesses(name="Process-{0}".format(item + 1), daemon=False, directory=file_dir, url=url)
print("process: ", process)
if process.daemon:
process.join()
StartProcesses.thread_active_count()
print("StartProcesses is done")
except ProcessError as error:
print("Error: Unable to start a process: \n{0}".format(error))
return processes
def main():
start__time = time.time()
parser = argparse.ArgumentParser(prog='python3 script_name.py -h',
usage='python3 script_name.py {ARGS}',
add_help=True,
prefix_chars='--/',
epilog='''created by Vitalii Natarov''')
parser.add_argument('--version', action='version', version='v0.1.0')
parser.add_argument('--dir', dest='file_dir', help='Set directory for files', default="dir")
results = parser.parse_args()
file_dir = results.file_dir
urls = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"
] * 1
if not os.path.exists(file_dir):
os.makedirs(file_dir)
processes(file_dir, urls)
end__time = round(time.time() - start__time, 2)
print("--- %s seconds ---" % end__time)
print(
Bgcolors().get['OKGREEN'], "============================================================",
Bgcolors().get['ENDC'])
print(
Bgcolors().get['OKGREEN'], "==========================FINISHED==========================",
Bgcolors().get['ENDC'])
print(
Bgcolors().get['OKGREEN'], "============================================================",
Bgcolors().get['ENDC'])
if __name__ == '__main__':
main()
Можно тоже самое сделать с менеджером мультипроцессора:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import os
import argparse
import logging
import threading
import multiprocessing
from urllib.request import urlopen
from multiprocessing import Process, ProcessError, Manager
logging.basicConfig(level=logging.DEBUG,
format='[%(levelname)s] (%(processName)-1s) %(message)s',
datefmt="%H:%M:%S"
)
class Bgcolors:
def __init__(self):
self.get = {
'HEADER': '\033[95m',
'OKBLUE': '\033[94m',
'OKGREEN': '\033[92m',
'WARNING': '\033[93m',
'FAIL': '\033[91m',
'ENDC': '\033[0m',
'BOLD': '\033[1m',
'UNDERLINE': '\033[4m'
}
class StartProcesses(Process):
def __init__(self, name, daemon, queue):
"""Initialize the thread"""
try:
Process.__init__(self)
self.name = name
self.queue = queue
self.event = multiprocessing.Event()
self.daemon = daemon
self.start()
except ProcessError as error:
print('Error: Unable to initialize the {0} thread: \n{1}'.format(
multiprocessing.current_process().name,
error)
)
def run(self):
"""Run the thread"""
while True:
# Get the work from the queue and expand the tuple
directory, link = self.queue.get(timeout=60)
try:
fname = os.path.basename(link)
handle = urlopen(link)
file_dir = os.path.abspath(directory) + "/" + fname
print("File in directory ===== ", file_dir)
with open(file_dir, "wb") as f_handler:
while True:
chunk = handle.read(1024)
if not chunk:
break
f_handler.write(chunk)
print("{0} is starting".format(multiprocessing.current_process().name))
msg = "{0} has finished downloading {1}!".format(self.name, link)
print(msg)
except Exception as error:
print('Error: Got issue with {0} thread: \n{1}'.format(self.name, error))
# finally:
# # print(self.event)
# # self.stop()
# # self.queue.task_done()
@staticmethod
def thread_active_count():
print("Active threads count: ", threading.activeCount())
@staticmethod
def current_process_pid():
print("Active process count: ", multiprocessing.current_process().pid)
def processes(cores, file_dir, urls):
try:
if cores is None:
cpu_cores = multiprocessing.cpu_count()
else:
cpu_cores = cores
all_processes = []
# Create process manager & a queue to communicate with the worker threads
process_manager = Manager()
queue = process_manager.Queue()
for item in range(cpu_cores):
process = StartProcesses(name="Process-{0}".format(item + 1), daemon=True, queue=queue)
all_processes.append(process)
# Put the tasks into the queue as a tuple
for url in urls:
logging.info('Queueing {}'.format(url))
queue.put((file_dir, url))
# Causes the main thread to wait for the queue to finish processing all the tasks
for pr in all_processes:
pr.join(timeout=1)
print("StartProcesses is done")
except ProcessError as error:
print("Error: Unable to start a process: \n{0}".format(error))
return processes
def main():
start__time = time.time()
parser = argparse.ArgumentParser(prog='python3 script_name.py -h',
usage='python3 script_name.py {ARGS}',
add_help=True,
prefix_chars='--/',
epilog='''created by Vitalii Natarov''')
parser.add_argument('--version', action='version', version='v0.1.0')
parser.add_argument('--dir', dest='file_dir', help='Set directory for files', default="dir")
parser.add_argument('--cores', dest='cpu_cores',
help='Set cores for work. If None - will use cores count from host',
default=None)
results = parser.parse_args()
file_dir = results.file_dir
cores = results.cpu_cores
urls = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"
] * 10
if not os.path.exists(file_dir):
os.makedirs(file_dir)
processes(cores, file_dir, urls)
end__time = round(time.time() - start__time, 2)
print("--- %s seconds ---" % end__time)
print(
Bgcolors().get['OKGREEN'], "============================================================",
Bgcolors().get['ENDC'])
print(
Bgcolors().get['OKGREEN'], "==========================FINISHED==========================",
Bgcolors().get['ENDC'])
print(
Bgcolors().get['OKGREEN'], "============================================================",
Bgcolors().get['ENDC'])
if __name__ == '__main__':
main()
А как насчет Pool? Та легко, вот пример:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import os
import argparse
import threading
import multiprocessing
from functools import partial
from urllib.request import urlopen
from multiprocessing import ProcessError, Pool
class Bgcolors:
def __init__(self):
self.get = {
'HEADER': '\033[95m',
'OKBLUE': '\033[94m',
'OKGREEN': '\033[92m',
'WARNING': '\033[93m',
'FAIL': '\033[91m',
'ENDC': '\033[0m',
'BOLD': '\033[1m',
'UNDERLINE': '\033[4m'
}
def download_file(url, directory):
handle = urlopen(url)
fname = os.path.basename(url)
file_dir = os.path.abspath(directory) + "/" + fname
print("File in directory ===== ", file_dir)
print("{0} is starting".format(threading.currentThread().getName()))
with open(file_dir, "wb") as f_handler:
while True:
chunk = handle.read(1024)
if not chunk:
break
f_handler.write(chunk)
msg = "{0} has finished downloading {1}!".format(threading.currentThread().getName(), url)
print(msg)
return msg
def thread_active_count():
print("Active threads count: ", threading.activeCount())
def processes(urls, directory, cores, chanksize):
if cores is None:
cpu_cores = multiprocessing.cpu_count()
else:
cpu_cores = cores
try:
thread_active_count()
with Pool(processes=cpu_cores) as pool:
thread_active_count()
download_file_x = partial(download_file, directory=directory)
pool.map(download_file_x, urls, chunksize=chanksize)
# pool.imap(download_file_x, urls, chunksize=chanksize)
# pool.imap_unordered(download_file_x, urls, chunksize=chanksize)
thread_active_count()
except ProcessError as error:
print('Error: Unable to initialize the {0} thread: \n{1}'.format(
threading.currentThread().getName(),
error)
)
return processes
def main():
start__time = time.time()
parser = argparse.ArgumentParser(prog='python3 script_name.py -h',
usage='python3 script_name.py {ARGS}',
add_help=True,
prefix_chars='--/',
epilog='''created by Vitalii Natarov''')
parser.add_argument('--version', action='version', version='v0.1.0')
parser.add_argument('--dir', dest='file_dir', help='Set directory for files', default="dir")
parser.add_argument('--chanksize', dest='chanksize', help='Set chanksize', default=1)
parser.add_argument('--cores', dest='cpu_cores',
help='Set cores for work. If None - will use cores count from host',
default=None)
results = parser.parse_args()
file_dir = results.file_dir
cores = results.cpu_cores
chanksize = results.chanksize
urls = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"
] * 10
if not os.path.exists(file_dir):
os.makedirs(file_dir)
processes(urls, file_dir, cores, chanksize)
end__time = round(time.time() - start__time, 2)
print("--- %s seconds ---" % end__time)
print(
Bgcolors().get['OKGREEN'], "============================================================",
Bgcolors().get['ENDC'])
print(
Bgcolors().get['OKGREEN'], "==========================FINISHED==========================",
Bgcolors().get['ENDC'])
print(
Bgcolors().get['OKGREEN'], "============================================================",
Bgcolors().get['ENDC'])
if __name__ == '__main__':
main()
Или:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import time
import argparse
import threading
from urllib.request import urlopen
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
from concurrent.futures.process import BrokenProcessPool
class Bgcolors:
def __init__(self):
self.get = {
'HEADER': '\033[95m',
'OKBLUE': '\033[94m',
'OKGREEN': '\033[92m',
'WARNING': '\033[93m',
'FAIL': '\033[91m',
'ENDC': '\033[0m',
'BOLD': '\033[1m',
'UNDERLINE': '\033[4m'
}
def download_file(url, directory):
print('[{0}] is starting.....'.format(
threading.current_thread().name)
)
handle = urlopen(url)
fname = os.path.basename(url)
file_dir = os.path.abspath(directory) + "/" + fname
print("File in directory ===== ", file_dir)
try:
with open(file_dir, "wb") as f_handler:
while True:
chunk = handle.read(1024)
if not chunk:
break
f_handler.write(chunk)
return "The {0} has been downloaded!".format(fname)
except Exception as error:
print("Woops: {0}".format(error))
return "Woops: {0}".format(error)
def threads_pool_executing(workers_count, file_dir, urls):
print('[{0}] is starting.....'.format(threading.current_thread().name))
try:
with ProcessPoolExecutor(max_workers=workers_count) as ex:
# future_to_url = {ex.submit(download_file, url, file_dir) for url in urls}
# future_to_url = ex.map(download_file, url, file_dir) for url in urls
future_to_url = dict((ex.submit(download_file, url, file_dir), url)
for url in urls)
try:
for future in as_completed(future_to_url):
url = future_to_url[future]
if future.exception() is not None:
print('%r generated an exception: %s' % (url,
future.exception()))
else:
print('%r page is %d bytes' % (url, len(future.result())))
except BrokenProcessPool as error:
print('could not start new tasks: {}'.format(error))
except Exception as error:
print("Woops: {0}".format(error))
return threads_pool_executing
def main():
start__time = time.time()
parser = argparse.ArgumentParser(prog='python3 script_name.py -h',
usage='python3 script_name.py {ARGS}',
add_help=True,
prefix_chars='--/',
epilog='''created by Vitalii Natarov''')
parser.add_argument('--version', action='version', version='v0.1.0')
parser.add_argument('--workers', dest='workers_counts', help='Set workers counts for pool executing', default=10)
results = parser.parse_args()
workers_counts = results.workers_counts
urls = [
"https://www.irs.gov/pub/irs-pdf/p3.pdf",
"https://www.irs.gov/pub/irs-pdf/p15.pdf",
"https://www.irs.gov/pub/irs-pdf/p15a.pdf",
"https://www.irs.gov/pub/irs-pdf/p15b.pdf",
"https://www.irs.gov/pub/irs-pdf/p15t.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"
] * 100
file_dir = "dir"
if not os.path.exists(file_dir):
os.makedirs(file_dir)
threads_pool_executing(workers_counts, file_dir, urls)
end__time = round(time.time() - start__time, 2)
print("--- %s seconds ---" % end__time)
print(
Bgcolors().get['OKGREEN'], "============================================================",
Bgcolors().get['ENDC'])
print(
Bgcolors().get['OKGREEN'], "==========================FINISHED==========================",
Bgcolors().get['ENDC'])
print(
Bgcolors().get['OKGREEN'], "============================================================",
Bgcolors().get['ENDC'])
if __name__ == '__main__':
main()
Как-то так!
AsyncIO на Python в Unix/Linux
Теории пока не дам, почитайте на досуге сами, а вот примеры приведу:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import os
import argparse
import logging
import asyncio
import aiohttp
from functools import partial
logging.basicConfig(level=logging.DEBUG,
format='[%(levelname)s] %(asctime)s - (%(processName)-1s) - %(name)s - %(message)s '
'[in %(funcName)s: %(pathname)s:%(lineno)d]',
datefmt="%H:%M:%S"
)
logger = logging.getLogger(__name__)
class Bgcolors:
def __init__(self):
self.get = {
'HEADER': '\033[95m',
'OKBLUE': '\033[94m',
'OKGREEN': '\033[92m',
'WARNING': '\033[93m',
'FAIL': '\033[91m',
'ENDC': '\033[0m',
'BOLD': '\033[1m',
'UNDERLINE': '\033[4m'
}
async def dir_check(directory):
print('Check if the dir is present, else - create it')
try:
if not os.path.exists(directory):
os.makedirs(directory)
logger.info("The {0} has been created".format(directory))
except OSError as err:
logger.error("Directory {0} can not be created: \n{1}".format(directory, err))
return directory
async def download_file(session, url, directory):
async with session.get(url) as response:
fname = os.path.basename(url)
file_dir = os.path.abspath(directory) + "/" + fname
logger.info("File in directory ===== {}".format(file_dir))
with open(file_dir, "wb") as f_handler:
logger.info("{0} is starting".format(fname))
while True:
# await pauses execution until the 1024 (or less) bytes are read from the stream
chunk = await response.content.read(1024)
if not chunk:
# We are done reading the file, break out of the while loop
break
f_handler.write(chunk)
logger.info("{0} has finished downloading {1}!".format(fname, url))
return download_file
async def fetch(session, url):
async with session.get(url, raise_for_status=False) as resp:
# assert resp.status == 200
print(resp.status, url)
return resp.status, url
async def main():
start__time = time.time()
parser = argparse.ArgumentParser(prog='python3 script_name.py -h',
usage='python3 script_name.py {ARGS}',
add_help=True,
prefix_chars='--/',
epilog='''created by Vitalii Natarov''')
parser.add_argument('--version', action='version', version='v0.1.0')
parser.add_argument('--dir', dest='file_dir', help='Set directory for files', default="dir")
results = parser.parse_args()
file_dir = results.file_dir
urls = [
"http://www.irs.gov/pub/irs-pdf/f1040.pdf",
"https://www.irs.gov/pub/irs-prior/f1040a--2015.pdf",
"https://www.irs.gov/pub/irs-prior/i1040ez--2017.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"
] * 1
urls_list = []
async with aiohttp.ClientSession(raise_for_status=True) as session:
for url in urls:
# urls_list.append(partial(fetch, session=session, url=url))
urls_list.append(partial(download_file, session=session, url=url, directory=file_dir))
await dir_check(file_dir)
for f in urls_list:
await f()
end__time = round(time.time() - start__time, 2)
print("--- %s seconds ---" % end__time)
print(
Bgcolors().get['OKGREEN'], "============================================================",
Bgcolors().get['ENDC'])
print(
Bgcolors().get['OKGREEN'], "==========================FINISHED==========================",
Bgcolors().get['ENDC'])
print(
Bgcolors().get['OKGREEN'], "============================================================",
Bgcolors().get['ENDC'])
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.set_debug(False)
try:
tasks = [
loop.create_task(main())
]
wait_tasks = asyncio.wait(tasks)
loop.run_until_complete(wait_tasks)
# loop.run_until_complete(main())
except Exception as error:
logger.error(error)
finally:
# Shutdown the loop even if there is an exception
loop.close()
Не понравился данный модуль, если честно.
Gevent на Python в Unix/Linux
Вот пример данной реализации:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import os
import argparse
import logging
import gevent
import gevent.monkey
from urllib.request import urlopen
logging.basicConfig(level=logging.DEBUG,
format='[%(levelname)s] %(asctime)s - (%(processName)-1s) - %(name)s - %(message)s '
'[in %(funcName)s: %(pathname)s:%(lineno)d]',
datefmt="%H:%M:%S"
)
logger = logging.getLogger(__name__)
class Bgcolors:
def __init__(self):
self.get = {
'HEADER': '\033[95m',
'OKBLUE': '\033[94m',
'OKGREEN': '\033[92m',
'WARNING': '\033[93m',
'FAIL': '\033[91m',
'ENDC': '\033[0m',
'BOLD': '\033[1m',
'UNDERLINE': '\033[4m'
}
def dir_check(directory):
print('Check if the dir is present, else - create it')
try:
if not os.path.exists(directory):
os.makedirs(directory)
logger.info("The {0} has been created".format(directory))
except OSError as err:
logger.error("Directory {0} can not be created: \n{1}".format(directory, err))
return directory
def download_file(directory, url):
handle = urlopen(url)
fname = os.path.basename(url)
file_dir = os.path.abspath(directory) + "/" + fname
logger.info("File in directory ===== {}".format(file_dir))
with open(file_dir, "wb") as f_handler:
while True:
chunk = handle.read(1024)
if not chunk:
break
f_handler.write(chunk)
return download_file
def main():
start__time = time.time()
parser = argparse.ArgumentParser(prog='python3 script_name.py -h',
usage='python3 script_name.py {ARGS}',
add_help=True,
prefix_chars='--/',
epilog='''created by Vitalii Natarov''')
parser.add_argument('--version', action='version', version='v0.1.0')
parser.add_argument('--dir', dest='file_dir', help='Set directory for files', default="dir")
results = parser.parse_args()
file_dir = results.file_dir
urls = [
"http://www.irs.gov/pub/irs-pdf/f1040.pdf",
"https://www.irs.gov/pub/irs-prior/f1040a--2015.pdf",
"https://www.irs.gov/pub/irs-prior/i1040ez--2017.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"
] * 1
gevent.monkey.patch_all()
threads = [
gevent.spawn(dir_check, file_dir),
]
for url in urls:
threads.append(gevent.spawn(download_file, directory=file_dir, url=url))
# gevent.joinall(threads)
gevent.wait(threads)
end__time = round(time.time() - start__time, 2)
print("--- %s seconds ---" % end__time)
print(
Bgcolors().get['OKGREEN'], "============================================================",
Bgcolors().get['ENDC'])
print(
Bgcolors().get['OKGREEN'], "==========================FINISHED==========================",
Bgcolors().get['ENDC'])
print(
Bgcolors().get['OKGREEN'], "============================================================",
Bgcolors().get['ENDC'])
if __name__ == '__main__':
main()
Как-то так!
Код выложил на гитхабе — https://github.com/SebastianUA/concurrency-in-python . Можно заюзать как-то.
Вот и все, статья «Параллелизм на Python в Unix/Linux» завершена.