Back to Catalog

DynamicMazeWithSelfObservation

RESEARCH HYPOTHESIS: In dynamically changing environments where the environment state transitions after each agent action, agents that incorporate self-observation data (including recent action sequences, reward histories, environment change patterns, and strategy effectiveness metrics) into their observation space will demonstrate superior adaptation performance compared to agents that only observe external environment state SUB-HYPOTHESES: H1: Agents with access to their own recent action history and corresponding rewards will adapt faster to dynamic environment changes than agents without this self-observation capability; H2: Agents that track environment change patterns (how the environment responds to their actions) will develop more robust strategies in dynamic settings; H3: Agents that monitor their own strategy effectiveness (progress toward goals over recent timesteps) will avoid repeating ineffective action sequences and converge to better policies USER'S ORIGINAL IDEA: dynamic reinforcement learning environment and adaptive agent: environment that dynamically change in training, so the agent inside should adapt the new situation every time, everything in the environment changes after each action. the hypothesis: when the environment is dynamic and changes after agent's each action, thus the agent must store the experience and must do self observation while taking actions and also must store the self observation experience and learn from these experiences in order to adapt the dynamically changing environment. Senin hipotezim 3 katmanlı: Environment dinamik değişmeli → ✅ B Agent deneyimi depolamalı (experience storage) → agent'ın "bu durumda şu oldu, ortam böyle değişti" bilgisini açıkça saklaması. Agent self-observation yapmalı → Agent sadece dış ortamı gözlemliyor (duvarlar, hedef, kaynaklar). Ama kendi iç durumunu — "son 5 adımda ne yaptım, ne oldu, ortam nasıl değişti, stratejim işe yaradı mı" — gözlemlesin tezin env kodunda olması gereken şey: Observation space'e agent'ın kendi deneyim özeti eklenmeli: Son N adımdaki action dizisi (ne yaptım) Son N adımdaki reward dizisi (ne oldu) Ortam değişim vektörü (ortam nasıl değişti — duvar toggle oranı, hedef ne kadar kaydı) Strateji etkinliği (son K adımda hedefe yaklaştım mı uzaklaştım mı) Deneyim kalıpları (bu tür ortam değişiminde hangi stratejim işe yaradı) Yani agent'ın observation'ı sadece "dış dünya şu an nasıl" değil, "ben ne yaptım, ne oldu, ortam nasıl tepki verdi" bilgisini de içermeli CRITICAL: The environment MUST implement ALL aspects of the hypothesis, including any agent-side mechanisms (self-observation, experience storage, adaptive behavior tracking) as part of the OBSERVATION SPACE and REWARD FUNCTION. Do not just build the environment dynamics — also embed the agent-side requirements into the env's observation/reward design. ENVIRONMENT SPECIFICATION: OBSERVATION SPACE: 147-dimensional vector containing: (1) Current position (x,y) [2 dims], (2) Goal position (x,y) [2 dims], (3) 10x10 grid maze layout (walls=1, empty=0) [100 dims], (4) Last 5 actions taken [5 dims: 0=up,1=down,2=left,3=right,4=stay], (5) Last 5 rewards received [5 dims], (6) Environment change vector: wall toggle frequency in 3x3 local area over last 10 steps [9 dims], (7) Goal displacement distance over last 10 steps [10 dims], (8) Strategy effectiveness: distance-to-goal change over last 5 steps [5 dims], (9) Action pattern effectiveness: reward per action type over last 10 steps [4 dims], (10) Local exploration coverage: unique cells visited in last 15 steps [5 dims]. ACTION SPACE: Discrete(5) - up, down, left, right, stay. TRANSITION DYNAMICS: After each action: (a) 30% chance each wall in 5x5 area around agent toggles state, (b) Goal position shifts by 1-2 cells in random direction with 40% probability, (c) New walls may block current path to goal. REWARD FUNCTION: +10 for reaching goal, -0.1 per timestep, -1 for hitting wall, +0.5 for moving closer to goal, -0.5 for moving away, +1.0 bonus if agent reaches goal using fewer actions than previous 3 attempts (strategy improvement reward), -0.2 penalty for repeating same failed action sequence (last 3 actions) that previously led to wall collision. EPISODE TERMINATION: Goal reached, 200 timesteps elapsed, or agent stuck (same position for 10 consecutive steps). AGENT-SIDE REQUIREMENTS: Agent must maintain internal buffers for action history, reward history, and compute strategy effectiveness metrics that feed into observation space.

Domain

navigation

Difficulty

hard

Observation

Box(shape=?)

Action

Discrete(shape=?)

Reward

see spec

Max Steps

1000

Version

v1

Tests (8/8)

syntaximportresetstepobs_spaceaction_spacereward_sanitydeterminism

Use via API

import kualia env = kualia.make("dynamicmazewithselfobservation") obs, info = env.reset()

Environment Code

13518 chars
import gymnasium as gym
import numpy as np
from typing import Optional, Tuple, Dict


class DynamicAdaptiveNavigationEnv(gym.Env):
    """
    Dynamic Adaptive Navigation Environment.
    
    A partially-observable navigation environment designed to test adaptive agents 
    in non-stationary conditions. Features stochastic wall toggling, dynamic goal 
    displacement, and rich self-observation mechanisms.
    
    Observation Space (147 dimensions, Box([-1, 1])):
    - [0:2]: Agent position (x, y) normalized to [-1, 1]
    - [2:4]: Agent velocity (vx, vy) scaled and clipped
    - [4:6]: Relative goal position (dx, dy) from agent
    - [6]: Normalized distance to goal
    - [7:39]: Lidar sensor readings (32 directions, 0=far, 1=close)
    - [39:79]: Action history buffer (last 20 actions, 2D continuous)
    - [79:99]: Reward history buffer (last 20 rewards, normalized)
    - [99:119]: Position delta history (last 10 positions relative to current)
    - [119:139]: Goal trajectory history (last 10 goal positions relative to current)
    - [139]: Episode progress (steps / max_steps)
    - [140]: Success rate (moving window average)
    - [141]: Reward trend (slope over recent history)
    - [142]: Stagnation counter (normalized)
    - [143:147]: Wall proximity sensors (4 walls, distance or 1.0 if inactive)
    
    Action Space:
    - Continuous Box([-1, 1], shape=(2,)): Desired velocity in x and y directions
    
    Reward Structure:
    - Dense proximity reward: -0.1 * distance_to_goal
    - Success bonus: +10.0 when goal reached
    - Collision penalty: -5.0 for wall collision
    - Time penalty: -0.01 per step
    - Adaptation bonus: +0.5 * distance_improvement (incentivizes handling goal movement)
    All rewards clipped to [-10, 10].
    """
    
    MAX_STEPS: int = 500
    GOAL_THRESHOLD: float = 0.05
    ARENA_BOUND: float = 1.0
    LIDAR_RAYS: int = 32
    HISTORY_LENGTH: int = 20
    POSITION_HISTORY: int = 10
    WALL_COUNT: int = 4
    
    def __init__(self, render_mode: Optional[str] = None):
        super().__init__()
        
        self.render_mode = render_mode
        
        self.observation_space = gym.spaces.Box(
            low=-1.0, 
            high=1.0, 
            shape=(147,), 
            dtype=np.float32
        )
        
        self.action_space = gym.spaces.Box(
            low=-1.0, 
            high=1.0, 
            shape=(2,), 
            dtype=np.float32
        )
        
        self.agent_pos: np.ndarray = np.zeros(2, dtype=np.float32)
        self.agent_vel: np.ndarray = np.zeros(2, dtype=np.float32)
        self.goal_pos: np.ndarray = np.zeros(2, dtype=np.float32)
        self.goal_vel: np.ndarray = np.zeros(2, dtype=np.float32)
        self.walls_active: np.ndarray = np.zeros(4, dtype=bool)
        self.wall_positions: np.ndarray = np.zeros((4, 4), dtype=np.float32)
        
        self.action_history: np.ndarray = np.zeros((self.HISTORY_LENGTH, 2), dtype=np.float32)
        self.reward_history: np.ndarray = np.zeros(self.HISTORY_LENGTH, dtype=np.float32)
        self.position_history: np.ndarray = np.zeros((self.POSITION_HISTORY, 2), dtype=np.float32)
        self.goal_history: np.ndarray = np.zeros((self.POSITION_HISTORY, 2), dtype=np.float32)
        
        self.steps: int = 0
        self.episode_reward: float = 0.0
        self.success_window: np.ndarray = np.zeros(10, dtype=np.float32)
        self.stagnation_counter: int = 0
        self.prev_distance: float = 0.0
        
    def reset(self, *, seed: Optional[int] = None, options: Optional[Dict] = None) -> Tuple[np.ndarray, Dict]:
        super().reset(seed=seed)
        
        self.agent_pos = self.np_random.uniform(-0.9, 0.9, size=2).astype(np.float32)
        self.agent_vel = np.zeros(2, dtype=np.float32)
        
        self.goal_pos = self.np_random.uniform(-0.9, 0.9, size=2).astype(np.float32)
        while np.linalg.norm(self.goal_pos - self.agent_pos) < 0.3:
            self.goal_pos = self.np_random.uniform(-0.9, 0.9, size=2).astype(np.float32)
        
        self.goal_vel = np.zeros(2, dtype=np.float32)
        
        self.walls_active = self.np_random.random(self.WALL_COUNT) > 0.3
        self._generate_walls()
        
        self.action_history = np.zeros((self.HISTORY_LENGTH, 2), dtype=np.float32)
        self.reward_history = np.zeros(self.HISTORY_LENGTH, dtype=np.float32)
        self.position_history = np.tile(self.agent_pos, (self.POSITION_HISTORY, 1))
        self.goal_history = np.tile(self.goal_pos, (self.POSITION_HISTORY, 1))
        
        self.steps = 0
        self.episode_reward = 0.0
        self.success_window = np.zeros(10, dtype=np.float32)
        self.stagnation_counter = 0
        self.prev_distance = np.linalg.norm(self.goal_pos - self.agent_pos)
        
        obs = self._get_obs()
        info = {}
        
        return obs, info
    
    def _generate_walls(self) -> None:
        """Generate wall segments within the arena."""
        if self.walls_active[0]:
            x = self.np_random.uniform(-0.8, 0.2)
            self.wall_positions[0] = [x, -0.8, x + 0.6, -0.8]
        else:
            self.wall_positions[0] = [0, 0, 0, 0]
            
        if self.walls_active[1]:
            x = self.np_random.uniform(-0.2, 0.8)
            self.wall_positions[1] = [x, 0.8, x + 0.6, 0.8]
        else:
            self.wall_positions[1] = [0, 0, 0, 0]
            
        if self.walls_active[2]:
            y = self.np_random.uniform(-0.8, 0.2)
            self.wall_positions[2] = [-0.8, y, -0.8, y + 0.6]
        else:
            self.wall_positions[2] = [0, 0, 0, 0]
            
        if self.walls_active[3]:
            y = self.np_random.uniform(-0.2, 0.8)
            self.wall_positions[3] = [0.8, y, 0.8, y + 0.6]
        else:
            self.wall_positions[3] = [0, 0, 0, 0]
    
    def step(self, action) -> Tuple[np.ndarray, float, bool, bool, Dict]:
        action = np.clip(np.array(action, dtype=np.float32), -1.0, 1.0)
        
        self.action_history = np.roll(self.action_history, 1, axis=0)
        self.action_history[0] = action
        
        self.position_history = np.roll(self.position_history, 1, axis=0)
        self.position_history[0] = self.agent_pos.copy()
        
        self.goal_history = np.roll(self.goal_history, 1, axis=0)
        self.goal_history[0] = self.goal_pos.copy()
        
        self.agent_vel = 0.8 * self.agent_vel + 0.2 * action * 0.1
        new_pos = self.agent_pos + self.agent_vel
        
        collision = False
        for i in range(self.WALL_COUNT):
            if self.walls_active[i]:
                if self._line_intersect(self.agent_pos, new_pos, 
                                       self.wall_positions[i, 0:2], 
                                       self.wall_positions[i, 2:4]):
                    collision = True
                    self.agent_vel = np.zeros(2, dtype=np.float32)
                    break
        
        if not collision:
            self.agent_pos = np.clip(new_pos, -self.ARENA_BOUND, self.ARENA_BOUND)
        
        if self.steps % 50 == 0 and self.steps > 0:
            displacement = self.np_random.uniform(-0.3, 0.3, size=2)
            self.goal_pos = np.clip(self.goal_pos + displacement, -0.9, 0.9)
        else:
            self.goal_vel = 0.9 * self.goal_vel + 0.1 * self.np_random.normal(0, 0.02, size=2).astype(np.float32)
            self.goal_pos = np.clip(self.goal_pos + self.goal_vel, -0.9, 0.9)
        
        distance = np.linalg.norm(self.goal_pos - self.agent_pos)
        distance_improvement = self.prev_distance - distance
        self.prev_distance = distance
        
        terminated = False
        success = distance < self.GOAL_THRESHOLD
        
        if success:
            terminated = True
            self.success_window = np.roll(self.success_window, 1)
            self.success_window[0] = 1.0
        
        proximity_reward = -0.1 * distance
        success_reward = 10.0 if success else 0.0
        collision_reward = -5.0 if collision else 0.0
        time_reward = -0.01
        adaptation_reward = 0.5 * distance_improvement
        
        total_reward = proximity_reward + success_reward + collision_reward + time_reward + adaptation_reward
        total_reward = np.clip(total_reward, -10.0, 10.0).item()
        
        self.reward_history = np.roll(self.reward_history, 1)
        self.reward_history[0] = total_reward / 10.0
        
        self.episode_reward += total_reward
        self.steps += 1
        
        if distance_improvement < 0.001:
            self.stagnation_counter = min(self.stagnation_counter + 1, 100)
        else:
            self.stagnation_counter = max(self.stagnation_counter - 1, 0)
        
        truncated = self.steps >= self.MAX_STEPS
        
        obs = self._get_obs()
        
        info = {
            "reward_components": {
                "proximity": proximity_reward,
                "success": success_reward,
                "collision": collision_reward,
                "time": time_reward,
                "adaptation": adaptation_reward
            },
            "distance": distance,
            "collision": collision
        }
        
        return obs, float(total_reward), terminated, truncated, info
    
    def _get_obs(self) -> np.ndarray:
        obs = np.zeros(147, dtype=np.float32)
        
        obs[0:2] = self.agent_pos
        obs[2:4] = np.clip(self.agent_vel * 10.0, -1.0, 1.0)
        
        relative_goal = self.goal_pos - self.agent_pos
        obs[4:6] = np.clip(relative_goal, -1.0, 1.0)
        
        distance = np.linalg.norm(relative_goal)
        obs[6] = np.clip(distance / 2.0, 0.0, 1.0)
        
        lidar_readings = self._get_lidar_readings()
        obs[7:39] = lidar_readings
        
        obs[39:79] = self.action_history.flatten()
        obs[79:99] = self.reward_history
        
        position_deltas = self.position_history - self.agent_pos
        obs[99:119] = np.clip(position_deltas.flatten(), -1.0, 1.0)
        
        goal_trajectory = self.goal_history - self.agent_pos
        obs[119:139] = np.clip(goal_trajectory.flatten(), -1.0, 1.0)
        
        obs[139] = self.steps / self.MAX_STEPS
        obs[140] = np.mean(self.success_window)
        
        if self.steps >= 5:
            recent_rewards = self.reward_history[:5]
            x = np.arange(5, dtype=np.float32)
            reward_trend = np.polyfit(x, recent_rewards, 1)[0] * 10.0
        else:
            reward_trend = 0.0
        obs[141] = np.clip(reward_trend, -1.0, 1.0)
        
        obs[142] = self.stagnation_counter / 100.0
        
        wall_proximity = np.ones(4, dtype=np.float32)
        for i in range(self.WALL_COUNT):
            if self.walls_active[i]:
                wall_center = (self.wall_positions[i, 0:2] + self.wall_positions[i, 2:4]) / 2.0
                dist_to_wall = np.linalg.norm(wall_center - self.agent_pos)
                wall_proximity[i] = np.clip(dist_to_wall / 2.0, 0.0, 1.0)
        obs[143:147] = wall_proximity
        
        return np.clip(obs, -1.0, 1.0)
    
    def _get_lidar_readings(self) -> np.ndarray:
        readings = np.ones(self.LIDAR_RAYS, dtype=np.float32)
        max_range = 2.0
        
        for i in range(self.LIDAR_RAYS):
            angle = 2.0 * np.pi * i / self.LIDAR_RAYS
            direction = np.array([np.cos(angle), np.sin(angle)], dtype=np.float32)
            end_point = self.agent_pos + direction * max_range
            
            min_dist = max_range
            for j in range(self.WALL_COUNT):
                if self.walls_active[j]:
                    intersect = self._line_intersect_dist(self.agent_pos, end_point,
                                                         self.wall_positions[j, 0:2],
                                                         self.wall_positions[j, 2:4])
                    if intersect is not None:
                        min_dist = min(min_dist, intersect)
            
            readings[i] = 1.0 - np.clip(min_dist / max_range, 0.0, 1.0)
        
        return readings
    
    def _line_intersect(self, a1: np.ndarray, a2: np.ndarray, b1: np.ndarray, b2: np.ndarray) -> bool:
        """Check if line segment a1-a2 intersects with line segment b1-b2."""
        denom = (b2[1] - b1[1]) * (a2[0] - a1[0]) - (b2[0] - b1[0]) * (a2[1] - a1[1])
        if np.abs(denom) < 1e-10:
            return False
        
        ua = ((b2[0] - b1[0]) * (a1[1] - b1[1]) - (b2[1] - b1[1]) * (a1[0] - b1[0])) / denom
        ub = ((a2[0] - a1[0]) * (a1[1] - b1[1]) - (a2[1] - a1[1]) * (a1[0] - b1[0])) / denom
        
        return 0.0 <= ua <= 1.0 and 0.0 <= ub <= 1.0
    
    def _line_intersect_dist(self, a1: np.ndarray, a2: np.ndarray, b1: np.ndarray, b2: np.ndarray) -> Optional[float]:
        """Return distance from a1 to intersection point, or None if no intersection."""
        denom = (b2[1] - b1[1]) * (a2[0] - a1[0]) - (b2[0] - b1[0]) * (a2[1] - a1[1])
        if np.abs(denom) < 1e-10:
            return None
        
        ua = ((b2[0] - b1[0]) * (a1[1] - b1[1]) - (b2[1] - b1[1]) * (a1[0] - b1[0])) / denom
        ub = ((a2[0] - a1[0]) * (a1[1] - b1[1]) - (a2[1] - a1[1]) * (a1[0] - b1[0])) / denom
        
        if 0.0 <= ua <= 1.0 and 0.0 <= ub <= 1.0:
            intersect_point = a1 + ua * (a2 - a1)
            return np.linalg.norm(intersect_point - a1)
        return None
    
    def close(self) -> None:
        pass