Source code for gymnasium.vector.vector_env

"""Base class for vectorized environments."""
from typing import Any, List, Optional, Tuple, Union

import numpy as np
from numpy.typing import NDArray

import gymnasium as gym
from gymnasium.vector.utils.spaces import batch_space


__all__ = ["VectorEnv"]


[docs]class VectorEnv(gym.Env): """Base class for vectorized environments to run multiple independent copies of the same environment in parallel. Vector environments can provide a linear speed-up in the steps taken per second through sampling multiple sub-environments at the same time. To prevent terminated environments waiting until all sub-environments have terminated or truncated, the vector environments autoreset sub-environments after they terminate or truncated. As a result, the final step's observation and info are overwritten by the reset's observation and info. Therefore, the observation and info for the final step of a sub-environment is stored in the info parameter, using `"final_observation"` and `"final_info"` respectively. See :meth:`step` for more information. The vector environments batch `observations`, `rewards`, `terminations`, `truncations` and `info` for each parallel environment. In addition, :meth:`step` expects to receive a batch of actions for each parallel environment. Gymnasium contains two types of Vector environments: :class:`AsyncVectorEnv` and :class:`SyncVectorEnv`. The Vector Environments have the additional attributes for users to understand the implementation - :attr:`num_envs` - The number of sub-environment in the vector environment - :attr:`observation_space` - The batched observation space of the vector environment - :attr:`single_observation_space` - The observation space of a single sub-environment - :attr:`action_space` - The batched action space of the vector environment - :attr:`single_action_space` - The action space of a single sub-environment Note: The info parameter of :meth:`reset` and :meth:`step` was originally implemented before OpenAI Gym v25 was a list of dictionary for each sub-environment. However, this was modified in OpenAI Gym v25+ and in Gymnasium to a dictionary with a NumPy array for each key. To use the old info style using the :class:`VectorListInfo`. Note: To render the sub-environments, use :meth:`call` with "render" arguments. Remember to set the `render_modes` for all the sub-environments during initialization. Note: All parallel environments should share the identical observation and action spaces. In other words, a vector of multiple different environments is not supported. """ def __init__( self, num_envs: int, observation_space: gym.Space, action_space: gym.Space, ): """Base class for vectorized environments. Args: num_envs: Number of environments in the vectorized environment. observation_space: Observation space of a single environment. action_space: Action space of a single environment. """ self.num_envs = num_envs self.is_vector_env = True self.observation_space = batch_space(observation_space, n=num_envs) self.action_space = batch_space(action_space, n=num_envs) self.closed = False self.viewer = None # The observation and action spaces of a single environment are # kept in separate properties self.single_observation_space = observation_space self.single_action_space = action_space def reset_async( self, seed: Optional[Union[int, List[int]]] = None, options: Optional[dict] = None, ): """Reset the sub-environments asynchronously. This method will return ``None``. A call to :meth:`reset_async` should be followed by a call to :meth:`reset_wait` to retrieve the results. Args: seed: The reset seed options: Reset options """ pass def reset_wait( self, seed: Optional[Union[int, List[int]]] = None, options: Optional[dict] = None, ): """Retrieves the results of a :meth:`reset_async` call. A call to this method must always be preceded by a call to :meth:`reset_async`. Args: seed: The reset seed options: Reset options Returns: The results from :meth:`reset_async` Raises: NotImplementedError: VectorEnv does not implement function """ raise NotImplementedError("VectorEnv does not implement function")
[docs] def reset( self, *, seed: Optional[Union[int, List[int]]] = None, options: Optional[dict] = None, ): """Reset all parallel environments and return a batch of initial observations and info. Args: seed: The environment reset seeds options: If to return the options Returns: A batch of observations and info from the vectorized environment. Example: >>> import gymnasium as gym >>> envs = gym.vector.make("CartPole-v1", num_envs=3) >>> envs.reset(seed=42) (array([[ 0.0273956 , -0.00611216, 0.03585979, 0.0197368 ], [ 0.01522993, -0.04562247, -0.04799704, 0.03392126], [-0.03774345, -0.02418869, -0.00942293, 0.0469184 ]], dtype=float32), {}) """ self.reset_async(seed=seed, options=options) return self.reset_wait(seed=seed, options=options)
def step_async(self, actions): """Asynchronously performs steps in the sub-environments. The results can be retrieved via a call to :meth:`step_wait`. Args: actions: The actions to take asynchronously """ def step_wait( self, **kwargs ) -> Tuple[Any, NDArray[Any], NDArray[Any], NDArray[Any], dict]: """Retrieves the results of a :meth:`step_async` call. A call to this method must always be preceded by a call to :meth:`step_async`. Args: **kwargs: Additional keywords for vector implementation Returns: The results from the :meth:`step_async` call """ raise NotImplementedError()
[docs] def step( self, actions ) -> Tuple[Any, NDArray[Any], NDArray[Any], NDArray[Any], dict]: """Take an action for each parallel environment. Args: actions: element of :attr:`action_space` Batch of actions. Returns: Batch of (observations, rewards, terminations, truncations, infos) Note: As the vector environments autoreset for a terminating and truncating sub-environments, the returned observation and info is not the final step's observation or info which is instead stored in info as `"final_observation"` and `"final_info"`. Example: >>> import gymnasium as gym >>> import numpy as np >>> envs = gym.vector.make("CartPole-v1", num_envs=3) >>> _ = envs.reset(seed=42) >>> actions = np.array([1, 0, 1]) >>> observations, rewards, termination, truncation, infos = envs.step(actions) >>> observations array([[ 0.02727336, 0.18847767, 0.03625453, -0.26141977], [ 0.01431748, -0.24002443, -0.04731862, 0.3110827 ], [-0.03822722, 0.1710671 , -0.00848456, -0.2487226 ]], dtype=float32) >>> rewards array([1., 1., 1.]) >>> termination array([False, False, False]) >>> truncation array([False, False, False]) >>> infos {} """ self.step_async(actions) return self.step_wait()
def call_async(self, name, *args, **kwargs): """Calls a method name for each parallel environment asynchronously.""" def call_wait(self, **kwargs) -> List[Any]: # type: ignore """After calling a method in :meth:`call_async`, this function collects the results.""" def call(self, name: str, *args, **kwargs) -> List[Any]: """Call a method, or get a property, from each parallel environment. Args: name (str): Name of the method or property to call. *args: Arguments to apply to the method call. **kwargs: Keyword arguments to apply to the method call. Returns: List of the results of the individual calls to the method or property for each environment. """ self.call_async(name, *args, **kwargs) return self.call_wait() def get_attr(self, name: str): """Get a property from each parallel environment. Args: name (str): Name of the property to be get from each individual environment. Returns: The property with name """ return self.call(name) def set_attr(self, name: str, values: Union[list, tuple, object]): """Set a property in each sub-environment. Args: name (str): Name of the property to be set in each individual environment. values (list, tuple, or object): Values of the property to be set to. If `values` is a list or tuple, then it corresponds to the values for each individual environment, otherwise a single value is set for all environments. """ def close_extras(self, **kwargs): """Clean up the extra resources e.g. beyond what's in this base class.""" pass
[docs] def close(self, **kwargs): """Close all parallel environments and release resources. It also closes all the existing image viewers, then calls :meth:`close_extras` and set :attr:`closed` as ``True``. Warnings: This function itself does not close the environments, it should be handled in :meth:`close_extras`. This is generic for both synchronous and asynchronous vectorized environments. Note: This will be automatically called when garbage collected or program exited. Args: **kwargs: Keyword arguments passed to :meth:`close_extras` """ if self.closed: return if self.viewer is not None: self.viewer.close() self.close_extras(**kwargs) self.closed = True
def _add_info(self, infos: dict, info: dict, env_num: int) -> dict: """Add env info to the info dictionary of the vectorized environment. Given the `info` of a single environment add it to the `infos` dictionary which represents all the infos of the vectorized environment. Every `key` of `info` is paired with a boolean mask `_key` representing whether or not the i-indexed environment has this `info`. Args: infos (dict): the infos of the vectorized environment info (dict): the info coming from the single environment env_num (int): the index of the single environment Returns: infos (dict): the (updated) infos of the vectorized environment """ for k in info.keys(): if k not in infos: info_array, array_mask = self._init_info_arrays(type(info[k])) else: info_array, array_mask = infos[k], infos[f"_{k}"] info_array[env_num], array_mask[env_num] = info[k], True infos[k], infos[f"_{k}"] = info_array, array_mask return infos def _init_info_arrays(self, dtype: type) -> Tuple[np.ndarray, np.ndarray]: """Initialize the info array. Initialize the info array. If the dtype is numeric the info array will have the same dtype, otherwise will be an array of `None`. Also, a boolean array of the same length is returned. It will be used for assessing which environment has info data. Args: dtype (type): data type of the info coming from the env. Returns: array (np.ndarray): the initialized info array. array_mask (np.ndarray): the initialized boolean array. """ if dtype in [int, float, bool] or issubclass(dtype, np.number): array = np.zeros(self.num_envs, dtype=dtype) else: array = np.zeros(self.num_envs, dtype=object) array[:] = None array_mask = np.zeros(self.num_envs, dtype=bool) return array, array_mask def __del__(self): """Closes the vector environment.""" if not getattr(self, "closed", True): self.close() def __repr__(self) -> str: """Returns a string representation of the vector environment. Returns: A string containing the class name, number of environments and environment spec id """ if self.spec is None: return f"{self.__class__.__name__}({self.num_envs})" else: return f"{self.__class__.__name__}({self.spec.id}, {self.num_envs})"
class VectorEnvWrapper(VectorEnv): """Wraps the vectorized environment to allow a modular transformation. This class is the base class for all wrappers for vectorized environments. The subclass could override some methods to change the behavior of the original vectorized environment without touching the original code. Note: Don't forget to call ``super().__init__(env)`` if the subclass overrides :meth:`__init__`. """ def __init__(self, env: VectorEnv): assert isinstance(env, VectorEnv) self.env = env # explicitly forward the methods defined in VectorEnv # to self.env (instead of the base class) def reset_async(self, **kwargs): return self.env.reset_async(**kwargs) def reset_wait(self, **kwargs): return self.env.reset_wait(**kwargs) def step_async(self, actions): return self.env.step_async(actions) def step_wait(self): return self.env.step_wait() def close(self, **kwargs): return self.env.close(**kwargs) def close_extras(self, **kwargs): return self.env.close_extras(**kwargs) def call(self, name, *args, **kwargs): return self.env.call(name, *args, **kwargs) def set_attr(self, name, values): return self.env.set_attr(name, values) # implicitly forward all other methods and attributes to self.env def __getattr__(self, name): if name.startswith("_"): raise AttributeError(f"attempted to get missing private attribute '{name}'") return getattr(self.env, name) @property def unwrapped(self): return self.env.unwrapped def __repr__(self): return f"<{self.__class__.__name__}, {self.env}>" def __del__(self): self.env.__del__()