Параллелизм на Python в Unix/Linux

Давно интересовался как можно распараллелить выполнение ресурсов при написании кода на Python. Года 2 назад, даже пробовал что-то написать, но для меня это было сложно т.к не имел большого опыта на данном языке программировании. Все жизни меняется, прогресс не стоит на месте, так получилось — я тоже не сидел на месте и изучал питон. Накапливал опыт с каждым написанным скриптом и его логикой. Совсем недавно, я сменил работу и прийдя на новое место, познакомился с одним из очень крутых чуваком на проекте. Он всячески мотивировал меня в разных отраслях. как-то он написал скрипт, который умел параллелить ресурсы. Когда я поглядел код, — он был простой и понятный мне. Мне тоже захотелось написать подобное! Вот тут и начал писать распараллеливание.

На самом то деле, есть несколько способов сделать это. Сейчас расскажу как можно достичь желаемого результата на примере, скачивания файлов с интернета.

В общем, многозадачность — это способность выполнять несколько задач одновременно, с технической точки зрения, многозадачность означает способность операционной системы выполнять разные задачи одновременно.
Например, вы загружаете что-то на свой компьютер и слушаете песни и одновременно играете в игру, все это выполняется одной и той же операционной системой (ОС), это не что иное, как многозадачность. В ОС есть два типа многозадачности:

  • На основе потоков (Thread-Based): единый процесс, состоящий из отдельных задач. Пример: игра Vice-City состоит из различных потоков.
  • На основе процессов (Process-Based): несколько потоков одновременно работают в одной ОС. Пример: загрузка, прослушивание песен и игра в игру.

Ну что, приступим к примерам…

Multithreading на Python в Unix/Linux

Что такое потоки (threads)?
thread — это так называемые независимые потоком выполнения какой-то из программ. Т.е один процесс может состоять из нескольких потоков, каждый поток в программе выполняет определенную задачу.

Пример: В игре Vice-City игра в целом представляет собой единый процесс, но состоит из нескольких потоков, отвечающих за воспроизведение музыки, принимающих ввод от пользователя.

Многопоточность в Python может быть достигнута путем импорта модуля потоковой передачи, но перед импортом модуля вам необходимо установить этот модуль в соответствующей среде IDE.

Ниже я приведу куски готовых программ и вы наглядно сможете увидеть пример работы.

Прежде чем перейти к созданию потоков, позвольте мне сказать вам, когда использовать многопоточность в Python?
Многопоточность очень полезна для экономии времени и повышения производительности, но не может применяться везде. В предыдущем примере Vice-City музыкальные потоки независимы, это поток, который получал ввод от пользователя. В случае, если эти потоки были взаимозависимыми, многопоточность использовать нельзя.

В реальной жизни вы можете вызывать веб запрос с помощью API или ждать пакета в сетевом сокете, поэтому в это время вы ждете, а ваш процессор ничего не делает. Многопоточность пытается использовать это время простоя, и во время этого простоя вы хотите, чтобы ваш процессор выполнял некоторую работу.
Итак, давайте воспользуемся многопоточностью, чтобы сократить время, и программа будет выполняться намного быстрее.

Как создать многопоточность в Python3?

Вот приведу пример кода:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import time
import os
import argparse
import threading

from urllib.request import urlopen
from threading import Thread, ThreadError


class Bgcolors:
    def __init__(self):
        self.get = {
            'HEADER': '\033[95m',
            'OKBLUE': '\033[94m',
            'OKGREEN': '\033[92m',
            'WARNING': '\033[93m',
            'FAIL': '\033[91m',
            'ENDC': '\033[0m',
            'BOLD': '\033[1m',
            'UNDERLINE': '\033[4m'
        }


class StartThreads(Thread):
    def __init__(self, name, daemon, url):
        """Initialize the thread"""
        try:
            Thread.__init__(self)
            self.name = name
            self.daemon = daemon
            self.url = url
            self.setDaemon(daemon)
            self.start()
        except ThreadError as error:
            print('Error: Unable to initialize the {0} thread: \n{1}'.format(
                threading.currentThread().getName(),
                error)
            )

    def run(self):
        """Run the thread"""
        handle = urlopen(self.url)
        fname = os.path.basename(self.url)
        with open(fname, "wb") as f_handler:
            while True:
                chunk = handle.read(1024)
                if not chunk:
                    break
                f_handler.write(chunk)
        print("{0} is starting".format(threading.currentThread().getName()))
        msg = "{0} has finished downloading {1}!".format(self.name, self.url)
        print(msg)

    @staticmethod
    def thread_active_count():
        print("Active threads count: ", threading.activeCount())


def threads(urls):
    try:
        for item, url in enumerate(urls):
            thread = StartThreads(name="Thread-{0}".format(item + 1), daemon=True, url=url)

            StartThreads.thread_active_count()

            if thread.daemon:
                thread.join(timeout=1)

        print("Threading is done")
    except ThreadError as error:
        print("Error: Unable to start thread: \n{0}".format(error))

    return threads


def main():
    start__time = time.time()

    parser = argparse.ArgumentParser(prog='python3 script_name.py -h',
                                     usage='python3 script_name.py {ARGS}',
                                     add_help=True,
                                     prefix_chars='--/',
                                     epilog='''created by Vitalii Natarov''')
    parser.add_argument('--version', action='version', version='v0.1.0')

    urls = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"
            ]

    threads(urls)

    end__time = round(time.time() - start__time, 2)
    print("--- %s seconds ---" % end__time)

    print(
        Bgcolors().get['OKGREEN'], "============================================================",
        Bgcolors().get['ENDC'])
    print(
        Bgcolors().get['OKGREEN'], "==========================FINISHED==========================",
        Bgcolors().get['ENDC'])
    print(
        Bgcolors().get['OKGREEN'], "============================================================",
        Bgcolors().get['ENDC'])


if __name__ == '__main__':
    main()

Многопоточность существует в одном процессе. Потоки будут совместно использовать адресное пространство, у них есть свои собственные наборы инструкций, каждый поток выполняет определенные задачи.

Что такое GIL? Почему это важно?
GIL (Global Interpreter Lock). Python не был разработан давно и в то время использовались только одноядерные процессоры и тогда не возникало никаких проблем (не предполагалось мулитипроцессорность), поэтому GIL необходим, потому что Python не является потокобезопасным, и существует глобальная блокировка при доступе к объекту Python. Что мы можем сделать?

Многопроцессорность позволяет создавать программы, которые могут выполняться одновременно (в обход GIL) и использовать все ядро вашего процессора. Библиотека многопроцессорной обработки предоставляет каждому процессу свой собственный интерпретатор Python и каждому свой собственный GIL.

Из-за этого обычные проблемы, связанные с потоками (например, повреждение данных и взаимоблокировки), больше не являются проблемой. Поскольку процессы не используют общую память, они не могут изменять одну и ту же память одновременно.
Глобальная блокировка интерпретатора Python
CPython (стандартная реализация Python) имеет нечто, называемое GIL (Global Interpreter Lock); GIL предотвращает одновременное выполнение двух потоков в одной программе. Однако два потока могут выполняться одновременно, и один может запускать код, а другой может ждать. GIL из коробки ограничивает параллельное программирование на Python.

Можно использовать локи, например вот так:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-


import time
import os
import argparse
import logging
import threading

from urllib.request import urlopen
from threading import Thread, Lock, ThreadError

logging.basicConfig(level=logging.DEBUG,
                    format='[%(levelname)s] (%(ThreadName)-1s) %(message)s',
                    datefmt="%H:%M:%S"
                    )


class Bgcolors:
    def __init__(self):
        self.get = {
            'HEADER': '\033[95m',
            'OKBLUE': '\033[94m',
            'OKGREEN': '\033[92m',
            'WARNING': '\033[93m',
            'FAIL': '\033[91m',
            'ENDC': '\033[0m',
            'BOLD': '\033[1m',
            'UNDERLINE': '\033[4m'
        }


class StartThreads(Thread):
    def __init__(self, name, daemon, url):
        """Initialize the thread"""
        try:
            Thread.__init__(self)
            self.name = name
            self.daemon = daemon
            self.url = url
            self.lock = Lock()
            self.setDaemon(daemon)
            self.start()
        except ThreadError as error:
            print('Error: Unable to initialize the {0} thread: \n{1}'.format(
                threading.currentThread().getName(),
                error)
            )

    def run(self):
        """Run the thread"""
        handle = urlopen(self.url)
        fname = os.path.basename(self.url)

        logging.debug('Waiting for a lock for {0}'.format(self.name))
        self.lock.acquire()

        try:
            with open(fname, "wb") as f_handler:
                while True:
                    chunk = handle.read(1024)
                    if not chunk:
                        break
                    f_handler.write(chunk)
            print("{0} is starting".format(threading.currentThread().getName()))
            msg = "{0} has finished downloading {1}!".format(self.name, self.url)
            print(msg)
        except Exception as error:
            print('Error: Got issue with {0} thread: \n{1}'.format(self.name, error))
        finally:
            logging.debug('Released a lock for {0}'.format(threading.currentThread().getName()))
            self.lock.release()

    @staticmethod
    def enumerate():
        for thread in threading.enumerate():
            print("Thread name is %s." % thread.getName())

    @staticmethod
    def thread_active_count():
        print("Active threads count: ", threading.activeCount())


def threads(urls):
    try:
        print("TIMEOUT_MAX: ", threading.TIMEOUT_MAX)
        for item, url in enumerate(urls):
            thread = StartThreads(name="Thread-{0}".format(item + 1), daemon=True, url=url)

            StartThreads.thread_active_count()

            if thread.daemon:
                thread.join(timeout=1)

        print("Threading is done")
    except ThreadError as error:
        print("Error: Unable to start thread: \n{0}".format(error))

    return threads


def main():
    start__time = time.time()

    parser = argparse.ArgumentParser(prog='python3 script_name.py -h',
                                     usage='python3 script_name.py {ARGS}',
                                     add_help=True,
                                     prefix_chars='--/',
                                     epilog='''created by Vitalii Natarov''')
    parser.add_argument('--version', action='version', version='v0.1.0')

    urls = [
        # "https://www.irs.gov/pub/irs-pdf/p3.pdf",
        # "https://www.irs.gov/pub/irs-pdf/p15.pdf",
        # "https://www.irs.gov/pub/irs-pdf/p15a.pdf",
        # "https://www.irs.gov/pub/irs-pdf/p15b.pdf",
        # "https://www.irs.gov/pub/irs-pdf/p15t.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"
    ]

    threads(urls)

    end__time = round(time.time() - start__time, 2)
    print("--- %s seconds ---" % end__time)

    print(
        Bgcolors().get['OKGREEN'], "============================================================",
        Bgcolors().get['ENDC'])
    print(
        Bgcolors().get['OKGREEN'], "==========================FINISHED==========================",
        Bgcolors().get['ENDC'])
    print(
        Bgcolors().get['OKGREEN'], "============================================================",
        Bgcolors().get['ENDC'])


if __name__ == '__main__':
    main()

Приведу примеры использования очереди:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import time
import os
import argparse
import logging
import threading

from urllib.request import urlopen
from threading import Thread, ThreadError
from queue import Queue

logging.basicConfig(level=logging.DEBUG,
                    format='[%(levelname)s] (%(threadName)-1s) %(message)s',
                    datefmt="%H:%M:%S"
                    )


class Bgcolors:
    def __init__(self):
        self.get = {
            'HEADER': '\033[95m',
            'OKBLUE': '\033[94m',
            'OKGREEN': '\033[92m',
            'WARNING': '\033[93m',
            'FAIL': '\033[91m',
            'ENDC': '\033[0m',
            'BOLD': '\033[1m',
            'UNDERLINE': '\033[4m'
        }


class StartThreads(Thread):
    def __init__(self, name, daemon, queue):
        """Initialize the thread"""
        try:
            Thread.__init__(self)
            self.name = name
            self.daemon = daemon
            self.queue = queue
            self.setDaemon(daemon)
            self.start()
        except ThreadError as error:
            print('Error: Unable to initialize the {0} thread: \n{1}'.format(
                threading.currentThread().getName(),
                error)
            )

    def run(self):
        """Run the thread"""
        while True:
            # Get the work from the queue and expand the tuple
            directory, link = self.queue.get()
            try:
                fname = os.path.basename(link)
                handle = urlopen(link)

                file_dir = os.path.abspath(directory) + "/" + fname
                print("File in directory ===== ", file_dir)

                with open(file_dir, "wb") as f_handler:
                    while True:
                        chunk = handle.read(1024)
                        if not chunk:
                            break
                        f_handler.write(chunk)
                print("{0} is starting".format(threading.currentThread().getName()))
                msg = "{0} has finished downloading {1}!".format(self.name, link)
                print(msg)
            except Exception as error:
                print('Error: Got issue with {0} thread: \n{1}'.format(self.name, error))
            finally:
                self.queue.task_done()

    @staticmethod
    def thread_active_count():
        print("Active threads count: ", threading.activeCount())


def workers(urls, thread_counts):
    file_dir = "dir"
    if not os.path.exists(file_dir):
        os.makedirs(file_dir)

    try:
        # Create a queue to communicate with the worker threads
        queue = Queue()
        # Create (thread_counts) worker threads
        for item in range(thread_counts):
            StartThreads(name="Thread-{0}".format(item + 1), daemon=True, queue=queue)

            StartThreads.thread_active_count()

        # Put the tasks into the queue as a tuple
        for url in urls:
            logging.info('Queueing {}'.format(url))
            queue.put((file_dir, url))
        # Causes the main thread to wait for the queue to finish processing all the tasks
        queue.join()

        # print("queue.empty() ------: ", queue.empty())
        # StartThreads.thread_active_count()

        print("Threading is done")
    except ThreadError as error:
        print("Error: Unable to start thread: \n{0}".format(error))

    return workers


def main():
    start__time = time.time()

    parser = argparse.ArgumentParser(prog='python3 script_name.py -h',
                                     usage='python3 script_name.py {ARGS}',
                                     add_help=True,
                                     prefix_chars='--/',
                                     epilog='''created by Vitalii Natarov''')
    parser.add_argument('--version', action='version', version='v0.1.0')
    parser.add_argument('--threads', dest='thread_counts', help='Set thread counts', default=10)

    results = parser.parse_args()
    thread_counts = results.thread_counts

    urls = [
        "https://www.irs.gov/pub/irs-pdf/p3.pdf",
        "https://www.irs.gov/pub/irs-pdf/p15.pdf",
        "https://www.irs.gov/pub/irs-pdf/p15a.pdf",
        "https://www.irs.gov/pub/irs-pdf/p15b.pdf",
        "https://www.irs.gov/pub/irs-pdf/p15t.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"
    ]

    workers(urls, thread_counts)

    end__time = round(time.time() - start__time, 2)
    print("--- %s seconds ---" % end__time)

    print(
        Bgcolors().get['OKGREEN'], "============================================================",
        Bgcolors().get['ENDC'])
    print(
        Bgcolors().get['OKGREEN'], "==========================FINISHED==========================",
        Bgcolors().get['ENDC'])
    print(
        Bgcolors().get['OKGREEN'], "============================================================",
        Bgcolors().get['ENDC'])


if __name__ == '__main__':
    main()

Еще один вариант кода:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import time
import argparse
import threading

from urllib.request import urlopen
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures import as_completed
from concurrent.futures.process import BrokenProcessPool


class Bgcolors:
    def __init__(self):
        self.get = {
            'HEADER': '\033[95m',
            'OKBLUE': '\033[94m',
            'OKGREEN': '\033[92m',
            'WARNING': '\033[93m',
            'FAIL': '\033[91m',
            'ENDC': '\033[0m',
            'BOLD': '\033[1m',
            'UNDERLINE': '\033[4m'
        }


def download_file(url, directory):
    print('[{0}] is starting.....'.format(threading.current_thread().name))

    handle = urlopen(url)
    fname = os.path.basename(url)

    file_dir = os.path.abspath(directory) + "/" + fname
    print("File in directory ===== ", file_dir)

    try:
        with open(file_dir, "wb") as f_handler:
            while True:
                chunk = handle.read(1024)
                if not chunk:
                    break
                f_handler.write(chunk)
            return "The {0} has been downloaded!".format(fname)
    except Exception as error:
        print("Woops: {0}".format(error))
        return "Woops: {0}".format(error)


def threads_pool_executing(workers_count, file_dir, urls):
    print('[{0}] is starting.....'.format(threading.current_thread().name))
    try:
        with ThreadPoolExecutor(max_workers=workers_count, thread_name_prefix='thread') as ex:
            # future_to_url = {ex.submit(download_file, url, file_dir) for url in urls}
            # future_to_url = ex.map(download_file, url, file_dir) for url in urls
            future_to_url = dict((ex.submit(download_file, url, file_dir), url)
                                 for url in urls)

            try:
                for future in as_completed(future_to_url):
                    url = future_to_url[future]
                    if future.exception() is not None:
                        print('%r generated an exception: %s' % (url,
                                                                 future.exception()))
                    else:
                        print('%r page is %d bytes' % (url, len(future.result())))
            except BrokenProcessPool as error:
                print('could not start new tasks: {}'.format(error))
    except Exception as error:
        print("Woops: {0}".format(error))

    return threads_pool_executing


def main():
    start__time = time.time()

    parser = argparse.ArgumentParser(prog='python3 script_name.py -h',
                                     usage='python3 script_name.py {ARGS}',
                                     add_help=True,
                                     prefix_chars='--/',
                                     epilog='''created by Vitalii Natarov''')
    parser.add_argument('--version', action='version', version='v0.1.0')
    parser.add_argument('--workers', dest='workers_counts', help='Set workers counts for pool executing', default=10)

    results = parser.parse_args()
    workers_counts = results.workers_counts

    urls = [
        "https://www.irs.gov/pub/irs-pdf/p3.pdf",
        "https://www.irs.gov/pub/irs-pdf/p15.pdf",
        "https://www.irs.gov/pub/irs-pdf/p15a.pdf",
        "https://www.irs.gov/pub/irs-pdf/p15b.pdf",
        "https://www.irs.gov/pub/irs-pdf/p15t.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"
    ] * 100

    file_dir = "dir"
    if not os.path.exists(file_dir):
        os.makedirs(file_dir)

    threads_pool_executing(workers_counts, file_dir, urls)

    end__time = round(time.time() - start__time, 2)
    print("--- %s seconds ---" % end__time)

    print(
        Bgcolors().get['OKGREEN'], "============================================================",
        Bgcolors().get['ENDC'])
    print(
        Bgcolors().get['OKGREEN'], "==========================FINISHED==========================",
        Bgcolors().get['ENDC'])
    print(
        Bgcolors().get['OKGREEN'], "============================================================",
        Bgcolors().get['ENDC'])


if __name__ == '__main__':
    main()

Я потратил неделю времени на данные примеры и разобраться как работает разный из примеров. Но зато, было увлекательно!

Multiprocessing на Python в Unix/Linux

Что такое multiprocessing?
Многопроцессорность позволяет системе поддерживать более одного процессора одновременно. При многопроцессорной обработке процессы порождаются путем создания объекта Process и последующего вызова его start() метода.

Мультиобработка — это разные программы или процессы, запущенные на вашем компьютере. У процесса есть собственная виртуальная память или адресное пространство, теперь он может создавать внутри себя несколько потоков. Если процессы должны взаимодействовать друг с другом, они используют методы межпроцессного взаимодействия, такие как файл на диске, общая память (очередь) или канал сообщений.

Обмен данными между процессами можно доститьс помощью очереди. Поскольку мы знаем, что у нескольких процессов есть собственное адресное пространство, они не разделяют адресное пространство, что приводит к проблеме, поэтому для обмена данными между процессами вам необходимо использовать некоторые методы:

Приведу простой пример мультипроцессора:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import time
import os
import argparse
import threading

from urllib.request import urlopen
from multiprocessing import Process, ProcessError


class Bgcolors:
    def __init__(self):
        self.get = {
            'HEADER': '\033[95m',
            'OKBLUE': '\033[94m',
            'OKGREEN': '\033[92m',
            'WARNING': '\033[93m',
            'FAIL': '\033[91m',
            'ENDC': '\033[0m',
            'BOLD': '\033[1m',
            'UNDERLINE': '\033[4m'
        }


class StartProcesses(Process):
    def __init__(self, name, daemon, directory, url):
        """Initialize the thread"""
        try:
            Process.__init__(self)
            self.name = name
            self.url = url
            self.directory = directory
            self.daemon = daemon
            self.start()
        except ProcessError as error:
            print('Error: Unable to initialize the {0} thread: \n{1}'.format(
                threading.currentThread().getName(),
                error)
            )

    def run(self):
        """Run the thread"""
        handle = urlopen(self.url)
        fname = os.path.basename(self.url)

        file_dir = os.path.abspath(self.directory) + "/" + fname
        print("File in directory ===== ", file_dir)

        with open(file_dir, "wb") as f_handler:
            while True:
                chunk = handle.read(1024)
                if not chunk:
                    break
                f_handler.write(chunk)
        print("{0} is starting".format(multiprocessing.current_process().name))
        msg = "{0} has finished downloading {1}!".format(self.name, self.url)
        print(msg)

    @staticmethod
    def thread_active_count():
        print("Active threads count: ", threading.activeCount())


def processes(file_dir, urls):
    try:
        for item, url in enumerate(urls):
            process = StartProcesses(name="Process-{0}".format(item + 1), daemon=False, directory=file_dir, url=url)
            print("process: ", process)

            if process.daemon:
                process.join()

            StartProcesses.thread_active_count()

        print("StartProcesses is done")
    except ProcessError as error:
        print("Error: Unable to start a process: \n{0}".format(error))

    return processes


def main():
    start__time = time.time()

    parser = argparse.ArgumentParser(prog='python3 script_name.py -h',
                                     usage='python3 script_name.py {ARGS}',
                                     add_help=True,
                                     prefix_chars='--/',
                                     epilog='''created by Vitalii Natarov''')
    parser.add_argument('--version', action='version', version='v0.1.0')

    urls = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"
            ]

    file_dir = "dir"
    if not os.path.exists(file_dir):
        os.makedirs(file_dir)

    processes(file_dir, urls)

    end__time = round(time.time() - start__time, 2)
    print("--- %s seconds ---" % end__time)

    print(
        Bgcolors().get['OKGREEN'], "============================================================",
        Bgcolors().get['ENDC'])
    print(
        Bgcolors().get['OKGREEN'], "==========================FINISHED==========================",
        Bgcolors().get['ENDC'])
    print(
        Bgcolors().get['OKGREEN'], "============================================================",
        Bgcolors().get['ENDC'])


if __name__ == '__main__':
    main()

Пишем код, но с очередями:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import time
import os
import argparse
import logging
import threading
import multiprocessing

from urllib.request import urlopen
from multiprocessing import Process, ProcessError, JoinableQueue

class Bgcolors:
    def __init__(self):
        self.get = {
            'HEADER': '\033[95m',
            'OKBLUE': '\033[94m',
            'OKGREEN': '\033[92m',
            'WARNING': '\033[93m',
            'FAIL': '\033[91m',
            'ENDC': '\033[0m',
            'BOLD': '\033[1m',
            'UNDERLINE': '\033[4m'
        }


class StartProcesses(Process):
    def __init__(self, name, daemon, queue):
        """Initialize the thread"""
        try:
            Process.__init__(self)
            self.name = name
            self.queue = queue
            self.event = multiprocessing.Event()
            self.daemon = daemon
            self.start()
        except ProcessError as error:
            print('Error: Unable to initialize the {0} thread: \n{1}'.format(
                multiprocessing.current_process().name,
                error)
            )

    def run(self):
        """Run the thread"""
        while True:
            # Get the work from the queue and expand the tuple
            directory, link = self.queue.get(timeout=60)

            try:
                fname = os.path.basename(link)
                handle = urlopen(link)

                file_dir = os.path.abspath(directory) + "/" + fname
                print("File in directory ===== ", file_dir)

                with open(file_dir, "wb") as f_handler:
                    while True:
                        chunk = handle.read(1024)
                        if not chunk:
                            break
                        f_handler.write(chunk)
                print("{0} is starting".format(multiprocessing.current_process().name))
                msg = "{0} has finished downloading {1}!".format(self.name, link)
                print(msg)
            except Exception as error:
                print('Error: Got issue with {0} thread: \n{1}'.format(self.name, error))
            # finally:
            #     # print(self.event)
            #     # self.stop()
            #     # self.queue.task_done()

    @staticmethod
    def thread_active_count():
        print("Active threads count: ", threading.activeCount())

    @staticmethod
    def current_process_pid():
        print("Active process count: ", multiprocessing.current_process().pid)


def processes(cores, file_dir, urls):
    try:
        if cores is None:
            cpu_cores = multiprocessing.cpu_count()
        else:
            cpu_cores = cores

        all_processes = []

        # Create a queue to communicate with the worker threads
        queue = JoinableQueue()

        for item in range(cpu_cores):
            process = StartProcesses(name="Process-{0}".format(item + 1), daemon=True, queue=queue)
            all_processes.append(process)
            print("process: ", process)

            StartProcesses.thread_active_count()

        # Put the tasks into the queue as a tuple
        for url in urls:
            logging.info('Queueing {}'.format(url))
            queue.put((file_dir, url))

        # Causes the main thread to wait for the queue to finish processing all the tasks
        for pr in all_processes:
            pr.join(timeout=1)

        print("StartProcesses is done")
    except ProcessError as error:
        print("Error: Unable to start a process: \n{0}".format(error))

    return processes


def main():
    start__time = time.time()

    parser = argparse.ArgumentParser(prog='python3 script_name.py -h',
                                     usage='python3 script_name.py {ARGS}',
                                     add_help=True,
                                     prefix_chars='--/',
                                     epilog='''created by Vitalii Natarov''')
    parser.add_argument('--version', action='version', version='v0.1.0')
    parser.add_argument('--dir', dest='file_dir', help='Set directory for files', default="dir")
    parser.add_argument('--cores', dest='cpu_cores',
                        help='Set cores for work. If None - will use cores count from host',
                        default=None)

    results = parser.parse_args()
    file_dir = results.file_dir
    cores = results.cpu_cores

    urls = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"
            ] * 10

    if not os.path.exists(file_dir):
        os.makedirs(file_dir)

    processes(cores, file_dir, urls)

    end__time = round(time.time() - start__time, 2)
    print("--- %s seconds ---" % end__time)

    print(
        Bgcolors().get['OKGREEN'], "============================================================",
        Bgcolors().get['ENDC'])
    print(
        Bgcolors().get['OKGREEN'], "==========================FINISHED==========================",
        Bgcolors().get['ENDC'])
    print(
        Bgcolors().get['OKGREEN'], "============================================================",
        Bgcolors().get['ENDC'])


if __name__ == '__main__':
    main()

Можно добавить локи:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import time
import os
import argparse
import logging
import threading
import multiprocessing

from urllib.request import urlopen
from multiprocessing import Process, ProcessError, Lock

logging.basicConfig(level=logging.DEBUG,
                    format='[%(levelname)s] (%(processName)-1s) %(message)s',
                    datefmt="%H:%M:%S"
                    )


class Bgcolors:
    def __init__(self):
        self.get = {
            'HEADER': '\033[95m',
            'OKBLUE': '\033[94m',
            'OKGREEN': '\033[92m',
            'WARNING': '\033[93m',
            'FAIL': '\033[91m',
            'ENDC': '\033[0m',
            'BOLD': '\033[1m',
            'UNDERLINE': '\033[4m'
        }


class StartProcesses(Process):
    def __init__(self, name, daemon, directory, url):
        """Initialize the thread"""
        try:
            Process.__init__(self)
            self.name = name
            self.directory = directory
            self.url = url
            self.daemon = daemon
            self.lock = Lock()
            self.start()
        except ProcessError as error:
            print('Error: Unable to initialize the {0} thread: \n{1}'.format(
                multiprocessing.current_process().name,
                error)
            )

    def run(self):
        """Run the thread"""
        handle = urlopen(self.url)
        fname = os.path.basename(self.url)

        file_dir = os.path.abspath(self.directory) + "/" + fname
        print("File in directory ===== ", file_dir)

        logging.debug('Waiting for a lock for {0}'.format(self.name))
        self.lock.acquire()

        try:
            with open(file_dir, "wb") as f_handler:
                while True:
                    chunk = handle.read(1024)
                    if not chunk:
                        break
                    f_handler.write(chunk)
            print("{0} is starting".format(self.name))
            msg = "{0} has finished downloading {1}!".format(self.name, self.url)
            print(msg)
        except Exception as error:
            print('Error: Got issue with {0} thread: \n{1}'.format(self.name, error))
        finally:
            logging.debug('Released a lock for {0}'.format(multiprocessing.current_process().name))
            self.lock.release()

    @staticmethod
    def thread_active_count():
        print("Active threads count: ", threading.activeCount())

    @staticmethod
    def current_process_pid():
        print("Active process count: ", multiprocessing.current_process().pid)


def processes(file_dir, urls):
    try:
        for item, url in enumerate(urls):
            process = StartProcesses(name="Process-{0}".format(item + 1), daemon=False, directory=file_dir, url=url)
            print("process: ", process)

            if process.daemon:
                process.join()

            StartProcesses.thread_active_count()

        print("StartProcesses is done")
    except ProcessError as error:
        print("Error: Unable to start a process: \n{0}".format(error))

    return processes


def main():
    start__time = time.time()

    parser = argparse.ArgumentParser(prog='python3 script_name.py -h',
                                     usage='python3 script_name.py {ARGS}',
                                     add_help=True,
                                     prefix_chars='--/',
                                     epilog='''created by Vitalii Natarov''')
    parser.add_argument('--version', action='version', version='v0.1.0')
    parser.add_argument('--dir', dest='file_dir', help='Set directory for files', default="dir")

    results = parser.parse_args()
    file_dir = results.file_dir

    urls = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"
            ] * 1

    if not os.path.exists(file_dir):
        os.makedirs(file_dir)

    processes(file_dir, urls)

    end__time = round(time.time() - start__time, 2)
    print("--- %s seconds ---" % end__time)

    print(
        Bgcolors().get['OKGREEN'], "============================================================",
        Bgcolors().get['ENDC'])
    print(
        Bgcolors().get['OKGREEN'], "==========================FINISHED==========================",
        Bgcolors().get['ENDC'])
    print(
        Bgcolors().get['OKGREEN'], "============================================================",
        Bgcolors().get['ENDC'])


if __name__ == '__main__':
    main()

Можно тоже самое сделать с менеджером мультипроцессора:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import time
import os
import argparse
import logging
import threading
import multiprocessing

from urllib.request import urlopen
from multiprocessing import Process, ProcessError, Manager

logging.basicConfig(level=logging.DEBUG,
                    format='[%(levelname)s] (%(processName)-1s) %(message)s',
                    datefmt="%H:%M:%S"
                    )


class Bgcolors:
    def __init__(self):
        self.get = {
            'HEADER': '\033[95m',
            'OKBLUE': '\033[94m',
            'OKGREEN': '\033[92m',
            'WARNING': '\033[93m',
            'FAIL': '\033[91m',
            'ENDC': '\033[0m',
            'BOLD': '\033[1m',
            'UNDERLINE': '\033[4m'
        }


class StartProcesses(Process):
    def __init__(self, name, daemon, queue):
        """Initialize the thread"""
        try:
            Process.__init__(self)
            self.name = name
            self.queue = queue
            self.event = multiprocessing.Event()
            self.daemon = daemon
            self.start()
        except ProcessError as error:
            print('Error: Unable to initialize the {0} thread: \n{1}'.format(
                multiprocessing.current_process().name,
                error)
            )

    def run(self):
        """Run the thread"""
        while True:
            # Get the work from the queue and expand the tuple
            directory, link = self.queue.get(timeout=60)

            try:
                fname = os.path.basename(link)
                handle = urlopen(link)

                file_dir = os.path.abspath(directory) + "/" + fname
                print("File in directory ===== ", file_dir)

                with open(file_dir, "wb") as f_handler:
                    while True:
                        chunk = handle.read(1024)
                        if not chunk:
                            break
                        f_handler.write(chunk)
                print("{0} is starting".format(multiprocessing.current_process().name))
                msg = "{0} has finished downloading {1}!".format(self.name, link)
                print(msg)
            except Exception as error:
                print('Error: Got issue with {0} thread: \n{1}'.format(self.name, error))
            # finally:
            #     # print(self.event)
            #     # self.stop()
            #     # self.queue.task_done()

    @staticmethod
    def thread_active_count():
        print("Active threads count: ", threading.activeCount())

    @staticmethod
    def current_process_pid():
        print("Active process count: ", multiprocessing.current_process().pid)


def processes(cores, file_dir, urls):
    try:
        if cores is None:
            cpu_cores = multiprocessing.cpu_count()
        else:
            cpu_cores = cores

        all_processes = []

        # Create process manager & a queue to communicate with the worker threads
        process_manager = Manager()
        queue = process_manager.Queue()

        for item in range(cpu_cores):
            process = StartProcesses(name="Process-{0}".format(item + 1), daemon=True, queue=queue)
            all_processes.append(process)

        # Put the tasks into the queue as a tuple
        for url in urls:
            logging.info('Queueing {}'.format(url))
            queue.put((file_dir, url))

        # Causes the main thread to wait for the queue to finish processing all the tasks
        for pr in all_processes:
            pr.join(timeout=1)

        print("StartProcesses is done")
    except ProcessError as error:
        print("Error: Unable to start a process: \n{0}".format(error))

    return processes


def main():
    start__time = time.time()

    parser = argparse.ArgumentParser(prog='python3 script_name.py -h',
                                     usage='python3 script_name.py {ARGS}',
                                     add_help=True,
                                     prefix_chars='--/',
                                     epilog='''created by Vitalii Natarov''')
    parser.add_argument('--version', action='version', version='v0.1.0')
    parser.add_argument('--dir', dest='file_dir', help='Set directory for files', default="dir")
    parser.add_argument('--cores', dest='cpu_cores',
                        help='Set cores for work. If None - will use cores count from host',
                        default=None)

    results = parser.parse_args()
    file_dir = results.file_dir
    cores = results.cpu_cores

    urls = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"
            ] * 10

    if not os.path.exists(file_dir):
        os.makedirs(file_dir)

    processes(cores, file_dir, urls)

    end__time = round(time.time() - start__time, 2)
    print("--- %s seconds ---" % end__time)

    print(
        Bgcolors().get['OKGREEN'], "============================================================",
        Bgcolors().get['ENDC'])
    print(
        Bgcolors().get['OKGREEN'], "==========================FINISHED==========================",
        Bgcolors().get['ENDC'])
    print(
        Bgcolors().get['OKGREEN'], "============================================================",
        Bgcolors().get['ENDC'])


if __name__ == '__main__':
    main()

А как насчет Pool? Та легко, вот пример:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import time
import os
import argparse
import threading
import multiprocessing
from functools import partial

from urllib.request import urlopen
from multiprocessing import ProcessError, Pool


class Bgcolors:
    def __init__(self):
        self.get = {
            'HEADER': '\033[95m',
            'OKBLUE': '\033[94m',
            'OKGREEN': '\033[92m',
            'WARNING': '\033[93m',
            'FAIL': '\033[91m',
            'ENDC': '\033[0m',
            'BOLD': '\033[1m',
            'UNDERLINE': '\033[4m'
        }


def download_file(url, directory):
    handle = urlopen(url)
    fname = os.path.basename(url)

    file_dir = os.path.abspath(directory) + "/" + fname
    print("File in directory ===== ", file_dir)
    print("{0} is starting".format(threading.currentThread().getName()))

    with open(file_dir, "wb") as f_handler:
        while True:
            chunk = handle.read(1024)
            if not chunk:
                break
            f_handler.write(chunk)

    msg = "{0} has finished downloading {1}!".format(threading.currentThread().getName(), url)
    print(msg)
    return msg


def thread_active_count():
    print("Active threads count: ", threading.activeCount())


def processes(urls, directory, cores, chanksize):
    if cores is None:
        cpu_cores = multiprocessing.cpu_count()
    else:
        cpu_cores = cores

    try:
        thread_active_count()
        with Pool(processes=cpu_cores) as pool:
            thread_active_count()
            download_file_x = partial(download_file, directory=directory)
            pool.map(download_file_x, urls, chunksize=chanksize)
            # pool.imap(download_file_x, urls, chunksize=chanksize)
            # pool.imap_unordered(download_file_x, urls, chunksize=chanksize)
            thread_active_count()

    except ProcessError as error:
        print('Error: Unable to initialize the {0} thread: \n{1}'.format(
            threading.currentThread().getName(),
            error)
        )

    return processes


def main():
    start__time = time.time()

    parser = argparse.ArgumentParser(prog='python3 script_name.py -h',
                                     usage='python3 script_name.py {ARGS}',
                                     add_help=True,
                                     prefix_chars='--/',
                                     epilog='''created by Vitalii Natarov''')
    parser.add_argument('--version', action='version', version='v0.1.0')
    parser.add_argument('--dir', dest='file_dir', help='Set directory for files', default="dir")
    parser.add_argument('--chanksize', dest='chanksize', help='Set chanksize', default=1)
    parser.add_argument('--cores', dest='cpu_cores',
                        help='Set cores for work. If None - will use cores count from host',
                        default=None)

    results = parser.parse_args()
    file_dir = results.file_dir
    cores = results.cpu_cores
    chanksize = results.chanksize

    urls = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"
            ] * 10

    if not os.path.exists(file_dir):
        os.makedirs(file_dir)

    processes(urls, file_dir, cores, chanksize)

    end__time = round(time.time() - start__time, 2)
    print("--- %s seconds ---" % end__time)

    print(
        Bgcolors().get['OKGREEN'], "============================================================",
        Bgcolors().get['ENDC'])
    print(
        Bgcolors().get['OKGREEN'], "==========================FINISHED==========================",
        Bgcolors().get['ENDC'])
    print(
        Bgcolors().get['OKGREEN'], "============================================================",
        Bgcolors().get['ENDC'])


if __name__ == '__main__':
    main()

Или:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import time
import argparse
import threading

from urllib.request import urlopen
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
from concurrent.futures.process import BrokenProcessPool


class Bgcolors:
    def __init__(self):
        self.get = {
            'HEADER': '\033[95m',
            'OKBLUE': '\033[94m',
            'OKGREEN': '\033[92m',
            'WARNING': '\033[93m',
            'FAIL': '\033[91m',
            'ENDC': '\033[0m',
            'BOLD': '\033[1m',
            'UNDERLINE': '\033[4m'
        }


def download_file(url, directory):
    print('[{0}] is starting.....'.format(
        threading.current_thread().name)
    )

    handle = urlopen(url)
    fname = os.path.basename(url)

    file_dir = os.path.abspath(directory) + "/" + fname
    print("File in directory ===== ", file_dir)

    try:
        with open(file_dir, "wb") as f_handler:
            while True:
                chunk = handle.read(1024)
                if not chunk:
                    break
                f_handler.write(chunk)
            return "The {0} has been downloaded!".format(fname)
    except Exception as error:
        print("Woops: {0}".format(error))
        return "Woops: {0}".format(error)


def threads_pool_executing(workers_count, file_dir, urls):
    print('[{0}] is starting.....'.format(threading.current_thread().name))
    try:
        with ProcessPoolExecutor(max_workers=workers_count) as ex:
            # future_to_url = {ex.submit(download_file, url, file_dir) for url in urls}
            # future_to_url = ex.map(download_file, url, file_dir) for url in urls
            future_to_url = dict((ex.submit(download_file, url, file_dir), url)
                                 for url in urls)

            try:
                for future in as_completed(future_to_url):
                    url = future_to_url[future]
                    if future.exception() is not None:
                        print('%r generated an exception: %s' % (url,
                                                                 future.exception()))
                    else:
                        print('%r page is %d bytes' % (url, len(future.result())))
            except BrokenProcessPool as error:
                print('could not start new tasks: {}'.format(error))
    except Exception as error:
        print("Woops: {0}".format(error))

    return threads_pool_executing


def main():
    start__time = time.time()

    parser = argparse.ArgumentParser(prog='python3 script_name.py -h',
                                     usage='python3 script_name.py {ARGS}',
                                     add_help=True,
                                     prefix_chars='--/',
                                     epilog='''created by Vitalii Natarov''')
    parser.add_argument('--version', action='version', version='v0.1.0')
    parser.add_argument('--workers', dest='workers_counts', help='Set workers counts for pool executing', default=10)

    results = parser.parse_args()
    workers_counts = results.workers_counts

    urls = [
        "https://www.irs.gov/pub/irs-pdf/p3.pdf",
        "https://www.irs.gov/pub/irs-pdf/p15.pdf",
        "https://www.irs.gov/pub/irs-pdf/p15a.pdf",
        "https://www.irs.gov/pub/irs-pdf/p15b.pdf",
        "https://www.irs.gov/pub/irs-pdf/p15t.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"
    ] * 100

    file_dir = "dir"
    if not os.path.exists(file_dir):
        os.makedirs(file_dir)

    threads_pool_executing(workers_counts, file_dir, urls)

    end__time = round(time.time() - start__time, 2)
    print("--- %s seconds ---" % end__time)

    print(
        Bgcolors().get['OKGREEN'], "============================================================",
        Bgcolors().get['ENDC'])
    print(
        Bgcolors().get['OKGREEN'], "==========================FINISHED==========================",
        Bgcolors().get['ENDC'])
    print(
        Bgcolors().get['OKGREEN'], "============================================================",
        Bgcolors().get['ENDC'])


if __name__ == '__main__':
    main()

Как-то так!

AsyncIO на Python в Unix/Linux

Теории пока не дам, почитайте на досуге сами, а вот примеры приведу:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import time
import os
import argparse
import logging
import asyncio
import aiohttp

from functools import partial

logging.basicConfig(level=logging.DEBUG,
                    format='[%(levelname)s] %(asctime)s - (%(processName)-1s) - %(name)s - %(message)s '
                           '[in %(funcName)s: %(pathname)s:%(lineno)d]',
                    datefmt="%H:%M:%S"
                    )
logger = logging.getLogger(__name__)


class Bgcolors:
    def __init__(self):
        self.get = {
            'HEADER': '\033[95m',
            'OKBLUE': '\033[94m',
            'OKGREEN': '\033[92m',
            'WARNING': '\033[93m',
            'FAIL': '\033[91m',
            'ENDC': '\033[0m',
            'BOLD': '\033[1m',
            'UNDERLINE': '\033[4m'
        }


async def dir_check(directory):
    print('Check if the dir is present, else - create it')
    try:
        if not os.path.exists(directory):
            os.makedirs(directory)
            logger.info("The {0} has been created".format(directory))
    except OSError as err:
        logger.error("Directory {0} can not be created: \n{1}".format(directory, err))

    return directory


async def download_file(session, url, directory):
    async with session.get(url) as response:
        fname = os.path.basename(url)
        file_dir = os.path.abspath(directory) + "/" + fname
        logger.info("File in directory ===== {}".format(file_dir))

        with open(file_dir, "wb") as f_handler:
            logger.info("{0} is starting".format(fname))
            while True:
                # await pauses execution until the 1024 (or less) bytes are read from the stream
                chunk = await response.content.read(1024)
                if not chunk:
                    # We are done reading the file, break out of the while loop
                    break
                f_handler.write(chunk)
    logger.info("{0} has finished downloading {1}!".format(fname, url))

    return download_file


async def fetch(session, url):
    async with session.get(url, raise_for_status=False) as resp:
        # assert resp.status == 200
        print(resp.status, url)
        return resp.status, url


async def main():
    start__time = time.time()

    parser = argparse.ArgumentParser(prog='python3 script_name.py -h',
                                     usage='python3 script_name.py {ARGS}',
                                     add_help=True,
                                     prefix_chars='--/',
                                     epilog='''created by Vitalii Natarov''')
    parser.add_argument('--version', action='version', version='v0.1.0')
    parser.add_argument('--dir', dest='file_dir', help='Set directory for files', default="dir")

    results = parser.parse_args()
    file_dir = results.file_dir

    urls = [
            "http://www.irs.gov/pub/irs-pdf/f1040.pdf",
            "https://www.irs.gov/pub/irs-prior/f1040a--2015.pdf",
            "https://www.irs.gov/pub/irs-prior/i1040ez--2017.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"
            ] * 1

    urls_list = []
    async with aiohttp.ClientSession(raise_for_status=True) as session:
        for url in urls:
            # urls_list.append(partial(fetch, session=session, url=url))
            urls_list.append(partial(download_file, session=session, url=url, directory=file_dir))

        await dir_check(file_dir)
        for f in urls_list:
            await f()

    end__time = round(time.time() - start__time, 2)
    print("--- %s seconds ---" % end__time)
    print(
        Bgcolors().get['OKGREEN'], "============================================================",
        Bgcolors().get['ENDC'])
    print(
        Bgcolors().get['OKGREEN'], "==========================FINISHED==========================",
        Bgcolors().get['ENDC'])
    print(
        Bgcolors().get['OKGREEN'], "============================================================",
        Bgcolors().get['ENDC'])


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.set_debug(False)

    try:
        tasks = [
            loop.create_task(main())
        ]

        wait_tasks = asyncio.wait(tasks)
        loop.run_until_complete(wait_tasks)
        # loop.run_until_complete(main())
    except Exception as error:
        logger.error(error)
    finally:
        # Shutdown the loop even if there is an exception
        loop.close()

Не понравился данный модуль, если честно.

Gevent на Python в Unix/Linux

Вот пример данной реализации:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import time
import os
import argparse
import logging
import gevent
import gevent.monkey

from urllib.request import urlopen

logging.basicConfig(level=logging.DEBUG,
                    format='[%(levelname)s] %(asctime)s - (%(processName)-1s) - %(name)s - %(message)s '
                           '[in %(funcName)s: %(pathname)s:%(lineno)d]',
                    datefmt="%H:%M:%S"
                    )
logger = logging.getLogger(__name__)


class Bgcolors:
    def __init__(self):
        self.get = {
            'HEADER': '\033[95m',
            'OKBLUE': '\033[94m',
            'OKGREEN': '\033[92m',
            'WARNING': '\033[93m',
            'FAIL': '\033[91m',
            'ENDC': '\033[0m',
            'BOLD': '\033[1m',
            'UNDERLINE': '\033[4m'
        }


def dir_check(directory):
    print('Check if the dir is present, else - create it')
    try:
        if not os.path.exists(directory):
            os.makedirs(directory)
            logger.info("The {0} has been created".format(directory))
    except OSError as err:
        logger.error("Directory {0} can not be created: \n{1}".format(directory, err))

    return directory


def download_file(directory, url):
    handle = urlopen(url)
    fname = os.path.basename(url)

    file_dir = os.path.abspath(directory) + "/" + fname
    logger.info("File in directory ===== {}".format(file_dir))

    with open(file_dir, "wb") as f_handler:
        while True:
            chunk = handle.read(1024)
            if not chunk:
                break
            f_handler.write(chunk)

    return download_file


def main():
    start__time = time.time()

    parser = argparse.ArgumentParser(prog='python3 script_name.py -h',
                                     usage='python3 script_name.py {ARGS}',
                                     add_help=True,
                                     prefix_chars='--/',
                                     epilog='''created by Vitalii Natarov''')
    parser.add_argument('--version', action='version', version='v0.1.0')
    parser.add_argument('--dir', dest='file_dir', help='Set directory for files', default="dir")

    results = parser.parse_args()
    file_dir = results.file_dir

    urls = [
            "http://www.irs.gov/pub/irs-pdf/f1040.pdf",
            "https://www.irs.gov/pub/irs-prior/f1040a--2015.pdf",
            "https://www.irs.gov/pub/irs-prior/i1040ez--2017.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"
            ] * 1

    gevent.monkey.patch_all()
    threads = [
        gevent.spawn(dir_check, file_dir),
    ]
    for url in urls:
        threads.append(gevent.spawn(download_file, directory=file_dir, url=url))

    # gevent.joinall(threads)
    gevent.wait(threads)

    end__time = round(time.time() - start__time, 2)
    print("--- %s seconds ---" % end__time)
    print(
        Bgcolors().get['OKGREEN'], "============================================================",
        Bgcolors().get['ENDC'])
    print(
        Bgcolors().get['OKGREEN'], "==========================FINISHED==========================",
        Bgcolors().get['ENDC'])
    print(
        Bgcolors().get['OKGREEN'], "============================================================",
        Bgcolors().get['ENDC'])


if __name__ == '__main__':
    main()

Как-то так!

Код выложил на гитхабе — https://github.com/SebastianUA/concurrency-in-python . Можно заюзать как-то.

Вот и все, статья «Параллелизм на Python в Unix/Linux» завершена.

This entry was posted in Без рубрики. Bookmark the permalink.

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *

Этот сайт использует Akismet для борьбы со спамом. Узнайте, как обрабатываются ваши данные комментариев.