// [web page residue, not part of the program] Code pull complete; the page will refresh automatically.
package main
import (
"bytes"
"encoding/json"
"errors"
"github.com/tidwall/gjson"
"io"
"log"
"net/http"
"strings"
"sync"
"time"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
)
// Package-level websocket settings shared by the handlers in this file.
var (
	// errCode lists the close codes 1000 through 1015. It is passed to
	// websocket.IsCloseError to decide whether an error is a close error.
	errCode = []int{
		1000, 1001, 1002, 1003, 1004, 1005, 1006, 1007,
		1008, 1009, 1010, 1011, 1012, 1013, 1014, 1015,
	}

	// upgrader upgrades HTTP requests to websocket connections. CheckOrigin
	// performs no validation: it returns true for every request.
	upgrader = websocket.Upgrader{
		CheckOrigin: func(_ *http.Request) bool { return true },
	}
)
// Client is one registered websocket peer.
type Client struct {
	// Key identifies the client in WebsocketManager.ConnGroups.
	Key string
	// Conn is the underlying websocket connection.
	Conn *websocket.Conn
	// Lock guards IsActive.
	Lock sync.RWMutex
	// IsActive is false once SetDeadStaus has been called.
	IsActive bool
	// wsctx is the manager used to unregister the client; it may be nil.
	wsctx *WebsocketManager
}

// Read is the background reader for messages sent by the client. Only a single
// goroutine may read from Conn, so it must be started once per client.
//
// It logs text messages and returns when a read fails or when the client has
// been marked inactive. On a close error it unregisters the client through
// wsctx, if one is set.
func (c *Client) Read() {
	for {
		msgtype, msg, err := c.Conn.ReadMessage()
		if err != nil {
			// The client was already torn down elsewhere; nothing to report.
			if !c.GetActive() {
				return
			}
			log.Printf("read msg error,%v", err)
			// wsctx may be unset; guard it so a close error cannot cause a nil
			// dereference. The entry then stays registered until another
			// cleanup path (e.g. the periodic ping) removes it.
			if websocket.IsCloseError(err, errCode...) && c.wsctx != nil {
				c.wsctx.DeleteConnByKey(c.Key)
			}
			return
		}
		if msgtype == websocket.TextMessage {
			log.Printf("get client message [%s] from %s\n", string(msg), c.Conn.RemoteAddr())
		}
		if !c.GetActive() {
			return
		}
	}
}

// SetDeadStaus marks the client as inactive.
func (c *Client) SetDeadStaus() {
	c.Lock.Lock()
	defer c.Lock.Unlock()
	c.IsActive = false
}

// GetActive reports whether the client is still marked active.
func (c *Client) GetActive() bool {
	c.Lock.RLock()
	defer c.Lock.RUnlock()
	return c.IsActive
}
// syncMap is a mutex-guarded index from an IP to the ports of the addresses
// that were added for it.
type syncMap struct {
	Mx      sync.RWMutex
	PortMap map[string][]string
}

// splitHostPort splits addr at its last ':' into host and port. Square
// brackets around the host (as in "[::1]:8080") are stripped. ok is false when
// addr contains no ':'.
func splitHostPort(addr string) (host, port string, ok bool) {
	idx := strings.LastIndex(addr, ":")
	if idx < 0 {
		return "", "", false
	}
	host = strings.TrimSuffix(strings.TrimPrefix(addr[:idx], "["), "]")
	return host, addr[idx+1:], true
}

// RemoveAddr removes the port of addr ("host:port") from the entry of its
// host. It returns true when at least one port was removed, and false when
// addr is malformed or was not present. A host with no remaining ports is
// dropped from the map.
func (m *syncMap) RemoveAddr(addr string) bool {
	ip, port, ok := splitHostPort(addr)
	if !ok {
		return false
	}
	m.Mx.Lock()
	defer m.Mx.Unlock()
	ports := m.PortMap[ip]
	// Filter in place; FindAddr hands out copies, so no reader shares this array.
	kept := ports[:0]
	for _, p := range ports {
		if p != port {
			kept = append(kept, p)
		}
	}
	if len(kept) == len(ports) {
		return false
	}
	if len(kept) == 0 {
		delete(m.PortMap, ip)
	} else {
		m.PortMap[ip] = kept
	}
	return true
}

// AddAddr records the port of addr ("host:port") under its host. Malformed
// addresses are ignored. The map is allocated on first use, so the zero value
// of syncMap is ready to use.
func (m *syncMap) AddAddr(addr string) {
	ip, port, ok := splitHostPort(addr)
	if !ok {
		return
	}
	m.Mx.Lock()
	defer m.Mx.Unlock()
	if m.PortMap == nil {
		m.PortMap = make(map[string][]string)
	}
	m.PortMap[ip] = append(m.PortMap[ip], port)
}

// FindAddr returns a copy of the ports recorded for the host addr, or nil when
// there are none. Returning a copy keeps callers from observing later in-place
// modifications made under the lock.
func (m *syncMap) FindAddr(addr string) []string {
	m.Mx.RLock()
	defer m.Mx.RUnlock()
	ports := m.PortMap[addr]
	if len(ports) == 0 {
		return nil
	}
	return append([]string(nil), ports...)
}
// targetIP carries the IP that a push should be restricted to. The zero value
// means no specific target.
type targetIP struct {
	Ip string
}

// IsExist reports whether a target IP has been set, i.e. Ip is non-empty.
func (t targetIP) IsExist() bool {
	return len(t.Ip) > 0
}
// WebsocketManager holds the registered websocket clients together with the
// helpers used to push messages to them.
type WebsocketManager struct {
ConnGroups sync.Map // client key (string) -> *Client; AddClient stores *Client values
RespPool sync.Pool // pool of reusable buffers, kept to reduce GC pressure
portGroup syncMap // index fed with each client key by AddClient and queried by SendTo
}
// CleanDeadConn pings every active client every 10 seconds and unregisters the
// ones whose ping cannot be written. It loops for as long as the ticker runs
// and is meant to be started in its own goroutine.
func (w *WebsocketManager) CleanDeadConn() {
	pingTicker := time.NewTicker(time.Second * 10)
	defer pingTicker.Stop()
	for t := range pingTicker.C {
		w.ConnGroups.Range(func(key, value any) bool {
			client, ok := value.(*Client)
			if !ok || !client.GetActive() {
				return true
			}
			conn := client.Conn
			// WriteControl is used instead of WriteMessage because it may be
			// called concurrently with the data writes issued by SendMessage
			// and SendTo; a connection supports only one concurrent writer.
			deadline := time.Now().Add(time.Second * 20)
			if err := conn.WriteControl(websocket.PingMessage, nil, deadline); err != nil {
				log.Println("send ping err:", err, t)
				w.DeleteConnByKey(client.Key)
				log.Printf("clean up conn ip %s from server", conn.RemoteAddr())
			}
			return true
		})
	}
}
// SetWSTarget stores the requester's client IP in the gin context under the
// "targetIPForWS" key, which WSMessage reads to decide where to push.
func (w *WebsocketManager) SetWSTarget(c *gin.Context) {
	target := targetIP{Ip: c.ClientIP()}
	c.Set("targetIPForWS", target)
}
// AddClient registers client under key in ConnGroups and then passes the same
// key to portGroup.AddAddr.
// NOTE(review): key is presumably an "ip:port" remote address, as passed by
// Register — confirm for other callers.
func (w *WebsocketManager) AddClient(key string, client *Client) {
w.ConnGroups.Store(key, client)
w.portGroup.AddAddr(key)
}
// SendMessage broadcasts msg as a text message to every active client. Each
// client is written to from its own goroutine with a 3 second write deadline,
// and the call returns once all writes have finished. A client whose write
// fails with a close error is unregistered if it is still marked active.
func (w *WebsocketManager) SendMessage(msg string) {
	payload := []byte(msg)
	var pending sync.WaitGroup

	push := func(key string, client *Client) {
		defer pending.Done()
		conn := client.Conn
		conn.SetWriteDeadline(time.Now().Add(3 * time.Second))
		err := conn.WriteMessage(websocket.TextMessage, payload)
		if err == nil {
			return
		}
		log.Printf("ERROR SEND MESSAGE : %v\n", err)
		// Only close errors on a still-active client trigger removal.
		if !websocket.IsCloseError(err, errCode...) || !client.GetActive() {
			return
		}
		w.DeleteConnByKey(key)
		log.Println("写入错误删除conn", conn.RemoteAddr())
	}

	w.ConnGroups.Range(func(key, value any) bool {
		client := value.(*Client)
		if client.GetActive() {
			pending.Add(1)
			go push(key.(string), client)
		}
		return true
	})
	pending.Wait()
}
// SendTo pushes msg as a text message to every client registered for the IP
// given in key.
//
// portGroup stores only the ports for an IP, while ConnGroups is keyed by the
// full "ip:port" address, so the address is rebuilt before each lookup. All
// connections of the IP are attempted even if one of them fails.
//
// It returns nil when every write succeeded, otherwise the first error
// encountered; "peer client has closed" is returned when no usable connection
// exists for a port or for the IP as a whole.
func (w *WebsocketManager) SendTo(key, msg string) error {
	errClosed := errors.New("peer client has closed")

	ports := w.portGroup.FindAddr(key)
	if len(ports) == 0 {
		return errClosed
	}

	// An IPv6 host must be bracketed to match the "[host]:port" address form.
	host := key
	if strings.Contains(host, ":") && !strings.HasPrefix(host, "[") {
		host = "[" + host + "]"
	}

	data := []byte(msg)
	var firstErr error
	record := func(err error) {
		if firstErr == nil {
			firstErr = err
		}
	}

	for _, port := range ports {
		addr := host + ":" + port
		value, ok := w.ConnGroups.Load(addr)
		if !ok {
			record(errClosed)
			continue
		}
		client, ok := value.(*Client)
		if !ok || client == nil || !client.GetActive() {
			record(errClosed)
			continue
		}
		conn := client.Conn
		conn.SetWriteDeadline(time.Now().Add(3 * time.Second))
		err := conn.WriteMessage(websocket.TextMessage, data)
		if err == nil {
			continue
		}
		log.Printf("ERROR SEND MESSAGE : %v\n", err)
		if websocket.IsCloseError(err, errCode...) {
			if !client.GetActive() {
				record(errClosed)
				continue
			}
			w.DeleteConnByKey(addr)
			log.Println("写入错误删除conn", conn.RemoteAddr())
		}
		record(err)
	}
	return firstErr
}
// DeleteConnByKey unregisters the client stored under key: it removes the
// entry from ConnGroups, marks the client inactive, removes it from portGroup
// and closes its connection. It is called when a read or write discovers that
// the peer has closed the connection. Unknown keys are ignored.
//
// The entry is removed with LoadAndDelete so that, when several goroutines
// report the same dead connection, only one of them performs the teardown.
func (w *WebsocketManager) DeleteConnByKey(key string) {
	value, loaded := w.ConnGroups.LoadAndDelete(key)
	if !loaded {
		return
	}
	client, ok := value.(*Client)
	if !ok || client == nil {
		return
	}
	client.SetDeadStaus()
	w.portGroup.RemoveAddr(client.Key)
	if client.Conn != nil {
		// Best effort: the peer is already gone, so a close error is not actionable.
		_ = client.Conn.Close()
	}
}
// responseWriter wraps a gin.ResponseWriter and keeps a copy of everything
// written through it. b receives the response body and req holds a copy of the
// request body. Methods use value receivers; copies share the same buffers
// because b and req are pointers.
type responseWriter struct {
	gin.ResponseWriter
	b   *bytes.Buffer
	req *bytes.Buffer
}

// newWriter creates a responseWriter with empty buffers. It has the signature
// required by sync.Pool.New.
func newWriter() any {
	return responseWriter{b: new(bytes.Buffer), req: new(bytes.Buffer)}
}

// Write overrides the embedded Write: it copies b into the capture buffer so
// the body can be read back later, then forwards to the wrapped writer.
func (w responseWriter) Write(b []byte) (int, error) {
	w.b.Write(b)
	return w.ResponseWriter.Write(b)
}

// WriteString mirrors Write for string payloads. Without this override, bodies
// written via WriteString would reach the wrapped writer directly and never
// show up in the capture buffer.
func (w responseWriter) WriteString(s string) (int, error) {
	w.b.WriteString(s)
	return w.ResponseWriter.WriteString(s)
}

// CopyBody appends b to the request-body buffer.
func (w responseWriter) CopyBody(b []byte) (int, error) {
	return w.req.Write(b)
}

// Reset empties both buffers so the writer can be reused.
func (w responseWriter) Reset() {
	w.b.Reset()
	w.req.Reset()
}
// WSM is the package-wide websocket manager, created in init.
var WSM *WebsocketManager

// init builds WSM and starts its dead-connection cleaner in the background.
// The port index map is allocated here because writing to a nil map panics,
// which would otherwise happen on the first client registration.
func init() {
	WSM = &WebsocketManager{
		RespPool:  sync.Pool{New: newWriter},
		portGroup: syncMap{PortMap: make(map[string][]string)},
	}
	go WSM.CleanDeadConn()
}
// checkRepStatus reports the boolean value of the top-level "success" field of
// the given JSON document.
func checkRepStatus(json string) bool {
	return gjson.Get(json, "success").Bool()
}
// msg is the JSON payload that WSMessage pushes to websocket clients.
type msg struct {
URL string `json:"url"` // request host followed by the request URL
Method string `json:"method"` // HTTP method of the request
Body string `json:"body"` // raw request body
Resp string `json:"resp"` // captured response body
}
// Register upgrades the request to a websocket connection and registers it
// with WSM under the connection's remote address. It is a dedicated route so
// that middleware does not intercept the registration and hijack the raw HTTP
// connection.
//
// Non-websocket requests are answered with 400. On success a reader goroutine
// is started for the new client.
func Register(c *gin.Context) {
	if !c.IsWebsocket() {
		c.JSON(http.StatusBadRequest, gin.H{"success": false, "result": "this url used for websocket"})
		return
	}
	log.Printf("client ip %s %s\n", c.ClientIP(), c.RemoteIP())

	conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
	if err != nil || conn == nil {
		// conn is nil when the upgrade fails, so the peer address is taken
		// from the request instead of the connection.
		log.Printf("websocket register failed because %v,ip is %s", err, c.Request.RemoteAddr)
		c.Abort()
		return
	}

	addr := conn.RemoteAddr().String()
	conn.SetCloseHandler(func(code int, text string) error {
		log.Printf("ip %s", addr)
		WSM.DeleteConnByKey(addr)
		return nil
	})

	// wsctx must be set: Client.Read uses it to unregister the client when the
	// peer closes the connection.
	client := &Client{Conn: conn, IsActive: true, Key: addr, wsctx: WSM}
	WSM.AddClient(addr, client)
	go client.Read()
	log.Printf("conn [%s] 链接成功\n", addr)
}
// WSMessage is a gin middleware that captures the request and response bodies
// of the wrapped handlers and, for successful responses, pushes them to
// websocket clients.
//
// A push happens only when the status is 200 and the response JSON has a true
// "success" field. If a handler stored a non-empty targetIP under
// "targetIPForWS" the message goes to that IP only; otherwise it is broadcast.
func WSMessage(c *gin.Context) {
	wr := WSM.RespPool.Get().(responseWriter)
	original := c.Writer
	wr.ResponseWriter = original
	c.Writer = wr
	// Restore the real writer before the pooled one is recycled, so nothing
	// running after this middleware (including panic recovery) writes through
	// buffers that may already belong to another request.
	defer func() {
		c.Writer = original
		wr.Reset()
		wr.ResponseWriter = nil
		WSM.RespPool.Put(wr)
	}()

	// Read the request body and hand an equivalent reader back to the handlers.
	body, err := c.GetRawData()
	if err != nil {
		log.Printf("read request body failed: %v", err)
	}
	wr.CopyBody(body)
	c.Request.Body = io.NopCloser(wr.req)
	c.Set("targetIPForWS", targetIP{})

	c.Next()

	resp := wr.b.String()
	log.Printf("response [%s] to %s", resp, c.RemoteIP())
	// Only successful responses usually represent a change worth pushing.
	if c.Writer.Status() != http.StatusOK || !checkRepStatus(resp) {
		return
	}

	out, err := json.Marshal(msg{
		URL:    c.Request.Host + c.Request.URL.String(),
		Method: c.Request.Method,
		Body:   string(body),
		Resp:   resp,
	})
	if err != nil {
		log.Printf("encode push message failed: %v", err)
		return
	}

	// A missing or wrongly typed value leaves target empty, i.e. broadcast.
	var target targetIP
	if tip, ok := c.Get("targetIPForWS"); ok {
		target, _ = tip.(targetIP)
	}
	if !target.IsExist() {
		WSM.SendMessage(string(out))
		return
	}
	if err := WSM.SendTo(target.Ip, string(out)); err != nil {
		log.Printf("push to %s failed: %v", target.Ip, err)
	}
}
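// [web page residue, not part of the program] This section may contain content unsuitable for display, so the page does not show it. You can review and modify it using the relevant editing features.
// [web page residue, not part of the program] If you confirm the content contains no inappropriate language, pure advertising, violence, vulgar or pornographic material, infringement, piracy, false or worthless content, or anything violating applicable laws and regulations, you can submit an appeal and it will be handled as soon as possible.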