FFmpeg coverage


Directory: ../../../ffmpeg/
File: src/libavformat/tee.c
Date: 2024-04-26 14:42:52
Exec Total Coverage
Lines: 0 326 0.0%
Functions: 0 11 0.0%
Branches: 0 192 0.0%

Line Branch Exec Source
1 /*
2 * Tee pseudo-muxer
3 * Copyright (c) 2012 Nicolas George
4 *
5 * This file is part of FFmpeg.
6 *
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public License
9 * as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public License
18 * along with FFmpeg; if not, write to the Free Software * Foundation, Inc.,
19 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22
23 #include "libavutil/avutil.h"
24 #include "libavutil/avstring.h"
25 #include "libavutil/mem.h"
26 #include "libavutil/opt.h"
27 #include "libavcodec/bsf.h"
28 #include "internal.h"
29 #include "avformat.h"
30 #include "mux.h"
31 #include "tee_common.h"
32
33 typedef enum {
34 ON_SLAVE_FAILURE_ABORT = 1,
35 ON_SLAVE_FAILURE_IGNORE = 2
36 } SlaveFailurePolicy;
37
38 #define DEFAULT_SLAVE_FAILURE_POLICY ON_SLAVE_FAILURE_ABORT
39
40 typedef struct {
41 AVFormatContext *avf;
42 AVBSFContext **bsfs; ///< bitstream filters per stream
43
44 SlaveFailurePolicy on_fail;
45 int use_fifo;
46 AVDictionary *fifo_options;
47
48 /** map from input to output streams indexes,
49 * disabled output streams are set to -1 */
50 int *stream_map;
51 int header_written;
52 } TeeSlave;
53
54 typedef struct TeeContext {
55 const AVClass *class;
56 unsigned nb_slaves;
57 unsigned nb_alive;
58 TeeSlave *slaves;
59 int use_fifo;
60 AVDictionary *fifo_options;
61 } TeeContext;
62
63 static const char *const slave_delim = "|";
64 static const char *const slave_bsfs_spec_sep = "/";
65 static const char *const slave_select_sep = ",";
66
67 #define OFFSET(x) offsetof(TeeContext, x)
68 static const AVOption options[] = {
69 {"use_fifo", "Use fifo pseudo-muxer to separate actual muxers from encoder",
70 OFFSET(use_fifo), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
71 {"fifo_options", "fifo pseudo-muxer options", OFFSET(fifo_options),
72 AV_OPT_TYPE_DICT, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
73 {NULL}
74 };
75
76 static const AVClass tee_muxer_class = {
77 .class_name = "Tee muxer",
78 .item_name = av_default_item_name,
79 .option = options,
80 .version = LIBAVUTIL_VERSION_INT,
81 };
82
83 static inline int parse_slave_failure_policy_option(const char *opt, TeeSlave *tee_slave)
84 {
85 if (!opt) {
86 tee_slave->on_fail = DEFAULT_SLAVE_FAILURE_POLICY;
87 return 0;
88 } else if (!av_strcasecmp("abort", opt)) {
89 tee_slave->on_fail = ON_SLAVE_FAILURE_ABORT;
90 return 0;
91 } else if (!av_strcasecmp("ignore", opt)) {
92 tee_slave->on_fail = ON_SLAVE_FAILURE_IGNORE;
93 return 0;
94 }
95 /* Set failure behaviour to abort, so invalid option error will not be ignored */
96 tee_slave->on_fail = ON_SLAVE_FAILURE_ABORT;
97 return AVERROR(EINVAL);
98 }
99
100 static int parse_slave_fifo_policy(const char *use_fifo, TeeSlave *tee_slave)
101 {
102 /*TODO - change this to use proper function for parsing boolean
103 * options when there is one */
104 if (av_match_name(use_fifo, "true,y,yes,enable,enabled,on,1")) {
105 tee_slave->use_fifo = 1;
106 } else if (av_match_name(use_fifo, "false,n,no,disable,disabled,off,0")) {
107 tee_slave->use_fifo = 0;
108 } else {
109 return AVERROR(EINVAL);
110 }
111 return 0;
112 }
113
114 static int parse_slave_fifo_options(const char *fifo_options, TeeSlave *tee_slave)
115 {
116 return av_dict_parse_string(&tee_slave->fifo_options, fifo_options, "=", ":", 0);
117 }
118
119 static int close_slave(TeeSlave *tee_slave)
120 {
121 AVFormatContext *avf;
122 unsigned i;
123 int ret = 0;
124
125 av_dict_free(&tee_slave->fifo_options);
126 avf = tee_slave->avf;
127 if (!avf)
128 return 0;
129
130 if (tee_slave->header_written)
131 ret = av_write_trailer(avf);
132
133 if (tee_slave->bsfs) {
134 for (i = 0; i < avf->nb_streams; ++i)
135 av_bsf_free(&tee_slave->bsfs[i]);
136 }
137 av_freep(&tee_slave->stream_map);
138 av_freep(&tee_slave->bsfs);
139
140 ff_format_io_close(avf, &avf->pb);
141 avformat_free_context(avf);
142 tee_slave->avf = NULL;
143 return ret;
144 }
145
146 static void close_slaves(AVFormatContext *avf)
147 {
148 TeeContext *tee = avf->priv_data;
149 unsigned i;
150
151 for (i = 0; i < tee->nb_slaves; i++) {
152 close_slave(&tee->slaves[i]);
153 }
154 av_freep(&tee->slaves);
155 }
156
157 static int open_slave(AVFormatContext *avf, char *slave, TeeSlave *tee_slave)
158 {
159 int i, ret;
160 AVDictionary *options = NULL, *bsf_options = NULL;
161 AVDictionaryEntry *entry;
162 char *filename;
163 char *format = NULL, *select = NULL, *on_fail = NULL;
164 char *use_fifo = NULL, *fifo_options_str = NULL;
165 AVFormatContext *avf2 = NULL;
166 AVStream *st, *st2;
167 int stream_count;
168 int fullret;
169 char *subselect = NULL, *next_subselect = NULL, *first_subselect = NULL, *tmp_select = NULL;
170
171 if ((ret = ff_tee_parse_slave_options(avf, slave, &options, &filename)) < 0)
172 return ret;
173
174 #define CONSUME_OPTION(option, field, action) do { \
175 if ((entry = av_dict_get(options, option, NULL, 0))) { \
176 field = entry->value; \
177 { action } \
178 av_dict_set(&options, option, NULL, 0); \
179 } \
180 } while (0)
181 #define STEAL_OPTION(option, field) \
182 CONSUME_OPTION(option, field, \
183 entry->value = NULL; /* prevent it from being freed */)
184 #define PROCESS_OPTION(option, field, function, on_error) \
185 CONSUME_OPTION(option, field, if ((ret = function) < 0) { { on_error } goto end; })
186
187 STEAL_OPTION("f", format);
188 STEAL_OPTION("select", select);
189 PROCESS_OPTION("onfail", on_fail,
190 parse_slave_failure_policy_option(on_fail, tee_slave),
191 av_log(avf, AV_LOG_ERROR, "Invalid onfail option value, "
192 "valid options are 'abort' and 'ignore'\n"););
193 PROCESS_OPTION("use_fifo", use_fifo,
194 parse_slave_fifo_policy(use_fifo, tee_slave),
195 av_log(avf, AV_LOG_ERROR, "Error parsing fifo options: %s\n",
196 av_err2str(ret)););
197 PROCESS_OPTION("fifo_options", fifo_options_str,
198 parse_slave_fifo_options(fifo_options_str, tee_slave), ;);
199 entry = NULL;
200 while ((entry = av_dict_get(options, "bsfs", entry, AV_DICT_IGNORE_SUFFIX))) {
201 /* trim out strlen("bsfs") characters from key */
202 av_dict_set(&bsf_options, entry->key + 4, entry->value, 0);
203 av_dict_set(&options, entry->key, NULL, 0);
204 }
205
206 if (tee_slave->use_fifo) {
207
208 if (options) {
209 char *format_options_str = NULL;
210 ret = av_dict_get_string(options, &format_options_str, '=', ':');
211 if (ret < 0)
212 goto end;
213
214 ret = av_dict_set(&tee_slave->fifo_options, "format_opts", format_options_str,
215 AV_DICT_DONT_STRDUP_VAL);
216 if (ret < 0)
217 goto end;
218 }
219
220 if (format) {
221 ret = av_dict_set(&tee_slave->fifo_options, "fifo_format", format,
222 AV_DICT_DONT_STRDUP_VAL);
223 format = NULL;
224 if (ret < 0)
225 goto end;
226 }
227
228 av_dict_free(&options);
229 options = tee_slave->fifo_options;
230 tee_slave->fifo_options = NULL;
231 }
232 ret = avformat_alloc_output_context2(&avf2, NULL,
233 tee_slave->use_fifo ? "fifo" :format, filename);
234 if (ret < 0)
235 goto end;
236 tee_slave->avf = avf2;
237 av_dict_copy(&avf2->metadata, avf->metadata, 0);
238 avf2->opaque = avf->opaque;
239 avf2->io_open = avf->io_open;
240 avf2->io_close2 = avf->io_close2;
241 avf2->interrupt_callback = avf->interrupt_callback;
242 avf2->flags = avf->flags;
243 avf2->strict_std_compliance = avf->strict_std_compliance;
244
245 tee_slave->stream_map = av_calloc(avf->nb_streams, sizeof(*tee_slave->stream_map));
246 if (!tee_slave->stream_map) {
247 ret = AVERROR(ENOMEM);
248 goto end;
249 }
250
251 stream_count = 0;
252 for (i = 0; i < avf->nb_streams; i++) {
253 st = avf->streams[i];
254 if (select) {
255 tmp_select = av_strdup(select); // av_strtok is destructive so we regenerate it in each loop
256 if (!tmp_select) {
257 ret = AVERROR(ENOMEM);
258 goto end;
259 }
260 fullret = 0;
261 first_subselect = tmp_select;
262 next_subselect = NULL;
263 while (subselect = av_strtok(first_subselect, slave_select_sep, &next_subselect)) {
264 first_subselect = NULL;
265
266 ret = avformat_match_stream_specifier(avf, avf->streams[i], subselect);
267 if (ret < 0) {
268 av_log(avf, AV_LOG_ERROR,
269 "Invalid stream specifier '%s' for output '%s'\n",
270 subselect, slave);
271 goto end;
272 }
273 if (ret != 0) {
274 fullret = 1; // match
275 break;
276 }
277 }
278 av_freep(&tmp_select);
279
280 if (fullret == 0) { /* no match */
281 tee_slave->stream_map[i] = -1;
282 continue;
283 }
284 }
285 tee_slave->stream_map[i] = stream_count++;
286
287 st2 = ff_stream_clone(avf2, st);
288 if (!st2) {
289 ret = AVERROR(ENOMEM);
290 goto end;
291 }
292 }
293
294 ret = ff_format_output_open(avf2, filename, &options);
295 if (ret < 0) {
296 av_log(avf, AV_LOG_ERROR, "Slave '%s': error opening: %s\n", slave,
297 av_err2str(ret));
298 goto end;
299 }
300
301 if ((ret = avformat_write_header(avf2, &options)) < 0) {
302 av_log(avf, AV_LOG_ERROR, "Slave '%s': error writing header: %s\n",
303 slave, av_err2str(ret));
304 goto end;
305 }
306 tee_slave->header_written = 1;
307
308 tee_slave->bsfs = av_calloc(avf2->nb_streams, sizeof(*tee_slave->bsfs));
309 if (!tee_slave->bsfs) {
310 ret = AVERROR(ENOMEM);
311 goto end;
312 }
313
314 entry = NULL;
315 while (entry = av_dict_get(bsf_options, "", NULL, AV_DICT_IGNORE_SUFFIX)) {
316 const char *spec = entry->key;
317 if (*spec) {
318 if (strspn(spec, slave_bsfs_spec_sep) != 1) {
319 av_log(avf, AV_LOG_ERROR,
320 "Specifier separator in '%s' is '%c', but only characters '%s' "
321 "are allowed\n", entry->key, *spec, slave_bsfs_spec_sep);
322 ret = AVERROR(EINVAL);
323 goto end;
324 }
325 spec++; /* consume separator */
326 }
327
328 for (i = 0; i < avf2->nb_streams; i++) {
329 ret = avformat_match_stream_specifier(avf2, avf2->streams[i], spec);
330 if (ret < 0) {
331 av_log(avf, AV_LOG_ERROR,
332 "Invalid stream specifier '%s' in bsfs option '%s' for slave "
333 "output '%s'\n", spec, entry->key, filename);
334 goto end;
335 }
336
337 if (ret > 0) {
338 av_log(avf, AV_LOG_DEBUG, "spec:%s bsfs:%s matches stream %d of slave "
339 "output '%s'\n", spec, entry->value, i, filename);
340 if (tee_slave->bsfs[i]) {
341 av_log(avf, AV_LOG_WARNING,
342 "Duplicate bsfs specification associated to stream %d of slave "
343 "output '%s', filters will be ignored\n", i, filename);
344 continue;
345 }
346 ret = av_bsf_list_parse_str(entry->value, &tee_slave->bsfs[i]);
347 if (ret < 0) {
348 av_log(avf, AV_LOG_ERROR,
349 "Error parsing bitstream filter sequence '%s' associated to "
350 "stream %d of slave output '%s'\n", entry->value, i, filename);
351 goto end;
352 }
353 }
354 }
355
356 av_dict_set(&bsf_options, entry->key, NULL, 0);
357 }
358
359 for (i = 0; i < avf->nb_streams; i++){
360 int target_stream = tee_slave->stream_map[i];
361 if (target_stream < 0)
362 continue;
363
364 if (!tee_slave->bsfs[target_stream]) {
365 /* Add pass-through bitstream filter */
366 ret = av_bsf_get_null_filter(&tee_slave->bsfs[target_stream]);
367 if (ret < 0) {
368 av_log(avf, AV_LOG_ERROR,
369 "Failed to create pass-through bitstream filter: %s\n",
370 av_err2str(ret));
371 goto end;
372 }
373 }
374
375 tee_slave->bsfs[target_stream]->time_base_in = avf->streams[i]->time_base;
376 ret = avcodec_parameters_copy(tee_slave->bsfs[target_stream]->par_in,
377 avf->streams[i]->codecpar);
378 if (ret < 0)
379 goto end;
380
381 ret = av_bsf_init(tee_slave->bsfs[target_stream]);
382 if (ret < 0) {
383 av_log(avf, AV_LOG_ERROR,
384 "Failed to initialize bitstream filter(s): %s\n",
385 av_err2str(ret));
386 goto end;
387 }
388 }
389
390 if (options) {
391 entry = NULL;
392 while ((entry = av_dict_get(options, "", entry, AV_DICT_IGNORE_SUFFIX)))
393 av_log(avf2, AV_LOG_ERROR, "Unknown option '%s'\n", entry->key);
394 ret = AVERROR_OPTION_NOT_FOUND;
395 goto end;
396 }
397
398 end:
399 av_free(format);
400 av_free(select);
401 av_dict_free(&options);
402 av_dict_free(&bsf_options);
403 av_freep(&tmp_select);
404 return ret;
405 }
406
407 static void log_slave(TeeSlave *slave, void *log_ctx, int log_level)
408 {
409 int i;
410 av_log(log_ctx, log_level, "filename:'%s' format:%s\n",
411 slave->avf->url, slave->avf->oformat->name);
412 for (i = 0; i < slave->avf->nb_streams; i++) {
413 AVStream *st = slave->avf->streams[i];
414 AVBSFContext *bsf = slave->bsfs[i];
415 const char *bsf_name;
416
417 av_log(log_ctx, log_level, " stream:%d codec:%s type:%s",
418 i, avcodec_get_name(st->codecpar->codec_id),
419 av_get_media_type_string(st->codecpar->codec_type));
420
421 bsf_name = bsf->filter->priv_class ?
422 bsf->filter->priv_class->item_name(bsf) : bsf->filter->name;
423 av_log(log_ctx, log_level, " bsfs: %s\n", bsf_name);
424 }
425 }
426
427 static int tee_process_slave_failure(AVFormatContext *avf, unsigned slave_idx, int err_n)
428 {
429 TeeContext *tee = avf->priv_data;
430 TeeSlave *tee_slave = &tee->slaves[slave_idx];
431
432 tee->nb_alive--;
433
434 close_slave(tee_slave);
435
436 if (!tee->nb_alive) {
437 av_log(avf, AV_LOG_ERROR, "All tee outputs failed.\n");
438 return err_n;
439 } else if (tee_slave->on_fail == ON_SLAVE_FAILURE_ABORT) {
440 av_log(avf, AV_LOG_ERROR, "Slave muxer #%u failed, aborting.\n", slave_idx);
441 return err_n;
442 } else {
443 av_log(avf, AV_LOG_ERROR, "Slave muxer #%u failed: %s, continuing with %u/%u slaves.\n",
444 slave_idx, av_err2str(err_n), tee->nb_alive, tee->nb_slaves);
445 return 0;
446 }
447 }
448
449 static int tee_write_header(AVFormatContext *avf)
450 {
451 TeeContext *tee = avf->priv_data;
452 unsigned nb_slaves = 0, i;
453 const char *filename = avf->url;
454 char **slaves = NULL;
455 int ret;
456
457 while (*filename) {
458 char *slave = av_get_token(&filename, slave_delim);
459 if (!slave) {
460 ret = AVERROR(ENOMEM);
461 goto fail;
462 }
463 ret = av_dynarray_add_nofree(&slaves, &nb_slaves, slave);
464 if (ret < 0) {
465 av_free(slave);
466 goto fail;
467 }
468 if (strspn(filename, slave_delim))
469 filename++;
470 }
471
472 if (!FF_ALLOCZ_TYPED_ARRAY(tee->slaves, nb_slaves)) {
473 ret = AVERROR(ENOMEM);
474 goto fail;
475 }
476 tee->nb_slaves = tee->nb_alive = nb_slaves;
477
478 for (i = 0; i < nb_slaves; i++) {
479
480 tee->slaves[i].use_fifo = tee->use_fifo;
481 ret = av_dict_copy(&tee->slaves[i].fifo_options, tee->fifo_options, 0);
482 if (ret < 0)
483 goto fail;
484
485 if ((ret = open_slave(avf, slaves[i], &tee->slaves[i])) < 0) {
486 ret = tee_process_slave_failure(avf, i, ret);
487 if (ret < 0)
488 goto fail;
489 } else {
490 log_slave(&tee->slaves[i], avf, AV_LOG_VERBOSE);
491 }
492 av_freep(&slaves[i]);
493 }
494
495 for (i = 0; i < avf->nb_streams; i++) {
496 int j, mapped = 0;
497 for (j = 0; j < tee->nb_slaves; j++)
498 if (tee->slaves[j].avf)
499 mapped += tee->slaves[j].stream_map[i] >= 0;
500 if (!mapped)
501 av_log(avf, AV_LOG_WARNING, "Input stream #%d is not mapped "
502 "to any slave.\n", i);
503 }
504 av_free(slaves);
505 return 0;
506
507 fail:
508 for (i = 0; i < nb_slaves; i++)
509 av_freep(&slaves[i]);
510 close_slaves(avf);
511 av_free(slaves);
512 return ret;
513 }
514
515 static int tee_write_trailer(AVFormatContext *avf)
516 {
517 TeeContext *tee = avf->priv_data;
518 int ret_all = 0, ret;
519 unsigned i;
520
521 for (i = 0; i < tee->nb_slaves; i++) {
522 if ((ret = close_slave(&tee->slaves[i])) < 0) {
523 ret = tee_process_slave_failure(avf, i, ret);
524 if (!ret_all && ret < 0)
525 ret_all = ret;
526 }
527 }
528 av_freep(&tee->slaves);
529 return ret_all;
530 }
531
532 static int tee_write_packet(AVFormatContext *avf, AVPacket *pkt)
533 {
534 TeeContext *tee = avf->priv_data;
535 AVFormatContext *avf2;
536 AVBSFContext *bsfs;
537 AVPacket *const pkt2 = ffformatcontext(avf)->pkt;
538 int ret_all = 0, ret;
539 unsigned i, s;
540 int s2;
541
542 for (i = 0; i < tee->nb_slaves; i++) {
543 if (!(avf2 = tee->slaves[i].avf))
544 continue;
545
546 /* Flush slave if pkt is NULL*/
547 if (!pkt) {
548 ret = av_interleaved_write_frame(avf2, NULL);
549 if (ret < 0) {
550 ret = tee_process_slave_failure(avf, i, ret);
551 if (!ret_all && ret < 0)
552 ret_all = ret;
553 }
554 continue;
555 }
556
557 s = pkt->stream_index;
558 s2 = tee->slaves[i].stream_map[s];
559 if (s2 < 0)
560 continue;
561
562 if ((ret = av_packet_ref(pkt2, pkt)) < 0) {
563 if (!ret_all)
564 ret_all = ret;
565 continue;
566 }
567 bsfs = tee->slaves[i].bsfs[s2];
568 pkt2->stream_index = s2;
569
570 ret = av_bsf_send_packet(bsfs, pkt2);
571 if (ret < 0) {
572 av_packet_unref(pkt2);
573 av_log(avf, AV_LOG_ERROR, "Error while sending packet to bitstream filter: %s\n",
574 av_err2str(ret));
575 ret = tee_process_slave_failure(avf, i, ret);
576 if (!ret_all && ret < 0)
577 ret_all = ret;
578 }
579
580 while(1) {
581 ret = av_bsf_receive_packet(bsfs, pkt2);
582 if (ret == AVERROR(EAGAIN)) {
583 ret = 0;
584 break;
585 } else if (ret < 0) {
586 break;
587 }
588
589 av_packet_rescale_ts(pkt2, bsfs->time_base_out,
590 avf2->streams[s2]->time_base);
591 ret = av_interleaved_write_frame(avf2, pkt2);
592 if (ret < 0)
593 break;
594 };
595
596 if (ret < 0) {
597 ret = tee_process_slave_failure(avf, i, ret);
598 if (!ret_all && ret < 0)
599 ret_all = ret;
600 }
601 }
602 return ret_all;
603 }
604
605 const FFOutputFormat ff_tee_muxer = {
606 .p.name = "tee",
607 .p.long_name = NULL_IF_CONFIG_SMALL("Multiple muxer tee"),
608 .priv_data_size = sizeof(TeeContext),
609 .write_header = tee_write_header,
610 .write_trailer = tee_write_trailer,
611 .write_packet = tee_write_packet,
612 .p.priv_class = &tee_muxer_class,
613 #if FF_API_ALLOW_FLUSH
614 .p.flags = AVFMT_NOFILE | AVFMT_ALLOW_FLUSH | AVFMT_TS_NEGATIVE,
615 #else
616 .p.flags = AVFMT_NOFILE | AVFMT_TS_NEGATIVE,
617 #endif
618 .flags_internal = FF_OFMT_FLAG_ALLOW_FLUSH,
619 };
620