1 Star 0 Fork 4

jiaoyuedave/Service-Box-go

forked from Plato/Service-Box-go 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
proxy_handle.go 16.29 KB
一键复制 编辑 原始数据 按行查看 历史
CloudGuan 提交于 2022-12-14 21:07 . bug: 合并错误,代码编译报错
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571
package sbox
import (
"fmt"
"gitee.com/dennis-kk/rpc-go-backend/idlrpc"
"gitee.com/dennis-kk/rpc-go-backend/idlrpc/pkg/errors"
"gitee.com/dennis-kk/rpc-go-backend/idlrpc/pkg/protocol"
"gitee.com/dennis-kk/rpc-go-backend/idlrpc/pkg/transport"
"gitee.com/dennis-kk/service-box-go/util/config/reader"
perror "gitee.com/dennis-kk/service-box-go/util/errors"
"gitee.com/dennis-kk/service-box-go/util/http_proxy"
"gitee.com/dennis-kk/service-box-go/util/slog"
"gitee.com/dennis-kk/service-box-go/util/tools"
)
// Task kinds stored in proxyTask.taskType and dispatched by onTask.
const (
// addInsideChan: cache an inside service transport for an outside connection.
addInsideChan = iota
// transHttpCallFailed: report a failed HTTP-originated call back to the http proxy.
transHttpCallFailed
)
// Types used by the proxy handler: lookup tables for transports and pending
// calls, the task record passed through ProxyHandler.ch, the proxy-mode
// configuration, and the handler itself.
type (
outsideTransMap map[protocol.GlobalIndexType]transport.ITransport // outside connections keyed by global index
serviceTransMap map[uint64]transport.ITransport // inside service transports keyed by service uuid
outsideRefTransMap map[protocol.GlobalIndexType]serviceTransMap // per outside connection: cache of discovered service transports
insideCallMap map[uint32]*callInfo // pending inside-to-outside calls keyed by replacement call id
// callInfo records one call proxied from an inside service to an outside connection.
callInfo struct {
replaceCallId uint32 // call id put on the request sent to the outside connection
originalID uint32 // call id the inside caller used; restored on the response
inside transport.ITransport // transport of the inside caller, used to deliver the response
}
// proxyTask is a unit of work queued on ProxyHandler.ch and handled by onTask.
proxyTask struct {
taskType int // addInsideChan or transHttpCallFailed
globalIndex protocol.GlobalIndexType // global index the task refers to
serviceUuid uint64 // uuid of the service involved
callId uint32 // call id (used by transHttpCallFailed)
inside *BoxChannel // inside service channel to cache (used by addInsideChan)
}
// proxyModeCfg is the proxy-mode configuration, populated from yaml.
proxyModeCfg struct {
Address string `yaml:"host"` // address; rewritten by preProcess with the result of tools.ParseIp
ConnectSize uint32 `yaml:"cache_size"` // yaml key cache_size; not read anywhere in this file
Binding bool `yaml:"binding"` // whether binding mode is enabled
}
// httpProxy extends the http proxy with the global index that identifies it.
httpProxy struct {
*http_proxy.HttpProxy // embedded http proxy
globalIndex protocol.GlobalIndexType // global index standing in for an outside connection
}
// ProxyHandler relays rpc traffic between outside connections and inside services.
ProxyHandler struct {
gTransMap outsideTransMap // index of outside connections
gSrvTransCache outsideRefTransMap // per outside connection cache of bound services, used in binding mode
gCallInfo insideCallMap // pending call table
cfg *proxyModeCfg // configuration
owner *ServiceBox // owning service box
replaceId uint32 // counter for call ids that replace the ids of inside-to-outside requests
httpHandle *httpProxy // http gateway handle; nil until initHttpHandle succeeds
ch chan *proxyTask // tasks posted by goroutines and drained by onTick
}
)
// newProxyHandler builds a ProxyHandler owned by box and configured by cfg.
// All lookup tables start empty, the task queue is buffered for 64 entries and
// the replace-id counter starts at 1. The http handle stays nil until
// initHttpHandle is called.
func newProxyHandler(cfg *proxyModeCfg, box *ServiceBox) *ProxyHandler {
	handler := new(ProxyHandler)
	handler.cfg = cfg
	handler.owner = box
	handler.replaceId = 1
	handler.gTransMap = outsideTransMap{}
	handler.gSrvTransCache = outsideRefTransMap{}
	handler.gCallInfo = insideCallMap{}
	handler.ch = make(chan *proxyTask, 64)
	return handler
}
// preProcess passes the configured address through tools.ParseIp. On success
// Address is replaced with the parsed value; on failure the error is returned
// and Address is left unchanged.
func (p *proxyModeCfg) preProcess() error {
	parsed, err := tools.ParseIp(p.Address)
	if err != nil {
		return err
	}
	p.Address = parsed
	return nil
}
// NewReplaceID advances the handler's replace-id counter by one and returns
// the new value. The increment is a plain (non-atomic) field update.
func (p *ProxyHandler) NewReplaceID() uint32 {
	p.replaceId++
	return p.replaceId
}
// initHttpHandle creates the http gateway handle from cfg and registers
// onHttpCall as its request handler.
//
// The proxy is validated before it is stored: if MakeHttpProxy returns nil an
// error is returned and p.httpHandle is left untouched, so the nil checks in
// startHttpHandle, shutdownHttpHandle and onTick never see a wrapper whose
// embedded proxy is nil.
func (p *ProxyHandler) initHttpHandle(cfg reader.Value) error {
	proxy := http_proxy.MakeHttpProxy(
		func(options *http_proxy.Options) {
			// Fill the proxy options from the configuration value.
			cfg.Scan(options)
		},
		http_proxy.WithCustomLogger(p.owner.logger))
	if proxy == nil {
		return fmt.Errorf("init http proxy module error")
	}

	p.httpHandle = &httpProxy{
		HttpProxy: proxy,
		// The http gateway gets its own global index so that replies addressed
		// to it can be told apart from replies for outside connections.
		globalIndex: protocol.GlobalIndexType(p.owner.idGen.New32UUID()),
	}
	p.httpHandle.SetHttpRequestHandle(p.onHttpCall)
	return nil
}
// startHttpHandle starts the http gateway and returns its Start result. When
// the http module was never initialised it does nothing and returns nil.
func (p *ProxyHandler) startHttpHandle() error {
	if p.httpHandle != nil {
		return p.httpHandle.Start()
	}
	// http module not initialised
	return nil
}
// shutdownHttpHandle shuts the http gateway down; it is a no-op when the http
// module was never initialised.
func (p *ProxyHandler) shutdownHttpHandle() {
	if p.httpHandle != nil {
		p.httpHandle.ShutDown()
	}
}
// onAccept registers a newly accepted channel: it draws a fresh global index
// from the owner's id generator, stores the channel under that index in the
// outside connection table and stamps the index on the channel.
func (p *ProxyHandler) onAccept(name string, trans *BoxChannel) {
	index := protocol.GlobalIndexType(p.owner.idGen.New32UUID())
	p.gTransMap[index] = trans
	trans.SetGlobalIndex(index)
	p.owner.logger.Info("[Proxy] %s outside trans id: %v connect to server", name, index)
}
// onClose releases everything the handler keeps for a closed channel.
//
// Besides the binding-mode service cache it also removes the channel from the
// outside connection table that onAccept populated; without that the entry
// lingers until a later reply happens to find the transport closed. The table
// entry is only removed when it still refers to this very channel, so closing
// an unrelated channel can never evict a live connection. Channels without a
// valid global index are ignored.
func (p *ProxyHandler) onClose(name string, trans *BoxChannel) {
	index := trans.GlobalIndex()
	if index == idlrpc.InvalidGlobalIndex {
		return
	}
	if current, ok := p.gTransMap[index]; ok && current == trans {
		delete(p.gTransMap, index)
	}
	delete(p.gSrvTransCache, index)
}
// OnRelay dispatches an incoming message to the reader that matches its
// header type: plain request/response or proxy request/response. Unknown
// types are logged and rejected with ErrInvalidProto. An error from the
// selected reader is logged and then returned to the caller.
func (p *ProxyHandler) OnRelay(from transport.ITransport, header *protocol.RpcMsgHeader) error {
	var relay func(transport.ITransport) error

	switch header.Type {
	case protocol.RequestMsg:
		relay = p.onRpcCall
	case protocol.ResponseMsg:
		relay = p.onRpcReturn
	case protocol.ProxyRequestMsg:
		relay = p.onProxyCall
	case protocol.ProxyResponseMsg:
		relay = p.onProxyReturn
	default:
		p.owner.logger.Warn("unsupported plato rpc message type from %d:%s ", from.GlobalIndex(), from.RemoteAddr())
		return errors.ErrInvalidProto
	}

	err := relay(from)
	if err != nil {
		slog.Warn("[Proxy] conn %s type %d trans error %v !", from.RemoteAddr(), header.Type, err)
	}
	return err
}
// onTick performs one round of periodic work: it ticks the http gateway (when
// present) and then handles every task currently queued on p.ch, returning as
// soon as the queue is empty instead of blocking.
func (p *ProxyHandler) onTick() {
	// Tick pending http requests.
	if handle := p.httpHandle; handle != nil {
		handle.Tick()
	}

	// Drain queued tasks without blocking.
	for {
		select {
		case task := <-p.ch:
			p.onTask(task)
			continue
		default:
		}
		return
	}
}
// onTask handles one task taken from the queue.
//
//   - addInsideChan: remember t.inside as the transport for service
//     t.serviceUuid on behalf of outside connection t.globalIndex, creating the
//     per-connection cache on first use.
//   - transHttpCallFailed: build a body-less proxy response carrying
//     IDL_SERVICE_NOT_FOUND for call t.callId and hand it to the http proxy.
//
// Other task types are ignored.
func (p *ProxyHandler) onTask(t *proxyTask) {
	switch t.taskType {
	case addInsideChan:
		// NOTE(review): the cache is (re)created even if the outside connection
		// was closed while the task was queued — confirm whether that can leak.
		cache, ok := p.gSrvTransCache[t.globalIndex]
		if !ok {
			cache = make(serviceTransMap)
		}
		cache[t.serviceUuid] = t.inside
		p.gSrvTransCache[t.globalIndex] = cache
	case transHttpCallFailed:
		// Report the failed http call.
		if p.httpHandle == nil {
			// Log call id and service uuid (the uuid used to be printed twice).
			p.owner.logger.Warn("no effective http proxy, but received transHttpCallFailed %d:%d ", t.callId, t.serviceUuid)
			return
		}
		// Build the error response; it has no body, so Length is the header size.
		ret := &protocol.ProxyRespPackage{
			Header: &protocol.RpcProxyCallRetHeader{
				RpcMsgHeader: protocol.RpcMsgHeader{
					Length: uint32(protocol.ProxyRetHeadSize),
					Type:   protocol.ProxyResponseMsg,
				},
				CallID:      t.callId,
				ErrorCode:   protocol.IDL_SERVICE_NOT_FOUND,
				GlobalIndex: t.globalIndex,
			},
		}
		// Delivery is best effort, but a failure is worth a log line.
		if err := p.httpHandle.OnRpcResponse(ret); err != nil {
			p.owner.logger.Warn("[Http] report failed call %d service %d to http proxy error %v", t.callId, t.serviceUuid, err)
		}
	}
}
// onRpcCall reads one plain rpc request (header, then body) from trans and
// forwards it to an inside service through outsideCallInside.
//
// It returns ErrIllegalProto when the header or body cannot be read in full or
// when the declared length is smaller than the header itself, and
// ErrIllegalReq when the header cannot be decoded.
func (p *ProxyHandler) onRpcCall(trans transport.ITransport) error {
	// Read the fixed-size call header.
	head := make([]byte, protocol.CallHeadSize)
	if n, err := trans.Read(head, protocol.CallHeadSize); n != protocol.CallHeadSize || err != nil {
		return errors.ErrIllegalProto
	}

	// Decode the protocol header.
	msgHeader := protocol.ReadCallHeader(head)
	if msgHeader == nil {
		return errors.ErrIllegalReq
	}

	// Length is supplied by the peer; a value below the header size would give
	// a negative body length and make the allocation below panic.
	bodyLen := int(msgHeader.Length) - protocol.CallHeadSize
	if bodyLen < 0 {
		return errors.ErrIllegalProto
	}

	body := make([]byte, bodyLen)
	n, err := trans.Read(body, bodyLen)
	if err != nil || n != bodyLen {
		return errors.ErrIllegalProto
	}
	return p.outsideCallInside(trans, msgHeader, body)
}
// onRpcReturn reads one plain rpc response (header, then body) from from and
// routes it back to the inside caller through outsideReturnInside.
//
// It returns ErrIllegalProto when the header or body cannot be read in full or
// when the declared length is smaller than the header itself, and
// ErrIllegalReq when the header cannot be decoded.
func (p *ProxyHandler) onRpcReturn(from transport.ITransport) error {
	// Read the fixed-size response header.
	head := make([]byte, protocol.RespHeadSize)
	if n, err := from.Read(head, protocol.RespHeadSize); n != protocol.RespHeadSize || err != nil {
		return errors.ErrIllegalProto
	}

	// Decode the protocol header.
	msgHeader := protocol.ReadRetHeader(head)
	if msgHeader == nil {
		return errors.ErrIllegalReq
	}

	// Length is supplied by the peer; a value below the header size would give
	// a negative body length and make the allocation below panic.
	bodyLen := int(msgHeader.Length) - protocol.RespHeadSize
	if bodyLen < 0 {
		return errors.ErrIllegalProto
	}

	body := make([]byte, bodyLen)
	n, err := from.Read(body, bodyLen)
	if err != nil || n != bodyLen {
		return errors.ErrIllegalProto
	}
	return p.outsideReturnInside(from, msgHeader, body)
}
// onProxyCall reads one proxy request (header, then body) sent by an inside
// service and forwards it to the addressed outside connection through
// insideCallOutSide.
//
// A request addressed to the http gateway's global index is refused: the
// caller gets an IDL_SERVICE_ERROR reply and ErrIllegalReq is returned.
// ErrIllegalProto is returned when the header or body cannot be read in full
// or when the declared length is smaller than the header itself;
// ErrIllegalReq when the header cannot be decoded.
func (p *ProxyHandler) onProxyCall(from transport.ITransport) error {
	// Read the fixed-size proxy call header.
	head := make([]byte, protocol.ProxyCallHeadSize)
	if n, err := from.Read(head, protocol.ProxyCallHeadSize); n != protocol.ProxyCallHeadSize || err != nil {
		return errors.ErrIllegalProto
	}

	// Decode the protocol header.
	msgHeader := protocol.ReadProxyCallHeader(head)
	if msgHeader == nil {
		return errors.ErrIllegalReq
	}

	// Length is supplied by the peer; a value below the header size would give
	// a negative body length and make the allocation below panic.
	bodyLen := int(msgHeader.Length) - protocol.ProxyCallHeadSize
	if bodyLen < 0 {
		return errors.ErrIllegalProto
	}

	data := make([]byte, bodyLen)
	n, err := from.Read(data, bodyLen)
	if err != nil || n != bodyLen {
		return errors.ErrIllegalProto
	}

	// Intercept inside services trying to call the http client.
	if p.httpHandle != nil && p.httpHandle.globalIndex == msgHeader.GlobalIndex {
		p.outsideReturnError(from, msgHeader, protocol.IDL_SERVICE_ERROR)
		p.owner.logger.Error("inside service can't call http client call id %d", msgHeader.CallID)
		return errors.ErrIllegalReq
	}
	return p.insideCallOutSide(from, msgHeader, data)
}
// onProxyReturn reads one proxy response (header, then body) sent by an
// inside service and delivers it either to the http gateway, when the
// response is addressed to the gateway's global index, or to the outside
// connection through insideReturnOutside.
//
// ErrIllegalProto is returned when the header or body cannot be read in full
// or when the declared length is smaller than the header itself;
// ErrIllegalReq when the header cannot be decoded.
func (p *ProxyHandler) onProxyReturn(from transport.ITransport) error {
	// Read the fixed-size proxy response header.
	head := make([]byte, protocol.ProxyRetHeadSize)
	if n, err := from.Read(head, protocol.ProxyRetHeadSize); n != protocol.ProxyRetHeadSize || err != nil {
		return errors.ErrIllegalProto
	}

	// Decode the protocol header.
	msgHeader := protocol.ReadProxyRetHeader(head)
	if msgHeader == nil {
		return errors.ErrIllegalReq
	}

	// Length is supplied by the peer; a value below the header size would give
	// a negative body length and make the allocation below panic.
	bodyLen := int(msgHeader.Length) - protocol.ProxyRetHeadSize
	if bodyLen < 0 {
		return errors.ErrIllegalProto
	}

	retMsg := &protocol.ProxyRespPackage{
		Header: msgHeader,
		Buffer: make([]byte, bodyLen),
	}
	n, err := from.Read(retMsg.Buffer, bodyLen)
	if err != nil || n != bodyLen {
		return errors.ErrIllegalProto
	}

	if p.httpHandle != nil && p.httpHandle.globalIndex == retMsg.Header.GlobalIndex {
		// Reply to an http call: hand it to the http proxy.
		return p.httpHandle.OnRpcResponse(retMsg)
	}
	// Reply to an rpc call: send it to the outside connection.
	return p.insideReturnOutside(from, retMsg.Header, retMsg.Buffer)
}
// onHttpCall is the request handler registered with the http proxy. It
// assigns a fresh call id and the gateway's global index to req, then
// resolves the target service and forwards the request from a separate
// goroutine. The call id is returned immediately; the error is always nil.
//
// When the service cannot be resolved, or the request cannot be sent to it, a
// transHttpCallFailed task is queued so that onTask answers the http caller
// with an error instead of leaving the call unanswered. The task carries the
// gateway's global index so the synthesized response is addressed like a
// regular reply.
func (p *ProxyHandler) onHttpCall(req *protocol.ProxyRequestPackage) (uint32, error) {
	// Allocate the call id and stamp the request.
	callId := p.NewReplaceID()
	index := p.httpHandle.globalIndex
	req.Header.GlobalIndex = index
	req.Header.CallID = callId

	// Service discovery may block, so it runs in its own goroutine.
	go func() {
		reportFailure := func() {
			p.ch <- &proxyTask{
				taskType:    transHttpCallFailed,
				globalIndex: index,
				serviceUuid: req.Header.ServiceUUID,
				callId:      req.Header.CallID,
			}
		}

		to, err := p.owner.GetTransport(req.Header.ServiceUUID)
		// Service not found: lookup failed.
		if err != nil || to == nil {
			reportFailure()
			slog.Warn("[Http] http call %d service %d failed ! ", req.Header.CallID, req.Header.ServiceUUID)
			return
		}

		// Pack the request and send it to the service.
		buffer, _ := protocol.PackProxyReqMsg(req)
		if sendErr := to.Send(buffer); sendErr != nil {
			reportFailure()
			slog.Warn("[Http] http call %d send to service %d error %v", req.Header.CallID, req.Header.ServiceUUID, sendErr)
		}
	}()
	return callId, nil
}
// outsideCallInside forwards a request from an outside connection to an
// inside service.
//
// In binding mode service discovery happens only for the first call of a
// connection to a service; later calls reuse the cached transport. In
// non-binding mode every call goes through discovery.
//
// With a cached transport the request is forwarded synchronously and a
// forwarding error is returned. Otherwise discovery and forwarding run in a
// goroutine and nil is returned immediately. On every failure path the
// outside caller receives a not-found response. In binding mode the
// discovered transport is queued for caching only after the request was sent
// successfully.
func (p *ProxyHandler) outsideCallInside(from transport.ITransport, header *protocol.RpcCallHeader, data []byte) error {
	// replyNotFound tells the outside caller that the service is unavailable.
	replyNotFound := func() {
		resp := protocol.BuildNotFound(header)
		pkg, _ := protocol.PackRespMsg(resp)
		// Best effort: the caller may already be gone.
		_ = from.Send(pkg)
	}

	var to transport.ITransport
	// Binding mode: try the cache first; otherwise always rediscover.
	if p.cfg.Binding {
		to = p.tryGetService(from.GlobalIndex(), header.ServiceUUID)
	}

	if to != nil {
		if err := p.transProxyCall(from, to, header, data); err != nil {
			// Rare race: the peer closed right after the transport was fetched.
			replyNotFound()
			slog.Warn("[Proxy] outside conn %v : trans call %d to service %d error !", from.GlobalIndex(), header.CallID, header.ServiceUUID)
			return err
		}
		return nil
	}

	go func() {
		to, err := p.owner.GetTransport(header.ServiceUUID)
		// A nil transport is treated like a lookup error, as onHttpCall does.
		if err != nil || to == nil {
			replyNotFound()
			slog.Warn("[Proxy] outside conn %v proxy find service %d error ! ", from.GlobalIndex(), header.ServiceUUID)
			return
		}

		if err = p.transProxyCall(from, to, header, data); err != nil {
			// Mirror the synchronous path so the caller is not left waiting.
			replyNotFound()
			slog.Warn("[Proxy] outside conn %v send call %d to service %d error %v !", from.GlobalIndex(), header.CallID, header.ServiceUUID, err)
			return
		}

		if p.cfg.Binding {
			// Binding mode: let the main loop cache the transport.
			p.ch <- &proxyTask{
				taskType:    addInsideChan,
				globalIndex: from.GlobalIndex(),
				serviceUuid: header.ServiceUUID,
				inside:      to,
			}
		}
	}()
	return nil
}
// insideReturnOutside converts a proxy response from an inside service into a
// plain rpc response and sends it to the outside connection named by
// header.GlobalIndex.
//
// It returns ChannelNotFound when no such connection is registered and
// ChannelHasClosed (after dropping the stale table entry) when the connection
// is closed; otherwise it returns the result of the send.
func (p *ProxyHandler) insideReturnOutside(from transport.ITransport, header *protocol.RpcProxyCallRetHeader, data []byte) error {
	// Look up the outside connection.
	target, found := p.gTransMap[header.GlobalIndex]
	switch {
	case !found:
		p.owner.logger.Warn("[Proxy] can't found outside connection by %v ", header.GlobalIndex)
		return perror.ChannelNotFound
	case target.IsClose():
		// Drop the stale entry.
		delete(p.gTransMap, header.GlobalIndex)
		p.owner.logger.Warn("target channel %d:%s has benn closed while trans %d !", target.GlobalIndex(), target.RemoteAddr(), header.CallID)
		return perror.ChannelHasClosed
	}

	// Swap the proxy header for a plain response header; the length shrinks or
	// grows by the difference between the two header sizes.
	respHeader := &protocol.RpcCallRetHeader{
		RpcMsgHeader: protocol.RpcMsgHeader{
			Length: header.Length - uint32(protocol.ProxyRetHeadSize) + uint32(protocol.RespHeadSize),
			Type:   protocol.ResponseMsg,
		},
		ServerID:  header.ServerID,
		CallID:    header.CallID,
		ErrorCode: header.ErrorCode,
	}
	packed, _ := protocol.PackRespMsg(&protocol.ResponsePackage{
		Header: respHeader,
		Buffer: data,
	})
	return target.Send(packed)
}
// insideCallOutSide forwards a proxy request from an inside service to the
// outside connection named by header.GlobalIndex, replacing the call id with
// a fresh one.
//
// When the connection is unknown or closed the caller receives an
// IDL_SERVICE_NOT_FOUND reply and ChannelNotFound / ChannelHasClosed is
// returned. For two-way calls (OneWay == 0) the original call id and caller
// are recorded so that outsideReturnInside can route the response back; the
// record is removed again if the request cannot be sent, since no response
// will ever arrive for it.
func (p *ProxyHandler) insideCallOutSide(from transport.ITransport, header *protocol.RpcProxyCallHeader, data []byte) error {
	to, ok := p.gTransMap[header.GlobalIndex]
	if !ok {
		p.outsideReturnError(from, header, protocol.IDL_SERVICE_NOT_FOUND)
		p.owner.logger.Error(" service conn %d:%q call outside %d service %d method %d, but it has been closed ! ", from.GlobalIndex(), from.RemoteAddr(), header.GlobalIndex, header.ServiceUUID, header.MethodID)
		return perror.ChannelNotFound
	}
	if to.IsClose() {
		// Drop the stale entry.
		delete(p.gTransMap, header.GlobalIndex)
		p.outsideReturnError(from, header, protocol.IDL_SERVICE_NOT_FOUND)
		p.owner.logger.Error(" service conn %d:%q call outside %d service %d method %d, but it has been closed ! ", from.GlobalIndex(), from.RemoteAddr(), header.GlobalIndex, header.ServiceUUID, header.MethodID)
		return perror.ChannelHasClosed
	}

	info := &callInfo{
		replaceCallId: p.NewReplaceID(),
		originalID:    header.CallID,
		inside:        from,
	}
	msg := &protocol.RequestPackage{
		Header: &protocol.RpcCallHeader{
			RpcMsgHeader: protocol.RpcMsgHeader{
				// Swap the proxy call header size for the plain call header size.
				Length: header.Length - uint32(protocol.ProxyCallHeadSize) + uint32(protocol.CallHeadSize),
				Type:   protocol.RequestMsg,
			},
			ServiceUUID: header.ServiceUUID,
			ServerID:    header.ServerID,
			CallID:      info.replaceCallId,
			MethodID:    header.MethodID,
		},
		Buffer: data,
	}

	tracked := header.OneWay == 0
	if tracked {
		// Record the call so the response can be routed back.
		p.gCallInfo[info.replaceCallId] = info
		// TODO add timer ticker
	}

	// Send the request to the outside connection.
	pack, _ := protocol.PackReqMsg(msg)
	err := to.Send(pack)
	if err != nil && tracked {
		// The request never left; drop the record so it does not leak.
		delete(p.gCallInfo, info.replaceCallId)
	}
	return err
}
// outsideReturnInside routes a plain rpc response from an outside connection
// back to the inside service that issued the call, restoring the caller's
// original call id and wrapping the payload in a proxy response.
//
// The pending-call record is removed as soon as it is found, so it is also
// released when the inside caller has already closed. Responses without a
// record, or for a closed caller, are dropped and nil is returned.
func (p *ProxyHandler) outsideReturnInside(from transport.ITransport, header *protocol.RpcCallRetHeader, data []byte) error {
	info, ok := p.gCallInfo[header.CallID]
	if !ok {
		// No record for this call id (the original treats this as timed out).
		p.owner.logger.Info("[Proxy] proxy call %d return inside has been time out !", header.CallID)
		return nil
	}
	// The record has served its purpose whether or not delivery succeeds.
	delete(p.gCallInfo, header.CallID)

	if info.inside.IsClose() {
		return nil
	}

	msg := &protocol.ProxyRespPackage{
		Header: &protocol.RpcProxyCallRetHeader{
			RpcMsgHeader: protocol.RpcMsgHeader{
				// Swap the plain response header size for the proxy one.
				Length: header.Length - uint32(protocol.RespHeadSize) + uint32(protocol.ProxyRetHeadSize),
				Type:   protocol.ProxyResponseMsg,
			},
			ServerID:    header.ServerID,
			CallID:      info.originalID,
			ErrorCode:   header.ErrorCode,
			GlobalIndex: from.GlobalIndex(),
		},
		Buffer: data,
	}
	pkg, _ := protocol.PackProxyRespMsg(msg)
	return info.inside.Send(pkg)
}
// tryGetService returns the cached transport of service uuid for the outside
// connection globalIndex, or nil when nothing is cached. A cached transport
// that reports itself closed is evicted from the cache and nil is returned.
func (p *ProxyHandler) tryGetService(globalIndex protocol.GlobalIndexType, uuid uint64) transport.ITransport {
	// Indexing a missing per-connection map yields a nil map, whose lookup
	// simply reports "not present".
	services := p.gSrvTransCache[globalIndex]
	cached, present := services[uuid]
	if !present {
		return nil
	}
	if !cached.IsClose() {
		return cached
	}
	delete(services, uuid)
	return nil
}
// transProxyCall wraps a plain rpc request in a proxy request whose header is
// built from the original header and the sender's global index, then sends
// the packed message to to and returns the send result.
func (p *ProxyHandler) transProxyCall(from, to transport.ITransport, header *protocol.RpcCallHeader, data []byte) error {
	packed, _ := protocol.PackProxyReqMsg(&protocol.ProxyRequestPackage{
		Header: protocol.BuildProxyCallHeader(header, from.GlobalIndex()),
		Buffer: data,
	})
	return to.Send(packed)
}
// outsideReturnError answers a proxy request with a body-less proxy response
// that carries errorCode, sent back on the transport the request came from.
// A send failure is logged and otherwise ignored.
//
// The response has no payload, so its Length is the proxy response header
// size only, matching the error response built in onTask. Deriving the length
// from the request length would announce body bytes that are never sent.
func (p *ProxyHandler) outsideReturnError(from transport.ITransport, header *protocol.RpcProxyCallHeader, errorCode uint32) {
	msg := &protocol.ProxyRespPackage{
		Header: &protocol.RpcProxyCallRetHeader{
			RpcMsgHeader: protocol.RpcMsgHeader{
				Length: uint32(protocol.ProxyRetHeadSize),
				Type:   protocol.ProxyResponseMsg,
			},
			ServerID:  header.ServerID,
			CallID:    header.CallID,
			ErrorCode: errorCode,
			// NOTE(review): this is the requester's own index; regular replies
			// (outsideReturnInside) carry the outside connection's index —
			// confirm which one the inside service expects here.
			GlobalIndex: from.GlobalIndex(),
		},
	}
	pkg, _ := protocol.PackProxyRespMsg(msg)
	if err := from.Send(pkg); err != nil {
		slog.Warn("send return info to %s error %v", from.RemoteAddr(), err)
	}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/jiaoyuedave/service-box-go.git
git@gitee.com:jiaoyuedave/service-box-go.git
jiaoyuedave
service-box-go
Service-Box-go
master

搜索帮助