Source code for lightautoml.transformers.categorical

"""Categorical features transformerrs."""

from itertools import combinations
from typing import List
from typing import Optional
from typing import Sequence
from typing import Union
from typing import cast

import numpy as np

from pandas import DataFrame
from pandas import Series
from sklearn.preprocessing import OneHotEncoder
from sklearn.utils.murmurhash import murmurhash3_32

from ..dataset.base import LAMLDataset
from ..dataset.np_pd_dataset import CSRSparseDataset
from ..dataset.np_pd_dataset import NumpyDataset
from ..dataset.np_pd_dataset import PandasDataset
from ..dataset.roles import CategoryRole
from ..dataset.roles import NumericRole
from .base import LAMLTransformer


# type - something that can be convered to pandas dataset
NumpyOrPandas = Union[NumpyDataset, PandasDataset]
NumpyOrSparse = Union[NumpyDataset, CSRSparseDataset]


def categorical_check(dataset: LAMLDataset):
    """Check if all passed vars are categories.

    Raises AssertionError if non-categorical features are present.

    Args:
        dataset: LAMLDataset to check.

    """
    roles = dataset.roles
    features = dataset.features
    for f in features:
        assert roles[f].name == "Category", "Only categories accepted in this transformer"


def oof_task_check(dataset: LAMLDataset):
    """Check if all passed vars are categories.

    Args:
        dataset: Input.

    """
    task = dataset.task
    assert task.name in [
        "binary",
        "reg",
    ], "Only binary and regression tasks supported in this transformer"


def multiclass_task_check(dataset: LAMLDataset):
    """Check if all passed vars are categories.

    Args:
        dataset: Input.

    """
    task = dataset.task
    assert task.name in ["multiclass"], "Only multiclass tasks supported in this transformer"


def encoding_check(dataset: LAMLDataset):
    """Check if all passed vars are categories.

    Args:
        dataset: Input.

    """
    roles = dataset.roles
    features = dataset.features
    for f in features:
        assert roles[
            f
        ].label_encoded, "Transformer should be applied to category only after label encoding. Feat {0} is {1}".format(
            f, roles[f]
        )


[docs]class LabelEncoder(LAMLTransformer): """Simple LabelEncoder in order of frequency. Labels are integers from 1 to n. Unknown category encoded as 0. NaN is handled as a category value. Args: subs: Subsample to calculate freqs. If None - full data. random_state: Random state to take subsample. """ _fit_checks = (categorical_check,) _transform_checks = () _fname_prefix = "le" # _output_role = CategoryRole(np.int32, label_encoded=True) _fillna_val = 0 def __init__(self, subs: Optional[int] = None, random_state: int = 42): self.subs = subs self.random_state = random_state self._output_role = CategoryRole(np.int32, label_encoded=True) def _get_df(self, dataset: NumpyOrPandas) -> DataFrame: """Get df and sample. Args: dataset: Input dataset. Returns: Subsample. """ dataset = dataset.to_pandas() df = dataset.data if self.subs is not None and df.shape[0] >= self.subs: subs = df.sample(n=self.subs, random_state=self.random_state) else: subs = df return subs
[docs] def fit(self, dataset: NumpyOrPandas): """Estimate label frequencies and create encoding dicts. Args: dataset: Pandas or Numpy dataset of categorical features. Returns: self. """ # set transformer names and add checks super().fit(dataset) # set transformer features # convert to accepted dtype and get attributes roles = dataset.roles subs = self._get_df(dataset) self.dicts = {} for i in subs.columns: role = roles[i] # TODO: think what to do with this warning co = role.unknown cnts = ( subs[i] .value_counts(dropna=False) .reset_index() .sort_values([i, "index"], ascending=[False, True]) .set_index("index") ) vals = cnts[cnts[i] > co].index.values self.dicts[i] = Series(np.arange(vals.shape[0], dtype=np.int32) + 1, index=vals) return self
[docs] def transform(self, dataset: NumpyOrPandas) -> NumpyDataset: """Transform categorical dataset to int labels. Args: dataset: Pandas or Numpy dataset of categorical features. Returns: Numpy dataset with encoded labels. """ # checks here super().transform(dataset) # convert to accepted dtype and get attributes dataset = dataset.to_pandas() df = dataset.data # transform new_arr = np.empty(dataset.shape, dtype=self._output_role.dtype) for n, i in enumerate(df.columns): # to be compatible with OrdinalEncoder if i in self.dicts: new_arr[:, n] = df[i].map(self.dicts[i]).fillna(self._fillna_val).values else: new_arr[:, n] = df[i].values.astype(self._output_role.dtype) # create resulted output = dataset.empty().to_numpy() output.set_data(new_arr, self.features, self._output_role) return output
[docs]class OHEEncoder(LAMLTransformer): """Simple OneHotEncoder over label encoded categories. Args: make_sparse: Create sparse matrix. total_feats_cnt: Initial features number. dtype: Dtype of new features. """ _fit_checks = (categorical_check, encoding_check) _transform_checks = () _fname_prefix = "ohe" @property def features(self) -> List[str]: """Features list.""" return self._features def __init__( self, make_sparse: Optional[bool] = None, total_feats_cnt: Optional[int] = None, dtype: type = np.float32, ): self.make_sparse = make_sparse self.total_feats_cnt = total_feats_cnt self.dtype = dtype if self.make_sparse is None: assert self.total_feats_cnt is not None, "Param total_feats_cnt should be defined if make_sparse is None"
[docs] def fit(self, dataset: NumpyOrPandas): """Calc output shapes. Automatically do ohe in sparse form if approximate fill_rate < `0.2`. Args: dataset: Pandas or Numpy dataset of categorical features. Returns: self. """ # set transformer names and add checks for check_func in self._fit_checks: check_func(dataset) # set transformer features # convert to accepted dtype and get attributes dataset = dataset.to_numpy() data = dataset.data max_idx = data.max(axis=0) min_idx = data.min(axis=0) # infer make sparse if self.make_sparse is None: fill_rate = self.total_feats_cnt / (self.total_feats_cnt - max_idx.shape[0] + max_idx.sum()) self.make_sparse = fill_rate < 0.2 # create ohe self.ohe = OneHotEncoder( categories=[np.arange(x, y + 1, dtype=np.int32) for (x, y) in zip(min_idx, max_idx)], # drop=np.ones(max_idx.shape[0], dtype=np.int32), dtype=self.dtype, sparse=self.make_sparse, handle_unknown="ignore", ) self.ohe.fit(data) features = [] for cats, name in zip(self.ohe.categories_, dataset.features): # cats = cats[cats != 1] features.extend(["ohe_{0}__{1}".format(x, name) for x in cats]) self._features = features return self
[docs] def transform(self, dataset: NumpyOrPandas) -> NumpyOrSparse: """Transform categorical dataset to ohe. Args: dataset: Pandas or Numpy dataset of categorical features. Returns: Numpy dataset with encoded labels. """ # checks here super().transform(dataset) # convert to accepted dtype and get attributes dataset = dataset.to_numpy() data = dataset.data # transform data = self.ohe.transform(data) # create resulted output = dataset.empty() if self.make_sparse: output = output.to_csr() output.set_data(data, self.features, NumericRole(self.dtype)) return output
[docs]class FreqEncoder(LabelEncoder): """Labels are encoded with frequency in train data. Labels are integers from 1 to n. Unknown category encoded as 1. """ _fit_checks = (categorical_check,) _transform_checks = () _fname_prefix = "freq" # _output_role = NumericRole(np.float32) _fillna_val = 1 def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._output_role = NumericRole(np.float32)
[docs] def fit(self, dataset: NumpyOrPandas): """Estimate label frequencies and create encoding dicts. Args: dataset: Pandas or Numpy dataset of categorical features Returns: self. """ # set transformer names and add checks LAMLTransformer.fit(self, dataset) # set transformer features # convert to accepted dtype and get attributes dataset = dataset.to_pandas() df = dataset.data self.dicts = {} for i in df.columns: # we make assertion in checks, so cast is ok # TODO: think what to do with this warning cnts = df[i].value_counts(dropna=False) self.dicts[i] = cnts[cnts > 1] return self
[docs]class TargetEncoder(LAMLTransformer): """Out-of-fold target encoding. Limitation: - Required .folds attribute in dataset - array of int from 0 to n_folds-1. - Working only after label encoding. Args: alphas: Smooth coefficients. """ _fit_checks = (categorical_check, oof_task_check, encoding_check) _transform_checks = () _fname_prefix = "oof" def __init__(self, alphas: Sequence[float] = (0.5, 1.0, 2.0, 5.0, 10.0, 50.0, 250.0, 1000.0)): self.alphas = alphas
[docs] @staticmethod def binary_score_func(candidates: np.ndarray, target: np.ndarray) -> int: """Score candidates alpha with logloss metric. Args: candidates: Candidate oof encoders. target: Target array. Returns: Index of best encoder. """ target = target[:, np.newaxis] scores = -(target * np.log(candidates) + (1 - target) * np.log(1 - candidates)).mean(axis=0) idx = scores.argmin() return idx
[docs] @staticmethod def reg_score_func(candidates: np.ndarray, target: np.ndarray) -> int: """Score candidates alpha with mse metric. Args: candidates: Candidate oof encoders. target: Target array. Returns: Index of best encoder. """ target = target[:, np.newaxis] scores = ((target - candidates) ** 2).mean(axis=0) idx = scores.argmin() return idx
[docs] def fit(self, dataset: NumpyOrPandas): """Fit encoder.""" super().fit_transform(dataset)
[docs] def fit_transform(self, dataset: NumpyOrPandas) -> NumpyDataset: """Calc oof encoding and save encoding stats for new data. Args: dataset: Pandas or Numpy dataset of categorical label encoded features. Returns: NumpyDataset - target encoded features. """ # set transformer names and add checks super().fit(dataset) # set transformer features # convert to accepted dtype and get attributes dataset = dataset.to_numpy() data = dataset.data target = dataset.target.astype(np.int32) score_func = self.binary_score_func if dataset.task.name == "binary" else self.reg_score_func folds = dataset.folds n_folds = folds.max() + 1 alphas = np.array(self.alphas)[np.newaxis, :] self.encodings = [] prior = target.mean() # folds priors f_sum = np.zeros(n_folds, dtype=np.float64) f_count = np.zeros(n_folds, dtype=np.float64) np.add.at(f_sum, folds, target) np.add.at(f_count, folds, 1) folds_prior = (f_sum.sum() - f_sum) / (f_count.sum() - f_count) oof_feats = np.zeros(data.shape, dtype=np.float32) for n in range(data.shape[1]): vec = data[:, n] # calc folds stats enc_dim = vec.max() + 1 f_sum = np.zeros((enc_dim, n_folds), dtype=np.float64) f_count = np.zeros((enc_dim, n_folds), dtype=np.float64) np.add.at(f_sum, (vec, folds), target) np.add.at(f_count, (vec, folds), 1) # calc total stats t_sum = f_sum.sum(axis=1, keepdims=True) t_count = f_count.sum(axis=1, keepdims=True) # calc oof stats oof_sum = t_sum - f_sum oof_count = t_count - f_count # calc candidates alpha candidates = ( (oof_sum[vec, folds, np.newaxis] + alphas * folds_prior[folds, np.newaxis]) / (oof_count[vec, folds, np.newaxis] + alphas) ).astype(np.float32) idx = score_func(candidates, target) # write best alpha oof_feats[:, n] = candidates[:, idx] # calc best encoding enc = ((t_sum[:, 0] + alphas[0, idx] * prior) / (t_count[:, 0] + alphas[0, idx])).astype(np.float32) self.encodings.append(enc) output = dataset.empty() self.output_role = NumericRole(np.float32, prob=output.task.name == "binary") output.set_data(oof_feats, self.features, self.output_role) return output
[docs] def transform(self, dataset: NumpyOrPandas) -> NumpyOrSparse: """Transform categorical dataset to target encoding. Args: dataset: Pandas or Numpy dataset of categorical features. Returns: Numpy dataset with encoded labels. """ # checks here super().transform(dataset) # convert to accepted dtype and get attributes dataset = dataset.to_numpy() data = dataset.data # transform out = np.zeros(data.shape, dtype=np.float32) for n, enc in enumerate(self.encodings): out[:, n] = enc[data[:, n]] # create resulted output = dataset.empty() output.set_data(out, self.features, self.output_role) return output
[docs]class MultiClassTargetEncoder(LAMLTransformer): """Out-of-fold target encoding for multiclass task. Limitation: - Required .folds attribute in dataset - array of int from 0 to n_folds-1. - Working only after label encoding """ _fit_checks = (categorical_check, multiclass_task_check, encoding_check) _transform_checks = () _fname_prefix = "multioof" @property def features(self) -> List[str]: """List of features.""" return self._features def __init__(self, alphas: Sequence[float] = (0.5, 1.0, 2.0, 5.0, 10.0, 50.0, 250.0, 1000.0)): self.alphas = alphas
[docs] @staticmethod def score_func(candidates: np.ndarray, target: np.ndarray) -> int: """Choose the best encoder. Args: candidates: np.ndarray. target: np.ndarray. Returns: index of best encoder. """ target = target[:, np.newaxis, np.newaxis] scores = -np.log(np.take_along_axis(candidates, target, axis=1)).mean(axis=0)[0] idx = scores.argmin() return idx
[docs] def fit_transform(self, dataset: NumpyOrPandas) -> NumpyDataset: """Estimate label frequencies and create encoding dicts. Args: dataset: Pandas or Numpy dataset of categorical label encoded features. Returns: NumpyDataset - target encoded features. """ # set transformer names and add checks for check_func in self._fit_checks: check_func(dataset) # set transformer features # convert to accepted dtype and get attributes dataset = dataset.to_numpy() data = dataset.data target = dataset.target.astype(np.int32) n_classes = target.max() + 1 self.n_classes = n_classes folds = dataset.folds n_folds = folds.max() + 1 alphas = np.array(self.alphas)[np.newaxis, np.newaxis, :] self.encodings = [] # prior prior = cast(np.ndarray, np.arange(n_classes)[:, np.newaxis] == target).mean(axis=1) # folds prior f_sum = np.zeros((n_classes, n_folds), dtype=np.float64) f_count = np.zeros((1, n_folds), dtype=np.float64) np.add.at(f_sum, (target, folds), 1) np.add.at(f_count, (0, folds), 1) # N_classes x N_folds folds_prior = ((f_sum.sum(axis=1, keepdims=True) - f_sum) / (f_count.sum(axis=1, keepdims=True) - f_count)).T oof_feats = np.zeros(data.shape + (n_classes,), dtype=np.float32) self._features = [] for i in dataset.features: for j in range(n_classes): self._features.append("{0}_{1}__{2}".format("multioof", j, i)) for n in range(data.shape[1]): vec = data[:, n] # calc folds stats enc_dim = vec.max() + 1 f_sum = np.zeros((enc_dim, n_classes, n_folds), dtype=np.float64) f_count = np.zeros((enc_dim, 1, n_folds), dtype=np.float64) np.add.at(f_sum, (vec, target, folds), 1) np.add.at(f_count, (vec, 0, folds), 1) # calc total stats t_sum = f_sum.sum(axis=2, keepdims=True) t_count = f_count.sum(axis=2, keepdims=True) # calc oof stats oof_sum = t_sum - f_sum oof_count = t_count - f_count # (N x N_classes x 1 + 1 x 1 x N_alphas * N x N_classes x 1) / (N x 1 x 1 + N x 1 x 1) -> N x N_classes x N_alphas candidates = ( (oof_sum[vec, :, folds, np.newaxis] + alphas * folds_prior[folds, :, np.newaxis]) / (oof_count[vec, :, folds, np.newaxis] + alphas) ).astype(np.float32) # norm over 1 axis candidates /= candidates.sum(axis=1, keepdims=True) idx = self.score_func(candidates, target) oof_feats[:, n] = candidates[..., idx] enc = ((t_sum[..., 0] + alphas[0, 0, idx] * prior) / (t_count[..., 0] + alphas[0, 0, idx])).astype( np.float32 ) enc /= enc.sum(axis=1, keepdims=True) self.encodings.append(enc) output = dataset.empty() output.set_data( oof_feats.reshape((data.shape[0], -1)), self.features, NumericRole(np.float32, prob=True), ) return output
[docs] def transform(self, dataset: NumpyOrPandas) -> NumpyOrSparse: """Transform categorical dataset to target encoding. Args: dataset: Pandas or Numpy dataset of categorical features. Returns: Numpy dataset with encoded labels. """ # checks here super().transform(dataset) # convert to accepted dtype and get attributes dataset = dataset.to_numpy() data = dataset.data # transform out = np.zeros(data.shape + (self.n_classes,), dtype=np.float32) for n, enc in enumerate(self.encodings): out[:, n] = enc[data[:, n]] out = out.reshape((data.shape[0], -1)) # create resulted output = dataset.empty() output.set_data(out, self.features, NumericRole(np.float32, prob=True)) return output
[docs]class CatIntersectstions(LabelEncoder): """Build label encoded intertsections of categorical variables. Args: intersections: Columns to create intersections. Default is None - all. max_depth: Max intersection depth. """ _fit_checks = (categorical_check,) _transform_checks = () _fname_prefix = "inter" def __init__( self, subs: Optional[int] = None, random_state: int = 42, intersections: Optional[Sequence[Sequence[str]]] = None, max_depth: int = 2, ): super().__init__(subs, random_state) self.intersections = intersections self.max_depth = max_depth @staticmethod def _make_category(df: DataFrame, cols: Sequence[str]) -> np.ndarray: """Make hash for category interactions. Args: df: Input DataFrame cols: List of columns Returns: Hash np.ndarray. """ res = np.empty((df.shape[0],), dtype=np.int32) for n, inter in enumerate(zip(*(df[x] for x in cols))): h = murmurhash3_32("_".join(map(str, inter)), seed=42) res[n] = h return res def _build_df(self, dataset: NumpyOrPandas) -> PandasDataset: """Perform encoding. Args: dataset: Pandas or Numpy dataset of categorical features. Returns: Dataset. """ dataset = dataset.to_pandas() df = dataset.data roles = {} new_df = DataFrame(index=df.index) for comb in self.intersections: name = "({0})".format("__".join(comb)) new_df[name] = self._make_category(df, comb) roles[name] = CategoryRole( object, unknown=max((dataset.roles[x].unknown for x in comb)), label_encoded=True, ) output = dataset.empty() output.set_data(new_df, new_df.columns, roles) return output
[docs] def fit(self, dataset: NumpyOrPandas): """Create label encoded intersections and save mapping. Args: dataset: Pandas or Numpy dataset of categorical features. Returns: self. """ # set transformer names and add checks for check_func in self._fit_checks: check_func(dataset) if self.intersections is None: self.intersections = [] for i in range(2, min(self.max_depth, len(dataset.features)) + 1): self.intersections.extend(list(combinations(dataset.features, i))) inter_dataset = self._build_df(dataset) return super().fit(inter_dataset)
[docs] def transform(self, dataset: NumpyOrPandas) -> NumpyDataset: """Create label encoded intersections and apply mapping. Args: dataset: Pandas or Numpy dataset of categorical features Returns: Transformed dataset. """ inter_dataset = self._build_df(dataset) return super().transform(inter_dataset)
[docs]class OrdinalEncoder(LabelEncoder): """Encoding ordinal categories into numbers. Number type categories passed as is, object type sorted in ascending lexicographical order. """ _fit_checks = (categorical_check,) _transform_checks = () _fname_prefix = "ord" # _output_role = NumericRole(np.float32) _fillna_val = np.nan def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._output_role = NumericRole(np.float32)
[docs] def fit(self, dataset: NumpyOrPandas): """Estimate label frequencies and create encoding dicts. Args: dataset: Pandas or Numpy dataset of categorical features. Returns: Self. """ # set transformer names and add checks LAMLTransformer.fit(self, dataset) # set transformer features # convert to accepted dtype and get attributes roles = dataset.roles subs = self._get_df(dataset) self.dicts = {} for i in subs.columns: role = roles[i] try: flg_number = np.issubdtype(role.dtype, np.number) except TypeError: flg_number = False if not flg_number: co = role.unknown cnts = subs[i].value_counts(dropna=True) cnts = cnts[cnts > co].reset_index() cnts = Series(cnts["index"].astype(str).rank().values, index=cnts["index"].values) cnts = cnts.append(Series([cnts.shape[0] + 1], index=[np.nan])) self.dicts[i] = cnts return self