FFmpeg coverage


Directory: ../../../ffmpeg/
File: src/fftools/ffmpeg_sched.c
Date: 2024-10-04 17:46:48
Exec Total Coverage
Lines: 1050 1206 87.1%
Functions: 65 67 97.0%
Branches: 579 805 71.9%

Line Branch Exec Source
1 /*
2 * Inter-thread scheduling/synchronization.
3 * Copyright (c) 2023 Anton Khirnov
4 *
5 * This file is part of FFmpeg.
6 *
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with FFmpeg; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22 #include <stdatomic.h>
23 #include <stddef.h>
24 #include <stdint.h>
25
26 #include "cmdutils.h"
27 #include "ffmpeg_sched.h"
28 #include "ffmpeg_utils.h"
29 #include "sync_queue.h"
30 #include "thread_queue.h"
31
32 #include "libavcodec/packet.h"
33
34 #include "libavutil/avassert.h"
35 #include "libavutil/error.h"
36 #include "libavutil/fifo.h"
37 #include "libavutil/frame.h"
38 #include "libavutil/mem.h"
39 #include "libavutil/thread.h"
40 #include "libavutil/threadmessage.h"
41 #include "libavutil/time.h"
42
43 // 100 ms
44 // FIXME: some other value? make this dynamic?
45 #define SCHEDULE_TOLERANCE (100 * 1000)
46
// Kind of payload a ThreadQueue carries; selects the object pool and
// move callback in queue_alloc().
enum QueueType {
    QUEUE_PACKETS,
    QUEUE_FRAMES,
};

// Allows the scheduler to pause ("choke") a producer task and wake it
// up again once it is allowed to produce more data.
typedef struct SchWaiter {
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    atomic_int      choked;

    // the following are internal state of schedule_update_locked() and must not
    // be accessed outside of it
    int choked_prev;
    int choked_next;
} SchWaiter;

// One scheduler-managed thread: the function to run, its argument, and
// the identity of the node it implements.
typedef struct SchTask {
    Scheduler       *parent;
    SchedulerNode    node;

    SchThreadFunc    func;
    void            *func_arg;

    pthread_t        thread;
    int              thread_running;
} SchTask;

// One output of a decoder and the downstream nodes it feeds.
typedef struct SchDecOutput {
    SchedulerNode *dst;
    // per-destination flag: nonzero once that destination no longer
    // accepts input (parallel to dst[])
    uint8_t       *dst_finished;
    unsigned    nb_dst;
} SchDecOutput;
79
// Scheduler-side state for one decoder.
typedef struct SchDec {
    const AVClass  *class;

    // the node feeding packets to this decoder
    SchedulerNode   src;

    SchDecOutput   *outputs;
    unsigned     nb_outputs;

    SchTask         task;
    // Queue for receiving input packets, one stream.
    ThreadQueue    *queue;

    // Queue for sending post-flush end timestamps back to the source
    AVThreadMessageQueue *queue_end_ts;
    int             expect_end_ts;

    // temporary storage used by sch_dec_send()
    AVFrame        *send_frame;
} SchDec;

// A sync queue shared by a group of encoders, plus the lock protecting it
// and the scheduler-side encoder indices of its members.
typedef struct SchSyncQueue {
    SyncQueue      *sq;
    AVFrame        *frame;
    pthread_mutex_t lock;

    // enc_idx[i] is the index in Scheduler.enc of the encoder owning
    // sync-queue stream i
    unsigned       *enc_idx;
    unsigned     nb_enc_idx;
} SchSyncQueue;
108
// Scheduler-side state for one encoder.
typedef struct SchEnc {
    const AVClass  *class;

    SchedulerNode   src;
    SchedulerNode  *dst;
    // per-destination EOF flags, parallel to dst[]
    uint8_t        *dst_finished;
    unsigned     nb_dst;

    // [0] - index of the sync queue in Scheduler.sq_enc,
    // [1] - index of this encoder in the sq
    int sq_idx[2];

    /* Opening encoders is somewhat nontrivial due to their interaction with
     * sync queues, which are (among other things) responsible for maintaining
     * constant audio frame size, when it is required by the encoder.
     *
     * Opening the encoder requires stream parameters, obtained from the first
     * frame. However, that frame cannot be properly chunked by the sync queue
     * without knowing the required frame size, which is only available after
     * opening the encoder.
     *
     * This apparent circular dependency is resolved in the following way:
     * - the caller creating the encoder gives us a callback which opens the
     *   encoder and returns the required frame size (if any)
     * - when the first frame is sent to the encoder, the sending thread
     *   - calls this callback, opening the encoder
     *   - passes the returned frame size to the sync queue
     */
    int (*open_cb)(void *opaque, const AVFrame *frame);
    int opened;

    SchTask      task;
    // Queue for receiving input frames, one stream.
    ThreadQueue *queue;
    // tq_send() to queue returned EOF
    int          in_finished;

    // temporary storage used by sch_enc_send()
    AVPacket    *send_pkt;
} SchEnc;

// One demuxed stream and the downstream nodes it feeds.
typedef struct SchDemuxStream {
    SchedulerNode *dst;
    // per-destination EOF flags, parallel to dst[]
    uint8_t       *dst_finished;
    unsigned    nb_dst;
} SchDemuxStream;
155
// Scheduler-side state for one demuxer.
typedef struct SchDemux {
    const AVClass  *class;

    SchDemuxStream *streams;
    unsigned     nb_streams;

    SchTask         task;
    // used to pause the demuxer when its outputs are saturated
    SchWaiter       waiter;

    // temporary storage used by sch_demux_send()
    AVPacket       *send_pkt;

    // protected by schedule_lock
    int             task_exited;
} SchDemux;

typedef struct PreMuxQueue {
    /**
     * Queue for buffering the packets before the muxer task can be started.
     */
    AVFifo *fifo;
    /**
     * Maximum number of packets in fifo.
     */
    int max_packets;
    /*
     * The size of the AVPackets' buffers in queue.
     * Updated when a packet is either pushed or pulled from the queue.
     */
    size_t data_size;
    /* Threshold after which max_packets will be in effect */
    size_t data_threshold;
} PreMuxQueue;

// Scheduler-side state for one muxed stream.
typedef struct SchMuxStream {
    SchedulerNode src;
    SchedulerNode src_sched;

    unsigned     *sub_heartbeat_dst;
    unsigned   nb_sub_heartbeat_dst;

    PreMuxQueue   pre_mux_queue;

    // an EOF was generated while flushing the pre-mux queue
    int           init_eof;

    ////////////////////////////////////////////////////////////
    // The following are protected by Scheduler.schedule_lock //

    /* dts+duration of the last packet sent to this stream
       in AV_TIME_BASE_Q */
    int64_t       last_dts;
    // this stream no longer accepts input
    int           source_finished;
    ////////////////////////////////////////////////////////////
} SchMuxStream;

// Scheduler-side state for one muxer.
typedef struct SchMux {
    const AVClass  *class;

    SchMuxStream   *streams;
    unsigned     nb_streams;
    // how many streams have signalled they are ready to be muxed
    unsigned     nb_streams_ready;

    // callback that initializes the muxer before its task starts
    int          (*init)(void *arg);

    SchTask         task;
    /**
     * Set to 1 after starting the muxer task and flushing the
     * pre-muxing queues.
     * Set either before any tasks have started, or with
     * Scheduler.mux_ready_lock held.
     */
    atomic_int      mux_started;
    ThreadQueue    *queue;
    unsigned        queue_size;

    AVPacket       *sub_heartbeat_pkt;
} SchMux;
235
// One filtergraph input and the node feeding it.
typedef struct SchFilterIn {
    SchedulerNode src;
    SchedulerNode src_sched;
    int           send_finished;
    int           receive_finished;
} SchFilterIn;

// One filtergraph output and the node it feeds.
typedef struct SchFilterOut {
    SchedulerNode dst;
} SchFilterOut;

// Scheduler-side state for one filtergraph.
typedef struct SchFilterGraph {
    const AVClass  *class;

    SchFilterIn    *inputs;
    unsigned     nb_inputs;
    atomic_uint  nb_inputs_finished_send;
    unsigned     nb_inputs_finished_receive;

    SchFilterOut   *outputs;
    unsigned     nb_outputs;

    SchTask         task;
    // input queue, nb_inputs+1 streams
    // last stream is control
    ThreadQueue    *queue;
    // used to pause the filtergraph when its outputs are saturated
    SchWaiter       waiter;

    // protected by schedule_lock
    unsigned        best_input;
    int             task_exited;
} SchFilterGraph;

// Lifecycle of the whole scheduler.
enum SchedulerState {
    SCH_STATE_UNINIT,
    SCH_STATE_STARTED,
    SCH_STATE_STOPPED,
};

// Top-level scheduler context: owns all nodes and the global
// synchronization state.
struct Scheduler {
    const AVClass  *class;

    SchDemux       *demux;
    unsigned     nb_demux;

    SchMux         *mux;
    unsigned     nb_mux;

    unsigned        nb_mux_ready;
    pthread_mutex_t mux_ready_lock;

    unsigned        nb_mux_done;
    pthread_mutex_t mux_done_lock;
    pthread_cond_t  mux_done_cond;


    SchDec         *dec;
    unsigned     nb_dec;

    SchEnc         *enc;
    unsigned     nb_enc;

    SchSyncQueue   *sq_enc;
    unsigned     nb_sq_enc;

    SchFilterGraph *filters;
    unsigned     nb_filters;

    char           *sdp_filename;
    int             sdp_auto;

    enum SchedulerState state;
    atomic_int          terminate;
    atomic_int          task_failed;

    pthread_mutex_t schedule_lock;

    atomic_int_least64_t last_dts;
};
315
316 /**
317 * Wait until this task is allowed to proceed.
318 *
319 * @retval 0 the caller should proceed
320 * @retval 1 the caller should terminate
321 */
322 464714 static int waiter_wait(Scheduler *sch, SchWaiter *w)
323 {
324 int terminate;
325
326
2/2
✓ Branch 0 taken 464458 times.
✓ Branch 1 taken 256 times.
464714 if (!atomic_load(&w->choked))
327 464458 return 0;
328
329 256 pthread_mutex_lock(&w->lock);
330
331
4/4
✓ Branch 0 taken 262 times.
✓ Branch 1 taken 199 times.
✓ Branch 2 taken 205 times.
✓ Branch 3 taken 57 times.
461 while (atomic_load(&w->choked) && !atomic_load(&sch->terminate))
332 205 pthread_cond_wait(&w->cond, &w->lock);
333
334 256 terminate = atomic_load(&sch->terminate);
335
336 256 pthread_mutex_unlock(&w->lock);
337
338 256 return terminate;
339 }
340
// Set the waiter's choked state and wake up any task sleeping in
// waiter_wait(). The store and the signal both happen under the lock so
// the waiting thread cannot miss the state change.
static void waiter_set(SchWaiter *w, int choked)
{
    pthread_mutex_lock(&w->lock);

    atomic_store(&w->choked, choked);
    pthread_cond_signal(&w->cond);

    pthread_mutex_unlock(&w->lock);
}
350
351 13285 static int waiter_init(SchWaiter *w)
352 {
353 int ret;
354
355 13285 atomic_init(&w->choked, 0);
356
357 13285 ret = pthread_mutex_init(&w->lock, NULL);
358
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13285 times.
13285 if (ret)
359 return AVERROR(ret);
360
361 13285 ret = pthread_cond_init(&w->cond, NULL);
362
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13285 times.
13285 if (ret)
363 return AVERROR(ret);
364
365 13285 return 0;
366 }
367
// Free the synchronization primitives owned by a SchWaiter.
// Counterpart of waiter_init().
static void waiter_uninit(SchWaiter *w)
{
    pthread_mutex_destroy(&w->lock);
    pthread_cond_destroy(&w->cond);
}
373
374 26291 static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size,
375 enum QueueType type)
376 {
377 ThreadQueue *tq;
378 ObjPool *op;
379
380
1/2
✓ Branch 0 taken 26291 times.
✗ Branch 1 not taken.
26291 if (queue_size <= 0) {
381
2/2
✓ Branch 0 taken 13037 times.
✓ Branch 1 taken 13254 times.
26291 if (type == QUEUE_FRAMES)
382 13037 queue_size = DEFAULT_FRAME_THREAD_QUEUE_SIZE;
383 else
384 13254 queue_size = DEFAULT_PACKET_THREAD_QUEUE_SIZE;
385 }
386
387
2/2
✓ Branch 0 taken 13037 times.
✓ Branch 1 taken 13254 times.
26291 if (type == QUEUE_FRAMES) {
388 // This queue length is used in the decoder code to ensure that
389 // there are enough entries in fixed-size frame pools to account
390 // for frames held in queues inside the ffmpeg utility. If this
391 // can ever dynamically change then the corresponding decode
392 // code needs to be updated as well.
393
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13037 times.
13037 av_assert0(queue_size == DEFAULT_FRAME_THREAD_QUEUE_SIZE);
394 }
395
396
2/2
✓ Branch 0 taken 13254 times.
✓ Branch 1 taken 13037 times.
26291 op = (type == QUEUE_PACKETS) ? objpool_alloc_packets() :
397 13037 objpool_alloc_frames();
398
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 26291 times.
26291 if (!op)
399 return AVERROR(ENOMEM);
400
401
2/2
✓ Branch 0 taken 13254 times.
✓ Branch 1 taken 13037 times.
26291 tq = tq_alloc(nb_streams, queue_size, op,
402 (type == QUEUE_PACKETS) ? pkt_move : frame_move);
403
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 26291 times.
26291 if (!tq) {
404 objpool_free(&op);
405 return AVERROR(ENOMEM);
406 }
407
408 26291 *ptq = tq;
409 26291 return 0;
410 }
411
412 static void *task_wrapper(void *arg);
413
414 33123 static int task_start(SchTask *task)
415 {
416 int ret;
417
418 33123 av_log(task->func_arg, AV_LOG_VERBOSE, "Starting thread...\n");
419
420
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 33123 times.
33123 av_assert0(!task->thread_running);
421
422 33123 ret = pthread_create(&task->thread, NULL, task_wrapper, task);
423
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 33123 times.
33123 if (ret) {
424 av_log(task->func_arg, AV_LOG_ERROR, "pthread_create() failed: %s\n",
425 strerror(ret));
426 return AVERROR(ret);
427 }
428
429 33123 task->thread_running = 1;
430 33123 return 0;
431 }
432
433 33137 static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx,
434 SchThreadFunc func, void *func_arg)
435 {
436 33137 task->parent = sch;
437
438 33137 task->node.type = type;
439 33137 task->node.idx = idx;
440
441 33137 task->func = func;
442 33137 task->func_arg = func_arg;
443 33137 }
444
// Compute the smallest last_dts over all muxed streams, i.e. the point
// up to which all output is known to have been muxed.
//
// Streams whose source already finished are skipped unless
// count_finished is set.  If any considered stream has not produced a
// dts yet (last_dts == AV_NOPTS_VALUE), the overall result is unknown
// and AV_NOPTS_VALUE is returned immediately.  AV_NOPTS_VALUE is also
// returned when no stream was considered at all.
static int64_t trailing_dts(const Scheduler *sch, int count_finished)
{
    int64_t min_dts = INT64_MAX;

    for (unsigned i = 0; i < sch->nb_mux; i++) {
        const SchMux *mux = &sch->mux[i];

        for (unsigned j = 0; j < mux->nb_streams; j++) {
            const SchMuxStream *ms = &mux->streams[j];

            if (ms->source_finished && !count_finished)
                continue;
            if (ms->last_dts == AV_NOPTS_VALUE)
                return AV_NOPTS_VALUE;

            min_dts = FFMIN(min_dts, ms->last_dts);
        }
    }

    return min_dts == INT64_MAX ? AV_NOPTS_VALUE : min_dts;
}
466
// Stop all tasks and free the scheduler and everything it owns.
// *psch is set to NULL on return; a NULL *psch is a no-op.
void sch_free(Scheduler **psch)
{
    Scheduler *sch = *psch;

    if (!sch)
        return;

    // make sure all the threads are stopped before tearing anything down
    sch_stop(sch, NULL);

    for (unsigned i = 0; i < sch->nb_demux; i++) {
        SchDemux *d = &sch->demux[i];

        for (unsigned j = 0; j < d->nb_streams; j++) {
            SchDemuxStream *ds = &d->streams[j];
            av_freep(&ds->dst);
            av_freep(&ds->dst_finished);
        }
        av_freep(&d->streams);

        av_packet_free(&d->send_pkt);

        waiter_uninit(&d->waiter);
    }
    av_freep(&sch->demux);

    for (unsigned i = 0; i < sch->nb_mux; i++) {
        SchMux *mux = &sch->mux[i];

        for (unsigned j = 0; j < mux->nb_streams; j++) {
            SchMuxStream *ms = &mux->streams[j];

            if (ms->pre_mux_queue.fifo) {
                AVPacket *pkt;
                // drain and free any packets still buffered before muxing
                while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0)
                    av_packet_free(&pkt);
                av_fifo_freep2(&ms->pre_mux_queue.fifo);
            }

            av_freep(&ms->sub_heartbeat_dst);
        }
        av_freep(&mux->streams);

        av_packet_free(&mux->sub_heartbeat_pkt);

        tq_free(&mux->queue);
    }
    av_freep(&sch->mux);

    for (unsigned i = 0; i < sch->nb_dec; i++) {
        SchDec *dec = &sch->dec[i];

        tq_free(&dec->queue);

        av_thread_message_queue_free(&dec->queue_end_ts);

        for (unsigned j = 0; j < dec->nb_outputs; j++) {
            SchDecOutput *o = &dec->outputs[j];

            av_freep(&o->dst);
            av_freep(&o->dst_finished);
        }

        av_freep(&dec->outputs);

        av_frame_free(&dec->send_frame);
    }
    av_freep(&sch->dec);

    for (unsigned i = 0; i < sch->nb_enc; i++) {
        SchEnc *enc = &sch->enc[i];

        tq_free(&enc->queue);

        av_packet_free(&enc->send_pkt);

        av_freep(&enc->dst);
        av_freep(&enc->dst_finished);
    }
    av_freep(&sch->enc);

    for (unsigned i = 0; i < sch->nb_sq_enc; i++) {
        SchSyncQueue *sq = &sch->sq_enc[i];
        sq_free(&sq->sq);
        av_frame_free(&sq->frame);
        pthread_mutex_destroy(&sq->lock);
        av_freep(&sq->enc_idx);
    }
    av_freep(&sch->sq_enc);

    for (unsigned i = 0; i < sch->nb_filters; i++) {
        SchFilterGraph *fg = &sch->filters[i];

        tq_free(&fg->queue);

        av_freep(&fg->inputs);
        av_freep(&fg->outputs);

        waiter_uninit(&fg->waiter);
    }
    av_freep(&sch->filters);

    av_freep(&sch->sdp_filename);

    pthread_mutex_destroy(&sch->schedule_lock);

    pthread_mutex_destroy(&sch->mux_ready_lock);

    pthread_mutex_destroy(&sch->mux_done_lock);
    pthread_cond_destroy(&sch->mux_done_cond);

    av_freep(psch);
}
579
// AVClass used for logging from the Scheduler context itself.
static const AVClass scheduler_class = {
    .class_name = "Scheduler",
    .version    = LIBAVUTIL_VERSION_INT,
};
584
585 6817 Scheduler *sch_alloc(void)
586 {
587 Scheduler *sch;
588 int ret;
589
590 6817 sch = av_mallocz(sizeof(*sch));
591
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6817 times.
6817 if (!sch)
592 return NULL;
593
594 6817 sch->class = &scheduler_class;
595 6817 sch->sdp_auto = 1;
596
597 6817 ret = pthread_mutex_init(&sch->schedule_lock, NULL);
598
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6817 times.
6817 if (ret)
599 goto fail;
600
601 6817 ret = pthread_mutex_init(&sch->mux_ready_lock, NULL);
602
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6817 times.
6817 if (ret)
603 goto fail;
604
605 6817 ret = pthread_mutex_init(&sch->mux_done_lock, NULL);
606
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6817 times.
6817 if (ret)
607 goto fail;
608
609 6817 ret = pthread_cond_init(&sch->mux_done_cond, NULL);
610
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6817 times.
6817 if (ret)
611 goto fail;
612
613 6817 return sch;
614 fail:
615 sch_free(&sch);
616 return NULL;
617 }
618
619 int sch_sdp_filename(Scheduler *sch, const char *sdp_filename)
620 {
621 av_freep(&sch->sdp_filename);
622 sch->sdp_filename = av_strdup(sdp_filename);
623 return sch->sdp_filename ? 0 : AVERROR(ENOMEM);
624 }
625
// AVClass for SchMux; log messages are forwarded to the muxer's own
// context through task.func_arg.
static const AVClass sch_mux_class = {
    .class_name                = "SchMux",
    .version                   = LIBAVUTIL_VERSION_INT,
    .parent_log_context_offset = offsetof(SchMux, task.func_arg),
};
631
632 6818 int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
633 void *arg, int sdp_auto, unsigned thread_queue_size)
634 {
635 6818 const unsigned idx = sch->nb_mux;
636
637 SchMux *mux;
638 int ret;
639
640 6818 ret = GROW_ARRAY(sch->mux, sch->nb_mux);
641
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6818 times.
6818 if (ret < 0)
642 return ret;
643
644 6818 mux = &sch->mux[idx];
645 6818 mux->class = &sch_mux_class;
646 6818 mux->init = init;
647 6818 mux->queue_size = thread_queue_size;
648
649 6818 task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg);
650
651 6818 sch->sdp_auto &= sdp_auto;
652
653 6818 return idx;
654 }
655
656 7258 int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx)
657 {
658 SchMux *mux;
659 SchMuxStream *ms;
660 unsigned stream_idx;
661 int ret;
662
663
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7258 times.
7258 av_assert0(mux_idx < sch->nb_mux);
664 7258 mux = &sch->mux[mux_idx];
665
666 7258 ret = GROW_ARRAY(mux->streams, mux->nb_streams);
667
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7258 times.
7258 if (ret < 0)
668 return ret;
669 7258 stream_idx = mux->nb_streams - 1;
670
671 7258 ms = &mux->streams[stream_idx];
672
673 7258 ms->pre_mux_queue.fifo = av_fifo_alloc2(8, sizeof(AVPacket*), 0);
674
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7258 times.
7258 if (!ms->pre_mux_queue.fifo)
675 return AVERROR(ENOMEM);
676
677 7258 ms->last_dts = AV_NOPTS_VALUE;
678
679 7258 return stream_idx;
680 }
681
// AVClass for SchDemux; log messages are forwarded to the demuxer's own
// context through task.func_arg.
static const AVClass sch_demux_class = {
    .class_name                = "SchDemux",
    .version                   = LIBAVUTIL_VERSION_INT,
    .parent_log_context_offset = offsetof(SchDemux, task.func_arg),
};
687
688 6846 int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *ctx)
689 {
690 6846 const unsigned idx = sch->nb_demux;
691
692 SchDemux *d;
693 int ret;
694
695 6846 ret = GROW_ARRAY(sch->demux, sch->nb_demux);
696
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6846 times.
6846 if (ret < 0)
697 return ret;
698
699 6846 d = &sch->demux[idx];
700
701 6846 task_init(sch, &d->task, SCH_NODE_TYPE_DEMUX, idx, func, ctx);
702
703 6846 d->class = &sch_demux_class;
704 6846 d->send_pkt = av_packet_alloc();
705
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6846 times.
6846 if (!d->send_pkt)
706 return AVERROR(ENOMEM);
707
708 6846 ret = waiter_init(&d->waiter);
709
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6846 times.
6846 if (ret < 0)
710 return ret;
711
712 6846 return idx;
713 }
714
715 7072 int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
716 {
717 SchDemux *d;
718 int ret;
719
720
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7072 times.
7072 av_assert0(demux_idx < sch->nb_demux);
721 7072 d = &sch->demux[demux_idx];
722
723 7072 ret = GROW_ARRAY(d->streams, d->nb_streams);
724
1/2
✓ Branch 0 taken 7072 times.
✗ Branch 1 not taken.
7072 return ret < 0 ? ret : d->nb_streams - 1;
725 }
726
727 6442 int sch_add_dec_output(Scheduler *sch, unsigned dec_idx)
728 {
729 SchDec *dec;
730 int ret;
731
732
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6442 times.
6442 av_assert0(dec_idx < sch->nb_dec);
733 6442 dec = &sch->dec[dec_idx];
734
735 6442 ret = GROW_ARRAY(dec->outputs, dec->nb_outputs);
736
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6442 times.
6442 if (ret < 0)
737 return ret;
738
739 6442 return dec->nb_outputs - 1;
740 }
741
// AVClass for SchDec; log messages are forwarded to the decoder's own
// context through task.func_arg.
static const AVClass sch_dec_class = {
    .class_name                = "SchDec",
    .version                   = LIBAVUTIL_VERSION_INT,
    .parent_log_context_offset = offsetof(SchDec, task.func_arg),
};
747
// Register a decoder with the scheduler. A first output and the input
// packet queue are created here; send_end_ts additionally allocates the
// queue used to send post-flush end timestamps back to the source.
// Returns the new decoder's index on success, a negative AVERROR code
// on failure.
int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts)
{
    const unsigned idx = sch->nb_dec;

    SchDec *dec;
    int ret;

    ret = GROW_ARRAY(sch->dec, sch->nb_dec);
    if (ret < 0)
        return ret;

    dec = &sch->dec[idx];

    task_init(sch, &dec->task, SCH_NODE_TYPE_DEC, idx, func, ctx);

    dec->class      = &sch_dec_class;
    dec->send_frame = av_frame_alloc();
    if (!dec->send_frame)
        return AVERROR(ENOMEM);

    ret = sch_add_dec_output(sch, idx);
    if (ret < 0)
        return ret;

    ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS);
    if (ret < 0)
        return ret;

    if (send_end_ts) {
        ret = av_thread_message_queue_alloc(&dec->queue_end_ts, 1, sizeof(Timestamp));
        if (ret < 0)
            return ret;
    }

    return idx;
}
784
// AVClass for SchEnc; log messages are forwarded to the encoder's own
// context through task.func_arg.
static const AVClass sch_enc_class = {
    .class_name                = "SchEnc",
    .version                   = LIBAVUTIL_VERSION_INT,
    .parent_log_context_offset = offsetof(SchEnc, task.func_arg),
};
790
791 6598 int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx,
792 int (*open_cb)(void *opaque, const AVFrame *frame))
793 {
794 6598 const unsigned idx = sch->nb_enc;
795
796 SchEnc *enc;
797 int ret;
798
799 6598 ret = GROW_ARRAY(sch->enc, sch->nb_enc);
800
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6598 times.
6598 if (ret < 0)
801 return ret;
802
803 6598 enc = &sch->enc[idx];
804
805 6598 enc->class = &sch_enc_class;
806 6598 enc->open_cb = open_cb;
807 6598 enc->sq_idx[0] = -1;
808 6598 enc->sq_idx[1] = -1;
809
810 6598 task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx);
811
812 6598 enc->send_pkt = av_packet_alloc();
813
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6598 times.
6598 if (!enc->send_pkt)
814 return AVERROR(ENOMEM);
815
816 6598 ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES);
817
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6598 times.
6598 if (ret < 0)
818 return ret;
819
820 6598 return idx;
821 }
822
// AVClass for SchFilterGraph; log messages are forwarded to the
// filtergraph's own context through task.func_arg.
static const AVClass sch_fg_class = {
    .class_name                = "SchFilterGraph",
    .version                   = LIBAVUTIL_VERSION_INT,
    .parent_log_context_offset = offsetof(SchFilterGraph, task.func_arg),
};
828
// Register a filtergraph with the scheduler, with the given number of
// inputs and outputs. The input queue has nb_inputs+1 streams; the last
// one is the control stream (see SchFilterGraph.queue).
// Returns the new filtergraph's index on success, a negative AVERROR
// code on failure.
int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
                        SchThreadFunc func, void *ctx)
{
    const unsigned idx = sch->nb_filters;

    SchFilterGraph *fg;
    int ret;

    ret = GROW_ARRAY(sch->filters, sch->nb_filters);
    if (ret < 0)
        return ret;
    fg = &sch->filters[idx];

    fg->class = &sch_fg_class;

    task_init(sch, &fg->task, SCH_NODE_TYPE_FILTER_IN, idx, func, ctx);

    if (nb_inputs) {
        fg->inputs = av_calloc(nb_inputs, sizeof(*fg->inputs));
        if (!fg->inputs)
            return AVERROR(ENOMEM);
        fg->nb_inputs = nb_inputs;
    }

    if (nb_outputs) {
        fg->outputs = av_calloc(nb_outputs, sizeof(*fg->outputs));
        if (!fg->outputs)
            return AVERROR(ENOMEM);
        fg->nb_outputs = nb_outputs;
    }

    ret = waiter_init(&fg->waiter);
    if (ret < 0)
        return ret;

    ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES);
    if (ret < 0)
        return ret;

    return idx;
}
870
871 2777 int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx)
872 {
873 SchSyncQueue *sq;
874 int ret;
875
876 2777 ret = GROW_ARRAY(sch->sq_enc, sch->nb_sq_enc);
877
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2777 times.
2777 if (ret < 0)
878 return ret;
879 2777 sq = &sch->sq_enc[sch->nb_sq_enc - 1];
880
881 2777 sq->sq = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, logctx);
882
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2777 times.
2777 if (!sq->sq)
883 return AVERROR(ENOMEM);
884
885 2777 sq->frame = av_frame_alloc();
886
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2777 times.
2777 if (!sq->frame)
887 return AVERROR(ENOMEM);
888
889 2777 ret = pthread_mutex_init(&sq->lock, NULL);
890
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2777 times.
2777 if (ret)
891 return AVERROR(ret);
892
893 2777 return sq - sch->sq_enc;
894 }
895
// Attach an existing encoder to an existing sync queue.
// limiting marks the encoder's stream as limiting for the queue;
// max_frames, unless INT64_MAX, caps the number of frames the stream
// may emit.
// Returns 0 on success, a negative AVERROR code on failure.
int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx,
                   int limiting, uint64_t max_frames)
{
    SchSyncQueue *sq;
    SchEnc *enc;
    int ret;

    av_assert0(sq_idx < sch->nb_sq_enc);
    sq = &sch->sq_enc[sq_idx];

    av_assert0(enc_idx < sch->nb_enc);
    enc = &sch->enc[enc_idx];

    // record the encoder's scheduler index under its sync-queue slot
    ret = GROW_ARRAY(sq->enc_idx, sq->nb_enc_idx);
    if (ret < 0)
        return ret;
    sq->enc_idx[sq->nb_enc_idx - 1] = enc_idx;

    ret = sq_add_stream(sq->sq, limiting);
    if (ret < 0)
        return ret;

    // remember both the sync queue and this encoder's stream within it
    enc->sq_idx[0] = sq_idx;
    enc->sq_idx[1] = ret;

    if (max_frames != INT64_MAX)
        sq_limit_frames(sq->sq, enc->sq_idx[1], max_frames);

    return 0;
}
926
927 26748 int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst)
928 {
929 int ret;
930
931
4/5
✓ Branch 0 taken 7095 times.
✓ Branch 1 taken 6494 times.
✓ Branch 2 taken 6560 times.
✓ Branch 3 taken 6599 times.
✗ Branch 4 not taken.
26748 switch (src.type) {
932 7095 case SCH_NODE_TYPE_DEMUX: {
933 SchDemuxStream *ds;
934
935
2/4
✓ Branch 0 taken 7095 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 7095 times.
7095 av_assert0(src.idx < sch->nb_demux &&
936 src.idx_stream < sch->demux[src.idx].nb_streams);
937 7095 ds = &sch->demux[src.idx].streams[src.idx_stream];
938
939 7095 ret = GROW_ARRAY(ds->dst, ds->nb_dst);
940
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7095 times.
7095 if (ret < 0)
941 return ret;
942
943 7095 ds->dst[ds->nb_dst - 1] = dst;
944
945 // demuxed packets go to decoding or streamcopy
946
2/3
✓ Branch 0 taken 6435 times.
✓ Branch 1 taken 660 times.
✗ Branch 2 not taken.
7095 switch (dst.type) {
947 6435 case SCH_NODE_TYPE_DEC: {
948 SchDec *dec;
949
950
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6435 times.
6435 av_assert0(dst.idx < sch->nb_dec);
951 6435 dec = &sch->dec[dst.idx];
952
953
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6435 times.
6435 av_assert0(!dec->src.type);
954 6435 dec->src = src;
955 6435 break;
956 }
957 660 case SCH_NODE_TYPE_MUX: {
958 SchMuxStream *ms;
959
960
2/4
✓ Branch 0 taken 660 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 660 times.
660 av_assert0(dst.idx < sch->nb_mux &&
961 dst.idx_stream < sch->mux[dst.idx].nb_streams);
962 660 ms = &sch->mux[dst.idx].streams[dst.idx_stream];
963
964
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 660 times.
660 av_assert0(!ms->src.type);
965 660 ms->src = src;
966
967 660 break;
968 }
969 default: av_assert0(0);
970 }
971
972 7095 break;
973 }
974 6494 case SCH_NODE_TYPE_DEC: {
975 SchDec *dec;
976 SchDecOutput *o;
977
978
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6494 times.
6494 av_assert0(src.idx < sch->nb_dec);
979 6494 dec = &sch->dec[src.idx];
980
981
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6494 times.
6494 av_assert0(src.idx_stream < dec->nb_outputs);
982 6494 o = &dec->outputs[src.idx_stream];
983
984 6494 ret = GROW_ARRAY(o->dst, o->nb_dst);
985
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6494 times.
6494 if (ret < 0)
986 return ret;
987
988 6494 o->dst[o->nb_dst - 1] = dst;
989
990 // decoded frames go to filters or encoding
991
2/3
✓ Branch 0 taken 6456 times.
✓ Branch 1 taken 38 times.
✗ Branch 2 not taken.
6494 switch (dst.type) {
992 6456 case SCH_NODE_TYPE_FILTER_IN: {
993 SchFilterIn *fi;
994
995
2/4
✓ Branch 0 taken 6456 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6456 times.
6456 av_assert0(dst.idx < sch->nb_filters &&
996 dst.idx_stream < sch->filters[dst.idx].nb_inputs);
997 6456 fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
998
999
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6456 times.
6456 av_assert0(!fi->src.type);
1000 6456 fi->src = src;
1001 6456 break;
1002 }
1003 38 case SCH_NODE_TYPE_ENC: {
1004 SchEnc *enc;
1005
1006
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 38 times.
38 av_assert0(dst.idx < sch->nb_enc);
1007 38 enc = &sch->enc[dst.idx];
1008
1009
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 38 times.
38 av_assert0(!enc->src.type);
1010 38 enc->src = src;
1011 38 break;
1012 }
1013 default: av_assert0(0);
1014 }
1015
1016 6494 break;
1017 }
1018 6560 case SCH_NODE_TYPE_FILTER_OUT: {
1019 SchFilterOut *fo;
1020
1021
2/4
✓ Branch 0 taken 6560 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6560 times.
6560 av_assert0(src.idx < sch->nb_filters &&
1022 src.idx_stream < sch->filters[src.idx].nb_outputs);
1023 6560 fo = &sch->filters[src.idx].outputs[src.idx_stream];
1024
1025
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6560 times.
6560 av_assert0(!fo->dst.type);
1026 6560 fo->dst = dst;
1027
1028 // filtered frames go to encoding or another filtergraph
1029
1/3
✓ Branch 0 taken 6560 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
6560 switch (dst.type) {
1030 6560 case SCH_NODE_TYPE_ENC: {
1031 SchEnc *enc;
1032
1033
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6560 times.
6560 av_assert0(dst.idx < sch->nb_enc);
1034 6560 enc = &sch->enc[dst.idx];
1035
1036
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6560 times.
6560 av_assert0(!enc->src.type);
1037 6560 enc->src = src;
1038 6560 break;
1039 }
1040 case SCH_NODE_TYPE_FILTER_IN: {
1041 SchFilterIn *fi;
1042
1043 av_assert0(dst.idx < sch->nb_filters &&
1044 dst.idx_stream < sch->filters[dst.idx].nb_inputs);
1045 fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
1046
1047 av_assert0(!fi->src.type);
1048 fi->src = src;
1049 break;
1050 }
1051 default: av_assert0(0);
1052 }
1053
1054
1055 6560 break;
1056 }
1057 6599 case SCH_NODE_TYPE_ENC: {
1058 SchEnc *enc;
1059
1060
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6599 times.
6599 av_assert0(src.idx < sch->nb_enc);
1061 6599 enc = &sch->enc[src.idx];
1062
1063 6599 ret = GROW_ARRAY(enc->dst, enc->nb_dst);
1064
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6599 times.
6599 if (ret < 0)
1065 return ret;
1066
1067 6599 enc->dst[enc->nb_dst - 1] = dst;
1068
1069 // encoding packets go to muxing or decoding
1070
2/3
✓ Branch 0 taken 6598 times.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
6599 switch (dst.type) {
1071 6598 case SCH_NODE_TYPE_MUX: {
1072 SchMuxStream *ms;
1073
1074
2/4
✓ Branch 0 taken 6598 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6598 times.
6598 av_assert0(dst.idx < sch->nb_mux &&
1075 dst.idx_stream < sch->mux[dst.idx].nb_streams);
1076 6598 ms = &sch->mux[dst.idx].streams[dst.idx_stream];
1077
1078
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6598 times.
6598 av_assert0(!ms->src.type);
1079 6598 ms->src = src;
1080
1081 6598 break;
1082 }
1083 1 case SCH_NODE_TYPE_DEC: {
1084 SchDec *dec;
1085
1086
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(dst.idx < sch->nb_dec);
1087 1 dec = &sch->dec[dst.idx];
1088
1089
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(!dec->src.type);
1090 1 dec->src = src;
1091
1092 1 break;
1093 }
1094 default: av_assert0(0);
1095 }
1096
1097 6599 break;
1098 }
1099 default: av_assert0(0);
1100 }
1101
1102 26748 return 0;
1103 }
1104
/* Start the muxer thread and flush all packets buffered in the per-stream
 * pre-muxing queues into the now-running muxer.
 *
 * Returns 0 on success, a negative error code on failure. Note that
 * AVERROR_EOF from tq_send() is not treated as an error - it only marks
 * the stream so that the remaining queued packets for it are discarded. */
static int mux_task_start(SchMux *mux)
{
    int ret = 0;

    ret = task_start(&mux->task);
    if (ret < 0)
        return ret;

    /* flush the pre-muxing queues */
    for (unsigned i = 0; i < mux->nb_streams; i++) {
        SchMuxStream *ms = &mux->streams[i];
        AVPacket *pkt;

        while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0) {
            if (pkt) {
                // once the muxer signalled EOF for this stream, keep
                // draining the fifo but drop the packets instead of
                // sending them
                if (!ms->init_eof)
                    ret = tq_send(mux->queue, i, pkt);
                av_packet_free(&pkt);
                if (ret == AVERROR_EOF)
                    ms->init_eof = 1;
                else if (ret < 0)
                    return ret;
            } else
                // a NULL entry in the pre-mux queue marks end of stream
                tq_send_finish(mux->queue, i);
        }
    }

    // from this point on packets are sent to the thread queue directly,
    // bypassing the pre-muxing queues
    atomic_store(&mux->mux_started, 1);

    return 0;
}
1136
1137 int print_sdp(const char *filename);
1138
/* Initialize a muxer whose streams are all ready and start its thread.
 *
 * When SDP output was requested, no muxer thread is started until ALL
 * muxers have been initialized and the SDP has been written; otherwise
 * only this muxer's thread is started.
 *
 * Returns 0 on success, a negative error code on failure. */
static int mux_init(Scheduler *sch, SchMux *mux)
{
    int ret;

    ret = mux->init(mux->task.func_arg);
    if (ret < 0)
        return ret;

    sch->nb_mux_ready++;

    if (sch->sdp_filename || sch->sdp_auto) {
        // SDP requested - wait until every muxer has been initialized
        if (sch->nb_mux_ready < sch->nb_mux)
            return 0;

        ret = print_sdp(sch->sdp_filename);
        if (ret < 0) {
            av_log(sch, AV_LOG_ERROR, "Error writing the SDP.\n");
            return ret;
        }

        /* SDP is written only after all the muxers are ready, so now we
         * start ALL the threads */
        for (unsigned i = 0; i < sch->nb_mux; i++) {
            ret = mux_task_start(&sch->mux[i]);
            if (ret < 0)
                return ret;
        }
    } else {
        ret = mux_task_start(mux);
        if (ret < 0)
            return ret;
    }

    return 0;
}
1174
1175 7258 void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1176 size_t data_threshold, int max_packets)
1177 {
1178 SchMux *mux;
1179 SchMuxStream *ms;
1180
1181
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7258 times.
7258 av_assert0(mux_idx < sch->nb_mux);
1182 7258 mux = &sch->mux[mux_idx];
1183
1184
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7258 times.
7258 av_assert0(stream_idx < mux->nb_streams);
1185 7258 ms = &mux->streams[stream_idx];
1186
1187 7258 ms->pre_mux_queue.max_packets = max_packets;
1188 7258 ms->pre_mux_queue.data_threshold = data_threshold;
1189 7258 }
1190
/* Signal that stream stream_idx of muxer mux_idx has all the information
 * needed for muxing. Once the last stream of a muxer becomes ready, the
 * muxer itself is initialized - but only if the scheduler has already
 * been started; otherwise sch_start() performs the initialization.
 *
 * Returns 0 on success, a negative error code on failure. */
int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
{
    SchMux *mux;
    int ret = 0;

    av_assert0(mux_idx < sch->nb_mux);
    mux = &sch->mux[mux_idx];

    av_assert0(stream_idx < mux->nb_streams);

    pthread_mutex_lock(&sch->mux_ready_lock);

    av_assert0(mux->nb_streams_ready < mux->nb_streams);

    // this may be called during initialization - do not start
    // threads before sch_start() is called
    if (++mux->nb_streams_ready == mux->nb_streams &&
        sch->state >= SCH_STATE_STARTED)
        ret = mux_init(sch, mux);

    pthread_mutex_unlock(&sch->mux_ready_lock);

    return ret;
}
1215
/* Register decoder dec_idx as a destination for subtitle heartbeat
 * packets derived from stream stream_idx of muxer mux_idx. Also lazily
 * allocates the packet the muxer uses for sending heartbeats.
 *
 * Returns 0 on success, a negative error code (e.g. ENOMEM) on failure. */
int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
                              unsigned dec_idx)
{
    SchMux *mux;
    SchMuxStream *ms;
    int ret = 0;

    av_assert0(mux_idx < sch->nb_mux);
    mux = &sch->mux[mux_idx];

    av_assert0(stream_idx < mux->nb_streams);
    ms = &mux->streams[stream_idx];

    ret = GROW_ARRAY(ms->sub_heartbeat_dst, ms->nb_sub_heartbeat_dst);
    if (ret < 0)
        return ret;

    av_assert0(dec_idx < sch->nb_dec);
    ms->sub_heartbeat_dst[ms->nb_sub_heartbeat_dst - 1] = dec_idx;

    // allocated once, shared by all heartbeat destinations of this muxer
    if (!mux->sub_heartbeat_pkt) {
        mux->sub_heartbeat_pkt = av_packet_alloc();
        if (!mux->sub_heartbeat_pkt)
            return AVERROR(ENOMEM);
    }

    return 0;
}
1244
/* Walk upstream from the given source node and clear choked_next on the
 * ultimate data source feeding it - either a demuxer, or a filtergraph
 * that requested to be scheduled directly - allowing it to produce more
 * data. Called from schedule_update_locked(), i.e. with
 * sch->schedule_lock held. */
static void unchoke_for_stream(Scheduler *sch, SchedulerNode src)
{
    while (1) {
        SchFilterGraph *fg;

        // fed directly by a demuxer (i.e. not through a filtergraph)
        if (src.type == SCH_NODE_TYPE_DEMUX) {
            sch->demux[src.idx].waiter.choked_next = 0;
            return;
        }

        av_assert0(src.type == SCH_NODE_TYPE_FILTER_OUT);
        fg = &sch->filters[src.idx];

        // the filtergraph contains internal sources and
        // requested to be scheduled directly
        if (fg->best_input == fg->nb_inputs) {
            fg->waiter.choked_next = 0;
            return;
        }

        // continue the walk through the input the filtergraph
        // wants to read from next
        src = fg->inputs[fg->best_input].src_sched;
    }
}
1269
/* Recompute which sources (demuxers/filtergraphs) are allowed to produce
 * data ("unchoked") based on how far each muxed stream is ahead of the
 * trailing DTS, then apply any resulting state changes to the waiters.
 * Must be called with sch->schedule_lock held. */
static void schedule_update_locked(Scheduler *sch)
{
    int64_t dts;
    int have_unchoked = 0;

    // on termination request all waiters are choked,
    // we are not to unchoke them
    if (atomic_load(&sch->terminate))
        return;

    dts = trailing_dts(sch, 0);

    atomic_store(&sch->last_dts, dts);

    // initialize our internal state: remember each waiter's current
    // choke state and tentatively mark everything as choked
    for (unsigned type = 0; type < 2; type++)
        for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
            SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
            w->choked_prev = atomic_load(&w->choked);
            w->choked_next = 1;
        }

    // figure out the sources that are allowed to proceed
    for (unsigned i = 0; i < sch->nb_mux; i++) {
        SchMux *mux = &sch->mux[i];

        for (unsigned j = 0; j < mux->nb_streams; j++) {
            SchMuxStream *ms = &mux->streams[j];

            // unblock sources for output streams that are not finished
            // and not too far ahead of the trailing stream
            if (ms->source_finished)
                continue;

            // no stream has produced data yet, but this one already did -
            // keep it choked until the others catch up
            if (dts == AV_NOPTS_VALUE && ms->last_dts != AV_NOPTS_VALUE)
                continue;

            // this stream is more than SCHEDULE_TOLERANCE ahead of the
            // trailing one - keep it choked
            if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE)
                continue;

            // resolve the source to unchoke
            unchoke_for_stream(sch, ms->src_sched);
            have_unchoked = 1;
        }
    }

    // make sure to unchoke at least one source, if still available,
    // otherwise the pipeline could deadlock
    for (unsigned type = 0; !have_unchoked && type < 2; type++)
        for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
            int exited = type ? sch->filters[i].task_exited : sch->demux[i].task_exited;
            SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
            if (!exited) {
                w->choked_next = 0;
                have_unchoked  = 1;
                break;
            }
        }

    // apply the newly computed choke state wherever it changed
    for (unsigned type = 0; type < 2; type++)
        for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
            SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
            if (w->choked_prev != w->choked_next)
                waiter_set(w, w->choked_next);
        }
}
1335
/* DFS node states used by the cycle check in check_acyclic_for_output(). */
enum {
    CYCLE_NODE_NEW = 0,     // not visited yet
    CYCLE_NODE_STARTED,     // on the current DFS path
    CYCLE_NODE_DONE,        // fully explored, known cycle-free
};
1341
/* Iterative depth-first search upstream from the filtergraph given by
 * src.idx, checking that no filtergraph is reachable from itself.
 * filters_visited and filters_stack are caller-provided scratch arrays of
 * sch->nb_filters entries each; src.idx_stream is (ab)used as the DFS
 * per-node input cursor.
 *
 * Returns 0 if no cycle was found, AVERROR(EINVAL) on a cycle. */
static int
check_acyclic_for_output(const Scheduler *sch, SchedulerNode src,
                         uint8_t *filters_visited, SchedulerNode *filters_stack)
{
    unsigned nb_filters_stack = 0;

    memset(filters_visited, 0, sch->nb_filters * sizeof(*filters_visited));

    while (1) {
        const SchFilterGraph *fg = &sch->filters[src.idx];

        filters_visited[src.idx] = CYCLE_NODE_STARTED;

        // descend into every input, depth first
        if (src.idx_stream < fg->nb_inputs) {
            const SchFilterIn *fi = &fg->inputs[src.idx_stream++];

            // connected to demuxer, no cycles possible
            if (fi->src_sched.type == SCH_NODE_TYPE_DEMUX)
                continue;

            // otherwise connected to another filtergraph
            av_assert0(fi->src_sched.type == SCH_NODE_TYPE_FILTER_OUT);

            // found a cycle: the upstream filtergraph is already on the
            // current DFS path
            if (filters_visited[fi->src_sched.idx] == CYCLE_NODE_STARTED)
                return AVERROR(EINVAL);

            // place current position on stack and descend
            av_assert0(nb_filters_stack < sch->nb_filters);
            filters_stack[nb_filters_stack++] = src;
            src = (SchedulerNode){ .idx = fi->src_sched.idx, .idx_stream = 0 };
            continue;
        }

        // all inputs of this node explored
        filters_visited[src.idx] = CYCLE_NODE_DONE;

        // previous search finished, resume the parent node from the stack
        if (nb_filters_stack) {
            src = filters_stack[--nb_filters_stack];
            continue;
        }
        return 0;
    }
}
1387
1388 6816 static int check_acyclic(Scheduler *sch)
1389 {
1390 6816 uint8_t *filters_visited = NULL;
1391 6816 SchedulerNode *filters_stack = NULL;
1392
1393 6816 int ret = 0;
1394
1395
2/2
✓ Branch 0 taken 489 times.
✓ Branch 1 taken 6327 times.
6816 if (!sch->nb_filters)
1396 489 return 0;
1397
1398 6327 filters_visited = av_malloc_array(sch->nb_filters, sizeof(*filters_visited));
1399
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6327 times.
6327 if (!filters_visited)
1400 return AVERROR(ENOMEM);
1401
1402 6327 filters_stack = av_malloc_array(sch->nb_filters, sizeof(*filters_stack));
1403
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6327 times.
6327 if (!filters_stack) {
1404 ret = AVERROR(ENOMEM);
1405 goto fail;
1406 }
1407
1408 // trace the transcoding graph upstream from every filtegraph
1409
2/2
✓ Branch 0 taken 6439 times.
✓ Branch 1 taken 6327 times.
12766 for (unsigned i = 0; i < sch->nb_filters; i++) {
1410 6439 ret = check_acyclic_for_output(sch, (SchedulerNode){ .idx = i },
1411 filters_visited, filters_stack);
1412
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6439 times.
6439 if (ret < 0) {
1413 av_log(&sch->filters[i], AV_LOG_ERROR, "Transcoding graph has a cycle\n");
1414 goto fail;
1415 }
1416 }
1417
1418 6327 fail:
1419 6327 av_freep(&filters_visited);
1420 6327 av_freep(&filters_stack);
1421 6327 return ret;
1422 }
1423
/* Validate the fully-assembled transcoding graph and finish setting it up
 * before any threads are started:
 * - check that every demuxer stream / decoder output / encoder /
 *   filtergraph input and output is connected,
 * - allocate the per-destination "finished" flag arrays,
 * - resolve each muxer stream's and filtergraph input's ultimate
 *   scheduling source (src_sched),
 * - allocate the muxer thread queues,
 * - check the graph for cycles.
 *
 * Returns 0 on success, a negative error code on failure. */
static int start_prepare(Scheduler *sch)
{
    int ret;

    for (unsigned i = 0; i < sch->nb_demux; i++) {
        SchDemux *d = &sch->demux[i];

        for (unsigned j = 0; j < d->nb_streams; j++) {
            SchDemuxStream *ds = &d->streams[j];

            if (!ds->nb_dst) {
                av_log(d, AV_LOG_ERROR,
                       "Demuxer stream %u not connected to any sink\n", j);
                return AVERROR(EINVAL);
            }

            ds->dst_finished = av_calloc(ds->nb_dst, sizeof(*ds->dst_finished));
            if (!ds->dst_finished)
                return AVERROR(ENOMEM);
        }
    }

    for (unsigned i = 0; i < sch->nb_dec; i++) {
        SchDec *dec = &sch->dec[i];

        if (!dec->src.type) {
            av_log(dec, AV_LOG_ERROR,
                   "Decoder not connected to a source\n");
            return AVERROR(EINVAL);
        }

        for (unsigned j = 0; j < dec->nb_outputs; j++) {
            SchDecOutput *o = &dec->outputs[j];

            if (!o->nb_dst) {
                av_log(dec, AV_LOG_ERROR,
                       "Decoder output %u not connected to any sink\n", j);
                return AVERROR(EINVAL);
            }

            o->dst_finished = av_calloc(o->nb_dst, sizeof(*o->dst_finished));
            if (!o->dst_finished)
                return AVERROR(ENOMEM);
        }
    }

    for (unsigned i = 0; i < sch->nb_enc; i++) {
        SchEnc *enc = &sch->enc[i];

        if (!enc->src.type) {
            av_log(enc, AV_LOG_ERROR,
                   "Encoder not connected to a source\n");
            return AVERROR(EINVAL);
        }

        if (!enc->nb_dst) {
            av_log(enc, AV_LOG_ERROR,
                   "Encoder not connected to any sink\n");
            return AVERROR(EINVAL);
        }

        enc->dst_finished = av_calloc(enc->nb_dst, sizeof(*enc->dst_finished));
        if (!enc->dst_finished)
            return AVERROR(ENOMEM);
    }

    for (unsigned i = 0; i < sch->nb_mux; i++) {
        SchMux *mux = &sch->mux[i];

        for (unsigned j = 0; j < mux->nb_streams; j++) {
            SchMuxStream *ms = &mux->streams[j];

            // resolve the node whose choking actually controls this
            // stream's data flow (a demuxer or a filtergraph output)
            switch (ms->src.type) {
            case SCH_NODE_TYPE_ENC: {
                SchEnc *enc = &sch->enc[ms->src.idx];
                if (enc->src.type == SCH_NODE_TYPE_DEC) {
                    ms->src_sched = sch->dec[enc->src.idx].src;
                    av_assert0(ms->src_sched.type == SCH_NODE_TYPE_DEMUX);
                } else {
                    ms->src_sched = enc->src;
                    av_assert0(ms->src_sched.type == SCH_NODE_TYPE_FILTER_OUT);
                }
                break;
            }
            case SCH_NODE_TYPE_DEMUX:
                // streamcopy - fed by the demuxer directly
                ms->src_sched = ms->src;
                break;
            default:
                av_log(mux, AV_LOG_ERROR,
                       "Muxer stream #%u not connected to a source\n", j);
                return AVERROR(EINVAL);
            }
        }

        ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size,
                          QUEUE_PACKETS);
        if (ret < 0)
            return ret;
    }

    for (unsigned i = 0; i < sch->nb_filters; i++) {
        SchFilterGraph *fg = &sch->filters[i];

        for (unsigned j = 0; j < fg->nb_inputs; j++) {
            SchFilterIn *fi = &fg->inputs[j];
            SchDec *dec;

            if (!fi->src.type) {
                av_log(fg, AV_LOG_ERROR,
                       "Filtergraph input %u not connected to a source\n", j);
                return AVERROR(EINVAL);
            }

            // resolve the ultimate scheduling source of this input
            if (fi->src.type == SCH_NODE_TYPE_FILTER_OUT)
                fi->src_sched = fi->src;
            else {
                av_assert0(fi->src.type == SCH_NODE_TYPE_DEC);
                dec = &sch->dec[fi->src.idx];

                switch (dec->src.type) {
                case SCH_NODE_TYPE_DEMUX: fi->src_sched = dec->src;                   break;
                case SCH_NODE_TYPE_ENC:   fi->src_sched = sch->enc[dec->src.idx].src; break;
                default: av_assert0(0);
                }
            }
        }

        for (unsigned j = 0; j < fg->nb_outputs; j++) {
            SchFilterOut *fo = &fg->outputs[j];

            if (!fo->dst.type) {
                av_log(fg, AV_LOG_ERROR,
                       "Filtergraph %u output %u not connected to a sink\n", i, j);
                return AVERROR(EINVAL);
            }
        }
    }

    // Check that the transcoding graph has no cycles.
    ret = check_acyclic(sch);
    if (ret < 0)
        return ret;

    return 0;
}
1568
/* Start the whole transcoding pipeline: validate/finalize the graph,
 * initialize any muxers whose streams are already ready, then start the
 * worker threads from the downstream end (encoders) towards the upstream
 * end (demuxers), and finally run an initial scheduling pass.
 *
 * Returns 0 on success; on failure the scheduler is stopped and a
 * negative error code is returned. */
int sch_start(Scheduler *sch)
{
    int ret;

    ret = start_prepare(sch);
    if (ret < 0)
        return ret;

    av_assert0(sch->state == SCH_STATE_UNINIT);
    sch->state = SCH_STATE_STARTED;

    for (unsigned i = 0; i < sch->nb_mux; i++) {
        SchMux *mux = &sch->mux[i];

        // muxers that became ready before sch_start() are initialized
        // here; the rest are initialized from sch_mux_stream_ready()
        if (mux->nb_streams_ready == mux->nb_streams) {
            ret = mux_init(sch, mux);
            if (ret < 0)
                goto fail;
        }
    }

    for (unsigned i = 0; i < sch->nb_enc; i++) {
        SchEnc *enc = &sch->enc[i];

        ret = task_start(&enc->task);
        if (ret < 0)
            goto fail;
    }

    for (unsigned i = 0; i < sch->nb_filters; i++) {
        SchFilterGraph *fg = &sch->filters[i];

        ret = task_start(&fg->task);
        if (ret < 0)
            goto fail;
    }

    for (unsigned i = 0; i < sch->nb_dec; i++) {
        SchDec *dec = &sch->dec[i];

        ret = task_start(&dec->task);
        if (ret < 0)
            goto fail;
    }

    for (unsigned i = 0; i < sch->nb_demux; i++) {
        SchDemux *d = &sch->demux[i];

        // demuxers with no connected streams have no thread to run
        if (!d->nb_streams)
            continue;

        ret = task_start(&d->task);
        if (ret < 0)
            goto fail;
    }

    // initial scheduling pass - unchoke the sources allowed to proceed
    pthread_mutex_lock(&sch->schedule_lock);
    schedule_update_locked(sch);
    pthread_mutex_unlock(&sch->schedule_lock);

    return 0;
fail:
    sch_stop(sch, NULL);
    return ret;
}
1634
/* Wait for up to timeout_us microseconds for the transcoding to finish.
 * On return *transcode_ts is set to the last observed trailing DTS.
 *
 * Returns nonzero if transcoding is complete (all muxers done) or any
 * task failed - i.e. when the caller should stop waiting - 0 otherwise. */
int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
{
    int ret, err;

    // convert delay to absolute timestamp
    timeout_us += av_gettime();

    pthread_mutex_lock(&sch->mux_done_lock);

    if (sch->nb_mux_done < sch->nb_mux) {
        struct timespec tv = { .tv_sec  =  timeout_us / 1000000,
                               .tv_nsec = (timeout_us % 1000000) * 1000 };
        // timeout is not an error here - we just poll again on the
        // next call
        pthread_cond_timedwait(&sch->mux_done_cond, &sch->mux_done_lock, &tv);
    }

    ret = sch->nb_mux_done == sch->nb_mux;

    pthread_mutex_unlock(&sch->mux_done_lock);

    *transcode_ts = atomic_load(&sch->last_dts);

    // abort transcoding if any task failed
    err = atomic_load(&sch->task_failed);

    return ret || err;
}
1661
/* Open an encoder using the parameters of the first frame sent to it.
 *
 * A positive return value from the open callback signals the audio frame
 * size, which is propagated to the sync queue (that must then exist).
 * Returns 0 on success, a negative error code on failure. */
static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame)
{
    int ret;

    ret = enc->open_cb(enc->task.func_arg, frame);
    if (ret < 0)
        return ret;

    // ret>0 signals audio frame size, which means sync queue must
    // have been enabled during encoder creation
    if (ret > 0) {
        SchSyncQueue *sq;

        av_assert0(enc->sq_idx[0] >= 0);
        sq = &sch->sq_enc[enc->sq_idx[0]];

        pthread_mutex_lock(&sq->lock);

        sq_frame_samples(sq->sq, enc->sq_idx[1], ret);

        pthread_mutex_unlock(&sq->lock);
    }

    return 0;
}
1687
1688 428169 static int send_to_enc_thread(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1689 {
1690 int ret;
1691
1692
2/2
✓ Branch 0 taken 13286 times.
✓ Branch 1 taken 414883 times.
428169 if (!frame) {
1693 13286 tq_send_finish(enc->queue, 0);
1694 13286 return 0;
1695 }
1696
1697
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 414883 times.
414883 if (enc->in_finished)
1698 return AVERROR_EOF;
1699
1700 414883 ret = tq_send(enc->queue, 0, frame);
1701
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 414882 times.
414883 if (ret < 0)
1702 1 enc->in_finished = 1;
1703
1704 414883 return ret;
1705 }
1706
/* Feed a frame (NULL = EOF) into the sync queue shared by a group of
 * encoders, then drain every frame the queue is ready to emit into the
 * corresponding encoder threads. On a fatal sync-queue error, EOF is
 * propagated to all encoders fed from this queue.
 *
 * Returns 0 on success, a negative error code on failure. */
static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
{
    SchSyncQueue *sq = &sch->sq_enc[enc->sq_idx[0]];
    int ret = 0;

    // inform the scheduling code that no more input will arrive along this path;
    // this is necessary because the sync queue may not send an EOF downstream
    // until other streams finish
    // TODO: consider a cleaner way of passing this information through
    //       the pipeline
    if (!frame) {
        for (unsigned i = 0; i < enc->nb_dst; i++) {
            SchMux *mux;
            SchMuxStream *ms;

            if (enc->dst[i].type != SCH_NODE_TYPE_MUX)
                continue;

            mux = &sch->mux[enc->dst[i].idx];
            ms  = &mux->streams[enc->dst[i].idx_stream];

            pthread_mutex_lock(&sch->schedule_lock);

            ms->source_finished = 1;
            schedule_update_locked(sch);

            pthread_mutex_unlock(&sch->schedule_lock);
        }
    }

    pthread_mutex_lock(&sq->lock);

    ret = sq_send(sq->sq, enc->sq_idx[1], SQFRAME(frame));
    if (ret < 0)
        goto finish;

    // drain all the frames the sync queue can emit right now
    while (1) {
        SchEnc *enc;

        // TODO: the SQ API should be extended to allow returning EOF
        // for individual streams
        ret = sq_receive(sq->sq, -1, SQFRAME(sq->frame));
        if (ret < 0) {
            // EAGAIN just means the queue has nothing more to emit now
            ret = (ret == AVERROR(EAGAIN)) ? 0 : ret;
            break;
        }

        // non-negative return value is the index of the stream the
        // received frame belongs to
        enc = &sch->enc[sq->enc_idx[ret]];
        ret = send_to_enc_thread(sch, enc, sq->frame);
        if (ret < 0) {
            av_frame_unref(sq->frame);
            if (ret != AVERROR_EOF)
                break;

            // this encoder is finished - tell the sync queue and keep
            // draining the remaining streams
            sq_send(sq->sq, enc->sq_idx[1], SQFRAME(NULL));
            continue;
        }
    }

    if (ret < 0) {
        // close all encoders fed from this sync queue
        for (unsigned i = 0; i < sq->nb_enc_idx; i++) {
            int err = send_to_enc_thread(sch, &sch->enc[sq->enc_idx[i]], NULL);

            // if the sync queue error is EOF and closing the encoder
            // produces a more serious error, make sure to pick the latter
            ret = err_merge((ret == AVERROR_EOF && err < 0) ? 0 : ret, err);
        }
    }

finish:
    pthread_mutex_unlock(&sq->lock);

    return ret;
}
1782
1783 374548 static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1784 {
1785
6/6
✓ Branch 0 taken 373631 times.
✓ Branch 1 taken 917 times.
✓ Branch 2 taken 363163 times.
✓ Branch 3 taken 10468 times.
✓ Branch 4 taken 6560 times.
✓ Branch 5 taken 356603 times.
374548 if (enc->open_cb && frame && !enc->opened) {
1786 6560 int ret = enc_open(sch, enc, frame);
1787
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6560 times.
6560 if (ret < 0)
1788 return ret;
1789 6560 enc->opened = 1;
1790
1791 // discard empty frames that only carry encoder init parameters
1792
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 6557 times.
6560 if (!frame->buf[0]) {
1793 3 av_frame_unref(frame);
1794 3 return 0;
1795 }
1796 }
1797
1798 374545 return (enc->sq_idx[0] >= 0) ?
1799
2/2
✓ Branch 0 taken 34126 times.
✓ Branch 1 taken 340419 times.
714964 send_to_enc_sq (sch, enc, frame) :
1800 340419 send_to_enc_thread(sch, enc, frame);
1801 }
1802
/* Buffer a packet (NULL = EOF marker) in a stream's pre-muxing queue,
 * used while the muxer is not started yet. The FIFO doubles in size when
 * full; once q->data_threshold bytes are buffered, growth is capped at
 * q->max_packets entries.
 *
 * Returns 0 on success, AVERROR(ENOSPC) when the cap is hit, or another
 * negative error code on failure. */
static int mux_queue_packet(SchMux *mux, SchMuxStream *ms, AVPacket *pkt)
{
    PreMuxQueue *q = &ms->pre_mux_queue;
    AVPacket *tmp_pkt = NULL;
    int ret;

    if (!av_fifo_can_write(q->fifo)) {
        size_t packets  = av_fifo_can_read(q->fifo);
        size_t pkt_size = pkt ? pkt->size : 0;
        int thresh_reached = (q->data_size + pkt_size) > q->data_threshold;
        // only apply the packet-count cap once the data threshold
        // has been reached
        size_t max_packets = thresh_reached ? q->max_packets : SIZE_MAX;
        size_t new_size = FFMIN(2 * packets, max_packets);

        if (new_size <= packets) {
            av_log(mux, AV_LOG_ERROR,
                   "Too many packets buffered for output stream.\n");
            return AVERROR(ENOSPC);
        }
        ret = av_fifo_grow2(q->fifo, new_size - packets);
        if (ret < 0)
            return ret;
    }

    if (pkt) {
        tmp_pkt = av_packet_alloc();
        if (!tmp_pkt)
            return AVERROR(ENOMEM);

        av_packet_move_ref(tmp_pkt, pkt);
        q->data_size += tmp_pkt->size;
    }
    // NULL entry in the fifo marks end of stream
    av_fifo_write(q->fifo, &tmp_pkt, 1);

    return 0;
}
1838
/* Deliver one packet (or EOF when pkt is NULL) to muxer stream stream_idx.
 *
 * Before the muxer thread has started, packets are buffered in the stream's
 * pre-muxing queue; afterwards they are pushed into the muxer's thread queue.
 * Either way, the scheduler's view of this stream's progress
 * (last_dts / source_finished) is refreshed under schedule_lock.
 *
 * Returns 0 on success, AVERROR_EOF if the stream accepts no more data,
 * or another negative error code on failure. */
static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx,
                       AVPacket *pkt)
{
    SchMuxStream *ms = &mux->streams[stream_idx];
    // end timestamp of this packet (dts + duration) rescaled to
    // AV_TIME_BASE_Q, used below to advance the schedule
    int64_t dts = (pkt && pkt->dts != AV_NOPTS_VALUE) ?
                  av_rescale_q(pkt->dts + pkt->duration, pkt->time_base, AV_TIME_BASE_Q) :
                  AV_NOPTS_VALUE;

    // queue the packet if the muxer cannot be started yet
    if (!atomic_load(&mux->mux_started)) {
        int queued = 0;

        // the muxer could have started between the above atomic check and
        // locking the mutex, then this block falls through to normal send path
        pthread_mutex_lock(&sch->mux_ready_lock);

        // re-check under the lock to close the race noted above
        if (!atomic_load(&mux->mux_started)) {
            int ret = mux_queue_packet(mux, ms, pkt);
            // queued: <0 = error, 1 = buffered, 0 = muxer started meanwhile
            queued = ret < 0 ? ret : 1;
        }

        pthread_mutex_unlock(&sch->mux_ready_lock);

        if (queued < 0)
            return queued;
        else if (queued)
            goto update_schedule;
    }

    if (pkt) {
        int ret;

        // NOTE(review): init_eof presumably marks streams that needed no
        // data beyond muxer init — confirm against where it is set
        if (ms->init_eof)
            return AVERROR_EOF;

        ret = tq_send(mux->queue, stream_idx, pkt);
        if (ret < 0)
            return ret;
    } else
        tq_send_finish(mux->queue, stream_idx);

update_schedule:
    // TODO: use atomics to check whether this changes trailing dts
    // to avoid locking unnecessarily
    if (dts != AV_NOPTS_VALUE || !pkt) {
        pthread_mutex_lock(&sch->schedule_lock);

        // a packet advances the stream's last seen dts; NULL means the
        // source for this stream is finished
        if (pkt) ms->last_dts = dts;
        else     ms->source_finished = 1;

        schedule_update_locked(sch);

        pthread_mutex_unlock(&sch->schedule_lock);
    }

    return 0;
}
1896
/* Send one demuxed packet (or EOF when pkt is NULL) to a single destination,
 * which is either a muxer stream (streamcopy) or a decoder.
 *
 * *dst_finished latches once the destination stops accepting data; further
 * calls short-circuit to AVERROR_EOF.
 *
 * Returns 0 on success, AVERROR_EOF once this destination is finished,
 * or another negative error code on failure. */
static int
demux_stream_send_to_dst(Scheduler *sch, const SchedulerNode dst,
                         uint8_t *dst_finished, AVPacket *pkt, unsigned flags)
{
    int ret;

    // this destination already saw EOF, nothing more to deliver
    if (*dst_finished)
        return AVERROR_EOF;

    // DEMUX_SEND_STREAMCOPY_EOF: treat this packet as EOF for streamcopy
    // destinations — drop its payload and fall into the EOF path below
    if (pkt && dst.type == SCH_NODE_TYPE_MUX &&
        (flags & DEMUX_SEND_STREAMCOPY_EOF)) {
        av_packet_unref(pkt);
        pkt = NULL;
    }

    if (!pkt)
        goto finish;

    ret = (dst.type == SCH_NODE_TYPE_MUX) ?
          send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
          tq_send(sch->dec[dst.idx].queue, 0, pkt);
    if (ret == AVERROR_EOF)
        goto finish;

    return ret;

finish:
    // propagate EOF downstream and remember that this destination is done
    if (dst.type == SCH_NODE_TYPE_MUX)
        send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
    else
        tq_send_finish(sch->dec[dst.idx].queue, 0);

    *dst_finished = 1;
    return AVERROR_EOF;
}
1932
1933 470015 static int demux_send_for_stream(Scheduler *sch, SchDemux *d, SchDemuxStream *ds,
1934 AVPacket *pkt, unsigned flags)
1935 {
1936 470015 unsigned nb_done = 0;
1937
1938
2/2
✓ Branch 0 taken 470478 times.
✓ Branch 1 taken 470015 times.
940493 for (unsigned i = 0; i < ds->nb_dst; i++) {
1939 470478 AVPacket *to_send = pkt;
1940 470478 uint8_t *finished = &ds->dst_finished[i];
1941
1942 int ret;
1943
1944 // sending a packet consumes it, so make a temporary reference if needed
1945
4/4
✓ Branch 0 taken 463383 times.
✓ Branch 1 taken 7095 times.
✓ Branch 2 taken 440 times.
✓ Branch 3 taken 462943 times.
470478 if (pkt && i < ds->nb_dst - 1) {
1946 440 to_send = d->send_pkt;
1947
1948 440 ret = av_packet_ref(to_send, pkt);
1949
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 440 times.
440 if (ret < 0)
1950 return ret;
1951 }
1952
1953 470478 ret = demux_stream_send_to_dst(sch, ds->dst[i], finished, to_send, flags);
1954
2/2
✓ Branch 0 taken 463383 times.
✓ Branch 1 taken 7095 times.
470478 if (to_send)
1955 463383 av_packet_unref(to_send);
1956
2/2
✓ Branch 0 taken 10213 times.
✓ Branch 1 taken 460265 times.
470478 if (ret == AVERROR_EOF)
1957 10213 nb_done++;
1958
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 460265 times.
460265 else if (ret < 0)
1959 return ret;
1960 }
1961
1962
2/2
✓ Branch 0 taken 10189 times.
✓ Branch 1 taken 459826 times.
470015 return (nb_done == ds->nb_dst) ? AVERROR_EOF : 0;
1963 }
1964
/* Flush all decoder destinations of a demuxer after a seek.
 *
 * pkt must be an empty (flush) packet; it is sent to every still-active
 * decoder destination. For decoders that report back a post-flush end
 * timestamp (queue_end_ts), the largest such timestamp is collected and
 * returned to the caller in pkt->pts / pkt->time_base.
 *
 * Returns 0 on success, a negative error code on failure. */
static int demux_flush(Scheduler *sch, SchDemux *d, AVPacket *pkt)
{
    Timestamp max_end_ts = (Timestamp){ .ts = AV_NOPTS_VALUE };

    // flush packets must carry no data or side data
    av_assert0(!pkt->buf && !pkt->data && !pkt->side_data_elems);

    for (unsigned i = 0; i < d->nb_streams; i++) {
        SchDemuxStream *ds = &d->streams[i];

        for (unsigned j = 0; j < ds->nb_dst; j++) {
            const SchedulerNode *dst = &ds->dst[j];
            SchDec *dec;
            int ret;

            // only live decoder destinations are flushed
            if (ds->dst_finished[j] || dst->type != SCH_NODE_TYPE_DEC)
                continue;

            dec = &sch->dec[dst->idx];

            ret = tq_send(dec->queue, 0, pkt);
            if (ret < 0)
                return ret;

            // wait for the decoder to report the end timestamp of the
            // flushed stretch of data, and keep the maximum across decoders
            if (dec->queue_end_ts) {
                Timestamp ts;
                ret = av_thread_message_queue_recv(dec->queue_end_ts, &ts, 0);
                if (ret < 0)
                    return ret;

                if (max_end_ts.ts == AV_NOPTS_VALUE ||
                    (ts.ts != AV_NOPTS_VALUE &&
                     av_compare_ts(max_end_ts.ts, max_end_ts.tb, ts.ts, ts.tb) < 0))
                    max_end_ts = ts;

            }
        }
    }

    // hand the collected maximum end timestamp back through the packet
    pkt->pts       = max_end_ts.ts;
    pkt->time_base = max_end_ts.tb;

    return 0;
}
2008
2009 463011 int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt,
2010 unsigned flags)
2011 {
2012 SchDemux *d;
2013 int terminate;
2014
2015
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 463011 times.
463011 av_assert0(demux_idx < sch->nb_demux);
2016 463011 d = &sch->demux[demux_idx];
2017
2018 463011 terminate = waiter_wait(sch, &d->waiter);
2019
2/2
✓ Branch 0 taken 57 times.
✓ Branch 1 taken 462954 times.
463011 if (terminate)
2020 57 return AVERROR_EXIT;
2021
2022 // flush the downstreams after seek
2023
2/2
✓ Branch 0 taken 11 times.
✓ Branch 1 taken 462943 times.
462954 if (pkt->stream_index == -1)
2024 11 return demux_flush(sch, d, pkt);
2025
2026
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 462943 times.
462943 av_assert0(pkt->stream_index < d->nb_streams);
2027
2028 462943 return demux_send_for_stream(sch, d, &d->streams[pkt->stream_index], pkt, flags);
2029 }
2030
2031 6846 static int demux_done(Scheduler *sch, unsigned demux_idx)
2032 {
2033 6846 SchDemux *d = &sch->demux[demux_idx];
2034 6846 int ret = 0;
2035
2036
2/2
✓ Branch 0 taken 7072 times.
✓ Branch 1 taken 6846 times.
13918 for (unsigned i = 0; i < d->nb_streams; i++) {
2037 7072 int err = demux_send_for_stream(sch, d, &d->streams[i], NULL, 0);
2038
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7072 times.
7072 if (err != AVERROR_EOF)
2039 ret = err_merge(ret, err);
2040 }
2041
2042 6846 pthread_mutex_lock(&sch->schedule_lock);
2043
2044 6846 d->task_exited = 1;
2045
2046 6846 schedule_update_locked(sch);
2047
2048 6846 pthread_mutex_unlock(&sch->schedule_lock);
2049
2050 6846 return ret;
2051 }
2052
2053 490126 int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt)
2054 {
2055 SchMux *mux;
2056 int ret, stream_idx;
2057
2058
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 490126 times.
490126 av_assert0(mux_idx < sch->nb_mux);
2059 490126 mux = &sch->mux[mux_idx];
2060
2061 490126 ret = tq_receive(mux->queue, &stream_idx, pkt);
2062 490126 pkt->stream_index = stream_idx;
2063 490126 return ret;
2064 }
2065
2066 199 void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
2067 {
2068 SchMux *mux;
2069
2070
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 199 times.
199 av_assert0(mux_idx < sch->nb_mux);
2071 199 mux = &sch->mux[mux_idx];
2072
2073
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 199 times.
199 av_assert0(stream_idx < mux->nb_streams);
2074 199 tq_receive_finish(mux->queue, stream_idx);
2075
2076 199 pthread_mutex_lock(&sch->schedule_lock);
2077 199 mux->streams[stream_idx].source_finished = 1;
2078
2079 199 schedule_update_locked(sch);
2080
2081 199 pthread_mutex_unlock(&sch->schedule_lock);
2082 199 }
2083
2084 438927 int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
2085 const AVPacket *pkt)
2086 {
2087 SchMux *mux;
2088 SchMuxStream *ms;
2089
2090
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 438927 times.
438927 av_assert0(mux_idx < sch->nb_mux);
2091 438927 mux = &sch->mux[mux_idx];
2092
2093
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 438927 times.
438927 av_assert0(stream_idx < mux->nb_streams);
2094 438927 ms = &mux->streams[stream_idx];
2095
2096
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 438927 times.
438932 for (unsigned i = 0; i < ms->nb_sub_heartbeat_dst; i++) {
2097 5 SchDec *dst = &sch->dec[ms->sub_heartbeat_dst[i]];
2098 int ret;
2099
2100 5 ret = av_packet_copy_props(mux->sub_heartbeat_pkt, pkt);
2101
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5 times.
5 if (ret < 0)
2102 return ret;
2103
2104 5 tq_send(dst->queue, 0, mux->sub_heartbeat_pkt);
2105 }
2106
2107 438927 return 0;
2108 }
2109
2110 6818 static int mux_done(Scheduler *sch, unsigned mux_idx)
2111 {
2112 6818 SchMux *mux = &sch->mux[mux_idx];
2113
2114 6818 pthread_mutex_lock(&sch->schedule_lock);
2115
2116
2/2
✓ Branch 0 taken 7258 times.
✓ Branch 1 taken 6818 times.
14076 for (unsigned i = 0; i < mux->nb_streams; i++) {
2117 7258 tq_receive_finish(mux->queue, i);
2118 7258 mux->streams[i].source_finished = 1;
2119 }
2120
2121 6818 schedule_update_locked(sch);
2122
2123 6818 pthread_mutex_unlock(&sch->schedule_lock);
2124
2125 6818 pthread_mutex_lock(&sch->mux_done_lock);
2126
2127
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6818 times.
6818 av_assert0(sch->nb_mux_done < sch->nb_mux);
2128 6818 sch->nb_mux_done++;
2129
2130 6818 pthread_cond_signal(&sch->mux_done_cond);
2131
2132 6818 pthread_mutex_unlock(&sch->mux_done_lock);
2133
2134 6818 return 0;
2135 }
2136
/* Entry point for decoder threads: fetch the next packet to decode.
 *
 * Implements the flush handshake with demux_flush(): after this function
 * returns a flush (empty) packet, the decoder's next call is expected to
 * carry the post-flush end timestamp in pkt->pts/time_base, which is then
 * relayed through dec->queue_end_ts.
 *
 * Returns >=0 on success, a negative error code otherwise. */
int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
{
    SchDec *dec;
    int ret, dummy;

    av_assert0(dec_idx < sch->nb_dec);
    dec = &sch->dec[dec_idx];

    // the decoder should have given us post-flush end timestamp in pkt
    if (dec->expect_end_ts) {
        Timestamp ts = (Timestamp){ .ts = pkt->pts, .tb = pkt->time_base };
        ret = av_thread_message_queue_send(dec->queue_end_ts, &ts, 0);
        if (ret < 0)
            return ret;

        dec->expect_end_ts = 0;
    }

    ret = tq_receive(dec->queue, &dummy, pkt);
    // decoder queues are single-stream, so the reported index is 0
    // (or negative on EOF)
    av_assert0(dummy <= 0);

    // got a flush packet, on the next call to this function the decoder
    // will give us post-flush end timestamp
    if (ret >= 0 && !pkt->data && !pkt->side_data_elems && dec->queue_end_ts)
        dec->expect_end_ts = 1;

    return ret;
}
2165
2166 399751 static int send_to_filter(Scheduler *sch, SchFilterGraph *fg,
2167 unsigned in_idx, AVFrame *frame)
2168 {
2169
2/2
✓ Branch 0 taken 393295 times.
✓ Branch 1 taken 6456 times.
399751 if (frame)
2170 393295 return tq_send(fg->queue, in_idx, frame);
2171
2172
1/2
✓ Branch 0 taken 6456 times.
✗ Branch 1 not taken.
6456 if (!fg->inputs[in_idx].send_finished) {
2173 6456 fg->inputs[in_idx].send_finished = 1;
2174 6456 tq_send_finish(fg->queue, in_idx);
2175
2176 // close the control stream when all actual inputs are done
2177
2/2
✓ Branch 0 taken 6380 times.
✓ Branch 1 taken 76 times.
6456 if (atomic_fetch_add(&fg->nb_inputs_finished_send, 1) == fg->nb_inputs - 1)
2178 6380 tq_send_finish(fg->queue, fg->nb_inputs);
2179 }
2180 6456 return 0;
2181 }
2182
2183 403856 static int dec_send_to_dst(Scheduler *sch, const SchedulerNode dst,
2184 uint8_t *dst_finished, AVFrame *frame)
2185 {
2186 int ret;
2187
2188
2/2
✓ Branch 0 taken 6366 times.
✓ Branch 1 taken 397490 times.
403856 if (*dst_finished)
2189 6366 return AVERROR_EOF;
2190
2191
2/2
✓ Branch 0 taken 3316 times.
✓ Branch 1 taken 394174 times.
397490 if (!frame)
2192 3316 goto finish;
2193
2194 788348 ret = (dst.type == SCH_NODE_TYPE_FILTER_IN) ?
2195
2/2
✓ Branch 0 taken 393295 times.
✓ Branch 1 taken 879 times.
394174 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame) :
2196 879 send_to_enc(sch, &sch->enc[dst.idx], frame);
2197
2/2
✓ Branch 0 taken 3178 times.
✓ Branch 1 taken 390996 times.
394174 if (ret == AVERROR_EOF)
2198 3178 goto finish;
2199
2200 390996 return ret;
2201
2202 6494 finish:
2203
2/2
✓ Branch 0 taken 6456 times.
✓ Branch 1 taken 38 times.
6494 if (dst.type == SCH_NODE_TYPE_FILTER_IN)
2204 6456 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2205 else
2206 38 send_to_enc(sch, &sch->enc[dst.idx], NULL);
2207
2208 6494 *dst_finished = 1;
2209
2210 6494 return AVERROR_EOF;
2211 }
2212
/* Entry point for decoder threads: fan one decoded frame out to all
 * destinations of decoder output out_idx.
 *
 * The caller's frame is consumed by the last destination; all earlier
 * destinations receive a temporary reference (or a props-only copy).
 *
 * Returns 0 while at least one destination still accepts data, AVERROR_EOF
 * once all of them have finished, or another negative error code. */
int sch_dec_send(Scheduler *sch, unsigned dec_idx,
                 unsigned out_idx, AVFrame *frame)
{
    SchDec *dec;
    SchDecOutput *o;
    int ret;
    unsigned nb_done = 0;

    av_assert0(dec_idx < sch->nb_dec);
    dec = &sch->dec[dec_idx];

    av_assert0(out_idx < dec->nb_outputs);
    o = &dec->outputs[out_idx];

    for (unsigned i = 0; i < o->nb_dst; i++) {
        uint8_t *finished = &o->dst_finished[i];
        AVFrame *to_send  = frame;

        // sending a frame consumes it, so make a temporary reference if needed
        if (i < o->nb_dst - 1) {
            to_send = dec->send_frame;

            // frame may sometimes contain props only,
            // e.g. to signal EOF timestamp
            ret = frame->buf[0] ? av_frame_ref(to_send, frame) :
                                  av_frame_copy_props(to_send, frame);
            if (ret < 0)
                return ret;
        }

        ret = dec_send_to_dst(sch, o->dst[i], finished, to_send);
        if (ret < 0) {
            av_frame_unref(to_send);
            // a finished destination is not an error; keep feeding the rest
            if (ret == AVERROR_EOF) {
                nb_done++;
                continue;
            }
            return ret;
        }
    }

    // only EOF once every destination has reported EOF
    return (nb_done == o->nb_dst) ? AVERROR_EOF : 0;
}
2256
2257 6436 static int dec_done(Scheduler *sch, unsigned dec_idx)
2258 {
2259 6436 SchDec *dec = &sch->dec[dec_idx];
2260 6436 int ret = 0;
2261
2262 6436 tq_receive_finish(dec->queue, 0);
2263
2264 // make sure our source does not get stuck waiting for end timestamps
2265 // that will never arrive
2266
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 6435 times.
6436 if (dec->queue_end_ts)
2267 1 av_thread_message_queue_set_err_recv(dec->queue_end_ts, AVERROR_EOF);
2268
2269
2/2
✓ Branch 0 taken 6442 times.
✓ Branch 1 taken 6436 times.
12878 for (unsigned i = 0; i < dec->nb_outputs; i++) {
2270 6442 SchDecOutput *o = &dec->outputs[i];
2271
2272
2/2
✓ Branch 0 taken 6494 times.
✓ Branch 1 taken 6442 times.
12936 for (unsigned j = 0; j < o->nb_dst; j++) {
2273 6494 int err = dec_send_to_dst(sch, o->dst[j], &o->dst_finished[j], NULL);
2274
2/4
✓ Branch 0 taken 6494 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6494 times.
6494 if (err < 0 && err != AVERROR_EOF)
2275 ret = err_merge(ret, err);
2276 }
2277 }
2278
2279 6436 return ret;
2280 }
2281
2282 421477 int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
2283 {
2284 SchEnc *enc;
2285 int ret, dummy;
2286
2287
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 421477 times.
421477 av_assert0(enc_idx < sch->nb_enc);
2288 421477 enc = &sch->enc[enc_idx];
2289
2290 421477 ret = tq_receive(enc->queue, &dummy, frame);
2291
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 421477 times.
421477 av_assert0(dummy <= 0);
2292
2293 421477 return ret;
2294 }
2295
2296 415402 static int enc_send_to_dst(Scheduler *sch, const SchedulerNode dst,
2297 uint8_t *dst_finished, AVPacket *pkt)
2298 {
2299 int ret;
2300
2301
2/2
✓ Branch 0 taken 46 times.
✓ Branch 1 taken 415356 times.
415402 if (*dst_finished)
2302 46 return AVERROR_EOF;
2303
2304
2/2
✓ Branch 0 taken 6597 times.
✓ Branch 1 taken 408759 times.
415356 if (!pkt)
2305 6597 goto finish;
2306
2307 817518 ret = (dst.type == SCH_NODE_TYPE_MUX) ?
2308
2/2
✓ Branch 0 taken 408709 times.
✓ Branch 1 taken 50 times.
408759 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
2309 50 tq_send(sch->dec[dst.idx].queue, 0, pkt);
2310
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 408757 times.
408759 if (ret == AVERROR_EOF)
2311 2 goto finish;
2312
2313 408757 return ret;
2314
2315 6599 finish:
2316
2/2
✓ Branch 0 taken 6598 times.
✓ Branch 1 taken 1 times.
6599 if (dst.type == SCH_NODE_TYPE_MUX)
2317 6598 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
2318 else
2319 1 tq_send_finish(sch->dec[dst.idx].queue, 0);