SSE(Server-Sent Events)是单向通信模式,只能由服务端向客户端推送消息,常用于消息推送、数据更新,以及用手机上传图片后在网页端实时显示等场景。
Spring Boot 已经内置对 SSE 的支持,无需额外引入依赖。先新建一个服务端的工具类,代码如下:
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
/**
 * Server-side registry of SSE emitters, keyed by a caller-supplied string.
 *
 * <p>Obtain the shared instance through {@link #getInstance()}. The registry is a
 * concurrent map because request-handling code and emitter callbacks may touch it
 * at the same time.
 */
public class SseClient {
    /** Lazily created singleton; left public for backward compatibility, prefer {@link #getInstance()}. */
    public static volatile SseClient instance;

    /** Live emitters by key. Entries are removed when an emitter completes, times out, or fails. */
    private static final Map<String, SseEmitter> sseMap = new ConcurrentHashMap<>();

    /**
     * Returns the shared instance, creating it on first use (double-checked locking on a
     * volatile field).
     *
     * @return the singleton {@code SseClient}
     */
    public static SseClient getInstance() {
        if (instance == null) {
            synchronized (SseClient.class) {
                if (instance == null) {
                    instance = new SseClient();
                }
            }
        }
        return instance;
    }

    /**
     * Creates an emitter for {@code key}, registers it, and returns it.
     *
     * <p>If another emitter was already registered under the same key it is completed,
     * so a reconnecting client does not leave an orphaned connection behind.
     *
     * @param key identifier of the connection; must not be {@code null}
     * @return the newly registered emitter
     * @throws IllegalArgumentException if {@code key} is {@code null}
     * @throws RuntimeException wrapping the {@link IOException} if the initial event cannot be sent
     */
    public SseEmitter createSse(String key) {
        if (key == null) {
            throw new IllegalArgumentException("key must not be null");
        }
        // A timeout of 0 disables the server-side timeout: the connection stays open
        // until it is closed explicitly or fails.
        SseEmitter sseEmitter = new SseEmitter(0L);

        // Each callback removes the entry only while it still maps to THIS emitter, so a
        // late callback from an old connection cannot evict a newer one for the same key.
        sseEmitter.onCompletion(() -> {
            sseMap.remove(key, sseEmitter);
            System.out.println("完成回调====");
        });
        sseEmitter.onTimeout(() -> {
            sseMap.remove(key, sseEmitter);
            System.out.println("超时回调====");
        });
        // The emitter is unusable once an error is reported: do not try to send on it
        // and do not put it back into the registry.
        sseEmitter.onError(throwable -> {
            sseMap.remove(key, sseEmitter);
            System.out.println("失败回调====");
        });

        try {
            // Tell the client how long to wait before reconnecting.
            sseEmitter.send(SseEmitter.event().reconnectTime(5000));
        } catch (IOException e) {
            // Not registered yet, so there is nothing to clean up in the map.
            throw new RuntimeException(e);
        }

        SseEmitter previous = sseMap.put(key, sseEmitter);
        if (previous != null) {
            previous.complete();
        }
        return sseEmitter;
    }

    /**
     * Sends {@code message} as a {@code "message"} event to the emitter registered under
     * {@code key}. Does nothing when no emitter is registered for that key.
     *
     * @param key identifier of the target connection
     * @param message payload of the event
     * @throws RuntimeException wrapping the {@link IOException} if sending fails; the emitter
     *         is unregistered and completed before the exception is thrown
     */
    public void send(String key, String message) {
        System.out.println("send message:" + key + "==" + key + "===" + message);
        if (key == null) {
            return;
        }
        SseEmitter sseEmitter = sseMap.get(key);
        if (sseEmitter == null) {
            return;
        }
        try {
            sseEmitter.send(SseEmitter.event()
                    .id(key)
                    .name("message")
                    .data(message)
                    .reconnectTime(1 * 60 * 1000L));
        } catch (IOException e) {
            sseMap.remove(key, sseEmitter);
            sseEmitter.complete();
            throw new RuntimeException(e);
        }
    }

    /**
     * Completes and unregisters the emitter stored under {@code key}, if any.
     *
     * @param key identifier of the connection to close
     */
    public void close(String key) {
        if (key != null) {
            SseEmitter sseEmitter = sseMap.remove(key);
            if (sseEmitter != null) {
                sseEmitter.complete();
            }
        }
        System.out.println("close sse key:" + key);
    }
}
接下来控制器写法:
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
/**
 * HTTP endpoints for SSE connection management: open a stream, push a message to it,
 * and close it. All work is delegated to {@link SseClient}.
 */
@Controller
@RequestMapping("/sse")
public class SseController extends WebBaseController
{
    /**
     * Opens an event stream for the given user id.
     *
     * @param uid identifier under which the connection is registered
     * @return the emitter backing the {@code text/event-stream} response
     */
    @CrossOrigin(origins = "*")
    @GetMapping(value = "/content/{uid}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    SseEmitter content(@PathVariable String uid) {
        System.out.println("开始连接");
        SseEmitter sse = SseClient.getInstance().createSse(uid);
        System.out.println("连接完成");
        return sse;
    }

    /**
     * Pushes {@code content} to the connection registered under {@code uid}.
     *
     * @param content message payload
     * @param uid target connection id; defaults to {@code "0"}, the value that was
     *            previously hard-coded, so existing callers keep working
     * @return result built by {@code success(...)} (presumably provided by the base controller)
     */
    @CrossOrigin(origins = "*")
    @GetMapping("/send")
    public AjaxResult send(@RequestParam("content") String content,
                           @RequestParam(value = "uid", defaultValue = "0") String uid) {
        SseClient.getInstance().send(uid, content);
        return success("发送消息");
    }

    /**
     * Closes the connection registered under {@code uid}.
     *
     * @param uid connection id to close; defaults to {@code "0"}, the value that was
     *            previously hard-coded
     * @return result built by {@code success(...)} (presumably provided by the base controller)
     */
    @CrossOrigin(origins = "*")
    @PostMapping("/close")
    public AjaxResult close(@RequestParam(value = "uid", defaultValue = "0") String uid) {
        SseClient.getInstance().close(uid);
        return success("断开成功");
    }
}
客户端连接
<script>
let source = null;
// 用时间戳模拟登录用户
const userId = new Date().getTime();
if (window.EventSource) {
// 建立连接
source = new EventSource('/sse/connect/' + userId);
/**
* 连接一旦建立,就会触发open事件
* 另一种写法:source.onopen = function (event) {}
*/
source.addEventListener('open', function (e) {
setMessageInnerHTML("建立连接。。。");
}, false);
/**
* 客户端收到服务器发来的数据
* 另一种写法:source.onmessage = function (event) {}
*/
source.addEventListener('message', function (e) {
setMessageInnerHTML(e.data);
});
/**
* 如果发生通信错误(比如连接中断),就会触发error事件
* 或者:
* 另一种写法:source.onerror = function (event) {}
*/
source.addEventListener('error', function (e) {
if (e.readyState === EventSource.CLOSED) {
setMessageInnerHTML("连接关闭");
} else {
console.log(e);
}
}, false);
} else {
setMessageInnerHTML("你的浏览器不支持SSE");
}
// 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据
window.onbeforeunload = function () {
closeSse();
};
// 关闭Sse连接
function closeSse() {
source.close();
const httpRequest = new XMLHttpRequest();
httpRequest.open('GET', '/sse/close', true);
httpRequest.send();
console.log("close");
}
// 将消息显示在网页上
function setMessageInnerHTML(innerHTML) {
document.getElementById('message').innerHTML += innerHTML + '<br/>';
}
</script>
ok!就这样!
博客地址: blog.codeceo.net