代码拉取完成,页面将自动刷新
同步操作将从 Plato/Service-Box-go 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
package sbox
import (
"fmt"
"gitee.com/dennis-kk/rpc-go-backend/idlrpc"
"gitee.com/dennis-kk/rpc-go-backend/idlrpc/pkg/errors"
"gitee.com/dennis-kk/rpc-go-backend/idlrpc/pkg/protocol"
"gitee.com/dennis-kk/rpc-go-backend/idlrpc/pkg/transport"
"gitee.com/dennis-kk/service-box-go/util/config/reader"
perror "gitee.com/dennis-kk/service-box-go/util/errors"
"gitee.com/dennis-kk/service-box-go/util/http_proxy"
"gitee.com/dennis-kk/service-box-go/util/slog"
"gitee.com/dennis-kk/service-box-go/util/tools"
)
// Task types stored in proxyTask.taskType and dispatched by ProxyHandler.onTask.
const (
addInsideChan = iota // cache an inside service transport for an outside connection
transHttpCallFailed // report a failed http call back to the http proxy
)
type (
outsideTransMap map[protocol.GlobalIndexType]transport.ITransport // outside connection cache, keyed by global index
serviceTransMap map[uint64]transport.ITransport // inside service transports, keyed by service uuid
outsideRefTransMap map[protocol.GlobalIndexType]serviceTransMap // per outside connection cache of discovered services
insideCallMap map[uint32]*callInfo // inside call cache, keyed by replacement call id
// callInfo records a proxied rpc call sent from an inside service to an outside connection.
callInfo struct {
replaceCallId uint32 // call id placed in the request that is forwarded outside
originalID uint32 // call id used by the inside caller, restored in the response
inside transport.ITransport // transport of the inside caller
}
// proxyTask is a unit of work sent over ProxyHandler.ch and handled by onTask.
proxyTask struct {
taskType int
globalIndex protocol.GlobalIndexType
serviceUuid uint64
callId uint32
inside *BoxChannel
}
// proxyModeCfg holds the proxy mode settings loaded from yaml.
proxyModeCfg struct {
Address string `yaml:"host"`
ConnectSize uint32 `yaml:"cache_size"`
Binding bool `yaml:"binding"` // whether binding mode is enabled
}
// httpProxy extends the http proxy with a global index.
httpProxy struct {
*http_proxy.HttpProxy // http proxy
globalIndex protocol.GlobalIndexType // global index that stands for the http gateway as an outside connection
}
// ProxyHandler relays rpc messages between outside connections and inside services.
ProxyHandler struct {
gTransMap outsideTransMap // index of outside connections
gSrvTransCache outsideRefTransMap // services bound to outside connections, used in binding mode
gCallInfo insideCallMap // list of pending calls
cfg *proxyModeCfg // configuration
owner *ServiceBox // pointer to the owning box
replaceId uint32 // call id counter used to replace the id of calls that inside services send outside
httpHandle *httpProxy // handle of the http protocol gateway
ch chan *proxyTask
}
)
// newProxyHandler builds a ProxyHandler owned by box and configured by cfg.
// All lookup tables start empty, the replacement call id counter starts at 1
// and the task channel is buffered for 64 tasks.
func newProxyHandler(cfg *proxyModeCfg, box *ServiceBox) *ProxyHandler {
	handler := new(ProxyHandler)
	handler.cfg = cfg
	handler.owner = box
	handler.replaceId = 1
	handler.gTransMap = outsideTransMap{}
	handler.gSrvTransCache = outsideRefTransMap{}
	handler.gCallInfo = insideCallMap{}
	handler.ch = make(chan *proxyTask, 64)
	return handler
}
// preProcess passes the configured address through tools.ParseIp and stores
// the parsed result back into the config. It returns the parse error, if any,
// and leaves the address untouched in that case.
func (p *proxyModeCfg) preProcess() error {
	parsed, err := tools.ParseIp(p.Address)
	if err != nil {
		return err
	}
	p.Address = parsed
	return nil
}
// NewReplaceID advances the handler's call id counter by one and returns the
// new value. The counter is a plain field with no locking in this method.
func (p *ProxyHandler) NewReplaceID() uint32 {
	p.replaceId++
	return p.replaceId
}
// initHttpHandle creates the http gateway handle from cfg and registers
// onHttpCall as its request handler.
//
// The proxy is built and validated before it is stored, so a failed
// initialization leaves p.httpHandle nil instead of pointing at a wrapper
// with a nil embedded proxy (which the nil checks in startHttpHandle, onTick
// and shutdownHttpHandle would not catch).
func (p *ProxyHandler) initHttpHandle(cfg reader.Value) error {
	proxy := http_proxy.MakeHttpProxy(
		func(options *http_proxy.Options) {
			cfg.Scan(options)
		},
		http_proxy.WithCustomLogger(p.owner.logger),
	)
	if proxy == nil {
		return fmt.Errorf("init http proxy module error")
	}

	p.httpHandle = &httpProxy{
		HttpProxy:   proxy,
		globalIndex: protocol.GlobalIndexType(p.owner.idGen.New32UUID()),
	}
	p.httpHandle.SetHttpRequestHandle(p.onHttpCall)
	return nil
}
// startHttpHandle starts the http gateway and returns its start result.
// It is a no-op returning nil when the http module was never initialized.
func (p *ProxyHandler) startHttpHandle() error {
	if handle := p.httpHandle; handle != nil {
		return handle.Start()
	}
	return nil
}
// shutdownHttpHandle shuts the http gateway down; it does nothing when the
// http module was never initialized.
func (p *ProxyHandler) shutdownHttpHandle() {
	if handle := p.httpHandle; handle != nil {
		handle.ShutDown()
	}
}
// onAccept registers a newly accepted outside channel: it assigns the channel
// a freshly generated global index and stores it in the outside connection
// index under that key.
func (p *ProxyHandler) onAccept(name string, trans *BoxChannel) {
	index := protocol.GlobalIndexType(p.owner.idGen.New32UUID())
	trans.SetGlobalIndex(index)
	p.gTransMap[index] = trans
	p.owner.logger.Info("[Proxy] %s outside trans id: %v connect to server", name, index)
}
// onClose drops the state kept for a closed channel. Channels without a valid
// global index are ignored.
//
// Besides the bound service cache, the channel is also removed from the
// outside connection index; otherwise the entry stays in gTransMap until a
// later response happens to look it up and notices it is closed. The index
// entry is removed only when it still refers to this very channel.
func (p *ProxyHandler) onClose(name string, trans *BoxChannel) {
	index := trans.GlobalIndex()
	if index == idlrpc.InvalidGlobalIndex {
		return
	}
	delete(p.gSrvTransCache, index)
	if cur, ok := p.gTransMap[index]; ok && cur == transport.ITransport(trans) {
		delete(p.gTransMap, index)
	}
}
// OnRelay dispatches an incoming message from the given transport according
// to the message type in header. Unsupported types are logged and rejected
// with errors.ErrInvalidProto; an error from the selected handler is logged
// and returned to the caller.
func (p *ProxyHandler) OnRelay(from transport.ITransport, header *protocol.RpcMsgHeader) error {
	var relay func(transport.ITransport) error

	switch header.Type {
	case protocol.RequestMsg:
		relay = p.onRpcCall
	case protocol.ResponseMsg:
		relay = p.onRpcReturn
	case protocol.ProxyRequestMsg:
		relay = p.onProxyCall
	case protocol.ProxyResponseMsg:
		relay = p.onProxyReturn
	default:
		p.owner.logger.Warn("unsupported plato rpc message type from %d:%s ", from.GlobalIndex(), from.RemoteAddr())
		return errors.ErrInvalidProto
	}

	err := relay(from)
	if err != nil {
		slog.Warn("[Proxy] conn %s type %d trans error %v !", from.RemoteAddr(), header.Type, err)
	}
	return err
}
// onTick ticks the http gateway (when present) and then handles every task
// currently queued on the task channel, returning as soon as the channel is
// empty instead of blocking.
func (p *ProxyHandler) onTick() {
	// tick http requests
	if handle := p.httpHandle; handle != nil {
		handle.Tick()
	}

	// drain queued tasks
	for {
		select {
		case task := <-p.ch:
			p.onTask(task)
			continue
		default:
		}
		return
	}
}
// onTask handles one queued proxy task.
//
//   - addInsideChan stores the task's inside channel in the service cache of
//     the outside connection identified by the task's global index.
//   - transHttpCallFailed builds a body-less proxy response with error code
//     IDL_SERVICE_NOT_FOUND and hands it to the http gateway.
func (p *ProxyHandler) onTask(t *proxyTask) {
	switch t.taskType {
	case addInsideChan:
		cache, ok := p.gSrvTransCache[t.globalIndex]
		if !ok {
			cache = make(serviceTransMap)
			p.gSrvTransCache[t.globalIndex] = cache
		}
		cache[t.serviceUuid] = t.inside
	case transHttpCallFailed:
		// handle a failed http call
		if p.httpHandle == nil {
			// log the service uuid and the call id (the original logged the uuid twice)
			p.owner.logger.Warn("no effective http proxy, but received transHttpCallFailed %d:%d ", t.serviceUuid, t.callId)
			return
		}
		// build the response; it has no body, so Length is the header size only
		ret := &protocol.ProxyRespPackage{
			Header: &protocol.RpcProxyCallRetHeader{
				RpcMsgHeader: protocol.RpcMsgHeader{
					Length: uint32(protocol.ProxyRetHeadSize),
					Type:   protocol.ProxyResponseMsg,
				},
				CallID:      t.callId,
				ErrorCode:   protocol.IDL_SERVICE_NOT_FOUND,
				GlobalIndex: t.globalIndex,
			},
		}
		// best effort: nothing more can be done here if the gateway rejects it
		_ = p.httpHandle.OnRpcResponse(ret)
	}
}
// onRpcCall reads one rpc request (header and body) from an outside transport
// and forwards it to an inside service via outsideCallInside.
//
// It returns errors.ErrIllegalProto when the header or body cannot be read in
// full or when the declared length is smaller than the header itself, and
// errors.ErrIllegalReq when the header cannot be decoded.
func (p *ProxyHandler) onRpcCall(trans transport.ITransport) error {
	// read the fixed-size call header
	head := make([]byte, protocol.CallHeadSize)
	if n, err := trans.Read(head, protocol.CallHeadSize); err != nil || n != protocol.CallHeadSize {
		return errors.ErrIllegalProto
	}

	msgHeader := protocol.ReadCallHeader(head)
	if msgHeader == nil {
		return errors.ErrIllegalReq
	}

	// Length comes from the peer; a value below the header size would make
	// the body length negative and panic in make.
	bodyLen := int(msgHeader.Length) - protocol.CallHeadSize
	if bodyLen < 0 {
		return errors.ErrIllegalProto
	}

	body := make([]byte, bodyLen)
	n, err := trans.Read(body, bodyLen)
	if err != nil || n != bodyLen {
		return errors.ErrIllegalProto
	}
	return p.outsideCallInside(trans, msgHeader, body)
}
// onRpcReturn reads one rpc response (header and body) from an outside
// transport and forwards it to the waiting inside caller via
// outsideReturnInside.
//
// It returns errors.ErrIllegalProto when the header or body cannot be read in
// full or when the declared length is smaller than the header itself, and
// errors.ErrIllegalReq when the header cannot be decoded.
func (p *ProxyHandler) onRpcReturn(from transport.ITransport) error {
	// read the fixed-size response header
	head := make([]byte, protocol.RespHeadSize)
	if n, err := from.Read(head, protocol.RespHeadSize); err != nil || n != protocol.RespHeadSize {
		return errors.ErrIllegalProto
	}

	msgHeader := protocol.ReadRetHeader(head)
	if msgHeader == nil {
		return errors.ErrIllegalReq
	}

	// Length comes from the peer; a value below the header size would make
	// the body length negative and panic in make.
	bodyLen := int(msgHeader.Length) - protocol.RespHeadSize
	if bodyLen < 0 {
		return errors.ErrIllegalProto
	}

	body := make([]byte, bodyLen)
	n, err := from.Read(body, bodyLen)
	if err != nil || n != bodyLen {
		return errors.ErrIllegalProto
	}
	return p.outsideReturnInside(from, msgHeader, body)
}
// onProxyCall reads one proxy request (header and body) sent by an inside
// service and forwards it to the targeted outside connection via
// insideCallOutSide. Requests addressed to the http gateway's global index are
// rejected: an IDL_SERVICE_ERROR response is sent back and
// errors.ErrIllegalReq is returned.
//
// It returns errors.ErrIllegalProto when the header or body cannot be read in
// full or when the declared length is smaller than the header itself, and
// errors.ErrIllegalReq when the header cannot be decoded.
func (p *ProxyHandler) onProxyCall(from transport.ITransport) error {
	// read the fixed-size proxy call header
	head := make([]byte, protocol.ProxyCallHeadSize)
	if n, err := from.Read(head, protocol.ProxyCallHeadSize); err != nil || n != protocol.ProxyCallHeadSize {
		return errors.ErrIllegalProto
	}

	msgHeader := protocol.ReadProxyCallHeader(head)
	if msgHeader == nil {
		return errors.ErrIllegalReq
	}

	// Length comes from the peer; a value below the header size would make
	// the body length negative and panic in make.
	bodyLen := int(msgHeader.Length) - protocol.ProxyCallHeadSize
	if bodyLen < 0 {
		return errors.ErrIllegalProto
	}

	data := make([]byte, bodyLen)
	n, err := from.Read(data, bodyLen)
	if err != nil || n != bodyLen {
		return errors.ErrIllegalProto
	}

	// intercept inside clients that try to call the http client
	if p.httpHandle != nil && p.httpHandle.globalIndex == msgHeader.GlobalIndex {
		p.outsideReturnError(from, msgHeader, protocol.IDL_SERVICE_ERROR)
		p.owner.logger.Error("inside service can't call http client call id %d", msgHeader.CallID)
		return errors.ErrIllegalReq
	}
	return p.insideCallOutSide(from, msgHeader, data)
}
// onProxyReturn reads one proxy response (header and body) sent by an inside
// service. Responses carrying the http gateway's global index are handed to
// the http gateway; all others are forwarded to the outside connection via
// insideReturnOutside.
//
// It returns errors.ErrIllegalProto when the header or body cannot be read in
// full or when the declared length is smaller than the header itself, and
// errors.ErrIllegalReq when the header cannot be decoded.
func (p *ProxyHandler) onProxyReturn(from transport.ITransport) error {
	// read the fixed-size proxy response header
	head := make([]byte, protocol.ProxyRetHeadSize)
	if n, err := from.Read(head, protocol.ProxyRetHeadSize); err != nil || n != protocol.ProxyRetHeadSize {
		return errors.ErrIllegalProto
	}

	msgHeader := protocol.ReadProxyRetHeader(head)
	if msgHeader == nil {
		return errors.ErrIllegalReq
	}

	// Length comes from the peer; a value below the header size would make
	// the body length negative and panic in make.
	bodyLen := int(msgHeader.Length) - protocol.ProxyRetHeadSize
	if bodyLen < 0 {
		return errors.ErrIllegalProto
	}

	retMsg := &protocol.ProxyRespPackage{
		Header: msgHeader,
		Buffer: make([]byte, bodyLen),
	}
	n, err := from.Read(retMsg.Buffer, bodyLen)
	if err != nil || n != bodyLen {
		return errors.ErrIllegalProto
	}

	if p.httpHandle != nil && p.httpHandle.globalIndex == retMsg.Header.GlobalIndex {
		// http call: hand over to the http gateway
		return p.httpHandle.OnRpcResponse(retMsg)
	}
	// rpc call: forward to the outside connection
	return p.insideReturnOutside(from, retMsg.Header, retMsg.Buffer)
}
// onHttpCall is the request handler registered on the http gateway. It stamps
// the request with a new call id and the gateway's global index, then looks up
// the target service and sends the request in a separate goroutine. The new
// call id is returned immediately.
//
// Any failure in the goroutine (service not found, send error) is reported
// through a transHttpCallFailed task; the task carries the gateway's global
// index so the failure response built in onTask is attributed to the gateway.
func (p *ProxyHandler) onHttpCall(req *protocol.ProxyRequestPackage) (uint32, error) {
	// generate the call id
	callId := p.NewReplaceID()
	// assign the global index
	req.Header.GlobalIndex = p.httpHandle.globalIndex
	req.Header.CallID = callId

	// service discovery runs in its own goroutine
	go func() {
		reportFailure := func() {
			p.ch <- &proxyTask{
				taskType:    transHttpCallFailed,
				globalIndex: req.Header.GlobalIndex,
				serviceUuid: req.Header.ServiceUUID,
				callId:      req.Header.CallID,
			}
		}

		to, err := p.owner.GetTransport(req.Header.ServiceUUID)
		// no matching service, lookup failed
		if err != nil || to == nil {
			reportFailure()
			slog.Warn("[Http] http call %d service %d failed ! ", req.Header.CallID, req.Header.ServiceUUID)
			return
		}

		// pack the message and send it
		buffer, _ := protocol.PackProxyReqMsg(req)
		if err := to.Send(buffer); err != nil {
			// without this the http caller would never get an answer
			reportFailure()
			slog.Warn("[Http] http call %d send to service %d error %v !", req.Header.CallID, req.Header.ServiceUUID, err)
		}
	}()
	return callId, nil
}
// outsideCallInside forwards a call from an outside connection to an inside
// service. In binding mode the service transport cached for this connection is
// tried first and discovery runs only when nothing is cached; in non-binding
// mode discovery runs for every call.
//
// On the cached path a forwarding error is answered with a not-found response
// and returned. Discovery and forwarding on the uncached path run in a
// separate goroutine, so this function returns nil right away in that case;
// failures there are answered with a not-found response and logged.
func (p *ProxyHandler) outsideCallInside(from transport.ITransport, header *protocol.RpcCallHeader, data []byte) error {
	// replyNotFound tells the outside caller that the service is unavailable.
	replyNotFound := func() {
		resp := protocol.BuildNotFound(header)
		pkg, _ := protocol.PackRespMsg(resp)
		// best effort: the caller may already be gone
		_ = from.Send(pkg)
	}

	// binding mode: try the cache first, otherwise always rediscover
	if p.cfg.Binding {
		if cached := p.tryGetService(from.GlobalIndex(), header.ServiceUUID); cached != nil {
			if err := p.transProxyCall(from, cached, header, data); err != nil {
				// corner case: the peer closed right after the gateway fetched the proxy
				replyNotFound()
				slog.Warn("[Proxy] outside conn %v : trans call %d to service %d error !", from.GlobalIndex(), header.CallID, header.ServiceUUID)
				return err
			}
			return nil
		}
	}

	go func() {
		to, err := p.owner.GetTransport(header.ServiceUUID)
		// a nil transport is treated like a lookup error, as in onHttpCall
		if err != nil || to == nil {
			replyNotFound()
			slog.Warn("[Proxy] outside conn %v proxy find service %d error ! ", from.GlobalIndex(), header.ServiceUUID)
			return
		}

		if err = p.transProxyCall(from, to, header, data); err != nil {
			// answer the caller here too, matching the cached path
			replyNotFound()
			slog.Warn("[Proxy] outside conn %v send call %d to service %d error %v !", from.GlobalIndex(), header.CallID, header.ServiceUUID, err)
			return
		}

		if p.cfg.Binding {
			// binding mode: hand the transport to the task channel for caching,
			// but only after it proved usable
			p.ch <- &proxyTask{
				taskType:    addInsideChan,
				globalIndex: from.GlobalIndex(),
				serviceUuid: header.ServiceUUID,
				inside:      to,
			}
		}
	}()
	return nil
}
// insideReturnOutside converts a proxy response from an inside service into a
// plain rpc response and sends it to the outside connection named by the
// header's global index.
//
// It returns perror.ChannelNotFound when no such connection is indexed and
// perror.ChannelHasClosed (after dropping the index entry) when the connection
// is closed; otherwise it returns the result of the send.
func (p *ProxyHandler) insideReturnOutside(from transport.ITransport, header *protocol.RpcProxyCallRetHeader, data []byte) error {
	// look up the outside connection
	outside, found := p.gTransMap[header.GlobalIndex]
	switch {
	case !found:
		p.owner.logger.Warn("[Proxy] can't found outside connection by %v ", header.GlobalIndex)
		return perror.ChannelNotFound
	case outside.IsClose():
		// drop the stale entry
		delete(p.gTransMap, header.GlobalIndex)
		p.owner.logger.Warn("target channel %d:%s has benn closed while trans %d !", outside.GlobalIndex(), outside.RemoteAddr(), header.CallID)
		return perror.ChannelHasClosed
	}

	// swap the proxy response header size for the plain response header size
	respHeader := &protocol.RpcCallRetHeader{
		RpcMsgHeader: protocol.RpcMsgHeader{
			Length: header.Length - uint32(protocol.ProxyRetHeadSize) + uint32(protocol.RespHeadSize),
			Type:   protocol.ResponseMsg,
		},
		ServerID:  header.ServerID,
		CallID:    header.CallID,
		ErrorCode: header.ErrorCode,
	}
	pkg, _ := protocol.PackRespMsg(&protocol.ResponsePackage{
		Header: respHeader,
		Buffer: data,
	})
	return outside.Send(pkg)
}
// insideCallOutSide converts a proxy request from an inside service into a
// plain rpc request and sends it to the outside connection named by the
// header's global index. The call id is replaced by a fresh one; unless the
// call is one-way, the mapping back to the original id and caller is recorded
// in gCallInfo.
//
// When the outside connection is unknown or closed, an IDL_SERVICE_NOT_FOUND
// response is sent to the caller and perror.ChannelNotFound or
// perror.ChannelHasClosed is returned. When the send fails, the recorded call
// info is removed again so it does not leak, and the send error is returned.
func (p *ProxyHandler) insideCallOutSide(from transport.ITransport, header *protocol.RpcProxyCallHeader, data []byte) error {
	to, ok := p.gTransMap[header.GlobalIndex]
	if !ok {
		p.outsideReturnError(from, header, protocol.IDL_SERVICE_NOT_FOUND)
		p.owner.logger.Error(" service conn %d:%q call outside %d service %d method %d, but it has been closed ! ", from.GlobalIndex(), from.RemoteAddr(), header.GlobalIndex, header.ServiceUUID, header.MethodID)
		return perror.ChannelNotFound
	}
	if to.IsClose() {
		delete(p.gTransMap, header.GlobalIndex)
		p.outsideReturnError(from, header, protocol.IDL_SERVICE_NOT_FOUND)
		p.owner.logger.Error(" service conn %d:%q call outside %d service %d method %d, but it has been closed ! ", from.GlobalIndex(), from.RemoteAddr(), header.GlobalIndex, header.ServiceUUID, header.MethodID)
		return perror.ChannelHasClosed
	}

	info := &callInfo{
		replaceCallId: p.NewReplaceID(),
		originalID:    header.CallID,
		inside:        from,
	}
	msg := &protocol.RequestPackage{
		Header: &protocol.RpcCallHeader{
			RpcMsgHeader: protocol.RpcMsgHeader{
				// swap the proxy call header size for the plain call header size
				Length: header.Length - uint32(protocol.ProxyCallHeadSize) + uint32(protocol.CallHeadSize),
				Type:   protocol.RequestMsg,
			},
			ServiceUUID: header.ServiceUUID,
			ServerID:    header.ServerID,
			CallID:      info.replaceCallId,
			MethodID:    header.MethodID,
		},
		Buffer: data,
	}
	if header.OneWay == 0 {
		// record call info so the response can be routed back
		p.gCallInfo[info.replaceCallId] = info
		// TODO add timer ticker
	}

	// send the packed request to the outside connection
	pack, _ := protocol.PackReqMsg(msg)
	if err := to.Send(pack); err != nil {
		// no response can arrive for a request that was never sent
		delete(p.gCallInfo, info.replaceCallId)
		return err
	}
	return nil
}
// outsideReturnInside routes a response from an outside connection back to
// the inside service that issued the call. The pending call is looked up by
// the response's call id, the original call id is restored and the response
// is sent as a proxy response.
//
// Unknown call ids are logged and ignored (nil is returned). The pending call
// entry is removed as soon as it has been found, so it is also released when
// the inside caller has closed in the meantime.
func (p *ProxyHandler) outsideReturnInside(from transport.ITransport, header *protocol.RpcCallRetHeader, data []byte) error {
	info, ok := p.gCallInfo[header.CallID]
	if !ok {
		// call info has timed out
		p.owner.logger.Info("[Proxy] proxy call %d return inside has been time out !", header.CallID)
		return nil
	}
	// the call is answered now, whatever happens below
	delete(p.gCallInfo, header.CallID)

	if info.inside.IsClose() {
		return nil
	}

	msg := &protocol.ProxyRespPackage{
		Header: &protocol.RpcProxyCallRetHeader{
			RpcMsgHeader: protocol.RpcMsgHeader{
				// swap the plain response header size for the proxy response header size
				Length: header.Length - uint32(protocol.RespHeadSize) + uint32(protocol.ProxyRetHeadSize),
				Type:   protocol.ProxyResponseMsg,
			},
			ServerID:    header.ServerID,
			CallID:      info.originalID,
			ErrorCode:   header.ErrorCode,
			GlobalIndex: from.GlobalIndex(),
		},
		Buffer: data,
	}
	pkg, _ := protocol.PackProxyRespMsg(msg)
	return info.inside.Send(pkg)
}
// tryGetService returns the cached inside transport for the given outside
// connection and service uuid, or nil when nothing usable is cached. A cached
// transport that reports itself closed is evicted and nil is returned.
func (p *ProxyHandler) tryGetService(globalIndex protocol.GlobalIndexType, uuid uint64) transport.ITransport {
	if bound, ok := p.gSrvTransCache[globalIndex]; ok {
		if cached, hit := bound[uuid]; hit {
			if !cached.IsClose() {
				return cached
			}
			delete(bound, uuid)
		}
	}
	return nil
}
// transProxyCall wraps an outside rpc call into a proxy request whose header
// is built from the call header and the sender's global index, and sends it to
// the inside transport. It returns the result of the send.
func (p *ProxyHandler) transProxyCall(from, to transport.ITransport, header *protocol.RpcCallHeader, data []byte) error {
	pkg, _ := protocol.PackProxyReqMsg(&protocol.ProxyRequestPackage{
		Header: protocol.BuildProxyCallHeader(header, from.GlobalIndex()),
		Buffer: data,
	})
	return to.Send(pkg)
}
// outsideReturnError answers a proxy call from an inside service with a
// body-less proxy response carrying errorCode. A send failure is logged and
// otherwise ignored.
//
// The response has no buffer, so its Length is the proxy response header size
// only, the same as the body-less failure response built in onTask. The
// original derived Length from the request length, which counted request body
// bytes that are never sent.
func (p *ProxyHandler) outsideReturnError(from transport.ITransport, header *protocol.RpcProxyCallHeader, errorCode uint32) {
	msg := &protocol.ProxyRespPackage{
		Header: &protocol.RpcProxyCallRetHeader{
			RpcMsgHeader: protocol.RpcMsgHeader{
				Length: uint32(protocol.ProxyRetHeadSize),
				Type:   protocol.ProxyResponseMsg,
			},
			ServerID:  header.ServerID,
			CallID:    header.CallID,
			ErrorCode: errorCode,
			// NOTE(review): this is the inside caller's own index, not header.GlobalIndex — confirm which one the receiver expects
			GlobalIndex: from.GlobalIndex(),
		},
	}
	pkg, _ := protocol.PackProxyRespMsg(msg)
	if err := from.Send(pkg); err != nil {
		slog.Warn("send return info to %s error %v", from.RemoteAddr(), err)
	}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。