MP3文件TCP流式分发并播放


MP3文件流式分发并播放,主要涉及广播分发音频文件,然后在广播端解码播放。


呱牛笔记

服务器端,读文件,发送流,为了避免粘包,先发长度,然后发送二进制流。

流格式为RTP+1300个字节的音频

struct ClientConfig {
public:
    char  ip[64];
    unsigned short port;
    bool useTcp; 
};
std::vector<ClientConfig*> clients;

static volatile uint8_t running_flag = 0;
static volatile uint8_t running_end_flag = 1;
// RTP头结构
#pragma pack(push, 1)
struct RTPHeader {
    uint8_t version_cc;
    uint8_t marker_pt;
    uint16_t seq;
    uint32_t timestamp;
    uint32_t ssrc;
}; 
DWORD WINAPI SendWorker(LPVOID param) { 
    char* file_name = (char*)param;

    int file_size = GetFileSize(file_name);
    // 一次性读取整个文件
    FILE* fp = fopen(file_name, "rb");
    if (fp == NULL) {
        free(file_name);
        return 0;
    } 
    //fseek(fp, 0, SEEK_END);
    //long file_size = ftell(fp);
    fseek(fp, 0, SEEK_SET); 

    std::vector<SOCKET> clientSocks;
    int sendBufSize = 1 * 1024 * 1024; // 1MB
    for (auto* client : clients) {
        if (client == NULL) {
            continue;
        }
        SOCKET sock = client->useTcp ?
            socket(AF_INET, SOCK_STREAM, IPPROTO_TCP) :
            socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
        setsockopt(sock, SOL_SOCKET, SO_SNDBUF,
            (char*)&sendBufSize, sizeof(sendBufSize));

        sockaddr_in addr = {};
        addr.sin_family = AF_INET;
        addr.sin_port = htons(client->port);
        addr.sin_addr.s_addr = inet_addr(client->ip);
        if (client->useTcp) connect(sock, (sockaddr*)&addr, sizeof(addr));

        clientSocks.push_back(sock);
    }
#if 1
    //第一个包,发送文件名,文件大小,文件信息
    char send_buffer[256] = "";
    std::string name = GetFileName(file_name);

    sprintf(send_buffer, "start#name:%s#file_size:%d",name.c_str(), file_size);
    for (size_t i = 0; i < clients.size(); ++i) {
        if (clients[i]->useTcp) {
            int packetSize = file_size;//strlen(send_buffer)+1;
            uint32_t netSize = htonl(packetSize);

            // 发送长度头
            send(clientSocks[i], (char*)&netSize, sizeof(netSize), 0);

            //send(clientSocks[i], send_buffer, packetSize+1, 0);
        }
        else {
            int packetSize = strlen(send_buffer)+1;
            sockaddr_in addr = {};
            addr.sin_family = AF_INET;
            addr.sin_port = htons(clients[i]->port);
            addr.sin_addr.s_addr = inet_addr(clients[i]->ip);
            sendto(clientSocks[i], send_buffer, packetSize + 1, 0,
                (sockaddr*)&addr, sizeof(addr));
        }
    }
#endif
      
    // 分段发送
    const int chunkSize = 1300; // 有效载荷大小
    RTPHeader header;
    header.version_cc = 0x80; // V=2, CC=0
    header.marker_pt = 0x60;  // PT=96
    header.ssrc = htonl(0x12345678);
    char* fileData = new char[chunkSize*2];

    running_end_flag = 0;
    // 构造RTP包
    char *packet = (char *)malloc(sizeof(RTPHeader) + chunkSize);
    int seq = 1;
    int file_offset = 0;
    int need_pause = 0;
    while (!feof(fp)) {
        if (!running_flag) {
            need_pause = 1;
            break;
        }
        int readSize = fread(fileData, 1, chunkSize, fp);
        if (readSize <= 0) {

            break;
        }

        header.seq = htons(seq++);
        header.timestamp = htonl(file_offset); // 模拟时间戳
        memcpy((void *)packet, (void*)&header, HEADER_SIZE);
         
        memcpy((char*)packet + HEADER_SIZE, (char*)fileData, readSize);
        int packetSize = HEADER_SIZE + readSize;

        // 为每个客户端发送当前块
        for (size_t i = 0; i < clients.size(); ++i) {
            if (clients[i]->useTcp) {
                uint32_t netSize = htonl(packetSize);

                // 发送长度头
                send(clientSocks[i], (char*)&netSize, sizeof(netSize), 0);

                send(clientSocks[i], packet, packetSize, 0);
            }
            else {
                sockaddr_in addr = {};
                addr.sin_family = AF_INET;
                addr.sin_port = htons(clients[i]->port);
                addr.sin_addr.s_addr = inet_addr(clients[i]->ip);
                sendto(clientSocks[i], packet, packetSize, 0,
                    (sockaddr*)&addr, sizeof(addr));
            }
        }
        file_offset += readSize;

        if (!running_flag) {
            need_pause = 1;
            break;
        }
        Sleep(10); // 控制速率
    }

    char temp[64] = "";
    sprintf(temp, "end!");
    if (need_pause) {
        sprintf(temp, "stop");
    }
    for (size_t i = 0; i < clients.size(); ++i) {
        if (clients[i]->useTcp) {
            int len = strlen(temp)+1;
            uint32_t netSize = htonl(len);
            // 发送长度头
            send(clientSocks[i], (char*)&netSize, sizeof(netSize), 0);
            send(clientSocks[i], temp, len, 0);
        }
        else {
            sockaddr_in addr = {};
            addr.sin_family = AF_INET;
            addr.sin_port = htons(clients[i]->port);
            addr.sin_addr.s_addr = inet_addr(clients[i]->ip);
            sendto(clientSocks[i], temp, strlen(temp), 0,
                (sockaddr*)&addr, sizeof(addr));
        }
    }
    Sleep(10); // 控制速率

    for (auto sock : clientSocks) closesocket(sock);
    delete[] fileData;

    clients.clear();

    running_end_flag = 1;
    running_flag = 0;

    fclose(fp);
    free(packet);
    free(file_name);
    return 0;
}





Windows电脑模拟客户端:

几个关键点:1、音频流保存为文件;

2、使用ffmpeg的api探测探测音频流的格式,解码并转换为电脑播放的音频采样率、channel和音频PCM格式;

3、使用SDL的API播放音频,关键是AUDIO_F32LSB的转换,要不播放出来的声音有异常;


#define DATA_PORT 5600
#define SAMPLE_RATE  44100
#define OUT_PLAY_CHANNEL 2

enum AVSampleFormat sdl_format_to_ffmpeg(SDL_AudioFormat format);

// RTP头结构
#pragma pack(push, 1)
struct RTPHeader {
    uint8_t version_cc;
    uint8_t marker_pt;
    uint16_t seq;
    uint32_t timestamp;
    uint32_t ssrc;
};
#define AUDIO_BUFFER_COUNT 4
#define MAX_FRAME_SIZE 4410*8
#define DETECT_LEN 300*1024
#define OUT_DATA_SIZE 1152 * 8

// 音频解码上下文
struct AudioContext {
    AVCodecContext* codecCtx;

    AVFormatContext* fmt_ctx;
    AVCodecParameters* codecpar;
    int audio_stream_idx;
    const AVCodec* codec; 

    AVCodecParserContext* parser;

    SwrContext* swrCtx;
    HWAVEOUT hWaveOut;
    bool isTcpMode;
    AVFrame* frame;
    AVFrame* pframePCM;
    AVPacket* pkt;

    uint8_t* outData[2];

    uint8_t* cache_pcm_buffer;
    size_t cache_capacity;
    size_t cache_buffer_size;

    uint8_t* recv_buffer;
    size_t capacity;
    size_t recv_buffer_size;
    size_t read_buffer;
    int file_size;

    unsigned long play_start_time;  
    int duration;
    double accumulated_play_time;


    size_t dectected;
    struct {
        uint8_t* data;
        int size;
        bool inUse;
    } audioBuffers[AUDIO_BUFFER_COUNT];
    int currentBuffer;

    pthread_mutex_t bufferMutex;
    bool running;//
    bool play_pause;//

    SDL_AudioDeviceID audio_device;

};

static AudioContext ctx;
 
int configure_decoder(AudioContext* ctx) {
    if (ctx->codec != NULL) { 
        avcodec_free_context(&ctx->codecCtx);
        ctx->codecCtx = NULL;
        swr_free(&ctx->swrCtx);
    }
    if (ctx->codecpar == NULL) {
        printf("codecpar == NULL \n");
        return -1;
    }
    // 查找解码器
    ctx->codec = avcodec_find_decoder(ctx->codecpar->codec_id);
    if (!ctx->codec) {
        printf("avcodec_find_decoder failed \n");
        return AVERROR_DECODER_NOT_FOUND;
    }

    // 创建解码器上下文
    ctx->codecCtx = avcodec_alloc_context3(ctx->codec);
    if (!ctx->codecCtx) {
        printf("avcodec_alloc_context3 failed \n");
        return AVERROR(ENOMEM);
    }

    // 复制参数到解码器上下文
    int ret = avcodec_parameters_to_context(ctx->codecCtx, ctx->codecpar);
    if (ret < 0) {
        printf("avcodec_parameters_to_context failed: %d\n", ret);
        return ret;
    }
    // 打开解码器
    ret = avcodec_open2(ctx->codecCtx, ctx->codec, NULL);
    if (ret < 0) {
        printf("Could not open codec: %d\n", ret);
        avcodec_free_context(&ctx->codecCtx);
        return -1;
    }

    ctx->swrCtx = swr_alloc();

    // 初始化重采样器 
    swr_alloc_set_opts(ctx->swrCtx,
        AV_CH_LAYOUT_STEREO,
#ifdef USE_SDL_DEVICE 
        sdl_format_to_ffmpeg(AUDIO_F32LSB),
#else
        AV_SAMPLE_FMT_S16,
#endif
        SAMPLE_RATE,
        av_get_default_channel_layout(ctx->codecCtx->channels),
        ctx->codecCtx->sample_fmt,
        ctx->codecCtx->sample_rate,
        0,
        NULL);
    ret = swr_init(ctx->swrCtx);
    if (ret < 0)
    {
        printf("swr_init failed: %d\n", ret);
        avcodec_free_context(&ctx->codecCtx);
        swr_free(&ctx->swrCtx);
        return -1;
    }
    return ret;
}


enum AVSampleFormat sdl_format_to_ffmpeg(SDL_AudioFormat format) {
    switch (format) {
    case AUDIO_S16SYS: 
    case AUDIO_S16MSB:
        return AV_SAMPLE_FMT_S16;
    case AUDIO_F32LSB:
    case AUDIO_F32MSB:
        return AV_SAMPLE_FMT_FLT;
    case AUDIO_S32LSB:
    case AUDIO_S32MSB:
        return AV_SAMPLE_FMT_S32;
    default:
        return AV_SAMPLE_FMT_NONE;
    }
}
#define SDL_AUDIO_BUFFER_SIZE 4096
int start_play_with_sdl() {

    // 设置SDL音频参数
    SDL_AudioSpec desired, obtained;
    desired.freq = SAMPLE_RATE;             // 采样率
    desired.format = AUDIO_F32;// AUDIO_F32LSB;// AUDIO_S16SYS;              // 16位有符号整数(系统字节序)
    desired.channels = 2;                       // 立体声
    desired.silence = 0;                        // 静音值为0
    desired.samples = SDL_AUDIO_BUFFER_SIZE;    // 音频缓冲区大小
    desired.callback = NULL;                    // 不使用回调
    desired.userdata = NULL;


    if (SDL_Init(SDL_INIT_AUDIO) < 0) {
        fprintf(stderr, "SDL初始化失败: %s\n", SDL_GetError());
        return -1;
    }

    // 打开音频设备
    ctx.audio_device = SDL_OpenAudioDevice(NULL, 0, &desired, &obtained, SDL_AUDIO_ALLOW_FORMAT_CHANGE);
    if (ctx.audio_device == 0) {
        fprintf(stderr, "Failed to open audio device: %s\n", SDL_GetError()); 
        return -1;
    }

    // 检查实际获得的音频格式是否符合要求
    if (obtained.format != AUDIO_F32LSB) {
        fprintf(stderr, "SDL did not provide AUDIO_S16SYS format\n"); 
        return -1;
    }

    // 开始播放
    SDL_PauseAudioDevice(ctx.audio_device, 0);

}

// 探测音频格式
int probe_audio_format(AudioContext* ctx, std::string &temp_file) {
    if (ctx->fmt_ctx != NULL) {
        avformat_free_context(ctx->fmt_ctx);
    }
    ctx->fmt_ctx = avformat_alloc_context();
    if (!ctx->fmt_ctx) return AVERROR(ENOMEM); 
    // 探测前20KB数据(可调整)
    ctx->fmt_ctx->probesize = DETECT_LEN;
    ctx->fmt_ctx->max_analyze_duration = 1 * AV_TIME_BASE; // 5秒

    // 探测格式但不解析完整文件
    int ret = avformat_open_input(&ctx->fmt_ctx, temp_file.c_str(), NULL, NULL);
    if (ret < 0) {
        printf("open failed");
        return ret;
    }

    // 查找流信息(仅头部)
    ret = avformat_find_stream_info(ctx->fmt_ctx, NULL);
    if (ret < 0) {
        printf("avformat_find_stream_info failed");
        return ret;
    }

    // 查找音频流
    for (unsigned i = 0; i < ctx->fmt_ctx->nb_streams; i++) {
        AVStream* stream = ctx->fmt_ctx->streams[i];
        if (stream->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) {
            ctx->audio_stream_idx = i;
            ctx->codecpar = stream->codecpar;
            break;
        }
    }

    if (ctx->audio_stream_idx == -1) {
        return AVERROR_STREAM_NOT_FOUND;
    }
    if (0 != configure_decoder(ctx)) {
        return 0;
    }

    {
        int64_t total_duration = ctx->fmt_ctx->duration + ((ctx->fmt_ctx->duration <= (INT64_MAX - 5000)) ? 5000 : 0);
        ctx->duration = (total_duration / AV_TIME_BASE);
        int us = (total_duration % AV_TIME_BASE);
        printf("total_duration:%02d:%02d\r\n", ctx->duration, us);
    }

    struct timeval tv;
    gettimeofday(&tv, NULL);
    unsigned long start_time = tv.tv_sec * 1000 + tv.tv_usec / 1000;//ms   

    ctx->recv_buffer_size = 0;
    ctx->dectected = 1;
    ctx->play_start_time = start_time;// GetTickCount();// time(nullptr); 

    register_pjsip_thread("NetworkThread");
    //start_play(ctx->codecCtx->sample_rate, ctx->codecCtx->channels, NULL, RemotePlayCallbackFunc);
    
#ifndef USE_SDL_DEVICE
    start_play(SAMPLE_RATE, OUT_PLAY_CHANNEL, NULL, RemotePlayCallbackFunc);
#else
    start_play_with_sdl();
#endif
    return 0;
} 

bool appendBinaryFile(const std::string& filePath, const uint8_t* binaryData, size_t dataSize) {
    // 创建并打开文件流,以二进制追加模式
    std::ofstream outFile(filePath, std::ios::out | std::ios::binary | std::ios::app);

    // 检查文件是否成功打开
    if (!outFile) {
        std::cerr << "无法打开文件: " << filePath << std::endl;
        return false;
    }

    // 将二进制数据追加写入文件
    outFile.write(reinterpret_cast<const char*>(binaryData), dataSize);
     

    // 关闭文件流
    outFile.close();

    return true;
}

bool initDecoder(AudioContext* ctx) {
    // FFmpeg初始化
    avcodec_register_all();

    pthread_mutex_init(&ctx->bufferMutex, NULL);

    ctx->fmt_ctx = NULL;// avformat_alloc_context(); 
     
    for (int i = 0; i < AUDIO_BUFFER_COUNT; i++) {
        ctx->audioBuffers[i].data = (uint8_t*)av_malloc(MAX_FRAME_SIZE);
        if (!ctx->audioBuffers[i].data) return false;
        ctx->audioBuffers[i].size = 0;
        ctx->audioBuffers[i].inUse = false;
    }
    ctx->currentBuffer = 0;

    
    ctx->cache_capacity = 400 * 1024;
    ctx->cache_pcm_buffer = (uint8_t*)malloc(ctx->cache_capacity);
    ctx->cache_buffer_size = 0;

    ctx->capacity = 400 * 1024;
    ctx->recv_buffer = (uint8_t*)malloc(ctx->capacity);
    ctx->recv_buffer_size = 0;
    ctx->dectected = 0;
#if 1
    ctx->codecCtx = NULL;
    ctx->codec = NULL; 
    ctx->pkt = av_packet_alloc();
    av_init_packet(ctx->pkt);

    ctx->frame = av_frame_alloc();

    ctx->outData[0] = (uint8_t*)av_malloc(OUT_DATA_SIZE);
    ctx->outData[1] = (uint8_t*)av_malloc(OUT_DATA_SIZE);
    memset(ctx->outData[0], 0x00, OUT_DATA_SIZE);
    memset(ctx->outData[1], 0x00, OUT_DATA_SIZE);

    ctx->pframePCM = av_frame_alloc();

    if (ctx->frame == NULL || ctx->pframePCM == NULL) {
        return false;
    }
    AVFrame* pframePCM = ctx->pframePCM;
    
#ifdef USE_SDL_DEVICE
    
        pframePCM->format = sdl_format_to_ffmpeg(AUDIO_F32LSB);
#else
    pframePCM->format = AV_SAMPLE_FMT_S16;
#endif
    pframePCM->channel_layout = AV_CH_LAYOUT_STEREO;
    pframePCM->sample_rate = SAMPLE_RATE;
    pframePCM->nb_samples = SAMPLE_RATE / 100;// 2 * SAMPLE_RATE / 100;//(rate*channels*AV_CH_LAYOUT_STEREO*20)/8000;//
    pframePCM->channels = OUT_PLAY_CHANNEL;
    av_frame_get_buffer(pframePCM, 0);

    ctx->read_buffer = pframePCM->nb_samples * OUT_PLAY_CHANNEL;

    ringmalloc(ctx->read_buffer * 2);
    return true;
}

// 网络接收线程
int NetworkThread(void * param) {
    AudioContext* ctx = (AudioContext*)param; 
     
    SOCKET sock = ctx->isTcpMode ?
        socket(AF_INET, SOCK_STREAM, IPPROTO_TCP) :
        socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
     
    sockaddr_in addr = {};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(DATA_PORT);
    addr.sin_addr.s_addr = INADDR_ANY;
    bind(sock, (sockaddr*)&addr, sizeof(addr));
     
    register_pjsip_thread("NetworkThread");

    std::string curr_path = "";
    GetExePath(curr_path); 
    std::string save_temp_path = curr_path + "\\temp.data";// 
    wchar_t* file_temp = str_to_wstr(save_temp_path.c_str());

    // 接收数据循环
    int start_recv_len = 100 * 1024;
#define BUFFER_SIZE 21480
    uint8_t *buffer = (uint8_t *)malloc(BUFFER_SIZE);
    int recvLen = 0;
    // TCP模式监听连接
    if (ctx->isTcpMode) {
        do {
            listen(sock, 1);
            SOCKET client = accept(sock, NULL, NULL); 
            SOCKET new_sock = client;
            
            ctx->dectected = 0;
            ctx->recv_buffer_size = 0;
            ctx->play_pause = false;

            int first_packet = 1;
            int server_play_end = 0;
            int server_play_stop = 0;
            while (1) { 
                uint32_t netSize = 0;
                int ret = recv(new_sock, (char*)&netSize, sizeof(netSize), MSG_WAITALL);
                if (ret < 0 || recvLen < 0) {
                    printf("1 failed: client closed.\r\n"); 
                    break; 
                }
                if (ret != 4) {
                    //error 
                    printf("1 failed: client ret failed.\r\n");
                    break;
                }

                recvLen = ntohl(netSize);
                if (first_packet == 1) {
                    first_packet = 0;
                    //第一个包是文件大小
                    //sprintf(send_buffer, "start#name:%s#file_size:%d", name.c_str(), file_size);
                    ctx->file_size = recvLen;
                    printf("2 : server start,file_size:%d.\r\n", recvLen);
                    continue; 
                }

                ret = recv(new_sock, (char*)(buffer), recvLen, 0);
                if (ret <= 0 || recvLen <= 0) {
                    printf("2 failed: client closed.\r\n"); 
                    break;
                }

                if (recvLen == 5) {
                    buffer[recvLen] = '\0';
                    if (strstr((char *)buffer, "stop") != NULL) {
                        printf("2 failed: server closed.\r\n");
                        server_play_stop = 1;
                        Sleep(1000); 
                        break;
                    }
                    else if (strstr((char*)buffer, "end!") != NULL) {
                        printf("2 failed: server play end.\r\n");
                        server_play_end = 1;
                        break;
                    }
                }

                uint8_t* payload = buffer + HEADER_SIZE;
                int payload_len = recvLen - HEADER_SIZE;
                if (payload_len < 0) {
                    continue;
                }
                
                appendBinaryFile(save_temp_path.c_str(), payload, payload_len); 
                ctx->recv_buffer_size += payload_len; 
                
                if (ctx->recv_buffer_size >= DETECT_LEN) {
                    if (ctx->dectected == 0) {
                        probe_audio_format(ctx, save_temp_path); 
                    } 
                } 
                if (ctx->dectected) {
                    ret = decodeFrame(ctx, buffer, recvLen);
                    if (ret == AVERROR_EOF) {
                        printf("2 failed: server closed.\r\n"); 
                        break;
                    }
                }
                Sleep(5);
            }

            if (ctx->dectected && server_play_end) {
                do {
                    int ret = decodeFrame(ctx, buffer, recvLen);
                    if (ret == AVERROR_EOF) {
                        printf("2 failed: file end.\r\n");
                        break;
                    }

                    Sleep(10);
                } while (!ctx->play_pause);

                do {
                    int ret = do_get_frame(ctx);
                    if (ret == -1) {
                        break;
                    }
                    Sleep(10);
                } while (!ctx->play_pause);
            }


#ifdef USE_SDL_DEVICE

            while (SDL_GetQueuedAudioSize(ctx->audio_device) > 0) {
                Uint32 queue_size = SDL_GetQueuedAudioSize(ctx->audio_device);
                double remaining_sec = (double)queue_size / (SAMPLE_RATE * OUT_PLAY_CHANNEL * 2); // 2 bytes per sample
                printf("剩余: %.2f KB (约 %.1f 秒)\r", queue_size / 1024.0, remaining_sec);
                SDL_Delay(100);
            }
            SDL_Delay(100);
            if (ctx->audio_device != 0) {
                SDL_ClearQueuedAudio(ctx->audio_device); // 清空音频队列
                SDL_CloseAudioDevice(ctx->audio_device);
                printf("SDL音频设备已关闭\n");
            }
            SDL_Quit();
#endif
            ctx->dectected = 0;
            ctx->recv_buffer_size = 0;

            closesocket(new_sock);
            Sleep(100);
            stop_play();
            avformat_close_input(&ctx->fmt_ctx);
            avformat_free_context(ctx->fmt_ctx);
            avcodec_free_context(&ctx->codecCtx);
            ctx->codecCtx = NULL;
            ctx->fmt_ctx = NULL;
            Sleep(10);
            DeleteFile(file_temp);
        } while (ctx->running);
    }
    else {
        while (ctx->running) { 
             recvLen =
                    recvfrom(sock, (char*)buffer, BUFFER_SIZE, 0, NULL, NULL);
             if (recvLen == 4) {
                 buffer[recvLen] = '\0';
                 if (strstr((char*)buffer, "stop") != NULL) {
                     break;
                 }
             }else  if (recvLen > 0) {
                decodeFrame(ctx, buffer, recvLen);
            }
        } 
    }

    delete[]file_temp;
    free(buffer);
    closesocket(sock); 
    return 0;
}


遗留问题,保存到音频流数据,能探测到音频格式之后,播放总是播一半就结束了。



-------------------广告线---------------
项目、合作,欢迎勾搭,邮箱:promall@qq.com


本文为呱牛笔记原创文章,转载无需和我联系,但请注明来自呱牛笔记 ,it3q.com

请先登录后发表评论
  • 最新评论
  • 总共0条评论