1 Star 0 Fork 0

irishcoffeeguo/RTSPtoWeb

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
serverRTSP.go 13.91 KB
一键复制 编辑 原始数据 按行查看 历史
deepch 提交于 2022-01-05 19:26 . add video example token
package main
import (
"bytes"
"errors"
"fmt"
"net"
"net/url"
"os"
"strconv"
"strings"
"time"
"github.com/sirupsen/logrus"
)
var (
Version = "RTSP/1.0"
UserAgent = "Lavf58.29.100"
Session = "000a959d6816"
)
var (
OPTIONS = "OPTIONS"
DESCRIBE = "DESCRIBE"
SETUP = "SETUP"
PLAY = "PLAY"
TEARDOWN = "TEARDOWN"
)
// RTSP response status codes
const (
StatusContinue = 100
StatusOK = 200
StatusCreated = 201
StatusLowOnStorageSpace = 250
StatusMultipleChoices = 300
StatusMovedPermanently = 301
StatusMovedTemporarily = 302
StatusSeeOther = 303
StatusNotModified = 304
StatusUseProxy = 305
StatusBadRequest = 400
StatusUnauthorized = 401
StatusPaymentRequired = 402
StatusForbidden = 403
StatusNotFound = 404
StatusMethodNotAllowed = 405
StatusNotAcceptable = 406
StatusProxyAuthenticationRequired = 407
StatusRequestTimeout = 408
StatusGone = 410
StatusLengthRequired = 411
StatusPreconditionFailed = 412
StatusRequestEntityTooLarge = 413
StatusRequestURITooLong = 414
StatusUnsupportedMediaType = 415
StatusInvalidparameter = 451
StatusIllegalConferenceIdentifier = 452
StatusNotEnoughBandwidth = 453
StatusSessionNotFound = 454
StatusMethodNotValidInThisState = 455
StatusHeaderFieldNotValid = 456
StatusInvalidRange = 457
StatusParameterIsReadOnly = 458
StatusAggregateOperationNotAllowed = 459
StatusOnlyAggregateOperationAllowed = 460
StatusUnsupportedTransport = 461
StatusDestinationUnreachable = 462
StatusInternalServerError = 500
StatusNotImplemented = 501
StatusBadGateway = 502
StatusServiceUnavailable = 503
StatusGatewayTimeout = 504
StatusRTSPVersionNotSupported = 505
StatusOptionNotsupport = 551
)
func StatusText(code int) string {
return statusText[code]
}
var statusText = map[int]string{
StatusContinue: "Continue",
StatusOK: "OK",
StatusCreated: "Created",
StatusLowOnStorageSpace: "Low on Storage Space",
StatusMultipleChoices: "Multiple Choices",
StatusMovedPermanently: "Moved Permanently",
StatusMovedTemporarily: "Moved Temporarily",
StatusSeeOther: "See Other",
StatusNotModified: "Not Modified",
StatusUseProxy: "Use Proxy",
StatusBadRequest: "Bad Request",
StatusUnauthorized: "Unauthorized",
StatusPaymentRequired: "Payment Required",
StatusForbidden: "Forbidden",
StatusNotFound: "Not Found",
StatusMethodNotAllowed: "Method Not Allowed",
StatusNotAcceptable: "Not Acceptable",
StatusProxyAuthenticationRequired: "Proxy Authentication Required",
StatusRequestTimeout: "Request Time-out",
StatusGone: "Gone",
StatusLengthRequired: "Length Required",
StatusPreconditionFailed: "Precondition Failed",
StatusRequestEntityTooLarge: "Request Entity Too Large",
StatusRequestURITooLong: "Request-URI Too Large",
StatusUnsupportedMediaType: "Unsupported Media Type",
StatusInvalidparameter: "Parameter Not Understood",
StatusIllegalConferenceIdentifier: "Conference Not Found",
StatusNotEnoughBandwidth: "Not Enough Bandwidth",
StatusSessionNotFound: "Session Not Found",
StatusMethodNotValidInThisState: "Method Not Valid in This State",
StatusHeaderFieldNotValid: "Header Field Not Valid for Resource",
StatusInvalidRange: "Invalid Range",
StatusParameterIsReadOnly: "Parameter Is Read-Only",
StatusAggregateOperationNotAllowed: "Aggregate operation not allowed",
StatusOnlyAggregateOperationAllowed: "Only aggregate operation allowed",
StatusUnsupportedTransport: "Unsupported transport",
StatusDestinationUnreachable: "Destination unreachable",
StatusInternalServerError: "Internal Server Error",
StatusNotImplemented: "Not Implemented",
StatusBadGateway: "Bad Gateway",
StatusServiceUnavailable: "Service Unavailable",
StatusGatewayTimeout: "Gateway Time-out",
StatusRTSPVersionNotSupported: "RTSP Version not supported",
StatusOptionNotsupport: "Option not supported",
}
//RTSPServer func
func RTSPServer() {
log.WithFields(logrus.Fields{
"module": "rtsp_server",
"func": "RTSPServer",
"call": "Start",
}).Infoln("Server RTSP start")
l, err := net.Listen("tcp", Storage.ServerRTSPPort())
if err != nil {
log.WithFields(logrus.Fields{
"module": "rtsp_server",
"func": "RTSPServer",
"call": "Listen",
}).Errorln(err)
os.Exit(1)
}
defer func() {
err := l.Close()
if err != nil {
log.WithFields(logrus.Fields{
"module": "rtsp_server",
"func": "RTSPServer",
"call": "Close",
}).Errorln(err)
}
}()
for {
conn, err := l.Accept()
if err != nil {
log.WithFields(logrus.Fields{
"module": "rtsp_server",
"func": "RTSPServer",
"call": "Accept",
}).Errorln(err)
os.Exit(1)
}
go RTSPServerClientHandle(conn)
}
}
//RTSPServerClientHandle func
func RTSPServerClientHandle(conn net.Conn) {
buf := make([]byte, 4096)
token, uuid, channel, in, cSEQ := "", "", "0", 0, 0
var playStarted bool
defer func() {
err := conn.Close()
if err != nil {
log.WithFields(logrus.Fields{
"module": "rtsp_server",
"stream": uuid,
"channel": channel,
"func": "handleRTSPServerRequest",
"call": "Close",
}).Errorln(err.Error())
}
}()
err := conn.SetDeadline(time.Now().Add(10 * time.Second))
if err != nil {
log.WithFields(logrus.Fields{
"module": "rtsp_server",
"stream": uuid,
"channel": channel,
"func": "handleRTSPServerRequest",
"call": "SetDeadline",
}).Errorln(err.Error())
return
}
for {
n, err := conn.Read(buf)
if err != nil {
log.WithFields(logrus.Fields{
"module": "rtsp_server",
"stream": uuid,
"channel": channel,
"func": "handleRTSPServerRequest",
"call": "Read",
}).Errorln(err.Error())
return
}
cSEQ = parsecSEQ(buf[:n])
stage, err := parseStage(buf[:n])
if err != nil {
log.WithFields(logrus.Fields{
"module": "rtsp_server",
"stream": uuid,
"channel": channel,
"func": "handleRTSPServerRequest",
"call": "parseStage",
}).Errorln(err.Error())
}
err = conn.SetDeadline(time.Now().Add(60 * time.Second))
log.WithFields(logrus.Fields{
"module": "rtsp_server",
"stream": uuid,
"channel": channel,
"func": "handleRTSPServerRequest",
"call": "Request",
}).Debugln(string(buf[:n]))
if err != nil {
log.WithFields(logrus.Fields{
"module": "rtsp_server",
"stream": uuid,
"channel": channel,
"func": "handleRTSPServerRequest",
"call": "SetDeadline",
}).Errorln(err.Error())
return
}
switch stage {
case OPTIONS:
if playStarted {
err = RTSPServerClientResponse(uuid, channel, conn, 200, map[string]string{"CSeq": strconv.Itoa(cSEQ), "Public": "DESCRIBE, SETUP, TEARDOWN, PLAY"})
if err != nil {
return
}
continue
}
uuid, channel, token, err = parseStreamChannel(buf[:n])
if err != nil {
log.WithFields(logrus.Fields{
"module": "rtsp_server",
"stream": uuid,
"channel": channel,
"func": "handleRTSPServerRequest",
"call": "parseStreamChannel",
}).Errorln(err.Error())
return
}
if !Storage.StreamChannelExist(uuid, channel) {
log.WithFields(logrus.Fields{
"module": "rtsp_server",
"stream": uuid,
"channel": channel,
"func": "handleRTSPServerRequest",
"call": "StreamChannelExist",
}).Errorln(ErrorStreamNotFound.Error())
err = RTSPServerClientResponse(uuid, channel, conn, 404, map[string]string{"CSeq": strconv.Itoa(cSEQ)})
if err != nil {
return
}
return
}
if !RemoteAuthorization("RTSP", uuid, channel, token, conn.RemoteAddr().String()) {
log.WithFields(logrus.Fields{
"module": "rtsp_server",
"stream": uuid,
"channel": channel,
"func": "handleRTSPServerRequest",
"call": "StreamChannelExist",
}).Errorln(ErrorStreamNotFound.Error())
err = RTSPServerClientResponse(uuid, channel, conn, 401, map[string]string{"CSeq": strconv.Itoa(cSEQ)})
if err != nil {
return
}
return
}
Storage.StreamChannelRun(uuid, channel)
err = RTSPServerClientResponse(uuid, channel, conn, 200, map[string]string{"CSeq": strconv.Itoa(cSEQ), "Public": "DESCRIBE, SETUP, TEARDOWN, PLAY"})
if err != nil {
return
}
case SETUP:
if !strings.Contains(string(buf[:n]), "interleaved") {
err = RTSPServerClientResponse(uuid, channel, conn, 461, map[string]string{"CSeq": strconv.Itoa(cSEQ)})
if err != nil {
return
}
continue
}
err = RTSPServerClientResponse(uuid, channel, conn, 200, map[string]string{"CSeq": strconv.Itoa(cSEQ), "User-Agent:": UserAgent, "Session": Session, "Transport": "RTP/AVP/TCP;unicast;interleaved=" + strconv.Itoa(in) + "-" + strconv.Itoa(in+1)})
if err != nil {
return
}
in = in + 2
case DESCRIBE:
sdp, err := Storage.StreamChannelSDP(uuid, channel)
if err != nil {
log.WithFields(logrus.Fields{
"module": "rtsp_server",
"stream": uuid,
"channel": channel,
"func": "handleRTSPServerRequest",
"call": "StreamSDP",
}).Errorln(err.Error())
return
}
err = RTSPServerClientResponse(uuid, channel, conn, 200, map[string]string{"CSeq": strconv.Itoa(cSEQ), "User-Agent:": UserAgent, "Session": Session, "Content-Type": "application/sdp\r\nContent-Length: " + strconv.Itoa(len(sdp)), "sdp": string(sdp)})
if err != nil {
return
}
case PLAY:
err = RTSPServerClientResponse(uuid, channel, conn, 200, map[string]string{"CSeq": strconv.Itoa(cSEQ), "User-Agent:": UserAgent, "Session": Session})
if err != nil {
return
}
playStarted = true
go RTSPServerClientPlay(uuid, channel, conn)
case TEARDOWN:
err = RTSPServerClientResponse(uuid, channel, conn, 200, map[string]string{"CSeq": strconv.Itoa(cSEQ), "User-Agent:": UserAgent, "Session": Session})
if err != nil {
return
}
return
default:
log.WithFields(logrus.Fields{
"module": "rtsp_server",
"stream": uuid,
"channel": channel,
"func": "handleRTSPServerRequest",
"call": "Stage",
}).Debugln("stage bad", stage)
}
}
}
//handleRTSPServerPlay func
func RTSPServerClientPlay(uuid string, channel string, conn net.Conn) {
cid, _, ch, err := Storage.ClientAdd(uuid, channel, RTSP)
if err != nil {
log.WithFields(logrus.Fields{
"module": "rtsp_server",
"stream": uuid,
"channel": channel,
"func": "handleRTSPServerRequest",
"call": "ClientAdd",
}).Errorln(err.Error())
return
}
defer func() {
Storage.ClientDelete(uuid, cid, channel)
log.WithFields(logrus.Fields{
"module": "rtsp_server",
"stream": uuid,
"channel": channel,
"func": "handleRTSPServerRequest",
"call": "ClientDelete",
}).Infoln("Client offline")
err := conn.Close()
if err != nil {
log.WithFields(logrus.Fields{
"module": "rtsp_server",
"stream": uuid,
"channel": channel,
"func": "handleRTSPServerRequest",
"call": "Close",
}).Errorln(err.Error())
}
}()
noVideo := time.NewTimer(10 * time.Second)
for {
select {
case <-noVideo.C:
return
case pck := <-ch:
noVideo.Reset(10 * time.Second)
_, err := conn.Write(*pck)
if err != nil {
log.WithFields(logrus.Fields{
"module": "rtsp_server",
"stream": uuid,
"channel": channel,
"func": "handleRTSPServerRequest",
"call": "Write",
}).Errorln(err.Error())
return
}
}
}
}
//handleRTSPServerPlay func
func RTSPServerClientResponse(uuid string, channel string, conn net.Conn, status int, headers map[string]string) error {
var sdp string
builder := bytes.Buffer{}
builder.WriteString(fmt.Sprintf(Version+" %d %s\r\n", status, StatusText(status)))
for k, v := range headers {
if k == "sdp" {
sdp = v
continue
}
builder.WriteString(fmt.Sprintf("%s: %s\r\n", k, v))
}
builder.WriteString(fmt.Sprintf("\r\n"))
builder.WriteString(sdp)
log.WithFields(logrus.Fields{
"module": "rtsp_server",
"stream": uuid,
"channel": channel,
"func": "RTSPServerClientResponse",
"call": "Response",
}).Debugln(builder.String())
if _, err := conn.Write(builder.Bytes()); err != nil {
log.WithFields(logrus.Fields{
"module": "rtsp_server",
"stream": uuid,
"channel": channel,
"func": "RTSPServerClientResponse",
"call": "Write",
}).Errorln(err.Error())
return err
}
return nil
}
//parsecSEQ func
func parsecSEQ(buf []byte) int {
return stringToInt(stringInBetween(string(buf), "CSeq: ", "\r\n"))
}
//parseStage func
func parseStage(buf []byte) (string, error) {
st := strings.Split(string(buf), " ")
if len(st) > 0 {
return st[0], nil
}
return "", errors.New("parse stage error " + string(buf))
}
//parseStreamChannel func
func parseStreamChannel(buf []byte) (string, string, string, error) {
var token string
uri := stringInBetween(string(buf), " ", " ")
u, err := url.Parse(uri)
if err == nil {
token = u.Query().Get("token")
uri = u.Path
}
st := strings.Split(uri, "/")
if len(st) >= 3 {
return st[1], st[2], token, nil
}
return "", "0", token, errors.New("parse stream error " + string(buf))
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/irishcoffeeguo/RTSPtoWeb.git
git@gitee.com:irishcoffeeguo/RTSPtoWeb.git
irishcoffeeguo
RTSPtoWeb
RTSPtoWeb
master

搜索帮助