# TODO: check that NLP does not crash; same for CV
"""Tabular presets."""
import logging
import os
from collections import Counter
from copy import copy
from copy import deepcopy
from typing import Iterable
from typing import Optional
from typing import Sequence
from typing import cast
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from joblib import Parallel
from joblib import delayed
from pandas import DataFrame
from tqdm import tqdm
from ...addons.utilization import TimeUtilization
from ...dataset.np_pd_dataset import NumpyDataset
from ...ml_algo.boost_cb import BoostCB
from ...ml_algo.boost_lgbm import BoostLGBM
from ...ml_algo.dl_model import TorchModel
from ...ml_algo.linear_sklearn import LinearLBFGS
from ...ml_algo.random_forest import RandomForestSklearn
from ...ml_algo.tuning.optuna import DLOptunaTuner
from ...ml_algo.tuning.optuna import OptunaTuner
from ...pipelines.features.lgb_pipeline import LGBAdvancedPipeline
from ...pipelines.features.lgb_pipeline import LGBSeqSimpleFeatures
from ...pipelines.features.lgb_pipeline import LGBSimpleFeatures
from ...pipelines.features.linear_pipeline import LinearFeatures
from ...pipelines.features.torch_pipeline import TorchSimpleFeatures
from ...pipelines.ml.nested_ml_pipe import NestedTabularMLPipeline
from ...pipelines.selection.base import ComposedSelector
from ...pipelines.selection.base import SelectionPipeline
from ...pipelines.selection.importance_based import ImportanceCutoffSelector
from ...pipelines.selection.importance_based import ModelBasedImportanceEstimator
from ...pipelines.selection.permutation_importance_based import (
NpIterativeFeatureSelector,
)
from ...pipelines.selection.permutation_importance_based import (
NpPermutationImportanceEstimator,
)
from ...reader.base import DictToPandasSeqReader
from ...reader.base import PandasToPandasReader
from ...reader.tabular_batch_generator import ReadableToDf
from ...reader.tabular_batch_generator import read_batch
from ...reader.tabular_batch_generator import read_data
from ...tasks import Task
from ..blend import MeanBlender
from ..blend import WeightedBlender
from .base import AutoMLPreset
from .base import upd_params
from .utils import calc_feats_permutation_imps
from .utils import change_datetime
from .utils import plot_pdp_with_distribution
# Directory containing this module; used below to locate the bundled
# ``tabular_configs/*.yml`` files for ``TabularUtilizedAutoML``.
_base_dir = os.path.dirname(__file__)
# Module-level logger. NOTE(review): the code below calls ``logger.info2``, which is
# not a stdlib ``Logger`` method - presumably registered elsewhere in the package.
logger = logging.getLogger(__name__)
class TabularAutoML(AutoMLPreset):
    """Classic preset - work with tabular data.

    Supported data roles - numbers, dates, categories.

    Limitations:

        - No memory management
        - No text support

    GPU support in catboost/lightgbm (if installed for GPU) training.

    Commonly _params kwargs (ex. timing_params) set via
    config file (config_path argument).
    If you need to change just few params, it's possible
    to pass it as dict of dicts, like json.
    To get available params please look on default config template.
    Also you can find there param description.
    To generate config template call
    :meth:`TabularAutoML.get_config('config_path.yml')`.

    Args:
        task: Task to solve.
        timeout: Timeout in seconds.
        memory_limit: Memory limit that are passed to each automl.
        cpu_limit: CPU limit that that are passed to each automl.
        gpu_ids: GPU IDs that are passed to each automl.
        debug: To catch running model exceptions or not.
        timing_params: Timing param dict. Optional.
        config_path: Path to config file.
        general_params: General param dict.
        reader_params: Reader param dict.
        read_csv_params: Params to pass ``pandas.read_csv``
            (case of train/predict from file).
        nested_cv_params: Param dict for nested cross-validation.
        tuning_params: Params of Optuna tuner.
        selection_params: Params of feature selection
            (only applied when ``is_time_series`` is ``False``).
        lgb_params: Params of lightgbm model.
        cb_params: Params of catboost model.
        rf_params: Params of Sklearn Random Forest model.
        linear_l2_params: Params of linear model.
        nn_params: Params of neural network model.
        gbm_pipeline_params: Params of feature generation
            for boosting models.
        linear_pipeline_params: Params of feature generation
            for linear models.
        nn_pipeline_params: Params of feature generation
            for neural network models.
        time_series_pipeline_params: Params handed to the sequence feature
            pipeline as ``transformers_params``
            (only applied when ``is_time_series`` is ``True``).
        is_time_series: Enable time-series mode: sequence reader and sequence
            feature pipeline are used and no feature pre-selection is built.

    """

    _default_config_path = "tabular_config.yml"

    # set initial runtime rate guess for first level models
    _time_scores = {
        "lgb": 1,
        "lgb_tuned": 3,
        "linear_l2": 0.7,
        "cb": 2,
        "cb_tuned": 6,
        "rf": 5,
        "rf_tuned": 10,
        "nn": 10,
        "nn_tuned": 20,
    }

    def __init__(
        self,
        task: Task,
        timeout: int = 3600,
        memory_limit: int = 16,
        cpu_limit: int = 4,
        gpu_ids: Optional[str] = "all",
        debug: bool = False,
        timing_params: Optional[dict] = None,
        config_path: Optional[str] = None,
        general_params: Optional[dict] = None,
        reader_params: Optional[dict] = None,
        read_csv_params: Optional[dict] = None,
        nested_cv_params: Optional[dict] = None,
        tuning_params: Optional[dict] = None,
        selection_params: Optional[dict] = None,
        lgb_params: Optional[dict] = None,
        cb_params: Optional[dict] = None,
        rf_params: Optional[dict] = None,
        linear_l2_params: Optional[dict] = None,
        nn_params: Optional[dict] = None,
        gbm_pipeline_params: Optional[dict] = None,
        linear_pipeline_params: Optional[dict] = None,
        nn_pipeline_params: Optional[dict] = None,
        time_series_pipeline_params: Optional[dict] = None,
        is_time_series: bool = False,
    ):
        super().__init__(task, timeout, memory_limit, cpu_limit, gpu_ids, debug, timing_params, config_path)

        self.is_time_series = is_time_series

        # Manual overrides, merged on top of the attributes of the same name.
        # NOTE(review): those attributes are expected to already exist on the instance
        # (presumably loaded from the config by the parent constructor) - a missing one
        # raises KeyError here.
        manual_params = {
            "general_params": general_params,
            "reader_params": reader_params,
            "read_csv_params": read_csv_params,
            "nested_cv_params": nested_cv_params,
            "tuning_params": tuning_params,
            "lgb_params": lgb_params,
            "cb_params": cb_params,
            "rf_params": rf_params,
            "linear_l2_params": linear_l2_params,
            "nn_params": nn_params,
            "gbm_pipeline_params": gbm_pipeline_params,
            "linear_pipeline_params": linear_pipeline_params,
            "nn_pipeline_params": nn_pipeline_params,
        }
        if self.is_time_series:
            # time-series mode --> update time_series_pipeline_params
            manual_params["time_series_pipeline_params"] = time_series_pipeline_params
        else:
            # not time-series mode --> update selection_params too
            manual_params["selection_params"] = selection_params

        for name, param in manual_params.items():
            self.__dict__[name] = upd_params(self.__dict__[name], {} if param is None else param)

    def infer_auto_params(self, train_data: DataFrame, multilevel_avail: bool = False):
        """Resolve "auto" settings and hardware-dependent params in place.

        Args:
            train_data: Training frame; only its number of rows is used.
            multilevel_avail: If ``True``, a second level may be added
                for the ``multiclass`` task.

        """
        length = train_data.shape[0]

        # infer optuna tuning iteration based on dataframe len
        if self.tuning_params["max_tuning_iter"] == "auto":
            if length < 10000:
                self.tuning_params["max_tuning_iter"] = 100
            elif length < 30000:
                self.tuning_params["max_tuning_iter"] = 50
            elif length < 100000:
                self.tuning_params["max_tuning_iter"] = 10
            else:
                self.tuning_params["max_tuning_iter"] = 5

        if self.general_params["use_algos"] == "auto":
            # TODO: More rules and add cases
            self.general_params["use_algos"] = [["lgb", "lgb_tuned", "linear_l2", "cb", "cb_tuned"]]
            if self.task.name == "multi:reg" and self.is_time_series:
                self.general_params["use_algos"] = [["cb", "linear_l2", "rf"]]
            else:
                if self.task.name == "multiclass" and multilevel_avail:
                    self.general_params["use_algos"].append(["linear_l2", "lgb"])
                if (self.task.name == "multi:reg") or (self.task.name == "multilabel"):
                    self.general_params["use_algos"] = [["linear_l2", "cb", "rf", "rf_tuned", "cb_tuned"]]

        if not self.general_params["nested_cv"]:
            self.nested_cv_params["cv"] = 1

        # check gpu to use catboost
        gpu_cnt = torch.cuda.device_count()
        gpu_ids = self.gpu_ids
        if gpu_cnt > 0 and gpu_ids:
            if gpu_ids == "all":
                gpu_ids = ",".join(map(str, range(gpu_cnt)))
            self.nn_params["device"] = gpu_ids.split(",")
            self.cb_params["default_params"]["task_type"] = "GPU"
            self.cb_params["default_params"]["devices"] = gpu_ids.replace(",", ":")
        else:
            self.nn_params["device"] = "cpu"

        # check all n_jobs params
        # os.cpu_count() may return None when the count is undetermined - fall back to 1
        cpu_cnt = min(os.cpu_count() or 1, self.cpu_limit)
        torch.set_num_threads(cpu_cnt)

        self.cb_params["default_params"]["thread_count"] = min(
            self.cb_params["default_params"]["thread_count"], cpu_cnt
        )
        self.lgb_params["default_params"]["num_threads"] = min(
            self.lgb_params["default_params"]["num_threads"], cpu_cnt
        )
        self.reader_params["n_jobs"] = min(self.reader_params["n_jobs"], cpu_cnt)

    def get_feature_pipeline(self, model, **kwargs):
        """Get the features pipeline for the given model family.

        In time-series mode ``LGBSeqSimpleFeatures`` is returned for every
        supported model; otherwise the pipeline depends on ``model``.

        Args:
            model: one from ["gbm", "linear_l2", "rf", "nn"].
            kwargs: Arbitrary keyword arguments (forwarded to the "gbm"/"rf" pipelines).

        Returns:
            appropriate features pipeline (``None`` for an unknown ``model``).

        """
        if self.is_time_series and model in ["gbm", "linear_l2", "rf", "nn"]:
            return LGBSeqSimpleFeatures(fill_na=True, scaler=True, transformers_params=self.time_series_pipeline_params)

        if model == "nn":
            return TorchSimpleFeatures(**self.nn_pipeline_params)
        if model == "linear_l2":
            return LinearFeatures(output_categories=True, **self.linear_pipeline_params)
        if model == "gbm":
            return LGBAdvancedPipeline(**self.gbm_pipeline_params, **kwargs)
        if model == "rf":
            # ``fill_na`` defaults to True for random forest unless the caller set it explicitly
            if "fill_na" in kwargs:
                return LGBAdvancedPipeline(**self.gbm_pipeline_params, **kwargs)
            return LGBAdvancedPipeline(**self.gbm_pipeline_params, fill_na=True, **kwargs)

    def get_time_score(self, n_level: int, model_type: str, nested: Optional[bool] = None):
        """Estimate the relative runtime score of a model.

        Args:
            n_level: Level number of the model (1-based).
            model_type: Key of ``_time_scores``.
            nested: Whether nested CV is used; taken from
                ``general_params["nested_cv"]`` if ``None``.

        Returns:
            Runtime score.

        """
        if nested is None:
            nested = self.general_params["nested_cv"]

        score = self._time_scores[model_type]

        mult = 1
        if nested:
            if self.nested_cv_params["n_folds"] is not None:
                mult = self.nested_cv_params["n_folds"]
            else:
                mult = self.nested_cv_params["cv"]

        if n_level > 1:
            mult *= 0.8 if self.general_params["skip_conn"] else 0.1

        score = score * mult

        # lower score for catboost on gpu
        if model_type in ["cb", "cb_tuned"] and self.cb_params["default_params"]["task_type"] == "GPU":
            score *= 0.5
        return score

    def get_selector(self, n_level: Optional[int] = 1) -> SelectionPipeline:
        """Build the feature pre-selection pipeline from ``selection_params``.

        Args:
            n_level: Level number used for the time score estimation.

        Returns:
            ``None`` for mode 0, an importance cutoff selector for mode > 0,
            composed with an iterative selector for mode 2.

        """
        selection_params = self.selection_params

        # selector models look at all features on every split
        lgb_params = deepcopy(self.lgb_params)
        lgb_params["default_params"] = {
            **lgb_params["default_params"],
            **{"feature_fraction": 1},
        }

        cb_params = deepcopy(self.cb_params)
        cb_params["default_params"] = {
            **cb_params["default_params"],
            **{"rsm": 1},
        }

        mode = selection_params["mode"]
        use_catboost = (self.task.name == "multi:reg") or (self.task.name == "multilabel")

        # create pre selection based on mode
        pre_selector = None
        if mode > 0:
            # if we need selector - define model
            # timer will be useful to estimate time for next gbm runs
            selection_feats = LGBSimpleFeatures()

            if use_catboost:
                time_score = self.get_time_score(n_level, "cb", False)
                sel_timer_0 = self.timer.get_task_timer("cb", time_score)
                selection_gbm = BoostCB(timer=sel_timer_0, **cb_params)
                model_name = "cb"
            else:
                time_score = self.get_time_score(n_level, "lgb", False)
                sel_timer_0 = self.timer.get_task_timer("lgb", time_score)
                selection_gbm = BoostLGBM(timer=sel_timer_0, **lgb_params)
                model_name = "lgb"
            selection_gbm.set_prefix("Selector")

            # NOTE(review): this second timer is requested but never attached to a model;
            # kept as in the original in case get_task_timer has side effects on self.timer.
            time_score = self.get_time_score(n_level, model_name, False)
            sel_timer_0 = self.timer.get_task_timer(model_name, time_score)

            if selection_params["importance_type"] == "permutation":
                importance = NpPermutationImportanceEstimator()
            else:
                importance = ModelBasedImportanceEstimator()

            pre_selector = ImportanceCutoffSelector(
                selection_feats,
                selection_gbm,
                importance,
                cutoff=selection_params["cutoff"],
                fit_on_holdout=selection_params["fit_on_holdout"],
            )

            if mode == 2:
                time_score = self.get_time_score(n_level, model_name, False)
                sel_timer_1 = self.timer.get_task_timer(model_name, time_score)
                selection_feats = LGBSimpleFeatures()

                if use_catboost:
                    selection_gbm = BoostCB(timer=sel_timer_1, **cb_params)
                else:
                    selection_gbm = BoostLGBM(timer=sel_timer_1, **lgb_params)
                selection_gbm.set_prefix("Selector")

                # TODO: Check about reusing permutation importance
                importance = NpPermutationImportanceEstimator()

                extra_selector = NpIterativeFeatureSelector(
                    selection_feats,
                    selection_gbm,
                    importance,
                    feature_group_size=selection_params["feature_group_size"],
                    max_features_cnt_in_result=selection_params["max_features_cnt_in_result"],
                )

                pre_selector = ComposedSelector([pre_selector, extra_selector])

        return pre_selector

    def get_nn(
        self, keys: Sequence[str], n_level: int = 1, pre_selector: Optional[SelectionPipeline] = None
    ) -> NestedTabularMLPipeline:
        """Build the ML pipeline with neural network models.

        Args:
            keys: Model keys: string names (optionally with the ``_tuned`` suffix)
                or model classes.
            n_level: Level number used for the time score estimation.
            pre_selector: Accepted for signature parity with the other builders;
                not used - the pipeline is always created with ``pre_selection=None``.

        Returns:
            Pipeline with one ``TorchModel`` per key.

        """
        ml_algos = []
        force_calc = []
        nn_feats = self.get_feature_pipeline(model="nn")

        # params shared by all models: per-model sections ("0", "1", ...) are stripped
        general_nn_params = deepcopy(self.nn_params)
        if "0" in self.nn_params:
            for i in range(len(keys)):
                general_nn_params.pop(str(i), None)

        for i, key in enumerate(keys):
            time_score = self.get_time_score(n_level, "nn")
            nn_timer = self.timer.get_task_timer("reg_nn", time_score)

            model_params = deepcopy(general_nn_params)
            model_params.update(self.nn_params.get(str(i), general_nn_params))

            model_name = key
            if isinstance(key, str):
                tuned = "_tuned" in key
                # "nn" / "nn_tuned" are aliases of the mlp model
                if key[:2] == "nn":
                    model_name = "mlp"
                _name = "TorchNN_" + model_name + "_" + str(i)
                model_name = model_name.replace("_tuned", "")
            else:
                tuned = model_params.get("tuned")
                _name = "TorchNN_" + str(i)

            model_params["model"] = model_name
            nn_model = TorchModel(
                timer=nn_timer,
                default_params=model_params,
                freeze_defaults=model_params["freeze_defaults"],
                optimization_search_space=model_params.get("optimization_search_space", None),
            )
            nn_model._name = _name

            if tuned:
                nn_model.set_prefix("Tuned")
                nn_tuner = DLOptunaTuner(
                    n_trials=model_params["tuning_params"]["max_tuning_iter"],
                    timeout=model_params["tuning_params"]["max_tuning_time"],
                    fit_on_holdout=model_params["tuning_params"]["fit_on_holdout"],
                )
                nn_model = (nn_model, nn_tuner)
            ml_algos.append(nn_model)
            # only the first model of the pipeline is forced
            force_calc.append(len(ml_algos) == 1)

        nn_pipe = NestedTabularMLPipeline(
            ml_algos, force_calc, pre_selection=None, features_pipeline=nn_feats, **self.nested_cv_params
        )
        return nn_pipe

    def get_linear(self, n_level: int = 1, pre_selector: Optional[SelectionPipeline] = None) -> NestedTabularMLPipeline:
        """Build the ML pipeline with the L2-regularized linear model.

        Args:
            n_level: Level number used for the time score estimation.
            pre_selector: Optional feature pre-selection.

        Returns:
            Pipeline with a single, always calculated, linear model.

        """
        # linear model with l2
        time_score = self.get_time_score(n_level, "linear_l2")
        linear_l2_timer = self.timer.get_task_timer("reg_l2", time_score)
        linear_l2_model = LinearLBFGS(timer=linear_l2_timer, **self.linear_l2_params)
        linear_l2_feats = self.get_feature_pipeline(model="linear_l2")

        linear_l2_pipe = NestedTabularMLPipeline(
            [linear_l2_model],
            force_calc=True,
            pre_selection=pre_selector,
            features_pipeline=linear_l2_feats,
            **self.nested_cv_params
        )
        return linear_l2_pipe

    def get_gbms(
        self,
        keys: Sequence[str],
        n_level: int = 1,
        pre_selector: Optional[SelectionPipeline] = None,
    ):
        """Build the ML pipeline with gradient boosting models.

        Args:
            keys: Up to four of ``"lgb"``, ``"lgb_tuned"``, ``"cb"``, ``"cb_tuned"``.
            n_level: Level number used for the time score estimation.
            pre_selector: Optional feature pre-selection, also passed to the
                features pipeline as ``feats_imp``.

        Returns:
            Pipeline with the requested boosting models.

        Raises:
            ValueError: If a key is neither an ``lgb`` nor a ``cb`` one.

        """
        gbm_feats = self.get_feature_pipeline(model="gbm", feats_imp=pre_selector)

        ml_algos = []
        force_calc = []
        # only the first model is forced; zip also caps the number of models at four
        for key, force in zip(keys, [True, False, False, False]):
            tuned = "_tuned" in key
            algo_key = key.split("_")[0]
            time_score = self.get_time_score(n_level, key)
            gbm_timer = self.timer.get_task_timer(algo_key, time_score)
            if algo_key == "lgb":
                gbm_model = BoostLGBM(timer=gbm_timer, **self.lgb_params)
            elif algo_key == "cb":
                gbm_model = BoostCB(timer=gbm_timer, **self.cb_params)
            else:
                raise ValueError("Wrong algo key")

            if tuned:
                gbm_model.set_prefix("Tuned")
                gbm_tuner = OptunaTuner(
                    n_trials=self.tuning_params["max_tuning_iter"],
                    timeout=self.tuning_params["max_tuning_time"],
                    fit_on_holdout=self.tuning_params["fit_on_holdout"],
                )
                gbm_model = (gbm_model, gbm_tuner)
            ml_algos.append(gbm_model)
            force_calc.append(force)

        gbm_pipe = NestedTabularMLPipeline(
            ml_algos, force_calc, pre_selection=pre_selector, features_pipeline=gbm_feats, **self.nested_cv_params
        )
        return gbm_pipe

    def get_rfs(self, keys: Sequence[str], n_level: int = 1, pre_selector: Optional[SelectionPipeline] = None):
        """Build the ML pipeline with random forest models.

        Args:
            keys: Up to two of ``"rf"``, ``"rf_tuned"``.
            n_level: Level number used for the time score estimation.
            pre_selector: Optional feature pre-selection, also passed to the
                features pipeline as ``feats_imp``.

        Returns:
            Pipeline with the requested random forest models.

        """
        rf_feats = self.get_feature_pipeline(model="rf", feats_imp=pre_selector, fill_na=True)

        ml_algos = []
        force_calc = []
        # only the first model is forced; zip also caps the number of models at two
        for key, force in zip(keys, [True, False]):
            tuned = "_tuned" in key
            algo_key = key.split("_")[0]
            time_score = self.get_time_score(n_level, key)
            rf_timer = self.timer.get_task_timer(algo_key, time_score)
            rf_model = RandomForestSklearn(timer=rf_timer, **self.rf_params)

            if tuned:
                rf_model.set_prefix("Tuned")
                rf_tuner = OptunaTuner(
                    n_trials=self.tuning_params["max_tuning_iter"],
                    timeout=self.tuning_params["max_tuning_time"],
                    fit_on_holdout=self.tuning_params["fit_on_holdout"],
                )
                rf_model = (rf_model, rf_tuner)
            ml_algos.append(rf_model)
            force_calc.append(force)

        rf_pipe = NestedTabularMLPipeline(
            ml_algos, force_calc, pre_selection=pre_selector, features_pipeline=rf_feats, **self.nested_cv_params
        )
        return rf_pipe

    def create_automl(self, **fit_args):
        """Create basic automl instance.

        Args:
            **fit_args: Contain all information needed for creating automl.

        """
        train_data = fit_args["train_data"]
        multilevel_avail = fit_args["valid_data"] is None and fit_args["cv_iter"] is None

        if self.is_time_series:
            self.infer_auto_params(train_data["seq"]["seq0"], multilevel_avail)
            reader = DictToPandasSeqReader(task=self.task, **self.reader_params)
            pre_selector = None
        else:
            self.infer_auto_params(train_data, multilevel_avail)
            reader = PandasToPandasReader(task=self.task, **self.reader_params)
            pre_selector = self.get_selector()

        def choose_selector(algo_key, n):
            # The pre-selector is attached if the algo is listed in ``select_algos`` and the
            # level sees raw features (first level or skip connections); in time-series mode
            # ``pre_selector`` is None anyway.
            if self.is_time_series or (
                algo_key in self.selection_params["select_algos"] and (self.general_params["skip_conn"] or n == 0)
            ):
                return pre_selector
            return None

        available_nn_models = [
            "nn",
            "mlp",
            "dense",
            "denselight",
            "resnet",
            "snn",
            "linear_layer",
            "_linear_layer",
            "node",
        ]
        available_nn_models = available_nn_models + [x + "_tuned" for x in available_nn_models]

        levels = []
        for n, names in enumerate(self.general_params["use_algos"]):
            lvl = []

            # regs
            rf_models = [x for x in ["rf", "rf_tuned"] if x in names]
            if len(rf_models) > 0:
                lvl.append(self.get_rfs(rf_models, n + 1, choose_selector("rf", n)))

            if "linear_l2" in names:
                lvl.append(self.get_linear(n + 1, choose_selector("linear_l2", n)))

            gbm_models = [
                x for x in ["lgb", "lgb_tuned", "cb", "cb_tuned"] if x in names and x.split("_")[0] in self.task.losses
            ]
            if len(gbm_models) > 0:
                lvl.append(self.get_gbms(gbm_models, n + 1, choose_selector("gbm", n)))

            nn_models = [
                x for x in names if x in available_nn_models or (isinstance(x, type) and issubclass(x, nn.Module))
            ]
            if len(nn_models) > 0:
                lvl.append(self.get_nn(nn_models, n + 1, None))

            if len(lvl) != 0:
                levels.append(lvl)

        # blend everything
        blender = WeightedBlender(max_nonzero_coef=self.general_params["weighted_blender_max_nonzero_coef"])

        # initialize
        self._initialize(
            reader,
            levels,
            skip_conn=self.general_params["skip_conn"],
            blender=blender,
            return_all_predictions=self.general_params["return_all_predictions"],
            timer=self.timer,
            debug=self.debug,
        )

    def _get_read_csv_params(self):
        """Extend ``read_csv_params`` with ``usecols`` and numeric ``dtype`` taken from the reader.

        Returns:
            New dict; ``usecols`` is ``None`` when the reader is missing or not fitted.

        """
        try:
            cols_to_read = self.reader.used_features
            numeric_dtypes = {
                x: self.reader.roles[x].dtype for x in self.reader.roles if self.reader.roles[x].name == "Numeric"
            }
        except AttributeError:
            cols_to_read = []
            numeric_dtypes = {}

        # cols_to_read is empty if reader is not fitted
        if len(cols_to_read) == 0:
            cols_to_read = None

        read_csv_params = copy(self.read_csv_params)
        read_csv_params = {
            **read_csv_params,
            **{"usecols": cols_to_read, "dtype": numeric_dtypes},
        }
        return read_csv_params

    def fit_predict(
        self,
        train_data: ReadableToDf,
        roles: Optional[dict] = None,
        train_features: Optional[Sequence[str]] = None,
        cv_iter: Optional[Iterable] = None,
        valid_data: Optional[ReadableToDf] = None,
        valid_features: Optional[Sequence[str]] = None,
        log_file: Optional[str] = None,
        verbose: int = 0,
    ) -> NumpyDataset:
        """Fit and get prediction on validation dataset.

        Almost same as :meth:`lightautoml.automl.base.AutoML.fit_predict`.

        Additional features - working with different data formats.
        Supported now:

            - Path to ``.csv``, ``.parquet``, ``.feather`` files.
            - :class:`~numpy.ndarray`, or dict of :class:`~numpy.ndarray`.
              For example, ``{'data': X...}``. In this case,
              roles are optional, but `train_features`
              and `valid_features` required.
            - :class:`pandas.DataFrame`.

        Args:
            train_data: Dataset to train.
            roles: Roles dict.
            train_features: Optional features names, if can't be inferred from `train_data`.
            cv_iter: Custom cv-iterator. For example, :class:`~lightautoml.validation.np_iterators.TimeSeriesIterator`.
            valid_data: Optional validation dataset.
            valid_features: Optional validation dataset features if cannot be inferred from `valid_data`.
            verbose: Controls the verbosity: the higher, the more messages.
                <1  : messages are not displayed;
                >=1 : the computation process for layers is displayed;
                >=2 : the information about folds processing is also displayed;
                >=3 : the hyperparameters optimization process is also displayed;
                >=4 : the training process for every algorithm is displayed;
            log_file: Filename for writing logging messages. If log_file is specified,
                the messages will be saved in a the file. If the file exists, it will be overwritten.

        Returns:
            Dataset with predictions. Call ``.data`` to get predictions array.

        """
        # roles may be none in case of train data is set {'data': np.ndarray, 'target': np.ndarray ...}
        self.set_logfile(log_file)

        if roles is None:
            roles = {}
        read_csv_params = self._get_read_csv_params()

        if self.is_time_series:
            train_data = train_data["seq"]["seq0"]

        train, upd_roles = read_data(train_data, train_features, self.cpu_limit, read_csv_params)
        if upd_roles:
            roles = {**roles, **upd_roles}

        if valid_data is not None:
            # keep the loaded frame: previously the result was bound to an unused name, so
            # a path / ndarray-dict ``valid_data`` reached the base class unread
            valid_data, _ = read_data(valid_data, valid_features, self.cpu_limit, self.read_csv_params)

        if self.is_time_series:
            train = {"seq": {"seq0": train}}

        oof_pred = super().fit_predict(train, roles=roles, cv_iter=cv_iter, valid_data=valid_data, verbose=verbose)

        return cast(NumpyDataset, oof_pred)

    def predict(
        self,
        data: ReadableToDf,
        features_names: Optional[Sequence[str]] = None,
        batch_size: Optional[int] = None,
        n_jobs: Optional[int] = 1,
        return_all_predictions: Optional[bool] = None,
    ) -> NumpyDataset:
        """Get dataset with predictions.

        Almost same as :meth:`lightautoml.automl.base.AutoML.predict`
        on new dataset, with additional features.

        Additional features - working with different data formats.
        Supported now:

            - Path to ``.csv``, ``.parquet``, ``.feather`` files.
            - :class:`~numpy.ndarray`, or dict of :class:`~numpy.ndarray`. For example,
              ``{'data': X...}``. In this case roles are optional,
              but `train_features` and `valid_features` required.
            - :class:`pandas.DataFrame`.

        Parallel inference - you can pass ``n_jobs`` to speedup
        prediction (requires more RAM).
        Batch_inference - you can pass ``batch_size``
        to decrease RAM usage (may be longer).

        Args:
            data: Dataset to perform inference.
            features_names: Optional features names,
                if cannot be inferred from `train_data`.
            batch_size: Batch size or ``None``.
            n_jobs: Number of jobs.
            return_all_predictions: if True,
                returns all model predictions from last level

        Returns:
            Dataset with predictions.

        """
        read_csv_params = self._get_read_csv_params()

        # simple case: everything is read and predicted at once
        if batch_size is None and n_jobs == 1:
            if self.is_time_series:
                data = data["seq"]["seq0"]
            data, _ = read_data(data, features_names, self.cpu_limit, read_csv_params)
            if self.is_time_series:
                data = {"seq": {"seq0": data}}
            pred = super().predict(data, features_names, return_all_predictions)
            return cast(NumpyDataset, pred)

        data_generator = read_batch(
            data,
            features_names,
            n_jobs=n_jobs,
            batch_size=batch_size,
            read_csv_params=read_csv_params,
        )

        if n_jobs == 1:
            res = [self.predict(df, features_names, return_all_predictions) for df in data_generator]
        else:
            # TODO: Check here for pre_dispatch param
            with Parallel(n_jobs, pre_dispatch=len(data_generator) + 1) as p:
                res = p(delayed(self.predict)(df, features_names, return_all_predictions) for df in data_generator)

        # stack batch predictions, metadata is taken from the first batch
        res = NumpyDataset(
            np.concatenate([x.data for x in res], axis=0),
            features=res[0].features,
            roles=res[0].roles,
        )

        return res

    def get_feature_scores(
        self,
        calc_method: str = "fast",
        data: Optional[ReadableToDf] = None,
        features_names: Optional[Sequence[str]] = None,
        silent: bool = True,
    ):
        """Get feature importances.

        Args:
            calc_method: ``"fast"`` - scores of the first pipeline pre-selection
                that has them, restricted to used features;
                ``"accurate"`` - permutation importances calculated on ``data``.
            data: Dataset for the ``"accurate"`` method.
            features_names: Optional features names, if cannot be inferred from ``data``.
            silent: If ``False``, the reason of an empty result is logged.

        Returns:
            Feature importances, or ``None`` if they can not be calculated.

        """
        if calc_method == "fast":
            for level in self.levels:
                for pipe in level:
                    fi = pipe.pre_selection.get_features_score()
                    if fi is not None:
                        used_feats = set(self.collect_used_feats())
                        fi = fi.reset_index()
                        fi.columns = ["Feature", "Importance"]
                        return fi[fi["Feature"].map(lambda x: x in used_feats)]
            else:
                # no pipeline on any level provided scores
                if not silent:
                    logger.info2("No feature importances to show. Please use another calculation method")
                return None

        if calc_method != "accurate":
            if not silent:
                logger.info2(
                    "Unknown calc_method. "
                    + "Currently supported methods for feature importances calculation are 'fast' and 'accurate'."
                )
            return None

        if data is None:
            if not silent:
                logger.info2("Data parameter is not setup for accurate calculation method. Aborting...")
            return None

        read_csv_params = self._get_read_csv_params()
        data, _ = read_data(data, features_names, self.cpu_limit, read_csv_params)
        used_feats = self.collect_used_feats()
        fi = calc_feats_permutation_imps(
            self,
            used_feats,
            data,
            self.reader.target,
            self.task.get_dataset_metric(),
            silent=silent,
        )
        return fi

    def get_individual_pdp(
        self,
        test_data: ReadableToDf,
        feature_name: str,
        n_bins: Optional[int] = 30,
        top_n_categories: Optional[int] = 10,
        datetime_level: Optional[str] = "year",
    ):
        """Calculate predictions on a grid of values of one feature.

        For every grid value the feature is overwritten in a copy of
        ``test_data`` and the model predictions are collected.

        Args:
            test_data: Dataset (must support ``.copy()`` and column access).
            feature_name: Feature present in the reader roles.
            n_bins: Number of histogram bins for a numeric feature.
            top_n_categories: Number of most frequent categories used as the grid;
                the remaining ones are merged into ``"<OTHER>"``.
            datetime_level: One of ``"year"``, ``"month"``, ``"dayofweek"``.

        Returns:
            Tuple ``(grid, ys, counts)``: grid values, predictions for each
            grid value and number of rows behind each grid value.

        Raises:
            ValueError: If the feature role is not Numeric, Category or Datetime.

        """
        assert feature_name in self.reader._roles
        assert datetime_level in ["year", "month", "dayofweek"]
        test_i = test_data.copy()
        role_name = self.reader._roles[feature_name].name

        if role_name == "Numeric":
            # grid is made of histogram bin centers
            counts, bin_edges = np.histogram(test_data[feature_name].dropna(), bins=n_bins)
            grid = (bin_edges[:-1] + bin_edges[1:]) / 2
            ys = []
            for i in tqdm(grid):
                test_i[feature_name] = i
                preds = self.predict(test_i).data
                ys.append(preds)
        elif role_name == "Category":
            feature_cnt = test_data[feature_name].value_counts()
            grid = list(feature_cnt.index.values[:top_n_categories])
            counts = list(feature_cnt.values[:top_n_categories])
            ys = []
            for i in tqdm(grid):
                test_i[feature_name] = i
                preds = self.predict(test_i).data
                ys.append(preds)
            if len(feature_cnt) > top_n_categories:
                # frequency rank of every category, 0 is the most frequent one
                freq_mapping = {cat: rank for rank, cat in enumerate(feature_cnt.index)}
                # Rows of the non-top categories: ranks 0..top_n_categories-1 are on the grid,
                # so "other" starts at rank == top_n_categories (consistent with counts below).
                # Values missing from value_counts (NaN) get rank -1 and are never "other".
                other_mask = (
                    np.array([freq_mapping.get(k, -1) for k in test_i[feature_name]]) >= top_n_categories
                )
                if other_mask.any():
                    # add "OTHER" class
                    test_i = test_data.copy()
                    # sample from other classes with the same distribution
                    test_i[feature_name] = (
                        test_i[feature_name][other_mask].sample(n=test_data.shape[0], replace=True).values
                    )
                    preds = self.predict(test_i).data
                    grid.append("<OTHER>")
                    ys.append(preds)
                    counts.append(feature_cnt.values[top_n_categories:].sum())
        elif role_name == "Datetime":
            test_data_read = self.reader.read(test_data)
            feature_datetime = pd.arrays.DatetimeArray(test_data_read._data[feature_name])
            if datetime_level == "year":
                grid = np.unique([i.year for i in feature_datetime])
            elif datetime_level == "month":
                grid = np.arange(1, 13)
            else:
                grid = np.arange(7)
            ys = []
            for i in tqdm(grid):
                test_i[feature_name] = change_datetime(feature_datetime, datetime_level, i)
                preds = self.predict(test_i).data
                ys.append(preds)
            counts = Counter([getattr(i, datetime_level) for i in feature_datetime])
            counts = [counts[i] for i in grid]
        else:
            raise ValueError(
                "Partial dependence is supported for Numeric, Category and Datetime features only, "
                "got role '{}' for feature '{}'".format(role_name, feature_name)
            )

        return grid, ys, counts

    def plot_pdp(
        self,
        test_data: ReadableToDf,
        feature_name: str,
        individual: Optional[bool] = False,
        n_bins: Optional[int] = 30,
        top_n_categories: Optional[int] = 10,
        top_n_classes: Optional[int] = 10,
        datetime_level: Optional[str] = "year",
    ):
        """Plot partial dependence for one feature together with its distribution.

        Args:
            test_data: Dataset used for the calculation.
            feature_name: Feature present in the reader roles.
            individual: Passed to the plotting helper.
            n_bins: Number of histogram bins for a numeric feature.
            top_n_categories: Number of most frequent categories to show.
            top_n_classes: Passed to the plotting helper.
            datetime_level: One of ``"year"``, ``"month"``, ``"dayofweek"``.

        """
        grid, ys, counts = self.get_individual_pdp(
            test_data=test_data,
            feature_name=feature_name,
            n_bins=n_bins,
            top_n_categories=top_n_categories,
            datetime_level=datetime_level,
        )
        plot_pdp_with_distribution(
            test_data,
            grid,
            ys,
            counts,
            self.reader,
            feature_name,
            individual,
            top_n_classes,
            datetime_level,
        )
class TabularUtilizedAutoML(TimeUtilization):
    """Template to make TimeUtilization from TabularAutoML.

    Simplifies using ``TimeUtilization`` module for ``TabularAutoMLPreset``.

    Args:
        task: Task to solve.
        timeout: Timeout in seconds.
        memory_limit: Memory limit that are passed to each automl.
        cpu_limit: CPU limit that that are passed to each automl.
        gpu_ids: GPU IDs that are passed to each automl.
        timing_params: Timing params level that are passed to each automl.
        configs_list: List of str path to configs files.
            If ``None``, the configs bundled in ``tabular_configs`` are used.
        drop_last: Usually last automl will be stopped with timeout.
            Flag that defines if we should drop it from ensemble.
        return_all_predictions: skip blending phase
        max_runs_per_config: Maximum number of multistart loops.
        random_state: Initial random seed that will be set
            in case of search in config.
        outer_blender_max_nonzero_coef: ``max_nonzero_coef`` of the
            ``WeightedBlender`` used as the outer blender.
        **kwargs: Additional params forwarded to ``TimeUtilization``.

    """

    def __init__(
        self,
        task: Task,
        timeout: int = 3600,
        memory_limit: int = 16,
        cpu_limit: int = 4,
        gpu_ids: Optional[str] = None,
        timing_params: Optional[dict] = None,
        configs_list: Optional[Sequence[str]] = None,
        drop_last: bool = True,
        return_all_predictions: bool = False,
        max_runs_per_config: int = 5,
        random_state: int = 42,
        outer_blender_max_nonzero_coef: float = 0.05,
        **kwargs
    ):
        if configs_list is None:
            configs_list = [
                os.path.join(_base_dir, "tabular_configs", x)
                for x in [
                    "conf_0_sel_type_0.yml",
                    "conf_1_sel_type_1.yml",
                    "conf_2_select_mode_1_no_typ.yml",
                    "conf_3_sel_type_1_no_inter_lgbm.yml",
                    "conf_4_sel_type_0_no_int.yml",
                    "conf_5_sel_type_1_tuning_full.yml",
                    "conf_6_sel_type_1_tuning_full_no_int_lgbm.yml",
                ]
            ]
        inner_blend = MeanBlender()
        outer_blend = WeightedBlender(max_nonzero_coef=outer_blender_max_nonzero_coef)
        # NOTE(review): arguments are positional, so their order must match the
        # TimeUtilization signature; the meaning of the ``None`` placed before
        # ``random_state`` is not visible from this file.
        super().__init__(
            TabularAutoML,
            task,
            timeout,
            memory_limit,
            cpu_limit,
            gpu_ids,
            timing_params,
            configs_list,
            inner_blend,
            outer_blend,
            drop_last,
            return_all_predictions,
            max_runs_per_config,
            None,
            random_state,
            **kwargs
        )

    def get_feature_scores(
        self,
        calc_method: str = "fast",
        data: Optional[ReadableToDf] = None,
        features_names: Optional[Sequence[str]] = None,
        silent: bool = True,
    ):
        """Get feature importances.

        Args:
            calc_method: ``"fast"`` - mean of the "fast" importances over the first
                automl of every model of every outer pipe;
                ``"accurate"`` - permutation importances calculated on ``data``.
            data: Dataset for the ``"accurate"`` method.
            features_names: Optional features names, if cannot be inferred from ``data``.
            silent: If ``False``, the reason of an empty result is logged.

        Returns:
            Feature importances, or ``None`` if they can not be calculated.

        """
        if calc_method == "fast":
            feat_imps = []
            for pipe in self.outer_pipes:
                for model in pipe.ml_algos:
                    fi = model.models[0][0].get_feature_scores("fast")
                    if fi is not None:
                        feat_imps.append(fi)
            n_feat_imps = len(feat_imps)
            if n_feat_imps == 0:
                if not silent:
                    logger.info2("No feature importances to show. Please use another calculation method")
                return None
            # sum per feature divided by the number of score tables; a feature missing
            # in some table contributes zero there
            return (
                pd.concat(feat_imps).groupby("Feature")["Importance"].sum().sort_values(ascending=False)
                / n_feat_imps
            ).reset_index()

        if calc_method != "accurate":
            if not silent:
                logger.info2(
                    "Unknown calc_method. "
                    + "Currently supported methods for feature importances calculation are 'fast' and 'accurate'."
                )
            return None

        if data is None:
            if not silent:
                logger.info2("Data parameter is not setup for accurate calculation method. Aborting...")
            return None

        # reader / task / csv params are taken from the very first fitted automl
        automl = self.outer_pipes[0].ml_algos[0].models[0][0]
        read_csv_params = automl._get_read_csv_params()
        data, _ = read_data(data, features_names, self.cpu_limit, read_csv_params)

        used_feats = set()
        for pipe in self.outer_pipes:
            used_feats.update(pipe.ml_algos[0].models[0][0].collect_used_feats())

        fi = calc_feats_permutation_imps(
            self,
            list(used_feats),
            data,
            automl.reader.target,
            automl.task.get_dataset_metric(),
            silent=silent,
        )
        return fi

    def create_model_str_desc(self, pref_tab_num: int = 0, split_line_len: int = 80) -> str:
        """Build the text description of the final ensemble.

        Args:
            pref_tab_num: Number of tabs prepended to every description line.
            split_line_len: Passed to ``create_model_str_desc`` of the inner models.

        Returns:
            Description of the weighted sum of per-config averaged models.

        """
        res = "Final prediction for new objects = \n"
        for it, (model, weight) in enumerate(zip(self.outer_pipes, self.outer_blend.wts)):
            # basename handles both "/" and the platform-specific separator
            config_path = os.path.basename(model.ml_algos[0].models[0][0].config_path)
            res += "\t" * (pref_tab_num + 1) + "+ " * (it > 0)
            res += '{:.5f} * {} averaged models with config = "{}" and different CV random_states. Their structures: \n\n'.format(
                weight, len(model.ml_algos[0].models[0]), config_path
            )
            for it1, m in enumerate(model.ml_algos[0].models[0]):
                cur_model_desc = m.create_model_str_desc(pref_tab_num + 2, split_line_len)
                res += "\t" * (pref_tab_num + 1) + "    Model #{}.\n{}\n\n".format(it1, cur_model_desc)

        return res

    def get_individual_pdp(
        self,
        test_data: ReadableToDf,
        feature_name: str,
        n_bins: Optional[int] = 30,
        top_n_categories: Optional[int] = 10,
        datetime_level: Optional[str] = "year",
    ):
        """Calculate predictions on a grid of values of one feature.

        For every grid value the feature is overwritten in a copy of
        ``test_data`` and the ensemble predictions are collected. Roles are
        taken from the reader of the first fitted automl.

        Args:
            test_data: Dataset (must support ``.copy()`` and column access).
            feature_name: Feature present in the reader roles.
            n_bins: Number of histogram bins for a numeric feature.
            top_n_categories: Number of most frequent categories used as the grid;
                the remaining ones are merged into ``"<OTHER>"``.
            datetime_level: One of ``"year"``, ``"month"``, ``"dayofweek"``.

        Returns:
            Tuple ``(grid, ys, counts)``: grid values, predictions for each
            grid value and number of rows behind each grid value.

        Raises:
            ValueError: If the feature role is not Numeric, Category or Datetime.

        """
        reader = self.outer_pipes[0].ml_algos[0].models[0][0].reader
        assert feature_name in reader._roles
        assert datetime_level in ["year", "month", "dayofweek"]
        test_i = test_data.copy()
        role_name = reader._roles[feature_name].name

        if role_name == "Numeric":
            # grid is made of histogram bin centers
            counts, bin_edges = np.histogram(test_data[feature_name].dropna(), bins=n_bins)
            grid = (bin_edges[:-1] + bin_edges[1:]) / 2
            ys = []
            for i in tqdm(grid):
                test_i[feature_name] = i
                preds = self.predict(test_i).data
                ys.append(preds)
        elif role_name == "Category":
            feature_cnt = test_data[feature_name].value_counts()
            grid = list(feature_cnt.index.values[:top_n_categories])
            counts = list(feature_cnt.values[:top_n_categories])
            ys = []
            for i in tqdm(grid):
                test_i[feature_name] = i
                preds = self.predict(test_i).data
                ys.append(preds)
            if len(feature_cnt) > top_n_categories:
                # frequency rank of every category, 0 is the most frequent one
                freq_mapping = {cat: rank for rank, cat in enumerate(feature_cnt.index)}
                # Rows of the non-top categories: ranks 0..top_n_categories-1 are on the grid,
                # so "other" starts at rank == top_n_categories (consistent with counts below).
                # Values missing from value_counts (NaN) get rank -1 and are never "other".
                other_mask = (
                    np.array([freq_mapping.get(k, -1) for k in test_i[feature_name]]) >= top_n_categories
                )
                if other_mask.any():
                    # add "OTHER" class
                    test_i = test_data.copy()
                    # sample from other classes with the same distribution
                    test_i[feature_name] = (
                        test_i[feature_name][other_mask].sample(n=test_data.shape[0], replace=True).values
                    )
                    preds = self.predict(test_i).data
                    grid.append("<OTHER>")
                    ys.append(preds)
                    counts.append(feature_cnt.values[top_n_categories:].sum())
        elif role_name == "Datetime":
            test_data_read = reader.read(test_data)
            feature_datetime = pd.arrays.DatetimeArray(test_data_read._data[feature_name])
            if datetime_level == "year":
                grid = np.unique([i.year for i in feature_datetime])
            elif datetime_level == "month":
                grid = np.arange(1, 13)
            else:
                grid = np.arange(7)
            ys = []
            for i in tqdm(grid):
                test_i[feature_name] = change_datetime(feature_datetime, datetime_level, i)
                preds = self.predict(test_i).data
                ys.append(preds)
            counts = Counter([getattr(i, datetime_level) for i in feature_datetime])
            counts = [counts[i] for i in grid]
        else:
            raise ValueError(
                "Partial dependence is supported for Numeric, Category and Datetime features only, "
                "got role '{}' for feature '{}'".format(role_name, feature_name)
            )

        return grid, ys, counts

    def plot_pdp(
        self,
        test_data: ReadableToDf,
        feature_name: str,
        individual: Optional[bool] = False,
        n_bins: Optional[int] = 30,
        top_n_categories: Optional[int] = 10,
        top_n_classes: Optional[int] = 10,
        datetime_level: Optional[str] = "year",
    ):
        """Plot partial dependence for one feature together with its distribution.

        Args:
            test_data: Dataset used for the calculation.
            feature_name: Feature present in the reader roles.
            individual: Passed to the plotting helper.
            n_bins: Number of histogram bins for a numeric feature.
            top_n_categories: Number of most frequent categories to show.
            top_n_classes: Passed to the plotting helper.
            datetime_level: One of ``"year"``, ``"month"``, ``"dayofweek"``.

        """
        reader = self.outer_pipes[0].ml_algos[0].models[0][0].reader
        grid, ys, counts = self.get_individual_pdp(
            test_data=test_data,
            feature_name=feature_name,
            n_bins=n_bins,
            top_n_categories=top_n_categories,
            datetime_level=datetime_level,
        )
        plot_pdp_with_distribution(
            test_data,
            grid,
            ys,
            counts,
            reader,
            feature_name,
            individual,
            top_n_classes,
            datetime_level,
        )