New Feature: Keep It All Together With CorrelationId

  Feb 9, 2015   |      Dirk Myers


Say hello to a new feature! One of the best things about AMPS is the way that it keeps publishers completely independent from subscribers. Publishers don’t need to know how many subscribers are listening for a message, where they are, or even whether they’re connected at a given point in time. That flexibility pays off: once publishers are set up, you can use the same message stream for any number of different applications, without changing the publisher.

Every now and then, though, this decoupling has a disadvantage. Like helping subscribers identify which messages belong together, even if they’re on a different subscription. Or communicating metadata, like a response address, that isn’t part of the original message.

One way to do this is to enrich the message. With this approach, the publisher can add an extra field with the correlation information to the message before sending it. The subscriber then uses the extra field to figure out which messages go together. There are some real advantages to message enrichment: because the correlation information is part of the message, AMPS can filter on that field, which can increase the precision of your subscriptions.
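To make the enrichment approach concrete, here is a minimal sketch that uses only the Python standard library. The field name `correlation` and the helper names `enrich` and `correlation_of` are made up for illustration; nothing here is part of the AMPS API. Notice that both sides have to parse the payload, which is the cost discussed next.

```python
import json


def enrich(raw_message, correlation):
    """Return a copy of a JSON message with a correlation field added."""
    body = json.loads(raw_message)           # parse the original payload
    body["correlation"] = correlation        # hypothetical field name
    return json.dumps(body, sort_keys=True)  # reassemble for publishing


def correlation_of(raw_message):
    """Subscriber side: parse the message to recover the correlation value."""
    return json.loads(raw_message).get("correlation")


enriched = enrich('{"message":"hello, subscriber!"}', "reply-to-1")
print(enriched)
print(correlation_of(enriched))
```

Because the value now lives inside the message body, a content filter can select on it, at the price of the extra parse and reassemble step in `enrich`.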

There are some drawbacks to enrichment, though. You need to add that field to every message when you publish the message. If your AMPS publisher isn’t the original source of the message, this adds extra work because your publisher needs to parse and reassemble the message. Likewise, your subscriber may need that correlation information to figure out how to handle the message, which may mean parsing the message before parsing is required, or (even worse) parsing the message more than once. For lots of applications, enrichment adds minimal overhead, and the added filtering capability makes enrichment a great option. For other applications, though, it’s expensive or impossible to enrich messages. What then?

In AMPS 4.0, there’s another option. We’ve added an optional CorrelationId header for messages published to AMPS. The publisher sets the CorrelationId before sending the message. AMPS passes along the CorrelationId to the subscriber, without parsing, altering, changing, folding, spindling, or mutilating the CorrelationId.

Code it Up

Enough introduction. Let’s look at the code. Here, a simple publisher sets a value on the CorrelationId:

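import time
import AMPS

# Assumes `client` is an AMPS.Client that is already connected and logged on.

# Set up a subscription to wait for replies.

def get_replies(message):
    # Print each reply along with the topic it arrived on.
    print("Got a response %s on topic %s" %
          (message.get_data(), message.get_topic()))

client.subscribe(get_replies, "reply-to-[0-9]")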

# Publish messages

command = AMPS.Command("publish").set_topic("all-the-things")

# use the CorrelationId as a reply to topic
for i in range(1,10):
   command.set_correlation_id("reply-to-%d" % i)
   command.set_data('{"message":"hello, subscriber!"}')
   # because we don't check for a response, we use
   # execute_async with no message handler
   client.execute_async(command, None)

# wait for replies
while True:
   time.sleep(1)

First, the publisher sets up a subscription to process replies. It’s important to make sure that the subscription is running before sending any messages, because subscribers may begin replying before all of the messages are sent.

To send the messages, the publisher first creates a publish Command that will hold the common information for all of the publish requests. Each time through the loop, the publisher fills in a distinct CorrelationId and the message data, then publishes the message.

Once all the messages are published, the publisher waits for replies, each on a different topic communicated to the subscriber in the CorrelationId.
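The sample publisher simply sleeps forever. If you would rather stop once every reply has arrived, one option is to keep track of the reply topics you handed out. The sketch below is plain Python, not AMPS code: `ReplyTracker`, `expect`, `on_reply`, and `wait` are hypothetical names, and the second loop stands in for the message handler, which would call `tracker.on_reply(message.get_topic(), message.get_data())`.

```python
import threading


class ReplyTracker(object):
    """Tracks which reply topics are still outstanding (hypothetical helper)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._pending = set()
        self._done = threading.Event()
        self.replies = {}

    def expect(self, reply_topic):
        # Call before publishing the request that carries this CorrelationId.
        with self._lock:
            self._pending.add(reply_topic)
            self._done.clear()

    def on_reply(self, reply_topic, data):
        # Call from the subscription's message handler.
        with self._lock:
            if reply_topic not in self._pending:
                return False          # unexpected or duplicate reply
            self._pending.discard(reply_topic)
            self.replies[reply_topic] = data
            if not self._pending:
                self._done.set()      # last outstanding reply has arrived
            return True

    def wait(self, timeout=None):
        # True once every expected reply has arrived, False on timeout.
        return self._done.wait(timeout)


tracker = ReplyTracker()
for i in range(1, 10):
    tracker.expect("reply-to-%d" % i)

# Stand-in for replies arriving on the subscription.
for i in range(1, 10):
    tracker.on_reply("reply-to-%d" % i, '{"response":"hi, world!"}')

print(tracker.wait(timeout=1))
```

Registering each topic with `expect` before publishing mirrors the advice above: be ready for a reply before the request goes out.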

That’s all there is to it. AMPS doesn’t process the CorrelationId, so there’s no need to provide it in any particular format. For the subscriber, we just receive the messages and print what we get. Because, in this case, we use the CorrelationId as the topic to reply to, we just send a message back to the specified topic:

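# Assumes `client` is an AMPS.Client that is already connected and logged on.
for message in client.subscribe("all-the-things"):
   print("Correlation: '%s' on message '%s'" %
         (message.get_correlation_id(), message.get_data()))
   # Reply on the topic named by the CorrelationId.
   client.publish(message.get_correlation_id(),
                  '{"response":"hi, world!"}')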

The output of the subscriber is shown below:

Correlation: 'reply-to-1' on message '{"message":"hello, subscriber!"}'
Correlation: 'reply-to-2' on message '{"message":"hello, subscriber!"}'
Correlation: 'reply-to-3' on message '{"message":"hello, subscriber!"}'
Correlation: 'reply-to-4' on message '{"message":"hello, subscriber!"}'
Correlation: 'reply-to-5' on message '{"message":"hello, subscriber!"}'
Correlation: 'reply-to-6' on message '{"message":"hello, subscriber!"}'
Correlation: 'reply-to-7' on message '{"message":"hello, subscriber!"}'
Correlation: 'reply-to-8' on message '{"message":"hello, subscriber!"}'
Correlation: 'reply-to-9' on message '{"message":"hello, subscriber!"}'

And, once the subscriber replies to the messages, the publisher produces this output:

Got a response {"response":"hi, world!"} on topic reply-to-1
Got a response {"response":"hi, world!"} on topic reply-to-2
Got a response {"response":"hi, world!"} on topic reply-to-3
Got a response {"response":"hi, world!"} on topic reply-to-4
Got a response {"response":"hi, world!"} on topic reply-to-5
Got a response {"response":"hi, world!"} on topic reply-to-6
Got a response {"response":"hi, world!"} on topic reply-to-7
Got a response {"response":"hi, world!"} on topic reply-to-8
Got a response {"response":"hi, world!"} on topic reply-to-9

It’s just that simple.

Are there provisos? Caveats? Quid pro quos?

No! Absolutely not! It’s just that simple. Well, most of the time, anyway.

There are a few things you should know about how AMPS treats CorrelationId. These mostly boil down to a simple principle of “do the right thing when the right thing is clear, otherwise do nothing”.

  • For SOW records, the CorrelationId of the record is the CorrelationId of the most recent message that updated the record. Exactly what you would get if the CorrelationId were a field in the SOW. What if there’s no CorrelationId on that SOW record? Then AMPS doesn’t provide one – the message from AMPS doesn’t include that header.

  • For transaction log replay, AMPS includes the CorrelationId on the replayed messages. To make this possible, AMPS stores the CorrelationId for a message in the transaction log when the message includes a CorrelationId.

  • For delta publish, the CorrelationId of the record is updated if there’s a CorrelationId on the update. Otherwise, AMPS preserves the existing CorrelationId.

  • For delta subscribe, AMPS provides the CorrelationId on the message if the record in the SOW has a CorrelationId, or if the new publish adds one. Otherwise, no CorrelationId. This is the same principle as SOW records.

  • For views (including JOINs and aggregations), AMPS never provides a CorrelationId. In this case, the results of the view might come from messages that have different CorrelationId values. Which one is the best? Who’s right and who’s wrong? How can AMPS choose a favorite? There’s no good answer, so AMPS doesn’t provide a CorrelationId.

  • For out-of-focus messages, the CorrelationId AMPS provides depends on why the message went out of focus. If the message has gone out of focus because it no longer matches the subscription, AMPS provides the CorrelationId of the update. If the message has been deleted, or the subscriber is no longer entitled to see the new message, AMPS provides the CorrelationId of the previous message.

  • For topics where AMPS creates messages, such as /AMPS/ClientStatus, AMPS never provides a CorrelationId. Since AMPS doesn’t use the contents of the CorrelationId at all, there’s no good answer for how to fill those in.
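The SOW and delta publish rules in the list above are easier to see in a toy model. The sketch below is an in-memory illustration only: `SowModel` and its methods are hypothetical, it is not how AMPS is implemented, and the behavior of a full publish without a CorrelationId is our reading of the first bullet rather than something spelled out in detail.

```python
class SowModel(object):
    """Toy in-memory model of the SOW rules above. This is not AMPS code."""

    def __init__(self):
        self._records = {}   # key -> (data dict, CorrelationId or None)

    def publish(self, key, data, correlation_id=None):
        # Full publish: the record takes the CorrelationId of this message,
        # which is "none" when the message does not carry one (assumption).
        self._records[key] = (dict(data), correlation_id)

    def delta_publish(self, key, changes, correlation_id=None):
        # Delta publish: merge the changed fields and keep the existing
        # CorrelationId unless the update supplies a new one.
        data, old_id = self._records.get(key, ({}, None))
        merged = dict(data)
        merged.update(changes)
        new_id = correlation_id if correlation_id is not None else old_id
        self._records[key] = (merged, new_id)

    def query(self, key):
        # Returns (data, CorrelationId) for the current record.
        return self._records[key]


sow = SowModel()
sow.publish("order-1", {"qty": 10}, correlation_id="batch-A")

sow.delta_publish("order-1", {"qty": 12})               # no CorrelationId
print(sow.query("order-1")[1])                          # still batch-A

sow.delta_publish("order-1", {"status": "filled"}, correlation_id="batch-B")
print(sow.query("order-1")[1])                          # now batch-B
```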

Keep it All Together

That’s the story of CorrelationId. It’s a simple feature that does one thing, only one thing, and does it well.

But what will you do with it? How do you use it to keep track of messages and get them where they’re going? That’s up to you. Because AMPS doesn’t change the CorrelationId, you can put whatever routing or processing information you need to in that field. The sky’s the limit[1]!
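As one example of what "whatever you need" might look like, here is a sketch that packs a small dictionary of routing information into a single opaque token and unpacks it on the other side. The helper names `pack_correlation` and `unpack_correlation`, and the keys in the dictionary, are hypothetical. Base64 is used here only as a cautious way to keep the token to a small, predictable character set; whether that is needed is an assumption, so check the AMPS documentation for your version.

```python
import base64
import json


def pack_correlation(info):
    """Encode a small dict as a compact, opaque token."""
    raw = json.dumps(info, sort_keys=True, separators=(",", ":"))
    return base64.urlsafe_b64encode(raw.encode("utf-8")).decode("ascii")


def unpack_correlation(token):
    """Reverse of pack_correlation."""
    raw = base64.urlsafe_b64decode(token.encode("ascii"))
    return json.loads(raw.decode("utf-8"))


token = pack_correlation({"reply_to": "reply-to-3", "request": 42})
print(token)
print(unpack_correlation(token))
```

A publisher would pass `token` to `command.set_correlation_id(...)`, and a subscriber would hand the result of `message.get_correlation_id()` to `unpack_correlation`.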


[1] Within some common sense guidelines. The CorrelationId is part of the message headers sent to AMPS and is stored as part of a SOW record. The precise limits for those depend on your configuration and the data involved, but there’s no special limit on a CorrelationId.

