Source code for hyperstream.channels.file_channel

# The MIT License (MIT)
# Copyright (c) 2014-2017 University of Bristol
#
#  Permission is hereby granted, free of charge, to any person obtaining a copy
#  of this software and associated documentation files (the "Software"), to deal
#  in the Software without restriction, including without limitation the rights
#  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
#  copies of the Software, and to permit persons to whom the Software is
#  furnished to do so, subject to the following conditions:
#
#  The above copyright notice and this permission notice shall be included in all
#  copies or substantial portions of the Software.
#
#  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
#  EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
#  MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
#  IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
#  DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
#  OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
#  OR OTHER DEALINGS IN THE SOFTWARE.

from .memory_channel import ReadOnlyMemoryChannel
from ..stream import StreamId, Stream, StreamInstance
from ..utils import Printable, MIN_DATE, UTC

import ciso8601
import os
from semantic_version import Version
import logging


class FileDateTimeVersion(Printable):
    """
    Simple class to hold file details along with the timestamp and version number from the filename.
    Uses semantic version.

    The filename (without its extension) is split once at ``split_char``: the part before it is parsed as the
    timestamp and the part after it as the version.

    Attributes set by the constructor:
        long_filename: the filename exactly as passed in
        filename_no_extension: the filename with its extension removed
        extension: the extension as returned by ``os.path.splitext`` (including the leading dot)
        timestamp: the parsed datetime with its tzinfo replaced by ``UTC``
        version: a ``semantic_version.Version`` built from the version part minus its first character
    """

    def __init__(self, filename, split_char='_'):
        """
        Parse the filename into its timestamp and version components.

        :param filename: The file name, e.g. as returned by ``os.listdir``
        :param split_char: The character separating the timestamp from the version
        :raises ValueError: if the name does not contain ``split_char``, or if the timestamp cannot be parsed.
            NOTE(review): ``Version`` presumably also raises ValueError for a malformed version string - confirm
            against the semantic_version documentation.
        """
        self.long_filename = filename
        self.filename_no_extension, self.extension = os.path.splitext(filename)

        # Unpacking raises ValueError when split_char is absent from the name.
        raw_timestamp, raw_version = self.filename_no_extension.split(split_char, 1)

        parsed = ciso8601.parse_datetime(raw_timestamp)
        if not parsed:
            # Report the text that failed to parse rather than the (empty) parse result.
            raise ValueError("Invalid timestamp {}".format(raw_timestamp))

        # The tzinfo is replaced, not converted: the parsed wall-clock value is kept and labelled as UTC.
        self.timestamp = parsed.replace(tzinfo=UTC)

        # The first character of the version part is dropped before parsing
        # (presumably a "v" prefix such as "v1.2.3" - TODO confirm against the file naming convention).
        self.version = Version(raw_version[1:])

    @property
    def is_python(self):
        """
        :return: True if the file extension is exactly '.py'
        """
        return self.extension == '.py'
class FileChannel(ReadOnlyMemoryChannel):
    """
    An abstract stream channel where the streams are recursive sub-folders under a given path and documents correspond to
    all those files which have a timestamp as their prefix in the format yyyy_mm_dd_hh_mm_ss_mmm_*.
    All the derived classes must override the function data_loader(short_path,file_long_name) which determines how the
    data are loaded into the document of the stream.
    The files of the described format must never be deleted.
    The call update(up_to_timestamp) must not be called unless it is guaranteed that later no files with earlier
    timestamps are added.

    NOTE(review): file names are actually parsed by FileDateTimeVersion, which splits the name at the first '_';
    confirm that the timestamp format described above matches what that parser accepts.
    """
    path = ""

    def __init__(self, channel_id, path, up_to_timestamp=MIN_DATE):
        """
        Initialise the channel.

        :param channel_id: The channel identifier, forwarded to the base class
        :param path: The root directory whose sub-folders become streams
        :param up_to_timestamp: Forwarded to the base class (defaults to MIN_DATE)
        """
        # The path is assigned before delegating to the base initialiser; this order is kept from the original
        # (the base class may need the path during its own initialisation - TODO confirm).
        self.path = path
        super(FileChannel, self).__init__(channel_id=channel_id, up_to_timestamp=up_to_timestamp)

    def file_filter(self, sorted_file_names):
        """
        Yield a FileDateTimeVersion for every usable file name.

        Names starting with '__' and names not ending in '.py' are skipped silently. Names that cannot be parsed
        are skipped with a logged warning.

        :param sorted_file_names: An iterable of file names (no directory part)
        :return: A generator of FileDateTimeVersion objects, in input order
        """
        for file_long_name in sorted_file_names:
            if file_long_name.startswith('__') or not file_long_name.endswith('.py'):
                continue
            try:
                tool_info = FileDateTimeVersion(file_long_name)
            except ValueError as e:
                # Format the exception itself: exceptions have no `.message` attribute on Python 3, so reading it
                # would raise AttributeError here instead of logging. `logging.warn` is a deprecated alias.
                logging.warning('Filename in incorrect format {0}, {1}'.format(file_long_name, e))
            else:
                yield tool_info

    @staticmethod
    def walk(directory, level=1):
        """
        Like os.walk, but descending at most ``level`` directory levels below ``directory``.

        :param directory: The directory to start from (a trailing path separator is stripped)
        :param level: How many levels below ``directory`` to descend into
        :return: A generator of (root, dirs, files) tuples as produced by os.walk
        :raises ValueError: if ``directory`` is not an existing directory. Because this is a generator, the error
            is raised when iteration starts rather than when the function is called.
        """
        directory = directory.rstrip(os.path.sep)
        if not os.path.isdir(directory):
            raise ValueError("Invalid path: {}".format(directory))

        base_depth = directory.count(os.path.sep)
        for root, dirs, files in os.walk(directory):
            yield root, dirs, files
            # Depth is measured by counting path separators. Emptying `dirs` in place after the yield stops
            # os.walk (top-down) from descending any further below this root.
            if base_depth + level <= root.count(os.path.sep):
                del dirs[:]

    def update_streams(self, up_to_timestamp):
        """
        Register one stream for every immediate sub-folder of the channel path that contains at least one file
        whose name does not start with '__'.

        :param up_to_timestamp: Not used by this implementation
        """
        # walk() strips a trailing separator from the directory it is given, so the roots it yields are relative
        # to the stripped path. Measure the prefix on the same stripped form; otherwise a path ending in a
        # separator would cut the first character off every stream name.
        base = self.path.rstrip(os.path.sep)
        prefix_length = len(base) + 1

        for long_path, dir_names, file_names in self.walk(self.path, level=1):
            visible_files = [ff for ff in file_names if not ff.startswith('__')]
            if not visible_files:
                continue

            name = long_path[prefix_length:]
            if not name:
                # This is the root directory itself, which is not a stream
                continue

            stream_id = StreamId(name=name)
            stream = Stream(channel=self, stream_id=stream_id, calculated_intervals=None, sandbox=None)
            self.streams[stream_id] = stream

    def data_loader(self, short_path, file_info):
        """
        Load the value of one document. Must be overridden by derived classes.

        :param short_path: The stream name (folder name relative to the channel path)
        :param file_info: The FileDateTimeVersion describing the file to load
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError

    def get_results(self, stream, time_interval):
        """
        Build the stream instances for all files of the stream whose timestamp lies in the time interval and is
        not later than this channel's up_to_timestamp.

        :param stream: The stream; its stream_id.name selects the folder under the channel path
        :param time_interval: Any object supporting ``timestamp in time_interval``
        :return: A list of StreamInstance objects sorted by timestamp
        """
        # TODO: Make this behave like the other channels
        # if relative_time_interval.end > self.up_to_timestamp:
        #     raise ValueError(
        #         'The stream is not available after ' + str(self.up_to_timestamp) + ' and cannot be calculated')
        stream_name = stream.stream_id.name
        module_path = os.path.join(self.path, stream_name)

        result = [
            StreamInstance(
                timestamp=file_info.timestamp,
                value=self.data_loader(stream_name, file_info)
            )
            for file_info in self.file_filter(sorted(os.listdir(module_path)))
            if file_info.timestamp in time_interval and file_info.timestamp <= self.up_to_timestamp
        ]

        # Lexicographic file-name order need not equal chronological order, so sort explicitly.
        result.sort(key=lambda x: x.timestamp)
        return result