How to design a mini multi-agent reinforcement learning system with intelligent feedback, decision supervision, and inter-agent communication

In this tutorial, we demonstrate a reinforcement learning setup in which a multi-agent system learns to navigate a grid world through interaction, feedback, and decision making. We build everything from scratch and include three agent roles: an action agent, a tool agent, and a supervisor, so we can see how heuristics, analysis, and supervision combine into behavior we can understand. Along the way, we watch the agents work together, refine their strategies, and gradually learn to reach a goal while overcoming obstacles and uncertainty.
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import clear_output
import time
from collections import defaultdict
class GridWorld:
    def __init__(self, size=8):
        self.size = size
        self.agent_pos = [0, 0]
        self.goal_pos = [size-1, size-1]
        self.obstacles = self._generate_obstacles()
        self.visited = set()
        self.step_count = 0
        self.max_steps = size * size * 2

    def _generate_obstacles(self):
        obstacles = set()
        n_obstacles = self.size
        while len(obstacles) < n_obstacles:
            pos = (np.random.randint(1, self.size-1),
                   np.random.randint(1, self.size-1))
            if pos != (0, 0) and pos != (self.size-1, self.size-1):
                obstacles.add(pos)
        return obstacles

    def reset(self):
        self.agent_pos = [0, 0]
        self.visited = {tuple(self.agent_pos)}
        self.step_count = 0
        return self._get_state()

    def _get_state(self):
        return {
            'position': tuple(self.agent_pos),
            'goal': self.goal_pos,
            'distance_to_goal': abs(self.agent_pos[0] - self.goal_pos[0]) +
                                abs(self.agent_pos[1] - self.goal_pos[1]),
            'visited_count': len(self.visited),
            'steps': self.step_count,
            'can_move': self._get_valid_actions()
        }

    def _get_valid_actions(self):
        valid = []
        moves = {'up': [-1, 0], 'down': [1, 0], 'left': [0, -1], 'right': [0, 1]}
        for action, delta in moves.items():
            new_pos = [self.agent_pos[0] + delta[0], self.agent_pos[1] + delta[1]]
            if (0 <= new_pos[0] < self.size and 0 <= new_pos[1] < self.size and
                    tuple(new_pos) not in self.obstacles):
                valid.append(action)
        return valid
We set up the grid world environment and define how the agent, the goal, and the obstacles exist within it. We establish the state representation and the set of valid moves, and prepare the environment so the agents can interact with it cleanly. As we work through this part, we see the world take shape, ready for the agents to explore.
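Before wiring in the step logic, we can sanity-check what we have so far. The following snippet is a minimal sketch (not part of the original tutorial) that instantiates the world and inspects the initial state dictionary:

env = GridWorld(size=8)
state = env.reset()
print(state['position'])          # (0, 0)
print(state['distance_to_goal'])  # 14: Manhattan distance to (7, 7)
print(state['can_move'])          # ['down', 'right'], since obstacles never spawn on the border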
class GridWorld(GridWorld):
    def step(self, action):
        self.step_count += 1
        moves = {'up': [-1, 0], 'down': [1, 0], 'left': [0, -1], 'right': [0, 1]}
        if action not in moves:
            return self._get_state(), -1, False, "Invalid action"
        delta = moves[action]
        new_pos = [self.agent_pos[0] + delta[0], self.agent_pos[1] + delta[1]]
        if not (0 <= new_pos[0] < self.size and 0 <= new_pos[1] < self.size):
            return self._get_state(), -1, False, "Hit wall"
        if tuple(new_pos) in self.obstacles:
            return self._get_state(), -1, False, "Hit obstacle"
        self.agent_pos = new_pos
        pos_tuple = tuple(self.agent_pos)
        reward = -0.1
        if pos_tuple not in self.visited:
            reward += 0.5
            self.visited.add(pos_tuple)
        done = False
        info = "Moved"
        if self.agent_pos == self.goal_pos:
            reward += 10
            done = True
            info = "Goal reached!"
        elif self.step_count >= self.max_steps:
            done = True
            info = "Max steps reached"
        return self._get_state(), reward, done, info

    def render(self, agent_thoughts=None):
        grid = np.zeros((self.size, self.size, 3))
        for pos in self.visited:
            grid[pos[0], pos[1]] = [0.7, 0.9, 1.0]
        for obs in self.obstacles:
            grid[obs[0], obs[1]] = [0.2, 0.2, 0.2]
        grid[self.goal_pos[0], self.goal_pos[1]] = [0, 1, 0]
        grid[self.agent_pos[0], self.agent_pos[1]] = [1, 0, 0]
        plt.figure(figsize=(10, 8))
        plt.imshow(grid, interpolation='nearest')
        plt.title(f"Step: {self.step_count} | Visited: {len(self.visited)}/{self.size*self.size}")
        for i in range(self.size + 1):
            plt.axhline(i - 0.5, color="gray", linewidth=0.5)
            plt.axvline(i - 0.5, color="gray", linewidth=0.5)
        if agent_thoughts:
            plt.text(0.5, -1.5, agent_thoughts, ha="center", fontsize=9,
                     bbox=dict(boxstyle="round", facecolor="wheat", alpha=0.8),
                     wrap=True, transform=plt.gca().transData)
        plt.axis('off')
        plt.tight_layout()
        plt.show()
We implement how each step works in the environment and how the world is rendered. We calculate rewards, detect collisions, track progress, and display everything through a grid visualization. As this part comes together, we can watch the agent's journey unfold in real time with clear feedback.
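To watch the step dynamics in isolation, here is a small hypothetical smoke test (not from the original code) that takes a few random legal moves and prints the reward signal:

env = GridWorld(size=8)
state = env.reset()
for _ in range(5):
    action = np.random.choice(state['can_move'])  # pick any legal move
    state, reward, done, info = env.step(action)
    print(f"{action}: reward={reward:.2f}, info={info}")
    if done:
        break

Each move costs -0.1, a newly visited cell adds +0.5, and reaching the goal adds +10, so the printed rewards make the shaping scheme easy to verify.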
class ActionAgent:
    def __init__(self):
        self.q_values = defaultdict(lambda: defaultdict(float))
        self.epsilon = 0.3
        self.learning_rate = 0.1
        self.discount = 0.95

    def choose_action(self, state):
        valid_actions = state['can_move']
        if not valid_actions:
            return None
        pos = state['position']
        if np.random.random() < self.epsilon:
            action = np.random.choice(valid_actions)
            reasoning = f"Exploring randomly: chose '{action}'"
        else:
            action_values = {a: self.q_values[pos][a] for a in valid_actions}
            action = max(action_values, key=action_values.get)
            reasoning = f"Exploiting: chose '{action}' (Q={self.q_values[pos][action]:.2f})"
        return action, reasoning

    def learn(self, state, action, reward, next_state):
        pos = state['position']
        next_pos = next_state['position']
        current_q = self.q_values[pos][action]
        next_max_q = max([self.q_values[next_pos][a] for a in next_state['can_move']], default=0)
        new_q = current_q + self.learning_rate * (
            reward + self.discount * next_max_q - current_q)
        self.q_values[pos][action] = new_q

class ToolAgent:
    def analyze(self, state, action_taken, reward, history):
        suggestions = []
        distance = state['distance_to_goal']
        if distance <= 3:
            suggestions.append("🎯 Very close to goal! Prioritize direct path.")
        exploration_rate = state['visited_count'] / (state['steps'] + 1)
        if exploration_rate < 0.5 and distance > 5:
            suggestions.append("🔍 Low exploration rate. Consider exploring more.")
        if len(history) >= 5:
            recent_rewards = [h[2] for h in history[-5:]]
            avg_reward = np.mean(recent_rewards)
            if avg_reward < -0.5:
                suggestions.append("⚠️ Negative reward trend. Try different strategy.")
            elif avg_reward > 0.3:
                suggestions.append("✅ Good progress! Current strategy working.")
        if len(state['can_move']) <= 2:
            suggestions.append("🚧 Limited movement options. Be careful.")
        return suggestions
With the action agent and the tool agent, we give the system the ability to learn and to analyze its own feedback. We see how the action agent balances exploration and exploitation when choosing actions, while the tool agent evaluates performance and suggests improvements. Together, they create an experience-driven learning loop.
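To make the Q-learning update concrete, here is a worked sketch with hand-built, hypothetical states (the positions and values are illustrative, not from the tutorial):

agent = ActionAgent()
state = {'position': (0, 0), 'can_move': ['right', 'down']}
next_state = {'position': (0, 1), 'can_move': ['right', 'down', 'left']}
agent.q_values[(0, 1)]['right'] = 1.0  # pretend a future value is already known
agent.learn(state, 'right', 0.4, next_state)
# Q(s,a) <- Q + lr * (r + gamma * max_a' Q(s',a') - Q)
#        = 0.0 + 0.1 * (0.4 + 0.95 * 1.0 - 0.0) = 0.135
print(agent.q_values[(0, 0)]['right'])  # ~0.135, up to float precision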
class SupervisorAgent:
    def decide(self, state, proposed_action, tool_suggestions):
        if not proposed_action:
            return None, "No valid actions available"
        decision = proposed_action
        reasoning = f"Approved action '{proposed_action}'"
        for suggestion in tool_suggestions:
            if "goal" in suggestion.lower() and "close" in suggestion.lower():
                goal_direction = self._get_goal_direction(state)
                if goal_direction in state['can_move']:
                    decision = goal_direction
                    reasoning = f"Override: Moving '{goal_direction}' toward goal"
                break
        return decision, reasoning

    def _get_goal_direction(self, state):
        pos = state['position']
        goal = state['goal']
        if goal[0] > pos[0]:
            return 'down'
        elif goal[0] < pos[0]:
            return 'up'
        elif goal[1] > pos[1]:
            return 'right'
        else:
            return 'left'
We introduce the supervisor agent, which acts as the final decision maker in the system. We see how it reviews the proposed action, uses the tool agent's suggestions to override risky choices, and ensures actions stay consistent with the overall goal. With this component in place, we get a unified, integrated decision flow.
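The override logic is easiest to see with a hand-built state. In this hypothetical sketch, the action agent proposes 'left' while sitting two cells above the goal, and the supervisor redirects it:

supervisor = SupervisorAgent()
state = {'position': (5, 7), 'goal': [7, 7], 'can_move': ['up', 'down', 'left']}
suggestions = ["🎯 Very close to goal! Prioritize direct path."]
action, reasoning = supervisor.decide(state, 'left', suggestions)
print(action, '|', reasoning)  # down | Override: Moving 'down' toward goal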
def train_multi_agent(episodes=5, visualize=True):
    env = GridWorld(size=8)
    action_agent = ActionAgent()
    tool_agent = ToolAgent()
    supervisor = SupervisorAgent()
    episode_rewards = []
    episode_steps = []
    for episode in range(episodes):
        state = env.reset()
        total_reward = 0
        done = False
        history = []
        print(f"\n{'='*60}")
        print(f"EPISODE {episode + 1}/{episodes}")
        print(f"{'='*60}")
        while not done:
            action_result = action_agent.choose_action(state)
            if action_result is None:
                break
            proposed_action, action_reasoning = action_result
            suggestions = tool_agent.analyze(state, proposed_action, total_reward, history)
            final_action, supervisor_reasoning = supervisor.decide(state, proposed_action, suggestions)
            if final_action is None:
                break
            next_state, reward, done, info = env.step(final_action)
            total_reward += reward
            action_agent.learn(state, final_action, reward, next_state)
            history.append((state, final_action, reward, next_state))
            if visualize:
                clear_output(wait=True)
                thoughts = (f"Action Agent: {action_reasoning}\n"
                            f"Supervisor: {supervisor_reasoning}\n"
                            f"Tool Agent: {', '.join(suggestions[:2]) if suggestions else 'No suggestions'}\n"
                            f"Reward: {reward:.2f} | Total: {total_reward:.2f}")
                env.render(thoughts)
                time.sleep(0.3)
            state = next_state
        episode_rewards.append(total_reward)
        episode_steps.append(env.step_count)
        print(f"\nEpisode {episode+1} Complete!")
        print(f"Total Reward: {total_reward:.2f}")
        print(f"Steps Taken: {env.step_count}")
        print(f"Cells Visited: {len(env.visited)}/{env.size**2}")
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(episode_rewards, marker="o")
    plt.title('Episode Rewards')
    plt.xlabel('Episode')
    plt.ylabel('Total Reward')
    plt.grid(True, alpha=0.3)
    plt.subplot(1, 2, 2)
    plt.plot(episode_steps, marker="s", color="orange")
    plt.title('Episode Steps')
    plt.xlabel('Episode')
    plt.ylabel('Steps to Complete')
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()
    return action_agent, tool_agent, supervisor

if __name__ == "__main__":
    print("🤖 Multi-Agent RL System: Grid World Navigation")
    print("=" * 60)
    print("Components:")
    print(" • Action Agent: Proposes actions using Q-learning")
    print(" • Tool Agent: Analyzes performance and suggests improvements")
    print(" • Supervisor Agent: Makes final decisions")
    print("=" * 60)
    trained_agents = train_multi_agent(episodes=5, visualize=True)
We run the full training loop, where all three agents work together in the environment across multiple episodes. We track rewards, watch movement patterns, and visualize learning progress after each run. As the loop completes, we see the multi-agent system improve and become more efficient at navigating the grid world.
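When iterating on hyperparameters, rendering every step slows things down. As a hypothetical variation on the call above, we can train headlessly and then peek at what the action agent learned for the start cell:

action_agent, tool_agent, supervisor = train_multi_agent(episodes=10, visualize=False)
for action, q in sorted(action_agent.q_values[(0, 0)].items(), key=lambda kv: -kv[1]):
    print(f"{action}: {q:.3f}")  # learned Q-values, best action first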
In conclusion, we see how a multi-agent RL system emerges from clean, modular parts, and how each layer contributes to smarter behavior: the action agent learns through Q-learning, the tool agent analyzes performance and offers feedback, and the supervisor steers decisions toward safe, goal-directed choices. We appreciate how this simple grid environment helps us visualize learning, evaluation, and decision making in real time.



