Back to Catalog

dynamic-goal-navigation-v0

A 2D point-mass navigation task where the target goal location evolves continuously to test adaptation to changing reward structures while maintaining fixed transition dynamics. The agent controls a point mass with double-integrator dynamics (mass=1.0, friction=0.1) in a 10x10 bounded arena. The goal follows configurable dynamics: static, linear drift with reflection, Brownian random walk, or periodic teleportation.

Domain

navigation

Difficulty

medium

Observation

Box(shape=[6])

Action

Box(shape=[2])

Reward

dense

Max Steps

1000

Version

v1

Tests (8/8)

syntaximportresetstepobs_spaceaction_spacereward_sanitydeterminism

Use via API

import kualia env = kualia.make("dynamic-goal-navigation-v0") obs, info = env.reset()

Environment Code

8541 chars
import gymnasium as gym
import numpy as np
from typing import Optional, Dict, Any, Tuple


class DynamicGoalNavEnv(gym.Env):
    """
    2D point-mass navigation with evolving goal dynamics.
    
    The agent controls a point mass with double-integrator dynamics in a bounded
    arena. The target goal moves according to configurable dynamics (static,
    linear drift, random walk, or teleport), requiring the agent to adapt to
    changing reward landscapes while state transition dynamics remain fixed.
    
    Observation Space (normalized to [-1, 1]):
        - agent_x: normalized x position [-10, 10] -> [-1, 1]
        - agent_y: normalized y position [-10, 10] -> [-1, 1]
        - velocity_x: normalized x velocity [-5, 5] -> [-1, 1]
        - velocity_y: normalized y velocity [-5, 5] -> [-1, 1]
        - goal_x: normalized goal x position [-10, 10] -> [-1, 1]
        - goal_y: normalized goal y position [-10, 10] -> [-1, 1]
    
    Action Space (continuous):
        - force_x: x-component of force [-1, 1] (N)
        - force_y: y-component of force [-1, 1] (N)
    
    Reward Function:
        - negative Euclidean distance to goal
        - action penalty: -0.01 * L2-norm of action
        - clipped to [-10, 10]
    """
    
    # Physics constants
    MASS: float = 1.0
    FRICTION: float = 0.1
    DT: float = 0.1
    BOUNDS: float = 10.0  # Position bounds +/- 10
    VEL_BOUNDS: float = 5.0  # Velocity bounds +/- 5
    MAX_FORCE: float = 1.0
    
    # Reward constants
    ACTION_PENALTY_COEF: float = 0.01
    MAX_EPISODE_STEPS: int = 1000
    REWARD_CLIP: float = 10.0
    
    def __init__(
        self, 
        render_mode: Optional[str] = None,
        goal_mode: str = "linear_drift",
        alpha: float = 0.5,
        teleport_interval: int = 100
    ):
        super().__init__()
        
        self.render_mode = render_mode
        self.goal_mode = goal_mode
        self.alpha = alpha
        self.teleport_interval = teleport_interval
        
        # Observation space: normalized to [-1, 1]
        self.observation_space = gym.spaces.Box(
            low=-1.0, high=1.0, shape=(6,), dtype=np.float32
        )
        
        # Action space: 2D force commands
        self.action_space = gym.spaces.Box(
            low=-self.MAX_FORCE, high=self.MAX_FORCE, shape=(2,), dtype=np.float32
        )
        
        # State variables (initialized in reset)
        self.position: np.ndarray = np.zeros(2, dtype=np.float32)
        self.velocity: np.ndarray = np.zeros(2, dtype=np.float32)
        self.goal_position: np.ndarray = np.zeros(2, dtype=np.float32)
        self.goal_velocity: np.ndarray = np.zeros(2, dtype=np.float32)
        self.steps: int = 0
        self._last_teleport_step: int = 0
        
    def reset(
        self, 
        *, 
        seed: Optional[int] = None, 
        options: Optional[Dict[str, Any]] = None
    ) -> Tuple[np.ndarray, Dict]:
        super().reset(seed=seed)
        
        # Initialize agent state randomly within bounds
        self.position = self.np_random.uniform(
            -self.BOUNDS, self.BOUNDS, size=2
        ).astype(np.float32)
        self.velocity = self.np_random.uniform(
            -self.VEL_BOUNDS, self.VEL_BOUNDS, size=2
        ).astype(np.float32)
        
        # Initialize goal
        self.goal_position = self.np_random.uniform(
            -self.BOUNDS, self.BOUNDS, size=2
        ).astype(np.float32)
        
        # Initialize goal dynamics based on mode
        if self.goal_mode == "linear_drift":
            angle = self.np_random.uniform(0.0, 2.0 * np.pi)
            self.goal_velocity = self.alpha * np.array(
                [np.cos(angle), np.sin(angle)], dtype=np.float32
            )
        else:
            self.goal_velocity = np.zeros(2, dtype=np.float32)
            
        self.steps = 0
        self._last_teleport_step = 0
        
        observation = self._get_obs()
        info = {}
        
        return observation, info
    
    def step(self, action: np.ndarray) -> Tuple[np.ndarray, float, bool, bool, Dict]:
        # Clamp action to valid range (defensive programming)
        action = np.clip(action, -self.MAX_FORCE, self.MAX_FORCE).astype(np.float32)
        
        # Physics: double integrator with viscous friction
        # F_net = F_applied - friction * velocity
        # acceleration = F_net / mass
        force_applied = action
        acceleration = (force_applied - self.FRICTION * self.velocity) / self.MASS
        
        # Semi-implicit Euler integration
        self.velocity += acceleration * self.DT
        self.velocity = np.clip(self.velocity, -self.VEL_BOUNDS, self.VEL_BOUNDS)
        
        self.position += self.velocity * self.DT
        
        # Boundary reflection for agent
        for i in range(2):
            if self.position[i] > self.BOUNDS:
                self.position[i] = 2.0 * self.BOUNDS - self.position[i]
                self.velocity[i] = -self.velocity[i] * 0.9  # Slight energy loss on bounce
            elif self.position[i] < -self.BOUNDS:
                self.position[i] = -2.0 * self.BOUNDS - self.position[i]
                self.velocity[i] = -self.velocity[i] * 0.9
        
        # Update goal dynamics
        self._update_goal()
        
        # Compute reward components
        distance = np.linalg.norm(self.position - self.goal_position)
        action_norm = np.linalg.norm(action)
        
        reward_distance = -distance
        reward_action_penalty = -self.ACTION_PENALTY_COEF * action_norm
        reward = reward_distance + reward_action_penalty
        
        # Clip reward to range
        reward = float(np.clip(reward, -self.REWARD_CLIP, self.REWARD_CLIP))
        
        # Update counters
        self.steps += 1
        truncated = self.steps >= self.MAX_EPISODE_STEPS
        terminated = False
        
        info = {
            "reward_components": {
                "negative_distance": float(reward_distance),
                "action_penalty": float(reward_action_penalty)
            },
            "distance_to_goal": float(distance),
            "goal_mode": self.goal_mode
        }
        
        observation = self._get_obs()
        return observation, reward, terminated, truncated, info
    
    def _update_goal(self) -> None:
        """Update goal position based on the selected dynamics mode."""
        if self.goal_mode == "static":
            return
            
        elif self.goal_mode == "linear_drift":
            # Constant velocity with boundary reflection
            self.goal_position += self.goal_velocity * self.DT
            
            for i in range(2):
                if self.goal_position[i] > self.BOUNDS:
                    self.goal_position[i] = 2.0 * self.BOUNDS - self.goal_position[i]
                    self.goal_velocity[i] = -self.goal_velocity[i]
                elif self.goal_position[i] < -self.BOUNDS:
                    self.goal_position[i] = -2.0 * self.BOUNDS - self.goal_position[i]
                    self.goal_velocity[i] = -self.goal_velocity[i]
                    
        elif self.goal_mode == "random_walk":
            # Brownian motion: dx = alpha * sqrt(dt) * N(0,1)
            # Using dt factor for proper scaling
            noise = self.np_random.normal(0.0, 1.0, size=2).astype(np.float32)
            self.goal_position += self.alpha * np.sqrt(self.DT) * noise
            
            # Hard clamp to bounds
            self.goal_position = np.clip(
                self.goal_position, -self.BOUNDS, self.BOUNDS
            )
            
        elif self.goal_mode == "teleport":
            if self.steps - self._last_teleport_step >= self.teleport_interval:
                self.goal_position = self.np_random.uniform(
                    -self.BOUNDS, self.BOUNDS, size=2
                ).astype(np.float32)
                self._last_teleport_step = self.steps
    
    def _get_obs(self) -> np.ndarray:
        """
        Construct normalized observation vector.
        
        Normalizes positions by BOUNDS and velocities by VEL_BOUNDS
        to map all values into [-1, 1].
        """
        obs = np.concatenate([
            self.position / self.BOUNDS,
            self.velocity / self.VEL_BOUNDS,
            self.goal_position / self.BOUNDS
        ]).astype(np.float32)
        
        # Numerical safety clamp
        obs = np.clip(obs, -1.0, 1.0)
        return obs
    
    def close(self) -> None:
        """Cleanup resources."""
        pass