Source code for gymnasium.vector.sync_vector_env

"""Implementation of a synchronous (for loop) vectorization method of any environment."""
from __future__ import annotations

from copy import deepcopy
from typing import Any, Callable, Iterator, Sequence

import numpy as np

from gymnasium import Env
from gymnasium.core import ActType, ObsType, RenderFrame
from gymnasium.vector.utils import batch_space, concatenate, create_empty_array, iterate
from gymnasium.vector.vector_env import ArrayType, VectorEnv


__all__ = ["SyncVectorEnv"]


[docs] class SyncVectorEnv(VectorEnv): """Vectorized environment that serially runs multiple environments. Example: >>> import gymnasium as gym >>> envs = gym.make_vec("Pendulum-v1", num_envs=2, vectorization_mode="sync") >>> envs SyncVectorEnv(Pendulum-v1, num_envs=2) >>> envs = gym.vector.SyncVectorEnv([ ... lambda: gym.make("Pendulum-v1", g=9.81), ... lambda: gym.make("Pendulum-v1", g=1.62) ... ]) >>> envs SyncVectorEnv(num_envs=2) >>> obs, infos = envs.reset(seed=42) >>> obs array([[-0.14995256, 0.9886932 , -0.12224312], [ 0.5760367 , 0.8174238 , -0.91244936]], dtype=float32) >>> infos {} >>> _ = envs.action_space.seed(42) >>> actions = envs.action_space.sample() >>> obs, rewards, terminates, truncates, infos = envs.step(actions) >>> obs array([[-0.1878752 , 0.98219293, 0.7695615 ], [ 0.6102389 , 0.79221743, -0.8498053 ]], dtype=float32) >>> rewards array([-2.96562607, -0.99902063]) >>> terminates array([False, False]) >>> truncates array([False, False]) >>> infos {} >>> envs.close() """ def __init__( self, env_fns: Iterator[Callable[[], Env]] | Sequence[Callable[[], Env]], copy: bool = True, ): """Vectorized environment that serially runs multiple environments. Args: env_fns: iterable of callable functions that create the environments. copy: If ``True``, then the :meth:`reset` and :meth:`step` methods return a copy of the observations. Raises: RuntimeError: If the observation space of some sub-environment does not match observation_space (or, by default, the observation space of the first sub-environment). """ self.copy = copy self.env_fns = env_fns # Initialise all sub-environments self.envs = [env_fn() for env_fn in env_fns] # Define core attributes using the sub-environments # As we support `make_vec(spec)` then we can't include a `spec = self.envs[0].spec` as this doesn't guarantee we can actual recreate the vector env. self.num_envs = len(self.envs) self.metadata = self.envs[0].metadata self.render_mode = self.envs[0].render_mode # Initialises the single spaces from the sub-environments self.single_observation_space = self.envs[0].observation_space self.single_action_space = self.envs[0].action_space self._check_spaces() # Initialise the obs and action space based on the single versions and num of sub-environments self.observation_space = batch_space( self.single_observation_space, self.num_envs ) self.action_space = batch_space(self.single_action_space, self.num_envs) # Initialise attributes used in `step` and `reset` self._observations = create_empty_array( self.single_observation_space, n=self.num_envs, fn=np.zeros ) self._rewards = np.zeros((self.num_envs,), dtype=np.float64) self._terminations = np.zeros((self.num_envs,), dtype=np.bool_) self._truncations = np.zeros((self.num_envs,), dtype=np.bool_) self._autoreset_envs = np.zeros((self.num_envs,), dtype=np.bool_) @property def np_random_seed(self) -> tuple[int, ...]: """Returns a tuple of np random seeds for the wrapped envs.""" return self.get_attr("np_random_seed") @property def np_random(self) -> tuple[np.random.Generator, ...]: """Returns a tuple of the numpy random number generators for the wrapped envs.""" return self.get_attr("np_random")
[docs] def reset( self, *, seed: int | list[int] | None = None, options: dict[str, Any] | None = None, ) -> tuple[ObsType, dict[str, Any]]: """Resets each of the sub-environments and concatenate the results together. Args: seed: Seeds used to reset the sub-environments, either * ``None`` - random seeds for all environment * ``int`` - ``[seed, seed+1, ..., seed+n]`` * List of ints - ``[1, 2, 3, ..., n]`` options: Option information used for each sub-environment Returns: Concatenated observations and info from each sub-environment """ if seed is None: seed = [None for _ in range(self.num_envs)] elif isinstance(seed, int): seed = [seed + i for i in range(self.num_envs)] assert ( len(seed) == self.num_envs ), f"If seeds are passed as a list the length must match num_envs={self.num_envs} but got length={len(seed)}." self._terminations = np.zeros((self.num_envs,), dtype=np.bool_) self._truncations = np.zeros((self.num_envs,), dtype=np.bool_) observations, infos = [], {} for i, (env, single_seed) in enumerate(zip(self.envs, seed)): env_obs, env_info = env.reset(seed=single_seed, options=options) observations.append(env_obs) infos = self._add_info(infos, env_info, i) # Concatenate the observations self._observations = concatenate( self.single_observation_space, observations, self._observations ) return deepcopy(self._observations) if self.copy else self._observations, infos
[docs] def step( self, actions: ActType ) -> tuple[ObsType, ArrayType, ArrayType, ArrayType, dict[str, Any]]: """Steps through each of the environments returning the batched results. Returns: The batched environment step results """ actions = iterate(self.action_space, actions) observations, infos = [], {} for i, action in enumerate(actions): if self._autoreset_envs[i]: env_obs, env_info = self.envs[i].reset() self._rewards[i] = 0.0 self._terminations[i] = False self._truncations[i] = False else: ( env_obs, self._rewards[i], self._terminations[i], self._truncations[i], env_info, ) = self.envs[i].step(action) observations.append(env_obs) infos = self._add_info(infos, env_info, i) # Concatenate the observations self._observations = concatenate( self.single_observation_space, observations, self._observations ) self._autoreset_envs = np.logical_or(self._terminations, self._truncations) return ( deepcopy(self._observations) if self.copy else self._observations, np.copy(self._rewards), np.copy(self._terminations), np.copy(self._truncations), infos, )
def render(self) -> tuple[RenderFrame, ...] | None: """Returns the rendered frames from the environments.""" return tuple(env.render() for env in self.envs)
[docs] def call(self, name: str, *args: Any, **kwargs: Any) -> tuple[Any, ...]: """Calls a sub-environment method with name and applies args and kwargs. Args: name: The method name *args: The method args **kwargs: The method kwargs Returns: Tuple of results """ results = [] for env in self.envs: function = env.get_wrapper_attr(name) if callable(function): results.append(function(*args, **kwargs)) else: results.append(function) return tuple(results)
[docs] def get_attr(self, name: str) -> tuple[Any, ...]: """Get a property from each parallel environment. Args: name (str): Name of the property to get from each individual environment. Returns: The property with name """ return self.call(name)
[docs] def set_attr(self, name: str, values: list[Any] | tuple[Any, ...] | Any): """Sets an attribute of the sub-environments. Args: name: The property name to change values: Values of the property to be set to. If ``values`` is a list or tuple, then it corresponds to the values for each individual environment, otherwise, a single value is set for all environments. Raises: ValueError: Values must be a list or tuple with length equal to the number of environments. """ if not isinstance(values, (list, tuple)): values = [values for _ in range(self.num_envs)] if len(values) != self.num_envs: raise ValueError( "Values must be a list or tuple with length equal to the number of environments. " f"Got `{len(values)}` values for {self.num_envs} environments." ) for env, value in zip(self.envs, values): env.set_wrapper_attr(name, value)
def close_extras(self, **kwargs: Any): """Close the environments.""" if hasattr(self, "envs"): [env.close() for env in self.envs] def _check_spaces(self) -> bool: """Check that each of the environments obs and action spaces are equivalent to the single obs and action space.""" for env in self.envs: if not (env.observation_space == self.single_observation_space): raise RuntimeError( f"Some environments have an observation space different from `{self.single_observation_space}`. " "In order to batch observations, the observation spaces from all environments must be equal." ) if not (env.action_space == self.single_action_space): raise RuntimeError( f"Some environments have an action space different from `{self.single_action_space}`. " "In order to batch actions, the action spaces from all environments must be equal." ) return True