hyperstream.channels package

Submodules

hyperstream.channels.assets_channel module

Assets channel module.

class hyperstream.channels.assets_channel.AssetsChannel(channel_id)[source]

Bases: hyperstream.channels.database_channel.DatabaseChannel

Assets channel. A special kind of database channel for static assets and user input data (workflow parameters, etc.).

create_stream(stream_id, sandbox=None)[source]

Create the stream

Parameters:
  • stream_id – The stream identifier
  • sandbox – The sandbox for this stream
Returns:

None

Raises:

NotImplementedError

purge_stream(stream_id, remove_definition=False, sandbox=None)[source]

Purge the stream

Parameters:
  • stream_id – The stream identifier
  • remove_definition – Whether to remove the stream definition as well
  • sandbox – The sandbox for this stream
Returns:

None

update_streams(up_to_timestamp)[source]

Update the streams

Parameters:
  • up_to_timestamp – The timestamp up to which the streams are updated
Returns:

write_to_stream(stream_id, data, sandbox=None)[source]

Write to the stream

Parameters:
  • stream_id (StreamId) – The stream identifier
  • data – The stream instances
  • sandbox – The sandbox for this stream
Returns:

None

Raises:

NotImplementedError

hyperstream.channels.assets_channel2 module

hyperstream.channels.base_channel module

class hyperstream.channels.base_channel.BaseChannel(channel_id, can_calc=False, can_create=False, calc_agent=None)[source]

Bases: hyperstream.utils.containers.Printable

Abstract base class for channels

create_stream(stream_id, sandbox=None)[source]

Must be overridden by deriving classes. Must create the stream according to the tool and return its unique identifier, stream_id.

execute_tool(stream, interval)[source]

Executes the stream’s tool over the given time interval

Parameters:
  • stream – the stream reference
  • interval – the time interval
Returns:

None

find_stream(**kwargs)[source]

Finds a single stream with the given metadata values. Useful for debugging purposes.

Parameters:
  • kwargs – The metadata as keyword arguments
Returns:

The stream found

find_streams(**kwargs)[source]

Finds streams with the given metadata values. Useful for debugging purposes.

Parameters:
  • kwargs – The metadata as keyword arguments
Returns:

The streams found

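The keyword-based lookup can be pictured as a filter over stream metadata. The following is a minimal sketch, not HyperStream's implementation: the layout of the `streams` dictionary and the names `match_streams` and `match_single_stream` are hypothetical, and raising an error when the single-stream lookup finds zero or several matches is an assumption.

```python
def match_streams(streams, **kwargs):
    """Return the ids of all streams whose metadata contains every given key/value pair."""
    return [
        stream_id
        for stream_id, meta in streams.items()
        if all(meta.get(key) == value for key, value in kwargs.items())
    ]


def match_single_stream(streams, **kwargs):
    """Return the one matching stream id; raise unless exactly one matches (assumed behaviour)."""
    found = match_streams(streams, **kwargs)
    if len(found) != 1:
        raise KeyError("expected exactly one stream, found {}".format(len(found)))
    return found[0]


# Hypothetical metadata, keyed by stream id
streams = {
    "s1": {"house": "1", "sensor": "temperature"},
    "s2": {"house": "1", "sensor": "humidity"},
    "s3": {"house": "2", "sensor": "temperature"},
}

house_one = match_streams(streams, house="1")
single = match_single_stream(streams, house="2", sensor="temperature")
```

Here `house_one` collects every stream recorded for house "1", while `single` narrows the search until only one stream remains.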
get_or_create_stream(stream_id, try_create=True)[source]

Helper function to get a stream or create one if it’s not already defined

Parameters:
  • stream_id – The stream id
  • try_create – Whether to try to create the stream if not found
Returns:

The stream object
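
The get-or-create pattern behind this helper can be sketched as follows. `ToyChannel` is a hypothetical stand-in, a plain dict replaces the real stream object, and raising `KeyError` when the stream is missing and `try_create` is false is an assumption rather than documented behaviour.

```python
class ToyChannel(object):
    """Hypothetical stand-in for a channel; only the get-or-create logic is sketched."""

    def __init__(self):
        self.streams = {}

    def create_stream(self, stream_id):
        # A real channel would build a stream object; a dict stands in for it here.
        stream = {"stream_id": stream_id}
        self.streams[stream_id] = stream
        return stream

    def get_or_create_stream(self, stream_id, try_create=True):
        if stream_id in self.streams:
            return self.streams[stream_id]
        if try_create:
            return self.create_stream(stream_id)
        # Assumed behaviour when the stream is missing and creation is disabled
        raise KeyError(stream_id)


channel = ToyChannel()
first = channel.get_or_create_stream("every_minute")
second = channel.get_or_create_stream("every_minute")
```

The second call returns the object created by the first, so callers do not need to check whether the stream already exists.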

get_results(stream, time_interval)[source]

Must be overridden by deriving classes. An implementation must:

  1. Calculate or receive the documents in the stream for the given time interval.
  2. Return success or failure together with the results. For some channels the values of kwargs can override the return process, e.g. by introducing callbacks.

get_stream_writer(stream)[source]

Must be overridden by deriving classes. Must return a function(document_collection) which writes all the given documents of the form (timestamp, data) from document_collection to the stream. Example:

.. code-block:: python

    if stream_id == 1:
        def f(document_collection):
            # Store each (timestamp, data) document under its timestamp
            for (timestamp, data) in document_collection:
                database[timestamp] = data

        return f
    else:
        raise Exception('No stream with id ' + str(stream_id))

purge_node(node_id, remove_definition=False, sandbox=None)[source]

Purges a node (collection of streams)

Parameters:
  • node_id – The node identifier
  • remove_definition – Whether to remove the stream definition as well
  • sandbox – The sandbox
Returns:

None

purge_stream(stream_id, remove_definition=False, sandbox=None)[source]

Must be overridden by deriving classes. Purges the stream and removes the calculated intervals.

update_streams(up_to_timestamp)[source]

Deriving classes must override this function

hyperstream.channels.database_channel module

Database channel module.

class hyperstream.channels.database_channel.DatabaseChannel(channel_id)[source]

Bases: hyperstream.channels.base_channel.BaseChannel

Database channel. Data are stored in and retrieved from MongoDB using mongoengine.

create_stream(stream_id, sandbox=None)[source]

Create the stream

Parameters:
  • stream_id – The stream identifier
  • sandbox – The sandbox for this stream
Returns:

None

Raises:

NotImplementedError

get_results(stream, time_interval)[source]

Get the results for a given stream

Parameters:
  • time_interval – The time interval
  • stream – The stream object
Returns:

A generator over stream instances

get_stream_writer(stream)[source]

Gets the database channel writer. The mongoengine model checks whether a stream_id/datetime pair already exists in the DB (pairs must be unique). Should be overridden by users’ personal channels, which allows for non-mongo outputs.

Parameters:
  • stream – The stream
Returns:

The stream writer function

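The writer contract described above (a function that takes a collection of (timestamp, data) documents and keeps stream_id/datetime pairs unique) can be illustrated without a database. This is only a sketch under stated assumptions: `make_stream_writer` is a hypothetical name, a plain dict stands in for MongoDB, and silently skipping an existing pair is an assumed policy, not necessarily what the mongoengine model does.

```python
from datetime import datetime


def make_stream_writer(store, stream_id):
    """Return a writer function for one stream; `store` is a dict standing in for the database."""

    def writer(document_collection):
        written = 0
        for timestamp, data in document_collection:
            key = (stream_id, timestamp)
            if key in store:
                # Assumed policy: an existing (stream_id, timestamp) pair is left untouched
                continue
            store[key] = data
            written += 1
        return written

    return writer


store = {}
write = make_stream_writer(store, "temperature")
t0 = datetime(2017, 1, 1, 12, 0, 0)
first_count = write([(t0, 20.5)])
second_count = write([(t0, 99.0), (datetime(2017, 1, 1, 12, 1, 0), 20.7)])
```

The second call writes only the new timestamp; the value already stored for `t0` is kept. A personal channel could return a writer with the same shape that targets a file or another store instead.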
purge_stream(stream_id, remove_definition=False, sandbox=None)[source]

Purge the stream

Parameters:
  • stream_id – The stream identifier
  • remove_definition – Whether to remove the stream definition as well
  • sandbox – The sandbox for this stream
Returns:

None

Raises:

NotImplementedError

update_streams(up_to_timestamp)[source]

Update the streams

Parameters:
  • up_to_timestamp – The timestamp up to which the streams are updated
Returns:

hyperstream.channels.file_channel module

class hyperstream.channels.file_channel.FileChannel(channel_id, path, up_to_timestamp=datetime.datetime(1, 1, 1, 0, 0, tzinfo=<UTC>))[source]

Bases: hyperstream.channels.memory_channel.ReadOnlyMemoryChannel

An abstract stream channel where the streams are recursive sub-folders under a given path, and the documents correspond to those files whose names begin with a timestamp in the format yyyy_mm_dd_hh_mm_ss_mmm_*. All derived classes must override the function data_loader(short_path, file_info), which determines how the data are loaded into the documents of the stream. Files in the described format must never be deleted. update(up_to_timestamp) must not be called unless it is guaranteed that no files with earlier timestamps will be added later.
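
To make the file-name convention concrete, here is a minimal sketch of parsing the timestamp prefix. It is not the library's parser: the function name `parse_timestamp_prefix` and the example file name are hypothetical, and reading the `mmm` field as milliseconds and the timestamp as UTC are assumptions.

```python
from datetime import datetime, timezone


def parse_timestamp_prefix(file_name):
    """Split 'yyyy_mm_dd_hh_mm_ss_mmm_rest' into (timestamp, rest).

    Returns None if the name does not start with seven numeric fields
    that form a valid date and time.
    """
    parts = file_name.split("_", 7)
    if len(parts) < 8 or not all(p.isdigit() for p in parts[:7]):
        return None
    year, month, day, hour, minute, second, millis = (int(p) for p in parts[:7])
    try:
        # Assumption: 'mmm' is milliseconds and the timestamp is in UTC
        timestamp = datetime(year, month, day, hour, minute, second,
                             millis * 1000, tzinfo=timezone.utc)
    except ValueError:
        return None
    return timestamp, parts[7]


parsed = parse_timestamp_prefix("2016_10_26_00_00_00_000_v0_1_0.py")
ignored = parse_timestamp_prefix("README.txt")
```

Files without a valid prefix yield `None`, which mirrors the statement that only timestamp-prefixed files count as documents.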

data_loader(short_path, file_info)[source]
file_filter(sorted_file_names)[source]
get_results(stream, time_interval)[source]
path = ''
update_streams(up_to_timestamp)[source]
static walk(directory, level=1)[source]
class hyperstream.channels.file_channel.FileDateTimeVersion(filename, split_char='_')[source]

Bases: hyperstream.utils.containers.Printable

Simple class to hold file details along with the timestamp and version number from the filename. Uses semantic versioning.

is_python

hyperstream.channels.memory_channel module

class hyperstream.channels.memory_channel.MemoryChannel(channel_id)[source]

Bases: hyperstream.channels.base_channel.BaseChannel

Channel whose data lives in memory

check_calculation_times()[source]
create_stream(stream_id, sandbox=None)[source]

Must be overridden by deriving classes. Must create the stream according to the tool and return its unique identifier, stream_id.

get_results(stream, time_interval)[source]

Calculates/receives the documents in the stream interval determined by the stream

Parameters:
  • stream – The stream reference
  • time_interval – The time interval
Returns:

The sorted data items

get_stream_writer(stream)[source]
non_empty_streams
purge_all(remove_definitions=False)[source]

Clears all streams in the channel - use with caution!

Returns:

None

purge_stream(stream_id, remove_definition=False, sandbox=None)[source]

Clears all the data in a given stream and the calculated intervals

Parameters:
  • stream_id – The stream id
  • remove_definition – Whether to remove the stream definition as well
  • sandbox – The sandbox id
Returns:

None

update_streams(up_to_timestamp)[source]
class hyperstream.channels.memory_channel.ReadOnlyMemoryChannel(channel_id, up_to_timestamp=datetime.datetime(1, 1, 1, 0, 0, tzinfo=<UTC>))[source]

Bases: hyperstream.channels.base_channel.BaseChannel

An abstract channel with a read-only set of memory-based streams. By default it is constructed empty, with the last update at MIN_DATE. New streams, and new documents within streams, are created with the update(up_to_timestamp) method, which ensures that the channel is up to date until up_to_timestamp. Neither documents nor streams are ever deleted. Any deriving class must override update_streams(up_to_timestamp), which must update self.streams so that it is calculated exactly until up_to_timestamp. The data structure self.streams is a dict of streams indexed by stream_id; each stream is a list of tuples (timestamp, data), in no specific order. Names and identifiers are the same in this channel.
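
Because each stream is an unordered list of (timestamp, data) tuples, reading a time interval involves both filtering and sorting. The sketch below shows one way to do that over the described layout; `results_in_interval` is a hypothetical helper, the sample data are made up, and the half-open interval (start excluded, end included) is an assumption.

```python
from datetime import datetime

# Layout described above: stream_id -> list of (timestamp, data), in no particular order
streams = {
    "door": [
        (datetime(2017, 1, 1, 9, 30), "open"),
        (datetime(2017, 1, 1, 9, 0), "closed"),
        (datetime(2017, 1, 1, 10, 0), "closed"),
    ],
}


def results_in_interval(streams, stream_id, start, end):
    """Return the documents with start < timestamp <= end, sorted by timestamp."""
    selected = [(t, d) for t, d in streams[stream_id] if start < t <= end]
    return sorted(selected, key=lambda item: item[0])


window = results_in_interval(
    streams, "door", datetime(2017, 1, 1, 9, 0), datetime(2017, 1, 1, 10, 0)
)
```

Sorting by the timestamp alone avoids comparing the data payloads, which may not be orderable.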

create_stream(stream_id, sandbox=None)[source]
get_results(stream, time_interval)[source]
get_stream_writer(stream)[source]
update_state(up_to_timestamp)[source]

Call this function to ensure that the channel is up to date at up_to_timestamp, i.e. all the streams that were created at or before that timestamp are calculated exactly until up_to_timestamp.

update_streams(up_to_timestamp)[source]

Deriving classes must override this function

hyperstream.channels.module_channel module

class hyperstream.channels.module_channel.ModuleChannel(channel_id, path, up_to_timestamp=datetime.datetime(1, 1, 1, 0, 0, tzinfo=<UTC>))[source]

Bases: hyperstream.channels.file_channel.FileChannel

A channel of module streams. The documents in the streams contain functions that can be called to import the respective module.

data_loader(short_path, tool_info)[source]
file_filter(sorted_file_names)[source]
update_state(up_to_timestamp)[source]
versions = None

hyperstream.channels.tool_channel module

class hyperstream.channels.tool_channel.ToolChannel(channel_id, path, up_to_timestamp=datetime.datetime(1, 1, 1, 0, 0, tzinfo=<UTC>))[source]

Bases: hyperstream.channels.module_channel.ModuleChannel

Special case of the file/module channel that loads the tools used to execute other streams.

get_results(stream, time_interval)[source]

Module contents