Back to Catalog

Dynamic Resource Market

A medium-complexity economic simulation where an agent manages a portfolio of 5 resources. The agent observes resource quantities, market prices, and demand levels to make buy/sell/hold decisions. Market conditions exhibit volatility cycles (20-50% fluctuation ranges) and random scarcity/abundance events. The agent must optimize portfolio value while minimizing transaction costs and drawdowns.

Domain

finance

Difficulty

medium

Observation

Box(shape=[17])

Action

Discrete(shape=[15])

Reward

shaped

Max Steps

1000

Version

v1

Tests (8/8)

syntax · import · reset · step · obs_space · action_space · reward_sanity · determinism

Use via API

import kualia
env = kualia.make("dynamic-resource-market")
obs, info = env.reset()

Environment Code

9576 chars
import gymnasium as gym
import numpy as np
from typing import Tuple, Dict, Any


class ResourceMarketEnv(gym.Env):
    """
    A resource trading environment with 5 assets, discrete actions, and stochastic market cycles.

    Observation Space (17-dim, every component mapped to [-1, 1]):
        - [0:5]: Resource holdings, [0, MAX_INVENTORY] -> [-1, 1]
        - [5:10]: Market prices, [0, PRICE_MAX] -> [-1, 1]
        - [10:15]: Demand levels, [0, 1] -> [-1, 1]
        - [15]: Cash, [0, INITIAL_CASH * 5] -> [-1, 1] (clipped)
        - [16]: Volatility regime index {0, 1, 2} -> {-1, 0, 1}

    Action Space (Discrete 15):
        - Encoded as resource_idx * 3 + action_type
        - action_type: 0=hold, 1=buy (1 unit), 2=sell (1 unit)
        - resource_idx: 0-4 targeting specific resource

    Reward:
        - Simple (arithmetic) one-step portfolio return, minus a transaction
          penalty and a drawdown penalty (0.1 * drawdown from the episode peak)
        - Clipped to [-10, 10]
        - Dense reward provided at each step

    Episode end:
        - terminated when the portfolio value is <= 0
        - truncated once MAX_STEPS steps have been taken
    """

    # Environment constants
    N_RESOURCES: int = 5
    N_ACTION_TYPES: int = 3  # hold / buy / sell
    # NOTE(review): the catalog listing for this environment advertises 1000
    # max steps, while the code truncates at 100 — confirm which is intended.
    MAX_STEPS: int = 100
    INITIAL_CASH: float = 10000.0
    MAX_INVENTORY: float = 100.0
    TRANSACTION_COST: float = 0.01
    PRICE_MAX: float = 200.0
    PRICE_MIN: float = 1.0

    # Volatility regimes (std dev of the per-step fractional price shock)
    VOLATILITY_LEVELS = [0.20, 0.35, 0.50]

    def __init__(self, render_mode: str | None = None) -> None:
        """Create the spaces and placeholder state; call ``reset`` before ``step``."""
        super().__init__()
        self.render_mode = render_mode

        # Spaces: holdings + prices + demands (one per resource), cash, regime.
        self.observation_space = gym.spaces.Box(
            low=-1.0,
            high=1.0,
            shape=(3 * self.N_RESOURCES + 2,),
            dtype=np.float32,
        )
        self.action_space = gym.spaces.Discrete(
            self.N_RESOURCES * self.N_ACTION_TYPES
        )

        # State variables (initialized in reset)
        self.holdings: np.ndarray | None = None
        self.prices: np.ndarray | None = None
        self.demands: np.ndarray | None = None
        self.cash: float = 0.0
        self.volatility_regime: int = 0
        self.step_count: int = 0
        self.portfolio_history: list[float] = []
        # Running maximum of portfolio value within the current episode.
        self._peak_value: float = 0.0

    def reset(
        self,
        *,
        seed: int | None = None,
        options: dict | None = None
    ) -> Tuple[np.ndarray, Dict[str, Any]]:
        """Start a new episode.

        Args:
            seed: Forwarded to ``gym.Env.reset`` to seed ``self.np_random``.
            options: Accepted for API compatibility; not used.

        Returns:
            The initial observation and an empty info dict.
        """
        super().reset(seed=seed)

        # Initialize state
        self.step_count = 0
        self.cash = self.INITIAL_CASH
        self.portfolio_history = [self.INITIAL_CASH]
        self._peak_value = self.INITIAL_CASH

        # Random initial conditions (empty inventory, random prices/demands)
        self.holdings = np.zeros(self.N_RESOURCES, dtype=np.float32)
        self.prices = self.np_random.uniform(
            20.0, 80.0, size=self.N_RESOURCES
        ).astype(np.float32)
        self.demands = self.np_random.uniform(
            0.3, 0.7, size=self.N_RESOURCES
        ).astype(np.float32)
        self.volatility_regime = int(self.np_random.integers(0, 3))

        obs = self._get_observation()
        info: Dict[str, Any] = {}

        return obs, info

    def step(self, action: int) -> Tuple[np.ndarray, float, bool, bool, Dict[str, Any]]:
        """Apply one trade decision, advance the market, and score the step.

        Args:
            action: Encoded as ``resource_idx * 3 + action_type``. Values
                outside the valid range are clipped into it rather than rejected.

        Returns:
            ``(obs, reward, terminated, truncated, info)`` where ``info``
            carries the individual reward components under
            ``"reward_components"``.
        """
        # Validate and decode action
        n_actions = self.N_RESOURCES * self.N_ACTION_TYPES
        action = int(np.clip(action, 0, n_actions - 1))
        # action_type: 0=hold, 1=buy, 2=sell
        resource_idx, action_type = divmod(action, self.N_ACTION_TYPES)

        # Record pre-trade state
        old_portfolio_value = self._get_portfolio_value()
        trade_value = 0.0

        # Execute trade
        if action_type == 1:  # Buy
            trade_value = self._execute_buy(resource_idx)
        elif action_type == 2:  # Sell
            trade_value = self._execute_sell(resource_idx)
        # Hold does nothing

        # Update market conditions (post-action dynamics)
        self._update_market_conditions()

        # Calculate new state
        new_portfolio_value = self._get_portfolio_value()
        self.portfolio_history.append(new_portfolio_value)
        self.step_count += 1

        # Compute reward components
        if old_portfolio_value > 1e-8:
            raw_return = (new_portfolio_value - old_portfolio_value) / old_portfolio_value
        else:
            raw_return = 0.0

        # Fees are already deducted from cash by the trade itself; this term
        # penalizes them a second time as part of the shaped reward.
        transaction_penalty = (self.TRANSACTION_COST * trade_value) / max(old_portfolio_value, 1e-8)

        # Drawdown penalty (risk management). A running peak is kept so the
        # whole history is not rescanned on every step.
        self._peak_value = max(self._peak_value, new_portfolio_value)
        peak_value = self._peak_value
        drawdown = (peak_value - new_portfolio_value) / peak_value if peak_value > 0 else 0.0
        drawdown_penalty = 0.1 * drawdown

        # Total reward
        reward = raw_return - transaction_penalty - drawdown_penalty
        reward = float(np.clip(reward, -10.0, 10.0))

        # Termination checks
        terminated = bool(new_portfolio_value <= 0)
        truncated = bool(self.step_count >= self.MAX_STEPS)

        obs = self._get_observation()
        info = {
            "reward_components": {
                "portfolio_return": float(raw_return),
                "transaction_cost": float(-transaction_penalty),
                "drawdown_penalty": float(-drawdown_penalty),
                "portfolio_value": float(new_portfolio_value),
                "cash_held": float(self.cash),
                "volatility_regime": self.volatility_regime
            }
        }

        return obs, reward, terminated, truncated, info

    def _execute_buy(self, resource_idx: int) -> float:
        """Execute buy order for 1 unit of resource. Returns trade value.

        The order is skipped (returning 0.0) when the unit plus its fee is
        not affordable or the inventory cap is reached.
        """
        # Cast to a Python float so ``self.cash`` stays double precision
        # instead of being demoted to float32 by NumPy scalar promotion.
        price = float(self.prices[resource_idx])
        unit_cost = price * (1.0 + self.TRANSACTION_COST)
        max_affordable = int(self.cash / unit_cost)
        max_buyable = int(self.MAX_INVENTORY - self.holdings[resource_idx])
        qty = min(1, max_affordable, max_buyable)

        if qty <= 0:
            return 0.0

        self.cash -= qty * unit_cost
        self.holdings[resource_idx] += qty
        return qty * price

    def _execute_sell(self, resource_idx: int) -> float:
        """Execute sell order for 1 unit of resource. Returns trade value.

        Sells at most what is held; returns 0.0 when nothing is held.
        """
        qty = min(1.0, float(self.holdings[resource_idx]))

        if qty <= 0:
            return 0.0

        # Python float keeps the cash balance in double precision.
        price = float(self.prices[resource_idx])
        self.cash += qty * price * (1.0 - self.TRANSACTION_COST)
        self.holdings[resource_idx] -= qty
        return qty * price

    def _update_market_conditions(self) -> None:
        """Update prices, demands, and volatility regime with stochastic dynamics.

        The order of random draws is significant: changing it would change
        the trajectory produced by a given seed.
        """
        # Volatility cycle shift (10% chance per step)
        if self.np_random.random() < 0.1:
            self.volatility_regime = int(self.np_random.integers(0, 3))

        current_vol = self.VOLATILITY_LEVELS[self.volatility_regime]

        # Update each resource
        for i in range(self.N_RESOURCES):
            # Work in Python floats so the arithmetic does not depend on the
            # installed NumPy version's scalar promotion rules.
            price = float(self.prices[i])
            demand = float(self.demands[i])

            # Base random walk
            shock = self.np_random.normal(0.0, current_vol)

            # Demand influence (high demand -> upward pressure)
            demand_effect = (demand - 0.5) * 0.05

            # Mean reversion to 50.0
            # NOTE(review): this is an absolute price gap times 0.03 added to
            # a fractional change; ignoring the other terms, any price above
            # ~83.3 yields a negative multiplier and clips to PRICE_MIN.
            # Confirm whether a relative gap was intended.
            mean_reversion = (50.0 - price) * 0.03

            # Scarcity/Abundance events (5% chance)
            event_shock = 0.0
            if self.np_random.random() < 0.05:
                if self.np_random.random() < 0.5:
                    # Scarcity: price spike, demand increases
                    event_shock = 0.4
                    self.demands[i] = min(1.0, demand + 0.15)
                else:
                    # Abundance: price crash, demand decreases
                    event_shock = -0.4
                    self.demands[i] = max(0.0, demand - 0.15)

            # Apply price change, then keep the price inside its bounds
            total_change = shock + demand_effect + mean_reversion + event_shock
            new_price = price * (1.0 + total_change)
            self.prices[i] = min(max(new_price, self.PRICE_MIN), self.PRICE_MAX)

        # Random demand evolution
        demand_noise = self.np_random.normal(0.0, 0.08, size=self.N_RESOURCES)
        self.demands = np.clip(self.demands + demand_noise, 0.0, 1.0)

    def _get_portfolio_value(self) -> float:
        """Calculate total portfolio value (cash + holdings) as a Python float."""
        # Accumulate in float64: a float32 sum cannot resolve small changes
        # on a balance of this magnitude.
        holdings_value = np.dot(
            self.holdings.astype(np.float64), self.prices.astype(np.float64)
        )
        return float(self.cash + holdings_value)

    def _get_observation(self) -> np.ndarray:
        """Construct normalized observation vector (float32, length 17)."""
        # Holdings: [0, MAX_INVENTORY] -> [-1, 1]
        holdings_norm = (self.holdings / self.MAX_INVENTORY) * 2.0 - 1.0

        # Prices: [0, PRICE_MAX] -> [-1, 1]
        prices_norm = (self.prices / self.PRICE_MAX) * 2.0 - 1.0

        # Demands: [0, 1] -> [-1, 1]
        demands_norm = self.demands * 2.0 - 1.0

        # Cash: [0, INITIAL_CASH*5] -> [-1, 1], clipped
        cash_ratio = np.clip(self.cash / (self.INITIAL_CASH * 5.0), 0.0, 1.0)
        cash_norm = cash_ratio * 2.0 - 1.0

        # Volatility regime: {0,1,2} -> [-1, 1]
        vol_norm = (self.volatility_regime / 2.0) * 2.0 - 1.0

        obs = np.concatenate([
            holdings_norm,
            prices_norm,
            demands_norm,
            np.array([cash_norm], dtype=np.float32),
            np.array([vol_norm], dtype=np.float32)
        ]).astype(np.float32)

        return obs

    def close(self) -> None:
        """Cleanup resources (nothing to release)."""
        pass