/*
 * OpenMRCP - Open Source Media Resource Control Protocol Stack
 * Copyright (C) 2007, Cepstral LLC
 *
 * Version: MPL 1.1
 *
 * The contents of this file are subject to the Mozilla Public License Version
 * 1.1 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * http://www.mozilla.org/MPL/
 *
 * Software distributed under the License is distributed on an "AS IS" basis,
 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
 * for the specific language governing rights and limitations under the
 * License.
 *
 * Author(s):
 * Arsen Chaloyan
 *
 * Contributor(s):
 *
 */

#include "rtp_session.h"
#include "rtp.h"
#include "jitter_buffer.h"
#include "audio_source.h"
#include "audio_sink.h"

/** Modulus of the 16-bit RTP sequence number; added to seq_cycles on every wrap */
#define RTP_SEQ_MOD (1 << 16)
/** Forward sequence jumps smaller than this are accepted as the new maximum */
#define MAX_DROPOUT 3000
/** Backward distance within which a packet is classified as duplicate/misordered */
#define MAX_MISORDER 100
/** Share of discarded packets (per period) above which the receiver restarts */
#define DISCARDED_TO_RECEIVED_RATIO_THRESHOLD 30 /* 30% */
/** Largest accepted gap (in samples) between arrival-time and timestamp deltas */
#define DEVIATION_THRESHOLD 4000

typedef struct rtp_tx_stream_t rtp_tx_stream_t;

/** State of the transmitting half of an RTP session */
struct rtp_tx_stream_t {
	/** audio sink interface; must stay the first member because
	    rtp_tx_write_frame() casts the audio_sink_t pointer back to the stream */
	audio_sink_t      sink;

	/** remote address the datagrams are sent to */
	apr_sockaddr_t   *r_sockaddr;
	/** socket taken from the owning session */
	apr_socket_t     *socket;

	/** source id written into every outgoing header (seeded from apr_time_now()) */
	apr_uint32_t      ssrc;
	/** codec descriptor/manipulator pair used to encode outgoing frames */
	codec_handle_t    codec;
	/** payload type for named events; set to 101 on creation */
	apr_byte_t        event_pt;
	/** packetization time requested by the caller; 20 is used when 0 is passed
	    (presumably milliseconds - TODO confirm against CODEC_FRAME_TIME_BASE) */
	apr_uint16_t      ptime;

	/** number of codec frames carried by one packet (ptime / CODEC_FRAME_TIME_BASE) */
	apr_uint16_t      packet_frames;
	/** number of frames already encoded into the packet being assembled */
	apr_uint16_t      current_frames;
	/** timestamp increment applied for every written frame */
	apr_uint32_t      samples_per_frame;

	/** marker bit for the next header; 1 on creation, cleared after the first header */
	apr_byte_t        marker;
	/** sequence number of the last prepared header (pre-incremented per packet) */
	apr_uint16_t      last_seq_num;
	/** running RTP timestamp */
	apr_uint32_t      timestamp;

	/** packet under construction: RTP header followed by encoded frames */
	char              packet[1500];
	/** number of bytes currently used in packet */
	apr_size_t        packet_size;

	/** counter incremented each time a packet is handed to the socket */
	apr_uint32_t      packets_sent;
};

typedef struct rtp_rx_stat_t rtp_rx_stat_t;

/** Receiver statistics; cleared by rtp_rx_stat_reset() */
struct rtp_rx_stat_t {
	/** number of valid RTP packets received */
	apr_uint32_t received_packets;
	/** number of invalid RTP packets received */
	apr_uint32_t invalid_packets;

	/** number of discarded in jitter buffer packets */
	apr_uint32_t discarded_packets;
	/** number of ignored packets */
	apr_uint32_t ignored_packets;

	/** number of lost in network packets */
	apr_uint32_t lost_packets;

	/** number of restarts */
	apr_byte_t   restarts;

	/** network jitter (rfc3550) */
	apr_uint32_t jitter;

	/** source id of received RTP stream */
	apr_uint32_t ssrc;
};
typedef struct rtp_rx_history_t rtp_rx_history_t; struct rtp_rx_history_t { apr_uint32_t seq_cycles; apr_uint16_t seq_num_base; apr_uint16_t seq_num_max; apr_uint32_t ts_last; apr_time_t time_last; apr_uint32_t jitter_min; apr_uint32_t jitter_max; apr_uint32_t ssrc_new; apr_byte_t ssrc_probation; }; typedef struct rtp_rx_periodic_history_t rtp_rx_periodic_history_t; struct rtp_rx_periodic_history_t { apr_uint32_t received_prior; apr_uint32_t discarded_prior; apr_uint32_t jitter_min; apr_uint32_t jitter_max; }; typedef struct rtp_rx_stream_t rtp_rx_stream_t; struct rtp_rx_stream_t { audio_source_t source; apr_sockaddr_t *r_sockaddr; apr_socket_t *socket; codec_handle_t codec; apr_byte_t event_pt; jitter_buffer_t *jb; jitter_buffer_config_t jb_config; rtp_rx_stat_t stat; rtp_rx_history_t history; rtp_rx_periodic_history_t periodic_history; }; struct rtp_session_t { rtp_tx_stream_t *tx_stream; rtp_rx_stream_t *rx_stream; apr_sockaddr_t *l_sockaddr; apr_socket_t *socket; apr_pool_t *pool; }; static apr_status_t rtp_tx_write_frame(audio_sink_t *sink, media_frame_t *frame); static const audio_sink_method_set_t audio_sink_method_set = { NULL, NULL, NULL, rtp_tx_write_frame }; static apr_status_t rtp_rx_open(audio_source_t *source); static apr_status_t rtp_rx_read_frame(audio_source_t *source, media_frame_t *frame); static const audio_source_method_set_t audio_source_method_set = { NULL, rtp_rx_open, NULL, rtp_rx_read_frame }; rtp_session_t* rtp_session_create(const char *local_ip, apr_port_t local_port, apr_pool_t *pool) { rtp_session_t *rtp_session = apr_palloc(pool,sizeof(rtp_session_t)); rtp_session->pool = pool; rtp_session->tx_stream = NULL; rtp_session->rx_stream = NULL; apr_sockaddr_info_get(&rtp_session->l_sockaddr,local_ip,APR_INET,local_port,0,pool); if(!rtp_session->l_sockaddr) { return NULL; } if(apr_socket_create(&rtp_session->socket,rtp_session->l_sockaddr->family,SOCK_DGRAM,0,pool) != APR_SUCCESS) { return NULL; } 
apr_socket_opt_set(rtp_session->socket,APR_SO_NONBLOCK,1); apr_socket_timeout_set(rtp_session->socket,0); apr_socket_opt_set(rtp_session->socket,APR_SO_REUSEADDR,1); if(apr_socket_bind(rtp_session->socket,rtp_session->l_sockaddr) != APR_SUCCESS) { return NULL; } return rtp_session; } apr_status_t rtp_session_destroy(rtp_session_t *rtp_session) { return APR_SUCCESS; } audio_sink_t* rtp_session_tx_stream_create(rtp_session_t *rtp_session, const char *remote_ip, apr_port_t remote_port, codec_descriptor_t* codec, const codec_manipulator_t* codec_manipulator, apr_uint16_t ptime) { rtp_tx_stream_t *tx_stream; if(!codec || !codec_manipulator) { return NULL; } tx_stream = apr_palloc(rtp_session->pool,sizeof(rtp_tx_stream_t)); tx_stream->codec.descriptor = codec; tx_stream->codec.manipulator = codec_manipulator; tx_stream->event_pt = 101; tx_stream->ptime = ptime; if(!tx_stream->ptime) { tx_stream->ptime = 20; } tx_stream->packet_frames = tx_stream->ptime / CODEC_FRAME_TIME_BASE; tx_stream->current_frames = 0; tx_stream->samples_per_frame = CODEC_FRAME_TIME_BASE * tx_stream->codec.descriptor->sampling_rate / 1000; tx_stream->ssrc = (apr_uint32_t)apr_time_now(); tx_stream->marker = 1; tx_stream->timestamp = 0; tx_stream->last_seq_num = 0; tx_stream->packets_sent = 0; tx_stream->socket = rtp_session->socket; apr_sockaddr_info_get(&tx_stream->r_sockaddr,remote_ip,APR_INET,remote_port,0,rtp_session->pool); if(!tx_stream->r_sockaddr) { return NULL; } if(tx_stream->codec.manipulator->open(&tx_stream->codec) != APR_SUCCESS) { return NULL; } tx_stream->sink.method_set = &audio_sink_method_set; rtp_session->tx_stream = tx_stream; return &tx_stream->sink; } apr_status_t rtp_session_tx_stream_destroy(rtp_session_t *rtp_session) { rtp_tx_stream_t *tx_stream = rtp_session->tx_stream; if(tx_stream) { tx_stream->codec.manipulator->close(&tx_stream->codec); rtp_session->tx_stream = NULL; } return APR_SUCCESS; } static APR_INLINE void rtp_rx_stat_reset(rtp_rx_stream_t *rx_stream); 
audio_source_t* rtp_session_rx_stream_create(rtp_session_t *rtp_session, const char *remote_ip, apr_port_t remote_port, codec_descriptor_t* codec, const codec_manipulator_t *codec_manipulator) { rtp_rx_stream_t *rx_stream; if(!codec || !codec_manipulator) { return NULL; } rx_stream = apr_palloc(rtp_session->pool,sizeof(rtp_rx_stream_t)); rx_stream->codec.descriptor = codec; rx_stream->codec.manipulator = codec_manipulator; rx_stream->event_pt = 101; rx_stream->socket = rtp_session->socket; apr_sockaddr_info_get(&rx_stream->r_sockaddr,remote_ip,APR_INET,remote_port,0,rtp_session->pool); if(!rx_stream->r_sockaddr) { return NULL; } if(rx_stream->codec.manipulator->open(&rx_stream->codec) != APR_SUCCESS) { return NULL; } rx_stream->jb_config.min_playout_delay = 10; rx_stream->jb_config.initial_playout_delay = 50; rx_stream->jb_config.max_playout_delay = 200; rx_stream->jb_config.adaptive = 0; rx_stream->jb = jitter_buffer_create(&rx_stream->jb_config,rx_stream->codec.descriptor->sampling_rate,rtp_session->pool); rtp_rx_stat_reset(rx_stream); rx_stream->source.method_set = &audio_source_method_set; rtp_session->rx_stream = rx_stream; return &rx_stream->source; } apr_status_t rtp_session_rx_stream_destroy(rtp_session_t *rtp_session) { rtp_rx_stream_t *rx_stream = rtp_session->rx_stream; if(rx_stream) { rx_stream->stat.lost_packets = 0; if(rx_stream->stat.received_packets) { apr_uint32_t expected_packets = rx_stream->history.seq_cycles + rx_stream->history.seq_num_max - rx_stream->history.seq_num_base + 1; if(expected_packets > rx_stream->stat.received_packets) { rx_stream->stat.lost_packets = expected_packets - rx_stream->stat.received_packets; } } #if PRINT_RTP_SESSION_STAT printf("RTP RxStat \n-received=%lu lost=%lu jitter=%lu\n", rx_stream->stat.received_packets, rx_stream->stat.lost_packets, rx_stream->stat.jitter); #endif if(rx_stream->stat.invalid_packets || rx_stream->stat.discarded_packets || rx_stream->stat.ignored_packets || rx_stream->stat.restarts) { #if 
PRINT_RTP_SESSION_STAT printf("-invalid=%lu discarded=%lu ignored=%lu restarts=%d\n", rx_stream->stat.invalid_packets, rx_stream->stat.discarded_packets, rx_stream->stat.ignored_packets, rx_stream->stat.restarts); #endif } rx_stream->codec.manipulator->close(&rx_stream->codec); jitter_buffer_destroy(rx_stream->jb); rtp_session->rx_stream = NULL; } return APR_SUCCESS; } audio_sink_t* rtp_session_audio_sink_get(rtp_session_t *rtp_session) { if(rtp_session && rtp_session->tx_stream) { return &rtp_session->tx_stream->sink; } return NULL; } audio_source_t* rtp_session_audio_source_get(rtp_session_t *rtp_session) { if(rtp_session && rtp_session->rx_stream) { return &rtp_session->rx_stream->source; } return NULL; } static APR_INLINE void rtp_header_prepare(rtp_tx_stream_t *tx_stream) { rtp_header_t *header = (rtp_header_t*)tx_stream->packet; header->version = RTP_VERSION; header->padding = 0; header->extension = 0; header->count = 0; header->marker = tx_stream->marker; header->type = tx_stream->codec.descriptor->payload_type; header->sequence = htons(++tx_stream->last_seq_num); header->timestamp = htonl(tx_stream->timestamp); header->ssrc = htonl(tx_stream->ssrc); if(tx_stream->marker) { tx_stream->marker = 0; } tx_stream->packet_size = sizeof(rtp_header_t); } static apr_status_t rtp_tx_write_frame(audio_sink_t *sink, media_frame_t *frame) { codec_frame_t encoded_frame; rtp_tx_stream_t *tx_stream = (rtp_tx_stream_t*)sink; tx_stream->timestamp += tx_stream->samples_per_frame; if(tx_stream->current_frames == 0) { rtp_header_prepare(tx_stream); } encoded_frame.buffer = tx_stream->packet + tx_stream->packet_size; tx_stream->codec.manipulator->encode(&tx_stream->codec,&frame->codec_frame,&encoded_frame); tx_stream->packet_size += encoded_frame.size; if(++tx_stream->current_frames == tx_stream->packet_frames) { apr_socket_sendto(tx_stream->socket,tx_stream->r_sockaddr,0,tx_stream->packet,&tx_stream->packet_size); tx_stream->packets_sent++; tx_stream->current_frames = 0; } 
return APR_SUCCESS; } static APR_INLINE void rtp_rx_stat_reset(rtp_rx_stream_t *rx_stream) { memset(&rx_stream->stat,0,sizeof(rx_stream->stat)); memset(&rx_stream->history,0,sizeof(rx_stream->history)); memset(&rx_stream->periodic_history,0,sizeof(rx_stream->periodic_history)); } static APR_INLINE void rtp_rx_stat_init(rtp_rx_stream_t *rx_stream, rtp_header_t *header, apr_time_t *time) { rx_stream->stat.ssrc = header->ssrc; rx_stream->history.seq_num_base = rx_stream->history.seq_num_max = (apr_uint16_t)header->sequence; rx_stream->history.ts_last = header->timestamp; rx_stream->history.time_last = *time; } static APR_INLINE void rtp_rx_restart(rtp_rx_stream_t *rx_stream) { apr_byte_t restarts = ++rx_stream->stat.restarts; rtp_rx_stat_reset(rx_stream); jitter_buffer_restart(rx_stream->jb); rx_stream->stat.restarts = restarts; } static rtp_header_t* rtp_rx_header_get(rtp_rx_stream_t *rx_stream, void **buffer, apr_size_t *size) { apr_size_t offset = 0; rtp_header_t *header = (rtp_header_t*)*buffer; /* RTP header validity check */ if(header->version != RTP_VERSION) { return NULL; } /* calculate payload offset */ offset = sizeof(rtp_header_t) + (header->count * sizeof(apr_uint32_t)); /* additional offset in case of RTP extension */ if(header->extension) { rtp_extension_header_t *ext_header = (rtp_extension_header_t*)(((apr_byte_t*)*buffer)+offset); offset += (ntohs(ext_header->length) * sizeof(apr_uint32_t)); } if (offset >= *size) { return NULL; } /* skip to payload */ *buffer = (apr_byte_t*)*buffer + offset; *size = *size - offset; return header; } typedef enum { RTP_SSRC_UPDATE, RTP_SSRC_PROBATION, RTP_SSRC_RESTART } rtp_ssrc_result_t; static APR_INLINE rtp_ssrc_result_t rtp_rx_update_ssrc(rtp_rx_stream_t *rx_stream, apr_uint32_t ssrc) { if(rx_stream->stat.ssrc == ssrc) { /* known ssrc */ if(rx_stream->history.ssrc_probation) { /* reset the probation for new ssrc */ rx_stream->history.ssrc_probation = 0; rx_stream->history.ssrc_new = 0; } } else { 
if(rx_stream->history.ssrc_new == ssrc) { if(--rx_stream->history.ssrc_probation == 0) { /* restart with new ssrc */ rx_stream->stat.ssrc = ssrc; return RTP_SSRC_RESTART; } else { return RTP_SSRC_PROBATION; } } else { /* start probation for new ssrc */ rx_stream->history.ssrc_new = ssrc; rx_stream->history.ssrc_probation = 5; return RTP_SSRC_PROBATION; } } return RTP_SSRC_UPDATE; } typedef enum { RTP_SEQ_UPDATE, RTP_SEQ_MISORDER, RTP_SEQ_DRIFT } rtp_seq_result_t; static APR_INLINE rtp_seq_result_t rtp_rx_update_seq(rtp_rx_stream_t *rx_stream, apr_uint16_t seq_num) { rtp_seq_result_t result = RTP_SEQ_UPDATE; apr_uint16_t seq_delta = seq_num - rx_stream->history.seq_num_max; if(seq_delta < MAX_DROPOUT) { if(seq_num < rx_stream->history.seq_num_max) { /* sequence number wrapped */ rx_stream->history.seq_cycles += RTP_SEQ_MOD; } rx_stream->history.seq_num_max = seq_num; } else if(seq_delta <= RTP_SEQ_MOD - MAX_MISORDER) { /* sequence number made a very large jump */ result = RTP_SEQ_DRIFT; } else { /* duplicate or misordered packet */ result = RTP_SEQ_MISORDER; } rx_stream->stat.received_packets++; if(rx_stream->stat.received_packets - rx_stream->periodic_history.received_prior >= 50) { rx_stream->periodic_history.received_prior = rx_stream->stat.received_packets; rx_stream->periodic_history.discarded_prior = rx_stream->stat.discarded_packets; rx_stream->periodic_history.jitter_min = rx_stream->stat.jitter; rx_stream->periodic_history.jitter_max = rx_stream->stat.jitter; } return result; } typedef enum { RTP_TS_UPDATE, RTP_TS_DRIFT } rtp_ts_result_t; static APR_INLINE rtp_ts_result_t rtp_rx_update_ts(rtp_rx_stream_t *rx_stream, apr_time_t *time, apr_uint32_t ts) { apr_int32_t deviation; /* arrival time diff in samples */ deviation = (apr_int32_t)apr_time_as_msec(*time - rx_stream->history.time_last) * rx_stream->codec.descriptor->sampling_rate / 1000; /* arrival timestamp diff */ deviation -= ts - rx_stream->history.ts_last; if(deviation < 0) { deviation = -deviation; 
} if(deviation > DEVIATION_THRESHOLD) { return RTP_TS_DRIFT; } rx_stream->stat.jitter += deviation - ((rx_stream->stat.jitter + 8) >> 4); #if PRINT_RTP_PACKET_STAT printf("jitter=%d deviation=%d\n",rx_stream->stat.jitter,deviation); #endif rx_stream->history.time_last = *time; rx_stream->history.ts_last = ts; if(rx_stream->stat.jitter < rx_stream->periodic_history.jitter_min) { rx_stream->periodic_history.jitter_min = rx_stream->stat.jitter; } if(rx_stream->stat.jitter > rx_stream->periodic_history.jitter_max) { rx_stream->periodic_history.jitter_max = rx_stream->stat.jitter; } return RTP_TS_UPDATE; } static APR_INLINE void rtp_rx_check_failure_threshold(rtp_rx_stream_t *rx_stream) { apr_uint32_t received; apr_uint32_t discarded; received = rx_stream->stat.received_packets - rx_stream->periodic_history.received_prior; discarded = rx_stream->stat.discarded_packets - rx_stream->periodic_history.discarded_prior; if(discarded * 100 > received * DISCARDED_TO_RECEIVED_RATIO_THRESHOLD) { /* failure threshold hired, restart */ rtp_rx_restart(rx_stream); } } static apr_status_t rtp_rx_packet_receive(rtp_rx_stream_t *rx_stream, void *buffer, apr_size_t size) { apr_time_t time; rtp_ssrc_result_t ssrc_result; rtp_header_t *header = rtp_rx_header_get(rx_stream,&buffer,&size); if(!header) { /* invalid RTP packet */ rx_stream->stat.invalid_packets++; return APR_EGENERAL; } header->sequence = ntohl(header->sequence); header->timestamp = ntohl(header->timestamp); header->ssrc = ntohl(header->ssrc); time = apr_time_now(); #if PRINT_RTP_PACKET_STAT printf("RTP time=%6lu ssrc=%8lx pt=%3u %cts=%9lu seq=%5u size=%lu\n", (apr_uint32_t)apr_time_usec(time), header->ssrc, header->type, header->marker ? 
'*' : ' ', header->timestamp, header->sequence, size); #endif if(!rx_stream->stat.received_packets) { /* initialization */ rtp_rx_stat_init(rx_stream,header,&time); } ssrc_result = rtp_rx_update_ssrc(rx_stream,header->ssrc); if(ssrc_result == RTP_SSRC_PROBATION) { rx_stream->stat.invalid_packets++; return APR_EGENERAL; } else if(ssrc_result == RTP_SSRC_RESTART) { rtp_rx_restart(rx_stream); rtp_rx_stat_init(rx_stream,header,&time); } rtp_rx_update_seq(rx_stream,(apr_uint16_t)header->sequence); if(rtp_rx_update_ts(rx_stream,&time,header->timestamp) == RTP_TS_DRIFT) { rtp_rx_restart(rx_stream); return APR_EGENERAL; } if(header->type == rx_stream->codec.descriptor->payload_type) { /* codec */ if(jitter_buffer_write(rx_stream->jb,&rx_stream->codec,buffer,size,header->timestamp) != JB_OK) { rx_stream->stat.discarded_packets++; rtp_rx_check_failure_threshold(rx_stream); } } else if(header->type == rx_stream->event_pt) { /* named event */ named_event_frame_t *named_event = (named_event_frame_t *)buffer; if(jitter_buffer_write_named_event(rx_stream->jb,named_event,header->timestamp) != JB_OK) { rx_stream->stat.discarded_packets++; rtp_rx_check_failure_threshold(rx_stream); } } else if(header->type == 13 || header->type == 19) { /* CN packet*/ rx_stream->stat.ignored_packets++; } else { /* invalid payload type */ rx_stream->stat.ignored_packets++; } return APR_SUCCESS; } static apr_status_t rtp_rx_process(rtp_rx_stream_t *rx_stream) { char buffer[1500]; apr_size_t size = sizeof(buffer); apr_size_t max_count = 5; while(max_count && apr_socket_recvfrom(rx_stream->r_sockaddr,rx_stream->socket,0,buffer,&size) == APR_SUCCESS) { rtp_rx_packet_receive(rx_stream,buffer,size); size = sizeof(buffer); max_count--; } return APR_SUCCESS; } static apr_status_t rtp_rx_open(audio_source_t *source) { rtp_rx_stream_t *rx_stream = (rtp_rx_stream_t*)source; char buffer[1500]; apr_size_t size = sizeof(buffer); apr_size_t drop_out_count = 0; 
while(apr_socket_recvfrom(rx_stream->r_sockaddr,rx_stream->socket,0,buffer,&size) == APR_SUCCESS) { drop_out_count++; } #if PRINT_RTP_SESSION_STAT printf("RTP Rx Open drop_out=%lu\n",drop_out_count); #endif rtp_rx_stat_reset(rx_stream); jitter_buffer_restart(rx_stream->jb); return APR_SUCCESS; } static apr_status_t rtp_rx_read_frame(audio_source_t *source, media_frame_t *frame) { rtp_rx_stream_t *rx_stream = (rtp_rx_stream_t*)source; rtp_rx_process(rx_stream); jitter_buffer_read(rx_stream->jb,frame); return APR_SUCCESS; }