# The MIT License (MIT)
# Copyright (c) 2014-2017 University of Bristol
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
# OR OTHER DEALINGS IN THE SOFTWARE.
from .memory_channel import ReadOnlyMemoryChannel
from ..stream import StreamId, Stream, StreamInstance
from ..utils import Printable, MIN_DATE, UTC
import ciso8601
import os
from semantic_version import Version
import logging
class FileDateTimeVersion(Printable):
    """
    Simple class to hold file details along with the timestamp and version number parsed from the filename.
    Uses semantic version.

    The file name (without its extension) is split at the first ``split_char``:

    - the part before it is parsed as a timestamp with ``ciso8601.parse_datetime``;
    - the part after it, minus its first character, is parsed as a semantic version.

    For example, ``2016-10-26_v0.0.1.py`` gives timestamp 2016-10-26 (UTC) and version 0.0.1.

    :param filename: The file name (not a full path)
    :param split_char: Separator between the timestamp and the version parts
    :raises ValueError: If ``split_char`` does not occur in the name, the timestamp cannot be parsed, or the
        version string is not a valid semantic version
    """
    def __init__(self, filename, split_char='_'):
        self.long_filename = filename
        self.filename_no_extension, self.extension = os.path.splitext(filename)
        # Unpacking raises ValueError when split_char does not occur in the name
        raw_timestamp, raw_version = self.filename_no_extension.split(split_char, 1)
        timestamp = ciso8601.parse_datetime(raw_timestamp)
        if not timestamp:
            # NOTE(review): some ciso8601 versions may return None rather than raise on bad input.
            # Report the raw text, not the (None) parse result.
            raise ValueError("Invalid timestamp {}".format(raw_timestamp))
        if timestamp.tzinfo is None:
            # Naive timestamps are interpreted as UTC
            timestamp = timestamp.replace(tzinfo=UTC)
        else:
            # Convert rather than overwrite, so an explicit offset in the name keeps the same instant
            timestamp = timestamp.astimezone(UTC)
        self.timestamp = timestamp
        # The first character is dropped - presumably a leading 'v' as in 'v0.0.1'
        self.version = Version(raw_version[1:])

    @property
    def is_python(self):
        """True if the file has a ``.py`` extension."""
        return self.extension == '.py'
class FileChannel(ReadOnlyMemoryChannel):
    """
    An abstract stream channel where the streams are the immediate sub-folders of a given path.

    Documents are the ``.py`` files in those folders whose names start with a timestamp followed by a version,
    e.g. ``2016-10-26_v0.0.1.py`` (see FileDateTimeVersion). Files whose names start with ``'__'`` are ignored.

    All derived classes must override ``data_loader(short_path, file_info)``, which determines how the data
    are loaded into the document of the stream.

    The files of the described format must never be deleted.
    The call update(up_to_timestamp) must not be called unless it is guaranteed that no files with earlier
    timestamps are added later.
    """
    path = ""

    def __init__(self, channel_id, path, up_to_timestamp=MIN_DATE):
        # Set before the base-class initialiser runs, in case it needs self.path
        self.path = path
        super(FileChannel, self).__init__(channel_id=channel_id, up_to_timestamp=up_to_timestamp)

    def file_filter(self, sorted_file_names):
        """
        Yield a FileDateTimeVersion for every ``.py`` file name that parses, in the order given.

        Names starting with ``'__'`` are skipped. Names that fail to parse are logged as warnings and skipped.

        :param sorted_file_names: Iterable of bare file names
        """
        for file_long_name in sorted_file_names:
            if file_long_name.startswith('__') or not file_long_name.endswith('.py'):
                continue
            try:
                tool_info = FileDateTimeVersion(file_long_name)
            except ValueError as e:
                # Python 3 exceptions have no ``.message`` attribute; format the exception itself
                logging.warning('Filename in incorrect format {0}, {1}'.format(file_long_name, e))
                continue
            yield tool_info

    @staticmethod
    def walk(directory, level=1):
        """
        Like ``os.walk`` but stops descending ``level`` directories below ``directory``.

        :param directory: Root directory; trailing path separators are stripped
        :param level: Maximum depth (relative to ``directory``) whose sub-folders are still visited
        :raises ValueError: If ``directory`` is not an existing directory
        """
        directory = directory.rstrip(os.path.sep)
        if not os.path.isdir(directory):
            raise ValueError("Invalid path: {}".format(directory))
        num_sep = directory.count(os.path.sep)
        for root, dirs, files in os.walk(directory):
            yield root, dirs, files
            num_sep_this = root.count(os.path.sep)
            if num_sep + level <= num_sep_this:
                # Clearing dirs in place stops os.walk from descending further
                del dirs[:]

    def update_streams(self, up_to_timestamp):
        """
        Register one stream for each immediate sub-folder of ``self.path`` that contains at least one file
        whose name does not start with ``'__'``.

        :param up_to_timestamp: Not used by this method
        """
        # walk() strips trailing separators from the root it yields; strip here too so the slice below
        # does not cut the first character off each stream name when self.path ends with a separator.
        root = self.path.rstrip(os.path.sep)
        for long_path, dir_names, file_names in self.walk(self.path, level=1):
            visible_files = [f for f in file_names if not f.startswith('__')]
            if not visible_files:
                continue
            name = long_path[len(root) + 1:]
            if not name:
                # The root folder itself is not a stream
                continue
            stream_id = StreamId(name=name)
            stream = Stream(channel=self, stream_id=stream_id, calculated_intervals=None, sandbox=None)
            self.streams[stream_id] = stream

    def data_loader(self, short_path, file_info):
        """
        Load the value of a single document. Must be overridden by derived classes.

        :param short_path: The stream (sub-folder) name
        :param file_info: FileDateTimeVersion describing the file
        :raises NotImplementedError: Always, in this base class
        """
        raise NotImplementedError

    def get_results(self, stream, time_interval):
        """
        Load the documents of ``stream`` whose timestamps are within ``time_interval``.

        Documents with timestamps later than ``self.up_to_timestamp`` are excluded.

        :param stream: The stream whose folder is read
        :param time_interval: Interval supporting ``timestamp in time_interval``
        :return: List of StreamInstance objects sorted by timestamp
        """
        # TODO: Make this behave like the other channels, e.g. raise when the requested interval extends
        # past self.up_to_timestamp instead of silently dropping those documents.
        module_path = os.path.join(self.path, stream.stream_id.name)
        result = [
            StreamInstance(
                timestamp=file_info.timestamp,
                value=self.data_loader(stream.stream_id.name, file_info)
            )
            for file_info in self.file_filter(sorted(os.listdir(module_path)))
            if file_info.timestamp in time_interval and file_info.timestamp <= self.up_to_timestamp
        ]
        # File-name order need not match timestamp order, so sort explicitly
        result.sort(key=lambda x: x.timestamp)
        return result