
Building Small Async Workflows

There comes a point in most projects where BackgroundTasks stops being enough. The work needs to survive a restart. It needs to retry. It needs to run on a schedule. It needs to scale separately from the web servers. This page is about that next step - not a full Celery deep-dive, but enough to pick a sensible tool and wire up a small, honest workflow without blowing the project's complexity budget.

A short tour of options

            simpler  ─────────────────────────────►  more capable
            ────────────────────────────────────────────────────
            BackgroundTasks → Arq / RQ → Celery → Temporal / Airflow

There's a real ladder here. Each rung adds capability, but also more moving parts: more to install and more to run. Climb only as far as you need.

| Tool | Sweet spot | Skip if... |
|---|---|---|
| BackgroundTasks | Fire-and-forget, in-process, loss-tolerant | You need retries or restart-survival |
| Arq | Async, Redis-backed, simple | You want a huge ecosystem |
| RQ | Sync, Redis-backed, very small | You want async task code |
| Celery | The mainstream choice, many features | The complexity is overkill for your scale |
| Temporal / Airflow | Multi-step orchestration | You don't have multi-day workflows |

For FastAPI specifically, Arq tends to be the most natural fit - it's async, the task code looks like FastAPI handler code, and Redis is the only extra moving part.

A working Arq example

pip install arq

Tasks are async functions:

# worker.py
from arq.connections import RedisSettings

REDIS_SETTINGS = RedisSettings(host="localhost", port=6379)

async def send_welcome_email(ctx, to: str, name: str) -> None:
    # ctx is a dict - Arq passes shared resources here
    await ctx["mailer"].send(to=to, subject="Welcome", body=f"Hi {name}")

async def startup(ctx):
    # connect_mailer() is a placeholder for however you create your email client
    ctx["mailer"] = await connect_mailer()

async def shutdown(ctx):
    await ctx["mailer"].close()

class WorkerSettings:
    redis_settings = REDIS_SETTINGS
    functions = [send_welcome_email]
    on_startup = startup
    on_shutdown = shutdown

Run the worker as a separate process:

arq worker.WorkerSettings

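In the FastAPI app, open one connection pool at startup and enqueue jobs by name:

from contextlib import asynccontextmanager

from arq import create_pool
from arq.connections import RedisSettings
from fastapi import FastAPI

REDIS_SETTINGS = RedisSettings(host="localhost", port=6379)

@asynccontextmanager
async def lifespan(app: FastAPI):
    # One Redis pool for the whole app, opened at startup and closed at shutdown
    app.state.queue = await create_pool(REDIS_SETTINGS)
    yield
    await app.state.queue.close()

app = FastAPI(lifespan=lifespan)

@app.post("/signup")
async def signup(payload: UserCreate):  # UserCreate / create_user: your own model and logic
    user = create_user(payload)
    # Enqueue by name; the worker looks the name up in WorkerSettings.functions
    await app.state.queue.enqueue_job(
        "send_welcome_email",
        to=user.email,
        name=user.name,
    )
    return user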

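That's the whole loop. The route returns without waiting for the email, and the worker picks up the job. Arq doesn't remove a job from Redis until it has finished, so if a worker is killed mid-job, the job runs again, either on another worker or when this one restarts.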

What you gain from a real queue

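Things that were impossible with BackgroundTasks become straightforward:

  • Retries: raise arq's Retry exception from a job, e.g. raise Retry(defer=ctx["job_try"] * 5), and Arq re-queues it after that delay, up to max_tries attempts (5 by default). An ordinary exception marks the job as failed instead of retrying it.
  • Scheduled jobs: passing _defer_by=timedelta(hours=24) to enqueue_job runs the job a day later; _defer_until takes an exact datetime.
  • Cron-like recurring jobs: cron_jobs=[cron(send_digest, hour=9, minute=0)] in the worker settings.
  • Process isolation: A bad task kills its worker, not your web tier.
  • Independent scaling: Web pods serve requests, worker pods do work. Scale them differently based on what's hot.
  • Visibility: the Arq pool can list queued jobs (queued_jobs()) and stored results (all_job_results()), and the Job handle returned by enqueue_job reports its status and result.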
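Arq leaves the retry delay up to you: whatever you pass as Retry(defer=...) is how long the job waits before its next attempt. If you want exponential backoff with jitter instead of a fixed or linear delay, you can compute it from ctx["job_try"], Arq's 1-based attempt counter. A minimal sketch, where backoff_seconds and its base/cap defaults are hypothetical choices rather than anything Arq provides:

```python
import random


def backoff_seconds(job_try: int, base: float = 5.0, cap: float = 300.0) -> float:
    """Exponential backoff with "full jitter" for a 1-based attempt number.

    Attempt 1 waits up to `base` seconds, attempt 2 up to 2 * base,
    attempt 3 up to 4 * base, and so on, never more than `cap`.
    The random jitter spreads retries out so many failing jobs don't
    all hit a struggling service at the same moment.
    """
    ceiling = min(cap, base * 2 ** (job_try - 1))
    return random.uniform(0, ceiling)
```

Inside a job, a transient failure would then be handled with raise Retry(defer=backoff_seconds(ctx["job_try"])), and Arq stops retrying once max_tries is reached.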

The idempotency rule

The moment a job can retry, you have to think about idempotency: "is it safe for this to run twice?"

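An example that is unsafe but easy to fix:

import asyncio

import stripe

# Unsafe: if the job is retried, the customer is charged twice
async def charge_customer(ctx, customer_id: str, amount: int):
    # stripe-python is synchronous, so run the call in a thread
    await asyncio.to_thread(
        stripe.Charge.create, amount=amount, currency="usd", customer=customer_id
    )

# Safer: pass an idempotency key, generated once when the job is enqueued;
# Stripe returns the original result for a repeated key instead of charging again
async def charge_customer(ctx, customer_id: str, amount: int, charge_id: str):
    await asyncio.to_thread(
        stripe.Charge.create,
        amount=amount,
        currency="usd",
        customer=customer_id,
        idempotency_key=charge_id,
    )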

The same goes for sending notifications, awarding points, updating counters. If the task isn't naturally idempotent, give it a key the underlying system can dedupe on, or check "has this already been done?" at the start of the job.

A small worked workflow

Suppose a user uploads a document. The processing has three steps: extract text, summarize with an LLM, email the user with the summary.

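from datetime import datetime, timezone

# Assumes on_startup put a SQLAlchemy-style session factory in ctx["db_factory"]
# and a mailer in ctx["mailer"]; Document, User, extract_text, summarize_text
# and render are your own models and helpers.

async def process_document(ctx, document_id: int):
    db = ctx["db_factory"]()
    try:
        doc = db.get(Document, document_id)
        if doc.processed_at is None:                     # skip the expensive work on a retry
            text = extract_text(doc.storage_key)
            summary = await summarize_text(text)

            doc.text = text
            doc.summary = summary
            doc.processed_at = datetime.now(timezone.utc)
            db.commit()

        # Enqueue on every run, in case an earlier attempt died after commit().
        # Arq skips the enqueue while a job with this fixed _job_id is still
        # queued or its result is still stored, so the email isn't duplicated.
        # ctx["redis"] is the pool Arq itself puts in every job's ctx.
        await ctx["redis"].enqueue_job(
            "email_user",
            user_id=doc.user_id,
            document_id=doc.id,
            _job_id=f"email_user:{doc.id}",
        )
    finally:
        db.close()

async def email_user(ctx, user_id: int, document_id: int):
    db = ctx["db_factory"]()
    try:
        user = db.get(User, user_id)
        doc = db.get(Document, document_id)
        await ctx["mailer"].send(
            to=user.email,
            subject=f"Your document '{doc.original_name}' is ready",
            body=render("doc_ready.html", user=user, doc=doc),
        )
    finally:
        db.close()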

Points worth noting:

  • Each job re-fetches its own state. Jobs pass IDs between them, never objects: ORM objects don't serialize cleanly across processes, and the data may have changed by the time the next job runs.
  • process_document enqueues email_user. Workflows are just jobs that enqueue other jobs.
  • The processed_at check makes the job safe to retry: if it runs again after an earlier attempt has already committed, it skips the text extraction and the LLM call instead of paying for them twice.
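To see the "jobs that enqueue jobs" shape without Redis, here is a toy, in-process stand-in for a queue. It is not Arq and has none of its durability or retries; ToyQueue, its enqueue_job method and the job bodies are hypothetical, written only to show each job receiving plain IDs, looking up its own state, and enqueueing the next step by name.

```python
import asyncio
from collections import deque

# Pretend database table, keyed by document id
DOCUMENTS = {7: {"user_id": 1, "summary": None}}
SENT_EMAILS: list = []


class ToyQueue:
    """In-memory, non-durable stand-in for a job queue: runs async jobs by name, in order."""

    def __init__(self, functions):
        self.functions = {f.__name__: f for f in functions}
        self.pending = deque()
        self.history = []  # names of jobs in the order they ran

    async def enqueue_job(self, name: str, **kwargs) -> None:
        self.pending.append((name, kwargs))

    async def run_until_empty(self) -> None:
        while self.pending:
            name, kwargs = self.pending.popleft()
            self.history.append(name)
            # Jobs get a ctx dict plus plain keyword arguments: IDs, not objects
            await self.functions[name]({"queue": self}, **kwargs)


async def process_document(ctx, document_id: int) -> None:
    doc = DOCUMENTS[document_id]  # re-fetch state by id
    if doc["summary"] is None:  # skip the expensive step if an earlier run finished it
        doc["summary"] = "short summary"
    # The next step of the workflow is just another job
    await ctx["queue"].enqueue_job("email_user", user_id=doc["user_id"], document_id=document_id)


async def email_user(ctx, user_id: int, document_id: int) -> None:
    doc = DOCUMENTS[document_id]
    SENT_EMAILS.append((user_id, doc["summary"]))


def run_demo() -> list:
    async def main() -> list:
        queue = ToyQueue([process_document, email_user])
        await queue.enqueue_job("process_document", document_id=7)
        await queue.run_until_empty()
        return queue.history

    return asyncio.run(main())
```

Calling run_demo() runs process_document and then the email_user job it enqueued, in that order.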

Scheduling: the cron use case

A daily digest, a weekly cleanup, an hourly cache warm-up. Use the queue's built-in scheduler rather than inventing one with asyncio.sleep.

In Arq:

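from arq.cron import cron

# Same worker.py as above; send_digest and cleanup_temp_uploads are async
# job functions defined alongside send_welcome_email.
class WorkerSettings:
    redis_settings = REDIS_SETTINGS
    functions = [send_digest, cleanup_temp_uploads]
    # cron() takes the function itself (or a dotted import path), not a bare name
    cron_jobs = [
        cron(send_digest, hour=9, minute=0),
        cron(cleanup_temp_uploads, hour=3, minute=30),
    ]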

The worker becomes both an executor of one-off jobs and a tiny cron daemon. One process to deploy, one set of logs to look at.
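Arq's cron fields follow crontab-style matching: a field left as None matches every value, and second defaults to 0. That is why hour alone isn't enough: cron(send_digest, hour=9) would fire at every minute from 9:00 to 9:59. The functions below model that matching rule with the standard library so you can count how often a schedule fires per day; they are an illustration of the rule, not Arq's implementation.

```python
from datetime import datetime, timedelta
from typing import Optional


def matches(moment: datetime, hour: Optional[int] = None,
            minute: Optional[int] = None, second: int = 0) -> bool:
    """Crontab-style match: a field set to None matches any value."""
    return (
        (hour is None or moment.hour == hour)
        and (minute is None or moment.minute == minute)
        and moment.second == second
    )


def runs_per_day(**fields) -> int:
    """Count the seconds in one day at which a schedule with these fields would fire."""
    start = datetime(2024, 1, 1)
    return sum(
        matches(start + timedelta(seconds=s), **fields)
        for s in range(24 * 60 * 60)
    )
```

runs_per_day(hour=9, minute=0) gives 1, while runs_per_day(hour=9) gives 60.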

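If you'd rather not run Redis at all, APScheduler can be started from a FastAPI app and supports interval, cron and one-off schedules in-process. The catch: if it runs inside the web app, every worker process and replica starts its own scheduler, so each job fires once per process unless you run the scheduler in exactly one place.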

Observability is non-optional

Once you have async jobs, the most common new failure mode is "the job failed silently and nobody noticed." Two habits:

  • Log every job start and end with the job name, arguments, and elapsed time. Tie it back to the originating request id if you can.
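  • Wire failed jobs into your alerting. Arq doesn't export Prometheus metrics on its own, but each worker periodically writes a health-check record to Redis with running counts of completed, failed, retried and ongoing jobs (arq --check worker.WorkerSettings reads it). Feed those counts, or a failure counter of your own, into your metrics system and alert on a non-zero failure rate.

A minimal version of the first habit, written inline in a single job:

import logging
import time

log = logging.getLogger("jobs")

async def some_job(ctx, **kwargs):
    # "args" is a reserved LogRecord attribute, so the arguments are logged as "job_args"
    info = {"job": "some_job", "job_id": ctx["job_id"], "job_args": kwargs}
    start = time.monotonic()
    log.info("job start", extra=info)
    try:
        ...  # do the actual work here
    except Exception:
        log.exception("job failed", extra={**info, "elapsed_s": time.monotonic() - start})
        raise  # re-raise so Arq records the job as failed
    log.info("job done", extra={**info, "elapsed_s": time.monotonic() - start})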

A queue you can't see into is worse than no queue at all, because it gives a false sense of reliability.
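Rather than repeating a try/except in every job, the logging and a failure count can live in one decorator applied to each job function. This is a sketch with assumptions flagged: logged_job and JOB_FAILURES are hypothetical names, and the collections.Counter stands in for whatever failure metric you actually export (a Prometheus counter, for example). It reads job_id from the ctx dict Arq passes to jobs and tolerates it being absent.

```python
import asyncio
import functools
import logging
import time
from collections import Counter

log = logging.getLogger("jobs")

# Stand-in for a real metric: export this count and alert when it grows
JOB_FAILURES: Counter = Counter()


def logged_job(func):
    """Log start, finish (with elapsed time) and failure for an async job function."""

    @functools.wraps(func)  # keep the original name, which Arq registers the job under
    async def wrapper(ctx, *args, **kwargs):
        # "args" is reserved on LogRecord, so the job's arguments go under "job_args"
        info = {"job": func.__name__, "job_id": ctx.get("job_id"), "job_args": kwargs}
        start = time.monotonic()
        log.info("job start", extra=info)
        try:
            result = await func(ctx, *args, **kwargs)
        except Exception:
            JOB_FAILURES[func.__name__] += 1
            log.exception("job failed", extra={**info, "elapsed_s": time.monotonic() - start})
            raise  # let the queue see the failure
        log.info("job done", extra={**info, "elapsed_s": time.monotonic() - start})
        return result

    return wrapper


@logged_job
async def send_digest(ctx, user_id: int) -> str:
    # Hypothetical job body
    return f"digest sent to {user_id}"


def run_demo() -> str:
    # Call the wrapped job directly with a fake ctx, the way a worker would
    return asyncio.run(send_digest({"job_id": "demo-1"}, user_id=3))
```

Because functools.wraps preserves the function's name, the decorated send_digest can go into WorkerSettings.functions unchanged.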

When you've outgrown Arq

You probably haven't yet. But here are the signs:

  • You need workflows with conditional branches and human-in-the-loop steps → look at Temporal.
  • You need a broader ecosystem of integrations and tooling → look at Celery. You need tasks written in several languages to share one queue → look at language-neutral messaging such as NATS-based systems.
  • You're orchestrating data pipelines on schedules with dependencies → look at Airflow / Dagster / Prefect.

Reaching for any of those before you need them costs more time and complexity than the value they add. The general direction of travel for most apps is: nothing → BackgroundTasks → Arq → only-then-consider-something-bigger.

Pulling the section together

Across the six pages of this section we've built up a complete picture of how data and work enter and leave a FastAPI app:

  • Forms for the old reliable way of submitting structured fields.
  • File uploads for binary payloads, with the validation and storage habits that keep them safe.
  • Static file serving and downloads for handing those payloads back out.
  • Email and notifications for the most common out-of-band side effects.
  • BackgroundTasks for short, best-effort work the request shouldn't wait on.
  • Async workflows for the work that must survive, retry, and scale.

The thread tying it all together: a request should do as little as it can get away with, return as fast as it honestly can, and leave the slow or unreliable parts to the right tier of background machinery. That single principle is what separates an API that survives growth from one that buckles.
