1 Star 0 Fork 0

一介农夫/logging

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
default_observer_impl_production_consumption.go 4.93 KB
一键复制 编辑 原始数据 按行查看 历史
package logging
import (
"flag"
"fmt"
"path"
"sort"
"strings"
"gitee.com/bambuo/lock"
"github.com/pkg/errors"
)
// 作者: chenchong
// 时间: 2021/7/15 上午10:41
// 版本: 1.0
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//观察者
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
var (
	// topics lists the concrete topic names that are passed to matchTopic
	// when a subscription topic has to be expanded.
	topics = []string{TopicInfo, TopicWarn, TopicError}

	// capacity is the number of history entries kept per topic; it is read
	// from the "logging.history.capacity" flag and defaults to 10.
	capacity = flag.Int("logging.history.capacity", 10, "日志生产者暂存容量,给日志消费者接入时读取的历史条目数")
)
// matchTopic expands a subscription topic into the concrete topics it covers.
//
// A topic that does not end in `*` is returned unchanged as a one-element
// slice. Otherwise the part before the last `/` is taken as the scope and
// every entry of topics is joined onto it, so "xxx/*" with topics
// {"info", "warn"} yields {"xxx/info", "xxx/warn"}. A wildcard that contains
// no `/` at all (for example "*") has an empty scope and yields the entries
// of topics themselves.
//
// TODO: only a single trailing wildcard level is supported; a pattern such as
// "xxx/*/info" is not handled and will need a redesign.
func matchTopic(topic string, topics []string) []string {
	// No wildcard: the topic stands for itself.
	if !strings.HasSuffix(topic, `*`) {
		return []string{topic}
	}

	// The scope is everything before the last `/`. strings.LastIndex returns
	// -1 when there is no `/`; slicing with that value would panic, so fall
	// back to an empty scope.
	scope := ""
	if i := strings.LastIndex(topic, `/`); i >= 0 {
		scope = topic[:i]
	}

	var matched []string
	for _, name := range topics {
		candidate := path.Join(scope, name)
		// Skip a candidate that is identical to the wildcard pattern itself.
		if candidate == topic {
			continue
		}
		matched = append(matched, candidate)
	}
	return matched
}
// DefaultProductionConsumptionObserver is the default observer
// implementation. For each topic it keeps the recorded history entries and
// the consumers and producers registered under that topic.
type DefaultProductionConsumptionObserver struct {
	// Embedded lock; the methods of this type in this file acquire it before
	// touching the maps below.
	lock.RecursiveMutex

	// histories maps a topic to its recorded history entries.
	histories map[string][]string
	// consumers maps a topic to the consumers subscribed to it, keyed by mask.
	consumers map[string]map[string]Consumer
	// producers maps a topic to the producers registered for it, keyed by mask.
	producers map[string]map[string]Producer
}
// NewDefaultProductionConsumptionObserver returns an observer whose history,
// consumer and producer tables are allocated and empty.
func NewDefaultProductionConsumptionObserver() *DefaultProductionConsumptionObserver {
	observer := new(DefaultProductionConsumptionObserver)
	observer.histories = make(map[string][]string)
	observer.consumers = make(map[string]map[string]Consumer)
	observer.producers = make(map[string]map[string]Producer)
	return observer
}
// Subscribe registers consumer under topic and mask and replays the stored
// history to it.
//
// topic may be a concrete topic or end in `*`; it is expanded with
// matchTopic and the history of every resulting topic is collected, sorted
// as plain strings, and handed entry by entry to consumer.Consumption
// together with the topic string that was passed to Subscribe.
//
// It returns an error, without replaying anything, when a consumer is
// already registered under the same topic and mask.
func (do *DefaultProductionConsumptionObserver) Subscribe(topic, mask string, consumer Consumer) error {
	do.Lock()
	defer do.Unlock()

	// Reject duplicates first so that a refused subscription has no side
	// effects on the consumer that was passed in. Indexing a nil map is safe
	// and yields the zero value.
	consumers := do.consumers[topic]
	if nil != consumers[mask] {
		return errors.New(`Consumer already exists`)
	}

	// Collect the history of every topic covered by the subscription.
	var histories []string
	for _, matched := range matchTopic(topic, topics) {
		histories = append(histories, do.History(matched)...)
	}
	sort.Strings(histories)

	// Replay the history.
	for _, entry := range histories {
		// Unused history slots are empty strings; skip them.
		if len(entry) == 0 {
			continue
		}
		consumer.Consumption(topic, entry)
	}

	// Add the consumer to the subscription table.
	if consumers == nil {
		consumers = map[string]Consumer{}
		do.consumers[topic] = consumers
	}
	consumers[mask] = consumer
	return nil
}
// Unsubscribe removes the consumer registered under topic and mask.
// It does nothing when no consumer set exists for topic.
func (do *DefaultProductionConsumptionObserver) Unsubscribe(topic, mask string) {
	do.Lock()
	defer do.Unlock()

	// Maps are reference values, so deleting through the looked-up map
	// updates the stored entry directly.
	if registered := do.consumers[topic]; registered != nil {
		delete(registered, mask)
	}
}
// Registration registers producer under topic and mask.
//
// It returns an error when a producer is already registered under the same
// topic and mask; the existing registration is left untouched.
func (do *DefaultProductionConsumptionObserver) Registration(topic, mask string, producer Producer) error {
	do.Lock()
	defer do.Unlock()

	// Indexing a nil map is safe and yields the zero value.
	producers := do.producers[topic]
	if nil != producers[mask] {
		return errors.New(`Producer already exists`)
	}

	// First producer for this topic: create its table.
	if producers == nil {
		producers = map[string]Producer{}
		do.producers[topic] = producers
	}
	producers[mask] = producer
	return nil
}
// Revocation removes the producer registered under topic and mask.
// It does nothing when no producer set exists for topic.
func (do *DefaultProductionConsumptionObserver) Revocation(topic, mask string) {
	do.Lock()
	defer do.Unlock()

	// Maps are reference values, so deleting through the looked-up map
	// updates the stored entry directly.
	if registered := do.producers[topic]; registered != nil {
		delete(registered, mask)
	}
}
// Consumption records data in the history of topic and delivers it to every
// consumer subscribed either to topic itself or to the wildcard
// "<scope>/*", where scope is the part of topic before its last `/`.
//
// When the same mask is subscribed under both keys, only the wildcard
// subscriber receives the data.
func (do *DefaultProductionConsumptionObserver) Consumption(topic string, data ...interface{}) {
	do.Lock()
	defer do.Unlock()

	// Record the entry in the per-topic history.
	do.setHistory(topic, data...)

	// strings.LastIndex returns -1 when topic has no `/`; slicing with that
	// value would panic, so fall back to an empty scope.
	scope := ""
	if i := strings.LastIndex(topic, `/`); i >= 0 {
		scope = topic[:i]
	}
	exact := do.consumers[topic]
	wildcard := do.consumers[path.Join(scope, `*`)]

	// Merge into a scratch map. Writing wildcard subscribers into the stored
	// per-topic map would keep them registered there after they unsubscribe
	// and could overwrite an exact subscriber that uses the same mask.
	recipients := make(map[string]Consumer, len(exact)+len(wildcard))
	for mask, consumer := range exact {
		recipients[mask] = consumer
	}
	for mask, consumer := range wildcard {
		recipients[mask] = consumer
	}

	// Dispatch.
	for _, consumer := range recipients {
		consumer.Consumption(topic, data...)
	}
}
// setHistory stores the formatted data as the newest history entry of topic.
//
// The history of a topic is a fixed-size window of *capacity strings: the
// newest entry is last, the oldest entry is dropped on every call, and slots
// that have not been filled yet are empty strings. Nothing is recorded when
// the configured capacity is not positive.
func (do *DefaultProductionConsumptionObserver) setHistory(topic string, data ...interface{}) {
	do.Lock()
	defer do.Unlock()

	histories := do.histories[topic]
	if len(histories) == 0 {
		// A non-positive capacity would make the allocation or the final
		// index below panic; treat it as "history disabled".
		if *capacity <= 0 {
			return
		}
		histories = make([]string, *capacity)
	}

	// Shift the window left by one to free the last slot; copy handles the
	// overlapping source and destination.
	copy(histories, histories[1:])

	history := fmt.Sprint(data...)
	// NOTE(review): when the text starts with `[` (or ends with `]`) every
	// such bracket in the text is removed, not only the leading/trailing one.
	// Kept as is; confirm whether TrimPrefix/TrimSuffix was intended.
	if strings.HasPrefix(history, `[`) {
		history = strings.ReplaceAll(history, `[`, ``)
	}
	if strings.HasSuffix(history, `]`) {
		history = strings.ReplaceAll(history, `]`, ``)
	}

	histories[len(histories)-1] = history
	do.histories[topic] = histories
}
// History returns a copy of the history entries stored for topic, in stored
// order (newest last). Slots that have not been filled yet are empty strings.
// It returns nil when nothing is stored for topic.
func (do *DefaultProductionConsumptionObserver) History(topic string) []string {
	do.Lock()
	defer do.Unlock()

	// Copy so callers never share the backing array that setHistory rewrites
	// in place once the lock has been released. Appending nothing to a nil
	// slice keeps the result nil for unknown topics.
	return append([]string(nil), do.histories[topic]...)
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/bambuo/logging.git
git@gitee.com:bambuo/logging.git
bambuo
logging
logging
main

搜索帮助

0d507c66 1850385 C8b1a773 1850385