SSE(Server-Sent Events)是单向通信模式,只能由服务端向客户端发送消息,适用于消息推送、数据更新、手机上传图片后在网页端实时显示等场景。

Spring Boot 的 Web 模块已经内置了 SSE 支持(SseEmitter),无需引入额外依赖。新建一个服务端的工具类,代码如下:


import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/**
 * Server-side registry of SSE connections, keyed by a caller-supplied string.
 *
 * <p>Emitters live in a static concurrent map so that the registry can be used from
 * several request threads at once. Every emitter unregisters itself when it completes,
 * times out or fails; the removal is conditional on the map still holding that same
 * emitter, so a stale connection can never evict a newer one registered under the
 * same key.
 */
public class SseClient {
    /** Retry interval (ms) announced to the client when the connection is created. */
    private static final long CONNECT_RECONNECT_MILLIS = 5000L;
    /** Retry interval (ms) attached to every pushed message. */
    private static final long MESSAGE_RECONNECT_MILLIS = 1 * 60 * 1000L;

    public static volatile SseClient instance;
    private static final Map<String, SseEmitter> sseMap = new ConcurrentHashMap<>();

    /**
     * Returns the shared instance, creating it on first use.
     *
     * @return the single {@code SseClient} instance
     */
    public static SseClient getInstance() {
        if (instance == null) {
            synchronized (SseClient.class) {
                if (instance == null) {
                    instance = new SseClient();
                }
            }
        }
        return instance;
    }

    /**
     * Creates an emitter for {@code key}, sends the initial retry hint and registers it.
     * An emitter previously registered under the same key is replaced in the map.
     *
     * @param key identifier of the connection; must not be {@code null}
     * @return the newly created emitter, to be returned from the controller method
     * @throws IllegalArgumentException if {@code key} is {@code null}
     * @throws RuntimeException if the initial event cannot be written
     */
    public SseEmitter createSse(String key) {
        if (key == null) {
            throw new IllegalArgumentException("key must not be null");
        }
        // A timeout of 0 disables the server-side async timeout: the connection stays
        // open until the client disconnects or close(key) is called.
        final SseEmitter sseEmitter = new SseEmitter(0L);
        sseEmitter.onCompletion(() -> {
            // Only drop the mapping if it still points at this emitter.
            sseMap.remove(key, sseEmitter);
            System.out.println( "完成回调====" );
        });
        sseEmitter.onTimeout(() -> {
            sseMap.remove(key, sseEmitter);
            System.out.println( "超时回调====" );
        });
        sseEmitter.onError(throwable -> {
            // The connection has already failed at this point, so nothing can be
            // written to it any more; just forget the emitter.
            sseMap.remove(key, sseEmitter);
            System.out.println( "失败回调====" );
        });
        try {
            sseEmitter.send(SseEmitter.event().reconnectTime(CONNECT_RECONNECT_MILLIS));
        } catch (IOException e) {
            // This emitter has not been registered yet, so there is nothing to remove;
            // an existing registration under the same key belongs to another emitter.
            throw new RuntimeException(e);
        }
        sseMap.put(key, sseEmitter);
        return sseEmitter;
    }

    /**
     * Pushes a {@code message} event to the connection registered under {@code key}.
     * Does nothing when no such connection exists.
     *
     * @param key     identifier of the target connection
     * @param message payload of the event
     * @throws RuntimeException if writing fails; the emitter is unregistered and
     *                          completed before the exception is thrown
     */
    public void send(String key, String message) {
        System.out.println("send message:" + key + "==" + key + "===" + message);
        if (key == null) {
            return;
        }
        SseEmitter sseEmitter = sseMap.get(key);
        if (sseEmitter == null) {
            return;
        }
        try {
            sseEmitter.send(SseEmitter.event()
                    .id(key)
                    .name("message")
                    .data(message)
                    .reconnectTime(MESSAGE_RECONNECT_MILLIS)
            );
        } catch (IOException e) {
            sseMap.remove(key, sseEmitter);
            sseEmitter.complete();
            throw new RuntimeException(e);
        }
    }

    /**
     * Unregisters and completes the connection registered under {@code key}, if any.
     *
     * @param key identifier of the connection to close
     */
    public void close(String key) {
        if (key != null) {
            // Single atomic lookup-and-remove instead of containsKey/get/remove.
            SseEmitter sseEmitter = sseMap.remove(key);
            if (sseEmitter != null) {
                sseEmitter.complete();
            }
        }
        System.out.println("close sse key:" + key);
    }

}



接下来是控制器的写法:




import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

/**
 * HTTP endpoints for managing SSE connections: open a stream, push a message to it
 * and close it. All work is delegated to {@link SseClient}.
 */
@Controller
@RequestMapping("/sse")
public class SseController extends WebBaseController
{
    /**
     * Opens the event stream for the given user.
     *
     * @param uid identifier under which the connection is registered
     * @return the emitter that keeps the response open
     */
    @CrossOrigin(origins = "*")
    @GetMapping(value = "/content/{uid}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter content(@PathVariable("uid") String uid) {
        System.out.println("开始连接");
        SseEmitter sse = SseClient.getInstance().createSse(uid + "");
        System.out.println("连接完成");
        return sse;
    }

    /**
     * Pushes {@code content} to the connection registered under {@code uid}.
     *
     * @param content message text to push
     * @param uid     target connection; defaults to "0", the id that was previously
     *                hard-coded, so existing callers keep working
     * @return a success result
     */
    @CrossOrigin(origins = "*")
    @GetMapping("/send")
    @ResponseBody // NOTE(review): harmless if WebBaseController already implies it — confirm
    public AjaxResult send(@RequestParam("content") String content,
                           @RequestParam(value = "uid", defaultValue = "0") String uid){
        SseClient.getInstance().send(uid, content);
        return success("发送消息");
    }

    /**
     * Closes the connection registered under {@code uid}.
     *
     * @param uid connection to close; defaults to "0", the id that was previously
     *            hard-coded, so existing callers keep working
     * @return a success result
     */
    @CrossOrigin(origins = "*")
    @PostMapping("/close")
    @ResponseBody // NOTE(review): harmless if WebBaseController already implies it — confirm
    public AjaxResult close(@RequestParam(value = "uid", defaultValue = "0") String uid){
        SseClient.getInstance().close(uid);
        return success("断开成功");
    }

}

客户端连接代码如下:

<script>
    let source = null;

    // Simulate a logged-in user with a timestamp
    const userId = new Date().getTime();

    if (window.EventSource) {

        // Open the connection; the path has to match the server mapping /sse/content/{uid}
        source = new EventSource('/sse/content/' + userId);

        /**
         * Fired once the connection has been established.
         * Alternative form: source.onopen = function (event) {}
         */
        source.addEventListener('open', function (e) {
            setMessageInnerHTML("建立连接。。。");
        }, false);

        /**
         * Fired when the server pushes a "message" event.
         * Alternative form: source.onmessage = function (event) {}
         */
        source.addEventListener('message', function (e) {
            setMessageInnerHTML(e.data);
        });


        /**
         * Fired on a communication error (for example a dropped connection).
         * Alternative form: source.onerror = function (event) {}
         */
        source.addEventListener('error', function (e) {
            // readyState is a property of the EventSource, not of the event object
            if (e.target.readyState === EventSource.CLOSED) {
                setMessageInnerHTML("连接关闭");
            } else {
                console.log(e);
            }
        }, false);

    } else {
        setMessageInnerHTML("你的浏览器不支持SSE");
    }

    // Close the SSE connection when the window closes; since the server-side emitter
    // never expires, the server has to be told to clean up.
    window.onbeforeunload = function () {
        closeSse();
    };

    // Close the SSE connection and notify the server
    function closeSse() {
        // Nothing to close when EventSource is unsupported or already closed
        if (source === null) {
            return;
        }
        source.close();
        source = null;

        // Pass the user id along so the server can tell which connection is being
        // closed (it is simply ignored if the endpoint does not read it).
        const closeUrl = '/sse/close?uid=' + encodeURIComponent(userId);
        if (navigator.sendBeacon) {
            // sendBeacon issues a POST that is still delivered while the page unloads
            navigator.sendBeacon(closeUrl);
        } else {
            // The server maps /sse/close to POST, so a GET would be rejected
            const httpRequest = new XMLHttpRequest();
            httpRequest.open('POST', closeUrl, true);
            httpRequest.send();
        }
        console.log("close");
    }

    // Append a message to the page as plain text followed by a line break.
    // Text nodes are used instead of innerHTML so pushed content cannot inject markup.
    function setMessageInnerHTML(innerHTML) {
        const container = document.getElementById('message');
        container.appendChild(document.createTextNode(innerHTML));
        container.appendChild(document.createElement('br'));
    }
</script>

ok!就这样!

博客地址: blog.codeceo.net