Source code for pyhdc.encodings.base

from abc import ABC, abstractmethod
from math import pi, sqrt
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import numpy as np

# Optional PyTorch import
try:
    import torch

    TORCH_AVAILABLE = True
except ImportError:
    TORCH_AVAILABLE = False
    torch = None


from pyhdc.components.binding import (
    ElementAngleAddition,
    ElementAngleSubtraction,
    ElementMultiplication,
    ExclusiveOr,
)
from pyhdc.components.elements import (
    BernoulliBinary,
    BernoulliBipolar,
    BernoulliSparse,
    NormalReal,
    UniformAngles,
    UniformBipolar,
)
from pyhdc.components.unary import CyclicShift
from pyhdc.config import get_default_backend, get_default_device
from pyhdc.exceptions import GeneratorNotSupportedError
from pyhdc.generation.base import DefaultGenerator, HDCGenerator
from pyhdc.hypervector import BackendManager, EncodingSpec, Hypervector
from pyhdc.types import ArrayLike, Backend, Device

# Element generators eligible for the vectorized batched-generation fast path.
# Each is a pure i.i.d. per-element draw, so the whole batch is drawn in one
# (D, *batch) call (see Encoding._fast_path_draw).
_IID_ELEMENT_GENERATORS = frozenset(
    {
        BernoulliBipolar,
        BernoulliBinary,
        UniformBipolar,
        UniformAngles,
        NormalReal,
        BernoulliSparse,
    }
)

# Binders that handle a batched ``(D, *batch)`` input natively (broadcast or
# vectorize). Any other binder is applied per column when bind/unbind receives a
# batch, so a batched call still returns a single batched Hypervector.
_BATCH_SAFE_BINDERS = frozenset(
    {
        ElementMultiplication,
        ExclusiveOr,
        ElementAngleAddition,
        ElementAngleSubtraction,
    }
)


def _warn_batch_dim() -> None:
    """Emit the deprecation warning for the ``batch_dim`` parameter."""
    import warnings

    warnings.warn(
        "batch_dim is deprecated and will be removed in a future release. Pass a "
        "batched array directly (bundle/bind/unbind batch automatically) or use "
        "axis= on bundle.",
        DeprecationWarning,
        stacklevel=3,
    )


def _unpack_operation_result(
    result: Union[ArrayLike, Tuple[ArrayLike, Dict[str, Any]]], operation_fn: Callable
) -> Tuple[ArrayLike, Dict[str, Any]]:
    """
    Unpack operation result, handling both old and new-style returns.

    Adds 'operation' key with function name to metadata dict.

    Args:
        result: Either raw ArrayLike or (ArrayLike, metadata_dict) tuple
        operation_fn: The operation function that produced this result

    Returns:
        Tuple of (data, metadata_dict)
    """
    if isinstance(result, tuple) and len(result) == 2:
        data, metadata = result
        import functools

        if isinstance(operation_fn, functools.partial):
            op_name = operation_fn.func.__name__
        else:
            op_name = operation_fn.__name__

        if isinstance(metadata, dict):
            enhanced_metadata = {"operation": op_name, **metadata}
            return data, enhanced_metadata
        else:
            # Non-dict secondary return
            # (e.g. MatrixMultiplication returns matrices list)
            return data, {"operation": op_name, "aux": metadata}
    return result, {}


[docs] class Encoding(ABC): """ Base class for hypervector encoding schemes. An encoding defines how hypervectors are generated and how operations (similarity, bundling, binding) are performed on them. """ def __init__( self, dimension: int = 10_000, backend: Optional[Backend] = None, device: Optional[Device] = None, dtype: Optional[Any] = None, mask: Optional[int] = None, generator: Optional[HDCGenerator] = None, similarity_remap: Optional[Callable] = None, ) -> None: """ Initialize an encoding scheme. Args: dimension: Number of dimensions in hypervectors backend: Backend to use ('numpy' or 'torch'). Defaults to the global preference (see ``pyhdc.prefer_torch``/``prefer_numpy``), which is 'numpy' unless changed. device: Device for PyTorch backend. Defaults to the global preference (see ``pyhdc.prefer_cuda``/``prefer_cpu``). dtype: Data type override mask: Optional mask value generator: Optional custom generator (uses default if None) similarity_remap: Optional callable applied to every similarity result before returning. All similarity functions return [-1, 1] by default; use this to remap the output, e.g. ``pyhdc.components.similarity.remap_to_unit`` to shift to [0, 1]. """ if backend is None: backend = get_default_backend() if device is None: device = get_default_device() self.dimension = dimension self.backend = backend self.device = device if backend == "torch" else None self._similarity_remap = similarity_remap if backend == "torch" and not TORCH_AVAILABLE: raise ImportError( "PyTorch backend requested but PyTorch is not installed. " "Install it with: pip install torch" ) # Set up generator self._has_custom_generator = generator is not None self._generator = generator if generator is not None else DefaultGenerator() # Get encoding specification spec = self._get_encoding_spec() # Override dtype if provided if dtype is not None: spec.dtype = dtype if mask is not None: spec.mask = mask self._spec = spec self._validate_generator() def _validate_generator(self) -> None: """Validate that generator supports required output type.""" output_type = self._spec.generator_output_type if output_type == "bits" and not self._generator.supports_bits(): raise GeneratorNotSupportedError( f"Generator {self._generator.__class__.__name__} does not support " f"bit generation required by {self.__class__.__name__}" ) elif output_type == "words" and not self._generator.supports_words(): raise GeneratorNotSupportedError( f"Generator {self._generator.__class__.__name__} does not support " f"word generation required by {self.__class__.__name__}" ) elif output_type == "floats" and not self._generator.supports_floats(): raise GeneratorNotSupportedError( f"Generator {self._generator.__class__.__name__} does not support " f"float generation required by {self.__class__.__name__}" )
[docs] @abstractmethod def _get_encoding_spec(self) -> EncodingSpec: """Get the encoding specification for this encoding type."""
def _generate_with_generator(self, size: Union[int, Tuple[int, ...]]) -> np.ndarray: """ Generate data using the custom generator. Args: size: Size specification Returns: Generated numpy array """ # Determine total number of elements if isinstance(size, int): total_elements = size shape = (size,) elif isinstance(size, tuple): total_elements = int(np.prod(size)) shape = size else: raise ValueError(f"Invalid size specification: {size}") output_type = self._spec.generator_output_type # Generate based on output type if output_type == "bits": data = self._generator.generate_bits(total_elements) elif output_type == "words": # Determine word size from dtype dtype_bits = np.dtype(self._spec.dtype).itemsize * 8 data = self._generator.generate_words(total_elements, dtype_bits) elif output_type == "floats": # Determine range based on dtype if np.issubdtype(self._spec.dtype, np.integer): # For integer dtypes, generate in [0, 1] then scale floats = self._generator.generate_floats(total_elements, 0.0, 1.0) # Convert to appropriate range if ( self._spec.dtype == np.int8 or "bipolar" in self._spec.element_generator.__name__.lower() ): # Bipolar: map to {-1, 1} data = [int(2 * round(f) - 1) for f in floats] else: # Binary: map to {0, 1} data = [int(round(f)) for f in floats] else: # For float dtypes, generate in [-1, 1] or appropriate range data = self._generator.generate_floats(total_elements, -1.0, 1.0) else: raise ValueError(f"Unknown output type: {output_type}") # Convert to numpy array and reshape arr = np.array(data, dtype=self._spec.dtype) return arr.reshape(shape) def _generate_one(self, dim: int, use_generator: bool) -> np.ndarray: """ Generate a single ``(dim,)`` hypervector as a numpy array. Args: dim: Hypervector dimension. use_generator: Whether to use the custom HDCGenerator pathway. Returns: A 1D numpy array of length ``dim``. """ if use_generator and self._generator is not None: return self._generate_with_generator(dim) return self._spec.element_generator(dim, self._spec.dtype) def _fast_path_draw( self, dim: int, batch: Tuple[int, ...], use_generator: bool ) -> Optional[np.ndarray]: """ Vectorized i.i.d. draw for a batch, or ``None`` to use the sequential loop. Draws the whole ``(D, *batch)`` array in one call, directly in dimension-first layout, for the pure i.i.d. element generators. The result is reproducible under a fixed seed for a given batch shape, but is not value-identical to generating the vectors one at a time (the RNG stream is consumed as a single block rather than per column). Ordered/custom generators and ``SparseSegmented`` return ``None`` and fall back to the sequential loop. """ if use_generator: return None gen = self._spec.element_generator if gen not in _IID_ELEMENT_GENERATORS: return None # Draw directly in (D, *batch) output layout. Each call mirrors its element # generator keyword-for-keyword so the per-element distribution is identical. shape = (dim, *batch) if gen is BernoulliBipolar: arr = np.random.choice([-1, 1], size=shape, p=[0.5, 0.5]) elif gen is BernoulliBinary: arr = np.random.binomial(size=shape, n=1, p=0.5) elif gen is UniformBipolar: arr = np.random.uniform(-1, 1, shape) elif gen is UniformAngles: arr = np.random.uniform(-pi, pi, shape) elif gen is NormalReal: arr = np.random.normal(0, sqrt(1 / dim), size=shape) elif gen is BernoulliSparse: arr = np.random.binomial(size=shape, n=1, p=1 / sqrt(dim)) else: # pragma: no cover return None return np.ascontiguousarray(arr, dtype=self._spec.dtype)
[docs] def generate( self, size: Union[int, Tuple[int, ...]] = None, backend: Optional[Backend] = None, device: Optional[Device] = None, use_generator: Optional[bool] = None, ) -> Hypervector: """ Generate random hypervector(s). Hypervectors are dimension-first. A scalar (or ``None``) ``size`` produces a single ``(D,)`` hypervector; a tuple ``(D, N)`` produces a batch of ``N`` hypervectors of dimension ``D`` stored as columns of a ``(D, N)`` array (and likewise ``(D, N, M)`` for higher-rank batches). Batched generation is reproducible under a fixed seed for a given batch shape. For the i.i.d. element generators the batch is drawn in one vectorized call. Ordered and custom generators draw per vector. Args: size: ``None`` or int for a single ``(D,)`` vector; a tuple ``(D, *batch)`` for a batch of ``prod(batch)`` vectors of dimension ``D``. backend: Backend override (defaults to the encoding's backend). device: Device override for the torch backend. use_generator: Whether to use the HDCGenerator pathway. Defaults to True if a custom generator was passed at construction, False otherwise (uses element_generator directly, which gives the correct per-encoding distribution). Returns: A new Hypervector. """ if backend is None: backend = self.backend if device is None: device = self.device if use_generator is None: use_generator = self._has_custom_generator if size is None or isinstance(size, int): dim = self.dimension if size is None else size data = self._generate_one(dim, use_generator) elif isinstance(size, tuple): dim, batch = size[0], size[1:] if not batch: data = self._generate_one(dim, use_generator) else: data = self._fast_path_draw(dim, batch, use_generator) if data is None: count = 1 for axis in batch: count *= int(axis) columns = [ self._generate_one(dim, use_generator) for _ in range(count) ] data = np.stack(columns, axis=-1).reshape((dim, *batch)) else: raise ValueError(f"Invalid size specification: {size}") # Convert to appropriate backend if backend == "torch": data = BackendManager.to_torch(data, device) return Hypervector(data, self, backend, None)
[docs] def zeros( self, size: Union[int, Tuple[int, ...]] = None, backend: Optional[Backend] = None, device: Optional[Device] = None, ) -> Hypervector: """Generate zero hypervector(s).""" if size is None: size = self.dimension if backend is None: backend = self.backend if device is None: device = self.device if backend == "torch": data = torch.zeros(size, dtype=self._spec.dtype, device=device) else: data = np.zeros(size, dtype=self._spec.dtype) return Hypervector(data, self, backend, None)
[docs] def from_array( self, array: ArrayLike, backend: Optional[Backend] = None ) -> Hypervector: """Create a Hypervector from an existing array.""" if backend is None: backend = BackendManager.get_backend(array) return Hypervector(array, self, backend, None)
[docs] def set_generator(self, generator: HDCGenerator) -> None: """ Set a new generator for this encoding. Args: generator: The new generator to use Raises: GeneratorNotSupportedError: If generator doesn't support required output type """ self._generator = generator self._validate_generator()
[docs] def get_generator(self) -> HDCGenerator: """Get the current generator.""" return self._generator
[docs] def similarity( self, hvA: Union[ArrayLike, Hypervector, List], hvB: Optional[Union[ArrayLike, Hypervector, List]] = None, *, axis: Optional[int] = None, ) -> Union[float, ArrayLike, List[Union[float, ArrayLike]]]: """ Compute similarity between hypervector(s). Hypervectors are dimension-first ``(D, N)``. Calling conventions: - ``similarity(a, b)`` with two ``(D,)`` vectors -> a scalar score - ``similarity(A, B)`` with two ``(D, N)`` batches -> ``N`` per-column scores - ``similarity(v, B)`` with a vector and a ``(D, N)`` batch -> ``N`` scores - ``similarity(batch)`` with one ``(D, N)`` batch -> ``N-1`` scores of column 0 against each remaining column - ``similarity([..], [..])`` with two equal-length lists -> pairwise scores Args: hvA: First hypervector(s) (Hypervector, array, or list), or a single ``(D, N)`` batch when ``hvB`` is omitted. hvB: Optional second hypervector(s). Returns: A scalar, a 1D array of scores, or a list of scores (for list inputs). Examples: >>> bsc.similarity(hv1, hv2) # scalar >>> enc.similarity(codebook) # col 0 vs the rest >>> bsc.similarity([hv1, hv2], [hv4, hv5]) # [sim(1,4), sim(2,5)] """ from pyhdc.components.input_formatting import _extract_data # Batched if both are lists of equal length if isinstance(hvA, list) and isinstance(hvB, list): if len(hvA) != len(hvB): raise ValueError( f"Batched similarity requires equal-length lists. " f"Got {len(hvA)} and {len(hvB)}" ) results = [] for a, b in zip(hvA, hvB): data_a = _extract_data(a) data_b = _extract_data(b) sim = self._spec.similarity_fn(data_a, data_b, axis=axis) if self._similarity_remap is not None: sim = self._similarity_remap(sim) results.append(sim) return results # Single (D, N) batch (hvB omitted) or a two-operand comparison if hvB is None: result = self._spec.similarity_fn(_extract_data(hvA), axis=axis) else: result = self._spec.similarity_fn( _extract_data(hvA), _extract_data(hvB), axis=axis ) if self._similarity_remap is not None: result = self._similarity_remap(result) return result
[docs] def bundle( self, *hypervectors: Union[ArrayLike, Hypervector, List], axis: Union[None, int, Tuple[int, ...]] = None, batch_dim: Optional[int] = None, ) -> Union[Hypervector, List[Hypervector]]: """ Bundle multiple hypervectors, optionally in batches. Args: *hypervectors: Hypervector objects, raw arrays, or lists to bundle batch_dim: If provided with 3D+ array, split along this dimension for batching Returns: Single Hypervector (if not batched) or List of Hypervectors (if batched) Examples: >>> # Single bundle (current behavior) >>> bsc.bundle(hv1, hv2, hv3) # Returns: Hypervector >>> bsc.bundle([hv1, hv2, hv3]) # Returns: Hypervector >>> # Batched bundles (new) >>> bsc.bundle([[hv1, hv2], [hv3, hv4]]) # Returns: [bundled1, bundled2] >>> bsc.bundle(array_3d, batch_dim=0) ... # Returns: list of bundled hypervectors >>> enc.bundle(tensor_DNM, axis=2) # (D, N, M) -> (D, N) """ if axis is not None and batch_dim is not None: raise ValueError("bundle accepts either axis= or batch_dim=, not both") if batch_dim is not None: _warn_batch_dim() from pyhdc.components.input_formatting import ( _detect_batch_structure, _extract_data, _normalize_inputs, ) # Vectorized fast path: a single 3D array split by batch_dim is the same # as reducing the *other* batch axis in one op, then splitting the result. # Taken before _detect_batch_structure so we skip its per-slice split and # the per-group loop. Ragged nested lists, batch_dim=0, and 4D+ fall # through; the wrapper loop in _split_batch_result does no numerical work. if batch_dim in (1, 2) and len(hypervectors) == 1: arr = _extract_data(hypervectors[0]) if getattr(arr, "ndim", 0) == 3: reduce_axis = 2 if batch_dim == 1 else 1 result = self._spec.bundling_fn(arr, axis=reduce_axis) result_data, metadata = _unpack_operation_result( result, self._spec.bundling_fn ) return self._split_batch_result(result_data, metadata) # Detect if this is a batched operation is_batched, groups = _detect_batch_structure(*hypervectors, batch_dim=batch_dim) if is_batched: # General path: per-group loop (ragged nested lists, other ranks). results = [] for group in groups: # Normalize each group if isinstance(group, (list, tuple)): data_arrays, _, _ = _normalize_inputs(*group) else: data_arrays, _, _ = _normalize_inputs(group) result = self._spec.bundling_fn(*data_arrays) result_data, metadata = _unpack_operation_result( result, self._spec.bundling_fn ) results.append(Hypervector(result_data, self, self.backend, metadata)) return results else: # Single operation if isinstance(groups, (list, tuple)) and len(groups) > 0: data_arrays, _, _ = _normalize_inputs(*groups) else: data_arrays, _, _ = _normalize_inputs(groups) result = self._spec.bundling_fn(*data_arrays, axis=axis) result_data, metadata = _unpack_operation_result( result, self._spec.bundling_fn ) return Hypervector(result_data, self, self.backend, metadata)
def _split_batch_result( self, result_data: ArrayLike, metadata: Dict[str, Any] ) -> List[Hypervector]: """ Split a vectorized ``(D, n)`` bundle result into ``n`` ``(D,)`` Hypervectors. Backs the ``batch_dim`` fast path. Per-output metadata arrays (whose last axis matches the kept batch axis) are sliced per result, scalars and the ``operation`` name are shared. """ n = result_data.shape[1] results = [] for j in range(n): meta_j = { key: ( val[..., j] if hasattr(val, "shape") and getattr(val, "ndim", 0) >= 1 and val.shape[-1] == n else val ) for key, val in metadata.items() } results.append(Hypervector(result_data[:, j], self, self.backend, meta_j)) return results def _bind_single(self, fn: Callable, data_arrays: List[ArrayLike]) -> Hypervector: """ Apply a binding function to one set of operands, batching automatically. Element-wise binders broadcast a batch natively. Any other binder is applied per column over the trailing batch axes, so a batched call still returns one batched Hypervector without requiring ``batch_dim``. """ import functools base_fn = fn.func if isinstance(fn, functools.partial) else fn batched = any(getattr(a, "ndim", 1) > 1 for a in data_arrays) if batched and base_fn not in _BATCH_SAFE_BINDERS: return self._bind_per_column(fn, data_arrays) result = fn(*data_arrays) result_data, metadata = _unpack_operation_result(result, fn) return Hypervector(result_data, self, self.backend, metadata) def _bind_per_column( self, fn: Callable, data_arrays: List[ArrayLike] ) -> Hypervector: """Apply a non-batch-safe binder per column over the trailing batch axes.""" from pyhdc.components.input_formatting import _broadcast_operands is_torch = self.backend == "torch" operands = _broadcast_operands(data_arrays, is_torch) ref = operands[0] dim = ref.shape[0] batch_shape = tuple(int(s) for s in ref.shape[1:]) count = 1 for size in batch_shape: count *= size flats = [op.reshape(op.shape[0], -1) for op in operands] cols = [] agg_meta: Dict[str, Any] = {} for i in range(count): result = fn(*(flat[:, i] for flat in flats)) res_data, meta = _unpack_operation_result(result, fn) cols.append(res_data) for key, val in meta.items(): if key == "operation": agg_meta[key] = val else: agg_meta.setdefault(key, []).append(val) if is_torch: stacked = torch.stack(cols, dim=-1) else: stacked = np.stack(cols, axis=-1) stacked = stacked.reshape((dim, *batch_shape)) return Hypervector(stacked, self, self.backend, agg_meta)
[docs] def thin( self, hypervector: Union[ArrayLike, Hypervector, List] ) -> Union[Hypervector, List[Hypervector]]: """ Apply thinning to hypervector(s). Supports batching: if a list is provided, applies thinning independently to each hypervector in the list. Args: hypervector: Hypervector object, raw array, or list of hypervectors to thin Returns: Single Hypervector (if single input) or List of Hypervectors (if list input) Examples: >>> # Single thinning >>> bsc.thin(hv) # Returns: Hypervector >>> # Batched thinning >>> bsc.thin([hv1, hv2, hv3]) # Returns: [thinned1, thinned2, thinned3] """ from pyhdc.components.input_formatting import _extract_data # Batched if list input if isinstance(hypervector, list): results = [] for hv in hypervector: data = _extract_data(hv) result = self._spec.thinning_fn(data) result_data, metadata = _unpack_operation_result( result, self._spec.thinning_fn ) results.append(Hypervector(result_data, self, self.backend, metadata)) return results else: # Single operation data = _extract_data(hypervector) result = self._spec.thinning_fn(data) result_data, metadata = _unpack_operation_result( result, self._spec.thinning_fn ) return Hypervector(result_data, self, self.backend, metadata)
[docs] def permute( self, hypervector: Union[ArrayLike, Hypervector], shift: int = 1 ) -> Hypervector: """ Permute (cyclic-shift) a hypervector along the dimension axis (axis 0). Args: hypervector: Hypervector or raw array to permute. shift: Positions to roll along axis 0; negative inverts the permute. Returns: A new permuted Hypervector. """ from pyhdc.components.input_formatting import _extract_data fn = self._spec.permute_fn or CyclicShift data = _extract_data(hypervector) result = fn(data, shift=shift) result_data, metadata = _unpack_operation_result(result, fn) return Hypervector(result_data, self, self.backend, metadata)
[docs] def inverse(self, hypervector: Union[ArrayLike, Hypervector]) -> Hypervector: """ Binding inverse of a hypervector. Raises ``NotImplementedError`` for encodings whose binding has no defined inverse (e.g. MAP_C continuous, VTB, MBAT, BSDC_*). """ from pyhdc.components.input_formatting import _extract_data data = _extract_data(hypervector) result = self._spec.inverse_fn(data) result_data, metadata = _unpack_operation_result(result, self._spec.inverse_fn) return Hypervector(result_data, self, self.backend, metadata)
[docs] def negative(self, hypervector: Union[ArrayLike, Hypervector]) -> Hypervector: """ Bundling (additive) inverse of a hypervector. Raises ``NotImplementedError`` for encodings with no defined negative (e.g. FHRR, BSC, BSDC_*). """ from pyhdc.components.input_formatting import _extract_data data = _extract_data(hypervector) result = self._spec.negative_fn(data) result_data, metadata = _unpack_operation_result(result, self._spec.negative_fn) return Hypervector(result_data, self, self.backend, metadata)
[docs] def normalize(self, hypervector: Union[ArrayLike, Hypervector]) -> Hypervector: """ Normalize a hypervector to its encoding's canonical form (L2 unit length for real encodings, bipolar sign for MAP, phase wrap for FHRR). Raises ``NotImplementedError`` for encodings with no defined normalization (e.g. BSC, BSDC_*). """ from pyhdc.components.input_formatting import _extract_data data = _extract_data(hypervector) result = self._spec.normalize_fn(data) result_data, metadata = _unpack_operation_result( result, self._spec.normalize_fn ) return Hypervector(result_data, self, self.backend, metadata)
[docs] def bind( self, *hypervectors: Union[ArrayLike, Hypervector, List], batch_dim: Optional[int] = None, ) -> Union[Hypervector, List[Hypervector]]: """ Bind hypervectors, batching automatically when the input is batched. A single ``(D, N)`` (or higher-rank) batch is bound in one call: the element-wise binders (MAP multiply, BSC xor, FHRR angle add) must broadcast natively, and the others (convolution, shifting, matrix, VTB, CDT) are applied per column internally, so the result is one batched Hypervector. ``batch_dim`` is no longer required. Args: *hypervectors: Hypervector objects, raw arrays, or lists to bind. batch_dim: Deprecated. Splits a 3D+ array along this axis and returns a list of results, pass a batched array directly instead. Returns: A single Hypervector, or a list of Hypervectors for the deprecated ``batch_dim`` / nested-list forms. """ if batch_dim is not None: _warn_batch_dim() from pyhdc.components.input_formatting import ( _detect_batch_structure, _normalize_inputs, ) is_batched, groups = _detect_batch_structure(*hypervectors, batch_dim=batch_dim) if is_batched: results = [] for group in groups: if isinstance(group, (list, tuple)): data_arrays, _, _ = _normalize_inputs(*group) else: data_arrays, _, _ = _normalize_inputs(group) results.append(self._bind_single(self._spec.binding_fn, data_arrays)) return results if isinstance(groups, (list, tuple)) and len(groups) > 0: data_arrays, _, _ = _normalize_inputs(*groups) else: data_arrays, _, _ = _normalize_inputs(groups) return self._bind_single(self._spec.binding_fn, data_arrays)
[docs] def unbind( self, *hypervectors: Union[ArrayLike, Hypervector, List], batch_dim: Optional[int] = None, ) -> Union[Hypervector, List[Hypervector]]: """ Unbind hypervectors, batching automatically when the input is batched. Mirrors :meth:`bind`: a single ``(D, N)`` batch is unbound in one call (element-wise unbinders broadcast; the others are applied per column), so ``batch_dim`` is no longer required. Args: *hypervectors: Hypervector objects, raw arrays, or lists to unbind. batch_dim: Deprecated. Splits a 3D+ array along this axis and returns a list of results, pass a batched array directly instead. Returns: A single Hypervector, or a list of Hypervectors for the deprecated ``batch_dim`` / nested-list forms. Raises: NotImplementedError: For encodings that do not support unbinding. """ if batch_dim is not None: _warn_batch_dim() from pyhdc.components.input_formatting import ( _detect_batch_structure, _normalize_inputs, ) is_batched, groups = _detect_batch_structure(*hypervectors, batch_dim=batch_dim) if is_batched: results = [] for group in groups: if isinstance(group, (list, tuple)): data_arrays, _, _ = _normalize_inputs(*group) else: data_arrays, _, _ = _normalize_inputs(group) results.append(self._bind_single(self._spec.unbinding_fn, data_arrays)) return results if isinstance(groups, (list, tuple)) and len(groups) > 0: data_arrays, _, _ = _normalize_inputs(*groups) else: data_arrays, _, _ = _normalize_inputs(groups) return self._bind_single(self._spec.unbinding_fn, data_arrays)