Source code for hyperstream.time_interval

# The MIT License (MIT) # Copyright (c) 2014-2017 University of Bristol
#
#  Permission is hereby granted, free of charge, to any person obtaining a copy
#  of this software and associated documentation files (the "Software"), to deal
#  in the Software without restriction, including without limitation the rights
#  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
#  copies of the Software, and to permit persons to whom the Software is
#  furnished to do so, subject to the following conditions:
#
#  The above copyright notice and this permission notice shall be included in all
#  copies or substantial portions of the Software.
#
#  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
#  EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
#  MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
#  IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
#  DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
#  OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
#  OR OTHER DEALINGS IN THE SOFTWARE.
"""
Module for dealing with time intervals containing TimeInterval, TimeIntervals, and RelativeTimeInterval
"""

from .utils import MIN_DATE, MAX_DATE, utcnow, UTC, Printable, get_timedelta, is_naive, remove_microseconds

from datetime import date, datetime, timedelta
import ciso8601
from collections import namedtuple
import arrow
import json


[docs]def profile(ob): """ Comment out this function to be able to use the line_profiler module. e.g. call: kernprof -l scripts/deploy_summariser.py --loglevel=10 python -m line_profiler deploy_summariser.py.lprof > deploy_summariser.py.summary :param ob: object :return: object """ return ob
[docs]class TimeIntervals(Printable): """ Container class for time intervals, that manages splitting and joining Example object: (t1,t2] U (t3,t4] U ... """ @profile def __init__(self, intervals=None): """ Initialise the object with the given intervals. These should be in a format that can be parsed by parse_time_tuple :param intervals: The time intervals """ self.intervals = self.parse(intervals) # @profile def __str__(self): return " U ".join(map(str, self.intervals)) if self.intervals else "[]" # @profile def __repr__(self): return "{}([{}])".format(self.__class__.__name__, ", ".join(map(repr, self.intervals)))
[docs] @profile def parse(self, intervals): parsed = [] if intervals: for v in intervals: if isinstance(v, (tuple, list)): if len(v) != 2: raise TypeError() v = parse_time_tuple(*v) elif isinstance(v, TimeInterval): v = TimeInterval(v.start, v.end) else: raise TypeError("Expected tuple/list/TimeInterval ({} given)".format(type(v))) # try: # v = parse_time_tuple(*v) # except Exception as e: # import logging # logging.debug(e) # if isinstance(v, TimeInterval): # v = TimeInterval(v.start, v.end) # else: # raise TypeError("Expected tuple/list/TimeInterval ({} given)".format(type(v))) parsed.append(v) return parsed
@property # @profile def is_empty(self): return len(self.intervals) == 0 @property # @profile def start(self): return min(self.intervals, key=lambda x: x.start).start if self.intervals else None @property # @profile def end(self): return max(self.intervals, key=lambda x: x.end).end if self.intervals else None @property # @profile def span(self): return TimeInterval(self.start, self.end) if self.intervals else None @property # @profile def humanized(self): return " U ".join(map(lambda x: x.humanized, self.intervals)) if self.intervals else "Empty"
[docs] @profile def split(self, points): '''Splits the list of time intervals in the specified points The function assumes that the time intervals do not overlap and ignores points that are not inside of any interval. Parameters ========== points: list of datetime ''' for p in points: for i in range(len(self.intervals)): if (self.intervals[i].start < p) and (self.intervals[i].end > p): self.intervals = (self.intervals[:i] + [TimeInterval(self.intervals[i].start, p), TimeInterval(p, self.intervals[i].end)] + self.intervals[(i + 1):]) break
# @profile
[docs] def compress(self): if len(self.intervals) == 0: return v = self.intervals[:1] for i in range(1, len(self.intervals)): if self.intervals[i].start == v[-1].end: v[-1] = TimeInterval(v[-1].start, self.intervals[i].end) else: v.append(self.intervals[i]) self.intervals = v
@profile def __add__(self, other): if isinstance(other, TimeInterval): if self.is_empty: return TimeIntervals([other]) if other.start > self.end: self.intervals.append(other) elif other.end < self.start: self.intervals.insert(0, other) elif other.start == self.end: self.intervals[-1].end = other.end elif other.end == self.start: self.intervals[0].start = other.start else: return self + TimeIntervals([other]) return self if self.is_empty: return TimeIntervals(other.intervals) if other.is_empty: return TimeIntervals(self.intervals) self_points = [point for interval in self.intervals for point in (interval.start, interval.end)] other_points = [point for interval in other.intervals for point in (interval.start, interval.end)] self.split(other_points) other.split(self_points) v = list(set(self.intervals).union(set(other.intervals))) v.sort(key=lambda ii: ii.start) new = TimeIntervals(v) self.compress() other.compress() new.compress() return new @profile def __sub__(self, other): if self == other: return TimeIntervals([]) if not other: other = TimeIntervals([]) self_points = [point for interval in self.intervals for point in (interval.start, interval.end)] other_points = [point for interval in other.intervals for point in (interval.start, interval.end)] self.split(other_points) other.split(self_points) v = list(set(self.intervals).difference(set(other.intervals))) v.sort(key=lambda ii: ii.start) new = TimeIntervals(v) self.compress() other.compress() new.compress() return new # @profile def __eq__(self, other): return isinstance(other, TimeIntervals) \ and len(self.intervals) == len(other.intervals) \ and all(z[0] == z[1] for z in zip(self.intervals, other.intervals)) # @profile def __ne__(self, other): return not self == other # @profile def __iter__(self): return iter(sorted(self.intervals)) # @profile def __getitem__(self, key): if isinstance(key, slice): items = self.intervals[key] if isinstance(items, TimeInterval): return items return TimeIntervals(items) return self.intervals[key] # @profile def __bool__(self): return self.intervals is not None and len(self.intervals) > 0 __nonzero__ = __bool__ # @profile def __len__(self): return len(self.intervals)
[docs] def to_json(self): return json.dumps({'intervals': self.intervals})
[docs]class TimeInterval(namedtuple("TimeInterval", "start end")): """ Time interval object. Thin wrapper around a (start, end) tuple of datetime objects that provides some validation """
[docs] @classmethod # @profile def all_time(cls): return TimeInterval(MIN_DATE, MAX_DATE)
[docs] @classmethod # @profile def up_to_now(cls): return TimeInterval(MIN_DATE, utcnow())
[docs] @classmethod # @profile def now_minus(cls, weeks=0, days=0, hours=0, minutes=0, seconds=0, milliseconds=0): delta = timedelta(weeks=weeks, days=days, hours=hours, minutes=minutes, seconds=seconds, milliseconds=milliseconds, microseconds=0) now = utcnow() return TimeInterval(now - delta, now)
# @profile def __new__(cls, start, end): """ Initialise the object with the start and end times :param start: The start time :param end: The end time """ return super(TimeInterval, cls).__new__(cls, start, end) @profile def __init__(self, start, end): self._start = start self._end = end self._validate() super(TimeInterval, self).__init__() # @profile
[docs] def to_tuple(self): return self.start, self.end
@profile def _validate(self): if is_naive(self._start): self._start = self._start.replace(tzinfo=UTC) if is_naive(self._end): self._end = self._end.replace(tzinfo=UTC) # Remove the microseconds, since HyperStream is only millisecond precise self._start = remove_microseconds(self._start) self._end = remove_microseconds(self._end) try: if self._start >= self._end: raise ValueError("start should be strictly less than end") except TypeError: raise # for debugging # TODO: Temporarily remove extra validation # # if not isinstance(self._start, (date, datetime)): # raise TypeError("start should be datetime.datetime object") # # if not isinstance(self._end, (date, datetime)): # raise TypeError("end should be datetime.datetime object") @property # @profile def width(self): return self._end - self._start @property # @profile def start(self): return self._start @start.setter # @profile def start(self, value): self._start = value self._validate() @property # @profile def end(self): return self._end @end.setter # @profile def end(self, value): self._end = value self._validate() @property # @profile def humanized(self): return "({0} to {1}]".format(arrow.get(self.start).humanize(), arrow.get(self.end).humanize()) # @profile def __str__(self): return "({0}, {1}]".format(self.start, self.end) # @profile def __repr__(self): return "{}(start={}, end={})".format(self.__class__.__name__, repr(self.start), repr(self.end)) # @profile def __eq__(self, other): return isinstance(other, TimeInterval) and self.start == other.start and self.end == other.end # @profile def __ne__(self, other): return not self == other # @profile def __hash__(self): return hash((self.start, self.end)) # @profile def __contains__(self, item): if isinstance(item, (date, datetime)): return self.start < item <= self.end if isinstance(item, TimeInterval): return self.start < item.start and item.end <= self.end raise TypeError("can't compare datetime.datetime to {}".format(type(item))) # @profile def __add__(self, other): if isinstance(other, timedelta): return TimeInterval(self.start + other, self.end + other) if isinstance(other, (tuple, list)) and len(other) == 2: other = RelativeTimeInterval(*other) if not isinstance(other, RelativeTimeInterval): raise ValueError("Can only add a relative time interval to a time interval") return TimeInterval(self.start + timedelta(other.start), self.end + timedelta(other.end))
[docs] def to_json(self): return json.dumps((self._start, self._end))
# def resize(self, *args): # if len(args) == 1: # if isinstance(args[0], RelativeTimeInterval): # rti = args[0] # else: # raise TypeError("Single argument should be RelativeTimeInterval") # elif len(args) == 2: # rti = RelativeTimeInterval(*args) # else: # raise ValueError("Too many input arguments") # return self + rti # noinspection PyMissingConstructor
[docs]class RelativeTimeInterval(TimeInterval): """ Relative time interval object. Thin wrapper around a (start, end) tuple of timedelta objects that provides some validation """ def __init__(self, start, end): """ Initialise the object with the start and end times :param start: The start time :param end: The end time """ self._start = get_timedelta(start) self._end = get_timedelta(end) self._validate() def _validate(self): if not isinstance(self._start, timedelta): raise TypeError("start should datetime.timedelta object") if not isinstance(self._end, timedelta): raise TypeError("end should datetime.timedelta object") if self._start >= self._end: raise ValueError("start should be strictly less than end") if self._end > timedelta(0): raise ValueError("relative time intervals in the future are not supported") @property def start(self): return self._start.total_seconds() @start.setter def start(self, value): self._start = get_timedelta(value) self._validate() @property def end(self): return self._end.total_seconds() @end.setter def end(self, value): self._end = get_timedelta(value) self._validate()
[docs] def absolute(self, dt): if not isinstance(dt, (date, datetime)): raise ValueError("Expected date|datetime, got {}".format(type(dt))) return TimeInterval(start=dt + self._start, end=dt + self._end)
[docs]@profile def parse_time_tuple(start, end): """ Parse a time tuple. These can be: relative in seconds, e.g. (-4, 0) relative in timedelta, e.g. (timedelta(seconds=-4), timedelta(0)) absolute in date/datetime, e.g. (datetime(2016, 4, 28, 20, 0, 0, 0, UTC), datetime(2016, 4, 28, 21, 0, 0, 0, UTC)) absolute in iso strings, e.g. ("2016-04-28T20:00:00.000Z", "2016-04-28T20:01:00.000Z") Mixtures of relative and absolute are not allowed :param start: Start time :param end: End time :type start: int | timedelta | datetime | str :type end: int | timedelta | datetime | str :return: TimeInterval or RelativeTimeInterval object """ if isinstance(start, int): start_time = timedelta(seconds=start) elif isinstance(start, timedelta): start_time = start elif start is None: start_time = MIN_DATE elif isinstance(start, (date, datetime)): start_time = start.replace(tzinfo=UTC) else: start_time = ciso8601.parse_datetime(start).replace(tzinfo=UTC) if isinstance(end, int): # TODO: add check for future (negative values) and ensure that start < end if not isinstance(start_time, timedelta): raise ValueError("Can't mix relative and absolute times") end_time = timedelta(seconds=end) elif isinstance(end, timedelta): if not isinstance(start_time, timedelta): raise ValueError("Can't mix relative and absolute times") end_time = end elif end is None: end_time = utcnow() # TODO: or MAX_DATE? elif isinstance(end, datetime): end_time = end.replace(tzinfo=UTC) else: end_time = ciso8601.parse_datetime(end).replace(tzinfo=UTC) if isinstance(start_time, timedelta): return RelativeTimeInterval(start=start_time, end=end_time) else: return TimeInterval(start=start_time, end=end_time)