# The MIT License (MIT)
# Copyright (c) 2014-2017 University of Bristol
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
# OR OTHER DEALINGS IN THE SOFTWARE.
import logging
try:
import copy_reg
except ImportError:
    # Python 3.x renamed copy_reg to copyreg
import copyreg as copy_reg
import marshal
import types
from copy import deepcopy
from mongoengine.context_managers import switch_db
from . import Workflow
from ..time_interval import TimeIntervals
from ..models import WorkflowDefinitionModel, FactorDefinitionModel, NodeDefinitionModel, WorkflowStatusModel
from ..utils import Printable, FrozenKeyDict, StreamNotFoundError, utcnow, ToolInitialisationError, ToolNotFoundError, \
IncompatibleToolError
from ..factor import Factor, NodeCreationFactor, MultiOutputFactor
from ..tool import Tool


def code_unpickler(data):
    """Reconstruct a code object from its marshalled bytes"""
    return marshal.loads(data)


def code_pickler(code):
    """Reduce a code object to (unpickler, args) so that pickle can handle it"""
    return code_unpickler, (marshal.dumps(code),)


# Register the code_pickler and code_unpickler handlers for code objects,
# which pickle cannot serialise natively. See http://effbot.org/librarybook/copy-reg.htm
copy_reg.pickle(types.CodeType, code_pickler, code_unpickler)
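# A minimal sketch of the round trip this registration enables (illustrative
# only; the function name `f` below is hypothetical). Note that marshal output
# is specific to the Python version that produced it:
#
#     import pickle
#
#     def f(x):
#         return x + 1
#
#     data = pickle.dumps(f.__code__)          # uses code_pickler via copy_reg
#     code = pickle.loads(data)                # back to a code object
#     g = types.FunctionType(code, globals())  # rebuild a callable; g(1) == 2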


class WorkflowManager(Printable):
    """
    Workflow manager. Responsible for reading and writing workflows to the database, and for executing the
    workflows it manages
    """
def __init__(self, channel_manager, plate_manager):
"""
        Initialise the workflow manager, loading all stored workflow definitions from the database
:param channel_manager: The channel manager
:param plate_manager: The plate manager
"""
self.channel_manager = channel_manager
self.plate_manager = plate_manager
self.workflows = FrozenKeyDict()
self.uncommitted_workflows = set()
with switch_db(WorkflowDefinitionModel, db_alias='hyperstream'):
for workflow_definition in WorkflowDefinitionModel.objects():
try:
self.load_workflow(workflow_definition.workflow_id)
except (StreamNotFoundError, ToolInitialisationError, ToolNotFoundError, IncompatibleToolError) as e:
                    logging.warning(str(e))

    def load_workflow(self, workflow_id):
        """
        Load a workflow from the database and store it in memory
        :param workflow_id: The workflow id
        :return: The workflow, or None if no workflow with that id exists
        """
with switch_db(WorkflowDefinitionModel, db_alias='hyperstream'):
            # objects.get() raises DoesNotExist rather than returning None, so
            # query with first() and handle the missing case explicitly
            workflow_definition = WorkflowDefinitionModel.objects(workflow_id=workflow_id).first()
            if not workflow_definition:
                logging.warning("Attempted to load workflow with id {}, but it was not found".format(workflow_id))
                return None
workflow = Workflow(
workflow_id=workflow_id,
name=workflow_definition.name,
description=workflow_definition.description,
owner=workflow_definition.owner,
online=workflow_definition.online,
monitor=workflow_definition.monitor
)
for n in workflow_definition.nodes:
workflow.create_node(
stream_name=n.stream_name,
channel=self.channel_manager.get_channel(n.channel_id),
plates=[self.plate_manager.plates[p] for p in n.plate_ids])
for f in workflow_definition.factors:
source_nodes = [workflow.nodes[node_id] for node_id in f.sources] if f.sources else []
sink_nodes = [workflow.nodes[node_id] for node_id in f.sinks] if f.sinks else []
alignment_node = workflow.nodes[f.alignment_node] if f.alignment_node else None
splitting_node = workflow.nodes[f.splitting_node] if f.splitting_node else None
output_plate = f.output_plate
parameters = Tool.parameters_from_model(f.tool.parameters)
# tool = dict(name=f.tool.name, parameters=parameters)
tool = self.channel_manager.get_tool(f.tool.name, parameters, version=None)
if f.factor_type == "Factor":
if len(sink_nodes) != 1:
raise ValueError(
"Standard factors should have a single sink node, received {}"
.format(len(sink_nodes)))
if splitting_node is not None:
raise ValueError(
"Standard factors do not support splitting nodes")
if output_plate is not None:
raise ValueError(
"Standard factors do not support output plates")
workflow.create_factor(
tool=tool,
sources=source_nodes,
sink=sink_nodes[0],
alignment_node=alignment_node
)
elif f.factor_type == "MultiOutputFactor":
if len(source_nodes) > 1:
raise ValueError(
"MultiOutputFactor factors should have at most one source node, received {}"
.format(len(source_nodes)))
if len(sink_nodes) != 1:
raise ValueError(
"MultiOutputFactor factors should have a single sink node, received {}"
.format(len(sink_nodes)))
if alignment_node is not None:
raise ValueError(
"MultiOutputFactor does not support alignment nodes")
if output_plate is not None:
raise ValueError(
"MultiOutputFactor does not support output plates")
workflow.create_multi_output_factor(
tool=tool,
source=source_nodes[0] if source_nodes else None,
splitting_node=splitting_node,
sink=sink_nodes[0]
)
elif f.factor_type == "NodeCreationFactor":
                    if len(source_nodes) > 1:
                        raise ValueError(
                            "NodeCreationFactor factors should have no more than one source node, received {}"
                            .format(len(source_nodes)))
                    if len(sink_nodes) != 0:
                        raise ValueError(
                            "NodeCreationFactor factors should not have sink nodes, received {}"
                            .format(len(sink_nodes)))
if output_plate is None:
raise ValueError(
"NodeCreationFactor requires an output plate definition")
workflow.create_node_creation_factor(
tool=tool,
source=source_nodes[0] if source_nodes else None,
output_plate=output_plate.to_mongo().to_dict(),
plate_manager=self.plate_manager
)
else:
raise NotImplementedError("Unsupported factor type {}".format(f.factor_type))
self.add_workflow(workflow, False)
return workflow
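
    # Example usage (a sketch; `manager` is assumed to be a WorkflowManager
    # constructed as in the add_workflow example below, and "my_workflow" is a
    # hypothetical id that has previously been committed):
    #
    #     workflow = manager.load_workflow("my_workflow")
    #     if workflow is not None:
    #         print(workflow.name)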

    def add_workflow(self, workflow, commit=False):
"""
Add a new workflow and optionally commit it to the database
:param workflow: The workflow
:param commit: Whether to commit the workflow to the database
:type workflow: Workflow
:type commit: bool
:return: None
"""
if workflow.workflow_id in self.workflows:
raise KeyError("Workflow with id {} already exists".format(workflow.workflow_id))
self.workflows[workflow.workflow_id] = workflow
logging.info("Added workflow {} to workflow manager".format(workflow.workflow_id))
# Optionally also save the workflow to database
if commit:
self.commit_workflow(workflow.workflow_id)
else:
self.uncommitted_workflows.add(workflow.workflow_id)
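
    # Example usage (a sketch; `channel_manager` and `plate_manager` are assumed
    # to be pre-built manager instances, and the Workflow arguments shown are
    # illustrative):
    #
    #     manager = WorkflowManager(channel_manager, plate_manager)
    #     w = Workflow(workflow_id="w1", name="Example", description="",
    #                  owner="me", online=False, monitor=False)
    #     manager.add_workflow(w, commit=True)  # persists immediately
    #     # with commit=False, the id waits in manager.uncommitted_workflows
    #     # until commit_workflow() or commit_all() is called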

    def delete_workflow(self, workflow_id):
        """
        Delete a workflow from the database, its status record, and the in-memory copy
        :param workflow_id: The workflow id
        :return: None
        """
deleted = False
with switch_db(WorkflowDefinitionModel, "hyperstream"):
workflows = WorkflowDefinitionModel.objects(workflow_id=workflow_id)
if len(workflows) == 1:
workflows[0].delete()
deleted = True
else:
logging.debug("Workflow with id {} does not exist".format(workflow_id))
with switch_db(WorkflowStatusModel, "hyperstream"):
workflows = WorkflowStatusModel.objects(workflow_id=workflow_id)
if len(workflows) == 1:
workflows[0].delete()
deleted = True
else:
logging.debug("Workflow status with id {} does not exist".format(workflow_id))
if workflow_id in self.workflows:
del self.workflows[workflow_id]
deleted = True
if deleted:
logging.info("Deleted workflow with id {}".format(workflow_id))

    def commit_workflow(self, workflow_id):
"""
Commit the workflow to the database
:param workflow_id: The workflow id
:return: None
"""
# TODO: We should also be committing the Stream definitions if there are new ones
workflow = self.workflows[workflow_id]
with switch_db(WorkflowDefinitionModel, "hyperstream"):
workflows = WorkflowDefinitionModel.objects(workflow_id=workflow_id)
if len(workflows) > 0:
                logging.warning("Workflow with id {} already exists in the database".format(workflow_id))
return
factors = []
for f in workflow.factors:
tool = f.tool.get_model()
if isinstance(f, Factor):
sources = [s.node_id for s in f.sources] if f.sources else []
sinks = [f.sink.node_id]
alignment_node = f.alignment_node.node_id if f.alignment_node else None
splitting_node = None
output_plate = None
elif isinstance(f, MultiOutputFactor):
sources = [f.source.node_id] if f.source else []
sinks = [f.sink.node_id]
alignment_node = None
splitting_node = f.splitting_node.node_id if f.splitting_node else None
output_plate = None
elif isinstance(f, NodeCreationFactor):
sources = [f.source.node_id] if f.source else []
sinks = []
alignment_node = None
splitting_node = None
output_plate = f.output_plate
else:
raise NotImplementedError("Unsupported factor type")
if output_plate:
output_plate_copy = deepcopy(output_plate)
if 'parent_plate' in output_plate_copy:
del output_plate_copy['parent_plate']
else:
output_plate_copy = None
factor = FactorDefinitionModel(
tool=tool,
factor_type=f.__class__.__name__,
sources=sources,
sinks=sinks,
alignment_node=alignment_node,
splitting_node=splitting_node,
output_plate=output_plate_copy
)
factors.append(factor)
nodes = []
for n in workflow.nodes.values():
nodes.append(NodeDefinitionModel(
stream_name=n.node_id,
plate_ids=n.plate_ids,
channel_id=n._channel.channel_id
))
workflow_definition = WorkflowDefinitionModel(
workflow_id=workflow.workflow_id,
name=workflow.name,
description=workflow.description,
nodes=nodes,
factors=factors,
owner=workflow.owner,
online=workflow.online,
monitor=workflow.monitor
)
workflow_definition.save()
with switch_db(WorkflowStatusModel, db_alias='hyperstream'):
workflow_status = WorkflowStatusModel(
workflow_id=workflow.workflow_id,
last_updated=utcnow(),
last_accessed=utcnow(),
requested_intervals=[]
)
workflow_status.save()
self.uncommitted_workflows.remove(workflow_id)
logging.info("Committed workflow {} to database".format(workflow_id))
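
    # Example usage (a sketch, continuing from the add_workflow example; the
    # id "w1" is hypothetical):
    #
    #     manager.add_workflow(w, commit=False)
    #     assert "w1" in manager.uncommitted_workflows
    #     manager.commit_workflow("w1")  # writes the definition plus a status record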

    def commit_all(self):
        """
        Commit all uncommitted workflows to the database
        :return: None
        """
        # Iterate over a copy, since commit_workflow removes ids from this set
        for workflow_id in list(self.uncommitted_workflows):
            self.commit_workflow(workflow_id)

    def execute_all(self):
        """
        Execute all online workflows over their requested intervals
        :return: None
        """
for workflow_id in self.workflows:
if self.workflows[workflow_id].online:
for interval in self.workflows[workflow_id].requested_intervals:
logging.info("Executing workflow {} over interval {}".format(workflow_id, interval))
self.workflows[workflow_id].execute(interval)
# self.workflows[workflow_id].requested_intervals -= interval

    def set_requested_intervals(self, workflow_id, requested_intervals):
        """
        Sets the requested intervals for a given workflow
        :param workflow_id: The workflow id
        :param requested_intervals: The requested intervals
        :type requested_intervals: TimeIntervals
        :return: None
        """
if workflow_id not in self.workflows:
raise ValueError("Workflow {} not found".format(workflow_id))
self.workflows[workflow_id].requested_intervals = requested_intervals

    def set_all_requested_intervals(self, requested_intervals):
        """
        Sets the requested intervals for all online workflows
        :param requested_intervals: The requested intervals
        :type requested_intervals: TimeIntervals
        :return: None
        """
for workflow_id in self.workflows:
if self.workflows[workflow_id].online:
self.workflows[workflow_id].requested_intervals = requested_intervals
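

# A typical driving loop might look like the following sketch (assumes `manager`
# was constructed as in the examples above and `ti` is a TimeIntervals instance
# covering the period of interest):
#
#     manager.set_all_requested_intervals(ti)
#     manager.execute_all()  # each online workflow runs over its requested intervals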