# Source code for gymnasium.wrappers.vector.vectorize_action

"""Vectorizes action wrappers to work for `VectorEnv`."""

from __future__ import annotations

from collections.abc import Callable
from copy import deepcopy
from typing import TYPE_CHECKING, Any, Generic

import numpy as np

from gymnasium import Space
from gymnasium.core import Env
from gymnasium.logger import warn
from gymnasium.vector import VectorActionWrapper, VectorEnv
from gymnasium.vector.utils import batch_space, concatenate, create_empty_array, iterate
from gymnasium.wrappers import transform_action

if TYPE_CHECKING:
    from typing_extensions import TypeVar

    # typing_extensions.TypeVar supports the `default=` parameter (PEP 696),
    # so type checkers see unparameterized uses of the generic wrapper fall
    # back to `Any` (and `_T_co` default to `_T_contra`).
    _T_contra = TypeVar("_T_contra", contravariant=True, default=Any)
    _T_co = TypeVar("_T_co", covariant=True, default=_T_contra)
else:
    from typing import TypeVar

    # At runtime the stdlib TypeVar is sufficient (no `default=` needed),
    # avoiding a hard dependency on typing_extensions.
    _T_contra = TypeVar("_T_contra", contravariant=True)
    _T_co = TypeVar("_T_co", covariant=True)


class TransformAction(VectorActionWrapper, Generic[_T_contra, _T_co]):
    """Transforms an action via a function provided to the wrapper.

    The function :attr:`func` will be applied to all vector actions.
    If the actions from :attr:`func` are outside the bounds of the ``env``'s action space,
    provide an :attr:`action_space` which specifies the action space for the vectorized environment.

    Example - Without action transformation:
        >>> import gymnasium as gym
        >>> envs = gym.make_vec("MountainCarContinuous-v0", num_envs=3)
        >>> _ = envs.action_space.seed(123)
        >>> obs, info = envs.reset(seed=123)
        >>> for _ in range(10):
        ...     obs, rew, term, trunc, info = envs.step(envs.action_space.sample())
        ...
        >>> envs.close()
        >>> obs
        array([[-0.46553135, -0.00142543],
               [-0.498371  , -0.00715587],
               [-0.46515748, -0.00624371]], dtype=float32)

    Example - With action transformation:
        >>> import gymnasium as gym
        >>> from gymnasium.spaces import Box
        >>> def shrink_action(act):
        ...     return act * 0.3
        ...
        >>> envs = gym.make_vec("MountainCarContinuous-v0", num_envs=3)
        >>> new_action_space = Box(low=shrink_action(envs.action_space.low), high=shrink_action(envs.action_space.high))
        >>> envs = TransformAction(env=envs, func=shrink_action, action_space=new_action_space)
        >>> _ = envs.action_space.seed(123)
        >>> obs, info = envs.reset(seed=123)
        >>> for _ in range(10):
        ...     obs, rew, term, trunc, info = envs.step(envs.action_space.sample())
        ...
        >>> envs.close()
        >>> obs
        array([[-0.48468155, -0.00372536],
               [-0.47599354, -0.00545912],
               [-0.46543318, -0.00615723]], dtype=float32)
    """

    # Action space of a single (non-vectorized) sub-environment.
    single_action_space: Space
    # Batched action space of the vectorized environment.
    action_space: Space
    # Function applied to the whole batch of actions before stepping the env.
    func: Callable[[_T_contra], _T_co]

    def __init__(
        self,
        env: VectorEnv,
        func: Callable[[_T_contra], _T_co],
        action_space: Space | None = None,
        single_action_space: Space | None = None,
    ) -> None:
        """Constructor for the lambda action wrapper.

        Args:
            env: The vector environment to wrap
            func: A function that will transform an action. If this transformed action is outside the action space of ``env.action_space`` then provide an ``action_space``.
            action_space: The action spaces of the wrapper. If None, then it is computed from ``single_action_space``. If ``single_action_space`` is not provided either, then it is assumed to be the same as ``env.action_space``.
            single_action_space: The action space of the non-vectorized environment. If None, then it is assumed the same as ``env.single_action_space``.
        """
        super().__init__(env)

        if action_space is None:
            # No batched space given: derive it from the single space if one was
            # provided, otherwise keep the wrapped env's spaces (set by the superclass).
            if single_action_space is not None:
                self.single_action_space = single_action_space
                self.action_space = batch_space(single_action_space, self.num_envs)
        else:
            self.action_space = action_space
            if single_action_space is not None:
                self.single_action_space = single_action_space

            # TODO: We could compute single_action_space from the action_space if only the latter is provided and avoid the warning below.
            if self.action_space != batch_space(self.single_action_space, self.num_envs):
                # Report the wrapper's own action space (the one actually compared),
                # not the wrapped env's, so the warning is self-consistent.
                warn(
                    f"For {env}, the action space and the batched single action space don't match as expected, action_space={self.action_space}, batched single_action_space={batch_space(self.single_action_space, self.num_envs)}"
                )

        self.func = func

    def actions(self, actions: _T_contra) -> _T_co:
        """Applies the :attr:`func` to the actions."""
        return self.func(actions)
class VectorizeTransformAction(VectorActionWrapper):
    """Vectorizes a single-agent transform action wrapper for vector environments.

    Example - Without action transformation:
        >>> import gymnasium as gym
        >>> envs = gym.make_vec("MountainCarContinuous-v0", num_envs=3)
        >>> _ = envs.action_space.seed(123)
        >>> obs, info = envs.reset(seed=123)
        >>> obs, rew, term, trunc, info = envs.step(envs.action_space.sample())
        >>> envs.close()
        >>> obs
        array([[-4.6343064e-01,  9.8971417e-05],
               [-4.4488689e-01, -1.9375233e-03],
               [-4.3118435e-01, -1.5342437e-03]], dtype=float32)

    Example - Adding a transform that applies a ReLU to the action:
        >>> import gymnasium as gym
        >>> from gymnasium.wrappers import TransformAction
        >>> envs = gym.make_vec("MountainCarContinuous-v0", num_envs=3)
        >>> envs = VectorizeTransformAction(envs, wrapper=TransformAction, func=lambda x: (x > 0.0) * x, action_space=envs.single_action_space)
        >>> _ = envs.action_space.seed(123)
        >>> obs, info = envs.reset(seed=123)
        >>> obs, rew, term, trunc, info = envs.step(envs.action_space.sample())
        >>> envs.close()
        >>> obs
        array([[-4.6343064e-01,  9.8971417e-05],
               [-4.4354835e-01, -5.9898634e-04],
               [-4.3034542e-01, -6.9532328e-04]], dtype=float32)
    """

    class _SingleEnv(Env):
        """Fake single-agent environment used for the single-agent wrapper."""

        action_space: Space

        def __init__(self, action_space: Space) -> None:
            """Constructor for the fake environment."""
            self.action_space = action_space

    # Instance of the single-agent wrapper; only its `func` is used per action.
    wrapper: transform_action.TransformAction
    # Action space of a single sub-environment after the wrapper is applied.
    single_action_space: Space
    # Batched action space after the wrapper is applied.
    action_space: Space
    # True when the transformed batched space equals the wrapped env's batched space.
    same_out: bool
    # Pre-allocated output buffer used when `same_out` is False.
    out: np.ndarray

    def __init__(
        self,
        env: VectorEnv,
        wrapper: type[transform_action.TransformAction],
        **kwargs: Any,
    ) -> None:
        """Constructor for the vectorized lambda action wrapper.

        Args:
            env: The vector environment to wrap
            wrapper: The wrapper to vectorize
            **kwargs: Arguments for the LambdaAction wrapper
        """
        super().__init__(env)

        # Instantiate the single-agent wrapper around a fake env so it can
        # compute the transformed single action space for us.
        fake_env = self._SingleEnv(self.env.single_action_space)
        self.wrapper = wrapper(fake_env, **kwargs)
        self.single_action_space = self.wrapper.action_space
        self.action_space = batch_space(self.single_action_space, self.num_envs)

        self.same_out = self.action_space == self.env.action_space
        # ty doesn't support `@single_dispatch` yet
        self.out = create_empty_array(self.env.single_action_space, self.num_envs)  # ty:ignore[invalid-assignment]

    def actions(self, actions: np.ndarray) -> np.ndarray:
        """Applies the wrapper to each of the action.

        Args:
            actions: The actions to apply the function to

        Returns:
            The updated actions using the wrapper func
        """
        # Apply the single-agent transform to every sub-environment's action.
        transformed = tuple(
            self.wrapper.func(single_action)
            for single_action in iterate(self.action_space, actions)
        )

        if self.same_out:
            # The spaces match, so the incoming batch can serve as the output buffer.
            # ty doesn't support `@single_dispatch` yet
            return concatenate(self.env.single_action_space, transformed, actions)  # ty:ignore[invalid-return-type]

        # Spaces differ: write into the pre-allocated buffer and hand back a
        # deep copy so the shared buffer is never exposed to callers.
        # ty doesn't support `@single_dispatch` yet
        return deepcopy(  # ty:ignore[invalid-return-type]
            concatenate(self.env.single_action_space, transformed, self.out)
        )
class ClipAction(VectorizeTransformAction):
    """Clip the continuous action within the valid :class:`Box` observation space bound.

    Example - Passing an out-of-bounds action to the environment to be clipped.
        >>> import numpy as np
        >>> import gymnasium as gym
        >>> envs = gym.make_vec("MountainCarContinuous-v0", num_envs=3)
        >>> envs = ClipAction(envs)
        >>> _ = envs.action_space.seed(123)
        >>> obs, info = envs.reset(seed=123)
        >>> obs, rew, term, trunc, info = envs.step(np.array([5.0, -5.0, 2.0]))
        >>> envs.close()
        >>> obs
        array([[-0.4624777 ,  0.00105192],
               [-0.44504836, -0.00209899],
               [-0.42884544,  0.00080468]], dtype=float32)
    """

    def __init__(self, env: VectorEnv) -> None:
        """Vectorizes the single-agent :class:`ClipAction` wrapper over ``env``.

        Args:
            env: The vector environment to wrap
        """
        # Delegates all per-action clipping to the single-agent wrapper.
        super().__init__(env, wrapper=transform_action.ClipAction)
class RescaleAction(VectorizeTransformAction):
    """Affinely rescales the continuous action space of the environment to the range [min_action, max_action].

    Example - Without action scaling:
        >>> import numpy as np
        >>> import gymnasium as gym
        >>> envs = gym.make_vec("MountainCarContinuous-v0", num_envs=3)
        >>> _ = envs.action_space.seed(123)
        >>> obs, info = envs.reset(seed=123)
        >>> for _ in range(10):
        ...     obs, rew, term, trunc, info = envs.step(0.5 * np.ones((3, 1)))
        ...
        >>> envs.close()
        >>> obs
        array([[-0.44799727,  0.00266526],
               [-0.4351738 ,  0.00133522],
               [-0.42683297,  0.00048403]], dtype=float32)

    Example - With action scaling:
        >>> import numpy as np
        >>> import gymnasium as gym
        >>> envs = gym.make_vec("MountainCarContinuous-v0", num_envs=3)
        >>> envs = RescaleAction(envs, 0.0, 1.0)
        >>> _ = envs.action_space.seed(123)
        >>> obs, info = envs.reset(seed=123)
        >>> for _ in range(10):
        ...     obs, rew, term, trunc, info = envs.step(0.5 * np.ones((3, 1)))
        ...
        >>> envs.close()
        >>> obs
        array([[-0.48657528, -0.00395268],
               [-0.47377947, -0.00529102],
               [-0.46546045, -0.00614867]], dtype=float32)
    """

    def __init__(
        self,
        env: VectorEnv,
        min_action: float | int | np.ndarray,
        max_action: float | int | np.ndarray,
    ) -> None:
        """Initializes the :class:`RescaleAction` wrapper.

        Args:
            env (Env): The vector environment to wrap
            min_action (float, int or np.ndarray): The min values for each action. This may be a numpy array or a scalar.
            max_action (float, int or np.ndarray): The max values for each action. This may be a numpy array or a scalar.
        """
        # The single-agent RescaleAction wrapper receives the target bounds
        # and performs the affine transform per sub-environment action.
        bounds = {"min_action": min_action, "max_action": max_action}
        super().__init__(env, transform_action.RescaleAction, **bounds)