SuperSocket 2.0從入門到懵逼
- SuperSocket 2.0從入門到懵逼
- 1 使用SuperSocket 2.0在AspNetCore專案中搭建一個Socket服務器
- 1.1 引入SuperSocket 2.0
- 1.2 在AspNetCore中搭建一個Socket服務器
- 2 基本的協議概念
- 2.1 基本協議種類
- 2.1.1 固定頭格式協議
- 2.1.2 固定頭尾標識協議
- 2.1.3 固定包大小協議
- 2.1.4 命令列協議
- 2.1.5 一些其它協議
- PS: 關于協議的一些硬體廠商的私有協議比較奇葩, 他們的協議五花八門的...不過我們這里不做闡述, 有時間我會再講
- 2.1 基本協議種類
- 3 SuperSocket中的幾個基本概念
- 3.1 Package Type
- 3.2 PipelineFilter Type
- 3.3 使用PackageType和PipelineFilter Type創建SuperSocket
- 4 SuperSocket中的PipelineFilter, 實作自己的PipelineFilter
- 4.1 內置的PipelineFilter模板
- 4.2 基于內置模板實作PipelineFilter
- 4.2.1 FixedHeaderPipelineFilter-頭部格式固定并且包含內容長度的協議
- 4.2.3 另一種掛載決議器的方式
- 7 擴展AppSession和SuperSocketService
- 7.1 擴展AppSession
- 7.2 如何自己實作SuperSocketService?
- 8 擴展SuperSocket的功能
- 8.1 多協議切換
- 9 搭建WebSocket服務器
- 9.1 番外: WebSocket傳參
- 10 多服務器以及不同服務間的協同
- More 1 協議的編解碼器開發預覽
- More 2 DotNetty
- 1 使用SuperSocket 2.0在AspNetCore專案中搭建一個Socket服務器
附帶實作, 講解, 與部分源代碼解讀
1 使用SuperSocket 2.0在AspNetCore專案中搭建一個Socket服務器
1.1 引入SuperSocket 2.0
SuperSocket 2.0目前仍然處于Beta階段, 但是功能基本可靠, 可以用于生產環境.
可以從Github源地址自行fork代碼進行其它修改, 如需直接參考, 在默認的Nuget服務器是沒有的, 需要添加作者自己的Nuget服務器https://www.myget.org/F/supersocket/api/v3/index.json, 獲取預覽版本.
1.2 在AspNetCore中搭建一個Socket服務器
使用SuperSocket 2.0快速搭建一個Socket服務器非常簡單, 在框架中, 作者實作了一個SuperSocketHostBuilder進行構建, 我們要做的只是簡單的配置一些引數和業務邏輯.
此外, SuperSocketHostBuilder是可以直接嵌入到AspNetCore專案的CreateHostBuilder上的, 但是并不推薦這么做...
- 配置資料包和過濾器/選擇器
- 資料包, 即我們進行Socket通信時的資料包, 可以是包物件, 也可以是
byte[], 如果是包物件, 則需要在過濾器中配置解碼器. - 過濾器, 作用是對資料包進行篩選然后截取一包資料, 可以將解碼器掛載進來, 直接將二進制資料映射為物件.
- 選擇器, 我們可以使用選擇器, 來實作一個埠服務于多種協議的情況, 此時, 一個選擇器則會搭配多個過濾器進行使用.
- 資料包, 即我們進行Socket通信時的資料包, 可以是包物件, 也可以是
- 配置IP和埠
- 配置IP和埠可以選擇兩種方式, 一種方式是從程式寫入, 另一種是從組態檔中寫入.
- 在本系列的介紹中, 我們大多數采用程式寫入的方式, 從組態檔寫入的方式, 后續會采用其它方式實作, 來擴展業務.
- 配置Session
-
在程式中作者內置了
IAppSession和AppSession供我們使用, 如果我們需要自定義Session, 則需要繼承AppSession并且在程式中進行參考, 即可切換為我們定義的Session. -
自定義Session時, 由于在程式中大多數提供的引數都為
IAppSession, 所以, 需要實作SuperSocket的更多其它介面進行重寫, 來維持程式運轉. -
自定義Session大多數時候是為了添加更多自定義屬性, 作者在設計中提供了另外的方式提供我們選擇:
// AppSession原始碼 public class AppSession : IAppSession, ILogger, ILoggerAccessor { //...... private Dictionary<object, object> _items; public object this[object name] { get { var items = _items; if (items == null) return null; object value; if (items.TryGetValue(name, out value)) return value; return null; } set { lock (this) { var items = _items; if (items == null) items = _items = new Dictionary<object, object>(); items[name] = value; } } } //...... }可以看到, 其中內置了一個字典, 我們可以將屬性的
Key-Value值直接存在字典中, 即session["prop"] = value;的形式.
-
- 配置SessionHandler
- SessionHandler會在建立和斷開Socket連接時觸發, 用于處理連接和斷開時的業務, 可以在
SuperSocketHostBuilder中直接配置, 也可以通過重寫相應的介面實作. - 連接時, 會將創建好的Session傳入該方法.
- 斷開時, 會將斷開的Session以及觸發斷開的事件傳入該方法.
- SessionHandler會在建立和斷開Socket連接時觸發, 用于處理連接和斷開時的業務, 可以在
- 配置PackageHandler
- PackageHandler會在接收到資料包時觸發.
- 該方法自動觸發時我們將獲取兩個引數, 一個是Session, 一個是Package. 但是要注意的是, 這里的Session是
IAppSession型別, 并不是我們自定義的Session.- Session即為當前Socket連接, 里面附帶了各種連接資訊以及狀態等.
- Package是通過過濾器后得到的資料包, 不過要注意的是, 例如:如果資料包為頭尾識別符號的資料包, 如果采用的是
byte[]的形式, 得到的可能不是原包, 而是去除了包頭尾標識后的資料體. 即:7E 01 02 03 7E, 去掉頭尾的7E標識得到的是01 02 03.
- Build & Run
- 構建這里同時提供了幾種方式, 推薦采用
BuildAsServer(), 然后通過StartAsync()進行啟用.
- 構建這里同時提供了幾種方式, 推薦采用
此時, 一個Socket服務器就搭建完成了. 具體實作:
// SampleSession
public class SampleSession: AppSession
{
}
// Socket Server代碼
var tcpHost = SuperSocketHostBuilder.Create<byte[], PipelineFilter>()
.ConfigureSuperSocket(options =>
{
options.AddListener(new ListenOptions
{
Ip = "Any",
Port = 4040
})
.AddListener(new ListenOptions()
{
Ip = "Any",
Port = 8888
});
})
.UseSession<JT808TcpSession>()
.UseClearIdleSession()
.UseSessionHandler(s =>
{
})
.UsePackageHandler(async (s, p) =>
{
//解包/應答/轉發
})
.ConfigureErrorHandler((s, v) =>
{
})
.UseMiddleware<InProcSessionContainerMiddleware>()
.UseInProcSessionContainer()
.BuildAsServer();
await tcpHost.RunAsync();
.UseClearIdleSession()請務必呼叫, 在使用各類Socket框架時, 不可避免的我們的應用程式都會維持大量的僵尸連接, SuperSocket中提供了UseClearIdleSession()來自動復用已經閑置或者失去連接的資源.
2 基本的協議概念
2.1 基本協議種類
2.1.1 固定頭格式協議
顧名思義, 這類協議的Header是固定的, 并且一般Header的長度是固定的, 但是也有例外情況, 此外, Header中也會包含資料體的長度等資訊. 然后可以根據長度來截取一包資料.
2.1.2 固定頭尾標識協議
這類協議的資料包, 前幾位元組和后幾位元組是固定的, 這樣就可以通過頭尾的標識來截取一包資料.
通常, 這類協議的資料包, 為了避免資料內容和協議頭、尾的標識沖突, 通常會設定轉義, 即將資料包中出現頭尾標識的地方, 轉義為其它資料, 避免識別時出現錯誤.
2.1.3 固定包大小協議
這類協議每一包資料的大小都是固定的, 所以可以直接根據長度進行讀取.
2.1.4 命令列協議
這類協議通常以\r\n結尾, 采用字串轉為二進制流進行傳輸.
2.1.5 一些其它協議
PS: 關于協議的一些硬體廠商的私有協議比較奇葩, 他們的協議五花八門的...不過我們這里不做闡述, 有時間我會再講
3 SuperSocket中的幾個基本概念
3.1 Package Type
Package Type即包型別, 這里描述的是資料包的結構, 例如SuperSocket中就提供了一些基礎的包型別TextPackageInfo等.
public class TextPackageInfo
{
public string Text{get; set;}
}
這里的TextPackageInfo標識了這型別別的資料包中, 僅包含了一個字串, 當然, 我們通常會有更復雜的網路資料包結構.例如, 我將在下列展示一個包含首尾標識的通信包, 它包含了首尾標識, 訊息號, 終端Id, 以及訊息體:
public class SamplePackage
{
public byte Begin{get; set;}
public MessageId MessageId{get; set;}
public string TerminalId{get; set;}
public SampleBody Body{get; set;}
public byte End{get; set;}
}
當然, 在SuperSocket中也提供了一些介面供我們實作一些類似格式的包, 不過個人不太喜歡這種方式, 官方檔案也舉了一些例子, 例如,有的包會有一個特殊的欄位來代表此包內容的型別. 我們將此欄位命名為 "Key". 此欄位也告訴我們用何種邏輯處理此型別的包. 這是在網路應用程式中非常常見的一種設計. 例如,你的 Key 欄位是整數型別,你的包型別需要實作介面IKeyedPackageInfo:
public class MyPackage : IKeyedPackageInfo<int>
{
public int Key { get; set; }
public short Sequence { get; set; }
public string Body { get; set; }
}
3.2 PipelineFilter Type
這種型別在網路協議決議中作用重要. 它定義了如何將 IO 資料流解碼成可以被應用程式理解的資料包. 換句話說, 就是把你的二進制流資料, 能夠一包一包的識別出來, 同時可以決議成你構建的Package物件. 當然, 你也可以選擇不構建, 然后將源資料直接回傳.
這些是 PipelineFilter 的基本介面. 你的系統中至少需要一個實作這個介面的 PipelineFilter 型別.
public interface IPipelineFilter
{
void Reset();
object Context { get; set; }
}
public interface IPipelineFilter<TPackageInfo> : IPipelineFilter
where TPackageInfo : class
{
IPackageDecoder<TPackageInfo> Decoder { get; set; }
TPackageInfo Filter(ref SequenceReader<byte> reader);
IPipelineFilter<TPackageInfo> NextFilter { get; }
}
事實上,由于 SuperSocket 已經提供了一些內置的 PipelineFilter 模版,這些幾乎可以覆寫 90% 的場景的模版極大的簡化了你的開發作業. 所以你不需要完全從頭開始實作 PipelineFilter. 即使這些內置的模版無法滿足你的需求,完全自己實作PipelineFilter也不是難事.
3.3 使用PackageType和PipelineFilter Type創建SuperSocket
你定義好 Package 型別和 PipelineFilter 型別之后,你就可以使用 SuperSocketHostBuilder 創建 SuperSocket 宿主了,
var host = SuperSocketHostBuilder.Create<StringPackageInfo, CommandLinePipelineFilter>();
在某些情況下,你可能需要實作介面 IPipelineFilterFactory 來完全控制 PipelineFilter 的創建,
public class MyFilterFactory : PipelineFilterFactoryBase<TextPackageInfo>
{
protected override IPipelineFilter<TPackageInfo> CreateCore(object client)
{
return new FixedSizePipelineFilter(10);
}
}
然后在 SuperSocket 宿主被創建出來之后啟用這個 PipelineFilterFactory:
var host = SuperSocketHostBuilder.Create<StringPackageInfo>();
host.UsePipelineFilterFactory<MyFilterFactory>();
4 SuperSocket中的PipelineFilter, 實作自己的PipelineFilter
4.1 內置的PipelineFilter模板
SuperSocket中內置了一些PipelineFilter模板, 這些模板幾乎可以覆寫到90%的應用場景, 極大簡化了開發作業, 所以不需要完全從頭開始實作PipelineFilter. 即使這些內置的模板無法滿足你的需求, 完全自己實作PipelineFilter.
SuperSocket提供了這些PipelineFilter模板:
- TerminatorPipelineFilter (SuperSocket.ProtoBase.TerminatorPipelineFilter, SuperSocket.ProtoBase)
- TerminatorTextPipelineFilter (SuperSocket.ProtoBase.TerminatorTextPipelineFilter, SuperSocket.ProtoBase)
- LinePipelineFilter (SuperSocket.ProtoBase.LinePipelineFilter, SuperSocket.ProtoBase)
- CommandLinePipelineFilter (SuperSocket.ProtoBase.CommandLinePipelineFilter, SuperSocket.ProtoBase)
- BeginEndMarkPipelineFilter (SuperSocket.ProtoBase.BeginEndMarkPipelineFilter, SuperSocket.ProtoBase)
- FixedSizePipelineFilter (SuperSocket.ProtoBase.FixedSizePipelineFilter, SuperSocket.ProtoBase)
- FixedHeaderPipelineFilter (SuperSocket.ProtoBase.FixedHeaderPipelineFilter, SuperSocket.ProtoBase)
4.2 基于內置模板實作PipelineFilter
4.2.1 FixedHeaderPipelineFilter-頭部格式固定并且包含內容長度的協議
這種協議講請求定義為兩大部分, 第一部分定義了包含第二部分長度等等基礎資訊, 我們通常稱第一部分為頭部.
例如, 我們有一個這樣的協議: 頭部包含 3 個位元組, 第 1 個位元組用于存盤請求的型別, 后兩個位元組用于代表請求體的長度:
/// +-------+---+-------------------------------+
/// |request| l | |
/// | type | e | request body |
/// | (1) | n | |
/// | |(2)| |
/// +-------+---+-------------------------------+
根據此協議的規范, 我們可以使用如下代碼定義包的型別:
public class MyPackage
{
public byte Key { get; set; }
public string Body { get; set; }
}
下一個是設計PipelineFilter:
public class MyPipelineFilter : FixedHeaderPipelineFilter<MyPackage>
{
public MyPipelineFilter()
: base(3) // 包頭的大小是3位元組,所以將3傳如基類的構造方法中去
{
}
// 從資料包的頭部回傳包體的大小
protected override int GetBodyLengthFromHeader(ref ReadOnlySequence<byte> buffer)
{
var reader = new SequenceReader<byte>(buffer);
reader.Advance(1); // skip the first byte
reader.TryReadBigEndian(out short len);
return len;
}
// 將資料包決議成 MyPackage 的實體
protected override MyPackage DecodePackage(ref ReadOnlySequence<byte> buffer)
{
var package = new MyPackage();
var reader = new SequenceReader<byte>(buffer);
reader.TryRead(out byte packageKey);
package.Key = packageKey;
reader.Advance(2); // skip the length
package.Body = reader.ReadString();
return package;
}
}
最后,你可通過資料包的型別和 PipelineFilter 的型別來創建宿主:
var host = SuperSocketHostBuilder.Create<MyPackage, MyPipelineFilter>()
.UsePackageHandler(async (s, p) =>
{
// handle your package over here
}).Build();
你也可以通過將決議包的代碼從 PipelineFilter 移到 你的包解碼器中來獲得更大的靈活性:
public class MyPackageDecoder : IPackageDecoder<MyPackage>
{
public MyPackage Decode(ref ReadOnlySequence<byte> buffer, object context)
{
var package = new MyPackage();
var reader = new SequenceReader<byte>(buffer);
reader.TryRead(out byte packageKey);
package.Key = packageKey;
reader.Advance(2); // skip the length
package.Body = reader.ReadString();
return package;
}
}
通過 host builder 的 UsePackageDecoder 方法來在SuperSocket中啟用它:
builder.UsePackageDecoder<MyPackageDecoder>();
4.2.3 另一種掛載決議器的方式
在Asp.Net Core Application我們可以new(), 直接注入或者采用工廠模式等方式, 向Host中注入協議決議器, 然后在過濾波器中進行使用.
public class MyPipelineFilter : FixedHeaderPipelineFilter<MyPackage>
{
public readonly PacketConvert _packageConvert;
public MyPipelineFilter()
: base(3) // 包頭的大小是3位元組,所以將3傳如基類的構造方法中去
{
_packageConvert = new PackageConvert();
}
// 從資料包的頭部回傳包體的大小
protected override int GetBodyLengthFromHeader(ref ReadOnlySequence<byte> buffer)
{
var reader = new SequenceReader<byte>(buffer);
reader.Advance(1); // skip the first byte
reader.TryReadBigEndian(out short len);
return len;
}
// 將資料包決議成 MyPackage 的實體
protected override MyPackage DecodePackage(ref ReadOnlySequence<byte> buffer)
{
var package = _packageConvert.Deserialize<Package>(buffer);
return package;
}
}
PS: 過濾器中DecodePackage回傳的buffer可能不是完整的包, 例如固定頭尾結構的包中, 回傳的buffer可能是去掉頭尾的格式
例如固定頭尾的包
0x7E 0x7E xxxxxxx 0x7E 0x7E, 回傳的buffer中頭尾的0x7E 0x7E會被去除, 只留下中間xxxxxxx的部分,所以在實作解碼器部分的時候需要注意.
7 擴展AppSession和SuperSocketService
7.1 擴展AppSession
在SuperSocket關于Socket的管理提供了SessionContainer供大家獲取程式中的Session實體, 只需在構建中呼叫.UseMiddleware<InProcSessionContainerMiddleware>()和UseInProcSessionContainer()即可通過AppSession.Server.SessionContainer()獲取.
但是為了方便管理, 個人角色還是實作一個另外的SessionManager比較好, 這樣可以更方便的集成到我們的Asp.Net Core Application中. 使用ConcurrentDictionary原子字典來存盤, 可以避免一些讀寫上的死鎖問題.
public class SessionManager<TSession> where TSession : IAppSession
{
/// <summary>
/// 存盤的Session
/// </summary>
public ConcurrentDictionary<string, TSession> Sessions { get; private set; } = new();
/// <summary>
/// Session的數量
/// </summary>
public int Count => Sessions.Count;
/// <summary>
/// </summary>
public SessionManager()
{
}
public ConcurrentDictionary<string, TSession> GetAllSessions()
{
return Sessions;
}
/// <summary>
/// 獲取一個Session
/// </summary>
/// <param name="key"> </param>
/// <returns> </returns>
public virtual async Task<TSession> TryGet(string key)
{
return await Task.Run(() =>
{
Sessions.TryGetValue(key, out var session);
return session;
});
}
/// <summary>
/// 添加或者更新一個Session
/// </summary>
/// <param name="key"> </param>
/// <param name="session"> </param>
/// <returns> </returns>
public virtual async Task TryAddOrUpdate(string key, TSession session)
{
await Task.Run(() =>
{
if (Sessions.TryGetValue(key, out var oldSession))
{
Sessions.TryUpdate(key, session, oldSession);
}
else
{
Sessions.TryAdd(key, session);
}
});
}
/// <summary>
/// 移除一個Session
/// </summary>
/// <param name="key"> </param>
/// <returns> </returns>
public virtual async Task TryRemove(string key)
{
await Task.Run(() =>
{
if (Sessions.TryRemove(key, out var session))
{
}
else
{
}
});
}
/// <summary>
/// 通過Session移除Session
/// </summary>
/// <param name="sessionId"> </param>
/// <returns> </returns>
public virtual async Task TryRemoveBySessionId(string sessionId)
{
await Task.Run(() =>
{
foreach (var session in Sessions)
{
if (session.Value.SessionID == sessionId)
{
Sessions.TryRemove(session);
return;
}
}
});
}
/// <summary>
/// 洗掉僵尸鏈接
/// </summary>
/// <returns> </returns>
[Obsolete("該方法丟棄", true)]
public virtual async Task TryRemoveZombieSessions()
{
await Task.Run(() =>
{
});
}
/// <summary>
/// 移除所有Session
/// </summary>
/// <returns> </returns>
public virtual async Task TryRemoveAll()
{
await Task.Run(() =>
{
Sessions.Clear();
});
}
/// <summary>
/// </summary>
/// <param name="session"> </param>
/// <param name="buffer"> </param>
/// <returns> </returns>
public virtual async Task SendAsync(TSession session, ReadOnlyMemory<byte> buffer)
{
if (session == null)
{
throw new ArgumentNullException(nameof(session));
}
await session.SendAsync(buffer);
}
/// <summary>
/// </summary>
/// <param name="session"> </param>
/// <param name="message"> </param>
/// <returns> </returns>
public virtual async Task SendAsync(ClientSession session, string message)
{
if (session == null)
{
throw new ArgumentNullException(nameof(session));
}
// ReSharper disable once PossibleNullReferenceException
await session.SendAsync(message);
}
/// <summary>
/// </summary>
/// <param name="session"> </param>
/// <returns> </returns>
public virtual async Task<Guid> FindIdBySession(TSession session)
{
return await Task.Run(() =>
{
return Guid.Parse(Sessions.First(x => x.Value.SessionID.Equals(session.SessionID)).Key);
});
}
}
7.2 如何自己實作SuperSocketService?
我們在使用SuperSocket時需要在Program.cs中來構建, 這樣會導致一個問題, 這樣我們的SuperSocket服務就會變得難以控制, 那么有沒有一種寫法來將這部分代碼抽離出來呢?
答案是有的, 我們可以采用.Net Core中的BackgroundService或者IHostedService來實作后臺服務, 甚至將這些服務管理起來, 根據需要隨時創建, 隨時啟動, 隨時停止. 這樣做的好處還有, 我們可以隨時獲取依賴注入的服務來做一些更多的操作, 例如讀取配置, 管理Session, 配置編解碼器, 日志, 應答器, MQ等等.
public class TcpSocketServerHostedService : IHostedService
{
private readonly IOptions<ServerOption> _serverOptions;
private readonly IOptions<KafkaOption> _kafkaOptions;
private readonly ClientSessionManagers _clientSessionManager;
private readonly TerminalSessionManager _gpsTrackerSessionManager;
private readonly ILogger<TcpSocketServerHostedService> _logger;
private readonly IGeneralRepository _generalRepository;
private readonly NbazhGpsSerializer _nbazhGpsSerializer = new NbazhGpsSerializer();
private static EV26MsgIdProducer _provider = null;
/// <summary>
/// Tcp Server服務
/// </summary>
/// <param name="serverOptions"> </param>
/// <param name="kafkaOptions"> </param>
/// <param name="clientSessionManager"> </param>
/// <param name="gpsTrackerSessionManager"> </param>
/// <param name="logger"> </param>
/// <param name="factory"> </param>
public TcpSocketServerHostedService(
IOptions<ServerOption> serverOptions,
IOptions<KafkaOption> kafkaOptions,
ClientSessionManagers clientSessionManager,
TerminalSessionManager gpsTrackerSessionManager,
ILogger<TcpSocketServerHostedService> logger,
IServiceScopeFactory factory)
{
_serverOptions = serverOptions ?? throw new ArgumentNullException(nameof(serverOptions));
_kafkaOptions = kafkaOptions;
_clientSessionManager = clientSessionManager ?? throw new ArgumentNullException(nameof(clientSessionManager));
_gpsTrackerSessionManager = gpsTrackerSessionManager ?? throw new ArgumentNullException(nameof(gpsTrackerSessionManager));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_generalRepository = factory.CreateScope().ServiceProvider.GetRequiredService<IGeneralRepository>();
}
/// <summary>
/// </summary>
/// <param name="cancellationToken"> </param>
/// <returns> </returns>
public async Task StartAsync(CancellationToken cancellationToken)
{
var host = SuperSocketHostBuilder.Create<NbazhGpsPackage, ProtocolPipelineSwitcher>()
.ConfigureSuperSocket(opts =>
{
foreach (var listener in _serverOptions.Value.TcpListeners)
{
opts.AddListener(new ListenOptions()
{
Ip = listener.Ip,
Port = listener.Port
});
}
})
.UseSession<GpsTrackerSession>()
.UseClearIdleSession()
.UseSessionHandler(onClosed: async (s, v) =>
{
try
{
// Session管理
await _gpsTrackerSessionManager.TryRemoveBySessionId(s.SessionID);
}
catch
{
// ignored
}
})
.UsePackageHandler(async (s, packet) =>
{
// 處理包
})
.UseInProcSessionContainer()
.BuildAsServer();
await host.StartAsync();
await Task.CompletedTask;
}
/// <summary>
/// </summary>
/// <param name="cancellationToken"> </param>
/// <returns> </returns>
public async Task StopAsync(CancellationToken cancellationToken)
{
try
{
await _gpsTrackerSessionManager.TryRemoveAll();
}
catch
{
// ignored
}
await Task.CompletedTask;
}
}
實作服務后, 我們可以寫擴展方法將服務注入.
/// <summary>
/// 服務器擴展
/// </summary>
public static class ServerBuilderExtensions
{
/// <summary>
/// 添加Tcp服務器
/// </summary>
/// <param name="services"> </param>
/// <returns> </returns>
public static IServiceCollection AddTcpServer(
this IServiceCollection services)
{
services.AddSingleton<TerminalSessionManager>();
services.AddHostedService<TcpSocketServerHostedService>();
return services;
}
/// <summary>
/// 添加Ws服務器
/// </summary>
/// <param name="services"> </param>
/// <returns> </returns>
public static IServiceCollection AddWsServer(
this IServiceCollection services)
{
services.AddSingleton<ClientSessionManagers>();
services.AddHostedService<WebSocketServerHostedService>();
return services;
}
}
8 擴展SuperSocket的功能
8.1 多協議切換
我們有時候會面對一種需求, 就是同一個介面, 需要接收不同的終端的協議包, 這樣我們通常會根據協議的不同點來區分協議. PS: 協議最好是同型別協議, 并且有明顯不同的特征!
具體的實作方式就是實作一個特殊的PipelineFilter, 在下列代碼中, 我們將讀取該包資料的第一個位元組來分辨該協議為0x78 0x78型別開頭的協議還是0x79 0x79開頭的協議, 然后將標記移回改包開頭, 然后將這一包資料交給對應的過濾器來進行決議:
// NbazhGpsPackage: 包編解碼器
public class ProtocolPipelineSwitcher : PipelineFilterBase<NbazhGpsPackage>
{
private IPipelineFilter<NbazhGpsPackage> _filter7878;
private byte _beginMarkA = 0x78;
private IPipelineFilter<NbazhGpsPackage> _filter7979;
private byte _beginMarkB = 0x79;
public ProtocolPipelineSwitcher()
{
_filter7878 = new EV26PipelineFilter7878(this);
_filter7979 = new EV26PipelineFilter7979(this);
}
public override NbazhGpsPackage Filter(ref SequenceReader<byte> reader)
{
if (!reader.TryRead(out byte flag))
{
throw new ProtocolException(@"flag byte cannot be read");
}
if (flag == _beginMarkA)
{
NextFilter = _filter7878;
}
else if (flag == _beginMarkB)
{
NextFilter = _filter7979;
}
else
{
return null;
//throw new ProtocolException($"首位元組未知 {flag}");
}
// 將標記移回開頭
reader.Rewind(1);
return null;
}
}
9 搭建WebSocket服務器
WebSocket Server的實作方式與之前的Socket Server實作方式大致相同, 其中不同的地方主要為: WebSocket Server不需要配置編解碼器, 采用String作為訊息格式等.
/// <summary>
/// </summary>
public class WebSocketServerHostedService : IHostedService
{
private readonly IOptions<ServerOption> _serverOptions;
private readonly ClientSessionManagers _clientSessionManager;
private readonly TerminalSessionManager _gpsTrackerSessionManager;
private readonly IGeneralRepository _generalRepository;
/// <summary>
/// </summary>
/// <param name="serverOptions"> </param>
/// <param name="clientSessionManager"> </param>
/// <param name="gpsTrackerSessionManager"> </param>
/// <param name="factory"> </param>
public WebSocketServerHostedService(
IOptions<ServerOption> serverOptions,
ClientSessionManagers clientSessionManager,
TerminalSessionManager gpsTrackerSessionManager,
IServiceScopeFactory factory)
{
_serverOptions = serverOptions ?? throw new ArgumentNullException(nameof(_serverOptions));
_clientSessionManager = clientSessionManager ?? throw new ArgumentNullException(nameof(clientSessionManager));
_gpsTrackerSessionManager = gpsTrackerSessionManager ?? throw new ArgumentNullException(nameof(gpsTrackerSessionManager));
_generalRepository = factory.CreateScope().ServiceProvider.GetRequiredService<IGeneralRepository>();
}
/// <summary>
/// WebSocketServer
/// </summary>
/// <param name="cancellationToken"> </param>
/// <returns> </returns>
public async Task StartAsync(CancellationToken cancellationToken)
{
var host = WebSocketHostBuilder.Create()
.ConfigureSuperSocket(opts =>
{
foreach (var listener in _serverOptions.Value.WsListeners)
{
opts.AddListener(new ListenOptions()
{
Ip = listener.Ip,
Port = listener.Port
});
}
})
.UseSession<ClientSession>()
.UseClearIdleSession()
.UseSessionHandler(onClosed: async (s, v) =>
{
await _clientSessionManager.TryRemoveBySessionId(s.SessionID);
})
.UseWebSocketMessageHandler(async (s, p) =>
{
var package = p.Message.ToObject<ClientPackage>();
if (package.PackageType == PackageType.Heart)
{
return;
}
if (package.PackageType == PackageType.Login)
{
var client = _generalRepository.FindAsync<User>(x => x.Id.Equals(Guid.Parse(package.ClientId)));
if (client is null)
{
await s.CloseAsync(CloseReason.ProtocolError, "ClientId不存在");
}
var verifyCode = Guid.NewGuid().ToString();
var loginPacket = new ClientPackage()
{
PackageType = PackageType.Login,
ClientId = package.ClientId,
VerifyCode = verifyCode,
};
s["VerifyCode"] = verifyCode;
var msg = loginPacket.ToJson();
await s.SendAsync(msg);
}
// 追蹤
if (package.PackageType == PackageType.Trace)
{
return;
}
})
.UseInProcSessionContainer()
.BuildAsServer();
await host.StartAsync();
await Task.CompletedTask;
}
/// <summary>
/// </summary>
/// <param name="cancellationToken"> </param>
/// <returns> </returns>
public async Task StopAsync(CancellationToken cancellationToken)
{
await Task.CompletedTask;
}
}
9.1 番外: WebSocket傳參
WebSocket的第一次請求是基于Http進行建立鏈接的, 所以WebSocket是可以在url中或者請求體中攜帶token等引數的. 當然后端并不像寫Api時那么簡單的就可以獲取, 需要截取當前請求的Url或者攜帶的資訊, 然后進行讀取, 進而進行驗證等操作. 這部分的代碼之后再補充...
//.Net Core從Url中讀取引數
10 多服務器以及不同服務間的協同
得益于我們實作的SessionManager, 我們將不用Server的SessionManger注入DI后, 我們可以在任意Server的Service中做到跨Service進行訊息傳遞, 驗證等等操作.
More 1 協議的編解碼器開發預覽
協議編解碼器樣例:
EV26 Gps通信協議(使用方法在xUnit測驗中):
- Github
- Gitee
簡單樣例:
public class Nbazh0X01Test
{
private readonly ITestOutputHelper _testOutputHelper;
private NbazhGpsSerializer NbazhGpsSerializer = new NbazhGpsSerializer();
public Nbazh0X01Test(ITestOutputHelper testOutputHelper)
{
_testOutputHelper = testOutputHelper;
}
[Fact]
public void Test1()
{
//78 78 11 01 07 52 53 36 78 90 02 42 70 00 32 01 00 05 12 79 0D 0A
var hex = "7878 11 01 07 52 53 36 78 90 02 42 7000 3201 0005 1279 0D0A".ToHexBytes();
// ----協議決議部分----//
var packet = NbazhGpsSerializer.Deserialize(hex);
Nbazh0X01 body = (Nbazh0X01)packet.Bodies;
// ----協議決議部分----//
Assert.Equal(0x11, packet.Header.Length);
Assert.Equal(0x01, packet.Header.MsgId);
Assert.Equal("7 52 53 36 78 90 02 42".Replace(" ", ""), body.TerminalId);
Assert.Equal(0x7000, body.TerminalType);
//Assert.Equal(0x3201, body.TimeZoneLanguage.Serialize());
Assert.Equal(0x0005, packet.Header.MsgNum);
Assert.Equal(0x1279, packet.Header.Crc);
// 時區 0011 001000000001
}
}
More 2 DotNetty
以后我們會探究DotNetty與SuperSocket的異同.
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/431926.html
標籤:.NET Core
