From 8c8d425257f021eaa4ad66ce4aeff3353d4ca196 Mon Sep 17 00:00:00 2001 From: John Cox Date: Tue, 27 Aug 2013 14:48:48 +0100 Subject: [PATCH] Add a mode that adds rtp headers --- tswrite.c | 129 +++++++++++++++++++++++++++++++++++++----------- tswrite_defns.h | 8 +++ tswrite_fns.h | 4 +- 3 files changed, 111 insertions(+), 30 deletions(-) diff --git a/tswrite.c b/tswrite.c index a125756..8c23def 100644 --- a/tswrite.c +++ b/tswrite.c @@ -178,6 +178,15 @@ typedef struct circular_buffer_item *circular_buffer_item_p; #define SIZEOF_CIRCULAR_BUFFER_ITEM sizeof(struct circular_buffer_item) + + +typedef struct rtp_hdr_info_s +{ + uint16_t seq; + uint32_t ssrc; +} rtp_hdr_info_t; + + // ------------------------------------------------------------ // The header for the circular buffer // @@ -198,6 +207,11 @@ struct circular_buffer int TS_in_item; // max number of TS packets in a circular buffer item int item_size; // and thus the size of said item's data array + int hdr_size; + tswrite_pkt_hdr_type_t hdr_type; + union { + rtp_hdr_info_t rtp; + } hdr; int maxnowait; // max number consecutive packets to send with no wait int waitfor; // the number of microseconds to wait thereafter @@ -337,7 +351,8 @@ static int map_circular_buffer(circular_buffer_p *circular, int circ_buf_size, int TS_in_packet, int maxnowait, - int waitfor) + int waitfor, + const tswrite_pkt_hdr_type_t hdr_type) { // Rather than map a file, we'll map anonymous memory // BSD supports the MAP_ANON flag as is, @@ -353,27 +368,34 @@ static int map_circular_buffer(circular_buffer_p *circular, // is not fixed, we can't just allocate it "inside" the buffer items (it // wouldn't be nice to allocate the *maximum* possible space we might want!). // Instead, we'll put it as a byte array after the rest of our data. + // + // Space may be left to add an RTP header before each items data + // // So: + const int hdr_size = (hdr_type == PKT_HDR_TYPE_RTP) ? 12 : 0; int base_size = SIZEOF_CIRCULAR_BUFFER + (circ_buf_size * SIZEOF_CIRCULAR_BUFFER_ITEM); - int data_size = circ_buf_size * TS_in_packet * TS_PACKET_SIZE; + int data_size = circ_buf_size * (TS_in_packet * TS_PACKET_SIZE + hdr_size); int total_size = base_size + data_size; + circular_buffer_p cb; + + *circular = NULL; #ifdef _WIN32 // Under Windows, we're using threading to manage our parent/child // processes, so we can just use malloc here - *circular = malloc(total_size); - if (*circular == NULL) + cb = malloc(total_size); + if (cb == NULL) { fprint_err("### Error mapping circular buffer as shared memory: %s\n", strerror(errno)); return 1; } #else // _WIN32 - *circular = mmap(NULL,total_size, + cb = mmap(NULL,total_size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANON, -1, 0); - if (*circular == MAP_FAILED) + if (cb == MAP_FAILED) { fprint_err("### Error mapping circular buffer as shared memory: %s\n", strerror(errno)); @@ -381,16 +403,27 @@ static int map_circular_buffer(circular_buffer_p *circular, } #endif // _WIN32 - (*circular)->start = 1; - (*circular)->end = 0; - (*circular)->pending = 0; - (*circular)->eos = FALSE; - (*circular)->size = circ_buf_size; - (*circular)->TS_in_item = TS_in_packet; - (*circular)->item_size = TS_in_packet * TS_PACKET_SIZE; - (*circular)->maxnowait = maxnowait; - (*circular)->waitfor = waitfor; - (*circular)->item_data = (byte *) *circular + base_size; + cb->start = 1; + cb->end = 0; + cb->pending = 0; + cb->eos = FALSE; + cb->size = circ_buf_size; + cb->TS_in_item = TS_in_packet; + cb->item_size = TS_in_packet * TS_PACKET_SIZE + hdr_size; + cb->hdr_size = hdr_size; + cb->hdr_type = hdr_type; + if (hdr_type == PKT_HDR_TYPE_RTP) + { + struct timeval now; + gettimeofday(&now, NULL); + + cb->hdr.rtp.seq = 0; + cb->hdr.rtp.ssrc = (uint32_t)(now.tv_sec ^ now.tv_usec << 12); // A somewhat random number + } + cb->maxnowait = maxnowait; + cb->waitfor = waitfor; + cb->item_data = (byte *) cb + base_size + hdr_size; + *circular = cb; return 0; } @@ -639,7 +672,8 @@ static int build_buffered_TS_output(buffered_TS_output_p *writer, tswrite_pcr_mode pcr_mode, int prime_size, int prime_speedup, - double pcr_scale) + double pcr_scale, + const tswrite_pkt_hdr_type_t hdr_type) { int err, ii; circular_buffer_p circular; @@ -651,7 +685,7 @@ static int build_buffered_TS_output(buffered_TS_output_p *writer, } err = map_circular_buffer(&circular,circ_buf_size,TS_in_packet, - maxnowait,waitfor); + maxnowait,waitfor,hdr_type); if (err) { print_err("### Error building buffered output\n"); @@ -902,6 +936,22 @@ static int set_buffer_item_time_pcr1(buffered_TS_output_p writer) return writer->which; } +static inline void +set_32_be(uint8_t * const p, const uint32_t x) +{ + p[0] = x >> 24; + p[1] = (x >> 16) & 0xff; + p[2] = (x >> 8) & 0xff; + p[3] = x & 0xff; +} + +static inline void +set_16_be(uint8_t * const p, const unsigned int x) +{ + p[0] = (x >> 8) & 0xff; + p[1] = x & 0xff; +} + // Set times on all packets between where we were and where we are now // Sets the time on both the first & last packets @@ -923,6 +973,18 @@ set_circ_times(const circular_buffer_p circ, struct circular_buffer_item * const item = circ->item + i; int64_t pcr = (int64_t)pcr1 + (int64_t)offset * (int64_t)pcr_gap / (int64_t)gap_bytes; + if (circ->hdr_type == PKT_HDR_TYPE_RTP) + { + uint32_t timestamp = (uint32_t)(pcr / (uint64_t)300); + uint8_t * rtp_buf = circ->item_data + i * circ->item_size - 12; + + rtp_buf[0] = 0x80; + rtp_buf[1] = 33; // TS + set_16_be(rtp_buf + 2, ++circ->hdr.rtp.seq); + set_32_be(rtp_buf + 4, timestamp); + set_32_be(rtp_buf + 8, circ->hdr.rtp.ssrc); + } + item->time = (uint32_t)(pcr / 27); // "time" in us offset += item->length; idx = i; @@ -1321,7 +1383,7 @@ static int write_to_buffered_TS_output(buffered_TS_output_p writer, writer->num_packets ++; // Have we filled this entry in the circular buffer? - if ((*length) == circular->item_size) + if ((*length) >= circular->item_size - circular->hdr_size) internal_flush_buffered_TS_output(writer); return 0; } @@ -1868,12 +1930,12 @@ extern int wait_for_command(TS_writer_p tswriter) * * Returns 0 if all went well, 1 if something went wrong. */ -static int write_circular_data(SOCKET output, - circular_buffer_p circular) +static int write_circular_data(const SOCKET output, + const circular_buffer_p circular) { int err; - byte *buffer = circular->item_data + circular->start*circular->item_size; - int length = circular->item[circular->start].length; + byte *buffer = circular->item_data + circular->start*circular->item_size - circular->hdr_size; + int length = circular->item[circular->start].length + circular->hdr_size; #if DISPLAY_BUFFER int oldend = circular->pending; int oldstart = circular->start; @@ -1881,6 +1943,7 @@ static int write_circular_data(SOCKET output, #endif err = write_socket_data(output,buffer,length); + if (err) { // If we're writing out over UDP, it's possible our write fails for @@ -2202,7 +2265,7 @@ static int write_from_circular(SOCKET output, wait_microseconds(waitfor); sent_without_delay = 0; } - + // Write it... err = write_circular_data(output,circular); if (err) return 1; @@ -2635,7 +2698,8 @@ extern int tswrite_start_buffering(TS_writer_p tswriter, tswrite_pcr_mode pcr_mode, int prime_size, int prime_speedup, - double pcr_scale) + double pcr_scale, + const tswrite_pkt_hdr_type_t hdr_type) { int err; @@ -2651,7 +2715,8 @@ extern int tswrite_start_buffering(TS_writer_p tswriter, err = build_buffered_TS_output(&(tswriter->writer), circ_buf_size,TS_in_packet, maxnowait,waitfor,byterate,pcr_mode, - prime_size,prime_speedup,pcr_scale); + prime_size,prime_speedup,pcr_scale, + hdr_type); if (err) return 1; err = start_child(tswriter); @@ -2687,8 +2752,8 @@ extern int tswrite_start_buffering_from_context(TS_writer_p tswriter, context->pcr_mode, context->prime_size, context->prime_speedup, - context->pcr_scale); - + context->pcr_scale, + context->pkt_hdr_type); } /* @@ -3332,6 +3397,7 @@ extern int tswrite_process_args(char *prefix, context->prime_size = DEFAULT_PRIME_SIZE; context->prime_speedup = 100; context->pcr_scale = 1.0; + context->pkt_hdr_type = PKT_HDR_TYPE_NONE; while (ii < argc) { @@ -3456,7 +3522,12 @@ extern int tswrite_process_args(char *prefix, argv[ii] = argv[ii+1] = TSWRITE_PROCESSED; ii++; } - else if (!strcmp("-hd",argv[ii])) + else if (!strcmp("-rtp", argv[ii])) + { + context->pkt_hdr_type = PKT_HDR_TYPE_RTP; + argv[ii] = TSWRITE_PROCESSED; + } + else if (!strcmp("-hd", argv[ii])) { context->maxnowait = 40; context->bitrate = 20000000; diff --git a/tswrite_defns.h b/tswrite_defns.h index 4544a7d..0835e5c 100644 --- a/tswrite_defns.h +++ b/tswrite_defns.h @@ -178,6 +178,13 @@ typedef enum tswrite_pcr_mode_e { TSWRITE_PCR_MODE_PCR2 } tswrite_pcr_mode; +typedef enum tswrite_pkt_hdr_type_e +{ + PKT_HDR_TYPE_NONE = 0, + PKT_HDR_TYPE_RTP +} tswrite_pkt_hdr_type_t; + + // ------------------------------------------------------------ // Context for use in decoding command line - see `tswrite_process_args()` struct TS_context @@ -192,6 +199,7 @@ struct TS_context tswrite_pcr_mode pcr_mode; // use PCRs for timing information? int prime_size; // initial priming size for buffered output int prime_speedup; // percentage of normal speed to prime with + tswrite_pkt_hdr_type_t pkt_hdr_type; double pcr_scale; // multiplier for PCRs -- see buffered_TS_output }; typedef struct TS_context *TS_context_p; diff --git a/tswrite_fns.h b/tswrite_fns.h index ee821ad..2f8a4ee 100644 --- a/tswrite_fns.h +++ b/tswrite_fns.h @@ -177,7 +177,9 @@ extern int tswrite_start_buffering(TS_writer_p tswriter, tswrite_pcr_mode pcr_mode, int prime_size, int prime_speedup, - double pcr_scale); + double pcr_scale, + const tswrite_pkt_hdr_type_t hdr_type); + /* * Set up internal buffering for TS output. This is necessary for UDP output, * and optional otherwise.