Easy Request/Response Recipe for AMPS

  Dec 17, 2013   |      Eric Mericle

request / response configuration use cases

The power of AMPS filtering allows your application to receive the messages it wants to process, and none of the messages it doesn’t. Client-side filtering is no longer required with AMPS. The ability to use Perl Compatible Regular Expressions (PCRE) to define filters on a topic, a message, or both allows your application to achieve an unrivaled level of precision in message delivery.

One example of this precision is an application which implements topic filtering to facilitate request-response messaging between a monitoring system and an agent for a large scale heartbeat system. Heartbeats are a pattern often used to monitor the health of one or more systems or services in a group of systems. In a heartbeat system a monitoring application can send out a heartbeat message (request) to one or more monitoring agents. The monitoring agents then send a message back to the monitoring application (response) indicating the health of the system. If a monitoring agent fails to respond within a specified window of time, the monitoring application will assume that the service is unavailable.

In a previous article, we introduced the selectivity granted by the content filtering and topic filtering in AMPS. It is recommended to read the article to be familiar with the concepts introduced there before diving into this article.

In this post, we give a brief overview of how AMPS implements topic filtering, and then build upon that knowledge to describe how to implement a simple request / response heartbeat monitoring system using AMPS topic filtering. We also provide an example in Java, and discuss how to enhance the system to be more (or less) selective by using different topic filters.

AMPS Topic Filtering

With AMPS, a client can use a regular expression to subscribe to topics that match a given pattern. This feature can be used in two different ways:

  • to subscribe to topics without knowing the topic names in advance. This is known as a “greedy subscription” and can be used to subscribe to many topics simultaneously. For example, the subscription to the topic foo.* would match topics “foo”, “fool”, “food” and “footie”.

  • to subscribe to topics that only match a very selective pattern. The simplest example of this is matching on an exact pattern, for example - we only want to return messages from the topic “Client”.

For our request-response heartbeat monitoring system, we will be focusing on greedy subscriptions.

The messages themselves have no notion of a topic pattern. The topic for a given message is unambiguously specified using a literal string. From the publisher's point of view, it is publishing a message to a topic; it is never publishing to a topic pattern.

Subscription topics are interpreted as regular expressions if they include special regular expression characters. Otherwise, they must be an exact match. Some examples of regular expressions within topics are included in table below.

Topic Behavior
trade matches only “trade”.
^client.* matches “client”, “clients”, “client001”, etc.
.*trade.* matches “NYSEtrades”, “ICEtrade”, etc.

Setting up the Example

Heartbeat messages consist of two parts - the request message (requesting a heartbeat) and the response message (letting the requester know that it is still alive). In each of these messages, there is a “source” and a “destination”. In the heartbeat request message, the source is a heartbeat monitoring application and the destination is the agent that is being monitored by the monitoring application.

Monitor subscribes to ^A<-.*, while the agent subscribes to .*->A$

With this information, we want to conceive of a format for our message topic that can be used to control the flow of messages from the source to the destination, regardless if it is a request or a response message. This lends nicely to a topic format that follows:

<MACHINE_1> <ROUTING_INSTRUCTION> <MACHINE_2>

In this topic format, we can define MACHINE_1 to be a source or a destination depending on the ROUTING_INSTRUCTION we use. We will use the following convention to define the ROUTING_INSTRUCTION

  • When -> is the ROUTING_INSTRUCTION, this is a request message and the topic can be read as:

    <SOURCE> -> <DESTINATION>

    Where <SOURCE> is the monitoring application and <DESTINATION> is the heartbeat agent.

  • When <- is the ROUTING_INSTRUCTION, this is a response message and the topic can be read as:

    <DESTINATION> <- <SOURCE>

    Where <SOURCE> is the heartbeat agent and the <DESTINATION> is the monitoring application.

With these tokens our routing message topics would look like M->A for the request messages, and M<-A for the response messages - where M represents the heartbeat monitoring system, and A represents the heartbeat agent.

We have included an implementation of the `Monitor` and `Agent` classes in Java along with our sample, and will be presenting code samples from that project. The full project, along with instructions in the README file for building and running are available here: [01\_simple\_request\_response.zip](https://www.crankuptheamps.com/downloads/01\_simple\_request\_response.zip)

With our topic format defined, we’re ready to create the subscriptions for our monitoring system and our agent. The monitoring system will subscribe to topic ^M<-.* - which can be interpreted as “subscribe to all messages where the topic begins with the string M<-”.

Similarly, the agent will subscribe to the topic ->A$ - which can be interpreted as “subscribe to all messages where the topic ends with the string ->A”.

Regarding our message format, it's worth pointing out is that we chose to implement our Request/Response system using topic filtering, as opposed to content filtering. This decision is based entirely on the premise we would like to keep the routing rules in the message's topic separate from the message contents. In our particular example, our routing doesn't need to know anything about our message contents when making decisions about delivery.

An Example

By now you should see the beginnings of how such a system would work, but let’s walk through the lifespan of a single request-response to see the system in action!

The code samples included below come from the included demonstration. These are used to highlight some of the features of the heartbeat request-response system. Note that the Monitor and Agent classes are implemented in their entirety in the included demonstration. Only highlights from those classes are shown here.

To begin the lifespan of the heartbeat message, the system is put into motion when the Monitor class (M) publishes a heartbeat with a timestamp message to topic M->A.

// Monitoring System publish heartbeat message.
  Date d = new Date();
  SimpleDateFormat sdf = new SimpleDateFormat(
    "MM/dd/yyyy h:mm:ss a");
  msg = "Heartbeat!  Timestamp: " + sdf.format(d);
  System.out.println(
             "**************************\n" +
             "Monitor - sending message:" +
             "\n\tmessage: " + msg + "\n" +
             "**************************\n");
  monitorClient.publish("M->A", msg);

The Agent class (A)- which is subscribed to Topic ->A$ - receives the message, recognizes this is a request topic because of the presence of the -> token, parses the prefix of the Topic field, and then constructs a response message to be returned to M. A publishes the response message to topic M<-A.

// Agent subscription.
  AgentMessageHandler mh = new AgentMessageHandler();
  agent.subscribe(mh, "->A$", 10000);

...

  static class AgentMessageHandler implements MessageHandler{
     public void invoke(Message m){
        // Parse the incoming message.
        String topic = m.getTopic();
        TopicParse parse = TopicParser.parse(topic);
        System.out.println(
           "**************************\n" +
           "Agent - received message:" +
           "\n\t type: " + 
              (parse.type == "<-" ? "response" : "request") +
           "\n\t message: " + m.getData() + "\n" +
           "**************************\n");
        try{
           // Construct and send the response message.
           Date d = new Date();
           SimpleDateFormat sdf = new SimpleDateFormat(
              "MM/dd/yyyy h:mm:ss a");
           String msg = "Response: " + sdf.format(d);
           System.out.println(
               "**************************\n" +
               "Agent - Sending response: " +
               "\n\t message: " + msg + "\n" +
               "**************************\n");
           agent.publish(parse.prefix + "<-" + 
              parse.postfix, msg);
        } catch(Exception ex){
           System.err.println("Agent exception: " + 
                 ex.toString());
           ex.printStackTrace();
        }
     }
  }

Monitor M - who is subscribed to Topic ^M<-.*- receives the response message, recognizes this as a response topic because of the presence of the <- symbol, parses the postfix of the Topic field to see that it is from A and completes processing the heartbeat.

// Monitoring System subscription.
  MonitorMessageHandler mh = new MonitorMessageHandler();
  monitorClient.subscribe(mh, "^M<-.*", 10000);

 ...

  static class MonitorMessageHandler implements MessageHandler{
     public void invoke(Message m){
        String topic = m.getTopic();
        TopicParse parse = TopicParser.parse(topic);
        System.out.println(
           "**************************\n" +
           "Monitor - received message:" +
           "\n\t type: " + 
              (parse.type == "<-" ? "response" : "request") +
           "\n\t message: " + m.getData() + "\n" +
           "**************************\n");
     }
  }

In our example, processing the message means that we write a message to the console noting that the heartbeat request and response message have been received. In a more sophisticated system, additional logging and processing could take place.

class TopicParser{
     void TopicParser(){;}

     public static TopicParse parse(String topic){
        TopicParse parse = new TopicParse();

        if (topic.indexOf("->") >= 0){
            // We have a request message
            parse.type = "->";
            parse.prefix = topic.substring(0, 
                topic.indexOf("->"));
            parse.postfix = topic.substring(
                topic.indexOf("->")+2, 
                topic.length());
        }
        else if (topic.indexOf("<-") >= 0){
            // We have a response message.
            parse.type = "<-";
            parse.prefix = topic.substring(0, 
                topic.indexOf("<-"));
            parse.postfix = topic.substring(
                    topic.indexOf("<-")+2, 
                    topic.length());
        }
        else{
            // We don't recognize this message type.
            parse.type = "unknown";
            parse.prefix = null;
            parse.postfix = null;
        }
        return parse;
    }
}

For completeness and to show how simple it is to create a class to parse the messages manually using our routing tokens, the TopicParser class has been included above. In our example, each of the three cases are enumerated in the if / else structure, and populate a TopicParse object, which is then returned as a result of the parse() method. The actions taken as a result of the parse function can easily be replaced with more advanced features, but that will be an exercise which is left to the reader.

Conclusion

While the example here is a simple one, it would be trivial to add even more powerful features such as: listener notifications when heartbeats are received / missed, processing messages once and only once, detectng messages received out of order. Through the powerful combination of AMPS Topic filtering, PCRE, and some creativity in combining these two features - we are able to architect a solution that provides a simple heartbeat request-response system in AMPS.

Next Steps

Try it yourself to see how easy request/response messaging can be with AMPS! Download the sample project in Java, along with instructions in the README file for building and running the sample: 01_simple_request_response.zip


Read Next:   Come and See 60East at the Trading Show with Diablo Technologies