Разгоняем Python с помощью конкурентности параллелизма и asyncio.

Разгоняем Python с помощью конкурентности параллелизма и asyncio.

Перевод статьи Jace Medlin: Speeding up python with concurrency parallelism and asyncio (opens new window).

Есть много причин, по которым ваши приложения могут работать медленно. Иногда это происходит из-за плохой алгоритмической конструкции или неправильного выбора структуры данных. Однако иногда это происходит из-за неподконтрольных нам сил, таких как аппаратные ограничения или особенности работы в сети. Вот где подходят конкурентность и параллелизм. Они позволяют вашим программам одновременно выполнять несколько задач, либо сократить время на ожидание выполнения ресурсоемких задач.

Независимо от того, имеете ли вы дело с внешними веб-ресурсами, читаете и записываете в несколько файлов или вам нужно несколько раз использовать функцию, требующую интенсивных вычислений, с разными параметрами, этот пост должен помочь вам максимизировать эффективность и скорость вашего кода.

Во-первых, мы углубимся в то, что такое конкурентность и параллелизм и как они вписываются в область Python, используя стандартные библиотеки, такие как threading, multiprocessing и asyncio. В последней части этого поста мы сравним реализацию async/await в Python с тем, как они реализованы в других языках.

Вы можете найти все примеры кода из этого поста в репозитории concurrency-parallelism-and-asyncio (opens new window) на GitHub.

Чтобы работать с примерами в данной статье, вы должны быть знакомы с тем как работать с HTTP запросами. Устанавливать модули Python с помощью pip. В части тестирования знать основы работы с pytest.

# Цели

К концу текущей статьи вы получите ответы на следующие вопросы:

  1. Что такое конкурентность (concurrency)?
  2. Что такое поток (thread)?
  3. Что означает когда какая либо задача неблокирующая?
  4. Что такое цикл событий (event-loop)?
  5. Что такое "обратный вызов" (callback)?
  6. Почему методы asyncio всегда несколько быстрее чем методы threading?
  7. Когда следует использовать threading, и когда следует предпочесть asyncio?
  8. Что такое параллелизм (parallelism)?
  9. Какая разница между конкурентностью и параллелизмом?
  10. Возможно ли комбинировать asyncio и multiprocessing?
  11. Когда лучше использовать multiprocessing, asyncio, threading?
  12. В чем разница между multiprocessing, asyncio или concurrency.futures?
  13. Как тестировать asyncio с помощью pytest?

# Конкурентность (concurrency)

Эффективное определение конкурентности - это «возможность выполнять несколько задач одновременно». Это немного вводит в заблуждение, поскольку задачи могут или не могут быть выполнены в одно и то же время. Вместо этого процесс может начаться, а затем, как только он приступит к ожиданию завершения некоторой инструкции, переключиться на новую задачу, чтобы впоследствии вернуться, после того как инструкция будет выполнена. Когда одна задача завершена, процесс снова переключается на незавершенную задачу, пока все они не будут выполнены. Задачи начинаются асинхронно, выполняются асинхронно, а затем асинхронно заканчиваются.

Если вас это сбивает с толку, давайте вместо этого подумаем о аналогии: скажем, вы хотите сделать сэндвич. Сначала вы должны положить бекон на сковороду и начать его жарить на среднем или медленном огне. Пока бекон готовится, вы можете достать помидоры и салат и начать их готовить (мыть и нарезать). Все это время вы продолжаете проверять и время от времени переворачиваете свой бекон.

На данный момент вы начали задачу, а затем начали и выполнили еще две, пока вы все еще ждете первого.

В конце концов вы кладете хлеб в тостер. Пока он поджаривается, вы продолжаете проверять свой бекон. Когда кусочки будут готовы, вы вытаскиваете их и кладете на тарелку. После того, как ваш хлеб поджаривается, вы наносите на него соус по своему выбору, а затем можете начинать насыпать помидоры, салат, а затем, когда он готов, ваш бекон. Только после того, как все будет приготовлено, подготовлено и разложено слоями, вы можете положить последний кусок тоста на свой бутерброд, нарезать его (по желанию) и съесть.

Поскольку для этого требуется, чтобы вы выполняли несколько задач одновременно, создание сэндвича по сути является конкурентным процессом, даже если вы не уделяете все свое внимание каждой из этих задач одновременно. Во всех смыслах и целях в следующем разделе мы будем называть эту форму параллелизма просто «конкурентность».

По этой причине конкурентность отлично подходит для процессов с интенсивным вводом-выводом - задач, которые ожидают завершения веб-запросов или операции чтения/записи файлов.

В Python есть несколько разных способов добиться конкурентности. В первую очередь мы рассмотрим библиотеку threading.

В качестве примеров в этом разделе мы собираемся создать небольшую программу на Python, которая 5 раз выбирает случайный музыкальный жанр из Genrenator API Binary Jazz, выводит его на экран и помещает каждый в отдельный файл.

Для работы с потоками в Python нам потребуется импортировать единственный модуль threading, но для нашего примера мы также импортируем urllib для работы с HTTP-запросами, time для реализации измерения времени выполнения функций и json для удобного преобразования данных полученных из Generation API в json формат.

Начнем с простой функции:

def write_genre(file_name):
    """
    Использует генератор binaryjazz.us чтобы записать 
    название случайного жанра в файл с именем file_name.
    """

    req = Request("https://binaryjazz.us/wp-json/genrenator/v1/genre/", 
                  headers={'User-Agent': 'Mozilla/5.0'})
    genre = json.load(urlopen(req))

    with open(file_name, "w") as new_file:
        print(f'Записываестя "{genre}" в "{file_name}"...')
        new_file.write(genre)

Код приведенный выше выполняет запрос к Generator API, загружает ответ сервиса в формате JSON (случайный музыкальный жанр), выводит его и записывает в файл.

Без указания в заголовке запроса "User-Agent" будет возвращен код ответа 304.

Что нас действительно интересует, так это следующая часть, в которой реализуется фактическая многопоточность:

threads = []

for i in range(5):
    thread = threading.Thread(
        target=write_genre,
        args=[f"./threading/new_file{i}.txt"]
    )
    thread.start()
    threads.append(thread)

for thread in threads:
    thread.join()

Вначале мы создаем пустой список threads. Затем мы реализуем цикл на 5 итераций который каждый раз создает новый поток, запускает его и добавляет в наш список threads. Затем в другом цикле мы перебираем наш список thead и присоединяемся (join) к каждому из потоков.

Создание потоков в Python выполяется следующим образом:

Чтобы создать новый поток, используйте threading.Thread(). Вы можете передать ему именованные аргументы kwarg со параметрами любой функции, которую вы хотите запустить в данном потоке. Но передавать следует только имя функции, не вызывая ее (в нашем случае write_genre, а не write_genre()). Чтобы передать аргументы функции, передайте "kwargs" (который принимает словарь (dict) аргументов) или "args" (который принимает итерируемый объект, содержащий ваши аргументы - в данном случае список).

Однако создание потока - это не то же самое, что запуск потока. Чтобы запустить его, используйте {имя вашего потока}.start(). Запуск потока означает «начало его выполнения».

Наконец, когда мы присоединяемся к потокам с помощью thread.join(), все, что мы делаем, - это проверяем завершение потока, прежде чем продолжить работу с нашим кодом.

# Потоки (threads)

Поток - это способ позволить вашему компьютеру разбить отдельный процесс/программу на множество легковесных частей, которые выполняются параллельно. Несколько сбивает с толку стандартная реализация потоковой передачи Python, которая ограничивает потоки только возможностью их выполнения по одному по причине реализации Глобальной Блокировкой Интерпретатора (GIL) (opens new window). GIL необходим, потому что управление памятью CPython (реализация Python по умолчанию) не является потокобезопасным. Из-за этого ограничения потоки в Python являются конкурентными, но не параллельными. Чтобы обойти это, в Python есть отдельный модуль multiprocessing, не ограниченный GIL, который запускает отдельные процессы, обеспечивая параллельное выполнение вашего кода. Использование модуля multiprocessing почти идентично использованию модуля threading.

Более подробную информацию о Python GIL и безопасности потоков можно найти в официальных документах Real Python (opens new window) и официальной документации Python (opens new window).

Вскоре мы более подробно рассмотрим multiprocessing в Python.

Прежде чем мы покажем потенциальное улучшение скорости по сравнению с кодом выполняемым в один поток, я взял на себя смелость также создать версию той же программы (опять же, доступную на GitHub (opens new window)) без использования многопоточности. Вместо того чтобы создавать новый поток и присоединяться к каждому из них, он вызывает write_genre в цикле for, который повторяется 5 раз.

Чтобы сравнить тесты скорости, я также импортировал библиотеку time, чтобы измерить время выполнения наших скриптов:

Starting...
Writing "binary indoremix" to "./sync/new_file0.txt"...
Writing "slavic aggro polka fusion" to "./sync/new_file1.txt"...
Writing "israeli new wave" to "./sync/new_file2.txt"...
Writing "byzantine motown" to "./sync/new_file3.txt"...
Writing "dutch hate industrialtune" to "./sync/new_file4.txt"...
Time to complete synchronous read/writes: 1.42 seconds

Как видно, на моем компьютере выполнение скрипта занимает около 1,42 сек. Не так уж плохо.

Теперь запустим только что созданную версию скрипта с использованием многопоточности:

Starting...
Writing "college k-dubstep" to "./threading/new_file2.txt"...
Writing "swiss dirt" to "./threading/new_file0.txt"...
Writing "bop idol alternative" to "./threading/new_file4.txt"...
Writing "ethertrio" to "./threading/new_file1.txt"...
Writing "beach aust shanty français" to "./threading/new_file3.txt"...
Time to complete threading read/writes: 0.77 seconds

Первое, что может броситься в глаза, - это то, что функции выполняются не по порядку: 2 - 0 - 4 - 1 - 3

Это связано с асинхронным характером потоковой передачи: когда одна функция переходит к ожиданию ответа, начинается другая и т. Д. Поскольку мы можем продолжать выполнять задачи, пока ждем завершения других (из-за сетевых операций или операций ввода-вывода файлов), вы также могли заметить, что мы сокращаем время примерно вдвое: 0,77 секунды. Хотя сейчас это может показаться не очень большим отрывом, легко представить себе очень реальный случай создания веб-приложения, которому необходимо записывать гораздо больше данных в файл или взаимодействовать с гораздо более сложными веб-сервисами.

Итак, если многопоточность настолько хороша, почему бы нам не закончить статью в этом месте?

Потому что есть даже лучшие способы одновременного выполнения задач.

# Asyncio

Давайте посмотрим на пример использования asyncio. Для этого нам потребуется установить модуль aiohttp с помощью pip. Это позволит нам делать неблокирующие запросы и получать ответы с использованием синтаксиса async/await, который будет представлен в ближайшее время. Он также имеет дополнительное преимущество в виде функции, которая преобразует ответ JSON без необходимости импорта библиотеки json. Мы также установим и импортируем aiofiles, что позволит выполнять неблокирующие файловые операции. Помимо aiohttp и aiofiles, импортируйте asyncio, который является частью стандартной библиотекой Python.

«Неблокирующий» (non-blocking) означает, что программа позволяет другим потокам продолжать работу, пока она находится в состоянии ожидания. Это противоположно «блокирующему» коду, который полностью останавливает выполнение вашей программы на время ожидания ответа. Нормальные синхронные операции ввода-вывода страдают от этого ограничения.

После того как мы установили и импортировали требуемые модули, взглянем на асинхронную версию функции write_genre из нашего примера threading:

sync def write_genre(file_name):
    """
    Использует генератор binaryjazz.us чтобы записать 
    название случайного жанра в файл с именем file_name.
    """

    async with aiohttp.ClientSession() as session:
        async with session.get(
                "https://binaryjazz.us/wp-json/genrenator/v1/genre/"
            ) as response:
            genre = await response.json()

    async with aiofiles.open(file_name, "w") as new_file:
        print(f'Запись "{genre}" в файл "{file_name}"...')
        await new_file.write(genre)

Для тех, кто не знаком с синтаксисом async/await, который можно найти во многих других современных языках, async объявляет, что функция, цикл for или оператор with должны использоваться асинхронно. Чтобы вызвать асинхронную функцию, вы должны либо использовать ключевое слово await из другой асинхронной функции, либо вызвать create_task() непосредственно из цикла событий, который можно получить из asyncio вызвав get_event_loop(), то есть loop = asyncio.get_event_loop().

Кроме того:

  1. async with позволяет ожидать асинхронных ответов и файловых операций.
  2. async for (здесь не используется) выполняет итерацию по асинхронному потоку (opens new window).

# Цикл событий (event loop)

Циклы событий - это конструкции, присущие асинхронному программированию, которые позволяют выполнять задачи асинхронно. Так как вы читаете эту статью, я могу предположить, что вы, вероятно, не слишком знакомы с этой концепцией. Однако, даже если вы никогда не писали асинхронных приложений, у вас есть опыт работы с циклами событий каждый раз, когда вы используете компьютер. Независимо от того, ожидает ли ваш компьютер ввод с клавиатуры, играете ли вы в многопользовательские онлайн-игры или просматриваете Reddit, в тот момент, когда файлы копируются в фоновом режиме, цикл событий является движущей силой, которая обеспечивает бесперебойную и эффективную работу данных процессов. По своей сути, цикл событий - это процесс, который ожидает срабатывания некоторых триггеров, а затем выполняет определенные (запрограммированные) действия, как только эти триггеры срабатывают. Триггеры часто возвращают какое-то объект, будь то «promise» (синтаксис JavaScript) или «future» (синтаксис Python) как признак того, что задача была добавлена. После завершения задачи future возвращает значение, переданное обратно из вызываемой функции (при условии, что функция действительно возвращает значение).

Передача функции в качестве одного из параметров другой функции и как следсвтвие выполнение некоторой функции в ответ на вызов другой называется «обратным вызовом» (callback).

Еще один взгляд на обратные вызовы и события в великолепном ответе на Stack Overflow (opens new window):

«Обратный вызов» - это любая функция, которая вызывается другой функцией, которая принимает первую функцию в качестве параметра. В большинстве случаев «обратный вызов» - это функция, которая вызывается, когда что-то происходит. Это что-то можно назвать «событием» на языке программистов.

Представьте себе такой сценарий: вы ожидаете посылку через пару дней. Посылка - подарок соседу. Поэтому, получив посылку, вы хотите, чтобы она была доставлена ​​соседям. Вы уезжаете из города, поэтому вы оставляете инструкции для супруги.

Вы могли бы сказать ей, чтобы она забрала пакет и отнесла его соседям. Если бы ваша супруга был глупа, как компьютер, она сидела бы у двери и ждали посылку, пока она не пришла (НИЧЕГО НЕ ДЕЛАТЬ), а затем, как только она пришла, отнесла бы ее соседям. Но есть способ получше. Скажите супруге, что ОДИН РАЗ, когда она получит посылку, она должна принести ее соседям. Таким образом она может жить нормальной жизнью ДО тех пор, пока не получит посылку.

В нашем примере получение пакета - это «событие», а его передача соседям - «обратный вызов». Ваша супруга "выполняет" ваши инструкции, чтобы принести посылку, только когда посылка прибудет. Намного лучше!

Такое мышление очевидно в повседневной жизни, но у компьютеров нет такого же здравого смысла. Рассмотрим, как программисты обычно пишут в файл:

fileObject = open(file)
# тепрь после того как мы ДОЖДАЛИСЬ открытия файла, мы можем писать в него
fileObject.write("We are writing to the file.")
# теперь мы можем продолжать делать другие, совершенно не связанные с нашей программой вещи

Здесь мы ЖДЕМ открытия файла, прежде чем писать в него. Это «блокирует» поток выполнения, и наша программа не может делать ничего из того, что ей может понадобиться! Что, если бы мы могли сделать это вместо этого:

# мы передаем writeToFile (ФУНКЦИЮ ОБРАТНОГО ВЫЗОВА (CALLBACK)!) в функцию open
fileObject = open(file, writeToFile)
# выполнение продолжается - мы не ждем пока файл будет открыт
# КОГДА файл будет открыт мы пишем в него, но пока мы ждем МЫ МОЖЕМ ВЫПОЛНЯТЬ >> ДРУГИЕ ЗАДАЧИ!

Вот пошаговый разбор нашей функции:

Мы используем async with для асинхронного открытия клиентского сеанса. Класс aiohttp.ClientSession() позволяет нам делать HTTP-запросы и оставаться подключенными к источнику, не блокируя выполнение нашего кода. Затем мы делаем асинхронный запрос к Genrenator API и ожидаем ответа в формате JSON (случайный музыкальный жанр). В следующей строке мы снова используем async with с библиотекой aiofiles, чтобы асинхронно открыть новый файл для записи нашего нового жанра. Выводим жанр на экран, после чего записываем его в файл.

В отличие от обычных скриптов Python, программирование с помощью asyncio в значительной степени требует использования какой-то «основной» функции.

Это потому, что вам нужно использовать ключевое слово «async», чтобы использовать синтаксис «await», а синтаксис «await» - это единственный способ фактически запустить другие асинхронные функции.

Вот наша main функция:

async def main():
    tasks = []

    for i in range(5):
        tasks.append(write_genre(f"./async/new_file{i}.txt"))

    await asyncio.gather(*tasks)

asyncio.run(main())

Как видите, мы объявили ее как асинхронную с помощью «async». Затем мы создаем пустой список tasks, для размещения наших асинхронных задач (вызовов Genrenator и нашего файлового ввода-вывода). Мы добавляем наши задачи в список tasks, но они еще не выполняются. Вызовы фактически не выполняются до тех пор, пока мы не запланируем их с помощью await asyncio.gather(* tasks). Данный метод запускает все задачи из нашего списка и ожидает их завершения, прежде чем продолжить работу с остальной частью нашей программы. Наконец, мы используем asyncio.run(main ()) для запуска нашей «main» функции. Функция .run() - это точка входа в нашу программу, и обычно ее следует вызывать только один раз для каждого процесса.

Все что нам было нужно сделать. Теперь, запускаем нашу программу (исходник включает тот же метод измерения времени выполнения функции точ и пример thread)...

Writing "albuquerque fiddlehaus" to "./async/new_file1.txt"...
Writing "euroreggaebop" to "./async/new_file2.txt"...
Writing "shoedisco" to "./async/new_file0.txt"...
Writing "russiagaze" to "./async/new_file4.txt"...
Writing "alternative xylophone" to "./async/new_file3.txt"...
Time to complete asyncio read/writes: 0.71 seconds

... мы видим что скрипт выполнен даже быстрее чем в случае с threading. И в целом метод asyncio всегда будет немного быстрее, чем метод threading. Это потому, что, когда мы используем синтаксис «await», мы, по сути, говорим нашей программе «подожди, я сейчас вернусь», но наша программа отслеживает, сколько времени у нас уходит на то, чтобы закончить то, что мы делаем. Как только мы закончим, наша программа узнает об этом и возобновит работу, как только сможет. Многопоточность в Python допускает асинхронность, но наша программа теоретически может пропускать разные потоки, которые могут быть еще не готовы, затрачивая время если есть потоки, готовые продолжить работу.

Итак, когда стоит использовать threading, а когда - asyncio?

Когда вы пишете новый код, используйте asyncio. Если вам нужно взаимодействовать со старыми библиотеками или теми, которые не поддерживают asyncio, вам может быть лучше использовать threading.

# Тестирование asyncio

# Тестирование asyncio с pytest-asyncio

Оказывается, тестировать асинхронные функции с помощью pytest так же просто, как тестировать синхронные функции. Просто установите пакет pytest-asyncio (opens new window) с помощью pip, пометьте свои тесты ключевым словом async и примените декоратор, который сообщает pytest об асинхронности: @pytest.mark.asyncio. Давайте посмотрим на пример.

Для начала напишем произвольную асинхронную функцию в файл hello_asyncio.py:

import asyncio

async def say_hello(name: str):
    """ Задержать поток на 2 сек, затем вывести 'Hello, {{ name }}!' """
    try:
        if type(name) != str:
            raise TypeError('"name" должно быть string')
        if name == "":
            raise ValueError('"name" не может быть пустым')
    except (TypeError, ValueError):
        raise

    print('Sleeping...')
    await asyncio.sleep(2)
    print(f"Hello, {name}!")

Функция принимает единственный строковый аргумент: имя. Убедившись, что имя является строкой с длиной больше 1, наша функция асинхронно засыпает в течение двух секунд, а затем печатает «Hello, {name}!» к консоли.

Разница между asyncio.sleep() и time.sleep() в том что asyncio.sleep() является неблокирущей функцией.

Теперь давайте протестируем say_hello() с помощью pytest. В том же каталоге, что и hello_asyncio.py, создайте файл с именем test_hello_asyncio.py, затем откройте его в своем любимом текстовом редакторе.

import pytest # Note: pytest-asyncio не требует отдельного импорта
import asyncio

from hello_asyncio import say_hello

@pytest.mark.parametrize('name', [
    'Robert Paulson',
    'Seven of Nine',
    'x Æ a-12'
])
@pytest.mark.asyncio
async def test_say_hello(name):
    await say_hello(name)

Здесь:

  • Декоратор @pytest.mark.asyncio позволяет pytest работать асинхронно
  • В нашем тесте используется синтаксис async
  • Мы ждем await нашу асинхронную функцию, как если бы мы запускали ее вне теста.

Теперь давайте запустим наш тест с параметром -v:

...
collected 3 items

test_hello_asyncio.py::test_say_hello[Robert Paulson] PASSED    [ 33%]
test_hello_asyncio.py::test_say_hello[Seven of Nine] PASSED     [ 66%]
test_hello_asyncio.py::test_say_hello[x \xc6 a-12] PASSED       [100%]

Выглядит хорошо. Далее мы напишем пару тестов с плохим вводом. Вернувшись в test_hello_asyncio.py, давайте создадим класс под названием TestSayHelloThrowsExceptions:

    @pytest.mark.parametrize('name', [
        '',
    ])
    @pytest.mark.asyncio
    async def test_say_hello_value_error(self, name):
        with pytest.raises(ValueError):
            await say_hello(name)

    @pytest.mark.parametrize('name', [
        19,
        {'name', 'Diane'},
        []
    ])
    @pytest.mark.asyncio
    async def test_say_hello_type_error(self, name):
        with pytest.raises(TypeError):
            await say_hello(name)

Снова мы используем декоратор @pytest.mark.asyncio, используя async синтаксис для наших тестов, затем вызываем нашу функцию с await.

Запустим тесты снова:

pytest -v
...
collected 7 items

test_hello_asyncio.py::test_say_hello[Robert Paulson] PASSED                                    [ 14%]
test_hello_asyncio.py::test_say_hello[Seven of Nine] PASSED                                     [ 28%]
test_hello_asyncio.py::test_say_hello[x \xc6 a-12] PASSED                                       [ 42%]
test_hello_asyncio.py::TestSayHelloThrowsExceptions::test_say_hello_value_error[] PASSED        [ 57%]
test_hello_asyncio.py::TestSayHelloThrowsExceptions::test_say_hello_type_error[19] PASSED       [ 71%]
test_hello_asyncio.py::TestSayHelloThrowsExceptions::test_say_hello_type_error[name1] PASSED    [ 85%]
test_hello_asyncio.py::TestSayHelloThrowsExceptions::test_say_hello_type_error[name2] PASSED    [100%]

# Без использования pytest-asyncio

В качестве альтернативы pytest-asyncio, можно создать pytest фикстуру которая генерирует цикл событий asyncio:

@pytest.fixture
def event_loop():
    loop = asyncio.get_event_loop()
    yield loop

Затем вместо использования async/await синтаксиса, вы создаете ваши тесты как обычные синхронные:

@pytest.mark.parametrize('name', [
    'Robert Paulson',
    'Seven of Nine',
    'x Æ a-12'
])
def test_say_hello(event_loop, name):
    event_loop.run_until_complete(say_hello(name))

class TestSayHelloThrowsExceptions:
    @pytest.mark.parametrize('name', [
        '',
    ])
    def test_say_hello_value_error(self, event_loop, name):
        with pytest.raises(ValueError):
            event_loop.run_until_complete(say_hello(name))

    @pytest.mark.parametrize('name', [
        19,
        {'name', 'Diane'},
        []
    ])
    def test_say_hello_type_error(self, event_loop, name):
        with pytest.raises(TypeError):
            event_loop.run_until_complete(say_hello(name))

Если вам интересно и вы владеете английским, здесь можно почитать о продвинутых техниках тестирования asyncio (opens new window)

# Дальнейшее чтение (на английском)

Если вы хотите узнать больше о том, что отличает реализацию threading в Python от asyncio, вот отличная статья на Medium (opens new window).

Лучшие примеры и объяснения многопоточности в Python можно найти в видео Кори Шафера (opens new window), в котором содержится более подробное описание, в том числе с использованием библиотеки concurrent.futures.

Наконец, для глубокого погружения в сам asyncio, вот статья из Real Python (opens new window), полностью посвященная ему.

Бонус: еще одна библиотека, которая может вас заинтересовать, называется Unsync (opens new window), особенно если вы хотите легко преобразовать текущий синхронный код в асинхронный. Чтобы использовать его, вы устанавливаете библиотеку с помощью pip, импортируете ее с помощью from unsync import unsync, а затем декорируете любую текущую синхронную функцию с помощью @unsync, чтобы сделать ее асинхронной. Чтобы дождаться выполнения асинхронной функции и получить результат (что можно делать где угодно - оно не обязательно должно быть в функции async/unsync), просто вызовите .result () после вызова функции.

# Параллельное выполнение кода (parallelism)

Параллелизм во многом связан с конкурентностью. Фактически, параллелизм - это подмножество конкурентности: в то время как конкурентный процесс выполняет несколько задач одновременно, переключаясь между задачами, параллельный процесс физически выполняет несколько задач одновременно. Хорошим примером может быть вождение, прослушивание музыки и одновременное употребление сендвича, который мы сделали в предыдущем разделе.

Поскольку они не требуют больших усилий, вы можете выполнить их все сразу, не дожидаясь чего-либо или отвлекая свое внимание.

Теперь давайте посмотрим, как это реализовать в Python. Мы могли бы использовать библиотеку multiprocessing, но давайте вместо этого воспользуемся библиотекой concurrent.futures - это избавляет от необходимости вручную управлять количеством процессов. Поскольку основное преимущество многопроцессорности возникает, когда вы выполняете несколько задач с высокой нагрузкой на процессор, мы собираемся вычислить квадраты от 1 миллиона (1000000) до 1 миллиона и 16 (1000016).

Нам потребуется лишь один импорт concurrent.futures:

import concurrent.futures

if __name__ == "__main__":
    pow_list = [i for i in range(1000000, 1000016)]

    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = [executor.submit(pow, i, i) for i in pow_list]

    for f in concurrent.futures.as_completed(futures):
        print('okay')

Поскольку я разрабатываю на машине Windows, я использую if __name__ == "main". Это необходимо, потому что Windows не имеет системного вызова fork, присущего системам Unix. Поскольку Windows не имеет такой возможности, она запускает новый интерпретатор с каждым процессом, который пытается импортировать основной модуль. Если основной модуль не существует, он повторно запускает всю вашу программу, вызывая рекурсивный хаос.

Итак, посмотрим на нашу основную функцию, мы используем генератор списка (list comprehension) чтобы создать список от 1 миллиона до 1 миллиона и 16, мы открываем ProcessPoolExecutor с concurrent.futures, и мы используем генератор списка и ProcessPoolExecutor().Submit() чтобы запустить наши процессы и добавить их в список под названием «futures».

Мы также могли бы использовать ThreadPoolExecutor(), если бы хотели вместо этого использовать потоки - concurrent.futures универсален.

И здесь проявляется асинхронность: список «results» фактически не содержит результатов выполнения наших функций. Вместо этого он содержит «futures», которые похожи на идею «promises» в JavaScript. Чтобы наша программа продолжала работать, мы возвращаем эти futures, которые представляют собой заполнитель для значения. Если мы попытаемся вывести на печать future, в зависимости от того, завершена она или нет, мы получим либо состояние «pending» (ожидает), либо «finished» (завершено). По завершении мы можем получить возвращаемое значение (при условии, что оно есть) с помощью var.result(). В этом случае наш var будет "result".

Затем мы перебираем наш список futures, но вместо того, чтобы печатать наши значения, мы просто печатаем «okay». По причине того, насколько большими оказываются полученные результаты вычислений.

Как и раньше, я создал сценарий сравнения, который делает это синхронно, вы можете найти его на GitHub (opens new window).

Запустив нашу управляющую программу, которая также включает в себя функции для синхронизации нашей программы, мы получаем:

Starting...
okay
...
okay
Time to complete: 54.64

54,64 секунды - довольно много времени. Посмотрим, будет ли наша версия с многопроцессорностью лучше:

Starting...
okay
...
okay
Time to complete: 6.24

Наше время значительно сократилось и составило примерно в 1/9 от первоначального.

Что бы произошло, если бы вместо этого мы использовали потоки?

Я уверен, вы можете догадаться - это было бы не намного быстрее, чем при синхронном выполнении. Фактически, это может быть медленнее, потому что для запуска новых потоков все же требуется немного времени и ресурсов. Но не верьте мне на слово, вот что мы получаем, когда заменяем ProcessPoolExecutor() на ThreadPoolExecutor():

Starting...
okay
...
okay
Time to complete: 53.83

Как я упоминал ранее, многопоточность позволяет вашим приложениям сосредоточиться на новых задачах, пока другие ждут. В этом случае мы никогда не сидим сложа руки. С другой стороны, многопроцессорность запускает совершенно новые службы, обычно на отдельных ядрах ЦП, готовые делать все, что вы попросите, полностью в тандеме с тем, что еще делает ваш скрипт. Вот почему многопроцессорная версия, занимающая примерно 1/9 времени, имеет смысл - у меня 8 ядер в моем процессоре.

Теперь, когда мы поговорили о конкурентности и параллелизме в Python, мы можем, наконец, внести ясность в термины.

# Дальнейшее чтение (на английском)

В Real Python есть отличная статья о конкурентности и параллелизме (opens new window).

У Engineer Man есть хорошее видео (opens new window), в котором сравниваются потоки и многопроцессорность.

Кори Шафер также имеет хорошее видео о многопроцессорной (opens new window) обработке в том же духе, что и его видео с потоками.

Если вы посмотрите только одно видео, посмотрите превосходное выступление Раймонда Хеттингера (opens new window). Он великолепно объясняет разницу между многопроцессорностью, многопоточностью и асинхронностью.

# Комбинирование AsyncIO с Multipocessing

Что, если мне нужно совместить множество операций ввода-вывода с тяжелыми вычислениями?

Мы тоже можем это сделать. Допустим, вам нужно обработать данные со 100 веб-страниц (скрапинг) для нахождения определенного фрагмента информации, а затем вам нужно сохранить этот фрагмент информации в файле для дальнейшего использования. Мы можем разделить вычислительную мощность между ядрами нашего компьютера, заставив каждый процесс сканировать часть страниц.

Для этого скрипта давайте установим BeautifulSoup, чтобы облегчить процесс скрапинга наших страниц: pip install beautifulsoup4. На этот раз у нас действительно довольно много импорта.

Во-первых, мы собираемся создать асинхронную функцию, которая отправляет запрос в Википедию для возврата случайных страниц. Мы отсканируем каждую полученную страницу по ее заголовку с помощью BeautifulSoup, а затем добавим его в указанный файл; мы разделим каждый заголовок табуляцией. Функция примет два аргумента:

  1. num_pages - количество страниц для запроса и скрапинга страницы в поиске заголовка
  2. output_file - файл для добавления наших заголовков
import asyncio                 # Даст нам async/await
import aiohttp                 # Для асинхронного выполнения HTTP запросов
import aiofiles                # Дла асинхронного выполнения операций с файлами
import concurrent.futures      # Позволяет создать новый процесс
from multiprocessing import cpu_count # Вернет количество ядер процессора
from bs4 import BeautifulSoup  # Для скрапинга страниц
from math import floor         # Поможет разделить запросы между ядрами CPU

async def get_and_scrape_pages(num_pages: int, output_file: str):
    async with \
    aiohttp.ClientSession() as client, \
    aiofiles.open(output_file, "a+", encoding="utf-8") as f:

        for _ in range(num_pages):
            async with client.get(
                    'https://en.wikipedia.org/wiki/Special:Random'
                ) as response:
                if response.status > 399:
                    response.raise_for_status()

                page = await response.text()
                soup = BeautifulSoup(page, features="html.parser")
                title = soup.find("h1").text

                await f.write(title + "\t")

        await f.write("\n")

Мы асинхронно открываем aiohttp ClientSession и наш выходной файл. Режим "+" означает добавление в конец файла и создание его, если он еще не существует. Кодирование наших строк как utf-8 гарантирует, что мы не получим ошибки, если наши заголовки содержат unicode символы. Если мы получим ответ об ошибке, мы поднимем его вместо продолжения (при больших объемах запросов я получал код ответа 429 Too Many Requests). Мы асинхронно получаем текст из нашего ответа, затем мы анализируем заголовок и асинхронно и добавляем его в наш файл. После того, как мы добавим все наши заголовки, мы добавим новую строку: «\n».

Наша следующая функция - это функция, которую мы запускаем с каждым новым процессом, чтобы разрешить его асинхронный запуск:

def start_scraping(num_pages: int, output_file: str):
    asyncio.run(get_and_scrape_pages(num_pages, output_file))

Теперь о нашей main функции. Начнем с некоторых констант (и нашего объявления функции):

def main():
    NUM_PAGES = 100 # Суммарное количество страниц для скрапинга
    NUM_CORES = cpu_count() # Количество ядер CPU (влкючая логические)
    OUTPUT_FILE = "./wiki_titles.tsv" # Файл для добавления заголовков

    PAGES_PER_CORE = floor(NUM_PAGES / NUM_CORES)
    # Для нашего последнего ядра
    PAGES_FOR_FINAL_CORE = PAGES_PER_CORE + NUM_PAGES % PAGES_PER_CORE 

И теперь логика:

futures = [] # To store our futures

    with concurrent.futures.ProcessPoolExecutor(NUM_CORES) as executor:
        for i in range(NUM_CORES):
            new_future = executor.submit(
                start_scraping, # Function to perform
                # v Arguments v
                num_pages=PAGES_PER_CORE,
                output_file=OUTPUT_FILE,
            )
            futures.append(new_future)

        futures.append(
            executor.submit(
                start_scraping, PAGES_FOR_FINAL_CORE, OUTPUT_FILE
            )
        )

    concurrent.futures.wait(futures)

Мы создаем массив для хранения наших future, затем мы создаем ProcessPoolExecutor, устанавливая его max_workers равным нашему количеству ядер. Мы перебираем диапазон, равный нашему количеству ядер минус 1, запускаем новый процесс с нашей функцией start_scraping. Затем мы добавляем его в наш список futures. Нашему последнему ядру потенциально потребуется дополнительная работа, поскольку оно будет скрапить количество страниц то же что и другие ядра, но дополнительно выполняет скрапинг страниц, количество которых равно остатку, который мы получили при разделении нашего общего количества страниц, по нашему общему количеству ядер процессора.

Убедитесь, что вы действительно запустили main функцию:

if __name__ == "__main__":
    main()

После запуска программы на моем 8-ядерно процессоре:

Для asyncio вместе с multiprocessing:

Time to complete: 5.65 seconds.

Только mulitporcessing:

Time to complete: 8.87 seconds.

Только asyncio:

Time to complete: 47.92 seconds.

Полностью синхронный код:

Time to complete: 88.86 seconds.

На самом деле я очень удивлен, увидев, что улучшение asyncio с многопроцессорной обработкой по сравнению с простой многопроцессорной обработкой не было таким большим, как я думал.

# Резюме: когда использовать multiprocessing, asyncio или threading

  1. Используйте многопроцессорность (multiprocessing), когда вам нужно выполнить много тяжелых вычислений, и вы можете их разделить.
  2. Используйте asyncio или многопоточность(threading), когда вы выполняете операции ввода-вывода - общение с внешними ресурсами или чтение/запись из/в файлы.
  3. Многопроцессорность и asyncio можно использовать вместе, но хорошее практическое правило состоит в том, чтобы разветвлять процесс перед потоком/использованием asyncio, а не наоборот - потоки относительно дешевы по сравнению с процессами.

# Async/Await в других языках

async/await и аналогичный синтаксис также существует в других языках, и в некоторых из этих языков его реализация может сильно отличаться.

# JavaScript

Синтаксис async/await, впервые представленный в ES6, по сути, является абстракцией над Promise JavaScript (которые похожи на Futures Python). Однако в отличие от Python, пока вы не в ожидании, вы можете вызывать асинхронную функцию обычно без специальной функции, такой как asyncio.start() в Python:

// Объявляем функцию с async
async function returnHello(){
    return "hello world";
}

async function printSomething(){
    // ожидаем async строку
    const result = await returnHello();

    // выводим строку полученную асинхронно
    console.log(result);
}

// Запуск нашего асинхронного кода
printSomething();

# Java

Java не имеет встроенного синтаксиса async/await, но имеет возможности параллелизма с использованием модуля java.util.concurrent. Однако Electronic Arts написала библиотеку (opens new window) Async, которая позволяет использовать ожидание в качестве метода. Это не совсем то же самое, что Python/C#/JavaScript/Rust, но на него стоит обратить внимание, если вы разработчик Java и заинтересованы в такой функциональности.

# C++

Хотя C ++ также не имеет синтаксиса async/await, у него есть возможность использовать futures для одновременного выполнения кода с использованием модуля futures:

#include <iostream>
#include <string>

// Необходимо для futures
#include <future>

// Декларация async не требуется
std::string return_hello() {
    return "hello world";
}

int main ()
{
    // Объявление string future
    std::future<std::string> fut = std::async(return_hello);

    // Ожидает результат от future
    std::string result = fut.get();

    // Печатает строку полученную асинхронно
    std::cout << result << '\n';
}

Нет необходимости объявлять функцию с каким-либо ключевым словом, чтобы указать, может ли она и должна ли она выполняться асинхронно. Вместо этого вы объявляете первоначальный future всякий раз, когда он вам нужен, с помощью std::future<тип_возвращаемый_функцией> и устанавливаете его равным std::async(), включая имя функции, которую вы хотите выполнять асинхронно вместе с любые аргументы, которые он принимает - например, std::async(do_something, 1, 2, "string"). Чтобы дождаться значения future, используйте для него синтаксис .get().

Вы можете найти более подробную документацию по async в C++ (opens new window) на cplusplus.com.

# Заключение

Независимо от того, работаете ли вы с асинхронными сетевыми или файловыми операциями или выполняете множество сложных вычислений, есть несколько различных способов максимизировать эффективность вашего кода.

Если вы используете Python, вы можете использовать asyncio или threading, чтобы максимально использовать операции ввода-вывода, или модуль multiprocessing для кода, интенсивно использующего CPU.

Также помните, что модуль concurrent.futures может использоваться вместо threading или multiprocessing.

Если вы используете другой язык программирования, скорее всего, для него тоже есть реализация async/await.