FFmpeg coverage


Directory: ../../../ffmpeg/
File: src/libavformat/fifo.c
Date: 2025-04-25 22:50:00
Exec Total Coverage
Lines: 241 341 70.7%
Functions: 17 18 94.4%
Branches: 92 184 50.0%

Line Branch Exec Source
1 /*
2 * FIFO pseudo-muxer
3 * Copyright (c) 2016 Jan Sebechlebsky
4 *
5 * This file is part of FFmpeg.
6 *
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public License
9 * as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public License
18 * along with FFmpeg; if not, write to the Free Software * Foundation, Inc.,
19 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22 #include <stdatomic.h>
23
24 #include "libavutil/avassert.h"
25 #include "libavutil/opt.h"
26 #include "libavutil/time.h"
27 #include "libavutil/thread.h"
28 #include "libavutil/threadmessage.h"
29 #include "avformat.h"
30 #include "internal.h"
31 #include "mux.h"
32
33 #define FIFO_DEFAULT_QUEUE_SIZE 60
34 #define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS 0
35 #define FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC 5000000 // 5 seconds
36
37 typedef struct FifoContext {
38 const AVClass *class;
39 AVFormatContext *avf;
40
41 char *format;
42 AVDictionary *format_options;
43
44 int queue_size;
45 AVThreadMessageQueue *queue;
46
47 pthread_t writer_thread;
48
49 /* Return value of last write_trailer_call */
50 int write_trailer_ret;
51
52 /* Time to wait before next recovery attempt
53 * This can refer to the time in processed stream,
54 * or real time. */
55 int64_t recovery_wait_time;
56
57 /* Maximal number of unsuccessful successive recovery attempts */
58 int max_recovery_attempts;
59
60 /* Whether to attempt recovery from failure */
61 int attempt_recovery;
62
63 /* If >0 stream time will be used when waiting
64 * for the recovery attempt instead of real time */
65 int recovery_wait_streamtime;
66
67 /* If >0 recovery will be attempted regardless of error code
68 * (except AVERROR_EXIT, so exit request is never ignored) */
69 int recover_any_error;
70
71 /* Whether to drop packets in case the queue is full. */
72 int drop_pkts_on_overflow;
73
74 /* Whether to wait for keyframe when recovering
75 * from failure or queue overflow */
76 int restart_with_keyframe;
77
78 pthread_mutex_t overflow_flag_lock;
79 int overflow_flag_lock_initialized;
80 /* Value > 0 signals queue overflow */
81 volatile uint8_t overflow_flag;
82
83 atomic_int_least64_t queue_duration;
84 int64_t last_sent_dts;
85 int64_t timeshift;
86 } FifoContext;
87
88 typedef struct FifoThreadContext {
89 AVFormatContext *avf;
90
91 /* Timestamp of last failure.
92 * This is either pts in case stream time is used,
93 * or microseconds as returned by av_gettime_relative() */
94 int64_t last_recovery_ts;
95
96 /* Number of current recovery process
97 * Value > 0 means we are in recovery process */
98 int recovery_nr;
99
100 /* If > 0 all frames will be dropped until keyframe is received */
101 uint8_t drop_until_keyframe;
102
103 /* Value > 0 means that the previous write_header call was successful
104 * so finalization by calling write_trailer and ff_io_close must be done
105 * before exiting / reinitialization of underlying muxer */
106 uint8_t header_written;
107
108 int64_t last_received_dts;
109
110 /* If > 0 at least one of the streams is a video stream */
111 uint8_t has_video_stream;
112 } FifoThreadContext;
113
114 typedef enum FifoMessageType {
115 FIFO_NOOP,
116 FIFO_WRITE_HEADER,
117 FIFO_WRITE_PACKET,
118 FIFO_FLUSH_OUTPUT
119 } FifoMessageType;
120
121 typedef struct FifoMessage {
122 FifoMessageType type;
123 AVPacket pkt;
124 } FifoMessage;
125
126 51 static int fifo_thread_write_header(FifoThreadContext *ctx)
127 {
128 51 AVFormatContext *avf = ctx->avf;
129 51 FifoContext *fifo = avf->priv_data;
130 51 AVFormatContext *avf2 = fifo->avf;
131 51 AVDictionary *format_options = NULL;
132 int ret, i;
133
134 51 ret = av_dict_copy(&format_options, fifo->format_options, 0);
135
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 51 times.
51 if (ret < 0)
136 goto end;
137
138 51 ret = ff_format_output_open(avf2, avf->url, &format_options);
139
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 51 times.
51 if (ret < 0) {
140 av_log(avf, AV_LOG_ERROR, "Error opening %s: %s\n", avf->url,
141 av_err2str(ret));
142 goto end;
143 }
144
145
2/2
✓ Branch 0 taken 52 times.
✓ Branch 1 taken 51 times.
103 for (i = 0;i < avf2->nb_streams; i++)
146 52 ffstream(avf2->streams[i])->cur_dts = 0;
147
148 51 ret = avformat_write_header(avf2, &format_options);
149
1/2
✓ Branch 0 taken 51 times.
✗ Branch 1 not taken.
51 if (!ret)
150 51 ctx->header_written = 1;
151
152 // Check for options unrecognized by underlying muxer
153
1/2
✓ Branch 0 taken 51 times.
✗ Branch 1 not taken.
51 if (format_options) {
154 const AVDictionaryEntry *entry = NULL;
155 while ((entry = av_dict_iterate(format_options, entry)))
156 av_log(avf2, AV_LOG_ERROR, "Unknown option '%s'\n", entry->key);
157 ret = AVERROR(EINVAL);
158 }
159
160 51 end:
161 51 av_dict_free(&format_options);
162 51 return ret;
163 }
164
165 3 static int fifo_thread_flush_output(FifoThreadContext *ctx)
166 {
167 3 AVFormatContext *avf = ctx->avf;
168 3 FifoContext *fifo = avf->priv_data;
169 3 AVFormatContext *avf2 = fifo->avf;
170
171 3 return av_write_frame(avf2, NULL);
172 }
173
174 static int64_t next_duration(AVFormatContext *avf, AVPacket *pkt, int64_t *last_dts)
175 {
176 AVStream *st = avf->streams[pkt->stream_index];
177 int64_t dts = av_rescale_q(pkt->dts, st->time_base, AV_TIME_BASE_Q);
178 int64_t duration = (*last_dts == AV_NOPTS_VALUE ? 0 : dts - *last_dts);
179 *last_dts = dts;
180 return duration;
181 }
182
183 145 static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt)
184 {
185 145 AVFormatContext *avf = ctx->avf;
186 145 FifoContext *fifo = avf->priv_data;
187 145 AVFormatContext *avf2 = fifo->avf;
188 AVRational src_tb, dst_tb;
189 int ret, s_idx;
190 int64_t orig_pts, orig_dts, orig_duration;
191 145 enum AVMediaType stream_codec_type = avf->streams[pkt->stream_index]->codecpar->codec_type;
192
193
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 145 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
145 if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE)
194 atomic_fetch_sub_explicit(&fifo->queue_duration, next_duration(avf, pkt, &ctx->last_received_dts), memory_order_relaxed);
195
196
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 145 times.
145 if (ctx->drop_until_keyframe) {
197 if (pkt->flags & AV_PKT_FLAG_KEY) {
198 if (!ctx->has_video_stream) {
199 ctx->drop_until_keyframe = 0;
200 av_log(avf, AV_LOG_VERBOSE, "Keyframe received, recovering...\n");
201 } else {
202 if (stream_codec_type == AVMEDIA_TYPE_VIDEO) {
203 ctx->drop_until_keyframe = 0;
204 av_log(avf, AV_LOG_VERBOSE, "Video keyframe received, recovering...\n");
205 } else {
206 av_log(avf, AV_LOG_VERBOSE, "Dropping non-video keyframe\n");
207 av_packet_unref(pkt);
208 return 0;
209 }
210 }
211 } else {
212 av_log(avf, AV_LOG_VERBOSE, "Dropping non-keyframe packet\n");
213 av_packet_unref(pkt);
214 return 0;
215 }
216 }
217
218 145 orig_pts = pkt->pts;
219 145 orig_dts = pkt->dts;
220 145 orig_duration = pkt->duration;
221 145 s_idx = pkt->stream_index;
222 145 src_tb = avf->streams[s_idx]->time_base;
223 145 dst_tb = avf2->streams[s_idx]->time_base;
224 145 av_packet_rescale_ts(pkt, src_tb, dst_tb);
225
226 145 ret = av_write_frame(avf2, pkt);
227
2/2
✓ Branch 0 taken 100 times.
✓ Branch 1 taken 45 times.
145 if (ret >= 0) {
228 100 av_packet_unref(pkt);
229 } else {
230 // avoid scaling twice
231 45 pkt->pts = orig_pts;
232 45 pkt->dts = orig_dts;
233 45 pkt->duration = orig_duration;
234 }
235 145 return ret;
236 }
237
238 51 static int fifo_thread_write_trailer(FifoThreadContext *ctx)
239 {
240 51 AVFormatContext *avf = ctx->avf;
241 51 FifoContext *fifo = avf->priv_data;
242 51 AVFormatContext *avf2 = fifo->avf;
243 int ret;
244
245
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 51 times.
51 if (!ctx->header_written)
246 return 0;
247
248 51 ret = av_write_trailer(avf2);
249 51 ff_format_io_close(avf2, &avf2->pb);
250
251 51 return ret;
252 }
253
254 154 static int fifo_thread_dispatch_message(FifoThreadContext *ctx, FifoMessage *msg)
255 {
256 154 int ret = AVERROR(EINVAL);
257
258
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 154 times.
154 if (msg->type == FIFO_NOOP)
259 return 0;
260
261
2/2
✓ Branch 0 taken 51 times.
✓ Branch 1 taken 103 times.
154 if (!ctx->header_written) {
262 51 ret = fifo_thread_write_header(ctx);
263
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 51 times.
51 if (ret < 0)
264 return ret;
265 }
266
267
3/4
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 145 times.
✓ Branch 2 taken 3 times.
✗ Branch 3 not taken.
154 switch(msg->type) {
268 6 case FIFO_WRITE_HEADER:
269
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 av_assert0(ret >= 0);
270 6 return ret;
271 145 case FIFO_WRITE_PACKET:
272 145 return fifo_thread_write_packet(ctx, &msg->pkt);
273 3 case FIFO_FLUSH_OUTPUT:
274 3 return fifo_thread_flush_output(ctx);
275 }
276
277 av_assert0(0);
278 return AVERROR(EINVAL);
279 }
280
281 75 static int is_recoverable(const FifoContext *fifo, int err_no) {
282
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 75 times.
75 if (!fifo->attempt_recovery)
283 return 0;
284
285
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 75 times.
75 if (fifo->recover_any_error)
286 return err_no != AVERROR_EXIT;
287
288
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 75 times.
75 switch (err_no) {
289 case AVERROR(EINVAL):
290 case AVERROR(ENOSYS):
291 case AVERROR_EOF:
292 case AVERROR_EXIT:
293 case AVERROR_PATCHWELCOME:
294 return 0;
295 75 default:
296 75 return 1;
297 }
298 }
299
300 3 static void free_message(void *msg)
301 {
302 3 FifoMessage *fifo_msg = msg;
303
304
1/2
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
3 if (fifo_msg->type == FIFO_WRITE_PACKET)
305 3 av_packet_unref(&fifo_msg->pkt);
306 3 }
307
308 30 static int fifo_thread_process_recovery_failure(FifoThreadContext *ctx, AVPacket *pkt,
309 int err_no)
310 {
311 30 AVFormatContext *avf = ctx->avf;
312 30 FifoContext *fifo = avf->priv_data;
313 int ret;
314
315 30 av_log(avf, AV_LOG_INFO, "Recovery failed: %s\n",
316 30 av_err2str(err_no));
317
318
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 30 times.
30 if (fifo->recovery_wait_streamtime) {
319 if (pkt->pts == AV_NOPTS_VALUE)
320 av_log(avf, AV_LOG_WARNING, "Packet does not contain presentation"
321 " timestamp, recovery will be attempted immediately");
322 ctx->last_recovery_ts = pkt->pts;
323 } else {
324 30 ctx->last_recovery_ts = av_gettime_relative();
325 }
326
327
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 30 times.
30 if (fifo->max_recovery_attempts &&
328 ctx->recovery_nr >= fifo->max_recovery_attempts) {
329 av_log(avf, AV_LOG_ERROR,
330 "Maximal number of %d recovery attempts reached.\n",
331 fifo->max_recovery_attempts);
332 ret = err_no;
333 } else {
334 30 ret = AVERROR(EAGAIN);
335 }
336
337 30 return ret;
338 }
339
340 45 static int fifo_thread_attempt_recovery(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
341 {
342 45 AVFormatContext *avf = ctx->avf;
343 45 FifoContext *fifo = avf->priv_data;
344 45 AVPacket *pkt = &msg->pkt;
345 int64_t time_since_recovery;
346 int ret;
347
348
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 45 times.
45 if (!is_recoverable(fifo, err_no)) {
349 ret = err_no;
350 goto fail;
351 }
352
353
1/2
✓ Branch 0 taken 45 times.
✗ Branch 1 not taken.
45 if (ctx->header_written) {
354 45 fifo->write_trailer_ret = fifo_thread_write_trailer(ctx);
355 45 ctx->header_written = 0;
356 }
357
358
2/2
✓ Branch 0 taken 15 times.
✓ Branch 1 taken 30 times.
45 if (!ctx->recovery_nr) {
359 15 ctx->last_recovery_ts = fifo->recovery_wait_streamtime ?
360
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15 times.
15 AV_NOPTS_VALUE : 0;
361 } else {
362
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 30 times.
30 if (fifo->recovery_wait_streamtime) {
363 if (ctx->last_recovery_ts == AV_NOPTS_VALUE) {
364 AVRational tb = avf->streams[pkt->stream_index]->time_base;
365 time_since_recovery = av_rescale_q(pkt->pts - ctx->last_recovery_ts,
366 tb, AV_TIME_BASE_Q);
367 } else {
368 /* Enforce recovery immediately */
369 time_since_recovery = fifo->recovery_wait_time;
370 }
371 } else {
372 30 time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
373 }
374
375
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 30 times.
30 if (time_since_recovery < fifo->recovery_wait_time)
376 return AVERROR(EAGAIN);
377 }
378
379 45 ctx->recovery_nr++;
380
381
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 45 times.
45 if (fifo->max_recovery_attempts) {
382 av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d/%d\n",
383 ctx->recovery_nr, fifo->max_recovery_attempts);
384 } else {
385 45 av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d\n",
386 ctx->recovery_nr);
387 }
388
389
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
45 if (fifo->restart_with_keyframe && fifo->drop_pkts_on_overflow)
390 ctx->drop_until_keyframe = 1;
391
392 45 ret = fifo_thread_dispatch_message(ctx, msg);
393
2/2
✓ Branch 0 taken 30 times.
✓ Branch 1 taken 15 times.
45 if (ret < 0) {
394
1/2
✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
30 if (is_recoverable(fifo, ret)) {
395 30 return fifo_thread_process_recovery_failure(ctx, pkt, ret);
396 } else {
397 goto fail;
398 }
399 } else {
400 15 av_log(avf, AV_LOG_INFO, "Recovery successful\n");
401 15 ctx->recovery_nr = 0;
402 }
403
404 15 return 0;
405
406 fail:
407 free_message(msg);
408 return ret;
409 }
410
411 15 static int fifo_thread_recover(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
412 {
413 15 AVFormatContext *avf = ctx->avf;
414 15 FifoContext *fifo = avf->priv_data;
415 int ret;
416
417 do {
418
3/4
✓ Branch 0 taken 45 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 30 times.
✓ Branch 3 taken 15 times.
45 if (!fifo->recovery_wait_streamtime && ctx->recovery_nr > 0) {
419 30 int64_t time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
420 30 int64_t time_to_wait = FFMAX(0, fifo->recovery_wait_time - time_since_recovery);
421
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 30 times.
30 if (time_to_wait)
422 av_usleep(FFMIN(10000, time_to_wait));
423 }
424
425 45 ret = fifo_thread_attempt_recovery(ctx, msg, err_no);
426
3/4
✓ Branch 0 taken 30 times.
✓ Branch 1 taken 15 times.
✓ Branch 2 taken 30 times.
✗ Branch 3 not taken.
45 } while (ret == AVERROR(EAGAIN) && !fifo->drop_pkts_on_overflow);
427
428
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
15 if (ret == AVERROR(EAGAIN) && fifo->drop_pkts_on_overflow) {
429 if (msg->type == FIFO_WRITE_PACKET)
430 av_packet_unref(&msg->pkt);
431 ret = 0;
432 }
433
434 15 return ret;
435 }
436
437 6 static void *fifo_consumer_thread(void *data)
438 {
439 6 AVFormatContext *avf = data;
440 6 FifoContext *fifo = avf->priv_data;
441 6 AVThreadMessageQueue *queue = fifo->queue;
442 6 FifoMessage msg = {fifo->timeshift ? FIFO_NOOP : FIFO_WRITE_HEADER, {0}};
443 int ret, i;
444
445 FifoThreadContext fifo_thread_ctx;
446 6 memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext));
447 6 fifo_thread_ctx.avf = avf;
448 6 fifo_thread_ctx.last_received_dts = AV_NOPTS_VALUE;
449
450 6 ff_thread_setname("fifo-consumer");
451
452
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 5 times.
11 for (i = 0; i < avf->nb_streams; i++) {
453
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 5 times.
6 if (avf->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) {
454 1 fifo_thread_ctx.has_video_stream = 1;
455 1 break;
456 }
457 }
458
459 103 while (1) {
460 109 uint8_t just_flushed = 0;
461
462
1/2
✓ Branch 0 taken 109 times.
✗ Branch 1 not taken.
109 if (!fifo_thread_ctx.recovery_nr)
463 109 ret = fifo_thread_dispatch_message(&fifo_thread_ctx, &msg);
464
465
3/4
✓ Branch 0 taken 94 times.
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 94 times.
109 if (ret < 0 || fifo_thread_ctx.recovery_nr > 0) {
466 15 int rec_ret = fifo_thread_recover(&fifo_thread_ctx, &msg, ret);
467
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15 times.
15 if (rec_ret < 0) {
468 av_thread_message_queue_set_err_send(queue, rec_ret);
469 break;
470 }
471 }
472
473 /* If the queue is full at the moment when fifo_write_packet
474 * attempts to insert new message (packet) to the queue,
475 * it sets the fifo->overflow_flag to 1 and drops packet.
476 * Here in consumer thread, the flag is checked and if it is
477 * set, the queue is flushed and flag cleared. */
478 109 pthread_mutex_lock(&fifo->overflow_flag_lock);
479
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 108 times.
109 if (fifo->overflow_flag) {
480 1 av_thread_message_flush(queue);
481
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (fifo->restart_with_keyframe)
482 fifo_thread_ctx.drop_until_keyframe = 1;
483 1 fifo->overflow_flag = 0;
484 1 just_flushed = 1;
485 }
486 109 pthread_mutex_unlock(&fifo->overflow_flag_lock);
487
488
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 108 times.
109 if (just_flushed)
489 1 av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n");
490
491
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 109 times.
109 if (fifo->timeshift)
492 while (atomic_load_explicit(&fifo->queue_duration, memory_order_relaxed) < fifo->timeshift)
493 av_usleep(10000);
494
495 109 ret = av_thread_message_queue_recv(queue, &msg, 0);
496
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 103 times.
109 if (ret < 0) {
497 6 av_thread_message_queue_set_err_send(queue, ret);
498 6 break;
499 }
500 }
501
502 6 fifo->write_trailer_ret = fifo_thread_write_trailer(&fifo_thread_ctx);
503
504 6 return NULL;
505 }
506
507 6 static int fifo_mux_init(AVFormatContext *avf, const AVOutputFormat *oformat,
508 const char *filename)
509 {
510 6 FifoContext *fifo = avf->priv_data;
511 AVFormatContext *avf2;
512 6 int ret = 0, i;
513
514 6 ret = avformat_alloc_output_context2(&avf2, oformat, NULL, filename);
515
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 if (ret < 0)
516 return ret;
517
518 6 fifo->avf = avf2;
519
520 6 avf2->interrupt_callback = avf->interrupt_callback;
521 6 avf2->max_delay = avf->max_delay;
522 6 ret = av_dict_copy(&avf2->metadata, avf->metadata, 0);
523
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 if (ret < 0)
524 return ret;
525 6 avf2->opaque = avf->opaque;
526 6 avf2->io_close2 = avf->io_close2;
527 6 avf2->io_open = avf->io_open;
528 6 avf2->flags = avf->flags;
529
530
2/2
✓ Branch 0 taken 7 times.
✓ Branch 1 taken 6 times.
13 for (i = 0; i < avf->nb_streams; ++i) {
531 7 AVStream *st = ff_stream_clone(avf2, avf->streams[i]);
532
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7 times.
7 if (!st)
533 return AVERROR(ENOMEM);
534 }
535
536 6 return 0;
537 }
538
539 6 static int fifo_init(AVFormatContext *avf)
540 {
541 6 FifoContext *fifo = avf->priv_data;
542 const AVOutputFormat *oformat;
543 6 int ret = 0;
544
545
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
6 if (fifo->recovery_wait_streamtime && !fifo->drop_pkts_on_overflow) {
546 av_log(avf, AV_LOG_ERROR, "recovery_wait_streamtime can be turned on"
547 " only when drop_pkts_on_overflow is also turned on\n");
548 return AVERROR(EINVAL);
549 }
550 6 atomic_init(&fifo->queue_duration, 0);
551 6 fifo->last_sent_dts = AV_NOPTS_VALUE;
552
553 #ifdef FIFO_TEST
554 /* This exists for the fifo_muxer test tool. */
555
2/4
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
4 if (fifo->format && !strcmp(fifo->format, "fifo_test")) {
556 extern const FFOutputFormat ff_fifo_test_muxer;
557 4 oformat = &ff_fifo_test_muxer.p;
558 } else
559 #endif
560 2 oformat = av_guess_format(fifo->format, avf->url, NULL);
561
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 if (!oformat) {
562 ret = AVERROR_MUXER_NOT_FOUND;
563 return ret;
564 }
565
566 6 ret = fifo_mux_init(avf, oformat, avf->url);
567
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 if (ret < 0)
568 return ret;
569
570 6 ret = av_thread_message_queue_alloc(&fifo->queue, (unsigned) fifo->queue_size,
571 sizeof(FifoMessage));
572
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 if (ret < 0)
573 return ret;
574
575 6 av_thread_message_queue_set_free_func(fifo->queue, free_message);
576
577 6 ret = pthread_mutex_init(&fifo->overflow_flag_lock, NULL);
578
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 if (ret < 0)
579 return AVERROR(ret);
580 6 fifo->overflow_flag_lock_initialized = 1;
581
582 6 return 0;
583 }
584
585 6 static int fifo_write_header(AVFormatContext *avf)
586 {
587 6 FifoContext * fifo = avf->priv_data;
588 int ret;
589
590 6 ret = pthread_create(&fifo->writer_thread, NULL, fifo_consumer_thread, avf);
591
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 if (ret) {
592 av_log(avf, AV_LOG_ERROR, "Failed to start thread: %s\n",
593 av_err2str(AVERROR(ret)));
594 ret = AVERROR(ret);
595 }
596
597 6 return ret;
598 }
599
600 109 static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
601 {
602 109 FifoContext *fifo = avf->priv_data;
603
2/2
✓ Branch 0 taken 106 times.
✓ Branch 1 taken 3 times.
109 FifoMessage msg = {.type = pkt ? FIFO_WRITE_PACKET : FIFO_FLUSH_OUTPUT};
604 int ret;
605
606
2/2
✓ Branch 0 taken 106 times.
✓ Branch 1 taken 3 times.
109 if (pkt) {
607 106 ret = av_packet_ref(&msg.pkt,pkt);
608
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 106 times.
106 if (ret < 0)
609 return ret;
610 }
611
612 109 ret = av_thread_message_queue_send(fifo->queue, &msg,
613 109 fifo->drop_pkts_on_overflow ?
614 AV_THREAD_MESSAGE_NONBLOCK : 0);
615
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 106 times.
109 if (ret == AVERROR(EAGAIN)) {
616 3 uint8_t overflow_set = 0;
617
618 /* Queue is full, set fifo->overflow_flag to 1
619 * to let consumer thread know the queue should
620 * be flushed. */
621 3 pthread_mutex_lock(&fifo->overflow_flag_lock);
622
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 2 times.
3 if (!fifo->overflow_flag)
623 1 fifo->overflow_flag = overflow_set = 1;
624 3 pthread_mutex_unlock(&fifo->overflow_flag_lock);
625
626
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 2 times.
3 if (overflow_set)
627 1 av_log(avf, AV_LOG_WARNING, "FIFO queue full\n");
628 3 ret = 0;
629 3 goto fail;
630
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 106 times.
106 } else if (ret < 0) {
631 goto fail;
632 }
633
634
1/6
✗ Branch 0 not taken.
✓ Branch 1 taken 106 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
106 if (fifo->timeshift && pkt && pkt->dts != AV_NOPTS_VALUE)
635 atomic_fetch_add_explicit(&fifo->queue_duration, next_duration(avf, pkt, &fifo->last_sent_dts), memory_order_relaxed);
636
637 106 return ret;
638 3 fail:
639
1/2
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
3 if (pkt)
640 3 av_packet_unref(&msg.pkt);
641 3 return ret;
642 }
643
644 6 static int fifo_write_trailer(AVFormatContext *avf)
645 {
646 6 FifoContext *fifo= avf->priv_data;
647 int ret;
648
649 6 av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF);
650
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 if (fifo->timeshift) {
651 int64_t now = av_gettime_relative();
652 int64_t elapsed = 0;
653 FifoMessage msg = {FIFO_NOOP};
654 do {
655 int64_t delay = av_gettime_relative() - now;
656 if (delay < 0) { // Discontinuity?
657 delay = 10000;
658 now = av_gettime_relative();
659 } else {
660 now += delay;
661 }
662 atomic_fetch_add_explicit(&fifo->queue_duration, delay, memory_order_relaxed);
663 elapsed += delay;
664 if (elapsed > fifo->timeshift)
665 break;
666 av_usleep(10000);
667 ret = av_thread_message_queue_send(fifo->queue, &msg, AV_THREAD_MESSAGE_NONBLOCK);
668 } while (ret >= 0 || ret == AVERROR(EAGAIN));
669 atomic_store(&fifo->queue_duration, INT64_MAX);
670 }
671
672 6 ret = pthread_join(fifo->writer_thread, NULL);
673
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 if (ret < 0) {
674 av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
675 av_err2str(AVERROR(ret)));
676 return AVERROR(ret);
677 }
678
679 6 ret = fifo->write_trailer_ret;
680 6 return ret;
681 }
682
683 6 static void fifo_deinit(AVFormatContext *avf)
684 {
685 6 FifoContext *fifo = avf->priv_data;
686
687 6 avformat_free_context(fifo->avf);
688 6 av_thread_message_queue_free(&fifo->queue);
689
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6 if (fifo->overflow_flag_lock_initialized)
690 6 pthread_mutex_destroy(&fifo->overflow_flag_lock);
691 6 }
692
693 #define OFFSET(x) offsetof(FifoContext, x)
694 static const AVOption options[] = {
695 {"attempt_recovery", "Attempt recovery in case of failure", OFFSET(attempt_recovery),
696 AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
697
698 {"drop_pkts_on_overflow", "Drop packets on fifo queue overflow not to block encoder", OFFSET(drop_pkts_on_overflow),
699 AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
700
701 {"fifo_format", "Target muxer", OFFSET(format),
702 AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
703
704 {"format_opts", "Options to be passed to underlying muxer", OFFSET(format_options),
705 AV_OPT_TYPE_DICT, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
706
707 {"max_recovery_attempts", "Maximal number of recovery attempts", OFFSET(max_recovery_attempts),
708 AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS}, 0, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM},
709
710 {"queue_size", "Size of fifo queue", OFFSET(queue_size),
711 AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_QUEUE_SIZE}, 1, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM},
712
713 {"recovery_wait_streamtime", "Use stream time instead of real time while waiting for recovery",
714 OFFSET(recovery_wait_streamtime), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
715
716 {"recovery_wait_time", "Waiting time between recovery attempts", OFFSET(recovery_wait_time),
717 AV_OPT_TYPE_DURATION, {.i64 = FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM},
718
719 {"recover_any_error", "Attempt recovery regardless of type of the error", OFFSET(recover_any_error),
720 AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
721
722 {"restart_with_keyframe", "Wait for keyframe when restarting output", OFFSET(restart_with_keyframe),
723 AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
724
725 {"timeshift", "Delay fifo output", OFFSET(timeshift),
726 AV_OPT_TYPE_DURATION, {.i64 = 0}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM},
727
728 {NULL},
729 };
730
731 static const AVClass fifo_muxer_class = {
732 .class_name = "Fifo muxer",
733 .item_name = av_default_item_name,
734 .option = options,
735 .version = LIBAVUTIL_VERSION_INT,
736 };
737
738 const FFOutputFormat ff_fifo_muxer = {
739 .p.name = "fifo",
740 .p.long_name = NULL_IF_CONFIG_SMALL("FIFO queue pseudo-muxer"),
741 .p.priv_class = &fifo_muxer_class,
742 .p.flags = AVFMT_NOFILE | AVFMT_TS_NEGATIVE,
743 .priv_data_size = sizeof(FifoContext),
744 .init = fifo_init,
745 .write_header = fifo_write_header,
746 .write_packet = fifo_write_packet,
747 .write_trailer = fifo_write_trailer,
748 .deinit = fifo_deinit,
749 .flags_internal = FF_OFMT_FLAG_ALLOW_FLUSH,
750 };
751