node.go
package kapacitor

import (
	"bytes"
	"expvar"
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
	"time"

	"github.com/influxdata/kapacitor/alert"
	"github.com/influxdata/kapacitor/edge"
	kexpvar "github.com/influxdata/kapacitor/expvar"
	"github.com/influxdata/kapacitor/keyvalue"
	"github.com/influxdata/kapacitor/models"
	"github.com/influxdata/kapacitor/pipeline"
	"github.com/influxdata/kapacitor/server/vars"
	"github.com/influxdata/kapacitor/timer"
	"github.com/pkg/errors"
)

const (
	statErrorCount       = "errors"
	statCardinalityGauge = "working_cardinality"
	statAverageExecTime  = "avg_exec_time_ns"
)
// NodeDiagnostic is the set of diagnostic callbacks a node may emit to.
// The grouped methods are only used by the correspondingly named node types.
type NodeDiagnostic interface {
	Error(msg string, err error, ctx ...keyvalue.T)

	// AlertNode
	AlertTriggered(level alert.Level, id string, message string, rows *models.Row)

	// AutoscaleNode
	SettingReplicas(new int, old int, id string)

	// QueryNode
	StartingBatchQuery(q string)

	// LogNode
	LogPointData(key, prefix string, data edge.PointMessage)
	LogBatchData(key, prefix string, data edge.BufferedBatchMessage)

	// UDF
	UDFLog(s string)
}

// nodeDiagnostic wraps a NodeDiagnostic so that every reported error also
// increments the node's error count stat; the underlying Error call is
// skipped when the node is quiet.
type nodeDiagnostic struct {
	NodeDiagnostic
	node *node
}

func newNodeDiagnostic(n *node, diag NodeDiagnostic) *nodeDiagnostic {
	return &nodeDiagnostic{
		NodeDiagnostic: diag,
		node:           n,
	}
}

func (n *nodeDiagnostic) Error(msg string, err error, ctx ...keyvalue.T) {
	n.node.incrementErrorCount()
	if !n.node.quiet {
		n.NodeDiagnostic.Error(msg, err, ctx...)
	}
}
// A node that can be in an executor.
type Node interface {
	pipeline.Node

	addParentEdge(edge.StatsEdge)

	init(quiet bool)

	// start the node and its children
	start(snapshot []byte)
	stop()

	// snapshot running state
	snapshot() ([]byte, error)
	restore(snapshot []byte) error

	// wait for the node to finish processing and return any errors
	Wait() error

	// link specified child
	linkChild(c Node) error
	addParent(p Node)

	// close children edges
	closeChildEdges()

	// abort parent edges
	abortParentEdges()

	// executing dot
	edot(buf *bytes.Buffer, labels bool)

	nodeStatsByGroup() map[models.GroupID]nodeStats

	collectedCount() int64
	emittedCount() int64

	incrementErrorCount()

	stats() map[string]interface{}
}
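
// Illustrative sketch, not part of the original node.go: a minimal sequence
// an executor could use to drive a single node that finishes on its own.
// In practice ExecutingTask coordinates init, start, stop and Wait across
// the whole pipeline; this only shows the ordering the interface implies.
// The function name is hypothetical and unused elsewhere in the package.
func runNodeSketch(n Node, snapshot []byte) error {
	n.init(false)     // register stats and wrap the diagnostic handler
	n.start(snapshot) // run the node's work in its own goroutine
	defer n.stop()    // delete the node's statistics once it is done
	return n.Wait()   // block until the run goroutine reports its error
}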
// implementation of Node
type node struct {
	pipeline.Node
	et         *ExecutingTask
	parents    []Node
	children   []Node
	runF       func(snapshot []byte) error
	stopF      func()
	errCh      chan error
	err        error
	finishedMu sync.Mutex
	finished   bool
	ins        []edge.StatsEdge
	outs       []edge.StatsEdge
	diag       NodeDiagnostic
	timer      timer.Timer
	statsKey   string
	statMap    *kexpvar.Map
	quiet      bool
	nodeErrors *kexpvar.Int
}

func (n *node) addParentEdge(e edge.StatsEdge) {
	n.ins = append(n.ins, e)
}

func (n *node) abortParentEdges() {
	for _, in := range n.ins {
		in.Abort()
	}
}

func (n *node) init(quiet bool) {
	tags := map[string]string{
		"task": n.et.Task.ID,
		"node": n.Name(),
		"type": n.et.Task.Type.String(),
		"kind": n.Desc(),
	}
	n.statsKey, n.statMap = vars.NewStatistic("nodes", tags)
	avgExecVar := &MaxDuration{}
	n.statMap.Set(statAverageExecTime, avgExecVar)
	n.nodeErrors = &kexpvar.Int{}
	n.statMap.Set(statErrorCount, n.nodeErrors)
	n.diag = newNodeDiagnostic(n, n.diag)
	n.statMap.Set(statCardinalityGauge, kexpvar.NewIntFuncGauge(nil))
	n.timer = n.et.tm.TimingService.NewTimer(avgExecVar)
	n.errCh = make(chan error, 1)
	n.quiet = quiet
}
func (n *node) start(snapshot []byte) {
	go func() {
		var err error
		defer func() {
			// Handle panic in runF.
			// recover must be called unconditionally, before the error check,
			// otherwise a panic in runF would never be converted into an error.
			r := recover()
			if r != nil {
				trace := make([]byte, 512)
				n := runtime.Stack(trace, false)
				err = fmt.Errorf("%s: Trace:%s", r, string(trace[:n]))
			}
			// Always close children edges
			n.closeChildEdges()
			// Propagate error up
			if err != nil {
				n.abortParentEdges()
				n.diag.Error("node failed", err)
				err = errors.Wrap(err, n.Name())
			}
			n.errCh <- err
		}()
		// Run node
		err = n.runF(snapshot)
	}()
}
func (n *node) stop() {
	if n.stopF != nil {
		n.stopF()
	}
	vars.DeleteStatistic(n.statsKey)
}

// no-op snapshot
func (n *node) snapshot() (b []byte, err error) { return }

// no-op restore
func (n *node) restore([]byte) error { return nil }

func (n *node) Wait() error {
	n.finishedMu.Lock()
	defer n.finishedMu.Unlock()
	if !n.finished {
		n.finished = true
		n.err = <-n.errCh
	}
	return n.err
}

func (n *node) addChild(c Node) (edge.StatsEdge, error) {
	if n.Provides() != c.Wants() {
		return nil, fmt.Errorf("cannot add child mismatched edges: %s:%s -> %s:%s", n.Name(), n.Provides(), c.Name(), c.Wants())
	}
	if n.Provides() == pipeline.NoEdge {
		return nil, fmt.Errorf("cannot add child no edge expected: %s:%s -> %s:%s", n.Name(), n.Provides(), c.Name(), c.Wants())
	}
	n.children = append(n.children, c)

	d := n.et.tm.diag.WithEdgeContext(n.et.Task.ID, n.Name(), c.Name())
	edge := newEdge(n.et.Task.ID, n.Name(), c.Name(), n.Provides(), defaultEdgeBufferSize, d)
	if edge == nil {
		return nil, fmt.Errorf("unknown edge type %s", n.Provides())
	}
	c.addParentEdge(edge)
	return edge, nil
}

func (n *node) addParent(p Node) {
	n.parents = append(n.parents, p)
}

func (n *node) linkChild(c Node) error {
	// add child
	edge, err := n.addChild(c)
	if err != nil {
		return err
	}

	// add parent
	c.addParent(n)

	// store edge to child
	n.outs = append(n.outs, edge)
	return nil
}

func (n *node) closeChildEdges() {
	for _, child := range n.outs {
		child.Close()
	}
}
func (n *node) edot(buf *bytes.Buffer, labels bool) {
	if labels {
		// Print all stats on node.
		buf.WriteString(
			fmt.Sprintf("\n%s [xlabel=\"",
				n.Name(),
			),
		)
		i := 0
		n.statMap.DoSorted(func(kv expvar.KeyValue) {
			if i != 0 {
				// NOTE: A literal \r indicates a right-justified newline in graphviz syntax.
				buf.WriteString(`\r`)
			}
			i++
			var s string
			if sv, ok := kv.Value.(kexpvar.StringVar); ok {
				s = sv.StringValue()
			} else {
				s = kv.Value.String()
			}
			buf.WriteString(
				fmt.Sprintf("%s=%s",
					kv.Key,
					s,
				),
			)
		})
		buf.Write([]byte("\"];\n"))

		for i, c := range n.children {
			buf.Write([]byte(
				fmt.Sprintf("%s -> %s [label=\"processed=%d\"];\n",
					n.Name(),
					c.Name(),
					n.outs[i].Collected(),
				),
			))
		}
	} else {
		// Print all stats on node.
		buf.Write([]byte(
			fmt.Sprintf("\n%s [",
				n.Name(),
			),
		))
		n.statMap.DoSorted(func(kv expvar.KeyValue) {
			var s string
			if sv, ok := kv.Value.(kexpvar.StringVar); ok {
				s = sv.StringValue()
			} else {
				s = kv.Value.String()
			}
			buf.Write([]byte(
				fmt.Sprintf("%s=\"%s\" ",
					kv.Key,
					s,
				),
			))
		})
		buf.Write([]byte("];\n"))

		for i, c := range n.children {
			buf.Write([]byte(
				fmt.Sprintf("%s -> %s [processed=\"%d\"];\n",
					n.Name(),
					c.Name(),
					n.outs[i].Collected(),
				),
			))
		}
	}
}
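
// Illustrative sketch, not part of the original node.go: with the three
// stats registered in init and labels=true, the DOT fragment written by
// edot for a node named "stream0" with a single child "from1" would look
// roughly like the following (node names and counter values are hypothetical):
//
//	stream0 [xlabel="avg_exec_time_ns=0s\rerrors=0\rworking_cardinality=0"];
//	stream0 -> from1 [label="processed=42"];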
// node collected count is the sum of emitted counts of parent edges
func (n *node) collectedCount() (count int64) {
	for _, in := range n.ins {
		count += in.Emitted()
	}
	return
}

// node emitted count is the sum of collected counts of children edges
func (n *node) emittedCount() (count int64) {
	for _, out := range n.outs {
		count += out.Collected()
	}
	return
}

// incrementErrorCount increments a node's error count stat.
func (n *node) incrementErrorCount() {
	n.nodeErrors.Add(1)
}
func (n *node) stats() map[string]interface{} {
	stats := make(map[string]interface{})

	n.statMap.Do(func(kv expvar.KeyValue) {
		switch v := kv.Value.(type) {
		case kexpvar.IntVar:
			stats[kv.Key] = v.IntValue()
		case kexpvar.FloatVar:
			stats[kv.Key] = v.FloatValue()
		default:
			stats[kv.Key] = v.String()
		}
	})

	return stats
}
// Statistics for a node
type nodeStats struct {
	Fields     models.Fields
	Tags       models.Tags
	Dimensions models.Dimensions
}

// Return a copy of the current node statistics.
// If no groups have been seen yet a NilGroup will be created with zero stats.
func (n *node) nodeStatsByGroup() (stats map[models.GroupID]nodeStats) {
	// Get the counts for just one output.
	stats = make(map[models.GroupID]nodeStats)
	if len(n.outs) > 0 {
		n.outs[0].ReadGroupStats(func(g *edge.GroupStats) {
			stats[g.GroupInfo.ID] = nodeStats{
				Fields: models.Fields{
					// A node's emitted count is the collected count of its output.
					"emitted": g.Collected,
				},
				Tags:       g.GroupInfo.Tags,
				Dimensions: g.GroupInfo.Dimensions,
			}
		})
	}
	if len(stats) == 0 {
		// If we have no groups/stats add nil group with emitted = 0
		stats[""] = nodeStats{
			Fields: models.Fields{
				"emitted": int64(0),
			},
		}
	}
	return
}

// MaxDuration is a 64-bit int variable representing a duration in nanoseconds
// that satisfies the expvar.Var interface.
// When setting a value it will only be set if it is greater than the current value.
type MaxDuration struct {
	d      int64
	setter timer.Setter
}

func (v *MaxDuration) String() string {
	return `"` + v.StringValue() + `"`
}

func (v *MaxDuration) StringValue() string {
	return time.Duration(v.IntValue()).String()
}

func (v *MaxDuration) IntValue() int64 {
	return atomic.LoadInt64(&v.d)
}

// Set sets the value if it is greater than the current value.
// If the set was successful and a setter exists, the value is passed on to the setter.
func (v *MaxDuration) Set(next int64) {
	for {
		cur := v.IntValue()
		if next > cur {
			if atomic.CompareAndSwapInt64(&v.d, cur, next) {
				if v.setter != nil {
					v.setter.Set(next)
				}
				return
			}
		} else {
			return
		}
	}
}
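
// Illustrative sketch, not part of the original node.go: MaxDuration keeps
// the largest value it has ever been given, even when Set is called from
// many goroutines, because the CAS loop retries until either its store wins
// or a larger value is already present. The function below is a hypothetical
// usage example and is not referenced anywhere in the package.
func maxDurationSketch() time.Duration {
	var m MaxDuration
	var wg sync.WaitGroup
	for _, d := range []time.Duration{
		5 * time.Millisecond,
		50 * time.Millisecond,
		20 * time.Millisecond,
	} {
		wg.Add(1)
		go func(d time.Duration) {
			defer wg.Done()
			m.Set(int64(d)) // only the largest duration survives
		}(d)
	}
	wg.Wait()
	return time.Duration(m.IntValue()) // 50ms regardless of goroutine order
}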