代码拉取完成,页面将自动刷新
同步操作将从 Plato/Service-Box-go 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
//MIT License
//Copyright (c) 2021 cloudguan rcloudguan@163.com
//Permission is hereby granted, free of charge, to any person obtaining a copy
//of this software and associated documentation files (the "Software"), to deal
//in the Software without restriction, including without limitation the rights
//to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
//copies of the Software, and to permit persons to whom the Software is
//furnished to do so, subject to the following conditions:
//The above copyright notice and this permission notice shall be included in all
//copies or substantial portions of the Software.
//THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
//IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
//FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
//AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
//LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
//OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
//SOFTWARE.
package sbox
import (
"gitee.com/dennis-kk/rpc-go-backend/idlrpc/pkg/protocol"
"gitee.com/dennis-kk/service-box-go/common"
"gitee.com/dennis-kk/service-box-go/internal/net"
"gitee.com/dennis-kk/service-box-go/util/slog"
"github.com/panjf2000/gnet/pkg/ringbuffer"
"sync/atomic"
)
// BoxChannel implements the transport interface expected by the rpc-go-backend.
// Each connection owns its own channel.
type BoxChannel struct {
transID uint32 // channel ID; returned by GetID and assigned by SetID
globalIndex protocol.GlobalIndexType // global index; returned by GlobalIndex and assigned by SetGlobalIndex
status int32 // channel state, read and written through sync/atomic (original note: RUNNING, CLOSE, RECONNECT)
localHost string // local address string, cached from conn when the channel is created
peerHost string // peer address string, cached from conn when the channel is created
conn net.IBoxConn // underlying connection handle (gnet, per the original comment); Send writes to it, Close closes it
rb *ringbuffer.RingBuffer // ring buffer that Write appends to and Read/Peek read from
cache []byte // scratch buffer Peek uses to join data that is split across the ring buffer's wrap point
}
// GlobalIndex returns the global index currently stored on the channel.
//
// The receiver is dereferenced unconditionally, so it must not be nil.
func (bch *BoxChannel) GlobalIndex() protocol.GlobalIndexType {
	return bch.globalIndex
}
// SetGlobalIndex stores uuid as the channel's global index.
//
// The receiver is dereferenced unconditionally, so it must not be nil.
func (bch *BoxChannel) SetGlobalIndex(uuid protocol.GlobalIndexType) {
	bch.globalIndex = uuid
}
// NewBoxChannel wraps conn in a BoxChannel that starts in the
// common.ChannelRunning state.
//
// The local and remote address strings are captured once, at construction
// time. The channel ID and global index are left at their zero values; use
// SetID and SetGlobalIndex to assign them.
//
// It returns nil when conn is nil.
func NewBoxChannel(conn net.IBoxConn) *BoxChannel {
	const (
		ringSize    = 1024 * 16 // size passed to ringbuffer.New
		scratchSize = 16        // initial length of the Peek scratch buffer
	)

	if conn == nil {
		return nil
	}

	return &BoxChannel{
		conn:      conn,
		status:    common.ChannelRunning,
		localHost: conn.LocalAddr().String(),
		peerHost:  conn.RemoteAddr().String(),
		rb:        ringbuffer.New(ringSize),
		cache:     make([]byte, scratchSize),
	}
}
// LocalAddr returns the local address string that was cached when the
// channel was created.
//
// The receiver is dereferenced unconditionally, so it must not be nil.
func (bch *BoxChannel) LocalAddr() string {
	return bch.localHost
}
// RemoteAddr returns the peer address string that was cached when the
// channel was created.
//
// The receiver is dereferenced unconditionally, so it must not be nil.
func (bch *BoxChannel) RemoteAddr() string {
	return bch.peerHost
}
// Write appends all of pkg to the channel's ring buffer, from which Read and
// Peek later serve data.
//
// The length argument is not consulted: the whole of pkg is always written.
//
// It returns common.ErrChannelInvalid for a nil receiver and
// common.ErrChannelNotRun when the channel is not in the running state. On
// success it returns the number of bytes stored. If the ring buffer reports
// an error, or stores fewer bytes than len(pkg), the result is 0 together
// with the ring buffer's error, which is nil for a short write that carried
// no error.
func (bch *BoxChannel) Write(pkg []byte, length int) (int, error) {
	switch {
	case bch == nil:
		return 0, common.ErrChannelInvalid
	case atomic.LoadInt32(&bch.status) != common.ChannelRunning:
		return 0, common.ErrChannelNotRun
	}

	written, err := bch.rb.Write(pkg)
	if err == nil && written == len(pkg) {
		return written, nil
	}
	return 0, err
}
// Read consumes up to length bytes from the channel's ring buffer into pkg
// and returns how many bytes were copied.
//
// At most min(length, len(pkg)) bytes are taken out of the ring buffer, so a
// caller that passes a buffer larger than the amount it wants does not drain
// data it did not ask for. A negative length is treated as "no explicit
// limit" and falls back to len(pkg).
//
// It returns common.ErrChannelInvalid for a nil receiver and
// common.ErrChannelNotRun when the channel is not in the running state. Any
// error reported by the ring buffer is returned as-is with a count of 0.
func (bch *BoxChannel) Read(pkg []byte, length int) (int, error) {
	if bch == nil {
		return 0, common.ErrChannelInvalid
	}
	if atomic.LoadInt32(&bch.status) != common.ChannelRunning {
		return 0, common.ErrChannelNotRun
	}

	// Cap the destination at the requested length; the ring buffer fills
	// whatever slice it is handed, so this is what bounds the read.
	dst := pkg
	if length >= 0 && length < len(pkg) {
		dst = pkg[:length]
	}

	n, err := bch.rb.Read(dst)
	if err != nil {
		return 0, err
	}
	return n, nil
}
// Peek returns the next length bytes held in the ring buffer without
// advancing the read position.
//
// The returned slice holds exactly the peeked bytes and the int result is its
// length. When fewer than length bytes are buffered, Peek returns
// (nil, 0, nil) so the caller can retry once more data has been written.
//
// The slice is only a view: when the requested range is split across the
// ring buffer's wrap point it points into the channel's scratch buffer, which
// the next wrapped Peek overwrites; otherwise it is the slice handed back by
// the ring buffer itself. Callers that need to keep the bytes should copy
// them.
//
// It returns common.ErrChannelInvalid for a nil receiver and
// common.ErrChannelNotRun when the channel is not in the running state.
func (bch *BoxChannel) Peek(length int) ([]byte, int, error) {
	if bch == nil {
		return nil, 0, common.ErrChannelInvalid
	}
	if atomic.LoadInt32(&bch.status) != common.ChannelRunning {
		return nil, 0, common.ErrChannelNotRun
	}

	// Not enough data buffered yet.
	if bch.rb.Length() < length {
		return nil, 0, nil
	}

	head, tail := bch.rb.Peek(length)
	if len(tail) == 0 {
		// Data is contiguous; hand it back without copying.
		return head, len(head), nil
	}

	// Data wraps around: stitch both halves together in the scratch buffer,
	// growing it when it is too small.
	total := len(head) + len(tail)
	if total > len(bch.cache) {
		bch.cache = make([]byte, total)
	}
	n := copy(bch.cache, head)
	copy(bch.cache[n:], tail)

	// Trim to the stitched length so stale bytes left in a larger scratch
	// buffer are never exposed to the caller.
	return bch.cache[:total], total, nil
}
// Send writes pkg to the underlying connection and returns whatever error the
// connection's Write reports.
//
// Before touching the connection it rejects, in this order: a nil receiver
// (common.ErrChannelInvalid), a channel that is not in the running state
// (common.ErrChannelNotRun), and a channel without a connection
// (common.ErrChannelConn).
func (bch *BoxChannel) Send(pkg []byte) error {
	switch {
	case bch == nil:
		return common.ErrChannelInvalid
	case atomic.LoadInt32(&bch.status) != common.ChannelRunning:
		return common.ErrChannelNotRun
	case bch.conn == nil:
		return common.ErrChannelConn
	}
	return bch.conn.Write(pkg)
}
// Close actively closes the underlying connection.
//
// It is a no-op for a nil receiver or for a channel that is not in the
// running state. Otherwise the connection is closed and the channel is moved
// to common.ChannelClose; a failure to close the connection is only logged,
// and the channel is marked closed either way.
func (bch *BoxChannel) Close() {
	if bch == nil || atomic.LoadInt32(&bch.status) != common.ChannelRunning {
		return
	}

	// Deferred so the state flips to closed however conn.Close returns.
	defer atomic.StoreInt32(&bch.status, common.ChannelClose)

	if err := bch.conn.Close(); err != nil {
		slog.Warn("[BoxChannel] %d active close error %v !", bch.transID, err)
	}
}
// Clean drops everything the channel holds and marks it closed.
//
// The connection reference and the scratch buffer are set to nil, the ring
// buffer (if any) is reset and released, and the status becomes
// common.ChannelClose. Clean does not call Close on the connection. It is a
// no-op for a nil receiver.
func (bch *BoxChannel) Clean() {
	if bch == nil {
		return
	}

	bch.conn, bch.cache = nil, nil

	if ring := bch.rb; ring != nil {
		ring.Reset()
		bch.rb = nil
	}

	atomic.StoreInt32(&bch.status, common.ChannelClose)
}
// Size reports how many bytes are currently held in the ring buffer.
//
// It returns 0 for a nil receiver and for a channel that is not in the
// running state.
func (bch *BoxChannel) Size() uint32 {
	if bch == nil || atomic.LoadInt32(&bch.status) != common.ChannelRunning {
		return 0
	}
	return uint32(bch.rb.Length())
}
// IsClose reports whether the channel can no longer be used: the receiver is
// nil, it has no connection, or its status is anything other than
// common.ChannelRunning.
func (bch *BoxChannel) IsClose() bool {
	if bch == nil || bch.conn == nil {
		return true
	}
	return atomic.LoadInt32(&bch.status) != common.ChannelRunning
}
// GetID returns the channel ID previously assigned with SetID, or 0 when the
// receiver is nil.
func (bch *BoxChannel) GetID() uint32 {
	if bch != nil {
		return bch.transID
	}
	return 0
}
// SetID assigns transID as the channel ID. It is a no-op for a nil receiver.
func (bch *BoxChannel) SetID(transID uint32) {
	if bch != nil {
		bch.transID = transID
	}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。