FFmpeg coverage


Directory: ../../../ffmpeg/
File: src/libavformat/fifo.c
Date: 2024-04-18 20:30:25
Coverage summary (executed / total, coverage):
Lines: 236 / 329 (71.7%)
Functions: 17 / 18 (94.4%)
Branches: 88 / 176 (50.0%)

Line Branch Exec Source
1 /*
2 * FIFO pseudo-muxer
3 * Copyright (c) 2016 Jan Sebechlebsky
4 *
5 * This file is part of FFmpeg.
6 *
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public License
9 * as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public License
18 * along with FFmpeg; if not, write to the Free Software * Foundation, Inc.,
19 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22 #include <stdatomic.h>
23
24 #include "libavutil/avassert.h"
25 #include "libavutil/opt.h"
26 #include "libavutil/time.h"
27 #include "libavutil/thread.h"
28 #include "libavutil/threadmessage.h"
29 #include "avformat.h"
30 #include "internal.h"
31 #include "mux.h"
32
33 #define FIFO_DEFAULT_QUEUE_SIZE 60
34 #define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS 0
35 #define FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC 5000000 // 5 seconds
36
37 typedef struct FifoContext {
38 const AVClass *class;
39 AVFormatContext *avf;
40
41 char *format;
42 AVDictionary *format_options;
43
44 int queue_size;
45 AVThreadMessageQueue *queue;
46
47 pthread_t writer_thread;
48
49 /* Return value of last write_trailer_call */
50 int write_trailer_ret;
51
52 /* Time to wait before next recovery attempt
53 * This can refer to the time in processed stream,
54 * or real time. */
55 int64_t recovery_wait_time;
56
57 /* Maximal number of unsuccessful successive recovery attempts */
58 int max_recovery_attempts;
59
60 /* Whether to attempt recovery from failure */
61 int attempt_recovery;
62
63 /* If >0 stream time will be used when waiting
64 * for the recovery attempt instead of real time */
65 int recovery_wait_streamtime;
66
67 /* If >0 recovery will be attempted regardless of error code
68 * (except AVERROR_EXIT, so exit request is never ignored) */
69 int recover_any_error;
70
71 /* Whether to drop packets in case the queue is full. */
72 int drop_pkts_on_overflow;
73
74 /* Whether to wait for keyframe when recovering
75 * from failure or queue overflow */
76 int restart_with_keyframe;
77
78 pthread_mutex_t overflow_flag_lock;
79 int overflow_flag_lock_initialized;
80 /* Value > 0 signals queue overflow */
81 volatile uint8_t overflow_flag;
82
83 atomic_int_least64_t queue_duration;
84 int64_t last_sent_dts;
85 int64_t timeshift;
86 } FifoContext;
87
88 typedef struct FifoThreadContext {
89 AVFormatContext *avf;
90
91 /* Timestamp of last failure.
92 * This is either pts in case stream time is used,
93 * or microseconds as returned by av_gettime_relative() */
94 int64_t last_recovery_ts;
95
96 /* Number of current recovery process
97 * Value > 0 means we are in recovery process */
98 int recovery_nr;
99
100 /* If > 0 all frames will be dropped until keyframe is received */
101 uint8_t drop_until_keyframe;
102
103 /* Value > 0 means that the previous write_header call was successful
104 * so finalization by calling write_trailer and ff_io_close must be done
105 * before exiting / reinitialization of underlying muxer */
106 uint8_t header_written;
107
108 int64_t last_received_dts;
109 } FifoThreadContext;
110
111 typedef enum FifoMessageType {
112 FIFO_NOOP,
113 FIFO_WRITE_HEADER,
114 FIFO_WRITE_PACKET,
115 FIFO_FLUSH_OUTPUT
116 } FifoMessageType;
117
118 typedef struct FifoMessage {
119 FifoMessageType type;
120 AVPacket pkt;
121 } FifoMessage;
122
123 51 static int fifo_thread_write_header(FifoThreadContext *ctx)
124 {
125 51 AVFormatContext *avf = ctx->avf;
126 51 FifoContext *fifo = avf->priv_data;
127 51 AVFormatContext *avf2 = fifo->avf;
128 51 AVDictionary *format_options = NULL;
129 int ret, i;
130
131 51 ret = av_dict_copy(&format_options, fifo->format_options, 0);
132
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 51 times.
51 if (ret < 0)
133 goto end;
134
135 51 ret = ff_format_output_open(avf2, avf->url, &format_options);
136
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 51 times.
51 if (ret < 0) {
137 av_log(avf, AV_LOG_ERROR, "Error opening %s: %s\n", avf->url,
138 av_err2str(ret));
139 goto end;
140 }
141
142
2/2
✓ Branch 0 taken 52 times.
✓ Branch 1 taken 51 times.
103 for (i = 0;i < avf2->nb_streams; i++)
143 52 ffstream(avf2->streams[i])->cur_dts = 0;
144
145 51 ret = avformat_write_header(avf2, &format_options);
146
1/2
✓ Branch 0 taken 51 times.
✗ Branch 1 not taken.
51 if (!ret)
147 51 ctx->header_written = 1;
148
149 // Check for options unrecognized by underlying muxer
150
1/2
✓ Branch 0 taken 51 times.
✗ Branch 1 not taken.
51 if (format_options) {
151 const AVDictionaryEntry *entry = NULL;
152 while ((entry = av_dict_iterate(format_options, entry)))
153 av_log(avf2, AV_LOG_ERROR, "Unknown option '%s'\n", entry->key);
154 ret = AVERROR(EINVAL);
155 }
156
157 51 end:
158 51 av_dict_free(&format_options);
159 51 return ret;
160 }
161
162 3 static int fifo_thread_flush_output(FifoThreadContext *ctx)
163 {
164 3 AVFormatContext *avf = ctx->avf;
165 3 FifoContext *fifo = avf->priv_data;
166 3 AVFormatContext *avf2 = fifo->avf;
167
168 3 return av_write_frame(avf2, NULL);
169 }
170
171 static int64_t next_duration(AVFormatContext *avf, AVPacket *pkt, int64_t *last_dts)
172 {
173 AVStream *st = avf->streams[pkt->stream_index];
174 int64_t dts = av_rescale_q(pkt->dts, st->time_base, AV_TIME_BASE_Q);
175 int64_t duration = (*last_dts == AV_NOPTS_VALUE ? 0 : dts - *last_dts);
176 *last_dts = dts;
177 return duration;
178 }
179
180 145 static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt)
181 {
182 145 AVFormatContext *avf = ctx->avf;
183 145 FifoContext *fifo = avf->priv_data;
184 145 AVFormatContext *avf2 = fifo->avf;
185 AVRational src_tb, dst_tb;
186 int ret, s_idx;
187 int64_t orig_pts, orig_dts, orig_duration;
188
189
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 145 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
145 if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE)
190 atomic_fetch_sub_explicit(&fifo->queue_duration, next_duration(avf, pkt, &ctx->last_received_dts), memory_order_relaxed);
191
192
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 145 times.
145 if (ctx->drop_until_keyframe) {
193 if (pkt->flags & AV_PKT_FLAG_KEY) {
194 ctx->drop_until_keyframe = 0;
195 av_log(avf, AV_LOG_VERBOSE, "Keyframe received, recovering...\n");
196 } else {
197 av_log(avf, AV_LOG_VERBOSE, "Dropping non-keyframe packet\n");
198 av_packet_unref(pkt);
199 return 0;
200 }
201 }
202
203 145 orig_pts = pkt->pts;
204 145 orig_dts = pkt->dts;
205 145 orig_duration = pkt->duration;
206 145 s_idx = pkt->stream_index;
207 145 src_tb = avf->streams[s_idx]->time_base;
208 145 dst_tb = avf2->streams[s_idx]->time_base;
209 145 av_packet_rescale_ts(pkt, src_tb, dst_tb);
210
211 145 ret = av_write_frame(avf2, pkt);
212
2/2
✓ Branch 0 taken 100 times.
✓ Branch 1 taken 45 times.
145 if (ret >= 0) {
213 100 av_packet_unref(pkt);
214 } else {
215 // avoid scaling twice
216 45 pkt->pts = orig_pts;
217 45 pkt->dts = orig_dts;
218 45 pkt->duration = orig_duration;
219 }
220 145 return ret;
221 }
222
223 51 static int fifo_thread_write_trailer(FifoThreadContext *ctx)
224 {
225 51 AVFormatContext *avf = ctx->avf;
226 51 FifoContext *fifo = avf->priv_data;
227 51 AVFormatContext *avf2 = fifo->avf;
228 int ret;
229
230
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 51 times.
51 if (!ctx->header_written)
231 return 0;
232
233 51 ret = av_write_trailer(avf2);
234 51 ff_format_io_close(avf2, &avf2->pb);
235
236 51 return ret;
237 }
238
239 154 static int fifo_thread_dispatch_message(FifoThreadContext *ctx, FifoMessage *msg)
240 {
241 154 int ret = AVERROR(EINVAL);
242
243
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 154 times.
154 if (msg->type == FIFO_NOOP)
244 return 0;
245
246
2/2
✓ Branch 0 taken 51 times.
✓ Branch 1 taken 103 times.
154 if (!ctx->header_written) {
247 51 ret = fifo_thread_write_header(ctx);
248
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 51 times.
51 if (ret < 0)
249 return ret;
250 }
251
252
3/4
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 145 times.
✓ Branch 2 taken 3 times.
✗ Branch 3 not taken.
154 switch(msg->type) {
253 6 case FIFO_WRITE_HEADER:
254
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 av_assert0(ret >= 0);
255 6 return ret;
256 145 case FIFO_WRITE_PACKET:
257 145 return fifo_thread_write_packet(ctx, &msg->pkt);
258 3 case FIFO_FLUSH_OUTPUT:
259 3 return fifo_thread_flush_output(ctx);
260 }
261
262 av_assert0(0);
263 return AVERROR(EINVAL);
264 }
265
266 75 static int is_recoverable(const FifoContext *fifo, int err_no) {
267
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 75 times.
75 if (!fifo->attempt_recovery)
268 return 0;
269
270
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 75 times.
75 if (fifo->recover_any_error)
271 return err_no != AVERROR_EXIT;
272
273
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 75 times.
75 switch (err_no) {
274 case AVERROR(EINVAL):
275 case AVERROR(ENOSYS):
276 case AVERROR_EOF:
277 case AVERROR_EXIT:
278 case AVERROR_PATCHWELCOME:
279 return 0;
280 75 default:
281 75 return 1;
282 }
283 }
284
285 3 static void free_message(void *msg)
286 {
287 3 FifoMessage *fifo_msg = msg;
288
289
1/2
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
3 if (fifo_msg->type == FIFO_WRITE_PACKET)
290 3 av_packet_unref(&fifo_msg->pkt);
291 3 }
292
293 30 static int fifo_thread_process_recovery_failure(FifoThreadContext *ctx, AVPacket *pkt,
294 int err_no)
295 {
296 30 AVFormatContext *avf = ctx->avf;
297 30 FifoContext *fifo = avf->priv_data;
298 int ret;
299
300 30 av_log(avf, AV_LOG_INFO, "Recovery failed: %s\n",
301 30 av_err2str(err_no));
302
303
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 30 times.
30 if (fifo->recovery_wait_streamtime) {
304 if (pkt->pts == AV_NOPTS_VALUE)
305 av_log(avf, AV_LOG_WARNING, "Packet does not contain presentation"
306 " timestamp, recovery will be attempted immediately");
307 ctx->last_recovery_ts = pkt->pts;
308 } else {
309 30 ctx->last_recovery_ts = av_gettime_relative();
310 }
311
312
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 30 times.
30 if (fifo->max_recovery_attempts &&
313 ctx->recovery_nr >= fifo->max_recovery_attempts) {
314 av_log(avf, AV_LOG_ERROR,
315 "Maximal number of %d recovery attempts reached.\n",
316 fifo->max_recovery_attempts);
317 ret = err_no;
318 } else {
319 30 ret = AVERROR(EAGAIN);
320 }
321
322 30 return ret;
323 }
324
325 45 static int fifo_thread_attempt_recovery(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
326 {
327 45 AVFormatContext *avf = ctx->avf;
328 45 FifoContext *fifo = avf->priv_data;
329 45 AVPacket *pkt = &msg->pkt;
330 int64_t time_since_recovery;
331 int ret;
332
333
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 45 times.
45 if (!is_recoverable(fifo, err_no)) {
334 ret = err_no;
335 goto fail;
336 }
337
338
1/2
✓ Branch 0 taken 45 times.
✗ Branch 1 not taken.
45 if (ctx->header_written) {
339 45 fifo->write_trailer_ret = fifo_thread_write_trailer(ctx);
340 45 ctx->header_written = 0;
341 }
342
343
2/2
✓ Branch 0 taken 15 times.
✓ Branch 1 taken 30 times.
45 if (!ctx->recovery_nr) {
344 15 ctx->last_recovery_ts = fifo->recovery_wait_streamtime ?
345
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15 times.
15 AV_NOPTS_VALUE : 0;
346 } else {
347
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 30 times.
30 if (fifo->recovery_wait_streamtime) {
348 if (ctx->last_recovery_ts == AV_NOPTS_VALUE) {
349 AVRational tb = avf->streams[pkt->stream_index]->time_base;
350 time_since_recovery = av_rescale_q(pkt->pts - ctx->last_recovery_ts,
351 tb, AV_TIME_BASE_Q);
352 } else {
353 /* Enforce recovery immediately */
354 time_since_recovery = fifo->recovery_wait_time;
355 }
356 } else {
357 30 time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
358 }
359
360
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 30 times.
30 if (time_since_recovery < fifo->recovery_wait_time)
361 return AVERROR(EAGAIN);
362 }
363
364 45 ctx->recovery_nr++;
365
366
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 45 times.
45 if (fifo->max_recovery_attempts) {
367 av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d/%d\n",
368 ctx->recovery_nr, fifo->max_recovery_attempts);
369 } else {
370 45 av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d\n",
371 ctx->recovery_nr);
372 }
373
374
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
45 if (fifo->restart_with_keyframe && fifo->drop_pkts_on_overflow)
375 ctx->drop_until_keyframe = 1;
376
377 45 ret = fifo_thread_dispatch_message(ctx, msg);
378
2/2
✓ Branch 0 taken 30 times.
✓ Branch 1 taken 15 times.
45 if (ret < 0) {
379
1/2
✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
30 if (is_recoverable(fifo, ret)) {
380 30 return fifo_thread_process_recovery_failure(ctx, pkt, ret);
381 } else {
382 goto fail;
383 }
384 } else {
385 15 av_log(avf, AV_LOG_INFO, "Recovery successful\n");
386 15 ctx->recovery_nr = 0;
387 }
388
389 15 return 0;
390
391 fail:
392 free_message(msg);
393 return ret;
394 }
395
396 15 static int fifo_thread_recover(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
397 {
398 15 AVFormatContext *avf = ctx->avf;
399 15 FifoContext *fifo = avf->priv_data;
400 int ret;
401
402 do {
403
3/4
✓ Branch 0 taken 45 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 30 times.
✓ Branch 3 taken 15 times.
45 if (!fifo->recovery_wait_streamtime && ctx->recovery_nr > 0) {
404 30 int64_t time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
405 30 int64_t time_to_wait = FFMAX(0, fifo->recovery_wait_time - time_since_recovery);
406
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 30 times.
30 if (time_to_wait)
407 av_usleep(FFMIN(10000, time_to_wait));
408 }
409
410 45 ret = fifo_thread_attempt_recovery(ctx, msg, err_no);
411
3/4
✓ Branch 0 taken 30 times.
✓ Branch 1 taken 15 times.
✓ Branch 2 taken 30 times.
✗ Branch 3 not taken.
45 } while (ret == AVERROR(EAGAIN) && !fifo->drop_pkts_on_overflow);
412
413
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
15 if (ret == AVERROR(EAGAIN) && fifo->drop_pkts_on_overflow) {
414 if (msg->type == FIFO_WRITE_PACKET)
415 av_packet_unref(&msg->pkt);
416 ret = 0;
417 }
418
419 15 return ret;
420 }
421
422 6 static void *fifo_consumer_thread(void *data)
423 {
424 6 AVFormatContext *avf = data;
425 6 FifoContext *fifo = avf->priv_data;
426 6 AVThreadMessageQueue *queue = fifo->queue;
427 6 FifoMessage msg = {fifo->timeshift ? FIFO_NOOP : FIFO_WRITE_HEADER, {0}};
428 int ret;
429
430 FifoThreadContext fifo_thread_ctx;
431 6 memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext));
432 6 fifo_thread_ctx.avf = avf;
433 6 fifo_thread_ctx.last_received_dts = AV_NOPTS_VALUE;
434
435 6 ff_thread_setname("fifo-consumer");
436
437 103 while (1) {
438 109 uint8_t just_flushed = 0;
439
440
1/2
✓ Branch 0 taken 109 times.
✗ Branch 1 not taken.
109 if (!fifo_thread_ctx.recovery_nr)
441 109 ret = fifo_thread_dispatch_message(&fifo_thread_ctx, &msg);
442
443
3/4
✓ Branch 0 taken 94 times.
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 94 times.
109 if (ret < 0 || fifo_thread_ctx.recovery_nr > 0) {
444 15 int rec_ret = fifo_thread_recover(&fifo_thread_ctx, &msg, ret);
445
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15 times.
15 if (rec_ret < 0) {
446 av_thread_message_queue_set_err_send(queue, rec_ret);
447 break;
448 }
449 }
450
451 /* If the queue is full at the moment when fifo_write_packet
452 * attempts to insert new message (packet) to the queue,
453 * it sets the fifo->overflow_flag to 1 and drops packet.
454 * Here in consumer thread, the flag is checked and if it is
455 * set, the queue is flushed and flag cleared. */
456 109 pthread_mutex_lock(&fifo->overflow_flag_lock);
457
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 108 times.
109 if (fifo->overflow_flag) {
458 1 av_thread_message_flush(queue);
459
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (fifo->restart_with_keyframe)
460 fifo_thread_ctx.drop_until_keyframe = 1;
461 1 fifo->overflow_flag = 0;
462 1 just_flushed = 1;
463 }
464 109 pthread_mutex_unlock(&fifo->overflow_flag_lock);
465
466
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 108 times.
109 if (just_flushed)
467 1 av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n");
468
469
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 109 times.
109 if (fifo->timeshift)
470 while (atomic_load_explicit(&fifo->queue_duration, memory_order_relaxed) < fifo->timeshift)
471 av_usleep(10000);
472
473 109 ret = av_thread_message_queue_recv(queue, &msg, 0);
474
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 103 times.
109 if (ret < 0) {
475 6 av_thread_message_queue_set_err_send(queue, ret);
476 6 break;
477 }
478 }
479
480 6 fifo->write_trailer_ret = fifo_thread_write_trailer(&fifo_thread_ctx);
481
482 6 return NULL;
483 }
484
485 6 static int fifo_mux_init(AVFormatContext *avf, const AVOutputFormat *oformat,
486 const char *filename)
487 {
488 6 FifoContext *fifo = avf->priv_data;
489 AVFormatContext *avf2;
490 6 int ret = 0, i;
491
492 6 ret = avformat_alloc_output_context2(&avf2, oformat, NULL, filename);
493
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 if (ret < 0)
494 return ret;
495
496 6 fifo->avf = avf2;
497
498 6 avf2->interrupt_callback = avf->interrupt_callback;
499 6 avf2->max_delay = avf->max_delay;
500 6 ret = av_dict_copy(&avf2->metadata, avf->metadata, 0);
501
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 if (ret < 0)
502 return ret;
503 6 avf2->opaque = avf->opaque;
504 6 avf2->io_close2 = avf->io_close2;
505 6 avf2->io_open = avf->io_open;
506 6 avf2->flags = avf->flags;
507
508
2/2
✓ Branch 0 taken 7 times.
✓ Branch 1 taken 6 times.
13 for (i = 0; i < avf->nb_streams; ++i) {
509 7 AVStream *st = ff_stream_clone(avf2, avf->streams[i]);
510
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7 times.
7 if (!st)
511 return AVERROR(ENOMEM);
512 }
513
514 6 return 0;
515 }
516
517 6 static int fifo_init(AVFormatContext *avf)
518 {
519 6 FifoContext *fifo = avf->priv_data;
520 const AVOutputFormat *oformat;
521 6 int ret = 0;
522
523
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
6 if (fifo->recovery_wait_streamtime && !fifo->drop_pkts_on_overflow) {
524 av_log(avf, AV_LOG_ERROR, "recovery_wait_streamtime can be turned on"
525 " only when drop_pkts_on_overflow is also turned on\n");
526 return AVERROR(EINVAL);
527 }
528 6 atomic_init(&fifo->queue_duration, 0);
529 6 fifo->last_sent_dts = AV_NOPTS_VALUE;
530
531 #ifdef FIFO_TEST
532 /* This exists for the fifo_muxer test tool. */
533
2/4
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
4 if (fifo->format && !strcmp(fifo->format, "fifo_test")) {
534 extern const FFOutputFormat ff_fifo_test_muxer;
535 4 oformat = &ff_fifo_test_muxer.p;
536 } else
537 #endif
538 2 oformat = av_guess_format(fifo->format, avf->url, NULL);
539
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 if (!oformat) {
540 ret = AVERROR_MUXER_NOT_FOUND;
541 return ret;
542 }
543
544 6 ret = fifo_mux_init(avf, oformat, avf->url);
545
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 if (ret < 0)
546 return ret;
547
548 6 ret = av_thread_message_queue_alloc(&fifo->queue, (unsigned) fifo->queue_size,
549 sizeof(FifoMessage));
550
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 if (ret < 0)
551 return ret;
552
553 6 av_thread_message_queue_set_free_func(fifo->queue, free_message);
554
555 6 ret = pthread_mutex_init(&fifo->overflow_flag_lock, NULL);
556
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 if (ret < 0)
557 return AVERROR(ret);
558 6 fifo->overflow_flag_lock_initialized = 1;
559
560 6 return 0;
561 }
562
563 6 static int fifo_write_header(AVFormatContext *avf)
564 {
565 6 FifoContext * fifo = avf->priv_data;
566 int ret;
567
568 6 ret = pthread_create(&fifo->writer_thread, NULL, fifo_consumer_thread, avf);
569
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 if (ret) {
570 av_log(avf, AV_LOG_ERROR, "Failed to start thread: %s\n",
571 av_err2str(AVERROR(ret)));
572 ret = AVERROR(ret);
573 }
574
575 6 return ret;
576 }
577
578 109 static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
579 {
580 109 FifoContext *fifo = avf->priv_data;
581
2/2
✓ Branch 0 taken 106 times.
✓ Branch 1 taken 3 times.
109 FifoMessage msg = {.type = pkt ? FIFO_WRITE_PACKET : FIFO_FLUSH_OUTPUT};
582 int ret;
583
584
2/2
✓ Branch 0 taken 106 times.
✓ Branch 1 taken 3 times.
109 if (pkt) {
585 106 ret = av_packet_ref(&msg.pkt,pkt);
586
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 106 times.
106 if (ret < 0)
587 return ret;
588 }
589
590 109 ret = av_thread_message_queue_send(fifo->queue, &msg,
591 109 fifo->drop_pkts_on_overflow ?
592 AV_THREAD_MESSAGE_NONBLOCK : 0);
593
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 106 times.
109 if (ret == AVERROR(EAGAIN)) {
594 3 uint8_t overflow_set = 0;
595
596 /* Queue is full, set fifo->overflow_flag to 1
597 * to let consumer thread know the queue should
598 * be flushed. */
599 3 pthread_mutex_lock(&fifo->overflow_flag_lock);
600
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 2 times.
3 if (!fifo->overflow_flag)
601 1 fifo->overflow_flag = overflow_set = 1;
602 3 pthread_mutex_unlock(&fifo->overflow_flag_lock);
603
604
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 2 times.
3 if (overflow_set)
605 1 av_log(avf, AV_LOG_WARNING, "FIFO queue full\n");
606 3 ret = 0;
607 3 goto fail;
608
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 106 times.
106 } else if (ret < 0) {
609 goto fail;
610 }
611
612
1/6
✗ Branch 0 not taken.
✓ Branch 1 taken 106 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
106 if (fifo->timeshift && pkt && pkt->dts != AV_NOPTS_VALUE)
613 atomic_fetch_add_explicit(&fifo->queue_duration, next_duration(avf, pkt, &fifo->last_sent_dts), memory_order_relaxed);
614
615 106 return ret;
616 3 fail:
617
1/2
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
3 if (pkt)
618 3 av_packet_unref(&msg.pkt);
619 3 return ret;
620 }
621
622 6 static int fifo_write_trailer(AVFormatContext *avf)
623 {
624 6 FifoContext *fifo= avf->priv_data;
625 int ret;
626
627 6 av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF);
628
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 if (fifo->timeshift) {
629 int64_t now = av_gettime_relative();
630 int64_t elapsed = 0;
631 FifoMessage msg = {FIFO_NOOP};
632 do {
633 int64_t delay = av_gettime_relative() - now;
634 if (delay < 0) { // Discontinuity?
635 delay = 10000;
636 now = av_gettime_relative();
637 } else {
638 now += delay;
639 }
640 atomic_fetch_add_explicit(&fifo->queue_duration, delay, memory_order_relaxed);
641 elapsed += delay;
642 if (elapsed > fifo->timeshift)
643 break;
644 av_usleep(10000);
645 ret = av_thread_message_queue_send(fifo->queue, &msg, AV_THREAD_MESSAGE_NONBLOCK);
646 } while (ret >= 0 || ret == AVERROR(EAGAIN));
647 atomic_store(&fifo->queue_duration, INT64_MAX);
648 }
649
650 6 ret = pthread_join(fifo->writer_thread, NULL);
651
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 if (ret < 0) {
652 av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
653 av_err2str(AVERROR(ret)));
654 return AVERROR(ret);
655 }
656
657 6 ret = fifo->write_trailer_ret;
658 6 return ret;
659 }
660
661 6 static void fifo_deinit(AVFormatContext *avf)
662 {
663 6 FifoContext *fifo = avf->priv_data;
664
665 6 avformat_free_context(fifo->avf);
666 6 av_thread_message_queue_free(&fifo->queue);
667
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6 if (fifo->overflow_flag_lock_initialized)
668 6 pthread_mutex_destroy(&fifo->overflow_flag_lock);
669 6 }
670
671 #define OFFSET(x) offsetof(FifoContext, x)
672 static const AVOption options[] = {
673 {"attempt_recovery", "Attempt recovery in case of failure", OFFSET(attempt_recovery),
674 AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
675
676 {"drop_pkts_on_overflow", "Drop packets on fifo queue overflow not to block encoder", OFFSET(drop_pkts_on_overflow),
677 AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
678
679 {"fifo_format", "Target muxer", OFFSET(format),
680 AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
681
682 {"format_opts", "Options to be passed to underlying muxer", OFFSET(format_options),
683 AV_OPT_TYPE_DICT, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
684
685 {"max_recovery_attempts", "Maximal number of recovery attempts", OFFSET(max_recovery_attempts),
686 AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS}, 0, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM},
687
688 {"queue_size", "Size of fifo queue", OFFSET(queue_size),
689 AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_QUEUE_SIZE}, 1, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM},
690
691 {"recovery_wait_streamtime", "Use stream time instead of real time while waiting for recovery",
692 OFFSET(recovery_wait_streamtime), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
693
694 {"recovery_wait_time", "Waiting time between recovery attempts", OFFSET(recovery_wait_time),
695 AV_OPT_TYPE_DURATION, {.i64 = FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM},
696
697 {"recover_any_error", "Attempt recovery regardless of type of the error", OFFSET(recover_any_error),
698 AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
699
700 {"restart_with_keyframe", "Wait for keyframe when restarting output", OFFSET(restart_with_keyframe),
701 AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
702
703 {"timeshift", "Delay fifo output", OFFSET(timeshift),
704 AV_OPT_TYPE_DURATION, {.i64 = 0}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM},
705
706 {NULL},
707 };
708
709 static const AVClass fifo_muxer_class = {
710 .class_name = "Fifo muxer",
711 .item_name = av_default_item_name,
712 .option = options,
713 .version = LIBAVUTIL_VERSION_INT,
714 };
715
716 const FFOutputFormat ff_fifo_muxer = {
717 .p.name = "fifo",
718 .p.long_name = NULL_IF_CONFIG_SMALL("FIFO queue pseudo-muxer"),
719 .p.priv_class = &fifo_muxer_class,
720 #if FF_API_ALLOW_FLUSH
721 .p.flags = AVFMT_NOFILE | AVFMT_ALLOW_FLUSH | AVFMT_TS_NEGATIVE,
722 #else
723 .p.flags = AVFMT_NOFILE | AVFMT_TS_NEGATIVE,
724 #endif
725 .priv_data_size = sizeof(FifoContext),
726 .init = fifo_init,
727 .write_header = fifo_write_header,
728 .write_packet = fifo_write_packet,
729 .write_trailer = fifo_write_trailer,
730 .deinit = fifo_deinit,
731 .flags_internal = FF_OFMT_FLAG_ALLOW_FLUSH,
732 };
733