hyperstream.workflow package

Submodules

hyperstream.workflow.factor module

hyperstream.workflow.meta_data_manager module

hyperstream.workflow.node module

hyperstream.workflow.plate module

hyperstream.workflow.plate_manager module

hyperstream.workflow.workflow module

Workflow and WorkflowMonitor definitions.

class hyperstream.workflow.workflow.Workflow(workflow_id, name, description, owner, online=False, monitor=False)

Bases: hyperstream.utils.containers.Printable

A workflow defines the graph of operations in terms of “nodes” and “factors”.

static check_multi_output_plate_compatibility(source_plates, sink_plate)

Check multi-output plate compatibility. This ensures that the source plates and the sink plate match for a multi-output plate.

Parameters:
  • source_plates – The source plates
  • sink_plate – The sink plate
Returns:

True if the plates are compatible

static check_plate_compatibility(tool, source_plate, sink_plate)

Check whether the source and sink plates are compatible, given the tool.

Parameters:
  • tool (Tool) – The tool
  • source_plate (Plate) – The source plate
  • sink_plate (Plate) – The sink plate
Returns:

An error message if the plates are incompatible, otherwise None

Return type:

None | str
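
The `None | str` return type means callers test the result for `None` instead of catching an exception. The sketch below illustrates that convention with a hypothetical `Plate` stand-in and an assumed rule (the sink plate must be the source plate or one of its ancestors). hyperstream's actual rules, and how it uses the `tool` argument, may differ, so the tool is omitted here.

```python
from dataclasses import dataclass
from typing import Iterator, Optional


@dataclass(frozen=True)
class Plate:
    """Hypothetical stand-in for hyperstream's Plate: an id plus an optional parent."""
    plate_id: str
    parent: Optional["Plate"] = None

    def ancestors(self) -> Iterator["Plate"]:
        # Walk up the parent chain, nearest ancestor first.
        plate = self.parent
        while plate is not None:
            yield plate
            plate = plate.parent


def check_plate_compatibility(source_plate: Optional[Plate],
                              sink_plate: Optional[Plate]) -> Optional[str]:
    """Return None if compatible, otherwise an error message (assumed rule)."""
    if source_plate is None or sink_plate is None:
        return None  # assumption: a missing plate imposes no constraint
    if sink_plate == source_plate or sink_plate in source_plate.ancestors():
        return None  # same plate, or aggregating up to a parent plate
    return (f"Sink plate {sink_plate.plate_id} is not compatible "
            f"with source plate {source_plate.plate_id}")


root = Plate("H")
house = Plate("H.house", parent=root)

# Caller side of the convention: None means "go ahead".
error = check_plate_compatibility(house, root)
if error is not None:
    raise ValueError(error)
```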

create_factor(tool, sources, sink, alignment_node=None)

Creates a factor. Instantiates a single tool for all of the plates, and connects the source and sink nodes with that tool.

Note that the tool parameters are currently fixed over a plate. For parameters that vary over a plate, an extra input stream should be used.

Parameters:
  • alignment_node (Node | None) – The optional alignment node
  • tool (Tool | dict) – The tool to use. This is either an instantiated Tool object or a dict with “name” and “parameters” keys
  • sources (list[Node] | tuple[Node] | None) – The source nodes
  • sink (Node) – The sink node
Returns:

The factor object

Return type:

Factor

create_factor_general(*args, **kwargs)

General signature for factor creation that tries each of the factor creation types using duck typing.

Parameters:
  • args – The positional arguments
  • kwargs – The named arguments
Returns:

The created factor
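
One way to implement this kind of duck-typed dispatch is to check which creation method's signature accepts the given arguments before calling it. The sketch below does that check with `inspect.signature(...).bind`; the stub creators only mirror the signatures documented on this page, and whether hyperstream selects the method this way is an assumption.

```python
import inspect
from typing import Any, Callable, Sequence


def create_factor(tool, sources, sink, alignment_node=None):
    return ("factor", tool)  # stub mirroring the create_factor signature


def create_multi_output_factor(tool, source, splitting_node, sink):
    return ("multi_output_factor", tool)  # stub


def create_node_creation_factor(tool, source, output_plate, plate_manager):
    return ("node_creation_factor", tool)  # stub


CREATORS = (create_factor, create_multi_output_factor, create_node_creation_factor)


def create_factor_general(creators: Sequence[Callable[..., Any]], *args, **kwargs):
    """Call the first creator whose signature accepts the given arguments."""
    errors = []
    for creator in creators:
        try:
            # bind() raises TypeError if the arguments don't fit the signature.
            inspect.signature(creator).bind(*args, **kwargs)
        except TypeError as exc:
            errors.append(f"{creator.__name__}: {exc}")
            continue
        return creator(*args, **kwargs)
    raise TypeError("no factor creation method matched: " + "; ".join(errors))
```

Keyword arguments disambiguate best: passing `sources=` reaches `create_factor`, while passing `source=` and `splitting_node=` reaches `create_multi_output_factor`. A purely positional call with four arguments would match the first four-parameter signature, so the order of `creators` matters.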

create_multi_output_factor(tool, source, splitting_node, sink)

Creates a multi-output factor. This takes a single node and applies a MultiOutputTool to create multiple nodes on a new plate. It instantiates a single tool for all of the input plate values, and connects the source and sink nodes with that tool.

Note that the tool parameters are currently fixed over a plate. For parameters that vary over a plate, an extra input stream should be used.

Parameters:
  • tool (MultiOutputTool | dict) – The tool to use. This is either an instantiated MultiOutputTool object or a dict with “name” and “parameters” keys
  • source (Node | None) – The source node
  • splitting_node – The node over which to split
  • sink (Node) – The sink node
Returns:

The factor object

Return type:

Factor
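
To illustrate the splitting idea, the sketch below turns one stream of dict-valued instances into one output stream per key, with the keys playing the role of the values of the new plate. The `(timestamp, value)` instance shape and the `split_stream` function are hypothetical; a real MultiOutputTool also deals with time intervals and plate meta-data.

```python
from collections import defaultdict
from typing import Dict, Hashable, Iterable, List, Tuple

Instance = Tuple[float, Dict[Hashable, object]]


def split_stream(instances: Iterable[Instance]) -> Dict[Hashable, List[Tuple[float, object]]]:
    """Split one stream of dict-valued instances into one stream per key."""
    outputs: Dict[Hashable, List[Tuple[float, object]]] = defaultdict(list)
    for timestamp, value in instances:
        for key, item in value.items():
            # Each key becomes (or extends) its own output stream.
            outputs[key].append((timestamp, item))
    return dict(outputs)


streams = split_stream([(1.0, {"kitchen": 21.5, "hall": 19.0}),
                        (2.0, {"kitchen": 22.0})])
```

Here `streams` has one entry per key (`"kitchen"` and `"hall"`), corresponding to the two values of the new plate.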

create_node(stream_name, channel, plates)

Create a node in the graph. Note: this assumes that the streams already exist.

Parameters:
  • stream_name – The name of the stream
  • channel – The channel where this stream lives
  • plates – The plates. The stream meta-data will be auto-generated from these
Returns:

The streams associated with this node

create_node_creation_factor(tool, source, output_plate, plate_manager)

Creates a factor that itself creates an output node, and ensures that the plate for the output node exists, along with all relevant meta-data.

Parameters:
  • tool – The tool
  • source – The source node
  • output_plate (dict) – The details of the plate that will be created
  • plate_manager (PlateManager) – The hyperstream plate manager
Returns:

The created factor

execute(time_interval)

Execute the factors over the streams in the workflow. The factors are executed in reverse order: we can’t just execute the last factor, because there may be multiple “leaf” factors that aren’t triggered by upstream computations.

Parameters:
  • time_interval – The time interval to execute this workflow over
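
The reason for reverse-order execution can be shown with a toy pull-based model: executing a factor first computes whatever it reads from upstream, so visiting every factor in reverse creation order reaches every leaf, while sinks that are already computed are skipped. `ToyFactor` and `ToyWorkflow` are hypothetical and ignore time intervals and plates.

```python
from typing import Callable, Dict, List


class ToyFactor:
    """Hypothetical factor computing one sink value from named source values."""

    def __init__(self, sources: List[str], sink: str, fn: Callable[..., int]):
        self.sources = sources
        self.sink = sink
        self.fn = fn


class ToyWorkflow:
    def __init__(self, inputs: Dict[str, int]):
        self.values: Dict[str, int] = dict(inputs)   # raw input streams
        self.factors: List[ToyFactor] = []           # in creation order
        self.producer: Dict[str, ToyFactor] = {}     # sink name -> factor

    def add_factor(self, factor: ToyFactor) -> None:
        self.factors.append(factor)
        self.producer[factor.sink] = factor

    def _pull(self, factor: ToyFactor) -> None:
        if factor.sink in self.values:
            return  # already computed, e.g. upstream of another leaf
        args = []
        for name in factor.sources:
            upstream = self.producer.get(name)
            if upstream is not None:
                self._pull(upstream)  # trigger the upstream computation first
            args.append(self.values[name])
        self.values[factor.sink] = factor.fn(*args)

    def execute(self) -> None:
        # Visit every factor, last-created first, so no leaf is missed.
        for factor in reversed(self.factors):
            self._pull(factor)


wf = ToyWorkflow({"raw": 3})
wf.add_factor(ToyFactor(["raw"], "doubled", lambda x: x * 2))
wf.add_factor(ToyFactor(["doubled"], "leaf_a", lambda x: x + 1))
wf.add_factor(ToyFactor(["doubled"], "leaf_b", lambda x: x * 10))
wf.execute()
```

Pulling only the last factor (`leaf_b`) would compute `doubled` but never `leaf_a`, which is the failure the reverse-order loop avoids.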

static factorgraph_viz(d)

Map the dictionary into factorgraph-viz format. See https://github.com/mbforbes/factorgraph-viz

Parameters:
  • d – The dictionary
Returns:

The formatted dictionary
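
A minimal sketch of such a mapping, under two assumptions: the input has `nodes` and `factors` lists (each factor with `id`, `sources` and `sink`), and factorgraph-viz expects a d3-style layout with a `nodes` list (variable nodes typed `rv`, factor nodes typed `fac`) and a `links` list of `source`/`target` pairs. Check both shapes against `to_dict` and the factorgraph-viz README before relying on them.

```python
from typing import Any, Dict, List


def factorgraph_viz(d: Dict[str, Any]) -> Dict[str, List[Dict[str, Any]]]:
    """Convert an assumed {nodes, factors} dict into nodes/links lists."""
    out: Dict[str, List[Dict[str, Any]]] = {"nodes": [], "links": []}
    for node in d.get("nodes", []):
        out["nodes"].append({"id": node["id"], "type": "rv"})  # variable node
    for factor in d.get("factors", []):
        out["nodes"].append({"id": factor["id"], "type": "fac"})  # factor node
        for source in factor.get("sources", []):
            out["links"].append({"source": source, "target": factor["id"]})
        if factor.get("sink"):
            out["links"].append({"source": factor["id"], "target": factor["sink"]})
    return out


d = {"nodes": [{"id": "raw"}, {"id": "avg"}],
     "factors": [{"id": "f1", "sources": ["raw"], "sink": "avg"}]}
viz = factorgraph_viz(d)
```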

requested_intervals

Get the requested intervals (from the database).

Returns:

The requested intervals

to_dict(tool_long_names=True)

Get a representation of the workflow as a dictionary for display purposes.

Parameters:
  • tool_long_names (bool) – Indicates whether to use long names, such as SplitterFromStream(element=None, use_mapping_keys_only=True), or short names, such as splitter_from_stream
Returns:

The dictionary of nodes, factors and plates

to_json(formatter=None, tool_long_names=True, **kwargs)

Get a JSON representation of the workflow.

Parameters:
  • formatter – The formatting function
  • tool_long_names – Indicates whether to use long names, such as SplitterFromStream(element=None, use_mapping_keys_only=True), or short names, such as splitter_from_stream
  • kwargs – Keyword arguments for the json output
Returns:

A JSON string
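
The interplay between `formatter` and `kwargs` can be sketched as below, assuming the formatter is applied to the dictionary before serialization and that `kwargs` are forwarded to `json.dumps`. `workflow_dict_to_json` is a hypothetical standalone helper that works on a plain dict, not the method itself.

```python
import json
from typing import Any, Callable, Dict, Optional


def workflow_dict_to_json(d: Dict[str, Any],
                          formatter: Optional[Callable[[Dict[str, Any]], Any]] = None,
                          **kwargs: Any) -> str:
    """Optionally reshape a workflow dict, then serialize it with json.dumps."""
    payload = formatter(d) if formatter is not None else d
    return json.dumps(payload, **kwargs)  # e.g. indent=2, sort_keys=True


d = {"nodes": [{"id": "raw"}], "factors": [], "plates": {}}
summary = workflow_dict_to_json(d, formatter=lambda x: {"n_nodes": len(x["nodes"])})
pretty = workflow_dict_to_json(d, indent=2, sort_keys=True)
```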

class hyperstream.workflow.workflow.WorkflowMonitor(workflow)

Bases: object

Small helper class that provides logging output to monitor workflow progress.
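
A helper of this kind could be sketched as a context manager that logs when a workflow starts, finishes or fails. `ToyWorkflowMonitor`, its constructor arguments and its log messages are hypothetical; only the standard `logging` module is real.

```python
import logging
import time
from typing import Optional


class ToyWorkflowMonitor:
    """Hypothetical helper that logs the start and end of a workflow run."""

    def __init__(self, workflow_id: str, logger: Optional[logging.Logger] = None):
        self.workflow_id = workflow_id
        self.logger = logger or logging.getLogger(__name__)
        self._start = 0.0

    def __enter__(self) -> "ToyWorkflowMonitor":
        self._start = time.monotonic()
        self.logger.info("Workflow %s started", self.workflow_id)
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        elapsed = time.monotonic() - self._start
        if exc_type is None:
            self.logger.info("Workflow %s finished in %.2fs", self.workflow_id, elapsed)
        else:
            self.logger.error("Workflow %s failed after %.2fs: %s",
                              self.workflow_id, elapsed, exc)
        return False  # never suppress exceptions raised by the workflow


with ToyWorkflowMonitor("example_workflow"):
    pass  # run the workflow here
```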

hyperstream.workflow.workflow_manager module

class hyperstream.workflow.workflow_manager.WorkflowManager(channel_manager, plate_manager)

Bases: hyperstream.utils.containers.Printable

Workflow manager. Responsible for reading workflows from and writing them to the database, and for executing all of the workflows.

add_workflow(workflow, commit=False)

Add a new workflow and optionally commit it to the database.

Parameters:
  • workflow (Workflow) – The workflow
  • commit (bool) – Whether to commit the workflow to the database
Returns:

None

commit_all()

Commit all workflows to the database.

Returns:

None

commit_workflow(workflow_id)

Commit the workflow to the database.

Parameters:
  • workflow_id – The workflow id
Returns:

None

delete_workflow(workflow_id)

Delete a workflow from the database.

Parameters:
  • workflow_id – The workflow id
Returns:

None

execute_all()

Execute all workflows.

load_workflow(workflow_id)

Load a workflow from the database and store it in memory.

Parameters:
  • workflow_id – The workflow id
Returns:

The workflow

set_all_requested_intervals(requested_intervals)

Sets the requested intervals for all workflows.

Parameters:
  • requested_intervals (TimeIntervals) – The requested intervals
Returns:

None

set_requested_intervals(workflow_id, requested_intervals)

Sets the requested intervals for a given workflow.

Parameters:
  • workflow_id – The workflow id
  • requested_intervals (TimeIntervals) – The requested intervals
Returns:

None
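
The split between workflows held in memory and workflows committed to the database can be modelled with two dictionaries. `ToyWorkflowManager` is hypothetical: it stores plain dicts with a `workflow_id` key instead of `Workflow` objects, and a dict stands in for the real database.

```python
from typing import Any, Dict


class ToyWorkflowManager:
    """Hypothetical model of the add / commit / load / delete lifecycle."""

    def __init__(self) -> None:
        self.workflows: Dict[str, Dict[str, Any]] = {}  # in-memory workflows
        self.database: Dict[str, Dict[str, Any]] = {}   # stand-in for the database

    def add_workflow(self, workflow: Dict[str, Any], commit: bool = False) -> None:
        self.workflows[workflow["workflow_id"]] = workflow
        if commit:
            self.commit_workflow(workflow["workflow_id"])

    def commit_workflow(self, workflow_id: str) -> None:
        # Store a copy so later in-memory edits don't leak into the "database".
        self.database[workflow_id] = dict(self.workflows[workflow_id])

    def commit_all(self) -> None:
        for workflow_id in self.workflows:
            self.commit_workflow(workflow_id)

    def load_workflow(self, workflow_id: str) -> Dict[str, Any]:
        self.workflows[workflow_id] = dict(self.database[workflow_id])
        return self.workflows[workflow_id]

    def delete_workflow(self, workflow_id: str) -> None:
        self.database.pop(workflow_id, None)
        self.workflows.pop(workflow_id, None)  # assumption: drop the in-memory copy too


manager = ToyWorkflowManager()
manager.add_workflow({"workflow_id": "w1", "name": "demo"})  # not yet committed
manager.commit_all()
```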

hyperstream.workflow.workflow_manager.code_pickler(code)
hyperstream.workflow.workflow_manager.code_unpickler(data)
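
These two functions have no docstrings here. Their names and signatures suggest a `copyreg` reducer pair for pickling code objects. A common way to write such a pair, assumed here rather than taken from the hyperstream source, serializes the code object with `marshal`:

```python
import copyreg
import marshal
import pickle
import types


def code_unpickler(data: bytes) -> types.CodeType:
    # Rebuild the code object from its marshal-serialized bytes.
    return marshal.loads(data)


def code_pickler(code: types.CodeType):
    # Tell pickle to reconstruct `code` by calling code_unpickler(<marshal bytes>).
    return code_unpickler, (marshal.dumps(code),)


# Register the reducer so pickle can handle code objects.
copyreg.pickle(types.CodeType, code_pickler)


def add(a, b):
    return a + b


restored = pickle.loads(pickle.dumps(add.__code__))
```

The `marshal` format is specific to the Python version, and unpickling recreates whatever code was stored, so such pickles should only be exchanged between trusted processes running the same interpreter version.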

Module contents