Odd Jobs: Full-featured Haskell job-queue
Last updated: 17 May, 2020
odd-jobs is a Postgres-backed job-queue, written in Haskell. It builds upon the techniques used by Rails’ delayed_job.
When we started adopting Haskell at Vacation Labs, back in 2016, something like delayed_job was curiously missing from the Haskell ecosystem. It’s not that nothing attempted to solve this problem, but we felt that nothing was appropriate for our use-case. Hence, we wrote odd-jobs to “scratch our own itch”.
Table of Contents
- Getting started guide
- How does it work
- Deployment
- Admin UI
- Controlling concurrency
- Customising the job’s payload structure
- Making changes to the Haskell-type representing your job payload
- Structured logging
- Graceful shutdown
- Alerts and notifications
- Future enhancements, and known issues
Getting started guide
1. Create a table to store jobs
In this example, our jobs table will be called jobs_test:
ghci> :set -XOverloadedStrings
ghci> import Database.PostgreSQL.Simple (connectPostgreSQL)
ghci> import OddJobs.Migrations
ghci> conn <- connectPostgreSQL "dbname=jobs_test user=jobs_test password=jobs_test host=localhost"
ghci> createJobTable conn "jobs_test"
2. Create a module for your job-runner
Ideally, this module should be compiled into a separate executable and should depend on your application’s library module. If you do not wish to deploy odd-jobs as an independent executable, you may embed it within your main application’s executable as well. This is described in the Deployment section.
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE LambdaCase #-}
module OddJobsCliExample where
import OddJobs.Job (Job(..), ConcurrencyControl(..), Config(..), throwParsePayload)
import OddJobs.ConfigBuilder (mkConfig, withConnectionPool, defaultTimedLogger, defaultLogStr, defaultJobType)
import OddJobs.Cli (defaultMain)
-- Note: It is not necessary to use fast-logger. You can use any logging library
-- that can give you a logging function in the IO monad.
import System.Log.FastLogger(withTimedFastLogger, LogType'(..), defaultBufSize)
import System.Log.FastLogger.Date (newTimeCache, simpleTimeFormat)
import Data.Text (Text)
import Data.Aeson as Aeson
import GHC.Generics
-- This example is using these functions to introduce an artificial delay of a
-- few seconds in one of the jobs. Otherwise it is not really needed.
import OddJobs.Types (delaySeconds, Seconds(..))
3. Set up a Haskell type to represent your job-payload
- Ideally, this data-type should be defined inside your application’s code, and the module containing this type-definition should be part of the exposed-modules stanza. To work with all the default settings provided by OddJobs.ConfigBuilder, this data-type should have a “tagged” JSON serialisation, i.e.:
{ "tag": "..."
, "contents": ...
}
In case your JSON payload does not conform to this structure, please look at customising the job-payload’s structure.
In this example, we are blindly deriving ToJSON and FromJSON instances, because the default behaviour of Aeson is to generate a tagged JSON as per the example given above.
data MyJob
  = SendWelcomeEmail Int
  | SendPasswordResetEmail Text
  | SetupSampleData Int
  deriving (Eq, Show, Generic, ToJSON, FromJSON)
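To make the “tagged” shape concrete, here is a dependency-free sketch that hand-writes the JSON which Aeson’s default generic encoding should produce for a type shaped like MyJob: a "tag" field holding the constructor name and a "contents" field holding its single argument (Aeson’s key order may differ, which does not change the meaning). The function toTaggedJson is hypothetical and only for illustration; in real code you keep the derived ToJSON instance. To stay within base, the Text field is modelled as a String, and quoting relies on show, which matches JSON escaping only for plain ASCII.

```haskell
-- Illustrative copy of MyJob; Text is replaced by String to stay within base.
data MyJob
  = SendWelcomeEmail Int
  | SendPasswordResetEmail String
  | SetupSampleData Int
  deriving (Eq, Show)

-- Hand-written equivalent of Aeson's default tagged sum encoding:
-- {"tag": <constructor name>, "contents": <the constructor's argument>}
toTaggedJson :: MyJob -> String
toTaggedJson job = case job of
  SendWelcomeEmail uid       -> obj "SendWelcomeEmail" (show uid)
  SendPasswordResetEmail tkn -> obj "SendPasswordResetEmail" (show tkn) -- show adds the quotes
  SetupSampleData uid        -> obj "SetupSampleData" (show uid)
  where
    obj :: String -> String -> String
    obj tag contents =
      "{\"tag\":" ++ show tag ++ ",\"contents\":" ++ contents ++ "}"
```

For example, toTaggedJson (SendWelcomeEmail 10) yields {"tag":"SendWelcomeEmail","contents":10}.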
4. Write the core job-runner function
In this example, the core job-runner function is in the IO monad. In all probability, your application’s code will be in a custom monad, and not IO. Please refer to TODO on how to work with custom monads.
myJobRunner :: Job -> IO ()
myJobRunner job = do
  throwParsePayload job >>= \case
    SendWelcomeEmail userId -> do
      putStrLn $ "This should call the function that actually sends the welcome email. " <>
        "\nWe are purposely waiting 60 seconds before completing this job so that graceful shutdown can be demonstrated."
      delaySeconds (Seconds 60)
      putStrLn "60 second wait is now over..."
    SendPasswordResetEmail tkn ->
      putStrLn "This should call the function that actually sends the password-reset email"
    SetupSampleData userId -> do
      -- This job deliberately throws, so that a failing job can be observed.
      Prelude.error "User onboarding is incomplete"
      putStrLn "This should call the function that actually sets up sample data in a newly registered user's account"
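The SetupSampleData branch above throws on purpose. Odd Jobs itself moves a failing job to the retry or failed state (see How does it work and the Admin UI section), so your runner does not need to catch anything. Purely to illustrate the idea, here is a base-only sketch of how a runner could turn an exception into a next state; Outcome, runAndClassify, and the attempt-counting rule are hypothetical and are not the library’s actual logic.

```haskell
{-# LANGUAGE ScopedTypeVariables #-}
import Control.Exception (SomeException, try)

-- Hypothetical result of one execution attempt.
data Outcome
  = Succeeded
  | WillRetry String          -- the job threw, but attempts remain
  | PermanentlyFailed String  -- the job threw and max attempts are used up
  deriving (Eq, Show)

-- `attempts` is how many times the job has already been tried before this
-- run; `maxAttempts` is an assumed upper bound. Any exception thrown by the
-- job action is caught and converted into an Outcome.
runAndClassify :: Int -> Int -> IO () -> IO Outcome
runAndClassify maxAttempts attempts action = do
  result <- try action
  pure $ case result of
    Right () -> Succeeded
    Left (e :: SomeException)
      | attempts + 1 < maxAttempts -> WillRetry (show e)
      | otherwise                  -> PermanentlyFailed (show e)
```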
5. Write the main function using OddJobs.Cli
main :: IO ()
main = do
  defaultMain startJobMonitor
  where
    -- A callback-within-callback function. If the command-line args contain a
    -- `start` command, this function will be called. Once this function has
    -- constructed the 'Config' (which requires setting up a logging function,
    -- and a DB pool) it needs to execute the `callback` function that is passed
    -- to it.
    startJobMonitor callback =
      -- A utility function provided by `OddJobs.ConfigBuilder` which ensures
      -- that the DB pool is gracefully destroyed upon shutdown.
      withConnectionPool (Left "dbname=jobs_test user=jobs_test password=jobs_test host=localhost") $ \dbPool -> do
        -- Boilerplate code to set up a TimedFastLogger (from the fast-logger library)
        tcache <- newTimeCache simpleTimeFormat
        withTimedFastLogger tcache (LogFileNoRotate "oddjobs.log" defaultBufSize) $ \logger -> do
          -- Using the default string-based logging provided by
          -- `OddJobs.ConfigBuilder`. If you want to actually use
          -- structured-logging you'll need to define your own logging function.
          let jobLogger = defaultTimedLogger logger (defaultLogStr defaultJobType)
              -- "jobs_test" is the table created in step 1
              cfg = mkConfig jobLogger "jobs_test" dbPool (MaxConcurrentJobs 50) myJobRunner Prelude.id
          -- Finally, executing the callback function that was passed in...
          callback cfg
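The callback-within-callback structure can be puzzling at first. The sketch below, which uses only base, models it with a hypothetical withResource helper built on bracket_, standing in for functions such as withConnectionPool and withTimedFastLogger: each resource is acquired before the callback runs and released afterwards, in reverse order, even if the callback throws. The "cfg" string stands in for the real Config; this illustrates the general pattern, not odd-jobs’ internals.

```haskell
import Control.Exception (bracket_)
import Data.IORef

-- Hypothetical stand-in for a `with...` function: acquire, run, release.
-- Every step is recorded in `events` so the ordering can be inspected.
withResource :: IORef [String] -> String -> IO a -> IO a
withResource events name =
  bracket_ (record ("acquire " ++ name)) (record ("release " ++ name))
  where
    record msg = modifyIORef events (++ [msg])

-- Mirrors the shape of startJobMonitor: resources are nested, and the
-- innermost body hands the finished "config" to the callback.
startWith :: IORef [String] -> (String -> IO ()) -> IO ()
startWith events callback =
  withResource events "db-pool" $
    withResource events "logger" $
      callback "cfg"
```

Running startWith with a callback that records "run with cfg" produces the order: acquire db-pool, acquire logger, run with cfg, release logger, release db-pool.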
6. Compile and start the Odd Jobs runner
$ stack install <your-package-name>:exe:odd-jobs-cli
$ odd-jobs-cli start --daemonize --web-ui-basic-auth-user=oddjobs --web-ui-basic-auth-password=awesome
7. Enqueue some jobs from within your application’s code
ghci> :set -XOverloadedStrings
ghci> import OddJobs.Job (createJob)
ghci> import Database.PostgreSQL.Simple
ghci> import OddJobsCliExample (MyJob(..))
ghci> conn <- connectPostgreSQL "dbname=jobs_test user=jobs_test password=jobs_test host=localhost"
ghci> createJob conn "jobs_test" (SendWelcomeEmail 10)
ghci> createJob conn "jobs_test" (SetupSampleData 10)
8. Check out the awesome web UI
Visit http://localhost:7777 (username=oddjobs / password=awesome, as configured earlier).
9. Check out the log file to see what Odd Jobs is doing
$ tail -f oddjobs.log
10. Finally, shut down Odd Jobs gracefully
Please read the Graceful shutdown section to learn more.
$ odd-jobs-cli stop --timeout 65
How does it work
Job creation is a straightforward INSERT into the jobs table, specified by cfgTableName.
Running jobs requires a “job runner”, which internally runs (and coordinates) two background threads:
- One thread (called jobEventListener) uses Postgres’ LISTEN/NOTIFY mechanism to be notified immediately every time a row is inserted into the jobs table. This allows jobs to be run as soon as they’re queued, but it can’t run jobs that are scheduled for execution in the future.
- This is why another thread (called jobPoller) polls the jobs table every cfgPollingInterval seconds, looking for jobs that match either of the following conditions:
  - jobRunAt is in the past AND state is queued or retry
  - OR, state is locked, but locked_at is more than cfgDefaultLockTimeout seconds in the past, which possibly means that the job-worker has crashed
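To make the two wake-up paths concrete, here is a base-only sketch that collapses them into a single wait: a hypothetical notifications MVar stands in for the LISTEN/NOTIFY channel, and System.Timeout.timeout stands in for the polling interval. In Odd Jobs these are two separate threads (jobEventListener and jobPoller); WakeReason, waitForWork, runnerLoop, and findAndRunJobs are illustrative names only.

```haskell
import Control.Concurrent.MVar
import System.Timeout (timeout)

data WakeReason = Notified | PollTimerFired
  deriving (Eq, Show)

-- Block until either a notification arrives (something filled the MVar,
-- as a LISTEN/NOTIFY listener would on INSERT) or the polling interval
-- elapses, whichever happens first.
waitForWork :: Int -> MVar () -> IO WakeReason
waitForWork pollIntervalMicros notifications = do
  r <- timeout pollIntervalMicros (takeMVar notifications)
  pure (maybe PollTimerFired (const Notified) r)

-- One iteration per wake-up: in either case the worker would go and look
-- for runnable jobs using the conditions listed above.
runnerLoop :: Int -> MVar () -> (WakeReason -> IO ()) -> IO ()
runnerLoop pollIntervalMicros notifications findAndRunJobs = loop
  where
    loop = waitForWork pollIntervalMicros notifications >>= findAndRunJobs >> loop
```

The polling path is what catches jobs scheduled for the future and jobs whose lock has gone stale, since no notification is sent for those.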
Every time a job is found, the following is done:
- a new background thread is spawned, in which the cfgJobRunner function provided by you (i.e. the library user) is invoked with the newly found job. (Caveat: this is a simplification, as cfgConcurrencyControl may prevent too many threads from being spawned simultaneously and overloading the machine.)
- the job’s state in the DB is changed to locked
- the locked_by column is updated to record the hostname, PID, and threadId of the worker process
- the locked_at column is updated to record the time at which job execution was started. If the job’s status is not changed to success, retry, or failed within monitorLockTimeout seconds, then it will be considered “crashed”, and will be picked up again.
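The selection conditions and the locking step can be expressed as pure functions over a simplified row. This is a sketch under stated assumptions: JobRow, its fields, isRunnable, and lockRow are hypothetical, times are plain epoch seconds, and in Odd Jobs the real work happens in SQL against the jobs table.

```haskell
-- Hypothetical, simplified model of a row in the jobs table.
data JobState = Queued | Retry | Locked | Success | Failed
  deriving (Eq, Show)

data JobRow = JobRow
  { rowState    :: JobState
  , rowRunAt    :: Integer        -- when the job is due (epoch seconds)
  , rowLockedAt :: Maybe Integer  -- set when a worker picks the job up
  , rowLockedBy :: Maybe String   -- e.g. hostname, PID and threadId
  } deriving (Eq, Show)

-- Should a poller pick up this row at time `now`? `lockTimeout` stands in
-- for the lock timeout after which a locked job is presumed crashed.
isRunnable :: Integer -> Integer -> JobRow -> Bool
isRunnable now lockTimeout row = case rowState row of
  Queued -> rowRunAt row <= now
  Retry  -> rowRunAt row <= now
  Locked -> maybe False (\t -> now - t > lockTimeout) (rowLockedAt row)
  _      -> False

-- What a worker records when it claims a job.
lockRow :: Integer -> String -> JobRow -> JobRow
lockRow now worker row =
  row { rowState = Locked, rowLockedAt = Just now, rowLockedBy = Just worker }
```

A freshly locked row is not runnable again until more than lockTimeout seconds have passed since rowLockedAt, which is how a crashed worker’s job gets picked up again.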
Deployment
Broadly speaking, you’ve got two options to deploy Odd Jobs, which are compared in the table below:
Scenario | Separate process (1) | Bundled with your app (2) |
---|---|---|
Scaling workloads independently: Ability to spin up more servers for your main application OR Odd Jobs, depending upon what the workload demands. | Yes | No |
Managing workload spikes: Can a spike in jobs possibly cause a slowdown in your main application, and vice versa? | No | Yes |
Independent deploys/upgrades: Can the job-runner be upgraded without re-deploying your main application, and vice versa? | Yes, if your application modules/packages are architected appropriately | No. Also, re-deploying your main app might take longer, as the bundled Odd Jobs will wait for a graceful shutdown before exiting. |
Memory usage | Possibly higher | Possibly lower |
Admin UI | Enable via a command-line switch | Write custom code to integrate into main application |
1. Separate process: compile the Odd Jobs runner as a separate binary, and run it in an independent process (separate from your main application’s process). OddJobs.Cli provides all the building blocks required to put together an independent binary.
2. Bundled with your app: bundle it within your main application’s binary and run it in the same process as your main application.
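For option 2, the essential shape is to run the job-runner in a background thread for the lifetime of your application. A minimal base-only sketch, assuming a hypothetical withBackgroundWorker helper and an arbitrary worker action (in practice this would start the Odd Jobs runner). Note that killThread stops the worker abruptly, which is exactly what the Graceful shutdown section advises against, so treat this only as a skeleton.

```haskell
import Control.Concurrent
import Control.Exception (bracket)

-- Run `worker` in a background thread while `app` runs in the current
-- thread. When `app` returns or throws, the worker thread is killed.
withBackgroundWorker :: IO () -> IO a -> IO a
withBackgroundWorker worker app =
  bracket (forkIO worker) killThread (const app)
```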
Admin UI
Odd Jobs comes with a built-in admin UI that enables you to do the following:
- See which jobs are currently running, by which machine + process + thread
- View the latest error of any job which is in the retry or failed state
- See which jobs are scheduled to run in the future
- Retry any job that has permanently failed after exhausting max-attempts
- Immediately run a job that is otherwise scheduled to run in the future
- Cancel a job that is scheduled to run in the future
- Filter by job-type
- Filter by job-runner
- View the payloads of each job (Related: Customising the job’s payload structure)
Using the in-built admin UI
If you are using OddJobs.Cli to deploy Odd Jobs as a separate binary, you can enable the admin UI by picking an authentication scheme for it:
- Either pass --web-ui-basic-auth-user=<USER> and --web-ui-basic-auth-password=<PASS> at the command-line
- Or, pass --web-ui-no-auth at the command-line
More details about the command-line options are available in the documentation for StartArgs.
Bundling the admin UI with an existing web-app
Heads-up: This section of the documentation is incomplete. Please open an issue for the specific framework / library that you’re interested in.
If you’re bundling Odd Jobs into your main application, in all probability you’d want it to run on the same port as your main web-app.
- TODO: General instructions
- TODO: Specific instructions for Scotty
- TODO: Specific instructions for Yesod
- TODO: Specific instructions for Spock
- TODO: Specific instructions for Servant
Controlling concurrency
Odd Jobs uses Haskell’s in-built forkIO threads to execute multiple jobs concurrently in the same OS process. This is unlike other job-queues, such as Delayed Jobs or Resque, where you need to run multiple OS processes, because each job-runner can execute only one job at a time.
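The following base-only sketch shows the general technique of running many jobs as forkIO threads in one process, while capping how many run at once with a QSem. runConcurrently is a hypothetical helper; this is not how Odd Jobs implements cfgConcurrencyControl internally.

```haskell
import Control.Concurrent
import Control.Concurrent.QSem
import Control.Exception (bracket_, finally)
import Control.Monad (forM_, replicateM_)

-- Runs each job in its own lightweight forkIO thread, while a semaphore
-- ensures that at most `maxConcurrent` job bodies execute at the same time.
-- All threads are forked up front; the surplus ones block on the semaphore.
-- Returns once every job has finished, whether it succeeded or threw.
runConcurrently :: Int -> [IO ()] -> IO ()
runConcurrently maxConcurrent jobs = do
  sem  <- newQSem maxConcurrent
  done <- newEmptyMVar
  forM_ jobs $ \job ->
    forkIO $ bracket_ (waitQSem sem) (signalQSem sem) job `finally` putMVar done ()
  replicateM_ (length jobs) (takeMVar done)
```

Because green threads are cheap, forking one per job is fine here; the semaphore, not the thread count, is what limits the load on the machine.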
Compared to such job-queues, Odd Jobs has the following advantages as a result of executing jobs in concurrent threads:
- It potentially uses less memory, because only one OS process needs to be running
- Fewer processes poll the jobs table
- It can scale the number of threads up or down easily, depending upon the workload. With other queueing systems you need to pre-plan the amount of concurrency you need, and run that many processes on your job machine: you waste resources if your workload is lower, and you cannot scale up as easily when your workload is higher.
On the other hand, here are the disadvantages:
- Firing too many concurrent jobs can potentially starve your job-machine of resources and can either slow it down to a crawl, or cause out-of-memory errors.
- In case the single job-runner process crashes, it takes down all concurrently running jobs with it (although they will be retried after cfgDefaultLockTimeout seconds).
Therefore it is necessary to artificially limit the maximum concurrency of an Odd Jobs runner. This can be done via the cfgConcurrencyControl parameter:
- Maximum concurrent threads: This is the easiest to set up, and tells Odd Jobs to run at most N threads per job-runner. How to pick this N depends a lot on what your job workload actually is. Is it CPU heavy? Is it network heavy? Is it acquiring DB connections (from your application’s DB pool) for a long time? In the absence of such information, one can assume that acquiring a DB connection from your application’s DB pool is a natural constraint, and use N = size of DB pool as a starting point.
- Dynamic concurrency control: Coming up with a fixed number for maximum threads (N) is very hard, and doesn’t really utilise the power of running jobs in threads. If you pick N to be too low, you’re wasting resources. On the other hand, if you pick N to be too high, you might overload your machine. There’s another way, albeit currently experimental: before spawning a new job, Odd Jobs can run an IO Bool action that you provide. If this action evaluates to True, the job is picked for execution; otherwise it is left for execution in the next poll, or will be picked up by some other job-runner. In this IO Bool action you can write logic to check the CPU-load, free memory, or IOPS of the machine, and determine whether it is safe to execute another job concurrently without overloading the machine.
- Unbounded concurrency: Generally speaking, this is not recommended, because it can potentially cause your server to run out of resources.
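The three options boil down to a single question asked before each new job is spawned. Below is a sketch of that decision; Concurrency, mayStartAnother, and loadAverageBelow are hypothetical names (the library’s own value is whatever you pass as cfgConcurrencyControl), and reading /proc/loadavg assumes Linux.

```haskell
-- Hypothetical mirror of the three concurrency-control options above.
data Concurrency
  = MaxConcurrent Int   -- at most N job threads per runner
  | Unbounded           -- never refuse (not recommended)
  | Dynamic (IO Bool)   -- ask a user-supplied probe every time

-- Decide whether one more job may start, given how many job threads
-- this runner currently has in flight.
mayStartAnother :: Concurrency -> Int -> IO Bool
mayStartAnother policy running = case policy of
  MaxConcurrent n -> pure (running < n)
  Unbounded       -> pure True
  Dynamic probe   -> probe

-- Example probe for the Dynamic case (Linux-only assumption): allow another
-- job only while the 1-minute load average is below `limit`.
loadAverageBelow :: Double -> IO Bool
loadAverageBelow limit = do
  contents <- readFile "/proc/loadavg"
  case words contents of
    (oneMinute : _) -> pure (read oneMinute < limit)
    []              -> pure False
```

With Dynamic (loadAverageBelow 4.0), a runner would keep accepting jobs until the machine gets busy, instead of committing to a fixed N up front.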
Customising the job’s payload structure
Important: Please ensure that the payload field in your jobs table is appropriately indexed, else filtering jobs by job-type could potentially be slow. In the example discussed below, a BTREE index on payload #>> '{payload,tag}' is strongly recommended.
Through a number of default config params, Odd Jobs assumes that the Haskell-type representing your job payload is serialised into the following JSON structure:
{ "tag": "..."
, "contents": ...
}
However, that may not always be the case. For example, in our application at Vacation Labs, our job payload looks like the following:
{
"user_id": xx
, "account_id": xx
, "txn_source": "xx"
, "payload":
{
"tag": "xx"
, "contents": xx
}
}
So, we use the following code snippet to let Odd Jobs know how to get the job-type of a given job:
import Control.Lens ((^?))
import Data.Aeson.Lens (key, _String)
import Data.Maybe (fromMaybe)

-- cfgJobType must return Text, hence the final `_String` prism
let cfg = mkConfig jobLogger "jobs" jobPool (MaxConcurrentJobs 50) jobRunner $ \x ->
      x { cfgJobType = \j -> fromMaybe "unknown" $ jobPayload j ^? key "payload" . key "tag" . _String
        , cfgJobTypeSql = "payload #>> '{payload,tag}'"
        }
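For readers who do not use lens, the cfgJobType override above amounts to walking two object keys and falling back to "unknown". The sketch below uses a tiny hypothetical JSON type, JValue, instead of Aeson’s Value, so that it depends only on base; field and jobTypeOf are illustrative names.

```haskell
-- A tiny stand-in for Aeson's Value, just enough for this sketch.
data JValue
  = JObject [(String, JValue)]
  | JString String
  | JNumber Double
  deriving (Eq, Show)

-- Look up a key in a JSON object; any other value has no keys.
field :: String -> JValue -> Maybe JValue
field k (JObject kvs) = lookup k kvs
field _ _             = Nothing

-- Read payload.tag as a string, or "unknown" if the path is missing or
-- the tag is not a string (the same fallback as in the snippet above).
jobTypeOf :: JValue -> String
jobTypeOf v = case field "payload" v >>= field "tag" of
  Just (JString t) -> t
  _                -> "unknown"
```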
Making changes to the Haskell-type representing your job payload
As your application evolves, so will the Haskell type representing your job-payload. Due to changes in your main application code, it is very common to perform one of the following operations:
- Add a new constructor to your sum-type: This scenario poses no problem, and you can add a new constructor without any issues.
- Remove a constructor from your sum-type: If you have jobs scheduled in the future that use a constructor that has been removed, those jobs will fail (because they won’t get parsed into your Haskell job-type). Therefore, depending upon why you’re removing this constructor, you need to take one of the following actions:
  - Either, write a DB migration that removes the jobs related to this constructor from the DB
  - Or, wait for all jobs related to this constructor to be executed (you can use runJobNowIO to force a job to execute immediately)
- Alter a constructor, by either changing its arguments or renaming the constructor itself:
  - Either, don’t alter the constructor in the first place. Instead, introduce a new constructor to handle the new code paths, and phase out the old constructor over the next few releases, before removing it completely.
  - Or, write a DB migration to convert the data in the payload column for the rows that are related to the constructor.
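As an illustration of the “introduce a new constructor” approach, here is a base-only sketch. The constructor SendWelcomeEmailV2, its locale argument, and the default "en" are hypothetical; runJob returns a String instead of running in IO so that its behaviour is easy to check.

```haskell
data MyJob
  = SendWelcomeEmail Int           -- legacy shape, to be phased out later
  | SendWelcomeEmailV2 Int String  -- new shape: user id and a locale
  deriving (Eq, Show)

-- Jobs already queued with the legacy constructor still parse, and are
-- translated into the new shape, so both end up on a single code path.
runJob :: MyJob -> String
runJob (SendWelcomeEmail uid) = runJob (SendWelcomeEmailV2 uid "en")
runJob (SendWelcomeEmailV2 uid locale) =
  "welcome email: user " ++ show uid ++ ", locale " ++ locale
```

Once no queued rows use SendWelcomeEmail any more, the legacy constructor and its equation can be deleted.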
Structured logging
If you’re using things like Kibana/LogStash, etc. you might want to directly log in JSON instead of plain-text strings. This allows you to build dashboards for Odd Jobs and track things like:
- Successful jobs per minute
- Failed/retried jobs per minute
- Plot job-failure by job-type
- and more…
Here’s an example of how we’re using this at Vacation Labs. In our production environment, we don’t use Kibana/LogStash, but standard rsyslog along with cee logging. rsyslog then inserts this cee/JSON into Postgres/TimescaleDB, from where it is visualised using Grafana.
{-# LANGUAGE OverloadedStrings #-}
import OddJobs.Job
import OddJobs.ConfigBuilder (mkConfig, defaultJsonLogEvent)
import System.Posix.Syslog as Syslog -- from the `hsyslog` package
import qualified Data.ByteString.Lazy as BSL
import Data.Aeson as Aeson
import Data.ByteString.Unsafe (unsafeUseAsCStringLen)

jobLogger :: LogLevel -> LogEvent -> IO ()
jobLogger logLevel logEvent =
  unsafeUseAsCStringLen logStr $ \cStr ->
    syslog Nothing sysLogLevel cStr
  where
    sysLogLevel = case logLevel of
      LevelDebug -> Syslog.Debug
      LevelInfo -> Syslog.Info
      LevelWarn -> Syslog.Warning
      LevelError -> Syslog.Error
      LevelOther _ -> Syslog.Notice
    -- The "@cee:" prefix tells rsyslog that the rest of the message is JSON
    logStr =
      "@cee:" <> BSL.toStrict (Aeson.encode (defaultJsonLogEvent logEvent))

main :: IO ()
main = withSyslog "odd-jobs" [] User $ do
  -- jobPool, poolSize and jobRunner are assumed to be defined elsewhere
  let cfg = mkConfig jobLogger "jobs" jobPool (MaxConcurrentJobs (2 * poolSize)) jobRunner Prelude.id
  -- use `cfg` here, e.g. run the job-runner in the current thread
  startJobRunner cfg
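The essential part of the cee format is the "@cee:" prefix followed by a JSON object. Here is a dependency-free sketch of such a line, using a hypothetical ceeLine helper over flat string fields; the real example above uses Aeson and defaultJsonLogEvent instead, and the field names used below are made up.

```haskell
import Data.List (intercalate)

-- Renders flat key/value pairs as an rsyslog "@cee:" line.
-- `show` quotes the strings; it agrees with JSON escaping for plain ASCII
-- text but not for every character, so use a real JSON encoder (such as
-- Aeson) in production.
ceeLine :: [(String, String)] -> String
ceeLine fields = "@cee:{" ++ intercalate "," (map pair fields) ++ "}"
  where
    pair (k, v) = show k ++ ":" ++ show v
```

For example, ceeLine [("level", "info")] yields @cee:{"level":"info"}.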
Graceful shutdown
It is highly recommended to always shut down the Odd Jobs runner gracefully. This can be achieved by sending a SIGINT signal to the runner process. Once the Odd Jobs runner receives a SIGINT, it initiates a graceful shutdown:
- It stops picking new jobs for execution.
- It waits for at most cfgDefaultJobTimeout seconds for existing job threads to exit.
- It exits once all current job-threads have exited OR when cfgDefaultJobTimeout seconds have elapsed, whichever occurs earlier.
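The wait-with-deadline step can be sketched with System.Timeout.timeout. In this base-only sketch, a hypothetical inFlight MVar counts running job threads, and waitForInFlight returns True if the count reaches zero before the limit. The limit is in microseconds, so cfgDefaultJobTimeout seconds would be multiplied by 1,000,000. It does not model the “stop picking new jobs” step, and it is not Odd Jobs’ actual implementation.

```haskell
import Control.Concurrent
import System.Timeout (timeout)

-- Wait until no job threads remain, or until `limitMicros` elapses,
-- whichever happens first. True means every job finished in time.
waitForInFlight :: Int -> MVar Int -> IO Bool
waitForInFlight limitMicros inFlight = do
  r <- timeout limitMicros waitUntilIdle
  pure (r == Just ())
  where
    -- Re-check the counter every 10 ms until it drops to zero.
    waitUntilIdle = do
      n <- readMVar inFlight
      if n <= 0
        then pure ()
        else threadDelay 10000 >> waitUntilIdle
```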
If you’ve used OddJobs.Cli to build your Odd Jobs runner, you should be able to initiate a graceful shutdown via the command-line. The CLI accepts a --timeout option to allow you to shut down earlier than cfgDefaultJobTimeout seconds.
odd-jobs-cli stop --timeout [seconds]
Even if you end up force-killing the Odd Jobs runner (i.e. without graceful shutdown), any “crashed” jobs will automatically get retried after cfgDefaultJobTimeout seconds.
Alerts and notifications
Heads-up: This section of the documentation is incomplete. Please open an issue if you’re interested in finding out how to integrate Odd Jobs with error monitoring tools like Airbrake, Sentry, etc.
Future enhancements, and known issues
- Complete a few sections in the docs
- Kill a running job (which is different from cancelling a job that has not been picked-up yet)
- Register job-runners with a “master” server
- Shutdown (all?) job-runners from the admin UI
- Integrate with snap
- Integrate with yesod
- Cron-like job-scheduling
- Support for multi-step jobs?
- Integrate with an SMTP library for delivering critical alerts directly via email?
- Enhancements to admin UI:
  - Global search
  - Kill running job
  - View logs
  - Aggregated stats