Scaled-Out Batch Processing with AMPS Queue Barriers

  Dec 16, 2019   |      David Noor


Scaling out your data processing using AMPS queues allows you to dynamically adjust how many workers you apply to your data based on your needs and your computing resources. Larger orders coming in or more events to process? Just spin up more subscribers to your AMPS queue and let them take their share of the load. AMPS dynamically adjusts how many messages it gives to each consumer based on their available backlog and message processing rate, and your work gets done without needing costly reconfiguration or re-partitioning.

Scale-out with queues works great when each message is an independent unit of work. Each subscriber works independently. The subscribers work in competition with one another, each processing messages as fast as possible, acknowledging messages back to AMPS as quickly as possible, and receiving more messages as quickly as possible. AMPS includes features to intelligently deliver messages to consumers that are processing quickly, to avoid a consumer ever having to wait for work when work is available.

Some problems require coordination, however. In this article, we look at a typical situation where you want multiple consumers competing to process messages as fast as possible, but you also need them to work on the same batch of data, wait for each other to finish, and move on to the next batch in unison. We’ll examine a powerful new feature in AMPS queues, Barrier Expressions, that allows consumers to coordinate and work in concert to efficiently process data.

Starting Small

Let’s imagine that we run a very successful nationwide chain of spatula stores, Spatula City. Each of our 1200 stores transfers their individual sales and customer interaction data every night. We aim to process this data as quickly as possible so we can make real-time business decisions based on actual spatula trends. We use this data to update a global inventory management system, and then in aggregate, use the data for ever-more-advanced predictive analytics.

When our business was small we could get this done with a pretty simple program:

# Simple file processing example
# (before adding AMPS queues)
#
# For sample purposes, assume that
# spatula_utils includes an existing
# library for database connectivity,
# and so forth.

from spatula_utils import * 

import sys

database = Database()
db_conn = database.connect()


# For each line in each file, 
# the program does a nightly update.
# After every file is processed,
# predictive analytics start.

processed_count = 0
for daily_file in sys.argv[1:]:
    with open(daily_file) as f:
        for line in f:
            clean_data = preprocess(line)
            json_data = convert_to_json(clean_data)
            update_inventory_system(json_data)
            db_conn.insert(json_data)
            processed_count += 1
            if processed_count % 100 == 0:
                print(processed_count)

print("Done for the day! Starting predictive analytics.")
run_predictive_analytics(db_conn)

As we processed each row of data from each store, we’d take a moment to update our inventory control system. Once the whole day’s set of files was processed, we’d run our larger analytics.

Batches Get Bigger

This system works great when all of the work can be done by a single process in a reasonable amount of time. As our analytics become more advanced, however, we need to get through our file loading much more quickly. And as the number of stores and their sales volume increase (everyone needs spatulas!), our single file-loading process desperately needs to scale.

AMPS message queues to the rescue! Let’s configure and build an AMPS message queue and publisher to help us distribute this workload efficiently:

AMPS Configuration File

Here’s a simple configuration file to distribute work over a set of subscribers:

<AMPSConfig>
  <SOW>
    <Queue>
      <Name>NIGHTLY_SALES_DATA</Name>
      <MessageType>json</MessageType>
      <Expiration>24h</Expiration>
    </Queue>
  </SOW>
  ...
  <!-- NIGHTLY_SALES_DATA is also
       recorded in the transaction log -->
</AMPSConfig>

In the AMPS configuration file, we’ve created a single message queue that will be used to distribute our data in JSON format. Any number of processes can connect to this instance and subscribe to the queue to become consumers. This message queue will automatically be configured with “at-least-once” semantics, meaning that a message will remain in the queue until it has been delivered to a consumer and acknowledged. Now let’s examine the code for our publisher and consumers:

Publisher

As mentioned above, the publisher reads a set of files and publishes information from each line of the file to AMPS, for a consumer to process:

# Simple AMPS queue publisher
# example.

# Assume that spatula_utils has
# database classes and so on.

from spatula_utils import * 

import sys, os, time
import AMPS

database = Database()
db_conn = database.connect()

client = AMPS.Client("publisher-%d" % os.getpid())
client.connect(...)
client.logon()

# For each file, publish the work
# to be processed to the AMPS queue

processed_count = 0
for daily_file in sys.argv[1:]:
    with open(daily_file) as f:
        for line in f:
            clean_data = preprocess(line)
            if processed_count % 100 == 0:
                print(processed_count)
            client.publish("NIGHTLY_SALES_DATA", clean_data)
            processed_count += 1

## Wait until everything is processed...
## ... but how long should we wait?
time.sleep(...)

print("Done for the day! Starting predictive analytics.")
run_predictive_analytics(db_conn)

Consumer

The consumer is just as simple as the publisher. For each message from AMPS, the consumer processes the message, and then acknowledges to the server that the work is done. (We are not tuning queue delivery for maximum performance here, just showing the simplest way to convert the existing process; a sketch of one tuning option follows the code.)

# Read from an AMPS queue and
# process sales data.

# Assume that spatula_utils has
# database classes and so on.

from spatula_utils import * 

import sys, os, socket, json
import AMPS

database = Database()
db_conn = database.connect()

hostname = socket.gethostname()
pid = os.getpid()
unique_client_name = "consumer-%s-%d" % (hostname, pid)
client = AMPS.Client(unique_client_name)
client.connect(...)
client.logon()
print "%s: connected" % unique_client_name


processed_count = 0
for message in client.subscribe("NIGHTLY_SALES_DATA"):
    data = message.get_data()
    json_data = json.loads(data)
    update_inventory_system(json_data)
    db_conn.insert(json_data)
    message.ack() # Tell the server the message is complete
    processed_count += 1
    if processed_count % 100 == 0:
        print "%s: %d" % (unique_client_name, processed_count)

    # This should exit if nightly processing is done...
    # ... but how do we know for sure?

With just a few lines of code, we’re able to convert our single-threaded script into a fully-distributed system based on AMPS queues and scale our work out over any number of processors and hosts. We can now increase processing scale just by running another instance of the consumer. But we have one critical, unsolved problem: how do we know when we’re done?

Endings Can Be Difficult

Here’s what we know: all of the data will eventually be processed and loaded into the database. Even if a processor fails midway through, AMPS will redeliver that work to another processor, until every message in the queue is processed.

What we don’t know, though, is when the day’s worth of data is finished being loaded into the database. Each consumer runs independently, and doesn’t need to know whether the other consumers are finished. In fact, that’s the whole point: we can add as many consumers as we need to just by starting up another process. They don’t need to do any sort of coordination: AMPS queues automatically handle message distribution.

In our publisher, we need a way to know when all of our messages are fully processed, since we need to run analytics at the end. In our consumers, we’d like to be able to exit gracefully once the batch is completed.

This problem is harder than it seems for a few reasons:

  • If the publisher were to wait for the queue to be completely drained, what happens if a consumer fails and returns a message back to the queue?
  • Maybe every consumer could somehow keep a separate topic up-to-date with every message that’s been processed. But how would that scale, and what if there’s a failure writing to that topic?
  • What if publishers want to start writing the next day’s data before the previous day is finished? You wouldn’t want consumers to march ahead into the next day’s data until the previous day is done. (That’s not a problem that Spatula City has now, but we have big dreams!)

These aren’t new problems, and we’ve seen all sorts of creative ways to signal the completion of a batch. Some are more failure-prone than others, and all of them require thinking about the myriad ways a job might get stuck or might be seen as completed when it’s really not. Some solutions require coordinating between consumers, or require a publisher to absolutely know in advance how much work there is to do, or involve guessing based on probability (“if no new messages have shown up to be processed in 60 seconds, I guess that must mean the publisher is done”).
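To make that last guess concrete, here is a minimal sketch of the inactivity-timeout approach, using Python’s standard queue module as a stand-in for the message source; process_message is a hypothetical handler:

import queue

work = queue.Queue()  # stand-in for the incoming message stream

def drain_until_quiet(idle_seconds=60):
    # Guess that the batch is done when nothing arrives for
    # idle_seconds. This is fragile by construction: a slow
    # publisher, a network stall, or a message redelivered after
    # a consumer failure can all make the guess wrong.
    while True:
        try:
            message = work.get(timeout=idle_seconds)
        except queue.Empty:
            return  # silence -- hope the batch is really done
        process_message(message)  # hypothetical handler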

Wouldn’t it be great if there was a way to clearly communicate to all consumers when messages up to a certain point had been fully processed, and to keep the next day’s messages from being processed until the current one is finished?

Introducing Queue Barriers

[Diagram: consumers receive everything up to the barrier message. Nothing after the barrier message is eligible for delivery.]

AMPS (starting with 5.3.1) has a unique, built-in way to solve problems like these. AMPS will identify specific messages in a message queue as barrier messages based on whether the message matches a BarrierExpression you configure. A barrier message won’t be delivered until all prior messages are delivered and acknowledged. Also, unlike typical queue messages (which are delivered to a single subscriber), a barrier message is delivered to all subscribers to the queue (provided their entitlements and content filter match, of course). Messages after a barrier message will not be delivered to any consumer until the barrier message has been released and delivered.

When the barrier message is released (because all previous messages are processed), the barrier message is delivered to all subscribers, and normal queue delivery resumes with the message after the barrier message.

Let’s use a barrier message in our file-loading application to coordinate and scale out our post-processing analytics! In our config file, we add one line of configuration to indicate how AMPS should determine whether a message is a barrier message. We’ll simply add an “is_eof” field to the message that demarcates the end of the day:

<AMPSConfig>
  <SOW>
    <Queue>
      <Name>NIGHTLY_SALES_DATA</Name>
      <MessageType>json</MessageType>
      <Expiration>24h</Expiration>
      <!-- Added BarrierExpression: 
           any message with is_eof = 1
           will be considered a barrier message -->
      <BarrierExpression>/is_eof = 1</BarrierExpression>
    </Queue>
  </SOW>
  ...
  <!-- NIGHTLY_SALES_DATA is also
       recorded in the transaction log -->
</AMPSConfig>

In our publisher, we write a message that matches the BarrierExpression by including an “is_eof” field when we’re done publishing the day’s spatula data. Unlike all of the other messages we’ve written, this message will be delivered to all consumers, and it will only be delivered once all of the previous messages we wrote are completely processed. This gives us a built-in way to know for sure that the day’s work has been processed by the consumers.

We alter our publisher to also consume our EOF message so that it knows when to run the analytics step:

# More effective AMPS queue publisher
# example. This uses the BarrierExpression
# configured for the queue to know
# when consumers have finished work.

# Assume that spatula_utils has
# database classes and so on.

from spatula_utils import * 

import os, sys
import AMPS

database = Database()
db_conn = database.connect()

client = AMPS.Client("publisher-%d" % os.getpid())
client.connect(...)
client.logon()

processed_count = 0
for daily_file in sys.argv[1:]:
    with open(daily_file) as f:
        for line in f:
            clean_data = preprocess(line)
            if processed_count % 100 == 0:
                print(processed_count)
            client.publish("NIGHTLY_SALES_DATA", clean_data)
            processed_count += 1

# Publish EOF, then wait for it to be sent back to
# us indicating all prior messages are consumed
# The subscription uses a content filter so only
# the EOF matches
eof_stream = client.subscribe("NIGHTLY_SALES_DATA", "/is_eof = 1")
client.publish("NIGHTLY_SALES_DATA", '{"is_eof": 1}')
print "Waiting for consumers to finish..."

# Wait for the EOF
for message in eof_stream:
    break

print "Done for the day! Starting predictive analytics."
run_predictive_analytics(db_conn)

Great! Our publisher knows when all of the data it’s written has been fully processed. Let’s modify our consumer to look for this EOF message and use that message to terminate:

# Read from an AMPS queue and
# process sales data. This version
# uses a queue barrier to know when
# processing for the day is complete.

# Assume that spatula_utils has
# database classes and so on.

from spatula_utils import *

import sys, os, socket, json
import AMPS

database = Database()
db_conn = database.connect()

hostname = socket.gethostname()
pid = os.getpid()
unique_client_name = "consumer-%s-%d" % (hostname, pid)
client = AMPS.Client(unique_client_name)
client.connect(...)
client.logon()
print "%s: connected" % unique_client_name

processed_count = 0
for message in client.subscribe("NIGHTLY_SALES_DATA"):
    data = message.get_data()
    json_data = json.loads(data)

    # When we see the barrier message, we can exit
    # because there's nothing left to process.
    if "is_eof" in json_data:
      break

    update_inventory_system(json_data)
    db_conn.insert(json_data)
    message.ack()
    processed_count += 1
    if processed_count % 100 == 0:
        print "%s: %d" % (unique_client_name, processed_count)

print "%s: EOF received, exiting" % unique_client_name

Once every non-barrier message has been consumed, AMPS sends the single barrier message from our publisher to every consumer. The consumer does not need to alter its AMPS subscription to receive this message, and it can exit as soon as the barrier arrives, since that message means the day’s publishing and processing are finished.
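Barriers also cover the “next day” problem from the list above. Because nothing after a barrier is delivered until the barrier itself is released, a publisher can start writing the next day’s data immediately, and consumers will still finish each day before starting the next. Here’s a hedged sketch: publish_day is a hypothetical helper wrapping the file-reading loop from the publisher above, and day_1_files and day_2_files are placeholders.

def publish_day(client, files):
    # Hypothetical helper: publish one day's records, then that
    # day's end-of-day barrier message.
    for daily_file in files:
        with open(daily_file) as f:
            for line in f:
                client.publish("NIGHTLY_SALES_DATA", preprocess(line))
    # Held by AMPS until every message above has been acknowledged.
    client.publish("NIGHTLY_SALES_DATA", '{"is_eof": 1}')

# Days can be published back to back: no consumer sees any of
# day 2 until day 1's barrier has been released and delivered.
publish_day(client, day_1_files)
publish_day(client, day_2_files)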

Conclusion

Batch processing remains an important part of many data scenarios. AMPS Message Queues let you scale out batch processing in a low-latency fashion, allowing you to treat the data in your batch as independent messages while retaining the semantics of a “batch” across queue consumers.

