Как запускать периодические задачи в Celery
Перевод статьи Antonio Di Mariano: How to run periodic tasks in Celery (opens new window).
Недавно я столкнулся с необходимостью реализовать серию задач (tasks), которые будут запускаться периодически для выполнения некоторых заданий. Такой сценарий хорошо подходит для crontab, но я решил попробовать Celery и использовать Celery Beat.
# Назначение и преимущества Celery
Прежде всего, давайте кратко рассмотрим, что такое Celery и в чем он хорош. Celery - это библиотека Python, используемая для обработки трудоемких задач и делегирования их отдельным процессам или распределенным сетевым узлам, чтобы снизить нагрузку на веб-серверы / серверные службы.
Большинство случаев использования показывают, что Celery идеально подходит для веб-проектов.
Я не веб-разработчик, и мой главный интерес - проектирование и разработка того, что обычно называют backend микросервисами. Celery - хорошее решение для запуска заданий по расписанию в микросервисной архитектуре.
Celery использует архитектуру опирающуюся на два основных компонента: Очереди (Queues) и Обработчики (Workers). Очередь (Queue) (opens new window) - это структура данных, куда наши задачи (tasks) отправляются и откуда они извлекаются обработчиками (workers), чтобы быть выполненными. Задача (task) - некоторый объем кода который мы поручаем выполнить Celery в определенное время или периодически, например, отправка электронного письма или создание отчета в конце месяца. Здесь следует отметить одну важную деталь: сама очередь не входит в состав библиотеки. Для этой цели должен использоваться внешний брокер сообщений, такой как Redis или RabbitMQ. Использование внешних брокеров сообщений в качестве очереди делает наше приложение надежным и способным управлять поставленными в очередь задачами даже после сбоя нашего приложения. Celery реализует обработчиков с помощью пула выполнения, поэтому количество задач, которые может выполнять каждый обработчики, зависит от количества процессов в пуле выполнения. Celery позволяет нам отслеживать, а также настраивать различные правила повторного запуска задач, которые не были выполнены по какой либо причине. В то же время Celery позволяет ограничить количество задач выполняемых одновременно. Мы можем использовать Celery для реализации:
- Периодических задач: задачи которые нам необходимо выполнить в определенное время или через заданный промежуток. Например создание ежемесячного отчета или периодической проверки ресурсов.
- 3-стороннее асинхронное взаимодействие: Web-приложение должно обслуживать пользователей, не дожидаясь завершения других действий во время загрузки страницы, таких как отправка e-mail, уведомлений, обновления внутренних компонентов (таких как A/B тестирование либо системное логирование). Здесь Celery выступает 3-й стороной позволяющей реализовать асинхронное взаимодействие между front-end и back-end
- Долгосрочные задачи: задачи, требующие больших затрат ресурсов, при которых пользователю приходится ожидать вычисления результата, например выполнение сложных рабочих процессов (DAG workflow), создание графических элементов, обслуживание мультимедийного контента (видео, аудио).
# Планировщик Celery Beat
Чтобы добиться асинхронного выполнения перечня запланированных задач, я буду использовать Celery Beat.
Celery Beat - это планировщик, который через регулярные промежутки времени объявляет задачи, которые будут выполняться рабочими узлами в кластере. Как сообщает официальный сайт:
По умолчанию записи берутся из настройки beat_schedule, но также можно использовать настраиваемые хранилища, например, для хранения записей в базе данных SQL. Вы должны убедиться, что одновременно работает только один планировщик, иначе у вас будут дублирующиеся задачи. Использование централизованного подхода означает, что расписание не нужно синхронизировать, и служба может работать без использования блокировок.
Хороший вопрос, который я задал себе перед тем, как пробовать Celery, - зачем мне использовать его? Почему не crontab?
Ну, во-первых, для того, чтобы использовать службу crond, вам нужно иметь права root'ом вашего nix-дистрибутива, а иногда данные права отсутствуют. Не говоря уже о том, что я никогда не был большим поклонником использования скрипта crontab в распределенном приложении. Более того, использование внешнего решения для управления задачами позволяет не беспокоиться о том, где будет развернуто приложение, если оно имеет доступ к Celery (каждая среда, которая может запускать Python, также может получить доступ к Celery)
Так что портативность - всегда хороший повод для выбора решения.
# Настройка Celery
Чтобы начать работу с Celery, просто следуйте пошаговым инструкциям в официальной документации (opens new window).
# Планировщик Celery
Чтобы создавать периодические задачи, нам нужно определить их с помощью параметра beat_scheduler. Celery Beat проверяет параметр beat_scheduler для управления задачами, которые необходимо выполнять по расписанию.
В моем примере я использую Redis в качестве брокера сообщений. Итак, первым делом нужно сообщить Celery, кто является его брокером сообщений.
Давайте создадим файл task.py
#tasks.py
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
Первый аргумент Celery - это имя текущего модуля, задачи в моем примере. Таким образом, имена могут быть автоматически сгенерированы. Второй аргумент - это ключевое слово брокера, указывающее URL-адрес брокера сообщений. Допустим, нам нужно проверять наличие заданного задания каждые 10 секунд.
#tasks.py
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def check():
print('I am checking your stuff')
app.conf.beat_schedule = {
'run-me-every-ten-seconds': {
'task': 'tasks.check',
'schedul': 10.0
}
}
Мы говорим beat_schedule запускать функцию check() каждые 10,0 секунд. Функция для запуска идентифицируется <имя модуля>.<имя функции>.
Если мы запустим это с помощью следующей команды:
celery -A tasks beat --loglevel=INFO
В консоли мы увидим несколько строк вывода, которые каждые 10 секунд будут запускать задание tasks.check.
celery beat v4.3.0 is starting.
__ — … __ — _
LocalTime -> 2019–10–07 11:52:35
Configuration ->
. broker -> redis://localhost:6379/0
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%INFO
. maxinterval -> 5.00 minutes (300s)
[2019–10–07 11:52:35,266: INFO/MainProcess] beat: Starting…
[2019–10–07 11:52:45,282: INFO/MainProcess] Scheduler: Sending due task run-me-every-ten-seconds (tasks.check)
[2019–10–07 11:52:55,271: INFO/MainProcess] Scheduler: Sending due task run-me-every-ten-seconds (tasks.check)
[2019–10–07 11:53:05,271: INFO/MainProcess] Scheduler: Sending due task run-me-every-ten-seconds (tasks.check)
[2019–10–07 11:53:15,271: INFO/MainProcess] Scheduler: Sending due task run-me-every-ten-seconds (tasks.check)
[2019–10–07 11:53:25,271: INFO/MainProcess] Scheduler: Sending due task run-me-every-ten-seconds (tasks.check)
tasks.check
запускается каждые 10 секунд, но где он на самом деле выполняется? Как было сказано ранее, Celery использует настроенный брокер сообщений для отправки и получения сообщений, в нашем примере Redis является брокером сообщений.
Celery beat отправляет задачу tasks.check
в очередь Redis каждые 10 секунд.
# Работа обработчика (worker)
Обработчики получают сообщения, извлекая их из очереди. В настоящий момент мы видим, что Celery Beat периодически добавляет задачи в очередь. Мы хотим, чтобы обработчики выполнили задачи.
В другом терминале запускаем нашего обработчика (worker):
celery -A task worker --loglevel=INFO
В консоли при этом мы увидим нечто подобное:
[2019–10–07 11:53:17,310: INFO/ForkPoolWorker-8] Task tasks.check[db70d065-b4f6–48f3–8b52–5187090da703] succeeded in 0.00230525199999998s: None
[2019–10–07 11:53:17,312: INFO/MainProcess] Received task: tasks.check[08059687-d0b4–4bab-b824–28f11487c393]
[2019–10–07 11:53:17,313: INFO/MainProcess] Received task: tasks.check[aa215cac-fca6–4d2e-8ccc-d827a2946280]
[2019–10–07 11:53:17,312: WARNING/ForkPoolWorker-2] I am checking your stuff
[2019–10–07 11:53:17,314: INFO/ForkPoolWorker-2] Task tasks.check[3cd58782-c592–49e1–8d6f-5955aa843583] succeeded in 0.0018000820000001472s: None
[2019–10–07 11:53:17,315: INFO/MainProcess] Received task: tasks.check[84b5bc2a-f0fc-4d47–9601-ed3651c8fb7c]
[2019–10–07 11:53:17,316: WARNING/ForkPoolWorker-4] I am checking your stuff
[2019–10–07 11:53:17,316: WARNING/ForkPoolWorker-5] I am checking your stuff
[2019–10–07 11:53:17,317: INFO/ForkPoolWorker-4] Task tasks.check[08059687-d0b4–4bab-b824–28f11487c393] succeeded in 0.002428212999999957s: None
[2019–10–07 11:53:17,318: INFO/ForkPoolWorker-5] Task tasks.check[aa215cac-fca6–4d2e-8ccc-d827a2946280] succeeded in 0.0025583710000001147s: None
Как видим, обработчик получил задачу и вскоре она выполнена. Это простейший пример, но в реальном приложении время выполнения задачи может быть совсем иным.
Celery позволяет нам иметь необходимую конфигурацию в папке. Давайте создадим папку «checker», следующей структуры:
- checker
- __init__.py
- celery.py
- celeryconfig.py
- tasks.py
Здесь мы видим, как различные параметры конфигурации можно разделить на отдельные файлы. celery.py
сообщит Celery, где найти файл конфигурации app.config_from_object
и файл содержащий код задач.
#celery.py
from celery import Celery
app = Celery('checker', include=['checker.tasks'])
app.config_from_object('checker.celeryconfig')
app.conf.beat_schedule = {
'run-me-every-ten-seconds': {
'task': 'checker.tasks.check',
'schedule': 10.0
}
}
# celeryconfig.py
broker_url = 'redis://localhost:6379/0'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Dublin'
enable_utc = True
#tasks.py
from checker.celery import app
@app.task
def check():
print('I am checking your stuff')
Итак запустим Celery Beat. Откройте терминал и перейдите на уровень выше только что созданной папки с именем checker.
Для запуска Celery Beat, мы вызываем имя настроенного нами модуля, в нашем случае checker
:
celery -A checker beat --loglevel=INFO
и в другом терминале запускаем обработчик:
celery -A checker worker --loglevel=INFO
Имейте в виду, что в окружении со значительным трафиком, таком как production, вероятно, было бы лучше запустить несколько обработчиков, чтобы обрабатывать несколько запросов одновременно.
Другой способ работы с запланированной задачей - использовать Crontab Schedules из Celery Вeat. Если нам нужно больше контроля над задачей для выполнения, мы можем использовать:
from celery import Celery
from celery.schedules import crontab
app = Celery('tasks', broker='redis://localhost:6379/0')
app.conf.beat_schedule = {
# Executes every Monday morning at 7:30 a.m.
'add-every-monday-morning': {
'task': 'tasks.add',
'schedule': crontab(hour=7, minute=30, day_of_week=1),
'args': (16, 16),
},
}
Предположим у нас есть список пользователей, которым мы хотим ежедневно отправлять сообщения с задачами которые необходимо выполнить.
Для начала изменим celery.py
:
# celery.py
from celery import Celery
from celery.schedules import crontab
app = Celery('checker', include=['checker.tasks'])
app.config_from_object('checker.celeryconfig')
app.conf.beat_schedule = {
'everyday-task': {
'task': 'checker.tasks.remember_tasks_to_do',
'schedule': crontab(hour=7, minute=0)
}
}
Здесь мы указываем Celery запускать checker.tasks.remember_tasks_to_do
каждый день в 7:00. Измененный tasks.py:
#tasks.py
from checker.celery import app
import json
@app.task
def remember_tasks_to_do():
file_object = open('employees.json', 'r')
# Load JSON file data to a list of python dict object.
employees_list = json.load(file_object)
for employee in employees_list:
print(f'Good morning {employee.get("username")}.' +
f'This is your list of tasks to do {employee.get("tasks")}')
Исходный код может быть найден на github (opens new window)