Distributed Publish & Subscribe for IoT
Hello world


Prerequisites

#include <dps/dbg.h>
#include <dps/dps.h>

The first step in creating a DPS application is to include the necessary header files.

Creating a node

const char *separators = "/.";
DPS_KeyStore* keyStore = NULL;
const DPS_KeyId* keyId = NULL;
DPS_Node* node = DPS_CreateNode(separators, keyStore, keyId);
if (!node) {
goto Exit;
}

Each entity in DPS is represented by a DPS_Node. The node may be a publisher, subscriber, both, or neither. For this example, we're going to be creating publisher and subscriber nodes.

Creating a node requires three parameters: the topic separators, a key store, and a key identifier. For now we're only concerned with the separators. Key stores and identifiers are covered later when discussing how to secure communications.

The separators parameter is a string containing the characters used as topic level separators. Providing /. as the separators parameter value allows both / and . as separators.

See also
DPS_SetNodeData(), DPS_GetNodeData()

Starting a node

DPS_Status ret = DPS_StartNode(node, mcastPub, NULL);
if (ret != DPS_OK) {
goto Exit;
}

Once created, a node must be started. Starting a node enables it to begin sending and receiving DPS messages in the network.

For this example, we are going to be sending and receiving multicast publications so we enable both and let DPS assign the listening port.

See also
DPS_MCAST_PUB_DISABLED, DPS_GetListenAddress()

Publishing

Creating a publication

if (!pub) {
goto Exit;
}
const char* topics[] = {
"a/b/c/d"
};
size_t numTopics = A_SIZEOF(topics);
int noWildCard = DPS_FALSE;
ret = DPS_InitPublication(pub, topics, numTopics, noWildCard, NULL, NULL);
if (ret != DPS_OK) {
goto Exit;
}

Each publication in DPS is represented by a DPS_Publication. Each publication has a set of topics, a UUID, and a sequence number. In this example we are creating a publication with one topic, a/b/c/d. The UUID is assigned by DPS and the sequence number will be incremented each time we publish.

The noWildCard parameter is used by the publisher to control whether a subscription is required to match the publication's topics exactly or can use wildcards to match the topics. If we set noWildCard to DPS_TRUE then only a subscription to a/b/c/d will receive this publication. This allows the publisher to prevent publications being sent to catchall subscriptions such as +/#. Since we set noWildCard to DPS_FALSE here, subscriptions to a/#, a/+/+/d, or similar variations will receive this publication.

Both the publication's key identifier and acknowledgement handler are set to NULL here; they are covered in later sections.

See also
DPS_SetPublicationData(), DPS_GetPublicationData(), DPS_PublicationGetNode(), DPS_PublicationGetUUID(), DPS_PublicationGetSequenceNum()

Sending a publication

const char* payload = "Hello";
size_t numPayloadBytes = strlen(payload) + 1;
int16_t ttl = 0;
ret = DPS_Publish(pub, (const uint8_t*)payload, numPayloadBytes, ttl);
if (ret != DPS_OK) {
goto Exit;
}

Once created and initialized with a set of topics, application payloads may be sent. Payload data is simply an array of bytes in DPS, no assumptions are made with regards to the payload format.

In this example the ttl parameter is zero, indicating that the publication will be sent best-effort to all active subscribing nodes. A non-zero ttl is referred to as a retained publication and is covered later.

A publisher may send additional publications via the same DPS_Publication. Each additional send increments the sequence number of the publication.

Subscribing

Creating a subscription

const char* topics[] = {
"a/b/c/d"
};
size_t numTopics = A_SIZEOF(topics);
DPS_Subscription* sub = DPS_CreateSubscription(node, topics, numTopics);
if (!sub) {
goto Exit;
}

Each subscription in DPS is represented by a DPS_Subscription. In this example we are creating a subscription with one topic with no wildcards, a/b/c/d.

Wildcards may be used to match a broader set of topics. A + matches any single topic level, and a # matches all topic levels from that level on. In this instance since the publisher is allowing wildcard matching, the subscriber could use either a/b/+/d or a/# (among others) as the topic and still receive the publication.

A subscription may also be created with multiple topics. The publication must include all of the topics to be received.

See also
DPS_SetSubscriptionData(), DPS_GetSubscriptionData(), DPS_SubscriptionGetNode()

Receiving a publication

ret = DPS_Subscribe(sub, PublicationHandler);
if (ret != DPS_OK) {
goto Exit;
}

Publications are received asynchronously. The first step in receiving a publication is to provide the publication handler to DPS and start the subscription. The publication handler will be called for each received publication.

Note
Each instance of DPS_Node creates and runs its own thread. The lifetime of this thread is the same as the lifetime of the node. The publication handler is dispatched from this thread.
static void PublicationHandler(DPS_Subscription* sub, const DPS_Publication* pub,
uint8_t* payload, size_t numPayloadBytes)
{
size_t i;
for (i = 0; i < DPS_SubscriptionGetNumTopics(sub); ++i) {
const char* topic = DPS_SubscriptionGetTopic(sub, i);
DPS_PRINT("subscription topic[%ld]=%s\n", i, topic);
}
const DPS_UUID* uuid = DPS_PublicationGetUUID(pub);
DPS_PRINT("uuid=%s\n", DPS_UUIDToString(uuid));
uint32_t n = DPS_PublicationGetSequenceNum(pub);
DPS_PRINT("sequence number=%d\n", n);
for (i = 0; i < DPS_PublicationGetNumTopics(pub); ++i) {
const char* topic = DPS_PublicationGetTopic(pub, i);
DPS_PRINT("publication topic[%ld]=%s\n", i, topic);
}
DPS_PRINT("payload=%.*s\n", numPayloadBytes, payload);
}

This publication handler exercises the APIs for retrieving the subscription and publication information.

Acknowledging

Acknowledgements provide an optional means for subscribers to reply to publications. For example, they may be used when the publication is logically a request and the acknowledgements are responses. Similar to publications, acknowledgements may include an application payload, and no assumptions are made by DPS with regards to the acknowledgement payload format.

Requesting an acknowledgement

if (!pub) {
goto Exit;
}
const char* topics[] = {
"a/b/c/d"
};
size_t numTopics = A_SIZEOF(topics);
int noWildCard = DPS_FALSE;
ret = DPS_InitPublication(pub, topics, numTopics, noWildCard, NULL,
AcknowledgementHandler);
if (ret != DPS_OK) {
goto Exit;
}

Requesting an acknowledgement is identical to Creating a publication, with the addition of the DPS_AcknowledgementHandler.

Sending an acknowledgement

const char* payload = "World";
size_t numPayloadBytes = strlen(payload) + 1;
DPS_Status ret = DPS_AckPublication(pub, (const uint8_t*)payload, numPayloadBytes);
if (ret != DPS_OK) {
goto Exit;
}
}

To determine if a publication has requested an ack, call DPS_PublicationIsAckRequested(). To send an acknowledgement, along with any optional acknowledgement payload, call DPS_AckPublication().

The pub parameter of the publication handler is only valid during the body of the handler. In order to acknowledge a publication after the handler has returned, the application must first call DPS_CopyPublication() to create a partial copy of the publication. The copy may be used after the handler returns.

Receiving an acknowledgement

static void AcknowledgementHandler(DPS_Publication* pub,
uint8_t* payload, size_t numPayloadBytes)
{
size_t i;
const DPS_UUID* uuid = DPS_PublicationGetUUID(pub);
DPS_PRINT("uuid=%s\n", DPS_UUIDToString(uuid));
uint32_t n = DPS_PublicationGetSequenceNum(pub);
DPS_PRINT("sequence number=%d\n", n);
for (i = 0; i < DPS_PublicationGetNumTopics(pub); ++i) {
const char* topic = DPS_PublicationGetTopic(pub, i);
DPS_PRINT("publication topic[%ld]=%s\n", i, topic);
}
DPS_PRINT("payload=%.*s\n", numPayloadBytes, payload);
}

Acknowledgements are received asynchronously. The acknowledgement handler will be called for each received acknowledgement.

This acknowledgement handler exercises the APIs for retrieving the publication information associated with the acknowledgement.

Note
The acknowledgement handler is dispatched from the DPS_Node's thread.