Server-sent events in Flask without extra dependencies
Server-sent events (SSE) is a mechanism for sending updates from a server to a client. The fundamental difference with WebSockets is that the communication only goes in one direction. In other words, the client cannot send information to the server. For many use cases this is all you might need. Indeed, if you just want to receive notifications/updates/messages, then using a WebSocket is overkill. Once you’ve implemented the SSE functionality on your server, all you need on a JavaScript client is an EventSource. Trust me, it’s very straightforward.
I am first and foremost a data scientist, and therefore don’t consider myself to be an expert in web technologies. I came across the SSE protocol because I wanted to implement a notification system for a machine learning deployment tool I’m working on. The tool uses Flask, and so I stumbled on the flask-sse package. It looks great, but it requires using Redis. I like Redis, but I don’t like the idea of having to add a new dependency to my application just to implement a single feature. If I were the only person that was going to use the application, then I would be fine with it. However, the application I’m building is destined to be distributed as a package, and therefore I don’t want to coerce users into installing Redis.
The flask-sse package requires having Redis installed because it needs a storage backend which implements the publish-subscribe pattern – commonly abbreviated to “pubsub”. The idea of this pattern is that messages are not sent directly to listeners. Instead, a message is sent to a middleware whose responsibility is to relay the message to the listeners. The advantage is that the message emitter doesn’t have to worry about the details. In particular, it doesn’t have to check that the message gets dispatched correctly. These concerns are delegated to the middleware instead. There are many great implementations that can take charge of this for you, including Redis. However, if you’re not too concerned about performance, then you can easily do this yourself in Flask without any extra dependencies. Let me demonstrate.
To start off, I’m going to create a file named app.py which will contain all the Flask server logic.
import flask

app = flask.Flask(__name__)

@app.route('/')
def hello_world():
    return 'Hello, World!'
In order to implement the pubsub pattern, I’m going to define a MessageAnnouncer class.
import queue

class MessageAnnouncer:

    def __init__(self):
        self.listeners = []

    def listen(self):
        q = queue.Queue(maxsize=5)
        self.listeners.append(q)
        return q

    def announce(self, msg):
        for i in reversed(range(len(self.listeners))):
            try:
                self.listeners[i].put_nowait(msg)
            except queue.Full:
                del self.listeners[i]
As you can see, MessageAnnouncer has two methods. The first one, listen, will be called by clients when they want to receive a notification every time something new happens. When a client starts listening, we simply append a queue.Queue to the list of listeners. The queue module is part of Python’s standard library; it has the desirable property of being thread-safe by implementing locking mechanisms under the hood.
The second method of MessageAnnouncer is announce. Its responsibility is to take an input message and dispatch it to every listener. Additionally, it removes listeners that don’t “seem” to be listening anymore. By this I mean that if a message queue is full, then it’s probably because the queue isn’t being read from anymore. In the listen method, the maximum size of each queue is set to 5, which should give ample time to read a message before the next one arrives. If we set the maximum size to 1, then a queue might be full simply because the associated listener didn’t have enough time to read one message before the next one arrived. Increasing the size of the queue therefore gives some leeway so that rapid bursts of notifications don’t clog the message queue. Note that we loop in reverse order because deleting a listener would otherwise affect the index values of the subsequent listeners.
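If you want to convince yourself of this dropping behaviour, here is a small throwaway snippet – not meant to go into app.py – which fills up a listener’s queue and checks that the listener gets evicted:

demo = MessageAnnouncer()
q = demo.listen()

for i in range(6):  # one more message than the queue's maxsize of 5
    demo.announce(f'message number {i}')

assert q.full()
assert len(demo.listeners) == 0  # the unread queue overflowed, so it was dropped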
Now for the easy part, which is to use the MessageAnnouncer. The first step is to instantiate it.
announcer = MessageAnnouncer()
We’ll only be using one MessageAnnouncer for the purpose of this example, but in practice you can use as many as you like. For instance, you could create one instance for each user you might have. But let’s not digress. Now, in order to implement the SSE protocol, we need to send events that follow a certain format. This format is slightly obscure, but it is quite well described here. Here is an example of what each message should look like:
event: Jackson 5\ndata: {"abc": 123}\n\n
The newlines are important because they delimit the beginnings and ends of consecutive messages. Here is a little helper function to format a message so that it follows this convention:
def format_sse(data: str, event=None) -> str:
    msg = f'data: {data}\n\n'
    if event is not None:
        msg = f'event: {event}\n{msg}'
    return msg
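To get a feel for the output, here is what the helper produces – the second call shows the optional event field:

print(repr(format_sse(data='pong')))
# 'data: pong\n\n'
print(repr(format_sse(data='{"abc": 123}', event='Jackson 5')))
# 'event: Jackson 5\ndata: {"abc": 123}\n\n'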
The event parameter is optional; it allows defining topics to which clients can subscribe. This avoids having to define one message queue for each topic. We can now send messages to our message announcer, which will in turn take care of dispatching them. Let’s create a /ping route which does just that.
@app.route('/ping')
def ping():
    msg = format_sse(data='pong')
    announcer.announce(msg=msg)
    return {}, 200
Because we’re using the correct message format, these messages should be properly received by any decently written client. However, we first have to define a /listen route that allows listeners to subscribe in order to receive messages. Here goes:
@app.route('/listen', methods=['GET'])
def listen():

    def stream():
        messages = announcer.listen()  # returns a queue.Queue
        while True:
            msg = messages.get()  # blocks until a new message arrives
            yield msg

    return flask.Response(stream(), mimetype='text/event-stream')
The previous snippet is a bit esoteric because it’s not common to return a generator in a Flask response. However, this pattern is quite well documented and works as intended. Effectively, sending a GET request to the /listen route results in a response that takes an infinite amount of time. The messages.get() call blocks until a new message is put into the queue. Once a message arrives, it is sent through the HTTP connection in progress. The important thing to understand is that this response will never terminate, and thus will hang forever. Consequently, if you’re running Flask with a single thread, then it will block forever. Therefore, you need to make sure you’re using Flask in threaded mode – which is the default in recent versions. Moreover, if you’re going to use a WSGI server other than Flask’s development server, such as Gunicorn, then you need to make sure you’re using asynchronous workers. For example, in Gunicorn, this can be done by setting the worker_class parameter to something other than 'sync'.
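As a rough sketch – and assuming you have the gevent package installed, which is only one of several possible worker classes – a Gunicorn configuration file could look like this:

# gunicorn.conf.py -- a minimal sketch, assuming gevent is installed
worker_class = 'gevent'
workers = 1

You would then point Gunicorn at this file with its -c/--config option when starting the server.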
We may now run the server:
export FLASK_APP=app.py
export FLASK_ENV=development
flask run
In a separate terminal session, we can run a listen.py script which will subscribe to the /listen route. We can do this with the sseclient library, which is a thin wrapper on top of requests:
import sseclient

messages = sseclient.SSEClient('http://localhost:5000/listen')

for msg in messages:
    print(msg)
Finally, we can use a third terminal session to run another script which will call the /ping route once every second:
import time
import requests

while True:
    requests.get('http://localhost:5000/ping')
    time.sleep(1)
You should now see a steady stream of pong messages in the terminal where the listen.py script is being run. That’s it, you’re done! You can find a copy of these instructions along with the code in the accompanying GitHub repository.
There’s probably some room for improvement. For instance, it would be nice to perform the event emission with Flask signals, but I haven’t been able to make it work yet. Additionally, I’m not 100% sure how to make this work seamlessly behind a reverse proxy such as Nginx. Indeed, it seems that there are some specific settings that have to be configured because of the long-polling nature of the listening routes. Nonetheless, this implementation has been working quite well for me. Plus, I like the fact that it’s a standalone solution. However, there might be some subtlety that I have missed and that would justify using something like Redis, in which case I would love some feedback.