1 Star 0 Fork 0

D10.天地弦/sputils

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
subscribe.go 13.09 KB
一键复制 编辑 原始数据 按行查看 历史
D10.天地弦 提交于 2024-12-18 09:20 . ADD: sputils
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555
package sputils
import (
"fmt"
"gitee.com/ymofen/gobase/subpub"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
)
/*
支持使用路由进行订阅 # 匹配下级所有, +匹配当前层
topic/news
10000 topic
pub:
sync.map
lk:
utils_subscribe_v2._test.go:168: 25505, 5094/s
utils_subscribe_v2._test.go:168: 50409, 5035/s
nolk:
utils_subscribe_v2._test.go:168: 25767, 5135/s
utils_subscribe_v2._test.go:168: 51914, 5175/s
map with lk
utils_subscribe_v2._test.go:168: 34228, 6844/s
utils_subscribe_v2._test.go:168: 68952, 6894/s
map with lk
utils_subscribe_v2._test.go:168: 35343, 6977/s
utils_subscribe_v2._test.go:168: 71194, 7072/s
sesslst -> map
utils_subscribe_v2._test.go:168: 36668, 7321/s
utils_subscribe_v2._test.go:168: 74022, 7383/s
*/
// ChannelSubPublisher is the handle for a single publish channel, as returned
// by Subscribe.CheckGetPubChannelAddRef. In this file it is implemented by
// *channelItem.
type ChannelSubPublisher interface {
// Pub invokes the channel's subscriber callbacks with args and returns how
// many of them reported success. When max > 0, delivery stops after max
// successful callbacks.
Pub(max int, args ...interface{}) int
// Sub attaches subscriber id with callback cb directly to the channel and
// returns the number of subscribers in the channel's delivery list.
Sub(id string, cb subpub.SubFunc) int
// Unsub releases one reference of subscriber id on the channel and returns
// the number of subscribers left in the channel's delivery list.
Unsub(id string) (r int)
}
// channelSubSession is one subscriber attached to a channel.
type channelSubSession struct {
// Reference count: several subscription topics may match the same channel
// for the same subscriber, so the session is only dropped when this hits 0.
refN int32
// Subscriber id and the callback invoked on publish.
id string
fn subpub.SubFunc
}
// channelItem is a publish channel.
type channelItem struct {
// Reference count. It is incremented by CheckGetPubChannelAddRef, by every
// subscription attached to the channel and by Sub; CleanChannels only
// considers channels whose refN is <= 0.
refN int32
// Subscribe hub that created this channel; its lock guards the fields below.
owner *Subscribe
// Channel name, used as the key in Subscribe.channellst.
id string
// Channel name parsed by the hub's routeParseFunc, used for topic matching.
matchTopic interface{}
// Attached subscribers keyed by subscriber id.
sesslst map[string]*channelSubSession
// idlst and fnlst are parallel slices rebuilt from sesslst by
// innerReloadSubSessionFnlst; Pub iterates them instead of the map.
idlst []string
fnlst []subpub.SubFunc
// Unix time (seconds) of the last publish on this channel.
lastActivityT int64
}
// subTopicItem is a subscription topic together with its subscribers.
type subTopicItem struct {
// Topic string exactly as passed to Sub; also the key in Subscribe.subTopiclst.
subtopicid string
// Topic parsed by the hub's routeParseFunc, used for matching against channels.
subtopic interface{}
// Subscriber callbacks keyed by subscriber id.
sesslst map[string]subpub.SubFunc
}
// Subscribe is the subscription hub.
// Subscription topics support wildcards (for example /news/+/qq) and receive
// the messages of every matching channel.
// Channels used for publishing do not support wildcards.
type Subscribe struct {
// Incremented with atomic.AddInt32 by the operations that take the write
// lock; it is not read anywhere in this file.
lockcnt int32
// Guards channellst, subTopiclst and the items stored in them.
lk sync.RWMutex
// Channel list, keyed by channel name.
channellst map[string]*channelItem
// Subscription topic list, keyed by topic string.
subTopiclst map[string]*subTopicItem
routeParseFunc func(topic string) interface{} // must not be nil
routeMatchFunc func(route interface{}, topic interface{}) bool // must not be nil
}
// DefaultSubscribe is a package-level hub created with NewSubscribe, i.e. with
// the built-in "/"-separated route parser and matcher.
var DefaultSubscribe = NewSubscribe()
// innerParseRoute is the default route parser. It splits route on "/" and
// returns the segments as a []string wrapped in an interface{}.
func innerParseRoute(route string) interface{} {
	segments := strings.Split(route, "/")
	return segments
}
// innerRouteIncludesTopic0 reports whether the route pattern covers topic.
//
// Both arguments are lists of path segments. In route, "+" matches exactly one
// topic segment and "#" matches everything from its position onwards
// (including nothing). Any other segment must equal the topic segment at the
// same position. Without a "#", route and topic must have the same length.
func innerRouteIncludesTopic0(route []string, topic []string) bool {
	for i, seg := range route {
		if seg == "#" {
			// Multi-level wildcard: the remainder of the topic is irrelevant.
			return true
		}
		if i >= len(topic) {
			// Route still has segments but the topic is exhausted.
			return false
		}
		if seg != "+" && seg != topic[i] {
			return false
		}
	}
	// Every route segment matched; the topic must not have segments left over.
	return len(route) == len(topic)
}
// innerRouteIncludesTopic is the default route matcher. Both route and topic
// must hold a []string (as produced by innerParseRoute); any other dynamic
// type makes the type assertion panic.
//
// Original author's benchmark note: "1000W:consume:730(ms)" (presumably
// 10 million calls in about 730 ms).
func innerRouteIncludesTopic(route interface{}, topic interface{}) bool {
	routeSegs := route.([]string)
	topicSegs := topic.([]string)
	return innerRouteIncludesTopic0(routeSegs, topicSegs)
}
// NewSubscribe creates an empty hub that uses the built-in route parser
// (innerParseRoute) and matcher (innerRouteIncludesTopic).
func NewSubscribe() *Subscribe {
	s := new(Subscribe)
	s.channellst = make(map[string]*channelItem)
	s.subTopiclst = make(map[string]*subTopicItem)
	s.routeParseFunc = innerParseRoute
	s.routeMatchFunc = innerRouteIncludesTopic
	return s
}
// NewSubscribeEx creates an empty hub with a caller-supplied route parser and
// matcher. routeParseFunc converts a topic or channel string into the value
// later handed to routeMatchFunc. It panics when either function is nil.
func NewSubscribeEx(routeParseFunc func(topic string) interface{}, routeMatchFunc func(route interface{}, topic interface{}) bool) *Subscribe {
	switch {
	case routeParseFunc == nil:
		panic("invalid routeParseFunc")
	case routeMatchFunc == nil:
		panic("invalid routeMatch")
	}

	s := new(Subscribe)
	s.channellst = make(map[string]*channelItem)
	s.subTopiclst = make(map[string]*subTopicItem)
	s.routeParseFunc = routeParseFunc
	s.routeMatchFunc = routeMatchFunc
	return s
}
// CheckGetPubChannelAddRef returns the channel named channel, creating it if
// necessary, and increments its reference count.
//
// CheckGetPubChannelAddRef and CheckClosePubChannelDecRef must be called in
// pairs, otherwise the channel leaks.
func (this *Subscribe) CheckGetPubChannelAddRef(channel string) ChannelSubPublisher {
	this.lk.Lock()
	// Deferred so the lock is released even if channel creation panics
	// (innerReloadSubSessionFnlst panics on a nil callback).
	defer this.lk.Unlock()
	atomic.AddInt32(&this.lockcnt, 1)

	itm := this.innerCheckCreateChannel(channel)
	itm.refN++
	return itm
}
// CheckClosePubChannelDecRef decrements the reference count of ch and, when
// the count reaches exactly 0, removes the channel from the hub.
//
// It returns true only when the channel was removed. A ch that is not a
// *channelItem is ignored and yields false.
func (this *Subscribe) CheckClosePubChannelDecRef(ch ChannelSubPublisher) (closed bool) {
	this.lk.Lock()
	defer this.lk.Unlock()
	atomic.AddInt32(&this.lockcnt, 1)

	item, ok := ch.(*channelItem)
	if !ok {
		return false
	}
	item.refN--
	if item.refN != 0 {
		return false
	}
	delete(this.channellst, item.id)
	return true
}
// ClosePubChannel releases the channel named channel directly, ignoring its
// reference count. It returns false when the channel does not exist or still
// has subscribers attached (see innerFreeChannel).
func (this *Subscribe) ClosePubChannel(channel string) (closed bool) {
	this.lk.Lock()
	defer this.lk.Unlock()
	atomic.AddInt32(&this.lockcnt, 1)
	return this.innerFreeChannel(this.channellst[channel])
}
// Close removes every subscription topic from the hub under the write lock.
// The channel list is not touched by this method. It always returns nil.
func (this *Subscribe) Close() error {
	this.lk.Lock()
	defer this.lk.Unlock()
	for key := range this.subTopiclst {
		delete(this.subTopiclst, key)
	}
	return nil
}
// matchSubTopic calls fn for every subscription topic whose route matches
// topics (an already-parsed channel). Iteration stops as soon as fn returns
// false. It does no locking itself.
func (this *Subscribe) matchSubTopic(topics interface{}, fn func(itm *subTopicItem) bool) {
	for _, item := range this.subTopiclst {
		if !this.routeMatchFunc(item.subtopic, topics) {
			continue
		}
		if !fn(item) {
			break
		}
	}
}
// checkGetTopic looks up the subscription topic item for topic. When it does
// not exist and new is true, an empty item is created, registered and
// returned; otherwise the lookup result (possibly nil) is returned.
// It does no locking itself.
func (this *Subscribe) checkGetTopic(topic string, new bool) *subTopicItem {
	if found := this.subTopiclst[topic]; found != nil || !new {
		return found
	}
	created := &subTopicItem{
		subtopicid: topic,
		subtopic:   this.routeParseFunc(topic),
		sesslst:    make(map[string]subpub.SubFunc),
	}
	this.subTopiclst[topic] = created
	return created
}
// Status returns a one-line summary: the number of subscription topics, the
// number of channels and the total number of topic subscribers.
func (this *Subscribe) Status() string {
	this.lk.RLock()
	defer this.lk.RUnlock()

	topicN := len(this.subTopiclst)
	channelN := len(this.channellst)
	sessN := this.sessionCount()
	return fmt.Sprintf("subTopicN:%d, channel:%d, topicSessN:%d", topicN, channelN, sessN)
}
// GetTopicSubCount returns the number of subscribers registered under exactly
// this topic string (no wildcard matching), or 0 when the topic is unknown.
func (this *Subscribe) GetTopicSubCount(topic string) int {
	this.lk.RLock()
	defer this.lk.RUnlock()

	if item := this.checkGetTopic(topic, false); item != nil {
		return len(item.sesslst)
	}
	return 0
}
// TopicCount returns the number of distinct subscription topics.
func (this *Subscribe) TopicCount() int {
	this.lk.RLock()
	n := len(this.subTopiclst)
	this.lk.RUnlock()
	return n
}
// rangeSubTopics calls fn for each subscription topic until fn returns false.
// It does no locking itself.
func (this *Subscribe) rangeSubTopics(fn func(key string, itm *subTopicItem) bool) {
	for key, item := range this.subTopiclst {
		if keepGoing := fn(key, item); !keepGoing {
			return
		}
	}
}
// sessionCount sums the subscriber counts of all subscription topics.
// It does no locking itself.
func (this *Subscribe) sessionCount() int {
	total := 0
	for _, item := range this.subTopiclst {
		total += len(item.sesslst)
	}
	return total
}
// Count returns the total number of subscribers over all subscription topics.
func (this *Subscribe) Count() int {
	this.lk.RLock()
	defer this.lk.RUnlock()

	total := this.sessionCount()
	return total
}
// StatusDetail returns "topic:<n>" followed by ",<topic>:<subscribers>" for
// every subscription topic, with topics sorted alphabetically.
//
// The whole snapshot is taken under a single read lock. The map is read
// directly rather than through TopicCount, because acquiring the read lock
// recursively can deadlock when a writer is waiting in between.
func (this *Subscribe) StatusDetail() string {
	this.lk.RLock()
	defer this.lk.RUnlock()

	keys := make([]string, 0, len(this.subTopiclst))
	for key := range this.subTopiclst {
		keys = append(keys, key)
	}
	sort.Strings(keys)

	var sb strings.Builder
	fmt.Fprintf(&sb, "topic:%d", len(keys))
	for _, key := range keys {
		if itm := this.subTopiclst[key]; itm != nil {
			fmt.Fprintf(&sb, ",%s:%d", key, len(itm.sesslst))
		}
	}
	return sb.String()
}
// addSub2Channel adds one reference of subscriber subid to channel.
//
// When the subscriber is not yet attached, a session with callback cb is
// created; when it already is, only its reference count grows and the existing
// callback is kept. changed is true when a new subscriber was attached.
func (this *Subscribe) addSub2Channel(channel *channelItem, subid string, cb subpub.SubFunc) (changed bool) {
	sess := channel.sesslst[subid]
	if changed = sess == nil; changed {
		sess = &channelSubSession{id: subid, fn: cb}
		channel.sesslst[subid] = sess
	}
	sess.refN++
	return changed
}
// releaseTopic drops one reference of subscriber subid from channel. When the
// reference count reaches 0 the subscriber is removed from the channel and
// true is returned; in every other case (including an unknown subid) it
// returns false.
func (this *Subscribe) releaseTopic(channel *channelItem, subid string) (changed bool) {
	sess := channel.sesslst[subid]
	if sess == nil {
		return false
	}
	if sess.refN--; sess.refN != 0 {
		return false
	}
	delete(channel.sesslst, subid)
	return true
}
// innerAdd2Channel attaches subscriber id to every existing channel matched by
// the parsed subscription topic. lk must be held by the caller.
//
// Several topics may match the same channel; within one channel a subscriber
// id is attached only once and reference-counted after that. The channel's own
// reference count is incremented for every match.
func (this *Subscribe) innerAdd2Channel(id string, topic interface{}, cb subpub.SubFunc) {
	for _, ch := range this.channellst {
		if !this.routeMatchFunc(topic, ch.matchTopic) {
			continue
		}
		if added := this.addSub2Channel(ch, id, cb); added {
			// Subscriber set changed: rebuild the delivery slices.
			this.innerReloadSubSessionFnlst(ch)
		}
		ch.refN++
	}
}
// innerRemoveFromChannel releases one reference of subscriber id from every
// channel matched by the parsed subscription topic and decrements each matched
// channel's reference count. cnt is the number of channels from which the
// subscriber was removed completely.
func (this *Subscribe) innerRemoveFromChannel(id string, topic interface{}) (cnt int) {
	for _, ch := range this.channellst {
		if !this.routeMatchFunc(topic, ch.matchTopic) {
			continue
		}
		if removed := this.releaseTopic(ch, id); removed {
			cnt++
			// Subscriber set changed: rebuild the delivery slices.
			this.innerReloadSubSessionFnlst(ch)
		}
		ch.refN--
	}
	return cnt
}
// innerCheckCreateChannel returns the channel named channel. When it does not
// exist yet it is created, registered and populated with the subscribers of
// every matching subscription topic. It does no locking itself.
func (this *Subscribe) innerCheckCreateChannel(channel string) *channelItem {
	if existing := this.channellst[channel]; existing != nil {
		return existing
	}

	created := new(channelItem)
	created.owner = this
	created.matchTopic = this.routeParseFunc(channel)
	created.id = channel
	created.sesslst = make(map[string]*channelSubSession)

	this.channellst[channel] = created
	this.innerCollectSubSession(created)
	return created
}
// innerReloadSubSessionFnlst rebuilds the channel's parallel fnlst/idlst
// slices from its subscriber map. Fresh slices are allocated on every call and
// assigned to the channel. It panics when a subscriber has a nil callback.
func (this *Subscribe) innerReloadSubSessionFnlst(itm *channelItem) {
	n := len(itm.sesslst)
	fns := make([]subpub.SubFunc, 0, n)
	ids := make([]string, 0, n)
	for _, sess := range itm.sesslst { // every session is included
		if sess.fn == nil {
			panic("callback is nil")
		}
		fns = append(fns, sess.fn)
		ids = append(ids, sess.id)
	}
	itm.fnlst, itm.idlst = fns, ids
}
// innerCollectSubSession attaches to itm the subscribers of every subscription
// topic that matches the channel, then rebuilds the channel's delivery slices.
//
// The channel's reference count is incremented once per attached subscriber,
// not once per topic: innerAdd2Channel and innerRemoveFromChannel adjust it
// once per subscriber, so counting per topic here would let later Unsub calls
// drive the count out of balance.
func (this *Subscribe) innerCollectSubSession(itm *channelItem) {
	for _, v := range this.subTopiclst {
		if !this.routeMatchFunc(v.subtopic, itm.matchTopic) {
			continue
		}
		for sid, sfn := range v.sesslst { // every session is included
			this.addSub2Channel(itm, sid, sfn)
			itm.refN++
		}
	}
	this.innerReloadSubSessionFnlst(itm)
}
// Sub subscribes id to topic.
//
// topic is the subscription topic (wildcards allowed); an empty topic is
// ignored and 0 is returned. id is the subscriber id: subscribing the same id
// to the same topic again replaces the previous subscription. cb is invoked
// for messages published on matching channels.
//
// It returns the number of subscribers of topic after the call.
func (this *Subscribe) Sub(id, topic string, cb subpub.SubFunc) int {
	if len(topic) == 0 {
		return 0
	}
	this.lk.Lock()
	defer this.lk.Unlock()
	atomic.AddInt32(&this.lockcnt, 1)

	itm := this.checkGetTopic(topic, true)
	if _, exists := itm.sesslst[id]; exists {
		// Re-subscription: release the channel references taken by the previous
		// subscription first. Otherwise they would be counted twice, a single
		// Unsub could no longer detach the subscriber, and the old callback
		// would stay in place.
		this.innerRemoveFromChannel(id, itm.subtopic)
	}
	itm.sesslst[id] = cb
	this.innerAdd2Channel(id, itm.subtopic, cb)
	return len(itm.sesslst)
}
// Unsub cancels the subscription of id on topic.
//
// id is the id passed to Sub, topic the subscribed topic string.
// r is the number of subscribers left on topic after the call, or -1 when
// topic is empty or unknown. changed is true when the subscriber count of
// topic changed.
func (this *Subscribe) Unsub(id, topic string) (r int, changed bool) {
	if len(topic) == 0 {
		return -1, false
	}
	this.lk.Lock()
	defer this.lk.Unlock()
	atomic.AddInt32(&this.lockcnt, 1)

	itm := this.checkGetTopic(topic, false)
	if itm == nil {
		return -1, false
	}
	if _, subscribed := itm.sesslst[id]; !subscribed {
		// id never subscribed to this topic: leave the channels alone. Releasing
		// here would drop references that belong to the same id's subscription
		// under another topic matching the same channels.
		return len(itm.sesslst), false
	}

	delete(itm.sesslst, id)
	r = len(itm.sesslst)
	if r == 0 {
		// Last subscriber gone: drop the topic itself.
		delete(this.subTopiclst, topic)
	}
	this.innerRemoveFromChannel(id, itm.subtopic)
	return r, true
}
// Pub pushes args to the subscribers of channel.
//
// channel is the channel to publish on (no wildcards); it is created on first
// use. max is the maximum number of successful deliveries: with max > 0
// delivery stops after max callbacks reported success, with max == 0
// everything is delivered.
//
// It returns the number of callbacks that reported success, or -1 when the
// channel's id and callback slices have different lengths.
//
// Original author's benchmark notes:
//
//	iterating the map:
//	  utils_subscribe_v2._test.go:168: 34228, 6844/s
//	  utils_subscribe_v2._test.go:168: 68952, 6894/s
//	iterating the slices:
//	  utils_subscribe_v2._test.go:168: 35343, 6977/s
//	  utils_subscribe_v2._test.go:168: 71194, 7072/s
//
// NOTE(review): lastActivityT is written while only the read lock is held, so
// concurrent Pub calls on the same channel write it without mutual exclusion.
func (this *Subscribe) Pub(channel string, max int, args ...interface{}) int {
	var (
		callbacks []subpub.SubFunc
		subIDs    []string
	)

	// Fast path: the channel exists, snapshot its delivery slices.
	this.lk.RLock()
	ch := this.channellst[channel]
	if ch != nil {
		callbacks, subIDs = ch.fnlst, ch.idlst
		ch.lastActivityT = time.Now().Unix()
	}
	this.lk.RUnlock()

	// Slow path: create the channel under the write lock.
	if ch == nil {
		this.lk.Lock()
		atomic.AddInt32(&this.lockcnt, 1)
		ch = this.innerCheckCreateChannel(channel)
		callbacks, subIDs = ch.fnlst, ch.idlst
		ch.lastActivityT = time.Now().Unix()
		this.lk.Unlock()
	}

	if len(subIDs) != len(callbacks) {
		return -1
	}

	// Callbacks run outside the lock.
	delivered := 0
	for i := 0; i < len(callbacks); i++ {
		if !callbacks[i](subIDs[i], channel, args...) {
			continue
		}
		delivered++
		if max > 0 && delivered >= max {
			break
		}
	}
	return delivered
}
// Sub attaches subscriber id with callback cb directly to this channel and
// increments the channel's reference count. It returns the number of
// subscribers in the channel's delivery list.
//
// The caller must make sure the channel is not released while it is in use.
// Sub and Unsub must be called in pairs, otherwise the channel's lifetime
// bookkeeping gets confused.
func (this *channelItem) Sub(id string, cb subpub.SubFunc) int {
	hub := this.owner
	hub.lk.Lock()
	defer hub.lk.Unlock()
	atomic.AddInt32(&hub.lockcnt, 1)

	if added := hub.addSub2Channel(this, id, cb); added {
		hub.innerReloadSubSessionFnlst(this)
	}
	this.refN++
	return len(this.idlst)
}
// Unsub releases one reference of subscriber id on this channel and decrements
// the channel's reference count (the latter happens even when id was not
// attached). It returns the number of subscribers left in the channel's
// delivery list.
//
// The caller must make sure the channel is not released while it is in use.
func (this *channelItem) Unsub(id string) int {
	hub := this.owner
	hub.lk.Lock()
	defer hub.lk.Unlock()
	atomic.AddInt32(&hub.lockcnt, 1)

	if removed := hub.releaseTopic(this, id); removed {
		hub.innerReloadSubSessionFnlst(this)
	}
	this.refN--
	return len(this.idlst)
}
// Pub pushes args to the subscribers of this channel and returns the number of
// callbacks that reported success. With max > 0 delivery stops after max
// successful callbacks; with max == 0 everything is delivered.
//
// The caller must make sure the channel is not released while it is in use.
//
// The delivery slices are snapshotted under the owner's read lock, as
// Subscribe.Pub does: Sub/Unsub replace fnlst and idlst under the write lock,
// and an unlocked read could observe one slice updated and the other not,
// leading to an out-of-range index below. Callbacks run outside the lock so
// they may call back into the hub.
func (this *channelItem) Pub(max int, args ...interface{}) (n int) {
	this.owner.lk.RLock()
	fnlst, idlst := this.fnlst, this.idlst
	this.lastActivityT = time.Now().Unix()
	this.owner.lk.RUnlock()

	for idx, fn := range fnlst {
		if fn(idlst[idx], this.id, args...) {
			n++
			if max > 0 && n >= max {
				break
			}
		}
	}
	return n
}
// innerFreeChannel removes itm from the channel list and clears its fields.
// It refuses (returns false) when itm is nil or still has subscribers
// attached. It does no locking itself.
func (this *Subscribe) innerFreeChannel(itm *channelItem) bool {
	if itm == nil {
		return false
	}
	if len(itm.sesslst) > 0 {
		return false
	}
	delete(this.channellst, itm.id)
	itm.matchTopic, itm.sesslst = nil, nil
	itm.fnlst, itm.idlst = nil, nil
	return true
}
//// 释放通道
//func (this *Subscribe) ClosePubChannel(channel string) (closed bool) {
// this.lk.Lock()
// defer this.lk.Unlock()
//
// atomic.AddInt32(&this.lockcnt, 1)
// itm := this.channellst[channel]
// return this.innerFreeChannel(itm)
//}
// CleanChannels frees channels that are unreferenced (refN <= 0) and have not
// published anything for more than 10 minutes. It returns the number of
// channels freed.
//
// Candidates are collected under the read lock and freed under the write
// lock. Because the lock is released in between, each candidate is checked
// again before it is freed: it may have been published to, referenced,
// released, or replaced by a new channel with the same id in the meantime.
func (this *Subscribe) CleanChannels() (cnt int) {
	const idleSeconds = 600 // 10 minutes without a publish
	now := time.Now().Unix()
	isIdle := func(c *channelItem) bool {
		return c.refN <= 0 && now-c.lastActivityT > idleSeconds
	}

	var candidates []*channelItem
	this.lk.RLock()
	for _, itm := range this.channellst {
		if isIdle(itm) {
			candidates = append(candidates, itm)
		}
	}
	this.lk.RUnlock()

	if len(candidates) == 0 {
		return 0
	}

	this.lk.Lock()
	defer this.lk.Unlock()
	for _, itm := range candidates {
		if this.channellst[itm.id] != itm || !isIdle(itm) {
			continue
		}
		if this.innerFreeChannel(itm) {
			cnt++
		}
	}
	return cnt
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/ymofen/sputils.git
git@gitee.com:ymofen/sputils.git
ymofen
sputils
sputils
master

搜索帮助