Using Async SQLAlchemy Inside Sync Celery Tasks

March 23, 2026
6 min read
By Kevin
TL;DR

Using async SQLAlchemy inside sync Celery tasks is a common challenge: Celery is synchronous by nature, while async SQLAlchemy expects a running event loop. This post breaks down why it's tricky and how to do it right.

Introduction

Let me be honest. I built a beautiful, modern web application using FastAPI and async SQLAlchemy. Everything was blazing fast and non-blocking. Then I needed to handle background jobs, like sending emails, so I reached for Celery.

I tried to run my trusty async database queries inside a Celery task, and suddenly things got weird: errors about event loops, or tasks that simply hung. Why is this so hard?

If you’ve been there, you’re in the right place. Let’s break down why Celery and async SQLAlchemy don’t play nicely out of the box and, more importantly, how to make them work together without losing your sanity.


The Core Problem: Sync vs. Async Worlds

How Sync SQLAlchemy Works

In a synchronous environment, database operations are blocking. When you execute a query using standard SQLAlchemy:

  1. The driver sends a request to the database server via a TCP socket.
  2. The execution thread is blocked at the system level, waiting for data to be available on that socket.
  3. The CPU cannot perform any other work on that thread until the database response is fully received and parsed.

In Celery, this means a worker process or thread is completely occupied during the entire I/O wait. If you have many concurrent I/O-bound tasks, you quickly saturate your worker pool, leading to significant latency and resource waste.
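The blocking pattern above can be sketched in a few lines. This is a minimal, self-contained illustration using an in-memory SQLite database (the post assumes Postgres in production); the point is that the thread is stuck inside `execute()` for the full round trip:

```python
# A minimal sketch of blocking, synchronous SQLAlchemy.
# Uses in-memory SQLite so it runs standalone; production would use Postgres.
from sqlalchemy import create_engine, text

engine = create_engine("sqlite:///:memory:")

def get_answer():
    # connect() and execute() block the calling thread: it sits in a
    # socket read until the driver has parsed the full response.
    with engine.connect() as conn:
        result = conn.execute(text("SELECT 1 + 1"))
        return result.scalar_one()

print(get_answer())  # 2
```

While that thread waits, the Celery worker slot it occupies can do nothing else.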

What is Celery?

In one sentence: Celery is a distributed task queue that processes tasks synchronously (by default) using prefork (multiprocessing), threads, or greenlet pools like gevent.

By default, if you give Celery an async function, it won’t know what to do with it. It expects a regular, synchronous function.
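You can see why in plain Python, with no Celery involved: calling an `async def` from sync code doesn't run it, it just builds a coroutine object that nothing ever awaits.

```python
# Sketch: why handing a sync caller an async function fails.
# Calling an async def does NOT execute its body -- it returns a coroutine.
import asyncio

async def send_email(to):
    await asyncio.sleep(0)  # stand-in for real I/O
    return f"sent to {to}"

result = send_email("a@example.com")  # nothing actually ran
print(type(result).__name__)          # coroutine -- not str
result.close()                        # silence the "never awaited" warning
```

A Celery worker calling the task function would end up holding exactly this kind of un-awaited coroutine instead of a result.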


Async SQLAlchemy to the Rescue (But Wait...)

Async SQLAlchemy solves this by leveraging non-blocking I/O. Instead of blocking the thread, it yields control back to the event loop while waiting for the database response.

# This is the dream - a non-blocking database call
# (assumes a User model and a configured AsyncSession maker)
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

async def get_user(user_id):
    async with AsyncSession() as session:
        result = await session.execute(select(User).where(User.id == user_id))
        return result.scalar_one()

The issue? Celery is synchronous by nature. You can’t just put an await inside a Celery task function because Celery doesn’t run an event loop.

The Naïve Attempt (And Why It Fails)

You might think, "I’ll just use asyncio.run() to run my async code inside the sync Celery task!"

import asyncio
from celery import shared_task
 
@shared_task
def process_user_task(user_id):
    # This seems clever, but it's a trap!
    user = asyncio.run(get_user(user_id))
    return user

This works for one task, but it will eventually corrupt your database connection pool. Here’s why:

  1. asyncio.run() creates a brand new event loop to run your async function.
  2. Your async function opens a database connection from the pool.
  3. The function finishes, and asyncio.run() destroys the entire event loop.
  4. The database connection was tied to that destroyed loop. When SQLAlchemy tries to return that connection to the pool, the connection is essentially "dead" or stuck in a closed loop.
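Steps 1 and 3 are easy to demonstrate in isolation: every `asyncio.run()` call builds a brand-new event loop and destroys it on exit, so anything created under the first call is orphaned by the time the second call runs.

```python
# Sketch of the failure mechanism: each asyncio.run() call uses a fresh
# event loop and closes it afterwards.
import asyncio

loops = []

async def grab_loop():
    loops.append(asyncio.get_running_loop())

asyncio.run(grab_loop())
asyncio.run(grab_loop())

print(loops[0] is loops[1])  # False -- two different loops
print(loops[0].is_closed())  # True  -- the first loop is already destroyed
```

A pooled connection checked out under the first loop is in exactly the position of `loops[0]` here: still referenced by the pool, but bound to a loop that no longer exists.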

After a few tasks, your connection pool becomes corrupted, and you start seeing errors like:

sqlalchemy.exc.TimeoutError: QueuePool limit of size ... overflow ... reached

The Solution: No Pool + async_to_sync

To fix this, we need to change two things about how we handle database sessions for background tasks:

  1. Don't use a connection pool. Since each task will run in its own isolated event loop (created and destroyed), we should open a fresh connection for the task and close it when the task finishes. No pooling, no cross-loop contamination.
  2. Use async_to_sync to bridge the gap. This utility (from asgiref) runs async code from a sync context while managing the per-thread event loop lifecycle safely, instead of creating and destroying a loop on every call the way asyncio.run() does.

Step 1: Configure a "No Pool" Session

In your database configuration, create a specific session maker for your Celery tasks that uses NullPool. This ensures that connections are closed when the session is closed, not kept alive.

# db.py
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlalchemy.pool import NullPool
 
# Your regular async engine (for FastAPI, etc.)
# This likely uses a pool (like AsyncAdaptedQueuePool)
main_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    pool_size=20,
)
 
# Celery-specific engine with NO POOL
celery_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    poolclass=NullPool,  # <-- This is the key!
)
 
# Session makers
AsyncMainSession = async_sessionmaker(main_engine, expire_on_commit=False)
AsyncCelerySession = async_sessionmaker(celery_engine, expire_on_commit=False)

Step 2: Use async_to_sync from asgiref

Instead of creating a custom decorator, we can use the async_to_sync utility from the asgiref library. This is the industry-standard way to bridge sync and async code, handling the complexities of managing a per-thread event loop.

Step 3: Call Your Async Logic Directly

Write your core database logic as a standard async function. Then, in your sync Celery task, call it directly using async_to_sync. This keeps the Celery task minimal and makes the bridge explicit.

from asgiref.sync import async_to_sync
from celery import shared_task
from sqlalchemy import select

from db import AsyncCelerySession
from models import Article
 
 
async def _run():
    async with AsyncCelerySession() as session:
        # Perform non-blocking database queries
        result = await session.execute(
            select(Article).where(Article.status == "pending")
        )
        articles = result.scalars().all()
 
        return len(articles)
 
@shared_task(name="app.workers.run_article_pipeline", acks_late=True)
def run_article_pipeline_task():
    # Direct bridging without a decorator
    return async_to_sync(_run)()

How This Works Internally

Let's visualize the flow:

  1. Celery Worker starts. It loads the run_article_pipeline_task function as a standard synchronous function.
  2. Task Executes: The function body calls async_to_sync(_run)().
  3. Event Loop: async_to_sync manages the lifecycle of the event loop for the current thread and executes the _run coroutine.
  4. Database Connection: Inside _run, AsyncCelerySession() creates a new connection using NullPool. This avoids cross-loop connection contamination by opening a fresh TCP connection.
  5. The Wait: The event loop handles the "await" points, allowing for efficient I/O management even within the sync Celery worker.
  6. Cleanup: Once the async with block closes the session, NullPool ensures the socket is physically closed. async_to_sync then clears the loop state.

No dead connections, no pool corruption, and you get to use your beautiful async SQLAlchemy code.

A Note on Performance

Using NullPool and creating a new connection for every task adds a small overhead (TCP handshake, authentication). For background tasks that run infrequently (like once every few seconds), this is perfectly fine.

If you are running thousands of tasks per second, you might need a different architecture (like moving to a fully async task queue, or using gevent workers with a sync SQLAlchemy pool). But for 90% of use cases, this "No Pool + async_to_sync" pattern is the cleanest and most reliable way to reuse your async code in a sync Celery environment.


For more backend articles: backendprep.com/blog

To practice backend interview questions: backendprep.com

Ready to level up?

Join hundreds of engineers mastering high-stakes system design through real-world simulations.

Become a Senior Backend Engineer