1 Star 1 Fork 0

初八/cancer

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
sub.go 3.49 KB
一键复制 编辑 原始数据 按行查看 历史
初八 提交于 2022-12-12 19:37 . 功能完善
package cancer
import (
"cancer/stomp"
"cancer/store"
)
// subscribeMgr is the registry of subscriptions, grouped by destination,
// together with the store used to open the queue backing each destination.
type subscribeMgr struct {
subscribes map[string][]*Subscribe // key: destination, value: the subscriptions registered for it
store *store.Store // used by executeSub to open the file queue of a destination
}
// SetStore installs the store that the manager uses to open the file queue
// of a destination when a subscription is started.
func (m *subscribeMgr) SetStore(store *store.Store) {
	m.store = store
}
// subMgr is the package-level subscription registry. It starts with an empty
// destination map and no store; the store is supplied later via SetStore.
var subMgr = &subscribeMgr{
	subscribes: map[string][]*Subscribe{},
}
// OnSubscribe handles a SUBSCRIBE frame received on channel.
//
// The frame must carry non-empty "id" and "destination" headers, and the id
// must not already be registered for that destination. When these checks pass
// a new Subscribe is registered and started through subMgr.registerSubscribe.
//
// It returns:
//   - an ERROR frame when a header is missing, the id is already in use for
//     the destination, or the subscription could not be started;
//   - the frame built by stomp.NewAckFrame when the "ack" header is "client"
//     or "client-individual";
//   - nil otherwise.
//
// Every failure returns immediately, so an invalid or duplicate request never
// registers or starts a subscription.
func OnSubscribe(frame *stomp.Frame, channel *Channel) *stomp.Frame {
	dest := frame.GetHeader(stomp.StompHeaders.Destination)
	subId := frame.GetHeader(stomp.StompHeaders.Id)
	ack := frame.GetHeader(stomp.StompHeaders.Ack)

	// The id check comes first so that a frame missing both headers still
	// reports the missing id, as it did before.
	if subId == "" {
		return stomp.NewErrorFrame("Headers missing :'id'", "Subscribe frame header['id']: must not be null", frame)
	}
	if dest == "" {
		return stomp.NewErrorFrame("Headers missing :'destination'", "Subscribe frame header['destination']: must not be null", frame)
	}

	if existing := GetSubscribe(subId, dest); existing != nil {
		return stomp.NewErrorFrame("Subscribe: ['id'] must uniquely",
			"This sub-id is alread been used with this destination:["+existing.id+"].\n sub-id must be uniquely", frame)
	}

	sub := &Subscribe{
		id:          subId,
		destination: dest,
		channel:     channel,
	}
	if err := subMgr.registerSubscribe(sub); err != nil {
		// Report the id of the subscription being created; the duplicate
		// lookup result is always nil on this path and must not be dereferenced.
		return stomp.NewErrorFrame("Create subscribe destination error",
			"sever create subscribe:["+sub.id+"] error, code 503", frame)
	}

	if ack == "client" || ack == "client-individual" {
		return stomp.NewAckFrame(subId)
	}
	return nil
}
// registerSubscribe records sub as a consumer of sub.destination and then
// starts its delivery through executeSub, returning executeSub's error.
//
// One destination may have many subscriptions. The same *Subscribe pointer is
// never stored twice, and the list never contains nil placeholder entries.
//
// NOTE(review): when executeSub fails the subscription stays in the registry,
// as before; confirm whether callers expect it to be removed in that case.
func (m *subscribeMgr) registerSubscribe(sub *Subscribe) error {
	if m.subscribes == nil {
		// Guard against a zero-value manager: writing to a nil map panics.
		m.subscribes = make(map[string][]*Subscribe)
	}

	current := m.subscribes[sub.destination]
	alreadyRegistered := false
	for _, existing := range current {
		if existing == sub {
			alreadyRegistered = true
			break
		}
	}
	if !alreadyRegistered {
		// append may reallocate, so the result has to be stored back in the map.
		m.subscribes[sub.destination] = append(current, sub)
	}

	return executeSub(sub)
}
// executeSub starts one looper task for the given subscription.
//
// It opens the file queue of sub.destination from subMgr.store, builds a task
// pooler on that queue and a task looper for the subscription, stores the
// looper in sub.taskLooper, and starts a subscribe driver named
// "SubscribeDriver_<id>" wired to both. The only error returned is the one
// from opening the file queue.
func executeSub(sub *Subscribe) error {
	queue, openErr := subMgr.store.OpenFileQueue(sub.destination)
	if openErr != nil {
		return openErr
	}

	pooler := NewTaskPooler(queue)
	sub.taskLooper = NewTaskLooper(10000, sub)

	driverName := "SubscribeDriver_" + sub.id
	subscribeDriver := NewSubscribeDriver(driverName, pooler, sub.taskLooper)
	subscribeDriver.Start()
	return nil
}
// UnSubscribe handles an UNSUBSCRIBE frame.
//
// The destination is read from the frame's "subscription" header and the
// subscription id from its "id" header; both must be non-empty. The
// subscription with that id is removed from the destination's list, and the
// destination entry is dropped once its list is empty.
//
// It returns an ERROR frame when a header is missing, the frame built by
// stomp.NewAckFrame when the "ack" header is "client" or "client-individual",
// and nil otherwise. A failed validation returns before the registry is touched.
//
// NOTE(review): the removed subscription's task looper and driver are not
// stopped here; no stop API is visible in this file — confirm whether one
// should be called.
func UnSubscribe(frame *stomp.Frame) *stomp.Frame {
	dest := frame.GetHeader(stomp.StompHeaders.Subscription)
	subId := frame.GetHeader(stomp.StompHeaders.Id)
	ack := frame.GetHeader(stomp.StompHeaders.Ack)

	// The id check comes first so that a frame missing both headers still
	// reports the missing id, as it did before.
	if subId == "" {
		return stomp.NewErrorFrame("Headers missing :'id'", "UnSubscribe frame header['id']: must not be null", frame)
	}
	if dest == "" {
		return stomp.NewErrorFrame("Headers missing :'destination'", "UnSubscribe frame header['destination']: must not be null", frame)
	}

	// The registry is keyed by destination, so the subscription is removed
	// from that destination's list rather than deleting a map key named
	// after the subscription id.
	current := subMgr.subscribes[dest]
	kept := current[:0]
	for _, s := range current {
		if s == nil || s.id == subId {
			continue
		}
		kept = append(kept, s)
	}
	// Clear the now-unused tail so removed subscriptions can be collected.
	for i := len(kept); i < len(current); i++ {
		current[i] = nil
	}
	if len(kept) == 0 {
		delete(subMgr.subscribes, dest)
	} else {
		subMgr.subscribes[dest] = kept
	}

	if ack == "client" || ack == "client-individual" {
		return stomp.NewAckFrame(subId)
	}
	return nil
}
// GetSubscribe looks up the subscription registered under subId for the given
// destination. It returns nil when the destination has no subscriptions or
// none of them has that id.
func GetSubscribe(subId string, destination string) *Subscribe {
	for _, candidate := range subMgr.subscribes[destination] {
		// Skip empty slots instead of dereferencing them: a nil entry in the
		// list would otherwise panic here.
		if candidate == nil {
			continue
		}
		if candidate.id == subId {
			return candidate
		}
	}
	return nil
}
// Subscribe describes one subscription created by OnSubscribe.
type Subscribe struct {
id string // value of the SUBSCRIBE frame's id header
destination string // value of the SUBSCRIBE frame's destination header
channel *Channel // channel that was passed to OnSubscribe for this subscription
taskLooper *TaskLooper // looper created for this subscription by executeSub
}
// GetChannel returns the channel stored on this subscription.
func (s *Subscribe) GetChannel() *Channel {
	return s.channel
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/ypconfig/cancer.git
git@gitee.com:ypconfig/cancer.git
ypconfig
cancer
cancer
master

搜索帮助

0d507c66 1850385 C8b1a773 1850385