Stream#

A datastream is a sequence of instances arriving one at a time.

class capymoa.stream.Stream[source]#

Bases: object

A datastream that can be learnt instance by instance.

__init__(
moa_stream: InstanceStream | None = None,
schema: Schema | None = None,
CLI: str | None = None,
)[source]#

Construct a Stream from a MOA stream object.

Usually, you will want to construct a Stream using the capymoa.stream.stream_from_file() function.

Parameters:
  • moa_stream – The MOA stream object to read instances from. This is None if the stream is created from a NumPy array.

  • schema – The schema of the stream. If None, the schema is inferred from the moa_stream.

  • CLI – Additional command line arguments to pass to the MOA stream.

Raises:
  • ValueError – If no schema is provided and no moa_stream is provided.

  • ValueError – If command line arguments are provided without a moa_stream.

CLI_help() str[source]#

Return the CLI help string for the stream.

has_more_instances() bool[source]#

Return True if the stream has more instances to read.

next_instance() LabeledInstance | RegressionInstance[source]#

Return the next instance in the stream.

Raises:

ValueError – If the machine learning task is neither a regression nor a classification task.

Returns:

A labeled instance or a regression instance, depending on the schema.

get_schema() Schema[source]#

Return the schema of the stream.

get_moa_stream() InstanceStream | None[source]#

Get the MOA stream object if it exists.

restart()[source]#

Restart the stream to read instances from the beginning.
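
The methods above suggest a simple consumption loop: check has_more_instances(), fetch next_instance(), and call restart() to replay the data. The sketch below illustrates that loop with a hypothetical in-memory ListStream stand-in (not part of CapyMOA) that mimics only these three methods, so it runs without a MOA backend. A real Stream should be usable in the same loop, but that is an assumption of this example rather than something it demonstrates.

```python
class ListStream:
    """Hypothetical stand-in that mimics three Stream methods."""

    def __init__(self, instances):
        self._instances = list(instances)
        self._position = 0

    def has_more_instances(self):
        # True while there are unread instances left.
        return self._position < len(self._instances)

    def next_instance(self):
        instance = self._instances[self._position]
        self._position += 1
        return instance

    def restart(self):
        # Rewind so that reading starts from the beginning again.
        self._position = 0


def drain(stream):
    """Read instances until the stream is exhausted and return them."""
    seen = []
    while stream.has_more_instances():
        seen.append(stream.next_instance())
    return seen


stream = ListStream([("x0", 0), ("x1", 1), ("x2", 0)])
first_pass = drain(stream)
stream.restart()
second_pass = drain(stream)
print(len(first_pass), first_pass == second_pass)
```

Because drain() relies only on the three method names, it does not care which concrete stream class it receives.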

class capymoa.stream.Schema[source]#

Bases: object

Schema describes the structure of a stream.

It contains the attribute names, data types, and the possible values for nominal attributes. The schema is crucial for a learner to know how to interpret instances correctly.

When working with datasets built into CapyMOA (see capymoa.datasets) and ARFF files, the schema is automatically created. However, in some cases you might want to create a schema manually. This can be done using the from_custom() method.

__init__(
moa_header: InstancesHeader,
)[source]#

Construct a schema by wrapping an InstancesHeader.

To create a schema without an InstancesHeader, use the from_custom() method.

Parameters:

moa_header – A Java MOA header object.

get_label_values() Sequence[str][source]#

Return the possible values for the class label.

get_label_indexes() Sequence[int][source]#

Return the possible indexes for the class label.

get_value_for_index(y_index: int | None) str | None[source]#

Return the value for the class label index y_index.

get_index_for_label(y: str)[source]#

Return the index for the class label y.

get_moa_header() InstancesHeader[source]#

Get the Java MOA header. Useful for advanced users.

This is needed for advanced operations that are not supported by the Python wrappers (yet).

get_num_attributes() int[source]#

Return the number of attributes excluding the target attribute.

get_num_classes() int[source]#

Return the number of possible classes. If regression, returns 1.

is_regression() bool[source]#

Return True if the problem is a regression problem.

is_classification() bool[source]#

Return True if the problem is a classification problem.

is_y_index_in_range(y_index: int) bool[source]#

Return True if the y_index is in the range of the class label indexes.
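
The label helpers above translate between class label strings and integer indexes. The sketch below is a hypothetical pure-Python mimic (ToyLabelSchema is not a CapyMOA class) that shows how the five methods could relate to each other. It assumes that an index is simply the position of the label in the list of label values and that a None index maps to a None label, which is consistent with the examples on this page but is not confirmed by them.

```python
class ToyLabelSchema:
    """Hypothetical mimic of the label helpers of Schema."""

    def __init__(self, label_values):
        # Store labels as strings, in the order they were declared.
        self._labels = [str(value) for value in label_values]

    def get_label_values(self):
        return list(self._labels)

    def get_label_indexes(self):
        return list(range(len(self._labels)))

    def get_value_for_index(self, y_index):
        # Assumption: a missing index yields a missing label.
        if y_index is None:
            return None
        return self._labels[y_index]

    def get_index_for_label(self, y):
        return self._labels.index(y)

    def is_y_index_in_range(self, y_index):
        return 0 <= y_index < len(self._labels)


schema = ToyLabelSchema(["yes", "no"])
print(schema.get_label_indexes())
print(schema.get_index_for_label("no"))
print(schema.get_value_for_index(0))
print(schema.is_y_index_in_range(2))
```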

property dataset_name: str#

Return the name of the dataset.

static from_custom(
feature_names: Sequence[str],
values_for_nominal_features: Dict[str, Sequence[str]] = {},
values_for_class_label: Sequence[str] = None,
dataset_name='No_Name',
target_attribute_name=None,
enforce_regression=False,
)[source]#

Create a CapyMOA Schema that defines each attribute in the stream.

The following example shows how to use this method to create a classification schema:

>>> from capymoa.stream import Schema
...
>>> Schema.from_custom(
...     feature_names=["attrib_1", "attrib_2"],
...     dataset_name="MyClassification",
...     target_attribute_name="class",
...     values_for_class_label=["yes", "no"])
@relation MyClassification

@attribute attrib_1 numeric
@attribute attrib_2 numeric
@attribute class {yes,no}

@data

The following example shows how to use this method to create a regression schema:

>>> Schema.from_custom(
...     feature_names=["attrib_1", "attrib_2"],
...     values_for_nominal_features={"attrib_1": ["a", "b"]},
...     dataset_name="MyRegression",
...     target_attribute_name="target",
...     enforce_regression=True)
@relation MyRegression

@attribute attrib_1 {a,b}
@attribute attrib_2 numeric
@attribute target numeric

@data

Sample code to get relevant information from two NumPy arrays: X[rows][features] and y[rows].

Parameters:
  • feature_names – A list containing the names of the features. If None, a default name is set.

  • values_for_nominal_features – Possible values of each nominal feature.

  • values_for_class_label – Possible values for class label. Values are turned into strings.

  • dataset_name – Name of the dataset. Default is “No_Name”.

  • target_attribute_name – Name of the target/class attribute. Default is None.

  • enforce_regression – If True, the schema is interpreted as a regression problem. Default is False.

Returns:

An initialized CapyMOA Schema which contains all necessary attribute information for all features and the class label.
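
When the data lives in two arrays, X[rows][features] and y[rows], the arguments for from_custom() can be derived from them. The following is a minimal sketch under stated assumptions: describe_arrays is a hypothetical helper (not part of CapyMOA), the attrib_i naming merely follows the examples on this page, and plain nested lists are used so the code runs without NumPy, although NumPy arrays should work too since only len(), indexing, and iteration are used.

```python
def describe_arrays(X, y, target_attribute_name="class"):
    """Collect keyword arguments for Schema.from_custom from X and y.

    Hypothetical helper for illustration only.
    """
    num_features = len(X[0])
    feature_names = [f"attrib_{i}" for i in range(num_features)]
    # Class label values are turned into strings, as the parameter
    # description of values_for_class_label states.
    class_values = [str(value) for value in sorted(set(y))]
    return {
        "feature_names": feature_names,
        "values_for_class_label": class_values,
        "target_attribute_name": target_attribute_name,
    }


X = [[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]]
y = [1, 0, 1]
kwargs = describe_arrays(X, y)
print(kwargs)

# With CapyMOA installed, the result could then be passed on:
# Schema.from_custom(dataset_name="FromArrays", **kwargs)
```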

capymoa.stream.stream_from_file(
path_to_csv_or_arff: str = None,
dataset_name: str = 'NoName',
enforce_regression: bool = False,
) Stream[source]#

Create a datastream from a csv or arff file.

>>> from capymoa.stream import stream_from_file
>>> stream = stream_from_file("data/electricity_tiny.csv", dataset_name="Electricity")
>>> stream.next_instance()
LabeledInstance(
    Schema(Electricity),
    x=ndarray(..., 6),
    y_index=1,
    y_label='1'
)
>>> stream.next_instance().x
array([0.021277, 0.051699, 0.415055, 0.003467, 0.422915, 0.414912])

Parameters:
  • path_to_csv_or_arff – A file path to a CSV or ARFF file.

  • dataset_name – A descriptive name given to the dataset. Defaults to “NoName”.

  • enforce_regression – When working with a CSV file, this parameter allows the user to force the data to be interpreted as a regression problem. Defaults to False.

class capymoa.stream.ARFFStream[source]#

Bases: Stream

A datastream originating from an ARFF file.

__init__(path: str, CLI: str | None = None)[source]#

Construct an ARFFStream object from a file path.

Parameters:
  • path – The path to the ARFF file.

  • CLI – Additional command line arguments to pass to the MOA stream.

CLI_help() str[source]#

Return the CLI help string for the stream.

get_moa_stream() InstanceStream | None[source]#

Get the MOA stream object if it exists.

get_schema() Schema[source]#

Return the schema of the stream.

has_more_instances() bool[source]#

Return True if the stream has more instances to read.

next_instance() LabeledInstance | RegressionInstance[source]#

Return the next instance in the stream.

Raises:

ValueError – If the machine learning task is neither a regression nor a classification task.

Returns:

A labeled instance or a regression instance, depending on the schema.

restart()[source]#

Restart the stream to read instances from the beginning.

class capymoa.stream.PytorchStream[source]#

Bases: Stream

PytorchStream turns a PyTorch dataset into a datastream.

>>> from capymoa.evaluation import ClassificationEvaluator
...
>>> from capymoa.datasets import get_download_dir
>>> from capymoa.stream import PytorchStream
>>> from torchvision import datasets
>>> from torchvision.transforms import ToTensor
>>> print("Using PyTorch Dataset"); pytorchDataset = datasets.FashionMNIST( 
...     root=get_download_dir(),
...     train=True,
...     download=True,
...     transform=ToTensor()
... )
Using PyTorch Dataset...
>>> pytorch_stream = PytorchStream(dataset=pytorchDataset)
>>> pytorch_stream.get_schema()
@relation PytorchDataset

@attribute attrib_0 numeric
@attribute attrib_1 numeric
...
@attribute attrib_783 numeric
@attribute class {T-shirt/top,Trouser,Pullover,Dress,Coat,Sandal,Shirt,Sneaker,Bag,'Ankle boot'}

@data
>>> pytorch_stream.next_instance()
LabeledInstance(
    Schema(PytorchDataset),
    x=ndarray(..., 784),
    y_index=9,
    y_label='Ankle boot'
)

__init__(
dataset: Dataset,
enforce_regression=False,
)[source]#

Construct PytorchStream from a PyTorch dataset.

Parameters:
  • dataset – A PyTorch dataset containing tuples of x and y.

  • enforce_regression – Force the task to be a regression task, default is False
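
In the FashionMNIST example above, each 28x28 image shows up as 784 numeric attributes, which suggests that every x is flattened into a single feature vector. The sketch below illustrates that idea with plain Python lists and a toy dataset of (x, y) tuples. The flatten helper and toy_dataset are hypothetical names for illustration, and the flattening itself is an assumption drawn from the example output, not a description of the PytorchStream internals.

```python
def flatten(x):
    """Recursively flatten nested lists or tuples into one flat list."""
    if isinstance(x, (list, tuple)):
        return [value for item in x for value in flatten(item)]
    return [x]


# Toy dataset of (x, y) tuples: x is a 2x2 "image", y is a class index.
toy_dataset = [
    ([[0.0, 0.5], [1.0, 0.25]], 1),
    ([[0.9, 0.1], [0.3, 0.7]], 0),
]

for image, label in toy_dataset:
    features = flatten(image)
    print(len(features), features, label)
```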

has_more_instances()[source]#

Return True if the stream has more instances to read.

next_instance()[source]#

Return the next instance in the stream.

Raises:

ValueError – If the machine learning task is neither a regression nor a classification task.

Returns:

A labeled instance or a regression instance, depending on the schema.

get_schema()[source]#

Return the schema of the stream.

get_moa_stream()[source]#

Get the MOA stream object if it exists.

restart()[source]#

Restart the stream to read instances from the beginning.

CLI_help() str[source]#

Return the CLI help string for the stream.

class capymoa.stream.CSVStream[source]#

Bases: Stream

__init__(
csv_file_path,
dtypes: list = None,
values_for_nominal_features={},
class_index: int = -1,
values_for_class_label: list = None,
target_attribute_name=None,
enforce_regression=False,
skip_header: bool = False,
delimiter=',',
)[source]#

Construct a CSVStream from a CSV file.

Usually, you will want to construct a Stream using the capymoa.stream.stream_from_file() function.

Parameters:
  • csv_file_path – The path to the CSV file.

  • dtypes – The data types of the columns. Default is None.

  • values_for_nominal_features – Possible values of each nominal feature.

  • class_index – The index of the column holding the class label or target. Default is -1, the last column.

  • values_for_class_label – Possible values for the class label.

  • target_attribute_name – Name of the target/class attribute. Default is None.

  • enforce_regression – If True, the data is interpreted as a regression problem. Default is False.

  • skip_header – Whether to skip the first line of the file. Default is False.

  • delimiter – The character that separates fields. Default is ','.

CLI_help() str[source]#

Return the CLI help string for the stream.

count_number_of_lines()[source]#

has_more_instances()[source]#

Return True if the stream has more instances to read.

next_instance()[source]#

Return the next instance in the stream.

Raises:

ValueError – If the machine learning task is neither a regression nor a classification task.

Returns:

A labeled instance or a regression instance, depending on the schema.

get_schema()[source]#

Return the schema of the stream.

get_moa_stream()[source]#

Get the MOA stream object if it exists.

restart()[source]#

Restart the stream to read instances from the beginning.

Generator#

Generate artificial data streams.

class capymoa.stream.generator.RandomTreeGenerator[source]#

Bases: Stream

Stream generator for a stream based on a randomly generated tree.

>>> from capymoa.stream.generator import RandomTreeGenerator
...
>>> stream = RandomTreeGenerator()
>>> stream.next_instance()
LabeledInstance(
    Schema(generators.RandomTreeGenerator ),
    x=ndarray(..., 10),
    y_index=0,
    y_label='class1'
)
>>> stream.next_instance().x
array([4.        , 2.        , 2.        , 1.        , 4.        ,
       0.39717434, 0.34751803, 0.29405703, 0.50648363, 0.11596709])

__init__(
instance_random_seed: int = 1,
tree_random_seed: int = 1,
num_classes: int = 2,
num_nominals: int = 5,
num_numerics: int = 5,
num_vals_per_nominal: int = 5,
max_tree_depth: int = 5,
first_leaf_level: int = 3,
leaf_fraction: float = 0.15,
)[source]#

Construct a random tree generator.

Parameters:
  • instance_random_seed – Seed for random generation of instances.

  • tree_random_seed – Seed for random generation of tree.

  • num_classes – The number of classes to generate.

  • num_nominals – The number of nominal attributes to generate.

  • num_numerics – The number of numeric attributes to generate.

  • num_vals_per_nominal – The number of values to generate per nominal attribute.

  • max_tree_depth – The maximum depth of the tree concept.

  • first_leaf_level – The first level of the tree above max_tree_depth that can have leaves.

  • leaf_fraction – The fraction of leaves per level from first leaf level onwards.

CLI_help() str[source]#

Return the CLI help string for the stream.

get_moa_stream() InstanceStream | None[source]#

Get the MOA stream object if it exists.

get_schema() Schema[source]#

Return the schema of the stream.

has_more_instances() bool[source]#

Return True if the stream has more instances to read.

next_instance() LabeledInstance | RegressionInstance[source]#

Return the next instance in the stream.

Raises:

ValueError – If the machine learning task is neither a regression nor a classification task.

Returns:

A labeled instance or a regression instance, depending on the schema.

restart()[source]#

Restart the stream to read instances from the beginning.

class capymoa.stream.generator.SEA[source]#

Bases: Stream

Generate a stream based on the SEA concept functions.

>>> from capymoa.stream.generator import SEA
...
>>> stream = SEA()
>>> stream.next_instance()
LabeledInstance(
    Schema(generators.SEAGenerator ),
    x=ndarray(..., 3),
    y_index=1,
    y_label='groupB'
)
>>> stream.next_instance().x
array([6.58867239, 7.10739628, 1.52736201])

Street, W. N., & Kim, Y. (2001). A streaming ensemble algorithm (SEA) for large-scale classification. doi:10.1145/502512.502568

__init__(
instance_random_seed: int = 1,
function: int = 1,
balance_classes: bool = False,
noise_percentage: int = 10,
)[source]#

Construct a SEA datastream generator.

Parameters:
  • instance_random_seed – Seed for random generation of instances, defaults to 1

  • function – Classification function used, as defined in the original paper, defaults to 1

  • balance_classes – Balance the number of instances of each class, defaults to False

  • noise_percentage – Percentage of noise to add to the data, defaults to 10
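
To make the function and noise_percentage parameters more concrete, here is a rough pure-Python sketch of the SEA concepts as described in the paper cited above: three features drawn uniformly from [0, 10], of which only the first two matter, and a label decided by comparing their sum with a per-function threshold. The thresholds (8, 9, 7, 9.5), the mapping of the two outcomes to class indexes 0 and 1, and the way noise flips labels are assumptions of this sketch. It is not the MOA generator and will not reproduce its instances.

```python
import random

# Assumed thresholds for the four SEA concept functions.
SEA_THRESHOLDS = {1: 8.0, 2: 9.0, 3: 7.0, 4: 9.5}


def sea_label(x, function=1):
    """Return 0 if x[0] + x[1] is within the threshold, else 1.

    The third feature, x[2], is irrelevant to the label.
    """
    return 0 if x[0] + x[1] <= SEA_THRESHOLDS[function] else 1


def sea_instance(rng, function=1, noise_percentage=10):
    """Draw one (x, y) pair and flip y with the given noise percentage."""
    x = [rng.uniform(0.0, 10.0) for _ in range(3)]
    y = sea_label(x, function)
    if rng.random() * 100 < noise_percentage:
        y = 1 - y  # label noise: flip the binary class
    return x, y


rng = random.Random(1)
x, y = sea_instance(rng, function=1, noise_percentage=0)
print(len(x), y == sea_label(x, 1))

# 6.59 + 7.11 exceeds the threshold 8 of function 1.
print(sea_label([6.58867239, 7.10739628, 1.52736201], function=1))
```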

CLI_help() str[source]#

Return the CLI help string for the stream.

get_moa_stream() InstanceStream | None[source]#

Get the MOA stream object if it exists.

get_schema() Schema[source]#

Return the schema of the stream.

has_more_instances() bool[source]#

Return True if the stream has more instances to read.

next_instance() LabeledInstance | RegressionInstance[source]#

Return the next instance in the stream.

Raises:

ValueError – If the machine learning task is neither a regression nor a classification task.

Returns:

A labeled instance or a regression instance, depending on the schema.

restart()[source]#

Restart the stream to read instances from the beginning.

Drift#

Simulate concept drift in datastreams.

class capymoa.stream.drift.DriftStream[source]#

Bases: Stream

__init__(schema=None, CLI=None, moa_stream=None, stream=None)[source]#

Construct a Stream from a MOA stream object.

Usually, you will want to construct a Stream using the capymoa.stream.stream_from_file() function.

Parameters:
  • moa_stream – The MOA stream object to read instances from. Is None if the stream is created from a numpy array.

  • schema – The schema of the stream. If None, the schema is inferred from the moa_stream.

  • CLI – Additional command line arguments to pass to the MOA stream.

Raises:
  • ValueError – If no schema is provided and no moa_stream is provided.

  • ValueError – If command line arguments are provided without a moa_stream.

get_num_drifts()[source]#

get_drifts()[source]#

CLI_help() str[source]#

Return the CLI help string for the stream.

get_moa_stream() InstanceStream | None[source]#

Get the MOA stream object if it exists.

get_schema() Schema[source]#

Return the schema of the stream.

has_more_instances() bool[source]#

Return True if the stream has more instances to read.

next_instance() LabeledInstance | RegressionInstance[source]#

Return the next instance in the stream.

Raises:

ValueError – If the machine learning task is neither a regression nor a classification task.

Returns:

A labeled instance or a regression instance, depending on the schema.

restart()[source]#

Restart the stream to read instances from the beginning.

class capymoa.stream.drift.Drift[source]#

Bases: object

Represents a concept drift in a DriftStream. See 2.7.1 Concept drift framework in [1].

__init__(position, width=0, alpha=0.0, random_seed=1)[source]#

Construct a drift in a DriftStream.

Parameters:
  • position – The location of the drift in terms of the number of instances processed prior to it occurring.

  • width – The size of the window of change. A width of 0 or 1 corresponds to an abrupt drift.

  • alpha – The grade of change, defaults to 0.0.

  • random_seed – Seed for random number generation, defaults to 1.
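
The position and width parameters can be pictured as a sigmoid that gives the probability of drawing an instance from the new concept. The sketch below assumes the sigmoid form commonly used in MOA's concept drift framework, 1 / (1 + exp(-4 (t - position) / width)), and treats a width of 0 or 1 as a step at position. The function name new_concept_probability, the exact step boundary, and the omission of alpha are assumptions made for illustration, not the library's implementation.

```python
import math


def new_concept_probability(t, position, width):
    """Probability that instance number t comes from the new concept."""
    if width <= 1:
        # Abrupt drift: switch concepts exactly at `position`.
        return 1.0 if t >= position else 0.0
    z = -4.0 * (t - position) / width
    if z > 700.0:
        # exp(z) would overflow; the probability is effectively zero.
        return 0.0
    return 1.0 / (1.0 + math.exp(z))


for t in (400, 450, 500, 550, 600):
    print(t, round(new_concept_probability(t, position=500, width=100), 3))
```

Under this form the probability is exactly 0.5 at position, and most of the transition happens within width instances around it.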

class capymoa.stream.drift.GradualDrift[source]#

Bases: Drift

__init__(
position=None,
width=None,
start=None,
end=None,
alpha=0.0,
random_seed=1,
)[source]#

Construct a drift in a DriftStream.

Parameters:
  • position – The location of the drift in terms of the number of instances processed prior to it occurring.

  • width – The size of the window of change. A width of 0 or 1 corresponds to an abrupt drift.

  • alpha – The grade of change, defaults to 0.0.

  • random_seed – Seed for random number generation, defaults to 1.
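
The signature accepts either position and width or start and end. One plausible way to reconcile the two descriptions, assumed here rather than taken from the source, is to treat the drift window as centered on position, so that each pair can be computed from the other. The helper resolve_window below is hypothetical and only illustrates that convention.

```python
def resolve_window(position=None, width=None, start=None, end=None):
    """Return (position, width, start, end) from either pair of arguments.

    Assumes the window of change is centered on `position`.
    """
    has_center = position is not None and width is not None
    has_bounds = start is not None and end is not None
    if has_center and start is None and end is None:
        start = int(position - width / 2)
        end = int(position + width / 2)
    elif has_bounds and position is None and width is None:
        width = end - start
        position = int((start + end) / 2)
    else:
        raise ValueError("Give either position and width, or start and end.")
    return position, width, start, end


print(resolve_window(position=1000, width=200))
print(resolve_window(start=900, end=1100))
```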

class capymoa.stream.drift.AbruptDrift[source]#

Bases: Drift

__init__(position, random_seed=1)[source]#

Construct an abrupt drift in a DriftStream.

Parameters:
  • position – The location of the drift in terms of the number of instances processed prior to it occurring.

  • random_seed – Seed for random number generation, defaults to 1.