1 Star 0 Fork 1

罗声海/vedio stream

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
stream_entry.h 16.09 KB
一键复制 编辑 原始数据 按行查看 历史
hai 提交于 2021-01-30 22:51 . push 成功 ,user recv失败
#ifndef _STREAM_ENTRY_H_
#define _STREAM_ENTRY_H_
#include "flv_main.h"
#include "pusher.h"
#include "flv.h"
class data_struct;
enum CHUNK_STATE
{
CHUNK_START,
CHUNK_MORE,
CHUNK_OK,
CHUNK_END
};
typedef struct meta_info
{
oal_int32 proced_index;
oal_int32 flv_head_len;
oal_int32 flv_script_len;
oal_int32 avc_seq_len;
}Meta_Info;
class stream_entry :public pusher
{
public :
oal_bool meta_pass;
oal_bool is_close;//连接是否关闭
Meta_Info MetaInfo;
oal_uint32 read_index;//拉流用户读的位置
oal_uint32 write_index;//推流用户写的位置
oal_uint32 can_read_index;//可以读到的位置
oal_uint8 *cache_buff;//缓存空间
oal_int32 cache_len;//缓存空间长度
oal_uint8 *meta_data;//保存flv元数据
oal_uint32 meta_len;//元数据长度
list<oal_int32> *user_list;//user链表
pthread_spinlock_t userlist_spinlock;
CHUNK_STATE chunked_state;
oal_uint8 push_stream(data_struct *queue_handle);
stream_entry()
{
pthread_spin_init(&userlist_spinlock,PTHREAD_PROCESS_PRIVATE);
}
oal_void stream_entry_clear()
{
chunked_state = CHUNK_START;
pusher_clear();
meta_pass = false;
is_close = false;
read_index = write_index = 0;
can_read_index = 0;
meta_len = 0;
memset(cache_buff,0,cache_len);
memset(recv_buff,0,buf_len);
memset(meta_data,0,META_LEN);
memset(&MetaInfo,0,sizeof(Meta_Info));
//stream_entry_free();
}
oal_int8 init() //使用对象前一定要先init
{
buf_len = buffer_size;
recv_buff = (oal_uint8 *)malloc(buf_len);
if(recv_buff == oal_ptr_null)
{
printf("recv buff init failed\n");
return -1;
}
cache_len = CACHE_BUFF_LEN;
cache_buff = (oal_uint8 *)malloc(cache_len);
if(cache_buff == oal_ptr_null)
{
printf("cache_buff init failed\n");
return -1;
}
meta_data = (oal_uint8 *)malloc(META_LEN);
if(meta_data == oal_ptr_null)
{
printf("meta_data init failed\n");
return -1;
}
user_list = new list<oal_int32>();
if(user_list == oal_ptr_null )
{
printf("user_list init failed\n");
return -1;
}
entry = this;
http_state = CHECK_REQUEST_LINE;
stream_entry_clear();
return 0;
}
oal_void stream_entry_free()
{
free(recv_buff);
free(cache_buff);
free(meta_data);
delete user_list;
}
int ntoh24s(oal_uint24 * num)
{
int value = 0;
value += num->first<<16;
value += num->second<<8;
value += num->third;
//printf("the value is [%d]\n",value);
return value;
}
int oal_h2n_long(int num)
{
int num1 = 0;
int temp = 0;
for(int i = 0 ;i < 4; i++)
{
temp = num>>(i*8);
num1 += temp<<(32-(i+1)*8);
temp = 0;
}
return num1;
}
int oal_n2h_long(int num)
{
int num1 = 0;
int temp = 0;
for(int i = 0 ;i < 4; i++)
{
temp = num>>(i*8)&0xff;
num1 += temp<<(32-(i+1)*8);
temp = 0;
}
printf("num1 [%d] \n",num1);
return num1;
}
char * find_scipt_head(char *buffer,int len)
{
FLV_HEADER *flv_head = (FLV_HEADER *)buffer;
int head_len = oal_n2h_long(flv_head->offset);
printf("flv header length is [%d] \n",head_len);
if(len < head_len)
{
printf("the head len is wrong\n");
return NULL;
}
return (buffer + head_len);
}
char * find_vedio_head(char *script_head , int * script_len ,int pre_data_len)
{
FLVTAG_HEADER *tag_head = (FLVTAG_HEADER *)script_head;
int datalen = ntoh24s(&(tag_head->data_len));
printf("the script type is [%d]\n",tag_head->tag_type);
printf("the scripttag data len is [%d]\n",datalen);
*script_len = datalen + 11;
if(pre_data_len + *script_len + 4 > can_read_index)
{
return NULL;
}
printf("the script total len is [%d]\n",*script_len);
VEDIO_HEAD *vedio_header = (VEDIO_HEAD *)(tag_head + 1);
//printf("the frame type1 [%d]\n",vedio_header->frame_type);
printf("the frame type2 [%d]\n",vedio_header->stream_id);
AVC_HEAD *avc_header = (AVC_HEAD *)(vedio_header + 1);
printf("the avc type is [%d]\n\n",avc_header->avc_type);
return script_head + 15 + datalen;
}
char * find_next_vedio_head(char *last_data_head , int * tag_len , int *quit)
{
static int frame_count = 0;
FLVTAG_HEADER *tag_head = (FLVTAG_HEADER *)last_data_head;
int datalen = ntoh24s(&(tag_head->data_len));
//printf("the tag type is [%d]\n",tag_head->tag_type);
//printf("the vedio data len is [%d]\n",datalen);
*tag_len = datalen + 11;
//printf("the vedio total len is [%d]\n",*tag_len);
VEDIO_HEAD *vedio_header = (VEDIO_HEAD *)(tag_head + 1);
//printf("the frame type1 [%d]\n",vedio_header->frame_type);
//printf("the frame type2 [%d]\n",vedio_header->vedio_header);
AVC_HEAD *avc_header = (AVC_HEAD *)(vedio_header + 1);
//printf("the avc type is [%d]\n\n",avc_header->avc_type);
frame_count ++;
if( frame_count > 4 && vedio_header->stream_id == 1 )
{
printf("frame_count [%d]\n",frame_count);
*quit = 1;
}
return last_data_head + 15 + datalen;
}
oal_int8 save_metabuff()
{
oal_int32 first_index = MetaInfo.proced_index;
oal_int8 *script_data = find_scipt_head((oal_int8 *)cache_buff + first_index ,can_read_index);
if(script_data == NULL)
{
printf("script_data NULL\n");
return -1;
}
MetaInfo.proced_index = script_data - (oal_int8 *)cache_buff;
oal_int32 script_len = 0;
oal_int8 *avc_seq_data = find_vedio_head(script_data,&script_len, MetaInfo.proced_index);
if(avc_seq_data == NULL)
{
printf("avc_seq_data NULL\n");
return -1;
}
MetaInfo.proced_index = avc_seq_data - (oal_int8 *)cache_buff;
oal_int32 avc_seq_data_len = 0;
oal_int8 *normal_vedio_data = find_vedio_head(avc_seq_data,&avc_seq_data_len, MetaInfo.proced_index);
if(normal_vedio_data == NULL)
{
printf("normal_vedio_data NULL\n");
return -1;
}
printf("save meta buff 11111 , normal_vedio_data [%p] ,cache_buff [%p] \n",normal_vedio_data ,cache_buff);
meta_len = normal_vedio_data - (oal_int8 *)cache_buff;
MetaInfo.proced_index = normal_vedio_data - (oal_int8 *)cache_buff;
printf("save meta buff 22222 ,meta len [%d]\n",meta_len);
memcpy(meta_data,cache_buff,meta_len);
printf("save meta buff ok\n");
return 0;
}
oal_int8 write2user(oal_int32 socketfd,oal_uint8 *data , oal_int32 data_len)
{
oal_int32 ret;
oal_int32 total_len = 0;
printf("write in,data [%p] , len [%d],socket_id [%d]\n",data,data_len,socketfd);
// char *aaa = "rewtfdhgfjgh";
// data_len = strlen(aaa);
while(total_len < data_len)
{
ret = write(socketfd,data + total_len , data_len - total_len);//第三个参数为0时返回0
//ret = write(socketfd,aaa + total_len , data_len - total_len);//第三个参数为0时返回0
printf("write one\n");
if(ret < 0)
{
if(errno == EPIPE)//连接关闭
{
printf("user close\n");
return -1;
}
printf("error \n");
return -1;
}
else if(ret >= 0)
{
total_len += ret;
}
}
printf("write out \n");
return 0;
}
oal_int32 str2h(oal_int8 *str)
{
oal_int32 len = 0;
oal_int8 *temp_str = str;
oal_int32 sum = 0;
while(*temp_str != '\r')
{
temp_str++;
len++;
};
oal_int32 temp = 0;
for(oal_int32 i = 0; i < len ; i++)
{
if(str[i] >= 48 && str[i] <= 57)
{
temp = str[i] - 48;
sum += temp<<((len - i - 1)*4);
}
else if(str[i] >= 97 && str[i] <= 102)
{
temp = str[i] - 97 + 10;
sum += temp<<((len - i - 1)*4);
}
else
{
printf("exchange error!!\n");
return -1;
}
}
return sum;
}
oal_int8 parse_chunked_data()
{
printf("parse_chunked_data in , check size is [%d] , read_limit_index is [%d]\n",check_index,read_limit_index);
while(check_index < read_limit_index)//每一次接收的块不一定完整
{
if(chunked_state == CHUNK_OK)
{
printf("chunked_state == CHUNK_OK\n");
#ifndef _HAI_TEST_
int chunk_len = str2h(chunk_data.chunk_ascii);
chunk_data.now_asii = 0;
//data_addr[check_index-2] = 0;
//chunk_len = atoi((char *)data_addr+last_check_index);
printf("the chunk len [%d]\n",chunk_len);
printf("[%c] [%c]\n",data_addr[last_check_index],data_addr[last_check_index + 1]);
#endif
chunk_data.chunk_data_index = check_index;
chunk_data.chunk_size = chunk_len;
oal_int32 min_len = (chunk_data.chunk_size >= (read_limit_index - check_index )) ? (read_limit_index - check_index) : chunk_data.chunk_size;
printf("chunked_state 111111111,write_index [%d],chunk_size [%d] limit_size [%d]\n",write_index,chunk_data.chunk_size,read_limit_index - check_index);
if(write_index + min_len <= CACHE_BUFF_LEN)
{
printf("chunked_state 111122222 ,min_len [%d] \n",min_len);
memcpy(cache_buff + write_index, data_addr + check_index , min_len);
}
else
{
printf("chunked_state 111133333,min_len [%d] \n",min_len);
oal_uint32 left_len = (CACHE_BUFF_LEN - write_index);
printf("left_len [%u] \n",left_len);
memcpy(cache_buff + write_index, data_addr + check_index ,left_len);
memcpy(cache_buff , data_addr + check_index + left_len, min_len - left_len);
}
printf("chunked_state 22222\n");
write_index = (write_index + min_len)%CACHE_BUFF_LEN;
check_index += min_len;
if(min_len < chunk_data.chunk_size)
{
chunk_data.need_more = true;
chunk_data.proced_size = min_len;
chunked_state = CHUNK_MORE;
printf("chunked_state 33333\n");
}
else
{
chunk_data.proced_size = 0;
chunk_data.chunk_size = 0;
can_read_index = write_index;
if(!meta_pass && can_read_index > META_LEN)
{
if(save_metabuff() >= 0 )
{
meta_pass = true;
read_index = meta_len;
}
}
chunked_state = CHUNK_END;
printf("chunked_state 44444\n");
}
}
else if(chunked_state == CHUNK_MORE)
{
printf("chunked_state != CHUNK_OK \n");
oal_int32 left_len = chunk_data.chunk_size - chunk_data.proced_size;
printf("left_len is [%d] , chunk size [%d] , proced len is [%d]\n",left_len,chunk_data.chunk_size,chunk_data.proced_size);
oal_int32 write_len = (read_limit_index - check_index) >= left_len ? left_len : (read_limit_index - check_index );
if(write_index + write_len <= CACHE_BUFF_LEN)
{
memcpy(cache_buff + write_index, data_addr + check_index , write_len);
}
else
{
oal_int32 left_length = (CACHE_BUFF_LEN - write_index);
memcpy(cache_buff + write_index, data_addr + check_index ,left_length);
memcpy(cache_buff , data_addr + check_index + left_length, write_len - left_length);
}
write_index = (write_index + write_len)%CACHE_BUFF_LEN;
check_index += write_len;
if(left_len == write_len)
{
can_read_index = write_index;
chunked_state = CHUNK_END;
chunk_data.need_more = false;
chunk_data.proced_size = 0;
chunk_data.chunk_size = 0;
if(!meta_pass && can_read_index > META_LEN)
{
printf("in the metabuff\n");
if(save_metabuff() >= 0 )
{
meta_pass = true;
read_index = meta_len;
}
}
}
else
{
chunk_data.proced_size += write_len;
}
}
else if(chunked_state == CHUNK_START)
{
printf("chunked_state == CHUNK_START\n");
if(parse_endline() == 0)
{
chunked_state = CHUNK_OK;
}
}
else
{
printf("chunked_state == CHUNK_END\n");
if(data_addr[check_index++] == '\n')
{
chunked_state = CHUNK_START;
}
}
//printf("check size is [%d] , read_limit_index is [%d]\n",check_index,read_limit_index);
}
printf("parse_chunked_data out\n");
}
oal_int8 read()
{
//读完w
//printf("push read , the buf len [%d]\n",buf_len);
oal_int32 ret;
//oal_int32 read_len = 0;
oal_int32 total_len = 0;
while(total_len < buf_len)
{
ret = recv(socket_id,recv_buff + total_len, buf_len - total_len ,0);
if (ret == 0)//返回0表示连接已关闭
{
printf("connect close!!!\n");
is_close = true;
break;
}
else if (ret < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)//表示连接没有数据可读,EAGAIN意思是resource unvaluiable
{
break;
}
else if(EINTR == errno)//表明recv被中断打断,需要重新读
{
continue;
}
else
{
printf("something wrong happen!!\n");
is_close = true;
break;
}
}
total_len += ret;
//data_read = recv(temp_fd, http_con[index].read_buff + read_index, BUFFER_SIZE - read_index, 0);
}
read_limit_index = total_len;
printf("haixxxxxxxxxxxxxxxx ,read len [%d]\n",total_len);
chunked = true;
if(parse_content() == REQUEST_OK && chunked == true)
{
if(!find_mysql_flag)
{
push_user_id = 1;
}
parse_chunked_data();
}
last_check_index = 0;
read_limit_index = 0;
check_index = 0;
}
};
#endif
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
C++
1
https://gitee.com/luo-shenghai/vedio-stream.git
git@gitee.com:luo-shenghai/vedio-stream.git
luo-shenghai
vedio-stream
vedio stream
master

搜索帮助