dramatiq is not your architecture
I wanted to “convert Dramatiq to event-driven architecture.” That’s a category error, and it took an embarrassingly long conversation to figure out why.
Dramatiq is a message transport. Event-driven architecture is a coordination pattern you build on top of a message transport. Like asking how to convert a highway into a logistics company. The highway is infrastructure. The logistics is what you do with it.
Our actual problem had three parts:
-
Manual coordination logic. We had a
task_finish_conversation_hookthat imperatively orchestrated follow-up tasks. “When transcription finishes, run summarization, then merging, then ETL.” Every new downstream task meant editing this god function. -
Tight coupling through direct database mutations. Every task reached into the database and flipped conversation states directly. No events, no contracts, just raw updates scattered across task files.
-
Missing event semantics. Tasks performed actions rather than reacting to domain events. There was no concept of “TranscriptionCompleted”, just a task that happened to run after transcription.
The migration preserved Dramatiq entirely. Kept every actor, every queue configuration. What changed was the coordination layer on top:
# Before: imperative orchestrationdef task_finish_conversation_hook(conversation_id): run_summarization(conversation_id) run_merging(conversation_id) run_etl(conversation_id)
# After: event causalityevent_bus.publish("ConversationFinished", { "conversation_id": conversation_id})# Handlers subscribe independently, no central coordinatorCompletion detection was the interesting part. When you split an audio file into chunks and transcribe them in parallel, how do you know when all chunks are done? Atomic Redis counters:
# Each chunk transcription incrementsevent_bus.increment_counter(f"conv:{conversation_id}:transcribed")
# When counter == total_chunks, emit completion eventif event_bus.get_counter(f"conv:{conversation_id}:transcribed") == total_chunks: event_bus.publish("AllChunksTranscribed", { "conversation_id": conversation_id })Pipeline became: TranscriptionRequested -> ChunkTranscribed -> AllChunksTranscribed -> ConversationFinished. Each step is a domain event. Each handler is independent. Adding a new reaction to ConversationFinished means writing a new handler, not editing existing coordination logic.
Deployed alongside existing tasks with legacy wrappers for zero-downtime migration. Updated calling code to use event_bus.publish(), verified everything worked, removed the wrappers.
The migration was architectural, not technological. The actual business logic (transcription, summarization, database operations) stayed completely unchanged. We just restructured how things talked to each other. Domain logic doesn’t care about coordination. Coordination is a separate concern.