core package

Subpackages

Submodules

core.filedb module

A FileDatabase module.

This module provides an object-oriented design for a file database to load video files.

Classes:

FileDatabase: An abstract base class for creating custom file database implementations.

LocalFileDatabase: A concrete class implementing the FileDatabase for local file operations.

class core.filedb.FileDatabase

Bases: abc.ABC

An abstract base class for creating custom file database implementations.

This class serves as a blueprint for subclasses that need to implement the get_file method for different types of file databases.

abstract get_file(filename: str) str

Abstract method for getting a file from the file database.

This method is expected to retrieve a file from the file database and return the local path for the file.

Parameters

filename (str) – The name of the file to get.

Returns

The local path for the file.

Return type

str

Raises

NotImplementedError – if the subclass does not implement this method.

class core.filedb.LocalFileDatabase(root_dir: str)

Bases: core.filedb.FileDatabase

A concrete class implementing the FileDatabase for local file operations.

This class uses the local filesystem to implement the get_file method defined in the FileDatabase abstract base class.

Variables

_root_dir (str) – The root directory path for local file operations.

get_file(filename: str) str

Gets the local path for a file.

This method uses the root directory to find the given file and returns its local path.

Parameters

filename (str) – The name of the file to get.

Returns

The local path for the file.

Return type

str

Raises

FileNotFoundError – If root_dir is invalid or the given file cannot be found.
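
The relationship between the two classes above can be sketched as follows. This is a minimal illustration, not the actual implementation: it assumes get_file joins the root directory with the file name and checks the result with os.path. The class names carry a Sketch suffix to mark them as hypothetical stand-ins.

```python
import os
from abc import ABC, abstractmethod


class FileDatabaseSketch(ABC):
    """Illustrative stand-in for core.filedb.FileDatabase."""

    @abstractmethod
    def get_file(self, filename: str) -> str:
        """Return the local path for the given file name."""
        raise NotImplementedError("Subclasses should implement get_file.")


class LocalFileDatabaseSketch(FileDatabaseSketch):
    """Illustrative stand-in for core.filedb.LocalFileDatabase."""

    def __init__(self, root_dir: str):
        self._root_dir = root_dir

    def get_file(self, filename: str) -> str:
        # Assumption: an invalid root directory is reported as FileNotFoundError.
        if not os.path.isdir(self._root_dir):
            raise FileNotFoundError(f"Invalid root directory: {self._root_dir}")
        path = os.path.join(self._root_dir, filename)
        # Assumption: only regular files count as found.
        if not os.path.isfile(path):
            raise FileNotFoundError(f"File not found: {path}")
        return path
```

Because the base class is abstract, instantiating FileDatabaseSketch directly fails with TypeError; only subclasses that override get_file can be created.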

core.frame module

A Frame module.

This module contains the definition of the Frame class, which encapsulates a raw frame and related information for transferring data between microservices. A frame can be encoded and decoded with protobuf, and it should include enough information for traceability, observability, etc.

In addition, this module provides an object-oriented design for frame cipher to encrypt and decrypt frame.

Classes:

Frame: A class that encapsulates raw frame and related information, can be simply encoded and decoded by protobuf.

FrameCipherBase: An abstract base class for creating custom frame cipher implementations.

QATFrameCipher: A concrete class implementing encrypt and decrypt frame accelerated with QAT.

class core.frame.Frame(provider: core.stream.StreamProvider, pipeline_id: str, sequence: int, raw_frame: numpy.ndarray)

Bases: object

A class that encapsulates raw frame and related information.

The Frame includes enough information for traceability and observability, and can be encoded and decoded with protobuf.

Variables
  • _provider (StreamProvider) – The stream provider of the frame.

  • _pipeline_id (str) – The id of the pipeline to which the frame belongs.

  • _sequence (int) – The monotonic counter of the frame sequence number.

  • _raw (np.ndarray) – The raw frame data.

  • _ts_new (float) – The timestamp of the beginning of this new frame.

  • _ts_infer_end (float) – The timestamp of the inference’s end point of this frame.

decrypt(actor: core.frame.FrameCipherBase) None

Decrypts the frame with a given frame cipher after transferring it into TEE.

The method should be called after transferring the frame into TEE.

Parameters

actor (FrameCipherBase) – The frame cipher to use for decryption.

encrypt(actor: core.frame.FrameCipherBase) None

Encrypts the frame with a given frame cipher before transferring it outside of TEE.

The method should be called before transferring the frame outside of TEE.

Parameters

actor (FrameCipherBase) – The frame cipher to use for encryption.

static from_blob(blob: bytes) core.frame.Frame

Restore a frame instance from a blob.

This method uses protobuf to deserialize a blob to a Frame.

Parameters

blob (bytes) – The blob to restore.

Returns

The frame restored from a blob.

Return type

Frame

Raises
  • TypeError – If the ‘blob’ argument is not of type bytes.

  • RuntimeError – If any error occurs while decoding.

classmethod get_sequence() int

Get the monotonic count for the sequence number of a frame.

The method will return the last sequence number plus one, and update the last sequence number to the new value.

Returns

The new sequence number, which becomes the last sequence number.

Return type

int
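
The counter behaviour described above can be sketched with a class attribute and a classmethod. This is an assumption about how the counter is kept, shown on a hypothetical FrameSketch class; in particular, the sketch is not thread-safe and the real class may guard the update with a lock.

```python
class FrameSketch:
    """Illustrative stand-in showing only the sequence counter."""

    # Class-level counter shared by every frame instance.
    last_sequence = 0

    @classmethod
    def get_sequence(cls) -> int:
        # Increment the shared counter, then return the new value.
        cls.last_sequence += 1
        return cls.last_sequence
```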

last_sequence = 0

normalize(target_size: Tuple[int, int]) None

Normalize the frame to the target size.

This method uses OpenCV to resize the frame to the target size.

Parameters

target_size (Tuple[int, int]) – The target size to which the frame is normalized.

Raises

ValueError – If the target_size is invalid.

property pipeline_id: str

The id of the pipeline to which the frame belongs.

Type

str

property raw: numpy.ndarray

The raw frame data.

Type

np.ndarray

property sequence: int

The monotonic counter of the frame sequence number.

Type

int

property timestamp_infer_end: float

The timestamp of the inference’s end point of this frame.

Type

float

property timestamp_new_frame: float

The timestamp of the beginning of this new frame.

Type

float

to_blob() bytes

Encode the raw frame to a blob for transferring to the infer queue.

This method uses protobuf to serialize a Frame into a blob.

Returns

The blob encoded from raw frame.

Return type

bytes

Raises

RuntimeError – If any error occurs while encoding.

class core.frame.FrameCipherBase

Bases: abc.ABC

An abstract base class for creating custom frame cipher implementations.

This class serves as a blueprint for subclasses that need to implement the encrypt and decrypt methods for different accelerators.

abstract decrypt() None

Decrypt the frame.

The method is to decrypt the encrypted frame.

Raises

NotImplementedError – If the subclasses don’t implement the method.

abstract encrypt() None

Encrypt the frame.

The method is to encrypt the frame to protect the privacy of the frame.

Raises

NotImplementedError – If the subclasses don’t implement the method.

class core.frame.QATFrameCipher

Bases: core.frame.FrameCipherBase

Class that uses QAT to accelerate frame encryption and decryption.

This class implements the encrypt and decrypt methods from the FrameCipherBase abstract base class.

decrypt() None

See base class.

encrypt() None

See base class.

core.infereng module

An Inference Engine module.

This module contains the definition of InferenceInfo class, which encapsulates the inference information. It also contains the definition of InferEngineManager class, which manages inference engines.

In addition, this module provides an object-oriented design for inference engine.

Classes:

InferenceInfo: A class that encapsulates inference information.

InferEngineManager: A class to manage inference engines.

InferenceEngine: An abstract base class for creating custom inference engine implementations.

class core.infereng.InferEngineManager(db: core.rtdb.RuntimeDatabaseBase)

Bases: object

The class to manage inference engines.

This class manages inference engines. It provides the register_engine and unregister_engine methods to register and unregister inference engines, and the search_engine method to search for available engines.

Variables

_db (RuntimeDatabaseBase) – The RuntimeDatabaseBase object for InferEngineManager to use.

register_engine(infer_info: core.infereng.InferenceInfo, model_info: core.model.ModelInfo) None

Register the infer engine to database.

This method is used to register the infer engine to database.

Parameters
  • infer_info (InferenceInfo) – The InferenceInfo object to register.

  • model_info (ModelInfo) – The ModelInfo object to register.

search_engine(framework: str, target: str, device: str, model_name: str, model_version: str) Optional[core.infereng.InferenceInfo]

Search inference engine.

This method is used to search for an engine according to the requested framework and target. It returns an InferenceInfo that includes the UUID of the infer engine.

Parameters
  • framework (str) – Framework of the model

  • target (str) – Target of the model

  • device (str) – Device type for infer engine

  • model_name (str) – Name of the model

  • model_version (str) – Version of the model

Returns

An InferenceInfo object if engine is found, None otherwise.

Return type

Optional[InferenceInfo]

unregister_engine(infer_info_id: str) None

Unregister the infer engine from database.

This method is used to unregister the infer engine from database.

Parameters

infer_info_id (str) – The UUID of Infer Engine Info to unregister.

class core.infereng.InferenceEngine

Bases: abc.ABC

An abstract base class for creating custom inference engine implementations.

This class serves as a blueprint for subclasses that need to implement the verify, preprocess, postprocess, _predict methods for different inference frameworks.

abstract postprocess(frame: numpy.ndarray, outputs: dict) numpy.ndarray

Postprocesses the output from the inference process.

The method is to postprocess the output from the inference process.

Parameters
  • frame (np.ndarray) – The input frame.

  • outputs (dict) – The output result from the inference process.

Returns

The postprocessed result.

Return type

np.ndarray

Raises

NotImplementedError – If this method is not implemented by a subclass.

predict(frame: numpy.ndarray) Tuple[numpy.ndarray, float]

Performs inference using the loaded model and input data, and measures the latency.

The method is to perform inference using the loaded model and input data, and measures the latency.

Parameters

frame (np.ndarray) – The input data to use for inference.

Returns

The postprocessed result and the latency in seconds.

Return type

Tuple[np.ndarray, float]

abstract preprocess(frame: numpy.ndarray) numpy.ndarray

Preprocesses the input data.

The method is to preprocess the input data before feeding it to the model.

Parameters

frame (np.ndarray) – The input data to preprocess.

Returns

The preprocessed input data.

Return type

np.ndarray

Raises

NotImplementedError – If this method is not implemented by a subclass.

abstract verify() bool

Checks if the model is valid for inference.

The method is to check if the model is valid for inference.

Returns

True if the model is valid, otherwise False.

Return type

bool

Raises

NotImplementedError – If this method is not implemented by a subclass.
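
The division of work between predict and the abstract methods can be sketched as a template method. This is a minimal sketch under stated assumptions: the _predict signature is a guess (the reference only names the method), the measured latency is assumed to cover preprocessing, inference and postprocessing together, and plain Python lists stand in for np.ndarray so the example has no third-party dependencies. InferenceEngineSketch and ScalingEngine are hypothetical names.

```python
import time
from abc import ABC, abstractmethod
from typing import Any, Tuple


class InferenceEngineSketch(ABC):
    """Illustrative stand-in for core.infereng.InferenceEngine."""

    @abstractmethod
    def verify(self) -> bool:
        raise NotImplementedError

    @abstractmethod
    def preprocess(self, frame: Any) -> Any:
        raise NotImplementedError

    @abstractmethod
    def postprocess(self, frame: Any, outputs: dict) -> Any:
        raise NotImplementedError

    @abstractmethod
    def _predict(self, preprocessed: Any) -> dict:
        # Assumed signature: takes preprocessed input, returns raw outputs.
        raise NotImplementedError

    def predict(self, frame: Any) -> Tuple[Any, float]:
        # Concrete method: runs the three steps and times them in seconds.
        start = time.perf_counter()
        preprocessed = self.preprocess(frame)
        outputs = self._predict(preprocessed)
        result = self.postprocess(frame, outputs)
        latency = time.perf_counter() - start
        return result, latency


class ScalingEngine(InferenceEngineSketch):
    """Toy engine whose 'model' doubles every input value."""

    def verify(self) -> bool:
        return True

    def preprocess(self, frame: Any) -> Any:
        return [float(v) for v in frame]

    def _predict(self, preprocessed: Any) -> dict:
        return {"doubled": [2.0 * v for v in preprocessed]}

    def postprocess(self, frame: Any, outputs: dict) -> Any:
        return outputs["doubled"]
```

With this layout a new framework only has to supply the four abstract methods; the timing logic in predict is shared.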

class core.infereng.InferenceInfo(device: str, model_id: str)

Bases: object

Inference information class.

This class encapsulates the inference information, which can be encoded and decoded with protobuf. Inference information should include enough information for traceability, observability, etc.

Variables
  • _device (str) – The device type for infer engine.

  • _model_id (str) – The UUID for infer model.

  • _id (str) – The UUID for Infer Engine Info.

  • _queue_topic (str) – The queue topic for infer engine.

  • _input_size (Tuple[int, int]) – The input size tuple required by infer model.

property device: str

The device type for infer engine.

Type

str

property id: str

The UUID for Infer Engine Info.

Type

str

property input_size: Tuple[int, int]

The input size tuple required by infer model.

Type

Tuple[int, int]

property model_id: str

The UUID for infer model.

Type

str

property queue_topic: str

The queue topic for infer engine.

Type

str

core.inferqueue module

An Inferqueue module.

This module provides an object-oriented design for an inference queue client that can connect to an inference queue server, publish frames to it, get frames from it, drop frames in it, check whether a topic is available, and register or unregister a topic.

Classes:

InferQueueClientBase: An abstract base class for inference queue client.

RedisInferQueueClient: A concrete class implementing the InferQueueClientBase for Redis server.

KafkaInferQueueClient: A concrete class implementing the InferQueueClientBase for Kafka server.

class core.inferqueue.InferQueueClientBase

Bases: abc.ABC

An abstract base class for inference queue client.

This class serves as a blueprint for subclasses that need to implement connect, publish_frame, get_frame, drop, infer_queue_available, register_infer_queue, unregister_infer_queue methods for different types of queue server.

Variables

_buffer_len (int) – The buffer length for inference queue.

MAX_RECONNECTION_TIMES = 5

property buffer_len: int

The buffer length for infer queue, default 32.

Type

int

abstract connect(host: str, port: int)

Connect to queue server.

This method is used to connect to the queue server. It attempts to reconnect when a connection error occurs.

Parameters
  • host (str) – The host ip or hostname of queue server.

  • port (int) – The port of queue server.

Raises

NotImplementedError – If the subclasses don’t implement the method.
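
The reconnect behaviour described for connect can be sketched as a bounded retry loop. This is a minimal sketch, not the actual implementation: connect_with_retry and open_connection are hypothetical names, the built-in ConnectionError stands in for the backend-specific exception (for example the Redis or Kafka client errors), and the limit is assumed to count reconnections after the first attempt.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

# Same value as the MAX_RECONNECTION_TIMES class attribute documented above.
MAX_RECONNECTION_TIMES = 5


def connect_with_retry(open_connection: Callable[[], T],
                       max_reconnections: int = MAX_RECONNECTION_TIMES,
                       delay_seconds: float = 0.0) -> T:
    """Call open_connection, retrying on ConnectionError up to the limit."""
    reconnections = 0
    while True:
        try:
            return open_connection()
        except ConnectionError:
            reconnections += 1
            # Give up and re-raise once the reconnection limit is exceeded.
            if reconnections > max_reconnections:
                raise
            time.sleep(delay_seconds)
```

Re-raising the last error keeps the backend's own exception visible to the caller, which matches the way the concrete clients document their Raises sections.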

abstract drop(topic: str) int

Drop the frames that overflow the buffer.

This method is used to drop the frames that overflow the buffer. The length of the buffer can be adjusted.

Parameters

topic (str) – The topic name to drop frames.

Returns

The count of frames dropped.

Return type

int

Raises
  • NotImplementedError – If the subclasses don’t implement the method.

  • ValueError – If the topic is None.

abstract get_frame(topic: str) Optional[bytes]

Get a frame from queue.

This method is used to get a frame from the queue server. It deserializes the Frame after getting it from the queue server.

Parameters

topic (str) – The topic name to get frame.

Returns

The frame retrieved from the topic, or None if the infer queue is currently empty.

Return type

Optional[bytes]

Raises
  • NotImplementedError – If the subclasses don’t implement the method.

  • ValueError – If the topic is None.

abstract infer_queue_available(topic: str) bool

Determine whether the inference queue for the topic is available.

The method is used to determine whether the inference queue for the topic is available or not.

Parameters

topic (str) – The topic name to determine.

Returns

True if the inference queue for the topic is available, False otherwise.

Return type

bool

Raises
  • NotImplementedError – If the subclasses don’t implement the method.

  • ValueError – If the topic is None.

abstract publish_frame(topic: str, frame: bytes) None

Publish a frame to queue server for a topic.

This method is used to publish a frame to the queue server. It serializes the Frame before publishing it to the queue server.

Parameters
  • topic (str) – The topic name to publish to.

  • frame (bytes) – The frame to publish.

Raises
  • NotImplementedError – If the subclasses don’t implement the method.

  • ValueError – If the topic or frame is None.

abstract register_infer_queue(topic: str)

Register the topic in the inference queue.

The method is used to register the topic in the inference queue.

Parameters

topic (str) – The topic name to register.

Raises
  • NotImplementedError – If the subclasses don’t implement the method.

  • ValueError – If the topic is None.

abstract unregister_infer_queue(topic: str)

Unregister the topic in the inference queue.

The method is used to unregister the topic in the inference queue.

Parameters

topic (str) – The topic name to unregister.

Raises
  • NotImplementedError – If the subclasses don’t implement the method.

  • ValueError – If the topic is None.
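
The contract of the queue client methods above can be illustrated with an in-memory stand-in. This is a sketch under stated assumptions, not one of the shipped clients: InMemoryInferQueueClient is a hypothetical class, drop is assumed to discard the oldest frames until the queue fits within buffer_len, and publishing to an unregistered topic is assumed to create it.

```python
from collections import deque
from typing import Deque, Dict, Optional


class InMemoryInferQueueClient:
    """Illustrative in-memory stand-in for an inference queue client."""

    def __init__(self, buffer_len: int = 32):
        self._buffer_len = buffer_len
        self._queues: Dict[str, Deque[bytes]] = {}

    @property
    def buffer_len(self) -> int:
        return self._buffer_len

    @staticmethod
    def _check(value: object, name: str) -> None:
        if value is None:
            raise ValueError(f"{name} must not be None")

    def register_infer_queue(self, topic: str) -> None:
        self._check(topic, "topic")
        self._queues.setdefault(topic, deque())

    def unregister_infer_queue(self, topic: str) -> None:
        self._check(topic, "topic")
        self._queues.pop(topic, None)

    def infer_queue_available(self, topic: str) -> bool:
        self._check(topic, "topic")
        return topic in self._queues

    def publish_frame(self, topic: str, frame: bytes) -> None:
        self._check(topic, "topic")
        self._check(frame, "frame")
        # Assumption: publishing to an unknown topic creates its queue.
        self._queues.setdefault(topic, deque()).append(frame)

    def get_frame(self, topic: str) -> Optional[bytes]:
        self._check(topic, "topic")
        queue = self._queues.get(topic)
        # An empty or missing queue yields None.
        return queue.popleft() if queue else None

    def drop(self, topic: str) -> int:
        self._check(topic, "topic")
        queue = self._queues.get(topic, deque())
        dropped = 0
        # Assumption: the oldest frames are the ones that overflow.
        while len(queue) > self._buffer_len:
            queue.popleft()
            dropped += 1
        return dropped
```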

class core.inferqueue.KafkaInferQueueClient

Bases: core.inferqueue.InferQueueClientBase

Kafka implementation for inference queue client.

This class implements the connect, publish_frame, get_frame, drop, infer_queue_available, register_infer_queue and unregister_infer_queue methods defined in the InferQueueClientBase abstract base class for the Kafka queue server.

Variables

_conn (KafkaProducer) – The Kafka producer connection object.

connect(host='127.0.0.1', port=9092)

The Kafka queue client implementation for connect method.

The method overrides the connect method defined in the InferQueueClientBase abstract base class. The main difference is the exception it raises.

Raises

kafka.errors.NoBrokersAvailable – If connection to the Kafka server fails and reconnection exceeds the limit.

drop(topic: str) int

See base class.

get_frame(topic: str) Optional[bytes]

See base class.

infer_queue_available(topic: str) bool

See base class.

publish_frame(topic: str, frame: bytes) None

See base class.

register_infer_queue(topic: str)

See base class.

unregister_infer_queue(topic: str)

See base class.

class core.inferqueue.RedisInferQueueClient

Bases: core.inferqueue.InferQueueClientBase

Redis implementation for inference queue client.

This class implements the connect, publish_frame, get_frame, drop, infer_queue_available, register_infer_queue and unregister_infer_queue methods defined in the InferQueueClientBase abstract base class for the Redis queue server.

Variables

_conn (redis.Redis) – The Redis connection object.

connect(host: str = '127.0.0.1', port: int = 6379)

The Redis queue client implementation for connect method.

The method overrides the connect method defined in the InferQueueClientBase abstract base class. The main difference is the exception it raises.

Raises

redis.exceptions.ConnectionError – If connection to the Redis server fails and reconnection exceeds the limit.

drop(topic: str) int

See base class.

get_frame(topic: str) Optional[bytes]

See base class.

infer_queue_available(topic: str) bool

See base class.

publish_frame(topic: str, frame: bytes) None

See base class.

register_infer_queue(topic: str)

See base class.

unregister_infer_queue(topic: str)

See base class.

core.metrics module

A Metrics module.

This module provides a class to manage metrics in a Prometheus-based application. It provides methods to create, track, and update metrics.

Classes:

MetricType: An enum for the type of metric.

ErrorMessage: An enum for error messages.

MetricsManager: A class to manage metrics in a Prometheus-based application.

class core.metrics.ErrorMessage(value)

Bases: enum.Enum

An enum for error messages.

This enum defines the error messages that can be raised.

INVALID_METRIC_TYPE = 'Invalid metric type: {}'
METRIC_DOES_NOT_EXIST = 'Metric {} does not exist'
METRIC_EXISTS = 'Metric {} already exists'
METRIC_TYPE_MISMATCH = 'Metric {} is not a {}'

class core.metrics.MetricType(value)

Bases: enum.Enum

An enum for the type of metric.

This enum defines the types of metrics that can be created.

COUNTER = 'Counter'
GAUGE = 'Gauge'
HISTOGRAM = 'Histogram'
SUMMARY = 'Summary'

class core.metrics.MetricsManager(*args, **kwargs)

Bases: object

A class to manage metrics in a Prometheus-based application.

This class provides methods to create, track, and update metrics.

Variables

metrics (dict) – A dictionary of all metrics.

create_metric(metric_type: core.metrics.MetricType, metric_name: str, description: str) None

Creates a new metric.

This method creates a new metric of the specified type and stores it in the metrics dictionary.

Parameters
  • metric_type (MetricType) – The type of the metric.

  • metric_name (str) – The name of the metric. Must be unique across all metrics.

  • description (str) – The description of the metric.

Raises

ValueError – If the specified metric already exists or the metric type is invalid.

get_metric(metric_name: str, metric_type: core.metrics.MetricType) Union[prometheus_client.metrics.Counter, prometheus_client.metrics.Gauge, prometheus_client.metrics.Histogram, prometheus_client.metrics.Summary]

Returns the metric with the specified name and type.

Parameters
  • metric_name (str) – The name of the metric.

  • metric_type (MetricType) – The type of the metric.

Returns

The metric with the specified name and type.

Return type

Union[Counter, Gauge, Histogram, Summary]

Raises

ValueError – If the specified metric does not exist or the metric type is invalid.
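
The error handling documented for create_metric and get_metric can be sketched with the two enums above. This is an illustrative registry only: MetricsRegistrySketch is a hypothetical class that stores plain dictionaries instead of prometheus_client objects, the enums are reduced copies of the documented ones, and the choice of which message goes with which failure is an assumption based on the message text.

```python
from enum import Enum
from typing import Dict


class MetricType(Enum):
    COUNTER = 'Counter'
    GAUGE = 'Gauge'


class ErrorMessage(Enum):
    INVALID_METRIC_TYPE = 'Invalid metric type: {}'
    METRIC_DOES_NOT_EXIST = 'Metric {} does not exist'
    METRIC_EXISTS = 'Metric {} already exists'
    METRIC_TYPE_MISMATCH = 'Metric {} is not a {}'


class MetricsRegistrySketch:
    """Illustrative registry; real metrics would be prometheus_client objects."""

    def __init__(self) -> None:
        self.metrics: Dict[str, dict] = {}

    def create_metric(self, metric_type: MetricType, metric_name: str,
                      description: str) -> None:
        if not isinstance(metric_type, MetricType):
            raise ValueError(
                ErrorMessage.INVALID_METRIC_TYPE.value.format(metric_type))
        if metric_name in self.metrics:
            raise ValueError(
                ErrorMessage.METRIC_EXISTS.value.format(metric_name))
        # A plain dict stands in for the Prometheus metric object.
        self.metrics[metric_name] = {"type": metric_type,
                                     "description": description}

    def get_metric(self, metric_name: str, metric_type: MetricType) -> dict:
        if metric_name not in self.metrics:
            raise ValueError(
                ErrorMessage.METRIC_DOES_NOT_EXIST.value.format(metric_name))
        metric = self.metrics[metric_name]
        if metric["type"] is not metric_type:
            raise ValueError(ErrorMessage.METRIC_TYPE_MISMATCH.value.format(
                metric_name, metric_type.value))
        return metric
```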

get_metrics() Dict[str, Union[prometheus_client.metrics.Counter, prometheus_client.metrics.Gauge, prometheus_client.metrics.Histogram, prometheus_client.metrics.Summary]]

Returns the dictionary of all metrics.

Returns

The dictionary of all metrics.

Return type

dict

increment_counter(metric_name: str, by: float = 1.0) None

Increments a Counter metric by the specified amount.

Parameters
  • metric_name (str) – The name of the Counter metric to increment.

  • by (float, optional) – The amount to increment by. Default is 1.0.

Raises

ValueError – If the specified metric does not exist or is not a Counter.

latency_decorator(metric_name: str) Callable[[...], Callable[[...], Any]]

Returns a decorator that tracks the latency of the decorated function.

Parameters

metric_name (str) – The name of the Histogram metric to track the latency.

Returns

A decorator that tracks the latency of the decorated function.

Return type

Callable[…, Callable[…, Any]]

Raises

ValueError – If the specified metric does not exist or is not a Histogram.
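
A decorator factory of this kind can be sketched as follows. This is a minimal sketch, not the MetricsManager implementation: LatencyTrackerSketch is a hypothetical class that records latencies in a plain list instead of a Prometheus Histogram, and it assumes the ValueError is raised when the decorator is created rather than when the decorated function runs.

```python
import functools
import time
from typing import Any, Callable, Dict, List


class LatencyTrackerSketch:
    """Illustrative stand-in that records latencies in plain lists."""

    def __init__(self) -> None:
        # Maps a metric name to the observed latencies in seconds.
        self.histograms: Dict[str, List[float]] = {}

    def create_histogram(self, metric_name: str) -> None:
        if metric_name in self.histograms:
            raise ValueError(f"Metric {metric_name} already exists")
        self.histograms[metric_name] = []

    def latency_decorator(
        self, metric_name: str
    ) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
        # Assumption: a missing metric is rejected at decoration time.
        if metric_name not in self.histograms:
            raise ValueError(f"Metric {metric_name} does not exist")

        def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
            @functools.wraps(func)
            def wrapper(*args: Any, **kwargs: Any) -> Any:
                start = time.perf_counter()
                try:
                    return func(*args, **kwargs)
                finally:
                    # Record the latency even if the function raises.
                    elapsed = time.perf_counter() - start
                    self.histograms[metric_name].append(elapsed)
            return wrapper

        return decorator
```

Usage follows the normal decorator pattern: create the metric first, then apply @tracker.latency_decorator("metric_name") to the function to be timed.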

observe_histogram(metric_name: str, value: float) None

Observes a value in a Histogram metric.

Parameters
  • metric_name (str) – The name of the Histogram metric to observe a value in.

  • value (float) – The value to observe.

Raises

ValueError – If the specified metric does not exist or is not a Histogram.

observe_summary(metric_name: str, value: float) None

Observes a value in a Summary metric.

Parameters
  • metric_name (str) – The name of the Summary metric to observe a value in.

  • value (float) – The value to observe.

Raises

ValueError – If the specified metric does not exist or is not a Summary.

set_gauge(metric_name: str, value: float) None

Sets a Gauge metric to the specified value.

Parameters
  • metric_name (str) – The name of the Gauge metric to set.

  • value (float) – The value to set the Gauge to.

Raises

ValueError – If the specified metric does not exist or is not a Gauge.

core.model module

A Model module.

This module contains the definition of the ModelInfo class, which encapsulates all relevant information about an Inference model in a machine learning pipeline.

In addition, ModelInfo instances can be easily serialized to dictionaries for further processing or storage, by using Python’s built-in dict() function.
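
One way a class can support dict() is to implement __iter__ so that it yields key/value pairs. Whether ModelInfo uses this mechanism is an assumption; the classes and field selection below are hypothetical and only illustrate the technique.

```python
from typing import Any, Iterator, Tuple


class ModelDetailsSketch:
    """Illustrative stand-in holding two of the documented detail fields."""

    def __init__(self, name: str, version: str):
        self._name = name
        self._version = version

    def __iter__(self) -> Iterator[Tuple[str, Any]]:
        # dict() consumes an iterable of (key, value) pairs.
        yield "name", self._name
        yield "version", self._version


class ModelInfoSketch:
    """Illustrative stand-in that nests the details as a dictionary."""

    def __init__(self, model_id: str, details: ModelDetailsSketch):
        self._id = model_id
        self._details = details

    def __iter__(self) -> Iterator[Tuple[str, Any]]:
        yield "id", self._id
        # Nested objects are converted with dict() as well.
        yield "details", dict(self._details)
```

Calling dict(ModelInfoSketch("abc", ModelDetailsSketch("resnet", "1"))) then produces a nested dictionary that is suitable for storage in a table-oriented backend.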

Classes:

ModelMetrics: A class that encapsulates the performance metrics of the inference model.

ModelDetails: A class that encapsulates the details of the inference model.

ModelInfo: A class for managing information about an Inference model.

Model: A class for managing an Inference model.

class core.model.Model(model_provider: str, model_info_url: str, model_id: Optional[str] = None)

Bases: object

A class for managing an Inference model.

This class defines the information of the inference model, including model information and model binary.

Variables
  • _model_info (ModelInfo) – The information of the model.

  • _model_binary (bytes) – The binary of the model.

property model_binary: bytes

The binary of the model.

Type

bytes

property model_info: core.model.ModelInfo

The information of the model.

Type

ModelInfo

class core.model.ModelDetails(name: str, version: str, framework: str, target: str, dtype: str)

Bases: object

A class that encapsulates the details of the inference model.

This class defines the details of the inference model, including name, version, framework, target and data type.

Variables
  • _name (str) – The name of the model.

  • _version (str) – The version of the model.

  • _framework (str) – The framework used to train the model.

  • _target (str) – The target of the model.

  • _dtype (str) – The data type of the model.

property dtype: str

The data type of the model.

Type

str

property framework: str

The framework used to train the model.

Type

str

property name: str

The name of the model.

Type

str

property target: str

The target of the model.

Type

str

property version: str

The version of the model.

Type

str

class core.model.ModelInfo(model_id: str, path: str, details: core.model.ModelDetails, uploaded_date: datetime.datetime, metrics: core.model.ModelMetrics)

Bases: object

A class for managing information about an Inference model.

This class defines the information of the inference model, including ID, details, uploaded date and performance metrics.

Variables
  • _id (str) – The ID of the model.

  • _details (ModelDetails) – The details of the model.

  • _uploaded_date (datetime) – The uploaded date of the model.

  • _metrics (ModelMetrics) – The performance metrics of the model.

property details: core.model.ModelDetails

The details of the model.

Type

ModelDetails

property id: str

The ID of the model (string of UUID).

Type

str

property metrics: core.model.ModelMetrics

The performance metrics of the model.

Type

ModelMetrics

property path: str

The path of the model.

Type

str

property uploaded_date: datetime.datetime

The uploaded date of the model.

Type

datetime

class core.model.ModelMetrics(accuracy: float, precision: float, recall: float, f1_score: float, loss: float)

Bases: object

A class that encapsulates the performance metrics of the inference model.

This class defines the performance metrics of the inference model, including accuracy, precision, recall, F1 score and loss value.

Variables
  • _accuracy (float) – The accuracy of the model.

  • _precision (float) – The precision of the model.

  • _recall (float) – The recall of the model.

  • _f1_score (float) – The F1 score of the model.

  • _loss (float) – The loss value of the model.

property accuracy: float

The accuracy of the model.

Type

float

property f1_score: float

The F1 score of the model.

Type

float

property loss: float

The loss value of the model.

Type

float

property precision: float

The precision of the model.

Type

float

property recall: float

The recall of the model.

Type

float

core.pipeline module

A Pipeline module.

This module contains the definition of the Pipeline class and PipelineManager class.

Classes:

Pipeline: A class that encapsulates the pipeline information.

PipelineManager: A class that manages the pipeline with runtime database.

class core.pipeline.Pipeline(provider: core.stream.StreamProvider, infer_engine_info: core.infereng.InferenceInfo)

Bases: object

A class that encapsulates the pipeline information.

This class defines the pipeline information, including the pipeline ID, stream provider and inference engine information.

Variables
  • _id (str) – The pipeline ID.

  • _provider (StreamProvider) – The stream provider of pipeline.

  • _infer_engine_dict (dict) – The dict of inference engine information for the pipeline.

property id: str

The pipeline ID (string of UUID).

Type

str

class core.pipeline.PipelineManager(db: core.rtdb.RuntimeDatabaseBase)

Bases: object

A class that manages the pipeline with runtime database.

This class manages the pipeline with the runtime database and provides the register_pipeline and unregister_pipeline methods.

Variables

_db (RuntimeDatabaseBase) – The runtime database used by pipeline manager.

PIPELINE_TABLE = 'Pipeline-table'

clean_infer_engine(infer_info_id: str) None

Clean inference engine in infer_engine_dict.

Parameters

infer_info_id (str) – The ID of the inference info to clean from infer_engine_dict.

Raises

ValueError – Propagates the ValueError raised by get_all_table_objects_dict or save_table_object_dict if some cases are met.

register_pipeline(pipeline_obj: core.pipeline.Pipeline) None

Register a new pipeline.

Parameters

pipeline_obj (Pipeline) – The pipeline object to register.

Raises

ValueError – Propagates the ValueError raised by save_table_object_dict if some cases are met.

set_infer_fps(pipeline_id: str, infer_info: core.infereng.InferenceInfo, infer_fps: int) None

Set inference fps for a pipeline.

Parameters
  • pipeline_id (str) – The id of pipeline to set inference fps.

  • infer_info (InferenceInfo) – The inference info to set inference fps.

  • infer_fps (int) – The inference fps to set.

Raises

ValueError – Propagates the ValueError raised by get_table_object_dict or save_table_object_dict if some cases are met.

unregister_pipeline(pipeline_id: str) None

Unregister an existing pipeline.

Parameters

pipeline_id (str) – The id of pipeline to unregister.

Raises

ValueError – Propagates the ValueError raised by del_table_object if some cases are met.
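
The way the manager delegates to the runtime database, and why ValueError propagates from it, can be sketched with an in-memory stand-in. This is an illustration under stated assumptions: InMemoryRuntimeDB and PipelineManagerSketch are hypothetical, the sketch registers a pipeline from an ID and a dictionary rather than from a Pipeline object, and the database is assumed to reject None arguments with ValueError.

```python
from typing import Dict


class InMemoryRuntimeDB:
    """Illustrative dict-backed stand-in for a runtime database."""

    def __init__(self) -> None:
        self._tables: Dict[str, Dict[str, dict]] = {}

    def save_table_object_dict(self, table: str, obj: str, d: dict) -> None:
        if table is None or obj is None or d is None:
            raise ValueError("table, obj and d must not be None")
        self._tables.setdefault(table, {})[obj] = dict(d)

    def check_table_object_exist(self, table: str, obj: str) -> bool:
        if table is None or obj is None:
            raise ValueError("table and obj must not be None")
        return obj in self._tables.get(table, {})

    def del_table_object(self, table: str, obj: str) -> None:
        if table is None or obj is None:
            raise ValueError("table and obj must not be None")
        self._tables.get(table, {}).pop(obj, None)


class PipelineManagerSketch:
    """Illustrative manager that stores pipelines in a single table."""

    PIPELINE_TABLE = 'Pipeline-table'

    def __init__(self, db: InMemoryRuntimeDB):
        self._db = db

    def register_pipeline(self, pipeline_id: str, pipeline_dict: dict) -> None:
        # Any ValueError from the database propagates to the caller.
        self._db.save_table_object_dict(
            self.PIPELINE_TABLE, pipeline_id, pipeline_dict)

    def unregister_pipeline(self, pipeline_id: str) -> None:
        self._db.del_table_object(self.PIPELINE_TABLE, pipeline_id)
```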

core.crypto.qat module

A QAT module.

This module provides a class that uses Intel QAT to accelerate compression/decompression of frames. It provides methods to initialize, set up, tear down and close a QATZip session, as well as compression and decompression using QAT.

Classes:

QATZip: A class that uses QAT to accelerate frame compression and decompression.

class core.crypto.qat.QATZip(direction: int = 2, comp_lvl: int = 1, algorithm: int = 8)

Bases: core.crypto.zip.ZipBase

A class that uses QAT to accelerate frame compression and decompression.

Variables
  • _qziplib (CDLL) – The QATZip library.

  • _direction (int) – The QATZip working direction (compress, decompress, or both).

  • _comp_lvl (int) – The QATZip compression level.

  • _algorithm (int) – The QATZip Compression/decompression algorithm.

  • _session (QzSession) – The QATZip Session.

  • _common_session (QzSessionParamsCommon) – The QATZip session with common parameters.

  • _deflate_params (QzSessionParamsDeflate) – The QATZip session with deflate parameters.

ALGO_DEFLATE = 8
DIR_BOTH = 2
DIR_COMPRESS = 0
DIR_DECOMPRESS = 1
EXPANSION_RATIO = [5, 20, 50, 100]
LAST = 1
LEVEL_BEST = 9
LEVEL_FAST = 1
SW_BACKUP = 1

property algorithm: int

The QATZip compression/decompression algorithm.

Type

int

close_session() None

Terminates a QATZip session.

This method closes the connection with QAT.

Raises

RuntimeError – If any error occurs while closing the session.

property comp_lvl: int

The QATZip compression level.

Type

int

compress(src: bytes) bytes

See base class.

decompress(src: bytes) bytes

See base class.

property direction: int

Compress or decompress or both.

Type

int

init_session() None

Initialize QAT hardware.

Raises

RuntimeError – If any error occurs during the initialization of QAT.

set_session() None

Initialize a QATZip session with deflate parameters.

Raises

RuntimeError – If any error occurs during the session set up.

teardown_session() None

Uninitialize a QATZip session.

This method disconnects a session from a hardware instance and deallocates buffers. If no session has been initialized, then no action will take place.

Raises

RuntimeError – If any error occurs during the session teardown.
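
The session lifecycle above can be illustrated with a software-only stand-in. This sketch does not call QAT or the QATZip library: SoftwareZipSketch is a hypothetical class that uses Python's zlib module (a deflate implementation) in place of the hardware, its output format may differ from what QATZip produces, and the call order init_session, set_session, compress or decompress, teardown_session, close_session is an assumption drawn from the method descriptions.

```python
import zlib


class SoftwareZipSketch:
    """Illustrative software stand-in for the QATZip session lifecycle."""

    LEVEL_FAST = 1
    LEVEL_BEST = 9

    def __init__(self, comp_lvl: int = LEVEL_FAST):
        self._comp_lvl = comp_lvl
        self._initialized = False
        self._session_ready = False

    def init_session(self) -> None:
        # The real class initializes QAT hardware here.
        self._initialized = True

    def set_session(self) -> None:
        if not self._initialized:
            raise RuntimeError("init_session must be called first")
        self._session_ready = True

    def compress(self, src: bytes) -> bytes:
        if not self._session_ready:
            raise RuntimeError("session is not set up")
        return zlib.compress(src, self._comp_lvl)

    def decompress(self, src: bytes) -> bytes:
        if not self._session_ready:
            raise RuntimeError("session is not set up")
        return zlib.decompress(src)

    def teardown_session(self) -> None:
        # No action if no session has been set up.
        self._session_ready = False

    def close_session(self) -> None:
        self._initialized = False
```

Keeping setup and teardown as separate steps mirrors the documented API, where a session can be torn down and set up again without closing the connection to the device.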

core.crypto.qat_params module

core.rtdb module

A RuntimeDatabase module.

This module provides an object-oriented design for a runtime database that caches runtime information and uses in-memory database services (like Redis) as the backend service.

Classes:

RuntimeDatabaseBase: An abstract base class to manage runtime database tables.

RedisDB: A concrete class implementing the RuntimeDatabaseBase using Redis as backend service.

class core.rtdb.RedisDB

Bases: core.rtdb.RuntimeDatabaseBase

Redis backend implementation for runtime database.

This class implements the connect, save_table_object_dict, get_table_object_dict, get_all_table_objects_dict, check_table_object_exist and del_table_object methods defined in the RuntimeDatabaseBase abstract base class for the Redis in-memory database backend.

Variables
  • _conn (redis.Redis) – The Redis connection object.

  • _lock (Lock) – The lock for Redis connection.

check_table_object_exist(table: str, obj: str) bool

See base class.

connect(host: str = '127.0.0.1', port: int = 6379, db: int = 0) None

The connect method for the runtime database of Redis backend implementation.

The method overrides the connect method defined in the RuntimeDatabaseBase abstract base class. The main difference is the exception it raises.

Raises

redis.exceptions.ConnectionError – If connection to the Redis server fails and reconnection exceeds the limit.

del_table_object(table: str, obj: str) None

See base class.

get_all_table_objects_dict(table: str) dict

See base class.

get_table_object_dict(table: str, obj: str) dict

See base class.

lock(name)

Lock for the Redis connection.

save_table_object_dict(table: str, obj: str, d: dict) None

See base class.

unlock()

Unlock for the Redis connection.

class core.rtdb.RuntimeDatabaseBase

Bases: abc.ABC

An abstract base class to manage runtime database tables.

This class serves as a blueprint for subclasses that need to implement the connect, save_table_object_dict, get_table_object_dict, get_all_table_objects_dict, check_table_object_exist, and del_table_object methods for different types of in-memory database services.

MAX_RECONNECTION_TIMES = 5
abstract check_table_object_exist(table: str, obj: str) bool

Check whether a given object exists in a given table.

This method is used to check whether a given object exists in a given table.

Parameters
  • table (str) – The name of the table.

  • obj (str) – The name of the object.

Returns

True if a given object exists in a given table, otherwise False.

Return type

bool

Raises
  • NotImplementedError – If the subclasses don’t implement the method.

  • ValueError – If table or obj is None.

abstract connect(host: str, port: int)

Connect to runtime database.

This method is used to connect to the runtime database. It will attempt to reconnect when a connection error occurs.

Parameters
  • host (str) – The host ip or hostname of runtime database.

  • port (int) – The port of runtime database.

Raises

NotImplementedError – If the subclasses don’t implement the method.

abstract del_table_object(table: str, obj: str) None

Delete an object from a given table.

This method is used to delete an object from a given table.

Parameters
  • table (str) – The name of the table.

  • obj (str) – The name of the object.

Raises
  • NotImplementedError – If the subclasses don’t implement the method.

  • ValueError – If table or obj is None.

abstract get_all_table_objects_dict(table: str) dict

Get all dict values from a table.

This method is used to get all dict values from a table.

Parameters

table (str) – The name of the table.

Returns

All dict values retrieved by the table name.

Return type

dict

Raises
  • NotImplementedError – If the subclasses don’t implement the method.

  • ValueError – If table is None.

abstract get_table_object_dict(table: str, obj: str) dict

Get a dict value for an object from a table.

This method is used to get a dict value for an object from a table.

Parameters
  • table (str) – The name of the table.

  • obj (str) – The name of the object.

Returns

The dict value retrieved by table name and object name.

Return type

dict

Raises
  • NotImplementedError – If the subclasses don’t implement the method.

  • ValueError – If table or obj is None.

abstract save_table_object_dict(table: str, obj: str, d: dict) None

Save a dict value for an object in a table.

This method is used to save a dict value for an object in a table.

Parameters
  • table (str) – The name of the table.

  • obj (str) – The name of the object.

  • d (dict) – The value to be saved.

Raises
  • NotImplementedError – If the subclasses don’t implement the method.

  • ValueError – If table or obj is None.
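To make the abstract interface above concrete, here is a minimal sketch of a dict-backed backend such as one might use in unit tests. Everything in it is an assumption for illustration: the RuntimeDatabaseBase shown is a stand-in that only mirrors the documented method names (the real class lives in core.rtdb), InMemoryDB is hypothetical, and returning an empty dict for a missing object is a choice made here, not documented behaviour.

```python
from abc import ABC, abstractmethod
from typing import Dict


class RuntimeDatabaseBase(ABC):
    """Stand-in mirroring the documented abstract methods (not the real class)."""

    @abstractmethod
    def connect(self, host: str, port: int) -> None: ...

    @abstractmethod
    def save_table_object_dict(self, table: str, obj: str, d: dict) -> None: ...

    @abstractmethod
    def get_table_object_dict(self, table: str, obj: str) -> dict: ...

    @abstractmethod
    def get_all_table_objects_dict(self, table: str) -> dict: ...

    @abstractmethod
    def check_table_object_exist(self, table: str, obj: str) -> bool: ...

    @abstractmethod
    def del_table_object(self, table: str, obj: str) -> None: ...


class InMemoryDB(RuntimeDatabaseBase):
    """Hypothetical backend that keeps every table in a nested dict."""

    def __init__(self) -> None:
        self._tables: Dict[str, Dict[str, dict]] = {}

    @staticmethod
    def _require(*values) -> None:
        # The documented contract raises ValueError for None arguments.
        if any(v is None for v in values):
            raise ValueError("table and obj must not be None")

    def connect(self, host: str = "", port: int = 0) -> None:
        pass  # nothing to connect to for an in-process store

    def save_table_object_dict(self, table: str, obj: str, d: dict) -> None:
        self._require(table, obj)
        self._tables.setdefault(table, {})[obj] = dict(d)

    def get_table_object_dict(self, table: str, obj: str) -> dict:
        self._require(table, obj)
        # Assumption: a missing object yields an empty dict.
        return dict(self._tables.get(table, {}).get(obj, {}))

    def get_all_table_objects_dict(self, table: str) -> dict:
        self._require(table)
        return {k: dict(v) for k, v in self._tables.get(table, {}).items()}

    def check_table_object_exist(self, table: str, obj: str) -> bool:
        self._require(table, obj)
        return obj in self._tables.get(table, {})

    def del_table_object(self, table: str, obj: str) -> None:
        self._require(table, obj)
        self._tables.get(table, {}).pop(obj, None)
```

Copying dicts on the way in and out keeps callers from mutating stored state by accident, which loosely matches how a networked backend such as Redis would behave.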

core.stream module

A Stream module.

This module provides an object-oriented design for Stream provider to provide stream input.

Classes:

StreamProvider: An abstract base class for creating custom stream provider implementations.

CameraSource: A concrete class implementing the StreamProvider for camera source.

FileSource: A concrete class implementing the StreamProvider for file source.

Functions:

create_stream_from_type: Create StreamProvider instance according to type.

class core.stream.CameraSource(name: str, pathname: str = '/dev/video0')

Bases: core.stream.StreamProvider

A concrete class implementing the StreamProvider for camera source.

This class implements the verify, read_raw_frame, open, and close methods defined in the StreamProvider abstract base class for providing stream input from a camera.

Variables

_device_obj (cv2.VideoCapture) – The OpenCV VideoCapture object.

close() None

See base class.

open()

Open the stream for camera source stream provider.

The method overrides the open method defined in the StreamProvider abstract base class. If the device node is not valid or the camera fails to open, the method raises ValueError, FileNotFoundError, or IOError.

Raises
  • ValueError – If the dev_num obtained from the path name is not valid.

  • FileNotFoundError – If the path name for camera is not found.

  • IOError – If failed to open the camera.

read_raw_frame() numpy.ndarray

See base class.

verify() bool

See base class.

class core.stream.FileSource(name: str, pathname: str = 'classroom.mp4')

Bases: core.stream.StreamProvider

A concrete class implementing the StreamProvider for file source.

This class implements the verify, read_raw_frame, open, and close methods defined in the StreamProvider abstract base class for providing stream input from a video file.

Variables
  • _file_db (FileDatabase) – The file database of file source stream provider.

  • _file_object (cv2.VideoCapture) – The OpenCV VideoCapture object.

  • _file_path (str) – The video file path of file source stream provider.

  • _frame_counter (int) – The count of frames that have been read.

  • _max_frame (int) – The max frame count of file source stream provider.

DEFAULT_TARGET_FPS = 25
close() None

See base class.

property file_db: core.filedb.FileDatabase

The file database of file source stream provider.

Type

FileDatabase

open()

Open the stream for file source stream provider.

The method overrides the open method defined in the StreamProvider abstract base class. If the file path is not valid or the file fails to open, the method raises FileNotFoundError or TypeError.

Raises
  • FileNotFoundError – If the path name for video file is not found.

  • TypeError – If failed to create the VideoCapture object.

read_raw_frame() numpy.ndarray

See base class.

property target_fps: int

The target FPS of file source stream provider.

Type

int

verify() bool

See base class.

class core.stream.StreamProcessor(provider: core.stream.StreamProvider)

Bases: object

The class to process stream.

Variables

_provider (StreamProvider) – The stream provider to process.

prepare() None

Prepare the stream processor.

This function is used to prepare the stream processor, like verifying stream provider or registering the pipeline.

property provider: core.stream.StreamProvider

The provider instance.

Type

StreamProvider

class core.stream.StreamProvider(name: str, pathname: str)

Bases: abc.ABC

An abstract base class for creating custom stream provider implementations.

The class serves as a blueprint for subclasses that need to implement verify, read_raw_frame, open, close methods for different types of stream providers.

Variables
  • _name (str) – The name of stream provider.

  • _pathname (str) – The path name of stream provider.

  • _raw_size (Tuple[int, int]) – The raw size (width, height) of frame captured from stream.

  • _raw_fps (int) – The raw FPS from source.

  • _seq (int) – The sequence of stream provider.

  • _target_fps (int) – The target FPS.

DEFAULT_FPS = 15
DEFAULT_HEIGHT = 240
DEFAULT_WIDTH = 320
abstract close() None

Close the stream.

The method is used to close the stream.

Raises

NotImplementedError – If the subclasses don’t implement the method.

property name: str

The name of stream provider.

Type

str

abstract open() None

Open the stream.

The method is used to open the stream.

Raises

NotImplementedError – If the subclasses don’t implement the method.

property pathname: str

The path name of stream provider.

For example:
  • Camera: “/dev/video0”

  • File: “SampleVideo.mp4”

Type

str

property raw_fps: int

The raw FPS from source.

Type

int

property raw_size: Tuple[int, int]

The raw size (width, height) of frame captured from stream.

Type

Tuple[int, int]

abstract read_raw_frame() numpy.ndarray

Get a frame from source.

This method is used to get a frame from source.

Returns

A numpy.ndarray object representing the raw frame.

Return type

numpy.ndarray

Raises

NotImplementedError – If the subclasses don’t implement the method.

property target_fps: int

The target FPS.

Type

int

abstract verify() bool

Verify the provider’s measurement/quote/integrity.

This method is used to verify the provider’s measurement/quote/integrity.

Returns

True if the verification succeeds, False otherwise.

Return type

bool

Raises

NotImplementedError – If the subclasses don’t implement the method.
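Because open and close are documented as a pair, a caller may want to guarantee that close runs even when reading a frame fails. The sketch below shows one way to do that with a context manager. It is not part of core.stream: opened and DummyProvider are hypothetical names, and the dummy only records calls instead of returning a real numpy.ndarray.

```python
from contextlib import contextmanager


@contextmanager
def opened(provider):
    """Open the provider, yield it, and always close it afterwards."""
    provider.open()
    try:
        yield provider
    finally:
        provider.close()


class DummyProvider:
    """Hypothetical stand-in that records lifecycle calls for illustration."""

    def __init__(self) -> None:
        self.calls = []

    def open(self) -> None:
        self.calls.append("open")

    def close(self) -> None:
        self.calls.append("close")

    def read_raw_frame(self):
        self.calls.append("read")
        return None  # a real provider would return a numpy.ndarray
```

Any object exposing open and close in the documented sense, such as a CameraSource or FileSource instance, could be passed to the helper in the same way.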

core.stream.create_stream_from_type(type_name: str, name: str, pathname: str) core.stream.StreamProvider

Create StreamProvider instance according to type.

The method is used to create StreamProvider instance according to type.

Parameters
  • type_name (str) – The type of stream provider to create.

  • name (str) – The stream name of stream provider to create.

  • pathname (str) – The path name of stream provider to create.

Returns

The created stream provider.

Return type

StreamProvider

Raises

ValueError – If the provider source type is invalid.
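A factory of this shape is commonly written as a lookup from type name to class. The sketch below illustrates the idea under assumptions: the classes are simplified stand-ins for those in core.stream, and the registry keys "camera" and "file" are hypothetical because this reference does not list the accepted type names.

```python
class StreamProvider:
    """Stand-in for core.stream.StreamProvider (illustration only)."""

    def __init__(self, name: str, pathname: str) -> None:
        self.name = name
        self.pathname = pathname


class CameraSource(StreamProvider):
    """Stand-in for the camera provider."""


class FileSource(StreamProvider):
    """Stand-in for the file provider."""


# Hypothetical registry: the real type names are not given in this reference.
_PROVIDERS = {"camera": CameraSource, "file": FileSource}


def create_stream_from_type(type_name: str, name: str,
                            pathname: str) -> StreamProvider:
    """Return a provider instance for type_name, or raise ValueError."""
    try:
        provider_cls = _PROVIDERS[type_name]
    except KeyError:
        raise ValueError(f"Invalid provider source type: {type_name!r}") from None
    return provider_cls(name, pathname)
```

With a registry like this, supporting a new source type only requires adding one entry to the mapping.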

core.streambroker module

A Streambroker module.

This module provides an object-oriented design for a stream broker client that connects to a stream broker server and publishes frames to it.

Classes:

StreamBrokerClientBase: An abstract base class for stream broker client.

RedisStreamBrokerClient: A concrete class implementing the StreamBrokerClientBase for Redis server.

KafkaStreamBrokerClient: A concrete class implementing the StreamBrokerClientBase for Kafka server.

class core.streambroker.KafkaStreamBrokerClient

Bases: core.streambroker.StreamBrokerClientBase

Kafka implementation for stream broker client.

This class implements the connect and publish_frame methods defined in the StreamBrokerClientBase abstract base class for the Kafka stream broker.

Variables

_conn (KafkaProducer) – The Kafka connection object.

connect(host='127.0.0.1', port=9092)

Implement the connect method for the Kafka stream broker client.

The method overrides the connect method defined in the StreamBrokerClientBase abstract base class. If connection to the Kafka server fails and reconnection exceeds the limit, the method raises a NoBrokersAvailable error.

Raises

kafka.errors.NoBrokersAvailable – If connection to the Kafka server fails and reconnection exceeds the limit.

publish_frame(topic: str, frame: core.frame.Frame) None

See base class.

class core.streambroker.RedisStreamBrokerClient

Bases: core.streambroker.StreamBrokerClientBase

Redis implementation for stream broker client.

This class implements the connect and publish_frame methods defined in the StreamBrokerClientBase abstract base class for the Redis stream broker.

Variables

_conn (redis.Redis) – The Redis connection object.

connect(host: str = '127.0.0.1', port: int = 6379)

Implement the connect method for the Redis stream broker client.

The method overrides the connect method defined in the StreamBrokerClientBase abstract base class. If connection to the Redis server fails and reconnection exceeds the limit, the method raises a ConnectionError.

Raises

redis.exceptions.ConnectionError – If connection to the Redis server fails and reconnection exceeds the limit.

publish_frame(topic: str, frame: core.frame.Frame) None

See base class.

class core.streambroker.StreamBrokerClientBase

Bases: abc.ABC

An abstract base class for stream broker client.

This class serves as a blueprint for subclasses that need to implement the connect and publish_frame methods for different types of stream broker servers.

MAX_RECONNECTION_TIMES = 5
abstract connect(host: str, port: int)

Connect to broker server.

This method is used to connect to the stream broker server. It will attempt to reconnect when a connection error occurs.

Parameters
  • host (str) – The host ip or hostname of broker server.

  • port (int) – The port of broker server.

Raises

NotImplementedError – If the subclasses don’t implement the method.

abstract publish_frame(topic: str, frame: core.frame.Frame) None

Publish a frame to stream broker server for a topic.

Parameters
  • topic (str) – The topic name to publish to.

  • frame (Frame) – The Frame to publish.

Raises
  • NotImplementedError – If the subclasses don’t implement the method.

  • ValueError – If the topic or frame is None.

  • RuntimeError – If any error occurs while encoding the frame before publishing.
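The publish_frame contract above (ValueError for missing arguments, RuntimeError for encoding failures) can be sketched with an in-memory client. This is an illustration under assumptions, not the Redis or Kafka implementation: the base class shown is a stand-in mirroring the documented method names, and FakeFrame, its encode method, and InMemoryBrokerClient are hypothetical.

```python
from abc import ABC, abstractmethod
from collections import defaultdict


class StreamBrokerClientBase(ABC):
    """Stand-in mirroring the documented abstract methods (not the real class)."""

    @abstractmethod
    def connect(self, host: str, port: int) -> None: ...

    @abstractmethod
    def publish_frame(self, topic: str, frame) -> None: ...


class FakeFrame:
    """Hypothetical frame; encode() stands in for the real serialization."""

    def __init__(self, payload) -> None:
        self.payload = payload

    def encode(self) -> bytes:
        return bytes(self.payload)  # raises TypeError for a str payload


class InMemoryBrokerClient(StreamBrokerClientBase):
    """Hypothetical client that appends encoded frames to per-topic lists."""

    def __init__(self) -> None:
        self.topics = defaultdict(list)

    def connect(self, host: str = "", port: int = 0) -> None:
        pass  # no server to reach in this sketch

    def publish_frame(self, topic: str, frame) -> None:
        if topic is None or frame is None:
            raise ValueError("topic and frame must not be None")
        try:
            data = frame.encode()
        except Exception as exc:
            # Wrap any encoding failure, as the documented contract describes.
            raise RuntimeError("failed to encode frame") from exc
        self.topics[topic].append(data)
```

Validating arguments before encoding keeps the two error cases distinct, so a caller can tell a programming mistake from a serialization problem.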

Module contents

The core package.

This package contains the core components of CNAP, including the implementations of engines, processors, stream brokers, etc.