FFmpeg coverage


Directory: ../../../ffmpeg/
File: src/fftools/ffmpeg_sched.c
Date: 2025-06-23 20:06:14
Exec Total Coverage
Lines: 1067 1220 87.5%
Functions: 65 67 97.0%
Branches: 587 811 72.4%

Line Branch Exec Source
1 /*
2 * Inter-thread scheduling/synchronization.
3 * Copyright (c) 2023 Anton Khirnov
4 *
5 * This file is part of FFmpeg.
6 *
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with FFmpeg; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22 #include <stdatomic.h>
23 #include <stddef.h>
24 #include <stdint.h>
25
26 #include "cmdutils.h"
27 #include "ffmpeg_sched.h"
28 #include "ffmpeg_utils.h"
29 #include "sync_queue.h"
30 #include "thread_queue.h"
31
32 #include "libavcodec/packet.h"
33
34 #include "libavutil/avassert.h"
35 #include "libavutil/error.h"
36 #include "libavutil/fifo.h"
37 #include "libavutil/frame.h"
38 #include "libavutil/mem.h"
39 #include "libavutil/thread.h"
40 #include "libavutil/threadmessage.h"
41 #include "libavutil/time.h"
42
43 // 100 ms
44 // FIXME: some other value? make this dynamic?
45 #define SCHEDULE_TOLERANCE (100 * 1000)
46
47 enum QueueType {
48 QUEUE_PACKETS,
49 QUEUE_FRAMES,
50 };
51
52 typedef struct SchWaiter {
53 pthread_mutex_t lock;
54 pthread_cond_t cond;
55 atomic_int choked;
56
57 // the following are internal state of schedule_update_locked() and must not
58 // be accessed outside of it
59 int choked_prev;
60 int choked_next;
61 } SchWaiter;
62
63 typedef struct SchTask {
64 Scheduler *parent;
65 SchedulerNode node;
66
67 SchThreadFunc func;
68 void *func_arg;
69
70 pthread_t thread;
71 int thread_running;
72 } SchTask;
73
74 typedef struct SchDecOutput {
75 SchedulerNode *dst;
76 uint8_t *dst_finished;
77 unsigned nb_dst;
78 } SchDecOutput;
79
80 typedef struct SchDec {
81 const AVClass *class;
82
83 SchedulerNode src;
84
85 SchDecOutput *outputs;
86 unsigned nb_outputs;
87
88 SchTask task;
89 // Queue for receiving input packets, one stream.
90 ThreadQueue *queue;
91
92 // Queue for sending post-flush end timestamps back to the source
93 AVThreadMessageQueue *queue_end_ts;
94 int expect_end_ts;
95
96 // temporary storage used by sch_dec_send()
97 AVFrame *send_frame;
98 } SchDec;
99
100 typedef struct SchSyncQueue {
101 SyncQueue *sq;
102 AVFrame *frame;
103 pthread_mutex_t lock;
104
105 unsigned *enc_idx;
106 unsigned nb_enc_idx;
107 } SchSyncQueue;
108
109 typedef struct SchEnc {
110 const AVClass *class;
111
112 SchedulerNode src;
113 SchedulerNode *dst;
114 uint8_t *dst_finished;
115 unsigned nb_dst;
116
117 // [0] - index of the sync queue in Scheduler.sq_enc,
118 // [1] - index of this encoder in the sq
119 int sq_idx[2];
120
121 /* Opening encoders is somewhat nontrivial due to their interaction with
122 * sync queues, which are (among other things) responsible for maintaining
123 * constant audio frame size, when it is required by the encoder.
124 *
125 * Opening the encoder requires stream parameters, obtained from the first
126 * frame. However, that frame cannot be properly chunked by the sync queue
127 * without knowing the required frame size, which is only available after
128 * opening the encoder.
129 *
130 * This apparent circular dependency is resolved in the following way:
131 * - the caller creating the encoder gives us a callback which opens the
132 * encoder and returns the required frame size (if any)
133 * - when the first frame is sent to the encoder, the sending thread
134 * - calls this callback, opening the encoder
135 * - passes the returned frame size to the sync queue
136 */
137 int (*open_cb)(void *opaque, const AVFrame *frame);
138 int opened;
139
140 SchTask task;
141 // Queue for receiving input frames, one stream.
142 ThreadQueue *queue;
143 // tq_send() to queue returned EOF
144 int in_finished;
145
146 // temporary storage used by sch_enc_send()
147 AVPacket *send_pkt;
148 } SchEnc;
149
150 typedef struct SchDemuxStream {
151 SchedulerNode *dst;
152 uint8_t *dst_finished;
153 unsigned nb_dst;
154 } SchDemuxStream;
155
156 typedef struct SchDemux {
157 const AVClass *class;
158
159 SchDemuxStream *streams;
160 unsigned nb_streams;
161
162 SchTask task;
163 SchWaiter waiter;
164
165 // temporary storage used by sch_demux_send()
166 AVPacket *send_pkt;
167
168 // protected by schedule_lock
169 int task_exited;
170 } SchDemux;
171
172 typedef struct PreMuxQueue {
173 /**
174 * Queue for buffering the packets before the muxer task can be started.
175 */
176 AVFifo *fifo;
177 /**
178 * Maximum number of packets in fifo.
179 */
180 int max_packets;
181 /*
182 * The size of the AVPackets' buffers in queue.
183 * Updated when a packet is either pushed or pulled from the queue.
184 */
185 size_t data_size;
186 /* Threshold after which max_packets will be in effect */
187 size_t data_threshold;
188 } PreMuxQueue;
189
190 typedef struct SchMuxStream {
191 SchedulerNode src;
192 SchedulerNode src_sched;
193
194 unsigned *sub_heartbeat_dst;
195 unsigned nb_sub_heartbeat_dst;
196
197 PreMuxQueue pre_mux_queue;
198
199 // an EOF was generated while flushing the pre-mux queue
200 int init_eof;
201
202 ////////////////////////////////////////////////////////////
203 // The following are protected by Scheduler.schedule_lock //
204
205 /* dts+duration of the last packet sent to this stream
206 in AV_TIME_BASE_Q */
207 int64_t last_dts;
208 // this stream no longer accepts input
209 int source_finished;
210 ////////////////////////////////////////////////////////////
211 } SchMuxStream;
212
213 typedef struct SchMux {
214 const AVClass *class;
215
216 SchMuxStream *streams;
217 unsigned nb_streams;
218 unsigned nb_streams_ready;
219
220 int (*init)(void *arg);
221
222 SchTask task;
223 /**
224 * Set to 1 after starting the muxer task and flushing the
225 * pre-muxing queues.
226 * Set either before any tasks have started, or with
227 * Scheduler.mux_ready_lock held.
228 */
229 atomic_int mux_started;
230 ThreadQueue *queue;
231 unsigned queue_size;
232
233 AVPacket *sub_heartbeat_pkt;
234 } SchMux;
235
236 typedef struct SchFilterIn {
237 SchedulerNode src;
238 SchedulerNode src_sched;
239 int send_finished;
240 int receive_finished;
241 } SchFilterIn;
242
243 typedef struct SchFilterOut {
244 SchedulerNode dst;
245 } SchFilterOut;
246
247 typedef struct SchFilterGraph {
248 const AVClass *class;
249
250 SchFilterIn *inputs;
251 unsigned nb_inputs;
252 atomic_uint nb_inputs_finished_send;
253 unsigned nb_inputs_finished_receive;
254
255 SchFilterOut *outputs;
256 unsigned nb_outputs;
257
258 SchTask task;
259 // input queue, nb_inputs+1 streams
260 // last stream is control
261 ThreadQueue *queue;
262 SchWaiter waiter;
263
264 // protected by schedule_lock
265 unsigned best_input;
266 int task_exited;
267 } SchFilterGraph;
268
269 enum SchedulerState {
270 SCH_STATE_UNINIT,
271 SCH_STATE_STARTED,
272 SCH_STATE_STOPPED,
273 };
274
275 struct Scheduler {
276 const AVClass *class;
277
278 SchDemux *demux;
279 unsigned nb_demux;
280
281 SchMux *mux;
282 unsigned nb_mux;
283
284 unsigned nb_mux_ready;
285 pthread_mutex_t mux_ready_lock;
286
287 unsigned nb_mux_done;
288 unsigned task_failed;
289 pthread_mutex_t finish_lock;
290 pthread_cond_t finish_cond;
291
292
293 SchDec *dec;
294 unsigned nb_dec;
295
296 SchEnc *enc;
297 unsigned nb_enc;
298
299 SchSyncQueue *sq_enc;
300 unsigned nb_sq_enc;
301
302 SchFilterGraph *filters;
303 unsigned nb_filters;
304
305 char *sdp_filename;
306 int sdp_auto;
307
308 enum SchedulerState state;
309 atomic_int terminate;
310
311 pthread_mutex_t schedule_lock;
312
313 atomic_int_least64_t last_dts;
314 };
315
316 /**
317 * Wait until this task is allowed to proceed.
318 *
319 * @retval 0 the caller should proceed
320 * @retval 1 the caller should terminate
321 */
322 500523 static int waiter_wait(Scheduler *sch, SchWaiter *w)
323 {
324 int terminate;
325
326
2/2
✓ Branch 0 taken 500251 times.
✓ Branch 1 taken 272 times.
500523 if (!atomic_load(&w->choked))
327 500251 return 0;
328
329 272 pthread_mutex_lock(&w->lock);
330
331
4/4
✓ Branch 0 taken 279 times.
✓ Branch 1 taken 231 times.
✓ Branch 2 taken 238 times.
✓ Branch 3 taken 41 times.
510 while (atomic_load(&w->choked) && !atomic_load(&sch->terminate))
332 238 pthread_cond_wait(&w->cond, &w->lock);
333
334 272 terminate = atomic_load(&sch->terminate);
335
336 272 pthread_mutex_unlock(&w->lock);
337
338 272 return terminate;
339 }
340
341 33482 static void waiter_set(SchWaiter *w, int choked)
342 {
343 33482 pthread_mutex_lock(&w->lock);
344
345 33482 atomic_store(&w->choked, choked);
346 33482 pthread_cond_signal(&w->cond);
347
348 33482 pthread_mutex_unlock(&w->lock);
349 33482 }
350
351 15087 static int waiter_init(SchWaiter *w)
352 {
353 int ret;
354
355 15087 atomic_init(&w->choked, 0);
356
357 15087 ret = pthread_mutex_init(&w->lock, NULL);
358
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15087 times.
15087 if (ret)
359 return AVERROR(ret);
360
361 15087 ret = pthread_cond_init(&w->cond, NULL);
362
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15087 times.
15087 if (ret)
363 return AVERROR(ret);
364
365 15087 return 0;
366 }
367
368 15087 static void waiter_uninit(SchWaiter *w)
369 {
370 15087 pthread_mutex_destroy(&w->lock);
371 15087 pthread_cond_destroy(&w->cond);
372 15087 }
373
374 30975 static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size,
375 enum QueueType type)
376 {
377 ThreadQueue *tq;
378
379
1/2
✓ Branch 0 taken 30975 times.
✗ Branch 1 not taken.
30975 if (queue_size <= 0) {
380
2/2
✓ Branch 0 taken 15920 times.
✓ Branch 1 taken 15055 times.
30975 if (type == QUEUE_FRAMES)
381 15920 queue_size = DEFAULT_FRAME_THREAD_QUEUE_SIZE;
382 else
383 15055 queue_size = DEFAULT_PACKET_THREAD_QUEUE_SIZE;
384 }
385
386
2/2
✓ Branch 0 taken 15920 times.
✓ Branch 1 taken 15055 times.
30975 if (type == QUEUE_FRAMES) {
387 // This queue length is used in the decoder code to ensure that
388 // there are enough entries in fixed-size frame pools to account
389 // for frames held in queues inside the ffmpeg utility. If this
390 // can ever dynamically change then the corresponding decode
391 // code needs to be updated as well.
392
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15920 times.
15920 av_assert0(queue_size == DEFAULT_FRAME_THREAD_QUEUE_SIZE);
393 }
394
395 30975 tq = tq_alloc(nb_streams, queue_size,
396 (type == QUEUE_PACKETS) ? THREAD_QUEUE_PACKETS : THREAD_QUEUE_FRAMES);
397
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 30975 times.
30975 if (!tq)
398 return AVERROR(ENOMEM);
399
400 30975 *ptq = tq;
401 30975 return 0;
402 }
403
404 static void *task_wrapper(void *arg);
405
406 38172 static int task_start(SchTask *task)
407 {
408 int ret;
409
410 38172 av_log(task->func_arg, AV_LOG_VERBOSE, "Starting thread...\n");
411
412
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 38172 times.
38172 av_assert0(!task->thread_running);
413
414 38172 ret = pthread_create(&task->thread, NULL, task_wrapper, task);
415
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 38172 times.
38172 if (ret) {
416 av_log(task->func_arg, AV_LOG_ERROR, "pthread_create() failed: %s\n",
417 strerror(ret));
418 return AVERROR(ret);
419 }
420
421 38172 task->thread_running = 1;
422 38172 return 0;
423 }
424
425 38186 static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx,
426 SchThreadFunc func, void *func_arg)
427 {
428 38186 task->parent = sch;
429
430 38186 task->node.type = type;
431 38186 task->node.idx = idx;
432
433 38186 task->func = func;
434 38186 task->func_arg = func_arg;
435 38186 }
436
437 546771 static int64_t trailing_dts(const Scheduler *sch, int count_finished)
438 {
439 546771 int64_t min_dts = INT64_MAX;
440
441
2/2
✓ Branch 0 taken 547023 times.
✓ Branch 1 taken 531247 times.
1078270 for (unsigned i = 0; i < sch->nb_mux; i++) {
442 547023 const SchMux *mux = &sch->mux[i];
443
444
2/2
✓ Branch 0 taken 604870 times.
✓ Branch 1 taken 531499 times.
1136369 for (unsigned j = 0; j < mux->nb_streams; j++) {
445 604870 const SchMuxStream *ms = &mux->streams[j];
446
447
4/4
✓ Branch 0 taken 47146 times.
✓ Branch 1 taken 557724 times.
✓ Branch 2 taken 38437 times.
✓ Branch 3 taken 8709 times.
604870 if (ms->source_finished && !count_finished)
448 38437 continue;
449
2/2
✓ Branch 0 taken 15524 times.
✓ Branch 1 taken 550909 times.
566433 if (ms->last_dts == AV_NOPTS_VALUE)
450 15524 return AV_NOPTS_VALUE;
451
452 550909 min_dts = FFMIN(min_dts, ms->last_dts);
453 }
454 }
455
456
2/2
✓ Branch 0 taken 502242 times.
✓ Branch 1 taken 29005 times.
531247 return min_dts == INT64_MAX ? AV_NOPTS_VALUE : min_dts;
457 }
458
459 8261 void sch_free(Scheduler **psch)
460 {
461 8261 Scheduler *sch = *psch;
462
463
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8261 times.
8261 if (!sch)
464 return;
465
466 8261 sch_stop(sch, NULL);
467
468
2/2
✓ Branch 0 taken 7211 times.
✓ Branch 1 taken 8261 times.
15472 for (unsigned i = 0; i < sch->nb_demux; i++) {
469 7211 SchDemux *d = &sch->demux[i];
470
471
2/2
✓ Branch 0 taken 7467 times.
✓ Branch 1 taken 7211 times.
14678 for (unsigned j = 0; j < d->nb_streams; j++) {
472 7467 SchDemuxStream *ds = &d->streams[j];
473 7467 av_freep(&ds->dst);
474 7467 av_freep(&ds->dst_finished);
475 }
476 7211 av_freep(&d->streams);
477
478 7211 av_packet_free(&d->send_pkt);
479
480 7211 waiter_uninit(&d->waiter);
481 }
482 8261 av_freep(&sch->demux);
483
484
2/2
✓ Branch 0 taken 8262 times.
✓ Branch 1 taken 8261 times.
16523 for (unsigned i = 0; i < sch->nb_mux; i++) {
485 8262 SchMux *mux = &sch->mux[i];
486
487
2/2
✓ Branch 0 taken 8745 times.
✓ Branch 1 taken 8262 times.
17007 for (unsigned j = 0; j < mux->nb_streams; j++) {
488 8745 SchMuxStream *ms = &mux->streams[j];
489
490
1/2
✓ Branch 0 taken 8745 times.
✗ Branch 1 not taken.
8745 if (ms->pre_mux_queue.fifo) {
491 AVPacket *pkt;
492
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 8745 times.
8745 while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0)
493 av_packet_free(&pkt);
494 8745 av_fifo_freep2(&ms->pre_mux_queue.fifo);
495 }
496
497 8745 av_freep(&ms->sub_heartbeat_dst);
498 }
499 8262 av_freep(&mux->streams);
500
501 8262 av_packet_free(&mux->sub_heartbeat_pkt);
502
503 8262 tq_free(&mux->queue);
504 }
505 8261 av_freep(&sch->mux);
506
507
2/2
✓ Branch 0 taken 6793 times.
✓ Branch 1 taken 8261 times.
15054 for (unsigned i = 0; i < sch->nb_dec; i++) {
508 6793 SchDec *dec = &sch->dec[i];
509
510 6793 tq_free(&dec->queue);
511
512 6793 av_thread_message_queue_free(&dec->queue_end_ts);
513
514
2/2
✓ Branch 0 taken 6799 times.
✓ Branch 1 taken 6793 times.
13592 for (unsigned j = 0; j < dec->nb_outputs; j++) {
515 6799 SchDecOutput *o = &dec->outputs[j];
516
517 6799 av_freep(&o->dst);
518 6799 av_freep(&o->dst_finished);
519 }
520
521 6793 av_freep(&dec->outputs);
522
523 6793 av_frame_free(&dec->send_frame);
524 }
525 8261 av_freep(&sch->dec);
526
527
2/2
✓ Branch 0 taken 8044 times.
✓ Branch 1 taken 8261 times.
16305 for (unsigned i = 0; i < sch->nb_enc; i++) {
528 8044 SchEnc *enc = &sch->enc[i];
529
530 8044 tq_free(&enc->queue);
531
532 8044 av_packet_free(&enc->send_pkt);
533
534 8044 av_freep(&enc->dst);
535 8044 av_freep(&enc->dst_finished);
536 }
537 8261 av_freep(&sch->enc);
538
539
2/2
✓ Branch 0 taken 3085 times.
✓ Branch 1 taken 8261 times.
11346 for (unsigned i = 0; i < sch->nb_sq_enc; i++) {
540 3085 SchSyncQueue *sq = &sch->sq_enc[i];
541 3085 sq_free(&sq->sq);
542 3085 av_frame_free(&sq->frame);
543 3085 pthread_mutex_destroy(&sq->lock);
544 3085 av_freep(&sq->enc_idx);
545 }
546 8261 av_freep(&sch->sq_enc);
547
548
2/2
✓ Branch 0 taken 7876 times.
✓ Branch 1 taken 8261 times.
16137 for (unsigned i = 0; i < sch->nb_filters; i++) {
549 7876 SchFilterGraph *fg = &sch->filters[i];
550
551 7876 tq_free(&fg->queue);
552
553 7876 av_freep(&fg->inputs);
554 7876 av_freep(&fg->outputs);
555
556 7876 waiter_uninit(&fg->waiter);
557 }
558 8261 av_freep(&sch->filters);
559
560 8261 av_freep(&sch->sdp_filename);
561
562 8261 pthread_mutex_destroy(&sch->schedule_lock);
563
564 8261 pthread_mutex_destroy(&sch->mux_ready_lock);
565
566 8261 pthread_mutex_destroy(&sch->finish_lock);
567 8261 pthread_cond_destroy(&sch->finish_cond);
568
569 8261 av_freep(psch);
570 }
571
572 static const AVClass scheduler_class = {
573 .class_name = "Scheduler",
574 .version = LIBAVUTIL_VERSION_INT,
575 };
576
577 8261 Scheduler *sch_alloc(void)
578 {
579 Scheduler *sch;
580 int ret;
581
582 8261 sch = av_mallocz(sizeof(*sch));
583
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8261 times.
8261 if (!sch)
584 return NULL;
585
586 8261 sch->class = &scheduler_class;
587 8261 sch->sdp_auto = 1;
588
589 8261 ret = pthread_mutex_init(&sch->schedule_lock, NULL);
590
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8261 times.
8261 if (ret)
591 goto fail;
592
593 8261 ret = pthread_mutex_init(&sch->mux_ready_lock, NULL);
594
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8261 times.
8261 if (ret)
595 goto fail;
596
597 8261 ret = pthread_mutex_init(&sch->finish_lock, NULL);
598
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8261 times.
8261 if (ret)
599 goto fail;
600
601 8261 ret = pthread_cond_init(&sch->finish_cond, NULL);
602
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8261 times.
8261 if (ret)
603 goto fail;
604
605 8261 return sch;
606 fail:
607 sch_free(&sch);
608 return NULL;
609 }
610
611 int sch_sdp_filename(Scheduler *sch, const char *sdp_filename)
612 {
613 av_freep(&sch->sdp_filename);
614 sch->sdp_filename = av_strdup(sdp_filename);
615 return sch->sdp_filename ? 0 : AVERROR(ENOMEM);
616 }
617
618 static const AVClass sch_mux_class = {
619 .class_name = "SchMux",
620 .version = LIBAVUTIL_VERSION_INT,
621 .parent_log_context_offset = offsetof(SchMux, task.func_arg),
622 };
623
624 8262 int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
625 void *arg, int sdp_auto, unsigned thread_queue_size)
626 {
627 8262 const unsigned idx = sch->nb_mux;
628
629 SchMux *mux;
630 int ret;
631
632 8262 ret = GROW_ARRAY(sch->mux, sch->nb_mux);
633
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8262 times.
8262 if (ret < 0)
634 return ret;
635
636 8262 mux = &sch->mux[idx];
637 8262 mux->class = &sch_mux_class;
638 8262 mux->init = init;
639 8262 mux->queue_size = thread_queue_size;
640
641 8262 task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg);
642
643 8262 sch->sdp_auto &= sdp_auto;
644
645 8262 return idx;
646 }
647
648 8745 int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx)
649 {
650 SchMux *mux;
651 SchMuxStream *ms;
652 unsigned stream_idx;
653 int ret;
654
655
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8745 times.
8745 av_assert0(mux_idx < sch->nb_mux);
656 8745 mux = &sch->mux[mux_idx];
657
658 8745 ret = GROW_ARRAY(mux->streams, mux->nb_streams);
659
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8745 times.
8745 if (ret < 0)
660 return ret;
661 8745 stream_idx = mux->nb_streams - 1;
662
663 8745 ms = &mux->streams[stream_idx];
664
665 8745 ms->pre_mux_queue.fifo = av_fifo_alloc2(8, sizeof(AVPacket*), 0);
666
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8745 times.
8745 if (!ms->pre_mux_queue.fifo)
667 return AVERROR(ENOMEM);
668
669 8745 ms->last_dts = AV_NOPTS_VALUE;
670
671 8745 return stream_idx;
672 }
673
674 static const AVClass sch_demux_class = {
675 .class_name = "SchDemux",
676 .version = LIBAVUTIL_VERSION_INT,
677 .parent_log_context_offset = offsetof(SchDemux, task.func_arg),
678 };
679
680 7211 int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *ctx)
681 {
682 7211 const unsigned idx = sch->nb_demux;
683
684 SchDemux *d;
685 int ret;
686
687 7211 ret = GROW_ARRAY(sch->demux, sch->nb_demux);
688
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7211 times.
7211 if (ret < 0)
689 return ret;
690
691 7211 d = &sch->demux[idx];
692
693 7211 task_init(sch, &d->task, SCH_NODE_TYPE_DEMUX, idx, func, ctx);
694
695 7211 d->class = &sch_demux_class;
696 7211 d->send_pkt = av_packet_alloc();
697
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7211 times.
7211 if (!d->send_pkt)
698 return AVERROR(ENOMEM);
699
700 7211 ret = waiter_init(&d->waiter);
701
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7211 times.
7211 if (ret < 0)
702 return ret;
703
704 7211 return idx;
705 }
706
707 7467 int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
708 {
709 SchDemux *d;
710 int ret;
711
712
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7467 times.
7467 av_assert0(demux_idx < sch->nb_demux);
713 7467 d = &sch->demux[demux_idx];
714
715 7467 ret = GROW_ARRAY(d->streams, d->nb_streams);
716
1/2
✓ Branch 0 taken 7467 times.
✗ Branch 1 not taken.
7467 return ret < 0 ? ret : d->nb_streams - 1;
717 }
718
719 6799 int sch_add_dec_output(Scheduler *sch, unsigned dec_idx)
720 {
721 SchDec *dec;
722 int ret;
723
724
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6799 times.
6799 av_assert0(dec_idx < sch->nb_dec);
725 6799 dec = &sch->dec[dec_idx];
726
727 6799 ret = GROW_ARRAY(dec->outputs, dec->nb_outputs);
728
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6799 times.
6799 if (ret < 0)
729 return ret;
730
731 6799 return dec->nb_outputs - 1;
732 }
733
734 static const AVClass sch_dec_class = {
735 .class_name = "SchDec",
736 .version = LIBAVUTIL_VERSION_INT,
737 .parent_log_context_offset = offsetof(SchDec, task.func_arg),
738 };
739
740 6793 int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts)
741 {
742 6793 const unsigned idx = sch->nb_dec;
743
744 SchDec *dec;
745 int ret;
746
747 6793 ret = GROW_ARRAY(sch->dec, sch->nb_dec);
748
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6793 times.
6793 if (ret < 0)
749 return ret;
750
751 6793 dec = &sch->dec[idx];
752
753 6793 task_init(sch, &dec->task, SCH_NODE_TYPE_DEC, idx, func, ctx);
754
755 6793 dec->class = &sch_dec_class;
756 6793 dec->send_frame = av_frame_alloc();
757
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6793 times.
6793 if (!dec->send_frame)
758 return AVERROR(ENOMEM);
759
760 6793 ret = sch_add_dec_output(sch, idx);
761
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6793 times.
6793 if (ret < 0)
762 return ret;
763
764 6793 ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS);
765
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6793 times.
6793 if (ret < 0)
766 return ret;
767
768
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 6792 times.
6793 if (send_end_ts) {
769 1 ret = av_thread_message_queue_alloc(&dec->queue_end_ts, 1, sizeof(Timestamp));
770
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (ret < 0)
771 return ret;
772 }
773
774 6793 return idx;
775 }
776
777 static const AVClass sch_enc_class = {
778 .class_name = "SchEnc",
779 .version = LIBAVUTIL_VERSION_INT,
780 .parent_log_context_offset = offsetof(SchEnc, task.func_arg),
781 };
782
783 8044 int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx,
784 int (*open_cb)(void *opaque, const AVFrame *frame))
785 {
786 8044 const unsigned idx = sch->nb_enc;
787
788 SchEnc *enc;
789 int ret;
790
791 8044 ret = GROW_ARRAY(sch->enc, sch->nb_enc);
792
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8044 times.
8044 if (ret < 0)
793 return ret;
794
795 8044 enc = &sch->enc[idx];
796
797 8044 enc->class = &sch_enc_class;
798 8044 enc->open_cb = open_cb;
799 8044 enc->sq_idx[0] = -1;
800 8044 enc->sq_idx[1] = -1;
801
802 8044 task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx);
803
804 8044 enc->send_pkt = av_packet_alloc();
805
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8044 times.
8044 if (!enc->send_pkt)
806 return AVERROR(ENOMEM);
807
808 8044 ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES);
809
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8044 times.
8044 if (ret < 0)
810 return ret;
811
812 8044 return idx;
813 }
814
815 static const AVClass sch_fg_class = {
816 .class_name = "SchFilterGraph",
817 .version = LIBAVUTIL_VERSION_INT,
818 .parent_log_context_offset = offsetof(SchFilterGraph, task.func_arg),
819 };
820
821 7876 int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
822 SchThreadFunc func, void *ctx)
823 {
824 7876 const unsigned idx = sch->nb_filters;
825
826 SchFilterGraph *fg;
827 int ret;
828
829 7876 ret = GROW_ARRAY(sch->filters, sch->nb_filters);
830
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7876 times.
7876 if (ret < 0)
831 return ret;
832 7876 fg = &sch->filters[idx];
833
834 7876 fg->class = &sch_fg_class;
835
836 7876 task_init(sch, &fg->task, SCH_NODE_TYPE_FILTER_IN, idx, func, ctx);
837
838
2/2
✓ Branch 0 taken 6737 times.
✓ Branch 1 taken 1139 times.
7876 if (nb_inputs) {
839 6737 fg->inputs = av_calloc(nb_inputs, sizeof(*fg->inputs));
840
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6737 times.
6737 if (!fg->inputs)
841 return AVERROR(ENOMEM);
842 6737 fg->nb_inputs = nb_inputs;
843 }
844
845
1/2
✓ Branch 0 taken 7876 times.
✗ Branch 1 not taken.
7876 if (nb_outputs) {
846 7876 fg->outputs = av_calloc(nb_outputs, sizeof(*fg->outputs));
847
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7876 times.
7876 if (!fg->outputs)
848 return AVERROR(ENOMEM);
849 7876 fg->nb_outputs = nb_outputs;
850 }
851
852 7876 ret = waiter_init(&fg->waiter);
853
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7876 times.
7876 if (ret < 0)
854 return ret;
855
856 7876 ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES);
857
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7876 times.
7876 if (ret < 0)
858 return ret;
859
860 7876 return idx;
861 }
862
863 3085 int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx)
864 {
865 SchSyncQueue *sq;
866 int ret;
867
868 3085 ret = GROW_ARRAY(sch->sq_enc, sch->nb_sq_enc);
869
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3085 times.
3085 if (ret < 0)
870 return ret;
871 3085 sq = &sch->sq_enc[sch->nb_sq_enc - 1];
872
873 3085 sq->sq = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, logctx);
874
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3085 times.
3085 if (!sq->sq)
875 return AVERROR(ENOMEM);
876
877 3085 sq->frame = av_frame_alloc();
878
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3085 times.
3085 if (!sq->frame)
879 return AVERROR(ENOMEM);
880
881 3085 ret = pthread_mutex_init(&sq->lock, NULL);
882
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3085 times.
3085 if (ret)
883 return AVERROR(ret);
884
885 3085 return sq - sch->sq_enc;
886 }
887
888 3147 int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx,
889 int limiting, uint64_t max_frames)
890 {
891 SchSyncQueue *sq;
892 SchEnc *enc;
893 int ret;
894
895
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3147 times.
3147 av_assert0(sq_idx < sch->nb_sq_enc);
896 3147 sq = &sch->sq_enc[sq_idx];
897
898
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3147 times.
3147 av_assert0(enc_idx < sch->nb_enc);
899 3147 enc = &sch->enc[enc_idx];
900
901 3147 ret = GROW_ARRAY(sq->enc_idx, sq->nb_enc_idx);
902
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3147 times.
3147 if (ret < 0)
903 return ret;
904 3147 sq->enc_idx[sq->nb_enc_idx - 1] = enc_idx;
905
906 3147 ret = sq_add_stream(sq->sq, limiting);
907
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3147 times.
3147 if (ret < 0)
908 return ret;
909
910 3147 enc->sq_idx[0] = sq_idx;
911 3147 enc->sq_idx[1] = ret;
912
913
2/2
✓ Branch 0 taken 2959 times.
✓ Branch 1 taken 188 times.
3147 if (max_frames != INT64_MAX)
914 2959 sq_limit_frames(sq->sq, enc->sq_idx[1], max_frames);
915
916 3147 return 0;
917 }
918
919 30404 int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst)
920 {
921 int ret;
922
923
4/5
✓ Branch 0 taken 7493 times.
✓ Branch 1 taken 6860 times.
✓ Branch 2 taken 8006 times.
✓ Branch 3 taken 8045 times.
✗ Branch 4 not taken.
30404 switch (src.type) {
924 7493 case SCH_NODE_TYPE_DEMUX: {
925 SchDemuxStream *ds;
926
927
2/4
✓ Branch 0 taken 7493 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 7493 times.
7493 av_assert0(src.idx < sch->nb_demux &&
928 src.idx_stream < sch->demux[src.idx].nb_streams);
929 7493 ds = &sch->demux[src.idx].streams[src.idx_stream];
930
931 7493 ret = GROW_ARRAY(ds->dst, ds->nb_dst);
932
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7493 times.
7493 if (ret < 0)
933 return ret;
934
935 7493 ds->dst[ds->nb_dst - 1] = dst;
936
937 // demuxed packets go to decoding or streamcopy
938
2/3
✓ Branch 0 taken 6792 times.
✓ Branch 1 taken 701 times.
✗ Branch 2 not taken.
7493 switch (dst.type) {
939 6792 case SCH_NODE_TYPE_DEC: {
940 SchDec *dec;
941
942
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6792 times.
6792 av_assert0(dst.idx < sch->nb_dec);
943 6792 dec = &sch->dec[dst.idx];
944
945
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6792 times.
6792 av_assert0(!dec->src.type);
946 6792 dec->src = src;
947 6792 break;
948 }
949 701 case SCH_NODE_TYPE_MUX: {
950 SchMuxStream *ms;
951
952
2/4
✓ Branch 0 taken 701 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 701 times.
701 av_assert0(dst.idx < sch->nb_mux &&
953 dst.idx_stream < sch->mux[dst.idx].nb_streams);
954 701 ms = &sch->mux[dst.idx].streams[dst.idx_stream];
955
956
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 701 times.
701 av_assert0(!ms->src.type);
957 701 ms->src = src;
958
959 701 break;
960 }
961 default: av_assert0(0);
962 }
963
964 7493 break;
965 }
966 6860 case SCH_NODE_TYPE_DEC: {
967 SchDec *dec;
968 SchDecOutput *o;
969
970
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6860 times.
6860 av_assert0(src.idx < sch->nb_dec);
971 6860 dec = &sch->dec[src.idx];
972
973
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6860 times.
6860 av_assert0(src.idx_stream < dec->nb_outputs);
974 6860 o = &dec->outputs[src.idx_stream];
975
976 6860 ret = GROW_ARRAY(o->dst, o->nb_dst);
977
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6860 times.
6860 if (ret < 0)
978 return ret;
979
980 6860 o->dst[o->nb_dst - 1] = dst;
981
982 // decoded frames go to filters or encoding
983
2/3
✓ Branch 0 taken 6822 times.
✓ Branch 1 taken 38 times.
✗ Branch 2 not taken.
6860 switch (dst.type) {
984 6822 case SCH_NODE_TYPE_FILTER_IN: {
985 SchFilterIn *fi;
986
987
2/4
✓ Branch 0 taken 6822 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6822 times.
6822 av_assert0(dst.idx < sch->nb_filters &&
988 dst.idx_stream < sch->filters[dst.idx].nb_inputs);
989 6822 fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
990
991
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6822 times.
6822 av_assert0(!fi->src.type);
992 6822 fi->src = src;
993 6822 break;
994 }
995 38 case SCH_NODE_TYPE_ENC: {
996 SchEnc *enc;
997
998
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 38 times.
38 av_assert0(dst.idx < sch->nb_enc);
999 38 enc = &sch->enc[dst.idx];
1000
1001
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 38 times.
38 av_assert0(!enc->src.type);
1002 38 enc->src = src;
1003 38 break;
1004 }
1005 default: av_assert0(0);
1006 }
1007
1008 6860 break;
1009 }
1010 8006 case SCH_NODE_TYPE_FILTER_OUT: {
1011 SchFilterOut *fo;
1012
1013
2/4
✓ Branch 0 taken 8006 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 8006 times.
8006 av_assert0(src.idx < sch->nb_filters &&
1014 src.idx_stream < sch->filters[src.idx].nb_outputs);
1015 8006 fo = &sch->filters[src.idx].outputs[src.idx_stream];
1016
1017
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8006 times.
8006 av_assert0(!fo->dst.type);
1018 8006 fo->dst = dst;
1019
1020 // filtered frames go to encoding or another filtergraph
1021
1/3
✓ Branch 0 taken 8006 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
8006 switch (dst.type) {
1022 8006 case SCH_NODE_TYPE_ENC: {
1023 SchEnc *enc;
1024
1025
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8006 times.
8006 av_assert0(dst.idx < sch->nb_enc);
1026 8006 enc = &sch->enc[dst.idx];
1027
1028
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8006 times.
8006 av_assert0(!enc->src.type);
1029 8006 enc->src = src;
1030 8006 break;
1031 }
1032 case SCH_NODE_TYPE_FILTER_IN: {
1033 SchFilterIn *fi;
1034
1035 av_assert0(dst.idx < sch->nb_filters &&
1036 dst.idx_stream < sch->filters[dst.idx].nb_inputs);
1037 fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
1038
1039 av_assert0(!fi->src.type);
1040 fi->src = src;
1041 break;
1042 }
1043 default: av_assert0(0);
1044 }
1045
1046
1047 8006 break;
1048 }
1049 8045 case SCH_NODE_TYPE_ENC: {
1050 SchEnc *enc;
1051
1052
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8045 times.
8045 av_assert0(src.idx < sch->nb_enc);
1053 8045 enc = &sch->enc[src.idx];
1054
1055 8045 ret = GROW_ARRAY(enc->dst, enc->nb_dst);
1056
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8045 times.
8045 if (ret < 0)
1057 return ret;
1058
1059 8045 enc->dst[enc->nb_dst - 1] = dst;
1060
1061 // encoding packets go to muxing or decoding
1062
2/3
✓ Branch 0 taken 8044 times.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
8045 switch (dst.type) {
1063 8044 case SCH_NODE_TYPE_MUX: {
1064 SchMuxStream *ms;
1065
1066
2/4
✓ Branch 0 taken 8044 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 8044 times.
8044 av_assert0(dst.idx < sch->nb_mux &&
1067 dst.idx_stream < sch->mux[dst.idx].nb_streams);
1068 8044 ms = &sch->mux[dst.idx].streams[dst.idx_stream];
1069
1070
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8044 times.
8044 av_assert0(!ms->src.type);
1071 8044 ms->src = src;
1072
1073 8044 break;
1074 }
1075 1 case SCH_NODE_TYPE_DEC: {
1076 SchDec *dec;
1077
1078
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(dst.idx < sch->nb_dec);
1079 1 dec = &sch->dec[dst.idx];
1080
1081
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(!dec->src.type);
1082 1 dec->src = src;
1083
1084 1 break;
1085 }
1086 default: av_assert0(0);
1087 }
1088
1089 8045 break;
1090 }
1091 default: av_assert0(0);
1092 }
1093
1094 30404 return 0;
1095 }
1096
1097 8262 static int mux_task_start(SchMux *mux)
1098 {
1099 8262 int ret = 0;
1100
1101 8262 ret = task_start(&mux->task);
1102
1/2
✓ Branch 0 taken 8262 times.
✗ Branch 1 not taken.
8262 if (ret < 0)
1103 return ret;
1104
1105 /* flush the pre-muxing queues */
1106 3390 while (1) {
1107 11652 int min_stream = -1;
1108 11652 Timestamp min_ts = { .ts = AV_NOPTS_VALUE };
1109
1110 AVPacket *pkt;
1111
1112 // find the stream with the earliest dts or EOF in pre-muxing queue
1113
2/2
✓ Branch 0 taken 22167 times.
✓ Branch 1 taken 11537 times.
33704 for (unsigned i = 0; i < mux->nb_streams; i++) {
1114 22167 SchMuxStream *ms = &mux->streams[i];
1115
1116
2/2
✓ Branch 1 taken 16537 times.
✓ Branch 2 taken 5630 times.
22167 if (av_fifo_peek(ms->pre_mux_queue.fifo, &pkt, 1, 0) < 0)
1117 16537 continue;
1118
1119
4/4
✓ Branch 0 taken 5528 times.
✓ Branch 1 taken 102 times.
✓ Branch 2 taken 13 times.
✓ Branch 3 taken 5515 times.
5630 if (!pkt || pkt->dts == AV_NOPTS_VALUE) {
1120 115 min_stream = i;
1121 115 break;
1122 }
1123
1124
4/4
✓ Branch 0 taken 2214 times.
✓ Branch 1 taken 3301 times.
✓ Branch 2 taken 38 times.
✓ Branch 3 taken 2176 times.
7729 if (min_ts.ts == AV_NOPTS_VALUE ||
1125 2214 av_compare_ts(min_ts.ts, min_ts.tb, pkt->dts, pkt->time_base) > 0) {
1126 3339 min_stream = i;
1127 3339 min_ts = (Timestamp){ .ts = pkt->dts, .tb = pkt->time_base };
1128 }
1129 }
1130
1131
2/2
✓ Branch 0 taken 3390 times.
✓ Branch 1 taken 8262 times.
11652 if (min_stream >= 0) {
1132 3390 SchMuxStream *ms = &mux->streams[min_stream];
1133
1134 3390 ret = av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1);
1135
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3390 times.
3390 av_assert0(ret >= 0);
1136
1137
2/2
✓ Branch 0 taken 3288 times.
✓ Branch 1 taken 102 times.
3390 if (pkt) {
1138
2/2
✓ Branch 0 taken 3271 times.
✓ Branch 1 taken 17 times.
3288 if (!ms->init_eof)
1139 3271 ret = tq_send(mux->queue, min_stream, pkt);
1140 3288 av_packet_free(&pkt);
1141
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 3287 times.
3288 if (ret == AVERROR_EOF)
1142 1 ms->init_eof = 1;
1143
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3287 times.
3287 else if (ret < 0)
1144 return ret;
1145 } else
1146 102 tq_send_finish(mux->queue, min_stream);
1147
1148 3390 continue;
1149 }
1150
1151 8262 break;
1152 }
1153
1154 8262 atomic_store(&mux->mux_started, 1);
1155
1156 8262 return 0;
1157 }
1158
1159 int print_sdp(const char *filename);
1160
1161 8262 static int mux_init(Scheduler *sch, SchMux *mux)
1162 {
1163 int ret;
1164
1165 8262 ret = mux->init(mux->task.func_arg);
1166
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8262 times.
8262 if (ret < 0)
1167 return ret;
1168
1169 8262 sch->nb_mux_ready++;
1170
1171
2/4
✓ Branch 0 taken 8262 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 8262 times.
8262 if (sch->sdp_filename || sch->sdp_auto) {
1172 if (sch->nb_mux_ready < sch->nb_mux)
1173 return 0;
1174
1175 ret = print_sdp(sch->sdp_filename);
1176 if (ret < 0) {
1177 av_log(sch, AV_LOG_ERROR, "Error writing the SDP.\n");
1178 return ret;
1179 }
1180
1181 /* SDP is written only after all the muxers are ready, so now we
1182 * start ALL the threads */
1183 for (unsigned i = 0; i < sch->nb_mux; i++) {
1184 ret = mux_task_start(&sch->mux[i]);
1185 if (ret < 0)
1186 return ret;
1187 }
1188 } else {
1189 8262 ret = mux_task_start(mux);
1190
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8262 times.
8262 if (ret < 0)
1191 return ret;
1192 }
1193
1194 8262 return 0;
1195 }
1196
1197 8745 void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1198 size_t data_threshold, int max_packets)
1199 {
1200 SchMux *mux;
1201 SchMuxStream *ms;
1202
1203
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8745 times.
8745 av_assert0(mux_idx < sch->nb_mux);
1204 8745 mux = &sch->mux[mux_idx];
1205
1206
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8745 times.
8745 av_assert0(stream_idx < mux->nb_streams);
1207 8745 ms = &mux->streams[stream_idx];
1208
1209 8745 ms->pre_mux_queue.max_packets = max_packets;
1210 8745 ms->pre_mux_queue.data_threshold = data_threshold;
1211 8745 }
1212
1213 8745 int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
1214 {
1215 SchMux *mux;
1216 8745 int ret = 0;
1217
1218
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8745 times.
8745 av_assert0(mux_idx < sch->nb_mux);
1219 8745 mux = &sch->mux[mux_idx];
1220
1221
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8745 times.
8745 av_assert0(stream_idx < mux->nb_streams);
1222
1223 8745 pthread_mutex_lock(&sch->mux_ready_lock);
1224
1225
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8745 times.
8745 av_assert0(mux->nb_streams_ready < mux->nb_streams);
1226
1227 // this may be called during initialization - do not start
1228 // threads before sch_start() is called
1229
2/2
✓ Branch 0 taken 8262 times.
✓ Branch 1 taken 483 times.
8745 if (++mux->nb_streams_ready == mux->nb_streams &&
1230
2/2
✓ Branch 0 taken 7797 times.
✓ Branch 1 taken 465 times.
8262 sch->state >= SCH_STATE_STARTED)
1231 7797 ret = mux_init(sch, mux);
1232
1233 8745 pthread_mutex_unlock(&sch->mux_ready_lock);
1234
1235 8745 return ret;
1236 }
1237
1238 1 int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1239 unsigned dec_idx)
1240 {
1241 SchMux *mux;
1242 SchMuxStream *ms;
1243 1 int ret = 0;
1244
1245
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(mux_idx < sch->nb_mux);
1246 1 mux = &sch->mux[mux_idx];
1247
1248
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(stream_idx < mux->nb_streams);
1249 1 ms = &mux->streams[stream_idx];
1250
1251 1 ret = GROW_ARRAY(ms->sub_heartbeat_dst, ms->nb_sub_heartbeat_dst);
1252
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (ret < 0)
1253 return ret;
1254
1255
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(dec_idx < sch->nb_dec);
1256 1 ms->sub_heartbeat_dst[ms->nb_sub_heartbeat_dst - 1] = dec_idx;
1257
1258
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (!mux->sub_heartbeat_pkt) {
1259 1 mux->sub_heartbeat_pkt = av_packet_alloc();
1260
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (!mux->sub_heartbeat_pkt)
1261 return AVERROR(ENOMEM);
1262 }
1263
1264 1 return 0;
1265 }
1266
1267 538200 static void unchoke_for_stream(Scheduler *sch, SchedulerNode src)
1268 {
1269 428498 while (1) {
1270 SchFilterGraph *fg;
1271
1272 // fed directly by a demuxer (i.e. not through a filtergraph)
1273
2/2
✓ Branch 0 taken 505921 times.
✓ Branch 1 taken 460777 times.
966698 if (src.type == SCH_NODE_TYPE_DEMUX) {
1274 505921 sch->demux[src.idx].waiter.choked_next = 0;
1275 505921 return;
1276 }
1277
1278
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 460777 times.
460777 av_assert0(src.type == SCH_NODE_TYPE_FILTER_OUT);
1279 460777 fg = &sch->filters[src.idx];
1280
1281 // the filtergraph contains internal sources and
1282 // requested to be scheduled directly
1283
2/2
✓ Branch 0 taken 32279 times.
✓ Branch 1 taken 428498 times.
460777 if (fg->best_input == fg->nb_inputs) {
1284 32279 fg->waiter.choked_next = 0;
1285 32279 return;
1286 }
1287
1288 428498 src = fg->inputs[fg->best_input].src_sched;
1289 }
1290 }
1291
1292 542914 static void schedule_update_locked(Scheduler *sch)
1293 {
1294 int64_t dts;
1295 542914 int have_unchoked = 0;
1296
1297 // on termination request all waiters are choked,
1298 // we are not to unchoke them
1299
2/2
✓ Branch 0 taken 4403 times.
✓ Branch 1 taken 538511 times.
542914 if (atomic_load(&sch->terminate))
1300 4403 return;
1301
1302 538511 dts = trailing_dts(sch, 0);
1303
1304 538511 atomic_store(&sch->last_dts, dts);
1305
1306 // initialize our internal state
1307
2/2
✓ Branch 0 taken 1077022 times.
✓ Branch 1 taken 538511 times.
1615533 for (unsigned type = 0; type < 2; type++)
1308
4/4
✓ Branch 0 taken 1024541 times.
✓ Branch 1 taken 1051005 times.
✓ Branch 2 taken 998524 times.
✓ Branch 3 taken 1077022 times.
2075546 for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1309
2/2
✓ Branch 0 taken 486030 times.
✓ Branch 1 taken 512494 times.
998524 SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1310 998524 w->choked_prev = atomic_load(&w->choked);
1311 998524 w->choked_next = 1;
1312 }
1313
1314 // figure out the sources that are allowed to proceed
1315
2/2
✓ Branch 0 taken 538778 times.
✓ Branch 1 taken 538511 times.
1077289 for (unsigned i = 0; i < sch->nb_mux; i++) {
1316 538778 SchMux *mux = &sch->mux[i];
1317
1318
2/2
✓ Branch 0 taken 605177 times.
✓ Branch 1 taken 538778 times.
1143955 for (unsigned j = 0; j < mux->nb_streams; j++) {
1319 605177 SchMuxStream *ms = &mux->streams[j];
1320
1321 // unblock sources for output streams that are not finished
1322 // and not too far ahead of the trailing stream
1323
2/2
✓ Branch 0 taken 38686 times.
✓ Branch 1 taken 566491 times.
605177 if (ms->source_finished)
1324 38686 continue;
1325
4/4
✓ Branch 0 taken 31451 times.
✓ Branch 1 taken 535040 times.
✓ Branch 2 taken 10535 times.
✓ Branch 3 taken 20916 times.
566491 if (dts == AV_NOPTS_VALUE && ms->last_dts != AV_NOPTS_VALUE)
1326 10535 continue;
1327
4/4
✓ Branch 0 taken 535040 times.
✓ Branch 1 taken 20916 times.
✓ Branch 2 taken 17756 times.
✓ Branch 3 taken 517284 times.
555956 if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE)
1328 17756 continue;
1329
1330 // resolve the source to unchoke
1331 538200 unchoke_for_stream(sch, ms->src_sched);
1332 538200 have_unchoked = 1;
1333 }
1334 }
1335
1336 // make sure to unchoke at least one source, if still available
1337
4/4
✓ Branch 0 taken 63506 times.
✓ Branch 1 taken 524482 times.
✓ Branch 2 taken 49477 times.
✓ Branch 3 taken 14029 times.
587988 for (unsigned type = 0; !have_unchoked && type < 2; type++)
1338
4/4
✓ Branch 0 taken 33845 times.
✓ Branch 1 taken 47750 times.
✓ Branch 2 taken 47094 times.
✓ Branch 3 taken 34501 times.
81595 for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1339
2/2
✓ Branch 0 taken 19816 times.
✓ Branch 1 taken 27278 times.
47094 int exited = type ? sch->filters[i].task_exited : sch->demux[i].task_exited;
1340
2/2
✓ Branch 0 taken 19816 times.
✓ Branch 1 taken 27278 times.
47094 SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1341
2/2
✓ Branch 0 taken 14976 times.
✓ Branch 1 taken 32118 times.
47094 if (!exited) {
1342 14976 w->choked_next = 0;
1343 14976 have_unchoked = 1;
1344 14976 break;
1345 }
1346 }
1347
1348
1349
2/2
✓ Branch 0 taken 1077022 times.
✓ Branch 1 taken 538511 times.
1615533 for (unsigned type = 0; type < 2; type++)
1350
4/4
✓ Branch 0 taken 1024541 times.
✓ Branch 1 taken 1051005 times.
✓ Branch 2 taken 998524 times.
✓ Branch 3 taken 1077022 times.
2075546 for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1351
2/2
✓ Branch 0 taken 486030 times.
✓ Branch 1 taken 512494 times.
998524 SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1352
2/2
✓ Branch 0 taken 18395 times.
✓ Branch 1 taken 980129 times.
998524 if (w->choked_prev != w->choked_next)
1353 18395 waiter_set(w, w->choked_next);
1354 }
1355
1356 }
1357
1358 enum {
1359 CYCLE_NODE_NEW = 0,
1360 CYCLE_NODE_STARTED,
1361 CYCLE_NODE_DONE,
1362 };
1363
1364 static int
1365 7876 check_acyclic_for_output(const Scheduler *sch, SchedulerNode src,
1366 uint8_t *filters_visited, SchedulerNode *filters_stack)
1367 {
1368 7876 unsigned nb_filters_stack = 0;
1369
1370 7876 memset(filters_visited, 0, sch->nb_filters * sizeof(*filters_visited));
1371
1372 6824 while (1) {
1373 14700 const SchFilterGraph *fg = &sch->filters[src.idx];
1374
1375 14700 filters_visited[src.idx] = CYCLE_NODE_STARTED;
1376
1377 // descend into every input, depth first
1378
2/2
✓ Branch 0 taken 6823 times.
✓ Branch 1 taken 7877 times.
14700 if (src.idx_stream < fg->nb_inputs) {
1379 6823 const SchFilterIn *fi = &fg->inputs[src.idx_stream++];
1380
1381 // connected to demuxer, no cycles possible
1382
2/2
✓ Branch 0 taken 6822 times.
✓ Branch 1 taken 1 times.
6823 if (fi->src_sched.type == SCH_NODE_TYPE_DEMUX)
1383 6823 continue;
1384
1385 // otherwise connected to another filtergraph
1386
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(fi->src_sched.type == SCH_NODE_TYPE_FILTER_OUT);
1387
1388 // found a cycle
1389
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (filters_visited[fi->src_sched.idx] == CYCLE_NODE_STARTED)
1390 return AVERROR(EINVAL);
1391
1392 // place current position on stack and descend
1393
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(nb_filters_stack < sch->nb_filters);
1394 1 filters_stack[nb_filters_stack++] = src;
1395 1 src = (SchedulerNode){ .idx = fi->src_sched.idx, .idx_stream = 0 };
1396 1 continue;
1397 }
1398
1399 7877 filters_visited[src.idx] = CYCLE_NODE_DONE;
1400
1401 // previous search finished,
1402
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 7876 times.
7877 if (nb_filters_stack) {
1403 1 src = filters_stack[--nb_filters_stack];
1404 1 continue;
1405 }
1406 7876 return 0;
1407 }
1408 }
1409
1410 8260 static int check_acyclic(Scheduler *sch)
1411 {
1412 8260 uint8_t *filters_visited = NULL;
1413 8260 SchedulerNode *filters_stack = NULL;
1414
1415 8260 int ret = 0;
1416
1417
2/2
✓ Branch 0 taken 500 times.
✓ Branch 1 taken 7760 times.
8260 if (!sch->nb_filters)
1418 500 return 0;
1419
1420 7760 filters_visited = av_malloc_array(sch->nb_filters, sizeof(*filters_visited));
1421
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7760 times.
7760 if (!filters_visited)
1422 return AVERROR(ENOMEM);
1423
1424 7760 filters_stack = av_malloc_array(sch->nb_filters, sizeof(*filters_stack));
1425
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7760 times.
7760 if (!filters_stack) {
1426 ret = AVERROR(ENOMEM);
1427 goto fail;
1428 }
1429
1430 // trace the transcoding graph upstream from every filtegraph
1431
2/2
✓ Branch 0 taken 7876 times.
✓ Branch 1 taken 7760 times.
15636 for (unsigned i = 0; i < sch->nb_filters; i++) {
1432 7876 ret = check_acyclic_for_output(sch, (SchedulerNode){ .idx = i },
1433 filters_visited, filters_stack);
1434
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7876 times.
7876 if (ret < 0) {
1435 av_log(&sch->filters[i], AV_LOG_ERROR, "Transcoding graph has a cycle\n");
1436 goto fail;
1437 }
1438 }
1439
1440 7760 fail:
1441 7760 av_freep(&filters_visited);
1442 7760 av_freep(&filters_stack);
1443 7760 return ret;
1444 }
1445
1446 8260 static int start_prepare(Scheduler *sch)
1447 {
1448 int ret;
1449
1450
2/2
✓ Branch 0 taken 7211 times.
✓ Branch 1 taken 8260 times.
15471 for (unsigned i = 0; i < sch->nb_demux; i++) {
1451 7211 SchDemux *d = &sch->demux[i];
1452
1453
2/2
✓ Branch 0 taken 7467 times.
✓ Branch 1 taken 7211 times.
14678 for (unsigned j = 0; j < d->nb_streams; j++) {
1454 7467 SchDemuxStream *ds = &d->streams[j];
1455
1456
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7467 times.
7467 if (!ds->nb_dst) {
1457 av_log(d, AV_LOG_ERROR,
1458 "Demuxer stream %u not connected to any sink\n", j);
1459 return AVERROR(EINVAL);
1460 }
1461
1462 7467 ds->dst_finished = av_calloc(ds->nb_dst, sizeof(*ds->dst_finished));
1463
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7467 times.
7467 if (!ds->dst_finished)
1464 return AVERROR(ENOMEM);
1465 }
1466 }
1467
1468
2/2
✓ Branch 0 taken 6793 times.
✓ Branch 1 taken 8260 times.
15053 for (unsigned i = 0; i < sch->nb_dec; i++) {
1469 6793 SchDec *dec = &sch->dec[i];
1470
1471
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6793 times.
6793 if (!dec->src.type) {
1472 av_log(dec, AV_LOG_ERROR,
1473 "Decoder not connected to a source\n");
1474 return AVERROR(EINVAL);
1475 }
1476
1477
2/2
✓ Branch 0 taken 6799 times.
✓ Branch 1 taken 6793 times.
13592 for (unsigned j = 0; j < dec->nb_outputs; j++) {
1478 6799 SchDecOutput *o = &dec->outputs[j];
1479
1480
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6799 times.
6799 if (!o->nb_dst) {
1481 av_log(dec, AV_LOG_ERROR,
1482 "Decoder output %u not connected to any sink\n", j);
1483 return AVERROR(EINVAL);
1484 }
1485
1486 6799 o->dst_finished = av_calloc(o->nb_dst, sizeof(*o->dst_finished));
1487
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6799 times.
6799 if (!o->dst_finished)
1488 return AVERROR(ENOMEM);
1489 }
1490 }
1491
1492
2/2
✓ Branch 0 taken 8044 times.
✓ Branch 1 taken 8260 times.
16304 for (unsigned i = 0; i < sch->nb_enc; i++) {
1493 8044 SchEnc *enc = &sch->enc[i];
1494
1495
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8044 times.
8044 if (!enc->src.type) {
1496 av_log(enc, AV_LOG_ERROR,
1497 "Encoder not connected to a source\n");
1498 return AVERROR(EINVAL);
1499 }
1500
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8044 times.
8044 if (!enc->nb_dst) {
1501 av_log(enc, AV_LOG_ERROR,
1502 "Encoder not connected to any sink\n");
1503 return AVERROR(EINVAL);
1504 }
1505
1506 8044 enc->dst_finished = av_calloc(enc->nb_dst, sizeof(*enc->dst_finished));
1507
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8044 times.
8044 if (!enc->dst_finished)
1508 return AVERROR(ENOMEM);
1509 }
1510
1511
2/2
✓ Branch 0 taken 8262 times.
✓ Branch 1 taken 8260 times.
16522 for (unsigned i = 0; i < sch->nb_mux; i++) {
1512 8262 SchMux *mux = &sch->mux[i];
1513
1514
2/2
✓ Branch 0 taken 8745 times.
✓ Branch 1 taken 8262 times.
17007 for (unsigned j = 0; j < mux->nb_streams; j++) {
1515 8745 SchMuxStream *ms = &mux->streams[j];
1516
1517
2/3
✓ Branch 0 taken 8044 times.
✓ Branch 1 taken 701 times.
✗ Branch 2 not taken.
8745 switch (ms->src.type) {
1518 8044 case SCH_NODE_TYPE_ENC: {
1519 8044 SchEnc *enc = &sch->enc[ms->src.idx];
1520
2/2
✓ Branch 0 taken 38 times.
✓ Branch 1 taken 8006 times.
8044 if (enc->src.type == SCH_NODE_TYPE_DEC) {
1521 38 ms->src_sched = sch->dec[enc->src.idx].src;
1522
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 38 times.
38 av_assert0(ms->src_sched.type == SCH_NODE_TYPE_DEMUX);
1523 } else {
1524 8006 ms->src_sched = enc->src;
1525
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8006 times.
8006 av_assert0(ms->src_sched.type == SCH_NODE_TYPE_FILTER_OUT);
1526 }
1527 8044 break;
1528 }
1529 701 case SCH_NODE_TYPE_DEMUX:
1530 701 ms->src_sched = ms->src;
1531 701 break;
1532 default:
1533 av_log(mux, AV_LOG_ERROR,
1534 "Muxer stream #%u not connected to a source\n", j);
1535 return AVERROR(EINVAL);
1536 }
1537 }
1538
1539 8262 ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size,
1540 QUEUE_PACKETS);
1541
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8262 times.
8262 if (ret < 0)
1542 return ret;
1543 }
1544
1545
2/2
✓ Branch 0 taken 7876 times.
✓ Branch 1 taken 8260 times.
16136 for (unsigned i = 0; i < sch->nb_filters; i++) {
1546 7876 SchFilterGraph *fg = &sch->filters[i];
1547
1548
2/2
✓ Branch 0 taken 6822 times.
✓ Branch 1 taken 7876 times.
14698 for (unsigned j = 0; j < fg->nb_inputs; j++) {
1549 6822 SchFilterIn *fi = &fg->inputs[j];
1550 SchDec *dec;
1551
1552
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6822 times.
6822 if (!fi->src.type) {
1553 av_log(fg, AV_LOG_ERROR,
1554 "Filtergraph input %u not connected to a source\n", j);
1555 return AVERROR(EINVAL);
1556 }
1557
1558
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6822 times.
6822 if (fi->src.type == SCH_NODE_TYPE_FILTER_OUT)
1559 fi->src_sched = fi->src;
1560 else {
1561
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6822 times.
6822 av_assert0(fi->src.type == SCH_NODE_TYPE_DEC);
1562 6822 dec = &sch->dec[fi->src.idx];
1563
1564
2/3
✓ Branch 0 taken 6821 times.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
6822 switch (dec->src.type) {
1565 6821 case SCH_NODE_TYPE_DEMUX: fi->src_sched = dec->src; break;
1566 1 case SCH_NODE_TYPE_ENC: fi->src_sched = sch->enc[dec->src.idx].src; break;
1567 default: av_assert0(0);
1568 }
1569 }
1570 }
1571
1572
2/2
✓ Branch 0 taken 8006 times.
✓ Branch 1 taken 7876 times.
15882 for (unsigned j = 0; j < fg->nb_outputs; j++) {
1573 8006 SchFilterOut *fo = &fg->outputs[j];
1574
1575
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8006 times.
8006 if (!fo->dst.type) {
1576 av_log(fg, AV_LOG_ERROR,
1577 "Filtergraph %u output %u not connected to a sink\n", i, j);
1578 return AVERROR(EINVAL);
1579 }
1580 }
1581 }
1582
1583 // Check that the transcoding graph has no cycles.
1584 8260 ret = check_acyclic(sch);
1585
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8260 times.
8260 if (ret < 0)
1586 return ret;
1587
1588 8260 return 0;
1589 }
1590
1591 8260 int sch_start(Scheduler *sch)
1592 {
1593 int ret;
1594
1595 8260 ret = start_prepare(sch);
1596
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8260 times.
8260 if (ret < 0)
1597 return ret;
1598
1599
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8260 times.
8260 av_assert0(sch->state == SCH_STATE_UNINIT);
1600 8260 sch->state = SCH_STATE_STARTED;
1601
1602
2/2
✓ Branch 0 taken 8262 times.
✓ Branch 1 taken 8260 times.
16522 for (unsigned i = 0; i < sch->nb_mux; i++) {
1603 8262 SchMux *mux = &sch->mux[i];
1604
1605
2/2
✓ Branch 0 taken 465 times.
✓ Branch 1 taken 7797 times.
8262 if (mux->nb_streams_ready == mux->nb_streams) {
1606 465 ret = mux_init(sch, mux);
1607
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 465 times.
465 if (ret < 0)
1608 goto fail;
1609 }
1610 }
1611
1612
2/2
✓ Branch 0 taken 8044 times.
✓ Branch 1 taken 8260 times.
16304 for (unsigned i = 0; i < sch->nb_enc; i++) {
1613 8044 SchEnc *enc = &sch->enc[i];
1614
1615 8044 ret = task_start(&enc->task);
1616
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8044 times.
8044 if (ret < 0)
1617 goto fail;
1618 }
1619
1620
2/2
✓ Branch 0 taken 7876 times.
✓ Branch 1 taken 8260 times.
16136 for (unsigned i = 0; i < sch->nb_filters; i++) {
1621 7876 SchFilterGraph *fg = &sch->filters[i];
1622
1623 7876 ret = task_start(&fg->task);
1624
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7876 times.
7876 if (ret < 0)
1625 goto fail;
1626 }
1627
1628
2/2
✓ Branch 0 taken 6793 times.
✓ Branch 1 taken 8260 times.
15053 for (unsigned i = 0; i < sch->nb_dec; i++) {
1629 6793 SchDec *dec = &sch->dec[i];
1630
1631 6793 ret = task_start(&dec->task);
1632
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6793 times.
6793 if (ret < 0)
1633 goto fail;
1634 }
1635
1636
2/2
✓ Branch 0 taken 7211 times.
✓ Branch 1 taken 8260 times.
15471 for (unsigned i = 0; i < sch->nb_demux; i++) {
1637 7211 SchDemux *d = &sch->demux[i];
1638
1639
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 7197 times.
7211 if (!d->nb_streams)
1640 14 continue;
1641
1642 7197 ret = task_start(&d->task);
1643
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7197 times.
7197 if (ret < 0)
1644 goto fail;
1645 }
1646
1647 8260 pthread_mutex_lock(&sch->schedule_lock);
1648 8260 schedule_update_locked(sch);
1649 8260 pthread_mutex_unlock(&sch->schedule_lock);
1650
1651 8260 return 0;
1652 fail:
1653 sch_stop(sch, NULL);
1654 return ret;
1655 }
1656
1657 25667 int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
1658 {
1659 int ret;
1660
1661 // convert delay to absolute timestamp
1662 25667 timeout_us += av_gettime();
1663
1664 25667 pthread_mutex_lock(&sch->finish_lock);
1665
1666
1/2
✓ Branch 0 taken 25667 times.
✗ Branch 1 not taken.
25667 if (sch->nb_mux_done < sch->nb_mux) {
1667 25667 struct timespec tv = { .tv_sec = timeout_us / 1000000,
1668 25667 .tv_nsec = (timeout_us % 1000000) * 1000 };
1669 25667 pthread_cond_timedwait(&sch->finish_cond, &sch->finish_lock, &tv);
1670 }
1671
1672 // abort transcoding if any task failed
1673
4/4
✓ Branch 0 taken 17408 times.
✓ Branch 1 taken 8259 times.
✓ Branch 2 taken 1 times.
✓ Branch 3 taken 17407 times.
25667 ret = sch->nb_mux_done == sch->nb_mux || sch->task_failed;
1674
1675 25667 pthread_mutex_unlock(&sch->finish_lock);
1676
1677 25667 *transcode_ts = atomic_load(&sch->last_dts);
1678
1679 25667 return ret;
1680 }
1681
1682 8006 static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame)
1683 {
1684 int ret;
1685
1686 8006 ret = enc->open_cb(enc->task.func_arg, frame);
1687
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8006 times.
8006 if (ret < 0)
1688 return ret;
1689
1690 // ret>0 signals audio frame size, which means sync queue must
1691 // have been enabled during encoder creation
1692
2/2
✓ Branch 0 taken 173 times.
✓ Branch 1 taken 7833 times.
8006 if (ret > 0) {
1693 SchSyncQueue *sq;
1694
1695
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 173 times.
173 av_assert0(enc->sq_idx[0] >= 0);
1696 173 sq = &sch->sq_enc[enc->sq_idx[0]];
1697
1698 173 pthread_mutex_lock(&sq->lock);
1699
1700 173 sq_frame_samples(sq->sq, enc->sq_idx[1], ret);
1701
1702 173 pthread_mutex_unlock(&sq->lock);
1703 }
1704
1705 8006 return 0;
1706 }
1707
1708 459760 static int send_to_enc_thread(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1709 {
1710 int ret;
1711
1712
2/2
✓ Branch 0 taken 16367 times.
✓ Branch 1 taken 443393 times.
459760 if (!frame) {
1713 16367 tq_send_finish(enc->queue, 0);
1714 16367 return 0;
1715 }
1716
1717
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 443393 times.
443393 if (enc->in_finished)
1718 return AVERROR_EOF;
1719
1720 443393 ret = tq_send(enc->queue, 0, frame);
1721
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 443392 times.
443393 if (ret < 0)
1722 1 enc->in_finished = 1;
1723
1724 443393 return ret;
1725 }
1726
1727 35551 static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1728 {
1729 35551 SchSyncQueue *sq = &sch->sq_enc[enc->sq_idx[0]];
1730 35551 int ret = 0;
1731
1732 // inform the scheduling code that no more input will arrive along this path;
1733 // this is necessary because the sync queue may not send an EOF downstream
1734 // until other streams finish
1735 // TODO: consider a cleaner way of passing this information through
1736 // the pipeline
1737
2/2
✓ Branch 0 taken 3334 times.
✓ Branch 1 taken 32217 times.
35551 if (!frame) {
1738
2/2
✓ Branch 0 taken 3334 times.
✓ Branch 1 taken 3334 times.
6668 for (unsigned i = 0; i < enc->nb_dst; i++) {
1739 SchMux *mux;
1740 SchMuxStream *ms;
1741
1742
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3334 times.
3334 if (enc->dst[i].type != SCH_NODE_TYPE_MUX)
1743 continue;
1744
1745 3334 mux = &sch->mux[enc->dst[i].idx];
1746 3334 ms = &mux->streams[enc->dst[i].idx_stream];
1747
1748 3334 pthread_mutex_lock(&sch->schedule_lock);
1749
1750 3334 ms->source_finished = 1;
1751 3334 schedule_update_locked(sch);
1752
1753 3334 pthread_mutex_unlock(&sch->schedule_lock);
1754 }
1755 }
1756
1757 35551 pthread_mutex_lock(&sq->lock);
1758
1759 35551 ret = sq_send(sq->sq, enc->sq_idx[1], SQFRAME(frame));
1760
2/2
✓ Branch 0 taken 35549 times.
✓ Branch 1 taken 2 times.
35551 if (ret < 0)
1761 2 goto finish;
1762
1763 82752 while (1) {
1764 SchEnc *enc;
1765
1766 // TODO: the SQ API should be extended to allow returning EOF
1767 // for individual streams
1768 118301 ret = sq_receive(sq->sq, -1, SQFRAME(sq->frame));
1769
2/2
✓ Branch 0 taken 35549 times.
✓ Branch 1 taken 82752 times.
118301 if (ret < 0) {
1770
2/2
✓ Branch 0 taken 6215 times.
✓ Branch 1 taken 29334 times.
35549 ret = (ret == AVERROR(EAGAIN)) ? 0 : ret;
1771 35549 break;
1772 }
1773
1774 82752 enc = &sch->enc[sq->enc_idx[ret]];
1775 82752 ret = send_to_enc_thread(sch, enc, sq->frame);
1776
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 82752 times.
82752 if (ret < 0) {
1777 av_frame_unref(sq->frame);
1778 if (ret != AVERROR_EOF)
1779 break;
1780
1781 sq_send(sq->sq, enc->sq_idx[1], SQFRAME(NULL));
1782 continue;
1783 }
1784 }
1785
1786
2/2
✓ Branch 0 taken 29334 times.
✓ Branch 1 taken 6215 times.
35549 if (ret < 0) {
1787 // close all encoders fed from this sync queue
1788
2/2
✓ Branch 0 taken 6612 times.
✓ Branch 1 taken 6215 times.
12827 for (unsigned i = 0; i < sq->nb_enc_idx; i++) {
1789 6612 int err = send_to_enc_thread(sch, &sch->enc[sq->enc_idx[i]], NULL);
1790
1791 // if the sync queue error is EOF and closing the encoder
1792 // produces a more serious error, make sure to pick the latter
1793
2/4
✓ Branch 0 taken 6612 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 6612 times.
✗ Branch 3 not taken.
6612 ret = err_merge((ret == AVERROR_EOF && err < 0) ? 0 : ret, err);
1794 }
1795 }
1796
1797 35549 finish:
1798 35551 pthread_mutex_unlock(&sq->lock);
1799
1800 35551 return ret;
1801 }
1802
1803 405950 static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1804 {
1805
6/6
✓ Branch 0 taken 405033 times.
✓ Branch 1 taken 917 times.
✓ Branch 2 taken 391982 times.
✓ Branch 3 taken 13051 times.
✓ Branch 4 taken 8006 times.
✓ Branch 5 taken 383976 times.
405950 if (enc->open_cb && frame && !enc->opened) {
1806 8006 int ret = enc_open(sch, enc, frame);
1807
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8006 times.
8006 if (ret < 0)
1808 return ret;
1809 8006 enc->opened = 1;
1810
1811 // discard empty frames that only carry encoder init parameters
1812
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 8003 times.
8006 if (!frame->buf[0]) {
1813 3 av_frame_unref(frame);
1814 3 return 0;
1815 }
1816 }
1817
1818 405947 return (enc->sq_idx[0] >= 0) ?
1819
2/2
✓ Branch 0 taken 35551 times.
✓ Branch 1 taken 370396 times.
776343 send_to_enc_sq (sch, enc, frame) :
1820 370396 send_to_enc_thread(sch, enc, frame);
1821 }
1822
1823 3390 static int mux_queue_packet(SchMux *mux, SchMuxStream *ms, AVPacket *pkt)
1824 {
1825 3390 PreMuxQueue *q = &ms->pre_mux_queue;
1826 3390 AVPacket *tmp_pkt = NULL;
1827 int ret;
1828
1829
2/2
✓ Branch 1 taken 147 times.
✓ Branch 2 taken 3243 times.
3390 if (!av_fifo_can_write(q->fifo)) {
1830 147 size_t packets = av_fifo_can_read(q->fifo);
1831
1/2
✓ Branch 0 taken 147 times.
✗ Branch 1 not taken.
147 size_t pkt_size = pkt ? pkt->size : 0;
1832 147 int thresh_reached = (q->data_size + pkt_size) > q->data_threshold;
1833
2/2
✓ Branch 0 taken 142 times.
✓ Branch 1 taken 5 times.
147 size_t max_packets = thresh_reached ? q->max_packets : SIZE_MAX;
1834 147 size_t new_size = FFMIN(2 * packets, max_packets);
1835
1836
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 147 times.
147 if (new_size <= packets) {
1837 av_log(mux, AV_LOG_ERROR,
1838 "Too many packets buffered for output stream.\n");
1839 return AVERROR_BUFFER_TOO_SMALL;
1840 }
1841 147 ret = av_fifo_grow2(q->fifo, new_size - packets);
1842
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 147 times.
147 if (ret < 0)
1843 return ret;
1844 }
1845
1846
2/2
✓ Branch 0 taken 3288 times.
✓ Branch 1 taken 102 times.
3390 if (pkt) {
1847 3288 tmp_pkt = av_packet_alloc();
1848
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3288 times.
3288 if (!tmp_pkt)
1849 return AVERROR(ENOMEM);
1850
1851 3288 av_packet_move_ref(tmp_pkt, pkt);
1852 3288 q->data_size += tmp_pkt->size;
1853 }
1854 3390 av_fifo_write(q->fifo, &tmp_pkt, 1);
1855
1856 3390 return 0;
1857 }
1858
1859 514994 static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx,
1860 AVPacket *pkt)
1861 {
1862 514994 SchMuxStream *ms = &mux->streams[stream_idx];
1863
2/2
✓ Branch 0 taken 498149 times.
✓ Branch 1 taken 8100 times.
506249 int64_t dts = (pkt && pkt->dts != AV_NOPTS_VALUE) ?
1864
2/2
✓ Branch 0 taken 506249 times.
✓ Branch 1 taken 8745 times.
1021243 av_rescale_q(pkt->dts + pkt->duration, pkt->time_base, AV_TIME_BASE_Q) :
1865 AV_NOPTS_VALUE;
1866
1867 // queue the packet if the muxer cannot be started yet
1868
2/2
✓ Branch 0 taken 3427 times.
✓ Branch 1 taken 511567 times.
514994 if (!atomic_load(&mux->mux_started)) {
1869 3427 int queued = 0;
1870
1871 // the muxer could have started between the above atomic check and
1872 // locking the mutex, then this block falls through to normal send path
1873 3427 pthread_mutex_lock(&sch->mux_ready_lock);
1874
1875
2/2
✓ Branch 0 taken 3390 times.
✓ Branch 1 taken 37 times.
3427 if (!atomic_load(&mux->mux_started)) {
1876 3390 int ret = mux_queue_packet(mux, ms, pkt);
1877
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3390 times.
3390 queued = ret < 0 ? ret : 1;
1878 }
1879
1880 3427 pthread_mutex_unlock(&sch->mux_ready_lock);
1881
1882
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3427 times.
3427 if (queued < 0)
1883 return queued;
1884
2/2
✓ Branch 0 taken 3390 times.
✓ Branch 1 taken 37 times.
3427 else if (queued)
1885 3390 goto update_schedule;
1886 }
1887
1888
2/2
✓ Branch 0 taken 502961 times.
✓ Branch 1 taken 8643 times.
511604 if (pkt) {
1889 int ret;
1890
1891
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 502960 times.
502961 if (ms->init_eof)
1892 1 return AVERROR_EOF;
1893
1894 502960 ret = tq_send(mux->queue, stream_idx, pkt);
1895
2/2
✓ Branch 0 taken 66 times.
✓ Branch 1 taken 502894 times.
502960 if (ret < 0)
1896 66 return ret;
1897 } else
1898 8643 tq_send_finish(mux->queue, stream_idx);
1899
1900 514927 update_schedule:
1901 // TODO: use atomics to check whether this changes trailing dts
1902 // to avoid locking unnecesarily
1903
4/4
✓ Branch 0 taken 16845 times.
✓ Branch 1 taken 498082 times.
✓ Branch 2 taken 8745 times.
✓ Branch 3 taken 8100 times.
514927 if (dts != AV_NOPTS_VALUE || !pkt) {
1904 506827 pthread_mutex_lock(&sch->schedule_lock);
1905
1906
2/2
✓ Branch 0 taken 498082 times.
✓ Branch 1 taken 8745 times.
506827 if (pkt) ms->last_dts = dts;
1907 8745 else ms->source_finished = 1;
1908
1909 506827 schedule_update_locked(sch);
1910
1911 506827 pthread_mutex_unlock(&sch->schedule_lock);
1912 }
1913
1914 514927 return 0;
1915 }
1916
1917 static int
1918 480048 demux_stream_send_to_dst(Scheduler *sch, const SchedulerNode dst,
1919 uint8_t *dst_finished, AVPacket *pkt, unsigned flags)
1920 {
1921 int ret;
1922
1923
2/2
✓ Branch 0 taken 3458 times.
✓ Branch 1 taken 476590 times.
480048 if (*dst_finished)
1924 3458 return AVERROR_EOF;
1925
1926
4/4
✓ Branch 0 taken 472555 times.
✓ Branch 1 taken 4035 times.
✓ Branch 2 taken 69022 times.
✓ Branch 3 taken 403533 times.
476590 if (pkt && dst.type == SCH_NODE_TYPE_MUX &&
1927
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 69020 times.
69022 (flags & DEMUX_SEND_STREAMCOPY_EOF)) {
1928 2 av_packet_unref(pkt);
1929 2 pkt = NULL;
1930 }
1931
1932
2/2
✓ Branch 0 taken 4037 times.
✓ Branch 1 taken 472553 times.
476590 if (!pkt)
1933 4037 goto finish;
1934
1935 945106 ret = (dst.type == SCH_NODE_TYPE_MUX) ?
1936
2/2
✓ Branch 0 taken 69020 times.
✓ Branch 1 taken 403533 times.
472553 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
1937 403533 tq_send(sch->dec[dst.idx].queue, 0, pkt);
1938
2/2
✓ Branch 0 taken 3456 times.
✓ Branch 1 taken 469097 times.
472553 if (ret == AVERROR_EOF)
1939 3456 goto finish;
1940
1941 469097 return ret;
1942
1943 7493 finish:
1944
2/2
✓ Branch 0 taken 701 times.
✓ Branch 1 taken 6792 times.
7493 if (dst.type == SCH_NODE_TYPE_MUX)
1945 701 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
1946 else
1947 6792 tq_send_finish(sch->dec[dst.idx].queue, 0);
1948
1949 7493 *dst_finished = 1;
1950 7493 return AVERROR_EOF;
1951 }
1952
1953 479580 static int demux_send_for_stream(Scheduler *sch, SchDemux *d, SchDemuxStream *ds,
1954 AVPacket *pkt, unsigned flags)
1955 {
1956 479580 unsigned nb_done = 0;
1957
1958
2/2
✓ Branch 0 taken 480048 times.
✓ Branch 1 taken 479580 times.
959628 for (unsigned i = 0; i < ds->nb_dst; i++) {
1959 480048 AVPacket *to_send = pkt;
1960 480048 uint8_t *finished = &ds->dst_finished[i];
1961
1962 int ret;
1963
1964 // sending a packet consumes it, so make a temporary reference if needed
1965
4/4
✓ Branch 0 taken 472555 times.
✓ Branch 1 taken 7493 times.
✓ Branch 2 taken 442 times.
✓ Branch 3 taken 472113 times.
480048 if (pkt && i < ds->nb_dst - 1) {
1966 442 to_send = d->send_pkt;
1967
1968 442 ret = av_packet_ref(to_send, pkt);
1969
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 442 times.
442 if (ret < 0)
1970 return ret;
1971 }
1972
1973 480048 ret = demux_stream_send_to_dst(sch, ds->dst[i], finished, to_send, flags);
1974
2/2
✓ Branch 0 taken 472555 times.
✓ Branch 1 taken 7493 times.
480048 if (to_send)
1975 472555 av_packet_unref(to_send);
1976
2/2
✓ Branch 0 taken 10951 times.
✓ Branch 1 taken 469097 times.
480048 if (ret == AVERROR_EOF)
1977 10951 nb_done++;
1978
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 469097 times.
469097 else if (ret < 0)
1979 return ret;
1980 }
1981
1982
2/2
✓ Branch 0 taken 10924 times.
✓ Branch 1 taken 468656 times.
479580 return (nb_done == ds->nb_dst) ? AVERROR_EOF : 0;
1983 }
1984
1985 11 static int demux_flush(Scheduler *sch, SchDemux *d, AVPacket *pkt)
1986 {
1987 11 Timestamp max_end_ts = (Timestamp){ .ts = AV_NOPTS_VALUE };
1988
1989
3/6
✓ Branch 0 taken 11 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 11 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 11 times.
11 av_assert0(!pkt->buf && !pkt->data && !pkt->side_data_elems);
1990
1991
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 11 times.
25 for (unsigned i = 0; i < d->nb_streams; i++) {
1992 14 SchDemuxStream *ds = &d->streams[i];
1993
1994
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 14 times.
28 for (unsigned j = 0; j < ds->nb_dst; j++) {
1995 14 const SchedulerNode *dst = &ds->dst[j];
1996 SchDec *dec;
1997 int ret;
1998
1999
3/4
✓ Branch 0 taken 14 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 8 times.
✓ Branch 3 taken 6 times.
14 if (ds->dst_finished[j] || dst->type != SCH_NODE_TYPE_DEC)
2000 8 continue;
2001
2002 6 dec = &sch->dec[dst->idx];
2003
2004 6 ret = tq_send(dec->queue, 0, pkt);
2005
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 if (ret < 0)
2006 return ret;
2007
2008
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 3 times.
6 if (dec->queue_end_ts) {
2009 Timestamp ts;
2010 3 ret = av_thread_message_queue_recv(dec->queue_end_ts, &ts, 0);
2011
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (ret < 0)
2012 return ret;
2013
2014
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (max_end_ts.ts == AV_NOPTS_VALUE ||
2015 (ts.ts != AV_NOPTS_VALUE &&
2016 av_compare_ts(max_end_ts.ts, max_end_ts.tb, ts.ts, ts.tb) < 0))
2017 3 max_end_ts = ts;
2018
2019 }
2020 }
2021 }
2022
2023 11 pkt->pts = max_end_ts.ts;
2024 11 pkt->time_base = max_end_ts.tb;
2025
2026 11 return 0;
2027 }
2028
2029 472165 int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt,
2030 unsigned flags)
2031 {
2032 SchDemux *d;
2033 int terminate;
2034
2035
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 472165 times.
472165 av_assert0(demux_idx < sch->nb_demux);
2036 472165 d = &sch->demux[demux_idx];
2037
2038 472165 terminate = waiter_wait(sch, &d->waiter);
2039
2/2
✓ Branch 0 taken 41 times.
✓ Branch 1 taken 472124 times.
472165 if (terminate)
2040 41 return AVERROR_EXIT;
2041
2042 // flush the downstreams after seek
2043
2/2
✓ Branch 0 taken 11 times.
✓ Branch 1 taken 472113 times.
472124 if (pkt->stream_index == -1)
2044 11 return demux_flush(sch, d, pkt);
2045
2046
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 472113 times.
472113 av_assert0(pkt->stream_index < d->nb_streams);
2047
2048 472113 return demux_send_for_stream(sch, d, &d->streams[pkt->stream_index], pkt, flags);
2049 }
2050
2051 7211 static int demux_done(Scheduler *sch, unsigned demux_idx)
2052 {
2053 7211 SchDemux *d = &sch->demux[demux_idx];
2054 7211 int ret = 0;
2055
2056
2/2
✓ Branch 0 taken 7467 times.
✓ Branch 1 taken 7211 times.
14678 for (unsigned i = 0; i < d->nb_streams; i++) {
2057 7467 int err = demux_send_for_stream(sch, d, &d->streams[i], NULL, 0);
2058
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7467 times.
7467 if (err != AVERROR_EOF)
2059 ret = err_merge(ret, err);
2060 }
2061
2062 7211 pthread_mutex_lock(&sch->schedule_lock);
2063
2064 7211 d->task_exited = 1;
2065
2066 7211 schedule_update_locked(sch);
2067
2068 7211 pthread_mutex_unlock(&sch->schedule_lock);
2069
2070 7211 return ret;
2071 }
2072
2073 522881 int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt)
2074 {
2075 SchMux *mux;
2076 int ret, stream_idx;
2077
2078
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 522881 times.
522881 av_assert0(mux_idx < sch->nb_mux);
2079 522881 mux = &sch->mux[mux_idx];
2080
2081 522881 ret = tq_receive(mux->queue, &stream_idx, pkt);
2082 522881 pkt->stream_index = stream_idx;
2083 522881 return ret;
2084 }
2085
2086 203 void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
2087 {
2088 SchMux *mux;
2089
2090
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 203 times.
203 av_assert0(mux_idx < sch->nb_mux);
2091 203 mux = &sch->mux[mux_idx];
2092
2093
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 203 times.
203 av_assert0(stream_idx < mux->nb_streams);
2094 203 tq_receive_finish(mux->queue, stream_idx);
2095
2096 203 pthread_mutex_lock(&sch->schedule_lock);
2097 203 mux->streams[stream_idx].source_finished = 1;
2098
2099 203 schedule_update_locked(sch);
2100
2101 203 pthread_mutex_unlock(&sch->schedule_lock);
2102 203 }
2103
2104 468149 int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
2105 const AVPacket *pkt)
2106 {
2107 SchMux *mux;
2108 SchMuxStream *ms;
2109
2110
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 468149 times.
468149 av_assert0(mux_idx < sch->nb_mux);
2111 468149 mux = &sch->mux[mux_idx];
2112
2113
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 468149 times.
468149 av_assert0(stream_idx < mux->nb_streams);
2114 468149 ms = &mux->streams[stream_idx];
2115
2116
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 468149 times.
468154 for (unsigned i = 0; i < ms->nb_sub_heartbeat_dst; i++) {
2117 5 SchDec *dst = &sch->dec[ms->sub_heartbeat_dst[i]];
2118 int ret;
2119
2120 5 ret = av_packet_copy_props(mux->sub_heartbeat_pkt, pkt);
2121
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5 times.
5 if (ret < 0)
2122 return ret;
2123
2124 5 tq_send(dst->queue, 0, mux->sub_heartbeat_pkt);
2125 }
2126
2127 468149 return 0;
2128 }
2129
2130 8262 static int mux_done(Scheduler *sch, unsigned mux_idx)
2131 {
2132 8262 SchMux *mux = &sch->mux[mux_idx];
2133
2134 8262 pthread_mutex_lock(&sch->schedule_lock);
2135
2136
2/2
✓ Branch 0 taken 8745 times.
✓ Branch 1 taken 8262 times.
17007 for (unsigned i = 0; i < mux->nb_streams; i++) {
2137 8745 tq_receive_finish(mux->queue, i);
2138 8745 mux->streams[i].source_finished = 1;
2139 }
2140
2141 8262 schedule_update_locked(sch);
2142
2143 8262 pthread_mutex_unlock(&sch->schedule_lock);
2144
2145 8262 pthread_mutex_lock(&sch->finish_lock);
2146
2147
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8262 times.
8262 av_assert0(sch->nb_mux_done < sch->nb_mux);
2148 8262 sch->nb_mux_done++;
2149
2150 8262 pthread_cond_signal(&sch->finish_cond);
2151
2152 8262 pthread_mutex_unlock(&sch->finish_lock);
2153
2154 8262 return 0;
2155 }
2156
2157 376388 int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
2158 {
2159 SchDec *dec;
2160 int ret, dummy;
2161
2162
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 376388 times.
376388 av_assert0(dec_idx < sch->nb_dec);
2163 376388 dec = &sch->dec[dec_idx];
2164
2165 // the decoder should have given us post-flush end timestamp in pkt
2166
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 376385 times.
376388 if (dec->expect_end_ts) {
2167 3 Timestamp ts = (Timestamp){ .ts = pkt->pts, .tb = pkt->time_base };
2168 3 ret = av_thread_message_queue_send(dec->queue_end_ts, &ts, 0);
2169
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (ret < 0)
2170 return ret;
2171
2172 3 dec->expect_end_ts = 0;
2173 }
2174
2175 376388 ret = tq_receive(dec->queue, &dummy, pkt);
2176
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 376388 times.
376388 av_assert0(dummy <= 0);
2177
2178 // got a flush packet, on the next call to this function the decoder
2179 // will give us post-flush end timestamp
2180
7/8
✓ Branch 0 taken 373044 times.
✓ Branch 1 taken 3344 times.
✓ Branch 2 taken 958 times.
✓ Branch 3 taken 372086 times.
✓ Branch 4 taken 958 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 3 times.
✓ Branch 7 taken 955 times.
376388 if (ret >= 0 && !pkt->data && !pkt->side_data_elems && dec->queue_end_ts)
2181 3 dec->expect_end_ts = 1;
2182
2183 376388 return ret;
2184 }
2185
2186 405185 static int send_to_filter(Scheduler *sch, SchFilterGraph *fg,
2187 unsigned in_idx, AVFrame *frame)
2188 {
2189
2/2
✓ Branch 0 taken 398363 times.
✓ Branch 1 taken 6822 times.
405185 if (frame)
2190 398363 return tq_send(fg->queue, in_idx, frame);
2191
2192
1/2
✓ Branch 0 taken 6822 times.
✗ Branch 1 not taken.
6822 if (!fg->inputs[in_idx].send_finished) {
2193 6822 fg->inputs[in_idx].send_finished = 1;
2194 6822 tq_send_finish(fg->queue, in_idx);
2195
2196 // close the control stream when all actual inputs are done
2197
2/2
✓ Branch 0 taken 6737 times.
✓ Branch 1 taken 85 times.
6822 if (atomic_fetch_add(&fg->nb_inputs_finished_send, 1) == fg->nb_inputs - 1)
2198 6737 tq_send_finish(fg->queue, fg->nb_inputs);
2199 }
2200 6822 return 0;
2201 }
2202
2203 409625 static int dec_send_to_dst(Scheduler *sch, const SchedulerNode dst,
2204 uint8_t *dst_finished, AVFrame *frame)
2205 {
2206 int ret;
2207
2208
2/2
✓ Branch 0 taken 7026 times.
✓ Branch 1 taken 402599 times.
409625 if (*dst_finished)
2209 7026 return AVERROR_EOF;
2210
2211
2/2
✓ Branch 0 taken 3357 times.
✓ Branch 1 taken 399242 times.
402599 if (!frame)
2212 3357 goto finish;
2213
2214 798484 ret = (dst.type == SCH_NODE_TYPE_FILTER_IN) ?
2215
2/2
✓ Branch 0 taken 398363 times.
✓ Branch 1 taken 879 times.
399242 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame) :
2216 879 send_to_enc(sch, &sch->enc[dst.idx], frame);
2217
2/2
✓ Branch 0 taken 3503 times.
✓ Branch 1 taken 395739 times.
399242 if (ret == AVERROR_EOF)
2218 3503 goto finish;
2219
2220 395739 return ret;
2221
2222 6860 finish:
2223
2/2
✓ Branch 0 taken 6822 times.
✓ Branch 1 taken 38 times.
6860 if (dst.type == SCH_NODE_TYPE_FILTER_IN)
2224 6822 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2225 else
2226 38 send_to_enc(sch, &sch->enc[dst.idx], NULL);
2227
2228 6860 *dst_finished = 1;
2229
2230 6860 return AVERROR_EOF;
2231 }
2232
2233 401192 int sch_dec_send(Scheduler *sch, unsigned dec_idx,
2234 unsigned out_idx, AVFrame *frame)
2235 {
2236 SchDec *dec;
2237 SchDecOutput *o;
2238 int ret;
2239 401192 unsigned nb_done = 0;
2240
2241
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 401192 times.
401192 av_assert0(dec_idx < sch->nb_dec);
2242 401192 dec = &sch->dec[dec_idx];
2243
2244
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 401192 times.
401192 av_assert0(out_idx < dec->nb_outputs);
2245 401192 o = &dec->outputs[out_idx];
2246
2247
2/2
✓ Branch 0 taken 402765 times.
✓ Branch 1 taken 401192 times.
803957 for (unsigned i = 0; i < o->nb_dst; i++) {
2248 402765 uint8_t *finished = &o->dst_finished[i];
2249 402765 AVFrame *to_send = frame;
2250
2251 // sending a frame consumes it, so make a temporary reference if needed
2252
2/2
✓ Branch 0 taken 1573 times.
✓ Branch 1 taken 401192 times.
402765 if (i < o->nb_dst - 1) {
2253 1573 to_send = dec->send_frame;
2254
2255 // frame may sometimes contain props only,
2256 // e.g. to signal EOF timestamp
2257
2/2
✓ Branch 0 taken 1466 times.
✓ Branch 1 taken 107 times.
1573 ret = frame->buf[0] ? av_frame_ref(to_send, frame) :
2258 107 av_frame_copy_props(to_send, frame);
2259
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1573 times.
1573 if (ret < 0)
2260 return ret;
2261 }
2262
2263 402765 ret = dec_send_to_dst(sch, o->dst[i], finished, to_send);
2264
2/2
✓ Branch 0 taken 7026 times.
✓ Branch 1 taken 395739 times.
402765 if (ret < 0) {
2265 7026 av_frame_unref(to_send);
2266
1/2
✓ Branch 0 taken 7026 times.
✗ Branch 1 not taken.
7026 if (ret == AVERROR_EOF) {
2267 7026 nb_done++;
2268 7026 continue;
2269 }
2270 return ret;
2271 }
2272 }
2273
2274
2/2
✓ Branch 0 taken 6909 times.
✓ Branch 1 taken 394283 times.
401192 return (nb_done == o->nb_dst) ? AVERROR_EOF : 0;
2275 }
2276
2277 6793 static int dec_done(Scheduler *sch, unsigned dec_idx)
2278 {
2279 6793 SchDec *dec = &sch->dec[dec_idx];
2280 6793 int ret = 0;
2281
2282 6793 tq_receive_finish(dec->queue, 0);
2283
2284 // make sure our source does not get stuck waiting for end timestamps
2285 // that will never arrive
2286
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 6792 times.
6793 if (dec->queue_end_ts)
2287 1 av_thread_message_queue_set_err_recv(dec->queue_end_ts, AVERROR_EOF);
2288
2289
2/2
✓ Branch 0 taken 6799 times.
✓ Branch 1 taken 6793 times.
13592 for (unsigned i = 0; i < dec->nb_outputs; i++) {
2290 6799 SchDecOutput *o = &dec->outputs[i];
2291
2292
2/2
✓ Branch 0 taken 6860 times.
✓ Branch 1 taken 6799 times.
13659 for (unsigned j = 0; j < o->nb_dst; j++) {
2293 6860 int err = dec_send_to_dst(sch, o->dst[j], &o->dst_finished[j], NULL);
2294
2/4
✓ Branch 0 taken 6860 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6860 times.
6860 if (err < 0 && err != AVERROR_EOF)
2295 ret = err_merge(ret, err);
2296 }
2297 }
2298
2299 6793 return ret;
2300 }
2301
2302 451432 int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
2303 {
2304 SchEnc *enc;
2305 int ret, dummy;
2306
2307
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 451432 times.
451432 av_assert0(enc_idx < sch->nb_enc);
2308 451432 enc = &sch->enc[enc_idx];
2309
2310 451432 ret = tq_receive(enc->queue, &dummy, frame);
2311
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 451432 times.
451432 av_assert0(dummy <= 0);
2312
2313 451432 return ret;
2314 }
2315
2316 445366 static int enc_send_to_dst(Scheduler *sch, const SchedulerNode dst,
2317 uint8_t *dst_finished, AVPacket *pkt)
2318 {
2319 int ret;
2320
2321
2/2
✓ Branch 0 taken 44 times.
✓ Branch 1 taken 445322 times.
445366 if (*dst_finished)
2322 44 return AVERROR_EOF;
2323
2324
2/2
✓ Branch 0 taken 8043 times.
✓ Branch 1 taken 437279 times.
445322 if (!pkt)
2325 8043 goto finish;
2326
2327 874558 ret = (dst.type == SCH_NODE_TYPE_MUX) ?
2328
2/2
✓ Branch 0 taken 437229 times.
✓ Branch 1 taken 50 times.
437279 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
2329 50 tq_send(sch->dec[dst.idx].queue, 0, pkt);
2330
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 437277 times.
437279 if (ret == AVERROR_EOF)
2331 2 goto finish;
2332
2333 437277 return ret;
2334
2335 8045 finish:
2336
2/2
✓ Branch 0 taken 8044 times.
✓ Branch 1 taken 1 times.
8045 if (dst.type == SCH_NODE_TYPE_MUX)
2337 8044 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
2338 else
2339 1 tq_send_finish(sch->dec[dst.idx].queue, 0);
2340
2341 8045 *dst_finished = 1;
2342
2343 8045 return AVERROR_EOF;
2344 }
2345
2346 437271 int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
2347 {
2348 SchEnc *enc;
2349 int ret;
2350
2351
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 437271 times.
437271 av_assert0(enc_idx < sch->nb_enc);
2352 437271 enc = &sch->enc[enc_idx];
2353
2354
2/2
✓ Branch 0 taken 437321 times.
✓ Branch 1 taken 437271 times.
874592 for (unsigned i = 0; i < enc->nb_dst; i++) {
2355 437321 uint8_t *finished = &enc->dst_finished[i];
2356 437321 AVPacket *to_send = pkt;
2357
2358 // sending a packet consumes it, so make a temporary reference if needed
2359
2/2
✓ Branch 0 taken 50 times.
✓ Branch 1 taken 437271 times.
437321 if (i < enc->nb_dst - 1) {
2360 50 to_send = enc->send_pkt;
2361
2362 50 ret = av_packet_ref(to_send, pkt);
2363
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 50 times.
50 if (ret < 0)
2364 return ret;
2365 }
2366
2367 437321 ret = enc_send_to_dst(sch, enc->dst[i], finished, to_send);
2368
2/2
✓ Branch 0 taken 44 times.
✓ Branch 1 taken 437277 times.
437321 if (ret < 0) {
2369 44 av_packet_unref(to_send);
2370
1/2
✓ Branch 0 taken 44 times.
✗ Branch 1 not taken.
44 if (ret == AVERROR_EOF)
2371 44 continue;
2372 return ret;
2373 }
2374 }
2375
2376 437271 return 0;
2377 }
2378
2379 8044 static int enc_done(Scheduler *sch, unsigned enc_idx)
2380 {
2381 8044 SchEnc *enc = &sch->enc[enc_idx];
2382 8044 int ret = 0;
2383
2384 8044 tq_receive_finish(enc->queue, 0);
2385
2386
2/2
✓ Branch 0 taken 8045 times.
✓ Branch 1 taken 8044 times.
16089 for (unsigned i = 0; i < enc->nb_dst; i++) {
2387 8045 int err = enc_send_to_dst(sch, enc->dst[i], &enc->dst_finished[i], NULL);
2388
2/4
✓ Branch 0 taken 8045 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 8045 times.
8045 if (err < 0 && err != AVERROR_EOF)
2389 ret = err_merge(ret, err);
2390 }
2391
2392 8044 return ret;
2393 }
2394
2395 396305 int sch_filter_receive(Scheduler *sch, unsigned fg_idx,
2396 unsigned *in_idx, AVFrame *frame)
2397 {
2398 SchFilterGraph *fg;
2399
2400
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 396305 times.
396305 av_assert0(fg_idx < sch->nb_filters);
2401 396305 fg = &sch->filters[fg_idx];
2402
2403
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 396305 times.
396305 av_assert0(*in_idx <= fg->nb_inputs);
2404
2405 // update scheduling to account for desired input stream, if it changed
2406 //
2407 // this check needs no locking because only the filtering thread
2408 // updates this value
2409
2/2
✓ Branch 0 taken 941 times.
✓ Branch 1 taken 395364 times.
396305 if (*in_idx != fg->best_input) {
2410 941 pthread_mutex_lock(&sch->schedule_lock);
2411
2412 941 fg->best_input = *in_idx;
2413 941 schedule_update_locked(sch);
2414
2415 941 pthread_mutex_unlock(&sch->schedule_lock);
2416 }
2417
2418
2/2
✓ Branch 0 taken 367947 times.
✓ Branch 1 taken 28358 times.
396305 if (*in_idx == fg->nb_inputs) {
2419 28358 int terminate = waiter_wait(sch, &fg->waiter);
2420
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 28358 times.
28358 return terminate ? AVERROR_EOF : AVERROR(EAGAIN);
2421 }
2422
2423 25 while (1) {
2424 int ret, idx;
2425
2426 367972 ret = tq_receive(fg->queue, &idx, frame);
2427
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 367966 times.
367972 if (idx < 0)
2428 367947 return AVERROR_EOF;
2429
2/2
✓ Branch 0 taken 367941 times.
✓ Branch 1 taken 25 times.
367966 else if (ret >= 0) {
2430 367941 *in_idx = idx;
2431 367941 return 0;
2432 }
2433
2434 // disregard EOFs for specific streams - they should always be
2435 // preceded by an EOF frame
2436 }
2437 }
2438
2439 1 void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx)
2440 {
2441 SchFilterGraph *fg;
2442 SchFilterIn *fi;
2443
2444
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(fg_idx < sch->nb_filters);
2445 1 fg = &sch->filters[fg_idx];
2446
2447
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(in_idx < fg->nb_inputs);
2448 1 fi = &fg->inputs[in_idx];
2449
2450
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (!fi->receive_finished) {
2451 1 fi->receive_finished = 1;
2452 1 tq_receive_finish(fg->queue, in_idx);
2453
2454 // close the control stream when all actual inputs are done
2455
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (++fg->nb_inputs_finished_receive == fg->nb_inputs)
2456 tq_receive_finish(fg->queue, fg->nb_inputs);
2457 }
2458 1 }
2459
2460 397027 int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)
2461 {
2462 SchFilterGraph *fg;
2463 SchedulerNode dst;
2464
2465
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 397027 times.
397027 av_assert0(fg_idx < sch->nb_filters);
2466 397027 fg = &sch->filters[fg_idx];
2467
2468
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 397027 times.
397027 av_assert0(out_idx < fg->nb_outputs);
2469 397027 dst = fg->outputs[out_idx].dst;
2470
2471 397027 return (dst.type == SCH_NODE_TYPE_ENC) ?
2472
1/2
✓ Branch 0 taken 397027 times.
✗ Branch 1 not taken.
397027 send_to_enc (sch, &sch->enc[dst.idx], frame) :
2473 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame);
2474 }
2475
2476 7876 static int filter_done(Scheduler *sch, unsigned fg_idx)
2477 {
2478 7876 SchFilterGraph *fg = &sch->filters[fg_idx];
2479 7876 int ret = 0;
2480
2481
2/2
✓ Branch 0 taken 14698 times.
✓ Branch 1 taken 7876 times.
22574 for (unsigned i = 0; i <= fg->nb_inputs; i++)
2482 14698 tq_receive_finish(fg->queue, i);
2483
2484
2/2
✓ Branch 0 taken 8006 times.
✓ Branch 1 taken 7876 times.
15882 for (unsigned i = 0; i < fg->nb_outputs; i++) {
2485 8006 SchedulerNode dst = fg->outputs[i].dst;
2486 16012 int err = (dst.type == SCH_NODE_TYPE_ENC) ?
2487
1/2
✓ Branch 0 taken 8006 times.
✗ Branch 1 not taken.
8006 send_to_enc (sch, &sch->enc[dst.idx], NULL) :
2488 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2489
2490
3/4
✓ Branch 0 taken 3130 times.
✓ Branch 1 taken 4876 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 3130 times.
8006 if (err < 0 && err != AVERROR_EOF)
2491 ret = err_merge(ret, err);
2492 }
2493
2494 7876 pthread_mutex_lock(&sch->schedule_lock);
2495
2496 7876 fg->task_exited = 1;
2497
2498 7876 schedule_update_locked(sch);
2499
2500 7876 pthread_mutex_unlock(&sch->schedule_lock);
2501
2502 7876 return ret;
2503 }
2504
2505 int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
2506 {
2507 SchFilterGraph *fg;
2508
2509 av_assert0(fg_idx < sch->nb_filters);
2510 fg = &sch->filters[fg_idx];
2511
2512 return send_to_filter(sch, fg, fg->nb_inputs, frame);
2513 }
2514
2515 38186 static int task_cleanup(Scheduler *sch, SchedulerNode node)
2516 {
2517
5/6
✓ Branch 0 taken 7211 times.
✓ Branch 1 taken 8262 times.
✓ Branch 2 taken 6793 times.
✓ Branch 3 taken 8044 times.
✓ Branch 4 taken 7876 times.
✗ Branch 5 not taken.
38186 switch (node.type) {
2518 7211 case SCH_NODE_TYPE_DEMUX: return demux_done (sch, node.idx);
2519 8262 case SCH_NODE_TYPE_MUX: return mux_done (sch, node.idx);
2520 6793 case SCH_NODE_TYPE_DEC: return dec_done (sch, node.idx);
2521 8044 case SCH_NODE_TYPE_ENC: return enc_done (sch, node.idx);
2522 7876 case SCH_NODE_TYPE_FILTER_IN: return filter_done(sch, node.idx);
2523 default: av_assert0(0);
2524 }
2525 }
2526
2527 38172 static void *task_wrapper(void *arg)
2528 {
2529 38172 SchTask *task = arg;
2530 38172 Scheduler *sch = task->parent;
2531 int ret;
2532 38172 int err = 0;
2533
2534 38172 ret = task->func(task->func_arg);
2535
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 38171 times.
38172 if (ret < 0)
2536 1 av_log(task->func_arg, AV_LOG_ERROR,
2537 1 "Task finished with error code: %d (%s)\n", ret, av_err2str(ret));
2538
2539 38172 err = task_cleanup(sch, task->node);
2540 38172 ret = err_merge(ret, err);
2541
2542 // EOF is considered normal termination
2543
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 38172 times.
38172 if (ret == AVERROR_EOF)
2544 ret = 0;
2545
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 38171 times.
38172 if (ret < 0) {
2546 1 pthread_mutex_lock(&sch->finish_lock);
2547 1 sch->task_failed = 1;
2548 1 pthread_cond_signal(&sch->finish_cond);
2549 1 pthread_mutex_unlock(&sch->finish_lock);
2550 }
2551
2552
4/4
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 38171 times.
✓ Branch 2 taken 1 times.
✓ Branch 3 taken 38171 times.
38173 av_log(task->func_arg, ret < 0 ? AV_LOG_ERROR : AV_LOG_VERBOSE,
2553 "Terminating thread with return code %d (%s)\n", ret,
2554 1 ret < 0 ? av_err2str(ret) : "success");
2555
2556 38172 return (void*)(intptr_t)ret;
2557 }
2558
2559 38186 static int task_stop(Scheduler *sch, SchTask *task)
2560 {
2561 int ret;
2562 void *thread_ret;
2563
2564
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 38172 times.
38186 if (!task->thread_running)
2565 14 return task_cleanup(sch, task->node);
2566
2567 38172 ret = pthread_join(task->thread, &thread_ret);
2568
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 38172 times.
38172 av_assert0(ret == 0);
2569
2570 38172 task->thread_running = 0;
2571
2572 38172 return (intptr_t)thread_ret;
2573 }
2574
2575 16521 int sch_stop(Scheduler *sch, int64_t *finish_ts)
2576 {
2577 16521 int ret = 0, err;
2578
2579
2/2
✓ Branch 0 taken 8261 times.
✓ Branch 1 taken 8260 times.
16521 if (sch->state != SCH_STATE_STARTED)
2580 8261 return 0;
2581
2582 8260 atomic_store(&sch->terminate, 1);
2583
2584
2/2
✓ Branch 0 taken 16520 times.
✓ Branch 1 taken 8260 times.
24780 for (unsigned type = 0; type < 2; type++)
2585
4/4
✓ Branch 0 taken 15471 times.
✓ Branch 1 taken 16136 times.
✓ Branch 2 taken 15087 times.
✓ Branch 3 taken 16520 times.
31607 for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) {
2586
2/2
✓ Branch 0 taken 7211 times.
✓ Branch 1 taken 7876 times.
15087 SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter;
2587 15087 waiter_set(w, 1);
2588 }
2589
2590
2/2
✓ Branch 0 taken 7211 times.
✓ Branch 1 taken 8260 times.
15471 for (unsigned i = 0; i < sch->nb_demux; i++) {
2591 7211 SchDemux *d = &sch->demux[i];
2592
2593 7211 err = task_stop(sch, &d->task);
2594 7211 ret = err_merge(ret, err);
2595 }
2596
2597
2/2
✓ Branch 0 taken 6793 times.
✓ Branch 1 taken 8260 times.
15053 for (unsigned i = 0; i < sch->nb_dec; i++) {
2598 6793 SchDec *dec = &sch->dec[i];
2599
2600 6793 err = task_stop(sch, &dec->task);
2601 6793 ret = err_merge(ret, err);
2602 }
2603
2604
2/2
✓ Branch 0 taken 7876 times.
✓ Branch 1 taken 8260 times.
16136 for (unsigned i = 0; i < sch->nb_filters; i++) {
2605 7876 SchFilterGraph *fg = &sch->filters[i];
2606
2607 7876 err = task_stop(sch, &fg->task);
2608 7876 ret = err_merge(ret, err);
2609 }
2610
2611
2/2
✓ Branch 0 taken 8044 times.
✓ Branch 1 taken 8260 times.
16304 for (unsigned i = 0; i < sch->nb_enc; i++) {
2612 8044 SchEnc *enc = &sch->enc[i];
2613
2614 8044 err = task_stop(sch, &enc->task);
2615 8044 ret = err_merge(ret, err);
2616 }
2617
2618
2/2
✓ Branch 0 taken 8262 times.
✓ Branch 1 taken 8260 times.
16522 for (unsigned i = 0; i < sch->nb_mux; i++) {
2619 8262 SchMux *mux = &sch->mux[i];
2620
2621 8262 err = task_stop(sch, &mux->task);
2622 8262 ret = err_merge(ret, err);
2623 }
2624
2625
1/2
✓ Branch 0 taken 8260 times.
✗ Branch 1 not taken.
8260 if (finish_ts)
2626 8260 *finish_ts = trailing_dts(sch, 1);
2627
2628 8260 sch->state = SCH_STATE_STOPPED;
2629
2630 8260 return ret;
2631 }
2632