1 Star 0 Fork 7

ryvius_key/netpoll

forked from CloudWeGo/netpoll 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夹,创建文件夹后会生成空的 .keep 文件
Loading...
README
Apache-2.0
// Copyright 2021 CloudWeGo Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. //go:build (darwin || netbsd || freebsd || openbsd || dragonfly) && race // +build darwin netbsd freebsd openbsd dragonfly // +build race package netpoll import ( "log" "sync" "sync/atomic" "syscall" ) // mock no race poll func openPoll() Poll { return openDefaultPoll() } func openDefaultPoll() *defaultPoll { l := new(defaultPoll) p, err := syscall.Kqueue() if err != nil { panic(err) } l.fd = p _, err = syscall.Kevent(l.fd, []syscall.Kevent_t{{ Ident: 0, Filter: syscall.EVFILT_USER, Flags: syscall.EV_ADD | syscall.EV_CLEAR, }}, nil, nil) if err != nil { panic(err) } return l } type defaultPoll struct { fd int trigger uint32 m sync.Map } // Wait implements Poll. 
func (p *defaultPoll) Wait() error { // init var size, caps = 1024, barriercap var events, barriers = make([]syscall.Kevent_t, size), make([]barrier, size) for i := range barriers { barriers[i].bs = make([][]byte, caps) barriers[i].ivs = make([]syscall.Iovec, caps) } // wait for { var hups []*FDOperator n, err := syscall.Kevent(p.fd, nil, events, nil) if err != nil && err != syscall.EINTR { // exit gracefully if err == syscall.EBADF { return nil } return err } for i := 0; i < n; i++ { var fd = int(events[i].Ident) // trigger if fd == 0 { // clean trigger atomic.StoreUint32(&p.trigger, 0) continue } var operator *FDOperator if tmp, ok := p.m.Load(fd); ok { operator = tmp.(*FDOperator) } else { continue } if !operator.do() { continue } // check poll in if events[i].Filter == syscall.EVFILT_READ && events[i].Flags&syscall.EV_ENABLE != 0 { if operator.OnRead != nil { // for non-connection operator.OnRead(p) } else { // only for connection var bs = operator.Inputs(barriers[i].bs) if len(bs) > 0 { var n, err = readv(operator.FD, bs, barriers[i].ivs) operator.InputAck(n) if err != nil && err != syscall.EAGAIN && err != syscall.EINTR { log.Printf("readv(fd=%d) failed: %s", operator.FD, err.Error()) hups = append(hups, operator) operator.done() continue } } } } // check hup if events[i].Flags&syscall.EV_EOF != 0 { hups = append(hups, operator) operator.done() continue } // check poll out if events[i].Filter == syscall.EVFILT_WRITE && events[i].Flags&syscall.EV_ENABLE != 0 { if operator.OnWrite != nil { // for non-connection operator.OnWrite(p) } else { // only for connection var bs, supportZeroCopy = operator.Outputs(barriers[i].bs) if len(bs) > 0 { // TODO: Let the upper layer pass in whether to use ZeroCopy. 
var n, err = sendmsg(operator.FD, bs, barriers[i].ivs, false && supportZeroCopy) operator.OutputAck(n) if err != nil && err != syscall.EAGAIN { log.Printf("sendmsg(fd=%d) failed: %s", operator.FD, err.Error()) hups = append(hups, operator) } } } } operator.done() } // hup conns together to avoid blocking the poll. if len(hups) > 0 { p.detaches(hups) } } } // TODO: Close will bad file descriptor here func (p *defaultPoll) Close() error { var err = syscall.Close(p.fd) // delete all *FDOperator p.m.Range(func(key, value interface{}) bool { var operator, _ = value.(*FDOperator) if operator.OnHup != nil { operator.OnHup(p) } return true }) return err } // Trigger implements Poll. func (p *defaultPoll) Trigger() error { if atomic.AddUint32(&p.trigger, 1) > 1 { return nil } _, err := syscall.Kevent(p.fd, []syscall.Kevent_t{{ Ident: 0, Filter: syscall.EVFILT_USER, Fflags: syscall.NOTE_TRIGGER, }}, nil, nil) return err } // Control implements Poll. func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error { var evs = make([]syscall.Kevent_t, 1) evs[0].Ident = uint64(operator.FD) switch event { case PollReadable, PollModReadable: operator.inuse() p.m.Store(operator.FD, operator) evs[0].Filter, evs[0].Flags = syscall.EVFILT_READ, syscall.EV_ADD|syscall.EV_ENABLE case PollDetach: defer operator.unused() p.m.Delete(operator.FD) evs[0].Filter, evs[0].Flags = syscall.EVFILT_READ, syscall.EV_DELETE|syscall.EV_ONESHOT case PollWritable: operator.inuse() p.m.Store(operator.FD, operator) evs[0].Filter, evs[0].Flags = syscall.EVFILT_WRITE, syscall.EV_ADD|syscall.EV_ENABLE|syscall.EV_ONESHOT case PollR2RW: evs[0].Filter, evs[0].Flags = syscall.EVFILT_WRITE, syscall.EV_ADD|syscall.EV_ENABLE case PollRW2R: evs[0].Filter, evs[0].Flags = syscall.EVFILT_WRITE, syscall.EV_DELETE|syscall.EV_ONESHOT } _, err := syscall.Kevent(p.fd, evs, nil, nil) return err } func (p *defaultPoll) detaches(hups []*FDOperator) error { var onhups = make([]func(p Poll) error, len(hups)) for i := 
range hups { onhups[i] = hups[i].OnHup p.Control(hups[i], PollDetach) } go func(onhups []func(p Poll) error) { for i := range onhups { if onhups[i] != nil { onhups[i](p) } } }(onhups) return nil }

简介

Netpoll 是由 字节跳动 开发的高性能 NIO(Non-blocking I/O) 网络库,专注于 RPC 场景 展开 收起
Apache-2.0
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/ryvius_key/netpoll.git
git@gitee.com:ryvius_key/netpoll.git
ryvius_key
netpoll
netpoll
develop

搜索帮助