First Things First: Priority Queues

Nov 13, 2019 | Eric Mericle

AMPS queues provide a simple way to distribute work across a group of consumers. By default, AMPS queues provide work in first-in-first-out fashion: that is, the oldest message in the queue is provided to subscribers first, then the next oldest, and so on. For some problems, though, the most important work needs to happen first, even if the most important message in the queue isn’t the oldest. For example, a monitoring system may want to log and analyze all events, but may want to process a critical alert immediately, even if there are hundreds of informational events ahead of that alert in the queue. Likewise, a compute grid may need to process time-critical requests before work that is less time-critical.

One approach for these problems would be to build a separate queue for the higher-priority messages, or to create dedicated “high priority” processors that use a content filter to retrieve only high-priority messages. Those approaches can work very well, but sometimes it’s important to have a solution that doesn’t require clients to be aware of the priority, or that allows a more flexible priority system rather than relying on a limited number of priority levels.

AMPS Priority Queues

With AMPS priority queues (available in 5.3.0 and higher releases), you can use the properties of a message to set its priority within the queue automatically. To do this, add a Priority element to the queue’s configuration. The Priority element tells AMPS how to calculate the priority of each message in the queue.

Unlike some other queueing systems, AMPS does not require the publisher to set an explicit priority (or even know that message delivery will be prioritized), and AMPS does not require a fixed set of priority levels or categories. This gives AMPS the ability to provide priority queues that are simple to set up, easy to use, and extremely flexible and adaptable as your application evolves. The details on how priority queues work are available in the Priority Queues section of the AMPS User Guide.

Here are a few examples.

A Basic Example

In the simplest case, creating a priority queue can be as easy as including the priority in the message, and using that field to set the delivery priority:

<Priority>/info/priority</Priority>

With a priority configuration like the one above, if you publish the following two messages to the AMPS queue, AMPS will deliver the second message from the queue before the first message:

{"message":1, "info":{"priority":1, "note":"Low priority message"}}

{"message":2, "info":{"priority":1000, "note":"High priority message"}}

Notice that, in AMPS, priority is always sorted so that the highest priority value is delivered first. (Even if you are used to considering “priority 1” or “priority 0” as meaning “highest priority”, AMPS delivers the message with priority 1000 first, because 1000 is greater than 1.)
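To make the ordering rule concrete, here is a small local model in plain Python. It does not use the AMPS client and is not how AMPS implements queues; it simply takes the two messages above and returns their ids in the order a highest-value-first queue would deliver them. The helper name delivery_order is hypothetical, and keeping equal priorities in publication order is an assumption of this sketch.

```python
import json

# The two messages from the example above, in publication order.
published = [
    '{"message":1, "info":{"priority":1, "note":"Low priority message"}}',
    '{"message":2, "info":{"priority":1000, "note":"High priority message"}}',
]

def delivery_order(raw_messages):
    """Return message ids in the order a highest-priority-first queue delivers them.

    Local model only: the priority is read from /info/priority and larger values
    come first. Equal priorities keep their publication order (an assumption of
    this sketch) because Python's sort is stable, even with reverse=True.
    """
    docs = [json.loads(m) for m in raw_messages]
    ordered = sorted(docs, key=lambda d: d["info"]["priority"], reverse=True)
    return [d["message"] for d in ordered]

print(delivery_order(published))  # [2, 1]
```

The larger value wins: message 2 (priority 1000) is returned before message 1 (priority 1).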

Complex Expressions

The Priority element can include any AMPS expression that produces an integer. This means that more complicated expressions are possible, and that, in many cases, you can use data that is already contained in the message to calculate the priority.

For example, you might want to process orders with the largest total value (the price of the item multiplied by the quantity ordered) before orders with lower values. In this case, there’s no need for a publisher to add an extra field to the message, since all of the information needed to calculate the priority is already there.

<!-- added to a Queue configuration element -->
<Priority>(/price * /quantity) as /priority</Priority>

The format for the expression is the same format used for constructing fields. All of the functions that you can use in a standard AMPS filter are available. AMPS calculates the priority when it starts tracking the message for queue delivery (that is, when the message enters the queue), and delivers the message when all higher-priority messages have been delivered.

It’s also important to remember that there are no predetermined priority levels: there’s no need to determine ahead of time whether a message where /price * /quantity is 400 is a high-priority message, a medium-priority message, or a low-priority message. With AMPS, that message is higher priority than a message where /price * /quantity is 399, and lower priority than a message where /price * /quantity is 401. There’s no need for complicated configuration or arbitrary boundaries between priority levels.
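The following sketch models a queue whose priority is computed from the message itself, as in the /price * /quantity example. It is a hypothetical local class built on Python’s heapq module, not an AMPS API. As described above, the priority is evaluated once, when the message is enqueued. Breaking ties by arrival order is an assumption of the sketch.

```python
import heapq
import itertools

class ComputedPriorityQueue:
    """Hypothetical local model of a queue prioritized by /price * /quantity.

    The priority is computed once, when the message is enqueued. Ties are
    broken by arrival order (an assumption of this sketch). Not an AMPS API.
    """

    def __init__(self):
        self._heap = []
        self._seq = itertools.count()  # arrival order, used only to break ties

    def enqueue(self, msg):
        priority = msg["price"] * msg["quantity"]
        # heapq is a min-heap, so negate the priority to pop the largest first.
        heapq.heappush(self._heap, (-priority, next(self._seq), msg))

    def dequeue(self):
        _, _, msg = heapq.heappop(self._heap)
        return msg

q = ComputedPriorityQueue()
# Three orders whose totals are 400, 399, and 401: no priority "levels" needed.
q.enqueue({"id": "a", "price": 40, "quantity": 10})
q.enqueue({"id": "b", "price": 133, "quantity": 3})
q.enqueue({"id": "c", "price": 401, "quantity": 1})

print([q.dequeue()["id"] for _ in range(3)])  # ['c', 'a', 'b']
```

Because the computed value is used directly as the sort key, 401 beats 400 and 400 beats 399 without any configuration of levels or boundaries.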

Complete Example

With that background, here’s a complete example that includes a Queue definition, a publisher and a consumer.

Below is a full example of a Queue definition that has higher values of the /pri field in a JSON message prioritized over lower values:

<SOW>
    <Queue>
        <Name>q</Name>
        <MessageType>json</MessageType>
        <!-- simply use the /pri field of
             published messages -->
        <Priority>/pri</Priority>
    </Queue>
</SOW>

<!-- This example assumes that q is recorded in
     the transaction log for the instance -->

Next, here is how to publish messages to the queue using the AMPS Python client:

import AMPS, json, os, random, time

COUNT = 10
client = AMPS.Client("pri-pub-%d" % os.getpid())
client.connect("tcp://localhost:9007/amps/json")
client.logon()

# Publish COUNT messages, each with a random priority from 0 to 4
for i in range(COUNT):
    client.publish("q", json.dumps({"id": i,
                                    "pri": random.randrange(5),
                                    "ts": time.time()}))

In the example above, we publish 10 messages to q, the priority queue configured earlier. Each message has a /pri field set to a random value from 0 to 4, and a timestamp (/ts) to help us track message order.

Consuming messages from the priority queue is identical to consuming regular queue messages. Simply subscribe to the queue topic, process, and ack the messages.

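# Assumes a connected client, set up as in the publisher above
for msg in client.subscribe("q"):
    body = json.loads(msg.get_data())
    print("Message with priority: %s" % body["pri"])
    msg.ack()  # tell AMPS this message has been processed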

Notice that when you start this subscriber after running the publisher, AMPS returns messages in priority order (higher values of the pri field are returned first) rather than in publication order (lowest id first, with the values rising).

For example, the subscriber above might produce output such as the following:

Message with priority: 4
Message with priority: 4
Message with priority: 4
Message with priority: 3
Message with priority: 2
Message with priority: 2
Message with priority: 1
Message with priority: 1
Message with priority: 0
Message with priority: 0

(At this point, the program would block waiting for the next message from the priority queue.)

If publishers continued to publish messages, new messages would also be delivered in priority order.
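Here is a local sketch of that behavior, with publishing and delivery interleaved: a message that arrives later with a higher priority is delivered ahead of lower-priority messages that were already waiting. PriorityQueueModel is a hypothetical class built on heapq, not part of the AMPS client, and the arrival-order tie-break is an assumption.

```python
import heapq
import itertools

class PriorityQueueModel:
    """Hypothetical local model: highest /pri first, ties in arrival order."""

    def __init__(self):
        self._heap = []
        self._seq = itertools.count()

    def publish(self, msg_id, pri):
        # Negate pri because heapq pops the smallest tuple first.
        heapq.heappush(self._heap, (-pri, next(self._seq), msg_id))

    def deliver(self):
        return heapq.heappop(self._heap)[2]

q = PriorityQueueModel()
for msg_id, pri in [(0, 1), (1, 3), (2, 0)]:
    q.publish(msg_id, pri)

delivered = [q.deliver()]          # id 1 (pri 3) goes first
q.publish(3, 4)                    # a later, higher-priority message arrives
delivered += [q.deliver() for _ in range(3)]
print(delivered)  # [1, 3, 0, 2]
```

Message 3 was published after messages 0 and 2, but its priority of 4 puts it ahead of both.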

Views on Queues

AMPS supports views that use a queue as the underlying topic. These views show the messages that are currently available for delivery to a consumer. Adding a materialized view over a queue topic is a common way to monitor the contents of a queue.

In the following example, the view lists each distinct priority value in the priority queue, along with the number of messages at that priority and the oldest timestamp among them. This simple view configuration makes it easy to check whether the priority queue is functioning properly.

<View>
    <Name>q-status</Name>
    <MessageType>json</MessageType>
    <UnderlyingTopic>q</UnderlyingTopic>
    <Grouping>
        <Field>/pri</Field>
    </Grouping>
    <Projection>
        <Field>/pri as /pri</Field>
        <Field>COUNT(/id) as /count</Field>
        <Field>MIN(/ts) as /oldest_ts</Field>
    </Projection>
</View>
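To show what the q-status view computes, here is a local Python model of its Grouping and Projection: group the messages currently waiting in the queue by /pri, then report the count and the minimum /ts for each group. The function name queue_status and the sample messages are hypothetical, and the sketch assumes every message carries an /id, so counting messages matches COUNT(/id).

```python
from collections import defaultdict

def queue_status(messages):
    """Group waiting messages by /pri; report count and oldest timestamp.

    Local model of the q-status view, assuming every message has an /id
    (so len(group) matches COUNT(/id)). Not AMPS code.
    """
    groups = defaultdict(list)
    for msg in messages:
        groups[msg["pri"]].append(msg)
    return [
        {"pri": pri, "count": len(group), "oldest_ts": min(m["ts"] for m in group)}
        for pri, group in sorted(groups.items())
    ]

# Hypothetical messages currently waiting in the queue.
waiting = [
    {"id": 0, "pri": 2, "ts": 100.0},
    {"id": 1, "pri": 0, "ts": 101.0},
    {"id": 2, "pri": 2, "ts": 99.5},
]
for row in queue_status(waiting):
    print(row)
```

In AMPS, the view maintains these aggregates as messages enter the queue and are acknowledged, rather than recomputing them from a list.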

The spark command that is bundled with AMPS makes it simple to monitor the state and progress of the priority queue:

watch -n1 "~/spark sow -server localhost:9007 -type json \
                       -topic 'q-status' -orderby '/pri'"

After running the simple publish script above several times, the results of this command would look something like the following (although, of course, the actual output will reflect the messages currently in the queue, so the results will depend on how many times you run the publisher, what random priority values were assigned, and whether you have acknowledged messages from the queue).

{"count":22, "oldest_ts":1547650270.46372,"pri":0}
{"count":22, "oldest_ts":1547650270.46447,"pri":1}
{"count":22, "oldest_ts":1547650270.46455,"pri":2}
{"count":22, "oldest_ts":1547650270.46462,"pri":3}
{"count":22, "oldest_ts":1547650270.46492,"pri":4}
Total messages received: 5 (1666.67/s)

For an actual monitoring system, we would use a sow_and_subscribe command to receive the current state and process updates. For example purposes, we use watch at the command line and re-run the query from scratch every time because this makes the output easy to view in a Linux terminal.

Conclusion

AMPS priority queues work the way you would expect a priority queue to work, while giving you the flexibility to go beyond assigning fixed priorities: AMPS can compute each message’s priority from the contents of the message itself.

How will you use priority queues?


[Edit: 2019-11-14 Remove unused imports from python sample.]

[Edit: 2019-11-18 Various typo fixes.]

[Edit: 2019-11-19 Add sample output for queue section subscriber to make it clear that the output in the view section is for the watch/spark command, not the subscriber.]