《[音视频应用开发系列文章目录](https://blog.csdn.net/KayChanGEEK/article/details/103319415)》
## 推流器设计思路
生产者-消费者模式。一个线程负责拉取原始音视频流然后存储在队列,一个线程负责从音视频流队列获取流然后推送,视频帧需要按照实际的帧率间隔发送,否则服务器可能会受不了
![](https://img-blog.csdnimg.cn/20191226145526982.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0theUNoYW5HRUVL,size_16,color_FFFFFF,t_70)
## 设计AvPusher类
### AvPusher.h
```
#ifndef _AV_PUSHER_H_
#define _AV_PUSHER_H_
/***********************************************************
** Author:kaychan
** Data:2019-11-04
** Mail:1203375695@qq.com
** Explain:a audio/video pusher
***********************************************************/
#include "AvCommon.h"
// 错误回调函数
typedef void(*on_error)(std::string error, void *priv);
typedef struct AvPusherParamters_S {
public:
void *priv = NULL;
std::string istream = "";
std::string ostream = "";
bool run = true;
AVFormatContext *ifmt_ctx = NULL;
AVFormatContext *ofmt_ctx = NULL;
int64_t video_udelay = 0;
int64_t cache_packets_size = 0;
int stream_mapping_size;
int *stream_mapping = NULL;
int video_index = -1;
AvPacketSafeQueue av_packets;
int64_t flowrate = 0;
on_error av_on_error;
}AvPusherParamters;
class AvPusher {
public:
AvPusher();
// 启动推流器
// istream:拉流地址
// ostream:推流地址支持RTMP和UDP
// av_on_error:错误回调函数
// priv:回调函数入参
void start(const std::string istream, const std::string ostream, on_error av_on_error, void *priv = NULL);
// 停止推流器
void stop();
// 获取推流流量
int64_t flowrate();
private:
std::thread tsk_pull_;
std::thread tsk_push_;
AvPusherParamters paramters_;
};
#endif
```
### AvPusher.cpp
```
#include "AvPusher.h"
static inline void emit_error(int errcode, AvPusherParamters ¶mters) {
paramters.run = false;
char error[256];
av_strerror(errcode, error, 256);
paramters.av_on_error(error, paramters.priv);
}
void AvPusher_pull(AvPusherParamters ¶mters) {
// deal with input stream
AVInputFormat *ifmt = NULL;
AVDictionary *dict = NULL;
av_dict_set(&dict, "stimeout", "2000000", 0);
av_dict_set(&dict, "buffer_size", "4096000", 0);
av_dict_set(&dict, "recv_buffer_size", "4096000", 0);
int r = avformat_open_input(¶mters.ifmt_ctx, paramters.istream.c_str(), ifmt, &dict);
if (r != 0) {
emit_error(r, paramters);
return;
}
r = avformat_find_stream_info(paramters.ifmt_ctx, NULL);
if (r < 0) {
emit_error(r, paramters);
return;
}
// deal with output stream
char *ofmt_name = "flv"; // default is rtmp
std::string::size_type st = paramters.ostream.find("udp://"); // judge udp or not
if (st != std::string::npos) ofmt_name = "mpegts";
r = avformat_alloc_output_context2(¶mters.ofmt_ctx, NULL, ofmt_name, paramters.ostream.c_str());
if (r < 0) {
emit_error(r, paramters);
return;
}
paramters.stream_mapping_size = paramters.ifmt_ctx->nb_streams;
paramters.stream_mapping = (int *)av_mallocz_array(paramters.stream_mapping_size, sizeof(int));
if (!paramters.stream_mapping) {
emit_error(-1, paramters);
return;
}
int stream_index = 0;
for (int ii = 0; ii < paramters.ifmt_ctx->nb_streams; ii++) {
AVStream *istream = paramters.ifmt_ctx->streams[ii];
enum AVMediaType mt = istream->codecpar->codec_type;
if (mt != AVMEDIA_TYPE_AUDIO && mt != AVMEDIA_TYPE_VIDEO) {
paramters.stream_mapping[ii] = -1;
continue;
}
paramters.stream_mapping[ii] = stream_index++;
AVStream *ostream = NULL;
ostream = avformat_new_stream(paramters.ofmt_ctx, NULL);
if (ostream) {
r = avcodec_parameters_copy(ostream->codecpar, istream->codecpar);
if (r < 0) {
emit_error(r, paramters);
return;
}
ostream->codecpar->codec_tag = 0;
}
// if video, calc push delay
if (mt == AVMEDIA_TYPE_VIDEO) {
paramters.video_index = ii;
AVRational frame_rate = av_guess_frame_rate(paramters.ifmt_ctx, istream, NULL);
AVRational reciprocal = { frame_rate.den, frame_rate.num };
double duration = (reciprocal.num && reciprocal.den) ? av_q2d(reciprocal) : (0);
paramters.video_udelay = (int64_t)(duration * AV_TIME_BASE);
}
}
// if flags AVFMT_NOFILE is set, AVIOContext *pb will be NULL
// should open first, otherwise is opened.
if (!(paramters.ofmt_ctx->oformat->flags & AVFMT_NOFILE)) {
r = avio_open(¶mters.ofmt_ctx->pb, paramters.ostream.c_str(), AVIO_FLAG_WRITE);
if (r < 0) {
emit_error(r, paramters);
return;
}
}
// start pull
while (paramters.run) {
AVPacket packet;
paramters.cache_packets_size = paramters.av_packets.size();
if (paramters.cache_packets_size < 1 * MB) {
r = av_read_frame(paramters.ifmt_ctx, &packet);
if (r >= 0) {
if (packet.stream_index >= paramters.stream_mapping_size ||
paramters.stream_mapping[packet.stream_index] < 0) {
av_packet_unref(&packet);
continue;
}
paramters.av_packets.eq(packet);
}
else {
// maybe rtsp/rtmp has disconnected
// maybe file is eof
break;
}
}
else CPP11_MSLEEP(30);
}
}
void AvPusher_push(AvPusherParamters ¶mters) {
// start push
while (!paramters.ofmt_ctx) {
if (!paramters.run) return;
CPP11_MSLEEP(30);
}
int r = avformat_write_header(paramters.ofmt_ctx, NULL);
if (r >= 0) {
while (paramters.run) {
if (!paramters.av_packets.empty()) {
AVPacket p = paramters.av_packets.dq();
paramters.flowrate += p.size;
AVStream *istream, *ostream;
istream = paramters.ifmt_ctx->streams[p.stream_index];
if (p.stream_index == paramters.video_index) {
av_usleep(paramters.video_udelay);
}
p.stream_index = paramters.stream_mapping[p.stream_index];
ostream = paramters.ofmt_ctx->streams[p.stream_index];
// update packet timestamp
av_packet_rescale_ts(&p, istream->time_base, ostream->time_base);
p.pos = -1;
r = av_interleaved_write_frame(paramters.ofmt_ctx, &p);
if (r < 0) break;
av_packet_unref(&p);
}
else CPP11_MSLEEP(30);
}
av_write_trailer(paramters.ofmt_ctx);
}
}
AvPusher::AvPusher() {
}
void AvPusher::start(const std::string istream, const std::string ostream, on_error av_on_error, void *priv) {
paramters_.istream = istream;
paramters_.ostream = ostream;
paramters_.av_on_error = av_on_error;
paramters_.priv = priv;
paramters_.flowrate = 0;
paramters_.run = true;
tsk_pull_ = std::thread(AvPusher_pull, std::ref(paramters_));
tsk_push_ = std::thread(AvPusher_push, std::ref(paramters_));
}
void AvPusher::stop() {
paramters_.run = false;
if (tsk_pull_.joinable()) tsk_pull_.join();
if (tsk_push_.joinable()) tsk_push_.join();
paramters_.av_packets.clear();
if (paramters_.ofmt_ctx && !(paramters_.ofmt_ctx->oformat->flags & AVFMT_NOFILE)) {
avio_closep(¶mters_.ofmt_ctx->pb);
}
if (paramters_.ifmt_ctx) {
avformat_close_input(¶mters_.ifmt_ctx);
paramters_.ifmt_ctx = NULL;
}
if (paramters_.ofmt_ctx) {
avformat_free_context(paramters_.ofmt_ctx);
paramters_.ofmt_ctx = NULL;
}
if (paramters_.stream_mapping) {
av_freep(¶mters_.stream_mapping);
paramters_.stream_mapping = NULL;
}
}
int64_t AvPusher::flowrate() {
return paramters_.flowrate;
}
```
- 序言
- 编解码
- H264
- HEVC码流解析
- H264编码原理
- 多媒体封装
- MP4
- 学好 MP4,让直播更给力
- AAC
- FLV
- 流媒体协议
- RTSP
- RTCP
- RTP
- H265 RTP封包笔记
- SDP
- RTMP
- RTMP URL
- rtmp url基础
- webrtc
- 编译
- 最简单的编译webrtc方案
- Webrtc音视频会议之Webrtc“不求甚解”
- Webrtc音视频会议之Mesh/MCU/SFU三种架构
- 音频传输之Jitter Buffer设计与实现
- Janus
- Webrtc音视频会议之Janus编译
- Webrtc音视频会议之Janus源码架构设计
- webrtc服务器-janus房间管理
- 源码分析
- WebRTC视频JitterBuffer详解
- 走读Webrtc 中的视频JitterBuffer(一)
- 走读webrtc 中的视频JitterBuffer(二)
- webrtc视频帧率控制算法机制
- 目标码率丢帧-1
- 目标帧率丢帧-2
- 29 如何使用Medooze 实现多方视频会议
- FFmpeg
- FFmpeg编译
- Window10下编译最新版FFmpeg的方法步骤
- FFMPEG静态库编译
- ffmpeg实现画中画
- FFmpeg推流器
- ffmpeg-aac
- OpenCV
- OpenCV学习笔记——视频的边缘检测
- 图像特征点匹配(视频质量诊断、画面抖动检测)
- 图像质量诊断