Blog Post Header Image

> Schedule background tasks in FastAPI with Python

Wie man die Scheduler-Library mit FastAPI integriert

10 min read
Last updated:

In diesem Blogpost zeigen wir, wie man unsere Open-Source Scheduler-Library in eine FastAPI-Anwendung integriert. Wir demonstrieren diese Integration im Kontext einer hypothetischen SaaS-Anwendung namens reminder_ai - einer Plattform fĂĽr geplante AI-gesteuerte WhatsApp-Nachrichten.

Ein weiterer Scheduler fĂĽr Python?

2021 begannen wir mit der Entwicklung einer Scheduler-Library, da uns die Benutzerfreundlichkeit bestehender Libraries nicht ausreichte. Wir suchten ein Tool, das einfach, flexibel und mit unseren Best Practices ĂĽbereinstimmte. Die Scheduler-Library ist das Ergebnis dieser BemĂĽhungen und hat sich seitdem als wertvoller Bestandteil unseres Toolsets erwiesen.

Wir haben die Scheduler-Library in verschiedenen Projekten genutzt, darunter für das Planen von Aufgaben für eingehende Webhook-Events von der Stripe API, das tägliche Aggregieren und Aktualisieren von Daten und das Management von Datensammlungen für maschinelles Lernen.

Ein praktischer FastAPI Use Case: Reminder AI

Stellen wir uns eine SaaS-Plattform namens Reminder AI vor, die Benutzern hilft, AI-gesteuerte Erinnerungsnachrichten für WhatsApp-Gruppen und Kontakte zu planen. Jeder Benutzer hat eine tägliche Kontingentbegrenzung, um die Kosten für ChatGPT zu kontrollieren. In diesem Szenario wird die Plattform genutzt, um Erinnerungsnachrichten für ein bevorstehendes Ereignis zu senden. Wir behandeln folgende Funktionen:

  • Planen eines Health-Checks der Plattform
  • Planen der täglichen KontingentzurĂĽcksetzung
  • Planen von wöchentlichen Erinnerungsnachrichten fĂĽr eine WhatsApp-Gruppe
  • Implementierung einer „Remind me“-Funktion, die es Benutzern ermöglicht, eine Einzelnachricht zu einem bestimmten Zeitpunkt zu planen

Dependencies installieren

Wir gehen davon aus, dass du uv auf deinem System installiert hast, um Environments und Pakete zu verwalten. Du kannst aber auch pip oder einen anderen Paketmanager verwenden.

uv init reminder_ai_project
cd reminder_ai_project
uv add fastapi scheduler uvicorn
uv pip install -e .

Du solltest nun ein Verzeichnis mit folgenden Dateien haben:

tree --gitignore
.
├── main.py
├── pyproject.toml
├── README.md
└── uv.lock

Aufbau der FastAPI-Anwendung

Erstellen wir nun die komplette Dateistruktur, die für die FastAPI-Anwendung erforderlich ist. Beginne mit dem Erstellen der Ordner reminder_ai, api und v1 und füge die restlichen (zunächst leeren) Dateien hinzu:

tree --gitignore
.
├── pyproject.toml
├── README.md
├── reminder_ai
│   ├── api
│   │   ├── __init__.py
│   │   └── v1
│   │       ├── __init__.py
│   │       ├── reminders.py
│   │       └── status.py
│   ├── __init__.py
│   ├── __main__.py
│   ├── main.py
│   ├── py.typed
│   └── scheduler.py
└── uv.lock

Beginnen wir mit der Definition des Entrypoints unserer Anwendung. Hier starten wir die FastAPI-App mit dem ASGI Web-Server uvicorn:

# reminder_ai/__main__.py
import uvicorn


def main() -> None:
    uvicorn.run(
        "reminder_ai.main:app",
        host="127.0.0.1",
        port=8000,
        log_level="info",
        reload=True,
    )


if __name__ == "__main__":
    main()

Damit uvicorn den Entrypoint reminder_ai.main:app finden kann, mĂĽssen wir ihn in main.py einrichten. FĂĽge folgende Imports hinzu:

# reminder_ai/main.py
import asyncio
import datetime as dt
import random
import signal
from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator

from fastapi import FastAPI

from reminder_ai.api import router_api
from reminder_ai.scheduler import UTC, Scheduler

Wir definieren nachher noch router_api, UTC und Scheduler. Zunächst aber implementieren wir eine Callback-Funktion, die genutzt wird, um den Scheduler zu stoppen, wenn die FastAPI-Anwendung beendet wird:

# reminder_ai/main.py
def stop_scheduler(*args: Any) -> None:
    Scheduler.stop_scheduler()

Nun erstellen wir die asynchronen Funktionen fĂĽr health check und KontingentzurĂĽcksetzung. Die folgende Implementierung simuliert lediglich etwas Arbeit durch einen delay, um reale Workloads abzubilden:

# reminder_ai/main.py
async def fake_health_check() -> None:
    """Simuliert eine Gesundheitsüberprüfung mit zufälligem Erfolg/Fehler."""
    await asyncio.sleep(0.1)  # simuliert etwas Arbeit
    status = "ok" if random.random() < 0.8 else "bad"
    print(f"[{dt.datetime.now()}] Plattform-GesundheitsĂĽberprĂĽfung: {status}")


async def fake_reset_daily_token_quota() -> None:
    """Simuliert eine tägliche Token-Kontingentzurücksetzung."""
    print(f"[{dt.datetime.now()}] Starte die tägliche Token-Kontingentzurücksetzung für alle Benutzer")
    await asyncio.sleep(0.5)  # simuliert etwas Arbeit
    print(f"[{dt.datetime.now()}] Erfolgreich zurĂĽckgesetzt")

Die genannten Funktionen können nun innerhalb des Lifespans von FastAPI geplant werden.

# reminder_ai/main.py
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    Scheduler.start_scheduler()
    signal.signal(signal.SIGINT, stop_scheduler)

    Scheduler.schedule.cyclic(dt.timedelta(seconds=30), fake_health_check)
    Scheduler.schedule.daily(
        dt.time(hour=0, minute=0, second=0, tzinfo=UTC), fake_reset_daily_token_quota
    )
    yield


app = FastAPI(
    title="Reminder AI",
    lifespan=lifespan,
)

app.include_router(router=router_api, prefix="/api")

Mit signal.signal haben wir nun die stop_scheduler-Funktion registriert, um auf den IPC-Signal SIGINT zu reagieren. Dies unterbricht die AusfĂĽhrung geplanter Jobs, z.B. bei DrĂĽcken von <STRG> + <C>.

Wir nutzen Scheduler.start_scheduler, wobei Scheduler ein Singleton ist, das die tatsächliche scheduler.asyncio.Scheduler-Instanz enthält, die wir schedule genannt haben, um die Jobverarbeitung zu starten.

Die Funktion Scheduler.schedule.cyclic ermöglicht es, eine Aufgabe wiederholt in einem bestimmten Zeitintervall auszuführen, z.B. alle 30 Sekunden. In unserem Beispiel wird sie für den simulierten Health Check genutzt, um sicherzustellen, dass das System weiterhin reagiert. Scheduler.schedule.daily plant eine tägliche Aufgabe, zu einer bestimmten Uhrzeit, z.B. Mitternacht, was ideal für die Zurücksetzung der täglichen Kontingente ist.

Schauen wir uns nun an, wie wir das Scheduler-Singleton in scheduler.py implementieren können:

# reminder_ai/scheduler.py
import asyncio
import datetime as dt

from scheduler.asyncio import Scheduler as AioScheduler

UTC = dt.timezone.utc


class Scheduler:
    is_running_event = asyncio.Event()
    schedule: AioScheduler

    @classmethod
    def start_scheduler(cls) -> None:
        cls.schedule = AioScheduler(tzinfo=UTC)
        cls.is_running_event.set()

    @classmethod
    def stop_scheduler(cls) -> None:
        cls.is_running_event.clear()

    @classmethod
    def is_running(cls) -> bool:
        return cls.is_running_event.is_set()

Beachte die is_running-Methode, die eine sichere Möglichkeit bietet, den Zustand des Schedulers über ein asyncio.Event zu verfolgen. Dies kann in asynchronen Schleifen genutzt werden, um die Ausführung zu kontrollieren und eine gesicherte Beendigung zu ermöglichen.

Definition der Reminders-API

Erledigen wir die erforderlichen Imports und definieren wir die reminders-Route:

# reminder_ai/api/v1/reminders.py
import asyncio
import datetime as dt
from uuid import UUID

from fastapi import APIRouter
from fastapi.responses import PlainTextResponse
from scheduler.trigger import weekday as weekday_factory
from pydantic import BaseModel

from reminder_ai.scheduler import UTC, Scheduler

ROUTE_REMINDERS = "reminders"

router_reminders = APIRouter(tags=[ROUTE_REMINDERS])

Wir können nun die fake_remind-Funktion definieren, die simuliert, wie Erinnerungen an Benutzer und Gruppen generiert und gesendet werden:

# reminder_ai/api/v1/reminders.py
async def fake_remind(
    *,
    user_id: UUID | None = None,
    group_id: UUID | None = None,
    prompt: str,
) -> None:
    t_generate = "[{entity}] Generate reminder message with prompt: {prompt}..."
    t_remind = "[{entity}] Sending reminder message via WhatsApp API"

    match user_id, group_id:
        case UUID() as uuid, None:
            print(t_generate.format(entity=f"User {uuid}", prompt=prompt[:20]))
            await asyncio.sleep(4)  # simulate request to llm api
            print(t_remind.format(entity=f"User {uuid}"))
        case None, UUID() as uuid:
            print(t_generate.format(entity=f"Group {uuid}", prompt=prompt[:20]))
            await asyncio.sleep(4)  # simulate request to llm api
            print(t_remind.format(entity=f"Group {uuid}"))
        case _:
            raise ValueError("Exactly one of user_id or group_id is required.")

Hier nutzen wir das Pattern-Matching Feature von Python, um sowohl Benutzer- als auch Gruppen-Erinnerungen zu behandeln. Wir können nun den Endpunkt /users/{user_id} einfach definieren:

# reminder_ai/api/v1/reminders.py
class V1RemindersUsers_Post_Body(BaseModel):
    prompt: str
    schedule_time: dt.datetime | None = None  # None means now


@router_reminders.post("/users/{user_id}")
async def _(*, user_id: UUID, body: V1RemindersUsers_Post_Body) -> PlainTextResponse:
    Scheduler.schedule.once(
        body.schedule_time or dt.timedelta(),
        fake_remind,
        kwargs={"user_id": user_id, "prompt": body.prompt},
    )
    return PlainTextResponse(content="OK")

Scheduler.schedule.once kann genutzt werden, um eine Aufgabe zu planen, die entweder zu einem bestimmten Zeitpunkt oder nach einer bestimmten Verzögerung einmalig ausgeführt wird. Ein timedelta von Null bedeutet, dass die Aufgabe so bald wie möglich vom asyncio-Event-Loop ausgeführt wird. Zudem nutzen wir den kwargs-Parameter, um die Argumente user_id und prompt an die fake_remind-Funktion weiterzugeben.

Ebenso können wir einen maßgeschneiderten Endpunkt definieren, der es ermöglicht, Erinnerungsnachrichten an eine Zielgruppe wöchentlich zu senden:

# reminder_ai/api/v1/reminders.py
class V1RemindersGroups_Post_Body(BaseModel):
    prompt: str
    weekday: int  # 0: Montag, ..., 6: Sonntag
    n_weeks: int


@router_reminders.post("/groups/{group_id}")
async def _(
    *,
    group_id: UUID,
    body: V1RemindersGroups_Post_Body,
) -> PlainTextResponse:
    day_and_time = weekday_factory(body.weekday, dt.time(hour=9, minute=0, tzinfo=UTC))
    Scheduler.schedule.weekly(
        day_and_time,
        fake_remind,
        kwargs={"group_id": group_id, "prompt": body.prompt},
        max_attempts=body.n_weeks,
    )
    return PlainTextResponse(content="OK")

Hier nutzen wir die scheduler.trigger.weekday-Funktion, um den Tag und die Uhrzeit der Woche zu definieren, an der die Erinnerung gesendet werden soll. Der max_attempts-Parameter begrenzt die maximale Anzahl an Erinnerungen, die diese Aufgabe senden kann.

Definition der Status-API

Das Scheduler-Objekt enthält einen Zustand, der aus einer Liste aktiver Jobs besteht. Die eingebaute Funktion str kann genutzt werden, um eine menschenlesbare Tabellenform des Scheduler-Zustands zu erhalten. Dies ermöglicht uns, eine Status-API einfach zu implementieren:

from fastapi import APIRouter
from fastapi.responses import PlainTextResponse

from reminder_ai.scheduler import Scheduler

ROUTE_STATUS = "status"
router_status = APIRouter(tags=[ROUTE_STATUS])


@router_status.get("/scheduler")
async def _() -> PlainTextResponse:
    return PlainTextResponse(content=str(Scheduler.schedule), status_code=200)

Registriere FastAPI-Routen

FastAPI bietet ein einfaches hierarchisches Router-System, um die Endpunkte zu registrieren, die wir in den vorherigen Abschnitten definiert haben:

# reminder_ai/api/v1/__init__.py
from fastapi import APIRouter

from reminder_ai.api.v1.reminders import ROUTE_REMINDERS, router_reminders
from reminder_ai.api.v1.status import ROUTE_STATUS, router_status

router_v1 = APIRouter()
router_v1.include_router(router_reminders, prefix=f"/{ROUTE_REMINDERS}")
router_v1.include_router(router_status, prefix=f"/{ROUTE_STATUS}")

Die Anwendung in Aktion

Wir haben nun eine Anwendung mit mehreren Endpunkten implementiert, die mit dem Scheduler interagieren. Die Anwendung kann lokal ausgefĂĽhrt und ĂĽber HTTP-Anfragen genutzt werden. Um die Anwendung zu starten, fĂĽhre folgenden Befehl aus:

uv run python -m reminder_ai

Standardmäßig stellt FastAPI ein Swagger UI unter http://127.0.0.1:8000/docs bereit, die eine interaktive Oberfläche für das Testen von API-Endpunkten bietet.

FastAPI Swagger UI

Beginnen wir damit, zu sehen, was der /status-Endpunkt macht und ob der Scheduler wie erwartet läuft. Klicke dazu auf das Accordion mit der Bezeichnung /api/v1/status/scheduler, finde und klicke auf den Try it out-Button. Dies zeigt den Execute-Button an. Ein Klick darauf sendet eine GET-Anfrage und zeigt die Antwort an. Wenn alles wie erwartet funktioniert, solltest du eine Antwort wie folgt sehen:

FastAPI Swagger UI - Status-Antwort

Die Oberfläche stellt zudem einen curl-Befehl bereit, den du nutzen kannst, um den Endpunkt von der Kommandozeile aus zu testen. Zusammen mit watch kannst du den Status des Schedulers in Echtzeit überwachen:

watch -n 1 curl -s http://localhost:8000/api/v1/status/scheduler

Mit der Option -n 1 aktualisiert watch die Ausgabe jede Sekunde. So kannst du direkt beobachten, was der Scheduler macht, wenn du mit anderen Endpunkten interagierst.

Planen wir nun eine einzelne Erinnerung fĂĽr den Benutzer mit der ID 07861d07-7f90-4f3d-bfbc-8b4d44d1679e zum angegebenen UTC-Zeitpunkt 2025-06-06T11:30:00.000Z:

curl -X 'POST' \
  'http://127.0.0.1:8000/api/v1/reminders/users/07861d07-7f90-4f3d-bfbc-8b4d44d1679e' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "prompt": "Write user X a nice reminder to register for the upcoming event!",
  "schedule_time": "2025-06-06T11:30:00.000Z"
}'

Planen wir weiterhin eine Gruppen-Erinnerung für die Gruppe mit der ID 8e4f7f3d-3680-4f7b-b915-dc0452a330d6, die für die nächsten 3 Wochen jeden Mittwoch Erinnerungen an das bevorstehende Ereignis sendet:

curl -X 'POST' \
  'http://127.0.0.1:8000/api/v1/reminders/groups/8e4f7f3d-3680-4f7b-b915-dc0452a330d6' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "prompt": "Write a nice reminder for the python WhatsApp group to register for the upcoming event!",
  "weekday": 2,
  "n_weeks": 3
}'

Du solltest in Echtzeit sehen können, wie der Scheduler die Aufgaben bearbeitet und den Status aktualisiert.

Weitere Funktionen

Während dieses Tutorial viele der Schlüsselmerkmale des Schedulers behandelt, gibt es noch einige weitere Features, die von Interesse sein könnten:

Zusammenfassung

Wir haben gezeigt, wie man die Python Scheduler-Library in eine FastAPI-Anwendung integriert und demonstriert, wie man verschiedene Arten von Jobs planen kann, darunter:

  • Regelmäßige Health Checks
  • Tägliche KontingentzurĂĽcksetzungen
  • Wöchentliche Erinnerungen
  • Einmalige Erinnerungen

Der vollständige Quellcode wird in Kürze auf GitHub verfügbar sein. Teile uns mit, welche Art von Inhalten du dir in Zukunft wünschst. Wir freuen uns auf dein Feedback und deine Vorschläge - kontaktiere uns, indem du eine Issue im GitHub-Repository öffnest oder eine E-Mail sendest.