FFmpeg coverage


Directory: ../../../ffmpeg/
File: src/fftools/ffmpeg_sched.c
Date: 2024-11-20 23:03:26
Exec Total Coverage
Lines: 1066 1223 87.2%
Functions: 65 67 97.0%
Branches: 589 817 72.1%

Line Branch Exec Source
1 /*
2 * Inter-thread scheduling/synchronization.
3 * Copyright (c) 2023 Anton Khirnov
4 *
5 * This file is part of FFmpeg.
6 *
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with FFmpeg; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22 #include <stdatomic.h>
23 #include <stddef.h>
24 #include <stdint.h>
25
26 #include "cmdutils.h"
27 #include "ffmpeg_sched.h"
28 #include "ffmpeg_utils.h"
29 #include "sync_queue.h"
30 #include "thread_queue.h"
31
32 #include "libavcodec/packet.h"
33
34 #include "libavutil/avassert.h"
35 #include "libavutil/error.h"
36 #include "libavutil/fifo.h"
37 #include "libavutil/frame.h"
38 #include "libavutil/mem.h"
39 #include "libavutil/thread.h"
40 #include "libavutil/threadmessage.h"
41 #include "libavutil/time.h"
42
// Scheduling slack: how far ahead of the trailing muxed dts a source may run.
// 100 ms
// FIXME: some other value? make this dynamic?
#define SCHEDULE_TOLERANCE (100 * 1000)

// Payload type carried by a ThreadQueue — determines which object pool
// and move callback queue_alloc() installs.
enum QueueType {
    QUEUE_PACKETS,
    QUEUE_FRAMES,
};
51
// Gate used to pause a producer task (demuxer/filtergraph) until the
// scheduler allows it to proceed again.
typedef struct SchWaiter {
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    atomic_int      choked;  // nonzero: the owning task must wait

    // the following are internal state of schedule_update_locked() and must
    // not be accessed outside of it
    int choked_prev;
    int choked_next;
} SchWaiter;
62
63 typedef struct SchTask {
64 Scheduler *parent;
65 SchedulerNode node;
66
67 SchThreadFunc func;
68 void *func_arg;
69
70 pthread_t thread;
71 int thread_running;
72 } SchTask;
73
74 typedef struct SchDecOutput {
75 SchedulerNode *dst;
76 uint8_t *dst_finished;
77 unsigned nb_dst;
78 } SchDecOutput;
79
80 typedef struct SchDec {
81 const AVClass *class;
82
83 SchedulerNode src;
84
85 SchDecOutput *outputs;
86 unsigned nb_outputs;
87
88 SchTask task;
89 // Queue for receiving input packets, one stream.
90 ThreadQueue *queue;
91
92 // Queue for sending post-flush end timestamps back to the source
93 AVThreadMessageQueue *queue_end_ts;
94 int expect_end_ts;
95
96 // temporary storage used by sch_dec_send()
97 AVFrame *send_frame;
98 } SchDec;
99
100 typedef struct SchSyncQueue {
101 SyncQueue *sq;
102 AVFrame *frame;
103 pthread_mutex_t lock;
104
105 unsigned *enc_idx;
106 unsigned nb_enc_idx;
107 } SchSyncQueue;
108
109 typedef struct SchEnc {
110 const AVClass *class;
111
112 SchedulerNode src;
113 SchedulerNode *dst;
114 uint8_t *dst_finished;
115 unsigned nb_dst;
116
117 // [0] - index of the sync queue in Scheduler.sq_enc,
118 // [1] - index of this encoder in the sq
119 int sq_idx[2];
120
121 /* Opening encoders is somewhat nontrivial due to their interaction with
122 * sync queues, which are (among other things) responsible for maintaining
123 * constant audio frame size, when it is required by the encoder.
124 *
125 * Opening the encoder requires stream parameters, obtained from the first
126 * frame. However, that frame cannot be properly chunked by the sync queue
127 * without knowing the required frame size, which is only available after
128 * opening the encoder.
129 *
130 * This apparent circular dependency is resolved in the following way:
131 * - the caller creating the encoder gives us a callback which opens the
132 * encoder and returns the required frame size (if any)
133 * - when the first frame is sent to the encoder, the sending thread
134 * - calls this callback, opening the encoder
135 * - passes the returned frame size to the sync queue
136 */
137 int (*open_cb)(void *opaque, const AVFrame *frame);
138 int opened;
139
140 SchTask task;
141 // Queue for receiving input frames, one stream.
142 ThreadQueue *queue;
143 // tq_send() to queue returned EOF
144 int in_finished;
145
146 // temporary storage used by sch_enc_send()
147 AVPacket *send_pkt;
148 } SchEnc;
149
150 typedef struct SchDemuxStream {
151 SchedulerNode *dst;
152 uint8_t *dst_finished;
153 unsigned nb_dst;
154 } SchDemuxStream;
155
156 typedef struct SchDemux {
157 const AVClass *class;
158
159 SchDemuxStream *streams;
160 unsigned nb_streams;
161
162 SchTask task;
163 SchWaiter waiter;
164
165 // temporary storage used by sch_demux_send()
166 AVPacket *send_pkt;
167
168 // protected by schedule_lock
169 int task_exited;
170 } SchDemux;
171
172 typedef struct PreMuxQueue {
173 /**
174 * Queue for buffering the packets before the muxer task can be started.
175 */
176 AVFifo *fifo;
177 /**
178 * Maximum number of packets in fifo.
179 */
180 int max_packets;
181 /*
182 * The size of the AVPackets' buffers in queue.
183 * Updated when a packet is either pushed or pulled from the queue.
184 */
185 size_t data_size;
186 /* Threshold after which max_packets will be in effect */
187 size_t data_threshold;
188 } PreMuxQueue;
189
190 typedef struct SchMuxStream {
191 SchedulerNode src;
192 SchedulerNode src_sched;
193
194 unsigned *sub_heartbeat_dst;
195 unsigned nb_sub_heartbeat_dst;
196
197 PreMuxQueue pre_mux_queue;
198
199 // an EOF was generated while flushing the pre-mux queue
200 int init_eof;
201
202 ////////////////////////////////////////////////////////////
203 // The following are protected by Scheduler.schedule_lock //
204
205 /* dts+duration of the last packet sent to this stream
206 in AV_TIME_BASE_Q */
207 int64_t last_dts;
208 // this stream no longer accepts input
209 int source_finished;
210 ////////////////////////////////////////////////////////////
211 } SchMuxStream;
212
213 typedef struct SchMux {
214 const AVClass *class;
215
216 SchMuxStream *streams;
217 unsigned nb_streams;
218 unsigned nb_streams_ready;
219
220 int (*init)(void *arg);
221
222 SchTask task;
223 /**
224 * Set to 1 after starting the muxer task and flushing the
225 * pre-muxing queues.
226 * Set either before any tasks have started, or with
227 * Scheduler.mux_ready_lock held.
228 */
229 atomic_int mux_started;
230 ThreadQueue *queue;
231 unsigned queue_size;
232
233 AVPacket *sub_heartbeat_pkt;
234 } SchMux;
235
236 typedef struct SchFilterIn {
237 SchedulerNode src;
238 SchedulerNode src_sched;
239 int send_finished;
240 int receive_finished;
241 } SchFilterIn;
242
243 typedef struct SchFilterOut {
244 SchedulerNode dst;
245 } SchFilterOut;
246
247 typedef struct SchFilterGraph {
248 const AVClass *class;
249
250 SchFilterIn *inputs;
251 unsigned nb_inputs;
252 atomic_uint nb_inputs_finished_send;
253 unsigned nb_inputs_finished_receive;
254
255 SchFilterOut *outputs;
256 unsigned nb_outputs;
257
258 SchTask task;
259 // input queue, nb_inputs+1 streams
260 // last stream is control
261 ThreadQueue *queue;
262 SchWaiter waiter;
263
264 // protected by schedule_lock
265 unsigned best_input;
266 int task_exited;
267 } SchFilterGraph;
268
269 enum SchedulerState {
270 SCH_STATE_UNINIT,
271 SCH_STATE_STARTED,
272 SCH_STATE_STOPPED,
273 };
274
275 struct Scheduler {
276 const AVClass *class;
277
278 SchDemux *demux;
279 unsigned nb_demux;
280
281 SchMux *mux;
282 unsigned nb_mux;
283
284 unsigned nb_mux_ready;
285 pthread_mutex_t mux_ready_lock;
286
287 unsigned nb_mux_done;
288 pthread_mutex_t mux_done_lock;
289 pthread_cond_t mux_done_cond;
290
291
292 SchDec *dec;
293 unsigned nb_dec;
294
295 SchEnc *enc;
296 unsigned nb_enc;
297
298 SchSyncQueue *sq_enc;
299 unsigned nb_sq_enc;
300
301 SchFilterGraph *filters;
302 unsigned nb_filters;
303
304 char *sdp_filename;
305 int sdp_auto;
306
307 enum SchedulerState state;
308 atomic_int terminate;
309 atomic_int task_failed;
310
311 pthread_mutex_t schedule_lock;
312
313 atomic_int_least64_t last_dts;
314 };
315
316 /**
317 * Wait until this task is allowed to proceed.
318 *
319 * @retval 0 the caller should proceed
320 * @retval 1 the caller should terminate
321 */
322 491275 static int waiter_wait(Scheduler *sch, SchWaiter *w)
323 {
324 int terminate;
325
326
2/2
✓ Branch 0 taken 491027 times.
✓ Branch 1 taken 248 times.
491275 if (!atomic_load(&w->choked))
327 491027 return 0;
328
329 248 pthread_mutex_lock(&w->lock);
330
331
4/4
✓ Branch 0 taken 254 times.
✓ Branch 1 taken 204 times.
✓ Branch 2 taken 210 times.
✓ Branch 3 taken 44 times.
458 while (atomic_load(&w->choked) && !atomic_load(&sch->terminate))
332 210 pthread_cond_wait(&w->cond, &w->lock);
333
334 248 terminate = atomic_load(&sch->terminate);
335
336 248 pthread_mutex_unlock(&w->lock);
337
338 248 return terminate;
339 }
340
341 32894 static void waiter_set(SchWaiter *w, int choked)
342 {
343 32894 pthread_mutex_lock(&w->lock);
344
345 32894 atomic_store(&w->choked, choked);
346 32894 pthread_cond_signal(&w->cond);
347
348 32894 pthread_mutex_unlock(&w->lock);
349 32894 }
350
351 14701 static int waiter_init(SchWaiter *w)
352 {
353 int ret;
354
355 14701 atomic_init(&w->choked, 0);
356
357 14701 ret = pthread_mutex_init(&w->lock, NULL);
358
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 14701 times.
14701 if (ret)
359 return AVERROR(ret);
360
361 14701 ret = pthread_cond_init(&w->cond, NULL);
362
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 14701 times.
14701 if (ret)
363 return AVERROR(ret);
364
365 14701 return 0;
366 }
367
368 14701 static void waiter_uninit(SchWaiter *w)
369 {
370 14701 pthread_mutex_destroy(&w->lock);
371 14701 pthread_cond_destroy(&w->cond);
372 14701 }
373
374 29903 static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size,
375 enum QueueType type)
376 {
377 ThreadQueue *tq;
378 ObjPool *op;
379
380
1/2
✓ Branch 0 taken 29903 times.
✗ Branch 1 not taken.
29903 if (queue_size <= 0) {
381
2/2
✓ Branch 0 taken 15233 times.
✓ Branch 1 taken 14670 times.
29903 if (type == QUEUE_FRAMES)
382 15233 queue_size = DEFAULT_FRAME_THREAD_QUEUE_SIZE;
383 else
384 14670 queue_size = DEFAULT_PACKET_THREAD_QUEUE_SIZE;
385 }
386
387
2/2
✓ Branch 0 taken 15233 times.
✓ Branch 1 taken 14670 times.
29903 if (type == QUEUE_FRAMES) {
388 // This queue length is used in the decoder code to ensure that
389 // there are enough entries in fixed-size frame pools to account
390 // for frames held in queues inside the ffmpeg utility. If this
391 // can ever dynamically change then the corresponding decode
392 // code needs to be updated as well.
393
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15233 times.
15233 av_assert0(queue_size == DEFAULT_FRAME_THREAD_QUEUE_SIZE);
394 }
395
396
2/2
✓ Branch 0 taken 14670 times.
✓ Branch 1 taken 15233 times.
29903 op = (type == QUEUE_PACKETS) ? objpool_alloc_packets() :
397 15233 objpool_alloc_frames();
398
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29903 times.
29903 if (!op)
399 return AVERROR(ENOMEM);
400
401
2/2
✓ Branch 0 taken 14670 times.
✓ Branch 1 taken 15233 times.
29903 tq = tq_alloc(nb_streams, queue_size, op,
402 (type == QUEUE_PACKETS) ? pkt_move : frame_move);
403
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29903 times.
29903 if (!tq) {
404 objpool_free(&op);
405 return AVERROR(ENOMEM);
406 }
407
408 29903 *ptq = tq;
409 29903 return 0;
410 }
411
412 static void *task_wrapper(void *arg);
413
414 37053 static int task_start(SchTask *task)
415 {
416 int ret;
417
418 37053 av_log(task->func_arg, AV_LOG_VERBOSE, "Starting thread...\n");
419
420
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 37053 times.
37053 av_assert0(!task->thread_running);
421
422 37053 ret = pthread_create(&task->thread, NULL, task_wrapper, task);
423
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 37053 times.
37053 if (ret) {
424 av_log(task->func_arg, AV_LOG_ERROR, "pthread_create() failed: %s\n",
425 strerror(ret));
426 return AVERROR(ret);
427 }
428
429 37053 task->thread_running = 1;
430 37053 return 0;
431 }
432
433 37067 static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx,
434 SchThreadFunc func, void *func_arg)
435 {
436 37067 task->parent = sch;
437
438 37067 task->node.type = type;
439 37067 task->node.idx = idx;
440
441 37067 task->func = func;
442 37067 task->func_arg = func_arg;
443 37067 }
444
445 535352 static int64_t trailing_dts(const Scheduler *sch, int count_finished)
446 {
447 535352 int64_t min_dts = INT64_MAX;
448
449
2/2
✓ Branch 0 taken 535604 times.
✓ Branch 1 taken 520431 times.
1056035 for (unsigned i = 0; i < sch->nb_mux; i++) {
450 535604 const SchMux *mux = &sch->mux[i];
451
452
2/2
✓ Branch 0 taken 581802 times.
✓ Branch 1 taken 520683 times.
1102485 for (unsigned j = 0; j < mux->nb_streams; j++) {
453 581802 const SchMuxStream *ms = &mux->streams[j];
454
455
4/4
✓ Branch 0 taken 44798 times.
✓ Branch 1 taken 537004 times.
✓ Branch 2 taken 36478 times.
✓ Branch 3 taken 8320 times.
581802 if (ms->source_finished && !count_finished)
456 36478 continue;
457
2/2
✓ Branch 0 taken 14921 times.
✓ Branch 1 taken 530403 times.
545324 if (ms->last_dts == AV_NOPTS_VALUE)
458 14921 return AV_NOPTS_VALUE;
459
460 530403 min_dts = FFMIN(min_dts, ms->last_dts);
461 }
462 }
463
464
2/2
✓ Branch 0 taken 492273 times.
✓ Branch 1 taken 28158 times.
520431 return min_dts == INT64_MAX ? AV_NOPTS_VALUE : min_dts;
465 }
466
467 7915 void sch_free(Scheduler **psch)
468 {
469 7915 Scheduler *sch = *psch;
470
471
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7915 times.
7915 if (!sch)
472 return;
473
474 7915 sch_stop(sch, NULL);
475
476
2/2
✓ Branch 0 taken 7164 times.
✓ Branch 1 taken 7915 times.
15079 for (unsigned i = 0; i < sch->nb_demux; i++) {
477 7164 SchDemux *d = &sch->demux[i];
478
479
2/2
✓ Branch 0 taken 7390 times.
✓ Branch 1 taken 7164 times.
14554 for (unsigned j = 0; j < d->nb_streams; j++) {
480 7390 SchDemuxStream *ds = &d->streams[j];
481 7390 av_freep(&ds->dst);
482 7390 av_freep(&ds->dst_finished);
483 }
484 7164 av_freep(&d->streams);
485
486 7164 av_packet_free(&d->send_pkt);
487
488 7164 waiter_uninit(&d->waiter);
489 }
490 7915 av_freep(&sch->demux);
491
492
2/2
✓ Branch 0 taken 7916 times.
✓ Branch 1 taken 7915 times.
15831 for (unsigned i = 0; i < sch->nb_mux; i++) {
493 7916 SchMux *mux = &sch->mux[i];
494
495
2/2
✓ Branch 0 taken 8356 times.
✓ Branch 1 taken 7916 times.
16272 for (unsigned j = 0; j < mux->nb_streams; j++) {
496 8356 SchMuxStream *ms = &mux->streams[j];
497
498
1/2
✓ Branch 0 taken 8356 times.
✗ Branch 1 not taken.
8356 if (ms->pre_mux_queue.fifo) {
499 AVPacket *pkt;
500
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 8356 times.
8356 while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0)
501 av_packet_free(&pkt);
502 8356 av_fifo_freep2(&ms->pre_mux_queue.fifo);
503 }
504
505 8356 av_freep(&ms->sub_heartbeat_dst);
506 }
507 7916 av_freep(&mux->streams);
508
509 7916 av_packet_free(&mux->sub_heartbeat_pkt);
510
511 7916 tq_free(&mux->queue);
512 }
513 7915 av_freep(&sch->mux);
514
515
2/2
✓ Branch 0 taken 6754 times.
✓ Branch 1 taken 7915 times.
14669 for (unsigned i = 0; i < sch->nb_dec; i++) {
516 6754 SchDec *dec = &sch->dec[i];
517
518 6754 tq_free(&dec->queue);
519
520 6754 av_thread_message_queue_free(&dec->queue_end_ts);
521
522
2/2
✓ Branch 0 taken 6760 times.
✓ Branch 1 taken 6754 times.
13514 for (unsigned j = 0; j < dec->nb_outputs; j++) {
523 6760 SchDecOutput *o = &dec->outputs[j];
524
525 6760 av_freep(&o->dst);
526 6760 av_freep(&o->dst_finished);
527 }
528
529 6754 av_freep(&dec->outputs);
530
531 6754 av_frame_free(&dec->send_frame);
532 }
533 7915 av_freep(&sch->dec);
534
535
2/2
✓ Branch 0 taken 7696 times.
✓ Branch 1 taken 7915 times.
15611 for (unsigned i = 0; i < sch->nb_enc; i++) {
536 7696 SchEnc *enc = &sch->enc[i];
537
538 7696 tq_free(&enc->queue);
539
540 7696 av_packet_free(&enc->send_pkt);
541
542 7696 av_freep(&enc->dst);
543 7696 av_freep(&enc->dst_finished);
544 }
545 7915 av_freep(&sch->enc);
546
547
2/2
✓ Branch 0 taken 3068 times.
✓ Branch 1 taken 7915 times.
10983 for (unsigned i = 0; i < sch->nb_sq_enc; i++) {
548 3068 SchSyncQueue *sq = &sch->sq_enc[i];
549 3068 sq_free(&sq->sq);
550 3068 av_frame_free(&sq->frame);
551 3068 pthread_mutex_destroy(&sq->lock);
552 3068 av_freep(&sq->enc_idx);
553 }
554 7915 av_freep(&sch->sq_enc);
555
556
2/2
✓ Branch 0 taken 7537 times.
✓ Branch 1 taken 7915 times.
15452 for (unsigned i = 0; i < sch->nb_filters; i++) {
557 7537 SchFilterGraph *fg = &sch->filters[i];
558
559 7537 tq_free(&fg->queue);
560
561 7537 av_freep(&fg->inputs);
562 7537 av_freep(&fg->outputs);
563
564 7537 waiter_uninit(&fg->waiter);
565 }
566 7915 av_freep(&sch->filters);
567
568 7915 av_freep(&sch->sdp_filename);
569
570 7915 pthread_mutex_destroy(&sch->schedule_lock);
571
572 7915 pthread_mutex_destroy(&sch->mux_ready_lock);
573
574 7915 pthread_mutex_destroy(&sch->mux_done_lock);
575 7915 pthread_cond_destroy(&sch->mux_done_cond);
576
577 7915 av_freep(psch);
578 }
579
580 static const AVClass scheduler_class = {
581 .class_name = "Scheduler",
582 .version = LIBAVUTIL_VERSION_INT,
583 };
584
585 7915 Scheduler *sch_alloc(void)
586 {
587 Scheduler *sch;
588 int ret;
589
590 7915 sch = av_mallocz(sizeof(*sch));
591
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7915 times.
7915 if (!sch)
592 return NULL;
593
594 7915 sch->class = &scheduler_class;
595 7915 sch->sdp_auto = 1;
596
597 7915 ret = pthread_mutex_init(&sch->schedule_lock, NULL);
598
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7915 times.
7915 if (ret)
599 goto fail;
600
601 7915 ret = pthread_mutex_init(&sch->mux_ready_lock, NULL);
602
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7915 times.
7915 if (ret)
603 goto fail;
604
605 7915 ret = pthread_mutex_init(&sch->mux_done_lock, NULL);
606
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7915 times.
7915 if (ret)
607 goto fail;
608
609 7915 ret = pthread_cond_init(&sch->mux_done_cond, NULL);
610
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7915 times.
7915 if (ret)
611 goto fail;
612
613 7915 return sch;
614 fail:
615 sch_free(&sch);
616 return NULL;
617 }
618
619 int sch_sdp_filename(Scheduler *sch, const char *sdp_filename)
620 {
621 av_freep(&sch->sdp_filename);
622 sch->sdp_filename = av_strdup(sdp_filename);
623 return sch->sdp_filename ? 0 : AVERROR(ENOMEM);
624 }
625
626 static const AVClass sch_mux_class = {
627 .class_name = "SchMux",
628 .version = LIBAVUTIL_VERSION_INT,
629 .parent_log_context_offset = offsetof(SchMux, task.func_arg),
630 };
631
632 7916 int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
633 void *arg, int sdp_auto, unsigned thread_queue_size)
634 {
635 7916 const unsigned idx = sch->nb_mux;
636
637 SchMux *mux;
638 int ret;
639
640 7916 ret = GROW_ARRAY(sch->mux, sch->nb_mux);
641
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7916 times.
7916 if (ret < 0)
642 return ret;
643
644 7916 mux = &sch->mux[idx];
645 7916 mux->class = &sch_mux_class;
646 7916 mux->init = init;
647 7916 mux->queue_size = thread_queue_size;
648
649 7916 task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg);
650
651 7916 sch->sdp_auto &= sdp_auto;
652
653 7916 return idx;
654 }
655
656 8356 int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx)
657 {
658 SchMux *mux;
659 SchMuxStream *ms;
660 unsigned stream_idx;
661 int ret;
662
663
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8356 times.
8356 av_assert0(mux_idx < sch->nb_mux);
664 8356 mux = &sch->mux[mux_idx];
665
666 8356 ret = GROW_ARRAY(mux->streams, mux->nb_streams);
667
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8356 times.
8356 if (ret < 0)
668 return ret;
669 8356 stream_idx = mux->nb_streams - 1;
670
671 8356 ms = &mux->streams[stream_idx];
672
673 8356 ms->pre_mux_queue.fifo = av_fifo_alloc2(8, sizeof(AVPacket*), 0);
674
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8356 times.
8356 if (!ms->pre_mux_queue.fifo)
675 return AVERROR(ENOMEM);
676
677 8356 ms->last_dts = AV_NOPTS_VALUE;
678
679 8356 return stream_idx;
680 }
681
682 static const AVClass sch_demux_class = {
683 .class_name = "SchDemux",
684 .version = LIBAVUTIL_VERSION_INT,
685 .parent_log_context_offset = offsetof(SchDemux, task.func_arg),
686 };
687
688 7164 int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *ctx)
689 {
690 7164 const unsigned idx = sch->nb_demux;
691
692 SchDemux *d;
693 int ret;
694
695 7164 ret = GROW_ARRAY(sch->demux, sch->nb_demux);
696
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7164 times.
7164 if (ret < 0)
697 return ret;
698
699 7164 d = &sch->demux[idx];
700
701 7164 task_init(sch, &d->task, SCH_NODE_TYPE_DEMUX, idx, func, ctx);
702
703 7164 d->class = &sch_demux_class;
704 7164 d->send_pkt = av_packet_alloc();
705
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7164 times.
7164 if (!d->send_pkt)
706 return AVERROR(ENOMEM);
707
708 7164 ret = waiter_init(&d->waiter);
709
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7164 times.
7164 if (ret < 0)
710 return ret;
711
712 7164 return idx;
713 }
714
715 7390 int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
716 {
717 SchDemux *d;
718 int ret;
719
720
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7390 times.
7390 av_assert0(demux_idx < sch->nb_demux);
721 7390 d = &sch->demux[demux_idx];
722
723 7390 ret = GROW_ARRAY(d->streams, d->nb_streams);
724
1/2
✓ Branch 0 taken 7390 times.
✗ Branch 1 not taken.
7390 return ret < 0 ? ret : d->nb_streams - 1;
725 }
726
727 6760 int sch_add_dec_output(Scheduler *sch, unsigned dec_idx)
728 {
729 SchDec *dec;
730 int ret;
731
732
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6760 times.
6760 av_assert0(dec_idx < sch->nb_dec);
733 6760 dec = &sch->dec[dec_idx];
734
735 6760 ret = GROW_ARRAY(dec->outputs, dec->nb_outputs);
736
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6760 times.
6760 if (ret < 0)
737 return ret;
738
739 6760 return dec->nb_outputs - 1;
740 }
741
742 static const AVClass sch_dec_class = {
743 .class_name = "SchDec",
744 .version = LIBAVUTIL_VERSION_INT,
745 .parent_log_context_offset = offsetof(SchDec, task.func_arg),
746 };
747
748 6754 int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts)
749 {
750 6754 const unsigned idx = sch->nb_dec;
751
752 SchDec *dec;
753 int ret;
754
755 6754 ret = GROW_ARRAY(sch->dec, sch->nb_dec);
756
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6754 times.
6754 if (ret < 0)
757 return ret;
758
759 6754 dec = &sch->dec[idx];
760
761 6754 task_init(sch, &dec->task, SCH_NODE_TYPE_DEC, idx, func, ctx);
762
763 6754 dec->class = &sch_dec_class;
764 6754 dec->send_frame = av_frame_alloc();
765
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6754 times.
6754 if (!dec->send_frame)
766 return AVERROR(ENOMEM);
767
768 6754 ret = sch_add_dec_output(sch, idx);
769
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6754 times.
6754 if (ret < 0)
770 return ret;
771
772 6754 ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS);
773
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6754 times.
6754 if (ret < 0)
774 return ret;
775
776
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 6753 times.
6754 if (send_end_ts) {
777 1 ret = av_thread_message_queue_alloc(&dec->queue_end_ts, 1, sizeof(Timestamp));
778
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (ret < 0)
779 return ret;
780 }
781
782 6754 return idx;
783 }
784
785 static const AVClass sch_enc_class = {
786 .class_name = "SchEnc",
787 .version = LIBAVUTIL_VERSION_INT,
788 .parent_log_context_offset = offsetof(SchEnc, task.func_arg),
789 };
790
791 7696 int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx,
792 int (*open_cb)(void *opaque, const AVFrame *frame))
793 {
794 7696 const unsigned idx = sch->nb_enc;
795
796 SchEnc *enc;
797 int ret;
798
799 7696 ret = GROW_ARRAY(sch->enc, sch->nb_enc);
800
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7696 times.
7696 if (ret < 0)
801 return ret;
802
803 7696 enc = &sch->enc[idx];
804
805 7696 enc->class = &sch_enc_class;
806 7696 enc->open_cb = open_cb;
807 7696 enc->sq_idx[0] = -1;
808 7696 enc->sq_idx[1] = -1;
809
810 7696 task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx);
811
812 7696 enc->send_pkt = av_packet_alloc();
813
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7696 times.
7696 if (!enc->send_pkt)
814 return AVERROR(ENOMEM);
815
816 7696 ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES);
817
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7696 times.
7696 if (ret < 0)
818 return ret;
819
820 7696 return idx;
821 }
822
823 static const AVClass sch_fg_class = {
824 .class_name = "SchFilterGraph",
825 .version = LIBAVUTIL_VERSION_INT,
826 .parent_log_context_offset = offsetof(SchFilterGraph, task.func_arg),
827 };
828
829 7537 int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
830 SchThreadFunc func, void *ctx)
831 {
832 7537 const unsigned idx = sch->nb_filters;
833
834 SchFilterGraph *fg;
835 int ret;
836
837 7537 ret = GROW_ARRAY(sch->filters, sch->nb_filters);
838
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7537 times.
7537 if (ret < 0)
839 return ret;
840 7537 fg = &sch->filters[idx];
841
842 7537 fg->class = &sch_fg_class;
843
844 7537 task_init(sch, &fg->task, SCH_NODE_TYPE_FILTER_IN, idx, func, ctx);
845
846
2/2
✓ Branch 0 taken 6698 times.
✓ Branch 1 taken 839 times.
7537 if (nb_inputs) {
847 6698 fg->inputs = av_calloc(nb_inputs, sizeof(*fg->inputs));
848
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6698 times.
6698 if (!fg->inputs)
849 return AVERROR(ENOMEM);
850 6698 fg->nb_inputs = nb_inputs;
851 }
852
853
1/2
✓ Branch 0 taken 7537 times.
✗ Branch 1 not taken.
7537 if (nb_outputs) {
854 7537 fg->outputs = av_calloc(nb_outputs, sizeof(*fg->outputs));
855
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7537 times.
7537 if (!fg->outputs)
856 return AVERROR(ENOMEM);
857 7537 fg->nb_outputs = nb_outputs;
858 }
859
860 7537 ret = waiter_init(&fg->waiter);
861
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7537 times.
7537 if (ret < 0)
862 return ret;
863
864 7537 ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES);
865
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7537 times.
7537 if (ret < 0)
866 return ret;
867
868 7537 return idx;
869 }
870
871 3068 int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx)
872 {
873 SchSyncQueue *sq;
874 int ret;
875
876 3068 ret = GROW_ARRAY(sch->sq_enc, sch->nb_sq_enc);
877
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3068 times.
3068 if (ret < 0)
878 return ret;
879 3068 sq = &sch->sq_enc[sch->nb_sq_enc - 1];
880
881 3068 sq->sq = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, logctx);
882
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3068 times.
3068 if (!sq->sq)
883 return AVERROR(ENOMEM);
884
885 3068 sq->frame = av_frame_alloc();
886
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3068 times.
3068 if (!sq->frame)
887 return AVERROR(ENOMEM);
888
889 3068 ret = pthread_mutex_init(&sq->lock, NULL);
890
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3068 times.
3068 if (ret)
891 return AVERROR(ret);
892
893 3068 return sq - sch->sq_enc;
894 }
895
896 3121 int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx,
897 int limiting, uint64_t max_frames)
898 {
899 SchSyncQueue *sq;
900 SchEnc *enc;
901 int ret;
902
903
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3121 times.
3121 av_assert0(sq_idx < sch->nb_sq_enc);
904 3121 sq = &sch->sq_enc[sq_idx];
905
906
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3121 times.
3121 av_assert0(enc_idx < sch->nb_enc);
907 3121 enc = &sch->enc[enc_idx];
908
909 3121 ret = GROW_ARRAY(sq->enc_idx, sq->nb_enc_idx);
910
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3121 times.
3121 if (ret < 0)
911 return ret;
912 3121 sq->enc_idx[sq->nb_enc_idx - 1] = enc_idx;
913
914 3121 ret = sq_add_stream(sq->sq, limiting);
915
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3121 times.
3121 if (ret < 0)
916 return ret;
917
918 3121 enc->sq_idx[0] = sq_idx;
919 3121 enc->sq_idx[1] = ret;
920
921
2/2
✓ Branch 0 taken 2942 times.
✓ Branch 1 taken 179 times.
3121 if (max_frames != INT64_MAX)
922 2942 sq_limit_frames(sq->sq, enc->sq_idx[1], max_frames);
923
924 3121 return 0;
925 }
926
927 29580 int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst)
928 {
929 int ret;
930
931
4/5
✓ Branch 0 taken 7413 times.
✓ Branch 1 taken 6812 times.
✓ Branch 2 taken 7658 times.
✓ Branch 3 taken 7697 times.
✗ Branch 4 not taken.
29580 switch (src.type) {
932 7413 case SCH_NODE_TYPE_DEMUX: {
933 SchDemuxStream *ds;
934
935
2/4
✓ Branch 0 taken 7413 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 7413 times.
7413 av_assert0(src.idx < sch->nb_demux &&
936 src.idx_stream < sch->demux[src.idx].nb_streams);
937 7413 ds = &sch->demux[src.idx].streams[src.idx_stream];
938
939 7413 ret = GROW_ARRAY(ds->dst, ds->nb_dst);
940
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7413 times.
7413 if (ret < 0)
941 return ret;
942
943 7413 ds->dst[ds->nb_dst - 1] = dst;
944
945 // demuxed packets go to decoding or streamcopy
946
2/3
✓ Branch 0 taken 6753 times.
✓ Branch 1 taken 660 times.
✗ Branch 2 not taken.
7413 switch (dst.type) {
947 6753 case SCH_NODE_TYPE_DEC: {
948 SchDec *dec;
949
950
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6753 times.
6753 av_assert0(dst.idx < sch->nb_dec);
951 6753 dec = &sch->dec[dst.idx];
952
953
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6753 times.
6753 av_assert0(!dec->src.type);
954 6753 dec->src = src;
955 6753 break;
956 }
957 660 case SCH_NODE_TYPE_MUX: {
958 SchMuxStream *ms;
959
960
2/4
✓ Branch 0 taken 660 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 660 times.
660 av_assert0(dst.idx < sch->nb_mux &&
961 dst.idx_stream < sch->mux[dst.idx].nb_streams);
962 660 ms = &sch->mux[dst.idx].streams[dst.idx_stream];
963
964
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 660 times.
660 av_assert0(!ms->src.type);
965 660 ms->src = src;
966
967 660 break;
968 }
969 default: av_assert0(0);
970 }
971
972 7413 break;
973 }
974 6812 case SCH_NODE_TYPE_DEC: {
975 SchDec *dec;
976 SchDecOutput *o;
977
978
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6812 times.
6812 av_assert0(src.idx < sch->nb_dec);
979 6812 dec = &sch->dec[src.idx];
980
981
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6812 times.
6812 av_assert0(src.idx_stream < dec->nb_outputs);
982 6812 o = &dec->outputs[src.idx_stream];
983
984 6812 ret = GROW_ARRAY(o->dst, o->nb_dst);
985
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6812 times.
6812 if (ret < 0)
986 return ret;
987
988 6812 o->dst[o->nb_dst - 1] = dst;
989
990 // decoded frames go to filters or encoding
991
2/3
✓ Branch 0 taken 6774 times.
✓ Branch 1 taken 38 times.
✗ Branch 2 not taken.
6812 switch (dst.type) {
992 6774 case SCH_NODE_TYPE_FILTER_IN: {
993 SchFilterIn *fi;
994
995
2/4
✓ Branch 0 taken 6774 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6774 times.
6774 av_assert0(dst.idx < sch->nb_filters &&
996 dst.idx_stream < sch->filters[dst.idx].nb_inputs);
997 6774 fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
998
999
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6774 times.
6774 av_assert0(!fi->src.type);
1000 6774 fi->src = src;
1001 6774 break;
1002 }
1003 38 case SCH_NODE_TYPE_ENC: {
1004 SchEnc *enc;
1005
1006
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 38 times.
38 av_assert0(dst.idx < sch->nb_enc);
1007 38 enc = &sch->enc[dst.idx];
1008
1009
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 38 times.
38 av_assert0(!enc->src.type);
1010 38 enc->src = src;
1011 38 break;
1012 }
1013 default: av_assert0(0);
1014 }
1015
1016 6812 break;
1017 }
1018 7658 case SCH_NODE_TYPE_FILTER_OUT: {
1019 SchFilterOut *fo;
1020
1021
2/4
✓ Branch 0 taken 7658 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 7658 times.
7658 av_assert0(src.idx < sch->nb_filters &&
1022 src.idx_stream < sch->filters[src.idx].nb_outputs);
1023 7658 fo = &sch->filters[src.idx].outputs[src.idx_stream];
1024
1025
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7658 times.
7658 av_assert0(!fo->dst.type);
1026 7658 fo->dst = dst;
1027
1028 // filtered frames go to encoding or another filtergraph
1029
1/3
✓ Branch 0 taken 7658 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
7658 switch (dst.type) {
1030 7658 case SCH_NODE_TYPE_ENC: {
1031 SchEnc *enc;
1032
1033
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7658 times.
7658 av_assert0(dst.idx < sch->nb_enc);
1034 7658 enc = &sch->enc[dst.idx];
1035
1036
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7658 times.
7658 av_assert0(!enc->src.type);
1037 7658 enc->src = src;
1038 7658 break;
1039 }
1040 case SCH_NODE_TYPE_FILTER_IN: {
1041 SchFilterIn *fi;
1042
1043 av_assert0(dst.idx < sch->nb_filters &&
1044 dst.idx_stream < sch->filters[dst.idx].nb_inputs);
1045 fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
1046
1047 av_assert0(!fi->src.type);
1048 fi->src = src;
1049 break;
1050 }
1051 default: av_assert0(0);
1052 }
1053
1054
1055 7658 break;
1056 }
1057 7697 case SCH_NODE_TYPE_ENC: {
1058 SchEnc *enc;
1059
1060
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7697 times.
7697 av_assert0(src.idx < sch->nb_enc);
1061 7697 enc = &sch->enc[src.idx];
1062
1063 7697 ret = GROW_ARRAY(enc->dst, enc->nb_dst);
1064
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7697 times.
7697 if (ret < 0)
1065 return ret;
1066
1067 7697 enc->dst[enc->nb_dst - 1] = dst;
1068
1069 // encoding packets go to muxing or decoding
1070
2/3
✓ Branch 0 taken 7696 times.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
7697 switch (dst.type) {
1071 7696 case SCH_NODE_TYPE_MUX: {
1072 SchMuxStream *ms;
1073
1074
2/4
✓ Branch 0 taken 7696 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 7696 times.
7696 av_assert0(dst.idx < sch->nb_mux &&
1075 dst.idx_stream < sch->mux[dst.idx].nb_streams);
1076 7696 ms = &sch->mux[dst.idx].streams[dst.idx_stream];
1077
1078
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7696 times.
7696 av_assert0(!ms->src.type);
1079 7696 ms->src = src;
1080
1081 7696 break;
1082 }
1083 1 case SCH_NODE_TYPE_DEC: {
1084 SchDec *dec;
1085
1086
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(dst.idx < sch->nb_dec);
1087 1 dec = &sch->dec[dst.idx];
1088
1089
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(!dec->src.type);
1090 1 dec->src = src;
1091
1092 1 break;
1093 }
1094 default: av_assert0(0);
1095 }
1096
1097 7697 break;
1098 }
1099 default: av_assert0(0);
1100 }
1101
1102 29580 return 0;
1103 }
1104
/**
 * Start the muxer's task thread, then drain all packets that were buffered
 * in the per-stream pre-muxing queues while the muxer was not yet ready.
 *
 * Queued packets are released in interleaved (earliest-dts-first) order so
 * the muxer receives a properly ordered stream. A NULL entry in a queue
 * marks EOF for that stream. Finally mux_started is set, which switches
 * send_to_mux() over to the direct (non-queued) path.
 *
 * Returns 0 on success, a negative error code on failure.
 */
static int mux_task_start(SchMux *mux)
{
    int ret = 0;

    ret = task_start(&mux->task);
    if (ret < 0)
        return ret;

    /* flush the pre-muxing queues */
    while (1) {
        int min_stream = -1;
        Timestamp min_ts = { .ts = AV_NOPTS_VALUE };

        AVPacket *pkt;

        // find the stream with the earliest dts or EOF in pre-muxing queue
        for (unsigned i = 0; i < mux->nb_streams; i++) {
            SchMuxStream *ms = &mux->streams[i];

            // empty queue for this stream - nothing to flush
            if (av_fifo_peek(ms->pre_mux_queue.fifo, &pkt, 1, 0) < 0)
                continue;

            // EOF marker or a packet without a dts wins immediately,
            // since it cannot be ordered against the others
            if (!pkt || pkt->dts == AV_NOPTS_VALUE) {
                min_stream = i;
                break;
            }

            if (min_ts.ts == AV_NOPTS_VALUE ||
                av_compare_ts(min_ts.ts, min_ts.tb, pkt->dts, pkt->time_base) > 0) {
                min_stream = i;
                min_ts = (Timestamp){ .ts = pkt->dts, .tb = pkt->time_base };
            }
        }

        if (min_stream >= 0) {
            SchMuxStream *ms = &mux->streams[min_stream];

            // peek above guarantees the read cannot fail
            ret = av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1);
            av_assert0(ret >= 0);

            if (pkt) {
                // once the thread queue reported EOF for this stream,
                // remaining queued packets are silently dropped
                if (!ms->init_eof)
                    ret = tq_send(mux->queue, min_stream, pkt);
                av_packet_free(&pkt);
                if (ret == AVERROR_EOF)
                    ms->init_eof = 1;
                else if (ret < 0)
                    return ret;
            } else
                tq_send_finish(mux->queue, min_stream);

            continue;
        }

        // all queues drained
        break;
    }

    atomic_store(&mux->mux_started, 1);

    return 0;
}
1166
1167 int print_sdp(const char *filename);
1168
1169 7916 static int mux_init(Scheduler *sch, SchMux *mux)
1170 {
1171 int ret;
1172
1173 7916 ret = mux->init(mux->task.func_arg);
1174
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7916 times.
7916 if (ret < 0)
1175 return ret;
1176
1177 7916 sch->nb_mux_ready++;
1178
1179
2/4
✓ Branch 0 taken 7916 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 7916 times.
7916 if (sch->sdp_filename || sch->sdp_auto) {
1180 if (sch->nb_mux_ready < sch->nb_mux)
1181 return 0;
1182
1183 ret = print_sdp(sch->sdp_filename);
1184 if (ret < 0) {
1185 av_log(sch, AV_LOG_ERROR, "Error writing the SDP.\n");
1186 return ret;
1187 }
1188
1189 /* SDP is written only after all the muxers are ready, so now we
1190 * start ALL the threads */
1191 for (unsigned i = 0; i < sch->nb_mux; i++) {
1192 ret = mux_task_start(&sch->mux[i]);
1193 if (ret < 0)
1194 return ret;
1195 }
1196 } else {
1197 7916 ret = mux_task_start(mux);
1198
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7916 times.
7916 if (ret < 0)
1199 return ret;
1200 }
1201
1202 7916 return 0;
1203 }
1204
1205 8356 void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1206 size_t data_threshold, int max_packets)
1207 {
1208 SchMux *mux;
1209 SchMuxStream *ms;
1210
1211
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8356 times.
8356 av_assert0(mux_idx < sch->nb_mux);
1212 8356 mux = &sch->mux[mux_idx];
1213
1214
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8356 times.
8356 av_assert0(stream_idx < mux->nb_streams);
1215 8356 ms = &mux->streams[stream_idx];
1216
1217 8356 ms->pre_mux_queue.max_packets = max_packets;
1218 8356 ms->pre_mux_queue.data_threshold = data_threshold;
1219 8356 }
1220
1221 8356 int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
1222 {
1223 SchMux *mux;
1224 8356 int ret = 0;
1225
1226
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8356 times.
8356 av_assert0(mux_idx < sch->nb_mux);
1227 8356 mux = &sch->mux[mux_idx];
1228
1229
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8356 times.
8356 av_assert0(stream_idx < mux->nb_streams);
1230
1231 8356 pthread_mutex_lock(&sch->mux_ready_lock);
1232
1233
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8356 times.
8356 av_assert0(mux->nb_streams_ready < mux->nb_streams);
1234
1235 // this may be called during initialization - do not start
1236 // threads before sch_start() is called
1237
2/2
✓ Branch 0 taken 7916 times.
✓ Branch 1 taken 440 times.
8356 if (++mux->nb_streams_ready == mux->nb_streams &&
1238
2/2
✓ Branch 0 taken 7462 times.
✓ Branch 1 taken 454 times.
7916 sch->state >= SCH_STATE_STARTED)
1239 7462 ret = mux_init(sch, mux);
1240
1241 8356 pthread_mutex_unlock(&sch->mux_ready_lock);
1242
1243 8356 return ret;
1244 }
1245
1246 1 int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1247 unsigned dec_idx)
1248 {
1249 SchMux *mux;
1250 SchMuxStream *ms;
1251 1 int ret = 0;
1252
1253
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(mux_idx < sch->nb_mux);
1254 1 mux = &sch->mux[mux_idx];
1255
1256
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(stream_idx < mux->nb_streams);
1257 1 ms = &mux->streams[stream_idx];
1258
1259 1 ret = GROW_ARRAY(ms->sub_heartbeat_dst, ms->nb_sub_heartbeat_dst);
1260
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (ret < 0)
1261 return ret;
1262
1263
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(dec_idx < sch->nb_dec);
1264 1 ms->sub_heartbeat_dst[ms->nb_sub_heartbeat_dst - 1] = dec_idx;
1265
1266
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (!mux->sub_heartbeat_pkt) {
1267 1 mux->sub_heartbeat_pkt = av_packet_alloc();
1268
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (!mux->sub_heartbeat_pkt)
1269 return AVERROR(ENOMEM);
1270 }
1271
1272 1 return 0;
1273 }
1274
1275 518655 static void unchoke_for_stream(Scheduler *sch, SchedulerNode src)
1276 {
1277 427274 while (1) {
1278 SchFilterGraph *fg;
1279
1280 // fed directly by a demuxer (i.e. not through a filtergraph)
1281
2/2
✓ Branch 0 taken 494071 times.
✓ Branch 1 taken 451858 times.
945929 if (src.type == SCH_NODE_TYPE_DEMUX) {
1282 494071 sch->demux[src.idx].waiter.choked_next = 0;
1283 494071 return;
1284 }
1285
1286
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 451858 times.
451858 av_assert0(src.type == SCH_NODE_TYPE_FILTER_OUT);
1287 451858 fg = &sch->filters[src.idx];
1288
1289 // the filtergraph contains internal sources and
1290 // requested to be scheduled directly
1291
2/2
✓ Branch 0 taken 24584 times.
✓ Branch 1 taken 427274 times.
451858 if (fg->best_input == fg->nb_inputs) {
1292 24584 fg->waiter.choked_next = 0;
1293 24584 return;
1294 }
1295
1296 427274 src = fg->inputs[fg->best_input].src_sched;
1297 }
1298 }
1299
/**
 * Recompute which sources (demuxers and filtergraphs) may proceed and apply
 * the result to their waiters. Must be called with schedule_lock held.
 *
 * A source is unchoked when it feeds at least one unfinished muxer stream
 * that is not more than SCHEDULE_TOLERANCE ahead of the globally trailing
 * dts; if that leaves nothing unchoked, the first still-running source is
 * unchoked so the pipeline cannot deadlock.
 */
static void schedule_update_locked(Scheduler *sch)
{
    int64_t dts;
    int have_unchoked = 0;

    // on termination request all waiters are choked,
    // we are not to unchoke them
    if (atomic_load(&sch->terminate))
        return;

    dts = trailing_dts(sch, 0);

    atomic_store(&sch->last_dts, dts);

    // initialize our internal state: remember each waiter's current choked
    // state and tentatively choke everything (type 0 = demuxers,
    // type 1 = filtergraphs)
    for (unsigned type = 0; type < 2; type++)
        for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
            SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
            w->choked_prev = atomic_load(&w->choked);
            w->choked_next = 1;
        }

    // figure out the sources that are allowed to proceed
    for (unsigned i = 0; i < sch->nb_mux; i++) {
        SchMux *mux = &sch->mux[i];

        for (unsigned j = 0; j < mux->nb_streams; j++) {
            SchMuxStream *ms = &mux->streams[j];

            // unblock sources for output streams that are not finished
            // and not too far ahead of the trailing stream
            if (ms->source_finished)
                continue;

            // no trailing dts yet, but this stream already produced output -
            // it is ahead, keep its source choked
            if (dts == AV_NOPTS_VALUE && ms->last_dts != AV_NOPTS_VALUE)
                continue;

            // stream is more than the tolerance ahead of the trailing one
            if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE)
                continue;

            // resolve the source to unchoke
            unchoke_for_stream(sch, ms->src_sched);
            have_unchoked = 1;
        }
    }

    // make sure to unchoke at least one source, if still available
    for (unsigned type = 0; !have_unchoked && type < 2; type++)
        for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
            int exited = type ? sch->filters[i].task_exited : sch->demux[i].task_exited;
            SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
            if (!exited) {
                w->choked_next = 0;
                have_unchoked  = 1;
                break;
            }
        }

    // apply the computed state, signaling only the waiters whose choked
    // state actually changed
    for (unsigned type = 0; type < 2; type++)
        for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
            SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
            if (w->choked_prev != w->choked_next)
                waiter_set(w, w->choked_next);
        }

}
1365
// DFS node states used by check_acyclic_for_output() when searching the
// transcoding graph for cycles
enum {
    CYCLE_NODE_NEW = 0,   // filtergraph not visited yet
    CYCLE_NODE_STARTED,   // filtergraph is on the current DFS path
    CYCLE_NODE_DONE,      // filtergraph fully explored, no cycle through it
};
1371
/**
 * Iterative depth-first search upstream from the filtergraph identified by
 * src.idx, following filtergraph-to-filtergraph connections.
 *
 * src.idx_stream doubles as the per-node iteration cursor: it is incremented
 * in place as inputs are explored, and the partially-iterated node is pushed
 * on filters_stack before descending into a neighbor.
 *
 * @param filters_visited per-filtergraph CYCLE_NODE_* state, nb_filters entries
 * @param filters_stack   scratch stack of at most nb_filters nodes
 * @return 0 if no cycle is reachable, AVERROR(EINVAL) if one is found
 */
static int
check_acyclic_for_output(const Scheduler *sch, SchedulerNode src,
                         uint8_t *filters_visited, SchedulerNode *filters_stack)
{
    unsigned nb_filters_stack = 0;

    memset(filters_visited, 0, sch->nb_filters * sizeof(*filters_visited));

    while (1) {
        const SchFilterGraph *fg = &sch->filters[src.idx];

        filters_visited[src.idx] = CYCLE_NODE_STARTED;

        // descend into every input, depth first
        if (src.idx_stream < fg->nb_inputs) {
            const SchFilterIn *fi = &fg->inputs[src.idx_stream++];

            // connected to demuxer, no cycles possible
            if (fi->src_sched.type == SCH_NODE_TYPE_DEMUX)
                continue;

            // otherwise connected to another filtergraph
            av_assert0(fi->src_sched.type == SCH_NODE_TYPE_FILTER_OUT);

            // found a cycle
            if (filters_visited[fi->src_sched.idx] == CYCLE_NODE_STARTED)
                return AVERROR(EINVAL);

            // place current position on stack and descend
            av_assert0(nb_filters_stack < sch->nb_filters);
            filters_stack[nb_filters_stack++] = src;
            src = (SchedulerNode){ .idx = fi->src_sched.idx, .idx_stream = 0 };
            continue;
        }

        filters_visited[src.idx] = CYCLE_NODE_DONE;

        // previous search finished,
        // resume iterating the node we descended from
        if (nb_filters_stack) {
            src = filters_stack[--nb_filters_stack];
            continue;
        }
        return 0;
    }
}
1417
1418 7914 static int check_acyclic(Scheduler *sch)
1419 {
1420 7914 uint8_t *filters_visited = NULL;
1421 7914 SchedulerNode *filters_stack = NULL;
1422
1423 7914 int ret = 0;
1424
1425
2/2
✓ Branch 0 taken 489 times.
✓ Branch 1 taken 7425 times.
7914 if (!sch->nb_filters)
1426 489 return 0;
1427
1428 7425 filters_visited = av_malloc_array(sch->nb_filters, sizeof(*filters_visited));
1429
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7425 times.
7425 if (!filters_visited)
1430 return AVERROR(ENOMEM);
1431
1432 7425 filters_stack = av_malloc_array(sch->nb_filters, sizeof(*filters_stack));
1433
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7425 times.
7425 if (!filters_stack) {
1434 ret = AVERROR(ENOMEM);
1435 goto fail;
1436 }
1437
1438 // trace the transcoding graph upstream from every filtegraph
1439
2/2
✓ Branch 0 taken 7537 times.
✓ Branch 1 taken 7425 times.
14962 for (unsigned i = 0; i < sch->nb_filters; i++) {
1440 7537 ret = check_acyclic_for_output(sch, (SchedulerNode){ .idx = i },
1441 filters_visited, filters_stack);
1442
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7537 times.
7537 if (ret < 0) {
1443 av_log(&sch->filters[i], AV_LOG_ERROR, "Transcoding graph has a cycle\n");
1444 goto fail;
1445 }
1446 }
1447
1448 7425 fail:
1449 7425 av_freep(&filters_visited);
1450 7425 av_freep(&filters_stack);
1451 7425 return ret;
1452 }
1453
/**
 * Validate and finalize the scheduler graph before any threads are started:
 * - every demuxer stream, decoder output and encoder must be connected;
 * - per-node dst_finished arrays are allocated;
 * - each muxer stream's and filtergraph input's ultimate source
 *   (src_sched: the demuxer or filtergraph output that originates its data)
 *   is resolved;
 * - per-muxer thread queues are allocated;
 * - the graph is checked for cycles.
 *
 * Returns 0 on success, a negative error code on failure.
 */
static int start_prepare(Scheduler *sch)
{
    int ret;

    for (unsigned i = 0; i < sch->nb_demux; i++) {
        SchDemux *d = &sch->demux[i];

        for (unsigned j = 0; j < d->nb_streams; j++) {
            SchDemuxStream *ds = &d->streams[j];

            if (!ds->nb_dst) {
                av_log(d, AV_LOG_ERROR,
                       "Demuxer stream %u not connected to any sink\n", j);
                return AVERROR(EINVAL);
            }

            ds->dst_finished = av_calloc(ds->nb_dst, sizeof(*ds->dst_finished));
            if (!ds->dst_finished)
                return AVERROR(ENOMEM);
        }
    }

    for (unsigned i = 0; i < sch->nb_dec; i++) {
        SchDec *dec = &sch->dec[i];

        // src.type == 0 means the source was never set
        if (!dec->src.type) {
            av_log(dec, AV_LOG_ERROR,
                   "Decoder not connected to a source\n");
            return AVERROR(EINVAL);
        }

        for (unsigned j = 0; j < dec->nb_outputs; j++) {
            SchDecOutput *o = &dec->outputs[j];

            if (!o->nb_dst) {
                av_log(dec, AV_LOG_ERROR,
                       "Decoder output %u not connected to any sink\n", j);
                return AVERROR(EINVAL);
            }

            o->dst_finished = av_calloc(o->nb_dst, sizeof(*o->dst_finished));
            if (!o->dst_finished)
                return AVERROR(ENOMEM);
        }
    }

    for (unsigned i = 0; i < sch->nb_enc; i++) {
        SchEnc *enc = &sch->enc[i];

        if (!enc->src.type) {
            av_log(enc, AV_LOG_ERROR,
                   "Encoder not connected to a source\n");
            return AVERROR(EINVAL);
        }

        if (!enc->nb_dst) {
            av_log(enc, AV_LOG_ERROR,
                   "Encoder not connected to any sink\n");
            return AVERROR(EINVAL);
        }

        enc->dst_finished = av_calloc(enc->nb_dst, sizeof(*enc->dst_finished));
        if (!enc->dst_finished)
            return AVERROR(ENOMEM);
    }

    for (unsigned i = 0; i < sch->nb_mux; i++) {
        SchMux *mux = &sch->mux[i];

        for (unsigned j = 0; j < mux->nb_streams; j++) {
            SchMuxStream *ms = &mux->streams[j];

            // resolve the ultimate source feeding this muxer stream; it is
            // either a demuxer or a filtergraph output, possibly reached
            // through an encoder (and a decoder behind it)
            switch (ms->src.type) {
            case SCH_NODE_TYPE_ENC: {
                SchEnc *enc = &sch->enc[ms->src.idx];
                if (enc->src.type == SCH_NODE_TYPE_DEC) {
                    ms->src_sched = sch->dec[enc->src.idx].src;
                    av_assert0(ms->src_sched.type == SCH_NODE_TYPE_DEMUX);
                } else {
                    ms->src_sched = enc->src;
                    av_assert0(ms->src_sched.type == SCH_NODE_TYPE_FILTER_OUT);
                }
                break;
                }
            case SCH_NODE_TYPE_DEMUX:
                ms->src_sched = ms->src;
                break;
            default:
                av_log(mux, AV_LOG_ERROR,
                       "Muxer stream #%u not connected to a source\n", j);
                return AVERROR(EINVAL);
            }
        }

        ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size,
                          QUEUE_PACKETS);
        if (ret < 0)
            return ret;
    }

    for (unsigned i = 0; i < sch->nb_filters; i++) {
        SchFilterGraph *fg = &sch->filters[i];

        for (unsigned j = 0; j < fg->nb_inputs; j++) {
            SchFilterIn *fi = &fg->inputs[j];
            SchDec *dec;

            if (!fi->src.type) {
                av_log(fg, AV_LOG_ERROR,
                       "Filtergraph input %u not connected to a source\n", j);
                return AVERROR(EINVAL);
            }

            // resolve this input's ultimate source, looking through the
            // decoder (if any) that feeds it
            if (fi->src.type == SCH_NODE_TYPE_FILTER_OUT)
                fi->src_sched = fi->src;
            else {
                av_assert0(fi->src.type == SCH_NODE_TYPE_DEC);
                dec = &sch->dec[fi->src.idx];

                switch (dec->src.type) {
                case SCH_NODE_TYPE_DEMUX: fi->src_sched = dec->src;                   break;
                case SCH_NODE_TYPE_ENC:   fi->src_sched = sch->enc[dec->src.idx].src; break;
                default: av_assert0(0);
                }
            }
        }

        for (unsigned j = 0; j < fg->nb_outputs; j++) {
            SchFilterOut *fo = &fg->outputs[j];

            if (!fo->dst.type) {
                av_log(fg, AV_LOG_ERROR,
                       "Filtergraph %u output %u not connected to a sink\n", i, j);
                return AVERROR(EINVAL);
            }
        }
    }

    // Check that the transcoding graph has no cycles.
    ret = check_acyclic(sch);
    if (ret < 0)
        return ret;

    return 0;
}
1598
/**
 * Start the scheduler: validate/finalize the graph, then launch all task
 * threads. Threads are started sink-to-source (muxers, encoders,
 * filtergraphs, decoders, demuxers) so every consumer exists before its
 * producer begins sending data. Finally the initial choking state is
 * computed. On any failure, everything started so far is stopped.
 *
 * Returns 0 on success, a negative error code on failure.
 */
int sch_start(Scheduler *sch)
{
    int ret;

    ret = start_prepare(sch);
    if (ret < 0)
        return ret;

    av_assert0(sch->state == SCH_STATE_UNINIT);
    sch->state = SCH_STATE_STARTED;

    for (unsigned i = 0; i < sch->nb_mux; i++) {
        SchMux *mux = &sch->mux[i];

        // muxers whose streams are all ready can be initialized right away;
        // the rest are initialized from sch_mux_stream_ready()
        if (mux->nb_streams_ready == mux->nb_streams) {
            ret = mux_init(sch, mux);
            if (ret < 0)
                goto fail;
        }
    }

    for (unsigned i = 0; i < sch->nb_enc; i++) {
        SchEnc *enc = &sch->enc[i];

        ret = task_start(&enc->task);
        if (ret < 0)
            goto fail;
    }

    for (unsigned i = 0; i < sch->nb_filters; i++) {
        SchFilterGraph *fg = &sch->filters[i];

        ret = task_start(&fg->task);
        if (ret < 0)
            goto fail;
    }

    for (unsigned i = 0; i < sch->nb_dec; i++) {
        SchDec *dec = &sch->dec[i];

        ret = task_start(&dec->task);
        if (ret < 0)
            goto fail;
    }

    for (unsigned i = 0; i < sch->nb_demux; i++) {
        SchDemux *d = &sch->demux[i];

        // demuxers with no streams have no work to do
        if (!d->nb_streams)
            continue;

        ret = task_start(&d->task);
        if (ret < 0)
            goto fail;
    }

    // compute the initial choked/unchoked state of all sources
    pthread_mutex_lock(&sch->schedule_lock);
    schedule_update_locked(sch);
    pthread_mutex_unlock(&sch->schedule_lock);

    return 0;
fail:
    sch_stop(sch, NULL);
    return ret;
}
1664
1665 24293 int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
1666 {
1667 int ret, err;
1668
1669 // convert delay to absolute timestamp
1670 24293 timeout_us += av_gettime();
1671
1672 24293 pthread_mutex_lock(&sch->mux_done_lock);
1673
1674
1/2
✓ Branch 0 taken 24293 times.
✗ Branch 1 not taken.
24293 if (sch->nb_mux_done < sch->nb_mux) {
1675 24293 struct timespec tv = { .tv_sec = timeout_us / 1000000,
1676 24293 .tv_nsec = (timeout_us % 1000000) * 1000 };
1677 24293 pthread_cond_timedwait(&sch->mux_done_cond, &sch->mux_done_lock, &tv);
1678 }
1679
1680 24293 ret = sch->nb_mux_done == sch->nb_mux;
1681
1682 24293 pthread_mutex_unlock(&sch->mux_done_lock);
1683
1684 24293 *transcode_ts = atomic_load(&sch->last_dts);
1685
1686 // abort transcoding if any task failed
1687 24293 err = atomic_load(&sch->task_failed);
1688
1689
3/4
✓ Branch 0 taken 16379 times.
✓ Branch 1 taken 7914 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 16379 times.
24293 return ret || err;
1690 }
1691
1692 7658 static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame)
1693 {
1694 int ret;
1695
1696 7658 ret = enc->open_cb(enc->task.func_arg, frame);
1697
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7658 times.
7658 if (ret < 0)
1698 return ret;
1699
1700 // ret>0 signals audio frame size, which means sync queue must
1701 // have been enabled during encoder creation
1702
2/2
✓ Branch 0 taken 163 times.
✓ Branch 1 taken 7495 times.
7658 if (ret > 0) {
1703 SchSyncQueue *sq;
1704
1705
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 163 times.
163 av_assert0(enc->sq_idx[0] >= 0);
1706 163 sq = &sch->sq_enc[enc->sq_idx[0]];
1707
1708 163 pthread_mutex_lock(&sq->lock);
1709
1710 163 sq_frame_samples(sq->sq, enc->sq_idx[1], ret);
1711
1712 163 pthread_mutex_unlock(&sq->lock);
1713 }
1714
1715 7658 return 0;
1716 }
1717
1718 451548 static int send_to_enc_thread(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1719 {
1720 int ret;
1721
1722
2/2
✓ Branch 0 taken 15527 times.
✓ Branch 1 taken 436021 times.
451548 if (!frame) {
1723 15527 tq_send_finish(enc->queue, 0);
1724 15527 return 0;
1725 }
1726
1727
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 436021 times.
436021 if (enc->in_finished)
1728 return AVERROR_EOF;
1729
1730 436021 ret = tq_send(enc->queue, 0, frame);
1731
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 436020 times.
436021 if (ret < 0)
1732 1 enc->in_finished = 1;
1733
1734 436021 return ret;
1735 }
1736
/**
 * Feed a frame (NULL = EOF) into the sync queue shared by a group of
 * encoders, then drain whatever the queue makes available to the
 * destination encoder threads.
 *
 * Note that the inner loop declares its own `enc` which shadows the
 * parameter: it refers to whichever encoder the sync queue returned a
 * frame for, not necessarily the one being fed.
 *
 * Returns 0 on success, a negative error code on failure.
 */
static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
{
    SchSyncQueue *sq = &sch->sq_enc[enc->sq_idx[0]];
    int ret = 0;

    // inform the scheduling code that no more input will arrive along this path;
    // this is necessary because the sync queue may not send an EOF downstream
    // until other streams finish
    // TODO: consider a cleaner way of passing this information through
    //       the pipeline
    if (!frame) {
        for (unsigned i = 0; i < enc->nb_dst; i++) {
            SchMux *mux;
            SchMuxStream *ms;

            if (enc->dst[i].type != SCH_NODE_TYPE_MUX)
                continue;

            mux = &sch->mux[enc->dst[i].idx];
            ms = &mux->streams[enc->dst[i].idx_stream];

            pthread_mutex_lock(&sch->schedule_lock);

            ms->source_finished = 1;
            schedule_update_locked(sch);

            pthread_mutex_unlock(&sch->schedule_lock);
        }
    }

    pthread_mutex_lock(&sq->lock);

    ret = sq_send(sq->sq, enc->sq_idx[1], SQFRAME(frame));
    if (ret < 0)
        goto finish;

    // drain all frames the sync queue is ready to emit
    while (1) {
        SchEnc *enc;

        // TODO: the SQ API should be extended to allow returning EOF
        // for individual streams
        ret = sq_receive(sq->sq, -1, SQFRAME(sq->frame));
        if (ret < 0) {
            // EAGAIN just means the queue has nothing more right now
            ret = (ret == AVERROR(EAGAIN)) ? 0 : ret;
            break;
        }

        // sq_receive() returns the stream index the frame belongs to
        enc = &sch->enc[sq->enc_idx[ret]];
        ret = send_to_enc_thread(sch, enc, sq->frame);
        if (ret < 0) {
            av_frame_unref(sq->frame);
            if (ret != AVERROR_EOF)
                break;

            // this encoder is finished - propagate EOF back into the
            // sync queue and keep draining the others
            sq_send(sq->sq, enc->sq_idx[1], SQFRAME(NULL));
            continue;
        }
    }

    if (ret < 0) {
        // close all encoders fed from this sync queue
        for (unsigned i = 0; i < sq->nb_enc_idx; i++) {
            int err = send_to_enc_thread(sch, &sch->enc[sq->enc_idx[i]], NULL);

            // if the sync queue error is EOF and closing the encoder
            // produces a more serious error, make sure to pick the latter
            ret = err_merge((ret == AVERROR_EOF && err < 0) ? 0 : ret, err);
        }
    }

finish:
    pthread_mutex_unlock(&sq->lock);

    return ret;
}
1812
1813 397592 static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1814 {
1815
6/6
✓ Branch 0 taken 396675 times.
✓ Branch 1 taken 917 times.
✓ Branch 2 taken 384301 times.
✓ Branch 3 taken 12374 times.
✓ Branch 4 taken 7658 times.
✓ Branch 5 taken 376643 times.
397592 if (enc->open_cb && frame && !enc->opened) {
1816 7658 int ret = enc_open(sch, enc, frame);
1817
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7658 times.
7658 if (ret < 0)
1818 return ret;
1819 7658 enc->opened = 1;
1820
1821 // discard empty frames that only carry encoder init parameters
1822
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 7655 times.
7658 if (!frame->buf[0]) {
1823 3 av_frame_unref(frame);
1824 3 return 0;
1825 }
1826 }
1827
1828 397589 return (enc->sq_idx[0] >= 0) ?
1829
2/2
✓ Branch 0 taken 35511 times.
✓ Branch 1 taken 362078 times.
759667 send_to_enc_sq (sch, enc, frame) :
1830 362078 send_to_enc_thread(sch, enc, frame);
1831 }
1832
// Buffer a packet (or a NULL EOF marker) in the pre-muxing queue of the
// given stream, growing the FIFO when it is full. The queue is bounded by
// q->max_packets only once the buffered data size exceeds q->data_threshold.
// Takes ownership of pkt's contents on success.
// Returns 0 on success, AVERROR(ENOSPC) when the queue limit is hit,
// or another negative error code.
static int mux_queue_packet(SchMux *mux, SchMuxStream *ms, AVPacket *pkt)
{
    PreMuxQueue *q = &ms->pre_mux_queue;
    AVPacket *tmp_pkt = NULL;
    int ret;

    if (!av_fifo_can_write(q->fifo)) {
        size_t packets = av_fifo_can_read(q->fifo);
        size_t pkt_size = pkt ? pkt->size : 0;
        int thresh_reached = (q->data_size + pkt_size) > q->data_threshold;
        // only enforce the packet-count cap after the data threshold is hit
        size_t max_packets = thresh_reached ? q->max_packets : SIZE_MAX;
        size_t new_size = FFMIN(2 * packets, max_packets);

        // doubling would not make room -> we are at the cap
        if (new_size <= packets) {
            av_log(mux, AV_LOG_ERROR,
                   "Too many packets buffered for output stream.\n");
            return AVERROR(ENOSPC);
        }
        ret = av_fifo_grow2(q->fifo, new_size - packets);
        if (ret < 0)
            return ret;
    }

    if (pkt) {
        tmp_pkt = av_packet_alloc();
        if (!tmp_pkt)
            return AVERROR(ENOMEM);

        av_packet_move_ref(tmp_pkt, pkt);
        q->data_size += tmp_pkt->size;
    }
    // a NULL entry in the FIFO marks EOF for this stream
    av_fifo_write(q->fifo, &tmp_pkt, 1);

    return 0;
}
1868
// Deliver a packet (or NULL for EOF) to the muxer for the given stream.
// Packets arriving before the muxer thread has started are buffered in the
// pre-mux queue; afterwards they go through the muxer's thread queue.
// Also updates the scheduler's notion of this stream's trailing DTS.
// Returns 0 on success or a negative error code.
static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx,
                       AVPacket *pkt)
{
    SchMuxStream *ms = &mux->streams[stream_idx];
    // end-of-packet timestamp in AV_TIME_BASE units, used for scheduling
    int64_t dts = (pkt && pkt->dts != AV_NOPTS_VALUE) ?
                  av_rescale_q(pkt->dts + pkt->duration, pkt->time_base, AV_TIME_BASE_Q) :
                  AV_NOPTS_VALUE;

    // queue the packet if the muxer cannot be started yet
    if (!atomic_load(&mux->mux_started)) {
        int queued = 0;

        // the muxer could have started between the above atomic check and
        // locking the mutex, then this block falls through to normal send path
        pthread_mutex_lock(&sch->mux_ready_lock);

        if (!atomic_load(&mux->mux_started)) {
            int ret = mux_queue_packet(mux, ms, pkt);
            queued = ret < 0 ? ret : 1;
        }

        pthread_mutex_unlock(&sch->mux_ready_lock);

        if (queued < 0)
            return queued;
        else if (queued)
            goto update_schedule;
    }

    if (pkt) {
        int ret;

        if (ms->init_eof)
            return AVERROR_EOF;

        ret = tq_send(mux->queue, stream_idx, pkt);
        if (ret < 0)
            return ret;
    } else
        tq_send_finish(mux->queue, stream_idx);

update_schedule:
    // TODO: use atomics to check whether this changes trailing dts
    // to avoid locking unnecessarily
    if (dts != AV_NOPTS_VALUE || !pkt) {
        pthread_mutex_lock(&sch->schedule_lock);

        if (pkt) ms->last_dts = dts;
        else     ms->source_finished = 1;

        schedule_update_locked(sch);

        pthread_mutex_unlock(&sch->schedule_lock);
    }

    return 0;
}
1926
// Send a demuxed packet to a single destination (muxer for streamcopy, or
// decoder). A NULL pkt, or the DEMUX_SEND_STREAMCOPY_EOF flag for a mux
// destination, signals EOF. Once a destination returns EOF it is marked
// finished in *dst_finished and all further sends short-circuit.
// Returns 0 on success, AVERROR_EOF when the destination is finished,
// or another negative error code.
static int
demux_stream_send_to_dst(Scheduler *sch, const SchedulerNode dst,
                         uint8_t *dst_finished, AVPacket *pkt, unsigned flags)
{
    int ret;

    if (*dst_finished)
        return AVERROR_EOF;

    // streamcopy EOF: drop the packet and fall through to the EOF path
    if (pkt && dst.type == SCH_NODE_TYPE_MUX &&
        (flags & DEMUX_SEND_STREAMCOPY_EOF)) {
        av_packet_unref(pkt);
        pkt = NULL;
    }

    if (!pkt)
        goto finish;

    ret = (dst.type == SCH_NODE_TYPE_MUX) ?
          send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
          tq_send(sch->dec[dst.idx].queue, 0, pkt);
    if (ret == AVERROR_EOF)
        goto finish;

    return ret;

finish:
    // propagate EOF downstream and remember this destination is done
    if (dst.type == SCH_NODE_TYPE_MUX)
        send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
    else
        tq_send_finish(sch->dec[dst.idx].queue, 0);

    *dst_finished = 1;
    return AVERROR_EOF;
}
1962
// Fan a demuxed packet (or NULL EOF) out to all destinations of one demuxed
// stream. Returns AVERROR_EOF once every destination has reported EOF,
// 0 otherwise, or a negative error code on failure.
static int demux_send_for_stream(Scheduler *sch, SchDemux *d, SchDemuxStream *ds,
                                 AVPacket *pkt, unsigned flags)
{
    unsigned nb_done = 0;

    for (unsigned i = 0; i < ds->nb_dst; i++) {
        AVPacket *to_send = pkt;
        uint8_t *finished = &ds->dst_finished[i];

        int ret;

        // sending a packet consumes it, so make a temporary reference if needed
        if (pkt && i < ds->nb_dst - 1) {
            to_send = d->send_pkt;

            ret = av_packet_ref(to_send, pkt);
            if (ret < 0)
                return ret;
        }

        ret = demux_stream_send_to_dst(sch, ds->dst[i], finished, to_send, flags);
        if (to_send)
            av_packet_unref(to_send);
        if (ret == AVERROR_EOF)
            nb_done++;
        else if (ret < 0)
            return ret;
    }

    return (nb_done == ds->nb_dst) ? AVERROR_EOF : 0;
}
1994
// Flush all decoders fed by this demuxer (used after a seek). pkt must be an
// empty flush packet; on return its pts/time_base carry the largest post-flush
// end timestamp reported by any flushed decoder (AV_NOPTS_VALUE if none).
// Returns 0 on success or a negative error code.
static int demux_flush(Scheduler *sch, SchDemux *d, AVPacket *pkt)
{
    Timestamp max_end_ts = (Timestamp){ .ts = AV_NOPTS_VALUE };

    // a flush packet must carry no data
    av_assert0(!pkt->buf && !pkt->data && !pkt->side_data_elems);

    for (unsigned i = 0; i < d->nb_streams; i++) {
        SchDemuxStream *ds = &d->streams[i];

        for (unsigned j = 0; j < ds->nb_dst; j++) {
            const SchedulerNode *dst = &ds->dst[j];
            SchDec *dec;
            int ret;

            // only live decoder destinations need flushing
            if (ds->dst_finished[j] || dst->type != SCH_NODE_TYPE_DEC)
                continue;

            dec = &sch->dec[dst->idx];

            ret = tq_send(dec->queue, 0, pkt);
            if (ret < 0)
                return ret;

            // wait for the decoder to report its end timestamp after the flush
            if (dec->queue_end_ts) {
                Timestamp ts;
                ret = av_thread_message_queue_recv(dec->queue_end_ts, &ts, 0);
                if (ret < 0)
                    return ret;

                // keep the latest end timestamp seen across all decoders
                if (max_end_ts.ts == AV_NOPTS_VALUE ||
                    (ts.ts != AV_NOPTS_VALUE &&
                     av_compare_ts(max_end_ts.ts, max_end_ts.tb, ts.ts, ts.tb) < 0))
                    max_end_ts = ts;

            }
        }
    }

    pkt->pts       = max_end_ts.ts;
    pkt->time_base = max_end_ts.tb;

    return 0;
}
2038
2039 470312 int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt,
2040 unsigned flags)
2041 {
2042 SchDemux *d;
2043 int terminate;
2044
2045
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 470312 times.
470312 av_assert0(demux_idx < sch->nb_demux);
2046 470312 d = &sch->demux[demux_idx];
2047
2048 470312 terminate = waiter_wait(sch, &d->waiter);
2049
2/2
✓ Branch 0 taken 44 times.
✓ Branch 1 taken 470268 times.
470312 if (terminate)
2050 44 return AVERROR_EXIT;
2051
2052 // flush the downstreams after seek
2053
2/2
✓ Branch 0 taken 11 times.
✓ Branch 1 taken 470257 times.
470268 if (pkt->stream_index == -1)
2054 11 return demux_flush(sch, d, pkt);
2055
2056
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 470257 times.
470257 av_assert0(pkt->stream_index < d->nb_streams);
2057
2058 470257 return demux_send_for_stream(sch, d, &d->streams[pkt->stream_index], pkt, flags);
2059 }
2060
2061 7164 static int demux_done(Scheduler *sch, unsigned demux_idx)
2062 {
2063 7164 SchDemux *d = &sch->demux[demux_idx];
2064 7164 int ret = 0;
2065
2066
2/2
✓ Branch 0 taken 7390 times.
✓ Branch 1 taken 7164 times.
14554 for (unsigned i = 0; i < d->nb_streams; i++) {
2067 7390 int err = demux_send_for_stream(sch, d, &d->streams[i], NULL, 0);
2068
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7390 times.
7390 if (err != AVERROR_EOF)
2069 ret = err_merge(ret, err);
2070 }
2071
2072 7164 pthread_mutex_lock(&sch->schedule_lock);
2073
2074 7164 d->task_exited = 1;
2075
2076 7164 schedule_update_locked(sch);
2077
2078 7164 pthread_mutex_unlock(&sch->schedule_lock);
2079
2080 7164 return ret;
2081 }
2082
2083 513462 int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt)
2084 {
2085 SchMux *mux;
2086 int ret, stream_idx;
2087
2088
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 513462 times.
513462 av_assert0(mux_idx < sch->nb_mux);
2089 513462 mux = &sch->mux[mux_idx];
2090
2091 513462 ret = tq_receive(mux->queue, &stream_idx, pkt);
2092 513462 pkt->stream_index = stream_idx;
2093 513462 return ret;
2094 }
2095
2096 199 void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
2097 {
2098 SchMux *mux;
2099
2100
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 199 times.
199 av_assert0(mux_idx < sch->nb_mux);
2101 199 mux = &sch->mux[mux_idx];
2102
2103
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 199 times.
199 av_assert0(stream_idx < mux->nb_streams);
2104 199 tq_receive_finish(mux->queue, stream_idx);
2105
2106 199 pthread_mutex_lock(&sch->schedule_lock);
2107 199 mux->streams[stream_idx].source_finished = 1;
2108
2109 199 schedule_update_locked(sch);
2110
2111 199 pthread_mutex_unlock(&sch->schedule_lock);
2112 199 }
2113
2114 460067 int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
2115 const AVPacket *pkt)
2116 {
2117 SchMux *mux;
2118 SchMuxStream *ms;
2119
2120
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 460067 times.
460067 av_assert0(mux_idx < sch->nb_mux);
2121 460067 mux = &sch->mux[mux_idx];
2122
2123
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 460067 times.
460067 av_assert0(stream_idx < mux->nb_streams);
2124 460067 ms = &mux->streams[stream_idx];
2125
2126
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 460067 times.
460072 for (unsigned i = 0; i < ms->nb_sub_heartbeat_dst; i++) {
2127 5 SchDec *dst = &sch->dec[ms->sub_heartbeat_dst[i]];
2128 int ret;
2129
2130 5 ret = av_packet_copy_props(mux->sub_heartbeat_pkt, pkt);
2131
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5 times.
5 if (ret < 0)
2132 return ret;
2133
2134 5 tq_send(dst->queue, 0, mux->sub_heartbeat_pkt);
2135 }
2136
2137 460067 return 0;
2138 }
2139
2140 7916 static int mux_done(Scheduler *sch, unsigned mux_idx)
2141 {
2142 7916 SchMux *mux = &sch->mux[mux_idx];
2143
2144 7916 pthread_mutex_lock(&sch->schedule_lock);
2145
2146
2/2
✓ Branch 0 taken 8356 times.
✓ Branch 1 taken 7916 times.
16272 for (unsigned i = 0; i < mux->nb_streams; i++) {
2147 8356 tq_receive_finish(mux->queue, i);
2148 8356 mux->streams[i].source_finished = 1;
2149 }
2150
2151 7916 schedule_update_locked(sch);
2152
2153 7916 pthread_mutex_unlock(&sch->schedule_lock);
2154
2155 7916 pthread_mutex_lock(&sch->mux_done_lock);
2156
2157
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7916 times.
7916 av_assert0(sch->nb_mux_done < sch->nb_mux);
2158 7916 sch->nb_mux_done++;
2159
2160 7916 pthread_cond_signal(&sch->mux_done_cond);
2161
2162 7916 pthread_mutex_unlock(&sch->mux_done_lock);
2163
2164 7916 return 0;
2165 }
2166
// Entry point for decoder threads: receive the next packet to decode.
// Implements the flush handshake with the demuxer: after a flush packet is
// delivered, the next call is expected to carry the decoder's post-flush end
// timestamp in pkt, which is forwarded over queue_end_ts.
// Returns the thread-queue status code, or a negative error.
int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
{
    SchDec *dec;
    int ret, dummy;

    av_assert0(dec_idx < sch->nb_dec);
    dec = &sch->dec[dec_idx];

    // the decoder should have given us post-flush end timestamp in pkt
    if (dec->expect_end_ts) {
        Timestamp ts = (Timestamp){ .ts = pkt->pts, .tb = pkt->time_base };
        ret = av_thread_message_queue_send(dec->queue_end_ts, &ts, 0);
        if (ret < 0)
            return ret;

        dec->expect_end_ts = 0;
    }

    ret = tq_receive(dec->queue, &dummy, pkt);
    // a decoder has a single input stream, so the index is 0 (or negative
    // when the queue is closed)
    av_assert0(dummy <= 0);

    // got a flush packet, on the next call to this function the decoder
    // will give us post-flush end timestamp
    if (ret >= 0 && !pkt->data && !pkt->side_data_elems && dec->queue_end_ts)
        dec->expect_end_ts = 1;

    return ret;
}
2195
2196 404669 static int send_to_filter(Scheduler *sch, SchFilterGraph *fg,
2197 unsigned in_idx, AVFrame *frame)
2198 {
2199
2/2
✓ Branch 0 taken 397895 times.
✓ Branch 1 taken 6774 times.
404669 if (frame)
2200 397895 return tq_send(fg->queue, in_idx, frame);
2201
2202
1/2
✓ Branch 0 taken 6774 times.
✗ Branch 1 not taken.
6774 if (!fg->inputs[in_idx].send_finished) {
2203 6774 fg->inputs[in_idx].send_finished = 1;
2204 6774 tq_send_finish(fg->queue, in_idx);
2205
2206 // close the control stream when all actual inputs are done
2207
2/2
✓ Branch 0 taken 6698 times.
✓ Branch 1 taken 76 times.
6774 if (atomic_fetch_add(&fg->nb_inputs_finished_send, 1) == fg->nb_inputs - 1)
2208 6698 tq_send_finish(fg->queue, fg->nb_inputs);
2209 }
2210 6774 return 0;
2211 }
2212
// Send a decoded frame to a single destination (filtergraph input or
// encoder). A NULL frame signals EOF. Once a destination reports EOF it is
// marked in *dst_finished and further sends short-circuit.
// Returns 0 on success, AVERROR_EOF when the destination is finished,
// or another negative error code.
static int dec_send_to_dst(Scheduler *sch, const SchedulerNode dst,
                           uint8_t *dst_finished, AVFrame *frame)
{
    int ret;

    if (*dst_finished)
        return AVERROR_EOF;

    if (!frame)
        goto finish;

    ret = (dst.type == SCH_NODE_TYPE_FILTER_IN) ?
          send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame) :
          send_to_enc(sch, &sch->enc[dst.idx], frame);
    if (ret == AVERROR_EOF)
        goto finish;

    return ret;

finish:
    // propagate EOF downstream and remember this destination is done
    if (dst.type == SCH_NODE_TYPE_FILTER_IN)
        send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
    else
        send_to_enc(sch, &sch->enc[dst.idx], NULL);

    *dst_finished = 1;

    return AVERROR_EOF;
}
2242
// Entry point for decoder threads: fan a decoded frame out to every
// destination of decoder output out_idx. Returns AVERROR_EOF once all
// destinations have reported EOF, 0 otherwise, or a negative error code.
int sch_dec_send(Scheduler *sch, unsigned dec_idx,
                 unsigned out_idx, AVFrame *frame)
{
    SchDec *dec;
    SchDecOutput *o;
    int ret;
    unsigned nb_done = 0;

    av_assert0(dec_idx < sch->nb_dec);
    dec = &sch->dec[dec_idx];

    av_assert0(out_idx < dec->nb_outputs);
    o = &dec->outputs[out_idx];

    for (unsigned i = 0; i < o->nb_dst; i++) {
        uint8_t *finished = &o->dst_finished[i];
        AVFrame *to_send = frame;

        // sending a frame consumes it, so make a temporary reference if needed
        if (i < o->nb_dst - 1) {
            to_send = dec->send_frame;

            // frame may sometimes contain props only,
            // e.g. to signal EOF timestamp
            ret = frame->buf[0] ? av_frame_ref(to_send, frame) :
                                  av_frame_copy_props(to_send, frame);
            if (ret < 0)
                return ret;
        }

        ret = dec_send_to_dst(sch, o->dst[i], finished, to_send);
        if (ret < 0) {
            av_frame_unref(to_send);
            // a destination reaching EOF is not fatal; keep feeding the rest
            if (ret == AVERROR_EOF) {
                nb_done++;
                continue;
            }
            return ret;
        }
    }

    return (nb_done == o->nb_dst) ? AVERROR_EOF : 0;
}
2286
2287 6754 static int dec_done(Scheduler *sch, unsigned dec_idx)
2288 {
2289 6754 SchDec *dec = &sch->dec[dec_idx];
2290 6754 int ret = 0;
2291
2292 6754 tq_receive_finish(dec->queue, 0);
2293
2294 // make sure our source does not get stuck waiting for end timestamps
2295 // that will never arrive
2296
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 6753 times.
6754 if (dec->queue_end_ts)
2297 1 av_thread_message_queue_set_err_recv(dec->queue_end_ts, AVERROR_EOF);
2298
2299
2/2
✓ Branch 0 taken 6760 times.
✓ Branch 1 taken 6754 times.
13514 for (unsigned i = 0; i < dec->nb_outputs; i++) {
2300 6760 SchDecOutput *o = &dec->outputs[i];
2301
2302
2/2
✓ Branch 0 taken 6812 times.
✓ Branch 1 taken 6760 times.
13572 for (unsigned j = 0; j < o->nb_dst; j++) {
2303 6812 int err = dec_send_to_dst(sch, o->dst[j], &o->dst_finished[j], NULL);
2304
2/4
✓ Branch 0 taken 6812 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6812 times.
6812 if (err < 0 && err != AVERROR_EOF)
2305 ret = err_merge(ret, err);
2306 }
2307 }
2308
2309 6754 return ret;
2310 }
2311
2312 443713 int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
2313 {
2314 SchEnc *enc;
2315 int ret, dummy;
2316
2317
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 443713 times.
443713 av_assert0(enc_idx < sch->nb_enc);
2318 443713 enc = &sch->enc[enc_idx];
2319
2320 443713 ret = tq_receive(enc->queue, &dummy, frame);
2321
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 443713 times.
443713 av_assert0(dummy <= 0);
2322
2323 443713 return ret;
2324 }
2325
// Send an encoded packet to a single destination (muxer, or a decoder for
// transcode-loop setups). A NULL pkt signals EOF. Once a destination reports
// EOF it is marked in *dst_finished and further sends short-circuit.
// Returns 0 on success, AVERROR_EOF when the destination is finished,
// or another negative error code.
static int enc_send_to_dst(Scheduler *sch, const SchedulerNode dst,
                           uint8_t *dst_finished, AVPacket *pkt)
{
    int ret;

    if (*dst_finished)
        return AVERROR_EOF;

    if (!pkt)
        goto finish;

    ret = (dst.type == SCH_NODE_TYPE_MUX) ?
          send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
          tq_send(sch->dec[dst.idx].queue, 0, pkt);
    if (ret == AVERROR_EOF)
        goto finish;

    return ret;

finish:
    // propagate EOF downstream and remember this destination is done
    if (dst.type == SCH_NODE_TYPE_MUX)
        send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
    else
        tq_send_finish(sch->dec[dst.idx].queue, 0);

    *dst_finished = 1;

    return AVERROR_EOF;
}
2355
// Entry point for encoder threads: fan an encoded packet out to every
// destination of the encoder. Returns 0 on success or a negative error code;
// individual destinations hitting EOF are tolerated.
int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
{
    SchEnc *enc;
    int ret;

    av_assert0(enc_idx < sch->nb_enc);
    enc = &sch->enc[enc_idx];

    for (unsigned i = 0; i < enc->nb_dst; i++) {
        uint8_t *finished = &enc->dst_finished[i];
        AVPacket *to_send = pkt;

        // sending a packet consumes it, so make a temporary reference if needed
        if (i < enc->nb_dst - 1) {
            to_send = enc->send_pkt;

            ret = av_packet_ref(to_send, pkt);
            if (ret < 0)
                return ret;
        }

        ret = enc_send_to_dst(sch, enc->dst[i], finished, to_send);
        if (ret < 0) {
            av_packet_unref(to_send);
            // a destination reaching EOF is not fatal; keep feeding the rest
            if (ret == AVERROR_EOF)
                continue;
            return ret;
        }
    }

    return 0;
}
2388
2389 7696 static int enc_done(Scheduler *sch, unsigned enc_idx)
2390 {
2391 7696 SchEnc *enc = &sch->enc[enc_idx];
2392 7696 int ret = 0;
2393
2394 7696 tq_receive_finish(enc->queue, 0);
2395
2396
2/2
✓ Branch 0 taken 7697 times.
✓ Branch 1 taken 7696 times.
15393 for (unsigned i = 0; i < enc->nb_dst; i++) {
2397 7697 int err = enc_send_to_dst(sch, enc->dst[i], &enc->dst_finished[i], NULL);
2398
2/4
✓ Branch 0 taken 7697 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 7697 times.
7697 if (err < 0 && err != AVERROR_EOF)
2399 ret = err_merge(ret, err);
2400 }
2401
2402 7696 return ret;
2403 }
2404
// Entry point for filtergraph threads: receive the next input frame.
// *in_idx is the input the caller would prefer to receive on (or
// fg->nb_inputs to wait on the control stream); on success it is updated to
// the input the frame actually arrived on.
// Returns 0 on success, AVERROR(EAGAIN) after waiting on the control stream,
// or AVERROR_EOF on termination / queue closure.
int sch_filter_receive(Scheduler *sch, unsigned fg_idx,
                       unsigned *in_idx, AVFrame *frame)
{
    SchFilterGraph *fg;

    av_assert0(fg_idx < sch->nb_filters);
    fg = &sch->filters[fg_idx];

    av_assert0(*in_idx <= fg->nb_inputs);

    // update scheduling to account for desired input stream, if it changed
    //
    // this check needs no locking because only the filtering thread
    // updates this value
    if (*in_idx != fg->best_input) {
        pthread_mutex_lock(&sch->schedule_lock);

        fg->best_input = *in_idx;
        schedule_update_locked(sch);

        pthread_mutex_unlock(&sch->schedule_lock);
    }

    // nb_inputs means "no input preferred": just wait on the control stream
    if (*in_idx == fg->nb_inputs) {
        int terminate = waiter_wait(sch, &fg->waiter);
        return terminate ? AVERROR_EOF : AVERROR(EAGAIN);
    }

    while (1) {
        int ret, idx;

        ret = tq_receive(fg->queue, &idx, frame);
        // the whole queue is closed
        if (idx < 0)
            return AVERROR_EOF;
        else if (ret >= 0) {
            *in_idx = idx;
            return 0;
        }

        // disregard EOFs for specific streams - they should always be
        // preceded by an EOF frame
    }
}
2448
2449 1 void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx)
2450 {
2451 SchFilterGraph *fg;
2452 SchFilterIn *fi;
2453
2454
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(fg_idx < sch->nb_filters);
2455 1 fg = &sch->filters[fg_idx];
2456
2457
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(in_idx < fg->nb_inputs);
2458 1 fi = &fg->inputs[in_idx];
2459
2460
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (!fi->receive_finished) {
2461 1 fi->receive_finished = 1;
2462 1 tq_receive_finish(fg->queue, in_idx);
2463
2464 // close the control stream when all actual inputs are done
2465
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (++fg->nb_inputs_finished_receive == fg->nb_inputs)
2466 tq_receive_finish(fg->queue, fg->nb_inputs);
2467 }
2468 1 }
2469
2470 389017 int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)
2471 {
2472 SchFilterGraph *fg;
2473 SchedulerNode dst;
2474
2475
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 389017 times.
389017 av_assert0(fg_idx < sch->nb_filters);
2476 389017 fg = &sch->filters[fg_idx];
2477
2478
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 389017 times.
389017 av_assert0(out_idx < fg->nb_outputs);
2479 389017 dst = fg->outputs[out_idx].dst;
2480
2481 389017 return (dst.type == SCH_NODE_TYPE_ENC) ?
2482
1/2
✓ Branch 0 taken 389017 times.
✗ Branch 1 not taken.
389017 send_to_enc (sch, &sch->enc[dst.idx], frame) :
2483 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame);
2484 }
2485
2486 7537 static int filter_done(Scheduler *sch, unsigned fg_idx)
2487 {
2488 7537 SchFilterGraph *fg = &sch->filters[fg_idx];
2489 7537 int ret = 0;
2490
2491
2/2
✓ Branch 0 taken 14311 times.
✓ Branch 1 taken 7537 times.
21848 for (unsigned i = 0; i <= fg->nb_inputs; i++)
2492 14311 tq_receive_finish(fg->queue, i);
2493
2494
2/2
✓ Branch 0 taken 7658 times.
✓ Branch 1 taken 7537 times.
15195 for (unsigned i = 0; i < fg->nb_outputs; i++) {
2495 7658 SchedulerNode dst = fg->outputs[i].dst;
2496 15316 int err = (dst.type == SCH_NODE_TYPE_ENC) ?
2497
1/2
✓ Branch 0 taken 7658 times.
✗ Branch 1 not taken.
7658 send_to_enc (sch, &sch->enc[dst.idx], NULL) :
2498 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2499
2500
3/4
✓ Branch 0 taken 3096 times.
✓ Branch 1 taken 4562 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 3096 times.
7658 if (err < 0 && err != AVERROR_EOF)
2501 ret = err_merge(ret, err);
2502 }
2503
2504 7537 pthread_mutex_lock(&sch->schedule_lock);
2505
2506 7537 fg->task_exited = 1;
2507
2508 7537 schedule_update_locked(sch);
2509
2510 7537 pthread_mutex_unlock(&sch->schedule_lock);
2511
2512 7537 return ret;
2513 }
2514
2515 int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
2516 {
2517 SchFilterGraph *fg;
2518
2519 av_assert0(fg_idx < sch->nb_filters);
2520 fg = &sch->filters[fg_idx];
2521
2522 return send_to_filter(sch, fg, fg->nb_inputs, frame);
2523 }
2524
2525 37067 static int task_cleanup(Scheduler *sch, SchedulerNode node)
2526 {
2527
5/6
✓ Branch 0 taken 7164 times.
✓ Branch 1 taken 7916 times.
✓ Branch 2 taken 6754 times.
✓ Branch 3 taken 7696 times.
✓ Branch 4 taken 7537 times.
✗ Branch 5 not taken.
37067 switch (node.type) {
2528 7164 case SCH_NODE_TYPE_DEMUX: return demux_done (sch, node.idx);
2529 7916 case SCH_NODE_TYPE_MUX: return mux_done (sch, node.idx);
2530 6754 case SCH_NODE_TYPE_DEC: return dec_done (sch, node.idx);
2531 7696 case SCH_NODE_TYPE_ENC: return enc_done (sch, node.idx);
2532 7537 case SCH_NODE_TYPE_FILTER_IN: return filter_done(sch, node.idx);
2533 default: av_assert0(0);
2534 }
2535 }
2536
// Thread entry point for all scheduler tasks: run the task's function, then
// its per-type cleanup, merge their results, and record failure in the
// scheduler. Returns the merged status as the thread's exit value.
static void *task_wrapper(void *arg)
{
    SchTask *task = arg;
    Scheduler *sch = task->parent;
    int ret;
    int err = 0;

    ret = task->func(task->func_arg);
    if (ret < 0)
        av_log(task->func_arg, AV_LOG_ERROR,
               "Task finished with error code: %d (%s)\n", ret, av_err2str(ret));

    // cleanup runs even when the task function failed
    err = task_cleanup(sch, task->node);
    ret = err_merge(ret, err);

    // EOF is considered normal termination
    if (ret == AVERROR_EOF)
        ret = 0;
    if (ret < 0)
        atomic_store(&sch->task_failed, 1);

    av_log(task->func_arg, ret < 0 ? AV_LOG_ERROR : AV_LOG_VERBOSE,
           "Terminating thread with return code %d (%s)\n", ret,
           ret < 0 ? av_err2str(ret) : "success");

    return (void*)(intptr_t)ret;
}
2564
2565 37067 static int task_stop(Scheduler *sch, SchTask *task)
2566 {
2567 int ret;
2568 void *thread_ret;
2569
2570
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 37053 times.
37067 if (!task->thread_running)
2571 14 return task_cleanup(sch, task->node);
2572
2573 37053 ret = pthread_join(task->thread, &thread_ret);
2574
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 37053 times.
37053 av_assert0(ret == 0);
2575
2576 37053 task->thread_running = 0;
2577
2578 37053 return (intptr_t)thread_ret;
2579 }
2580
// Stop the whole scheduler: set the terminate flag, unchoke every waiting
// demuxer/filtergraph so they can observe it, then join all task threads in
// pipeline order (demuxers, decoders, filters, encoders, muxers).
// On success *finish_ts (if non-NULL) receives the final trailing DTS.
// Returns the merged status of all tasks; 0 if already stopped.
int sch_stop(Scheduler *sch, int64_t *finish_ts)
{
    int ret = 0, err;

    if (sch->state != SCH_STATE_STARTED)
        return 0;

    atomic_store(&sch->terminate, 1);

    // wake all choked waiters (type 0: filtergraphs, type 1: demuxers) so
    // their threads can notice termination and exit
    for (unsigned type = 0; type < 2; type++)
        for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) {
            SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter;
            waiter_set(w, 1);
        }

    for (unsigned i = 0; i < sch->nb_demux; i++) {
        SchDemux *d = &sch->demux[i];

        err = task_stop(sch, &d->task);
        ret = err_merge(ret, err);
    }

    for (unsigned i = 0; i < sch->nb_dec; i++) {
        SchDec *dec = &sch->dec[i];

        err = task_stop(sch, &dec->task);
        ret = err_merge(ret, err);
    }

    for (unsigned i = 0; i < sch->nb_filters; i++) {
        SchFilterGraph *fg = &sch->filters[i];

        err = task_stop(sch, &fg->task);
        ret = err_merge(ret, err);
    }

    for (unsigned i = 0; i < sch->nb_enc; i++) {
        SchEnc *enc = &sch->enc[i];

        err = task_stop(sch, &enc->task);
        ret = err_merge(ret, err);
    }

    for (unsigned i = 0; i < sch->nb_mux; i++) {
        SchMux *mux = &sch->mux[i];

        err = task_stop(sch, &mux->task);
        ret = err_merge(ret, err);
    }

    if (finish_ts)
        *finish_ts = trailing_dts(sch, 1);

    sch->state = SCH_STATE_STOPPED;

    return ret;
}
2638