"""Torch models."""
from collections import OrderedDict
from typing import List
from typing import Optional
from typing import Union
import numpy as np
import torch
import torch.nn as nn
from lightautoml.ml_algo.torch_based.node_nn_model import DenseODSTBlock, MeanPooling
class GaussianNoise(nn.Module):
"""Adds gaussian noise.
Args:
stddev: Std of noise.
device: Device to compute on.
"""
def __init__(self, stddev: float, device: torch.device):
super().__init__()
self.stddev = stddev
self.device = device
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""Forward-pass."""
if self.training:
return x + torch.randn(x.size(), device=self.device) * self.stddev
return x
class UniformNoise(nn.Module):
"""Add uniform noise.
Args:
stddev: Std of noise.
device: Device to compute on.
"""
    def __init__(self, stddev: float, device: torch.device):
super().__init__()
self.stddev = stddev
self.device = device
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""Forward-pass."""
if self.training:
return x + (torch.rand(x.size(), device=self.device) - 0.5) * self.stddev
return x
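# A minimal usage sketch (illustrative only, not part of the library API):
# both noise modules are active only in training mode and act as the identity
# at inference. A CPU device is assumed here for portability.
def _example_noise_modules():
    noise = GaussianNoise(stddev=0.05, device=torch.device("cpu"))
    x = torch.zeros(4, 8)
    noise.train()
    assert not torch.equal(noise(x), x)  # noise is added during training
    noise.eval()
    assert torch.equal(noise(x), x)  # identity at inference time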
class DenseLightBlock(nn.Module):
"""Realisation of `'denselight'` model block.
Args:
n_in: Input dim.
n_out: Output dim.
drop_rate: Dropout rate.
noise_std: Std of noise.
act_fun: Activation function.
use_bn: Use BatchNorm.
use_noise: Use noise.
device: Device to compute on.
"""
def __init__(
self,
n_in: int,
n_out: int,
drop_rate: float = 0.1,
noise_std: float = 0.05,
act_fun: nn.Module = nn.ReLU,
use_bn: bool = True,
use_noise: bool = False,
device: torch.device = torch.device("cuda:0"),
**kwargs,
):
super(DenseLightBlock, self).__init__()
self.features = nn.Sequential(OrderedDict([]))
if use_bn:
self.features.add_module("norm", nn.BatchNorm1d(n_in))
if drop_rate:
self.features.add_module("dropout", nn.Dropout(p=drop_rate))
if use_noise:
self.features.add_module("noise", GaussianNoise(noise_std, device))
self.features.add_module("dense", nn.Linear(n_in, n_out))
self.features.add_module("act", act_fun())
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""Forward-pass."""
for name, layer in self.features.named_children():
x = layer(x)
return x
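# Illustrative sketch of a single DenseLightBlock: BatchNorm -> Dropout ->
# Linear -> activation. The sizes below are assumptions, not library defaults.
def _example_dense_light_block():
    block = DenseLightBlock(n_in=16, n_out=32, device=torch.device("cpu"))
    out = block(torch.randn(8, 16))
    assert out.shape == (8, 32)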
class DenseLightModel(nn.Module):
"""Realisation of `'denselight'` model.
Args:
n_in: Input dim.
n_out: Output dim.
hidden_size: List of hidden dims.
        drop_rate: Dropout rate for each layer separately (list) or for all layers at once (float).
        act_fun: Activation function.
        noise_std: Std of noise.
        num_init_features: If not None, add an fc layer with this output dim before the model.
use_bn: Use BatchNorm.
use_noise: Use noise.
concat_input: Concatenate input to all hidden layers.
device: Device to compute on.
"""
def __init__(
self,
n_in: int,
n_out: int = 1,
        hidden_size: List[int] = [512, 750],
drop_rate: Union[float, List[float]] = 0.1,
act_fun: nn.Module = nn.ReLU,
noise_std: float = 0.05,
num_init_features: Optional[int] = None,
use_bn: bool = True,
use_noise: bool = False,
concat_input: bool = True,
device: torch.device = torch.device("cuda:0"),
**kwargs,
):
super(DenseLightModel, self).__init__()
if isinstance(drop_rate, float):
drop_rate = [drop_rate] * len(hidden_size)
        assert len(hidden_size) == len(drop_rate), "Number of hidden sizes and dropout rates must be equal."
self.concat_input = concat_input
num_features = n_in if num_init_features is None else num_init_features
self.features = nn.Sequential(OrderedDict([]))
if num_init_features is not None:
self.features.add_module("dense0", nn.Linear(n_in, num_features))
for i, hid_size in enumerate(hidden_size):
block = DenseLightBlock(
n_in=num_features,
n_out=hid_size,
drop_rate=drop_rate[i],
noise_std=noise_std,
act_fun=act_fun,
use_bn=use_bn,
use_noise=use_noise,
device=device,
)
self.features.add_module("denseblock%d" % (i + 1), block)
if concat_input:
num_features = n_in + hid_size
else:
num_features = hid_size
        num_features = hidden_size[-1]  # the head consumes the last block's raw output (no input concat after the final block)
self.fc = nn.Linear(num_features, n_out)
    def forward(self, x: torch.Tensor) -> torch.Tensor:
"""Forward-pass."""
        input_clone = x.detach().clone()  # detached copy of the raw input, re-concatenated before each later block
        for name, layer in self.features.named_children():
            if self.concat_input and name not in ("denseblock1", "dense0"):
                x = torch.cat([x, input_clone], 1)
            x = layer(x)
x = self.fc(x)
return x
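# Hypothetical smoke test for DenseLightModel on CPU. With concat_input=True
# the raw input is re-concatenated before every hidden block except the first,
# so block i > 0 consumes n_in + hidden_size[i - 1] features.
def _example_dense_light_model():
    model = DenseLightModel(n_in=10, n_out=1, hidden_size=[32, 16], device=torch.device("cpu"))
    out = model(torch.randn(4, 10))
    assert out.shape == (4, 1)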
class MLP(DenseLightModel):
"""Realisation of `'mlp'` model.
Args:
n_in: Input dim.
n_out: Output dim.
hidden_size: List of hidden dims.
        drop_rate: Dropout rate for each layer separately (list) or for all layers at once (float).
        act_fun: Activation function.
        noise_std: Std of noise.
        num_init_features: If not None, add an fc layer with this output dim before the model.
use_bn: Use BatchNorm.
use_noise: Use noise.
device: Device to compute on.
"""
def __init__(self, *args, **kwargs):
super(MLP, self).__init__(*args, **{**kwargs, **{"concat_input": False}})
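# MLP is DenseLightModel with concat_input forced to False; a hypothetical
# smoke test with assumed sizes:
def _example_mlp():
    model = MLP(n_in=10, n_out=2, hidden_size=[8, 8], device=torch.device("cpu"))
    assert model(torch.randn(4, 10)).shape == (4, 2)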
class _LinearLayer(DenseLightBlock):
"""Realisation of `'_linear_layer'` model.
Args:
n_in: Input dim.
n_out: Output dim.
hidden_size: List of hidden dims.
noise_std: Std of noise.
num_init_features: If not none add fc layer before model with certain dim.
device: Device to compute on.
"""
def __init__(self, *args, **kwargs):
super(_LinearLayer, self).__init__(
*args,
**{
**kwargs,
**{
"use_bn": True,
"use_noise": False,
"drop_rate": 0.0,
"act_fun": nn.Identity,
},
},
)
class LinearLayer(DenseLightBlock):
"""Realisation of `'linear_layer'` model.
Args:
n_in: Input dim.
n_out: Output dim.
hidden_size: List of hidden dims.
noise_std: Std of noise.
num_init_features: If not none add fc layer before model with certain dim.
device: Device to compute on.
"""
def __init__(self, *args, **kwargs):
super(LinearLayer, self).__init__(
*args,
**{
**kwargs,
**{
"use_bn": False,
"use_noise": False,
"drop_rate": 0.0,
"act_fun": nn.Identity,
},
},
)
class DenseLayer(nn.Module):
"""Realisation of `'dense'` model layer.
Args:
n_in: Input dim.
growth_size: Output dim.
        bn_factor: Factor by which the intermediate fc dim exceeds the input dim.
drop_rate: Dropout rate.
act_fun: Activation function.
use_bn: Use BatchNorm.
"""
def __init__(
self,
n_in: int,
growth_size: int = 256,
bn_factor: float = 2,
drop_rate: float = 0.1,
act_fun: nn.Module = nn.ReLU,
use_bn: bool = True,
**kwargs,
):
super(DenseLayer, self).__init__()
self.features1 = nn.Sequential(OrderedDict([]))
self.features2 = nn.Sequential(OrderedDict([]))
if use_bn:
self.features1.add_module("norm1", nn.BatchNorm1d(n_in))
self.features1.add_module("dense1", nn.Linear(n_in, int(bn_factor * n_in)))
self.features1.add_module("act1", act_fun())
if use_bn:
self.features2.add_module("norm2", nn.BatchNorm1d(int(bn_factor * n_in)))
self.features2.add_module("dense2", nn.Linear(int(bn_factor * n_in), growth_size))
self.features2.add_module("act2", act_fun())
if drop_rate:
self.features2.add_module("dropout", nn.Dropout(drop_rate))
    def forward(self, prev_features: List[torch.Tensor]) -> torch.Tensor:
"""Forward-pass."""
x = self.features1(torch.cat(prev_features, 1))
x = self.features2(x)
return x
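# Sketch of DenseLayer's list-based contract: it concatenates all previously
# produced feature tensors before the bottleneck. Widths are assumptions.
def _example_dense_layer():
    layer = DenseLayer(n_in=12, growth_size=6)
    feats = [torch.randn(4, 8), torch.randn(4, 4)]  # concatenated width: 8 + 4 = 12
    assert layer(feats).shape == (4, 6)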
class Transition(nn.Sequential):
"""Compress input to lower dim.
Args:
n_in: Input dim.
n_out: Output dim.
act_fun: Activation function.
use_bn: Use BatchNorm.
"""
def __init__(self, n_in: int, n_out: int, act_fun: nn.Module, use_bn: bool = True):
super(Transition, self).__init__()
if use_bn:
self.add_module("norm", nn.BatchNorm1d(n_in))
self.add_module("dense", nn.Linear(n_in, n_out))
self.add_module("act", act_fun())
class DenseBlock(nn.Module):
"""Realisation of `'dense'` model block.
Args:
        num_layers: Number of layers.
        n_in: Input dim.
        bn_factor: Factor by which the intermediate fc dim exceeds the input dim in each DenseLayer.
growth_size: Output dim of every layer.
drop_rate: Dropout rate.
act_fun: Activation function.
use_bn: Use BatchNorm.
"""
def __init__(
self,
num_layers: int,
n_in: int,
bn_factor: float,
growth_size: int,
drop_rate: float = 0.1,
act_fun: nn.Module = nn.ReLU,
use_bn: bool = True,
**kwargs,
):
super(DenseBlock, self).__init__()
for i in range(num_layers):
layer = DenseLayer(
n_in + i * growth_size,
growth_size=growth_size,
bn_factor=bn_factor,
drop_rate=drop_rate,
act_fun=act_fun,
use_bn=use_bn,
)
self.add_module("denselayer%d" % (i + 1), layer)
    def forward(self, init_features: torch.Tensor) -> torch.Tensor:
"""Forward-pass with layer output concatenation in the end."""
features = [init_features]
for name, layer in self.named_children():
new_features = layer(features)
features.append(new_features)
return torch.cat(features, 1)
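# DenseBlock grows the feature dim by growth_size per layer, so its output
# width is n_in + num_layers * growth_size. An illustrative check:
def _example_dense_block():
    block = DenseBlock(num_layers=2, n_in=10, bn_factor=2, growth_size=4)
    assert block(torch.randn(4, 10)).shape == (4, 10 + 2 * 4)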
class DenseModel(nn.Module):
"""Realisation of `'dense'` model.
Args:
n_in: Input dim.
n_out: Output dim.
        block_config: List of numbers of layers within each block.
        drop_rate: Dropout rate for each block separately (list) or for all blocks at once (float).
        num_init_features: If not None, add an fc layer with this output dim before the model.
        compression: Portion of features to keep in the transition after each block.
        growth_size: Output dim of every layer.
        bn_factor: Factor by which the intermediate fc dim exceeds the input dim in each DenseLayer.
act_fun: Activation function.
use_bn: Use BatchNorm.
"""
def __init__(
self,
n_in: int,
n_out: int = 1,
block_config: List[int] = [2, 2],
drop_rate: Union[float, List[float]] = 0.1,
num_init_features: Optional[int] = None,
compression: float = 0.5,
growth_size: int = 256,
bn_factor: float = 2,
act_fun: nn.Module = nn.ReLU,
use_bn: bool = True,
**kwargs,
):
super(DenseModel, self).__init__()
assert 0 < compression <= 1, "compression of densenet should be between 0 and 1"
if isinstance(drop_rate, float):
drop_rate = [drop_rate] * len(block_config)
        assert len(block_config) == len(drop_rate), "Number of blocks and dropout rates must be equal."
num_features = n_in if num_init_features is None else num_init_features
self.features = nn.Sequential(OrderedDict([]))
if num_init_features is not None:
self.features.add_module("dense0", nn.Linear(n_in, num_features))
for i, num_layers in enumerate(block_config):
block = DenseBlock(
num_layers=num_layers,
n_in=num_features,
bn_factor=bn_factor,
growth_size=growth_size,
drop_rate=drop_rate[i],
act_fun=act_fun,
use_bn=use_bn,
)
self.features.add_module("denseblock%d" % (i + 1), block)
num_features = num_features + num_layers * growth_size
if i != len(block_config) - 1:
trans = Transition(
n_in=num_features,
n_out=max(10, int(num_features * compression)),
act_fun=act_fun,
use_bn=use_bn,
)
self.features.add_module("transition%d" % (i + 1), trans)
num_features = max(10, int(num_features * compression))
if use_bn:
self.features.add_module("norm_final", nn.BatchNorm1d(num_features))
self.fc = nn.Linear(num_features, n_out)
    def forward(self, x: torch.Tensor) -> torch.Tensor:
"""Forward-pass."""
x = self.features(x)
x = torch.flatten(x, 1)
x = self.fc(x)
x = x.view(x.shape[0], -1)
return x
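# Hypothetical end-to-end smoke test for DenseModel with two dense blocks and
# one transition; all sizes here are assumptions.
def _example_dense_model():
    model = DenseModel(n_in=10, n_out=1, block_config=[2, 2], growth_size=8)
    assert model(torch.randn(4, 10)).shape == (4, 1)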
class ResNetBlock(nn.Module):
"""Realisation of `'resnet'` model block.
Args:
n_in: Input dim.
n_out: Output dim.
        hid_factor: Factor by which the hidden fc dim exceeds the input dim.
drop_rate: Dropout rates.
noise_std: Std of noise.
act_fun: Activation function.
use_bn: Use BatchNorm.
use_noise: Use noise.
device: Device to compute on.
"""
def __init__(
self,
n_in: int,
hid_factor: float,
n_out: int,
drop_rate: List[float] = [0.1, 0.1],
noise_std: float = 0.05,
act_fun: nn.Module = nn.ReLU,
use_bn: bool = True,
use_noise: bool = False,
device: torch.device = torch.device("cuda:0"),
**kwargs,
):
super(ResNetBlock, self).__init__()
self.features = nn.Sequential(OrderedDict([]))
if use_bn:
self.features.add_module("norm", nn.BatchNorm1d(n_in))
if use_noise:
self.features.add_module("noise", GaussianNoise(noise_std, device))
self.features.add_module("dense1", nn.Linear(n_in, int(hid_factor * n_in)))
self.features.add_module("act1", act_fun())
if drop_rate[0]:
self.features.add_module("drop1", nn.Dropout(p=drop_rate[0]))
self.features.add_module("dense2", nn.Linear(int(hid_factor * n_in), n_out))
if drop_rate[1]:
self.features.add_module("drop2", nn.Dropout(p=drop_rate[1]))
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""Forward-pass."""
x = self.features(x)
return x
class ResNetModel(nn.Module):
"""The ResNet model from https://github.com/Yura52/rtdl.
Args:
n_in: Input dim.
n_out: Output dim.
        hid_factor: Factors by which the hidden fc dim exceeds the input dim in each block.
        drop_rate: Dropout rate: one float for all blocks, one [drop1, drop2] pair for all blocks, or one pair per block.
        noise_std: Std of noise.
        act_fun: Activation function.
        num_init_features: If not None, add an fc layer with this output dim before the model.
use_bn: Use BatchNorm.
use_noise: Use noise.
device: Device to compute on.
"""
def __init__(
self,
n_in: int,
n_out: int = 1,
hid_factor: List[float] = [2, 2],
drop_rate: Union[float, List[float], List[List[float]]] = 0.1,
noise_std: float = 0.05,
act_fun: nn.Module = nn.ReLU,
num_init_features: Optional[int] = None,
use_bn: bool = True,
use_noise: bool = False,
device: torch.device = torch.device("cuda:0"),
**kwargs,
):
super(ResNetModel, self).__init__()
if isinstance(drop_rate, float):
drop_rate = [[drop_rate, drop_rate]] * len(hid_factor)
elif isinstance(drop_rate, list) and len(drop_rate) == 2:
drop_rate = [drop_rate] * len(hid_factor)
else:
            assert (
                len(drop_rate) == len(hid_factor) and len(drop_rate[0]) == 2
            ), "Number of dropout rates must match the number of blocks, with two rates per block."
num_features = n_in if num_init_features is None else num_init_features
self.dense0 = nn.Linear(n_in, num_features) if num_init_features is not None else nn.Identity()
self.features1 = nn.Sequential(OrderedDict([]))
for i, hd_factor in enumerate(hid_factor):
block = ResNetBlock(
n_in=num_features,
hid_factor=hd_factor,
n_out=num_features,
drop_rate=drop_rate[i],
noise_std=noise_std,
act_fun=act_fun,
use_bn=use_bn,
use_noise=use_noise,
device=device,
)
self.features1.add_module("resnetblock%d" % (i + 1), block)
self.features2 = nn.Sequential(OrderedDict([]))
if use_bn:
self.features2.add_module("norm", nn.BatchNorm1d(num_features))
self.features2.add_module("act", act_fun())
self.fc = nn.Linear(num_features, n_out)
    def forward(self, x: torch.Tensor) -> torch.Tensor:
"""Forward-pass."""
x = self.dense0(x)
identity = x
        for name, layer in self.features1.named_children():
            if name != "resnetblock1":
                x = x + identity  # skip connection: add the running identity before every block after the first
                identity = x
            x = layer(x)
x = self.features2(x)
x = self.fc(x)
return x.view(x.shape[0], -1)
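# ResNetModel keeps a constant feature width and adds the running identity
# before every block after the first. A CPU smoke test with assumed sizes:
def _example_resnet_model():
    model = ResNetModel(n_in=10, n_out=3, hid_factor=[2, 2], device=torch.device("cpu"))
    assert model(torch.randn(4, 10)).shape == (4, 3)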
class SNN(nn.Module):
"""Realisation of `'snn'` model.
Args:
n_in: Input dim.
n_out: Output dim.
hidden_size: List of hidden dims.
        drop_rate: Dropout rate for each layer separately (list) or for all layers at once (float).
        num_init_features: If not None, add an fc layer with this output dim before the model.
"""
def __init__(
self,
n_in: int,
n_out: int,
hidden_size: List[int] = [512, 512, 512],
num_init_features: Optional[int] = None,
drop_rate: Union[float, List[float]] = 0.1,
**kwargs,
):
super().__init__()
if isinstance(drop_rate, float):
drop_rate = [drop_rate] * len(hidden_size)
num_features = n_in if num_init_features is None else num_init_features
layers = OrderedDict([])
if num_init_features is not None:
layers["dense-1"] = nn.Linear(n_in, num_features, bias=False)
        for i in range(len(hidden_size) - 1):
            layers[f"dense{i}"] = nn.Linear(num_features, hidden_size[i], bias=False)
            layers[f"selu_{i}"] = nn.SELU()
            if drop_rate[i]:
                layers[f"dropout_{i}"] = nn.AlphaDropout(p=drop_rate[i])
            num_features = hidden_size[i]
        layers["dense_out"] = nn.Linear(num_features, n_out, bias=True)
self.network = nn.Sequential(layers)
self.reset_parameters()
    def forward(self, x: torch.Tensor) -> torch.Tensor:
"""Forward-pass."""
x = self.network(x)
x = x.view(x.shape[0], -1)
return x
    def reset_parameters(self):
"""Init weights."""
for layer in self.network:
if not isinstance(layer, nn.Linear):
continue
nn.init.normal_(layer.weight, std=1 / np.sqrt(layer.out_features))
if layer.bias is not None:
fan_in, _ = nn.init._calculate_fan_in_and_fan_out(layer.weight)
bound = 1 / np.sqrt(fan_in)
nn.init.uniform_(layer.bias, -bound, bound)
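# Hypothetical SNN smoke test: SELU activations, AlphaDropout, and the
# LeCun-normal init from reset_parameters keep activations self-normalizing
# (Klambauer et al., 2017). Sizes here are illustrative.
def _example_snn():
    model = SNN(n_in=10, n_out=2, hidden_size=[16, 16])
    assert model(torch.randn(4, 10)).shape == (4, 2)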
"""Different Pooling strategies for sequence data."""
class SequenceAbstractPooler(nn.Module):
"""Abstract pooling class."""
def __init__(self):
super(SequenceAbstractPooler, self).__init__()
    def forward(self, x: torch.Tensor, x_mask: torch.Tensor) -> torch.Tensor:
"""Forward-pass."""
raise NotImplementedError
def __call__(self, *args, **kwargs):
"""Forward-call."""
return self.forward(*args, **kwargs)
class SequenceClsPooler(SequenceAbstractPooler):
"""CLS token pooling."""
def __init__(self):
super(SequenceClsPooler, self).__init__()
    def forward(self, x: torch.Tensor, x_mask: torch.Tensor) -> torch.Tensor:
"""Forward-pass."""
return x[..., 0, :]
class SequenceMaxPooler(SequenceAbstractPooler):
"""Max value pooling."""
def __init__(self):
super(SequenceMaxPooler, self).__init__()
    def forward(self, x: torch.Tensor, x_mask: torch.Tensor) -> torch.Tensor:
"""Forward-pass."""
x = x.masked_fill(~x_mask, -float("inf"))
values, _ = torch.max(x, dim=-2)
return values
class SequenceSumPooler(SequenceAbstractPooler):
"""Sum value pooling."""
def __init__(self):
super(SequenceSumPooler, self).__init__()
    def forward(self, x: torch.Tensor, x_mask: torch.Tensor) -> torch.Tensor:
"""Forward-pass."""
x = x.masked_fill(~x_mask, 0)
values = torch.sum(x, dim=-2)
return values
class SequenceAvgPooler(SequenceAbstractPooler):
"""Mean value pooling."""
def __init__(self):
super(SequenceAvgPooler, self).__init__()
    def forward(self, x: torch.Tensor, x_mask: torch.Tensor) -> torch.Tensor:
"""Forward-pass."""
x = x.masked_fill(~x_mask, 0)
x_active = torch.sum(x_mask, dim=-2)
x_active = x_active.masked_fill(x_active == 0, 1)
values = torch.sum(x, dim=-2) / x_active.data
return values
class SequenceIndentityPooler(SequenceAbstractPooler):
"""Identity pooling."""
def __init__(self):
super(SequenceIndentityPooler, self).__init__()
    def forward(self, x: torch.Tensor, x_mask: torch.Tensor) -> torch.Tensor:
"""Forward-pass."""
return x
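# Sketch of the mask-aware poolers: x is (batch, seq_len, hidden) and x_mask
# is a boolean (batch, seq_len, 1) marking valid positions. Shapes are assumed.
def _example_sequence_pooling():
    x = torch.randn(2, 5, 8)
    mask = torch.ones(2, 5, 1, dtype=torch.bool)
    mask[:, 3:] = False  # the last two positions are padding
    pooled_avg = SequenceAvgPooler()(x, mask)
    pooled_max = SequenceMaxPooler()(x, mask)
    assert pooled_avg.shape == pooled_max.shape == (2, 8)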
class NODE(nn.Module):
"""The NODE model from https://github.com/Qwicen.
Args:
n_in: Input dim.
n_out: Output dim.
        layer_dim: Number of trees in one layer.
        num_layers: Number of forests.
        tree_dim: Number of response channels in the output of an individual tree.
        use_original_head: Use averaging as the head instead of a linear layer.
        depth: Number of splits in every tree.
        drop_rate: Input dropout rate for all layers.
        act_fun: Activation function.
        num_init_features: If not None, add an fc layer with this output dim before the model.
use_bn: Use BatchNorm.
"""
def __init__(
self,
n_in: int,
n_out: int = 1,
layer_dim: int = 2048,
num_layers: int = 1,
tree_dim: int = 1,
use_original_head: bool = False,
depth: int = 6,
drop_rate: float = 0.0,
act_fun: nn.Module = nn.ReLU,
num_init_features: Optional[int] = None,
use_bn: bool = True,
**kwargs,
):
super(NODE, self).__init__()
num_features = n_in if num_init_features is None else num_init_features
self.dense0 = nn.Linear(n_in, num_features) if num_init_features is not None else nn.Identity()
self.features1 = nn.Sequential(OrderedDict([]))
block = DenseODSTBlock(
input_dim=num_features,
layer_dim=layer_dim,
num_layers=num_layers,
tree_dim=tree_dim if not use_original_head else n_out,
depth=depth,
input_dropout=drop_rate,
flatten_output=not use_original_head,
)
self.features1.add_module("ODSTForestblock%d", block)
self.features2 = nn.Sequential(OrderedDict([]))
if use_original_head:
last_layer = MeanPooling(n_out, dim=-2)
self.features2.add_module("head", last_layer)
else:
if use_bn:
self.features2.add_module("norm", nn.BatchNorm1d(layer_dim * num_layers * tree_dim))
self.features2.add_module("act", act_fun())
fc = nn.Linear(layer_dim * num_layers * tree_dim, n_out)
self.features2.add_module("fc", fc)
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""Forward-pass."""
x = self.dense0(x)
x = self.features1(x)
x = self.features2(x)
return x.view(x.shape[0], -1)
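# Hypothetical NODE smoke test with deliberately small forest settings. It
# relies on DenseODSTBlock imported above, which may perform data-aware
# initialization on the first forward pass; runnability of that block is an
# assumption here.
def _example_node():
    model = NODE(n_in=10, n_out=1, layer_dim=8, num_layers=1, depth=3)
    assert model(torch.randn(4, 10)).shape == (4, 1)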