"""Tabular data utils."""
import os
import warnings
from copy import copy
from typing import Dict
from typing import Iterable
from typing import List
from typing import Optional
from typing import Sequence
from typing import Tuple
from typing import Union
import numpy as np
import pandas as pd
from joblib import Parallel
from joblib import delayed
from pandas import DataFrame
from sqlalchemy import create_engine
def get_filelen(fname: str) -> int:
"""Get length of csv file.
Args:
fname: File name.
Returns:
Length of file.
"""
cnt_lines = -1
with open(fname, "rb") as fin:
for line in fin:
if len(line.strip()) > 0:
cnt_lines += 1
return cnt_lines
def get_batch_ids(arr, batch_size):
"""Generator of batched sequences.
Args:
arr: Sequense.
batch_size: Batch size.
Yields:
Sequential batches.
"""
n = 0
while n < len(arr):
yield arr[n : n + batch_size]
n += batch_size
def get_file_offsets(
file: str, n_jobs: Optional[int] = None, batch_size: Optional[int] = None
) -> Tuple[List[int], List[int]]:
"""Get file offsets.
Args:
file: File path.
n_jobs: Number of jobs for multiprocessing.
batch_size: Batch size.
Returns:
Offsets tuple.
"""
assert n_jobs is not None or batch_size is not None, "One of n_jobs or batch size should be defined"
lens = []
with open(file, "rb") as f:
# skip header
header_len = len(f.readline())
# get row lens
length = 0
for row in f:
if len(row.strip()) > 0:
lens.append(length)
length += len(row)
lens = np.array(lens, dtype=np.int64) + header_len
if batch_size:
indexes = list(get_batch_ids(lens, batch_size))
else:
indexes = np.array_split(lens, n_jobs)
offsets = [x[0] for x in indexes]
cnts = [x.shape[0] for x in indexes]
return offsets, cnts
def _check_csv_params(**read_csv_params: dict):
"""Validate parameters for func `read_csv`.
Args:
**read_csv_params: Read parameters.
Returns:
New parameters.
"""
for par in ["skiprows", "nrows", "index_col", "header", "names", "chunksize"]:
if par in read_csv_params:
read_csv_params.pop(par)
warnings.warn(
"Parameter {0} will be ignored in parallel mode".format(par),
UserWarning,
)
return read_csv_params
def read_csv_batch(file: str, offset, cnt, **read_csv_params):
"""Read batch of data from csv.
Args:
file: File path.
offset: Start of file.
cnt: Number of rows to read.
**read_csv_params: Handler parameters.
Returns:
Read data.
"""
read_csv_params = copy(read_csv_params)
if read_csv_params is None:
read_csv_params = {}
try:
usecols = read_csv_params.pop("usecols")
except KeyError:
usecols = None
header = pd.read_csv(file, nrows=0, **read_csv_params).columns
with open(file, "rb") as f:
f.seek(offset)
data = pd.read_csv(f, header=None, names=header, chunksize=None, nrows=cnt, usecols=usecols, **read_csv_params)
return data
def read_csv(file: str, n_jobs: int = 1, **read_csv_params) -> DataFrame:
"""Read data from csv.
Args:
file: File path.
n_jobs: Number of workers.
**read_csv_params: Handler parameters.
Returns:
Read data.
"""
if n_jobs == 1:
return pd.read_csv(file, **read_csv_params)
if n_jobs == -1:
n_jobs = os.cpu_count()
_check_csv_params(**read_csv_params)
offsets, cnts = get_file_offsets(file, n_jobs)
with Parallel(n_jobs) as p:
res = p(
delayed(read_csv_batch)(file, offset=offset, cnt=cnt, **read_csv_params)
for (offset, cnt) in zip(offsets, cnts)
)
res = pd.concat(res, ignore_index=True)
return res
[docs]class Batch:
"""Class to wraps batch of data in different formats.
Default - batch of DataFrame.
"""
@property
def data(self) -> DataFrame:
"""Get data from Batch object.
Returns:
Data.
"""
return self._data
def __init__(self, data):
self._data = data
[docs]class FileBatch(Batch):
"""Batch of csv file.
Args:
file: File path.
offset: File start.
cnt: Number of rows to read.
read_csv_params: Additional params to :func:`pandas.read_csv`.
"""
@property
def data(self) -> DataFrame:
"""Get data from Batch object.
Returns:
Read data.
"""
data_part = read_csv_batch(self.file, cnt=self.cnt, offset=self.offset, **self.read_csv_params)
return data_part
def __init__(self, file, offset, cnt, read_csv_params):
self.file = file
self.offset = offset
self.cnt = cnt
self.read_csv_params = read_csv_params
[docs]class BatchGenerator:
"""Abstract - generator of batches from data.
Args:
batch_size: Batch size. Default is ``None``, split by `n_jobs`.
n_jobs: Number of processes to handle.
"""
def __init__(self, batch_size, n_jobs):
if n_jobs == -1:
n_jobs = os.cpu_count()
self.n_jobs = n_jobs
self.batch_size = batch_size
def __getitem__(self, idx) -> Batch:
raise NotImplementedError
def __len__(self) -> int:
raise NotImplementedError
[docs]class DfBatchGenerator(BatchGenerator):
"""Batch generator from :class:`~pandas.DataFrame`.
Args:
data: Data used for generator.
n_jobs: Number of processes to handle.
batch_size: Batch size. Default is ``None``, split by `n_jobs`.
"""
def __init__(self, data: DataFrame, n_jobs: int = 1, batch_size: Optional[int] = None):
super().__init__(batch_size, n_jobs)
self.data = data
if self.batch_size is not None:
self.idxs = list(get_batch_ids(np.arange(data.shape[0]), batch_size))
else:
self.idxs = [x for x in np.array_split(np.arange(data.shape[0]), n_jobs) if len(x) > 0]
def __len__(self) -> int:
if self.batch_size is not None:
return int(np.ceil(self.data.shape[0] / self.batch_size))
return int(self.n_jobs)
def __getitem__(self, idx):
return Batch(self.data.iloc[self.idxs[idx]])
[docs]class FileBatchGenerator(BatchGenerator):
"""Generator of batches from file.
Args:
file: File path.
n_jobs: Number of processes to handle.
batch_size: Batch size. Default is ``None``, split by `n_jobs`.
read_csv_params: Params of reading csv file.
Look for :func:`pandas.read_csv` params.
"""
def __init__(
self,
file,
n_jobs: int = 1,
batch_size: Optional[int] = None,
read_csv_params: dict = None,
):
super().__init__(batch_size, n_jobs)
self.file = file
self.offsets, self.cnts = get_file_offsets(file, n_jobs, batch_size)
if read_csv_params is None:
read_csv_params = {}
self.read_csv_params = read_csv_params
def __len__(self) -> int:
return len(self.cnts)
def __getitem__(self, idx):
return FileBatch(self.file, self.offsets[idx], self.cnts[idx], self.read_csv_params)
class SqlDataSource:
"""Data wrapper for SQL connection.
Args:
connection_string: database url; for reference see
https://docs.sqlalchemy.org/en/13/core/engines.html#database-urls
query: SQL query to obtain data from
index: optional index column to be removed from the query result; can be None, str of List[str]
"""
def __init__(
self,
connection_string: str,
query: str,
index: Optional[Union[str, List[str]]] = None,
):
self.engine = create_engine(connection_string)
self.query = query
self.index = index
self._data = None
@property
def data(self):
"""Get data associated with the query as :class:`~pandas.DataFrame`.
Returns:
:class:`~pandas.DataFrame`
"""
if self._data is None:
with self.engine.begin() as conn:
self._data = pd.read_sql(self.query, conn, index_col=self.index)
return self._data
def get_batch_generator(self, n_jobs: int = 1, batch_size: int = None):
"""Access data with batch generator.
Args:
n_jobs: Number of processes to read file.
batch_size: Number of entries in one batch.
Returns:
DfBatchGenerator object
"""
return DfBatchGenerator(self.data, n_jobs, batch_size)
ReadableToDf = Union[str, np.ndarray, DataFrame, Dict[str, np.ndarray], Batch]
[docs]def read_data(
data: ReadableToDf,
features_names: Optional[Sequence[str]] = None,
n_jobs: int = 1,
read_csv_params: Optional[dict] = None,
) -> Tuple[DataFrame, Optional[dict]]:
"""Get :class:`~pandas.DataFrame` from different data formats.
Note:
Supported now data formats:
- Path to ``.csv``, ``.parquet``, ``.feather`` files.
- :class:`~numpy.ndarray`, or dict of :class:`~numpy.ndarray`.
For example, ``{'data': X...}``. In this case,
roles are optional, but `train_features`
and `valid_features` required.
- :class:`pandas.DataFrame`.
Args:
data: Readable to DataFrame data.
features_names: Optional features names if ``numpy.ndarray``.
n_jobs: Number of processes to read file.
read_csv_params: Params to read csv file.
Returns:
Tuple with read data and new roles mapping.
"""
if read_csv_params is None:
read_csv_params = {}
# case - new process
if isinstance(data, Batch):
return data.data, None
if isinstance(data, DataFrame):
return data, None
# case - single array passed to inference
if isinstance(data, np.ndarray):
return DataFrame(data, columns=features_names), None
# case - dict of array args passed
if isinstance(data, dict):
df = DataFrame(data["data"], columns=features_names)
upd_roles = {}
for k in data:
if k != "data":
name = "__{0}__".format(k.upper())
assert name not in df.columns, "Not supported feature name {0}".format(name)
df[name] = data[k]
upd_roles[k] = name
return df, upd_roles
if isinstance(data, str):
if data.endswith(".feather"):
# TODO: check about feather columns arg
data = pd.read_feather(data)
if read_csv_params["usecols"] is not None:
data = data[read_csv_params["usecols"]]
return data, None
if data.endswith(".parquet"):
return pd.read_parquet(data, columns=read_csv_params["usecols"]), None
else:
return read_csv(data, n_jobs, **read_csv_params), None
if isinstance(data, SqlDataSource):
return data.data, None
raise ValueError("Input data format is not supported")
[docs]def read_batch(
data: ReadableToDf,
features_names: Optional[Sequence[str]] = None,
n_jobs: int = 1,
batch_size: Optional[int] = None,
read_csv_params: Optional[dict] = None,
) -> Iterable[BatchGenerator]:
"""Read data for inference by batches for simple tabular data.
Note:
Supported now data formats:
- Path to ``.csv``, ``.parquet``, ``.feather`` files.
- :class:`~numpy.ndarray`, or dict of :class:`~numpy.ndarray`.
For example, ``{'data': X...}``. In this case,
roles are optional, but `train_features`
and `valid_features` required.
- :class:`pandas.DataFrame`.
Args:
data: Readable to DataFrame data.
features_names: Optional features names if ``numpy.ndarray``.
n_jobs: Number of processes to read file.
batch_size: Batch size.
read_csv_params: Params to read csv file.
Returns:
Generator of batches.
"""
if read_csv_params is None:
read_csv_params = {}
if isinstance(data, DataFrame):
return DfBatchGenerator(data, n_jobs=n_jobs, batch_size=batch_size)
# case - single array passed to inference
if isinstance(data, np.ndarray):
return DfBatchGenerator(
DataFrame(data, columns=features_names),
n_jobs=n_jobs,
batch_size=batch_size,
)
if isinstance(data, str):
if not (data.endswith(".feather") or data.endswith(".parquet")):
return FileBatchGenerator(
data, n_jobs, batch_size, read_csv_params
) # read_csv(data, n_jobs, **read_csv_params)
else:
data, _ = read_data(data, features_names, n_jobs, read_csv_params)
return DfBatchGenerator(data, n_jobs=n_jobs, batch_size=batch_size)
if isinstance(data, SqlDataSource):
return data.get_batch_generator(n_jobs, batch_size)
raise ValueError("Data type not supported")