Source code for lightautoml.report.report_deco

"""Classes for report generation and add-ons."""

import logging
import os
import warnings

from copy import copy
from copy import deepcopy

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns

from jinja2 import Environment
from jinja2 import FileSystemLoader
from json2html import json2html
from sklearn.metrics import average_precision_score
from sklearn.metrics import confusion_matrix
from sklearn.metrics import explained_variance_score
from sklearn.metrics import f1_score
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import mean_squared_error
from sklearn.metrics import median_absolute_error
from sklearn.metrics import precision_recall_curve
from sklearn.metrics import precision_recall_fscore_support
from sklearn.metrics import precision_score
from sklearn.metrics import r2_score
from sklearn.metrics import recall_score
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve
from sklearn.utils.multiclass import type_of_target

from lightautoml.addons.uplift import metrics as uplift_metrics
from lightautoml.addons.uplift.metalearners import TLearner
from lightautoml.addons.uplift.metalearners import XLearner
from lightautoml.addons.uplift.utils import _get_treatment_role
from lightautoml.dataset import roles as laml_roles


logger = logging.getLogger(__name__)

base_dir = os.path.dirname(__file__)


def extract_params(input_struct):
    params = dict()
    iterator = input_struct if isinstance(input_struct, dict) else input_struct.__dict__
    for key in iterator:
        if key.startswith(("_", "autonlp_params")):
            continue
        value = iterator[key]
        if type(value) in [bool, int, float, str]:
            params[key] = value
        elif value is None:
            params[key] = None
        elif hasattr(value, "__dict__") or isinstance(value, dict):
            params[key] = extract_params(value)
        else:
            params[key] = str(type(value))
    return params
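

# Illustrative sketch (not part of the original module): ``extract_params`` walks a dict or an
# object's ``__dict__``, keeps plain scalars, recurses into nested objects/dicts, and replaces
# anything else with its type name. The config classes below are hypothetical.
def _example_extract_params():
    class _InnerCfg:
        def __init__(self):
            self.lr = 0.05             # scalar -> kept as-is
            self.schedule = [1, 2, 3]  # non-scalar -> stored as "<class 'list'>"

    class _OuterCfg:
        def __init__(self):
            self.n_trees = 100
            self._secret = "hidden"    # keys starting with "_" are skipped
            self.inner = _InnerCfg()   # nested object -> recursed into

    return extract_params(_OuterCfg())
    # -> {'n_trees': 100, 'inner': {'lr': 0.05, 'schedule': "<class 'list'>"}}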


def plot_roc_curve_image(data, path):
    sns.set(style="whitegrid", font_scale=1.5)
    plt.figure(figsize=(10, 10))

    fpr, tpr, _ = roc_curve(data["y_true"], data["y_pred"])
    auc_score = roc_auc_score(data["y_true"], data["y_pred"])

    lw = 2
    plt.plot(fpr, tpr, color="blue", lw=lw, label="Trained model")
    plt.plot([0, 1], [0, 1], color="red", lw=lw, linestyle="--", label="Random model")
    plt.xlim([-0.05, 1.05])
    plt.ylim([-0.05, 1.05])
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    lgd = plt.legend(bbox_to_anchor=(0.5, -0.15), loc="upper center", ncol=2)
    plt.xticks(np.arange(0, 1.01, 0.05), rotation=45)
    plt.yticks(np.arange(0, 1.01, 0.05))
    plt.grid(color="gray", linestyle="-", linewidth=1)
    plt.title("ROC curve (GINI = {:.3f})".format(2 * auc_score - 1))
    plt.savefig(path, bbox_extra_artists=(lgd,), bbox_inches="tight")
    plt.close()
    return auc_score
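

# Illustrative sketch (not part of the original module): the binary-classification plot helpers
# expect a DataFrame with ``y_true`` (0/1 labels) and ``y_pred`` (predicted probabilities);
# the output file names below are hypothetical.
def _example_binary_plots():
    rng = np.random.RandomState(42)
    y_true = rng.binomial(1, 0.3, size=1000)
    # noisy scores correlated with the label, clipped into (0, 1)
    y_pred = np.clip(y_true * 0.6 + rng.uniform(0, 0.5, size=1000), 1e-6, 1 - 1e-6)
    data = pd.DataFrame({"y_true": y_true, "y_pred": y_pred})
    auc = plot_roc_curve_image(data, path="example_roc_curve.png")
    plot_pr_curve_image(data, path="example_pr_curve.png")
    return auc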


def plot_pr_curve_image(data, path):
    sns.set(style="whitegrid", font_scale=1.5)
    plt.figure(figsize=(10, 10))

    precision, recall, _ = precision_recall_curve(data["y_true"], data["y_pred"])
    ap_score = average_precision_score(data["y_true"], data["y_pred"])

    lw = 2
    plt.plot(recall, precision, color="blue", lw=lw, label="Trained model")
    positive_rate = np.sum(data["y_true"] == 1) / data.shape[0]
    plt.plot(
        [0, 1],
        [positive_rate, positive_rate],
        color="red",
        lw=lw,
        linestyle="--",
        label="Random model",
    )
    plt.xlim([-0.05, 1.05])
    plt.ylim([0.45, 1.05])
    plt.xlabel("Recall")
    plt.ylabel("Precision")
    lgd = plt.legend(bbox_to_anchor=(0.5, -0.15), loc="upper center", ncol=2)
    plt.xticks(np.arange(0, 1.01, 0.05), rotation=45)
    plt.yticks(np.arange(0, 1.01, 0.05))
    plt.grid(color="gray", linestyle="-", linewidth=1)
    plt.title("PR curve (AP = {:.3f})".format(ap_score))
    plt.savefig(path, bbox_extra_artists=(lgd,), bbox_inches="tight")
    plt.close()


def plot_preds_distribution_by_bins(data, path):
    sns.set(style="whitegrid", font_scale=1.5)
    fig, axs = plt.subplots(figsize=(16, 10))

    box_plot_data = []
    labels = []
    for name, group in data.groupby("bin"):
        labels.append(name)
        box_plot_data.append(group["y_pred"].values)

    box = axs.boxplot(box_plot_data, patch_artist=True, labels=labels)
    for patch in box["boxes"]:
        patch.set_facecolor("green")
    axs.set_yscale("log")
    axs.set_xlabel("Bin number")
    axs.set_ylabel("Prediction")
    axs.set_title("Distribution of object predictions by bin")

    fig.savefig(path, bbox_inches="tight")
    plt.close()


def plot_distribution_of_logits(data, path):
    sns.set(style="whitegrid", font_scale=1.5)
    fig, axs = plt.subplots(figsize=(16, 10))

    data["proba_logit"] = np.log(data["y_pred"].values / (1 - data["y_pred"].values))
    sns.kdeplot(
        data[data["y_true"] == 0]["proba_logit"],
        shade=True,
        color="r",
        label="Class 0 logits",
        ax=axs,
    )
    sns.kdeplot(
        data[data["y_true"] == 1]["proba_logit"],
        shade=True,
        color="g",
        label="Class 1 logits",
        ax=axs,
    )
    axs.set_xlabel("Logits")
    axs.set_ylabel("Density")
    axs.set_title("Logits distribution of object predictions (by classes)")
    fig.savefig(path, bbox_inches="tight")
    plt.close()


def plot_pie_f1_metric(data, F1_thresh, path):
    tn, fp, fn, tp = confusion_matrix(data["y_true"], (data["y_pred"] > F1_thresh).astype(int)).ravel()
    (_, prec), (_, rec), (_, F1), (_, _) = precision_recall_fscore_support(
        data["y_true"], (data["y_pred"] > F1_thresh).astype(int)
    )

    sns.set(style="whitegrid", font_scale=1.5)
    fig, ax = plt.subplots(figsize=(20, 10), subplot_kw=dict(aspect="equal"))

    recipe = [
        "{} True Positives".format(tp),
        "{} False Positives".format(fp),
        "{} False Negatives".format(fn),
        "{} True Negatives".format(tn),
    ]

    wedges, texts = ax.pie([tp, fp, fn, tn], wedgeprops=dict(width=0.5), startangle=-40)

    bbox_props = dict(boxstyle="square,pad=0.3", fc="w", ec="k", lw=0.72)
    kw = dict(
        arrowprops=dict(arrowstyle="-", color="k"),
        bbox=bbox_props,
        zorder=0,
        va="center",
    )

    for i, p in enumerate(wedges):
        ang = (p.theta2 - p.theta1) / 2.0 + p.theta1
        y = np.sin(np.deg2rad(ang))
        x = np.cos(np.deg2rad(ang))
        horizontalalignment = {-1: "right", 1: "left"}[int(np.sign(x))]
        connectionstyle = "angle,angleA=0,angleB={}".format(ang)
        kw["arrowprops"].update({"connectionstyle": connectionstyle})
        ax.annotate(
            recipe[i], xy=(x, y), xytext=(1.35 * np.sign(x), 1.4 * y), horizontalalignment=horizontalalignment, **kw
        )

    ax.set_title(
        "Trained model: Precision = {:.2f}%, Recall = {:.2f}%, F1-Score = {:.2f}%".format(
            prec * 100, rec * 100, F1 * 100
        )
    )
    plt.savefig(path, bbox_inches="tight")
    plt.close()
    return prec, rec, F1


def f1_score_w_co(input_data, min_co=0.01, max_co=0.99, step=0.01):
    data = input_data.copy()
    data["y_pred"] = np.clip(np.ceil(data["y_pred"].values / step) * step, min_co, max_co)

    pos = data["y_true"].sum()
    neg = data["y_true"].shape[0] - pos

    grp = pd.DataFrame(data).groupby("y_pred")["y_true"].agg(["sum", "count"])
    grp.sort_index(inplace=True)

    grp["fp"] = grp["sum"].cumsum()
    grp["tp"] = pos - grp["fp"]
    grp["tn"] = (grp["count"] - grp["sum"]).cumsum()
    grp["fn"] = neg - grp["tn"]

    grp["pr"] = grp["tp"] / (grp["tp"] + grp["fp"])
    grp["rec"] = grp["tp"] / (grp["tp"] + grp["fn"])

    grp["f1_score"] = 2 * (grp["pr"] * grp["rec"]) / (grp["pr"] + grp["rec"])

    best_score = grp["f1_score"].max()
    best_co = grp.index.values[grp["f1_score"] == best_score].mean()

    # print((y_pred < best_co).mean())

    return best_score, best_co
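

# Illustrative sketch (not part of the original module): ``f1_score_w_co`` grid-searches a
# probability cutoff (step 0.01 by default) and returns the best F1 together with the cutoff
# at which it is reached; the decorator later stores that cutoff as ``self._F1_thresh``.
def _example_f1_threshold():
    rng = np.random.RandomState(0)
    y_true = rng.binomial(1, 0.4, size=5000)
    y_pred = np.clip(0.5 * y_true + rng.uniform(0, 0.6, size=5000), 1e-6, 1 - 1e-6)
    data = pd.DataFrame({"y_true": y_true, "y_pred": y_pred})
    best_f1, best_cutoff = f1_score_w_co(data)
    return best_f1, best_cutoff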


def get_bins_table(data):
    bins_table = data.groupby("bin").agg({"y_true": [len, np.mean], "y_pred": [np.min, np.mean, np.max]}).reset_index()
    bins_table.columns = [
        "Bin number",
        "Amount of objects",
        "Mean target",
        "Min probability",
        "Average probability",
        "Max probability",
    ]
    return bins_table.to_html(index=False)
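

# Illustrative sketch (not part of the original module): ``get_bins_table`` expects a "bin"
# column. ReportDeco builds it by sorting predictions in descending order and splitting the
# rows into ``n_bins`` equally sized groups (see ``_collect_data`` below); this mirrors that
# logic on a standalone DataFrame with "y_true"/"y_pred" columns.
def _example_bins_table(data, n_bins=20):
    data = data.sort_values("y_pred", ascending=False).reset_index(drop=True)
    data["bin"] = (np.arange(data.shape[0]) / data.shape[0] * n_bins).astype(int)
    return get_bins_table(data)  # HTML string with per-bin target and probability stats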


# Regression plots:


def plot_target_distribution_1(data, path):
    sns.set(style="whitegrid", font_scale=1.5)
    fig, axs = plt.subplots(2, 1, figsize=(16, 20))

    sns.kdeplot(data["y_true"], shade=True, color="g", ax=axs[0])
    axs[0].set_xlabel("Target value")
    axs[0].set_ylabel("Density")
    axs[0].set_title("Target distribution (y_true)")

    sns.kdeplot(data["y_pred"], shade=True, color="r", ax=axs[1])
    axs[1].set_xlabel("Target value")
    axs[1].set_ylabel("Density")
    axs[1].set_title("Target distribution (y_pred)")

    fig.savefig(path, bbox_inches="tight")
    plt.close()


def plot_target_distribution_2(data, path):
    sns.set(style="whitegrid", font_scale=1.5)
    fig, axs = plt.subplots(figsize=(16, 10))

    sns.kdeplot(data["y_true"], shade=True, color="g", label="y_true", ax=axs)
    sns.kdeplot(data["y_pred"], shade=True, color="r", label="y_pred", ax=axs)
    axs.set_xlabel("Target value")
    axs.set_ylabel("Density")
    axs.set_title("Target distribution")

    fig.savefig(path, bbox_inches="tight")
    plt.close()


def plot_target_distribution(data, path):
    data_pred = pd.DataFrame({"Target value": data["y_pred"]})
    data_pred["source"] = "y_pred"
    data_true = pd.DataFrame({"Target value": data["y_true"]})
    data_true["source"] = "y_true"
    data = pd.concat([data_pred, data_true], ignore_index=True)

    sns.set(style="whitegrid", font_scale=1.5)
    g = sns.displot(
        data,
        x="Target value",
        row="source",
        height=9,
        aspect=1.5,
        kde=True,
        color="m",
        facet_kws=dict(margin_titles=True),
    )
    g.fig.suptitle("Target distribution")
    g.fig.tight_layout()
    g.fig.subplots_adjust(top=0.95)

    g.fig.savefig(path, bbox_inches="tight")
    plt.close()


def plot_error_hist(data, path):
    sns.set(style="whitegrid", font_scale=1.5)
    fig, ax = plt.subplots(figsize=(16, 10))

    sns.kdeplot(data["y_pred"] - data["y_true"], shade=True, color="m", ax=ax)
    ax.set_xlabel("Error = y_pred - y_true")
    ax.set_ylabel("Density")
    ax.set_title("Error histogram")

    fig.savefig(path, bbox_inches="tight")
    plt.close()


def plot_reg_scatter(data, path):
    sns.set(style="whitegrid", font_scale=1.5)
    g = sns.jointplot(
        x="y_pred",
        y="y_true",
        data=data,
        kind="reg",
        truncate=False,
        color="m",
        height=14,
    )
    g.fig.suptitle("Scatter plot")
    g.fig.tight_layout()
    g.fig.subplots_adjust(top=0.95)

    g.fig.savefig(path, bbox_inches="tight")
    plt.close()


# Multiclass plots:


def plot_confusion_matrix(data, path):
    sns.set(style="whitegrid", font_scale=1.5)
    fig, ax = plt.subplots(figsize=(16, 12))

    cmat = confusion_matrix(data["y_true"], data["y_pred"], normalize="true")
    sns.heatmap(cmat, annot=True, linewidths=0.5, cmap="Purples", ax=ax)
    ax.set_xlabel("y_pred")
    ax.set_ylabel("y_true")
    ax.set_title("Confusion matrix")

    fig.savefig(path, bbox_inches="tight")
    plt.close()


# Feature importance


def plot_feature_importance(feat_imp, path, features_max=100):
    sns.set(style="whitegrid", font_scale=1.5)
    fig, axs = plt.subplots(figsize=(16, features_max / 2.5))
    sns.barplot(x="Importance", y="Feature", data=feat_imp[:features_max], ax=axs, color="m")
    plt.savefig(path, bbox_inches="tight")
    plt.close()


def list2table(feature_list: list, html_params: dict = {}) -> str:
    """Creates HTML table with feature description from list of converts list of items.

    Args:
        feature_list: list of dictionaries with features' properties (e.g. name, length, stat. properties, etc.);
        html_params: extra parameters for pandas.DataFrame().to_html() function;

    Returns:
        String representation of HTML table.

    """
    default_html_params = {"index": False, "justify": "left"}
    default_html_params.update(html_params)
    if len(feature_list) == 0:
        return None
    else:
        return pd.DataFrame(feature_list).to_html(**default_html_params)
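

# Illustrative sketch (not part of the original module): ``list2table`` turns a list of
# per-feature dicts into the HTML snippets embedded in the train-set section of the report.
def _example_list2table():
    rows = [
        {"Feature name": "age", "NaN ratio": "0.0100", "min": 18, "max": 90},
        {"Feature name": "income", "NaN ratio": "0.2500", "min": 0, "max": 1_000_000},
    ]
    return list2table(rows, {"float_format": "{:.2f}".format})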


class ReportDeco:
    """Decorator to wrap :class:`~lightautoml.automl.base.AutoML` class to generate html report on ``fit_predict`` and ``predict``.

    Example:

        >>> report_automl = ReportDeco(output_path="output_path", report_file_name="report_file_name")(automl)
        >>> report_automl.fit_predict(train_data)
        >>> report_automl.predict(test_data)

    Report will be generated at output_path/report_file_name automatically.

    Warning:
        Do not use it just for inference (if you don't need the report), because:

            - It needs the target variable to calc performance metrics.
            - It takes additional time to generate the report.
            - Dump of the decorated automl takes more memory to store.

    To get an unwrapped fitted instance for pickling and inference, access the ``report_automl.model`` attribute.

    """

    @property
    def model(self):
        """Get unwrapped model.

        Returns:
            model.

        """
        return self._model

    @property
    def mapping(self):
        return self._model.reader.class_mapping

    @property
    def task(self):
        return self._model.reader.task._name

    def __init__(self, *args, **kwargs):
        """

        Note:
            Valid kwargs are:

                - output_path: Folder with report files.
                - report_file_name: Name of main report file.

        Args:
            *args: Arguments.
            **kwargs: Additional parameters.

        """
        if not kwargs:
            kwargs = {}

        # default params
        self.fi_params = {"method": "fast", "n_sample": 100_000}
        self.interpretation_params = {
            "top_n_features": 5,
            "top_n_categories": 10,
            "top_n_classes": 10,
            "n_bins": 30,
            "datetime_level": "year",
            "n_sample": 100_000,
        }

        fi_input_params = kwargs.get("fi_params", {})
        self.fi_params.update(fi_input_params)
        interpretation_input_params = kwargs.get("interpretation_params", {})
        self.interpretation_params.update(interpretation_input_params)

        self.interpretation = kwargs.get("interpretation", False)
        self.n_bins = kwargs.get("n_bins", 20)
        self.template_path = kwargs.get("template_path", os.path.join(base_dir, "lama_report_templates/"))
        self.output_path = kwargs.get("output_path", "lama_report/")
        self.report_file_name = kwargs.get("report_file_name", "lama_interactive_report.html")
        self.pdf_file_name = kwargs.get("pdf_file_name", None)

        if not os.path.exists(self.output_path):
            os.makedirs(self.output_path, exist_ok=True)

        self._base_template_path = "lama_base_template.html"
        self._model_section_path = "model_section.html"
        self._train_set_section_path = "train_set_section.html"
        self._results_section_path = "results_section.html"
        self._fi_section_path = "feature_importance_section.html"
        self._interpretation_section_path = "interpretation_section.html"
        self._interpretation_subsection_path = "interpretation_subsection.html"

        self._inference_section_path = {
            "binary": "binary_inference_section.html",
            "reg": "reg_inference_section.html",
            "multiclass": "multiclass_inference_section.html",
        }

        self.title = "LAMA report"
        if self.interpretation:
            self.sections_order = [
                "intro",
                "model",
                "train_set",
                "fi",
                "interpretation",
                "results",
            ]
            self._interpretation_top = []
        else:
            self.sections_order = ["intro", "model", "train_set", "fi", "results"]
        self._sections = {}
        self._sections["intro"] = "<p>This report was generated automatically.</p>"
        self._model_results = []

        self.generate_report()

    def __call__(self, model):
        self._model = model

        # add information to report
        self._model_name = model.__class__.__name__
        self._model_parameters = json2html.convert(extract_params(model))
        self._model_summary = None

        self._sections = {}
        self._sections["intro"] = "<p>This report was generated automatically.</p>"
        self._model_results = []
        self._n_test_sample = 0

        self._generate_model_section()
        self.generate_report()
        return self

    def _binary_classification_details(self, data):
        self._inference_content["sample_bins_table"] = get_bins_table(data)
        prec, rec, F1 = plot_pie_f1_metric(
            data,
            self._F1_thresh,
            path=os.path.join(self.output_path, self._inference_content["pie_f1_metric"]),
        )
        auc_score = plot_roc_curve_image(
            data,
            path=os.path.join(self.output_path, self._inference_content["roc_curve"]),
        )
        plot_pr_curve_image(
            data,
            path=os.path.join(self.output_path, self._inference_content["pr_curve"]),
        )
        plot_preds_distribution_by_bins(
            data,
            path=os.path.join(self.output_path, self._inference_content["preds_distribution_by_bins"]),
        )
        plot_distribution_of_logits(
            data,
            path=os.path.join(self.output_path, self._inference_content["distribution_of_logits"]),
        )
        return auc_score, prec, rec, F1

    def _regression_details(self, data):
        # graphics
        plot_target_distribution(
            data,
            path=os.path.join(self.output_path, self._inference_content["target_distribution"]),
        )
        plot_error_hist(
            data,
            path=os.path.join(self.output_path, self._inference_content["error_hist"]),
        )
        plot_reg_scatter(
            data,
            path=os.path.join(self.output_path, self._inference_content["scatter_plot"]),
        )
        # metrics
        mean_ae = mean_absolute_error(data["y_true"], data["y_pred"])
        median_ae = median_absolute_error(data["y_true"], data["y_pred"])
        mse = mean_squared_error(data["y_true"], data["y_pred"])
        r2 = r2_score(data["y_true"], data["y_pred"])
        evs = explained_variance_score(data["y_true"], data["y_pred"])
        return mean_ae, median_ae, mse, r2, evs

    def _multiclass_details(self, data):
        y_true = data["y_true"]
        y_pred = data["y_pred"]
        # precision
        p_micro = precision_score(y_true, y_pred, average="micro")
        p_macro = precision_score(y_true, y_pred, average="macro")
        p_weighted = precision_score(y_true, y_pred, average="weighted")
        # recall
        r_micro = recall_score(y_true, y_pred, average="micro")
        r_macro = recall_score(y_true, y_pred, average="macro")
        r_weighted = recall_score(y_true, y_pred, average="weighted")
        # f1-score
        f_micro = f1_score(y_true, y_pred, average="micro")
        f_macro = f1_score(y_true, y_pred, average="macro")
        f_weighted = f1_score(y_true, y_pred, average="weighted")

        # classification report for features
        if self.mapping:
            classes = sorted(self.mapping, key=self.mapping.get)
        else:
            classes = np.arange(self._N_classes)
        p, r, f, s = precision_recall_fscore_support(y_true, y_pred)
        cls_report = pd.DataFrame(
            {
                "Class name": classes,
                "Precision": p,
                "Recall": r,
                "F1-score": f,
                "Support": s,
            }
        )
        self._inference_content["classification_report"] = cls_report.to_html(
            index=False, float_format="{:.4f}".format, justify="left"
        )

        plot_confusion_matrix(
            data,
            path=os.path.join(self.output_path, self._inference_content["confusion_matrix"]),
        )

        return [
            p_micro,
            p_macro,
            p_weighted,
            r_micro,
            r_macro,
            r_weighted,
            f_micro,
            f_macro,
            f_weighted,
        ]

    def _collect_data(self, preds, sample):
        data = pd.DataFrame({"y_true": sample[self._target].values})
        if self.task in "multiclass":
            if self.mapping is not None:
                data["y_true"] = np.array([self.mapping[y] for y in data["y_true"].values])
            data["y_pred"] = preds._data.argmax(axis=1)
            data = data[~np.isnan(preds._data).any(axis=1)]
        else:
            data["y_pred"] = preds._data[:, 0]
            data.sort_values("y_pred", ascending=False, inplace=True)
            data["bin"] = (np.arange(data.shape[0]) / data.shape[0] * self.n_bins).astype(int)
        data = data[~data["y_pred"].isnull()]
        return data

    def fit_predict(self, *args, **kwargs):
        """Wrapped ``automl.fit_predict`` method.

        Valid args, kwargs are the same as wrapped automl.

        Args:
            *args: Arguments.
            **kwargs: Additional parameters.

        Returns:
            OOF predictions.

        """
        # TODO: parameters parsing in general case

        preds = self._model.fit_predict(*args, **kwargs)

        train_data = kwargs["train_data"] if "train_data" in kwargs else args[0]
        input_roles = kwargs["roles"] if "roles" in kwargs else args[1]
        self._target = input_roles["target"]
        valid_data = kwargs.get("valid_data", None)
        if valid_data is None:
            data = self._collect_data(preds, train_data)
        else:
            data = self._collect_data(preds, valid_data)

        self._inference_content = {}
        if self.task == "binary":
            # filling for html
            self._inference_content = {}
            self._inference_content["roc_curve"] = "valid_roc_curve.png"
            self._inference_content["pr_curve"] = "valid_pr_curve.png"
            self._inference_content["pie_f1_metric"] = "valid_pie_f1_metric.png"
            self._inference_content["preds_distribution_by_bins"] = "valid_preds_distribution_by_bins.png"
            self._inference_content["distribution_of_logits"] = "valid_distribution_of_logits.png"
            # graphics and metrics
            _, self._F1_thresh = f1_score_w_co(data)
            auc_score, prec, rec, F1 = self._binary_classification_details(data)
            # update model section
            evaluation_parameters = ["AUC-score", "Precision", "Recall", "F1-score"]
            self._model_summary = pd.DataFrame(
                {
                    "Evaluation parameter": evaluation_parameters,
                    "Validation sample": [auc_score, prec, rec, F1],
                }
            )
        elif self.task == "reg":
            # filling for html
            self._inference_content["target_distribution"] = "valid_target_distribution.png"
            self._inference_content["error_hist"] = "valid_error_hist.png"
            self._inference_content["scatter_plot"] = "valid_scatter_plot.png"
            # graphics and metrics
            mean_ae, median_ae, mse, r2, evs = self._regression_details(data)
            # model section
            evaluation_parameters = [
                "Mean absolute error",
                "Median absolute error",
                "Mean squared error",
                "R^2 (coefficient of determination)",
                "Explained variance",
            ]
            self._model_summary = pd.DataFrame(
                {
                    "Evaluation parameter": evaluation_parameters,
                    "Validation sample": [mean_ae, median_ae, mse, r2, evs],
                }
            )
        elif self.task == "multiclass":
            self._N_classes = len(train_data[self._target].drop_duplicates())
            self._inference_content["confusion_matrix"] = "valid_confusion_matrix.png"

            index_names = np.array([["Precision", "Recall", "F1-score"], ["micro", "macro", "weighted"]])
            index = pd.MultiIndex.from_product(index_names, names=["Evaluation metric", "Average"])

            summary = self._multiclass_details(data)
            self._model_summary = pd.DataFrame({"Validation sample": summary}, index=index)

        self._inference_content["title"] = "Results on validation sample"
        self._generate_model_section()

        # generate train data section
        self._train_data_overview = self._data_genenal_info(train_data)
        self._describe_roles(train_data)
        self._describe_dropped_features(train_data)
        self._generate_train_set_section()

        # generate fit_predict section
        self._generate_inference_section()

        # generate feature importance and interpretation sections
        self._generate_fi_section(valid_data)
        if self.interpretation:
            self._generate_interpretation_section(valid_data)

        self.generate_report()
        return preds

    def predict(self, *args, **kwargs):
        """Wrapped automl.predict method.

        Valid args, kwargs are the same as wrapped automl.

        Args:
            *args: arguments.
            **kwargs: additional parameters.

        Returns:
            predictions.

        """
        self._n_test_sample += 1
        # get predictions
        test_preds = self._model.predict(*args, **kwargs)

        test_data = kwargs["test"] if "test" in kwargs else args[0]
        data = self._collect_data(test_preds, test_data)

        if self.task == "binary":
            # filling for html
            self._inference_content = {}
            self._inference_content["roc_curve"] = "test_roc_curve_{}.png".format(self._n_test_sample)
            self._inference_content["pr_curve"] = "test_pr_curve_{}.png".format(self._n_test_sample)
            self._inference_content["pie_f1_metric"] = "test_pie_f1_metric_{}.png".format(self._n_test_sample)
            self._inference_content["bins_preds"] = "test_bins_preds_{}.png".format(self._n_test_sample)
            self._inference_content["preds_distribution_by_bins"] = "test_preds_distribution_by_bins_{}.png".format(
                self._n_test_sample
            )
            self._inference_content["distribution_of_logits"] = "test_distribution_of_logits_{}.png".format(
                self._n_test_sample
            )
            # graphics and metrics
            auc_score, prec, rec, F1 = self._binary_classification_details(data)

            if self._n_test_sample >= 2:
                self._model_summary["Test sample {}".format(self._n_test_sample)] = [
                    auc_score,
                    prec,
                    rec,
                    F1,
                ]
            else:
                self._model_summary["Test sample"] = [auc_score, prec, rec, F1]
        elif self.task == "reg":
            # filling for html
            self._inference_content = {}
            self._inference_content["target_distribution"] = "test_target_distribution_{}.png".format(
                self._n_test_sample
            )
            self._inference_content["error_hist"] = "test_error_hist_{}.png".format(self._n_test_sample)
            self._inference_content["scatter_plot"] = "test_scatter_plot_{}.png".format(self._n_test_sample)
            # graphics
            mean_ae, median_ae, mse, r2, evs = self._regression_details(data)
            # update model section
            if self._n_test_sample >= 2:
                self._model_summary["Test sample {}".format(self._n_test_sample)] = [
                    mean_ae,
                    median_ae,
                    mse,
                    r2,
                    evs,
                ]
            else:
                self._model_summary["Test sample"] = [mean_ae, median_ae, mse, r2, evs]
        elif self.task == "multiclass":
            self._inference_content["confusion_matrix"] = "test_confusion_matrix_{}.png".format(self._n_test_sample)
            test_summary = self._multiclass_details(data)
            if self._n_test_sample >= 2:
                self._model_summary["Test sample {}".format(self._n_test_sample)] = test_summary
            else:
                self._model_summary["Test sample"] = test_summary

        # layout depends on number of test samples
        if self._n_test_sample >= 2:
            self._inference_content["title"] = "Results on test sample {}".format(self._n_test_sample)
        else:
            self._inference_content["title"] = "Results on test sample"

        # update model section
        self._generate_model_section()

        # generate predict section
        self._generate_inference_section()

        self.generate_report()
        return test_preds

    def _generate_fi_section(self, valid_data):
        if (
            self.fi_params["method"] == "accurate"
            and valid_data is not None
            and valid_data.shape[0] > self.fi_params["n_sample"]
        ):
            valid_data = valid_data.sample(n=self.fi_params["n_sample"])
            print(
                "valid_data was sampled for feature importance calculation: n_sample = {}".format(
                    self.fi_params["n_sample"]
                )
            )

        if self.fi_params["method"] == "accurate" and valid_data is None:
            # raise ValueError("You must set valid_data with accurate feature importance method")
            self.fi_params["method"] = "fast"
            warnings.warn(
                "You must set valid_data with 'accurate' feature importance method. Changed to 'fast' automatically."
            )

        self.feat_imp = self._model.get_feature_scores(
            calc_method=self.fi_params["method"], data=valid_data, silent=False
        )
        if self.feat_imp is None:
            fi_path = None
        else:
            fi_path = "feature_importance.png"
            plot_feature_importance(self.feat_imp, path=os.path.join(self.output_path, fi_path))
        # add to _sections
        fi_content = {
            "fi_method": self.fi_params["method"],
            "feature_importance": fi_path,
        }
        env = Environment(loader=FileSystemLoader(searchpath=self.template_path))
        fi_section = env.get_template(self._fi_section_path).render(fi_content)
        self._sections["fi"] = fi_section

    def _generate_interpretation_content(self, test_data):
        self._interpretation_content = {}
        if test_data is None:
            self._interpretation_content["interpretation_top"] = None
            return
        if self.feat_imp is None:
            interpretation_feat_list = list(self._model.reader._roles.keys())[
                : self.interpretation_params["top_n_features"]
            ]
        else:
            interpretation_feat_list = self.feat_imp["Feature"].values[: self.interpretation_params["top_n_features"]]
        for feature_name in interpretation_feat_list:
            interpretaton_subsection = {}
            interpretaton_subsection["feature_name"] = feature_name
            interpretaton_subsection["feature_interpretation_plot"] = feature_name + "_interpretation.png"
            self._plot_pdp(
                test_data,
                feature_name,
                path=os.path.join(
                    self.output_path,
                    interpretaton_subsection["feature_interpretation_plot"],
                ),
            )
            env = Environment(loader=FileSystemLoader(searchpath=self.template_path))
            interpretation_subsection = env.get_template(self._interpretation_subsection_path).render(
                interpretaton_subsection
            )
            self._interpretation_top.append(interpretation_subsection)
            print(f"Interpretation info for {feature_name} appended")
        self._interpretation_content["interpretation_top"] = self._interpretation_top

    def _generate_interpretation_section(self, test_data):
        if test_data is not None and test_data.shape[0] > self.interpretation_params["n_sample"]:
            test_data = test_data.sample(n=self.interpretation_params["n_sample"])
        self._generate_interpretation_content(test_data)
        env = Environment(loader=FileSystemLoader(searchpath=self.template_path))
        interpretation_section = env.get_template(self._interpretation_section_path).render(
            self._interpretation_content
        )
        self._sections["interpretation"] = interpretation_section

    def _plot_pdp(self, test_data, feature_name, path):
        feature_role = self._model.reader._roles[feature_name].name
        # I. Count interpretation
        print("Calculating interpretation for {}:".format(feature_name))
        grid, ys, counts = self._model.get_individual_pdp(
            test_data=test_data,
            feature_name=feature_name,
            n_bins=self.interpretation_params["n_bins"],
            top_n_categories=self.interpretation_params["top_n_categories"],
            datetime_level=self.interpretation_params["datetime_level"],
        )
        # II. Plot pdp
        sns.set(style="whitegrid", font_scale=1.5)
        fig, axs = plt.subplots(2, 1, figsize=(16, 12), gridspec_kw={"height_ratios": [3, 1]})
        axs[0].set_title("PDP plot: " + feature_name)
        n_classes = ys[0].shape[1]
        if n_classes == 1:
            data = pd.concat(
                [pd.DataFrame({"x": grid[i], "y": ys[i].ravel()}) for i, _ in enumerate(grid)]
            ).reset_index(drop=True)
            if feature_role in ["Numeric", "Datetime"]:
                g0 = sns.lineplot(data=data, x="x", y="y", ax=axs[0], color="m")
            else:
                g0 = sns.boxplot(data=data, x="x", y="y", ax=axs[0], showfliers=False, color="m")
        else:
            if self.mapping:
                classes = sorted(self.mapping, key=self.mapping.get)[: self.interpretation_params["top_n_classes"]]
            else:
                classes = np.arange(min(n_classes, self.interpretation_params["top_n_classes"]))
            data = pd.concat(
                [
                    pd.DataFrame({"x": grid[i], "y": ys[i][:, k], "class": name})
                    for i, _ in enumerate(grid)
                    for k, name in enumerate(classes)
                ]
            ).reset_index(drop=True)
            if self._model.reader._roles[feature_name].name in ["Numeric", "Datetime"]:
                g0 = sns.lineplot(data=data, x="x", y="y", hue="class", ax=axs[0])
            else:
                g0 = sns.boxplot(data=data, x="x", y="y", hue="class", ax=axs[0], showfliers=False)
        g0.set(ylabel="y_pred")
        # III. Plot distribution
        counts = np.array(counts) / sum(counts)
        if feature_role == "Numeric":
            g0.set(xlabel="feature value")
            g1 = sns.histplot(test_data[feature_name], kde=True, color="gray", ax=axs[1])
        elif feature_role == "Category":
            g0.set(xlabel=None)
            axs[0].set_xticklabels(grid, rotation=90)
            g1 = sns.barplot(x=grid, y=counts, ax=axs[1], color="gray")
        else:
            g0.set(xlabel=self.interpretation_params["datetime_level"])
            g1 = sns.barplot(x=grid, y=counts, ax=axs[1], color="gray")
        g1.set(xlabel=None)
        g1.set(ylabel="Frequency")
        g1.set(xticklabels=[])
        # IV. Save picture
        plt.tight_layout()
        fig.savefig(path, bbox_inches="tight")
        plt.close()

    def _data_genenal_info(self, data):
        general_info = pd.DataFrame(columns=["Parameter", "Value"])
        general_info.loc[0] = ("Number of records", data.shape[0])
        general_info.loc[1] = ("Total number of features", data.shape[1])
        general_info.loc[2] = ("Used features", len(self._model.reader._used_features))
        general_info.loc[3] = (
            "Dropped features",
            len(self._model.reader._dropped_features),
        )
        # general_info.loc[4] = ("Number of positive cases", np.sum(data[self._target] == 1))
        # general_info.loc[5] = ("Number of negative cases", np.sum(data[self._target] == 0))
        return general_info.to_html(index=False, justify="left")

    def _describe_roles(self, train_data):
        # detect feature roles
        roles = self._model.reader._roles
        numerical_features = [feat_name for feat_name in roles if roles[feat_name].name == "Numeric"]
        categorical_features = [feat_name for feat_name in roles if roles[feat_name].name == "Category"]
        datetime_features = [feat_name for feat_name in roles if roles[feat_name].name == "Datetime"]
        text_features = [feat_name for feat_name in roles if roles[feat_name].name == "Text"]

        # numerical roles
        numerical_features_df = []
        for feature_name in numerical_features:
            item = {"Feature name": feature_name}
            item["NaN ratio"] = "{:.4f}".format(train_data[feature_name].isna().sum() / train_data.shape[0])
            values = train_data[feature_name].dropna().values
            item["min"] = np.min(values)
            item["quantile_25"] = np.quantile(values, 0.25)
            item["average"] = np.mean(values)
            item["median"] = np.median(values)
            item["quantile_75"] = np.quantile(values, 0.75)
            item["max"] = np.max(values)
            numerical_features_df.append(item)
        self._numerical_features_table = list2table(numerical_features_df, {"float_format": "{:.2f}".format})

        # categorical roles
        categorical_features_df = []
        for feature_name in categorical_features:
            item = {"Feature name": feature_name}
            item["NaN ratio"] = "{:.4f}".format(train_data[feature_name].isna().sum() / train_data.shape[0])
            value_counts = train_data[feature_name].value_counts(normalize=True)
            values = value_counts.index.values
            counts = value_counts.values
            item["Number of unique values"] = len(counts)
            item["Most frequent value"] = values[0]
            item["Occurrence of most frequent"] = "{:.1f}%".format(100 * counts[0])
            item["Least frequent value"] = values[-1]
            item["Occurrence of least frequent"] = "{:.1f}%".format(100 * counts[-1])
            categorical_features_df.append(item)
        self._categorical_features_table = list2table(categorical_features_df)

        # datetime roles
        datetime_features_df = []
        for feature_name in datetime_features:
            item = {"Feature name": feature_name}
            item["NaN ratio"] = "{:.4f}".format(train_data[feature_name].isna().sum() / train_data.shape[0])
            values = train_data[feature_name].dropna().values
            item["min"] = np.min(values)
            item["max"] = np.max(values)
            item["base_date"] = self._model.reader._roles[feature_name].base_date
            datetime_features_df.append(item)
        self._datetime_features_table = list2table(datetime_features_df)

        # text roles
        text_features_df = []
        for feature_name in text_features:
            item = {"Feature name": feature_name}
            feature_length = train_data[feature_name].str.len()
            item["Amount of empty records"] = (feature_length == 0).sum(axis=0)
            item["Length of the shortest sentence"] = feature_length.min()
            item["Length of the longest sentence"] = feature_length.max()
            text_features_df.append(item)
        self._text_features_table = list2table(text_features_df)

    def _describe_dropped_features(self, train_data):
        self._max_nan_rate = self._model.reader.max_nan_rate
        self._max_constant_rate = self._model.reader.max_constant_rate
        self._features_dropped_list = self._model.reader._dropped_features
        # dropped features table
        dropped_list = [col for col in self._features_dropped_list if col != self._target]
        if dropped_list == []:
            self._dropped_features_table = None
        else:
            dropped_nan_ratio = train_data[dropped_list].isna().sum() / train_data.shape[0]
            dropped_most_occured = pd.Series(np.nan, index=dropped_list)
            for col in dropped_list:
                col_most_occured = train_data[col].value_counts(normalize=True).values
                if len(col_most_occured) > 0:
                    dropped_most_occured[col] = col_most_occured[0]
            dropped_features_table = pd.DataFrame(
                {"nan_rate": dropped_nan_ratio, "constant_rate": dropped_most_occured}
            )
            self._dropped_features_table = (
                dropped_features_table.reset_index()
                .rename(columns={"index": "Variable name"})
                .to_html(index=False, justify="left")
            )

    def _generate_model_section(self):
        model_summary = None
        if self._model_summary is not None:
            model_summary = self._model_summary.to_html(
                index=self.task == "multiclass",
                justify="left",
                float_format="{:.4f}".format,
            )

        env = Environment(loader=FileSystemLoader(searchpath=self.template_path))
        model_section = env.get_template(self._model_section_path).render(
            model_name=self._model_name,
            model_parameters=self._model_parameters,
            model_summary=model_summary,
        )
        self._sections["model"] = model_section

    def _generate_train_set_section(self):
        env = Environment(loader=FileSystemLoader(searchpath=self.template_path))
        train_set_section = env.get_template(self._train_set_section_path).render(
            train_data_overview=self._train_data_overview,
            numerical_features_table=self._numerical_features_table,
            categorical_features_table=self._categorical_features_table,
            datetime_features_table=self._datetime_features_table,
            text_features_table=self._text_features_table,
            target=self._target,
            max_nan_rate=self._max_nan_rate,
            max_constant_rate=self._max_constant_rate,
            dropped_features_table=self._dropped_features_table,
        )
        self._sections["train_set"] = train_set_section

    def _generate_inference_section(self):
        env = Environment(loader=FileSystemLoader(searchpath=self.template_path))
        inference_section = env.get_template(self._inference_section_path[self.task]).render(self._inference_content)
        self._model_results.append(inference_section)

    def _generate_results_section(self):
        if self._model_results:
            env = Environment(loader=FileSystemLoader(searchpath=self.template_path))
            results_section = env.get_template(self._results_section_path).render(model_results=self._model_results)
            self._sections["results"] = results_section

    def generate_report(self):
        # collection sections
        self._generate_results_section()
        sections_list = []
        for sec_name in self.sections_order:
            if sec_name in self._sections:
                sections_list.append(self._sections[sec_name])
        # put sections inside
        env = Environment(loader=FileSystemLoader(searchpath=self.template_path))
        report = env.get_template(self._base_template_path).render(
            title=self.title, sections=sections_list, pdf=self.pdf_file_name
        )
        with open(os.path.join(self.output_path, self.report_file_name), "w", encoding="utf-8") as f:
            f.write(report)

        if self.pdf_file_name:
            try:
                from weasyprint import HTML

                HTML(string=report, base_url=self.output_path).write_pdf(
                    os.path.join(self.output_path, self.pdf_file_name)
                )
            except ModuleNotFoundError:
                print("Can't generate PDF report: check manual for installing pdf extras.")
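

# Illustrative sketch (not part of the original module): typical wiring of ReportDeco around an
# existing AutoML instance; ``automl``, ``train`` and ``test`` are hypothetical objects and the
# target column name is an assumption.
def _example_report_deco_usage(automl, train, test):
    rd = ReportDeco(
        output_path="lama_report/",
        report_file_name="lama_interactive_report.html",
        interpretation=True,                                   # adds the PDP-based section
        fi_params={"method": "accurate", "n_sample": 50_000},  # overrides the "fast" default
        interpretation_params={"top_n_features": 3},
    )
    report_automl = rd(automl)
    oof = report_automl.fit_predict(train, roles={"target": "TARGET"}, valid_data=test)
    preds = report_automl.predict(test)
    return oof, preds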


_default_wb_report_params = {
    "automl_date_column": "",
    "report_name": "autowoe_report.html",
    "report_version_id": 1,
    "city": "",
    "model_aim": "",
    "model_name": "",
    "zakazchik": "",
    "high_level_department": "",
    "ds_name": "",
    "target_descr": "",
    "non_target_descr": "",
}


class ReportDecoWhitebox(ReportDeco):
    """Special report wrapper for :class:`~lightautoml.automl.presets.whitebox_presets.WhiteBoxPreset`.

    Usage case is the same as the main :class:`~lightautoml.report.report_deco.ReportDeco` class.
    It generates the same report as :class:`~lightautoml.report.report_deco.ReportDeco`,
    but with an additional whitebox report part.

    Difference:

        - ``report_automl.predict`` gets an additional ``report`` argument that controls updating of the whitebox report part.
          Calling ``report_automl.predict(test_data, report=True)`` will update the test part of the whitebox report.
          Calling ``report_automl.predict(test_data, report=False)`` will extend the general report with new data
          and keep the whitebox part as is (much faster).
        - :class:`~lightautoml.automl.presets.whitebox_presets.WhiteBoxPreset` should be created with parameter
          ``general_params={"report": True}`` to get the whitebox report part.
          If ``general_params`` is set to ``{"report": False}``, only the standard ReportDeco part will be created (much faster).

    """

    @property
    def model(self):
        """Get unwrapped WhiteBox.

        Returns:
            model.

        """
        # this is made to remove heavy whitebox inner report deco
        model = copy(self._model)
        try:
            model_wo_report = model.whitebox.model
        except AttributeError:
            return self._model

        pipe = copy(self._model.levels[0][0])
        ml_algo = copy(pipe.ml_algos[0])

        ml_algo.models = [model_wo_report]
        pipe.ml_algos = [ml_algo]

        model.levels = [[pipe]]

        return model

    @property
    def content(self):
        return self._model.whitebox._ReportDeco__stat

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        self.wb_report_params = copy(_default_wb_report_params)

        # self.wb_report_params = wb_report_params
        self.wb_report_params["output_path"] = self.output_path
        self._whitebox_section_path = "whitebox_section.html"
        self.sections_order.append("whitebox")

    def fit_predict(self, *args, **kwargs):
        """Wrapped :meth:`AutoML.fit_predict` method.

        Valid args, kwargs are the same as wrapped automl.

        Args:
            *args: Arguments.
            **kwargs: Additional parameters.

        Returns:
            OOF predictions.

        """
        predict_proba = super().fit_predict(*args, **kwargs)

        if self._model.general_params["report"]:
            self._generate_whitebox_section()
        else:
            logger.info2("Whitebox part is not created. Fit WhiteBox with general_params['report'] = True")

        self.generate_report()
        return predict_proba

    def predict(self, *args, **kwargs):
        """Wrapped :meth:`AutoML.predict` method.

        Valid args, kwargs are the same as wrapped automl.

        Args:
            *args: Arguments.
            **kwargs: Additional parameters.

        Returns:
            Predictions.

        """
        if len(args) >= 2:
            args = (args[0],)

        kwargs["report"] = self._model.general_params["report"]

        predict_proba = super().predict(*args, **kwargs)

        if self._model.general_params["report"]:
            self._generate_whitebox_section()
        else:
            logger.info2("Whitebox part is not created. Fit WhiteBox with general_params['report'] = True")

        self.generate_report()
        return predict_proba

    def _generate_whitebox_section(self):
        self._model.whitebox.generate_report(self.wb_report_params)
        content = self.content.copy()

        if self._n_test_sample >= 2:
            content["n_test_sample"] = self._n_test_sample
        content["model_coef"] = pd.DataFrame(content["model_coef"], columns=["Feature name", "Coefficient"]).to_html(
            index=False
        )
        content["p_vals"] = pd.DataFrame(content["p_vals"], columns=["Feature name", "P-value"]).to_html(index=False)
        content["p_vals_test"] = pd.DataFrame(content["p_vals_test"], columns=["Feature name", "P-value"]).to_html(
            index=False
        )
        content["train_vif"] = pd.DataFrame(content["train_vif"], columns=["Feature name", "VIF value"]).to_html(
            index=False
        )
        content["psi_total"] = pd.DataFrame(content["psi_total"], columns=["Feature name", "PSI value"]).to_html(
            index=False
        )
        content["psi_zeros"] = pd.DataFrame(content["psi_zeros"], columns=["Feature name", "PSI value"]).to_html(
            index=False
        )
        content["psi_ones"] = pd.DataFrame(content["psi_ones"], columns=["Feature name", "PSI value"]).to_html(
            index=False
        )

        env = Environment(loader=FileSystemLoader(searchpath=self.template_path))
        self._sections["whitebox"] = env.get_template(self._whitebox_section_path).render(content)


def plot_data_hist(data, title="title", bins=100, path=None):
    sns.set(style="whitegrid", font_scale=1.5)
    fig, axs = plt.subplots(figsize=(16, 10))
    sns.distplot(data, bins=bins, color="m", ax=axs)
    axs.set_title(title)
    fig.savefig(path, bbox_inches="tight")
    plt.close()


class ReportDecoNLP(ReportDeco):
    """Special report wrapper for :class:`~lightautoml.automl.presets.text_presets.TabularNLPAutoML`.

    Usage case is the same as the main :class:`~lightautoml.report.report_deco.ReportDeco` class.
    It generates the same report as :class:`~lightautoml.report.report_deco.ReportDeco`,
    but with an additional NLP report part.

    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._nlp_section_path = "nlp_section.html"
        self._nlp_subsection_path = "nlp_subsection.html"
        self._nlp_subsections = []
        self.sections_order.append("nlp")

    def __call__(self, model):
        self._model = model

        # add information to report
        self._model_name = model.__class__.__name__
        self._model_parameters = json2html.convert(extract_params(model))
        self._model_summary = None

        self._sections = {}
        self._sections["intro"] = "<p>This report was generated automatically.</p>"
        self._model_results = []
        self._n_test_sample = 0

        self._generate_model_section()
        self.generate_report()
        return self

    def fit_predict(self, *args, **kwargs):
        """Wrapped :meth:`TabularNLPAutoML.fit_predict` method.

        Valid args, kwargs are the same as wrapped automl.

        Args:
            *args: Arguments.
            **kwargs: Additional parameters.

        Returns:
            OOF predictions.

        """
        preds = super().fit_predict(*args, **kwargs)

        train_data = kwargs["train_data"] if "train_data" in kwargs else args[0]
        roles = kwargs["roles"] if "roles" in kwargs else args[1]

        self._text_fields = self._get_text_fields(roles)
        train_data[self._text_fields] = train_data[self._text_fields].fillna("")

        for text_field in self._text_fields:
            content = {}
            content["title"] = "Text field: " + text_field
            content["char_len_hist"] = text_field + "_char_len_hist.png"
            plot_data_hist(
                data=train_data[text_field].apply(len).values,
                path=os.path.join(self.output_path, content["char_len_hist"]),
                title="Length in char",
            )
            content["tokens_len_hist"] = text_field + "_tokens_len_hist.png"
            plot_data_hist(
                data=train_data[text_field].str.split(" ").apply(len).values,
                path=os.path.join(self.output_path, content["tokens_len_hist"]),
                title="Length in tokens",
            )
            self._generate_nlp_subsection(content)

        # Concatenated text fields
        if len(self._text_fields) >= 2:
            all_fields = train_data[self._text_fields].agg(" ".join, axis=1)
            content = {}
            content["title"] = "Concatenated text fields"
            content["char_len_hist"] = "concat_char_len_hist.png"
            plot_data_hist(
                data=all_fields.apply(len).values,
                path=os.path.join(self.output_path, content["char_len_hist"]),
                title="Length in char",
            )
            content["tokens_len_hist"] = "concat_tokens_len_hist.png"
            plot_data_hist(
                data=all_fields.str.split(" ").apply(len).values,
                path=os.path.join(self.output_path, content["tokens_len_hist"]),
                title="Length in tokens",
            )
            self._generate_nlp_subsection(content)

        self._generate_nlp_section()
        self.generate_report()
        return preds

    def _generate_nlp_subsection(self, content):
        # content has the following fields:
        # title: subsection title
        # char_len_hist: path to histogram of text length (number of chars)
        # tokens_len_hist: path to histogram of text length (number of tokens)
        env = Environment(loader=FileSystemLoader(searchpath=self.template_path))
        nlp_subsection = env.get_template(self._nlp_subsection_path).render(content)
        self._nlp_subsections.append(nlp_subsection)

    def _generate_nlp_section(self):
        if self._model_results:
            env = Environment(loader=FileSystemLoader(searchpath=self.template_path))
            nlp_section = env.get_template(self._nlp_section_path).render(nlp_subsections=self._nlp_subsections)
            self._sections["nlp"] = nlp_section

    @staticmethod
    def _get_text_fields(roles: dict) -> list:
        """Returns all text fields, mentioned in roles.

        Args:
            roles: Roles.

        Returns:
            List of text fields.

        """
        text_roles = roles.get("text", [])
        for role_type, role_name in roles.items():
            if isinstance(role_type, laml_roles.TextRole):
                if isinstance(role_name, str):
                    text_roles.append(role_name)
                elif isinstance(role_name, list):
                    text_roles.extend(role_name)
        return text_roles


def get_uplift_data(test_target, uplift_pred, test_treatment, mode):
    try:
        perfect = uplift_metrics.perfect_uplift_curve(test_target, test_treatment)
        xs_perfect, ys_perfect = uplift_metrics.calculate_graphic_uplift_curve(
            test_target, perfect, test_treatment, mode
        )
    except NotImplementedError as e:  # noqa: F841
        xs_perfect, ys_perfect = None, None
    xs, ys = uplift_metrics.calculate_graphic_uplift_curve(test_target, uplift_pred, test_treatment, mode)
    normed = type_of_target(test_target) == "binary"
    uplift_auc = uplift_metrics.calculate_uplift_auc(test_target, uplift_pred, test_treatment, mode, normed=normed)
    return xs, ys, xs_perfect, ys_perfect, uplift_auc


def plot_uplift_curve(test_target, uplift_pred, test_treatment, path):
    sns.set(style="whitegrid", font_scale=1.5)
    # plt.figure(figsize=(10, 10));
    fig, axs = plt.subplots(3, 1, figsize=(10, 30))

    # qini
    xs, ys, xs_perfect, ys_perfect, uplift_auc = get_uplift_data(test_target, uplift_pred, test_treatment, mode="qini")
    axs[0].plot(xs, ys, color="blue", lw=2, label="qini mode")
    if xs_perfect is not None and ys_perfect is not None:
        axs[0].plot(xs_perfect, ys_perfect, color="black", lw=1, label="perfect uplift")
    axs[0].plot(
        (0, xs[-1]),
        (0, ys[-1]),
        color="black",
        lw=1,
        linestyle="--",
        label="random model",
    )
    axs[0].set_title("Uplift qini, AUC={:.3f}".format(uplift_auc))
    axs[0].legend(loc="lower right")

    # cum_gain
    xs, ys, xs_perfect, ys_perfect, uplift_auc = get_uplift_data(
        test_target, uplift_pred, test_treatment, mode="cum_gain"
    )
    axs[1].plot(xs, ys, color="red", lw=2, label="cum_gain model")
    if xs_perfect is not None and ys_perfect is not None:
        axs[1].plot(xs_perfect, ys_perfect, color="black", lw=1, label="perfect uplift")
    axs[1].plot(
        (0, xs[-1]),
        (0, ys[-1]),
        color="black",
        lw=1,
        linestyle="--",
        label="random model",
    )
    axs[1].set_title("Uplift cum_gain, AUC={:.3f}".format(uplift_auc))
    axs[1].legend(loc="lower right")

    # adj_qini
    xs, ys, xs_perfect, ys_perfect, uplift_auc = get_uplift_data(
        test_target, uplift_pred, test_treatment, mode="adj_qini"
    )
    axs[2].plot(xs, ys, color="green", lw=2, label="adj_qini mode")
    if xs_perfect is not None and ys_perfect is not None:
        axs[2].plot(xs_perfect, ys_perfect, color="black", lw=1, label="perfect uplift")
    axs[2].plot(
        (0, xs[-1]),
        (0, ys[-1]),
        color="black",
        lw=1,
        linestyle="--",
        label="random model",
    )
    axs[2].set_title("Uplift adj_qini, AUC={:.3f}".format(uplift_auc))
    axs[2].legend(loc="lower right")

    plt.savefig(path, bbox_inches="tight")
    plt.close()


class ReportDecoUplift(ReportDeco):
    _available_metalearners = (TLearner, XLearner)

    @property
    def reader(self):
        if self._is_xlearner:
            return self._model.learners["outcome"]["treatment"].reader  # effect
        else:
            return self._model.treatment_learner.reader

    @property
    def task(self):
        if self._is_xlearner:
            return "reg"
        else:
            return self.reader.task._name

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._uplift_section_path = "uplift_section.html"
        self._uplift_subsection_path = "uplift_subsection.html"
        self.sections_order.append("uplift")
        self._uplift_results = []

    def __call__(self, model):
        self._model = model
        self._is_xlearner = isinstance(model, XLearner)

        # add information to report
        self._model_name = model.__class__.__name__
        self._model_parameters = json2html.convert(extract_params(model))
        self._model_summary = None

        self._sections = {}
        self._sections["intro"] = "<p>This report was generated automatically.</p>"
        self._model_results = []
        self._n_test_sample = 0

        self._generate_model_section()
        self.generate_report()
        return self

    def fit(self, *args, **kwargs):
        """Wrapped automl.fit_predict method.

        Valid args, kwargs are the same as wrapped automl.

        Args:
            *args: arguments.
            **kwargs: additional parameters.

        """
        train_data = kwargs["train_data"] if "train_data" in kwargs else args[0]
        input_roles = kwargs["roles"] if "roles" in kwargs else args[1]
        self._target = input_roles["target"]
        self._treatment_col = input_roles["treatment"]

        if self._is_xlearner:
            self._fit_xlearner(train_data, input_roles)
        else:
            self._fit_tlearner(train_data, input_roles)
        self._model._is_fitted = True

        self._generate_model_section()

        self._train_data_overview = self._data_general_info(train_data, "train")
        self._describe_roles(train_data)
        self._describe_dropped_features(train_data)
        self._generate_train_set_section()

        self.generate_report()

    def predict(self, test_data):
        """Wrapped tlearner.predict method.

        Valid args, kwargs are the same as wrapped automl.

        Args:
            test_data: Dataset to perform inference.

        Returns:
            predictions.

        """
        self._n_test_sample += 1
        # get predictions
        test_target = test_data[self._target].values
        test_treatment = test_data[self._treatment_col].values
        # test_data = test_data.drop([self._target, self._treatment_col], axis=1)
        uplift, treatment_preds, control_preds = self._model.predict(test_data)

        if self._n_test_sample >= 2:
            treatment_title = "Treatment test {}".format(self._n_test_sample)
            control_title = "Control test {}".format(self._n_test_sample)
        else:
            treatment_title = "Treatment test"
            control_title = "Control test"

        # treatment data
        data = pd.DataFrame({"y_true": test_target[test_treatment == 1]})
        data["y_pred"] = treatment_preds[test_treatment == 1]
        data.sort_values("y_pred", ascending=False, inplace=True)
        data["bin"] = (np.arange(data.shape[0]) / data.shape[0] * self.n_bins).astype(int)
        data = data[~data["y_pred"].isnull()]
        self._generate_test_subsection(data, "treatment", treatment_title)
        self._generate_inference_section()

        # control data
        data = pd.DataFrame({"y_true": test_target[test_treatment == 0]})
        data["y_pred"] = control_preds[test_treatment == 0]
        data.sort_values("y_pred", ascending=False, inplace=True)
        data["bin"] = (np.arange(data.shape[0]) / data.shape[0] * self.n_bins).astype(int)
        data = data[~data["y_pred"].isnull()]
        self._generate_test_subsection(data, "control", control_title)
        self._generate_inference_section()

        # update model section
        self._generate_model_section()

        # uplift section
        self._uplift_content = {}
        if self._n_test_sample >= 2:
            self._uplift_content["title"] = "Test sample {}".format(self._n_test_sample)
            self._uplift_content["uplift_curve"] = "uplift_curve_{}.png".format(self._n_test_sample)
            self._uplift_content["uplift_distribution"] = "uplift_distribution_{}.png".format(self._n_test_sample)
        else:
            self._uplift_content["title"] = "Test sample"
            self._uplift_content["uplift_curve"] = "uplift_curve.png"
            self._uplift_content["uplift_distribution"] = "uplift_distribution.png"
        plot_uplift_curve(
            test_target,
            uplift,
            test_treatment,
            path=os.path.join(self.output_path, self._uplift_content["uplift_curve"]),
        )
        self._uplift_distribution(
            test_target,
            uplift,
            test_treatment,
            path=os.path.join(self.output_path, self._uplift_content["uplift_distribution"]),
        )
        self._uplift_content["test_data_overview"] = self._data_general_info(test_data, "test")
        self._generate_uplift_subsection()
        self._generate_uplift_section()

        self.generate_report()
        return uplift, treatment_preds, control_preds

    def _uplift_distribution(self, test_target, uplift, test_treatment, path):
        data = pd.DataFrame({"y_true": test_target, "y_pred": uplift, "treatment": test_treatment})
        data.sort_values("y_pred", ascending=True, inplace=True)
        data["bin"] = (np.arange(data.shape[0]) / data.shape[0] * self.n_bins).astype(int)

        # 'Uplift fact'
        mean_target_treatment = (
            data[data["treatment"].values == 1].groupby("bin").agg({"y_true": [np.mean]}).values[:, 0]
        )
        mean_target_control = data[data["treatment"].values == 0].groupby("bin").agg({"y_true": [np.mean]}).values[:, 0]
        uplift_fact = mean_target_treatment - mean_target_control

        bins_table = data.groupby("bin").agg({"y_true": [len], "y_pred": [np.min, np.mean, np.max]}).reset_index()
        bins_table.columns = [
            "Bin number",
            "Amount of objects",
            "Min uplift",
            "Mean uplift",
            "Max uplift",
        ]
        bins_table["Uplift fact"] = uplift_fact
        self._uplift_content["uplift_bins_table"] = bins_table.to_html(index=False)

        # uplift kde distribution
        sns.set(style="whitegrid", font_scale=1.5)
        fig, axs = plt.subplots(figsize=(16, 10))
        sns.kdeplot(data["y_pred"], shade=True, color="g", label="y_pred", ax=axs)
        axs.set_xlabel("Uplift value")
        axs.set_ylabel("Density")
        axs.set_title("Uplift distribution")
        fig.savefig(path, bbox_inches="tight")
        plt.close()

    def _fit_tlearner(self, train_data, roles):
        treatment_role, _ = _get_treatment_role(roles)
        new_roles = deepcopy(roles)
        new_roles.pop(treatment_role)

        self._model._timer._timeout = 1e10
        self._model._timer.start()

        # treatment
        treatment_train_data = train_data[train_data[self._treatment_col] == 1]
        treatment_target = treatment_train_data[self._target].values
        treatment_train_data.drop(self._treatment_col, axis=1, inplace=True)
        treatment_preds = self._model.treatment_learner.fit_predict(treatment_train_data, new_roles)

        # control
        control_train_data = train_data[train_data[self._treatment_col] == 0]
        control_target = control_train_data[self._target].values
        control_train_data.drop(self._treatment_col, axis=1, inplace=True)
        control_preds = self._model.control_learner.fit_predict(control_train_data, new_roles)

        self._generate_fit_section(treatment_preds, control_preds, treatment_target, control_target)

    def _fit_xlearner(self, train_data, roles):
        treatment_role, _ = _get_treatment_role(roles)
        new_roles = deepcopy(roles)
        new_roles.pop(treatment_role)

        self._model._timer._timeout = 1e10
        self._model._timer.start()

        self._model._fit_propensity_learner(train_data, roles)
        self._model._fit_outcome_learners(train_data, roles)

        # treatment
        treatment_train_data = train_data[train_data[self._treatment_col] == 1]
        treatment_train_data.drop(self._treatment_col, axis=1, inplace=True)
        outcome_pred = self._model.learners["outcome"]["control"].predict(treatment_train_data).data.ravel()
        treatment_train_data[self._target] = treatment_train_data[self._target] - outcome_pred
        treatment_target = treatment_train_data[self._target].values
        treatment_preds = self._model.learners["effect"]["treatment"].fit_predict(treatment_train_data, new_roles)

        # control
        control_train_data = train_data[train_data[self._treatment_col] == 0]
        control_train_data.drop(self._treatment_col, axis=1, inplace=True)
        outcome_pred = self._model.learners["outcome"]["treatment"].predict(control_train_data).data.ravel()
        control_train_data[self._target] = control_train_data[self._target] - outcome_pred
        control_train_data[self._target] *= -1
        control_target = control_train_data[self._target].values
        control_preds = self._model.learners["effect"]["control"].fit_predict(control_train_data, new_roles)

        self._generate_fit_section(treatment_preds, control_preds, treatment_target, control_target)

    def _generate_fit_section(self, treatment_preds, control_preds, treatment_target, control_target):
        self._generate_model_summary_table()
        # treatment model
        treatment_data = self._collect_data(treatment_preds, treatment_target)
        self._generate_training_subsection(treatment_data, "treatment", "Treatment train")
        self._generate_inference_section()
        # control model
        control_data = self._collect_data(control_preds, control_target)
        self._generate_training_subsection(control_data, "control", "Control train")
        self._generate_inference_section()

    def _collect_data(self, preds, target):
        data = pd.DataFrame({"y_true": target})
        if self.task in "multiclass":
            if self.mapping is not None:
                data["y_true"] = np.array([self.mapping[y] for y in data["y_true"].values])
            data["y_pred"] = preds._data.argmax(axis=1)
        else:
            data["y_pred"] = preds._data[:, 0]
            data.sort_values("y_pred", ascending=False, inplace=True)
            data["bin"] = (np.arange(data.shape[0]) / data.shape[0] * self.n_bins).astype(int)
        # remove NaN in predictions:
        data = data[~data["y_pred"].isnull()]
        return data

    def _generate_model_summary_table(self):
        if self.task == "binary":
            evaluation_parameters = ["AUC-score", "Precision", "Recall", "F1-score"]
            self._model_summary = pd.DataFrame({"Evaluation parameter": evaluation_parameters})
        elif self.task == "reg":
            evaluation_parameters = [
                "Mean absolute error",
                "Median absolute error",
                "Mean squared error",
                "R^2 (coefficient of determination)",
                "Explained variance",
            ]
            self._model_summary = pd.DataFrame({"Evaluation parameter": evaluation_parameters})

    def _generate_training_subsection(self, data, prefix, title):
        self._inference_content = {}
        self._inference_content["title"] = title
        if self.task == "binary":
            # filling for html
            self._inference_content["roc_curve"] = prefix + "_roc_curve.png"
            self._inference_content["pr_curve"] = prefix + "_pr_curve.png"
            self._inference_content["pie_f1_metric"] = prefix + "_pie_f1_metric.png"
            self._inference_content["preds_distribution_by_bins"] = prefix + "_preds_distribution_by_bins.png"
            self._inference_content["distribution_of_logits"] = prefix + "_distribution_of_logits.png"
            # graphics and metrics
            _, self._F1_thresh = f1_score_w_co(data)
            self._model_summary[title] = self._binary_classification_details(data)
        elif self.task == "reg":
            # filling for html
            self._inference_content["target_distribution"] = prefix + "_target_distribution.png"
            self._inference_content["error_hist"] = prefix + "_error_hist.png"
            self._inference_content["scatter_plot"] = prefix + "_scatter_plot.png"
            # graphics and metrics
            self._model_summary[title] = self._regression_details(data)

    def _generate_test_subsection(self, data, prefix, title):
        self._inference_content = {}
        self._inference_content["title"] = title
        if self.task == "binary":
            # filling for html
            self._inference_content["roc_curve"] = prefix + "_roc_curve_{}.png".format(self._n_test_sample)
            self._inference_content["pr_curve"] = prefix + "_pr_curve_{}.png".format(self._n_test_sample)
self._inference_content["pie_f1_metric"] = prefix + "_pie_f1_metric_{}.png".format(self._n_test_sample) self._inference_content["bins_preds"] = prefix + "_bins_preds_{}.png".format(self._n_test_sample) self._inference_content[ "preds_distribution_by_bins" ] = prefix + "_preds_distribution_by_bins_{}.png".format(self._n_test_sample) self._inference_content["distribution_of_logits"] = prefix + "_distribution_of_logits_{}.png".format( self._n_test_sample ) # graphics and metrics self._model_summary[title] = self._binary_classification_details(data) elif self.task == "reg": # filling for html self._inference_content["target_distribution"] = prefix + "_target_distribution_{}.png".format( self._n_test_sample ) self._inference_content["error_hist"] = prefix + "_error_hist_{}.png".format(self._n_test_sample) self._inference_content["scatter_plot"] = prefix + "_scatter_plot_{}.png".format(self._n_test_sample) # graphics self._model_summary[title] = self._regression_details(data) def _data_general_info(self, data, stage="train"): general_info = pd.DataFrame(columns=["Parameter", "Value"]) general_info.loc[0] = ("Number of records", data.shape[0]) general_info.loc[1] = ("Share of treatment", np.mean(data[self._treatment_col])) general_info.loc[2] = ("Mean target", np.mean(data[self._target])) general_info.loc[3] = ( "Mean target on treatment", np.mean(data[self._target][data[self._treatment_col] == 1]), ) general_info.loc[4] = ( "Mean target on control", np.mean(data[self._target][data[self._treatment_col] == 0]), ) if stage == "train": general_info.loc[5] = ("Total number of features", data.shape[1]) general_info.loc[6] = ("Used features", len(self.reader._used_features)) dropped_list = [col for col in self.reader._dropped_features if col != self._target] general_info.loc[7] = ("Dropped features", len(dropped_list)) return general_info.to_html(index=False, justify="left") def _describe_roles(self, train_data): # detect feature roles # roles = self._model.reader._roles roles = self.reader._roles numerical_features = [feat_name for feat_name in roles if roles[feat_name].name == "Numeric"] categorical_features = [feat_name for feat_name in roles if roles[feat_name].name == "Category"] datetime_features = [feat_name for feat_name in roles if roles[feat_name].name == "Datetime"] text_features = [feat_name for feat_name in roles if roles[feat_name].name == "Text"] # numerical roles numerical_features_df = [] for feature_name in numerical_features: item = {"Feature name": feature_name} item["NaN ratio"] = "{:.4f}".format(train_data[feature_name].isna().sum() / train_data.shape[0]) values = train_data[feature_name].dropna().values item["min"] = np.min(values) item["quantile_25"] = np.quantile(values, 0.25) item["average"] = np.mean(values) item["median"] = np.median(values) item["quantile_75"] = np.quantile(values, 0.75) item["max"] = np.max(values) numerical_features_df.append(item) if numerical_features_df == []: self._numerical_features_table = None else: self._numerical_features_table = pd.DataFrame(numerical_features_df).to_html( index=False, float_format="{:.2f}".format, justify="left" ) # categorical roles categorical_features_df = [] for feature_name in categorical_features: item = {"Feature name": feature_name} item["NaN ratio"] = "{:.4f}".format(train_data[feature_name].isna().sum() / train_data.shape[0]) value_counts = train_data[feature_name].value_counts(normalize=True) values = value_counts.index.values counts = value_counts.values item["Number of unique values"] = len(counts) item["Most 
frequent value"] = values[0] item["Occurance of most frequent"] = "{:.1f}%".format(100 * counts[0]) item["Least frequent value"] = values[-1] item["Occurance of least frequent"] = "{:.1f}%".format(100 * counts[-1]) categorical_features_df.append(item) if categorical_features_df == []: self._categorical_features_table = None else: self._categorical_features_table = pd.DataFrame(categorical_features_df).to_html( index=False, justify="left" ) # datetime roles datetime_features_df = [] for feature_name in datetime_features: item = {"Feature name": feature_name} item["NaN ratio"] = "{:.4f}".format(train_data[feature_name].isna().sum() / train_data.shape[0]) values = train_data[feature_name].dropna().values item["min"] = np.min(values) item["max"] = np.max(values) item["base_date"] = self.reader._roles[feature_name].base_date datetime_features_df.append(item) if datetime_features_df == []: self._datetime_features_table = None else: self._datetime_features_table = pd.DataFrame(datetime_features_df).to_html(index=False, justify="left") # text roles text_features_df = [] for feature_name in text_features: item = {"Feature name": feature_name} feature_length = train_data[feature_name].str.len() item["Amount of empty records"] = (feature_length == 0).sum(axis=0) item["Length of the shortest sentence"] = feature_length.min() item["Length of the longest sentence"] = feature_length.max() text_features_df.append(item) self._text_features_table = list2table(text_features_df) def _describe_dropped_features(self, train_data): self._max_nan_rate = self.reader.max_nan_rate self._max_constant_rate = self.reader.max_constant_rate self._features_dropped_list = self.reader._dropped_features # dropped features table dropped_list = [col for col in self._features_dropped_list if col != self._target] if dropped_list == []: self._dropped_features_table = None else: dropped_nan_ratio = train_data[dropped_list].isna().sum() / train_data.shape[0] dropped_most_occured = pd.Series(np.nan, index=dropped_list) for col in dropped_list: col_most_occured = train_data[col].value_counts(normalize=True).values if len(col_most_occured) > 0: dropped_most_occured[col] = col_most_occured[0] dropped_features_table = pd.DataFrame( {"nan_rate": dropped_nan_ratio, "constant_rate": dropped_most_occured} ) self._dropped_features_table = ( dropped_features_table.reset_index() .rename(columns={"index": "Название переменной"}) .to_html(index=False, justify="left") ) def _generate_train_set_section(self): env = Environment(loader=FileSystemLoader(searchpath=self.template_path)) train_set_section = env.get_template(self._train_set_section_path).render( train_data_overview=self._train_data_overview, numerical_features_table=self._numerical_features_table, categorical_features_table=self._categorical_features_table, datetime_features_table=self._datetime_features_table, target=self._target, max_nan_rate=self._max_nan_rate, max_constant_rate=self._max_constant_rate, dropped_features_table=self._dropped_features_table, ) self._sections["train_set"] = train_set_section def _generate_uplift_subsection(self): env = Environment(loader=FileSystemLoader(searchpath=self.template_path)) uplift_subsection = env.get_template(self._uplift_subsection_path).render(self._uplift_content) self._uplift_results.append(uplift_subsection) def _generate_uplift_section(self): if self._model_results: env = Environment(loader=FileSystemLoader(searchpath=self.template_path)) results_section = 
    def _generate_uplift_section(self):
        if self._model_results:
            env = Environment(loader=FileSystemLoader(searchpath=self.template_path))
            results_section = env.get_template(self._uplift_section_path).render(uplift_results=self._uplift_results)
            self._sections["uplift"] = results_section
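    # A minimal usage sketch for ReportDecoUplift (the data frames, column names
    # and TLearner constructor arguments are illustrative assumptions, not part
    # of this module):
    #
    #     from lightautoml.tasks import Task
    #     from lightautoml.addons.uplift.metalearners import TLearner
    #     from lightautoml.report.report_deco import ReportDecoUplift
    #
    #     RD = ReportDecoUplift(output_path="uplift_report")
    #     tlearner_rd = RD(TLearner(base_task=Task("binary")))
    #     tlearner_rd.fit_predict(train_df, roles={"target": "conversion", "treatment": "treatment_flag"})
    #     uplift, treatment_preds, control_preds = tlearner_rd.predict(test_df)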