SSE 简单实践 - SpringMVC 异步消息处理
SSE 简单实践 - SpringMVC 异步消息处理
官方文档: Server-Sent Events
参考博客:Spring Boot 中的 SSE 异常处理:统一处理连接和事件异常
阮一峰大佬的博客:Server-Sent Events 教程 - 阮一峰的网络日志
理论部分
关于 SSE 的理论部分,请看《Web 实时消息推送的 7 种实现方案》的 Server-sent events
小节
以后有时间了,再深入了解一下 HTTP 异步消息处理的在 Servlet 层面的原理。
后端
Spring 的控制器方法默认支持哪些返回值类型,看官方文档:Spring MVC Return Values,其中就专门提到了返回值类型为 SseEmitter
或者 ResponseBodyEmitter
的场景,这两个类的作用是发出一个异步对象传输流,用 HttpMessageConverter
的具体实现将其写入 HTTP 响应。这两个类也支持作为 ResponseEntity
的主体。参见 异步请求 和 HTTP 流。
ResponseBodyEmitter
用于异步请求处理,其子类 SseEmitter
专门处理异步消息中的一个特殊场景,即 Server-sent events
。以后我们有异步请求处理的需求,都可以通过从控制器方法返回 SseEmitter
来实现。
关于控制器方法不同的返回值,Spring 是如何处理的,请看《RequestMappingHandlerAdapter 源码解析》的
返回值处理器
小节
SseEmitter
的结构也很简单,其父类为 ResponseBodyEmitter
,我们直接从这个类开始看起。
ResponseBodyEmitter
ResponseBodyEmitter
是异步请求处理的控制器方法返回值类型,控制器方法返回 ResponseBodyEmitter
后,我们可以将一个或多个对象被写入 ResponseBodyEmitter
中,推送到客户端,这就是异步请求处理。
有同样作用的还有 DeferredResult
,不过 DeferredResult
用于推送单个对象到客户端,而 ResponseBodyEmitter
可以用于发送多个对象,注意,这些对象不一定是同一种类型的对象,而且其中每个对象都应该注册相应的 HttpMessageConverter
,因为最终需要通过 HttpMessageConverter
将对象写入到响应消息体中。
关于如何创建
HttpMessageConverter
,并将其注册到 Spring 容器中,请看《HttpMessageConverters 自动配置类源码解析》
ResponseBodyEmitter
方法的源码其实没什么可看的,值得注意的是,ResponseBodyEmitter
的初始化方法(ResponseBodyEmitter#initialize
,ResponseBodyEmitter#initializeWithError
)、发送消息的方法(ResponseBodyEmitter#send
)、关闭连接的方法(ResponseBodyEmitter#complete
,ResponseBodyEmitter#completeWithError
)、甚至异常事件捕获的方法(ResponseBodyEmitter#onTimeout
,ResponseBodyEmitter#onError
,ResponseBodyEmitter#onCompletion
),全都添加了 synchronized
关键字,也就是说,ResponseBodyEmitter
对象实例的整个工作周期,都是线程安全的,尤其是发送消息的方法也是线程安全的,这样,我们就可以在多个线程中使用同一个 ResponseBodyEmitter
实例的 send 方法推送消息到客户端。
ResponseBodyEmitter
的使用方式如下
@RequestMapping(value="/stream", method=RequestMethod.GET)
public ResponseBodyEmitter handle() {
ResponseBodyEmitter emitter = new ResponseBodyEmitter();
// Pass the emitter to another component...
return emitter;
}
// in another thread
emitter.send(foo1);
// and again
emitter.send(foo2);
// and done
emitter.complete();
SseEmitter - 重点,常用
SseEmitter
继承了 ResponseBodyEmitter
,是 Spring 专门为 Server-Sent Events 实现的异步请求处理的控制器方法返回值类型,
SseEmitter
的主要作用,就是简化了 ResponseBodyEmitter
的使用,将所有异步推送的 HTTP 信息的消息体的格式都统一成 UTF-8
编码的文本。
private static final MediaType TEXT_PLAIN = new MediaType("text", "plain", StandardCharsets.UTF_8);
protected void extendResponse(ServerHttpResponse outputMessage) {
super.extendResponse(outputMessage);
HttpHeaders headers = outputMessage.getHeaders();
if (headers.getContentType() == null) {
// 设置响应消息头中的内容格式为文本流 text/event-stream
headers.setContentType(MediaType.TEXT_EVENT_STREAM);
}
}
这样,不管 SseEmitter#send
方法的 SseEmitter.SseEventBuilder
的 data 方法参数是什么类型,最终都会被转化为字符串推送到客户端,而 SpringBoot 刚好会自动注入 MappingJackson2HttpMessageConverter
,可以将任意类型的对象格式化为 JSON 字符串,因此,用户不需要再自定义 SseEmitter.SseEventBuilder
的 data 方法参数对应类型的 HttpMessageConverter
的实现类,简化了异步推送消息的过程。
当然,如果你对字符串化的结果有定制化的需求,你也可以自定义 SseEmitter.SseEventBuilder
的 data 方法参数对应类型的 HttpMessageConverter
的实现类,然后将其注册为 bean 即可,这个我们在后面都会有实践
关于为什么 SpringBoot 会自动注册
MappingJackson2HttpMessageConverter
,以及如何手动注册自定义的HttpMessageConverter
,具体请看《HttpMessageConverters 自动配置类源码解析》
此外,SseEmitter#send
方法也加了锁。保证了线程安全。这样,我们就可以在多个线程中使用同一个 SseEmitter
实例的 SseEmitter#send
方法推送消息到客户端,这在后面的本文后面的实践部分也有提到。
public void send(SseEventBuilder builder) throws IOException {
Set<DataWithMediaType> dataToSend = builder.build();
// 加锁,线程安全
this.writeLock.lock();
try {
// super 就是 ResponseBodyEmitter
// 其实 ResponseBodyEmitter 的 send 已经加了所了,这里再加一次,感觉有点多余
super.send(dataToSend);
}
finally {
this.writeLock.unlock();
}
}
实际上,SseEmitter#send
方法最终调用的是到 ResponseBodyEmitterReturnValueHandler#HttpMessageConvertingHandler
的 sendInternal
方法,调用栈如下:
ResponseBodyEmitterReturnValueHandler#HttpMessageConvertingHandler
的 sendInternal
方法的具体细节请看 ResponseBodyEmitterReturnValueHandler
小节。
SseEmitter#send
方法的参数是 SseEmitter$SseEventBuilder
,这个构造器接口的主要作用是构造一个 SSE 也就是一个服务端发送的事件,这个接口默认只有一个实现类,也在 SseEmitter 内部 SseEmitter$SseEventBuilderImpl
,我们通过 SseEmitter#event
方法可以轻松获取这个构造器实现。
这个构造器可以指定一个 SSE 的以下几个字段
-
id:事件 ID
-
name:事件类型,前端 EventSource 对象默认监听三种类型的事件:
open
、error
、message
,我们可以通过在后端指定 SSE 的 name 为这三个类型,来往这三个监听方法发信息。我们也可以通过指定其他 name 来自定义 SSE 的类型。具体请看
EventSource 对象
小节和自定义关闭事件
小节 -
reconnectTime:重试时间
-
comment:事件评论
-
data:事件数据,我们可以传入任意类型的对象
具体请看
自定义传递对象
小节
这个与 HTTP 消息
小节提到的 SSE 对应的 HTTP 消息的格式是对应的。
此外,调用 SseEmitter
实例的 complete 方法可以结束请求,关闭连接,连接一旦关闭,就会触发前端的 error 事件,而前端的 EventSource
对象是存在断线重连机制的,如果不调用 EventSource
对象的 close 方法,EventSource
对象自动进行断线重连,持续尝试对后端发起连接。因此,我们可以在监听到 error 事件的时候调用 EventSource.close();
来关闭前端 EventSource 对象,阻止重试,不过这样就无法在网络出现波动的意外情况下自动重连了,因此,我们还是建议后端发送自定义的关闭事件,来通知客户端关闭连接,这个在后面实践环节也会提到。
SseEmitter
实例的 complete 方法实际上继承自ResponseBodyEmitter#complete
,最终调用ResponseBodyEmitterReturnValueHandler#HttpMessageConvertingHandler
的complete
方法,这个到ResponseBodyEmitterReturnValueHandler
小节再看
ResponseBodyEmitterReturnValueHandler
ResponseBodyEmitterReturnValueHandler
的作用就是处理控制器方法返回值类型为 ResponseBodyEmitter
的情况,所以啊,源码其实应该从这里开始看。
其实
ResponseBodyEmitterReturnValueHandler
我们之前接触过,在《RequestMappingHandlerAdapter 源码解析》的返回值处理器
小节中,提到过这个类,也看到过其他的返回值处理类
返回值处理类的重点是 handleReturnValue
方法,这里我们简单分析一下
@Override
@SuppressWarnings("resource")
public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType, ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
if (returnValue == null) {
mavContainer.setRequestHandled(true);
return;
}
// 获取原始的响应对象,并封装为 outputMessage
HttpServletResponse response = webRequest.getNativeResponse(HttpServletResponse.class);
Assert.state(response != null, "No HttpServletResponse");
ServerHttpResponse outputMessage = new ServletServerHttpResponse(response);
// 如果是 ResponseEntity
if (returnValue instanceof ResponseEntity<?> responseEntity) {
response.setStatus(responseEntity.getStatusCode().value());
outputMessage.getHeaders().putAll(responseEntity.getHeaders());
returnValue = responseEntity.getBody();
returnType = returnType.nested();
if (returnValue == null) {
mavContainer.setRequestHandled(true);
outputMessage.flush();
return;
}
}
ServletRequest request = webRequest.getNativeRequest(ServletRequest.class);
Assert.state(request != null, "No ServletRequest");
ResponseBodyEmitter emitter;
if (returnValue instanceof ResponseBodyEmitter responseBodyEmitter) {
// 如果是 ResponseBodyEmitter 这个类或者其子类,说明还是传统的 servelet API 处理
emitter = responseBodyEmitter;
} else {
// 如果不是 ResponseBodyEmitter 这个类或者其子类,那么就是响应式 API
emitter = this.reactiveHandler.handleValue(returnValue, returnType, mavContainer, webRequest);
if (emitter == null) {
// Not streaming: write headers without committing response..
outputMessage.getHeaders().forEach((headerName, headerValues) -> {
for (String headerValue : headerValues) {
response.addHeader(headerName, headerValue);
}
});
return;
}
}
// 对 HTTP 响应消息进行拓展,如果是 SseEmitter,此时就会设置响应消息体的格式为文本流 text/event-stream
emitter.extendResponse(outputMessage);
// At this point we know we're streaming..
ShallowEtagHeaderFilter.disableContentCaching(request);
// 包装响应对象,开始以流的形式进行消息传递
// Wrap the response to ignore further header changes
// Headers will be flushed at the first write
outputMessage = new StreamingServletServerHttpResponse(outputMessage);
HttpMessageConvertingHandler handler;
try {
DeferredResult<?> deferredResult = new DeferredResult<>(emitter.getTimeout());
// 本质上还是基于 deferredResult 进行异步消息传递
// getAsyncManager 获取的是 WebAsyncManager,这是异步请求处理的核心
// startDeferredResultProcessing 方法的作用是,监听异步请求的各种事件,比如请求超时,请求报错,请求完成,并在这些事件发生的时候对 deferredResult 执行相应的操作
// 这里就不深入理解了
WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(deferredResult, mavContainer);
handler = new HttpMessageConvertingHandler(outputMessage, deferredResult);
}
catch (Throwable ex) {
emitter.initializeWithError(ex);
throw ex;
}
// 初始化 ResponseBodyEmitter
emitter.initialize(handler);
}
然后再看内部类 ResponseBodyEmitterReturnValueHandler#HttpMessageConvertingHandler
,这个内部类实现了 ResponseBodyEmitter.Handler
。
我们重点看这个内部类的 sendInternal
方法,简单的源码分析如下:
// data 就是 SseEmitter.event().id().name().data() 的 data 方法传入的值,也就是我们推送到客户端的对象
// mediaType 是我们希望以什么样的格式来传递 data
private <T> void sendInternal(T data, @Nullable MediaType mediaType) throws IOException {
// ResponseBodyEmitterReturnValueHandler.this.sseMessageConverters 保存的是当前容器内的所有的 HttpMessageConverter
for (HttpMessageConverter<?> converter : ResponseBodyEmitterReturnValueHandler.this.sseMessageConverters) {
// 判断当前 converter 能否将 data 的类型的数据转化为指定的 MediaType 格式
if (converter.canWrite(data.getClass(), mediaType)) {
// 如果可以,直接转化,写入到响应中
((HttpMessageConverter<T>) converter).write(data, mediaType, this.outputMessage);
// 然后直接返回,不会再继续查找,所以 converter 的顺序很重要,这话都要说烂了。
return;
}
}
throw new IllegalArgumentException("No suitable converter for " + data.getClass());
}
类似的分析请看《SpringMVC-ContentNegotiation 内容协商》的
简单分析RequestResponseBodyMethodProcessor
小节
这个内部方法 sendInternal
方法,实际上就是将异步推送的数据推送到客户端的地方。
假设我们调用 SseEmitter#send
方法向客户端推送数据,那么从 SseEmitter#send
方法到 ResponseBodyEmitterReturnValueHandler#HttpMessageConvertingHandler
的 sendInternal
方法的调用栈如下:
此外还有 complete
方法,简单看一下。
@Override
public void complete() {
try {
// outputMessage 的类型为 ServerHttpResponse 就是 HTTP 响应对象
// 执行刷新
this.outputMessage.flush();
// setResult 会标志着异步请求完成
this.deferredResult.setResult(null);
}
catch (IOException ex) {
this.deferredResult.setErrorResult(ex);
}
}
HTTP 消息
数据格式
服务器向浏览器发送的 SSE 数据,必须是 UTF-8 编码的文本,具有如下的 HTTP 头信息。
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
上面三行之中,第一行的 Content-Type
必须指定 MIME 类型为 event-steam
。
每一次发送的信息,由若干个 message
组成,每个 message
之间用 \n\n
分隔。每个 message
内部由若干行组成,每一行都是如下格式。
[field]: value\n
上面的 field
可以取四个值。
- data
- event
- id
- retry
此外,还可以有冒号开头的行,表示注释。通常,服务器每隔一段时间就会向浏览器发送一个注释,保持连接不中断。
: This is a comment
下面是一个例子。
: this is a test stream\n\n
data: some text\n\n
data: another message\n
data: with two lines \n\n
data 字段
数据内容用 data
字段表示。
data: message\n\n
如果数据很长,可以分成多行,最后一行用 \n\n
结尾,前面行都用 \n
结尾。
data: begin message\n
data: continue message\n\n
下面是一个发送 JSON 数据的例子。
data: {\n
data: "foo": "bar",\n
data: "baz", 555\n
data: }\n\n
id 字段
数据标识符用 id
字段表示,相当于每一条数据的编号。
id: msg1\n
data: message\n\n
浏览器用 lastEventId
属性读取这个值。一旦连接断线,浏览器会发送一个 HTTP 头,里面包含一个特殊的 Last-Event-ID
头信息,将这个值发送回来,用来帮助服务器端重建连接。因此,这个头信息可以被视为一种同步机制。
event 字段
event
字段表示自定义的事件类型,默认是 message
事件。浏览器可以用 addEventListener()
监听该事件。
event: foo\n
data: a foo event\n\n
data: an unnamed event\n\n
event: bar\n
data: a bar event\n\n
上面的代码创造了三条信息。第一条的名字是 foo
,触发浏览器的 foo
事件;第二条未取名,表示默认类型,触发浏览器的 message
事件;第三条是 bar
,触发浏览器的 bar
事件。
下面是另一个例子。
event: userconnect
data: {"username": "bobby", "time": "02:33:48"}
event: usermessage
data: {"username": "bobby", "time": "02:34:11", "text": "Hi everyone."}
event: userdisconnect
data: {"username": "bobby", "time": "02:34:23"}
event: usermessage
data: {"username": "sean", "time": "02:34:36", "text": "Bye, bobby."}
retry 字段
服务器可以用 retry
字段,指定浏览器重新发起连接的时间间隔。
retry: 10000\n
两种情况会导致浏览器重新发起连接:一种是时间间隔到期,二是由于网络错误等原因,导致连接出错。
前端
官方文档: Server-Sent Events
EventSource 对象
SSE 的客户端 API 部署在 EventSource
对象上。下面的代码可以检测浏览器是否支持 SSE。
if ('EventSource' in window) {
// ...
}
使用 SSE 时,浏览器首先生成一个 EventSource
实例,向服务器发起连接。
var source = new EventSource(url);
上面的 url
可以与当前网址同域,也可以跨域。跨域时,可以指定第二个参数,打开 withCredentials
属性,表示是否一起发送 Cookie。
var source = new EventSource(url, { withCredentials: true });
EventSource
实例的 readyState
属性,表明连接的当前状态。该属性只读,可以取以下值。
-
0:相当于常量
EventSource.CONNECTING
,表示连接还未建立,或者断线正在重连。 -
1:相当于常量
EventSource.OPEN
,表示连接已经建立,可以接受数据。 -
2:相当于常量
EventSource.CLOSED
,表示连接已断,且不会重连。
基本使用流程
SSE 默认的事件如下:
通道建立事件,连接一旦建立,就会触发 open
事件,可以在 onopen
属性定义回调函数。
source.onopen = function (event) {
// ...
};
// 另一种写法
source.addEventListener('open', function (event) {
// ...
}, false);
客户端收到服务器发来的数据,就会触发 message
事件,可以在 onmessage
属性的回调函数。
source.onmessage = function (event) {
var data = event.data;
// handle message
};
// 另一种写法
source.addEventListener('message', function (event) {
var data = event.data;
// handle message
}, false);
上面代码中,事件对象的 data
属性就是服务器端传回的数据(Content-Type
为 text/event-stream
,文本流格式)。
如果发生通信错误(比如连接中断),或者服务端调用 SseEmitter.complete
的时候(此方法会结束连接),就会触发 error
事件,可以在 onerror
属性定义回调函数。此时如果不调用 EventSource
对象的 close 方法,这个对象自动进行断线重连,会持续对其 url 发起连接。因此,我们一般是建议通过自定义事件来通知前端关闭 EventSource
。
source.onerror = function (event) {
// handle error event
};
// 另一种写法
source.addEventListener('error', function (event) {
// handle error event
}, false);
close
方法用于关闭 SSE 连接。
source.close();
默认情况下,服务器发来的数据,总是触发浏览器 EventSource
实例的 message
事件。开发者还可以自定义 SSE 事件,这种情况下,发送回来的数据不会触发 message
事件。
总体上来说需要自定义两类事件,用来监听 SSE 的通信阶段
-
传输业务数据
我们可以根据业务情况,给一个页面中需要推送的每一种类型的数据都给一个专门的事件名称,这样一个页面只用一个 SSE 通道即可,且不同类型的数据走不同的通道,相互不干扰。
-
关闭连接
通知前端关闭 SSE 通道,不要再重试
简单实践
基本使用
首先搭建一个简单的 SpringBoot Web 程序
pom 文件:
<project>
......
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
配置文件
server:
# 当前应用所在的 Web 服务器监听的本地端口
port: 8080
servlet:
# 应用的上下文路径
context-path: /SSESimple
# 启用默认的 Servlet
register-default-servlet: true
在项目的 src\main\resources\static
目录下创建 index.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>首页</title>
</head>
<body>
<h1>Hello World</h1>
SSE 测试
<br/>
<a href="./ssepage">测试 SSE </a>
</body>
</html>
然后注册控制器 SSEController
@Controller
public class SSEController {
@RequestMapping("/ssepage")
public String ssepage() {
//设置视图名称
return "ssepage.html";
}
}
添加视图页面 ssepage.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>首页</title>
</head>
<body>
<div>动态刷新:<span id="dynamic_content">还未接收到动态数据!</span></div>
</body>
<script>
let source = null;
if (window.EventSource) {
// 建立连接
source = new EventSource('http://localhost:8080/SSESimple/ssedata');
setMessageInnerHTML("开始连接 SSE 服务端");
/**
* 连接一旦建立,就会触发 open 事件
* 另一种写法:source.onopen = function (event) {}
*/
source.addEventListener('open', function (e) {
setMessageInnerHTML("成功建立连接。。。");
}, false);
/**
* 客户端收到服务器发来的数据
* 另一种写法:source.onmessage = function (event) {}
*/
source.addEventListener('message', function (e) {
setMessageInnerHTML(e.data);
});
// 异常处理
source.addEventListener('error', function (event) {
// handle error event
console.log("SSE 发生错误:", event)
// 服务端的 SseEmitter.complete; 方法,会触发这里的 error 事件
// 如果不调用 EventSource 对象的 close 方法,这个对象自动进行断线重连,持续对其 url 发起连接。
source.close();
}, false);
} else {
setMessageInnerHTML("你的浏览器不支持 SSE");
}
function setMessageInnerHTML(message) {
document.getElementById('dynamic_content').innerHTML = message;
}
</script>
</html>
同时添加 SSE 的后台处理器 SSEDataController
@RestController
public class SSEDataController {
@GetMapping("/ssedata")
public SseEmitter handleSSE() {
// 每一次请求都返回一个新的 SseEmitter 对象
SseEmitter emitter = new SseEmitter();
// 注册回调
// 当连接完成时,会触发 onCompletion 回调
emitter.onCompletion(() -> System.out.println("link complete!!"));
// 当连接报错时,会触发 onError 回调
emitter.onErrorprintStackTrace;
// 当连接超时时,会触发 onTimeout 回调
emitter.onTimeout(() -> System.out.println("connection timeout!!"));
// 注意,不要使用 Executors.newSingleThreadExecutor 这样,每次用户请到达服务端都会创建一个核心线程数为 1 的线程池,执行完请求之后,核心线程会一直存在,这样的话,会造成线程的浪费
// 单线程写入
new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
SseEmitter.SseEventBuilder event = SseEmitter.event().id(String.valueOf(i)).name("message").data("SSE event #" + i);
emitter.send(event);
Thread.sleep(1000);
}
// 开始关闭
// 标志 SSE 事件完成,关闭连接,连接会断开,这会触发前端的 error 事件
emitter.complete();
} catch (IOException e) {
// 检测到报错,关闭连接,当然如果报错原因就是连接断开了,则啥都不做直接返回
emitter.completeWithError(e);
} catch (InterruptedException e) {
// 线程中断
// throw new RuntimeException(e);
}
}).start();
return emitter;
}
}
启动之后,访问 http://localhost:8080/SSESimple/
:进入首页之后点击 测试 SSE
,即可看到页面内容在刷新
输出到 9 之后,因为服务端关闭了异步请求,因此数字不再增加,同时服务端控制台输出
link complete!!
打开浏览器控制台,我们可以看到请求的具体情况

以及 EventStream
事件流,可以很清楚地看到服务端给客户端推送的事件流

关闭事件
有一点需要注意的是异步请求的关闭,如果连接关闭了,而一方不是主动关闭连接的那一方,那这一方就会报错。
在上面的示例代码中,是客户端主动结束异步请求的处理(SseEmitter
实例调用 complete 方法),此时因为连接关闭,客户端监听到连接断开之后,会触发 error 事件,输出错误如下。
其实如果是因为其他原因导致连接关闭,比如断网,也会触发客户端的 error 事件。

而前端的 EventSource
对象是存在断线重连机制的,如果不调用 EventSource
对象的 close 方法
// 异常处理
source.addEventListener('error', function (event) {
// handle error event
console.log("SSE 发生错误:", event)
// 服务端的 SseEmitter.complete; 方法,会触发这里的 error 事件
// 如果不调用 EventSource 对象的 close 方法,这个对象自动进行断线重连,持续对其 url 发起连接。
// source.close();
}, false);
EventSource
对象自动进行断线重连,持续尝试对后端发起连接。如果网络是畅通的,而且后端接口也依然可用,那么就前端会再次与后端建立连接一个新的连接

,效果就是前端页面显示到 9 之后,会再次从 0 开始刷新:
如果我们想不进行自动重连,那我们可以在监听到 error 事件的时候调用 EventSource.close();
来关闭前端 EventSource 对象,阻止重试,
// 异常处理
source.addEventListener('error', function (event) {
// handle error event
console.log("SSE 发生错误:", event)
// 服务端的 SseEmitter.complete; 方法,会触发这里的 error 事件
// 如果不调用 EventSource 对象的 close 方法,这个对象自动进行断线重连,持续对其 url 发起连接。
source.close();
}, false);
不过这样就无法在网络出现波动的意外情况下自动重连了,因此,我们还是建议后端发送自定义的关闭事件,来通知客户端关闭连接,这个在 自定义关闭事件
小节会提到。
我们再来看看如果服务端还未关闭连接(SseEmitter
实例调用 complete 方法),客户端主动关闭连接会怎么样。
经过实验我们发现,客户端在服务端还未调用 complete 方法之前,标签页关闭或者 JavaScript 脚本中主动调用 EventSource.close
方法,甚至页面刷新,服务端都报错,提示连接已关闭。输出错误信息如下:
可以这样在前端中主动关闭连接:
setTimeout(function () { source.close(); }, 3000)
java.io.IOException: 你的主机中的软件中止了一个已建立的连接。
at java.base/sun.nio.ch.SocketDispatcher.write0(Native Method)
at java.base/sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:54)
at java.base/sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:132)
at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:97)
at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:53)
at java.base/sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:532)
at org.apache.tomcat.util.net.NioChannel.write(NioChannel.java:118)
at org.apache.tomcat.util.net.NioEndpoint$NioSocketWrapper.doWrite(NioEndpoint.java:1381)
at org.apache.tomcat.util.net.SocketWrapperBase.doWrite(SocketWrapperBase.java:764)
at org.apache.tomcat.util.net.SocketWrapperBase.flushBlocking(SocketWrapperBase.java:728)
at org.apache.tomcat.util.net.SocketWrapperBase.flush(SocketWrapperBase.java:712)
at org.apache.coyote.http11.Http11OutputBuffer$SocketOutputBuffer.flush(Http11OutputBuffer.java:566)
at org.apache.coyote.http11.filters.ChunkedOutputFilter.flush(ChunkedOutputFilter.java:157)
at org.apache.coyote.http11.Http11OutputBuffer.flush(Http11OutputBuffer.java:220)
at org.apache.coyote.http11.Http11Processor.flush(Http11Processor.java:1245)
at org.apache.coyote.AbstractProcessor.action(AbstractProcessor.java:400)
at org.apache.coyote.Response.action(Response.java:208)
at org.apache.catalina.connector.OutputBuffer.doFlush(OutputBuffer.java:301)
at org.apache.catalina.connector.OutputBuffer.flush(OutputBuffer.java:267)
at org.apache.catalina.connector.CoyoteOutputStream.flush(CoyoteOutputStream.java:120)
at org.springframework.util.StreamUtils.copy(StreamUtils.java:135)
at org.springframework.http.converter.StringHttpMessageConverter.writeInternal(StringHttpMessageConverter.java:130)
at org.springframework.http.converter.StringHttpMessageConverter.writeInternal(StringHttpMessageConverter.java:44)
at org.springframework.http.converter.AbstractHttpMessageConverter.write(AbstractHttpMessageConverter.java:236)
at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitterReturnValueHandler$HttpMessageConvertingHandler.sendInternal(ResponseBodyEmitterReturnValueHandler.java:221)
at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitterReturnValueHandler$HttpMessageConvertingHandler.send(ResponseBodyEmitterReturnValueHandler.java:212)
at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter.sendInternal(ResponseBodyEmitter.java:234)
at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter.send(ResponseBodyEmitter.java:225)
at org.springframework.web.servlet.mvc.method.annotation.SseEmitter.send(SseEmitter.java:135)
at xyz.xiashuo.ssesimple.controller.SSEDataController.lambda$handleSSE$2(SSEDataController.java:34)
at java.base/java.lang.Thread.run(Thread.java:833)
link complete!!
日志的输出是因为我们在代码中注册了错误处理动作
emitter.onErrorprintStackTrace;
而且我们发现即使是发生了错误,连接关闭的时候依然会触发 emitter.onCompletion
事件。
因此,我们可以在 onCompletion
或者 onError
的处理方法中,关闭服务端的相关资源,比如线程,I/O 接口之类的。
值得注意的是,如果是因为连接断开而报的错,是不需要调用 SseEmitter.complete()
来关闭连接的,如果是因为其他的错,比如服务端内部的,跟连接状态无关的错,可以调用的 SseEmitter.complete()
来关闭连接。
为什么不需要?因为 SseEmitter.complete()
方法适用于服务端主动关闭连接的时候使用,而不应该在 Servlet 容器相关事件之后使用,这在 SseEmitter.complete()
的注释中也有提到。
自定义关闭事件
一般在监听到 error 事件的时候,我们不建议调用 EventSource.close();
来关闭前端 EventSource 对象,因为这样会跟自动重试相冲突,因此,我们可以自定义关闭事件。
在后端调用 complete 方法前,先发送一个类型为 complete 的事件,当然你也可以起别的名字。
// 开始关闭
// 建议发送自定义的关闭事件,来通知客户端关闭连接
emitter.send(SseEmitter.event().name("complete").data("close"));
// 标志 SSE 事件完成,关闭连接,连接会断开,这会触发前端的 error 事件
emitter.complete();
在前端中,在 error 事件中,不再调用 EventSource.close();
,此外,再额外监听 complete
// 异常处理
source.addEventListener('error', function (event) {
// handle error event
console.log("SSE 发生错误:", event)
// 服务端的 SseEmitter.complete; 方法,会触发这里的 error 事件
// 如果不调用 EventSource 对象的 close 方法,这个对象自动进行断线重连,持续对其 url 发起连接。
// source.close();
}, false);
// 注意,如果不调用 EventSource 对象的 close 方法,这个对象自动进行断线重连,持续对其 url 发起连接。
// 因此还是建议发送自定义的关闭事件,来通知客户端关闭连接。
// 默认情况下,服务器发来的数据,总是触发浏览器 EventSource 实例的 message 事件。开发者还可以自定义 SSE 事件,这种情况下,发送回来的数据不会触发 message 事件。
source.addEventListener('complete', function (e) {
source.close();
});
这样,就很完美了。此时客户端收到的 EventStream 为。PS:不知道最后两个事件 id 都为 9 会不会有影响。

自定义传递对象
在前面的例子中,我们往客户端推送的事件的数据类型都是字符串,那可以推送其他自定义的类型吗?答案是可以的。
首先在 index.html
中添加连接
<a href="./ssepage_obj">测试 SSE 传输 Java 对象</a>
然后在 SSEController
中添加控制器方法
@RequestMapping("/ssepage_obj")
public String ssepage_obj() {
//设置视图名称
return "ssepageobj.html";
}
在静态资源路径 src\main\resources\static
下添加页面 ssepageobj.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>首页</title>
</head>
<body>
<div>动态刷新:<span id="dynamic_content">还未接收到动态数据!</span></div>
</body>
<script>
let source = null;
if (window.EventSource) {
// 建立连接
source = new EventSource('http://localhost:8080/SSESimple/ssedata_obj');
setMessageInnerHTML("开始连接 SSE 服务端");
/**
* 连接一旦建立,就会触发 open 事件
* 另一种写法:source.onopen = function (event) {}
*/
source.addEventListener('open', function (e) {
setMessageInnerHTML("成功建立连接。。。");
}, false);
/**
* 客户端收到服务器发来的数据
* 另一种写法:source.onmessage = function (event) {}
*/
source.addEventListener('message', function (e) {
setMessageInnerHTML(e.data);
});
// 异常处理
source.addEventListener('error', function (event) {
// handle error event
console.log("SSE 发生错误:", event)
// 服务端的 SseEmitter.complete; 方法,会触发这里的 error 事件
// 如果不调用 EventSource 对象的 close 方法,这个对象自动进行断线重连,持续对其 url 发起连接。
// source.close();
}, false);
// 注意,如果不调用 EventSource 对象的 close 方法,这个对象自动进行断线重连,持续对其 url 发起连接。
// 因此还是建议发送自定义的关闭事件,来通知客户端关闭连接。
// 默认情况下,服务器发来的数据,总是触发浏览器 EventSource 实例的 message 事件。开发者还可以自定义 SSE 事件,这种情况下,发送回来的数据不会触发 message 事件。
source.addEventListener('complete', function (e) {
source.close();
});
// setTimeout(function () {
// source.close();
// }, 3000)
} else {
setMessageInnerHTML("你的浏览器不支持 SSE");
}
function setMessageInnerHTML(message) {
document.getElementById('dynamic_content').innerHTML = message;
}
</script>
</html>
然后添加自定义 SSE 消息对象类
@Data
为 Lombok 的注解
@Data
public class SSEMessage {
private String name;
private int age;
private LocalDateTime birthday = LocalDateTime.now();
}
再在 SSEDataController
中添加后台接口
@GetMapping("/ssedata_obj")
public SseEmitter handleSSEObj() {
// 每一次请求都返回一个新的 SseEmitter 对象
SseEmitter emitter = new SseEmitter();
// 注册回调
// 当连接完成时,会触发 onCompletion 回调
emitter.onCompletion(() -> System.out.println("link complete!!"));
// 当连接报错时,会触发 onError 回调
emitter.onErrorprintStackTrace;
// 当连接超时时,会触发 onTimeout 回调
emitter.onTimeout(() -> System.out.println("connection timeout!!"));
// 注意,不要使用 Executors.newSingleThreadExecutor 这样,每次用户请到达服务端都会创建一个核心线程数为 1 的线程池,执行完请求之后,核心线程会一直存在,这样的话,会造成线程的浪费
// 单线程写入
new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
// 传递字符串
// SseEmitter.SseEventBuilder event = SseEmitter.event().id(String.valueOf(i)).name("message").data("SSE event #" + i);
// 传递自定义类型
SSEMessage message = new SSEMessage();
message.setName("SSE event #" + i);
SseEmitter.SseEventBuilder event = SseEmitter.event().id(String.valueOf(i)).name("message").data(message);
emitter.send(event);
Thread.sleep(1000);
}
// 开始关闭
// 建议发送自定义的关闭事件,来通知客户端关闭连接
emitter.send(SseEmitter.event().name("complete").data("close"));
// 标志 SSE 事件完成,关闭连接,连接会断开,这会触发前端的 error 事件
emitter.complete();
} catch (IOException e) {
// 检测到报错,关闭连接,当然如果报错原因就是连接断开了,则啥都不做直接返回
emitter.completeWithError(e);
} catch (InterruptedException e) {
// 线程中断
// throw new RuntimeException(e);
}
}).start();
return emitter;
}
启动项目之后,在前台页面中可以看到
为什么可以正常传输呢?根据我们在 SseEmitter
的源码分析,我们可以知道,最终传递给客户端的 HTTP 消息的格式是 MediaType.TEXT_EVENT_STREAM
,而 ResponseBodyEmitterReturnValueHandler#HttpMessageConvertingHandler
的 sendInternal
方法要求系统中存在一个 HttpMessageConverter
,能够将我们传入的类型(可能是任意类型)转化成 MediaType.TEXT_EVENT_STREAM
类型的 HTTP 消息传给客户端,但是我们并没有注册这样一个 HttpMessageConverter
,那么实际是那个 HttpMessageConverter
完成的转化呢?经过简单调试,我们发现实际默认注册了 8 个 HttpMessageConverter

而最终负责将我们传入的类型(可能是任意类型)转化成 MediaType.TEXT_EVENT_STREAM
类型的 HTTP 消息的是 MappingJackson2HttpMessageConverter

关于为什么 SpringBoot 会自动注册
MappingJackson2HttpMessageConverter
,以及如何手动注册自定义的HttpMessageConverter
,具体请看《HttpMessageConverters 自动配置类源码解析》
难怪看页面上的结果,那么像一个 JSON 字符串。
当然你也可以自定义一个 HttpMessageConverter
,用来处理特定格式到成 MediaType.TEXT_EVENT_STREAM
类型的 HTTP 消息的转化,例如 SSEMessageConverter
:
/**
* 参考 StringHttpMessageConverter
*/
@Component
public class SSEMessageConverter extends AbstractHttpMessageConverter<SSEMessage> {
// 默认编码
public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
public SSEMessageConverter() {
// super(DEFAULT_CHARSET, null);
}
@Override
protected boolean supports(Class<?> clazz) {
// 只支持处理 SSEMessage 类型的实例
return SSEMessage.class.isAssignableFrom(clazz);
}
@Override
protected SSEMessage readInternal(Class<? extends SSEMessage> clazz, HttpInputMessage inputMessage) throws IOException, HttpMessageNotReadableException {
// 读操作有待实现
return null;
}
@Override
protected void writeInternal(SSEMessage sseMessage, HttpOutputMessage outputMessage) throws IOException, HttpMessageNotWritableException {
String content = myProtocolContent(sseMessage);
// 输出到响应的消息体
StreamUtils.copy(content, DEFAULT_CHARSET, outputMessage.getBody());
}
/**
* 自定义消息输出内容格式
*
* @param sseMessage
* @return
*/
private String myProtocolContent(SSEMessage sseMessage) {
// 当然你可以在这里选择其他的拼接方式,比如把所有的属性用英文逗号拼接,等等。
// 或者序列化为 JSON 字符串
return "自定义转换:"+sseMessage.toString();
}
@Override
protected void addDefaultHeaders(HttpHeaders headers, SSEMessage s, @Nullable MediaType type) throws IOException {
// 不重写 addDefaultHeaders 指定响应的 ContentType 消息头的话,默认的 addDefaultHeaders 会自动设置 ContentType 为自定义 MediaType,
// 浏览器会处理不了,会变成下载,所以这里需要设置 ContentType 消息头,设置成 MediaType.TEXT_PLAIN 是为了方便展示
// 指定类型的时候指定编码
headers.setContentType(new MediaType(MediaType.TEXT_PLAIN, DEFAULT_CHARSET));
super.addDefaultHeaders(headers, s, type);
}
@Override
protected Long getContentLength(SSEMessage message, @Nullable MediaType contentType) {
String str = myProtocolContent(message);
return (long) str.getBytes(DEFAULT_CHARSET).length;
}
}
再次调试,可以看到 SSEMessageConverter
确实出现在了转化器列表中,而且排在 MappingJackson2HttpMessageConverter
的前面,这意味着 SSEMessageConverter
会优先生效

实际的结果:
可以看到,转化结果确实是自定义的 HttpMessageConverter
转化的结果。
一个页面多个类型的事件使用同一个 SseEmitter 对象
在实际使用的过程中,一个页面中可能有多个需要异步处理的请求,比如订单信息的变更,比如聊天信息的条数,等等,这些信息属于不同的主题,而且 SseEmitter
是支持单个对象传输不同主题的 SSE 事件的,而且因为 SseEmitter
的 send 方法是线程安全的,因此,我们可以在多个线程中调用同一个 SseEmitter
实例的 send 方法来推送消息,方案也很简单,一个主题的事件用一个线程推送即可。
首先在 index.html
中添加连接
<a href="./ssepage_concurrent">测试 SSE 传输多类型事件</a>
然后在 SSEController
中添加控制器方法
@RequestMapping("/ssepage_concurrent")
public String ssepageConcurrent() {
//设置视图名称
return "ssepage_concurrent.html";
}
在静态资源路径 src\main\resources\static
下添加页面 ssepage_concurrent.html
,在这个页面中,我们手动监听三个类型的事件 topic1
、topic2
、topic3
。
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>首页</title>
</head>
<body>
<div>主题 1 动态刷新:<span id="dynamic_content_1">还未接收到动态数据!</span></div>
<div>主题 2 动态刷新:<span id="dynamic_content_2">还未接收到动态数据!</span></div>
<div>主题 3 动态刷新:<span id="dynamic_content_3">还未接收到动态数据!</span></div>
</body>
<script>
let source = null;
if (window.EventSource) {
// 建立连接
source = new EventSource('http://localhost:8080/SSESimple/ssedata_concurrent');
console.log("开始连接 SSE 服务端");
/**
* 连接一旦建立,就会触发 open 事件
* 另一种写法:source.onopen = function (event) {}
*/
source.addEventListener('open', function (e) {
console.log("成功建立连接。。。");
}, false);
/**
* 客户端收到服务器发来的数据
* 另一种写法:source.onmessage = function (event) {}
*/
source.addEventListener('message', function (e) {
// 后端指定的 ID 通过 e.lastEventId 获取
setMessageInnerHTML(e.lastEventId, e.data);
});
source.addEventListener('topic1', function (e) {
// 后端指定的 ID 通过 e.lastEventId 获取
setMessageInnerHTML(e.lastEventId, e.data);
});
source.addEventListener('topic2', function (e) {
// 后端指定的 ID 通过 e.lastEventId 获取
setMessageInnerHTML(e.lastEventId, e.data);
});
source.addEventListener('topic3', function (e) {
// 后端指定的 ID 通过 e.lastEventId 获取
setMessageInnerHTML(e.lastEventId, e.data);
});
// 异常处理
source.addEventListener('error', function (event) {
// handle error event
console.log("SSE 发生错误:", event)
// 服务端的 SseEmitter.complete; 方法,会触发这里的 error 事件
// 如果不调用 EventSource 对象的 close 方法,这个对象自动进行断线重连,持续对其 url 发起连接。
// source.close();
}, false);
// 注意,如果不调用 EventSource 对象的 close 方法,这个对象自动进行断线重连,持续对其 url 发起连接。
// 因此还是建议发送自定义的关闭事件,来通知客户端关闭连接。
// 默认情况下,服务器发来的数据,总是触发浏览器 EventSource 实例的 message 事件。开发者还可以自定义 SSE 事件,这种情况下,发送回来的数据不会触发 message 事件。
source.addEventListener('complete', function (e) {
source.close();
});
} else {
setMessageInnerHTML("你的浏览器不支持 SSE");
}
function setMessageInnerHTML(id, message) {
document.getElementById('dynamic_content_' + id).innerHTML = message;
}
</script>
</html>
再在 SSEDataController
中添加后台接口
@GetMapping("/ssedata_concurrent")
public SseEmitter handleSSEConcurrent() {
SseEmitter emitter = new SseEmitter();
// 注册回调
emitter.onCompletion(() -> System.out.println("link complete!!"));
emitter.onErrorprintStackTrace;
emitter.onTimeout(() -> System.out.println("connection timeout!!"));
// 在实际业务种,我们可以给每一种业务分配一个主题,并且用一个专门的线程来推送数据
for (int i = 1; i <= 3; i++) {
int finalI = i;
new Thread(() -> {
String topic = "topic" + finalI;
// 测试每个主题发送 10 条数据
for (int j = 0; j < 10; j++) {
String msg = " Topic " + finalI + " : SSE event #" + j;
SseEmitter.SseEventBuilder event = SseEmitter.event().id(String.valueOf(finalI)).name(topic).data(msg);
try {
emitter.send(event);
// 延时
Thread.sleep(800 + 50 * finalI * j);
} catch (IOException e) {
// 客户端主动关闭
emitter.completeWithError(e);
} catch (InterruptedException e) {
// 线程中断
// throw new RuntimeException(e);
}
}
}).start();
}
return emitter;
}
启动项目之后,在前台页面中可以看到下图这样的变换,为了让效果更直观,我们还给每个主题添加了不一样的延时
效果还是不错的。
在实际的生产实践中,应该是一个用户一个 SseEmitter
对象,而不是一个请求一个 SseEmitter
对象,最好是用一个 Map
,来保存所有的 SseEmitter
,key 为 SessionID
,在 Session
过期的时候清除对应的 SseEmitter
,这个就懒得写了,后面用到了再说。