How To Refactor to Microservices using Queue Runners to Scale

How to refactor to queue runner microservices using both SQL and message queues. This example demonstrates how to refactor a traditional app into microservices, first with a SQL database and then with a RabbitMQ queue runner microservice, in order to manage peak loads and scale up and out.

The Need for Speed: refactoring to microservices, by delegating less time-critical tasks to queue runners that work outside the main customer touch point, allows higher throughput, an improved customer experience and better use of resources.

There are many reasons to use Microservices, including:

  • Increased throughput;
  • Improved user experience;
  • Better handling of peak times;
  • Richer and more complex functionality;
  • Improved code management/debugging;
  • Greater management and control;
  • Increased reliability;
  • Ability to scale.

By way of analogy: to travel faster, take a train; to travel faster still, take a plane.

The Plot

Why refactor to use a Queue Runner?

Most projects start small, with limited budgets and tight time scales.

Unless you've got a particularly generous budget and an insightful crystal ball, a project starts with modest aims, delivers as much as it can, and works!

Then the fun starts... After a successful proof of concept and enough clients to move forward, refactoring is a great way to build on existing work with minimal effort. Microservices are a great way to deliver much more, reliably and at scale.

This article demonstrates refactoring to a queue runner microservice: a website collects some user information, then offloads delivery of an email to another service. By offloading the mail delivery task, the main client touch point can be more responsive (faster) and process more requests (quantity). The mail delivery task can run on a separate server, at a lower priority, and/or at a more convenient time. This article offers three examples: the first is our starting point, a traditional app that does everything in a single process; the second makes a note of the email task by recording it in a database for later processing; and the third uses a message queue to record and process the task at scale. For simplicity, this article and the associated code have almost no input validation, error checking or tests.

For ease of understanding, this article uses Python's Django, "the web framework for perfectionists with deadlines", plus RabbitMQ and Pika for the message queue; the code is easy to follow and to translate into your favourite framework. In addition to brief lists of advantages and disadvantages, there are 'What's Next' sections offering suggestions on how to improve the queue runners further. Although Django is thread safe, threading is not required to scale and is left as an exercise for the reader. For your convenience, the code is available on GitHub.

Why is it called a queue runner? A runner is a television term for a junior member of staff who runs errands (US: gopher). In software, a queue runner does resource-intensive tasks off the main stage, allowing the main process, such as a user interface, to stay focused and responsive. https://en.wikipedia.org/wiki/Television_crew#Runner
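To make the idea concrete, here is a minimal in-process sketch of the pattern using only the Python standard library. It is an illustration under stated assumptions, not the article's code: a queue.Queue stands in for the SQL table or RabbitMQ, a thread stands in for the separate queue runner process, and the hypothetical send_email() only records which preference ids it would have emailed. The front end's submit() responds as soon as the task is noted; the slow work happens in the runner.

```python
import queue
import threading

# In-memory stand-ins (hypothetical): the queue plays the part of the SQL table
# or RabbitMQ, and send_email() records ids instead of sending real email.
tasks = queue.Queue()
sent = []


def send_email(pref_id):
    """Placeholder for the slow work of rendering and sending an email."""
    sent.append(pref_id)


def submit(pref_id):
    """Front end: make a note of the task and respond to the user straight away."""
    tasks.put(pref_id)
    return "thanks"


def queue_runner():
    """Runner: process tasks in order until it receives the None sentinel."""
    while True:
        pref_id = tasks.get()
        try:
            if pref_id is None:
                return
            send_email(pref_id)
        finally:
            tasks.task_done()


runner = threading.Thread(target=queue_runner)
runner.start()
responses = [submit(pref_id) for pref_id in (1, 2, 3)]
tasks.put(None)  # ask the runner to stop once the earlier tasks are done
runner.join()
print(responses, sent)  # ['thanks', 'thanks', 'thanks'] [1, 2, 3]
```

In the episodes that follow, the runner is a separate process and the queue is a database table or a RabbitMQ queue, which is what allows the runner to live on another server.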

Episode ONE - The Pilot

The scene is a user enticed to request information on their favourite fruit. After entering their name, email and preference, they are thanked and a follow up email is sent with lots of goodies and exciting offers. In this episode, everything happens in real time. It Works!

Django, like many alternatives, is great for delivering websites quickly and easily. You don't need to know the framework to understand these examples but if you haven't come across it before or would like more information there is a great tutorial at the Django project.

Basic input form

We are only interested in a name, an email and a preference, so our example model has just a name, an email and a fruit choice. The form is deliberately simple so that we can demonstrate refactoring one aspect. When refactoring in real life it's normal to incorporate other changes too; in these examples we keep everything else the same to highlight the changes due to refactoring to SQL and message queues. The example code uses the same templates for the form and the email, showing that the user interface and outputs don't change, apart from indicating which version is in use: basic, SQL or message queue.

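from django.db import models

# FRUIT_CHOICES and FRUIT_PLEASE_CHOOSE are defined elsewhere in the app (not shown)
class Preference(models.Model):
    """Model to capture a fruit preference"""
    name = models.CharField(max_length=75)
    email = models.EmailField(max_length=250)
    fruit = models.CharField(max_length=2, choices=FRUIT_CHOICES, default=FRUIT_PLEASE_CHOOSE)
    # sendemail() builds and sends the confirmation email (not shown)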

Django adds an id primary key automatically, so the SQL to create the table might look like this:

CREATE TABLE refactortoscale_preference (
    id integer NOT NULL,
    name varchar(75) NOT NULL,
    email varchar(250) NOT NULL,
    fruit varchar(2) NOT NULL,
    PRIMARY KEY (id)
);

The web app offers a form and when submitted by the user, confirms the request and sends an email. The Django view looks like this:

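import logging

from django.contrib import messages
from django.http import HttpResponseRedirect
from django.shortcuts import render
from django.urls import reverse

# Assumed module layout: the models and choices live in the app's models.py
from .models import FRUIT_CHOICES, ActivityLog, Preference

logger = logging.getLogger(__name__)


def submitbasic(request):
    """Basic submit page"""
    context = {
            "type": "basic",
            "choices": FRUIT_CHOICES,
    }
    if request.method == 'POST': # If the form has been submitted...
        p = None
        try:
            p = Preference()
            p.name = request.POST['name']
            p.email = request.POST['email']
            p.fruit = request.POST['fruit']
            p.save()
            p.sendemail()  # slow: the user waits while the email is sent
            request.session['preference_id'] = p.id
        except Exception:
            msg_error = u'Something went wrong saving basic preference'
            logger.exception(msg_error)
            messages.add_message(request, messages.ERROR, msg_error)
            # Only log against a preference that was actually saved
            if p is not None and p.pk:
                a = ActivityLog(pref=p, message=msg_error)
                a.save()
            return render(request, 'error.html', context)
        return HttpResponseRedirect(reverse('thanks'))
    return render(request, 'refactortoscale/choices.html', context)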

As you can see, it does what is required and is fairly straightforward. There is little error handling and no input checking, and the code to send the email is packaged up in the model's sendemail() method; the details aren't important, as there are many examples of how to send an email.

At its first airing, all goes smoothly and everyone is happy.

What have we achieved?

We have designed and delivered a typical web app. It's simple, straightforward and works. It's suitable for most low to medium traffic websites.

What are the advantages?

  • Straightforward
  • Easy to maintain

Disadvantages

  • Limited scalability.

Episode TWO - SQL Queue Runner

Everyone liked the first episode. With great reviews and added publicity, expectations for this episode are high: product placement, glitzy offers, and sponsors wanting more!

This episode needs a slicker way to capture more opportunities. Although the website creaked last time, it coped. This time significantly more people will be eagerly submitting their choices, and they will want to enter their preference quickly during the ad break and be back, glued to the screen, as soon as possible.

Easily done: just make a note and send the email as soon as resources allow. The time-consuming task of generating and sending an email can be done a few minutes later.

There are various ways of capturing this task. As the requirement is simply to record a preference and send an email, a separate table/model is created for the task. We also want to record all activity for the user, as there will be other activities later on, so each event is recorded in an activity log. The task queue and activity log models are as follows:

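from django.db import models

# Task states (values are illustrative)
STATUS_TODO, STATUS_INPROGRESS, STATUS_COMPLETE, STATUS_ERROR = 0, 1, 2, 3

class SqlQueue(models.Model):
    """Model for feeding SQL queue runners"""
    pref = models.ForeignKey(Preference, on_delete=models.CASCADE)
    status = models.SmallIntegerField(default=STATUS_TODO)
    queueid = models.SmallIntegerField(default=0)

class ActivityLog(models.Model):
    """Model for recording events"""
    pref = models.ForeignKey(Preference, on_delete=models.CASCADE)
    message = models.TextField()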

The change to the user form processing is minimal, simply removing the part that sends the email and adding a record to the SqlQueue table/model of the person to whom the email should be sent.

This small change makes a large difference to speed, as generating and sending an email at scale is non-trivial. The code follows:

def submitsql(request):
    """SQL submit page"""
    context = {
            "type": "sql",
            "choices": FRUIT_CHOICES,
    }
    if request.method == 'POST': # If the form has been submitted...
        p = None
        try:
            # save preference
            p = Preference()
            p.name = request.POST['name']
            p.email = request.POST['email']
            p.fruit = request.POST['fruit']
            p.save()
            request.session['preference_id'] = p.id
            # Add a task to the Sql queue to send a confirmation email
            q = SqlQueue()
            q.pref = p
            q.save()
        except Exception:
            msg_error = u'Something went wrong saving the sql queue preference task'
            logger.exception(msg_error)
            messages.add_message(request, messages.ERROR, msg_error)
            # Only log against a preference that was actually saved
            if p is not None and p.pk:
                a = ActivityLog(pref=p, message=msg_error)
                a.save()
            return render(request, 'error.html', context)
        return HttpResponseRedirect(reverse('thanks'))
    return render(request, 'refactortoscale/choices.html', context)

Now for the queue runner that sends the email.

A useful feature with Django is the ability to add management commands. We will call ours sqlqueuerunner.

The requirements are to go through a list of tasks from the SqlQueue table/model, send an email, and record a log of the event in an activity log.

In addition to its primary task, the queue runner needs to be stoppable without repeating or losing tasks. This is achieved by catching the SIGINT and SIGTERM signals (SIGKILL cannot be caught), completing the current task, then stopping. The part of the code that does the work and sends an email follows:

    def handle(self, *args, **options):
        """Fetch a batch of available tasks, process them, and repeat until stopped"""
        while self.continue_running:
            # Get a batch of available tasks
            logger.debug(u'Fetching next batch of tasks')
            # The following is suitable for a single queue runner
            q = SqlQueue.objects.filter(status=STATUS_TODO).order_by('id')[:10]
            # The following allows for multiple queue runners on databases that
            # support select_for_update(skip_locked=True), such as PostgreSQL and
            # Oracle. Row locks last only as long as the enclosing transaction,
            # so the batch is fetched and marked in progress inside it:
            #with transaction.atomic():
            #    q = list(SqlQueue.objects.select_for_update(
            #            skip_locked=True).filter(
            #            status=STATUS_TODO).order_by('id')[:10])
            #    for task in q:
            #        task.status = STATUS_INPROGRESS
            #        task.save()
            if len(q) == 0:
                # If there is nothing to process,
                # then be kind and pause before polling again
                sleep(30)
            for task in q:
                if not self.continue_running:
                    self.stdout.write('Ok, stopping now')
                    return
                # Update this task to show we are working on it
                task.status = STATUS_INPROGRESS
                task.save()
                pref = task.pref
                try:
                    pref.sendemail()
                    msg_error = u'Fruit email sent'
                    logger.debug(msg_error)
                    a = ActivityLog(pref=pref, message=msg_error)
                    a.save()
                    task.status = STATUS_COMPLETE
                    task.save()
                except Exception:
                    msg_error = u'Something went wrong sending sql email'
                    logger.exception(msg_error)
                    task.status = STATUS_ERROR
                    task.save()
                    a = ActivityLog(pref=pref, message=msg_error)
                    a.save()
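The handle() loop checks self.continue_running, but the code that clears it isn't shown. Here is a minimal sketch of one way to do it, assuming the flag is cleared by a signal handler. The names stop(), install_signal_handlers() and run() are hypothetical; in the management command they would be methods of the Command class. The handler only flips the flag, so the task in progress finishes before the loop exits.

```python
import signal


class QueueRunner:
    """Sketch of a runner that finishes its current task before stopping."""

    def __init__(self):
        self.continue_running = True

    def stop(self, signum, frame):
        # Don't exit here; just ask the main loop to stop after the current task
        self.continue_running = False

    def install_signal_handlers(self):
        # SIGINT (Ctrl+C) and SIGTERM (the default for `kill`) can be caught.
        # SIGKILL cannot be caught, so a runner killed that way stops mid-task.
        # Must be called from the main thread.
        signal.signal(signal.SIGINT, self.stop)
        signal.signal(signal.SIGTERM, self.stop)

    def run(self, tasks, process):
        """Process tasks in order, checking the flag before starting each one."""
        completed = []
        for task in tasks:
            if not self.continue_running:
                break
            process(task)
            completed.append(task)
        return completed


runner = QueueRunner()


def process(task):
    # Simulate a SIGTERM arriving while task 2 is in progress
    if task == 2:
        runner.stop(signal.SIGTERM, None)


completed = runner.run([1, 2, 3, 4], process)
print(completed)  # [1, 2]
```

Since Python 3.5, time.sleep() resumes after a signal handler returns, so a runner waiting in sleep(30) may take up to 30 seconds to notice the request to stop.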

By separating email sending into a distinct task, it's possible to increase throughput. Sadly, not all databases are equal. It's easy to increase the number of front end web services, because they just add new records, but running several queue runners safely relies on SELECT ... FOR UPDATE with SKIP LOCKED (in Django, select_for_update(skip_locked=True)), and not every database supports it. As of Django 1.11, the postgresql, oracle and mysql database backends support select_for_update() (on backends such as SQLite it has no effect); however, MySQL doesn't support the skip_locked argument, which limits the number of queue runners that can run in parallel.
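For databases without SKIP LOCKED, one alternative is to claim each task with a conditional UPDATE that only succeeds if the task is still waiting. This is a different technique from select_for_update(), sketched here rather than taken from the article's code: it uses the standard library's sqlite3 with an in-memory database, and the table name sqlqueue and the status values are assumptions for illustration. In the Django ORM the same claim could be written as SqlQueue.objects.filter(pk=task.pk, status=STATUS_TODO).update(status=STATUS_INPROGRESS), since update() returns the number of rows it changed.

```python
import sqlite3

# Illustrative status values; the application's STATUS_* constants may differ.
STATUS_TODO, STATUS_INPROGRESS = 0, 1


def next_batch(conn, size=10):
    """Return the ids of up to `size` waiting tasks (candidates only)."""
    rows = conn.execute(
        "SELECT id FROM sqlqueue WHERE status = ? ORDER BY id LIMIT ?",
        (STATUS_TODO, size),
    ).fetchall()
    return [row[0] for row in rows]


def claim_task(conn, task_id):
    """Atomically move a task from TODO to IN PROGRESS.

    The WHERE clause re-checks the status, so if two runners race for the
    same task only one UPDATE matches the row; rowcount says who won.
    """
    with conn:  # commits on success, rolls back on error
        cur = conn.execute(
            "UPDATE sqlqueue SET status = ? WHERE id = ? AND status = ?",
            (STATUS_INPROGRESS, task_id, STATUS_TODO),
        )
    return cur.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE sqlqueue (id INTEGER PRIMARY KEY, pref_id INTEGER, status INTEGER)"
)
with conn:
    conn.executemany(
        "INSERT INTO sqlqueue (pref_id, status) VALUES (?, ?)",
        [(10, STATUS_TODO), (11, STATUS_TODO)],
    )

batch = next_batch(conn)
first_claim = claim_task(conn, batch[0])   # this runner now owns the task
second_claim = claim_task(conn, batch[0])  # a second attempt finds it taken
print(batch, first_claim, second_claim)  # [1, 2] True False
```

A runner that loses the race simply skips that task and tries the next one, so several runners can share the table without row locks, at the cost of one extra statement per task.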

Keeping the management commands and the web services together in one application means the loosely coupled elements share the same models and settings and so stay in sync automatically, increasing reuse and reducing maintenance.

With the email task separated out and running on its own, the email can be more elaborate, with more graphics and more live information than before, without impacting the user experience.

Episode TWO is a success: the site easily copes with the increased workload, responds quickly, and both users and sponsors are happy.

What have we achieved with our SQL queue runner?

We have improved the user experience by making it faster and increased throughput so it can both scale and accommodate usage peaks. The app is now suitable for large websites where traffic volume and speed are important.

Advantages

  • Better able to handle peak loads;
  • Scalable (database dependent);
  • Relatively straightforward;
  • No additional dependencies;
  • Relatively low maintenance;
  • Offloads slow processing tasks to another time or server.

Disadvantages

  • Additional tasks/processes to maintain;
  • Limited scalability depending on database support.

What's Next

  • Monitor time on queue;
  • Improve delivery/non delivery management;
  • Increase delivery processes/threads.
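Monitoring time on queue needs timestamps, which the SqlQueue model doesn't have yet. Assuming hypothetical fields such as created = models.DateTimeField(auto_now_add=True), plus a start time recorded when the runner picks up a task, a minimal sketch of the statistics might look like this (the sample timestamps are made up for illustration):

```python
from datetime import datetime, timedelta


def queue_wait_stats(tasks):
    """Return (average, maximum) seconds spent waiting on the queue.

    `tasks` is an iterable of (queued_at, started_at) datetime pairs.
    """
    waits = [(started - queued).total_seconds() for queued, started in tasks]
    if not waits:
        return 0.0, 0.0
    return sum(waits) / len(waits), max(waits)


t0 = datetime(2024, 1, 1, 12, 0, 0)
sample = [(t0, t0 + timedelta(seconds=5)), (t0, t0 + timedelta(seconds=35))]
print(queue_wait_stats(sample))  # (20.0, 35.0)
```

A rising maximum is often the first sign that more runners are needed, even while the average still looks healthy.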

Episode THREE - Message Queue Runners

The wow factor has kicked in. Everyone is happy and scaling like mad. With so much added interest, and going national, continental or worldwide [choose your up-scale here], there's a need to do more, faster: that means scaling up and out.

Adding more front end web services is easy; the limit is the number of additional microservices / queue runners the database can support. Thankfully RabbitMQ comes to the rescue, and it's easy to do. In 'Real Life' more would change, but for the sake of this article everything else stays the same: same information, same email, same outcome, just scalable with a message queue.

In episode THREE - Message Queue Runners, the website queues the id of the user preference, and the runner uses it to send an email. If your preference is for functional programming, the queue could instead carry the information to be processed, such as the Django view context or JSON-encoded data. Pika, the Python module that connects to RabbitMQ, supports various connection adapters; for simplicity, and because it is perfectly adequate, this article uses BlockingConnection.
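If you'd rather queue the information to be processed than just an id, one option is a small JSON message. The sketch below is an assumption, not the article's code: the field names task and pref_id, and the task name send_fruit_email, are hypothetical. The resulting bytes could be passed as the body to basic_publish(), optionally with pika.BasicProperties(content_type='application/json'), and decoded in the runner's callback.

```python
import json


def encode_task(pref_id, task="send_fruit_email"):
    """Build a message body; basic_publish sends it as bytes."""
    return json.dumps({"task": task, "pref_id": pref_id}).encode("utf-8")


def decode_task(body):
    """Parse a message body, raising ValueError if it is malformed."""
    data = json.loads(body)  # JSONDecodeError is a subclass of ValueError
    if not isinstance(data, dict):
        raise ValueError("message is not a JSON object")
    pref_id = data.get("pref_id")
    # bool is a subclass of int, so exclude it explicitly
    if not isinstance(pref_id, int) or isinstance(pref_id, bool):
        raise ValueError("message has no integer pref_id")
    return data.get("task"), pref_id


body = encode_task(42)
print(body)               # b'{"task": "send_fruit_email", "pref_id": 42}'
print(decode_task(body))  # ('send_fruit_email', 42)
```

Raising ValueError for every kind of malformed message lets the consumer treat them all the same way, for example by rejecting them without requeueing.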

As the following code shows, there is a lot more of it, to accommodate Pika, because the task is sent to an external queue instead of a database. Although the overhead is slightly more than inserting a database record, it's still minimal compared to the task of sending an email.

# Additional imports for the message queue view
import pika
from django.conf import settings


def submitmq(request):
    """Message queue submit page"""
    context = {
            "type": "mq",
            "choices": FRUIT_CHOICES,
    }
    if request.method == 'POST': # If the form has been submitted...
        p = None
        try:
            # save preference
            p = Preference()
            p.name = request.POST['name']
            p.email = request.POST['email']
            p.fruit = request.POST['fruit']
            p.save()
            # Save the newly created preference in the session
            # so it can be used again
            request.session['preference_id'] = p.id
        except Exception:
            msg_error = u'Something went wrong saving the message queue preference'
            logger.exception(msg_error)
            messages.add_message(request, messages.ERROR, msg_error)
            # Only log against a preference that was actually saved
            if p is not None and p.pk:
                a = ActivityLog(pref=p, message=msg_error)
                a.save()
            return render(request, 'error.html', context)

        # Add a task to the message queue to send a confirmation email
        credentials = pika.credentials.PlainCredentials(
            settings.AMQP_USERNAME,
            settings.AMQP_PASSWORD
        )
        connection = None
        try:
            # A new connection per request keeps the example simple
            connection = pika.BlockingConnection(
                pika.ConnectionParameters(
                    host=settings.AMQP_HOST,
                    credentials=credentials,
                )
            )
            channel = connection.channel()
            channel.queue_declare(queue=settings.AMQP_QUEUE, durable=True)
            channel.basic_publish(exchange='',
                routing_key=settings.AMQP_QUEUE,
                body=str(p.id),
                properties=pika.BasicProperties(
                    delivery_mode=2, # make message persistent
                ), mandatory=True)
            logger.info("Queued message task for {} {} {}".format(p.id, p.name, p.email))
        except Exception:
            msg_error = u'Something went wrong queueing the message queue preference task'
            logger.exception(msg_error)
            messages.add_message(request, messages.ERROR, msg_error)
            a = ActivityLog(pref=p, message=msg_error)
            a.save()
            return render(request, 'error.html', context)
        finally:
            # Closing the connection also closes its channels
            if connection is not None and connection.is_open:
                connection.close()
        return HttpResponseRedirect(reverse('thanks'))
    return render(request, 'refactortoscale/choices.html', context)

Pika is comprehensive, and RabbitMQ takes care of acknowledgement, non-delivery and delivering messages. This makes the queue runner straightforward: it simply opens a connection, listens for messages, processes them, then acknowledges success so the message is removed from the queue. As many runners can be run as necessary. The runner code is in two parts: consumer_callback(), listed first, does all the work of processing each incoming message, and handle() sets up the connection and registers consumer_callback() with the queue.

    def consumer_callback(self, channel, method_frame, header_frame, body):
        """Process messages from the queue; every message is acked or nacked"""
        logger.debug(method_frame.delivery_tag)

        try:
            pref_id = int(body)
            logger.debug("Successfully found message {}".format(pref_id))
        except ValueError:
            logger.error("Unable to read message body {!r}".format(body))
            # A malformed message can never succeed, so discard it rather than
            # requeue it; an unacknowledged message would stall this runner
            channel.basic_nack(delivery_tag=method_frame.delivery_tag, requeue=False)
            return

        try:
            pref = Preference.objects.get(pk=pref_id)
        except Preference.DoesNotExist:
            logger.error("No preference with id {}".format(pref_id))
            channel.basic_nack(delivery_tag=method_frame.delivery_tag, requeue=False)
            return

        try:
            pref.sendemail()
            msg_error = u'Fruit email sent'
            logger.debug(msg_error)
            a = ActivityLog(pref=pref, message=msg_error)
            a.save()
            channel.basic_ack(delivery_tag=method_frame.delivery_tag)
        except Exception as e:
            # Requeue (the default) so the email can be attempted again
            channel.basic_nack(delivery_tag=method_frame.delivery_tag)
            msg_error = u"Failed to send message: {}".format(e)
            logger.exception(msg_error)
            a = ActivityLog(pref=pref, message=msg_error)
            a.save()

    def handle(self, *args, **options):
        """Initialise the queue and start accepting messages"""
        logger.debug("Initial program setup")

        credentials = pika.credentials.PlainCredentials(
            settings.AMQP_USERNAME,
            settings.AMQP_PASSWORD
        )
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                host=settings.AMQP_HOST,
                virtual_host='/',
                credentials=credentials,
            ),
        )
        self._channel = connection.channel()
        # Let RabbitMQ deliver only one unacknowledged message at a time
        # to this runner, so work is shared fairly between runners
        self._channel.basic_qos(prefetch_count=1)
        self._channel.queue_declare(queue=settings.AMQP_QUEUE, durable=True)

        logger.debug("Waiting for messages")
        # Register consumer_callback() for messages on the queue (pika 1.x arguments)
        consumer_tag = self._channel.basic_consume(
            queue=settings.AMQP_QUEUE,
            on_message_callback=self.consumer_callback,
        )
        logger.debug("Consumer tag: {}".format(consumer_tag))

        try:
            self._channel.start_consuming()
        except (KeyboardInterrupt, InterruptedError):
            # Stop consuming; RabbitMQ returns any unacknowledged message
            # to the queue when the connection closes, so nothing is lost
            self._channel.stop_consuming()
            logger.warning('Interrupted, stopping consumer')
        connection.close()
        msg_error = 'Exited normally'
        logger.debug(msg_error)
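The callback above acknowledges on success and rejects on failure. That policy can be checked without a broker by passing in a stand-in channel. In this sketch RecordingChannel is a hypothetical test double that mimics only the basic_ack() and basic_nack() methods of Pika's BlockingChannel, and handle_delivery() is a simplified, Django-free version of the callback's decision logic, not the article's code.

```python
class RecordingChannel:
    """Hypothetical test double mimicking two pika BlockingChannel methods."""

    def __init__(self):
        self.calls = []

    def basic_ack(self, delivery_tag=0, multiple=False):
        self.calls.append(("ack", delivery_tag))

    def basic_nack(self, delivery_tag=0, multiple=False, requeue=True):
        self.calls.append(("nack", delivery_tag, requeue))


def handle_delivery(channel, delivery_tag, body, send_email):
    """Ack or nack every message so none is left unacknowledged.

    A malformed body is rejected without requeueing, since retrying cannot fix
    it; a failure while sending is requeued so it can be attempted again.
    """
    try:
        pref_id = int(body)
    except ValueError:
        channel.basic_nack(delivery_tag=delivery_tag, requeue=False)
        return
    try:
        send_email(pref_id)
    except Exception:
        channel.basic_nack(delivery_tag=delivery_tag, requeue=True)
    else:
        channel.basic_ack(delivery_tag=delivery_tag)


def flaky_send(pref_id):
    raise ConnectionError("mail server unavailable")


channel = RecordingChannel()
handle_delivery(channel, 1, b"42", send_email=lambda pref_id: None)
handle_delivery(channel, 2, b"not-a-number", send_email=lambda pref_id: None)
handle_delivery(channel, 3, b"7", send_email=flaky_send)
print(channel.calls)  # [('ack', 1), ('nack', 2, False), ('nack', 3, True)]
```

Requeueing on failure retries transient problems such as an unavailable mail server, but a message that always fails would be redelivered indefinitely. Configuring a RabbitMQ dead letter exchange and rejecting with requeue=False after a few attempts is a common way to park such messages for inspection.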

Episode THREE is a runaway success: the site easily copes with a significantly increased workload, responses are fast, users and sponsors are happy, and we're effortlessly going global.

What have we achieved with our Message Queue Runner?

We are achieving significantly higher peak throughput, capturing the most from our opportunities, and responses are handled quickly and effortlessly.

Significantly, the options available are flexible enough to accommodate both scale and change.

Advantages

  • Scalable
  • Flexible

Disadvantages

  • Adds dependencies (RabbitMQ and Pika)
  • Adds management complexity
  • Adds application complexity

What's Next

  • Automate provisioning to match demand;
  • Profile and refactor.
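As a starting point for automating provisioning, the number of runners could be derived from the queue depth. This is a hypothetical heuristic, not part of the article's code: runners_needed() and its parameters are illustrative, and the throughput per runner would have to be measured. With Pika, the number of messages ready for delivery can be read from channel.queue_declare(queue=settings.AMQP_QUEUE, passive=True).method.message_count.

```python
import math


def runners_needed(queue_depth, emails_per_runner_per_minute, drain_minutes,
                   min_runners=1, max_runners=20):
    """Estimate how many runners would clear the backlog within drain_minutes."""
    capacity_per_runner = emails_per_runner_per_minute * drain_minutes
    wanted = math.ceil(queue_depth / capacity_per_runner)
    # Keep at least one runner listening and never exceed the budget
    return max(min_runners, min(max_runners, wanted))


print(runners_needed(0, 60, 5))       # 1
print(runners_needed(1500, 60, 5))    # 5
print(runners_needed(100000, 60, 5))  # 20
```

Clamping between a minimum and a maximum keeps one runner always available for new messages while capping cost during extreme peaks.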