userv package

Submodules

userv.inference module

An Inference Service module.

This module contains the implementation of the InferenceService and InferenceTask classes, which are used to run the inference service.

Classes:

InferenceService: A concrete class implementing the MicroAppBase for the Inference Service; provides a Restful API to run inference.

InferenceTask: A concrete class of MicroServiceTask for the inference task.

class userv.inference.InferenceService

Bases: userv.uapp.MicroAppBase

A concrete class implementing the MicroAppBase for the Inference Service.

Inference Service provides a Restful API to run inference.

The InferenceService class is responsible for connecting to a runtime database, loading a model, and setting up the necessary connections to a queue and a broker for running inference on incoming data.

Variables
  • _db (RuntimeDatabaseBase) – The runtime database instance.

  • _model_provider (str) – The model provider instance.

  • _model (Model) – The model instance.

  • _infer_queue_connector (InferQueueClientBase) – The inference queue connector instance.

  • _infer_broker_connector (StreamBrokerClientBase) – The inference broker connector instance.

  • _infer_engine_manager (InferEngineManager) – The inference engine manager instance.

  • _infer_info (InferenceInfo) – The inference information.

  • _inference_engine (InferenceEngine) – The inference engine instance.

  • _is_stopping (bool) – A boolean variable that flags if the service is stopping or not.

  • _pipeline_manager (PipelineManager) – The pipeline manager instance.

  • _zip_tool (ZipBase) – The zip tool instance.

cleanup()

Cleanup.

Sets the stopping flag and performs necessary cleanup actions.

property db: core.rtdb.RuntimeDatabaseBase

The runtime database instance.

If _db is None, this function will create a RuntimeDatabaseBase object and connect to the runtime database.

Returns

An instance of the runtime database.

Return type

RuntimeDatabaseBase

Raises
  • NotImplementedError – If the runtime database type is not supported.

  • redis.exceptions.ConnectionError – If connection to the Redis server fails.
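The lazy-initialization pattern these properties follow can be sketched as below; the class and connection details are illustrative stand-ins, not the real core.rtdb API:

```python
class LazyService:
    """Sketch of the lazy property pattern: create the dependency on
    first access, then reuse the cached instance."""

    def __init__(self):
        self._db = None  # nothing is created or connected at construction

    @property
    def db(self):
        if self._db is None:
            # Stand-in for building a RuntimeDatabaseBase and connecting;
            # the real property may raise NotImplementedError for an
            # unsupported backend or ConnectionError on a failed connect.
            self._db = {"connected": True}
        return self._db

svc = LazyService()
```

Accessing `svc.db` repeatedly returns the same cached object, so the connection is established at most once per service instance.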

property infer_broker_connector: core.streambroker.StreamBrokerClientBase

The inference broker connector instance.

If _infer_broker_connector is None, this function will create a StreamBrokerClientBase object and connect to the stream broker server.

Returns

The inference broker connector instance.

Return type

StreamBrokerClientBase

Raises
  • NotImplementedError – If the stream broker type is not supported.

  • redis.exceptions.ConnectionError – If connection to the Redis server fails.

property infer_engine_manager: core.infereng.InferEngineManager

The inference engine manager instance.

If _infer_engine_manager is None, this function will create an InferEngineManager object.

Returns

The inference engine manager instance.

Return type

InferEngineManager

property infer_info: core.infereng.InferenceInfo

The inference information.

If _infer_info is None, this function will create an InferenceInfo object.

Returns

The inference information.

Return type

InferenceInfo

property infer_queue_connector: core.inferqueue.InferQueueClientBase

The inference queue connector instance.

If _infer_queue_connector is None, this function will create an InferQueueClientBase object and connect to the queue server.

Returns

The inference queue connector instance.

Return type

InferQueueClientBase

Raises
  • NotImplementedError – If the queue type is not supported.

  • redis.exceptions.ConnectionError – If connection to the Redis server fails.

property inference_engine: core.infereng.InferenceEngine

The inference engine instance.

If _inference_engine is None, this function will create an InferenceEngine object.

Returns

The inference engine instance.

Return type

InferenceEngine

Raises

NotImplementedError – If the framework is not supported.

init()

Initialization.

property model: core.model.Model

The model instance.

property model_provider: str

The model provider instance.

This property instantiates the model provider instance.

Returns

The model provider instance. “Not Implemented” in the current version.

Return type

str

property pipeline_manager: core.pipeline.PipelineManager

The pipeline manager instance.

If _pipeline_manager is None, this function will create a PipelineManager object.

Returns

An instance of PipelineManager class.

Return type

PipelineManager

run() bool

The main execution function for the service.

Returns

Whether execution was successful or not.

Return type

bool

property zip_tool: core.crypto.zip.ZipBase

The zip tool instance.

If _zip_tool is None, this function will create a ZipBase object.

Returns

The zip tool instance.

Return type

ZipBase

Raises
  • NotImplementedError – If the zip tool type is not supported.

  • qat.exceptions.RuntimeError – If QAT initialization fails.

class userv.inference.InferenceTask(inference_engine: core.infereng.InferenceEngine, infer_info: core.infereng.InferenceInfo, infer_queue_connector: core.inferqueue.InferQueueClientBase, infer_broker_connector: core.streambroker.StreamBrokerClientBase, pipeline_manager: core.pipeline.PipelineManager, fps_time_window: float, zip_tool: core.crypto.zip.ZipBase)

Bases: userv.uapp.MicroServiceTask

A concrete class of MicroServiceTask for inference task.

Variables
  • inference_engine (InferenceEngine) – The inference engine instance.

  • infer_info (InferenceInfo) – The inference information.

  • infer_queue_connector (InferQueueClientBase) – The inference queue connector instance.

  • infer_broker_connector (StreamBrokerClientBase) – The inference broker connector instance.

  • pipeline_manager (PipelineManager) – The pipeline manager instance.

  • infer_frame_count (dict) – A dictionary that saves the inference frame counts for every pipeline.

  • drop_frame_count (dict) – A dictionary that saves the drop frame counts for every pipeline.

  • infer_frame_count_sum (int) – The sum of inference frame counts.

  • drop_frame_count_sum (int) – The sum of drop frame counts.

  • infer_start_time (float) – The start time of inference/drop frame counting.

  • fps_time_window (float) – The time interval of an inference/drop FPS calculation window.

  • zip_tool (ZipBase) – The zip tool.

calculate_fps(frame: core.frame.Frame, drop_count: int) None

Calculate infer/drop FPS and export metrics.

Parameters
  • frame (Frame) – The frame being inferred when calculating FPS.

  • drop_count (int) – The count of dropped frames when calculating FPS.

execute()

The task logic of the inference task.

export_fps_metrics(pipeline_id: str, elapsed_time: float) None

Export the calculated FPS metrics and save FPS of pipeline to Pipeline-table.

Parameters
  • pipeline_id (str) – The ID of the pipeline whose FPS is saved to Pipeline-table.

  • elapsed_time (float) – The elapsed time to calculate FPS.

process_frame(frame: bytes) core.frame.Frame

Process the frame.

Parameters

frame (bytes) – The frame retrieved from the inference queue.

Returns

The deserialized frame.

Return type

Frame

Raises
  • TypeError – If the type of frame is not bytes.

  • RuntimeError – If any error occurs while decoding or decompressing.
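A minimal sketch of such a deserialization step, assuming (hypothetically) zlib compression over a pickled payload — the service's real wire format and error handling may differ:

```python
import pickle
import zlib

def process_frame(frame: bytes) -> dict:
    """Validate the input type, decompress, then deserialize the frame."""
    if not isinstance(frame, bytes):
        raise TypeError(f"expected bytes, got {type(frame).__name__}")
    try:
        return pickle.loads(zlib.decompress(frame))
    except (zlib.error, pickle.UnpicklingError) as exc:
        raise RuntimeError("failed to decode or decompress frame") from exc
```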

reset_frame_counts() None

Reset the frame counts for the next calculation window.

stop()

Stop the inference task.

update_frame_counts(pipeline_id: str, drop_count: int) None

Update the frame counts for inference and drop.

Parameters
  • pipeline_id (str) – The ID of the pipeline whose frame counts are updated.

  • drop_count (int) – The count of dropped frames when updating the frame counts.
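The frame-count bookkeeping above can be sketched as follows; the class below is a self-contained illustration of a windowed FPS calculation, not the actual InferenceTask implementation:

```python
import time

class FpsWindow:
    """Accumulate per-pipeline frame counts and report FPS once the
    configured time window has elapsed."""

    def __init__(self, fps_time_window):
        self.fps_time_window = fps_time_window
        self.infer_frame_count = {}
        self.drop_frame_count = {}
        self.infer_start_time = time.monotonic()

    def update_frame_counts(self, pipeline_id, drop_count):
        # One inferred frame, plus however many frames were dropped.
        self.infer_frame_count[pipeline_id] = (
            self.infer_frame_count.get(pipeline_id, 0) + 1)
        self.drop_frame_count[pipeline_id] = (
            self.drop_frame_count.get(pipeline_id, 0) + drop_count)

    def maybe_export(self):
        # Return per-pipeline inference FPS once the window has elapsed.
        elapsed = time.monotonic() - self.infer_start_time
        if elapsed < self.fps_time_window:
            return None
        fps = {pid: n / elapsed for pid, n in self.infer_frame_count.items()}
        self.reset_frame_counts()
        return fps

    def reset_frame_counts(self):
        # Reset counts and the start time for the next window.
        self.infer_frame_count.clear()
        self.drop_frame_count.clear()
        self.infer_start_time = time.monotonic()
```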

userv.inference.entry() None

The entry of Inference Service.

userv.pipeline_server module

A Pipeline Server module.

Pipeline Server provides a Restful API to get pipeline data.

Classes:

PipelineService: A class for the pipeline server service to get pipeline data.

Functions:

_get_env: A function to get an environment variable.

api_get_pipelines: Restful API for getting pipeline data.

health_check: Health check endpoint.

class userv.pipeline_server.PipelineService

Bases: object

A class for the pipeline server service to get pipeline data.

Variables

_db (RuntimeDatabaseBase) – The RuntimeDatabaseBase object for PipelineService to use.

INFER_ENGINE_TABLE = 'InferEngine-table'
PIPELINE_TABLE = 'Pipeline-table'
property db: core.rtdb.RuntimeDatabaseBase

The runtime database.

If _db is None, this function will create a RuntimeDatabaseBase object and connect to the runtime database.

Returns

An instance of the RuntimeDatabaseBase class.

Return type

RuntimeDatabaseBase

Raises
  • NotImplementedError – If the runtime database type is not supported.

  • redis.exceptions.ConnectionError – If connection to the Redis server fails.

classmethod inst()

Singleton instance.

Returns

An instance of the PipelineService class.

Return type

PipelineService
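A singleton accessor of this kind is commonly implemented as below; this is a generic sketch, not the actual PipelineService code:

```python
class ServiceSingleton:
    """Create the shared instance on the first inst() call, reuse it after."""
    _inst = None

    @classmethod
    def inst(cls):
        if cls._inst is None:
            cls._inst = cls()
        return cls._inst
```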

property pipelines: list

The pipeline list.

Returns

List of pipeline dictionaries.

Return type

list

Raises

ValueError – If the pipeline table does not exist.

property pipelines_have_changed: bool

Check if the pipelines have changed since the last check.

Returns

True if pipelines have changed, False otherwise.

Return type

bool

userv.pipeline_server.api_get_pipelines() flask.wrappers.Response

Restful API for getting pipeline data.

Returns

Flask response object with JSON data and status.

Return type

Response

userv.pipeline_server.health_check() flask.wrappers.Response

Health check endpoint.

Returns

Flask response object with message and status.

Return type

Response

userv.pipeline_server.sse()

Server Sent Events (SSE) endpoint.

Returns

Flask response object with JSON data and status.

Return type

Response
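The SSE wire format such an endpoint streams can be illustrated without Flask: each event is a `data:` line carrying JSON, terminated by a blank line. The generator below is a hypothetical sketch of the payload framing, not the server's actual implementation:

```python
import json

def sse_events(snapshots):
    """Yield one Server-Sent Event per pipeline-list snapshot."""
    for snapshot in snapshots:
        # SSE framing: a "data:" field followed by an empty line.
        yield f"data: {json.dumps(snapshot)}\n\n"

events = list(sse_events([[{"id": "p1", "status": "running"}]]))
```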

userv.streaming module

A Streaming Service module.

This module contains the implementation of the StreamingService and StreamingTask classes, which are used to run the streaming service.

Classes:

StreamingService: A concrete class implementing the MicroAppBase for the Streaming Service.

StreamingTask: A concrete class of MicroServiceTask for the streaming task.

class userv.streaming.StreamingService

Bases: userv.uapp.MicroAppBase

A concrete class implementing the MicroAppBase for Streaming Service.

The Streaming Service reads frames from the stream provider, processes them, and then publishes them to the inference queue.

Variables
  • _db (RuntimeDatabaseBase) – The runtime database instance.

  • _provider (StreamProvider) – The stream provider instance.

  • _infer_engine_manager (InferEngineManager) – The inference engine manager instance.

  • _pipeline_manager (PipelineManager) – The pipeline manager instance.

  • _infer_engine_info (Optional[InferenceInfo]) – The infer engine info instance.

  • _pipeline (Pipeline) – The pipeline instance.

  • _is_stopping (bool) – A boolean variable that flags if the service is stopping or not.

  • _infer_queue_connector (InferQueueClientBase) – The inference queue connector instance.

  • _zip_tool (ZipBase) – The zip tool instance.

cleanup()

Cleanup.

Sets the stopping flag and performs necessary cleanup actions.

property db: core.rtdb.RuntimeDatabaseBase

The runtime database instance.

If _db is None, this function will create a RuntimeDatabaseBase object and connect to the runtime database.

Returns

An instance of the RuntimeDatabaseBase class.

Return type

RuntimeDatabaseBase

Raises
  • NotImplementedError – If the runtime database type is not supported yet.

  • redis.exceptions.ConnectionError – If connection to the Redis server fails and reconnection exceeds the limit.

property infer_engine_info: Optional[core.infereng.InferenceInfo]

The infer engine info instance.

If _infer_engine_info is None, this function will search for a matching inference engine.

Returns

An instance of the InferenceInfo class, or None if no matching inference engine was found.

Return type

Optional[InferenceInfo]

property infer_engine_manager: core.infereng.InferEngineManager

The inference engine manager instance.

If _infer_engine_manager is None, this function will create an InferEngineManager object.

Returns

An instance of InferEngineManager class.

Return type

InferEngineManager

property infer_queue_connector: core.inferqueue.InferQueueClientBase

The inference queue connector instance.

If _infer_queue_connector is None, this function will create an InferQueueClientBase object and connect to the queue server.

Returns

An instance of InferQueueClientBase class.

Return type

InferQueueClientBase

Raises
  • NotImplementedError – If the queue type is not supported.

  • redis.exceptions.ConnectionError – If connection to the Redis server fails and reconnection exceeds the limit.

init()

Initialization.

property pipeline: core.pipeline.Pipeline

The pipeline instance.

property pipeline_manager: core.pipeline.PipelineManager

The pipeline manager instance.

If _pipeline_manager is None, this function will create a PipelineManager object.

Returns

An instance of PipelineManager class.

Return type

PipelineManager

property provider: core.stream.StreamProvider

The stream provider instance.

If _provider is None, this function will create a StreamProvider object.

Returns

An instance of StreamProvider class.

Return type

StreamProvider

Raises
  • NotImplementedError – If the file database type is not supported.

  • ValueError – If the provider type is invalid.

run() bool

The main logic of streaming service.

Returns

Whether the streaming service ran successfully.

Return type

bool

property zip_tool: core.crypto.zip.ZipBase

The zip tool instance.

If _zip_tool is None, this function will create a ZipBase object.

Returns

The zip tool instance.

Return type

ZipBase

Raises
  • NotImplementedError – If the zip tool type is not supported.

  • qat.exceptions.RuntimeError – If QAT initialization fails.

class userv.streaming.StreamingTask(provider: core.stream.StreamProvider, infer_engine_info: core.infereng.InferenceInfo, infer_queue_connector: core.inferqueue.InferQueueClientBase, pipeline_id: str, zip_tool: core.crypto.zip.ZipBase)

Bases: userv.uapp.MicroServiceTask

A concrete class of MicroServiceTask for streaming task.

Variables
  • provider (StreamProvider) – The stream provider of streaming task.

  • infer_engine_info (InferenceInfo) – The inference information of streaming task.

  • infer_queue_connector (InferQueueClientBase) – The inference queue client of streaming task.

  • pipeline_id (str) – The pipeline id of streaming task.

  • zip_tool (ZipBase) – The zip tool.

execute()

The task logic of the streaming task.

process_frame(frame: core.frame.Frame) bytes

Process the frame.

Parameters

frame (Frame) – The frame to be published.

Returns

The serialized frame.

Return type

bytes

Raises
  • OverflowError – If compression integer overflow occurs.

  • ValueError – If the frame length is invalid for compression.

  • RuntimeError – If any error occurs while encoding or compressing.
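The inverse of the inference-side deserialization can be sketched as below, again assuming (hypothetically) a pickled payload with zlib compression rather than the service's real wire format:

```python
import pickle
import zlib

def serialize_frame(frame) -> bytes:
    """Serialize a frame object, then compress it for the queue."""
    try:
        return zlib.compress(pickle.dumps(frame))
    except (pickle.PicklingError, zlib.error) as exc:
        raise RuntimeError("failed to encode or compress frame") from exc
```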

stop()

Stop the streaming task.

userv.streaming.entry() None

The entry of Streaming Service.

userv.uapp module

A Uapp module.

Application templates for micro service app and task classes.

Classes:

MicroServiceTask: A base class for micro service tasks based on threading.

MicroAppBase: An abstract base class for micro service apps.

class userv.uapp.MicroAppBase

Bases: abc.ABC

An abstract base class for micro service app.

A cloud native application should accept its parameters from environment variables, may run one or more tasks, and should accept interrupts for Kubernetes lifecycle management, etc.

abstract cleanup()

Cleanup.

Raises

NotImplementedError – If the subclasses don’t implement the method.

static get_env(key, default: Optional[Any] = None) Optional[Any]

Get environment variable.

Parameters
  • key (str) – The name of the environment variable.

  • default (Optional[Any]) – The default value to return if the environment variable does not exist.

Returns

The value of the environment variable or the default value.

Return type

Optional[Any]
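The documented behaviour amounts to a thin wrapper over os.environ; a minimal sketch (the variable names used below are made up for illustration):

```python
import os
from typing import Any, Optional

def get_env(key: str, default: Optional[Any] = None) -> Optional[Any]:
    """Return the environment variable's value, or the default if unset."""
    return os.environ.get(key, default)
```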

abstract init()

Initialization entry for an application instance.

Raises

NotImplementedError – If the subclasses don’t implement the method.

abstract run()

Application main logic provided by inherited class.

Raises

NotImplementedError – If the subclasses don’t implement the method.

run_and_wait_task()

Run and wait all tasks.

stop()

Stop an application instance.
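A subclass is expected to fill in the init/run/cleanup lifecycle; the toy app below sketches that contract under assumed semantics (it is not one of the real services):

```python
from abc import ABC, abstractmethod

class AppBase(ABC):
    """Stand-in for MicroAppBase's abstract lifecycle."""

    @abstractmethod
    def init(self): ...

    @abstractmethod
    def run(self) -> bool: ...

    @abstractmethod
    def cleanup(self): ...

class EchoApp(AppBase):
    """A hypothetical app: init prepares state, run does the work,
    cleanup releases it on shutdown."""

    def init(self):
        self.ready = True

    def run(self) -> bool:
        return self.ready

    def cleanup(self):
        self.ready = False

app = EchoApp()
app.init()
ok = app.run()
app.cleanup()
```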

class userv.uapp.MicroServiceTask(name: Optional[str] = None, exec_func: Optional[Callable] = None, is_asyncio: bool = False)

Bases: threading.Thread

A base class for micro service tasks based on threading.Thread.

In general, a cloud native app may consist of several threading tasks.

Variables
  • _is_asyncio (bool) – A boolean variable that flags if the task is asyncio or not.

  • _exec_func (Callable) – The execute function of the micro service task.

  • _event_loop (asyncio.AbstractEventLoop) – The event loop of the asyncio tasks.

  • _is_task_stopping (bool) – A boolean variable that flags if the task is stopping or not.

  • _name (str) – The name of the micro service task.

execute()

The task logic of micro service task.

Inherited class should implement customized task logic.

Raises

NotImplementedError – If the subclasses don’t implement the method.

property is_task_stopping

A boolean variable that flags if the task is stopping or not.

property name

The name of the micro service task.

run()

Main entry for a task.

stop()

Stop a running task.

classmethod stop_all_tasks()

Stop all tasks.

classmethod wait_all_tasks_end()

Wait for all tasks to end.
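A task subclass typically overrides execute() with a loop that polls the stopping flag; the self-contained sketch below mirrors that shape without using the real MicroServiceTask base:

```python
import threading
import time

class CountingTask(threading.Thread):
    """Loop in execute() until stop() raises the stopping flag."""

    def __init__(self):
        super().__init__()
        self._is_task_stopping = False
        self.iterations = 0

    def run(self):
        # Main entry for the task: delegate to the task logic.
        self.execute()

    def execute(self):
        while not self._is_task_stopping:
            self.iterations += 1
            time.sleep(0.001)

    def stop(self):
        self._is_task_stopping = True

task = CountingTask()
task.start()
time.sleep(0.02)
task.stop()
task.join()
```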

userv.websocket_server module

A WebSocket Server module.

The WebSocket Server subscribes to all inferred streams from the Redis broker and hosts a WebSocket server to publish the streams to an HTML5 based front-end SPA (Single Page Application).

It is based on asyncio and coroutine programming models since most operations are IO bound.

Classes:

StreamWebSocketServer: A class for the Stream WebSocket Server to publish streams from the broker via WebSocket.

class userv.websocket_server.StreamWebSocketServer

Bases: object

A class for Stream WebSocket Server to publish streams via WebSocket from the broker.

Variables
  • _stream_broker_redis_host (str) – The host IP or hostname of the stream broker Redis server.

  • _stream_broker_redis_port (int) – The host port of stream broker Redis server.

  • _ws_send_time_out (float) – The timeout for WebSocket send messages.

  • _pipelines (dict) – A dictionary object representing the stream publish tasks for every pipeline.

  • _users (dict) – A dictionary object representing all established WebSocket connections.

run()

The main entry of WebSocket Server.
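The coroutine-based fan-out such a server performs — read each inferred frame from the broker subscription and push it to every connected user, with a send timeout — can be sketched with stdlib asyncio queues standing in for the Redis subscription and the WebSocket connections:

```python
import asyncio

async def broadcast(source, users, send_timeout=1.0):
    """Forward each frame from the broker stand-in to every user queue."""
    while True:
        frame = await source.get()
        if frame is None:  # sentinel: stream finished
            break
        for user_queue in users.values():
            try:
                await asyncio.wait_for(user_queue.put(frame), send_timeout)
            except asyncio.TimeoutError:
                pass  # a real server would drop this connection

async def demo():
    source = asyncio.Queue()
    users = {"u1": asyncio.Queue()}
    await source.put(b"frame-1")
    await source.put(None)
    await broadcast(source, users)
    return await users["u1"].get()

frame = asyncio.run(demo())
```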

Module contents

The user micro service.

The package contains the user micro service, which is responsible for handling user requests and responses.