Source code for gymnasium.wrappers.stateful_action

"""``StickyAction`` wrapper - There is a probability that the action is taken again."""

from __future__ import annotations

from typing import Any

import numpy as np

import gymnasium as gym
from gymnasium.core import ActType, ObsType
from gymnasium.error import InvalidBound, InvalidProbability


__all__ = ["StickyAction"]


[docs] class StickyAction( gym.ActionWrapper[ObsType, ActType, ActType], gym.utils.RecordConstructorArgs ): """Adds a probability that the action is repeated for the same ``step`` function. This wrapper follows the implementation proposed by `Machado et al., 2018 <https://arxiv.org/pdf/1709.06009.pdf>`_ in Section 5.2 on page 12, and adds the possibility to repeat the action for more than one step. No vector version of the wrapper exists. Example: >>> import gymnasium as gym >>> env = gym.make("CartPole-v1") >>> env = StickyAction(env, repeat_action_probability=0.9) >>> env.reset(seed=123) (array([ 0.01823519, -0.0446179 , -0.02796401, -0.03156282], dtype=float32), {}) >>> env.step(1) (array([ 0.01734283, 0.15089367, -0.02859527, -0.33293587], dtype=float32), 1.0, False, False, {}) >>> env.step(0) (array([ 0.0203607 , 0.34641072, -0.03525399, -0.6344974 ], dtype=float32), 1.0, False, False, {}) >>> env.step(1) (array([ 0.02728892, 0.5420062 , -0.04794393, -0.9380709 ], dtype=float32), 1.0, False, False, {}) >>> env.step(0) (array([ 0.03812904, 0.34756234, -0.06670535, -0.6608303 ], dtype=float32), 1.0, False, False, {}) Change logs: * v1.0.0 - Initially added * v1.1.0 - Add `repeat_action_duration` argument for dynamic number of sticky actions """ def __init__( self, env: gym.Env[ObsType, ActType], repeat_action_probability: float, repeat_action_duration: int | tuple[int, int] = 1, ): """Initialize StickyAction wrapper. Args: env (Env): the wrapped environment, repeat_action_probability (int | float): a probability of repeating the old action, repeat_action_duration (int | tuple[int, int]): the number of steps the action is repeated. It can be either an int (for deterministic repeats) or a tuple[int, int] for a range of stochastic number of repeats. """ if not 0 <= repeat_action_probability < 1: raise InvalidProbability( f"`repeat_action_probability` should be in the interval [0,1). Received {repeat_action_probability}" ) if isinstance(repeat_action_duration, int): repeat_action_duration = (repeat_action_duration, repeat_action_duration) if not isinstance(repeat_action_duration, tuple): raise ValueError( f"`repeat_action_duration` should be either an integer or a tuple. Received {repeat_action_duration}" ) elif len(repeat_action_duration) != 2: raise ValueError( f"`repeat_action_duration` should be a tuple or a list of two integers. Received {repeat_action_duration}" ) elif repeat_action_duration[0] > repeat_action_duration[1]: raise InvalidBound( f"`repeat_action_duration` is not a valid bound. Received {repeat_action_duration}" ) elif np.any(np.array(repeat_action_duration) < 1): raise ValueError( f"`repeat_action_duration` should be larger or equal than 1. Received {repeat_action_duration}" ) gym.utils.RecordConstructorArgs.__init__( self, repeat_action_probability=repeat_action_probability ) gym.ActionWrapper.__init__(self, env) self.repeat_action_probability = repeat_action_probability self.repeat_action_duration_range = repeat_action_duration self.last_action: ActType | None = None self.is_sticky_actions: bool = False # if sticky actions are taken self.num_repeats: int = 0 # number of sticky action repeats self.repeats_taken: int = 0 # number of sticky actions taken def reset( self, *, seed: int | None = None, options: dict[str, Any] | None = None ) -> tuple[ObsType, dict[str, Any]]: """Reset the environment.""" self.last_action = None self.is_sticky_actions = False self.num_repeats = 0 self.repeats_taken = 0 return super().reset(seed=seed, options=options) def action(self, action: ActType) -> ActType: """Execute the action.""" # either the agent was already "stuck" into repeats, or a new series of repeats is triggered if self.is_sticky_actions or ( self.last_action is not None and self.np_random.uniform() < self.repeat_action_probability ): # if a new series starts, randomly sample its duration if self.num_repeats == 0: self.num_repeats = self.np_random.integers( self.repeat_action_duration_range[0], self.repeat_action_duration_range[1] + 1, ) action = self.last_action self.is_sticky_actions = True self.repeats_taken += 1 # repeats are done, reset "stuck" status if self.is_sticky_actions and self.num_repeats == self.repeats_taken: self.is_sticky_actions = False self.num_repeats = 0 self.repeats_taken = 0 self.last_action = action return action