Source code for gymnasium.spaces.multi_discrete

"""Implementation of a space that represents the cartesian product of `Discrete` spaces."""
from __future__ import annotations

from typing import Any, Iterable, Mapping, Sequence

import numpy as np
from numpy.typing import NDArray

import gymnasium as gym
from gymnasium.spaces.discrete import Discrete
from import MaskNDArray, Space

[docs] class MultiDiscrete(Space[NDArray[np.integer]]): """This represents the cartesian product of arbitrary :class:`Discrete` spaces. It is useful to represent game controllers or keyboards where each key can be represented as a discrete action space. Note: Some environment wrappers assume a value of 0 always represents the NOOP action. e.g. Nintendo Game Controller - Can be conceptualized as 3 discrete action spaces: 1. Arrow Keys: Discrete 5 - NOOP[0], UP[1], RIGHT[2], DOWN[3], LEFT[4] - params: min: 0, max: 4 2. Button A: Discrete 2 - NOOP[0], Pressed[1] - params: min: 0, max: 1 3. Button B: Discrete 2 - NOOP[0], Pressed[1] - params: min: 0, max: 1 It can be initialized as ``MultiDiscrete([ 5, 2, 2 ])`` such that a sample might be ``array([3, 1, 0])``. Although this feature is rarely used, :class:`MultiDiscrete` spaces may also have several axes if ``nvec`` has several axes: Example: >>> from gymnasium.spaces import MultiDiscrete >>> import numpy as np >>> observation_space = MultiDiscrete(np.array([[1, 2], [3, 4]]), seed=42) >>> observation_space.sample() array([[0, 0], [2, 2]]) """ def __init__( self, nvec: NDArray[np.integer[Any]] | list[int], dtype: str | type[np.integer[Any]] = np.int64, seed: int | np.random.Generator | None = None, start: NDArray[np.integer[Any]] | list[int] | None = None, ): """Constructor of :class:`MultiDiscrete` space. The argument ``nvec`` will determine the number of values each categorical variable can take. If ``start`` is provided, it will define the minimal values corresponding to each categorical variable. Args: nvec: vector of counts of each categorical variable. This will usually be a list of integers. However, you may also pass a more complicated numpy array if you'd like the space to have several axes. dtype: This should be some kind of integer type. seed: Optionally, you can use this argument to seed the RNG that is used to sample from the space. start: Optionally, the starting value the element of each class will take (defaults to 0). """ self.nvec = np.array(nvec, dtype=dtype, copy=True) if start is not None: self.start = np.array(start, dtype=dtype, copy=True) else: self.start = np.zeros(self.nvec.shape, dtype=dtype) assert ( self.start.shape == self.nvec.shape ), "start and nvec (counts) should have the same shape" assert (self.nvec > 0).all(), "nvec (counts) have to be positive" super().__init__(self.nvec.shape, dtype, seed) @property def shape(self) -> tuple[int, ...]: """Has stricter type than :class:`gym.Space` - never None.""" return self._shape # type: ignore @property def is_np_flattenable(self): """Checks whether this space can be flattened to a :class:`spaces.Box`.""" return True
[docs] def sample( self, mask: tuple[MaskNDArray, ...] | None = None ) -> NDArray[np.integer[Any]]: """Generates a single random sample this space. Args: mask: An optional mask for multi-discrete, expects tuples with a ``np.ndarray`` mask in the position of each action with shape ``(n,)`` where ``n`` is the number of actions and ``dtype=np.int8``. Only ``mask values == 1`` are possible to sample unless all mask values for an action are ``0`` then the default action ``self.start`` (the smallest element) is sampled. Returns: An ``np.ndarray`` of :meth:`Space.shape` """ if mask is not None: def _apply_mask( sub_mask: MaskNDArray | tuple[MaskNDArray, ...], sub_nvec: MaskNDArray | np.integer[Any], sub_start: MaskNDArray | np.integer[Any], ) -> int | list[Any]: if isinstance(sub_nvec, np.ndarray): assert isinstance( sub_mask, tuple ), f"Expects the mask to be a tuple for sub_nvec ({sub_nvec}), actual type: {type(sub_mask)}" assert len(sub_mask) == len( sub_nvec ), f"Expects the mask length to be equal to the number of actions, mask length: {len(sub_mask)}, nvec length: {len(sub_nvec)}" return [ _apply_mask(new_mask, new_nvec, new_start) for new_mask, new_nvec, new_start in zip( sub_mask, sub_nvec, sub_start ) ] else: assert np.issubdtype( type(sub_nvec), np.integer ), f"Expects the sub_nvec to be an action, actually: {sub_nvec}, {type(sub_nvec)}" assert isinstance( sub_mask, np.ndarray ), f"Expects the sub mask to be np.ndarray, actual type: {type(sub_mask)}" assert ( len(sub_mask) == sub_nvec ), f"Expects the mask length to be equal to the number of actions, mask length: {len(sub_mask)}, action: {sub_nvec}" assert ( sub_mask.dtype == np.int8 ), f"Expects the mask dtype to be np.int8, actual dtype: {sub_mask.dtype}" valid_action_mask = sub_mask == 1 assert np.all( np.logical_or(sub_mask == 0, valid_action_mask) ), f"Expects all masks values to 0 or 1, actual values: {sub_mask}" if np.any(valid_action_mask): return ( self.np_random.choice(np.where(valid_action_mask)[0]) + sub_start ) else: return sub_start return np.array(_apply_mask(mask, self.nvec, self.start), dtype=self.dtype) return (self.np_random.random(self.nvec.shape) * self.nvec).astype( self.dtype ) + self.start
def contains(self, x: Any) -> bool: """Return boolean specifying if x is a valid member of this space.""" if isinstance(x, Sequence): x = np.array(x) # Promote list to array for contains check # if nvec is uint32 and space dtype is uint32, then 0 <= x < self.nvec guarantees that x # is within correct bounds for space dtype (even though x does not have to be unsigned) return bool( isinstance(x, np.ndarray) and x.shape == self.shape and x.dtype != object and np.all(self.start <= x) and np.all(x - self.start < self.nvec) ) def to_jsonable( self, sample_n: Sequence[NDArray[np.integer[Any]]] ) -> list[Sequence[int]]: """Convert a batch of samples from this space to a JSONable data type.""" return [sample.tolist() for sample in sample_n] def from_jsonable( self, sample_n: list[Sequence[int]] ) -> list[NDArray[np.integer[Any]]]: """Convert a JSONable data type to a batch of samples from this space.""" return [np.array(sample) for sample in sample_n] def __repr__(self): """Gives a string representation of this space.""" if np.any(self.start != 0): return f"MultiDiscrete({self.nvec}, start={self.start})" return f"MultiDiscrete({self.nvec})" def __getitem__(self, index: int | tuple[int, ...]): """Extract a subspace from this ``MultiDiscrete`` space.""" nvec = self.nvec[index] start = self.start[index] if nvec.ndim == 0: subspace = Discrete(nvec, start=start) else: subspace = MultiDiscrete(nvec, self.dtype, start=start) # you don't need to deepcopy as np random generator call replaces the state not the data subspace.np_random.bit_generator.state = self.np_random.bit_generator.state return subspace def __len__(self): """Gives the ``len`` of samples from this space.""" if self.nvec.ndim >= 2: gym.logger.warn( "Getting the length of a multi-dimensional MultiDiscrete space." ) return len(self.nvec) def __eq__(self, other: Any) -> bool: """Check whether ``other`` is equivalent to this instance.""" return bool( isinstance(other, MultiDiscrete) and self.dtype == other.dtype and np.all(self.nvec == other.nvec) and np.all(self.start == other.start) ) def __setstate__(self, state: Iterable[tuple[str, Any]] | Mapping[str, Any]): """Used when loading a pickled space. This method has to be implemented explicitly to allow for loading of legacy states. Args: state: The new state """ state = dict(state) if "start" not in state: state["start"] = np.zeros(state["_shape"], dtype=state["dtype"]) super().__setstate__(state)