FFmpeg coverage


Directory: ../../../ffmpeg/
File: src/fftools/ffmpeg_sched.c
Date: 2025-10-19 14:07:46
            Exec    Total   Coverage
Lines:      1104    1270    86.9%
Functions:    68      70    97.1%
Branches:    601     829    72.5%

Line Branch Exec Source
1 /*
2 * Inter-thread scheduling/synchronization.
3 * Copyright (c) 2023 Anton Khirnov
4 *
5 * This file is part of FFmpeg.
6 *
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with FFmpeg; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22 #include <stdatomic.h>
23 #include <stddef.h>
24 #include <stdint.h>
25
26 #include "cmdutils.h"
27 #include "ffmpeg_sched.h"
28 #include "ffmpeg_utils.h"
29 #include "sync_queue.h"
30 #include "thread_queue.h"
31
32 #include "libavcodec/packet.h"
33
34 #include "libavutil/avassert.h"
35 #include "libavutil/error.h"
36 #include "libavutil/fifo.h"
37 #include "libavutil/frame.h"
38 #include "libavutil/mem.h"
39 #include "libavutil/thread.h"
40 #include "libavutil/threadmessage.h"
41 #include "libavutil/time.h"
42
43 // 100 ms
44 // FIXME: some other value? make this dynamic?
45 #define SCHEDULE_TOLERANCE (100 * 1000)
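As applied in schedule_update_locked() further below, this tolerance bounds how far any output stream may run ahead of the trailing one: with the trailing dts at 1.0 s, for example, a stream whose last_dts is at or beyond 1.1 s is left choked until the laggard catches up.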
46
47 enum QueueType {
48 QUEUE_PACKETS,
49 QUEUE_FRAMES,
50 };
51
52 typedef struct SchWaiter {
53 pthread_mutex_t lock;
54 pthread_cond_t cond;
55 atomic_int choked;
56
57 // the following are internal state of schedule_update_locked() and must not
58 // be accessed outside of it
59 int choked_prev;
60 int choked_next;
61 } SchWaiter;
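A minimal sketch of how a producing task is expected to drive a SchWaiter (the real call sites are the demuxer and filtergraph send paths; produce_one() is a hypothetical stand-in for the actual work):

    // Producer loop built around waiter_wait(): block while choked,
    // wind down once the scheduler requests termination.
    static int producer_run(Scheduler *sch, SchWaiter *w)
    {
        while (1) {
            // returns nonzero when sch->terminate is set
            if (waiter_wait(sch, w))
                return 0;

            int ret = produce_one(); // hypothetical
            if (ret < 0)
                return ret;
        }
    }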
62
63 typedef struct SchTask {
64 Scheduler *parent;
65 SchedulerNode node;
66
67 SchThreadFunc func;
68 void *func_arg;
69
70 pthread_t thread;
71 int thread_running;
72 } SchTask;
73
74 typedef struct SchDecOutput {
75 SchedulerNode *dst;
76 uint8_t *dst_finished;
77 unsigned nb_dst;
78 } SchDecOutput;
79
80 typedef struct SchDec {
81 const AVClass *class;
82
83 SchedulerNode src;
84
85 SchDecOutput *outputs;
86 unsigned nb_outputs;
87
88 SchTask task;
89 // Queue for receiving input packets, one stream.
90 ThreadQueue *queue;
91
92 // Queue for sending post-flush end timestamps back to the source
93 AVThreadMessageQueue *queue_end_ts;
94 int expect_end_ts;
95
96 // temporary storage used by sch_dec_send()
97 AVFrame *send_frame;
98 } SchDec;
99
100 typedef struct SchSyncQueue {
101 SyncQueue *sq;
102 AVFrame *frame;
103 pthread_mutex_t lock;
104
105 unsigned *enc_idx;
106 unsigned nb_enc_idx;
107 } SchSyncQueue;
108
109 typedef struct SchEnc {
110 const AVClass *class;
111
112 SchedulerNode src;
113 SchedulerNode *dst;
114 uint8_t *dst_finished;
115 unsigned nb_dst;
116
117 // [0] - index of the sync queue in Scheduler.sq_enc,
118 // [1] - index of this encoder in the sq
119 int sq_idx[2];
120
121 /* Opening encoders is somewhat nontrivial due to their interaction with
122 * sync queues, which are (among other things) responsible for maintaining
123 * constant audio frame size, when it is required by the encoder.
124 *
125 * Opening the encoder requires stream parameters, obtained from the first
126 * frame. However, that frame cannot be properly chunked by the sync queue
127 * without knowing the required frame size, which is only available after
128 * opening the encoder.
129 *
130 * This apparent circular dependency is resolved in the following way:
131 * - the caller creating the encoder gives us a callback which opens the
132 * encoder and returns the required frame size (if any)
133 * - when the first frame is sent to the encoder, the sending thread
134 * - calls this callback, opening the encoder
135 * - passes the returned frame size to the sync queue
136 */
137 int (*open_cb)(void *opaque, const AVFrame *frame);
138 int opened;
139
140 SchTask task;
141 // Queue for receiving input frames, one stream.
142 ThreadQueue *queue;
143 // tq_send() to queue returned EOF
144 int in_finished;
145
146 // temporary storage used by sch_enc_send()
147 AVPacket *send_pkt;
148 } SchEnc;
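The open-callback contract described in the comment above can be illustrated from the caller's side. A hedged sketch, in which MyEnc, its fields, and my_open_encoder() are hypothetical:

    // Passed to sch_add_enc() and invoked by the sending thread with the
    // first frame.  A positive return value reports the encoder's fixed
    // audio frame size, which the scheduler forwards to the sync queue;
    // zero means no frame-size constraint.
    static int my_enc_open(void *opaque, const AVFrame *frame)
    {
        MyEnc *e = opaque;                    // hypothetical user context
        int ret = my_open_encoder(e, frame);  // hypothetical helper

        if (ret < 0)
            return ret;

        return e->fixed_frame_size ? e->frame_size : 0;
    }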
149
150 typedef struct SchDemuxStream {
151 SchedulerNode *dst;
152 uint8_t *dst_finished;
153 unsigned nb_dst;
154 } SchDemuxStream;
155
156 typedef struct SchDemux {
157 const AVClass *class;
158
159 SchDemuxStream *streams;
160 unsigned nb_streams;
161
162 SchTask task;
163 SchWaiter waiter;
164
165 // temporary storage used by sch_demux_send()
166 AVPacket *send_pkt;
167
168 // protected by schedule_lock
169 int task_exited;
170 } SchDemux;
171
172 typedef struct PreMuxQueue {
173 /**
174 * Queue for buffering the packets before the muxer task can be started.
175 */
176 AVFifo *fifo;
177 /**
178 * Maximum number of packets in fifo.
179 */
180 int max_packets;
181 /**
182 * The size of the AVPackets' buffers in the queue.
183 * Updated when a packet is either pushed or pulled from the queue.
184 */
185 size_t data_size;
186 /** Threshold after which max_packets takes effect */
187 size_t data_threshold;
188 } PreMuxQueue;
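The code enforcing these two limits lies outside this excerpt, so the following is only a sketch of the documented semantics (helper name hypothetical; a non-positive max_packets is treated as "no limit"):

    // Whether the pre-mux fifo may admit another packet: growth is
    // unbounded until data_size crosses data_threshold, after which
    // max_packets caps the queue length.
    static int pre_mux_queue_may_grow(const PreMuxQueue *q, size_t nb_packets)
    {
        if (q->data_size < q->data_threshold)
            return 1;
        return q->max_packets <= 0 || nb_packets < (size_t)q->max_packets;
    }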
189
190 typedef struct SchMuxStream {
191 SchedulerNode src;
192
193 unsigned *sub_heartbeat_dst;
194 unsigned nb_sub_heartbeat_dst;
195
196 PreMuxQueue pre_mux_queue;
197
198 // an EOF was generated while flushing the pre-mux queue
199 int init_eof;
200
201 ////////////////////////////////////////////////////////////
202 // The following are protected by Scheduler.schedule_lock //
203
204 /* dts+duration of the last packet sent to this stream
205 in AV_TIME_BASE_Q */
206 int64_t last_dts;
207 // this stream no longer accepts input
208 int source_finished;
209 ////////////////////////////////////////////////////////////
210 } SchMuxStream;
211
212 typedef struct SchMux {
213 const AVClass *class;
214
215 SchMuxStream *streams;
216 unsigned nb_streams;
217 unsigned nb_streams_ready;
218
219 int (*init)(void *arg);
220
221 SchTask task;
222 /**
223 * Set to 1 after starting the muxer task and flushing the
224 * pre-muxing queues.
225 * Set either before any tasks have started, or with
226 * Scheduler.mux_ready_lock held.
227 */
228 atomic_int mux_started;
229 ThreadQueue *queue;
230 unsigned queue_size;
231
232 AVPacket *sub_heartbeat_pkt;
233 } SchMux;
234
235 typedef struct SchFilterIn {
236 SchedulerNode src;
237 int send_finished;
238 int receive_finished;
239 } SchFilterIn;
240
241 typedef struct SchFilterOut {
242 SchedulerNode dst;
243 } SchFilterOut;
244
245 typedef struct SchFilterGraph {
246 const AVClass *class;
247
248 SchFilterIn *inputs;
249 unsigned nb_inputs;
250 atomic_uint nb_inputs_finished_send;
251 unsigned nb_inputs_finished_receive;
252
253 SchFilterOut *outputs;
254 unsigned nb_outputs;
255
256 SchTask task;
257 // input queue, nb_inputs+1 streams
258 // last stream is control
259 ThreadQueue *queue;
260 SchWaiter waiter;
261
262 // protected by schedule_lock
263 unsigned best_input;
264 int task_exited;
265 } SchFilterGraph;
266
267 enum SchedulerState {
268 SCH_STATE_UNINIT,
269 SCH_STATE_STARTED,
270 SCH_STATE_STOPPED,
271 };
272
273 struct Scheduler {
274 const AVClass *class;
275
276 SchDemux *demux;
277 unsigned nb_demux;
278
279 SchMux *mux;
280 unsigned nb_mux;
281
282 unsigned nb_mux_ready;
283 pthread_mutex_t mux_ready_lock;
284
285 unsigned nb_mux_done;
286 unsigned task_failed;
287 pthread_mutex_t finish_lock;
288 pthread_cond_t finish_cond;
289
290
291 SchDec *dec;
292 unsigned nb_dec;
293
294 SchEnc *enc;
295 unsigned nb_enc;
296
297 SchSyncQueue *sq_enc;
298 unsigned nb_sq_enc;
299
300 SchFilterGraph *filters;
301 unsigned nb_filters;
302
303 char *sdp_filename;
304 int sdp_auto;
305
306 enum SchedulerState state;
307 atomic_int terminate;
308
309 pthread_mutex_t schedule_lock;
310
311 atomic_int_least64_t last_dts;
312 };
313
314 /**
315 * Wait until this task is allowed to proceed.
316 *
317 * @retval 0 the caller should proceed
318 * @retval 1 the caller should terminate
319 */
320 496963 static int waiter_wait(Scheduler *sch, SchWaiter *w)
321 {
322 int terminate;
323
324
2/2
✓ Branch 0 taken 494140 times.
✓ Branch 1 taken 2823 times.
496963 if (!atomic_load(&w->choked))
325 494140 return 0;
326
327 2823 pthread_mutex_lock(&w->lock);
328
329
4/4
✓ Branch 0 taken 2852 times.
✓ Branch 1 taken 2429 times.
✓ Branch 2 taken 2458 times.
✓ Branch 3 taken 394 times.
5281 while (atomic_load(&w->choked) && !atomic_load(&sch->terminate))
330 2458 pthread_cond_wait(&w->cond, &w->lock);
331
332 2823 terminate = atomic_load(&sch->terminate);
333
334 2823 pthread_mutex_unlock(&w->lock);
335
336 2823 return terminate;
337 }
338
339 58600 static void waiter_set(SchWaiter *w, int choked)
340 {
341 58600 pthread_mutex_lock(&w->lock);
342
343 58600 atomic_store(&w->choked, choked);
344 58600 pthread_cond_signal(&w->cond);
345
346 58600 pthread_mutex_unlock(&w->lock);
347 58600 }
348
349 15354 static int waiter_init(SchWaiter *w)
350 {
351 int ret;
352
353 15354 atomic_init(&w->choked, 0);
354
355 15354 ret = pthread_mutex_init(&w->lock, NULL);
356
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15354 times.
15354 if (ret)
357 return AVERROR(ret);
358
359 15354 ret = pthread_cond_init(&w->cond, NULL);
360
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15354 times.
15354 if (ret)
361 return AVERROR(ret);
362
363 15354 return 0;
364 }
365
366 15354 static void waiter_uninit(SchWaiter *w)
367 {
368 15354 pthread_mutex_destroy(&w->lock);
369 15354 pthread_cond_destroy(&w->cond);
370 15354 }
371
372 31496 static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size,
373 enum QueueType type)
374 {
375 ThreadQueue *tq;
376
377
1/2
✓ Branch 0 taken 31496 times.
✗ Branch 1 not taken.
31496 if (queue_size <= 0) {
378
2/2
✓ Branch 0 taken 16170 times.
✓ Branch 1 taken 15326 times.
31496 if (type == QUEUE_FRAMES)
379 16170 queue_size = DEFAULT_FRAME_THREAD_QUEUE_SIZE;
380 else
381 15326 queue_size = DEFAULT_PACKET_THREAD_QUEUE_SIZE;
382 }
383
384
2/2
✓ Branch 0 taken 16170 times.
✓ Branch 1 taken 15326 times.
31496 if (type == QUEUE_FRAMES) {
385 // This queue length is used in the decoder code to ensure that
386 // there are enough entries in fixed-size frame pools to account
387 // for frames held in queues inside the ffmpeg utility. If this
388 // can ever dynamically change then the corresponding decode
389 // code needs to be updated as well.
390
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 16170 times.
16170 av_assert0(queue_size <= DEFAULT_FRAME_THREAD_QUEUE_SIZE);
391 }
392
393 31496 tq = tq_alloc(nb_streams, queue_size,
394 (type == QUEUE_PACKETS) ? THREAD_QUEUE_PACKETS : THREAD_QUEUE_FRAMES);
395
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 31496 times.
31496 if (!tq)
396 return AVERROR(ENOMEM);
397
398 31496 *ptq = tq;
399 31496 return 0;
400 }
401
402 static void *task_wrapper(void *arg);
403
404 38838 static int task_start(SchTask *task)
405 {
406 int ret;
407
408 38838 av_log(task->func_arg, AV_LOG_VERBOSE, "Starting thread...\n");
409
410
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 38838 times.
38838 av_assert0(!task->thread_running);
411
412 38838 ret = pthread_create(&task->thread, NULL, task_wrapper, task);
413
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 38838 times.
38838 if (ret) {
414 av_log(task->func_arg, AV_LOG_ERROR, "pthread_create() failed: %s\n",
415 strerror(ret));
416 return AVERROR(ret);
417 }
418
419 38838 task->thread_running = 1;
420 38838 return 0;
421 }
422
423 38852 static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx,
424 SchThreadFunc func, void *func_arg)
425 {
426 38852 task->parent = sch;
427
428 38852 task->node.type = type;
429 38852 task->node.idx = idx;
430
431 38852 task->func = func;
432 38852 task->func_arg = func_arg;
433 38852 }
434
435 568543 static int64_t trailing_dts(const Scheduler *sch, int count_finished)
436 {
437 568543 int64_t min_dts = INT64_MAX;
438
439
2/2
✓ Branch 0 taken 569609 times.
✓ Branch 1 taken 542361 times.
1111970 for (unsigned i = 0; i < sch->nb_mux; i++) {
440 569609 const SchMux *mux = &sch->mux[i];
441
442
2/2
✓ Branch 0 taken 629488 times.
✓ Branch 1 taken 543427 times.
1172915 for (unsigned j = 0; j < mux->nb_streams; j++) {
443 629488 const SchMuxStream *ms = &mux->streams[j];
444
445
4/4
✓ Branch 0 taken 45044 times.
✓ Branch 1 taken 584444 times.
✓ Branch 2 taken 36185 times.
✓ Branch 3 taken 8859 times.
629488 if (ms->source_finished && !count_finished)
446 36185 continue;
447
2/2
✓ Branch 0 taken 26182 times.
✓ Branch 1 taken 567121 times.
593303 if (ms->last_dts == AV_NOPTS_VALUE)
448 26182 return AV_NOPTS_VALUE;
449
450 567121 min_dts = FFMIN(min_dts, ms->last_dts);
451 }
452 }
453
454
2/2
✓ Branch 0 taken 514622 times.
✓ Branch 1 taken 27739 times.
542361 return min_dts == INT64_MAX ? AV_NOPTS_VALUE : min_dts;
455 }
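Note the early return above: if any stream being counted has not yet produced a packet (last_dts == AV_NOPTS_VALUE), the trailing dts is unknowable and AV_NOPTS_VALUE is returned. For example, streams at {2.0 s, unset, 3.5 s} yield AV_NOPTS_VALUE, while {2.0 s, 3.5 s} yield 2.0 s, the dts of the most-behind stream.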
456
457 8403 void sch_free(Scheduler **psch)
458 {
459 8403 Scheduler *sch = *psch;
460
461
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8403 times.
8403 if (!sch)
462 return;
463
464 8403 sch_stop(sch, NULL);
465
466
2/2
✓ Branch 0 taken 7356 times.
✓ Branch 1 taken 8403 times.
15759 for (unsigned i = 0; i < sch->nb_demux; i++) {
467 7356 SchDemux *d = &sch->demux[i];
468
469
2/2
✓ Branch 0 taken 7616 times.
✓ Branch 1 taken 7356 times.
14972 for (unsigned j = 0; j < d->nb_streams; j++) {
470 7616 SchDemuxStream *ds = &d->streams[j];
471 7616 av_freep(&ds->dst);
472 7616 av_freep(&ds->dst_finished);
473 }
474 7356 av_freep(&d->streams);
475
476 7356 av_packet_free(&d->send_pkt);
477
478 7356 waiter_uninit(&d->waiter);
479 }
480 8403 av_freep(&sch->demux);
481
482
2/2
✓ Branch 0 taken 8406 times.
✓ Branch 1 taken 8403 times.
16809 for (unsigned i = 0; i < sch->nb_mux; i++) {
483 8406 SchMux *mux = &sch->mux[i];
484
485
2/2
✓ Branch 0 taken 8895 times.
✓ Branch 1 taken 8406 times.
17301 for (unsigned j = 0; j < mux->nb_streams; j++) {
486 8895 SchMuxStream *ms = &mux->streams[j];
487
488
1/2
✓ Branch 0 taken 8895 times.
✗ Branch 1 not taken.
8895 if (ms->pre_mux_queue.fifo) {
489 AVPacket *pkt;
490
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 8895 times.
8895 while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0)
491 av_packet_free(&pkt);
492 8895 av_fifo_freep2(&ms->pre_mux_queue.fifo);
493 }
494
495 8895 av_freep(&ms->sub_heartbeat_dst);
496 }
497 8406 av_freep(&mux->streams);
498
499 8406 av_packet_free(&mux->sub_heartbeat_pkt);
500
501 8406 tq_free(&mux->queue);
502 }
503 8403 av_freep(&sch->mux);
504
505
2/2
✓ Branch 0 taken 6920 times.
✓ Branch 1 taken 8403 times.
15323 for (unsigned i = 0; i < sch->nb_dec; i++) {
506 6920 SchDec *dec = &sch->dec[i];
507
508 6920 tq_free(&dec->queue);
509
510 6920 av_thread_message_queue_free(&dec->queue_end_ts);
511
512
2/2
✓ Branch 0 taken 6926 times.
✓ Branch 1 taken 6920 times.
13846 for (unsigned j = 0; j < dec->nb_outputs; j++) {
513 6926 SchDecOutput *o = &dec->outputs[j];
514
515 6926 av_freep(&o->dst);
516 6926 av_freep(&o->dst_finished);
517 }
518
519 6920 av_freep(&dec->outputs);
520
521 6920 av_frame_free(&dec->send_frame);
522 }
523 8403 av_freep(&sch->dec);
524
525
2/2
✓ Branch 0 taken 8172 times.
✓ Branch 1 taken 8403 times.
16575 for (unsigned i = 0; i < sch->nb_enc; i++) {
526 8172 SchEnc *enc = &sch->enc[i];
527
528 8172 tq_free(&enc->queue);
529
530 8172 av_packet_free(&enc->send_pkt);
531
532 8172 av_freep(&enc->dst);
533 8172 av_freep(&enc->dst_finished);
534 }
535 8403 av_freep(&sch->enc);
536
537
2/2
✓ Branch 0 taken 3186 times.
✓ Branch 1 taken 8403 times.
11589 for (unsigned i = 0; i < sch->nb_sq_enc; i++) {
538 3186 SchSyncQueue *sq = &sch->sq_enc[i];
539 3186 sq_free(&sq->sq);
540 3186 av_frame_free(&sq->frame);
541 3186 pthread_mutex_destroy(&sq->lock);
542 3186 av_freep(&sq->enc_idx);
543 }
544 8403 av_freep(&sch->sq_enc);
545
546
2/2
✓ Branch 0 taken 7998 times.
✓ Branch 1 taken 8403 times.
16401 for (unsigned i = 0; i < sch->nb_filters; i++) {
547 7998 SchFilterGraph *fg = &sch->filters[i];
548
549 7998 tq_free(&fg->queue);
550
551 7998 av_freep(&fg->inputs);
552 7998 av_freep(&fg->outputs);
553
554 7998 waiter_uninit(&fg->waiter);
555 }
556 8403 av_freep(&sch->filters);
557
558 8403 av_freep(&sch->sdp_filename);
559
560 8403 pthread_mutex_destroy(&sch->schedule_lock);
561
562 8403 pthread_mutex_destroy(&sch->mux_ready_lock);
563
564 8403 pthread_mutex_destroy(&sch->finish_lock);
565 8403 pthread_cond_destroy(&sch->finish_cond);
566
567 8403 av_freep(psch);
568 }
569
570 static const AVClass scheduler_class = {
571 .class_name = "Scheduler",
572 .version = LIBAVUTIL_VERSION_INT,
573 };
574
575 8403 Scheduler *sch_alloc(void)
576 {
577 Scheduler *sch;
578 int ret;
579
580 8403 sch = av_mallocz(sizeof(*sch));
581
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8403 times.
8403 if (!sch)
582 return NULL;
583
584 8403 sch->class = &scheduler_class;
585 8403 sch->sdp_auto = 1;
586
587 8403 ret = pthread_mutex_init(&sch->schedule_lock, NULL);
588
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8403 times.
8403 if (ret)
589 goto fail;
590
591 8403 ret = pthread_mutex_init(&sch->mux_ready_lock, NULL);
592
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8403 times.
8403 if (ret)
593 goto fail;
594
595 8403 ret = pthread_mutex_init(&sch->finish_lock, NULL);
596
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8403 times.
8403 if (ret)
597 goto fail;
598
599 8403 ret = pthread_cond_init(&sch->finish_cond, NULL);
600
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8403 times.
8403 if (ret)
601 goto fail;
602
603 8403 return sch;
604 fail:
605 sch_free(&sch);
606 return NULL;
607 }
608
609 int sch_sdp_filename(Scheduler *sch, const char *sdp_filename)
610 {
611 av_freep(&sch->sdp_filename);
612 sch->sdp_filename = av_strdup(sdp_filename);
613 return sch->sdp_filename ? 0 : AVERROR(ENOMEM);
614 }
615
616 static const AVClass sch_mux_class = {
617 .class_name = "SchMux",
618 .version = LIBAVUTIL_VERSION_INT,
619 .parent_log_context_offset = offsetof(SchMux, task.func_arg),
620 };
621
622 8406 int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
623 void *arg, int sdp_auto, unsigned thread_queue_size)
624 {
625 8406 const unsigned idx = sch->nb_mux;
626
627 SchMux *mux;
628 int ret;
629
630 8406 ret = GROW_ARRAY(sch->mux, sch->nb_mux);
631
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8406 times.
8406 if (ret < 0)
632 return ret;
633
634 8406 mux = &sch->mux[idx];
635 8406 mux->class = &sch_mux_class;
636 8406 mux->init = init;
637 8406 mux->queue_size = thread_queue_size;
638
639 8406 task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg);
640
641 8406 sch->sdp_auto &= sdp_auto;
642
643 8406 return idx;
644 }
645
646 8895 int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx)
647 {
648 SchMux *mux;
649 SchMuxStream *ms;
650 unsigned stream_idx;
651 int ret;
652
653
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8895 times.
8895 av_assert0(mux_idx < sch->nb_mux);
654 8895 mux = &sch->mux[mux_idx];
655
656 8895 ret = GROW_ARRAY(mux->streams, mux->nb_streams);
657
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8895 times.
8895 if (ret < 0)
658 return ret;
659 8895 stream_idx = mux->nb_streams - 1;
660
661 8895 ms = &mux->streams[stream_idx];
662
663 8895 ms->pre_mux_queue.fifo = av_fifo_alloc2(8, sizeof(AVPacket*), 0);
664
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8895 times.
8895 if (!ms->pre_mux_queue.fifo)
665 return AVERROR(ENOMEM);
666
667 8895 ms->last_dts = AV_NOPTS_VALUE;
668
669 8895 return stream_idx;
670 }
671
672 static const AVClass sch_demux_class = {
673 .class_name = "SchDemux",
674 .version = LIBAVUTIL_VERSION_INT,
675 .parent_log_context_offset = offsetof(SchDemux, task.func_arg),
676 };
677
678 7356 int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *ctx)
679 {
680 7356 const unsigned idx = sch->nb_demux;
681
682 SchDemux *d;
683 int ret;
684
685 7356 ret = GROW_ARRAY(sch->demux, sch->nb_demux);
686
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7356 times.
7356 if (ret < 0)
687 return ret;
688
689 7356 d = &sch->demux[idx];
690
691 7356 task_init(sch, &d->task, SCH_NODE_TYPE_DEMUX, idx, func, ctx);
692
693 7356 d->class = &sch_demux_class;
694 7356 d->send_pkt = av_packet_alloc();
695
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7356 times.
7356 if (!d->send_pkt)
696 return AVERROR(ENOMEM);
697
698 7356 ret = waiter_init(&d->waiter);
699
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7356 times.
7356 if (ret < 0)
700 return ret;
701
702 7356 return idx;
703 }
704
705 7616 int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
706 {
707 SchDemux *d;
708 int ret;
709
710
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7616 times.
7616 av_assert0(demux_idx < sch->nb_demux);
711 7616 d = &sch->demux[demux_idx];
712
713 7616 ret = GROW_ARRAY(d->streams, d->nb_streams);
714
1/2
✓ Branch 0 taken 7616 times.
✗ Branch 1 not taken.
7616 return ret < 0 ? ret : d->nb_streams - 1;
715 }
716
717 6926 int sch_add_dec_output(Scheduler *sch, unsigned dec_idx)
718 {
719 SchDec *dec;
720 int ret;
721
722
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6926 times.
6926 av_assert0(dec_idx < sch->nb_dec);
723 6926 dec = &sch->dec[dec_idx];
724
725 6926 ret = GROW_ARRAY(dec->outputs, dec->nb_outputs);
726
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6926 times.
6926 if (ret < 0)
727 return ret;
728
729 6926 return dec->nb_outputs - 1;
730 }
731
732 static const AVClass sch_dec_class = {
733 .class_name = "SchDec",
734 .version = LIBAVUTIL_VERSION_INT,
735 .parent_log_context_offset = offsetof(SchDec, task.func_arg),
736 };
737
738 6920 int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts)
739 {
740 6920 const unsigned idx = sch->nb_dec;
741
742 SchDec *dec;
743 int ret;
744
745 6920 ret = GROW_ARRAY(sch->dec, sch->nb_dec);
746
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6920 times.
6920 if (ret < 0)
747 return ret;
748
749 6920 dec = &sch->dec[idx];
750
751 6920 task_init(sch, &dec->task, SCH_NODE_TYPE_DEC, idx, func, ctx);
752
753 6920 dec->class = &sch_dec_class;
754 6920 dec->send_frame = av_frame_alloc();
755
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6920 times.
6920 if (!dec->send_frame)
756 return AVERROR(ENOMEM);
757
758 6920 ret = sch_add_dec_output(sch, idx);
759
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6920 times.
6920 if (ret < 0)
760 return ret;
761
762 6920 ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS);
763
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6920 times.
6920 if (ret < 0)
764 return ret;
765
766
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 6919 times.
6920 if (send_end_ts) {
767 1 ret = av_thread_message_queue_alloc(&dec->queue_end_ts, 1, sizeof(Timestamp));
768
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (ret < 0)
769 return ret;
770 }
771
772 6920 return idx;
773 }
774
775 static const AVClass sch_enc_class = {
776 .class_name = "SchEnc",
777 .version = LIBAVUTIL_VERSION_INT,
778 .parent_log_context_offset = offsetof(SchEnc, task.func_arg),
779 };
780
781 8172 int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx,
782 int (*open_cb)(void *opaque, const AVFrame *frame))
783 {
784 8172 const unsigned idx = sch->nb_enc;
785
786 SchEnc *enc;
787 int ret;
788
789 8172 ret = GROW_ARRAY(sch->enc, sch->nb_enc);
790
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8172 times.
8172 if (ret < 0)
791 return ret;
792
793 8172 enc = &sch->enc[idx];
794
795 8172 enc->class = &sch_enc_class;
796 8172 enc->open_cb = open_cb;
797 8172 enc->sq_idx[0] = -1;
798 8172 enc->sq_idx[1] = -1;
799
800 8172 task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx);
801
802 8172 enc->send_pkt = av_packet_alloc();
803
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8172 times.
8172 if (!enc->send_pkt)
804 return AVERROR(ENOMEM);
805
806 8172 ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES);
807
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8172 times.
8172 if (ret < 0)
808 return ret;
809
810 8172 return idx;
811 }
812
813 static const AVClass sch_fg_class = {
814 .class_name = "SchFilterGraph",
815 .version = LIBAVUTIL_VERSION_INT,
816 .parent_log_context_offset = offsetof(SchFilterGraph, task.func_arg),
817 };
818
819 7998 int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
820 SchThreadFunc func, void *ctx)
821 {
822 7998 const unsigned idx = sch->nb_filters;
823
824 SchFilterGraph *fg;
825 int ret;
826
827 7998 ret = GROW_ARRAY(sch->filters, sch->nb_filters);
828
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7998 times.
7998 if (ret < 0)
829 return ret;
830 7998 fg = &sch->filters[idx];
831
832 7998 fg->class = &sch_fg_class;
833
834 7998 task_init(sch, &fg->task, SCH_NODE_TYPE_FILTER_IN, idx, func, ctx);
835
836
2/2
✓ Branch 0 taken 6859 times.
✓ Branch 1 taken 1139 times.
7998 if (nb_inputs) {
837 6859 fg->inputs = av_calloc(nb_inputs, sizeof(*fg->inputs));
838
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6859 times.
6859 if (!fg->inputs)
839 return AVERROR(ENOMEM);
840 6859 fg->nb_inputs = nb_inputs;
841 }
842
843
1/2
✓ Branch 0 taken 7998 times.
✗ Branch 1 not taken.
7998 if (nb_outputs) {
844 7998 fg->outputs = av_calloc(nb_outputs, sizeof(*fg->outputs));
845
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7998 times.
7998 if (!fg->outputs)
846 return AVERROR(ENOMEM);
847 7998 fg->nb_outputs = nb_outputs;
848 }
849
850 7998 ret = waiter_init(&fg->waiter);
851
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7998 times.
7998 if (ret < 0)
852 return ret;
853
854 7998 ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES);
855
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7998 times.
7998 if (ret < 0)
856 return ret;
857
858 7998 return idx;
859 }
860
861 3186 int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx)
862 {
863 SchSyncQueue *sq;
864 int ret;
865
866 3186 ret = GROW_ARRAY(sch->sq_enc, sch->nb_sq_enc);
867
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3186 times.
3186 if (ret < 0)
868 return ret;
869 3186 sq = &sch->sq_enc[sch->nb_sq_enc - 1];
870
871 3186 sq->sq = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, logctx);
872
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3186 times.
3186 if (!sq->sq)
873 return AVERROR(ENOMEM);
874
875 3186 sq->frame = av_frame_alloc();
876
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3186 times.
3186 if (!sq->frame)
877 return AVERROR(ENOMEM);
878
879 3186 ret = pthread_mutex_init(&sq->lock, NULL);
880
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3186 times.
3186 if (ret)
881 return AVERROR(ret);
882
883 3186 return sq - sch->sq_enc;
884 }
885
886 3248 int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx,
887 int limiting, uint64_t max_frames)
888 {
889 SchSyncQueue *sq;
890 SchEnc *enc;
891 int ret;
892
893
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3248 times.
3248 av_assert0(sq_idx < sch->nb_sq_enc);
894 3248 sq = &sch->sq_enc[sq_idx];
895
896
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3248 times.
3248 av_assert0(enc_idx < sch->nb_enc);
897 3248 enc = &sch->enc[enc_idx];
898
899 3248 ret = GROW_ARRAY(sq->enc_idx, sq->nb_enc_idx);
900
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3248 times.
3248 if (ret < 0)
901 return ret;
902 3248 sq->enc_idx[sq->nb_enc_idx - 1] = enc_idx;
903
904 3248 ret = sq_add_stream(sq->sq, limiting);
905
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3248 times.
3248 if (ret < 0)
906 return ret;
907
908 3248 enc->sq_idx[0] = sq_idx;
909 3248 enc->sq_idx[1] = ret;
910
911
2/2
✓ Branch 0 taken 3059 times.
✓ Branch 1 taken 189 times.
3248 if (max_frames != INT64_MAX)
912 3059 sq_limit_frames(sq->sq, enc->sq_idx[1], max_frames);
913
914 3248 return 0;
915 }
916
917 30932 int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst)
918 {
919 int ret;
920
921
4/5
✓ Branch 0 taken 7642 times.
✓ Branch 1 taken 6987 times.
✓ Branch 2 taken 8130 times.
✓ Branch 3 taken 8173 times.
✗ Branch 4 not taken.
30932 switch (src.type) {
922 7642 case SCH_NODE_TYPE_DEMUX: {
923 SchDemuxStream *ds;
924
925
2/4
✓ Branch 0 taken 7642 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 7642 times.
7642 av_assert0(src.idx < sch->nb_demux &&
926 src.idx_stream < sch->demux[src.idx].nb_streams);
927 7642 ds = &sch->demux[src.idx].streams[src.idx_stream];
928
929 7642 ret = GROW_ARRAY(ds->dst, ds->nb_dst);
930
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7642 times.
7642 if (ret < 0)
931 return ret;
932
933 7642 ds->dst[ds->nb_dst - 1] = dst;
934
935 // demuxed packets go to decoding or streamcopy
936
2/3
✓ Branch 0 taken 6919 times.
✓ Branch 1 taken 723 times.
✗ Branch 2 not taken.
7642 switch (dst.type) {
937 6919 case SCH_NODE_TYPE_DEC: {
938 SchDec *dec;
939
940
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6919 times.
6919 av_assert0(dst.idx < sch->nb_dec);
941 6919 dec = &sch->dec[dst.idx];
942
943
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6919 times.
6919 av_assert0(!dec->src.type);
944 6919 dec->src = src;
945 6919 break;
946 }
947 723 case SCH_NODE_TYPE_MUX: {
948 SchMuxStream *ms;
949
950
2/4
✓ Branch 0 taken 723 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 723 times.
723 av_assert0(dst.idx < sch->nb_mux &&
951 dst.idx_stream < sch->mux[dst.idx].nb_streams);
952 723 ms = &sch->mux[dst.idx].streams[dst.idx_stream];
953
954
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 723 times.
723 av_assert0(!ms->src.type);
955 723 ms->src = src;
956
957 723 break;
958 }
959 default: av_assert0(0);
960 }
961
962 7642 break;
963 }
964 6987 case SCH_NODE_TYPE_DEC: {
965 SchDec *dec;
966 SchDecOutput *o;
967
968
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6987 times.
6987 av_assert0(src.idx < sch->nb_dec);
969 6987 dec = &sch->dec[src.idx];
970
971
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6987 times.
6987 av_assert0(src.idx_stream < dec->nb_outputs);
972 6987 o = &dec->outputs[src.idx_stream];
973
974 6987 ret = GROW_ARRAY(o->dst, o->nb_dst);
975
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6987 times.
6987 if (ret < 0)
976 return ret;
977
978 6987 o->dst[o->nb_dst - 1] = dst;
979
980 // decoded frames go to filters or encoding
981
2/3
✓ Branch 0 taken 6945 times.
✓ Branch 1 taken 42 times.
✗ Branch 2 not taken.
6987 switch (dst.type) {
982 6945 case SCH_NODE_TYPE_FILTER_IN: {
983 SchFilterIn *fi;
984
985
2/4
✓ Branch 0 taken 6945 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6945 times.
6945 av_assert0(dst.idx < sch->nb_filters &&
986 dst.idx_stream < sch->filters[dst.idx].nb_inputs);
987 6945 fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
988
989
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6945 times.
6945 av_assert0(!fi->src.type);
990 6945 fi->src = src;
991 6945 break;
992 }
993 42 case SCH_NODE_TYPE_ENC: {
994 SchEnc *enc;
995
996
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 42 times.
42 av_assert0(dst.idx < sch->nb_enc);
997 42 enc = &sch->enc[dst.idx];
998
999
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 42 times.
42 av_assert0(!enc->src.type);
1000 42 enc->src = src;
1001 42 break;
1002 }
1003 default: av_assert0(0);
1004 }
1005
1006 6987 break;
1007 }
1008 8130 case SCH_NODE_TYPE_FILTER_OUT: {
1009 SchFilterOut *fo;
1010
1011
2/4
✓ Branch 0 taken 8130 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 8130 times.
8130 av_assert0(src.idx < sch->nb_filters &&
1012 src.idx_stream < sch->filters[src.idx].nb_outputs);
1013 8130 fo = &sch->filters[src.idx].outputs[src.idx_stream];
1014
1015
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8130 times.
8130 av_assert0(!fo->dst.type);
1016 8130 fo->dst = dst;
1017
1018 // filtered frames go to encoding or another filtergraph
1019
1/3
✓ Branch 0 taken 8130 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
8130 switch (dst.type) {
1020 8130 case SCH_NODE_TYPE_ENC: {
1021 SchEnc *enc;
1022
1023
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8130 times.
8130 av_assert0(dst.idx < sch->nb_enc);
1024 8130 enc = &sch->enc[dst.idx];
1025
1026
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8130 times.
8130 av_assert0(!enc->src.type);
1027 8130 enc->src = src;
1028 8130 break;
1029 }
1030 case SCH_NODE_TYPE_FILTER_IN: {
1031 SchFilterIn *fi;
1032
1033 av_assert0(dst.idx < sch->nb_filters &&
1034 dst.idx_stream < sch->filters[dst.idx].nb_inputs);
1035 fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
1036
1037 av_assert0(!fi->src.type);
1038 fi->src = src;
1039 break;
1040 }
1041 default: av_assert0(0);
1042 }
1043
1044
1045 8130 break;
1046 }
1047 8173 case SCH_NODE_TYPE_ENC: {
1048 SchEnc *enc;
1049
1050
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8173 times.
8173 av_assert0(src.idx < sch->nb_enc);
1051 8173 enc = &sch->enc[src.idx];
1052
1053 8173 ret = GROW_ARRAY(enc->dst, enc->nb_dst);
1054
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8173 times.
8173 if (ret < 0)
1055 return ret;
1056
1057 8173 enc->dst[enc->nb_dst - 1] = dst;
1058
1059 // encoded packets go to muxing or decoding
1060
2/3
✓ Branch 0 taken 8172 times.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
8173 switch (dst.type) {
1061 8172 case SCH_NODE_TYPE_MUX: {
1062 SchMuxStream *ms;
1063
1064
2/4
✓ Branch 0 taken 8172 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 8172 times.
8172 av_assert0(dst.idx < sch->nb_mux &&
1065 dst.idx_stream < sch->mux[dst.idx].nb_streams);
1066 8172 ms = &sch->mux[dst.idx].streams[dst.idx_stream];
1067
1068
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8172 times.
8172 av_assert0(!ms->src.type);
1069 8172 ms->src = src;
1070
1071 8172 break;
1072 }
1073 1 case SCH_NODE_TYPE_DEC: {
1074 SchDec *dec;
1075
1076
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(dst.idx < sch->nb_dec);
1077 1 dec = &sch->dec[dst.idx];
1078
1079
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(!dec->src.type);
1080 1 dec->src = src;
1081
1082 1 break;
1083 }
1084 default: av_assert0(0);
1085 }
1086
1087 8173 break;
1088 }
1089 default: av_assert0(0);
1090 }
1091
1092 30932 return 0;
1093 }
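Taken together, sch_connect() accepts exactly four families of edges, as the per-case comments above indicate: demuxer → decoder or muxer (streamcopy), decoder → filtergraph input or encoder, filtergraph output → encoder or another filtergraph's input, and encoder → muxer or decoder. Any other combination trips one of the av_assert0(0) defaults.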
1094
1095 8406 static int mux_task_start(SchMux *mux)
1096 {
1097 8406 int ret = 0;
1098
1099 8406 ret = task_start(&mux->task);
1100
1/2
✓ Branch 0 taken 8406 times.
✗ Branch 1 not taken.
8406 if (ret < 0)
1101 return ret;
1102
1103 /* flush the pre-muxing queues */
1104 2697 while (1) {
1105 11103 int min_stream = -1;
1106 11103 Timestamp min_ts = { .ts = AV_NOPTS_VALUE };
1107
1108 AVPacket *pkt;
1109
1110 // find the stream with the earliest dts or EOF in pre-muxing queue
1111
2/2
✓ Branch 0 taken 18332 times.
✓ Branch 1 taken 11049 times.
29381 for (unsigned i = 0; i < mux->nb_streams; i++) {
1112 18332 SchMuxStream *ms = &mux->streams[i];
1113
1114
2/2
✓ Branch 1 taken 14797 times.
✓ Branch 2 taken 3535 times.
18332 if (av_fifo_peek(ms->pre_mux_queue.fifo, &pkt, 1, 0) < 0)
1115 14797 continue;
1116
1117
4/4
✓ Branch 0 taken 3493 times.
✓ Branch 1 taken 42 times.
✓ Branch 2 taken 12 times.
✓ Branch 3 taken 3481 times.
3535 if (!pkt || pkt->dts == AV_NOPTS_VALUE) {
1118 54 min_stream = i;
1119 54 break;
1120 }
1121
1122
4/4
✓ Branch 0 taken 811 times.
✓ Branch 1 taken 2670 times.
✓ Branch 2 taken 22 times.
✓ Branch 3 taken 789 times.
4292 if (min_ts.ts == AV_NOPTS_VALUE ||
1123 811 av_compare_ts(min_ts.ts, min_ts.tb, pkt->dts, pkt->time_base) > 0) {
1124 2692 min_stream = i;
1125 2692 min_ts = (Timestamp){ .ts = pkt->dts, .tb = pkt->time_base };
1126 }
1127 }
1128
1129
2/2
✓ Branch 0 taken 2697 times.
✓ Branch 1 taken 8406 times.
11103 if (min_stream >= 0) {
1130 2697 SchMuxStream *ms = &mux->streams[min_stream];
1131
1132 2697 ret = av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1);
1133
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2697 times.
2697 av_assert0(ret >= 0);
1134
1135
2/2
✓ Branch 0 taken 2655 times.
✓ Branch 1 taken 42 times.
2697 if (pkt) {
1136
2/2
✓ Branch 0 taken 2646 times.
✓ Branch 1 taken 9 times.
2655 if (!ms->init_eof)
1137 2646 ret = tq_send(mux->queue, min_stream, pkt);
1138 2655 av_packet_free(&pkt);
1139
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 2654 times.
2655 if (ret == AVERROR_EOF)
1140 1 ms->init_eof = 1;
1141
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2654 times.
2654 else if (ret < 0)
1142 return ret;
1143 } else
1144 42 tq_send_finish(mux->queue, min_stream);
1145
1146 2697 continue;
1147 }
1148
1149 8406 break;
1150 }
1151
1152 8406 atomic_store(&mux->mux_started, 1);
1153
1154 8406 return 0;
1155 }
1156
1157 int print_sdp(const char *filename);
1158
1159 8406 static int mux_init(Scheduler *sch, SchMux *mux)
1160 {
1161 int ret;
1162
1163 8406 ret = mux->init(mux->task.func_arg);
1164
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8406 times.
8406 if (ret < 0)
1165 return ret;
1166
1167 8406 sch->nb_mux_ready++;
1168
1169
2/4
✓ Branch 0 taken 8406 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 8406 times.
8406 if (sch->sdp_filename || sch->sdp_auto) {
1170 if (sch->nb_mux_ready < sch->nb_mux)
1171 return 0;
1172
1173 ret = print_sdp(sch->sdp_filename);
1174 if (ret < 0) {
1175 av_log(sch, AV_LOG_ERROR, "Error writing the SDP.\n");
1176 return ret;
1177 }
1178
1179 /* SDP is written only after all the muxers are ready, so now we
1180 * start ALL the threads */
1181 for (unsigned i = 0; i < sch->nb_mux; i++) {
1182 ret = mux_task_start(&sch->mux[i]);
1183 if (ret < 0)
1184 return ret;
1185 }
1186 } else {
1187 8406 ret = mux_task_start(mux);
1188
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8406 times.
8406 if (ret < 0)
1189 return ret;
1190 }
1191
1192 8406 return 0;
1193 }
1194
1195 8895 void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1196 size_t data_threshold, int max_packets)
1197 {
1198 SchMux *mux;
1199 SchMuxStream *ms;
1200
1201
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8895 times.
8895 av_assert0(mux_idx < sch->nb_mux);
1202 8895 mux = &sch->mux[mux_idx];
1203
1204
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8895 times.
8895 av_assert0(stream_idx < mux->nb_streams);
1205 8895 ms = &mux->streams[stream_idx];
1206
1207 8895 ms->pre_mux_queue.max_packets = max_packets;
1208 8895 ms->pre_mux_queue.data_threshold = data_threshold;
1209 8895 }
1210
1211 8895 int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
1212 {
1213 SchMux *mux;
1214 8895 int ret = 0;
1215
1216
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8895 times.
8895 av_assert0(mux_idx < sch->nb_mux);
1217 8895 mux = &sch->mux[mux_idx];
1218
1219
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8895 times.
8895 av_assert0(stream_idx < mux->nb_streams);
1220
1221 8895 pthread_mutex_lock(&sch->mux_ready_lock);
1222
1223
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8895 times.
8895 av_assert0(mux->nb_streams_ready < mux->nb_streams);
1224
1225 // this may be called during initialization - do not start
1226 // threads before sch_start() is called
1227
2/2
✓ Branch 0 taken 8406 times.
✓ Branch 1 taken 489 times.
8895 if (++mux->nb_streams_ready == mux->nb_streams &&
1228
2/2
✓ Branch 0 taken 7923 times.
✓ Branch 1 taken 483 times.
8406 sch->state >= SCH_STATE_STARTED)
1229 7923 ret = mux_init(sch, mux);
1230
1231 8895 pthread_mutex_unlock(&sch->mux_ready_lock);
1232
1233 8895 return ret;
1234 }
1235
1236 1 int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1237 unsigned dec_idx)
1238 {
1239 SchMux *mux;
1240 SchMuxStream *ms;
1241 1 int ret = 0;
1242
1243
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(mux_idx < sch->nb_mux);
1244 1 mux = &sch->mux[mux_idx];
1245
1246
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(stream_idx < mux->nb_streams);
1247 1 ms = &mux->streams[stream_idx];
1248
1249 1 ret = GROW_ARRAY(ms->sub_heartbeat_dst, ms->nb_sub_heartbeat_dst);
1250
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (ret < 0)
1251 return ret;
1252
1253
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(dec_idx < sch->nb_dec);
1254 1 ms->sub_heartbeat_dst[ms->nb_sub_heartbeat_dst - 1] = dec_idx;
1255
1256
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (!mux->sub_heartbeat_pkt) {
1257 1 mux->sub_heartbeat_pkt = av_packet_alloc();
1258
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (!mux->sub_heartbeat_pkt)
1259 return AVERROR(ENOMEM);
1260 }
1261
1262 1 return 0;
1263 }
1264
1265 566461 static void unchoke_for_stream(Scheduler *sch, SchedulerNode src)
1266 {
1267 1380842 while (1) {
1268 SchFilterGraph *fg;
1269
4/5
✓ Branch 0 taken 523681 times.
✓ Branch 1 taken 446398 times.
✓ Branch 2 taken 489177 times.
✓ Branch 3 taken 488047 times.
✗ Branch 4 not taken.
1947303 switch (src.type) {
1270 523681 case SCH_NODE_TYPE_DEMUX:
1271 // fed directly by a demuxer (i.e. not through a filtergraph)
1272 523681 sch->demux[src.idx].waiter.choked_next = 0;
1273 523681 return;
1274 446398 case SCH_NODE_TYPE_DEC:
1275 446398 src = sch->dec[src.idx].src;
1276 446398 continue;
1277 489177 case SCH_NODE_TYPE_ENC:
1278 489177 src = sch->enc[src.idx].src;
1279 489177 continue;
1280 488047 case SCH_NODE_TYPE_FILTER_OUT:
1281 488047 fg = &sch->filters[src.idx];
1282 // the filtergraph contains internal sources and has
1283 // requested to be scheduled directly
1284
2/2
✓ Branch 0 taken 42780 times.
✓ Branch 1 taken 445267 times.
488047 if (fg->best_input == fg->nb_inputs) {
1285 42780 fg->waiter.choked_next = 0;
1286 42780 return;
1287 }
1288 445267 src = fg->inputs[fg->best_input].src;
1289 445267 continue;
1290 default:
1291 av_unreachable("Invalid source node type?");
1292 return;
1293 }
1294 }
1295 }
1296
1297 26659 static void choke_demux(const Scheduler *sch, int demux_id, int choked)
1298 {
1299 av_assert1(demux_id < sch->nb_demux);
1300 26659 SchDemux *demux = &sch->demux[demux_id];
1301
1302
2/2
✓ Branch 0 taken 27359 times.
✓ Branch 1 taken 26659 times.
54018 for (int i = 0; i < demux->nb_streams; i++) {
1303 27359 SchedulerNode *dst = demux->streams[i].dst;
1304 SchFilterGraph *fg;
1305
1306
2/5
✓ Branch 0 taken 25792 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 1567 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
27359 switch (dst->type) {
1307 25792 case SCH_NODE_TYPE_DEC:
1308 25792 tq_choke(sch->dec[dst->idx].queue, choked);
1309 25792 break;
1310 case SCH_NODE_TYPE_ENC:
1311 tq_choke(sch->enc[dst->idx].queue, choked);
1312 break;
1313 1567 case SCH_NODE_TYPE_MUX:
1314 1567 break;
1315 case SCH_NODE_TYPE_FILTER_IN:
1316 fg = &sch->filters[dst->idx];
1317 if (fg->nb_inputs == 1)
1318 tq_choke(fg->queue, choked);
1319 break;
1320 default:
1321 av_unreachable("Invalid destination node type?");
1322 break;
1323 }
1324 }
1325 26659 }
1326
1327 572188 static void schedule_update_locked(Scheduler *sch)
1328 {
1329 int64_t dts;
1330 572188 int have_unchoked = 0;
1331
1332 // on termination request all waiters are choked,
1333 // we are not to unchoke them
1334
2/2
✓ Branch 0 taken 12047 times.
✓ Branch 1 taken 560141 times.
572188 if (atomic_load(&sch->terminate))
1335 12047 return;
1336
1337 560141 dts = trailing_dts(sch, 0);
1338
1339 560141 atomic_store(&sch->last_dts, dts);
1340
1341 // initialize our internal state
1342
2/2
✓ Branch 0 taken 1120282 times.
✓ Branch 1 taken 560141 times.
1680423 for (unsigned type = 0; type < 2; type++)
1343
4/4
✓ Branch 0 taken 1067545 times.
✓ Branch 1 taken 1096400 times.
✓ Branch 2 taken 1043663 times.
✓ Branch 3 taken 1120282 times.
2163945 for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1344
2/2
✓ Branch 0 taken 507404 times.
✓ Branch 1 taken 536259 times.
1043663 SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1345 1043663 w->choked_prev = atomic_load(&w->choked);
1346 1043663 w->choked_next = 1;
1347 }
1348
1349 // figure out the sources that are allowed to proceed
1350
2/2
✓ Branch 0 taken 561487 times.
✓ Branch 1 taken 560141 times.
1121628 for (unsigned i = 0; i < sch->nb_mux; i++) {
1351 561487 SchMux *mux = &sch->mux[i];
1352
1353
2/2
✓ Branch 0 taken 630001 times.
✓ Branch 1 taken 561487 times.
1191488 for (unsigned j = 0; j < mux->nb_streams; j++) {
1354 630001 SchMuxStream *ms = &mux->streams[j];
1355
1356 // unblock sources for output streams that are not finished
1357 // and not too far ahead of the trailing stream
1358
2/2
✓ Branch 0 taken 36319 times.
✓ Branch 1 taken 593682 times.
630001 if (ms->source_finished)
1359 36319 continue;
1360
4/4
✓ Branch 0 taken 40118 times.
✓ Branch 1 taken 553564 times.
✓ Branch 2 taken 8073 times.
✓ Branch 3 taken 32045 times.
593682 if (dts == AV_NOPTS_VALUE && ms->last_dts != AV_NOPTS_VALUE)
1361 8073 continue;
1362
4/4
✓ Branch 0 taken 553564 times.
✓ Branch 1 taken 32045 times.
✓ Branch 2 taken 19149 times.
✓ Branch 3 taken 534415 times.
585609 if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE)
1363 19149 continue;
1364
1365 // resolve the source to unchoke
1366 566460 unchoke_for_stream(sch, ms->src);
1367 566460 have_unchoked = 1;
1368 }
1369 }
1370
1371 // also unchoke any sources feeding into closed filter graph inputs, so
1372 // that they can observe the downstream EOF
1373
2/2
✓ Branch 0 taken 507404 times.
✓ Branch 1 taken 560141 times.
1067545 for (unsigned i = 0; i < sch->nb_filters; i++) {
1374 507404 SchFilterGraph *fg = &sch->filters[i];
1375
1376
2/2
✓ Branch 0 taken 493883 times.
✓ Branch 1 taken 507404 times.
1001287 for (unsigned j = 0; j < fg->nb_inputs; j++) {
1377 493883 SchFilterIn *fi = &fg->inputs[j];
1378
4/4
✓ Branch 0 taken 35 times.
✓ Branch 1 taken 493848 times.
✓ Branch 2 taken 1 times.
✓ Branch 3 taken 34 times.
493883 if (fi->receive_finished && !fi->send_finished)
1379 1 unchoke_for_stream(sch, fi->src);
1380 }
1381 }
1382
1383 // make sure to unchoke at least one source, if still available
1384
4/4
✓ Branch 0 taken 46632 times.
✓ Branch 1 taken 553643 times.
✓ Branch 2 taken 40134 times.
✓ Branch 3 taken 6498 times.
600275 for (unsigned type = 0; !have_unchoked && type < 2; type++)
1385
4/4
✓ Branch 0 taken 18108 times.
✓ Branch 1 taken 38304 times.
✓ Branch 2 taken 37519 times.
✓ Branch 3 taken 18893 times.
56412 for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1386
2/2
✓ Branch 0 taken 11610 times.
✓ Branch 1 taken 25909 times.
37519 int exited = type ? sch->filters[i].task_exited : sch->demux[i].task_exited;
1387
2/2
✓ Branch 0 taken 11610 times.
✓ Branch 1 taken 25909 times.
37519 SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1388
2/2
✓ Branch 0 taken 21241 times.
✓ Branch 1 taken 16278 times.
37519 if (!exited) {
1389 21241 w->choked_next = 0;
1390 21241 have_unchoked = 1;
1391 21241 break;
1392 }
1393 }
1394
1395
2/2
✓ Branch 0 taken 1120282 times.
✓ Branch 1 taken 560141 times.
1680423 for (unsigned type = 0; type < 2; type++) {
1396
4/4
✓ Branch 0 taken 1067545 times.
✓ Branch 1 taken 1096400 times.
✓ Branch 2 taken 1043663 times.
✓ Branch 3 taken 1120282 times.
2163945 for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1397
2/2
✓ Branch 0 taken 507404 times.
✓ Branch 1 taken 536259 times.
1043663 SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1398
2/2
✓ Branch 0 taken 43246 times.
✓ Branch 1 taken 1000417 times.
1043663 if (w->choked_prev != w->choked_next) {
1399 43246 waiter_set(w, w->choked_next);
1400
2/2
✓ Branch 0 taken 19303 times.
✓ Branch 1 taken 23943 times.
43246 if (!type)
1401 19303 choke_demux(sch, i, w->choked_next);
1402 }
1403 }
1404 }
1405
1406 }
1407
1408 enum {
1409 CYCLE_NODE_NEW = 0,
1410 CYCLE_NODE_STARTED,
1411 CYCLE_NODE_DONE,
1412 };
1413
1414 // Finds the demuxer or filtergraph upstream of a scheduler node
1415 6946 static SchedulerNode src_filtergraph(const Scheduler *sch, SchedulerNode src)
1416 {
1417 while (1) {
1418
3/4
✓ Branch 0 taken 6946 times.
✓ Branch 1 taken 6946 times.
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
13893 switch (src.type) {
1419 6946 case SCH_NODE_TYPE_DEMUX:
1420 case SCH_NODE_TYPE_FILTER_OUT:
1421 6946 return src;
1422 6946 case SCH_NODE_TYPE_DEC:
1423 6946 src = sch->dec[src.idx].src;
1424 6947 continue;
1425 1 case SCH_NODE_TYPE_ENC:
1426 1 src = sch->enc[src.idx].src;
1427 1 continue;
1428 default:
1429 av_unreachable("Invalid source node type?");
1430 return (SchedulerNode) {0};
1431 }
1432 }
1433 }
1434
1435 static int
1436 7998 check_acyclic_for_output(const Scheduler *sch, SchedulerNode src,
1437 uint8_t *filters_visited, SchedulerNode *filters_stack)
1438 {
1439 7998 unsigned nb_filters_stack = 0;
1440
1441 7998 memset(filters_visited, 0, sch->nb_filters * sizeof(*filters_visited));
1442
1443 6947 while (1) {
1444 14945 const SchFilterGraph *fg = &sch->filters[src.idx];
1445
1446 14945 filters_visited[src.idx] = CYCLE_NODE_STARTED;
1447
1448 // descend into every input, depth first
1449
2/2
✓ Branch 0 taken 6946 times.
✓ Branch 1 taken 7999 times.
14945 if (src.idx_stream < fg->nb_inputs) {
1450 6946 const SchFilterIn *fi = &fg->inputs[src.idx_stream++];
1451 6946 SchedulerNode node = src_filtergraph(sch, fi->src);
1452
1453 // connected to demuxer, no cycles possible
1454
2/2
✓ Branch 0 taken 6945 times.
✓ Branch 1 taken 1 times.
6946 if (node.type == SCH_NODE_TYPE_DEMUX)
1455 6946 continue;
1456
1457 // otherwise connected to another filtergraph
1458
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(node.type == SCH_NODE_TYPE_FILTER_OUT);
1459
1460 // found a cycle
1461
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (filters_visited[node.idx] == CYCLE_NODE_STARTED)
1462 return AVERROR(EINVAL);
1463
1464 // place current position on stack and descend
1465
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(nb_filters_stack < sch->nb_filters);
1466 1 filters_stack[nb_filters_stack++] = src;
1467 1 src = (SchedulerNode){ .idx = node.idx, .idx_stream = 0 };
1468 1 continue;
1469 }
1470
1471 7999 filters_visited[src.idx] = CYCLE_NODE_DONE;
1472
1473 // current search finished, resume any previous one from the stack
1474
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 7998 times.
7999 if (nb_filters_stack) {
1475 1 src = filters_stack[--nb_filters_stack];
1476 1 continue;
1477 }
1478 7998 return 0;
1479 }
1480 }
1481
1482 8402 static int check_acyclic(Scheduler *sch)
1483 {
1484 8402 uint8_t *filters_visited = NULL;
1485 8402 SchedulerNode *filters_stack = NULL;
1486
1487 8402 int ret = 0;
1488
1489
2/2
✓ Branch 0 taken 520 times.
✓ Branch 1 taken 7882 times.
8402 if (!sch->nb_filters)
1490 520 return 0;
1491
1492 7882 filters_visited = av_malloc_array(sch->nb_filters, sizeof(*filters_visited));
1493
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7882 times.
7882 if (!filters_visited)
1494 return AVERROR(ENOMEM);
1495
1496 7882 filters_stack = av_malloc_array(sch->nb_filters, sizeof(*filters_stack));
1497
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7882 times.
7882 if (!filters_stack) {
1498 ret = AVERROR(ENOMEM);
1499 goto fail;
1500 }
1501
1502 // trace the transcoding graph upstream from every filtergraph
1503
2/2
✓ Branch 0 taken 7998 times.
✓ Branch 1 taken 7882 times.
15880 for (unsigned i = 0; i < sch->nb_filters; i++) {
1504 7998 ret = check_acyclic_for_output(sch, (SchedulerNode){ .idx = i },
1505 filters_visited, filters_stack);
1506
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7998 times.
7998 if (ret < 0) {
1507 av_log(&sch->filters[i], AV_LOG_ERROR, "Transcoding graph has a cycle\n");
1508 goto fail;
1509 }
1510 }
1511
1512 7882 fail:
1513 7882 av_freep(&filters_visited);
1514 7882 av_freep(&filters_stack);
1515 7882 return ret;
1516 }
1517
1518 8402 static int start_prepare(Scheduler *sch)
1519 {
1520 int ret;
1521
1522
2/2
✓ Branch 0 taken 7356 times.
✓ Branch 1 taken 8402 times.
15758 for (unsigned i = 0; i < sch->nb_demux; i++) {
1523 7356 SchDemux *d = &sch->demux[i];
1524
1525
2/2
✓ Branch 0 taken 7616 times.
✓ Branch 1 taken 7356 times.
14972 for (unsigned j = 0; j < d->nb_streams; j++) {
1526 7616 SchDemuxStream *ds = &d->streams[j];
1527
1528
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7616 times.
7616 if (!ds->nb_dst) {
1529 av_log(d, AV_LOG_ERROR,
1530 "Demuxer stream %u not connected to any sink\n", j);
1531 return AVERROR(EINVAL);
1532 }
1533
1534 7616 ds->dst_finished = av_calloc(ds->nb_dst, sizeof(*ds->dst_finished));
1535
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7616 times.
7616 if (!ds->dst_finished)
1536 return AVERROR(ENOMEM);
1537 }
1538 }
1539
1540
2/2
✓ Branch 0 taken 6920 times.
✓ Branch 1 taken 8402 times.
15322 for (unsigned i = 0; i < sch->nb_dec; i++) {
1541 6920 SchDec *dec = &sch->dec[i];
1542
1543
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6920 times.
6920 if (!dec->src.type) {
1544 av_log(dec, AV_LOG_ERROR,
1545 "Decoder not connected to a source\n");
1546 return AVERROR(EINVAL);
1547 }
1548
1549
2/2
✓ Branch 0 taken 6926 times.
✓ Branch 1 taken 6920 times.
13846 for (unsigned j = 0; j < dec->nb_outputs; j++) {
1550 6926 SchDecOutput *o = &dec->outputs[j];
1551
1552
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6926 times.
6926 if (!o->nb_dst) {
1553 av_log(dec, AV_LOG_ERROR,
1554 "Decoder output %u not connected to any sink\n", j);
1555 return AVERROR(EINVAL);
1556 }
1557
1558 6926 o->dst_finished = av_calloc(o->nb_dst, sizeof(*o->dst_finished));
1559
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6926 times.
6926 if (!o->dst_finished)
1560 return AVERROR(ENOMEM);
1561 }
1562 }
1563
1564
2/2
✓ Branch 0 taken 8172 times.
✓ Branch 1 taken 8402 times.
16574 for (unsigned i = 0; i < sch->nb_enc; i++) {
1565 8172 SchEnc *enc = &sch->enc[i];
1566
1567
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8172 times.
8172 if (!enc->src.type) {
1568 av_log(enc, AV_LOG_ERROR,
1569 "Encoder not connected to a source\n");
1570 return AVERROR(EINVAL);
1571 }
1572
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8172 times.
8172 if (!enc->nb_dst) {
1573 av_log(enc, AV_LOG_ERROR,
1574 "Encoder not connected to any sink\n");
1575 return AVERROR(EINVAL);
1576 }
1577
1578 8172 enc->dst_finished = av_calloc(enc->nb_dst, sizeof(*enc->dst_finished));
1579
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8172 times.
8172 if (!enc->dst_finished)
1580 return AVERROR(ENOMEM);
1581 }
1582
1583
2/2
✓ Branch 0 taken 8406 times.
✓ Branch 1 taken 8402 times.
16808 for (unsigned i = 0; i < sch->nb_mux; i++) {
1584 8406 SchMux *mux = &sch->mux[i];
1585
1586
2/2
✓ Branch 0 taken 8895 times.
✓ Branch 1 taken 8406 times.
17301 for (unsigned j = 0; j < mux->nb_streams; j++) {
1587 8895 SchMuxStream *ms = &mux->streams[j];
1588
1589
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8895 times.
8895 if (!ms->src.type) {
1590 av_log(mux, AV_LOG_ERROR,
1591 "Muxer stream #%u not connected to a source\n", j);
1592 return AVERROR(EINVAL);
1593 }
1594 }
1595
1596 8406 ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size,
1597 QUEUE_PACKETS);
1598
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8406 times.
8406 if (ret < 0)
1599 return ret;
1600 }
1601
1602
2/2
✓ Branch 0 taken 7998 times.
✓ Branch 1 taken 8402 times.
16400 for (unsigned i = 0; i < sch->nb_filters; i++) {
1603 7998 SchFilterGraph *fg = &sch->filters[i];
1604
1605
2/2
✓ Branch 0 taken 6945 times.
✓ Branch 1 taken 7998 times.
14943 for (unsigned j = 0; j < fg->nb_inputs; j++) {
1606 6945 SchFilterIn *fi = &fg->inputs[j];
1607
1608
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6945 times.
6945 if (!fi->src.type) {
1609 av_log(fg, AV_LOG_ERROR,
1610 "Filtergraph input %u not connected to a source\n", j);
1611 return AVERROR(EINVAL);
1612 }
1613 }
1614
1615
2/2
✓ Branch 0 taken 8130 times.
✓ Branch 1 taken 7998 times.
16128 for (unsigned j = 0; j < fg->nb_outputs; j++) {
1616 8130 SchFilterOut *fo = &fg->outputs[j];
1617
1618
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8130 times.
8130 if (!fo->dst.type) {
1619 av_log(fg, AV_LOG_ERROR,
1620 "Filtergraph %u output %u not connected to a sink\n", i, j);
1621 return AVERROR(EINVAL);
1622 }
1623 }
1624 }
1625
1626 // Check that the transcoding graph has no cycles.
1627 8402 ret = check_acyclic(sch);
1628
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8402 times.
8402 if (ret < 0)
1629 return ret;
1630
1631 8402 return 0;
1632 }
1633
1634 8402 int sch_start(Scheduler *sch)
1635 {
1636 int ret;
1637
1638 8402 ret = start_prepare(sch);
1639
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8402 times.
8402 if (ret < 0)
1640 return ret;
1641
1642
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8402 times.
8402 av_assert0(sch->state == SCH_STATE_UNINIT);
1643 8402 sch->state = SCH_STATE_STARTED;
1644
1645
2/2
✓ Branch 0 taken 8406 times.
✓ Branch 1 taken 8402 times.
16808 for (unsigned i = 0; i < sch->nb_mux; i++) {
1646 8406 SchMux *mux = &sch->mux[i];
1647
1648
2/2
✓ Branch 0 taken 483 times.
✓ Branch 1 taken 7923 times.
8406 if (mux->nb_streams_ready == mux->nb_streams) {
1649 483 ret = mux_init(sch, mux);
1650
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 483 times.
483 if (ret < 0)
1651 goto fail;
1652 }
1653 }
1654
1655
2/2
✓ Branch 0 taken 8172 times.
✓ Branch 1 taken 8402 times.
16574 for (unsigned i = 0; i < sch->nb_enc; i++) {
1656 8172 SchEnc *enc = &sch->enc[i];
1657
1658 8172 ret = task_start(&enc->task);
1659
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8172 times.
8172 if (ret < 0)
1660 goto fail;
1661 }
1662
1663
2/2
✓ Branch 0 taken 7998 times.
✓ Branch 1 taken 8402 times.
16400 for (unsigned i = 0; i < sch->nb_filters; i++) {
1664 7998 SchFilterGraph *fg = &sch->filters[i];
1665
1666 7998 ret = task_start(&fg->task);
1667
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7998 times.
7998 if (ret < 0)
1668 goto fail;
1669 }
1670
1671
2/2
✓ Branch 0 taken 6920 times.
✓ Branch 1 taken 8402 times.
15322 for (unsigned i = 0; i < sch->nb_dec; i++) {
1672 6920 SchDec *dec = &sch->dec[i];
1673
1674 6920 ret = task_start(&dec->task);
1675
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6920 times.
6920 if (ret < 0)
1676 goto fail;
1677 }
1678
1679
2/2
✓ Branch 0 taken 7356 times.
✓ Branch 1 taken 8402 times.
15758 for (unsigned i = 0; i < sch->nb_demux; i++) {
1680 7356 SchDemux *d = &sch->demux[i];
1681
1682
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 7342 times.
7356 if (!d->nb_streams)
1683 14 continue;
1684
1685 7342 ret = task_start(&d->task);
1686
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7342 times.
7342 if (ret < 0)
1687 goto fail;
1688 }
1689
1690 8402 pthread_mutex_lock(&sch->schedule_lock);
1691 8402 schedule_update_locked(sch);
1692 8402 pthread_mutex_unlock(&sch->schedule_lock);
1693
1694 8402 return 0;
1695 fail:
1696 sch_stop(sch, NULL);
1697 return ret;
1698 }
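
Taken together with sch_wait() and sch_stop() below, sch_start() suggests a simple driver loop. The sketch below is pieced together only from the signatures visible in this file; the 500 ms polling period and the surrounding error handling are illustrative assumptions, not code from ffmpeg.c.

    /* Hypothetical driver loop over the scheduler entry points in this file.
     * Assumes a fully constructed Scheduler; the period and error handling
     * are illustrative. */
    static int run_transcode(Scheduler *sch)
    {
        int64_t transcode_ts, finish_ts;
        int ret = sch_start(sch);
        if (ret < 0)
            return ret;

        // sch_wait() returns nonzero once all muxers finished or a task failed
        while (!sch_wait(sch, 500000 /* 500 ms in microseconds */, &transcode_ts)) {
            // periodic work (e.g. progress reporting) would go here
        }

        return sch_stop(sch, &finish_ts);
    }
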
1699
1700 26246 int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
1701 {
1702 int ret;
1703
1704 // convert delay to absolute timestamp
1705 26246 timeout_us += av_gettime();
1706
1707 26246 pthread_mutex_lock(&sch->finish_lock);
1708
1709
1/2
✓ Branch 0 taken 26246 times.
✗ Branch 1 not taken.
26246 if (sch->nb_mux_done < sch->nb_mux) {
1710 26246 struct timespec tv = { .tv_sec = timeout_us / 1000000,
1711 26246 .tv_nsec = (timeout_us % 1000000) * 1000 };
1712 26246 pthread_cond_timedwait(&sch->finish_cond, &sch->finish_lock, &tv);
1713 }
1714
1715 // abort transcoding if any task failed
1716
4/4
✓ Branch 0 taken 17845 times.
✓ Branch 1 taken 8401 times.
✓ Branch 2 taken 1 times.
✓ Branch 3 taken 17844 times.
26246 ret = sch->nb_mux_done == sch->nb_mux || sch->task_failed;
1717
1718 26246 pthread_mutex_unlock(&sch->finish_lock);
1719
1720 26246 *transcode_ts = atomic_load(&sch->last_dts);
1721
1722 26246 return ret;
1723 }
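
The timeout handling above first turns the relative timeout_us into an absolute wall-clock deadline, because pthread_cond_timedwait() expects an absolute timespec. A minimal standalone sketch of the same conversion, with gettimeofday() standing in for av_gettime() (both report wall-clock microseconds):

    #include <pthread.h>
    #include <stdint.h>
    #include <sys/time.h>

    /* Convert a relative timeout in microseconds to the absolute deadline
     * that pthread_cond_timedwait() expects. Call with `lock` held;
     * gettimeofday() stands in for av_gettime() here. */
    static int wait_deadline(pthread_mutex_t *lock, pthread_cond_t *cond,
                             uint64_t timeout_us)
    {
        struct timeval now;
        gettimeofday(&now, NULL);

        uint64_t deadline = (uint64_t)now.tv_sec * 1000000 + now.tv_usec
                            + timeout_us;

        struct timespec tv = { .tv_sec  = deadline / 1000000,
                               .tv_nsec = (deadline % 1000000) * 1000 };

        // 0 on wakeup by signal, ETIMEDOUT once the deadline passes
        return pthread_cond_timedwait(cond, lock, &tv);
    }
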
1724
1725 8130 static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame)
1726 {
1727 int ret;
1728
1729 8130 ret = enc->open_cb(enc->task.func_arg, frame);
1730
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8130 times.
8130 if (ret < 0)
1731 return ret;
1732
1733 // ret>0 signals the audio frame size, which means the sync queue must
1734 // have been enabled during encoder creation
1735
2/2
✓ Branch 0 taken 174 times.
✓ Branch 1 taken 7956 times.
8130 if (ret > 0) {
1736 SchSyncQueue *sq;
1737
1738
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 174 times.
174 av_assert0(enc->sq_idx[0] >= 0);
1739 174 sq = &sch->sq_enc[enc->sq_idx[0]];
1740
1741 174 pthread_mutex_lock(&sq->lock);
1742
1743 174 sq_frame_samples(sq->sq, enc->sq_idx[1], ret);
1744
1745 174 pthread_mutex_unlock(&sq->lock);
1746 }
1747
1748 8130 return 0;
1749 }
1750
1751 473824 static int send_to_enc_thread(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1752 {
1753 int ret;
1754
1755
2/2
✓ Branch 0 taken 19870 times.
✓ Branch 1 taken 453954 times.
473824 if (!frame) {
1756 19870 tq_send_finish(enc->queue, 0);
1757 19870 return 0;
1758 }
1759
1760
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 453954 times.
453954 if (enc->in_finished)
1761 return AVERROR_EOF;
1762
1763 453954 ret = tq_send(enc->queue, 0, frame);
1764
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 453953 times.
453954 if (ret < 0)
1765 1 enc->in_finished = 1;
1766
1767 453954 return ret;
1768 }
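
The in_finished flag above is a one-way latch: the first failed send marks the path dead, and every later attempt short-circuits with AVERROR_EOF instead of touching the queue again. A reduced sketch of the pattern, with the thread queue replaced by a callback and a stand-in error code:

    /* One-way EOF latch, reduced from send_to_enc_thread(); push() and
     * SKETCH_EOF are stand-ins for tq_send() and AVERROR_EOF. */
    enum { SKETCH_EOF = -1 };

    static int send_latched(int (*push)(void *item), void *item, int *finished)
    {
        if (*finished)
            return SKETCH_EOF;   // path already failed: refuse further input

        int ret = push(item);
        if (ret < 0)
            *finished = 1;       // remember the failure for all later sends
        return ret;
    }
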
1769
1770 38993 static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1771 {
1772 38993 SchSyncQueue *sq = &sch->sq_enc[enc->sq_idx[0]];
1773 38993 int ret = 0;
1774
1775 // inform the scheduling code that no more input will arrive along this path;
1776 // this is necessary because the sync queue may not send an EOF downstream
1777 // until other streams finish
1778 // TODO: consider a cleaner way of passing this information through
1779 // the pipeline
1780
2/2
✓ Branch 0 taken 6624 times.
✓ Branch 1 taken 32369 times.
38993 if (!frame) {
1781
2/2
✓ Branch 0 taken 6624 times.
✓ Branch 1 taken 6624 times.
13248 for (unsigned i = 0; i < enc->nb_dst; i++) {
1782 SchMux *mux;
1783 SchMuxStream *ms;
1784
1785
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6624 times.
6624 if (enc->dst[i].type != SCH_NODE_TYPE_MUX)
1786 continue;
1787
1788 6624 mux = &sch->mux[enc->dst[i].idx];
1789 6624 ms = &mux->streams[enc->dst[i].idx_stream];
1790
1791 6624 pthread_mutex_lock(&sch->schedule_lock);
1792
1793 6624 ms->source_finished = 1;
1794 6624 schedule_update_locked(sch);
1795
1796 6624 pthread_mutex_unlock(&sch->schedule_lock);
1797 }
1798 }
1799
1800 38993 pthread_mutex_lock(&sq->lock);
1801
1802 38993 ret = sq_send(sq->sq, enc->sq_idx[1], SQFRAME(frame));
1803
2/2
✓ Branch 0 taken 38992 times.
✓ Branch 1 taken 1 times.
38993 if (ret < 0)
1804 1 goto finish;
1805
1806 82911 while (1) {
1807 SchEnc *enc;
1808
1809 // TODO: the SQ API should be extended to allow returning EOF
1810 // for individual streams
1811 121903 ret = sq_receive(sq->sq, -1, SQFRAME(sq->frame));
1812
2/2
✓ Branch 0 taken 38992 times.
✓ Branch 1 taken 82911 times.
121903 if (ret < 0) {
1813
2/2
✓ Branch 0 taken 9604 times.
✓ Branch 1 taken 29388 times.
38992 ret = (ret == AVERROR(EAGAIN)) ? 0 : ret;
1814 38992 break;
1815 }
1816
1817 82911 enc = &sch->enc[sq->enc_idx[ret]];
1818 82911 ret = send_to_enc_thread(sch, enc, sq->frame);
1819
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 82911 times.
82911 if (ret < 0) {
1820 av_frame_unref(sq->frame);
1821 if (ret != AVERROR_EOF)
1822 break;
1823
1824 sq_send(sq->sq, enc->sq_idx[1], SQFRAME(NULL));
1825 continue;
1826 }
1827 }
1828
1829
2/2
✓ Branch 0 taken 29388 times.
✓ Branch 1 taken 9604 times.
38992 if (ret < 0) {
1830 // close all encoders fed from this sync queue
1831
2/2
✓ Branch 0 taken 10064 times.
✓ Branch 1 taken 9604 times.
19668 for (unsigned i = 0; i < sq->nb_enc_idx; i++) {
1832 10064 int err = send_to_enc_thread(sch, &sch->enc[sq->enc_idx[i]], NULL);
1833
1834 // if the sync queue error is EOF and closing the encoder
1835 // produces a more serious error, make sure to pick the latter
1836
2/4
✓ Branch 0 taken 10064 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 10064 times.
✗ Branch 3 not taken.
10064 ret = err_merge((ret == AVERROR_EOF && err < 0) ? 0 : ret, err);
1837 }
1838 }
1839
1840 38992 finish:
1841 38993 pthread_mutex_unlock(&sq->lock);
1842
1843 38993 return ret;
1844 }
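
The core of send_to_enc_sq() is a send-then-drain loop: one frame goes into the sync queue, then everything the queue is ready to emit is pulled back out until it reports EAGAIN, all under sq->lock. A self-contained sketch of that shape, with hypothetical toy_sq_* helpers in place of the real SyncQueue API:

    #include <errno.h>
    #include <pthread.h>

    /* Hypothetical stand-ins for sq_send()/sq_receive(); a return value of
     * -EAGAIN means "no output available yet", mirroring AVERROR(EAGAIN). */
    int  toy_sq_send(void *sq, void *frame);
    int  toy_sq_receive(void *sq, void **out);
    void forward_downstream(void *frame);

    static int feed_and_drain(pthread_mutex_t *lock, void *sq, void *frame)
    {
        int ret;

        pthread_mutex_lock(lock);

        ret = toy_sq_send(sq, frame);
        if (ret >= 0) {
            void *out;
            // one send can release zero or more buffered outputs, e.g. when
            // the queue is regrouping audio into fixed-size frames
            while ((ret = toy_sq_receive(sq, &out)) >= 0)
                forward_downstream(out);
            if (ret == -EAGAIN)   // drained for now: not an error
                ret = 0;
        }

        pthread_mutex_unlock(lock);
        return ret;
    }
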
1845
1846 419845 static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1847 {
1848
6/6
✓ Branch 0 taken 418760 times.
✓ Branch 1 taken 1085 times.
✓ Branch 2 taken 402372 times.
✓ Branch 3 taken 16388 times.
✓ Branch 4 taken 8130 times.
✓ Branch 5 taken 394242 times.
419845 if (enc->open_cb && frame && !enc->opened) {
1849 8130 int ret = enc_open(sch, enc, frame);
1850
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8130 times.
8130 if (ret < 0)
1851 return ret;
1852 8130 enc->opened = 1;
1853
1854 // discard empty frames that only carry encoder init parameters
1855
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 8127 times.
8130 if (!frame->buf[0]) {
1856 3 av_frame_unref(frame);
1857 3 return 0;
1858 }
1859 }
1860
1861 419842 return (enc->sq_idx[0] >= 0) ?
1862
2/2
✓ Branch 0 taken 38993 times.
✓ Branch 1 taken 380849 times.
800691 send_to_enc_sq (sch, enc, frame) :
1863 380849 send_to_enc_thread(sch, enc, frame);
1864 }
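
send_to_enc() defers enc_open() until the first frame arrives, because the encoder needs stream parameters that only real data carries; a one-shot opened flag keeps the callback from running twice, and frames that carried only init parameters are discarded after use. The open-on-first-data core in a self-contained form, with illustrative types:

    #include <stdio.h>

    /* Illustrative sink that initializes itself from the first item it sees;
     * none of these names come from the real code. */
    struct lazy_sink {
        int opened;
        int (*open_cb)(struct lazy_sink *s, const char *first_item);
    };

    static int sink_send(struct lazy_sink *s, const char *item)
    {
        if (s->open_cb && item && !s->opened) {
            int ret = s->open_cb(s, item);  // parameters come from first item
            if (ret < 0)
                return ret;
            s->opened = 1;
        }
        printf("forwarding: %s\n", item ? item : "(EOF)");
        return 0;
    }
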
1865
1866 2697 static int mux_queue_packet(SchMux *mux, SchMuxStream *ms, AVPacket *pkt)
1867 {
1868 2697 PreMuxQueue *q = &ms->pre_mux_queue;
1869 2697 AVPacket *tmp_pkt = NULL;
1870 int ret;
1871
1872
2/2
✓ Branch 1 taken 94 times.
✓ Branch 2 taken 2603 times.
2697 if (!av_fifo_can_write(q->fifo)) {
1873 94 size_t packets = av_fifo_can_read(q->fifo);
1874
1/2
✓ Branch 0 taken 94 times.
✗ Branch 1 not taken.
94 size_t pkt_size = pkt ? pkt->size : 0;
1875 94 int thresh_reached = (q->data_size + pkt_size) > q->data_threshold;
1876
2/2
✓ Branch 0 taken 89 times.
✓ Branch 1 taken 5 times.
94 size_t max_packets = thresh_reached ? q->max_packets : SIZE_MAX;
1877 94 size_t new_size = FFMIN(2 * packets, max_packets);
1878
1879
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 94 times.
94 if (new_size <= packets) {
1880 av_log(mux, AV_LOG_ERROR,
1881 "Too many packets buffered for output stream.\n");
1882 return AVERROR_BUFFER_TOO_SMALL;
1883 }
1884 94 ret = av_fifo_grow2(q->fifo, new_size - packets);
1885
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 94 times.
94 if (ret < 0)
1886 return ret;
1887 }
1888
1889
2/2
✓ Branch 0 taken 2655 times.
✓ Branch 1 taken 42 times.
2697 if (pkt) {
1890 2655 tmp_pkt = av_packet_alloc();
1891
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2655 times.
2655 if (!tmp_pkt)
1892 return AVERROR(ENOMEM);
1893
1894 2655 av_packet_move_ref(tmp_pkt, pkt);
1895 2655 q->data_size += tmp_pkt->size;
1896 }
1897 2697 av_fifo_write(q->fifo, &tmp_pkt, 1);
1898
1899 2697 return 0;
1900 }
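
The resizing logic above doubles the FIFO freely while the buffered bytes stay below data_threshold, then clamps further growth to max_packets once the threshold is crossed. Isolated as a pure function, the policy reads:

    #include <stddef.h>
    #include <stdint.h>

    #define MIN(a, b) ((a) < (b) ? (a) : (b))

    /* Growth policy of mux_queue_packet(), as a pure function: returns the
     * new capacity in packets, or 0 when growth is refused. */
    static size_t next_capacity(size_t packets, size_t data_size,
                                size_t incoming, size_t data_threshold,
                                size_t max_packets)
    {
        int    thresh_reached = (data_size + incoming) > data_threshold;
        size_t cap            = thresh_reached ? max_packets : SIZE_MAX;
        size_t new_size       = MIN(2 * packets, cap);

        return new_size > packets ? new_size : 0;
    }

When this returns 0 (e.g. the queue already holds max_packets and the byte threshold is exceeded), the real code reports AVERROR_BUFFER_TOO_SMALL rather than growing without bound.
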
1901
1902 526365 static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx,
1903 AVPacket *pkt)
1904 {
1905 526365 SchMuxStream *ms = &mux->streams[stream_idx];
1906
2/2
✓ Branch 0 taken 509369 times.
✓ Branch 1 taken 8101 times.
517470 int64_t dts = (pkt && pkt->dts != AV_NOPTS_VALUE) ?
1907
2/2
✓ Branch 0 taken 517470 times.
✓ Branch 1 taken 8895 times.
1043835 av_rescale_q(pkt->dts + pkt->duration, pkt->time_base, AV_TIME_BASE_Q) :
1908 AV_NOPTS_VALUE;
1909
1910 // queue the packet if the muxer cannot be started yet
1911
2/2
✓ Branch 0 taken 2728 times.
✓ Branch 1 taken 523637 times.
526365 if (!atomic_load(&mux->mux_started)) {
1912 2728 int queued = 0;
1913
1914 // the muxer could have started between the above atomic check and
1915 // locking the mutex; in that case this block falls through to the normal send path
1916 2728 pthread_mutex_lock(&sch->mux_ready_lock);
1917
1918
2/2
✓ Branch 0 taken 2697 times.
✓ Branch 1 taken 31 times.
2728 if (!atomic_load(&mux->mux_started)) {
1919 2697 int ret = mux_queue_packet(mux, ms, pkt);
1920
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2697 times.
2697 queued = ret < 0 ? ret : 1;
1921 }
1922
1923 2728 pthread_mutex_unlock(&sch->mux_ready_lock);
1924
1925
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2728 times.
2728 if (queued < 0)
1926 return queued;
1927
2/2
✓ Branch 0 taken 2697 times.
✓ Branch 1 taken 31 times.
2728 else if (queued)
1928 2697 goto update_schedule;
1929 }
1930
1931
2/2
✓ Branch 0 taken 514815 times.
✓ Branch 1 taken 8853 times.
523668 if (pkt) {
1932 int ret;
1933
1934
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 514815 times.
514815 if (ms->init_eof)
1935 return AVERROR_EOF;
1936
1937 514815 ret = tq_send(mux->queue, stream_idx, pkt);
1938
2/2
✓ Branch 0 taken 62 times.
✓ Branch 1 taken 514753 times.
514815 if (ret < 0)
1939 62 return ret;
1940 } else
1941 8853 tq_send_finish(mux->queue, stream_idx);
1942
1943 526303 update_schedule:
1944 // TODO: use atomics to check whether this changes trailing dts
1945 // to avoid locking unnecessarily
1946
4/4
✓ Branch 0 taken 16996 times.
✓ Branch 1 taken 509307 times.
✓ Branch 2 taken 8895 times.
✓ Branch 3 taken 8101 times.
526303 if (dts != AV_NOPTS_VALUE || !pkt) {
1947 518202 pthread_mutex_lock(&sch->schedule_lock);
1948
1949
2/2
✓ Branch 0 taken 509307 times.
✓ Branch 1 taken 8895 times.
518202 if (pkt) ms->last_dts = dts;
1950 8895 else ms->source_finished = 1;
1951
1952 518202 schedule_update_locked(sch);
1953
1954 518202 pthread_mutex_unlock(&sch->schedule_lock);
1955 }
1956
1957 526303 return 0;
1958 }
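
The muxer readiness test above is a double-checked pattern: a lock-free atomic_load() filters the common already-started case, and the test is repeated under mux_ready_lock because, as the comment notes, the muxer can start in between. Reduced to its skeleton (names shortened, queueing elided):

    #include <pthread.h>
    #include <stdatomic.h>

    static atomic_int      started;
    static pthread_mutex_t ready_lock = PTHREAD_MUTEX_INITIALIZER;

    /* Returns 1 if the caller queued the item, 0 to use the normal send
     * path because the consumer is already running. */
    static int should_queue(void)
    {
        int queued = 0;

        if (!atomic_load(&started)) {       // cheap unlocked fast path
            pthread_mutex_lock(&ready_lock);
            if (!atomic_load(&started))     // re-check: start may have raced
                queued = 1;                 // ...queue under the lock here...
            pthread_mutex_unlock(&ready_lock);
        }
        return queued;
    }
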
1959
1960 static int
1961 475099 demux_stream_send_to_dst(Scheduler *sch, const SchedulerNode dst,
1962 uint8_t *dst_finished, AVPacket *pkt, unsigned flags)
1963 {
1964 int ret;
1965
1966
2/2
✓ Branch 0 taken 3205 times.
✓ Branch 1 taken 471894 times.
475099 if (*dst_finished)
1967 3205 return AVERROR_EOF;
1968
1969
4/4
✓ Branch 0 taken 467457 times.
✓ Branch 1 taken 4437 times.
✓ Branch 2 taken 69667 times.
✓ Branch 3 taken 397790 times.
471894 if (pkt && dst.type == SCH_NODE_TYPE_MUX &&
1970
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 69665 times.
69667 (flags & DEMUX_SEND_STREAMCOPY_EOF)) {
1971 2 av_packet_unref(pkt);
1972 2 pkt = NULL;
1973 }
1974
1975
2/2
✓ Branch 0 taken 4439 times.
✓ Branch 1 taken 467455 times.
471894 if (!pkt)
1976 4439 goto finish;
1977
1978 934910 ret = (dst.type == SCH_NODE_TYPE_MUX) ?
1979
2/2
✓ Branch 0 taken 69665 times.
✓ Branch 1 taken 397790 times.
467455 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
1980 397790 tq_send(sch->dec[dst.idx].queue, 0, pkt);
1981
2/2
✓ Branch 0 taken 3203 times.
✓ Branch 1 taken 464252 times.
467455 if (ret == AVERROR_EOF)
1982 3203 goto finish;
1983
1984 464252 return ret;
1985
1986 7642 finish:
1987
2/2
✓ Branch 0 taken 723 times.
✓ Branch 1 taken 6919 times.
7642 if (dst.type == SCH_NODE_TYPE_MUX)
1988 723 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
1989 else
1990 6919 tq_send_finish(sch->dec[dst.idx].queue, 0);
1991
1992 7642 *dst_finished = 1;
1993 7642 return AVERROR_EOF;
1994 }
1995
1996 474632 static int demux_send_for_stream(Scheduler *sch, SchDemux *d, SchDemuxStream *ds,
1997 AVPacket *pkt, unsigned flags)
1998 {
1999 474632 unsigned nb_done = 0;
2000
2001
2/2
✓ Branch 0 taken 475099 times.
✓ Branch 1 taken 474632 times.
949731 for (unsigned i = 0; i < ds->nb_dst; i++) {
2002 475099 AVPacket *to_send = pkt;
2003 475099 uint8_t *finished = &ds->dst_finished[i];
2004
2005 int ret;
2006
2007 // sending a packet consumes it, so make a temporary reference if needed
2008
4/4
✓ Branch 0 taken 467457 times.
✓ Branch 1 taken 7642 times.
✓ Branch 2 taken 441 times.
✓ Branch 3 taken 467016 times.
475099 if (pkt && i < ds->nb_dst - 1) {
2009 441 to_send = d->send_pkt;
2010
2011 441 ret = av_packet_ref(to_send, pkt);
2012
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 441 times.
441 if (ret < 0)
2013 return ret;
2014 }
2015
2016 475099 ret = demux_stream_send_to_dst(sch, ds->dst[i], finished, to_send, flags);
2017
2/2
✓ Branch 0 taken 467457 times.
✓ Branch 1 taken 7642 times.
475099 if (to_send)
2018 467457 av_packet_unref(to_send);
2019
2/2
✓ Branch 0 taken 10847 times.
✓ Branch 1 taken 464252 times.
475099 if (ret == AVERROR_EOF)
2020 10847 nb_done++;
2021
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 464252 times.
464252 else if (ret < 0)
2022 return ret;
2023 }
2024
2025
2/2
✓ Branch 0 taken 10821 times.
✓ Branch 1 taken 463811 times.
474632 return (nb_done == ds->nb_dst) ? AVERROR_EOF : 0;
2026 }
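
The loop above fans one packet out to several destinations without a wasted copy: every destination except the last receives a fresh reference made with av_packet_ref(), and the last one gets the original. A sketch of just that reference discipline; deliver() is a hypothetical consumer, and pkt is assumed non-NULL:

    #include "libavcodec/packet.h"

    /* Hypothetical consumer standing in for demux_stream_send_to_dst(). */
    int deliver(unsigned dst, AVPacket *pkt);

    static int fan_out(AVPacket *pkt, AVPacket *tmp, unsigned nb_dst)
    {
        for (unsigned i = 0; i < nb_dst; i++) {
            AVPacket *to_send = pkt;

            if (i < nb_dst - 1) {          // not last: hand out a new ref
                int ret = av_packet_ref(tmp, pkt);
                if (ret < 0)
                    return ret;
                to_send = tmp;
            }

            int ret = deliver(i, to_send);
            av_packet_unref(to_send);      // drop our reference either way
            if (ret < 0)
                return ret;
        }
        return 0;
    }
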
2027
2028 11 static int demux_flush(Scheduler *sch, SchDemux *d, AVPacket *pkt)
2029 {
2030 11 Timestamp max_end_ts = (Timestamp){ .ts = AV_NOPTS_VALUE };
2031
2032
3/6
✓ Branch 0 taken 11 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 11 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 11 times.
11 av_assert0(!pkt->buf && !pkt->data && !pkt->side_data_elems);
2033
2034
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 11 times.
25 for (unsigned i = 0; i < d->nb_streams; i++) {
2035 14 SchDemuxStream *ds = &d->streams[i];
2036
2037
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 14 times.
28 for (unsigned j = 0; j < ds->nb_dst; j++) {
2038 14 const SchedulerNode *dst = &ds->dst[j];
2039 SchDec *dec;
2040 int ret;
2041
2042
3/4
✓ Branch 0 taken 14 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 8 times.
✓ Branch 3 taken 6 times.
14 if (ds->dst_finished[j] || dst->type != SCH_NODE_TYPE_DEC)
2043 8 continue;
2044
2045 6 dec = &sch->dec[dst->idx];
2046
2047 6 ret = tq_send(dec->queue, 0, pkt);
2048
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 if (ret < 0)
2049 return ret;
2050
2051
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 3 times.
6 if (dec->queue_end_ts) {
2052 Timestamp ts;
2053 3 ret = av_thread_message_queue_recv(dec->queue_end_ts, &ts, 0);
2054
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (ret < 0)
2055 return ret;
2056
2057
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (max_end_ts.ts == AV_NOPTS_VALUE ||
2058 (ts.ts != AV_NOPTS_VALUE &&
2059 av_compare_ts(max_end_ts.ts, max_end_ts.tb, ts.ts, ts.tb) < 0))
2060 3 max_end_ts = ts;
2061
2062 }
2063 }
2064 }
2065
2066 11 pkt->pts = max_end_ts.ts;
2067 11 pkt->time_base = max_end_ts.tb;
2068
2069 11 return 0;
2070 }
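
demux_flush() keeps a running maximum over end timestamps that live in different time bases; av_compare_ts() rescales internally, so no common base is ever materialized. The comparison step in isolation, with Stamp mirroring the ts/tb layout of the Timestamp struct used above:

    #include "libavutil/avutil.h"
    #include "libavutil/mathematics.h"

    typedef struct { int64_t ts; AVRational tb; } Stamp;

    /* Return whichever of a, b is later; AV_NOPTS_VALUE loses to any
     * real timestamp. */
    static Stamp stamp_max(Stamp a, Stamp b)
    {
        if (a.ts == AV_NOPTS_VALUE)
            return b;
        if (b.ts != AV_NOPTS_VALUE && av_compare_ts(a.ts, a.tb, b.ts, b.tb) < 0)
            return b;
        return a;
    }
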
2071
2072 467422 int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt,
2073 unsigned flags)
2074 {
2075 SchDemux *d;
2076 int terminate;
2077
2078
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 467422 times.
467422 av_assert0(demux_idx < sch->nb_demux);
2079 467422 d = &sch->demux[demux_idx];
2080
2081 467422 terminate = waiter_wait(sch, &d->waiter);
2082
2/2
✓ Branch 0 taken 395 times.
✓ Branch 1 taken 467027 times.
467422 if (terminate)
2083 395 return AVERROR_EXIT;
2084
2085 // flush the downstreams after seek
2086
2/2
✓ Branch 0 taken 11 times.
✓ Branch 1 taken 467016 times.
467027 if (pkt->stream_index == -1)
2087 11 return demux_flush(sch, d, pkt);
2088
2089
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 467016 times.
467016 av_assert0(pkt->stream_index < d->nb_streams);
2090
2091 467016 return demux_send_for_stream(sch, d, &d->streams[pkt->stream_index], pkt, flags);
2092 }
2093
2094 7356 static int demux_done(Scheduler *sch, unsigned demux_idx)
2095 {
2096 7356 SchDemux *d = &sch->demux[demux_idx];
2097 7356 int ret = 0;
2098
2099
2/2
✓ Branch 0 taken 7616 times.
✓ Branch 1 taken 7356 times.
14972 for (unsigned i = 0; i < d->nb_streams; i++) {
2100 7616 int err = demux_send_for_stream(sch, d, &d->streams[i], NULL, 0);
2101
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7616 times.
7616 if (err != AVERROR_EOF)
2102 ret = err_merge(ret, err);
2103 }
2104
2105 7356 pthread_mutex_lock(&sch->schedule_lock);
2106
2107 7356 d->task_exited = 1;
2108
2109 7356 schedule_update_locked(sch);
2110
2111 7356 pthread_mutex_unlock(&sch->schedule_lock);
2112
2113 7356 return ret;
2114 }
2115
2116 534424 int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt)
2117 {
2118 SchMux *mux;
2119 int ret, stream_idx;
2120
2121
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 534424 times.
534424 av_assert0(mux_idx < sch->nb_mux);
2122 534424 mux = &sch->mux[mux_idx];
2123
2124 534424 ret = tq_receive(mux->queue, &stream_idx, pkt);
2125 534424 pkt->stream_index = stream_idx;
2126 534424 return ret;
2127 }
2128
2129 209 void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
2130 {
2131 SchMux *mux;
2132
2133
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 209 times.
209 av_assert0(mux_idx < sch->nb_mux);
2134 209 mux = &sch->mux[mux_idx];
2135
2136
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 209 times.
209 av_assert0(stream_idx < mux->nb_streams);
2137 209 tq_receive_finish(mux->queue, stream_idx);
2138
2139 209 pthread_mutex_lock(&sch->schedule_lock);
2140 209 mux->streams[stream_idx].source_finished = 1;
2141
2142 209 schedule_update_locked(sch);
2143
2144 209 pthread_mutex_unlock(&sch->schedule_lock);
2145 209 }
2146
2147 478041 int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
2148 const AVPacket *pkt)
2149 {
2150 SchMux *mux;
2151 SchMuxStream *ms;
2152
2153
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 478041 times.
478041 av_assert0(mux_idx < sch->nb_mux);
2154 478041 mux = &sch->mux[mux_idx];
2155
2156
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 478041 times.
478041 av_assert0(stream_idx < mux->nb_streams);
2157 478041 ms = &mux->streams[stream_idx];
2158
2159
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 478041 times.
478046 for (unsigned i = 0; i < ms->nb_sub_heartbeat_dst; i++) {
2160 5 SchDec *dst = &sch->dec[ms->sub_heartbeat_dst[i]];
2161 int ret;
2162
2163 5 ret = av_packet_copy_props(mux->sub_heartbeat_pkt, pkt);
2164
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5 times.
5 if (ret < 0)
2165 return ret;
2166
2167 5 tq_send(dst->queue, 0, mux->sub_heartbeat_pkt);
2168 }
2169
2170 478041 return 0;
2171 }
2172
2173 8406 static int mux_done(Scheduler *sch, unsigned mux_idx)
2174 {
2175 8406 SchMux *mux = &sch->mux[mux_idx];
2176
2177 8406 pthread_mutex_lock(&sch->schedule_lock);
2178
2179
2/2
✓ Branch 0 taken 8895 times.
✓ Branch 1 taken 8406 times.
17301 for (unsigned i = 0; i < mux->nb_streams; i++) {
2180 8895 tq_receive_finish(mux->queue, i);
2181 8895 mux->streams[i].source_finished = 1;
2182 }
2183
2184 8406 schedule_update_locked(sch);
2185
2186 8406 pthread_mutex_unlock(&sch->schedule_lock);
2187
2188 8406 pthread_mutex_lock(&sch->finish_lock);
2189
2190
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8406 times.
8406 av_assert0(sch->nb_mux_done < sch->nb_mux);
2191 8406 sch->nb_mux_done++;
2192
2193 8406 pthread_cond_signal(&sch->finish_cond);
2194
2195 8406 pthread_mutex_unlock(&sch->finish_lock);
2196
2197 8406 return 0;
2198 }
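
The tail of mux_done() is the producer half of the handshake that sch_wait() consumes: a counter guarded by finish_lock plus a condition signal. The pairing in miniature (names shortened, timeout omitted):

    #include <pthread.h>

    static pthread_mutex_t finish_lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  finish_cond = PTHREAD_COND_INITIALIZER;
    static unsigned        nb_done, nb_total;

    static void one_done(void)              // mux_done() side
    {
        pthread_mutex_lock(&finish_lock);
        nb_done++;
        pthread_cond_signal(&finish_cond);  // wake a sch_wait()-style waiter
        pthread_mutex_unlock(&finish_lock);
    }

    static void wait_all(void)              // sch_wait() side, sans timeout
    {
        pthread_mutex_lock(&finish_lock);
        while (nb_done < nb_total)
            pthread_cond_wait(&finish_cond, &finish_lock);
        pthread_mutex_unlock(&finish_lock);
    }
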
2199
2200 370103 int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
2201 {
2202 SchDec *dec;
2203 int ret, dummy;
2204
2205
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 370103 times.
370103 av_assert0(dec_idx < sch->nb_dec);
2206 370103 dec = &sch->dec[dec_idx];
2207
2208 // the decoder should have given us the post-flush end timestamp in pkt
2209
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 370100 times.
370103 if (dec->expect_end_ts) {
2210 3 Timestamp ts = (Timestamp){ .ts = pkt->pts, .tb = pkt->time_base };
2211 3 ret = av_thread_message_queue_send(dec->queue_end_ts, &ts, 0);
2212
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (ret < 0)
2213 return ret;
2214
2215 3 dec->expect_end_ts = 0;
2216 }
2217
2218 370103 ret = tq_receive(dec->queue, &dummy, pkt);
2219
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 370103 times.
370103 av_assert0(dummy <= 0);
2220
2221 // got a flush packet; on the next call to this function the decoder
2222 // will give us the post-flush end timestamp
2223
7/8
✓ Branch 0 taken 366729 times.
✓ Branch 1 taken 3374 times.
✓ Branch 2 taken 958 times.
✓ Branch 3 taken 365771 times.
✓ Branch 4 taken 958 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 3 times.
✓ Branch 7 taken 955 times.
370103 if (ret >= 0 && !pkt->data && !pkt->side_data_elems && dec->queue_end_ts)
2224 3 dec->expect_end_ts = 1;
2225
2226 370103 return ret;
2227 }
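
The end-timestamp handshake above rides on AVThreadMessageQueue, which moves fixed-size structs between threads by value. A standalone round trip with the same alloc/send/recv/free calls; the one-field message layout is an assumption for the demo:

    #include <inttypes.h>
    #include <stdio.h>

    #include "libavutil/threadmessage.h"

    typedef struct { int64_t ts; } Msg;    // assumed message layout

    int main(void)
    {
        AVThreadMessageQueue *mq;
        Msg in = { .ts = 1234 }, out;

        if (av_thread_message_queue_alloc(&mq, 1, sizeof(Msg)) < 0)
            return 1;

        av_thread_message_queue_send(mq, &in, 0);   // decoder side posts...
        av_thread_message_queue_recv(mq, &out, 0);  // ...demuxer side receives

        printf("received ts %"PRId64"\n", out.ts);
        av_thread_message_queue_free(&mq);
        return 0;
    }
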
2228
2229 398823 static int send_to_filter(Scheduler *sch, SchFilterGraph *fg,
2230 unsigned in_idx, AVFrame *frame)
2231 {
2232
2/2
✓ Branch 0 taken 391878 times.
✓ Branch 1 taken 6945 times.
398823 if (frame)
2233 391878 return tq_send(fg->queue, in_idx, frame);
2234
2235
1/2
✓ Branch 0 taken 6945 times.
✗ Branch 1 not taken.
6945 if (!fg->inputs[in_idx].send_finished) {
2236 6945 fg->inputs[in_idx].send_finished = 1;
2237 6945 tq_send_finish(fg->queue, in_idx);
2238
2239 // close the control stream when all actual inputs are done
2240
2/2
✓ Branch 0 taken 6859 times.
✓ Branch 1 taken 86 times.
6945 if (atomic_fetch_add(&fg->nb_inputs_finished_send, 1) == fg->nb_inputs - 1)
2241 6859 tq_send_finish(fg->queue, fg->nb_inputs);
2242 }
2243 6945 return 0;
2244 }
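
Closing the control stream relies on atomic_fetch_add() returning the previous value: among all inputs racing to finish, exactly one observes nb_inputs - 1 and closes the shared stream. The idiom by itself, with printf() standing in for tq_send_finish():

    #include <stdatomic.h>
    #include <stdio.h>

    /* Called once per finishing input; the thread that sees the last
     * pre-increment value closes the shared resource exactly once. */
    static void input_finished(atomic_int *nb_finished, int nb_inputs)
    {
        if (atomic_fetch_add(nb_finished, 1) == nb_inputs - 1)
            printf("last of %d inputs done: closing control stream\n",
                   nb_inputs);
    }
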
2245
2246 403534 static int dec_send_to_dst(Scheduler *sch, const SchedulerNode dst,
2247 uint8_t *dst_finished, AVFrame *frame)
2248 {
2249 int ret;
2250
2251
2/2
✓ Branch 0 taken 7222 times.
✓ Branch 1 taken 396312 times.
403534 if (*dst_finished)
2252 7222 return AVERROR_EOF;
2253
2254
2/2
✓ Branch 0 taken 3391 times.
✓ Branch 1 taken 392921 times.
396312 if (!frame)
2255 3391 goto finish;
2256
2257 785842 ret = (dst.type == SCH_NODE_TYPE_FILTER_IN) ?
2258
2/2
✓ Branch 0 taken 391878 times.
✓ Branch 1 taken 1043 times.
392921 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame) :
2259 1043 send_to_enc(sch, &sch->enc[dst.idx], frame);
2260
2/2
✓ Branch 0 taken 3596 times.
✓ Branch 1 taken 389325 times.
392921 if (ret == AVERROR_EOF)
2261 3596 goto finish;
2262
2263 389325 return ret;
2264
2265 6987 finish:
2266
2/2
✓ Branch 0 taken 6945 times.
✓ Branch 1 taken 42 times.
6987 if (dst.type == SCH_NODE_TYPE_FILTER_IN)
2267 6945 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2268 else
2269 42 send_to_enc(sch, &sch->enc[dst.idx], NULL);
2270
2271 6987 *dst_finished = 1;
2272
2273 6987 return AVERROR_EOF;
2274 }
2275
2276 395007 int sch_dec_send(Scheduler *sch, unsigned dec_idx,
2277 unsigned out_idx, AVFrame *frame)
2278 {
2279 SchDec *dec;
2280 SchDecOutput *o;
2281 int ret;
2282 395007 unsigned nb_done = 0;
2283
2284
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 395007 times.
395007 av_assert0(dec_idx < sch->nb_dec);
2285 395007 dec = &sch->dec[dec_idx];
2286
2287
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 395007 times.
395007 av_assert0(out_idx < dec->nb_outputs);
2288 395007 o = &dec->outputs[out_idx];
2289
2290
2/2
✓ Branch 0 taken 396547 times.
✓ Branch 1 taken 395007 times.
791554 for (unsigned i = 0; i < o->nb_dst; i++) {
2291 396547 uint8_t *finished = &o->dst_finished[i];
2292 396547 AVFrame *to_send = frame;
2293
2294 // sending a frame consumes it, so make a temporary reference if needed
2295
2/2
✓ Branch 0 taken 1540 times.
✓ Branch 1 taken 395007 times.
396547 if (i < o->nb_dst - 1) {
2296 1540 to_send = dec->send_frame;
2297
2298 // the frame may sometimes contain props only,
2299 // e.g. to signal the EOF timestamp
2300
2/2
✓ Branch 0 taken 1433 times.
✓ Branch 1 taken 107 times.
1540 ret = frame->buf[0] ? av_frame_ref(to_send, frame) :
2301 107 av_frame_copy_props(to_send, frame);
2302
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1540 times.
1540 if (ret < 0)
2303 return ret;
2304 }
2305
2306 396547 ret = dec_send_to_dst(sch, o->dst[i], finished, to_send);
2307
2/2
✓ Branch 0 taken 7222 times.
✓ Branch 1 taken 389325 times.
396547 if (ret < 0) {
2308 7222 av_frame_unref(to_send);
2309
1/2
✓ Branch 0 taken 7222 times.
✗ Branch 1 not taken.
7222 if (ret == AVERROR_EOF) {
2310 7222 nb_done++;
2311 7222 continue;
2312 }
2313 return ret;
2314 }
2315 }
2316
2317
2/2
✓ Branch 0 taken 7098 times.
✓ Branch 1 taken 387909 times.
395007 return (nb_done == o->nb_dst) ? AVERROR_EOF : 0;
2318 }
2319
2320 6920 static int dec_done(Scheduler *sch, unsigned dec_idx)
2321 {
2322 6920 SchDec *dec = &sch->dec[dec_idx];
2323 6920 int ret = 0;
2324
2325 6920 tq_receive_finish(dec->queue, 0);
2326
2327 // make sure our source does not get stuck waiting for end timestamps
2328 // that will never arrive
2329
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 6919 times.
6920 if (dec->queue_end_ts)
2330 1 av_thread_message_queue_set_err_recv(dec->queue_end_ts, AVERROR_EOF);
2331
2332
2/2
✓ Branch 0 taken 6926 times.
✓ Branch 1 taken 6920 times.
13846 for (unsigned i = 0; i < dec->nb_outputs; i++) {
2333 6926 SchDecOutput *o = &dec->outputs[i];
2334
2335
2/2
✓ Branch 0 taken 6987 times.
✓ Branch 1 taken 6926 times.
13913 for (unsigned j = 0; j < o->nb_dst; j++) {
2336 6987 int err = dec_send_to_dst(sch, o->dst[j], &o->dst_finished[j], NULL);
2337
2/4
✓ Branch 0 taken 6987 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6987 times.
6987 if (err < 0 && err != AVERROR_EOF)
2338 ret = err_merge(ret, err);
2339 }
2340 }
2341
2342 6920 return ret;
2343 }
2344
2345 462124 int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
2346 {
2347 SchEnc *enc;
2348 int ret, dummy;
2349
2350
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 462124 times.
462124 av_assert0(enc_idx < sch->nb_enc);
2351 462124 enc = &sch->enc[enc_idx];
2352
2353 462124 ret = tq_receive(enc->queue, &dummy, frame);
2354
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 462124 times.
462124 av_assert0(dummy <= 0);
2355
2356 462124 return ret;
2357 }
2358
2359 456054 static int enc_send_to_dst(Scheduler *sch, const SchedulerNode dst,
2360 uint8_t *dst_finished, AVPacket *pkt)
2361 {
2362 int ret;
2363
2364
2/2
✓ Branch 0 taken 28 times.
✓ Branch 1 taken 456026 times.
456054 if (*dst_finished)
2365 28 return AVERROR_EOF;
2366
2367
2/2
✓ Branch 0 taken 8171 times.
✓ Branch 1 taken 447855 times.
456026 if (!pkt)
2368 8171 goto finish;
2369
2370 895710 ret = (dst.type == SCH_NODE_TYPE_MUX) ?
2371
2/2
✓ Branch 0 taken 447805 times.
✓ Branch 1 taken 50 times.
447855 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
2372 50 tq_send(sch->dec[dst.idx].queue, 0, pkt);
2373
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 447853 times.
447855 if (ret == AVERROR_EOF)
2374 2 goto finish;
2375
2376 447853 return ret;
2377
2378 8173 finish:
2379
2/2
✓ Branch 0 taken 8172 times.
✓ Branch 1 taken 1 times.
8173 if (dst.type == SCH_NODE_TYPE_MUX)
2380 8172 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
2381 else
2382 1 tq_send_finish(sch->dec[dst.idx].queue, 0);
2383
2384 8173 *dst_finished = 1;
2385
2386 8173 return AVERROR_EOF;
2387 }
2388
2389 447831 int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
2390 {
2391 SchEnc *enc;
2392 int ret;
2393
2394
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 447831 times.
447831 av_assert0(enc_idx < sch->nb_enc);
2395 447831 enc = &sch->enc[enc_idx];
2396
2397
2/2
✓ Branch 0 taken 447881 times.
✓ Branch 1 taken 447831 times.
895712 for (unsigned i = 0; i < enc->nb_dst; i++) {
2398 447881 uint8_t *finished = &enc->dst_finished[i];
2399 447881 AVPacket *to_send = pkt;
2400
2401 // sending a packet consumes it, so make a temporary reference if needed
2402
2/2
✓ Branch 0 taken 50 times.
✓ Branch 1 taken 447831 times.
447881 if (i < enc->nb_dst - 1) {
2403 50 to_send = enc->send_pkt;
2404
2405 50 ret = av_packet_ref(to_send, pkt);
2406
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 50 times.
50 if (ret < 0)
2407 return ret;
2408 }
2409
2410 447881 ret = enc_send_to_dst(sch, enc->dst[i], finished, to_send);
2411
2/2
✓ Branch 0 taken 28 times.
✓ Branch 1 taken 447853 times.
447881 if (ret < 0) {
2412 28 av_packet_unref(to_send);
2413
1/2
✓ Branch 0 taken 28 times.
✗ Branch 1 not taken.
28 if (ret == AVERROR_EOF)
2414 28 continue;
2415 return ret;
2416 }
2417 }
2418
2419 447831 return 0;
2420 }
2421
2422 8172 static int enc_done(Scheduler *sch, unsigned enc_idx)
2423 {
2424 8172 SchEnc *enc = &sch->enc[enc_idx];
2425 8172 int ret = 0;
2426
2427 8172 tq_receive_finish(enc->queue, 0);
2428
2429
2/2
✓ Branch 0 taken 8173 times.
✓ Branch 1 taken 8172 times.
16345 for (unsigned i = 0; i < enc->nb_dst; i++) {
2430 8173 int err = enc_send_to_dst(sch, enc->dst[i], &enc->dst_finished[i], NULL);
2431
2/4
✓ Branch 0 taken 8173 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 8173 times.
8173 if (err < 0 && err != AVERROR_EOF)
2432 ret = err_merge(ret, err);
2433 }
2434
2435 8172 return ret;
2436 }
2437
2438 410985 int sch_filter_receive(Scheduler *sch, unsigned fg_idx,
2439 unsigned *in_idx, AVFrame *frame)
2440 {
2441 SchFilterGraph *fg;
2442
2443
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 410985 times.
410985 av_assert0(fg_idx < sch->nb_filters);
2444 410985 fg = &sch->filters[fg_idx];
2445
2446
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 410985 times.
410985 av_assert0(*in_idx <= fg->nb_inputs);
2447
2448 // update scheduling to account for desired input stream, if it changed
2449 //
2450 // this check needs no locking because only the filtering thread
2451 // updates this value
2452
2/2
✓ Branch 0 taken 8135 times.
✓ Branch 1 taken 402850 times.
410985 if (*in_idx != fg->best_input) {
2453 8135 pthread_mutex_lock(&sch->schedule_lock);
2454
2455 8135 fg->best_input = *in_idx;
2456 8135 schedule_update_locked(sch);
2457
2458 8135 pthread_mutex_unlock(&sch->schedule_lock);
2459 }
2460
2461
2/2
✓ Branch 0 taken 381444 times.
✓ Branch 1 taken 29541 times.
410985 if (*in_idx == fg->nb_inputs) {
2462 29541 int terminate = waiter_wait(sch, &fg->waiter);
2463
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29541 times.
29541 return terminate ? AVERROR_EOF : AVERROR(EAGAIN);
2464 }
2465
2466 32 while (1) {
2467 int ret, idx;
2468
2469 381476 ret = tq_receive(fg->queue, &idx, frame);
2470
2/2
✓ Branch 0 taken 7 times.
✓ Branch 1 taken 381469 times.
381476 if (idx < 0)
2471 381444 return AVERROR_EOF;
2472
2/2
✓ Branch 0 taken 381437 times.
✓ Branch 1 taken 32 times.
381469 else if (ret >= 0) {
2473 381437 *in_idx = idx;
2474 381437 return 0;
2475 }
2476
2477 // disregard EOFs for specific streams - they should always be
2478 // preceded by an EOF frame
2479 }
2480 }
2481
2482 2 void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx)
2483 {
2484 SchFilterGraph *fg;
2485 SchFilterIn *fi;
2486
2487
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 av_assert0(fg_idx < sch->nb_filters);
2488 2 fg = &sch->filters[fg_idx];
2489
2490
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 av_assert0(in_idx < fg->nb_inputs);
2491 2 fi = &fg->inputs[in_idx];
2492
2493 2 pthread_mutex_lock(&sch->schedule_lock);
2494
2495
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 if (!fi->receive_finished) {
2496 2 fi->receive_finished = 1;
2497 2 tq_receive_finish(fg->queue, in_idx);
2498
2499 // close the control stream when all actual inputs are done
2500
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
2 if (++fg->nb_inputs_finished_receive == fg->nb_inputs)
2501 1 tq_receive_finish(fg->queue, fg->nb_inputs);
2502
2503 2 schedule_update_locked(sch);
2504 }
2505
2506 2 pthread_mutex_unlock(&sch->schedule_lock);
2507 2 }
2508
2509 407442 int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)
2510 {
2511 SchFilterGraph *fg;
2512 SchedulerNode dst;
2513 int ret;
2514
2515
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 407442 times.
407442 av_assert0(fg_idx < sch->nb_filters);
2516 407442 fg = &sch->filters[fg_idx];
2517
2518
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 407442 times.
407442 av_assert0(out_idx < fg->nb_outputs);
2519 407442 dst = fg->outputs[out_idx].dst;
2520
2521
1/2
✓ Branch 0 taken 407442 times.
✗ Branch 1 not taken.
407442 if (dst.type == SCH_NODE_TYPE_ENC) {
2522 407442 ret = send_to_enc(sch, &sch->enc[dst.idx], frame);
2523
2/2
✓ Branch 0 taken 3188 times.
✓ Branch 1 taken 404254 times.
407442 if (ret == AVERROR_EOF)
2524 3188 send_to_enc(sch, &sch->enc[dst.idx], NULL);
2525 } else {
2526 ret = send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame);
2527 if (ret == AVERROR_EOF)
2528 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2529 }
2530 407442 return ret;
2531 }
2532
2533 7998 static int filter_done(Scheduler *sch, unsigned fg_idx)
2534 {
2535 7998 SchFilterGraph *fg = &sch->filters[fg_idx];
2536 7998 int ret = 0;
2537
2538
2/2
✓ Branch 0 taken 14943 times.
✓ Branch 1 taken 7998 times.
22941 for (unsigned i = 0; i <= fg->nb_inputs; i++)
2539 14943 tq_receive_finish(fg->queue, i);
2540
2541
2/2
✓ Branch 0 taken 8130 times.
✓ Branch 1 taken 7998 times.
16128 for (unsigned i = 0; i < fg->nb_outputs; i++) {
2542 8130 SchedulerNode dst = fg->outputs[i].dst;
2543 16260 int err = (dst.type == SCH_NODE_TYPE_ENC) ?
2544
1/2
✓ Branch 0 taken 8130 times.
✗ Branch 1 not taken.
8130 send_to_enc (sch, &sch->enc[dst.idx], NULL) :
2545 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2546
2547
3/4
✓ Branch 0 taken 3231 times.
✓ Branch 1 taken 4899 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 3231 times.
8130 if (err < 0 && err != AVERROR_EOF)
2548 ret = err_merge(ret, err);
2549 }
2550
2551 7998 pthread_mutex_lock(&sch->schedule_lock);
2552
2553 7998 fg->task_exited = 1;
2554
2555 7998 schedule_update_locked(sch);
2556
2557 7998 pthread_mutex_unlock(&sch->schedule_lock);
2558
2559 7998 return ret;
2560 }
2561
2562 int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
2563 {
2564 SchFilterGraph *fg;
2565
2566 av_assert0(fg_idx < sch->nb_filters);
2567 fg = &sch->filters[fg_idx];
2568
2569 return send_to_filter(sch, fg, fg->nb_inputs, frame);
2570 }
2571
2572 6854 void sch_filter_choke_inputs(Scheduler *sch, unsigned fg_idx)
2573 {
2574 SchFilterGraph *fg;
2575
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6854 times.
6854 av_assert0(fg_idx < sch->nb_filters);
2576 6854 fg = &sch->filters[fg_idx];
2577
2578 6854 pthread_mutex_lock(&sch->schedule_lock);
2579 6854 fg->best_input = fg->nb_inputs;
2580 6854 schedule_update_locked(sch);
2581 6854 pthread_mutex_unlock(&sch->schedule_lock);
2582 6854 }
2583
2584 38852 static int task_cleanup(Scheduler *sch, SchedulerNode node)
2585 {
2586
5/6
✓ Branch 0 taken 7356 times.
✓ Branch 1 taken 8406 times.
✓ Branch 2 taken 6920 times.
✓ Branch 3 taken 8172 times.
✓ Branch 4 taken 7998 times.
✗ Branch 5 not taken.
38852 switch (node.type) {
2587 7356 case SCH_NODE_TYPE_DEMUX: return demux_done (sch, node.idx);
2588 8406 case SCH_NODE_TYPE_MUX: return mux_done (sch, node.idx);
2589 6920 case SCH_NODE_TYPE_DEC: return dec_done (sch, node.idx);
2590 8172 case SCH_NODE_TYPE_ENC: return enc_done (sch, node.idx);
2591 7998 case SCH_NODE_TYPE_FILTER_IN: return filter_done(sch, node.idx);
2592 default: av_assert0(0);
2593 }
2594 }
2595
2596 38838 static void *task_wrapper(void *arg)
2597 {
2598 38838 SchTask *task = arg;
2599 38838 Scheduler *sch = task->parent;
2600 int ret;
2601 38838 int err = 0;
2602
2603 38838 ret = task->func(task->func_arg);
2604
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 38837 times.
38838 if (ret < 0)
2605 1 av_log(task->func_arg, AV_LOG_ERROR,
2606 1 "Task finished with error code: %d (%s)\n", ret, av_err2str(ret));
2607
2608 38838 err = task_cleanup(sch, task->node);
2609 38838 ret = err_merge(ret, err);
2610
2611 // EOF is considered normal termination
2612
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 38838 times.
38838 if (ret == AVERROR_EOF)
2613 ret = 0;
2614
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 38837 times.
38838 if (ret < 0) {
2615 1 pthread_mutex_lock(&sch->finish_lock);
2616 1 sch->task_failed = 1;
2617 1 pthread_cond_signal(&sch->finish_cond);
2618 1 pthread_mutex_unlock(&sch->finish_lock);
2619 }
2620
2621
4/4
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 38837 times.
✓ Branch 2 taken 1 times.
✓ Branch 3 taken 38837 times.
38839 av_log(task->func_arg, ret < 0 ? AV_LOG_ERROR : AV_LOG_VERBOSE,
2622 "Terminating thread with return code %d (%s)\n", ret,
2623 1 ret < 0 ? av_err2str(ret) : "success");
2624
2625 38838 return (void*)(intptr_t)ret;
2626 }
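
task_wrapper() returns its status through the void * channel of the thread return value, smuggling the int through intptr_t in both directions; task_stop() below performs the matching cast after pthread_join(). A complete standalone round trip of the same idiom:

    #include <pthread.h>
    #include <stdint.h>
    #include <stdio.h>

    static void *worker(void *arg)
    {
        (void)arg;
        int ret = -42;                   // some status code
        return (void *)(intptr_t)ret;    // out through the void * channel
    }

    int main(void)
    {
        pthread_t t;
        void *thread_ret;

        pthread_create(&t, NULL, worker, NULL);
        pthread_join(t, &thread_ret);

        printf("worker returned %d\n", (int)(intptr_t)thread_ret);
        return 0;
    }
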
2627
2628 38852 static int task_stop(Scheduler *sch, SchTask *task)
2629 {
2630 int ret;
2631 void *thread_ret;
2632
2633
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 38838 times.
38852 if (!task->thread_running)
2634 14 return task_cleanup(sch, task->node);
2635
2636 38838 ret = pthread_join(task->thread, &thread_ret);
2637
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 38838 times.
38838 av_assert0(ret == 0);
2638
2639 38838 task->thread_running = 0;
2640
2641 38838 return (intptr_t)thread_ret;
2642 }
2643
2644 16805 int sch_stop(Scheduler *sch, int64_t *finish_ts)
2645 {
2646 16805 int ret = 0, err;
2647
2648
2/2
✓ Branch 0 taken 8403 times.
✓ Branch 1 taken 8402 times.
16805 if (sch->state != SCH_STATE_STARTED)
2649 8403 return 0;
2650
2651 8402 atomic_store(&sch->terminate, 1);
2652
2653
2/2
✓ Branch 0 taken 16804 times.
✓ Branch 1 taken 8402 times.
25206 for (unsigned type = 0; type < 2; type++)
2654
4/4
✓ Branch 0 taken 15758 times.
✓ Branch 1 taken 16400 times.
✓ Branch 2 taken 15354 times.
✓ Branch 3 taken 16804 times.
32158 for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) {
2655
2/2
✓ Branch 0 taken 7356 times.
✓ Branch 1 taken 7998 times.
15354 SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter;
2656 15354 waiter_set(w, 1);
2657
2/2
✓ Branch 0 taken 7356 times.
✓ Branch 1 taken 7998 times.
15354 if (type)
2658 7356 choke_demux(sch, i, 0); // unfreeze to allow draining
2659 }
2660
2661
2/2
✓ Branch 0 taken 7356 times.
✓ Branch 1 taken 8402 times.
15758 for (unsigned i = 0; i < sch->nb_demux; i++) {
2662 7356 SchDemux *d = &sch->demux[i];
2663
2664 7356 err = task_stop(sch, &d->task);
2665 7356 ret = err_merge(ret, err);
2666 }
2667
2668
2/2
✓ Branch 0 taken 6920 times.
✓ Branch 1 taken 8402 times.
15322 for (unsigned i = 0; i < sch->nb_dec; i++) {
2669 6920 SchDec *dec = &sch->dec[i];
2670
2671 6920 err = task_stop(sch, &dec->task);
2672 6920 ret = err_merge(ret, err);
2673 }
2674
2675
2/2
✓ Branch 0 taken 7998 times.
✓ Branch 1 taken 8402 times.
16400 for (unsigned i = 0; i < sch->nb_filters; i++) {
2676 7998 SchFilterGraph *fg = &sch->filters[i];
2677
2678 7998 err = task_stop(sch, &fg->task);
2679 7998 ret = err_merge(ret, err);
2680 }
2681
2682
2/2
✓ Branch 0 taken 8172 times.
✓ Branch 1 taken 8402 times.
16574 for (unsigned i = 0; i < sch->nb_enc; i++) {
2683 8172 SchEnc *enc = &sch->enc[i];
2684
2685 8172 err = task_stop(sch, &enc->task);
2686 8172 ret = err_merge(ret, err);
2687 }
2688
2689
2/2
✓ Branch 0 taken 8406 times.
✓ Branch 1 taken 8402 times.
16808 for (unsigned i = 0; i < sch->nb_mux; i++) {
2690 8406 SchMux *mux = &sch->mux[i];
2691
2692 8406 err = task_stop(sch, &mux->task);
2693 8406 ret = err_merge(ret, err);
2694 }
2695
2696
1/2
✓ Branch 0 taken 8402 times.
✗ Branch 1 not taken.
8402 if (finish_ts)
2697 8402 *finish_ts = trailing_dts(sch, 1);
2698
2699 8402 sch->state = SCH_STATE_STOPPED;
2700
2701 8402 return ret;
2702 }
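
sch_stop() wakes every waiter, unfreezes the demuxers so queues can drain, then joins tasks from upstream (demuxers, decoders) to downstream (encoders, muxers), folding each result into ret with err_merge(). That helper is defined in ffmpeg_utils.h and not shown in this listing; a plausible reading consistent with how it is used above, first failure wins, would be:

    /* Sketch of first-failure-wins error folding, consistent with how
     * err_merge() is used in this file; the real definition lives in
     * ffmpeg_utils.h and may differ in detail. */
    static int err_merge_sketch(int err0, int err1)
    {
        if (err0 < 0)
            return err0;                 // keep the earliest failure
        return err1 < 0 ? err1 : 0;      // else adopt the new one, if any
    }
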
2703