# The MIT License (MIT)
# Copyright (c) 2014-2017 University of Bristol
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
# OR OTHER DEALINGS IN THE SOFTWARE.
from ..time_interval import TimeInterval, TimeIntervals
from ..utils import Printable
from . import StreamInstance
import logging
from collections import deque
from itertools import islice
[docs]class StreamView(Printable):
"""
Simple helper class for storing streams with a time interval (i.e. a "view" on a stream)
:param stream: The stream upon which this is a view
:param time_interval: The time interval over which this view is defined
:param force_calculation: Whether we should force calculation for this stream view if data does not exist
:type stream: Stream
:type time_interval: TimeInterval
"""
def __init__(self, stream, time_interval, force_calculation=False):
from . import Stream
if not isinstance(stream, Stream):
raise ValueError("stream must be Stream object")
if not isinstance(time_interval, TimeInterval):
raise ValueError("relative_time_interval must be TimeInterval object")
self.stream = stream
self.time_interval = time_interval
self.force_calculation = force_calculation
def __iter__(self):
required_intervals = TimeIntervals([self.time_interval]) - self.stream.calculated_intervals
from . import AssetStream
# if not isinstance(self.stream, AssetStream) and not required_intervals.is_empty:
if not required_intervals.is_empty:
if self.force_calculation:
if self.stream.parent_node is not None and self.stream.parent_node.factor is not None:
# Try to request upstream computation
for interval in required_intervals:
self.stream.parent_node.factor.execute(interval)
# Is there still computation needing doing?
required_intervals = TimeIntervals([self.time_interval]) - self.stream.calculated_intervals
if not required_intervals.is_empty:
logging.warn(
"Stream {} not available for time interval {}. Perhaps upstream calculations haven't been performed"
.format(self.stream.stream_id, required_intervals))
for item in self.stream.channel.get_results(self.stream, self.time_interval):
yield item
[docs] def items(self):
"""
Return all results as a list
:return: The results
:rtype: list[StreamInstance]
"""
return list(self.iteritems())
[docs] def iteritems(self):
return iter(self)
[docs] def dict_iteritems(self, flat=True):
return map(lambda x: x.as_dict(flat=flat), self)
[docs] def dict_items(self, flat=True):
return list(self.dict_iteritems(flat))
[docs] def timestamps(self):
return list(self.itertimestamps())
[docs] def itertimestamps(self):
return map(lambda x: x.timestamp, self.iteritems())
[docs] def values(self):
return list(self.itervalues())
[docs] def itervalues(self):
return map(lambda x: x.value, self.iteritems())
[docs] def last(self, default=None):
item = default
for item in self:
pass
return item
[docs] def first(self, default=None):
for el in self:
return el
return default
[docs] def head(self, n):
i = 0
for d in self.iteritems():
i += 1
if i > n:
break
yield d
[docs] def tail(self, n):
return iter(deque(self, maxlen=n))
[docs] def islice(self, start, stop=None, step=1):
return islice(self, start, stop, step)
[docs] def component(self, key):
# TODO: is this needed now we have a Component() tool?
for (time, data) in self.iteritems():
if key in data:
yield StreamInstance(time, data[key])
[docs] def component_filter(self, key, values):
# TODO: is this needed now we have a ComponentFilter() tool?
for (time, data) in self.iteritems():
if key in data and data[key] in values:
yield StreamInstance(time, data)
[docs] def delete_nones(self):
# TODO: Test this against ComponentFilter(key, values=[None], complement=true)
for (time, data) in self.iteritems():
data2 = {}
for (key, value) in data.items():
if value is not None:
data2[key] = value
yield StreamInstance(time, data2)