Keeping State in AMPS, Rebooted

  Jun 15, 2016   |      Tom Martens

stateless 60east python bookmark replay

Keep track of where your subscription is at with this simple bookmark store. (Image by Simon Cocks, CC BY 2.0)

In this post, we will revisit the topic of extending the AMPS client to provide a bookmark store for stateless clients. This post responds to requests for a simple stateless store that does not provide all of the functionality of the local stores, but instead just makes it possible for an application with very simple needs to pick up a subscription after failover. The implementation that we discuss here is fairly limited, but it should provide a starting point for less restrictive implementations. Before we begin, I encourage you to read "No Filesystem? No Problem! Keeping State in AMPS" to gain a deeper understanding of bookmark stores.

Update: This post describes an approach that was used with older versions of the AMPS clients. Current client versions include a recovery point adapter interface that can be used to store recovery points. Current clients also include an out-of-the-box recovery point adapter that can be used with a SOW topic.

For current client versions, 60East recommends using a recovery point adapter, as described in "Bookmark State Without a Filesystem: Ultimate Director's Cut", rather than the approach in this post.


Maintaining a Bookmark Store in AMPS

This implementation of the bookmark store will do two things:

  1. Load state from AMPS during subscription or re-subscription.
  2. Update the bookmark store as messages are processed by the client.

These two operations maintain the point at which a particular subscription should resume on recovery.

As with the bookmark store in the previous post, we can avoid keeping state on the client by publishing the bookmark of each message to a State-of-the-World (SOW) topic as the subscriber processes it. It is worth noting that the instance that contains the store's SOW topic does not need to be the instance that the subscription is placed on.

Restrictions of this Implementation

As mentioned before, the implementation discussed in this post will be quite limited. The following restrictions will apply to this store:

  • Bookmark Live subscriptions are not supported.
  • Messages are required to be discarded in the exact order that they are received.
  • Replicated topics should not be used with this store.
  • You must use this store with an HAClient.
  • The client provided to the bookmark store MUST NOT be the client placing the subscription.
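The in-order restriction exists because the store keeps only one record per subscription. The following simulation (plain Python, no AMPS library; lost_on_recovery is a hypothetical helper and the bookmark strings are made up) shows how an out-of-order discard could move the recorded position past a message that was never processed.

```python
# Simulation only: plain Python, no AMPS library. A single "last write wins"
# value stands in for the one SOW record the store keeps per subscription.

def lost_on_recovery(received, discarded):
    """Return messages that were never discarded but sit at or before the
    recorded bookmark, so resuming from that bookmark would skip them."""
    if not discarded:
        return []
    recorded = discarded[-1]  # the store only remembers the latest discard
    cutoff = received.index(recorded)
    return [b for b in received[: cutoff + 1] if b not in discarded]


received = ["b1", "b2", "b3"]  # hypothetical bookmarks, in delivery order

# In order: b1 and b2 are discarded, then the process dies before b3.
print(lost_on_recovery(received, ["b1", "b2"]))  # []

# Out of order: b3 finishes before b2, then the process dies.
print(lost_on_recovery(received, ["b1", "b3"]))  # ['b2']
```

With in-order discards, nothing at or before the stored bookmark is left unprocessed; out of order, b2 would be silently skipped on recovery.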

Bookmark Store Messages

The messages that will be sent to the SOW bookmark store are of the following form:

{"clientName":"trackedClient",
 "subId":"1",
 "bookmark":"13948331409633568391|1465596069899000013|"}

For this example we are using JSON, but any message type can be used. For a simple bookmark store, we only need these 3 pieces of information because AMPS enforces the following 3 rules:

  1. The client name must be unique, as is the case any time you are using a transaction log.
  2. A subId must be unique to each client. The same subId can be used with different clients. If no subId is provided to AMPS, one is automatically assigned.
  3. The bookmark corresponds to a unique message in the transaction log. The bookmark value that we record in the bookmark SOW represents the last message processed by the subscriber.
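Producing and reading records of the shape shown above needs nothing beyond Python's standard json module. The helpers below, make_bookmark_record and parse_bookmark_record, are hypothetical names for illustration, not part of the AMPS client. Building the document with json.dumps rather than string formatting also takes care of quoting and escaping.

```python
import json


def make_bookmark_record(client_name, sub_id, bookmark):
    """Build the JSON document stored in the bookmark SOW topic."""
    return json.dumps({"clientName": client_name,
                       "subId": sub_id,
                       "bookmark": bookmark})


def parse_bookmark_record(data):
    """Return (subId, bookmark) from a stored record, or None if a field is missing."""
    record = json.loads(data)
    if "subId" in record and "bookmark" in record:
        return record["subId"], record["bookmark"]
    return None


doc = make_bookmark_record("trackedClient", "1",
                           "13948331409633568391|1465596069899000013|")
print(parse_bookmark_record(doc))
```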

A more complex bookmark store may require message fields to indicate if a message has been discarded, or if a persisted ack has been sent. Since we have designed this bookmark store for a very limited use case, we don’t need to worry about that here.

Now that we know what our messages will look like, it’s time to configure AMPS!

Configuring AMPS

The SOW needs to be configured so that we only have one record per clientName and subId pair. We do this by making these our Key values.

<SOW>
    <TopicDefinition>
        <Topic>/sample/bookmarkStore</Topic>
        <FileName>./data/sow/bookmark.sow</FileName>
        <MessageType>json</MessageType>
        <Key>/clientName</Key>
        <Key>/subId</Key>
    </TopicDefinition>
</SOW>
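Conceptually, declaring /clientName and /subId as the keys means that each publish to /sample/bookmarkStore replaces the previous record for that pair instead of adding a new one. The class below is a rough in-memory model of that behavior for illustration only; FakeSowTopic is a hypothetical name and this is not how AMPS actually stores SOW data.

```python
import json


class FakeSowTopic:
    """Hypothetical in-memory model of a SOW topic whose keys are
    /clientName and /subId. Not how AMPS actually stores SOW data."""

    def __init__(self):
        self.records = {}  # (clientName, subId) -> latest JSON document

    def publish(self, data):
        doc = json.loads(data)
        key = (doc["clientName"], doc["subId"])
        self.records[key] = data  # same key: the old record is replaced

    def query(self, client_name):
        """Return the current records for one client name."""
        return [data for (name, _), data in self.records.items()
                if name == client_name]


sow = FakeSowTopic()
for bm in ["b1", "b2", "b3"]:
    sow.publish(json.dumps({"clientName": "trackedClient", "subId": "1", "bookmark": bm}))
sow.publish(json.dumps({"clientName": "trackedClient", "subId": "2", "bookmark": "x1"}))

print(len(sow.records))  # 2: one record per (clientName, subId) pair
```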

Since we are using a JSON message type, the server must have a transport configured that is able to accept JSON messages. Note that just because we are using JSON messages for the bookmark store does not mean that the message type must be JSON for the bookmark subscription.

Working with the Bookmark Store Interface

Unlike the previous post, we will not be calling the method implementations of other stores. Instead, we will define each method in the sow_bookmark_store class. The following methods will be implemented:

  • set_server_version(version) Internally used to set the server version so that the store knows how to deal with persisted acks and calls to get_most_recent(subid).
    • Though this method is required, we will not be using it because this implementation requires that messages be processed in-order. As such, acks will not be processed.
  • get_most_recent(subid) returns the most recent bookmark from the store. This bookmark should be used for (re-)subscriptions.
  • is_discarded(message) is called for each arriving message to determine if the application has already seen this bookmark. If it has, then the message should not be reprocessed.
    • Since we are requiring that messages are processed in-order, and this store does not provide any duplicate detection, this will always return False.
  • log(message) is used to log a bookmark into the store and return the bookmark sequence number. The bookmark sequence number is the internal location where the store recorded the bookmark for this message.
    • Since we will only ever have one record per clientName and subId pair, the sequence number does not matter. Thus, we will always return 1.
  • persisted(subid, bookmark) marks all bookmarks up to the provided one as replicated to all replication destinations for the given subscription.
    • This is only used for Bookmark Live subscriptions and Replication. Since our sample bookmark store will support neither, we can leave this unimplemented.
  • discard_message(message) marks a message as seen by the subscriber.
  • discard(subid, seqnumber) is deprecated, so we will not be implementing it.
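For reference, the six methods listed above (leaving out the deprecated discard(subid, seqnumber)) can be written down as a typing.Protocol. This is only a documentation aid assembled from the list in this post, not a class exported by the AMPS Python client; as the plain object subclass used later suggests, the client appears to rely on these method names rather than on a base class.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class BookmarkStoreLike(Protocol):
    """Method names from this post's interface list. Documentation aid only."""
    def set_server_version(self, version: int) -> None: ...
    def get_most_recent(self, subid: str) -> str: ...
    def is_discarded(self, message: Any) -> bool: ...
    def log(self, message: Any) -> Any: ...
    def persisted(self, subid: str, bookmark: str) -> None: ...
    def discard_message(self, message: Any) -> None: ...


class NullStore:
    """Trivial object that satisfies the protocol structurally."""
    def set_server_version(self, version): pass
    def get_most_recent(self, subid): return "0"
    def is_discarded(self, message): return False
    def log(self, message): return "1"
    def persisted(self, subid, bookmark): pass
    def discard_message(self, message): pass


print(isinstance(NullStore(), BookmarkStoreLike))  # True
```

Note that a runtime isinstance check against a Protocol only verifies that the methods exist, not their signatures.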

Defining the sow_bookmark_store Class

Now we will implement the class for the SOW bookmark store. The bulk of the work is done when the class is initialized. The __init__ method takes three arguments, in the following order:

  1. bookmark_client: This is the client that will become our internal client for the bookmark store. It must be connected and logged on.
  2. topic: The SOW topic defined in the config that will function as our bookmark store.
  3. tracked_client_name: The name of the client whose bookmarks this store manages.

Again, the bookmark_client does not need to be connected to the same AMPS server as the subscriptions being tracked. The client corresponding to tracked_client_name must be an HAClient.

import json

import AMPS


class sow_bookmark_store(object):
    def __init__(self, bookmark_client, topic, tracked_client_name):
        """ Class for creating and managing a SOW bookmark store

        :param bookmark_client: The client that will become the bookmark store internal client
        :type bookmark_client: AMPS.HAClient

        :param topic: The SOW topic defined in the config that will be used for the bookmark store.
        :type topic: string

        :param tracked_client_name: The name of the client whose bookmarks we will be storing.
        :type tracked_client_name: string

        :raises AMPS.AMPSException: if the internal client fails to query the SOW.

        """
        self._internalClient = bookmark_client
        self._trackedName = tracked_client_name
        self._topic = topic
        # subId -> last bookmark processed for that subscription
        self._mostRecentBookmark = {}

        try:
            # One SOW query loads the records for every subId of the tracked client.
            for message in self._internalClient.sow(self._topic, "/clientName = '%s'" % self._trackedName):
                # Skip non-record messages in the query results (e.g. group_begin/group_end).
                if message.get_command() != 'sow':
                    continue

                data = message.get_data()
                bookmark_data = json.loads(data)

                if 'bookmark' in bookmark_data and 'subId' in bookmark_data:
                    self._mostRecentBookmark[bookmark_data['subId']] = bookmark_data['bookmark']
        except AMPS.AMPSException as aex:
            raise AMPS.AMPSException("Error reading bookmark store", aex)

The __init__ method is responsible for getting the most recent bookmark for every subId that belongs to tracked_client_name. This work happens in __init__ rather than in get_most_recent(subid) as a performance enhancement: instead of issuing a SOW query for each subId, we issue one SOW query and build a dictionary from the results. Doing the work in __init__ also means that the constructor raises an exception if the _internalClient is not able to reach the server.
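The get_command() != 'sow' check matters because, in our understanding, a SOW query result stream also carries non-record frames such as group_begin and group_end around the records. The sketch below replays the loading loop from __init__ against hypothetical FakeMessage and FakeClient classes (stand-ins for AMPS.Message and the internal client), so it runs without an AMPS server.

```python
import json


class FakeMessage:
    """Stand-in for AMPS.Message exposing only what __init__ uses."""
    def __init__(self, command, data=""):
        self._command, self._data = command, data

    def get_command(self):
        return self._command

    def get_data(self):
        return self._data


class FakeClient:
    """Stand-in whose sow() yields frames shaped like a SOW query result.
    The filter is ignored; the records are assumed to be pre-filtered."""
    def __init__(self, records):
        self._records = records

    def sow(self, topic, filter_expr):
        yield FakeMessage("group_begin")
        for record in self._records:
            yield FakeMessage("sow", json.dumps(record))
        yield FakeMessage("group_end")


def load_most_recent(client, topic, tracked_name):
    """Same loop as sow_bookmark_store.__init__, returning the dictionary."""
    most_recent = {}
    for message in client.sow(topic, "/clientName = '%s'" % tracked_name):
        if message.get_command() != "sow":
            continue  # skip group_begin / group_end frames
        record = json.loads(message.get_data())
        if "bookmark" in record and "subId" in record:
            most_recent[record["subId"]] = record["bookmark"]
    return most_recent


client = FakeClient([
    {"clientName": "haclient", "subId": "1", "bookmark": "b7"},
    {"clientName": "haclient", "subId": "2", "bookmark": "c3"},
])
result = load_most_recent(client, "/sample/bookmarkStore", "haclient")
print(result)  # {'1': 'b7', '2': 'c3'}
```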

Subscribing and Recovering

Upon subscribing or recovering, get_most_recent(subid) will be called. This method is responsible for returning the last bookmark processed for the corresponding subscription identifier.

    def get_most_recent(self, subid):
        """ Returns the most recent bookmark from the store that ought to be used for (re-)subscriptions.

        :param subid: The id of the subscription to check.
        :type subid: string

        :returns: mostRecentBookmark[subid] or '0'

        """
        # if we have a most recent value for that subId, then we'll return it
        # if not, we return EPOCH
        if subid in self._mostRecentBookmark:
            return self._mostRecentBookmark[subid]
        else:
            return '0'

This method will simply check the _mostRecentBookmark dictionary for the subid. If we find it, we will return the bookmark stored in the dictionary. If we do not find that key, then we assume that this is a brand new subscription. As such, we return EPOCH.
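The lookup is equivalent to dict.get with the EPOCH bookmark '0' as the default. A short check of both branches, using a plain dictionary in place of the store's _mostRecentBookmark (the EPOCH constant name is just for readability here):

```python
EPOCH = "0"  # bookmark value this store returns for an unknown subscription

most_recent_bookmark = {"1": "13948331409633568391|1465596069899000013|"}


def get_most_recent(subid):
    # Same result as the if/else in the post: the stored bookmark, or EPOCH.
    return most_recent_bookmark.get(subid, EPOCH)


print(get_most_recent("1"))   # the stored bookmark
print(get_most_recent("42"))  # '0': treated as a brand new subscription
```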

Publishing to the Bookmark Store

Since one of the requirements is that messages are processed in the order that they are received, we can treat every call to discard(message) as a signal that the subscriber has processed that message. To mark a message as processed, all we need to do is publish the bookmark of that message to the store using the message format described above.

    def discard_message(self, message):
        """ Mark a message as seen by the application.

        :param message: The message to mark as seen.
        :type message: AMPS.Message

        :raises AMPS.AMPSException: if the internal client cannot publish to the server.
        """
        subid = message.get_sub_id()
        bookmark = message.get_bookmark()
        # Ignore messages that cannot be tied to a subscription position.
        if bookmark is None or subid is None:
            return
        # json.dumps handles quoting and escaping of the field values.
        msg = json.dumps({"clientName": self._trackedName, "subId": subid, "bookmark": bookmark})
        try:
            self._internalClient.publish(self._topic, msg)
            # Update the local cache only after the publish succeeds.
            self._mostRecentBookmark[subid] = bookmark
        except AMPS.AMPSException as aex:
            raise AMPS.AMPSException("Error updating bookmark store", aex)

The subscription identifier and bookmark of the message passed to this method are retrieved by calling get_sub_id() and get_bookmark(), respectively, on the message object. Notice that the store method is named discard_message(message), not discard(message): your application still calls discard(message) on the client, and the client passes the message on to the store's discard_message(message). Before publishing, we check that both subid and bookmark are set. This keeps messages that should not affect the store out of it. For example, a subscriber that calls discard on every message it sees could pass in a message that does not contain a bookmark; if we saved that message to the store, we might record an empty or invalid bookmark and recover from the wrong point.
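The guard can be exercised without a server. In this sketch, FakeMessage and RecordingPublisher are hypothetical stand-ins for AMPS.Message and the internal client, and discard_message is a free-function copy of the method. A message with no bookmark is ignored, while a bookmarked message is published and cached.

```python
import json


class FakeMessage:
    """Stand-in for AMPS.Message with only the two getters used here."""
    def __init__(self, sub_id, bookmark):
        self._sub_id, self._bookmark = sub_id, bookmark

    def get_sub_id(self):
        return self._sub_id

    def get_bookmark(self):
        return self._bookmark


class RecordingPublisher:
    """Collects (topic, data) pairs instead of sending them to AMPS."""
    def __init__(self):
        self.published = []

    def publish(self, topic, data):
        self.published.append((topic, data))


def discard_message(client, topic, tracked_name, cache, message):
    """Mirrors sow_bookmark_store.discard_message from the post."""
    subid, bookmark = message.get_sub_id(), message.get_bookmark()
    if bookmark is None or subid is None:
        return  # nothing trustworthy to record
    client.publish(topic, json.dumps({"clientName": tracked_name,
                                      "subId": subid,
                                      "bookmark": bookmark}))
    cache[subid] = bookmark


pub, cache = RecordingPublisher(), {}
discard_message(pub, "/sample/bookmarkStore", "haclient", cache, FakeMessage("1", None))
discard_message(pub, "/sample/bookmarkStore", "haclient", cache, FakeMessage("1", "b1"))
print(len(pub.published), cache)  # 1 {'1': 'b1'}
```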

The log Method

For bookmark stores that will support out-of-order message processing, log(message) is responsible for assigning a sequence number to a bookmark, then publishing this information to the bookmark store. The method then returns the sequence number that it assigned to the message.

    def log(self, message):
        """ Log a bookmark to the store.

        :param message: The message to log in the store
        :type message: AMPS.Message

        :returns: The corresponding bookmark sequence number for this bookmark.

        """
        # since we only ever have one SOW record per _trackedName and subId pair, this can
        # always return '1'
        return '1'

In this bookmark store, we require that messages are processed in order. Since this is the case, we do not need to assign a unique sequence number to each message. Instead, we can always return '1', since there will be at most one record for each _trackedName and subscription identifier pair.

Checking if a Message is Discarded

In a bookmark store that supports out-of-order message processing, is_discarded(message) returns a boolean value indicating whether a message has been discarded. During replay, the client checks whether each arriving message has already been discarded before delivering it to the application. This prevents messages that have already been processed by the application from being delivered again.

    def is_discarded(self, message):
        """ Called for each arriving message to determine if the application has already seen this bookmark and
        should not reprocess it. Returns True if the bookmark should not be re-processed, False otherwise.

        :param message: The message to check
        :type message: AMPS.Message

        :returns: True or False

        """
        # since messages are being processed in order, we never see a discarded message.
        return False

As mentioned before, the bookmark store that we are building is designed to process messages in the order that they are received. As such, we will never have a situation where a message that is discarded will be sent to a subscriber. This being the case, we can always return False.

This concludes the methods that need to be implemented. However, two more methods must still exist on the class, even though they do nothing.

Unimplemented Methods

The first of these methods is set_server_version(version). The client uses this method to tell the store which server version it is working with, so that the store knows how to handle persisted acks. Because messages are processed in order, we do not need to concern ourselves with acks, so we can leave this method unimplemented.

    def set_server_version(self, version):
        """ Internally used to set the server version so the store knows how to deal
            with persisted acks and calls to get_most_recent().

        :param version: The version of the server being used.
        :type version: int

        """
        pass

The next method is persisted(subid, bookmark). This method is used to mark all bookmarks prior to the provided one as persisted. Persisted acks are necessary for Bookmark Live subscriptions, but our implementation will not support this functionality. With that in mind, we do not need to concern ourselves with implementing this method.

    def persisted(self, subid, bookmark):
        """ Mark all bookmarks up to the provided one as replicated to all replication destinations
        for the given subscription.

        :param subid: The subscription Id to which the bookmark applies
        :type subid: string

        :param bookmark: The most recent bookmark replicated everywhere.
        :type bookmark: string

        """
        # Bookmark Live and Replication are not supported, so this does nothing.
        pass

It is also worth noting here that there was previously an option to discard based on subscription identifier and sequence number. This method has been deprecated and should not be implemented.

Using the Store

To use the bookmark store we will need to follow these simple steps:

  1. Create a client for the bookmark store to use. This client must be connected and logged on to the instance that contains the SOW topic for the bookmark store. This need not be the instance that contains the topic the application will subscribe to.
  2. Construct an HAClient for your application to use.
  3. Set the bookmark store for the HAClient.
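  4. Call discard(message) on each message when your application is done with it.

import AMPS

# sow_bookmark_store is the class defined above; import it if it lives in another module.


class handler(object):
    def __init__(self, client):
        self._client = client

    def __call__(self, message):
        print(message.get_data())
        # Record this message's bookmark in the SOW bookmark store.
        self._client.discard(message)


# Internal client for the bookmark store; it must not be the subscribing client.
bkmrkchooser = AMPS.DefaultServerChooser()
bkmrkchooser.add("tcp://localhost:9007/amps/json")

bkmrkclient = AMPS.HAClient("bkmrk")
bkmrkclient.set_server_chooser(bkmrkchooser)
bkmrkclient.connect_and_logon()

# Application client whose subscription position is tracked.
chooser = AMPS.DefaultServerChooser()
chooser.add("tcp://localhost:9007/amps/json")

haclient = AMPS.HAClient("haclient")
# The tracked_client_name argument must match this client's name.
haclient.set_bookmark_store(sow_bookmark_store(bkmrkclient, "/sample/bookmarkStore", "haclient"))
haclient.set_server_chooser(chooser)
haclient.connect_and_logon()

# With "recent", the client asks the store for the most recent bookmark.
haclient.bookmark_subscribe(handler(haclient), "orders", "recent")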

Every time haclient processes a message via the handler class, self._client.discard(message) will be called. This will keep the bookmark store up-to-date with the messages processed by the subscriber.
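The round trip can be rehearsed offline. The sketch below is a condensed, in-memory imitation of the setup above, not the real client: Msg, MiniStore and FakeHAClient are hypothetical stand-ins, a module-level dictionary plays the bookmark SOW topic, and it assumes (as this post describes) that calling discard(message) on the client ends up in the store's discard_message(message). After a simulated restart, a fresh store reports the last processed bookmark as the point to resume from.

```python
# Hypothetical stand-ins: nothing here talks to AMPS. A module-level dict plays
# the role of the bookmark SOW topic so that it survives a simulated restart.

sow = {}  # (clientName, subId) -> bookmark


class Msg:
    """Minimal stand-in for AMPS.Message."""
    def __init__(self, sub_id, bookmark):
        self._sub_id, self._bookmark = sub_id, bookmark

    def get_sub_id(self):
        return self._sub_id

    def get_bookmark(self):
        return self._bookmark


class MiniStore:
    """Condensed version of sow_bookmark_store backed by the dict above."""
    def __init__(self, tracked_name):
        self._name = tracked_name
        # Like __init__ in the post: load every subId for this client at once.
        self._recent = {sid: bm for (name, sid), bm in sow.items() if name == tracked_name}

    def get_most_recent(self, subid):
        return self._recent.get(subid, "0")

    def discard_message(self, message):
        subid, bookmark = message.get_sub_id(), message.get_bookmark()
        if subid is None or bookmark is None:
            return
        sow[(self._name, subid)] = bookmark  # stands in for the publish
        self._recent[subid] = bookmark


class FakeHAClient:
    """Assumption: discard(message) is forwarded to the store."""
    def __init__(self, store):
        self._store = store

    def discard(self, message):
        self._store.discard_message(message)


class handler(object):
    """Same shape as the handler in the post, minus the print."""
    def __init__(self, client):
        self._client = client

    def __call__(self, message):
        self._client.discard(message)


# First run: two messages are processed, then the process "crashes".
on_message = handler(FakeHAClient(MiniStore("haclient")))
for bookmark in ["b1", "b2"]:
    on_message(Msg("1", bookmark))

# Restart: a fresh store reloads from the SOW and reports b2 for subId "1".
restarted = MiniStore("haclient")
print(restarted.get_most_recent("1"))  # b2
print(restarted.get_most_recent("2"))  # 0 (no record yet)
```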

Closing Thoughts

Most bookmark stores protect against duplicate message delivery. The store that we created in this post does not, and it can cause duplicate messages to be sent to your application on recovery. One way this can happen is when the AMPS instance containing the bookmark store becomes unavailable. This would result in the store implementation being unable to update the SOW that contains the bookmarks.

If the subscriber processes one message during such an outage and then restarts, the store is one message behind, and that message is delivered again. Each additional message processed before the restart puts the store one more message behind.
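The arithmetic of that lag is easy to model. In this simulation (plain Python; redelivered_after_restart is a hypothetical helper, and the outage position and bookmark names are invented for illustration), every publish to the bookmark SOW fails once the store's AMPS instance goes down, so the stored bookmark freezes at the last successful write and everything processed after it is redelivered on restart.

```python
def redelivered_after_restart(processed, outage_starts_at):
    """processed: bookmarks the subscriber handled, in order.
    outage_starts_at: index of the first message whose store update fails.
    Returns the messages that would be delivered again after a restart."""
    stored = None
    for i, bookmark in enumerate(processed):
        if i < outage_starts_at:
            stored = bookmark  # publish to the bookmark SOW succeeded
        # otherwise the publish failed and the stored bookmark stays put
    resume_index = 0 if stored is None else processed.index(stored) + 1
    return processed[resume_index:]


processed = ["b1", "b2", "b3", "b4"]
print(redelivered_after_restart(processed, 3))  # ['b4']  (one behind)
print(redelivered_after_restart(processed, 1))  # ['b2', 'b3', 'b4']
```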

With that in mind, if your application can tolerate duplicate messages, then this simple implementation should work for you!

Get the code

The bookmark store implementation, a configuration file that includes the SOW configuration, and a simple sample program can be downloaded here.

