How to Design an Agentic Data and Infrastructure System Using Lightweight Qwen Models for Efficient Pipeline Intelligence?

In this tutorial, we build an agentic data and infrastructure system using the lightweight Qwen2.5-0.5B-Instruct model for efficient execution. We start by creating a flexible LLM agent framework and then develop specialized agents that handle different layers of data management, from ingestion and quality analysis to infrastructure optimization. We integrate these agents into an orchestrator that coordinates their interactions, ensuring smooth collaboration across the data pipeline. Using hands-on examples such as e-commerce and IoT pipelines, we explore how autonomous decision-making can guide complex data operations.
!pip install -q transformers torch accelerate datasets huggingface_hub
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
import json, time
from typing import List, Dict, Any
from dataclasses import dataclass
from datetime import datetime
import pandas as pd

class LightweightLLMAgent:
    def __init__(self, role: str, model_name: str = "Qwen/Qwen2.5-0.5B-Instruct"):
        self.role = role
        self.model_name = model_name
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f"Loading {model_name} for {role} agent on {self.device}...")
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        # Use half precision on GPU to save memory; fall back to float32 on CPU.
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
            device_map="auto"
        )
        self.conversation_history = []

    def generate_response(self, prompt: str, max_tokens: int = 150) -> str:
        # The system message sets the agent's role; the user message carries the task.
        messages = [
            {"role": "system", "content": f"You are a {self.role} agent in a data infrastructure system."},
            {"role": "user", "content": prompt}
        ]
        text = self.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        model_inputs = self.tokenizer([text], return_tensors="pt").to(self.device)
        with torch.no_grad():
            generated_ids = self.model.generate(
                model_inputs.input_ids,
                max_new_tokens=max_tokens,
                temperature=0.7,
                do_sample=True,
                top_p=0.95
            )
        # The output echoes the prompt tokens, so keep only the newly generated ones.
        generated_ids = [output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)]
        response = self.tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
        self.conversation_history.append({"prompt": prompt, "response": response})
        return response

We start by setting up a lightweight LLM agent infrastructure using the Qwen2.5-0.5B-Instruct model. We load the model and tokenizer, and define a base agent class that generates responses through the model's chat template and records each prompt and response in a conversation history. This forms the foundation on which our specialized agents run efficiently within Colab.
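
To make the decoding step in `generate_response` concrete: for a causal language model, `generate` returns the prompt tokens followed by the newly generated tokens, so the method slices off the first `len(input_ids)` entries of each output sequence. The following is a minimal sketch of that slicing on plain Python lists. The token IDs are made-up values for illustration, and the helper name `strip_prompt_tokens` is hypothetical, not part of the tutorial code.

```python
from typing import List


def strip_prompt_tokens(input_ids: List[List[int]], output_ids: List[List[int]]) -> List[List[int]]:
    """Drop the echoed prompt from each generated sequence (hypothetical helper)."""
    return [out[len(inp):] for inp, out in zip(input_ids, output_ids)]


# Made-up token IDs: the output repeats the prompt, then appends new tokens.
prompt_batch = [[101, 7, 8, 9]]
generated_batch = [[101, 7, 8, 9, 42, 43, 44]]

new_tokens = strip_prompt_tokens(prompt_batch, generated_batch)
print(new_tokens)  # [[42, 43, 44]]
```

Only the sliced tokens are passed to `batch_decode`, which is why the returned response does not contain the system and user messages.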

class DataIngestionAgent(LightweightLLMAgent):
    def __init__(self):
        super().__init__(role="Data Ingestion Specialist")

    def analyze_data_source(self, source_info: Dict) -> Dict:
        prompt = f"""Analyze this data source and provide ingestion strategy:
Source Type: {source_info.get('type', 'unknown')}
Volume: {source_info.get('volume', 'unknown')}
Frequency: {source_info.get('frequency', 'unknown')}
Provide a brief strategy focusing on: 1) Ingestion method, 2) Key considerations."""
        strategy = self.generate_response(prompt, max_tokens=100)
        return {"source": source_info, "strategy": strategy, "timestamp": datetime.now().isoformat()}


class DataQualityAgent(LightweightLLMAgent):
    def __init__(self):
        super().__init__(role="Data Quality Analyst")

    def assess_data_quality(self, data_sample: Dict) -> Dict:
        prompt = f"""Assess data quality for this sample:
Completeness: {data_sample.get('completeness', 'N/A')}%
Consistency: {data_sample.get('consistency', 'N/A')}%
Issues Found: {data_sample.get('issues', 0)}
Provide brief quality assessment and top 2 recommendations."""
        assessment = self.generate_response(prompt, max_tokens=100)
        return {"assessment": assessment, "severity": self._calculate_severity(data_sample), "timestamp": datetime.now().isoformat()}

    def _calculate_severity(self, data_sample: Dict) -> str:
        completeness = data_sample.get('completeness', 100)
        consistency = data_sample.get('consistency', 100)
        avg_score = (completeness + consistency) / 2
        if avg_score >= 90: return "LOW"
        elif avg_score >= 70: return "MEDIUM"
        else: return "HIGH"

We design the data ingestion and data quality agents to focus on structured analysis of data pipelines. We let the ingestion agent determine the best approach for ingesting data, while the quality agent evaluates completeness, consistency, and reported issues to produce an assessment with recommendations. The quality agent also assigns a rule-based severity level from the average of the completeness and consistency scores. Together, they establish the first two layers of autonomous data management.
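
As a worked example of the severity rule, the sketch below copies the thresholds from `_calculate_severity` into a standalone function (the name `calculate_severity` and its two-argument signature are assumptions made for illustration) and applies it to the quality metrics used by the two example pipelines in this tutorial.

```python
def calculate_severity(completeness: float, consistency: float) -> str:
    """Standalone copy of the tutorial's severity thresholds (illustrative)."""
    avg_score = (completeness + consistency) / 2
    if avg_score >= 90:
        return "LOW"
    elif avg_score >= 70:
        return "MEDIUM"
    return "HIGH"


# E-commerce example: (87 + 92) / 2 = 89.5, just under the 90 cutoff.
ecommerce_severity = calculate_severity(87, 92)
# IoT example: (95 + 88) / 2 = 91.5, above the 90 cutoff.
iot_severity = calculate_severity(95, 88)

print(ecommerce_severity, iot_severity)  # MEDIUM LOW
```

Note that the severity comes from fixed thresholds, not from the model, so it stays deterministic even though the text assessment is sampled.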

class InfrastructureOptimizationAgent(LightweightLLMAgent):
    def __init__(self):
        super().__init__(role="Infrastructure Optimization Specialist")

    def optimize_resources(self, metrics: Dict) -> Dict:
        prompt = f"""Analyze infrastructure metrics and suggest optimizations:
CPU Usage: {metrics.get('cpu_usage', 0)}%
Memory Usage: {metrics.get('memory_usage', 0)}%
Storage: {metrics.get('storage_used', 0)}GB / {metrics.get('storage_total', 0)}GB
Query Latency: {metrics.get('query_latency', 0)}ms
Provide 2 optimization recommendations."""
        recommendations = self.generate_response(prompt, max_tokens=100)
        return {"current_metrics": metrics, "recommendations": recommendations, "priority": self._calculate_priority(metrics), "timestamp": datetime.now().isoformat()}

    def _calculate_priority(self, metrics: Dict) -> str:
        cpu = metrics.get('cpu_usage', 0)
        memory = metrics.get('memory_usage', 0)
        if cpu > 85 or memory > 85: return "CRITICAL"
        elif cpu > 70 or memory > 70: return "HIGH"
        else: return "NORMAL"

We develop an infrastructure optimization agent that analyzes key metrics such as CPU, memory, and storage usage along with query latency. We use it to generate optimization recommendations and to assign a rule-based priority from CPU and memory usage, helping us maintain performance and resource efficiency. This agent helps keep our infrastructure responsive during data processing.
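
The priority rule can be checked by hand in the same way. The sketch below mirrors the thresholds in `_calculate_priority` as a standalone function (the name `calculate_priority` and its signature are assumptions for illustration) and evaluates the infrastructure metrics from the two example pipelines. Storage and query latency appear in the prompt but do not influence the priority.

```python
def calculate_priority(cpu_usage: float, memory_usage: float) -> str:
    """Standalone copy of the tutorial's priority thresholds (illustrative)."""
    if cpu_usage > 85 or memory_usage > 85:
        return "CRITICAL"
    elif cpu_usage > 70 or memory_usage > 70:
        return "HIGH"
    return "NORMAL"


# E-commerce example: CPU 78% and memory 82% both exceed 70 but not 85.
ecommerce_priority = calculate_priority(78, 82)
# IoT example: CPU 65% is fine, but memory 71% crosses the 70 threshold.
iot_priority = calculate_priority(65, 71)

print(ecommerce_priority, iot_priority)  # HIGH HIGH
```

Because the comparisons are strict, a reading of exactly 85% is still classified as HIGH and exactly 70% as NORMAL.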

class AgenticDataOrchestrator:
    def __init__(self):
        print("\n" + "="*70)
        print("Initializing Agentic Data Infrastructure System")
        print("="*70 + "\n")
        # Each agent loads its own copy of the model and tokenizer.
        self.ingestion_agent = DataIngestionAgent()
        self.quality_agent = DataQualityAgent()
        self.optimization_agent = InfrastructureOptimizationAgent()
        self.execution_log = []

    def process_data_pipeline(self, pipeline_config: Dict) -> Dict:
        results = {"pipeline_id": pipeline_config.get("id", "unknown"), "start_time": datetime.now().isoformat(), "stages": []}
        print("\n[Stage 1] Data Ingestion Analysis")
        ingestion_result = self.ingestion_agent.analyze_data_source(pipeline_config.get("source", {}))
        print(f"Strategy: {ingestion_result['strategy'][:150]}...")
        results["stages"].append({"stage": "ingestion", "result": ingestion_result})
        print("\n[Stage 2] Data Quality Assessment")
        quality_result = self.quality_agent.assess_data_quality(pipeline_config.get("quality_metrics", {}))
        print(f"Assessment: {quality_result['assessment'][:150]}...")
        print(f"Severity: {quality_result['severity']}")
        results["stages"].append({"stage": "quality", "result": quality_result})
        print("\n[Stage 3] Infrastructure Optimization")
        optimization_result = self.optimization_agent.optimize_resources(pipeline_config.get("infrastructure_metrics", {}))
        print(f"Recommendations: {optimization_result['recommendations'][:150]}...")
        print(f"Priority: {optimization_result['priority']}")
        results["stages"].append({"stage": "optimization", "result": optimization_result})
        results["end_time"] = datetime.now().isoformat()
        results["status"] = "completed"
        self.execution_log.append(results)
        return results

    def generate_summary_report(self) -> pd.DataFrame:
        if not self.execution_log: return pd.DataFrame()
        summary_data = []
        for log in self.execution_log:
            summary_data.append({"Pipeline ID": log["pipeline_id"], "Start Time": log["start_time"], "Status": log["status"], "Stages Completed": len(log["stages"])})
        return pd.DataFrame(summary_data)

We build the Agentic Data Orchestrator to connect all specialized agents under a unified workflow. We use it to manage end-to-end pipeline execution, triggering ingestion analysis, quality checks, and optimization in sequence and recording each run in an execution log. By doing this, we bring structure, collaboration, and automation to the entire multi-agent system.
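
The orchestration pattern described here (run fixed stages in order, collect each result, append the run to a log) can be tried without loading any model. The sketch below is an illustration under stated assumptions: `StubOrchestrator` and the three `stub_*` functions are hypothetical stand-ins, not the tutorial's classes, and the strings they return are invented placeholders rather than model output.

```python
from datetime import datetime
from typing import Callable, Dict, List, Tuple


# Hypothetical stand-ins for the three agents: each takes a config dict
# and returns a result dict, with no model loaded.
def stub_ingestion(source: Dict) -> Dict:
    return {"strategy": f"batch pull from {source.get('type', 'unknown')}"}


def stub_quality(metrics: Dict) -> Dict:
    return {"assessment": f"{metrics.get('issues', 0)} issues reported"}


def stub_optimization(metrics: Dict) -> Dict:
    return {"recommendations": f"CPU at {metrics.get('cpu_usage', 0)}%"}


class StubOrchestrator:
    """Runs fixed stages in order and logs each run (illustrative sketch)."""

    def __init__(self) -> None:
        # (stage name, config key, handler) in execution order.
        self.stages: List[Tuple[str, str, Callable[[Dict], Dict]]] = [
            ("ingestion", "source", stub_ingestion),
            ("quality", "quality_metrics", stub_quality),
            ("optimization", "infrastructure_metrics", stub_optimization),
        ]
        self.execution_log: List[Dict] = []

    def process(self, config: Dict) -> Dict:
        results = {
            "pipeline_id": config.get("id", "unknown"),
            "start_time": datetime.now().isoformat(),
            "stages": [],
        }
        for name, key, handler in self.stages:
            # A missing config section falls back to an empty dict.
            results["stages"].append({"stage": name, "result": handler(config.get(key, {}))})
        results["end_time"] = datetime.now().isoformat()
        results["status"] = "completed"
        self.execution_log.append(results)
        return results


orchestrator = StubOrchestrator()
run = orchestrator.process({"id": "demo_001", "source": {"type": "REST API"}})
print([s["stage"] for s in run["stages"]])  # ['ingestion', 'quality', 'optimization']
```

Keeping the stages in a list is one possible design choice; the tutorial's orchestrator instead calls each agent explicitly, which makes the per-stage print statements easier to customize.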

def main():
    orchestrator = AgenticDataOrchestrator()
    print("\n" + "="*70)
    print("EXAMPLE 1: E-commerce Data Pipeline")
    print("="*70)
    ecommerce_pipeline = {
        "id": "ecommerce_pipeline_001",
        "source": {"type": "REST API", "volume": "10GB/day", "frequency": "real-time"},
        "quality_metrics": {"completeness": 87, "consistency": 92, "issues": 15},
        "infrastructure_metrics": {"cpu_usage": 78, "memory_usage": 82, "storage_used": 450, "storage_total": 1000, "query_latency": 250}
    }
    result1 = orchestrator.process_data_pipeline(ecommerce_pipeline)
    print("\n\n" + "="*70)
    print("EXAMPLE 2: IoT Sensor Data Pipeline")
    print("="*70)
    iot_pipeline = {
        "id": "iot_pipeline_002",
        "source": {"type": "Message Queue (Kafka)", "volume": "50GB/day", "frequency": "streaming"},
        "quality_metrics": {"completeness": 95, "consistency": 88, "issues": 8},
        "infrastructure_metrics": {"cpu_usage": 65, "memory_usage": 71, "storage_used": 780, "storage_total": 2000, "query_latency": 180}
    }
    result2 = orchestrator.process_data_pipeline(iot_pipeline)
    print("\n\n" + "="*70)
    print("EXECUTION SUMMARY REPORT")
    print("="*70 + "\n")
    summary_df = orchestrator.generate_summary_report()
    print(summary_df.to_string(index=False))
    print("\n" + "="*70)
    print("Tutorial Complete!")
    print("="*70)
    print("\nKey Concepts Demonstrated:")
    print("✓ Lightweight LLM agent architecture")
    print("✓ Specialized agents for different data tasks")
    print("✓ Multi-agent orchestration")
    print("✓ Infrastructure monitoring and optimization")
    print("✓ Autonomous decision-making in data pipelines")


if __name__ == "__main__":
    main()

We demonstrate our complete system using two real-world style examples, an e-commerce pipeline and an IoT data pipeline. We see how each agent performs its role independently while contributing to a shared goal. Finally, we produce a summary report that lists each pipeline's ID, start time, status, and number of completed stages, giving a quick check that the orchestration ran end to end with the lightweight agents.
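
The summary report only counts stages, so the rule-based severity and priority have to be read from the per-stage results. One possible way to pull them out, sketched below, is a small helper that flattens a single result dict and serializes it with `json`. The helper name `summarize_run` is hypothetical, and `sample_run` is a hand-written dict in the shape returned by `process_data_pipeline`, with placeholder text where the model output would be.

```python
import json
from typing import Dict


def summarize_run(run: Dict) -> Dict:
    """Pull severity and priority out of one pipeline result (hypothetical helper)."""
    by_stage = {s["stage"]: s["result"] for s in run["stages"]}
    return {
        "pipeline_id": run["pipeline_id"],
        "status": run["status"],
        "severity": by_stage.get("quality", {}).get("severity"),
        "priority": by_stage.get("optimization", {}).get("priority"),
    }


# Hand-written sample; the text fields are placeholders, not real model output.
sample_run = {
    "pipeline_id": "ecommerce_pipeline_001",
    "status": "completed",
    "stages": [
        {"stage": "ingestion", "result": {"strategy": "<model text>"}},
        {"stage": "quality", "result": {"assessment": "<model text>", "severity": "MEDIUM"}},
        {"stage": "optimization", "result": {"recommendations": "<model text>", "priority": "HIGH"}},
    ],
}

summary = summarize_run(sample_run)
print(json.dumps(summary, indent=2))
```

The same helper could be mapped over `orchestrator.execution_log` to extend the summary table with these two columns.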
In conclusion, we designed and built a framework for an intelligent, multi-agent data infrastructure system powered by an open-source model. We show that independent yet cooperative agents can autonomously analyze, assess, and optimize real-world data systems. The entire setup demonstrates how small LLMs can handle infrastructure intelligence, while highlighting how agentic orchestration transforms traditional data workflows into dynamic, adaptive systems that are ready for practical business applications.
Asif Razzaq is the CEO of Marktechpost Media Inc. As a visionary entrepreneur and developer, Asif is committed to harnessing the power of artificial intelligence for social good. His latest effort is the launch of an artificial intelligence media platform, Marktechpost, which stands out for its in-depth coverage of machine learning and deep learning news that is both technically sound and easily understood by a wide audience. The platform has more than two million monthly views, which shows its popularity among readers.



