FFmpeg coverage


Directory: ../../../ffmpeg/
File: src/fftools/ffmpeg_sched.c
Date: 2024-05-03 15:42:48
Exec Total Coverage
Lines: 1033 1187 87.0%
Functions: 64 66 97.0%
Branches: 567 789 71.9%

Line Branch Exec Source
1 /*
2 * Inter-thread scheduling/synchronization.
3 * Copyright (c) 2023 Anton Khirnov
4 *
5 * This file is part of FFmpeg.
6 *
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with FFmpeg; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22 #include <stdatomic.h>
23 #include <stddef.h>
24 #include <stdint.h>
25
26 #include "cmdutils.h"
27 #include "ffmpeg_sched.h"
28 #include "ffmpeg_utils.h"
29 #include "sync_queue.h"
30 #include "thread_queue.h"
31
32 #include "libavcodec/packet.h"
33
34 #include "libavutil/avassert.h"
35 #include "libavutil/error.h"
36 #include "libavutil/fifo.h"
37 #include "libavutil/frame.h"
38 #include "libavutil/mem.h"
39 #include "libavutil/thread.h"
40 #include "libavutil/threadmessage.h"
41 #include "libavutil/time.h"
42
43 // 100 ms
44 // FIXME: some other value? make this dynamic?
45 #define SCHEDULE_TOLERANCE (100 * 1000)
46
47 enum QueueType {
48 QUEUE_PACKETS,
49 QUEUE_FRAMES,
50 };
51
52 typedef struct SchWaiter {
53 pthread_mutex_t lock;
54 pthread_cond_t cond;
55 atomic_int choked;
56
57 // the following are internal state of schedule_update_locked() and must not
58 // be accessed outside of it
59 int choked_prev;
60 int choked_next;
61 } SchWaiter;
62
63 typedef struct SchTask {
64 Scheduler *parent;
65 SchedulerNode node;
66
67 SchThreadFunc func;
68 void *func_arg;
69
70 pthread_t thread;
71 int thread_running;
72 } SchTask;
73
74 typedef struct SchDec {
75 const AVClass *class;
76
77 SchedulerNode src;
78 SchedulerNode *dst;
79 uint8_t *dst_finished;
80 unsigned nb_dst;
81
82 SchTask task;
83 // Queue for receiving input packets, one stream.
84 ThreadQueue *queue;
85
86 // Queue for sending post-flush end timestamps back to the source
87 AVThreadMessageQueue *queue_end_ts;
88 int expect_end_ts;
89
90 // temporary storage used by sch_dec_send()
91 AVFrame *send_frame;
92 } SchDec;
93
94 typedef struct SchSyncQueue {
95 SyncQueue *sq;
96 AVFrame *frame;
97 pthread_mutex_t lock;
98
99 unsigned *enc_idx;
100 unsigned nb_enc_idx;
101 } SchSyncQueue;
102
103 typedef struct SchEnc {
104 const AVClass *class;
105
106 SchedulerNode src;
107 SchedulerNode *dst;
108 uint8_t *dst_finished;
109 unsigned nb_dst;
110
111 // [0] - index of the sync queue in Scheduler.sq_enc,
112 // [1] - index of this encoder in the sq
113 int sq_idx[2];
114
115 /* Opening encoders is somewhat nontrivial due to their interaction with
116 * sync queues, which are (among other things) responsible for maintaining
117 * constant audio frame size, when it is required by the encoder.
118 *
119 * Opening the encoder requires stream parameters, obtained from the first
120 * frame. However, that frame cannot be properly chunked by the sync queue
121 * without knowing the required frame size, which is only available after
122 * opening the encoder.
123 *
124 * This apparent circular dependency is resolved in the following way:
125 * - the caller creating the encoder gives us a callback which opens the
126 * encoder and returns the required frame size (if any)
127 * - when the first frame is sent to the encoder, the sending thread
128 * - calls this callback, opening the encoder
129 * - passes the returned frame size to the sync queue
130 */
131 int (*open_cb)(void *opaque, const AVFrame *frame);
132 int opened;
133
134 SchTask task;
135 // Queue for receiving input frames, one stream.
136 ThreadQueue *queue;
137 // tq_send() to queue returned EOF
138 int in_finished;
139
140 // temporary storage used by sch_enc_send()
141 AVPacket *send_pkt;
142 } SchEnc;
143
144 typedef struct SchDemuxStream {
145 SchedulerNode *dst;
146 uint8_t *dst_finished;
147 unsigned nb_dst;
148 } SchDemuxStream;
149
150 typedef struct SchDemux {
151 const AVClass *class;
152
153 SchDemuxStream *streams;
154 unsigned nb_streams;
155
156 SchTask task;
157 SchWaiter waiter;
158
159 // temporary storage used by sch_demux_send()
160 AVPacket *send_pkt;
161
162 // protected by schedule_lock
163 int task_exited;
164 } SchDemux;
165
166 typedef struct PreMuxQueue {
167 /**
168 * Queue for buffering the packets before the muxer task can be started.
169 */
170 AVFifo *fifo;
171 /**
172 * Maximum number of packets in fifo.
173 */
174 int max_packets;
175 /*
176 * The size of the AVPackets' buffers in queue.
177 * Updated when a packet is either pushed or pulled from the queue.
178 */
179 size_t data_size;
180 /* Threshold after which max_packets will be in effect */
181 size_t data_threshold;
182 } PreMuxQueue;
183
184 typedef struct SchMuxStream {
185 SchedulerNode src;
186 SchedulerNode src_sched;
187
188 unsigned *sub_heartbeat_dst;
189 unsigned nb_sub_heartbeat_dst;
190
191 PreMuxQueue pre_mux_queue;
192
193 // an EOF was generated while flushing the pre-mux queue
194 int init_eof;
195
196 ////////////////////////////////////////////////////////////
197 // The following are protected by Scheduler.schedule_lock //
198
199 /* dts+duration of the last packet sent to this stream
200 in AV_TIME_BASE_Q */
201 int64_t last_dts;
202 // this stream no longer accepts input
203 int source_finished;
204 ////////////////////////////////////////////////////////////
205 } SchMuxStream;
206
207 typedef struct SchMux {
208 const AVClass *class;
209
210 SchMuxStream *streams;
211 unsigned nb_streams;
212 unsigned nb_streams_ready;
213
214 int (*init)(void *arg);
215
216 SchTask task;
217 /**
218 * Set to 1 after starting the muxer task and flushing the
219 * pre-muxing queues.
220 * Set either before any tasks have started, or with
221 * Scheduler.mux_ready_lock held.
222 */
223 atomic_int mux_started;
224 ThreadQueue *queue;
225 unsigned queue_size;
226
227 AVPacket *sub_heartbeat_pkt;
228 } SchMux;
229
230 typedef struct SchFilterIn {
231 SchedulerNode src;
232 SchedulerNode src_sched;
233 int send_finished;
234 int receive_finished;
235 } SchFilterIn;
236
237 typedef struct SchFilterOut {
238 SchedulerNode dst;
239 } SchFilterOut;
240
241 typedef struct SchFilterGraph {
242 const AVClass *class;
243
244 SchFilterIn *inputs;
245 unsigned nb_inputs;
246 atomic_uint nb_inputs_finished_send;
247 unsigned nb_inputs_finished_receive;
248
249 SchFilterOut *outputs;
250 unsigned nb_outputs;
251
252 SchTask task;
253 // input queue, nb_inputs+1 streams
254 // last stream is control
255 ThreadQueue *queue;
256 SchWaiter waiter;
257
258 // protected by schedule_lock
259 unsigned best_input;
260 int task_exited;
261 } SchFilterGraph;
262
263 enum SchedulerState {
264 SCH_STATE_UNINIT,
265 SCH_STATE_STARTED,
266 SCH_STATE_STOPPED,
267 };
268
269 struct Scheduler {
270 const AVClass *class;
271
272 SchDemux *demux;
273 unsigned nb_demux;
274
275 SchMux *mux;
276 unsigned nb_mux;
277
278 unsigned nb_mux_ready;
279 pthread_mutex_t mux_ready_lock;
280
281 unsigned nb_mux_done;
282 pthread_mutex_t mux_done_lock;
283 pthread_cond_t mux_done_cond;
284
285
286 SchDec *dec;
287 unsigned nb_dec;
288
289 SchEnc *enc;
290 unsigned nb_enc;
291
292 SchSyncQueue *sq_enc;
293 unsigned nb_sq_enc;
294
295 SchFilterGraph *filters;
296 unsigned nb_filters;
297
298 char *sdp_filename;
299 int sdp_auto;
300
301 enum SchedulerState state;
302 atomic_int terminate;
303 atomic_int task_failed;
304
305 pthread_mutex_t schedule_lock;
306
307 atomic_int_least64_t last_dts;
308 };
309
310 /**
311 * Wait until this task is allowed to proceed.
312 *
313 * @retval 0 the caller should proceed
314 * @retval 1 the caller should terminate
315 */
316 456560 static int waiter_wait(Scheduler *sch, SchWaiter *w)
317 {
318 int terminate;
319
320
2/2
✓ Branch 0 taken 456309 times.
✓ Branch 1 taken 251 times.
456560 if (!atomic_load(&w->choked))
321 456309 return 0;
322
323 251 pthread_mutex_lock(&w->lock);
324
325
4/4
✓ Branch 0 taken 252 times.
✓ Branch 1 taken 205 times.
✓ Branch 2 taken 206 times.
✓ Branch 3 taken 46 times.
457 while (atomic_load(&w->choked) && !atomic_load(&sch->terminate))
326 206 pthread_cond_wait(&w->cond, &w->lock);
327
328 251 terminate = atomic_load(&sch->terminate);
329
330 251 pthread_mutex_unlock(&w->lock);
331
332 251 return terminate;
333 }
334
335 30618 static void waiter_set(SchWaiter *w, int choked)
336 {
337 30618 pthread_mutex_lock(&w->lock);
338
339 30618 atomic_store(&w->choked, choked);
340 30618 pthread_cond_signal(&w->cond);
341
342 30618 pthread_mutex_unlock(&w->lock);
343 30618 }
344
345 13174 static int waiter_init(SchWaiter *w)
346 {
347 int ret;
348
349 13174 atomic_init(&w->choked, 0);
350
351 13174 ret = pthread_mutex_init(&w->lock, NULL);
352
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13174 times.
13174 if (ret)
353 return AVERROR(ret);
354
355 13174 ret = pthread_cond_init(&w->cond, NULL);
356
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13174 times.
13174 if (ret)
357 return AVERROR(ret);
358
359 13174 return 0;
360 }
361
362 13174 static void waiter_uninit(SchWaiter *w)
363 {
364 13174 pthread_mutex_destroy(&w->lock);
365 13174 pthread_cond_destroy(&w->cond);
366 13174 }
367
368 26008 static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size,
369 enum QueueType type)
370 {
371 ThreadQueue *tq;
372 ObjPool *op;
373
374
1/2
✓ Branch 0 taken 26008 times.
✗ Branch 1 not taken.
26008 if (queue_size <= 0) {
375
2/2
✓ Branch 0 taken 12856 times.
✓ Branch 1 taken 13152 times.
26008 if (type == QUEUE_FRAMES)
376 12856 queue_size = DEFAULT_FRAME_THREAD_QUEUE_SIZE;
377 else
378 13152 queue_size = DEFAULT_PACKET_THREAD_QUEUE_SIZE;
379 }
380
381
2/2
✓ Branch 0 taken 12856 times.
✓ Branch 1 taken 13152 times.
26008 if (type == QUEUE_FRAMES) {
382 // This queue length is used in the decoder code to ensure that
383 // there are enough entries in fixed-size frame pools to account
384 // for frames held in queues inside the ffmpeg utility. If this
385 // can ever dynamically change then the corresponding decode
386 // code needs to be updated as well.
387
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12856 times.
12856 av_assert0(queue_size == DEFAULT_FRAME_THREAD_QUEUE_SIZE);
388 }
389
390
2/2
✓ Branch 0 taken 13152 times.
✓ Branch 1 taken 12856 times.
26008 op = (type == QUEUE_PACKETS) ? objpool_alloc_packets() :
391 12856 objpool_alloc_frames();
392
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 26008 times.
26008 if (!op)
393 return AVERROR(ENOMEM);
394
395
2/2
✓ Branch 0 taken 13152 times.
✓ Branch 1 taken 12856 times.
26008 tq = tq_alloc(nb_streams, queue_size, op,
396 (type == QUEUE_PACKETS) ? pkt_move : frame_move);
397
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 26008 times.
26008 if (!tq) {
398 objpool_free(&op);
399 return AVERROR(ENOMEM);
400 }
401
402 26008 *ptq = tq;
403 26008 return 0;
404 }
405
406 static void *task_wrapper(void *arg);
407
408 32784 static int task_start(SchTask *task)
409 {
410 int ret;
411
412 32784 av_log(task->func_arg, AV_LOG_VERBOSE, "Starting thread...\n");
413
414
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 32784 times.
32784 av_assert0(!task->thread_running);
415
416 32784 ret = pthread_create(&task->thread, NULL, task_wrapper, task);
417
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 32784 times.
32784 if (ret) {
418 av_log(task->func_arg, AV_LOG_ERROR, "pthread_create() failed: %s\n",
419 strerror(ret));
420 return AVERROR(ret);
421 }
422
423 32784 task->thread_running = 1;
424 32784 return 0;
425 }
426
427 32798 static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx,
428 SchThreadFunc func, void *func_arg)
429 {
430 32798 task->parent = sch;
431
432 32798 task->node.type = type;
433 32798 task->node.idx = idx;
434
435 32798 task->func = func;
436 32798 task->func_arg = func_arg;
437 32798 }
438
439 502477 static int64_t trailing_dts(const Scheduler *sch, int count_finished)
440 {
441 502477 int64_t min_dts = INT64_MAX;
442
443
2/2
✓ Branch 0 taken 502734 times.
✓ Branch 1 taken 488638 times.
991372 for (unsigned i = 0; i < sch->nb_mux; i++) {
444 502734 const SchMux *mux = &sch->mux[i];
445
446
2/2
✓ Branch 0 taken 540044 times.
✓ Branch 1 taken 488895 times.
1028939 for (unsigned j = 0; j < mux->nb_streams; j++) {
447 540044 const SchMuxStream *ms = &mux->streams[j];
448
449
4/4
✓ Branch 0 taken 36733 times.
✓ Branch 1 taken 503311 times.
✓ Branch 2 taken 29658 times.
✓ Branch 3 taken 7075 times.
540044 if (ms->source_finished && !count_finished)
450 29658 continue;
451
2/2
✓ Branch 0 taken 13839 times.
✓ Branch 1 taken 496547 times.
510386 if (ms->last_dts == AV_NOPTS_VALUE)
452 13839 return AV_NOPTS_VALUE;
453
454 496547 min_dts = FFMIN(min_dts, ms->last_dts);
455 }
456 }
457
458
2/2
✓ Branch 0 taken 464103 times.
✓ Branch 1 taken 24535 times.
488638 return min_dts == INT64_MAX ? AV_NOPTS_VALUE : min_dts;
459 }
460
461 6760 void sch_free(Scheduler **psch)
462 {
463 6760 Scheduler *sch = *psch;
464
465
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6760 times.
6760 if (!sch)
466 return;
467
468 6760 sch_stop(sch, NULL);
469
470
2/2
✓ Branch 0 taken 6790 times.
✓ Branch 1 taken 6760 times.
13550 for (unsigned i = 0; i < sch->nb_demux; i++) {
471 6790 SchDemux *d = &sch->demux[i];
472
473
2/2
✓ Branch 0 taken 7006 times.
✓ Branch 1 taken 6790 times.
13796 for (unsigned j = 0; j < d->nb_streams; j++) {
474 7006 SchDemuxStream *ds = &d->streams[j];
475 7006 av_freep(&ds->dst);
476 7006 av_freep(&ds->dst_finished);
477 }
478 6790 av_freep(&d->streams);
479
480 6790 av_packet_free(&d->send_pkt);
481
482 6790 waiter_uninit(&d->waiter);
483 }
484 6760 av_freep(&sch->demux);
485
486
2/2
✓ Branch 0 taken 6761 times.
✓ Branch 1 taken 6760 times.
13521 for (unsigned i = 0; i < sch->nb_mux; i++) {
487 6761 SchMux *mux = &sch->mux[i];
488
489
2/2
✓ Branch 0 taken 7111 times.
✓ Branch 1 taken 6761 times.
13872 for (unsigned j = 0; j < mux->nb_streams; j++) {
490 7111 SchMuxStream *ms = &mux->streams[j];
491
492
1/2
✓ Branch 0 taken 7111 times.
✗ Branch 1 not taken.
7111 if (ms->pre_mux_queue.fifo) {
493 AVPacket *pkt;
494
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 7111 times.
7111 while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0)
495 av_packet_free(&pkt);
496 7111 av_fifo_freep2(&ms->pre_mux_queue.fifo);
497 }
498
499 7111 av_freep(&ms->sub_heartbeat_dst);
500 }
501 6761 av_freep(&mux->streams);
502
503 6761 av_packet_free(&mux->sub_heartbeat_pkt);
504
505 6761 tq_free(&mux->queue);
506 }
507 6760 av_freep(&sch->mux);
508
509
2/2
✓ Branch 0 taken 6391 times.
✓ Branch 1 taken 6760 times.
13151 for (unsigned i = 0; i < sch->nb_dec; i++) {
510 6391 SchDec *dec = &sch->dec[i];
511
512 6391 tq_free(&dec->queue);
513
514 6391 av_thread_message_queue_free(&dec->queue_end_ts);
515
516 6391 av_freep(&dec->dst);
517 6391 av_freep(&dec->dst_finished);
518
519 6391 av_frame_free(&dec->send_frame);
520 }
521 6760 av_freep(&sch->dec);
522
523
2/2
✓ Branch 0 taken 6472 times.
✓ Branch 1 taken 6760 times.
13232 for (unsigned i = 0; i < sch->nb_enc; i++) {
524 6472 SchEnc *enc = &sch->enc[i];
525
526 6472 tq_free(&enc->queue);
527
528 6472 av_packet_free(&enc->send_pkt);
529
530 6472 av_freep(&enc->dst);
531 6472 av_freep(&enc->dst_finished);
532 }
533 6760 av_freep(&sch->enc);
534
535
2/2
✓ Branch 0 taken 2776 times.
✓ Branch 1 taken 6760 times.
9536 for (unsigned i = 0; i < sch->nb_sq_enc; i++) {
536 2776 SchSyncQueue *sq = &sch->sq_enc[i];
537 2776 sq_free(&sq->sq);
538 2776 av_frame_free(&sq->frame);
539 2776 pthread_mutex_destroy(&sq->lock);
540 2776 av_freep(&sq->enc_idx);
541 }
542 6760 av_freep(&sch->sq_enc);
543
544
2/2
✓ Branch 0 taken 6384 times.
✓ Branch 1 taken 6760 times.
13144 for (unsigned i = 0; i < sch->nb_filters; i++) {
545 6384 SchFilterGraph *fg = &sch->filters[i];
546
547 6384 tq_free(&fg->queue);
548
549 6384 av_freep(&fg->inputs);
550 6384 av_freep(&fg->outputs);
551
552 6384 waiter_uninit(&fg->waiter);
553 }
554 6760 av_freep(&sch->filters);
555
556 6760 av_freep(&sch->sdp_filename);
557
558 6760 pthread_mutex_destroy(&sch->schedule_lock);
559
560 6760 pthread_mutex_destroy(&sch->mux_ready_lock);
561
562 6760 pthread_mutex_destroy(&sch->mux_done_lock);
563 6760 pthread_cond_destroy(&sch->mux_done_cond);
564
565 6760 av_freep(psch);
566 }
567
568 static const AVClass scheduler_class = {
569 .class_name = "Scheduler",
570 .version = LIBAVUTIL_VERSION_INT,
571 };
572
573 6760 Scheduler *sch_alloc(void)
574 {
575 Scheduler *sch;
576 int ret;
577
578 6760 sch = av_mallocz(sizeof(*sch));
579
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6760 times.
6760 if (!sch)
580 return NULL;
581
582 6760 sch->class = &scheduler_class;
583 6760 sch->sdp_auto = 1;
584
585 6760 ret = pthread_mutex_init(&sch->schedule_lock, NULL);
586
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6760 times.
6760 if (ret)
587 goto fail;
588
589 6760 ret = pthread_mutex_init(&sch->mux_ready_lock, NULL);
590
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6760 times.
6760 if (ret)
591 goto fail;
592
593 6760 ret = pthread_mutex_init(&sch->mux_done_lock, NULL);
594
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6760 times.
6760 if (ret)
595 goto fail;
596
597 6760 ret = pthread_cond_init(&sch->mux_done_cond, NULL);
598
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6760 times.
6760 if (ret)
599 goto fail;
600
601 6760 return sch;
602 fail:
603 sch_free(&sch);
604 return NULL;
605 }
606
607 int sch_sdp_filename(Scheduler *sch, const char *sdp_filename)
608 {
609 av_freep(&sch->sdp_filename);
610 sch->sdp_filename = av_strdup(sdp_filename);
611 return sch->sdp_filename ? 0 : AVERROR(ENOMEM);
612 }
613
614 static const AVClass sch_mux_class = {
615 .class_name = "SchMux",
616 .version = LIBAVUTIL_VERSION_INT,
617 .parent_log_context_offset = offsetof(SchMux, task.func_arg),
618 };
619
620 6761 int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
621 void *arg, int sdp_auto, unsigned thread_queue_size)
622 {
623 6761 const unsigned idx = sch->nb_mux;
624
625 SchMux *mux;
626 int ret;
627
628 6761 ret = GROW_ARRAY(sch->mux, sch->nb_mux);
629
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6761 times.
6761 if (ret < 0)
630 return ret;
631
632 6761 mux = &sch->mux[idx];
633 6761 mux->class = &sch_mux_class;
634 6761 mux->init = init;
635 6761 mux->queue_size = thread_queue_size;
636
637 6761 task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg);
638
639 6761 sch->sdp_auto &= sdp_auto;
640
641 6761 return idx;
642 }
643
644 7111 int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx)
645 {
646 SchMux *mux;
647 SchMuxStream *ms;
648 unsigned stream_idx;
649 int ret;
650
651
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7111 times.
7111 av_assert0(mux_idx < sch->nb_mux);
652 7111 mux = &sch->mux[mux_idx];
653
654 7111 ret = GROW_ARRAY(mux->streams, mux->nb_streams);
655
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7111 times.
7111 if (ret < 0)
656 return ret;
657 7111 stream_idx = mux->nb_streams - 1;
658
659 7111 ms = &mux->streams[stream_idx];
660
661 7111 ms->pre_mux_queue.fifo = av_fifo_alloc2(8, sizeof(AVPacket*), 0);
662
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7111 times.
7111 if (!ms->pre_mux_queue.fifo)
663 return AVERROR(ENOMEM);
664
665 7111 ms->last_dts = AV_NOPTS_VALUE;
666
667 7111 return stream_idx;
668 }
669
670 static const AVClass sch_demux_class = {
671 .class_name = "SchDemux",
672 .version = LIBAVUTIL_VERSION_INT,
673 .parent_log_context_offset = offsetof(SchDemux, task.func_arg),
674 };
675
676 6790 int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *ctx)
677 {
678 6790 const unsigned idx = sch->nb_demux;
679
680 SchDemux *d;
681 int ret;
682
683 6790 ret = GROW_ARRAY(sch->demux, sch->nb_demux);
684
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6790 times.
6790 if (ret < 0)
685 return ret;
686
687 6790 d = &sch->demux[idx];
688
689 6790 task_init(sch, &d->task, SCH_NODE_TYPE_DEMUX, idx, func, ctx);
690
691 6790 d->class = &sch_demux_class;
692 6790 d->send_pkt = av_packet_alloc();
693
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6790 times.
6790 if (!d->send_pkt)
694 return AVERROR(ENOMEM);
695
696 6790 ret = waiter_init(&d->waiter);
697
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6790 times.
6790 if (ret < 0)
698 return ret;
699
700 6790 return idx;
701 }
702
703 7006 int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
704 {
705 SchDemux *d;
706 int ret;
707
708
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7006 times.
7006 av_assert0(demux_idx < sch->nb_demux);
709 7006 d = &sch->demux[demux_idx];
710
711 7006 ret = GROW_ARRAY(d->streams, d->nb_streams);
712
1/2
✓ Branch 0 taken 7006 times.
✗ Branch 1 not taken.
7006 return ret < 0 ? ret : d->nb_streams - 1;
713 }
714
715 static const AVClass sch_dec_class = {
716 .class_name = "SchDec",
717 .version = LIBAVUTIL_VERSION_INT,
718 .parent_log_context_offset = offsetof(SchDec, task.func_arg),
719 };
720
721 6391 int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx,
722 int send_end_ts)
723 {
724 6391 const unsigned idx = sch->nb_dec;
725
726 SchDec *dec;
727 int ret;
728
729 6391 ret = GROW_ARRAY(sch->dec, sch->nb_dec);
730
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6391 times.
6391 if (ret < 0)
731 return ret;
732
733 6391 dec = &sch->dec[idx];
734
735 6391 task_init(sch, &dec->task, SCH_NODE_TYPE_DEC, idx, func, ctx);
736
737 6391 dec->class = &sch_dec_class;
738 6391 dec->send_frame = av_frame_alloc();
739
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6391 times.
6391 if (!dec->send_frame)
740 return AVERROR(ENOMEM);
741
742 6391 ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS);
743
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6391 times.
6391 if (ret < 0)
744 return ret;
745
746
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 6390 times.
6391 if (send_end_ts) {
747 1 ret = av_thread_message_queue_alloc(&dec->queue_end_ts, 1, sizeof(Timestamp));
748
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (ret < 0)
749 return ret;
750 }
751
752 6391 return idx;
753 }
754
755 static const AVClass sch_enc_class = {
756 .class_name = "SchEnc",
757 .version = LIBAVUTIL_VERSION_INT,
758 .parent_log_context_offset = offsetof(SchEnc, task.func_arg),
759 };
760
761 6472 int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx,
762 int (*open_cb)(void *opaque, const AVFrame *frame))
763 {
764 6472 const unsigned idx = sch->nb_enc;
765
766 SchEnc *enc;
767 int ret;
768
769 6472 ret = GROW_ARRAY(sch->enc, sch->nb_enc);
770
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6472 times.
6472 if (ret < 0)
771 return ret;
772
773 6472 enc = &sch->enc[idx];
774
775 6472 enc->class = &sch_enc_class;
776 6472 enc->open_cb = open_cb;
777 6472 enc->sq_idx[0] = -1;
778 6472 enc->sq_idx[1] = -1;
779
780 6472 task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx);
781
782 6472 enc->send_pkt = av_packet_alloc();
783
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6472 times.
6472 if (!enc->send_pkt)
784 return AVERROR(ENOMEM);
785
786 6472 ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES);
787
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6472 times.
6472 if (ret < 0)
788 return ret;
789
790 6472 return idx;
791 }
792
793 static const AVClass sch_fg_class = {
794 .class_name = "SchFilterGraph",
795 .version = LIBAVUTIL_VERSION_INT,
796 .parent_log_context_offset = offsetof(SchFilterGraph, task.func_arg),
797 };
798
799 6384 int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
800 SchThreadFunc func, void *ctx)
801 {
802 6384 const unsigned idx = sch->nb_filters;
803
804 SchFilterGraph *fg;
805 int ret;
806
807 6384 ret = GROW_ARRAY(sch->filters, sch->nb_filters);
808
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6384 times.
6384 if (ret < 0)
809 return ret;
810 6384 fg = &sch->filters[idx];
811
812 6384 fg->class = &sch_fg_class;
813
814 6384 task_init(sch, &fg->task, SCH_NODE_TYPE_FILTER_IN, idx, func, ctx);
815
816
2/2
✓ Branch 0 taken 6329 times.
✓ Branch 1 taken 55 times.
6384 if (nb_inputs) {
817 6329 fg->inputs = av_calloc(nb_inputs, sizeof(*fg->inputs));
818
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6329 times.
6329 if (!fg->inputs)
819 return AVERROR(ENOMEM);
820 6329 fg->nb_inputs = nb_inputs;
821 }
822
823
1/2
✓ Branch 0 taken 6384 times.
✗ Branch 1 not taken.
6384 if (nb_outputs) {
824 6384 fg->outputs = av_calloc(nb_outputs, sizeof(*fg->outputs));
825
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6384 times.
6384 if (!fg->outputs)
826 return AVERROR(ENOMEM);
827 6384 fg->nb_outputs = nb_outputs;
828 }
829
830 6384 ret = waiter_init(&fg->waiter);
831
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6384 times.
6384 if (ret < 0)
832 return ret;
833
834 6384 ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES);
835
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6384 times.
6384 if (ret < 0)
836 return ret;
837
838 6384 return idx;
839 }
840
841 2776 int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx)
842 {
843 SchSyncQueue *sq;
844 int ret;
845
846 2776 ret = GROW_ARRAY(sch->sq_enc, sch->nb_sq_enc);
847
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2776 times.
2776 if (ret < 0)
848 return ret;
849 2776 sq = &sch->sq_enc[sch->nb_sq_enc - 1];
850
851 2776 sq->sq = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, logctx);
852
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2776 times.
2776 if (!sq->sq)
853 return AVERROR(ENOMEM);
854
855 2776 sq->frame = av_frame_alloc();
856
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2776 times.
2776 if (!sq->frame)
857 return AVERROR(ENOMEM);
858
859 2776 ret = pthread_mutex_init(&sq->lock, NULL);
860
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2776 times.
2776 if (ret)
861 return AVERROR(ret);
862
863 2776 return sq - sch->sq_enc;
864 }
865
866 2821 int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx,
867 int limiting, uint64_t max_frames)
868 {
869 SchSyncQueue *sq;
870 SchEnc *enc;
871 int ret;
872
873
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2821 times.
2821 av_assert0(sq_idx < sch->nb_sq_enc);
874 2821 sq = &sch->sq_enc[sq_idx];
875
876
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2821 times.
2821 av_assert0(enc_idx < sch->nb_enc);
877 2821 enc = &sch->enc[enc_idx];
878
879 2821 ret = GROW_ARRAY(sq->enc_idx, sq->nb_enc_idx);
880
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2821 times.
2821 if (ret < 0)
881 return ret;
882 2821 sq->enc_idx[sq->nb_enc_idx - 1] = enc_idx;
883
884 2821 ret = sq_add_stream(sq->sq, limiting);
885
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2821 times.
2821 if (ret < 0)
886 return ret;
887
888 2821 enc->sq_idx[0] = sq_idx;
889 2821 enc->sq_idx[1] = ret;
890
891
2/2
✓ Branch 0 taken 2651 times.
✓ Branch 1 taken 170 times.
2821 if (max_frames != INT64_MAX)
892 2651 sq_limit_frames(sq->sq, enc->sq_idx[1], max_frames);
893
894 2821 return 0;
895 }
896
897 26373 int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst)
898 {
899 int ret;
900
901
4/5
✓ Branch 0 taken 7029 times.
✓ Branch 1 taken 6437 times.
✓ Branch 2 taken 6434 times.
✓ Branch 3 taken 6473 times.
✗ Branch 4 not taken.
26373 switch (src.type) {
902 7029 case SCH_NODE_TYPE_DEMUX: {
903 SchDemuxStream *ds;
904
905
2/4
✓ Branch 0 taken 7029 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 7029 times.
7029 av_assert0(src.idx < sch->nb_demux &&
906 src.idx_stream < sch->demux[src.idx].nb_streams);
907 7029 ds = &sch->demux[src.idx].streams[src.idx_stream];
908
909 7029 ret = GROW_ARRAY(ds->dst, ds->nb_dst);
910
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7029 times.
7029 if (ret < 0)
911 return ret;
912
913 7029 ds->dst[ds->nb_dst - 1] = dst;
914
915 // demuxed packets go to decoding or streamcopy
916
2/3
✓ Branch 0 taken 6390 times.
✓ Branch 1 taken 639 times.
✗ Branch 2 not taken.
7029 switch (dst.type) {
917 6390 case SCH_NODE_TYPE_DEC: {
918 SchDec *dec;
919
920
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6390 times.
6390 av_assert0(dst.idx < sch->nb_dec);
921 6390 dec = &sch->dec[dst.idx];
922
923
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6390 times.
6390 av_assert0(!dec->src.type);
924 6390 dec->src = src;
925 6390 break;
926 }
927 639 case SCH_NODE_TYPE_MUX: {
928 SchMuxStream *ms;
929
930
2/4
✓ Branch 0 taken 639 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 639 times.
639 av_assert0(dst.idx < sch->nb_mux &&
931 dst.idx_stream < sch->mux[dst.idx].nb_streams);
932 639 ms = &sch->mux[dst.idx].streams[dst.idx_stream];
933
934
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 639 times.
639 av_assert0(!ms->src.type);
935 639 ms->src = src;
936
937 639 break;
938 }
939 default: av_assert0(0);
940 }
941
942 7029 break;
943 }
944 6437 case SCH_NODE_TYPE_DEC: {
945 SchDec *dec;
946
947
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6437 times.
6437 av_assert0(src.idx < sch->nb_dec);
948 6437 dec = &sch->dec[src.idx];
949
950 6437 ret = GROW_ARRAY(dec->dst, dec->nb_dst);
951
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6437 times.
6437 if (ret < 0)
952 return ret;
953
954 6437 dec->dst[dec->nb_dst - 1] = dst;
955
956 // decoded frames go to filters or encoding
957
2/3
✓ Branch 0 taken 6399 times.
✓ Branch 1 taken 38 times.
✗ Branch 2 not taken.
6437 switch (dst.type) {
958 6399 case SCH_NODE_TYPE_FILTER_IN: {
959 SchFilterIn *fi;
960
961
2/4
✓ Branch 0 taken 6399 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6399 times.
6399 av_assert0(dst.idx < sch->nb_filters &&
962 dst.idx_stream < sch->filters[dst.idx].nb_inputs);
963 6399 fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
964
965
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6399 times.
6399 av_assert0(!fi->src.type);
966 6399 fi->src = src;
967 6399 break;
968 }
969 38 case SCH_NODE_TYPE_ENC: {
970 SchEnc *enc;
971
972
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 38 times.
38 av_assert0(dst.idx < sch->nb_enc);
973 38 enc = &sch->enc[dst.idx];
974
975
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 38 times.
38 av_assert0(!enc->src.type);
976 38 enc->src = src;
977 38 break;
978 }
979 default: av_assert0(0);
980 }
981
982 6437 break;
983 }
984 6434 case SCH_NODE_TYPE_FILTER_OUT: {
985 SchFilterOut *fo;
986
987
2/4
✓ Branch 0 taken 6434 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6434 times.
6434 av_assert0(src.idx < sch->nb_filters &&
988 src.idx_stream < sch->filters[src.idx].nb_outputs);
989 6434 fo = &sch->filters[src.idx].outputs[src.idx_stream];
990
991
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6434 times.
6434 av_assert0(!fo->dst.type);
992 6434 fo->dst = dst;
993
994 // filtered frames go to encoding or another filtergraph
995
1/3
✓ Branch 0 taken 6434 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
6434 switch (dst.type) {
996 6434 case SCH_NODE_TYPE_ENC: {
997 SchEnc *enc;
998
999
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6434 times.
6434 av_assert0(dst.idx < sch->nb_enc);
1000 6434 enc = &sch->enc[dst.idx];
1001
1002
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6434 times.
6434 av_assert0(!enc->src.type);
1003 6434 enc->src = src;
1004 6434 break;
1005 }
1006 case SCH_NODE_TYPE_FILTER_IN: {
1007 SchFilterIn *fi;
1008
1009 av_assert0(dst.idx < sch->nb_filters &&
1010 dst.idx_stream < sch->filters[dst.idx].nb_inputs);
1011 fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
1012
1013 av_assert0(!fi->src.type);
1014 fi->src = src;
1015 break;
1016 }
1017 default: av_assert0(0);
1018 }
1019
1020
1021 6434 break;
1022 }
1023 6473 case SCH_NODE_TYPE_ENC: {
1024 SchEnc *enc;
1025
1026
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6473 times.
6473 av_assert0(src.idx < sch->nb_enc);
1027 6473 enc = &sch->enc[src.idx];
1028
1029 6473 ret = GROW_ARRAY(enc->dst, enc->nb_dst);
1030
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6473 times.
6473 if (ret < 0)
1031 return ret;
1032
1033 6473 enc->dst[enc->nb_dst - 1] = dst;
1034
1035 // encoding packets go to muxing or decoding
1036
2/3
✓ Branch 0 taken 6472 times.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
6473 switch (dst.type) {
1037 6472 case SCH_NODE_TYPE_MUX: {
1038 SchMuxStream *ms;
1039
1040
2/4
✓ Branch 0 taken 6472 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6472 times.
6472 av_assert0(dst.idx < sch->nb_mux &&
1041 dst.idx_stream < sch->mux[dst.idx].nb_streams);
1042 6472 ms = &sch->mux[dst.idx].streams[dst.idx_stream];
1043
1044
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6472 times.
6472 av_assert0(!ms->src.type);
1045 6472 ms->src = src;
1046
1047 6472 break;
1048 }
1049 1 case SCH_NODE_TYPE_DEC: {
1050 SchDec *dec;
1051
1052
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(dst.idx < sch->nb_dec);
1053 1 dec = &sch->dec[dst.idx];
1054
1055
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(!dec->src.type);
1056 1 dec->src = src;
1057
1058 1 break;
1059 }
1060 default: av_assert0(0);
1061 }
1062
1063 6473 break;
1064 }
1065 default: av_assert0(0);
1066 }
1067
1068 26373 return 0;
1069 }
1070
1071 6761 static int mux_task_start(SchMux *mux)
1072 {
1073 6761 int ret = 0;
1074
1075 6761 ret = task_start(&mux->task);
1076
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6761 times.
6761 if (ret < 0)
1077 return ret;
1078
1079 /* flush the pre-muxing queues */
1080
2/2
✓ Branch 0 taken 7111 times.
✓ Branch 1 taken 6761 times.
13872 for (unsigned i = 0; i < mux->nb_streams; i++) {
1081 7111 SchMuxStream *ms = &mux->streams[i];
1082 AVPacket *pkt;
1083
1084
2/2
✓ Branch 1 taken 3150 times.
✓ Branch 2 taken 7111 times.
10261 while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0) {
1085
2/2
✓ Branch 0 taken 3098 times.
✓ Branch 1 taken 52 times.
3150 if (pkt) {
1086
2/2
✓ Branch 0 taken 3081 times.
✓ Branch 1 taken 17 times.
3098 if (!ms->init_eof)
1087 3081 ret = tq_send(mux->queue, i, pkt);
1088 3098 av_packet_free(&pkt);
1089
2/2
✓ Branch 0 taken 18 times.
✓ Branch 1 taken 3080 times.
3098 if (ret == AVERROR_EOF)
1090 18 ms->init_eof = 1;
1091
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3080 times.
3080 else if (ret < 0)
1092 return ret;
1093 } else
1094 52 tq_send_finish(mux->queue, i);
1095 }
1096 }
1097
1098 6761 atomic_store(&mux->mux_started, 1);
1099
1100 6761 return 0;
1101 }
1102
1103 int print_sdp(const char *filename);
1104
1105 6761 static int mux_init(Scheduler *sch, SchMux *mux)
1106 {
1107 int ret;
1108
1109 6761 ret = mux->init(mux->task.func_arg);
1110
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6761 times.
6761 if (ret < 0)
1111 return ret;
1112
1113 6761 sch->nb_mux_ready++;
1114
1115
2/4
✓ Branch 0 taken 6761 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6761 times.
6761 if (sch->sdp_filename || sch->sdp_auto) {
1116 if (sch->nb_mux_ready < sch->nb_mux)
1117 return 0;
1118
1119 ret = print_sdp(sch->sdp_filename);
1120 if (ret < 0) {
1121 av_log(sch, AV_LOG_ERROR, "Error writing the SDP.\n");
1122 return ret;
1123 }
1124
1125 /* SDP is written only after all the muxers are ready, so now we
1126 * start ALL the threads */
1127 for (unsigned i = 0; i < sch->nb_mux; i++) {
1128 ret = mux_task_start(&sch->mux[i]);
1129 if (ret < 0)
1130 return ret;
1131 }
1132 } else {
1133 6761 ret = mux_task_start(mux);
1134
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6761 times.
6761 if (ret < 0)
1135 return ret;
1136 }
1137
1138 6761 return 0;
1139 }
1140
1141 7111 void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1142 size_t data_threshold, int max_packets)
1143 {
1144 SchMux *mux;
1145 SchMuxStream *ms;
1146
1147
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7111 times.
7111 av_assert0(mux_idx < sch->nb_mux);
1148 7111 mux = &sch->mux[mux_idx];
1149
1150
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7111 times.
7111 av_assert0(stream_idx < mux->nb_streams);
1151 7111 ms = &mux->streams[stream_idx];
1152
1153 7111 ms->pre_mux_queue.max_packets = max_packets;
1154 7111 ms->pre_mux_queue.data_threshold = data_threshold;
1155 7111 }
1156
1157 7111 int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
1158 {
1159 SchMux *mux;
1160 7111 int ret = 0;
1161
1162
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7111 times.
7111 av_assert0(mux_idx < sch->nb_mux);
1163 7111 mux = &sch->mux[mux_idx];
1164
1165
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7111 times.
7111 av_assert0(stream_idx < mux->nb_streams);
1166
1167 7111 pthread_mutex_lock(&sch->mux_ready_lock);
1168
1169
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7111 times.
7111 av_assert0(mux->nb_streams_ready < mux->nb_streams);
1170
1171 // this may be called during initialization - do not start
1172 // threads before sch_start() is called
1173
2/2
✓ Branch 0 taken 6761 times.
✓ Branch 1 taken 350 times.
7111 if (++mux->nb_streams_ready == mux->nb_streams &&
1174
2/2
✓ Branch 0 taken 6319 times.
✓ Branch 1 taken 442 times.
6761 sch->state >= SCH_STATE_STARTED)
1175 6319 ret = mux_init(sch, mux);
1176
1177 7111 pthread_mutex_unlock(&sch->mux_ready_lock);
1178
1179 7111 return ret;
1180 }
1181
1182 1 int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1183 unsigned dec_idx)
1184 {
1185 SchMux *mux;
1186 SchMuxStream *ms;
1187 1 int ret = 0;
1188
1189
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(mux_idx < sch->nb_mux);
1190 1 mux = &sch->mux[mux_idx];
1191
1192
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(stream_idx < mux->nb_streams);
1193 1 ms = &mux->streams[stream_idx];
1194
1195 1 ret = GROW_ARRAY(ms->sub_heartbeat_dst, ms->nb_sub_heartbeat_dst);
1196
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (ret < 0)
1197 return ret;
1198
1199
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(dec_idx < sch->nb_dec);
1200 1 ms->sub_heartbeat_dst[ms->nb_sub_heartbeat_dst - 1] = dec_idx;
1201
1202
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (!mux->sub_heartbeat_pkt) {
1203 1 mux->sub_heartbeat_pkt = av_packet_alloc();
1204
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (!mux->sub_heartbeat_pkt)
1205 return AVERROR(ENOMEM);
1206 }
1207
1208 1 return 0;
1209 }
1210
1211 488969 static void unchoke_for_stream(Scheduler *sch, SchedulerNode src)
1212 {
1213 421295 while (1) {
1214 SchFilterGraph *fg;
1215
1216 // fed directly by a demuxer (i.e. not through a filtergraph)
1217
2/2
✓ Branch 0 taken 486949 times.
✓ Branch 1 taken 423315 times.
910264 if (src.type == SCH_NODE_TYPE_DEMUX) {
1218 486949 sch->demux[src.idx].waiter.choked_next = 0;
1219 486949 return;
1220 }
1221
1222
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 423315 times.
423315 av_assert0(src.type == SCH_NODE_TYPE_FILTER_OUT);
1223 423315 fg = &sch->filters[src.idx];
1224
1225 // the filtergraph contains internal sources and
1226 // requested to be scheduled directly
1227
2/2
✓ Branch 0 taken 2020 times.
✓ Branch 1 taken 421295 times.
423315 if (fg->best_input == fg->nb_inputs) {
1228 2020 fg->waiter.choked_next = 0;
1229 2020 return;
1230 }
1231
1232 421295 src = fg->inputs[fg->best_input].src_sched;
1233 }
1234 }
1235
1236 498606 static void schedule_update_locked(Scheduler *sch)
1237 {
1238 int64_t dts;
1239 498606 int have_unchoked = 0;
1240
1241 // on termination request all waiters are choked,
1242 // we are not to unchoke them
1243
2/2
✓ Branch 0 taken 2888 times.
✓ Branch 1 taken 495718 times.
498606 if (atomic_load(&sch->terminate))
1244 2888 return;
1245
1246 495718 dts = trailing_dts(sch, 0);
1247
1248 495718 atomic_store(&sch->last_dts, dts);
1249
1250 // initialize our internal state
1251
2/2
✓ Branch 0 taken 991436 times.
✓ Branch 1 taken 495718 times.
1487154 for (unsigned type = 0; type < 2; type++)
1252
4/4
✓ Branch 0 taken 940771 times.
✓ Branch 1 taken 994831 times.
✓ Branch 2 taken 944166 times.
✓ Branch 3 taken 991436 times.
1935602 for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1253
2/2
✓ Branch 0 taken 445053 times.
✓ Branch 1 taken 499113 times.
944166 SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1254 944166 w->choked_prev = atomic_load(&w->choked);
1255 944166 w->choked_next = 1;
1256 }
1257
1258 // figure out the sources that are allowed to proceed
1259
2/2
✓ Branch 0 taken 495984 times.
✓ Branch 1 taken 495718 times.
991702 for (unsigned i = 0; i < sch->nb_mux; i++) {
1260 495984 SchMux *mux = &sch->mux[i];
1261
1262
2/2
✓ Branch 0 taken 538499 times.
✓ Branch 1 taken 495984 times.
1034483 for (unsigned j = 0; j < mux->nb_streams; j++) {
1263 538499 SchMuxStream *ms = &mux->streams[j];
1264
1265 // unblock sources for output streams that are not finished
1266 // and not too far ahead of the trailing stream
1267
2/2
✓ Branch 0 taken 29872 times.
✓ Branch 1 taken 508627 times.
538499 if (ms->source_finished)
1268 29872 continue;
1269
4/4
✓ Branch 0 taken 21692 times.
✓ Branch 1 taken 486935 times.
✓ Branch 2 taken 5223 times.
✓ Branch 3 taken 16469 times.
508627 if (dts == AV_NOPTS_VALUE && ms->last_dts != AV_NOPTS_VALUE)
1270 5223 continue;
1271
4/4
✓ Branch 0 taken 486935 times.
✓ Branch 1 taken 16469 times.
✓ Branch 2 taken 14435 times.
✓ Branch 3 taken 472500 times.
503404 if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE)
1272 14435 continue;
1273
1274 // resolve the source to unchoke
1275 488969 unchoke_for_stream(sch, ms->src_sched);
1276 488969 have_unchoked = 1;
1277 }
1278 }
1279
1280 // make sure to unchoke at least one source, if still available
1281
4/4
✓ Branch 0 taken 55327 times.
✓ Branch 1 taken 482527 times.
✓ Branch 2 taken 42136 times.
✓ Branch 3 taken 13191 times.
537854 for (unsigned type = 0; !have_unchoked && type < 2; type++)
1282
4/4
✓ Branch 0 taken 30284 times.
✓ Branch 1 taken 42352 times.
✓ Branch 2 taken 41844 times.
✓ Branch 3 taken 30792 times.
72636 for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1283
2/2
✓ Branch 0 taken 17093 times.
✓ Branch 1 taken 24751 times.
41844 int exited = type ? sch->filters[i].task_exited : sch->demux[i].task_exited;
1284
2/2
✓ Branch 0 taken 17093 times.
✓ Branch 1 taken 24751 times.
41844 SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1285
2/2
✓ Branch 0 taken 11344 times.
✓ Branch 1 taken 30500 times.
41844 if (!exited) {
1286 11344 w->choked_next = 0;
1287 11344 have_unchoked = 1;
1288 11344 break;
1289 }
1290 }
1291
1292
1293
2/2
✓ Branch 0 taken 991436 times.
✓ Branch 1 taken 495718 times.
1487154 for (unsigned type = 0; type < 2; type++)
1294
4/4
✓ Branch 0 taken 940771 times.
✓ Branch 1 taken 994831 times.
✓ Branch 2 taken 944166 times.
✓ Branch 3 taken 991436 times.
1935602 for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1295
2/2
✓ Branch 0 taken 445053 times.
✓ Branch 1 taken 499113 times.
944166 SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1296
2/2
✓ Branch 0 taken 17444 times.
✓ Branch 1 taken 926722 times.
944166 if (w->choked_prev != w->choked_next)
1297 17444 waiter_set(w, w->choked_next);
1298 }
1299
1300 }
1301
/* Per-filtergraph visitation state used by the cycle check. */
enum {
    CYCLE_NODE_NEW     = 0, // not reached yet (matches zero-initialized memory)
    CYCLE_NODE_STARTED = 1, // reached, its inputs are still being explored
    CYCLE_NODE_DONE    = 2, // all of its inputs have been explored
};
1307
1308 static int
1309 6384 check_acyclic_for_output(const Scheduler *sch, SchedulerNode src,
1310 uint8_t *filters_visited, SchedulerNode *filters_stack)
1311 {
1312 6384 unsigned nb_filters_stack = 0;
1313
1314 6384 memset(filters_visited, 0, sch->nb_filters * sizeof(*filters_visited));
1315
1316 6401 while (1) {
1317 12785 const SchFilterGraph *fg = &sch->filters[src.idx];
1318
1319 12785 filters_visited[src.idx] = CYCLE_NODE_STARTED;
1320
1321 // descend into every input, depth first
1322
2/2
✓ Branch 0 taken 6400 times.
✓ Branch 1 taken 6385 times.
12785 if (src.idx_stream < fg->nb_inputs) {
1323 6400 const SchFilterIn *fi = &fg->inputs[src.idx_stream++];
1324
1325 // connected to demuxer, no cycles possible
1326
2/2
✓ Branch 0 taken 6399 times.
✓ Branch 1 taken 1 times.
6400 if (fi->src_sched.type == SCH_NODE_TYPE_DEMUX)
1327 6400 continue;
1328
1329 // otherwise connected to another filtergraph
1330
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(fi->src_sched.type == SCH_NODE_TYPE_FILTER_OUT);
1331
1332 // found a cycle
1333
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (filters_visited[fi->src_sched.idx] == CYCLE_NODE_STARTED)
1334 return AVERROR(EINVAL);
1335
1336 // place current position on stack and descend
1337
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(nb_filters_stack < sch->nb_filters);
1338 1 filters_stack[nb_filters_stack++] = src;
1339 1 src = (SchedulerNode){ .idx = fi->src_sched.idx, .idx_stream = 0 };
1340 1 continue;
1341 }
1342
1343 6385 filters_visited[src.idx] = CYCLE_NODE_DONE;
1344
1345 // previous search finished,
1346
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 6384 times.
6385 if (nb_filters_stack) {
1347 1 src = filters_stack[--nb_filters_stack];
1348 1 continue;
1349 }
1350 6384 return 0;
1351 }
1352 }
1353
1354 6759 static int check_acyclic(Scheduler *sch)
1355 {
1356 6759 uint8_t *filters_visited = NULL;
1357 6759 SchedulerNode *filters_stack = NULL;
1358
1359 6759 int ret = 0;
1360
1361
2/2
✓ Branch 0 taken 477 times.
✓ Branch 1 taken 6282 times.
6759 if (!sch->nb_filters)
1362 477 return 0;
1363
1364 6282 filters_visited = av_malloc_array(sch->nb_filters, sizeof(*filters_visited));
1365
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6282 times.
6282 if (!filters_visited)
1366 return AVERROR(ENOMEM);
1367
1368 6282 filters_stack = av_malloc_array(sch->nb_filters, sizeof(*filters_stack));
1369
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6282 times.
6282 if (!filters_stack) {
1370 ret = AVERROR(ENOMEM);
1371 goto fail;
1372 }
1373
1374 // trace the transcoding graph upstream from every filtegraph
1375
2/2
✓ Branch 0 taken 6384 times.
✓ Branch 1 taken 6282 times.
12666 for (unsigned i = 0; i < sch->nb_filters; i++) {
1376 6384 ret = check_acyclic_for_output(sch, (SchedulerNode){ .idx = i },
1377 filters_visited, filters_stack);
1378
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6384 times.
6384 if (ret < 0) {
1379 av_log(&sch->filters[i], AV_LOG_ERROR, "Transcoding graph has a cycle\n");
1380 goto fail;
1381 }
1382 }
1383
1384 6282 fail:
1385 6282 av_freep(&filters_visited);
1386 6282 av_freep(&filters_stack);
1387 6282 return ret;
1388 }
1389
1390 6759 static int start_prepare(Scheduler *sch)
1391 {
1392 int ret;
1393
1394
2/2
✓ Branch 0 taken 6790 times.
✓ Branch 1 taken 6759 times.
13549 for (unsigned i = 0; i < sch->nb_demux; i++) {
1395 6790 SchDemux *d = &sch->demux[i];
1396
1397
2/2
✓ Branch 0 taken 7006 times.
✓ Branch 1 taken 6790 times.
13796 for (unsigned j = 0; j < d->nb_streams; j++) {
1398 7006 SchDemuxStream *ds = &d->streams[j];
1399
1400
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7006 times.
7006 if (!ds->nb_dst) {
1401 av_log(d, AV_LOG_ERROR,
1402 "Demuxer stream %u not connected to any sink\n", j);
1403 return AVERROR(EINVAL);
1404 }
1405
1406 7006 ds->dst_finished = av_calloc(ds->nb_dst, sizeof(*ds->dst_finished));
1407
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7006 times.
7006 if (!ds->dst_finished)
1408 return AVERROR(ENOMEM);
1409 }
1410 }
1411
1412
2/2
✓ Branch 0 taken 6391 times.
✓ Branch 1 taken 6759 times.
13150 for (unsigned i = 0; i < sch->nb_dec; i++) {
1413 6391 SchDec *dec = &sch->dec[i];
1414
1415
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6391 times.
6391 if (!dec->src.type) {
1416 av_log(dec, AV_LOG_ERROR,
1417 "Decoder not connected to a source\n");
1418 return AVERROR(EINVAL);
1419 }
1420
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6391 times.
6391 if (!dec->nb_dst) {
1421 av_log(dec, AV_LOG_ERROR,
1422 "Decoder not connected to any sink\n");
1423 return AVERROR(EINVAL);
1424 }
1425
1426 6391 dec->dst_finished = av_calloc(dec->nb_dst, sizeof(*dec->dst_finished));
1427
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6391 times.
6391 if (!dec->dst_finished)
1428 return AVERROR(ENOMEM);
1429 }
1430
1431
2/2
✓ Branch 0 taken 6472 times.
✓ Branch 1 taken 6759 times.
13231 for (unsigned i = 0; i < sch->nb_enc; i++) {
1432 6472 SchEnc *enc = &sch->enc[i];
1433
1434
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6472 times.
6472 if (!enc->src.type) {
1435 av_log(enc, AV_LOG_ERROR,
1436 "Encoder not connected to a source\n");
1437 return AVERROR(EINVAL);
1438 }
1439
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6472 times.
6472 if (!enc->nb_dst) {
1440 av_log(enc, AV_LOG_ERROR,
1441 "Encoder not connected to any sink\n");
1442 return AVERROR(EINVAL);
1443 }
1444
1445 6472 enc->dst_finished = av_calloc(enc->nb_dst, sizeof(*enc->dst_finished));
1446
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6472 times.
6472 if (!enc->dst_finished)
1447 return AVERROR(ENOMEM);
1448 }
1449
1450
2/2
✓ Branch 0 taken 6761 times.
✓ Branch 1 taken 6759 times.
13520 for (unsigned i = 0; i < sch->nb_mux; i++) {
1451 6761 SchMux *mux = &sch->mux[i];
1452
1453
2/2
✓ Branch 0 taken 7111 times.
✓ Branch 1 taken 6761 times.
13872 for (unsigned j = 0; j < mux->nb_streams; j++) {
1454 7111 SchMuxStream *ms = &mux->streams[j];
1455
1456
2/3
✓ Branch 0 taken 6472 times.
✓ Branch 1 taken 639 times.
✗ Branch 2 not taken.
7111 switch (ms->src.type) {
1457 6472 case SCH_NODE_TYPE_ENC: {
1458 6472 SchEnc *enc = &sch->enc[ms->src.idx];
1459
2/2
✓ Branch 0 taken 38 times.
✓ Branch 1 taken 6434 times.
6472 if (enc->src.type == SCH_NODE_TYPE_DEC) {
1460 38 ms->src_sched = sch->dec[enc->src.idx].src;
1461
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 38 times.
38 av_assert0(ms->src_sched.type == SCH_NODE_TYPE_DEMUX);
1462 } else {
1463 6434 ms->src_sched = enc->src;
1464
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6434 times.
6434 av_assert0(ms->src_sched.type == SCH_NODE_TYPE_FILTER_OUT);
1465 }
1466 6472 break;
1467 }
1468 639 case SCH_NODE_TYPE_DEMUX:
1469 639 ms->src_sched = ms->src;
1470 639 break;
1471 default:
1472 av_log(mux, AV_LOG_ERROR,
1473 "Muxer stream #%u not connected to a source\n", j);
1474 return AVERROR(EINVAL);
1475 }
1476 }
1477
1478 6761 ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size,
1479 QUEUE_PACKETS);
1480
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6761 times.
6761 if (ret < 0)
1481 return ret;
1482 }
1483
1484
2/2
✓ Branch 0 taken 6384 times.
✓ Branch 1 taken 6759 times.
13143 for (unsigned i = 0; i < sch->nb_filters; i++) {
1485 6384 SchFilterGraph *fg = &sch->filters[i];
1486
1487
2/2
✓ Branch 0 taken 6399 times.
✓ Branch 1 taken 6384 times.
12783 for (unsigned j = 0; j < fg->nb_inputs; j++) {
1488 6399 SchFilterIn *fi = &fg->inputs[j];
1489 SchDec *dec;
1490
1491
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6399 times.
6399 if (!fi->src.type) {
1492 av_log(fg, AV_LOG_ERROR,
1493 "Filtergraph input %u not connected to a source\n", j);
1494 return AVERROR(EINVAL);
1495 }
1496
1497
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6399 times.
6399 if (fi->src.type == SCH_NODE_TYPE_FILTER_OUT)
1498 fi->src_sched = fi->src;
1499 else {
1500
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6399 times.
6399 av_assert0(fi->src.type == SCH_NODE_TYPE_DEC);
1501 6399 dec = &sch->dec[fi->src.idx];
1502
1503
2/3
✓ Branch 0 taken 6398 times.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
6399 switch (dec->src.type) {
1504 6398 case SCH_NODE_TYPE_DEMUX: fi->src_sched = dec->src; break;
1505 1 case SCH_NODE_TYPE_ENC: fi->src_sched = sch->enc[dec->src.idx].src; break;
1506 default: av_assert0(0);
1507 }
1508 }
1509 }
1510
1511
2/2
✓ Branch 0 taken 6434 times.
✓ Branch 1 taken 6384 times.
12818 for (unsigned j = 0; j < fg->nb_outputs; j++) {
1512 6434 SchFilterOut *fo = &fg->outputs[j];
1513
1514
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6434 times.
6434 if (!fo->dst.type) {
1515 av_log(fg, AV_LOG_ERROR,
1516 "Filtergraph %u output %u not connected to a sink\n", i, j);
1517 return AVERROR(EINVAL);
1518 }
1519 }
1520 }
1521
1522 // Check that the transcoding graph has no cycles.
1523 6759 ret = check_acyclic(sch);
1524
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6759 times.
6759 if (ret < 0)
1525 return ret;
1526
1527 6759 return 0;
1528 }
1529
1530 6759 int sch_start(Scheduler *sch)
1531 {
1532 int ret;
1533
1534 6759 ret = start_prepare(sch);
1535
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6759 times.
6759 if (ret < 0)
1536 return ret;
1537
1538
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6759 times.
6759 av_assert0(sch->state == SCH_STATE_UNINIT);
1539 6759 sch->state = SCH_STATE_STARTED;
1540
1541
2/2
✓ Branch 0 taken 6761 times.
✓ Branch 1 taken 6759 times.
13520 for (unsigned i = 0; i < sch->nb_mux; i++) {
1542 6761 SchMux *mux = &sch->mux[i];
1543
1544
2/2
✓ Branch 0 taken 442 times.
✓ Branch 1 taken 6319 times.
6761 if (mux->nb_streams_ready == mux->nb_streams) {
1545 442 ret = mux_init(sch, mux);
1546
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 442 times.
442 if (ret < 0)
1547 goto fail;
1548 }
1549 }
1550
1551
2/2
✓ Branch 0 taken 6472 times.
✓ Branch 1 taken 6759 times.
13231 for (unsigned i = 0; i < sch->nb_enc; i++) {
1552 6472 SchEnc *enc = &sch->enc[i];
1553
1554 6472 ret = task_start(&enc->task);
1555
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6472 times.
6472 if (ret < 0)
1556 goto fail;
1557 }
1558
1559
2/2
✓ Branch 0 taken 6384 times.
✓ Branch 1 taken 6759 times.
13143 for (unsigned i = 0; i < sch->nb_filters; i++) {
1560 6384 SchFilterGraph *fg = &sch->filters[i];
1561
1562 6384 ret = task_start(&fg->task);
1563
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6384 times.
6384 if (ret < 0)
1564 goto fail;
1565 }
1566
1567
2/2
✓ Branch 0 taken 6391 times.
✓ Branch 1 taken 6759 times.
13150 for (unsigned i = 0; i < sch->nb_dec; i++) {
1568 6391 SchDec *dec = &sch->dec[i];
1569
1570 6391 ret = task_start(&dec->task);
1571
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6391 times.
6391 if (ret < 0)
1572 goto fail;
1573 }
1574
1575
2/2
✓ Branch 0 taken 6790 times.
✓ Branch 1 taken 6759 times.
13549 for (unsigned i = 0; i < sch->nb_demux; i++) {
1576 6790 SchDemux *d = &sch->demux[i];
1577
1578
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 6776 times.
6790 if (!d->nb_streams)
1579 14 continue;
1580
1581 6776 ret = task_start(&d->task);
1582
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6776 times.
6776 if (ret < 0)
1583 goto fail;
1584 }
1585
1586 6759 pthread_mutex_lock(&sch->schedule_lock);
1587 6759 schedule_update_locked(sch);
1588 6759 pthread_mutex_unlock(&sch->schedule_lock);
1589
1590 6759 return 0;
1591 fail:
1592 sch_stop(sch, NULL);
1593 return ret;
1594 }
1595
1596 20920 int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
1597 {
1598 int ret, err;
1599
1600 // convert delay to absolute timestamp
1601 20920 timeout_us += av_gettime();
1602
1603 20920 pthread_mutex_lock(&sch->mux_done_lock);
1604
1605
1/2
✓ Branch 0 taken 20920 times.
✗ Branch 1 not taken.
20920 if (sch->nb_mux_done < sch->nb_mux) {
1606 20920 struct timespec tv = { .tv_sec = timeout_us / 1000000,
1607 20920 .tv_nsec = (timeout_us % 1000000) * 1000 };
1608 20920 pthread_cond_timedwait(&sch->mux_done_cond, &sch->mux_done_lock, &tv);
1609 }
1610
1611 20920 ret = sch->nb_mux_done == sch->nb_mux;
1612
1613 20920 pthread_mutex_unlock(&sch->mux_done_lock);
1614
1615 20920 *transcode_ts = atomic_load(&sch->last_dts);
1616
1617 // abort transcoding if any task failed
1618 20920 err = atomic_load(&sch->task_failed);
1619
1620
3/4
✓ Branch 0 taken 14161 times.
✓ Branch 1 taken 6759 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 14161 times.
20920 return ret || err;
1621 }
1622
1623 6434 static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame)
1624 {
1625 int ret;
1626
1627 6434 ret = enc->open_cb(enc->task.func_arg, frame);
1628
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6434 times.
6434 if (ret < 0)
1629 return ret;
1630
1631 // ret>0 signals audio frame size, which means sync queue must
1632 // have been enabled during encoder creation
1633
2/2
✓ Branch 0 taken 156 times.
✓ Branch 1 taken 6278 times.
6434 if (ret > 0) {
1634 SchSyncQueue *sq;
1635
1636
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 156 times.
156 av_assert0(enc->sq_idx[0] >= 0);
1637 156 sq = &sch->sq_enc[enc->sq_idx[0]];
1638
1639 156 pthread_mutex_lock(&sq->lock);
1640
1641 156 sq_frame_samples(sq->sq, enc->sq_idx[1], ret);
1642
1643 156 pthread_mutex_unlock(&sq->lock);
1644 }
1645
1646 6434 return 0;
1647 }
1648
1649 421912 static int send_to_enc_thread(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1650 {
1651 int ret;
1652
1653
2/2
✓ Branch 0 taken 13076 times.
✓ Branch 1 taken 408836 times.
421912 if (!frame) {
1654 13076 tq_send_finish(enc->queue, 0);
1655 13076 return 0;
1656 }
1657
1658
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 408836 times.
408836 if (enc->in_finished)
1659 return AVERROR_EOF;
1660
1661 408836 ret = tq_send(enc->queue, 0, frame);
1662
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 408835 times.
408836 if (ret < 0)
1663 1 enc->in_finished = 1;
1664
1665 408836 return ret;
1666 }
1667
1668 33978 static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1669 {
1670 33978 SchSyncQueue *sq = &sch->sq_enc[enc->sq_idx[0]];
1671 33978 int ret = 0;
1672
1673 // inform the scheduling code that no more input will arrive along this path;
1674 // this is necessary because the sync queue may not send an EOF downstream
1675 // until other streams finish
1676 // TODO: consider a cleaner way of passing this information through
1677 // the pipeline
1678
2/2
✓ Branch 0 taken 2991 times.
✓ Branch 1 taken 30987 times.
33978 if (!frame) {
1679
2/2
✓ Branch 0 taken 2991 times.
✓ Branch 1 taken 2991 times.
5982 for (unsigned i = 0; i < enc->nb_dst; i++) {
1680 SchMux *mux;
1681 SchMuxStream *ms;
1682
1683
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2991 times.
2991 if (enc->dst[i].type != SCH_NODE_TYPE_MUX)
1684 continue;
1685
1686 2991 mux = &sch->mux[enc->dst[i].idx];
1687 2991 ms = &mux->streams[enc->dst[i].idx_stream];
1688
1689 2991 pthread_mutex_lock(&sch->schedule_lock);
1690
1691 2991 ms->source_finished = 1;
1692 2991 schedule_update_locked(sch);
1693
1694 2991 pthread_mutex_unlock(&sch->schedule_lock);
1695 }
1696 }
1697
1698 33978 pthread_mutex_lock(&sq->lock);
1699
1700 33978 ret = sq_send(sq->sq, enc->sq_idx[1], SQFRAME(frame));
1701
2/2
✓ Branch 0 taken 33977 times.
✓ Branch 1 taken 1 times.
33978 if (ret < 0)
1702 1 goto finish;
1703
1704 81839 while (1) {
1705 SchEnc *enc;
1706
1707 // TODO: the SQ API should be extended to allow returning EOF
1708 // for individual streams
1709 115816 ret = sq_receive(sq->sq, -1, SQFRAME(sq->frame));
1710
2/2
✓ Branch 0 taken 33977 times.
✓ Branch 1 taken 81839 times.
115816 if (ret < 0) {
1711
2/2
✓ Branch 0 taken 5581 times.
✓ Branch 1 taken 28396 times.
33977 ret = (ret == AVERROR(EAGAIN)) ? 0 : ret;
1712 33977 break;
1713 }
1714
1715 81839 enc = &sch->enc[sq->enc_idx[ret]];
1716 81839 ret = send_to_enc_thread(sch, enc, sq->frame);
1717
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 81839 times.
81839 if (ret < 0) {
1718 av_frame_unref(sq->frame);
1719 if (ret != AVERROR_EOF)
1720 break;
1721
1722 sq_send(sq->sq, enc->sq_idx[1], SQFRAME(NULL));
1723 continue;
1724 }
1725 }
1726
1727
2/2
✓ Branch 0 taken 28396 times.
✓ Branch 1 taken 5581 times.
33977 if (ret < 0) {
1728 // close all encoders fed from this sync queue
1729
2/2
✓ Branch 0 taken 5813 times.
✓ Branch 1 taken 5581 times.
11394 for (unsigned i = 0; i < sq->nb_enc_idx; i++) {
1730 5813 int err = send_to_enc_thread(sch, &sch->enc[sq->enc_idx[i]], NULL);
1731
1732 // if the sync queue error is EOF and closing the encoder
1733 // produces a more serious error, make sure to pick the latter
1734
2/4
✓ Branch 0 taken 5813 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 5813 times.
✗ Branch 3 not taken.
5813 ret = err_merge((ret == AVERROR_EOF && err < 0) ? 0 : ret, err);
1735 }
1736 }
1737
1738 33977 finish:
1739 33978 pthread_mutex_unlock(&sq->lock);
1740
1741 33978 return ret;
1742 }
1743
1744 368241 static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1745 {
1746
6/6
✓ Branch 0 taken 367324 times.
✓ Branch 1 taken 917 times.
✓ Branch 2 taken 357108 times.
✓ Branch 3 taken 10216 times.
✓ Branch 4 taken 6434 times.
✓ Branch 5 taken 350674 times.
368241 if (enc->open_cb && frame && !enc->opened) {
1747 6434 int ret = enc_open(sch, enc, frame);
1748
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6434 times.
6434 if (ret < 0)
1749 return ret;
1750 6434 enc->opened = 1;
1751
1752 // discard empty frames that only carry encoder init parameters
1753
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 6431 times.
6434 if (!frame->buf[0]) {
1754 3 av_frame_unref(frame);
1755 3 return 0;
1756 }
1757 }
1758
1759 368238 return (enc->sq_idx[0] >= 0) ?
1760
2/2
✓ Branch 0 taken 33978 times.
✓ Branch 1 taken 334260 times.
702498 send_to_enc_sq (sch, enc, frame) :
1761 334260 send_to_enc_thread(sch, enc, frame);
1762 }
1763
1764 3150 static int mux_queue_packet(SchMux *mux, SchMuxStream *ms, AVPacket *pkt)
1765 {
1766 3150 PreMuxQueue *q = &ms->pre_mux_queue;
1767 3150 AVPacket *tmp_pkt = NULL;
1768 int ret;
1769
1770
2/2
✓ Branch 1 taken 141 times.
✓ Branch 2 taken 3009 times.
3150 if (!av_fifo_can_write(q->fifo)) {
1771 141 size_t packets = av_fifo_can_read(q->fifo);
1772
1/2
✓ Branch 0 taken 141 times.
✗ Branch 1 not taken.
141 size_t pkt_size = pkt ? pkt->size : 0;
1773 141 int thresh_reached = (q->data_size + pkt_size) > q->data_threshold;
1774
2/2
✓ Branch 0 taken 136 times.
✓ Branch 1 taken 5 times.
141 size_t max_packets = thresh_reached ? q->max_packets : SIZE_MAX;
1775 141 size_t new_size = FFMIN(2 * packets, max_packets);
1776
1777
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 141 times.
141 if (new_size <= packets) {
1778 av_log(mux, AV_LOG_ERROR,
1779 "Too many packets buffered for output stream.\n");
1780 return AVERROR(ENOSPC);
1781 }
1782 141 ret = av_fifo_grow2(q->fifo, new_size - packets);
1783
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 141 times.
141 if (ret < 0)
1784 return ret;
1785 }
1786
1787
2/2
✓ Branch 0 taken 3098 times.
✓ Branch 1 taken 52 times.
3150 if (pkt) {
1788 3098 tmp_pkt = av_packet_alloc();
1789
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3098 times.
3098 if (!tmp_pkt)
1790 return AVERROR(ENOMEM);
1791
1792 3098 av_packet_move_ref(tmp_pkt, pkt);
1793 3098 q->data_size += tmp_pkt->size;
1794 }
1795 3150 av_fifo_write(q->fifo, &tmp_pkt, 1);
1796
1797 3150 return 0;
1798 }
1799
1800 476967 static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx,
1801 AVPacket *pkt)
1802 {
1803 476967 SchMuxStream *ms = &mux->streams[stream_idx];
1804
2/2
✓ Branch 0 taken 460834 times.
✓ Branch 1 taken 9022 times.
469856 int64_t dts = (pkt && pkt->dts != AV_NOPTS_VALUE) ?
1805
2/2
✓ Branch 0 taken 469856 times.
✓ Branch 1 taken 7111 times.
946823 av_rescale_q(pkt->dts + pkt->duration, pkt->time_base, AV_TIME_BASE_Q) :
1806 AV_NOPTS_VALUE;
1807
1808 // queue the packet if the muxer cannot be started yet
1809
2/2
✓ Branch 0 taken 3180 times.
✓ Branch 1 taken 473787 times.
476967 if (!atomic_load(&mux->mux_started)) {
1810 3180 int queued = 0;
1811
1812 // the muxer could have started between the above atomic check and
1813 // locking the mutex, then this block falls through to normal send path
1814 3180 pthread_mutex_lock(&sch->mux_ready_lock);
1815
1816
2/2
✓ Branch 0 taken 3150 times.
✓ Branch 1 taken 30 times.
3180 if (!atomic_load(&mux->mux_started)) {
1817 3150 int ret = mux_queue_packet(mux, ms, pkt);
1818
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3150 times.
3150 queued = ret < 0 ? ret : 1;
1819 }
1820
1821 3180 pthread_mutex_unlock(&sch->mux_ready_lock);
1822
1823
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3180 times.
3180 if (queued < 0)
1824 return queued;
1825
2/2
✓ Branch 0 taken 3150 times.
✓ Branch 1 taken 30 times.
3180 else if (queued)
1826 3150 goto update_schedule;
1827 }
1828
1829
2/2
✓ Branch 0 taken 466758 times.
✓ Branch 1 taken 7059 times.
473817 if (pkt) {
1830 int ret;
1831
1832
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 466758 times.
466758 if (ms->init_eof)
1833 return AVERROR_EOF;
1834
1835 466758 ret = tq_send(mux->queue, stream_idx, pkt);
1836
2/2
✓ Branch 0 taken 55 times.
✓ Branch 1 taken 466703 times.
466758 if (ret < 0)
1837 55 return ret;
1838 } else
1839 7059 tq_send_finish(mux->queue, stream_idx);
1840
1841 476912 update_schedule:
1842 // TODO: use atomics to check whether this changes trailing dts
1843 // to avoid locking unnecesarily
1844
4/4
✓ Branch 0 taken 16133 times.
✓ Branch 1 taken 460779 times.
✓ Branch 2 taken 7111 times.
✓ Branch 3 taken 9022 times.
476912 if (dts != AV_NOPTS_VALUE || !pkt) {
1845 467890 pthread_mutex_lock(&sch->schedule_lock);
1846
1847
2/2
✓ Branch 0 taken 460779 times.
✓ Branch 1 taken 7111 times.
467890 if (pkt) ms->last_dts = dts;
1848 7111 else ms->source_finished = 1;
1849
1850 467890 schedule_update_locked(sch);
1851
1852 467890 pthread_mutex_unlock(&sch->schedule_lock);
1853 }
1854
1855 476912 return 0;
1856 }
1857
1858 static int
1859 462310 demux_stream_send_to_dst(Scheduler *sch, const SchedulerNode dst,
1860 uint8_t *dst_finished, AVPacket *pkt, unsigned flags)
1861 {
1862 int ret;
1863
1864
2/2
✓ Branch 0 taken 3127 times.
✓ Branch 1 taken 459183 times.
462310 if (*dst_finished)
1865 3127 return AVERROR_EOF;
1866
1867
4/4
✓ Branch 0 taken 455281 times.
✓ Branch 1 taken 3902 times.
✓ Branch 2 taken 67202 times.
✓ Branch 3 taken 388079 times.
459183 if (pkt && dst.type == SCH_NODE_TYPE_MUX &&
1868
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 67200 times.
67202 (flags & DEMUX_SEND_STREAMCOPY_EOF)) {
1869 2 av_packet_unref(pkt);
1870 2 pkt = NULL;
1871 }
1872
1873
2/2
✓ Branch 0 taken 3904 times.
✓ Branch 1 taken 455279 times.
459183 if (!pkt)
1874 3904 goto finish;
1875
1876 910558 ret = (dst.type == SCH_NODE_TYPE_MUX) ?
1877
2/2
✓ Branch 0 taken 67200 times.
✓ Branch 1 taken 388079 times.
455279 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
1878 388079 tq_send(sch->dec[dst.idx].queue, 0, pkt);
1879
2/2
✓ Branch 0 taken 3125 times.
✓ Branch 1 taken 452154 times.
455279 if (ret == AVERROR_EOF)
1880 3125 goto finish;
1881
1882 452154 return ret;
1883
1884 7029 finish:
1885
2/2
✓ Branch 0 taken 639 times.
✓ Branch 1 taken 6390 times.
7029 if (dst.type == SCH_NODE_TYPE_MUX)
1886 639 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
1887 else
1888 6390 tq_send_finish(sch->dec[dst.idx].queue, 0);
1889
1890 7029 *dst_finished = 1;
1891 7029 return AVERROR_EOF;
1892 }
1893
1894 461848 static int demux_send_for_stream(Scheduler *sch, SchDemux *d, SchDemuxStream *ds,
1895 AVPacket *pkt, unsigned flags)
1896 {
1897 461848 unsigned nb_done = 0;
1898
1899
2/2
✓ Branch 0 taken 462310 times.
✓ Branch 1 taken 461848 times.
924158 for (unsigned i = 0; i < ds->nb_dst; i++) {
1900 462310 AVPacket *to_send = pkt;
1901 462310 uint8_t *finished = &ds->dst_finished[i];
1902
1903 int ret;
1904
1905 // sending a packet consumes it, so make a temporary reference if needed
1906
4/4
✓ Branch 0 taken 455281 times.
✓ Branch 1 taken 7029 times.
✓ Branch 2 taken 439 times.
✓ Branch 3 taken 454842 times.
462310 if (pkt && i < ds->nb_dst - 1) {
1907 439 to_send = d->send_pkt;
1908
1909 439 ret = av_packet_ref(to_send, pkt);
1910
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 439 times.
439 if (ret < 0)
1911 return ret;
1912 }
1913
1914 462310 ret = demux_stream_send_to_dst(sch, ds->dst[i], finished, to_send, flags);
1915
2/2
✓ Branch 0 taken 455281 times.
✓ Branch 1 taken 7029 times.
462310 if (to_send)
1916 455281 av_packet_unref(to_send);
1917
2/2
✓ Branch 0 taken 10156 times.
✓ Branch 1 taken 452154 times.
462310 if (ret == AVERROR_EOF)
1918 10156 nb_done++;
1919
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 452154 times.
452154 else if (ret < 0)
1920 return ret;
1921 }
1922
1923
2/2
✓ Branch 0 taken 10132 times.
✓ Branch 1 taken 451716 times.
461848 return (nb_done == ds->nb_dst) ? AVERROR_EOF : 0;
1924 }
1925
1926 11 static int demux_flush(Scheduler *sch, SchDemux *d, AVPacket *pkt)
1927 {
1928 11 Timestamp max_end_ts = (Timestamp){ .ts = AV_NOPTS_VALUE };
1929
1930
3/6
✓ Branch 0 taken 11 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 11 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 11 times.
11 av_assert0(!pkt->buf && !pkt->data && !pkt->side_data_elems);
1931
1932
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 11 times.
25 for (unsigned i = 0; i < d->nb_streams; i++) {
1933 14 SchDemuxStream *ds = &d->streams[i];
1934
1935
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 14 times.
28 for (unsigned j = 0; j < ds->nb_dst; j++) {
1936 14 const SchedulerNode *dst = &ds->dst[j];
1937 SchDec *dec;
1938 int ret;
1939
1940
3/4
✓ Branch 0 taken 14 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 8 times.
✓ Branch 3 taken 6 times.
14 if (ds->dst_finished[j] || dst->type != SCH_NODE_TYPE_DEC)
1941 8 continue;
1942
1943 6 dec = &sch->dec[dst->idx];
1944
1945 6 ret = tq_send(dec->queue, 0, pkt);
1946
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 if (ret < 0)
1947 return ret;
1948
1949
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 3 times.
6 if (dec->queue_end_ts) {
1950 Timestamp ts;
1951 3 ret = av_thread_message_queue_recv(dec->queue_end_ts, &ts, 0);
1952
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (ret < 0)
1953 return ret;
1954
1955
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (max_end_ts.ts == AV_NOPTS_VALUE ||
1956 (ts.ts != AV_NOPTS_VALUE &&
1957 av_compare_ts(max_end_ts.ts, max_end_ts.tb, ts.ts, ts.tb) < 0))
1958 3 max_end_ts = ts;
1959
1960 }
1961 }
1962 }
1963
1964 11 pkt->pts = max_end_ts.ts;
1965 11 pkt->time_base = max_end_ts.tb;
1966
1967 11 return 0;
1968 }
1969
1970 454899 int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt,
1971 unsigned flags)
1972 {
1973 SchDemux *d;
1974 int terminate;
1975
1976
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 454899 times.
454899 av_assert0(demux_idx < sch->nb_demux);
1977 454899 d = &sch->demux[demux_idx];
1978
1979 454899 terminate = waiter_wait(sch, &d->waiter);
1980
2/2
✓ Branch 0 taken 46 times.
✓ Branch 1 taken 454853 times.
454899 if (terminate)
1981 46 return AVERROR_EXIT;
1982
1983 // flush the downstreams after seek
1984
2/2
✓ Branch 0 taken 11 times.
✓ Branch 1 taken 454842 times.
454853 if (pkt->stream_index == -1)
1985 11 return demux_flush(sch, d, pkt);
1986
1987
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 454842 times.
454842 av_assert0(pkt->stream_index < d->nb_streams);
1988
1989 454842 return demux_send_for_stream(sch, d, &d->streams[pkt->stream_index], pkt, flags);
1990 }
1991
1992 6790 static int demux_done(Scheduler *sch, unsigned demux_idx)
1993 {
1994 6790 SchDemux *d = &sch->demux[demux_idx];
1995 6790 int ret = 0;
1996
1997
2/2
✓ Branch 0 taken 7006 times.
✓ Branch 1 taken 6790 times.
13796 for (unsigned i = 0; i < d->nb_streams; i++) {
1998 7006 int err = demux_send_for_stream(sch, d, &d->streams[i], NULL, 0);
1999
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7006 times.
7006 if (err != AVERROR_EOF)
2000 ret = err_merge(ret, err);
2001 }
2002
2003 6790 pthread_mutex_lock(&sch->schedule_lock);
2004
2005 6790 d->task_exited = 1;
2006
2007 6790 schedule_update_locked(sch);
2008
2009 6790 pthread_mutex_unlock(&sch->schedule_lock);
2010
2011 6790 return ret;
2012 }
2013
2014 483383 int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt)
2015 {
2016 SchMux *mux;
2017 int ret, stream_idx;
2018
2019
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 483383 times.
483383 av_assert0(mux_idx < sch->nb_mux);
2020 483383 mux = &sch->mux[mux_idx];
2021
2022 483383 ret = tq_receive(mux->queue, &stream_idx, pkt);
2023 483383 pkt->stream_index = stream_idx;
2024 483383 return ret;
2025 }
2026
2027 198 void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
2028 {
2029 SchMux *mux;
2030
2031
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 198 times.
198 av_assert0(mux_idx < sch->nb_mux);
2032 198 mux = &sch->mux[mux_idx];
2033
2034
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 198 times.
198 av_assert0(stream_idx < mux->nb_streams);
2035 198 tq_receive_finish(mux->queue, stream_idx);
2036
2037 198 pthread_mutex_lock(&sch->schedule_lock);
2038 198 mux->streams[stream_idx].source_finished = 1;
2039
2040 198 schedule_update_locked(sch);
2041
2042 198 pthread_mutex_unlock(&sch->schedule_lock);
2043 198 }
2044
2045 433394 int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
2046 const AVPacket *pkt)
2047 {
2048 SchMux *mux;
2049 SchMuxStream *ms;
2050
2051
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 433394 times.
433394 av_assert0(mux_idx < sch->nb_mux);
2052 433394 mux = &sch->mux[mux_idx];
2053
2054
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 433394 times.
433394 av_assert0(stream_idx < mux->nb_streams);
2055 433394 ms = &mux->streams[stream_idx];
2056
2057
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 433394 times.
433399 for (unsigned i = 0; i < ms->nb_sub_heartbeat_dst; i++) {
2058 5 SchDec *dst = &sch->dec[ms->sub_heartbeat_dst[i]];
2059 int ret;
2060
2061 5 ret = av_packet_copy_props(mux->sub_heartbeat_pkt, pkt);
2062
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5 times.
5 if (ret < 0)
2063 return ret;
2064
2065 5 tq_send(dst->queue, 0, mux->sub_heartbeat_pkt);
2066 }
2067
2068 433394 return 0;
2069 }
2070
2071 6761 static int mux_done(Scheduler *sch, unsigned mux_idx)
2072 {
2073 6761 SchMux *mux = &sch->mux[mux_idx];
2074
2075 6761 pthread_mutex_lock(&sch->schedule_lock);
2076
2077
2/2
✓ Branch 0 taken 7111 times.
✓ Branch 1 taken 6761 times.
13872 for (unsigned i = 0; i < mux->nb_streams; i++) {
2078 7111 tq_receive_finish(mux->queue, i);
2079 7111 mux->streams[i].source_finished = 1;
2080 }
2081
2082 6761 schedule_update_locked(sch);
2083
2084 6761 pthread_mutex_unlock(&sch->schedule_lock);
2085
2086 6761 pthread_mutex_lock(&sch->mux_done_lock);
2087
2088
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6761 times.
6761 av_assert0(sch->nb_mux_done < sch->nb_mux);
2089 6761 sch->nb_mux_done++;
2090
2091 6761 pthread_cond_signal(&sch->mux_done_cond);
2092
2093 6761 pthread_mutex_unlock(&sch->mux_done_lock);
2094
2095 6761 return 0;
2096 }
2097
2098 364361 int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
2099 {
2100 SchDec *dec;
2101 int ret, dummy;
2102
2103
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 364361 times.
364361 av_assert0(dec_idx < sch->nb_dec);
2104 364361 dec = &sch->dec[dec_idx];
2105
2106 // the decoder should have given us post-flush end timestamp in pkt
2107
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 364358 times.
364361 if (dec->expect_end_ts) {
2108 3 Timestamp ts = (Timestamp){ .ts = pkt->pts, .tb = pkt->time_base };
2109 3 ret = av_thread_message_queue_send(dec->queue_end_ts, &ts, 0);
2110
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (ret < 0)
2111 return ret;
2112
2113 3 dec->expect_end_ts = 0;
2114 }
2115
2116 364361 ret = tq_receive(dec->queue, &dummy, pkt);
2117
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 364361 times.
364361 av_assert0(dummy <= 0);
2118
2119 // got a flush packet, on the next call to this function the decoder
2120 // will give us post-flush end timestamp
2121
7/8
✓ Branch 0 taken 361101 times.
✓ Branch 1 taken 3260 times.
✓ Branch 2 taken 958 times.
✓ Branch 3 taken 360143 times.
✓ Branch 4 taken 958 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 3 times.
✓ Branch 7 taken 955 times.
364361 if (ret >= 0 && !pkt->data && !pkt->side_data_elems && dec->queue_end_ts)
2122 3 dec->expect_end_ts = 1;
2123
2124 364361 return ret;
2125 }
2126
2127 392310 static int send_to_filter(Scheduler *sch, SchFilterGraph *fg,
2128 unsigned in_idx, AVFrame *frame)
2129 {
2130
2/2
✓ Branch 0 taken 385911 times.
✓ Branch 1 taken 6399 times.
392310 if (frame)
2131 385911 return tq_send(fg->queue, in_idx, frame);
2132
2133
1/2
✓ Branch 0 taken 6399 times.
✗ Branch 1 not taken.
6399 if (!fg->inputs[in_idx].send_finished) {
2134 6399 fg->inputs[in_idx].send_finished = 1;
2135 6399 tq_send_finish(fg->queue, in_idx);
2136
2137 // close the control stream when all actual inputs are done
2138
2/2
✓ Branch 0 taken 6329 times.
✓ Branch 1 taken 70 times.
6399 if (atomic_fetch_add(&fg->nb_inputs_finished_send, 1) == fg->nb_inputs - 1)
2139 6329 tq_send_finish(fg->queue, fg->nb_inputs);
2140 }
2141 6399 return 0;
2142 }
2143
2144 396410 static int dec_send_to_dst(Scheduler *sch, const SchedulerNode dst,
2145 uint8_t *dst_finished, AVFrame *frame)
2146 {
2147 int ret;
2148
2149
2/2
✓ Branch 0 taken 6357 times.
✓ Branch 1 taken 390053 times.
396410 if (*dst_finished)
2150 6357 return AVERROR_EOF;
2151
2152
2/2
✓ Branch 0 taken 3263 times.
✓ Branch 1 taken 386790 times.
390053 if (!frame)
2153 3263 goto finish;
2154
2155 773580 ret = (dst.type == SCH_NODE_TYPE_FILTER_IN) ?
2156
2/2
✓ Branch 0 taken 385911 times.
✓ Branch 1 taken 879 times.
386790 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame) :
2157 879 send_to_enc(sch, &sch->enc[dst.idx], frame);
2158
2/2
✓ Branch 0 taken 3174 times.
✓ Branch 1 taken 383616 times.
386790 if (ret == AVERROR_EOF)
2159 3174 goto finish;
2160
2161 383616 return ret;
2162
2163 6437 finish:
2164
2/2
✓ Branch 0 taken 6399 times.
✓ Branch 1 taken 38 times.
6437 if (dst.type == SCH_NODE_TYPE_FILTER_IN)
2165 6399 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2166 else
2167 38 send_to_enc(sch, &sch->enc[dst.idx], NULL);
2168
2169 6437 *dst_finished = 1;
2170
2171 6437 return AVERROR_EOF;
2172 }
2173
2174 388630 int sch_dec_send(Scheduler *sch, unsigned dec_idx, AVFrame *frame)
2175 {
2176 SchDec *dec;
2177 388630 int ret = 0;
2178 388630 unsigned nb_done = 0;
2179
2180
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 388630 times.
388630 av_assert0(dec_idx < sch->nb_dec);
2181 388630 dec = &sch->dec[dec_idx];
2182
2183
2/2
✓ Branch 0 taken 389973 times.
✓ Branch 1 taken 388630 times.
778603 for (unsigned i = 0; i < dec->nb_dst; i++) {
2184 389973 uint8_t *finished = &dec->dst_finished[i];
2185 389973 AVFrame *to_send = frame;
2186
2187 // sending a frame consumes it, so make a temporary reference if needed
2188
2/2
✓ Branch 0 taken 1343 times.
✓ Branch 1 taken 388630 times.
389973 if (i < dec->nb_dst - 1) {
2189 1343 to_send = dec->send_frame;
2190
2191 // frame may sometimes contain props only,
2192 // e.g. to signal EOF timestamp
2193
2/2
✓ Branch 0 taken 1251 times.
✓ Branch 1 taken 92 times.
1343 ret = frame->buf[0] ? av_frame_ref(to_send, frame) :
2194 92 av_frame_copy_props(to_send, frame);
2195
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1343 times.
1343 if (ret < 0)
2196 return ret;
2197 }
2198
2199 389973 ret = dec_send_to_dst(sch, dec->dst[i], finished, to_send);
2200
2/2
✓ Branch 0 taken 6357 times.
✓ Branch 1 taken 383616 times.
389973 if (ret < 0) {
2201 6357 av_frame_unref(to_send);
2202
1/2
✓ Branch 0 taken 6357 times.
✗ Branch 1 not taken.
6357 if (ret == AVERROR_EOF) {
2203 6357 nb_done++;
2204 6357 ret = 0;
2205 6357 continue;
2206 }
2207 return ret;
2208 }
2209 }
2210
2211
2/2
✓ Branch 0 taken 6277 times.
✓ Branch 1 taken 382353 times.
388630 return (nb_done == dec->nb_dst) ? AVERROR_EOF : 0;
2212 }
2213
2214 6391 static int dec_done(Scheduler *sch, unsigned dec_idx)
2215 {
2216 6391 SchDec *dec = &sch->dec[dec_idx];
2217 6391 int ret = 0;
2218
2219 6391 tq_receive_finish(dec->queue, 0);
2220
2221 // make sure our source does not get stuck waiting for end timestamps
2222 // that will never arrive
2223
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 6390 times.
6391 if (dec->queue_end_ts)
2224 1 av_thread_message_queue_set_err_recv(dec->queue_end_ts, AVERROR_EOF);
2225
2226
2/2
✓ Branch 0 taken 6437 times.
✓ Branch 1 taken 6391 times.
12828 for (unsigned i = 0; i < dec->nb_dst; i++) {
2227 6437 int err = dec_send_to_dst(sch, dec->dst[i], &dec->dst_finished[i], NULL);
2228
2/4
✓ Branch 0 taken 6437 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6437 times.
6437 if (err < 0 && err != AVERROR_EOF)
2229 ret = err_merge(ret, err);
2230 }
2231
2232 6391 return ret;
2233 }
2234
2235 415303 int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
2236 {
2237 SchEnc *enc;
2238 int ret, dummy;
2239
2240
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 415303 times.
415303 av_assert0(enc_idx < sch->nb_enc);
2241 415303 enc = &sch->enc[enc_idx];
2242
2243 415303 ret = tq_receive(enc->queue, &dummy, frame);
2244
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 415303 times.
415303 av_assert0(dummy <= 0);
2245
2246 415303 return ret;
2247 }
2248
2249 409221 static int enc_send_to_dst(Scheduler *sch, const SchedulerNode dst,
2250 uint8_t *dst_finished, AVPacket *pkt)
2251 {
2252 int ret;
2253
2254
2/2
✓ Branch 0 taken 44 times.
✓ Branch 1 taken 409177 times.
409221 if (*dst_finished)
2255 44 return AVERROR_EOF;
2256
2257
2/2
✓ Branch 0 taken 6471 times.
✓ Branch 1 taken 402706 times.
409177 if (!pkt)
2258 6471 goto finish;
2259
2260 805412 ret = (dst.type == SCH_NODE_TYPE_MUX) ?
2261
2/2
✓ Branch 0 taken 402656 times.
✓ Branch 1 taken 50 times.
402706 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
2262 50 tq_send(sch->dec[dst.idx].queue, 0, pkt);
2263
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 402704 times.
402706 if (ret == AVERROR_EOF)
2264 2 goto finish;
2265
2266 402704 return ret;
2267
2268 6473 finish:
2269
2/2
✓ Branch 0 taken 6472 times.
✓ Branch 1 taken 1 times.
6473 if (dst.type == SCH_NODE_TYPE_MUX)
2270 6472 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
2271 else
2272 1 tq_send_finish(sch->dec[dst.idx].queue, 0);
2273
2274 6473 *dst_finished = 1;
2275
2276 6473 return AVERROR_EOF;
2277 }
2278
2279 402698 int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
2280 {
2281 SchEnc *enc;
2282 int ret;
2283
2284
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 402698 times.
402698 av_assert0(enc_idx < sch->nb_enc);
2285 402698 enc = &sch->enc[enc_idx];
2286
2287
2/2
✓ Branch 0 taken 402748 times.
✓ Branch 1 taken 402698 times.
805446 for (unsigned i = 0; i < enc->nb_dst; i++) {
2288 402748 uint8_t *finished = &enc->dst_finished[i];
2289 402748 AVPacket *to_send = pkt;
2290
2291 // sending a packet consumes it, so make a temporary reference if needed
2292
2/2
✓ Branch 0 taken 50 times.
✓ Branch 1 taken 402698 times.
402748 if (i < enc->nb_dst - 1) {
2293 50 to_send = enc->send_pkt;
2294
2295 50 ret = av_packet_ref(to_send, pkt);
2296
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 50 times.
50 if (ret < 0)
2297 return ret;
2298 }
2299
2300 402748 ret = enc_send_to_dst(sch, enc->dst[i], finished, to_send);
2301
2/2
✓ Branch 0 taken 44 times.
✓ Branch 1 taken 402704 times.
402748 if (ret < 0) {
2302 44 av_packet_unref(to_send);
2303
1/2
✓ Branch 0 taken 44 times.
✗ Branch 1 not taken.
44 if (ret == AVERROR_EOF)
2304 44 continue;
2305 return ret;
2306 }
2307 }
2308
2309 402698 return 0;
2310 }
2311
2312 6472 static int enc_done(Scheduler *sch, unsigned enc_idx)
2313 {
2314 6472 SchEnc *enc = &sch->enc[enc_idx];
2315 6472 int ret = 0;
2316
2317 6472 tq_receive_finish(enc->queue, 0);
2318
2319
2/2
✓ Branch 0 taken 6473 times.
✓ Branch 1 taken 6472 times.
12945 for (unsigned i = 0; i < enc->nb_dst; i++) {
2320 6473 int err = enc_send_to_dst(sch, enc->dst[i], &enc->dst_finished[i], NULL);
2321
2/4
✓ Branch 0 taken 6473 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6473 times.
6473 if (err < 0 && err != AVERROR_EOF)
2322 ret = err_merge(ret, err);
2323 }
2324
2325 6472 return ret;
2326 }
2327
2328 361375 int sch_filter_receive(Scheduler *sch, unsigned fg_idx,
2329 unsigned *in_idx, AVFrame *frame)
2330 {
2331 SchFilterGraph *fg;
2332
2333
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 361375 times.
361375 av_assert0(fg_idx < sch->nb_filters);
2334 361375 fg = &sch->filters[fg_idx];
2335
2336
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 361375 times.
361375 av_assert0(*in_idx <= fg->nb_inputs);
2337
2338 // update scheduling to account for desired input stream, if it changed
2339 //
2340 // this check needs no locking because only the filtering thread
2341 // updates this value
2342
2/2
✓ Branch 0 taken 833 times.
✓ Branch 1 taken 360542 times.
361375 if (*in_idx != fg->best_input) {
2343 833 pthread_mutex_lock(&sch->schedule_lock);
2344
2345 833 fg->best_input = *in_idx;
2346 833 schedule_update_locked(sch);
2347
2348 833 pthread_mutex_unlock(&sch->schedule_lock);
2349 }
2350
2351
2/2
✓ Branch 0 taken 359714 times.
✓ Branch 1 taken 1661 times.
361375 if (*in_idx == fg->nb_inputs) {
2352 1661 int terminate = waiter_wait(sch, &fg->waiter);
2353
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1661 times.
1661 return terminate ? AVERROR_EOF : AVERROR(EAGAIN);
2354 }
2355
2356 9 while (1) {
2357 int ret, idx;
2358
2359 359723 ret = tq_receive(fg->queue, &idx, frame);
2360
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 359723 times.
359723 if (idx < 0)
2361 359714 return AVERROR_EOF;
2362
2/2
✓ Branch 0 taken 359714 times.
✓ Branch 1 taken 9 times.
359723 else if (ret >= 0) {
2363 359714 *in_idx = idx;
2364 359714 return 0;
2365 }
2366
2367 // disregard EOFs for specific streams - they should always be
2368 // preceded by an EOF frame
2369 }
2370 }
2371
2372 1 void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx)
2373 {
2374 SchFilterGraph *fg;
2375 SchFilterIn *fi;
2376
2377
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(fg_idx < sch->nb_filters);
2378 1 fg = &sch->filters[fg_idx];
2379
2380
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(in_idx < fg->nb_inputs);
2381 1 fi = &fg->inputs[in_idx];
2382
2383
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (!fi->receive_finished) {
2384 1 fi->receive_finished = 1;
2385 1 tq_receive_finish(fg->queue, in_idx);
2386
2387 // close the control stream when all actual inputs are done
2388
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (++fg->nb_inputs_finished_receive == fg->nb_inputs)
2389 tq_receive_finish(fg->queue, fg->nb_inputs);
2390 }
2391 1 }
2392
2393 360890 int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)
2394 {
2395 SchFilterGraph *fg;
2396 SchedulerNode dst;
2397
2398
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 360890 times.
360890 av_assert0(fg_idx < sch->nb_filters);
2399 360890 fg = &sch->filters[fg_idx];
2400
2401
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 360890 times.
360890 av_assert0(out_idx < fg->nb_outputs);
2402 360890 dst = fg->outputs[out_idx].dst;
2403
2404 360890 return (dst.type == SCH_NODE_TYPE_ENC) ?
2405
1/2
✓ Branch 0 taken 360890 times.
✗ Branch 1 not taken.
360890 send_to_enc (sch, &sch->enc[dst.idx], frame) :
2406 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame);
2407 }
2408
2409 6384 static int filter_done(Scheduler *sch, unsigned fg_idx)
2410 {
2411 6384 SchFilterGraph *fg = &sch->filters[fg_idx];
2412 6384 int ret = 0;
2413
2414
2/2
✓ Branch 0 taken 12783 times.
✓ Branch 1 taken 6384 times.
19167 for (unsigned i = 0; i <= fg->nb_inputs; i++)
2415 12783 tq_receive_finish(fg->queue, i);
2416
2417
2/2
✓ Branch 0 taken 6434 times.
✓ Branch 1 taken 6384 times.
12818 for (unsigned i = 0; i < fg->nb_outputs; i++) {
2418 6434 SchedulerNode dst = fg->outputs[i].dst;
2419 12868 int err = (dst.type == SCH_NODE_TYPE_ENC) ?
2420
1/2
✓ Branch 0 taken 6434 times.
✗ Branch 1 not taken.
6434 send_to_enc (sch, &sch->enc[dst.idx], NULL) :
2421 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2422
2423
3/4
✓ Branch 0 taken 2805 times.
✓ Branch 1 taken 3629 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 2805 times.
6434 if (err < 0 && err != AVERROR_EOF)
2424 ret = err_merge(ret, err);
2425 }
2426
2427 6384 pthread_mutex_lock(&sch->schedule_lock);
2428
2429 6384 fg->task_exited = 1;
2430
2431 6384 schedule_update_locked(sch);
2432
2433 6384 pthread_mutex_unlock(&sch->schedule_lock);
2434
2435 6384 return ret;
2436 }
2437
2438 int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
2439 {
2440 SchFilterGraph *fg;
2441
2442 av_assert0(fg_idx < sch->nb_filters);
2443 fg = &sch->filters[fg_idx];
2444
2445 return send_to_filter(sch, fg, fg->nb_inputs, frame);
2446 }
2447
2448 32798 static int task_cleanup(Scheduler *sch, SchedulerNode node)
2449 {
2450
5/6
✓ Branch 0 taken 6790 times.
✓ Branch 1 taken 6761 times.
✓ Branch 2 taken 6391 times.
✓ Branch 3 taken 6472 times.
✓ Branch 4 taken 6384 times.
✗ Branch 5 not taken.
32798 switch (node.type) {
2451 6790 case SCH_NODE_TYPE_DEMUX: return demux_done (sch, node.idx);
2452 6761 case SCH_NODE_TYPE_MUX: return mux_done (sch, node.idx);
2453 6391 case SCH_NODE_TYPE_DEC: return dec_done (sch, node.idx);
2454 6472 case SCH_NODE_TYPE_ENC: return enc_done (sch, node.idx);
2455 6384 case SCH_NODE_TYPE_FILTER_IN: return filter_done(sch, node.idx);
2456 default: av_assert0(0);
2457 }
2458 }
2459
2460 32784 static void *task_wrapper(void *arg)
2461 {
2462 32784 SchTask *task = arg;
2463 32784 Scheduler *sch = task->parent;
2464 int ret;
2465 32784 int err = 0;
2466
2467 32784 ret = task->func(task->func_arg);
2468
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 32783 times.
32784 if (ret < 0)
2469 1 av_log(task->func_arg, AV_LOG_ERROR,
2470 1 "Task finished with error code: %d (%s)\n", ret, av_err2str(ret));
2471
2472 32784 err = task_cleanup(sch, task->node);
2473 32784 ret = err_merge(ret, err);
2474
2475 // EOF is considered normal termination
2476
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 32784 times.
32784 if (ret == AVERROR_EOF)
2477 ret = 0;
2478
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 32783 times.
32784 if (ret < 0)
2479 1 atomic_store(&sch->task_failed, 1);
2480
2481
4/4
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 32783 times.
✓ Branch 2 taken 1 times.
✓ Branch 3 taken 32783 times.
32785 av_log(task->func_arg, ret < 0 ? AV_LOG_ERROR : AV_LOG_VERBOSE,
2482 "Terminating thread with return code %d (%s)\n", ret,
2483 1 ret < 0 ? av_err2str(ret) : "success");
2484
2485 32784 return (void*)(intptr_t)ret;
2486 }
2487
2488 32798 static int task_stop(Scheduler *sch, SchTask *task)
2489 {
2490 int ret;
2491 void *thread_ret;
2492
2493
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 32784 times.
32798 if (!task->thread_running)
2494 14 return task_cleanup(sch, task->node);
2495
2496 32784 ret = pthread_join(task->thread, &thread_ret);
2497
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 32784 times.
32784 av_assert0(ret == 0);
2498
2499 32784 task->thread_running = 0;
2500
2501 32784 return (intptr_t)thread_ret;
2502 }
2503
2504 13519 int sch_stop(Scheduler *sch, int64_t *finish_ts)
2505 {
2506 13519 int ret = 0, err;
2507
2508
2/2
✓ Branch 0 taken 6760 times.
✓ Branch 1 taken 6759 times.
13519 if (sch->state != SCH_STATE_STARTED)
2509 6760 return 0;
2510
2511 6759 atomic_store(&sch->terminate, 1);
2512
2513
2/2
✓ Branch 0 taken 13518 times.
✓ Branch 1 taken 6759 times.
20277 for (unsigned type = 0; type < 2; type++)
2514
4/4
✓ Branch 0 taken 13549 times.
✓ Branch 1 taken 13143 times.
✓ Branch 2 taken 13174 times.
✓ Branch 3 taken 13518 times.
26692 for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) {
2515
2/2
✓ Branch 0 taken 6790 times.
✓ Branch 1 taken 6384 times.
13174 SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter;
2516 13174 waiter_set(w, 1);
2517 }
2518
2519
2/2
✓ Branch 0 taken 6790 times.
✓ Branch 1 taken 6759 times.
13549 for (unsigned i = 0; i < sch->nb_demux; i++) {
2520 6790 SchDemux *d = &sch->demux[i];
2521
2522 6790 err = task_stop(sch, &d->task);
2523 6790 ret = err_merge(ret, err);
2524 }
2525
2526
2/2
✓ Branch 0 taken 6391 times.
✓ Branch 1 taken 6759 times.
13150 for (unsigned i = 0; i < sch->nb_dec; i++) {
2527 6391 SchDec *dec = &sch->dec[i];
2528
2529 6391 err = task_stop(sch, &dec->task);
2530 6391 ret = err_merge(ret, err);
2531 }
2532
2533
2/2
✓ Branch 0 taken 6384 times.
✓ Branch 1 taken 6759 times.
13143 for (unsigned i = 0; i < sch->nb_filters; i++) {
2534 6384 SchFilterGraph *fg = &sch->filters[i];
2535
2536 6384 err = task_stop(sch, &fg->task);
2537 6384 ret = err_merge(ret, err);
2538 }
2539
2540
2/2
✓ Branch 0 taken 6472 times.
✓ Branch 1 taken 6759 times.
13231 for (unsigned i = 0; i < sch->nb_enc; i++) {
2541 6472 SchEnc *enc = &sch->enc[i];
2542
2543 6472 err = task_stop(sch, &enc->task);
2544 6472 ret = err_merge(ret, err);
2545 }
2546
2547
2/2
✓ Branch 0 taken 6761 times.
✓ Branch 1 taken 6759 times.
13520 for (unsigned i = 0; i < sch->nb_mux; i++) {
2548 6761 SchMux *mux = &sch->mux[i];
2549
2550 6761 err = task_stop(sch, &mux->task);
2551 6761 ret = err_merge(ret, err);
2552 }
2553
2554
1/2
✓ Branch 0 taken 6759 times.
✗ Branch 1 not taken.
6759 if (finish_ts)
2555 6759 *finish_ts = trailing_dts(sch, 1);
2556
2557 6759 sch->state = SCH_STATE_STOPPED;
2558
2559 6759 return ret;
2560 }
2561