FFmpeg coverage


Directory: ../../../ffmpeg/
File: src/fftools/ffmpeg_sched.c
Date: 2024-07-26 21:54:09
            Exec    Total    Coverage
Lines:      1031     1185       87.0%
Functions:    64       66       97.0%
Branches:    568      789       72.0%

Line Branch Exec Source
1 /*
2 * Inter-thread scheduling/synchronization.
3 * Copyright (c) 2023 Anton Khirnov
4 *
5 * This file is part of FFmpeg.
6 *
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with FFmpeg; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22 #include <stdatomic.h>
23 #include <stddef.h>
24 #include <stdint.h>
25
26 #include "cmdutils.h"
27 #include "ffmpeg_sched.h"
28 #include "ffmpeg_utils.h"
29 #include "sync_queue.h"
30 #include "thread_queue.h"
31
32 #include "libavcodec/packet.h"
33
34 #include "libavutil/avassert.h"
35 #include "libavutil/error.h"
36 #include "libavutil/fifo.h"
37 #include "libavutil/frame.h"
38 #include "libavutil/mem.h"
39 #include "libavutil/thread.h"
40 #include "libavutil/threadmessage.h"
41 #include "libavutil/time.h"
42
43 // 100 ms
44 // FIXME: some other value? make this dynamic?
45 #define SCHEDULE_TOLERANCE (100 * 1000)
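
SCHEDULE_TOLERANCE is expressed in AV_TIME_BASE units (microseconds), the same
units as the last_dts values it is compared against in schedule_update_locked()
below, so 100 * 1000 is 100 ms.
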
46
47 enum QueueType {
48 QUEUE_PACKETS,
49 QUEUE_FRAMES,
50 };
51
52 typedef struct SchWaiter {
53 pthread_mutex_t lock;
54 pthread_cond_t cond;
55 atomic_int choked;
56
57 // the following are internal state of schedule_update_locked() and must not
58 // be accessed outside of it
59 int choked_prev;
60 int choked_next;
61 } SchWaiter;
62
63 typedef struct SchTask {
64 Scheduler *parent;
65 SchedulerNode node;
66
67 SchThreadFunc func;
68 void *func_arg;
69
70 pthread_t thread;
71 int thread_running;
72 } SchTask;
73
74 typedef struct SchDec {
75 const AVClass *class;
76
77 SchedulerNode src;
78 SchedulerNode *dst;
79 uint8_t *dst_finished;
80 unsigned nb_dst;
81
82 SchTask task;
83 // Queue for receiving input packets, one stream.
84 ThreadQueue *queue;
85
86 // Queue for sending post-flush end timestamps back to the source
87 AVThreadMessageQueue *queue_end_ts;
88 int expect_end_ts;
89
90 // temporary storage used by sch_dec_send()
91 AVFrame *send_frame;
92 } SchDec;
93
94 typedef struct SchSyncQueue {
95 SyncQueue *sq;
96 AVFrame *frame;
97 pthread_mutex_t lock;
98
99 unsigned *enc_idx;
100 unsigned nb_enc_idx;
101 } SchSyncQueue;
102
103 typedef struct SchEnc {
104 const AVClass *class;
105
106 SchedulerNode src;
107 SchedulerNode *dst;
108 uint8_t *dst_finished;
109 unsigned nb_dst;
110
111 // [0] - index of the sync queue in Scheduler.sq_enc,
112 // [1] - index of this encoder in the sq
113 int sq_idx[2];
114
115 /* Opening encoders is somewhat nontrivial due to their interaction with
116 * sync queues, which are (among other things) responsible for maintaining
117 * constant audio frame size, when it is required by the encoder.
118 *
119 * Opening the encoder requires stream parameters, obtained from the first
120 * frame. However, that frame cannot be properly chunked by the sync queue
121 * without knowing the required frame size, which is only available after
122 * opening the encoder.
123 *
124 * This apparent circular dependency is resolved in the following way:
125 * - the caller creating the encoder gives us a callback which opens the
126 * encoder and returns the required frame size (if any)
127 * - when the first frame is sent to the encoder, the sending thread
128 * - calls this callback, opening the encoder
129 * - passes the returned frame size to the sync queue
130 */
131 int (*open_cb)(void *opaque, const AVFrame *frame);
132 int opened;
133
134 SchTask task;
135 // Queue for receiving input frames, one stream.
136 ThreadQueue *queue;
137 // tq_send() to queue returned EOF
138 int in_finished;
139
140 // temporary storage used by sch_enc_send()
141 AVPacket *send_pkt;
142 } SchEnc;
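
A minimal sketch of an open_cb honoring the contract described above. Encoder,
e->avctx and encoder_open_from_frame() are hypothetical stand-ins; a positive
return value is the fixed audio frame size that the scheduler passes on to the
sync queue (see enc_open() further down):

    static int example_open_cb(void *opaque, const AVFrame *frame)
    {
        Encoder *e = opaque;                          // hypothetical context
        int ret = encoder_open_from_frame(e, frame);  // hypothetical helper
        if (ret < 0)
            return ret;

        // report the fixed audio frame size, or 0 if the encoder imposes none
        if (e->avctx->codec_type == AVMEDIA_TYPE_AUDIO &&
            !(e->avctx->codec->capabilities & AV_CODEC_CAP_VARIABLE_FRAME_SIZE))
            return e->avctx->frame_size;
        return 0;
    }
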
143
144 typedef struct SchDemuxStream {
145 SchedulerNode *dst;
146 uint8_t *dst_finished;
147 unsigned nb_dst;
148 } SchDemuxStream;
149
150 typedef struct SchDemux {
151 const AVClass *class;
152
153 SchDemuxStream *streams;
154 unsigned nb_streams;
155
156 SchTask task;
157 SchWaiter waiter;
158
159 // temporary storage used by sch_demux_send()
160 AVPacket *send_pkt;
161
162 // protected by schedule_lock
163 int task_exited;
164 } SchDemux;
165
166 typedef struct PreMuxQueue {
167 /**
168 * Queue for buffering the packets before the muxer task can be started.
169 */
170 AVFifo *fifo;
171 /**
172 * Maximum number of packets in fifo.
173 */
174 int max_packets;
175 /*
176 * The size of the AVPackets' buffers in the queue.
177 * Updated when a packet is pushed to or pulled from the queue.
178 */
179 size_t data_size;
180 /* Threshold after which max_packets will be in effect */
181 size_t data_threshold;
182 } PreMuxQueue;
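
The comments above imply a simple growth policy: the fifo may keep growing
while the buffered data stays below data_threshold, and past that point
max_packets caps its size. A hedged sketch of that rule
(pre_mux_queue_grow_target() is hypothetical; the real bookkeeping lives in
the muxing path further down in this file):

    // Returns the capacity the fifo may grow to, or 0 if it must stay as is.
    static size_t pre_mux_queue_grow_target(const PreMuxQueue *q, size_t nb_packets)
    {
        size_t cap = q->data_size > q->data_threshold ? (size_t)q->max_packets
                                                      : SIZE_MAX;
        size_t new_size = FFMIN(2 * nb_packets, cap);
        return new_size > nb_packets ? new_size : 0;
    }
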
183
184 typedef struct SchMuxStream {
185 SchedulerNode src;
186 SchedulerNode src_sched;
187
188 unsigned *sub_heartbeat_dst;
189 unsigned nb_sub_heartbeat_dst;
190
191 PreMuxQueue pre_mux_queue;
192
193 // an EOF was generated while flushing the pre-mux queue
194 int init_eof;
195
196 ////////////////////////////////////////////////////////////
197 // The following are protected by Scheduler.schedule_lock //
198
199 /* dts+duration of the last packet sent to this stream
200 in AV_TIME_BASE_Q */
201 int64_t last_dts;
202 // this stream no longer accepts input
203 int source_finished;
204 ////////////////////////////////////////////////////////////
205 } SchMuxStream;
206
207 typedef struct SchMux {
208 const AVClass *class;
209
210 SchMuxStream *streams;
211 unsigned nb_streams;
212 unsigned nb_streams_ready;
213
214 int (*init)(void *arg);
215
216 SchTask task;
217 /**
218 * Set to 1 after starting the muxer task and flushing the
219 * pre-muxing queues.
220 * Set either before any tasks have started, or with
221 * Scheduler.mux_ready_lock held.
222 */
223 atomic_int mux_started;
224 ThreadQueue *queue;
225 unsigned queue_size;
226
227 AVPacket *sub_heartbeat_pkt;
228 } SchMux;
229
230 typedef struct SchFilterIn {
231 SchedulerNode src;
232 SchedulerNode src_sched;
233 int send_finished;
234 int receive_finished;
235 } SchFilterIn;
236
237 typedef struct SchFilterOut {
238 SchedulerNode dst;
239 } SchFilterOut;
240
241 typedef struct SchFilterGraph {
242 const AVClass *class;
243
244 SchFilterIn *inputs;
245 unsigned nb_inputs;
246 atomic_uint nb_inputs_finished_send;
247 unsigned nb_inputs_finished_receive;
248
249 SchFilterOut *outputs;
250 unsigned nb_outputs;
251
252 SchTask task;
253 // input queue, nb_inputs+1 streams
254 // last stream is control
255 ThreadQueue *queue;
256 SchWaiter waiter;
257
258 // protected by schedule_lock
259 unsigned best_input;
260 int task_exited;
261 } SchFilterGraph;
262
263 enum SchedulerState {
264 SCH_STATE_UNINIT,
265 SCH_STATE_STARTED,
266 SCH_STATE_STOPPED,
267 };
268
269 struct Scheduler {
270 const AVClass *class;
271
272 SchDemux *demux;
273 unsigned nb_demux;
274
275 SchMux *mux;
276 unsigned nb_mux;
277
278 unsigned nb_mux_ready;
279 pthread_mutex_t mux_ready_lock;
280
281 unsigned nb_mux_done;
282 pthread_mutex_t mux_done_lock;
283 pthread_cond_t mux_done_cond;
284
285
286 SchDec *dec;
287 unsigned nb_dec;
288
289 SchEnc *enc;
290 unsigned nb_enc;
291
292 SchSyncQueue *sq_enc;
293 unsigned nb_sq_enc;
294
295 SchFilterGraph *filters;
296 unsigned nb_filters;
297
298 char *sdp_filename;
299 int sdp_auto;
300
301 enum SchedulerState state;
302 atomic_int terminate;
303 atomic_int task_failed;
304
305 pthread_mutex_t schedule_lock;
306
307 atomic_int_least64_t last_dts;
308 };
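
The lifecycle implied by this API: sch_alloc() creates the scheduler;
sch_add_demux(), sch_add_dec(), sch_add_enc(), sch_add_mux() and
sch_add_filtergraph() register nodes; sch_connect() wires them together;
sch_start() spawns all task threads; sch_wait() is then polled until every
muxer is done; finally sch_stop() and sch_free() tear everything down.
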
309
310 /**
311 * Wait until this task is allowed to proceed.
312 *
313 * @retval 0 the caller should proceed
314 * @retval 1 the caller should terminate
315 */
316 461315 static int waiter_wait(Scheduler *sch, SchWaiter *w)
317 {
318 int terminate;
319
320
2/2
✓ Branch 0 taken 461023 times.
✓ Branch 1 taken 292 times.
461315 if (!atomic_load(&w->choked))
321 461023 return 0;
322
323 292 pthread_mutex_lock(&w->lock);
324
325
4/4
✓ Branch 0 taken 299 times.
✓ Branch 1 taken 242 times.
✓ Branch 2 taken 249 times.
✓ Branch 3 taken 50 times.
541 while (atomic_load(&w->choked) && !atomic_load(&sch->terminate))
326 249 pthread_cond_wait(&w->cond, &w->lock);
327
328 292 terminate = atomic_load(&sch->terminate);
329
330 292 pthread_mutex_unlock(&w->lock);
331
332 292 return terminate;
333 }
334
335 30665 static void waiter_set(SchWaiter *w, int choked)
336 {
337 30665 pthread_mutex_lock(&w->lock);
338
339 30665 atomic_store(&w->choked, choked);
340 30665 pthread_cond_signal(&w->cond);
341
342 30665 pthread_mutex_unlock(&w->lock);
343 30665 }
344
345 13208 static int waiter_init(SchWaiter *w)
346 {
347 int ret;
348
349 13208 atomic_init(&w->choked, 0);
350
351 13208 ret = pthread_mutex_init(&w->lock, NULL);
352
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13208 times.
13208 if (ret)
353 return AVERROR(ret);
354
355 13208 ret = pthread_cond_init(&w->cond, NULL);
356
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13208 times.
13208 if (ret)
357 return AVERROR(ret);
358
359 13208 return 0;
360 }
361
362 13208 static void waiter_uninit(SchWaiter *w)
363 {
364 13208 pthread_mutex_destroy(&w->lock);
365 13208 pthread_cond_destroy(&w->cond);
366 13208 }
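
A minimal sketch of how a source task consumes the waiter, using only the
functions above; the demuxing body is hypothetical. On the other side,
schedule_update_locked() flips the flag through waiter_set(), and only when it
actually changes:

    static void *example_source_thread(void *arg)
    {
        SchDemux *d = arg;
        Scheduler *sch = d->task.parent;

        while (1) {
            // blocks while choked; non-zero means termination was requested
            if (waiter_wait(sch, &d->waiter))
                break;

            // ... demux one packet and send it downstream (hypothetical) ...
        }
        return NULL;
    }
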
367
368 26071 static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size,
369 enum QueueType type)
370 {
371 ThreadQueue *tq;
372 ObjPool *op;
373
374
1/2
✓ Branch 0 taken 26071 times.
✗ Branch 1 not taken.
26071 if (queue_size <= 0) {
375
2/2
✓ Branch 0 taken 12886 times.
✓ Branch 1 taken 13185 times.
26071 if (type == QUEUE_FRAMES)
376 12886 queue_size = DEFAULT_FRAME_THREAD_QUEUE_SIZE;
377 else
378 13185 queue_size = DEFAULT_PACKET_THREAD_QUEUE_SIZE;
379 }
380
381
2/2
✓ Branch 0 taken 12886 times.
✓ Branch 1 taken 13185 times.
26071 if (type == QUEUE_FRAMES) {
382 // This queue length is used in the decoder code to ensure that
383 // there are enough entries in fixed-size frame pools to account
384 // for frames held in queues inside the ffmpeg utility. If this
385 // can ever dynamically change then the corresponding decode
386 // code needs to be updated as well.
387
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12886 times.
12886 av_assert0(queue_size == DEFAULT_FRAME_THREAD_QUEUE_SIZE);
388 }
389
390
2/2
✓ Branch 0 taken 13185 times.
✓ Branch 1 taken 12886 times.
26071 op = (type == QUEUE_PACKETS) ? objpool_alloc_packets() :
391 12886 objpool_alloc_frames();
392
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 26071 times.
26071 if (!op)
393 return AVERROR(ENOMEM);
394
395
2/2
✓ Branch 0 taken 13185 times.
✓ Branch 1 taken 12886 times.
26071 tq = tq_alloc(nb_streams, queue_size, op,
396 (type == QUEUE_PACKETS) ? pkt_move : frame_move);
397
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 26071 times.
26071 if (!tq) {
398 objpool_free(&op);
399 return AVERROR(ENOMEM);
400 }
401
402 26071 *ptq = tq;
403 26071 return 0;
404 }
405
406 static void *task_wrapper(void *arg);
407
408 32866 static int task_start(SchTask *task)
409 {
410 int ret;
411
412 32866 av_log(task->func_arg, AV_LOG_VERBOSE, "Starting thread...\n");
413
414
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 32866 times.
32866 av_assert0(!task->thread_running);
415
416 32866 ret = pthread_create(&task->thread, NULL, task_wrapper, task);
417
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 32866 times.
32866 if (ret) {
418 av_log(task->func_arg, AV_LOG_ERROR, "pthread_create() failed: %s\n",
419 strerror(ret));
420 return AVERROR(ret);
421 }
422
423 32866 task->thread_running = 1;
424 32866 return 0;
425 }
426
427 32880 static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx,
428 SchThreadFunc func, void *func_arg)
429 {
430 32880 task->parent = sch;
431
432 32880 task->node.type = type;
433 32880 task->node.idx = idx;
434
435 32880 task->func = func;
436 32880 task->func_arg = func_arg;
437 32880 }
438
439 507003 static int64_t trailing_dts(const Scheduler *sch, int count_finished)
440 {
441 507003 int64_t min_dts = INT64_MAX;
442
443
2/2
✓ Branch 0 taken 507258 times.
✓ Branch 1 taken 493153 times.
1000411 for (unsigned i = 0; i < sch->nb_mux; i++) {
444 507258 const SchMux *mux = &sch->mux[i];
445
446
2/2
✓ Branch 0 taken 544828 times.
✓ Branch 1 taken 493408 times.
1038236 for (unsigned j = 0; j < mux->nb_streams; j++) {
447 544828 const SchMuxStream *ms = &mux->streams[j];
448
449
4/4
✓ Branch 0 taken 36956 times.
✓ Branch 1 taken 507872 times.
✓ Branch 2 taken 29860 times.
✓ Branch 3 taken 7096 times.
544828 if (ms->source_finished && !count_finished)
450 29860 continue;
451
2/2
✓ Branch 0 taken 13850 times.
✓ Branch 1 taken 501118 times.
514968 if (ms->last_dts == AV_NOPTS_VALUE)
452 13850 return AV_NOPTS_VALUE;
453
454 501118 min_dts = FFMIN(min_dts, ms->last_dts);
455 }
456 }
457
458
2/2
✓ Branch 0 taken 468457 times.
✓ Branch 1 taken 24696 times.
493153 return min_dts == INT64_MAX ? AV_NOPTS_VALUE : min_dts;
459 }
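
A worked example: with two muxed streams whose last_dts are 1 000 000 and
1 150 000 (AV_TIME_BASE units), trailing_dts() returns 1 000 000. In
schedule_update_locked() below, the stream running 150 000 us ahead meets the
last_dts - dts >= SCHEDULE_TOLERANCE test (150 000 >= 100 000) and its source
stays choked, while the trailing stream's source is unchoked until it catches
up.
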
460
461 6779 void sch_free(Scheduler **psch)
462 {
463 6779 Scheduler *sch = *psch;
464
465
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6779 times.
6779 if (!sch)
466 return;
467
468 6779 sch_stop(sch, NULL);
469
470
2/2
✓ Branch 0 taken 6809 times.
✓ Branch 1 taken 6779 times.
13588 for (unsigned i = 0; i < sch->nb_demux; i++) {
471 6809 SchDemux *d = &sch->demux[i];
472
473
2/2
✓ Branch 0 taken 7026 times.
✓ Branch 1 taken 6809 times.
13835 for (unsigned j = 0; j < d->nb_streams; j++) {
474 7026 SchDemuxStream *ds = &d->streams[j];
475 7026 av_freep(&ds->dst);
476 7026 av_freep(&ds->dst_finished);
477 }
478 6809 av_freep(&d->streams);
479
480 6809 av_packet_free(&d->send_pkt);
481
482 6809 waiter_uninit(&d->waiter);
483 }
484 6779 av_freep(&sch->demux);
485
486
2/2
✓ Branch 0 taken 6780 times.
✓ Branch 1 taken 6779 times.
13559 for (unsigned i = 0; i < sch->nb_mux; i++) {
487 6780 SchMux *mux = &sch->mux[i];
488
489
2/2
✓ Branch 0 taken 7132 times.
✓ Branch 1 taken 6780 times.
13912 for (unsigned j = 0; j < mux->nb_streams; j++) {
490 7132 SchMuxStream *ms = &mux->streams[j];
491
492
1/2
✓ Branch 0 taken 7132 times.
✗ Branch 1 not taken.
7132 if (ms->pre_mux_queue.fifo) {
493 AVPacket *pkt;
494
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 7132 times.
7132 while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0)
495 av_packet_free(&pkt);
496 7132 av_fifo_freep2(&ms->pre_mux_queue.fifo);
497 }
498
499 7132 av_freep(&ms->sub_heartbeat_dst);
500 }
501 6780 av_freep(&mux->streams);
502
503 6780 av_packet_free(&mux->sub_heartbeat_pkt);
504
505 6780 tq_free(&mux->queue);
506 }
507 6779 av_freep(&sch->mux);
508
509
2/2
✓ Branch 0 taken 6405 times.
✓ Branch 1 taken 6779 times.
13184 for (unsigned i = 0; i < sch->nb_dec; i++) {
510 6405 SchDec *dec = &sch->dec[i];
511
512 6405 tq_free(&dec->queue);
513
514 6405 av_thread_message_queue_free(&dec->queue_end_ts);
515
516 6405 av_freep(&dec->dst);
517 6405 av_freep(&dec->dst_finished);
518
519 6405 av_frame_free(&dec->send_frame);
520 }
521 6779 av_freep(&sch->dec);
522
523
2/2
✓ Branch 0 taken 6487 times.
✓ Branch 1 taken 6779 times.
13266 for (unsigned i = 0; i < sch->nb_enc; i++) {
524 6487 SchEnc *enc = &sch->enc[i];
525
526 6487 tq_free(&enc->queue);
527
528 6487 av_packet_free(&enc->send_pkt);
529
530 6487 av_freep(&enc->dst);
531 6487 av_freep(&enc->dst_finished);
532 }
533 6779 av_freep(&sch->enc);
534
535
2/2
✓ Branch 0 taken 2776 times.
✓ Branch 1 taken 6779 times.
9555 for (unsigned i = 0; i < sch->nb_sq_enc; i++) {
536 2776 SchSyncQueue *sq = &sch->sq_enc[i];
537 2776 sq_free(&sq->sq);
538 2776 av_frame_free(&sq->frame);
539 2776 pthread_mutex_destroy(&sq->lock);
540 2776 av_freep(&sq->enc_idx);
541 }
542 6779 av_freep(&sch->sq_enc);
543
544
2/2
✓ Branch 0 taken 6399 times.
✓ Branch 1 taken 6779 times.
13178 for (unsigned i = 0; i < sch->nb_filters; i++) {
545 6399 SchFilterGraph *fg = &sch->filters[i];
546
547 6399 tq_free(&fg->queue);
548
549 6399 av_freep(&fg->inputs);
550 6399 av_freep(&fg->outputs);
551
552 6399 waiter_uninit(&fg->waiter);
553 }
554 6779 av_freep(&sch->filters);
555
556 6779 av_freep(&sch->sdp_filename);
557
558 6779 pthread_mutex_destroy(&sch->schedule_lock);
559
560 6779 pthread_mutex_destroy(&sch->mux_ready_lock);
561
562 6779 pthread_mutex_destroy(&sch->mux_done_lock);
563 6779 pthread_cond_destroy(&sch->mux_done_cond);
564
565 6779 av_freep(psch);
566 }
567
568 static const AVClass scheduler_class = {
569 .class_name = "Scheduler",
570 .version = LIBAVUTIL_VERSION_INT,
571 };
572
573 6779 Scheduler *sch_alloc(void)
574 {
575 Scheduler *sch;
576 int ret;
577
578 6779 sch = av_mallocz(sizeof(*sch));
579
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6779 times.
6779 if (!sch)
580 return NULL;
581
582 6779 sch->class = &scheduler_class;
583 6779 sch->sdp_auto = 1;
584
585 6779 ret = pthread_mutex_init(&sch->schedule_lock, NULL);
586
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6779 times.
6779 if (ret)
587 goto fail;
588
589 6779 ret = pthread_mutex_init(&sch->mux_ready_lock, NULL);
590
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6779 times.
6779 if (ret)
591 goto fail;
592
593 6779 ret = pthread_mutex_init(&sch->mux_done_lock, NULL);
594
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6779 times.
6779 if (ret)
595 goto fail;
596
597 6779 ret = pthread_cond_init(&sch->mux_done_cond, NULL);
598
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6779 times.
6779 if (ret)
599 goto fail;
600
601 6779 return sch;
602 fail:
603 sch_free(&sch);
604 return NULL;
605 }
606
607 int sch_sdp_filename(Scheduler *sch, const char *sdp_filename)
608 {
609 av_freep(&sch->sdp_filename);
610 sch->sdp_filename = av_strdup(sdp_filename);
611 return sch->sdp_filename ? 0 : AVERROR(ENOMEM);
612 }
613
614 static const AVClass sch_mux_class = {
615 .class_name = "SchMux",
616 .version = LIBAVUTIL_VERSION_INT,
617 .parent_log_context_offset = offsetof(SchMux, task.func_arg),
618 };
619
620 6780 int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
621 void *arg, int sdp_auto, unsigned thread_queue_size)
622 {
623 6780 const unsigned idx = sch->nb_mux;
624
625 SchMux *mux;
626 int ret;
627
628 6780 ret = GROW_ARRAY(sch->mux, sch->nb_mux);
629
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6780 times.
6780 if (ret < 0)
630 return ret;
631
632 6780 mux = &sch->mux[idx];
633 6780 mux->class = &sch_mux_class;
634 6780 mux->init = init;
635 6780 mux->queue_size = thread_queue_size;
636
637 6780 task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg);
638
639 6780 sch->sdp_auto &= sdp_auto;
640
641 6780 return idx;
642 }
643
644 7132 int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx)
645 {
646 SchMux *mux;
647 SchMuxStream *ms;
648 unsigned stream_idx;
649 int ret;
650
651
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7132 times.
7132 av_assert0(mux_idx < sch->nb_mux);
652 7132 mux = &sch->mux[mux_idx];
653
654 7132 ret = GROW_ARRAY(mux->streams, mux->nb_streams);
655
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7132 times.
7132 if (ret < 0)
656 return ret;
657 7132 stream_idx = mux->nb_streams - 1;
658
659 7132 ms = &mux->streams[stream_idx];
660
661 7132 ms->pre_mux_queue.fifo = av_fifo_alloc2(8, sizeof(AVPacket*), 0);
662
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7132 times.
7132 if (!ms->pre_mux_queue.fifo)
663 return AVERROR(ENOMEM);
664
665 7132 ms->last_dts = AV_NOPTS_VALUE;
666
667 7132 return stream_idx;
668 }
669
670 static const AVClass sch_demux_class = {
671 .class_name = "SchDemux",
672 .version = LIBAVUTIL_VERSION_INT,
673 .parent_log_context_offset = offsetof(SchDemux, task.func_arg),
674 };
675
676 6809 int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *ctx)
677 {
678 6809 const unsigned idx = sch->nb_demux;
679
680 SchDemux *d;
681 int ret;
682
683 6809 ret = GROW_ARRAY(sch->demux, sch->nb_demux);
684
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6809 times.
6809 if (ret < 0)
685 return ret;
686
687 6809 d = &sch->demux[idx];
688
689 6809 task_init(sch, &d->task, SCH_NODE_TYPE_DEMUX, idx, func, ctx);
690
691 6809 d->class = &sch_demux_class;
692 6809 d->send_pkt = av_packet_alloc();
693
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6809 times.
6809 if (!d->send_pkt)
694 return AVERROR(ENOMEM);
695
696 6809 ret = waiter_init(&d->waiter);
697
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6809 times.
6809 if (ret < 0)
698 return ret;
699
700 6809 return idx;
701 }
702
703 7026 int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
704 {
705 SchDemux *d;
706 int ret;
707
708
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7026 times.
7026 av_assert0(demux_idx < sch->nb_demux);
709 7026 d = &sch->demux[demux_idx];
710
711 7026 ret = GROW_ARRAY(d->streams, d->nb_streams);
712
1/2
✓ Branch 0 taken 7026 times.
✗ Branch 1 not taken.
7026 return ret < 0 ? ret : d->nb_streams - 1;
713 }
714
715 static const AVClass sch_dec_class = {
716 .class_name = "SchDec",
717 .version = LIBAVUTIL_VERSION_INT,
718 .parent_log_context_offset = offsetof(SchDec, task.func_arg),
719 };
720
721 6405 int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx,
722 int send_end_ts)
723 {
724 6405 const unsigned idx = sch->nb_dec;
725
726 SchDec *dec;
727 int ret;
728
729 6405 ret = GROW_ARRAY(sch->dec, sch->nb_dec);
730
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6405 times.
6405 if (ret < 0)
731 return ret;
732
733 6405 dec = &sch->dec[idx];
734
735 6405 task_init(sch, &dec->task, SCH_NODE_TYPE_DEC, idx, func, ctx);
736
737 6405 dec->class = &sch_dec_class;
738 6405 dec->send_frame = av_frame_alloc();
739
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6405 times.
6405 if (!dec->send_frame)
740 return AVERROR(ENOMEM);
741
742 6405 ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS);
743
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6405 times.
6405 if (ret < 0)
744 return ret;
745
746
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 6404 times.
6405 if (send_end_ts) {
747 1 ret = av_thread_message_queue_alloc(&dec->queue_end_ts, 1, sizeof(Timestamp));
748
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (ret < 0)
749 return ret;
750 }
751
752 6405 return idx;
753 }
754
755 static const AVClass sch_enc_class = {
756 .class_name = "SchEnc",
757 .version = LIBAVUTIL_VERSION_INT,
758 .parent_log_context_offset = offsetof(SchEnc, task.func_arg),
759 };
760
761 6487 int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx,
762 int (*open_cb)(void *opaque, const AVFrame *frame))
763 {
764 6487 const unsigned idx = sch->nb_enc;
765
766 SchEnc *enc;
767 int ret;
768
769 6487 ret = GROW_ARRAY(sch->enc, sch->nb_enc);
770
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6487 times.
6487 if (ret < 0)
771 return ret;
772
773 6487 enc = &sch->enc[idx];
774
775 6487 enc->class = &sch_enc_class;
776 6487 enc->open_cb = open_cb;
777 6487 enc->sq_idx[0] = -1;
778 6487 enc->sq_idx[1] = -1;
779
780 6487 task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx);
781
782 6487 enc->send_pkt = av_packet_alloc();
783
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6487 times.
6487 if (!enc->send_pkt)
784 return AVERROR(ENOMEM);
785
786 6487 ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES);
787
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6487 times.
6487 if (ret < 0)
788 return ret;
789
790 6487 return idx;
791 }
792
793 static const AVClass sch_fg_class = {
794 .class_name = "SchFilterGraph",
795 .version = LIBAVUTIL_VERSION_INT,
796 .parent_log_context_offset = offsetof(SchFilterGraph, task.func_arg),
797 };
798
799 6399 int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
800 SchThreadFunc func, void *ctx)
801 {
802 6399 const unsigned idx = sch->nb_filters;
803
804 SchFilterGraph *fg;
805 int ret;
806
807 6399 ret = GROW_ARRAY(sch->filters, sch->nb_filters);
808
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6399 times.
6399 if (ret < 0)
809 return ret;
810 6399 fg = &sch->filters[idx];
811
812 6399 fg->class = &sch_fg_class;
813
814 6399 task_init(sch, &fg->task, SCH_NODE_TYPE_FILTER_IN, idx, func, ctx);
815
816
2/2
✓ Branch 0 taken 6343 times.
✓ Branch 1 taken 56 times.
6399 if (nb_inputs) {
817 6343 fg->inputs = av_calloc(nb_inputs, sizeof(*fg->inputs));
818
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6343 times.
6343 if (!fg->inputs)
819 return AVERROR(ENOMEM);
820 6343 fg->nb_inputs = nb_inputs;
821 }
822
823
1/2
✓ Branch 0 taken 6399 times.
✗ Branch 1 not taken.
6399 if (nb_outputs) {
824 6399 fg->outputs = av_calloc(nb_outputs, sizeof(*fg->outputs));
825
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6399 times.
6399 if (!fg->outputs)
826 return AVERROR(ENOMEM);
827 6399 fg->nb_outputs = nb_outputs;
828 }
829
830 6399 ret = waiter_init(&fg->waiter);
831
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6399 times.
6399 if (ret < 0)
832 return ret;
833
834 6399 ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES);
835
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6399 times.
6399 if (ret < 0)
836 return ret;
837
838 6399 return idx;
839 }
840
841 2776 int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx)
842 {
843 SchSyncQueue *sq;
844 int ret;
845
846 2776 ret = GROW_ARRAY(sch->sq_enc, sch->nb_sq_enc);
847
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2776 times.
2776 if (ret < 0)
848 return ret;
849 2776 sq = &sch->sq_enc[sch->nb_sq_enc - 1];
850
851 2776 sq->sq = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, logctx);
852
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2776 times.
2776 if (!sq->sq)
853 return AVERROR(ENOMEM);
854
855 2776 sq->frame = av_frame_alloc();
856
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2776 times.
2776 if (!sq->frame)
857 return AVERROR(ENOMEM);
858
859 2776 ret = pthread_mutex_init(&sq->lock, NULL);
860
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2776 times.
2776 if (ret)
861 return AVERROR(ret);
862
863 2776 return sq - sch->sq_enc;
864 }
865
866 2821 int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx,
867 int limiting, uint64_t max_frames)
868 {
869 SchSyncQueue *sq;
870 SchEnc *enc;
871 int ret;
872
873
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2821 times.
2821 av_assert0(sq_idx < sch->nb_sq_enc);
874 2821 sq = &sch->sq_enc[sq_idx];
875
876
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2821 times.
2821 av_assert0(enc_idx < sch->nb_enc);
877 2821 enc = &sch->enc[enc_idx];
878
879 2821 ret = GROW_ARRAY(sq->enc_idx, sq->nb_enc_idx);
880
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2821 times.
2821 if (ret < 0)
881 return ret;
882 2821 sq->enc_idx[sq->nb_enc_idx - 1] = enc_idx;
883
884 2821 ret = sq_add_stream(sq->sq, limiting);
885
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2821 times.
2821 if (ret < 0)
886 return ret;
887
888 2821 enc->sq_idx[0] = sq_idx;
889 2821 enc->sq_idx[1] = ret;
890
891
2/2
✓ Branch 0 taken 2651 times.
✓ Branch 1 taken 170 times.
2821 if (max_frames != INT64_MAX)
892 2651 sq_limit_frames(sq->sq, enc->sq_idx[1], max_frames);
893
894 2821 return 0;
895 }
896
897 26437 int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst)
898 {
899 int ret;
900
901
4/5
✓ Branch 0 taken 7049 times.
✓ Branch 1 taken 6451 times.
✓ Branch 2 taken 6449 times.
✓ Branch 3 taken 6488 times.
✗ Branch 4 not taken.
26437 switch (src.type) {
902 7049 case SCH_NODE_TYPE_DEMUX: {
903 SchDemuxStream *ds;
904
905
2/4
✓ Branch 0 taken 7049 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 7049 times.
7049 av_assert0(src.idx < sch->nb_demux &&
906 src.idx_stream < sch->demux[src.idx].nb_streams);
907 7049 ds = &sch->demux[src.idx].streams[src.idx_stream];
908
909 7049 ret = GROW_ARRAY(ds->dst, ds->nb_dst);
910
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7049 times.
7049 if (ret < 0)
911 return ret;
912
913 7049 ds->dst[ds->nb_dst - 1] = dst;
914
915 // demuxed packets go to decoding or streamcopy
916
2/3
✓ Branch 0 taken 6404 times.
✓ Branch 1 taken 645 times.
✗ Branch 2 not taken.
7049 switch (dst.type) {
917 6404 case SCH_NODE_TYPE_DEC: {
918 SchDec *dec;
919
920
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6404 times.
6404 av_assert0(dst.idx < sch->nb_dec);
921 6404 dec = &sch->dec[dst.idx];
922
923
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6404 times.
6404 av_assert0(!dec->src.type);
924 6404 dec->src = src;
925 6404 break;
926 }
927 645 case SCH_NODE_TYPE_MUX: {
928 SchMuxStream *ms;
929
930
2/4
✓ Branch 0 taken 645 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 645 times.
645 av_assert0(dst.idx < sch->nb_mux &&
931 dst.idx_stream < sch->mux[dst.idx].nb_streams);
932 645 ms = &sch->mux[dst.idx].streams[dst.idx_stream];
933
934
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 645 times.
645 av_assert0(!ms->src.type);
935 645 ms->src = src;
936
937 645 break;
938 }
939 default: av_assert0(0);
940 }
941
942 7049 break;
943 }
944 6451 case SCH_NODE_TYPE_DEC: {
945 SchDec *dec;
946
947
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6451 times.
6451 av_assert0(src.idx < sch->nb_dec);
948 6451 dec = &sch->dec[src.idx];
949
950 6451 ret = GROW_ARRAY(dec->dst, dec->nb_dst);
951
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6451 times.
6451 if (ret < 0)
952 return ret;
953
954 6451 dec->dst[dec->nb_dst - 1] = dst;
955
956 // decoded frames go to filters or encoding
957
2/3
✓ Branch 0 taken 6413 times.
✓ Branch 1 taken 38 times.
✗ Branch 2 not taken.
6451 switch (dst.type) {
958 6413 case SCH_NODE_TYPE_FILTER_IN: {
959 SchFilterIn *fi;
960
961
2/4
✓ Branch 0 taken 6413 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6413 times.
6413 av_assert0(dst.idx < sch->nb_filters &&
962 dst.idx_stream < sch->filters[dst.idx].nb_inputs);
963 6413 fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
964
965
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6413 times.
6413 av_assert0(!fi->src.type);
966 6413 fi->src = src;
967 6413 break;
968 }
969 38 case SCH_NODE_TYPE_ENC: {
970 SchEnc *enc;
971
972
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 38 times.
38 av_assert0(dst.idx < sch->nb_enc);
973 38 enc = &sch->enc[dst.idx];
974
975
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 38 times.
38 av_assert0(!enc->src.type);
976 38 enc->src = src;
977 38 break;
978 }
979 default: av_assert0(0);
980 }
981
982 6451 break;
983 }
984 6449 case SCH_NODE_TYPE_FILTER_OUT: {
985 SchFilterOut *fo;
986
987
2/4
✓ Branch 0 taken 6449 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6449 times.
6449 av_assert0(src.idx < sch->nb_filters &&
988 src.idx_stream < sch->filters[src.idx].nb_outputs);
989 6449 fo = &sch->filters[src.idx].outputs[src.idx_stream];
990
991
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6449 times.
6449 av_assert0(!fo->dst.type);
992 6449 fo->dst = dst;
993
994 // filtered frames go to encoding or another filtergraph
995
1/3
✓ Branch 0 taken 6449 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
6449 switch (dst.type) {
996 6449 case SCH_NODE_TYPE_ENC: {
997 SchEnc *enc;
998
999
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6449 times.
6449 av_assert0(dst.idx < sch->nb_enc);
1000 6449 enc = &sch->enc[dst.idx];
1001
1002
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6449 times.
6449 av_assert0(!enc->src.type);
1003 6449 enc->src = src;
1004 6449 break;
1005 }
1006 case SCH_NODE_TYPE_FILTER_IN: {
1007 SchFilterIn *fi;
1008
1009 av_assert0(dst.idx < sch->nb_filters &&
1010 dst.idx_stream < sch->filters[dst.idx].nb_inputs);
1011 fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
1012
1013 av_assert0(!fi->src.type);
1014 fi->src = src;
1015 break;
1016 }
1017 default: av_assert0(0);
1018 }
1019
1020
1021 6449 break;
1022 }
1023 6488 case SCH_NODE_TYPE_ENC: {
1024 SchEnc *enc;
1025
1026
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6488 times.
6488 av_assert0(src.idx < sch->nb_enc);
1027 6488 enc = &sch->enc[src.idx];
1028
1029 6488 ret = GROW_ARRAY(enc->dst, enc->nb_dst);
1030
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6488 times.
6488 if (ret < 0)
1031 return ret;
1032
1033 6488 enc->dst[enc->nb_dst - 1] = dst;
1034
1035 // encoding packets go to muxing or decoding
1036
2/3
✓ Branch 0 taken 6487 times.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
6488 switch (dst.type) {
1037 6487 case SCH_NODE_TYPE_MUX: {
1038 SchMuxStream *ms;
1039
1040
2/4
✓ Branch 0 taken 6487 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6487 times.
6487 av_assert0(dst.idx < sch->nb_mux &&
1041 dst.idx_stream < sch->mux[dst.idx].nb_streams);
1042 6487 ms = &sch->mux[dst.idx].streams[dst.idx_stream];
1043
1044
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6487 times.
6487 av_assert0(!ms->src.type);
1045 6487 ms->src = src;
1046
1047 6487 break;
1048 }
1049 1 case SCH_NODE_TYPE_DEC: {
1050 SchDec *dec;
1051
1052
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(dst.idx < sch->nb_dec);
1053 1 dec = &sch->dec[dst.idx];
1054
1055
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(!dec->src.type);
1056 1 dec->src = src;
1057
1058 1 break;
1059 }
1060 default: av_assert0(0);
1061 }
1062
1063 6488 break;
1064 }
1065 default: av_assert0(0);
1066 }
1067
1068 26437 return 0;
1069 }
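
A sketch of wiring a single demuxer -> decoder -> encoder -> muxer chain
through the API above. Thread functions, contexts and example_open_cb are
hypothetical, error handling is elided, and designated initializers stand in
for whatever helper macros ffmpeg_sched.h may provide:

    int di  = sch_add_demux(sch, demux_thread, demux_ctx);
    int ds  = sch_add_demux_stream(sch, di);
    int dec = sch_add_dec(sch, dec_thread, dec_ctx, 0);
    int enc = sch_add_enc(sch, enc_thread, enc_ctx, example_open_cb);
    int mi  = sch_add_mux(sch, mux_thread, mux_init_cb, mux_ctx, 0, 0);
    int ms  = sch_add_mux_stream(sch, mi);

    sch_connect(sch, (SchedulerNode){ .type = SCH_NODE_TYPE_DEMUX, .idx = di,  .idx_stream = ds },
                     (SchedulerNode){ .type = SCH_NODE_TYPE_DEC,   .idx = dec });
    sch_connect(sch, (SchedulerNode){ .type = SCH_NODE_TYPE_DEC,   .idx = dec },
                     (SchedulerNode){ .type = SCH_NODE_TYPE_ENC,   .idx = enc });
    sch_connect(sch, (SchedulerNode){ .type = SCH_NODE_TYPE_ENC,   .idx = enc },
                     (SchedulerNode){ .type = SCH_NODE_TYPE_MUX,   .idx = mi,  .idx_stream = ms });
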
1070
1071 6780 static int mux_task_start(SchMux *mux)
1072 {
1073 6780 int ret = 0;
1074
1075 6780 ret = task_start(&mux->task);
1076
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6780 times.
6780 if (ret < 0)
1077 return ret;
1078
1079 /* flush the pre-muxing queues */
1080
2/2
✓ Branch 0 taken 7132 times.
✓ Branch 1 taken 6780 times.
13912 for (unsigned i = 0; i < mux->nb_streams; i++) {
1081 7132 SchMuxStream *ms = &mux->streams[i];
1082 AVPacket *pkt;
1083
1084
2/2
✓ Branch 1 taken 3167 times.
✓ Branch 2 taken 7132 times.
10299 while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0) {
1085
2/2
✓ Branch 0 taken 3114 times.
✓ Branch 1 taken 53 times.
3167 if (pkt) {
1086
2/2
✓ Branch 0 taken 3097 times.
✓ Branch 1 taken 17 times.
3114 if (!ms->init_eof)
1087 3097 ret = tq_send(mux->queue, i, pkt);
1088 3114 av_packet_free(&pkt);
1089
2/2
✓ Branch 0 taken 18 times.
✓ Branch 1 taken 3096 times.
3114 if (ret == AVERROR_EOF)
1090 18 ms->init_eof = 1;
1091
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3096 times.
3096 else if (ret < 0)
1092 return ret;
1093 } else
1094 53 tq_send_finish(mux->queue, i);
1095 }
1096 }
1097
1098 6780 atomic_store(&mux->mux_started, 1);
1099
1100 6780 return 0;
1101 }
1102
1103 int print_sdp(const char *filename);
1104
1105 6780 static int mux_init(Scheduler *sch, SchMux *mux)
1106 {
1107 int ret;
1108
1109 6780 ret = mux->init(mux->task.func_arg);
1110
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6780 times.
6780 if (ret < 0)
1111 return ret;
1112
1113 6780 sch->nb_mux_ready++;
1114
1115
2/4
✓ Branch 0 taken 6780 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6780 times.
6780 if (sch->sdp_filename || sch->sdp_auto) {
1116 if (sch->nb_mux_ready < sch->nb_mux)
1117 return 0;
1118
1119 ret = print_sdp(sch->sdp_filename);
1120 if (ret < 0) {
1121 av_log(sch, AV_LOG_ERROR, "Error writing the SDP.\n");
1122 return ret;
1123 }
1124
1125 /* SDP is written only after all the muxers are ready, so now we
1126 * start ALL the threads */
1127 for (unsigned i = 0; i < sch->nb_mux; i++) {
1128 ret = mux_task_start(&sch->mux[i]);
1129 if (ret < 0)
1130 return ret;
1131 }
1132 } else {
1133 6780 ret = mux_task_start(mux);
1134
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6780 times.
6780 if (ret < 0)
1135 return ret;
1136 }
1137
1138 6780 return 0;
1139 }
1140
1141 7132 void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1142 size_t data_threshold, int max_packets)
1143 {
1144 SchMux *mux;
1145 SchMuxStream *ms;
1146
1147
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7132 times.
7132 av_assert0(mux_idx < sch->nb_mux);
1148 7132 mux = &sch->mux[mux_idx];
1149
1150
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7132 times.
7132 av_assert0(stream_idx < mux->nb_streams);
1151 7132 ms = &mux->streams[stream_idx];
1152
1153 7132 ms->pre_mux_queue.max_packets = max_packets;
1154 7132 ms->pre_mux_queue.data_threshold = data_threshold;
1155 7132 }
1156
1157 7132 int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
1158 {
1159 SchMux *mux;
1160 7132 int ret = 0;
1161
1162
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7132 times.
7132 av_assert0(mux_idx < sch->nb_mux);
1163 7132 mux = &sch->mux[mux_idx];
1164
1165
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7132 times.
7132 av_assert0(stream_idx < mux->nb_streams);
1166
1167 7132 pthread_mutex_lock(&sch->mux_ready_lock);
1168
1169
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7132 times.
7132 av_assert0(mux->nb_streams_ready < mux->nb_streams);
1170
1171 // this may be called during initialization - do not start
1172 // threads before sch_start() is called
1173
2/2
✓ Branch 0 taken 6780 times.
✓ Branch 1 taken 352 times.
7132 if (++mux->nb_streams_ready == mux->nb_streams &&
1174
2/2
✓ Branch 0 taken 6332 times.
✓ Branch 1 taken 448 times.
6780 sch->state >= SCH_STATE_STARTED)
1175 6332 ret = mux_init(sch, mux);
1176
1177 7132 pthread_mutex_unlock(&sch->mux_ready_lock);
1178
1179 7132 return ret;
1180 }
1181
1182 1 int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1183 unsigned dec_idx)
1184 {
1185 SchMux *mux;
1186 SchMuxStream *ms;
1187 1 int ret = 0;
1188
1189
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(mux_idx < sch->nb_mux);
1190 1 mux = &sch->mux[mux_idx];
1191
1192
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(stream_idx < mux->nb_streams);
1193 1 ms = &mux->streams[stream_idx];
1194
1195 1 ret = GROW_ARRAY(ms->sub_heartbeat_dst, ms->nb_sub_heartbeat_dst);
1196
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (ret < 0)
1197 return ret;
1198
1199
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(dec_idx < sch->nb_dec);
1200 1 ms->sub_heartbeat_dst[ms->nb_sub_heartbeat_dst - 1] = dec_idx;
1201
1202
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (!mux->sub_heartbeat_pkt) {
1203 1 mux->sub_heartbeat_pkt = av_packet_alloc();
1204
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (!mux->sub_heartbeat_pkt)
1205 return AVERROR(ENOMEM);
1206 }
1207
1208 1 return 0;
1209 }
1210
1211 492368 static void unchoke_for_stream(Scheduler *sch, SchedulerNode src)
1212 {
1213 424495 while (1) {
1214 SchFilterGraph *fg;
1215
1216 // fed directly by a demuxer (i.e. not through a filtergraph)
1217
2/2
✓ Branch 0 taken 490344 times.
✓ Branch 1 taken 426519 times.
916863 if (src.type == SCH_NODE_TYPE_DEMUX) {
1218 490344 sch->demux[src.idx].waiter.choked_next = 0;
1219 490344 return;
1220 }
1221
1222
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 426519 times.
426519 av_assert0(src.type == SCH_NODE_TYPE_FILTER_OUT);
1223 426519 fg = &sch->filters[src.idx];
1224
1225 // the filtergraph contains internal sources and
1226 // has requested to be scheduled directly
1227
2/2
✓ Branch 0 taken 2024 times.
✓ Branch 1 taken 424495 times.
426519 if (fg->best_input == fg->nb_inputs) {
1228 2024 fg->waiter.choked_next = 0;
1229 2024 return;
1230 }
1231
1232 424495 src = fg->inputs[fg->best_input].src_sched;
1233 }
1234 }
1235
1236 503062 static void schedule_update_locked(Scheduler *sch)
1237 {
1238 int64_t dts;
1239 503062 int have_unchoked = 0;
1240
1241 // on termination request all waiters are choked,
1242 // we are not to unchoke them
1243
2/2
✓ Branch 0 taken 2837 times.
✓ Branch 1 taken 500225 times.
503062 if (atomic_load(&sch->terminate))
1244 2837 return;
1245
1246 500225 dts = trailing_dts(sch, 0);
1247
1248 500225 atomic_store(&sch->last_dts, dts);
1249
1250 // initialize our internal state
1251
2/2
✓ Branch 0 taken 1000450 times.
✓ Branch 1 taken 500225 times.
1500675 for (unsigned type = 0; type < 2; type++)
1252
4/4
✓ Branch 0 taken 949710 times.
✓ Branch 1 taken 1003842 times.
✓ Branch 2 taken 953102 times.
✓ Branch 3 taken 1000450 times.
1953552 for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1253
2/2
✓ Branch 0 taken 449485 times.
✓ Branch 1 taken 503617 times.
953102 SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1254 953102 w->choked_prev = atomic_load(&w->choked);
1255 953102 w->choked_next = 1;
1256 }
1257
1258 // figure out the sources that are allowed to proceed
1259
2/2
✓ Branch 0 taken 500491 times.
✓ Branch 1 taken 500225 times.
1000716 for (unsigned i = 0; i < sch->nb_mux; i++) {
1260 500491 SchMux *mux = &sch->mux[i];
1261
1262
2/2
✓ Branch 0 taken 543279 times.
✓ Branch 1 taken 500491 times.
1043770 for (unsigned j = 0; j < mux->nb_streams; j++) {
1263 543279 SchMuxStream *ms = &mux->streams[j];
1264
1265 // unblock sources for output streams that are not finished
1266 // and not too far ahead of the trailing stream
1267
2/2
✓ Branch 0 taken 30088 times.
✓ Branch 1 taken 513191 times.
543279 if (ms->source_finished)
1268 30088 continue;
1269
4/4
✓ Branch 0 taken 21635 times.
✓ Branch 1 taken 491556 times.
✓ Branch 2 taken 5262 times.
✓ Branch 3 taken 16373 times.
513191 if (dts == AV_NOPTS_VALUE && ms->last_dts != AV_NOPTS_VALUE)
1270 5262 continue;
1271
4/4
✓ Branch 0 taken 491556 times.
✓ Branch 1 taken 16373 times.
✓ Branch 2 taken 15561 times.
✓ Branch 3 taken 475995 times.
507929 if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE)
1272 15561 continue;
1273
1274 // resolve the source to unchoke
1275 492368 unchoke_for_stream(sch, ms->src_sched);
1276 492368 have_unchoked = 1;
1277 }
1278 }
1279
1280 // make sure to unchoke at least one source, if still available
1281
4/4
✓ Branch 0 taken 55705 times.
✓ Branch 1 taken 486938 times.
✓ Branch 2 taken 42418 times.
✓ Branch 3 taken 13287 times.
542643 for (unsigned type = 0; !have_unchoked && type < 2; type++)
1282
4/4
✓ Branch 0 taken 30449 times.
✓ Branch 1 taken 42635 times.
✓ Branch 2 taken 42075 times.
✓ Branch 3 taken 31009 times.
73084 for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1283
2/2
✓ Branch 0 taken 17162 times.
✓ Branch 1 taken 24913 times.
42075 int exited = type ? sch->filters[i].task_exited : sch->demux[i].task_exited;
1284
2/2
✓ Branch 0 taken 17162 times.
✓ Branch 1 taken 24913 times.
42075 SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1285
2/2
✓ Branch 0 taken 11409 times.
✓ Branch 1 taken 30666 times.
42075 if (!exited) {
1286 11409 w->choked_next = 0;
1287 11409 have_unchoked = 1;
1288 11409 break;
1289 }
1290 }
1291
1292
1293
2/2
✓ Branch 0 taken 1000450 times.
✓ Branch 1 taken 500225 times.
1500675 for (unsigned type = 0; type < 2; type++)
1294
4/4
✓ Branch 0 taken 949710 times.
✓ Branch 1 taken 1003842 times.
✓ Branch 2 taken 953102 times.
✓ Branch 3 taken 1000450 times.
1953552 for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1295
2/2
✓ Branch 0 taken 449485 times.
✓ Branch 1 taken 503617 times.
953102 SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1296
2/2
✓ Branch 0 taken 17457 times.
✓ Branch 1 taken 935645 times.
953102 if (w->choked_prev != w->choked_next)
1297 17457 waiter_set(w, w->choked_next);
1298 }
1299
1300 }
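
In summary, each update pass has three phases: first snapshot every
demuxer/filtergraph waiter and default it to choked; then unchoke the source
feeding each output stream that is neither finished nor SCHEDULE_TOLERANCE or
more ahead of the trailing dts; and if nothing qualified, unchoke one
arbitrary still-running source so the pipeline cannot stall. waiter_set() is
only called for waiters whose state actually changed.
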
1301
1302 enum {
1303 CYCLE_NODE_NEW = 0,
1304 CYCLE_NODE_STARTED,
1305 CYCLE_NODE_DONE,
1306 };
1307
1308 static int
1309 6399 check_acyclic_for_output(const Scheduler *sch, SchedulerNode src,
1310 uint8_t *filters_visited, SchedulerNode *filters_stack)
1311 {
1312 6399 unsigned nb_filters_stack = 0;
1313
1314 6399 memset(filters_visited, 0, sch->nb_filters * sizeof(*filters_visited));
1315
1316 6415 while (1) {
1317 12814 const SchFilterGraph *fg = &sch->filters[src.idx];
1318
1319 12814 filters_visited[src.idx] = CYCLE_NODE_STARTED;
1320
1321 // descend into every input, depth first
1322
2/2
✓ Branch 0 taken 6414 times.
✓ Branch 1 taken 6400 times.
12814 if (src.idx_stream < fg->nb_inputs) {
1323 6414 const SchFilterIn *fi = &fg->inputs[src.idx_stream++];
1324
1325 // connected to demuxer, no cycles possible
1326
2/2
✓ Branch 0 taken 6413 times.
✓ Branch 1 taken 1 times.
6414 if (fi->src_sched.type == SCH_NODE_TYPE_DEMUX)
1327 6414 continue;
1328
1329 // otherwise connected to another filtergraph
1330
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(fi->src_sched.type == SCH_NODE_TYPE_FILTER_OUT);
1331
1332 // found a cycle
1333
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (filters_visited[fi->src_sched.idx] == CYCLE_NODE_STARTED)
1334 return AVERROR(EINVAL);
1335
1336 // place current position on stack and descend
1337
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(nb_filters_stack < sch->nb_filters);
1338 1 filters_stack[nb_filters_stack++] = src;
1339 1 src = (SchedulerNode){ .idx = fi->src_sched.idx, .idx_stream = 0 };
1340 1 continue;
1341 }
1342
1343 6400 filters_visited[src.idx] = CYCLE_NODE_DONE;
1344
1345 // current descent finished, resume the previous one if any
1346
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 6399 times.
6400 if (nb_filters_stack) {
1347 1 src = filters_stack[--nb_filters_stack];
1348 1 continue;
1349 }
1350 6399 return 0;
1351 }
1352 }
1353
1354 6778 static int check_acyclic(Scheduler *sch)
1355 {
1356 6778 uint8_t *filters_visited = NULL;
1357 6778 SchedulerNode *filters_stack = NULL;
1358
1359 6778 int ret = 0;
1360
1361
2/2
✓ Branch 0 taken 483 times.
✓ Branch 1 taken 6295 times.
6778 if (!sch->nb_filters)
1362 483 return 0;
1363
1364 6295 filters_visited = av_malloc_array(sch->nb_filters, sizeof(*filters_visited));
1365
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6295 times.
6295 if (!filters_visited)
1366 return AVERROR(ENOMEM);
1367
1368 6295 filters_stack = av_malloc_array(sch->nb_filters, sizeof(*filters_stack));
1369
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6295 times.
6295 if (!filters_stack) {
1370 ret = AVERROR(ENOMEM);
1371 goto fail;
1372 }
1373
1374 // trace the transcoding graph upstream from every filtergraph
1375
2/2
✓ Branch 0 taken 6399 times.
✓ Branch 1 taken 6295 times.
12694 for (unsigned i = 0; i < sch->nb_filters; i++) {
1376 6399 ret = check_acyclic_for_output(sch, (SchedulerNode){ .idx = i },
1377 filters_visited, filters_stack);
1378
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6399 times.
6399 if (ret < 0) {
1379 av_log(&sch->filters[i], AV_LOG_ERROR, "Transcoding graph has a cycle\n");
1380 goto fail;
1381 }
1382 }
1383
1384 6295 fail:
1385 6295 av_freep(&filters_visited);
1386 6295 av_freep(&filters_stack);
1387 6295 return ret;
1388 }
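
A concrete failure case: if filtergraph 0's input is fed from filtergraph 1's
output while filtergraph 1's input is fed from filtergraph 0's output, the
depth-first walk re-enters a node still marked CYCLE_NODE_STARTED and
check_acyclic_for_output() returns AVERROR(EINVAL).
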
1389
1390 6778 static int start_prepare(Scheduler *sch)
1391 {
1392 int ret;
1393
1394
2/2
✓ Branch 0 taken 6809 times.
✓ Branch 1 taken 6778 times.
13587 for (unsigned i = 0; i < sch->nb_demux; i++) {
1395 6809 SchDemux *d = &sch->demux[i];
1396
1397
2/2
✓ Branch 0 taken 7026 times.
✓ Branch 1 taken 6809 times.
13835 for (unsigned j = 0; j < d->nb_streams; j++) {
1398 7026 SchDemuxStream *ds = &d->streams[j];
1399
1400
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7026 times.
7026 if (!ds->nb_dst) {
1401 av_log(d, AV_LOG_ERROR,
1402 "Demuxer stream %u not connected to any sink\n", j);
1403 return AVERROR(EINVAL);
1404 }
1405
1406 7026 ds->dst_finished = av_calloc(ds->nb_dst, sizeof(*ds->dst_finished));
1407
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7026 times.
7026 if (!ds->dst_finished)
1408 return AVERROR(ENOMEM);
1409 }
1410 }
1411
1412
2/2
✓ Branch 0 taken 6405 times.
✓ Branch 1 taken 6778 times.
13183 for (unsigned i = 0; i < sch->nb_dec; i++) {
1413 6405 SchDec *dec = &sch->dec[i];
1414
1415
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6405 times.
6405 if (!dec->src.type) {
1416 av_log(dec, AV_LOG_ERROR,
1417 "Decoder not connected to a source\n");
1418 return AVERROR(EINVAL);
1419 }
1420
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6405 times.
6405 if (!dec->nb_dst) {
1421 av_log(dec, AV_LOG_ERROR,
1422 "Decoder not connected to any sink\n");
1423 return AVERROR(EINVAL);
1424 }
1425
1426 6405 dec->dst_finished = av_calloc(dec->nb_dst, sizeof(*dec->dst_finished));
1427
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6405 times.
6405 if (!dec->dst_finished)
1428 return AVERROR(ENOMEM);
1429 }
1430
1431
2/2
✓ Branch 0 taken 6487 times.
✓ Branch 1 taken 6778 times.
13265 for (unsigned i = 0; i < sch->nb_enc; i++) {
1432 6487 SchEnc *enc = &sch->enc[i];
1433
1434
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6487 times.
6487 if (!enc->src.type) {
1435 av_log(enc, AV_LOG_ERROR,
1436 "Encoder not connected to a source\n");
1437 return AVERROR(EINVAL);
1438 }
1439
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6487 times.
6487 if (!enc->nb_dst) {
1440 av_log(enc, AV_LOG_ERROR,
1441 "Encoder not connected to any sink\n");
1442 return AVERROR(EINVAL);
1443 }
1444
1445 6487 enc->dst_finished = av_calloc(enc->nb_dst, sizeof(*enc->dst_finished));
1446
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6487 times.
6487 if (!enc->dst_finished)
1447 return AVERROR(ENOMEM);
1448 }
1449
1450
2/2
✓ Branch 0 taken 6780 times.
✓ Branch 1 taken 6778 times.
13558 for (unsigned i = 0; i < sch->nb_mux; i++) {
1451 6780 SchMux *mux = &sch->mux[i];
1452
1453
2/2
✓ Branch 0 taken 7132 times.
✓ Branch 1 taken 6780 times.
13912 for (unsigned j = 0; j < mux->nb_streams; j++) {
1454 7132 SchMuxStream *ms = &mux->streams[j];
1455
1456
2/3
✓ Branch 0 taken 6487 times.
✓ Branch 1 taken 645 times.
✗ Branch 2 not taken.
7132 switch (ms->src.type) {
1457 6487 case SCH_NODE_TYPE_ENC: {
1458 6487 SchEnc *enc = &sch->enc[ms->src.idx];
1459
2/2
✓ Branch 0 taken 38 times.
✓ Branch 1 taken 6449 times.
6487 if (enc->src.type == SCH_NODE_TYPE_DEC) {
1460 38 ms->src_sched = sch->dec[enc->src.idx].src;
1461
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 38 times.
38 av_assert0(ms->src_sched.type == SCH_NODE_TYPE_DEMUX);
1462 } else {
1463 6449 ms->src_sched = enc->src;
1464
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6449 times.
6449 av_assert0(ms->src_sched.type == SCH_NODE_TYPE_FILTER_OUT);
1465 }
1466 6487 break;
1467 }
1468 645 case SCH_NODE_TYPE_DEMUX:
1469 645 ms->src_sched = ms->src;
1470 645 break;
1471 default:
1472 av_log(mux, AV_LOG_ERROR,
1473 "Muxer stream #%u not connected to a source\n", j);
1474 return AVERROR(EINVAL);
1475 }
1476 }
1477
1478 6780 ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size,
1479 QUEUE_PACKETS);
1480
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6780 times.
6780 if (ret < 0)
1481 return ret;
1482 }
1483
1484
2/2
✓ Branch 0 taken 6399 times.
✓ Branch 1 taken 6778 times.
13177 for (unsigned i = 0; i < sch->nb_filters; i++) {
1485 6399 SchFilterGraph *fg = &sch->filters[i];
1486
1487
2/2
✓ Branch 0 taken 6413 times.
✓ Branch 1 taken 6399 times.
12812 for (unsigned j = 0; j < fg->nb_inputs; j++) {
1488 6413 SchFilterIn *fi = &fg->inputs[j];
1489 SchDec *dec;
1490
1491
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6413 times.
6413 if (!fi->src.type) {
1492 av_log(fg, AV_LOG_ERROR,
1493 "Filtergraph input %u not connected to a source\n", j);
1494 return AVERROR(EINVAL);
1495 }
1496
1497
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6413 times.
6413 if (fi->src.type == SCH_NODE_TYPE_FILTER_OUT)
1498 fi->src_sched = fi->src;
1499 else {
1500
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6413 times.
6413 av_assert0(fi->src.type == SCH_NODE_TYPE_DEC);
1501 6413 dec = &sch->dec[fi->src.idx];
1502
1503
2/3
✓ Branch 0 taken 6412 times.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
6413 switch (dec->src.type) {
1504 6412 case SCH_NODE_TYPE_DEMUX: fi->src_sched = dec->src; break;
1505 1 case SCH_NODE_TYPE_ENC: fi->src_sched = sch->enc[dec->src.idx].src; break;
1506 default: av_assert0(0);
1507 }
1508 }
1509 }
1510
1511
2/2
✓ Branch 0 taken 6449 times.
✓ Branch 1 taken 6399 times.
12848 for (unsigned j = 0; j < fg->nb_outputs; j++) {
1512 6449 SchFilterOut *fo = &fg->outputs[j];
1513
1514
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6449 times.
6449 if (!fo->dst.type) {
1515 av_log(fg, AV_LOG_ERROR,
1516 "Filtergraph %u output %u not connected to a sink\n", i, j);
1517 return AVERROR(EINVAL);
1518 }
1519 }
1520 }
1521
1522 // Check that the transcoding graph has no cycles.
1523 6778 ret = check_acyclic(sch);
1524
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6778 times.
6778 if (ret < 0)
1525 return ret;
1526
1527 6778 return 0;
1528 }
1529
1530 6778 int sch_start(Scheduler *sch)
1531 {
1532 int ret;
1533
1534 6778 ret = start_prepare(sch);
1535
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6778 times.
6778 if (ret < 0)
1536 return ret;
1537
1538
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6778 times.
6778 av_assert0(sch->state == SCH_STATE_UNINIT);
1539 6778 sch->state = SCH_STATE_STARTED;
1540
1541
2/2
✓ Branch 0 taken 6780 times.
✓ Branch 1 taken 6778 times.
13558 for (unsigned i = 0; i < sch->nb_mux; i++) {
1542 6780 SchMux *mux = &sch->mux[i];
1543
1544
2/2
✓ Branch 0 taken 448 times.
✓ Branch 1 taken 6332 times.
6780 if (mux->nb_streams_ready == mux->nb_streams) {
1545 448 ret = mux_init(sch, mux);
1546
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 448 times.
448 if (ret < 0)
1547 goto fail;
1548 }
1549 }
1550
1551
2/2
✓ Branch 0 taken 6487 times.
✓ Branch 1 taken 6778 times.
13265 for (unsigned i = 0; i < sch->nb_enc; i++) {
1552 6487 SchEnc *enc = &sch->enc[i];
1553
1554 6487 ret = task_start(&enc->task);
1555
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6487 times.
6487 if (ret < 0)
1556 goto fail;
1557 }
1558
1559
2/2
✓ Branch 0 taken 6399 times.
✓ Branch 1 taken 6778 times.
13177 for (unsigned i = 0; i < sch->nb_filters; i++) {
1560 6399 SchFilterGraph *fg = &sch->filters[i];
1561
1562 6399 ret = task_start(&fg->task);
1563
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6399 times.
6399 if (ret < 0)
1564 goto fail;
1565 }
1566
1567
2/2
✓ Branch 0 taken 6405 times.
✓ Branch 1 taken 6778 times.
13183 for (unsigned i = 0; i < sch->nb_dec; i++) {
1568 6405 SchDec *dec = &sch->dec[i];
1569
1570 6405 ret = task_start(&dec->task);
1571
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6405 times.
6405 if (ret < 0)
1572 goto fail;
1573 }
1574
1575
2/2
✓ Branch 0 taken 6809 times.
✓ Branch 1 taken 6778 times.
13587 for (unsigned i = 0; i < sch->nb_demux; i++) {
1576 6809 SchDemux *d = &sch->demux[i];
1577
1578
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 6795 times.
6809 if (!d->nb_streams)
1579 14 continue;
1580
1581 6795 ret = task_start(&d->task);
1582
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6795 times.
6795 if (ret < 0)
1583 goto fail;
1584 }
1585
1586 6778 pthread_mutex_lock(&sch->schedule_lock);
1587 6778 schedule_update_locked(sch);
1588 6778 pthread_mutex_unlock(&sch->schedule_lock);
1589
1590 6778 return 0;
1591 fail:
1592 sch_stop(sch, NULL);
1593 return ret;
1594 }
1595
1596 20868 int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
1597 {
1598 int ret, err;
1599
1600 // convert delay to absolute timestamp
1601 20868 timeout_us += av_gettime();
1602
1603 20868 pthread_mutex_lock(&sch->mux_done_lock);
1604
1605
2/2
✓ Branch 0 taken 20867 times.
✓ Branch 1 taken 1 times.
20868 if (sch->nb_mux_done < sch->nb_mux) {
1606 20867 struct timespec tv = { .tv_sec = timeout_us / 1000000,
1607 20867 .tv_nsec = (timeout_us % 1000000) * 1000 };
1608 20867 pthread_cond_timedwait(&sch->mux_done_cond, &sch->mux_done_lock, &tv);
1609 }
1610
1611 20868 ret = sch->nb_mux_done == sch->nb_mux;
1612
1613 20868 pthread_mutex_unlock(&sch->mux_done_lock);
1614
1615 20868 *transcode_ts = atomic_load(&sch->last_dts);
1616
1617 // abort transcoding if any task failed
1618 20868 err = atomic_load(&sch->task_failed);
1619
1620
3/4
✓ Branch 0 taken 14090 times.
✓ Branch 1 taken 6778 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 14090 times.
20868 return ret || err;
1621 }
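
A sketch of the driving loop built on sch_wait(), polling every 500 ms;
print_progress() is hypothetical, and sch_stop() is assumed to take a pointer
for the final timestamp, matching the sch_stop(sch, NULL) call in sch_free()
above:

    int64_t transcode_ts = AV_NOPTS_VALUE;
    int     ret;

    while (!sch_wait(sch, 500000 /* us */, &transcode_ts)) {
        // transcode_ts holds the trailing dts in AV_TIME_BASE units,
        // or AV_NOPTS_VALUE before anything has been muxed
        print_progress(transcode_ts);  // hypothetical status output
    }

    ret = sch_stop(sch, &transcode_ts);
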
1622
1623 6449 static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame)
1624 {
1625 int ret;
1626
1627 6449 ret = enc->open_cb(enc->task.func_arg, frame);
1628
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6449 times.
6449 if (ret < 0)
1629 return ret;
1630
1631 // ret>0 signals audio frame size, which means sync queue must
1632 // have been enabled during encoder creation
1633
2/2
✓ Branch 0 taken 156 times.
✓ Branch 1 taken 6293 times.
6449 if (ret > 0) {
1634 SchSyncQueue *sq;
1635
1636
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 156 times.
156 av_assert0(enc->sq_idx[0] >= 0);
1637 156 sq = &sch->sq_enc[enc->sq_idx[0]];
1638
1639 156 pthread_mutex_lock(&sq->lock);
1640
1641 156 sq_frame_samples(sq->sq, enc->sq_idx[1], ret);
1642
1643 156 pthread_mutex_unlock(&sq->lock);
1644 }
1645
1646 6449 return 0;
1647 }
1648
1649 426167 static int send_to_enc_thread(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1650 {
1651 int ret;
1652
1653
2/2
✓ Branch 0 taken 13103 times.
✓ Branch 1 taken 413064 times.
426167 if (!frame) {
1654 13103 tq_send_finish(enc->queue, 0);
1655 13103 return 0;
1656 }
1657
1658
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 413064 times.
413064 if (enc->in_finished)
1659 return AVERROR_EOF;
1660
1661 413064 ret = tq_send(enc->queue, 0, frame);
1662
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 413063 times.
413064 if (ret < 0)
1663 1 enc->in_finished = 1;
1664
1665 413064 return ret;
1666 }
1667
1668 33974 static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1669 {
1670 33974 SchSyncQueue *sq = &sch->sq_enc[enc->sq_idx[0]];
1671 33974 int ret = 0;
1672
1673 // inform the scheduling code that no more input will arrive along this path;
1674 // this is necessary because the sync queue may not send an EOF downstream
1675 // until other streams finish
1676 // TODO: consider a cleaner way of passing this information through
1677 // the pipeline
1678
2/2
✓ Branch 0 taken 2990 times.
✓ Branch 1 taken 30984 times.
33974 if (!frame) {
1679
2/2
✓ Branch 0 taken 2990 times.
✓ Branch 1 taken 2990 times.
5980 for (unsigned i = 0; i < enc->nb_dst; i++) {
1680 SchMux *mux;
1681 SchMuxStream *ms;
1682
1683
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2990 times.
2990 if (enc->dst[i].type != SCH_NODE_TYPE_MUX)
1684 continue;
1685
1686 2990 mux = &sch->mux[enc->dst[i].idx];
1687 2990 ms = &mux->streams[enc->dst[i].idx_stream];
1688
1689 2990 pthread_mutex_lock(&sch->schedule_lock);
1690
1691 2990 ms->source_finished = 1;
1692 2990 schedule_update_locked(sch);
1693
1694 2990 pthread_mutex_unlock(&sch->schedule_lock);
1695 }
1696 }
1697
1698 33974 pthread_mutex_lock(&sq->lock);
1699
1700 33974 ret = sq_send(sq->sq, enc->sq_idx[1], SQFRAME(frame));
1701
2/2
✓ Branch 0 taken 33972 times.
✓ Branch 1 taken 2 times.
33974 if (ret < 0)
1702 2 goto finish;
1703
1704 81839 while (1) {
1705 SchEnc *enc;
1706
1707 // TODO: the SQ API should be extended to allow returning EOF
1708 // for individual streams
1709 115811 ret = sq_receive(sq->sq, -1, SQFRAME(sq->frame));
1710
2/2
✓ Branch 0 taken 33972 times.
✓ Branch 1 taken 81839 times.
115811 if (ret < 0) {
1711
2/2
✓ Branch 0 taken 5580 times.
✓ Branch 1 taken 28392 times.
33972 ret = (ret == AVERROR(EAGAIN)) ? 0 : ret;
1712 33972 break;
1713 }
1714
1715 81839 enc = &sch->enc[sq->enc_idx[ret]];
1716 81839 ret = send_to_enc_thread(sch, enc, sq->frame);
1717
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 81839 times.
81839 if (ret < 0) {
1718 av_frame_unref(sq->frame);
1719 if (ret != AVERROR_EOF)
1720 break;
1721
1722 sq_send(sq->sq, enc->sq_idx[1], SQFRAME(NULL));
1723 continue;
1724 }
1725 }
1726
1727
2/2
✓ Branch 0 taken 28392 times.
✓ Branch 1 taken 5580 times.
33972 if (ret < 0) {
1728 // close all encoders fed from this sync queue
1729
2/2
✓ Branch 0 taken 5810 times.
✓ Branch 1 taken 5580 times.
11390 for (unsigned i = 0; i < sq->nb_enc_idx; i++) {
1730 5810 int err = send_to_enc_thread(sch, &sch->enc[sq->enc_idx[i]], NULL);
1731
1732 // if the sync queue error is EOF and closing the encoder
1733 // produces a more serious error, make sure to pick the latter
1734
2/4
✓ Branch 0 taken 5810 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 5810 times.
✗ Branch 3 not taken.
5810 ret = err_merge((ret == AVERROR_EOF && err < 0) ? 0 : ret, err);
1735 }
1736 }
1737
1738 33972 finish:
1739 33974 pthread_mutex_unlock(&sq->lock);
1740
1741 33974 return ret;
1742 }
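
err_merge() is declared in ffmpeg_utils.h and not shown in this listing; a plausible minimal re-implementation, consistent with how it is used here (keep the first error, otherwise adopt the second one if it is negative), would be:

    /* sketch only: keep err0 if it already carries an error */
    static int err_merge_sketch(int err0, int err1)
    {
        return (err0 < 0) ? err0 : (err1 < 0 ? err1 : 0);
    }
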
1743
1744 372495 static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1745 {
1746
6/6
✓ Branch 0 taken 371578 times.
✓ Branch 1 taken 917 times.
✓ Branch 2 taken 361333 times.
✓ Branch 3 taken 10245 times.
✓ Branch 4 taken 6449 times.
✓ Branch 5 taken 354884 times.
372495 if (enc->open_cb && frame && !enc->opened) {
1747 6449 int ret = enc_open(sch, enc, frame);
1748
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6449 times.
6449 if (ret < 0)
1749 return ret;
1750 6449 enc->opened = 1;
1751
1752 // discard empty frames that only carry encoder init parameters
1753
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 6446 times.
6449 if (!frame->buf[0]) {
1754 3 av_frame_unref(frame);
1755 3 return 0;
1756 }
1757 }
1758
1759 372492 return (enc->sq_idx[0] >= 0) ?
1760
2/2
✓ Branch 0 taken 33974 times.
✓ Branch 1 taken 338518 times.
711010 send_to_enc_sq (sch, enc, frame) :
1761 338518 send_to_enc_thread(sch, enc, frame);
1762 }
1763
1764 3167 static int mux_queue_packet(SchMux *mux, SchMuxStream *ms, AVPacket *pkt)
1765 {
1766 3167 PreMuxQueue *q = &ms->pre_mux_queue;
1767 3167 AVPacket *tmp_pkt = NULL;
1768 int ret;
1769
1770
2/2
✓ Branch 1 taken 143 times.
✓ Branch 2 taken 3024 times.
3167 if (!av_fifo_can_write(q->fifo)) {
1771 143 size_t packets = av_fifo_can_read(q->fifo);
1772
1/2
✓ Branch 0 taken 143 times.
✗ Branch 1 not taken.
143 size_t pkt_size = pkt ? pkt->size : 0;
1773 143 int thresh_reached = (q->data_size + pkt_size) > q->data_threshold;
1774
2/2
✓ Branch 0 taken 138 times.
✓ Branch 1 taken 5 times.
143 size_t max_packets = thresh_reached ? q->max_packets : SIZE_MAX;
1775 143 size_t new_size = FFMIN(2 * packets, max_packets);
1776
1777
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 143 times.
143 if (new_size <= packets) {
1778 av_log(mux, AV_LOG_ERROR,
1779 "Too many packets buffered for output stream.\n");
1780 return AVERROR(ENOSPC);
1781 }
1782 143 ret = av_fifo_grow2(q->fifo, new_size - packets);
1783
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 143 times.
143 if (ret < 0)
1784 return ret;
1785 }
1786
1787
2/2
✓ Branch 0 taken 3114 times.
✓ Branch 1 taken 53 times.
3167 if (pkt) {
1788 3114 tmp_pkt = av_packet_alloc();
1789
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3114 times.
3114 if (!tmp_pkt)
1790 return AVERROR(ENOMEM);
1791
1792 3114 av_packet_move_ref(tmp_pkt, pkt);
1793 3114 q->data_size += tmp_pkt->size;
1794 }
1795 3167 av_fifo_write(q->fifo, &tmp_pkt, 1);
1796
1797 3167 return 0;
1798 }
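
The growth policy above is worth isolating: the FIFO doubles while the buffered byte count stays under data_threshold, after which growth is capped at max_packets. A standalone sketch against the public AVFifo API (the struct and its field names here are illustrative):

    #include <stdint.h>
    #include "libavutil/error.h"
    #include "libavutil/fifo.h"
    #include "libavutil/macros.h"

    typedef struct {
        AVFifo *fifo;            /* holds AVPacket* entries            */
        size_t  data_size;       /* bytes currently buffered           */
        size_t  data_threshold;  /* grow freely below this many bytes  */
        size_t  max_packets;     /* hard cap once the threshold is hit */
    } QueueSketch;

    static int ensure_space(QueueSketch *q, size_t incoming_bytes)
    {
        size_t packets, new_size, max_packets;

        if (av_fifo_can_write(q->fifo))
            return 0;                                 /* room already   */

        packets     = av_fifo_can_read(q->fifo);
        max_packets = (q->data_size + incoming_bytes) > q->data_threshold ?
                      q->max_packets : SIZE_MAX;
        new_size    = FFMIN(2 * packets, max_packets);

        if (new_size <= packets)
            return AVERROR(ENOSPC);                   /* refuse to grow */

        return av_fifo_grow2(q->fifo, new_size - packets);
    }
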
1799
1800 481464 static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx,
1801 AVPacket *pkt)
1802 {
1803 481464 SchMuxStream *ms = &mux->streams[stream_idx];
1804
2/2
✓ Branch 0 taken 465204 times.
✓ Branch 1 taken 9128 times.
474332 int64_t dts = (pkt && pkt->dts != AV_NOPTS_VALUE) ?
1805
2/2
✓ Branch 0 taken 474332 times.
✓ Branch 1 taken 7132 times.
955796 av_rescale_q(pkt->dts + pkt->duration, pkt->time_base, AV_TIME_BASE_Q) :
1806 AV_NOPTS_VALUE;
1807
1808 // queue the packet if the muxer cannot be started yet
1809
2/2
✓ Branch 0 taken 3203 times.
✓ Branch 1 taken 478261 times.
481464 if (!atomic_load(&mux->mux_started)) {
1810 3203 int queued = 0;
1811
1812 // the muxer could have started between the above atomic check and
1813 // locking the mutex, in which case we fall through to the normal send path
1814 3203 pthread_mutex_lock(&sch->mux_ready_lock);
1815
1816
2/2
✓ Branch 0 taken 3167 times.
✓ Branch 1 taken 36 times.
3203 if (!atomic_load(&mux->mux_started)) {
1817 3167 int ret = mux_queue_packet(mux, ms, pkt);
1818
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3167 times.
3167 queued = ret < 0 ? ret : 1;
1819 }
1820
1821 3203 pthread_mutex_unlock(&sch->mux_ready_lock);
1822
1823
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3203 times.
3203 if (queued < 0)
1824 return queued;
1825
2/2
✓ Branch 0 taken 3167 times.
✓ Branch 1 taken 36 times.
3203 else if (queued)
1826 3167 goto update_schedule;
1827 }
1828
1829
2/2
✓ Branch 0 taken 471218 times.
✓ Branch 1 taken 7079 times.
478297 if (pkt) {
1830 int ret;
1831
1832
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 471218 times.
471218 if (ms->init_eof)
1833 return AVERROR_EOF;
1834
1835 471218 ret = tq_send(mux->queue, stream_idx, pkt);
1836
2/2
✓ Branch 0 taken 47 times.
✓ Branch 1 taken 471171 times.
471218 if (ret < 0)
1837 47 return ret;
1838 } else
1839 7079 tq_send_finish(mux->queue, stream_idx);
1840
1841 481417 update_schedule:
1842 // TODO: use atomics to check whether this changes trailing dts
1843 // to avoid locking unnecessarily
1844
4/4
✓ Branch 0 taken 16260 times.
✓ Branch 1 taken 465157 times.
✓ Branch 2 taken 7132 times.
✓ Branch 3 taken 9128 times.
481417 if (dts != AV_NOPTS_VALUE || !pkt) {
1845 472289 pthread_mutex_lock(&sch->schedule_lock);
1846
1847
2/2
✓ Branch 0 taken 465157 times.
✓ Branch 1 taken 7132 times.
472289 if (pkt) ms->last_dts = dts;
1848 7132 else ms->source_finished = 1;
1849
1850 472289 schedule_update_locked(sch);
1851
1852 472289 pthread_mutex_unlock(&sch->schedule_lock);
1853 }
1854
1855 481417 return 0;
1856 }
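
The pre-start branch above is a classic check/lock/re-check: an atomic load keeps the mutex off the hot path once the muxer is running, and a second load under the lock closes the race with muxer startup. A standalone sketch with placeholder sinks (all names are illustrative):

    #include <pthread.h>
    #include <stdatomic.h>

    static atomic_int      started;
    static pthread_mutex_t ready_lock = PTHREAD_MUTEX_INITIALIZER;

    static void queue_item(int item) { (void)item; /* pre-start buffering */ }
    static void send_item (int item) { (void)item; /* normal running path */ }

    void submit(int item)
    {
        if (!atomic_load(&started)) {
            int queued = 0;

            /* the muxer may start between the atomic check above and
             * taking the lock, hence the second check under the mutex */
            pthread_mutex_lock(&ready_lock);
            if (!atomic_load(&started)) {
                queue_item(item);
                queued = 1;
            }
            pthread_mutex_unlock(&ready_lock);

            if (queued)
                return;
        }
        send_item(item);
    }
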
1857
1858 static int
1859 467055 demux_stream_send_to_dst(Scheduler *sch, const SchedulerNode dst,
1860 uint8_t *dst_finished, AVPacket *pkt, unsigned flags)
1861 {
1862 int ret;
1863
1864
2/2
✓ Branch 0 taken 3122 times.
✓ Branch 1 taken 463933 times.
467055 if (*dst_finished)
1865 3122 return AVERROR_EOF;
1866
1867
4/4
✓ Branch 0 taken 460006 times.
✓ Branch 1 taken 3927 times.
✓ Branch 2 taken 67448 times.
✓ Branch 3 taken 392558 times.
463933 if (pkt && dst.type == SCH_NODE_TYPE_MUX &&
1868
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 67446 times.
67448 (flags & DEMUX_SEND_STREAMCOPY_EOF)) {
1869 2 av_packet_unref(pkt);
1870 2 pkt = NULL;
1871 }
1872
1873
2/2
✓ Branch 0 taken 3929 times.
✓ Branch 1 taken 460004 times.
463933 if (!pkt)
1874 3929 goto finish;
1875
1876 920008 ret = (dst.type == SCH_NODE_TYPE_MUX) ?
1877
2/2
✓ Branch 0 taken 67446 times.
✓ Branch 1 taken 392558 times.
460004 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
1878 392558 tq_send(sch->dec[dst.idx].queue, 0, pkt);
1879
2/2
✓ Branch 0 taken 3120 times.
✓ Branch 1 taken 456884 times.
460004 if (ret == AVERROR_EOF)
1880 3120 goto finish;
1881
1882 456884 return ret;
1883
1884 7049 finish:
1885
2/2
✓ Branch 0 taken 645 times.
✓ Branch 1 taken 6404 times.
7049 if (dst.type == SCH_NODE_TYPE_MUX)
1886 645 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
1887 else
1888 6404 tq_send_finish(sch->dec[dst.idx].queue, 0);
1889
1890 7049 *dst_finished = 1;
1891 7049 return AVERROR_EOF;
1892 }
1893
1894 466594 static int demux_send_for_stream(Scheduler *sch, SchDemux *d, SchDemuxStream *ds,
1895 AVPacket *pkt, unsigned flags)
1896 {
1897 466594 unsigned nb_done = 0;
1898
1899
2/2
✓ Branch 0 taken 467055 times.
✓ Branch 1 taken 466594 times.
933649 for (unsigned i = 0; i < ds->nb_dst; i++) {
1900 467055 AVPacket *to_send = pkt;
1901 467055 uint8_t *finished = &ds->dst_finished[i];
1902
1903 int ret;
1904
1905 // sending a packet consumes it, so make a temporary reference if needed
1906
4/4
✓ Branch 0 taken 460006 times.
✓ Branch 1 taken 7049 times.
✓ Branch 2 taken 438 times.
✓ Branch 3 taken 459568 times.
467055 if (pkt && i < ds->nb_dst - 1) {
1907 438 to_send = d->send_pkt;
1908
1909 438 ret = av_packet_ref(to_send, pkt);
1910
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 438 times.
438 if (ret < 0)
1911 return ret;
1912 }
1913
1914 467055 ret = demux_stream_send_to_dst(sch, ds->dst[i], finished, to_send, flags);
1915
2/2
✓ Branch 0 taken 460006 times.
✓ Branch 1 taken 7049 times.
467055 if (to_send)
1916 460006 av_packet_unref(to_send);
1917
2/2
✓ Branch 0 taken 10171 times.
✓ Branch 1 taken 456884 times.
467055 if (ret == AVERROR_EOF)
1918 10171 nb_done++;
1919
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 456884 times.
456884 else if (ret < 0)
1920 return ret;
1921 }
1922
1923
2/2
✓ Branch 0 taken 10148 times.
✓ Branch 1 taken 456446 times.
466594 return (nb_done == ds->nb_dst) ? AVERROR_EOF : 0;
1924 }
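
Since sending consumes a packet, the loop above gives every destination but the last a temporary reference and hands the original to the final one. The same rule in isolation, where sink() is a placeholder for demux_stream_send_to_dst():

    #include "libavcodec/packet.h"

    static int sink(unsigned dst, AVPacket *pkt)
    {
        (void)dst;
        av_packet_unref(pkt);   /* a real sink consumes the packet */
        return 0;
    }

    int fan_out(AVPacket *pkt, AVPacket *tmp, unsigned nb_dst)
    {
        for (unsigned i = 0; i < nb_dst; i++) {
            AVPacket *to_send = pkt;
            int ret;

            /* all but the last destination get their own reference */
            if (i < nb_dst - 1) {
                ret = av_packet_ref(tmp, pkt);
                if (ret < 0)
                    return ret;
                to_send = tmp;
            }

            ret = sink(i, to_send);
            if (ret < 0)
                return ret;
        }
        return 0;
    }
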
1925
1926 11 static int demux_flush(Scheduler *sch, SchDemux *d, AVPacket *pkt)
1927 {
1928 11 Timestamp max_end_ts = (Timestamp){ .ts = AV_NOPTS_VALUE };
1929
1930
3/6
✓ Branch 0 taken 11 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 11 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 11 times.
11 av_assert0(!pkt->buf && !pkt->data && !pkt->side_data_elems);
1931
1932
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 11 times.
25 for (unsigned i = 0; i < d->nb_streams; i++) {
1933 14 SchDemuxStream *ds = &d->streams[i];
1934
1935
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 14 times.
28 for (unsigned j = 0; j < ds->nb_dst; j++) {
1936 14 const SchedulerNode *dst = &ds->dst[j];
1937 SchDec *dec;
1938 int ret;
1939
1940
3/4
✓ Branch 0 taken 14 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 8 times.
✓ Branch 3 taken 6 times.
14 if (ds->dst_finished[j] || dst->type != SCH_NODE_TYPE_DEC)
1941 8 continue;
1942
1943 6 dec = &sch->dec[dst->idx];
1944
1945 6 ret = tq_send(dec->queue, 0, pkt);
1946
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 if (ret < 0)
1947 return ret;
1948
1949
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 3 times.
6 if (dec->queue_end_ts) {
1950 Timestamp ts;
1951 3 ret = av_thread_message_queue_recv(dec->queue_end_ts, &ts, 0);
1952
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (ret < 0)
1953 return ret;
1954
1955
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (max_end_ts.ts == AV_NOPTS_VALUE ||
1956 (ts.ts != AV_NOPTS_VALUE &&
1957 av_compare_ts(max_end_ts.ts, max_end_ts.tb, ts.ts, ts.tb) < 0))
1958 3 max_end_ts = ts;
1959
1960 }
1961 }
1962 }
1963
1964 11 pkt->pts = max_end_ts.ts;
1965 11 pkt->time_base = max_end_ts.tb;
1966
1967 11 return 0;
1968 }
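
The max_end_ts selection above leans on av_compare_ts(), which compares two timestamps expressed in different time bases without first rescaling either one. A small self-contained example:

    #include <stdint.h>
    #include <stdio.h>
    #include "libavutil/mathematics.h"
    #include "libavutil/rational.h"

    int main(void)
    {
        /* 1500 ticks at 1/1000 s = 1.5 s;  90000 ticks at 1/90000 s = 1.0 s */
        int64_t    a    = 1500,        b    = 90000;
        AVRational tb_a = { 1, 1000 }, tb_b = { 1, 90000 };

        if (av_compare_ts(a, tb_a, b, tb_b) > 0)
            printf("a is later\n");    /* prints: a is later */
        return 0;
    }
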
1969
1970 459629 int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt,
1971 unsigned flags)
1972 {
1973 SchDemux *d;
1974 int terminate;
1975
1976
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 459629 times.
459629 av_assert0(demux_idx < sch->nb_demux);
1977 459629 d = &sch->demux[demux_idx];
1978
1979 459629 terminate = waiter_wait(sch, &d->waiter);
1980
2/2
✓ Branch 0 taken 50 times.
✓ Branch 1 taken 459579 times.
459629 if (terminate)
1981 50 return AVERROR_EXIT;
1982
1983 // flush the downstreams after a seek
1984
2/2
✓ Branch 0 taken 11 times.
✓ Branch 1 taken 459568 times.
459579 if (pkt->stream_index == -1)
1985 11 return demux_flush(sch, d, pkt);
1986
1987
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 459568 times.
459568 av_assert0(pkt->stream_index < d->nb_streams);
1988
1989 459568 return demux_send_for_stream(sch, d, &d->streams[pkt->stream_index], pkt, flags);
1990 }
1991
1992 6809 static int demux_done(Scheduler *sch, unsigned demux_idx)
1993 {
1994 6809 SchDemux *d = &sch->demux[demux_idx];
1995 6809 int ret = 0;
1996
1997
2/2
✓ Branch 0 taken 7026 times.
✓ Branch 1 taken 6809 times.
13835 for (unsigned i = 0; i < d->nb_streams; i++) {
1998 7026 int err = demux_send_for_stream(sch, d, &d->streams[i], NULL, 0);
1999
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7026 times.
7026 if (err != AVERROR_EOF)
2000 ret = err_merge(ret, err);
2001 }
2002
2003 6809 pthread_mutex_lock(&sch->schedule_lock);
2004
2005 6809 d->task_exited = 1;
2006
2007 6809 schedule_update_locked(sch);
2008
2009 6809 pthread_mutex_unlock(&sch->schedule_lock);
2010
2011 6809 return ret;
2012 }
2013
2014 487887 int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt)
2015 {
2016 SchMux *mux;
2017 int ret, stream_idx;
2018
2019
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 487887 times.
487887 av_assert0(mux_idx < sch->nb_mux);
2020 487887 mux = &sch->mux[mux_idx];
2021
2022 487887 ret = tq_receive(mux->queue, &stream_idx, pkt);
2023 487887 pkt->stream_index = stream_idx;
2024 487887 return ret;
2025 }
2026
2027 198 void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
2028 {
2029 SchMux *mux;
2030
2031
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 198 times.
198 av_assert0(mux_idx < sch->nb_mux);
2032 198 mux = &sch->mux[mux_idx];
2033
2034
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 198 times.
198 av_assert0(stream_idx < mux->nb_streams);
2035 198 tq_receive_finish(mux->queue, stream_idx);
2036
2037 198 pthread_mutex_lock(&sch->schedule_lock);
2038 198 mux->streams[stream_idx].source_finished = 1;
2039
2040 198 schedule_update_locked(sch);
2041
2042 198 pthread_mutex_unlock(&sch->schedule_lock);
2043 198 }
2044
2045 437605 int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
2046 const AVPacket *pkt)
2047 {
2048 SchMux *mux;
2049 SchMuxStream *ms;
2050
2051
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 437605 times.
437605 av_assert0(mux_idx < sch->nb_mux);
2052 437605 mux = &sch->mux[mux_idx];
2053
2054
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 437605 times.
437605 av_assert0(stream_idx < mux->nb_streams);
2055 437605 ms = &mux->streams[stream_idx];
2056
2057
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 437605 times.
437610 for (unsigned i = 0; i < ms->nb_sub_heartbeat_dst; i++) {
2058 5 SchDec *dst = &sch->dec[ms->sub_heartbeat_dst[i]];
2059 int ret;
2060
2061 5 ret = av_packet_copy_props(mux->sub_heartbeat_pkt, pkt);
2062
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5 times.
5 if (ret < 0)
2063 return ret;
2064
2065 5 tq_send(dst->queue, 0, mux->sub_heartbeat_pkt);
2066 }
2067
2068 437605 return 0;
2069 }
2070
2071 6780 static int mux_done(Scheduler *sch, unsigned mux_idx)
2072 {
2073 6780 SchMux *mux = &sch->mux[mux_idx];
2074
2075 6780 pthread_mutex_lock(&sch->schedule_lock);
2076
2077
2/2
✓ Branch 0 taken 7132 times.
✓ Branch 1 taken 6780 times.
13912 for (unsigned i = 0; i < mux->nb_streams; i++) {
2078 7132 tq_receive_finish(mux->queue, i);
2079 7132 mux->streams[i].source_finished = 1;
2080 }
2081
2082 6780 schedule_update_locked(sch);
2083
2084 6780 pthread_mutex_unlock(&sch->schedule_lock);
2085
2086 6780 pthread_mutex_lock(&sch->mux_done_lock);
2087
2088
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6780 times.
6780 av_assert0(sch->nb_mux_done < sch->nb_mux);
2089 6780 sch->nb_mux_done++;
2090
2091 6780 pthread_cond_signal(&sch->mux_done_cond);
2092
2093 6780 pthread_mutex_unlock(&sch->mux_done_lock);
2094
2095 6780 return 0;
2096 }
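
The counter/condvar handoff above is the signalling half of sch_wait(): bump nb_mux_done under mux_done_lock, then wake the waiter. A standalone sketch of just that half (names illustrative):

    #include <pthread.h>

    static pthread_mutex_t done_lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  done_cond = PTHREAD_COND_INITIALIZER;
    static unsigned        nb_done;

    void mark_one_done(void)
    {
        pthread_mutex_lock(&done_lock);
        nb_done++;                       /* mux_done() asserts this stays <= nb_mux */
        pthread_cond_signal(&done_cond); /* wake a sch_wait()-style waiter */
        pthread_mutex_unlock(&done_lock);
    }
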
2097
2098 368712 int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
2099 {
2100 SchDec *dec;
2101 int ret, dummy;
2102
2103
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 368712 times.
368712 av_assert0(dec_idx < sch->nb_dec);
2104 368712 dec = &sch->dec[dec_idx];
2105
2106 // the decoder should have given us the post-flush end timestamp in pkt
2107
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 368709 times.
368712 if (dec->expect_end_ts) {
2108 3 Timestamp ts = (Timestamp){ .ts = pkt->pts, .tb = pkt->time_base };
2109 3 ret = av_thread_message_queue_send(dec->queue_end_ts, &ts, 0);
2110
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (ret < 0)
2111 return ret;
2112
2113 3 dec->expect_end_ts = 0;
2114 }
2115
2116 368712 ret = tq_receive(dec->queue, &dummy, pkt);
2117
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 368712 times.
368712 av_assert0(dummy <= 0);
2118
2119 // got a flush packet; on the next call to this function the decoder
2120 // will give us the post-flush end timestamp
2121
7/8
✓ Branch 0 taken 365439 times.
✓ Branch 1 taken 3273 times.
✓ Branch 2 taken 958 times.
✓ Branch 3 taken 364481 times.
✓ Branch 4 taken 958 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 3 times.
✓ Branch 7 taken 955 times.
368712 if (ret >= 0 && !pkt->data && !pkt->side_data_elems && dec->queue_end_ts)
2122 3 dec->expect_end_ts = 1;
2123
2124 368712 return ret;
2125 }
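
The flush handshake above (a props-only flush packet down the main queue, the end timestamp back through queue_end_ts) rides on AVThreadMessageQueue. A minimal round trip, kept single-threaded only so the sketch stands alone; the Ts struct mirrors the Timestamp type from ffmpeg_utils.h:

    #include <stdint.h>
    #include <stdio.h>
    #include "libavutil/rational.h"
    #include "libavutil/threadmessage.h"

    typedef struct { int64_t ts; AVRational tb; } Ts;

    int main(void)
    {
        AVThreadMessageQueue *q;
        Ts in = { 42, { 1, 1000 } }, out;

        if (av_thread_message_queue_alloc(&q, 1, sizeof(Ts)) < 0)
            return 1;

        av_thread_message_queue_send(q, &in, 0);   /* decoder side */
        av_thread_message_queue_recv(q, &out, 0);  /* demuxer side */
        printf("end ts: %lld @ %d/%d\n",
               (long long)out.ts, out.tb.num, out.tb.den);

        av_thread_message_queue_free(&q);
        return 0;
    }
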
2126
2127 396672 static int send_to_filter(Scheduler *sch, SchFilterGraph *fg,
2128 unsigned in_idx, AVFrame *frame)
2129 {
2130
2/2
✓ Branch 0 taken 390259 times.
✓ Branch 1 taken 6413 times.
396672 if (frame)
2131 390259 return tq_send(fg->queue, in_idx, frame);
2132
2133
1/2
✓ Branch 0 taken 6413 times.
✗ Branch 1 not taken.
6413 if (!fg->inputs[in_idx].send_finished) {
2134 6413 fg->inputs[in_idx].send_finished = 1;
2135 6413 tq_send_finish(fg->queue, in_idx);
2136
2137 // close the control stream when all actual inputs are done
2138
2/2
✓ Branch 0 taken 6343 times.
✓ Branch 1 taken 70 times.
6413 if (atomic_fetch_add(&fg->nb_inputs_finished_send, 1) == fg->nb_inputs - 1)
2139 6343 tq_send_finish(fg->queue, fg->nb_inputs);
2140 }
2141 6413 return 0;
2142 }
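
The control-stream close above relies on atomic_fetch_add() returning the previous value, so exactly one input observes nb_inputs - 1 and closes the shared control stream. The same rule in isolation:

    #include <stdatomic.h>
    #include <stdio.h>

    static atomic_uint nb_finished;
    static unsigned    nb_inputs = 3;

    void input_finished(unsigned idx)
    {
        printf("input %u done\n", idx);
        /* atomic_fetch_add() returns the old value: only the last
         * finisher sees nb_inputs - 1 */
        if (atomic_fetch_add(&nb_finished, 1) == nb_inputs - 1)
            printf("all %u inputs done, closing control stream\n", nb_inputs);
    }
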
2143
2144 400771 static int dec_send_to_dst(Scheduler *sch, const SchedulerNode dst,
2145 uint8_t *dst_finished, AVFrame *frame)
2146 {
2147 int ret;
2148
2149
2/2
✓ Branch 0 taken 6357 times.
✓ Branch 1 taken 394414 times.
400771 if (*dst_finished)
2150 6357 return AVERROR_EOF;
2151
2152
2/2
✓ Branch 0 taken 3276 times.
✓ Branch 1 taken 391138 times.
394414 if (!frame)
2153 3276 goto finish;
2154
2155 782276 ret = (dst.type == SCH_NODE_TYPE_FILTER_IN) ?
2156
2/2
✓ Branch 0 taken 390259 times.
✓ Branch 1 taken 879 times.
391138 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame) :
2157 879 send_to_enc(sch, &sch->enc[dst.idx], frame);
2158
2/2
✓ Branch 0 taken 3175 times.
✓ Branch 1 taken 387963 times.
391138 if (ret == AVERROR_EOF)
2159 3175 goto finish;
2160
2161 387963 return ret;
2162
2163 6451 finish:
2164
2/2
✓ Branch 0 taken 6413 times.
✓ Branch 1 taken 38 times.
6451 if (dst.type == SCH_NODE_TYPE_FILTER_IN)
2165 6413 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2166 else
2167 38 send_to_enc(sch, &sch->enc[dst.idx], NULL);
2168
2169 6451 *dst_finished = 1;
2170
2171 6451 return AVERROR_EOF;
2172 }
2173
2174 392978 int sch_dec_send(Scheduler *sch, unsigned dec_idx, AVFrame *frame)
2175 {
2176 SchDec *dec;
2177 int ret;
2178 392978 unsigned nb_done = 0;
2179
2180
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 392978 times.
392978 av_assert0(dec_idx < sch->nb_dec);
2181 392978 dec = &sch->dec[dec_idx];
2182
2183
2/2
✓ Branch 0 taken 394320 times.
✓ Branch 1 taken 392978 times.
787298 for (unsigned i = 0; i < dec->nb_dst; i++) {
2184 394320 uint8_t *finished = &dec->dst_finished[i];
2185 394320 AVFrame *to_send = frame;
2186
2187 // sending a frame consumes it, so make a temporary reference if needed
2188
2/2
✓ Branch 0 taken 1342 times.
✓ Branch 1 taken 392978 times.
394320 if (i < dec->nb_dst - 1) {
2189 1342 to_send = dec->send_frame;
2190
2191 // the frame may sometimes contain only props,
2192 // e.g. to signal the EOF timestamp
2193
2/2
✓ Branch 0 taken 1250 times.
✓ Branch 1 taken 92 times.
1342 ret = frame->buf[0] ? av_frame_ref(to_send, frame) :
2194 92 av_frame_copy_props(to_send, frame);
2195
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1342 times.
1342 if (ret < 0)
2196 return ret;
2197 }
2198
2199 394320 ret = dec_send_to_dst(sch, dec->dst[i], finished, to_send);
2200
2/2
✓ Branch 0 taken 6357 times.
✓ Branch 1 taken 387963 times.
394320 if (ret < 0) {
2201 6357 av_frame_unref(to_send);
2202
1/2
✓ Branch 0 taken 6357 times.
✗ Branch 1 not taken.
6357 if (ret == AVERROR_EOF) {
2203 6357 nb_done++;
2204 6357 continue;
2205 }
2206 return ret;
2207 }
2208 }
2209
2210
2/2
✓ Branch 0 taken 6279 times.
✓ Branch 1 taken 386699 times.
392978 return (nb_done == dec->nb_dst) ? AVERROR_EOF : 0;
2211 }
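
A frame whose buf[0] is NULL carries only properties (e.g. the EOF timestamp), so the loop above clones it with av_frame_copy_props() rather than av_frame_ref(). The duplication rule as a standalone helper:

    #include "libavutil/frame.h"

    /* full reference when the frame has data, props-only copy otherwise */
    int dup_frame(AVFrame *dst, const AVFrame *src)
    {
        return src->buf[0] ? av_frame_ref(dst, src)
                           : av_frame_copy_props(dst, src);
    }
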
2212
2213 6405 static int dec_done(Scheduler *sch, unsigned dec_idx)
2214 {
2215 6405 SchDec *dec = &sch->dec[dec_idx];
2216 6405 int ret = 0;
2217
2218 6405 tq_receive_finish(dec->queue, 0);
2219
2220 // make sure our source does not get stuck waiting for end timestamps
2221 // that will never arrive
2222
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 6404 times.
6405 if (dec->queue_end_ts)
2223 1 av_thread_message_queue_set_err_recv(dec->queue_end_ts, AVERROR_EOF);
2224
2225
2/2
✓ Branch 0 taken 6451 times.
✓ Branch 1 taken 6405 times.
12856 for (unsigned i = 0; i < dec->nb_dst; i++) {
2226 6451 int err = dec_send_to_dst(sch, dec->dst[i], &dec->dst_finished[i], NULL);
2227
2/4
✓ Branch 0 taken 6451 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6451 times.
6451 if (err < 0 && err != AVERROR_EOF)
2228 ret = err_merge(ret, err);
2229 }
2230
2231 6405 return ret;
2232 }
2233
2234 419548 int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
2235 {
2236 SchEnc *enc;
2237 int ret, dummy;
2238
2239
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 419548 times.
419548 av_assert0(enc_idx < sch->nb_enc);
2240 419548 enc = &sch->enc[enc_idx];
2241
2242 419548 ret = tq_receive(enc->queue, &dummy, frame);
2243
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 419548 times.
419548 av_assert0(dummy <= 0);
2244
2245 419548 return ret;
2246 }
2247
2248 413466 static int enc_send_to_dst(Scheduler *sch, const SchedulerNode dst,
2249 uint8_t *dst_finished, AVPacket *pkt)
2250 {
2251 int ret;
2252
2253
2/2
✓ Branch 0 taken 44 times.
✓ Branch 1 taken 413422 times.
413466 if (*dst_finished)
2254 44 return AVERROR_EOF;
2255
2256
2/2
✓ Branch 0 taken 6486 times.
✓ Branch 1 taken 406936 times.
413422 if (!pkt)
2257 6486 goto finish;
2258
2259 813872 ret = (dst.type == SCH_NODE_TYPE_MUX) ?
2260
2/2
✓ Branch 0 taken 406886 times.
✓ Branch 1 taken 50 times.
406936 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
2261 50 tq_send(sch->dec[dst.idx].queue, 0, pkt);
2262
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 406934 times.
406936 if (ret == AVERROR_EOF)
2263 2 goto finish;
2264
2265 406934 return ret;
2266
2267 6488 finish:
2268
2/2
✓ Branch 0 taken 6487 times.
✓ Branch 1 taken 1 times.
6488 if (dst.type == SCH_NODE_TYPE_MUX)
2269 6487 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
2270 else
2271 1 tq_send_finish(sch->dec[dst.idx].queue, 0);
2272
2273 6488 *dst_finished = 1;
2274
2275 6488 return AVERROR_EOF;
2276 }
2277
2278 406928 int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
2279 {
2280 SchEnc *enc;
2281 int ret;
2282
2283
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 406928 times.
406928 av_assert0(enc_idx < sch->nb_enc);
2284 406928 enc = &sch->enc[enc_idx];
2285
2286
2/2
✓ Branch 0 taken 406978 times.
✓ Branch 1 taken 406928 times.
813906 for (unsigned i = 0; i < enc->nb_dst; i++) {
2287 406978 uint8_t *finished = &enc->dst_finished[i];
2288 406978 AVPacket *to_send = pkt;
2289
2290 // sending a packet consumes it, so make a temporary reference if needed
2291
2/2
✓ Branch 0 taken 50 times.
✓ Branch 1 taken 406928 times.
406978 if (i < enc->nb_dst - 1) {
2292 50 to_send = enc->send_pkt;
2293
2294 50 ret = av_packet_ref(to_send, pkt);
2295
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 50 times.
50 if (ret < 0)
2296 return ret;
2297 }
2298
2299 406978 ret = enc_send_to_dst(sch, enc->dst[i], finished, to_send);
2300
2/2
✓ Branch 0 taken 44 times.
✓ Branch 1 taken 406934 times.
406978 if (ret < 0) {
2301 44 av_packet_unref(to_send);
2302
1/2
✓ Branch 0 taken 44 times.
✗ Branch 1 not taken.
44 if (ret == AVERROR_EOF)
2303 44 continue;
2304 return ret;
2305 }
2306 }
2307
2308 406928 return 0;
2309 }
2310
2311 6487 static int enc_done(Scheduler *sch, unsigned enc_idx)
2312 {
2313 6487 SchEnc *enc = &sch->enc[enc_idx];
2314 6487 int ret = 0;
2315
2316 6487 tq_receive_finish(enc->queue, 0);
2317
2318
2/2
✓ Branch 0 taken 6488 times.
✓ Branch 1 taken 6487 times.
12975 for (unsigned i = 0; i < enc->nb_dst; i++) {
2319 6488 int err = enc_send_to_dst(sch, enc->dst[i], &enc->dst_finished[i], NULL);
2320
2/4
✓ Branch 0 taken 6488 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6488 times.
6488 if (err < 0 && err != AVERROR_EOF)
2321 ret = err_merge(ret, err);
2322 }
2323
2324 6487 return ret;
2325 }
2326
2327 365624 int sch_filter_receive(Scheduler *sch, unsigned fg_idx,
2328 unsigned *in_idx, AVFrame *frame)
2329 {
2330 SchFilterGraph *fg;
2331
2332
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 365624 times.
365624 av_assert0(fg_idx < sch->nb_filters);
2333 365624 fg = &sch->filters[fg_idx];
2334
2335
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 365624 times.
365624 av_assert0(*in_idx <= fg->nb_inputs);
2336
2337 // update scheduling to account for desired input stream, if it changed
2338 //
2339 // this check needs no locking because only the filtering thread
2340 // updates this value
2341
2/2
✓ Branch 0 taken 819 times.
✓ Branch 1 taken 364805 times.
365624 if (*in_idx != fg->best_input) {
2342 819 pthread_mutex_lock(&sch->schedule_lock);
2343
2344 819 fg->best_input = *in_idx;
2345 819 schedule_update_locked(sch);
2346
2347 819 pthread_mutex_unlock(&sch->schedule_lock);
2348 }
2349
2350
2/2
✓ Branch 0 taken 363938 times.
✓ Branch 1 taken 1686 times.
365624 if (*in_idx == fg->nb_inputs) {
2351 1686 int terminate = waiter_wait(sch, &fg->waiter);
2352
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1686 times.
1686 return terminate ? AVERROR_EOF : AVERROR(EAGAIN);
2353 }
2354
2355 13 while (1) {
2356 int ret, idx;
2357
2358 363951 ret = tq_receive(fg->queue, &idx, frame);
2359
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 363951 times.
363951 if (idx < 0)
2360 363938 return AVERROR_EOF;
2361
2/2
✓ Branch 0 taken 363938 times.
✓ Branch 1 taken 13 times.
363951 else if (ret >= 0) {
2362 363938 *in_idx = idx;
2363 363938 return 0;
2364 }
2365
2366 // disregard EOFs for specific streams - they should always be
2367 // preceded by an EOF frame
2368 }
2369 }
2370
2371 1 void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx)
2372 {
2373 SchFilterGraph *fg;
2374 SchFilterIn *fi;
2375
2376
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(fg_idx < sch->nb_filters);
2377 1 fg = &sch->filters[fg_idx];
2378
2379
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(in_idx < fg->nb_inputs);
2380 1 fi = &fg->inputs[in_idx];
2381
2382
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (!fi->receive_finished) {
2383 1 fi->receive_finished = 1;
2384 1 tq_receive_finish(fg->queue, in_idx);
2385
2386 // close the control stream when all actual inputs are done
2387
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (++fg->nb_inputs_finished_receive == fg->nb_inputs)
2388 tq_receive_finish(fg->queue, fg->nb_inputs);
2389 }
2390 1 }
2391
2392 365129 int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)
2393 {
2394 SchFilterGraph *fg;
2395 SchedulerNode dst;
2396
2397
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 365129 times.
365129 av_assert0(fg_idx < sch->nb_filters);
2398 365129 fg = &sch->filters[fg_idx];
2399
2400
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 365129 times.
365129 av_assert0(out_idx < fg->nb_outputs);
2401 365129 dst = fg->outputs[out_idx].dst;
2402
2403 365129 return (dst.type == SCH_NODE_TYPE_ENC) ?
2404
1/2
✓ Branch 0 taken 365129 times.
✗ Branch 1 not taken.
365129 send_to_enc (sch, &sch->enc[dst.idx], frame) :
2405 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame);
2406 }
2407
2408 6399 static int filter_done(Scheduler *sch, unsigned fg_idx)
2409 {
2410 6399 SchFilterGraph *fg = &sch->filters[fg_idx];
2411 6399 int ret = 0;
2412
2413
2/2
✓ Branch 0 taken 12812 times.
✓ Branch 1 taken 6399 times.
19211 for (unsigned i = 0; i <= fg->nb_inputs; i++)
2414 12812 tq_receive_finish(fg->queue, i);
2415
2416
2/2
✓ Branch 0 taken 6449 times.
✓ Branch 1 taken 6399 times.
12848 for (unsigned i = 0; i < fg->nb_outputs; i++) {
2417 6449 SchedulerNode dst = fg->outputs[i].dst;
2418 12898 int err = (dst.type == SCH_NODE_TYPE_ENC) ?
2419
1/2
✓ Branch 0 taken 6449 times.
✗ Branch 1 not taken.
6449 send_to_enc (sch, &sch->enc[dst.idx], NULL) :
2420 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2421
2422
3/4
✓ Branch 0 taken 2804 times.
✓ Branch 1 taken 3645 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 2804 times.
6449 if (err < 0 && err != AVERROR_EOF)
2423 ret = err_merge(ret, err);
2424 }
2425
2426 6399 pthread_mutex_lock(&sch->schedule_lock);
2427
2428 6399 fg->task_exited = 1;
2429
2430 6399 schedule_update_locked(sch);
2431
2432 6399 pthread_mutex_unlock(&sch->schedule_lock);
2433
2434 6399 return ret;
2435 }
2436
2437 int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
2438 {
2439 SchFilterGraph *fg;
2440
2441 av_assert0(fg_idx < sch->nb_filters);
2442 fg = &sch->filters[fg_idx];
2443
2444 return send_to_filter(sch, fg, fg->nb_inputs, frame);
2445 }
2446
2447 32880 static int task_cleanup(Scheduler *sch, SchedulerNode node)
2448 {
2449
5/6
✓ Branch 0 taken 6809 times.
✓ Branch 1 taken 6780 times.
✓ Branch 2 taken 6405 times.
✓ Branch 3 taken 6487 times.
✓ Branch 4 taken 6399 times.
✗ Branch 5 not taken.
32880 switch (node.type) {
2450 6809 case SCH_NODE_TYPE_DEMUX: return demux_done (sch, node.idx);
2451 6780 case SCH_NODE_TYPE_MUX: return mux_done (sch, node.idx);
2452 6405 case SCH_NODE_TYPE_DEC: return dec_done (sch, node.idx);
2453 6487 case SCH_NODE_TYPE_ENC: return enc_done (sch, node.idx);
2454 6399 case SCH_NODE_TYPE_FILTER_IN: return filter_done(sch, node.idx);
2455 default: av_assert0(0);
2456 }
2457 }
2458
2459 32866 static void *task_wrapper(void *arg)
2460 {
2461 32866 SchTask *task = arg;
2462 32866 Scheduler *sch = task->parent;
2463 int ret;
2464 32866 int err = 0;
2465
2466 32866 ret = task->func(task->func_arg);
2467
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 32865 times.
32866 if (ret < 0)
2468 1 av_log(task->func_arg, AV_LOG_ERROR,
2469 1 "Task finished with error code: %d (%s)\n", ret, av_err2str(ret));
2470
2471 32866 err = task_cleanup(sch, task->node);
2472 32866 ret = err_merge(ret, err);
2473
2474 // EOF is considered normal termination
2475
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 32866 times.
32866 if (ret == AVERROR_EOF)
2476 ret = 0;
2477
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 32865 times.
32866 if (ret < 0)
2478 1 atomic_store(&sch->task_failed, 1);
2479
2480
4/4
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 32865 times.
✓ Branch 2 taken 1 times.
✓ Branch 3 taken 32865 times.
32867 av_log(task->func_arg, ret < 0 ? AV_LOG_ERROR : AV_LOG_VERBOSE,
2481 "Terminating thread with return code %d (%s)\n", ret,
2482 1 ret < 0 ? av_err2str(ret) : "success");
2483
2484 32866 return (void*)(intptr_t)ret;
2485 }
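
The int result of a task is smuggled through pthread's void * return value with an intptr_t round trip, and recovered in task_stop() via pthread_join(). The same convention in isolation:

    #include <pthread.h>
    #include <stdint.h>
    #include <stdio.h>

    static void *worker(void *arg)
    {
        int ret = -42;                  /* pretend the task failed */
        (void)arg;
        return (void *)(intptr_t)ret;   /* int -> void* round trip  */
    }

    int main(void)
    {
        pthread_t t;
        void *thread_ret;

        pthread_create(&t, NULL, worker, NULL);
        pthread_join(t, &thread_ret);
        printf("task returned %d\n", (int)(intptr_t)thread_ret);
        return 0;
    }
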
2486
2487 32880 static int task_stop(Scheduler *sch, SchTask *task)
2488 {
2489 int ret;
2490 void *thread_ret;
2491
2492
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 32866 times.
32880 if (!task->thread_running)
2493 14 return task_cleanup(sch, task->node);
2494
2495 32866 ret = pthread_join(task->thread, &thread_ret);
2496
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 32866 times.
32866 av_assert0(ret == 0);
2497
2498 32866 task->thread_running = 0;
2499
2500 32866 return (intptr_t)thread_ret;
2501 }
2502
2503 13557 int sch_stop(Scheduler *sch, int64_t *finish_ts)
2504 {
2505 13557 int ret = 0, err;
2506
2507
2/2
✓ Branch 0 taken 6779 times.
✓ Branch 1 taken 6778 times.
13557 if (sch->state != SCH_STATE_STARTED)
2508 6779 return 0;
2509
2510 6778 atomic_store(&sch->terminate, 1);
2511
2512
2/2
✓ Branch 0 taken 13556 times.
✓ Branch 1 taken 6778 times.
20334 for (unsigned type = 0; type < 2; type++)
2513
4/4
✓ Branch 0 taken 13587 times.
✓ Branch 1 taken 13177 times.
✓ Branch 2 taken 13208 times.
✓ Branch 3 taken 13556 times.
26764 for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) {
2514
2/2
✓ Branch 0 taken 6809 times.
✓ Branch 1 taken 6399 times.
13208 SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter;
2515 13208 waiter_set(w, 1);
2516 }
2517
2518
2/2
✓ Branch 0 taken 6809 times.
✓ Branch 1 taken 6778 times.
13587 for (unsigned i = 0; i < sch->nb_demux; i++) {
2519 6809 SchDemux *d = &sch->demux[i];
2520
2521 6809 err = task_stop(sch, &d->task);
2522 6809 ret = err_merge(ret, err);
2523 }
2524
2525
2/2
✓ Branch 0 taken 6405 times.
✓ Branch 1 taken 6778 times.
13183 for (unsigned i = 0; i < sch->nb_dec; i++) {
2526 6405 SchDec *dec = &sch->dec[i];
2527
2528 6405 err = task_stop(sch, &dec->task);
2529 6405 ret = err_merge(ret, err);
2530 }
2531
2532
2/2
✓ Branch 0 taken 6399 times.
✓ Branch 1 taken 6778 times.
13177 for (unsigned i = 0; i < sch->nb_filters; i++) {
2533 6399 SchFilterGraph *fg = &sch->filters[i];
2534
2535 6399 err = task_stop(sch, &fg->task);
2536 6399 ret = err_merge(ret, err);
2537 }
2538
2539
2/2
✓ Branch 0 taken 6487 times.
✓ Branch 1 taken 6778 times.
13265 for (unsigned i = 0; i < sch->nb_enc; i++) {
2540 6487 SchEnc *enc = &sch->enc[i];
2541
2542 6487 err = task_stop(sch, &enc->task);
2543 6487 ret = err_merge(ret, err);
2544 }
2545
2546
2/2
✓ Branch 0 taken 6780 times.
✓ Branch 1 taken 6778 times.
13558 for (unsigned i = 0; i < sch->nb_mux; i++) {
2547 6780 SchMux *mux = &sch->mux[i];
2548
2549 6780 err = task_stop(sch, &mux->task);
2550 6780 ret = err_merge(ret, err);
2551 }
2552
2553
1/2
✓ Branch 0 taken 6778 times.
✗ Branch 1 not taken.
6778 if (finish_ts)
2554 6778 *finish_ts = trailing_dts(sch, 1);
2555
2556 6778 sch->state = SCH_STATE_STOPPED;
2557
2558 6778 return ret;
2559 }
2560