Line | Branch | Exec | Source |
---|---|---|---|
1 | /* | ||
2 | * Inter-thread scheduling/synchronization. | ||
3 | * Copyright (c) 2023 Anton Khirnov | ||
4 | * | ||
5 | * This file is part of FFmpeg. | ||
6 | * | ||
7 | * FFmpeg is free software; you can redistribute it and/or | ||
8 | * modify it under the terms of the GNU Lesser General Public | ||
9 | * License as published by the Free Software Foundation; either | ||
10 | * version 2.1 of the License, or (at your option) any later version. | ||
11 | * | ||
12 | * FFmpeg is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with FFmpeg; if not, write to the Free Software | ||
19 | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA | ||
20 | */ | ||
21 | |||
22 | #include <stdatomic.h> | ||
23 | #include <stddef.h> | ||
24 | #include <stdint.h> | ||
25 | |||
26 | #include "cmdutils.h" | ||
27 | #include "ffmpeg_sched.h" | ||
28 | #include "ffmpeg_utils.h" | ||
29 | #include "sync_queue.h" | ||
30 | #include "thread_queue.h" | ||
31 | |||
32 | #include "libavcodec/packet.h" | ||
33 | |||
34 | #include "libavutil/avassert.h" | ||
35 | #include "libavutil/error.h" | ||
36 | #include "libavutil/fifo.h" | ||
37 | #include "libavutil/frame.h" | ||
38 | #include "libavutil/mem.h" | ||
39 | #include "libavutil/thread.h" | ||
40 | #include "libavutil/threadmessage.h" | ||
41 | #include "libavutil/time.h" | ||
42 | |||
43 | // 100 ms | ||
44 | // FIXME: some other value? make this dynamic? | ||
45 | #define SCHEDULE_TOLERANCE (100 * 1000) | ||
46 | |||
47 | enum QueueType { | ||
48 | QUEUE_PACKETS, | ||
49 | QUEUE_FRAMES, | ||
50 | }; | ||
51 | |||
52 | typedef struct SchWaiter { | ||
53 | pthread_mutex_t lock; | ||
54 | pthread_cond_t cond; | ||
55 | atomic_int choked; | ||
56 | |||
57 | // the following are internal state of schedule_update_locked() and must not | ||
58 | // be accessed outside of it | ||
59 | int choked_prev; | ||
60 | int choked_next; | ||
61 | } SchWaiter; | ||
62 | |||
63 | typedef struct SchTask { | ||
64 | Scheduler *parent; | ||
65 | SchedulerNode node; | ||
66 | |||
67 | SchThreadFunc func; | ||
68 | void *func_arg; | ||
69 | |||
70 | pthread_t thread; | ||
71 | int thread_running; | ||
72 | } SchTask; | ||
73 | |||
74 | typedef struct SchDecOutput { | ||
75 | SchedulerNode *dst; | ||
76 | uint8_t *dst_finished; | ||
77 | unsigned nb_dst; | ||
78 | } SchDecOutput; | ||
79 | |||
80 | typedef struct SchDec { | ||
81 | const AVClass *class; | ||
82 | |||
83 | SchedulerNode src; | ||
84 | |||
85 | SchDecOutput *outputs; | ||
86 | unsigned nb_outputs; | ||
87 | |||
88 | SchTask task; | ||
89 | // Queue for receiving input packets, one stream. | ||
90 | ThreadQueue *queue; | ||
91 | |||
92 | // Queue for sending post-flush end timestamps back to the source | ||
93 | AVThreadMessageQueue *queue_end_ts; | ||
94 | int expect_end_ts; | ||
95 | |||
96 | // temporary storage used by sch_dec_send() | ||
97 | AVFrame *send_frame; | ||
98 | } SchDec; | ||
99 | |||
100 | typedef struct SchSyncQueue { | ||
101 | SyncQueue *sq; | ||
102 | AVFrame *frame; | ||
103 | pthread_mutex_t lock; | ||
104 | |||
105 | unsigned *enc_idx; | ||
106 | unsigned nb_enc_idx; | ||
107 | } SchSyncQueue; | ||
108 | |||
109 | typedef struct SchEnc { | ||
110 | const AVClass *class; | ||
111 | |||
112 | SchedulerNode src; | ||
113 | SchedulerNode *dst; | ||
114 | uint8_t *dst_finished; | ||
115 | unsigned nb_dst; | ||
116 | |||
117 | // [0] - index of the sync queue in Scheduler.sq_enc, | ||
118 | // [1] - index of this encoder in the sq | ||
119 | int sq_idx[2]; | ||
120 | |||
121 | /* Opening encoders is somewhat nontrivial due to their interaction with | ||
122 | * sync queues, which are (among other things) responsible for maintaining | ||
123 | * constant audio frame size, when it is required by the encoder. | ||
124 | * | ||
125 | * Opening the encoder requires stream parameters, obtained from the first | ||
126 | * frame. However, that frame cannot be properly chunked by the sync queue | ||
127 | * without knowing the required frame size, which is only available after | ||
128 | * opening the encoder. | ||
129 | * | ||
130 | * This apparent circular dependency is resolved in the following way: | ||
131 | * - the caller creating the encoder gives us a callback which opens the | ||
132 | * encoder and returns the required frame size (if any) | ||
133 | * - when the first frame is sent to the encoder, the sending thread | ||
134 | * - calls this callback, opening the encoder | ||
135 | * - passes the returned frame size to the sync queue | ||
136 | */ | ||
137 | int (*open_cb)(void *opaque, const AVFrame *frame); | ||
138 | int opened; | ||
139 | |||
140 | SchTask task; | ||
141 | // Queue for receiving input frames, one stream. | ||
142 | ThreadQueue *queue; | ||
143 | // tq_send() to queue returned EOF | ||
144 | int in_finished; | ||
145 | |||
146 | // temporary storage used by sch_enc_send() | ||
147 | AVPacket *send_pkt; | ||
148 | } SchEnc; | ||
149 | |||
150 | typedef struct SchDemuxStream { | ||
151 | SchedulerNode *dst; | ||
152 | uint8_t *dst_finished; | ||
153 | unsigned nb_dst; | ||
154 | } SchDemuxStream; | ||
155 | |||
156 | typedef struct SchDemux { | ||
157 | const AVClass *class; | ||
158 | |||
159 | SchDemuxStream *streams; | ||
160 | unsigned nb_streams; | ||
161 | |||
162 | SchTask task; | ||
163 | SchWaiter waiter; | ||
164 | |||
165 | // temporary storage used by sch_demux_send() | ||
166 | AVPacket *send_pkt; | ||
167 | |||
168 | // protected by schedule_lock | ||
169 | int task_exited; | ||
170 | } SchDemux; | ||
171 | |||
172 | typedef struct PreMuxQueue { | ||
173 | /** | ||
174 | * Queue for buffering the packets before the muxer task can be started. | ||
175 | */ | ||
176 | AVFifo *fifo; | ||
177 | /** | ||
178 | * Maximum number of packets in fifo. | ||
179 | */ | ||
180 | int max_packets; | ||
181 | /* | ||
182 | * The size of the AVPackets' buffers in queue. | ||
183 | * Updated when a packet is either pushed or pulled from the queue. | ||
184 | */ | ||
185 | size_t data_size; | ||
186 | /* Threshold after which max_packets will be in effect */ | ||
187 | size_t data_threshold; | ||
188 | } PreMuxQueue; | ||
189 | |||
190 | typedef struct SchMuxStream { | ||
191 | SchedulerNode src; | ||
192 | SchedulerNode src_sched; | ||
193 | |||
194 | unsigned *sub_heartbeat_dst; | ||
195 | unsigned nb_sub_heartbeat_dst; | ||
196 | |||
197 | PreMuxQueue pre_mux_queue; | ||
198 | |||
199 | // an EOF was generated while flushing the pre-mux queue | ||
200 | int init_eof; | ||
201 | |||
202 | //////////////////////////////////////////////////////////// | ||
203 | // The following are protected by Scheduler.schedule_lock // | ||
204 | |||
205 | /* dts+duration of the last packet sent to this stream | ||
206 | in AV_TIME_BASE_Q */ | ||
207 | int64_t last_dts; | ||
208 | // this stream no longer accepts input | ||
209 | int source_finished; | ||
210 | //////////////////////////////////////////////////////////// | ||
211 | } SchMuxStream; | ||
212 | |||
213 | typedef struct SchMux { | ||
214 | const AVClass *class; | ||
215 | |||
216 | SchMuxStream *streams; | ||
217 | unsigned nb_streams; | ||
218 | unsigned nb_streams_ready; | ||
219 | |||
220 | int (*init)(void *arg); | ||
221 | |||
222 | SchTask task; | ||
223 | /** | ||
224 | * Set to 1 after starting the muxer task and flushing the | ||
225 | * pre-muxing queues. | ||
226 | * Set either before any tasks have started, or with | ||
227 | * Scheduler.mux_ready_lock held. | ||
228 | */ | ||
229 | atomic_int mux_started; | ||
230 | ThreadQueue *queue; | ||
231 | unsigned queue_size; | ||
232 | |||
233 | AVPacket *sub_heartbeat_pkt; | ||
234 | } SchMux; | ||
235 | |||
236 | typedef struct SchFilterIn { | ||
237 | SchedulerNode src; | ||
238 | SchedulerNode src_sched; | ||
239 | int send_finished; | ||
240 | int receive_finished; | ||
241 | } SchFilterIn; | ||
242 | |||
243 | typedef struct SchFilterOut { | ||
244 | SchedulerNode dst; | ||
245 | } SchFilterOut; | ||
246 | |||
247 | typedef struct SchFilterGraph { | ||
248 | const AVClass *class; | ||
249 | |||
250 | SchFilterIn *inputs; | ||
251 | unsigned nb_inputs; | ||
252 | atomic_uint nb_inputs_finished_send; | ||
253 | unsigned nb_inputs_finished_receive; | ||
254 | |||
255 | SchFilterOut *outputs; | ||
256 | unsigned nb_outputs; | ||
257 | |||
258 | SchTask task; | ||
259 | // input queue, nb_inputs+1 streams | ||
260 | // last stream is control | ||
261 | ThreadQueue *queue; | ||
262 | SchWaiter waiter; | ||
263 | |||
264 | // protected by schedule_lock | ||
265 | unsigned best_input; | ||
266 | int task_exited; | ||
267 | } SchFilterGraph; | ||
268 | |||
269 | enum SchedulerState { | ||
270 | SCH_STATE_UNINIT, | ||
271 | SCH_STATE_STARTED, | ||
272 | SCH_STATE_STOPPED, | ||
273 | }; | ||
274 | |||
275 | struct Scheduler { | ||
276 | const AVClass *class; | ||
277 | |||
278 | SchDemux *demux; | ||
279 | unsigned nb_demux; | ||
280 | |||
281 | SchMux *mux; | ||
282 | unsigned nb_mux; | ||
283 | |||
284 | unsigned nb_mux_ready; | ||
285 | pthread_mutex_t mux_ready_lock; | ||
286 | |||
287 | unsigned nb_mux_done; | ||
288 | pthread_mutex_t mux_done_lock; | ||
289 | pthread_cond_t mux_done_cond; | ||
290 | |||
291 | |||
292 | SchDec *dec; | ||
293 | unsigned nb_dec; | ||
294 | |||
295 | SchEnc *enc; | ||
296 | unsigned nb_enc; | ||
297 | |||
298 | SchSyncQueue *sq_enc; | ||
299 | unsigned nb_sq_enc; | ||
300 | |||
301 | SchFilterGraph *filters; | ||
302 | unsigned nb_filters; | ||
303 | |||
304 | char *sdp_filename; | ||
305 | int sdp_auto; | ||
306 | |||
307 | enum SchedulerState state; | ||
308 | atomic_int terminate; | ||
309 | atomic_int task_failed; | ||
310 | |||
311 | pthread_mutex_t schedule_lock; | ||
312 | |||
313 | atomic_int_least64_t last_dts; | ||
314 | }; | ||
315 | |||
316 | /** | ||
317 | * Wait until this task is allowed to proceed. | ||
318 | * | ||
319 | * @retval 0 the caller should proceed | ||
320 | * @retval 1 the caller should terminate | ||
321 | */ | ||
322 | 491275 | static int waiter_wait(Scheduler *sch, SchWaiter *w) | |
323 | { | ||
324 | int terminate; | ||
325 | |||
326 |
2/2✓ Branch 0 taken 491027 times.
✓ Branch 1 taken 248 times.
|
491275 | if (!atomic_load(&w->choked)) |
327 | 491027 | return 0; | |
328 | |||
329 | 248 | pthread_mutex_lock(&w->lock); | |
330 | |||
331 |
4/4✓ Branch 0 taken 254 times.
✓ Branch 1 taken 204 times.
✓ Branch 2 taken 210 times.
✓ Branch 3 taken 44 times.
|
458 | while (atomic_load(&w->choked) && !atomic_load(&sch->terminate)) |
332 | 210 | pthread_cond_wait(&w->cond, &w->lock); | |
333 | |||
334 | 248 | terminate = atomic_load(&sch->terminate); | |
335 | |||
336 | 248 | pthread_mutex_unlock(&w->lock); | |
337 | |||
338 | 248 | return terminate; | |
339 | } | ||
340 | |||
341 | 32894 | static void waiter_set(SchWaiter *w, int choked) | |
342 | { | ||
343 | 32894 | pthread_mutex_lock(&w->lock); | |
344 | |||
345 | 32894 | atomic_store(&w->choked, choked); | |
346 | 32894 | pthread_cond_signal(&w->cond); | |
347 | |||
348 | 32894 | pthread_mutex_unlock(&w->lock); | |
349 | 32894 | } | |
350 | |||
351 | 14701 | static int waiter_init(SchWaiter *w) | |
352 | { | ||
353 | int ret; | ||
354 | |||
355 | 14701 | atomic_init(&w->choked, 0); | |
356 | |||
357 | 14701 | ret = pthread_mutex_init(&w->lock, NULL); | |
358 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 14701 times.
|
14701 | if (ret) |
359 | ✗ | return AVERROR(ret); | |
360 | |||
361 | 14701 | ret = pthread_cond_init(&w->cond, NULL); | |
362 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 14701 times.
|
14701 | if (ret) |
363 | ✗ | return AVERROR(ret); | |
364 | |||
365 | 14701 | return 0; | |
366 | } | ||
367 | |||
368 | 14701 | static void waiter_uninit(SchWaiter *w) | |
369 | { | ||
370 | 14701 | pthread_mutex_destroy(&w->lock); | |
371 | 14701 | pthread_cond_destroy(&w->cond); | |
372 | 14701 | } | |
373 | |||
374 | 29903 | static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size, | |
375 | enum QueueType type) | ||
376 | { | ||
377 | ThreadQueue *tq; | ||
378 | ObjPool *op; | ||
379 | |||
380 |
1/2✓ Branch 0 taken 29903 times.
✗ Branch 1 not taken.
|
29903 | if (queue_size <= 0) { |
381 |
2/2✓ Branch 0 taken 15233 times.
✓ Branch 1 taken 14670 times.
|
29903 | if (type == QUEUE_FRAMES) |
382 | 15233 | queue_size = DEFAULT_FRAME_THREAD_QUEUE_SIZE; | |
383 | else | ||
384 | 14670 | queue_size = DEFAULT_PACKET_THREAD_QUEUE_SIZE; | |
385 | } | ||
386 | |||
387 |
2/2✓ Branch 0 taken 15233 times.
✓ Branch 1 taken 14670 times.
|
29903 | if (type == QUEUE_FRAMES) { |
388 | // This queue length is used in the decoder code to ensure that | ||
389 | // there are enough entries in fixed-size frame pools to account | ||
390 | // for frames held in queues inside the ffmpeg utility. If this | ||
391 | // can ever dynamically change then the corresponding decode | ||
392 | // code needs to be updated as well. | ||
393 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 15233 times.
|
15233 | av_assert0(queue_size == DEFAULT_FRAME_THREAD_QUEUE_SIZE); |
394 | } | ||
395 | |||
396 |
2/2✓ Branch 0 taken 14670 times.
✓ Branch 1 taken 15233 times.
|
29903 | op = (type == QUEUE_PACKETS) ? objpool_alloc_packets() : |
397 | 15233 | objpool_alloc_frames(); | |
398 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 29903 times.
|
29903 | if (!op) |
399 | ✗ | return AVERROR(ENOMEM); | |
400 | |||
401 |
2/2✓ Branch 0 taken 14670 times.
✓ Branch 1 taken 15233 times.
|
29903 | tq = tq_alloc(nb_streams, queue_size, op, |
402 | (type == QUEUE_PACKETS) ? pkt_move : frame_move); | ||
403 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 29903 times.
|
29903 | if (!tq) { |
404 | ✗ | objpool_free(&op); | |
405 | ✗ | return AVERROR(ENOMEM); | |
406 | } | ||
407 | |||
408 | 29903 | *ptq = tq; | |
409 | 29903 | return 0; | |
410 | } | ||
411 | |||
412 | static void *task_wrapper(void *arg); | ||
413 | |||
414 | 37053 | static int task_start(SchTask *task) | |
415 | { | ||
416 | int ret; | ||
417 | |||
418 | 37053 | av_log(task->func_arg, AV_LOG_VERBOSE, "Starting thread...\n"); | |
419 | |||
420 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 37053 times.
|
37053 | av_assert0(!task->thread_running); |
421 | |||
422 | 37053 | ret = pthread_create(&task->thread, NULL, task_wrapper, task); | |
423 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 37053 times.
|
37053 | if (ret) { |
424 | ✗ | av_log(task->func_arg, AV_LOG_ERROR, "pthread_create() failed: %s\n", | |
425 | strerror(ret)); | ||
426 | ✗ | return AVERROR(ret); | |
427 | } | ||
428 | |||
429 | 37053 | task->thread_running = 1; | |
430 | 37053 | return 0; | |
431 | } | ||
432 | |||
433 | 37067 | static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx, | |
434 | SchThreadFunc func, void *func_arg) | ||
435 | { | ||
436 | 37067 | task->parent = sch; | |
437 | |||
438 | 37067 | task->node.type = type; | |
439 | 37067 | task->node.idx = idx; | |
440 | |||
441 | 37067 | task->func = func; | |
442 | 37067 | task->func_arg = func_arg; | |
443 | 37067 | } | |
444 | |||
445 | 535352 | static int64_t trailing_dts(const Scheduler *sch, int count_finished) | |
446 | { | ||
447 | 535352 | int64_t min_dts = INT64_MAX; | |
448 | |||
449 |
2/2✓ Branch 0 taken 535604 times.
✓ Branch 1 taken 520431 times.
|
1056035 | for (unsigned i = 0; i < sch->nb_mux; i++) { |
450 | 535604 | const SchMux *mux = &sch->mux[i]; | |
451 | |||
452 |
2/2✓ Branch 0 taken 581802 times.
✓ Branch 1 taken 520683 times.
|
1102485 | for (unsigned j = 0; j < mux->nb_streams; j++) { |
453 | 581802 | const SchMuxStream *ms = &mux->streams[j]; | |
454 | |||
455 |
4/4✓ Branch 0 taken 44798 times.
✓ Branch 1 taken 537004 times.
✓ Branch 2 taken 36478 times.
✓ Branch 3 taken 8320 times.
|
581802 | if (ms->source_finished && !count_finished) |
456 | 36478 | continue; | |
457 |
2/2✓ Branch 0 taken 14921 times.
✓ Branch 1 taken 530403 times.
|
545324 | if (ms->last_dts == AV_NOPTS_VALUE) |
458 | 14921 | return AV_NOPTS_VALUE; | |
459 | |||
460 | 530403 | min_dts = FFMIN(min_dts, ms->last_dts); | |
461 | } | ||
462 | } | ||
463 | |||
464 |
2/2✓ Branch 0 taken 492273 times.
✓ Branch 1 taken 28158 times.
|
520431 | return min_dts == INT64_MAX ? AV_NOPTS_VALUE : min_dts; |
465 | } | ||
466 | |||
467 | 7915 | void sch_free(Scheduler **psch) | |
468 | { | ||
469 | 7915 | Scheduler *sch = *psch; | |
470 | |||
471 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7915 times.
|
7915 | if (!sch) |
472 | ✗ | return; | |
473 | |||
474 | 7915 | sch_stop(sch, NULL); | |
475 | |||
476 |
2/2✓ Branch 0 taken 7164 times.
✓ Branch 1 taken 7915 times.
|
15079 | for (unsigned i = 0; i < sch->nb_demux; i++) { |
477 | 7164 | SchDemux *d = &sch->demux[i]; | |
478 | |||
479 |
2/2✓ Branch 0 taken 7390 times.
✓ Branch 1 taken 7164 times.
|
14554 | for (unsigned j = 0; j < d->nb_streams; j++) { |
480 | 7390 | SchDemuxStream *ds = &d->streams[j]; | |
481 | 7390 | av_freep(&ds->dst); | |
482 | 7390 | av_freep(&ds->dst_finished); | |
483 | } | ||
484 | 7164 | av_freep(&d->streams); | |
485 | |||
486 | 7164 | av_packet_free(&d->send_pkt); | |
487 | |||
488 | 7164 | waiter_uninit(&d->waiter); | |
489 | } | ||
490 | 7915 | av_freep(&sch->demux); | |
491 | |||
492 |
2/2✓ Branch 0 taken 7916 times.
✓ Branch 1 taken 7915 times.
|
15831 | for (unsigned i = 0; i < sch->nb_mux; i++) { |
493 | 7916 | SchMux *mux = &sch->mux[i]; | |
494 | |||
495 |
2/2✓ Branch 0 taken 8356 times.
✓ Branch 1 taken 7916 times.
|
16272 | for (unsigned j = 0; j < mux->nb_streams; j++) { |
496 | 8356 | SchMuxStream *ms = &mux->streams[j]; | |
497 | |||
498 |
1/2✓ Branch 0 taken 8356 times.
✗ Branch 1 not taken.
|
8356 | if (ms->pre_mux_queue.fifo) { |
499 | AVPacket *pkt; | ||
500 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 8356 times.
|
8356 | while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0) |
501 | ✗ | av_packet_free(&pkt); | |
502 | 8356 | av_fifo_freep2(&ms->pre_mux_queue.fifo); | |
503 | } | ||
504 | |||
505 | 8356 | av_freep(&ms->sub_heartbeat_dst); | |
506 | } | ||
507 | 7916 | av_freep(&mux->streams); | |
508 | |||
509 | 7916 | av_packet_free(&mux->sub_heartbeat_pkt); | |
510 | |||
511 | 7916 | tq_free(&mux->queue); | |
512 | } | ||
513 | 7915 | av_freep(&sch->mux); | |
514 | |||
515 |
2/2✓ Branch 0 taken 6754 times.
✓ Branch 1 taken 7915 times.
|
14669 | for (unsigned i = 0; i < sch->nb_dec; i++) { |
516 | 6754 | SchDec *dec = &sch->dec[i]; | |
517 | |||
518 | 6754 | tq_free(&dec->queue); | |
519 | |||
520 | 6754 | av_thread_message_queue_free(&dec->queue_end_ts); | |
521 | |||
522 |
2/2✓ Branch 0 taken 6760 times.
✓ Branch 1 taken 6754 times.
|
13514 | for (unsigned j = 0; j < dec->nb_outputs; j++) { |
523 | 6760 | SchDecOutput *o = &dec->outputs[j]; | |
524 | |||
525 | 6760 | av_freep(&o->dst); | |
526 | 6760 | av_freep(&o->dst_finished); | |
527 | } | ||
528 | |||
529 | 6754 | av_freep(&dec->outputs); | |
530 | |||
531 | 6754 | av_frame_free(&dec->send_frame); | |
532 | } | ||
533 | 7915 | av_freep(&sch->dec); | |
534 | |||
535 |
2/2✓ Branch 0 taken 7696 times.
✓ Branch 1 taken 7915 times.
|
15611 | for (unsigned i = 0; i < sch->nb_enc; i++) { |
536 | 7696 | SchEnc *enc = &sch->enc[i]; | |
537 | |||
538 | 7696 | tq_free(&enc->queue); | |
539 | |||
540 | 7696 | av_packet_free(&enc->send_pkt); | |
541 | |||
542 | 7696 | av_freep(&enc->dst); | |
543 | 7696 | av_freep(&enc->dst_finished); | |
544 | } | ||
545 | 7915 | av_freep(&sch->enc); | |
546 | |||
547 |
2/2✓ Branch 0 taken 3068 times.
✓ Branch 1 taken 7915 times.
|
10983 | for (unsigned i = 0; i < sch->nb_sq_enc; i++) { |
548 | 3068 | SchSyncQueue *sq = &sch->sq_enc[i]; | |
549 | 3068 | sq_free(&sq->sq); | |
550 | 3068 | av_frame_free(&sq->frame); | |
551 | 3068 | pthread_mutex_destroy(&sq->lock); | |
552 | 3068 | av_freep(&sq->enc_idx); | |
553 | } | ||
554 | 7915 | av_freep(&sch->sq_enc); | |
555 | |||
556 |
2/2✓ Branch 0 taken 7537 times.
✓ Branch 1 taken 7915 times.
|
15452 | for (unsigned i = 0; i < sch->nb_filters; i++) { |
557 | 7537 | SchFilterGraph *fg = &sch->filters[i]; | |
558 | |||
559 | 7537 | tq_free(&fg->queue); | |
560 | |||
561 | 7537 | av_freep(&fg->inputs); | |
562 | 7537 | av_freep(&fg->outputs); | |
563 | |||
564 | 7537 | waiter_uninit(&fg->waiter); | |
565 | } | ||
566 | 7915 | av_freep(&sch->filters); | |
567 | |||
568 | 7915 | av_freep(&sch->sdp_filename); | |
569 | |||
570 | 7915 | pthread_mutex_destroy(&sch->schedule_lock); | |
571 | |||
572 | 7915 | pthread_mutex_destroy(&sch->mux_ready_lock); | |
573 | |||
574 | 7915 | pthread_mutex_destroy(&sch->mux_done_lock); | |
575 | 7915 | pthread_cond_destroy(&sch->mux_done_cond); | |
576 | |||
577 | 7915 | av_freep(psch); | |
578 | } | ||
579 | |||
580 | static const AVClass scheduler_class = { | ||
581 | .class_name = "Scheduler", | ||
582 | .version = LIBAVUTIL_VERSION_INT, | ||
583 | }; | ||
584 | |||
585 | 7915 | Scheduler *sch_alloc(void) | |
586 | { | ||
587 | Scheduler *sch; | ||
588 | int ret; | ||
589 | |||
590 | 7915 | sch = av_mallocz(sizeof(*sch)); | |
591 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7915 times.
|
7915 | if (!sch) |
592 | ✗ | return NULL; | |
593 | |||
594 | 7915 | sch->class = &scheduler_class; | |
595 | 7915 | sch->sdp_auto = 1; | |
596 | |||
597 | 7915 | ret = pthread_mutex_init(&sch->schedule_lock, NULL); | |
598 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7915 times.
|
7915 | if (ret) |
599 | ✗ | goto fail; | |
600 | |||
601 | 7915 | ret = pthread_mutex_init(&sch->mux_ready_lock, NULL); | |
602 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7915 times.
|
7915 | if (ret) |
603 | ✗ | goto fail; | |
604 | |||
605 | 7915 | ret = pthread_mutex_init(&sch->mux_done_lock, NULL); | |
606 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7915 times.
|
7915 | if (ret) |
607 | ✗ | goto fail; | |
608 | |||
609 | 7915 | ret = pthread_cond_init(&sch->mux_done_cond, NULL); | |
610 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7915 times.
|
7915 | if (ret) |
611 | ✗ | goto fail; | |
612 | |||
613 | 7915 | return sch; | |
614 | ✗ | fail: | |
615 | ✗ | sch_free(&sch); | |
616 | ✗ | return NULL; | |
617 | } | ||
618 | |||
619 | ✗ | int sch_sdp_filename(Scheduler *sch, const char *sdp_filename) | |
620 | { | ||
621 | ✗ | av_freep(&sch->sdp_filename); | |
622 | ✗ | sch->sdp_filename = av_strdup(sdp_filename); | |
623 | ✗ | return sch->sdp_filename ? 0 : AVERROR(ENOMEM); | |
624 | } | ||
625 | |||
626 | static const AVClass sch_mux_class = { | ||
627 | .class_name = "SchMux", | ||
628 | .version = LIBAVUTIL_VERSION_INT, | ||
629 | .parent_log_context_offset = offsetof(SchMux, task.func_arg), | ||
630 | }; | ||
631 | |||
632 | 7916 | int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *), | |
633 | void *arg, int sdp_auto, unsigned thread_queue_size) | ||
634 | { | ||
635 | 7916 | const unsigned idx = sch->nb_mux; | |
636 | |||
637 | SchMux *mux; | ||
638 | int ret; | ||
639 | |||
640 | 7916 | ret = GROW_ARRAY(sch->mux, sch->nb_mux); | |
641 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7916 times.
|
7916 | if (ret < 0) |
642 | ✗ | return ret; | |
643 | |||
644 | 7916 | mux = &sch->mux[idx]; | |
645 | 7916 | mux->class = &sch_mux_class; | |
646 | 7916 | mux->init = init; | |
647 | 7916 | mux->queue_size = thread_queue_size; | |
648 | |||
649 | 7916 | task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg); | |
650 | |||
651 | 7916 | sch->sdp_auto &= sdp_auto; | |
652 | |||
653 | 7916 | return idx; | |
654 | } | ||
655 | |||
656 | 8356 | int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx) | |
657 | { | ||
658 | SchMux *mux; | ||
659 | SchMuxStream *ms; | ||
660 | unsigned stream_idx; | ||
661 | int ret; | ||
662 | |||
663 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 8356 times.
|
8356 | av_assert0(mux_idx < sch->nb_mux); |
664 | 8356 | mux = &sch->mux[mux_idx]; | |
665 | |||
666 | 8356 | ret = GROW_ARRAY(mux->streams, mux->nb_streams); | |
667 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 8356 times.
|
8356 | if (ret < 0) |
668 | ✗ | return ret; | |
669 | 8356 | stream_idx = mux->nb_streams - 1; | |
670 | |||
671 | 8356 | ms = &mux->streams[stream_idx]; | |
672 | |||
673 | 8356 | ms->pre_mux_queue.fifo = av_fifo_alloc2(8, sizeof(AVPacket*), 0); | |
674 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 8356 times.
|
8356 | if (!ms->pre_mux_queue.fifo) |
675 | ✗ | return AVERROR(ENOMEM); | |
676 | |||
677 | 8356 | ms->last_dts = AV_NOPTS_VALUE; | |
678 | |||
679 | 8356 | return stream_idx; | |
680 | } | ||
681 | |||
682 | static const AVClass sch_demux_class = { | ||
683 | .class_name = "SchDemux", | ||
684 | .version = LIBAVUTIL_VERSION_INT, | ||
685 | .parent_log_context_offset = offsetof(SchDemux, task.func_arg), | ||
686 | }; | ||
687 | |||
688 | 7164 | int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *ctx) | |
689 | { | ||
690 | 7164 | const unsigned idx = sch->nb_demux; | |
691 | |||
692 | SchDemux *d; | ||
693 | int ret; | ||
694 | |||
695 | 7164 | ret = GROW_ARRAY(sch->demux, sch->nb_demux); | |
696 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7164 times.
|
7164 | if (ret < 0) |
697 | ✗ | return ret; | |
698 | |||
699 | 7164 | d = &sch->demux[idx]; | |
700 | |||
701 | 7164 | task_init(sch, &d->task, SCH_NODE_TYPE_DEMUX, idx, func, ctx); | |
702 | |||
703 | 7164 | d->class = &sch_demux_class; | |
704 | 7164 | d->send_pkt = av_packet_alloc(); | |
705 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7164 times.
|
7164 | if (!d->send_pkt) |
706 | ✗ | return AVERROR(ENOMEM); | |
707 | |||
708 | 7164 | ret = waiter_init(&d->waiter); | |
709 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7164 times.
|
7164 | if (ret < 0) |
710 | ✗ | return ret; | |
711 | |||
712 | 7164 | return idx; | |
713 | } | ||
714 | |||
715 | 7390 | int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx) | |
716 | { | ||
717 | SchDemux *d; | ||
718 | int ret; | ||
719 | |||
720 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7390 times.
|
7390 | av_assert0(demux_idx < sch->nb_demux); |
721 | 7390 | d = &sch->demux[demux_idx]; | |
722 | |||
723 | 7390 | ret = GROW_ARRAY(d->streams, d->nb_streams); | |
724 |
1/2✓ Branch 0 taken 7390 times.
✗ Branch 1 not taken.
|
7390 | return ret < 0 ? ret : d->nb_streams - 1; |
725 | } | ||
726 | |||
727 | 6760 | int sch_add_dec_output(Scheduler *sch, unsigned dec_idx) | |
728 | { | ||
729 | SchDec *dec; | ||
730 | int ret; | ||
731 | |||
732 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6760 times.
|
6760 | av_assert0(dec_idx < sch->nb_dec); |
733 | 6760 | dec = &sch->dec[dec_idx]; | |
734 | |||
735 | 6760 | ret = GROW_ARRAY(dec->outputs, dec->nb_outputs); | |
736 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6760 times.
|
6760 | if (ret < 0) |
737 | ✗ | return ret; | |
738 | |||
739 | 6760 | return dec->nb_outputs - 1; | |
740 | } | ||
741 | |||
742 | static const AVClass sch_dec_class = { | ||
743 | .class_name = "SchDec", | ||
744 | .version = LIBAVUTIL_VERSION_INT, | ||
745 | .parent_log_context_offset = offsetof(SchDec, task.func_arg), | ||
746 | }; | ||
747 | |||
748 | 6754 | int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts) | |
749 | { | ||
750 | 6754 | const unsigned idx = sch->nb_dec; | |
751 | |||
752 | SchDec *dec; | ||
753 | int ret; | ||
754 | |||
755 | 6754 | ret = GROW_ARRAY(sch->dec, sch->nb_dec); | |
756 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6754 times.
|
6754 | if (ret < 0) |
757 | ✗ | return ret; | |
758 | |||
759 | 6754 | dec = &sch->dec[idx]; | |
760 | |||
761 | 6754 | task_init(sch, &dec->task, SCH_NODE_TYPE_DEC, idx, func, ctx); | |
762 | |||
763 | 6754 | dec->class = &sch_dec_class; | |
764 | 6754 | dec->send_frame = av_frame_alloc(); | |
765 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6754 times.
|
6754 | if (!dec->send_frame) |
766 | ✗ | return AVERROR(ENOMEM); | |
767 | |||
768 | 6754 | ret = sch_add_dec_output(sch, idx); | |
769 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6754 times.
|
6754 | if (ret < 0) |
770 | ✗ | return ret; | |
771 | |||
772 | 6754 | ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS); | |
773 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6754 times.
|
6754 | if (ret < 0) |
774 | ✗ | return ret; | |
775 | |||
776 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 6753 times.
|
6754 | if (send_end_ts) { |
777 | 1 | ret = av_thread_message_queue_alloc(&dec->queue_end_ts, 1, sizeof(Timestamp)); | |
778 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (ret < 0) |
779 | ✗ | return ret; | |
780 | } | ||
781 | |||
782 | 6754 | return idx; | |
783 | } | ||
784 | |||
785 | static const AVClass sch_enc_class = { | ||
786 | .class_name = "SchEnc", | ||
787 | .version = LIBAVUTIL_VERSION_INT, | ||
788 | .parent_log_context_offset = offsetof(SchEnc, task.func_arg), | ||
789 | }; | ||
790 | |||
791 | 7696 | int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx, | |
792 | int (*open_cb)(void *opaque, const AVFrame *frame)) | ||
793 | { | ||
794 | 7696 | const unsigned idx = sch->nb_enc; | |
795 | |||
796 | SchEnc *enc; | ||
797 | int ret; | ||
798 | |||
799 | 7696 | ret = GROW_ARRAY(sch->enc, sch->nb_enc); | |
800 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7696 times.
|
7696 | if (ret < 0) |
801 | ✗ | return ret; | |
802 | |||
803 | 7696 | enc = &sch->enc[idx]; | |
804 | |||
805 | 7696 | enc->class = &sch_enc_class; | |
806 | 7696 | enc->open_cb = open_cb; | |
807 | 7696 | enc->sq_idx[0] = -1; | |
808 | 7696 | enc->sq_idx[1] = -1; | |
809 | |||
810 | 7696 | task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx); | |
811 | |||
812 | 7696 | enc->send_pkt = av_packet_alloc(); | |
813 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7696 times.
|
7696 | if (!enc->send_pkt) |
814 | ✗ | return AVERROR(ENOMEM); | |
815 | |||
816 | 7696 | ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES); | |
817 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7696 times.
|
7696 | if (ret < 0) |
818 | ✗ | return ret; | |
819 | |||
820 | 7696 | return idx; | |
821 | } | ||
822 | |||
823 | static const AVClass sch_fg_class = { | ||
824 | .class_name = "SchFilterGraph", | ||
825 | .version = LIBAVUTIL_VERSION_INT, | ||
826 | .parent_log_context_offset = offsetof(SchFilterGraph, task.func_arg), | ||
827 | }; | ||
828 | |||
829 | 7537 | int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs, | |
830 | SchThreadFunc func, void *ctx) | ||
831 | { | ||
832 | 7537 | const unsigned idx = sch->nb_filters; | |
833 | |||
834 | SchFilterGraph *fg; | ||
835 | int ret; | ||
836 | |||
837 | 7537 | ret = GROW_ARRAY(sch->filters, sch->nb_filters); | |
838 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7537 times.
|
7537 | if (ret < 0) |
839 | ✗ | return ret; | |
840 | 7537 | fg = &sch->filters[idx]; | |
841 | |||
842 | 7537 | fg->class = &sch_fg_class; | |
843 | |||
844 | 7537 | task_init(sch, &fg->task, SCH_NODE_TYPE_FILTER_IN, idx, func, ctx); | |
845 | |||
846 |
2/2✓ Branch 0 taken 6698 times.
✓ Branch 1 taken 839 times.
|
7537 | if (nb_inputs) { |
847 | 6698 | fg->inputs = av_calloc(nb_inputs, sizeof(*fg->inputs)); | |
848 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6698 times.
|
6698 | if (!fg->inputs) |
849 | ✗ | return AVERROR(ENOMEM); | |
850 | 6698 | fg->nb_inputs = nb_inputs; | |
851 | } | ||
852 | |||
853 |
1/2✓ Branch 0 taken 7537 times.
✗ Branch 1 not taken.
|
7537 | if (nb_outputs) { |
854 | 7537 | fg->outputs = av_calloc(nb_outputs, sizeof(*fg->outputs)); | |
855 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7537 times.
|
7537 | if (!fg->outputs) |
856 | ✗ | return AVERROR(ENOMEM); | |
857 | 7537 | fg->nb_outputs = nb_outputs; | |
858 | } | ||
859 | |||
860 | 7537 | ret = waiter_init(&fg->waiter); | |
861 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7537 times.
|
7537 | if (ret < 0) |
862 | ✗ | return ret; | |
863 | |||
864 | 7537 | ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES); | |
865 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7537 times.
|
7537 | if (ret < 0) |
866 | ✗ | return ret; | |
867 | |||
868 | 7537 | return idx; | |
869 | } | ||
870 | |||
871 | 3068 | int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx) | |
872 | { | ||
873 | SchSyncQueue *sq; | ||
874 | int ret; | ||
875 | |||
876 | 3068 | ret = GROW_ARRAY(sch->sq_enc, sch->nb_sq_enc); | |
877 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3068 times.
|
3068 | if (ret < 0) |
878 | ✗ | return ret; | |
879 | 3068 | sq = &sch->sq_enc[sch->nb_sq_enc - 1]; | |
880 | |||
881 | 3068 | sq->sq = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, logctx); | |
882 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3068 times.
|
3068 | if (!sq->sq) |
883 | ✗ | return AVERROR(ENOMEM); | |
884 | |||
885 | 3068 | sq->frame = av_frame_alloc(); | |
886 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3068 times.
|
3068 | if (!sq->frame) |
887 | ✗ | return AVERROR(ENOMEM); | |
888 | |||
889 | 3068 | ret = pthread_mutex_init(&sq->lock, NULL); | |
890 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3068 times.
|
3068 | if (ret) |
891 | ✗ | return AVERROR(ret); | |
892 | |||
893 | 3068 | return sq - sch->sq_enc; | |
894 | } | ||
895 | |||
896 | 3121 | int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx, | |
897 | int limiting, uint64_t max_frames) | ||
898 | { | ||
899 | SchSyncQueue *sq; | ||
900 | SchEnc *enc; | ||
901 | int ret; | ||
902 | |||
903 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3121 times.
|
3121 | av_assert0(sq_idx < sch->nb_sq_enc); |
904 | 3121 | sq = &sch->sq_enc[sq_idx]; | |
905 | |||
906 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3121 times.
|
3121 | av_assert0(enc_idx < sch->nb_enc); |
907 | 3121 | enc = &sch->enc[enc_idx]; | |
908 | |||
909 | 3121 | ret = GROW_ARRAY(sq->enc_idx, sq->nb_enc_idx); | |
910 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3121 times.
|
3121 | if (ret < 0) |
911 | ✗ | return ret; | |
912 | 3121 | sq->enc_idx[sq->nb_enc_idx - 1] = enc_idx; | |
913 | |||
914 | 3121 | ret = sq_add_stream(sq->sq, limiting); | |
915 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3121 times.
|
3121 | if (ret < 0) |
916 | ✗ | return ret; | |
917 | |||
918 | 3121 | enc->sq_idx[0] = sq_idx; | |
919 | 3121 | enc->sq_idx[1] = ret; | |
920 | |||
921 |
2/2✓ Branch 0 taken 2942 times.
✓ Branch 1 taken 179 times.
|
3121 | if (max_frames != INT64_MAX) |
922 | 2942 | sq_limit_frames(sq->sq, enc->sq_idx[1], max_frames); | |
923 | |||
924 | 3121 | return 0; | |
925 | } | ||
926 | |||
927 | 29580 | int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst) | |
928 | { | ||
929 | int ret; | ||
930 | |||
931 |
4/5✓ Branch 0 taken 7413 times.
✓ Branch 1 taken 6812 times.
✓ Branch 2 taken 7658 times.
✓ Branch 3 taken 7697 times.
✗ Branch 4 not taken.
|
29580 | switch (src.type) { |
932 | 7413 | case SCH_NODE_TYPE_DEMUX: { | |
933 | SchDemuxStream *ds; | ||
934 | |||
935 |
2/4✓ Branch 0 taken 7413 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 7413 times.
|
7413 | av_assert0(src.idx < sch->nb_demux && |
936 | src.idx_stream < sch->demux[src.idx].nb_streams); | ||
937 | 7413 | ds = &sch->demux[src.idx].streams[src.idx_stream]; | |
938 | |||
939 | 7413 | ret = GROW_ARRAY(ds->dst, ds->nb_dst); | |
940 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7413 times.
|
7413 | if (ret < 0) |
941 | ✗ | return ret; | |
942 | |||
943 | 7413 | ds->dst[ds->nb_dst - 1] = dst; | |
944 | |||
945 | // demuxed packets go to decoding or streamcopy | ||
946 |
2/3✓ Branch 0 taken 6753 times.
✓ Branch 1 taken 660 times.
✗ Branch 2 not taken.
|
7413 | switch (dst.type) { |
947 | 6753 | case SCH_NODE_TYPE_DEC: { | |
948 | SchDec *dec; | ||
949 | |||
950 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6753 times.
|
6753 | av_assert0(dst.idx < sch->nb_dec); |
951 | 6753 | dec = &sch->dec[dst.idx]; | |
952 | |||
953 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6753 times.
|
6753 | av_assert0(!dec->src.type); |
954 | 6753 | dec->src = src; | |
955 | 6753 | break; | |
956 | } | ||
957 | 660 | case SCH_NODE_TYPE_MUX: { | |
958 | SchMuxStream *ms; | ||
959 | |||
960 |
2/4✓ Branch 0 taken 660 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 660 times.
|
660 | av_assert0(dst.idx < sch->nb_mux && |
961 | dst.idx_stream < sch->mux[dst.idx].nb_streams); | ||
962 | 660 | ms = &sch->mux[dst.idx].streams[dst.idx_stream]; | |
963 | |||
964 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 660 times.
|
660 | av_assert0(!ms->src.type); |
965 | 660 | ms->src = src; | |
966 | |||
967 | 660 | break; | |
968 | } | ||
969 | ✗ | default: av_assert0(0); | |
970 | } | ||
971 | |||
972 | 7413 | break; | |
973 | } | ||
974 | 6812 | case SCH_NODE_TYPE_DEC: { | |
975 | SchDec *dec; | ||
976 | SchDecOutput *o; | ||
977 | |||
978 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6812 times.
|
6812 | av_assert0(src.idx < sch->nb_dec); |
979 | 6812 | dec = &sch->dec[src.idx]; | |
980 | |||
981 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6812 times.
|
6812 | av_assert0(src.idx_stream < dec->nb_outputs); |
982 | 6812 | o = &dec->outputs[src.idx_stream]; | |
983 | |||
984 | 6812 | ret = GROW_ARRAY(o->dst, o->nb_dst); | |
985 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6812 times.
|
6812 | if (ret < 0) |
986 | ✗ | return ret; | |
987 | |||
988 | 6812 | o->dst[o->nb_dst - 1] = dst; | |
989 | |||
990 | // decoded frames go to filters or encoding | ||
991 |
2/3✓ Branch 0 taken 6774 times.
✓ Branch 1 taken 38 times.
✗ Branch 2 not taken.
|
6812 | switch (dst.type) { |
992 | 6774 | case SCH_NODE_TYPE_FILTER_IN: { | |
993 | SchFilterIn *fi; | ||
994 | |||
995 |
2/4✓ Branch 0 taken 6774 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6774 times.
|
6774 | av_assert0(dst.idx < sch->nb_filters && |
996 | dst.idx_stream < sch->filters[dst.idx].nb_inputs); | ||
997 | 6774 | fi = &sch->filters[dst.idx].inputs[dst.idx_stream]; | |
998 | |||
999 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6774 times.
|
6774 | av_assert0(!fi->src.type); |
1000 | 6774 | fi->src = src; | |
1001 | 6774 | break; | |
1002 | } | ||
1003 | 38 | case SCH_NODE_TYPE_ENC: { | |
1004 | SchEnc *enc; | ||
1005 | |||
1006 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 38 times.
|
38 | av_assert0(dst.idx < sch->nb_enc); |
1007 | 38 | enc = &sch->enc[dst.idx]; | |
1008 | |||
1009 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 38 times.
|
38 | av_assert0(!enc->src.type); |
1010 | 38 | enc->src = src; | |
1011 | 38 | break; | |
1012 | } | ||
1013 | ✗ | default: av_assert0(0); | |
1014 | } | ||
1015 | |||
1016 | 6812 | break; | |
1017 | } | ||
1018 | 7658 | case SCH_NODE_TYPE_FILTER_OUT: { | |
1019 | SchFilterOut *fo; | ||
1020 | |||
1021 |
2/4✓ Branch 0 taken 7658 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 7658 times.
|
7658 | av_assert0(src.idx < sch->nb_filters && |
1022 | src.idx_stream < sch->filters[src.idx].nb_outputs); | ||
1023 | 7658 | fo = &sch->filters[src.idx].outputs[src.idx_stream]; | |
1024 | |||
1025 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7658 times.
|
7658 | av_assert0(!fo->dst.type); |
1026 | 7658 | fo->dst = dst; | |
1027 | |||
1028 | // filtered frames go to encoding or another filtergraph | ||
1029 |
1/3✓ Branch 0 taken 7658 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
|
7658 | switch (dst.type) { |
1030 | 7658 | case SCH_NODE_TYPE_ENC: { | |
1031 | SchEnc *enc; | ||
1032 | |||
1033 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7658 times.
|
7658 | av_assert0(dst.idx < sch->nb_enc); |
1034 | 7658 | enc = &sch->enc[dst.idx]; | |
1035 | |||
1036 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7658 times.
|
7658 | av_assert0(!enc->src.type); |
1037 | 7658 | enc->src = src; | |
1038 | 7658 | break; | |
1039 | } | ||
1040 | ✗ | case SCH_NODE_TYPE_FILTER_IN: { | |
1041 | SchFilterIn *fi; | ||
1042 | |||
1043 | ✗ | av_assert0(dst.idx < sch->nb_filters && | |
1044 | dst.idx_stream < sch->filters[dst.idx].nb_inputs); | ||
1045 | ✗ | fi = &sch->filters[dst.idx].inputs[dst.idx_stream]; | |
1046 | |||
1047 | ✗ | av_assert0(!fi->src.type); | |
1048 | ✗ | fi->src = src; | |
1049 | ✗ | break; | |
1050 | } | ||
1051 | ✗ | default: av_assert0(0); | |
1052 | } | ||
1053 | |||
1054 | |||
1055 | 7658 | break; | |
1056 | } | ||
1057 | 7697 | case SCH_NODE_TYPE_ENC: { | |
1058 | SchEnc *enc; | ||
1059 | |||
1060 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7697 times.
|
7697 | av_assert0(src.idx < sch->nb_enc); |
1061 | 7697 | enc = &sch->enc[src.idx]; | |
1062 | |||
1063 | 7697 | ret = GROW_ARRAY(enc->dst, enc->nb_dst); | |
1064 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7697 times.
|
7697 | if (ret < 0) |
1065 | ✗ | return ret; | |
1066 | |||
1067 | 7697 | enc->dst[enc->nb_dst - 1] = dst; | |
1068 | |||
1069 | // encoding packets go to muxing or decoding | ||
1070 |
2/3✓ Branch 0 taken 7696 times.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
7697 | switch (dst.type) { |
1071 | 7696 | case SCH_NODE_TYPE_MUX: { | |
1072 | SchMuxStream *ms; | ||
1073 | |||
1074 |
2/4✓ Branch 0 taken 7696 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 7696 times.
|
7696 | av_assert0(dst.idx < sch->nb_mux && |
1075 | dst.idx_stream < sch->mux[dst.idx].nb_streams); | ||
1076 | 7696 | ms = &sch->mux[dst.idx].streams[dst.idx_stream]; | |
1077 | |||
1078 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7696 times.
|
7696 | av_assert0(!ms->src.type); |
1079 | 7696 | ms->src = src; | |
1080 | |||
1081 | 7696 | break; | |
1082 | } | ||
1083 | 1 | case SCH_NODE_TYPE_DEC: { | |
1084 | SchDec *dec; | ||
1085 | |||
1086 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | av_assert0(dst.idx < sch->nb_dec); |
1087 | 1 | dec = &sch->dec[dst.idx]; | |
1088 | |||
1089 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | av_assert0(!dec->src.type); |
1090 | 1 | dec->src = src; | |
1091 | |||
1092 | 1 | break; | |
1093 | } | ||
1094 | ✗ | default: av_assert0(0); | |
1095 | } | ||
1096 | |||
1097 | 7697 | break; | |
1098 | } | ||
1099 | ✗ | default: av_assert0(0); | |
1100 | } | ||
1101 | |||
1102 | 29580 | return 0; | |
1103 | } | ||
1104 | |||
1105 | 7916 | static int mux_task_start(SchMux *mux) | |
1106 | { | ||
1107 | 7916 | int ret = 0; | |
1108 | |||
1109 | 7916 | ret = task_start(&mux->task); | |
1110 |
1/2✓ Branch 0 taken 7916 times.
✗ Branch 1 not taken.
|
7916 | if (ret < 0) |
1111 | ✗ | return ret; | |
1112 | |||
1113 | /* flush the pre-muxing queues */ | ||
1114 | 3362 | while (1) { | |
1115 | 11278 | int min_stream = -1; | |
1116 | 11278 | Timestamp min_ts = { .ts = AV_NOPTS_VALUE }; | |
1117 | |||
1118 | AVPacket *pkt; | ||
1119 | |||
1120 | // find the stream with the earliest dts or EOF in pre-muxing queue | ||
1121 |
2/2✓ Branch 0 taken 21851 times.
✓ Branch 1 taken 11172 times.
|
33023 | for (unsigned i = 0; i < mux->nb_streams; i++) { |
1122 | 21851 | SchMuxStream *ms = &mux->streams[i]; | |
1123 | |||
1124 |
2/2✓ Branch 1 taken 16207 times.
✓ Branch 2 taken 5644 times.
|
21851 | if (av_fifo_peek(ms->pre_mux_queue.fifo, &pkt, 1, 0) < 0) |
1125 | 16207 | continue; | |
1126 | |||
1127 |
4/4✓ Branch 0 taken 5551 times.
✓ Branch 1 taken 93 times.
✓ Branch 2 taken 13 times.
✓ Branch 3 taken 5538 times.
|
5644 | if (!pkt || pkt->dts == AV_NOPTS_VALUE) { |
1128 | 106 | min_stream = i; | |
1129 | 106 | break; | |
1130 | } | ||
1131 | |||
1132 |
4/4✓ Branch 0 taken 2256 times.
✓ Branch 1 taken 3282 times.
✓ Branch 2 taken 38 times.
✓ Branch 3 taken 2218 times.
|
7794 | if (min_ts.ts == AV_NOPTS_VALUE || |
1133 | 2256 | av_compare_ts(min_ts.ts, min_ts.tb, pkt->dts, pkt->time_base) > 0) { | |
1134 | 3320 | min_stream = i; | |
1135 | 3320 | min_ts = (Timestamp){ .ts = pkt->dts, .tb = pkt->time_base }; | |
1136 | } | ||
1137 | } | ||
1138 | |||
1139 |
2/2✓ Branch 0 taken 3362 times.
✓ Branch 1 taken 7916 times.
|
11278 | if (min_stream >= 0) { |
1140 | 3362 | SchMuxStream *ms = &mux->streams[min_stream]; | |
1141 | |||
1142 | 3362 | ret = av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1); | |
1143 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3362 times.
|
3362 | av_assert0(ret >= 0); |
1144 | |||
1145 |
2/2✓ Branch 0 taken 3269 times.
✓ Branch 1 taken 93 times.
|
3362 | if (pkt) { |
1146 |
2/2✓ Branch 0 taken 3255 times.
✓ Branch 1 taken 14 times.
|
3269 | if (!ms->init_eof) |
1147 | 3255 | ret = tq_send(mux->queue, min_stream, pkt); | |
1148 | 3269 | av_packet_free(&pkt); | |
1149 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 3268 times.
|
3269 | if (ret == AVERROR_EOF) |
1150 | 1 | ms->init_eof = 1; | |
1151 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3268 times.
|
3268 | else if (ret < 0) |
1152 | ✗ | return ret; | |
1153 | } else | ||
1154 | 93 | tq_send_finish(mux->queue, min_stream); | |
1155 | |||
1156 | 3362 | continue; | |
1157 | } | ||
1158 | |||
1159 | 7916 | break; | |
1160 | } | ||
1161 | |||
1162 | 7916 | atomic_store(&mux->mux_started, 1); | |
1163 | |||
1164 | 7916 | return 0; | |
1165 | } | ||
1166 | |||
1167 | int print_sdp(const char *filename); | ||
1168 | |||
1169 | 7916 | static int mux_init(Scheduler *sch, SchMux *mux) | |
1170 | { | ||
1171 | int ret; | ||
1172 | |||
1173 | 7916 | ret = mux->init(mux->task.func_arg); | |
1174 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7916 times.
|
7916 | if (ret < 0) |
1175 | ✗ | return ret; | |
1176 | |||
1177 | 7916 | sch->nb_mux_ready++; | |
1178 | |||
1179 |
2/4✓ Branch 0 taken 7916 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 7916 times.
|
7916 | if (sch->sdp_filename || sch->sdp_auto) { |
1180 | ✗ | if (sch->nb_mux_ready < sch->nb_mux) | |
1181 | ✗ | return 0; | |
1182 | |||
1183 | ✗ | ret = print_sdp(sch->sdp_filename); | |
1184 | ✗ | if (ret < 0) { | |
1185 | ✗ | av_log(sch, AV_LOG_ERROR, "Error writing the SDP.\n"); | |
1186 | ✗ | return ret; | |
1187 | } | ||
1188 | |||
1189 | /* SDP is written only after all the muxers are ready, so now we | ||
1190 | * start ALL the threads */ | ||
1191 | ✗ | for (unsigned i = 0; i < sch->nb_mux; i++) { | |
1192 | ✗ | ret = mux_task_start(&sch->mux[i]); | |
1193 | ✗ | if (ret < 0) | |
1194 | ✗ | return ret; | |
1195 | } | ||
1196 | } else { | ||
1197 | 7916 | ret = mux_task_start(mux); | |
1198 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7916 times.
|
7916 | if (ret < 0) |
1199 | ✗ | return ret; | |
1200 | } | ||
1201 | |||
1202 | 7916 | return 0; | |
1203 | } | ||
1204 | |||
1205 | 8356 | void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, | |
1206 | size_t data_threshold, int max_packets) | ||
1207 | { | ||
1208 | SchMux *mux; | ||
1209 | SchMuxStream *ms; | ||
1210 | |||
1211 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 8356 times.
|
8356 | av_assert0(mux_idx < sch->nb_mux); |
1212 | 8356 | mux = &sch->mux[mux_idx]; | |
1213 | |||
1214 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 8356 times.
|
8356 | av_assert0(stream_idx < mux->nb_streams); |
1215 | 8356 | ms = &mux->streams[stream_idx]; | |
1216 | |||
1217 | 8356 | ms->pre_mux_queue.max_packets = max_packets; | |
1218 | 8356 | ms->pre_mux_queue.data_threshold = data_threshold; | |
1219 | 8356 | } | |
1220 | |||
1221 | 8356 | int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx) | |
1222 | { | ||
1223 | SchMux *mux; | ||
1224 | 8356 | int ret = 0; | |
1225 | |||
1226 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 8356 times.
|
8356 | av_assert0(mux_idx < sch->nb_mux); |
1227 | 8356 | mux = &sch->mux[mux_idx]; | |
1228 | |||
1229 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 8356 times.
|
8356 | av_assert0(stream_idx < mux->nb_streams); |
1230 | |||
1231 | 8356 | pthread_mutex_lock(&sch->mux_ready_lock); | |
1232 | |||
1233 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 8356 times.
|
8356 | av_assert0(mux->nb_streams_ready < mux->nb_streams); |
1234 | |||
1235 | // this may be called during initialization - do not start | ||
1236 | // threads before sch_start() is called | ||
1237 |
2/2✓ Branch 0 taken 7916 times.
✓ Branch 1 taken 440 times.
|
8356 | if (++mux->nb_streams_ready == mux->nb_streams && |
1238 |
2/2✓ Branch 0 taken 7462 times.
✓ Branch 1 taken 454 times.
|
7916 | sch->state >= SCH_STATE_STARTED) |
1239 | 7462 | ret = mux_init(sch, mux); | |
1240 | |||
1241 | 8356 | pthread_mutex_unlock(&sch->mux_ready_lock); | |
1242 | |||
1243 | 8356 | return ret; | |
1244 | } | ||
1245 | |||
1246 | 1 | int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, | |
1247 | unsigned dec_idx) | ||
1248 | { | ||
1249 | SchMux *mux; | ||
1250 | SchMuxStream *ms; | ||
1251 | 1 | int ret = 0; | |
1252 | |||
1253 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | av_assert0(mux_idx < sch->nb_mux); |
1254 | 1 | mux = &sch->mux[mux_idx]; | |
1255 | |||
1256 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | av_assert0(stream_idx < mux->nb_streams); |
1257 | 1 | ms = &mux->streams[stream_idx]; | |
1258 | |||
1259 | 1 | ret = GROW_ARRAY(ms->sub_heartbeat_dst, ms->nb_sub_heartbeat_dst); | |
1260 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (ret < 0) |
1261 | ✗ | return ret; | |
1262 | |||
1263 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | av_assert0(dec_idx < sch->nb_dec); |
1264 | 1 | ms->sub_heartbeat_dst[ms->nb_sub_heartbeat_dst - 1] = dec_idx; | |
1265 | |||
1266 |
1/2✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
|
1 | if (!mux->sub_heartbeat_pkt) { |
1267 | 1 | mux->sub_heartbeat_pkt = av_packet_alloc(); | |
1268 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (!mux->sub_heartbeat_pkt) |
1269 | ✗ | return AVERROR(ENOMEM); | |
1270 | } | ||
1271 | |||
1272 | 1 | return 0; | |
1273 | } | ||
1274 | |||
1275 | 518655 | static void unchoke_for_stream(Scheduler *sch, SchedulerNode src) | |
1276 | { | ||
1277 | 427274 | while (1) { | |
1278 | SchFilterGraph *fg; | ||
1279 | |||
1280 | // fed directly by a demuxer (i.e. not through a filtergraph) | ||
1281 |
2/2✓ Branch 0 taken 494071 times.
✓ Branch 1 taken 451858 times.
|
945929 | if (src.type == SCH_NODE_TYPE_DEMUX) { |
1282 | 494071 | sch->demux[src.idx].waiter.choked_next = 0; | |
1283 | 494071 | return; | |
1284 | } | ||
1285 | |||
1286 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 451858 times.
|
451858 | av_assert0(src.type == SCH_NODE_TYPE_FILTER_OUT); |
1287 | 451858 | fg = &sch->filters[src.idx]; | |
1288 | |||
1289 | // the filtergraph contains internal sources and | ||
1290 | // requested to be scheduled directly | ||
1291 |
2/2✓ Branch 0 taken 24584 times.
✓ Branch 1 taken 427274 times.
|
451858 | if (fg->best_input == fg->nb_inputs) { |
1292 | 24584 | fg->waiter.choked_next = 0; | |
1293 | 24584 | return; | |
1294 | } | ||
1295 | |||
1296 | 427274 | src = fg->inputs[fg->best_input].src_sched; | |
1297 | } | ||
1298 | } | ||
1299 | |||
1300 | 531566 | static void schedule_update_locked(Scheduler *sch) | |
1301 | { | ||
1302 | int64_t dts; | ||
1303 | 531566 | int have_unchoked = 0; | |
1304 | |||
1305 | // on termination request all waiters are choked, | ||
1306 | // we are not to unchoke them | ||
1307 |
2/2✓ Branch 0 taken 4128 times.
✓ Branch 1 taken 527438 times.
|
531566 | if (atomic_load(&sch->terminate)) |
1308 | 4128 | return; | |
1309 | |||
1310 | 527438 | dts = trailing_dts(sch, 0); | |
1311 | |||
1312 | 527438 | atomic_store(&sch->last_dts, dts); | |
1313 | |||
1314 | // initialize our internal state | ||
1315 |
2/2✓ Branch 0 taken 1054876 times.
✓ Branch 1 taken 527438 times.
|
1582314 | for (unsigned type = 0; type < 2; type++) |
1316 |
4/4✓ Branch 0 taken 1004805 times.
✓ Branch 1 taken 1037045 times.
✓ Branch 2 taken 986974 times.
✓ Branch 3 taken 1054876 times.
|
2041850 | for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) { |
1317 |
2/2✓ Branch 0 taken 477367 times.
✓ Branch 1 taken 509607 times.
|
986974 | SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter; |
1318 | 986974 | w->choked_prev = atomic_load(&w->choked); | |
1319 | 986974 | w->choked_next = 1; | |
1320 | } | ||
1321 | |||
1322 | // figure out the sources that are allowed to proceed | ||
1323 |
2/2✓ Branch 0 taken 527705 times.
✓ Branch 1 taken 527438 times.
|
1055143 | for (unsigned i = 0; i < sch->nb_mux; i++) { |
1324 | 527705 | SchMux *mux = &sch->mux[i]; | |
1325 | |||
1326 |
2/2✓ Branch 0 taken 582523 times.
✓ Branch 1 taken 527705 times.
|
1110228 | for (unsigned j = 0; j < mux->nb_streams; j++) { |
1327 | 582523 | SchMuxStream *ms = &mux->streams[j]; | |
1328 | |||
1329 | // unblock sources for output streams that are not finished | ||
1330 | // and not too far ahead of the trailing stream | ||
1331 |
2/2✓ Branch 0 taken 36686 times.
✓ Branch 1 taken 545837 times.
|
582523 | if (ms->source_finished) |
1332 | 36686 | continue; | |
1333 |
4/4✓ Branch 0 taken 29389 times.
✓ Branch 1 taken 516448 times.
✓ Branch 2 taken 9592 times.
✓ Branch 3 taken 19797 times.
|
545837 | if (dts == AV_NOPTS_VALUE && ms->last_dts != AV_NOPTS_VALUE) |
1334 | 9592 | continue; | |
1335 |
4/4✓ Branch 0 taken 516448 times.
✓ Branch 1 taken 19797 times.
✓ Branch 2 taken 17590 times.
✓ Branch 3 taken 498858 times.
|
536245 | if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE) |
1336 | 17590 | continue; | |
1337 | |||
1338 | // resolve the source to unchoke | ||
1339 | 518655 | unchoke_for_stream(sch, ms->src_sched); | |
1340 | 518655 | have_unchoked = 1; | |
1341 | } | ||
1342 | } | ||
1343 | |||
1344 | // make sure to unchoke at least one source, if still available | ||
1345 |
4/4✓ Branch 0 taken 61757 times.
✓ Branch 1 taken 513553 times.
✓ Branch 2 taken 47872 times.
✓ Branch 3 taken 13885 times.
|
575310 | for (unsigned type = 0; !have_unchoked && type < 2; type++) |
1346 |
4/4✓ Branch 0 taken 32987 times.
✓ Branch 1 taken 46728 times.
✓ Branch 2 taken 46116 times.
✓ Branch 3 taken 33599 times.
|
79715 | for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) { |
1347 |
2/2✓ Branch 0 taken 19102 times.
✓ Branch 1 taken 27014 times.
|
46116 | int exited = type ? sch->filters[i].task_exited : sch->demux[i].task_exited; |
1348 |
2/2✓ Branch 0 taken 19102 times.
✓ Branch 1 taken 27014 times.
|
46116 | SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter; |
1349 |
2/2✓ Branch 0 taken 14273 times.
✓ Branch 1 taken 31843 times.
|
46116 | if (!exited) { |
1350 | 14273 | w->choked_next = 0; | |
1351 | 14273 | have_unchoked = 1; | |
1352 | 14273 | break; | |
1353 | } | ||
1354 | } | ||
1355 | |||
1356 | |||
1357 |
2/2✓ Branch 0 taken 1054876 times.
✓ Branch 1 taken 527438 times.
|
1582314 | for (unsigned type = 0; type < 2; type++) |
1358 |
4/4✓ Branch 0 taken 1004805 times.
✓ Branch 1 taken 1037045 times.
✓ Branch 2 taken 986974 times.
✓ Branch 3 taken 1054876 times.
|
2041850 | for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) { |
1359 |
2/2✓ Branch 0 taken 477367 times.
✓ Branch 1 taken 509607 times.
|
986974 | SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter; |
1360 |
2/2✓ Branch 0 taken 18193 times.
✓ Branch 1 taken 968781 times.
|
986974 | if (w->choked_prev != w->choked_next) |
1361 | 18193 | waiter_set(w, w->choked_next); | |
1362 | } | ||
1363 | |||
1364 | } | ||
1365 | |||
1366 | enum { | ||
1367 | CYCLE_NODE_NEW = 0, | ||
1368 | CYCLE_NODE_STARTED, | ||
1369 | CYCLE_NODE_DONE, | ||
1370 | }; | ||
1371 | |||
1372 | static int | ||
1373 | 7537 | check_acyclic_for_output(const Scheduler *sch, SchedulerNode src, | |
1374 | uint8_t *filters_visited, SchedulerNode *filters_stack) | ||
1375 | { | ||
1376 | 7537 | unsigned nb_filters_stack = 0; | |
1377 | |||
1378 | 7537 | memset(filters_visited, 0, sch->nb_filters * sizeof(*filters_visited)); | |
1379 | |||
1380 | 6776 | while (1) { | |
1381 | 14313 | const SchFilterGraph *fg = &sch->filters[src.idx]; | |
1382 | |||
1383 | 14313 | filters_visited[src.idx] = CYCLE_NODE_STARTED; | |
1384 | |||
1385 | // descend into every input, depth first | ||
1386 |
2/2✓ Branch 0 taken 6775 times.
✓ Branch 1 taken 7538 times.
|
14313 | if (src.idx_stream < fg->nb_inputs) { |
1387 | 6775 | const SchFilterIn *fi = &fg->inputs[src.idx_stream++]; | |
1388 | |||
1389 | // connected to demuxer, no cycles possible | ||
1390 |
2/2✓ Branch 0 taken 6774 times.
✓ Branch 1 taken 1 times.
|
6775 | if (fi->src_sched.type == SCH_NODE_TYPE_DEMUX) |
1391 | 6775 | continue; | |
1392 | |||
1393 | // otherwise connected to another filtergraph | ||
1394 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | av_assert0(fi->src_sched.type == SCH_NODE_TYPE_FILTER_OUT); |
1395 | |||
1396 | // found a cycle | ||
1397 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (filters_visited[fi->src_sched.idx] == CYCLE_NODE_STARTED) |
1398 | ✗ | return AVERROR(EINVAL); | |
1399 | |||
1400 | // place current position on stack and descend | ||
1401 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | av_assert0(nb_filters_stack < sch->nb_filters); |
1402 | 1 | filters_stack[nb_filters_stack++] = src; | |
1403 | 1 | src = (SchedulerNode){ .idx = fi->src_sched.idx, .idx_stream = 0 }; | |
1404 | 1 | continue; | |
1405 | } | ||
1406 | |||
1407 | 7538 | filters_visited[src.idx] = CYCLE_NODE_DONE; | |
1408 | |||
1409 | // previous search finished, | ||
1410 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 7537 times.
|
7538 | if (nb_filters_stack) { |
1411 | 1 | src = filters_stack[--nb_filters_stack]; | |
1412 | 1 | continue; | |
1413 | } | ||
1414 | 7537 | return 0; | |
1415 | } | ||
1416 | } | ||
1417 | |||
1418 | 7914 | static int check_acyclic(Scheduler *sch) | |
1419 | { | ||
1420 | 7914 | uint8_t *filters_visited = NULL; | |
1421 | 7914 | SchedulerNode *filters_stack = NULL; | |
1422 | |||
1423 | 7914 | int ret = 0; | |
1424 | |||
1425 |
2/2✓ Branch 0 taken 489 times.
✓ Branch 1 taken 7425 times.
|
7914 | if (!sch->nb_filters) |
1426 | 489 | return 0; | |
1427 | |||
1428 | 7425 | filters_visited = av_malloc_array(sch->nb_filters, sizeof(*filters_visited)); | |
1429 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7425 times.
|
7425 | if (!filters_visited) |
1430 | ✗ | return AVERROR(ENOMEM); | |
1431 | |||
1432 | 7425 | filters_stack = av_malloc_array(sch->nb_filters, sizeof(*filters_stack)); | |
1433 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7425 times.
|
7425 | if (!filters_stack) { |
1434 | ✗ | ret = AVERROR(ENOMEM); | |
1435 | ✗ | goto fail; | |
1436 | } | ||
1437 | |||
1438 | // trace the transcoding graph upstream from every filtegraph | ||
1439 |
2/2✓ Branch 0 taken 7537 times.
✓ Branch 1 taken 7425 times.
|
14962 | for (unsigned i = 0; i < sch->nb_filters; i++) { |
1440 | 7537 | ret = check_acyclic_for_output(sch, (SchedulerNode){ .idx = i }, | |
1441 | filters_visited, filters_stack); | ||
1442 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7537 times.
|
7537 | if (ret < 0) { |
1443 | ✗ | av_log(&sch->filters[i], AV_LOG_ERROR, "Transcoding graph has a cycle\n"); | |
1444 | ✗ | goto fail; | |
1445 | } | ||
1446 | } | ||
1447 | |||
1448 | 7425 | fail: | |
1449 | 7425 | av_freep(&filters_visited); | |
1450 | 7425 | av_freep(&filters_stack); | |
1451 | 7425 | return ret; | |
1452 | } | ||
1453 | |||
1454 | 7914 | static int start_prepare(Scheduler *sch) | |
1455 | { | ||
1456 | int ret; | ||
1457 | |||
1458 |
2/2✓ Branch 0 taken 7164 times.
✓ Branch 1 taken 7914 times.
|
15078 | for (unsigned i = 0; i < sch->nb_demux; i++) { |
1459 | 7164 | SchDemux *d = &sch->demux[i]; | |
1460 | |||
1461 |
2/2✓ Branch 0 taken 7390 times.
✓ Branch 1 taken 7164 times.
|
14554 | for (unsigned j = 0; j < d->nb_streams; j++) { |
1462 | 7390 | SchDemuxStream *ds = &d->streams[j]; | |
1463 | |||
1464 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7390 times.
|
7390 | if (!ds->nb_dst) { |
1465 | ✗ | av_log(d, AV_LOG_ERROR, | |
1466 | "Demuxer stream %u not connected to any sink\n", j); | ||
1467 | ✗ | return AVERROR(EINVAL); | |
1468 | } | ||
1469 | |||
1470 | 7390 | ds->dst_finished = av_calloc(ds->nb_dst, sizeof(*ds->dst_finished)); | |
1471 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7390 times.
|
7390 | if (!ds->dst_finished) |
1472 | ✗ | return AVERROR(ENOMEM); | |
1473 | } | ||
1474 | } | ||
1475 | |||
1476 |
2/2✓ Branch 0 taken 6754 times.
✓ Branch 1 taken 7914 times.
|
14668 | for (unsigned i = 0; i < sch->nb_dec; i++) { |
1477 | 6754 | SchDec *dec = &sch->dec[i]; | |
1478 | |||
1479 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6754 times.
|
6754 | if (!dec->src.type) { |
1480 | ✗ | av_log(dec, AV_LOG_ERROR, | |
1481 | "Decoder not connected to a source\n"); | ||
1482 | ✗ | return AVERROR(EINVAL); | |
1483 | } | ||
1484 | |||
1485 |
2/2✓ Branch 0 taken 6760 times.
✓ Branch 1 taken 6754 times.
|
13514 | for (unsigned j = 0; j < dec->nb_outputs; j++) { |
1486 | 6760 | SchDecOutput *o = &dec->outputs[j]; | |
1487 | |||
1488 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6760 times.
|
6760 | if (!o->nb_dst) { |
1489 | ✗ | av_log(dec, AV_LOG_ERROR, | |
1490 | "Decoder output %u not connected to any sink\n", j); | ||
1491 | ✗ | return AVERROR(EINVAL); | |
1492 | } | ||
1493 | |||
1494 | 6760 | o->dst_finished = av_calloc(o->nb_dst, sizeof(*o->dst_finished)); | |
1495 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6760 times.
|
6760 | if (!o->dst_finished) |
1496 | ✗ | return AVERROR(ENOMEM); | |
1497 | } | ||
1498 | } | ||
1499 | |||
1500 |
2/2✓ Branch 0 taken 7696 times.
✓ Branch 1 taken 7914 times.
|
15610 | for (unsigned i = 0; i < sch->nb_enc; i++) { |
1501 | 7696 | SchEnc *enc = &sch->enc[i]; | |
1502 | |||
1503 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7696 times.
|
7696 | if (!enc->src.type) { |
1504 | ✗ | av_log(enc, AV_LOG_ERROR, | |
1505 | "Encoder not connected to a source\n"); | ||
1506 | ✗ | return AVERROR(EINVAL); | |
1507 | } | ||
1508 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7696 times.
|
7696 | if (!enc->nb_dst) { |
1509 | ✗ | av_log(enc, AV_LOG_ERROR, | |
1510 | "Encoder not connected to any sink\n"); | ||
1511 | ✗ | return AVERROR(EINVAL); | |
1512 | } | ||
1513 | |||
1514 | 7696 | enc->dst_finished = av_calloc(enc->nb_dst, sizeof(*enc->dst_finished)); | |
1515 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7696 times.
|
7696 | if (!enc->dst_finished) |
1516 | ✗ | return AVERROR(ENOMEM); | |
1517 | } | ||
1518 | |||
1519 |
2/2✓ Branch 0 taken 7916 times.
✓ Branch 1 taken 7914 times.
|
15830 | for (unsigned i = 0; i < sch->nb_mux; i++) { |
1520 | 7916 | SchMux *mux = &sch->mux[i]; | |
1521 | |||
1522 |
2/2✓ Branch 0 taken 8356 times.
✓ Branch 1 taken 7916 times.
|
16272 | for (unsigned j = 0; j < mux->nb_streams; j++) { |
1523 | 8356 | SchMuxStream *ms = &mux->streams[j]; | |
1524 | |||
1525 |
2/3✓ Branch 0 taken 7696 times.
✓ Branch 1 taken 660 times.
✗ Branch 2 not taken.
|
8356 | switch (ms->src.type) { |
1526 | 7696 | case SCH_NODE_TYPE_ENC: { | |
1527 | 7696 | SchEnc *enc = &sch->enc[ms->src.idx]; | |
1528 |
2/2✓ Branch 0 taken 38 times.
✓ Branch 1 taken 7658 times.
|
7696 | if (enc->src.type == SCH_NODE_TYPE_DEC) { |
1529 | 38 | ms->src_sched = sch->dec[enc->src.idx].src; | |
1530 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 38 times.
|
38 | av_assert0(ms->src_sched.type == SCH_NODE_TYPE_DEMUX); |
1531 | } else { | ||
1532 | 7658 | ms->src_sched = enc->src; | |
1533 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7658 times.
|
7658 | av_assert0(ms->src_sched.type == SCH_NODE_TYPE_FILTER_OUT); |
1534 | } | ||
1535 | 7696 | break; | |
1536 | } | ||
1537 | 660 | case SCH_NODE_TYPE_DEMUX: | |
1538 | 660 | ms->src_sched = ms->src; | |
1539 | 660 | break; | |
1540 | ✗ | default: | |
1541 | ✗ | av_log(mux, AV_LOG_ERROR, | |
1542 | "Muxer stream #%u not connected to a source\n", j); | ||
1543 | ✗ | return AVERROR(EINVAL); | |
1544 | } | ||
1545 | } | ||
1546 | |||
1547 | 7916 | ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size, | |
1548 | QUEUE_PACKETS); | ||
1549 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7916 times.
|
7916 | if (ret < 0) |
1550 | ✗ | return ret; | |
1551 | } | ||
1552 | |||
1553 |
2/2✓ Branch 0 taken 7537 times.
✓ Branch 1 taken 7914 times.
|
15451 | for (unsigned i = 0; i < sch->nb_filters; i++) { |
1554 | 7537 | SchFilterGraph *fg = &sch->filters[i]; | |
1555 | |||
1556 |
2/2✓ Branch 0 taken 6774 times.
✓ Branch 1 taken 7537 times.
|
14311 | for (unsigned j = 0; j < fg->nb_inputs; j++) { |
1557 | 6774 | SchFilterIn *fi = &fg->inputs[j]; | |
1558 | SchDec *dec; | ||
1559 | |||
1560 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6774 times.
|
6774 | if (!fi->src.type) { |
1561 | ✗ | av_log(fg, AV_LOG_ERROR, | |
1562 | "Filtergraph input %u not connected to a source\n", j); | ||
1563 | ✗ | return AVERROR(EINVAL); | |
1564 | } | ||
1565 | |||
1566 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6774 times.
|
6774 | if (fi->src.type == SCH_NODE_TYPE_FILTER_OUT) |
1567 | ✗ | fi->src_sched = fi->src; | |
1568 | else { | ||
1569 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6774 times.
|
6774 | av_assert0(fi->src.type == SCH_NODE_TYPE_DEC); |
1570 | 6774 | dec = &sch->dec[fi->src.idx]; | |
1571 | |||
1572 |
2/3✓ Branch 0 taken 6773 times.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
6774 | switch (dec->src.type) { |
1573 | 6773 | case SCH_NODE_TYPE_DEMUX: fi->src_sched = dec->src; break; | |
1574 | 1 | case SCH_NODE_TYPE_ENC: fi->src_sched = sch->enc[dec->src.idx].src; break; | |
1575 | ✗ | default: av_assert0(0); | |
1576 | } | ||
1577 | } | ||
1578 | } | ||
1579 | |||
1580 |
2/2✓ Branch 0 taken 7658 times.
✓ Branch 1 taken 7537 times.
|
15195 | for (unsigned j = 0; j < fg->nb_outputs; j++) { |
1581 | 7658 | SchFilterOut *fo = &fg->outputs[j]; | |
1582 | |||
1583 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7658 times.
|
7658 | if (!fo->dst.type) { |
1584 | ✗ | av_log(fg, AV_LOG_ERROR, | |
1585 | "Filtergraph %u output %u not connected to a sink\n", i, j); | ||
1586 | ✗ | return AVERROR(EINVAL); | |
1587 | } | ||
1588 | } | ||
1589 | } | ||
1590 | |||
1591 | // Check that the transcoding graph has no cycles. | ||
1592 | 7914 | ret = check_acyclic(sch); | |
1593 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7914 times.
|
7914 | if (ret < 0) |
1594 | ✗ | return ret; | |
1595 | |||
1596 | 7914 | return 0; | |
1597 | } | ||
1598 | |||
1599 | 7914 | int sch_start(Scheduler *sch) | |
1600 | { | ||
1601 | int ret; | ||
1602 | |||
1603 | 7914 | ret = start_prepare(sch); | |
1604 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7914 times.
|
7914 | if (ret < 0) |
1605 | ✗ | return ret; | |
1606 | |||
1607 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7914 times.
|
7914 | av_assert0(sch->state == SCH_STATE_UNINIT); |
1608 | 7914 | sch->state = SCH_STATE_STARTED; | |
1609 | |||
1610 |
2/2✓ Branch 0 taken 7916 times.
✓ Branch 1 taken 7914 times.
|
15830 | for (unsigned i = 0; i < sch->nb_mux; i++) { |
1611 | 7916 | SchMux *mux = &sch->mux[i]; | |
1612 | |||
1613 |
2/2✓ Branch 0 taken 454 times.
✓ Branch 1 taken 7462 times.
|
7916 | if (mux->nb_streams_ready == mux->nb_streams) { |
1614 | 454 | ret = mux_init(sch, mux); | |
1615 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 454 times.
|
454 | if (ret < 0) |
1616 | ✗ | goto fail; | |
1617 | } | ||
1618 | } | ||
1619 | |||
1620 |
2/2✓ Branch 0 taken 7696 times.
✓ Branch 1 taken 7914 times.
|
15610 | for (unsigned i = 0; i < sch->nb_enc; i++) { |
1621 | 7696 | SchEnc *enc = &sch->enc[i]; | |
1622 | |||
1623 | 7696 | ret = task_start(&enc->task); | |
1624 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7696 times.
|
7696 | if (ret < 0) |
1625 | ✗ | goto fail; | |
1626 | } | ||
1627 | |||
1628 |
2/2✓ Branch 0 taken 7537 times.
✓ Branch 1 taken 7914 times.
|
15451 | for (unsigned i = 0; i < sch->nb_filters; i++) { |
1629 | 7537 | SchFilterGraph *fg = &sch->filters[i]; | |
1630 | |||
1631 | 7537 | ret = task_start(&fg->task); | |
1632 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7537 times.
|
7537 | if (ret < 0) |
1633 | ✗ | goto fail; | |
1634 | } | ||
1635 | |||
1636 |
2/2✓ Branch 0 taken 6754 times.
✓ Branch 1 taken 7914 times.
|
14668 | for (unsigned i = 0; i < sch->nb_dec; i++) { |
1637 | 6754 | SchDec *dec = &sch->dec[i]; | |
1638 | |||
1639 | 6754 | ret = task_start(&dec->task); | |
1640 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6754 times.
|
6754 | if (ret < 0) |
1641 | ✗ | goto fail; | |
1642 | } | ||
1643 | |||
1644 |
2/2✓ Branch 0 taken 7164 times.
✓ Branch 1 taken 7914 times.
|
15078 | for (unsigned i = 0; i < sch->nb_demux; i++) { |
1645 | 7164 | SchDemux *d = &sch->demux[i]; | |
1646 | |||
1647 |
2/2✓ Branch 0 taken 14 times.
✓ Branch 1 taken 7150 times.
|
7164 | if (!d->nb_streams) |
1648 | 14 | continue; | |
1649 | |||
1650 | 7150 | ret = task_start(&d->task); | |
1651 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7150 times.
|
7150 | if (ret < 0) |
1652 | ✗ | goto fail; | |
1653 | } | ||
1654 | |||
1655 | 7914 | pthread_mutex_lock(&sch->schedule_lock); | |
1656 | 7914 | schedule_update_locked(sch); | |
1657 | 7914 | pthread_mutex_unlock(&sch->schedule_lock); | |
1658 | |||
1659 | 7914 | return 0; | |
1660 | ✗ | fail: | |
1661 | ✗ | sch_stop(sch, NULL); | |
1662 | ✗ | return ret; | |
1663 | } | ||
1664 | |||
1665 | 24293 | int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts) | |
1666 | { | ||
1667 | int ret, err; | ||
1668 | |||
1669 | // convert delay to absolute timestamp | ||
1670 | 24293 | timeout_us += av_gettime(); | |
1671 | |||
1672 | 24293 | pthread_mutex_lock(&sch->mux_done_lock); | |
1673 | |||
1674 |
1/2✓ Branch 0 taken 24293 times.
✗ Branch 1 not taken.
|
24293 | if (sch->nb_mux_done < sch->nb_mux) { |
1675 | 24293 | struct timespec tv = { .tv_sec = timeout_us / 1000000, | |
1676 | 24293 | .tv_nsec = (timeout_us % 1000000) * 1000 }; | |
1677 | 24293 | pthread_cond_timedwait(&sch->mux_done_cond, &sch->mux_done_lock, &tv); | |
1678 | } | ||
1679 | |||
1680 | 24293 | ret = sch->nb_mux_done == sch->nb_mux; | |
1681 | |||
1682 | 24293 | pthread_mutex_unlock(&sch->mux_done_lock); | |
1683 | |||
1684 | 24293 | *transcode_ts = atomic_load(&sch->last_dts); | |
1685 | |||
1686 | // abort transcoding if any task failed | ||
1687 | 24293 | err = atomic_load(&sch->task_failed); | |
1688 | |||
1689 |
3/4✓ Branch 0 taken 16379 times.
✓ Branch 1 taken 7914 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 16379 times.
|
24293 | return ret || err; |
1690 | } | ||
1691 | |||
1692 | 7658 | static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame) | |
1693 | { | ||
1694 | int ret; | ||
1695 | |||
1696 | 7658 | ret = enc->open_cb(enc->task.func_arg, frame); | |
1697 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7658 times.
|
7658 | if (ret < 0) |
1698 | ✗ | return ret; | |
1699 | |||
1700 | // ret>0 signals audio frame size, which means sync queue must | ||
1701 | // have been enabled during encoder creation | ||
1702 |
2/2✓ Branch 0 taken 163 times.
✓ Branch 1 taken 7495 times.
|
7658 | if (ret > 0) { |
1703 | SchSyncQueue *sq; | ||
1704 | |||
1705 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 163 times.
|
163 | av_assert0(enc->sq_idx[0] >= 0); |
1706 | 163 | sq = &sch->sq_enc[enc->sq_idx[0]]; | |
1707 | |||
1708 | 163 | pthread_mutex_lock(&sq->lock); | |
1709 | |||
1710 | 163 | sq_frame_samples(sq->sq, enc->sq_idx[1], ret); | |
1711 | |||
1712 | 163 | pthread_mutex_unlock(&sq->lock); | |
1713 | } | ||
1714 | |||
1715 | 7658 | return 0; | |
1716 | } | ||
1717 | |||
1718 | 451548 | static int send_to_enc_thread(Scheduler *sch, SchEnc *enc, AVFrame *frame) | |
1719 | { | ||
1720 | int ret; | ||
1721 | |||
1722 |
2/2✓ Branch 0 taken 15527 times.
✓ Branch 1 taken 436021 times.
|
451548 | if (!frame) { |
1723 | 15527 | tq_send_finish(enc->queue, 0); | |
1724 | 15527 | return 0; | |
1725 | } | ||
1726 | |||
1727 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 436021 times.
|
436021 | if (enc->in_finished) |
1728 | ✗ | return AVERROR_EOF; | |
1729 | |||
1730 | 436021 | ret = tq_send(enc->queue, 0, frame); | |
1731 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 436020 times.
|
436021 | if (ret < 0) |
1732 | 1 | enc->in_finished = 1; | |
1733 | |||
1734 | 436021 | return ret; | |
1735 | } | ||
1736 | |||
1737 | 35511 | static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame) | |
1738 | { | ||
1739 | 35511 | SchSyncQueue *sq = &sch->sq_enc[enc->sq_idx[0]]; | |
1740 | 35511 | int ret = 0; | |
1741 | |||
1742 | // inform the scheduling code that no more input will arrive along this path; | ||
1743 | // this is necessary because the sync queue may not send an EOF downstream | ||
1744 | // until other streams finish | ||
1745 | // TODO: consider a cleaner way of passing this information through | ||
1746 | // the pipeline | ||
1747 |
2/2✓ Branch 0 taken 3301 times.
✓ Branch 1 taken 32210 times.
|
35511 | if (!frame) { |
1748 |
2/2✓ Branch 0 taken 3301 times.
✓ Branch 1 taken 3301 times.
|
6602 | for (unsigned i = 0; i < enc->nb_dst; i++) { |
1749 | SchMux *mux; | ||
1750 | SchMuxStream *ms; | ||
1751 | |||
1752 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3301 times.
|
3301 | if (enc->dst[i].type != SCH_NODE_TYPE_MUX) |
1753 | ✗ | continue; | |
1754 | |||
1755 | 3301 | mux = &sch->mux[enc->dst[i].idx]; | |
1756 | 3301 | ms = &mux->streams[enc->dst[i].idx_stream]; | |
1757 | |||
1758 | 3301 | pthread_mutex_lock(&sch->schedule_lock); | |
1759 | |||
1760 | 3301 | ms->source_finished = 1; | |
1761 | 3301 | schedule_update_locked(sch); | |
1762 | |||
1763 | 3301 | pthread_mutex_unlock(&sch->schedule_lock); | |
1764 | } | ||
1765 | } | ||
1766 | |||
1767 | 35511 | pthread_mutex_lock(&sq->lock); | |
1768 | |||
1769 | 35511 | ret = sq_send(sq->sq, enc->sq_idx[1], SQFRAME(frame)); | |
1770 |
1/2✓ Branch 0 taken 35511 times.
✗ Branch 1 not taken.
|
35511 | if (ret < 0) |
1771 | ✗ | goto finish; | |
1772 | |||
1773 | 83054 | while (1) { | |
1774 | SchEnc *enc; | ||
1775 | |||
1776 | // TODO: the SQ API should be extended to allow returning EOF | ||
1777 | // for individual streams | ||
1778 | 118565 | ret = sq_receive(sq->sq, -1, SQFRAME(sq->frame)); | |
1779 |
2/2✓ Branch 0 taken 35511 times.
✓ Branch 1 taken 83054 times.
|
118565 | if (ret < 0) { |
1780 |
2/2✓ Branch 0 taken 6164 times.
✓ Branch 1 taken 29347 times.
|
35511 | ret = (ret == AVERROR(EAGAIN)) ? 0 : ret; |
1781 | 35511 | break; | |
1782 | } | ||
1783 | |||
1784 | 83054 | enc = &sch->enc[sq->enc_idx[ret]]; | |
1785 | 83054 | ret = send_to_enc_thread(sch, enc, sq->frame); | |
1786 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 83054 times.
|
83054 | if (ret < 0) { |
1787 | ✗ | av_frame_unref(sq->frame); | |
1788 | ✗ | if (ret != AVERROR_EOF) | |
1789 | ✗ | break; | |
1790 | |||
1791 | ✗ | sq_send(sq->sq, enc->sq_idx[1], SQFRAME(NULL)); | |
1792 | ✗ | continue; | |
1793 | } | ||
1794 | } | ||
1795 | |||
1796 |
2/2✓ Branch 0 taken 29347 times.
✓ Branch 1 taken 6164 times.
|
35511 | if (ret < 0) { |
1797 | // close all encoders fed from this sync queue | ||
1798 |
2/2✓ Branch 0 taken 6416 times.
✓ Branch 1 taken 6164 times.
|
12580 | for (unsigned i = 0; i < sq->nb_enc_idx; i++) { |
1799 | 6416 | int err = send_to_enc_thread(sch, &sch->enc[sq->enc_idx[i]], NULL); | |
1800 | |||
1801 | // if the sync queue error is EOF and closing the encoder | ||
1802 | // produces a more serious error, make sure to pick the latter | ||
1803 |
2/4✓ Branch 0 taken 6416 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 6416 times.
✗ Branch 3 not taken.
|
6416 | ret = err_merge((ret == AVERROR_EOF && err < 0) ? 0 : ret, err); |
1804 | } | ||
1805 | } | ||
1806 | |||
1807 | 35511 | finish: | |
1808 | 35511 | pthread_mutex_unlock(&sq->lock); | |
1809 | |||
1810 | 35511 | return ret; | |
1811 | } | ||
1812 | |||
1813 | 397592 | static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame) | |
1814 | { | ||
1815 |
6/6✓ Branch 0 taken 396675 times.
✓ Branch 1 taken 917 times.
✓ Branch 2 taken 384301 times.
✓ Branch 3 taken 12374 times.
✓ Branch 4 taken 7658 times.
✓ Branch 5 taken 376643 times.
|
397592 | if (enc->open_cb && frame && !enc->opened) { |
1816 | 7658 | int ret = enc_open(sch, enc, frame); | |
1817 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7658 times.
|
7658 | if (ret < 0) |
1818 | ✗ | return ret; | |
1819 | 7658 | enc->opened = 1; | |
1820 | |||
1821 | // discard empty frames that only carry encoder init parameters | ||
1822 |
2/2✓ Branch 0 taken 3 times.
✓ Branch 1 taken 7655 times.
|
7658 | if (!frame->buf[0]) { |
1823 | 3 | av_frame_unref(frame); | |
1824 | 3 | return 0; | |
1825 | } | ||
1826 | } | ||
1827 | |||
1828 | 397589 | return (enc->sq_idx[0] >= 0) ? | |
1829 |
2/2✓ Branch 0 taken 35511 times.
✓ Branch 1 taken 362078 times.
|
759667 | send_to_enc_sq (sch, enc, frame) : |
1830 | 362078 | send_to_enc_thread(sch, enc, frame); | |
1831 | } | ||
1832 | |||
1833 | 3362 | static int mux_queue_packet(SchMux *mux, SchMuxStream *ms, AVPacket *pkt) | |
1834 | { | ||
1835 | 3362 | PreMuxQueue *q = &ms->pre_mux_queue; | |
1836 | 3362 | AVPacket *tmp_pkt = NULL; | |
1837 | int ret; | ||
1838 | |||
1839 |
2/2✓ Branch 1 taken 145 times.
✓ Branch 2 taken 3217 times.
|
3362 | if (!av_fifo_can_write(q->fifo)) { |
1840 | 145 | size_t packets = av_fifo_can_read(q->fifo); | |
1841 |
1/2✓ Branch 0 taken 145 times.
✗ Branch 1 not taken.
|
145 | size_t pkt_size = pkt ? pkt->size : 0; |
1842 | 145 | int thresh_reached = (q->data_size + pkt_size) > q->data_threshold; | |
1843 |
2/2✓ Branch 0 taken 140 times.
✓ Branch 1 taken 5 times.
|
145 | size_t max_packets = thresh_reached ? q->max_packets : SIZE_MAX; |
1844 | 145 | size_t new_size = FFMIN(2 * packets, max_packets); | |
1845 | |||
1846 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 145 times.
|
145 | if (new_size <= packets) { |
1847 | ✗ | av_log(mux, AV_LOG_ERROR, | |
1848 | "Too many packets buffered for output stream.\n"); | ||
1849 | ✗ | return AVERROR(ENOSPC); | |
1850 | } | ||
1851 | 145 | ret = av_fifo_grow2(q->fifo, new_size - packets); | |
1852 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 145 times.
|
145 | if (ret < 0) |
1853 | ✗ | return ret; | |
1854 | } | ||
1855 | |||
1856 |
2/2✓ Branch 0 taken 3269 times.
✓ Branch 1 taken 93 times.
|
3362 | if (pkt) { |
1857 | 3269 | tmp_pkt = av_packet_alloc(); | |
1858 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3269 times.
|
3269 | if (!tmp_pkt) |
1859 | ✗ | return AVERROR(ENOMEM); | |
1860 | |||
1861 | 3269 | av_packet_move_ref(tmp_pkt, pkt); | |
1862 | 3269 | q->data_size += tmp_pkt->size; | |
1863 | } | ||
1864 | 3362 | av_fifo_write(q->fifo, &tmp_pkt, 1); | |
1865 | |||
1866 | 3362 | return 0; | |
1867 | } | ||
1868 | |||
1869 | 505899 | static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx, | |
1870 | AVPacket *pkt) | ||
1871 | { | ||
1872 | 505899 | SchMuxStream *ms = &mux->streams[stream_idx]; | |
1873 |
2/2✓ Branch 0 taken 488336 times.
✓ Branch 1 taken 9207 times.
|
497543 | int64_t dts = (pkt && pkt->dts != AV_NOPTS_VALUE) ? |
1874 |
2/2✓ Branch 0 taken 497543 times.
✓ Branch 1 taken 8356 times.
|
1003442 | av_rescale_q(pkt->dts + pkt->duration, pkt->time_base, AV_TIME_BASE_Q) : |
1875 | AV_NOPTS_VALUE; | ||
1876 | |||
1877 | // queue the packet if the muxer cannot be started yet | ||
1878 |
2/2✓ Branch 0 taken 3402 times.
✓ Branch 1 taken 502497 times.
|
505899 | if (!atomic_load(&mux->mux_started)) { |
1879 | 3402 | int queued = 0; | |
1880 | |||
1881 | // the muxer could have started between the above atomic check and | ||
1882 | // locking the mutex, then this block falls through to normal send path | ||
1883 | 3402 | pthread_mutex_lock(&sch->mux_ready_lock); | |
1884 | |||
1885 |
2/2✓ Branch 0 taken 3362 times.
✓ Branch 1 taken 40 times.
|
3402 | if (!atomic_load(&mux->mux_started)) { |
1886 | 3362 | int ret = mux_queue_packet(mux, ms, pkt); | |
1887 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3362 times.
|
3362 | queued = ret < 0 ? ret : 1; |
1888 | } | ||
1889 | |||
1890 | 3402 | pthread_mutex_unlock(&sch->mux_ready_lock); | |
1891 | |||
1892 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3402 times.
|
3402 | if (queued < 0) |
1893 | ✗ | return queued; | |
1894 |
2/2✓ Branch 0 taken 3362 times.
✓ Branch 1 taken 40 times.
|
3402 | else if (queued) |
1895 | 3362 | goto update_schedule; | |
1896 | } | ||
1897 | |||
1898 |
2/2✓ Branch 0 taken 494274 times.
✓ Branch 1 taken 8263 times.
|
502537 | if (pkt) { |
1899 | int ret; | ||
1900 | |||
1901 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 494274 times.
|
494274 | if (ms->init_eof) |
1902 | ✗ | return AVERROR_EOF; | |
1903 | |||
1904 | 494274 | ret = tq_send(mux->queue, stream_idx, pkt); | |
1905 |
2/2✓ Branch 0 taken 64 times.
✓ Branch 1 taken 494210 times.
|
494274 | if (ret < 0) |
1906 | 64 | return ret; | |
1907 | } else | ||
1908 | 8263 | tq_send_finish(mux->queue, stream_idx); | |
1909 | |||
1910 | 505835 | update_schedule: | |
1911 | // TODO: use atomics to check whether this changes trailing dts | ||
1912 | // to avoid locking unnecesarily | ||
1913 |
4/4✓ Branch 0 taken 17563 times.
✓ Branch 1 taken 488272 times.
✓ Branch 2 taken 8356 times.
✓ Branch 3 taken 9207 times.
|
505835 | if (dts != AV_NOPTS_VALUE || !pkt) { |
1914 | 496628 | pthread_mutex_lock(&sch->schedule_lock); | |
1915 | |||
1916 |
2/2✓ Branch 0 taken 488272 times.
✓ Branch 1 taken 8356 times.
|
496628 | if (pkt) ms->last_dts = dts; |
1917 | 8356 | else ms->source_finished = 1; | |
1918 | |||
1919 | 496628 | schedule_update_locked(sch); | |
1920 | |||
1921 | 496628 | pthread_mutex_unlock(&sch->schedule_lock); | |
1922 | } | ||
1923 | |||
1924 | 505835 | return 0; | |
1925 | } | ||
1926 | |||
1927 | static int | ||
1928 | 478108 | demux_stream_send_to_dst(Scheduler *sch, const SchedulerNode dst, | |
1929 | uint8_t *dst_finished, AVPacket *pkt, unsigned flags) | ||
1930 | { | ||
1931 | int ret; | ||
1932 | |||
1933 |
2/2✓ Branch 0 taken 3433 times.
✓ Branch 1 taken 474675 times.
|
478108 | if (*dst_finished) |
1934 | 3433 | return AVERROR_EOF; | |
1935 | |||
1936 |
4/4✓ Branch 0 taken 470695 times.
✓ Branch 1 taken 3980 times.
✓ Branch 2 taken 67696 times.
✓ Branch 3 taken 402999 times.
|
474675 | if (pkt && dst.type == SCH_NODE_TYPE_MUX && |
1937 |
2/2✓ Branch 0 taken 2 times.
✓ Branch 1 taken 67694 times.
|
67696 | (flags & DEMUX_SEND_STREAMCOPY_EOF)) { |
1938 | 2 | av_packet_unref(pkt); | |
1939 | 2 | pkt = NULL; | |
1940 | } | ||
1941 | |||
1942 |
2/2✓ Branch 0 taken 3982 times.
✓ Branch 1 taken 470693 times.
|
474675 | if (!pkt) |
1943 | 3982 | goto finish; | |
1944 | |||
1945 | 941386 | ret = (dst.type == SCH_NODE_TYPE_MUX) ? | |
1946 |
2/2✓ Branch 0 taken 67694 times.
✓ Branch 1 taken 402999 times.
|
470693 | send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) : |
1947 | 402999 | tq_send(sch->dec[dst.idx].queue, 0, pkt); | |
1948 |
2/2✓ Branch 0 taken 3431 times.
✓ Branch 1 taken 467262 times.
|
470693 | if (ret == AVERROR_EOF) |
1949 | 3431 | goto finish; | |
1950 | |||
1951 | 467262 | return ret; | |
1952 | |||
1953 | 7413 | finish: | |
1954 |
2/2✓ Branch 0 taken 660 times.
✓ Branch 1 taken 6753 times.
|
7413 | if (dst.type == SCH_NODE_TYPE_MUX) |
1955 | 660 | send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL); | |
1956 | else | ||
1957 | 6753 | tq_send_finish(sch->dec[dst.idx].queue, 0); | |
1958 | |||
1959 | 7413 | *dst_finished = 1; | |
1960 | 7413 | return AVERROR_EOF; | |
1961 | } | ||
1962 | |||
1963 | 477647 | static int demux_send_for_stream(Scheduler *sch, SchDemux *d, SchDemuxStream *ds, | |
1964 | AVPacket *pkt, unsigned flags) | ||
1965 | { | ||
1966 | 477647 | unsigned nb_done = 0; | |
1967 | |||
1968 |
2/2✓ Branch 0 taken 478108 times.
✓ Branch 1 taken 477647 times.
|
955755 | for (unsigned i = 0; i < ds->nb_dst; i++) { |
1969 | 478108 | AVPacket *to_send = pkt; | |
1970 | 478108 | uint8_t *finished = &ds->dst_finished[i]; | |
1971 | |||
1972 | int ret; | ||
1973 | |||
1974 | // sending a packet consumes it, so make a temporary reference if needed | ||
1975 |
4/4✓ Branch 0 taken 470695 times.
✓ Branch 1 taken 7413 times.
✓ Branch 2 taken 438 times.
✓ Branch 3 taken 470257 times.
|
478108 | if (pkt && i < ds->nb_dst - 1) { |
1976 | 438 | to_send = d->send_pkt; | |
1977 | |||
1978 | 438 | ret = av_packet_ref(to_send, pkt); | |
1979 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 438 times.
|
438 | if (ret < 0) |
1980 | ✗ | return ret; | |
1981 | } | ||
1982 | |||
1983 | 478108 | ret = demux_stream_send_to_dst(sch, ds->dst[i], finished, to_send, flags); | |
1984 |
2/2✓ Branch 0 taken 470695 times.
✓ Branch 1 taken 7413 times.
|
478108 | if (to_send) |
1985 | 470695 | av_packet_unref(to_send); | |
1986 |
2/2✓ Branch 0 taken 10846 times.
✓ Branch 1 taken 467262 times.
|
478108 | if (ret == AVERROR_EOF) |
1987 | 10846 | nb_done++; | |
1988 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 467262 times.
|
467262 | else if (ret < 0) |
1989 | ✗ | return ret; | |
1990 | } | ||
1991 | |||
1992 |
2/2✓ Branch 0 taken 10822 times.
✓ Branch 1 taken 466825 times.
|
477647 | return (nb_done == ds->nb_dst) ? AVERROR_EOF : 0; |
1993 | } | ||
1994 | |||
1995 | 11 | static int demux_flush(Scheduler *sch, SchDemux *d, AVPacket *pkt) | |
1996 | { | ||
1997 | 11 | Timestamp max_end_ts = (Timestamp){ .ts = AV_NOPTS_VALUE }; | |
1998 | |||
1999 |
3/6✓ Branch 0 taken 11 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 11 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 11 times.
|
11 | av_assert0(!pkt->buf && !pkt->data && !pkt->side_data_elems); |
2000 | |||
2001 |
2/2✓ Branch 0 taken 14 times.
✓ Branch 1 taken 11 times.
|
25 | for (unsigned i = 0; i < d->nb_streams; i++) { |
2002 | 14 | SchDemuxStream *ds = &d->streams[i]; | |
2003 | |||
2004 |
2/2✓ Branch 0 taken 14 times.
✓ Branch 1 taken 14 times.
|
28 | for (unsigned j = 0; j < ds->nb_dst; j++) { |
2005 | 14 | const SchedulerNode *dst = &ds->dst[j]; | |
2006 | SchDec *dec; | ||
2007 | int ret; | ||
2008 | |||
2009 |
3/4✓ Branch 0 taken 14 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 8 times.
✓ Branch 3 taken 6 times.
|
14 | if (ds->dst_finished[j] || dst->type != SCH_NODE_TYPE_DEC) |
2010 | 8 | continue; | |
2011 | |||
2012 | 6 | dec = &sch->dec[dst->idx]; | |
2013 | |||
2014 | 6 | ret = tq_send(dec->queue, 0, pkt); | |
2015 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
|
6 | if (ret < 0) |
2016 | ✗ | return ret; | |
2017 | |||
2018 |
2/2✓ Branch 0 taken 3 times.
✓ Branch 1 taken 3 times.
|
6 | if (dec->queue_end_ts) { |
2019 | Timestamp ts; | ||
2020 | 3 | ret = av_thread_message_queue_recv(dec->queue_end_ts, &ts, 0); | |
2021 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
|
3 | if (ret < 0) |
2022 | ✗ | return ret; | |
2023 | |||
2024 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
|
3 | if (max_end_ts.ts == AV_NOPTS_VALUE || |
2025 | ✗ | (ts.ts != AV_NOPTS_VALUE && | |
2026 | ✗ | av_compare_ts(max_end_ts.ts, max_end_ts.tb, ts.ts, ts.tb) < 0)) | |
2027 | 3 | max_end_ts = ts; | |
2028 | |||
2029 | } | ||
2030 | } | ||
2031 | } | ||
2032 | |||
2033 | 11 | pkt->pts = max_end_ts.ts; | |
2034 | 11 | pkt->time_base = max_end_ts.tb; | |
2035 | |||
2036 | 11 | return 0; | |
2037 | } | ||
2038 | |||
2039 | 470312 | int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt, | |
2040 | unsigned flags) | ||
2041 | { | ||
2042 | SchDemux *d; | ||
2043 | int terminate; | ||
2044 | |||
2045 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 470312 times.
|
470312 | av_assert0(demux_idx < sch->nb_demux); |
2046 | 470312 | d = &sch->demux[demux_idx]; | |
2047 | |||
2048 | 470312 | terminate = waiter_wait(sch, &d->waiter); | |
2049 |
2/2✓ Branch 0 taken 44 times.
✓ Branch 1 taken 470268 times.
|
470312 | if (terminate) |
2050 | 44 | return AVERROR_EXIT; | |
2051 | |||
2052 | // flush the downstreams after seek | ||
2053 |
2/2✓ Branch 0 taken 11 times.
✓ Branch 1 taken 470257 times.
|
470268 | if (pkt->stream_index == -1) |
2054 | 11 | return demux_flush(sch, d, pkt); | |
2055 | |||
2056 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 470257 times.
|
470257 | av_assert0(pkt->stream_index < d->nb_streams); |
2057 | |||
2058 | 470257 | return demux_send_for_stream(sch, d, &d->streams[pkt->stream_index], pkt, flags); | |
2059 | } | ||
2060 | |||
2061 | 7164 | static int demux_done(Scheduler *sch, unsigned demux_idx) | |
2062 | { | ||
2063 | 7164 | SchDemux *d = &sch->demux[demux_idx]; | |
2064 | 7164 | int ret = 0; | |
2065 | |||
2066 |
2/2✓ Branch 0 taken 7390 times.
✓ Branch 1 taken 7164 times.
|
14554 | for (unsigned i = 0; i < d->nb_streams; i++) { |
2067 | 7390 | int err = demux_send_for_stream(sch, d, &d->streams[i], NULL, 0); | |
2068 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7390 times.
|
7390 | if (err != AVERROR_EOF) |
2069 | ✗ | ret = err_merge(ret, err); | |
2070 | } | ||
2071 | |||
2072 | 7164 | pthread_mutex_lock(&sch->schedule_lock); | |
2073 | |||
2074 | 7164 | d->task_exited = 1; | |
2075 | |||
2076 | 7164 | schedule_update_locked(sch); | |
2077 | |||
2078 | 7164 | pthread_mutex_unlock(&sch->schedule_lock); | |
2079 | |||
2080 | 7164 | return ret; | |
2081 | } | ||
2082 | |||
2083 | 513462 | int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt) | |
2084 | { | ||
2085 | SchMux *mux; | ||
2086 | int ret, stream_idx; | ||
2087 | |||
2088 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 513462 times.
|
513462 | av_assert0(mux_idx < sch->nb_mux); |
2089 | 513462 | mux = &sch->mux[mux_idx]; | |
2090 | |||
2091 | 513462 | ret = tq_receive(mux->queue, &stream_idx, pkt); | |
2092 | 513462 | pkt->stream_index = stream_idx; | |
2093 | 513462 | return ret; | |
2094 | } | ||
2095 | |||
2096 | 199 | void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx) | |
2097 | { | ||
2098 | SchMux *mux; | ||
2099 | |||
2100 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 199 times.
|
199 | av_assert0(mux_idx < sch->nb_mux); |
2101 | 199 | mux = &sch->mux[mux_idx]; | |
2102 | |||
2103 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 199 times.
|
199 | av_assert0(stream_idx < mux->nb_streams); |
2104 | 199 | tq_receive_finish(mux->queue, stream_idx); | |
2105 | |||
2106 | 199 | pthread_mutex_lock(&sch->schedule_lock); | |
2107 | 199 | mux->streams[stream_idx].source_finished = 1; | |
2108 | |||
2109 | 199 | schedule_update_locked(sch); | |
2110 | |||
2111 | 199 | pthread_mutex_unlock(&sch->schedule_lock); | |
2112 | 199 | } | |
2113 | |||
2114 | 460067 | int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, | |
2115 | const AVPacket *pkt) | ||
2116 | { | ||
2117 | SchMux *mux; | ||
2118 | SchMuxStream *ms; | ||
2119 | |||
2120 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 460067 times.
|
460067 | av_assert0(mux_idx < sch->nb_mux); |
2121 | 460067 | mux = &sch->mux[mux_idx]; | |
2122 | |||
2123 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 460067 times.
|
460067 | av_assert0(stream_idx < mux->nb_streams); |
2124 | 460067 | ms = &mux->streams[stream_idx]; | |
2125 | |||
2126 |
2/2✓ Branch 0 taken 5 times.
✓ Branch 1 taken 460067 times.
|
460072 | for (unsigned i = 0; i < ms->nb_sub_heartbeat_dst; i++) { |
2127 | 5 | SchDec *dst = &sch->dec[ms->sub_heartbeat_dst[i]]; | |
2128 | int ret; | ||
2129 | |||
2130 | 5 | ret = av_packet_copy_props(mux->sub_heartbeat_pkt, pkt); | |
2131 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 5 times.
|
5 | if (ret < 0) |
2132 | ✗ | return ret; | |
2133 | |||
2134 | 5 | tq_send(dst->queue, 0, mux->sub_heartbeat_pkt); | |
2135 | } | ||
2136 | |||
2137 | 460067 | return 0; | |
2138 | } | ||
2139 | |||
2140 | 7916 | static int mux_done(Scheduler *sch, unsigned mux_idx) | |
2141 | { | ||
2142 | 7916 | SchMux *mux = &sch->mux[mux_idx]; | |
2143 | |||
2144 | 7916 | pthread_mutex_lock(&sch->schedule_lock); | |
2145 | |||
2146 |
2/2✓ Branch 0 taken 8356 times.
✓ Branch 1 taken 7916 times.
|
16272 | for (unsigned i = 0; i < mux->nb_streams; i++) { |
2147 | 8356 | tq_receive_finish(mux->queue, i); | |
2148 | 8356 | mux->streams[i].source_finished = 1; | |
2149 | } | ||
2150 | |||
2151 | 7916 | schedule_update_locked(sch); | |
2152 | |||
2153 | 7916 | pthread_mutex_unlock(&sch->schedule_lock); | |
2154 | |||
2155 | 7916 | pthread_mutex_lock(&sch->mux_done_lock); | |
2156 | |||
2157 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7916 times.
|
7916 | av_assert0(sch->nb_mux_done < sch->nb_mux); |
2158 | 7916 | sch->nb_mux_done++; | |
2159 | |||
2160 | 7916 | pthread_cond_signal(&sch->mux_done_cond); | |
2161 | |||
2162 | 7916 | pthread_mutex_unlock(&sch->mux_done_lock); | |
2163 | |||
2164 | 7916 | return 0; | |
2165 | } | ||
2166 | |||
2167 | 375988 | int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt) | |
2168 | { | ||
2169 | SchDec *dec; | ||
2170 | int ret, dummy; | ||
2171 | |||
2172 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 375988 times.
|
375988 | av_assert0(dec_idx < sch->nb_dec); |
2173 | 375988 | dec = &sch->dec[dec_idx]; | |
2174 | |||
2175 | // the decoder should have given us post-flush end timestamp in pkt | ||
2176 |
2/2✓ Branch 0 taken 3 times.
✓ Branch 1 taken 375985 times.
|
375988 | if (dec->expect_end_ts) { |
2177 | 3 | Timestamp ts = (Timestamp){ .ts = pkt->pts, .tb = pkt->time_base }; | |
2178 | 3 | ret = av_thread_message_queue_send(dec->queue_end_ts, &ts, 0); | |
2179 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
|
3 | if (ret < 0) |
2180 | ✗ | return ret; | |
2181 | |||
2182 | 3 | dec->expect_end_ts = 0; | |
2183 | } | ||
2184 | |||
2185 | 375988 | ret = tq_receive(dec->queue, &dummy, pkt); | |
2186 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 375988 times.
|
375988 | av_assert0(dummy <= 0); |
2187 | |||
2188 | // got a flush packet, on the next call to this function the decoder | ||
2189 | // will give us post-flush end timestamp | ||
2190 |
7/8✓ Branch 0 taken 372665 times.
✓ Branch 1 taken 3323 times.
✓ Branch 2 taken 958 times.
✓ Branch 3 taken 371707 times.
✓ Branch 4 taken 958 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 3 times.
✓ Branch 7 taken 955 times.
|
375988 | if (ret >= 0 && !pkt->data && !pkt->side_data_elems && dec->queue_end_ts) |
2191 | 3 | dec->expect_end_ts = 1; | |
2192 | |||
2193 | 375988 | return ret; | |
2194 | } | ||
2195 | |||
2196 | 404669 | static int send_to_filter(Scheduler *sch, SchFilterGraph *fg, | |
2197 | unsigned in_idx, AVFrame *frame) | ||
2198 | { | ||
2199 |
2/2✓ Branch 0 taken 397895 times.
✓ Branch 1 taken 6774 times.
|
404669 | if (frame) |
2200 | 397895 | return tq_send(fg->queue, in_idx, frame); | |
2201 | |||
2202 |
1/2✓ Branch 0 taken 6774 times.
✗ Branch 1 not taken.
|
6774 | if (!fg->inputs[in_idx].send_finished) { |
2203 | 6774 | fg->inputs[in_idx].send_finished = 1; | |
2204 | 6774 | tq_send_finish(fg->queue, in_idx); | |
2205 | |||
2206 | // close the control stream when all actual inputs are done | ||
2207 |
2/2✓ Branch 0 taken 6698 times.
✓ Branch 1 taken 76 times.
|
6774 | if (atomic_fetch_add(&fg->nb_inputs_finished_send, 1) == fg->nb_inputs - 1) |
2208 | 6698 | tq_send_finish(fg->queue, fg->nb_inputs); | |
2209 | } | ||
2210 | 6774 | return 0; | |
2211 | } | ||
2212 | |||
2213 | 409072 | static int dec_send_to_dst(Scheduler *sch, const SchedulerNode dst, | |
2214 | uint8_t *dst_finished, AVFrame *frame) | ||
2215 | { | ||
2216 | int ret; | ||
2217 | |||
2218 |
2/2✓ Branch 0 taken 6962 times.
✓ Branch 1 taken 402110 times.
|
409072 | if (*dst_finished) |
2219 | 6962 | return AVERROR_EOF; | |
2220 | |||
2221 |
2/2✓ Branch 0 taken 3336 times.
✓ Branch 1 taken 398774 times.
|
402110 | if (!frame) |
2222 | 3336 | goto finish; | |
2223 | |||
2224 | 797548 | ret = (dst.type == SCH_NODE_TYPE_FILTER_IN) ? | |
2225 |
2/2✓ Branch 0 taken 397895 times.
✓ Branch 1 taken 879 times.
|
398774 | send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame) : |
2226 | 879 | send_to_enc(sch, &sch->enc[dst.idx], frame); | |
2227 |
2/2✓ Branch 0 taken 3476 times.
✓ Branch 1 taken 395298 times.
|
398774 | if (ret == AVERROR_EOF) |
2228 | 3476 | goto finish; | |
2229 | |||
2230 | 395298 | return ret; | |
2231 | |||
2232 | 6812 | finish: | |
2233 |
2/2✓ Branch 0 taken 6774 times.
✓ Branch 1 taken 38 times.
|
6812 | if (dst.type == SCH_NODE_TYPE_FILTER_IN) |
2234 | 6774 | send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL); | |
2235 | else | ||
2236 | 38 | send_to_enc(sch, &sch->enc[dst.idx], NULL); | |
2237 | |||
2238 | 6812 | *dst_finished = 1; | |
2239 | |||
2240 | 6812 | return AVERROR_EOF; | |
2241 | } | ||
2242 | |||
2243 | 400827 | int sch_dec_send(Scheduler *sch, unsigned dec_idx, | |
2244 | unsigned out_idx, AVFrame *frame) | ||
2245 | { | ||
2246 | SchDec *dec; | ||
2247 | SchDecOutput *o; | ||
2248 | int ret; | ||
2249 | 400827 | unsigned nb_done = 0; | |
2250 | |||
2251 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 400827 times.
|
400827 | av_assert0(dec_idx < sch->nb_dec); |
2252 | 400827 | dec = &sch->dec[dec_idx]; | |
2253 | |||
2254 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 400827 times.
|
400827 | av_assert0(out_idx < dec->nb_outputs); |
2255 | 400827 | o = &dec->outputs[out_idx]; | |
2256 | |||
2257 |
2/2✓ Branch 0 taken 402260 times.
✓ Branch 1 taken 400827 times.
|
803087 | for (unsigned i = 0; i < o->nb_dst; i++) { |
2258 | 402260 | uint8_t *finished = &o->dst_finished[i]; | |
2259 | 402260 | AVFrame *to_send = frame; | |
2260 | |||
2261 | // sending a frame consumes it, so make a temporary reference if needed | ||
2262 |
2/2✓ Branch 0 taken 1433 times.
✓ Branch 1 taken 400827 times.
|
402260 | if (i < o->nb_dst - 1) { |
2263 | 1433 | to_send = dec->send_frame; | |
2264 | |||
2265 | // frame may sometimes contain props only, | ||
2266 | // e.g. to signal EOF timestamp | ||
2267 |
2/2✓ Branch 0 taken 1335 times.
✓ Branch 1 taken 98 times.
|
1433 | ret = frame->buf[0] ? av_frame_ref(to_send, frame) : |
2268 | 98 | av_frame_copy_props(to_send, frame); | |
2269 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1433 times.
|
1433 | if (ret < 0) |
2270 | ✗ | return ret; | |
2271 | } | ||
2272 | |||
2273 | 402260 | ret = dec_send_to_dst(sch, o->dst[i], finished, to_send); | |
2274 |
2/2✓ Branch 0 taken 6962 times.
✓ Branch 1 taken 395298 times.
|
402260 | if (ret < 0) { |
2275 | 6962 | av_frame_unref(to_send); | |
2276 |
1/2✓ Branch 0 taken 6962 times.
✗ Branch 1 not taken.
|
6962 | if (ret == AVERROR_EOF) { |
2277 | 6962 | nb_done++; | |
2278 | 6962 | continue; | |
2279 | } | ||
2280 | ✗ | return ret; | |
2281 | } | ||
2282 | } | ||
2283 | |||
2284 |
2/2✓ Branch 0 taken 6874 times.
✓ Branch 1 taken 393953 times.
|
400827 | return (nb_done == o->nb_dst) ? AVERROR_EOF : 0; |
2285 | } | ||
2286 | |||
2287 | 6754 | static int dec_done(Scheduler *sch, unsigned dec_idx) | |
2288 | { | ||
2289 | 6754 | SchDec *dec = &sch->dec[dec_idx]; | |
2290 | 6754 | int ret = 0; | |
2291 | |||
2292 | 6754 | tq_receive_finish(dec->queue, 0); | |
2293 | |||
2294 | // make sure our source does not get stuck waiting for end timestamps | ||
2295 | // that will never arrive | ||
2296 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 6753 times.
|
6754 | if (dec->queue_end_ts) |
2297 | 1 | av_thread_message_queue_set_err_recv(dec->queue_end_ts, AVERROR_EOF); | |
2298 | |||
2299 |
2/2✓ Branch 0 taken 6760 times.
✓ Branch 1 taken 6754 times.
|
13514 | for (unsigned i = 0; i < dec->nb_outputs; i++) { |
2300 | 6760 | SchDecOutput *o = &dec->outputs[i]; | |
2301 | |||
2302 |
2/2✓ Branch 0 taken 6812 times.
✓ Branch 1 taken 6760 times.
|
13572 | for (unsigned j = 0; j < o->nb_dst; j++) { |
2303 | 6812 | int err = dec_send_to_dst(sch, o->dst[j], &o->dst_finished[j], NULL); | |
2304 |
2/4✓ Branch 0 taken 6812 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6812 times.
|
6812 | if (err < 0 && err != AVERROR_EOF) |
2305 | ✗ | ret = err_merge(ret, err); | |
2306 | } | ||
2307 | } | ||
2308 | |||
2309 | 6754 | return ret; | |
2310 | } | ||
2311 | |||
2312 | 443713 | int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame) | |
2313 | { | ||
2314 | SchEnc *enc; | ||
2315 | int ret, dummy; | ||
2316 | |||
2317 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 443713 times.
|
443713 | av_assert0(enc_idx < sch->nb_enc); |
2318 | 443713 | enc = &sch->enc[enc_idx]; | |
2319 | |||
2320 | 443713 | ret = tq_receive(enc->queue, &dummy, frame); | |
2321 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 443713 times.
|
443713 | av_assert0(dummy <= 0); |
2322 | |||
2323 | 443713 | return ret; | |
2324 | } | ||
2325 | |||
2326 | 437638 | static int enc_send_to_dst(Scheduler *sch, const SchedulerNode dst, | |
2327 | uint8_t *dst_finished, AVPacket *pkt) | ||
2328 | { | ||
2329 | int ret; | ||
2330 | |||
2331 |
2/2✓ Branch 0 taken 44 times.
✓ Branch 1 taken 437594 times.
|
437638 | if (*dst_finished) |
2332 | 44 | return AVERROR_EOF; | |
2333 | |||
2334 |
2/2✓ Branch 0 taken 7695 times.
✓ Branch 1 taken 429899 times.
|
437594 | if (!pkt) |
2335 | 7695 | goto finish; | |
2336 | |||
2337 | 859798 | ret = (dst.type == SCH_NODE_TYPE_MUX) ? | |
2338 |
2/2✓ Branch 0 taken 429849 times.
✓ Branch 1 taken 50 times.
|
429899 | send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) : |
2339 | 50 | tq_send(sch->dec[dst.idx].queue, 0, pkt); | |
2340 |
2/2✓ Branch 0 taken 2 times.
✓ Branch 1 taken 429897 times.
|
429899 | if (ret == AVERROR_EOF) |
2341 | 2 | goto finish; | |
2342 | |||
2343 | 429897 | return ret; | |
2344 | |||
2345 | 7697 | finish: | |
2346 |
2/2✓ Branch 0 taken 7696 times.
✓ Branch 1 taken 1 times.
|
7697 | if (dst.type == SCH_NODE_TYPE_MUX) |
2347 | 7696 | send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL); | |
2348 | else | ||
2349 | 1 | tq_send_finish(sch->dec[dst.idx].queue, 0); | |
2350 | |||
2351 | 7697 | *dst_finished = 1; | |
2352 | |||
2353 | 7697 | return AVERROR_EOF; | |
2354 | } | ||
2355 | |||
2356 | 429891 | int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt) | |
2357 | { | ||
2358 | SchEnc *enc; | ||
2359 | int ret; | ||
2360 | |||
2361 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 429891 times.
|
429891 | av_assert0(enc_idx < sch->nb_enc); |
2362 | 429891 | enc = &sch->enc[enc_idx]; | |
2363 | |||
2364 |
2/2✓ Branch 0 taken 429941 times.
✓ Branch 1 taken 429891 times.
|
859832 | for (unsigned i = 0; i < enc->nb_dst; i++) { |
2365 | 429941 | uint8_t *finished = &enc->dst_finished[i]; | |
2366 | 429941 | AVPacket *to_send = pkt; | |
2367 | |||
2368 | // sending a packet consumes it, so make a temporary reference if needed | ||
2369 |
2/2✓ Branch 0 taken 50 times.
✓ Branch 1 taken 429891 times.
|
429941 | if (i < enc->nb_dst - 1) { |
2370 | 50 | to_send = enc->send_pkt; | |
2371 | |||
2372 | 50 | ret = av_packet_ref(to_send, pkt); | |
2373 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 50 times.
|
50 | if (ret < 0) |
2374 | ✗ | return ret; | |
2375 | } | ||
2376 | |||
2377 | 429941 | ret = enc_send_to_dst(sch, enc->dst[i], finished, to_send); | |
2378 |
2/2✓ Branch 0 taken 44 times.
✓ Branch 1 taken 429897 times.
|
429941 | if (ret < 0) { |
2379 | 44 | av_packet_unref(to_send); | |
2380 |
1/2✓ Branch 0 taken 44 times.
✗ Branch 1 not taken.
|
44 | if (ret == AVERROR_EOF) |
2381 | 44 | continue; | |
2382 | ✗ | return ret; | |
2383 | } | ||
2384 | } | ||
2385 | |||
2386 | 429891 | return 0; | |
2387 | } | ||
2388 | |||
2389 | 7696 | static int enc_done(Scheduler *sch, unsigned enc_idx) | |
2390 | { | ||
2391 | 7696 | SchEnc *enc = &sch->enc[enc_idx]; | |
2392 | 7696 | int ret = 0; | |
2393 | |||
2394 | 7696 | tq_receive_finish(enc->queue, 0); | |
2395 | |||
2396 |
2/2✓ Branch 0 taken 7697 times.
✓ Branch 1 taken 7696 times.
|
15393 | for (unsigned i = 0; i < enc->nb_dst; i++) { |
2397 | 7697 | int err = enc_send_to_dst(sch, enc->dst[i], &enc->dst_finished[i], NULL); | |
2398 |
2/4✓ Branch 0 taken 7697 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 7697 times.
|
7697 | if (err < 0 && err != AVERROR_EOF) |
2399 | ✗ | ret = err_merge(ret, err); | |
2400 | } | ||
2401 | |||
2402 | 7696 | return ret; | |
2403 | } | ||
2404 | |||
2405 | 388590 | int sch_filter_receive(Scheduler *sch, unsigned fg_idx, | |
2406 | unsigned *in_idx, AVFrame *frame) | ||
2407 | { | ||
2408 | SchFilterGraph *fg; | ||
2409 | |||
2410 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 388590 times.
|
388590 | av_assert0(fg_idx < sch->nb_filters); |
2411 | 388590 | fg = &sch->filters[fg_idx]; | |
2412 | |||
2413 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 388590 times.
|
388590 | av_assert0(*in_idx <= fg->nb_inputs); |
2414 | |||
2415 | // update scheduling to account for desired input stream, if it changed | ||
2416 | // | ||
2417 | // this check needs no locking because only the filtering thread | ||
2418 | // updates this value | ||
2419 |
2/2✓ Branch 0 taken 907 times.
✓ Branch 1 taken 387683 times.
|
388590 | if (*in_idx != fg->best_input) { |
2420 | 907 | pthread_mutex_lock(&sch->schedule_lock); | |
2421 | |||
2422 | 907 | fg->best_input = *in_idx; | |
2423 | 907 | schedule_update_locked(sch); | |
2424 | |||
2425 | 907 | pthread_mutex_unlock(&sch->schedule_lock); | |
2426 | } | ||
2427 | |||
2428 |
2/2✓ Branch 0 taken 367627 times.
✓ Branch 1 taken 20963 times.
|
388590 | if (*in_idx == fg->nb_inputs) { |
2429 | 20963 | int terminate = waiter_wait(sch, &fg->waiter); | |
2430 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 20963 times.
|
20963 | return terminate ? AVERROR_EOF : AVERROR(EAGAIN); |
2431 | } | ||
2432 | |||
2433 | 26 | while (1) { | |
2434 | int ret, idx; | ||
2435 | |||
2436 | 367653 | ret = tq_receive(fg->queue, &idx, frame); | |
2437 |
2/2✓ Branch 0 taken 6 times.
✓ Branch 1 taken 367647 times.
|
367653 | if (idx < 0) |
2438 | 367627 | return AVERROR_EOF; | |
2439 |
2/2✓ Branch 0 taken 367621 times.
✓ Branch 1 taken 26 times.
|
367647 | else if (ret >= 0) { |
2440 | 367621 | *in_idx = idx; | |
2441 | 367621 | return 0; | |
2442 | } | ||
2443 | |||
2444 | // disregard EOFs for specific streams - they should always be | ||
2445 | // preceded by an EOF frame | ||
2446 | } | ||
2447 | } | ||
2448 | |||
2449 | 1 | void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx) | |
2450 | { | ||
2451 | SchFilterGraph *fg; | ||
2452 | SchFilterIn *fi; | ||
2453 | |||
2454 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | av_assert0(fg_idx < sch->nb_filters); |
2455 | 1 | fg = &sch->filters[fg_idx]; | |
2456 | |||
2457 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | av_assert0(in_idx < fg->nb_inputs); |
2458 | 1 | fi = &fg->inputs[in_idx]; | |
2459 | |||
2460 |
1/2✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
|
1 | if (!fi->receive_finished) { |
2461 | 1 | fi->receive_finished = 1; | |
2462 | 1 | tq_receive_finish(fg->queue, in_idx); | |
2463 | |||
2464 | // close the control stream when all actual inputs are done | ||
2465 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (++fg->nb_inputs_finished_receive == fg->nb_inputs) |
2466 | ✗ | tq_receive_finish(fg->queue, fg->nb_inputs); | |
2467 | } | ||
2468 | 1 | } | |
2469 | |||
2470 | 389017 | int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame) | |
2471 | { | ||
2472 | SchFilterGraph *fg; | ||
2473 | SchedulerNode dst; | ||
2474 | |||
2475 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 389017 times.
|
389017 | av_assert0(fg_idx < sch->nb_filters); |
2476 | 389017 | fg = &sch->filters[fg_idx]; | |
2477 | |||
2478 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 389017 times.
|
389017 | av_assert0(out_idx < fg->nb_outputs); |
2479 | 389017 | dst = fg->outputs[out_idx].dst; | |
2480 | |||
2481 | 389017 | return (dst.type == SCH_NODE_TYPE_ENC) ? | |
2482 |
1/2✓ Branch 0 taken 389017 times.
✗ Branch 1 not taken.
|
389017 | send_to_enc (sch, &sch->enc[dst.idx], frame) : |
2483 | ✗ | send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame); | |
2484 | } | ||
2485 | |||
2486 | 7537 | static int filter_done(Scheduler *sch, unsigned fg_idx) | |
2487 | { | ||
2488 | 7537 | SchFilterGraph *fg = &sch->filters[fg_idx]; | |
2489 | 7537 | int ret = 0; | |
2490 | |||
2491 |
2/2✓ Branch 0 taken 14311 times.
✓ Branch 1 taken 7537 times.
|
21848 | for (unsigned i = 0; i <= fg->nb_inputs; i++) |
2492 | 14311 | tq_receive_finish(fg->queue, i); | |
2493 | |||
2494 |
2/2✓ Branch 0 taken 7658 times.
✓ Branch 1 taken 7537 times.
|
15195 | for (unsigned i = 0; i < fg->nb_outputs; i++) { |
2495 | 7658 | SchedulerNode dst = fg->outputs[i].dst; | |
2496 | 15316 | int err = (dst.type == SCH_NODE_TYPE_ENC) ? | |
2497 |
1/2✓ Branch 0 taken 7658 times.
✗ Branch 1 not taken.
|
7658 | send_to_enc (sch, &sch->enc[dst.idx], NULL) : |
2498 | ✗ | send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL); | |
2499 | |||
2500 |
3/4✓ Branch 0 taken 3096 times.
✓ Branch 1 taken 4562 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 3096 times.
|
7658 | if (err < 0 && err != AVERROR_EOF) |
2501 | ✗ | ret = err_merge(ret, err); | |
2502 | } | ||
2503 | |||
2504 | 7537 | pthread_mutex_lock(&sch->schedule_lock); | |
2505 | |||
2506 | 7537 | fg->task_exited = 1; | |
2507 | |||
2508 | 7537 | schedule_update_locked(sch); | |
2509 | |||
2510 | 7537 | pthread_mutex_unlock(&sch->schedule_lock); | |
2511 | |||
2512 | 7537 | return ret; | |
2513 | } | ||
2514 | |||
2515 | ✗ | int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame) | |
2516 | { | ||
2517 | SchFilterGraph *fg; | ||
2518 | |||
2519 | ✗ | av_assert0(fg_idx < sch->nb_filters); | |
2520 | ✗ | fg = &sch->filters[fg_idx]; | |
2521 | |||
2522 | ✗ | return send_to_filter(sch, fg, fg->nb_inputs, frame); | |
2523 | } | ||
2524 | |||
2525 | 37067 | static int task_cleanup(Scheduler *sch, SchedulerNode node) | |
2526 | { | ||
2527 |
5/6✓ Branch 0 taken 7164 times.
✓ Branch 1 taken 7916 times.
✓ Branch 2 taken 6754 times.
✓ Branch 3 taken 7696 times.
✓ Branch 4 taken 7537 times.
✗ Branch 5 not taken.
|
37067 | switch (node.type) { |
2528 | 7164 | case SCH_NODE_TYPE_DEMUX: return demux_done (sch, node.idx); | |
2529 | 7916 | case SCH_NODE_TYPE_MUX: return mux_done (sch, node.idx); | |
2530 | 6754 | case SCH_NODE_TYPE_DEC: return dec_done (sch, node.idx); | |
2531 | 7696 | case SCH_NODE_TYPE_ENC: return enc_done (sch, node.idx); | |
2532 | 7537 | case SCH_NODE_TYPE_FILTER_IN: return filter_done(sch, node.idx); | |
2533 | ✗ | default: av_assert0(0); | |
2534 | } | ||
2535 | } | ||
2536 | |||
2537 | 37053 | static void *task_wrapper(void *arg) | |
2538 | { | ||
2539 | 37053 | SchTask *task = arg; | |
2540 | 37053 | Scheduler *sch = task->parent; | |
2541 | int ret; | ||
2542 | 37053 | int err = 0; | |
2543 | |||
2544 | 37053 | ret = task->func(task->func_arg); | |
2545 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 37052 times.
|
37053 | if (ret < 0) |
2546 | 1 | av_log(task->func_arg, AV_LOG_ERROR, | |
2547 | 1 | "Task finished with error code: %d (%s)\n", ret, av_err2str(ret)); | |
2548 | |||
2549 | 37053 | err = task_cleanup(sch, task->node); | |
2550 | 37053 | ret = err_merge(ret, err); | |
2551 | |||
2552 | // EOF is considered normal termination | ||
2553 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 37053 times.
|
37053 | if (ret == AVERROR_EOF) |
2554 | ✗ | ret = 0; | |
2555 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 37052 times.
|
37053 | if (ret < 0) |
2556 | 1 | atomic_store(&sch->task_failed, 1); | |
2557 | |||
2558 |
4/4✓ Branch 0 taken 1 times.
✓ Branch 1 taken 37052 times.
✓ Branch 2 taken 1 times.
✓ Branch 3 taken 37052 times.
|
37054 | av_log(task->func_arg, ret < 0 ? AV_LOG_ERROR : AV_LOG_VERBOSE, |
2559 | "Terminating thread with return code %d (%s)\n", ret, | ||
2560 | 1 | ret < 0 ? av_err2str(ret) : "success"); | |
2561 | |||
2562 | 37053 | return (void*)(intptr_t)ret; | |
2563 | } | ||
2564 | |||
2565 | 37067 | static int task_stop(Scheduler *sch, SchTask *task) | |
2566 | { | ||
2567 | int ret; | ||
2568 | void *thread_ret; | ||
2569 | |||
2570 |
2/2✓ Branch 0 taken 14 times.
✓ Branch 1 taken 37053 times.
|
37067 | if (!task->thread_running) |
2571 | 14 | return task_cleanup(sch, task->node); | |
2572 | |||
2573 | 37053 | ret = pthread_join(task->thread, &thread_ret); | |
2574 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 37053 times.
|
37053 | av_assert0(ret == 0); |
2575 | |||
2576 | 37053 | task->thread_running = 0; | |
2577 | |||
2578 | 37053 | return (intptr_t)thread_ret; | |
2579 | } | ||
2580 | |||
2581 | 15829 | int sch_stop(Scheduler *sch, int64_t *finish_ts) | |
2582 | { | ||
2583 | 15829 | int ret = 0, err; | |
2584 | |||
2585 |
2/2✓ Branch 0 taken 7915 times.
✓ Branch 1 taken 7914 times.
|
15829 | if (sch->state != SCH_STATE_STARTED) |
2586 | 7915 | return 0; | |
2587 | |||
2588 | 7914 | atomic_store(&sch->terminate, 1); | |
2589 | |||
2590 |
2/2✓ Branch 0 taken 15828 times.
✓ Branch 1 taken 7914 times.
|
23742 | for (unsigned type = 0; type < 2; type++) |
2591 |
4/4✓ Branch 0 taken 15078 times.
✓ Branch 1 taken 15451 times.
✓ Branch 2 taken 14701 times.
✓ Branch 3 taken 15828 times.
|
30529 | for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) { |
2592 |
2/2✓ Branch 0 taken 7164 times.
✓ Branch 1 taken 7537 times.
|
14701 | SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter; |
2593 | 14701 | waiter_set(w, 1); | |
2594 | } | ||
2595 | |||
2596 |
2/2✓ Branch 0 taken 7164 times.
✓ Branch 1 taken 7914 times.
|
15078 | for (unsigned i = 0; i < sch->nb_demux; i++) { |
2597 | 7164 | SchDemux *d = &sch->demux[i]; | |
2598 | |||
2599 | 7164 | err = task_stop(sch, &d->task); | |
2600 | 7164 | ret = err_merge(ret, err); | |
2601 | } | ||
2602 | |||
2603 |
2/2✓ Branch 0 taken 6754 times.
✓ Branch 1 taken 7914 times.
|
14668 | for (unsigned i = 0; i < sch->nb_dec; i++) { |
2604 | 6754 | SchDec *dec = &sch->dec[i]; | |
2605 | |||
2606 | 6754 | err = task_stop(sch, &dec->task); | |
2607 | 6754 | ret = err_merge(ret, err); | |
2608 | } | ||
2609 | |||
2610 |
2/2✓ Branch 0 taken 7537 times.
✓ Branch 1 taken 7914 times.
|
15451 | for (unsigned i = 0; i < sch->nb_filters; i++) { |
2611 | 7537 | SchFilterGraph *fg = &sch->filters[i]; | |
2612 | |||
2613 | 7537 | err = task_stop(sch, &fg->task); | |
2614 | 7537 | ret = err_merge(ret, err); | |
2615 | } | ||
2616 | |||
2617 |
2/2✓ Branch 0 taken 7696 times.
✓ Branch 1 taken 7914 times.
|
15610 | for (unsigned i = 0; i < sch->nb_enc; i++) { |
2618 | 7696 | SchEnc *enc = &sch->enc[i]; | |
2619 | |||
2620 | 7696 | err = task_stop(sch, &enc->task); | |
2621 | 7696 | ret = err_merge(ret, err); | |
2622 | } | ||
2623 | |||
2624 |
2/2✓ Branch 0 taken 7916 times.
✓ Branch 1 taken 7914 times.
|
15830 | for (unsigned i = 0; i < sch->nb_mux; i++) { |
2625 | 7916 | SchMux *mux = &sch->mux[i]; | |
2626 | |||
2627 | 7916 | err = task_stop(sch, &mux->task); | |
2628 | 7916 | ret = err_merge(ret, err); | |
2629 | } | ||
2630 | |||
2631 |
1/2✓ Branch 0 taken 7914 times.
✗ Branch 1 not taken.
|
7914 | if (finish_ts) |
2632 | 7914 | *finish_ts = trailing_dts(sch, 1); | |
2633 | |||
2634 | 7914 | sch->state = SCH_STATE_STOPPED; | |
2635 | |||
2636 | 7914 | return ret; | |
2637 | } | ||
2638 |