# The MIT License (MIT)
# Copyright (c) 2014-2017 University of Bristol
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
# OR OTHER DEALINGS IN THE SOFTWARE.
"""
Module for dealing with time intervals containing TimeInterval, TimeIntervals, and RelativeTimeInterval
"""
from .utils import MIN_DATE, MAX_DATE, utcnow, UTC, Printable, get_timedelta, is_naive, remove_microseconds
from datetime import date, datetime, timedelta
import ciso8601
from collections import namedtuple
import arrow
import json
def profile(ob):
    """
    No-op placeholder so that ``@profile`` decorations in this module work
    when the code is not being run under line_profiler.

    Comment out this function to be able to use the line_profiler module, e.g. call:
        kernprof -l scripts/deploy_summariser.py --loglevel=10
        python -m line_profiler deploy_summariser.py.lprof > deploy_summariser.py.summary

    :param ob: object
    :return: the very same object, unchanged
    """
    return ob
class TimeIntervals(Printable):
    """
    Container class for time intervals, that manages splitting and joining
    Example object: (t1,t2] U (t3,t4] U ...

    The intervals are held in the ``intervals`` list. Adding a single
    TimeInterval and ``compress`` both rely on that list being ordered by
    start time (they only ever look at the first/last or neighbouring items).
    """

    @profile
    def __init__(self, intervals=None):
        """
        Initialise the object with the given intervals.
        These should be in a format that can be parsed by parse_time_tuple

        :param intervals: The time intervals
        """
        self.intervals = self.parse(intervals)

    def __str__(self):
        return " U ".join(map(str, self.intervals)) if self.intervals else "[]"

    def __repr__(self):
        return "{}([{}])".format(self.__class__.__name__, ", ".join(map(repr, self.intervals)))

    @profile
    def parse(self, intervals):
        """
        Convert an iterable of interval specifications into a list of fresh
        interval objects.

        :param intervals: iterable of TimeInterval objects and/or 2-item
            tuples/lists accepted by parse_time_tuple (may be None or empty)
        :return: list of parsed intervals (empty for None/empty input)
        :raises TypeError: for items of an unsupported type or wrong length
        """
        parsed = []
        if intervals:
            for v in intervals:
                # TimeInterval is a namedtuple, hence also a tuple of length 2:
                # it has to be recognised *before* the generic tuple/list test,
                # otherwise it would be rebuilt from its raw tuple fields
                # instead of its validated start/end values.
                if isinstance(v, RelativeTimeInterval):
                    v = RelativeTimeInterval(v._start, v._end)
                elif isinstance(v, TimeInterval):
                    v = TimeInterval(v.start, v.end)
                elif isinstance(v, (tuple, list)):
                    if len(v) != 2:
                        raise TypeError(
                            "Expected a (start, end) pair ({} items given)".format(len(v)))
                    v = parse_time_tuple(*v)
                else:
                    raise TypeError("Expected tuple/list/TimeInterval ({} given)".format(type(v)))
                parsed.append(v)
        return parsed

    @property
    def is_empty(self):
        """True when the container holds no intervals."""
        return len(self.intervals) == 0

    @property
    def start(self):
        """Earliest start over all intervals, or None when empty."""
        return min(self.intervals, key=lambda x: x.start).start if self.intervals else None

    @property
    def end(self):
        """Latest end over all intervals, or None when empty."""
        return max(self.intervals, key=lambda x: x.end).end if self.intervals else None

    @property
    def span(self):
        """Single TimeInterval from ``start`` to ``end``, or None when empty."""
        return TimeInterval(self.start, self.end) if self.intervals else None

    @property
    def humanized(self):
        """Humanized form of every interval joined with " U ", or "Empty"."""
        return " U ".join(map(lambda x: x.humanized, self.intervals)) if self.intervals else "Empty"

    @profile
    def split(self, points):
        """
        Split, in place, every interval that strictly contains one of the
        given points into two intervals meeting at that point.

        Implemented iteratively: the work is done point by point (last point
        first) without recursion, so long point lists cannot exhaust the
        interpreter's recursion limit.

        :param points: the time points to split at
        """
        for p in reversed(list(points)):
            pieces = []
            changed = False
            for interval in self.intervals:
                if interval.start < p < interval.end:
                    pieces.append(TimeInterval(interval.start, p))
                    pieces.append(TimeInterval(p, interval.end))
                    changed = True
                else:
                    pieces.append(interval)
            if changed:
                self.intervals = pieces

    def compress(self):
        """
        Merge, in place, neighbouring intervals where one ends exactly where
        the next one starts.
        """
        if len(self.intervals) == 0:
            return
        merged = [self.intervals[0]]
        for interval in self.intervals[1:]:
            if interval.start == merged[-1].end:
                merged[-1] = TimeInterval(merged[-1].start, interval.end)
            else:
                merged.append(interval)
        self.intervals = merged

    @profile
    def __add__(self, other):
        """
        Union with a TimeInterval or another TimeIntervals.

        Adding a single TimeInterval that lies before/after, or touches the
        start/end of, the current contents updates this object in place and
        returns it. Otherwise a new TimeIntervals is returned; as a side
        effect both operands are split and then compressed again.

        :param other: TimeInterval or TimeIntervals
        :return: the union
        """
        if isinstance(other, TimeInterval):
            if self.is_empty:
                return TimeIntervals([other])
            if other.start > self.end:
                self.intervals.append(other)
            elif other.end < self.start:
                self.intervals.insert(0, other)
            elif other.start == self.end:
                # Replace rather than mutate the stored interval, so its tuple
                # fields and its start/end properties never disagree.
                last = self.intervals[-1]
                self.intervals[-1] = TimeInterval(last.start, other.end)
            elif other.end == self.start:
                first = self.intervals[0]
                self.intervals[0] = TimeInterval(other.start, first.end)
            else:
                return self + TimeIntervals([other])
            return self
        if self.is_empty:
            return TimeIntervals(other.intervals)
        if other.is_empty:
            return TimeIntervals(self.intervals)
        self_points = [point for interval in self.intervals for point in (interval.start, interval.end)]
        other_points = [point for interval in other.intervals for point in (interval.start, interval.end)]
        # Cut both sides at each other's boundaries so that overlapping parts
        # become identical pieces, which the set union then de-duplicates.
        self.split(other_points)
        other.split(self_points)
        v = list(set(self.intervals).union(set(other.intervals)))
        v.sort(key=lambda ii: ii.start)
        new = TimeIntervals(v)
        self.compress()
        other.compress()
        new.compress()
        return new

    @profile
    def __sub__(self, other):
        """
        Set difference: the parts of this object not covered by ``other``.

        As a side effect both operands are split and then compressed again.

        :param other: TimeIntervals (a falsy value is treated as empty)
        :return: new TimeIntervals
        """
        if self == other:
            return TimeIntervals([])
        if not other:
            other = TimeIntervals([])
        self_points = [point for interval in self.intervals for point in (interval.start, interval.end)]
        other_points = [point for interval in other.intervals for point in (interval.start, interval.end)]
        # Same alignment trick as in __add__: after splitting, shared parts are
        # equal pieces and can be removed with a plain set difference.
        self.split(other_points)
        other.split(self_points)
        v = list(set(self.intervals).difference(set(other.intervals)))
        v.sort(key=lambda ii: ii.start)
        new = TimeIntervals(v)
        self.compress()
        other.compress()
        new.compress()
        return new

    def __eq__(self, other):
        return isinstance(other, TimeIntervals) \
            and len(self.intervals) == len(other.intervals) \
            and all(a == b for a, b in zip(self.intervals, other.intervals))

    def __ne__(self, other):
        return not self == other

    def __iter__(self):
        # Order by the validated start/end values (the ones __eq__/__hash__ of
        # the intervals use), not by the raw namedtuple fields.
        return iter(sorted(self.intervals, key=lambda ii: (ii.start, ii.end)))

    def __getitem__(self, key):
        """
        :param key: index or slice
        :return: a single interval for an index, a new TimeIntervals for a slice
        """
        if isinstance(key, slice):
            return TimeIntervals(self.intervals[key])
        return self.intervals[key]

    def __bool__(self):
        return self.intervals is not None and len(self.intervals) > 0

    __nonzero__ = __bool__

    def __len__(self):
        return len(self.intervals)

    @staticmethod
    def _json_endpoint(value):
        """Return a JSON-serialisable form of an interval end point."""
        # json cannot encode date/datetime objects; emit ISO-8601 text instead.
        if isinstance(value, (date, datetime)):
            return value.isoformat()
        return value

    def to_json(self):
        """
        Serialise to a JSON object ``{"intervals": [[start, end], ...]}``.
        date/datetime end points are written as ISO-8601 strings.

        :return: JSON string
        """
        return json.dumps({'intervals': [
            [self._json_endpoint(interval.start), self._json_endpoint(interval.end)]
            for interval in self.intervals
        ]})
class TimeInterval(namedtuple("TimeInterval", "start end")):
    """
    Time interval object.
    Thin wrapper around a (start, end) tuple of datetime objects that provides some validation

    The namedtuple fields keep the values passed to the constructor, while the
    ``start``/``end`` properties expose the validated copies held in
    ``_start``/``_end``. Equality and hashing use the properties.
    """

    @classmethod
    def all_time(cls):
        """:return: TimeInterval from MIN_DATE to MAX_DATE"""
        return TimeInterval(MIN_DATE, MAX_DATE)

    @classmethod
    def up_to_now(cls):
        """:return: TimeInterval from MIN_DATE to the current UTC time"""
        return TimeInterval(MIN_DATE, utcnow())

    @classmethod
    def now_minus(cls, weeks=0, days=0, hours=0, minutes=0, seconds=0, milliseconds=0):
        """
        :return: TimeInterval ending at the current UTC time and starting the
            given amount of time earlier
        """
        delta = timedelta(weeks=weeks, days=days, hours=hours,
                          minutes=minutes, seconds=seconds, milliseconds=milliseconds, microseconds=0)
        now = utcnow()
        return TimeInterval(now - delta, now)

    def __new__(cls, start, end):
        """
        Initialise the object with the start and end times

        :param start: The start time
        :param end: The end time
        """
        return super(TimeInterval, cls).__new__(cls, start, end)

    @profile
    def __init__(self, start, end):
        self._start = start
        self._end = end
        self._validate()
        super(TimeInterval, self).__init__()

    def to_tuple(self):
        """:return: plain ``(start, end)`` tuple of the validated values"""
        return self.start, self.end

    @profile
    def _validate(self):
        """
        Normalise and check the end points: naive values get UTC attached,
        microseconds are removed, and start must be strictly before end.

        :raises ValueError: if start is not strictly less than end
        """
        if is_naive(self._start):
            self._start = self._start.replace(tzinfo=UTC)
        if is_naive(self._end):
            self._end = self._end.replace(tzinfo=UTC)
        # Remove the microseconds, since HyperStream is only millisecond precise
        self._start = remove_microseconds(self._start)
        self._end = remove_microseconds(self._end)
        # A TypeError from comparing incompatible values propagates unchanged.
        if self._start >= self._end:
            raise ValueError("start should be strictly less than end")
        # TODO: explicit isinstance checks on start/end are temporarily disabled

    @property
    def width(self):
        """Difference ``end - start``."""
        return self._end - self._start

    @property
    def start(self):
        return self._start

    @start.setter
    def start(self, value):
        self._start = value
        self._validate()

    @property
    def end(self):
        return self._end

    @end.setter
    def end(self, value):
        self._end = value
        self._validate()

    @property
    def humanized(self):
        """Both end points passed through arrow's ``humanize``."""
        return "({0} to {1}]".format(arrow.get(self.start).humanize(), arrow.get(self.end).humanize())

    def __str__(self):
        return "({0}, {1}]".format(self.start, self.end)

    def __repr__(self):
        return "{}(start={}, end={})".format(self.__class__.__name__, repr(self.start), repr(self.end))

    def __eq__(self, other):
        return isinstance(other, TimeInterval) and self.start == other.start and self.end == other.end

    def __ne__(self, other):
        return not self == other

    def __hash__(self):
        return hash((self.start, self.end))

    def __contains__(self, item):
        """
        :param item: date/datetime or TimeInterval
        :return: whether the item lies inside this (start, end] interval
        :raises TypeError: for any other item type
        """
        if isinstance(item, (date, datetime)):
            return self.start < item <= self.end
        if isinstance(item, TimeInterval):
            # NOTE(review): the strict "<" means an interval does not contain
            # itself, nor a sub-interval sharing its start - confirm intended.
            return self.start < item.start and item.end <= self.end
        raise TypeError("can't compare datetime.datetime to {}".format(type(item)))

    def __add__(self, other):
        """
        Shift by a timedelta, or offset both end points by a relative interval.

        :param other: timedelta, RelativeTimeInterval, or a 2-item tuple/list
            from which a RelativeTimeInterval is built
        :return: new TimeInterval
        :raises ValueError: if other is none of the above
        """
        if isinstance(other, timedelta):
            return TimeInterval(self.start + other, self.end + other)
        if isinstance(other, (tuple, list)) and len(other) == 2:
            other = RelativeTimeInterval(*other)
        if not isinstance(other, RelativeTimeInterval):
            raise ValueError("Can only add a relative time interval to a time interval")
        # RelativeTimeInterval.start/end are in seconds (total_seconds()); the
        # first positional argument of timedelta is days, so name the unit.
        return TimeInterval(self.start + timedelta(seconds=other.start),
                            self.end + timedelta(seconds=other.end))

    @staticmethod
    def _json_endpoint(value):
        """Return a JSON-serialisable form of an interval end point."""
        # json cannot encode datetime/timedelta objects directly.
        if isinstance(value, (date, datetime)):
            return value.isoformat()
        if isinstance(value, timedelta):
            return value.total_seconds()
        return value

    def to_json(self):
        """
        Serialise to a two-element JSON array ``[start, end]``. date/datetime
        values are written as ISO-8601 strings, timedelta values as seconds.

        :return: JSON string
        """
        return json.dumps([self._json_endpoint(self._start), self._json_endpoint(self._end)])
# def resize(self, *args):
# if len(args) == 1:
# if isinstance(args[0], RelativeTimeInterval):
# rti = args[0]
# else:
# raise TypeError("Single argument should be RelativeTimeInterval")
# elif len(args) == 2:
# rti = RelativeTimeInterval(*args)
# else:
# raise ValueError("Too many input arguments")
# return self + rti
# noinspection PyMissingConstructor
class RelativeTimeInterval(TimeInterval):
    """
    Relative time interval object.
    Thin wrapper around a (start, end) tuple of timedelta objects that provides some validation

    ``_start``/``_end`` hold timedelta objects; the ``start``/``end``
    properties report them as a number of seconds.
    """

    def __init__(self, start, end):
        """
        Initialise the object with the start and end times

        :param start: The start time (anything get_timedelta accepts)
        :param end: The end time (anything get_timedelta accepts)
        """
        # Deliberately does not call TimeInterval.__init__: its validation is
        # written for datetime end points, not timedeltas.
        self._start = get_timedelta(start)
        self._end = get_timedelta(end)
        self._validate()

    def _validate(self):
        """
        Check that both end points are timedeltas, that start is strictly
        before end, and that the interval does not reach into the future.

        :raises TypeError: if an end point is not a timedelta
        :raises ValueError: if start >= end, or end is greater than zero
        """
        if not isinstance(self._start, timedelta):
            raise TypeError("start should be a datetime.timedelta object")
        if not isinstance(self._end, timedelta):
            raise TypeError("end should be a datetime.timedelta object")
        if self._start >= self._end:
            raise ValueError("start should be strictly less than end")
        if self._end > timedelta(0):
            raise ValueError("relative time intervals in the future are not supported")

    @property
    def start(self):
        """Start offset in seconds."""
        return self._start.total_seconds()

    @start.setter
    def start(self, value):
        self._start = get_timedelta(value)
        self._validate()

    @property
    def end(self):
        """End offset in seconds."""
        return self._end.total_seconds()

    @end.setter
    def end(self, value):
        self._end = get_timedelta(value)
        self._validate()

    def absolute(self, dt):
        """
        Anchor this relative interval at a concrete point in time.

        :param dt: date or datetime the offsets are added to
        :return: TimeInterval from ``dt + start`` to ``dt + end``
        :raises ValueError: if dt is not a date/datetime
        """
        if not isinstance(dt, (date, datetime)):
            raise ValueError("Expected date|datetime, got {}".format(type(dt)))
        return TimeInterval(start=dt + self._start, end=dt + self._end)
def _as_utc(value):
    """Return a date/datetime as a timezone-aware datetime expressed in UTC."""
    if not isinstance(value, datetime):
        # Plain date: it has no time of day and date.replace() takes no tzinfo,
        # so interpret it as midnight UTC of that day.
        return datetime(value.year, value.month, value.day, tzinfo=UTC)
    if value.tzinfo is None or value.utcoffset() is None:
        # Naive values are taken to be UTC already.
        return value.replace(tzinfo=UTC)
    # Aware values are converted, so the instant in time is preserved
    # (replace() would merely relabel the wall-clock time as UTC).
    return value.astimezone(UTC)


@profile
def parse_time_tuple(start, end):
    """
    Parse a time tuple. These can be:
      relative in seconds,       e.g. (-4, 0)
      relative in timedelta,     e.g. (timedelta(seconds=-4), timedelta(0))
      absolute in date/datetime, e.g. (datetime(2016, 4, 28, 20, 0, 0, 0, UTC), datetime(2016, 4, 28, 21, 0, 0, 0, UTC))
      absolute in iso strings,   e.g. ("2016-04-28T20:00:00.000Z", "2016-04-28T20:01:00.000Z")
    Mixtures of relative and absolute are not allowed

    A start of None means MIN_DATE and an end of None means the current UTC
    time. Absolute values are normalised to UTC: naive ones are assumed to be
    UTC, aware ones are converted. Ordering of start and end is checked by the
    interval object that gets constructed.

    :param start: Start time
    :param end: End time
    :type start: int | timedelta | datetime | str
    :type end: int | timedelta | datetime | str
    :return: TimeInterval or RelativeTimeInterval object
    :raises ValueError: if relative and absolute times are mixed
    """
    if isinstance(start, int):
        start_time = timedelta(seconds=start)
    elif isinstance(start, timedelta):
        start_time = start
    elif start is None:
        start_time = MIN_DATE
    elif isinstance(start, (date, datetime)):
        start_time = _as_utc(start)
    else:
        start_time = _as_utc(ciso8601.parse_datetime(start))

    relative = isinstance(start_time, timedelta)

    if isinstance(end, (int, timedelta)):
        if not relative:
            raise ValueError("Can't mix relative and absolute times")
        end_time = end if isinstance(end, timedelta) else timedelta(seconds=end)
    else:
        # The mirror-image check: an absolute end needs an absolute start.
        if relative:
            raise ValueError("Can't mix relative and absolute times")
        if end is None:
            end_time = utcnow()  # TODO: or MAX_DATE?
        elif isinstance(end, (date, datetime)):
            end_time = _as_utc(end)
        else:
            end_time = _as_utc(ciso8601.parse_datetime(end))

    if relative:
        return RelativeTimeInterval(start=start_time, end=end_time)
    return TimeInterval(start=start_time, end=end_time)