"""Deep Learning transformers for calculating sentence embeddings."""
import gc
from copy import deepcopy
from typing import Any
from typing import Dict
from typing import Optional
from typing import Sequence
import numpy as np
import torch
import torch.nn as nn
from sklearn.base import TransformerMixin
from torch.utils.data import DataLoader
from tqdm import tqdm
try:
    from transformers import AutoModel
except ImportError:
    import warnings

    warnings.warn("'transformers' package is not installed")
from ..ml_algo.torch_based.nn_models import SequenceAvgPooler
from ..ml_algo.torch_based.nn_models import SequenceClsPooler
from ..ml_algo.torch_based.nn_models import SequenceIndentityPooler
from ..ml_algo.torch_based.nn_models import SequenceMaxPooler
from ..ml_algo.torch_based.nn_models import SequenceSumPooler
from .dp_utils import CustomDataParallel
from .utils import _dtypes_mapping
from .utils import collate_dict
from .utils import parse_devices
from .utils import seed_everything
from .utils import single_text_hash
pooling_by_name = {
"mean": SequenceAvgPooler,
"sum": SequenceSumPooler,
"max": SequenceMaxPooler,
"cls": SequenceClsPooler,
"none": SequenceIndentityPooler,
}
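# Illustrative note (not part of the original module): judging by how the poolers are
# called below, the 'max'/'mean'/'sum'/'cls' poolers map a (batch, seq_len, hidden)
# tensor plus a boolean (batch, seq_len, 1) mask of valid positions to a
# (batch, hidden) sentence vector, while 'none' returns the input unchanged:
#
#     >>> pooler = pooling_by_name["max"]()
#     >>> x = torch.randn(2, 5, 8)                      # (batch, seq_len, hidden)
#     >>> mask = torch.ones(2, 5, 1, dtype=torch.bool)  # all positions valid
#     >>> sentence_emb = pooler(x, mask)                # expected shape: (2, 8)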
def position_encoding_init(n_pos: int, embed_size: int) -> torch.Tensor:
"""Compute positional embedding matrix.
Args:
n_pos: Len of sequence.
embed_size: Size of output sentence embedding.
Returns:
Torch tensor with all positional embeddings.
"""
position_enc = np.array(
[
[pos / np.power(10000, 2 * (j // 2) / embed_size) for j in range(embed_size)]
if pos != 0
else np.zeros(embed_size)
for pos in range(n_pos)
]
)
position_enc[1:, 0::2] = np.sin(position_enc[1:, 0::2])
position_enc[1:, 1::2] = np.cos(position_enc[1:, 1::2])
return torch.from_numpy(position_enc).float()
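# Illustrative sketch (not part of the original module): the returned matrix follows the
# sinusoidal positional-encoding scheme, with position 0 kept as an all-zeros padding row.
#
#     >>> pe = position_encoding_init(n_pos=4, embed_size=6)
#     >>> pe.shape
#     torch.Size([4, 6])
#     >>> bool(torch.all(pe[0] == 0))
#     True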
class BOREP(nn.Module):
    """Class to compute Bag of Random Embedding Projections (BOREP) sentence embeddings from word embeddings.

Args:
embed_size: Size of word embeddings.
proj_size: Size of output sentence embedding.
pooling: Pooling type.
max_length: Maximum length of sentence.
init: Type of weight initialization.
pos_encoding: Add positional embedding.
**kwargs: Ignored params.
Note:
There are several pooling types:
- `'max'`: Maximum on seq_len dimension for non masked inputs.
- `'mean'`: Mean on seq_len dimension for non masked inputs.
- `'sum'`: Sum on seq_len dimension for non masked inputs.
For init parameter there are several options:
- `'orthogonal'`: Orthogonal init.
- `'normal'`: Normal with std 0.1.
- `'uniform'`: Uniform from -0.1 to 0.1.
- `'kaiming'`: Uniform kaiming init.
- `'xavier'`: Uniform xavier init.
"""
name = "BOREP"
_poolers = {"max", "mean", "sum"}
def __init__(
self,
embed_size: int = 300,
proj_size: int = 300,
pooling: str = "mean",
max_length: int = 200,
init: str = "orthogonal",
pos_encoding: bool = False,
**kwargs: Any
):
        super(BOREP, self).__init__()
        if pooling not in self._poolers:
            raise ValueError("pooling - {} - not in the list of available types {}".format(pooling, self._poolers))
self.embed_size = embed_size
self.proj_size = proj_size
self.pos_encoding = pos_encoding
seed_everything(42)
if self.pos_encoding:
self.pos_code = position_encoding_init(max_length, self.embed_size).view(1, max_length, self.embed_size)
self.pooling = pooling_by_name[pooling]()
self.proj = nn.Linear(self.embed_size, self.proj_size, bias=False)
if init == "orthogonal":
nn.init.orthogonal_(self.proj.weight)
elif init == "normal":
nn.init.normal_(self.proj.weight, std=0.1)
elif init == "uniform":
nn.init.uniform_(self.proj.weight, a=-0.1, b=0.1)
elif init == "kaiming":
nn.init.kaiming_uniform_(self.proj.weight)
elif init == "xavier":
nn.init.xavier_uniform_(self.proj.weight)
    def get_out_shape(self) -> int:
"""Output shape.
Returns:
Int with module output shape.
"""
return self.proj_size
    def get_name(self) -> str:
"""Module name.
Returns:
String with module name.
"""
return self.name
@torch.no_grad()
def forward(self, inp: Dict[str, torch.Tensor]) -> torch.Tensor:
"""Forward-pass."""
x = inp["text"]
batch_size, batch_max_length = x.shape[0], x.shape[1]
if self.pos_encoding:
x = x + self.pos_code[:, :batch_max_length, :].to(x.device)
x = x.contiguous().view(batch_size * batch_max_length, -1)
x = self.proj(x)
out = x.contiguous().view(batch_size, batch_max_length, -1)
x_length = (torch.arange(out.shape[1])[None, :].to(out.device) < inp["length"][:, None])[:, :, None]
out = self.pooling(out, x_length)
return out
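# Usage sketch (hypothetical tensors, not part of the original module): BOREP projects
# pre-computed word embeddings through a fixed random linear map and pools them into
# sentence vectors of size ``proj_size``.
#
#     >>> module = BOREP(embed_size=300, proj_size=300, pooling="mean")
#     >>> batch = {
#     ...     "text": torch.randn(4, 10, 300),        # (batch, seq_len, embed_size)
#     ...     "length": torch.tensor([10, 7, 3, 1]),  # true lengths used for masking
#     ... }
#     >>> module(batch).shape
#     torch.Size([4, 300])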
class RandomLSTM(nn.Module):
    """Class to compute Random LSTM sentence embeddings from word embeddings.

Args:
embed_size: Size of word embeddings.
hidden_size: Size of hidden dimensions of LSTM.
pooling: Pooling type.
num_layers: Number of lstm layers.
**kwargs: Ignored params.
Note:
There are several pooling types:
- `'max'`: Maximum on seq_len dimension for non masked inputs.
- `'mean'`: Mean on seq_len dimension for non masked inputs.
- `'sum'`: Sum on seq_len dimension for non masked inputs.
"""
name = "RandomLSTM"
_poolers = ("max", "mean", "sum")
def __init__(
self, embed_size: int = 300, hidden_size: int = 256, pooling: str = "mean", num_layers: int = 1, **kwargs: Any
):
super(RandomLSTM, self).__init__()
if pooling not in self._poolers:
raise ValueError("pooling - {} - not in the list of available types {}".format(pooling, self._poolers))
seed_everything(42)
self.hidden_size = hidden_size
self.lstm = nn.LSTM(
embed_size,
hidden_size,
num_layers=num_layers,
bidirectional=True,
batch_first=True,
)
self.pooling = pooling_by_name[pooling]()
    def get_out_shape(self) -> int:
"""Output shape.
Returns:
Int with module output shape.
"""
return self.hidden_size * 2
    def get_name(self) -> str:
"""Module name.
Returns:
String with module name.
"""
return self.name
@torch.no_grad()
def forward(self, inp: Dict[str, torch.Tensor]) -> torch.Tensor:
"""Forward-pass."""
out, _ = self.lstm(inp["text"])
x_length = (torch.arange(out.shape[1])[None, :].to(out.device) < inp["length"][:, None])[:, :, None]
out = self.pooling(out, x_length)
return out
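# Usage sketch (hypothetical tensors, not part of the original module): the randomly
# initialized bidirectional LSTM doubles the hidden size, so the pooled sentence
# embedding has ``2 * hidden_size`` features.
#
#     >>> module = RandomLSTM(embed_size=300, hidden_size=256, pooling="max")
#     >>> batch = {
#     ...     "text": torch.randn(4, 10, 300),        # (batch, seq_len, embed_size)
#     ...     "length": torch.tensor([10, 8, 5, 2]),  # true lengths used for masking
#     ... }
#     >>> module(batch).shape
#     torch.Size([4, 512])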
class BertEmbedder(nn.Module):
    """Class to compute word or sentence embeddings with `HuggingFace <https://huggingface.co>`_ transformers.

Args:
model_name: Name of transformers model.
pooling: Pooling type.
**kwargs: Ignored params.
Note:
There are several pooling types:
- `'cls'`: Use CLS token for sentence embedding
from last hidden state.
- `'max'`: Maximum on seq_len dimension
for non masked inputs from last hidden state.
- `'mean'`: Mean on seq_len dimension for non masked
inputs from last hidden state.
- `'sum'`: Sum on seq_len dimension for non masked inputs
from last hidden state.
- `'none'`: Don't use pooling (for RandomLSTM pooling strategy).
"""
name = "BertEmb"
_poolers = {"cls", "max", "mean", "sum", "none"}
def __init__(self, model_name: str, pooling: str = "none", **kwargs: Any):
super(BertEmbedder, self).__init__()
if pooling not in self._poolers:
raise ValueError("pooling - {} - not in the list of available types {}".format(pooling, self._poolers))
self.pooling = pooling_by_name[pooling]()
self.model_name = model_name
self.transformer = AutoModel.from_pretrained(model_name)
    def forward(self, inp: Dict[str, torch.Tensor]) -> torch.Tensor:
"""Forward-pass."""
encoded_layers, _ = self.transformer(
input_ids=inp["input_ids"],
attention_mask=inp["attention_mask"],
token_type_ids=inp.get("token_type_ids"),
return_dict=False,
)
encoded_layers = self.pooling(encoded_layers, inp["attention_mask"].unsqueeze(-1).bool())
return encoded_layers
    def freeze(self):
"""Freeze module parameters."""
for param in self.transformer.parameters():
param.requires_grad = False
    def get_name(self) -> str:
"""Module name.
Returns:
String with module name.
"""
return self.name + single_text_hash(self.model_name)
    def get_out_shape(self) -> int:
"""Output shape.
Returns:
Int with module output shape.
"""
return self.transformer.config.hidden_size
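# Usage sketch (assumes network access to the HuggingFace hub; "bert-base-uncased" is
# only an illustrative checkpoint, and the tokenizer call is not part of this module):
#
#     >>> from transformers import AutoTokenizer
#     >>> embedder = BertEmbedder("bert-base-uncased", pooling="mean")
#     >>> tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
#     >>> enc = tokenizer(["a short sentence"], return_tensors="pt")
#     >>> embedder(dict(enc)).shape    # hidden_size of the chosen checkpoint
#     torch.Size([1, 768])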