FFmpeg coverage


Directory: ../../../ffmpeg/
File: src/fftools/ffmpeg_sched.c
Date: 2026-01-16 07:34:38
Exec Total Coverage
Lines: 1154 1313 87.9%
Functions: 70 72 97.2%
Branches: 628 850 73.9%

Line Branch Exec Source
1 /*
2 * Inter-thread scheduling/synchronization.
3 * Copyright (c) 2023 Anton Khirnov
4 *
5 * This file is part of FFmpeg.
6 *
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with FFmpeg; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22 #include <stdatomic.h>
23 #include <stddef.h>
24 #include <stdint.h>
25
26 #include "cmdutils.h"
27 #include "ffmpeg_sched.h"
28 #include "ffmpeg_utils.h"
29 #include "sync_queue.h"
30 #include "thread_queue.h"
31
32 #include "libavcodec/packet.h"
33
34 #include "libavutil/avassert.h"
35 #include "libavutil/error.h"
36 #include "libavutil/fifo.h"
37 #include "libavutil/frame.h"
38 #include "libavutil/mem.h"
39 #include "libavutil/thread.h"
40 #include "libavutil/threadmessage.h"
41 #include "libavutil/time.h"
42
43 // 100 ms
44 // FIXME: some other value? make this dynamic?
45 #define SCHEDULE_TOLERANCE (100 * 1000)
46
47 enum QueueType {
48 QUEUE_PACKETS,
49 QUEUE_FRAMES,
50 };
51
52 typedef struct SchWaiter {
53 pthread_mutex_t lock;
54 pthread_cond_t cond;
55 atomic_int choked;
56
57 // the following are internal state of schedule_update_locked() and must not
58 // be accessed outside of it
59 int choked_prev;
60 int choked_next;
61 } SchWaiter;
62
63 typedef struct SchTask {
64 Scheduler *parent;
65 SchedulerNode node;
66
67 SchThreadFunc func;
68 void *func_arg;
69
70 pthread_t thread;
71 int thread_running;
72 } SchTask;
73
74 typedef struct SchDecOutput {
75 SchedulerNode *dst;
76 uint8_t *dst_finished;
77 unsigned nb_dst;
78 } SchDecOutput;
79
80 typedef struct SchDec {
81 const AVClass *class;
82
83 SchedulerNode src;
84
85 SchDecOutput *outputs;
86 unsigned nb_outputs;
87
88 SchTask task;
89 // Queue for receiving input packets, one stream.
90 ThreadQueue *queue;
91
92 // Queue for sending post-flush end timestamps back to the source
93 AVThreadMessageQueue *queue_end_ts;
94 int expect_end_ts;
95
96 // temporary storage used by sch_dec_send()
97 AVFrame *send_frame;
98 } SchDec;
99
100 typedef struct SchSyncQueue {
101 SyncQueue *sq;
102 AVFrame *frame;
103 pthread_mutex_t lock;
104
105 unsigned *enc_idx;
106 unsigned nb_enc_idx;
107 } SchSyncQueue;
108
109 typedef struct SchEnc {
110 const AVClass *class;
111
112 SchedulerNode src;
113 SchedulerNode *dst;
114 uint8_t *dst_finished;
115 unsigned nb_dst;
116
117 // [0] - index of the sync queue in Scheduler.sq_enc,
118 // [1] - index of this encoder in the sq
119 int sq_idx[2];
120
121 /* Opening encoders is somewhat nontrivial due to their interaction with
122 * sync queues, which are (among other things) responsible for maintaining
123 * constant audio frame size, when it is required by the encoder.
124 *
125 * Opening the encoder requires stream parameters, obtained from the first
126 * frame. However, that frame cannot be properly chunked by the sync queue
127 * without knowing the required frame size, which is only available after
128 * opening the encoder.
129 *
130 * This apparent circular dependency is resolved in the following way:
131 * - the caller creating the encoder gives us a callback which opens the
132 * encoder and returns the required frame size (if any)
133 * - when the first frame is sent to the encoder, the sending thread
134 * - calls this callback, opening the encoder
135 * - passes the returned frame size to the sync queue
136 */
137 int (*open_cb)(void *opaque, const AVFrame *frame);
138 int opened;
139
140 SchTask task;
141 // Queue for receiving input frames, one stream.
142 ThreadQueue *queue;
143 // tq_send() to queue returned EOF
144 int in_finished;
145
146 // temporary storage used by sch_enc_send()
147 AVPacket *send_pkt;
148 } SchEnc;
149
150 typedef struct SchDemuxStream {
151 SchedulerNode *dst;
152 uint8_t *dst_finished;
153 unsigned nb_dst;
154 } SchDemuxStream;
155
156 typedef struct SchDemux {
157 const AVClass *class;
158
159 SchDemuxStream *streams;
160 unsigned nb_streams;
161
162 SchTask task;
163 SchWaiter waiter;
164
165 // temporary storage used by sch_demux_send()
166 AVPacket *send_pkt;
167
168 // protected by schedule_lock
169 int task_exited;
170 } SchDemux;
171
172 typedef struct PreMuxQueue {
173 /**
174 * Queue for buffering the packets before the muxer task can be started.
175 */
176 AVFifo *fifo;
177 /**
178 * Maximum number of packets in fifo.
179 */
180 int max_packets;
181 /*
182 * The size of the AVPackets' buffers in queue.
183 * Updated when a packet is either pushed or pulled from the queue.
184 */
185 size_t data_size;
186 /* Threshold after which max_packets will be in effect */
187 size_t data_threshold;
188 } PreMuxQueue;
189
190 typedef struct SchMuxStream {
191 SchedulerNode src;
192
193 unsigned *sub_heartbeat_dst;
194 unsigned nb_sub_heartbeat_dst;
195
196 PreMuxQueue pre_mux_queue;
197
198 // an EOF was generated while flushing the pre-mux queue
199 int init_eof;
200
201 ////////////////////////////////////////////////////////////
202 // The following are protected by Scheduler.schedule_lock //
203
204 /* dts+duration of the last packet sent to this stream
205 in AV_TIME_BASE_Q */
206 int64_t last_dts;
207 // this stream no longer accepts input
208 int source_finished;
209 ////////////////////////////////////////////////////////////
210 } SchMuxStream;
211
212 typedef struct SchMux {
213 const AVClass *class;
214
215 SchMuxStream *streams;
216 unsigned nb_streams;
217 unsigned nb_streams_ready;
218
219 int (*init)(void *arg);
220
221 SchTask task;
222 /**
223 * Set to 1 after starting the muxer task and flushing the
224 * pre-muxing queues.
225 * Set either before any tasks have started, or with
226 * Scheduler.mux_ready_lock held.
227 */
228 atomic_int mux_started;
229 ThreadQueue *queue;
230 unsigned queue_size;
231
232 AVPacket *sub_heartbeat_pkt;
233 } SchMux;
234
235 typedef struct SchFilterIn {
236 SchedulerNode src;
237 int send_finished;
238 int receive_finished;
239 } SchFilterIn;
240
241 typedef struct SchFilterOut {
242 SchedulerNode dst;
243 } SchFilterOut;
244
245 typedef struct SchFilterGraph {
246 const AVClass *class;
247
248 SchFilterIn *inputs;
249 unsigned nb_inputs;
250 atomic_uint nb_inputs_finished_send;
251 unsigned nb_inputs_finished_receive;
252
253 SchFilterOut *outputs;
254 unsigned nb_outputs;
255
256 SchTask task;
257 // input queue, nb_inputs+1 streams
258 // last stream is control
259 ThreadQueue *queue;
260 SchWaiter waiter;
261
262 // protected by schedule_lock
263 unsigned best_input;
264 int task_exited;
265 } SchFilterGraph;
266
267 enum SchedulerState {
268 SCH_STATE_UNINIT,
269 SCH_STATE_STARTED,
270 SCH_STATE_STOPPED,
271 };
272
273 struct Scheduler {
274 const AVClass *class;
275
276 SchDemux *demux;
277 unsigned nb_demux;
278
279 SchMux *mux;
280 unsigned nb_mux;
281
282 unsigned nb_mux_ready;
283 pthread_mutex_t mux_ready_lock;
284
285 unsigned nb_mux_done;
286 unsigned task_failed;
287 pthread_mutex_t finish_lock;
288 pthread_cond_t finish_cond;
289
290
291 SchDec *dec;
292 unsigned nb_dec;
293
294 SchEnc *enc;
295 unsigned nb_enc;
296
297 SchSyncQueue *sq_enc;
298 unsigned nb_sq_enc;
299
300 SchFilterGraph *filters;
301 unsigned nb_filters;
302
303 char *sdp_filename;
304 int sdp_auto;
305
306 enum SchedulerState state;
307 atomic_int terminate;
308
309 pthread_mutex_t schedule_lock;
310
311 atomic_int_least64_t last_dts;
312 };
313
314 /**
315 * Wait until this task is allowed to proceed.
316 *
317 * @retval 0 the caller should proceed
318 * @retval 1 the caller should terminate
319 */
320 500705 static int waiter_wait(Scheduler *sch, SchWaiter *w)
321 {
322 int terminate;
323
324
2/2
✓ Branch 0 taken 497890 times.
✓ Branch 1 taken 2815 times.
500705 if (!atomic_load(&w->choked))
325 497890 return 0;
326
327 2815 pthread_mutex_lock(&w->lock);
328
329
4/4
✓ Branch 0 taken 2837 times.
✓ Branch 1 taken 2419 times.
✓ Branch 2 taken 2441 times.
✓ Branch 3 taken 396 times.
5256 while (atomic_load(&w->choked) && !atomic_load(&sch->terminate))
330 2441 pthread_cond_wait(&w->cond, &w->lock);
331
332 2815 terminate = atomic_load(&sch->terminate);
333
334 2815 pthread_mutex_unlock(&w->lock);
335
336 2815 return terminate;
337 }
338
339 58825 static void waiter_set(SchWaiter *w, int choked)
340 {
341 58825 pthread_mutex_lock(&w->lock);
342
343 58825 atomic_store(&w->choked, choked);
344 58825 pthread_cond_signal(&w->cond);
345
346 58825 pthread_mutex_unlock(&w->lock);
347 58825 }
348
349 15444 static int waiter_init(SchWaiter *w)
350 {
351 int ret;
352
353 15444 atomic_init(&w->choked, 0);
354
355 15444 ret = pthread_mutex_init(&w->lock, NULL);
356
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15444 times.
15444 if (ret)
357 return AVERROR(ret);
358
359 15444 ret = pthread_cond_init(&w->cond, NULL);
360
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15444 times.
15444 if (ret)
361 return AVERROR(ret);
362
363 15444 return 0;
364 }
365
366 15444 static void waiter_uninit(SchWaiter *w)
367 {
368 15444 pthread_mutex_destroy(&w->lock);
369 15444 pthread_cond_destroy(&w->cond);
370 15444 }
371
372 31711 static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size,
373 enum QueueType type)
374 {
375 ThreadQueue *tq;
376
377
1/2
✓ Branch 0 taken 31711 times.
✗ Branch 1 not taken.
31711 if (queue_size <= 0) {
378
2/2
✓ Branch 0 taken 16291 times.
✓ Branch 1 taken 15420 times.
31711 if (type == QUEUE_FRAMES)
379 16291 queue_size = DEFAULT_FRAME_THREAD_QUEUE_SIZE;
380 else
381 15420 queue_size = DEFAULT_PACKET_THREAD_QUEUE_SIZE;
382 }
383
384
2/2
✓ Branch 0 taken 16291 times.
✓ Branch 1 taken 15420 times.
31711 if (type == QUEUE_FRAMES) {
385 // This queue length is used in the decoder code to ensure that
386 // there are enough entries in fixed-size frame pools to account
387 // for frames held in queues inside the ffmpeg utility. If this
388 // can ever dynamically change then the corresponding decode
389 // code needs to be updated as well.
390
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 16291 times.
16291 av_assert0(queue_size <= DEFAULT_FRAME_THREAD_QUEUE_SIZE);
391 }
392
393 31711 tq = tq_alloc(nb_streams, queue_size,
394 (type == QUEUE_PACKETS) ? THREAD_QUEUE_PACKETS : THREAD_QUEUE_FRAMES);
395
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 31711 times.
31711 if (!tq)
396 return AVERROR(ENOMEM);
397
398 31711 *ptq = tq;
399 31711 return 0;
400 }
401
402 static void *task_wrapper(void *arg);
403
404 39081 static int task_start(SchTask *task)
405 {
406 int ret;
407
408
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 39078 times.
39081 if (!task->parent)
409 3 return 0;
410
411 39078 av_log(task->func_arg, AV_LOG_VERBOSE, "Starting thread...\n");
412
413
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 39078 times.
39078 av_assert0(!task->thread_running);
414
415 39078 ret = pthread_create(&task->thread, NULL, task_wrapper, task);
416
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 39078 times.
39078 if (ret) {
417 av_log(task->func_arg, AV_LOG_ERROR, "pthread_create() failed: %s\n",
418 strerror(ret));
419 return AVERROR(ret);
420 }
421
422 39078 task->thread_running = 1;
423 39078 return 0;
424 }
425
426 39096 static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx,
427 SchThreadFunc func, void *func_arg)
428 {
429 39096 task->parent = sch;
430
431 39096 task->node.type = type;
432 39096 task->node.idx = idx;
433
434 39096 task->func = func;
435 39096 task->func_arg = func_arg;
436 39096 }
437
438 572090 static int64_t trailing_dts(const Scheduler *sch, int count_finished)
439 {
440 572090 int64_t min_dts = INT64_MAX;
441
442
2/2
✓ Branch 0 taken 573157 times.
✓ Branch 1 taken 545719 times.
1118876 for (unsigned i = 0; i < sch->nb_mux; i++) {
443 573157 const SchMux *mux = &sch->mux[i];
444
445
2/2
✓ Branch 0 taken 637782 times.
✓ Branch 1 taken 546786 times.
1184568 for (unsigned j = 0; j < mux->nb_streams; j++) {
446 637782 const SchMuxStream *ms = &mux->streams[j];
447
448
4/4
✓ Branch 0 taken 45622 times.
✓ Branch 1 taken 592160 times.
✓ Branch 2 taken 36680 times.
✓ Branch 3 taken 8942 times.
637782 if (ms->source_finished && !count_finished)
449 36680 continue;
450
2/2
✓ Branch 0 taken 26371 times.
✓ Branch 1 taken 574731 times.
601102 if (ms->last_dts == AV_NOPTS_VALUE)
451 26371 return AV_NOPTS_VALUE;
452
453 574731 min_dts = FFMIN(min_dts, ms->last_dts);
454 }
455 }
456
457
2/2
✓ Branch 0 taken 517891 times.
✓ Branch 1 taken 27828 times.
545719 return min_dts == INT64_MAX ? AV_NOPTS_VALUE : min_dts;
458 }
459
460 3 void sch_remove_filtergraph(Scheduler *sch, int idx)
461 {
462 3 SchFilterGraph *fg = &sch->filters[idx];
463
464
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 av_assert0(!fg->task.thread_running);
465 3 memset(&fg->task, 0, sizeof(fg->task));
466
467 3 tq_free(&fg->queue);
468
469 3 av_freep(&fg->inputs);
470 3 fg->nb_inputs = 0;
471 3 av_freep(&fg->outputs);
472 3 fg->nb_outputs = 0;
473
474 3 fg->task_exited = 1;
475 3 }
476
477 8472 void sch_free(Scheduler **psch)
478 {
479 8472 Scheduler *sch = *psch;
480
481
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8472 times.
8472 if (!sch)
482 return;
483
484 8472 sch_stop(sch, NULL);
485
486
2/2
✓ Branch 0 taken 7385 times.
✓ Branch 1 taken 8472 times.
15857 for (unsigned i = 0; i < sch->nb_demux; i++) {
487 7385 SchDemux *d = &sch->demux[i];
488
489
2/2
✓ Branch 0 taken 7665 times.
✓ Branch 1 taken 7385 times.
15050 for (unsigned j = 0; j < d->nb_streams; j++) {
490 7665 SchDemuxStream *ds = &d->streams[j];
491 7665 av_freep(&ds->dst);
492 7665 av_freep(&ds->dst_finished);
493 }
494 7385 av_freep(&d->streams);
495
496 7385 av_packet_free(&d->send_pkt);
497
498 7385 waiter_uninit(&d->waiter);
499 }
500 8472 av_freep(&sch->demux);
501
502
2/2
✓ Branch 0 taken 8474 times.
✓ Branch 1 taken 8472 times.
16946 for (unsigned i = 0; i < sch->nb_mux; i++) {
503 8474 SchMux *mux = &sch->mux[i];
504
505
2/2
✓ Branch 0 taken 8978 times.
✓ Branch 1 taken 8474 times.
17452 for (unsigned j = 0; j < mux->nb_streams; j++) {
506 8978 SchMuxStream *ms = &mux->streams[j];
507
508
1/2
✓ Branch 0 taken 8978 times.
✗ Branch 1 not taken.
8978 if (ms->pre_mux_queue.fifo) {
509 AVPacket *pkt;
510
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 8978 times.
8978 while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0)
511 av_packet_free(&pkt);
512 8978 av_fifo_freep2(&ms->pre_mux_queue.fifo);
513 }
514
515 8978 av_freep(&ms->sub_heartbeat_dst);
516 }
517 8474 av_freep(&mux->streams);
518
519 8474 av_packet_free(&mux->sub_heartbeat_pkt);
520
521 8474 tq_free(&mux->queue);
522 }
523 8472 av_freep(&sch->mux);
524
525
2/2
✓ Branch 0 taken 6946 times.
✓ Branch 1 taken 8472 times.
15418 for (unsigned i = 0; i < sch->nb_dec; i++) {
526 6946 SchDec *dec = &sch->dec[i];
527
528 6946 tq_free(&dec->queue);
529
530 6946 av_thread_message_queue_free(&dec->queue_end_ts);
531
532
2/2
✓ Branch 0 taken 6952 times.
✓ Branch 1 taken 6946 times.
13898 for (unsigned j = 0; j < dec->nb_outputs; j++) {
533 6952 SchDecOutput *o = &dec->outputs[j];
534
535 6952 av_freep(&o->dst);
536 6952 av_freep(&o->dst_finished);
537 }
538
539 6946 av_freep(&dec->outputs);
540
541 6946 av_frame_free(&dec->send_frame);
542 }
543 8472 av_freep(&sch->dec);
544
545
2/2
✓ Branch 0 taken 8232 times.
✓ Branch 1 taken 8472 times.
16704 for (unsigned i = 0; i < sch->nb_enc; i++) {
546 8232 SchEnc *enc = &sch->enc[i];
547
548 8232 tq_free(&enc->queue);
549
550 8232 av_packet_free(&enc->send_pkt);
551
552 8232 av_freep(&enc->dst);
553 8232 av_freep(&enc->dst_finished);
554 }
555 8472 av_freep(&sch->enc);
556
557
2/2
✓ Branch 0 taken 3189 times.
✓ Branch 1 taken 8472 times.
11661 for (unsigned i = 0; i < sch->nb_sq_enc; i++) {
558 3189 SchSyncQueue *sq = &sch->sq_enc[i];
559 3189 sq_free(&sq->sq);
560 3189 av_frame_free(&sq->frame);
561 3189 pthread_mutex_destroy(&sq->lock);
562 3189 av_freep(&sq->enc_idx);
563 }
564 8472 av_freep(&sch->sq_enc);
565
566
2/2
✓ Branch 0 taken 8059 times.
✓ Branch 1 taken 8472 times.
16531 for (unsigned i = 0; i < sch->nb_filters; i++) {
567 8059 SchFilterGraph *fg = &sch->filters[i];
568
569 8059 tq_free(&fg->queue);
570
571 8059 av_freep(&fg->inputs);
572 8059 av_freep(&fg->outputs);
573
574 8059 waiter_uninit(&fg->waiter);
575 }
576 8472 av_freep(&sch->filters);
577
578 8472 av_freep(&sch->sdp_filename);
579
580 8472 pthread_mutex_destroy(&sch->schedule_lock);
581
582 8472 pthread_mutex_destroy(&sch->mux_ready_lock);
583
584 8472 pthread_mutex_destroy(&sch->finish_lock);
585 8472 pthread_cond_destroy(&sch->finish_cond);
586
587 8472 av_freep(psch);
588 }
589
590 static const AVClass scheduler_class = {
591 .class_name = "Scheduler",
592 .version = LIBAVUTIL_VERSION_INT,
593 };
594
595 8472 Scheduler *sch_alloc(void)
596 {
597 Scheduler *sch;
598 int ret;
599
600 8472 sch = av_mallocz(sizeof(*sch));
601
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8472 times.
8472 if (!sch)
602 return NULL;
603
604 8472 sch->class = &scheduler_class;
605 8472 sch->sdp_auto = 1;
606
607 8472 ret = pthread_mutex_init(&sch->schedule_lock, NULL);
608
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8472 times.
8472 if (ret)
609 goto fail;
610
611 8472 ret = pthread_mutex_init(&sch->mux_ready_lock, NULL);
612
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8472 times.
8472 if (ret)
613 goto fail;
614
615 8472 ret = pthread_mutex_init(&sch->finish_lock, NULL);
616
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8472 times.
8472 if (ret)
617 goto fail;
618
619 8472 ret = pthread_cond_init(&sch->finish_cond, NULL);
620
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8472 times.
8472 if (ret)
621 goto fail;
622
623 8472 return sch;
624 fail:
625 sch_free(&sch);
626 return NULL;
627 }
628
629 int sch_sdp_filename(Scheduler *sch, const char *sdp_filename)
630 {
631 av_freep(&sch->sdp_filename);
632 sch->sdp_filename = av_strdup(sdp_filename);
633 return sch->sdp_filename ? 0 : AVERROR(ENOMEM);
634 }
635
636 static const AVClass sch_mux_class = {
637 .class_name = "SchMux",
638 .version = LIBAVUTIL_VERSION_INT,
639 .parent_log_context_offset = offsetof(SchMux, task.func_arg),
640 };
641
642 8474 int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
643 void *arg, int sdp_auto, unsigned thread_queue_size)
644 {
645 8474 const unsigned idx = sch->nb_mux;
646
647 SchMux *mux;
648 int ret;
649
650 8474 ret = GROW_ARRAY(sch->mux, sch->nb_mux);
651
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8474 times.
8474 if (ret < 0)
652 return ret;
653
654 8474 mux = &sch->mux[idx];
655 8474 mux->class = &sch_mux_class;
656 8474 mux->init = init;
657 8474 mux->queue_size = thread_queue_size;
658
659 8474 task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg);
660
661 8474 sch->sdp_auto &= sdp_auto;
662
663 8474 return idx;
664 }
665
666 8978 int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx)
667 {
668 SchMux *mux;
669 SchMuxStream *ms;
670 unsigned stream_idx;
671 int ret;
672
673
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8978 times.
8978 av_assert0(mux_idx < sch->nb_mux);
674 8978 mux = &sch->mux[mux_idx];
675
676 8978 ret = GROW_ARRAY(mux->streams, mux->nb_streams);
677
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8978 times.
8978 if (ret < 0)
678 return ret;
679 8978 stream_idx = mux->nb_streams - 1;
680
681 8978 ms = &mux->streams[stream_idx];
682
683 8978 ms->pre_mux_queue.fifo = av_fifo_alloc2(8, sizeof(AVPacket*), 0);
684
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8978 times.
8978 if (!ms->pre_mux_queue.fifo)
685 return AVERROR(ENOMEM);
686
687 8978 ms->last_dts = AV_NOPTS_VALUE;
688
689 8978 return stream_idx;
690 }
691
692 static const AVClass sch_demux_class = {
693 .class_name = "SchDemux",
694 .version = LIBAVUTIL_VERSION_INT,
695 .parent_log_context_offset = offsetof(SchDemux, task.func_arg),
696 };
697
698 7385 int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *ctx)
699 {
700 7385 const unsigned idx = sch->nb_demux;
701
702 SchDemux *d;
703 int ret;
704
705 7385 ret = GROW_ARRAY(sch->demux, sch->nb_demux);
706
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7385 times.
7385 if (ret < 0)
707 return ret;
708
709 7385 d = &sch->demux[idx];
710
711 7385 task_init(sch, &d->task, SCH_NODE_TYPE_DEMUX, idx, func, ctx);
712
713 7385 d->class = &sch_demux_class;
714 7385 d->send_pkt = av_packet_alloc();
715
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7385 times.
7385 if (!d->send_pkt)
716 return AVERROR(ENOMEM);
717
718 7385 ret = waiter_init(&d->waiter);
719
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7385 times.
7385 if (ret < 0)
720 return ret;
721
722 7385 return idx;
723 }
724
725 7665 int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
726 {
727 SchDemux *d;
728 int ret;
729
730
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7665 times.
7665 av_assert0(demux_idx < sch->nb_demux);
731 7665 d = &sch->demux[demux_idx];
732
733 7665 ret = GROW_ARRAY(d->streams, d->nb_streams);
734
1/2
✓ Branch 0 taken 7665 times.
✗ Branch 1 not taken.
7665 return ret < 0 ? ret : d->nb_streams - 1;
735 }
736
737 6952 int sch_add_dec_output(Scheduler *sch, unsigned dec_idx)
738 {
739 SchDec *dec;
740 int ret;
741
742
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6952 times.
6952 av_assert0(dec_idx < sch->nb_dec);
743 6952 dec = &sch->dec[dec_idx];
744
745 6952 ret = GROW_ARRAY(dec->outputs, dec->nb_outputs);
746
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6952 times.
6952 if (ret < 0)
747 return ret;
748
749 6952 return dec->nb_outputs - 1;
750 }
751
752 static const AVClass sch_dec_class = {
753 .class_name = "SchDec",
754 .version = LIBAVUTIL_VERSION_INT,
755 .parent_log_context_offset = offsetof(SchDec, task.func_arg),
756 };
757
758 6946 int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts)
759 {
760 6946 const unsigned idx = sch->nb_dec;
761
762 SchDec *dec;
763 int ret;
764
765 6946 ret = GROW_ARRAY(sch->dec, sch->nb_dec);
766
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6946 times.
6946 if (ret < 0)
767 return ret;
768
769 6946 dec = &sch->dec[idx];
770
771 6946 task_init(sch, &dec->task, SCH_NODE_TYPE_DEC, idx, func, ctx);
772
773 6946 dec->class = &sch_dec_class;
774 6946 dec->send_frame = av_frame_alloc();
775
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6946 times.
6946 if (!dec->send_frame)
776 return AVERROR(ENOMEM);
777
778 6946 ret = sch_add_dec_output(sch, idx);
779
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6946 times.
6946 if (ret < 0)
780 return ret;
781
782 6946 ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS);
783
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6946 times.
6946 if (ret < 0)
784 return ret;
785
786
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 6945 times.
6946 if (send_end_ts) {
787 1 ret = av_thread_message_queue_alloc(&dec->queue_end_ts, 1, sizeof(Timestamp));
788
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (ret < 0)
789 return ret;
790 }
791
792 6946 return idx;
793 }
794
795 static const AVClass sch_enc_class = {
796 .class_name = "SchEnc",
797 .version = LIBAVUTIL_VERSION_INT,
798 .parent_log_context_offset = offsetof(SchEnc, task.func_arg),
799 };
800
801 8232 int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx,
802 int (*open_cb)(void *opaque, const AVFrame *frame))
803 {
804 8232 const unsigned idx = sch->nb_enc;
805
806 SchEnc *enc;
807 int ret;
808
809 8232 ret = GROW_ARRAY(sch->enc, sch->nb_enc);
810
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8232 times.
8232 if (ret < 0)
811 return ret;
812
813 8232 enc = &sch->enc[idx];
814
815 8232 enc->class = &sch_enc_class;
816 8232 enc->open_cb = open_cb;
817 8232 enc->sq_idx[0] = -1;
818 8232 enc->sq_idx[1] = -1;
819
820 8232 task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx);
821
822 8232 enc->send_pkt = av_packet_alloc();
823
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8232 times.
8232 if (!enc->send_pkt)
824 return AVERROR(ENOMEM);
825
826 8232 ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES);
827
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8232 times.
8232 if (ret < 0)
828 return ret;
829
830 8232 return idx;
831 }
832
833 static const AVClass sch_fg_class = {
834 .class_name = "SchFilterGraph",
835 .version = LIBAVUTIL_VERSION_INT,
836 .parent_log_context_offset = offsetof(SchFilterGraph, task.func_arg),
837 };
838
839 8059 int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
840 SchThreadFunc func, void *ctx)
841 {
842 8059 const unsigned idx = sch->nb_filters;
843
844 SchFilterGraph *fg;
845 int ret;
846
847 8059 ret = GROW_ARRAY(sch->filters, sch->nb_filters);
848
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8059 times.
8059 if (ret < 0)
849 return ret;
850 8059 fg = &sch->filters[idx];
851
852 8059 fg->class = &sch_fg_class;
853
854 8059 task_init(sch, &fg->task, SCH_NODE_TYPE_FILTER_IN, idx, func, ctx);
855
856
2/2
✓ Branch 0 taken 6875 times.
✓ Branch 1 taken 1184 times.
8059 if (nb_inputs) {
857 6875 fg->inputs = av_calloc(nb_inputs, sizeof(*fg->inputs));
858
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6875 times.
6875 if (!fg->inputs)
859 return AVERROR(ENOMEM);
860 6875 fg->nb_inputs = nb_inputs;
861 }
862
863
1/2
✓ Branch 0 taken 8059 times.
✗ Branch 1 not taken.
8059 if (nb_outputs) {
864 8059 fg->outputs = av_calloc(nb_outputs, sizeof(*fg->outputs));
865
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8059 times.
8059 if (!fg->outputs)
866 return AVERROR(ENOMEM);
867 8059 fg->nb_outputs = nb_outputs;
868 }
869
870 8059 ret = waiter_init(&fg->waiter);
871
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8059 times.
8059 if (ret < 0)
872 return ret;
873
874 8059 ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES);
875
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8059 times.
8059 if (ret < 0)
876 return ret;
877
878 8059 return idx;
879 }
880
881 3189 int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx)
882 {
883 SchSyncQueue *sq;
884 int ret;
885
886 3189 ret = GROW_ARRAY(sch->sq_enc, sch->nb_sq_enc);
887
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3189 times.
3189 if (ret < 0)
888 return ret;
889 3189 sq = &sch->sq_enc[sch->nb_sq_enc - 1];
890
891 3189 sq->sq = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, logctx);
892
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3189 times.
3189 if (!sq->sq)
893 return AVERROR(ENOMEM);
894
895 3189 sq->frame = av_frame_alloc();
896
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3189 times.
3189 if (!sq->frame)
897 return AVERROR(ENOMEM);
898
899 3189 ret = pthread_mutex_init(&sq->lock, NULL);
900
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3189 times.
3189 if (ret)
901 return AVERROR(ret);
902
903 3189 return sq - sch->sq_enc;
904 }
905
906 3254 int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx,
907 int limiting, uint64_t max_frames)
908 {
909 SchSyncQueue *sq;
910 SchEnc *enc;
911 int ret;
912
913
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3254 times.
3254 av_assert0(sq_idx < sch->nb_sq_enc);
914 3254 sq = &sch->sq_enc[sq_idx];
915
916
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3254 times.
3254 av_assert0(enc_idx < sch->nb_enc);
917 3254 enc = &sch->enc[enc_idx];
918
919 3254 ret = GROW_ARRAY(sq->enc_idx, sq->nb_enc_idx);
920
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3254 times.
3254 if (ret < 0)
921 return ret;
922 3254 sq->enc_idx[sq->nb_enc_idx - 1] = enc_idx;
923
924 3254 ret = sq_add_stream(sq->sq, limiting);
925
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3254 times.
3254 if (ret < 0)
926 return ret;
927
928 3254 enc->sq_idx[0] = sq_idx;
929 3254 enc->sq_idx[1] = ret;
930
931
2/2
✓ Branch 0 taken 3061 times.
✓ Branch 1 taken 193 times.
3254 if (max_frames != INT64_MAX)
932 3061 sq_limit_frames(sq->sq, enc->sq_idx[1], max_frames);
933
934 3254 return 0;
935 }
936
937 31131 int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst)
938 {
939 int ret;
940
941
4/5
✓ Branch 0 taken 7691 times.
✓ Branch 1 taken 7016 times.
✓ Branch 2 taken 8191 times.
✓ Branch 3 taken 8233 times.
✗ Branch 4 not taken.
31131 switch (src.type) {
942 7691 case SCH_NODE_TYPE_DEMUX: {
943 SchDemuxStream *ds;
944
945
2/4
✓ Branch 0 taken 7691 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 7691 times.
7691 av_assert0(src.idx < sch->nb_demux &&
946 src.idx_stream < sch->demux[src.idx].nb_streams);
947 7691 ds = &sch->demux[src.idx].streams[src.idx_stream];
948
949 7691 ret = GROW_ARRAY(ds->dst, ds->nb_dst);
950
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7691 times.
7691 if (ret < 0)
951 return ret;
952
953 7691 ds->dst[ds->nb_dst - 1] = dst;
954
955 // demuxed packets go to decoding or streamcopy
956
2/3
✓ Branch 0 taken 6945 times.
✓ Branch 1 taken 746 times.
✗ Branch 2 not taken.
7691 switch (dst.type) {
957 6945 case SCH_NODE_TYPE_DEC: {
958 SchDec *dec;
959
960
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6945 times.
6945 av_assert0(dst.idx < sch->nb_dec);
961 6945 dec = &sch->dec[dst.idx];
962
963
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6945 times.
6945 av_assert0(!dec->src.type);
964 6945 dec->src = src;
965 6945 break;
966 }
967 746 case SCH_NODE_TYPE_MUX: {
968 SchMuxStream *ms;
969
970
2/4
✓ Branch 0 taken 746 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 746 times.
746 av_assert0(dst.idx < sch->nb_mux &&
971 dst.idx_stream < sch->mux[dst.idx].nb_streams);
972 746 ms = &sch->mux[dst.idx].streams[dst.idx_stream];
973
974
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 746 times.
746 av_assert0(!ms->src.type);
975 746 ms->src = src;
976
977 746 break;
978 }
979 default: av_assert0(0);
980 }
981
982 7691 break;
983 }
984 7016 case SCH_NODE_TYPE_DEC: {
985 SchDec *dec;
986 SchDecOutput *o;
987
988
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7016 times.
7016 av_assert0(src.idx < sch->nb_dec);
989 7016 dec = &sch->dec[src.idx];
990
991
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7016 times.
7016 av_assert0(src.idx_stream < dec->nb_outputs);
992 7016 o = &dec->outputs[src.idx_stream];
993
994 7016 ret = GROW_ARRAY(o->dst, o->nb_dst);
995
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7016 times.
7016 if (ret < 0)
996 return ret;
997
998 7016 o->dst[o->nb_dst - 1] = dst;
999
1000 // decoded frames go to filters or encoding
1001
2/3
✓ Branch 0 taken 6974 times.
✓ Branch 1 taken 42 times.
✗ Branch 2 not taken.
7016 switch (dst.type) {
1002 6974 case SCH_NODE_TYPE_FILTER_IN: {
1003 SchFilterIn *fi;
1004
1005
2/4
✓ Branch 0 taken 6974 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6974 times.
6974 av_assert0(dst.idx < sch->nb_filters &&
1006 dst.idx_stream < sch->filters[dst.idx].nb_inputs);
1007 6974 fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
1008
1009
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6974 times.
6974 av_assert0(!fi->src.type);
1010 6974 fi->src = src;
1011 6974 break;
1012 }
1013 42 case SCH_NODE_TYPE_ENC: {
1014 SchEnc *enc;
1015
1016
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 42 times.
42 av_assert0(dst.idx < sch->nb_enc);
1017 42 enc = &sch->enc[dst.idx];
1018
1019
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 42 times.
42 av_assert0(!enc->src.type);
1020 42 enc->src = src;
1021 42 break;
1022 }
1023 default: av_assert0(0);
1024 }
1025
1026 7016 break;
1027 }
1028 8191 case SCH_NODE_TYPE_FILTER_OUT: {
1029 SchFilterOut *fo;
1030
1031
2/4
✓ Branch 0 taken 8191 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 8191 times.
8191 av_assert0(src.idx < sch->nb_filters &&
1032 src.idx_stream < sch->filters[src.idx].nb_outputs);
1033 8191 fo = &sch->filters[src.idx].outputs[src.idx_stream];
1034
1035
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8191 times.
8191 av_assert0(!fo->dst.type);
1036 8191 fo->dst = dst;
1037
1038 // filtered frames go to encoding or another filtergraph
1039
2/3
✓ Branch 0 taken 8190 times.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
8191 switch (dst.type) {
1040 8190 case SCH_NODE_TYPE_ENC: {
1041 SchEnc *enc;
1042
1043
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8190 times.
8190 av_assert0(dst.idx < sch->nb_enc);
1044 8190 enc = &sch->enc[dst.idx];
1045
1046
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8190 times.
8190 av_assert0(!enc->src.type);
1047 8190 enc->src = src;
1048 8190 break;
1049 }
1050 1 case SCH_NODE_TYPE_FILTER_IN: {
1051 SchFilterIn *fi;
1052
1053
2/4
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 times.
1 av_assert0(dst.idx < sch->nb_filters &&
1054 dst.idx_stream < sch->filters[dst.idx].nb_inputs);
1055 1 fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
1056
1057
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(!fi->src.type);
1058 1 fi->src = src;
1059 1 break;
1060 }
1061 default: av_assert0(0);
1062 }
1063
1064
1065 8191 break;
1066 }
1067 8233 case SCH_NODE_TYPE_ENC: {
1068 SchEnc *enc;
1069
1070
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8233 times.
8233 av_assert0(src.idx < sch->nb_enc);
1071 8233 enc = &sch->enc[src.idx];
1072
1073 8233 ret = GROW_ARRAY(enc->dst, enc->nb_dst);
1074
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8233 times.
8233 if (ret < 0)
1075 return ret;
1076
1077 8233 enc->dst[enc->nb_dst - 1] = dst;
1078
1079 // encoding packets go to muxing or decoding
1080
2/3
✓ Branch 0 taken 8232 times.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
8233 switch (dst.type) {
1081 8232 case SCH_NODE_TYPE_MUX: {
1082 SchMuxStream *ms;
1083
1084
2/4
✓ Branch 0 taken 8232 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 8232 times.
8232 av_assert0(dst.idx < sch->nb_mux &&
1085 dst.idx_stream < sch->mux[dst.idx].nb_streams);
1086 8232 ms = &sch->mux[dst.idx].streams[dst.idx_stream];
1087
1088
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8232 times.
8232 av_assert0(!ms->src.type);
1089 8232 ms->src = src;
1090
1091 8232 break;
1092 }
1093 1 case SCH_NODE_TYPE_DEC: {
1094 SchDec *dec;
1095
1096
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(dst.idx < sch->nb_dec);
1097 1 dec = &sch->dec[dst.idx];
1098
1099
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(!dec->src.type);
1100 1 dec->src = src;
1101
1102 1 break;
1103 }
1104 default: av_assert0(0);
1105 }
1106
1107 8233 break;
1108 }
1109 default: av_assert0(0);
1110 }
1111
1112 31131 return 0;
1113 }
1114
/* Start the muxer thread and then drain any packets that accumulated in the
 * per-stream pre-muxing queues while the muxer was not yet initialized.
 * Buffered packets are forwarded in interleaved (earliest-dts-first) order,
 * so the muxer thread sees them as if they had been sent to it directly.
 *
 * Returns 0 on success, a negative error code on failure. */
static int mux_task_start(SchMux *mux)
{
    int ret = 0;

    ret = task_start(&mux->task);
    if (ret < 0)
        return ret;

    /* flush the pre-muxing queues */
    while (1) {
        int min_stream = -1;
        Timestamp min_ts = { .ts = AV_NOPTS_VALUE };

        AVPacket *pkt;

        // find the stream with the earliest dts or EOF in pre-muxing queue
        for (unsigned i = 0; i < mux->nb_streams; i++) {
            SchMuxStream *ms = &mux->streams[i];

            // empty queue - nothing buffered for this stream
            if (av_fifo_peek(ms->pre_mux_queue.fifo, &pkt, 1, 0) < 0)
                continue;

            // a NULL packet (EOF marker) or a packet with no dts is sent
            // out immediately, bypassing dts-based interleaving
            if (!pkt || pkt->dts == AV_NOPTS_VALUE) {
                min_stream = i;
                break;
            }

            if (min_ts.ts == AV_NOPTS_VALUE ||
                av_compare_ts(min_ts.ts, min_ts.tb, pkt->dts, pkt->time_base) > 0) {
                min_stream = i;
                min_ts = (Timestamp){ .ts = pkt->dts, .tb = pkt->time_base };
            }
        }

        if (min_stream >= 0) {
            SchMuxStream *ms = &mux->streams[min_stream];

            // the successful peek above guarantees the read cannot fail
            ret = av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1);
            av_assert0(ret >= 0);

            if (pkt) {
                // once the muxer queue reported EOF for this stream, keep
                // draining the fifo but discard the packets
                if (!ms->init_eof)
                    ret = tq_send(mux->queue, min_stream, pkt);
                av_packet_free(&pkt);
                if (ret == AVERROR_EOF)
                    ms->init_eof = 1;
                else if (ret < 0)
                    return ret;
            } else
                tq_send_finish(mux->queue, min_stream);

            continue;
        }

        // all pre-muxing queues are empty - flushing finished
        break;
    }

    atomic_store(&mux->mux_started, 1);

    return 0;
}
1176
1177 int print_sdp(const char *filename);
1178
1179 8474 static int mux_init(Scheduler *sch, SchMux *mux)
1180 {
1181 int ret;
1182
1183 8474 ret = mux->init(mux->task.func_arg);
1184
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8474 times.
8474 if (ret < 0)
1185 return ret;
1186
1187 8474 sch->nb_mux_ready++;
1188
1189
2/4
✓ Branch 0 taken 8474 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 8474 times.
8474 if (sch->sdp_filename || sch->sdp_auto) {
1190 if (sch->nb_mux_ready < sch->nb_mux)
1191 return 0;
1192
1193 ret = print_sdp(sch->sdp_filename);
1194 if (ret < 0) {
1195 av_log(sch, AV_LOG_ERROR, "Error writing the SDP.\n");
1196 return ret;
1197 }
1198
1199 /* SDP is written only after all the muxers are ready, so now we
1200 * start ALL the threads */
1201 for (unsigned i = 0; i < sch->nb_mux; i++) {
1202 ret = mux_task_start(&sch->mux[i]);
1203 if (ret < 0)
1204 return ret;
1205 }
1206 } else {
1207 8474 ret = mux_task_start(mux);
1208
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8474 times.
8474 if (ret < 0)
1209 return ret;
1210 }
1211
1212 8474 return 0;
1213 }
1214
1215 8978 void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1216 size_t data_threshold, int max_packets)
1217 {
1218 SchMux *mux;
1219 SchMuxStream *ms;
1220
1221
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8978 times.
8978 av_assert0(mux_idx < sch->nb_mux);
1222 8978 mux = &sch->mux[mux_idx];
1223
1224
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8978 times.
8978 av_assert0(stream_idx < mux->nb_streams);
1225 8978 ms = &mux->streams[stream_idx];
1226
1227 8978 ms->pre_mux_queue.max_packets = max_packets;
1228 8978 ms->pre_mux_queue.data_threshold = data_threshold;
1229 8978 }
1230
1231 8978 int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
1232 {
1233 SchMux *mux;
1234 8978 int ret = 0;
1235
1236
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8978 times.
8978 av_assert0(mux_idx < sch->nb_mux);
1237 8978 mux = &sch->mux[mux_idx];
1238
1239
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8978 times.
8978 av_assert0(stream_idx < mux->nb_streams);
1240
1241 8978 pthread_mutex_lock(&sch->mux_ready_lock);
1242
1243
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8978 times.
8978 av_assert0(mux->nb_streams_ready < mux->nb_streams);
1244
1245 // this may be called during initialization - do not start
1246 // threads before sch_start() is called
1247
2/2
✓ Branch 0 taken 8474 times.
✓ Branch 1 taken 504 times.
8978 if (++mux->nb_streams_ready == mux->nb_streams &&
1248
2/2
✓ Branch 0 taken 7980 times.
✓ Branch 1 taken 494 times.
8474 sch->state >= SCH_STATE_STARTED)
1249 7980 ret = mux_init(sch, mux);
1250
1251 8978 pthread_mutex_unlock(&sch->mux_ready_lock);
1252
1253 8978 return ret;
1254 }
1255
1256 1 int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1257 unsigned dec_idx)
1258 {
1259 SchMux *mux;
1260 SchMuxStream *ms;
1261 1 int ret = 0;
1262
1263
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(mux_idx < sch->nb_mux);
1264 1 mux = &sch->mux[mux_idx];
1265
1266
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(stream_idx < mux->nb_streams);
1267 1 ms = &mux->streams[stream_idx];
1268
1269 1 ret = GROW_ARRAY(ms->sub_heartbeat_dst, ms->nb_sub_heartbeat_dst);
1270
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (ret < 0)
1271 return ret;
1272
1273
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(dec_idx < sch->nb_dec);
1274 1 ms->sub_heartbeat_dst[ms->nb_sub_heartbeat_dst - 1] = dec_idx;
1275
1276
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (!mux->sub_heartbeat_pkt) {
1277 1 mux->sub_heartbeat_pkt = av_packet_alloc();
1278
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (!mux->sub_heartbeat_pkt)
1279 return AVERROR(ENOMEM);
1280 }
1281
1282 1 return 0;
1283 }
1284
1285 static void unchoke_for_stream(Scheduler *sch, SchedulerNode src);
1286
1287 // Unchoke any filter graphs that are downstream of this node, to prevent it
1288 // from getting stuck trying to push data to a full queue
static void unchoke_downstream(Scheduler *sch, SchedulerNode *dst)
{
    SchFilterGraph *fg;
    SchDec *dec;
    SchEnc *enc;

    switch (dst->type) {
    case SCH_NODE_TYPE_DEC:
        // recurse through the decoder's outputs
        // NOTE(review): only the first destination of each output
        // (outputs[i].dst points at element 0 of the dst array) is walked;
        // confirm whether all dst[] entries should be visited
        dec = &sch->dec[dst->idx];
        for (int i = 0; i < dec->nb_outputs; i++)
            unchoke_downstream(sch, dec->outputs[i].dst);
        break;
    case SCH_NODE_TYPE_ENC:
        // recurse through every destination of the encoder
        enc = &sch->enc[dst->idx];
        for (int i = 0; i < enc->nb_dst; i++)
            unchoke_downstream(sch, &enc->dst[i]);
        break;
    case SCH_NODE_TYPE_MUX:
        // muxers are never choked
        break;
    case SCH_NODE_TYPE_FILTER_IN:
        fg = &sch->filters[dst->idx];
        if (fg->best_input == fg->nb_inputs) {
            // graph is not waiting on a specific input - unchoke it directly
            fg->waiter.choked_next = 0;
        } else {
            // ensure that this filter graph is not stuck waiting for
            // input from a different upstream demuxer
            unchoke_for_stream(sch, fg->inputs[fg->best_input].src);
        }
        break;
    default:
        av_unreachable("Invalid destination node type?");
        break;
    }
}
1323
/* Walk upstream from src until the ultimate source (a demuxer, or a
 * filtergraph with purely internal sources) is found, and unchoke it.
 * When a demuxer is unchoked, its downstream nodes are also unchoked so
 * that it does not block pushing data into a full queue. */
static void unchoke_for_stream(Scheduler *sch, SchedulerNode src)
{
    while (1) {
        SchFilterGraph *fg;
        SchDemux *demux;

        switch (src.type) {
        case SCH_NODE_TYPE_DEMUX:
            // fed directly by a demuxer (i.e. not through a filtergraph)
            demux = &sch->demux[src.idx];

            // already unchoked in this scheduling pass - stop here;
            // unchoke_downstream() below can recurse back into this
            // function, so this check also
            if (demux->waiter.choked_next == 0)
                return; // prevent infinite loop
            demux->waiter.choked_next = 0;

            // NOTE(review): streams[i].dst points at the first element of
            // the stream's dst array - confirm whether all destinations
            // should be walked
            for (int i = 0; i < demux->nb_streams; i++)
                unchoke_downstream(sch, demux->streams[i].dst);
            return;
        case SCH_NODE_TYPE_DEC:
            // keep walking upstream through the decoder
            src = sch->dec[src.idx].src;
            continue;
        case SCH_NODE_TYPE_ENC:
            // keep walking upstream through the encoder
            src = sch->enc[src.idx].src;
            continue;
        case SCH_NODE_TYPE_FILTER_OUT:
            fg = &sch->filters[src.idx];
            // the filtergraph contains internal sources and
            // requested to be scheduled directly
            if (fg->best_input == fg->nb_inputs) {
                fg->waiter.choked_next = 0;
                return;
            }
            // otherwise continue upstream through the input the graph
            // currently wants data on
            src = fg->inputs[fg->best_input].src;
            continue;
        default:
            av_unreachable("Invalid source node type?");
            return;
        }
    }
}
1361
1362 26835 static void choke_demux(const Scheduler *sch, int demux_id, int choked)
1363 {
1364 av_assert1(demux_id < sch->nb_demux);
1365 26835 SchDemux *demux = &sch->demux[demux_id];
1366
1367
2/2
✓ Branch 0 taken 27599 times.
✓ Branch 1 taken 26835 times.
54434 for (int i = 0; i < demux->nb_streams; i++) {
1368 27599 SchedulerNode *dst = demux->streams[i].dst;
1369 SchFilterGraph *fg;
1370
1371
2/5
✓ Branch 0 taken 25995 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 1604 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
27599 switch (dst->type) {
1372 25995 case SCH_NODE_TYPE_DEC:
1373 25995 tq_choke(sch->dec[dst->idx].queue, choked);
1374 25995 break;
1375 case SCH_NODE_TYPE_ENC:
1376 tq_choke(sch->enc[dst->idx].queue, choked);
1377 break;
1378 1604 case SCH_NODE_TYPE_MUX:
1379 1604 break;
1380 case SCH_NODE_TYPE_FILTER_IN:
1381 fg = &sch->filters[dst->idx];
1382 if (fg->nb_inputs == 1)
1383 tq_choke(fg->queue, choked);
1384 break;
1385 default:
1386 av_unreachable("Invalid destination node type?");
1387 break;
1388 }
1389 }
1390 26835 }
1391
/* Recompute which sources (demuxers / source filtergraphs) may produce data,
 * based on how far ahead each output stream is of the globally trailing dts.
 * Must be called with sch->schedule_lock held. */
static void schedule_update_locked(Scheduler *sch)
{
    int64_t dts;
    int have_unchoked = 0;

    // on termination request all waiters are choked,
    // we are not to unchoke them
    if (atomic_load(&sch->terminate))
        return;

    dts = trailing_dts(sch, 0);

    atomic_store(&sch->last_dts, dts);

    // initialize our internal state
    // (type 0 = demuxers, type 1 = filtergraphs; assume choked unless
    // something below unchokes them)
    for (unsigned type = 0; type < 2; type++)
        for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
            SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
            w->choked_prev = atomic_load(&w->choked);
            w->choked_next = 1;
        }

    // figure out the sources that are allowed to proceed
    for (unsigned i = 0; i < sch->nb_mux; i++) {
        SchMux *mux = &sch->mux[i];

        for (unsigned j = 0; j < mux->nb_streams; j++) {
            SchMuxStream *ms = &mux->streams[j];

            // unblock sources for output streams that are not finished
            // and not too far ahead of the trailing stream
            if (ms->source_finished)
                continue;

            // no stream has produced data yet, but this one already has -
            // keep its source choked until the others catch up
            if (dts == AV_NOPTS_VALUE && ms->last_dts != AV_NOPTS_VALUE)
                continue;

            // this stream is more than SCHEDULE_TOLERANCE ahead of the
            // trailing stream - keep its source choked
            if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE)
                continue;

            // resolve the source to unchoke
            unchoke_for_stream(sch, ms->src);
            have_unchoked = 1;
        }
    }

    // also unchoke any sources feeding into closed filter graph inputs, so
    // that they can observe the downstream EOF
    for (unsigned i = 0; i < sch->nb_filters; i++) {
        SchFilterGraph *fg = &sch->filters[i];

        for (unsigned j = 0; j < fg->nb_inputs; j++) {
            SchFilterIn *fi = &fg->inputs[j];

            if (fi->receive_finished && !fi->send_finished)
                unchoke_for_stream(sch, fi->src);
        }
    }

    // make sure to unchoke at least one source, if still available
    for (unsigned type = 0; !have_unchoked && type < 2; type++)
        for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
            int exited = type ? sch->filters[i].task_exited : sch->demux[i].task_exited;
            SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
            if (!exited) {
                w->choked_next = 0;
                have_unchoked = 1;
                break;
            }
        }

    // apply the computed state: signal only the waiters whose choked state
    // actually changed, and propagate demuxer changes to downstream queues
    for (unsigned type = 0; type < 2; type++) {
        for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
            SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
            if (w->choked_prev != w->choked_next) {
                waiter_set(w, w->choked_next);
                if (!type)
                    choke_demux(sch, i, w->choked_next);
            }
        }
    }

}
1472
// DFS node colors used by the cycle detection below
enum {
    CYCLE_NODE_NEW = 0, // not visited yet
    CYCLE_NODE_STARTED, // on the current DFS path
    CYCLE_NODE_DONE,    // fully explored, known cycle-free
};
1478
1479 // Finds the filtergraph or muxer upstream of a scheduler node
1480 6980 static SchedulerNode src_filtergraph(const Scheduler *sch, SchedulerNode src)
1481 {
1482 while (1) {
1483
3/4
✓ Branch 0 taken 6980 times.
✓ Branch 1 taken 6979 times.
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
13960 switch (src.type) {
1484 6980 case SCH_NODE_TYPE_DEMUX:
1485 case SCH_NODE_TYPE_FILTER_OUT:
1486 6980 return src;
1487 6979 case SCH_NODE_TYPE_DEC:
1488 6979 src = sch->dec[src.idx].src;
1489 6980 continue;
1490 1 case SCH_NODE_TYPE_ENC:
1491 1 src = sch->enc[src.idx].src;
1492 1 continue;
1493 default:
1494 av_unreachable("Invalid source node type?");
1495 return (SchedulerNode) {0};
1496 }
1497 }
1498 }
1499
/* Iterative depth-first search over the filtergraph connectivity graph,
 * starting from filtergraph src.idx. src.idx_stream doubles as the resume
 * cursor (the next input to descend into) when a node is revisited after
 * being popped off filters_stack.
 *
 * Returns 0 when no cycle is reachable, AVERROR(EINVAL) when a cycle is
 * found. filters_visited/filters_stack are caller-provided scratch arrays
 * of sch->nb_filters elements each. */
static int
check_acyclic_for_output(const Scheduler *sch, SchedulerNode src,
                         uint8_t *filters_visited, SchedulerNode *filters_stack)
{
    unsigned nb_filters_stack = 0;

    memset(filters_visited, 0, sch->nb_filters * sizeof(*filters_visited));

    while (1) {
        const SchFilterGraph *fg = &sch->filters[src.idx];

        filters_visited[src.idx] = CYCLE_NODE_STARTED;

        // descend into every input, depth first
        if (src.idx_stream < fg->nb_inputs) {
            // note: advances the cursor so the next visit continues with
            // the following input
            const SchFilterIn *fi = &fg->inputs[src.idx_stream++];
            SchedulerNode node = src_filtergraph(sch, fi->src);

            // connected to demuxer, no cycles possible
            if (node.type == SCH_NODE_TYPE_DEMUX)
                continue;

            // otherwise connected to another filtergraph
            av_assert0(node.type == SCH_NODE_TYPE_FILTER_OUT);

            // found a cycle
            if (filters_visited[node.idx] == CYCLE_NODE_STARTED)
                return AVERROR(EINVAL);

            // place current position on stack and descend
            av_assert0(nb_filters_stack < sch->nb_filters);
            filters_stack[nb_filters_stack++] = src;
            src = (SchedulerNode){ .idx = node.idx, .idx_stream = 0 };
            continue;
        }

        // all inputs of this node explored
        filters_visited[src.idx] = CYCLE_NODE_DONE;

        // previous search finished,
        if (nb_filters_stack) {
            src = filters_stack[--nb_filters_stack];
            continue;
        }
        return 0;
    }
}
1546
1547 8470 static int check_acyclic(Scheduler *sch)
1548 {
1549 8470 uint8_t *filters_visited = NULL;
1550 8470 SchedulerNode *filters_stack = NULL;
1551
1552 8470 int ret = 0;
1553
1554
2/2
✓ Branch 0 taken 528 times.
✓ Branch 1 taken 7942 times.
8470 if (!sch->nb_filters)
1555 528 return 0;
1556
1557 7942 filters_visited = av_malloc_array(sch->nb_filters, sizeof(*filters_visited));
1558
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7942 times.
7942 if (!filters_visited)
1559 return AVERROR(ENOMEM);
1560
1561 7942 filters_stack = av_malloc_array(sch->nb_filters, sizeof(*filters_stack));
1562
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7942 times.
7942 if (!filters_stack) {
1563 ret = AVERROR(ENOMEM);
1564 goto fail;
1565 }
1566
1567 // trace the transcoding graph upstream from every filtegraph
1568
2/2
✓ Branch 0 taken 8059 times.
✓ Branch 1 taken 7942 times.
16001 for (unsigned i = 0; i < sch->nb_filters; i++) {
1569 8059 ret = check_acyclic_for_output(sch, (SchedulerNode){ .idx = i },
1570 filters_visited, filters_stack);
1571
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8059 times.
8059 if (ret < 0) {
1572 av_log(&sch->filters[i], AV_LOG_ERROR, "Transcoding graph has a cycle\n");
1573 goto fail;
1574 }
1575 }
1576
1577 7942 fail:
1578 7942 av_freep(&filters_visited);
1579 7942 av_freep(&filters_stack);
1580 7942 return ret;
1581 }
1582
/* Validate the fully-connected transcoding graph before starting any
 * threads: every node must have its sources/sinks wired up. Also allocates
 * the per-node bookkeeping arrays and the muxer thread queues, and checks
 * the graph for cycles.
 *
 * Returns 0 on success, AVERROR(EINVAL) on a connectivity error,
 * AVERROR(ENOMEM) on allocation failure. */
static int start_prepare(Scheduler *sch)
{
    int ret;

    // every demuxer stream must feed at least one sink
    for (unsigned i = 0; i < sch->nb_demux; i++) {
        SchDemux *d = &sch->demux[i];

        for (unsigned j = 0; j < d->nb_streams; j++) {
            SchDemuxStream *ds = &d->streams[j];

            if (!ds->nb_dst) {
                av_log(d, AV_LOG_ERROR,
                       "Demuxer stream %u not connected to any sink\n", j);
                return AVERROR(EINVAL);
            }

            ds->dst_finished = av_calloc(ds->nb_dst, sizeof(*ds->dst_finished));
            if (!ds->dst_finished)
                return AVERROR(ENOMEM);
        }
    }

    // every decoder must have a source and each of its outputs a sink
    for (unsigned i = 0; i < sch->nb_dec; i++) {
        SchDec *dec = &sch->dec[i];

        if (!dec->src.type) {
            av_log(dec, AV_LOG_ERROR,
                   "Decoder not connected to a source\n");
            return AVERROR(EINVAL);
        }

        for (unsigned j = 0; j < dec->nb_outputs; j++) {
            SchDecOutput *o = &dec->outputs[j];

            if (!o->nb_dst) {
                av_log(dec, AV_LOG_ERROR,
                       "Decoder output %u not connected to any sink\n", j);
                return AVERROR(EINVAL);
            }

            o->dst_finished = av_calloc(o->nb_dst, sizeof(*o->dst_finished));
            if (!o->dst_finished)
                return AVERROR(ENOMEM);
        }
    }

    // every encoder must have both a source and at least one sink
    for (unsigned i = 0; i < sch->nb_enc; i++) {
        SchEnc *enc = &sch->enc[i];

        if (!enc->src.type) {
            av_log(enc, AV_LOG_ERROR,
                   "Encoder not connected to a source\n");
            return AVERROR(EINVAL);
        }

        if (!enc->nb_dst) {
            av_log(enc, AV_LOG_ERROR,
                   "Encoder not connected to any sink\n");
            return AVERROR(EINVAL);
        }

        enc->dst_finished = av_calloc(enc->nb_dst, sizeof(*enc->dst_finished));
        if (!enc->dst_finished)
            return AVERROR(ENOMEM);
    }

    // every muxed stream must have a source; allocate the muxer queues
    for (unsigned i = 0; i < sch->nb_mux; i++) {
        SchMux *mux = &sch->mux[i];

        for (unsigned j = 0; j < mux->nb_streams; j++) {
            SchMuxStream *ms = &mux->streams[j];

            if (!ms->src.type) {
                av_log(mux, AV_LOG_ERROR,
                       "Muxer stream #%u not connected to a source\n", j);
                return AVERROR(EINVAL);
            }
        }

        ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size,
                          QUEUE_PACKETS);
        if (ret < 0)
            return ret;
    }

    // every filtergraph input must have a source and every output a sink
    for (unsigned i = 0; i < sch->nb_filters; i++) {
        SchFilterGraph *fg = &sch->filters[i];

        for (unsigned j = 0; j < fg->nb_inputs; j++) {
            SchFilterIn *fi = &fg->inputs[j];

            if (!fi->src.type) {
                av_log(fg, AV_LOG_ERROR,
                       "Filtergraph input %u not connected to a source\n", j);
                return AVERROR(EINVAL);
            }
        }

        for (unsigned j = 0; j < fg->nb_outputs; j++) {
            SchFilterOut *fo = &fg->outputs[j];

            if (!fo->dst.type) {
                av_log(fg, AV_LOG_ERROR,
                       "Filtergraph %u output %u not connected to a sink\n", i, j);
                return AVERROR(EINVAL);
            }
        }
    }

    // Check that the transcoding graph has no cycles.
    ret = check_acyclic(sch);
    if (ret < 0)
        return ret;

    return 0;
}
1698
/* Validate the transcoding graph and start all its threads.
 * Threads are started downstream-to-upstream (mux, enc, filter, dec, demux)
 * so every consumer exists before its producer begins sending data.
 * On any failure the whole scheduler is stopped via sch_stop().
 *
 * Returns 0 on success, a negative error code on failure. */
int sch_start(Scheduler *sch)
{
    int ret;

    ret = start_prepare(sch);
    if (ret < 0)
        return ret;

    av_assert0(sch->state == SCH_STATE_UNINIT);
    sch->state = SCH_STATE_STARTED;

    for (unsigned i = 0; i < sch->nb_mux; i++) {
        SchMux *mux = &sch->mux[i];

        // muxers whose streams all became ready before sch_start() are
        // initialized here; the rest via sch_mux_stream_ready()
        if (mux->nb_streams_ready == mux->nb_streams) {
            ret = mux_init(sch, mux);
            if (ret < 0)
                goto fail;
        }
    }

    for (unsigned i = 0; i < sch->nb_enc; i++) {
        SchEnc *enc = &sch->enc[i];

        ret = task_start(&enc->task);
        if (ret < 0)
            goto fail;
    }

    for (unsigned i = 0; i < sch->nb_filters; i++) {
        SchFilterGraph *fg = &sch->filters[i];

        ret = task_start(&fg->task);
        if (ret < 0)
            goto fail;
    }

    for (unsigned i = 0; i < sch->nb_dec; i++) {
        SchDec *dec = &sch->dec[i];

        ret = task_start(&dec->task);
        if (ret < 0)
            goto fail;
    }

    for (unsigned i = 0; i < sch->nb_demux; i++) {
        SchDemux *d = &sch->demux[i];

        // a demuxer with no streams has nothing to do
        if (!d->nb_streams)
            continue;

        ret = task_start(&d->task);
        if (ret < 0)
            goto fail;
    }

    // run an initial scheduling pass to unchoke the starting sources
    pthread_mutex_lock(&sch->schedule_lock);
    schedule_update_locked(sch);
    pthread_mutex_unlock(&sch->schedule_lock);

    return 0;
fail:
    sch_stop(sch, NULL);
    return ret;
}
1764
1765 26803 int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
1766 {
1767 int ret;
1768
1769 // convert delay to absolute timestamp
1770 26803 timeout_us += av_gettime();
1771
1772 26803 pthread_mutex_lock(&sch->finish_lock);
1773
1774
1/2
✓ Branch 0 taken 26803 times.
✗ Branch 1 not taken.
26803 if (sch->nb_mux_done < sch->nb_mux) {
1775 26803 struct timespec tv = { .tv_sec = timeout_us / 1000000,
1776 26803 .tv_nsec = (timeout_us % 1000000) * 1000 };
1777 26803 pthread_cond_timedwait(&sch->finish_cond, &sch->finish_lock, &tv);
1778 }
1779
1780 // abort transcoding if any task failed
1781
4/4
✓ Branch 0 taken 18334 times.
✓ Branch 1 taken 8469 times.
✓ Branch 2 taken 1 times.
✓ Branch 3 taken 18333 times.
26803 ret = sch->nb_mux_done == sch->nb_mux || sch->task_failed;
1782
1783 26803 pthread_mutex_unlock(&sch->finish_lock);
1784
1785 26803 *transcode_ts = atomic_load(&sch->last_dts);
1786
1787 26803 return ret;
1788 }
1789
1790 8190 static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame)
1791 {
1792 int ret;
1793
1794 8190 ret = enc->open_cb(enc->task.func_arg, frame);
1795
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8190 times.
8190 if (ret < 0)
1796 return ret;
1797
1798 // ret>0 signals audio frame size, which means sync queue must
1799 // have been enabled during encoder creation
1800
2/2
✓ Branch 0 taken 178 times.
✓ Branch 1 taken 8012 times.
8190 if (ret > 0) {
1801 SchSyncQueue *sq;
1802
1803
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 178 times.
178 av_assert0(enc->sq_idx[0] >= 0);
1804 178 sq = &sch->sq_enc[enc->sq_idx[0]];
1805
1806 178 pthread_mutex_lock(&sq->lock);
1807
1808 178 sq_frame_samples(sq->sq, enc->sq_idx[1], ret);
1809
1810 178 pthread_mutex_unlock(&sq->lock);
1811 }
1812
1813 8190 return 0;
1814 }
1815
1816 475726 static int send_to_enc_thread(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1817 {
1818 int ret;
1819
1820
2/2
✓ Branch 0 taken 20003 times.
✓ Branch 1 taken 455723 times.
475726 if (!frame) {
1821 20003 tq_send_finish(enc->queue, 0);
1822 20003 return 0;
1823 }
1824
1825
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 455723 times.
455723 if (enc->in_finished)
1826 return AVERROR_EOF;
1827
1828 455723 ret = tq_send(enc->queue, 0, frame);
1829
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 455722 times.
455723 if (ret < 0)
1830 1 enc->in_finished = 1;
1831
1832 455723 return ret;
1833 }
1834
1835 39091 static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1836 {
1837 39091 SchSyncQueue *sq = &sch->sq_enc[enc->sq_idx[0]];
1838 39091 int ret = 0;
1839
1840 // inform the scheduling code that no more input will arrive along this path;
1841 // this is necessary because the sync queue may not send an EOF downstream
1842 // until other streams finish
1843 // TODO: consider a cleaner way of passing this information through
1844 // the pipeline
1845
2/2
✓ Branch 0 taken 6637 times.
✓ Branch 1 taken 32454 times.
39091 if (!frame) {
1846
2/2
✓ Branch 0 taken 6637 times.
✓ Branch 1 taken 6637 times.
13274 for (unsigned i = 0; i < enc->nb_dst; i++) {
1847 SchMux *mux;
1848 SchMuxStream *ms;
1849
1850
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6637 times.
6637 if (enc->dst[i].type != SCH_NODE_TYPE_MUX)
1851 continue;
1852
1853 6637 mux = &sch->mux[enc->dst[i].idx];
1854 6637 ms = &mux->streams[enc->dst[i].idx_stream];
1855
1856 6637 pthread_mutex_lock(&sch->schedule_lock);
1857
1858 6637 ms->source_finished = 1;
1859 6637 schedule_update_locked(sch);
1860
1861 6637 pthread_mutex_unlock(&sch->schedule_lock);
1862 }
1863 }
1864
1865 39091 pthread_mutex_lock(&sq->lock);
1866
1867 39091 ret = sq_send(sq->sq, enc->sq_idx[1], SQFRAME(frame));
1868
2/2
✓ Branch 0 taken 39089 times.
✓ Branch 1 taken 2 times.
39091 if (ret < 0)
1869 2 goto finish;
1870
1871 82991 while (1) {
1872 SchEnc *enc;
1873
1874 // TODO: the SQ API should be extended to allow returning EOF
1875 // for individual streams
1876 122080 ret = sq_receive(sq->sq, -1, SQFRAME(sq->frame));
1877
2/2
✓ Branch 0 taken 39089 times.
✓ Branch 1 taken 82991 times.
122080 if (ret < 0) {
1878
2/2
✓ Branch 0 taken 9614 times.
✓ Branch 1 taken 29475 times.
39089 ret = (ret == AVERROR(EAGAIN)) ? 0 : ret;
1879 39089 break;
1880 }
1881
1882 82991 enc = &sch->enc[sq->enc_idx[ret]];
1883 82991 ret = send_to_enc_thread(sch, enc, sq->frame);
1884
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 82991 times.
82991 if (ret < 0) {
1885 av_frame_unref(sq->frame);
1886 if (ret != AVERROR_EOF)
1887 break;
1888
1889 sq_send(sq->sq, enc->sq_idx[1], SQFRAME(NULL));
1890 continue;
1891 }
1892 }
1893
1894
2/2
✓ Branch 0 taken 29475 times.
✓ Branch 1 taken 9614 times.
39089 if (ret < 0) {
1895 // close all encoders fed from this sync queue
1896
2/2
✓ Branch 0 taken 10089 times.
✓ Branch 1 taken 9614 times.
19703 for (unsigned i = 0; i < sq->nb_enc_idx; i++) {
1897 10089 int err = send_to_enc_thread(sch, &sch->enc[sq->enc_idx[i]], NULL);
1898
1899 // if the sync queue error is EOF and closing the encoder
1900 // produces a more serious error, make sure to pick the latter
1901
2/4
✓ Branch 0 taken 10089 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 10089 times.
✗ Branch 3 not taken.
10089 ret = err_merge((ret == AVERROR_EOF && err < 0) ? 0 : ret, err);
1902 }
1903 }
1904
1905 39089 finish:
1906 39091 pthread_mutex_unlock(&sq->lock);
1907
1908 39091 return ret;
1909 }
1910
1911 421740 static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1912 {
1913
6/6
✓ Branch 0 taken 420655 times.
✓ Branch 1 taken 1085 times.
✓ Branch 2 taken 404146 times.
✓ Branch 3 taken 16509 times.
✓ Branch 4 taken 8190 times.
✓ Branch 5 taken 395956 times.
421740 if (enc->open_cb && frame && !enc->opened) {
1914 8190 int ret = enc_open(sch, enc, frame);
1915
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8190 times.
8190 if (ret < 0)
1916 return ret;
1917 8190 enc->opened = 1;
1918
1919 // discard empty frames that only carry encoder init parameters
1920
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 8187 times.
8190 if (!frame->buf[0]) {
1921 3 av_frame_unref(frame);
1922 3 return 0;
1923 }
1924 }
1925
1926 421737 return (enc->sq_idx[0] >= 0) ?
1927
2/2
✓ Branch 0 taken 39091 times.
✓ Branch 1 taken 382646 times.
804383 send_to_enc_sq (sch, enc, frame) :
1928 382646 send_to_enc_thread(sch, enc, frame);
1929 }
1930
1931 2718 static int mux_queue_packet(SchMux *mux, SchMuxStream *ms, AVPacket *pkt)
1932 {
1933 2718 PreMuxQueue *q = &ms->pre_mux_queue;
1934 2718 AVPacket *tmp_pkt = NULL;
1935 int ret;
1936
1937
2/2
✓ Branch 1 taken 91 times.
✓ Branch 2 taken 2627 times.
2718 if (!av_fifo_can_write(q->fifo)) {
1938 91 size_t packets = av_fifo_can_read(q->fifo);
1939
1/2
✓ Branch 0 taken 91 times.
✗ Branch 1 not taken.
91 size_t pkt_size = pkt ? pkt->size : 0;
1940 91 int thresh_reached = (q->data_size + pkt_size) > q->data_threshold;
1941
2/2
✓ Branch 0 taken 86 times.
✓ Branch 1 taken 5 times.
91 size_t max_packets = thresh_reached ? q->max_packets : SIZE_MAX;
1942 91 size_t new_size = FFMIN(2 * packets, max_packets);
1943
1944
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 91 times.
91 if (new_size <= packets) {
1945 av_log(mux, AV_LOG_ERROR,
1946 "Too many packets buffered for output stream.\n");
1947 return AVERROR_BUFFER_TOO_SMALL;
1948 }
1949 91 ret = av_fifo_grow2(q->fifo, new_size - packets);
1950
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 91 times.
91 if (ret < 0)
1951 return ret;
1952 }
1953
1954
2/2
✓ Branch 0 taken 2680 times.
✓ Branch 1 taken 38 times.
2718 if (pkt) {
1955 2680 tmp_pkt = av_packet_alloc();
1956
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2680 times.
2680 if (!tmp_pkt)
1957 return AVERROR(ENOMEM);
1958
1959 2680 av_packet_move_ref(tmp_pkt, pkt);
1960 2680 q->data_size += tmp_pkt->size;
1961 }
1962 2718 av_fifo_write(q->fifo, &tmp_pkt, 1);
1963
1964 2718 return 0;
1965 }
1966
1967 529709 static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx,
1968 AVPacket *pkt)
1969 {
1970 529709 SchMuxStream *ms = &mux->streams[stream_idx];
1971
2/2
✓ Branch 0 taken 512407 times.
✓ Branch 1 taken 8324 times.
520731 int64_t dts = (pkt && pkt->dts != AV_NOPTS_VALUE) ?
1972
2/2
✓ Branch 0 taken 520731 times.
✓ Branch 1 taken 8978 times.
1050440 av_rescale_q(pkt->dts + pkt->duration, pkt->time_base, AV_TIME_BASE_Q) :
1973 AV_NOPTS_VALUE;
1974
1975 // queue the packet if the muxer cannot be started yet
1976
2/2
✓ Branch 0 taken 2736 times.
✓ Branch 1 taken 526973 times.
529709 if (!atomic_load(&mux->mux_started)) {
1977 2736 int queued = 0;
1978
1979 // the muxer could have started between the above atomic check and
1980 // locking the mutex, then this block falls through to normal send path
1981 2736 pthread_mutex_lock(&sch->mux_ready_lock);
1982
1983
2/2
✓ Branch 0 taken 2718 times.
✓ Branch 1 taken 18 times.
2736 if (!atomic_load(&mux->mux_started)) {
1984 2718 int ret = mux_queue_packet(mux, ms, pkt);
1985
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2718 times.
2718 queued = ret < 0 ? ret : 1;
1986 }
1987
1988 2736 pthread_mutex_unlock(&sch->mux_ready_lock);
1989
1990
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2736 times.
2736 if (queued < 0)
1991 return queued;
1992
2/2
✓ Branch 0 taken 2718 times.
✓ Branch 1 taken 18 times.
2736 else if (queued)
1993 2718 goto update_schedule;
1994 }
1995
1996
2/2
✓ Branch 0 taken 518051 times.
✓ Branch 1 taken 8940 times.
526991 if (pkt) {
1997 int ret;
1998
1999
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 518051 times.
518051 if (ms->init_eof)
2000 return AVERROR_EOF;
2001
2002 518051 ret = tq_send(mux->queue, stream_idx, pkt);
2003
2/2
✓ Branch 0 taken 74 times.
✓ Branch 1 taken 517977 times.
518051 if (ret < 0)
2004 74 return ret;
2005 } else
2006 8940 tq_send_finish(mux->queue, stream_idx);
2007
2008 529635 update_schedule:
2009 // TODO: use atomics to check whether this changes trailing dts
2010 // to avoid locking unnecessarily
2011
4/4
✓ Branch 0 taken 17302 times.
✓ Branch 1 taken 512333 times.
✓ Branch 2 taken 8978 times.
✓ Branch 3 taken 8324 times.
529635 if (dts != AV_NOPTS_VALUE || !pkt) {
2012 521311 pthread_mutex_lock(&sch->schedule_lock);
2013
2014
2/2
✓ Branch 0 taken 512333 times.
✓ Branch 1 taken 8978 times.
521311 if (pkt) ms->last_dts = dts;
2015 8978 else ms->source_finished = 1;
2016
2017 521311 schedule_update_locked(sch);
2018
2019 521311 pthread_mutex_unlock(&sch->schedule_lock);
2020 }
2021
2022 529635 return 0;
2023 }
2024
2025 static int
2026 477734 demux_stream_send_to_dst(Scheduler *sch, const SchedulerNode dst,
2027 uint8_t *dst_finished, AVPacket *pkt, unsigned flags)
2028 {
2029 int ret;
2030
2031
2/2
✓ Branch 0 taken 3216 times.
✓ Branch 1 taken 474518 times.
477734 if (*dst_finished)
2032 3216 return AVERROR_EOF;
2033
2034
4/4
✓ Branch 0 taken 470043 times.
✓ Branch 1 taken 4475 times.
✓ Branch 2 taken 71155 times.
✓ Branch 3 taken 398888 times.
474518 if (pkt && dst.type == SCH_NODE_TYPE_MUX &&
2035
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 71153 times.
71155 (flags & DEMUX_SEND_STREAMCOPY_EOF)) {
2036 2 av_packet_unref(pkt);
2037 2 pkt = NULL;
2038 }
2039
2040
2/2
✓ Branch 0 taken 4477 times.
✓ Branch 1 taken 470041 times.
474518 if (!pkt)
2041 4477 goto finish;
2042
2043 940082 ret = (dst.type == SCH_NODE_TYPE_MUX) ?
2044
2/2
✓ Branch 0 taken 71153 times.
✓ Branch 1 taken 398888 times.
470041 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
2045 398888 tq_send(sch->dec[dst.idx].queue, 0, pkt);
2046
2/2
✓ Branch 0 taken 3214 times.
✓ Branch 1 taken 466827 times.
470041 if (ret == AVERROR_EOF)
2047 3214 goto finish;
2048
2049 466827 return ret;
2050
2051 7691 finish:
2052
2/2
✓ Branch 0 taken 746 times.
✓ Branch 1 taken 6945 times.
7691 if (dst.type == SCH_NODE_TYPE_MUX)
2053 746 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
2054 else
2055 6945 tq_send_finish(sch->dec[dst.idx].queue, 0);
2056
2057 7691 *dst_finished = 1;
2058 7691 return AVERROR_EOF;
2059 }
2060
2061 477267 static int demux_send_for_stream(Scheduler *sch, SchDemux *d, SchDemuxStream *ds,
2062 AVPacket *pkt, unsigned flags)
2063 {
2064 477267 unsigned nb_done = 0;
2065
2066
2/2
✓ Branch 0 taken 477734 times.
✓ Branch 1 taken 477267 times.
955001 for (unsigned i = 0; i < ds->nb_dst; i++) {
2067 477734 AVPacket *to_send = pkt;
2068 477734 uint8_t *finished = &ds->dst_finished[i];
2069
2070 int ret;
2071
2072 // sending a packet consumes it, so make a temporary reference if needed
2073
4/4
✓ Branch 0 taken 470043 times.
✓ Branch 1 taken 7691 times.
✓ Branch 2 taken 441 times.
✓ Branch 3 taken 469602 times.
477734 if (pkt && i < ds->nb_dst - 1) {
2074 441 to_send = d->send_pkt;
2075
2076 441 ret = av_packet_ref(to_send, pkt);
2077
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 441 times.
441 if (ret < 0)
2078 return ret;
2079 }
2080
2081 477734 ret = demux_stream_send_to_dst(sch, ds->dst[i], finished, to_send, flags);
2082
2/2
✓ Branch 0 taken 470043 times.
✓ Branch 1 taken 7691 times.
477734 if (to_send)
2083 470043 av_packet_unref(to_send);
2084
2/2
✓ Branch 0 taken 10907 times.
✓ Branch 1 taken 466827 times.
477734 if (ret == AVERROR_EOF)
2085 10907 nb_done++;
2086
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 466827 times.
466827 else if (ret < 0)
2087 return ret;
2088 }
2089
2090
2/2
✓ Branch 0 taken 10880 times.
✓ Branch 1 taken 466387 times.
477267 return (nb_done == ds->nb_dst) ? AVERROR_EOF : 0;
2091 }
2092
2093 11 static int demux_flush(Scheduler *sch, SchDemux *d, AVPacket *pkt)
2094 {
2095 11 Timestamp max_end_ts = (Timestamp){ .ts = AV_NOPTS_VALUE };
2096
2097
3/6
✓ Branch 0 taken 11 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 11 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 11 times.
11 av_assert0(!pkt->buf && !pkt->data && !pkt->side_data_elems);
2098
2099
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 11 times.
25 for (unsigned i = 0; i < d->nb_streams; i++) {
2100 14 SchDemuxStream *ds = &d->streams[i];
2101
2102
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 14 times.
28 for (unsigned j = 0; j < ds->nb_dst; j++) {
2103 14 const SchedulerNode *dst = &ds->dst[j];
2104 SchDec *dec;
2105 int ret;
2106
2107
3/4
✓ Branch 0 taken 14 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 8 times.
✓ Branch 3 taken 6 times.
14 if (ds->dst_finished[j] || dst->type != SCH_NODE_TYPE_DEC)
2108 8 continue;
2109
2110 6 dec = &sch->dec[dst->idx];
2111
2112 6 ret = tq_send(dec->queue, 0, pkt);
2113
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 if (ret < 0)
2114 return ret;
2115
2116
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 3 times.
6 if (dec->queue_end_ts) {
2117 Timestamp ts;
2118 3 ret = av_thread_message_queue_recv(dec->queue_end_ts, &ts, 0);
2119
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (ret < 0)
2120 return ret;
2121
2122
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (max_end_ts.ts == AV_NOPTS_VALUE ||
2123 (ts.ts != AV_NOPTS_VALUE &&
2124 av_compare_ts(max_end_ts.ts, max_end_ts.tb, ts.ts, ts.tb) < 0))
2125 3 max_end_ts = ts;
2126
2127 }
2128 }
2129 }
2130
2131 11 pkt->pts = max_end_ts.ts;
2132 11 pkt->time_base = max_end_ts.tb;
2133
2134 11 return 0;
2135 }
2136
2137 470009 int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt,
2138 unsigned flags)
2139 {
2140 SchDemux *d;
2141 int terminate;
2142
2143
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 470009 times.
470009 av_assert0(demux_idx < sch->nb_demux);
2144 470009 d = &sch->demux[demux_idx];
2145
2146 470009 terminate = waiter_wait(sch, &d->waiter);
2147
2/2
✓ Branch 0 taken 396 times.
✓ Branch 1 taken 469613 times.
470009 if (terminate)
2148 396 return AVERROR_EXIT;
2149
2150 // flush the downstreams after seek
2151
2/2
✓ Branch 0 taken 11 times.
✓ Branch 1 taken 469602 times.
469613 if (pkt->stream_index == -1)
2152 11 return demux_flush(sch, d, pkt);
2153
2154
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 469602 times.
469602 av_assert0(pkt->stream_index < d->nb_streams);
2155
2156 469602 return demux_send_for_stream(sch, d, &d->streams[pkt->stream_index], pkt, flags);
2157 }
2158
2159 7384 static int demux_done(Scheduler *sch, unsigned demux_idx)
2160 {
2161 7384 SchDemux *d = &sch->demux[demux_idx];
2162 7384 int ret = 0;
2163
2164
2/2
✓ Branch 0 taken 7665 times.
✓ Branch 1 taken 7384 times.
15049 for (unsigned i = 0; i < d->nb_streams; i++) {
2165 7665 int err = demux_send_for_stream(sch, d, &d->streams[i], NULL, 0);
2166
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7665 times.
7665 if (err != AVERROR_EOF)
2167 ret = err_merge(ret, err);
2168 }
2169
2170 7384 pthread_mutex_lock(&sch->schedule_lock);
2171
2172 7384 d->task_exited = 1;
2173
2174 7384 schedule_update_locked(sch);
2175
2176 7384 pthread_mutex_unlock(&sch->schedule_lock);
2177
2178 7384 return ret;
2179 }
2180
2181 537826 int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt)
2182 {
2183 SchMux *mux;
2184 int ret, stream_idx;
2185
2186
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 537826 times.
537826 av_assert0(mux_idx < sch->nb_mux);
2187 537826 mux = &sch->mux[mux_idx];
2188
2189 537826 ret = tq_receive(mux->queue, &stream_idx, pkt);
2190 537826 pkt->stream_index = stream_idx;
2191 537826 return ret;
2192 }
2193
2194 219 void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
2195 {
2196 SchMux *mux;
2197
2198
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 219 times.
219 av_assert0(mux_idx < sch->nb_mux);
2199 219 mux = &sch->mux[mux_idx];
2200
2201
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 219 times.
219 av_assert0(stream_idx < mux->nb_streams);
2202 219 tq_receive_finish(mux->queue, stream_idx);
2203
2204 219 pthread_mutex_lock(&sch->schedule_lock);
2205 219 mux->streams[stream_idx].source_finished = 1;
2206
2207 219 schedule_update_locked(sch);
2208
2209 219 pthread_mutex_unlock(&sch->schedule_lock);
2210 219 }
2211
2212 480500 int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
2213 const AVPacket *pkt)
2214 {
2215 SchMux *mux;
2216 SchMuxStream *ms;
2217
2218
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 480500 times.
480500 av_assert0(mux_idx < sch->nb_mux);
2219 480500 mux = &sch->mux[mux_idx];
2220
2221
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 480500 times.
480500 av_assert0(stream_idx < mux->nb_streams);
2222 480500 ms = &mux->streams[stream_idx];
2223
2224
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 480500 times.
480505 for (unsigned i = 0; i < ms->nb_sub_heartbeat_dst; i++) {
2225 5 SchDec *dst = &sch->dec[ms->sub_heartbeat_dst[i]];
2226 int ret;
2227
2228 5 ret = av_packet_copy_props(mux->sub_heartbeat_pkt, pkt);
2229
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5 times.
5 if (ret < 0)
2230 return ret;
2231
2232 5 tq_send(dst->queue, 0, mux->sub_heartbeat_pkt);
2233 }
2234
2235 480500 return 0;
2236 }
2237
2238 8474 static int mux_done(Scheduler *sch, unsigned mux_idx)
2239 {
2240 8474 SchMux *mux = &sch->mux[mux_idx];
2241
2242 8474 pthread_mutex_lock(&sch->schedule_lock);
2243
2244
2/2
✓ Branch 0 taken 8978 times.
✓ Branch 1 taken 8474 times.
17452 for (unsigned i = 0; i < mux->nb_streams; i++) {
2245 8978 tq_receive_finish(mux->queue, i);
2246 8978 mux->streams[i].source_finished = 1;
2247 }
2248
2249 8474 schedule_update_locked(sch);
2250
2251 8474 pthread_mutex_unlock(&sch->schedule_lock);
2252
2253 8474 pthread_mutex_lock(&sch->finish_lock);
2254
2255
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8474 times.
8474 av_assert0(sch->nb_mux_done < sch->nb_mux);
2256 8474 sch->nb_mux_done++;
2257
2258 8474 pthread_cond_signal(&sch->finish_cond);
2259
2260 8474 pthread_mutex_unlock(&sch->finish_lock);
2261
2262 8474 return 0;
2263 }
2264
2265 371202 int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
2266 {
2267 SchDec *dec;
2268 int ret, dummy;
2269
2270
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 371202 times.
371202 av_assert0(dec_idx < sch->nb_dec);
2271 371202 dec = &sch->dec[dec_idx];
2272
2273 // the decoder should have given us post-flush end timestamp in pkt
2274
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 371199 times.
371202 if (dec->expect_end_ts) {
2275 3 Timestamp ts = (Timestamp){ .ts = pkt->pts, .tb = pkt->time_base };
2276 3 ret = av_thread_message_queue_send(dec->queue_end_ts, &ts, 0);
2277
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (ret < 0)
2278 return ret;
2279
2280 3 dec->expect_end_ts = 0;
2281 }
2282
2283 371202 ret = tq_receive(dec->queue, &dummy, pkt);
2284
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 371202 times.
371202 av_assert0(dummy <= 0);
2285
2286 // got a flush packet, on the next call to this function the decoder
2287 // will give us post-flush end timestamp
2288
7/8
✓ Branch 0 taken 367806 times.
✓ Branch 1 taken 3396 times.
✓ Branch 2 taken 958 times.
✓ Branch 3 taken 366848 times.
✓ Branch 4 taken 958 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 3 times.
✓ Branch 7 taken 955 times.
371202 if (ret >= 0 && !pkt->data && !pkt->side_data_elems && dec->queue_end_ts)
2289 3 dec->expect_end_ts = 1;
2290
2291 371202 return ret;
2292 }
2293
2294 399948 static int send_to_filter(Scheduler *sch, SchFilterGraph *fg,
2295 unsigned in_idx, AVFrame *frame)
2296 {
2297
2/2
✓ Branch 0 taken 392972 times.
✓ Branch 1 taken 6976 times.
399948 if (frame)
2298 392972 return tq_send(fg->queue, in_idx, frame);
2299
2300
2/2
✓ Branch 0 taken 6975 times.
✓ Branch 1 taken 1 times.
6976 if (!fg->inputs[in_idx].send_finished) {
2301 6975 fg->inputs[in_idx].send_finished = 1;
2302 6975 tq_send_finish(fg->queue, in_idx);
2303
2304 // close the control stream when all actual inputs are done
2305
2/2
✓ Branch 0 taken 6872 times.
✓ Branch 1 taken 103 times.
6975 if (atomic_fetch_add(&fg->nb_inputs_finished_send, 1) == fg->nb_inputs - 1)
2306 6872 tq_send_finish(fg->queue, fg->nb_inputs);
2307 }
2308 6976 return 0;
2309 }
2310
2311 404693 static int dec_send_to_dst(Scheduler *sch, const SchedulerNode dst,
2312 uint8_t *dst_finished, AVFrame *frame)
2313 {
2314 int ret;
2315
2316
2/2
✓ Branch 0 taken 7270 times.
✓ Branch 1 taken 397423 times.
404693 if (*dst_finished)
2317 7270 return AVERROR_EOF;
2318
2319
2/2
✓ Branch 0 taken 3409 times.
✓ Branch 1 taken 394014 times.
397423 if (!frame)
2320 3409 goto finish;
2321
2322 788028 ret = (dst.type == SCH_NODE_TYPE_FILTER_IN) ?
2323
2/2
✓ Branch 0 taken 392971 times.
✓ Branch 1 taken 1043 times.
394014 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame) :
2324 1043 send_to_enc(sch, &sch->enc[dst.idx], frame);
2325
2/2
✓ Branch 0 taken 3607 times.
✓ Branch 1 taken 390407 times.
394014 if (ret == AVERROR_EOF)
2326 3607 goto finish;
2327
2328 390407 return ret;
2329
2330 7016 finish:
2331
2/2
✓ Branch 0 taken 6974 times.
✓ Branch 1 taken 42 times.
7016 if (dst.type == SCH_NODE_TYPE_FILTER_IN)
2332 6974 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2333 else
2334 42 send_to_enc(sch, &sch->enc[dst.idx], NULL);
2335
2336 7016 *dst_finished = 1;
2337
2338 7016 return AVERROR_EOF;
2339 }
2340
2341 396044 int sch_dec_send(Scheduler *sch, unsigned dec_idx,
2342 unsigned out_idx, AVFrame *frame)
2343 {
2344 SchDec *dec;
2345 SchDecOutput *o;
2346 int ret;
2347 396044 unsigned nb_done = 0;
2348
2349
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 396044 times.
396044 av_assert0(dec_idx < sch->nb_dec);
2350 396044 dec = &sch->dec[dec_idx];
2351
2352
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 396044 times.
396044 av_assert0(out_idx < dec->nb_outputs);
2353 396044 o = &dec->outputs[out_idx];
2354
2355
2/2
✓ Branch 0 taken 397677 times.
✓ Branch 1 taken 396044 times.
793721 for (unsigned i = 0; i < o->nb_dst; i++) {
2356 397677 uint8_t *finished = &o->dst_finished[i];
2357 397677 AVFrame *to_send = frame;
2358
2359 // sending a frame consumes it, so make a temporary reference if needed
2360
2/2
✓ Branch 0 taken 1633 times.
✓ Branch 1 taken 396044 times.
397677 if (i < o->nb_dst - 1) {
2361 1633 to_send = dec->send_frame;
2362
2363 // frame may sometimes contain props only,
2364 // e.g. to signal EOF timestamp
2365
2/2
✓ Branch 0 taken 1523 times.
✓ Branch 1 taken 110 times.
1633 ret = frame->buf[0] ? av_frame_ref(to_send, frame) :
2366 110 av_frame_copy_props(to_send, frame);
2367
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1633 times.
1633 if (ret < 0)
2368 return ret;
2369 }
2370
2371 397677 ret = dec_send_to_dst(sch, o->dst[i], finished, to_send);
2372
2/2
✓ Branch 0 taken 7270 times.
✓ Branch 1 taken 390407 times.
397677 if (ret < 0) {
2373 7270 av_frame_unref(to_send);
2374
1/2
✓ Branch 0 taken 7270 times.
✗ Branch 1 not taken.
7270 if (ret == AVERROR_EOF) {
2375 7270 nb_done++;
2376 7270 continue;
2377 }
2378 return ret;
2379 }
2380 }
2381
2382
2/2
✓ Branch 0 taken 7110 times.
✓ Branch 1 taken 388934 times.
396044 return (nb_done == o->nb_dst) ? AVERROR_EOF : 0;
2383 }
2384
2385 6946 static int dec_done(Scheduler *sch, unsigned dec_idx)
2386 {
2387 6946 SchDec *dec = &sch->dec[dec_idx];
2388 6946 int ret = 0;
2389
2390 6946 tq_receive_finish(dec->queue, 0);
2391
2392 // make sure our source does not get stuck waiting for end timestamps
2393 // that will never arrive
2394
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 6945 times.
6946 if (dec->queue_end_ts)
2395 1 av_thread_message_queue_set_err_recv(dec->queue_end_ts, AVERROR_EOF);
2396
2397
2/2
✓ Branch 0 taken 6952 times.
✓ Branch 1 taken 6946 times.
13898 for (unsigned i = 0; i < dec->nb_outputs; i++) {
2398 6952 SchDecOutput *o = &dec->outputs[i];
2399
2400
2/2
✓ Branch 0 taken 7016 times.
✓ Branch 1 taken 6952 times.
13968 for (unsigned j = 0; j < o->nb_dst; j++) {
2401 7016 int err = dec_send_to_dst(sch, o->dst[j], &o->dst_finished[j], NULL);
2402
2/4
✓ Branch 0 taken 7016 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 7016 times.
7016 if (err < 0 && err != AVERROR_EOF)
2403 ret = err_merge(ret, err);
2404 }
2405 }
2406
2407 6946 return ret;
2408 }
2409
2410 463953 int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
2411 {
2412 SchEnc *enc;
2413 int ret, dummy;
2414
2415
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 463953 times.
463953 av_assert0(enc_idx < sch->nb_enc);
2416 463953 enc = &sch->enc[enc_idx];
2417
2418 463953 ret = tq_receive(enc->queue, &dummy, frame);
2419
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 463953 times.
463953 av_assert0(dummy <= 0);
2420
2421 463953 return ret;
2422 }
2423
2424 457887 static int enc_send_to_dst(Scheduler *sch, const SchedulerNode dst,
2425 uint8_t *dst_finished, AVPacket *pkt)
2426 {
2427 int ret;
2428
2429
2/2
✓ Branch 0 taken 28 times.
✓ Branch 1 taken 457859 times.
457887 if (*dst_finished)
2430 28 return AVERROR_EOF;
2431
2432
2/2
✓ Branch 0 taken 8231 times.
✓ Branch 1 taken 449628 times.
457859 if (!pkt)
2433 8231 goto finish;
2434
2435 899256 ret = (dst.type == SCH_NODE_TYPE_MUX) ?
2436
2/2
✓ Branch 0 taken 449578 times.
✓ Branch 1 taken 50 times.
449628 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
2437 50 tq_send(sch->dec[dst.idx].queue, 0, pkt);
2438
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 449626 times.
449628 if (ret == AVERROR_EOF)
2439 2 goto finish;
2440
2441 449626 return ret;
2442
2443 8233 finish:
2444
2/2
✓ Branch 0 taken 8232 times.
✓ Branch 1 taken 1 times.
8233 if (dst.type == SCH_NODE_TYPE_MUX)
2445 8232 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
2446 else
2447 1 tq_send_finish(sch->dec[dst.idx].queue, 0);
2448
2449 8233 *dst_finished = 1;
2450
2451 8233 return AVERROR_EOF;
2452 }
2453
2454 449604 int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
2455 {
2456 SchEnc *enc;
2457 int ret;
2458
2459
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 449604 times.
449604 av_assert0(enc_idx < sch->nb_enc);
2460 449604 enc = &sch->enc[enc_idx];
2461
2462
2/2
✓ Branch 0 taken 449654 times.
✓ Branch 1 taken 449604 times.
899258 for (unsigned i = 0; i < enc->nb_dst; i++) {
2463 449654 uint8_t *finished = &enc->dst_finished[i];
2464 449654 AVPacket *to_send = pkt;
2465
2466 // sending a packet consumes it, so make a temporary reference if needed
2467
2/2
✓ Branch 0 taken 50 times.
✓ Branch 1 taken 449604 times.
449654 if (i < enc->nb_dst - 1) {
2468 50 to_send = enc->send_pkt;
2469
2470 50 ret = av_packet_ref(to_send, pkt);
2471
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 50 times.
50 if (ret < 0)
2472 return ret;
2473 }
2474
2475 449654 ret = enc_send_to_dst(sch, enc->dst[i], finished, to_send);
2476
2/2
✓ Branch 0 taken 28 times.
✓ Branch 1 taken 449626 times.
449654 if (ret < 0) {
2477 28 av_packet_unref(to_send);
2478
1/2
✓ Branch 0 taken 28 times.
✗ Branch 1 not taken.
28 if (ret == AVERROR_EOF)
2479 28 continue;
2480 return ret;
2481 }
2482 }
2483
2484 449604 return 0;
2485 }
2486
2487 8232 static int enc_done(Scheduler *sch, unsigned enc_idx)
2488 {
2489 8232 SchEnc *enc = &sch->enc[enc_idx];
2490 8232 int ret = 0;
2491
2492 8232 tq_receive_finish(enc->queue, 0);
2493
2494
2/2
✓ Branch 0 taken 8233 times.
✓ Branch 1 taken 8232 times.
16465 for (unsigned i = 0; i < enc->nb_dst; i++) {
2495 8233 int err = enc_send_to_dst(sch, enc->dst[i], &enc->dst_finished[i], NULL);
2496
2/4
✓ Branch 0 taken 8233 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 8233 times.
8233 if (err < 0 && err != AVERROR_EOF)
2497 ret = err_merge(ret, err);
2498 }
2499
2500 8232 return ret;
2501 }
2502
2503 413211 int sch_filter_receive(Scheduler *sch, unsigned fg_idx,
2504 unsigned *in_idx, AVFrame *frame)
2505 {
2506 SchFilterGraph *fg;
2507
2508
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 413211 times.
413211 av_assert0(fg_idx < sch->nb_filters);
2509 413211 fg = &sch->filters[fg_idx];
2510
2511
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 413211 times.
413211 av_assert0(*in_idx <= fg->nb_inputs);
2512
2513 // update scheduling to account for desired input stream, if it changed
2514 //
2515 // this check needs no locking because only the filtering thread
2516 // updates this value
2517
2/2
✓ Branch 0 taken 8268 times.
✓ Branch 1 taken 404943 times.
413211 if (*in_idx != fg->best_input) {
2518 8268 pthread_mutex_lock(&sch->schedule_lock);
2519
2520 8268 fg->best_input = *in_idx;
2521 8268 schedule_update_locked(sch);
2522
2523 8268 pthread_mutex_unlock(&sch->schedule_lock);
2524 }
2525
2526
2/2
✓ Branch 0 taken 382515 times.
✓ Branch 1 taken 30696 times.
413211 if (*in_idx == fg->nb_inputs) {
2527 30696 int terminate = waiter_wait(sch, &fg->waiter);
2528
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 30696 times.
30696 return terminate ? AVERROR_EOF : AVERROR(EAGAIN);
2529 }
2530
2531 27 while (1) {
2532 int ret, idx;
2533
2534 382542 ret = tq_receive(fg->queue, &idx, frame);
2535
2/2
✓ Branch 0 taken 8 times.
✓ Branch 1 taken 382534 times.
382542 if (idx < 0)
2536 382515 return AVERROR_EOF;
2537
2/2
✓ Branch 0 taken 382507 times.
✓ Branch 1 taken 27 times.
382534 else if (ret >= 0) {
2538 382507 *in_idx = idx;
2539 382507 return 0;
2540 }
2541
2542 // disregard EOFs for specific streams - they should always be
2543 // preceded by an EOF frame
2544 }
2545 }
2546
2547 94 void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx)
2548 {
2549 SchFilterGraph *fg;
2550 SchFilterIn *fi;
2551
2552
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 94 times.
94 av_assert0(fg_idx < sch->nb_filters);
2553 94 fg = &sch->filters[fg_idx];
2554
2555
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 94 times.
94 av_assert0(in_idx < fg->nb_inputs);
2556 94 fi = &fg->inputs[in_idx];
2557
2558 94 pthread_mutex_lock(&sch->schedule_lock);
2559
2560
1/2
✓ Branch 0 taken 94 times.
✗ Branch 1 not taken.
94 if (!fi->receive_finished) {
2561 94 fi->receive_finished = 1;
2562 94 tq_receive_finish(fg->queue, in_idx);
2563
2564 // close the control stream when all actual inputs are done
2565
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 93 times.
94 if (++fg->nb_inputs_finished_receive == fg->nb_inputs)
2566 1 tq_receive_finish(fg->queue, fg->nb_inputs);
2567
2568 94 schedule_update_locked(sch);
2569 }
2570
2571 94 pthread_mutex_unlock(&sch->schedule_lock);
2572 94 }
2573
2574 409275 int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)
2575 {
2576 SchFilterGraph *fg;
2577 SchedulerNode dst;
2578 int ret;
2579
2580
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 409275 times.
409275 av_assert0(fg_idx < sch->nb_filters);
2581 409275 fg = &sch->filters[fg_idx];
2582
2583
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 409275 times.
409275 av_assert0(out_idx < fg->nb_outputs);
2584 409275 dst = fg->outputs[out_idx].dst;
2585
2586
2/2
✓ Branch 0 taken 409273 times.
✓ Branch 1 taken 2 times.
409275 if (dst.type == SCH_NODE_TYPE_ENC) {
2587 409273 ret = send_to_enc(sch, &sch->enc[dst.idx], frame);
2588
2/2
✓ Branch 0 taken 3192 times.
✓ Branch 1 taken 406081 times.
409273 if (ret == AVERROR_EOF)
2589 3192 send_to_enc(sch, &sch->enc[dst.idx], NULL);
2590 } else {
2591 2 ret = send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame);
2592
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (ret == AVERROR_EOF)
2593 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2594 }
2595 409275 return ret;
2596 }
2597
2598 8056 static int filter_done(Scheduler *sch, unsigned fg_idx)
2599 {
2600 8056 SchFilterGraph *fg = &sch->filters[fg_idx];
2601 8056 int ret = 0;
2602
2603
2/2
✓ Branch 0 taken 15031 times.
✓ Branch 1 taken 8056 times.
23087 for (unsigned i = 0; i <= fg->nb_inputs; i++)
2604 15031 tq_receive_finish(fg->queue, i);
2605
2606
2/2
✓ Branch 0 taken 8191 times.
✓ Branch 1 taken 8056 times.
16247 for (unsigned i = 0; i < fg->nb_outputs; i++) {
2607 8191 SchedulerNode dst = fg->outputs[i].dst;
2608 16382 int err = (dst.type == SCH_NODE_TYPE_ENC) ?
2609
2/2
✓ Branch 0 taken 8190 times.
✓ Branch 1 taken 1 times.
8191 send_to_enc (sch, &sch->enc[dst.idx], NULL) :
2610 1 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2611
2612
3/4
✓ Branch 0 taken 3236 times.
✓ Branch 1 taken 4955 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 3236 times.
8191 if (err < 0 && err != AVERROR_EOF)
2613 ret = err_merge(ret, err);
2614 }
2615
2616 8056 pthread_mutex_lock(&sch->schedule_lock);
2617
2618 8056 fg->task_exited = 1;
2619
2620 8056 schedule_update_locked(sch);
2621
2622 8056 pthread_mutex_unlock(&sch->schedule_lock);
2623
2624 8056 return ret;
2625 }
2626
2627 int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
2628 {
2629 SchFilterGraph *fg;
2630
2631 av_assert0(fg_idx < sch->nb_filters);
2632 fg = &sch->filters[fg_idx];
2633
2634 return send_to_filter(sch, fg, fg->nb_inputs, frame);
2635 }
2636
2637 6867 void sch_filter_choke_inputs(Scheduler *sch, unsigned fg_idx)
2638 {
2639 SchFilterGraph *fg;
2640
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6867 times.
6867 av_assert0(fg_idx < sch->nb_filters);
2641 6867 fg = &sch->filters[fg_idx];
2642
2643 6867 pthread_mutex_lock(&sch->schedule_lock);
2644 6867 fg->best_input = fg->nb_inputs;
2645 6867 schedule_update_locked(sch);
2646 6867 pthread_mutex_unlock(&sch->schedule_lock);
2647 6867 }
2648
2649 39092 static int task_cleanup(Scheduler *sch, SchedulerNode node)
2650 {
2651
5/6
✓ Branch 0 taken 7384 times.
✓ Branch 1 taken 8474 times.
✓ Branch 2 taken 6946 times.
✓ Branch 3 taken 8232 times.
✓ Branch 4 taken 8056 times.
✗ Branch 5 not taken.
39092 switch (node.type) {
2652 7384 case SCH_NODE_TYPE_DEMUX: return demux_done (sch, node.idx);
2653 8474 case SCH_NODE_TYPE_MUX: return mux_done (sch, node.idx);
2654 6946 case SCH_NODE_TYPE_DEC: return dec_done (sch, node.idx);
2655 8232 case SCH_NODE_TYPE_ENC: return enc_done (sch, node.idx);
2656 8056 case SCH_NODE_TYPE_FILTER_IN: return filter_done(sch, node.idx);
2657 default: av_unreachable("Invalid node type?");
2658 }
2659 }
2660
2661 39078 static void *task_wrapper(void *arg)
2662 {
2663 39078 SchTask *task = arg;
2664 39078 Scheduler *sch = task->parent;
2665 int ret;
2666 39078 int err = 0;
2667
2668 39078 ret = task->func(task->func_arg);
2669
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 39077 times.
39078 if (ret < 0)
2670 1 av_log(task->func_arg, AV_LOG_ERROR,
2671 1 "Task finished with error code: %d (%s)\n", ret, av_err2str(ret));
2672
2673 39078 err = task_cleanup(sch, task->node);
2674 39078 ret = err_merge(ret, err);
2675
2676 // EOF is considered normal termination
2677
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 39078 times.
39078 if (ret == AVERROR_EOF)
2678 ret = 0;
2679
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 39077 times.
39078 if (ret < 0) {
2680 1 pthread_mutex_lock(&sch->finish_lock);
2681 1 sch->task_failed = 1;
2682 1 pthread_cond_signal(&sch->finish_cond);
2683 1 pthread_mutex_unlock(&sch->finish_lock);
2684 }
2685
2686
4/4
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 39077 times.
✓ Branch 2 taken 1 times.
✓ Branch 3 taken 39077 times.
39079 av_log(task->func_arg, ret < 0 ? AV_LOG_ERROR : AV_LOG_VERBOSE,
2687 "Terminating thread with return code %d (%s)\n", ret,
2688 1 ret < 0 ? av_err2str(ret) : "success");
2689
2690 39078 return (void*)(intptr_t)ret;
2691 }
2692
2693 39095 static int task_stop(Scheduler *sch, SchTask *task)
2694 {
2695 int ret;
2696 void *thread_ret;
2697
2698
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 39092 times.
39095 if (!task->parent)
2699 3 return 0;
2700
2701
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 39078 times.
39092 if (!task->thread_running)
2702 14 return task_cleanup(sch, task->node);
2703
2704 39078 ret = pthread_join(task->thread, &thread_ret);
2705
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 39078 times.
39078 av_assert0(ret == 0);
2706
2707 39078 task->thread_running = 0;
2708
2709 39078 return (intptr_t)thread_ret;
2710 }
2711
2712 16942 int sch_stop(Scheduler *sch, int64_t *finish_ts)
2713 {
2714 16942 int ret = 0, err;
2715
2716
2/2
✓ Branch 0 taken 8472 times.
✓ Branch 1 taken 8470 times.
16942 if (sch->state != SCH_STATE_STARTED)
2717 8472 return 0;
2718
2719 8470 atomic_store(&sch->terminate, 1);
2720
2721
2/2
✓ Branch 0 taken 16940 times.
✓ Branch 1 taken 8470 times.
25410 for (unsigned type = 0; type < 2; type++)
2722
4/4
✓ Branch 0 taken 15854 times.
✓ Branch 1 taken 16529 times.
✓ Branch 2 taken 15443 times.
✓ Branch 3 taken 16940 times.
32383 for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) {
2723
2/2
✓ Branch 0 taken 7384 times.
✓ Branch 1 taken 8059 times.
15443 SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter;
2724 15443 waiter_set(w, 1);
2725
2/2
✓ Branch 0 taken 7384 times.
✓ Branch 1 taken 8059 times.
15443 if (type)
2726 7384 choke_demux(sch, i, 0); // unfreeze to allow draining
2727 }
2728
2729
2/2
✓ Branch 0 taken 7384 times.
✓ Branch 1 taken 8470 times.
15854 for (unsigned i = 0; i < sch->nb_demux; i++) {
2730 7384 SchDemux *d = &sch->demux[i];
2731
2732 7384 err = task_stop(sch, &d->task);
2733 7384 ret = err_merge(ret, err);
2734 }
2735
2736
2/2
✓ Branch 0 taken 6946 times.
✓ Branch 1 taken 8470 times.
15416 for (unsigned i = 0; i < sch->nb_dec; i++) {
2737 6946 SchDec *dec = &sch->dec[i];
2738
2739 6946 err = task_stop(sch, &dec->task);
2740 6946 ret = err_merge(ret, err);
2741 }
2742
2743
2/2
✓ Branch 0 taken 8059 times.
✓ Branch 1 taken 8470 times.
16529 for (unsigned i = 0; i < sch->nb_filters; i++) {
2744 8059 SchFilterGraph *fg = &sch->filters[i];
2745
2746 8059 err = task_stop(sch, &fg->task);
2747 8059 ret = err_merge(ret, err);
2748 }
2749
2750
2/2
✓ Branch 0 taken 8232 times.
✓ Branch 1 taken 8470 times.
16702 for (unsigned i = 0; i < sch->nb_enc; i++) {
2751 8232 SchEnc *enc = &sch->enc[i];
2752
2753 8232 err = task_stop(sch, &enc->task);
2754 8232 ret = err_merge(ret, err);
2755 }
2756
2757
2/2
✓ Branch 0 taken 8474 times.
✓ Branch 1 taken 8470 times.
16944 for (unsigned i = 0; i < sch->nb_mux; i++) {
2758 8474 SchMux *mux = &sch->mux[i];
2759
2760 8474 err = task_stop(sch, &mux->task);
2761 8474 ret = err_merge(ret, err);
2762 }
2763
2764
1/2
✓ Branch 0 taken 8470 times.
✗ Branch 1 not taken.
8470 if (finish_ts)
2765 8470 *finish_ts = trailing_dts(sch, 1);
2766
2767 8470 sch->state = SCH_STATE_STOPPED;
2768
2769 8470 return ret;
2770 }
2771