Back to Catalog
Morphing Grid Navigation
A 10x10 partially observable grid world where the agent navigates from bottom-left to a dynamically relocating goal while collecting resources. After every action, the environment stochastically morphs: walls toggle with 30% probability per cell, the goal teleports, and resources shift positions. The agent receives a 5x5 local view of walls and relative vectors to the goal and nearest resources. Features anti-oscillation and stagnation penalties to prevent reward hacking.
Domain
navigation
Difficulty
hard
Observation
Box(shape=[41])
Action
Discrete(4)
Reward
composite
Max Steps
500
Version
v1
Tests (8/8)
syntax, import, reset, step, obs_space, action_space, reward_sanity, determinism
Use via API
import kualia
env = kualia.make("morphing-grid-navigation")
obs, info = env.reset()

Environment Code
15843 chars

import gymnasium as gym
import numpy as np
from typing import Tuple, Dict, Any, Optional
class MorphingGridNavigationEnv(gym.Env):
    """
    Morphing Grid Navigation Environment.

    A 10x10 grid where the agent must adapt to drastic environmental changes
    after every action. Walls stochastically toggle, the goal relocates, and
    resources shift. The agent receives partial observability via a local
    wall view and relative positions to targets.

    Observation Space (41 dims, float32):
        - [0:2]:   Agent position (row, col) normalized to [0, 1]
        - [2:27]:  5x5 local wall view (binary, out-of-bounds padded with 1s), flattened
        - [27:29]: Relative goal position (dr/9, dc/9) clipped to [-1, 1]
        - [29:35]: Relative positions of up to 3 nearest resources (6 dims)
        - [35:38]: Binary mask for resource existence (3 dims)
        - [38]:    Collected-resources count normalized, clipped to 1.0
                   (resources respawn after collection, so the raw count is
                   unbounded; without the clip this entry would exceed the
                   declared Box high of 1.0)
        - [39]:    Episode progress (step / MAX_STEPS)
        - [40]:    Stagnation flag (1.0 if stuck last step, else 0.0)

    Action Space (Discrete 4):
        0: UP, 1: DOWN, 2: LEFT, 3: RIGHT

    Reward Structure:
        - Goal reach: +10.0
        - Resource collection: +1.0
        - Step penalty: -0.1
        - Wall collision: -5.0
        - Anti-hacking penalties: Oscillation (-0.5), Stagnation (-0.3)
    """

    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 10}

    GRID_SIZE: int = 10
    MAX_STEPS: int = 500
    NUM_RESOURCES: int = 3
    WALL_TOGGLE_PROB: float = 0.3
    VIEW_SIZE: int = 5  # Must be odd so the view centers on the agent
    INITIAL_WALL_DENSITY: float = 0.2

    REWARD_GOAL: float = 10.0
    REWARD_RESOURCE: float = 1.0
    REWARD_STEP: float = -0.1
    REWARD_WALL_HIT: float = -5.0
    REWARD_OSCILLATION: float = -0.5
    REWARD_STAGNATION: float = -0.3
    STAGNATION_THRESHOLD: int = 3

    def __init__(self, render_mode: Optional[str] = None):
        """Build the spaces and placeholder state; real state is set in reset()."""
        super().__init__()
        self.render_mode = render_mode
        self.window = None
        self.clock = None
        # Observation space: 41 dimensions as described in the class docstring.
        obs_shape = self._get_observation_shape()
        self.observation_space = gym.spaces.Box(
            low=-1.0, high=1.0, shape=(obs_shape,), dtype=np.float32
        )
        self.action_space = gym.spaces.Discrete(4)
        # State variables (initialized properly in reset)
        self.agent_pos: np.ndarray = np.zeros(2, dtype=np.int32)
        self.goal_pos: np.ndarray = np.zeros(2, dtype=np.int32)
        self.walls: np.ndarray = np.zeros((self.GRID_SIZE, self.GRID_SIZE), dtype=np.bool_)
        self.resources: list = []
        self.collected_count: int = 0
        self.step_count: int = 0
        # Anti-hacking tracking
        self.prev_pos: Optional[np.ndarray] = None
        self.stagnation_counter: int = 0
        self.last_move_direction: Optional[int] = None
        self._pos_history: list = []

    def _get_observation_shape(self) -> int:
        """Calculate observation dimensionality (must match _get_obs layout)."""
        pos_dims = 2
        wall_view_dims = self.VIEW_SIZE * self.VIEW_SIZE
        goal_dims = 2
        resource_dims = self.NUM_RESOURCES * 2
        resource_mask_dims = self.NUM_RESOURCES
        misc_dims = 3  # collected count, progress, stagnation flag
        return pos_dims + wall_view_dims + goal_dims + resource_dims + resource_mask_dims + misc_dims

    def reset(
        self,
        *,
        seed: Optional[int] = None,
        options: Optional[Dict[str, Any]] = None
    ) -> Tuple[np.ndarray, Dict[str, Any]]:
        """Start a new episode: agent at bottom-left, fresh walls/goal/resources.

        Returns:
            (observation, info) per the Gymnasium API.
        """
        super().reset(seed=seed)
        # Bottom-left corner in (row, col) coordinates is (GRID_SIZE-1, 0).
        self.agent_pos = np.array([self.GRID_SIZE - 1, 0], dtype=np.int32)
        self.prev_pos = None
        self.stagnation_counter = 0
        self.last_move_direction = None
        self.step_count = 0
        self.collected_count = 0
        self._pos_history = []
        # Random initial walls; make sure the agent's start cell is free.
        self.walls = self.np_random.random((self.GRID_SIZE, self.GRID_SIZE)) < self.INITIAL_WALL_DENSITY
        self.walls[self.agent_pos[0], self.agent_pos[1]] = False
        # Goal on any free cell other than the agent's.
        self.goal_pos = self._sample_free_cell(exclude=[tuple(self.agent_pos)])
        # Resources on free cells distinct from agent, goal, and each other.
        self.resources = []
        for _ in range(self.NUM_RESOURCES):
            pos = self._sample_free_cell(
                exclude=[tuple(self.agent_pos), tuple(self.goal_pos)] + self.resources
            )
            self.resources.append(pos)
        obs = self._get_obs()
        info = {"episode_step": 0, "collected": 0}
        return obs, info

    def step(self, action: int) -> Tuple[np.ndarray, float, bool, bool, Dict[str, Any]]:
        """Apply one action, then morph the world.

        Returns:
            (obs, reward, terminated, truncated, info); info carries a
            per-component reward breakdown under "reward_components".
        """
        assert self.action_space.contains(action), f"Invalid action {action}"
        # Decode action into a (row, col) delta.
        dr, dc = 0, 0
        if action == 0:    # UP
            dr = -1
        elif action == 1:  # DOWN
            dr = 1
        elif action == 2:  # LEFT
            dc = -1
        elif action == 3:  # RIGHT
            dc = 1
        intended_pos = self.agent_pos + np.array([dr, dc], dtype=np.int32)
        # Blocked moves (out of bounds or into a wall) leave the agent in place.
        hit_wall = False
        if (intended_pos[0] < 0 or intended_pos[0] >= self.GRID_SIZE or
                intended_pos[1] < 0 or intended_pos[1] >= self.GRID_SIZE or
                self.walls[intended_pos[0], intended_pos[1]]):
            hit_wall = True
            new_pos = self.agent_pos.copy()
        else:
            new_pos = intended_pos
        self.prev_pos = self.agent_pos.copy()
        self.agent_pos = new_pos
        # Stagnation: position did not change this step.
        if np.array_equal(self.agent_pos, self.prev_pos):
            self.stagnation_counter += 1
        else:
            self.stagnation_counter = 0
        # Resource collection: each collected resource immediately respawns
        # elsewhere, so collected_count is unbounded over an episode.
        resource_reward = 0.0
        new_resources = []
        for r_pos in self.resources:
            if np.array_equal(self.agent_pos, np.array(r_pos)):
                resource_reward += self.REWARD_RESOURCE
                self.collected_count += 1
                exclude = [tuple(self.agent_pos), tuple(self.goal_pos)] + new_resources
                new_pos_res = self._sample_free_cell(exclude=exclude)
                new_resources.append(new_pos_res)
            else:
                new_resources.append(r_pos)
        self.resources = new_resources
        # Goal reach terminates the episode.
        terminated = False
        goal_reward = 0.0
        if np.array_equal(self.agent_pos, self.goal_pos):
            goal_reward = self.REWARD_GOAL
            terminated = True
        # Morph AFTER scoring, so rewards reflect the pre-morph layout.
        self._morph_environment()
        # Oscillation penalty: agent returned to where it stood two steps ago
        # (A -> B -> A). _pos_history[-1] holds the position from two steps
        # back because prev_pos is appended only after this check.
        osc_penalty = 0.0
        if self.prev_pos is not None and len(self._pos_history) >= 1:
            if np.array_equal(self.agent_pos, self._pos_history[-1]):
                osc_penalty = self.REWARD_OSCILLATION
        self._pos_history.append(self.prev_pos.copy())
        if len(self._pos_history) > 2:
            self._pos_history.pop(0)
        # Stagnation penalty after STAGNATION_THRESHOLD consecutive stuck steps.
        stag_penalty = 0.0
        if self.stagnation_counter >= self.STAGNATION_THRESHOLD:
            stag_penalty = self.REWARD_STAGNATION
        step_penalty = self.REWARD_STEP
        wall_penalty = self.REWARD_WALL_HIT if hit_wall else 0.0
        total_reward = (
            goal_reward + resource_reward + step_penalty +
            wall_penalty + osc_penalty + stag_penalty
        )
        # Clip the composite reward into [-10, 10].
        total_reward = float(np.clip(total_reward, -10.0, 10.0))
        self.step_count += 1
        truncated = self.step_count >= self.MAX_STEPS
        obs = self._get_obs()
        info = {
            "episode_step": self.step_count,
            "collected": self.collected_count,
            "hit_wall": hit_wall,
            "reward_components": {
                "goal": goal_reward,
                "resource": resource_reward,
                "step_penalty": step_penalty,
                "wall_collision": wall_penalty,
                "oscillation_penalty": osc_penalty,
                "stagnation_penalty": stag_penalty,
                "total": total_reward
            }
        }
        return obs, total_reward, terminated, truncated, info

    def _morph_environment(self) -> None:
        """Apply stochastic morphing: walls toggle, goal and resources relocate."""
        # Toggle each wall cell independently with WALL_TOGGLE_PROB.
        toggle_mask = self.np_random.random((self.GRID_SIZE, self.GRID_SIZE)) < self.WALL_TOGGLE_PROB
        self.walls ^= toggle_mask
        # The agent's and (current) goal's cells must stay traversable.
        self.walls[self.agent_pos[0], self.agent_pos[1]] = False
        self.walls[self.goal_pos[0], self.goal_pos[1]] = False
        # Teleport the goal to a fresh free cell (never onto the agent).
        self.goal_pos = np.array(
            self._sample_free_cell(exclude=[tuple(self.agent_pos)]),
            dtype=np.int32
        )
        # Re-sample all resource positions from scratch.
        new_resources = []
        for _ in range(self.NUM_RESOURCES):
            exclude = [tuple(self.agent_pos), tuple(self.goal_pos)] + new_resources
            new_pos = self._sample_free_cell(exclude=exclude)
            new_resources.append(new_pos)
        self.resources = new_resources

    def _sample_free_cell(self, exclude: list) -> Tuple[int, int]:
        """Sample a random free (non-wall) cell not in the exclude list.

        Falls back to a deterministic scan if rejection sampling fails, and
        to (0, 0) as a last resort (only reachable if the grid is fully
        walled/excluded — note (0, 0) may then be a wall cell).
        """
        max_attempts = 1000
        for _ in range(max_attempts):
            r = self.np_random.integers(0, self.GRID_SIZE)
            c = self.np_random.integers(0, self.GRID_SIZE)
            if not self.walls[r, c] and (r, c) not in exclude:
                return (r, c)
        # Fallback: deterministic scan for the first eligible cell.
        for r in range(self.GRID_SIZE):
            for c in range(self.GRID_SIZE):
                if not self.walls[r, c] and (r, c) not in exclude:
                    return (r, c)
        return (0, 0)  # Should never be reached in valid configurations

    def _get_obs(self) -> np.ndarray:
        """Construct the 41-dim observation vector (layout in class docstring)."""
        obs_parts = []
        # 1. Agent position normalized to [0, 1].
        agent_norm = self.agent_pos.astype(np.float32) / (self.GRID_SIZE - 1)
        obs_parts.append(agent_norm)
        # 2. Local wall view (VIEW_SIZE x VIEW_SIZE) centered on the agent;
        #    cells outside the grid read as walls (1.0).
        half_view = self.VIEW_SIZE // 2
        wall_view = np.ones((self.VIEW_SIZE, self.VIEW_SIZE), dtype=np.float32)
        for i in range(self.VIEW_SIZE):
            for j in range(self.VIEW_SIZE):
                grid_r = self.agent_pos[0] - half_view + i
                grid_c = self.agent_pos[1] - half_view + j
                if 0 <= grid_r < self.GRID_SIZE and 0 <= grid_c < self.GRID_SIZE:
                    wall_view[i, j] = float(self.walls[grid_r, grid_c])
        obs_parts.append(wall_view.flatten())
        # 3. Relative goal position normalized to [-1, 1].
        goal_rel = (self.goal_pos - self.agent_pos).astype(np.float32) / (self.GRID_SIZE - 1)
        goal_rel = np.clip(goal_rel, -1.0, 1.0)
        obs_parts.append(goal_rel)
        # 4. Nearest resources: relative positions sorted by Manhattan
        #    distance, plus an existence mask.
        resource_rels = []
        distances = []
        for r_pos in self.resources:
            rel = (np.array(r_pos) - self.agent_pos).astype(np.float32) / (self.GRID_SIZE - 1)
            dist = np.abs(rel[0]) + np.abs(rel[1])  # Manhattan distance proxy
            resource_rels.append(rel)
            distances.append(dist)
        if len(distances) > 0:
            sorted_indices = np.argsort(distances)[:self.NUM_RESOURCES]
        else:
            sorted_indices = []
        resource_obs = np.zeros(self.NUM_RESOURCES * 2, dtype=np.float32)
        resource_mask = np.zeros(self.NUM_RESOURCES, dtype=np.float32)
        for idx, res_idx in enumerate(sorted_indices):
            if idx < self.NUM_RESOURCES:
                resource_obs[idx*2:(idx+1)*2] = resource_rels[res_idx]
                resource_mask[idx] = 1.0
        obs_parts.append(resource_obs)
        obs_parts.append(resource_mask)
        # 5. Collected count normalized. BUGFIX: resources respawn after
        #    collection, so collected_count can exceed NUM_RESOURCES; clip to
        #    1.0 so the observation stays inside the declared Box bounds.
        collected_norm = np.array(
            [min(1.0, float(self.collected_count) / max(1, self.NUM_RESOURCES))],
            dtype=np.float32,
        )
        obs_parts.append(collected_norm)
        # 6. Episode progress in [0, 1].
        progress_norm = np.array([float(self.step_count) / self.MAX_STEPS], dtype=np.float32)
        obs_parts.append(progress_norm)
        # 7. Stagnation flag (stuck on the previous step).
        stagnation_flag = np.array([1.0 if self.stagnation_counter > 0 else 0.0], dtype=np.float32)
        obs_parts.append(stagnation_flag)
        obs = np.concatenate(obs_parts).astype(np.float32)
        # Defensive shape guard (should never trigger if layout matches
        # _get_observation_shape).
        expected_shape = self._get_observation_shape()
        if obs.shape[0] != expected_shape:
            if obs.shape[0] < expected_shape:
                obs = np.pad(obs, (0, expected_shape - obs.shape[0]), mode='constant')
            else:
                obs = obs[:expected_shape]
        return obs

    def render(self) -> Optional[np.ndarray]:
        """Render as an RGB array ("rgb_array") or print to console ("human")."""
        if self.render_mode == "rgb_array":
            # Simple flat-color raster: 20x20 pixels per cell.
            cell_size = 20
            rgb = np.zeros((self.GRID_SIZE * cell_size, self.GRID_SIZE * cell_size, 3), dtype=np.uint8)
            for r in range(self.GRID_SIZE):
                for c in range(self.GRID_SIZE):
                    color = [255, 255, 255]  # White background
                    if self.walls[r, c]:
                        color = [0, 0, 0]  # Black wall
                    elif np.array_equal([r, c], self.agent_pos):
                        color = [0, 0, 255]  # Blue agent
                    elif np.array_equal([r, c], self.goal_pos):
                        color = [0, 255, 0]  # Green goal
                    elif any(np.array_equal([r, c], np.array(res)) for res in self.resources):
                        color = [255, 215, 0]  # Gold resource
                    rgb[r*cell_size:(r+1)*cell_size, c*cell_size:(c+1)*cell_size] = color
            return rgb
        elif self.render_mode == "human":
            # Console rendering; later writes win (walls over resources,
            # goal/agent over everything).
            grid = [['.' for _ in range(self.GRID_SIZE)] for _ in range(self.GRID_SIZE)]
            for r, c in self.resources:
                grid[r][c] = 'R'
            if self.walls.any():
                for r in range(self.GRID_SIZE):
                    for c in range(self.GRID_SIZE):
                        if self.walls[r, c]:
                            grid[r][c] = '#'
            grid[self.goal_pos[0]][self.goal_pos[1]] = 'G'
            grid[self.agent_pos[0]][self.agent_pos[1]] = 'A'
            print("\n".join([" ".join(row) for row in grid]))
        return None

    def close(self) -> None:
        """Clean up any resources (no-op: nothing persistent is held)."""
        pass