"""Timer."""
import logging
from time import time
from typing import List
from typing import Optional
from typing import Union
import numpy as np
from .logging import DuplicateFilter
logger = logging.getLogger(__name__)
logger.addFilter(DuplicateFilter())
[docs]class Timer:
"""Timer to limit the duration tasks."""
_timeout = 1e10
_overhead = 0
_mode = 1
def __init__(self):
self.start_time = None
@property
def time_left(self) -> float:
if self.time_spent is not None:
return self.timeout - self.time_spent
return None
@property
def time_spent(self) -> float:
if self.start_time is not None:
return time() - self.start_time
return None
@property
def perc_left(self) -> float:
if self.time_left is not None:
return self.time_left / self.timeout
return None
@property
def perc_spent(self) -> float:
if self.time_spent is not None:
return self.time_spent / self.timeout
return None
@property
def timeout(self) -> float:
return self._timeout
def time_limit_exceeded(self) -> bool:
if self._mode == 0:
return False
if self._mode == 1:
return self.time_left < 0
if self._mode == 2:
if self.time_left:
return (self.time_left - self._overhead) < 0
return None
def start(self):
self.start_time = time()
return self
[docs]class PipelineTimer(Timer):
"""Timer is used to control time over full automl run.
It decides how much time spend to each algo
Args:
timeout: Maximum amount of time that AutoML can run.
overhead: (0, 1) - Rate of time that will be used to early stop.
Ex. if set to `0.1` and timing mode is set to 2,
timer will finish tasks after `0.9` of all time spent.
mode: Timing mode. Can be 0, 1 or 2.
Keep in mind - all time limitations will
turn on after at least single model/single fold will be computed.
tuning_rate: Approximate fraction of all time will be used for tuning.
Note:
Modes explanation:
- 0 - timer is used to estimate runtime,
but if something goes out of time,
keep it run (Real life mode).
- 1 - timer is used to terminate tasks,
but do it after real timeout (Trade off mode).
- 2 - timer is used to terminate tasks
with the goal to be exactly
in time (Benchmarking/competitions mode).
"""
def __init__(
self,
timeout: Optional[float] = None,
overhead: float = 0.1,
mode: int = 1,
tuning_rate: float = 0.7,
):
if timeout is not None:
self._timeout = timeout
self._task_scores = 0
self._rate_overhead = overhead
self._overhead = overhead * self.timeout
self.run_info = {}
self.run_scores = {}
self._mode = mode
self.tuning_rate = tuning_rate
self.child_out_of_time = False
def add_task(self, score: float = 1.0):
self._task_scores += score
def close_task(self, score: float = 1.0):
self._task_scores -= score
def get_time_for_next_task(self, score: float = 1.0):
if round(self._task_scores, 3) == 0:
return self.time_left
return (self.time_left - self._overhead) * (score / self._task_scores)
def get_task_timer(self, key: Optional[str] = None, score: float = 1.0) -> "TaskTimer":
return TaskTimer(self, key, score, self._rate_overhead, self._mode, self.tuning_rate)
[docs]class TaskTimer(Timer):
"""Timer is used to control time over single ML task run.
It decides how much time is ok to spend on tuner
and if we have enough time to calc more folds.
Args:
pipe_timer: Global automl timer.
key: String name that will be associated with this task.
score: Time score for current task.
For ex. if you want to give more
of total time to task set it > 1.
overhead: See overhead of :class:`~lightautoml.utils.timer.PipelineTimer`.
mode: See mode for :class:`~lightautoml.utils.timer.PipelineTimer`.
default_tuner_time_rate: If no timing history for the moment
of estimating tuning time,
timer will use this rate of `time_left`.
"""
@property
def in_progress(self) -> bool:
"""Check if the task is running."""
return self.start_time is not None
def __init__(
self,
pipe_timer: PipelineTimer,
key: Optional[str] = None,
score: float = 1.0,
overhead: Optional[float] = 1,
mode: int = 1,
default_tuner_time_rate: float = 0.7,
):
self.score = score
pipe_timer.add_task(self.score)
self.pipe_timer = pipe_timer
self.start_time = None
self.key = key
self._rate_overhead = overhead
self._mode = mode
self.default_tuner_rate = default_tuner_time_rate
[docs] def start(self):
"""Starts counting down.
Returns:
self.
"""
if self.in_progress:
return self
self.start_time = time()
self._timeout = self.pipe_timer.get_time_for_next_task(self.score)
self._overhead = self._rate_overhead * self.time_left
self.pipe_timer.close_task(self.score)
return self
[docs] def set_control_point(self):
"""Set control point.
Updates the countdown and time left parameters.
"""
self._timeout = self.timeout - self.time_spent
self.start_time = time()
[docs] def write_run_info(self):
"""Collect timer history."""
if self.key in self.pipe_timer.run_info:
self.pipe_timer.run_info[self.key].append(self.time_spent)
self.pipe_timer.run_scores[self.key].append(self.score)
else:
self.pipe_timer.run_info[self.key] = [self.time_spent]
self.pipe_timer.run_scores[self.key] = [self.score]
[docs] def get_run_results(self) -> Union[None, np.ndarray]:
"""Get timer history.
Returns:
``None`` if there is no history, or array with history of runs.
"""
if self.key in self.pipe_timer.run_info:
return self.pipe_timer.run_info[self.key]
[docs] def get_run_scores(self) -> Union[None, np.ndarray]:
"""Get timer scores.
Returns:
``None`` if there is no scores, or array with scores of runs.
"""
if self.key in self.pipe_timer.run_scores:
return self.pipe_timer.run_scores[self.key]
[docs] def estimate_folds_time(self, n_folds: int = 1) -> Optional[float]:
"""Estimate time for n_folds.
Args:
n_folds: Number of folds.
Returns:
Estimated time needed to run all `n_folds`.
"""
run_results, run_scores = self.get_run_results(), self.get_run_scores()
if run_results is None:
if self._mode > 0:
return None
# case - at least one algo runs before and timer mode set to 0 (conservative mode)
total_run_info, total_run_scores = [], []
for k in self.pipe_timer.run_info:
total_run_info.extend(self.pipe_timer.run_info[k])
total_run_scores.extend(self.pipe_timer.run_scores[k])
if len(total_run_info) == 0:
return None
single_run_est = np.array(total_run_info).sum() / np.array(total_run_scores).sum() * self.score
return single_run_est * n_folds
# case - algo runs at least ones
if self._mode > 0:
single_run_est = np.max(np.array(run_results) / np.array(run_scores))
else:
single_run_est = np.mean(np.array(run_results) / np.array(run_scores))
single_run_est = single_run_est * self.score
return single_run_est * n_folds
[docs] def estimate_tuner_time(self, n_folds: int = 1) -> float:
"""Estimates time that is ok to spend on tuner.
Args:
n_folds: Number of folds.
Returns:
How much time timer will be able spend on tuner.
"""
folds_est = self.estimate_folds_time(n_folds)
if folds_est is None:
if self.time_left:
return self.default_tuner_rate * self.time_left
else:
return None
return self.time_left - folds_est
[docs] def time_limit_exceeded(self) -> bool:
"""Estimate time limit and send results to parent timer.
Returns:
``True`` if time limit exceeded.
"""
out_of_time = super().time_limit_exceeded()
if out_of_time:
self.pipe_timer.child_out_of_time = True
return out_of_time
def __copy__(self):
proxy_timer = PipelineTimer().start()
logger.warning(
"Copying TaskTimer may affect the parent PipelineTimer, so copy will create new unlimited TaskTimer"
)
return proxy_timer.get_task_timer(self.key)
def __deepcopy__(self, *args, **kwargs):
return self.__copy__()
[docs] def split_timer(self, n_parts: int) -> List["TaskTimer"]:
"""Split the timer into equal-sized tasks.
Args:
n_parts: Number of tasks.
Returns:
Timers for each tasks.
"""
new_tasks_score = self.score / n_parts
timers = [self.pipe_timer.get_task_timer(self.key, new_tasks_score) for _ in range(n_parts)]
self.pipe_timer.close_task(self.score)
return timers