Metadata Magic with New AMPS Functions

  Jan 22, 2019   |      Dirk Myers


From the beginning, AMPS has been content aware. Most AMPS applications use content filtering, and features like the State-of-the-World, delta messaging, aggregation, and message enrichment all depend on AMPS being able to parse and filter messages.

The key to content filtering and message manipulation is the AMPS expression language. The expression language provides a format-independent way of working with content, and is extensible with server-side plugin modules.

As useful as the expression language is, until recently AMPS expressions could only refer to:

  • The contents of the message itself (using XPath-based identifiers), or
  • Context-free functions (like UNIX_TIMESTAMP())

These two kinds of references are enough for working with data, but for monitoring and administration, it’s sometimes more important to be able to describe the message itself, rather than being limited to the content of the message. For example, it was difficult to filter a subscription to just the publishes submitted by a specific publisher, from a specific set of IP addresses, or below a certain payload size.

Enter the client and message functions of AMPS! With these functions, you can write expressions that describe the message itself or the connection that published the message. These aren’t necessarily functions that every application needs. On the other hand, if you’ve ever tried to quickly find whether a message has been read from a SOW topic, or you want to get all the messages that a specific publisher is submitting, the new functions make magic possible!

In this post, I’ll show how to use some of these functions to answer some common requests. (Full descriptions of these functions are available in the AMPS User Guide.)

The post will focus on the following functions:

Function name       Description
LAST_UPDATED()      The last time a message in a SOW topic was published to.
MESSAGE_SIZE()      The size, in bytes, of the data within a message.
REMOTE_ADDRESS()    The address (typically IP address) of the connection executing the command.
USER()              The authenticated user name of the user executing the command.

Finding Older Messages

One common request is to find messages in a SOW topic that haven’t been updated for a certain interval of time. Before the LAST_UPDATED() function was available, running this query depended on adding a timestamp field to the data (typically through enrichment).

Now, though, we can use the LAST_UPDATED() function to access the last time the record was updated. Combined with the UNIX_TIMESTAMP() function, it’s simple to write a query that finds records that haven’t been updated in the last five minutes. Using the spark utility, all we have to do is provide the filter UNIX_TIMESTAMP() - LAST_UPDATED() > 300.

$ spark sow -server myserver:port -type json -topic sampleTopic \
            -filter 'UNIX_TIMESTAMP() - LAST_UPDATED() > 300'

For each message in the topic, AMPS subtracts the timestamp for the last time the message was updated from the current timestamp. If the difference is more than 300 (300 seconds, that is, five minutes), the filter is true and AMPS returns the message.

A filter like this could also be used in a sow_delete command to find and remove messages that haven’t been active in a certain interval of time.
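
To make the arithmetic concrete, here is a minimal Python sketch that builds the same filter text for an arbitrary interval and mimics the comparison locally. The helper names (stale_filter, is_stale) and the sample records are made up for illustration and are not part of AMPS. The local check only models the expression, and assumes both timestamps are in seconds, as the 300-second example above implies.

```python
import time

def stale_filter(max_age_seconds):
    """Build the AMPS filter text for records older than max_age_seconds."""
    return f"UNIX_TIMESTAMP() - LAST_UPDATED() > {int(max_age_seconds)}"

def is_stale(last_updated, max_age_seconds, now=None):
    """Local model of the filter: True when the record's age exceeds the limit."""
    if now is None:
        now = time.time()
    return now - last_updated > max_age_seconds

# Hypothetical records: key -> last update time, in seconds since the epoch.
now = 1_548_115_200
records = {"a": now - 10, "b": now - 301, "c": now - 300}

print(stale_filter(5 * 60))
print([key for key, ts in records.items() if is_stale(ts, 300, now)])
```

In this model only record "b" matches. Record "c" is exactly 300 seconds old, and the comparison is strictly greater-than, so it is not returned.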

Query Versus Subscription

Notice that for the command above, I used sow rather than subscribe or sow_and_subscribe. There’s an important reason for this: AMPS evaluates filters against a message when a sow query runs, or when there’s a change to the message. That means that a filter like UNIX_TIMESTAMP() - LAST_UPDATED() > 300 is very useful for a query, but isn’t very useful for a subscription.

Here’s why: imagine a message hasn’t been updated in 298 seconds when someone issues a sow_and_subscribe with this filter. The message doesn’t match during the sow part of the command, so it isn’t returned with the query results. After 2 more seconds, the message would match the filter. However, AMPS doesn’t re-evaluate the message against the filter until the message changes, so the subscription doesn’t receive the message. (If someone does publish to the message, that will reset the LAST_UPDATED() value, and the message would no longer match.)
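
The scenario above can be sketched as a toy model in Python. This is not AMPS code: the ToySubscription class is a hypothetical stand-in that only encodes the rule described here, namely that the filter is evaluated once when the query runs and again only when a message is published.

```python
MAX_AGE = 300  # seconds, as in the filter above

def matches(now, last_updated):
    """Local stand-in for UNIX_TIMESTAMP() - LAST_UPDATED() > 300."""
    return now - last_updated > MAX_AGE

class ToySubscription:
    """Toy model: the filter runs once at query time, then only on publishes."""

    def __init__(self, records, now):
        self.records = dict(records)  # key -> last update time (seconds)
        # The "sow" part: evaluate every record once, at query time.
        self.delivered = [k for k, ts in self.records.items() if matches(now, ts)]

    def tick(self, now):
        """Time passing alone triggers no filter evaluation."""
        pass

    def publish(self, key, now):
        """A publish resets the timestamp, then the filter is evaluated."""
        self.records[key] = now
        if matches(now, self.records[key]):
            self.delivered.append(key)

sub = ToySubscription({"order-1": 1000 - 298}, now=1000)  # 298 seconds old
sub.tick(now=1002)                # 300 seconds old now, but nothing is evaluated
sub.tick(now=1100)                # well past the limit, still nothing evaluated
sub.publish("order-1", now=1101)  # age resets to 0, so the filter is false
print(sub.delivered)
```

The delivered list stays empty throughout: the record is too young at query time, is never re-checked while it ages, and is young again by the time a publish triggers an evaluation.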

Message Provenance

In some applications, it’s very important to be able to reliably identify the source of a change. In earlier versions, the most commonly used pattern was to require each publisher to annotate changes with the user name of the person making the change, while using an entitlement filter to guarantee that the annotations matched the expected user name.

Now, you can use the USER() function with enrichment to have AMPS directly annotate each change. This makes things simpler for publishers, and can also provide information that was not previously verifiable.

For example, the following enrichment field annotates each publish with the authenticated user name of the connection submitting the publish and captures the address from which the publish originated.

         <Field>CONCAT(USER(), ' connected from ', REMOTE_ADDRESS())
                AS /lastChangedBy</Field>

Since enrichment modifies the message before the message is written to the transaction log, the enrichment is also captured in the transaction log for an embedded audit trail.
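
As a rough illustration of the effect, the following Python sketch models what a publish might look like after this kind of enrichment. It runs locally and does not talk to AMPS. The message fields, the user name, and the address are hypothetical, and the exact text that REMOTE_ADDRESS() returns is an assumption here.

```python
import json

def enrich(message, user, remote_address):
    """Local model of the enrichment field: adds lastChangedBy to a copy."""
    enriched = dict(message)
    # Mirrors CONCAT(USER(), ' connected from ', REMOTE_ADDRESS()).
    enriched["lastChangedBy"] = f"{user} connected from {remote_address}"
    return enriched

# Hypothetical publish and connection details, for illustration only.
published = {"id": 42, "status": "filled"}
stored = enrich(published, user="trader7", remote_address="10.0.0.15")
print(json.dumps(stored))
```

The publisher sends only the id and status fields. In this model, the annotation is added on the server side, which is why the publisher cannot misreport it.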

Calculating Message Size Statistics

An important factor in capacity planning is understanding message sizes. Estimates before a system goes into production are helpful, but as a system comes online, more precise data is available and can be used to validate (or correct) the initial estimates. Likewise, for views, it can be inconvenient to persist the contents of the view just to estimate average sizes.

With the MESSAGE_SIZE() function and aggregated subscriptions, it’s easy to calculate metrics over a topic in the State of the World.

For example, the following spark query computes basic message size metrics for a SOW topic or view:

spark sow -topic test -server localhost:9007/amps/json -opts \
'projection=[sum(message_size()) as /totalSize,\
             max(message_size()) as /biggestMessage,\
             avg(message_size()) as /averageSize,\
             stddev_samp(message_size()) as /stdDev],\
grouping=[/noSuchField]'

Notice that, since we want to calculate a single value for the entire topic, the spark command deliberately sets grouping to a field that isn’t in the messages (/noSuchField here is a placeholder name; any field that is absent from the messages works). The result is that all the messages in the topic are in the same group, and the metrics are computed over the entire topic.
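
For a sense of what those four aggregates mean, here is a small Python sketch that computes the same figures over a handful of payloads. It is a local model only: the payloads are made up, the size_metrics helper is hypothetical, and it assumes that MESSAGE_SIZE() corresponds to the byte length of the message data and that stddev_samp is the sample standard deviation (n - 1 denominator), as its name suggests.

```python
import json
import statistics

def size_metrics(payloads):
    """Compute total, max, mean, and sample std dev of payload sizes in bytes."""
    sizes = [len(p) for p in payloads]
    return {
        "totalSize": sum(sizes),
        "biggestMessage": max(sizes),
        "averageSize": statistics.mean(sizes),
        # Sample standard deviation; needs at least two messages.
        "stdDev": statistics.stdev(sizes),
    }

# Hypothetical JSON payloads standing in for the messages in a topic.
payloads = [json.dumps(m).encode("utf-8") for m in (
    {"id": 1},
    {"id": 2, "note": "hello"},
    {"id": 3, "note": "a longer note field"},
)]
print(size_metrics(payloads))
```

Comparing figures like these against the pre-production estimates is one way to check whether the capacity plan still holds.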

Data Up Your Sleeve

Just as every good magic trick relies on skill and knowledge, using the new metadata functions well also requires some understanding of how AMPS works and what functions are available when.

The simple rule to follow is that a metadata function returns meaningful results if AMPS has the information to provide, and NULL otherwise.

For example, AMPS can only provide a LAST_UPDATED() value for messages that are in the State of the World. If there’s no State of the World (that is, AMPS isn’t persisting messages in a way that they can be queried), then there’s no way for AMPS to calculate LAST_UPDATED(). Likewise, since AMPS only assigns bookmarks to messages that are stored in the transaction log, the BOOKMARK() function only returns a value for topics that are stored in the transaction log.

The AMPS User Guide lists the circumstances under which each of the functions returns a non-NULL value.

Tricks of the Trade

In this post, we’ve just scratched the surface of the functions available and the ways that those functions can be used.

Have a recipe that isn’t listed here? Know a great trick for monitoring AMPS with these functions? How will you use the new functions?

Let us know in the comments!
