Под Микроскопом — Celery и ThreadPoolExecutor

Alexander Podrabinovich
7 min readJun 12, 2021

--

В данном посте мы рассмотрим нюансы многопоточной работы внутри Celery задач. А так как рубрика наша называется “Под Микроскопом”, то разберем заодно как работает Celery Execution Pool и каких типов они бывают, а так же порассуждаем о потока и процессах. Заваривайте чаёк, пост будет длинным.

ThreadPoolExecutor — воспроизводим проблему

Итак, предположим, у нас стоит задача оптимизации работы задачи Celery. Предположим, что “бутылочное горлышко” нашей задачи — Input / Output операции, т.е. тратится основное время не на какие-то вычисления процессорные, а просто на ожидания ответа, скажем, от сервера с внешним ресурсом, при запросе к оному. Как такое оптимизировать? Правильно, первое, что приходит в голову, внедрить ThreadPoolExecutor, чтобы обрабатывать не один за другим запрос, а сразу 3–5–10 и т.д., насколько позволяет ваше железо. Что же, давайте реализуем на демо проекте.

Стартуем новый проект. Через pip устанавливаем celery, redis в качестве брокера и gevent для пула (подробнее об этом речь пойдет ниже). Если на вашей машине еще не стоит redis — то установите. Этот шаг я опущу. Так же я сэкономлю ваше время и приведу содержимое requirements.txt ниже для быстрой установки всего через pip install -r requirements.txt:

celery==5.0.2
redis==3.5.3
gevent==20.9.0

Создаем celeryconfig.py, где прописываем все, что надо для работы celery:

broker_url = 'redis://localhost:6379'task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Moscow'
enable_utc = True

Создаем tasks.py — тут и будем писать наш головной код. Начнем с малого. Создадим объект Celery, сконфигурируем его из прописанного файла с конфигом выше, добавим задачу на выполнение в расписание раз в 30 секунд, создадим саму задачу. Задача у нас будет простая — будем входить в бесконечный цикл и писать в файлик что-то раз в секунду. Не удивляйтесь такой странной задаче, она нужна для максимально наглядности тем нашей статьи.

import os
import threading
from datetime import timedelta
from time import sleep
from celery import Celery
app = Celery('tasks')
app.config_from_object('celeryconfig')
app.conf.beat_schedule = {
'add-every-30-seconds': {
'task': 'tasks.demo',
'schedule': timedelta(minutes=1),
},
}
def writer(arg):
pid = os.getpid()
while True:
f = open('beat.log', 'a')
f.write(f'pid #{pid}, thread #{threading.get_ident()} - {arg} - beat ...\n')
f.close()
sleep(1)
@app.task()
def demo():
my_list = [1, 2, 3, 4, 5]
for item in my_list:
writer(item)

Как видно, в задаче мы добавили список из пяти элементов, чтобы сэмулировать ситуацию, когда нам надо выполнить долгую задачу не для одного элемента, а для множества. В файлик мы пишем помимо дефолтного beat-а еще id текущего запущенного процесса и id потока. Запускаем и смотрим, как оно все работает. Для запуска в терминале стартуем непосредственно celery worker-а, а во втором терминале стартуем beat. Для worker-а команды будет выглядеть так: celery -A tasks worker -l INFO -P gevent. Для beat-а: celery -A tasks beat -l DEBUG. Если надо очистить очередь, то: celery -A tasks purge.

Дожидаемся, когда бит пошлет задачу на выполнение, дожидаемся, когда worker ее получит и начнет исполнять. Открываем beat.log и смотрим как каждую секунду там появляется нечто такое:

pid #5604, thread #2636447535688 - 1 - beat ...
pid #5604, thread #2636447535688 - 1 - beat ...
pid #5604, thread #2636447535688 - 1 - beat ...
pid #5604, thread #2636447535688 - 1 - beat ...
pid #5604, thread #2636447535688 - 1 - beat …

Как видим, у нас взялся наш первый объект из списка для обработки (единичка), все остальные ждут, пока обработка завершится. У нас используется один процесс и один поток. Завершим задачу через Ctrl+C в терминале с celery worker, этим действием мы убиваем процесс worker-а и его основной поток.

Наша основная задача — начать обработку объектов параллельно друг другу. Добавляем работу с ThreadPoolExecutor-ом. Для этого пишем импорт: import concurrent.futures. И меняем код задачи:

@app.task()
def demo():
my_list = [1, 2, 3, 4, 5]
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
executor.map(writer, my_list)

Тут мы говорим, что хотим запустить два параллельных потока, которые будут обрабатывать список с нашими объектами. Запускаем и смотрим, что происходит. Открыв файл beat.log мы видим следующую картину:

pid #14888, thread #2651248497480 - 1 - beat ...
pid #14888, thread #2651248497736 - 2 - beat ...
pid #14888, thread #2651248497480 - 1 - beat ...
pid #14888, thread #2651248497736 - 2 - beat ...
pid #14888, thread #2651248497480 - 1 - beat ...
pid #14888, thread #2651248497736 - 2 - beat ...
pid #14888, thread #2651248497480 - 1 - beat ...
pid #14888, thread #2651248497736 - 2 - beat …

Все как мы и хотели. В рамках процесса celery worker-а создалось два потока, которые начали обрабатывать первый и второй объект нашего списка. Отлично. Завершим процесс celery worker-а через Ctrl+C аналогично.

Что такое? Проблемы? Да, проблемы. Мы не можем этого сделать. Сколько бы мы не жали Ctrl+C, процесс не будет завершен, он будет “подвешен”. Мы можем наблюдать в терминале следующую картину:

worker: Hitting Ctrl+C again will terminate all running tasks!worker: Warm shutdown (MainProcess)worker: Cold shutdown (MainProcess)worker: Cold shutdown (MainProcess)

Я трижды нажал Ctrl+C, но ничего не происходит. Процесс как работал, так и продолжает работать, а строчки в beat.log как писались, так и продолжают писаться. И это не баг, это фича. Почему? Разберем ниже. А пока вам придется руками убить процесс Python, чтобы прекратить спам в файл.

Сейчас важно запомнить один момент: Python не будет завершать процесс, пока не закончил исполнение поток отличный от головного потока процесса. Именно поэтому в рамках нашей задачи, если бы мы прописали у задачи time_limit=N, где N — кол-во секунд, задача бы продолжала работать, процессы бы продолжали спамить в файл даже после выхода за максимально обозначенное время работы задачи в time_limit. Давайте разбираться в особенностях этого поведения.

Celery Execution Pools

Довольно не тривиальной для понимания темой является тема Celery Execution Pools. Вроде, всем понятно, что такое Celery, что такое Celery Worker и Celery Beat. Но вот что за Pool-ы? Что такое gevent, для чего писать -P gevent, почему можно не писать это в команде и почему иногда ничего не работает, если не указать pool?

Когда мы запускаем celery worker-а через терминал, как это делали выше, все, что мы делаем — запускаем supervisor процесс. Сам по себе Celery worker не обрабатывает никаких задач вообще. Зато данный процесс создает дочерние процессы (или потоки) , которые как раз и занимаются поставленными задачами. Так вот, эти самые дочерние процессы (или потоки) как раз и известны под именем execution pool.

Размер execution pool-а определяет количество задач, которое может обработать Celery worker процесс. Чем больше процессов (или потоков) worker может создать, тем больше задач он сможет обработать одновременно. Celery поддерживает работу с четырьмя типами Execution pool-ов:

  • prefork
  • solo
  • eventlet
  • Gevent

Имплементация prefork pool-а основывается на мультипроцессовый пакете Python. Данный тип pool-а позволяет Celery worker-у обойти Global Interpreter Lock и использовать в полную силу несколько процессоров на машине. Как нетрудно догадаться, данный pool логично использовать, когда задачи содержат сложные вычисления, привязанные к мощности CPU. Т.е. когда скорость выполнение задач может быть увеличена только за счет CPU. Количество ядер процессора физически лимитирует количество одновременно запущенных процессов. Если мы при запуске celery worker-а не укажем pool как это делали выше с gevent, то автоматически будет выбран prefork pool для работы. Это следует держать в уме всегда.

Solo pool — это особенный pool, потому что он не относится ни к процессорным, ни к потоковым pool-ам. Если уж говорить совсем точно, то это вовсе не pool, потому что всегда выполняется одна задача. Более того, Solo pool противоречит головному принципу Celery, что процесс Celery worker-а не выполняет сам никаких задач (говорили об этом выше). Solo pool исполняется внутри worker процесса. Solo pool очень быстр, но он полностью блокирует worker-а на время выполнения задачи. Для старта worker-а в режиме Solo pool-а необходимо написать параметр — pool=solo. Использовать solo pool целесообразно, когда необходимо выполнять некоторые высоконагружаемые CPU задачи в окружении микросервиса. В Docker Swarm или Kubernetes бывает намного легче управлять общим количество worker-ов, чем размером execution pool-ов.

Eventlet и gevent pool-ы — самые популярные типы execution pool-ов, когда речь заходит о многопоточности. Т.е. когда выполняемые задачи тратят основное время на Input / Output. В данных имплементациях pool-ы исполняются внутри процесса Celery worker-а. И, если быть точными, то данные pool-ы используют не потоки, а так называемые greenlet-ы. Greenlet — это green thread (зеленый поток), некое объединение потока и корутина, на выходе дающее по сути поток, но без использования стандартных потоков. Обычные потоки управляются ядром ОС, которое использует свой дефолтный планировщик для переключения между потоками. Вот этот самый планировщик может иногда быть не особо эффективным. Greenlet-ы же эмулируют полностью мультипоточность, но не полагаясь не на какие инструменты управления от ядра ОС. Их управление происходит непосредственно внутри приложения, а не в ядре ОС. Никаких планировщиков для переключения между greenlet-ами не существует, вместо этого они сами передают друг другу управление в соответствии с написанным вами кодом. Самым большим плюсом использования gevent или eventlet pool-ов — это то, что Celery worker может делать много больше работы, чем он мог раньше. Для того, чтобы использовать данные Pool-ы необходимо указать параметр — pool=gevent или — pool=eventlet. Не забываем также указывать количество количество потоков, которое мы разрешим нашему worker-у стартовать. Например: celery -A my_project worker — pool=gevent — concurrency=500 Теперь, зная разницу между всеми доступными execution pool-ами, можно выбирать, какой именно pool использоваться в зависимости от задач, которые предстоит решать Celery Worker-у. Есть лишь один финальный важный вопрос — сколько процессов/потоков разрешить стартовать? Для этого как раз используется выше использованный параметр — concurrency. Для prefork pool-а количество процессов не должно превышать количество физических ядер процессора. Если параметр — concurrency не указан в команде, то Celery по умолчанию установит его равным количеству ядер процессора, какой бы pool вы не выбрали. Поэтому, опускать данный параметр, когда речь идет о потоковых pool-ах — очень глупо, потому что потоки не имеют прямого отношения к ядрам процессора.

Убийство Python потока

Теперь вернемся еще раз к не совсем тривиальному поведению, при попытке убить процесс с запущенными дочерним потоком. На самом деле мы не смогли завершить наш процесс из-за того, что дочерний поток (потоки) продолжал работать (писать в файл). Python, при завершении процесса, запускает определенную логику, которая запускается непосредственно перед завершение процесса и все, что она делает — это ожидает завершения работы всех дочерних потоков (если только это не daemon потоки). Поэтому, когда мы попытались убить процесс (нажав Ctrl +C или по Hard time limit-у задачи), процесс получил нужный сигнал, был готов покончить с собой, но тут запустился механизм ожидания завершения работы всех дочерних потоков. Поэтому то мы и не смогли его завершить, не прибегая к уничтожению процесса средствами ядра ОС.

Следует запомнить одну вещь: поток невозможно убить. И это правильное поведение. Т.е. Python, как язык, не предоставляет никаких API для убийства потоков. Большинство опытных разработчиков в релевантных обсуждениях в интернете сходятся во мнении, что внезапное убийство потока является очень плохим паттерном как в Python, так и вообще в любом языке, потому что:

  • поток может работать с критически важными ресурсами, работа с которыми должна быть завершена корректно, а не внезапно;
  • поток может породить ряд дочерних потоков, которые тоже должны быть завершены.

Таким образом, возвращаясь к нашей задаче, мы либо не используем hard time limit-ы в Celery задаче, а обходимся какими-то путями через обновление задачи в процессе работы потока. Либо вообще переписываем код без использования ThreadPoolExecutor-а, а путем реализации большего количества worker-ов или большего размера execution pool-а.

--

--

Alexander Podrabinovich

Web developer with 16+ years of exp. Stack: Python, Django, Flask, FastAPI, Celery, Redis, PostgreSQL, Docker. webdevre@gmail.com https://github.com/UNREALre