代码拉取完成,页面将自动刷新
同步操作将从 Plato/Service-Box-go 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
package sbox
import (
"fmt"
"gitee.com/dennis-kk/rpc-go-backend/idlrpc"
"gitee.com/dennis-kk/rpc-go-backend/idlrpc/pkg/log"
"gitee.com/dennis-kk/rpc-go-backend/idlrpc/pkg/transport"
"gitee.com/dennis-kk/service-box-go/util/errors"
"sync"
)
const (
ServiceAdd = iota
ServiceClose
)
type (
//ServiceEvent 服务变化事件类型 ServiceAdd 添加 ServiceClose 关闭
ServiceEvent int
//ServiceWatcher 是服务监听回调函数类型
ServiceWatcher func(eType ServiceEvent, name string, host string, proxy idlrpc.IProxy)
//ProxyFinder 服务proxy获取接口
ProxyFinder func(uuid uint64, trans transport.ITransport) (idlrpc.IProxy, error)
WatcherList struct {
uuid uint64
watched map[string]ServiceWatcher
}
//WatcherManager 服务变化监听管理
WatcherManager struct {
finder ProxyFinder //proxy 获取函数
wg sync.WaitGroup //等待协程结束
rw sync.RWMutex //读写锁
logger log.ILogger //日志
watchersMap map[string]*WatcherList //watcher列表
}
)
func (w *WatcherManager) init(finder ProxyFinder, logger log.ILogger) {
w.watchersMap = make(map[string]*WatcherList)
w.wg = sync.WaitGroup{}
w.rw = sync.RWMutex{}
w.finder = finder
w.logger = logger
}
func (w *WatcherManager) shutdown() {
w.wg.Wait()
w.watchersMap = nil
}
//addWatcher 添加一个服务的监听事件,如果一个函数被重复添加会返回报错
func (w *WatcherManager) addWatcher(name string, uuid uint64, watcher ServiceWatcher) error {
w.rw.Lock()
defer w.rw.Unlock()
wg, ok := w.watchersMap[name]
if !ok {
wg = &WatcherList{
uuid: uuid,
watched: map[string]ServiceWatcher{},
}
w.watchersMap[name] = wg
}
//校验uuid似否正确
if wg.uuid != uuid {
return errors.RepeatedWatcher
}
//生成handle
handle := fmt.Sprintf("%v", watcher)
//去重
if _, ok := wg.watched[handle]; ok {
return errors.RepeatedWatcher
}
//加入注册
wg.watched[handle] = watcher
return nil
}
//removeWatcher 移除特定服务的特定监听
func (w *WatcherManager) removeWatcher(name string, watcher ServiceWatcher) {
w.rw.Lock()
defer w.rw.Unlock()
wg, ok := w.watchersMap[name]
if !ok {
return
}
//生成handle
handle := fmt.Sprintf("%v", watcher)
if _, ok := wg.watched[handle]; ok {
delete(wg.watched, handle)
}
//如果监听空了,清理对应的handle
if len(wg.watched) == 0 {
delete(w.watchersMap, name)
}
return
}
func (w *WatcherManager) onConnected(name string, trans *BoxChannel) {
//新连接到来,触发所有监听的回调函数,先在主线程做预检查,减少增加协程得开销
var uuid uint64
w.rw.RLock()
wg, ok := w.watchersMap[name]
if ok {
uuid = wg.uuid
}
w.rw.RUnlock()
if !ok {
return
}
w.wg.Add(1)
go w.notifyWatcherAdd(uuid, name, trans)
}
func (w *WatcherManager) onClose(name, host string) {
w.rw.RLock()
_, ok := w.watchersMap[name]
w.rw.RUnlock()
if !ok {
return
}
w.wg.Add(1)
go w.notifyWatcherClose(name, host)
}
//notifyWatcher 启动协程触发watcher 收到新proxy到达得通知
func (w *WatcherManager) notifyWatcherAdd(uuid uint64, name string, trans *BoxChannel) {
// 先获取proxy
proxy, err := w.finder(uuid, trans)
if err != nil {
w.logger.Warn("get service %d,%s proxy error!", uuid, name)
w.wg.Done()
return
}
w.rw.RLock()
defer func() {
//防止用户回调函数里面触发异常
if r := recover(); r != nil {
w.logger.Error("service %s watcher panic %v", name, r)
}
w.rw.RUnlock()
w.wg.Done()
}()
// 触发用户回调
wg, ok := w.watchersMap[name]
if !ok {
return
}
for _, cb := range wg.watched {
cb(ServiceAdd, name, trans.RemoteAddr(), proxy)
}
return
}
//notifyWatcherDelete 通知watcher 有proxy失效
func (w *WatcherManager) notifyWatcherClose(name string, host string) {
w.rw.RLock()
defer func() {
//防止用户回调函数里面触发异常
if r := recover(); r != nil {
w.logger.Error("notify service %s watcher delete panic %v", name, r)
}
w.rw.RUnlock()
w.wg.Done()
}()
// 触发用户回调
wg, ok := w.watchersMap[name]
if !ok {
return
}
for _, cb := range wg.watched {
cb(ServiceClose, name, host, nil)
}
return
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。