TorchStream

class capymoa.stream.TorchStream

Bases: Stream

A stream adapter for PyTorch datasets.

This class converts PyTorch datasets into CapyMOA streams for both classification and regression tasks.

Creating a classification stream from a PyTorch dataset:

>>> from capymoa.datasets import get_download_dir
>>> from capymoa.stream import TorchStream
>>> from torchvision import datasets, transforms
>>>
>>> dataset = datasets.FashionMNIST(
...     root=get_download_dir(),
...     train=True,
...     download=True,
...     transform=transforms.ToTensor()
... )  
>>> stream = TorchStream.from_classification(
...     dataset, num_classes=10, class_names=dataset.classes
... )  
>>> stream.next_instance()  
LabeledInstance(...)

Creating a shuffled classification stream:

>>> import torch
>>> from torch.utils.data import TensorDataset
>>>
>>> dataset = TensorDataset(
...     torch.tensor([[1.0], [2.0], [3.0]]),
...     torch.tensor([0, 1, 2])
... )
>>> stream = TorchStream.from_classification(
...     dataset, num_classes=3, shuffle=True, shuffle_seed=0
... )
>>> [float(inst.x[0]) for inst in stream]
[3.0, 1.0, 2.0]

Streams can be restarted to iterate again:

>>> stream.restart()
>>> [float(inst.x[0]) for inst in stream]
[3.0, 1.0, 2.0]

Creating a regression stream:

>>> dataset = TensorDataset(
...     torch.tensor([[1.0], [2.0], [3.0]]),
...     torch.tensor([0.5, 1.5, 2.5])
... )
>>> stream = TorchStream.from_regression(
...     dataset, shuffle=True, shuffle_seed=0
... )
>>> [(float(inst.x[0]), float(inst.y_value)) for inst in stream]
[(3.0, 2.5), (1.0, 0.5), (2.0, 1.5)]

__init__(
dataset: Dataset,
schema: Schema,
)

Construct a TorchStream from a PyTorch Dataset and a Schema.

In most cases, use from_classification() or from_regression() instead of calling this constructor directly.

Parameters:
  • dataset – A PyTorch Dataset that yields tuples of (features, target).

  • schema – A Schema object that describes the structure of the data, including feature names and target information.

__iter__() → Iterator[_AnyInstance]

Get an iterator over the stream.

This does NOT restart a stream that has already been iterated over. Call restart() first to read the stream from the beginning again.

Yield:

The instances of the stream, one at a time.
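
The iteration contract described above can be sketched without CapyMOA installed. The ToyStream class below is a hypothetical stand-in written for illustration only; it is not the TorchStream implementation, and it assumes that an exhausted stream yields nothing further until restart() is called.

```python
from typing import Iterator, List


class ToyStream:
    """Hypothetical stand-in that mirrors the documented iteration contract."""

    def __init__(self, items: List[float]) -> None:
        self._items = list(items)
        self._pos = 0

    def has_more_instances(self) -> bool:
        return self._pos < len(self._items)

    def next_instance(self) -> float:
        item = self._items[self._pos]
        self._pos += 1
        return item

    def restart(self) -> None:
        # Rewind so that reading starts from the first item again.
        self._pos = 0

    def __iter__(self) -> Iterator[float]:
        # Return self without resetting the read position.
        return self

    def __next__(self) -> float:
        if not self.has_more_instances():
            raise StopIteration
        return self.next_instance()


stream = ToyStream([3.0, 1.0, 2.0])
first_pass = list(stream)   # consumes every item
second_pass = list(stream)  # empty, because the position was not reset
stream.restart()
third_pass = list(stream)   # a full pass again after restart()
```

With a real TorchStream, the same pattern would apply: call restart() between passes, as the class-level examples do.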

__next__() → _AnyInstance

Get the next instance in the stream.

Returns:

The next instance in the stream.

cli_help() → str

Return a help message.

static from_classification(
dataset: Dataset[Tuple[Tensor, Tensor | int]],
num_classes: int,
class_names: Sequence[str] | None = None,
dataset_name: str = 'TorchStream',
shape: Sequence[int] | None = None,
shuffle: bool = False,
shuffle_seed: int | None = None,
) → TorchStream

Construct a stream for classification from a PyTorch Dataset.

Parameters:
  • dataset – A PyTorch Dataset that yields tuples of (features, target).

  • num_classes – The number of classes in the classification task.

  • class_names – An optional sequence of class names corresponding to the class indices.

  • dataset_name – An optional name for the stream.

  • shape – An optional shape for the features. If not provided, features will be treated as flat vectors.

  • shuffle – Whether to shuffle the dataset.

  • shuffle_seed – An optional seed for shuffling the dataset.

Returns:

A TorchStream instance.

static from_regression(
dataset: Dataset[Tuple[Tensor, Tensor | float]],
dataset_name: str = 'TorchStream',
shuffle: bool = False,
shuffle_seed: int | None = None,
) → TorchStream

Construct a stream for regression from a PyTorch Dataset.

Parameters:
  • dataset – A PyTorch Dataset that yields tuples of (features, target) for regression tasks.

  • dataset_name – An optional name for the stream.

  • shuffle – Whether to shuffle the dataset.

  • shuffle_seed – An optional seed for shuffling the dataset.

Returns:

A TorchStream instance for regression.

get_moa_stream()

Get the MOA stream object if it exists.

get_schema()

Return the schema of the stream.

has_more_instances()

Return True if the stream has more instances to read.

next_instance()

Return the next instance in the stream.

Raises:

ValueError – If the machine learning task is neither a regression nor a classification task.

Returns:

A labeled instance or a regression instance, depending on the schema.
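
As an alternative to a for loop, has_more_instances() and next_instance() can be combined into an explicit read loop, which is convenient when only part of a stream is needed. The helper below is a minimal sketch: take is a hypothetical name that is not part of CapyMOA, and it relies only on the two methods documented here.

```python
from typing import Any, List


def take(stream: Any, n: int) -> List[Any]:
    """Read at most n instances from a stream (hypothetical helper).

    Works with any object that provides has_more_instances() and
    next_instance(), which are the only methods it calls.
    """
    taken: List[Any] = []
    # Stop when n instances were read or the stream is exhausted.
    while len(taken) < n and stream.has_more_instances():
        taken.append(stream.next_instance())
    return taken
```

Calling take(stream, 100) would return the first 100 instances, or fewer if the stream ends first. The stream keeps its position afterwards, so a later call continues where this one stopped.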

restart()

Restart the stream to read instances from the beginning.