
How to Build an Advanced Distributed Workflow System Using Kombu for Topic-Based Routing and Concurrent Workers

In this tutorial, we create a fully functional event-driven workflow using Kombu, treating messaging as a core building block. We walk step-by-step through the setup of exchanges, routing keys, queues, workers, and producers, giving us a close look at a real distributed system. As we implement each component, we see that clean message flow, asynchronous processing, and routing patterns give us the same capabilities that production microservices rely on every day.

!pip install kombu


import threading
import time
import logging
import uuid
import sys


from kombu import Connection, Exchange, Queue, Producer, Consumer
from kombu.mixins import ConsumerMixin


logging.basicConfig(
    level=logging.INFO,
    format="%(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
    force=True
)
logger = logging.getLogger(__name__)


BROKER_URL = "memory://localhost/"

We start by installing Kombu, importing dependencies, and configuring logging so we can clearly see every message flowing through the system. We also set an in-memory transport URL, which lets us run everything locally in Colab without needing RabbitMQ. This setup forms the basis of our distributed message flow.
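The in-memory transport is perfect for a notebook, but Kombu speaks to real brokers through the same URL scheme, so swapping environments is a one-line change. A minimal sketch, assuming you have a broker running locally (the hostnames and credentials below are placeholders, not part of this tutorial's setup):

# Placeholder broker URLs: point these at your own infrastructure.
# BROKER_URL = "amqp://guest:guest@localhost:5672//"   # RabbitMQ
# BROKER_URL = "redis://localhost:6379/0"              # Redis
BROKER_URL = "memory://localhost/"  # in-process transport, no server required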

media_exchange = Exchange('media_exchange', type="topic", durable=True)


task_queues = [
    Queue('video_queue', media_exchange, routing_key='video.#'),
    Queue('audit_queue', media_exchange, routing_key='#'),
]

We define a topic exchange so we can route messages using wildcard patterns. We also create two queues: one dedicated to video-related tasks, and an audit queue that listens to everything. Using topic routing, we can precisely control how messages flow through the system.
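As a quick reference for the wildcard semantics: # matches zero or more dot-separated segments, while * matches exactly one. The extra queue below is hypothetical, added only to illustrate the difference:

# 'video.#' matches 'video.upload' as well as 'video.transcode.hd'
# 'video.*' matches 'video.upload' but NOT 'video.transcode.hd'
# '#'       matches every routing key on the exchange
thumbnail_queue = Queue('thumbnail_queue', media_exchange, routing_key='video.*')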

class Worker(ConsumerMixin):
    def __init__(self, connection, queues):
        self.connection = connection
        self.queues = queues

    def get_consumers(self, Consumer, channel):
        # One consumer per queue, each with its own callback: delivery_info
        # only carries the routing key, not the queue name, so the catch-all
        # audit queue needs a dedicated callback to be distinguishable.
        video_queue, audit_queue = self.queues
        return [
            Consumer(queues=[video_queue],
                     callbacks=[self.on_task],
                     accept=['json'],
                     prefetch_count=1),
            Consumer(queues=[audit_queue],
                     callbacks=[self.on_audit],
                     accept=['json'],
                     prefetch_count=1),
        ]

    def on_task(self, body, message):
        routing_key = message.delivery_info['routing_key']
        payload_id = body.get('id', 'unknown')

        logger.info(f"\n⚡ RECEIVED MSG via key: [{routing_key}]")
        logger.info(f"   Payload ID: {payload_id}")

        try:
            if routing_key.startswith('video.'):
                self.process_video(body)
            message.ack()
            logger.info("   ✅ ACKNOWLEDGED")
        except Exception as e:
            logger.error(f"   ❌ ERROR: {e}")
            message.reject()  # drop the message rather than leave it unacknowledged

    def on_audit(self, body, message):
        # The audit queue is bound with '#', so every published event lands here.
        logger.info(f"   🔍 [Audit] Logging event {body.get('id', 'unknown')}")
        message.ack()

    def process_video(self, body):
        logger.info("   ⚙️  [Processor] Transcoding video (simulating work...)")
        time.sleep(0.5)

We build a custom worker on top of Kombu's ConsumerMixin so it can run in a background thread. We attach one consumer per queue: the video consumer checks the routing key, dispatches to the appropriate processing function, and acknowledges the message, while the audit consumer logs every event it sees. This worker structure gives us clean, concurrent message handling and full control over acknowledgement.
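Because each Worker owns its consumers, we can scale out by starting several of them, each on its own thread and connection; with prefetch_count=1, the broker hands each pending message to the next free worker. The helper below is a minimal sketch (the start_workers name and the one-connection-per-thread choice are our own additions, not part of the tutorial code above):

def start_workers(n_workers):
    # Hypothetical helper: spin up n_workers identical consumers.
    workers = []
    for _ in range(n_workers):
        conn = Connection(BROKER_URL)  # memory:// queues are shared process-wide
        w = Worker(conn, task_queues)
        threading.Thread(target=w.run, daemon=True).start()
        workers.append(w)
    return workers  # set w.should_stop = True on each worker to shut down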

def publish_messages(connection):
    producer = Producer(connection)

    tasks = [
        ('video.upload', {'file': 'movie.mp4'}),
        ('user.login', {'user': 'admin'}),
    ]

    logger.info("\n🚀 PRODUCER: Starting to publish messages...")

    for r_key, data in tasks:
        data['id'] = str(uuid.uuid4())[:8]

        logger.info(f"📤 SENDING: {r_key} -> {data}")

        producer.publish(
            data,
            exchange=media_exchange,
            routing_key=r_key,
            serializer="json"
        )
        time.sleep(1.5)

    logger.info("🏁 PRODUCER: Done.")

We now create a producer that sends JSON payloads to the exchange with different routing keys. We generate a unique ID for each event and watch how each message is routed to the matching queues. This mirrors real-world microservice event publishing, where producers and consumers are deliberately decoupled.
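In production, the broker connection can drop mid-publish. Kombu's publish accepts retry and retry_policy arguments that re-attempt delivery with a backoff, plus a declare list that ensures the exchange and queues exist before the first send. A minimal sketch under those assumptions (the payload and policy values are illustrative, not tuned recommendations):

with Connection(BROKER_URL) as conn:
    producer = Producer(conn)
    producer.publish(
        {'file': 'clip.mp4'},       # hypothetical payload
        exchange=media_exchange,
        routing_key='video.upload',
        serializer='json',
        declare=task_queues,        # declare exchange/queues before publishing
        retry=True,                 # re-run the publish if the connection fails
        retry_policy={
            'interval_start': 0,    # first retry immediately
            'interval_step': 2,     # then wait 2s more per attempt
            'interval_max': 10,     # never wait more than 10s
            'max_retries': 3,
        },
    )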

def run_example():
    with Connection(BROKER_URL) as conn:
        worker = Worker(conn, task_queues)
        worker_thread = threading.Thread(target=worker.run)
        worker_thread.daemon = True
        worker_thread.start()

        logger.info("✅ SYSTEM: Worker thread started.")
        time.sleep(1)

        try:
            publish_messages(conn)
            time.sleep(2)
        except KeyboardInterrupt:
            pass
        finally:
            worker.should_stop = True
            worker_thread.join(timeout=5)  # give the worker a moment to exit cleanly
            logger.info("\n👋 SYSTEM: Execution complete.")


if __name__ == "__main__":
    run_example()

We start the worker on a background thread and fire the producer on the main thread. This structure gives us a miniature distributed system that runs entirely in Colab. By reading the logs, we can follow each message as it is published → routed → consumed → acknowledged, completing the full life cycle of event processing.

In conclusion, we have designed a flexible, distributed workflow pipeline that processes events in real time with clarity and precision. We've seen how Kombu removes the incidental complexity of messaging systems while still giving us precise control over routing, acknowledgement, and worker concurrency. As we watch messages flow from producer to queue to worker, we gain a deeper appreciation for event-driven system design, and we are now well-equipped to scale this foundation into robust microservices, background processors, and enterprise-grade workflows.




Asif Razzaq is the CEO of Marktechpost Media Inc. As a visionary entrepreneur and engineer, Asif is committed to harnessing the power of Artificial Intelligence for the benefit of society. His latest endeavor is the launch of an Artificial Intelligence media platform, Marktechpost, which stands out for its extensive coverage of machine learning and deep learning stories that are technically sound yet easily understood by a wide audience. The platform boasts more than 2 million monthly views.
