Back to Catalog

MorphingMazeAdaptiveIntelligence

A 10x10 grid maze with continuous morphing dynamics where walls toggle probabilistically after every action, the goal relocates periodically, and resources respawn. The 63-dimensional observation space includes external state (position, local wall view), experience storage (action/reward history), self-observation metrics (distance trends, collision rates, exploration efficiency), and meta-awareness signals (environment change magnitude, pattern familiarity). Designed to test whether self-observational capabilities improve adaptation speed in non-stationary environments.

Domain

navigation

Difficulty

hard

Observation

Box(shape=[63])

Action

Discrete(shape=[4])

Reward

dense

Max Steps

1000

Version

v1

Tests (8/8)

syntaximportresetstepobs_spaceaction_spacereward_sanitydeterminism

Use via API

import kualia env = kualia.make("morphingmazeadaptiveintelligence") obs, info = env.reset()

Environment Code

20109 chars
import gymnasium as gym
import numpy as np
from typing import Tuple, Dict, Any, Optional


class MorphingMazeSelfObsEnv(gym.Env):
    """
    Morphing Maze with Self-Observation.
    
    A 10x10 grid where walls stochastically toggle, the goal relocates,
    and resources shift — after EVERY action. The agent must navigate to
    the goal while adapting to continuous structural changes.
    
    Observation Space (63 dims): External state (33), Experience storage (14),
    Self-observation metrics (12), Meta-awareness (4).
    
    Reward-hacking countermeasures: Anti-oscillation penalty and anti-stagnation
    penalty discourage repetitive non-progressive behaviors.
    """
    
    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 10}

    GRID_SIZE: int = 10
    MAX_STEPS: int = 500
    NUM_RESOURCES: int = 3
    WALL_TOGGLE_PROB: float = 0.15
    VIEW_SIZE: int = 5
    INITIAL_WALL_DENSITY: float = 0.2
    GOAL_RELOCATE_MIN: int = 80
    GOAL_RELOCATE_MAX: int = 150
    HISTORY_LEN: int = 8
    SHORT_HISTORY: int = 5
    OBS_DIM: int = 63

    # Rewards
    R_GOAL: float = 10.0
    R_STEP: float = -0.1
    R_WALL: float = -0.5
    R_PROGRESS: float = 0.3
    R_RESOURCE: float = 1.0
    R_OSCILLATION: float = -0.2
    R_STAGNATION: float = -0.05

    # Directions: UP, DOWN, LEFT, RIGHT
    DR = np.array([-1, 1, 0, 0], dtype=np.int32)
    DC = np.array([0, 0, -1, 1], dtype=np.int32)

    def __init__(self, render_mode: Optional[str] = None) -> None:
        super().__init__()
        self.render_mode = render_mode

        self.action_space = gym.spaces.Discrete(4)
        self.observation_space = gym.spaces.Box(
            low=-1.0, high=1.0, shape=(self.OBS_DIM,), dtype=np.float32
        )

        # Core state
        self.agent_pos: np.ndarray = np.zeros(2, dtype=np.int32)
        self.goal_pos: np.ndarray = np.zeros(2, dtype=np.int32)
        self.walls: np.ndarray = np.zeros((self.GRID_SIZE, self.GRID_SIZE), dtype=np.bool_)
        self.prev_walls: Optional[np.ndarray] = None
        self.resources: list[Tuple[int, int]] = []
        
        # Episode counters
        self.step_count: int = 0
        self.goal_relocate_timer: int = 0
        self.goal_relocate_interval: int = 100
        
        # History buffers
        self.action_history: np.ndarray = np.full(self.HISTORY_LEN, -1.0, dtype=np.float32)
        self.reward_history: np.ndarray = np.zeros(self.SHORT_HISTORY, dtype=np.float32)
        self.distance_history: np.ndarray = np.zeros(self.HISTORY_LEN, dtype=np.float32)
        self.wall_hit_history: np.ndarray = np.zeros(self.HISTORY_LEN, dtype=np.bool_)
        self.position_history: list[Tuple[int, int]] = []
        self.local_view_history: list[bytes] = []
        self.visited_cells: set[Tuple[int, int]] = set()
        
        # Metrics for self-observation
        self.cumulative_reward: float = 0.0
        self.consecutive_stuck_steps: int = 0
        self.pattern_reward_map: Dict[bytes, float] = {}
        self.steps_since_collision: int = 0
        
    def reset(
        self, *, seed: Optional[int] = None, options: Optional[Dict[str, Any]] = None
    ) -> Tuple[np.ndarray, Dict[str, Any]]:
        super().reset(seed=seed)
        
        # Initialize walls with random density
        self.walls = self.np_random.random((self.GRID_SIZE, self.GRID_SIZE)) < self.INITIAL_WALL_DENSITY
        
        # Agent starts bottom-left
        self.agent_pos = np.array([self.GRID_SIZE - 1, 0], dtype=np.int32)
        self.walls[self.agent_pos[0], self.agent_pos[1]] = False
        
        # Goal at random free cell
        self.goal_pos = np.array(self._sample_free([tuple(self.agent_pos)]), dtype=np.int32)
        self.walls[self.goal_pos[0], self.goal_pos[1]] = False
        
        # Initialize resources
        self.resources = []
        exclude = [tuple(self.agent_pos), tuple(self.goal_pos)]
        for _ in range(self.NUM_RESOURCES):
            pos = self._sample_free(exclude + self.resources)
            self.resources.append(pos)
            
        # Reset timers
        self.step_count = 0
        self.goal_relocate_timer = 0
        self.goal_relocate_interval = int(self.np_random.integers(
            self.GOAL_RELOCATE_MIN, self.GOAL_RELOCATE_MAX
        ))
        
        # Reset histories
        self.action_history = np.full(self.HISTORY_LEN, -1.0, dtype=np.float32)
        self.reward_history = np.zeros(self.SHORT_HISTORY, dtype=np.float32)
        self.distance_history = np.full(
            self.HISTORY_LEN, 
            self._manhattan() / (2 * self.GRID_SIZE), 
            dtype=np.float32
        )
        self.wall_hit_history = np.zeros(self.HISTORY_LEN, dtype=np.bool_)
        self.position_history = [tuple(self.agent_pos)]
        self.local_view_history = []
        self.visited_cells = {tuple(self.agent_pos)}
        self.prev_walls = self.walls.copy()
        self.cumulative_reward = 0.0
        self.consecutive_stuck_steps = 0
        self.steps_since_collision = 0
        self.pattern_reward_map = {}
        
        return self._get_obs(), {"step": 0}

    def step(self, action: int) -> Tuple[np.ndarray, float, bool, bool, Dict[str, Any]]:
        assert self.action_space.contains(action)
        
        # Store pre-step state
        prev_pos = self.agent_pos.copy()
        prev_dist = self._manhattan()
        
        # Update action history
        self.action_history = np.roll(self.action_history, -1)
        self.action_history[-1] = (action / 1.5) - 1.0  # Normalize [0,3] -> [-1, 1]
        
        # Execute movement
        nr = self.agent_pos[0] + self.DR[action]
        nc = self.agent_pos[1] + self.DC[action]
        
        hit_wall = False
        if nr < 0 or nr >= self.GRID_SIZE or nc < 0 or nc >= self.GRID_SIZE or self.walls[nr, nc]:
            hit_wall = True
            self.consecutive_stuck_steps += 1
            self.steps_since_collision = 0
        else:
            self.agent_pos[0] = nr
            self.agent_pos[1] = nc
            self.consecutive_stuck_steps = 0
            self.steps_since_collision += 1
            
        # Update wall hit history
        self.wall_hit_history = np.roll(self.wall_hit_history, -1)
        self.wall_hit_history[-1] = hit_wall
        
        # Update position and visitation tracking
        self.position_history.append(tuple(self.agent_pos))
        if len(self.position_history) > self.HISTORY_LEN * 2:
            self.position_history = self.position_history[-self.HISTORY_LEN * 2:]
        self.visited_cells.add(tuple(self.agent_pos))
        
        # Resource collection
        resource_reward = 0.0
        new_resources = []
        for rpos in self.resources:
            if tuple(self.agent_pos) == rpos:
                resource_reward += self.R_RESOURCE
                # Respawn resource
                exclude = [tuple(self.agent_pos), tuple(self.goal_pos)] + new_resources
                new_resources.append(self._sample_free(exclude))
            else:
                new_resources.append(rpos)
        self.resources = new_resources
        
        # Check goal termination
        terminated = bool(np.array_equal(self.agent_pos, self.goal_pos))
        goal_reward = self.R_GOAL if terminated else 0.0
        
        # Morph environment (continuous dynamics)
        self.prev_walls = self.walls.copy()
        self._morph()
        
        # Calculate rewards
        new_dist = self._manhattan()
        progress = (prev_dist - new_dist) / (2 * self.GRID_SIZE)
        progress_reward = self.R_PROGRESS * max(0.0, progress)
        
        wall_penalty = self.R_WALL if hit_wall else 0.0
        
        # Reward-hacking countermeasures
        oscillation_penalty = 0.0
        if self._detect_oscillation():
            oscillation_penalty = self.R_OSCILLATION
            
        stagnation_penalty = 0.0
        if self.consecutive_stuck_steps > 2:
            stagnation_penalty = self.R_STAGNATION * (self.consecutive_stuck_steps - 2)
        
        total_reward = (
            goal_reward + resource_reward + self.R_STEP + 
            progress_reward + wall_penalty + oscillation_penalty + stagnation_penalty
        )
        total_reward = float(np.clip(total_reward, -10.0, 10.0))
        
        # Update histories
        self.reward_history = np.roll(self.reward_history, -1)
        self.reward_history[-1] = total_reward
        self.cumulative_reward += total_reward
        
        self.distance_history = np.roll(self.distance_history, -1)
        self.distance_history[-1] = new_dist / (2 * self.GRID_SIZE)
        
        # Update local view history for meta-awareness
        current_view = self._local_view().tobytes()
        self.local_view_history.append(current_view)
        if len(self.local_view_history) > 20:
            self.local_view_history = self.local_view_history[-20:]
            
        # Update pattern-reward mapping for strategy-environment match
        if current_view in self.pattern_reward_map:
            # Exponential moving average
            self.pattern_reward_map[current_view] = (
                0.7 * self.pattern_reward_map[current_view] + 0.3 * total_reward
            )
        else:
            self.pattern_reward_map[current_view] = total_reward
        
        self.step_count += 1
        self.goal_relocate_timer += 1
        
        # Check if we should relocate goal
        if self.goal_relocate_timer >= self.goal_relocate_interval:
            self.goal_relocate_timer = 0
            self.goal_relocate_interval = int(self.np_random.integers(
                self.GOAL_RELOCATE_MIN, self.GOAL_RELOCATE_MAX
            ))
            exclude = [tuple(self.agent_pos)] + self.resources
            self.goal_pos = np.array(self._sample_free(exclude), dtype=np.int32)
        
        truncated = self.step_count >= self.MAX_STEPS
        
        info = {
            "step": self.step_count,
            "reward_components": {
                "goal": goal_reward,
                "resource": resource_reward,
                "step": self.R_STEP,
                "progress": progress_reward,
                "wall": wall_penalty,
                "oscillation": oscillation_penalty,
                "stagnation": stagnation_penalty
            }
        }
        
        return self._get_obs(), float(total_reward), bool(terminated), bool(truncated), info

    def _get_obs(self) -> np.ndarray:
        """Construct the 63-dimensional observation vector."""
        obs = np.zeros(self.OBS_DIM, dtype=np.float32)
        
        # External state (0-32)
        # Agent position (2) normalized to [-1, 1]
        obs[0] = (self.agent_pos[0] / (self.GRID_SIZE - 1)) * 2 - 1
        obs[1] = (self.agent_pos[1] / (self.GRID_SIZE - 1)) * 2 - 1
        
        # Goal position (2)
        obs[2] = (self.goal_pos[0] / (self.GRID_SIZE - 1)) * 2 - 1
        obs[3] = (self.goal_pos[1] / (self.GRID_SIZE - 1)) * 2 - 1
        
        # 5x5 local wall view (25) - flattened
        local_view = self._local_view()
        obs[4:29] = local_view.flatten().astype(np.float32) * 2 - 1  # [0,1] -> [-1,1]
        
        # Normalized Manhattan distance (1)
        max_dist = 2 * (self.GRID_SIZE - 1)
        obs[29] = self._manhattan() / max_dist
        
        # Episode progress (1)
        obs[30] = self.step_count / self.MAX_STEPS
        
        # Relative goal direction (2) - normalized vector
        dr = self.goal_pos[0] - self.agent_pos[0]
        dc = self.goal_pos[1] - self.agent_pos[1]
        norm = max(abs(dr), abs(dc), 1)
        obs[31] = dr / norm
        obs[32] = dc / norm
        
        # Experience storage (33-46)
        # Last 8 actions (8) - already normalized to [-1, 1]
        obs[33:41] = self.action_history
        
        # Last 5 rewards (5) - clip and normalize
        clipped_rewards = np.clip(self.reward_history / 10.0, -1.0, 1.0)
        obs[41:46] = clipped_rewards
        
        # Cumulative reward normalized (1)
        obs[46] = float(np.clip(self.cumulative_reward / 100.0, -1.0, 1.0))
        
        # Self-observation metrics (47-58)
        metrics = self._calculate_self_obs_metrics()
        obs[47:59] = metrics
        
        # Meta-awareness (59-62)
        meta = self._calculate_meta_awareness()
        obs[59:63] = meta
        
        return np.clip(obs, -1.0, 1.0)

    def _calculate_self_obs_metrics(self) -> np.ndarray:
        """Calculate 12 self-observation metrics."""
        metrics = np.zeros(12, dtype=np.float32)
        
        # 1. Distance trend slope (1) - simple linear regression on last 8 distances
        if self.step_count >= 2:
            x = np.arange(self.HISTORY_LEN)
            y = self.distance_history
            x_mean, y_mean = np.mean(x), np.mean(y)
            denom = np.sum((x - x_mean) ** 2)
            if denom > 1e-6:
                slope = np.sum((x - x_mean) * (y - y_mean)) / denom
                metrics[0] = np.clip(slope * 10, -1, 1)  # Scale up for sensitivity
        
        # 2. Wall collision rate (1)
        metrics[1] = np.mean(self.wall_hit_history) * 2 - 1
        
        # 3. Movement efficiency (1) - actual progress vs steps
        if self.step_count > 0:
            start_dist = self.distance_history[0] if self.distance_history[0] > 0 else 1.0
            current_dist = self.distance_history[-1]
            efficiency = (start_dist - current_dist) / (self.step_count / self.MAX_STEPS + 0.01)
            metrics[2] = np.clip(efficiency, -1, 1)
        
        # 4. Action diversity/entropy (1)
        if self.step_count >= self.HISTORY_LEN:
            unique_actions = len(np.unique(self.action_history))
            metrics[3] = (unique_actions / 4.0) * 2 - 1
        
        # 5. Reward momentum (1) - trend in recent rewards
        if self.step_count >= 2:
            recent = self.reward_history[-3:] if len(self.reward_history) >= 3 else self.reward_history
            if len(recent) >= 2:
                metrics[4] = np.clip(np.mean(np.diff(recent)) * 5, -1, 1)
        
        # 6. Oscillation detection (1) - binary encoded as -1 or 1
        metrics[5] = 1.0 if self._detect_oscillation() else -1.0
        
        # 7. Stagnation score (1)
        metrics[6] = np.clip(self.consecutive_stuck_steps / 5.0, 0, 1) * 2 - 1
        
        # 8. Progress rate (1) - normalized step count vs distance remaining
        if self._manhattan() > 0:
            progress_ratio = (self.step_count / self.MAX_STEPS) / (self._manhattan() / (2 * self.GRID_SIZE))
            metrics[7] = np.clip(1.0 - progress_ratio, -1, 1)
        
        # 9. Exploration coverage (1)
        total_cells = self.GRID_SIZE ** 2
        coverage = len(self.visited_cells) / total_cells
        metrics[8] = coverage * 2 - 1
        
        # 10. Strategy consistency (1) - variance in action history
        if self.step_count >= self.HISTORY_LEN:
            metrics[9] = np.clip(1.0 - np.std(self.action_history), -1, 1)
        
        # 11. Escape ability (1) - how quickly recovering from collisions
        if np.any(self.wall_hit_history):
            recent_hits = np.where(self.wall_hit_history)[0]
            if len(recent_hits) > 0 and self.steps_since_collision < 10:
                metrics[10] = 1.0 - (self.steps_since_collision / 10.0)
            else:
                metrics[10] = -1.0
        else:
            metrics[10] = 0.0
        
        # 12. Adaptation score (1) - improvement in reward over time
        if self.step_count >= self.SHORT_HISTORY:
            first_half = np.mean(self.reward_history[:self.SHORT_HISTORY//2])
            second_half = np.mean(self.reward_history[self.SHORT_HISTORY//2:])
            metrics[11] = np.clip((second_half - first_half) * 2, -1, 1)
        
        return metrics

    def _calculate_meta_awareness(self) -> np.ndarray:
        """Calculate 4 meta-awareness signals."""
        meta = np.zeros(4, dtype=np.float32)
        
        # 1. Environment change magnitude (1) - Hamming distance between wall grids
        if self.prev_walls is not None:
            changes = np.sum(self.walls != self.prev_walls)
            max_changes = self.GRID_SIZE ** 2
            meta[0] = (changes / max_changes) * 2 - 1
        
        # 2. Strategy-environment match (1) - avg reward for current pattern
        current_view = self._local_view().tobytes()
        if current_view in self.pattern_reward_map:
            avg_reward = self.pattern_reward_map[current_view]
            meta[1] = np.clip(avg_reward / 5.0, -1, 1)
        else:
            meta[1] = 0.0  # Unknown pattern
        
        # 3. Recovery speed after collision (1)
        if np.any(self.wall_hit_history):
            meta[2] = 1.0 - min(self.steps_since_collision / 10.0, 1.0)
        else:
            meta[2] = 1.0  # No recent collisions, full recovery
        
        # 4. Situation novelty (1) - how often we've seen this pattern
        if current_view in self.pattern_reward_map:
            # Less frequent = more novel
            meta[3] = -0.5  # Familiar
        else:
            meta[3] = 1.0  # Novel
        
        return meta

    def _manhattan(self) -> int:
        """Calculate Manhattan distance to goal."""
        return abs(self.agent_pos[0] - self.goal_pos[0]) + abs(self.agent_pos[1] - self.goal_pos[1])

    def _sample_free(self, exclude: list[Tuple[int, int]]) -> Tuple[int, int]:
        """Sample a random free cell not in exclude list."""
        for _ in range(1000):
            r = int(self.np_random.integers(0, self.GRID_SIZE))
            c = int(self.np_random.integers(0, self.GRID_SIZE))
            if not self.walls[r, c] and (r, c) not in exclude:
                return (r, c)
        # Fallback: find any free cell
        for r in range(self.GRID_SIZE):
            for c in range(self.GRID_SIZE):
                if not self.walls[r, c] and (r, c) not in exclude:
                    return (r, c)
        return (0, 0)  # Should never happen

    def _local_view(self) -> np.ndarray:
        """Extract 5x5 local view around agent. 1 = wall/out of bounds, 0 = free."""
        view = np.ones((self.VIEW_SIZE, self.VIEW_SIZE), dtype=np.bool_)
        half = self.VIEW_SIZE // 2
        
        for dr in range(-half, half + 1):
            for dc in range(-half, half + 1):
                r, c = self.agent_pos[0] + dr, self.agent_pos[1] + dc
                vr, vc = dr + half, dc + half
                if 0 <= r < self.GRID_SIZE and 0 <= c < self.GRID_SIZE:
                    view[vr, vc] = self.walls[r, c]
                else:
                    view[vr, vc] = True  # Out of bounds is wall
        
        return view

    def _morph(self) -> None:
        """Toggle walls probabilistically and relocate resources."""
        # Toggle walls
        toggle_mask = self.np_random.random((self.GRID_SIZE, self.GRID_SIZE)) < self.WALL_TOGGLE_PROB
        self.walls = np.logical_xor(self.walls, toggle_mask)
        
        # Ensure agent and goal positions remain free
        self.walls[self.agent_pos[0], self.agent_pos[1]] = False
        self.walls[self.goal_pos[0], self.goal_pos[1]] = False
        
        # Occasionally respawn resources
        if self.np_random.random() < 0.1:
            exclude = [tuple(self.agent_pos), tuple(self.goal_pos)]
            new_resources = []
            for rpos in self.resources:
                if self.np_random.random() < 0.3:  # 30% chance to move
                    new_pos = self._sample_free(exclude + new_resources)
                    new_resources.append(new_pos)
                else:
                    new_resources.append(rpos)
            self.resources = new_resources

    def _detect_oscillation(self) -> bool:
        """Detect A->B->A->B pattern in recent positions."""
        if len(self.position_history) < 4:
            return False
        recent = self.position_history[-4:]
        return recent[0] == recent[2] and recent[1] == recent[3] and recent[0] != recent[1]

    def close(self) -> None:
        pass