阅读量:2
HTTP SSE (Server-Sent Events)流式服务器推送技术
SSE(Server-Sent Events)是一种基于HTTP协议的服务器推送技术,用于实现服务器向客户端实时发送事件的功能。在SSE中,服务器可以发送不同类型的消息给客户端。
以下是SSE可能出现的消息类型:
事件消息(Event Message):服务器发送的一般事件消息,可以包含自定义的事件类型和数据。
注释消息(Comment Message):服务器发送的注释消息,用于向客户端传递一些额外的信息,但不会触发任何事件处理。
重连消息(Reconnect Message):服务器发送的重连消息,用于告知客户端重新连接服务器。
下面是一个完整的HTTP请求返回结果示例,展示了SSE的使用:
HTTP/1.1 200 OK Content-Type: text/event-stream Cache-Control: no-cache Connection: keep-alive id: msgid event: myEvent data: {"message": "Hello, SSE!"} id: msgid event: myEvent data: {"message": "Another event"} : This is a comment id: msgid event: reconnect retry: 5000
在这个示例中,服务器返回的HTTP响应头中指定了Content-Type为text/event-stream,表示这是一个SSE的响应。Cache-Control设置为no-cache,确保每次请求都能从服务器获取最新的数据。Connection设置为keep-alive,保持长连接。
接下来,服务器通过多个事件消息和注释消息向客户端发送数据。每个消息以event字段指定事件类型,以data字段包含消息的数据。在示例中,服务器发送了两个事件消息,分别是myEvent类型的消息,包含不同的数据。还发送了一个注释消息,以冒号开头,用于传递额外的信息。
最后,服务器发送了一个重连消息,以event字段指定为reconnect类型,同时通过retry字段指定了重连的时间间隔为5000毫秒。
客户端可以通过监听SSE事件,实时接收服务器发送的消息,并进行相应的处理。
下面是一个使用C#解析SSE示例,展示了SSE的解析过程:
namespace System.Net { using System; using System.Collections.Generic; using System.IO; using System.Text; /// <summary> /// 解析SSE 。 /// </summary> public class EventStreamHandler { /// <summary> /// 新的换行符。 /// </summary> private readonly string newLine; /// <summary> /// 消息体与消息体之间的空白行的数量。 /// </summary> private const int emptySplitLineCount = 1; /// <summary> /// 最后1个消息体与流结束符的空白行的数量。 /// </summary> private readonly static int emptySplitLineCountEnd = 1; private string idSource; private string eventSource; private string dataSource; // 是不是可能有未知项(消息体与消息体之间的未知项)。 private bool possibleUnknownItem; /// <summary> /// 当前行。 /// </summary> private string line; /// <summary> /// 是不是处理了当前行 <see cref="line"/>。 /// </summary> private bool isHandledLine; /// <summary> /// 消息体和消息体中间的未知行。 /// </summary> private readonly List<string> unknownLines = new List<string>(16); /// <summary> /// 消息体列表。 /// </summary> private readonly List<EventStreamEventArgs> addedItems = new List<EventStreamEventArgs>(64); public EventStreamHandler(string newLine) { this.newLine = newLine; } /// <summary> /// 消息体列表。 /// </summary> public List<EventStreamEventArgs> AddedItems { get { return addedItems; } } protected string DataSource { get { return dataSource; } } protected bool IsHandledLine { get { return isHandledLine; } } /// <summary> /// 测试代码。 /// </summary> [System.Diagnostics.Conditional("DEBUG")] public static void Test() { var builder = new StringBuilder(64); builder.AppendLine("id: msgid"); builder.AppendLine("event: myEvent"); builder.AppendLine("data: Hello, SSE!"); builder.AppendLine(); builder.AppendLine("id: msgid"); builder.AppendLine("event: myEvent"); builder.AppendLine("data: Another event"); builder.AppendLine(); builder.AppendLine(": This is a comment"); builder.AppendLine(); builder.AppendLine("id: msgid"); builder.AppendLine("event: reconnect"); builder.AppendLine("retry: 5000"); builder.AppendLine(); builder.AppendLine("id: msgid end"); builder.AppendLine("event: myEvent end"); builder.AppendLine("data: Another event end"); builder.AppendLine(); var bytes = Encoding.UTF8.GetBytes(builder.ToString()); var stream = new MemoryStream(bytes); var reader = new StreamReader(stream, Encoding.UTF8); var handler = new EventStreamHandler("\r\n"); handler.HandleStreamReader(reader); var messageItems = handler.AddedItems; System.Diagnostics.Trace.WriteLine($"Message items count {messageItems.Count}"); } /// <summary> /// 解析SSE 。 /// </summary> /// <param name="streamReader"></param> public void HandleStreamReader(StreamReader streamReader) { while (!streamReader.EndOfStream) { line = streamReader.ReadLine(); isHandledLine = false; if (!string.IsNullOrEmpty(line)) { if (line.StartsWith(EventStreamEventArgs.IdBegin)) { HandleForIdBegin(); } else if (line.StartsWith(EventStreamEventArgs.EventBegin)) { HandleForEventBegin(); } else if (line.StartsWith(EventStreamEventArgs.DataBegin)) { HandleForDataBegin(); } else if (line.StartsWith(EventStreamEventArgs.RetryBegin)) { HandleForRetryBegin(); } } if (!isHandledLine) { unknownLines.Add(line); } } HandleUnknownItemWhenEnd(); } #region 解析各种消息。 private void HandleForIdBegin() { // 当前行是不是用来传输 id: 信息的行。 var isIdLine = HandleCheckIdLine(); if (isIdLine) { idSource = line; isHandledLine = true; } } private bool HandleCheckIdLine() { // 当前行是不是用来传输 id: 信息的行。 bool isIdLine = false; // 如果是第1次遇到 idSource 。移除前面遇到的未知行。 if (addedItems.Count < 1) { ClearIfNotEmpty(unknownLines); isIdLine = true; } // 如果遇到了,上1个消息体结束的特征。 else if (MatchEmptySplitLine(unknownLines, emptySplitLineCount)) { HandleUnknownItemWhenMiddle(); isIdLine = true; } return isIdLine; } private void HandleForEventBegin() { ClearIfNotEmpty(unknownLines); eventSource = line; isHandledLine = true; } private void HandleForDataBegin() { ClearIfNotEmpty(unknownLines); dataSource = line; isHandledLine = true; AddItem(addedItems, idSource, eventSource, dataSource); possibleUnknownItem = true; } private void HandleForRetryBegin() { ClearIfNotEmpty(unknownLines); isHandledLine = true; possibleUnknownItem = true; } private void HandleUnknownItemWhenMiddle() { if (possibleUnknownItem) { // 如果遇到消息体与消息体之间,插入了,额外的未知内容时。 AddUnknownItem(addedItems, emptySplitLineCount, unknownLines); possibleUnknownItem = false; ClearIfNotEmpty(unknownLines); } } private void HandleUnknownItemWhenEnd() { if (possibleUnknownItem) { var emptySplitLineCountEndNew = (emptySplitLineCountEnd == 0) ? 0 : Math.Min(emptySplitLineCountEnd, GetEndEmptyLineCount(unknownLines)); AddUnknownItem(addedItems, emptySplitLineCountEndNew, unknownLines); possibleUnknownItem = false; ClearIfNotEmpty(unknownLines); } } #endregion #region 帮助函数。 private static void ClearIfNotEmpty(List<string> unknownLines) { if (unknownLines.Count > 0) { unknownLines.Clear(); } } /// <summary> /// 是不是匹配了,消息体与消息体之间的分割特征。 /// </summary> /// <param name="lines"></param> /// <param name="emptySplitLineCount"></param> /// <returns></returns> private static bool MatchEmptySplitLine(List<string> lines, int emptySplitLineCount) { bool isMatch = lines.Count >= emptySplitLineCount; if (isMatch && emptySplitLineCount > 0) { int minIndex = lines.Count - emptySplitLineCount; for (int index = lines.Count - 1; index >= minIndex; index--) { if (!string.IsNullOrEmpty(lines[index])) { isMatch = false; break; } } } return isMatch; } /// <summary> /// 末尾连续的空白行数。 /// </summary> /// <param name="lines"></param> /// <returns></returns> private static int GetEndEmptyLineCount(List<string> lines) { int count = 0; for (int index = lines.Count - 1; index >= 0 && string.IsNullOrEmpty(lines[index]); index--) { count++; } return count; } private string GetUnknownValue(int splitLineCount, List<string> unknownLines) { string followingValue = string.Empty; if (unknownLines.Count > splitLineCount) { var addLine = unknownLines.Count - splitLineCount; StringBuilder builder = new StringBuilder(64); for (int index = 0; index < addLine; index++) { var unknownLine = unknownLines[index]; if (!string.IsNullOrEmpty(unknownLine)) { builder.Append(unknownLine); } builder.Append(newLine); followingValue = builder.ToString(); } } return followingValue; } private void AddUnknownItem(List<EventStreamEventArgs> addedItems, string unknownValue) { var item = new EventStreamEventArgs(unknownValue); AddItem(addedItems, item); } private void AddUnknownItem(List<EventStreamEventArgs> addedItems, int splitLineCount, List<string> unknownLines) { var unknownValue = GetUnknownValue(splitLineCount, unknownLines); if (!string.IsNullOrEmpty(unknownValue)) { AddUnknownItem(addedItems, unknownValue); } } private void AddItem(List<EventStreamEventArgs> addedItems, string idSource, string eventSource, string dataSource) { var item = new EventStreamEventArgs(idSource, eventSource, dataSource); AddItem(addedItems, item); } private void AddItem(List<EventStreamEventArgs> addedItems, EventStreamEventArgs item) { addedItems.Add(item); } #endregion } /// <summary> /// 一个消息体。 /// </summary> public class EventStreamEventArgs : EventArgs { internal const string IdBegin = "id: "; internal const string EventBegin = "event: "; internal const string DataBegin = "data: "; /// <summary> /// 重连消息(Reconnect Message) /// 服务器发送的重连消息,用于告知客户端重新连接服务器。 /// 示例: /// event: reconnect /// retry: 5000 /// </summary> internal const string RetryBegin = "retry: "; [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] private readonly string idSource; [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] private readonly string eventSource; [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] private readonly string dataSource; [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] private string idValue; [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] private string eventValue; [System.Diagnostics.DebuggerBrowsable(System.Diagnostics.DebuggerBrowsableState.Never)] private string dataValue; public EventStreamEventArgs(string idSource, string eventSource, string dataSource) { this.idSource = idSource; this.eventSource = eventSource; this.dataSource = dataSource; } public EventStreamEventArgs(string unknownValue) { this.UnknownValue = unknownValue; } public string IdValue { get { if (idValue == null) idValue = GetValue(idSource, IdBegin); return idValue; } } public string EventValue { get { if (eventValue == null) eventValue = GetValue(eventSource, EventBegin); return eventValue; } } public string DataValue { get { if (dataValue == null) dataValue = GetValue(dataSource, DataBegin); return dataValue; } } /// <summary> /// 上1个 <see cref="EventStreamEventArgs"/> 中的 <see cref="DataValue"/> 后面跟随的未知内容。 /// </summary> public string UnknownValue { get; private set; } private string GetValue(string source, string begin) { string value; if (source != null && source.Length > begin.Length) { value = source.Substring(begin.Length, source.Length - begin.Length); } else { value = string.Empty; } return value; } } }