Source code for lightautoml.automl.blend

"""Blenders."""

import logging

from typing import Callable
from typing import List
from typing import Optional
from typing import Sequence
from typing import Tuple
from typing import cast

import numpy as np

from scipy.optimize import minimize_scalar

from ..dataset.base import LAMLDataset
from ..dataset.np_pd_dataset import NumpyDataset
from ..dataset.roles import NumericRole
from ..pipelines.ml.base import MLPipeline


# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Tell NumPy to ignore "divide" and "invalid" floating-point errors. The blenders
# in this module divide by weight sums that can be zero and handle the resulting
# NaN values explicitly.
np.seterr(divide="ignore", invalid="ignore")


class Blender:
    """Basic class for blending.

    Blender learns how to make blend on sequence of prediction datasets
    and prune pipes, that are not used in final blend.

    Subclasses implement ``._fit_predict`` and ``._predict``; the public
    ``fit_predict`` / ``predict`` wrappers short-circuit the trivial case of
    a single pipeline that holds a single model.

    """

    # Number of prediction columns produced by one model; set by ``_set_metadata``.
    _outp_dim = None
    # True when the last fit saw exactly one model and blending is skipped.
    _bypass = False

    @property
    def outp_dim(self) -> Optional[int]:  # noqa: D102
        return self._outp_dim

    def fit_predict(
        self, predictions: Sequence[LAMLDataset], pipes: Sequence[MLPipeline]
    ) -> Tuple[LAMLDataset, Sequence[MLPipeline]]:
        """Wraps custom ``._fit_predict`` methods of blenders.

        Method wraps individual ``._fit_predict`` method of blenders.
        If input is single model - take it, else ``._fit_predict``
        Note - some pipelines may have more than 1 model.
        So corresponding prediction dataset have multiple prediction cols.

        Args:
            predictions: Sequence of datasets with predictions.
            pipes: Sequence of pipelines.

        Returns:
            Single prediction dataset and sequence of pruned pipelines.

        """
        # Recompute the flag on every fit so that a blender that was once
        # fitted on a single model does not keep bypassing after a re-fit.
        self._bypass = len(pipes) == 1 and len(pipes[0].ml_algos) == 1
        if self._bypass:
            return predictions[0], pipes

        return self._fit_predict(predictions, pipes)

    def _fit_predict(
        self, predictions: Sequence[LAMLDataset], pipes: Sequence[MLPipeline]
    ) -> Tuple[LAMLDataset, Sequence[MLPipeline]]:
        """Defines how to fit, predict and prune - Abstract.

        Args:
            predictions: Sequence of datasets with predictions.
            pipes: Sequence of pipelines.

        Returns:  # noqa: DAR202
            Single prediction dataset and sequence of pruned ``MLPipelines``.

        """
        raise NotImplementedError

    def predict(self, predictions: Sequence[LAMLDataset]) -> LAMLDataset:
        """Wraps custom ``._predict`` methods of blenders.

        If the blender was fitted on a single model, the first prediction
        dataset is returned unchanged.

        Args:
            predictions: Sequence of predictions from pruned datasets.

        Returns:
            Dataset with predictions.

        """
        if self._bypass:
            return predictions[0]

        return self._predict(predictions)

    def _predict(self, predictions: Sequence[LAMLDataset]) -> LAMLDataset:
        """Blend predictions on new sample.

        Args:
            predictions: Sequence of predictions from pruned datasets.

        Returns:  # noqa: DAR201
            Dataset with predictions.

        """
        raise NotImplementedError

    def split_models(self, predictions: Sequence[LAMLDataset]) -> Tuple[Sequence[LAMLDataset], List[int], List[int]]:
        """Split predictions by single model prediction datasets.

        Every dataset is cut into consecutive groups of ``outp_dim`` columns,
        one group per model.

        Args:
            predictions: Sequence of datasets with predictions.

        Returns:
            Split predictions, model indices, pipe indices.

        """
        dim = self.outp_dim
        splitted_preds = []
        model_idx = []
        pipe_idx = []

        for n, preds in enumerate(predictions):
            features = preds.features
            n_models = len(features) // dim

            for k in range(n_models):
                curr_pred = preds[:, features[k * dim : (k + 1) * dim]]
                splitted_preds.append(curr_pred)
                model_idx.append(k)
                pipe_idx.append(n)

        return splitted_preds, model_idx, pipe_idx

    def _set_metadata(self, predictions: Sequence[LAMLDataset], pipes: Sequence[MLPipeline]):
        """Remember output width, probability flag and metric taken from the first dataset.

        Args:
            predictions: Sequence of datasets with predictions.
            pipes: Sequence of pipelines.

        """
        pred0 = predictions[0]
        pipe0 = pipes[0]

        # Columns of the first dataset are shared evenly between the models of the first pipe.
        self._outp_dim = pred0.shape[1] // len(pipe0.ml_algos)
        self._outp_prob = pred0.task.name in ["binary", "multiclass"]
        self._score = pred0.task.get_dataset_metric()

    def score(self, dataset: LAMLDataset) -> float:
        """Score metric for blender.

        Args:
            dataset: Blended predictions dataset.

        Returns:
            Metric value.

        """
        return self._score(dataset, True)
class BestModelSelector(Blender):
    """Select best single model from level.

    Drops pipes that are not used in calc best model.
    Works in general case (even on some custom things)
    and most efficient on inference.
    Perform worse than other on tables,
    specially if some of models was terminated by timer.

    """

    def _fit_predict(
        self, predictions: Sequence[LAMLDataset], pipes: Sequence[MLPipeline]
    ) -> Tuple[LAMLDataset, Sequence[MLPipeline]]:
        """Simple fit - just take one best.

        The winning pipeline is pruned in place so that it keeps only the
        winning model.

        Args:
            predictions: Sequence of datasets with predictions.
            pipes: Sequence of pipelines.

        Returns:
            Single prediction dataset and Sequence of pruned pipelines.

        """
        self._set_metadata(predictions, pipes)
        splitted_preds, model_idx, pipe_idx = self.split_models(predictions)

        # Defaults point at the first model of the first pipe. Keeping the
        # default prediction in sync with the default indices means that when
        # no score compares greater than -inf (e.g. every score is NaN) the
        # returned prediction still matches the returned pipeline.
        best_pred = splitted_preds[0] if splitted_preds else None
        best_pipe_idx = 0
        best_model_idx = 0
        best_score = -np.inf

        for pred, mod, pipe in zip(splitted_preds, model_idx, pipe_idx):
            score = self.score(pred)

            if score > best_score:
                best_pipe_idx = pipe
                best_model_idx = mod
                best_score = score
                best_pred = pred

        best_pipe = pipes[best_pipe_idx]
        best_pipe.ml_algos = [best_pipe.ml_algos[best_model_idx]]

        return best_pred, [best_pipe]

    def _predict(self, predictions: Sequence[LAMLDataset]) -> LAMLDataset:
        """Simple predict - pruned pipe is a single model.

        Args:
            predictions: Sequence of predictions from pruned dataset.

        Returns:
            Dataset with predictions.

        """
        return predictions[0]
class MeanBlender(Blender):
    """Simple average level predictions.

    Works only with TabularDatasets.
    Doesn't require target to fit.
    No pruning.

    """

    def _get_mean_pred(self, splitted_preds: Sequence[NumpyDataset]) -> NumpyDataset:
        """Average per-model predictions element-wise, ignoring NaN values.

        Args:
            splitted_preds: Sequence of single-model prediction datasets.

        Returns:
            Dataset holding the averaged predictions.

        """
        result = splitted_preds[0].empty()

        stacked = [ds.data for ds in splitted_preds]
        averaged = np.nanmean(stacked, axis=0)

        col_names = ["MeanBlend_{0}".format(col) for col in range(averaged.shape[1])]
        role = NumericRole(np.float32, prob=self._outp_prob)
        result.set_data(averaged, col_names, role)

        return result

    def _fit_predict(
        self, predictions: Sequence[NumpyDataset], pipes: Sequence[MLPipeline]
    ) -> Tuple[NumpyDataset, Sequence[MLPipeline]]:
        """Simple fit_predict - just average and no prune.

        Args:
            predictions: Sequence of predictions.
            pipes: Sequence of pipelines.

        Returns:
            Single prediction dataset and Sequence of pruned pipelines.

        """
        self._set_metadata(predictions, pipes)
        per_model = cast(List[NumpyDataset], self.split_models(predictions)[0])

        return self._get_mean_pred(per_model), pipes

    def _predict(self, predictions: Sequence[LAMLDataset]) -> LAMLDataset:
        """Simple predict - just average.

        Args:
            predictions: Dataset with predictions.

        Returns:
            Dataset with averaged predictions.

        """
        per_model = cast(List[NumpyDataset], self.split_models(predictions)[0])

        return self._get_mean_pred(per_model)
class WeightedBlender(Blender):
    """Weighted Blender based on coord descent, optimize task metric directly.

    Weight sum eq. 1.
    Good blender for tabular data,
    even if some predictions are NaN (ex. timeout).
    Model with low weights will be pruned.

    Args:
        max_iters: Max number of coord desc loops.
        max_inner_iters: Max number of iters to solve
            inner scalar optimization task.
        max_nonzero_coef: Minimum weight a model needs to stay in the
            ensemble. Weights below this threshold are set to zero and the
            remaining weights are renormalized.

    """

    def __init__(
        self,
        max_iters: int = 5,
        max_inner_iters: int = 7,
        max_nonzero_coef: float = 0.05,
    ):
        self.max_iters = max_iters
        self.max_inner_iters = max_inner_iters
        self.max_nonzero_coef = max_nonzero_coef
        # Weights of the models kept after fitting; replaced by ``_fit_predict``.
        self.wts = [1]

    def _get_weighted_pred(self, splitted_preds: Sequence[NumpyDataset], wts: Optional[np.ndarray]) -> NumpyDataset:
        """Build the weighted average of per-model predictions.

        Rows where a model has NaN predictions are excluded from that model's
        share, and the result is renormalized by the weight of the models
        that did predict the row. Rows with zero remaining weight become NaN.

        Args:
            splitted_preds: Sequence of single-model prediction datasets.
            wts: Model weights, or ``None`` for equal weights.

        Returns:
            Dataset with the blended predictions.

        """
        length = len(splitted_preds)
        if wts is None:
            wts = np.ones(length, dtype=np.float32) / length

        weighted_pred = np.nansum([x.data * w for (x, w) in zip(splitted_preds, wts)], axis=0).astype(np.float32)

        # Per-row sum of the weights of models whose prediction row has no NaN.
        not_nulls = np.sum(
            [np.logical_not(np.isnan(x.data).any(axis=1)) * w for (x, w) in zip(splitted_preds, wts)],
            axis=0,
        ).astype(np.float32)

        not_nulls = not_nulls[:, np.newaxis]

        weighted_pred /= not_nulls
        weighted_pred = np.where(not_nulls == 0, np.nan, weighted_pred)

        outp = splitted_preds[0].empty()
        outp.set_data(
            weighted_pred,
            ["WeightedBlend_{0}".format(x) for x in range(weighted_pred.shape[1])],
            NumericRole(np.float32, prob=self._outp_prob),
        )

        return outp

    def _get_candidate(self, wts: np.ndarray, idx: int, value: float):
        """Return a copy of ``wts`` with ``wts[idx]`` set to ``value`` and small weights pruned.

        The other weights are rescaled to sum to ``1 - value``. Then, going
        from the smallest weight upwards, every weight below
        ``max_nonzero_coef`` is zeroed and the vector is renormalized.

        Args:
            wts: Current weight vector.
            idx: Position of the weight being changed.
            value: New value for that weight.

        Returns:
            New weight vector.

        """
        candidate = wts.copy()
        others = np.arange(wts.shape[0]) != idx
        s = candidate[others].sum()
        candidate[others] = candidate[others] / s * (1 - value)
        candidate[idx] = value

        # this is the part for pipeline pruning
        # (a separate loop variable keeps the ``idx`` argument intact)
        for pos in candidate.argsort():
            if candidate[pos] < self.max_nonzero_coef:
                candidate[pos] = 0
                candidate /= candidate.sum()
            else:
                break

        return candidate

    def _get_scorer(self, splitted_preds: Sequence[NumpyDataset], idx: int, wts: np.ndarray) -> Callable:
        """Build the scalar objective for the weight at position ``idx``.

        Args:
            splitted_preds: Sequence of single-model prediction datasets.
            idx: Position of the weight being optimized.
            wts: Current weight vector.

        Returns:
            Function mapping a weight value to the negated blend score.

        """

        def scorer(x):
            candidate = self._get_candidate(wts, idx, x)

            pred = self._get_weighted_pred(splitted_preds, candidate)
            score = self.score(pred)

            # ``minimize_scalar`` minimizes, the metric is maximized.
            return -score

        return scorer

    def _optimize(self, splitted_preds: Sequence[NumpyDataset]) -> np.ndarray:
        """Find blend weights by coordinate descent starting from equal weights.

        Args:
            splitted_preds: Sequence of single-model prediction datasets.

        Returns:
            Weight vector with the best score found.

        """
        length = len(splitted_preds)
        candidate = np.ones(length, dtype=np.float32) / length

        best_pred = self._get_weighted_pred(splitted_preds, candidate)
        best_score = self.score(best_pred)
        logger.info("Blending: optimization starts with equal weights and score \x1b[1m{0}\x1b[0m".format(best_score))

        for it in range(self.max_iters):
            flg_no_upd = True
            # Reset per-iteration bests so the log never shows values left
            # over from a previous pass when the first coordinate is skipped.
            iter_best_score = None
            iter_best_weights = None

            for i in range(length):
                # A model that already holds all the weight has nothing to trade.
                if candidate[i] == 1:
                    continue

                obj = self._get_scorer(splitted_preds, i, candidate)
                opt_res = minimize_scalar(
                    obj,
                    method="Bounded",
                    bounds=(0, 1),
                    options={"disp": False, "maxiter": self.max_inner_iters},
                )
                w = opt_res.x
                score = -opt_res.fun
                pre_candidate = self._get_candidate(candidate, i, w)

                if iter_best_score is None or iter_best_score < score:
                    iter_best_score = score
                    iter_best_weights = pre_candidate

                # Accept the step only when it improves the best score so far.
                if score > best_score:
                    flg_no_upd = False
                    best_score = score
                    candidate = pre_candidate

            logger.info(
                "Blending: iteration \x1b[1m{0}\x1b[0m: score = \x1b[1m{1}\x1b[0m, weights = \x1b[1m{2}\x1b[0m".format(
                    it, iter_best_score, iter_best_weights
                )
            )

            if flg_no_upd:
                logger.info("Blending: no score update. Terminated\n")
                break

        return candidate

    @staticmethod
    def _prune_pipe(
        pipes: Sequence[MLPipeline], wts: np.ndarray, pipe_idx: np.ndarray
    ) -> Tuple[Sequence[MLPipeline], np.ndarray]:
        """Drop models with zero weight and pipelines left without models.

        Pipelines are modified in place.

        Args:
            pipes: Sequence of pipelines.
            wts: Weight per model.
            pipe_idx: Pipeline index per model.

        Returns:
            Remaining pipelines and their non-zero weights.

        """
        pipe_of_model = np.array(pipe_idx)
        new_pipes = []

        for i in range(max(pipe_idx) + 1):
            pipe = pipes[i]
            weights = wts[pipe_of_model == i]

            pipe.ml_algos = [x for (x, w) in zip(pipe.ml_algos, weights) if w > 0]

            new_pipes.append(pipe)

        new_pipes = [x for x in new_pipes if len(x.ml_algos) > 0]
        wts = wts[wts > 0]

        return new_pipes, wts

    def _fit_predict(
        self, predictions: Sequence[NumpyDataset], pipes: Sequence[MLPipeline]
    ) -> Tuple[NumpyDataset, Sequence[MLPipeline]]:
        """Perform coordinate descent.

        Args:
            predictions: Sequence of prediction datasets.
            pipes: Sequence of pipelines.

        Returns:
            Single prediction dataset and Sequence of pruned pipelines.

        """
        self._set_metadata(predictions, pipes)
        splitted_preds, _, pipe_idx = self.split_models(predictions)
        splitted_preds = cast(List[NumpyDataset], splitted_preds)

        wts = self._optimize(splitted_preds)
        # Keep only predictions of models that survived pruning, in the same
        # order as the weights returned by ``_prune_pipe``.
        splitted_preds = [x for (x, w) in zip(splitted_preds, wts) if w > 0]
        pipes, self.wts = self._prune_pipe(pipes, wts, pipe_idx)

        outp = self._get_weighted_pred(splitted_preds, self.wts)

        return outp, pipes

    def _predict(self, predictions: Sequence[LAMLDataset]) -> LAMLDataset:
        """Simple - weighted average.

        Args:
            predictions: Sequence of predictions.

        Returns:
            Dataset with weighted predictions.

        """
        splitted_preds = cast(List[NumpyDataset], self.split_models(predictions)[0])

        return self._get_weighted_pred(splitted_preds, self.wts)