SirixDB is a temporal database. It was built from the ground up with temporality in mind. Temporal means time related. Using SirixDB, prior revisions of updated and deleted data are efficiently retained.

The use cases for SirixDB divide into two main categories:

1. Where you need to be able to view older versions of a given record. In these use cases, SirixDB obviates the need for a history table to reconstruct the state of the database at a given time. Instead, you can simply specify a timestamp for your queries to use.
2. Where you need to query multiple versions of your data at once. In these cases, we cannot simply use a history table to rebuild the state at a given time, because we need to compare the state of the database at many timestamps. With SirixDB, however, it is easy to write such time-travel queries.

There are various solutions for these purposes, such as history tables and event sourcing. But using SirixDB is generally a simple and effective solution.

Disclaimer: I am a member of the SirixDB team.

Some Caveats

At this time, SirixDB does not support relational data. Instead, databases and resources (the equivalent of a table in SQL) may contain either XML or JSON, using XQuery (with the JSONiq extension) as the query language (modified to use `=>` instead of the `.` operator).

It should be noted that SirixDB is ACID compliant.

SirixDB is currently in alpha.

A Demo of SirixDB and PySirix

We will build a simple contacts app to demonstrate the temporal capabilities of SirixDB.
A simple contacts app would support the following operations:

- add a contact
- view a contact
- update a contact
- remove a contact
- search for a contact

However, once we have the time-traveling capabilities of SirixDB, we can easily support operations such as:

- view previous versions of contact information (view a contact as it was at a specific date and time / view all versions ever stored for a given contact)
- view a deleted contact
- search deleted contacts
- search all contacts, including deleted contacts

It is completely possible to support these operations in another database system, but it is generally simpler and more efficient to use a temporal database system like SirixDB.

Before We Begin

We will only build an API server with endpoints for the above-mentioned functionality. We will not build a frontend. We'll write the API in Python using the FastAPI framework, with the pysirix library for connecting to SirixDB.

A SirixDB server stores databases, wherein a database contains resources of a single type (either XML or JSON). A resource can be thought of as a large document file (though of course, the internal representation of SirixDB uses a binary encoding: a huge durable, persistent tree). Thus, a JSON resource represents a large JSON file.

We can query JSON data with XQuery and the JSONiq extension to XQuery. However, instead of writing custom queries, we will (for the most part) use the `JsonStore` abstraction provided by the pysirix package. A `JsonStore` creates a resource with an empty array as its root, and inserts (and retrieves) arbitrary JSON objects to (and from) this array. In this abstraction, it is used similarly to how MongoDB is used.

Let's begin.

Environment and Dependencies

We first need to set up SirixDB itself. Currently, the practical way to do this is using Docker Compose.
Place these files (except for wait.sh, which is not needed) in a directory together, and simply run

    docker-compose up -d

This will create both SirixDB and Keycloak (used for authentication) containers. The create-sirix-users.sh file is configured to create a user named admin with the password admin.

Then, to install FastAPI, Uvicorn (for running the FastAPI server) and PySirix, run

    pip install fastapi uvicorn pysirix

pysirix can be used with both regular and async code. We will use it both ways: sync mode in the initialization script, and async mode in the FastAPI app.

Code

The completed tutorial can be found on github.

Initialization Script

Before we begin, we will create a script that will initialize the database and resource on the SirixDB server (running in docker) for our project.

    import pysirix
    import httpx


    def init(database_name: str, resource_name: str):
        client = httpx.Client(base_url="http://localhost:9443")
        sirix = pysirix.sirix_sync("admin", "admin", client)
        store = sirix.database(database_name, pysirix.DBType.JSON).json_store(resource_name)
        if not store.exists():
            # database will be created implicitly if it does not exist when the resource is created
            store.create()
            print(f"created resource {resource_name} in database {database_name}")
        else:
            print(f"resource {resource_name} in database {database_name} already exists")
        client.close()


    if __name__ == "__main__":
        init("contacts", "contacts")

This code first creates an httpx client to connect to http://localhost:9443, which is the URL of our local SirixDB server. Then the client is passed to the `sirix_sync` function, which is a helper to both create a `Sirix` class and authenticate to Keycloak.

On the third line of the `init()` function, we create a `JsonStoreSync` instance, and then we check if the resource exists on line 4. If it exists, we do not recreate it, as this would override the existing resource. If it does not exist, however, we create it. We then close the httpx client, and the script exits.
I have named this script initialize.py and placed it in a scripts directory, so I can run it as follows:

    python scripts/initialize.py

App Code

We first need to insert new contacts into the store. In order to do so, our code would look something like this:

    from pysirix import sirix_async, DBType
    import httpx

    httpx_client = httpx.AsyncClient(base_url="http://localhost:9443")
    sirix = await sirix_async("admin", "admin", httpx_client)
    store = sirix.database("contacts", DBType.JSON).json_store("contacts")
    await store.insert_one(contact.dict())

However, since we will be needing the `JsonStoreAsync` class quite a lot, we will use FastAPI's dependency injection system to pass a store instance to our endpoint functions:

    from fastapi import FastAPI
    import httpx
    from pysirix import sirix_async, DBType

    app = FastAPI()


    async def get_json_store() -> JsonStoreAsync:
        httpx_client = httpx.AsyncClient(base_url="http://localhost:9443")
        sirix = await sirix_async("admin", "admin", httpx_client)
        store = sirix.database("contacts", DBType.JSON).json_store("contacts")
        try:
            # hand the store to the endpoint, then clean up once the request is done
            yield store
        finally:
            sirix.dispose()
            await httpx_client.aclose()

Here, we recreate the `Sirix` instance and the `httpx` `AsyncClient` instance every time this function is called (which will be every time an endpoint function is called). This also has the overhead of authenticating to Keycloak every time `sirix_async` is called. An alternative solution, which reuses the Python classes and the connection pool, and removes the overhead of re-authentication, would look like this:

    app = FastAPI()

    httpx_client = httpx.AsyncClient(base_url="http://localhost:9443")
    sirix = Sirix("admin", "admin", httpx_client)


    @app.on_event("startup")
    async def init_sirix():
        await sirix.authenticate()

Here, I have demonstrated how to initialize the `Sirix` class directly, instead of using the `sirix_sync` or `sirix_async` helper functions. It will be authenticated when the app starts.
We cannot simply call `authenticate` at module level, because the method (like all other methods) is asynchronous when using `httpx.AsyncClient`, so it must be awaited inside a coroutine such as the startup handler.

However, even with this method, instead of simply using the `sirix` global in our endpoint handlers, we will use FastAPI's dependency injection system to inject a `JsonStoreAsync` instance into them. For this, we simply need a function that will initialize and return a `JsonStoreAsync` instance.

    def get_json_store() -> JsonStoreAsync:
        # uses the module-level `sirix` instance created above
        return sirix.database("contacts", DBType.JSON).json_store("contacts")

Create a Contact

Now, we can define our first endpoint: `/contact/new`.

    from fastapi import Depends, status

    from . import schemas


    @app.post("/contact/new", status_code=status.HTTP_204_NO_CONTENT)
    async def new_contact(
        contact: schemas.Contact, json_store: JsonStoreAsync = Depends(get_json_store)
    ):
        """
        Create a new contact
        """
        await json_store.insert_one(contact.dict())

The only thing we still need for this endpoint is to define what our `Contact` object should look like. So let's add the following to schemas.py:

    from pydantic import BaseModel, root_validator
    from typing import Optional


    class Contact(BaseModel):
        name: Optional[str]
        phone: Optional[str]
        email: Optional[str]
        address: Optional[str]

        @root_validator
        def check_not_empty(cls, fields):
            """
            At least 1 field must be truthy.
            """
            assert any(
                fields.values()
            ), "At least 1 field must not be None/null, and not empty"
            return fields

All fields are optional (so they can be `None`, or `null` in JSON), so as to allow creating a contact without all the fields containing a value. Of course, this means that a contact could be created without any fields present. We rectify this by adding a validator that asserts that at least one field contains a value.

Our `/contact/new` endpoint should now work as expected. If we run our app with uvicorn, and navigate to http://localhost:8000/docs, we can test it out, and store a new contact.

List Contacts

Our second endpoint will simply return all contacts.
By default, we will return all current contacts, but if a database revision number is supplied, we will return the contacts as they were in that revision. Alternatively, if a UTC timestamp is supplied, we will return the contacts that existed at that time.

    @app.get("/contact/list", response_model=list[schemas.ContactWithMeta])
    async def list_contacts(
        revision_id: Optional[int] = None,
        revision_timestamp: Optional[str] = None,
        json_store: JsonStoreAsync = Depends(get_json_store),
    ):
        """
        List all contacts. Optionally, a revision may be specified.
        """
        results = await json_store.find_all(
            {}, revision=parse_revision(revision_id, revision_timestamp), hash=True
        )
        return [
            schemas.ContactWithMeta(**result, key=result["nodeKey"]) for result in results
        ]

We will use the `find_all` method to return all contacts matching the `query_dict`. We have supplied an empty dictionary, so all records will match. If we supplied `{"name": "SirixDB"}`, then only those records whose name field is exactly SirixDB would be returned.

By default, the `find_all` method also returns a nodeKey for every record returned. This is a stable and unique identifier for the root of the record it is returned with. It also optionally returns a hash for the revision of the record. We will use both shortly for updates and deletes.

In order to return the nodeKeys to the client, we will define a new class, `ContactWithMeta`, that requires a key field:

    class ContactWithMeta(Contact):
        key: int
        hash: str

Also, because we will often provide the option of providing either a revision number or a stringified timestamp, we will define a utility function that handles converting the provided revision parameters into a form acceptable to pysirix.
    from datetime import datetime
    from typing import Union


    def parse_revision(
        revision_id: Union[int, None], revision_timestamp: Union[str, None]
    ) -> Union[int, datetime, None]:
        """
        A utility function to return either a revision ID or a revision timestamp,
        or ``None``, given two possible values (``revision_id`` and ``revision_timestamp``).

        :return: an ``int`` or ``datetime`` representing a revision
        """
        return (
            revision_id
            or (
                revision_timestamp
                and datetime.strptime(revision_timestamp, "%Y-%m-%dT%H:%M:%S.%f")
            )
            or None
        )

If a number was provided as `revision_id`, the number will be returned; otherwise, if a timestamp was provided, it will be returned as a datetime object. Else, `None` will be returned, which is also a valid value to pass to pysirix, to indicate the most recent revision.

Delete a Contact

The `JsonStoreAsync` class does not currently provide a method to delete a record by nodeKey, only by `query_dict`. A simple way to delete a record by nodeKey is to use the `Resource` class:

    from fastapi import Response
    from pysirix import SirixServerError


    @app.delete("/contact/{contact_key}")
    async def delete_contact(
        contact_key: int, hash: str, resource: Resource = Depends(get_json_resource)
    ):
        """
        Delete the contact with the given key.
        If the record has changed since the hash was obtained, a 409 error is returned.
        """
        try:
            await resource.delete(contact_key, hash)
        except SirixServerError:
            return Response(status_code=status.HTTP_409_CONFLICT)
        return Response(status_code=status.HTTP_204_NO_CONTENT)

We need to handle a possible `SirixServerError`, due to the way the `Resource` class deletes the record.

Ideally, you would wish to be certain that the record has not changed between the last time you retrieved it, and the time of deletion. One way of doing so is to retrieve a hash associated with the record upon retrieval, and to provide the hash when deleting. If the hashes do not match, then the server returns an error response, and a `SirixServerError` is raised.
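The hash check described here is a form of optimistic concurrency control. The following is a minimal in-memory sketch of the idea, independent of SirixDB and pysirix. `InMemoryStore`, `record_hash`, and `ConflictError` are hypothetical names used only for illustration, and the SHA-256 hash is an assumption made for the sketch; SirixDB computes its own hashes on the server.

```python
import hashlib
import json


class ConflictError(Exception):
    """Raised when the caller's hash no longer matches the stored record."""


def record_hash(record: dict) -> str:
    # Hypothetical hash: SHA-256 over the canonical JSON form of the record.
    canonical = json.dumps(record, sort_keys=True).encode("utf-8")
    return hashlib.sha256(canonical).hexdigest()


class InMemoryStore:
    """Toy stand-in for a resource, keyed by integer node keys."""

    def __init__(self):
        self._records = {}
        self._next_key = 1

    def insert(self, record: dict) -> int:
        key = self._next_key
        self._records[key] = dict(record)
        self._next_key += 1
        return key

    def get(self, key: int):
        # Return a copy of the record together with its current hash.
        record = self._records[key]
        return dict(record), record_hash(record)

    def update(self, key: int, fields: dict) -> None:
        self._records[key].update(fields)

    def delete(self, key: int, expected_hash: str) -> None:
        # Refuse to delete if the record changed after the caller read it.
        if record_hash(self._records[key]) != expected_hash:
            raise ConflictError(f"record {key} changed since it was read")
        del self._records[key]


store = InMemoryStore()
key = store.insert({"name": "Ada", "phone": "555-0100"})
_, stale_hash = store.get(key)
store.update(key, {"phone": "555-0199"})  # a concurrent edit by someone else

try:
    store.delete(key, stale_hash)
    outcome = "deleted"
except ConflictError:
    outcome = "conflict"  # the delete endpoint answers 409 in this situation

_, fresh_hash = store.get(key)
store.delete(key, fresh_hash)  # succeeds once the caller holds an up-to-date hash
```

A client that receives the conflict would typically re-read the record, show the user the new state, and retry with the fresh hash.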
To provide the `Resource` to our function, we can create a simple dependency injection function (here I will use the alternate method shown above):

    from pysirix import Resource


    def get_json_resource() -> Resource:
        return sirix.database("contacts", DBType.JSON).resource("contacts")

Update a Contact

    @app.put("/contact/{contact_key}", status_code=status.HTTP_204_NO_CONTENT)
    async def update_contact(
        contact_key: int,
        contact: schemas.Contact,
        json_store: JsonStoreAsync = Depends(get_json_store),
    ):
        """
        Update a contact. Fields in the new contact object will overwrite
        fields in the old version of the contact.
        """
        await json_store.update_by_key(contact_key, contact.dict())

The `update_by_key` method takes a key and a dictionary. Any fields present in the dictionary override the corresponding fields currently present in the record.

View Old Versions of a Contact

    @app.get("/contact/{contact_key}", response_model=schemas.Contact)
    async def view_contact(
        contact_key: int,
        revision_id: Optional[int] = None,
        revision_timestamp: Optional[str] = None,
        json_store: JsonStoreAsync = Depends(get_json_store),
    ):
        """
        Return a contact, given its key. Can return the contact as it was in different points in time.
        By default, the current version is returned.
        """
        result = await json_store.find_by_key(
            contact_key, parse_revision(revision_id, revision_timestamp)
        )
        return schemas.Contact(**result)

This function is similar to the `/contact/list` endpoint, except that it returns only a single contact, as it was at a given time.
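Both the list and the single-contact endpoints rely on `parse_revision` to decide which revision to read. The standalone sketch below repeats that helper and evaluates it for each combination of inputs; the sample values are made up. One consequence of the `or` chain is worth noting: a `revision_id` of `0` is falsy, so it is treated as if no revision ID had been given.

```python
from datetime import datetime
from typing import Optional, Union


def parse_revision(
    revision_id: Optional[int], revision_timestamp: Optional[str]
) -> Union[int, datetime, None]:
    # Same logic as the article's helper: prefer the ID, then the timestamp.
    return (
        revision_id
        or (
            revision_timestamp
            and datetime.strptime(revision_timestamp, "%Y-%m-%dT%H:%M:%S.%f")
        )
        or None
    )


by_id = parse_revision(3, None)                        # revision number wins
by_time = parse_revision(None, "2021-03-01T12:30:00.000")  # parsed to a datetime
both = parse_revision(3, "2021-03-01T12:30:00.000")    # ID takes precedence
latest = parse_revision(None, None)                    # None means "most recent"
zero = parse_revision(0, None)                         # 0 is falsy, falls through
```

Note that a timestamp without a fractional-seconds part (for example `2021-03-01T12:30:00`) does not match the format string and makes `strptime` raise a `ValueError`.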
View All Revisions of a Contact

    @app.get(
        "/contact/{contact_key}/history",
        response_model=Union[list[schemas.HistoricalContact], list[schemas.Revision]],
    )
    async def view_contact_history(
        contact_key: int,
        revision_id: Optional[int] = None,
        revision_timestamp: Optional[str] = None,
        embed: bool = False,
        json_store: JsonStoreAsync = Depends(get_json_store),
    ):
        """
        Return the history of a contact, given its key.

        If `embed` is `False`, then only the metadata of each revision will be returned.
        Else, the contact (as it was at that revision) will be returned as well.

        If the contact does not currently exist, a `revision_id` or `revision_timestamp`
        of when the contact _did_ exist can be supplied.
        """
        if embed:
            results = await json_store.history_embed(
                contact_key, parse_revision(revision_id, revision_timestamp)
            )
            return [schemas.HistoricalContact(**result) for result in results]
        else:
            return await json_store.history(
                contact_key, revision=parse_revision(revision_id, revision_timestamp)
            )

We also need to define the schemas for this method:

    class Revision(BaseModel):
        """
        This schema is of the form of pysirix.types.SubtreeRevision
        """

        revisionTimestamp: str
        revisionNumber: int


    class HistoricalContact(Revision):
        """
        This schema is of the form of pysirix.types.QueryResult
        """

        revision: Contact

Search (Within a Revision)

We will now implement search. We will first implement searching within a particular revision (or the current revision), and later will implement another endpoint for all-time search.

We will first declare the `QueryTerm` class that will contain a search term.
    class QueryTerm(BaseModel):
        # the term to match against
        term: str
        # whether to match when the field string contains `term`, instead of looking for an exact match
        fuzzy: bool = False
        # which field in the record to match against
        field: str

And now our endpoint handler:

    from fastapi import HTTPException


    @app.post("/contact/search", response_model=list[schemas.ContactWithMeta])
    async def search_contacts(
        query_terms: list[schemas.QueryTerm],
        revision_id: Optional[int] = None,
        revision_timestamp: Optional[str] = None,
        resource: Resource = Depends(get_json_resource),
    ):
        """
        Search for a contact. If an empty list is provided instead of a list of search terms,
        a 400 error is returned.

        Provide a `revision_id` or `revision_timestamp` to search a particular revision,
        instead of the latest.
        """
        if len(query_terms) == 0:
            raise HTTPException(
                status.HTTP_400_BAD_REQUEST,
                "when not using search terms, use the /contact/list endpoint instead",
            )
        if revision_id:
            open_resource = f"jn:doc('contacts','contacts', {revision_id})"
        elif revision_timestamp:
            open_resource = (
                f"jn:open('contacts','contacts', xs:dateTime('{revision_timestamp}'))"
            )
        else:
            open_resource = "."
        query_list = []
        for query_term in query_terms:
            if query_term.fuzzy:
                query_list.append(
                    f"(typeswitch($i=>{query_term.field}) "
                    f"case xs:string return contains(xs:string($i=>{query_term.field}), '{query_term.term}')"
                    " default return false())"
                )
            else:
                query_list.append(f"$i=>{query_term.field} eq '{query_term.term}'")
        query_filter = " and ".join(query_list)
        query = (
            f"for $i in bit:array-values({open_resource}) where {query_filter}"
            " return {$i, 'nodeKey': sdb:nodekey($i), 'hash': sdb:hash($i)}"
        )
        results = await resource.query(query)
        return [
            schemas.ContactWithMeta(**result, key=result["nodeKey"])
            for result in results["rest"]
        ]

First, we will create the `open_resource` variable, containing the XQuery code to open the resource. If we wish to query the latest resource, we can simply use `.`
to indicate the resource, since we are using the `Resource` class. If we wish to open a particular revision, however, we must use `jn:doc` or `jn:open`. We also need to convert the timestamp into an `xs:dateTime`.

We then create the `query_list`, where each item in the list is an expression returning either true or false. If the search is not fuzzy, we will simply do a comparison of the search term and the field value. If we wish to do a fuzzy search, we use the `contains` method to check if the search term is contained in the field.

However, there is a potential bug in using the `contains` method. Since a field must be converted to `xs:string` before using the `contains` method, if the field is null, it will be converted to the string `"null"`, and matched against. Also, if for some reason the field of that record is not a string (as SirixDB does not enforce type schemas on the resources), there may even be an error thrown by a failed conversion.

To work around this issue, we use a `typeswitch` statement to first check that the field is a string; otherwise, the `typeswitch` returns false.

Finally, we declare the query itself:

- `for $i in bit:array-values({open_resource})` opens the resource and iterates through the array at the base of the resource.
- `where {query_filter}` then filters out any record (`$i`) that does not match our query terms.
- `return {$i, 'nodeKey': sdb:nodekey($i), 'hash': sdb:hash($i)}` finally returns an object, for every record that passed the filter, containing the object, the key, and the hash.

As we will soon see, if we simply returned `$i`, we would get an object compatible with the `schemas.HistoricalContact` class declared above. However, when returning `{$i}`, only the record itself is returned.

Search (all-time)

We will now implement a similar query to search all contacts stored, including deleted contacts.
    @app.post("/contact/search/all-time", response_model=list[schemas.HistoricalContact])
    async def search_contacts_all_time(
        query_terms: list[schemas.QueryTerm],
        existing: bool = True,
        resource: Resource = Depends(get_json_resource),
    ):
        """
        Search for a contact, even if it does not currently exist.
        All contacts are returned if an empty list is provided.

        If `existing` is `True`, then currently existing contacts will be returned as well.
        """
        if len(query_terms) == 0:
            query_filter = ""
        else:
            query_list = []
            for query_term in query_terms:
                if query_term.fuzzy:
                    query_list.append(
                        f"(typeswitch($i=>{query_term.field}) "
                        f"case xs:string return contains(xs:string($i=>{query_term.field}), '{query_term.term}')"
                        " default return false())"
                    )
                else:
                    query_list.append(f"$i=>{query_term.field} eq '{query_term.term}'")
            query_filter = " and ".join(query_list)
            query_filter = f"where {query_filter}"
        query_deleted_only = "where sdb:is-deleted($i)" if not existing else ""
        deduplicate = (
            "if (not(exists(jn:future($i)))) then $i "
            "else if (sdb:hash($i) ne sdb:hash(jn:future($i))) then $i "
            "else ()"
        )
        query = (
            "for $rev in jn:all-times(.) for $i in bit:array-values($rev) "
            f"{query_deleted_only} {query_filter} return {deduplicate}"
        )
        results = await resource.query(query)
        return results["rest"]

In this route we will handle requests without any query terms by not filtering results (instead of raising an error and directing the client to a different route, as in the non-temporal search route). In that case, `query_filter` will be an empty string. If query terms are provided, however, we will build a filter expression similar to the one used above in the non-temporal search.

We then create `query_deleted_only`, which will be an empty string if we are to query existing contacts as well, or an additional where clause if we are not to query existing contacts.
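To make the two optional clauses concrete, the sketch below factors the string building out of the handler into a standalone function and evaluates it for a few sample inputs. `build_where_clauses` is a hypothetical helper, not part of pysirix, and the `QueryTerm` dataclass is a plain stand-in for the pydantic schema so that the snippet runs without any dependencies.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class QueryTerm:
    # Plain-dataclass stand-in for the pydantic QueryTerm schema.
    term: str
    field: str
    fuzzy: bool = False


def build_where_clauses(terms: List[QueryTerm], existing: bool) -> Tuple[str, str]:
    """Return (query_deleted_only, query_filter), built as in the all-time handler."""
    parts = []
    for t in terms:
        if t.fuzzy:
            # Only attempt a substring match when the field really is a string.
            parts.append(
                f"(typeswitch($i=>{t.field}) "
                f"case xs:string return contains(xs:string($i=>{t.field}), '{t.term}')"
                " default return false())"
            )
        else:
            parts.append(f"$i=>{t.field} eq '{t.term}'")
    # No terms means no filter clause at all.
    query_filter = f"where {' and '.join(parts)}" if parts else ""
    query_deleted_only = "" if existing else "where sdb:is-deleted($i)"
    return query_deleted_only, query_filter


no_terms = build_where_clauses([], existing=True)
deleted_exact = build_where_clauses([QueryTerm(term="Ada", field="name")], existing=False)
two_terms = build_where_clauses(
    [QueryTerm("Ada", "name"), QueryTerm("555", "phone", fuzzy=True)], existing=True
)
```

Because `term` and `field` are interpolated into the query text as-is, a term containing a single quote would end the string literal early, so it is worth validating both values before building the query from untrusted input.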
We then define a `deduplicate` expression, where we check if the hash of the record is the same as the hash of that record in the next revision. If it is the same, then we return the empty sequence `()` to indicate we are returning nothing. If, on the other hand, the hashes are not equal (indicating that a change has occurred), or it does not exist at all in the next revision (meaning it was deleted), then we return the record.

We then define the query itself, iterating through all revisions of the resource (`for $rev in jn:all-times(.)`), applying our search filters and deduplicate filters.

Join us!

Do you plan on using SirixDB? Do you have a feature request? Find us on slack, discourse, or github discussions.

Also published at https://dev.to/sirixdb/building-a-time-traveling-contacts-app-with-sirixdb-2amo