代码拉取完成,页面将自动刷新
同步操作将从 YoMo/yomo 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
package yomo
import (
"context"
"github.com/yomorun/yomo/core"
"github.com/yomorun/yomo/core/frame"
"github.com/yomorun/yomo/pkg/id"
)
// Source is responsible for sending data to yomo.
type Source interface {
// Close will close the connection to YoMo-Zipper.
Close() error
// Connect to YoMo-Zipper.
Connect() error
// Write the data to directed downstream.
Write(tag uint32, data []byte) error
// WriteWithTarget writes data to sfn instance with specified target.
WriteWithTarget(tag uint32, data []byte, target string) error
// SetErrorHandler set the error handler function when server error occurs
SetErrorHandler(fn func(err error))
}
// YoMo-Source
type yomoSource struct {
name string
zipperAddr string
client *core.Client
}
var _ Source = &yomoSource{}
// NewSource create a yomo-source
func NewSource(name, zipperAddr string, opts ...SourceOption) Source {
clientOpts := make([]core.ClientOption, len(opts))
for k, v := range opts {
clientOpts[k] = core.ClientOption(v)
}
client := core.NewClient(name, zipperAddr, core.ClientTypeSource, clientOpts...)
client.Logger = client.Logger.With(
"component", core.ClientTypeSource.String(),
"source_id", client.ClientID(),
"source_name", client.Name(),
"zipper_addr", zipperAddr,
)
return &yomoSource{
name: name,
zipperAddr: zipperAddr,
client: client,
}
}
// Close will close the connection to YoMo-Zipper.
func (s *yomoSource) Close() error {
if err := s.client.Close(); err != nil {
s.client.Logger.Error("failed to close the source", "err", err)
return err
}
s.client.Logger.Debug("the source is closed")
return nil
}
// Connect to YoMo-Zipper.
func (s *yomoSource) Connect() error {
return s.client.Connect(context.Background())
}
// Write writes data with specified tag.
func (s *yomoSource) Write(tag uint32, data []byte) error {
md, deferFunc := core.SourceMetadata(s.client.ClientID(), id.New(), s.name, s.client.TracerProvider(), s.client.Logger)
defer deferFunc()
mdBytes, err := md.Encode()
// metadata
if err != nil {
return err
}
f := &frame.DataFrame{
Tag: tag,
Metadata: mdBytes,
Payload: data,
}
s.client.Logger.Debug("source write", "tag", tag, "data", data)
return s.client.WriteFrame(f)
}
// WritePayload writes `yomo.Payload` with specified tag.
func (s *yomoSource) WriteWithTarget(tag uint32, data []byte, target string) error {
if data == nil {
return nil
}
md, deferFunc := core.SourceMetadata(s.client.ClientID(), id.New(), s.name, s.client.TracerProvider(), s.client.Logger)
defer deferFunc()
if target != "" {
core.SetMetadataTarget(md, target)
}
mdBytes, err := md.Encode()
if err != nil {
return err
}
f := &frame.DataFrame{
Tag: tag,
Metadata: mdBytes,
Payload: data,
}
s.client.Logger.Debug("source write with target", "tag", tag, "data", data, "target", target)
return s.client.WriteFrame(f)
}
// SetErrorHandler set the error handler function when server error occurs
func (s *yomoSource) SetErrorHandler(fn func(err error)) {
s.client.SetErrorHandler(fn)
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。