How We Learn Step-Level Rewards from Preferences to Solve Sparse-Reward Environments Using Online Process Reward Learning

In this tutorial, we explore Online Process Reward Learning (OPRL) and show how to learn dense, step-level reward signals from trajectory preferences to solve sparse-reward reinforcement learning tasks. We walk through each component, from the maze environment and the reward-model network to preference generation, training, and evaluation, while observing how the agent gradually improves its behavior through online, preference-driven reward shaping. By running this end-to-end implementation, we gain a practical understanding of how OPRL supports better credit assignment, faster learning, and more stable policy optimization in environments where the agent would otherwise struggle to find any reward. See the complete code.
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam
import matplotlib.pyplot as plt
from collections import deque
import random

# Fix all RNG seeds for reproducibility
torch.manual_seed(42)
np.random.seed(42)
random.seed(42)


class MazeEnv:
    """Grid maze with a vertical wall and a single sparse reward at the goal."""

    def __init__(self, size=8):
        self.size = size
        self.start = (0, 0)
        self.goal = (size-1, size-1)
        # Wall cells in column size//2, rows 1 .. size-3
        self.obstacles = set([(i, size//2) for i in range(1, size-2)])
        self.reset()

    def reset(self):
        self.pos = self.start
        self.steps = 0
        return self._get_state()

    def _get_state(self):
        # One-hot encoding of the agent's cell
        state = np.zeros(self.size * self.size)
        state[self.pos[0] * self.size + self.pos[1]] = 1
        return state

    def step(self, action):
        # Actions: 0 = up, 1 = right, 2 = down, 3 = left
        moves = [(-1,0), (0,1), (1,0), (0,-1)]
        new_pos = (self.pos[0] + moves[action][0],
                   self.pos[1] + moves[action][1])
        # Moves into walls or off the grid leave the agent in place
        if (0 <= new_pos[0] < self.size and
                0 <= new_pos[1] < self.size and
                new_pos not in self.obstacles):
            self.pos = new_pos
        self.steps += 1
        # Episode ends at the goal or after 60 steps
        done = self.pos == self.goal or self.steps >= 60
        # Sparse reward: only reaching the goal pays out
        reward = 10.0 if self.pos == self.goal else 0.0
        return self._get_state(), reward, done

    def render(self):
        grid = [['.' for _ in range(self.size)] for _ in range(self.size)]
        for obs in self.obstacles:
            grid[obs[0]][obs[1]] = '█'
        grid[self.goal[0]][self.goal[1]] = 'G'
        grid[self.pos[0]][self.pos[1]] = 'A'
        return '\n'.join([''.join(row) for row in grid])


class ProcessRewardModel(nn.Module):
    """Maps a state to a per-step reward in [-1, 1]."""

    def __init__(self, state_dim, hidden=128):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, hidden),
            nn.LayerNorm(hidden),
            nn.ReLU(),
            nn.Linear(hidden, hidden),
            nn.LayerNorm(hidden),
            nn.ReLU(),
            nn.Linear(hidden, 1),
            nn.Tanh()
        )

    def forward(self, states):
        return self.net(states)

    def trajectory_reward(self, states):
        # Trajectory score = sum of per-step rewards
        return self.forward(states).sum()


class PolicyNetwork(nn.Module):
    """Actor-critic network with a shared backbone."""

    def __init__(self, state_dim, action_dim, hidden=128):
        super().__init__()
        self.backbone = nn.Sequential(
            nn.Linear(state_dim, hidden),
            nn.ReLU(),
            nn.Linear(hidden, hidden),
            nn.ReLU()
        )
        self.actor = nn.Linear(hidden, action_dim)
        self.critic = nn.Linear(hidden, 1)

    def forward(self, state):
        features = self.backbone(state)
        return self.actor(features), self.critic(features)
We lay the foundation for our OPRL system by importing the libraries, defining the maze environment, and building the reward and policy networks. We establish how the state is represented, how obstacles block movement, and how the sparse reward structure works. We also define the core neural models that will later learn process rewards and drive the policy's decisions.
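To make the state encoding and wall layout concrete, the sketch below reproduces the index arithmetic from `MazeEnv` in plain Python. The helper names `state_index` and `wall_cells` are illustrative assumptions and do not appear in the tutorial code.

```python
def state_index(pos, size=8):
    """Flat one-hot index, as computed in MazeEnv._get_state: row * size + col."""
    row, col = pos
    return row * size + col


def wall_cells(size=8):
    """Obstacle cells, as built in MazeEnv.__init__: column size // 2, rows 1 .. size - 3."""
    return {(i, size // 2) for i in range(1, size - 2)}


# Illustrative usage (helper names are hypothetical, not from the tutorial)
print(state_index((0, 0)))   # start cell -> 0
print(state_index((7, 7)))   # goal cell -> 63
print(sorted(wall_cells()))  # [(1, 4), (2, 4), (3, 4), (4, 4), (5, 4)]
```

For the default 8x8 maze the wall leaves rows 0, 6, and 7 of column 4 open, so the agent has to route around it to reach the goal.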
class OPRLAgent:
    def __init__(self, state_dim, action_dim, lr=3e-4):
        self.policy = PolicyNetwork(state_dim, action_dim)
        self.reward_model = ProcessRewardModel(state_dim)
        self.policy_opt = Adam(self.policy.parameters(), lr=lr)
        self.reward_opt = Adam(self.reward_model.parameters(), lr=lr)
        # Bounded buffers: old trajectories and preference pairs are dropped first
        self.trajectories = deque(maxlen=200)
        self.preferences = deque(maxlen=500)
        self.action_dim = action_dim

    def select_action(self, state, epsilon=0.1):
        # With probability epsilon, take a uniformly random action
        if random.random() < epsilon:
            return random.randint(0, self.action_dim - 1)
        # Otherwise sample from the policy's softmax distribution
        state_t = torch.FloatTensor(state).unsqueeze(0)
        with torch.no_grad():
            logits, _ = self.policy(state_t)
            probs = F.softmax(logits, dim=-1)
        return torch.multinomial(probs, 1).item()

    def collect_trajectory(self, env, epsilon=0.1):
        states, actions, rewards = [], [], []
        state = env.reset()
        done = False
        while not done:
            action = self.select_action(state, epsilon)
            next_state, reward, done = env.step(action)
            states.append(state)
            actions.append(action)
            rewards.append(reward)
            state = next_state
        traj = {
            'states': torch.FloatTensor(np.array(states)),
            'actions': torch.LongTensor(actions),
            'rewards': torch.FloatTensor(rewards),
            'return': float(sum(rewards))
        }
        self.trajectories.append(traj)
        return traj
We start building the OPRL agent by implementing action selection and trajectory collection. We use an ε-greedy strategy to ensure exploration and gather sequences of states, actions, and rewards. As we run the agent through the maze, we store entire trajectories that will later serve as preference data for shaping the reward model.
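The action-selection rule can be isolated in a few lines of plain Python. This is a minimal sketch, assuming the policy logits are already available as a list of floats; `softmax` and `select_action` here are stand-alone illustrations, not the tutorial's PyTorch method. Note that the non-random branch samples from the softmax distribution rather than taking the argmax, so exploration comes from both ε and the policy's own stochasticity.

```python
import math
import random


def softmax(logits):
    """Numerically stable softmax over a list of floats."""
    m = max(logits)
    exps = [math.exp(x - m) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


def select_action(logits, epsilon, rng):
    """With probability epsilon pick a uniform random action,
    otherwise sample an action from softmax(logits)."""
    if rng.random() < epsilon:
        return rng.randrange(len(logits))
    probs = softmax(logits)
    return rng.choices(range(len(logits)), weights=probs, k=1)[0]


# Illustrative usage with made-up logits for the four maze actions
rng = random.Random(0)
action = select_action([0.1, 2.0, -1.0, 0.3], epsilon=0.1, rng=rng)
print(action)
```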
    def generate_preference(self):
        if len(self.trajectories) < 2:
            return
        t1, t2 = random.sample(list(self.trajectories), 2)
        # label = 1.0 means t1 is preferred; equal returns fall through to 0.0
        label = 1.0 if t1['return'] > t2['return'] else 0.0
        self.preferences.append({'t1': t1, 't2': t2, 'label': label})

    def train_reward_model(self, n_updates=5):
        # Wait until at least one full batch of preference pairs exists
        if len(self.preferences) < 32:
            return 0.0
        total_loss = 0.0
        for _ in range(n_updates):
            batch = random.sample(list(self.preferences), 32)
            loss = 0.0
            for item in batch:
                r1 = self.reward_model.trajectory_reward(item['t1']['states'])
                r2 = self.reward_model.trajectory_reward(item['t2']['states'])
                # Bradley-Terry: P(t1 preferred) = sigmoid(r1 - r2)
                logit = r1 - r2
                pred_prob = torch.sigmoid(logit)
                label = item['label']
                # Binary cross-entropy against the preference label
                loss += -(label * torch.log(pred_prob + 1e-8) +
                          (1-label) * torch.log(1 - pred_prob + 1e-8))
            loss = loss / len(batch)
            self.reward_opt.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.reward_model.parameters(), 1.0)
            self.reward_opt.step()
            total_loss += loss.item()
        return total_loss / n_updates
We generate preference pairs from the collected trajectories and train the process reward model using the Bradley-Terry formulation. We compare trajectory-level scores, compute preference probabilities, and update the reward model to reflect which behaviors appear better. This lets us learn dense, differentiable, step-level rewards that guide the agent even when the environment itself is sparse.
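The Bradley-Terry step can be worked through with scalar trajectory scores. The sketch below is an illustration in plain Python, assuming each score is the summed per-step reward of one trajectory. The tie-aware `preference_label` is a hypothetical variation, not the tutorial's rule: in `generate_preference`, two trajectories with equal return receive label 0.0, which in a sparse maze (where most early returns are 0) marks many indistinguishable pairs as if the second trajectory were better.

```python
import math


def bt_preference_prob(score_1, score_2):
    """Bradley-Terry probability that trajectory 1 is preferred."""
    return 1.0 / (1.0 + math.exp(-(score_1 - score_2)))


def bt_loss(score_1, score_2, label, eps=1e-8):
    """Binary cross-entropy against the label (1.0 means trajectory 1 preferred)."""
    p = bt_preference_prob(score_1, score_2)
    return -(label * math.log(p + eps) + (1 - label) * math.log(1 - p + eps))


def preference_label(return_1, return_2):
    """Hypothetical tie-aware label: 0.5 when the returns are equal (assumption)."""
    if return_1 > return_2:
        return 1.0
    if return_1 < return_2:
        return 0.0
    return 0.5


# Equal scores give probability 0.5 and a loss of about log(2)
print(bt_preference_prob(0.0, 0.0))
print(bt_loss(0.0, 0.0, 1.0))
# Scoring the preferred trajectory higher lowers the loss
print(bt_loss(2.0, 0.0, 1.0) < bt_loss(0.0, 2.0, 1.0))
```

Because the reward model ends in `Tanh` and episodes last at most 60 steps, the score difference stays bounded, so the plain `math.exp` call is safe here; for unbounded scores a log-sigmoid formulation would be the more robust choice.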
    def train_policy(self, n_updates=3, gamma=0.98):
        if len(self.trajectories) < 5:
            return 0.0
        total_loss = 0.0
        for _ in range(n_updates):
            traj = random.choice(list(self.trajectories))
            # Shape the sparse environment reward with the learned process reward
            with torch.no_grad():
                process_rewards = self.reward_model(traj['states']).squeeze()
            shaped_rewards = traj['rewards'] + 0.1 * process_rewards
            # Discounted returns, computed backwards through the trajectory
            returns = []
            G = 0
            for r in reversed(shaped_rewards.tolist()):
                G = r + gamma * G
                returns.insert(0, G)
            returns = torch.FloatTensor(returns)
            # Normalize returns to stabilize the gradient scale
            returns = (returns - returns.mean()) / (returns.std() + 1e-8)
            logits, values = self.policy(traj['states'])
            log_probs = F.log_softmax(logits, dim=-1)
            action_log_probs = log_probs.gather(1, traj['actions'].unsqueeze(1))
            # Advantage = return minus the critic's baseline (no gradient to critic)
            advantages = returns - values.squeeze().detach()
            policy_loss = -(action_log_probs.squeeze() * advantages).mean()
            value_loss = F.mse_loss(values.squeeze(), returns)
            entropy = -(F.softmax(logits, dim=-1) * log_probs).sum(-1).mean()
            loss = policy_loss + 0.5 * value_loss - 0.01 * entropy
            self.policy_opt.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.policy.parameters(), 1.0)
            self.policy_opt.step()
            total_loss += loss.item()
        return total_loss / n_updates


def train_oprl(episodes=500, render_interval=100):
    env = MazeEnv(size=8)
    agent = OPRLAgent(state_dim=64, action_dim=4, lr=3e-4)
    returns, reward_losses, policy_losses = [], [], []
    success_rate = []
    for ep in range(episodes):
        # Linearly decay exploration from 0.5 down to a floor of 0.05
        epsilon = max(0.05, 0.5 - ep / 1000)
        traj = agent.collect_trajectory(env, epsilon)
        returns.append(traj['return'])
        # Add a new preference pair every other episode
        if ep % 2 == 0 and ep > 10:
            agent.generate_preference()
        if ep > 20 and ep % 2 == 0:
            rew_loss = agent.train_reward_model(n_updates=3)
            reward_losses.append(rew_loss)
        if ep > 10:
            pol_loss = agent.train_policy(n_updates=2)
            policy_losses.append(pol_loss)
        # The goal pays 10.0, so a return above 5 means the goal was reached
        success = 1 if traj['return'] > 5 else 0
        success_rate.append(success)
        # Periodically run one episode without random exploration and print the final grid
        if ep % render_interval == 0 and ep > 0:
            test_env = MazeEnv(size=8)
            agent.collect_trajectory(test_env, epsilon=0)
            print(test_env.render())
    return returns, reward_losses, policy_losses, success_rate
We train the policy using shaped rewards produced by the learned process reward model. We compute returns, advantages, value estimates, and entropy bonuses, enabling the agent to improve its strategy over time. We then build the full training loop in which exploration decays, preferences accumulate, and both the reward model and the policy are updated continuously.
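The reward shaping and discounted-return computation inside `train_policy` can be traced by hand on a short trajectory. The following sketch is a plain-Python illustration, assuming the same shaping weight (0.1) and discount (0.98) as the tutorial code; the function name `shaped_returns` and the sample numbers are made up for the example.

```python
def shaped_returns(env_rewards, process_rewards, gamma=0.98, weight=0.1):
    """Add weight * process reward to each environment reward,
    then accumulate discounted returns from the last step backwards."""
    shaped = [r + weight * p for r, p in zip(env_rewards, process_rewards)]
    returns = []
    g = 0.0
    for r in reversed(shaped):
        g = r + gamma * g
        returns.append(g)
    returns.reverse()
    return returns


# Three-step example: the goal (reward 10.0) is reached on the last step
env_rewards = [0.0, 0.0, 10.0]
process_rewards = [0.5, -0.5, 1.0]  # made-up reward-model outputs in [-1, 1]
print(shaped_returns(env_rewards, process_rewards))
# approximately [9.70104, 9.848, 10.1]
```

With a weight of 0.1 and per-step process rewards bounded by 1 in magnitude, the shaping term nudges each step by at most 0.1, so the 10.0 goal reward still dominates any trajectory that reaches the goal.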
print("Training OPRL Agent on Sparse Reward Maze...\n")
returns, rew_losses, pol_losses, success = train_oprl(episodes=500, render_interval=250)

fig, axes = plt.subplots(2, 2, figsize=(14, 10))

axes[0,0].plot(returns, alpha=0.3)
axes[0,0].plot(np.convolve(returns, np.ones(20)/20, mode="valid"), linewidth=2)
axes[0,0].set_xlabel('Episode')
axes[0,0].set_ylabel('Return')
axes[0,0].set_title('Agent Performance')
axes[0,0].grid(alpha=0.3)

success_smooth = np.convolve(success, np.ones(20)/20, mode="valid")
axes[0,1].plot(success_smooth, linewidth=2, color="green")
axes[0,1].set_xlabel('Episode')
axes[0,1].set_ylabel('Success Rate')
axes[0,1].set_title('Goal Success Rate')
axes[0,1].grid(alpha=0.3)

axes[1,0].plot(rew_losses, linewidth=2, color="orange")
axes[1,0].set_xlabel('Update Step')
axes[1,0].set_ylabel('Loss')
axes[1,0].set_title('Reward Model Loss')
axes[1,0].grid(alpha=0.3)

axes[1,1].plot(pol_losses, linewidth=2, color="red")
axes[1,1].set_xlabel('Update Step')
axes[1,1].set_ylabel('Loss')
axes[1,1].set_title('Policy Loss')
axes[1,1].grid(alpha=0.3)

plt.tight_layout()
plt.show()

print("OPRL Training Complete!")
print("Process rewards, preference learning, reward shaping, and online updates demonstrated.")
We visualize the learning dynamics by plotting returns, success rates, reward-model loss, and policy loss. We monitor how the agent's performance evolves as OPRL shapes the reward landscape. By the end of the visualization, we can see the impact of process rewards on solving a challenging sparse-reward maze.
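The smoothed curves use a 20-episode moving average in "valid" mode, which only emits a value where the full window fits. The sketch below reimplements that windowing in plain Python to show the resulting length; `moving_average` is an illustrative helper, not a function from the tutorial.

```python
def moving_average(values, window=20):
    """Mean over each full window of consecutive values;
    returns len(values) - window + 1 points, or [] if the input is too short."""
    if len(values) < window:
        return []
    running = sum(values[:window])
    out = [running / window]
    for i in range(window, len(values)):
        running += values[i] - values[i - window]
        out.append(running / window)
    return out


print(moving_average([1, 2, 3, 4], window=2))  # [1.5, 2.5, 3.5]
print(len(moving_average([0] * 500)))          # 481
```

For 500 episodes the smoothed series has 481 points, and point k averages episodes k through k + 19, so when it is drawn on the same axes as the raw returns it appears shifted left relative to the episodes it summarizes.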
In conclusion, we see how OPRL turns sparse terminal outcomes into rich online feedback that continuously guides the agent's behavior. We watch the process reward model learn from preferences, shape the return signal, and speed up the policy's ability to reach the goal. Whether we move to larger mazes, a range of task variations, or real human preference feedback, we appreciate how OPRL offers a flexible framework for credit assignment in decision-making tasks. We finish with a clear, hands-on understanding of how OPRL works and how we can extend it to advanced agentic RL settings.
Asif Razzaq is the CEO of Marktechpost Media Inc. As a visionary entrepreneur and engineer, Asif is committed to harnessing the potential of artificial intelligence for social good. His most recent endeavor is the launch of an artificial intelligence media platform, Marktechpost, which stands out for its in-depth coverage of machine learning and deep learning news that is both technically sound and easily understood by a wide audience. The platform draws more than two million monthly views, which shows its popularity among readers.
