最近手上有點時間,打算繼續了解下go-micro的發布訂閱(訊息),看了micro的examples后,有個疑問,go-micro在提供發布訂閱的插件Broker(以及幾種實作)的同時,go-micro本身還實作了Publish(Client)以及Subscribe(Server)功能,于是翻了下原始碼,做個記錄,
Broker
Broker是go-micro定義的一個異步訊息的介面,同時使用插件的形式,可隨意在不同的實作(http,nats,rabbitmq)之間無縫切換,
// Broker is an interface used for asynchronous messaging.
type Broker interface {
Init(...Option) error
Options() Options
Address() string
Connect() error
Disconnect() error
Publish(topic string, m *Message, opts ...PublishOption) error
Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
String() string
}
從上面的介面可以看出,使用Broker來完成發布訂閱只需要以下幾步:
- 初始化一個Broker(
Init) - 連接Broker(
Connect) - 使用準備好的Broker發布/訂閱(
Publish/Subscribe) - 關閉Broker(
Disconnect)
go-micro中默認的broker實作
go-micro默認有基于http的Broker實作,可以直接使用,micro有給出具體的example,具體看下source code中的實作,
下面是go-micro中broer.go中對DefaultBroker的相關code:
var (
DefaultBroker Broker = NewBroker()
)
func Init(opts ...Option) error {
return DefaultBroker.Init(opts...)
}
func Connect() error {
return DefaultBroker.Connect()
}
func Disconnect() error {
return DefaultBroker.Disconnect()
}
func Publish(topic string, msg *Message, opts ...PublishOption) error {
return DefaultBroker.Publish(topic, msg, opts...)
}
func Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
return DefaultBroker.Subscribe(topic, handler, opts...)
}
func String() string {
return DefaultBroker.String()
}
可以看到都是基于NewBroker()回傳的broker實體來做的公用方法封裝,我們進一步看看,
// NewBroker returns a new http broker
func NewBroker(opts ...Option) Broker {
return newHttpBroker(opts...)
}
這里是直接回傳了一個http實作的broker(和上面提到的默認是基于http實作的匹配),繼續跟newHttpBroker,
這里這列出部分code,詳細的可直接參考go-micro下的http.go
h := &httpBroker{
id: uuid.New().String(),
address: addr,
opts: options,
r: options.Registry,
c: &http.Client{Transport: newTransport(options.TLSConfig)},
subscribers: make(map[string][]*httpSubscriber),
exit: make(chan chan error),
mux: http.NewServeMux(),
inbox: make(map[string][][]byte),
}
這里的核心是new了一個httpBroker,做為Broker介面的實作,在具體的實作就不在這里說了,下來我們看看上面提到介面的實作,
Init
func (h *httpBroker) Init(opts ...Option) error {
h.RLock()
if h.running {
h.RUnlock()
return errors.New("cannot init while connected")
}
h.RUnlock()
h.Lock()
defer h.Unlock()
for _, o := range opts {
o(&h.opts)
}
if len(h.opts.Addrs) > 0 && len(h.opts.Addrs[0]) > 0 {
h.address = h.opts.Addrs[0]
}
if len(h.id) == 0 {
h.id = "go.micro.http.broker-" + uuid.New().String()
}
// get registry
reg := h.opts.Registry
if reg == nil {
reg = registry.DefaultRegistry
}
// get cache
if rc, ok := h.r.(cache.Cache); ok {
rc.Stop()
}
// set registry
h.r = cache.New(reg)
// reconfigure tls config
if c := h.opts.TLSConfig; c != nil {
h.c = &http.Client{
Transport: newTransport(c),
}
}
return nil
}
從上面的code中可以看到,Init的作用就是初始化各種配置,如果Option引數有提供,就是用引數提供的,如果沒有就在這里設定一個,這里有2個點我們需要額外關注下:
-
Registry
Registry是注冊中心,如果option中沒有提供registry,就會使用go-micro默認實作的(msdn)
-
TLSConfig
TLSConfig是針對https的配置,默認是http
Connect
func (h *httpBroker) Connect() error {
h.RLock()
if h.running {
h.RUnlock()
return nil
}
h.RUnlock()
h.Lock()
defer h.Unlock()
var l net.Listener
var err error
if h.opts.Secure || h.opts.TLSConfig != nil {
config := h.opts.TLSConfig
fn := func(addr string) (net.Listener, error) {
if config == nil {
hosts := []string{addr}
// check if its a valid host:port
if host, _, err := net.SplitHostPort(addr); err == nil {
if len(host) == 0 {
hosts = maddr.IPs()
} else {
hosts = []string{host}
}
}
// generate a certificate
cert, err := mls.Certificate(hosts...)
if err != nil {
return nil, err
}
config = &tls.Config{Certificates: []tls.Certificate{cert}}
}
return tls.Listen("tcp", addr, config)
}
l, err = mnet.Listen(h.address, fn)
} else {
fn := func(addr string) (net.Listener, error) {
return net.Listen("tcp", addr)
}
l, err = mnet.Listen(h.address, fn)
}
if err != nil {
return err
}
addr := h.address
h.address = l.Addr().String()
go http.Serve(l, h.mux)
go func() {
h.run(l)
h.Lock()
h.opts.Addrs = []string{addr}
h.address = addr
h.Unlock()
}()
// get registry
reg := h.opts.Registry
if reg == nil {
reg = registry.DefaultRegistry
}
// set cache
h.r = cache.New(reg)
// set running
h.running = true
return nil
}
Connect方法的主要作用是創建一個Htto Server用來接收Publish時發送的訊息
Subscribe
func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
var err error
var host, port string
options := NewSubscribeOptions(opts...)
// parse address for host, port
host, port, err = net.SplitHostPort(h.Address())
if err != nil {
return nil, err
}
addr, err := maddr.Extract(host)
if err != nil {
return nil, err
}
var secure bool
if h.opts.Secure || h.opts.TLSConfig != nil {
secure = true
}
// register service
node := ®istry.Node{
Id: topic + "-" + h.id,
Address: mnet.HostPort(addr, port),
Metadata: map[string]string{
"secure": fmt.Sprintf("%t", secure),
"broker": "http",
"topic": topic,
},
}
// check for queue group or broadcast queue
version := options.Queue
if len(version) == 0 {
version = broadcastVersion
}
service := ®istry.Service{
Name: serviceName,
Version: version,
Nodes: []*registry.Node{node},
}
// generate subscriber
subscriber := &httpSubscriber{
opts: options,
hb: h,
id: node.Id,
topic: topic,
fn: handler,
svc: service,
}
// subscribe now
if err := h.subscribe(subscriber); err != nil {
return nil, err
}
// return the subscriber
return subscriber, nil
}
這部分代碼的核心功能就是創建用于訂閱的server,一個topic創建一個server并收集(注冊)到httpSubscriber的svc串列中(發布訊息時使用topic在subscriber的svc串列中查詢到對應的server給他發送訊息),
Publish
func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
// create the message first
m := &Message{
Header: make(map[string]string),
Body: msg.Body,
}
for k, v := range msg.Header {
m.Header[k] = v
}
m.Header["Micro-Topic"] = topic
// encode the message
b, err := h.opts.Codec.Marshal(m)
if err != nil {
return err
}
// save the message
h.saveMessage(topic, b)
// now attempt to get the service
h.RLock()
s, err := h.r.GetService(serviceName)
if err != nil {
h.RUnlock()
return err
}
h.RUnlock()
pub := func(node *registry.Node, t string, b []byte) error {
scheme := "http"
// check if secure is added in metadata
if node.Metadata["secure"] == "true" {
scheme = "https"
}
vals := url.Values{}
vals.Add("id", node.Id)
uri := fmt.Sprintf("%s://%s%s?%s", scheme, node.Address, DefaultPath, vals.Encode ())
r, err := h.c.Post(uri, "application/json", bytes.NewReader(b))
if err != nil {
return err
}
// discard response body
io.Copy(ioutil.Discard, r.Body)
r.Body.Close()
return nil
}
srv := func(s []*registry.Service, b []byte) {
for _, service := range s {
var nodes []*registry.Node
for _, node := range service.Nodes {
// only use nodes tagged with broker http
if node.Metadata["broker"] != "http" {
continue
}
// look for nodes for the topic
if node.Metadata["topic"] != topic {
continue
}
nodes = append(nodes, node)
}
// only process if we have nodes
if len(nodes) == 0 {
continue
}
switch service.Version {
// broadcast version means broadcast to all nodes
case broadcastVersion:
var success bool
// publish to all nodes
for _, node := range nodes {
// publish async
if err := pub(node, topic, b); err == nil {
success = true
}
}
// save if it failed to publish at least once
if !success {
h.saveMessage(topic, b)
}
default:
// select node to publish to
node := nodes[rand.Int()%len(nodes)]
// publish async to one node
if err := pub(node, topic, b); err != nil {
// if failed save it
h.saveMessage(topic, b)
}
}
}
}
// do the rest async
go func() {
// get a third of the backlog
messages := h.getMessage(topic, 8)
delay := (len(messages) > 1)
// publish all the messages
for _, msg := range messages {
// serialize here
srv(s, msg)
// sending a backlog of messages
if delay {
time.Sleep(time.Millisecond * 100)
}
}
}()
return nil
看過了上面的Subscribe實作,這里的Publish就比較簡單
- 創建訊息體并存盤在inbox
- 根據topic以及broker的標簽(這里是固定http)來查找訂閱的server(在上面訂閱模塊創建的)
上面有可能會查找出多個node(訂閱server),所以里面還有一個版本的機制,如果指定了版本就會給所有的匹配節點發送(默認是隨機發送一個)
- 使用http post的方式(異步)把訊息發送出去
Disconnect
func (h *httpBroker) Disconnect() error {
h.RLock()
if !h.running {
h.RUnlock()
return nil
}
h.RUnlock()
h.Lock()
defer h.Unlock()
// stop cache
rc, ok := h.r.(cache.Cache)
if ok {
rc.Stop()
}
// exit and return err
ch := make(chan error)
h.exit <- ch
err := <-ch
// set not running
h.running = false
return err
}
這部分功能很簡單,清空快取并發送退出的訊息,同時停止服務
以上就是go-micro中默認基于http的broker實作,
go-micro中對于broker的包裝
在看完broker的http默認實作后,我們對于broker有了一個大體了解,接下來我們在看下go-micro對于broker做的包裝部分,應該是為了簡化使用(確實只需要一步就可以),
訂閱RegisterSubscriber:
func main() {
// create a service
service := micro.NewService(
micro.Name("go.micro.srv.pubsub"),
)
// parse command line
service.Init()
// register subscriber
micro.RegisterSubscriber("example.topic.pubsub.1", service.Server(), new(Sub))
// register subscriber with queue, each message is delivered to a unique subscriber
micro.RegisterSubscriber("example.topic.pubsub.2", service.Server(), subEv, server.SubscriberQueue("queue.pubsub"))
if err := service.Run(); err != nil {
log.Fatal(err)
}
}
發布NewPublisher, Publish:
func main() {
// create a service
service := micro.NewService(
micro.Name("go.micro.cli.pubsub"),
)
// parse command line
service.Init()
// create publisher
pub1 := micro.NewPublisher("example.topic.pubsub.1", service.Client())
pub2 := micro.NewPublisher("example.topic.pubsub.2", service.Client())
// pub to topic 1
go sendEv("example.topic.pubsub.1", pub1)
// pub to topic 2
go sendEv("example.topic.pubsub.2", pub2)
// block forever
select {}
}
以上只是代碼節選,具體使用方法可以參考example中的pubsub,
Subscriber
訂閱對比直接用Broker只需要一步RegisterSubscriber,我們看看里面實作
//go-micro/micro.go
// RegisterSubscriber is syntactic sugar for registering a subscriber
func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error {
return s.Subscribe(s.NewSubscriber(topic, h, opts...))
}
//go-micro/server/rpc_server.go
func (s *rpcServer) NewSubscriber(topic string, sb interface{}, opts ...SubscriberOption) Subscriber {
return s.router.NewSubscriber(topic, sb, opts...)
}
func (s *rpcServer) Subscribe(sb Subscriber) error {
s.Lock()
defer s.Unlock()
if err := s.router.Subscribe(sb); err != nil {
return err
}
s.subscribers[sb] = nil
return nil
}
//go-micro/server/rpc_router.go
// router represents an RPC router.
type router struct {
.......
subscribers map[string][]*subscriber
}
//go-micro/server/subscriber.go
type subscriber struct {
topic string
rcvr reflect.Value
typ reflect.Type
subscriber interface{}
handlers []*handler
endpoints []*registry.Endpoint
opts SubscriberOptions
}
上面的節選code可以看出,在默認server(rpcServer)中的router中定義了個map型別的變數subscribers用來存盤訂閱的topic和對應處理的subscriber,server在接收到訊息后,只需要根據topic去map中找到subscriber,去處理即可,
subscriber中具體的處理,可以從定義中看出來,里面存盤對應路由和回應的handler(server本身的功能),有興趣可以在go-micro/server/subscriber.go看看具體代碼實作,
Publisher
發布是在go-micro的默認client實作(rpc_client)里面定義了一個默認的broker(上面有分析過的http實作)
//go-micro/micro.go
// Deprecated: NewPublisher returns a new Publisher
func NewPublisher(topic string, c client.Client) Event {
return NewEvent(topic, c)
}
// NewEvent creates a new event publisher
func NewEvent(topic string, c client.Client) Event {
if c == nil {
c = client.NewClient()
}
return &event{c, topic}
}
//go-micro/event.go
type event struct {
c client.Client
topic string
}
func (e *event) Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error {
return e.c.Publish(ctx, e.c.NewMessage(e.topic, msg), opts...)
}
這里可以看到實際上是使用傳遞進來的client來初始化一個event,并用來發送訊息,如果傳遞的是空,默認創建一個client(rpcClient),
總結
經過以上程序的追蹤,最終總結下來就幾點:
- broker定義了介面,micro提供的插件的形式可無縫替換實作
- go-micro提供了一個默認的broker實作,是基于http
- go-micro的基于默認的server、client以及brkoer包裝了一套更簡單的pub和sub方法
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/1904.html
標籤:Go
上一篇:Go的100天之旅-09Map
下一篇:golang nsq示例使用介紹
