#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2017-2024, Cabral, Juan
# Copyright (c) 2025, QuatroPe; Clariá, Felipe
# License: MIT
# Full Text:
# https://github.com/quatrope/feets/blob/master/LICENSE
# =============================================================================
# DOC
# =============================================================================
"""Run multiple feature extractors in parallel."""
# =============================================================================
# IMPORTS
# =============================================================================
import copy
import dask
from dask.delayed import delayed
# Public names exported by a star-import of this module.
__all__ = ["run", "DataRequiredError"]
# =============================================================================
# CONSTANTS
# =============================================================================
# Keyword arguments that ``run`` forwards to ``dask.compute`` when the caller
# does not pass ``dask_options``. ``run`` deep-copies this dict before use.
DEFAULT_DASK_OPTIONS = {"scheduler": "processes"}
# =============================================================================
# EXCEPTIONS
# =============================================================================
class DataRequiredError(ValueError):
    """A required data vector is missing from the light curve."""
# =============================================================================
# RUNNER
# =============================================================================
def _validate_required_data_single(*, required_data, lc):
missing_data = set(required_data).difference(lc)
if missing_data:
missing_str = ", ".join(missing_data)
raise DataRequiredError(
f"Missing required data vectors in light curve: {missing_str}"
)
def _validate_required_data(*, required_data, lcs, dask_options):
    """Validate every light curve in ``lcs`` against ``required_data``.

    Parameters
    ----------
    required_data : iterable of str
        Names of the data vectors each light curve must contain.
    lcs : iterable
        Light curves to check, one at a time.
    dask_options : dict
        Keyword arguments forwarded to ``dask.compute``.

    Raises
    ------
    DataRequiredError
        Propagated from ``_validate_required_data_single`` for the first
        light curve that lacks a required vector.
    """
    # NOTE(review): ``_validate_required_data_single`` is not wrapped in
    # ``delayed`` in this file, so each check runs eagerly inside this loop
    # and ``dask.compute`` below only receives the plain return values.
    # Confirm whether lazy/parallel validation was intended.
    outcomes = []
    for lc in lcs:
        outcome = _validate_required_data_single(
            required_data=required_data, lc=lc
        )
        outcomes.append(outcome)

    dask.compute(*outcomes, **dask_options)
@delayed
def _get_feature(results, feature):
    """Return the entry stored under ``feature`` in ``results``.

    Wrapped in ``delayed``, so calling it builds a lazy Dask task rather than
    performing the lookup immediately.
    """
    value = results[feature]
    return value
@delayed
def _extract_and_validate(extractor, kwargs):
    """Run ``extractor.extract`` and pass its output through validation.

    Wrapped in ``delayed``, so calling it builds a lazy Dask task.

    Parameters
    ----------
    extractor : object
        Must provide ``extract(**kwargs)`` and ``validate_extract(results)``.
    kwargs : dict
        Keyword arguments unpacked into ``extractor.extract``.

    Returns
    -------
    object
        Whatever ``extractor.extract`` returned.
    """
    extracted = extractor.extract(**kwargs)
    # The return value of validate_extract is ignored; presumably it raises
    # on invalid output (verify against the extractor implementation).
    extractor.validate_extract(extracted)
    return extracted
def _extract_selected_features(extractors, data, selected_features):
    """Build the lazy feature tasks for one light curve.

    Extractors are visited in the given order. Each one receives the tasks
    registered so far through ``prepare_extract``, which is why an extractor
    must come after the ones it depends on.

    Parameters
    ----------
    extractors : iterable
        Extractor instances providing ``prepare_extract`` and
        ``get_features``.
    data : object
        Light curve handed to every ``prepare_extract`` call.
    selected_features : iterable of str
        Names of the features to keep in the result.

    Returns
    -------
    dict
        Maps each selected feature name to its delayed value.
    """
    task_by_feature = {}

    for extractor in extractors:
        # The same dict is passed on every iteration and updated in place, so
        # later extractors see the features produced by earlier ones.
        call_kwargs = extractor.prepare_extract(data, task_by_feature)
        pending = _extract_and_validate(extractor, call_kwargs)
        task_by_feature.update(
            (name, _get_feature(pending, name))
            for name in extractor.get_features()
        )

    return {name: task_by_feature[name] for name in selected_features}
def _run_single(*, extractors, selected_features, lc):
    """Return the delayed selected features for a single light curve.

    Keyword-only wrapper around ``_extract_selected_features``.
    """
    return _extract_selected_features(extractors, lc, selected_features)
def run(
    *,
    extractors,
    selected_features,
    required_data,
    lcs,
    dask_options=None,
):
    """Run instances of feature extractors on a collection of light curves.

    Executes the specified extractor instances on each provided light curve,
    returning the extracted features for each. Feature extraction is performed
    in parallel using Dask, enabling efficient computation across multiple
    light curves. The order of execution respects dependencies between
    extractors; ensure that the `extractors` list is topologically sorted so
    that dependencies are satisfied.

    Parameters
    ----------
    extractors : array_like of feets.extractors.Extractor
        Feature extractor instances to apply. Must be sorted so that any
        extractor appears after those it depends on.
    selected_features : array_like of str
        Names of features to extract from each light curve.
    required_data : array_like of str
        Names of required data fields that must be present in each light curve.
    lcs : array_like of dict
        Light curves to process, each represented as a dictionary of data
        vectors. Any iterable is accepted; it is materialized into a list
        once before processing.
    dask_options : dict, optional
        Options for the Dask scheduler. Defaults to
        ``{"scheduler": "processes"}``.

    Returns
    -------
    list of dict
        List of dictionaries, one per input light curve, with the extracted
        feature values. Each dictionary contains the extracted features
        specified in `selected_features`. The order of the list matches the
        input `lcs`.

    Raises
    ------
    DataRequiredError
        If any of the required data vectors are missing from a light curve.

    See Also
    --------
    feets.Extractor : Abstract base class for feature extractors.
    feets.FeatureSpace :
        Class to select and extract features from a time series.
    dask.compute

    Notes
    -----
    Feature extraction is parallelized using Dask. You can control parallelism
    and scheduler behavior via the `dask_options` parameter.
    For more information on Dask, see: https://docs.dask.org/en/stable/

    Examples
    --------
    >>> from feets.extractors import Mean
    >>>
    >>> # Instantiate the feature extractor
    >>> mean_extractor = Mean()
    >>>
    >>> # Light curves to process
    >>> lcs = [{"magnitude": [1, 2, 3]}, {"magnitude": [4, 5, 6]}]
    >>>
    >>> # Run the feature extraction
    >>> run(
    ...     extractors=[mean_extractor],
    ...     selected_features=["Mean"],
    ...     required_data=["magnitude"],
    ...     lcs=lcs
    ... )
    [{'Mean': np.float64(2.0)}, {'Mean': np.float64(5.0)}]
    """
    if dask_options is None:
        # Copy so the module-level default can never be mutated downstream.
        dask_options = copy.deepcopy(DEFAULT_DASK_OPTIONS)

    # ``lcs`` is iterated twice (validation, then extraction). Materialize it
    # so a one-shot iterable such as a generator is not exhausted by the
    # validation pass, which would silently yield an empty result.
    lcs = list(lcs)

    _validate_required_data(
        required_data=required_data, lcs=lcs, dask_options=dask_options
    )

    # Build one dict of delayed features per light curve; nothing is computed
    # until the single ``dask.compute`` call below.
    delayed_features_by_lc = [
        _run_single(
            extractors=extractors,
            selected_features=selected_features,
            lc=lc,
        )
        for lc in lcs
    ]

    features_by_lc = dask.compute(*delayed_features_by_lc, **dask_options)
    return list(features_by_lc)