Source code for lightautoml.ml_algo.tuning.optuna

"""Classes to implement hyperparameter tuning using Optuna."""

import logging

from copy import copy
from copy import deepcopy
from typing import Callable
from typing import Optional
from typing import Tuple
from typing import TypeVar
from typing import Union

import optuna
from tqdm import tqdm

from ...dataset.base import LAMLDataset
from ..base import MLAlgo
from .base import Choice
from .base import ParamsTuner
from .base import Uniform
from ...validation.base import HoldoutIterator
from ...validation.base import TrainValidIterator
from ...utils.logging import get_stdout_level


logger = logging.getLogger(__name__)
optuna.logging.enable_propagation()
optuna.logging.disable_default_handler()
optuna.logging.set_verbosity(optuna.logging.DEBUG)

TunableAlgo = TypeVar("TunableAlgo", bound=MLAlgo)


class ChoiceWrapOptuna:
    """TODO."""

    def __init__(self, choice) -> None:
        self.choice = choice

    def __call__(self, name, trial):
        """_summary_.

        Args:
            name (_type_): _description_
            trial (_type_): _description_

        Returns:
            _type_: _description_
        """
        return trial.suggest_categorical(name=name, choices=self.choice.options)


class UniformWrapOptuna:
    """TODO."""

    def __init__(self, choice) -> None:
        self.choice = choice

    def __call__(self, name, trial):
        """_summary_.

        Args:
            name (_type_): _description_
            trial (_type_): _description_

        Returns:
            _type_: _description_
        """
        if (self.choice.q is not None) and float(self.choice.q).is_integer() and (self.choice.q == 1):
            result = trial.suggest_int(name=name, low=self.choice.low, high=self.choice.high)
        else:
            result = trial.suggest_float(
                name=name, low=self.choice.low, high=self.choice.high, step=self.choice.q, log=self.choice.log
            )

        return result


OPTUNA_DISTRIBUTIONS_MAP = {Choice: ChoiceWrapOptuna, Uniform: UniformWrapOptuna}


[docs] class OptunaTuner(ParamsTuner): """Wrapper for optuna tuner. Args: timeout: Maximum learning time. n_trials: Maximum number of trials. direction: Direction of optimization. Set ``minimize`` for minimization and ``maximize`` for maximization. fit_on_holdout: Will be used holdout cv-iterator. random_state: Seed for optuna sampler. fail_tolerance: the maximum allowed percentage of failed tuner trials. Exception will be thrown after crossing the threshold value. """ _name: str = "OptunaTuner" study: optuna.study.Study = None estimated_n_trials: int = None mean_trial_time: Optional[int] = None def __init__( # TODO: For now, metric is designed to be greater is better. Change maximize param after metric refactor if needed self, timeout: Optional[int] = 1000, n_trials: Optional[int] = 100, direction: Optional[str] = "maximize", fit_on_holdout: bool = True, random_state: int = 42, fail_tolerance: float = 0.5, ): self.timeout = timeout self.n_trials = n_trials self.estimated_n_trials = n_trials self.direction = direction self._fit_on_holdout = fit_on_holdout self.random_state = random_state self.fail_tolerance = fail_tolerance def _upd_timeout(self, timeout): self.timeout = min(self.timeout, timeout)
[docs] def fit( self, ml_algo: TunableAlgo, train_valid_iterator: Optional[TrainValidIterator] = None, ) -> Tuple[Optional[TunableAlgo], Optional[LAMLDataset]]: """Tune model. Args: ml_algo: Algo that is tuned. train_valid_iterator: Classic cv-iterator. Returns: Tuple (None, None) if an optuna exception raised or ``fit_on_holdout=True`` and ``train_valid_iterator`` is not :class:`~lightautoml.validation.base.HoldoutIterator`. Tuple (MlALgo, preds_ds) otherwise. """ assert not ml_algo.is_fitted, "Fitted algo cannot be tuned." # optuna.logging.set_verbosity(logger.getEffectiveLevel()) # upd timeout according to ml_algo timer estimated_tuning_time = ml_algo.timer.estimate_tuner_time(len(train_valid_iterator)) if estimated_tuning_time: # TODO: Check for minimal runtime! estimated_tuning_time = max(estimated_tuning_time, 1) self._upd_timeout(estimated_tuning_time) logger.info( f"Start hyperparameters optimization for \x1b[1m{ml_algo._name}\x1b[0m ... Time budget is {self.timeout:.2f} secs" ) metric_name = train_valid_iterator.train.task.get_dataset_metric().name ml_algo = deepcopy(ml_algo) flg_new_iterator = False if self._fit_on_holdout and not isinstance(train_valid_iterator, HoldoutIterator): train_valid_iterator = train_valid_iterator.convert_to_holdout_iterator() flg_new_iterator = True # TODO: Check if time estimation will be ok with multiprocessing def update_trial_time(study: optuna.study.Study, trial: optuna.trial.FrozenTrial): """Callback for number of iteration with time cut-off. Args: study: Optuna study object. trial: Optuna trial object. """ ml_algo.mean_trial_time = study.trials_dataframe()["duration"].mean().total_seconds() self.estimated_n_trials = min(self.n_trials, self.timeout // ml_algo.mean_trial_time) logger.info3( f"\x1b[1mTrial {len(study.trials)}\x1b[0m with hyperparameters {trial.params} scored {trial.value} in {trial.duration}" ) def check_fail_tolerance(study: optuna.study.Study, trial: optuna.trial.FrozenTrial): df = study.trials_dataframe() if (self.estimated_n_trials > 0) and ( df[df["state"] == "FAIL"].shape[0] / self.estimated_n_trials > self.fail_tolerance ): raise Exception( f"Too much trials was failed ({df[df['state'] == 'FAIL'].shape[0]} of {df.shape[0]}). Check the model or search space for it." ) try: # Custom progress bar def custom_progress_bar(study: optuna.study.Study, trial: optuna.trial.FrozenTrial): best_trial = study.best_trial progress_bar.set_postfix(best_trial=best_trial.number, best_value=best_trial.value) progress_bar.update(1) # Initialize progress bar if get_stdout_level() in [logging.INFO, logging.INFO2]: progress_bar = tqdm(total=self.n_trials, desc="Optimization Progress") sampler = optuna.samplers.TPESampler(seed=self.random_state) self.study = optuna.create_study(direction=self.direction, sampler=sampler) self.study.optimize( func=self._get_objective( ml_algo=ml_algo, estimated_n_trials=self.estimated_n_trials, train_valid_iterator=train_valid_iterator, ), n_trials=self.n_trials, timeout=self.timeout, callbacks=( [update_trial_time, check_fail_tolerance, custom_progress_bar] if get_stdout_level() in [logging.INFO, logging.INFO2] else [update_trial_time, check_fail_tolerance] ), catch=[Exception], ) # Close the progress bar if it was initialized if get_stdout_level() in [logging.INFO, logging.INFO2]: progress_bar.close() # need to update best params here self._best_params = self.study.best_params ml_algo.params = self._best_params logger.info(f"Hyperparameters optimization for \x1b[1m{ml_algo._name}\x1b[0m completed") logger.info2( f"The set of hyperparameters \x1b[1m{self._best_params}\x1b[0m\n achieve {self.study.best_value:.4f} {metric_name}" ) if flg_new_iterator: # if tuner was fitted on holdout set we dont need to save train results return None, None preds_ds = ml_algo.fit_predict(train_valid_iterator) return ml_algo, preds_ds except optuna.exceptions.OptunaError: return None, None
def _get_objective( self, ml_algo: TunableAlgo, estimated_n_trials: int, train_valid_iterator: TrainValidIterator, ) -> Callable[[optuna.trial.Trial], Union[float, int]]: """Get objective. Args: ml_algo: Tunable algorithm. estimated_n_trials: Maximum number of hyperparameter estimations. train_valid_iterator: Used for getting parameters depending on dataset. Returns: Callable objective. """ assert isinstance(ml_algo, MLAlgo) def objective(trial: optuna.trial.Trial) -> float: _ml_algo = deepcopy(ml_algo) optimization_search_space = _ml_algo.optimization_search_space if not optimization_search_space: optimization_search_space = _ml_algo._get_default_search_spaces( suggested_params=_ml_algo.init_params_on_input(train_valid_iterator), estimated_n_trials=estimated_n_trials, ) if callable(optimization_search_space): _ml_algo.params = optimization_search_space( trial=trial, estimated_n_trials=estimated_n_trials, suggested_params=_ml_algo.init_params_on_input(train_valid_iterator), ) else: _ml_algo.params = self._sample( trial=trial, optimization_search_space=optimization_search_space, suggested_params=_ml_algo.init_params_on_input(train_valid_iterator), ) output_dataset = _ml_algo.fit_predict(train_valid_iterator=train_valid_iterator) return _ml_algo.score(output_dataset) return objective def _sample( self, optimization_search_space, trial: optuna.trial.Trial, suggested_params: dict, ) -> dict: # logger.info3(f'Suggested parameters: {suggested_params}') trial_values = copy(suggested_params) for parameter_name, search_space in optimization_search_space.items(): not_supported = True for key_class in OPTUNA_DISTRIBUTIONS_MAP: if isinstance(search_space, key_class): wrapped_search_space = OPTUNA_DISTRIBUTIONS_MAP[key_class](search_space) trial_values[parameter_name] = wrapped_search_space( name=parameter_name, trial=trial, ) not_supported = False if not_supported: raise ValueError(f"Optuna does not support distribution {search_space}") return trial_values
[docs] def plot(self): """Plot optimization history of all trials in a study.""" return optuna.visualization.plot_optimization_history(self.study)