FFmpeg coverage


Directory: ../../../ffmpeg/
File: src/fftools/ffmpeg_sched.c
Date: 2026-04-23 02:20:26
Exec Total Coverage
Lines: 1168 1327 88.0%
Functions: 71 73 97.3%
Branches: 636 858 74.1%

Line Branch Exec Source
1 /*
2 * Inter-thread scheduling/synchronization.
3 * Copyright (c) 2023 Anton Khirnov
4 *
5 * This file is part of FFmpeg.
6 *
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with FFmpeg; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22 #include <stdatomic.h>
23 #include <stddef.h>
24 #include <stdint.h>
25
26 #include "cmdutils.h"
27 #include "ffmpeg_sched.h"
28 #include "ffmpeg_utils.h"
29 #include "sync_queue.h"
30 #include "thread_queue.h"
31
32 #include "libavcodec/packet.h"
33
34 #include "libavutil/avassert.h"
35 #include "libavutil/error.h"
36 #include "libavutil/fifo.h"
37 #include "libavutil/frame.h"
38 #include "libavutil/mem.h"
39 #include "libavutil/thread.h"
40 #include "libavutil/threadmessage.h"
41 #include "libavutil/time.h"
42
43 // 100 ms
44 // FIXME: some other value? make this dynamic?
45 #define SCHEDULE_TOLERANCE (100 * 1000)
46
47 enum QueueType {
48 QUEUE_PACKETS,
49 QUEUE_FRAMES,
50 };
51
52 typedef struct SchWaiter {
53 pthread_mutex_t lock;
54 pthread_cond_t cond;
55 atomic_int choked;
56
57 // the following are internal state of schedule_update_locked() and must not
58 // be accessed outside of it
59 int choked_prev;
60 int choked_next;
61 } SchWaiter;
62
63 typedef struct SchTask {
64 Scheduler *parent;
65 SchedulerNode node;
66
67 SchThreadFunc func;
68 void *func_arg;
69
70 pthread_t thread;
71 int thread_running;
72 } SchTask;
73
74 typedef struct SchDecOutput {
75 SchedulerNode *dst;
76 uint8_t *dst_finished;
77 unsigned nb_dst;
78 } SchDecOutput;
79
80 typedef struct SchDec {
81 const AVClass *class;
82
83 SchedulerNode src;
84
85 SchDecOutput *outputs;
86 unsigned nb_outputs;
87
88 SchTask task;
89 // Queue for receiving input packets, one stream.
90 ThreadQueue *queue;
91
92 // Queue for sending post-flush end timestamps back to the source
93 AVThreadMessageQueue *queue_end_ts;
94 int expect_end_ts;
95
96 // temporary storage used by sch_dec_send()
97 AVFrame *send_frame;
98 } SchDec;
99
100 typedef struct SchSyncQueue {
101 SyncQueue *sq;
102 AVFrame *frame;
103 pthread_mutex_t lock;
104
105 unsigned *enc_idx;
106 unsigned nb_enc_idx;
107 } SchSyncQueue;
108
109 typedef struct SchEnc {
110 const AVClass *class;
111
112 SchedulerNode src;
113 SchedulerNode *dst;
114 uint8_t *dst_finished;
115 unsigned nb_dst;
116
117 // [0] - index of the sync queue in Scheduler.sq_enc,
118 // [1] - index of this encoder in the sq
119 int sq_idx[2];
120
121 /* Opening encoders is somewhat nontrivial due to their interaction with
122 * sync queues, which are (among other things) responsible for maintaining
123 * constant audio frame size, when it is required by the encoder.
124 *
125 * Opening the encoder requires stream parameters, obtained from the first
126 * frame. However, that frame cannot be properly chunked by the sync queue
127 * without knowing the required frame size, which is only available after
128 * opening the encoder.
129 *
130 * This apparent circular dependency is resolved in the following way:
131 * - the caller creating the encoder gives us a callback which opens the
132 * encoder and returns the required frame size (if any)
133 * - when the first frame is sent to the encoder, the sending thread
134 * - calls this callback, opening the encoder
135 * - passes the returned frame size to the sync queue
136 */
137 int (*open_cb)(void *opaque, const AVFrame *frame);
138 int opened;
139
140 SchTask task;
141 // Queue for receiving input frames, one stream.
142 ThreadQueue *queue;
143 // tq_send() to queue returned EOF
144 int in_finished;
145
146 // temporary storage used by sch_enc_send()
147 AVPacket *send_pkt;
148 } SchEnc;
149
150 typedef struct SchDemuxStream {
151 SchedulerNode *dst;
152 uint8_t *dst_finished;
153 unsigned nb_dst;
154 } SchDemuxStream;
155
156 typedef struct SchDemux {
157 const AVClass *class;
158
159 SchDemuxStream *streams;
160 unsigned nb_streams;
161
162 SchTask task;
163 SchWaiter waiter;
164
165 // temporary storage used by sch_demux_send()
166 AVPacket *send_pkt;
167
168 // protected by schedule_lock
169 int task_exited;
170 } SchDemux;
171
172 typedef struct PreMuxQueue {
173 /**
174 * Queue for buffering the packets before the muxer task can be started.
175 */
176 AVFifo *fifo;
177 /**
178 * Maximum number of packets in fifo.
179 */
180 int max_packets;
181 /*
182 * The size of the AVPackets' buffers in queue.
183 * Updated when a packet is either pushed or pulled from the queue.
184 */
185 size_t data_size;
186 /* Threshold after which max_packets will be in effect */
187 size_t data_threshold;
188 } PreMuxQueue;
189
190 typedef struct SchMuxStream {
191 SchedulerNode src;
192
193 unsigned *sub_heartbeat_dst;
194 unsigned nb_sub_heartbeat_dst;
195
196 PreMuxQueue pre_mux_queue;
197
198 // an EOF was generated while flushing the pre-mux queue
199 int init_eof;
200
201 ////////////////////////////////////////////////////////////
202 // The following are protected by Scheduler.schedule_lock //
203
204 /* dts+duration of the last packet sent to this stream
205 in AV_TIME_BASE_Q */
206 int64_t last_dts;
207 // this stream no longer accepts input
208 int source_finished;
209 ////////////////////////////////////////////////////////////
210 } SchMuxStream;
211
212 typedef struct SchMux {
213 const AVClass *class;
214
215 SchMuxStream *streams;
216 unsigned nb_streams;
217 unsigned nb_streams_ready;
218
219 int (*init)(void *arg);
220
221 SchTask task;
222 /**
223 * Set to 1 after starting the muxer task and flushing the
224 * pre-muxing queues.
225 * Set either before any tasks have started, or with
226 * Scheduler.mux_ready_lock held.
227 */
228 atomic_int mux_started;
229 ThreadQueue *queue;
230 unsigned queue_size;
231
232 AVPacket *sub_heartbeat_pkt;
233 } SchMux;
234
235 typedef struct SchFilterIn {
236 SchedulerNode src;
237 int send_finished;
238 int receive_finished;
239 } SchFilterIn;
240
241 typedef struct SchFilterOut {
242 SchedulerNode dst;
243 } SchFilterOut;
244
245 typedef struct SchFilterGraph {
246 const AVClass *class;
247
248 SchFilterIn *inputs;
249 unsigned nb_inputs;
250 unsigned nb_inputs_finished_send;
251 unsigned nb_inputs_finished_receive;
252
253 SchFilterOut *outputs;
254 unsigned nb_outputs;
255
256 SchTask task;
257 // input queue, nb_inputs+1 streams
258 // last stream is control
259 ThreadQueue *queue;
260 SchWaiter waiter;
261
262 // protected by schedule_lock
263 unsigned best_input;
264 int task_exited;
265 } SchFilterGraph;
266
267 enum SchedulerState {
268 SCH_STATE_UNINIT,
269 SCH_STATE_STARTED,
270 SCH_STATE_STOPPED,
271 };
272
273 struct Scheduler {
274 const AVClass *class;
275
276 SchDemux *demux;
277 unsigned nb_demux;
278
279 SchMux *mux;
280 unsigned nb_mux;
281
282 unsigned nb_mux_ready;
283 pthread_mutex_t mux_ready_lock;
284
285 unsigned nb_mux_done;
286 unsigned task_failed;
287 pthread_mutex_t finish_lock;
288 pthread_cond_t finish_cond;
289
290
291 SchDec *dec;
292 unsigned nb_dec;
293
294 SchEnc *enc;
295 unsigned nb_enc;
296
297 SchSyncQueue *sq_enc;
298 unsigned nb_sq_enc;
299
300 SchFilterGraph *filters;
301 unsigned nb_filters;
302
303 char *sdp_filename;
304 int sdp_auto;
305
306 enum SchedulerState state;
307 atomic_int terminate;
308
309 pthread_mutex_t schedule_lock;
310
311 atomic_int_least64_t last_dts;
312 };
313
314 /**
315 * Wait until this task is allowed to proceed.
316 *
317 * @retval 0 the caller should proceed
318 * @retval 1 the caller should terminate
319 */
320 536877 static int waiter_wait(Scheduler *sch, SchWaiter *w)
321 {
322 int terminate;
323
324
2/2
✓ Branch 0 taken 534030 times.
✓ Branch 1 taken 2847 times.
536877 if (!atomic_load(&w->choked))
325 534030 return 0;
326
327 2847 pthread_mutex_lock(&w->lock);
328
329
4/4
✓ Branch 0 taken 2884 times.
✓ Branch 1 taken 2447 times.
✓ Branch 2 taken 2484 times.
✓ Branch 3 taken 400 times.
5331 while (atomic_load(&w->choked) && !atomic_load(&sch->terminate))
330 2484 pthread_cond_wait(&w->cond, &w->lock);
331
332 2847 terminate = atomic_load(&sch->terminate);
333
334 2847 pthread_mutex_unlock(&w->lock);
335
336 2847 return terminate;
337 }
338
339 59276 static void waiter_set(SchWaiter *w, int choked)
340 {
341 59276 pthread_mutex_lock(&w->lock);
342
343 59276 atomic_store(&w->choked, choked);
344 59276 pthread_cond_signal(&w->cond);
345
346 59276 pthread_mutex_unlock(&w->lock);
347 59276 }
348
349 15565 static int waiter_init(SchWaiter *w)
350 {
351 int ret;
352
353 15565 atomic_init(&w->choked, 0);
354
355 15565 ret = pthread_mutex_init(&w->lock, NULL);
356
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15565 times.
15565 if (ret)
357 return AVERROR(ret);
358
359 15565 ret = pthread_cond_init(&w->cond, NULL);
360
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15565 times.
15565 if (ret)
361 return AVERROR(ret);
362
363 15565 return 0;
364 }
365
366 15565 static void waiter_uninit(SchWaiter *w)
367 {
368 15565 pthread_mutex_destroy(&w->lock);
369 15565 pthread_cond_destroy(&w->cond);
370 15565 }
371
372 31947 static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size,
373 enum QueueType type)
374 {
375 ThreadQueue *tq;
376
377
1/2
✓ Branch 0 taken 31947 times.
✗ Branch 1 not taken.
31947 if (queue_size <= 0) {
378
2/2
✓ Branch 0 taken 16407 times.
✓ Branch 1 taken 15540 times.
31947 if (type == QUEUE_FRAMES)
379 16407 queue_size = DEFAULT_FRAME_THREAD_QUEUE_SIZE;
380 else
381 15540 queue_size = DEFAULT_PACKET_THREAD_QUEUE_SIZE;
382 }
383
384
2/2
✓ Branch 0 taken 16407 times.
✓ Branch 1 taken 15540 times.
31947 if (type == QUEUE_FRAMES) {
385 // This queue length is used in the decoder code to ensure that
386 // there are enough entries in fixed-size frame pools to account
387 // for frames held in queues inside the ffmpeg utility. If this
388 // can ever dynamically change then the corresponding decode
389 // code needs to be updated as well.
390
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 16407 times.
16407 av_assert0(queue_size <= DEFAULT_FRAME_THREAD_QUEUE_SIZE);
391 }
392
393 31947 tq = tq_alloc(nb_streams, queue_size,
394 (type == QUEUE_PACKETS) ? THREAD_QUEUE_PACKETS : THREAD_QUEUE_FRAMES);
395
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 31947 times.
31947 if (!tq)
396 return AVERROR(ENOMEM);
397
398 31947 *ptq = tq;
399 31947 return 0;
400 }
401
402 static void *task_wrapper(void *arg);
403
404 39380 static int task_start(SchTask *task)
405 {
406 int ret;
407
408
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 39377 times.
39380 if (!task->parent)
409 3 return 0;
410
411 39377 av_log(task->func_arg, AV_LOG_VERBOSE, "Starting thread...\n");
412
413
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 39377 times.
39377 av_assert0(!task->thread_running);
414
415 39377 ret = pthread_create(&task->thread, NULL, task_wrapper, task);
416
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 39377 times.
39377 if (ret) {
417 av_log(task->func_arg, AV_LOG_ERROR, "pthread_create() failed: %s\n",
418 strerror(ret));
419 return AVERROR(ret);
420 }
421
422 39377 task->thread_running = 1;
423 39377 return 0;
424 }
425
426 39395 static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx,
427 SchThreadFunc func, void *func_arg)
428 {
429 39395 task->parent = sch;
430
431 39395 task->node.type = type;
432 39395 task->node.idx = idx;
433
434 39395 task->func = func;
435 39395 task->func_arg = func_arg;
436 39395 }
437
438 637247 static int64_t trailing_dts(const Scheduler *sch)
439 {
440 637247 int64_t min_dts = INT64_MAX;
441
442
2/2
✓ Branch 0 taken 638315 times.
✓ Branch 1 taken 610252 times.
1248567 for (unsigned i = 0; i < sch->nb_mux; i++) {
443 638315 const SchMux *mux = &sch->mux[i];
444
445
2/2
✓ Branch 0 taken 703108 times.
✓ Branch 1 taken 611320 times.
1314428 for (unsigned j = 0; j < mux->nb_streams; j++) {
446 703108 const SchMuxStream *ms = &mux->streams[j];
447
448
2/2
✓ Branch 0 taken 38085 times.
✓ Branch 1 taken 665023 times.
703108 if (ms->source_finished)
449 38085 continue;
450
2/2
✓ Branch 0 taken 26995 times.
✓ Branch 1 taken 638028 times.
665023 if (ms->last_dts == AV_NOPTS_VALUE)
451 26995 return AV_NOPTS_VALUE;
452
453 638028 min_dts = FFMIN(min_dts, ms->last_dts);
454 }
455 }
456
457
2/2
✓ Branch 0 taken 581264 times.
✓ Branch 1 taken 28988 times.
610252 return min_dts == INT64_MAX ? AV_NOPTS_VALUE : min_dts;
458 }
459
460 645783 static int64_t progressing_dts(const Scheduler *sch, int count_finished)
461 {
462 645783 int64_t max_dts = INT64_MIN;
463
464
2/2
✓ Branch 0 taken 647139 times.
✓ Branch 1 taken 645783 times.
1292922 for (unsigned i = 0; i < sch->nb_mux; i++) {
465 647139 const SchMux *mux = &sch->mux[i];
466
467
2/2
✓ Branch 0 taken 720897 times.
✓ Branch 1 taken 647139 times.
1368036 for (unsigned j = 0; j < mux->nb_streams; j++) {
468 720897 const SchMuxStream *ms = &mux->streams[j];
469
470
4/4
✓ Branch 0 taken 47281 times.
✓ Branch 1 taken 673616 times.
✓ Branch 2 taken 38236 times.
✓ Branch 3 taken 9045 times.
720897 if (ms->source_finished && !count_finished)
471 38236 continue;
472
2/2
✓ Branch 0 taken 649490 times.
✓ Branch 1 taken 33171 times.
682661 if (ms->last_dts != AV_NOPTS_VALUE)
473 649490 max_dts = FFMAX(max_dts, ms->last_dts);
474 }
475 }
476
477 645783 return max_dts == INT64_MIN ? AV_NOPTS_VALUE : max_dts;
478 }
479
480 3 void sch_remove_filtergraph(Scheduler *sch, int idx)
481 {
482 3 SchFilterGraph *fg = &sch->filters[idx];
483
484
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 av_assert0(!fg->task.thread_running);
485 3 memset(&fg->task, 0, sizeof(fg->task));
486
487 3 tq_free(&fg->queue);
488
489 3 av_freep(&fg->inputs);
490 3 fg->nb_inputs = 0;
491 3 av_freep(&fg->outputs);
492 3 fg->nb_outputs = 0;
493
494 3 fg->task_exited = 1;
495 3 }
496
497 8539 void sch_free(Scheduler **psch)
498 {
499 8539 Scheduler *sch = *psch;
500
501
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8539 times.
8539 if (!sch)
502 return;
503
504 8539 sch_stop(sch, NULL);
505
506
2/2
✓ Branch 0 taken 7448 times.
✓ Branch 1 taken 8539 times.
15987 for (unsigned i = 0; i < sch->nb_demux; i++) {
507 7448 SchDemux *d = &sch->demux[i];
508
509
2/2
✓ Branch 0 taken 7728 times.
✓ Branch 1 taken 7448 times.
15176 for (unsigned j = 0; j < d->nb_streams; j++) {
510 7728 SchDemuxStream *ds = &d->streams[j];
511 7728 av_freep(&ds->dst);
512 7728 av_freep(&ds->dst_finished);
513 }
514 7448 av_freep(&d->streams);
515
516 7448 av_packet_free(&d->send_pkt);
517
518 7448 waiter_uninit(&d->waiter);
519 }
520 8539 av_freep(&sch->demux);
521
522
2/2
✓ Branch 0 taken 8540 times.
✓ Branch 1 taken 8539 times.
17079 for (unsigned i = 0; i < sch->nb_mux; i++) {
523 8540 SchMux *mux = &sch->mux[i];
524
525
2/2
✓ Branch 0 taken 9045 times.
✓ Branch 1 taken 8540 times.
17585 for (unsigned j = 0; j < mux->nb_streams; j++) {
526 9045 SchMuxStream *ms = &mux->streams[j];
527
528
1/2
✓ Branch 0 taken 9045 times.
✗ Branch 1 not taken.
9045 if (ms->pre_mux_queue.fifo) {
529 AVPacket *pkt;
530
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 9045 times.
9045 while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0)
531 av_packet_free(&pkt);
532 9045 av_fifo_freep2(&ms->pre_mux_queue.fifo);
533 }
534
535 9045 av_freep(&ms->sub_heartbeat_dst);
536 }
537 8540 av_freep(&mux->streams);
538
539 8540 av_packet_free(&mux->sub_heartbeat_pkt);
540
541 8540 tq_free(&mux->queue);
542 }
543 8539 av_freep(&sch->mux);
544
545
2/2
✓ Branch 0 taken 7000 times.
✓ Branch 1 taken 8539 times.
15539 for (unsigned i = 0; i < sch->nb_dec; i++) {
546 7000 SchDec *dec = &sch->dec[i];
547
548 7000 tq_free(&dec->queue);
549
550 7000 av_thread_message_queue_free(&dec->queue_end_ts);
551
552
2/2
✓ Branch 0 taken 7006 times.
✓ Branch 1 taken 7000 times.
14006 for (unsigned j = 0; j < dec->nb_outputs; j++) {
553 7006 SchDecOutput *o = &dec->outputs[j];
554
555 7006 av_freep(&o->dst);
556 7006 av_freep(&o->dst_finished);
557 }
558
559 7000 av_freep(&dec->outputs);
560
561 7000 av_frame_free(&dec->send_frame);
562 }
563 8539 av_freep(&sch->dec);
564
565
2/2
✓ Branch 0 taken 8290 times.
✓ Branch 1 taken 8539 times.
16829 for (unsigned i = 0; i < sch->nb_enc; i++) {
566 8290 SchEnc *enc = &sch->enc[i];
567
568 8290 tq_free(&enc->queue);
569
570 8290 av_packet_free(&enc->send_pkt);
571
572 8290 av_freep(&enc->dst);
573 8290 av_freep(&enc->dst_finished);
574 }
575 8539 av_freep(&sch->enc);
576
577
2/2
✓ Branch 0 taken 3206 times.
✓ Branch 1 taken 8539 times.
11745 for (unsigned i = 0; i < sch->nb_sq_enc; i++) {
578 3206 SchSyncQueue *sq = &sch->sq_enc[i];
579 3206 sq_free(&sq->sq);
580 3206 av_frame_free(&sq->frame);
581 3206 pthread_mutex_destroy(&sq->lock);
582 3206 av_freep(&sq->enc_idx);
583 }
584 8539 av_freep(&sch->sq_enc);
585
586
2/2
✓ Branch 0 taken 8117 times.
✓ Branch 1 taken 8539 times.
16656 for (unsigned i = 0; i < sch->nb_filters; i++) {
587 8117 SchFilterGraph *fg = &sch->filters[i];
588
589 8117 tq_free(&fg->queue);
590
591 8117 av_freep(&fg->inputs);
592 8117 av_freep(&fg->outputs);
593
594 8117 waiter_uninit(&fg->waiter);
595 }
596 8539 av_freep(&sch->filters);
597
598 8539 av_freep(&sch->sdp_filename);
599
600 8539 pthread_mutex_destroy(&sch->schedule_lock);
601
602 8539 pthread_mutex_destroy(&sch->mux_ready_lock);
603
604 8539 pthread_mutex_destroy(&sch->finish_lock);
605 8539 pthread_cond_destroy(&sch->finish_cond);
606
607 8539 av_freep(psch);
608 }
609
610 static const AVClass scheduler_class = {
611 .class_name = "Scheduler",
612 .version = LIBAVUTIL_VERSION_INT,
613 };
614
615 8539 Scheduler *sch_alloc(void)
616 {
617 Scheduler *sch;
618 int ret;
619
620 8539 sch = av_mallocz(sizeof(*sch));
621
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8539 times.
8539 if (!sch)
622 return NULL;
623
624 8539 sch->class = &scheduler_class;
625 8539 sch->sdp_auto = 1;
626
627 8539 ret = pthread_mutex_init(&sch->schedule_lock, NULL);
628
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8539 times.
8539 if (ret)
629 goto fail;
630
631 8539 ret = pthread_mutex_init(&sch->mux_ready_lock, NULL);
632
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8539 times.
8539 if (ret)
633 goto fail;
634
635 8539 ret = pthread_mutex_init(&sch->finish_lock, NULL);
636
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8539 times.
8539 if (ret)
637 goto fail;
638
639 8539 ret = pthread_cond_init(&sch->finish_cond, NULL);
640
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8539 times.
8539 if (ret)
641 goto fail;
642
643 8539 return sch;
644 fail:
645 sch_free(&sch);
646 return NULL;
647 }
648
649 int sch_sdp_filename(Scheduler *sch, const char *sdp_filename)
650 {
651 av_freep(&sch->sdp_filename);
652 sch->sdp_filename = av_strdup(sdp_filename);
653 return sch->sdp_filename ? 0 : AVERROR(ENOMEM);
654 }
655
656 static const AVClass sch_mux_class = {
657 .class_name = "SchMux",
658 .version = LIBAVUTIL_VERSION_INT,
659 .parent_log_context_offset = offsetof(SchMux, task.func_arg),
660 };
661
662 8540 int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
663 void *arg, int sdp_auto, unsigned thread_queue_size)
664 {
665 8540 const unsigned idx = sch->nb_mux;
666
667 SchMux *mux;
668 int ret;
669
670 8540 ret = GROW_ARRAY(sch->mux, sch->nb_mux);
671
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8540 times.
8540 if (ret < 0)
672 return ret;
673
674 8540 mux = &sch->mux[idx];
675 8540 mux->class = &sch_mux_class;
676 8540 mux->init = init;
677 8540 mux->queue_size = thread_queue_size;
678
679 8540 task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg);
680
681 8540 sch->sdp_auto &= sdp_auto;
682
683 8540 return idx;
684 }
685
686 9045 int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx)
687 {
688 SchMux *mux;
689 SchMuxStream *ms;
690 unsigned stream_idx;
691 int ret;
692
693
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9045 times.
9045 av_assert0(mux_idx < sch->nb_mux);
694 9045 mux = &sch->mux[mux_idx];
695
696 9045 ret = GROW_ARRAY(mux->streams, mux->nb_streams);
697
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9045 times.
9045 if (ret < 0)
698 return ret;
699 9045 stream_idx = mux->nb_streams - 1;
700
701 9045 ms = &mux->streams[stream_idx];
702
703 9045 ms->pre_mux_queue.fifo = av_fifo_alloc2(8, sizeof(AVPacket*), 0);
704
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9045 times.
9045 if (!ms->pre_mux_queue.fifo)
705 return AVERROR(ENOMEM);
706
707 9045 ms->last_dts = AV_NOPTS_VALUE;
708
709 9045 return stream_idx;
710 }
711
712 static const AVClass sch_demux_class = {
713 .class_name = "SchDemux",
714 .version = LIBAVUTIL_VERSION_INT,
715 .parent_log_context_offset = offsetof(SchDemux, task.func_arg),
716 };
717
718 7448 int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *ctx)
719 {
720 7448 const unsigned idx = sch->nb_demux;
721
722 SchDemux *d;
723 int ret;
724
725 7448 ret = GROW_ARRAY(sch->demux, sch->nb_demux);
726
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7448 times.
7448 if (ret < 0)
727 return ret;
728
729 7448 d = &sch->demux[idx];
730
731 7448 task_init(sch, &d->task, SCH_NODE_TYPE_DEMUX, idx, func, ctx);
732
733 7448 d->class = &sch_demux_class;
734 7448 d->send_pkt = av_packet_alloc();
735
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7448 times.
7448 if (!d->send_pkt)
736 return AVERROR(ENOMEM);
737
738 7448 ret = waiter_init(&d->waiter);
739
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7448 times.
7448 if (ret < 0)
740 return ret;
741
742 7448 return idx;
743 }
744
745 7728 int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
746 {
747 SchDemux *d;
748 int ret;
749
750
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7728 times.
7728 av_assert0(demux_idx < sch->nb_demux);
751 7728 d = &sch->demux[demux_idx];
752
753 7728 ret = GROW_ARRAY(d->streams, d->nb_streams);
754
1/2
✓ Branch 0 taken 7728 times.
✗ Branch 1 not taken.
7728 return ret < 0 ? ret : d->nb_streams - 1;
755 }
756
757 7006 int sch_add_dec_output(Scheduler *sch, unsigned dec_idx)
758 {
759 SchDec *dec;
760 int ret;
761
762
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7006 times.
7006 av_assert0(dec_idx < sch->nb_dec);
763 7006 dec = &sch->dec[dec_idx];
764
765 7006 ret = GROW_ARRAY(dec->outputs, dec->nb_outputs);
766
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7006 times.
7006 if (ret < 0)
767 return ret;
768
769 7006 return dec->nb_outputs - 1;
770 }
771
772 static const AVClass sch_dec_class = {
773 .class_name = "SchDec",
774 .version = LIBAVUTIL_VERSION_INT,
775 .parent_log_context_offset = offsetof(SchDec, task.func_arg),
776 };
777
778 7000 int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts)
779 {
780 7000 const unsigned idx = sch->nb_dec;
781
782 SchDec *dec;
783 int ret;
784
785 7000 ret = GROW_ARRAY(sch->dec, sch->nb_dec);
786
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7000 times.
7000 if (ret < 0)
787 return ret;
788
789 7000 dec = &sch->dec[idx];
790
791 7000 task_init(sch, &dec->task, SCH_NODE_TYPE_DEC, idx, func, ctx);
792
793 7000 dec->class = &sch_dec_class;
794 7000 dec->send_frame = av_frame_alloc();
795
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7000 times.
7000 if (!dec->send_frame)
796 return AVERROR(ENOMEM);
797
798 7000 ret = sch_add_dec_output(sch, idx);
799
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7000 times.
7000 if (ret < 0)
800 return ret;
801
802 7000 ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS);
803
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7000 times.
7000 if (ret < 0)
804 return ret;
805
806
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 6999 times.
7000 if (send_end_ts) {
807 1 ret = av_thread_message_queue_alloc(&dec->queue_end_ts, 1, sizeof(Timestamp));
808
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (ret < 0)
809 return ret;
810 }
811
812 7000 return idx;
813 }
814
815 static const AVClass sch_enc_class = {
816 .class_name = "SchEnc",
817 .version = LIBAVUTIL_VERSION_INT,
818 .parent_log_context_offset = offsetof(SchEnc, task.func_arg),
819 };
820
821 8290 int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx,
822 int (*open_cb)(void *opaque, const AVFrame *frame))
823 {
824 8290 const unsigned idx = sch->nb_enc;
825
826 SchEnc *enc;
827 int ret;
828
829 8290 ret = GROW_ARRAY(sch->enc, sch->nb_enc);
830
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8290 times.
8290 if (ret < 0)
831 return ret;
832
833 8290 enc = &sch->enc[idx];
834
835 8290 enc->class = &sch_enc_class;
836 8290 enc->open_cb = open_cb;
837 8290 enc->sq_idx[0] = -1;
838 8290 enc->sq_idx[1] = -1;
839
840 8290 task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx);
841
842 8290 enc->send_pkt = av_packet_alloc();
843
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8290 times.
8290 if (!enc->send_pkt)
844 return AVERROR(ENOMEM);
845
846 8290 ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES);
847
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8290 times.
8290 if (ret < 0)
848 return ret;
849
850 8290 return idx;
851 }
852
853 static const AVClass sch_fg_class = {
854 .class_name = "SchFilterGraph",
855 .version = LIBAVUTIL_VERSION_INT,
856 .parent_log_context_offset = offsetof(SchFilterGraph, task.func_arg),
857 };
858
859 8117 int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
860 SchThreadFunc func, void *ctx)
861 {
862 8117 const unsigned idx = sch->nb_filters;
863
864 SchFilterGraph *fg;
865 int ret;
866
867 8117 ret = GROW_ARRAY(sch->filters, sch->nb_filters);
868
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8117 times.
8117 if (ret < 0)
869 return ret;
870 8117 fg = &sch->filters[idx];
871
872 8117 fg->class = &sch_fg_class;
873
874 8117 task_init(sch, &fg->task, SCH_NODE_TYPE_FILTER_IN, idx, func, ctx);
875
876
2/2
✓ Branch 0 taken 6929 times.
✓ Branch 1 taken 1188 times.
8117 if (nb_inputs) {
877 6929 fg->inputs = av_calloc(nb_inputs, sizeof(*fg->inputs));
878
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6929 times.
6929 if (!fg->inputs)
879 return AVERROR(ENOMEM);
880 6929 fg->nb_inputs = nb_inputs;
881 }
882
883
1/2
✓ Branch 0 taken 8117 times.
✗ Branch 1 not taken.
8117 if (nb_outputs) {
884 8117 fg->outputs = av_calloc(nb_outputs, sizeof(*fg->outputs));
885
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8117 times.
8117 if (!fg->outputs)
886 return AVERROR(ENOMEM);
887 8117 fg->nb_outputs = nb_outputs;
888 }
889
890 8117 ret = waiter_init(&fg->waiter);
891
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8117 times.
8117 if (ret < 0)
892 return ret;
893
894 8117 ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES);
895
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8117 times.
8117 if (ret < 0)
896 return ret;
897
898 8117 return idx;
899 }
900
901 3206 int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx)
902 {
903 SchSyncQueue *sq;
904 int ret;
905
906 3206 ret = GROW_ARRAY(sch->sq_enc, sch->nb_sq_enc);
907
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3206 times.
3206 if (ret < 0)
908 return ret;
909 3206 sq = &sch->sq_enc[sch->nb_sq_enc - 1];
910
911 3206 sq->sq = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, logctx);
912
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3206 times.
3206 if (!sq->sq)
913 return AVERROR(ENOMEM);
914
915 3206 sq->frame = av_frame_alloc();
916
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3206 times.
3206 if (!sq->frame)
917 return AVERROR(ENOMEM);
918
919 3206 ret = pthread_mutex_init(&sq->lock, NULL);
920
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3206 times.
3206 if (ret)
921 return AVERROR(ret);
922
923 3206 return sq - sch->sq_enc;
924 }
925
926 3271 int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx,
927 int limiting, uint64_t max_frames)
928 {
929 SchSyncQueue *sq;
930 SchEnc *enc;
931 int ret;
932
933
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3271 times.
3271 av_assert0(sq_idx < sch->nb_sq_enc);
934 3271 sq = &sch->sq_enc[sq_idx];
935
936
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3271 times.
3271 av_assert0(enc_idx < sch->nb_enc);
937 3271 enc = &sch->enc[enc_idx];
938
939 3271 ret = GROW_ARRAY(sq->enc_idx, sq->nb_enc_idx);
940
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3271 times.
3271 if (ret < 0)
941 return ret;
942 3271 sq->enc_idx[sq->nb_enc_idx - 1] = enc_idx;
943
944 3271 ret = sq_add_stream(sq->sq, limiting);
945
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3271 times.
3271 if (ret < 0)
946 return ret;
947
948 3271 enc->sq_idx[0] = sq_idx;
949 3271 enc->sq_idx[1] = ret;
950
951
2/2
✓ Branch 0 taken 3064 times.
✓ Branch 1 taken 207 times.
3271 if (max_frames != INT64_MAX)
952 3064 sq_limit_frames(sq->sq, enc->sq_idx[1], max_frames);
953
954 3271 return 0;
955 }
956
957 31364 int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst)
958 {
959 int ret;
960
961
4/5
✓ Branch 0 taken 7754 times.
✓ Branch 1 taken 7070 times.
✓ Branch 2 taken 8249 times.
✓ Branch 3 taken 8291 times.
✗ Branch 4 not taken.
31364 switch (src.type) {
962 7754 case SCH_NODE_TYPE_DEMUX: {
963 SchDemuxStream *ds;
964
965
2/4
✓ Branch 0 taken 7754 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 7754 times.
7754 av_assert0(src.idx < sch->nb_demux &&
966 src.idx_stream < sch->demux[src.idx].nb_streams);
967 7754 ds = &sch->demux[src.idx].streams[src.idx_stream];
968
969 7754 ret = GROW_ARRAY(ds->dst, ds->nb_dst);
970
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7754 times.
7754 if (ret < 0)
971 return ret;
972
973 7754 ds->dst[ds->nb_dst - 1] = dst;
974
975 // demuxed packets go to decoding or streamcopy
976
2/3
✓ Branch 0 taken 6999 times.
✓ Branch 1 taken 755 times.
✗ Branch 2 not taken.
7754 switch (dst.type) {
977 6999 case SCH_NODE_TYPE_DEC: {
978 SchDec *dec;
979
980
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6999 times.
6999 av_assert0(dst.idx < sch->nb_dec);
981 6999 dec = &sch->dec[dst.idx];
982
983
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6999 times.
6999 av_assert0(!dec->src.type);
984 6999 dec->src = src;
985 6999 break;
986 }
987 755 case SCH_NODE_TYPE_MUX: {
988 SchMuxStream *ms;
989
990
2/4
✓ Branch 0 taken 755 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 755 times.
755 av_assert0(dst.idx < sch->nb_mux &&
991 dst.idx_stream < sch->mux[dst.idx].nb_streams);
992 755 ms = &sch->mux[dst.idx].streams[dst.idx_stream];
993
994
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 755 times.
755 av_assert0(!ms->src.type);
995 755 ms->src = src;
996
997 755 break;
998 }
999 default: av_assert0(0);
1000 }
1001
1002 7754 break;
1003 }
1004 7070 case SCH_NODE_TYPE_DEC: {
1005 SchDec *dec;
1006 SchDecOutput *o;
1007
1008
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7070 times.
7070 av_assert0(src.idx < sch->nb_dec);
1009 7070 dec = &sch->dec[src.idx];
1010
1011
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7070 times.
7070 av_assert0(src.idx_stream < dec->nb_outputs);
1012 7070 o = &dec->outputs[src.idx_stream];
1013
1014 7070 ret = GROW_ARRAY(o->dst, o->nb_dst);
1015
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7070 times.
7070 if (ret < 0)
1016 return ret;
1017
1018 7070 o->dst[o->nb_dst - 1] = dst;
1019
1020 // decoded frames go to filters or encoding
1021
2/3
✓ Branch 0 taken 7028 times.
✓ Branch 1 taken 42 times.
✗ Branch 2 not taken.
7070 switch (dst.type) {
1022 7028 case SCH_NODE_TYPE_FILTER_IN: {
1023 SchFilterIn *fi;
1024
1025
2/4
✓ Branch 0 taken 7028 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 7028 times.
7028 av_assert0(dst.idx < sch->nb_filters &&
1026 dst.idx_stream < sch->filters[dst.idx].nb_inputs);
1027 7028 fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
1028
1029
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7028 times.
7028 av_assert0(!fi->src.type);
1030 7028 fi->src = src;
1031 7028 break;
1032 }
1033 42 case SCH_NODE_TYPE_ENC: {
1034 SchEnc *enc;
1035
1036
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 42 times.
42 av_assert0(dst.idx < sch->nb_enc);
1037 42 enc = &sch->enc[dst.idx];
1038
1039
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 42 times.
42 av_assert0(!enc->src.type);
1040 42 enc->src = src;
1041 42 break;
1042 }
1043 default: av_assert0(0);
1044 }
1045
1046 7070 break;
1047 }
1048 8249 case SCH_NODE_TYPE_FILTER_OUT: {
1049 SchFilterOut *fo;
1050
1051
2/4
✓ Branch 0 taken 8249 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 8249 times.
8249 av_assert0(src.idx < sch->nb_filters &&
1052 src.idx_stream < sch->filters[src.idx].nb_outputs);
1053 8249 fo = &sch->filters[src.idx].outputs[src.idx_stream];
1054
1055
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8249 times.
8249 av_assert0(!fo->dst.type);
1056 8249 fo->dst = dst;
1057
1058 // filtered frames go to encoding or another filtergraph
1059
2/3
✓ Branch 0 taken 8248 times.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
8249 switch (dst.type) {
1060 8248 case SCH_NODE_TYPE_ENC: {
1061 SchEnc *enc;
1062
1063
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8248 times.
8248 av_assert0(dst.idx < sch->nb_enc);
1064 8248 enc = &sch->enc[dst.idx];
1065
1066
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8248 times.
8248 av_assert0(!enc->src.type);
1067 8248 enc->src = src;
1068 8248 break;
1069 }
1070 1 case SCH_NODE_TYPE_FILTER_IN: {
1071 SchFilterIn *fi;
1072
1073
2/4
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 times.
1 av_assert0(dst.idx < sch->nb_filters &&
1074 dst.idx_stream < sch->filters[dst.idx].nb_inputs);
1075 1 fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
1076
1077
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(!fi->src.type);
1078 1 fi->src = src;
1079 1 break;
1080 }
1081 default: av_assert0(0);
1082 }
1083
1084
1085 8249 break;
1086 }
1087 8291 case SCH_NODE_TYPE_ENC: {
1088 SchEnc *enc;
1089
1090
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8291 times.
8291 av_assert0(src.idx < sch->nb_enc);
1091 8291 enc = &sch->enc[src.idx];
1092
1093 8291 ret = GROW_ARRAY(enc->dst, enc->nb_dst);
1094
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8291 times.
8291 if (ret < 0)
1095 return ret;
1096
1097 8291 enc->dst[enc->nb_dst - 1] = dst;
1098
1099 // encoding packets go to muxing or decoding
1100
2/3
✓ Branch 0 taken 8290 times.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
8291 switch (dst.type) {
1101 8290 case SCH_NODE_TYPE_MUX: {
1102 SchMuxStream *ms;
1103
1104
2/4
✓ Branch 0 taken 8290 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 8290 times.
8290 av_assert0(dst.idx < sch->nb_mux &&
1105 dst.idx_stream < sch->mux[dst.idx].nb_streams);
1106 8290 ms = &sch->mux[dst.idx].streams[dst.idx_stream];
1107
1108
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8290 times.
8290 av_assert0(!ms->src.type);
1109 8290 ms->src = src;
1110
1111 8290 break;
1112 }
1113 1 case SCH_NODE_TYPE_DEC: {
1114 SchDec *dec;
1115
1116
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(dst.idx < sch->nb_dec);
1117 1 dec = &sch->dec[dst.idx];
1118
1119
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(!dec->src.type);
1120 1 dec->src = src;
1121
1122 1 break;
1123 }
1124 default: av_assert0(0);
1125 }
1126
1127 8291 break;
1128 }
1129 default: av_assert0(0);
1130 }
1131
1132 31364 return 0;
1133 }
1134
1135 8540 static int mux_task_start(SchMux *mux)
1136 {
1137 8540 int ret = 0;
1138
1139 8540 ret = task_start(&mux->task);
1140
1/2
✓ Branch 0 taken 8540 times.
✗ Branch 1 not taken.
8540 if (ret < 0)
1141 return ret;
1142
1143 /* flush the pre-muxing queues */
1144 2698 while (1) {
1145 11238 int min_stream = -1;
1146 11238 Timestamp min_ts = { .ts = AV_NOPTS_VALUE };
1147
1148 AVPacket *pkt;
1149
1150 // find the stream with the earliest dts or EOF in pre-muxing queue
1151
2/2
✓ Branch 0 taken 20173 times.
✓ Branch 1 taken 11186 times.
31359 for (unsigned i = 0; i < mux->nb_streams; i++) {
1152 20173 SchMuxStream *ms = &mux->streams[i];
1153
1154
2/2
✓ Branch 1 taken 15260 times.
✓ Branch 2 taken 4913 times.
20173 if (av_fifo_peek(ms->pre_mux_queue.fifo, &pkt, 1, 0) < 0)
1155 15260 continue;
1156
1157
4/4
✓ Branch 0 taken 4874 times.
✓ Branch 1 taken 39 times.
✓ Branch 2 taken 13 times.
✓ Branch 3 taken 4861 times.
4913 if (!pkt || pkt->dts == AV_NOPTS_VALUE) {
1158 52 min_stream = i;
1159 52 break;
1160 }
1161
1162
4/4
✓ Branch 0 taken 2188 times.
✓ Branch 1 taken 2673 times.
✓ Branch 2 taken 29 times.
✓ Branch 3 taken 2159 times.
7049 if (min_ts.ts == AV_NOPTS_VALUE ||
1163 2188 av_compare_ts(min_ts.ts, min_ts.tb, pkt->dts, pkt->time_base) > 0) {
1164 2702 min_stream = i;
1165 2702 min_ts = (Timestamp){ .ts = pkt->dts, .tb = pkt->time_base };
1166 }
1167 }
1168
1169
2/2
✓ Branch 0 taken 2698 times.
✓ Branch 1 taken 8540 times.
11238 if (min_stream >= 0) {
1170 2698 SchMuxStream *ms = &mux->streams[min_stream];
1171
1172 2698 ret = av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1);
1173
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2698 times.
2698 av_assert0(ret >= 0);
1174
1175
2/2
✓ Branch 0 taken 2659 times.
✓ Branch 1 taken 39 times.
2698 if (pkt) {
1176
2/2
✓ Branch 0 taken 2650 times.
✓ Branch 1 taken 9 times.
2659 if (!ms->init_eof)
1177 2650 ret = tq_send(mux->queue, min_stream, pkt);
1178 2659 av_packet_free(&pkt);
1179
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 2658 times.
2659 if (ret == AVERROR_EOF)
1180 1 ms->init_eof = 1;
1181
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2658 times.
2658 else if (ret < 0)
1182 return ret;
1183 } else
1184 39 tq_send_finish(mux->queue, min_stream);
1185
1186 2698 continue;
1187 }
1188
1189 8540 break;
1190 }
1191
1192 8540 atomic_store(&mux->mux_started, 1);
1193
1194 8540 return 0;
1195 }
1196
1197 int print_sdp(const char *filename);
1198
1199 8540 static int mux_init(Scheduler *sch, SchMux *mux)
1200 {
1201 int ret;
1202
1203 8540 ret = mux->init(mux->task.func_arg);
1204
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8540 times.
8540 if (ret < 0)
1205 return ret;
1206
1207 8540 sch->nb_mux_ready++;
1208
1209
2/4
✓ Branch 0 taken 8540 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 8540 times.
8540 if (sch->sdp_filename || sch->sdp_auto) {
1210 if (sch->nb_mux_ready < sch->nb_mux)
1211 return 0;
1212
1213 ret = print_sdp(sch->sdp_filename);
1214 if (ret < 0) {
1215 av_log(sch, AV_LOG_ERROR, "Error writing the SDP.\n");
1216 return ret;
1217 }
1218
1219 /* SDP is written only after all the muxers are ready, so now we
1220 * start ALL the threads */
1221 for (unsigned i = 0; i < sch->nb_mux; i++) {
1222 ret = mux_task_start(&sch->mux[i]);
1223 if (ret < 0)
1224 return ret;
1225 }
1226 } else {
1227 8540 ret = mux_task_start(mux);
1228
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8540 times.
8540 if (ret < 0)
1229 return ret;
1230 }
1231
1232 8540 return 0;
1233 }
1234
1235 9045 void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1236 size_t data_threshold, int max_packets)
1237 {
1238 SchMux *mux;
1239 SchMuxStream *ms;
1240
1241
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9045 times.
9045 av_assert0(mux_idx < sch->nb_mux);
1242 9045 mux = &sch->mux[mux_idx];
1243
1244
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9045 times.
9045 av_assert0(stream_idx < mux->nb_streams);
1245 9045 ms = &mux->streams[stream_idx];
1246
1247 9045 ms->pre_mux_queue.max_packets = max_packets;
1248 9045 ms->pre_mux_queue.data_threshold = data_threshold;
1249 9045 }
1250
1251 9045 int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
1252 {
1253 SchMux *mux;
1254 9045 int ret = 0;
1255
1256
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9045 times.
9045 av_assert0(mux_idx < sch->nb_mux);
1257 9045 mux = &sch->mux[mux_idx];
1258
1259
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9045 times.
9045 av_assert0(stream_idx < mux->nb_streams);
1260
1261 9045 pthread_mutex_lock(&sch->mux_ready_lock);
1262
1263
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9045 times.
9045 av_assert0(mux->nb_streams_ready < mux->nb_streams);
1264
1265 // this may be called during initialization - do not start
1266 // threads before sch_start() is called
1267
2/2
✓ Branch 0 taken 8540 times.
✓ Branch 1 taken 505 times.
9045 if (++mux->nb_streams_ready == mux->nb_streams &&
1268
2/2
✓ Branch 0 taken 8038 times.
✓ Branch 1 taken 502 times.
8540 sch->state >= SCH_STATE_STARTED)
1269 8038 ret = mux_init(sch, mux);
1270
1271 9045 pthread_mutex_unlock(&sch->mux_ready_lock);
1272
1273 9045 return ret;
1274 }
1275
1276 1 int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1277 unsigned dec_idx)
1278 {
1279 SchMux *mux;
1280 SchMuxStream *ms;
1281 1 int ret = 0;
1282
1283
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(mux_idx < sch->nb_mux);
1284 1 mux = &sch->mux[mux_idx];
1285
1286
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(stream_idx < mux->nb_streams);
1287 1 ms = &mux->streams[stream_idx];
1288
1289 1 ret = GROW_ARRAY(ms->sub_heartbeat_dst, ms->nb_sub_heartbeat_dst);
1290
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (ret < 0)
1291 return ret;
1292
1293
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(dec_idx < sch->nb_dec);
1294 1 ms->sub_heartbeat_dst[ms->nb_sub_heartbeat_dst - 1] = dec_idx;
1295
1296
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (!mux->sub_heartbeat_pkt) {
1297 1 mux->sub_heartbeat_pkt = av_packet_alloc();
1298
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (!mux->sub_heartbeat_pkt)
1299 return AVERROR(ENOMEM);
1300 }
1301
1302 1 return 0;
1303 }
1304
1305 static void unchoke_for_stream(Scheduler *sch, SchedulerNode src);
1306
1307 // Unchoke any filter graphs that are downstream of this node, to prevent it
1308 // from getting stuck trying to push data to a full queue
1309 1122940 static void unchoke_downstream(Scheduler *sch, SchedulerNode *dst)
1310 {
1311 SchFilterGraph *fg;
1312 SchDec *dec;
1313 SchEnc *enc;
1314
4/5
✓ Branch 0 taken 513282 times.
✓ Branch 1 taken 1280 times.
✓ Branch 2 taken 95720 times.
✓ Branch 3 taken 512658 times.
✗ Branch 4 not taken.
1122940 switch (dst->type) {
1315 513282 case SCH_NODE_TYPE_DEC:
1316 513282 dec = &sch->dec[dst->idx];
1317
2/2
✓ Branch 0 taken 513938 times.
✓ Branch 1 taken 513282 times.
1027220 for (int i = 0; i < dec->nb_outputs; i++)
1318 513938 unchoke_downstream(sch, dec->outputs[i].dst);
1319 513282 break;
1320 1280 case SCH_NODE_TYPE_ENC:
1321 1280 enc = &sch->enc[dst->idx];
1322
2/2
✓ Branch 0 taken 1280 times.
✓ Branch 1 taken 1280 times.
2560 for (int i = 0; i < enc->nb_dst; i++)
1323 1280 unchoke_downstream(sch, &enc->dst[i]);
1324 1280 break;
1325 95720 case SCH_NODE_TYPE_MUX:
1326 // muxers are never choked
1327 95720 break;
1328 512658 case SCH_NODE_TYPE_FILTER_IN:
1329 512658 fg = &sch->filters[dst->idx];
1330
2/2
✓ Branch 0 taken 104 times.
✓ Branch 1 taken 512554 times.
512658 if (fg->best_input == fg->nb_inputs) {
1331 104 fg->waiter.choked_next = 0;
1332 } else {
1333 // ensure that this filter graph is not stuck waiting for
1334 // input from a different upstream demuxer
1335 512554 unchoke_for_stream(sch, fg->inputs[fg->best_input].src);
1336 }
1337 512658 break;
1338 default:
1339 av_unreachable("Invalid destination node type?");
1340 break;
1341 }
1342 1122940 }
1343
1344 1158914 static void unchoke_for_stream(Scheduler *sch, SchedulerNode src)
1345 {
1346 2115020 while (1) {
1347 SchFilterGraph *fg;
1348 SchDemux *demux;
1349
4/5
✓ Branch 0 taken 1114244 times.
✓ Branch 1 taken 1032465 times.
✓ Branch 2 taken 564171 times.
✓ Branch 3 taken 563054 times.
✗ Branch 4 not taken.
3273934 switch (src.type) {
1350 1114244 case SCH_NODE_TYPE_DEMUX:
1351 // fed directly by a demuxer (i.e. not through a filtergraph)
1352 1114244 demux = &sch->demux[src.idx];
1353
2/2
✓ Branch 0 taken 545381 times.
✓ Branch 1 taken 568863 times.
1114244 if (demux->waiter.choked_next == 0)
1354 545381 return; // prevent infinite loop
1355 568863 demux->waiter.choked_next = 0;
1356
2/2
✓ Branch 0 taken 607722 times.
✓ Branch 1 taken 568863 times.
1176585 for (int i = 0; i < demux->nb_streams; i++)
1357 607722 unchoke_downstream(sch, demux->streams[i].dst);
1358 568863 return;
1359 1032465 case SCH_NODE_TYPE_DEC:
1360 1032465 src = sch->dec[src.idx].src;
1361 1032465 continue;
1362 564171 case SCH_NODE_TYPE_ENC:
1363 564171 src = sch->enc[src.idx].src;
1364 564171 continue;
1365 563054 case SCH_NODE_TYPE_FILTER_OUT:
1366 563054 fg = &sch->filters[src.idx];
1367 // the filtergraph contains internal sources and
1368 // requested to be scheduled directly
1369
2/2
✓ Branch 0 taken 44670 times.
✓ Branch 1 taken 518384 times.
563054 if (fg->best_input == fg->nb_inputs) {
1370 44670 fg->waiter.choked_next = 0;
1371 44670 return;
1372 }
1373 518384 src = fg->inputs[fg->best_input].src;
1374 518384 continue;
1375 default:
1376 av_unreachable("Invalid source node type?");
1377 return;
1378 }
1379 }
1380 }
1381
1382 27093 static void choke_demux(const Scheduler *sch, int demux_id, int choked)
1383 {
1384 av_assert1(demux_id < sch->nb_demux);
1385 27093 SchDemux *demux = &sch->demux[demux_id];
1386
1387
2/2
✓ Branch 0 taken 27856 times.
✓ Branch 1 taken 27093 times.
54949 for (int i = 0; i < demux->nb_streams; i++) {
1388 27856 SchedulerNode *dst = demux->streams[i].dst;
1389 SchFilterGraph *fg;
1390
1391
2/5
✓ Branch 0 taken 26218 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 1638 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
27856 switch (dst->type) {
1392 26218 case SCH_NODE_TYPE_DEC:
1393 26218 tq_choke(sch->dec[dst->idx].queue, choked);
1394 26218 break;
1395 case SCH_NODE_TYPE_ENC:
1396 tq_choke(sch->enc[dst->idx].queue, choked);
1397 break;
1398 1638 case SCH_NODE_TYPE_MUX:
1399 1638 break;
1400 case SCH_NODE_TYPE_FILTER_IN:
1401 fg = &sch->filters[dst->idx];
1402 if (fg->nb_inputs == 1)
1403 tq_choke(fg->queue, choked);
1404 break;
1405 default:
1406 av_unreachable("Invalid destination node type?");
1407 break;
1408 }
1409 }
1410 27093 }
1411
1412 652698 static void schedule_update_locked(Scheduler *sch)
1413 {
1414 int64_t dts;
1415 652698 int have_unchoked = 0;
1416
1417 // on termination request all waiters are choked,
1418 // we are not to unchoke them
1419
2/2
✓ Branch 0 taken 15451 times.
✓ Branch 1 taken 637247 times.
652698 if (atomic_load(&sch->terminate))
1420 15451 return;
1421
1422 637247 dts = trailing_dts(sch);
1423
1424 637247 atomic_store(&sch->last_dts, progressing_dts(sch, 0));
1425
1426 // initialize our internal state
1427
2/2
✓ Branch 0 taken 1274494 times.
✓ Branch 1 taken 637247 times.
1911741 for (unsigned type = 0; type < 2; type++)
1428
4/4
✓ Branch 0 taken 1220612 times.
✓ Branch 1 taken 1250267 times.
✓ Branch 2 taken 1196385 times.
✓ Branch 3 taken 1274494 times.
2470879 for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1429
2/2
✓ Branch 0 taken 583365 times.
✓ Branch 1 taken 613020 times.
1196385 SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1430 1196385 w->choked_prev = atomic_load(&w->choked);
1431 1196385 w->choked_next = 1;
1432 }
1433
1434 // figure out the sources that are allowed to proceed
1435
2/2
✓ Branch 0 taken 638599 times.
✓ Branch 1 taken 637247 times.
1275846 for (unsigned i = 0; i < sch->nb_mux; i++) {
1436 638599 SchMux *mux = &sch->mux[i];
1437
1438
2/2
✓ Branch 0 taken 711852 times.
✓ Branch 1 taken 638599 times.
1350451 for (unsigned j = 0; j < mux->nb_streams; j++) {
1439 711852 SchMuxStream *ms = &mux->streams[j];
1440
1441 // unblock sources for output streams that are not finished
1442 // and not too far ahead of the trailing stream
1443
2/2
✓ Branch 0 taken 38236 times.
✓ Branch 1 taken 673616 times.
711852 if (ms->source_finished)
1444 38236 continue;
1445
4/4
✓ Branch 0 taken 41219 times.
✓ Branch 1 taken 632397 times.
✓ Branch 2 taken 8189 times.
✓ Branch 3 taken 33030 times.
673616 if (dts == AV_NOPTS_VALUE && ms->last_dts != AV_NOPTS_VALUE)
1446 8189 continue;
1447
4/4
✓ Branch 0 taken 632397 times.
✓ Branch 1 taken 33030 times.
✓ Branch 2 taken 19477 times.
✓ Branch 3 taken 612920 times.
665427 if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE)
1448 19477 continue;
1449
1450 // resolve the source to unchoke
1451 645950 unchoke_for_stream(sch, ms->src);
1452 645950 have_unchoked = 1;
1453 }
1454 }
1455
1456 // also unchoke any sources feeding into closed filter graph inputs, so
1457 // that they can observe the downstream EOF
1458
2/2
✓ Branch 0 taken 583365 times.
✓ Branch 1 taken 637247 times.
1220612 for (unsigned i = 0; i < sch->nb_filters; i++) {
1459 583365 SchFilterGraph *fg = &sch->filters[i];
1460
1461
2/2
✓ Branch 0 taken 570573 times.
✓ Branch 1 taken 583365 times.
1153938 for (unsigned j = 0; j < fg->nb_inputs; j++) {
1462 570573 SchFilterIn *fi = &fg->inputs[j];
1463
4/4
✓ Branch 0 taken 3609 times.
✓ Branch 1 taken 566964 times.
✓ Branch 2 taken 410 times.
✓ Branch 3 taken 3199 times.
570573 if (fi->receive_finished && !fi->send_finished)
1464 410 unchoke_for_stream(sch, fi->src);
1465 }
1466 }
1467
1468 // make sure to unchoke at least one source, if still available
1469
4/4
✓ Branch 0 taken 48722 times.
✓ Branch 1 taken 630420 times.
✓ Branch 2 taken 41895 times.
✓ Branch 3 taken 6827 times.
679142 for (unsigned type = 0; !have_unchoked && type < 2; type++)
1470
4/4
✓ Branch 0 taken 18924 times.
✓ Branch 1 taken 39971 times.
✓ Branch 2 taken 39161 times.
✓ Branch 3 taken 19734 times.
58895 for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1471
2/2
✓ Branch 0 taken 12097 times.
✓ Branch 1 taken 27064 times.
39161 int exited = type ? sch->filters[i].task_exited : sch->demux[i].task_exited;
1472
2/2
✓ Branch 0 taken 12097 times.
✓ Branch 1 taken 27064 times.
39161 SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1473
2/2
✓ Branch 0 taken 22161 times.
✓ Branch 1 taken 17000 times.
39161 if (!exited) {
1474 22161 w->choked_next = 0;
1475 22161 have_unchoked = 1;
1476 22161 break;
1477 }
1478 }
1479
1480
2/2
✓ Branch 0 taken 1274494 times.
✓ Branch 1 taken 637247 times.
1911741 for (unsigned type = 0; type < 2; type++) {
1481
4/4
✓ Branch 0 taken 1220612 times.
✓ Branch 1 taken 1250267 times.
✓ Branch 2 taken 1196385 times.
✓ Branch 3 taken 1274494 times.
2470879 for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1482
2/2
✓ Branch 0 taken 583365 times.
✓ Branch 1 taken 613020 times.
1196385 SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1483
2/2
✓ Branch 0 taken 43712 times.
✓ Branch 1 taken 1152673 times.
1196385 if (w->choked_prev != w->choked_next) {
1484 43712 waiter_set(w, w->choked_next);
1485
2/2
✓ Branch 0 taken 19646 times.
✓ Branch 1 taken 24066 times.
43712 if (!type)
1486 19646 choke_demux(sch, i, w->choked_next);
1487 }
1488 }
1489 }
1490
1491 }
1492
1493 enum {
1494 CYCLE_NODE_NEW = 0,
1495 CYCLE_NODE_STARTED,
1496 CYCLE_NODE_DONE,
1497 };
1498
1499 // Finds the filtergraph or muxer upstream of a scheduler node
1500 7034 static SchedulerNode src_filtergraph(const Scheduler *sch, SchedulerNode src)
1501 {
1502 while (1) {
1503
3/4
✓ Branch 0 taken 7034 times.
✓ Branch 1 taken 7033 times.
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
14068 switch (src.type) {
1504 7034 case SCH_NODE_TYPE_DEMUX:
1505 case SCH_NODE_TYPE_FILTER_OUT:
1506 7034 return src;
1507 7033 case SCH_NODE_TYPE_DEC:
1508 7033 src = sch->dec[src.idx].src;
1509 7034 continue;
1510 1 case SCH_NODE_TYPE_ENC:
1511 1 src = sch->enc[src.idx].src;
1512 1 continue;
1513 default:
1514 av_unreachable("Invalid source node type?");
1515 return (SchedulerNode) {0};
1516 }
1517 }
1518 }
1519
1520 static int
1521 8117 check_acyclic_for_output(const Scheduler *sch, SchedulerNode src,
1522 uint8_t *filters_visited, SchedulerNode *filters_stack)
1523 {
1524 8117 unsigned nb_filters_stack = 0;
1525
1526 8117 memset(filters_visited, 0, sch->nb_filters * sizeof(*filters_visited));
1527
1528 7036 while (1) {
1529 15153 const SchFilterGraph *fg = &sch->filters[src.idx];
1530
1531 15153 filters_visited[src.idx] = CYCLE_NODE_STARTED;
1532
1533 // descend into every input, depth first
1534
2/2
✓ Branch 0 taken 7034 times.
✓ Branch 1 taken 8119 times.
15153 if (src.idx_stream < fg->nb_inputs) {
1535 7034 const SchFilterIn *fi = &fg->inputs[src.idx_stream++];
1536 7034 SchedulerNode node = src_filtergraph(sch, fi->src);
1537
1538 // connected to demuxer, no cycles possible
1539
2/2
✓ Branch 0 taken 7032 times.
✓ Branch 1 taken 2 times.
7034 if (node.type == SCH_NODE_TYPE_DEMUX)
1540 7034 continue;
1541
1542 // otherwise connected to another filtergraph
1543
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 av_assert0(node.type == SCH_NODE_TYPE_FILTER_OUT);
1544
1545 // found a cycle
1546
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (filters_visited[node.idx] == CYCLE_NODE_STARTED)
1547 return AVERROR(EINVAL);
1548
1549 // place current position on stack and descend
1550
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 av_assert0(nb_filters_stack < sch->nb_filters);
1551 2 filters_stack[nb_filters_stack++] = src;
1552 2 src = (SchedulerNode){ .idx = node.idx, .idx_stream = 0 };
1553 2 continue;
1554 }
1555
1556 8119 filters_visited[src.idx] = CYCLE_NODE_DONE;
1557
1558 // previous search finished,
1559
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 8117 times.
8119 if (nb_filters_stack) {
1560 2 src = filters_stack[--nb_filters_stack];
1561 2 continue;
1562 }
1563 8117 return 0;
1564 }
1565 }
1566
1567 8536 static int check_acyclic(Scheduler *sch)
1568 {
1569 8536 uint8_t *filters_visited = NULL;
1570 8536 SchedulerNode *filters_stack = NULL;
1571
1572 8536 int ret = 0;
1573
1574
2/2
✓ Branch 0 taken 536 times.
✓ Branch 1 taken 8000 times.
8536 if (!sch->nb_filters)
1575 536 return 0;
1576
1577 8000 filters_visited = av_malloc_array(sch->nb_filters, sizeof(*filters_visited));
1578
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8000 times.
8000 if (!filters_visited)
1579 return AVERROR(ENOMEM);
1580
1581 8000 filters_stack = av_malloc_array(sch->nb_filters, sizeof(*filters_stack));
1582
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8000 times.
8000 if (!filters_stack) {
1583 ret = AVERROR(ENOMEM);
1584 goto fail;
1585 }
1586
1587 // trace the transcoding graph upstream from every filtegraph
1588
2/2
✓ Branch 0 taken 8117 times.
✓ Branch 1 taken 8000 times.
16117 for (unsigned i = 0; i < sch->nb_filters; i++) {
1589 8117 ret = check_acyclic_for_output(sch, (SchedulerNode){ .idx = i },
1590 filters_visited, filters_stack);
1591
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8117 times.
8117 if (ret < 0) {
1592 av_log(&sch->filters[i], AV_LOG_ERROR, "Transcoding graph has a cycle\n");
1593 goto fail;
1594 }
1595 }
1596
1597 8000 fail:
1598 8000 av_freep(&filters_visited);
1599 8000 av_freep(&filters_stack);
1600 8000 return ret;
1601 }
1602
1603 8536 static int start_prepare(Scheduler *sch)
1604 {
1605 int ret;
1606
1607
2/2
✓ Branch 0 taken 7447 times.
✓ Branch 1 taken 8536 times.
15983 for (unsigned i = 0; i < sch->nb_demux; i++) {
1608 7447 SchDemux *d = &sch->demux[i];
1609
1610
2/2
✓ Branch 0 taken 7728 times.
✓ Branch 1 taken 7447 times.
15175 for (unsigned j = 0; j < d->nb_streams; j++) {
1611 7728 SchDemuxStream *ds = &d->streams[j];
1612
1613
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7728 times.
7728 if (!ds->nb_dst) {
1614 av_log(d, AV_LOG_ERROR,
1615 "Demuxer stream %u not connected to any sink\n", j);
1616 return AVERROR(EINVAL);
1617 }
1618
1619 7728 ds->dst_finished = av_calloc(ds->nb_dst, sizeof(*ds->dst_finished));
1620
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7728 times.
7728 if (!ds->dst_finished)
1621 return AVERROR(ENOMEM);
1622 }
1623 }
1624
1625
2/2
✓ Branch 0 taken 7000 times.
✓ Branch 1 taken 8536 times.
15536 for (unsigned i = 0; i < sch->nb_dec; i++) {
1626 7000 SchDec *dec = &sch->dec[i];
1627
1628
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7000 times.
7000 if (!dec->src.type) {
1629 av_log(dec, AV_LOG_ERROR,
1630 "Decoder not connected to a source\n");
1631 return AVERROR(EINVAL);
1632 }
1633
1634
2/2
✓ Branch 0 taken 7006 times.
✓ Branch 1 taken 7000 times.
14006 for (unsigned j = 0; j < dec->nb_outputs; j++) {
1635 7006 SchDecOutput *o = &dec->outputs[j];
1636
1637
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7006 times.
7006 if (!o->nb_dst) {
1638 av_log(dec, AV_LOG_ERROR,
1639 "Decoder output %u not connected to any sink\n", j);
1640 return AVERROR(EINVAL);
1641 }
1642
1643 7006 o->dst_finished = av_calloc(o->nb_dst, sizeof(*o->dst_finished));
1644
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7006 times.
7006 if (!o->dst_finished)
1645 return AVERROR(ENOMEM);
1646 }
1647 }
1648
1649
2/2
✓ Branch 0 taken 8290 times.
✓ Branch 1 taken 8536 times.
16826 for (unsigned i = 0; i < sch->nb_enc; i++) {
1650 8290 SchEnc *enc = &sch->enc[i];
1651
1652
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8290 times.
8290 if (!enc->src.type) {
1653 av_log(enc, AV_LOG_ERROR,
1654 "Encoder not connected to a source\n");
1655 return AVERROR(EINVAL);
1656 }
1657
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8290 times.
8290 if (!enc->nb_dst) {
1658 av_log(enc, AV_LOG_ERROR,
1659 "Encoder not connected to any sink\n");
1660 return AVERROR(EINVAL);
1661 }
1662
1663 8290 enc->dst_finished = av_calloc(enc->nb_dst, sizeof(*enc->dst_finished));
1664
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8290 times.
8290 if (!enc->dst_finished)
1665 return AVERROR(ENOMEM);
1666 }
1667
1668
2/2
✓ Branch 0 taken 8540 times.
✓ Branch 1 taken 8536 times.
17076 for (unsigned i = 0; i < sch->nb_mux; i++) {
1669 8540 SchMux *mux = &sch->mux[i];
1670
1671
2/2
✓ Branch 0 taken 9045 times.
✓ Branch 1 taken 8540 times.
17585 for (unsigned j = 0; j < mux->nb_streams; j++) {
1672 9045 SchMuxStream *ms = &mux->streams[j];
1673
1674
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9045 times.
9045 if (!ms->src.type) {
1675 av_log(mux, AV_LOG_ERROR,
1676 "Muxer stream #%u not connected to a source\n", j);
1677 return AVERROR(EINVAL);
1678 }
1679 }
1680
1681 8540 ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size,
1682 QUEUE_PACKETS);
1683
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8540 times.
8540 if (ret < 0)
1684 return ret;
1685 }
1686
1687
2/2
✓ Branch 0 taken 8117 times.
✓ Branch 1 taken 8536 times.
16653 for (unsigned i = 0; i < sch->nb_filters; i++) {
1688 8117 SchFilterGraph *fg = &sch->filters[i];
1689
1690
2/2
✓ Branch 0 taken 7029 times.
✓ Branch 1 taken 8117 times.
15146 for (unsigned j = 0; j < fg->nb_inputs; j++) {
1691 7029 SchFilterIn *fi = &fg->inputs[j];
1692
1693
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7029 times.
7029 if (!fi->src.type) {
1694 av_log(fg, AV_LOG_ERROR,
1695 "Filtergraph input %u not connected to a source\n", j);
1696 return AVERROR(EINVAL);
1697 }
1698 }
1699
1700
2/2
✓ Branch 0 taken 8249 times.
✓ Branch 1 taken 8117 times.
16366 for (unsigned j = 0; j < fg->nb_outputs; j++) {
1701 8249 SchFilterOut *fo = &fg->outputs[j];
1702
1703
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8249 times.
8249 if (!fo->dst.type) {
1704 av_log(fg, AV_LOG_ERROR,
1705 "Filtergraph %u output %u not connected to a sink\n", i, j);
1706 return AVERROR(EINVAL);
1707 }
1708 }
1709 }
1710
1711 // Check that the transcoding graph has no cycles.
1712 8536 ret = check_acyclic(sch);
1713
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8536 times.
8536 if (ret < 0)
1714 return ret;
1715
1716 8536 return 0;
1717 }
1718
1719 8536 int sch_start(Scheduler *sch)
1720 {
1721 int ret;
1722
1723 8536 ret = start_prepare(sch);
1724
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8536 times.
8536 if (ret < 0)
1725 return ret;
1726
1727
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8536 times.
8536 av_assert0(sch->state == SCH_STATE_UNINIT);
1728 8536 sch->state = SCH_STATE_STARTED;
1729
1730
2/2
✓ Branch 0 taken 8540 times.
✓ Branch 1 taken 8536 times.
17076 for (unsigned i = 0; i < sch->nb_mux; i++) {
1731 8540 SchMux *mux = &sch->mux[i];
1732
1733
2/2
✓ Branch 0 taken 502 times.
✓ Branch 1 taken 8038 times.
8540 if (mux->nb_streams_ready == mux->nb_streams) {
1734 502 ret = mux_init(sch, mux);
1735
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 502 times.
502 if (ret < 0)
1736 goto fail;
1737 }
1738 }
1739
1740
2/2
✓ Branch 0 taken 8290 times.
✓ Branch 1 taken 8536 times.
16826 for (unsigned i = 0; i < sch->nb_enc; i++) {
1741 8290 SchEnc *enc = &sch->enc[i];
1742
1743 8290 ret = task_start(&enc->task);
1744
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8290 times.
8290 if (ret < 0)
1745 goto fail;
1746 }
1747
1748
2/2
✓ Branch 0 taken 8117 times.
✓ Branch 1 taken 8536 times.
16653 for (unsigned i = 0; i < sch->nb_filters; i++) {
1749 8117 SchFilterGraph *fg = &sch->filters[i];
1750
1751 8117 ret = task_start(&fg->task);
1752
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8117 times.
8117 if (ret < 0)
1753 goto fail;
1754 }
1755
1756
2/2
✓ Branch 0 taken 7000 times.
✓ Branch 1 taken 8536 times.
15536 for (unsigned i = 0; i < sch->nb_dec; i++) {
1757 7000 SchDec *dec = &sch->dec[i];
1758
1759 7000 ret = task_start(&dec->task);
1760
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7000 times.
7000 if (ret < 0)
1761 goto fail;
1762 }
1763
1764
2/2
✓ Branch 0 taken 7447 times.
✓ Branch 1 taken 8536 times.
15983 for (unsigned i = 0; i < sch->nb_demux; i++) {
1765 7447 SchDemux *d = &sch->demux[i];
1766
1767
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 7433 times.
7447 if (!d->nb_streams)
1768 14 continue;
1769
1770 7433 ret = task_start(&d->task);
1771
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7433 times.
7433 if (ret < 0)
1772 goto fail;
1773 }
1774
1775 8536 pthread_mutex_lock(&sch->schedule_lock);
1776 8536 schedule_update_locked(sch);
1777 8536 pthread_mutex_unlock(&sch->schedule_lock);
1778
1779 8536 return 0;
1780 fail:
1781 sch_stop(sch, NULL);
1782 return ret;
1783 }
1784
1785 26800 int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
1786 {
1787 int ret;
1788
1789 // convert delay to absolute timestamp
1790 26800 timeout_us += av_gettime();
1791
1792 26800 pthread_mutex_lock(&sch->finish_lock);
1793
1794
1/2
✓ Branch 0 taken 26800 times.
✗ Branch 1 not taken.
26800 if (sch->nb_mux_done < sch->nb_mux) {
1795 26800 struct timespec tv = { .tv_sec = timeout_us / 1000000,
1796 26800 .tv_nsec = (timeout_us % 1000000) * 1000 };
1797 26800 pthread_cond_timedwait(&sch->finish_cond, &sch->finish_lock, &tv);
1798 }
1799
1800 // abort transcoding if any task failed
1801
4/4
✓ Branch 0 taken 18265 times.
✓ Branch 1 taken 8535 times.
✓ Branch 2 taken 1 times.
✓ Branch 3 taken 18264 times.
26800 ret = sch->nb_mux_done == sch->nb_mux || sch->task_failed;
1802
1803 26800 pthread_mutex_unlock(&sch->finish_lock);
1804
1805 26800 *transcode_ts = atomic_load(&sch->last_dts);
1806
1807 26800 return ret;
1808 }
1809
1810 8248 static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame)
1811 {
1812 int ret;
1813
1814 8248 ret = enc->open_cb(enc->task.func_arg, frame);
1815
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8248 times.
8248 if (ret < 0)
1816 return ret;
1817
1818 // ret>0 signals audio frame size, which means sync queue must
1819 // have been enabled during encoder creation
1820
2/2
✓ Branch 0 taken 192 times.
✓ Branch 1 taken 8056 times.
8248 if (ret > 0) {
1821 SchSyncQueue *sq;
1822
1823
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 192 times.
192 av_assert0(enc->sq_idx[0] >= 0);
1824 192 sq = &sch->sq_enc[enc->sq_idx[0]];
1825
1826 192 pthread_mutex_lock(&sq->lock);
1827
1828 192 sq_frame_samples(sq->sq, enc->sq_idx[1], ret);
1829
1830 192 pthread_mutex_unlock(&sq->lock);
1831 }
1832
1833 8248 return 0;
1834 }
1835
1836 545187 static int send_to_enc_thread(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1837 {
1838 int ret;
1839
1840
2/2
✓ Branch 0 taken 20141 times.
✓ Branch 1 taken 525046 times.
545187 if (!frame) {
1841 20141 tq_send_finish(enc->queue, 0);
1842 20141 return 0;
1843 }
1844
1845
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 525046 times.
525046 if (enc->in_finished)
1846 return AVERROR_EOF;
1847
1848 525046 ret = tq_send(enc->queue, 0, frame);
1849
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 525045 times.
525046 if (ret < 0)
1850 1 enc->in_finished = 1;
1851
1852 525046 return ret;
1853 }
1854
1855 40528 static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1856 {
1857 40528 SchSyncQueue *sq = &sch->sq_enc[enc->sq_idx[0]];
1858 40528 int ret = 0;
1859
1860 // inform the scheduling code that no more input will arrive along this path;
1861 // this is necessary because the sync queue may not send an EOF downstream
1862 // until other streams finish
1863 // TODO: consider a cleaner way of passing this information through
1864 // the pipeline
1865
2/2
✓ Branch 0 taken 6685 times.
✓ Branch 1 taken 33843 times.
40528 if (!frame) {
1866
2/2
✓ Branch 0 taken 6685 times.
✓ Branch 1 taken 6685 times.
13370 for (unsigned i = 0; i < enc->nb_dst; i++) {
1867 SchMux *mux;
1868 SchMuxStream *ms;
1869
1870
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6685 times.
6685 if (enc->dst[i].type != SCH_NODE_TYPE_MUX)
1871 continue;
1872
1873 6685 mux = &sch->mux[enc->dst[i].idx];
1874 6685 ms = &mux->streams[enc->dst[i].idx_stream];
1875
1876 6685 pthread_mutex_lock(&sch->schedule_lock);
1877
1878 6685 ms->source_finished = 1;
1879 6685 schedule_update_locked(sch);
1880
1881 6685 pthread_mutex_unlock(&sch->schedule_lock);
1882 }
1883 }
1884
1885 40528 pthread_mutex_lock(&sq->lock);
1886
1887 40528 ret = sq_send(sq->sq, enc->sq_idx[1], SQFRAME(frame));
1888
2/2
✓ Branch 0 taken 40527 times.
✓ Branch 1 taken 1 times.
40528 if (ret < 0)
1889 1 goto finish;
1890
1891 117751 while (1) {
1892 SchEnc *enc;
1893
1894 // TODO: the SQ API should be extended to allow returning EOF
1895 // for individual streams
1896 158278 ret = sq_receive(sq->sq, -1, SQFRAME(sq->frame));
1897
2/2
✓ Branch 0 taken 40527 times.
✓ Branch 1 taken 117751 times.
158278 if (ret < 0) {
1898
2/2
✓ Branch 0 taken 9667 times.
✓ Branch 1 taken 30860 times.
40527 ret = (ret == AVERROR(EAGAIN)) ? 0 : ret;
1899 40527 break;
1900 }
1901
1902 117751 enc = &sch->enc[sq->enc_idx[ret]];
1903 117751 ret = send_to_enc_thread(sch, enc, sq->frame);
1904
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 117751 times.
117751 if (ret < 0) {
1905 av_frame_unref(sq->frame);
1906 if (ret != AVERROR_EOF)
1907 break;
1908
1909 sq_send(sq->sq, enc->sq_idx[1], SQFRAME(NULL));
1910 continue;
1911 }
1912 }
1913
1914
2/2
✓ Branch 0 taken 30860 times.
✓ Branch 1 taken 9667 times.
40527 if (ret < 0) {
1915 // close all encoders fed from this sync queue
1916
2/2
✓ Branch 0 taken 10145 times.
✓ Branch 1 taken 9667 times.
19812 for (unsigned i = 0; i < sq->nb_enc_idx; i++) {
1917 10145 int err = send_to_enc_thread(sch, &sch->enc[sq->enc_idx[i]], NULL);
1918
1919 // if the sync queue error is EOF and closing the encoder
1920 // produces a more serious error, make sure to pick the latter
1921
2/4
✓ Branch 0 taken 10145 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 10145 times.
✗ Branch 3 not taken.
10145 ret = err_merge((ret == AVERROR_EOF && err < 0) ? 0 : ret, err);
1922 }
1923 }
1924
1925 40527 finish:
1926 40528 pthread_mutex_unlock(&sq->lock);
1927
1928 40528 return ret;
1929 }
1930
1931 457822 static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1932 {
1933
6/6
✓ Branch 0 taken 456737 times.
✓ Branch 1 taken 1085 times.
✓ Branch 2 taken 440098 times.
✓ Branch 3 taken 16639 times.
✓ Branch 4 taken 8248 times.
✓ Branch 5 taken 431850 times.
457822 if (enc->open_cb && frame && !enc->opened) {
1934 8248 int ret = enc_open(sch, enc, frame);
1935
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8248 times.
8248 if (ret < 0)
1936 return ret;
1937 8248 enc->opened = 1;
1938
1939 // discard empty frames that only carry encoder init parameters
1940
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 8245 times.
8248 if (!frame->buf[0]) {
1941 3 av_frame_unref(frame);
1942 3 return 0;
1943 }
1944 }
1945
1946 457819 return (enc->sq_idx[0] >= 0) ?
1947
2/2
✓ Branch 0 taken 40528 times.
✓ Branch 1 taken 417291 times.
875110 send_to_enc_sq (sch, enc, frame) :
1948 417291 send_to_enc_thread(sch, enc, frame);
1949 }
1950
1951 2698 static int mux_queue_packet(SchMux *mux, SchMuxStream *ms, AVPacket *pkt)
1952 {
1953 2698 PreMuxQueue *q = &ms->pre_mux_queue;
1954 2698 AVPacket *tmp_pkt = NULL;
1955 int ret;
1956
1957
2/2
✓ Branch 1 taken 91 times.
✓ Branch 2 taken 2607 times.
2698 if (!av_fifo_can_write(q->fifo)) {
1958 91 size_t packets = av_fifo_can_read(q->fifo);
1959
1/2
✓ Branch 0 taken 91 times.
✗ Branch 1 not taken.
91 size_t pkt_size = pkt ? pkt->size : 0;
1960 91 int thresh_reached = (q->data_size + pkt_size) > q->data_threshold;
1961
2/2
✓ Branch 0 taken 86 times.
✓ Branch 1 taken 5 times.
91 size_t max_packets = thresh_reached ? q->max_packets : SIZE_MAX;
1962 91 size_t new_size = FFMIN(2 * packets, max_packets);
1963
1964
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 91 times.
91 if (new_size <= packets) {
1965 av_log(mux, AV_LOG_ERROR,
1966 "Too many packets buffered for output stream.\n");
1967 return AVERROR_BUFFER_TOO_SMALL;
1968 }
1969 91 ret = av_fifo_grow2(q->fifo, new_size - packets);
1970
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 91 times.
91 if (ret < 0)
1971 return ret;
1972 }
1973
1974
2/2
✓ Branch 0 taken 2659 times.
✓ Branch 1 taken 39 times.
2698 if (pkt) {
1975 2659 tmp_pkt = av_packet_alloc();
1976
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2659 times.
2659 if (!tmp_pkt)
1977 return AVERROR(ENOMEM);
1978
1979 2659 av_packet_move_ref(tmp_pkt, pkt);
1980 2659 q->data_size += tmp_pkt->size;
1981 }
1982 2698 av_fifo_write(q->fifo, &tmp_pkt, 1);
1983
1984 2698 return 0;
1985 }
1986
1987 599188 static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx,
1988 AVPacket *pkt)
1989 {
1990 599188 SchMuxStream *ms = &mux->streams[stream_idx];
1991
2/2
✓ Branch 0 taken 581802 times.
✓ Branch 1 taken 8341 times.
590143 int64_t dts = (pkt && pkt->dts != AV_NOPTS_VALUE) ?
1992
2/2
✓ Branch 0 taken 590143 times.
✓ Branch 1 taken 9045 times.
1189331 av_rescale_q(pkt->dts + pkt->duration, pkt->time_base, AV_TIME_BASE_Q) :
1993 AV_NOPTS_VALUE;
1994
1995 // queue the packet if the muxer cannot be started yet
1996
2/2
✓ Branch 0 taken 2709 times.
✓ Branch 1 taken 596479 times.
599188 if (!atomic_load(&mux->mux_started)) {
1997 2709 int queued = 0;
1998
1999 // the muxer could have started between the above atomic check and
2000 // locking the mutex, then this block falls through to normal send path
2001 2709 pthread_mutex_lock(&sch->mux_ready_lock);
2002
2003
2/2
✓ Branch 0 taken 2698 times.
✓ Branch 1 taken 11 times.
2709 if (!atomic_load(&mux->mux_started)) {
2004 2698 int ret = mux_queue_packet(mux, ms, pkt);
2005
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2698 times.
2698 queued = ret < 0 ? ret : 1;
2006 }
2007
2008 2709 pthread_mutex_unlock(&sch->mux_ready_lock);
2009
2010
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2709 times.
2709 if (queued < 0)
2011 return queued;
2012
2/2
✓ Branch 0 taken 2698 times.
✓ Branch 1 taken 11 times.
2709 else if (queued)
2013 2698 goto update_schedule;
2014 }
2015
2016
2/2
✓ Branch 0 taken 587484 times.
✓ Branch 1 taken 9006 times.
596490 if (pkt) {
2017 int ret;
2018
2019
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 587484 times.
587484 if (ms->init_eof)
2020 return AVERROR_EOF;
2021
2022 587484 ret = tq_send(mux->queue, stream_idx, pkt);
2023
2/2
✓ Branch 0 taken 69 times.
✓ Branch 1 taken 587415 times.
587484 if (ret < 0)
2024 69 return ret;
2025 } else
2026 9006 tq_send_finish(mux->queue, stream_idx);
2027
2028 599119 update_schedule:
2029 // TODO: use atomics to check whether this changes trailing dts
2030 // to avoid locking unnecessarily
2031
4/4
✓ Branch 0 taken 17386 times.
✓ Branch 1 taken 581733 times.
✓ Branch 2 taken 9045 times.
✓ Branch 3 taken 8341 times.
599119 if (dts != AV_NOPTS_VALUE || !pkt) {
2032 590778 pthread_mutex_lock(&sch->schedule_lock);
2033
2034
2/2
✓ Branch 0 taken 581733 times.
✓ Branch 1 taken 9045 times.
590778 if (pkt) ms->last_dts = dts;
2035 9045 else ms->source_finished = 1;
2036
2037 590778 schedule_update_locked(sch);
2038
2039 590778 pthread_mutex_unlock(&sch->schedule_lock);
2040 }
2041
2042 599119 return 0;
2043 }
2044
2045 static int
2046 513870 demux_stream_send_to_dst(Scheduler *sch, const SchedulerNode dst,
2047 uint8_t *dst_finished, AVPacket *pkt, unsigned flags)
2048 {
2049 int ret;
2050
2051
2/2
✓ Branch 0 taken 3223 times.
✓ Branch 1 taken 510647 times.
513870 if (*dst_finished)
2052 3223 return AVERROR_EOF;
2053
2054
4/4
✓ Branch 0 taken 506116 times.
✓ Branch 1 taken 4531 times.
✓ Branch 2 taken 71245 times.
✓ Branch 3 taken 434871 times.
510647 if (pkt && dst.type == SCH_NODE_TYPE_MUX &&
2055
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 71243 times.
71245 (flags & DEMUX_SEND_STREAMCOPY_EOF)) {
2056 2 av_packet_unref(pkt);
2057 2 pkt = NULL;
2058 }
2059
2060
2/2
✓ Branch 0 taken 4533 times.
✓ Branch 1 taken 506114 times.
510647 if (!pkt)
2061 4533 goto finish;
2062
2063 1012228 ret = (dst.type == SCH_NODE_TYPE_MUX) ?
2064
2/2
✓ Branch 0 taken 71243 times.
✓ Branch 1 taken 434871 times.
506114 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
2065 434871 tq_send(sch->dec[dst.idx].queue, 0, pkt);
2066
2/2
✓ Branch 0 taken 3221 times.
✓ Branch 1 taken 502893 times.
506114 if (ret == AVERROR_EOF)
2067 3221 goto finish;
2068
2069 502893 return ret;
2070
2071 7754 finish:
2072
2/2
✓ Branch 0 taken 755 times.
✓ Branch 1 taken 6999 times.
7754 if (dst.type == SCH_NODE_TYPE_MUX)
2073 755 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
2074 else
2075 6999 tq_send_finish(sch->dec[dst.idx].queue, 0);
2076
2077 7754 *dst_finished = 1;
2078 7754 return AVERROR_EOF;
2079 }
2080
2081 513403 static int demux_send_for_stream(Scheduler *sch, SchDemux *d, SchDemuxStream *ds,
2082 AVPacket *pkt, unsigned flags)
2083 {
2084 513403 unsigned nb_done = 0;
2085
2086
2/2
✓ Branch 0 taken 513870 times.
✓ Branch 1 taken 513403 times.
1027273 for (unsigned i = 0; i < ds->nb_dst; i++) {
2087 513870 AVPacket *to_send = pkt;
2088 513870 uint8_t *finished = &ds->dst_finished[i];
2089
2090 int ret;
2091
2092 // sending a packet consumes it, so make a temporary reference if needed
2093
4/4
✓ Branch 0 taken 506116 times.
✓ Branch 1 taken 7754 times.
✓ Branch 2 taken 441 times.
✓ Branch 3 taken 505675 times.
513870 if (pkt && i < ds->nb_dst - 1) {
2094 441 to_send = d->send_pkt;
2095
2096 441 ret = av_packet_ref(to_send, pkt);
2097
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 441 times.
441 if (ret < 0)
2098 return ret;
2099 }
2100
2101 513870 ret = demux_stream_send_to_dst(sch, ds->dst[i], finished, to_send, flags);
2102
2/2
✓ Branch 0 taken 506116 times.
✓ Branch 1 taken 7754 times.
513870 if (to_send)
2103 506116 av_packet_unref(to_send);
2104
2/2
✓ Branch 0 taken 10977 times.
✓ Branch 1 taken 502893 times.
513870 if (ret == AVERROR_EOF)
2105 10977 nb_done++;
2106
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 502893 times.
502893 else if (ret < 0)
2107 return ret;
2108 }
2109
2110
2/2
✓ Branch 0 taken 10951 times.
✓ Branch 1 taken 502452 times.
513403 return (nb_done == ds->nb_dst) ? AVERROR_EOF : 0;
2111 }
2112
2113 11 static int demux_flush(Scheduler *sch, SchDemux *d, AVPacket *pkt)
2114 {
2115 11 Timestamp max_end_ts = (Timestamp){ .ts = AV_NOPTS_VALUE };
2116
2117
3/6
✓ Branch 0 taken 11 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 11 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 11 times.
11 av_assert0(!pkt->buf && !pkt->data && !pkt->side_data_elems);
2118
2119
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 11 times.
25 for (unsigned i = 0; i < d->nb_streams; i++) {
2120 14 SchDemuxStream *ds = &d->streams[i];
2121
2122
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 14 times.
28 for (unsigned j = 0; j < ds->nb_dst; j++) {
2123 14 const SchedulerNode *dst = &ds->dst[j];
2124 SchDec *dec;
2125 int ret;
2126
2127
3/4
✓ Branch 0 taken 14 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 8 times.
✓ Branch 3 taken 6 times.
14 if (ds->dst_finished[j] || dst->type != SCH_NODE_TYPE_DEC)
2128 8 continue;
2129
2130 6 dec = &sch->dec[dst->idx];
2131
2132 6 ret = tq_send(dec->queue, 0, pkt);
2133
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 if (ret < 0)
2134 return ret;
2135
2136
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 3 times.
6 if (dec->queue_end_ts) {
2137 Timestamp ts;
2138 3 ret = av_thread_message_queue_recv(dec->queue_end_ts, &ts, 0);
2139
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (ret < 0)
2140 return ret;
2141
2142
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (max_end_ts.ts == AV_NOPTS_VALUE ||
2143 (ts.ts != AV_NOPTS_VALUE &&
2144 av_compare_ts(max_end_ts.ts, max_end_ts.tb, ts.ts, ts.tb) < 0))
2145 3 max_end_ts = ts;
2146
2147 }
2148 }
2149 }
2150
2151 11 pkt->pts = max_end_ts.ts;
2152 11 pkt->time_base = max_end_ts.tb;
2153
2154 11 return 0;
2155 }
2156
2157 506087 int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt,
2158 unsigned flags)
2159 {
2160 SchDemux *d;
2161 int terminate;
2162
2163
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 506087 times.
506087 av_assert0(demux_idx < sch->nb_demux);
2164 506087 d = &sch->demux[demux_idx];
2165
2166 506087 terminate = waiter_wait(sch, &d->waiter);
2167
2/2
✓ Branch 0 taken 401 times.
✓ Branch 1 taken 505686 times.
506087 if (terminate)
2168 401 return AVERROR_EXIT;
2169
2170 // flush the downstreams after seek
2171
2/2
✓ Branch 0 taken 11 times.
✓ Branch 1 taken 505675 times.
505686 if (pkt->stream_index == -1)
2172 11 return demux_flush(sch, d, pkt);
2173
2174
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 505675 times.
505675 av_assert0(pkt->stream_index < d->nb_streams);
2175
2176 505675 return demux_send_for_stream(sch, d, &d->streams[pkt->stream_index], pkt, flags);
2177 }
2178
2179 7447 static int demux_done(Scheduler *sch, unsigned demux_idx)
2180 {
2181 7447 SchDemux *d = &sch->demux[demux_idx];
2182 7447 int ret = 0;
2183
2184
2/2
✓ Branch 0 taken 7728 times.
✓ Branch 1 taken 7447 times.
15175 for (unsigned i = 0; i < d->nb_streams; i++) {
2185 7728 int err = demux_send_for_stream(sch, d, &d->streams[i], NULL, 0);
2186
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7728 times.
7728 if (err != AVERROR_EOF)
2187 ret = err_merge(ret, err);
2188 }
2189
2190 7447 pthread_mutex_lock(&sch->schedule_lock);
2191
2192 7447 d->task_exited = 1;
2193
2194 7447 schedule_update_locked(sch);
2195
2196 7447 pthread_mutex_unlock(&sch->schedule_lock);
2197
2198 7447 return ret;
2199 }
2200
2201 607398 int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt)
2202 {
2203 SchMux *mux;
2204 int ret, stream_idx;
2205
2206
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 607398 times.
607398 av_assert0(mux_idx < sch->nb_mux);
2207 607398 mux = &sch->mux[mux_idx];
2208
2209 607398 ret = tq_receive(mux->queue, &stream_idx, pkt);
2210 607398 pkt->stream_index = stream_idx;
2211 607398 return ret;
2212 }
2213
2214 220 void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
2215 {
2216 SchMux *mux;
2217
2218
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 220 times.
220 av_assert0(mux_idx < sch->nb_mux);
2219 220 mux = &sch->mux[mux_idx];
2220
2221
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 220 times.
220 av_assert0(stream_idx < mux->nb_streams);
2222 220 tq_receive_finish(mux->queue, stream_idx);
2223
2224 220 pthread_mutex_lock(&sch->schedule_lock);
2225 220 mux->streams[stream_idx].source_finished = 1;
2226
2227 220 schedule_update_locked(sch);
2228
2229 220 pthread_mutex_unlock(&sch->schedule_lock);
2230 220 }
2231
2232 549905 int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
2233 const AVPacket *pkt)
2234 {
2235 SchMux *mux;
2236 SchMuxStream *ms;
2237
2238
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 549905 times.
549905 av_assert0(mux_idx < sch->nb_mux);
2239 549905 mux = &sch->mux[mux_idx];
2240
2241
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 549905 times.
549905 av_assert0(stream_idx < mux->nb_streams);
2242 549905 ms = &mux->streams[stream_idx];
2243
2244
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 549905 times.
549910 for (unsigned i = 0; i < ms->nb_sub_heartbeat_dst; i++) {
2245 5 SchDec *dst = &sch->dec[ms->sub_heartbeat_dst[i]];
2246 int ret;
2247
2248 5 ret = av_packet_copy_props(mux->sub_heartbeat_pkt, pkt);
2249
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5 times.
5 if (ret < 0)
2250 return ret;
2251
2252 5 tq_send(dst->queue, 0, mux->sub_heartbeat_pkt);
2253 }
2254
2255 549905 return 0;
2256 }
2257
2258 8540 static int mux_done(Scheduler *sch, unsigned mux_idx)
2259 {
2260 8540 SchMux *mux = &sch->mux[mux_idx];
2261
2262 8540 pthread_mutex_lock(&sch->schedule_lock);
2263
2264
2/2
✓ Branch 0 taken 9045 times.
✓ Branch 1 taken 8540 times.
17585 for (unsigned i = 0; i < mux->nb_streams; i++) {
2265 9045 tq_receive_finish(mux->queue, i);
2266 9045 mux->streams[i].source_finished = 1;
2267 }
2268
2269 8540 schedule_update_locked(sch);
2270
2271 8540 pthread_mutex_unlock(&sch->schedule_lock);
2272
2273 8540 pthread_mutex_lock(&sch->finish_lock);
2274
2275
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8540 times.
8540 av_assert0(sch->nb_mux_done < sch->nb_mux);
2276 8540 sch->nb_mux_done++;
2277
2278 8540 pthread_cond_signal(&sch->finish_cond);
2279
2280 8540 pthread_mutex_unlock(&sch->finish_lock);
2281
2282 8540 return 0;
2283 }
2284
2285 407182 int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
2286 {
2287 SchDec *dec;
2288 int ret, dummy;
2289
2290
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 407182 times.
407182 av_assert0(dec_idx < sch->nb_dec);
2291 407182 dec = &sch->dec[dec_idx];
2292
2293 // the decoder should have given us post-flush end timestamp in pkt
2294
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 407179 times.
407182 if (dec->expect_end_ts) {
2295 3 Timestamp ts = (Timestamp){ .ts = pkt->pts, .tb = pkt->time_base };
2296 3 ret = av_thread_message_queue_send(dec->queue_end_ts, &ts, 0);
2297
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (ret < 0)
2298 return ret;
2299
2300 3 dec->expect_end_ts = 0;
2301 }
2302
2303 407182 ret = tq_receive(dec->queue, &dummy, pkt);
2304
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 407182 times.
407182 av_assert0(dummy <= 0);
2305
2306 // got a flush packet, on the next call to this function the decoder
2307 // will give us post-flush end timestamp
2308
7/8
✓ Branch 0 taken 403746 times.
✓ Branch 1 taken 3436 times.
✓ Branch 2 taken 958 times.
✓ Branch 3 taken 402788 times.
✓ Branch 4 taken 958 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 3 times.
✓ Branch 7 taken 955 times.
407182 if (ret >= 0 && !pkt->data && !pkt->side_data_elems && dec->queue_end_ts)
2309 3 dec->expect_end_ts = 1;
2310
2311 407182 return ret;
2312 }
2313
2314 435954 static int send_to_filter(Scheduler *sch, SchFilterGraph *fg,
2315 unsigned in_idx, AVFrame *frame)
2316 {
2317
2/2
✓ Branch 0 taken 428924 times.
✓ Branch 1 taken 7030 times.
435954 if (frame)
2318 428924 return tq_send(fg->queue, in_idx, frame);
2319
2320 7030 pthread_mutex_lock(&sch->schedule_lock);
2321
2322
2/2
✓ Branch 0 taken 7029 times.
✓ Branch 1 taken 1 times.
7030 if (!fg->inputs[in_idx].send_finished) {
2323 7029 fg->inputs[in_idx].send_finished = 1;
2324 7029 tq_send_finish(fg->queue, in_idx);
2325
2326 // close the control stream when all actual inputs are done
2327
2/2
✓ Branch 0 taken 6926 times.
✓ Branch 1 taken 103 times.
7029 if (++fg->nb_inputs_finished_send == fg->nb_inputs)
2328 6926 tq_send_finish(fg->queue, fg->nb_inputs);
2329
2330 7029 schedule_update_locked(sch);
2331 }
2332
2333 7030 pthread_mutex_unlock(&sch->schedule_lock);
2334 7030 return 0;
2335 }
2336
2337 440703 static int dec_send_to_dst(Scheduler *sch, const SchedulerNode dst,
2338 uint8_t *dst_finished, AVFrame *frame)
2339 {
2340 int ret;
2341
2342
2/2
✓ Branch 0 taken 7287 times.
✓ Branch 1 taken 433416 times.
440703 if (*dst_finished)
2343 7287 return AVERROR_EOF;
2344
2345
2/2
✓ Branch 0 taken 3450 times.
✓ Branch 1 taken 429966 times.
433416 if (!frame)
2346 3450 goto finish;
2347
2348 859932 ret = (dst.type == SCH_NODE_TYPE_FILTER_IN) ?
2349
2/2
✓ Branch 0 taken 428923 times.
✓ Branch 1 taken 1043 times.
429966 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame) :
2350 1043 send_to_enc(sch, &sch->enc[dst.idx], frame);
2351
2/2
✓ Branch 0 taken 3620 times.
✓ Branch 1 taken 426346 times.
429966 if (ret == AVERROR_EOF)
2352 3620 goto finish;
2353
2354 426346 return ret;
2355
2356 7070 finish:
2357
2/2
✓ Branch 0 taken 7028 times.
✓ Branch 1 taken 42 times.
7070 if (dst.type == SCH_NODE_TYPE_FILTER_IN)
2358 7028 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2359 else
2360 42 send_to_enc(sch, &sch->enc[dst.idx], NULL);
2361
2362 7070 *dst_finished = 1;
2363
2364 7070 return AVERROR_EOF;
2365 }
2366
2367 432009 int sch_dec_send(Scheduler *sch, unsigned dec_idx,
2368 unsigned out_idx, AVFrame *frame)
2369 {
2370 SchDec *dec;
2371 SchDecOutput *o;
2372 int ret;
2373 432009 unsigned nb_done = 0;
2374
2375
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 432009 times.
432009 av_assert0(dec_idx < sch->nb_dec);
2376 432009 dec = &sch->dec[dec_idx];
2377
2378
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 432009 times.
432009 av_assert0(out_idx < dec->nb_outputs);
2379 432009 o = &dec->outputs[out_idx];
2380
2381
2/2
✓ Branch 0 taken 433633 times.
✓ Branch 1 taken 432009 times.
865642 for (unsigned i = 0; i < o->nb_dst; i++) {
2382 433633 uint8_t *finished = &o->dst_finished[i];
2383 433633 AVFrame *to_send = frame;
2384
2385 // sending a frame consumes it, so make a temporary reference if needed
2386
2/2
✓ Branch 0 taken 1624 times.
✓ Branch 1 taken 432009 times.
433633 if (i < o->nb_dst - 1) {
2387 1624 to_send = dec->send_frame;
2388
2389 // frame may sometimes contain props only,
2390 // e.g. to signal EOF timestamp
2391
2/2
✓ Branch 0 taken 1514 times.
✓ Branch 1 taken 110 times.
1624 ret = frame->buf[0] ? av_frame_ref(to_send, frame) :
2392 110 av_frame_copy_props(to_send, frame);
2393
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1624 times.
1624 if (ret < 0)
2394 return ret;
2395 }
2396
2397 433633 ret = dec_send_to_dst(sch, o->dst[i], finished, to_send);
2398
2/2
✓ Branch 0 taken 7287 times.
✓ Branch 1 taken 426346 times.
433633 if (ret < 0) {
2399 7287 av_frame_unref(to_send);
2400
1/2
✓ Branch 0 taken 7287 times.
✗ Branch 1 not taken.
7287 if (ret == AVERROR_EOF) {
2401 7287 nb_done++;
2402 7287 continue;
2403 }
2404 return ret;
2405 }
2406 }
2407
2408
2/2
✓ Branch 0 taken 7137 times.
✓ Branch 1 taken 424872 times.
432009 return (nb_done == o->nb_dst) ? AVERROR_EOF : 0;
2409 }
2410
2411 7000 static int dec_done(Scheduler *sch, unsigned dec_idx)
2412 {
2413 7000 SchDec *dec = &sch->dec[dec_idx];
2414 7000 int ret = 0;
2415
2416 7000 tq_receive_finish(dec->queue, 0);
2417
2418 // make sure our source does not get stuck waiting for end timestamps
2419 // that will never arrive
2420
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 6999 times.
7000 if (dec->queue_end_ts)
2421 1 av_thread_message_queue_set_err_recv(dec->queue_end_ts, AVERROR_EOF);
2422
2423
2/2
✓ Branch 0 taken 7006 times.
✓ Branch 1 taken 7000 times.
14006 for (unsigned i = 0; i < dec->nb_outputs; i++) {
2424 7006 SchDecOutput *o = &dec->outputs[i];
2425
2426
2/2
✓ Branch 0 taken 7070 times.
✓ Branch 1 taken 7006 times.
14076 for (unsigned j = 0; j < o->nb_dst; j++) {
2427 7070 int err = dec_send_to_dst(sch, o->dst[j], &o->dst_finished[j], NULL);
2428
2/4
✓ Branch 0 taken 7070 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 7070 times.
7070 if (err < 0 && err != AVERROR_EOF)
2429 ret = err_merge(ret, err);
2430 }
2431 }
2432
2433 7000 return ret;
2434 }
2435
2436 533334 int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
2437 {
2438 SchEnc *enc;
2439 int ret, dummy;
2440
2441
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 533334 times.
533334 av_assert0(enc_idx < sch->nb_enc);
2442 533334 enc = &sch->enc[enc_idx];
2443
2444 533334 ret = tq_receive(enc->queue, &dummy, frame);
2445
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 533334 times.
533334 av_assert0(dummy <= 0);
2446
2447 533334 return ret;
2448 }
2449
2450 527263 static int enc_send_to_dst(Scheduler *sch, const SchedulerNode dst,
2451 uint8_t *dst_finished, AVPacket *pkt)
2452 {
2453 int ret;
2454
2455
2/2
✓ Branch 0 taken 24 times.
✓ Branch 1 taken 527239 times.
527263 if (*dst_finished)
2456 24 return AVERROR_EOF;
2457
2458
2/2
✓ Branch 0 taken 8289 times.
✓ Branch 1 taken 518950 times.
527239 if (!pkt)
2459 8289 goto finish;
2460
2461 1037900 ret = (dst.type == SCH_NODE_TYPE_MUX) ?
2462
2/2
✓ Branch 0 taken 518900 times.
✓ Branch 1 taken 50 times.
518950 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
2463 50 tq_send(sch->dec[dst.idx].queue, 0, pkt);
2464
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 518948 times.
518950 if (ret == AVERROR_EOF)
2465 2 goto finish;
2466
2467 518948 return ret;
2468
2469 8291 finish:
2470
2/2
✓ Branch 0 taken 8290 times.
✓ Branch 1 taken 1 times.
8291 if (dst.type == SCH_NODE_TYPE_MUX)
2471 8290 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
2472 else
2473 1 tq_send_finish(sch->dec[dst.idx].queue, 0);
2474
2475 8291 *dst_finished = 1;
2476
2477 8291 return AVERROR_EOF;
2478 }
2479
2480 518922 int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
2481 {
2482 SchEnc *enc;
2483 int ret;
2484
2485
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 518922 times.
518922 av_assert0(enc_idx < sch->nb_enc);
2486 518922 enc = &sch->enc[enc_idx];
2487
2488
2/2
✓ Branch 0 taken 518972 times.
✓ Branch 1 taken 518922 times.
1037894 for (unsigned i = 0; i < enc->nb_dst; i++) {
2489 518972 uint8_t *finished = &enc->dst_finished[i];
2490 518972 AVPacket *to_send = pkt;
2491
2492 // sending a packet consumes it, so make a temporary reference if needed
2493
2/2
✓ Branch 0 taken 50 times.
✓ Branch 1 taken 518922 times.
518972 if (i < enc->nb_dst - 1) {
2494 50 to_send = enc->send_pkt;
2495
2496 50 ret = av_packet_ref(to_send, pkt);
2497
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 50 times.
50 if (ret < 0)
2498 return ret;
2499 }
2500
2501 518972 ret = enc_send_to_dst(sch, enc->dst[i], finished, to_send);
2502
2/2
✓ Branch 0 taken 24 times.
✓ Branch 1 taken 518948 times.
518972 if (ret < 0) {
2503 24 av_packet_unref(to_send);
2504
1/2
✓ Branch 0 taken 24 times.
✗ Branch 1 not taken.
24 if (ret == AVERROR_EOF)
2505 24 continue;
2506 return ret;
2507 }
2508 }
2509
2510 518922 return 0;
2511 }
2512
2513 8290 static int enc_done(Scheduler *sch, unsigned enc_idx)
2514 {
2515 8290 SchEnc *enc = &sch->enc[enc_idx];
2516 8290 int ret = 0;
2517
2518 8290 tq_receive_finish(enc->queue, 0);
2519
2520
2/2
✓ Branch 0 taken 8291 times.
✓ Branch 1 taken 8290 times.
16581 for (unsigned i = 0; i < enc->nb_dst; i++) {
2521 8291 int err = enc_send_to_dst(sch, enc->dst[i], &enc->dst_finished[i], NULL);
2522
2/4
✓ Branch 0 taken 8291 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 8291 times.
8291 if (err < 0 && err != AVERROR_EOF)
2523 ret = err_merge(ret, err);
2524 }
2525
2526 8290 return ret;
2527 }
2528
2529 449217 int sch_filter_receive(Scheduler *sch, unsigned fg_idx,
2530 unsigned *in_idx, AVFrame *frame)
2531 {
2532 SchFilterGraph *fg;
2533
2534
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 449217 times.
449217 av_assert0(fg_idx < sch->nb_filters);
2535 449217 fg = &sch->filters[fg_idx];
2536
2537
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 449217 times.
449217 av_assert0(*in_idx <= fg->nb_inputs);
2538
2539 // update scheduling to account for desired input stream, if it changed
2540 //
2541 // this check needs no locking because only the filtering thread
2542 // updates this value
2543
2/2
✓ Branch 0 taken 8334 times.
✓ Branch 1 taken 440883 times.
449217 if (*in_idx != fg->best_input) {
2544 8334 pthread_mutex_lock(&sch->schedule_lock);
2545
2546 8334 fg->best_input = *in_idx;
2547 8334 schedule_update_locked(sch);
2548
2549 8334 pthread_mutex_unlock(&sch->schedule_lock);
2550 }
2551
2552
2/2
✓ Branch 0 taken 418427 times.
✓ Branch 1 taken 30790 times.
449217 if (*in_idx == fg->nb_inputs) {
2553 30790 int terminate = waiter_wait(sch, &fg->waiter);
2554
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 30790 times.
30790 return terminate ? AVERROR_EOF : AVERROR(EAGAIN);
2555 }
2556
2557 27 while (1) {
2558 int ret, idx;
2559
2560 418454 ret = tq_receive(fg->queue, &idx, frame);
2561
2/2
✓ Branch 0 taken 8 times.
✓ Branch 1 taken 418446 times.
418454 if (idx < 0)
2562 418427 return AVERROR_EOF;
2563
2/2
✓ Branch 0 taken 418419 times.
✓ Branch 1 taken 27 times.
418446 else if (ret >= 0) {
2564 418419 *in_idx = idx;
2565 418419 return 0;
2566 }
2567
2568 // disregard EOFs for specific streams - they should always be
2569 // preceded by an EOF frame
2570 }
2571 }
2572
2573 94 void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx)
2574 {
2575 SchFilterGraph *fg;
2576 SchFilterIn *fi;
2577
2578
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 94 times.
94 av_assert0(fg_idx < sch->nb_filters);
2579 94 fg = &sch->filters[fg_idx];
2580
2581
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 94 times.
94 av_assert0(in_idx < fg->nb_inputs);
2582 94 fi = &fg->inputs[in_idx];
2583
2584 94 pthread_mutex_lock(&sch->schedule_lock);
2585
2586
1/2
✓ Branch 0 taken 94 times.
✗ Branch 1 not taken.
94 if (!fi->receive_finished) {
2587 94 fi->receive_finished = 1;
2588 94 tq_receive_finish(fg->queue, in_idx);
2589
2590 // close the control stream when all actual inputs are done
2591
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 93 times.
94 if (++fg->nb_inputs_finished_receive == fg->nb_inputs)
2592 1 tq_receive_finish(fg->queue, fg->nb_inputs);
2593
2594 94 schedule_update_locked(sch);
2595 }
2596
2597 94 pthread_mutex_unlock(&sch->schedule_lock);
2598 94 }
2599
2600 445283 int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)
2601 {
2602 SchFilterGraph *fg;
2603 SchedulerNode dst;
2604 int ret;
2605
2606
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 445283 times.
445283 av_assert0(fg_idx < sch->nb_filters);
2607 445283 fg = &sch->filters[fg_idx];
2608
2609
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 445283 times.
445283 av_assert0(out_idx < fg->nb_outputs);
2610 445283 dst = fg->outputs[out_idx].dst;
2611
2612
2/2
✓ Branch 0 taken 445281 times.
✓ Branch 1 taken 2 times.
445283 if (dst.type == SCH_NODE_TYPE_ENC) {
2613 445281 ret = send_to_enc(sch, &sch->enc[dst.idx], frame);
2614
2/2
✓ Branch 0 taken 3208 times.
✓ Branch 1 taken 442073 times.
445281 if (ret == AVERROR_EOF)
2615 3208 send_to_enc(sch, &sch->enc[dst.idx], NULL);
2616 } else {
2617 2 ret = send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame);
2618
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (ret == AVERROR_EOF)
2619 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2620 }
2621 445283 return ret;
2622 }
2623
2624 8114 static int filter_done(Scheduler *sch, unsigned fg_idx)
2625 {
2626 8114 SchFilterGraph *fg = &sch->filters[fg_idx];
2627 8114 int ret = 0;
2628
2629
2/2
✓ Branch 0 taken 15143 times.
✓ Branch 1 taken 8114 times.
23257 for (unsigned i = 0; i <= fg->nb_inputs; i++)
2630 15143 tq_receive_finish(fg->queue, i);
2631
2632
2/2
✓ Branch 0 taken 8249 times.
✓ Branch 1 taken 8114 times.
16363 for (unsigned i = 0; i < fg->nb_outputs; i++) {
2633 8249 SchedulerNode dst = fg->outputs[i].dst;
2634 16498 int err = (dst.type == SCH_NODE_TYPE_ENC) ?
2635
2/2
✓ Branch 0 taken 8248 times.
✓ Branch 1 taken 1 times.
8249 send_to_enc (sch, &sch->enc[dst.idx], NULL) :
2636 1 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2637
2638
3/4
✓ Branch 0 taken 3254 times.
✓ Branch 1 taken 4995 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 3254 times.
8249 if (err < 0 && err != AVERROR_EOF)
2639 ret = err_merge(ret, err);
2640 }
2641
2642 8114 pthread_mutex_lock(&sch->schedule_lock);
2643
2644 8114 fg->task_exited = 1;
2645
2646 8114 schedule_update_locked(sch);
2647
2648 8114 pthread_mutex_unlock(&sch->schedule_lock);
2649
2650 8114 return ret;
2651 }
2652
2653 int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
2654 {
2655 SchFilterGraph *fg;
2656
2657 av_assert0(fg_idx < sch->nb_filters);
2658 fg = &sch->filters[fg_idx];
2659
2660 return send_to_filter(sch, fg, fg->nb_inputs, frame);
2661 }
2662
2663 6921 void sch_filter_choke_inputs(Scheduler *sch, unsigned fg_idx)
2664 {
2665 SchFilterGraph *fg;
2666
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6921 times.
6921 av_assert0(fg_idx < sch->nb_filters);
2667 6921 fg = &sch->filters[fg_idx];
2668
2669 6921 pthread_mutex_lock(&sch->schedule_lock);
2670 6921 fg->best_input = fg->nb_inputs;
2671 6921 schedule_update_locked(sch);
2672 6921 pthread_mutex_unlock(&sch->schedule_lock);
2673 6921 }
2674
2675 39391 static int task_cleanup(Scheduler *sch, SchedulerNode node)
2676 {
2677
5/6
✓ Branch 0 taken 7447 times.
✓ Branch 1 taken 8540 times.
✓ Branch 2 taken 7000 times.
✓ Branch 3 taken 8290 times.
✓ Branch 4 taken 8114 times.
✗ Branch 5 not taken.
39391 switch (node.type) {
2678 7447 case SCH_NODE_TYPE_DEMUX: return demux_done (sch, node.idx);
2679 8540 case SCH_NODE_TYPE_MUX: return mux_done (sch, node.idx);
2680 7000 case SCH_NODE_TYPE_DEC: return dec_done (sch, node.idx);
2681 8290 case SCH_NODE_TYPE_ENC: return enc_done (sch, node.idx);
2682 8114 case SCH_NODE_TYPE_FILTER_IN: return filter_done(sch, node.idx);
2683 default: av_unreachable("Invalid node type?");
2684 }
2685 }
2686
2687 39377 static void *task_wrapper(void *arg)
2688 {
2689 39377 SchTask *task = arg;
2690 39377 Scheduler *sch = task->parent;
2691 int ret;
2692 39377 int err = 0;
2693
2694 39377 ret = task->func(task->func_arg);
2695
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 39376 times.
39377 if (ret < 0)
2696 1 av_log(task->func_arg, AV_LOG_ERROR,
2697 1 "Task finished with error code: %d (%s)\n", ret, av_err2str(ret));
2698
2699 39377 err = task_cleanup(sch, task->node);
2700 39377 ret = err_merge(ret, err);
2701
2702 // EOF is considered normal termination
2703
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 39377 times.
39377 if (ret == AVERROR_EOF)
2704 ret = 0;
2705
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 39376 times.
39377 if (ret < 0) {
2706 1 pthread_mutex_lock(&sch->finish_lock);
2707 1 sch->task_failed = 1;
2708 1 pthread_cond_signal(&sch->finish_cond);
2709 1 pthread_mutex_unlock(&sch->finish_lock);
2710 }
2711
2712
4/4
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 39376 times.
✓ Branch 2 taken 1 times.
✓ Branch 3 taken 39376 times.
39378 av_log(task->func_arg, ret < 0 ? AV_LOG_ERROR : AV_LOG_VERBOSE,
2713 "Terminating thread with return code %d (%s)\n", ret,
2714 1 ret < 0 ? av_err2str(ret) : "success");
2715
2716 39377 return (void*)(intptr_t)ret;
2717 }
2718
2719 39394 static int task_stop(Scheduler *sch, SchTask *task)
2720 {
2721 int ret;
2722 void *thread_ret;
2723
2724
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 39391 times.
39394 if (!task->parent)
2725 3 return 0;
2726
2727
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 39377 times.
39391 if (!task->thread_running)
2728 14 return task_cleanup(sch, task->node);
2729
2730 39377 ret = pthread_join(task->thread, &thread_ret);
2731
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 39377 times.
39377 av_assert0(ret == 0);
2732
2733 39377 task->thread_running = 0;
2734
2735 39377 return (intptr_t)thread_ret;
2736 }
2737
2738 17075 int sch_stop(Scheduler *sch, int64_t *finish_ts)
2739 {
2740 17075 int ret = 0, err;
2741
2742
2/2
✓ Branch 0 taken 8539 times.
✓ Branch 1 taken 8536 times.
17075 if (sch->state != SCH_STATE_STARTED)
2743 8539 return 0;
2744
2745 8536 atomic_store(&sch->terminate, 1);
2746
2747
2/2
✓ Branch 0 taken 17072 times.
✓ Branch 1 taken 8536 times.
25608 for (unsigned type = 0; type < 2; type++)
2748
4/4
✓ Branch 0 taken 15983 times.
✓ Branch 1 taken 16653 times.
✓ Branch 2 taken 15564 times.
✓ Branch 3 taken 17072 times.
32636 for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) {
2749
2/2
✓ Branch 0 taken 7447 times.
✓ Branch 1 taken 8117 times.
15564 SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter;
2750 15564 waiter_set(w, 1);
2751
2/2
✓ Branch 0 taken 7447 times.
✓ Branch 1 taken 8117 times.
15564 if (type)
2752 7447 choke_demux(sch, i, 0); // unfreeze to allow draining
2753 }
2754
2755
2/2
✓ Branch 0 taken 7447 times.
✓ Branch 1 taken 8536 times.
15983 for (unsigned i = 0; i < sch->nb_demux; i++) {
2756 7447 SchDemux *d = &sch->demux[i];
2757
2758 7447 err = task_stop(sch, &d->task);
2759 7447 ret = err_merge(ret, err);
2760 }
2761
2762
2/2
✓ Branch 0 taken 7000 times.
✓ Branch 1 taken 8536 times.
15536 for (unsigned i = 0; i < sch->nb_dec; i++) {
2763 7000 SchDec *dec = &sch->dec[i];
2764
2765 7000 err = task_stop(sch, &dec->task);
2766 7000 ret = err_merge(ret, err);
2767 }
2768
2769
2/2
✓ Branch 0 taken 8117 times.
✓ Branch 1 taken 8536 times.
16653 for (unsigned i = 0; i < sch->nb_filters; i++) {
2770 8117 SchFilterGraph *fg = &sch->filters[i];
2771
2772 8117 err = task_stop(sch, &fg->task);
2773 8117 ret = err_merge(ret, err);
2774 }
2775
2776
2/2
✓ Branch 0 taken 8290 times.
✓ Branch 1 taken 8536 times.
16826 for (unsigned i = 0; i < sch->nb_enc; i++) {
2777 8290 SchEnc *enc = &sch->enc[i];
2778
2779 8290 err = task_stop(sch, &enc->task);
2780 8290 ret = err_merge(ret, err);
2781 }
2782
2783
2/2
✓ Branch 0 taken 8540 times.
✓ Branch 1 taken 8536 times.
17076 for (unsigned i = 0; i < sch->nb_mux; i++) {
2784 8540 SchMux *mux = &sch->mux[i];
2785
2786 8540 err = task_stop(sch, &mux->task);
2787 8540 ret = err_merge(ret, err);
2788 }
2789
2790
1/2
✓ Branch 0 taken 8536 times.
✗ Branch 1 not taken.
8536 if (finish_ts)
2791 8536 *finish_ts = progressing_dts(sch, 1);
2792
2793 8536 sch->state = SCH_STATE_STOPPED;
2794
2795 8536 return ret;
2796 }
2797