Source code for hyperstream.tool.base_tool

# The MIT License (MIT)
# Copyright (c) 2014-2017 University of Bristol
# 
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# 
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# 
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
# OR OTHER DEALINGS IN THE SOFTWARE.

from ..models import ToolModel, ToolParameterModel
from ..utils import Printable, Hashable, func_dump, func_load, camel_to_snake

import logging
import pickle


[docs]class BaseTool(Printable, Hashable): """ Base class for all tools """ def __init__(self, **kwargs): """ Base class initializer. Note that this performs setattr on all of the keyword arguments, so this does not have to be done manually inside the tools. :param kwargs: The keyword arguments """ if kwargs: logging.debug('Defining a {} tool with parameters {}'.format(self.__class__.__name__, kwargs)) else: logging.debug('Defining a {} tool'.format(self.__class__.__name__)) for k, v in kwargs.items(): self.__setattr__(k, v) def __eq__(self, other): """ Equality test :param other: the other tool :return: whether self and other are equal """ return isinstance(other, BaseTool) and hash(self) == hash(other)
[docs] def message(self, interval): """ Get the execution message :param interval: The time interval :return: The execution message """ return '{} running from {} to {}'.format(self.__class__.__name__, str(interval.start), str(interval.end))
@property def name(self): """ Get the name of the tool, converted to snake format (e.g. "splitter_from_stream") :return: The name """ return camel_to_snake(super(BaseTool, self).name) @name.setter def name(self, value): """ Set the name :param value: The name :return: None """ self._name = value @property def parameters_dict(self): """ Get the tool parameters as a simple dictionary :return: The tool parameters """ d = {} for k, v in self.__dict__.items(): if not k.startswith("_"): d[k] = v return d @property def parameters(self): """ Get the tool parameters :return: The tool parameters along with additional information (whether they are functions or sets) """ parameters = [] for k, v in self.__dict__.items(): if k.startswith("_"): continue is_function = False is_set = False if callable(v): value = pickle.dumps(func_dump(v)) is_function = True elif isinstance(v, set): value = list(v) is_set = True else: value = v parameters.append(dict( key=k, value=value, is_function=is_function, is_set=is_set )) return parameters
[docs] @staticmethod def parameters_from_model(parameters_model): """ Get the tool parameters model from dictionaries :param parameters_model: The parameters as a mongoengine model :return: The tool parameters as a dictionary """ parameters = {} for p in parameters_model: if p.is_function: code, defaults, closure = pickle.loads(p.value) parameters[p.key] = func_load(code, defaults, closure, globs=globals()) elif p.is_set: parameters[p.key] = set(p.value) else: parameters[p.key] = p.value return parameters
[docs] @staticmethod def parameters_from_dicts(parameters): """ Get the tool parameters model from dictionaries :param parameters: The parameters as dictionaries :return: The tool parameters model """ return map(lambda p: ToolParameterModel(**p), parameters)
[docs] def get_model(self): """ Gets the mongoengine model for this tool, which serializes parameters that are functions :return: The mongoengine model. TODO: Note that the tool version is currently incorrect (0.0.0) """ return ToolModel( name=self.name, version="0.0.0", parameters=self.parameters_from_dicts(self.parameters) )
[docs] @staticmethod def write_to_history(**kwargs): """ Write to the history of executions of this tool :param kwargs: keyword arguments describing the executions :return: None """ from hyperstream import HyperStream hs = HyperStream(loglevel=logging.CRITICAL, file_logger=False, console_logger=False, mqtt_logger=None) if hs.current_session: hs.current_session.write_to_history(**kwargs)