SSE

SSE(Server-sent Events)介绍

SSEServer-sent EventsWebSocket的一种轻量代替方案,使用HTTP协议

严格地说,HTTP协议是没有办法做服务器推送的,但是当服务器向客户端声明接下来要发送流信息时,客户端就会保持连接打开,SSE使用的就是这种原理

1.SSE能做什么?

理论上,SSEWebSocket做的是同一件事情。当你需要用新数据局部更新网络应用时,SSE可以做到不需要用户执行任何操作,便可以完成

举例我们要做一个统计系统的管理后台,我们想知道统计数据的实时情况。类似这种更新频繁、 低延迟的场景,SSE可以完全满足。

其他一些应用场景:例如邮箱服务的新邮件提醒,微博的新消息推送、管理后台的一些操作实时同步等,SSE都是不错的选择

SSE是单向通道,只能服务器向客户端发送消息,如果客户端需要向服务器发送消息,则需要一个新的HTTP请求。这对比WebSocket的双工通道来说,会有更大的开销。这么一来的话就会存在一个「什么时候才需要关心这个差异?」的问题,如果平均每秒会向服务器发送一次消息的话,那应该选择WebSocket。如果一分钟仅 5 - 6 次的话,其实这个差异并不大

2.SSE优势

  • 实现一个完整的服务仅需要少量的代码
  • 可以在现有的服务中使用,不需要启动一个新的服务
  • 可以用任何一种服务端语言中使用
  • 基于HTTP/HTTPS协议,可以直接运行于现有的代理服务器和认证技术

目录结构

主目录sse

  1. —— main.go
  2. —— optional.sse.mini.js.html

代码示例

main.go

  1. // Package main显示如何通过代理通过SSE向客户端发送连续事件消息。
  2. //阅读详情:https://www.w3schools.com/htmL/html5_serversentevents.asp
  3. // https://robots.thoughtbot.com/writing-a-server-sent-events-server-in-go
  4. package main
  5. import (
  6. "encoding/json"
  7. "fmt"
  8. "time"
  9. "github.com/kataras/golog"
  10. "github.com/kataras/iris"
  11. //注意: 由于某种原因,最新的vscode-go语言扩展不能提供足够智能帮助(参数文档并转到定义功能)
  12. //对于`iris.Context`别名,因此如果您使用VS Code,则导入`Context`的原始导入路径,它将执行此操作:
  13. "github.com/kataras/iris/context"
  14. )
  15. //Broker拥有开放的客户端连接
  16. //在其Notifier频道上侦听传入事件
  17. //并将事件数据广播到所有已注册的连接
  18. type Broker struct {
  19. //主要事件收集例程将事件推送到此频道
  20. Notifier chan []byte
  21. //新的客户端连接
  22. newClients chan chan []byte
  23. //关闭客户端连接
  24. closingClients chan chan []byte
  25. //客户端连接注册表
  26. clients map[chan []byte]bool
  27. }
  28. // NewBroker返回一个新的代理工厂
  29. func NewBroker() *Broker {
  30. b := &Broker{
  31. Notifier: make(chan []byte, 1),
  32. newClients: make(chan chan []byte),
  33. closingClients: make(chan chan []byte),
  34. clients: make(map[chan []byte]bool),
  35. }
  36. //设置它正在运行 - 收听和广播事件
  37. go b.listen()
  38. return b
  39. }
  40. //听取不同的频道并采取相应应对
  41. func (b *Broker) listen() {
  42. for {
  43. select {
  44. case s := <-b.newClients:
  45. //新客户端已连接
  46. //注册他们的消息频道
  47. b.clients[s] = true
  48. golog.Infof("Client added. %d registered clients", len(b.clients))
  49. case s := <-b.closingClients:
  50. //客户端已离线,我们希望停止向其发送消息。
  51. delete(b.clients, s)
  52. golog.Warnf("Removed client. %d registered clients", len(b.clients))
  53. case event := <-b.Notifier:
  54. //我们从外面得到了一个新事件
  55. //向所有连接的客户端发送事件
  56. for clientMessageChan := range b.clients {
  57. clientMessageChan <- event
  58. }
  59. }
  60. }
  61. }
  62. func (b *Broker) ServeHTTP(ctx context.Context) {
  63. //确保编写器支持刷新
  64. flusher, ok := ctx.ResponseWriter().Flusher()
  65. if !ok {
  66. ctx.StatusCode(iris.StatusHTTPVersionNotSupported)
  67. ctx.WriteString("Streaming unsupported!")
  68. return
  69. }
  70. //设置与事件流相关的header,如果发送纯文本,则可以省略“application/json”
  71. //如果你开发了一个go客户端,你必须设置:“Accept”:“application/json,text/event-stream”header
  72. ctx.ContentType("application/json, text/event-stream")
  73. ctx.Header("Cache-Control", "no-cache")
  74. ctx.Header("Connection", "keep-alive")
  75. //我们还添加了跨源资源共享标头,以便不同域上的浏览器仍然可以连接
  76. ctx.Header("Access-Control-Allow-Origin", "*")
  77. //每个连接都使用Broker的连接注册表注册自己的消息通道
  78. messageChan := make(chan []byte)
  79. //通知我们有新连接的Broker
  80. b.newClients <- messageChan
  81. //监听连接关闭以及整个请求处理程序链退出时(此处理程序)并取消注册messageChan。
  82. ctx.OnClose(func() {
  83. //从已连接客户端的map中删除此客户端,当这个处理程序退出时
  84. b.closingClients <- messageChan
  85. })
  86. //阻止等待在此连接的消息上广播的消息
  87. for {
  88. //写入ResponseWriter
  89. // Server Sent Events兼容
  90. ctx.Writef("data: %s\n\n", <-messageChan)
  91. //或json:data:{obj}
  92. //立即刷新数据而不是稍后缓冲它
  93. flusher.Flush()
  94. }
  95. }
  96. type event struct {
  97. Timestamp int64 `json:"timestamp"`
  98. Message string `json:"message"`
  99. }
  100. const script = `<script type="text/javascript">
  101. if(typeof(EventSource) !== "undefined") {
  102. console.log("server-sent events supported");
  103. var client = new EventSource("http://localhost:8080/events");
  104. var index = 1;
  105. client.onmessage = function (evt) {
  106. console.log(evt);
  107. // it's not required that you send and receive JSON, you can just output the "evt.data" as well.
  108. dataJSON = JSON.parse(evt.data)
  109. var table = document.getElementById("messagesTable");
  110. var row = table.insertRow(index);
  111. var cellTimestamp = row.insertCell(0);
  112. var cellMessage = row.insertCell(1);
  113. cellTimestamp.innerHTML = dataJSON.timestamp;
  114. cellMessage.innerHTML = dataJSON.message;
  115. index++;
  116. window.scrollTo(0,document.body.scrollHeight);
  117. };
  118. } else {
  119. document.getElementById("header").innerHTML = "<h2>SSE not supported by this client-protocol</h2>";
  120. }
  121. </script>`
  122. func main() {
  123. broker := NewBroker()
  124. go func() {
  125. for {
  126. time.Sleep(2 * time.Second)
  127. now := time.Now()
  128. evt := event{
  129. Timestamp: now.Unix(),
  130. Message: fmt.Sprintf("Hello at %s", now.Format(time.RFC1123)),
  131. }
  132. evtBytes, err := json.Marshal(evt)
  133. if err != nil {
  134. golog.Error(err)
  135. continue
  136. }
  137. broker.Notifier <- evtBytes
  138. }
  139. }()
  140. app := iris.New()
  141. app.Get("/", func(ctx context.Context) {
  142. ctx.HTML(
  143. `<html><head><title>SSE</title>` + script + `</head>
  144. <body>
  145. <h1 id="header">Waiting for messages...</h1>
  146. <table id="messagesTable" border="1">
  147. <tr>
  148. <th>Timestamp (server)</th>
  149. <th>Message</th>
  150. </tr>
  151. </table>
  152. </body>
  153. </html>`)
  154. })
  155. app.Get("/events", broker.ServeHTTP)
  156. // http://localhost:8080
  157. // http://localhost:8080/events
  158. app.Run(iris.Addr(":8080"), iris.WithoutServerError(iris.ErrServerClosed))
  159. }

optional.sse.mini.js.html

  1. <!-- 你可以把它放到你最喜欢的浏览器 -->
  2. <html>
  3. <head>
  4. <title>SSE(javascript side)</title>
  5. <script type="text/javascript">
  6. var client = new EventSource("http://localhost:8080/events")
  7. client.onmessage = function (evt) {
  8. console.log(evt)
  9. }
  10. </script>
  11. </head>
  12. <body>
  13. <h1>打开浏览器控制台(F12)并观察传入的事件消息</h1>
  14. </body>
  15. </html>