代码拉取完成,页面将自动刷新
同步操作将从 Plato/Service-Box-go 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
package sbox
import (
"context"
"errors"
"fmt"
"reflect"
"time"
"gitee.com/dennis-kk/rpc-go-backend/idlrpc"
_ "gitee.com/dennis-kk/service-box-go/internal/jsonpb/jsonpbcpp"
"gitee.com/dennis-kk/service-box-go/internal/net"
_ "gitee.com/dennis-kk/service-box-go/internal/zookeeper"
"gitee.com/dennis-kk/service-box-go/util/config"
berrors "gitee.com/dennis-kk/service-box-go/util/errors"
"gitee.com/dennis-kk/service-box-go/util/mongodb"
"gitee.com/dennis-kk/service-box-go/util/redis"
"gitee.com/dennis-kk/service-box-go/util/scli"
"gitee.com/dennis-kk/service-box-go/util/slog"
"gitee.com/dennis-kk/service-box-go/util/slog/zap"
"gitee.com/dennis-kk/service-box-go/util/uuid"
"gitee.com/dennis-kk/service-box-go/util/uuid/custom"
)
type (
SBoxKey struct{}
packCache map[uint64]*idlrpc.PackageInfo
ServiceBox struct {
boxState state // service box state
network *BoxNetwork //network
srvLayer *serviceLayer //service host manager
opts *Options //options
directHandle *BoxChannel //direct connect channel
cmd scli.Cmd //command line parser
rpc idlrpc.IRpc //rpc framework
logger slog.BoxLogger //logger
cfg config.Config //Box's Config Module
serviceCfg *ServiceCfg // service custom config
waitLoadPack packCache // package wait to load
proxy *ProxyHandler //proxy handle
idGen uuid.IUuidGenerator //uuid framework
redisProxy redis.IClient // redis client
mongoClient *mongodb.MongoDB // mongo db client
}
)
func MakeServiceBox() *ServiceBox {
return &ServiceBox{
boxState: stateClosed,
waitLoadPack: make(packCache), //package cache
network: NewBoxNetWork(), //init with config
opts: NewBoxOptions(), //init options
serviceCfg: &ServiceCfg{
CustomName: make(map[uint64]string),
},
rpc: nil,
}
}
func (sb *ServiceBox) Init(opts ...Option) error {
for _, opt := range opts {
opt(sb.opts)
}
// init cmd module
if err := sb.initCmd(); err != nil {
return err
}
// init config
if err := sb.initCfg(); err != nil {
panic(fmt.Sprintf("[ServiceBox] init config with file error %v \n", err))
}
// init logger
if err := sb.initLogger(); err != nil {
panic(fmt.Sprintf("[ServiceBox] init box's logger error %v \n", err))
}
sb.logger.Info(sb.opts.logo)
sb.network.Init()
if err := sb.initRpc(); err != nil {
panic(fmt.Sprintf("[ServiceBox] init rpc framework error %v \n", err))
}
if err := sb.initServiceFinder(); err != nil {
panic(fmt.Sprintf("[ServiceBox] init service infra error %v \n", err))
}
if err := sb.initRedisClient(); err != nil {
panic(fmt.Sprintf("[ServiceBox] init redis client error %v \n", err))
}
if err := sb.initMongoDB(); err != nil {
panic(fmt.Sprintf("[ServiceBox] init mongo client error %v \n", err))
}
if err := sb.initUuidGen(); err != nil {
panic(fmt.Sprintf("[ServiceBox] init uuid error %v \n", err))
}
if err := sb.initProxyHandle(); err != nil {
panic(fmt.Sprintf("[ServiceBox] init proxy handler error %v \n", err))
}
if err := sb.initHttpProxy(); err != nil {
panic(fmt.Sprintf("[ServiceBox] init http proxy handler error %q \n", err.Error()))
}
sb.setState(stateInited)
sb.logger.Info("[Service Box] Box init finished !")
return nil
}
func (sb *ServiceBox) Start() error {
// network start
if err := sb.startNetwork(); err != nil {
sb.logger.Error("[ServiceBox] start network module error %v", err)
return err
}
sb.logger.Info("[ServiceBox] network module has been started !")
if err := sb.srvLayer.start(); err != nil {
sb.logger.Warn("[ServiceBox] start service finder error %v !", err)
return err
}
sb.logger.Info("[ServiceBox] service finder module has been started !")
//rpc framework
if err := sb.rpc.Start(); err != nil {
sb.logger.Warn("[ServiceBox] start rpc framework error %v !", err)
return err
}
sb.logger.Info("[ServiceBox] rpc module has been started !")
if err := sb.startRedisClient(); err != nil {
sb.logger.Warn("[ServiceBox] start redis client error %v !", err)
return err
}
if err := sb.startMongoClient(); err != nil {
sb.logger.Warn("[ServiceBox] start mongo client error %v !", err)
return err
}
//proxy handler
if err := sb.startProxyHandler(); err != nil {
sb.logger.Warn("[ServiceBox] start proxy handler error %v !", err)
return err
}
//http handler
if err := sb.startHttpHandle(); err != nil {
sb.logger.Warn("[ServiceBox] start http proxy error %q", err.Error())
return err
}
//direct connect module
if err := sb.startConnHandle(); err != nil {
sb.logger.Warn("[ServiceBox] start connect handler error %v !", err)
return err
}
//register service to remote
if !sb.opts.proxyMode {
err := sb.loadAllService()
if err != nil {
return err
}
}
//request necessary service from remote
sb.setState(stateRunning)
return nil
}
func (sb *ServiceBox) Tick() {
sb.network.Tick()
sb.srvLayer.tick()
if sb.proxy != nil {
sb.proxy.onTick()
}
if err := sb.rpc.Tick(); err != nil {
sb.logger.Warn("[ServiceBox] rpc throw error %v !", err)
}
}
func (sb *ServiceBox) ShutDown() error {
// stop proxy handle
if sb.proxy != nil {
sb.proxy.shutdownHttpHandle()
}
// stop rpc framework
if err := sb.rpc.ShutDown(); err != nil {
sb.logger.Warn("[ServiceBox] stop rpc framework error ")
}
// stop service layer
sb.srvLayer.stop()
// stop network framework
sb.network.Stop()
sb.logger.Info("========================== Service Box Stopped ==========================")
return nil
}
func (sb *ServiceBox) GetLogger() slog.BoxLogger {
return sb.logger
}
func (sb *ServiceBox) GetProxy(uuid uint64) (idlrpc.IProxy, error) {
name := fmt.Sprintf("%d", uuid)
return sb.GetProxyWithNickName(uuid, name)
}
func (sb *ServiceBox) GetTransport(uuid uint64) (*BoxChannel, error) {
name := fmt.Sprintf("%d", uuid)
trans := sb.srvLayer.tryGetTransport(name)
if trans != nil {
return trans, nil
}
type Result struct {
trans *BoxChannel
err error
}
notify := make(chan *Result, 1)
cb := func(name string, trans *BoxChannel, err error) {
defer func() {
close(notify)
if r := recover(); r != nil {
sb.logger.Warn("[ServiceBox] get proxy %s panic ", uuid)
return
}
}()
result := &Result{
trans: trans,
err: err,
}
notify <- result
}
err := sb.srvLayer.getTransport(name, cb)
if err != nil {
return nil, err
}
timer := time.NewTimer(time.Second * 5)
defer func() {
timer.Stop()
}()
for {
select {
case result := <-notify:
return result.trans, result.err
case <-timer.C:
sb.logger.Warn("[ServiceBox] get proxy %d timeout ", uuid)
return nil, berrors.FindServiceTimeOut
}
}
}
func (sb *ServiceBox) GetProxyWithNickName(uuid uint64, name string) (idlrpc.IProxy, error) {
//直连模式
if sb.directHandle != nil {
return sb.rpc.GetServiceProxy(uuid, sb.directHandle)
}
trans := sb.srvLayer.tryGetTransport(name)
if trans != nil {
return sb.rpc.GetServiceProxy(uuid, trans)
}
type Result struct {
proxy idlrpc.IProxy
err error
}
notify := make(chan *Result, 1)
cb := func(name string, trans *BoxChannel, err error) {
defer func() {
close(notify)
if r := recover(); r != nil {
sb.logger.Warn("[ServiceBox] get proxy %s panic ", uuid)
return
}
}()
result := &Result{
err: err,
}
if err != nil {
notify <- result
return
}
result.proxy, result.err = sb.rpc.GetServiceProxy(uuid, trans)
notify <- result
}
timer := time.NewTimer(time.Second * 5)
defer func() {
timer.Stop()
}()
err := sb.srvLayer.getTransport(name, cb)
if err != nil {
return nil, err
}
for {
select {
case result := <-notify:
return result.proxy, result.err
case <-timer.C:
sb.logger.Warn("[ServiceBox] get proxy %d timeout ", uuid)
return nil, berrors.FindServiceTimeOut
}
}
}
func (sb *ServiceBox) GetProxyFromPeer(ctx context.Context, uuid uint64) (idlrpc.IProxy, error) {
return sb.rpc.GetProxyFromPeer(ctx, uuid)
}
func (sb *ServiceBox) connectToService(name, network, host string) {
if sb == nil || sb.network == nil {
//TODO add log error
return
}
//先尝试从缓存获取
ch := sb.network.TryGetChannel(host)
if ch != nil {
sb.onConnect(host, ch, nil)
} else {
sb.network.ConnectToService(name, network, host)
}
}
func (sb *ServiceBox) AddServicePackage(pack *idlrpc.PackageInfo) error {
if sb == nil {
panic("Service Box not init yet!")
}
if pack == nil {
//TODO add error
return nil
}
if pack.Creator == nil {
//TODO add errors and log
sb.logger.Warn("%d invalid service package ", pack.ServiceUUID)
return nil
}
sb.waitLoadPack[pack.ServiceUUID] = pack
return nil
}
func (sb *ServiceBox) GetConfig() config.Config {
return sb.cfg
}
func (sb *ServiceBox) GetRedisClient() redis.IClient {
return sb.redisProxy
}
func (sb *ServiceBox) GetMongoClient() *mongodb.MongoDB {
return sb.mongoClient
}
func (sb *ServiceBox) GetOptions() string {
return sb.opts.option
}
func (sb *ServiceBox) WatchService(uuid uint64, watcher ServiceWatcher) error {
return sb.srvLayer.addWatcher(uuid, fmt.Sprintf("%d", uuid), watcher)
}
func (sb *ServiceBox) WatchServiceWithNickName(uuid uint64, name string, watcher ServiceWatcher) error {
return sb.srvLayer.addWatcher(uuid, name, watcher)
}
func (sb *ServiceBox) UnWatchService(name string, watcher ServiceWatcher) error {
return sb.srvLayer.removeWatcher(name, watcher)
}
func (sb *ServiceBox) setState(s state) {
sb.boxState = s
}
//startNetwork start box network
func (sb *ServiceBox) startNetwork() error {
// start network
sb.network.Start()
// proxy 模式跳过初始化监听
if sb.opts.proxyMode == false {
err := sb.cfg.Get("listener").Scan(&sb.opts.host)
if err != nil {
return err
}
// 预处理ip 支持配置hostname
err = sb.opts.PreHosts()
if err != nil {
return err
}
if len(sb.opts.host) != 0 {
for _, host := range sb.opts.host {
err := sb.network.ListenAt("tcp", host.InnerHost)
if err != nil {
return err
}
}
}
}
sb.registerNetworkCallback()
return nil
}
func (sb *ServiceBox) startProxyHandler() error {
if !sb.opts.proxyMode {
return nil
}
if err := sb.network.ListenAt("tcp", sb.proxy.cfg.Address); err != nil {
return err
}
sb.logger.Info("[ServiceBox] start proxy handle successful !")
return nil
}
func (sb *ServiceBox) startHttpHandle() error {
if !sb.opts.proxyMode {
return nil
}
return sb.proxy.startHttpHandle()
}
func (sb *ServiceBox) registerNetworkCallback() {
sb.network.RegisterEventHandle(net.EventAccept, sb.onAccept)
sb.network.RegisterEventHandle(net.EventRespConnect, sb.onConnect)
sb.network.RegisterEventHandle(net.EventReceive, sb.onReceive)
sb.network.RegisterEventHandle(net.EventClose, sb.onClosed)
}
// initCmd will parse cmd from args
func (sb *ServiceBox) initCmd() error {
//add flag's
var opts []scli.Option
flags := []*scli.FlagCfg{
{
BaseCfg: &scli.BaseCfg{
Name: "config",
Short: "c",
Usage: "config path, default is default.yaml",
},
Dest: &sb.opts.cfgPath,
},
{
BaseCfg: &scli.BaseCfg{
Name: "proxy",
Short: "p",
Usage: "start service with proxy model",
},
Dest: &sb.opts.proxyMode,
},
{
BaseCfg: &scli.BaseCfg{
Name: "options",
Short: "option",
Usage: "custom option",
},
Dest: &sb.opts.option,
},
{
BaseCfg: &scli.BaseCfg{
Name: "apollo",
Usage: "init config system with apollo config [url appid namespace cluster]",
},
Dest: sb.opts.apolloCfg,
},
}
for _, flag := range flags {
opts = append(opts, scli.WithFlag(flag))
}
sb.cmd = scli.NewCmd(opts...)
return sb.cmd.Parse(sb.opts.args)
}
func (sb *ServiceBox) initCfg() (err error) {
if sb.opts.cfgPath == "" {
sb.opts.cfgPath = "default.yaml"
}
//启动apollo 模式,如果配置apollo 从apollo 启动,不在读取配置
if len(sb.opts.apolloCfg.Host) > 0 && len(sb.opts.apolloCfg.AppId) > 0 {
sb.cfg, err = config.LoadApollo(sb.opts.apolloCfg)
} else {
sb.cfg, err = config.LoadFile(sb.opts.cfgPath)
}
if err != nil {
return
}
customCfg := sb.cfg.Get("services")
if !customCfg.IsNil() {
err = customCfg.Scan(sb.serviceCfg)
if err != nil {
return err
}
}
logoCfg := sb.cfg.Get("logo")
if !logoCfg.IsNil() {
sb.opts.logo = logoCfg.String(boxLogo)
}
return
}
//initLogger will create logger with config
func (sb *ServiceBox) initLogger() error {
var err error
opts := []slog.Option{func(opt *slog.Options) {
err := sb.cfg.Get("logger").Scan(opt)
if err != nil {
panic(err)
}
},
}
boxName := sb.cfg.Get("name")
if !boxName.IsNil() {
opts = append(opts, slog.WithAppName(boxName.String("service-box")))
}
sb.logger, err = zap.NewLogger(opts...)
if err != nil {
return err
}
slog.SetDefaultLog(sb.logger)
return nil
}
func (sb *ServiceBox) initProxyHandle() error {
if !sb.opts.proxyMode {
return nil
}
cfg := &proxyModeCfg{
Binding: true, //设置默认为绑定模式
}
if err := sb.cfg.Get("proxy").Scan(cfg); err != nil {
return err
}
// 配置错误
if len(cfg.Address) <= 0 {
sb.logger.Warn("[ServiceBox] proxy model config error, key host is nil !")
return berrors.ProxyConfigError
}
if err := cfg.preProcess(); err != nil {
return err
}
sb.proxy = newProxyHandler(cfg, sb)
sb.logger.Info("[ServiceBox] init proxy handle successful at %s", cfg.Address)
return nil
}
func (sb *ServiceBox) initHttpProxy() error {
if !sb.opts.proxyMode {
return nil
}
// 检查是否配置了固定的配置字段
cfg := sb.cfg.Get("http_proxy")
if cfg.IsNil() {
sb.logger.Info("skip http_proxy module")
return nil
}
// 初始化http模块
return sb.proxy.initHttpHandle(cfg)
}
//initRedisClient 初始化redis模块的
func (sb *ServiceBox) initRedisClient() error {
// 获取配置中中redis字段
cfg := sb.cfg.Get("redis")
if cfg.IsNil() {
sb.logger.Info("skip redis module")
return nil
}
// 根据 host 数量配置选择集群或者普通客户端口
h := cfg.Get("addr_hosts")
if h.IsNil() {
sb.logger.Error("invalid redis address")
return berrors.RedisConfigError
}
opt := func(o *redis.Options) {
err := cfg.Scan(o)
if err != nil {
panic(err)
}
}
if len(h.StringSlice(nil)) > 1 {
sb.redisProxy = redis.NewClusterClient(opt)
} else {
sb.redisProxy = redis.NewClient(opt)
}
// 初始化redis模块
return sb.redisProxy.Init()
}
func (sb *ServiceBox) initMongoDB() error {
// 未配置mongo则跳过
cfg := sb.cfg.Get("mongo")
if cfg.IsNil() {
sb.logger.Info("skip mongo db module")
return nil
}
// 解析配置,初始化客户端
opt := func(o *mongodb.Options) {
err := cfg.Scan(o)
if err != nil {
panic(err)
}
}
sb.mongoClient = mongodb.NewMongoDB(opt)
return sb.mongoClient.Init()
}
func (sb *ServiceBox) startConnHandle() error {
host := sb.cfg.Get("connect", "host").String("")
if len(host) == 0 {
return nil
}
// 设置了对应的connect, 尝试直连
conn, err := sb.network.ConnectTo("tcp", host)
if err != nil {
sb.logger.Warn("")
return err
}
sb.directHandle, err = sb.network.createBoxChannel(conn.RemoteAddr().String(), conn)
if err != nil {
sb.logger.Warn("create box channel error %v !", err)
return err
}
return nil
}
func (sb *ServiceBox) startRedisClient() error {
if sb.redisProxy == nil {
//skip redis module
return nil
}
return sb.redisProxy.Start()
}
func (sb *ServiceBox) startMongoClient() error {
if sb.mongoClient == nil {
return nil
}
return sb.mongoClient.Start()
}
func (sb *ServiceBox) initUuidGen() error {
opt := func(opts *custom.Options) {
if err := sb.cfg.Get("uuid", "custom").Scan(opts); err != nil {
panic(err)
}
}
sb.idGen = custom.NewCustomUUID(opt)
sb.logger.Info("[ServiceBox] init uuid generator")
return nil
}
func (sb *ServiceBox) initServiceFinder() error {
//load from config
cfg := &serviceLayerConfig{}
err := sb.cfg.Get("service_finder").Scan(cfg)
if err != nil {
return err
}
if len(cfg.MiddlewareType) == 0 {
return errors.New("invalid service finder type")
}
// 对配置进行一些预处理
if cfg.Prefix == "" {
cfg.Prefix = "/"
}
sb.srvLayer = makeServiceLayer(sb.connectToService, sb.logger)
if sb.srvLayer == nil {
return errors.New("[ServiceBox] create service layer failed ")
}
return sb.srvLayer.init(cfg, sb.rpc.GetServiceProxy)
}
func (sb *ServiceBox) initRpc() error {
sb.rpc = idlrpc.CreateRpcFramework()
if sb.rpc == nil {
return errors.New("[ServiceBox] create rpc framework failed")
}
// 获取配置
st := sb.cfg.Get("rpc", "stack_trace").Bool(false)
var opts []idlrpc.Option
opts = append(opts, idlrpc.WithUserData(SBoxKey{}, sb))
//TODO 根据配置开关rpc logger
opts = append(opts, idlrpc.WithLogger(sb.logger), idlrpc.WithStackTrace(st))
err := sb.rpc.Init(opts...)
if err != nil {
return err
}
return nil
}
//=========================== service package tool function ===========================
// LoadServiceByPackInfo load service and register to rpc framework
func (sb *ServiceBox) loadServiceByPackInfo(pack *idlrpc.PackageInfo) error {
if sb == nil || sb.rpc == nil {
panic("service box's rpc framework not init yet !")
}
var sdk idlrpc.ISDK
var err error
if v, ok := sb.serviceCfg.CustomName[pack.ServiceUUID]; ok {
sdk, err = pack.Creator(v)
} else {
sdk, err = pack.Creator()
}
if err != nil {
return err
}
if sdk == nil {
return nil
}
if err = sdk.Register(sb.rpc); err != nil {
sb.logger.Warn("[ServiceBox] register package %s error !", pack.ServiceUUID)
return err
}
host := sb.opts.GetOneHostIp()
typeName := reflect.TypeOf(sdk).Elem().Name()
sb.srvLayer.addLoadedServiceName(sdk.GetUuid(), typeName)
if !sdk.IsProxy() {
uuidStr := fmt.Sprintf("%d", sdk.GetUuid())
if uuidStr != sdk.GetNickName() {
if err := sb.srvLayer.registerService(sdk.GetNickName(), host); err != nil {
return err
}
sb.logger.Info("register service %s:%d to zookeeper with nickname %s ", typeName, sdk.GetUuid(), sdk.GetNickName())
}
sb.logger.Info("register service %s:%d to zookeeper with uuid %s ", typeName, sdk.GetUuid(), uuidStr)
return sb.srvLayer.registerService(uuidStr, host)
} else {
sb.logger.Info("load proxy %s:%d to service box", typeName, sdk.GetUuid())
return nil
}
}
func (sb *ServiceBox) onAccept(name string, trans *BoxChannel, err error) {
if err != nil {
return
}
if sb.proxy != nil {
sb.proxy.onAccept(name, trans)
}
}
//onConnect will add trans to service layer with host name
//If Get encounters any errors, it will return
func (sb *ServiceBox) onConnect(host string, trans *BoxChannel, err error) {
if err != nil || trans == nil {
_ = sb.srvLayer.onConnectFailed(host, err)
slog.Warn("[ServiceBox] connect to %s host error %v ", host, err)
return
}
err = sb.srvLayer.onConnect(host, trans)
if err != nil {
return
}
}
func (sb *ServiceBox) onReceive(tSid string, trans *BoxChannel, err error) {
if err != nil {
sb.logger.Error("[ServiceBox] %s %v while receive message", tSid, err)
return
}
if sb.proxy != nil {
err := sb.rpc.OnProxyMessage(trans, sb.proxy)
if err != nil {
return
}
} else {
err := sb.rpc.OnMessage(trans, context.Background())
if err != nil {
return
}
}
}
func (sb *ServiceBox) onClosed(name string, trans *BoxChannel, _ error) {
//sb.srvLayer.onClose(name, )
sb.srvLayer.onClose(trans.peerHost)
if sb.proxy != nil {
sb.proxy.onClose(name, trans)
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。