How To Build A Time-Traveling Contacts App with SirixDB

Written by sirixdb | Published 2020/12/28
Tech Story Tags: python-tutorials | fastapi | database | sirixdb | temporal | hackernoon-top-story | temporal-database | sirixdb-and-pysirix


SirixDB is a temporal database, built from the ground up with temporality in mind. Temporal means time-related: using SirixDB, prior revisions of updated and deleted data are efficiently retained.
The use cases for SirixDB divide into two main categories:
  1. Where you need to be able to view older versions of a given record. In these use cases, SirixDB obviates the need for a history table to reconstruct the state of the database at a given time. Instead, you can simply specify a timestamp for your queries to use.
  2. Where you need to query multiple versions of your data at once. In these cases, we cannot simply use a history table to rebuild the state at a given time, because we need to compare the state of the database at many timestamps. With SirixDB, however, it is easy to write such time-travel queries.
There are various solutions for these purposes, such as history tables and event sourcing. But using SirixDB is generally a simple and effective solution.
Disclaimer: I am a member of the SirixDB team.

Some Caveats

At this time, SirixDB does not support relational data. Instead, databases and resources (the equivalent of a table in SQL) may contain either XML or JSON, using XQuery, with the JSONiq extension, as the query language (modified to use => instead of the . operator).
It should be noted that SirixDB is ACID compliant.
SirixDB is currently in alpha.

A Demo of SirixDB and PySirix

We will build a simple contacts app to demonstrate the temporal capabilities of SirixDB. A simple contacts app would support the following operations:
  • add a contact
  • view a contact
  • update a contact
  • remove a contact
  • search for a contact
However, once we have the time-traveling capabilities of SirixDB, we can easily support operations such as:
  • view previous versions of contact information (view a contact as it was at a specific date and time / view all versions ever stored for a given contact)
  • view a deleted contact
  • search deleted contacts
  • search all contacts, including deleted contacts
It is completely possible to support these operations in another database system, but it is generally simpler and more efficient to use a temporal database system like SirixDB.

Before We Begin

We will only build an API server with endpoints for the above-mentioned functionality. We will not build a frontend.
We’ll write the API in Python using the FastAPI framework, with the pysirix library for connecting to SirixDB.
A SirixDB server stores databases, wherein a database contains resources of a single type (either XML or JSON). A resource can be thought of as a large document file (though of course, internally SirixDB uses a binary encoding: a huge, durable, persistent tree). Thus, a resource represents a large XML or JSON file. We can query JSON data with XQuery and the JSONiq extension to XQuery.
However, instead of writing custom queries, we will (for the most part) use the JsonStore abstraction provided by the pysirix package. A JsonStore creates a resource with an empty array as its root, and inserts (and retrieves) arbitrary JSON objects to (and from) this array. In this respect, it is similar to how MongoDB is used.
Let’s begin.

Environment and Dependencies

We first need to set up SirixDB itself. Currently, the practical way to do this is using Docker Compose. Place these files (except for wait.sh, which is not needed) in a directory together, and simply run
docker-compose up -d
This will create both SirixDB and Keycloak (used for authentication) containers. The create-sirix-users.sh file is configured to create a user named admin with the password admin.
Then, to install FastAPI, Uvicorn (for running the FastAPI server) and PySirix, run
pip install fastapi uvicorn pysirix
pysirix can be used with both regular and async code. We will use it both ways - sync mode in the initialization script, and async mode in the FastAPI app.

Code

The completed tutorial can be found on GitHub.

Initialization Script

Before we begin, we will create a script that will initialize the database and resource on the SirixDB server (running in Docker) for our project.
import pysirix
import httpx


def init(database_name: str, resource_name: str):
    client = httpx.Client(base_url="http://localhost:9443")
    sirix = pysirix.sirix_sync("admin", "admin", client)
    store = sirix.database(database_name, pysirix.DBType.JSON).json_store(resource_name)
    if not store.exists():
        # database will be created implicitly if it does not exist when the resource is created
        store.create()
        print(f"created resource {resource_name} in database {database_name}")
    else:
        print(f"resource {resource_name} in database {database_name} already exists")
    client.close()


if __name__ == "__main__":
    init("contacts", "contacts")
This code first creates an httpx client to connect to http://localhost:9443, which is the URL of our local SirixDB server. Then the client is passed to the sirix_sync function, which is a helper that both creates a Sirix instance and authenticates with Keycloak.
On the third line of the init() function, we create a JsonStoreSync instance, and on the fourth line we check whether the store exists. If it exists, we do not recreate it, as this would overwrite the existing resource. If it does not exist, however, we create it. We then close the httpx client, and the script exits.
I have named this script initialize.py and placed it in a scripts directory, so I can run it as follows:
python scripts/initialize.py

App Code

We first need to insert new contacts into the store. In order to do so, our code would look something like this:
from pysirix import sirix_async, DBType
import httpx


httpx_client = httpx.AsyncClient(base_url="http://localhost:9443")
sirix = await sirix_async("admin", "admin", httpx_client)
store = sirix.database("contacts", DBType.JSON).json_store("contacts")
await store.insert_one(contact.dict())
However, since we will be needing the
JsonStoreAsync
class quite a lot, we will use FastAPI's dependency injection system to pass a store instance to our endpoint functions:
from fastapi import FastAPI
import httpx
from pysirix import sirix_async, DBType, JsonStoreAsync


app = FastAPI()


async def get_json_store() -> JsonStoreAsync:
    httpx_client = httpx.AsyncClient(base_url="http://localhost:9443")
    sirix = await sirix_async("admin", "admin", httpx_client)
    store = sirix.database("contacts", DBType.JSON).json_store("contacts")
    try:
        yield store
    finally:
        sirix.dispose()
        await httpx_client.aclose()
Here, we recreate the Sirix instance and the httpx AsyncClient instance every time this function is called (which will be every time an endpoint function is called). This also has the overhead of authenticating with Keycloak every time sirix_async is called. An alternative solution, which reuses the Python objects and the connection pool and removes the overhead of re-authentication, would look like this:
from pysirix import Sirix


app = FastAPI()

httpx_client = httpx.AsyncClient(base_url="http://localhost:9443")

sirix = Sirix("admin", "admin", httpx_client)


@app.on_event("startup")
async def init_sirix():
    await sirix.authenticate()
Here, I have demonstrated how to initialize the Sirix class directly, instead of using the sirix_sync or sirix_async helper functions. Authentication happens when the app starts. We cannot authenticate at module level, because the authenticate method (like all other methods) is asynchronous when using httpx.AsyncClient, so it has to be awaited inside a coroutine such as the startup handler.
However, even with this method, instead of simply using the sirix global in our endpoint handlers, we will use FastAPI's dependency injection system to inject a JsonStoreAsync instance into them. For this, we simply need a function that will initialize and return a JsonStoreAsync instance.
def get_json_store() -> JsonStoreAsync:
    # FastAPI calls this with no arguments, so we rely on the module-level sirix instance
    return sirix.database("contacts", DBType.JSON).json_store("contacts")

Create a Contact

Now, we can define our first endpoint, /contact/new:
from fastapi import Depends, status

from . import schemas


@app.post("/contact/new", status_code=status.HTTP_204_NO_CONTENT)
async def new_contact(
    contact: schemas.Contact, json_store: JsonStoreAsync = Depends(get_json_store)
):
    """
    Create a new contact
    """
    await json_store.insert_one(contact.dict())
The only thing we still need for this endpoint is to define what our Contact object should look like. So let's add the following to schemas.py:
from pydantic import BaseModel, root_validator
from typing import Optional


class Contact(BaseModel):
    name: Optional[str]
    phone: Optional[str]
    email: Optional[str]
    address: Optional[str]

    @root_validator
    def check_not_empty(cls, fields):
        """
        At least 1 field must be truthy.
        """
        assert any(
            fields.values()
        ), "At least 1 field must not be None/null, and not empty"
        return fields
All fields are optional (so they can be None, or null in JSON), so as to allow creating a contact without all the fields containing a value. Of course, this means that a contact could be created without any fields present. We rectify this by adding a validator that asserts that at least one field contains a value.
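To make the validator's rule concrete, here is a minimal plain-Python sketch of the same check, without pydantic. The helper name has_any_value is hypothetical; it only mirrors the any(fields.values()) test used in the validator.

```python
from typing import Dict, Optional


def has_any_value(fields: Dict[str, Optional[str]]) -> bool:
    """Mirror the validator's test: at least one field must be truthy."""
    return any(fields.values())


# All fields missing or empty: rejected by the rule.
empty = {"name": None, "phone": "", "email": None, "address": None}
# A single non-empty field is enough to pass.
named = {"name": "Ada", "phone": None, "email": None, "address": None}

print(has_any_value(empty))
print(has_any_value(named))
```

Note that an empty string counts as "no value" here, because the check relies on truthiness rather than on a comparison with None.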
Our /contact/new endpoint should now work as expected. If we run our app with uvicorn, and navigate to http://localhost:8000/docs, we can test it out, and store a new contact.

List Contacts

Our second endpoint will simply return all contacts. By default, we will return all current contacts, but if a database revision number is supplied, we will return the contacts as they were at that revision. Alternatively, if a UTC timestamp is supplied, we will return the contacts that existed at that time.
@app.get("/contact/list", response_model=list[schemas.ContactWithMeta])
async def list_contacts(
    revision_id: Optional[int] = None,
    revision_timestamp: Optional[str] = None,
    json_store: JsonStoreAsync = Depends(get_json_store),
):
    """
    List all contacts. Optionally, a revision may be specified.
    """
    results = await json_store.find_all(
        {}, revision=parse_revision(revision_id, revision_timestamp), hash=True
    )
    return [
        schemas.ContactWithMeta(**result, key=result["nodeKey"]) for result in results
    ]
We will use the find_all method to return all contacts matching the query_dict. We have supplied an empty dictionary, so all records will match. If we supplied {"name": "SirixDB"}, then only those records whose name field is exactly SirixDB would be returned.
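The matching rule described above can be pictured with a small plain-Python sketch. This is only an illustration of exact-match semantics, under the assumption that every key in the query must equal the record's value; it is not how pysirix or SirixDB implement find_all, and the matches helper is hypothetical.

```python
from typing import Any, Dict, List


def matches(record: Dict[str, Any], query_dict: Dict[str, Any]) -> bool:
    """Return True when every key/value pair in query_dict equals the record's."""
    return all(
        key in record and record[key] == value for key, value in query_dict.items()
    )


records: List[Dict[str, Any]] = [
    {"name": "SirixDB", "email": None},
    {"name": "SirixDB team", "email": None},
]

# An empty query matches every record.
everything = [r for r in records if matches(r, {})]
# Only an exact match on the name field is kept.
exact = [r for r in records if matches(r, {"name": "SirixDB"})]
```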
By default, the find_all method also returns a nodeKey for every record returned. This is a stable and unique identifier for the root of the record it is returned with. It also optionally returns a hash for the revision of the record. We will use both shortly for updates and deletes.
In order to return the nodeKeys and hashes to the client, we will define a new ContactWithMeta class that requires key and hash fields:
class ContactWithMeta(Contact):
    key: int
    hash: str
Also, because we will often offer the option of providing either a revision number or a stringified timestamp, we will define a utility function that handles converting the provided revision parameters into a form acceptable to pysirix.
from datetime import datetime
from typing import Union


def parse_revision(
    revision_id: Union[int, None], revision_timestamp: Union[str, None]
) -> Union[int, datetime, None]:
    """
    A utility function to return either a revision ID or a revision timestamp, or ``None``,
            given two possible values (``revision_id`` and ``revision_timestamp``).

    :return: an ``int`` or ``datetime`` representing a revision 
    """
    return revision_id or (
        (
            revision_timestamp
            and datetime.strptime(revision_timestamp, "%Y-%m-%dT%H:%M:%S.%f")
        )
        or None
    )
If a number was provided as revision_id, the number will be returned. Otherwise, if a timestamp was provided, it will be returned as a datetime object. Else, None will be returned, which is also a valid value to pass to pysirix, to indicate the most recent revision.
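The three outcomes can be checked quickly in isolation. The sketch below restates the utility as explicit branches (equivalent to the chained or/and expression) and calls it with sample inputs; the sample revision number and timestamp are made up for illustration.

```python
from datetime import datetime
from typing import Optional, Union


def parse_revision(
    revision_id: Optional[int], revision_timestamp: Optional[str]
) -> Union[int, datetime, None]:
    """Same behaviour as the utility above, written as explicit branches."""
    if revision_id:
        return revision_id
    if revision_timestamp:
        return datetime.strptime(revision_timestamp, "%Y-%m-%dT%H:%M:%S.%f")
    return None


by_number = parse_revision(3, None)
by_time = parse_revision(None, "2020-12-28T10:30:00.000")
latest = parse_revision(None, None)
# A revision number wins when both are supplied.
both = parse_revision(3, "2020-12-28T10:30:00.000")
```

Two details follow from the code itself: the timestamp must include fractional seconds, since the format string ends in .%f, and a revision_id of 0 is falsy in Python, so it is treated as if no revision number had been given.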

Delete a Contact

The JsonStoreAsync class does not currently provide a method to delete a record by nodeKey, only by query_dict. A simple way to delete a record by nodeKey is to use the Resource class:
from fastapi import Response
from pysirix import SirixServerError


@app.delete("/contact/{contact_key}")
async def delete_contact(
    contact_key: int, hash: str, resource: Resource = Depends(get_json_resource)
):
    """
    Delete the contact with the given key.
    If the record has changed since the hash was obtained, a 409 error is returned.
    """
    try:
        await resource.delete(contact_key, hash)
    except SirixServerError:
        return Response(status_code=status.HTTP_409_CONFLICT)
    return Response(status_code=status.HTTP_204_NO_CONTENT)
We need to handle a possible SirixServerError, due to the way the Resource class deletes the record.
Ideally, you would wish to be certain that the record has not changed between the last time you retrieved it, and the time of deletion. One way of doing so is to retrieve a hash associated with the record upon retrieval, and to provide the hash when deleting. If the hashes do not match, then the server returns an error response, and a SirixServerError is raised.
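The hash check amounts to optimistic concurrency control. The toy in-memory store below sketches the idea only; ToyStore, StaleHashError, and the use of SHA-256 are assumptions made for illustration and say nothing about how SirixDB computes or compares hashes.

```python
import hashlib
import json
from typing import Any, Dict


class StaleHashError(Exception):
    """Raised when the caller's hash no longer matches the stored record."""


class ToyStore:
    def __init__(self) -> None:
        self._records: Dict[int, Dict[str, Any]] = {}

    @staticmethod
    def _hash(record: Dict[str, Any]) -> str:
        # Hash a canonical JSON encoding so equal records give equal hashes.
        return hashlib.sha256(json.dumps(record, sort_keys=True).encode()).hexdigest()

    def put(self, key: int, record: Dict[str, Any]) -> str:
        self._records[key] = record
        return self._hash(record)

    def delete(self, key: int, expected_hash: str) -> None:
        # Refuse to delete if the record changed since the hash was read.
        if self._hash(self._records[key]) != expected_hash:
            raise StaleHashError(key)
        del self._records[key]


store = ToyStore()
old_hash = store.put(1, {"name": "Ada"})
new_hash = store.put(1, {"name": "Ada Lovelace"})  # a concurrent update

try:
    store.delete(1, old_hash)
    outcome = "deleted"
except StaleHashError:
    outcome = "conflict"  # the delete endpoint maps this case to a 409
```

A client that receives the conflict would typically re-read the record, obtain the fresh hash, and decide whether the delete is still wanted.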
To provide the Resource to our function, we can create a simple dependency injection function (here I will use the alternate method shown above):
from pysirix import Resource


def get_json_resource() -> Resource:
    return sirix.database("contacts", DBType.JSON).resource("contacts")

Update a Contact

@app.put("/contact/{contact_key}", status_code=status.HTTP_204_NO_CONTENT)
async def update_contact(
    contact_key: int,
    contact: schemas.Contact,
    json_store: JsonStoreAsync = Depends(get_json_store),
):
    """
    Update a contact. Fields in the new contact object will
            overwrite fields in the old version of the contact.
    """
    await json_store.update_by_key(contact_key, contact.dict())
The update_by_key method takes a key and a dictionary. Any fields present in the dictionary override the corresponding fields currently present in the record.
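The override rule can be pictured as a dictionary merge. The sketch below is a plain-Python model under that assumption; apply_update is a hypothetical helper and does not reflect how SirixDB applies the update internally.

```python
from typing import Any, Dict


def apply_update(record: Dict[str, Any], update: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of record with every key in update overriding the old value."""
    return {**record, **update}


stored = {"name": "Ada", "phone": "555-0100", "email": None, "address": None}

# Only the keys present in the update change.
partial = apply_update(stored, {"phone": "555-0199"})

# A dictionary that carries None for untouched fields overrides them too.
full = apply_update(
    stored, {"name": None, "phone": "555-0199", "email": None, "address": None}
)
```

This matters because contact.dict() includes every field of the model, with None for fields the client left out. If partial updates are wanted, one option might be to pass contact.dict(exclude_unset=True) instead, so that only the supplied fields are sent.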

View Old Versions of a Contact

@app.get("/contact/{contact_key}", response_model=schemas.Contact)
async def view_contact(
    contact_key: int,
    revision_id: Optional[int] = None,
    revision_timestamp: Optional[str] = None,
    json_store: JsonStoreAsync = Depends(get_json_store),
):
    """
    Return a contact, given its key. Can return the contact as it was in different points in time.
    By default, the current version is returned.
    """
    result = await json_store.find_by_key(
        contact_key, parse_revision(revision_id, revision_timestamp)
    )
    return schemas.Contact(**result)
This function is similar to the /contact/list endpoint, except that it returns only a single contact, as it was at a given time.
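From the client's side, choosing a point in time is just a matter of query parameters. The sketch below builds such URLs with the standard library; the base URL assumes the app is served by uvicorn on localhost:8000 as earlier, and contact_url is a hypothetical helper.

```python
from typing import Optional
from urllib.parse import urlencode

BASE_URL = "http://localhost:8000"  # assumption: default uvicorn host and port


def contact_url(
    contact_key: int,
    revision_id: Optional[int] = None,
    revision_timestamp: Optional[str] = None,
) -> str:
    """Build the URL for GET /contact/{contact_key}, optionally pinned to a revision."""
    params = {}
    if revision_id is not None:
        params["revision_id"] = revision_id
    if revision_timestamp is not None:
        params["revision_timestamp"] = revision_timestamp
    query = f"?{urlencode(params)}" if params else ""
    return f"{BASE_URL}/contact/{contact_key}{query}"


current = contact_url(5)
at_revision = contact_url(5, revision_id=2)
at_time = contact_url(5, revision_timestamp="2020-12-28T10:30:00.000")
```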

View All Revisions of a Contact

@app.get(
    "/contact/{contact_key}/history",
    response_model=Union[list[schemas.HistoricalContact], list[schemas.Revision]],
)
async def view_contact_history(
    contact_key: int,
    revision_id: Optional[int] = None,
    revision_timestamp: Optional[str] = None,
    embed: bool = False,
    json_store: JsonStoreAsync = Depends(get_json_store),
):
    """
    Return the history of a contact, given its key.
    If `embed` is `False`, then only the metadata of each revision will be returned.
    Else, the contact (as it was at that revision) will be returned as well.

    If the contact does not currently exist, a `revision_id` or `revision_timestamp`
    of when the contact _did_ exist can be supplied.
    """
    if embed:
        results = await json_store.history_embed(
            contact_key, parse_revision(revision_id, revision_timestamp)
        )
        return [schemas.HistoricalContact(**result) for result in results]
    else:
        return await json_store.history(
            contact_key, revision=parse_revision(revision_id, revision_timestamp)
        )
We also need to define the schemas for this method:
class Revision(BaseModel):
    """
    This schema is of the form of pysirix.types.SubtreeRevision
    """

    revisionTimestamp: str
    revisionNumber: int


class HistoricalContact(Revision):
    """
    This schema is of the form of pysirix.types.QueryResult
    """
    revision: Contact

Search (Within a Revision)

We will now implement search. We will first implement searching within a particular revision (or the current revision), and later will implement another endpoint for all-time search.
We will first declare the QueryTerm class that will contain a search term.
class QueryTerm(BaseModel):
    # the term to match against
    term: str
    # whether to match when the field string contains `term`, instead of looking for an exact match
    fuzzy: bool = False
    # which field in the record to match against
    field: str
And now our endpoint handler:
from fastapi import HTTPException


@app.post("/contact/search", response_model=list[schemas.ContactWithMeta])
async def search_contacts(
    query_terms: list[schemas.QueryTerm],
    revision_id: Optional[int] = None,
    revision_timestamp: Optional[str] = None,
    resource: Resource = Depends(get_json_resource),
):
    """
    Search for a contact. If an empty list is provided instead of a list of
            search terms, a 400 error is returned.
    Provide a `revision_id` or `revision_timestamp` to search a particular revision,
    instead of the latest.
    """
    if len(query_terms) == 0:
        raise HTTPException(
            status.HTTP_400_BAD_REQUEST,
            "when not using search terms, use the /contact/list endpoint instead",
        )
    if revision_id:
        open_resource = f"jn:doc('contacts','contacts', {revision_id})"
    elif revision_timestamp:
        open_resource = (
            f"jn:open('contacts','contacts', xs:dateTime('{revision_timestamp}'))"
        )
    else:
        open_resource = "."
    query_list = []
    for query_term in query_terms:
        if query_term.fuzzy:
            query_list.append(
                f"(typeswitch($i=>{query_term.field}) "
                f"case xs:string return contains(xs:string($i=>{query_term.field}), '{query_term.term}')"
                " default return false())"
            )
        else:
            query_list.append(f"$i=>{query_term.field} eq '{query_term.term}'")
    query_filter = " and ".join(query_list)
    query = (
        f"for $i in bit:array-values({open_resource}) where {query_filter}"
        " return {$i, 'nodeKey': sdb:nodekey($i), 'hash': sdb:hash($i)}"
    )
    results = await resource.query(query)
    return [
        schemas.ContactWithMeta(**result, key=result["nodeKey"])
        for result in results["rest"]
    ]
First, we will create the open_resource variable, containing the XQuery code to open the resource. If we wish to query the latest revision, we can simply use . to indicate the resource, since we are using the Resource class. If we wish to open a particular revision, however, we must use jn:doc or jn:open. We also need to convert the timestamp into an xs:dateTime.
We then create the query_list, where each item in the list is an expression returning either true or false. If the search is not fuzzy, we simply compare the search term with the field value. If we wish to do a fuzzy search, we use the contains function to check whether the search term is contained in the field.
However, there is a potential bug in using the contains function. Since a field must be converted to xs:string before using contains, a null field would be converted to the string "null", and matched against. Also, if for some reason the field of that record is not a string (as SirixDB does not enforce type schemas on the resources), a failed conversion may even throw an error.
To work around this issue, we use a typeswitch expression to first check that the field is a string; otherwise, the typeswitch returns false.
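To see what the filter looks like once assembled, the term-building loop can be pulled out into a standalone function and run without a server. This is a sketch: build_filter is a hypothetical helper, the QueryTerm dataclass is a plain stand-in for schemas.QueryTerm, and the sample terms are made up.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class QueryTerm:
    """Plain stand-in for schemas.QueryTerm, so the sketch has no dependencies."""

    term: str
    field: str
    fuzzy: bool = False


def build_filter(query_terms: List[QueryTerm]) -> str:
    """Join one boolean XQuery expression per term with 'and'."""
    parts = []
    for qt in query_terms:
        if qt.fuzzy:
            # Guard contains() with a typeswitch so non-string fields never match.
            parts.append(
                f"(typeswitch($i=>{qt.field}) "
                f"case xs:string return contains(xs:string($i=>{qt.field}), '{qt.term}')"
                " default return false())"
            )
        else:
            parts.append(f"$i=>{qt.field} eq '{qt.term}'")
    return " and ".join(parts)


exact_only = build_filter([QueryTerm(term="Ada", field="name")])
mixed = build_filter(
    [
        QueryTerm(term="Ada", field="name"),
        QueryTerm(term="example.com", field="email", fuzzy=True),
    ]
)
print(mixed)
```

Because the terms are interpolated directly into the query text, a term containing a single quote would produce an invalid expression; input that comes from users would need escaping or validation before being used this way.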
Finally, we declare the query itself:
  • for $i in bit:array-values({open_resource}) opens the resource and iterates through the array at the base of the resource.
  • where {query_filter} then filters out any record ($i) that does not match our query terms.
  • return {$i, 'nodeKey': sdb:nodekey($i), 'hash': sdb:hash($i)} returns an object, for every record that passed the filter, containing the record, the key, and the hash.
As we will soon see, if we simply returned $i, we would get an object compatible with the schemas.HistoricalContact class declared above. However, when returning {$i}, only the record itself is returned.

Search (all-time)

We will now implement a similar query to search all contacts stored, including deleted contacts.
@app.post("/contact/search/all-time", response_model=list[schemas.HistoricalContact])
async def search_contacts_all_time(
    query_terms: list[schemas.QueryTerm],
    existing: bool = True,
    resource: Resource = Depends(get_json_resource),
):
    """
    Search for a contact, even if it does not currently exist.
    All contacts are returned if an empty list is provided.

    If `existing` is `True`, then currently existing contacts will be returned as well.
    """
    if len(query_terms) == 0:
        query_filter = ""
    else:
        query_list = []
        for query_term in query_terms:
            if query_term.fuzzy:
                query_list.append(
                    f"(typeswitch($i=>{query_term.field}) "
                    f"case xs:string return contains(xs:string($i=>{query_term.field}), '{query_term.term}')"
                    " default return false())"
                )
            else:
                query_list.append(f"$i=>{query_term.field} eq '{query_term.term}'")
        query_filter = " and ".join(query_list)
        query_filter = f"where {query_filter}"
    query_deleted_only = "where sdb:is-deleted($i)" if not existing else ""
    deduplicate = (
        "if (not(exists(jn:future($i)))) then $i "
        "else if (sdb:hash($i) ne sdb:hash(jn:future($i))) then $i "
        "else ()"
    )
    query = (
        "for $rev in jn:all-times(.) for $i in bit:array-values($rev) "
        f"{query_deleted_only} {query_filter} return {deduplicate}"
    )
    results = await resource.query(query)
    return results["rest"]
In this route, we handle requests without any query terms by not filtering the results (instead of raising an error and directing the client to a different route, as in the non-temporal search route). In that case, query_filter will be an empty string. If query terms are provided, however, we build a filter expression similar to the one used above in the non-temporal search.
We then create query_deleted_only, which will be an empty string if we are to query existing contacts as well, or an additional where clause if we are not to query existing contacts.
We then define a deduplicate expression, where we check if the hash of the record is the same as the hash of that record in the next revision. If it is the same, then we return the empty sequence () to indicate we are returning nothing. If, on the other hand, the hashes are not equal (indicating that a change has occurred), or the record does not exist at all in the next revision (meaning it was deleted), then we return the record.
We then define the query itself, iterating through all revisions of the resource (for $rev in jn:all-times(.)) and applying our search filters and deduplicate filters.
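The deduplication rule is easier to follow on a small example. The sketch below models one record's history as a list of (revision, hash) pairs for the consecutive revisions in which it exists; the changed_versions helper and the sample hashes are invented for illustration and are not part of pysirix.

```python
from typing import List, Optional, Tuple

# Each entry is (revision number, hash of the record in that revision).
Version = Tuple[int, str]


def changed_versions(versions: List[Version]) -> List[int]:
    """Keep a revision only if the record differs or is gone in the next one."""
    kept = []
    for index, (revision, record_hash) in enumerate(versions):
        following: Optional[Version] = (
            versions[index + 1] if index + 1 < len(versions) else None
        )
        if following is None:
            # No next version of the record exists, so this state is kept.
            kept.append(revision)
        elif following[1] != record_hash:
            # The record changed in the next revision, so this state is distinct.
            kept.append(revision)
    return kept


history = [(1, "a1"), (2, "a1"), (3, "b7"), (4, "b7")]
distinct = changed_versions(history)
```

Here revisions 1 and 3 are dropped because the record is unchanged in the revision that follows, leaving one entry per distinct state of the record.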

Join us!

Do you plan on using SirixDB? Do you have a feature request?

Written by sirixdb | A versioned, hybrid on-disk/in-memory database management system. Query your data from yesterday.
Published by HackerNoon on 2020/12/28