Introduction to Celery
Celery is an open-source distributed task queue system built in Python. It is widely used in production for asynchronous task processing, background jobs, and periodic tasks. Celery supports multiple message brokers, including RabbitMQ and Redis (with SQL-based transports available as experimental options), making it flexible and easy to integrate into a wide range of projects.
At its core, Celery allows you to offload time-consuming tasks to worker processes, ensuring that your main application remains responsive and scalable. Whether you’re building a web app, an API, or a data pipeline, Celery can supercharge your application by handling tasks asynchronously.
Key Features and APIs
Basic Task Definition
A Celery task is simply a Python function decorated with the @app.task decorator. Here is an example:
from celery import Celery

# The backend stores task results so that result.get() works later on.
app = Celery('my_app',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y
With this task, you can asynchronously calculate the sum of two numbers.
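Keep in mind that tasks only execute once a worker process is running and consuming from the broker. Assuming the module above is saved as my_app.py, you can start one in a separate terminal:
celery -A my_app worker --loglevel=info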
Task Execution
To execute a task, use the delay() method:
result = add.delay(4, 6)
print(f'Task ID: {result.id}')
This immediately queues the task for execution and returns an AsyncResult object, which lets you track the task's state or fetch its result later.
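Under the hood, delay() is a convenience shortcut for apply_async(), which accepts additional execution options; countdown and expires below are standard Celery options:
# Equivalent call with explicit options: start no sooner than 10 seconds
# from now, and discard the task if it has not run within 5 minutes.
result = add.apply_async((4, 6), countdown=10, expires=300)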
Getting Task Results
Once the task completes, you can retrieve its result using the get() method (this requires a result backend, which we configured above via the backend argument):
if result.ready():
    print(f'Result: {result.get()}')  # Output: Result: 10
Or you can block until the task finishes. Note that wait() is a deprecated alias for get(), so prefer get(), ideally with a timeout:
result = add.delay(15, 25)
print(f'Waiting for task result: {result.get(timeout=10)}')  # Output: Waiting for task result: 40
Periodic Tasks (Scheduled Tasks)
Celery supports scheduling tasks at regular intervals using its built-in scheduler, celery beat. Here is an example of defining a periodic task:
from celery import Celery
from celery.schedules import crontab

app = Celery('my_app', broker='redis://localhost:6379/0')

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(
        crontab(minute=0, hour=0),  # executes every day at midnight
        send_report.s('Daily Summary Report'),
    )

@app.task
def send_report(report_name):
    print(f'Sending report: {report_name}')
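The same schedule can also be declared in configuration via the beat_schedule setting, which is a standard Celery option. A minimal sketch, assuming the module is named my_app (the entry name send-daily-report is just an illustrative label):
# Declarative equivalent of the add_periodic_task() call above.
app.conf.beat_schedule = {
    'send-daily-report': {
        'task': 'my_app.send_report',
        'schedule': crontab(minute=0, hour=0),
        'args': ('Daily Summary Report',),
    },
}
Either way, the scheduler process must run alongside your workers: celery -A my_app beat --loglevel=info publishes due tasks to the broker for workers to pick up.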
Task Retries
Celery allows automatic retries for failing tasks:
import requests
from celery import Celery

app = Celery('my_app', broker='redis://localhost:6379/0')

@app.task(bind=True, max_retries=3)
def fetch_data(self, url):
    try:
        response = requests.get(url)
        response.raise_for_status()  # raise an exception for HTTP error codes
        return response.json()
    except Exception as exc:
        raise self.retry(exc=exc, countdown=5)  # retry after a 5-second delay
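For simple cases, the try/except can be replaced with declarative retries. The autoretry_for, retry_backoff, and max_retries task options are standard Celery arguments; fetch_data_auto is just an illustrative name:
# Any requests.RequestException triggers an automatic retry,
# with exponential backoff between attempts.
@app.task(autoretry_for=(requests.RequestException,),
          retry_backoff=True, max_retries=3)
def fetch_data_auto(url):
    response = requests.get(url)
    response.raise_for_status()
    return response.json()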
Chord: Combining Multiple Tasks
Celery provides a chord primitive to execute a group of tasks in parallel and then pass their collected results to a callback (like result fetching, chords require a result backend):
from celery import group, chord

@app.task
def process_data_chunk(chunk_id):
    return f"Processed chunk {chunk_id}"

@app.task
def summarize_results(results):
    return f"Summary: {results}"

chunks = [process_data_chunk.s(i) for i in range(10)]
workflow = chord(group(chunks), summarize_results.s())
result = workflow.delay()
print(result.get())
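If you only need the parallel fan-out without a callback, a plain group works on its own; its result object gathers the individual return values. A minimal sketch reusing the chunks signatures from above:
# Run the chunks in parallel and collect all results as a list.
group_result = group(chunks).apply_async()
print(group_result.get())  # ['Processed chunk 0', ..., 'Processed chunk 9']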
Example: Real-World Application with Celery
Imagine you are building an image processing app where users can upload images to be processed (e.g., resizing, thumbnail generation, or filtering). Here’s how you could use Celery:
Task Definition
import os
from celery import Celery
from PIL import Image

# The backend lets the caller fetch the task's return value with get().
app = Celery('image_processor',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/0')

@app.task
def resize_image(image_path, width, height):
    with Image.open(image_path) as img:
        resized = img.resize((width, height))
        output_path = os.path.splitext(image_path)[0] + '_resized.jpg'
        resized.save(output_path)
    return f"Image saved to {output_path}"
Calling the Task
image_file = './uploaded_image.jpg'
result = resize_image.delay(image_file, 800, 600)
print(f"Task queued with ID: {result.id}")
print(f"Processing result: {result.get()}")
Scaling with Multiple Workers
To make the system scalable, you can run multiple Celery worker processes:
celery -A image_processor worker --loglevel=info --concurrency=4
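To keep image jobs from competing with other work, you can also route tasks to dedicated queues. task_routes is a standard Celery setting; the queue name images is an assumption for this example:
# Route resize tasks to their own queue (add to the app's configuration).
app.conf.task_routes = {
    'image_processor.resize_image': {'queue': 'images'},
}
A worker started with celery -A image_processor worker -Q images --concurrency=4 will then consume only from that queue.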
Conclusion
Celery is a powerhouse for asynchronous task processing, allowing modern applications to scale effectively. Whether you need to offload complex computations, manage background jobs, or schedule periodic tasks, Celery has you covered. Start integrating it into your Python projects today and keep your applications responsive under load.