Celery e Tarefas assíncronas em Python

O Celery é um aplicação, escrita em Python, que permite que você delegue tarefas a diversos workers, localizados em outros servidores.

O Celery é uma forma bastante de realizar estes jobs ou tarefas de forma assíncrona, sem ter de quebrar muito a cabeça com conceitos complicados e difíceis de serem acertados.

Primeiramente, vou explicar como o Celery funciona, em termos de arquitetura.

Temos alguns componentes principais envolvidos:

  1. Aplicação - este carinha aqui é a sua aplicação normal, a qual o usuário ou outra máquina interage;
  2. Broker - este componente é uma fila de mensagens, utilizada para transportar mensagens entre os processos que estamos descrevendo aqui. O Celery permite o uso de diversos brokers, como redis e RabbitMQ. Usamos o redis por aqui, mas é uma preferência;
  3. Worker - este é o componente que trabalha de verdade. Qualquer tarefa agendada pela aplicação no broker, será executada pelo worker;
  4. Beat - é um componente opcional, mas funciona como um agendador de tarefas, que dispara de tempos em tempos as tarefas para o broker, como uma aplicação;

Com esta arquitetura, é bastante fácil escalar a coisa toda para funcionar em várias máquinas, bastando adicionar mais workers conforme a frequência das tarefas aumentam. O Celery é muito customizável e tem opções importantes, como controle de filas (algumas tarefas só podem ser processadas uma a uma, em uma única fila, sem concorrência alguma, enquanto outras podem ser paralelizadas), aplicaçoes de monitoramento (como o Flower) e extensões de biblioteca para os desenvolvedores (como o Jobtastic).

Vamos hoje dar um exemplo bastante simples, integrado a um projeto Django. Não vamos entrar em muitos detalhes de como subir os workers ou o broker, pois vai depender muito do seu ambiente.

Imagine que sua aplicação manda emails. Quase todas as aplicações (web) mandam emails, mas algumas mandam mais emails que outras e com maior frequência. Entre o ponto em que uma requisição feita pelo usuário é processada e uma resposta é retorna, vários segundos podem se passar e não queremos deixar o usuário esperando. Então decidimos delegar o envio do email, que é mais demorado, para uma tarefa assícrona no Celery.

Como fazer isto?

Bem, primeiramente é necessário configurar sua aplicação Celery. Fazemos (aqui na SIGMA) por meio de um arquivo celery.py, localizado junto wsgi.py do projeto. Ele é mais ou menos assim:

# coding: utf-8
"""Configuração inicial do Celery"""
from __future__ import absolute_import
import os
from django.conf import settings

if not "DJANGO_SETTINGS_MODULE" in os.environ:
    os.environ["DJANGO_SETTINGS_MODULE"] = "settings.local"  # aqui na SIGMA usamos settings separados para cada ambiente

from celery import Celery
app = Celery('nome-da-app')

app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

Com esta configuração batuta, podemos iniciar um worker, por exemplo. Além disso, usamos o autodiscover do Celery, que percorrerá todas as INSTALLED_APPS e cadastrará todas as tarefas dentro dos módulos tasks.py.

Ok, com o Celery configurado, podemos startar um worker.

celery -A projeto worker -l info

Agora, vamos criar nossa task e chamar ela de dentro de uma view qualquer. Crie um arquivo chamado tasks.pyem uma de suas apps, por exemplo, sistema.

Para definir a task, utilizaremos um decorador:

from projeto.celery import app  # lembre do nosso arquivo celery.py?


@app.task()
 def email_async(email, assunto, template, contexto):
    template_html = get_template(template)
    template_txt = get_template(template.replace('.html', '.txt'))
    if isinstance(contexto, dict):
        contexto = Context(contexto)

    contexto['email_header_image'] = get_random_image()
    html = template_html.render(contexto)
    txt = template_txt.render(contexto)
    if isinstance(email, basestring):
        email =[email]

    send_mail(assunto,
              txt,
              'remetente',
              recipient_list=email,
              html_message=html)

Com a tarefa definida, podemos invocá-la de forma assícrona. O Celery tem muitas formas de se invocar uma tarefa. Existe um negócio chamado canvas que permite você combinar as tarefas de forma assícrona, paralelas, etc. Confira a documentação do canvas.

Vá em uma de suas views que enviam emails:

from .tasks import email_async

def view_qualquer(request):

    # faça alguma coisa útil

    email_async.delay('novo email', 'sistema/email.html', {'variavel_contexo_a': 'foo', 'b': 'novo email maroto'})

O segredo da chamada assícrona está no método delay, que irá usar o broker para agendar a tarefa e futuramente será executada por um worker.

É um resumo muito curto para mostrar todas as capacidades do Celery, mas é um projeto muito bem mantido, utilizado por muita gente.

O exemplo que demos, apesar de parecer bobo, é bastante significativo quando estamos falando de uma aplicação que envia centenas ou milhares de emails por dia, diminuindo muito o impacto nos servidores de aplicação e permitindo que você lide com uma carga bem mais alta.

Iremos fazer uma pequena série sobre o Celery. Tem jeito de consultar o status da tarefa, agendar tarefas periódicas, controlar exceções e um monte de coisa legal.

Fique ligado e obrigado!