FFmpeg coverage


Directory: ../../../ffmpeg/
File: src/fftools/ffmpeg_sched.c
Date: 2024-07-17 14:05:47
            Exec   Total   Coverage
Lines:      1031   1185    87.0%
Functions:    64     66    97.0%
Branches:    567    789    71.9%

Line Branch Exec Source
1 /*
2 * Inter-thread scheduling/synchronization.
3 * Copyright (c) 2023 Anton Khirnov
4 *
5 * This file is part of FFmpeg.
6 *
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with FFmpeg; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22 #include <stdatomic.h>
23 #include <stddef.h>
24 #include <stdint.h>
25
26 #include "cmdutils.h"
27 #include "ffmpeg_sched.h"
28 #include "ffmpeg_utils.h"
29 #include "sync_queue.h"
30 #include "thread_queue.h"
31
32 #include "libavcodec/packet.h"
33
34 #include "libavutil/avassert.h"
35 #include "libavutil/error.h"
36 #include "libavutil/fifo.h"
37 #include "libavutil/frame.h"
38 #include "libavutil/mem.h"
39 #include "libavutil/thread.h"
40 #include "libavutil/threadmessage.h"
41 #include "libavutil/time.h"
42
43 // 100 ms
44 // FIXME: some other value? make this dynamic?
45 #define SCHEDULE_TOLERANCE (100 * 1000)
46
47 enum QueueType {
48 QUEUE_PACKETS,
49 QUEUE_FRAMES,
50 };
51
52 typedef struct SchWaiter {
53 pthread_mutex_t lock;
54 pthread_cond_t cond;
55 atomic_int choked;
56
57 // the following are internal state of schedule_update_locked() and must not
58 // be accessed outside of it
59 int choked_prev;
60 int choked_next;
61 } SchWaiter;
62
63 typedef struct SchTask {
64 Scheduler *parent;
65 SchedulerNode node;
66
67 SchThreadFunc func;
68 void *func_arg;
69
70 pthread_t thread;
71 int thread_running;
72 } SchTask;
73
74 typedef struct SchDec {
75 const AVClass *class;
76
77 SchedulerNode src;
78 SchedulerNode *dst;
79 uint8_t *dst_finished;
80 unsigned nb_dst;
81
82 SchTask task;
83 // Queue for receiving input packets, one stream.
84 ThreadQueue *queue;
85
86 // Queue for sending post-flush end timestamps back to the source
87 AVThreadMessageQueue *queue_end_ts;
88 int expect_end_ts;
89
90 // temporary storage used by sch_dec_send()
91 AVFrame *send_frame;
92 } SchDec;
93
94 typedef struct SchSyncQueue {
95 SyncQueue *sq;
96 AVFrame *frame;
97 pthread_mutex_t lock;
98
99 unsigned *enc_idx;
100 unsigned nb_enc_idx;
101 } SchSyncQueue;
102
103 typedef struct SchEnc {
104 const AVClass *class;
105
106 SchedulerNode src;
107 SchedulerNode *dst;
108 uint8_t *dst_finished;
109 unsigned nb_dst;
110
111 // [0] - index of the sync queue in Scheduler.sq_enc,
112 // [1] - index of this encoder in the sq
113 int sq_idx[2];
114
115 /* Opening encoders is somewhat nontrivial due to their interaction with
116 * sync queues, which are (among other things) responsible for maintaining
117 * constant audio frame size, when it is required by the encoder.
118 *
119 * Opening the encoder requires stream parameters, obtained from the first
120 * frame. However, that frame cannot be properly chunked by the sync queue
121 * without knowing the required frame size, which is only available after
122 * opening the encoder.
123 *
124 * This apparent circular dependency is resolved in the following way:
125 * - the caller creating the encoder gives us a callback which opens the
126 * encoder and returns the required frame size (if any)
127 * - when the first frame is sent to the encoder, the sending thread
128 * - calls this callback, opening the encoder
129 * - passes the returned frame size to the sync queue
130 */
131 int (*open_cb)(void *opaque, const AVFrame *frame);
132 int opened;
133
134 SchTask task;
135 // Queue for receiving input frames, one stream.
136 ThreadQueue *queue;
137 // tq_send() to queue returned EOF
138 int in_finished;
139
140 // temporary storage used by sch_enc_send()
141 AVPacket *send_pkt;
142 } SchEnc;
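
A minimal sketch of such an open callback, under assumed names (MyEncoder, my_encoder_open and fixed_frame_size are illustrative, not part of this file):

    static int enc_open_cb(void *opaque, const AVFrame *frame)
    {
        MyEncoder *e = opaque;                // caller-provided context
        int ret = my_encoder_open(e, frame);  // open from first-frame parameters
        if (ret < 0)
            return ret;
        // a positive return value tells the sync queue the fixed audio
        // frame size; 0 means the encoder imposes no fixed size
        return e->fixed_frame_size;
    }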
143
144 typedef struct SchDemuxStream {
145 SchedulerNode *dst;
146 uint8_t *dst_finished;
147 unsigned nb_dst;
148 } SchDemuxStream;
149
150 typedef struct SchDemux {
151 const AVClass *class;
152
153 SchDemuxStream *streams;
154 unsigned nb_streams;
155
156 SchTask task;
157 SchWaiter waiter;
158
159 // temporary storage used by sch_demux_send()
160 AVPacket *send_pkt;
161
162 // protected by schedule_lock
163 int task_exited;
164 } SchDemux;
165
166 typedef struct PreMuxQueue {
167 /**
168 * Queue for buffering the packets before the muxer task can be started.
169 */
170 AVFifo *fifo;
171 /**
172 * Maximum number of packets in fifo.
173 */
174 int max_packets;
175 /*
176 * The size of the AVPackets' buffers in queue.
177 * Updated when a packet is either pushed or pulled from the queue.
178 */
179 size_t data_size;
180 /* Threshold after which max_packets will be in effect */
181 size_t data_threshold;
182 } PreMuxQueue;
183
184 typedef struct SchMuxStream {
185 SchedulerNode src;
186 SchedulerNode src_sched;
187
188 unsigned *sub_heartbeat_dst;
189 unsigned nb_sub_heartbeat_dst;
190
191 PreMuxQueue pre_mux_queue;
192
193 // an EOF was generated while flushing the pre-mux queue
194 int init_eof;
195
196 ////////////////////////////////////////////////////////////
197 // The following are protected by Scheduler.schedule_lock //
198
199 /* dts+duration of the last packet sent to this stream
200 in AV_TIME_BASE_Q */
201 int64_t last_dts;
202 // this stream no longer accepts input
203 int source_finished;
204 ////////////////////////////////////////////////////////////
205 } SchMuxStream;
206
207 typedef struct SchMux {
208 const AVClass *class;
209
210 SchMuxStream *streams;
211 unsigned nb_streams;
212 unsigned nb_streams_ready;
213
214 int (*init)(void *arg);
215
216 SchTask task;
217 /**
218 * Set to 1 after starting the muxer task and flushing the
219 * pre-muxing queues.
220 * Set either before any tasks have started, or with
221 * Scheduler.mux_ready_lock held.
222 */
223 atomic_int mux_started;
224 ThreadQueue *queue;
225 unsigned queue_size;
226
227 AVPacket *sub_heartbeat_pkt;
228 } SchMux;
229
230 typedef struct SchFilterIn {
231 SchedulerNode src;
232 SchedulerNode src_sched;
233 int send_finished;
234 int receive_finished;
235 } SchFilterIn;
236
237 typedef struct SchFilterOut {
238 SchedulerNode dst;
239 } SchFilterOut;
240
241 typedef struct SchFilterGraph {
242 const AVClass *class;
243
244 SchFilterIn *inputs;
245 unsigned nb_inputs;
246 atomic_uint nb_inputs_finished_send;
247 unsigned nb_inputs_finished_receive;
248
249 SchFilterOut *outputs;
250 unsigned nb_outputs;
251
252 SchTask task;
253 // input queue, nb_inputs+1 streams
254 // last stream is control
255 ThreadQueue *queue;
256 SchWaiter waiter;
257
258 // protected by schedule_lock
259 unsigned best_input;
260 int task_exited;
261 } SchFilterGraph;
262
263 enum SchedulerState {
264 SCH_STATE_UNINIT,
265 SCH_STATE_STARTED,
266 SCH_STATE_STOPPED,
267 };
268
269 struct Scheduler {
270 const AVClass *class;
271
272 SchDemux *demux;
273 unsigned nb_demux;
274
275 SchMux *mux;
276 unsigned nb_mux;
277
278 unsigned nb_mux_ready;
279 pthread_mutex_t mux_ready_lock;
280
281 unsigned nb_mux_done;
282 pthread_mutex_t mux_done_lock;
283 pthread_cond_t mux_done_cond;
284
285
286 SchDec *dec;
287 unsigned nb_dec;
288
289 SchEnc *enc;
290 unsigned nb_enc;
291
292 SchSyncQueue *sq_enc;
293 unsigned nb_sq_enc;
294
295 SchFilterGraph *filters;
296 unsigned nb_filters;
297
298 char *sdp_filename;
299 int sdp_auto;
300
301 enum SchedulerState state;
302 atomic_int terminate;
303 atomic_int task_failed;
304
305 pthread_mutex_t schedule_lock;
306
307 atomic_int_least64_t last_dts;
308 };
309
310 /**
311 * Wait until this task is allowed to proceed.
312 *
313 * @retval 0 the caller should proceed
314 * @retval 1 the caller should terminate
315 */
316 460833 static int waiter_wait(Scheduler *sch, SchWaiter *w)
317 {
318 int terminate;
319
320
2/2
✓ Branch 0 taken 460535 times.
✓ Branch 1 taken 298 times.
460833 if (!atomic_load(&w->choked))
321 460535 return 0;
322
323 298 pthread_mutex_lock(&w->lock);
324
325
4/4
✓ Branch 0 taken 308 times.
✓ Branch 1 taken 235 times.
✓ Branch 2 taken 245 times.
✓ Branch 3 taken 63 times.
543 while (atomic_load(&w->choked) && !atomic_load(&sch->terminate))
326 245 pthread_cond_wait(&w->cond, &w->lock);
327
328 298 terminate = atomic_load(&sch->terminate);
329
330 298 pthread_mutex_unlock(&w->lock);
331
332 298 return terminate;
333 }
334
335 30539 static void waiter_set(SchWaiter *w, int choked)
336 {
337 30539 pthread_mutex_lock(&w->lock);
338
339 30539 atomic_store(&w->choked, choked);
340 30539 pthread_cond_signal(&w->cond);
341
342 30539 pthread_mutex_unlock(&w->lock);
343 30539 }
344
345 13204 static int waiter_init(SchWaiter *w)
346 {
347 int ret;
348
349 13204 atomic_init(&w->choked, 0);
350
351 13204 ret = pthread_mutex_init(&w->lock, NULL);
352
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13204 times.
13204 if (ret)
353 return AVERROR(ret);
354
355 13204 ret = pthread_cond_init(&w->cond, NULL);
356
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13204 times.
13204 if (ret)
357 return AVERROR(ret);
358
359 13204 return 0;
360 }
361
362 13204 static void waiter_uninit(SchWaiter *w)
363 {
364 13204 pthread_mutex_destroy(&w->lock);
365 13204 pthread_cond_destroy(&w->cond);
366 13204 }
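
Together these functions form a simple choke/unchoke gate: a source thread polls waiter_wait() at the top of its loop, while the scheduler flips the gate with waiter_set(). A sketch of the source-thread side (the loop body is illustrative):

    for (;;) {
        if (waiter_wait(sch, &d->waiter))
            break;  // scheduler requested termination
        // ... demux and forward one packet while unchoked ...
    }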
367
368 26063 static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size,
369 enum QueueType type)
370 {
371 ThreadQueue *tq;
372 ObjPool *op;
373
374
1/2
✓ Branch 0 taken 26063 times.
✗ Branch 1 not taken.
26063 if (queue_size <= 0) {
375
2/2
✓ Branch 0 taken 12882 times.
✓ Branch 1 taken 13181 times.
26063 if (type == QUEUE_FRAMES)
376 12882 queue_size = DEFAULT_FRAME_THREAD_QUEUE_SIZE;
377 else
378 13181 queue_size = DEFAULT_PACKET_THREAD_QUEUE_SIZE;
379 }
380
381
2/2
✓ Branch 0 taken 12882 times.
✓ Branch 1 taken 13181 times.
26063 if (type == QUEUE_FRAMES) {
382 // This queue length is used in the decoder code to ensure that
383 // there are enough entries in fixed-size frame pools to account
384 // for frames held in queues inside the ffmpeg utility. If this
385 // can ever dynamically change then the corresponding decode
386 // code needs to be updated as well.
387
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12882 times.
12882 av_assert0(queue_size == DEFAULT_FRAME_THREAD_QUEUE_SIZE);
388 }
389
390
2/2
✓ Branch 0 taken 13181 times.
✓ Branch 1 taken 12882 times.
26063 op = (type == QUEUE_PACKETS) ? objpool_alloc_packets() :
391 12882 objpool_alloc_frames();
392
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 26063 times.
26063 if (!op)
393 return AVERROR(ENOMEM);
394
395
2/2
✓ Branch 0 taken 13181 times.
✓ Branch 1 taken 12882 times.
26063 tq = tq_alloc(nb_streams, queue_size, op,
396 (type == QUEUE_PACKETS) ? pkt_move : frame_move);
397
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 26063 times.
26063 if (!tq) {
398 objpool_free(&op);
399 return AVERROR(ENOMEM);
400 }
401
402 26063 *ptq = tq;
403 26063 return 0;
404 }
405
406 static void *task_wrapper(void *arg);
407
408 32856 static int task_start(SchTask *task)
409 {
410 int ret;
411
412 32856 av_log(task->func_arg, AV_LOG_VERBOSE, "Starting thread...\n");
413
414
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 32856 times.
32856 av_assert0(!task->thread_running);
415
416 32856 ret = pthread_create(&task->thread, NULL, task_wrapper, task);
417
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 32856 times.
32856 if (ret) {
418 av_log(task->func_arg, AV_LOG_ERROR, "pthread_create() failed: %s\n",
419 strerror(ret));
420 return AVERROR(ret);
421 }
422
423 32856 task->thread_running = 1;
424 32856 return 0;
425 }
426
427 32870 static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx,
428 SchThreadFunc func, void *func_arg)
429 {
430 32870 task->parent = sch;
431
432 32870 task->node.type = type;
433 32870 task->node.idx = idx;
434
435 32870 task->func = func;
436 32870 task->func_arg = func_arg;
437 32870 }
438
439 506846 static int64_t trailing_dts(const Scheduler *sch, int count_finished)
440 {
441 506846 int64_t min_dts = INT64_MAX;
442
443
2/2
✓ Branch 0 taken 507091 times.
✓ Branch 1 taken 493039 times.
1000130 for (unsigned i = 0; i < sch->nb_mux; i++) {
444 507091 const SchMux *mux = &sch->mux[i];
445
446
2/2
✓ Branch 0 taken 544860 times.
✓ Branch 1 taken 493284 times.
1038144 for (unsigned j = 0; j < mux->nb_streams; j++) {
447 544860 const SchMuxStream *ms = &mux->streams[j];
448
449
4/4
✓ Branch 0 taken 37376 times.
✓ Branch 1 taken 507484 times.
✓ Branch 2 taken 30282 times.
✓ Branch 3 taken 7094 times.
544860 if (ms->source_finished && !count_finished)
450 30282 continue;
451
2/2
✓ Branch 0 taken 13807 times.
✓ Branch 1 taken 500771 times.
514578 if (ms->last_dts == AV_NOPTS_VALUE)
452 13807 return AV_NOPTS_VALUE;
453
454 500771 min_dts = FFMIN(min_dts, ms->last_dts);
455 }
456 }
457
458
2/2
✓ Branch 0 taken 468431 times.
✓ Branch 1 taken 24608 times.
493039 return min_dts == INT64_MAX ? AV_NOPTS_VALUE : min_dts;
459 }
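
For example, with count_finished == 0 and two unfinished muxer streams whose last_dts values are 2.0 s and 2.1 s, trailing_dts() returns 2.0 s; if any counted stream has not yet produced a packet (last_dts == AV_NOPTS_VALUE), AV_NOPTS_VALUE is returned, since no meaningful trailing timestamp exists yet.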
460
461 6777 void sch_free(Scheduler **psch)
462 {
463 6777 Scheduler *sch = *psch;
464
465
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6777 times.
6777 if (!sch)
466 return;
467
468 6777 sch_stop(sch, NULL);
469
470
2/2
✓ Branch 0 taken 6807 times.
✓ Branch 1 taken 6777 times.
13584 for (unsigned i = 0; i < sch->nb_demux; i++) {
471 6807 SchDemux *d = &sch->demux[i];
472
473
2/2
✓ Branch 0 taken 7024 times.
✓ Branch 1 taken 6807 times.
13831 for (unsigned j = 0; j < d->nb_streams; j++) {
474 7024 SchDemuxStream *ds = &d->streams[j];
475 7024 av_freep(&ds->dst);
476 7024 av_freep(&ds->dst_finished);
477 }
478 6807 av_freep(&d->streams);
479
480 6807 av_packet_free(&d->send_pkt);
481
482 6807 waiter_uninit(&d->waiter);
483 }
484 6777 av_freep(&sch->demux);
485
486
2/2
✓ Branch 0 taken 6778 times.
✓ Branch 1 taken 6777 times.
13555 for (unsigned i = 0; i < sch->nb_mux; i++) {
487 6778 SchMux *mux = &sch->mux[i];
488
489
2/2
✓ Branch 0 taken 7130 times.
✓ Branch 1 taken 6778 times.
13908 for (unsigned j = 0; j < mux->nb_streams; j++) {
490 7130 SchMuxStream *ms = &mux->streams[j];
491
492
1/2
✓ Branch 0 taken 7130 times.
✗ Branch 1 not taken.
7130 if (ms->pre_mux_queue.fifo) {
493 AVPacket *pkt;
494
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 7130 times.
7130 while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0)
495 av_packet_free(&pkt);
496 7130 av_fifo_freep2(&ms->pre_mux_queue.fifo);
497 }
498
499 7130 av_freep(&ms->sub_heartbeat_dst);
500 }
501 6778 av_freep(&mux->streams);
502
503 6778 av_packet_free(&mux->sub_heartbeat_pkt);
504
505 6778 tq_free(&mux->queue);
506 }
507 6777 av_freep(&sch->mux);
508
509
2/2
✓ Branch 0 taken 6403 times.
✓ Branch 1 taken 6777 times.
13180 for (unsigned i = 0; i < sch->nb_dec; i++) {
510 6403 SchDec *dec = &sch->dec[i];
511
512 6403 tq_free(&dec->queue);
513
514 6403 av_thread_message_queue_free(&dec->queue_end_ts);
515
516 6403 av_freep(&dec->dst);
517 6403 av_freep(&dec->dst_finished);
518
519 6403 av_frame_free(&dec->send_frame);
520 }
521 6777 av_freep(&sch->dec);
522
523
2/2
✓ Branch 0 taken 6485 times.
✓ Branch 1 taken 6777 times.
13262 for (unsigned i = 0; i < sch->nb_enc; i++) {
524 6485 SchEnc *enc = &sch->enc[i];
525
526 6485 tq_free(&enc->queue);
527
528 6485 av_packet_free(&enc->send_pkt);
529
530 6485 av_freep(&enc->dst);
531 6485 av_freep(&enc->dst_finished);
532 }
533 6777 av_freep(&sch->enc);
534
535
2/2
✓ Branch 0 taken 2776 times.
✓ Branch 1 taken 6777 times.
9553 for (unsigned i = 0; i < sch->nb_sq_enc; i++) {
536 2776 SchSyncQueue *sq = &sch->sq_enc[i];
537 2776 sq_free(&sq->sq);
538 2776 av_frame_free(&sq->frame);
539 2776 pthread_mutex_destroy(&sq->lock);
540 2776 av_freep(&sq->enc_idx);
541 }
542 6777 av_freep(&sch->sq_enc);
543
544
2/2
✓ Branch 0 taken 6397 times.
✓ Branch 1 taken 6777 times.
13174 for (unsigned i = 0; i < sch->nb_filters; i++) {
545 6397 SchFilterGraph *fg = &sch->filters[i];
546
547 6397 tq_free(&fg->queue);
548
549 6397 av_freep(&fg->inputs);
550 6397 av_freep(&fg->outputs);
551
552 6397 waiter_uninit(&fg->waiter);
553 }
554 6777 av_freep(&sch->filters);
555
556 6777 av_freep(&sch->sdp_filename);
557
558 6777 pthread_mutex_destroy(&sch->schedule_lock);
559
560 6777 pthread_mutex_destroy(&sch->mux_ready_lock);
561
562 6777 pthread_mutex_destroy(&sch->mux_done_lock);
563 6777 pthread_cond_destroy(&sch->mux_done_cond);
564
565 6777 av_freep(psch);
566 }
567
568 static const AVClass scheduler_class = {
569 .class_name = "Scheduler",
570 .version = LIBAVUTIL_VERSION_INT,
571 };
572
573 6777 Scheduler *sch_alloc(void)
574 {
575 Scheduler *sch;
576 int ret;
577
578 6777 sch = av_mallocz(sizeof(*sch));
579
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6777 times.
6777 if (!sch)
580 return NULL;
581
582 6777 sch->class = &scheduler_class;
583 6777 sch->sdp_auto = 1;
584
585 6777 ret = pthread_mutex_init(&sch->schedule_lock, NULL);
586
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6777 times.
6777 if (ret)
587 goto fail;
588
589 6777 ret = pthread_mutex_init(&sch->mux_ready_lock, NULL);
590
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6777 times.
6777 if (ret)
591 goto fail;
592
593 6777 ret = pthread_mutex_init(&sch->mux_done_lock, NULL);
594
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6777 times.
6777 if (ret)
595 goto fail;
596
597 6777 ret = pthread_cond_init(&sch->mux_done_cond, NULL);
598
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6777 times.
6777 if (ret)
599 goto fail;
600
601 6777 return sch;
602 fail:
603 sch_free(&sch);
604 return NULL;
605 }
606
607 int sch_sdp_filename(Scheduler *sch, const char *sdp_filename)
608 {
609 av_freep(&sch->sdp_filename);
610 sch->sdp_filename = av_strdup(sdp_filename);
611 return sch->sdp_filename ? 0 : AVERROR(ENOMEM);
612 }
613
614 static const AVClass sch_mux_class = {
615 .class_name = "SchMux",
616 .version = LIBAVUTIL_VERSION_INT,
617 .parent_log_context_offset = offsetof(SchMux, task.func_arg),
618 };
619
620 6778 int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
621 void *arg, int sdp_auto, unsigned thread_queue_size)
622 {
623 6778 const unsigned idx = sch->nb_mux;
624
625 SchMux *mux;
626 int ret;
627
628 6778 ret = GROW_ARRAY(sch->mux, sch->nb_mux);
629
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6778 times.
6778 if (ret < 0)
630 return ret;
631
632 6778 mux = &sch->mux[idx];
633 6778 mux->class = &sch_mux_class;
634 6778 mux->init = init;
635 6778 mux->queue_size = thread_queue_size;
636
637 6778 task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg);
638
639 6778 sch->sdp_auto &= sdp_auto;
640
641 6778 return idx;
642 }
643
644 7130 int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx)
645 {
646 SchMux *mux;
647 SchMuxStream *ms;
648 unsigned stream_idx;
649 int ret;
650
651
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7130 times.
7130 av_assert0(mux_idx < sch->nb_mux);
652 7130 mux = &sch->mux[mux_idx];
653
654 7130 ret = GROW_ARRAY(mux->streams, mux->nb_streams);
655
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7130 times.
7130 if (ret < 0)
656 return ret;
657 7130 stream_idx = mux->nb_streams - 1;
658
659 7130 ms = &mux->streams[stream_idx];
660
661 7130 ms->pre_mux_queue.fifo = av_fifo_alloc2(8, sizeof(AVPacket*), 0);
662
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7130 times.
7130 if (!ms->pre_mux_queue.fifo)
663 return AVERROR(ENOMEM);
664
665 7130 ms->last_dts = AV_NOPTS_VALUE;
666
667 7130 return stream_idx;
668 }
669
670 static const AVClass sch_demux_class = {
671 .class_name = "SchDemux",
672 .version = LIBAVUTIL_VERSION_INT,
673 .parent_log_context_offset = offsetof(SchDemux, task.func_arg),
674 };
675
676 6807 int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *ctx)
677 {
678 6807 const unsigned idx = sch->nb_demux;
679
680 SchDemux *d;
681 int ret;
682
683 6807 ret = GROW_ARRAY(sch->demux, sch->nb_demux);
684
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6807 times.
6807 if (ret < 0)
685 return ret;
686
687 6807 d = &sch->demux[idx];
688
689 6807 task_init(sch, &d->task, SCH_NODE_TYPE_DEMUX, idx, func, ctx);
690
691 6807 d->class = &sch_demux_class;
692 6807 d->send_pkt = av_packet_alloc();
693
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6807 times.
6807 if (!d->send_pkt)
694 return AVERROR(ENOMEM);
695
696 6807 ret = waiter_init(&d->waiter);
697
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6807 times.
6807 if (ret < 0)
698 return ret;
699
700 6807 return idx;
701 }
702
703 7024 int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
704 {
705 SchDemux *d;
706 int ret;
707
708
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7024 times.
7024 av_assert0(demux_idx < sch->nb_demux);
709 7024 d = &sch->demux[demux_idx];
710
711 7024 ret = GROW_ARRAY(d->streams, d->nb_streams);
712
1/2
✓ Branch 0 taken 7024 times.
✗ Branch 1 not taken.
7024 return ret < 0 ? ret : d->nb_streams - 1;
713 }
714
715 static const AVClass sch_dec_class = {
716 .class_name = "SchDec",
717 .version = LIBAVUTIL_VERSION_INT,
718 .parent_log_context_offset = offsetof(SchDec, task.func_arg),
719 };
720
721 6403 int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx,
722 int send_end_ts)
723 {
724 6403 const unsigned idx = sch->nb_dec;
725
726 SchDec *dec;
727 int ret;
728
729 6403 ret = GROW_ARRAY(sch->dec, sch->nb_dec);
730
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6403 times.
6403 if (ret < 0)
731 return ret;
732
733 6403 dec = &sch->dec[idx];
734
735 6403 task_init(sch, &dec->task, SCH_NODE_TYPE_DEC, idx, func, ctx);
736
737 6403 dec->class = &sch_dec_class;
738 6403 dec->send_frame = av_frame_alloc();
739
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6403 times.
6403 if (!dec->send_frame)
740 return AVERROR(ENOMEM);
741
742 6403 ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS);
743
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6403 times.
6403 if (ret < 0)
744 return ret;
745
746
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 6402 times.
6403 if (send_end_ts) {
747 1 ret = av_thread_message_queue_alloc(&dec->queue_end_ts, 1, sizeof(Timestamp));
748
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (ret < 0)
749 return ret;
750 }
751
752 6403 return idx;
753 }
754
755 static const AVClass sch_enc_class = {
756 .class_name = "SchEnc",
757 .version = LIBAVUTIL_VERSION_INT,
758 .parent_log_context_offset = offsetof(SchEnc, task.func_arg),
759 };
760
761 6485 int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx,
762 int (*open_cb)(void *opaque, const AVFrame *frame))
763 {
764 6485 const unsigned idx = sch->nb_enc;
765
766 SchEnc *enc;
767 int ret;
768
769 6485 ret = GROW_ARRAY(sch->enc, sch->nb_enc);
770
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6485 times.
6485 if (ret < 0)
771 return ret;
772
773 6485 enc = &sch->enc[idx];
774
775 6485 enc->class = &sch_enc_class;
776 6485 enc->open_cb = open_cb;
777 6485 enc->sq_idx[0] = -1;
778 6485 enc->sq_idx[1] = -1;
779
780 6485 task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx);
781
782 6485 enc->send_pkt = av_packet_alloc();
783
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6485 times.
6485 if (!enc->send_pkt)
784 return AVERROR(ENOMEM);
785
786 6485 ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES);
787
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6485 times.
6485 if (ret < 0)
788 return ret;
789
790 6485 return idx;
791 }
792
793 static const AVClass sch_fg_class = {
794 .class_name = "SchFilterGraph",
795 .version = LIBAVUTIL_VERSION_INT,
796 .parent_log_context_offset = offsetof(SchFilterGraph, task.func_arg),
797 };
798
799 6397 int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
800 SchThreadFunc func, void *ctx)
801 {
802 6397 const unsigned idx = sch->nb_filters;
803
804 SchFilterGraph *fg;
805 int ret;
806
807 6397 ret = GROW_ARRAY(sch->filters, sch->nb_filters);
808
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6397 times.
6397 if (ret < 0)
809 return ret;
810 6397 fg = &sch->filters[idx];
811
812 6397 fg->class = &sch_fg_class;
813
814 6397 task_init(sch, &fg->task, SCH_NODE_TYPE_FILTER_IN, idx, func, ctx);
815
816
2/2
✓ Branch 0 taken 6341 times.
✓ Branch 1 taken 56 times.
6397 if (nb_inputs) {
817 6341 fg->inputs = av_calloc(nb_inputs, sizeof(*fg->inputs));
818
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6341 times.
6341 if (!fg->inputs)
819 return AVERROR(ENOMEM);
820 6341 fg->nb_inputs = nb_inputs;
821 }
822
823
1/2
✓ Branch 0 taken 6397 times.
✗ Branch 1 not taken.
6397 if (nb_outputs) {
824 6397 fg->outputs = av_calloc(nb_outputs, sizeof(*fg->outputs));
825
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6397 times.
6397 if (!fg->outputs)
826 return AVERROR(ENOMEM);
827 6397 fg->nb_outputs = nb_outputs;
828 }
829
830 6397 ret = waiter_init(&fg->waiter);
831
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6397 times.
6397 if (ret < 0)
832 return ret;
833
834 6397 ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES);
835
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6397 times.
6397 if (ret < 0)
836 return ret;
837
838 6397 return idx;
839 }
840
841 2776 int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx)
842 {
843 SchSyncQueue *sq;
844 int ret;
845
846 2776 ret = GROW_ARRAY(sch->sq_enc, sch->nb_sq_enc);
847
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2776 times.
2776 if (ret < 0)
848 return ret;
849 2776 sq = &sch->sq_enc[sch->nb_sq_enc - 1];
850
851 2776 sq->sq = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, logctx);
852
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2776 times.
2776 if (!sq->sq)
853 return AVERROR(ENOMEM);
854
855 2776 sq->frame = av_frame_alloc();
856
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2776 times.
2776 if (!sq->frame)
857 return AVERROR(ENOMEM);
858
859 2776 ret = pthread_mutex_init(&sq->lock, NULL);
860
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2776 times.
2776 if (ret)
861 return AVERROR(ret);
862
863 2776 return sq - sch->sq_enc;
864 }
865
866 2821 int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx,
867 int limiting, uint64_t max_frames)
868 {
869 SchSyncQueue *sq;
870 SchEnc *enc;
871 int ret;
872
873
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2821 times.
2821 av_assert0(sq_idx < sch->nb_sq_enc);
874 2821 sq = &sch->sq_enc[sq_idx];
875
876
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2821 times.
2821 av_assert0(enc_idx < sch->nb_enc);
877 2821 enc = &sch->enc[enc_idx];
878
879 2821 ret = GROW_ARRAY(sq->enc_idx, sq->nb_enc_idx);
880
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2821 times.
2821 if (ret < 0)
881 return ret;
882 2821 sq->enc_idx[sq->nb_enc_idx - 1] = enc_idx;
883
884 2821 ret = sq_add_stream(sq->sq, limiting);
885
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2821 times.
2821 if (ret < 0)
886 return ret;
887
888 2821 enc->sq_idx[0] = sq_idx;
889 2821 enc->sq_idx[1] = ret;
890
891
2/2
✓ Branch 0 taken 2651 times.
✓ Branch 1 taken 170 times.
2821 if (max_frames != INT64_MAX)
892 2651 sq_limit_frames(sq->sq, enc->sq_idx[1], max_frames);
893
894 2821 return 0;
895 }
896
897 26429 int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst)
898 {
899 int ret;
900
901
4/5
✓ Branch 0 taken 7047 times.
✓ Branch 1 taken 6449 times.
✓ Branch 2 taken 6447 times.
✓ Branch 3 taken 6486 times.
✗ Branch 4 not taken.
26429 switch (src.type) {
902 7047 case SCH_NODE_TYPE_DEMUX: {
903 SchDemuxStream *ds;
904
905
2/4
✓ Branch 0 taken 7047 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 7047 times.
7047 av_assert0(src.idx < sch->nb_demux &&
906 src.idx_stream < sch->demux[src.idx].nb_streams);
907 7047 ds = &sch->demux[src.idx].streams[src.idx_stream];
908
909 7047 ret = GROW_ARRAY(ds->dst, ds->nb_dst);
910
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7047 times.
7047 if (ret < 0)
911 return ret;
912
913 7047 ds->dst[ds->nb_dst - 1] = dst;
914
915 // demuxed packets go to decoding or streamcopy
916
2/3
✓ Branch 0 taken 6402 times.
✓ Branch 1 taken 645 times.
✗ Branch 2 not taken.
7047 switch (dst.type) {
917 6402 case SCH_NODE_TYPE_DEC: {
918 SchDec *dec;
919
920
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6402 times.
6402 av_assert0(dst.idx < sch->nb_dec);
921 6402 dec = &sch->dec[dst.idx];
922
923
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6402 times.
6402 av_assert0(!dec->src.type);
924 6402 dec->src = src;
925 6402 break;
926 }
927 645 case SCH_NODE_TYPE_MUX: {
928 SchMuxStream *ms;
929
930
2/4
✓ Branch 0 taken 645 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 645 times.
645 av_assert0(dst.idx < sch->nb_mux &&
931 dst.idx_stream < sch->mux[dst.idx].nb_streams);
932 645 ms = &sch->mux[dst.idx].streams[dst.idx_stream];
933
934
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 645 times.
645 av_assert0(!ms->src.type);
935 645 ms->src = src;
936
937 645 break;
938 }
939 default: av_assert0(0);
940 }
941
942 7047 break;
943 }
944 6449 case SCH_NODE_TYPE_DEC: {
945 SchDec *dec;
946
947
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6449 times.
6449 av_assert0(src.idx < sch->nb_dec);
948 6449 dec = &sch->dec[src.idx];
949
950 6449 ret = GROW_ARRAY(dec->dst, dec->nb_dst);
951
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6449 times.
6449 if (ret < 0)
952 return ret;
953
954 6449 dec->dst[dec->nb_dst - 1] = dst;
955
956 // decoded frames go to filters or encoding
957
2/3
✓ Branch 0 taken 6411 times.
✓ Branch 1 taken 38 times.
✗ Branch 2 not taken.
6449 switch (dst.type) {
958 6411 case SCH_NODE_TYPE_FILTER_IN: {
959 SchFilterIn *fi;
960
961
2/4
✓ Branch 0 taken 6411 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6411 times.
6411 av_assert0(dst.idx < sch->nb_filters &&
962 dst.idx_stream < sch->filters[dst.idx].nb_inputs);
963 6411 fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
964
965
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6411 times.
6411 av_assert0(!fi->src.type);
966 6411 fi->src = src;
967 6411 break;
968 }
969 38 case SCH_NODE_TYPE_ENC: {
970 SchEnc *enc;
971
972
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 38 times.
38 av_assert0(dst.idx < sch->nb_enc);
973 38 enc = &sch->enc[dst.idx];
974
975
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 38 times.
38 av_assert0(!enc->src.type);
976 38 enc->src = src;
977 38 break;
978 }
979 default: av_assert0(0);
980 }
981
982 6449 break;
983 }
984 6447 case SCH_NODE_TYPE_FILTER_OUT: {
985 SchFilterOut *fo;
986
987
2/4
✓ Branch 0 taken 6447 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6447 times.
6447 av_assert0(src.idx < sch->nb_filters &&
988 src.idx_stream < sch->filters[src.idx].nb_outputs);
989 6447 fo = &sch->filters[src.idx].outputs[src.idx_stream];
990
991
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6447 times.
6447 av_assert0(!fo->dst.type);
992 6447 fo->dst = dst;
993
994 // filtered frames go to encoding or another filtergraph
995
1/3
✓ Branch 0 taken 6447 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
6447 switch (dst.type) {
996 6447 case SCH_NODE_TYPE_ENC: {
997 SchEnc *enc;
998
999
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6447 times.
6447 av_assert0(dst.idx < sch->nb_enc);
1000 6447 enc = &sch->enc[dst.idx];
1001
1002
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6447 times.
6447 av_assert0(!enc->src.type);
1003 6447 enc->src = src;
1004 6447 break;
1005 }
1006 case SCH_NODE_TYPE_FILTER_IN: {
1007 SchFilterIn *fi;
1008
1009 av_assert0(dst.idx < sch->nb_filters &&
1010 dst.idx_stream < sch->filters[dst.idx].nb_inputs);
1011 fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
1012
1013 av_assert0(!fi->src.type);
1014 fi->src = src;
1015 break;
1016 }
1017 default: av_assert0(0);
1018 }
1019
1020
1021 6447 break;
1022 }
1023 6486 case SCH_NODE_TYPE_ENC: {
1024 SchEnc *enc;
1025
1026
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6486 times.
6486 av_assert0(src.idx < sch->nb_enc);
1027 6486 enc = &sch->enc[src.idx];
1028
1029 6486 ret = GROW_ARRAY(enc->dst, enc->nb_dst);
1030
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6486 times.
6486 if (ret < 0)
1031 return ret;
1032
1033 6486 enc->dst[enc->nb_dst - 1] = dst;
1034
1035 // encoding packets go to muxing or decoding
1036
2/3
✓ Branch 0 taken 6485 times.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
6486 switch (dst.type) {
1037 6485 case SCH_NODE_TYPE_MUX: {
1038 SchMuxStream *ms;
1039
1040
2/4
✓ Branch 0 taken 6485 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6485 times.
6485 av_assert0(dst.idx < sch->nb_mux &&
1041 dst.idx_stream < sch->mux[dst.idx].nb_streams);
1042 6485 ms = &sch->mux[dst.idx].streams[dst.idx_stream];
1043
1044
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6485 times.
6485 av_assert0(!ms->src.type);
1045 6485 ms->src = src;
1046
1047 6485 break;
1048 }
1049 1 case SCH_NODE_TYPE_DEC: {
1050 SchDec *dec;
1051
1052
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(dst.idx < sch->nb_dec);
1053 1 dec = &sch->dec[dst.idx];
1054
1055
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(!dec->src.type);
1056 1 dec->src = src;
1057
1058 1 break;
1059 }
1060 default: av_assert0(0);
1061 }
1062
1063 6486 break;
1064 }
1065 default: av_assert0(0);
1066 }
1067
1068 26429 return 0;
1069 }
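
A sketch of wiring one complete transcoding chain with sch_connect(), assuming each earlier sch_add_*() call returned index 0; the compound literals mirror the SchedulerNode usage elsewhere in this file (error checks omitted for brevity):

    // demuxer stream 0 -> decoder 0
    sch_connect(sch, (SchedulerNode){ .type = SCH_NODE_TYPE_DEMUX, .idx = 0, .idx_stream = 0 },
                     (SchedulerNode){ .type = SCH_NODE_TYPE_DEC,   .idx = 0 });
    // decoder 0 -> filtergraph 0, input 0
    sch_connect(sch, (SchedulerNode){ .type = SCH_NODE_TYPE_DEC,       .idx = 0 },
                     (SchedulerNode){ .type = SCH_NODE_TYPE_FILTER_IN, .idx = 0, .idx_stream = 0 });
    // filtergraph 0, output 0 -> encoder 0
    sch_connect(sch, (SchedulerNode){ .type = SCH_NODE_TYPE_FILTER_OUT, .idx = 0, .idx_stream = 0 },
                     (SchedulerNode){ .type = SCH_NODE_TYPE_ENC,        .idx = 0 });
    // encoder 0 -> muxer stream 0
    sch_connect(sch, (SchedulerNode){ .type = SCH_NODE_TYPE_ENC, .idx = 0 },
                     (SchedulerNode){ .type = SCH_NODE_TYPE_MUX, .idx = 0, .idx_stream = 0 });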
1070
1071 6778 static int mux_task_start(SchMux *mux)
1072 {
1073 6778 int ret = 0;
1074
1075 6778 ret = task_start(&mux->task);
1076
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6778 times.
6778 if (ret < 0)
1077 return ret;
1078
1079 /* flush the pre-muxing queues */
1080
2/2
✓ Branch 0 taken 7130 times.
✓ Branch 1 taken 6778 times.
13908 for (unsigned i = 0; i < mux->nb_streams; i++) {
1081 7130 SchMuxStream *ms = &mux->streams[i];
1082 AVPacket *pkt;
1083
1084
2/2
✓ Branch 1 taken 3094 times.
✓ Branch 2 taken 7130 times.
10224 while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0) {
1085
2/2
✓ Branch 0 taken 3038 times.
✓ Branch 1 taken 56 times.
3094 if (pkt) {
1086
2/2
✓ Branch 0 taken 3019 times.
✓ Branch 1 taken 19 times.
3038 if (!ms->init_eof)
1087 3019 ret = tq_send(mux->queue, i, pkt);
1088 3038 av_packet_free(&pkt);
1089
2/2
✓ Branch 0 taken 20 times.
✓ Branch 1 taken 3018 times.
3038 if (ret == AVERROR_EOF)
1090 20 ms->init_eof = 1;
1091
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3018 times.
3018 else if (ret < 0)
1092 return ret;
1093 } else
1094 56 tq_send_finish(mux->queue, i);
1095 }
1096 }
1097
1098 6778 atomic_store(&mux->mux_started, 1);
1099
1100 6778 return 0;
1101 }
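
This is where PreMuxQueue pays off: packets that arrived before the muxer could be initialized (e.g. before the output header was written) were buffered per stream and are forwarded here in order; a NULL entry records an EOF that was reached before startup and is translated into tq_send_finish().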
1102
1103 int print_sdp(const char *filename);
1104
1105 6778 static int mux_init(Scheduler *sch, SchMux *mux)
1106 {
1107 int ret;
1108
1109 6778 ret = mux->init(mux->task.func_arg);
1110
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6778 times.
6778 if (ret < 0)
1111 return ret;
1112
1113 6778 sch->nb_mux_ready++;
1114
1115
2/4
✓ Branch 0 taken 6778 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6778 times.
6778 if (sch->sdp_filename || sch->sdp_auto) {
1116 if (sch->nb_mux_ready < sch->nb_mux)
1117 return 0;
1118
1119 ret = print_sdp(sch->sdp_filename);
1120 if (ret < 0) {
1121 av_log(sch, AV_LOG_ERROR, "Error writing the SDP.\n");
1122 return ret;
1123 }
1124
1125 /* SDP is written only after all the muxers are ready, so now we
1126 * start ALL the threads */
1127 for (unsigned i = 0; i < sch->nb_mux; i++) {
1128 ret = mux_task_start(&sch->mux[i]);
1129 if (ret < 0)
1130 return ret;
1131 }
1132 } else {
1133 6778 ret = mux_task_start(mux);
1134
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6778 times.
6778 if (ret < 0)
1135 return ret;
1136 }
1137
1138 6778 return 0;
1139 }
1140
1141 7130 void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1142 size_t data_threshold, int max_packets)
1143 {
1144 SchMux *mux;
1145 SchMuxStream *ms;
1146
1147
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7130 times.
7130 av_assert0(mux_idx < sch->nb_mux);
1148 7130 mux = &sch->mux[mux_idx];
1149
1150
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7130 times.
7130 av_assert0(stream_idx < mux->nb_streams);
1151 7130 ms = &mux->streams[stream_idx];
1152
1153 7130 ms->pre_mux_queue.max_packets = max_packets;
1154 7130 ms->pre_mux_queue.data_threshold = data_threshold;
1155 7130 }
1156
1157 7130 int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
1158 {
1159 SchMux *mux;
1160 7130 int ret = 0;
1161
1162
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7130 times.
7130 av_assert0(mux_idx < sch->nb_mux);
1163 7130 mux = &sch->mux[mux_idx];
1164
1165
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7130 times.
7130 av_assert0(stream_idx < mux->nb_streams);
1166
1167 7130 pthread_mutex_lock(&sch->mux_ready_lock);
1168
1169
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7130 times.
7130 av_assert0(mux->nb_streams_ready < mux->nb_streams);
1170
1171 // this may be called during initialization - do not start
1172 // threads before sch_start() is called
1173
2/2
✓ Branch 0 taken 6778 times.
✓ Branch 1 taken 352 times.
7130 if (++mux->nb_streams_ready == mux->nb_streams &&
1174
2/2
✓ Branch 0 taken 6330 times.
✓ Branch 1 taken 448 times.
6778 sch->state >= SCH_STATE_STARTED)
1175 6330 ret = mux_init(sch, mux);
1176
1177 7130 pthread_mutex_unlock(&sch->mux_ready_lock);
1178
1179 7130 return ret;
1180 }
1181
1182 1 int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1183 unsigned dec_idx)
1184 {
1185 SchMux *mux;
1186 SchMuxStream *ms;
1187 1 int ret = 0;
1188
1189
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(mux_idx < sch->nb_mux);
1190 1 mux = &sch->mux[mux_idx];
1191
1192
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(stream_idx < mux->nb_streams);
1193 1 ms = &mux->streams[stream_idx];
1194
1195 1 ret = GROW_ARRAY(ms->sub_heartbeat_dst, ms->nb_sub_heartbeat_dst);
1196
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (ret < 0)
1197 return ret;
1198
1199
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(dec_idx < sch->nb_dec);
1200 1 ms->sub_heartbeat_dst[ms->nb_sub_heartbeat_dst - 1] = dec_idx;
1201
1202
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (!mux->sub_heartbeat_pkt) {
1203 1 mux->sub_heartbeat_pkt = av_packet_alloc();
1204
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (!mux->sub_heartbeat_pkt)
1205 return AVERROR(ENOMEM);
1206 }
1207
1208 1 return 0;
1209 }
1210
1211 489987 static void unchoke_for_stream(Scheduler *sch, SchedulerNode src)
1212 {
1213 422149 while (1) {
1214 SchFilterGraph *fg;
1215
1216 // fed directly by a demuxer (i.e. not through a filtergraph)
1217
2/2
✓ Branch 0 taken 488048 times.
✓ Branch 1 taken 424088 times.
912136 if (src.type == SCH_NODE_TYPE_DEMUX) {
1218 488048 sch->demux[src.idx].waiter.choked_next = 0;
1219 488048 return;
1220 }
1221
1222
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 424088 times.
424088 av_assert0(src.type == SCH_NODE_TYPE_FILTER_OUT);
1223 424088 fg = &sch->filters[src.idx];
1224
1225 // the filtergraph contains internal sources and is
1226 // requested to be scheduled directly
1227
2/2
✓ Branch 0 taken 1939 times.
✓ Branch 1 taken 422149 times.
424088 if (fg->best_input == fg->nb_inputs) {
1228 1939 fg->waiter.choked_next = 0;
1229 1939 return;
1230 }
1231
1232 422149 src = fg->inputs[fg->best_input].src_sched;
1233 }
1234 }
1235
1236 503038 static void schedule_update_locked(Scheduler *sch)
1237 {
1238 int64_t dts;
1239 503038 int have_unchoked = 0;
1240
1241 // on termination request all waiters are choked,
1242 // we are not to unchoke them
1243
2/2
✓ Branch 0 taken 2968 times.
✓ Branch 1 taken 500070 times.
503038 if (atomic_load(&sch->terminate))
1244 2968 return;
1245
1246 500070 dts = trailing_dts(sch, 0);
1247
1248 500070 atomic_store(&sch->last_dts, dts);
1249
1250 // initialize our internal state
1251
2/2
✓ Branch 0 taken 1000140 times.
✓ Branch 1 taken 500070 times.
1500210 for (unsigned type = 0; type < 2; type++)
1252
4/4
✓ Branch 0 taken 949415 times.
✓ Branch 1 taken 1003541 times.
✓ Branch 2 taken 952816 times.
✓ Branch 3 taken 1000140 times.
1952956 for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1253
2/2
✓ Branch 0 taken 449345 times.
✓ Branch 1 taken 503471 times.
952816 SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1254 952816 w->choked_prev = atomic_load(&w->choked);
1255 952816 w->choked_next = 1;
1256 }
1257
1258 // figure out the sources that are allowed to proceed
1259
2/2
✓ Branch 0 taken 500336 times.
✓ Branch 1 taken 500070 times.
1000406 for (unsigned i = 0; i < sch->nb_mux; i++) {
1260 500336 SchMux *mux = &sch->mux[i];
1261
1262
2/2
✓ Branch 0 taken 543124 times.
✓ Branch 1 taken 500336 times.
1043460 for (unsigned j = 0; j < mux->nb_streams; j++) {
1263 543124 SchMuxStream *ms = &mux->streams[j];
1264
1265 // unblock sources for output streams that are not finished
1266 // and not too far ahead of the trailing stream
1267
2/2
✓ Branch 0 taken 30469 times.
✓ Branch 1 taken 512655 times.
543124 if (ms->source_finished)
1268 30469 continue;
1269
4/4
✓ Branch 0 taken 21667 times.
✓ Branch 1 taken 490988 times.
✓ Branch 2 taken 5365 times.
✓ Branch 3 taken 16302 times.
512655 if (dts == AV_NOPTS_VALUE && ms->last_dts != AV_NOPTS_VALUE)
1270 5365 continue;
1271
4/4
✓ Branch 0 taken 490988 times.
✓ Branch 1 taken 16302 times.
✓ Branch 2 taken 17303 times.
✓ Branch 3 taken 473685 times.
507290 if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE)
1272 17303 continue;
1273
1274 // resolve the source to unchoke
1275 489987 unchoke_for_stream(sch, ms->src_sched);
1276 489987 have_unchoked = 1;
1277 }
1278 }
1279
1280 // make sure to unchoke at least one source, if still available
1281
4/4
✓ Branch 0 taken 55285 times.
✓ Branch 1 taken 486915 times.
✓ Branch 2 taken 42130 times.
✓ Branch 3 taken 13155 times.
542200 for (unsigned type = 0; !have_unchoked && type < 2; type++)
1282
4/4
✓ Branch 0 taken 30132 times.
✓ Branch 1 taken 42349 times.
✓ Branch 2 taken 41804 times.
✓ Branch 3 taken 30677 times.
72481 for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1283
2/2
✓ Branch 0 taken 16977 times.
✓ Branch 1 taken 24827 times.
41804 int exited = type ? sch->filters[i].task_exited : sch->demux[i].task_exited;
1284
2/2
✓ Branch 0 taken 16977 times.
✓ Branch 1 taken 24827 times.
41804 SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1285
2/2
✓ Branch 0 taken 11453 times.
✓ Branch 1 taken 30351 times.
41804 if (!exited) {
1286 11453 w->choked_next = 0;
1287 11453 have_unchoked = 1;
1288 11453 break;
1289 }
1290 }
1291
1292
1293
2/2
✓ Branch 0 taken 1000140 times.
✓ Branch 1 taken 500070 times.
1500210 for (unsigned type = 0; type < 2; type++)
1294
4/4
✓ Branch 0 taken 949415 times.
✓ Branch 1 taken 1003541 times.
✓ Branch 2 taken 952816 times.
✓ Branch 3 taken 1000140 times.
1952956 for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1295
2/2
✓ Branch 0 taken 449345 times.
✓ Branch 1 taken 503471 times.
952816 SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1296
2/2
✓ Branch 0 taken 17335 times.
✓ Branch 1 taken 935481 times.
952816 if (w->choked_prev != w->choked_next)
1297 17335 waiter_set(w, w->choked_next);
1298 }
1299
1300 }
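
Concretely, with SCHEDULE_TOLERANCE at 100 ms: if the trailing dts is 4.90 s, a muxer stream whose last_dts is 4.95 s has its ultimate source (resolved through src_sched by unchoke_for_stream(), possibly across several chained filtergraphs) unchoked, while a stream already at 5.01 s is 100 ms or more ahead and its source stays choked until the trailing stream catches up.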
1301
1302 enum {
1303 CYCLE_NODE_NEW = 0,
1304 CYCLE_NODE_STARTED,
1305 CYCLE_NODE_DONE,
1306 };
1307
1308 static int
1309 6397 check_acyclic_for_output(const Scheduler *sch, SchedulerNode src,
1310 uint8_t *filters_visited, SchedulerNode *filters_stack)
1311 {
1312 6397 unsigned nb_filters_stack = 0;
1313
1314 6397 memset(filters_visited, 0, sch->nb_filters * sizeof(*filters_visited));
1315
1316 6413 while (1) {
1317 12810 const SchFilterGraph *fg = &sch->filters[src.idx];
1318
1319 12810 filters_visited[src.idx] = CYCLE_NODE_STARTED;
1320
1321 // descend into every input, depth first
1322
2/2
✓ Branch 0 taken 6412 times.
✓ Branch 1 taken 6398 times.
12810 if (src.idx_stream < fg->nb_inputs) {
1323 6412 const SchFilterIn *fi = &fg->inputs[src.idx_stream++];
1324
1325 // connected to demuxer, no cycles possible
1326
2/2
✓ Branch 0 taken 6411 times.
✓ Branch 1 taken 1 times.
6412 if (fi->src_sched.type == SCH_NODE_TYPE_DEMUX)
1327 6412 continue;
1328
1329 // otherwise connected to another filtergraph
1330
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(fi->src_sched.type == SCH_NODE_TYPE_FILTER_OUT);
1331
1332 // found a cycle
1333
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (filters_visited[fi->src_sched.idx] == CYCLE_NODE_STARTED)
1334 return AVERROR(EINVAL);
1335
1336 // place current position on stack and descend
1337
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 av_assert0(nb_filters_stack < sch->nb_filters);
1338 1 filters_stack[nb_filters_stack++] = src;
1339 1 src = (SchedulerNode){ .idx = fi->src_sched.idx, .idx_stream = 0 };
1340 1 continue;
1341 }
1342
1343 6398 filters_visited[src.idx] = CYCLE_NODE_DONE;
1344
1345 // previous search finished, resume from the stack
1346
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 6397 times.
6398 if (nb_filters_stack) {
1347 1 src = filters_stack[--nb_filters_stack];
1348 1 continue;
1349 }
1350 6397 return 0;
1351 }
1352 }
1353
1354 6776 static int check_acyclic(Scheduler *sch)
1355 {
1356 6776 uint8_t *filters_visited = NULL;
1357 6776 SchedulerNode *filters_stack = NULL;
1358
1359 6776 int ret = 0;
1360
1361
2/2
✓ Branch 0 taken 483 times.
✓ Branch 1 taken 6293 times.
6776 if (!sch->nb_filters)
1362 483 return 0;
1363
1364 6293 filters_visited = av_malloc_array(sch->nb_filters, sizeof(*filters_visited));
1365
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6293 times.
6293 if (!filters_visited)
1366 return AVERROR(ENOMEM);
1367
1368 6293 filters_stack = av_malloc_array(sch->nb_filters, sizeof(*filters_stack));
1369
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6293 times.
6293 if (!filters_stack) {
1370 ret = AVERROR(ENOMEM);
1371 goto fail;
1372 }
1373
1374 // trace the transcoding graph upstream from every filtergraph
1375
2/2
✓ Branch 0 taken 6397 times.
✓ Branch 1 taken 6293 times.
12690 for (unsigned i = 0; i < sch->nb_filters; i++) {
1376 6397 ret = check_acyclic_for_output(sch, (SchedulerNode){ .idx = i },
1377 filters_visited, filters_stack);
1378
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6397 times.
6397 if (ret < 0) {
1379 av_log(&sch->filters[i], AV_LOG_ERROR, "Transcoding graph has a cycle\n");
1380 goto fail;
1381 }
1382 }
1383
1384 6293 fail:
1385 6293 av_freep(&filters_visited);
1386 6293 av_freep(&filters_stack);
1387 6293 return ret;
1388 }
1389
1390 6776 static int start_prepare(Scheduler *sch)
1391 {
1392 int ret;
1393
1394
2/2
✓ Branch 0 taken 6807 times.
✓ Branch 1 taken 6776 times.
13583 for (unsigned i = 0; i < sch->nb_demux; i++) {
1395 6807 SchDemux *d = &sch->demux[i];
1396
1397
2/2
✓ Branch 0 taken 7024 times.
✓ Branch 1 taken 6807 times.
13831 for (unsigned j = 0; j < d->nb_streams; j++) {
1398 7024 SchDemuxStream *ds = &d->streams[j];
1399
1400
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7024 times.
7024 if (!ds->nb_dst) {
1401 av_log(d, AV_LOG_ERROR,
1402 "Demuxer stream %u not connected to any sink\n", j);
1403 return AVERROR(EINVAL);
1404 }
1405
1406 7024 ds->dst_finished = av_calloc(ds->nb_dst, sizeof(*ds->dst_finished));
1407
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7024 times.
7024 if (!ds->dst_finished)
1408 return AVERROR(ENOMEM);
1409 }
1410 }
1411
1412
2/2
✓ Branch 0 taken 6403 times.
✓ Branch 1 taken 6776 times.
13179 for (unsigned i = 0; i < sch->nb_dec; i++) {
1413 6403 SchDec *dec = &sch->dec[i];
1414
1415
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6403 times.
6403 if (!dec->src.type) {
1416 av_log(dec, AV_LOG_ERROR,
1417 "Decoder not connected to a source\n");
1418 return AVERROR(EINVAL);
1419 }
1420
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6403 times.
6403 if (!dec->nb_dst) {
1421 av_log(dec, AV_LOG_ERROR,
1422 "Decoder not connected to any sink\n");
1423 return AVERROR(EINVAL);
1424 }
1425
1426 6403 dec->dst_finished = av_calloc(dec->nb_dst, sizeof(*dec->dst_finished));
1427
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6403 times.
6403 if (!dec->dst_finished)
1428 return AVERROR(ENOMEM);
1429 }
1430
1431
2/2
✓ Branch 0 taken 6485 times.
✓ Branch 1 taken 6776 times.
13261 for (unsigned i = 0; i < sch->nb_enc; i++) {
1432 6485 SchEnc *enc = &sch->enc[i];
1433
1434
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6485 times.
6485 if (!enc->src.type) {
1435 av_log(enc, AV_LOG_ERROR,
1436 "Encoder not connected to a source\n");
1437 return AVERROR(EINVAL);
1438 }
1439
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6485 times.
6485 if (!enc->nb_dst) {
1440 av_log(enc, AV_LOG_ERROR,
1441 "Encoder not connected to any sink\n");
1442 return AVERROR(EINVAL);
1443 }
1444
1445 6485 enc->dst_finished = av_calloc(enc->nb_dst, sizeof(*enc->dst_finished));
1446
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6485 times.
6485 if (!enc->dst_finished)
1447 return AVERROR(ENOMEM);
1448 }
1449
1450
2/2
✓ Branch 0 taken 6778 times.
✓ Branch 1 taken 6776 times.
13554 for (unsigned i = 0; i < sch->nb_mux; i++) {
1451 6778 SchMux *mux = &sch->mux[i];
1452
1453
2/2
✓ Branch 0 taken 7130 times.
✓ Branch 1 taken 6778 times.
13908 for (unsigned j = 0; j < mux->nb_streams; j++) {
1454 7130 SchMuxStream *ms = &mux->streams[j];
1455
1456
2/3
✓ Branch 0 taken 6485 times.
✓ Branch 1 taken 645 times.
✗ Branch 2 not taken.
7130 switch (ms->src.type) {
1457 6485 case SCH_NODE_TYPE_ENC: {
1458 6485 SchEnc *enc = &sch->enc[ms->src.idx];
1459
2/2
✓ Branch 0 taken 38 times.
✓ Branch 1 taken 6447 times.
6485 if (enc->src.type == SCH_NODE_TYPE_DEC) {
1460 38 ms->src_sched = sch->dec[enc->src.idx].src;
1461
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 38 times.
38 av_assert0(ms->src_sched.type == SCH_NODE_TYPE_DEMUX);
1462 } else {
1463 6447 ms->src_sched = enc->src;
1464
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6447 times.
6447 av_assert0(ms->src_sched.type == SCH_NODE_TYPE_FILTER_OUT);
1465 }
1466 6485 break;
1467 }
1468 645 case SCH_NODE_TYPE_DEMUX:
1469 645 ms->src_sched = ms->src;
1470 645 break;
1471 default:
1472 av_log(mux, AV_LOG_ERROR,
1473 "Muxer stream #%u not connected to a source\n", j);
1474 return AVERROR(EINVAL);
1475 }
1476 }
1477
1478 6778 ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size,
1479 QUEUE_PACKETS);
1480
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6778 times.
6778 if (ret < 0)
1481 return ret;
1482 }
1483
1484
2/2
✓ Branch 0 taken 6397 times.
✓ Branch 1 taken 6776 times.
13173 for (unsigned i = 0; i < sch->nb_filters; i++) {
1485 6397 SchFilterGraph *fg = &sch->filters[i];
1486
1487
2/2
✓ Branch 0 taken 6411 times.
✓ Branch 1 taken 6397 times.
12808 for (unsigned j = 0; j < fg->nb_inputs; j++) {
1488 6411 SchFilterIn *fi = &fg->inputs[j];
1489 SchDec *dec;
1490
1491
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6411 times.
6411 if (!fi->src.type) {
1492 av_log(fg, AV_LOG_ERROR,
1493 "Filtergraph input %u not connected to a source\n", j);
1494 return AVERROR(EINVAL);
1495 }
1496
1497
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6411 times.
6411 if (fi->src.type == SCH_NODE_TYPE_FILTER_OUT)
1498 fi->src_sched = fi->src;
1499 else {
1500
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6411 times.
6411 av_assert0(fi->src.type == SCH_NODE_TYPE_DEC);
1501 6411 dec = &sch->dec[fi->src.idx];
1502
1503
2/3
✓ Branch 0 taken 6410 times.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
6411 switch (dec->src.type) {
1504 6410 case SCH_NODE_TYPE_DEMUX: fi->src_sched = dec->src; break;
1505 1 case SCH_NODE_TYPE_ENC: fi->src_sched = sch->enc[dec->src.idx].src; break;
1506 default: av_assert0(0);
1507 }
1508 }
1509 }
1510
1511
2/2
✓ Branch 0 taken 6447 times.
✓ Branch 1 taken 6397 times.
12844 for (unsigned j = 0; j < fg->nb_outputs; j++) {
1512 6447 SchFilterOut *fo = &fg->outputs[j];
1513
1514
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6447 times.
6447 if (!fo->dst.type) {
1515 av_log(fg, AV_LOG_ERROR,
1516 "Filtergraph %u output %u not connected to a sink\n", i, j);
1517 return AVERROR(EINVAL);
1518 }
1519 }
1520 }
1521
1522 // Check that the transcoding graph has no cycles.
1523 6776 ret = check_acyclic(sch);
1524
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6776 times.
6776 if (ret < 0)
1525 return ret;
1526
1527 6776 return 0;
1528 }
1529
1530 6776 int sch_start(Scheduler *sch)
1531 {
1532 int ret;
1533
1534 6776 ret = start_prepare(sch);
1535
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6776 times.
6776 if (ret < 0)
1536 return ret;
1537
1538
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6776 times.
6776 av_assert0(sch->state == SCH_STATE_UNINIT);
1539 6776 sch->state = SCH_STATE_STARTED;
1540
1541
2/2
✓ Branch 0 taken 6778 times.
✓ Branch 1 taken 6776 times.
13554 for (unsigned i = 0; i < sch->nb_mux; i++) {
1542 6778 SchMux *mux = &sch->mux[i];
1543
1544
2/2
✓ Branch 0 taken 448 times.
✓ Branch 1 taken 6330 times.
6778 if (mux->nb_streams_ready == mux->nb_streams) {
1545 448 ret = mux_init(sch, mux);
1546
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 448 times.
448 if (ret < 0)
1547 goto fail;
1548 }
1549 }
1550
1551
2/2
✓ Branch 0 taken 6485 times.
✓ Branch 1 taken 6776 times.
13261 for (unsigned i = 0; i < sch->nb_enc; i++) {
1552 6485 SchEnc *enc = &sch->enc[i];
1553
1554 6485 ret = task_start(&enc->task);
1555
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6485 times.
6485 if (ret < 0)
1556 goto fail;
1557 }
1558
1559
2/2
✓ Branch 0 taken 6397 times.
✓ Branch 1 taken 6776 times.
13173 for (unsigned i = 0; i < sch->nb_filters; i++) {
1560 6397 SchFilterGraph *fg = &sch->filters[i];
1561
1562 6397 ret = task_start(&fg->task);
1563
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6397 times.
6397 if (ret < 0)
1564 goto fail;
1565 }
1566
1567
2/2
✓ Branch 0 taken 6403 times.
✓ Branch 1 taken 6776 times.
13179 for (unsigned i = 0; i < sch->nb_dec; i++) {
1568 6403 SchDec *dec = &sch->dec[i];
1569
1570 6403 ret = task_start(&dec->task);
1571
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6403 times.
6403 if (ret < 0)
1572 goto fail;
1573 }
1574
1575
2/2
✓ Branch 0 taken 6807 times.
✓ Branch 1 taken 6776 times.
13583 for (unsigned i = 0; i < sch->nb_demux; i++) {
1576 6807 SchDemux *d = &sch->demux[i];
1577
1578
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 6793 times.
6807 if (!d->nb_streams)
1579 14 continue;
1580
1581 6793 ret = task_start(&d->task);
1582
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6793 times.
6793 if (ret < 0)
1583 goto fail;
1584 }
1585
1586 6776 pthread_mutex_lock(&sch->schedule_lock);
1587 6776 schedule_update_locked(sch);
1588 6776 pthread_mutex_unlock(&sch->schedule_lock);
1589
1590 6776 return 0;
1591 fail:
1592 sch_stop(sch, NULL);
1593 return ret;
1594 }
1595
1596 20982 int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
1597 {
1598 int ret, err;
1599
1600 // convert delay to absolute timestamp
1601 20982 timeout_us += av_gettime();
1602
1603 20982 pthread_mutex_lock(&sch->mux_done_lock);
1604
1605
1/2
✓ Branch 0 taken 20982 times.
✗ Branch 1 not taken.
20982 if (sch->nb_mux_done < sch->nb_mux) {
1606 20982 struct timespec tv = { .tv_sec = timeout_us / 1000000,
1607 20982 .tv_nsec = (timeout_us % 1000000) * 1000 };
1608 20982 pthread_cond_timedwait(&sch->mux_done_cond, &sch->mux_done_lock, &tv);
1609 }
1610
1611 20982 ret = sch->nb_mux_done == sch->nb_mux;
1612
1613 20982 pthread_mutex_unlock(&sch->mux_done_lock);
1614
1615 20982 *transcode_ts = atomic_load(&sch->last_dts);
1616
1617 // abort transcoding if any task failed
1618 20982 err = atomic_load(&sch->task_failed);
1619
1620
3/4
✓ Branch 0 taken 14206 times.
✓ Branch 1 taken 6776 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 14206 times.
20982 return ret || err;
1621 }
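
A sketch of the driving loop built on top of sch_wait(); the 500 ms polling interval and the progress reporting are illustrative:

    int64_t transcode_ts = AV_NOPTS_VALUE;
    // nonzero once all muxers have finished or any task has failed
    while (!sch_wait(sch, 500000 /* us */, &transcode_ts)) {
        // periodic work, e.g. report progress based on transcode_ts
    }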
1622
1623 6447 static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame)
1624 {
1625 int ret;
1626
1627 6447 ret = enc->open_cb(enc->task.func_arg, frame);
1628
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6447 times.
6447 if (ret < 0)
1629 return ret;
1630
1631 // ret>0 signals audio frame size, which means sync queue must
1632 // have been enabled during encoder creation
1633
2/2
✓ Branch 0 taken 156 times.
✓ Branch 1 taken 6291 times.
6447 if (ret > 0) {
1634 SchSyncQueue *sq;
1635
1636
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 156 times.
156 av_assert0(enc->sq_idx[0] >= 0);
1637 156 sq = &sch->sq_enc[enc->sq_idx[0]];
1638
1639 156 pthread_mutex_lock(&sq->lock);
1640
1641 156 sq_frame_samples(sq->sq, enc->sq_idx[1], ret);
1642
1643 156 pthread_mutex_unlock(&sq->lock);
1644 }
1645
1646 6447 return 0;
1647 }
1648
1649 426167 static int send_to_enc_thread(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1650 {
1651 int ret;
1652
1653
2/2
✓ Branch 0 taken 13105 times.
✓ Branch 1 taken 413062 times.
426167 if (!frame) {
1654 13105 tq_send_finish(enc->queue, 0);
1655 13105 return 0;
1656 }
1657
1658
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 413062 times.
413062 if (enc->in_finished)
1659 return AVERROR_EOF;
1660
1661 413062 ret = tq_send(enc->queue, 0, frame);
1662
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 413061 times.
413062 if (ret < 0)
1663 1 enc->in_finished = 1;
1664
1665 413062 return ret;
1666 }
1667
1668 33972 static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1669 {
1670 33972 SchSyncQueue *sq = &sch->sq_enc[enc->sq_idx[0]];
1671 33972 int ret = 0;
1672
1673 // inform the scheduling code that no more input will arrive along this path;
1674 // this is necessary because the sync queue may not send an EOF downstream
1675 // until other streams finish
1676 // TODO: consider a cleaner way of passing this information through
1677 // the pipeline
1678
2/2
✓ Branch 0 taken 2990 times.
✓ Branch 1 taken 30982 times.
33972 if (!frame) {
1679
2/2
✓ Branch 0 taken 2990 times.
✓ Branch 1 taken 2990 times.
5980 for (unsigned i = 0; i < enc->nb_dst; i++) {
1680 SchMux *mux;
1681 SchMuxStream *ms;
1682
1683
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2990 times.
2990 if (enc->dst[i].type != SCH_NODE_TYPE_MUX)
1684 continue;
1685
1686 2990 mux = &sch->mux[enc->dst[i].idx];
1687 2990 ms = &mux->streams[enc->dst[i].idx_stream];
1688
1689 2990 pthread_mutex_lock(&sch->schedule_lock);
1690
1691 2990 ms->source_finished = 1;
1692 2990 schedule_update_locked(sch);
1693
1694 2990 pthread_mutex_unlock(&sch->schedule_lock);
1695 }
1696 }
1697
1698 33972 pthread_mutex_lock(&sq->lock);
1699
1700 33972 ret = sq_send(sq->sq, enc->sq_idx[1], SQFRAME(frame));
1701
2/2
✓ Branch 0 taken 33970 times.
✓ Branch 1 taken 2 times.
33972 if (ret < 0)
1702 2 goto finish;
1703
1704 81837 while (1) {
1705 SchEnc *enc;
1706
1707 // TODO: the SQ API should be extended to allow returning EOF
1708 // for individual streams
1709 115807 ret = sq_receive(sq->sq, -1, SQFRAME(sq->frame));
1710
2/2
✓ Branch 0 taken 33970 times.
✓ Branch 1 taken 81837 times.
115807 if (ret < 0) {
1711
2/2
✓ Branch 0 taken 5582 times.
✓ Branch 1 taken 28388 times.
33970 ret = (ret == AVERROR(EAGAIN)) ? 0 : ret;
1712 33970 break;
1713 }
1714
1715 81837 enc = &sch->enc[sq->enc_idx[ret]];
1716 81837 ret = send_to_enc_thread(sch, enc, sq->frame);
1717
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 81837 times.
81837 if (ret < 0) {
1718 av_frame_unref(sq->frame);
1719 if (ret != AVERROR_EOF)
1720 break;
1721
1722 sq_send(sq->sq, enc->sq_idx[1], SQFRAME(NULL));
1723 continue;
1724 }
1725 }
1726
1727
2/2
✓ Branch 0 taken 28388 times.
✓ Branch 1 taken 5582 times.
33970 if (ret < 0) {
1728 // close all encoders fed from this sync queue
1729
2/2
✓ Branch 0 taken 5816 times.
✓ Branch 1 taken 5582 times.
11398 for (unsigned i = 0; i < sq->nb_enc_idx; i++) {
1730 5816 int err = send_to_enc_thread(sch, &sch->enc[sq->enc_idx[i]], NULL);
1731
1732 // if the sync queue error is EOF and closing the encoder
1733 // produces a more serious error, make sure to pick the latter
1734
2/4
✓ Branch 0 taken 5816 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 5816 times.
✗ Branch 3 not taken.
5816 ret = err_merge((ret == AVERROR_EOF && err < 0) ? 0 : ret, err);
1735 }
1736 }
1737
1738 33970 finish:
1739 33972 pthread_mutex_unlock(&sq->lock);
1740
1741 33972 return ret;
1742 }
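
The send-then-drain loop above is the intended way to drive a SyncQueue: a single sq_send() can complete any number of frames on any stream, so output is drained until sq_receive() reports AVERROR(EAGAIN), which means "need more input" and is deliberately remapped to success. A condensed sketch under those semantics; sq_push_and_drain() and forward_fn are hypothetical:

    #include "sync_queue.h"

    #include "libavutil/error.h"
    #include "libavutil/frame.h"

    static int sq_push_and_drain(SyncQueue *sq, unsigned stream_idx,
                                 AVFrame *in, AVFrame *scratch,
                                 int (*forward_fn)(int stream, AVFrame *f))
    {
        int ret = sq_send(sq, stream_idx, SQFRAME(in));
        if (ret < 0)
            return ret;

        /* on success, sq_receive(sq, -1, ...) returns the index of the
         * stream that produced a complete frame */
        while ((ret = sq_receive(sq, -1, SQFRAME(scratch))) >= 0) {
            int err = forward_fn(ret, scratch);
            if (err < 0)
                return err;
        }
        /* EAGAIN = "no complete frame yet", not an error; EOF = queue done */
        return (ret == AVERROR(EAGAIN)) ? 0 : ret;
    }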
1743
1744 372489 static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1745 {
1746
6/6
✓ Branch 0 taken 371572 times.
✓ Branch 1 taken 917 times.
✓ Branch 2 taken 361331 times.
✓ Branch 3 taken 10241 times.
✓ Branch 4 taken 6447 times.
✓ Branch 5 taken 354884 times.
372489 if (enc->open_cb && frame && !enc->opened) {
1747 6447 int ret = enc_open(sch, enc, frame);
1748
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6447 times.
6447 if (ret < 0)
1749 return ret;
1750 6447 enc->opened = 1;
1751
1752 // discard empty frames that only carry encoder init parameters
1753
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 6444 times.
6447 if (!frame->buf[0]) {
1754 3 av_frame_unref(frame);
1755 3 return 0;
1756 }
1757 }
1758
1759 372486 return (enc->sq_idx[0] >= 0) ?
1760
2/2
✓ Branch 0 taken 33972 times.
✓ Branch 1 taken 338514 times.
711000 send_to_enc_sq (sch, enc, frame) :
1761 338514 send_to_enc_thread(sch, enc, frame);
1762 }
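
The "empty frames that only carry encoder init parameters" discarded above have all stream parameters filled in but no data buffers, so frame->buf[0] stays NULL. A sketch of how such a frame could be constructed upstream; make_param_frame() and the parameter values are made up:

    #include "libavutil/frame.h"

    static AVFrame *make_param_frame(void)
    {
        AVFrame *f = av_frame_alloc();
        if (!f)
            return NULL;

        f->format              = AV_PIX_FMT_YUV420P;
        f->width               = 1920;
        f->height              = 1080;
        f->sample_aspect_ratio = (AVRational){ 1, 1 };
        /* deliberately no av_frame_get_buffer(f, 0): buf[0] stays NULL,
         * so send_to_enc() drops the frame after opening the encoder */
        return f;
    }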
1763
1764 3094 static int mux_queue_packet(SchMux *mux, SchMuxStream *ms, AVPacket *pkt)
1765 {
1766 3094 PreMuxQueue *q = &ms->pre_mux_queue;
1767 3094 AVPacket *tmp_pkt = NULL;
1768 int ret;
1769
1770
2/2
✓ Branch 1 taken 139 times.
✓ Branch 2 taken 2955 times.
3094 if (!av_fifo_can_write(q->fifo)) {
1771 139 size_t packets = av_fifo_can_read(q->fifo);
1772
1/2
✓ Branch 0 taken 139 times.
✗ Branch 1 not taken.
139 size_t pkt_size = pkt ? pkt->size : 0;
1773 139 int thresh_reached = (q->data_size + pkt_size) > q->data_threshold;
1774
2/2
✓ Branch 0 taken 134 times.
✓ Branch 1 taken 5 times.
139 size_t max_packets = thresh_reached ? q->max_packets : SIZE_MAX;
1775 139 size_t new_size = FFMIN(2 * packets, max_packets);
1776
1777
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 139 times.
139 if (new_size <= packets) {
1778 av_log(mux, AV_LOG_ERROR,
1779 "Too many packets buffered for output stream.\n");
1780 return AVERROR(ENOSPC);
1781 }
1782 139 ret = av_fifo_grow2(q->fifo, new_size - packets);
1783
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 139 times.
139 if (ret < 0)
1784 return ret;
1785 }
1786
1787
2/2
✓ Branch 0 taken 3038 times.
✓ Branch 1 taken 56 times.
3094 if (pkt) {
1788 3038 tmp_pkt = av_packet_alloc();
1789
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3038 times.
3038 if (!tmp_pkt)
1790 return AVERROR(ENOMEM);
1791
1792 3038 av_packet_move_ref(tmp_pkt, pkt);
1793 3038 q->data_size += tmp_pkt->size;
1794 }
1795 3094 av_fifo_write(q->fifo, &tmp_pkt, 1);
1796
1797 3094 return 0;
1798 }
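
The growth policy above in numbers: the FIFO doubles freely while the buffered byte count stays below q->data_threshold; once the threshold is crossed, growth is capped at q->max_packets, and being unable to grow past the current size is a hard error. A worked example with hypothetical limits:

    /* Assume q->data_threshold = 50 MiB and q->max_packets = 256.
     *
     *   packets  bytes buffered  thresh_reached  cap       new_size  outcome
     *   128      10 MiB          no              SIZE_MAX  256       grow by 128
     *   256      60 MiB          yes             256       256       <= packets:
     *                                                                 ENOSPC
     *
     * i.e. new_size = FFMIN(2 * packets, cap); once the byte threshold is
     * reached and doubling would exceed max_packets, the queue refuses to
     * grow and the "Too many packets buffered" error above is returned. */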
1799
1800 481441 static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx,
1801 AVPacket *pkt)
1802 {
1803 481441 SchMuxStream *ms = &mux->streams[stream_idx];
1804
2/2
✓ Branch 0 taken 465183 times.
✓ Branch 1 taken 9128 times.
474311 int64_t dts = (pkt && pkt->dts != AV_NOPTS_VALUE) ?
1805
2/2
✓ Branch 0 taken 474311 times.
✓ Branch 1 taken 7130 times.
955752 av_rescale_q(pkt->dts + pkt->duration, pkt->time_base, AV_TIME_BASE_Q) :
1806 AV_NOPTS_VALUE;
1807
1808 // queue the packet if the muxer cannot be started yet
1809
2/2
✓ Branch 0 taken 3125 times.
✓ Branch 1 taken 478316 times.
481441 if (!atomic_load(&mux->mux_started)) {
1810 3125 int queued = 0;
1811
1812 // the muxer could have started between the above atomic check and
1813 // locking the mutex, in which case this block falls through to the normal send path
1814 3125 pthread_mutex_lock(&sch->mux_ready_lock);
1815
1816
2/2
✓ Branch 0 taken 3094 times.
✓ Branch 1 taken 31 times.
3125 if (!atomic_load(&mux->mux_started)) {
1817 3094 int ret = mux_queue_packet(mux, ms, pkt);
1818
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3094 times.
3094 queued = ret < 0 ? ret : 1;
1819 }
1820
1821 3125 pthread_mutex_unlock(&sch->mux_ready_lock);
1822
1823
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3125 times.
3125 if (queued < 0)
1824 return queued;
1825
2/2
✓ Branch 0 taken 3094 times.
✓ Branch 1 taken 31 times.
3125 else if (queued)
1826 3094 goto update_schedule;
1827 }
1828
1829
2/2
✓ Branch 0 taken 471273 times.
✓ Branch 1 taken 7074 times.
478347 if (pkt) {
1830 int ret;
1831
1832
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 471273 times.
471273 if (ms->init_eof)
1833 return AVERROR_EOF;
1834
1835 471273 ret = tq_send(mux->queue, stream_idx, pkt);
1836
2/2
✓ Branch 0 taken 57 times.
✓ Branch 1 taken 471216 times.
471273 if (ret < 0)
1837 57 return ret;
1838 } else
1839 7074 tq_send_finish(mux->queue, stream_idx);
1840
1841 481384 update_schedule:
1842 // TODO: use atomics to check whether this changes trailing dts
1843 // to avoid locking unnecessarily
1844
4/4
✓ Branch 0 taken 16258 times.
✓ Branch 1 taken 465126 times.
✓ Branch 2 taken 7130 times.
✓ Branch 3 taken 9128 times.
481384 if (dts != AV_NOPTS_VALUE || !pkt) {
1845 472256 pthread_mutex_lock(&sch->schedule_lock);
1846
1847
2/2
✓ Branch 0 taken 465126 times.
✓ Branch 1 taken 7130 times.
472256 if (pkt) ms->last_dts = dts;
1848 7130 else ms->source_finished = 1;
1849
1850 472256 schedule_update_locked(sch);
1851
1852 472256 pthread_mutex_unlock(&sch->schedule_lock);
1853 }
1854
1855 481384 return 0;
1856 }
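
The mux_started handling above is a double-checked pattern: the unlocked atomic_load() keeps the common case (muxer already running) lock-free, while the re-check under mux_ready_lock closes the race where the muxer starts between the first check and taking the mutex. Reduced to a generic skeleton, with queue_early() and send_direct() as hypothetical stand-ins for the pre-start queue and the normal send path:

    #include <pthread.h>
    #include <stdatomic.h>

    static int queue_early(void *item) { (void)item; return 1; } /* stub */
    static int send_direct(void *item) { (void)item; return 0; } /* stub */

    /* `started` flips 0 -> 1 exactly once, under `lock`, on the other side */
    static int submit(atomic_int *started, pthread_mutex_t *lock, void *item)
    {
        if (!atomic_load(started)) {        /* fast path: no lock taken */
            int queued = 0;

            pthread_mutex_lock(lock);
            if (!atomic_load(started))      /* authoritative re-check */
                queued = queue_early(item);
            pthread_mutex_unlock(lock);

            if (queued)
                return 0;
            /* started flipped in between: fall through to the normal path */
        }
        return send_direct(item);
    }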
1857
1858 static int
1859 466558 demux_stream_send_to_dst(Scheduler *sch, const SchedulerNode dst,
1860 uint8_t *dst_finished, AVPacket *pkt, unsigned flags)
1861 {
1862 int ret;
1863
1864
2/2
✓ Branch 0 taken 3115 times.
✓ Branch 1 taken 463443 times.
466558 if (*dst_finished)
1865 3115 return AVERROR_EOF;
1866
1867
4/4
✓ Branch 0 taken 459511 times.
✓ Branch 1 taken 3932 times.
✓ Branch 2 taken 67429 times.
✓ Branch 3 taken 392082 times.
463443 if (pkt && dst.type == SCH_NODE_TYPE_MUX &&
1868
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 67427 times.
67429 (flags & DEMUX_SEND_STREAMCOPY_EOF)) {
1869 2 av_packet_unref(pkt);
1870 2 pkt = NULL;
1871 }
1872
1873
2/2
✓ Branch 0 taken 3934 times.
✓ Branch 1 taken 459509 times.
463443 if (!pkt)
1874 3934 goto finish;
1875
1876 919018 ret = (dst.type == SCH_NODE_TYPE_MUX) ?
1877
2/2
✓ Branch 0 taken 67427 times.
✓ Branch 1 taken 392082 times.
459509 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
1878 392082 tq_send(sch->dec[dst.idx].queue, 0, pkt);
1879
2/2
✓ Branch 0 taken 3113 times.
✓ Branch 1 taken 456396 times.
459509 if (ret == AVERROR_EOF)
1880 3113 goto finish;
1881
1882 456396 return ret;
1883
1884 7047 finish:
1885
2/2
✓ Branch 0 taken 645 times.
✓ Branch 1 taken 6402 times.
7047 if (dst.type == SCH_NODE_TYPE_MUX)
1886 645 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
1887 else
1888 6402 tq_send_finish(sch->dec[dst.idx].queue, 0);
1889
1890 7047 *dst_finished = 1;
1891 7047 return AVERROR_EOF;
1892 }
1893
1894 466097 static int demux_send_for_stream(Scheduler *sch, SchDemux *d, SchDemuxStream *ds,
1895 AVPacket *pkt, unsigned flags)
1896 {
1897 466097 unsigned nb_done = 0;
1898
1899
2/2
✓ Branch 0 taken 466558 times.
✓ Branch 1 taken 466097 times.
932655 for (unsigned i = 0; i < ds->nb_dst; i++) {
1900 466558 AVPacket *to_send = pkt;
1901 466558 uint8_t *finished = &ds->dst_finished[i];
1902
1903 int ret;
1904
1905 // sending a packet consumes it, so make a temporary reference if needed
1906
4/4
✓ Branch 0 taken 459511 times.
✓ Branch 1 taken 7047 times.
✓ Branch 2 taken 438 times.
✓ Branch 3 taken 459073 times.
466558 if (pkt && i < ds->nb_dst - 1) {
1907 438 to_send = d->send_pkt;
1908
1909 438 ret = av_packet_ref(to_send, pkt);
1910
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 438 times.
438 if (ret < 0)
1911 return ret;
1912 }
1913
1914 466558 ret = demux_stream_send_to_dst(sch, ds->dst[i], finished, to_send, flags);
1915
2/2
✓ Branch 0 taken 459511 times.
✓ Branch 1 taken 7047 times.
466558 if (to_send)
1916 459511 av_packet_unref(to_send);
1917
2/2
✓ Branch 0 taken 10162 times.
✓ Branch 1 taken 456396 times.
466558 if (ret == AVERROR_EOF)
1918 10162 nb_done++;
1919
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 456396 times.
456396 else if (ret < 0)
1920 return ret;
1921 }
1922
1923
2/2
✓ Branch 0 taken 10138 times.
✓ Branch 1 taken 455959 times.
466097 return (nb_done == ds->nb_dst) ? AVERROR_EOF : 0;
1924 }
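
"Sending a packet consumes it", so one packet fanned out to N destinations needs N-1 extra references: destinations 0..N-2 receive a refcounted copy made with av_packet_ref() into a preallocated scratch packet, and only the last destination gets the original. The idiom in isolation; fan_out() and send_one are hypothetical, with send_one consuming its packet like the real senders do:

    #include "ffmpeg_sched.h"

    #include "libavcodec/packet.h"

    static int fan_out(AVPacket *pkt, AVPacket *scratch,
                       const SchedulerNode *dst, unsigned nb_dst,
                       int (*send_one)(SchedulerNode dst, AVPacket *pkt))
    {
        for (unsigned i = 0; i < nb_dst; i++) {
            AVPacket *to_send = pkt;
            int ret;

            /* all but the last destination get a new reference; this is
             * cheap, since only the refcount is bumped, not the data */
            if (pkt && i < nb_dst - 1) {
                to_send = scratch;
                ret = av_packet_ref(to_send, pkt);
                if (ret < 0)
                    return ret;
            }

            ret = send_one(dst[i], to_send);
            if (ret < 0)
                return ret;
        }
        return 0;
    }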
1925
1926 11 static int demux_flush(Scheduler *sch, SchDemux *d, AVPacket *pkt)
1927 {
1928 11 Timestamp max_end_ts = (Timestamp){ .ts = AV_NOPTS_VALUE };
1929
1930
3/6
✓ Branch 0 taken 11 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 11 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 11 times.
11 av_assert0(!pkt->buf && !pkt->data && !pkt->side_data_elems);
1931
1932
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 11 times.
25 for (unsigned i = 0; i < d->nb_streams; i++) {
1933 14 SchDemuxStream *ds = &d->streams[i];
1934
1935
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 14 times.
28 for (unsigned j = 0; j < ds->nb_dst; j++) {
1936 14 const SchedulerNode *dst = &ds->dst[j];
1937 SchDec *dec;
1938 int ret;
1939
1940
3/4
✓ Branch 0 taken 14 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 8 times.
✓ Branch 3 taken 6 times.
14 if (ds->dst_finished[j] || dst->type != SCH_NODE_TYPE_DEC)
1941 8 continue;
1942
1943 6 dec = &sch->dec[dst->idx];
1944
1945 6 ret = tq_send(dec->queue, 0, pkt);
1946
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 if (ret < 0)
1947 return ret;
1948
1949
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 3 times.
6 if (dec->queue_end_ts) {
1950 Timestamp ts;
1951 3 ret = av_thread_message_queue_recv(dec->queue_end_ts, &ts, 0);
1952
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (ret < 0)
1953 return ret;
1954
1955
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (max_end_ts.ts == AV_NOPTS_VALUE ||
1956 (ts.ts != AV_NOPTS_VALUE &&
1957 av_compare_ts(max_end_ts.ts, max_end_ts.tb, ts.ts, ts.tb) < 0))
1958 3 max_end_ts = ts;
1959
1960 }
1961 }
1962 }
1963
1964 11 pkt->pts = max_end_ts.ts;
1965 11 pkt->time_base = max_end_ts.tb;
1966
1967 11 return 0;
1968 }
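
The max_end_ts aggregation above compares timestamps that may live in different time bases, hence av_compare_ts() rather than a raw comparison of .ts values. A small example of why the raw comparison would be wrong, using made-up values (Timestamp is the ts/tb pair from ffmpeg_utils.h):

    #include "libavutil/mathematics.h"

    /* 900 ticks of 1/900000 s is 1 ms; 3 ticks of 1/1000 s is 3 ms, so the
     * larger raw .ts is the *earlier* time: */
    static void compare_example(void)
    {
        Timestamp a = { .ts = 900, .tb = (AVRational){ 1, 900000 } }; /* 1 ms */
        Timestamp b = { .ts = 3,   .tb = (AVRational){ 1, 1000   } }; /* 3 ms */

        int cmp = av_compare_ts(a.ts, a.tb, b.ts, b.tb);
        (void)cmp; /* cmp == -1: a is before b, despite a.ts > b.ts */
    }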
1969
1970 459147 int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt,
1971 unsigned flags)
1972 {
1973 SchDemux *d;
1974 int terminate;
1975
1976
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 459147 times.
459147 av_assert0(demux_idx < sch->nb_demux);
1977 459147 d = &sch->demux[demux_idx];
1978
1979 459147 terminate = waiter_wait(sch, &d->waiter);
1980
2/2
✓ Branch 0 taken 63 times.
✓ Branch 1 taken 459084 times.
459147 if (terminate)
1981 63 return AVERROR_EXIT;
1982
1983 // flush the downstreams after a seek
1984
2/2
✓ Branch 0 taken 11 times.
✓ Branch 1 taken 459073 times.
459084 if (pkt->stream_index == -1)
1985 11 return demux_flush(sch, d, pkt);
1986
1987
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 459073 times.
459073 av_assert0(pkt->stream_index < d->nb_streams);
1988
1989 459073 return demux_send_for_stream(sch, d, &d->streams[pkt->stream_index], pkt, flags);
1990 }
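
The stream_index == -1 convention above marks a flush packet: a completely blank packet (demux_flush() asserts it has no buf, data, or side data) that is pushed to the decoders after a seek and later comes back carrying the post-flush end timestamp in pts/time_base. A sketch of producing one; make_flush_packet() is a hypothetical helper:

    #include "libavcodec/packet.h"

    static AVPacket *make_flush_packet(void)
    {
        /* freshly allocated, hence blank: no buf, data, or side data */
        AVPacket *flush = av_packet_alloc();
        if (flush)
            flush->stream_index = -1; /* routed to demux_flush() */
        return flush;
    }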
1991
1992 6807 static int demux_done(Scheduler *sch, unsigned demux_idx)
1993 {
1994 6807 SchDemux *d = &sch->demux[demux_idx];
1995 6807 int ret = 0;
1996
1997
2/2
✓ Branch 0 taken 7024 times.
✓ Branch 1 taken 6807 times.
13831 for (unsigned i = 0; i < d->nb_streams; i++) {
1998 7024 int err = demux_send_for_stream(sch, d, &d->streams[i], NULL, 0);
1999
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7024 times.
7024 if (err != AVERROR_EOF)
2000 ret = err_merge(ret, err);
2001 }
2002
2003 6807 pthread_mutex_lock(&sch->schedule_lock);
2004
2005 6807 d->task_exited = 1;
2006
2007 6807 schedule_update_locked(sch);
2008
2009 6807 pthread_mutex_unlock(&sch->schedule_lock);
2010
2011 6807 return ret;
2012 }
2013
2014 487881 int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt)
2015 {
2016 SchMux *mux;
2017 int ret, stream_idx;
2018
2019
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 487881 times.
487881 av_assert0(mux_idx < sch->nb_mux);
2020 487881 mux = &sch->mux[mux_idx];
2021
2022 487881 ret = tq_receive(mux->queue, &stream_idx, pkt);
2023 487881 pkt->stream_index = stream_idx;
2024 487881 return ret;
2025 }
2026
2027 198 void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
2028 {
2029 SchMux *mux;
2030
2031
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 198 times.
198 av_assert0(mux_idx < sch->nb_mux);
2032 198 mux = &sch->mux[mux_idx];
2033
2034
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 198 times.
198 av_assert0(stream_idx < mux->nb_streams);
2035 198 tq_receive_finish(mux->queue, stream_idx);
2036
2037 198 pthread_mutex_lock(&sch->schedule_lock);
2038 198 mux->streams[stream_idx].source_finished = 1;
2039
2040 198 schedule_update_locked(sch);
2041
2042 198 pthread_mutex_unlock(&sch->schedule_lock);
2043 198 }
2044
2045 437603 int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
2046 const AVPacket *pkt)
2047 {
2048 SchMux *mux;
2049 SchMuxStream *ms;
2050
2051
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 437603 times.
437603 av_assert0(mux_idx < sch->nb_mux);
2052 437603 mux = &sch->mux[mux_idx];
2053
2054
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 437603 times.
437603 av_assert0(stream_idx < mux->nb_streams);
2055 437603 ms = &mux->streams[stream_idx];
2056
2057
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 437603 times.
437608 for (unsigned i = 0; i < ms->nb_sub_heartbeat_dst; i++) {
2058 5 SchDec *dst = &sch->dec[ms->sub_heartbeat_dst[i]];
2059 int ret;
2060
2061 5 ret = av_packet_copy_props(mux->sub_heartbeat_pkt, pkt);
2062
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5 times.
5 if (ret < 0)
2063 return ret;
2064
2065 5 tq_send(dst->queue, 0, mux->sub_heartbeat_pkt);
2066 }
2067
2068 437603 return 0;
2069 }
2070
2071 6778 static int mux_done(Scheduler *sch, unsigned mux_idx)
2072 {
2073 6778 SchMux *mux = &sch->mux[mux_idx];
2074
2075 6778 pthread_mutex_lock(&sch->schedule_lock);
2076
2077
2/2
✓ Branch 0 taken 7130 times.
✓ Branch 1 taken 6778 times.
13908 for (unsigned i = 0; i < mux->nb_streams; i++) {
2078 7130 tq_receive_finish(mux->queue, i);
2079 7130 mux->streams[i].source_finished = 1;
2080 }
2081
2082 6778 schedule_update_locked(sch);
2083
2084 6778 pthread_mutex_unlock(&sch->schedule_lock);
2085
2086 6778 pthread_mutex_lock(&sch->mux_done_lock);
2087
2088
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6778 times.
6778 av_assert0(sch->nb_mux_done < sch->nb_mux);
2089 6778 sch->nb_mux_done++;
2090
2091 6778 pthread_cond_signal(&sch->mux_done_cond);
2092
2093 6778 pthread_mutex_unlock(&sch->mux_done_lock);
2094
2095 6778 return 0;
2096 }
2097
2098 368513 int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
2099 {
2100 SchDec *dec;
2101 int ret, dummy;
2102
2103
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 368513 times.
368513 av_assert0(dec_idx < sch->nb_dec);
2104 368513 dec = &sch->dec[dec_idx];
2105
2106 // the decoder should have given us the post-flush end timestamp in pkt
2107
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 368510 times.
368513 if (dec->expect_end_ts) {
2108 3 Timestamp ts = (Timestamp){ .ts = pkt->pts, .tb = pkt->time_base };
2109 3 ret = av_thread_message_queue_send(dec->queue_end_ts, &ts, 0);
2110
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (ret < 0)
2111 return ret;
2112
2113 3 dec->expect_end_ts = 0;
2114 }
2115
2116 368513 ret = tq_receive(dec->queue, &dummy, pkt);
2117
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 368513 times.
368513 av_assert0(dummy <= 0);
2118
2119 // got a flush packet; on the next call to this function the decoder
2120 // will give us the post-flush end timestamp
2121
7/8
✓ Branch 0 taken 365238 times.
✓ Branch 1 taken 3275 times.
✓ Branch 2 taken 958 times.
✓ Branch 3 taken 364280 times.
✓ Branch 4 taken 958 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 3 times.
✓ Branch 7 taken 955 times.
368513 if (ret >= 0 && !pkt->data && !pkt->side_data_elems && dec->queue_end_ts)
2122 3 dec->expect_end_ts = 1;
2123
2124 368513 return ret;
2125 }
2126
2127 396471 static int send_to_filter(Scheduler *sch, SchFilterGraph *fg,
2128 unsigned in_idx, AVFrame *frame)
2129 {
2130
2/2
✓ Branch 0 taken 390060 times.
✓ Branch 1 taken 6411 times.
396471 if (frame)
2131 390060 return tq_send(fg->queue, in_idx, frame);
2132
2133
1/2
✓ Branch 0 taken 6411 times.
✗ Branch 1 not taken.
6411 if (!fg->inputs[in_idx].send_finished) {
2134 6411 fg->inputs[in_idx].send_finished = 1;
2135 6411 tq_send_finish(fg->queue, in_idx);
2136
2137 // close the control stream when all actual inputs are done
2138
2/2
✓ Branch 0 taken 6341 times.
✓ Branch 1 taken 70 times.
6411 if (atomic_fetch_add(&fg->nb_inputs_finished_send, 1) == fg->nb_inputs - 1)
2139 6341 tq_send_finish(fg->queue, fg->nb_inputs);
2140 }
2141 6411 return 0;
2142 }
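
Closing the control stream above relies on atomic_fetch_add() returning the value from before the increment: exactly one caller, the last, observes nb_inputs - 1 and closes the extra queue slot, with no lock needed. The same "last one out" idiom as a standalone sketch; Latch and latch_arrive() are hypothetical names:

    #include <stdatomic.h>

    typedef struct Latch {
        atomic_uint arrived;
        unsigned    total;
    } Latch;

    /* Each of `total` participants calls this exactly once; it returns 1
     * for precisely one caller -- the last -- which runs the finalizer. */
    static int latch_arrive(Latch *l)
    {
        /* atomic_fetch_add() yields the pre-increment value, so only the
         * final arrival sees total - 1 */
        return atomic_fetch_add(&l->arrived, 1) == l->total - 1;
    }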
2143
2144 400567 static int dec_send_to_dst(Scheduler *sch, const SchedulerNode dst,
2145 uint8_t *dst_finished, AVFrame *frame)
2146 {
2147 int ret;
2148
2149
2/2
✓ Branch 0 taken 6353 times.
✓ Branch 1 taken 394214 times.
400567 if (*dst_finished)
2150 6353 return AVERROR_EOF;
2151
2152
2/2
✓ Branch 0 taken 3275 times.
✓ Branch 1 taken 390939 times.
394214 if (!frame)
2153 3275 goto finish;
2154
2155 781878 ret = (dst.type == SCH_NODE_TYPE_FILTER_IN) ?
2156
2/2
✓ Branch 0 taken 390060 times.
✓ Branch 1 taken 879 times.
390939 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame) :
2157 879 send_to_enc(sch, &sch->enc[dst.idx], frame);
2158
2/2
✓ Branch 0 taken 3174 times.
✓ Branch 1 taken 387765 times.
390939 if (ret == AVERROR_EOF)
2159 3174 goto finish;
2160
2161 387765 return ret;
2162
2163 6449 finish:
2164
2/2
✓ Branch 0 taken 6411 times.
✓ Branch 1 taken 38 times.
6449 if (dst.type == SCH_NODE_TYPE_FILTER_IN)
2165 6411 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2166 else
2167 38 send_to_enc(sch, &sch->enc[dst.idx], NULL);
2168
2169 6449 *dst_finished = 1;
2170
2171 6449 return AVERROR_EOF;
2172 }
2173
2174 392776 int sch_dec_send(Scheduler *sch, unsigned dec_idx, AVFrame *frame)
2175 {
2176 SchDec *dec;
2177 int ret;
2178 392776 unsigned nb_done = 0;
2179
2180
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 392776 times.
392776 av_assert0(dec_idx < sch->nb_dec);
2181 392776 dec = &sch->dec[dec_idx];
2182
2183
2/2
✓ Branch 0 taken 394118 times.
✓ Branch 1 taken 392776 times.
786894 for (unsigned i = 0; i < dec->nb_dst; i++) {
2184 394118 uint8_t *finished = &dec->dst_finished[i];
2185 394118 AVFrame *to_send = frame;
2186
2187 // sending a frame consumes it, so make a temporary reference if needed
2188
2/2
✓ Branch 0 taken 1342 times.
✓ Branch 1 taken 392776 times.
394118 if (i < dec->nb_dst - 1) {
2189 1342 to_send = dec->send_frame;
2190
2191 // frame may sometimes contain props only,
2192 // e.g. to signal EOF timestamp
2193
2/2
✓ Branch 0 taken 1250 times.
✓ Branch 1 taken 92 times.
1342 ret = frame->buf[0] ? av_frame_ref(to_send, frame) :
2194 92 av_frame_copy_props(to_send, frame);
2195
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1342 times.
1342 if (ret < 0)
2196 return ret;
2197 }
2198
2199 394118 ret = dec_send_to_dst(sch, dec->dst[i], finished, to_send);
2200
2/2
✓ Branch 0 taken 6353 times.
✓ Branch 1 taken 387765 times.
394118 if (ret < 0) {
2201 6353 av_frame_unref(to_send);
2202
1/2
✓ Branch 0 taken 6353 times.
✗ Branch 1 not taken.
6353 if (ret == AVERROR_EOF) {
2203 6353 nb_done++;
2204 6353 continue;
2205 }
2206 return ret;
2207 }
2208 }
2209
2210
2/2
✓ Branch 0 taken 6274 times.
✓ Branch 1 taken 386502 times.
392776 return (nb_done == dec->nb_dst) ? AVERROR_EOF : 0;
2211 }
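
The buf[0] test above distinguishes real frames from props-only ones (e.g. a frame that merely signals the EOF timestamp): frames with data are duplicated with av_frame_ref(), bufferless ones with av_frame_copy_props(), which copies metadata such as pts but no data planes. The branch in isolation, as a hypothetical helper:

    #include "libavutil/frame.h"

    /* Duplicate src into dst the way sch_dec_send() does: refcounted copy
     * when data buffers exist, properties only when they do not. */
    static int dup_like_dec_send(AVFrame *dst, const AVFrame *src)
    {
        return src->buf[0] ? av_frame_ref(dst, src)
                           : av_frame_copy_props(dst, src);
    }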
2212
2213 6403 static int dec_done(Scheduler *sch, unsigned dec_idx)
2214 {
2215 6403 SchDec *dec = &sch->dec[dec_idx];
2216 6403 int ret = 0;
2217
2218 6403 tq_receive_finish(dec->queue, 0);
2219
2220 // make sure our source does not get stuck waiting for end timestamps
2221 // that will never arrive
2222
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 6402 times.
6403 if (dec->queue_end_ts)
2223 1 av_thread_message_queue_set_err_recv(dec->queue_end_ts, AVERROR_EOF);
2224
2225
2/2
✓ Branch 0 taken 6449 times.
✓ Branch 1 taken 6403 times.
12852 for (unsigned i = 0; i < dec->nb_dst; i++) {
2226 6449 int err = dec_send_to_dst(sch, dec->dst[i], &dec->dst_finished[i], NULL);
2227
2/4
✓ Branch 0 taken 6449 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6449 times.
6449 if (err < 0 && err != AVERROR_EOF)
2228 ret = err_merge(ret, err);
2229 }
2230
2231 6403 return ret;
2232 }
2233
2234 419542 int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
2235 {
2236 SchEnc *enc;
2237 int ret, dummy;
2238
2239
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 419542 times.
419542 av_assert0(enc_idx < sch->nb_enc);
2240 419542 enc = &sch->enc[enc_idx];
2241
2242 419542 ret = tq_receive(enc->queue, &dummy, frame);
2243
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 419542 times.
419542 av_assert0(dummy <= 0);
2244
2245 419542 return ret;
2246 }
2247
2248 413460 static int enc_send_to_dst(Scheduler *sch, const SchedulerNode dst,
2249 uint8_t *dst_finished, AVPacket *pkt)
2250 {
2251 int ret;
2252
2253
2/2
✓ Branch 0 taken 42 times.
✓ Branch 1 taken 413418 times.
413460 if (*dst_finished)
2254 42 return AVERROR_EOF;
2255
2256
2/2
✓ Branch 0 taken 6484 times.
✓ Branch 1 taken 406934 times.
413418 if (!pkt)
2257 6484 goto finish;
2258
2259 813868 ret = (dst.type == SCH_NODE_TYPE_MUX) ?
2260
2/2
✓ Branch 0 taken 406884 times.
✓ Branch 1 taken 50 times.
406934 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
2261 50 tq_send(sch->dec[dst.idx].queue, 0, pkt);
2262
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 406932 times.
406934 if (ret == AVERROR_EOF)
2263 2 goto finish;
2264
2265 406932 return ret;
2266
2267 6486 finish:
2268
2/2
✓ Branch 0 taken 6485 times.
✓ Branch 1 taken 1 times.
6486 if (dst.type == SCH_NODE_TYPE_MUX)
2269 6485 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
2270 else
2271 1 tq_send_finish(sch->dec[dst.idx].queue, 0);
2272
2273 6486 *dst_finished = 1;
2274
2275 6486 return AVERROR_EOF;
2276 }
2277
2278 406924 int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
2279 {
2280 SchEnc *enc;
2281 int ret;
2282
2283
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 406924 times.
406924 av_assert0(enc_idx < sch->nb_enc);
2284 406924 enc = &sch->enc[enc_idx];
2285
2286
2/2
✓ Branch 0 taken 406974 times.
✓ Branch 1 taken 406924 times.
813898 for (unsigned i = 0; i < enc->nb_dst; i++) {
2287 406974 uint8_t *finished = &enc->dst_finished[i];
2288 406974 AVPacket *to_send = pkt;
2289
2290 // sending a packet consumes it, so make a temporary reference if needed
2291
2/2
✓ Branch 0 taken 50 times.
✓ Branch 1 taken 406924 times.
406974 if (i < enc->nb_dst - 1) {
2292 50 to_send = enc->send_pkt;
2293
2294 50 ret = av_packet_ref(to_send, pkt);
2295
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 50 times.
50 if (ret < 0)
2296 return ret;
2297 }
2298
2299 406974 ret = enc_send_to_dst(sch, enc->dst[i], finished, to_send);
2300
2/2
✓ Branch 0 taken 42 times.
✓ Branch 1 taken 406932 times.
406974 if (ret < 0) {
2301 42 av_packet_unref(to_send);
2302
1/2
✓ Branch 0 taken 42 times.
✗ Branch 1 not taken.
42 if (ret == AVERROR_EOF)
2303 42 continue;
2304 return ret;
2305 }
2306 }
2307
2308 406924 return 0;
2309 }
2310
2311 6485 static int enc_done(Scheduler *sch, unsigned enc_idx)
2312 {
2313 6485 SchEnc *enc = &sch->enc[enc_idx];
2314 6485 int ret = 0;
2315
2316 6485 tq_receive_finish(enc->queue, 0);
2317
2318
2/2
✓ Branch 0 taken 6486 times.
✓ Branch 1 taken 6485 times.
12971 for (unsigned i = 0; i < enc->nb_dst; i++) {
2319 6486 int err = enc_send_to_dst(sch, enc->dst[i], &enc->dst_finished[i], NULL);
2320
2/4
✓ Branch 0 taken 6486 times.