基于fetch的SSE方案

Server-Sent Events(SSE)是一种由服务器单向推送实时更新到客户端的方案,基本原理是客户端通过HTTP请求打开与服务端的持久连接,服务端可以通过该连接连续发送事件数据。SSE适用于需要持续更新数据的应用,如实时通知、消息推送和动态内容更新,相比于WebSocket的数据通信方案更加轻量,SSE更易于实现且更适合简单的单向数据流场景。

描述

SSE本质上是使用HTTP长链接以及ReadableStream实现的一种单向数据流方案,客户端可以保持与服务器的单向连接,并且能够持续接收服务端推送的实时事件,而不需要客户端不断地向服务器发送请求来获取数据更新。而在浏览器中实现了基本的EventSource对象,可以很方便地处理服务端的响应,服务端自然也可以通过不停地对Response对象写数据来实现流式响应。而在我们实际的业务需求中,无论是服务端和客户端都不是那么理想的场景:

  • 服务端预处理响应,在实现类似流式对话的需求过程中,通常我们都是将LLM推理的数据通过服务端转发到客户端,而在服务端处理过程中我们就需要对数据进行过滤、审查等操作,因此我们就需要在服务端接受流式响应,进行数据预处理之后再流式响应到客户端。
  • 服务端数据直接转发,在不需要进行数据预处理的情况下,如果在服务端接收数据流式响应再将其转发到客户端则显得比较麻烦,因此我们可以直接将请求作为HTTP长连接代理到目标的请求地址,而不需要实际实现接收响应后再转发到客户端。
  • 基于fetch请求数据,EventSource对象只能发起GET请求,且无法定义请求头以及携带请求体,这在需要鉴权的情况下就需要将所有的内容编码到URL上,多数浏览器对URL长度上都限制在2000字符,因此基于fetch实现SSE数据请求则可以解决上述问题。

在这里我们首先来通过EventSource对象来实现基本的SSE,由于EventSource对象是浏览器实现的API,是属于客户端的实现,因此我们在这里还需要先使用Node.js实现服务端的数据流式响应,文中涉及的DEMO都在https://github.com/WindRunnerMax/webpack-simple-environment中。

在服务端中实现基本的流式数据响应比较方便,我们首先需要将响应头设置为text/event-stream;,注意响应头是需要在响应体之前设置的,否则在执行res.writeHead之前后执行res.write的话会导致响应ERR_INVALID_CHUNKED_ENCODING

// packages/fetch-sse/server/modules/ping.ts
const ping = (req: http.IncomingMessage, res: http.ServerResponse<http.IncomingMessage>) => {
  res.writeHead(200, {
    "Content-Type": "text/event-stream; charset=utf-8",
    "Cache-Control": "no-cache",
    "Connection": "keep-alive",
  });
}

SSE实际上是一种协议,那么既然是协议自然就需要有固定的格式,在text/event-stream的响应格式中,每组数据都是以\n\n分隔的,而在组中的数据如果需要传递多种类型,则需要以\n分隔,例如我们需要同时传递ideventdata字段的数据:

id: 1
event: message
data: hello world

id: 2
event: custom
data: hello
data: world

Server-Sent Events事件中,自带了自动重连与事件id管理方法,当然这些处理都是在浏览器预设的EventSource来实现的,如果我们使用fetch来实现则需要自行管理。但是在我们当前的基本示例中是可以生效的,此外我们还可以通过自定义事件名来传递消息,如果仅传递:xxx\n的格式也可以作为注释使用,因此我们在创建连接时可以声明相关信息:

// packages/fetch-sse/server/modules/ping.ts
res.write("retry: 10000\n");
res.write("id: -1\n");
res.write("event: connect\n");
res.write("data: " + new Date() + "\n\n");

那么在客户端则需要通过EventSource对象创建连接,然后通过自定义事件来接收上述服务端的数据,而实际上如果不指定具体的事件名,即上述的connect事件,则会默认缺省为message事件,也就是说这里的事件名并不是必须的。

// packages/fetch-sse/client/components/ping.tsx
const onConnect = useMemoFn((e: MessageEvent<string>) => {
  prepend("Start Time: " + e.data);
});
const source = new EventSource("/ping");
source.addEventListener("connect", onConnect);

针对于默认的message事件,我们同样在服务端将其输出,我们先前也提到了只要我们不调用res.end则会导致整个连接处于挂起状态,那么在这里如果我们希望保持连接的话则只需要通过定时器不断地向客户端发送数据即可。

// packages/fetch-sse/server/modules/ping.ts
let index = 0;
const interval = setInterval(() => {
  res.write("id: " + index++ + "\n");
  res.write("data: " + new Date() + "\n\n");
}, 1000);

而在客户端我们可以为source对象添加onmessage事件绑定,也可以直接addEventListener(message)来绑定事件。此外,当我们成功通过EventSource对象创建连接后,我们可以在浏览器控制台的Network面板看到EventStream的数据传输面板,我们定义的idtypedatatime都会在此处显示。

// packages/fetch-sse/client/components/ping.tsx
const prepend = (text: string) => {
  const el = ref.current;
  if (!el) return;
  const child = document.createElement("div");
  child.textContent = text;
  el.prepend(child);
};

const onMessage = (e: MessageEvent<string>) => {
  prepend("Ping: " + e.data);
};

const source = new EventSource("/ping");
source.onmessage = onMessage;

在服务端我们还需要注意的是,当用户的客户端连接关闭我们同样也需要关闭服务端的请求,以此来避免额外资源的占用,当然在我们这里的定时器中如果不关闭的话就是内存泄漏而不仅仅是额外的资源占用了。

req.socket.on("close", () => {
  console.log("[ping] connection close");
  clearInterval(interval);
  res.end();
});

此外,当不通过HTTP/2建立连接时,SSE对于单个域名会受到最大连接数的限制,这在打开多个选项卡时会比较麻烦,该限制是浏览器针对数据请求设计的,并且被设置为一个非常低的6个连接数量。此限制是针对每个域的请求,因此这意味着我们可以跨所有选项卡打开6SSE连接到www.example1.com,以及同时打开6SSE连接到www.example2.com,而使用HTTP/2时,同一时间内HTTP最大连接数由服务器和客户端之间协商,默认为100

服务端

在服务端处理数据转发与代理之前,我们自然需要定义整个事件的数据源。在这里我们并没有必要实际对接例如OpenAICoze的流式响应,只需要模拟一下即可,那么在这里我们就先定义/stream接口来模拟流式输出。在这里需要注意的一点是,通常我们的输出都是Markdown的格式,那么这里自然会有\n的符号,而在SSE协议中\n是需要作为关键字使用的,因此我们就需要对其编码/解码,以此来避免\n关键字,那么无论是使用JSON.stringify抑或是encodeURIComponent都是可以的,在这里我们简单一些,直接将\n替换为\\n

// packages/fetch-sse/server/modules/stream.ts
const content = `# 出师表

- 诸葛亮 

先帝创业未半而中道崩殂,今天下三分,益州疲弊,此诚危急存亡之秋也。然侍卫之臣不懈于内,忠志之士忘身于外者,盖追先帝之殊遇,欲报之于陛下也。诚宜开张圣听,以光先帝遗德,恢弘志士之气,不宜妄自菲薄,引喻失义,以塞忠谏之路也。

...

今当远离,临表涕零,不知所言。`.replace(/\n/g, " \\n");

在此处设置响应头等处理就不再过多叙述了,在实际模型推理的过程中可能会有两种输出,一种是将本次对话的所有内容都输出,类似于对字符串的slice永远是从0开始,另一种则是只输出最新内容delta,类似于slice时记录了上次输出的end作为下次输出的start。在这里我们简单一些,选用第一种方式将内容从0开始输出不断推送到客户端。

由于是模拟流式输出,因此我们直接设置一个定时器,并且随机生成本次输出的步进长度,然后将其作为新的start记录下来,紧接着将内容输出到客户端,这里我们直接使用默认的message事件即可,当输出到最后时则关闭定时器并且关闭连接。当然,我们也不能忽略当连接的客户端关闭时,需要主动清理当前的定时器避免服务端计算资源浪费。

// packages/fetch-sse/server/modules/stream.ts
res.write("event: connect\n");
res.write("data: " + Date.now() + "\n\n");

let start = 0;
const interval = setInterval(() => {
  const slice = Math.floor(Math.random() * 30) + 1;
  start = start + slice;
  res.write("event: message\n");
  res.write("data: " + content.slice(0, start) + "\n\n");
  if (start >= content.length) {
    clearInterval(interval);
    res.end();
  }
}, 500);

req.socket.on("close", () => {
  console.log("[stream] connection close");
  clearInterval(interval);
  res.end();
});

数据转发

当定义好数据源接口之后,我们就可以开始实现数据转发的功能,用以实现服务端预处理响应,也就是在这里我们可以对数据进行过滤、审查等操作。因此我们就需要在服务端接受流式响应,进行数据预处理之后再流式响应到客户端。那么在这个转发接口中首先我们就需要对数据源接口发起请求,在这里我们直接使用node-fetch来发起请求。

// packages/fetch-sse/server/modules/transfer.ts
import fetch from "node-fetch";
const response = await fetch("http://127.0.0.1:8800/stream")

使用node-fetch的过程中需要注意我们是直接使用ts-node启动的服务,因此还是如果CJS混入ESM的话会导致抛出异常,因此这里我们需要选择2.x版本。此外,我们还需要定义好AbortController,以便在客户端关闭连接时及时终止请求,在node-fetchres.body依然可以读取ReadableStream,以此来处理转发的SSE响应。

// packages/fetch-sse/server/modules/transfer.ts
const ctrl = new AbortController();
const response = await fetch("http://127.0.0.1:8800/stream", {
  signal: ctrl.signal as AbortSignal,
});
const readable = response.body;
if (!readable) return null;

req.socket.on("close", () => {
  console.log("[transfer] connection close");
  ctrl.abort();
  res.end();
});

在服务端我们是没有EventSource对象来接收数据的,那么我们自然只能根据SSE协议来自行解析数据,而既然我们是通过ReadableStream来实现的数据读取,那么我们就需要流式地处理二进制数据,而不能直接解析分隔。因此在这里我们实现StreamParser,当接收到Uint8Array二进制数据后,我们首先将其合并为新的buffer,然后遍历当前数据,当遇到\n时则调度到onLine方法来处理数据。

// packages/fetch-sse/server/utils/steam-parser.ts
export class StreamParser {
  private compose(data: Uint8Array) {
    const buffer = new Uint8Array(this.buffer.length + data.length);
    buffer.set(this.buffer);
    buffer.set(data, this.buffer.length);
    this.buffer = buffer;
    return buffer;
  }

  public onBinary(bytes: Uint8Array) {
    const buffer = this.compose(bytes);
    const len = buffer.length;
    let start = 0;

    for (let i = 0; i < len; i++) {
      if (buffer[i] === 10) {
        this.onLine(buffer.slice(start, i));
        start = i + 1;
      }
    }
    this.buffer = buffer.slice(start);
  }
}

当处理到onLine时,我们就需要根据SSE协议来按行解析数据了,我们将要处理的数据格式将是x: xxx;,在我们的处理下\n是作为末尾节点不会被传参,那么此时如果我们的数据传递长度为0,那么就需要发起onMessage事件,将事件名与数据全部传递带预设的事件处理函数中。在其后我们就可以使用TextDecoder来解析为字符串,然后就可以根据:来分隔与解析数据了。

// packages/fetch-sse/server/utils/steam-parser.ts
export class StreamParser {
  private onLine(bytes: Uint8Array) {
    if (bytes.length === 0) {
      if (this.onMessage && this.message.event) {
        this.message.data = this.message.data || "";
        this.onMessage(this.message as Message);
      }
      this.message = {};
      return;
    }
    const decoder = new TextDecoder();
    const line = decoder.decode(bytes);
    const [field, ...rest] = line.split(":");
    const value = rest.join(":").trim();
    switch (field) {
      case "id":
        this.message.id = value;
        break;
      case "event":
        this.message.event = value;
        break;
      case "data":
        this.message.event = this.message.event || "message";
        this.message.data = value;
        break;
      default:
        break;
    }
  }
}

这里需要注意的是,在Node中的ReadableStream与浏览器实现的ReadableStream函数签名是不一样的,因此这里我们直接方便地使用await迭代数据即可,当然使用on("data") on("end")来接收数据与结束响应即可。我们还需要绑定onMessage事件来接收解析好的数据,并且将数据响应到目标客户端即可。

// packages/fetch-sse/server/utils/steam-parser.ts
const parser = new StreamParser();
parser.onMessage = message => {
  res.write(`event: ${message.event}\n`);
  res.write(`data: ${message.data}\n\n`);
};

for await (const chunk of readable) {
  const buffer = chunk as Buffer;
  const uint = new Uint8Array(buffer);
  parser.onBinary(uint);
}

res.end();

请求代理

当不需要进行数据预处理的情况下,我们可以直接将请求作为HTTP长连接代理到目标的请求地址,而不需要实际实现接收响应后再转发到客户端。在这里我们可以直接借助http模块来实现转发,首先需要node:url模块来解析目标地址,然后就可以通过http.request来发起请求,当建立连接之后就可以直接将数据pipe到目标的Response对象中,当然使用proxyRes.on("data") + res.write也是可以的。

// packages/fetch-sse/server/modules/proxy.ts
const targetUrl = new URL("http://127.0.0.1:8800/stream");
const options: http.RequestOptions = {
  hostname: targetUrl.hostname,
  port: targetUrl.port,
  path: targetUrl.pathname,
  method: req.method,
  headers: req.headers,
};
const proxyReq = http.request(options, proxyRes => {
  res.writeHead(proxyRes.statusCode || 404, proxyRes.headers);
  proxyRes.pipe(res);
});

这里我们自然还需要处理一些特殊情况,首先是对于POST请求的body数据处理,我们需要将请求的所有数据同样转发到新的请求上,这里同样也可以使用req.on("data") + proxyReq.write来实现。而对异常处理我们也需要将响应错误信息传递到客户端,这里的错误码响应还是比较重要的,并且将对目标的请求关闭。当客户端的请求关闭之后,同样需要关闭目标的请求,以及结束响应。

req.pipe(proxyReq);

proxyReq.on("error", error => {
  console.log("proxy error", error);
  res.writeHead(502, { "Content-Type": "text/plain" });
  res.end("Bad Gateway");
});

req.socket.on("close", () => {
  console.log("[proxy] connection close");
  res.end();
  proxyReq.destroy();
});

其实在这里还有个问题,如果使用req.on("close")来监听客户端的连接关闭,那么在POST请求中会出现问题。我们可以直接执行下面的node程序,然后就可以使用curl来发起请求,之后主动断开链接,然后就可以发现req.on("close")会触发地过早,而不是在我们主动断开请求之后才会执行。

echo "
const http = require('http');
const server = http.createServer((req, res) => {
  req.on('close', () => {
    console.log('close');
  });
  req.on('data', (chunk) => {
    console.log('data:', new TextDecoder().decode(chunk));
  });
  setTimeout(() => res.end('end'), 10000);
});
server.listen(8001);
" | node;
curl -X POST http://127.0.0.1:8001 \
-H "Content-Type: application/json"  \
-d '{"key1":"value1", "key2":"value2"}'

实际上在这里我们的请求中存在req.on("close")res.on("close")req.socket.on("close")这三个事件,在req的事件会被上述携带body的数据所影响,因此此处可以使用ressocket上的事件来监听客户端的连接关闭,为了方便我们的事件触发,在此处我们直接使用socket上的事件来监听客户端的连接关闭,此外socket属性在node16前的属性名为connection

echo "
const http = require('http');
const server = http.createServer((req, res) => {
  res.on('close', () => {
    console.log('res close');
  });
  req.socket.on('close', () => {
    console.log('socket close');
  });
  req.on('data', (chunk) => {
    console.log('data:', new TextDecoder().decode(chunk));
  });
  setTimeout(() => res.end('end'), 10000);
});
server.listen(8001);
" | node;
curl -X POST http://127.0.0.1:8001 \
-H "Content-Type: application/json"  \
-d '{"key1":"value1", "key2":"value2"}'

客户端

在客户端我们则需要基于fetch实现SSE,通过fetch可以传递请求头与请求体,并且可以发送POST等类型的请求,避免仅能发送GET请求而需要将所有内容编码到URL上的问题。如果连接中断,我们还可以控制重试策略,对于EventSource对象浏览器将默默地为您重试几次然后停止,这对于任何类型的强大应用程序来说都不够好。如果需要在解析事件源之前进行一些自定义验证与处理,也可以访问响应对象,这对于应用服务端程序前的API网关等设计非常有效。

fetch实现

基于fetch的实现实际上还是比较简单的,我们首先需要创建一个AbortController对象,以便在客户端关闭连接时及时终止请求,然后就可以通过fetch来发起请求,当请求成功后我们就可以通过res.body来读取ReadableStream

// packages/fetch-sse/client/components/fetch.tsx
const signal = new AbortController();
fetch("/proxy", { method: "POST", signal: signal.signal })
  .then(res => {
    onOpen(res);
    const body = res.body;
    if (!body) return null;
  })

对于数据的流式处理,与在服务端实现的StreamParser的方法是一致的,先前我们也提到了由于ReadableStream的函数签名不同,在这里我们就使用Promise的链式调用来处理,而对于Uint8Array数据的处理,则与先前保持一致。在这里实际上还有个有趣的事情,使用EventSource对象在浏览器控制台的Network中是可以看到EventStream的数据传输面板,而使用fetch的数据交换则是无法记录的。

// packages/fetch-sse/client/components/fetch.tsx
const reader = body.getReader();
const parser = new StreamParser();
parser.onMessage = onMessage;
const process = (res: ReadableStreamReadResult<Uint8Array>) => {
  if (res.done) return null;
  parser.onBinary(res.value);
  reader
    .read()
    .then(process)
    .catch(() => null);
};
reader.read().then(process);

流式交互

当我们的数据传输方案实现之后,我们就可以在客户端实现流式的交互。当我们借助StreamParser方法来解析出行数据之后,就需要进行解码操作,这个方法与上述的编码方案是相反的,此处只需要将\\n替换为\n即可。然后在这里我们设置两种速度的输出交互,如果未输出的文本内容过多,则10ms来输出一个文字,否则就以50ms的速度输出文字。

// packages/fetch-sse/client/components/stream.tsx
const onMessage = useMemoFn((e: Message) => {
  if (e.event !== "message") return null;
  setPainting(true);
  const data = e.data;
  const text = data.replace(/\\n/g, "\n");
  const start = currentIndex.current;
  const len = text.length;
  const delay = len - start > 50 ? 10 : 50;
  const process = () => {
    currentIndex.current++;
    const end = currentIndex.current;
    append(text.slice(0, end));
    if (end < len) {
      timer.current = setTimeout(process, delay);
    }
    if (!transmittingRef.current && end >= len) {
      setPainting(false);
    }
  };
  setTimeout(process, delay);
});

当我们将数据解析出来后,就需要将其应用到DOM结构上,这里需要注意的一点是,如果我们全量刷新整个DOM内容的话,会导致我们无法选中先前输出的内容来复制,也就是说我们不能一边输出内容一边选中内容。因此在这里我们需要将更新的内容精细化,最简单的方案就是按行更新,我们可以记录上次渲染的行索引,更新范围则是上次索引到当前索引。

// packages/fetch-sse/client/components/stream.tsx
const append = (text: string) => {
  const el = ref.current;
  if (!el) return null;
  const mdIt = MarkdownIt();
  const textHTML = mdIt.render(text);
  const dom = new DOMParser().parseFromString(textHTML, "text/html");
  const current = currentDOMIndex.current;
  const children = Array.from(el.children);
  for (let i = current; i < children.length; i++) {
    children[i] && children[i].remove();
  }
  const next = dom.body.children;
  for (let i = current; i < next.length; i++) {
    next[i] && el.appendChild(next[i].cloneNode(true));
  }
  currentDOMIndex.current = next.length - 1;
};

在这里还有个滚动交互的问题需要处理,当用户自由滚动内容的时候,我们则不能将用户滚动的位置强制拉回到底部,因此我们需要记录用户是否滚动过,当用户滚动过的时候我们就不再自动滚动,如果el.scrollHeight - el.scrollTopel.clientHeight的差值小于1的话,则认为应该自动滚动,此外这里还需要注意scrollTo不能使用smooth的滚动效果,这样会导致我们的onScroll滚动计算不准确。

const append = (text: string) => {
  isAutoScroll.current && el.scrollTo({ top: el.scrollHeight });
};

useEffect(() => {
  const el = ref.current;
  if (!el) return;
  el.onscroll = () => {
    if (el.scrollHeight - el.scrollTop - el.clientHeight <= 1) {
      isAutoScroll.current = true;
    } else {
      isAutoScroll.current = false;
    }
  };
  return () => {
    el.onscroll = null;
  };
}, []);

在这里的流式输出中,我们还可以实现光标闪烁效果,这个效果比较简单,我们可以直接使用CSS的动画与伪类来实现,这里需要注意的是如果不使用伪类来实现的话,则会导致我们先前的DOM节点追加需要处理的问题则需要多一些。此外,由于处理Markdown实际上是会存在节点的嵌套的,因此对于节点的处理则需要:not来具体化处理。

// packages/fetch-sse/client/styles/stream.m.scss
@keyframes blink {
  0% { opacity: 1; }
  50% { opacity: 0; }
  100% { opacity: 1; }
}

.textarea {
  &.painting > *:last-child:not(ol):not(ul),
  &.painting > ol:last-child > li:last-child,
  &.painting > ul:last-child > li:last-child {
    &::after {
      animation: blink 1s infinite;
      background-color: #000;
      content: '';
      display: inline-block;
      height: 1em;
      margin-top: -2px;
      vertical-align: middle;
      width: 1px;
    }
  }
}

每日一题

https://github.com/WindrunnerMax/EveryDay

参考

https://github.com/Azure/fetch-event-source https://developer.mozilla.org/zh-CN/docs/Web/API/EventSource https://www.ruanyifeng.com/blog/2017/05/server-sent_events.html https://nodejs.org/docs/latest-v20.x/api/http.html#messagesocket https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events https://stackoverflow.com/questions/7348736/how-to-check-if-connection-was-aborted-in-node-js-server https://stackoverflow.com/questions/76115409/why-does-node-js-express-call-request-close-on-post-request-with-data-before-r