{ "cells": [ { "attachments": {}, "cell_type": "markdown", "id": "b773bf8e-c420-44e1-80a6-99f75dd12268", "metadata": {}, "source": [ "## Pipelines and Transformers\n", "\n", "This notebook showcases the current version of data processing pipelines in CapyMOA.\n", "\n", "* It includes examples of how preprocessing can be done with pipelines and transformers.\n", "* Transformers transform an instance, e.g., using standardization, normalization, etc.\n", "* Pipelines bundle transformers, drift detectors, and learners (classifiers, regressors, clustering algorithms, etc.).\n", "* Some pipelines act as classifiers or regressors.\n", "\n", "Please note that this feature is still under development; some functionality might not yet be available or might change in future releases.\n", "\n", "\n", "*More information about CapyMOA can be found at* https://www.capymoa.org\n", "\n", "**notebook last updated on 10/06/2024**\n", "\n", "### Main features\n", "\n", "We've redesigned the pipeline API to be more flexible and modular than the initial version.\n", "\n", "- The new API is generic, meaning it can handle all types of data stream algorithms (e.g., classifiers, regressors, data transformations, change detectors, clustering, etc.). The only requirement is that a `PipelineElement` class compatible with the algorithm exists. This class represents a single step within the pipeline and provides a unified interface for the overall pipeline object (which is of type `BasePipeline`).\n", "- A pipeline can also function as a `PipelineElement`. This allows smaller pipelines to be combined into a larger pipeline. For instance, you could create separate data cleaning, transformation, and prediction pipelines and then integrate them into one comprehensive pipeline.\n", "- Besides the vanilla `BasePipeline`, we currently support `ClassifierPipeline` and `RegressorPipeline`. 
These classes act as CapyMOA `Classifier`s and `Regressor`s, meaning they support `predict` and `train`.\n", "- Drift detectors can be added at any position within a pipeline, and the input they monitor can be customized.\n", "\n", "This notebook explores these options with examples.\n", "\n", "### PipelineElements and their structure\n", "\n", "We currently support four types of pipeline elements: `ClassifierPipelineElement`, `RegressorPipelineElement`, `TransformerPipelineElement`, and `DriftDetectorPipelineElement`. They all implement the `PipelineElement` protocol, which provides two methods:\n", "\n", "1. `pass_forward(instance) -> Instance`\n", "   - Passes the instance through the pipeline.\n", "   - The specific action taken depends on the type of pipeline element.\n", "   - For example, a `TransformerPipelineElement` applies a transformation to the instance.\n", "2. `pass_forward_predict(instance, prediction) -> Tuple[Instance, Any]`\n", "   - Similar to `pass_forward`, but also passes along a prediction.\n", "   - For example, a classifier could predict the instance's label and then pass the tuple `(instance, prediction)` to the next element in the pipeline.\n", "\n", "This protocol offers great flexibility. 
For instance, one could develop a `ClusteringPipelineElement` that performs clustering and then passes the instance and the clustering result to the next element in the pipeline.\n", "\n", "Let's now look at the currently supported pipeline elements:\n", "\n", "#### TransformerPipelineElement\n", "This element is initialized with a CapyMOA `Transformer`.\n", "- `pass_forward` transforms and returns the provided instance.\n", "- `pass_forward_predict` transforms the provided instance and returns the transformed instance together with the input prediction.\n", "\n", "#### ClassifierPipelineElement and RegressorPipelineElement\n", "These elements are initialized with a CapyMOA `Classifier` or `Regressor`.\n", "- `pass_forward` trains the learner on the provided instance.\n", "- `pass_forward_predict` predicts the label/value of the instance and returns `(instance, prediction)`.\n", "\n", "#### DriftDetectorPipelineElement\n", "This element is initialized with a CapyMOA `BaseDriftDetector` and a callable `prepare_drift_detector_input_func` that takes an instance and a prediction as input and returns the value to be fed to the drift detector.\n", "- `pass_forward` does not update the drift detector; it only passes the instance on.\n", "- `pass_forward_predict` updates the drift detector. Internally, the element calls `prepare_drift_detector_input_func` and passes its output to the drift detector.\n", "\n", "The `prepare_drift_detector_input_func` makes this element flexible: it can be used to select a subset of features for the drift detector to monitor (e.g., for unsupervised drift detection), to compute the prediction error (e.g., for regression), or to check whether the prediction matches the label (for classification).\n", "\n", "#### BasePipeline (and its subclasses)\n", "Pipelines themselves are pipeline elements, allowing you to combine them into larger pipelines.\n", "- `pass_forward` calls `pass_forward` on all its elements, in order.\n", "- `pass_forward_predict` calls `pass_forward_predict` on all its elements, in order."
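, "\n",
"\n",
"The protocol above can be illustrated with a small, self-contained sketch. It does not use CapyMOA. `ToyScalerElement`, `ToyMeanRegressorElement`, and `ToyPipeline` are hypothetical stand-ins that only mimic the `pass_forward` / `pass_forward_predict` contract, and an instance is modelled as a `(features, target)` tuple. The real CapyMOA classes may differ in their details.\n",
"\n",
"```python\n",
"# Hypothetical toy classes (not CapyMOA): an instance is a (features, target) tuple.\n",
"\n",
"class ToyScalerElement:\n",
"    # Plays the role of a TransformerPipelineElement: scales all features.\n",
"    def __init__(self, factor):\n",
"        self.factor = factor\n",
"\n",
"    def pass_forward(self, instance):\n",
"        features, target = instance\n",
"        return [self.factor * v for v in features], target\n",
"\n",
"    def pass_forward_predict(self, instance, prediction=None):\n",
"        # Transform the instance and hand the incoming prediction on unchanged.\n",
"        return self.pass_forward(instance), prediction\n",
"\n",
"\n",
"class ToyMeanRegressorElement:\n",
"    # Plays the role of a RegressorPipelineElement: predicts the mean target seen so far.\n",
"    def __init__(self):\n",
"        self.total = 0.0\n",
"        self.count = 0\n",
"\n",
"    def pass_forward(self, instance):\n",
"        # Training step: update the running mean and pass the instance on.\n",
"        self.total += instance[1]\n",
"        self.count += 1\n",
"        return instance\n",
"\n",
"    def pass_forward_predict(self, instance, prediction=None):\n",
"        # Prediction step: ignore the incoming prediction and emit our own.\n",
"        mean = self.total / self.count if self.count else 0.0\n",
"        return instance, mean\n",
"\n",
"\n",
"class ToyPipeline:\n",
"    # Plays the role of BasePipeline: calls the same method on every element, in order.\n",
"    def __init__(self, elements):\n",
"        self.elements = list(elements)\n",
"\n",
"    def pass_forward(self, instance):\n",
"        for element in self.elements:\n",
"            instance = element.pass_forward(instance)\n",
"        return instance\n",
"\n",
"    def pass_forward_predict(self, instance, prediction=None):\n",
"        for element in self.elements:\n",
"            instance, prediction = element.pass_forward_predict(instance, prediction)\n",
"        return instance, prediction\n",
"\n",
"\n",
"pipeline = ToyPipeline([ToyScalerElement(2.0), ToyMeanRegressorElement()])\n",
"pipeline.pass_forward(([1.0, 5.0], 4.0))  # train on a first instance\n",
"pipeline.pass_forward(([0.0, 1.0], 6.0))  # train on a second instance\n",
"print(pipeline.pass_forward_predict(([3.0, 0.0], None)))  # prints (([6.0, 0.0], None), 5.0)\n",
"```\n",
"\n",
"`ToyPipeline` exposes the same two methods as its elements, so it could itself be placed inside another `ToyPipeline`. This mirrors how CapyMOA pipelines can be nested."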
] }, { "attachments": {}, "cell_type": "markdown", "id": "55d070de-8697-4f98-a11b-eab4e3d5c281", "metadata": {}, "source": [ "## 1. Running OnlineBagging without any preprocessing\n", "\n", "First, let us look at a simple test-then-train classification example without pipelines. We loop over the instances of the data stream and, for each instance:\n", "- make a prediction,\n", "- update the evaluator with the prediction and the label,\n", "- and then train the classifier on the instance." ] }, { "cell_type": "code", "execution_count": 1, "id": "49ff5008", "metadata": { "nbsphinx": "hidden" }, "outputs": [], "source": [ "# This cell is hidden on capymoa.org. See docs/contributing/docs.rst\n", "from util.nbmock import mock_datasets, is_nb_fast\n", "\n", "if is_nb_fast():\n", " mock_datasets()" ] }, { "cell_type": "code", "execution_count": 2, "id": "14681f54-23a1-4f93-9145-abf484c91c54", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "82.06656073446328" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "## Test-then-train loop\n", "from capymoa.datasets import Electricity\n", "from capymoa.classifier import OnlineBagging\n", "from capymoa.evaluation import ClassificationEvaluator\n", "\n", "# Opening a file as a stream\n", "elec_stream = Electricity()\n", "\n", "# Creating a learner\n", "ob_learner = OnlineBagging(schema=elec_stream.get_schema(), ensemble_size=5)\n", "\n", "# Creating the evaluator\n", "ob_evaluator = ClassificationEvaluator(schema=elec_stream.get_schema())\n", "\n", "while elec_stream.has_more_instances():\n", " instance = elec_stream.next_instance()\n", "\n", " # Test first: predict before the learner has seen this instance\n", " prediction = ob_learner.predict(instance)\n", " ob_evaluator.update(instance.y_index, prediction)\n", " # Then train on the same instance\n", " ob_learner.train(instance)\n", "\n", "ob_evaluator.accuracy()" ] }, { "cell_type": "markdown", "id": "94362841-b267-471b-9a8c-b4094ad81acb", "metadata": {}, "source": [ "## 2. 
Transforming instances using pipelines\n", "\n", "If we want to perform some preprocessing, such as normalization, feature transformation, or a combination of both, we can chain multiple `Transformer`s within a pipeline. The most basic pipeline class, `BasePipeline`, already supports this.\n", "\n", "Creating a basic pipeline consists of the following steps:\n", "1. Create a stream instance\n", "2. Initialize the transformers\n", "3. Create the `BasePipeline`\n", "4. Add the transformers to the pipeline\n", "5. Call `pass_forward` to apply the transformations, and use the transformed instance for prediction and training:" ] }, { "cell_type": "code", "execution_count": 3, "id": "6724b905-53a0-49e9-b8a6-af1b30aececc", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "77.5048552259887" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from capymoa.stream.preprocessing import MOATransformer\n", "from capymoa.stream.preprocessing import BasePipeline\n", "from moa.streams.filters import AddNoiseFilter, NormalisationFilter\n", "\n", "elec_stream = Electricity()\n", "\n", "# Creating the transformers\n", "normalisation_transformer = MOATransformer(schema=elec_stream.get_schema(), moa_filter=NormalisationFilter())\n", "add_noise_transformer = MOATransformer(schema=normalisation_transformer.get_schema(), moa_filter=AddNoiseFilter())\n", "\n", "# Creating and populating the pipeline\n", "pipeline = BasePipeline()\n", "\n", "# Add the transformers to the pipeline. 
We can chain the calls to add_transformer, as it returns self\n", "pipeline = (pipeline\n", " .add_transformer(normalisation_transformer)\n", " .add_transformer(add_noise_transformer))\n", "\n", "# Creating a learner\n", "ob_learner = OnlineBagging(schema=add_noise_transformer.get_schema(), ensemble_size=5)\n", "\n", "# Creating the evaluator\n", "ob_evaluator = ClassificationEvaluator(schema=elec_stream.get_schema())\n", "\n", "while elec_stream.has_more_instances():\n", " instance = elec_stream.next_instance()\n", " # Apply the transformations; the learner only sees the transformed instance\n", " transformed_instance = pipeline.pass_forward(instance)\n", " prediction = ob_learner.predict(transformed_instance)\n", " ob_evaluator.update(instance.y_index, prediction)\n", " ob_learner.train(transformed_instance)\n", "\n", "ob_evaluator.accuracy()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "0c1360ef-0583-4c87-8645-1e2d701fffca", "metadata": {}, "source": [ "## 2. Online Bagging using pipelines and transformers\n", "\n", "A `ClassifierPipeline` supports `train` and `predict`, so we can use it in the same way as any other CapyMOA classifier.\n", "\n", "- When calling `train`, the pipeline object internally calls `pass_forward` on all elements.\n", "- When calling `predict`, the pipeline object internally calls `pass_forward_predict` on all elements and then returns the resulting prediction.\n", "\n", "Creating a pipeline consists of the following steps:\n", "\n", "1. Create a stream instance\n", "2. Initialize the transformers\n", "3. Initialize the learner\n", "4. Create the pipeline (here, a `ClassifierPipeline`)\n", "5. Add the transformers and the classifier\n", "6. Use the pipeline the same way as any other learner."
] }, { "cell_type": "code", "execution_count": 4, "id": "ae9bb646-e0d1-4de6-b5a1-cff0f0a1b172", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "77.5048552259887" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from capymoa.stream.preprocessing import MOATransformer\n", "from capymoa.stream.preprocessing import ClassifierPipeline\n", "from moa.streams.filters import AddNoiseFilter, NormalisationFilter\n", "\n", "elec_stream = Electricity()\n", "\n", "# Creating the transformers\n", "normalisation_transformer = MOATransformer(\n", " schema=elec_stream.get_schema(), moa_filter=NormalisationFilter()\n", ")\n", "add_noise_transformer = MOATransformer(\n", " schema=normalisation_transformer.get_schema(), moa_filter=AddNoiseFilter()\n", ")\n", "\n", "# Creating a learner\n", "ob_learner = OnlineBagging(schema=add_noise_transformer.get_schema(), ensemble_size=5)\n", "\n", "# Creating and populating the pipeline\n", "pipeline = (ClassifierPipeline()\n", " .add_transformer(normalisation_transformer)\n", " .add_transformer(add_noise_transformer)\n", " .add_classifier(ob_learner))\n", "\n", "# Creating the evaluator\n", "ob_evaluator = ClassificationEvaluator(schema=elec_stream.get_schema())\n", "\n", "while elec_stream.has_more_instances():\n", " instance = elec_stream.next_instance()\n", " prediction = pipeline.predict(instance)\n", " ob_evaluator.update(instance.y_index, prediction)\n", " pipeline.train(instance)\n", "\n", "ob_evaluator.accuracy()" ] }, { "cell_type": "markdown", "id": "676f53b7-0839-47a5-88f9-393b2007855e", "metadata": {}, "source": [ "We can also get a textual representation of the pipeline:" ] }, { "cell_type": "code", "execution_count": 5, "id": "31a481db-d23b-4fc8-a689-fc5c14df5fff", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "'PE(Transformer(NormalisationFilter)) | PE(Transformer(AddNoiseFilter)) | PE(OnlineBagging) | '" ] }, "execution_count": 5, "metadata": {}, "output_type": 
"execute_result" } ], "source": [ "str(pipeline)" ] }, { "cell_type": "markdown", "id": "df255274-83cd-41df-a1da-04778bc427aa", "metadata": {}, "source": [ "### 2.1 Alternative syntax\n", "* An alternative syntax to define the pipeline is shown below\n", "* Since the pipeline behaves like a learner, it can be used with high-level evaluation functions like `prequential_evaluation`" ] }, { "cell_type": "code", "execution_count": 6, "id": "50cb066b-e3e4-4ffd-ad9d-65631d5462e3", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "AdaptiveRandomForest: 88.55049435028248\n", "PE(Transformer(NormalisationFilter)) | PE(AdaptiveRandomForest) | : 88.04069562146893\n" ] }, { "data": { "image/png": "", "text/plain": [ "
" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "from capymoa.evaluation import prequential_evaluation\n", "from capymoa.classifier import AdaptiveRandomForestClassifier\n", "from capymoa.evaluation.visualization import plot_windowed_results\n", "from capymoa.stream.preprocessing import TransformerPipelineElement, ClassifierPipelineElement\n", "\n", "elec_stream = Electricity()\n", "\n", "# Creating a transformer\n", "normalisation_transformer = MOATransformer(\n", " schema=elec_stream.get_schema(), moa_filter=NormalisationFilter()\n", ")\n", "\n", "# Creating an ARF classifier as a baseline (it sees the raw, unnormalized stream)\n", "arf = AdaptiveRandomForestClassifier(\n", " schema=normalisation_transformer.get_schema(), ensemble_size=5\n", ")\n", "\n", "# Alternative syntax:\n", "# first, create the pipeline elements\n", "normalisation_transformer_pe = TransformerPipelineElement(normalisation_transformer)\n", "classifier_pe = ClassifierPipelineElement(AdaptiveRandomForestClassifier(schema=normalisation_transformer.get_schema(), ensemble_size=5))\n", "# then, pass them as a list to the pipeline initializer\n", "pipeline_arf = ClassifierPipeline([normalisation_transformer_pe, classifier_pe])\n", "\n", "results_arf_pipeline = prequential_evaluation(stream=elec_stream, learner=pipeline_arf, window_size=4500)\n", "results_arf_baseline = prequential_evaluation(stream=elec_stream, learner=arf, window_size=4500)\n", "\n", "print(f\"{arf}: {results_arf_baseline['cumulative'].accuracy()}\")\n", "print(f\"{pipeline_arf}: {results_arf_pipeline['cumulative'].accuracy()}\")\n", "plot_windowed_results(results_arf_pipeline, results_arf_baseline, metric=\"accuracy\", figure_path=None)" ] }, { "cell_type": "markdown", "id": "5cc06f0d-d2a0-4fc6-aa9f-ea80e0d224cd", "metadata": {}, "source": [ "## 3. 
RegressorPipeline\n", "\n", "* The regression version of the pipeline, `RegressorPipeline`, works much like `ClassifierPipeline`; regressors are added with `add_regressor`." ] }, { "cell_type": "code", "execution_count": 7, "id": "f3c8271b-bbb7-4ca9-97f5-fb41e27ec4fd", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "2.5841013025710358\n" ] } ], "source": [ "from capymoa.regressor import AdaptiveRandomForestRegressor\n", "from capymoa.stream.preprocessing import RegressorPipeline\n", "from capymoa.evaluation import RegressionEvaluator\n", "from capymoa.datasets import Fried\n", "\n", "fried_stream = Fried()\n", "\n", "# Creating a transformer\n", "normalisation_transformer = MOATransformer(\n", " schema=fried_stream.get_schema(), moa_filter=NormalisationFilter()\n", ")\n", "\n", "# Creating a regressor\n", "arfreg = AdaptiveRandomForestRegressor(\n", " schema=normalisation_transformer.get_schema(), ensemble_size=5\n", ")\n", "\n", "# Creating and populating the pipeline\n", "pipeline_arfreg = RegressorPipeline()\n", "pipeline_arfreg.add_transformer(normalisation_transformer)\n", "pipeline_arfreg.add_regressor(arfreg)\n", "\n", "# Creating the evaluator\n", "arfreg_evaluator = RegressionEvaluator(schema=fried_stream.get_schema())\n", "\n", "while fried_stream.has_more_instances():\n", " instance = fried_stream.next_instance()\n", " prediction = pipeline_arfreg.predict(instance)\n", " arfreg_evaluator.update(instance.y_value, prediction)\n", " pipeline_arfreg.train(instance)\n", "\n", "print(arfreg_evaluator.rmse())" ] }, { "cell_type": "markdown", "id": "cfd6bceb-9969-4b79-8f91-1d312c72c331", "metadata": {}, "source": [ "## 4. Adding a drift detector to the pipeline\n", "\n", "Let us now add a drift detector to the `ClassifierPipeline` from section 2.\n", "\n", "Adding a drift detector to a pipeline requires the following steps:\n", "1. create the drift detector\n", "2. define a function that prepares the input for the drift detector based on an instance and a prediction (which can be `None`)\n", "3. 
create and populate the pipeline\n", "4. run the pipeline" ] }, { "cell_type": "markdown", "id": "4ac8bbd3-009d-4af3-b6a4-23e004f1ba6f", "metadata": {}, "source": [ "### 4.1 Monitoring classifier accuracy" ] }, { "cell_type": "code", "execution_count": 8, "id": "5e18f1ad-c358-4773-8904-b33787585021", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Detected change at index 3487\n", "Detected change at index 8735\n", "Detected change at index 10399\n", "Detected change at index 10559\n", "Detected change at index 10751\n", "Detected change at index 11039\n", "Detected change at index 13343\n", "Detected change at index 13855\n", "Detected change at index 17247\n", "Detected change at index 17343\n", "Detected change at index 18623\n", "Detected change at index 18719\n", "Detected change at index 18975\n", "Detected change at index 19295\n", "Detected change at index 21215\n", "Detected change at index 21375\n", "Detected change at index 23359\n", "Detected change at index 23391\n", "Detected change at index 24127\n", "Detected change at index 25023\n", "Detected change at index 25119\n", "Detected change at index 35327\n", "Detected change at index 35487\n", "Detected change at index 39999\n", "Detected change at index 40127\n", "Detected change at index 40991\n", "Detected change at index 41439\n" ] }, { "data": { "text/plain": [ "77.5048552259887" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from capymoa.drift.detectors import ADWIN\n", "from capymoa.instance import LabeledInstance\n", "from capymoa.type_alias import LabelIndex\n", "\n", "elec_stream = Electricity()\n", "\n", "# Creating the transformers\n", "normalisation_transformer = MOATransformer(schema=elec_stream.get_schema(), moa_filter=NormalisationFilter())\n", "add_noise_transformer = MOATransformer(schema=normalisation_transformer.get_schema(), moa_filter=AddNoiseFilter())\n", "\n", "# Creating a learner\n", "ob_learner 
= OnlineBagging(schema=add_noise_transformer.get_schema(), ensemble_size=5)\n", "\n", "# Creating a drift detector\n", "drift_detector = ADWIN()\n", "\n", "# Define a function that prepares the input of the drift detector:\n", "# 1 if the prediction is correct, 0 otherwise\n", "def label_equals_prediction(instance: LabeledInstance, prediction: LabelIndex) -> LabelIndex:\n", " label = instance.y_index\n", " return int(label == prediction)\n", "\n", "# Creating and populating the pipeline\n", "pipeline = (ClassifierPipeline()\n", " .add_transformer(normalisation_transformer)\n", " .add_transformer(add_noise_transformer)\n", " .add_classifier(ob_learner)\n", " .add_drift_detector(drift_detector, get_drift_detector_input_func=label_equals_prediction))\n", "\n", "# Creating the evaluator\n", "ob_evaluator = ClassificationEvaluator(schema=elec_stream.get_schema())\n", "\n", "i = 0\n", "while elec_stream.has_more_instances():\n", " instance = elec_stream.next_instance()\n", " prediction = pipeline.predict(instance)\n", " ob_evaluator.update(instance.y_index, prediction)\n", " pipeline.train(instance)\n", " if drift_detector.detected_change():\n", "     print(f\"Detected change at index {i}\")\n", " i += 1\n", "\n", "ob_evaluator.accuracy()" ] }, { "cell_type": "markdown", "id": "0392072a-79e6-41d2-9a0c-f72358e936b0", "metadata": {}, "source": [ "### 4.2 Monitoring drift in the first input feature\n", "\n", "We now show how to monitor an input feature instead of the classifier's correctness, by changing the function passed as `get_drift_detector_input_func` and the position of the drift detector in the pipeline.\n", "\n", "For the sake of illustration, this example is very simple, but more complex uses of `get_drift_detector_input_func` are easy to imagine: any object that implements `__call__(instance, prediction)` can be provided. For example, one could provide a class that monitors the correlation between a set of input features."
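, "\n",
"\n",
"A minimal sketch of such an object follows. The hypothetical class `FeatureAboveThreshold` (not part of CapyMOA) generalizes the `first_feature_is_gt_zero` function used in the next cell to any feature index and threshold. A `types.SimpleNamespace` with an `x` attribute stands in for a CapyMOA instance. This is an assumption made only to keep the example self-contained.\n",
"\n",
"```python\n",
"from types import SimpleNamespace\n",
"\n",
"\n",
"class FeatureAboveThreshold:\n",
"    # Hypothetical helper: returns 1 if the chosen feature exceeds the threshold, else 0.\n",
"    def __init__(self, feature_index=0, threshold=0.0):\n",
"        self.feature_index = feature_index\n",
"        self.threshold = threshold\n",
"\n",
"    def __call__(self, instance, prediction=None):\n",
"        # The prediction is ignored: only the input feature is monitored.\n",
"        return int(instance.x[self.feature_index] > self.threshold)\n",
"\n",
"\n",
"monitor = FeatureAboveThreshold(feature_index=1, threshold=0.5)\n",
"print(monitor(SimpleNamespace(x=[0.2, 0.7]), None))  # prints 1\n",
"print(monitor(SimpleNamespace(x=[0.2, 0.3]), None))  # prints 0\n",
"```\n",
"\n",
"Only `__call__(instance, prediction)` is required. An object such as `FeatureAboveThreshold(feature_index=0, threshold=0.0)` could therefore be passed as `get_drift_detector_input_func` in place of `first_feature_is_gt_zero`. Keeping the parameters as attributes makes the monitored feature configurable without writing a new function."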
] }, { "cell_type": "code", "execution_count": 9, "id": "d6f2794b-9a65-4be1-a678-fa90dff05c5f", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Detected change at index 95\n", "Detected change at index 159\n" ] }, { "data": { "text/plain": [ "77.5048552259887" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from capymoa.drift.detectors import ADWIN\n", "from capymoa.instance import LabeledInstance\n", "from capymoa.type_alias import LabelIndex\n", "\n", "elec_stream = Electricity()\n", "\n", "# Creating the transformers\n", "normalisation_transformer = MOATransformer(schema=elec_stream.get_schema(), moa_filter=NormalisationFilter())\n", "add_noise_transformer = MOATransformer(schema=normalisation_transformer.get_schema(), moa_filter=AddNoiseFilter())\n", "\n", "# Creating a learner\n", "ob_learner = OnlineBagging(schema=add_noise_transformer.get_schema(), ensemble_size=5)\n", "\n", "# Creating a drift detector\n", "drift_detector = ADWIN()\n", "\n", "# Define a function that prepares the input of the drift detector\n", "def first_feature_is_gt_zero(instance: LabeledInstance, prediction: LabelIndex) -> LabelIndex:\n", " feature_val = instance.x[0]\n", " return int(feature_val > 0.0)\n", "\n", "# Creating and populating the pipeline\n", "pipeline = (ClassifierPipeline()\n", " .add_transformer(normalisation_transformer)\n", " # here, we add the drift detector after the normalization step\n", " .add_drift_detector(drift_detector, get_drift_detector_input_func=first_feature_is_gt_zero) \n", " .add_transformer(add_noise_transformer)\n", " .add_classifier(ob_learner)\n", " )\n", "\n", "# Creating the evaluator\n", "ob_evaluator = ClassificationEvaluator(schema=elec_stream.get_schema()) \n", "\n", "i = 0\n", "while elec_stream.has_more_instances():\n", " instance = elec_stream.next_instance()\n", " prediction = pipeline.predict(instance)\n", " ob_evaluator.update(instance.y_index, 
prediction)\n", " pipeline.train(instance)\n", " if drift_detector.detected_change():\n", "     print(f\"Detected change at index {i}\")\n", " i += 1\n", "\n", "ob_evaluator.accuracy()" ] }, { "cell_type": "markdown", "id": "770f37d9-c293-4379-9412-a07701fe57f3", "metadata": {}, "source": [ "## 5. Pipelines within pipelines\n", "\n", "The following example is based on section 4.1 and shows how multiple pipelines can be combined into a single pipeline." ] }, { "cell_type": "code", "execution_count": 10, "id": "a4fecd51-b0ed-4cf1-b54e-dd3c4185f537", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Detected change at index 3487\n", "Detected change at index 8735\n", "Detected change at index 10399\n", "Detected change at index 10559\n", "Detected change at index 10751\n", "Detected change at index 11039\n", "Detected change at index 13343\n", "Detected change at index 13855\n", "Detected change at index 17247\n", "Detected change at index 17343\n", "Detected change at index 18623\n", "Detected change at index 18719\n", "Detected change at index 18975\n", "Detected change at index 19295\n", "Detected change at index 21215\n", "Detected change at index 21375\n", "Detected change at index 23359\n", "Detected change at index 23391\n", "Detected change at index 24127\n", "Detected change at index 25023\n", "Detected change at index 25119\n", "Detected change at index 35327\n", "Detected change at index 35487\n", "Detected change at index 39999\n", "Detected change at index 40127\n", "Detected change at index 40991\n", "Detected change at index 41439\n" ] }, { "data": { "text/plain": [ "77.5048552259887" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from capymoa.drift.detectors import ADWIN\n", "from capymoa.instance import LabeledInstance\n", "from capymoa.type_alias import LabelIndex\n", "\n", "elec_stream = Electricity()\n", "\n", "# Creating the transformers\n", "normalisation_transformer = 
MOATransformer(schema=elec_stream.get_schema(), moa_filter=NormalisationFilter())\n", "add_noise_transformer = MOATransformer(schema=normalisation_transformer.get_schema(), moa_filter=AddNoiseFilter())\n", "\n", "# Creating a learner\n", "ob_learner = OnlineBagging(schema=add_noise_transformer.get_schema(), ensemble_size=5)\n", "\n", "# Creating a drift detector\n", "drift_detector = ADWIN()\n", "\n", "# Define a function that prepares the input of the drift detector\n", "def label_equals_prediction(instance: LabeledInstance, prediction: LabelIndex) -> LabelIndex:\n", " label = instance.y_index\n", " return int(label == prediction)\n", "\n", "# Creating and populating the transformation pipeline\n", "trafo_pipeline = (BasePipeline()\n", " .add_transformer(normalisation_transformer)\n", " .add_transformer(add_noise_transformer))\n", "\n", "# Creating and populating the prediction pipeline\n", "prediction_pipeline = ClassifierPipeline().add_classifier(ob_learner)\n", "\n", "# Creating and populating the drift detection pipeline\n", "drift_pipeline = BasePipeline().add_drift_detector(drift_detector, get_drift_detector_input_func=label_equals_prediction)\n", "\n", "# Since pipelines themselves are pipeline elements, we can pass them to the initializer of an overall pipeline object\n", "pipeline = ClassifierPipeline([trafo_pipeline, prediction_pipeline, drift_pipeline])\n", "\n", "# An alternative syntax would be\n", "# pipeline = (ClassifierPipeline()\n", "# .add_pipeline_element(trafo_pipeline)\n", "# .add_pipeline_element(prediction_pipeline)\n", "# .add_pipeline_element(drift_pipeline))\n", "\n", "# Creating the evaluator\n", "ob_evaluator = ClassificationEvaluator(schema=elec_stream.get_schema()) \n", "\n", "i = 0\n", "while elec_stream.has_more_instances():\n", " instance = elec_stream.next_instance()\n", " prediction = pipeline.predict(instance)\n", " ob_evaluator.update(instance.y_index, prediction)\n", " pipeline.train(instance)\n", " if 
drift_detector.detected_change():\n", "     print(f\"Detected change at index {i}\")\n", " i += 1\n", "\n", "ob_evaluator.accuracy()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.19" } }, "nbformat": 4, "nbformat_minor": 5 }