"""``StickyAction`` wrapper - There is a probability that the action is taken again."""
from __future__ import annotations
from typing import Any
import numpy as np
import gymnasium as gym
from gymnasium.core import ActType, ObsType
from gymnasium.error import InvalidBound, InvalidProbability
__all__ = ["StickyAction"]
[docs]
class StickyAction(
gym.ActionWrapper[ObsType, ActType, ActType], gym.utils.RecordConstructorArgs
):
"""Adds a probability that the action is repeated for the same ``step`` function.
This wrapper follows the implementation proposed by `Machado et al., 2018 <https://arxiv.org/pdf/1709.06009.pdf>`_
in Section 5.2 on page 12, and adds the possibility to repeat the action for
more than one step.
No vector version of the wrapper exists.
Example:
>>> import gymnasium as gym
>>> env = gym.make("CartPole-v1")
>>> env = StickyAction(env, repeat_action_probability=0.9)
>>> env.reset(seed=123)
(array([ 0.01823519, -0.0446179 , -0.02796401, -0.03156282], dtype=float32), {})
>>> env.step(1)
(array([ 0.01734283, 0.15089367, -0.02859527, -0.33293587], dtype=float32), 1.0, False, False, {})
>>> env.step(0)
(array([ 0.0203607 , 0.34641072, -0.03525399, -0.6344974 ], dtype=float32), 1.0, False, False, {})
>>> env.step(1)
(array([ 0.02728892, 0.5420062 , -0.04794393, -0.9380709 ], dtype=float32), 1.0, False, False, {})
>>> env.step(0)
(array([ 0.03812904, 0.34756234, -0.06670535, -0.6608303 ], dtype=float32), 1.0, False, False, {})
Change logs:
* v1.0.0 - Initially added
* v1.1.0 - Add `repeat_action_duration` argument for dynamic number of sticky actions
"""
def __init__(
self,
env: gym.Env[ObsType, ActType],
repeat_action_probability: float,
repeat_action_duration: int | tuple[int, int] = 1,
):
"""Initialize StickyAction wrapper.
Args:
env (Env): the wrapped environment,
repeat_action_probability (int | float): a probability of repeating the old action,
repeat_action_duration (int | tuple[int, int]): the number of steps
the action is repeated. It can be either an int (for deterministic
repeats) or a tuple[int, int] for a range of stochastic number of repeats.
"""
if not 0 <= repeat_action_probability < 1:
raise InvalidProbability(
f"`repeat_action_probability` should be in the interval [0,1). Received {repeat_action_probability}"
)
if isinstance(repeat_action_duration, int):
repeat_action_duration = (repeat_action_duration, repeat_action_duration)
if not isinstance(repeat_action_duration, tuple):
raise ValueError(
f"`repeat_action_duration` should be either an integer or a tuple. Received {repeat_action_duration}"
)
elif len(repeat_action_duration) != 2:
raise ValueError(
f"`repeat_action_duration` should be a tuple or a list of two integers. Received {repeat_action_duration}"
)
elif repeat_action_duration[0] > repeat_action_duration[1]:
raise InvalidBound(
f"`repeat_action_duration` is not a valid bound. Received {repeat_action_duration}"
)
elif np.any(np.array(repeat_action_duration) < 1):
raise ValueError(
f"`repeat_action_duration` should be larger or equal than 1. Received {repeat_action_duration}"
)
gym.utils.RecordConstructorArgs.__init__(
self, repeat_action_probability=repeat_action_probability
)
gym.ActionWrapper.__init__(self, env)
self.repeat_action_probability = repeat_action_probability
self.repeat_action_duration_range = repeat_action_duration
self.last_action: ActType | None = None
self.is_sticky_actions: bool = False # if sticky actions are taken
self.num_repeats: int = 0 # number of sticky action repeats
self.repeats_taken: int = 0 # number of sticky actions taken
def reset(
self, *, seed: int | None = None, options: dict[str, Any] | None = None
) -> tuple[ObsType, dict[str, Any]]:
"""Reset the environment."""
self.last_action = None
self.is_sticky_actions = False
self.num_repeats = 0
self.repeats_taken = 0
return super().reset(seed=seed, options=options)
def action(self, action: ActType) -> ActType:
"""Execute the action."""
# either the agent was already "stuck" into repeats, or a new series of repeats is triggered
if self.is_sticky_actions or (
self.last_action is not None
and self.np_random.uniform() < self.repeat_action_probability
):
# if a new series starts, randomly sample its duration
if self.num_repeats == 0:
self.num_repeats = self.np_random.integers(
self.repeat_action_duration_range[0],
self.repeat_action_duration_range[1] + 1,
)
action = self.last_action
self.is_sticky_actions = True
self.repeats_taken += 1
# repeats are done, reset "stuck" status
if self.is_sticky_actions and self.num_repeats == self.repeats_taken:
self.is_sticky_actions = False
self.num_repeats = 0
self.repeats_taken = 0
self.last_action = action
return action