from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import numpy as np
# Optional PyTorch import
try:
import torch
TORCH_AVAILABLE = True
except ImportError:
TORCH_AVAILABLE = False
torch = None
from pyhdc.config import get_default_backend, get_default_device
from pyhdc.exceptions import GeneratorNotSupportedError
from pyhdc.generation.base import DefaultGenerator, HDCGenerator
from pyhdc.hypervector import BackendManager, EncodingSpec, Hypervector
from pyhdc.types import ArrayLike, Backend, Device
def _unpack_operation_result(
result: Union[ArrayLike, Tuple[ArrayLike, Dict[str, Any]]], operation_fn: Callable
) -> Tuple[ArrayLike, Dict[str, Any]]:
"""
Unpack operation result, handling both old and new-style returns.
Adds 'operation' key with function name to metadata dict.
Args:
result: Either raw ArrayLike or (ArrayLike, metadata_dict) tuple
operation_fn: The operation function that produced this result
Returns:
Tuple of (data, metadata_dict)
"""
if isinstance(result, tuple) and len(result) == 2:
data, metadata = result
import functools
if isinstance(operation_fn, functools.partial):
op_name = operation_fn.func.__name__
else:
op_name = operation_fn.__name__
if isinstance(metadata, dict):
enhanced_metadata = {"operation": op_name, **metadata}
return data, enhanced_metadata
else:
# Non-dict secondary return (e.g. MatrixMultiplication returns matrices list)
return data, {"operation": op_name, "aux": metadata}
return result, {}
[docs]
class Encoding(ABC):
"""
Base class for hypervector encoding schemes.
An encoding defines how hypervectors are generated and how operations
(similarity, bundling, binding) are performed on them.
"""
def __init__(
self,
dimension: int = 10_000,
backend: Optional[Backend] = None,
device: Optional[Device] = None,
dtype: Optional[Any] = None,
mask: Optional[int] = None,
generator: Optional[HDCGenerator] = None,
similarity_remap: Optional[Callable] = None,
) -> None:
"""
Initialize an encoding scheme.
Args:
dimension: Number of dimensions in hypervectors
backend: Backend to use ('numpy' or 'torch'). Defaults to the global
preference (see ``pyhdc.prefer_torch``/``prefer_numpy``), which is
'numpy' unless changed.
device: Device for PyTorch backend. Defaults to the global preference
(see ``pyhdc.prefer_cuda``/``prefer_cpu``).
dtype: Data type override
mask: Optional mask value
generator: Optional custom generator (uses default if None)
similarity_remap: Optional callable applied to every similarity result
before returning. All similarity functions return [-1, 1] by default;
use this to remap the output, e.g. ``pyhdc.components.similarity.remap_to_unit``
to shift to [0, 1].
"""
if backend is None:
backend = get_default_backend()
if device is None:
device = get_default_device()
self.dimension = dimension
self.backend = backend
self.device = device if backend == "torch" else None
self._similarity_remap = similarity_remap
if backend == "torch" and not TORCH_AVAILABLE:
raise ImportError(
"PyTorch backend requested but PyTorch is not installed. "
"Install it with: pip install torch"
)
# Set up generator
self._has_custom_generator = generator is not None
self._generator = generator if generator is not None else DefaultGenerator()
# Get encoding specification
spec = self._get_encoding_spec()
# Override dtype if provided
if dtype is not None:
spec.dtype = dtype
if mask is not None:
spec.mask = mask
self._spec = spec
self._validate_generator()
def _validate_generator(self) -> None:
"""Validate that generator supports required output type."""
output_type = self._spec.generator_output_type
if output_type == "bits" and not self._generator.supports_bits():
raise GeneratorNotSupportedError(
f"Generator {self._generator.__class__.__name__} does not support "
f"bit generation required by {self.__class__.__name__}"
)
elif output_type == "words" and not self._generator.supports_words():
raise GeneratorNotSupportedError(
f"Generator {self._generator.__class__.__name__} does not support "
f"word generation required by {self.__class__.__name__}"
)
elif output_type == "floats" and not self._generator.supports_floats():
raise GeneratorNotSupportedError(
f"Generator {self._generator.__class__.__name__} does not support "
f"float generation required by {self.__class__.__name__}"
)
[docs]
@abstractmethod
def _get_encoding_spec(self) -> EncodingSpec:
"""Get the encoding specification for this encoding type."""
def _generate_with_generator(self, size: Union[int, Tuple[int, ...]]) -> np.ndarray:
"""
Generate data using the custom generator.
Args:
size: Size specification
Returns:
Generated numpy array
"""
# Determine total number of elements
if isinstance(size, int):
total_elements = size
shape = (size,)
elif isinstance(size, tuple):
total_elements = int(np.prod(size))
shape = size
else:
raise ValueError(f"Invalid size specification: {size}")
output_type = self._spec.generator_output_type
# Generate based on output type
if output_type == "bits":
data = self._generator.generate_bits(total_elements)
elif output_type == "words":
# Determine word size from dtype
dtype_bits = np.dtype(self._spec.dtype).itemsize * 8
data = self._generator.generate_words(total_elements, dtype_bits)
elif output_type == "floats":
# Determine range based on dtype
if np.issubdtype(self._spec.dtype, np.integer):
# For integer dtypes, generate in [0, 1] then scale
floats = self._generator.generate_floats(total_elements, 0.0, 1.0)
# Convert to appropriate range
if (
self._spec.dtype == np.int8
or "bipolar" in self._spec.element_generator.__name__.lower()
):
# Bipolar: map to {-1, 1}
data = [int(2 * round(f) - 1) for f in floats]
else:
# Binary: map to {0, 1}
data = [int(round(f)) for f in floats]
else:
# For float dtypes, generate in [-1, 1] or appropriate range
data = self._generator.generate_floats(total_elements, -1.0, 1.0)
else:
raise ValueError(f"Unknown output type: {output_type}")
# Convert to numpy array and reshape
arr = np.array(data, dtype=self._spec.dtype)
return arr.reshape(shape)
def _generate_one(self, dim: int, use_generator: bool) -> np.ndarray:
"""
Generate a single ``(dim,)`` hypervector as a numpy array.
Args:
dim: Hypervector dimension.
use_generator: Whether to use the custom HDCGenerator pathway.
Returns:
A 1D numpy array of length ``dim``.
"""
if use_generator and self._generator is not None:
return self._generate_with_generator(dim)
return self._spec.element_generator(dim, self._spec.dtype)
[docs]
def generate(
self,
size: Union[int, Tuple[int, ...]] = None,
backend: Optional[Backend] = None,
device: Optional[Device] = None,
use_generator: Optional[bool] = None,
) -> Hypervector:
"""
Generate random hypervector(s).
Hypervectors are dimension-first. A scalar (or ``None``) ``size`` produces a
single ``(D,)`` hypervector; a tuple ``(D, N)`` produces a batch of ``N``
hypervectors of dimension ``D`` stored as columns of a ``(D, N)`` array
(and likewise ``(D, N, M)`` for higher-rank batches).
Batched generation is defined as generating the ``N`` hypervectors one at a
time and stacking them as columns, so under a fixed seed
``generate(size=(D, N))`` yields exactly the same vectors as ``N`` successive
``generate(size=D)`` calls -- including for ordered generators (LCG/LFSR/...).
Args:
size: ``None`` or int for a single ``(D,)`` vector; a tuple
``(D, *batch)`` for a batch of ``prod(batch)`` vectors of dimension
``D``.
backend: Backend override (defaults to the encoding's backend).
device: Device override for the torch backend.
use_generator: Whether to use the HDCGenerator pathway. Defaults to True
if a custom generator was passed at construction, False otherwise
(uses element_generator directly, which gives the correct
per-encoding distribution).
Returns:
A new Hypervector.
"""
if backend is None:
backend = self.backend
if device is None:
device = self.device
if use_generator is None:
use_generator = self._has_custom_generator
if size is None or isinstance(size, int):
dim = self.dimension if size is None else size
data = self._generate_one(dim, use_generator)
elif isinstance(size, tuple):
dim, batch = size[0], size[1:]
if not batch:
data = self._generate_one(dim, use_generator)
else:
count = 1
for axis in batch:
count *= int(axis)
columns = [self._generate_one(dim, use_generator) for _ in range(count)]
data = np.stack(columns, axis=-1).reshape((dim, *batch))
else:
raise ValueError(f"Invalid size specification: {size}")
# Convert to appropriate backend
if backend == "torch":
data = BackendManager.to_torch(data, device)
return Hypervector(data, self, backend, None)
[docs]
def zeros(
self,
size: Union[int, Tuple[int, ...]] = None,
backend: Optional[Backend] = None,
device: Optional[Device] = None,
) -> Hypervector:
"""Generate zero hypervector(s)."""
if size is None:
size = self.dimension
if backend is None:
backend = self.backend
if device is None:
device = self.device
if backend == "torch":
data = torch.zeros(size, dtype=self._spec.dtype, device=device)
else:
data = np.zeros(size, dtype=self._spec.dtype)
return Hypervector(data, self, backend, None)
[docs]
def from_array(
self, array: ArrayLike, backend: Optional[Backend] = None
) -> Hypervector:
"""Create a Hypervector from an existing array."""
if backend is None:
backend = BackendManager.get_backend(array)
return Hypervector(array, self, backend, None)
[docs]
def set_generator(self, generator: HDCGenerator) -> None:
"""
Set a new generator for this encoding.
Args:
generator: The new generator to use
Raises:
GeneratorNotSupportedError: If generator doesn't support required output type
"""
self._generator = generator
self._validate_generator()
[docs]
def get_generator(self) -> HDCGenerator:
"""Get the current generator."""
return self._generator
[docs]
def similarity(
self,
hvA: Union[ArrayLike, Hypervector, List],
hvB: Optional[Union[ArrayLike, Hypervector, List]] = None,
) -> Union[float, ArrayLike, List[Union[float, ArrayLike]]]:
"""
Compute similarity between hypervector(s).
Hypervectors are dimension-first ``(D, N)``. Calling conventions:
- ``similarity(a, b)`` with two ``(D,)`` vectors -> a scalar score
- ``similarity(A, B)`` with two ``(D, N)`` batches -> ``N`` per-column scores
- ``similarity(v, B)`` with a vector and a ``(D, N)`` batch -> ``N`` scores
- ``similarity(batch)`` with one ``(D, N)`` batch -> ``N-1`` scores of
column 0 against each remaining column
- ``similarity([..], [..])`` with two equal-length lists -> pairwise scores
Args:
hvA: First hypervector(s) (Hypervector, array, or list), or a single
``(D, N)`` batch when ``hvB`` is omitted.
hvB: Optional second hypervector(s).
Returns:
A scalar, a 1D array of scores, or a list of scores (for list inputs).
Examples:
>>> bsc.similarity(hv1, hv2) # scalar
>>> enc.similarity(codebook) # col 0 vs the rest
>>> bsc.similarity([hv1, hv2], [hv4, hv5]) # [sim(1,4), sim(2,5)]
"""
from pyhdc.components.input_formatting import _extract_data
# Batched if both are lists of equal length
if isinstance(hvA, list) and isinstance(hvB, list):
if len(hvA) != len(hvB):
raise ValueError(
f"Batched similarity requires equal-length lists. "
f"Got {len(hvA)} and {len(hvB)}"
)
results = []
for a, b in zip(hvA, hvB):
data_a = _extract_data(a)
data_b = _extract_data(b)
sim = self._spec.similarity_fn(data_a, data_b)
if self._similarity_remap is not None:
sim = self._similarity_remap(sim)
results.append(sim)
return results
# Single (D, N) batch (hvB omitted) or a two-operand comparison
if hvB is None:
result = self._spec.similarity_fn(_extract_data(hvA))
else:
result = self._spec.similarity_fn(_extract_data(hvA), _extract_data(hvB))
if self._similarity_remap is not None:
result = self._similarity_remap(result)
return result
[docs]
def bundle(
self,
*hypervectors: Union[ArrayLike, Hypervector, List],
batch_dim: Optional[int] = None,
) -> Union[Hypervector, List[Hypervector]]:
"""
Bundle multiple hypervectors, optionally in batches.
Args:
*hypervectors: Hypervector objects, raw arrays, or lists to bundle
batch_dim: If provided with 3D+ array, split along this dimension for batching
Returns:
Single Hypervector (if not batched) or List of Hypervectors (if batched)
Examples:
>>> # Single bundle (current behavior)
>>> bsc.bundle(hv1, hv2, hv3) # Returns: Hypervector
>>> bsc.bundle([hv1, hv2, hv3]) # Returns: Hypervector
>>> # Batched bundles (new)
>>> bsc.bundle([[hv1, hv2], [hv3, hv4]]) # Returns: [bundled1, bundled2]
>>> bsc.bundle(array_3d, batch_dim=0) # Returns: list of bundled hypervectors
"""
from pyhdc.components.input_formatting import (
_detect_batch_structure,
_normalize_inputs,
)
# Detect if this is a batched operation
is_batched, groups = _detect_batch_structure(*hypervectors, batch_dim=batch_dim)
if is_batched:
# Process each batch group independently
results = []
for group in groups:
# Normalize each group
if isinstance(group, (list, tuple)):
data_arrays, _, _ = _normalize_inputs(*group)
else:
data_arrays, _, _ = _normalize_inputs(group)
result = self._spec.bundling_fn(*data_arrays)
result_data, metadata = _unpack_operation_result(
result, self._spec.bundling_fn
)
results.append(Hypervector(result_data, self, self.backend, metadata))
return results
else:
# Single operation
if isinstance(groups, (list, tuple)) and len(groups) > 0:
data_arrays, _, _ = _normalize_inputs(*groups)
else:
data_arrays, _, _ = _normalize_inputs(groups)
result = self._spec.bundling_fn(*data_arrays)
result_data, metadata = _unpack_operation_result(
result, self._spec.bundling_fn
)
return Hypervector(result_data, self, self.backend, metadata)
[docs]
def thin(
self, hypervector: Union[ArrayLike, Hypervector, List]
) -> Union[Hypervector, List[Hypervector]]:
"""
Apply thinning to hypervector(s).
Supports batching: if a list is provided, applies thinning independently
to each hypervector in the list.
Args:
hypervector: Hypervector object, raw array, or list of hypervectors to thin
Returns:
Single Hypervector (if single input) or List of Hypervectors (if list input)
Examples:
>>> # Single thinning
>>> bsc.thin(hv) # Returns: Hypervector
>>> # Batched thinning
>>> bsc.thin([hv1, hv2, hv3]) # Returns: [thinned1, thinned2, thinned3]
"""
from pyhdc.components.input_formatting import _extract_data
# Batched if list input
if isinstance(hypervector, list):
results = []
for hv in hypervector:
data = _extract_data(hv)
result = self._spec.thinning_fn(data)
result_data, metadata = _unpack_operation_result(
result, self._spec.thinning_fn
)
results.append(Hypervector(result_data, self, self.backend, metadata))
return results
else:
# Single operation
data = _extract_data(hypervector)
result = self._spec.thinning_fn(data)
result_data, metadata = _unpack_operation_result(
result, self._spec.thinning_fn
)
return Hypervector(result_data, self, self.backend, metadata)
[docs]
def bind(
self,
*hypervectors: Union[ArrayLike, Hypervector, List],
batch_dim: Optional[int] = None,
) -> Union[Hypervector, List[Hypervector]]:
"""
Bind multiple hypervectors, optionally in batches.
Args:
*hypervectors: Hypervector objects, raw arrays, or lists to bind
batch_dim: If provided with 3D+ array, split along this dimension for batching
Returns:
Single Hypervector (if not batched) or List of Hypervectors (if batched)
"""
from pyhdc.components.input_formatting import (
_detect_batch_structure,
_normalize_inputs,
)
# Detect if this is a batched operation
is_batched, groups = _detect_batch_structure(*hypervectors, batch_dim=batch_dim)
if is_batched:
# Process each batch group independently
results = []
for group in groups:
# Normalize each group
if isinstance(group, (list, tuple)):
data_arrays, _, _ = _normalize_inputs(*group)
else:
data_arrays, _, _ = _normalize_inputs(group)
result = self._spec.binding_fn(*data_arrays)
result_data, metadata = _unpack_operation_result(
result, self._spec.binding_fn
)
results.append(Hypervector(result_data, self, self.backend, metadata))
return results
else:
# Single operation
if isinstance(groups, (list, tuple)) and len(groups) > 0:
data_arrays, _, _ = _normalize_inputs(*groups)
else:
data_arrays, _, _ = _normalize_inputs(groups)
result = self._spec.binding_fn(*data_arrays)
result_data, metadata = _unpack_operation_result(
result, self._spec.binding_fn
)
return Hypervector(result_data, self, self.backend, metadata)
[docs]
def unbind(
self,
*hypervectors: Union[ArrayLike, Hypervector, List],
batch_dim: Optional[int] = None,
) -> Union[Hypervector, List[Hypervector]]:
"""
Unbind hypervectors, optionally in batches.
Args:
*hypervectors: Hypervector objects, raw arrays, or lists to unbind
batch_dim: If provided with 3D+ array, split along this dimension for batching
Returns:
Single Hypervector (if not batched) or List of Hypervectors (if batched)
"""
from pyhdc.components.input_formatting import (
_detect_batch_structure,
_normalize_inputs,
)
# Detect if this is a batched operation
is_batched, groups = _detect_batch_structure(*hypervectors, batch_dim=batch_dim)
if is_batched:
# Process each batch group independently
results = []
for group in groups:
# Normalize each group
if isinstance(group, (list, tuple)):
data_arrays, _, _ = _normalize_inputs(*group)
else:
data_arrays, _, _ = _normalize_inputs(group)
result = self._spec.unbinding_fn(*data_arrays)
result_data, metadata = _unpack_operation_result(
result, self._spec.unbinding_fn
)
results.append(Hypervector(result_data, self, self.backend, metadata))
return results
else:
# Single operation
if isinstance(groups, (list, tuple)) and len(groups) > 0:
data_arrays, _, _ = _normalize_inputs(*groups)
else:
data_arrays, _, _ = _normalize_inputs(groups)
result = self._spec.unbinding_fn(*data_arrays)
result_data, metadata = _unpack_operation_result(
result, self._spec.unbinding_fn
)
return Hypervector(result_data, self, self.backend, metadata)