代码拉取完成,页面将自动刷新
package neffos
import (
"bytes"
"encoding/json"
"errors"
"strconv"
"strings"
"time"
)
// The Message is the structure which describes the incoming and outcoming data.
// Emitter's "body" argument is the `Message.Body` field.
// Emitter's return non-nil error is the `Message.Err` field.
// If native message sent then the `Message.Body` is filled with the body and
// when incoming native message then the `Message.Event` is the `OnNativeMessage`,
// native messages are allowed only when an empty namespace("") and its `OnNativeMessage` callback are present.
//
// The the raw data received/sent structured following this order:
// <wait()>;
// <namespace>;
// <room>;
// <event>;
// <isError(0-1)>;
// <isNoOp(0-1)>;
// <body||error_message>
//
// Internal `serializeMessage` and
// exported `DeserializeMessage` functions
// do the job on `Conn#Write`, `NSConn#Emit` and `Room#Emit` calls.
type Message struct {
wait string
// The Namespace that this message sent to/received from.
Namespace string
// The Room that this message sent to/received from.
Room string
// The Event that this message sent to/received from.
Event string
// The actual body of the incoming/outcoming data.
Body []byte
// The Err contains any message's error, if any.
// Note that server-side and client-side connections can return an error instead of a message from each event callbacks,
// except the clients's force Disconnect which its local event doesn't matter when disconnected manually.
Err error
// if true then `Err` is filled by the error message and
// the last segment of incoming/outcoming serialized message is the error message instead of the body.
isError bool
isNoOp bool
isInvalid bool
// the CONN ID, filled automatically if `Server#Broadcast` first parameter of sender connection's ID is not empty,
// not exposed to the subscribers (rest of the clients).
// This is the ID across neffos servers when scale.
from string
// When sent by the same connection of the current running server instance.
// This field is serialized/deserialized but it's clean on sending or receiving from a client
// and it's only used on StackExchange feature.
// It's serialized as the first parameter, instead of wait signal, if incoming starts with 0x.
FromExplicit string // the exact Conn's pointer in this server instance.
// Reports whether this message is coming from a stackexchange.
// This field is not exposed and it's not serialized at all, ~local-use only~.
//
// The "wait" field can determinate if this message is coming from a stackexchange using its second char,
// This value set based on "wait" on deserialization when coming from remote side.
// Only server-side can actually set it.
FromStackExchange bool
// To is the connection ID of the receiver, used only when `Server#Broadcast` is called, indeed when we only need to send a message to a single connection.
// The Namespace, Room are still respected at all.
//
// However, sending messages to a group of connections is done by the `Room` field for groups inside a namespace or just `Namespace` field as usual.
// This field is not filled on sending/receiving.
To string
// True when event came from local (i.e client if running client) on force disconnection,
// i.e OnNamespaceDisconnect and OnRoomLeave when closing a conn.
// This field is not filled on sending/receiving.
// Err does not matter and never sent to the other side.
IsForced bool
// True when asking the other side and fire the respond's event (which matches the sent for connect/disconnect/join/leave),
// i.e if a client (or server) onnection want to connect
// to a namespace or join to a room.
// Should be used rarely, state can be checked by `Conn#IsClient() bool`.
// This field is not filled on sending/receiving.
IsLocal bool
// True when user define it for writing, only its body is written as raw native websocket message, namespace, event and all other fields are empty.
// The receiver should accept it on the `OnNativeMessage` event.
// This field is not filled on sending/receiving.
IsNative bool
// Useful rarely internally on `Conn#Write` namespace and rooms checks, i.e `Conn#DisconnectAll` and `NSConn#RemoveAll`.
// If true then the writer's checks will not lock connectedNamespacesMutex or roomsMutex again. May be useful in the future, keep that solution.
locked bool
// if server or client should write using Binary message or if the incoming message was readen as binary.
SetBinary bool
}
func (m *Message) isConnect() bool {
return m.Event == OnNamespaceConnect
}
func (m *Message) isDisconnect() bool {
return m.Event == OnNamespaceDisconnect
}
func (m *Message) isRoomJoin() bool {
return m.Event == OnRoomJoin
}
func (m *Message) isRoomLeft() bool {
return m.Event == OnRoomLeft
}
// Serialize returns this message's transport format.
func (m Message) Serialize() []byte {
return serializeMessage(m)
}
type (
// MessageObjectMarshaler is an optional interface that "objects"
// can implement to customize their byte representation, see `Object` package-level function.
MessageObjectMarshaler interface {
Marshal() ([]byte, error)
}
// MessageObjectUnmarshaler is an optional interface that "objects"
// can implement to customize their structure, see `Message.Object` method.
MessageObjectUnmarshaler interface {
Unmarshal(body []byte) error
}
)
var (
// DefaultMarshaler is a global, package-level alternative for `MessageObjectMarshaler`.
// It's used when the `Marshal.v` parameter is not a `MessageObjectMarshaler`.
DefaultMarshaler = json.Marshal
// DefaultUnmarshaler is a global, package-level alternative for `MessageObjectMarshaler`.
// It's used when the `Message.Unmarshal.outPtr` parameter is not a `MessageObjectUnmarshaler`.
DefaultUnmarshaler = json.Unmarshal
)
// Marshal marshals the "v" value and returns a Message's Body.
// If the "v" value is `MessageObjectMarshaler` then it returns the result of its `Marshal` method,
// otherwise the DefaultMarshaler will be used instead.
// Errors are pushed to the result, use the object's Marshal method to catch those when necessary.
func Marshal(v interface{}) []byte {
if v == nil {
panic("nil assigment")
}
var (
body []byte
err error
)
if marshaler, ok := v.(MessageObjectMarshaler); ok {
body, err = marshaler.Marshal()
} else {
body, err = DefaultMarshaler(v)
}
if err != nil {
return []byte(err.Error())
}
return body
}
// Unmarshal unmarshals this Message's body to the "outPtr".
// The "outPtr" must be a pointer to a value that can customize its decoded value
// by implementing the `MessageObjectUnmarshaler`, otherwise the `DefaultUnmarshaler` will be used instead.
func (m *Message) Unmarshal(outPtr interface{}) error {
if outPtr == nil {
panic("nil assigment")
}
if unmarshaler, ok := outPtr.(MessageObjectUnmarshaler); ok {
return unmarshaler.Unmarshal(m.Body)
}
return DefaultUnmarshaler(m.Body, outPtr)
}
const (
waitIsConfirmationPrefix = '#'
waitComesFromClientPrefix = '$'
waitComesFromStackExchange = '!'
)
// IsWait reports whether this message waits for a response back.
func (m *Message) IsWait(isClientConn bool) bool {
if m.wait == "" {
return false
}
if m.wait[0] == waitIsConfirmationPrefix {
// true even if it's not client-client but it's a confirmation message.
return true
}
if m.wait[0] == waitComesFromClientPrefix {
return isClientConn
}
return true
}
// ClearWait clears the wait token, rarely used.
func (m *Message) ClearWait() bool {
if m.FromExplicit == "" && m.wait != "" {
m.wait = ""
return true
}
return false
}
func genWait(isClientConn bool) string {
now := time.Now().UnixNano()
wait := strconv.FormatInt(now, 10)
if isClientConn {
wait = string(waitComesFromClientPrefix) + wait
}
return wait
}
// func genWaitConfirmation(wait string) string {
// return string(waitIsConfirmationPrefix) + wait
// }
func genWaitStackExchange(wait string) string {
if len(wait) < 2 {
return ""
}
// This is the second special character.
// If found, it is removed on the deserialization
// and Message.FromStackExchange is set to true.
return string(wait[0]+waitComesFromStackExchange) + wait[1:]
}
var (
trueByte = []byte{'1'}
falseByte = []byte{'0'}
messageSeparatorString = ";"
messageSeparator = []byte(messageSeparatorString)
// we use this because has zero chance to be part of end-developer's Message.Namespace, Room, Event, To and Err fields,
// semicolon has higher probability to exists on those values. See `escape` and `unescape`.
messageFieldSeparatorReplacement = "@%!semicolon@%!"
)
// called on `serializeMessage` to all message's fields except the body (and error).
func escape(s string) string {
if len(s) == 0 {
return s
}
return strings.Replace(s, messageSeparatorString, messageFieldSeparatorReplacement, -1)
}
// called on `DeserializeMessage` to all message's fields except the body (and error).
func unescape(s string) string {
if len(s) == 0 {
return s
}
return strings.Replace(s, messageFieldSeparatorReplacement, messageSeparatorString, -1)
}
func serializeMessage(msg Message) (out []byte) {
if msg.IsNative && msg.wait == "" {
out = msg.Body
} else {
if msg.FromExplicit != "" {
if msg.wait != "" {
// this should never happen unless manual set of FromExplicit by end-developer which is forbidden by the higher level calls.
panic("msg.wait and msg.FromExplicit cannot work together")
}
msg.wait = msg.FromExplicit
}
out = serializeOutput(msg.wait, escape(msg.Namespace), escape(msg.Room), escape(msg.Event), msg.Body, msg.Err, msg.isNoOp)
}
return out
}
func serializeOutput(wait, namespace, room, event string,
body []byte,
err error,
isNoOp bool,
) []byte {
var (
isErrorByte = falseByte
isNoOpByte = falseByte
waitByte = []byte{}
)
if err != nil {
if b, ok := isReply(err); ok {
body = b
} else {
body = []byte(err.Error())
isErrorByte = trueByte
}
}
if isNoOp {
isNoOpByte = trueByte
}
if wait != "" {
waitByte = []byte(wait)
}
msg := bytes.Join([][]byte{ // this number of fields should match the deserializer's, see `validMessageSepCount`.
waitByte,
[]byte(namespace),
[]byte(room),
[]byte(event),
isErrorByte,
isNoOpByte,
body,
}, messageSeparator)
return msg
}
// DeserializeMessage accepts a serialized message []byte
// and returns a neffos Message.
// When allowNativeMessages only Body is filled and check about message format is skipped.
func DeserializeMessage(msgTyp MessageType, b []byte, allowNativeMessages, shouldHandleOnlyNativeMessages bool) Message {
wait, namespace, room, event, body, err, isNoOp, isInvalid := deserializeInput(b, allowNativeMessages, shouldHandleOnlyNativeMessages)
fromExplicit := ""
if isServerConnID(wait) {
fromExplicit = wait
wait = ""
}
fromStackExchange := len(wait) > 2 && wait[1] == waitComesFromStackExchange
if fromStackExchange {
// remove the second special char, we need to reform it,
// this wait token is compared to the waiter side as it's without the information about stackexchnage.
wait = string(wait[0]) + wait[2:]
}
return Message{
wait: wait,
Namespace: unescape(namespace),
Room: unescape(room),
Event: unescape(event),
Body: body,
Err: err,
isError: err != nil,
isNoOp: isNoOp,
isInvalid: isInvalid,
from: "",
FromExplicit: fromExplicit,
FromStackExchange: fromStackExchange,
To: "",
IsForced: false,
IsLocal: false,
IsNative: allowNativeMessages && event == OnNativeMessage,
locked: false,
SetBinary: msgTyp == BinaryMessage,
}
}
const validMessageSepCount = 7
var knownErrors = []error{ErrBadNamespace, ErrBadRoom, ErrWrite, ErrInvalidPayload}
// RegisterKnownError registers an error that it's "known" to both server and client sides.
// This simply adds an error to a list which, if its static text matches
// an incoming error text then its value is set to the `Message.Error` field on the events callbacks.
//
// For dynamic text error, there is a special case which if
// the error "err" contains
// a `ResolveError(errorText string) bool` method then,
// it is used to report whether this "err" is match to the incoming error text.
func RegisterKnownError(err error) {
for _, knownErr := range knownErrors {
if err == knownErr {
return
}
}
knownErrors = append(knownErrors, err)
}
func resolveError(errorText string) error {
for _, knownErr := range knownErrors {
if resolver, ok := knownErr.(interface {
ResolveError(errorText string) bool
}); ok {
if resolver.ResolveError(errorText) {
return knownErr
}
}
if knownErr.Error() == errorText {
return knownErr
}
}
return errors.New(errorText)
}
func deserializeInput(b []byte, allowNativeMessages, shouldHandleOnlyNativeMessages bool) ( // go-lint: ignore line
wait,
namespace,
room,
event string,
body []byte,
err error,
isNoOp bool,
isInvalid bool,
) {
if len(b) == 0 {
isInvalid = true
return
}
if shouldHandleOnlyNativeMessages {
event = OnNativeMessage
body = b
return
}
// Note: Go's SplitN returns the remainder in[6] but JavasSript's string.split behaves differently.
dts := bytes.SplitN(b, messageSeparator, validMessageSepCount)
if len(dts) != validMessageSepCount {
if !allowNativeMessages {
isInvalid = true
return
}
event = OnNativeMessage
body = b
return
}
wait = string(dts[0])
namespace = string(dts[1])
room = string(dts[2])
event = string(dts[3])
isError := bytes.Equal(dts[4], trueByte)
isNoOp = bytes.Equal(dts[5], trueByte)
if b := dts[6]; len(b) > 0 {
if isError {
errorText := string(b)
err = resolveError(errorText)
} else {
body = b // keep it like that.
}
}
return
}
func genEmptyReplyToWait(wait string) []byte {
return append([]byte(wait), bytes.Repeat(messageSeparator, validMessageSepCount-1)...)
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。