Source code for pyhdc.hypervector

#!/usr/bin/env python
"""
Hyperdimensional Computing Library

A professional library for hyperdimensional computing with support for
multiple backends (NumPy and PyTorch), custom generators, and recovery methods.
"""

from __future__ import annotations

import numbers
import warnings
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Callable, Dict, Optional, Tuple, Union

import numpy as np

from pyhdc.exceptions import RaiseNotImplementedError
from pyhdc.types import ArrayLike, Backend, Device, GeneratorOutputType

# Use TYPE_CHECKING to avoid circular imports at runtime
if TYPE_CHECKING:
    from pyhdc.encodings.base import Encoding

# Optional PyTorch import
try:
    import torch

    TORCH_AVAILABLE = True
except ImportError:
    TORCH_AVAILABLE = False
    torch = None


@dataclass
class EncodingSpec:
    """Specification for a hypervector encoding scheme.

    Bundles the element dtype together with the callables an encoding
    uses for its operations. The first seven fields have no default and
    must always be supplied; the remaining fields are optional.

    NOTE(review): the expected signatures of the callables are not visible
    in this module -- confirm them against the encodings that build a spec.
    """

    # --- required fields (no defaults) -----------------------------------
    dtype: Any
    element_generator: Callable
    similarity_fn: Callable
    bundling_fn: Callable
    thinning_fn: Callable
    binding_fn: Callable
    unbinding_fn: Callable

    # --- optional fields --------------------------------------------------
    mask: Optional[int] = None
    generator_output_type: GeneratorOutputType = "floats"  # What type of output needed
    permute_fn: Optional[Callable] = None  # None -> the shared CyclicShift permute

    # These default to RaiseNotImplementedError (from pyhdc.exceptions);
    # presumably invoking one that was not overridden raises -- confirm.
    inverse_fn: Callable = RaiseNotImplementedError
    normalize_fn: Callable = RaiseNotImplementedError
    negative_fn: Callable = RaiseNotImplementedError
class BackendManager:
    """Manages backend operations for numpy and PyTorch.

    Every helper is a static method. PyTorch is optional: each method
    checks the module-level ``TORCH_AVAILABLE`` flag before touching
    ``torch``.
    """

    @staticmethod
    def get_backend(array: ArrayLike) -> Backend:
        """Determine the backend of an array.

        Args:
            array: The array-like object to inspect.

        Returns:
            ``"torch"`` when PyTorch is importable and ``array`` is a
            tensor, otherwise ``"numpy"``.
        """
        if TORCH_AVAILABLE and torch.is_tensor(array):
            return "torch"
        return "numpy"

    @staticmethod
    def to_numpy(array: ArrayLike) -> np.ndarray:
        """Convert array to numpy.

        Args:
            array: A tensor or anything ``np.asarray`` accepts.

        Returns:
            The data as a ``numpy.ndarray``.
        """
        if TORCH_AVAILABLE and torch.is_tensor(array):
            # Drop autograd tracking and move to host memory before
            # handing the buffer to numpy.
            return array.detach().cpu().numpy()
        return np.asarray(array)

    @staticmethod
    def to_torch(
        array: ArrayLike, device: Optional[Device] = None
    ) -> "torch.Tensor":  # pyright: ignore[reportInvalidTypeForm]
        """Convert array to PyTorch tensor.

        Args:
            array: A tensor or anything ``np.asarray`` accepts.
            device: Target device. ``None`` leaves the tensor where it is.

        Returns:
            A ``torch.Tensor``, moved to ``device`` when one is given.

        Raises:
            ImportError: If PyTorch is not installed.
        """
        if not TORCH_AVAILABLE:
            raise ImportError(
                "PyTorch is not installed. Install it with: pip install torch"
            )

        if torch.is_tensor(array):
            tensor = array
        else:
            tensor = torch.from_numpy(np.asarray(array))

        # Compare against None explicitly: a truthiness test would silently
        # ignore a falsy-but-valid device such as the CUDA index 0.
        if device is None:
            return tensor
        return tensor.to(device)

    @staticmethod
    def get_device(array: ArrayLike) -> Optional[Device]:
        """Get the device of a tensor (None for numpy arrays).

        Args:
            array: The array-like object to inspect.

        Returns:
            ``array.device`` for a tensor, otherwise ``None``.
        """
        if TORCH_AVAILABLE and torch.is_tensor(array):
            return array.device
        return None
class Hypervector:
    """
    A hypervector representation supporting multiple backends.

    Similar to numpy's ndarray, this class can represent a single hypervector
    or an array of hypervectors, and supports both numpy and PyTorch backends.
    All hyperdimensional operations are delegated to the attached encoding.

    Attributes:
        data: The underlying array (numpy.ndarray or torch.Tensor)
        encoding: The encoding scheme used for operations
        backend: The backend being used ('numpy' or 'torch')
    """

    def __init__(
        self,
        data: ArrayLike,
        encoding: "Encoding",
        backend: Optional[Backend] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> None:
        """
        Initialize a Hypervector.

        Args:
            data: The underlying array data
            encoding: The encoding scheme for this hypervector
            backend: Backend to use (auto-detected if None)
            metadata: Optional operational metadata dict
        """
        self._encoding = encoding
        if backend is None:
            backend = BackendManager.get_backend(data)
        self._backend = backend
        self._data = data
        self._metadata = metadata if metadata is not None else {}

    @property
    def data(self) -> ArrayLike:
        """Get the underlying array data."""
        return self._data

    @property
    def encoding(self) -> "Encoding":
        """Get the encoding scheme."""
        return self._encoding

    @property
    def backend(self) -> Backend:
        """Get the current backend."""
        return self._backend

    @property
    def shape(self) -> Tuple[int, ...]:
        """Get the shape of the hypervector."""
        return self._data.shape

    @property
    def ndim(self) -> int:
        """Get the number of dimensions."""
        return self._data.ndim

    @property
    def dtype(self) -> Any:
        """Get the data type."""
        return self._data.dtype

    @property
    def device(self) -> Optional[Device]:
        """Get the device (for PyTorch backend)."""
        return BackendManager.get_device(self._data)

    def get_metadata(self) -> Dict[str, Any]:
        """
        Get operational metadata for this hypervector.

        Returns:
            Dictionary containing metadata from the operation that created
            this hypervector. Empty dict if no metadata available.
        """
        return self._metadata.copy()  # Return copy to prevent mutation

    def __repr__(self) -> str:
        """Return a short description: shape, backend and encoding class."""
        return (
            f"Hypervector(shape={self.shape}, backend='{self.backend}', "
            f"encoding={self.encoding.__class__.__name__})"
        )

    def __len__(self) -> int:
        """Return the length of the underlying array's first axis."""
        return len(self._data)

    def __getitem__(self, key) -> "Hypervector":
        """Support indexing and slicing."""
        return Hypervector(
            self._data[key], self._encoding, self._backend, self._metadata
        )

    def select(self, indices) -> "Hypervector":
        """
        Select hypervectors along the batch axis (axis 1).

        Hypervectors are dimension-first ``(D, N)``; ``select`` keeps the
        columns at the given indices.

        Args:
            indices: Integer (non-negative) indices of the hypervectors to
                keep, as a sequence, numpy array, or tensor.

        Returns:
            A new Hypervector of shape ``(D, len(indices))`` with the selected
            columns, preserving encoding, backend, and metadata.
        """
        data = self._data
        if self._backend == "torch":
            idx = (
                indices
                if torch.is_tensor(indices)
                else torch.as_tensor(np.asarray(indices))
            )
            # index_select needs a long index tensor on the data's device.
            idx = idx.to(device=data.device, dtype=torch.long)
            selected = data.index_select(1, idx)
        else:
            selected = data[:, np.asarray(indices, dtype=np.intp)]
        return Hypervector(selected, self._encoding, self._backend, self._metadata)

    def to_numpy(self) -> "Hypervector":
        """Convert to numpy backend (returns ``self`` if already numpy)."""
        if self._backend == "numpy":
            return self
        return Hypervector(
            BackendManager.to_numpy(self._data), self._encoding, "numpy", self._metadata
        )

    def to_torch(self, device: Optional[Device] = None) -> "Hypervector":
        """Convert to PyTorch backend.

        Returns ``self`` when already on the torch backend and no device
        is requested.
        """
        if self._backend == "torch" and device is None:
            return self
        return Hypervector(
            BackendManager.to_torch(self._data, device),
            self._encoding,
            "torch",
            self._metadata,
        )

    def to(self, device: Device) -> "Hypervector":
        """Move to specified device (PyTorch only).

        Raises:
            ValueError: If this hypervector is not on the torch backend.
        """
        if self._backend != "torch":
            raise ValueError("to() method is only available for PyTorch backend")
        return self.to_torch(device)

    def cpu(self) -> "Hypervector":
        """Move to CPU (a no-op for non-torch backends)."""
        if self._backend == "torch":
            return self.to("cpu")
        return self

    def cuda(self, device: Optional[int] = None) -> "Hypervector":
        """Move to CUDA device.

        Args:
            device: CUDA device index; ``None`` selects plain ``"cuda"``.
        """
        device_str = f"cuda:{device}" if device is not None else "cuda"
        return self.to_torch(device_str)

    def similarity(
        self, other: Optional["Hypervector"] = None, *, axis: Optional[int] = None
    ) -> Union[
        float, np.ndarray, "torch.Tensor"
    ]:  # pyright: ignore[reportInvalidTypeForm]
        """
        Compute similarity with another hypervector, or within a batch.

        Args:
            other: Another hypervector to compare with. If omitted, ``self``
                must be a ``(D, N)`` batch and the similarity of column 0
                against each remaining column is returned.
            axis: For a single ``(D, N, M, ...)`` batch (``other`` omitted),
                the batch axis along which to split column 0 against the rest.

        Returns:
            Similarity score(s)

        Raises:
            TypeError: If ``other`` is given but is not a Hypervector.
            ValueError: If the backends differ.
        """
        if other is None:
            return self._encoding.similarity(self, axis=axis)
        self._check_compatibility(other)
        return self._encoding.similarity(self, other, axis=axis)

    def bundle(
        self,
        *others: "Hypervector",
        axis: Union[None, int, Tuple[int, ...]] = None,
        batch_dim: Optional[int] = None,
    ) -> "Hypervector":
        """
        Bundle this hypervector with others.

        Note: Batching via batch_dim is available at the Encoding class level,
        not at the instance method level (which always operates on single instances).

        Args:
            *others: Other hypervectors to bundle
            axis: Batch axis (or axes) to fold when bundling a single batched
                hypervector (defaults to the last batch axis)
            batch_dim: Passed through to Encoding.bundle (instance methods don't use batching)

        Returns:
            A new bundled hypervector

        Raises:
            TypeError: If any operand is not a Hypervector.
            ValueError: If the backends differ.
        """
        for other in others:
            self._check_compatibility(other)
        # Encoding.bundle now handles Hypervector objects and returns a Hypervector
        return self._encoding.bundle(self, *others, axis=axis, batch_dim=batch_dim)

    def bind(
        self, *others: "Hypervector", batch_dim: Optional[int] = None
    ) -> "Hypervector":
        """
        Bind this hypervector with others.

        Note: Batching via batch_dim is available at the Encoding class level,
        not at the instance method level (which always operates on single instances).

        Args:
            *others: Other hypervectors to bind
            batch_dim: Passed through to Encoding.bind (instance methods don't use batching)

        Returns:
            A new bound hypervector

        Raises:
            TypeError: If any operand is not a Hypervector.
            ValueError: If the backends differ.
        """
        for other in others:
            self._check_compatibility(other)
        # Encoding.bind now handles Hypervector objects and returns a Hypervector
        return self._encoding.bind(self, *others, batch_dim=batch_dim)

    def unbind(
        self, *others: "Hypervector", batch_dim: Optional[int] = None
    ) -> "Hypervector":
        """
        Unbind this hypervector from others.

        Note: Batching via batch_dim is available at the Encoding class level,
        not at the instance method level (which always operates on single instances).

        Args:
            *others: Other hypervectors to unbind
            batch_dim: Passed through to Encoding.unbind (instance methods don't use batching)

        Returns:
            A new unbound hypervector

        Raises:
            TypeError: If any operand is not a Hypervector.
            ValueError: If the backends differ.
        """
        for other in others:
            self._check_compatibility(other)
        # Encoding.unbind now handles Hypervector objects and returns a Hypervector
        return self._encoding.unbind(self, *others, batch_dim=batch_dim)

    def thin(self) -> "Hypervector":
        """
        Apply thinning operation.

        Returns:
            A new thinned hypervector
        """
        # Encoding.thin now handles Hypervector objects and returns a Hypervector
        return self._encoding.thin(self)

    def permute(self, shift: int = 1) -> "Hypervector":
        """Permute (cyclic-shift) this hypervector along the dimension axis."""
        return self._encoding.permute(self, shift=shift)

    def inverse(self) -> "Hypervector":
        """Return the binding inverse of this hypervector."""
        return self._encoding.inverse(self)

    def negative(self) -> "Hypervector":
        """Return the bundling (additive) inverse of this hypervector."""
        return self._encoding.negative(self)

    def normalize(self) -> "Hypervector":
        """Normalize this hypervector to its encoding's entry space."""
        return self._encoding.normalize(self)

    def __add__(self, other: "Hypervector") -> "Hypervector":
        """``a + b`` bundles the two hypervectors."""
        if not isinstance(other, Hypervector):
            return NotImplemented
        return self.bundle(other)

    def __mul__(self, other: "Hypervector") -> "Hypervector":
        """``a * b`` binds the two hypervectors."""
        if not isinstance(other, Hypervector):
            return NotImplemented
        return self.bind(other)

    def __truediv__(self, other: "Hypervector") -> "Hypervector":
        """``a / b`` unbinds ``b`` from ``a``."""
        if not isinstance(other, Hypervector):
            return NotImplemented
        return self.unbind(other)

    def __invert__(self) -> "Hypervector":
        """``~a`` returns the binding inverse of ``a``."""
        return self.inverse()

    def __rshift__(self, shift: int) -> "Hypervector":
        """``a >> k`` permutes ``a`` by ``+k`` positions."""
        # bool is an Integral subtype; reject it so ``a >> True`` is an error.
        if isinstance(shift, bool) or not isinstance(shift, numbers.Integral):
            return NotImplemented
        return self.permute(shift=int(shift))

    def __lshift__(self, shift: int) -> "Hypervector":
        """``a << k`` permutes ``a`` by ``-k`` positions (inverse of ``>>``)."""
        if isinstance(shift, bool) or not isinstance(shift, numbers.Integral):
            return NotImplemented
        return self.permute(shift=-int(shift))

    def _check_compatibility(self, other: "Hypervector") -> None:
        """Check if another hypervector is compatible.

        Raises:
            TypeError: If ``other`` is not a Hypervector.
            ValueError: If the two hypervectors use different backends.

        Warns:
            UserWarning: If the two encodings are instances of different
                classes (the operation is still allowed to proceed).
        """
        # Fail with a clear message instead of an AttributeError on
        # ``other._backend`` when a raw array or scalar is passed.
        if not isinstance(other, Hypervector):
            raise TypeError(
                f"Expected a Hypervector operand, got {type(other).__name__}"
            )
        if self._backend != other._backend:
            raise ValueError(
                f"Backend mismatch: {self._backend} vs {other._backend}. "
                f"Use .to_numpy() or .to_torch() to convert."
            )
        if self._encoding.__class__ != other._encoding.__class__:
            # stacklevel=3 attributes the warning to the caller of the public
            # method (bundle/bind/unbind/similarity) rather than to this
            # helper, so distinct call sites are reported separately.
            warnings.warn(
                f"Encoding mismatch: {self._encoding.__class__.__name__} vs "
                f"{other._encoding.__class__.__name__}",
                stacklevel=3,
            )
# Convenience functions for API
def generate(
    encoding: Encoding,
    size: Union[int, Tuple[int, ...]],
    use_generator: Optional[bool] = None,
) -> Hypervector:
    """
    Create random hypervector(s) by delegating to ``encoding.generate``.

    Args:
        encoding: The encoding scheme to use
        size: Size of hypervector(s) to generate
        use_generator: Whether to use the custom generator; forwarded
            unchanged as a keyword argument

    Returns:
        Whatever ``encoding.generate`` returns (a new Hypervector)
    """
    generated = encoding.generate(size, use_generator=use_generator)
    return generated
def zeros(
    encoding: Encoding, size: Optional[Union[int, Tuple[int, ...]]] = None
) -> Hypervector:
    """
    Generate zero hypervector(s) using the specified encoding.

    Args:
        encoding: The encoding scheme to use
        size: Size of hypervector(s) to generate. ``None`` (the default) is
            forwarded as-is, leaving the choice to ``encoding.zeros``.

    Returns:
        A new zero Hypervector
    """
    # The annotation is Optional because None is the declared default.
    return encoding.zeros(size)
def bundle(*hypervectors: Hypervector) -> Hypervector:
    """
    Bundle several hypervectors into one.

    The first argument's ``bundle`` method is called with the remaining
    arguments.

    Args:
        *hypervectors: Hypervectors to bundle

    Returns:
        A new bundled Hypervector

    Raises:
        ValueError: If no hypervector is given.
    """
    if hypervectors:
        head, *tail = hypervectors
        return head.bundle(*tail)
    raise ValueError("At least one hypervector required")
def bind(*hypervectors: Hypervector) -> Hypervector:
    """
    Bind several hypervectors into one.

    The first argument's ``bind`` method is called with the remaining
    arguments.

    Args:
        *hypervectors: Hypervectors to bind

    Returns:
        A new bound Hypervector

    Raises:
        ValueError: If no hypervector is given.
    """
    if hypervectors:
        head, *tail = hypervectors
        return head.bind(*tail)
    raise ValueError("At least one hypervector required")
def unbind(*hypervectors: Hypervector) -> Hypervector:
    """
    Unbind hypervectors: ``hypervectors[0]`` unbound from the rest.

    Args:
        *hypervectors: The hypervector to unbind from, followed by the
            hypervectors passed to its ``unbind`` method

    Returns:
        A new unbound Hypervector

    Raises:
        ValueError: If no hypervector is given.
    """
    if hypervectors:
        head, *tail = hypervectors
        return head.unbind(*tail)
    raise ValueError("At least one hypervector required")
def permute(hypervector: Hypervector, shift: int = 1) -> Hypervector:
    """
    Permute (cyclic-shift) a hypervector along the dimension axis.

    Args:
        hypervector: The hypervector to permute
        shift: Shift amount, forwarded as the ``shift`` keyword

    Returns:
        The result of ``hypervector.permute(shift=shift)``
    """
    shifted = hypervector.permute(shift=shift)
    return shifted
def inverse(hypervector: Hypervector) -> Hypervector:
    """
    Return the binding inverse of a hypervector.

    Args:
        hypervector: The hypervector to invert

    Returns:
        The result of ``hypervector.inverse()``
    """
    inverted = hypervector.inverse()
    return inverted
def negative(hypervector: Hypervector) -> Hypervector:
    """
    Return the bundling (additive) inverse of a hypervector.

    Args:
        hypervector: The hypervector to negate

    Returns:
        The result of ``hypervector.negative()``
    """
    negated = hypervector.negative()
    return negated
def normalize(hypervector: Hypervector) -> Hypervector:
    """
    Normalize a hypervector to its encoding's entry space.

    Args:
        hypervector: The hypervector to normalize

    Returns:
        The result of ``hypervector.normalize()``
    """
    normalized = hypervector.normalize()
    return normalized
def stack(hypervectors: "list[Hypervector]") -> Hypervector:
    """
    Combine hypervectors/batches into one dimension-first ``(D, N)`` Hypervector.

    Backend-agnostic (numpy or torch). Concatenates along the batch axis
    (axis 1); a 1D ``(D,)`` vector is treated as a single column ``(D, 1)``.
    For example, ``stack([prototype, codebook])`` with a ``(D,)`` prototype and
    a ``(D, N)`` codebook returns a ``(D, N + 1)`` Hypervector with the
    prototype as column 0.

    The result takes its backend and encoding from the first element and
    carries no metadata.

    Args:
        hypervectors: A non-empty list of Hypervectors sharing a backend
            (and, ideally, an encoding).

    Returns:
        A new Hypervector with the inputs concatenated along the batch axis.

    Raises:
        ValueError: If the list is empty or the backends differ.

    Warns:
        UserWarning: Once for each element whose encoding class differs
            from the first element's.
    """
    if not hypervectors:
        raise ValueError("At least one hypervector required")

    first = hypervectors[0]
    backend = first.backend
    encoding = first.encoding

    arrays = []
    for hv in hypervectors:
        if hv.backend != backend:
            raise ValueError(
                f"Backend mismatch in stack: {backend} vs {hv.backend}. "
                f"Use .to_numpy() or .to_torch() to convert."
            )
        if hv.encoding.__class__ != encoding.__class__:
            # stacklevel=2 attributes the warning to the stack() call site
            # instead of this line, so each caller is reported separately.
            warnings.warn(
                f"Encoding mismatch in stack: {encoding.__class__.__name__} vs "
                f"{hv.encoding.__class__.__name__}",
                stacklevel=2,
            )
        arrays.append(hv.data)

    # column_stack promotes 1D inputs to columns before concatenating.
    if backend == "torch":
        result = torch.column_stack(arrays)
    else:
        result = np.column_stack(arrays)
    return Hypervector(result, encoding, backend)