Autoscaled Queue Workers

machines being created and incinerated
Image by Annie Ruygt

Fly.io converts Docker images to Micro VMs and runs them globally. Fly.io supports Laravel! Check us out.

Fly Machines let you programmatically create and manage fast micro VMs. They’re great for scaling! One thing you might want to scale is queue workers.

I made a thing™ that can do this, automatically.

The Goal

I wanted to do the least amount of work, but still make a resilient system.

This means threading the needle between complex clustering logic and something too-simple-to-actually-work. But then I remembered this is PHP, so “stupid but effective” is the way to go. Obviously, we use CRON.

The things I wanted this to do were:

  1. Have something do all of this work automatically
  2. Scale up Machines (VMs) that immediately start churning through queue jobs
  3. Allow for some number of “base” queue workers (always on and running)
  4. Logic to detect if we need to scale up
  5. The ability to scale down

Here’s how I did those things.

Doing the Work

I had Laravel control the scaling. It is PHP after all, and if there’s one thing PHP devs love to do, it’s use PHP for literally everything. (I’m allowed to make fun of PHP devs because I’ve learned other languages). (That was sarcasm). (But really, other languages can be cool too).

The poor stooge in charge of all this is a Laravel console command. The actual command is here, but we’ll talk about the highlights below.

php artisan make:command --command=fly:work FlyWorkerCommand

I threw this command into the scheduler, so the almighty CRON can power it. Once a minute, this command runs and decides if we need to do some work.

// File: app/Console/Kernel.php
protected function schedule(Schedule $schedule): void
{
    $schedule->command('fly:work')
        ->onOneServer()
        ->everyMinute();
}

If you’re like Jack and need clarification, I used onOneServer() as well so this command only ever runs in one place, to remove race conditions. In my case I’m using Redis for session, cache, and queues. Using Redis for cache is what matters most here - Laravel uses the Cache system for locking. You need to use some central cache mechanism that all server instances read from to use onOneServer() effectively (redis, database, probably others but I’m not opening a browser tab to find out right now).
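
In my setup that shared store is Redis. For concreteness, here’s roughly what that looks like in .env, assuming Laravel’s stock variable names (swap in database if you’d rather lock through your database, at the cost of some overhead):

```ini
SESSION_DRIVER=redis
CACHE_DRIVER=redis
QUEUE_CONNECTION=redis
```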

Machines to Run Queue Workers

We can run php artisan queue:listen for a queue worker since Laravel’s queue worker is just part of the framework. To run more workers, we just need to make more copies of our application VM!

I’m running my app on Fly.io, which means a Docker image was generated and pushed up to the Fly Registry. Fly uses that image to make a VM. We can tell Fly to make any number of VMs anytime we want!

So we have the pieces needed to scale up workers. Any new queue worker Machine (VM) we create can re-use our existing image. All we need to do is tell it to run artisan queue:work instead of spinning up the web server stuff.

To run the artisan queue:work command, we override the Docker CMD used when starting a container (or VM, in our case). This is standard “How to Fly” stuff - we do exactly this to run CRON and Queues on Fly.io.

All we’re adding on top of this is some logic to spin up multiple worker VMs.

Creating a Machine

Let’s see what it looks like to actually create a Fly Machine that will run as a queue worker.

No need to prattle on about it, here’s some code sending an HTTP request to the Fly API:

$appName = "your-fly-app-name-here";

Http::withToken($this->api_key)
    ->asJson()
    ->acceptJson()
    ->baseUrl("https://api.machines.dev/v1/apps/".$appName)
    ->post('/machines', [
        "region" => env('FLY_REGION'),
        "config" => [
            "image" => env('FLY_IMAGE_REF'),
            "env" => ['foo' => 'bar',], 
            // false for base machines, true for scaled Machines
            "auto_destroy" => false, 
            "guest" => config('fly.vm'), // shared, 1 CPU, 1024M ram
            "init" => [
                // queue:listen on base worker Machines
                // queue:work --stop-when-empty for 
                //   scaled worker Machines
                "cmd" => ["php", "artisan", "queue:listen"],
            ],
            "metadata" => [
                "fly_laravel_queue_machine" => "base", // or "scaled"
            ],
            // restart for base only
            // else policy = "no" for scaled Machines
            "restart" => [ 
                "max_retries" => 3,
                "policy" => "on-failure",
            ],
        ],
    ]);

This is a pretty standard Machine-creation API call. There are a few things to note!

First, I’ll assume this API call is sent within a Laravel app running in a Fly VM. This means environment variables FLY_REGION and FLY_IMAGE_REF are available. We make use of those to tell our new Machine to run in the same region and re-use the current Docker image, so it runs the worker Machines with the same code.

We also add some metadata to differentiate worker Machines (“base” and “scaled”) vs ones that are managed by flyctl (the web app and CRON Machines).

Finally the init section is used to tell the VM to run php artisan queue:listen instead of doing the default thing of spinning up a web server.

Our code will create n base Machines (configurable). Then we’ll (later) create some scaling logic to scale up past those “base” Machines if our queue passes some threshold we set.

When to Create the Always-On Base VMs

The fly:work command does a bunch of work (duh). The first thing it does is find out what Machines exist within the current Fly app.

This then gives us the opportunity to see which Machines need to be created (if any).

Here’s some logic on creating “base” Machines - the queue workers we always want working no matter how many queue jobs are pending.

$api = new MachineApi(config('fly.app_name'), config('fly.api_key'));

// We pass MachinePool a list of all the Machines in our app
// MachinePool then does some filtering on API results for us
// to differentiate between web app, cron, base workers
// and scaled worker Machines
$pool = new MachinePool($api->listMachines());

// Destroy base workers if we have too many
if ($pool->baseMachines->count() > config('fly.min_workers')) {
    $getRidOf = $pool->baseMachines->count() - config('fly.min_workers');
    for($d=0; $d<$getRidOf; $d++) {
        $api->destroyMachine($pool->baseMachines[$d]['id']);
    }
}

// Create base workers if needed
if ($pool->baseMachines->count() < config('fly.min_workers')) {
    $neededMachines = config('fly.min_workers') - $pool->baseMachines->count();
    for($i=0; $i<$neededMachines; $i++) {
        $api->createMachine([...]);
    }
}

We get our state of the world (trusting in the API call to list Machines) and then decide if we need to delete base Machines or add base Machines.

We have logic to delete base machines if we change configuration, for example if we had 4 base workers and later decided we only need 2.

The actual code has a few more sanity checks for data integrity (read: a bunch of isset()-style checks), but nothing too crazy.
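
For a sense of what that filtering looks like, here’s a simplified, hypothetical sketch of MachinePool using plain arrays (the real class linked above uses Laravel collections and does more sanity checking):

```php
// Simplified sketch: partition the Machines API response by our
// metadata key. Hypothetical - see the linked repo for the real class.
class MachinePool
{
    public array $baseMachines;
    public array $scaledMachines;
    public array $appMachines;     // web app + cron (flyctl-managed)
    public array $managedMachines; // base + scaled workers

    public function __construct(array $machines)
    {
        // Read our metadata tag off a Machine, if it has one
        $tag = fn (array $m) => $m['config']['metadata']['fly_laravel_queue_machine'] ?? null;

        $this->baseMachines   = array_values(array_filter($machines, fn ($m) => $tag($m) === 'base'));
        $this->scaledMachines = array_values(array_filter($machines, fn ($m) => $tag($m) === 'scaled'));

        // Machines without our metadata key are the flyctl-managed ones
        $this->appMachines    = array_values(array_filter($machines, fn ($m) => $tag($m) === null));

        // "Managed" = every worker Machine we control
        $this->managedMachines = array_merge($this->baseMachines, $this->scaledMachines);
    }
}
```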

When to Scale Up

We need some logic telling us when to scale up. I made it so you can write your own scaling logic, and included a default scaling technique that is just dumb enough to work.

I decided I wanted to set a threshold of “queue jobs per machine”. Once that threshold gets too high, we’d scale up queue workers.

I started with a threshold of 10 queue jobs per worker. How’s that work?

Let’s say we suddenly have 100 queue jobs and 2 (base) workers - that’s 50 jobs per machine, well over our threshold! To get down to 10 jobs per worker, we’d need to scale up 8 additional workers (for a total of 10). We’ll then have 100 jobs spread over 10 machines, which gets us down to 10 jobs per worker.
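
To make that arithmetic concrete, here’s a hypothetical standalone version of the calculation (the function name is mine, for illustration - the real logic lives in the JobsPerWorker class):

```php
// Standalone sketch of the "jobs per worker" scaling math.
function machinesToAdd(int $pendingJobs, int $currentWorkers, int $jobsPerMachine = 10): int
{
    $currentJobsPerMachine = $pendingJobs / $currentWorkers;

    // Under (or at) the threshold? No new machines needed.
    if ($currentJobsPerMachine <= $jobsPerMachine) {
        return 0;
    }

    // Total machines needed to hit the threshold, minus what we have
    return (int) ceil($pendingJobs / $jobsPerMachine) - $currentWorkers;
}

// 100 pending jobs, 2 base workers: scale up by 8
echo machinesToAdd(100, 2);
```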

Here’s the JobsPerWorker class, which does the math to decide how many extra worker Machines we should make:

<?php

namespace App\Fly\Scalers;

use App\Fly\MachinePool;
use Illuminate\Contracts\Queue\Factory;
use App\Fly\Scalers\ShouldScaleInterface;

class JobsPerWorker implements ShouldScaleInterface
{
    public function __construct(
        protected Factory $queue,
        protected MachinePool $pool,
        protected int $jobsPerMachine = 10
    ){}

    /**
     * @return int Number of Machines to create
     */
    public function shouldScale(
      ?string $connection=null,
      ?string $queue=null
    ): int
    {
        $totalManagedMachines = $this->pool->managedMachines->count();

        // If we don't have base machines yet (likely on first run)
        // then we don't do any scaling yet
        if($totalManagedMachines == 0) {
            return 0;
        }

        // jobs per machine = total job count / total worker machines
        $pendingJobs = $this->queue->connection($connection)->size($queue);
        $currentJobsPerMachine = $pendingJobs / $totalManagedMachines;

        if ($currentJobsPerMachine > $this->jobsPerMachine) {
            // return the number of machines we should scale up by:
            // total machines needed to hit the threshold, minus
            // the machines we already have
            return (int) ceil($pendingJobs / $this->jobsPerMachine)
                - $totalManagedMachines;
        }

        return 0;
    }
}

Laravel has a way for us to get the count of jobs on any given connection/queue. We use this to help us do some math!

The “managed machines” count is the number of workers (base + scaled) we currently have. We scale up additional workers whenever $currentJobsPerMachine - pending jobs divided by that worker count - exceeds our threshold (greater than 10 jobs per worker, in our example).

Creating Scaled Queue Workers

The same (ish) API call we saw above can be used to create scaled workers. The main differences for “scaled” machines are that we use queue:work instead of queue:listen, and we set auto_destroy to true.

Why the difference?

The queue:listen command is meant to stay alive - it actually runs queue:work behind the scenes. On the other hand, queue:work is meant to be more short-lived and is able to just process 1+ jobs and then stop. We want it to stop, since that tells the VM to stop and autodestroy itself. Scaling down comes for free!

Here’s some of the logic from the fly:work command:

$connection = null; // default queue connection
$queue = ''; // default queue
$scalingClass = config('fly.scale_controller'); // e.g. JobsPerWorker

// Result of our math from above
$numberMachinesNeeded = $pool
  ->getScaler(config('fly.scale_controller'))
  ->shouldScale($connection, $queue);

for($j=0; $j<$numberMachinesNeeded; $j++) {
    $api->createMachine([
        // I use config() in reality, instead of env() here
        "region" => config('fly.region'),
        "config" => [
            // yep, more config()
            "image" => config('fly.image'),
            // We can reuse the env from 
            // the web app VM!
            "env" => $pool->appMachines->first()['config']['env'], 
            "auto_destroy" => true, // true for scaled machines
            "guest" => config('fly.vm'),
            "init" => [
                "cmd" => [
                  "php", "artisan", 
                  "queue:work", "--stop-when-empty"
                ],
            ],
            "metadata" => [
                "fly_laravel_queue_machine" => "scaled", // vs "base"
            ],
            // restart is for base Machines only
            "restart" => [
                "policy" => "no",
            ],
        ],
    ]);
}

Nothing too crazy there! I moved the env('FLY_REGION') calls to a config/fly.php file so I could make config() calls - which helps for local dev.

I also showed how I got the env populated - I grabbed the environment set for the web application machines.

Finally, I set slightly different metadata, cmd, and restart policy for the scaled Machines.

Fly.io ❤️ Laravel

Fly your servers close to your users—and marvel at the speed of close proximity. Deploy globally on Fly in minutes!

Deploy your Laravel app!  

Scaling Down

As I hinted above, I don’t have logic for scaling down workers! Instead, I let the Machines do that automatically for us.

Fly Machines can destroy themselves when the process they’re running exits. The workers are running php artisan queue:work. We can add a handy flag to have it stop itself: --stop-when-empty.

This means that once we’ve scaled up enough and our queue becomes empty, the queue worker will stop, the Machine will exit, and it will destroy itself.

This actually isn’t perfect, but it’s pretty workable. There’s a fun edge case: a queue that’s never empty, but also doesn’t have enough jobs to warrant all those extra scaled-up workers.

Fixing that would involve a few potential things:

  1. A “sidecar” (second) process running in the scaled up queue workers that shuts it down after a certain period of time
  2. A max number of jobs the scaled workers are allowed to run before stopping (--max-jobs flag)
  3. Scaling logic that calculates a moving average of the number of pending jobs (the “queue depth”) and uses that number against our configured threshold. Then it can decide to add or delete scaled worker VMs.
    • This means adding scale-down logic, instead of letting scaled workers stop themselves

Option 3 is probably technically the best, but it’s more complicated(ish) to do properly - you need to store data somewhere and keep tabs on the number of jobs pending on every run to do the calculation. Idea number 2 is better than the original and not a terrible solution, but still not perfect. Idea number 1 is kinda annoying to set up, though, so I think I’m overall in favor of idea 3.
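
As a rough sketch of what idea 3 could look like (entirely hypothetical - the function names here are mine, not from the real command), we’d keep a short history of queue-depth samples and scale against their average instead of a single reading:

```php
// Hypothetical sketch for idea 3: track recent queue depths and use
// their moving average for scaling decisions instead of one sample.
function pushDepthSample(array $history, int $currentDepth, int $window = 5): array
{
    $history[] = $currentDepth;

    // Keep only the most recent $window samples
    return array_slice($history, -$window);
}

function averageDepth(array $history): float
{
    // Guard against division by zero on the very first run
    return array_sum($history) / max(count($history), 1);
}
```

In the real command, the history would need to live somewhere shared (the cache, say) so the moving average survives between the once-a-minute scheduler runs.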

Either way, implementing that is an exercise for the reader (for now).