1 Star 1 Fork 1

andypan/gnet

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
eventloop_unix.go 7.78 KB
一键复制 编辑 原始数据 按行查看 历史
// Copyright (c) 2019 Andy Pan
// Copyright (c) 2018 Joshua J Baker
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
// +build linux freebsd dragonfly darwin
package gnet
import (
"os"
"runtime"
"time"
"github.com/panjf2000/gnet/errors"
"github.com/panjf2000/gnet/internal/netpoll"
"golang.org/x/sys/unix"
)
type eventloop struct {
ln *listener // listener
idx int // loop index in the server loops list
svr *server // server in loop
poller *netpoll.Poller // epoll or kqueue
packet []byte // read packet buffer
connCount int32 // number of active connections in event-loop
connections map[int]*conn // loop connections fd -> conn
eventHandler EventHandler // user eventHandler
calibrateCallback func(*eventloop, int32) // callback func for re-adjusting connCount
}
func (el *eventloop) closeAllConns() {
// Close loops and all outstanding connections
for _, c := range el.connections {
_ = el.loopCloseConn(c, nil)
}
}
func (el *eventloop) loopRun(lockOSThread bool) {
if lockOSThread {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
}
defer func() {
el.closeAllConns()
el.ln.close()
if el.idx == 0 && el.svr.opts.Ticker {
close(el.svr.ticktock)
}
el.svr.signalShutdown()
}()
if el.idx == 0 && el.svr.opts.Ticker {
go el.loopTicker()
}
switch err := el.poller.Polling(el.handleEvent); err {
case errors.ErrServerShutdown:
el.svr.logger.Infof("Event-loop(%d) is exiting normally on the signal error: %v", el.idx, err)
default:
el.svr.logger.Errorf("Event-loop(%d) is exiting due to an unexpected error: %v", el.idx, err)
}
}
func (el *eventloop) loopAccept(fd int) error {
if fd == el.ln.fd {
if el.ln.network == "udp" {
return el.loopReadUDP(fd)
}
nfd, sa, err := unix.Accept(fd)
if err != nil {
if err == unix.EAGAIN {
return nil
}
return os.NewSyscallError("accept", err)
}
if err = os.NewSyscallError("fcntl nonblock", unix.SetNonblock(nfd, true)); err != nil {
return err
}
netAddr := netpoll.SockaddrToTCPOrUnixAddr(sa)
c := newTCPConn(nfd, el, sa, netAddr)
if err = el.poller.AddRead(c.fd); err == nil {
el.connections[c.fd] = c
return el.loopOpen(c)
}
return err
}
return nil
}
func (el *eventloop) loopOpen(c *conn) error {
c.opened = true
el.calibrateCallback(el, 1)
out, action := el.eventHandler.OnOpened(c)
if out != nil {
c.open(out)
}
if !c.outboundBuffer.IsEmpty() {
_ = el.poller.AddWrite(c.fd)
}
return el.handleAction(c, action)
}
func (el *eventloop) loopRead(c *conn) error {
n, err := unix.Read(c.fd, el.packet)
if n == 0 || err != nil {
if err == unix.EAGAIN {
return nil
}
return el.loopCloseConn(c, os.NewSyscallError("read", err))
}
c.buffer = el.packet[:n]
for inFrame, _ := c.read(); inFrame != nil; inFrame, _ = c.read() {
out, action := el.eventHandler.React(inFrame, c)
if out != nil {
el.eventHandler.PreWrite()
if err = c.write(out); err != nil {
return err
}
}
switch action {
case None:
case Close:
return el.loopCloseConn(c, nil)
case Shutdown:
return errors.ErrServerShutdown
}
// Check the status of connection every loop since it might be closed during writing data back to client due to
// some kind of system error.
if !c.opened {
return nil
}
}
_, _ = c.inboundBuffer.Write(c.buffer)
return nil
}
func (el *eventloop) loopWrite(c *conn) error {
el.eventHandler.PreWrite()
head, tail := c.outboundBuffer.LazyReadAll()
n, err := unix.Write(c.fd, head)
if err != nil {
if err == unix.EAGAIN {
return nil
}
return el.loopCloseConn(c, os.NewSyscallError("write", err))
}
c.outboundBuffer.Shift(n)
if n == len(head) && tail != nil {
n, err = unix.Write(c.fd, tail)
if err != nil {
if err == unix.EAGAIN {
return nil
}
return el.loopCloseConn(c, os.NewSyscallError("write", err))
}
c.outboundBuffer.Shift(n)
}
if c.outboundBuffer.IsEmpty() {
_ = el.poller.ModRead(c.fd)
}
return nil
}
func (el *eventloop) loopCloseConn(c *conn, err error) error {
if !c.opened {
el.svr.logger.Debugf("The fd=%d in event-loop(%d) is already closed, skipping it", c.fd, el.idx)
return nil
}
// Send residual data in buffer back to client before actually closing the connection.
if !c.outboundBuffer.IsEmpty() {
el.eventHandler.PreWrite()
head, tail := c.outboundBuffer.LazyReadAll()
if n, err := unix.Write(c.fd, head); err == nil {
if n == len(head) && tail != nil {
_, _ = unix.Write(c.fd, tail)
}
}
}
if err0, err1 := el.poller.Delete(c.fd), unix.Close(c.fd); err0 == nil && err1 == nil {
delete(el.connections, c.fd)
el.calibrateCallback(el, -1)
if el.eventHandler.OnClosed(c, err) == Shutdown {
return errors.ErrServerShutdown
}
c.releaseTCP()
} else {
if err0 != nil {
el.svr.logger.Warnf("Failed to delete fd=%d from poller in event-loop(%d), %v", c.fd, el.idx, err0)
}
if err1 != nil {
el.svr.logger.Warnf("Failed to close fd=%d in event-loop(%d), %v",
c.fd, el.idx, os.NewSyscallError("close", err1))
}
}
return nil
}
func (el *eventloop) loopWake(c *conn) error {
//if co, ok := el.connections[c.fd]; !ok || co != c {
// return nil // ignore stale wakes.
//}
out, action := el.eventHandler.React(nil, c)
if out != nil {
if err := c.write(out); err != nil {
return err
}
}
return el.handleAction(c, action)
}
func (el *eventloop) loopTicker() {
var (
delay time.Duration
open bool
err error
)
for {
err = el.poller.Trigger(func() (err error) {
delay, action := el.eventHandler.Tick()
el.svr.ticktock <- delay
switch action {
case None:
case Shutdown:
err = errors.ErrServerShutdown
}
return
})
if err != nil {
el.svr.logger.Errorf("Failed to awake poller in event-loop(%d), error:%v, stopping ticker", el.idx, err)
break
}
if delay, open = <-el.svr.ticktock; open {
time.Sleep(delay)
} else {
break
}
}
}
func (el *eventloop) handleAction(c *conn, action Action) error {
switch action {
case None:
return nil
case Close:
return el.loopCloseConn(c, nil)
case Shutdown:
return errors.ErrServerShutdown
default:
return nil
}
}
func (el *eventloop) loopReadUDP(fd int) error {
n, sa, err := unix.Recvfrom(fd, el.packet, 0)
if err != nil || n == 0 {
if err != nil && err != unix.EAGAIN {
el.svr.logger.Warnf("Failed to read UDP packet from fd=%d in event-loop(%d), %v",
fd, el.idx, os.NewSyscallError("recvfrom", err))
}
return nil
}
c := newUDPConn(fd, el, sa)
out, action := el.eventHandler.React(el.packet[:n], c)
if out != nil {
el.eventHandler.PreWrite()
_ = c.sendTo(out)
}
if action == Shutdown {
return errors.ErrServerShutdown
}
c.releaseUDP()
return nil
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/panjf2000/gnet.git
git@gitee.com:panjf2000/gnet.git
panjf2000
gnet
gnet
master

搜索帮助