==>>點擊查看本系列文章目錄
首先看一下幾種注冊中心:
最老的就是Zookeeper了, 比較新的有Eureka,Consul 都可以做注冊中心,可以自行搜索對比三者的優缺點,
Zookeeper 最開始就是hadoop大家族中的一員,用于做協調的框架,后來已經是apache的子專案了,
幾年前大資料很火的時候,只要學hadoop必學zookeeper,當然還有其他成員,
大資料簡單說就是分布式,比如分布式檔案存盤hdfs,分布式資料庫hbase,分布式協調zookeeper,還有kafka,Flume等等都是hadoop大家族,
zookeeper,現在更多被用來做注冊中心,比如阿里的開源SOA框架dubbo就經常搭配zookeeper做注冊中心,
Eureka:java的微服務框架Spring Cloud中內部已經集成了Eureka注冊中心,
我選擇zookeeper,不是因為他比另外兩個強,而是因為我幾年前就已經學習過一些zookeeper的原理,上手更容易,網路上學習書籍、資料、視頻教程也特別多,學習資料完善,
注冊中心的基本功能:
1. 注冊服務,有點類似DNS,所有的服務注冊到注冊中心,包含服務的地址等資訊,
2. 服務訂閱,客戶端請求服務,注冊中心就要把那些能用的服務器地址告訴客戶端,服務端有變動時,注冊中心也能及時通知到客戶端,
3. 性能好且高可用,注冊中心自身也是一個集群,如果只有一個注冊中心機器的話那豈不是把注冊中心累死啊,而且他一旦壞了以后,那客戶端都找不到服務器了,所有注冊中心就有很多臺,其中只有一個老大(leader),老大用來寫,小弟用來讀,就是說老大來決定一臺服務器能不能注冊進來,小弟負責幫助客戶端查找服務器,因為注冊服務的次數是很少的,通常有新服務器加入才需要注冊,但是客戶端訂閱那就很多了,所以注冊中心只有一個leader,leader萬一壞掉的話,會從小弟中選舉出一個來當老大接替作業,
上面提到說zookeeper集群,就是說有很多臺機器做zookeeper機器,但是這些機器里存盤的東西基本上都是一樣的,就是說客戶端不管連到哪個zookeeper 都是一樣的,能做服務訂閱,
每一個zookeeper 中都有很多節點(Znode),
接下來說的zookeeper節點和集群完全不是一回事, 有些人喜歡吧集群中的每一臺zookeeper機器稱為一個節點,但是這個節點(zookeeper機器)和我說的節點(Znode)完全不是一回事,
如下圖:

本例的圖中可以看到,一共有5臺機器,每臺機器都有5個znode,Znode下面的子節點就更多了,
先看5臺機器:
一臺leader,老大,上文已經介紹,服務都從這些注冊寫入,
兩臺follower,小弟,平時用于服務訂閱,老大掛掉以后,follower內部就會自行選出老大,
兩臺observer,觀察者,就是屬于無業游民,只能看,沒有選老大的資格,不能參與競選也不能投票,唯一的功能就是服務訂閱,
observer模式需要手動開啟,為什么會出現observer呢,是因為機器太多的話,每個機器都有選舉權的話特別影響性能,全中國14億人口,每個人都參與國家競選的話,效率極低,所以呢,選舉的作業就交給follower完成就行了,只需要確保一直都有leader接班人就好,
再看看zookeeper有什么基本功能:
基本功能很簡單,組合以后卻可以完成各種復雜作業,
1. 可以創建:臨時節點(斷開連接時便洗掉節點) 和 持久化節點(必須手動洗掉節點),
2. 可以創建:無序節點 和 有序節點,
3. 節點上可以添加watcher監聽功能,監聽該節點的增刪改,然后觸發自定義的事件,
看看這些功能怎么用:
1. 節點: 每次注冊一個服務就創建一個節點,節點的名稱(Key)就是服務的名稱,服務的詳細資訊存盤在節點value中,客戶端通過key找到對應的節點,再找打節點中的value,
2. 臨時節點:服務端注冊一個服務時創建一個臨時節點,服務斷開時,臨時節點自動銷毀,自動完成服務注銷,
3. watcher監聽: 客戶端在注冊中心訂閱了一個服務的時候,同時在這個服務所在的節點上加一個監聽事件,每當服務節點資訊有變化的時候,注冊中心會自動回呼通知客戶端,
4. 有序臨時節點:分布式鎖或者分布式佇列(這里與服務注冊無關),客戶端1想要操作一條資料的時候,在A節點下創建一個有序臨時節點,自動分配編號001;客戶端1也要操作該資料的時候,在A節點下也創建一個有序臨時節點,自動分配編號002,只有編號最小的子節點才會被執行,因此001節點會被執行,客戶端1執行完畢后,自動洗掉001節點,此時002編號為最小子節點,即鎖的概念,不能同時操作同一資料;也可以做佇列,按照先后順序依次執行,
5. 有序臨時節點+watcher監聽: 上面第4條中說到每次執行編號最小的節點,因此需要有一個程式,每次都需要遍歷全部節點,然后找出最小的節點,假如是002節點,這時客戶端2開始執行,但是添加監聽機制以后就不一樣了,002監聽001,003監聽比他小一號的002,這樣001銷毀的同時通知002開始執行,002銷毀的時候通知003開始執行,不需要遍歷最小節點,也能有序依次執行,
6. 臨時節點+watcher監聽: 集群master選舉以及高可用,比如hadoop集群,也有一個resourcemanager資源管理器,負責調度其它節點機器,相當于hadoop集群的leader節點,這個leader就可以交由zookeeper管理,所有的hadoop機器同時在zookeeper中創建一個同名的臨時節點,由于是同名互斥的節點,因此只有一個節點能被創建,成功創建這個節點的hadoop機器就是leader,同時添加Watcher監聽,這個leader只要斷開連接,臨時節點自動銷毀,觸發監聽,其它hadoop開始新一輪的master選舉,這也是zookeeper最初在hadoop家族中的重要使命,
7....... 還要很多地方都能用zookeeper,簡直無所不能,而且自身也是高可用,高性能,牛x
zookeeper本身的操作還是很簡單的,無非就是節點的增刪改查,可以選擇要創建節點的型別,還有就是在節點上添加watcher監聽器,就這些,
檔案結構:

上代碼:
zookeeper客戶端管理類:
public class ZookeeperClientProvider { private ConfigInfo _config; private readonly ILogger<ZookeeperClientProvider> _logger; private readonly Dictionary<string, ZooKeeper> _zookeeperClients = new Dictionary<string, ZooKeeper>(); public ZookeeperClientProvider(ConfigInfo config, ILogger<ZookeeperClientProvider> logger) { _config = config; _logger = logger; } public async Task<ZooKeeper> GetZooKeeper() { return await CreateZooKeeper(_config.Addresses.FirstOrDefault()); } public async Task<ZooKeeper> CreateZooKeeper(string address) { if (!_zookeeperClients.TryGetValue(address, out ZooKeeper result)) { await Task.Run(() => { result = new ZooKeeper(address, (int)_config.SessionTimeout.TotalMilliseconds, new ReconnectionWatcher( async () => { if (_zookeeperClients.Remove(address, out ZooKeeper value)) { await value.closeAsync(); } await CreateZooKeeper(address); })); _zookeeperClients.TryAdd(address, result); }); } return result; } public async Task<IEnumerable<ZooKeeper>> GetZooKeepers() { var result = new List<ZooKeeper>(); foreach (var address in _config.Addresses) { result.Add(await CreateZooKeeper(address)); } return result; } }ZookeeperClientProvider
zookeeper服務注冊類:
/// <summary> /// 一個抽象的服務路由發現者, /// </summary> public interface IServiceRouteManager { /// <summary> /// 服務路由被創建, /// </summary> event EventHandler<ServiceRouteEventArgs> Created; /// <summary> /// 服務路由被洗掉, /// </summary> event EventHandler<ServiceRouteEventArgs> Removed; /// <summary> /// 服務路由被修改, /// </summary> event EventHandler<ServiceRouteChangedEventArgs> Changed; /// <summary> /// 獲取所有可用的服務路由資訊, /// </summary> /// <returns>服務路由集合,</returns> Task<IEnumerable<ServiceRoute>> GetRoutesAsync(); /// <summary> /// 設定服務路由, /// </summary> /// <param name="routes">服務路由集合,</param> /// <returns>一個任務,</returns> Task SetRoutesAsync(IEnumerable<ServiceRoute> routes); /// <summary> /// 移除地址串列 /// </summary> /// <param name="routes">地址串列,</param> /// <returns>一個任務,</returns> Task RemveAddressAsync(IEnumerable<string> Address); /// <summary> /// 清空所有的服務路由, /// </summary> /// <returns>一個任務,</returns> Task ClearAsync(); } /// <summary> /// 服務路由事件引數, /// </summary> public class ServiceRouteEventArgs { public ServiceRouteEventArgs(ServiceRoute route) { Route = route; } /// <summary> /// 服務路由資訊, /// </summary> public ServiceRoute Route { get; private set; } } /// <summary> /// 服務路由變更事件引數, /// </summary> public class ServiceRouteChangedEventArgs : ServiceRouteEventArgs { public ServiceRouteChangedEventArgs(ServiceRoute route, ServiceRoute oldRoute) : base(route) { OldRoute = oldRoute; } /// <summary> /// 舊的服務路由資訊, /// </summary> public ServiceRoute OldRoute { get; set; } }IServiceRouteManager
public class ZooKeeperServiceRouteManager : IServiceRouteManager, IDisposable { private readonly ConfigInfo _configInfo; private readonly ISerializer<byte[]> _serializer; private readonly ILogger<ZooKeeperServiceRouteManager> _logger; private ServiceRoute[] _routes; private readonly ZookeeperClientProvider _zookeeperClientProvider; public ZooKeeperServiceRouteManager(ConfigInfo configInfo, ISerializer<byte[]> serializer, ISerializer<string> stringSerializer, ILogger<ZooKeeperServiceRouteManager> logger, ZookeeperClientProvider zookeeperClientProvider) { _configInfo = configInfo; _serializer = serializer; _logger = logger; _zookeeperClientProvider = zookeeperClientProvider; EnterRoutes().Wait(); } private EventHandler<ServiceRouteEventArgs> _created; private EventHandler<ServiceRouteEventArgs> _removed; private EventHandler<ServiceRouteChangedEventArgs> _changed; /// <summary> /// 服務路由被創建, /// </summary> public event EventHandler<ServiceRouteEventArgs> Created { add { _created += value; } remove { _created -= value; } } /// <summary> /// 服務路由被洗掉, /// </summary> public event EventHandler<ServiceRouteEventArgs> Removed { add { _removed += value; } remove { _removed -= value; } } /// <summary> /// 服務路由被修改, /// </summary> public event EventHandler<ServiceRouteChangedEventArgs> Changed { add { _changed += value; } remove { _changed -= value; } } protected void OnCreated(params ServiceRouteEventArgs[] args) { if (_created == null) return; foreach (var arg in args) _created(this, arg); } protected void OnChanged(params ServiceRouteChangedEventArgs[] args) { if (_changed == null) return; foreach (var arg in args) _changed(this, arg); } protected void OnRemoved(params ServiceRouteEventArgs[] args) { if (_removed == null) return; foreach (var arg in args) _removed(this, arg); } /// <summary> /// 獲取所有可用的服務路由資訊, /// </summary> /// <returns>服務路由集合,</returns> public async Task<IEnumerable<ServiceRoute>> GetRoutesAsync() { await EnterRoutes(); return _routes; } /// <summary> /// 清空所有的服務路由, /// </summary> /// <returns>一個任務,</returns> public async Task ClearAsync() { if (_logger.IsEnabled(LogLevel.Information)) _logger.LogInformation("準備清空所有路由配置,"); var zooKeepers = await _zookeeperClientProvider.GetZooKeepers(); foreach (var zooKeeper in zooKeepers) { var path = _configInfo.RoutePath; var childrens = path.Split(new[] { '/' }, StringSplitOptions.RemoveEmptyEntries); var index = 0; while (childrens.Count() > 1) { var nodePath = "/" + string.Join("/", childrens); if (await zooKeeper.existsAsync(nodePath) != null) { var result = await zooKeeper.getChildrenAsync(nodePath); if (result?.Children != null) { foreach (var child in result.Children) { var childPath = $"{nodePath}/{child}"; if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug($"準備洗掉:{childPath},"); await zooKeeper.deleteAsync(childPath); } } if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug($"準備洗掉:{nodePath},"); await zooKeeper.deleteAsync(nodePath); } index++; childrens = childrens.Take(childrens.Length - index).ToArray(); } if (_logger.IsEnabled(LogLevel.Information)) _logger.LogInformation("路由配置清空完成,"); } } /// <summary> /// 設定服務路由, /// </summary> /// <param name="routes">服務路由集合,</param> /// <returns>一個任務,</returns> public async Task SetRoutesAsync(IEnumerable<ServiceRoute> routes) { var hostAddr = NetUtils.GetHostAddress(); var serviceRoutes = await GetRoutes(routes.Select(p => p.serviceRouteDescriptor.Id)); if (serviceRoutes.Count() > 0) { foreach (var route in routes) { var serviceRoute = serviceRoutes.Where(p => p.serviceRouteDescriptor.Id == route.serviceRouteDescriptor.Id).FirstOrDefault(); if (serviceRoute != null) { var addresses = serviceRoute.Address.Concat( route.Address.Except(serviceRoute.Address)).ToList(); foreach (var address in route.Address) { addresses.Remove(addresses.Where(p => p.ToString() == address.ToString()).FirstOrDefault()); addresses.Add(address); } route.Address = addresses; } } } await RemoveExceptRoutesAsync(routes, hostAddr); if (_logger.IsEnabled(LogLevel.Information)) _logger.LogInformation("準備添加服務路由,"); var zooKeepers = await _zookeeperClientProvider.GetZooKeepers(); foreach (var zooKeeper in zooKeepers) { await CreateSubdirectory(zooKeeper, _configInfo.RoutePath); var path = _configInfo.RoutePath; if (!path.EndsWith("/")) path += "/"; routes = routes.ToArray(); foreach (var serviceRoute in routes) { var nodePath = $"{path}{serviceRoute.serviceRouteDescriptor.Id}"; var nodeData =https://www.cnblogs.com/hongwei918/p/ _serializer.Serialize(serviceRoute); if (await zooKeeper.existsAsync(nodePath) == null) { if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug($"節點:{nodePath}不存在將進行創建,"); await zooKeeper.createAsync(nodePath, nodeData, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } else { if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug($"將更新節點:{nodePath}的資料,"); var onlineData = https://www.cnblogs.com/hongwei918/p/(await zooKeeper.getDataAsync(nodePath)).Data; if (!DataEquals(nodeData, onlineData)) await zooKeeper.setDataAsync(nodePath, nodeData); } } if (_logger.IsEnabled(LogLevel.Information)) _logger.LogInformation("服務路由添加成功,"); } } public async Task RemveAddressAsync(IEnumerable<string> Address) { var routes = await GetRoutesAsync(); foreach (var route in routes) { route.Address = route.Address.Except(Address); } await SetRoutesAsync(routes); } private async Task RemoveExceptRoutesAsync(IEnumerable<ServiceRoute> routes, string hostAddr) { var path = _configInfo.RoutePath; if (!path.EndsWith("/")) path += "/"; routes = routes.ToArray(); var zooKeepers = await _zookeeperClientProvider.GetZooKeepers(); foreach (var zooKeeper in zooKeepers) { if (_routes != null) { var oldRouteIds = _routes.Select(i => i.serviceRouteDescriptor.Id).ToArray(); var newRouteIds = routes.Select(i => i.serviceRouteDescriptor.Id).ToArray(); var deletedRouteIds = oldRouteIds.Except(newRouteIds).ToArray(); foreach (var deletedRouteId in deletedRouteIds) { var addresses = _routes.Where(p => p.serviceRouteDescriptor.Id == deletedRouteId).Select(p => p.Address).FirstOrDefault(); if (addresses.Contains(hostAddr)) { var nodePath = $"{path}{deletedRouteId}"; await zooKeeper.deleteAsync(nodePath); } } } } } private async Task CreateSubdirectory(ZooKeeper zooKeeper, string path) { if (await zooKeeper.existsAsync(path) != null) return; if (_logger.IsEnabled(LogLevel.Information)) _logger.LogInformation($"節點{path}不存在,將進行創建,"); var childrens = path.Split(new[] { '/' }, StringSplitOptions.RemoveEmptyEntries); var nodePath = "/"; foreach (var children in childrens) { nodePath += children; if (await zooKeeper.existsAsync(nodePath) == null) { await zooKeeper.createAsync(nodePath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } nodePath += "/"; } } private async Task<ServiceRoute> GetRoute(byte[] data) { if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug($"準備轉換服務路由,配置內容:{Encoding.UTF8.GetString(data)},"); if (data =https://www.cnblogs.com/hongwei918/p/= null) return null; return await Task.Run(() => { return _serializer.Deserialize<ServiceRoute>(data); }); } private async Task<ServiceRoute> GetRoute(string path) { ServiceRoute result = null; var zooKeeper = await GetZooKeeper(); var watcher = new NodeMonitorWatcher(GetZooKeeper(), path, async (oldData, newData) => await NodeChange(oldData, newData)); if (await zooKeeper.existsAsync(path) != null) { var data = https://www.cnblogs.com/hongwei918/p/(await zooKeeper.getDataAsync(path, watcher)).Data; watcher.SetCurrentData(data); result = await GetRoute(data); } return result; } private async Task<ServiceRoute[]> GetRoutes(IEnumerable<string> childrens) { var rootPath = _configInfo.RoutePath; if (!rootPath.EndsWith("/")) rootPath += "/"; childrens = childrens.ToArray(); var routes = new List<ServiceRoute>(childrens.Count()); foreach (var children in childrens) { if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug($"準備從節點:{children}中獲取路由資訊,"); var nodePath = $"{rootPath}{children}"; var route = await GetRoute(nodePath); if (route != null) routes.Add(route); } return routes.ToArray(); } private async Task EnterRoutes() { if (_routes != null) return; var zooKeeper = await GetZooKeeper(); var watcher = new ChildrenMonitorWatcher(GetZooKeeper(), _configInfo.RoutePath, async (oldChildrens, newChildrens) => await ChildrenChange(oldChildrens, newChildrens)); if (await zooKeeper.existsAsync(_configInfo.RoutePath, watcher) != null) { var result = await zooKeeper.getChildrenAsync(_configInfo.RoutePath, watcher); var childrens = result.Children.ToArray(); watcher.SetCurrentData(childrens); _routes = await GetRoutes(childrens); } else { if (_logger.IsEnabled(LogLevel.Warning)) _logger.LogWarning($"無法獲取路由資訊,因為節點:{_configInfo.RoutePath},不存在,"); _routes = new ServiceRoute[0]; } } private static bool DataEquals(IReadOnlyList<byte> data1, IReadOnlyList<byte> data2) { if (data1.Count != data2.Count) return false; for (var i = 0; i < data1.Count; i++) { var b1 = data1[i]; var b2 = data2[i]; if (b1 != b2) return false; } return true; } public async Task NodeChange(byte[] oldData, byte[] newData) { if (DataEquals(oldData, newData)) return; var newRoute = await GetRoute(newData); //得到舊的路由, var oldRoute = _routes.FirstOrDefault(i => i.serviceRouteDescriptor.Id == newRoute.serviceRouteDescriptor.Id); lock (_routes) { //洗掉舊路由,并添加上新的路由, _routes = _routes .Where(i => i.serviceRouteDescriptor.Id != newRoute.serviceRouteDescriptor.Id) .Concat(new[] { newRoute }).ToArray(); } //觸發路由變更事件, OnChanged(new ServiceRouteChangedEventArgs(newRoute, oldRoute)); } public async Task ChildrenChange(string[] oldChildrens, string[] newChildrens) { if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug($"最新的節點資訊:{string.Join(",", newChildrens)}"); if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug($"舊的節點資訊:{string.Join(",", oldChildrens)}"); //計算出已被洗掉的節點, var deletedChildrens = oldChildrens.Except(newChildrens).ToArray(); //計算出新增的節點, var createdChildrens = newChildrens.Except(oldChildrens).ToArray(); if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug($"需要被洗掉的路由節點:{string.Join(",", deletedChildrens)}"); if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug($"需要被添加的路由節點:{string.Join(",", createdChildrens)}"); //獲取新增的路由資訊, var newRoutes = (await GetRoutes(createdChildrens)).ToArray(); var routes = _routes.ToArray(); lock (_routes) { _routes = _routes //洗掉無效的節點路由, .Where(i => !deletedChildrens.Contains(i.serviceRouteDescriptor.Id)) //連接上新的路由, .Concat(newRoutes) .ToArray(); } //需要洗掉的路由集合, var deletedRoutes = routes.Where(i => deletedChildrens.Contains(i.serviceRouteDescriptor.Id)).ToArray(); //觸發洗掉事件, OnRemoved(deletedRoutes.Select(route => new ServiceRouteEventArgs(route)).ToArray()); //觸發路由被創建事件, OnCreated(newRoutes.Select(route => new ServiceRouteEventArgs(route)).ToArray()); if (_logger.IsEnabled(LogLevel.Information)) _logger.LogInformation("路由資料更新成功,"); } /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary> public void Dispose() { } private async Task<ZooKeeper> GetZooKeeper() { return await _zookeeperClientProvider.GetZooKeeper(); } }ZooKeeperServiceRouteManager
zookeeper連接配置類:
public class ConfigInfo { /// <summary> /// 初始化會話超時為20秒的Zookeeper配置資訊, /// </summary> /// <param name="connectionString">連接字串,</param> /// <param name="routePath">路由配置路徑,</param> /// <param name="subscriberPath">訂閱者配置路徑</param> /// <param name="commandPath">服務命令配置路徑</param> /// <param name="cachePath">快取中心配置路徑</param> /// <param name="mqttRoutePath">mqtt路由配置路徑</param> /// <param name="chRoot">根節點,</param> public ConfigInfo(string connectionString, string routePath = "/services/serviceRoutes", string subscriberPath = "/services/serviceSubscribers", string commandPath = "/services/serviceCommands", string cachePath = "/services/serviceCaches", string mqttRoutePath = "/services/mqttServiceRoutes", string chRoot = null, bool reloadOnChange = false, bool enableChildrenMonitor = false) : this(connectionString, TimeSpan.FromSeconds(20), routePath, subscriberPath, commandPath, cachePath, mqttRoutePath, chRoot, reloadOnChange, enableChildrenMonitor) { } /// <summary> /// 初始化Zookeeper配置資訊, /// </summary> /// <param name="connectionString">連接字串,</param> /// <param name="routePath">路由配置路徑,</param> /// <param name="commandPath">服務命令配置路徑</param> /// <param name="subscriberPath">訂閱者配置路徑</param> /// <param name="sessionTimeout">會話超時時間,</param> /// <param name="cachePath">快取中心配置路徑</param> /// <param name="mqttRoutePath">mqtt路由配置路徑</param> /// <param name="chRoot">根節點,</param> public ConfigInfo(string connectionString, TimeSpan sessionTimeout, string routePath = "/services/serviceRoutes", string subscriberPath = "/services/serviceSubscribers", string commandPath = "/services/serviceCommands", string cachePath = "/services/serviceCaches", string mqttRoutePath = "/services/mqttServiceRoutes", string chRoot = null, bool reloadOnChange = false, bool enableChildrenMonitor = false) { CachePath = cachePath; ReloadOnChange = reloadOnChange; ChRoot = chRoot; CommandPath = commandPath; SubscriberPath = subscriberPath; ConnectionString = connectionString; RoutePath = routePath; SessionTimeout = sessionTimeout; MqttRoutePath = mqttRoutePath; EnableChildrenMonitor = enableChildrenMonitor; Addresses = connectionString?.Split(","); } public bool EnableChildrenMonitor { get; set; } public bool ReloadOnChange { get; set; } /// <summary> /// 連接字串, /// </summary> public string ConnectionString { get; set; } /// <summary> /// 命令配置路徑 /// </summary> public string CommandPath { get; set; } /// <summary> /// 路由配置路徑, /// </summary> public string RoutePath { get; set; } /// <summary> /// 訂閱者配置路徑 /// </summary> public string SubscriberPath { get; set; } /// <summary> /// 會話超時時間, /// </summary> public TimeSpan SessionTimeout { get; set; } /// <summary> /// 根節點, /// </summary> public string ChRoot { get; set; } public IEnumerable<string> Addresses { get; set; } /// <summary> /// 快取中心配置中心 /// </summary> public string CachePath { get; set; } /// <summary> /// Mqtt路由配置路徑, /// </summary> public string MqttRoutePath { get; set; } }ConfigInfo
路由和路由描述:
public class ServiceRoute { /// <summary> /// 服務可用地址, /// </summary> public IEnumerable<string> Address { get; set; } /// <summary> /// 服務描述符, /// </summary> public ServiceRouteDescriptor serviceRouteDescriptor { get; set; } #region Equality members /// <summary>Determines whether the specified object is equal to the current object.</summary> /// <returns>true if the specified object is equal to the current object; otherwise, false.</returns> /// <param name="obj">The object to compare with the current object. </param> public override bool Equals(object obj) { var model = obj as ServiceRoute; if (model == null) return false; if (obj.GetType() != GetType()) return false; if (model.serviceRouteDescriptor != serviceRouteDescriptor) return false; return model.Address.Count() == Address.Count() && model.Address.All(addressModel => Address.Contains(addressModel)); } /// <summary>Serves as the default hash function. </summary> /// <returns>A hash code for the current object.</returns> public override int GetHashCode() { return ToString().GetHashCode(); } public static bool operator ==(ServiceRoute model1, ServiceRoute model2) { return Equals(model1, model2); } public static bool operator !=(ServiceRoute model1, ServiceRoute model2) { return !Equals(model1, model2); } #endregion Equality members }ServiceRoute
/// <summary> /// 服務描述符, /// </summary> [Serializable] public class ServiceRouteDescriptor { /// <summary> /// 初始化一個新的服務描述符, /// </summary> public ServiceRouteDescriptor() { Metadatas = new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase); } /// <summary> /// 服務Id, /// </summary> public string Id { get; set; } /// <summary> /// 訪問的令牌 /// </summary> public string Token { get; set; } /// <summary> /// 路由 /// </summary> public string RoutePath { get; set; } /// <summary> /// 元資料, /// </summary> public IDictionary<string, object> Metadatas { get; set; } /// <summary> /// 獲取一個元資料, /// </summary> /// <typeparam name="T">元資料型別,</typeparam> /// <param name="name">元資料名稱,</param> /// <param name="def">如果指定名稱的元資料不存在則回傳這個引數,</param> /// <returns>元資料值,</returns> public T GetMetadata<T>(string name, T def = default(T)) { if (!Metadatas.ContainsKey(name)) return def; return (T)Metadatas[name]; } #region Equality members /// <summary>Determines whether the specified object is equal to the current object.</summary> /// <returns>true if the specified object is equal to the current object; otherwise, false.</returns> /// <param name="obj">The object to compare with the current object. </param> public override bool Equals(object obj) { var model = obj as ServiceRouteDescriptor; if (model == null) return false; if (obj.GetType() != GetType()) return false; if (model.Id != Id) return false; return model.Metadatas.Count == Metadatas.Count && model.Metadatas.All(metadata =https://www.cnblogs.com/hongwei918/p/> { object value; if (!Metadatas.TryGetValue(metadata.Key, out value)) return false; if (metadata.Value =https://www.cnblogs.com/hongwei918/p/= null && value =https://www.cnblogs.com/hongwei918/p/= null) return true; if (metadata.Value =https://www.cnblogs.com/hongwei918/p/= null || value =https://www.cnblogs.com/hongwei918/p/= null) return false; return metadata.Value.Equals(value); }); } /// <summary>Serves as the default hash function. </summary> /// <returns>A hash code for the current object.</returns> public override int GetHashCode() { return ToString().GetHashCode(); } public static bool operator ==(ServiceRouteDescriptor model1, ServiceRouteDescriptor model2) { return Equals(model1, model2); } public static bool operator !=(ServiceRouteDescriptor model1, ServiceRouteDescriptor model2) { return !Equals(model1, model2); } #endregion Equality members }ServiceRouteDescriptor
Watcher監聽器:
子節點監聽器:
internal class ChildrenMonitorWatcher : Watcher { private readonly Task<ZooKeeper> _zooKeeperCall; private readonly string _path; private readonly Action<string[], string[]> _action; private string[] _currentData = https://www.cnblogs.com/hongwei918/p/new string[0]; public ChildrenMonitorWatcher(Task<ZooKeeper> zooKeeperCall, string path, Action<string[], string[]> action) { _zooKeeperCall = zooKeeperCall; _path = path; _action = action; } public ChildrenMonitorWatcher SetCurrentData(string[] currentData) { _currentData = currentData ?? new string[0]; return this; } #region Overrides of WatcherBase public override async Task process(WatchedEvent watchedEvent) { if (watchedEvent.getState() != Event.KeeperState.SyncConnected || watchedEvent.getPath() != _path) return; var zooKeeper = await _zooKeeperCall; //Func<ChildrenMonitorWatcher> getWatcher = () => new ChildrenMonitorWatcher(_zooKeeperCall, path, _action); Task<ChildrenMonitorWatcher> getWatcher = Task.Run(() => {return new ChildrenMonitorWatcher(_zooKeeperCall, _path, _action); }); switch (watchedEvent.get_Type()) { //創建之后開始監視下面的子節點情況, case Event.EventType.NodeCreated: await zooKeeper.getChildrenAsync(_path, await getWatcher); break; //子節點修改則繼續監控子節點資訊并通知客戶端資料變更, case Event.EventType.NodeChildrenChanged: try { var watcher = await getWatcher; var result = await zooKeeper.getChildrenAsync(_path, watcher); var childrens = result.Children.ToArray(); _action(_currentData, childrens); watcher.SetCurrentData(childrens); } catch (KeeperException.NoNodeException) { _action(_currentData, new string[0]); } break; //洗掉之后開始監控自身節點,并通知客戶端資料被清空, case Event.EventType.NodeDeleted: { var watcher = await getWatcher; await zooKeeper.existsAsync(_path, watcher); _action(_currentData, new string[0]); watcher.SetCurrentData(new string[0]); } break; } } #endregion Overrides of WatcherBase }ChildrenMonitorWatcher
當前節點監聽器:
internal class NodeMonitorWatcher : Watcher { private readonly Task<ZooKeeper> _zooKeeperCall; private readonly string _path; private readonly Action<byte[], byte[]> _action; private byte[] _currentData; public NodeMonitorWatcher(Task<ZooKeeper> zooKeeperCall, string path, Action<byte[], byte[]> action) { _zooKeeperCall = zooKeeperCall; _path = path; _action = action; } public NodeMonitorWatcher SetCurrentData(byte[] currentData) { _currentData = currentData; return this; } #region Overrides of WatcherBase public override async Task process(WatchedEvent watchedEvent) { switch (watchedEvent.get_Type()) { case Event.EventType.NodeDataChanged: var zooKeeper = await _zooKeeperCall; var watcher = new NodeMonitorWatcher(_zooKeeperCall, _path, _action); var data = https://www.cnblogs.com/hongwei918/p/await zooKeeper.getDataAsync(_path, watcher); var newData =https://www.cnblogs.com/hongwei918/p/ data.Data; _action(_currentData, newData); watcher.SetCurrentData(newData); break; } } #endregion Overrides of WatcherBase }NodeMonitorWatcher
連接斷開監聽器:
internal class ReconnectionWatcher : Watcher { private readonly Action _reconnection; public ReconnectionWatcher(Action reconnection) { _reconnection = reconnection; } #region Overrides of Watcher /// <summary>Processes the specified event.</summary> /// <param name="watchedEvent">The event.</param> /// <returns></returns> public override async Task process(WatchedEvent watchedEvent) { var state = watchedEvent.getState(); switch (state) { case Event.KeeperState.Expired: case Event.KeeperState.Disconnected: { _reconnection(); break; } } await Task.CompletedTask; } #endregion Overrides of Watcher }ReconnectionWatcher
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/110610.html
標籤:.NET Core
