FFmpeg coverage


Directory: ../../../ffmpeg/
File: src/libavformat/tee.c
Date: 2025-01-20 09:27:23
Exec Total Coverage
Lines: 0 329 0.0%
Functions: 0 11 0.0%
Branches: 0 192 0.0%

Line Branch Exec Source
1 /*
2 * Tee pseudo-muxer
3 * Copyright (c) 2012 Nicolas George
4 *
5 * This file is part of FFmpeg.
6 *
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public License
9 * as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public License
18 * along with FFmpeg; if not, write to the Free Software Foundation, Inc.,
19 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22
23 #include "libavutil/avutil.h"
24 #include "libavutil/avstring.h"
25 #include "libavutil/mem.h"
26 #include "libavutil/opt.h"
27 #include "libavcodec/bsf.h"
28 #include "internal.h"
29 #include "avformat.h"
30 #include "mux.h"
31 #include "tee_common.h"
32
/** What the tee muxer does when one of its slaves reports an error. */
33 typedef enum {
34 ON_SLAVE_FAILURE_ABORT = 1, ///< the slave's error is returned to the caller of the tee muxer
35 ON_SLAVE_FAILURE_IGNORE = 2 ///< the slave is closed; the error is swallowed while other slaves are alive
36 } SlaveFailurePolicy;
37
/** Policy used when a slave has no "onfail" option. */
38 #define DEFAULT_SLAVE_FAILURE_POLICY ON_SLAVE_FAILURE_ABORT
39
/** Per-output state: one TeeSlave exists for every slave named in the tee URL. */
40 typedef struct {
41 AVFormatContext *avf; ///< slave muxer context; NULL while the slave is not open (close_slave() resets it)
42 AVBSFContext **bsfs; ///< bitstream filters per stream
43
44 SlaveFailurePolicy on_fail; ///< set from the slave's "onfail" option
45 int use_fifo; ///< nonzero: the slave is opened through the "fifo" muxer
46 AVDictionary *fifo_options; ///< fifo muxer options; handed over to the muxer options in open_slave()
47
48 /** map from input to output streams indexes,
49 * disabled output streams are set to -1 */
50 int *stream_map;
51 int header_written; ///< set once avformat_write_header() succeeded, so close_slave() writes the trailer
52 } TeeSlave;
53
/** Private data of the tee muxer. */
54 typedef struct TeeContext {
55 const AVClass *class; ///< set to &tee_muxer_class through ff_tee_muxer.p.priv_class
56 unsigned nb_slaves; ///< number of entries in slaves
57 unsigned nb_alive; ///< slaves that have not failed; decremented by tee_process_slave_failure()
58 TeeSlave *slaves; ///< array of nb_slaves elements, allocated in tee_write_header()
59 int use_fifo; ///< "use_fifo" option: default copied into every slave
60 AVDictionary *fifo_options; ///< "fifo_options" option: default copied into every slave
61 } TeeContext;
62
/* Separates the individual slave specifications inside the tee URL. */
63 static const char *const slave_delim = "|";
/* Expected between "bsfs" and the stream specifier in a bsfs option key. */
64 static const char *const slave_bsfs_spec_sep = "/";
/* Separates the stream specifiers listed in a "select" option value. */
65 static const char *const slave_select_sep = ",";
66
/* Offset of a TeeContext field, for use in the option table. */
67 #define OFFSET(x) offsetof(TeeContext, x)
/**
 * Private options of the tee muxer. Both values act as defaults that
 * tee_write_header() copies into every slave before the slave's own
 * options are parsed.
 */
68 static const AVOption options[] = {
69 {"use_fifo", "Use fifo pseudo-muxer to separate actual muxers from encoder",
70 OFFSET(use_fifo), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
71 {"fifo_options", "fifo pseudo-muxer options", OFFSET(fifo_options),
72 AV_OPT_TYPE_DICT, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
73 {NULL}
74 };
75
/** AVClass exposing the option table above; used as the private class of ff_tee_muxer. */
76 static const AVClass tee_muxer_class = {
77 .class_name = "Tee muxer",
78 .item_name = av_default_item_name,
79 .option = options,
80 .version = LIBAVUTIL_VERSION_INT,
81 };
82
83 static inline int parse_slave_failure_policy_option(const char *opt, TeeSlave *tee_slave)
84 {
85 if (!opt) {
86 tee_slave->on_fail = DEFAULT_SLAVE_FAILURE_POLICY;
87 return 0;
88 } else if (!av_strcasecmp("abort", opt)) {
89 tee_slave->on_fail = ON_SLAVE_FAILURE_ABORT;
90 return 0;
91 } else if (!av_strcasecmp("ignore", opt)) {
92 tee_slave->on_fail = ON_SLAVE_FAILURE_IGNORE;
93 return 0;
94 }
95 /* Set failure behaviour to abort, so invalid option error will not be ignored */
96 tee_slave->on_fail = ON_SLAVE_FAILURE_ABORT;
97 return AVERROR(EINVAL);
98 }
99
100 static int parse_slave_fifo_policy(const char *use_fifo, TeeSlave *tee_slave)
101 {
102 /*TODO - change this to use proper function for parsing boolean
103 * options when there is one */
104 if (av_match_name(use_fifo, "true,y,yes,enable,enabled,on,1")) {
105 tee_slave->use_fifo = 1;
106 } else if (av_match_name(use_fifo, "false,n,no,disable,disabled,off,0")) {
107 tee_slave->use_fifo = 0;
108 } else {
109 return AVERROR(EINVAL);
110 }
111 return 0;
112 }
113
114 static int parse_slave_fifo_options(const char *fifo_options, TeeSlave *tee_slave)
115 {
116 return av_dict_parse_string(&tee_slave->fifo_options, fifo_options, "=", ":", 0);
117 }
118
119 static int close_slave(TeeSlave *tee_slave)
120 {
121 AVFormatContext *avf;
122 int ret = 0;
123
124 av_dict_free(&tee_slave->fifo_options);
125 avf = tee_slave->avf;
126 if (!avf)
127 return 0;
128
129 if (tee_slave->header_written)
130 ret = av_write_trailer(avf);
131
132 if (tee_slave->bsfs) {
133 for (unsigned i = 0; i < avf->nb_streams; ++i)
134 av_bsf_free(&tee_slave->bsfs[i]);
135 }
136 av_freep(&tee_slave->stream_map);
137 av_freep(&tee_slave->bsfs);
138
139 ff_format_io_close(avf, &avf->pb);
140 avformat_free_context(avf);
141 tee_slave->avf = NULL;
142 return ret;
143 }
144
145 static void close_slaves(AVFormatContext *avf)
146 {
147 TeeContext *tee = avf->priv_data;
148
149 for (unsigned i = 0; i < tee->nb_slaves; i++) {
150 close_slave(&tee->slaves[i]);
151 }
152 av_freep(&tee->slaves);
153 }
154
155 static int open_slave(AVFormatContext *avf, char *slave, TeeSlave *tee_slave)
156 {
157 int ret;
158 AVDictionary *options = NULL, *bsf_options = NULL;
159 const AVDictionaryEntry *entry;
160 char *filename;
161 char *format = NULL, *select = NULL;
162 AVFormatContext *avf2 = NULL;
163 int stream_count;
164 int fullret;
165 char *subselect = NULL, *next_subselect = NULL, *first_subselect = NULL, *tmp_select = NULL;
166
167 if ((ret = ff_tee_parse_slave_options(avf, slave, &options, &filename)) < 0)
168 return ret;
169
170 #define CONSUME_OPTION(option, field, action) do { \
171 AVDictionaryEntry *en = av_dict_get(options, option, NULL, 0); \
172 if (en) { \
173 field = en->value; \
174 { action } \
175 av_dict_set(&options, option, NULL, 0); \
176 } \
177 } while (0)
178 #define STEAL_OPTION(option, field) \
179 CONSUME_OPTION(option, field, \
180 en->value = NULL; /* prevent it from being freed */)
181 #define PROCESS_OPTION(option, function, on_error) do { \
182 const char *value; \
183 CONSUME_OPTION(option, value, if ((ret = function) < 0) \
184 { { on_error } goto end; }); \
185 } while (0)
186
187 STEAL_OPTION("f", format);
188 STEAL_OPTION("select", select);
189 PROCESS_OPTION("onfail",
190 parse_slave_failure_policy_option(value, tee_slave),
191 av_log(avf, AV_LOG_ERROR, "Invalid onfail option value, "
192 "valid options are 'abort' and 'ignore'\n"););
193 PROCESS_OPTION("use_fifo",
194 parse_slave_fifo_policy(value, tee_slave),
195 av_log(avf, AV_LOG_ERROR, "Error parsing fifo options: %s\n",
196 av_err2str(ret)););
197 PROCESS_OPTION("fifo_options",
198 parse_slave_fifo_options(value, tee_slave), ;);
199 entry = NULL;
200 while ((entry = av_dict_get(options, "bsfs", entry, AV_DICT_IGNORE_SUFFIX))) {
201 /* trim out strlen("bsfs") characters from key */
202 av_dict_set(&bsf_options, entry->key + 4, entry->value, 0);
203 av_dict_set(&options, entry->key, NULL, 0);
204 }
205
206 if (tee_slave->use_fifo) {
207
208 if (options) {
209 char *format_options_str = NULL;
210 ret = av_dict_get_string(options, &format_options_str, '=', ':');
211 if (ret < 0)
212 goto end;
213
214 ret = av_dict_set(&tee_slave->fifo_options, "format_opts", format_options_str,
215 AV_DICT_DONT_STRDUP_VAL);
216 if (ret < 0)
217 goto end;
218 }
219
220 if (format) {
221 ret = av_dict_set(&tee_slave->fifo_options, "fifo_format", format,
222 AV_DICT_DONT_STRDUP_VAL);
223 format = NULL;
224 if (ret < 0)
225 goto end;
226 }
227
228 av_dict_free(&options);
229 options = tee_slave->fifo_options;
230 tee_slave->fifo_options = NULL;
231 }
232 ret = avformat_alloc_output_context2(&avf2, NULL,
233 tee_slave->use_fifo ? "fifo" :format, filename);
234 if (ret < 0)
235 goto end;
236 tee_slave->avf = avf2;
237 av_dict_copy(&avf2->metadata, avf->metadata, 0);
238 avf2->opaque = avf->opaque;
239 avf2->io_open = avf->io_open;
240 avf2->io_close2 = avf->io_close2;
241 avf2->interrupt_callback = avf->interrupt_callback;
242 avf2->flags = avf->flags;
243 avf2->strict_std_compliance = avf->strict_std_compliance;
244
245 tee_slave->stream_map = av_calloc(avf->nb_streams, sizeof(*tee_slave->stream_map));
246 if (!tee_slave->stream_map) {
247 ret = AVERROR(ENOMEM);
248 goto end;
249 }
250
251 stream_count = 0;
252 for (unsigned i = 0; i < avf->nb_streams; i++) {
253 const AVStream *st = avf->streams[i];
254 AVStream *st2;
255 if (select) {
256 tmp_select = av_strdup(select); // av_strtok is destructive so we regenerate it in each loop
257 if (!tmp_select) {
258 ret = AVERROR(ENOMEM);
259 goto end;
260 }
261 fullret = 0;
262 first_subselect = tmp_select;
263 next_subselect = NULL;
264 while (subselect = av_strtok(first_subselect, slave_select_sep, &next_subselect)) {
265 first_subselect = NULL;
266
267 ret = avformat_match_stream_specifier(avf, avf->streams[i], subselect);
268 if (ret < 0) {
269 av_log(avf, AV_LOG_ERROR,
270 "Invalid stream specifier '%s' for output '%s'\n",
271 subselect, slave);
272 goto end;
273 }
274 if (ret != 0) {
275 fullret = 1; // match
276 break;
277 }
278 }
279 av_freep(&tmp_select);
280
281 if (fullret == 0) { /* no match */
282 tee_slave->stream_map[i] = -1;
283 continue;
284 }
285 }
286 tee_slave->stream_map[i] = stream_count++;
287
288 st2 = ff_stream_clone(avf2, st);
289 if (!st2) {
290 ret = AVERROR(ENOMEM);
291 goto end;
292 }
293 }
294
295 ret = ff_format_output_open(avf2, filename, &options);
296 if (ret < 0) {
297 av_log(avf, AV_LOG_ERROR, "Slave '%s': error opening: %s\n", slave,
298 av_err2str(ret));
299 goto end;
300 }
301
302 if ((ret = avformat_write_header(avf2, &options)) < 0) {
303 av_log(avf, AV_LOG_ERROR, "Slave '%s': error writing header: %s\n",
304 slave, av_err2str(ret));
305 goto end;
306 }
307 tee_slave->header_written = 1;
308
309 tee_slave->bsfs = av_calloc(avf2->nb_streams, sizeof(*tee_slave->bsfs));
310 if (!tee_slave->bsfs) {
311 ret = AVERROR(ENOMEM);
312 goto end;
313 }
314
315 entry = NULL;
316 while (entry = av_dict_iterate(bsf_options, NULL)) {
317 const char *spec = entry->key;
318 if (*spec) {
319 if (strspn(spec, slave_bsfs_spec_sep) != 1) {
320 av_log(avf, AV_LOG_ERROR,
321 "Specifier separator in '%s' is '%c', but only characters '%s' "
322 "are allowed\n", entry->key, *spec, slave_bsfs_spec_sep);
323 ret = AVERROR(EINVAL);
324 goto end;
325 }
326 spec++; /* consume separator */
327 }
328
329 for (unsigned i = 0; i < avf2->nb_streams; i++) {
330 ret = avformat_match_stream_specifier(avf2, avf2->streams[i], spec);
331 if (ret < 0) {
332 av_log(avf, AV_LOG_ERROR,
333 "Invalid stream specifier '%s' in bsfs option '%s' for slave "
334 "output '%s'\n", spec, entry->key, filename);
335 goto end;
336 }
337
338 if (ret > 0) {
339 av_log(avf, AV_LOG_DEBUG, "spec:%s bsfs:%s matches stream %d of slave "
340 "output '%s'\n", spec, entry->value, i, filename);
341 if (tee_slave->bsfs[i]) {
342 av_log(avf, AV_LOG_WARNING,
343 "Duplicate bsfs specification associated to stream %d of slave "
344 "output '%s', filters will be ignored\n", i, filename);
345 continue;
346 }
347 ret = av_bsf_list_parse_str(entry->value, &tee_slave->bsfs[i]);
348 if (ret < 0) {
349 av_log(avf, AV_LOG_ERROR,
350 "Error parsing bitstream filter sequence '%s' associated to "
351 "stream %d of slave output '%s'\n", entry->value, i, filename);
352 goto end;
353 }
354 }
355 }
356
357 av_dict_set(&bsf_options, entry->key, NULL, 0);
358 }
359
360 for (unsigned i = 0; i < avf->nb_streams; i++){
361 int target_stream = tee_slave->stream_map[i];
362 if (target_stream < 0)
363 continue;
364
365 if (!tee_slave->bsfs[target_stream]) {
366 /* Add pass-through bitstream filter */
367 ret = av_bsf_get_null_filter(&tee_slave->bsfs[target_stream]);
368 if (ret < 0) {
369 av_log(avf, AV_LOG_ERROR,
370 "Failed to create pass-through bitstream filter: %s\n",
371 av_err2str(ret));
372 goto end;
373 }
374 }
375
376 tee_slave->bsfs[target_stream]->time_base_in = avf->streams[i]->time_base;
377 ret = avcodec_parameters_copy(tee_slave->bsfs[target_stream]->par_in,
378 avf->streams[i]->codecpar);
379 if (ret < 0)
380 goto end;
381
382 ret = av_bsf_init(tee_slave->bsfs[target_stream]);
383 if (ret < 0) {
384 av_log(avf, AV_LOG_ERROR,
385 "Failed to initialize bitstream filter(s): %s\n",
386 av_err2str(ret));
387 goto end;
388 }
389 }
390
391 if (options) {
392 entry = NULL;
393 while ((entry = av_dict_iterate(options, entry)))
394 av_log(avf2, AV_LOG_ERROR, "Unknown option '%s'\n", entry->key);
395 ret = AVERROR_OPTION_NOT_FOUND;
396 goto end;
397 }
398
399 end:
400 av_free(format);
401 av_free(select);
402 av_dict_free(&options);
403 av_dict_free(&bsf_options);
404 av_freep(&tmp_select);
405 return ret;
406 }
407
408 static void log_slave(TeeSlave *slave, void *log_ctx, int log_level)
409 {
410 av_log(log_ctx, log_level, "filename:'%s' format:%s\n",
411 slave->avf->url, slave->avf->oformat->name);
412 for (unsigned i = 0; i < slave->avf->nb_streams; i++) {
413 AVStream *st = slave->avf->streams[i];
414 AVBSFContext *bsf = slave->bsfs[i];
415 const char *bsf_name;
416
417 av_log(log_ctx, log_level, " stream:%d codec:%s type:%s",
418 i, avcodec_get_name(st->codecpar->codec_id),
419 av_get_media_type_string(st->codecpar->codec_type));
420
421 bsf_name = bsf->filter->priv_class ?
422 bsf->filter->priv_class->item_name(bsf) : bsf->filter->name;
423 av_log(log_ctx, log_level, " bsfs: %s\n", bsf_name);
424 }
425 }
426
427 static int tee_process_slave_failure(AVFormatContext *avf, unsigned slave_idx, int err_n)
428 {
429 TeeContext *tee = avf->priv_data;
430 TeeSlave *tee_slave = &tee->slaves[slave_idx];
431
432 tee->nb_alive--;
433
434 close_slave(tee_slave);
435
436 if (!tee->nb_alive) {
437 av_log(avf, AV_LOG_ERROR, "All tee outputs failed.\n");
438 return err_n;
439 } else if (tee_slave->on_fail == ON_SLAVE_FAILURE_ABORT) {
440 av_log(avf, AV_LOG_ERROR, "Slave muxer #%u failed, aborting.\n", slave_idx);
441 return err_n;
442 } else {
443 av_log(avf, AV_LOG_ERROR, "Slave muxer #%u failed: %s, continuing with %u/%u slaves.\n",
444 slave_idx, av_err2str(err_n), tee->nb_alive, tee->nb_slaves);
445 return 0;
446 }
447 }
448
449 static int tee_write_header(AVFormatContext *avf)
450 {
451 TeeContext *tee = avf->priv_data;
452 unsigned nb_slaves = 0;
453 const char *filename = avf->url;
454 char **slaves = NULL;
455 int ret;
456
457 while (*filename) {
458 char *slave = av_get_token(&filename, slave_delim);
459 if (!slave) {
460 ret = AVERROR(ENOMEM);
461 goto fail;
462 }
463 ret = av_dynarray_add_nofree(&slaves, &nb_slaves, slave);
464 if (ret < 0) {
465 av_free(slave);
466 goto fail;
467 }
468 if (strspn(filename, slave_delim))
469 filename++;
470 }
471
472 if (!FF_ALLOCZ_TYPED_ARRAY(tee->slaves, nb_slaves)) {
473 ret = AVERROR(ENOMEM);
474 goto fail;
475 }
476 tee->nb_slaves = tee->nb_alive = nb_slaves;
477
478 for (unsigned i = 0; i < nb_slaves; i++) {
479
480 tee->slaves[i].use_fifo = tee->use_fifo;
481 ret = av_dict_copy(&tee->slaves[i].fifo_options, tee->fifo_options, 0);
482 if (ret < 0)
483 goto fail;
484
485 if ((ret = open_slave(avf, slaves[i], &tee->slaves[i])) < 0) {
486 ret = tee_process_slave_failure(avf, i, ret);
487 if (ret < 0)
488 goto fail;
489 } else {
490 log_slave(&tee->slaves[i], avf, AV_LOG_VERBOSE);
491 }
492 av_freep(&slaves[i]);
493 }
494
495 for (unsigned i = 0; i < avf->nb_streams; i++) {
496 int mapped = 0;
497 for (unsigned j = 0; j < tee->nb_slaves; j++)
498 if (tee->slaves[j].avf)
499 mapped += tee->slaves[j].stream_map[i] >= 0;
500 if (!mapped)
501 av_log(avf, AV_LOG_WARNING, "Input stream #%d is not mapped "
502 "to any slave.\n", i);
503 }
504 av_free(slaves);
505 return 0;
506
507 fail:
508 for (unsigned i = 0; i < nb_slaves; i++)
509 av_freep(&slaves[i]);
510 close_slaves(avf);
511 av_free(slaves);
512 return ret;
513 }
514
515 static int tee_write_trailer(AVFormatContext *avf)
516 {
517 TeeContext *tee = avf->priv_data;
518 int ret_all = 0, ret;
519
520 for (unsigned i = 0; i < tee->nb_slaves; i++) {
521 if ((ret = close_slave(&tee->slaves[i])) < 0) {
522 ret = tee_process_slave_failure(avf, i, ret);
523 if (!ret_all && ret < 0)
524 ret_all = ret;
525 }
526 }
527 av_freep(&tee->slaves);
528 return ret_all;
529 }
530
531 static int tee_write_packet(AVFormatContext *avf, AVPacket *pkt)
532 {
533 TeeContext *tee = avf->priv_data;
534 AVPacket *const pkt2 = ffformatcontext(avf)->pkt;
535 int ret_all = 0, ret;
536 unsigned s;
537 int s2;
538
539 for (unsigned i = 0; i < tee->nb_slaves; i++) {
540 AVFormatContext *avf2 = tee->slaves[i].avf;
541 AVBSFContext *bsfs;
542
543 if (!avf2)
544 continue;
545
546 /* Flush slave if pkt is NULL*/
547 if (!pkt) {
548 ret = av_interleaved_write_frame(avf2, NULL);
549 if (ret < 0) {
550 ret = tee_process_slave_failure(avf, i, ret);
551 if (!ret_all && ret < 0)
552 ret_all = ret;
553 }
554 continue;
555 }
556
557 s = pkt->stream_index;
558 s2 = tee->slaves[i].stream_map[s];
559 if (s2 < 0)
560 continue;
561
562 if ((ret = av_packet_ref(pkt2, pkt)) < 0) {
563 if (!ret_all)
564 ret_all = ret;
565 continue;
566 }
567 bsfs = tee->slaves[i].bsfs[s2];
568 pkt2->stream_index = s2;
569
570 ret = av_bsf_send_packet(bsfs, pkt2);
571 if (ret < 0) {
572 av_packet_unref(pkt2);
573 av_log(avf, AV_LOG_ERROR, "Error while sending packet to bitstream filter: %s\n",
574 av_err2str(ret));
575 ret = tee_process_slave_failure(avf, i, ret);
576 if (!ret_all && ret < 0)
577 ret_all = ret;
578 }
579
580 while(1) {
581 ret = av_bsf_receive_packet(bsfs, pkt2);
582 if (ret == AVERROR(EAGAIN)) {
583 ret = 0;
584 break;
585 } else if (ret < 0) {
586 break;
587 }
588
589 av_packet_rescale_ts(pkt2, bsfs->time_base_out,
590 avf2->streams[s2]->time_base);
591 ret = av_interleaved_write_frame(avf2, pkt2);
592 if (ret < 0)
593 break;
594 };
595
596 if (ret < 0) {
597 ret = tee_process_slave_failure(avf, i, ret);
598 if (!ret_all && ret < 0)
599 ret_all = ret;
600 }
601 }
602 return ret_all;
603 }
604
/**
 * Definition of the "tee" muxer: TeeContext as private data, the three
 * callbacks defined in this file, and tee_muxer_class for its options.
 */
605 const FFOutputFormat ff_tee_muxer = {
606 .p.name = "tee",
607 .p.long_name = NULL_IF_CONFIG_SMALL("Multiple muxer tee"),
608 .priv_data_size = sizeof(TeeContext),
609 .write_header = tee_write_header,
610 .write_trailer = tee_write_trailer,
611 .write_packet = tee_write_packet,
612 .p.priv_class = &tee_muxer_class,
/* The public AVFMT_ALLOW_FLUSH flag is only set while FF_API_ALLOW_FLUSH is
 * enabled; the internal flag below is set unconditionally. */
613 #if FF_API_ALLOW_FLUSH
614 .p.flags = AVFMT_NOFILE | AVFMT_ALLOW_FLUSH | AVFMT_TS_NEGATIVE,
615 #else
616 .p.flags = AVFMT_NOFILE | AVFMT_TS_NEGATIVE,
617 #endif
618 .flags_internal = FF_OFMT_FLAG_ALLOW_FLUSH,
619 };
620