core package¶
Subpackages¶
Submodules¶
core.filedb module¶
A FileDatabase module.
This module provides an object-oriented design for a file database to load video files.
- Classes:
FileDatabase: An abstract base class for creating custom file database implementations.
LocalFileDatabase: A concrete class implementing the FileDatabase for local file operations.
- class core.filedb.FileDatabase¶
Bases:
abc.ABC
An abstract base class for creating custom file database implementations.
This class serves as a blueprint for subclasses that need to implement the get_file method for different types of file databases.
- abstract get_file(filename: str) str ¶
Abstract method for getting a file from the file database.
This method is expected to retrieve a file from the file database and return the local path for the file.
- Parameters
filename (str) – The name of the file to get.
- Returns
The local path for the file.
- Return type
str
- Raises
NotImplementedError – if the subclass does not implement this method.
- class core.filedb.LocalFileDatabase(root_dir: str)¶
Bases:
core.filedb.FileDatabase
A concrete class implementing the FileDatabase for local file operations.
This class uses the local filesystem to implement the get_file method defined in the FileDatabase abstract base class.
- Variables
_root_dir (str) – The root directory path for local file operations.
- get_file(filename: str) str ¶
Gets the local path for a file.
This method uses the root directory to find the given file and returns its local path.
- Parameters
filename (str) – The name of the file to get.
- Returns
The local path for the file.
- Return type
str
- Raises
FileNotFoundError – If root_dir is invalid or the given file cannot be found.
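The relationship between the two classes can be illustrated with a minimal sketch. It mirrors the documented signatures, but the bodies are assumptions: in particular, the checks that lead to FileNotFoundError and the helper demo_local_lookup are hypothetical and not taken from core.filedb.

```python
import os
import tempfile
from abc import ABC, abstractmethod


class FileDatabase(ABC):
    """Blueprint: subclasses map a file name to a local path."""

    @abstractmethod
    def get_file(self, filename: str) -> str:
        raise NotImplementedError("Subclasses should implement get_file")


class LocalFileDatabase(FileDatabase):
    """Resolves file names against a root directory on the local disk."""

    def __init__(self, root_dir: str):
        self._root_dir = root_dir

    def get_file(self, filename: str) -> str:
        # Assumption: an invalid root and a missing file both raise
        # FileNotFoundError, as the Raises entry describes.
        if not os.path.isdir(self._root_dir):
            raise FileNotFoundError(f"Invalid root directory: {self._root_dir}")
        path = os.path.join(self._root_dir, filename)
        if not os.path.isfile(path):
            raise FileNotFoundError(f"File not found: {path}")
        return path


def demo_local_lookup() -> str:
    """Create a temporary file and look it up by name (illustration only)."""
    with tempfile.TemporaryDirectory() as root:
        with open(os.path.join(root, "sample.mp4"), "wb") as handle:
            handle.write(b"\x00")
        found = LocalFileDatabase(root).get_file("sample.mp4")
        return os.path.basename(found)
```

A remote-storage subclass would follow the same pattern: download or cache the file, then return the local path.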
core.frame module¶
A Frame module.
This module contains the definition of the Frame class, which encapsulates a raw frame and related information for transferring data between microservices. A frame can be encoded and decoded with protobuf, and should include enough information for traceability, observability, etc.
In addition, this module provides an object-oriented design for frame ciphers that encrypt and decrypt frames.
- Classes:
Frame: A class that encapsulates a raw frame and related information, and can be encoded and decoded with protobuf.
FrameCipherBase: An abstract base class for creating custom frame cipher implementations.
QATFrameCipher: A concrete class that implements frame encryption and decryption accelerated with QAT.
- class core.frame.Frame(provider: core.stream.StreamProvider, pipeline_id: str, sequence: int, raw_frame: numpy.ndarray)¶
Bases:
object
A class that encapsulates raw frame and related information.
A Frame includes enough information for traceability and observability, and can be encoded and decoded with protobuf.
- Variables
_provider (StreamProvider) – The stream provider of the frame.
_pipeline_id (str) – The id of the pipeline to which the frame belongs.
_sequence (int) – The monotonic counter of the frame sequence number.
_raw (np.ndarray) – The raw frame of the frame.
_ts_new (float) – The timestamp of the beginning of this new frame.
_ts_infer_end (float) – The timestamp of the inference’s end point of this frame.
- decrypt(actor: core.frame.FrameCipherBase) None ¶
Decrypts the frame with a given frame cipher after transferring it into TEE.
The method should be called after transferring the frame into TEE.
- Parameters
actor (FrameCipherBase) – The frame cipher to use for decryption.
- encrypt(actor: core.frame.FrameCipherBase) None ¶
Encrypts the frame with a given frame cipher before transferring it outside of TEE.
The method should be called before transferring the frame outside of TEE.
- Parameters
actor (FrameCipherBase) – The frame cipher to use for encryption.
- static from_blob(blob: bytes) core.frame.Frame ¶
Restore a frame instance from a blob.
This method uses protobuf to deserialize a blob to a Frame.
- Parameters
blob (bytes) – The blob to restore.
- Returns
The frame restored from a blob.
- Return type
Frame
- Raises
TypeError – If the ‘blob’ argument is not of type bytes.
RuntimeError – If any error occurs while decoding.
- classmethod get_sequence() int ¶
Get the monotonic count for the sequence number of a frame.
The method will return the last sequence number plus one, and update the last sequence number to the new value.
- Returns
The new sequence number, which is also stored as the last sequence number.
- Return type
int
- last_sequence = 0¶
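A class-level counter of this kind can be sketched as follows. SequenceCounter is a hypothetical stand-in for the Frame class, and the lock is an assumption added for thread safety; the documentation does not say whether the real implementation locks.

```python
import threading


class SequenceCounter:
    """Hypothetical stand-in for the counter kept on the Frame class."""

    last_sequence = 0
    _lock = threading.Lock()  # Assumption: the real class may not lock.

    @classmethod
    def get_sequence(cls) -> int:
        # Increment the shared class attribute and return the new value.
        with cls._lock:
            cls.last_sequence += 1
            return cls.last_sequence
```

Because the counter lives on the class, every caller in the same process draws from one increasing sequence.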
- normalize(target_size: Tuple[int, int]) None ¶
Normalize the frame to the target size.
This method uses OpenCV to resize the frame to the target size.
- Parameters
target_size (Tuple[int, int]) – The target size for frame to normalize.
- Raises
ValueError – If the target_size is invalid.
- property pipeline_id: str¶
The id of the pipeline to which the frame belongs.
- Type
str
- property raw: numpy.ndarray¶
The raw frame of the frame.
- Type
np.ndarray
- property sequence: int¶
The monotonic counter of the frame sequence number.
- Type
int
- property timestamp_infer_end: float¶
The timestamp of the inference’s end point of this frame.
- Type
float
- property timestamp_new_frame: float¶
The timestamp of the beginning of this new frame.
- Type
float
- to_blob() bytes ¶
Encode the raw frame to a blob for transferring to the infer queue.
This method uses protobuf to serialize a Frame into a blob.
- Returns
The blob encoded from raw frame.
- Return type
bytes
- Raises
RuntimeError – If any error occurs while encoding.
- class core.frame.FrameCipherBase¶
Bases:
abc.ABC
An abstract base class for creating custom frame cipher implementations.
This class serves as a blueprint for subclasses that need to implement the encrypt and decrypt methods for different accelerators.
- abstract decrypt() None ¶
Decrypt the frame.
This method decrypts the encrypted frame.
- Raises
NotImplementedError – If the subclasses don’t implement the method.
- abstract encrypt() None ¶
Encrypt the frame.
This method encrypts the frame to protect its privacy.
- Raises
NotImplementedError – If the subclasses don’t implement the method.
- class core.frame.QATFrameCipher¶
Bases:
core.frame.FrameCipherBase
Class that uses QAT to accelerate frame encryption and decryption.
This class implements the encrypt and decrypt methods from the FrameCipherBase abstract base class.
- decrypt() None ¶
See base class.
- encrypt() None ¶
See base class.
core.infereng module¶
An Inference Engine module.
This module contains the definition of InferenceInfo class, which encapsulates the inference information. It also contains the definition of InferEngineManager class, which manages inference engines.
In addition, this module provides an object-oriented design for inference engine.
- Classes:
InferenceInfo: A class that encapsulates inference information.
InferEngineManager: A class to manage inference engines.
InferenceEngine: An abstract base class for creating custom inference engine implementations.
- class core.infereng.InferEngineManager(db: core.rtdb.RuntimeDatabaseBase)¶
Bases:
object
The class to manage inference engines.
This class manages inference engines. It provides the register_engine and unregister_engine methods to register and unregister inference engines, and the search_engine method to search for available engines.
- Variables
_db (RuntimeDatabaseBase) – The RuntimeDatabaseBase object for InferEngineManager to use.
- register_engine(infer_info: core.infereng.InferenceInfo, model_info: core.model.ModelInfo) None ¶
Register the infer engine to database.
This method is used to register the infer engine to database.
- Parameters
infer_info (InferenceInfo) – The InferenceInfo object to register.
model_info (ModelInfo) – The ModelInfo object to register.
- search_engine(framework: str, target: str, device: str, model_name: str, model_version: str) Optional[core.infereng.InferenceInfo] ¶
Search inference engine.
This method searches for an engine according to the requested framework and target, and returns an InferenceInfo that includes the UUID of the infer engine.
- Parameters
framework (str) – Framework of the model
target (str) – Target of the model
device (str) – Device type for infer engine
model_name (str) – Name of the model
model_version (str) – Version of the model
- Returns
An InferenceInfo object if engine is found, None otherwise.
- Return type
Optional[InferenceInfo]
- unregister_engine(infer_info_id: str) None ¶
Unregister the infer engine from database.
This method is used to unregister the infer engine from database.
- Parameters
infer_info_id (str) – The UUID of Infer Engine Info to unregister.
- class core.infereng.InferenceEngine¶
Bases:
abc.ABC
An abstract base class for creating custom inference engine implementations.
This class serves as a blueprint for subclasses that need to implement the verify, preprocess, postprocess, _predict methods for different inference frameworks.
- abstract postprocess(frame: numpy.ndarray, outputs: dict) numpy.ndarray ¶
Postprocesses the output from the inference process.
This method postprocesses the output of the inference process.
- Parameters
frame (np.ndarray) – The input frame.
outputs (dict) – The output result from the inference process.
- Returns
The postprocessed result.
- Return type
np.ndarray
- Raises
NotImplementedError – If this method is not implemented by a subclass.
- predict(frame: numpy.ndarray) Tuple[numpy.ndarray, float] ¶
Performs inference using the loaded model and input data, and measures the latency.
This method runs inference with the loaded model on the input data and measures the latency.
- Parameters
frame (np.ndarray) – The input data to use for inference.
- Returns
The postprocessed result and the latency in seconds.
- Return type
Tuple[np.ndarray, float]
- abstract preprocess(frame: numpy.ndarray) numpy.ndarray ¶
Preprocesses the input data.
This method preprocesses the input data before it is fed to the model.
- Parameters
frame (np.ndarray) – The input data to preprocess.
- Returns
The preprocessed input data.
- Return type
np.ndarray
- Raises
NotImplementedError – If this method is not implemented by a subclass.
- abstract verify() bool ¶
Checks if the model is valid for inference.
This method checks whether the model is valid for inference.
- Returns
True if the model is valid, otherwise False.
- Return type
bool
- Raises
NotImplementedError – If this method is not implemented by a subclass.
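The split between the concrete predict method and the abstract hooks resembles a template method. The sketch below is an illustration under several assumptions: it uses plain Python lists in place of np.ndarray to stay dependency-free, it assumes the measured latency covers preprocessing, _predict, and postprocessing, and DoublingEngine is a hypothetical subclass.

```python
import time
from abc import ABC, abstractmethod
from typing import Any, Tuple


class InferenceEngine(ABC):
    """Sketch of the documented interface (lists stand in for arrays)."""

    @abstractmethod
    def verify(self) -> bool:
        raise NotImplementedError

    @abstractmethod
    def preprocess(self, frame: Any) -> Any:
        raise NotImplementedError

    @abstractmethod
    def postprocess(self, frame: Any, outputs: dict) -> Any:
        raise NotImplementedError

    @abstractmethod
    def _predict(self, preprocessed: Any) -> dict:
        raise NotImplementedError

    def predict(self, frame: Any) -> Tuple[Any, float]:
        # Assumption: latency spans all three stages, in seconds.
        start = time.perf_counter()
        preprocessed = self.preprocess(frame)
        outputs = self._predict(preprocessed)
        result = self.postprocess(frame, outputs)
        return result, time.perf_counter() - start


class DoublingEngine(InferenceEngine):
    """Hypothetical engine whose 'model' doubles every input value."""

    def verify(self) -> bool:
        return True

    def preprocess(self, frame: Any) -> Any:
        return [float(value) for value in frame]

    def postprocess(self, frame: Any, outputs: dict) -> Any:
        return outputs["scaled"]

    def _predict(self, preprocessed: Any) -> dict:
        return {"scaled": [2 * value for value in preprocessed]}
```

Calling DoublingEngine().predict([1, 2, 3]) returns the doubled values together with the elapsed time.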
- class core.infereng.InferenceInfo(device: str, model_id: str)¶
Bases:
object
Inference information class.
This class encapsulates the inference information and can be encoded and decoded with protobuf. Inference information should include enough detail for traceability, observability, etc.
- Variables
_device (str) – The device type for infer engine.
_model_id (str) – The UUID for infer model.
_id (str) – The UUID for Infer Engine Info.
_queue_topic (str) – The queue topic for infer engine.
_input_size (Tuple[int, int]) – The input size tuple required by infer model.
- property device: str¶
The device type for infer engine.
- Type
str
- property id: str¶
The UUID for Infer Engine Info.
- Type
str
- property input_size: Tuple[int, int]¶
The input size tuple required by infer model.
- Type
Tuple[int, int]
- property model_id: str¶
The UUID for infer model.
- Type
str
- property queue_topic: str¶
The queue topic for infer engine.
- Type
str
core.inferqueue module¶
An Inferqueue module.
This module provides an object-oriented design for an inference queue client that connects to the inference queue server, publishes frames to the queue server, gets frames from the queue server, drops frames in the queue server, checks the availability of a topic in the queue server, and registers and unregisters topics in the queue server.
- Classes:
InferQueueClientBase: An abstract base class for inference queue client.
RedisInferQueueClient: A concrete class implementing the InferQueueClientBase for Redis server.
KafkaInferQueueClient: A concrete class implementing the InferQueueClientBase for Kafka server.
- class core.inferqueue.InferQueueClientBase¶
Bases:
abc.ABC
An abstract base class for inference queue client.
This class serves as a blueprint for subclasses that need to implement connect, publish_frame, get_frame, drop, infer_queue_available, register_infer_queue, unregister_infer_queue methods for different types of queue server.
- Variables
_buffer_len (int) – The buffer length for inference queue.
- MAX_RECONNECTION_TIMES = 5¶
- property buffer_len: int¶
The buffer length for infer queue, default 32.
- Type
int
- abstract connect(host: str, port: int)¶
Connect to queue server.
This method connects to the queue server and attempts to reconnect when a connection error occurs.
- Parameters
host (str) – The host ip or hostname of queue server.
port (int) – The port of queue server.
- Raises
NotImplementedError – If the subclasses don’t implement the method.
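The reconnect behaviour can be sketched as a bounded retry loop. This is an assumption about how MAX_RECONNECTION_TIMES might be used, not the library's code: the helper connect_with_retry, the delay_seconds parameter, and the use of the built-in ConnectionError are all hypothetical.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

# Same value as the documented class attribute on InferQueueClientBase.
MAX_RECONNECTION_TIMES = 5


def connect_with_retry(
    open_connection: Callable[[], T],
    max_retries: int = MAX_RECONNECTION_TIMES,
    delay_seconds: float = 0.0,
) -> T:
    """Call open_connection, retrying on ConnectionError up to max_retries."""
    retries = 0
    while True:
        try:
            return open_connection()
        except ConnectionError:
            retries += 1
            # Assumption: once the retry budget is spent, the last
            # error is re-raised to the caller.
            if retries > max_retries:
                raise
            time.sleep(delay_seconds)
```

With this shape, one initial attempt is followed by at most max_retries reconnection attempts before the error reaches the caller.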
- abstract drop(topic: str) int ¶
Drop the frames that overflow the buffer.
This method drops the frames that overflow the buffer. The length of the buffer can be adjusted.
- Parameters
topic (str) – The topic name to drop frames.
- Returns
The count of frames dropped.
- Return type
int
- Raises
NotImplementedError – If the subclasses don’t implement the method.
ValueError – If the topic is None.
- abstract get_frame(topic: str) Optional[bytes] ¶
Get a frame from queue.
This method gets a frame from the queue server. The Frame is deserialized after it is retrieved from the queue server.
- Parameters
topic (str) – The topic name to get frame.
- Returns
The frame retrieved from the topic, or None if the infer queue is currently empty.
- Return type
Optional[bytes]
- Raises
NotImplementedError – If the subclasses don’t implement the method.
ValueError – If the topic is None.
- abstract infer_queue_available(topic: str) bool ¶
Determine whether the inference queue for the topic is available.
The method is used to determine whether the inference queue for the topic is available or not.
- Parameters
topic (str) – The topic name to determine.
- Returns
True if the inference queue for the topic is available, False otherwise.
- Return type
bool
- Raises
NotImplementedError – If the subclasses don’t implement the method.
ValueError – If the topic is None.
- abstract publish_frame(topic: str, frame: bytes) None ¶
Publish a frame to queue server for a topic.
This method publishes a frame to the queue server. The Frame is serialized before it is published to the queue server.
- Parameters
topic (str) – The topic name to publish to.
frame (bytes) – The frame to publish.
- Raises
NotImplementedError – If the subclasses don’t implement the method.
ValueError – If the topic or frame is None.
- abstract register_infer_queue(topic: str)¶
Register the topic in the inference queue.
The method is used to register the topic in the inference queue.
- Parameters
topic (str) – The topic name to register.
- Raises
NotImplementedError – If the subclasses don’t implement the method.
ValueError – If the topic is None.
- abstract unregister_infer_queue(topic: str)¶
Unregister the topic in the inference queue.
The method is used to unregister the topic in the inference queue.
- Parameters
topic (str) – The topic name to unregister.
- Raises
NotImplementedError – If the subclasses don’t implement the method.
ValueError – If the topic is None.
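To make the contract of these methods concrete, here is a minimal in-process sketch that keeps one deque per topic. InMemoryInferQueueClient is hypothetical and does not exist in core.inferqueue; the choice to drop the oldest frames first and to treat unknown topics as empty are assumptions.

```python
from collections import deque
from typing import Deque, Dict, Optional


class InMemoryInferQueueClient:
    """Hypothetical in-process client; not part of core.inferqueue."""

    def __init__(self, buffer_len: int = 32):
        self._buffer_len = buffer_len
        self._queues: Dict[str, Deque[bytes]] = {}

    def register_infer_queue(self, topic: str) -> None:
        if topic is None:
            raise ValueError("topic must not be None")
        self._queues.setdefault(topic, deque())

    def unregister_infer_queue(self, topic: str) -> None:
        if topic is None:
            raise ValueError("topic must not be None")
        self._queues.pop(topic, None)

    def infer_queue_available(self, topic: str) -> bool:
        if topic is None:
            raise ValueError("topic must not be None")
        return topic in self._queues

    def publish_frame(self, topic: str, frame: bytes) -> None:
        if topic is None or frame is None:
            raise ValueError("topic and frame must not be None")
        self._queues.setdefault(topic, deque()).append(frame)

    def get_frame(self, topic: str) -> Optional[bytes]:
        if topic is None:
            raise ValueError("topic must not be None")
        queue = self._queues.get(topic)
        return queue.popleft() if queue else None

    def drop(self, topic: str) -> int:
        if topic is None:
            raise ValueError("topic must not be None")
        # Assumption: the oldest frames are discarded first.
        queue = self._queues.get(topic, deque())
        dropped = 0
        while len(queue) > self._buffer_len:
            queue.popleft()
            dropped += 1
        return dropped
```

A stand-in like this can be handy in unit tests where no Redis or Kafka server is available.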
- class core.inferqueue.KafkaInferQueueClient¶
Bases:
core.inferqueue.InferQueueClientBase
Kafka implementation for inference queue client.
This class implements the connect, publish_frame, get_frame, drop, infer_queue_available, register_infer_queue, and unregister_infer_queue methods defined in the InferQueueClientBase abstract base class for a Kafka queue server.
- Variables
_conn (KafkaProducer) – The Kafka producer connection object.
- connect(host='127.0.0.1', port=9092)¶
The Kafka queue client implementation for connect method.
The method overrides the connect method defined in the InferQueueClientBase abstract base class. The main difference is the exception raised.
- Raises
kafka.errors.NoBrokersAvailable – If connection to the Kafka server fails and reconnection exceeds the limit.
- drop(topic: str) int ¶
See base class.
- get_frame(topic: str) Optional[bytes] ¶
See base class.
- infer_queue_available(topic: str) bool ¶
See base class.
- publish_frame(topic: str, frame: bytes) None ¶
See base class.
- register_infer_queue(topic: str)¶
See base class.
- unregister_infer_queue(topic: str)¶
See base class.
- class core.inferqueue.RedisInferQueueClient¶
Bases:
core.inferqueue.InferQueueClientBase
Redis implementation for inference queue client.
This class implements the connect, publish_frame, get_frame, drop, infer_queue_available, register_infer_queue, and unregister_infer_queue methods defined in the InferQueueClientBase abstract base class for a Redis queue server.
- Variables
_conn (redis.Redis) – The Redis connection object.
- connect(host: str = '127.0.0.1', port: int = 6379)¶
The Redis queue client implementation for connect method.
The method overrides the connect method defined in the InferQueueClientBase abstract base class. The main difference is the exception raised.
- Raises
redis.exceptions.ConnectionError – If connection to the Redis server fails and reconnection exceeds the limit.
- drop(topic: str) int ¶
See base class.
- get_frame(topic: str) Optional[bytes] ¶
See base class.
- infer_queue_available(topic: str) bool ¶
See base class.
- publish_frame(topic: str, frame: bytes) None ¶
See base class.
- register_infer_queue(topic: str)¶
See base class.
- unregister_infer_queue(topic: str)¶
See base class.
core.metrics module¶
A Metrics module.
This module provides a class to manage metrics in a Prometheus-based application. It provides methods to create, track, and update metrics.
- Classes:
MetricType: An enum for the type of metric.
ErrorMessage: An enum for error messages.
MetricsManager: A class to manage metrics in a Prometheus-based application.
- class core.metrics.ErrorMessage(value)¶
Bases:
enum.Enum
An enum for error messages.
This enum defines the error messages that can be raised.
- INVALID_METRIC_TYPE = 'Invalid metric type: {}'¶
- METRIC_DOES_NOT_EXIST = 'Metric {} does not exist'¶
- METRIC_EXISTS = 'Metric {} already exists'¶
- METRIC_TYPE_MISMATCH = 'Metric {} is not a {}'¶
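Each message contains `{}` placeholders, which suggests the values are filled with str.format before being raised. The sketch below reproduces the enum from the values listed here; the helper mismatch_error is hypothetical, and the way the real module builds its exceptions is an assumption.

```python
from enum import Enum


class ErrorMessage(Enum):
    """Enum values copied from the documented members."""

    INVALID_METRIC_TYPE = "Invalid metric type: {}"
    METRIC_DOES_NOT_EXIST = "Metric {} does not exist"
    METRIC_EXISTS = "Metric {} already exists"
    METRIC_TYPE_MISMATCH = "Metric {} is not a {}"


def mismatch_error(metric_name: str, expected_type: str) -> ValueError:
    """Build a ValueError from the type-mismatch template (hypothetical)."""
    template = ErrorMessage.METRIC_TYPE_MISMATCH.value
    return ValueError(template.format(metric_name, expected_type))
```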
- class core.metrics.MetricType(value)¶
Bases:
enum.Enum
An enum for the type of metric.
This enum defines the types of metrics that can be created.
- COUNTER = 'Counter'¶
- GAUGE = 'Gauge'¶
- HISTOGRAM = 'Histogram'¶
- SUMMARY = 'Summary'¶
- class core.metrics.MetricsManager(*args, **kwargs)¶
Bases:
object
A class to manage metrics in a Prometheus-based application.
This class provides methods to create, track, and update metrics.
- Variables
metrics (dict) – A dictionary of all metrics.
- create_metric(metric_type: core.metrics.MetricType, metric_name: str, description: str) None ¶
Creates a new metric.
This method creates a new metric of the specified type and stores it in the metrics dictionary.
- Parameters
metric_type (MetricType) – The type of the metric.
metric_name (str) – The name of the metric. Must be unique across all metrics.
description (str) – The description of the metric.
- Raises
ValueError – If the specified metric already exists or the metric type is invalid.
- get_metric(metric_name: str, metric_type: core.metrics.MetricType) Union[prometheus_client.metrics.Counter, prometheus_client.metrics.Gauge, prometheus_client.metrics.Histogram, prometheus_client.metrics.Summary] ¶
Returns the metric with the specified name and type.
- Parameters
metric_name (str) – The name of the metric.
metric_type (MetricType) – The type of the metric.
- Returns
The metric with the specified name and type.
- Return type
Union[Counter, Gauge, Histogram, Summary]
- Raises
ValueError – If the specified metric does not exist or the metric type is invalid.
- get_metrics() Dict[str, Union[prometheus_client.metrics.Counter, prometheus_client.metrics.Gauge, prometheus_client.metrics.Histogram, prometheus_client.metrics.Summary]] ¶
Returns the dictionary of all metrics.
- Returns
The dictionary of all metrics.
- Return type
dict
- increment_counter(metric_name: str, by: float = 1.0) None ¶
Increments a Counter metric by the specified amount.
- Parameters
metric_name (str) – The name of the Counter metric to increment.
by (float, optional) – The amount to increment by. Default is 1.0.
- Raises
ValueError – If the specified metric does not exist or is not a Counter.
- latency_decorator(metric_name: str) Callable[[...], Callable[[...], Any]] ¶
Returns a decorator that tracks the latency of the decorated function.
- Parameters
metric_name (str) – The name of the Histogram metric to track the latency.
- Returns
A decorator that tracks the latency of the decorated function.
- Return type
Callable[…, Callable[…, Any]]
- Raises
ValueError – If the specified metric does not exist or is not a Histogram.
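A decorator factory of this shape can be sketched without Prometheus. In the sketch below, the module-level observations dictionary is a hypothetical stand-in for Histogram objects, and raising ValueError at decoration time (rather than at call time) is an assumption.

```python
import functools
import time
from typing import Any, Callable, Dict, List

# Stand-in for Histogram metrics: each name maps to its observed values.
observations: Dict[str, List[float]] = {}


def latency_decorator(
    metric_name: str,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Return a decorator that records the wall-clock latency of calls."""
    if metric_name not in observations:
        raise ValueError(f"Metric {metric_name} does not exist")

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            start = time.perf_counter()
            try:
                return func(*args, **kwargs)
            finally:
                # Record the latency even if the wrapped call raises.
                elapsed = time.perf_counter() - start
                observations[metric_name].append(elapsed)

        return wrapper

    return decorator
```

With a real Histogram, the append call would presumably be replaced by an observe call on the metric object.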
- observe_histogram(metric_name: str, value: float) None ¶
Observes a value in a Histogram metric.
- Parameters
metric_name (str) – The name of the Histogram metric to observe a value in.
value (float) – The value to observe.
- Raises
ValueError – If the specified metric does not exist or is not a Histogram.
- observe_summary(metric_name: str, value: float) None ¶
Observes a value in a Summary metric.
- Parameters
metric_name (str) – The name of the Summary metric to observe a value in.
value (float) – The value to observe.
- Raises
ValueError – If the specified metric does not exist or is not a Summary.
- set_gauge(metric_name: str, value: float) None ¶
Sets a Gauge metric to the specified value.
- Parameters
metric_name (str) – The name of the Gauge metric to set.
value (float) – The value to set the Gauge to.
- Raises
ValueError – If the specified metric does not exist or is not a Gauge.
core.model module¶
A Model module.
This module contains the definition of the ModelInfo class, which encapsulates all relevant information about an Inference model in a machine learning pipeline.
In addition, ModelInfo instances can be easily serialized to dictionaries for further processing or storage, by using Python’s built-in dict() function.
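One way a class can support the built-in dict() function is to implement `__iter__` so that it yields key/value pairs. The sketch below shows that mechanism with a hypothetical, reduced ModelSummary class; whether ModelInfo uses this approach, and which keys it emits, is an assumption.

```python
from typing import Any, Iterator, Tuple


class ModelSummary:
    """Hypothetical, reduced stand-in for ModelInfo."""

    def __init__(self, model_id: str, path: str):
        self._id = model_id
        self._path = path

    def __iter__(self) -> Iterator[Tuple[str, Any]]:
        # dict() accepts any iterable of (key, value) pairs.
        yield "id", self._id
        yield "path", self._path
```

Calling dict(ModelSummary("1234", "/models/a.bin")) then produces an ordinary dictionary with the keys "id" and "path".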
- Classes:
ModelMetrics: A class that encapsulates the performance metrics of the inference model.
ModelDetails: A class that encapsulates the details of the inference model.
ModelInfo: A class for managing information about an Inference model.
Model: A class for managing an Inference model.
- class core.model.Model(model_provider: str, model_info_url: str, model_id: Optional[str] = None)¶
Bases:
object
A class for managing an Inference model.
This class defines the information of the inference model, including model information and model binary.
- Variables
_model_info (ModelInfo) – The information of the model.
_model_binary (bytes) – The binary of the model.
- property model_binary: bytes¶
The binary of the model.
- Type
bytes
- property model_info: core.model.ModelInfo¶
The information of the model.
- Type
ModelInfo
- class core.model.ModelDetails(name: str, version: str, framework: str, target: str, dtype: str)¶
Bases:
object
A class that encapsulates the details of the inference model.
This class defines the details of the inference model, including name, version, framework, target and data type.
- Variables
_name (str) – The name of the model.
_version (str) – The version of the model.
_framework (str) – The framework used to train the model.
_target (str) – The target of the model.
_dtype (str) – The data type of the model.
- property dtype: str¶
The data type of the model.
- Type
str
- property framework: str¶
The framework used to train the model.
- Type
str
- property name: str¶
The name of the model.
- Type
str
- property target: str¶
The target of the model.
- Type
str
- property version: str¶
The version of the model.
- Type
str
- class core.model.ModelInfo(model_id: str, path: str, details: core.model.ModelDetails, uploaded_date: datetime.datetime, metrics: core.model.ModelMetrics)¶
Bases:
object
A class for managing information about an Inference model.
This class defines the information of the inference model, including ID, details, uploaded date and performance metrics.
- Variables
_id (str) – The ID of the model.
_details (ModelDetails) – The details of the model.
_uploaded_date (datetime) – The uploaded date of the model.
_metrics (ModelMetrics) – The performance metrics of the model.
- property details: core.model.ModelDetails¶
The details of the model.
- Type
ModelDetails
- property id: str¶
The ID of the model (string of UUID).
- Type
str
- property metrics: core.model.ModelMetrics¶
The performance metrics of the model.
- Type
ModelMetrics
- property path: str¶
The path of the model.
- Type
str
- property uploaded_date: datetime.datetime¶
The uploaded date of the model.
- Type
datetime
- class core.model.ModelMetrics(accuracy: float, precision: float, recall: float, f1_score: float, loss: float)¶
Bases:
object
A class that encapsulates the performance metrics of the inference model.
This class defines the performance metrics of the inference model, including accuracy, precision, recall, F1 score and loss value.
- Variables
_accuracy (float) – The accuracy of the model.
_precision (float) – The precision of the model.
_recall (float) – The recall of the model.
_f1_score (float) – The F1 score of the model.
_loss (float) – The loss value of the model.
- property accuracy: float¶
The accuracy of the model.
- Type
float
- property f1_score: float¶
The F1 score of the model.
- Type
float
- property loss: float¶
The loss value of the model.
- Type
float
- property precision: float¶
The precision of the model.
- Type
float
- property recall: float¶
The recall of the model.
- Type
float
core.pipeline module¶
A Pipeline module.
This module contains the definition of the Pipeline class and PipelineManager class.
- Classes:
Pipeline: A class that encapsulates the pipeline information.
PipelineManager: A class that manages the pipeline with the runtime database.
- class core.pipeline.Pipeline(provider: core.stream.StreamProvider, infer_engine_info: core.infereng.InferenceInfo)¶
Bases:
object
A class that encapsulates the pipeline information.
This class defines the pipeline information, including the pipeline ID, stream provider and inference engine information.
- Variables
_id (str) – The pipeline ID.
_provider (StreamProvider) – The stream provider of pipeline.
_infer_engine_dict (dict) – The dict of inference engine information of the pipeline.
- property id: str¶
The pipeline ID (string of UUID).
- Type
str
- class core.pipeline.PipelineManager(db: core.rtdb.RuntimeDatabaseBase)¶
Bases:
object
A class that manages the pipeline with the runtime database.
This class manages the pipeline with the runtime database and provides the register_pipeline and unregister_pipeline methods.
- Variables
_db (RuntimeDatabaseBase) – The runtime database used by pipeline manager.
- PIPELINE_TABLE = 'Pipeline-table'¶
- clean_infer_engine(infer_info_id: str) None ¶
Clean inference engine in infer_engine_dict.
- Parameters
infer_info_id (str) – The id of inference info to clear inference fps.
- Raises
ValueError – Propagated from get_all_table_objects_dict or save_table_object_dict when either raises it.
- register_pipeline(pipeline_obj: core.pipeline.Pipeline) None ¶
Register a new pipeline.
- Parameters
pipeline_obj (Pipeline) – The pipeline object to register.
- Raises
ValueError – Propagated from save_table_object_dict when it raises it.
- set_infer_fps(pipeline_id: str, infer_info: core.infereng.InferenceInfo, infer_fps: int) None ¶
Set inference fps for a pipeline.
- Parameters
pipeline_id (str) – The id of pipeline to set inference fps.
infer_info (InferenceInfo) – The inference info to set inference fps.
infer_fps (int) – The inference fps to set.
- Raises
ValueError – Propagated from get_table_object_dict or save_table_object_dict when either raises it.
- unregister_pipeline(pipeline_id: str) None ¶
Unregister an existing pipeline.
- Parameters
pipeline_id (str) – The id of pipeline to unregister.
- Raises
ValueError – Propagated from del_table_object when it raises it.
core.crypto.qat module¶
A QAT module.
This module provides a class that uses Intel QAT to accelerate compression and decompression of frames. It provides methods to initialize, set up, tear down, and close a QATZip session, as well as to compress and decompress data using QAT.
- Classes:
QATZip: A class that uses QAT to accelerate frame compression and decompression.
- class core.crypto.qat.QATZip(direction: int = 2, comp_lvl: int = 1, algorithm: int = 8)¶
Bases:
core.crypto.zip.ZipBase
A class that uses QAT to accelerate frame compression and decompression.
- Variables
_qziplib (CDLL) – The QATZip library.
_direction (int) – The QATZip working direction (compress, decompress, or both).
_comp_lvl (int) – The QATZip compression level.
_algorithm (int) – The QATZip Compression/decompression algorithm.
_session (QzSession) – The QATZip Session.
_common_session (QzSessionParamsCommon) – The QATZip session with common parameters.
_deflate_params (QzSessionParamsDeflate) – The QATZip session with deflate parameters.
- ALGO_DEFLATE = 8¶
- DIR_BOTH = 2¶
- DIR_COMPRESS = 0¶
- DIR_DECOMPRESS = 1¶
- EXPANSION_RATIO = [5, 20, 50, 100]¶
- LAST = 1¶
- LEVEL_BEST = 9¶
- LEVEL_FAST = 1¶
- SW_BACKUP = 1¶
- property algorithm: int¶
The QATZip compression/decompression algorithm.
- Type
int
- close_session() None ¶
Terminates a QATZip session.
This method closes the connection with QAT.
- Raises
RuntimeError – If any error occurs while closing the session.
- property comp_lvl: int¶
The QATZip compression level.
- Type
int
- compress(src: bytes) bytes ¶
See base class.
- decompress(src: bytes) bytes ¶
See base class.
- property direction: int¶
Compress or decompress or both.
- Type
int
- init_session() None ¶
Initialize QAT hardware.
- Raises
RuntimeError – If any error occurs during the initialization of QAT.
- set_session() None ¶
Initialize a QATZip session with deflate parameters.
- Raises
RuntimeError – If any error occurs during the session set up.
- teardown_session() None ¶
Uninitialize a QATZip session.
This method disconnects a session from a hardware instance and deallocates buffers. If no session has been initialized, then no action will take place.
- Raises
RuntimeError – If any error occurs during the session teardown.
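The compress and decompress methods come from core.crypto.zip.ZipBase, which is not documented in this section. As a rough illustration of that interface, the sketch below swaps QAT for Python's software zlib module, which also produces deflate-compressed data. The ZipBase shape is inferred from the QATZip signatures, and SoftwareZip is a hypothetical class.

```python
import zlib
from abc import ABC, abstractmethod


class ZipBase(ABC):
    """Interface inferred from QATZip.compress and QATZip.decompress."""

    @abstractmethod
    def compress(self, src: bytes) -> bytes:
        raise NotImplementedError

    @abstractmethod
    def decompress(self, src: bytes) -> bytes:
        raise NotImplementedError


class SoftwareZip(ZipBase):
    """Hypothetical zlib-based fallback; no QAT hardware involved."""

    LEVEL_FAST = 1  # Same numeric levels as the documented constants.
    LEVEL_BEST = 9

    def __init__(self, comp_lvl: int = LEVEL_FAST):
        self._comp_lvl = comp_lvl

    def compress(self, src: bytes) -> bytes:
        return zlib.compress(src, self._comp_lvl)

    def decompress(self, src: bytes) -> bytes:
        return zlib.decompress(src)
```

Unlike QATZip, this stand-in needs no session setup or teardown, so it only illustrates the byte-in, byte-out contract.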
core.crypto.qat_params module¶
core.rtdb module¶
A RuntimeDatabase module.
This module provides an object-oriented design for a runtime database that caches runtime information and uses in-memory database services (like Redis) as the backend service.
- Classes:
RuntimeDatabaseBase: An abstract base class to manage runtime database tables.
RedisDB: A concrete class implementing the RuntimeDatabaseBase using Redis as backend service.
- class core.rtdb.RedisDB¶
Bases:
core.rtdb.RuntimeDatabaseBase
Redis backend implementation for runtime database.
This class implements the connect, save_table_object_dict, get_table_object_dict, get_all_table_objects_dict, check_table_object_exist, and del_table_object methods defined in the RuntimeDatabaseBase abstract base class for the Redis in-memory database backend.
- Variables
_conn (redis.Redis) – The Redis connection object.
_lock (Lock) – The lock for Redis connection.
- check_table_object_exist(table: str, obj: str) bool ¶
See base class.
- connect(host: str = '127.0.0.1', port: int = 6379, db: int = 0) None ¶
The connect method for the runtime database of Redis backend implementation.
The method overrides the connect method defined in the RuntimeDatabaseBase abstract base class. The main difference is the exception raised.
- Raises
redis.exceptions.ConnectionError – If connection to the Redis server fails and reconnection exceeds the limit.
- del_table_object(table: str, obj: str) None ¶
See base class.
- get_all_table_objects_dict(table: str) dict ¶
See base class.
- get_table_object_dict(table: str, obj: str) dict ¶
See base class.
- lock(name)¶
Lock for redis connection.
- save_table_object_dict(table: str, obj: str, d: dict) None ¶
See base class.
- unlock()¶
Unlock for redis connection.
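The table and object model behind these methods can be pictured as a dictionary of dictionaries. The sketch below is a hypothetical in-process stand-in, not the Redis implementation: InMemoryDB is an invented name, and returning an empty dict for a missing object and ignoring deletion of a missing object are assumptions.

```python
from typing import Dict


class InMemoryDB:
    """Hypothetical dict-backed store; not part of core.rtdb."""

    def __init__(self) -> None:
        # Maps table name -> object name -> stored dict.
        self._tables: Dict[str, Dict[str, dict]] = {}

    @staticmethod
    def _require(*values: object) -> None:
        if any(value is None for value in values):
            raise ValueError("table and obj must not be None")

    def save_table_object_dict(self, table: str, obj: str, d: dict) -> None:
        self._require(table, obj, d)
        self._tables.setdefault(table, {})[obj] = dict(d)

    def get_table_object_dict(self, table: str, obj: str) -> dict:
        self._require(table, obj)
        # Assumption: a missing object yields an empty dict.
        return dict(self._tables.get(table, {}).get(obj, {}))

    def get_all_table_objects_dict(self, table: str) -> dict:
        self._require(table)
        rows = self._tables.get(table, {})
        return {name: dict(value) for name, value in rows.items()}

    def check_table_object_exist(self, table: str, obj: str) -> bool:
        self._require(table, obj)
        return obj in self._tables.get(table, {})

    def del_table_object(self, table: str, obj: str) -> None:
        self._require(table, obj)
        # Assumption: deleting a missing object is a no-op.
        self._tables.get(table, {}).pop(obj, None)
```

A store like this could stand in for the runtime database when testing code such as a pipeline manager without a running Redis server.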
- class core.rtdb.RuntimeDatabaseBase¶
Bases:
abc.ABC
An abstract base class to manage runtime database tables.
This class serves as a blueprint for subclasses that need to implement the connect, save_table_object_dict, get_table_object_dict, get_all_table_objects_dict, check_table_object_exist, and del_table_object methods for different types of in-memory database services.
- MAX_RECONNECTION_TIMES = 5¶
- abstract check_table_object_exist(table: str, obj: str) bool ¶
Check whether a given object exists in a given table.
This method is used to check whether a given object exists in a given table.
- Parameters
table (str) – The name of the table.
obj (str) – The name of the object.
- Returns
True if a given object exists in a given table, otherwise False.
- Return type
bool
- Raises
NotImplementedError – If the subclasses don’t implement the method.
ValueError – If table or obj is None.
- abstract connect(host: str, port: int)¶
Connect to runtime database.
This method connects to the runtime database and attempts to reconnect when a connection error occurs.
- Parameters
host (str) – The host ip or hostname of runtime database.
port (int) – The port of runtime database.
- Raises
NotImplementedError – If the subclasses don’t implement the method.
- abstract del_table_object(table: str, obj: str) None ¶
Delete an object from a given table.
This method is used to delete an object from a given table.
- Parameters
table (str) – The name of the table.
obj (str) – The name of the object.
- Raises
NotImplementedError – If the subclasses don’t implement the method.
ValueError – If table or obj is None.
- abstract get_all_table_objects_dict(table: str) dict ¶
Get all dict values from a table.
This method is used to get all dict values from a table.
- Parameters
table (str) – The name of the table.
- Returns
All dict values retrieved by the table name.
- Return type
dict
- Raises
NotImplementedError – If the subclasses don’t implement the method.
ValueError – If table is None.
- abstract get_table_object_dict(table: str, obj: str) dict ¶
Get a dict value for an object from a table.
This method is used to get a dict value for an object from a table.
- Parameters
table (str) – The name of the table.
obj (str) – The name of the object.
- Returns
The dict value retrieved by table name and object name.
- Return type
dict
- Raises
NotImplementedError – If the subclasses don’t implement the method.
ValueError – If table or obj is None.
- abstract save_table_object_dict(table: str, obj: str, d: dict) None ¶
Save a dict value for an object in a table.
This method is used to save a dict value for an object in a table.
- Parameters
table (str) – The name of the table.
obj (str) – The name of the object.
d (dict) – The value to be saved.
- Raises
NotImplementedError – If the subclasses don’t implement the method.
ValueError – If table or obj is None.
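The abstract interface above can be illustrated with a small dict-backed subclass. This is only a sketch for tests or local experiments: the RuntimeDatabaseBase defined here is a stand-in that mirrors the documented signatures, InMemoryDB and its _require helper are hypothetical names, and the shape returned by get_all_table_objects_dict (object name mapped to its dict) is an assumption, since the documentation only states that a dict is returned.

```python
from abc import ABC, abstractmethod
from threading import Lock


class RuntimeDatabaseBase(ABC):
    """Stand-in mirroring the documented abstract interface (assumption)."""

    MAX_RECONNECTION_TIMES = 5

    @abstractmethod
    def connect(self, host: str, port: int) -> None: ...

    @abstractmethod
    def save_table_object_dict(self, table: str, obj: str, d: dict) -> None: ...

    @abstractmethod
    def get_table_object_dict(self, table: str, obj: str) -> dict: ...

    @abstractmethod
    def get_all_table_objects_dict(self, table: str) -> dict: ...

    @abstractmethod
    def check_table_object_exist(self, table: str, obj: str) -> bool: ...

    @abstractmethod
    def del_table_object(self, table: str, obj: str) -> None: ...


class InMemoryDB(RuntimeDatabaseBase):
    """Hypothetical dict-backed backend; not part of the core package."""

    def __init__(self) -> None:
        self._tables: dict = {}
        self._lock = Lock()

    @staticmethod
    def _require(**kwargs) -> None:
        # The documented contract raises ValueError for None arguments.
        for key, value in kwargs.items():
            if value is None:
                raise ValueError(f"{key} must not be None")

    def connect(self, host: str = "127.0.0.1", port: int = 0) -> None:
        # Nothing to connect to for an in-process store.
        return None

    def save_table_object_dict(self, table: str, obj: str, d: dict) -> None:
        self._require(table=table, obj=obj)
        with self._lock:
            self._tables.setdefault(table, {})[obj] = dict(d)

    def get_table_object_dict(self, table: str, obj: str) -> dict:
        self._require(table=table, obj=obj)
        with self._lock:
            return dict(self._tables.get(table, {}).get(obj, {}))

    def get_all_table_objects_dict(self, table: str) -> dict:
        self._require(table=table)
        with self._lock:
            rows = self._tables.get(table, {})
            return {name: dict(value) for name, value in rows.items()}

    def check_table_object_exist(self, table: str, obj: str) -> bool:
        self._require(table=table, obj=obj)
        with self._lock:
            return obj in self._tables.get(table, {})

    def del_table_object(self, table: str, obj: str) -> None:
        self._require(table=table, obj=obj)
        with self._lock:
            self._tables.get(table, {}).pop(obj, None)
```

A caller would save with save_table_object_dict("streams", "cam0", {"fps": 15}) and read the value back with get_table_object_dict("streams", "cam0"); the table and object names here are illustrative only.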
core.stream module¶
A Stream module.
This module provides an object-oriented design for Stream provider to provide stream input.
- Classes:
StreamProvider: An abstract base class for creating custom stream provider implementations. CameraSource: A concrete class implementing the StreamProvider for camera source. FileSource: A concrete class implementing the StreamProvider for file source.
- Functions:
create_stream_from_type: Create StreamProvider instance according to type.
- class core.stream.CameraSource(name: str, pathname: str = '/dev/video0')¶
Bases:
core.stream.StreamProvider
A concrete class implementing the StreamProvider for camera source.
This class implements the verify, read_raw_frame, open, and close methods defined in the StreamProvider abstract base class to provide stream input from a camera.
- Variables
_device_obj (cv2.VideoCapture) – The OpenCV VideoCapture object.
- close() None ¶
See base class.
- open()¶
Open the stream for camera source stream provider.
The method overrides the open method defined in the StreamProvider abstract base class. If the device node is not valid, or the camera fails to open, the method raises ValueError, FileNotFoundError, or IOError.
- Raises
ValueError – If the dev_num obtained from the path name is not valid.
FileNotFoundError – If the path name for camera is not found.
IOError – If the camera fails to open.
- read_raw_frame() numpy.ndarray ¶
See base class.
- verify() bool ¶
See base class.
- class core.stream.FileSource(name: str, pathname: str = 'classroom.mp4')¶
Bases:
core.stream.StreamProvider
A concrete class implementing the StreamProvider for file source.
This class implements the verify, read_raw_frame, open, and close methods defined in the StreamProvider abstract base class to provide stream input from a video file.
- Variables
_file_db (FileDatabase) – The file database of file source stream provider.
_file_object (cv2.VideoCapture) – The OpenCV VideoCapture object.
_file_path (str) – The video file path of file source stream provider.
_frame_counter (int) – The count of frames that have been read.
_max_frame (int) – The max frame count of file source stream provider.
- DEFAULT_TARGET_FPS = 25¶
- close() None ¶
See base class.
- property file_db: core.filedb.FileDatabase¶
The file database of file source stream provider.
- Type
core.filedb.FileDatabase
- open()¶
Open the stream for file source stream provider.
The method overrides the open method defined in the StreamProvider abstract base class. If the file path is not valid, or the file fails to open, the method raises FileNotFoundError or TypeError.
- Raises
FileNotFoundError – If the path name for video file is not found.
TypeError – If the VideoCapture object cannot be created.
- read_raw_frame() numpy.ndarray ¶
See base class.
- property target_fps: int¶
The target FPS of file source stream provider.
- Type
int
- verify() bool ¶
See base class.
- class core.stream.StreamProcessor(provider: core.stream.StreamProvider)¶
Bases:
object
The class to process stream.
- Variables
_provider (StreamProvider) – The stream provider to process.
- prepare() None ¶
Prepare the stream processor.
This function is used to prepare the stream processor, like verifying stream provider or registering the pipeline.
- property provider: core.stream.StreamProvider¶
The provider instance.
- Type
core.stream.StreamProvider
- class core.stream.StreamProvider(name: str, pathname: str)¶
Bases:
abc.ABC
An abstract base class for creating custom stream provider implementations.
The class serves as a blueprint for subclasses that need to implement verify, read_raw_frame, open, close methods for different types of stream providers.
- Variables
_name (str) – The name of stream provider.
_pathname (str) – The path name of stream provider.
_raw_size (Tuple[int, int]) – The raw size (width, height) of frame captured from stream.
_raw_fps (int) – The raw FPS from source.
_seq (int) – The sequence of stream provider.
_target_fps (int) – The target FPS.
- DEFAULT_FPS = 15¶
- DEFAULT_HEIGHT = 240¶
- DEFAULT_WIDTH = 320¶
- abstract close() None ¶
Close the stream.
The method is used to close the stream.
- Raises
NotImplementedError – If the subclasses don’t implement the method.
- property name: str¶
The name of stream provider.
- Type
str
- abstract open() None ¶
Open the stream.
The method is used to open the stream.
- Raises
NotImplementedError – If the subclasses don’t implement the method.
- property pathname: str¶
The path name of stream provider.
For example: - Camera: “/dev/video0” - File: “SampleVideo.mp4”
- Type
str
- property raw_fps: int¶
The raw FPS from source.
- Type
int
- property raw_size: Tuple[int, int]¶
The raw size (width, height) of frame captured from stream.
- Type
Tuple[int, int]
- abstract read_raw_frame() numpy.ndarray ¶
Get a frame from source.
This method is used to get a frame from source.
- Returns
A numpy.ndarray object representing the raw frame.
- Return type
numpy.ndarray
- Raises
NotImplementedError – If the subclasses don’t implement the method.
- property target_fps: int¶
The target FPS.
- Type
int
- abstract verify() bool ¶
Verify the provider’s measurement/quote/integrity.
This method is used to verify the provider’s measurement/quote/integrity.
- Returns
True if the verification succeeds, False otherwise.
- Return type
bool
- Raises
NotImplementedError – If the subclasses don’t implement the method.
- core.stream.create_stream_from_type(type_name: str, name: str, pathname: str) core.stream.StreamProvider ¶
Create StreamProvider instance according to type.
The method is used to create StreamProvider instance according to type.
- Parameters
type_name (str) – The type of stream provider to create.
name (str) – The stream name of stream provider to create.
pathname (str) – The path name of stream provider to create.
- Returns
The created stream provider.
- Return type
core.stream.StreamProvider
- Raises
ValueError – If the provider source type is invalid.
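The dispatch described for create_stream_from_type can be sketched as a lookup table from type name to provider class. Everything below is an illustration under stated assumptions: the classes are bare stand-ins for the documented ones (the real providers wrap cv2.VideoCapture), and the accepted type names "camera" and "file" are hypothetical, since the documentation does not list the valid values.

```python
from abc import ABC, abstractmethod


class StreamProvider(ABC):
    """Stand-in for the documented abstract base class (assumption)."""

    def __init__(self, name: str, pathname: str) -> None:
        self._name = name
        self._pathname = pathname

    @property
    def name(self) -> str:
        return self._name

    @property
    def pathname(self) -> str:
        return self._pathname

    @abstractmethod
    def open(self) -> None: ...


class CameraSource(StreamProvider):
    def open(self) -> None:
        # The real class opens the camera device; omitted in this sketch.
        pass


class FileSource(StreamProvider):
    def open(self) -> None:
        # The real class opens the video file; omitted in this sketch.
        pass


# Hypothetical type names; the actual accepted values are not documented here.
_PROVIDERS = {"camera": CameraSource, "file": FileSource}


def create_stream_from_type(type_name: str, name: str, pathname: str) -> StreamProvider:
    """Return a provider for type_name, or raise ValueError if it is unknown."""
    try:
        provider_cls = _PROVIDERS[type_name]
    except KeyError:
        raise ValueError(f"Invalid provider source type: {type_name!r}") from None
    return provider_cls(name, pathname)
```

A table lookup keeps the ValueError path in one place and lets a new provider type be registered without touching the function body.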
core.streambroker module¶
A Streambroker module.
This module provides an object-oriented design for a stream broker client that connects to a stream broker server and publishes frames to it.
- Classes:
StreamBrokerClientBase: An abstract base class for stream broker client. RedisStreamBrokerClient: A concrete class implementing the StreamBrokerClientBase for Redis server. KafkaStreamBrokerClient: A concrete class implementing the StreamBrokerClientBase for Kafka server.
- class core.streambroker.KafkaStreamBrokerClient¶
Bases:
core.streambroker.StreamBrokerClientBase
Kafka implementation for stream broker client.
This class implements the connect and publish_frame methods defined in the StreamBrokerClientBase abstract base class for the Kafka stream broker.
- Variables
_conn (KafkaProducer) – The Kafka connection object.
- connect(host='127.0.0.1', port=9092)¶
Implement the connect method for the Kafka stream broker client.
The method overrides the connect method defined in StreamBrokerClientBase abstract base class. If connection to the Kafka server fails and reconnection exceeds the limit, raise NoBrokersAvailable error.
- Raises
kafka.errors.NoBrokersAvailable – If connection to the Kafka server fails and reconnection exceeds the limit.
- publish_frame(topic: str, frame: core.frame.Frame) None ¶
See base class.
- class core.streambroker.RedisStreamBrokerClient¶
Bases:
core.streambroker.StreamBrokerClientBase
Redis implementation for stream broker client.
This class implements the connect and publish_frame methods defined in the StreamBrokerClientBase abstract base class for the Redis stream broker.
- Variables
_conn (redis.Redis) – The Redis connection object.
- connect(host: str = '127.0.0.1', port: int = 6379)¶
Implement the connect method for the Redis stream broker client.
The method overrides the connect method defined in StreamBrokerClientBase abstract base class. If connection to the Redis server fails and reconnection exceeds the limit, raise ConnectionError.
- Raises
redis.exceptions.ConnectionError – If connection to the Redis server fails and reconnection exceeds the limit.
- publish_frame(topic: str, frame: core.frame.Frame) None ¶
See base class.
- class core.streambroker.StreamBrokerClientBase¶
Bases:
abc.ABC
An abstract base class for stream broker client.
This class serves as a blueprint for subclasses that need to implement the connect and publish_frame methods for different types of stream broker servers.
- MAX_RECONNECTION_TIMES = 5¶
- abstract connect(host: str, port: int)¶
Connect to broker server.
This method connects to the stream broker server and attempts to reconnect when a connection error occurs.
- Parameters
host (str) – The host ip or hostname of broker server.
port (int) – The port of broker server.
- Raises
NotImplementedError – If the subclasses don’t implement the method.
- abstract publish_frame(topic: str, frame: core.frame.Frame) None ¶
Publish a frame to stream broker server for a topic.
- Parameters
topic (str) – The topic name to publish to.
frame (Frame) – The Frame to publish.
- Raises
NotImplementedError – If the subclasses don’t implement the method.
ValueError – If the topic or frame is None.
RuntimeError – If any error occurs while encoding the frame before publishing.
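The reconnect behaviour described for connect (retry on a connection error, then raise once the limit is exceeded) might look roughly like the loop below. This is a sketch, not the package's implementation: connect_with_retry, connect_once and delay_seconds are hypothetical names, the built-in ConnectionError stands in for the backend-specific exceptions, and it assumes the limit counts reconnections after the first attempt, which the documentation does not spell out.

```python
import time
from typing import Callable

MAX_RECONNECTION_TIMES = 5  # value documented on the abstract base classes


def connect_with_retry(connect_once: Callable[[], object],
                       max_reconnections: int = MAX_RECONNECTION_TIMES,
                       delay_seconds: float = 0.0) -> object:
    """Call connect_once, retrying on ConnectionError up to the limit.

    Assumption: one initial attempt plus at most max_reconnections retries.
    """
    reconnections = 0
    while True:
        try:
            return connect_once()
        except ConnectionError:
            if reconnections >= max_reconnections:
                # Limit exceeded: let the last error propagate to the caller.
                raise
            reconnections += 1
            time.sleep(delay_seconds)
```

In a real client, connect_once would wrap the backend call and the except clause would name that backend's own exception type.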
Module contents¶
The core package.
This package contains the core components of CNAP, including the implementations of engines, processors, stream brokers, etc.