Back to Catalog

VariableMorphingComplexityEnvironment

A 12x12 morphing maze with parametric volatility (10-30% wall toggle), variable goal relocation (50-200 steps), and dynamic resource counts (2-5). Features a 75-dimensional observation space combining external state, experience storage, and enhanced self-observation metrics for studying adaptation to continuous environmental changes. Tests whether self-observing agents outperform external-only agents under varying morphing intensities.

Domain

navigation

Difficulty

hard

Observation

Box(shape=[75])

Action

Discrete(shape=[4])

Reward

dense_with_difficulty_scaling

Max Steps

1000

Version

v1

Tests (2/8)

syntaximportresetstepobs_spaceaction_spacereward_sanitydeterminism

Use via API

import kualia env = kualia.make("variablemorphingcomplexityenvironment") obs, info = env.reset()

Environment Code

7194 chars
import gymnasium as gym
import numpy as np
from typing import Tuple, Dict, Any, Optional
from collections import deque


class ParametricMorphingMazeEnv(gym.Env):
    """
    Parametric Morphing Maze Environment.
    
    A 12x12 grid navigation task with continuous structural changes after every action.
    Designed to test whether agents with self-observation capabilities (behavioral tracking,
    meta-cognitive metrics) adapt better to varying levels of environmental volatility
    compared to agents with only external state observations.
    
    Observation Space (75 dims):
    - 0-45: External state (agent pos, goal pos, 6x6 local view, distance, progress, 
             goal direction, morphing rate, steps since relocation)
    - 46-64: Experience storage (last 12 actions, last 6 rewards, cumulative reward)
    - 65-74: Enhanced self-observation (distance trend, collision rate, efficiency, 
             diversity, momentum, oscillation, adaptation metrics, predictability)
    
    Action Space: Discrete(4) - UP=0, DOWN=1, LEFT=2, RIGHT=3
    
    Reward: Dense with difficulty scaling, loop detection penalty, and stagnation penalty.
    """
    
    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 10}
    
    GRID_SIZE: int = 12
    MAX_STEPS: int = 600
    OBS_DIM: int = 75
    LOCAL_VIEW_SIZE: int = 6
    
    # Action mappings: UP, DOWN, LEFT, RIGHT
    DR: np.ndarray = np.array([-1, 1, 0, 0], dtype=np.int32)
    DC: np.ndarray = np.array([0, 0, -1, 1], dtype=np.int32)
    
    # Reward constants
    R_GOAL_BASE: float = 15.0
    R_STEP: float = -0.05
    R_WALL: float = -0.8
    R_PROGRESS: float = 0.5
    R_RESOURCE: float = 1.5
    R_LOOP: float = -0.3
    R_STAGNATION: float = -0.2
    MIN_REWARD_THRESHOLD: float = -50.0
    
    def __init__(
        self,
        render_mode: Optional[str] = None,
        morphing_rate: float = 0.2,
        goal_relocate_interval: int = 100,
        num_resources: int = 3
    ) -> None:
        super().__init__()
        
        self.render_mode = render_mode
        self.morphing_rate = float(np.clip(morphing_rate, 0.1, 0.3))
        self.goal_relocate_interval = int(np.clip(goal_relocate_interval, 50, 200))
        self.num_resources = int(np.clip(num_resources, 2, 5))
        
        self.observation_space = gym.spaces.Box(
            low=-1.0, high=1.0, shape=(self.OBS_DIM,), dtype=np.float32
        )
        self.action_space = gym.spaces.Discrete(4)
        
        # State variables (initialized in reset)
        self.agent_pos: Optional[np.ndarray] = None
        self.goal_pos: Optional[np.ndarray] = None
        self.walls: Optional[np.ndarray] = None
        self.resources: set = set()
        self.steps: int = 0
        self.cumulative_reward: float = 0.0
        self.prev_dist: float = 0.0
        
        # Experience storage
        self.action_history: deque = deque(maxlen=12)
        self.reward_history: deque = deque(maxlen=6)
        
        # Self-observation tracking
        self.collision_count: int = 0
        self.positions_history: deque = deque(maxlen=20)
        self.distance_history: deque = deque(maxlen=10)
        self.reward_trend: deque = deque(maxlen=10)
        self.action_counts: np.ndarray = np.zeros(4, dtype=np.float32)
        self.steps_since_relocation: int = 0
        self.goal_relocation_count: int = 0
        
    def reset(
        self,
        *,
        seed: Optional[int] = None,
        options: Optional[Dict[str, Any]] = None
    ) -> Tuple[np.ndarray, Dict[str, Any]]:
        super().reset(seed=seed)
        
        self.steps = 0
        self.cumulative_reward = 0.0
        self.collision_count = 0
        self.steps_since_relocation = 0
        self.goal_relocation_count = 0
        
        # Initialize maze walls (random 20% density)
        self.walls = np.zeros((self.GRID_SIZE, self.GRID_SIZE), dtype=bool)
        for r in range(self.GRID_SIZE):
            for c in range(self.GRID_SIZE):
                if self.np_random.random() < 0.2:
                    self.walls[r, c] = True
        
        # Initialize agent and goal positions
        self.agent_pos = self._random_free_cell()
        self.goal_pos = self._random_free_cell()
        
        # Initialize resources
        self.resources = set()
        for _ in range(self.num_resources):
            pos = self._random_free_cell()
            self.resources.add(tuple(pos.tolist()))
        
        # Clear histories
        self.action_history.clear()
        self.reward_history.clear()
        self.positions_history.clear()
        self.distance_history.clear()
        self.reward_trend.clear()
        self.action_counts = np.zeros(4, dtype=np.float32)
        
        self.prev_dist = self._manhattan_distance(self.agent_pos, self.goal_pos)
        
        obs = self._get_obs()
        info: Dict[str, Any] = {}
        
        return obs, info
    
    def step(self, action: int) -> Tuple[np.ndarray, float, bool, bool, Dict[str, Any]]:
        action = int(np.clip(action, 0, 3))
        
        self.steps += 1
        self.steps_since_relocation += 1
        self.action_counts[action] += 1.0
        
        # Track position for stagnation/loop detection
        self.positions_history.append(tuple(self.agent_pos.tolist()))
        
        # Morph maze: toggle walls with probability morphing_rate
        morphing_changes = 0
        for r in range(self.GRID_SIZE):
            for c in range(self.GRID_SIZE):
                # Don't morph agent or goal position
                if (r == self.agent_pos[0] and c == self.agent_pos[1]) or \
                   (r == self.goal_pos[0] and c == self.goal_pos[1]):
                    continue
                if self.np_random.random() < self.morphing_rate:
                    self.walls[r, c] = not self.walls[r, c]
                    morphing_changes += 1
                    # Remove resource if wall placed on it
                    if self.walls[r, c] and (r, c) in self.resources:
                        self.resources.remove((r, c))
        
        # Relocate goal if interval reached
        goal_relocated = False
        if self.steps_since_relocation >= self.goal_relocate_interval:
            self.goal_pos = self._random_free_cell()
            self.steps_since_relocation = 0
            self.goal_relocation_count += 1
            goal_relocated = True
        
        # Execute action
        new_r = int(self.agent_pos[0] + self.DR[action])
        new_c = int(self.agent_pos[1] + self.DC[action])
        
        collision = False
        if 0 <= new_r < self.GRID_SIZE and 0 <= new_c < self.GRID_SIZE:
            if not self.walls[new_r, new_c]:
                self.agent_pos = np.array([new_r, new_c], dtype=np.int32)
            else:
                collision = True
        else:
            collision = True
        
        if collision:
            self.collision_count += 1
        
        # Check resource collection
        resource_collected = False
        pos_tuple = tuple(self.agent_pos.tolist())
        if pos_tuple in self.resources:
            self.resources.remove(pos_tuple)
            resource_collected = True