userv package¶
Submodules¶
userv.inference module¶
An Inference Service module.
This module contains the implementation of the InferenceService and InferenceTask classes, which are used to run the inference service.
- Classes:
InferenceService: A concrete class implementing MicroAppBase for the inference service, which provides a RESTful API to run inference.
InferenceTask: A concrete class of MicroServiceTask for the inference task.
- class userv.inference.InferenceService¶
Bases:
userv.uapp.MicroAppBase
A concrete class implementing MicroAppBase for the inference service.
The inference service provides a RESTful API to run inference.
The InferenceService class is responsible for connecting to a runtime database, loading a model, and setting up the necessary connections to a queue and a broker for running inference on incoming data.
- Variables
_db (RuntimeDatabaseBase) – The runtime database instance.
_model_provider (str) – The model provider instance.
_model (Model) – The model instance.
_infer_queue_connector (InferQueueClientBase) – The inference queue connector instance.
_infer_broker_connector (StreamBrokerClientBase) – The inference broker connector instance.
_infer_engine_manager (InferEngineManager) – The inference engine manager instance.
_infer_info (InferenceInfo) – The inference information.
_inference_engine (InferenceEngine) – The inference engine instance.
_is_stopping (bool) – A boolean variable that flags if the service is stopping or not.
_pipeline_manager (PipelineManager) – The pipeline manager instance.
_zip_tool (ZipBase) – The zip tool instance.
- cleanup()¶
Cleanup.
Sets the stopping flag and performs necessary cleanup actions.
- property db: core.rtdb.RuntimeDatabaseBase¶
The runtime database instance.
If _db is None, this function will create a RuntimeDatabaseBase object and connect to the runtime database.
- Returns
An instance of the runtime database.
- Return type
RuntimeDatabaseBase
- Raises
NotImplementedError – If the runtime database type is not supported.
redis.exceptions.ConnectionError – If connection to the Redis server fails.
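The create-on-first-access behavior described for db (and repeated by the other connector properties below) can be sketched as follows. This is a minimal illustration under stated assumptions, not the actual implementation: FakeDatabase, its connect() method, LazyService, and the db_type argument are hypothetical stand-ins that do not appear in this package.

```python
class FakeDatabase:
    """Hypothetical stand-in for a runtime database client."""

    def __init__(self) -> None:
        self.connected = False

    def connect(self) -> None:
        # A real client would open a network connection here.
        self.connected = True


class LazyService:
    """Illustrates the 'create on first access' property pattern."""

    def __init__(self, db_type: str = "redis") -> None:
        self._db_type = db_type  # hypothetical configuration value
        self._db = None          # nothing is created until the property is read

    @property
    def db(self) -> FakeDatabase:
        if self._db is None:
            if self._db_type != "redis":
                # Mirrors the documented NotImplementedError for unsupported types.
                raise NotImplementedError(f"Unsupported database type: {self._db_type}")
            db = FakeDatabase()
            db.connect()
            self._db = db  # cache so later reads reuse the same connection
        return self._db
```

Reading the property twice returns the same object, so the connection cost is paid once and only by callers that actually need the database.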
- property infer_broker_connector: core.streambroker.StreamBrokerClientBase¶
The inference broker connector instance.
If _infer_broker_connector is None, this function will create a StreamBrokerClientBase object and connect to the stream broker server.
- Returns
The inference broker connector instance.
- Return type
StreamBrokerClientBase
- Raises
NotImplementedError – If the stream broker type is not supported.
redis.exceptions.ConnectionError – If connection to the Redis server fails.
- property infer_engine_manager: core.infereng.InferEngineManager¶
The inference engine manager instance.
If _infer_engine_manager is None, this function will create an InferEngineManager object.
- Returns
The inference engine manager instance.
- Return type
InferEngineManager
- property infer_info: core.infereng.InferenceInfo¶
The inference information.
If _infer_info is None, this function will create an InferenceInfo object.
- Returns
The inference information.
- Return type
InferenceInfo
- property infer_queue_connector: core.inferqueue.InferQueueClientBase¶
The inference queue connector instance.
If _infer_queue_connector is None, this function will create an InferQueueClientBase object and connect to the queue server.
- Returns
The inference queue connector instance.
- Return type
InferQueueClientBase
- Raises
NotImplementedError – If the queue type is not supported.
redis.exceptions.ConnectionError – If connection to the Redis server fails.
- property inference_engine: core.infereng.InferenceEngine¶
The inference engine instance.
If _inference_engine is None, this function will create an InferenceEngine object.
- Returns
The inference engine instance.
- Return type
InferenceEngine
- Raises
NotImplementedError – If the framework is not supported.
- init()¶
Initialization.
- property model: core.model.Model¶
The model instance.
- property model_provider: str¶
The model provider instance.
This property instantiates the model provider.
- Returns
The model provider instance. “Not Implemented” in the current version.
- Return type
str
- property pipeline_manager: core.pipeline.PipelineManager¶
The pipeline manager instance.
If _pipeline_manager is None, this function will create a PipelineManager object.
- Returns
An instance of the PipelineManager class.
- Return type
PipelineManager
- run() → bool¶
The main execution function for the service.
- Returns
Whether execution was successful or not.
- Return type
bool
- property zip_tool: core.crypto.zip.ZipBase¶
The zip tool instance.
If _zip_tool is None, this function will create a ZipBase object.
- Returns
The zip tool instance.
- Return type
ZipBase
- Raises
NotImplementedError – If the runtime database type is not supported.
qat.exceptions.RuntimeError – If QAT initialization fails.
- class userv.inference.InferenceTask(inference_engine: core.infereng.InferenceEngine, infer_info: core.infereng.InferenceInfo, infer_queue_connector: core.inferqueue.InferQueueClientBase, infer_broker_connector: core.streambroker.StreamBrokerClientBase, pipeline_manager: core.pipeline.PipelineManager, fps_time_window: float, zip_tool: core.crypto.zip.ZipBase)¶
Bases:
userv.uapp.MicroServiceTask
A concrete class of MicroServiceTask for inference task.
- Variables
inference_engine (InferenceEngine) – The inference engine instance.
infer_info (InferenceInfo) – The inference information.
infer_queue_connector (InferQueueClientBase) – The inference queue connector instance.
infer_broker_connector (StreamBrokerClientBase) – The inference broker connector instance.
pipeline_manager (PipelineManager) – The pipeline manager instance.
infer_frame_count (dict) – A dictionary that stores the inference frame count for every pipeline.
drop_frame_count (dict) – A dictionary that stores the dropped frame count for every pipeline.
infer_frame_count_sum (int) – The sum of inference frame counts.
drop_frame_count_sum (int) – The sum of drop frame counts.
infer_start_time (float) – The start time of inference/drop frame counting.
fps_time_window (float) – The time interval of an inference/drop FPS calculation window.
zip_tool (ZipBase) – The zip tool.
- calculate_fps(frame: core.frame.Frame, drop_count: int) → None¶
Calculate the inference and drop FPS and export the metrics.
- Parameters
frame (Frame) – The frame being inferred when calculating FPS.
drop_count (int) – The count of dropped frames when calculating FPS.
- execute()¶
The task logic of inference task.
- export_fps_metrics(pipeline_id: str, elapsed_time: float) → None¶
Export the calculated FPS metrics and save the pipeline's FPS to the Pipeline-table.
- Parameters
pipeline_id (str) – The ID of the pipeline whose FPS is saved to the Pipeline-table.
elapsed_time (float) – The elapsed time to calculate FPS.
- process_frame(frame: bytes) → core.frame.Frame¶
Process the frame.
- Parameters
frame (bytes) – The frame retrieved from the inference queue.
- Returns
The deserialized frame.
- Return type
Frame
- Raises
TypeError – If the type of frame is not bytes.
RuntimeError – If an error occurs while decoding or decompressing.
- reset_frame_counts() → None¶
Reset the frame counts for the next calculation window.
- stop()¶
Stop the inference task.
- update_frame_counts(pipeline_id: str, drop_count: int) → None¶
Update the frame counts for inference and drop.
- Parameters
pipeline_id (str) – The id of pipeline to update the frame counts.
drop_count (int) – The count of dropped frames when updating the frame counts.
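The interaction between update_frame_counts, calculate_fps, and reset_frame_counts can be sketched as a fixed-window counter. This is an illustrative sketch only: the FpsWindow class, the injectable clock argument, and the returned dictionary are assumptions made for the example, and the real methods export metrics rather than return them.

```python
import time
from collections import defaultdict


class FpsWindow:
    """Hypothetical per-pipeline FPS counter over a fixed time window."""

    def __init__(self, fps_time_window: float, clock=time.monotonic) -> None:
        self.fps_time_window = fps_time_window
        self._clock = clock  # injectable so the window can be tested deterministically
        self.infer_frame_count = defaultdict(int)
        self.drop_frame_count = defaultdict(int)
        self.infer_start_time = clock()

    def update_frame_counts(self, pipeline_id: str, drop_count: int) -> None:
        # One call corresponds to one inferred frame plus any frames dropped before it.
        self.infer_frame_count[pipeline_id] += 1
        self.drop_frame_count[pipeline_id] += drop_count

    def reset_frame_counts(self) -> None:
        self.infer_frame_count.clear()
        self.drop_frame_count.clear()
        self.infer_start_time = self._clock()

    def calculate_fps(self, pipeline_id: str, drop_count: int) -> dict:
        """Record one frame; return {pipeline_id: (infer_fps, drop_fps)} when the window closes."""
        self.update_frame_counts(pipeline_id, drop_count)
        elapsed = self._clock() - self.infer_start_time
        if elapsed < self.fps_time_window:
            return {}  # window still open, nothing to report yet
        metrics = {
            pid: (self.infer_frame_count[pid] / elapsed,
                  self.drop_frame_count[pid] / elapsed)
            for pid in self.infer_frame_count
        }
        self.reset_frame_counts()
        return metrics
```

Dividing by the measured elapsed time rather than the nominal window length keeps the rate accurate when frames arrive late and the window overruns.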
- userv.inference.entry() → None¶
The entry of Inference Service.
userv.pipeline_server module¶
A Pipeline Server module.
Pipeline Server provides a RESTful API to get pipeline data.
- Classes:
PipelineService: A class for pipeline server services to get pipeline data.
- Functions:
_get_env: A function to get an environment variable.
api_get_pipelines: RESTful API for getting pipeline data.
health_check: Health check endpoint.
- class userv.pipeline_server.PipelineService¶
Bases:
object
A class for pipeline server services to get pipeline data.
- Variables
_db (RuntimeDatabaseBase) – The RuntimeDatabaseBase object for PipelineService to use.
- INFER_ENGINE_TABLE = 'InferEngine-table'¶
- PIPELINE_TABLE = 'Pipeline-table'¶
- property db: core.rtdb.RuntimeDatabaseBase¶
The runtime database.
If _db is None, this function will create a RuntimeDatabaseBase object and connect to the runtime database.
- Returns
An instance of the RuntimeDatabaseBase class.
- Return type
RuntimeDatabaseBase
- Raises
NotImplementedError – If the runtime database type is not supported.
redis.exceptions.ConnectionError – If connection to the Redis server fails.
- classmethod inst()¶
Singleton instance.
- Returns
An instance of the PipelineService class.
- Return type
PipelineService
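A classmethod accessor such as inst() is commonly written along the following lines. This sketch assumes a lock-protected, lazily created instance; SingletonService, _instance, and _lock are hypothetical names, and the source does not state how PipelineService actually stores or guards its instance.

```python
import threading


class SingletonService:
    """Hypothetical class exposing a shared instance through inst()."""

    _instance = None
    _lock = threading.Lock()

    @classmethod
    def inst(cls) -> "SingletonService":
        # Double-checked locking: the fast path skips the lock once the
        # instance exists, while the lock prevents two threads from both
        # creating one on first use.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance
```

Every caller of inst() receives the same object, which lets request handlers share one database connection instead of opening one per request.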
- property pipelines: list¶
The pipeline list.
- Returns
List of pipeline dictionaries.
- Return type
list
- Raises
ValueError – If the pipeline table does not exist.
- property pipelines_have_changed: bool¶
Check if the pipelines have changed since the last check.
- Returns
True if pipelines have changed, False otherwise.
- Return type
bool
- userv.pipeline_server.api_get_pipelines() → flask.wrappers.Response¶
RESTful API for getting pipeline data.
- Returns
Flask response object with JSON data and status.
- Return type
Response
- userv.pipeline_server.health_check() → flask.wrappers.Response¶
Health check endpoint.
- Returns
Flask response object with message and status.
- Return type
Response
- userv.pipeline_server.sse()¶
Server Sent Events (SSE) endpoint.
- Returns
Flask response object with JSON data and status.
- Return type
Response
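A Server-Sent Events stream is plain text in which each event consists of one or more "data:" lines followed by a blank line. The sketch below shows that wire format together with a change check in the spirit of pipelines_have_changed. It uses only the standard library; format_sse and pipeline_events are hypothetical helpers, and how the real sse() endpoint builds its Flask response is not described in the source.

```python
import json
from typing import Iterable, Iterator, List


def format_sse(payload: object) -> str:
    """Encode one payload as a single Server-Sent Events message."""
    # json.dumps emits no newlines by default, so one "data:" line suffices.
    return f"data: {json.dumps(payload)}\n\n"


def pipeline_events(snapshots: Iterable[List[dict]]) -> Iterator[str]:
    """Yield an SSE message each time the pipeline list differs from the previous one."""
    previous = None
    for current in snapshots:
        if current != previous:
            previous = current
            yield format_sse(current)
```

In a web framework, a generator like pipeline_events would typically be passed as the body of a streaming response with the text/event-stream content type.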
userv.streaming module¶
A Streaming Service module.
This module contains the implementation of the StreamingService and StreamingTask classes, which are used to run the streaming service.
- Classes:
StreamingService: A concrete class implementing MicroAppBase for the streaming service.
StreamingTask: A concrete class of MicroServiceTask for the streaming task.
- class userv.streaming.StreamingService¶
Bases:
userv.uapp.MicroAppBase
A concrete class implementing MicroAppBase for the streaming service.
The streaming service reads frames from the stream provider, processes them, and publishes them to the inference queue.
- Variables
_db (RuntimeDatabaseBase) – The runtime database instance.
_provider (StreamProvider) – The stream provider instance.
_infer_engine_manager (InferEngineManager) – The inference engine manager instance.
_pipeline_manager (PipelineManager) – The pipeline manager instance.
_infer_engine_info (Optional[InferenceInfo]) – The infer engine info instance.
_pipeline (Pipeline) – The pipeline instance.
_is_stopping (bool) – A boolean variable that flags if the service is stopping or not.
_infer_queue_connector (InferQueueClientBase) – The inference queue connector instance.
_zip_tool (ZipBase) – The zip tool instance.
- cleanup()¶
Cleanup.
Sets the stopping flag and performs necessary cleanup actions.
- property db: core.rtdb.RuntimeDatabaseBase¶
The runtime database instance.
If _db is None, this function will create a RuntimeDatabaseBase object and connect to the runtime database.
- Returns
An instance of the RuntimeDatabaseBase class.
- Return type
RuntimeDatabaseBase
- Raises
NotImplementedError – If the runtime database type is not supported yet.
redis.exceptions.ConnectionError – If connection to the Redis server fails and reconnection exceeds the limit.
- property infer_engine_info: Optional[core.infereng.InferenceInfo]¶
The infer engine info instance.
If _infer_engine_info is None, this function will search for the matching inference engine.
- Returns
An instance of the InferenceInfo class, or None if the requested inference engine was not found.
- Return type
Optional[InferenceInfo]
- property infer_engine_manager: core.infereng.InferEngineManager¶
The inference engine manager instance.
If _infer_engine_manager is None, this function will create an InferEngineManager object.
- Returns
An instance of the InferEngineManager class.
- Return type
InferEngineManager
- property infer_queue_connector: core.inferqueue.InferQueueClientBase¶
The inference queue connector instance.
If _infer_queue_connector is None, this function will create an InferQueueClientBase object and connect to the queue server.
- Returns
An instance of the InferQueueClientBase class.
- Return type
InferQueueClientBase
- Raises
NotImplementedError – If the queue type is not supported.
redis.exceptions.ConnectionError – If connection to the Redis server fails and reconnection exceeds the limit.
- init()¶
Initialization.
- property pipeline: core.pipeline.Pipeline¶
The pipeline instance.
- property pipeline_manager: core.pipeline.PipelineManager¶
The pipeline manager instance.
If _pipeline_manager is None, this function will create a PipelineManager object.
- Returns
An instance of the PipelineManager class.
- Return type
PipelineManager
- property provider: core.stream.StreamProvider¶
The stream provider instance.
If _provider is None, this function will create a StreamProvider object.
- Returns
An instance of the StreamProvider class.
- Return type
StreamProvider
- Raises
NotImplementedError – If the file database type is not supported.
ValueError – If the provider type is invalid.
- run() → bool¶
The main logic of the streaming service.
- Returns
Whether the streaming service ran successfully.
- Return type
bool
- property zip_tool: core.crypto.zip.ZipBase¶
The zip tool instance.
If _zip_tool is None, this function will create a ZipBase object.
- Returns
The zip tool instance.
- Return type
ZipBase
- Raises
NotImplementedError – If the runtime database type is not supported.
qat.exceptions.RuntimeError – If QAT initialization fails.
- class userv.streaming.StreamingTask(provider: core.stream.StreamProvider, infer_engine_info: core.infereng.InferenceInfo, infer_queue_connector: core.inferqueue.InferQueueClientBase, pipeline_id: str, zip_tool: core.crypto.zip.ZipBase)¶
Bases:
userv.uapp.MicroServiceTask
A concrete class of MicroServiceTask for streaming task.
- Variables
provider (StreamProvider) – The stream provider of streaming task.
infer_engine_info (InferenceInfo) – The inference information of streaming task.
infer_queue_connector (InferQueueClientBase) – The inference queue client of streaming task.
pipeline_id (str) – The pipeline id of streaming task.
zip_tool (ZipBase) – The zip tool.
- execute()¶
The task logic of streaming task.
- process_frame(frame: core.frame.Frame) → bytes¶
Process the frame.
- Parameters
frame (Frame) – The frame to publish.
- Returns
The serialized frame.
- Return type
bytes
- Raises
OverflowError – If an integer overflow occurs during compression.
ValueError – If the frame length is invalid for compression.
RuntimeError – If an error occurs while encoding or compressing.
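StreamingTask.process_frame and InferenceTask.process_frame are two halves of one round trip: the first serializes and compresses a frame into bytes, the second checks the type, decompresses, and deserializes. The sketch below illustrates that shape with json and zlib from the standard library. These are stand-ins chosen for the example; the actual Frame encoding and the ZipBase compression backend are not specified here, and encode_frame and decode_frame are hypothetical names.

```python
import json
import zlib


def encode_frame(frame: dict) -> bytes:
    """Serialize and compress a frame-like dict (a stand-in for Frame)."""
    try:
        return zlib.compress(json.dumps(frame).encode("utf-8"))
    except (TypeError, ValueError, zlib.error) as exc:
        # Wrap low-level failures so callers handle a single error type.
        raise RuntimeError(f"Failed to encode frame: {exc}") from exc


def decode_frame(data: bytes) -> dict:
    """Decompress and deserialize bytes produced by encode_frame."""
    if not isinstance(data, bytes):
        raise TypeError("frame must be bytes")
    try:
        return json.loads(zlib.decompress(data).decode("utf-8"))
    except (zlib.error, ValueError) as exc:
        # ValueError also covers JSON and UTF-8 decoding errors.
        raise RuntimeError(f"Failed to decode frame: {exc}") from exc
```

Checking the input type before decompressing keeps a wrong-type argument (TypeError) distinguishable from corrupted payload data (RuntimeError), matching the two documented failure modes.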
- stop()¶
Stop the streaming task.
- userv.streaming.entry() → None¶
The entry of Streaming Service.
userv.uapp module¶
A Uapp module.
Application template for micro service app and task classes.
- Classes:
MicroServiceTask: A base class for thread-based micro service tasks.
MicroAppBase: An abstract base class for micro service apps.
- class userv.uapp.MicroAppBase¶
Bases:
abc.ABC
An abstract base class for micro service app.
A cloud native application should accept its parameters from environment variables, may run one or more tasks, and should handle interrupts for Kubernetes lifecycle management.
- abstract cleanup()¶
Cleanup.
- Raises
NotImplementedError – If the subclasses don’t implement the method.
- static get_env(key, default: Optional[Any] = None) → Optional[Any]¶
Get environment variable.
- Parameters
key (str) – The name of the environment variable.
default (Optional[Any]) – The default value to return if the environment variable does not exist.
- Returns
The value of the environment variable or the default value.
- Return type
Optional[Any]
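A helper with this signature is usually a thin wrapper around os.environ. The following sketch assumes exactly that; the real get_env may do more, and the variable name used in the usage note is hypothetical.

```python
import os
from typing import Any, Optional


def get_env(key: str, default: Optional[Any] = None) -> Optional[Any]:
    """Return the value of environment variable `key`, or `default` if it is unset."""
    return os.environ.get(key, default)
```

Environment values are always strings when set, so a caller that needs a number has to convert explicitly, for example int(get_env("DEMO_PORT", 8080)), where DEMO_PORT is a made-up variable name.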
- abstract init()¶
Initialization entry for an application instance.
- Raises
NotImplementedError – If the subclasses don’t implement the method.
- abstract run()¶
Application main logic provided by inherited class.
- Raises
NotImplementedError – If the subclasses don’t implement the method.
- run_and_wait_task()¶
Run all tasks and wait for them to finish.
- stop()¶
Stop an application instance.
- class userv.uapp.MicroServiceTask(name: Optional[str] = None, exec_func: Optional[Callable] = None, is_asyncio: bool = False)¶
Bases:
threading.Thread
A base class for thread-based micro service tasks.
In general, a cloud native app may consist of several threaded tasks.
- Variables
_is_asyncio (bool) – A boolean variable that flags if the task is asyncio or not.
_exec_func (Callable) – The execute function of the micro service task.
_event_loop (asyncio.AbstractEventLoop) – The event loop of the asyncio tasks.
_is_task_stopping (bool) – A boolean variable that flags if the task is stopping or not.
_name (str) – The name of the micro service task.
- execute()¶
The task logic of micro service task.
Inherited class should implement customized task logic.
- Raises
NotImplementedError – If the subclasses don’t implement the method.
- property is_task_stopping¶
A boolean variable that flags if the task is stopping or not.
- property name¶
The name of the micro service task.
- run()¶
Main entry for a task.
- stop()¶
Stop a running task.
- classmethod stop_all_tasks()¶
Stops all tasks.
- classmethod wait_all_tasks_end()¶
Wait for all tasks to end.
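The task lifecycle described above (a thread whose run() calls execute(), a cooperative stop flag, and class-level helpers that act on every task) can be sketched as follows. SketchTask and CountingTask are hypothetical classes written for illustration; the class-level registry and the use of threading.Event are assumptions, and the asyncio mode of MicroServiceTask is left out.

```python
import threading
import time
from typing import Callable, List, Optional


class SketchTask(threading.Thread):
    """Hypothetical thread-based task with a class-level registry."""

    _tasks: List["SketchTask"] = []

    def __init__(self, name: Optional[str] = None,
                 exec_func: Optional[Callable[[], None]] = None) -> None:
        super().__init__(name=name, daemon=True)
        self._exec_func = exec_func
        self._stop_event = threading.Event()
        SketchTask._tasks.append(self)  # register so class methods can reach it

    @property
    def is_task_stopping(self) -> bool:
        return self._stop_event.is_set()

    def execute(self) -> None:
        # Subclasses override this, or pass exec_func to the constructor.
        if self._exec_func is None:
            raise NotImplementedError("Subclasses must implement execute().")
        self._exec_func()

    def run(self) -> None:
        self.execute()

    def stop(self) -> None:
        # Cooperative stop: execute() is expected to poll is_task_stopping.
        self._stop_event.set()

    @classmethod
    def stop_all_tasks(cls) -> None:
        for task in cls._tasks:
            task.stop()

    @classmethod
    def wait_all_tasks_end(cls) -> None:
        for task in cls._tasks:
            if task.is_alive():  # joining a thread that never started would raise
                task.join()


class CountingTask(SketchTask):
    """Example subclass that loops until asked to stop."""

    def __init__(self) -> None:
        super().__init__(name="counter")
        self.ticks = 0

    def execute(self) -> None:
        while not self.is_task_stopping:
            self.ticks += 1
            time.sleep(0.01)
```

Because stopping is cooperative, a task whose execute() blocks without polling the flag will not end when stop() is called, which is why the example loop sleeps in short intervals.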
userv.websocket_server module¶
A WebSocket Server module.
The WebSocket Server subscribes to all inferred streams from the Redis broker and hosts a WebSocket server to publish the streams to an HTML5 based front-end SPA (Single Page Application).
It is based on asyncio and coroutine programming models since most operations are IO bound.
- Classes:
StreamWebSocketServer: A class for Stream WebSocket Server to publish streams via WebSocket from the broker.
- class userv.websocket_server.StreamWebSocketServer¶
Bases:
object
A class for Stream WebSocket Server to publish streams via WebSocket from the broker.
- Variables
_stream_broker_redis_host (str) – The host ip or hostname of stream broker Redis server.
_stream_broker_redis_port (int) – The host port of stream broker Redis server.
_ws_send_time_out (float) – The timeout for sending a WebSocket message.
_pipelines (dict) – A dictionary object representing the stream publish tasks for every pipeline.
_users (dict) – A dictionary object representing all established WebSocket connections.
- run()¶
The main entry of WebSocket Server.
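The role of a send timeout in an asyncio-based publisher can be sketched without any WebSocket library: each connection is represented by an awaitable send function, and a send that exceeds the timeout causes that connection to be dropped. The broadcast function, the Sender type, and the drop-on-timeout policy are assumptions made for this example; the source does not state what StreamWebSocketServer does when a send times out.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

# Hypothetical type: an async callable that delivers one message to one client.
Sender = Callable[[bytes], Awaitable[None]]


async def broadcast(users: Dict[str, Sender], message: bytes,
                    send_timeout: float) -> List[str]:
    """Send `message` to every user in turn; return the ids whose send timed out."""
    timed_out = []
    # Iterate over a copy because slow users are removed during the loop.
    for user_id, send in list(users.items()):
        try:
            await asyncio.wait_for(send(message), timeout=send_timeout)
        except asyncio.TimeoutError:
            timed_out.append(user_id)
            users.pop(user_id, None)  # drop connections that cannot keep up
    return timed_out
```

Bounding each send keeps one stalled client from blocking delivery to the others, which matters when frames are produced continuously.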
Module contents¶
The user micro service.
The package contains the user micro service, which is responsible for handling user requests and responses.