1 Star 0 Fork 1

星旋/LinuxMediaCapture

forked from icedbeer/LinuxMediaCapture 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
credundance.cpp 10.04 KB
一键复制 编辑 原始数据 按行查看 历史
icedbeer 提交于 2016-11-30 23:42 . commit source code
#include "credundance.h"
#include <pthread.h>
typedef struct _COMMAND
{
short cmdId;
short cmdLen;
}COMMAND, *PCOMMAND;
typedef struct _RECVCOMMAND
{
short cmdId;
short cmdLen;
short err;
}RECVCOMMAND, *PRECVCOMMAND;
typedef struct _COMMANDIPPORT
{
char encoderIp[128];
char muticastIp[128];
short port;
// CDebugLog Log;
}COMMANDIPPORT, *PCOMMANDIPPORT;
CRedundance::CRedundance(CDeviceCtrl *parg)
: pParent(parg)
{
if(NULL == pParent)
{
return;
}
flag = false;
pingCount = pParent->pingTimes;
pLog = pParent->GetDebugLog();
if(pParent->isMaster)
{
pLog->DebugLog("I am master, i will set encoder and decoder.");
StartUdpSend();
}else
{
pLog->DebugLog("I am not master, i will set encoder and decoder.");
InitUdpRecv();
startTimer(pParent->pingInterval);
}
SetEncoderDecoder(pParent->masterEncoderIp, pParent->masterEncoderPort, pParent->masterMuticastIp);
SetEncoderDecoder(pParent->backupEncoderIp, pParent->backupEncoderPort, pParent->backupMuticastIp);
}
CRedundance::~CRedundance()
{
}
void CRedundance::timerEvent(QTimerEvent *)
{
if(INVALID_SOCKET == m_RecvSocket)
return;
struct sockaddr_in tSockAddrin;
int s32SockAddrinLength = 0, s32Read = 0;
struct timeval tTimeVal;
fd_set tRdSet;
tTimeVal.tv_sec = 0;
tTimeVal.tv_usec = pParent->pingInterval*1000;
FD_ZERO(&tRdSet);
FD_SET(m_RecvSocket, &tRdSet);
s32Read = select(FD_SETSIZE, &tRdSet, NULL, NULL, &tTimeVal);
if(s32Read <= 0)
{
if(pingCount > 0)
pingCount--;
} else
{
s32SockAddrinLength = sizeof(tSockAddrin);
s32Read = 4;
s32Read = recvfrom(m_RecvSocket, recvBuf,
s32Read, 0, (struct sockaddr *)&tSockAddrin,
(socklen_t *)&s32SockAddrinLength);
if(s32Read > 0 && (0 == strcmp(recvBuf, "MIT")))
{
if(pingCount <= 0 )
{
flag = false;
pingCount = pParent->pingTimes;
SetEncoderDecoder(pParent->masterEncoderIp, pParent->masterEncoderPort, pParent->masterMuticastIp);
SetEncoderDecoder(pParent->backupEncoderIp, pParent->backupEncoderPort, pParent->backupMuticastIp);
}
}else
{
if(pingCount > 0)
pingCount--;
}
}
if(pingCount<=0 && !pParent->isMaster && !flag)
{
flag = true;
SetEncoderDecoder(pParent->masterEncoderIp, pParent->masterEncoderPort, pParent->backupMuticastIp);
SetEncoderDecoder(pParent->backupEncoderIp, pParent->backupEncoderPort, pParent->masterMuticastIp);
}
}
void CRedundance::InitUdpRecv()
{
int s32Value = 0;
struct sockaddr_in tSockAddrin;
do
{
m_RecvSocket = INVALID_SOCKET;
m_RecvSocket = socket(AF_INET, SOCK_DGRAM, 0);
if(INVALID_SOCKET == m_RecvSocket)
{
break;
}
s32Value = 4 * 1024 * 1024;
setsockopt(m_RecvSocket, SOL_SOCKET, SO_SNDBUF, (char *)&s32Value, sizeof(s32Value));
s32Value = 4 * 1024 * 1024;
setsockopt(m_RecvSocket, SOL_SOCKET, SO_RCVBUF, (char *)&s32Value, sizeof(s32Value));
s32Value = 1;
setsockopt(m_RecvSocket, SOL_SOCKET, SO_REUSEADDR, (char *)&s32Value, sizeof(s32Value));
fcntl(m_RecvSocket, F_SETFL, O_NONBLOCK);
memset(&tSockAddrin, 0, sizeof(tSockAddrin));
tSockAddrin.sin_family = AF_INET;
tSockAddrin.sin_port = htons(pParent->pingPort);
tSockAddrin.sin_addr.s_addr = /*"192.168.1.87"*/INADDR_ANY;
if(bind(m_RecvSocket, (struct sockaddr *)&tSockAddrin, sizeof(tSockAddrin)) != 0)
{
perror("bind");
break;
}
return;
}while(0);
if(INVALID_SOCKET != m_RecvSocket)
{
close(m_RecvSocket);
m_RecvSocket = (~0);
}
}
void *ThreadMasterSend(void *pArg)
{
CDeviceCtrl *pDevice = (CDeviceCtrl *)pArg;
if(NULL == pDevice)
{
return NULL;
}
int s;
struct sockaddr_in mcast_addr, local_addr;
// dest pc socket set
memset(&mcast_addr, 0, sizeof(mcast_addr));
mcast_addr.sin_family = AF_INET;
mcast_addr.sin_addr.s_addr = inet_addr(pDevice->backupIp.toLocal8Bit().data());
mcast_addr.sin_port = htons(pDevice->pingPort);
//source pc socket set
s = socket(AF_INET, SOCK_DGRAM, 0);
if (s == -1)
{
pDevice->GetDebugLog()->DebugLog("ThreadMasterSend() socket failed:%d", errno);
return NULL;
}
int s32Value = 100;
if(0 > setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char *)&s32Value, sizeof(s32Value)))
{
pDevice->GetDebugLog()->DebugLog("ThreadMasterSend() SO_REUSEADDR failed:%d", errno);
return NULL;
}
fcntl(s, F_SETFL, O_NONBLOCK);
memset(&local_addr, 0, sizeof(local_addr));
local_addr.sin_family = AF_INET;
local_addr.sin_addr.s_addr = htonl(INADDR_ANY/*inet_addr(ptAvUdpSend->send_ip)*/);
local_addr.sin_port = htons(pDevice->pingPort);
if(bind(s, (struct sockaddr *)&local_addr, sizeof(local_addr)) < 0)
{
pDevice->GetDebugLog()->DebugLog("ThreadMasterSend() bind failed:%d", errno);
return NULL;
}
s32Value = 16 * 1024 * 1024;
setsockopt(s, SOL_SOCKET, SO_SNDBUF, (char *)&s32Value, sizeof(s32Value));
while(true)
{
if(0 > sendto(s, "MIT", 3, 0, (struct sockaddr*)&mcast_addr, sizeof(mcast_addr)))
{
pDevice->GetDebugLog()->DebugLog("ThreadMasterSend() sendto failed:%d", errno);
}else
{
usleep(pDevice->pingInterval*1000);
}
}
pDevice->GetDebugLog()->DebugLog("ThreadMasterSend() exit", errno);
close(s);
pthread_exit(NULL);
}
void CRedundance::StartUdpSend()
{
pLog->DebugLog("StartUdpSend()");
pthread_t hThreadHandle;
if(0 != pthread_create(&hThreadHandle, NULL, ThreadMasterSend, pParent))
{
pLog->DebugLog("StartUdpSend() master start udp send thread failed.");
}
}
void *ThreadSetEncoder(void *arg)
{
PCOMMANDIPPORT pIpPort = (PCOMMANDIPPORT )arg;
if(NULL == pIpPort)
{
return NULL;
}
QString encoderIp = pIpPort->encoderIp;
uint32_t encoderPort = pIpPort->port;
QString MuticastIp = pIpPort->muticastIp;
//set encoder
int EnClient=socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
struct sockaddr_in servaddr;
servaddr.sin_family=AF_INET;
servaddr.sin_addr.s_addr=inet_addr(encoderIp.toLocal8Bit().data()); //ָ
servaddr.sin_port=htons(encoderPort);
int value = 100;
setsockopt(EnClient, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value));
if(0 != ::connect(EnClient,(struct sockaddr*)&servaddr,sizeof(servaddr))) //ͨ׽
{
// pLog->DebugLog("SetEncoderDecoder() connect failed:%d", errno);
return NULL;
}
PCOMMAND pCmd = new COMMAND;
char buf[128]={0};
QString str = QString("enc0.stream_url=%1\n").arg(MuticastIp);
pCmd->cmdId = htons(3);
pCmd->cmdLen = htons(str.length());
memcpy(buf, (char *)pCmd, sizeof(COMMAND));
memcpy(buf+sizeof(COMMAND), str.toLocal8Bit().data(), str.length());
if(0 >= send(EnClient, buf, sizeof(COMMAND) + str.length(), 0))
{
// pLog->DebugLog("SetEncoderDecoder() send failed:%d", errno);
}
// while(true)
// {
// char buf[128];
// int len=recv(EnClient,buf,sizeof(buf),0); //ͨ׽ֽ
// if(len>0)
// {
// PRECVCOMMAND pRcv = (PRECVCOMMAND)buf;
// pRcv->cmdId = ntohs(pRcv->cmdId);
// pRcv->cmdLen = ntohs(pRcv->cmdLen);
// pRcv->err = ntohs(pRcv->err);
// }
// }
delete pCmd;
close(EnClient);
// //set decoder
// int DecClient=socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
//// struct sockaddr_in servaddr;
// bzero(&servaddr, sizeof(struct sockaddr_in));
// servaddr.sin_family=AF_INET;
// servaddr.sin_addr.s_addr=inet_addr(pParent->encoderIp.toLocal8Bit().data()); //ָ
// servaddr.sin_port=htons(pParent->encoderPort);
// value = 100;
// setsockopt(DecClient, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value));
// if(0 != ::connect(DecClient,(struct sockaddr*)&servaddr,sizeof(servaddr))) //ͨ׽
// {
// pLog->DebugLog("SetEncoderDecoder() connect failed:%d", errno);
// return ;
// }
// PCOMMAND pCmd2 = new COMMAND;
// char buf2[128]={0};
// QString str2 = QString("dec0.stream_url=%1\n").arg(MuticastIp);
// pCmd2->cmdId = htons(3);
// pCmd2->cmdLen = htons(str2.length());
// memcpy(buf2, (char *)pCmd2, sizeof(COMMAND));
// memcpy(buf2+sizeof(COMMAND), str2.toLocal8Bit().data(), str2.length());
// if(0 >= send(DecClient, (char *)pCmd2, sizeof(COMMAND) + str2.length(), 0))
// {
// pLog->DebugLog("SetEncoderDecoder() send failed:%d", errno);
// }
//// while(true)
//// {
//// char buf[128];
//// int len=recv(DecClient,buf,sizeof(buf),0); //ͨ׽ֽ
//// if(len>0)
//// {
//// PRECVCOMMAND pRcv = (PRECVCOMMAND)buf;
//// pRcv->cmdId = ntohs(pRcv->cmdId);
//// pRcv->cmdLen = ntohs(pRcv->cmdLen);
//// pRcv->err = ntohs(pRcv->err);
//// }
//// }
// delete pCmd2;
// close(DecClient);
pthread_exit(NULL);
}
void CRedundance::SetEncoderDecoder(QString encoderIp, uint32_t encoderPort, QString MuticastIp)
{
pLog->DebugLog("SetEncoderDecoder()");
pthread_t hThreadHandle;
COMMANDIPPORT IpPort;
memset(&IpPort, 0, sizeof(COMMANDIPPORT));
memcpy(IpPort.encoderIp, encoderIp.toLocal8Bit().data(), encoderIp.length());
IpPort.port = encoderPort;
// IpPort.Log = *pLog;
memcpy(IpPort.muticastIp, MuticastIp.toLocal8Bit().data(), MuticastIp.length());
if(0 != pthread_create(&hThreadHandle, NULL, ThreadSetEncoder, &IpPort))
{
pLog->DebugLog("StartUdpSend() master start udp send thread failed.");
}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
C++
1
https://gitee.com/star-spinning/LinuxMediaCapture.git
git@gitee.com:star-spinning/LinuxMediaCapture.git
star-spinning
LinuxMediaCapture
LinuxMediaCapture
master

搜索帮助

0d507c66 1850385 C8b1a773 1850385