

Odd Jobs: Full-featured Haskell job-queue

Last updated: 17 May, 2020

odd-jobs is a Postgres-backed job-queue, written in Haskell. It builds upon the techniques used by Rails’ delayed_job.

When we started adopting Haskell at Vacation Labs, back in 2016, something like delayed_job was curiously missing from the Haskell ecosystem. There were libraries attempting to solve this problem, but none of them felt appropriate for our use-case. Hence, we wrote odd-jobs to “scratch our own itch”.

Table of Contents

  1. Getting started guide
  2. How does it work
  3. Deployment
  4. Admin UI
    1. Using the in-built admin UI
    2. Bundling the admin UI with an existing web-app
  5. Controlling concurrency
  6. Customising the job’s payload structure
  7. Making changes to the Haskell-type representing your job payload
  8. Structured logging
  9. Graceful shutdown
  10. Alerts and notifications
  11. Future enhancements, and known issues

Getting started guide

1. Create a table to store jobs

In this example, our jobs table will be called jobs_test.

ghci> :set -XOverloadedStrings
ghci> import Database.PostgreSQL.Simple (connectPostgreSQL)
ghci> import OddJobs.Migrations
ghci> conn <- connectPostgreSQL "dbname=jobs_test user=jobs_test password=jobs_test host=localhost"
ghci> createJobTable conn "jobs_test"

2. Create a module for your job-runner

Ideally, this module should be compiled into a separate executable and should depend on your application’s library module. If you do not wish to deploy odd-jobs as an independent executable, you may embed it within your main application’s executable as well. Both options are compared in the Deployment section.

3. Set-up a Haskell type to represent your job-payload

  • Ideally, this data-type should be defined inside your application’s code and the module containing this type-definition should be part of the exposed-modules stanza.
  • To work with all the default settings provided by OddJobs.ConfigBuilder, this data-type should have a “tagged” JSON serialisation, i.e.:

    { "tag": "...", "contents": ... }

    In case your JSON payload does not conform to this structure, please look at Customising the job’s payload structure.

  • Deriving ToJSON and FromJSON instances via Generic is enough here, because Aeson’s default generic encoding of a sum-type is exactly this tagged structure.
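A minimal sketch of such a type, assuming the aeson package. MyJob is a hypothetical name; its two constructors are the ones enqueued in step 7, and each argument is assumed to be a user id. The welcomeEmailJson value spells out, by hand, the tagged JSON that the derived ToJSON instance produces for one job.

```haskell
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}

import Data.Aeson (FromJSON, ToJSON (..), Value, object, (.=))
import GHC.Generics (Generic)

-- | Hypothetical job-payload type. Each argument is a user id.
data MyJob
  = SendWelcomeEmail Int
  | SetupSampleData Int
  deriving (Eq, Show, Generic, ToJSON, FromJSON)

-- | The tagged JSON that the derived ToJSON instance produces for
-- `SendWelcomeEmail 10`, written out by hand.
welcomeEmailJson :: Value
welcomeEmailJson =
  object ["tag" .= ("SendWelcomeEmail" :: String), "contents" .= (10 :: Int)]
```

Because the constructor name becomes the "tag", renaming a constructor changes the stored JSON; see Making changes to the Haskell-type representing your job payload.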

4. Write the core job-runner function

In this example, the core job-runner function is in the IO monad. In all likelihood, your application’s code will be in a custom monad, and not IO. Please refer to TODO on how to work with custom monads.
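A minimal sketch of an IO job-runner for a hypothetical payload type MyJob (with the two constructors enqueued in step 7). To stay independent of odd-jobs’ own Job record, this version takes the job’s raw JSON payload as an aeson Value; parsePayload and myJobRunner are illustrative names, not library functions. It assumes that an exception thrown from the runner is recorded as a failed attempt of that job.

```haskell
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}

import Data.Aeson (FromJSON, Result (..), ToJSON (..), Value, fromJSON)
import GHC.Generics (Generic)

-- | Hypothetical job-payload type (each argument is a user id).
data MyJob
  = SendWelcomeEmail Int
  | SetupSampleData Int
  deriving (Eq, Show, Generic, ToJSON, FromJSON)

-- | Decode a job's JSON payload into MyJob, keeping aeson's error message.
parsePayload :: Value -> Either String MyJob
parsePayload v = case fromJSON v of
  Success job -> Right job
  Error err -> Left err

-- | The core job-runner, in IO. A payload that does not parse is reported
-- by throwing an IOError, so the failure is attributed to this job.
myJobRunner :: Value -> IO ()
myJobRunner payload = case parsePayload payload of
  Left err -> ioError (userError ("Unable to parse job payload: " ++ err))
  Right (SendWelcomeEmail userId) ->
    -- Replace with a call to your application's mailer.
    putStrLn ("Sending welcome email to user " ++ show userId)
  Right (SetupSampleData userId) ->
    -- Replace with your application's sample-data setup.
    putStrLn ("Setting up sample data for user " ++ show userId)
```

Pattern-matching on every constructor lets GHC’s incomplete-pattern warning remind you to handle a newly added job type.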

5. Write the main function using OddJobs.Cli

6. Compile and start the Odd Jobs runner

$ stack install <your-package-name>:exe:odd-jobs-cli
$ odd-jobs-cli start --daemonize --web-ui-basic-auth-user=oddjobs --web-ui-basic-auth-password=awesome

7. Enqueue some jobs from within your application’s code

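ghci> :set -XOverloadedStrings
ghci> import OddJobs.Job (createJob)
ghci> import Database.PostgreSQL.Simple
ghci> -- assumes the module defining your job-payload type (step 3) is also loaded
ghci> conn <- connectPostgreSQL "dbname=jobs_test user=jobs_test password=jobs_test host=localhost"
ghci> createJob conn "jobs_test" $ SendWelcomeEmail 10
ghci> createJob conn "jobs_test" $ SetupSampleData 10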

8. Check out the awesome web UI

Visit http://localhost:7777 (username=oddjobs / password=awesome as configured earlier).

9. Check out the log file to see what Odd Jobs is doing

$ tail -f oddjobs.log

10. Finally, shut down Odd Jobs gracefully

See Graceful shutdown for more details.

$ odd-jobs-cli stop --timeout 65

How does it work

Job creation is a straightforward INSERT into the jobs table, specified by cfgTableName.

Running jobs requires a “job runner”, which internally runs (and coordinates) two background threads:

  • One thread (called jobEventListener) uses Postgres’ LISTEN/NOTIFY mechanism to be notified immediately every time a row is inserted into the jobs table. This allows jobs to be run as soon as they’re queued. However, it cannot pick up jobs that are scheduled to run in the future.
  • That is why a second thread (called jobPoller) polls the jobs table every cfgPollingInterval seconds, looking for jobs which match either of the following conditions:
    • jobRunAt is in the past AND state is queued or retry
    • OR, state is locked, but locked_at is more than cfgDefaultLockTimeout seconds in the past, which suggests that the job-worker may have crashed
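The poller’s selection criteria can be sketched as a pure predicate. This is an in-memory model only, under stated assumptions: JobStatus, JobRow and isEligible are hypothetical names, times are plain epoch seconds, and in Odd Jobs the real check is a query against the jobs table rather than Haskell code.

```haskell
-- | Hypothetical, simplified job states (mirroring the ones named above).
data JobStatus = Queued | Retry | Locked | Success | Failed
  deriving (Eq, Show)

-- | Hypothetical in-memory model of the columns the poller looks at.
data JobRow = JobRow
  { rowStatus :: JobStatus
  , rowRunAt :: Integer          -- ^ run_at, as epoch seconds
  , rowLockedAt :: Maybe Integer -- ^ locked_at, as epoch seconds (set while locked)
  }
  deriving (Show)

-- | Would the poller pick this row up at time `now`, given a lock timeout?
isEligible :: Integer -> Integer -> JobRow -> Bool
isEligible lockTimeout now row = case rowStatus row of
  Queued -> rowRunAt row <= now
  Retry -> rowRunAt row <= now
  -- A lock older than the timeout suggests that the worker crashed.
  Locked -> maybe False (\lockedAt -> now - lockedAt >= lockTimeout) (rowLockedAt row)
  _ -> False
```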

Every time a job is found, the following is done:

  • a new background thread is spawned in which the cfgJobRunner function provided by you (i.e. the library user) is invoked with the newly found job. (Caveat: This is a simplification, as cfgConcurrencyControl may prevent too many threads from being spawned simultaneously and overloading the machine).
  • the job’s state in the DB is changed to locked
  • the locked_by column is updated to record the hostname, PID, and threadId of the worker process
  • the locked_at column is updated to record the time at which job execution was started. If the job’s status is not changed to success, retry, or failed, within monitorLockTimeout seconds, then it will be considered as “crashed”, and will be picked-up again.
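A rough model of the per-job thread, using only base. spawnJob and LockInfo are hypothetical names, workerPrefix stands in for the hostname and PID, and nothing is written to Postgres. It illustrates why locked_by can include a thread id: that identity only exists once the job’s own thread is running.

```haskell
import Control.Concurrent
import Control.Exception (finally)

-- | Hypothetical in-memory stand-in for the locked_by / locked_at columns.
data LockInfo = LockInfo
  { lockedBy :: String  -- ^ worker identity, ending in the job thread's ThreadId
  , lockedAt :: Integer -- ^ epoch seconds at which execution started
  }
  deriving (Show)

-- | Run one job in its own background thread. The returned MVar receives the
-- lock info once the job action has finished, whether it succeeded or threw.
spawnJob :: String -> Integer -> IO () -> IO (MVar LockInfo)
spawnJob workerPrefix now runJob = do
  done <- newEmptyMVar
  _ <- forkIO $ do
    tid <- myThreadId
    let lock = LockInfo {lockedBy = workerPrefix ++ ":" ++ show tid, lockedAt = now}
    -- A real runner would persist `lock` (and state = locked) to the DB here.
    runJob `finally` putMVar done lock
  pure done
```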

Deployment

Broadly speaking, you’ve got two options to deploy Odd Jobs:

  1. Separate process: compile the Odd Jobs runner as a separate binary, and run it in an independent process (separate from your main application’s process). OddJobs.Cli provides all the building blocks required to put together an independent binary.
  2. Bundled with your app: bundle it within your main application’s binary and run it in the same process as your main application.

The two options compare as follows:

| Scenario | Separate process (1) | Bundled with your app (2) |
|---|---|---|
| Scaling workloads independently: can you spin up more servers for your main application OR for Odd Jobs, depending upon what the workload demands? | Yes | No |
| Managing workload spikes: can a spike in jobs cause a slowdown in your main application, and vice versa? | No | Yes |
| Independent deploys/upgrades: can you upgrade the job-runner without re-deploying your main application, and vice versa? | Yes, if your application modules/packages are architected appropriately | No. Also, re-deploying your main app might take longer, as the bundled Odd Jobs will wait for a graceful shutdown before exiting. |
| Memory usage | Possibly higher | Possibly lower |
| Admin UI | Enable via a command-line switch | Write custom code to integrate it into your main application |

Admin UI

Odd Jobs comes with a built-in admin UI that enables you to do the following:

  • See which jobs are currently running, by which machine + process + thread
  • View the latest error of any job which is in the retry or failed state
  • See which jobs are scheduled to run in the future
  • Retry any job that has permanently failed after exhausting max-attempts
  • Immediately run a job that is otherwise scheduled to run in the future
  • Cancel a job that is scheduled to run in the future
  • Filter by job-type
  • Filter by job-runner
  • View the payloads of each job (Related: Customising the job’s payload structure)

Using the in-built admin UI

If you are using OddJobs.Cli to deploy Odd Jobs as a separate binary, you can enable the admin UI by picking the authentication scheme for the UI:

  • Either pass --web-ui-basic-auth-user=<USER> and --web-ui-basic-auth-password=<PASS> at the command-line
  • Or, pass --web-ui-no-auth at the command-line

More details about command-line options are available in the documentation for StartArgs.

Bundling the admin UI with an existing web-app

Heads-up: This section of the documentation is incomplete. Please open an issue for the specific framework / library that you’re interested in.

If you’re bundling Odd Jobs into your main application, in all probability you’d want it to run on the same port as your main web-app.

  • TODO: General instructions
  • TODO: Specific instructions for Scotty
  • TODO: Specific instructions for Yesod
  • TODO: Specific instructions for Spock
  • TODO: Specific instructions for Servant

Controlling concurrency

Odd Jobs uses Haskell’s in-built forkIO threads to execute multiple jobs concurrently in the same OS process. This is unlike other job-queues, such as Delayed Job or Resque, where you need to run multiple OS processes, because each job-runner can execute only one job at a time.

Compared to such job-queues, Odd Jobs has the following advantages as a result of executing jobs in concurrent threads:

  • It potentially uses less memory, because only one OS process needs to be running
  • Fewer processes are polling the jobs table
  • It can easily scale the number of threads up or down, depending upon the workload. With other queueing systems you need to pre-plan the amount of concurrency you need and run that many processes on your job machine: you waste resources if your workload is lower, and you cannot scale up as easily when your workload is higher.

On the other hand, here are the disadvantages:

  • Firing too many concurrent jobs can potentially starve your job-machine of resources and can either slow it down to a crawl, or cause out-of-memory errors.
  • If the single job-runner process crashes, it takes down all concurrently running jobs with it (although they will be retried after cfgDefaultLockTimeout seconds).

Therefore it is necessary to artificially limit the maximum concurrency of an Odd Jobs runner. This can be done via the cfgConcurrencyControl parameter:

  • Maximum concurrent threads: This is the easiest to set up, and tells Odd Jobs to run at most N threads per job-runner. How to pick this N depends a lot on what your job workload actually is. Is it CPU-heavy? Is it network-heavy? Does it hold DB connections (from your application’s DB pool) for a long time? In the absence of such information, one can assume that acquiring a DB connection from your application’s DB pool is a natural constraint, and use N = size of DB pool as a starting point.
  • Dynamic concurrency control: Coming up with a fixed maximum number of threads (N) is very hard, and doesn’t fully utilise the power of running jobs in threads. If you pick N too low, you’re wasting resources. On the other hand, if you pick N too high, you might overload your machine. There’s another way, albeit currently experimental: before spawning a new job, Odd Jobs can run an IO Bool action that you provide. If this action evaluates to True, the job is picked for execution; otherwise it is left for the next poll, or for some other job-runner to pick up. In this IO Bool action you can write logic to check the CPU load, free memory, or IOPS of the machine, and determine whether it is safe to execute another job concurrently without overloading the machine.
  • Unbounded concurrency: Generally speaking, this is not recommended because it can potentially cause your server to run out of resources.
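The three strategies can be modelled with a shared counter of running job threads. This is a sketch using only base, not the library’s implementation: Policy, reserveSlot, spawnIfAllowed, parseLoadAvg and loadBelow are hypothetical names. loadBelow is an example of the IO Bool action for dynamic concurrency; it reads /proc/loadavg, so it only works on Linux, and the threshold is yours to choose. As described above, a refused job is not queued in memory; it is simply left for a later poll.

```haskell
import Control.Concurrent
import Control.Exception (IOException, try)
import Control.Monad (void, when)
import Data.IORef

-- | Hypothetical mirror of the three strategies described above.
data Policy
  = MaxJobs Int        -- ^ at most N job threads per runner
  | Dynamic (IO Bool)  -- ^ ask an IO Bool action before each new job
  | Unbounded          -- ^ no limit (not recommended)

-- | Decide whether to start one more job; if so, count it as running.
reserveSlot :: Policy -> IORef Int -> IO Bool
reserveSlot policy running = case policy of
  MaxJobs n -> atomicModifyIORef' running $ \k ->
    if k < n then (k + 1, True) else (k, False)
  Dynamic isSafe -> do
    ok <- isSafe
    when ok $ atomicModifyIORef' running (\k -> (k + 1, ()))
    pure ok
  Unbounded -> atomicModifyIORef' running (\k -> (k + 1, True))

-- | Start `job` in its own thread if the policy allows it. The running count
-- is decremented when the thread finishes, even if the job throws.
spawnIfAllowed :: Policy -> IORef Int -> IO () -> IO Bool
spawnIfAllowed policy running job = do
  ok <- reserveSlot policy running
  when ok $ void $ forkFinally job (\_ -> atomicModifyIORef' running (\k -> (k - 1, ())))
  pure ok

-- | Parse the 1-minute load average from the contents of /proc/loadavg.
parseLoadAvg :: String -> Maybe Double
parseLoadAvg s = case words s of
  (w : _) | [(x, "")] <- reads w -> Just x
  _ -> Nothing

-- | Example Dynamic action: allow a new job only while the 1-minute load
-- average is below `maxLoad`. If /proc/loadavg can't be read, refuse.
loadBelow :: Double -> IO Bool
loadBelow maxLoad = do
  result <- try (readFile "/proc/loadavg") :: IO (Either IOException String)
  pure $ case result of
    Left _ -> False
    Right s -> maybe False (< maxLoad) (parseLoadAvg s)
```

Note that the Dynamic check and the counter update are not atomic, so concurrent checks can briefly overshoot; for a load-based heuristic that is usually acceptable.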

Customising the job’s payload structure

Important: please ensure that the payload field in your jobs table is appropriately indexed, otherwise filtering jobs by job-type could be slow. In the example discussed below, a BTREE index on payload #>> '{payload,tag}' is strongly recommended.

Through a number of default config params, Odd Jobs assumes that the Haskell-type representing your job payload is serialised into the following JSON structure:

{ "tag": "..."
, "contents": ...
}

However, that may not always be the case. For example, in our application at Vacation Labs, our job payload looks like the following:

{
  "user_id": xx
, "account_id": xx
, "txn_source": "xx"
, "payload": 
  {
    "tag": "xx"
  , "contents": xx
  }
}

So, we use the following code snippet to let Odd Jobs know how to get the job-type of a given job:

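{-# LANGUAGE OverloadedStrings #-}

import Control.Lens ((^?))
import Data.Aeson.Lens (key, _String)
import Data.Maybe (fromMaybe)
import OddJobs.ConfigBuilder (mkConfig)
import OddJobs.Job

-- jobLogger, jobPool, poolSize and jobRunner are defined elsewhere in your application
let cfg = mkConfig jobLogger "jobs" jobPool (MaxConcurrentJobs (2 * poolSize)) jobRunner $ \x ->
          x { -- extract the job-type in Haskell...
              cfgJobType = \j -> fromMaybe "unknown" $ jobPayload j ^? key "payload" . key "tag" . _String
              -- ...and the equivalent SQL expression
            , cfgJobTypeSql = "payload #>> '{payload,tag}'"
            }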
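If you would rather not depend on lens and lens-aeson just to extract the job-type, the same lookup can be written with plain aeson parsers. A minimal sketch: jobTypeFromPayload and samplePayload are hypothetical names, and the sample values are made up to match the structure shown above. You would still apply the function to the job’s payload inside cfgJobType.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Data.Aeson (Value, object, withObject, (.:), (.=))
import Data.Aeson.Types (Parser, parseMaybe)
import Data.Maybe (fromMaybe)
import Data.Text (Text)

-- | Extract payload.tag from the nested structure shown above, falling
-- back to "unknown" when it is absent or not a string.
jobTypeFromPayload :: Value -> Text
jobTypeFromPayload = fromMaybe "unknown" . parseMaybe extract
  where
    extract :: Value -> Parser Text
    extract = withObject "job" $ \o -> do
      inner <- o .: "payload"
      withObject "payload" (.: "tag") inner

-- | A hypothetical payload with the same shape as the example above.
samplePayload :: Value
samplePayload =
  object
    [ "user_id" .= (1 :: Int)
    , "account_id" .= (2 :: Int)
    , "txn_source" .= ("web" :: Text)
    , "payload" .= object ["tag" .= ("SendWelcomeEmail" :: Text), "contents" .= (10 :: Int)]
    ]
```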

Making changes to the Haskell-type representing your job payload

As your application evolves, so will the Haskell type representing your job-payload. Due to changes in your main application code, it is very common to perform one of the following operations:

  1. Add a new constructor to your sum-type: this poses no problem, and you can add a new constructor without any issues.
  2. Remove a constructor from your sum-type: if you have jobs scheduled in the future that use a constructor that has been removed, those jobs will fail (because they won’t get parsed into your Haskell job-type). Therefore, depending upon why you’re removing this constructor, you need to take one of the following actions:

    • Either, write a DB migration that removes the jobs related to this constructor from the DB
    • Or, wait for all jobs related to this constructor to be executed (you can use runJobNowIO to force a job to execute immediately)
  3. Alter a constructor by either changing its arguments or renaming the constructor itself:

    • Either, you don’t alter the constructor in the first place. Instead introduce a new constructor to handle the new code paths, and phase out the old constructor over the next few releases, before removing it completely.
    • Or, you write a DB migration to convert the data in the payload column for the rows that are related to the constructor.
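A sketch of the first approach under option 3, assuming aeson’s default tagged encoding. The hypothetical SendWelcomeEmailV2 constructor adds an extra argument (a locale String, made up for illustration), while the old SendWelcomeEmail constructor is kept so that rows already in the jobs table still decode. oldPayloadStillParses is the kind of regression check worth keeping in a test-suite until the old constructor is finally removed.

```haskell
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}

import Data.Aeson (FromJSON, ToJSON, decode)
import GHC.Generics (Generic)

data MyJob
  = SendWelcomeEmail Int          -- old shape: kept until no such rows remain
  | SendWelcomeEmailV2 Int String -- new shape: user id plus a (hypothetical) locale
  | SetupSampleData Int
  deriving (Eq, Show, Generic, ToJSON, FromJSON)

-- | Route jobs written in the old shape through the new code path.
normaliseJob :: MyJob -> MyJob
normaliseJob (SendWelcomeEmail userId) = SendWelcomeEmailV2 userId "en" -- hypothetical default locale
normaliseJob job = job

-- | Regression check: a payload written by an older release must still decode.
oldPayloadStillParses :: Bool
oldPayloadStillParses =
  decode "{\"tag\":\"SendWelcomeEmail\",\"contents\":10}" == Just (SendWelcomeEmail 10)
```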

Structured logging

If you’re using tools like Kibana/Logstash, you might want to log directly in JSON instead of plain-text strings. This allows you to build dashboards for Odd Jobs and track things like:

  • Successful jobs per minute
  • Failed/retried jobs per minute
  • Plot job-failure by job-type
  • and more…

Here’s an example of how we’re using this at Vacation Labs. In our production environment, we don’t use Kibana/Logstash, but standard rsyslog along with CEE logging. rsyslog then inserts this CEE/JSON into Postgres/TimescaleDB, from where it is visualised using Grafana.

{-# LANGUAGE OverloadedStrings #-}

import OddJobs.Job
import OddJobs.ConfigBuilder (mkConfig, defaultJsonLogEvent)
import System.Posix.Syslog as Syslog -- from the `hsyslog` package
import qualified Data.ByteString.Lazy as BSL
import Data.Aeson as Aeson
import Data.ByteString.Unsafe (unsafeUseAsCStringLen)

-- Forward every Odd Jobs log event to syslog as a CEE (JSON) message
jobLogger :: LogLevel -> LogEvent -> IO ()
jobLogger logLevel logEvent =
  unsafeUseAsCStringLen logStr $ \cStr ->
    syslog Nothing sysLogLevel cStr
  where
    -- Map Odd Jobs' log levels onto syslog priorities
    sysLogLevel = case logLevel of
      LevelDebug -> Syslog.Debug
      LevelInfo -> Syslog.Info
      LevelWarn -> Syslog.Warning
      LevelError -> Syslog.Error
      LevelOther _ -> Syslog.Notice

    -- The "@cee:" prefix tells rsyslog that the rest of the message is JSON
    logStr =
      "@cee:" <> BSL.toStrict (Aeson.encode (defaultJsonLogEvent logEvent))

main :: IO ()
main = withSyslog "odd-jobs" [] User $ do
  -- jobPool, poolSize and jobRunner are defined elsewhere in your application
  let cfg = mkConfig jobLogger "jobs" jobPool (MaxConcurrentJobs (2 * poolSize)) jobRunner Prelude.id

  -- use `cfg` here

Graceful shutdown

It is highly recommended to always shut down the Odd Jobs runner gracefully. This can be achieved by sending a SIGINT signal to the runner process. Once the Odd Jobs runner receives a SIGINT, it initiates a graceful shutdown:

  1. It stops picking new jobs for execution.
  2. It waits for at most cfgDefaultJobTimeout seconds for existing job threads to exit.
  3. It exits once all current job-threads have exited OR when cfgDefaultJobTimeout seconds have elapsed - whichever occurs earlier.
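The three steps can be sketched with base alone. This is a simplified model, not the library’s code: RunnerState and gracefulShutdown are hypothetical names, the poller is assumed to check acceptingJobs before picking a job, and in a real runner gracefulShutdown would be triggered from a SIGINT handler (for example one installed via System.Posix.Signals from the unix package).

```haskell
import Control.Concurrent
import Data.IORef
import System.Timeout (timeout)

-- | Hypothetical runner state shared with the poller and the job threads.
data RunnerState = RunnerState
  { acceptingJobs :: IORef Bool -- ^ the poller only picks new jobs while True
  , runningJobs :: IORef Int    -- ^ number of job threads still running
  }

-- | Step 1: stop picking new jobs. Steps 2 and 3: wait until no job threads
-- are running, but for at most `seconds`. Returns True if all jobs finished.
gracefulShutdown :: Int -> RunnerState -> IO Bool
gracefulShutdown seconds st = do
  writeIORef (acceptingJobs st) False
  finished <- timeout (seconds * 1000000) waitForIdle
  pure (maybe False (const True) finished)
  where
    -- Poll the running-job counter every 100ms.
    waitForIdle = do
      n <- readIORef (runningJobs st)
      if n <= 0 then pure () else threadDelay 100000 >> waitForIdle
```

A False result corresponds to the case where the timeout elapsed first; the unfinished jobs stay locked and are picked up again later, as described below.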

If you’ve used OddJobs.Cli to build your Odd Jobs runner, you should be able to initiate a graceful shutdown via the command-line. The CLI accepts a --timeout option to allow you to shut down earlier than cfgDefaultJobTimeout seconds.

odd-jobs-cli stop --timeout [seconds]

Even if you end-up force-killing the Odd Jobs runner (i.e. without graceful shutdown), any “crashed” jobs will get automatically retried after cfgDefaultJobTimeout seconds.

Alerts and notifications

Heads-up: This section of the documentation is incomplete. Please open an issue if you’re interested in finding out how to integrate Odd Jobs with error monitoring tools like Airbrake, Sentry, etc.

Future enhancements, and known issues

  • Complete a few sections in the docs
  • Kill a running job (which is different from cancelling a job that has not been picked-up yet)
  • Register job-runners with a “master” server
  • Shutdown (all?) job-runners from the admin UI
  • Integrate with snap
  • Integrate with yesod
  • Cron-like job-scheduling
  • Support for multi-step jobs?
  • Integrate with an SMTP library for delivering critical alerts directly via email?
  • Enhancements to admin UI:
    • Global search
    • Kill running job
    • View logs
    • Aggregated stats