When the iteration loop is long, programming becomes frustrating.
Let’s say you have to change a low-level system that lacks a way to test it directly. To test the new code, you have to manually execute a complex series of time-consuming and onerous steps. When something goes wrong, all you know is either your code is broken (for some unknown reason), or you exposed an existing issue in the monstrous system that depends on your change. It’s even worse when the code cannot be tested in a reproducible way, and you get a different behavior with each run.
If that isn’t frustrating enough, imagine you are unable to implement the solution you envisioned. You can’t directly access the low-level system because a series of components — meant to abstract, simplify and create a safe alternative to your low-level system — are in the way. The components accomplish little of their stated purposes, but force your final design to be slower and more complex than necessary.
Finally, you learn that the dev environment you are testing doesn’t match the behavior of production in crucial ways.
This is what it feels like to write database queries for many websites. It’s not fun. Let’s fix that.
In this post I’ll show how to make database programming fun by using packages that make testing easy. I’ll walk through an example of building a durable queue backed by PostgreSQL.
To make database programming fun, we need a quick way to iterate. We need to bring up instances of our database, PostgreSQL in my case, and directly test the queries we are writing.
One excellent option is [pg_tmp](http://ephemeralpg.org/). I’m going to utilize a Haskell analog I wrote called [tmp-postgres](http://hackage.haskell.org/package/tmp-postgres) and the test helper based on it: [hspec-pg-transact](http://hackage.haskell.org/package/hspec-pg-transact).
[hspec-pg-transact](http://hackage.haskell.org/package/hspec-pg-transact) provides a helper for setting up a test suite that creates a temporary postgres process. Tests can use the optional [itDB](https://hackage.haskell.org/package/hspec-pg-transact-0.1.0.0/docs/Test-Hspec-DB.html#v:itDB), which adds a test and runs the db transaction used for the test.
Here is a simple example of testing db queries with a temporary database.

    describeDB migrate "Query" $
      itDB "work" $ do
        execute_ [sql|
          INSERT INTO things
          VALUES ('me') |]
        query_ [sql|
          SELECT name
          FROM things |]
          `shouldReturn` [Only "me"]

(You need to make sure that initdb is on the PATH. This is the case for brew on macOS, but for Ubuntu you must add /usr/lib/postgresql/VERSION/bin to the PATH.)
The full code, including import statements, is here. Let’s walk through a larger example.
In one of my previous posts, I discussed an in-memory queue for sending emails. The problem with the queue, as Cale Gibbard pointed out, is that it’s not durable. However, PostgreSQL is durable, so let’s use it to make our queue as Cale did.
We’ll add new payloads with enqueueDB. tryLockDB will grab the oldest enqueued payload for us to process. We’ll call dequeueDB on the payload once the processing is finished:

    data Payload = Payload
      { pId         :: PayloadId
      , pValue      :: Value
      , pState      :: State
      , pCreatedAt  :: UTCTime
      , pModifiedAt :: UTCTime
      } deriving (Show, Eq)

    enqueueDB :: Value -> DB PayloadId
    enqueueDB value = do
      pid <- liftIO randomIO
      execute
        [sql| INSERT INTO payloads (id, value)
              VALUES (?, ?)
        |]
        (pid, value)
      return $ PayloadId pid

    tryLockDB :: DB (Maybe Payload)
    tryLockDB = listToMaybe <$> query_
      [sql| UPDATE payloads
            SET state='locked'
            WHERE id in
              ( SELECT id
                FROM payloads
                WHERE state='enqueued'
                ORDER BY created_at ASC
                LIMIT 1
              )
            RETURNING id, value, state, created_at, modified_at
      |]

    dequeueDB :: PayloadId -> DB ()
    dequeueDB payloadId = void $ execute
      [sql| UPDATE payloads
            SET state='dequeued'
            WHERE id=?
      |]
      payloadId

All the operations occur in a [DB](http://hackage.haskell.org/package/pg-transact-0.1.0.0/docs/Database-PostgreSQL-Transact.html#t:DB) monad so we can compose them with other db operations and have them run in the same transaction. For instance, we could use this library to enqueueDB and create a customer in a single transaction.
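To make the state transitions behind these three queries concrete, here is a minimal pure sketch that models the payloads table as a plain list. It is not the library's implementation: `Row`, `enqueueRow`, `tryLockRow` and `dequeueRow` are hypothetical names, and the `Int` and `String` fields are stand-ins for the real `PayloadId`, `Value` and timestamp types.

```haskell
import Data.List (sortOn)

-- Hypothetical stand-ins for the post's types; the real ones live in the library.
data State = Enqueued | Locked | Dequeued
  deriving (Show, Eq)

data Row = Row
  { rowId        :: Int     -- stand-in for PayloadId
  , rowValue     :: String  -- stand-in for the JSON Value
  , rowState     :: State
  , rowCreatedAt :: Int     -- stand-in for created_at
  } deriving (Show, Eq)

-- Insert a new row in the Enqueued state.
enqueueRow :: Int -> String -> Int -> [Row] -> [Row]
enqueueRow i v t rows = Row i v Enqueued t : rows

-- Lock the oldest Enqueued row, mirroring ORDER BY created_at ASC LIMIT 1.
-- Returns the locked row (if any) together with the updated table.
tryLockRow :: [Row] -> (Maybe Row, [Row])
tryLockRow rows =
  case sortOn rowCreatedAt [r | r <- rows, rowState r == Enqueued] of
    []      -> (Nothing, rows)
    (r : _) ->
      let locked = r { rowState = Locked }
      in (Just locked, map (\x -> if rowId x == rowId r then locked else x) rows)

-- Mark the row with the given id as Dequeued.
dequeueRow :: Int -> [Row] -> [Row]
dequeueRow i = map (\x -> if rowId x == i then x { rowState = Dequeued } else x)
```

The model makes one property easy to see: a locked row is never handed out again, so a second tryLockRow skips it and moves on to the next oldest enqueued row.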
We can utilize [hspec-pg-transact](http://hackage.haskell.org/package/hspec-pg-transact) to write our tests:

    main :: IO ()
    main = hspec spec

    spec :: Spec
    spec = describeDB init "Queue" $ do
      itDB "empty gives nothing" $ do
        tryLockDB
          `shouldReturn` Nothing

      itDB "enqueue/lock/dequeue" $ do
        theId <- enqueueDB $ String "!"
        Just Payload {..} <- tryLockDB

        pId `shouldBe` theId
        pValue `shouldBe` String "!"

        tryLockDB
          `shouldReturn` Nothing

        dequeueDB pId `shouldReturn` ()

        tryLockDB
          `shouldReturn` Nothing

We can load up our single test file, [test/Database/QueueSpec.hs](https://github.com/jfischoff/postgresql-queue/blob/master/test/Database/PostgreSQL/Simple/QueueSpec.hs), and run it in isolation:

    > :l test/Database/QueueSpec.hs
    > :main --match=locks
    Database.Queue
    postgresql:///test?host=/tmp/tmp-postgres26345&port=57473

      empty locks nothing
      enqueue/lock/dequeue
    Shutting Down

    Finished in 1.8647 seconds
    2 examples, 0 failures

Two seconds isn’t considered great in the unit-testing world, but for database testing we are in good shape. Not only are the tests fast, but we can also quickly iterate by reloading and rerunning a single file, all in single-digit seconds.
This actually scales quite well. The queries themselves take milliseconds; almost all of the time is taken for the temporary db startup, which we only need to do once.
Simple tests are easy, but let’s write a more complicated stress test. Unlike this simple test, the stress test will need to run multiple transactions, so we can’t use the itDB test helper.
In this test we create 10 consumer threads that read from the queue and 1000 threads that enqueue messages:

    it "stress queue" $ \testDB -> do
      let withPool' = withPool testDB
          elemCount = 1000 :: Int
          expected = [0 .. elemCount - 1]

      ref <- newIORef []

      -- Make 10 queue consumer threads
      loopThreads <- replicateM 10 $ async $ fix $ \next -> do
        mpayload <- withPool' tryLock
        case mpayload of
          Nothing -> next
          Just Payload {..} -> do
            last <- atomicModifyIORef ref $ \xs ->
              (pValue : xs, length xs + 1)
            withPool' $ \conn -> dequeue conn pId
            when (last < elemCount) next

      -- Fork 1000 threads and enqueue an index
      forM_ [0 .. elemCount - 1] $ \i ->
        forkIO $ void $ withPool' $ flip enqueue $ toJSON i

      waitAnyCancel loopThreads
      Just decoded <- mapM (decode . encode) <$> readIORef ref
      sort decoded `shouldBe` sort expected

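The stress test relies on atomicModifyIORef to append a value and read back the new element count in a single atomic step, so exactly one consumer ever sees the final count. The sketch below isolates that bookkeeping using only base, with no database involved. The names `record` and `collect` are hypothetical and are not part of the queue library.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM_, when)
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)
import Data.List (sort)

-- Prepend a value and return the new element count in one atomic step.
record :: IORef [a] -> a -> IO Int
record ref x = atomicModifyIORef' ref $ \xs -> (x : xs, length xs + 1)

-- Fork n threads that each record their index, then wait until the
-- thread that observes the final count signals completion.
collect :: Int -> IO [Int]
collect n
  | n <= 0 = pure []
  | otherwise = do
      ref  <- newIORef []
      done <- newEmptyMVar
      forM_ [0 .. n - 1] $ \i -> forkIO $ do
        count <- record ref i
        -- Counts are unique, so only one thread reaches n.
        when (count == n) $ putMVar done ()
      takeMVar done
      sort <$> readIORef ref
```

Because the count comes back from the same atomic update that stored the value, no two threads can observe the same count, which is what lets the test use it as a stopping condition.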
This test works, but makes clear that there is an issue with our design. We have to poll the db constantly to see if there is new data.
Luckily PostgreSQL has a solution for us, as lpsmith pointed out: [NOTIFY](https://www.postgresql.org/docs/current/static/sql-notify.html) and [LISTEN](https://www.postgresql.org/docs/current/static/sql-listen.html). We make a new function, lock, to utilize PostgreSQL’s pubsub feature:

    notifyPayload :: Connection -> IO ()
    notifyPayload conn = do
      Notification {..} <- getNotification conn
      unless (notificationChannel == "enqueue") $
        notifyPayload conn

    lock :: Connection -> IO Payload
    lock conn = bracket_
      (Simple.execute_ conn "LISTEN enqueue")
      (Simple.execute_ conn "UNLISTEN enqueue")
      $ fix $ \continue -> do
          m <- tryLock conn
          case m of
            Nothing -> do
              notifyPayload conn
              continue
            Just x -> return x

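The shape of lock is "try, and if nothing is there, block until notified, then try again". As a rough illustration of that loop without a database, here is a base-only sketch in which an MVar stands in for the enqueue notification channel and an IORef stands in for the table. All names (`MemQueue`, `enqueueMem`, `tryLockMem`, `lockMem`, `demo`) are hypothetical, and the sketch assumes a single consumer; it is an analogy for the control flow, not how PostgreSQL delivers notifications.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, takeMVar, tryPutMVar)
import Data.IORef (IORef, atomicModifyIORef', newIORef)

-- Hypothetical in-memory stand-in for the table plus the "enqueue" channel.
data MemQueue a = MemQueue
  { mqItems  :: IORef [a]  -- pending items, oldest first
  , mqSignal :: MVar ()    -- full when a notification is pending
  }

newMemQueue :: IO (MemQueue a)
newMemQueue = MemQueue <$> newIORef [] <*> newEmptyMVar

-- Append an item, then raise the signal (a no-op if one is already pending).
enqueueMem :: MemQueue a -> a -> IO ()
enqueueMem q x = do
  atomicModifyIORef' (mqItems q) $ \xs -> (xs ++ [x], ())
  _ <- tryPutMVar (mqSignal q) ()
  pure ()

-- Non-blocking: take the oldest item if there is one.
tryLockMem :: MemQueue a -> IO (Maybe a)
tryLockMem q = atomicModifyIORef' (mqItems q) $ \xs ->
  case xs of
    []       -> ([], Nothing)
    (y : ys) -> (ys, Just y)

-- Blocking: try first, otherwise sleep until signalled and retry.
-- A stale signal only causes one extra retry, never a missed item.
lockMem :: MemQueue a -> IO a
lockMem q = do
  m <- tryLockMem q
  case m of
    Just x  -> pure x
    Nothing -> takeMVar (mqSignal q) >> lockMem q

-- One producer thread, one consumer (the calling thread).
demo :: IO [Int]
demo = do
  q <- newMemQueue
  _ <- forkIO (mapM_ (enqueueMem q) [1, 2, 3])
  sequence [lockMem q, lockMem q, lockMem q]
```

As in the real lock, the consumer always retries the non-blocking operation after waking up, so a wake-up with nothing to take is harmless.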
We extend enqueueDB to also NOTIFY our enqueue channel after it enqueues a payload:

    enqueueDB :: Value -> DB PayloadId
    enqueueDB value = do
      pid <- liftIO randomIO
      execute
        [sql| INSERT INTO payloads (id, value)
              VALUES (?, ?);
              NOTIFY enqueue;
        |]
        (pid, value)
      return $ PayloadId pid

We can write a similar test for lock, and our loop becomes:

    loopThreads <- replicateM 10 $ async $ fix $ \next -> do
      -- blocks waiting for a new payload
      x <- withPool testDB lock
      lastCount <- atomicModifyIORef ref $ \xs ->
        (pValue x : xs, length xs + 1)
      withPool testDB $ \conn -> dequeue conn (pId x)
      when (lastCount < elemCount) next

Much nicer (ignoring the super small margins for mobile formatting :p). The final queue is packaged up as postgresql-simple-queue on Hackage and GitHub.
We went through a bunch of intermediate states before I pushed it to Hackage. I started with something simple, wrote stress tests, which thankfully found bugs, and added pubsub. Most incredibly, it was fun and not frustrating.
Don’t take my word for it: try it. [pg-transact](https://hackage.haskell.org/package/pg-transact) and [hspec-pg-transact](https://hackage.haskell.org/package/hspec-pg-transact) are on Hackage/Stackage.
We can now write a better email process queue, discussed in my other post. First, we need a processing loop:

    forever $ do
      payload <- withResource connectionPool lock
      sendEmail payload
      withResource connectionPool $ \conn ->
        dequeue conn (pId payload)

postgresql-simple-queue’s [defaultMain](https://github.com/jfischoff/postgresql-queue/blob/master/src/Database/PostgreSQL/Simple/Queue/Main.hs#L133) provides this consumer loop for us and also does command-line argument processing. We can make a queue consumer executable easily, like so:

    main :: IO ()
    main = do
      env <- newEnv Discover
      runResourceT $ runAWS env $
        defaultMain "aws-email-queue-consumer" $ \payload _ -> do
          case fromJSON $ pValue payload of
            Success email -> do
              resp <- AWS.send $ makeEmail email
              logFailedRequest resp
            Error x -> throwIO $ userError $
              "Failed to decode payload as an Email: " ++ show x

See [EmailQueue](https://github.com/jfischoff/postgresql-queue/blob/master/examples/EmailQueue.hs) for the full example.