## Introduction

Server-sent events (SSE) is a way to push data from the server to the browser without the client having to reload the page. This lets you work with streaming data and build real-time applications for a variety of scenarios. FastAPI is a Python framework that makes it easy to build APIs. In this tutorial, we will use FastAPI to create a simple SSE server that sends a message every second.

## Prerequisites

To follow this tutorial, you will need Python and pip installed on your machine: https://www.python.org/downloads/

## Installing FastAPI

To install FastAPI and all of its dependencies, run:

```
pip install "fastapi[all]"
```

This also installs `uvicorn`, the ASGI server we will use to run the application.

## Installing sse-starlette

Once you've installed FastAPI, install the `sse-starlette` extension to add SSE support to your FastAPI project:

```
pip install sse-starlette
```

We will also use `asyncio`, which ships with the Python standard library, so it does not need a separate install.

## Creating a simple hello world endpoint

Once you've installed FastAPI, you can create a simple hello world endpoint to get started. Create a new file called `main.py` and add the following code:

```python
import asyncio

import uvicorn
from fastapi import FastAPI, Request

app = FastAPI()


@app.get("/")
async def root():
    return {"message": "Hello World"}
```

## Running the uvicorn server

To run the server, use the following command:

```
uvicorn main:app --reload
```

This runs the server on port `8000`. The `--reload` flag automatically reloads the server when you make changes to the code, so you don't have to restart it after every change.

Visit http://127.0.0.1:8000 in your browser and you should see the following output:

```json
{ "message": "Hello World" }
```

FastAPI also automatically generates a `/docs` endpoint that shows the interactive API documentation; visit `/docs` to explore it.

## Adding SSE support to your FastAPI project

Next, let's extend the `main.py` file to add SSE support.
To do so, add the following import to your `main.py` file:

```python
from sse_starlette.sse import EventSourceResponse
```

Then you can use the `EventSourceResponse` class to create a response that sends SSE events. Let's create a new endpoint that sends an event every second:

```python
STREAM_DELAY = 1  # seconds
RETRY_TIMEOUT = 15000  # milliseconds


@app.get('/stream')
async def message_stream(request: Request):
    def new_messages():
        # Placeholder: add logic here to check for new messages,
        # returning True when there is something to send
        return True

    async def event_generator():
        while True:
            # If the client closes the connection, stop sending events
            if await request.is_disconnected():
                break

            # Check for new messages and return them to the client if any
            if new_messages():
                yield {
                    "event": "new_message",
                    "id": "message_id",
                    "retry": RETRY_TIMEOUT,
                    "data": "message_content",
                }

            await asyncio.sleep(STREAM_DELAY)

    return EventSourceResponse(event_generator())
```

Now if you visit the `/stream` endpoint in your browser, you will see an event arrive every second without having to reload the page.

## FastAPI with streaming data and Materialize

To learn more about streaming data, you can check out this tutorial on how to use FastAPI with Materialize: How to use FastAPI with Materialize for real-time data processing. The tutorial also includes a demo project that you can run to get a feel for how it all works.

### What is Materialize?

Materialize is a streaming database that takes data coming from different sources like Kafka, PostgreSQL, S3 buckets, and more, lets you write views that aggregate/materialize that data, and lets you query those views using pure SQL with very low latency.

### Streaming data with Materialize

For the demo project, we are using the `TAIL` statement. `TAIL` streams updates from a source, table, or view as they occur, which allows you to query the data as it is being updated and is a perfect fit for the SSE example.
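However the events are produced, sse-starlette serializes each one into the `text/event-stream` wire format: one `field: value` line per key, with a blank line terminating the event. A minimal sketch of that serialization (an illustration of the format, not sse-starlette's actual implementation):

```python
def format_sse_event(event: dict) -> str:
    """Serialize an event dict into the text/event-stream wire format."""
    lines = [
        f"{field}: {event[field]}"
        for field in ("event", "id", "retry", "data")
        if field in event
    ]
    # A blank line marks the end of one event on the wire
    return "\n".join(lines) + "\n\n"


wire = format_sse_event({
    "event": "new_message",
    "id": "message_id",
    "retry": 15000,
    "data": "message_content",
})
print(wire)
```

This prints `event: new_message`, `id: message_id`, `retry: 15000`, and `data: message_content` on separate lines, followed by a blank line — exactly what the browser's `EventSource` API expects to parse on the client side.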
You can read more about how [`TAIL`](https://materialize.com/docs/sql/tail/#conceptual-framework) works in the Materialize documentation. Here is the code for the `/stream` endpoint that uses `TAIL` to stream data (`engine` is a SQLAlchemy engine connected to Materialize, and `MESSAGE_STREAM_DELAY` plays the same role as `STREAM_DELAY` above):

```python
@app.get('/stream')
async def message_stream(request: Request):
    def new_messages():
        # Check whether there is any data in the view
        results = engine.execute('SELECT count(*) FROM sensors_view_1s')
        if results.fetchone()[0] == 0:
            return None
        else:
            return True

    async def event_generator():
        while True:
            # If the client closed the connection, stop sending events
            if await request.is_disconnected():
                break

            # Check for new messages and return them to the client if any
            if new_messages():
                connection = engine.raw_connection()
                with connection.cursor() as cur:
                    cur.execute("DECLARE c CURSOR FOR TAIL sensors_view_1s")
                    cur.execute("FETCH ALL c")
                    for row in cur:
                        yield row

            await asyncio.sleep(MESSAGE_STREAM_DELAY)

    return EventSourceResponse(event_generator())
```

As you can see, we have extended the `new_messages` function to check whether there are any new messages in the `sensors_view_1s` view. If there are none, it returns `None` and the `EventSourceResponse` sends no events; if there are new messages, it returns `True` and the `EventSourceResponse` sends them to the client.

Then, in the async `event_generator` function, we use `TAIL` with the `FETCH ALL` statement to get all the messages in the `sensors_view_1s` view. The `DECLARE c CURSOR` statement creates a cursor that streams the data as it is being updated.

## Conclusion

To learn more about FastAPI, check out the [FastAPI documentation](https://fastapi.tiangolo.com/).

For more information on how to use FastAPI with Materialize, check out the tutorial mentioned above.

To learn more about Materialize, check out the [Materialize documentation](https://materialize.com/docs/).