Back to Catalog
VariableMorphingComplexityEnvironment
A 12x12 morphing maze with parametric volatility (10-30% wall toggle), variable goal relocation (50-200 steps), and dynamic resource counts (2-5). Features a 75-dimensional observation space combining external state, experience storage, and enhanced self-observation metrics for studying adaptation to continuous environmental changes. Tests whether self-observing agents outperform external-only agents under varying morphing intensities.
Domain
navigation
Difficulty
hard
Observation
Box(shape=[75])
Action
Discrete(4)
Reward
dense_with_difficulty_scaling
Max Steps
1000
Version
v1
Tests (2/8)
syntax, import, reset, step, obs_space, action_space, reward_sanity, determinism
Use via API
import kualia
env = kualia.make("variablemorphingcomplexityenvironment")
obs, info = env.reset()
Environment Code
7194 chars
import gymnasium as gym
import numpy as np
from typing import Tuple, Dict, Any, Optional
from collections import deque
class ParametricMorphingMazeEnv(gym.Env):
    """
    Parametric Morphing Maze Environment.

    A 12x12 grid navigation task with continuous structural changes after every
    action. Designed to test whether agents with self-observation capabilities
    (behavioral tracking, meta-cognitive metrics) adapt better to varying levels
    of environmental volatility compared to agents with only external state
    observations.

    Observation Space (75 dims):
        - 0-45: External state (agent pos, goal pos, 6x6 local view, distance,
          progress, goal direction, morphing rate, steps since relocation)
        - 46-64: Experience storage (last 12 actions, last 6 rewards,
          cumulative reward)
        - 65-74: Enhanced self-observation (distance trend, collision rate,
          efficiency, diversity, momentum, oscillation, adaptation metrics,
          predictability)

    Action Space: Discrete(4) - UP=0, DOWN=1, LEFT=2, RIGHT=3
    Reward: Dense with difficulty scaling, loop detection penalty, and
        stagnation penalty.
    """

    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 10}

    # Grid / observation geometry.
    GRID_SIZE: int = 12
    # NOTE(review): the catalog listing advertises 1000 max steps but the code
    # caps episodes at 600 — confirm which value is authoritative.
    MAX_STEPS: int = 600
    OBS_DIM: int = 75
    LOCAL_VIEW_SIZE: int = 6

    # Action mappings: UP, DOWN, LEFT, RIGHT.
    # Row/column deltas indexed by action id (0=UP, 1=DOWN, 2=LEFT, 3=RIGHT).
    DR: np.ndarray = np.array([-1, 1, 0, 0], dtype=np.int32)
    DC: np.ndarray = np.array([0, 0, -1, 1], dtype=np.int32)

    # Reward constants
    # (applied in step(); that method continues beyond this view, so the exact
    # shaping formula should be read there.)
    R_GOAL_BASE: float = 15.0          # base payoff for reaching the goal
    R_STEP: float = -0.05              # per-step time cost
    R_WALL: float = -0.8               # collision with a wall or the boundary
    R_PROGRESS: float = 0.5            # shaping for reducing goal distance
    R_RESOURCE: float = 1.5            # bonus for collecting a resource
    R_LOOP: float = -0.3               # loop-detection penalty
    R_STAGNATION: float = -0.2         # stagnation penalty
    MIN_REWARD_THRESHOLD: float = -50.0  # presumably a floor on cumulative reward — confirm in step()
def __init__(
self,
render_mode: Optional[str] = None,
morphing_rate: float = 0.2,
goal_relocate_interval: int = 100,
num_resources: int = 3
) -> None:
super().__init__()
self.render_mode = render_mode
self.morphing_rate = float(np.clip(morphing_rate, 0.1, 0.3))
self.goal_relocate_interval = int(np.clip(goal_relocate_interval, 50, 200))
self.num_resources = int(np.clip(num_resources, 2, 5))
self.observation_space = gym.spaces.Box(
low=-1.0, high=1.0, shape=(self.OBS_DIM,), dtype=np.float32
)
self.action_space = gym.spaces.Discrete(4)
# State variables (initialized in reset)
self.agent_pos: Optional[np.ndarray] = None
self.goal_pos: Optional[np.ndarray] = None
self.walls: Optional[np.ndarray] = None
self.resources: set = set()
self.steps: int = 0
self.cumulative_reward: float = 0.0
self.prev_dist: float = 0.0
# Experience storage
self.action_history: deque = deque(maxlen=12)
self.reward_history: deque = deque(maxlen=6)
# Self-observation tracking
self.collision_count: int = 0
self.positions_history: deque = deque(maxlen=20)
self.distance_history: deque = deque(maxlen=10)
self.reward_trend: deque = deque(maxlen=10)
self.action_counts: np.ndarray = np.zeros(4, dtype=np.float32)
self.steps_since_relocation: int = 0
self.goal_relocation_count: int = 0
def reset(
self,
*,
seed: Optional[int] = None,
options: Optional[Dict[str, Any]] = None
) -> Tuple[np.ndarray, Dict[str, Any]]:
super().reset(seed=seed)
self.steps = 0
self.cumulative_reward = 0.0
self.collision_count = 0
self.steps_since_relocation = 0
self.goal_relocation_count = 0
# Initialize maze walls (random 20% density)
self.walls = np.zeros((self.GRID_SIZE, self.GRID_SIZE), dtype=bool)
for r in range(self.GRID_SIZE):
for c in range(self.GRID_SIZE):
if self.np_random.random() < 0.2:
self.walls[r, c] = True
# Initialize agent and goal positions
self.agent_pos = self._random_free_cell()
self.goal_pos = self._random_free_cell()
# Initialize resources
self.resources = set()
for _ in range(self.num_resources):
pos = self._random_free_cell()
self.resources.add(tuple(pos.tolist()))
# Clear histories
self.action_history.clear()
self.reward_history.clear()
self.positions_history.clear()
self.distance_history.clear()
self.reward_trend.clear()
self.action_counts = np.zeros(4, dtype=np.float32)
self.prev_dist = self._manhattan_distance(self.agent_pos, self.goal_pos)
obs = self._get_obs()
info: Dict[str, Any] = {}
return obs, info
def step(self, action: int) -> Tuple[np.ndarray, float, bool, bool, Dict[str, Any]]:
action = int(np.clip(action, 0, 3))
self.steps += 1
self.steps_since_relocation += 1
self.action_counts[action] += 1.0
# Track position for stagnation/loop detection
self.positions_history.append(tuple(self.agent_pos.tolist()))
# Morph maze: toggle walls with probability morphing_rate
morphing_changes = 0
for r in range(self.GRID_SIZE):
for c in range(self.GRID_SIZE):
# Don't morph agent or goal position
if (r == self.agent_pos[0] and c == self.agent_pos[1]) or \
(r == self.goal_pos[0] and c == self.goal_pos[1]):
continue
if self.np_random.random() < self.morphing_rate:
self.walls[r, c] = not self.walls[r, c]
morphing_changes += 1
# Remove resource if wall placed on it
if self.walls[r, c] and (r, c) in self.resources:
self.resources.remove((r, c))
# Relocate goal if interval reached
goal_relocated = False
if self.steps_since_relocation >= self.goal_relocate_interval:
self.goal_pos = self._random_free_cell()
self.steps_since_relocation = 0
self.goal_relocation_count += 1
goal_relocated = True
# Execute action
new_r = int(self.agent_pos[0] + self.DR[action])
new_c = int(self.agent_pos[1] + self.DC[action])
collision = False
if 0 <= new_r < self.GRID_SIZE and 0 <= new_c < self.GRID_SIZE:
if not self.walls[new_r, new_c]:
self.agent_pos = np.array([new_r, new_c], dtype=np.int32)
else:
collision = True
else:
collision = True
if collision:
self.collision_count += 1
# Check resource collection
resource_collected = False
pos_tuple = tuple(self.agent_pos.tolist())
if pos_tuple in self.resources:
self.resources.remove(pos_tuple)
resource_collected = True