Add a mode that adds rtp headers

master
John Cox 2013-08-27 14:48:48 +01:00
rodzic 93a65e2761
commit 8c8d425257
3 zmienionych plików z 111 dodań i 30 usunięć

129
tswrite.c
Wyświetl plik

@ -178,6 +178,15 @@ typedef struct circular_buffer_item *circular_buffer_item_p;
#define SIZEOF_CIRCULAR_BUFFER_ITEM sizeof(struct circular_buffer_item)
typedef struct rtp_hdr_info_s
{
uint16_t seq;
uint32_t ssrc;
} rtp_hdr_info_t;
// ------------------------------------------------------------
// The header for the circular buffer
//
@ -198,6 +207,11 @@ struct circular_buffer
int TS_in_item; // max number of TS packets in a circular buffer item
int item_size; // and thus the size of said item's data array
int hdr_size;
tswrite_pkt_hdr_type_t hdr_type;
union {
rtp_hdr_info_t rtp;
} hdr;
int maxnowait; // max number consecutive packets to send with no wait
int waitfor; // the number of microseconds to wait thereafter
@ -337,7 +351,8 @@ static int map_circular_buffer(circular_buffer_p *circular,
int circ_buf_size,
int TS_in_packet,
int maxnowait,
int waitfor)
int waitfor,
const tswrite_pkt_hdr_type_t hdr_type)
{
// Rather than map a file, we'll map anonymous memory
// BSD supports the MAP_ANON flag as is,
@ -353,27 +368,34 @@ static int map_circular_buffer(circular_buffer_p *circular,
// is not fixed, we can't just allocate it "inside" the buffer items (it
// wouldn't be nice to allocate the *maximum* possible space we might want!).
// Instead, we'll put it as a byte array after the rest of our data.
//
// Space may be left to add an RTP header before each items data
//
// So:
const int hdr_size = (hdr_type == PKT_HDR_TYPE_RTP) ? 12 : 0;
int base_size = SIZEOF_CIRCULAR_BUFFER +
(circ_buf_size * SIZEOF_CIRCULAR_BUFFER_ITEM);
int data_size = circ_buf_size * TS_in_packet * TS_PACKET_SIZE;
int data_size = circ_buf_size * (TS_in_packet * TS_PACKET_SIZE + hdr_size);
int total_size = base_size + data_size;
circular_buffer_p cb;
*circular = NULL;
#ifdef _WIN32
// Under Windows, we're using threading to manage our parent/child
// processes, so we can just use malloc here
*circular = malloc(total_size);
if (*circular == NULL)
cb = malloc(total_size);
if (cb == NULL)
{
fprint_err("### Error mapping circular buffer as shared memory: %s\n",
strerror(errno));
return 1;
}
#else // _WIN32
*circular = mmap(NULL,total_size,
cb = mmap(NULL,total_size,
PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANON, -1, 0);
if (*circular == MAP_FAILED)
if (cb == MAP_FAILED)
{
fprint_err("### Error mapping circular buffer as shared memory: %s\n",
strerror(errno));
@ -381,16 +403,27 @@ static int map_circular_buffer(circular_buffer_p *circular,
}
#endif // _WIN32
(*circular)->start = 1;
(*circular)->end = 0;
(*circular)->pending = 0;
(*circular)->eos = FALSE;
(*circular)->size = circ_buf_size;
(*circular)->TS_in_item = TS_in_packet;
(*circular)->item_size = TS_in_packet * TS_PACKET_SIZE;
(*circular)->maxnowait = maxnowait;
(*circular)->waitfor = waitfor;
(*circular)->item_data = (byte *) *circular + base_size;
cb->start = 1;
cb->end = 0;
cb->pending = 0;
cb->eos = FALSE;
cb->size = circ_buf_size;
cb->TS_in_item = TS_in_packet;
cb->item_size = TS_in_packet * TS_PACKET_SIZE + hdr_size;
cb->hdr_size = hdr_size;
cb->hdr_type = hdr_type;
if (hdr_type == PKT_HDR_TYPE_RTP)
{
struct timeval now;
gettimeofday(&now, NULL);
cb->hdr.rtp.seq = 0;
cb->hdr.rtp.ssrc = (uint32_t)(now.tv_sec ^ now.tv_usec << 12); // A somewhat random number
}
cb->maxnowait = maxnowait;
cb->waitfor = waitfor;
cb->item_data = (byte *) cb + base_size + hdr_size;
*circular = cb;
return 0;
}
@ -639,7 +672,8 @@ static int build_buffered_TS_output(buffered_TS_output_p *writer,
tswrite_pcr_mode pcr_mode,
int prime_size,
int prime_speedup,
double pcr_scale)
double pcr_scale,
const tswrite_pkt_hdr_type_t hdr_type)
{
int err, ii;
circular_buffer_p circular;
@ -651,7 +685,7 @@ static int build_buffered_TS_output(buffered_TS_output_p *writer,
}
err = map_circular_buffer(&circular,circ_buf_size,TS_in_packet,
maxnowait,waitfor);
maxnowait,waitfor,hdr_type);
if (err)
{
print_err("### Error building buffered output\n");
@ -902,6 +936,22 @@ static int set_buffer_item_time_pcr1(buffered_TS_output_p writer)
return writer->which;
}
static inline void
set_32_be(uint8_t * const p, const uint32_t x)
{
p[0] = x >> 24;
p[1] = (x >> 16) & 0xff;
p[2] = (x >> 8) & 0xff;
p[3] = x & 0xff;
}
static inline void
set_16_be(uint8_t * const p, const unsigned int x)
{
p[0] = (x >> 8) & 0xff;
p[1] = x & 0xff;
}
// Set times on all packets between where we were and where we are now
// Sets the time on both the first & last packets
@ -923,6 +973,18 @@ set_circ_times(const circular_buffer_p circ,
struct circular_buffer_item * const item = circ->item + i;
int64_t pcr = (int64_t)pcr1 + (int64_t)offset * (int64_t)pcr_gap / (int64_t)gap_bytes;
if (circ->hdr_type == PKT_HDR_TYPE_RTP)
{
uint32_t timestamp = (uint32_t)(pcr / (uint64_t)300);
uint8_t * rtp_buf = circ->item_data + i * circ->item_size - 12;
rtp_buf[0] = 0x80;
rtp_buf[1] = 33; // TS
set_16_be(rtp_buf + 2, ++circ->hdr.rtp.seq);
set_32_be(rtp_buf + 4, timestamp);
set_32_be(rtp_buf + 8, circ->hdr.rtp.ssrc);
}
item->time = (uint32_t)(pcr / 27); // "time" in us
offset += item->length;
idx = i;
@ -1321,7 +1383,7 @@ static int write_to_buffered_TS_output(buffered_TS_output_p writer,
writer->num_packets ++;
// Have we filled this entry in the circular buffer?
if ((*length) == circular->item_size)
if ((*length) >= circular->item_size - circular->hdr_size)
internal_flush_buffered_TS_output(writer);
return 0;
}
@ -1868,12 +1930,12 @@ extern int wait_for_command(TS_writer_p tswriter)
*
* Returns 0 if all went well, 1 if something went wrong.
*/
static int write_circular_data(SOCKET output,
circular_buffer_p circular)
static int write_circular_data(const SOCKET output,
const circular_buffer_p circular)
{
int err;
byte *buffer = circular->item_data + circular->start*circular->item_size;
int length = circular->item[circular->start].length;
byte *buffer = circular->item_data + circular->start*circular->item_size - circular->hdr_size;
int length = circular->item[circular->start].length + circular->hdr_size;
#if DISPLAY_BUFFER
int oldend = circular->pending;
int oldstart = circular->start;
@ -1881,6 +1943,7 @@ static int write_circular_data(SOCKET output,
#endif
err = write_socket_data(output,buffer,length);
if (err)
{
// If we're writing out over UDP, it's possible our write fails for
@ -2202,7 +2265,7 @@ static int write_from_circular(SOCKET output,
wait_microseconds(waitfor);
sent_without_delay = 0;
}
// Write it...
err = write_circular_data(output,circular);
if (err) return 1;
@ -2635,7 +2698,8 @@ extern int tswrite_start_buffering(TS_writer_p tswriter,
tswrite_pcr_mode pcr_mode,
int prime_size,
int prime_speedup,
double pcr_scale)
double pcr_scale,
const tswrite_pkt_hdr_type_t hdr_type)
{
int err;
@ -2651,7 +2715,8 @@ extern int tswrite_start_buffering(TS_writer_p tswriter,
err = build_buffered_TS_output(&(tswriter->writer),
circ_buf_size,TS_in_packet,
maxnowait,waitfor,byterate,pcr_mode,
prime_size,prime_speedup,pcr_scale);
prime_size,prime_speedup,pcr_scale,
hdr_type);
if (err) return 1;
err = start_child(tswriter);
@ -2687,8 +2752,8 @@ extern int tswrite_start_buffering_from_context(TS_writer_p tswriter,
context->pcr_mode,
context->prime_size,
context->prime_speedup,
context->pcr_scale);
context->pcr_scale,
context->pkt_hdr_type);
}
/*
@ -3332,6 +3397,7 @@ extern int tswrite_process_args(char *prefix,
context->prime_size = DEFAULT_PRIME_SIZE;
context->prime_speedup = 100;
context->pcr_scale = 1.0;
context->pkt_hdr_type = PKT_HDR_TYPE_NONE;
while (ii < argc)
{
@ -3456,7 +3522,12 @@ extern int tswrite_process_args(char *prefix,
argv[ii] = argv[ii+1] = TSWRITE_PROCESSED;
ii++;
}
else if (!strcmp("-hd",argv[ii]))
else if (!strcmp("-rtp", argv[ii]))
{
context->pkt_hdr_type = PKT_HDR_TYPE_RTP;
argv[ii] = TSWRITE_PROCESSED;
}
else if (!strcmp("-hd", argv[ii]))
{
context->maxnowait = 40;
context->bitrate = 20000000;

Wyświetl plik

@ -178,6 +178,13 @@ typedef enum tswrite_pcr_mode_e {
TSWRITE_PCR_MODE_PCR2
} tswrite_pcr_mode;
typedef enum tswrite_pkt_hdr_type_e
{
PKT_HDR_TYPE_NONE = 0,
PKT_HDR_TYPE_RTP
} tswrite_pkt_hdr_type_t;
// ------------------------------------------------------------
// Context for use in decoding command line - see `tswrite_process_args()`
struct TS_context
@ -192,6 +199,7 @@ struct TS_context
tswrite_pcr_mode pcr_mode; // use PCRs for timing information?
int prime_size; // initial priming size for buffered output
int prime_speedup; // percentage of normal speed to prime with
tswrite_pkt_hdr_type_t pkt_hdr_type;
double pcr_scale; // multiplier for PCRs -- see buffered_TS_output
};
typedef struct TS_context *TS_context_p;

Wyświetl plik

@ -177,7 +177,9 @@ extern int tswrite_start_buffering(TS_writer_p tswriter,
tswrite_pcr_mode pcr_mode,
int prime_size,
int prime_speedup,
double pcr_scale);
double pcr_scale,
const tswrite_pkt_hdr_type_t hdr_type);
/*
* Set up internal buffering for TS output. This is necessary for UDP output,
* and optional otherwise.