1 Star 0 Fork 4

jiaoyuedave/Service-Box-go

forked from Plato/Service-Box-go 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
channel.go 5.34 KB
一键复制 编辑 原始数据 按行查看 历史
//MIT License
//Copyright (c) 2021 cloudguan rcloudguan@163.com
//Permission is hereby granted, free of charge, to any person obtaining a copy
//of this software and associated documentation files (the "Software"), to deal
//in the Software without restriction, including without limitation the rights
//to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
//copies of the Software, and to permit persons to whom the Software is
//furnished to do so, subject to the following conditions:
//The above copyright notice and this permission notice shall be included in all
//copies or substantial portions of the Software.
//THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
//IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
//FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
//AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
//LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
//OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
//SOFTWARE.
package sbox
import (
"gitee.com/dennis-kk/rpc-go-backend/idlrpc/pkg/protocol"
"gitee.com/dennis-kk/service-box-go/common"
"gitee.com/dennis-kk/service-box-go/internal/net"
"gitee.com/dennis-kk/service-box-go/util/slog"
"github.com/panjf2000/gnet/pkg/ringbuffer"
"sync/atomic"
)
// BoxChannel implements the transport interface of the Go RPC backend.
// Every connection owns its own channel.
type BoxChannel struct {
	// transID is the unique id of this channel.
	transID uint32
	// globalIndex is a global index attached to the channel; in this file it
	// is only stored and handed back by its accessors.
	globalIndex protocol.GlobalIndexType
	// status is the channel state (running, closed, reconnecting). The methods
	// of this type read and write it through sync/atomic.
	status int32
	// localHost caches the local address string of the connection.
	localHost string
	// peerHost caches the remote address string of the connection.
	peerHost string
	// conn is the underlying network connection handle.
	conn net.IBoxConn
	// rb is the ring buffer that holds buffered bytes for this channel.
	rb *ringbuffer.RingBuffer
	// cache is a scratch buffer used by Peek to join a wrapped-around read
	// into one contiguous slice.
	cache []byte
}
// GlobalIndex returns the global index stored on the channel.
//
// A nil receiver yields the zero value instead of panicking, in line with the
// nil-tolerant accessors of this type such as GetID.
func (bch *BoxChannel) GlobalIndex() protocol.GlobalIndexType {
	if bch == nil {
		var zero protocol.GlobalIndexType
		return zero
	}
	return bch.globalIndex
}
// SetGlobalIndex stores uuid as the channel's global index.
//
// A nil receiver is ignored instead of panicking, in line with SetID.
func (bch *BoxChannel) SetGlobalIndex(uuid protocol.GlobalIndexType) {
	if bch == nil {
		return
	}
	bch.globalIndex = uuid
}
// NewBoxChannel builds a running channel on top of conn.
//
// It returns nil when conn is nil. Otherwise the local and remote address
// strings are captured once, a 16 KiB ring buffer is created and a 16-byte
// scratch buffer is allocated for Peek.
func NewBoxChannel(conn net.IBoxConn) *BoxChannel {
	const (
		ringCapacity = 1024 * 16 // initial ring buffer size in bytes
		headCacheLen = 16        // initial size of the Peek scratch buffer
	)

	if conn == nil {
		return nil
	}

	ch := new(BoxChannel)
	ch.conn = conn
	ch.status = common.ChannelRunning
	ch.localHost = conn.LocalAddr().String()
	ch.peerHost = conn.RemoteAddr().String()
	ch.rb = ringbuffer.New(ringCapacity)
	ch.cache = make([]byte, headCacheLen)
	return ch
}
// LocalAddr returns the local address string cached when the channel was
// created.
//
// A nil receiver yields an empty string instead of panicking, in line with the
// nil-tolerant accessors of this type such as GetID.
func (bch *BoxChannel) LocalAddr() string {
	if bch == nil {
		return ""
	}
	return bch.localHost
}
// RemoteAddr returns the peer address string cached when the channel was
// created.
//
// A nil receiver yields an empty string instead of panicking, in line with the
// nil-tolerant accessors of this type such as GetID.
func (bch *BoxChannel) RemoteAddr() string {
	if bch == nil {
		return ""
	}
	return bch.peerHost
}
// Write appends bytes from pkg to the channel's ring buffer.
//
// When length is positive and smaller than len(pkg), only the first length
// bytes are buffered; any other length value buffers all of pkg, which is what
// this method did before length was honoured.
//
// It returns the number of bytes buffered. The error is
// common.ErrChannelInvalid for a nil receiver, common.ErrChannelNotRun when
// the channel is not running, or whatever the ring buffer reports. A short
// write without an error now returns the real byte count, so callers can
// detect it by comparing the result with the expected length.
func (bch *BoxChannel) Write(pkg []byte, length int) (int, error) {
	if bch == nil {
		return 0, common.ErrChannelInvalid
	}
	if atomic.LoadInt32(&bch.status) != common.ChannelRunning {
		return 0, common.ErrChannelNotRun
	}

	data := pkg
	if length > 0 && length < len(pkg) {
		data = pkg[:length]
	}

	n, err := bch.rb.Write(data)
	if err != nil {
		return 0, err
	}
	return n, nil
}
// Read moves buffered bytes from the channel's ring buffer into pkg and
// advances the read position.
//
// When length is positive and smaller than len(pkg), at most length bytes are
// consumed, so a destination buffer larger than the request can no longer
// swallow bytes beyond it. Any other length value reads up to len(pkg) bytes,
// which is what this method did before length was honoured.
//
// It returns the number of bytes read. The error is common.ErrChannelInvalid
// for a nil receiver, common.ErrChannelNotRun when the channel is not running,
// or whatever the ring buffer reports.
func (bch *BoxChannel) Read(pkg []byte, length int) (int, error) {
	if bch == nil {
		return 0, common.ErrChannelInvalid
	}
	if atomic.LoadInt32(&bch.status) != common.ChannelRunning {
		return 0, common.ErrChannelNotRun
	}

	dst := pkg
	if length > 0 && length < len(pkg) {
		dst = pkg[:length]
	}

	n, err := bch.rb.Read(dst)
	if err != nil {
		return 0, err
	}
	return n, nil
}
// Peek returns up to length buffered bytes without moving the read position.
//
// It returns (nil, 0, nil) when fewer than length bytes are buffered. The
// error is common.ErrChannelInvalid for a nil receiver and
// common.ErrChannelNotRun when the channel is not running.
//
// When the ring buffer hands the data back in two pieces, they are joined in
// the channel's scratch buffer. The returned slice is trimmed to the number of
// valid bytes, so its length always equals the returned count.
//
// NOTE(review): the returned slice is backed either by the ring buffer's
// memory or by the reusable scratch buffer, so it is presumably only valid
// until the next call that touches the channel — confirm against callers.
func (bch *BoxChannel) Peek(length int) ([]byte, int, error) {
	if bch == nil {
		return nil, 0, common.ErrChannelInvalid
	}
	if atomic.LoadInt32(&bch.status) != common.ChannelRunning {
		return nil, 0, common.ErrChannelNotRun
	}

	// Not enough data buffered yet.
	if bch.rb.Length() < length {
		return nil, 0, nil
	}

	head, tail := bch.rb.Peek(length)
	if len(tail) == 0 {
		// Contiguous data can be returned as is.
		return head, len(head), nil
	}

	// The data came back in two pieces: stitch them together in the scratch
	// buffer, growing it when it is too small.
	total := len(head) + len(tail)
	if total > len(bch.cache) {
		bch.cache = make([]byte, total)
	}
	n := copy(bch.cache, head)
	copy(bch.cache[n:], tail)
	return bch.cache[:total], total, nil
}
// Send writes pkg to the underlying connection.
//
// It returns common.ErrChannelInvalid for a nil receiver,
// common.ErrChannelNotRun when the channel is not running and
// common.ErrChannelConn when no connection is attached. Otherwise it returns
// the result of the connection's Write.
func (bch *BoxChannel) Send(pkg []byte) error {
	switch {
	case bch == nil:
		return common.ErrChannelInvalid
	case atomic.LoadInt32(&bch.status) != common.ChannelRunning:
		return common.ErrChannelNotRun
	case bch.conn == nil:
		return common.ErrChannelConn
	}
	return bch.conn.Write(pkg)
}
// Close actively closes the underlying connection and marks the channel as
// closed.
//
// It does nothing for a nil receiver or for a channel that is not running.
// The status is switched to common.ChannelClose once the close attempt has
// finished, whether or not the connection reported an error; such an error is
// only logged. A channel without a connection is marked closed without
// touching the connection, mirroring the nil-conn checks in Send and IsClose.
func (bch *BoxChannel) Close() {
	if bch == nil {
		return
	}
	if atomic.LoadInt32(&bch.status) != common.ChannelRunning {
		return
	}
	// Deferred so the status flips after the close attempt, even if it panics.
	defer atomic.StoreInt32(&bch.status, common.ChannelClose)

	if bch.conn == nil {
		return
	}
	if err := bch.conn.Close(); err != nil {
		slog.Warn("[BoxChannel] %d active close error %v !", bch.transID, err)
	}
}
// Clean releases everything the channel holds and marks it closed.
//
// The connection and scratch buffer references are dropped, the ring buffer
// (if any) is reset and dropped, and the status becomes common.ChannelClose.
// A nil receiver is ignored.
func (bch *BoxChannel) Clean() {
	if bch == nil {
		return
	}

	bch.conn, bch.cache = nil, nil
	if rb := bch.rb; rb != nil {
		rb.Reset()
		bch.rb = nil
	}

	atomic.StoreInt32(&bch.status, common.ChannelClose)
}
// Size reports how many bytes are currently held in the ring buffer.
// It returns 0 for a nil receiver or a channel that is not running.
func (bch *BoxChannel) Size() uint32 {
	if bch == nil || atomic.LoadInt32(&bch.status) != common.ChannelRunning {
		return 0
	}
	buffered := bch.rb.Length()
	return uint32(buffered)
}
// IsClose reports whether the channel can no longer be used: the receiver is
// nil, no connection is attached, or the status is not common.ChannelRunning.
func (bch *BoxChannel) IsClose() bool {
	return bch == nil ||
		bch.conn == nil ||
		atomic.LoadInt32(&bch.status) != common.ChannelRunning
}
// GetID returns the channel's transport id, or 0 for a nil receiver.
func (bch *BoxChannel) GetID() uint32 {
	var id uint32
	if bch != nil {
		id = bch.transID
	}
	return id
}
// SetID stores transID as the channel's transport id.
// A nil receiver is ignored.
func (bch *BoxChannel) SetID(transID uint32) {
	if bch != nil {
		bch.transID = transID
	}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/jiaoyuedave/service-box-go.git
git@gitee.com:jiaoyuedave/service-box-go.git
jiaoyuedave
service-box-go
Service-Box-go
master

搜索帮助