代码拉取完成,页面将自动刷新
//MIT License
//Copyright (c) 2022 cloudguan rcloudguan@163.com
//Permission is hereby granted, free of charge, to any person obtaining a copy
//of this software and associated documentation files (the "Software"), to deal
//in the Software without restriction, including without limitation the rights
//to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
//copies of the Software, and to permit persons to whom the Software is
//furnished to do so, subject to the following conditions:
//The above copyright notice and this permission notice shall be included in all
//copies or substantial portions of the Software.
//THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
//IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
//FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
//AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
//LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
//OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
//SOFTWARE.
package sbox
import (
"errors"
perror "gitee.com/dennis-kk/service-box-go/util/errors"
"runtime/debug"
"gitee.com/dennis-kk/service-box-go/internal/net"
"gitee.com/dennis-kk/service-box-go/util/slog"
)
// BoxNetwork service box network manager
type (
// ChannelCache LoopCache map[string]*boxNetLoop
ChannelCache map[string]*BoxChannel
//NetActionCallback network event call back
NetActionCallback func(string, *BoxChannel, error)
BoxNetwork struct {
cUuid uint32 //channel uuid
channelMap ChannelCache //channel cache
ch net.NetEventQueue //network event channel
netLoop net.IBoxNetLoop //network loop, difference
eventcb []NetActionCallback //network event call back
}
)
func NewBoxNetWork() *BoxNetwork {
bn := &BoxNetwork{
cUuid: 1,
ch: make(net.NetEventQueue, 128),
channelMap: make(ChannelCache, 128),
eventcb: make([]NetActionCallback, net.EventMaxFlag),
}
return bn
}
// Init init box net work
func (bn *BoxNetwork) Init() {
bn.netLoop = net.BoxLoop(bn.ch)
}
//Start start all box network loop, like redis, zookeeper
func (bn *BoxNetwork) Start() {
if bn == nil || bn.netLoop == nil {
panic("[ServiceBox] box not init correctly !")
}
err := bn.netLoop.Start()
if err != nil {
panic("[ServiceBox] box network loop init error! !")
}
}
//ListenAt start Server at address !
func (bn *BoxNetwork) ListenAt(network, address string) error {
return bn.netLoop.ListenAt(network, address)
}
//ConnectTo connect to other server block
func (bn *BoxNetwork) ConnectTo(network, address string) (net.IBoxConn, error) {
return bn.netLoop.ConnectTo(network, address)
}
func (bn *BoxNetwork) ConnectToService(service, network, address string) {
nEvent := net.GetEventData()
nEvent.NetType = net.EventReqConnect
nEvent.Data = &net.ConnectToEvent{
Name: service,
Network: network,
Addr: address,
}
bn.netLoop.SendToLoop(nEvent)
}
func (bn *BoxNetwork) RegisterEventHandle(eventType net.EventType, handle NetActionCallback) {
if eventType >= net.EventMaxFlag {
return
}
bn.eventcb[eventType] = handle
}
// Tick framework main loop
func (bn *BoxNetwork) Tick() {
// deal box net work event
for {
select {
case nEvent := <-bn.ch:
err := bn.dealNetEvent(nEvent)
if err != nil {
slog.Error("[SBoxNetwork] service box deal net event err %v", err)
continue
}
default:
return
}
}
}
// Stop stop box net work, send c
func (bn *BoxNetwork) Stop() {
// close connect channel
for _, ch := range bn.channelMap {
ch.Close()
}
// stop
if err := bn.netLoop.Stop(); err != nil {
slog.Warn("[Network] stop network error!")
}
close(bn.ch)
for nEvent := range bn.ch {
if err := bn.dealNetEvent(nEvent); err != nil {
slog.Error("[SBoxNetwork] service box deal net event err %v", err)
continue
}
}
bn.channelMap = nil
}
func (bn *BoxNetwork) TryGetChannel(host string) *BoxChannel {
if channel, ok := bn.channelMap[host]; ok {
if !channel.IsClose() {
return channel
}
}
return nil
}
// =================================================
func (bn *BoxNetwork) genChannelUuid() uint32 {
bn.cUuid++
return bn.cUuid
}
func (bn *BoxNetwork) dealNetEvent(nEvent *net.EventData) error {
// 错误保底
defer func() {
if r := recover(); r != nil {
// recover panic
slog.Error("deal net event error %v ", r)
slog.Error("trace back: %q", string(debug.Stack()))
}
}()
switch nEvent.NetType {
case net.EventAccept:
bn.onAccept(nEvent.Data)
case net.EventRespConnect:
bn.onConnect(nEvent)
case net.EventReceive:
bn.onReceived(nEvent.Data)
case net.EventClose:
bn.onClose(nEvent.Data)
}
return nil
}
// onAccept accept new connection, work in logic loop!
func (bn *BoxNetwork) onAccept(eData interface{}) {
if eData == nil {
return
}
// type cast
var cEvent *net.AcceptEvent
var ok bool
if cEvent, ok = eData.(*net.AcceptEvent); !ok {
slog.Warn("[SBoxNetwork] network event type error!")
return
}
if cEvent.Conn.IsClose() {
if cb := bn.eventcb[net.EventAccept]; cb != nil {
cb(cEvent.Addr, nil, perror.ChannelHasClosed)
}
return
}
tsid := cEvent.Conn.RemoteAddr().String()
//check valid
trans, err := bn.createBoxChannel(tsid, cEvent.Conn)
if trans != nil && err == nil {
slog.Info("[SBoxNetwork] %s connect to service box !", tsid)
}
// call back
if cb := bn.eventcb[net.EventAccept]; cb != nil {
cb(tsid, trans, err)
}
}
func (bn *BoxNetwork) onConnect(nEvent *net.EventData) {
if nEvent == nil {
return
}
var cEvent *net.ConnectEvent
var ok bool
if cEvent, ok = nEvent.Data.(*net.ConnectEvent); !ok {
//TODO add error log
return
}
if nEvent.Err != nil {
if cb := bn.eventcb[net.EventRespConnect]; cb != nil {
cb(cEvent.Host, nil, nEvent.Err)
}
return
}
tsid := cEvent.Conn.RemoteAddr().String()
//check valid
trans, err := bn.createBoxChannel(tsid, cEvent.Conn)
if trans != nil && err == nil {
slog.Info("[SBoxNetwork] %s connect to service box !", tsid)
}
// call back
if cb := bn.eventcb[net.EventRespConnect]; cb != nil {
cb(cEvent.Host, trans, err)
}
}
func (bn *BoxNetwork) onReceived(eData interface{}) {
if bn == nil || eData == nil {
return
}
//type cast
var rEvent *net.ReceiveEvent
var ok bool
if rEvent, ok = eData.(*net.ReceiveEvent); !ok {
slog.Warn("[SBoxNetwork] network event type error while receive Data !")
return
}
// get trans sid
trans, ok := bn.channelMap[rEvent.TSid]
if !ok {
slog.Error("[SBoxNetwork] get transport %s error !!", rEvent.TSid)
return
}
rLen, err := trans.Write(rEvent.Data, len(rEvent.Data))
if err != nil {
//pass error to callback function
slog.Error("[SBoxNetwork] transport %s write net Data error %v", rEvent.TSid, err)
}
if rLen != len(rEvent.Data) {
slog.Error("[SBoxNetwork] transport %s write net Data error, cache buffer not enought %d:%d", rEvent.TSid, len(rEvent.Data), rLen)
}
// call back
if cb := bn.eventcb[net.EventReceive]; cb != nil {
cb(rEvent.TSid, trans, err)
}
}
func (bn *BoxNetwork) onClose(eData interface{}) {
if bn == nil || eData == nil {
return
}
var (
cEvent *net.ClosedEvent
ok bool
trans *BoxChannel
)
if cEvent, ok = eData.(*net.ClosedEvent); !ok {
slog.Warn("[SBoxNetwork] network event type error while close !")
return
}
if trans, ok = bn.channelMap[cEvent.TSid]; ok {
if trans == nil {
return
}
// close finish, clean Data
trans.Clean()
} else {
// channel has been removed
return
}
delete(bn.channelMap, cEvent.TSid)
if cb := bn.eventcb[net.EventClose]; cb != nil {
cb(cEvent.TSid, trans, nil)
}
}
func (bn *BoxNetwork) createBoxChannel(tsid string, conn net.IBoxConn) (*BoxChannel, error) {
//check valid
_, ok := bn.channelMap[tsid]
if ok {
//panic("[SBoxNetwork] client" + tsid + "connect to server !!")
return nil, perror.ChannelRepeatedConnect
}
//new transport
trans := NewBoxChannel(conn)
if trans == nil {
slog.Error("[ServiceBox] %s create box channel failed!", tsid)
return nil, errors.New("create channel failed")
}
trans.SetID(bn.genChannelUuid())
bn.channelMap[tsid] = trans
return trans, nil
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。