1 Star 1 Fork 0

初八/cancer

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
handler.go 2.44 KB
一键复制 编辑 原始数据 按行查看 历史
初八 提交于 2022-12-12 19:37 . 功能完善
package cancer
import (
"cancer/stomp"
"container/list"
)
// Handler is one processing stage of a pipeline. It receives the context of
// the pipeline it is running in and the message produced by the previous
// stage; a handler passes a message on by calling cxt.fireHandler.
type Handler func(cxt *HandlerContext, msg Msg)
// Msg is the untyped payload handed from one Handler to the next. The handlers
// in this file type-assert it to string, *stomp.Frame or stomp.Frame depending
// on the stage.
type Msg interface{}
// HandlerContext carries the state a Handler needs while a message travels
// through the handler chain.
type HandlerContext struct {
// channel is the connection the handlers write frames to and read the
// session id from.
channel *Channel
// pipeline is the pipeline this context belongs to (not used by the code in
// this file).
pipeline *Pipeline
// current is the list element of the most recently dispatched handler;
// fireHandler advances it to the next element before invoking that handler.
current *list.Element
}
// OnConnected checks that the frame's command is a valid server command and,
// if so, invokes the channel's onConnect hook and writes the serialized frame
// to the channel.
//
// When validation fails the error is logged and nothing is sent.
func (cxt *HandlerContext) OnConnected(messageFrame *stomp.Frame) {
	if err := stomp.ValidateServerCommand(messageFrame.Command); err != nil {
		logger.Errorf("accept message error:%s:", err)
		return
	}

	// Serialize before touching the channel, then notify and send.
	payload := messageFrame.Serialize()
	cxt.channel.onConnect()
	cxt.channel.Send([]byte(payload))
}
// OnAck serializes the given frame and sends it on the context's channel.
// A nil frame is ignored.
func (cxt *HandlerContext) OnAck(messageFrame *stomp.Frame) {
	if messageFrame != nil {
		cxt.channel.Send([]byte(messageFrame.Serialize()))
	}
}
// fireHandler hands msg to the handler that follows the current one in the
// handler list. If there is no following element the message is dropped.
//
// The context's current position is moved to the following element before
// that element's handler is invoked. The element value must be a Handler,
// otherwise the type assertion panics.
func (cxt *HandlerContext) fireHandler(msg Msg) {
	if following := cxt.current.Next(); following != nil {
		cxt.current = following
		following.Value.(Handler)(cxt, msg)
	}
}
// StompEncoderHandler parses a raw text message into a STOMP frame and
// forwards the frame to the next handler.
//
// msg must hold a string; any other dynamic type makes the type assertion
// panic. If the text cannot be parsed, the error is logged and the message is
// dropped rather than forwarding an unusable frame to later handlers.
func StompEncoderHandler(cxt *HandlerContext, msg Msg) {
	frame, err := stomp.Deserialize(msg.(string))
	if err != nil {
		logger.Errorf("channel read a message of frame error :{%s}", err)
		// The result of a failed parse is meaningless; stop here so later
		// handlers never dereference it.
		return
	}
	cxt.fireHandler(frame)
}
// FrameDecoderHandler dispatches an incoming STOMP frame by its command:
//
//   - CONNECT: builds a CONNECTED frame for the channel's session id and sends
//     it through cxt.OnConnected.
//   - SUBSCRIBE / UNSUBSCRIBE: runs OnSubscribe / UnSubscribe and sends the
//     returned frame (if any) through cxt.OnAck.
//   - SEND: forwards the original message to the next handler.
//   - anything else: logs a warning.
//
// msg must hold a *stomp.Frame; any other dynamic type makes the type
// assertion panic. A nil frame pointer is ignored.
func FrameDecoderHandler(cxt *HandlerContext, msg Msg) {
	frame := msg.(*stomp.Frame)
	if frame == nil {
		// Nothing to dispatch; reading frame.Command would panic.
		return
	}
	channel := cxt.channel

	switch frame.Command {
	case stomp.FrameC.Connect:
		// Protocol support still to be handled here, see
		// http://stomp.github.io/stomp-specification-1.1.html#Protocol_Negotiation
		//   - accepted protocol versions (done)
		//   - host validation
		//   - login
		//   - passcode
		connectedFrame := stomp.NewConnectedFrame(frame, channel.SessionId)
		cxt.OnConnected(connectedFrame)
	case stomp.FrameC.Subscribe:
		cxt.OnAck(OnSubscribe(frame, channel))
	case stomp.FrameC.Send:
		// Payload handling is left to the next handler.
		cxt.fireHandler(msg)
	case stomp.FrameC.Unsubscribe:
		cxt.OnAck(UnSubscribe(frame))
	default:
		// Warnf (not Warn) so the %s verb is actually expanded.
		logger.Warnf("server can not support data frame on this command:%s", frame.Command)
	}
}
// MessageEnqueueHandler turns a STOMP frame into a publish message, built from
// the frame's destination header and body, and passes it to OnMessage. It does
// not forward anything to further handlers.
//
// msg must hold a *stomp.Frame; any other dynamic type makes the type
// assertion panic.
func MessageEnqueueHandler(cxt *HandlerContext, msg Msg) {
	f := msg.(*stomp.Frame)
	destination := f.GetHeader(stomp.StompHeaders.Destination)
	OnMessage(NewPubMessage(destination, f.Body()))
}
// StompDecoderHandler serializes the incoming frame, parses the serialized
// text back with stomp.Deserialize, and forwards the parsed result to the next
// handler.
//
// msg must hold a stomp.Frame value; any other dynamic type makes the type
// assertion panic. If parsing fails, the error is logged and the message is
// dropped instead of forwarding an unusable result.
//
// NOTE(review): the other handlers in this file assert *stomp.Frame (pointer),
// while this one asserts stomp.Frame (value) — confirm which one the pipeline
// actually delivers here.
func StompDecoderHandler(cxt *HandlerContext, msg Msg) {
	frame := msg.(stomp.Frame)
	parsed, err := stomp.Deserialize(frame.Serialize())
	if err != nil {
		logger.Errorf("channel read a message of frame error :{%s}", err)
		// The result of a failed parse is meaningless; do not pass it on.
		return
	}
	cxt.fireHandler(parsed)
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/ypconfig/cancer.git
git@gitee.com:ypconfig/cancer.git
ypconfig
cancer
cancer
master

搜索帮助