{
"cells": [
{
"attachments": {},
"cell_type": "markdown",
"id": "b773bf8e-c420-44e1-80a6-99f75dd12268",
"metadata": {},
"source": [
"## Pipelines and Transformers\n",
"\n",
"This notebook showcases the current version of data processing pipelines in CapyMOA. \n",
"\n",
"* Includes examples of how preprocessing can be accomplished via pipelines and transformers.\n",
"* Transformers transform an instance, e.g., using standardization, normalization, etc.\n",
"* Pipelines bundle transformers, drift detectors, and learners (classifiers, regressors, clustering algorithms, etc.).\n",
"* Some pipelines act as classifiers or regressors.\n",
"\n",
"Please note that this feature is still under development; some functionality may not be available yet or may change in future releases.\n",
"\n",
"\n",
"*More information about CapyMOA can be found in* https://www.capymoa.org\n",
"\n",
"**notebook last updated on 10/06/2024**\n",
"\n",
"### Main features\n",
"\n",
"We've redesigned the pipeline API to be more flexible and modular compared to the initial version.\n",
"\n",
"- The new API is generic: it can handle all types of data stream algorithms (e.g., classifiers, regressors, data transformations, change detectors, clustering, etc.). The only requirement is that a `PipelineElement` class compatible with the algorithm exists. This class represents a single step within the pipeline and provides a unified interface for the overall pipeline object (which is of type `BasePipeline`).\n",
"- A pipeline can also function as a `PipelineElement`. This allows for the creation of smaller pipelines that can be combined into a larger pipeline. For instance, you could create separate data cleaning, transformation, and prediction pipelines and then integrate them into one comprehensive pipeline.\n",
"- Besides the vanilla `BasePipeline`, we currently support `ClassifierPipeline` and `RegressorPipeline`. These classes act as CapyMOA `Classifier`s and `Regressor`s, meaning they support `train` and `predict`.\n",
"- Adding drift detectors to a pipeline and specifying their behavior and position within the pipeline is flexible and intuitive.\n",
"\n",
"This notebook explores these various options with examples.\n",
"\n",
"### PipelineElements and their structure\n",
"\n",
"We currently support four types of pipeline elements: `ClassifierPipelineElement`, `RegressorPipelineElement`, `TransformerPipelineElement`, and `DriftDetectorPipelineElement`. They all implement the `PipelineElement` protocol, which defines two methods:\n",
"\n",
"1. `pass_forward(instance) -> Instance`\n",
" - Passes the instance through the pipeline.\n",
" - The specific action taken depends on the type of pipeline element.\n",
" - For example, a `TransformerPipelineElement` applies a transformation to the instance.\n",
"2. `pass_forward_predict(instance, prediction) -> Tuple[Instance, Any]`\n",
" - Similar to `pass_forward` but can also pass along a prediction.\n",
" - For example, a classifier could predict the instance's label and then pass the tuple `(instance, prediction)` to the next element in the pipeline.\n",
"\n",
"This protocol offers great flexibility. For instance, one could develop a `ClusteringPipelineElement` that performs clustering and then passes the instance and clustering result to the next element in the pipeline.\n",
"\n",
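"To make the protocol concrete, here is a minimal sketch in plain Python. It is not CapyMOA's implementation: the `Instance` stand-in and the `ScaleTransformerElement`, `MajorityClassElement`, and `SketchPipeline` classes are hypothetical names chosen for illustration. The sketch only shows how `pass_forward` (the training path) and `pass_forward_predict` (the prediction path) flow through a chain of elements.\n",
"\n",
"```python\n",
"from dataclasses import dataclass, replace\n",
"from typing import Any, List, Optional, Protocol, Tuple\n",
"\n",
"\n",
"@dataclass(frozen=True)\n",
"class Instance:\n",
"    # Hypothetical stand-in for a CapyMOA instance: features plus a class index\n",
"    x: Tuple[float, ...]\n",
"    y_index: int\n",
"\n",
"\n",
"class PipelineElement(Protocol):\n",
"    def pass_forward(self, instance: Instance) -> Instance: ...\n",
"\n",
"    def pass_forward_predict(\n",
"        self, instance: Instance, prediction: Any = None\n",
"    ) -> Tuple[Instance, Any]: ...\n",
"\n",
"\n",
"class ScaleTransformerElement:\n",
"    # Hypothetical transformer element: multiplies every feature by a constant\n",
"    def __init__(self, factor: float):\n",
"        self.factor = factor\n",
"\n",
"    def pass_forward(self, instance: Instance) -> Instance:\n",
"        return replace(instance, x=tuple(v * self.factor for v in instance.x))\n",
"\n",
"    def pass_forward_predict(self, instance: Instance, prediction: Any = None):\n",
"        # Transform the instance; pass the incoming prediction through untouched\n",
"        return self.pass_forward(instance), prediction\n",
"\n",
"\n",
"class MajorityClassElement:\n",
"    # Hypothetical learner element: predicts the most frequent class seen so far\n",
"    def __init__(self):\n",
"        self.counts = {}\n",
"\n",
"    def pass_forward(self, instance: Instance) -> Instance:\n",
"        # Training path: update the class counts, then hand the instance on\n",
"        self.counts[instance.y_index] = self.counts.get(instance.y_index, 0) + 1\n",
"        return instance\n",
"\n",
"    def pass_forward_predict(self, instance: Instance, prediction: Any = None):\n",
"        # Prediction path: emit a prediction (None before any training)\n",
"        if not self.counts:\n",
"            return instance, None\n",
"        return instance, max(self.counts, key=self.counts.get)\n",
"\n",
"\n",
"class SketchPipeline:\n",
"    # Hypothetical pipeline: calls the same method on every element, in order\n",
"    def __init__(self, elements: List[PipelineElement]):\n",
"        self.elements = elements\n",
"\n",
"    def train(self, instance: Instance) -> None:\n",
"        for element in self.elements:\n",
"            instance = element.pass_forward(instance)\n",
"\n",
"    def predict(self, instance: Instance) -> Optional[int]:\n",
"        prediction = None\n",
"        for element in self.elements:\n",
"            instance, prediction = element.pass_forward_predict(instance, prediction)\n",
"        return prediction\n",
"\n",
"\n",
"# Test-then-train over three toy instances: predict first, then train\n",
"pipe = SketchPipeline([ScaleTransformerElement(0.5), MajorityClassElement()])\n",
"for inst in [Instance((1.0, 2.0), 1), Instance((3.0, 4.0), 1), Instance((5.0, 6.0), 0)]:\n",
"    print(pipe.predict(inst))  # prints None, then 1, then 1\n",
"    pipe.train(inst)\n",
"```\n",
"\n",
"Threading the prediction through `pass_forward_predict` is what allows elements placed after the learner (such as a drift detector) to see that prediction.\n",
"\n",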
"Let's now look at the currently supported pipeline elements:\n",
"\n",
"#### TransformerPipelineElement\n",
"This element is initialized with a CapyMOA `Transformer`.\n",
"- `pass_forward` transforms and returns the provided instance.\n",
"- `pass_forward_predict` transforms the provided instance and returns the transformed instance and the input prediction.\n",
"\n",
"#### ClassifierPipelineElement and RegressorPipelineElement\n",
"These elements are initialized with a CapyMOA `Classifier` or `Regressor`.\n",
"- `pass_forward` trains the learner on the provided instance.\n",
"- `pass_forward_predict` predicts the label/value of the instance and returns `(instance, prediction)`.\n",
"\n",
"#### DriftDetectorPipelineElement\n",
"This element is initialized with a CapyMOA `BaseDriftDetector` and a callable `prepare_drift_detector_input_func` that maps the instance and the current prediction to the input for the drift detector.\n",
"- `pass_forward` returns the instance unchanged; it does not update the drift detector.\n",
"- `pass_forward_predict` updates the drift detector. Internally, the element calls `prepare_drift_detector_input_func` and passes its output to the drift detector.\n",
"\n",
"The `prepare_drift_detector_input_func` offers flexibility: it can be used to select a subset of features for the drift detector to monitor (e.g., for unsupervised drift detection), to compute the prediction error (e.g., for regression), or to check if the prediction matches the label (for classification).\n",
"\n",
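"For illustration, the three use cases above could look like the small functions below. This is a sketch under assumptions: it assumes the callable receives both the instance and the current prediction, and it uses a hypothetical `ToyInstance` stand-in whose `x` and `y_index` attributes mimic CapyMOA instances (the regression attribute `y_value` is also an assumption). The function names are illustrative only.\n",
"\n",
"```python\n",
"from dataclasses import dataclass\n",
"from typing import Any, Optional, Sequence\n",
"\n",
"\n",
"@dataclass\n",
"class ToyInstance:\n",
"    # Hypothetical stand-in for a CapyMOA instance\n",
"    x: Sequence[float]\n",
"    y_index: Optional[int] = None  # class label (classification)\n",
"    y_value: Optional[float] = None  # target value (regression)\n",
"\n",
"\n",
"def prediction_is_correct(instance: ToyInstance, prediction: Any) -> int:\n",
"    # Classification: 1 if the prediction matches the label, else 0\n",
"    return int(instance.y_index == prediction)\n",
"\n",
"\n",
"def absolute_error(instance: ToyInstance, prediction: Any) -> float:\n",
"    # Regression: absolute prediction error\n",
"    return abs(instance.y_value - prediction)\n",
"\n",
"\n",
"def first_feature(instance: ToyInstance, prediction: Any) -> float:\n",
"    # Unsupervised monitoring: ignore the prediction and track one feature\n",
"    return instance.x[0]\n",
"\n",
"\n",
"inst = ToyInstance(x=[0.3, 0.7], y_index=1, y_value=2.5)\n",
"print(prediction_is_correct(inst, 1), absolute_error(inst, 2.0), first_feature(inst, None))\n",
"# prints: 1 0.5 0.3\n",
"```\n",
"\n",
"Because this mapping lives in a user-supplied function, the same `DriftDetectorPipelineElement` can serve all three cases.\n",
"\n",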
"#### BasePipeline (and inheritors)\n",
"Pipelines themselves are pipeline elements, allowing you to combine them into larger pipelines.\n",
"- `pass_forward` calls `pass_forward` on all its elements.\n",
"- `pass_forward_predict` calls `pass_forward_predict` on all its elements."
]
},
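{
"cell_type": "markdown",
"id": "a3f1c2d4-nested-pipeline-sketch",
"metadata": {},
"source": [
"Because a pipeline exposes the same two methods as its elements, pipelines can be nested. The following is a plain-Python sketch of that idea, not CapyMOA code: `AddConstant` and `Chain` are hypothetical names, and instances are simplified to lists of floats.\n",
"\n",
"```python\n",
"from typing import Any, List, Tuple\n",
"\n",
"\n",
"class AddConstant:\n",
"    # Hypothetical transformer element: adds a constant to every feature\n",
"    def __init__(self, c: float):\n",
"        self.c = c\n",
"\n",
"    def pass_forward(self, instance: List[float]) -> List[float]:\n",
"        return [v + self.c for v in instance]\n",
"\n",
"    def pass_forward_predict(self, instance: List[float], prediction: Any = None) -> Tuple[List[float], Any]:\n",
"        return self.pass_forward(instance), prediction\n",
"\n",
"\n",
"class Chain:\n",
"    # Hypothetical pipeline that is itself a pipeline element\n",
"    def __init__(self, *elements):\n",
"        self.elements = list(elements)\n",
"\n",
"    def pass_forward(self, instance):\n",
"        # Call pass_forward on every element, in order\n",
"        for element in self.elements:\n",
"            instance = element.pass_forward(instance)\n",
"        return instance\n",
"\n",
"    def pass_forward_predict(self, instance, prediction=None):\n",
"        # Call pass_forward_predict on every element, threading the prediction through\n",
"        for element in self.elements:\n",
"            instance, prediction = element.pass_forward_predict(instance, prediction)\n",
"        return instance, prediction\n",
"\n",
"\n",
"# A small preprocessing pipeline reused inside a larger one\n",
"preprocessing = Chain(AddConstant(1.0), AddConstant(2.0))\n",
"full = Chain(preprocessing, AddConstant(10.0))\n",
"print(full.pass_forward([0.0, 1.0]))  # prints [13.0, 14.0]\n",
"```"
]
},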
{
"attachments": {},
"cell_type": "markdown",
"id": "55d070de-8697-4f98-a11b-eab4e3d5c281",
"metadata": {},
"source": [
"## 1. Running OnlineBagging without any preprocessing\n",
"\n",
"First, let us look at a simple test-then-train classification example without pipelines. We loop over the instances of the data stream and, for each instance:\n",
"- make a prediction,\n",
"- update the evaluator with the prediction and the label, and\n",
"- train the classifier on the instance."
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "49ff5008",
"metadata": {
"nbsphinx": "hidden"
},
"outputs": [],
"source": [
"# This cell is hidden on capymoa.org. See docs/contributing/docs.rst\n",
"from util.nbmock import mock_datasets, is_nb_fast\n",
"\n",
"if is_nb_fast():\n",
" mock_datasets()"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "14681f54-23a1-4f93-9145-abf484c91c54",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"82.06656073446328"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Test-then-train loop\n",
"from capymoa.datasets import Electricity\n",
"from capymoa.classifier import OnlineBagging\n",
"from capymoa.evaluation import ClassificationEvaluator\n",
"\n",
"# Opening a file as a stream\n",
"elec_stream = Electricity()\n",
"\n",
"# Creating a learner\n",
"ob_learner = OnlineBagging(schema=elec_stream.get_schema(), ensemble_size=5)\n",
"\n",
"# Creating the evaluator\n",
"ob_evaluator = ClassificationEvaluator(schema=elec_stream.get_schema())\n",
"\n",
"while elec_stream.has_more_instances():\n",
" instance = elec_stream.next_instance()\n",
"\n",
" prediction = ob_learner.predict(instance)\n",
" ob_evaluator.update(instance.y_index, prediction)\n",
" ob_learner.train(instance)\n",
"\n",
"ob_evaluator.accuracy()"
]
},
{
"cell_type": "markdown",
"id": "94362841-b267-471b-9a8c-b4094ad81acb",
"metadata": {},
"source": [
"## 2. Transforming instances using pipelines\n",
"\n",
"If we want to perform some preprocessing, such as normalization or feature transformation, or a combination of both, we can chain multiple `Transformer`s within a pipeline. The most basic pipeline class `BasePipeline` already supports this.\n",
"\n",
"Creating a basic pipeline consists of the following steps:\n",
"1. Create a stream instance\n",
"2. Initialize the transformers\n",
"3. Create the `BasePipeline`\n",
"4. Add the transformers to the pipeline\n",
"5. Call `pass_forward` to apply the transformations:"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "6724b905-53a0-49e9-b8a6-af1b30aececc",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"77.5048552259887"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from capymoa.stream.preprocessing import MOATransformer\n",
"from capymoa.stream.preprocessing import BasePipeline\n",
"from moa.streams.filters import AddNoiseFilter, NormalisationFilter\n",
"\n",
"elec_stream = Electricity()\n",
"\n",
"# Creating the transformers\n",
"normalisation_transformer = MOATransformer(schema=elec_stream.get_schema(), moa_filter=NormalisationFilter())\n",
"add_noise_transformer = MOATransformer(schema=normalisation_transformer.get_schema(), moa_filter=AddNoiseFilter())\n",
"\n",
"# Creating and populating the pipeline\n",
"pipeline = BasePipeline()\n",
"\n",
"# Add the transformers to the pipeline. We can chain the calls to add_transformer, as they return self\n",
"pipeline = (pipeline\n",
" .add_transformer(normalisation_transformer)\n",
" .add_transformer(add_noise_transformer))\n",
"\n",
"# Creating a learner\n",
"ob_learner = OnlineBagging(schema=add_noise_transformer.get_schema(), ensemble_size=5)\n",
"\n",
"# Creating the evaluator\n",
"ob_evaluator = ClassificationEvaluator(schema=elec_stream.get_schema()) \n",
"\n",
"while elec_stream.has_more_instances():\n",
" instance = elec_stream.next_instance()\n",
" transformed_instance = pipeline.pass_forward(instance)\n",
" prediction = ob_learner.predict(transformed_instance)\n",
" ob_evaluator.update(instance.y_index, prediction)\n",
" ob_learner.train(transformed_instance)\n",
"\n",
"ob_evaluator.accuracy()"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "0c1360ef-0583-4c87-8645-1e2d701fffca",
"metadata": {},
"source": [
"## 2. Online Bagging using pipelines and transformers\n",
"\n",
"Like other CapyMOA classifiers, a `ClassifierPipeline` supports `train` and `predict`. Hence, we can use it in the same way as any other classifier.\n",
"\n",
"- When calling `train`, the pipeline object internally calls `pass_forward` on all elements.\n",
"- When calling `predict`, the pipeline object internally calls `pass_forward_predict` on all elements and then returns the resulting prediction.\n",
"\n",
"Creating a pipeline consists of the following steps:\n",
"\n",
"1. Create a stream instance\n",
"2. Initialize the transformers\n",
"3. Initialize the learner\n",
"4. Create the pipeline. Here, we use a `ClassifierPipeline`\n",
"5. Add the transformers and the classifier\n",
"6. Use the pipeline the same way as any other learner."
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "ae9bb646-e0d1-4de6-b5a1-cff0f0a1b172",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"77.5048552259887"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from capymoa.stream.preprocessing import MOATransformer\n",
"from capymoa.stream.preprocessing import ClassifierPipeline\n",
"from moa.streams.filters import AddNoiseFilter, NormalisationFilter\n",
"\n",
"elec_stream = Electricity()\n",
"\n",
"# Creating the transformers\n",
"normalisation_transformer = MOATransformer(\n",
" schema=elec_stream.get_schema(), moa_filter=NormalisationFilter()\n",
")\n",
"add_noise_transformer = MOATransformer(\n",
" schema=normalisation_transformer.get_schema(), moa_filter=AddNoiseFilter()\n",
")\n",
"\n",
"# Creating a learner\n",
"ob_learner = OnlineBagging(schema=add_noise_transformer.get_schema(), ensemble_size=5)\n",
"\n",
"# Creating and populating the pipeline\n",
"pipeline = (ClassifierPipeline()\n",
" .add_transformer(normalisation_transformer)\n",
" .add_transformer(add_noise_transformer)\n",
" .add_classifier(ob_learner))\n",
"\n",
"# Creating the evaluator\n",
"ob_evaluator = ClassificationEvaluator(schema=elec_stream.get_schema())\n",
"\n",
"while elec_stream.has_more_instances():\n",
" instance = elec_stream.next_instance()\n",
" prediction = pipeline.predict(instance)\n",
" ob_evaluator.update(instance.y_index, prediction)\n",
" pipeline.train(instance)\n",
"\n",
"ob_evaluator.accuracy()"
]
},
{
"cell_type": "markdown",
"id": "676f53b7-0839-47a5-88f9-393b2007855e",
"metadata": {},
"source": [
"We can also get a textual representation of the pipeline:"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "31a481db-d23b-4fc8-a689-fc5c14df5fff",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'PE(Transformer(NormalisationFilter)) | PE(Transformer(AddNoiseFilter)) | PE(OnlineBagging) | '"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"str(pipeline)"
]
},
{
"cell_type": "markdown",
"id": "df255274-83cd-41df-a1da-04778bc427aa",
"metadata": {},
"source": [
"### 2.1 Alternative syntax\n",
"* An alternative syntax to define the pipeline is shown below.\n",
"* Since the pipeline behaves like a learner, it can be used with high-level evaluation functions such as `prequential_evaluation`."
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "50cb066b-e3e4-4ffd-ad9d-65631d5462e3",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"AdaptiveRandomForest: 88.55049435028248\n",
"PE(Transformer(NormalisationFilter)) | PE(AdaptiveRandomForest) | : 88.04069562146893\n"
]
},
{
"data": {
"image/png": "",
"text/plain": [
"