Description
Apache Airflow version
3.1.7
If "Other Airflow 2 version" selected, which one?
No response
What happened?
After a Redis broker restart (e.g., VM reboot), the Celery worker reconnects at the transport level but silently loses its consumer registration on the task queue. The worker process stays alive, `celery inspect ping` returns OK, but `inspect.active_queues()` returns `None` — the worker is in a catatonic state where it accepts no new tasks.
This is a known upstream Celery bug (celery/celery#8030, celery/celery#9054, celery/celery#8990) that has persisted across Celery 5.2.x through 5.5.x with Redis broker. The partial fix in celery/celery#8796 did not fully resolve it.
The problem on Airflow's side: the current worker liveness/health check mechanism does not detect this state. Since `celery inspect ping` responds normally, Docker/Kubernetes health probes pass, and the worker is never restarted. Tasks accumulate in the Redis queue in the `queued` state, hit the scheduler's requeue limit, and are marked failed.
In our case, 302 tasks piled up in the `default` queue over ~14 hours before we noticed. 7 DAGs failed, all with `Task requeue attempts exceeded max; marking failed`.
What you think should happen instead?
The Airflow Celery worker health check should verify that the worker has active queue consumers, not just that it responds to ping. Specifically:
```python
# Current check (insufficient):
result = app.control.inspect().ping()
# Returns OK even in the catatonic state

# Proposed additional check:
queues = app.control.inspect().active_queues()
if queues is None or worker_name not in queues:
    # Worker is alive but not consuming — health check should FAIL
    return False
```

If the worker is alive but has no registered queues, the health check should fail, triggering a container restart via the orchestrator (Docker, Kubernetes, systemd).
This would go in the airflow-providers-celery package, likely in the worker CLI health check logic.
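To make the proposal concrete, here is a minimal sketch of the decision logic operating on the reply of `app.control.inspect().active_queues()`. The function name `worker_consumes_queues` is hypothetical (not an existing Airflow or Celery API); only the shape of the `active_queues()` reply — a dict of worker hostname to a list of queue descriptions, or `None` when no worker replied — is taken from Celery's behavior.

```python
def worker_consumes_queues(active_queues_reply, worker_name):
    """Return True only if the worker has at least one registered queue consumer.

    active_queues_reply: the value returned by app.control.inspect().active_queues(),
    i.e. a dict mapping worker hostname to a list of queue descriptions,
    or None when no worker replied. (Hypothetical helper, not Airflow API.)
    """
    if not active_queues_reply:  # None or {}: nobody replied at all
        return False
    queues = active_queues_reply.get(worker_name)
    return bool(queues)  # an empty list also means "not consuming"


# The three states seen in this issue:
healthy = {"celery@worker1": [{"name": "default"}]}
catatonic = None  # transport reconnected, consumer registration lost
registered_but_empty = {"celery@worker1": []}

assert worker_consumes_queues(healthy, "celery@worker1") is True
assert worker_consumes_queues(catatonic, "celery@worker1") is False
assert worker_consumes_queues(registered_but_empty, "celery@worker1") is False
```

The key design point is treating both `None` and an empty queue list as failures, since a worker that replies to the broadcast but lists no queues is just as stuck as one that does not reply.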
How to reproduce
- Start Airflow with CeleryExecutor and Redis broker
- Confirm worker is consuming tasks normally
- Restart Redis (e.g., `docker restart redis` or reboot the Redis host)
- Observe: worker process stays alive, `celery inspect ping` returns OK, `celery inspect active_queues` returns `None` or empty for the worker
- Schedule a DAG — task goes to `queued` state and is never picked up
- After scheduler requeue attempts, task is marked `failed`
Operating System
Debian 12 (Proxmox VM)
Versions of Apache Airflow Providers
apache-airflow-providers-celery (installed with Airflow 3.1.7)
Deployment
Docker Compose
Deployment details
- Airflow 3.1.7 (Docker image `apache/airflow:3.1.7-python3.11`)
- Celery worker with 16 fork pool workers
- Redis 7.2.10 as broker
- PostgreSQL as result backend
- `worker_prefetch_multiplier = 1`
Anything else?
Related upstream Celery issues:
- Worker stops consuming tasks after redis reconnection on celery 5.2.3 celery/celery#8030
- Worker stops consuming tasks after Redis re-connection on celery 5 celery/celery#8091
- #8796: Worker not consuming tasks after Redis broker restart, maybe not fixed in version 5.4.0 celery/celery#8990 (confirms not fixed in 5.4.0)
- Celery worker stops consuming after Redis restart celery/celery#9054
- Worker stops consuming tasks after redis reconnection (gevent) celery/celery#9191
Previous Airflow issues (closed as upstream):
- Celery worker enters a catatonic state after redis restart #26542
- Celery worker enters a catatonic state after redis restart and the tasks are getting queued #32484
- Celery worker tasks in queued status when airflow-redis-master restarted #24498
- Worker sometimes does not reconnect to redis/celery queue after crash #27032
Those issues were rightfully closed as upstream Celery bugs. This issue proposes a defensive fix on Airflow's side — improving the health check to detect and recover from the catatonic state, regardless of when Celery fixes the root cause.
Workarounds:
- `--without-heartbeat --without-gossip --without-mingle` avoids the code path but loses cluster features
- `broker_connection_retry = False` makes the worker crash on broker loss (requires a restart policy)
- Custom health check script checking `active_queues()` instead of `ping`
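For the custom health check workaround under Docker Compose, the wiring could look like the sketch below. The script path `/opt/airflow/scripts/check_consumer.py` and the probe timings are placeholders for illustration, not defaults shipped with the official image; the script would exit non-zero when `active_queues()` shows no consumers, and the `restart` policy then recycles the container.

```yaml
services:
  airflow-worker:
    image: apache/airflow:3.1.7-python3.11
    healthcheck:
      # Hypothetical script: exits 1 when active_queues() is None/empty
      test: ["CMD", "python", "/opt/airflow/scripts/check_consumer.py"]
      interval: 60s
      timeout: 30s
      retries: 3
    restart: unless-stopped
```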
I'm willing to work on a PR for this.
🤖 Generated with Claude Code