主頁 > .NET開發 > (九)分布式服務----Zookeeper注冊中心

(九)分布式服務----Zookeeper注冊中心

2020-09-23 08:57:03 .NET開發

 ==>>點擊查看本系列文章目錄

 

首先看一下幾種注冊中心:

最老的就是Zookeeper了, 比較新的有Eureka,Consul 都可以做注冊中心,可以自行搜索對比三者的優缺點,

Zookeeper 最開始就是hadoop大家族中的一員,用于做協調的框架,后來已經是apache的子專案了,

幾年前大資料很火的時候,只要學hadoop必學zookeeper,當然還有其他成員,

大資料簡單說就是分布式,比如分布式檔案存盤hdfs,分布式資料庫hbase,分布式協調zookeeper,還有kafka,Flume等等都是hadoop大家族,

zookeeper,現在更多被用來做注冊中心,比如阿里的開源SOA框架dubbo就經常搭配zookeeper做注冊中心,

Eureka:java的微服務框架Spring Cloud中內部已經集成了Eureka注冊中心,

我選擇zookeeper,不是因為他比另外兩個強,而是因為我幾年前就已經學習過一些zookeeper的原理,上手更容易,網路上學習書籍、資料、視頻教程也特別多,學習資料完善,

 

注冊中心的基本功能:

1. 注冊服務,有點類似DNS,所有的服務注冊到注冊中心,包含服務的地址等資訊,

2. 服務訂閱,客戶端請求服務,注冊中心就要把那些能用的服務器地址告訴客戶端,服務端有變動時,注冊中心也能及時通知到客戶端,

3. 性能好且高可用,注冊中心自身也是一個集群,如果只有一個注冊中心機器的話那豈不是把注冊中心累死啊,而且他一旦壞了以后,那客戶端都找不到服務器了,所有注冊中心就有很多臺,其中只有一個老大(leader),老大用來寫,小弟用來讀,就是說老大來決定一臺服務器能不能注冊進來,小弟負責幫助客戶端查找服務器,因為注冊服務的次數是很少的,通常有新服務器加入才需要注冊,但是客戶端訂閱那就很多了,所以注冊中心只有一個leader,leader萬一壞掉的話,會從小弟中選舉出一個來當老大接替作業,

 

上面提到說zookeeper集群,就是說有很多臺機器做zookeeper機器,但是這些機器里存盤的東西基本上都是一樣的,就是說客戶端不管連到哪個zookeeper 都是一樣的,能做服務訂閱,

每一個zookeeper 中都有很多節點(Znode),

接下來說的zookeeper節點和集群完全不是一回事, 有些人喜歡吧集群中的每一臺zookeeper機器稱為一個節點,但是這個節點(zookeeper機器)和我說的節點(Znode)完全不是一回事,

如下圖:

 

 本例的圖中可以看到,一共有5臺機器,每臺機器都有5個znode,Znode下面的子節點就更多了,

先看5臺機器:

一臺leader,老大,上文已經介紹,服務都從這些注冊寫入,

兩臺follower,小弟,平時用于服務訂閱,老大掛掉以后,follower內部就會自行選出老大,

兩臺observer,觀察者,就是屬于無業游民,只能看,沒有選老大的資格,不能參與競選也不能投票,唯一的功能就是服務訂閱,

  observer模式需要手動開啟,為什么會出現observer呢,是因為機器太多的話,每個機器都有選舉權的話特別影響性能,全中國14億人口,每個人都參與國家競選的話,效率極低,所以呢,選舉的作業就交給follower完成就行了,只需要確保一直都有leader接班人就好,

 

再看看zookeeper有什么基本功能:

基本功能很簡單,組合以后卻可以完成各種復雜作業,

1. 可以創建:臨時節點(斷開連接時便洗掉節點) 和 持久化節點(必須手動洗掉節點),

2. 可以創建:無序節點 和 有序節點,

3. 節點上可以添加watcher監聽功能,監聽該節點的增刪改,然后觸發自定義的事件,

 

看看這些功能怎么用:

1. 節點: 每次注冊一個服務就創建一個節點,節點的名稱(Key)就是服務的名稱,服務的詳細資訊存盤在節點value中,客戶端通過key找到對應的節點,再找打節點中的value,

2. 臨時節點:服務端注冊一個服務時創建一個臨時節點,服務斷開時,臨時節點自動銷毀,自動完成服務注銷,

3. watcher監聽: 客戶端在注冊中心訂閱了一個服務的時候,同時在這個服務所在的節點上加一個監聽事件,每當服務節點資訊有變化的時候,注冊中心會自動回呼通知客戶端,

4. 有序臨時節點:分布式鎖或者分布式佇列(這里與服務注冊無關),客戶端1想要操作一條資料的時候,在A節點下創建一個有序臨時節點,自動分配編號001;客戶端1也要操作該資料的時候,在A節點下也創建一個有序臨時節點,自動分配編號002,只有編號最小的子節點才會被執行,因此001節點會被執行,客戶端1執行完畢后,自動洗掉001節點,此時002編號為最小子節點,即鎖的概念,不能同時操作同一資料;也可以做佇列,按照先后順序依次執行,

5. 有序臨時節點+watcher監聽: 上面第4條中說到每次執行編號最小的節點,因此需要有一個程式,每次都需要遍歷全部節點,然后找出最小的節點,假如是002節點,這時客戶端2開始執行,但是添加監聽機制以后就不一樣了,002監聽001,003監聽比他小一號的002,這樣001銷毀的同時通知002開始執行,002銷毀的時候通知003開始執行,不需要遍歷最小節點,也能有序依次執行,

6. 臨時節點+watcher監聽: 集群master選舉以及高可用,比如hadoop集群,也有一個resourcemanager資源管理器,負責調度其它節點機器,相當于hadoop集群的leader節點,這個leader就可以交由zookeeper管理,所有的hadoop機器同時在zookeeper中創建一個同名的臨時節點,由于是同名互斥的節點,因此只有一個節點能被創建,成功創建這個節點的hadoop機器就是leader,同時添加Watcher監聽,這個leader只要斷開連接,臨時節點自動銷毀,觸發監聽,其它hadoop開始新一輪的master選舉,這也是zookeeper最初在hadoop家族中的重要使命,

7....... 還要很多地方都能用zookeeper,簡直無所不能,而且自身也是高可用,高性能,牛x

 

zookeeper本身的操作還是很簡單的,無非就是節點的增刪改查,可以選擇要創建節點的型別,還有就是在節點上添加watcher監聽器,就這些,

 

檔案結構:

 

上代碼:

zookeeper客戶端管理類:

public class ZookeeperClientProvider
    {
        private ConfigInfo _config;
        private readonly ILogger<ZookeeperClientProvider> _logger;
        private readonly Dictionary<string, ZooKeeper> _zookeeperClients = new Dictionary<string, ZooKeeper>();

        public ZookeeperClientProvider(ConfigInfo config, ILogger<ZookeeperClientProvider> logger)
        {
            _config = config;
            _logger = logger;
        }

        public async Task<ZooKeeper> GetZooKeeper()
        {
            return await CreateZooKeeper(_config.Addresses.FirstOrDefault());
        }
        public async Task<ZooKeeper> CreateZooKeeper(string address)
        {
            if (!_zookeeperClients.TryGetValue(address, out ZooKeeper result))
            {
                await Task.Run(() =>
                {
                    result = new ZooKeeper(address, (int)_config.SessionTimeout.TotalMilliseconds,
                        new ReconnectionWatcher(
                            async () =>
                            {
                                if (_zookeeperClients.Remove(address, out ZooKeeper value))
                                {
                                    await value.closeAsync();
                                }
                                await CreateZooKeeper(address);
                            }));
                    _zookeeperClients.TryAdd(address, result);
                });
            }
            return result;
        }

        public async Task<IEnumerable<ZooKeeper>> GetZooKeepers()
        {
            var result = new List<ZooKeeper>();
            foreach (var address in _config.Addresses)
            {
                result.Add(await CreateZooKeeper(address));
            }
            return result;
        }
    }
ZookeeperClientProvider

zookeeper服務注冊類:

/// <summary>
    /// 一個抽象的服務路由發現者,
    /// </summary>
    public interface IServiceRouteManager
    {

        /// <summary>
        /// 服務路由被創建,
        /// </summary>
        event EventHandler<ServiceRouteEventArgs> Created;

        /// <summary>
        /// 服務路由被洗掉,
        /// </summary>
        event EventHandler<ServiceRouteEventArgs> Removed;

        /// <summary>
        /// 服務路由被修改,
        /// </summary>
        event EventHandler<ServiceRouteChangedEventArgs> Changed;

        /// <summary>
        /// 獲取所有可用的服務路由資訊,
        /// </summary>
        /// <returns>服務路由集合,</returns>
        Task<IEnumerable<ServiceRoute>> GetRoutesAsync();

        /// <summary>
        /// 設定服務路由,
        /// </summary>
        /// <param name="routes">服務路由集合,</param>
        /// <returns>一個任務,</returns>
        Task SetRoutesAsync(IEnumerable<ServiceRoute> routes);

        /// <summary>
        /// 移除地址串列
        /// </summary>
        /// <param name="routes">地址串列,</param>
        /// <returns>一個任務,</returns>
        Task RemveAddressAsync(IEnumerable<string> Address);
        /// <summary>
        /// 清空所有的服務路由,
        /// </summary>
        /// <returns>一個任務,</returns>
        Task ClearAsync();
    }

    /// <summary>
    /// 服務路由事件引數,
    /// </summary>
    public class ServiceRouteEventArgs
    {
        public ServiceRouteEventArgs(ServiceRoute route)
        {
            Route = route;
        }

        /// <summary>
        /// 服務路由資訊,
        /// </summary>
        public ServiceRoute Route { get; private set; }
    }

    /// <summary>
    /// 服務路由變更事件引數,
    /// </summary>
    public class ServiceRouteChangedEventArgs : ServiceRouteEventArgs
    {
        public ServiceRouteChangedEventArgs(ServiceRoute route, ServiceRoute oldRoute) : base(route)
        {
            OldRoute = oldRoute;
        }

        /// <summary>
        /// 舊的服務路由資訊,
        /// </summary>
        public ServiceRoute OldRoute { get; set; }
    }
IServiceRouteManager
public class ZooKeeperServiceRouteManager : IServiceRouteManager, IDisposable
    {
        private readonly ConfigInfo _configInfo;
        private readonly ISerializer<byte[]> _serializer;
        private readonly ILogger<ZooKeeperServiceRouteManager> _logger;
        private ServiceRoute[] _routes;
        private readonly ZookeeperClientProvider _zookeeperClientProvider;

        public ZooKeeperServiceRouteManager(ConfigInfo configInfo, ISerializer<byte[]> serializer,
            ISerializer<string> stringSerializer,
            ILogger<ZooKeeperServiceRouteManager> logger,
            ZookeeperClientProvider zookeeperClientProvider)
        {
            _configInfo = configInfo;
            _serializer = serializer;
            _logger = logger;
            _zookeeperClientProvider = zookeeperClientProvider;
            EnterRoutes().Wait();
        }

        private EventHandler<ServiceRouteEventArgs> _created;
        private EventHandler<ServiceRouteEventArgs> _removed;
        private EventHandler<ServiceRouteChangedEventArgs> _changed;

        /// <summary>
        /// 服務路由被創建,
        /// </summary>
        public event EventHandler<ServiceRouteEventArgs> Created
        {
            add { _created += value; }
            remove { _created -= value; }
        }

        /// <summary>
        /// 服務路由被洗掉,
        /// </summary>
        public event EventHandler<ServiceRouteEventArgs> Removed
        {
            add { _removed += value; }
            remove { _removed -= value; }
        }

        /// <summary>
        /// 服務路由被修改,
        /// </summary>
        public event EventHandler<ServiceRouteChangedEventArgs> Changed
        {
            add { _changed += value; }
            remove { _changed -= value; }
        }



        protected void OnCreated(params ServiceRouteEventArgs[] args)
        {
            if (_created == null)
                return;

            foreach (var arg in args)
                _created(this, arg);
        }

        protected void OnChanged(params ServiceRouteChangedEventArgs[] args)
        {
            if (_changed == null)
                return;

            foreach (var arg in args)
                _changed(this, arg);
        }

        protected void OnRemoved(params ServiceRouteEventArgs[] args)
        {
            if (_removed == null)
                return;

            foreach (var arg in args)
                _removed(this, arg);
        }


        /// <summary>
        /// 獲取所有可用的服務路由資訊,
        /// </summary>
        /// <returns>服務路由集合,</returns>
        public async Task<IEnumerable<ServiceRoute>> GetRoutesAsync()
        {
            await EnterRoutes();
            return _routes;
        }

        /// <summary>
        /// 清空所有的服務路由,
        /// </summary>
        /// <returns>一個任務,</returns>
        public async Task ClearAsync()
        {
            if (_logger.IsEnabled(LogLevel.Information))
                _logger.LogInformation("準備清空所有路由配置,");
            var zooKeepers = await _zookeeperClientProvider.GetZooKeepers();
            foreach (var zooKeeper in zooKeepers)
            {
                var path = _configInfo.RoutePath;
                var childrens = path.Split(new[] { '/' }, StringSplitOptions.RemoveEmptyEntries);

                var index = 0;
                while (childrens.Count() > 1)
                {
                    var nodePath = "/" + string.Join("/", childrens);

                    if (await zooKeeper.existsAsync(nodePath) != null)
                    {
                        var result = await zooKeeper.getChildrenAsync(nodePath);
                        if (result?.Children != null)
                        {
                            foreach (var child in result.Children)
                            {
                                var childPath = $"{nodePath}/{child}";
                                if (_logger.IsEnabled(LogLevel.Debug))
                                    _logger.LogDebug($"準備洗掉:{childPath},");
                                await zooKeeper.deleteAsync(childPath);
                            }
                        }
                        if (_logger.IsEnabled(LogLevel.Debug))
                            _logger.LogDebug($"準備洗掉:{nodePath},");
                        await zooKeeper.deleteAsync(nodePath);
                    }
                    index++;
                    childrens = childrens.Take(childrens.Length - index).ToArray();
                }
                if (_logger.IsEnabled(LogLevel.Information))
                    _logger.LogInformation("路由配置清空完成,");
            }
        }

        /// <summary>
        /// 設定服務路由,
        /// </summary>
        /// <param name="routes">服務路由集合,</param>
        /// <returns>一個任務,</returns>
        public async Task SetRoutesAsync(IEnumerable<ServiceRoute> routes)
        {
            var hostAddr = NetUtils.GetHostAddress();
            var serviceRoutes = await GetRoutes(routes.Select(p => p.serviceRouteDescriptor.Id));
            if (serviceRoutes.Count() > 0)
            {
                foreach (var route in routes)
                {
                    var serviceRoute = serviceRoutes.Where(p => p.serviceRouteDescriptor.Id == route.serviceRouteDescriptor.Id).FirstOrDefault();
                    if (serviceRoute != null)
                    {
                        var addresses = serviceRoute.Address.Concat(
                          route.Address.Except(serviceRoute.Address)).ToList();

                        foreach (var address in route.Address)
                        {
                            addresses.Remove(addresses.Where(p => p.ToString() == address.ToString()).FirstOrDefault());
                            addresses.Add(address);
                        }
                        route.Address = addresses;
                    }
                }
            }
            await RemoveExceptRoutesAsync(routes, hostAddr);

            if (_logger.IsEnabled(LogLevel.Information))
                _logger.LogInformation("準備添加服務路由,");
            var zooKeepers = await _zookeeperClientProvider.GetZooKeepers();
            foreach (var zooKeeper in zooKeepers)
            {
                await CreateSubdirectory(zooKeeper, _configInfo.RoutePath);

                var path = _configInfo.RoutePath;
                if (!path.EndsWith("/"))
                    path += "/";

                routes = routes.ToArray();

                foreach (var serviceRoute in routes)
                {
                    var nodePath = $"{path}{serviceRoute.serviceRouteDescriptor.Id}";
                    var nodeData =https://www.cnblogs.com/hongwei918/p/ _serializer.Serialize(serviceRoute);
                    if (await zooKeeper.existsAsync(nodePath) == null)
                    {
                        if (_logger.IsEnabled(LogLevel.Debug))
                            _logger.LogDebug($"節點:{nodePath}不存在將進行創建,");

                        await zooKeeper.createAsync(nodePath, nodeData, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    }
                    else
                    {
                        if (_logger.IsEnabled(LogLevel.Debug))
                            _logger.LogDebug($"將更新節點:{nodePath}的資料,");

                        var onlineData = https://www.cnblogs.com/hongwei918/p/(await zooKeeper.getDataAsync(nodePath)).Data;
                        if (!DataEquals(nodeData, onlineData))
                            await zooKeeper.setDataAsync(nodePath, nodeData);
                    }
                }
                if (_logger.IsEnabled(LogLevel.Information))
                    _logger.LogInformation("服務路由添加成功,");
            }
        }

        public async Task RemveAddressAsync(IEnumerable<string> Address)
        {
            var routes = await GetRoutesAsync();
            foreach (var route in routes)
            {
                route.Address = route.Address.Except(Address);
            }
            await SetRoutesAsync(routes);
        }

        private async Task RemoveExceptRoutesAsync(IEnumerable<ServiceRoute> routes, string hostAddr)
        {
            var path = _configInfo.RoutePath;
            if (!path.EndsWith("/"))
                path += "/";
            routes = routes.ToArray();
            var zooKeepers = await _zookeeperClientProvider.GetZooKeepers();
            foreach (var zooKeeper in zooKeepers)
            {
                if (_routes != null)
                {
                    var oldRouteIds = _routes.Select(i => i.serviceRouteDescriptor.Id).ToArray();
                    var newRouteIds = routes.Select(i => i.serviceRouteDescriptor.Id).ToArray();
                    var deletedRouteIds = oldRouteIds.Except(newRouteIds).ToArray();
                    foreach (var deletedRouteId in deletedRouteIds)
                    {
                        var addresses = _routes.Where(p => p.serviceRouteDescriptor.Id == deletedRouteId).Select(p => p.Address).FirstOrDefault();
                        if (addresses.Contains(hostAddr))
                        {
                            var nodePath = $"{path}{deletedRouteId}";
                            await zooKeeper.deleteAsync(nodePath);
                        }
                    }
                }
            }
        }

        private async Task CreateSubdirectory(ZooKeeper zooKeeper, string path)
        {
            if (await zooKeeper.existsAsync(path) != null)
                return;

            if (_logger.IsEnabled(LogLevel.Information))
                _logger.LogInformation($"節點{path}不存在,將進行創建,");

            var childrens = path.Split(new[] { '/' }, StringSplitOptions.RemoveEmptyEntries);
            var nodePath = "/";

            foreach (var children in childrens)
            {
                nodePath += children;
                if (await zooKeeper.existsAsync(nodePath) == null)
                {
                    await zooKeeper.createAsync(nodePath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }
                nodePath += "/";
            }
        }

        private async Task<ServiceRoute> GetRoute(byte[] data)
        {
            if (_logger.IsEnabled(LogLevel.Debug))
                _logger.LogDebug($"準備轉換服務路由,配置內容:{Encoding.UTF8.GetString(data)},");

            if (data =https://www.cnblogs.com/hongwei918/p/= null)
                return null;

            return await Task.Run(() =>
            {
                return _serializer.Deserialize<ServiceRoute>(data);
            });
        }

        private async Task<ServiceRoute> GetRoute(string path)
        {
            ServiceRoute result = null;
            var zooKeeper = await GetZooKeeper();
            var watcher = new NodeMonitorWatcher(GetZooKeeper(), path,
                 async (oldData, newData) => await NodeChange(oldData, newData));
            if (await zooKeeper.existsAsync(path) != null)
            {
                var data = https://www.cnblogs.com/hongwei918/p/(await zooKeeper.getDataAsync(path, watcher)).Data;
                watcher.SetCurrentData(data);
                result = await GetRoute(data);
            }
            return result;
        }

        private async Task<ServiceRoute[]> GetRoutes(IEnumerable<string> childrens)
        {
            var rootPath = _configInfo.RoutePath;
            if (!rootPath.EndsWith("/"))
                rootPath += "/";

            childrens = childrens.ToArray();
            var routes = new List<ServiceRoute>(childrens.Count());

            foreach (var children in childrens)
            {
                if (_logger.IsEnabled(LogLevel.Debug))
                    _logger.LogDebug($"準備從節點:{children}中獲取路由資訊,");

                var nodePath = $"{rootPath}{children}";
                var route = await GetRoute(nodePath);
                if (route != null)
                    routes.Add(route);
            }

            return routes.ToArray();
        }

        private async Task EnterRoutes()
        {
            if (_routes != null)
                return;
            var zooKeeper = await GetZooKeeper();
            var watcher = new ChildrenMonitorWatcher(GetZooKeeper(), _configInfo.RoutePath,
             async (oldChildrens, newChildrens) => await ChildrenChange(oldChildrens, newChildrens));
            if (await zooKeeper.existsAsync(_configInfo.RoutePath, watcher) != null)
            {
                var result = await zooKeeper.getChildrenAsync(_configInfo.RoutePath, watcher);
                var childrens = result.Children.ToArray();
                watcher.SetCurrentData(childrens);
                _routes = await GetRoutes(childrens);
            }
            else
            {
                if (_logger.IsEnabled(LogLevel.Warning))
                    _logger.LogWarning($"無法獲取路由資訊,因為節點:{_configInfo.RoutePath},不存在,");
                _routes = new ServiceRoute[0];
            }
        }

        private static bool DataEquals(IReadOnlyList<byte> data1, IReadOnlyList<byte> data2)
        {
            if (data1.Count != data2.Count)
                return false;
            for (var i = 0; i < data1.Count; i++)
            {
                var b1 = data1[i];
                var b2 = data2[i];
                if (b1 != b2)
                    return false;
            }
            return true;
        }

        public async Task NodeChange(byte[] oldData, byte[] newData)
        {
            if (DataEquals(oldData, newData))
                return;

            var newRoute = await GetRoute(newData);
            //得到舊的路由,
            var oldRoute = _routes.FirstOrDefault(i => i.serviceRouteDescriptor.Id == newRoute.serviceRouteDescriptor.Id);

            lock (_routes)
            {
                //洗掉舊路由,并添加上新的路由,
                _routes =
                    _routes
                        .Where(i => i.serviceRouteDescriptor.Id != newRoute.serviceRouteDescriptor.Id)
                        .Concat(new[] { newRoute }).ToArray();
            }

            //觸發路由變更事件,
            OnChanged(new ServiceRouteChangedEventArgs(newRoute, oldRoute));
        }

        public async Task ChildrenChange(string[] oldChildrens, string[] newChildrens)
        {
            if (_logger.IsEnabled(LogLevel.Debug))
                _logger.LogDebug($"最新的節點資訊:{string.Join(",", newChildrens)}");

            if (_logger.IsEnabled(LogLevel.Debug))
                _logger.LogDebug($"舊的節點資訊:{string.Join(",", oldChildrens)}");

            //計算出已被洗掉的節點,
            var deletedChildrens = oldChildrens.Except(newChildrens).ToArray();
            //計算出新增的節點,
            var createdChildrens = newChildrens.Except(oldChildrens).ToArray();

            if (_logger.IsEnabled(LogLevel.Debug))
                _logger.LogDebug($"需要被洗掉的路由節點:{string.Join(",", deletedChildrens)}");
            if (_logger.IsEnabled(LogLevel.Debug))
                _logger.LogDebug($"需要被添加的路由節點:{string.Join(",", createdChildrens)}");

            //獲取新增的路由資訊,
            var newRoutes = (await GetRoutes(createdChildrens)).ToArray();

            var routes = _routes.ToArray();
            lock (_routes)
            {
                _routes = _routes
                    //洗掉無效的節點路由,
                    .Where(i => !deletedChildrens.Contains(i.serviceRouteDescriptor.Id))
                    //連接上新的路由,
                    .Concat(newRoutes)
                    .ToArray();
            }
            //需要洗掉的路由集合,
            var deletedRoutes = routes.Where(i => deletedChildrens.Contains(i.serviceRouteDescriptor.Id)).ToArray();
            //觸發洗掉事件,
            OnRemoved(deletedRoutes.Select(route => new ServiceRouteEventArgs(route)).ToArray());

            //觸發路由被創建事件,
            OnCreated(newRoutes.Select(route => new ServiceRouteEventArgs(route)).ToArray());

            if (_logger.IsEnabled(LogLevel.Information))
                _logger.LogInformation("路由資料更新成功,");
        }


        /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
        public void Dispose()
        {
        }

        private async Task<ZooKeeper> GetZooKeeper()
        {
            return await _zookeeperClientProvider.GetZooKeeper();
        }

    }
ZooKeeperServiceRouteManager

zookeeper連接配置類:

public class ConfigInfo
    {
        /// <summary>
        /// 初始化會話超時為20秒的Zookeeper配置資訊,
        /// </summary>
        /// <param name="connectionString">連接字串,</param>
        /// <param name="routePath">路由配置路徑,</param>
        /// <param name="subscriberPath">訂閱者配置路徑</param>
        /// <param name="commandPath">服務命令配置路徑</param>
        /// <param name="cachePath">快取中心配置路徑</param>
        /// <param name="mqttRoutePath">mqtt路由配置路徑</param>
        /// <param name="chRoot">根節點,</param>
        public ConfigInfo(string connectionString, string routePath = "/services/serviceRoutes",
            string subscriberPath = "/services/serviceSubscribers",
            string commandPath = "/services/serviceCommands",
            string cachePath = "/services/serviceCaches",
            string mqttRoutePath = "/services/mqttServiceRoutes",
            string chRoot = null,
            bool reloadOnChange = false, bool enableChildrenMonitor = false) : this(connectionString,
                TimeSpan.FromSeconds(20),
                routePath,
                subscriberPath,
                commandPath,
                cachePath,
                mqttRoutePath,
                chRoot,
                reloadOnChange, enableChildrenMonitor)
        {
        }

        /// <summary>
        /// 初始化Zookeeper配置資訊,
        /// </summary>
        /// <param name="connectionString">連接字串,</param>
        /// <param name="routePath">路由配置路徑,</param>
        /// <param name="commandPath">服務命令配置路徑</param>
        /// <param name="subscriberPath">訂閱者配置路徑</param>
        /// <param name="sessionTimeout">會話超時時間,</param>
        /// <param name="cachePath">快取中心配置路徑</param>
        /// <param name="mqttRoutePath">mqtt路由配置路徑</param>
        /// <param name="chRoot">根節點,</param>
        public ConfigInfo(string connectionString, TimeSpan sessionTimeout, string routePath = "/services/serviceRoutes",
            string subscriberPath = "/services/serviceSubscribers",
            string commandPath = "/services/serviceCommands",
            string cachePath = "/services/serviceCaches",
            string mqttRoutePath = "/services/mqttServiceRoutes",
            string chRoot = null,
            bool reloadOnChange = false, bool enableChildrenMonitor = false)
        {
            CachePath = cachePath;
            ReloadOnChange = reloadOnChange;
            ChRoot = chRoot;
            CommandPath = commandPath;
            SubscriberPath = subscriberPath;
            ConnectionString = connectionString;
            RoutePath = routePath;
            SessionTimeout = sessionTimeout;
            MqttRoutePath = mqttRoutePath;
            EnableChildrenMonitor = enableChildrenMonitor;
            Addresses = connectionString?.Split(",");
        }

        public bool EnableChildrenMonitor { get; set; }

        public bool ReloadOnChange { get; set; }

        /// <summary>
        /// 連接字串,
        /// </summary>
        public string ConnectionString { get; set; }

        /// <summary>
        /// 命令配置路徑
        /// </summary>
        public string CommandPath { get; set; }

        /// <summary>
        /// 路由配置路徑,
        /// </summary>
        public string RoutePath { get; set; }

        /// <summary>
        /// 訂閱者配置路徑
        /// </summary>
        public string SubscriberPath { get; set; }

        /// <summary>
        /// 會話超時時間,
        /// </summary>
        public TimeSpan SessionTimeout { get; set; }

        /// <summary>
        /// 根節點,
        /// </summary>
        public string ChRoot { get; set; }


        public IEnumerable<string> Addresses { get; set; }

        /// <summary>
        /// 快取中心配置中心
        /// </summary>
        public string CachePath { get; set; }


        /// <summary>
        /// Mqtt路由配置路徑,
        /// </summary>
        public string MqttRoutePath { get; set; }
    }
ConfigInfo

路由和路由描述:

public class ServiceRoute
    {
        /// <summary>
        /// 服務可用地址,
        /// </summary>
        public IEnumerable<string> Address { get; set; }
        /// <summary>
        /// 服務描述符,
        /// </summary>
        public ServiceRouteDescriptor serviceRouteDescriptor { get; set; }

        #region Equality members

        /// <summary>Determines whether the specified object is equal to the current object.</summary>
        /// <returns>true if the specified object  is equal to the current object; otherwise, false.</returns>
        /// <param name="obj">The object to compare with the current object. </param>
        public override bool Equals(object obj)
        {
            var model = obj as ServiceRoute;
            if (model == null)
                return false;

            if (obj.GetType() != GetType())
                return false;

            if (model.serviceRouteDescriptor != serviceRouteDescriptor)
                return false;

            return model.Address.Count() == Address.Count() && model.Address.All(addressModel => Address.Contains(addressModel));
        }

        /// <summary>Serves as the default hash function. </summary>
        /// <returns>A hash code for the current object.</returns>
        public override int GetHashCode()
        {
            return ToString().GetHashCode();
        }

        public static bool operator ==(ServiceRoute model1, ServiceRoute model2)
        {
            return Equals(model1, model2);
        }

        public static bool operator !=(ServiceRoute model1, ServiceRoute model2)
        {
            return !Equals(model1, model2);
        }

        #endregion Equality members
    }
ServiceRoute
/// <summary>
    /// 服務描述符,
    /// </summary>
    [Serializable]
    public class ServiceRouteDescriptor
    {
        /// <summary>
        /// 初始化一個新的服務描述符,
        /// </summary>
        public ServiceRouteDescriptor()
        {
            Metadatas = new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase);
        }

        /// <summary>
        /// 服務Id,
        /// </summary>
        public string Id { get; set; }

        /// <summary>
        /// 訪問的令牌
        /// </summary>
        public string Token { get; set; }

        /// <summary>
        /// 路由
        /// </summary>
        public string RoutePath { get; set; }

        /// <summary>
        /// 元資料,
        /// </summary> 
        public IDictionary<string, object> Metadatas { get; set; }

        /// <summary>
        /// 獲取一個元資料,
        /// </summary>
        /// <typeparam name="T">元資料型別,</typeparam>
        /// <param name="name">元資料名稱,</param>
        /// <param name="def">如果指定名稱的元資料不存在則回傳這個引數,</param>
        /// <returns>元資料值,</returns>
        public T GetMetadata<T>(string name, T def = default(T))
        {
            if (!Metadatas.ContainsKey(name))
                return def;

            return (T)Metadatas[name];
        }

        #region Equality members

        /// <summary>Determines whether the specified object is equal to the current object.</summary>
        /// <returns>true if the specified object  is equal to the current object; otherwise, false.</returns>
        /// <param name="obj">The object to compare with the current object. </param>
        public override bool Equals(object obj)
        {
            var model = obj as ServiceRouteDescriptor;
            if (model == null)
                return false;

            if (obj.GetType() != GetType())
                return false;

            if (model.Id != Id)
                return false;

            return model.Metadatas.Count == Metadatas.Count && model.Metadatas.All(metadata =https://www.cnblogs.com/hongwei918/p/>
            {
                object value;
                if (!Metadatas.TryGetValue(metadata.Key, out value))
                    return false;

                if (metadata.Value =https://www.cnblogs.com/hongwei918/p/= null && value =https://www.cnblogs.com/hongwei918/p/= null)
                    return true;
                if (metadata.Value =https://www.cnblogs.com/hongwei918/p/= null || value =https://www.cnblogs.com/hongwei918/p/= null)
                    return false;

                return metadata.Value.Equals(value);
            });
        }

        /// <summary>Serves as the default hash function. </summary>
        /// <returns>A hash code for the current object.</returns>
        public override int GetHashCode()
        {
            return ToString().GetHashCode();
        }

        public static bool operator ==(ServiceRouteDescriptor model1, ServiceRouteDescriptor model2)
        {
            return Equals(model1, model2);
        }

        public static bool operator !=(ServiceRouteDescriptor model1, ServiceRouteDescriptor model2)
        {
            return !Equals(model1, model2);
        }

        #endregion Equality members
    }
ServiceRouteDescriptor

Watcher監聽器:

子節點監聽器:

internal class ChildrenMonitorWatcher : Watcher
    {
        private readonly Task<ZooKeeper> _zooKeeperCall;
        private readonly string _path;
        private readonly Action<string[], string[]> _action;
        private string[] _currentData = https://www.cnblogs.com/hongwei918/p/new string[0];

        public ChildrenMonitorWatcher(Task<ZooKeeper> zooKeeperCall, string path, Action<string[], string[]> action)
        {
            _zooKeeperCall = zooKeeperCall;
            _path = path;
            _action = action;
        }

        public ChildrenMonitorWatcher SetCurrentData(string[] currentData)
        {
            _currentData = currentData ?? new string[0];

            return this;
        }

        #region Overrides of WatcherBase

        public override async Task process(WatchedEvent watchedEvent)
        {
            if (watchedEvent.getState() != Event.KeeperState.SyncConnected || watchedEvent.getPath() != _path)
                return;
            var zooKeeper = await _zooKeeperCall;
            //Func<ChildrenMonitorWatcher> getWatcher = () => new ChildrenMonitorWatcher(_zooKeeperCall, path, _action);
            Task<ChildrenMonitorWatcher> getWatcher =  Task.Run(() => {return new ChildrenMonitorWatcher(_zooKeeperCall, _path, _action); });
            switch (watchedEvent.get_Type())
            {
                //創建之后開始監視下面的子節點情況,
                case Event.EventType.NodeCreated:
                    await zooKeeper.getChildrenAsync(_path, await getWatcher);
                    break;

                //子節點修改則繼續監控子節點資訊并通知客戶端資料變更,
                case Event.EventType.NodeChildrenChanged:
                    try
                    {
                        var watcher = await getWatcher;
                        var result = await zooKeeper.getChildrenAsync(_path, watcher);
                        var childrens = result.Children.ToArray();
                        _action(_currentData, childrens);
                        watcher.SetCurrentData(childrens);
                    }
                    catch (KeeperException.NoNodeException)
                    {
                        _action(_currentData, new string[0]);
                    }
                    break;

                //洗掉之后開始監控自身節點,并通知客戶端資料被清空,
                case Event.EventType.NodeDeleted:
                    {
                        var watcher = await getWatcher;
                        await zooKeeper.existsAsync(_path, watcher);
                        _action(_currentData, new string[0]);
                        watcher.SetCurrentData(new string[0]);
                    }
                    break;
            }
        }
        #endregion Overrides of WatcherBase
    }
ChildrenMonitorWatcher

當前節點監聽器:

internal class NodeMonitorWatcher : Watcher
    {
        private readonly Task<ZooKeeper> _zooKeeperCall;
        private readonly string _path;
        private readonly Action<byte[], byte[]> _action;
        private byte[] _currentData;

        public NodeMonitorWatcher(Task<ZooKeeper> zooKeeperCall, string path, Action<byte[], byte[]> action)
        {
            _zooKeeperCall = zooKeeperCall;
            _path = path;
            _action = action;
        }

        public NodeMonitorWatcher SetCurrentData(byte[] currentData)
        {
            _currentData = currentData;

            return this;
        }

        #region Overrides of WatcherBase

        public override async Task process(WatchedEvent watchedEvent)
        {
            switch (watchedEvent.get_Type())
            {
                case Event.EventType.NodeDataChanged:
                    var zooKeeper = await _zooKeeperCall;
                    var watcher = new NodeMonitorWatcher(_zooKeeperCall, _path, _action);
                    var data = https://www.cnblogs.com/hongwei918/p/await zooKeeper.getDataAsync(_path, watcher);
                    var newData =https://www.cnblogs.com/hongwei918/p/ data.Data;
                    _action(_currentData, newData);
                    watcher.SetCurrentData(newData);
                    break;
            }
        }

        #endregion Overrides of WatcherBase
    }
NodeMonitorWatcher

連接斷開監聽器:

internal class ReconnectionWatcher : Watcher
    {
        private readonly Action _reconnection;

        public ReconnectionWatcher(Action reconnection)
        {
            _reconnection = reconnection;
        }

        #region Overrides of Watcher

        /// <summary>Processes the specified event.</summary>
        /// <param name="watchedEvent">The event.</param>
        /// <returns></returns>
        public override async Task process(WatchedEvent watchedEvent)
        {
            var state = watchedEvent.getState();
            switch (state)
            {
                case Event.KeeperState.Expired:
                case Event.KeeperState.Disconnected:
                    {
                        _reconnection();
                        break;
                    }
            }
            await Task.CompletedTask;
        }

        #endregion Overrides of Watcher
    }
ReconnectionWatcher

 

轉載請註明出處,本文鏈接:https://www.uj5u.com/net/110610.html

標籤:.NET Core

上一篇:最近優化個人博客,一下調整了很多問題

下一篇:.net core 3.0 Signalr - 實作一個業務推送系統

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • WebAPI簡介

    Web體系結構: 有三個核心:資源(resource),URL(統一資源識別符號)和表示 他們的關系是這樣的:一個資源由一個URL進行標識,HTTP客戶端使用URL定位資源,表示是從資源回傳資料,媒體型別是資源回傳的資料格式。 接下來我們說下HTTP. HTTP協議的系統是一種無狀態的方式,使用請求/ ......

    uj5u.com 2020-09-09 22:07:47 more
  • asp.net core 3.1 入口:Program.cs中的Main函式

    本文分析Program.cs 中Main()函式中代碼的運行順序分析asp.net core程式的啟動,重點不是剖析原始碼,而是理清程式開始時執行的順序。到呼叫了哪些實體,哪些法方。asp.net core 3.1 的程式入口在專案Program.cs檔案里,如下。ususing System; us ......

    uj5u.com 2020-09-09 22:07:49 more
  • asp.net網站作為websocket服務端的應用該如何寫

    最近被websocket的一個問題困擾了很久,有一個需求是在web網站中搭建websocket服務。客戶端通過網頁與服務器建立連接,然后服務器根據ip給客戶端網頁發送資訊。 其實,這個需求并不難,只是剛開始對websocket的內容不太了解。上網搜索了一下,有通過asp.net core 實作的、有 ......

    uj5u.com 2020-09-09 22:08:02 more
  • ASP.NET 開源匯入匯出庫Magicodes.IE Docker中使用

    Magicodes.IE在Docker中使用 更新歷史 2019.02.13 【Nuget】版本更新到2.0.2 【匯入】修復單列匯入的Bug,單元測驗“OneColumnImporter_Test”。問題見(https://github.com/dotnetcore/Magicodes.IE/is ......

    uj5u.com 2020-09-09 22:08:05 more
  • 在webform中使用ajax

    如果你用過Asp.net webform, 說明你也算是.NET 開發的老兵了。WEBform應該是2011 2013左右,當時還用visual studio 2005、 visual studio 2008。后來基本都用的是MVC。 如果是新開發的專案,估計沒人會用webform技術。但是有些舊版 ......

    uj5u.com 2020-09-09 22:08:50 more
  • iis添加asp.net網站,訪問提示:由于擴展配置問題而無法提供您請求的

    今天在iis服務器配置asp.net網站,遇到一個問題,記錄一下: 問題:由于擴展配置問題而無法提供您請求的頁面。如果該頁面是腳本,請添加處理程式。如果應下載檔案,請添加 MIME 映射。 WindowServer2012服務器,添加角色安裝完.netframework和iis之后,運行aspx頁面 ......

    uj5u.com 2020-09-09 22:10:00 more
  • WebAPI-處理架構

    帶著問題去思考,大家好! 問題1:HTTP請求和回傳相應的HTTP回應資訊之間發生了什么? 1:首先是最底層,托管層,位于WebAPI和底層HTTP堆疊之間 2:其次是 訊息處理程式管道層,這里比如日志和快取。OWIN的參考是將訊息處理程式管道的一些功能下移到堆疊下端的OWIN中間件了。 3:控制器處理 ......

    uj5u.com 2020-09-09 22:11:13 more
  • 微信門戶開發框架-使用指導說明書

    微信門戶應用管理系統,采用基于 MVC + Bootstrap + Ajax + Enterprise Library的技術路線,界面層采用Boostrap + Metronic組合的前端框架,資料訪問層支持Oracle、SQLServer、MySQL、PostgreSQL等資料庫。框架以MVC5,... ......

    uj5u.com 2020-09-09 22:15:18 more
  • WebAPI-HTTP編程模型

    帶著問題去思考,大家好!它是什么?它包含什么?它能干什么? 訊息 HTTP編程模型的核心就是訊息抽象,表示為:HttPRequestMessage,HttpResponseMessage.用于客戶端和服務端之間交換請求和回應訊息。 HttpMethod類包含了一組靜態屬性: private stat ......

    uj5u.com 2020-09-09 22:15:23 more
  • 部署WebApi隨筆

    一、跨域 NuGet參考Microsoft.AspNet.WebApi.Cors WebApiConfig.cs中配置: // Web API 配置和服務 config.EnableCors(new EnableCorsAttribute("*", "*", "*")); 二、清除默認回傳XML格式 ......

    uj5u.com 2020-09-09 22:15:48 more
最新发布
  • C#多執行緒學習(二) 如何操縱一個執行緒

    <a href="https://www.cnblogs.com/x-zhi/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/2943582/20220801082530.png" alt="" /></...

    uj5u.com 2023-04-19 09:17:20 more
  • C#多執行緒學習(二) 如何操縱一個執行緒

    C#多執行緒學習(二) 如何操縱一個執行緒 執行緒學習第一篇:C#多執行緒學習(一) 多執行緒的相關概念 下面我們就動手來創建一個執行緒,使用Thread類創建執行緒時,只需提供執行緒入口即可。(執行緒入口使程式知道該讓這個執行緒干什么事) 在C#中,執行緒入口是通過ThreadStart代理(delegate)來提供的 ......

    uj5u.com 2023-04-19 09:16:49 more
  • 記一次 .NET某醫療器械清洗系統 卡死分析

    <a href="https://www.cnblogs.com/huangxincheng/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/214741/20200614104537.png" alt="" /&g...

    uj5u.com 2023-04-18 08:39:04 more
  • 記一次 .NET某醫療器械清洗系統 卡死分析

    一:背景 1. 講故事 前段時間協助訓練營里的一位朋友分析了一個程式卡死的問題,回過頭來看這個案例比較經典,這篇稍微整理一下供后來者少踩坑吧。 二:WinDbg 分析 1. 為什么會卡死 因為是表單程式,理所當然就是看主執行緒此時正在做什么? 可以用 ~0s ; k 看一下便知。 0:000> k # ......

    uj5u.com 2023-04-18 08:33:10 more
  • SignalR, No Connection with that ID,IIS

    <a href="https://www.cnblogs.com/smartstar/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/u36196.jpg" alt="" /></a>...

    uj5u.com 2023-03-30 17:21:52 more
  • 一次對pool的誤用導致的.net頻繁gc的診斷分析

    <a href="https://www.cnblogs.com/dotnet-diagnostic/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/3115652/20230225090434.png" alt=""...

    uj5u.com 2023-03-28 10:15:33 more
  • 一次對pool的誤用導致的.net頻繁gc的診斷分析

    <a href="https://www.cnblogs.com/dotnet-diagnostic/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/3115652/20230225090434.png" alt=""...

    uj5u.com 2023-03-28 10:13:31 more
  • C#遍歷指定檔案夾中所有檔案的3種方法

    <a href="https://www.cnblogs.com/xbhp/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/957602/20230310105611.png" alt="" /></a&...

    uj5u.com 2023-03-27 14:46:55 more
  • C#/VB.NET:如何將PDF轉為PDF/A

    <a href="https://www.cnblogs.com/Carina-baby/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/2859233/20220427162558.png" alt="" />...

    uj5u.com 2023-03-27 14:46:35 more
  • 武裝你的WEBAPI-OData聚合查詢

    <a href="https://www.cnblogs.com/podolski/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/616093/20140323000327.png" alt="" /><...

    uj5u.com 2023-03-27 14:46:16 more