Server-Sent Events (SSE)
is a server-to-client communication solution where real-time updates are pushed from the server to the client unidirectionally. The basic principle involves the client opening a persistent connection to the server via an HTTP
request, allowing the server to continuously send event data through this connection. SSE
is suitable for applications that require ongoing data updates, such as real-time notifications, message push, and dynamic content updates. Compared to the data communication solution provided by WebSocket
, SSE
is lighter, easier to implement, and more suitable for simple one-way data flow scenarios.
Essentially, SSE
is a unidirectional data flow solution implemented using a persistent HTTP
connection and ReadableStream
. The client can maintain a one-way connection with the server and continuously receive real-time events pushed by the server without the need to constantly send requests to the server for data updates. Browsers have implemented the basic EventSource
object, which conveniently handles server responses. The server, in turn, can achieve streaming responses by continuously writing data to the Response
object. In practical business scenarios, both the server and client environments may not be ideal:
HTTP
long connection directly to the target address without the need to receive and then forward the response to the client.EventSource
object can only initiate GET
requests and cannot define request headers or carry request bodies. In situations requiring authentication, all content may need to be encoded in the URL
, with most browsers limiting the URL length to around 2000 characters. Therefore, utilizing fetch
to implement SSE
data requests can resolve such issues.Let's start by implementing the basic SSE
using the EventSource
object. Since the EventSource
object is a browser's API and belongs to the client-side implementation, we first need to use Node.js
to implement server-side streaming responses. The demos mentioned in the text are available at https://github.com/WindRunnerMax/webpack-simple-environment
.
Implementing basic streaming data responses on the server side is straightforward. Initially, we set the response header to text/event-stream;
. It is crucial to set the response header before the response body to avoid errors like ERR_INVALID_CHUNKED_ENCODING
that may occur when executing res.write
after res.writeHead
.
SSE
is essentially a protocol and, therefore, follows a specific format. In the text/event-stream
response format, each data group is separated by \n\n
, and within each group, different types of data are separated by \n
. For example, if we need to transmit data with id
, event
, and data
fields simultaneously:
Within Server-Sent Events
, features like automatic reconnection and event id
management are provided by default in the browser's built-in EventSource
. However, if using fetch
for implementation, these aspects need to be managed manually. In our basic example, these functionalities work effectively. Moreover, custom event names can be used to transmit messages. Notes or comments can be added using the :xxx\n
format. Hence, when establishing a connection, relevant information can be declared:
On the client side, one needs to create a connection using the EventSource
object and receive server data using custom events. If a specific event name is not specified, defaulting to the message
event will occur.
When dealing with the default message
event, we also output it on the server-side. As mentioned before, as long as we don't call res.end
, the entire connection will remain in a suspended state. So, if we want to keep the connection alive, we just need to continuously send data to the client through a timer here.
On the client-side, we can bind the onmessage
event handler to the source
object or directly use addEventListener("message")
. Furthermore, after successfully establishing a connection using the EventSource
object, we can see the EventStream data transmission panel in the browser's Network panel. The id
, type
, data
, and time
we defined will all be displayed there.
On the server-side, we also need to handle closing the server request when the client's connection is closed to avoid unnecessary resource consumption. In our case, if we don't close the timer, it will lead to a memory leak rather than just additional resource consumption.
Moreover, when establishing a connection without using HTTP/2
, SSE has a limit on the maximum number of connections per single domain. This can be troublesome when opening multiple tabs since the browser sets a very low limit of 6
connections. This limit is per domain, meaning that we can open 6
SSE connections to www.example1.com
across all tabs and at the same time open another 6
SSE connections to www.example2.com
. With HTTP/2
, the maximum number of HTTP connections between the server and client is typically negotiated and set to a default of 100
at any given time.
Before we handle data forwarding and proxy on the server-side, it's natural to define the data source of the entire event. Here, we don't actually need to integrate streaming responses from services like OpenAI
or Coze
; simply simulating it will suffice. Hence, we begin by defining the /stream
endpoint to simulate streaming output. It's important to note that our output is typically in Markdown format, so the presence of \n
symbols is common. In the SSE protocol, \n
is treated as a significant keyword. Therefore, we need to encode/decode it to avoid conflicts with \n
. Whether using JSON.stringify
or encodeURIComponent
, both methods are viable. For simplicity, we directly replace \n
with \\n
.
In this context, we won't delve too deeply into setting response headers and processing. During the actual model inference process, two types of outputs may occur. One is to output all the content of the current conversation, similar to slicing a string always starting from 0
. The other type is to output only the latest content delta
, similar to when slicing, where the end
of the last output is recorded as the start
of the next output. Here, we'll keep it simple and choose the first method to continuously push content starting from 0
to the client.
Since we are simulating streaming output, we directly set a timer, randomly generate the step length of the current output, record it as a new start
, then immediately output the content to the client using the default message
event. When reaching the end of the output, we close the timer and the connection. Of course, we must also consider cleaning up the current timer when the client closes the connection to avoid wasting server computing resources.
Once the data source interface is defined, we can start implementing data forwarding to carry out server-side preprocessing of responses. This is where we can filter, inspect, or perform other operations on the data. Therefore, we need to receive streaming responses on the server, preprocess the data, and then stream it back to the client. To initiate requests to the data source interface, we use node-fetch
directly.
When using node-fetch
, it's important to note that we are running the service directly with ts-node
, which means mixing CJS
with ESM
may cause exceptions. Hence, we should opt for the 2.x
version. Additionally, we need to define an AbortController
to promptly terminate the request when the client closes the connection. In node-fetch
, res.body
can still read ReadableStream
for handling the forwarded SSE
response.
On the server side, without the EventSource
object to receive data, we need to parse the data based on the SSE
protocol ourselves. Since we are using ReadableStream
for data retrieval, we need to handle binary data in a streaming manner instead of directly parsing the delimiters. Therefore, we implement a StreamParser
that merges incoming Uint8Array
binary data into a new buffer, and processes the data by scheduling the onLine
method when encountering a \n
.
When handling onLine
, we need to parse the data line by line according to the SSE
protocol. The format of the data we will be handling will be x: xxx;
. Under our processing, \n
serves as the end node and will not be included in the parameters. In this case, if the length of the data passed is 0
, we will need to trigger the onMessage
event, passing both the event name and data to the predefined event handling function. After that, we can use TextDecoder
to parse the data into a string, and then split and parse the data based on :
.
It is important to note that the ReadableStream
in Node
and the ReadableStream
implemented in browsers have different function signatures. Therefore, here we can easily iterate over the data using await
, or alternatively receive data and end responses using on("data")
and on("end")
. We also need to bind an onMessage
event to receive the parsed data and then respond the data to the target client.
When there is no need for data preprocessing, we can directly proxy the request as an HTTP
long connection to the target request address without actually receiving the response and then forwarding it to the client. Here, we can utilize the http
module to achieve the forwarding. Firstly, we need the node:url
module to parse the target address. Then, we can initiate the request using http.request
. Once the connection is established, we can directly pipe
the data to the target's Response
object, or alternatively, use proxyRes.on("data") + res.write
.
Naturally, we also need to handle some special cases. Firstly, for POST
requests, we need to handle the body
data by forwarding all the request data to the new request. Similarly, we can use req.on("data") + proxyReq.write
to achieve this. For error handling, we need to pass the response error information to the client. Responding with the correct error code is crucial, and we should also close the target request. When the client's request closes, we must also close the target request and end the response.
In fact, there is a dilemma here. If we use req.on("close")
to listen for the client's connection closure, it may cause issues in POST
requests. By executing the following node
program and then using curl
to make a request, followed by actively disconnecting the link, we can see that req.on("close")
triggers prematurely, rather than after we disconnect the request ourselves.
In reality, there are three events present in our request: req.on("close")
, res.on("close")
, and req.socket.on("close")
. The events on req
are affected by the data containing the body
. Therefore, in this case, we can use events on res
and socket
to monitor the client's connection closure. For ease of event triggering, we directly use events on socket
to detect the client's connection closure, and before node16
, the property name for socket
was connection
.
In the client-side, we need to implement SSE
based on fetch
. Through fetch
, we can pass request headers and bodies and send various types of requests like POST
, avoiding the limitation of only being able to send GET
requests and needing to encode all content into the URL
. If the connection is interrupted, we can also control the retry strategy. Unlike the browser, which silently retries a few times and then stops for EventSource objects
, this is not ideal for any robust application. If you need to perform some custom validation and handling before parsing the event source, you can also access the response object, which is very effective for designs like API gateways in front of application server programs.
The implementation based on fetch
is actually quite simple. First, we need to create an AbortController
object to promptly terminate requests when the client closes the connection. Then, we can use fetch
to make requests. When the request is successful, we can read the ReadableStream
through res.body
.
For stream processing, it follows the same method as the StreamParser
implemented on the server-side. As previously mentioned, due to the different function signatures of ReadableStream
, here we use chained Promise
calls for processing, while maintaining consistency with handling Uint8Array
data. An interesting point here is that using EventSource
objects allows monitoring of data transfer in the browser's Network
panel, whereas data exchange using fetch
cannot be tracked.
After implementing our data transmission solution, we can now achieve streaming interaction in the client-side. Once we parse the line data with the help of the StreamParser
method, we need to decode it. This operation is the reverse of the encoding approach mentioned above; here, we only need to replace \\n
with \n
. Additionally, we configure two output speeds: if there is a significant amount of pending text content to output, we output a character every 10ms
, otherwise, we output text every 50ms
.
Once we have parsed the data, we need to apply it to the DOM
structure. It is important to note that refreshing the entire DOM
content will prevent us from selecting the previously output content for copying. In other words, we cannot select content while it is being output. Therefore, we need to update the content incrementally here. The simplest solution is to update line by line. We can keep track of the last rendered line index and update content within that range to the current index.
There is also a scrolling interaction matter to address. When users scroll the content freely, we should not forcibly scroll back to the bottom. Therefore, we need to keep track of whether the user has scrolled. If the difference between el.scrollHeight - el.scrollTop
and el.clientHeight
is less than 1
, we consider that automatic scrolling should occur. Additionally, it is important to note that scrollTo
should not use a smooth
scrolling effect as it could affect the accuracy of our onScroll
calculation.
In the context of streaming output here, we can also implement a cursor blinking effect. This effect is relatively simple to achieve by using CSS animations and pseudo-classes. It is worth noting that not using pseudo-classes for implementation could lead to additional complexities in handling the previous DOM
nodes that needed treatment. Furthermore, since processing Markdown will involve nested nodes, specific handling with :not
will be necessary.