Consul 介紹
服務注冊與發現采用 consul 組件,是google開源的一個使用go語言開發的服務發現、配置管理中心服務,內置了服務注冊與發現框架、分布一致性協議實作、健康檢查、Key/Value存盤、多資料中心方案等,

Consul使用gossip協議管理成員關系、廣播訊息到整個集群,他有兩個gossip pool(LAN pool和WAN pool),LAN pool是同一個資料中心內部通信的,WAN pool是多個資料中心通信的,LAN pool有多個,WAN pool只有一個,
專案中應用
完整代碼:
https://github.com/Justin02180218/micro-kit
組態檔
consul:
addr: "http://consul-server:8500"
interval: "10s"
timeout: "1s"
grpc:
retrymax: 3
retrytimeout: 500
name: "book-rpc-service"
在 /etc/hosts 中配置 consul-server 域名
config.go
type ConsulConfig struct {
Addr string `json:"addr" yaml:"addr"`
Interval string `json:"interval" yaml:"interval"`
Timeout string `json:"timeout" yaml:"timeout"`
Client struct {
RetryMax int `json:"retrymax" yaml:"retrymax"`
RetryTimeout int `json:"retrytimeout" yaml:"retrytimeout"`
}
}
type GRPCConfig struct {
RetryMax int `json:"retrymax" yaml:"retrymax"`
RetryTimeout int `json:"retrytimeout" yaml:"retrytimeout"`
Name string `json:"name" yaml:"name"`
}
pkg/registers
在 pkg 下新建目錄 registers,創建 consul.go 檔案:

代碼如下:
// 鏈接 consul
func connectConsul(consulAddr string) (client consul.Client) {
consulConfig := api.DefaultConfig()
consulConfig.Address = consulAddr
consulClient, err := api.NewClient(consulConfig)
if err != nil {
panic(err)
}
client = consul.NewClient(consulClient)
return
}
// 向 consul 注冊服務
func InitRegister(cfg *configs.AppConfig, check api.AgentServiceCheck, logger log.Logger) (registrar sd.Registrar) {
rand.Seed(time.Now().UnixNano())
name := cfg.ServerConfig.Name
addr := utils.LocalIP()
port := cfg.ServerConfig.Port
consulAddr := cfg.ConsulConfig.Addr
client := connectConsul(consulAddr)
num := rand.Intn(100)
asr := api.AgentServiceRegistration{
ID: name + "-" + strconv.Itoa(num),
Name: name,
Address: addr,
Port: port,
Tags: []string{name},
Check: &check,
}
registrar = consul.NewRegistrar(client, &asr, logger)
return
}
// restful 服務檢測
func HttpCheck(port int, interval, timeout string) api.AgentServiceCheck {
return api.AgentServiceCheck{
HTTP: "http://" + utils.LocalIP() + ":" + strconv.Itoa(port) + "/health",
Interval: interval,
Timeout: timeout,
Notes: "Http Health Check",
}
}
// gRPC 服務檢測
func GRPCCheck(port int, interval, timeout string) api.AgentServiceCheck {
return api.AgentServiceCheck{
GRPC: utils.LocalIP() + ":" + strconv.Itoa(port) + "/health",
Interval: interval,
Timeout: timeout,
Notes: "GRPC Health Check",
}
}
service層
以 library-user-service 為例,其他微服務相同,
// 增加函式
HealthCheck() bool
// 實作
func (u *UserServiceImpl) HealthCheck() bool {
return true
}
endpoint層
以 library-user-service 為例,其他微服務相同,
// 增加屬性
HealthEndpoint endpoint.Endpoint
func MakeHealthEndpoint(svc service.UserService) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
status := svc.HealthCheck()
return status, nil
}
}
transport層
以 library-user-service 為例,其他微服務相同,
r.GET("/health", func(c *gin.Context) {
kithttp.NewServer(
endpoints.HealthEndpoint,
decodeHealthRequest,
utils.EncodeJsonResponse,
).ServeHTTP(c.Writer, c.Request)
})
main.go
以 library-user-service 為例,其他微服務相同,
// 配置日志
var logger log.Logger
{
logger = log.NewLogfmtLogger(os.Stderr)
logger = log.With(logger, "ts", log.DefaultTimestampUTC)
logger = log.With(logger, "caller", log.DefaultCaller)
}
// consul 注冊
check := registers.HttpCheck(configs.Conf.ServerConfig.Port, configs.Conf.ConsulConfig.Interval, configs.Conf.ConsulConfig.Timeout)
registrar := registers.InitRegister(configs.Conf, check, logger)
go func() {
registrar.Register()
errChan <- r.Run(fmt.Sprintf(":%s", strconv.Itoa(configs.Conf.ServerConfig.Port)))
}()
registrar.Deregister()
啟動 consul
使用 Docker 啟動 consul
docker pull progrium/consul
docker run --rm -p 8400:8400 -p 8500:8500 -p 8600:53/udp -h node1 progrium/consul -server -bootstrap -ui-dir /ui
啟動服務,然后在瀏覽器地址欄輸入:http://consul-server:8500/

可以看到三個微服務都以注冊到 consul 上,健康檢測也都已經通過,
呼叫
以前 user-service 呼叫 book-rpc-service 的介面,使用的是gPRC Client的方式,現在服務注冊的 consul 上,user-service 通過 consul 獲取已經注冊的 book-rpc-service 的一個實體,然后呼叫這個實體上的介面,

在 consul.go 檔案中增加如下代碼:
func GRPCClient(cfg *configs.AppConfig, makeEndpoint func(string) endpoint.Endpoint, logger log.Logger) endpoint.Endpoint {
consulAddr := cfg.ConsulConfig.Addr
retryMax := cfg.GRPCConfig.RetryMax
retryTimeout := cfg.GRPCConfig.RetryTimeout
name := cfg.GRPCConfig.Name
client := connectConsul(consulAddr)
instance := consul.NewInstancer(client, logger, name, []string{name}, true)
factory := factoryFor(makeEndpoint)
endpointer := sd.NewEndpointer(instance, factory, logger)
// 負載均衡采用輪詢策略
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(retryMax, time.Millisecond*time.Duration(retryTimeout), balancer)
return retry
}
func factoryFor(makeEndpoint func(string,) endpoint.Endpoint) sd.Factory {
return func(instance string) (endpoint.Endpoint, io.Closer, error) {
endpoint := makeEndpoint(instance)
return endpoint, nil, nil
}
}
在 endpoint 目錄下增加檔案 book_rpc_endpoint.go,代碼如下:
type BookRPCEndpoints struct {
FindBooksEndpoint endpoint.Endpoint
}
func MakeFindBooksEndpoint(instance string) endpoint.Endpoint {
conn, err := grpc.Dial(instance, grpc.WithInsecure())
if err != nil {
fmt.Println(err)
return nil
}
findBooksEndpoint := grpctransport.NewClient(
conn, "book.Book", "FindBooksByUserID",
encodeGRPCFindBooksRequest,
decodeGRPCFindBooksResponse,
pbbook.BooksResponse{},
).Endpoint()
return findBooksEndpoint
}
修改 service.go
type UserServiceImpl struct {
userDao dao.UserDao
grpcClient kitendpoint.Endpoint
}
func NewUserServiceImpl(userDao dao.UserDao, grpcClient kitendpoint.Endpoint) UserService {
return &UserServiceImpl{
userDao: userDao,
grpcClient: grpcClient,
}
}
func (u *UserServiceImpl) FindBooksByUserID(ctx context.Context, id uint64) (interface{}, error) {
res, err := u.grpcClient(ctx, id)
if err != nil {
return nil, err
}
return res, nil
}
修改 main.go
findBooksEndpoint := endpoint.MakeFindBooksEndpoint
grpcClient := registers.GRPCClient(configs.Conf, findBooksEndpoint, tracer, logger)
userService := service.NewUserServiceImpl(userDao, grpcClient)
介面測驗同 《五,微服務library-book-grpc-service》
下一篇文章,我們在微服務中加入熔斷、降級功能,
完整代碼:
https://github.com/Justin02180218/micro-kit
更多【分布式專輯】【架構實戰專輯】系列文章,請關注公眾號

轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/303005.html
標籤:其他
上一篇:IP地址的主機號和網路號
