Line | Branch | Exec | Source |
---|---|---|---|
1 | /* | ||
2 | * Inter-thread scheduling/synchronization. | ||
3 | * Copyright (c) 2023 Anton Khirnov | ||
4 | * | ||
5 | * This file is part of FFmpeg. | ||
6 | * | ||
7 | * FFmpeg is free software; you can redistribute it and/or | ||
8 | * modify it under the terms of the GNU Lesser General Public | ||
9 | * License as published by the Free Software Foundation; either | ||
10 | * version 2.1 of the License, or (at your option) any later version. | ||
11 | * | ||
12 | * FFmpeg is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with FFmpeg; if not, write to the Free Software | ||
19 | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA | ||
20 | */ | ||
21 | |||
22 | #include <stdatomic.h> | ||
23 | #include <stddef.h> | ||
24 | #include <stdint.h> | ||
25 | |||
26 | #include "cmdutils.h" | ||
27 | #include "ffmpeg_sched.h" | ||
28 | #include "ffmpeg_utils.h" | ||
29 | #include "sync_queue.h" | ||
30 | #include "thread_queue.h" | ||
31 | |||
32 | #include "libavcodec/packet.h" | ||
33 | |||
34 | #include "libavutil/avassert.h" | ||
35 | #include "libavutil/error.h" | ||
36 | #include "libavutil/fifo.h" | ||
37 | #include "libavutil/frame.h" | ||
38 | #include "libavutil/mem.h" | ||
39 | #include "libavutil/thread.h" | ||
40 | #include "libavutil/threadmessage.h" | ||
41 | #include "libavutil/time.h" | ||
42 | |||
43 | // 100 ms | ||
44 | // FIXME: some other value? make this dynamic? | ||
45 | #define SCHEDULE_TOLERANCE (100 * 1000) | ||
46 | |||
47 | enum QueueType { | ||
48 | QUEUE_PACKETS, | ||
49 | QUEUE_FRAMES, | ||
50 | }; | ||
51 | |||
52 | typedef struct SchWaiter { | ||
53 | pthread_mutex_t lock; | ||
54 | pthread_cond_t cond; | ||
55 | atomic_int choked; | ||
56 | |||
57 | // the following are internal state of schedule_update_locked() and must not | ||
58 | // be accessed outside of it | ||
59 | int choked_prev; | ||
60 | int choked_next; | ||
61 | } SchWaiter; | ||
62 | |||
63 | typedef struct SchTask { | ||
64 | Scheduler *parent; | ||
65 | SchedulerNode node; | ||
66 | |||
67 | SchThreadFunc func; | ||
68 | void *func_arg; | ||
69 | |||
70 | pthread_t thread; | ||
71 | int thread_running; | ||
72 | } SchTask; | ||
73 | |||
74 | typedef struct SchDecOutput { | ||
75 | SchedulerNode *dst; | ||
76 | uint8_t *dst_finished; | ||
77 | unsigned nb_dst; | ||
78 | } SchDecOutput; | ||
79 | |||
80 | typedef struct SchDec { | ||
81 | const AVClass *class; | ||
82 | |||
83 | SchedulerNode src; | ||
84 | |||
85 | SchDecOutput *outputs; | ||
86 | unsigned nb_outputs; | ||
87 | |||
88 | SchTask task; | ||
89 | // Queue for receiving input packets, one stream. | ||
90 | ThreadQueue *queue; | ||
91 | |||
92 | // Queue for sending post-flush end timestamps back to the source | ||
93 | AVThreadMessageQueue *queue_end_ts; | ||
94 | int expect_end_ts; | ||
95 | |||
96 | // temporary storage used by sch_dec_send() | ||
97 | AVFrame *send_frame; | ||
98 | } SchDec; | ||
99 | |||
100 | typedef struct SchSyncQueue { | ||
101 | SyncQueue *sq; | ||
102 | AVFrame *frame; | ||
103 | pthread_mutex_t lock; | ||
104 | |||
105 | unsigned *enc_idx; | ||
106 | unsigned nb_enc_idx; | ||
107 | } SchSyncQueue; | ||
108 | |||
109 | typedef struct SchEnc { | ||
110 | const AVClass *class; | ||
111 | |||
112 | SchedulerNode src; | ||
113 | SchedulerNode *dst; | ||
114 | uint8_t *dst_finished; | ||
115 | unsigned nb_dst; | ||
116 | |||
117 | // [0] - index of the sync queue in Scheduler.sq_enc, | ||
118 | // [1] - index of this encoder in the sq | ||
119 | int sq_idx[2]; | ||
120 | |||
121 | /* Opening encoders is somewhat nontrivial due to their interaction with | ||
122 | * sync queues, which are (among other things) responsible for maintaining | ||
123 | * constant audio frame size, when it is required by the encoder. | ||
124 | * | ||
125 | * Opening the encoder requires stream parameters, obtained from the first | ||
126 | * frame. However, that frame cannot be properly chunked by the sync queue | ||
127 | * without knowing the required frame size, which is only available after | ||
128 | * opening the encoder. | ||
129 | * | ||
130 | * This apparent circular dependency is resolved in the following way: | ||
131 | * - the caller creating the encoder gives us a callback which opens the | ||
132 | * encoder and returns the required frame size (if any) | ||
133 | * - when the first frame is sent to the encoder, the sending thread | ||
134 | * - calls this callback, opening the encoder | ||
135 | * - passes the returned frame size to the sync queue | ||
136 | */ | ||
137 | int (*open_cb)(void *opaque, const AVFrame *frame); | ||
138 | int opened; | ||
139 | |||
140 | SchTask task; | ||
141 | // Queue for receiving input frames, one stream. | ||
142 | ThreadQueue *queue; | ||
143 | // tq_send() to queue returned EOF | ||
144 | int in_finished; | ||
145 | |||
146 | // temporary storage used by sch_enc_send() | ||
147 | AVPacket *send_pkt; | ||
148 | } SchEnc; | ||
149 | |||
150 | typedef struct SchDemuxStream { | ||
151 | SchedulerNode *dst; | ||
152 | uint8_t *dst_finished; | ||
153 | unsigned nb_dst; | ||
154 | } SchDemuxStream; | ||
155 | |||
156 | typedef struct SchDemux { | ||
157 | const AVClass *class; | ||
158 | |||
159 | SchDemuxStream *streams; | ||
160 | unsigned nb_streams; | ||
161 | |||
162 | SchTask task; | ||
163 | SchWaiter waiter; | ||
164 | |||
165 | // temporary storage used by sch_demux_send() | ||
166 | AVPacket *send_pkt; | ||
167 | |||
168 | // protected by schedule_lock | ||
169 | int task_exited; | ||
170 | } SchDemux; | ||
171 | |||
172 | typedef struct PreMuxQueue { | ||
173 | /** | ||
174 | * Queue for buffering the packets before the muxer task can be started. | ||
175 | */ | ||
176 | AVFifo *fifo; | ||
177 | /** | ||
178 | * Maximum number of packets in fifo. | ||
179 | */ | ||
180 | int max_packets; | ||
181 | /* | ||
182 | * The size of the AVPackets' buffers in queue. | ||
183 | * Updated when a packet is either pushed or pulled from the queue. | ||
184 | */ | ||
185 | size_t data_size; | ||
186 | /* Threshold after which max_packets will be in effect */ | ||
187 | size_t data_threshold; | ||
188 | } PreMuxQueue; | ||
189 | |||
190 | typedef struct SchMuxStream { | ||
191 | SchedulerNode src; | ||
192 | SchedulerNode src_sched; | ||
193 | |||
194 | unsigned *sub_heartbeat_dst; | ||
195 | unsigned nb_sub_heartbeat_dst; | ||
196 | |||
197 | PreMuxQueue pre_mux_queue; | ||
198 | |||
199 | // an EOF was generated while flushing the pre-mux queue | ||
200 | int init_eof; | ||
201 | |||
202 | //////////////////////////////////////////////////////////// | ||
203 | // The following are protected by Scheduler.schedule_lock // | ||
204 | |||
205 | /* dts+duration of the last packet sent to this stream | ||
206 | in AV_TIME_BASE_Q */ | ||
207 | int64_t last_dts; | ||
208 | // this stream no longer accepts input | ||
209 | int source_finished; | ||
210 | //////////////////////////////////////////////////////////// | ||
211 | } SchMuxStream; | ||
212 | |||
213 | typedef struct SchMux { | ||
214 | const AVClass *class; | ||
215 | |||
216 | SchMuxStream *streams; | ||
217 | unsigned nb_streams; | ||
218 | unsigned nb_streams_ready; | ||
219 | |||
220 | int (*init)(void *arg); | ||
221 | |||
222 | SchTask task; | ||
223 | /** | ||
224 | * Set to 1 after starting the muxer task and flushing the | ||
225 | * pre-muxing queues. | ||
226 | * Set either before any tasks have started, or with | ||
227 | * Scheduler.mux_ready_lock held. | ||
228 | */ | ||
229 | atomic_int mux_started; | ||
230 | ThreadQueue *queue; | ||
231 | unsigned queue_size; | ||
232 | |||
233 | AVPacket *sub_heartbeat_pkt; | ||
234 | } SchMux; | ||
235 | |||
236 | typedef struct SchFilterIn { | ||
237 | SchedulerNode src; | ||
238 | SchedulerNode src_sched; | ||
239 | int send_finished; | ||
240 | int receive_finished; | ||
241 | } SchFilterIn; | ||
242 | |||
243 | typedef struct SchFilterOut { | ||
244 | SchedulerNode dst; | ||
245 | } SchFilterOut; | ||
246 | |||
247 | typedef struct SchFilterGraph { | ||
248 | const AVClass *class; | ||
249 | |||
250 | SchFilterIn *inputs; | ||
251 | unsigned nb_inputs; | ||
252 | atomic_uint nb_inputs_finished_send; | ||
253 | unsigned nb_inputs_finished_receive; | ||
254 | |||
255 | SchFilterOut *outputs; | ||
256 | unsigned nb_outputs; | ||
257 | |||
258 | SchTask task; | ||
259 | // input queue, nb_inputs+1 streams | ||
260 | // last stream is control | ||
261 | ThreadQueue *queue; | ||
262 | SchWaiter waiter; | ||
263 | |||
264 | // protected by schedule_lock | ||
265 | unsigned best_input; | ||
266 | int task_exited; | ||
267 | } SchFilterGraph; | ||
268 | |||
269 | enum SchedulerState { | ||
270 | SCH_STATE_UNINIT, | ||
271 | SCH_STATE_STARTED, | ||
272 | SCH_STATE_STOPPED, | ||
273 | }; | ||
274 | |||
275 | struct Scheduler { | ||
276 | const AVClass *class; | ||
277 | |||
278 | SchDemux *demux; | ||
279 | unsigned nb_demux; | ||
280 | |||
281 | SchMux *mux; | ||
282 | unsigned nb_mux; | ||
283 | |||
284 | unsigned nb_mux_ready; | ||
285 | pthread_mutex_t mux_ready_lock; | ||
286 | |||
287 | unsigned nb_mux_done; | ||
288 | pthread_mutex_t mux_done_lock; | ||
289 | pthread_cond_t mux_done_cond; | ||
290 | |||
291 | |||
292 | SchDec *dec; | ||
293 | unsigned nb_dec; | ||
294 | |||
295 | SchEnc *enc; | ||
296 | unsigned nb_enc; | ||
297 | |||
298 | SchSyncQueue *sq_enc; | ||
299 | unsigned nb_sq_enc; | ||
300 | |||
301 | SchFilterGraph *filters; | ||
302 | unsigned nb_filters; | ||
303 | |||
304 | char *sdp_filename; | ||
305 | int sdp_auto; | ||
306 | |||
307 | enum SchedulerState state; | ||
308 | atomic_int terminate; | ||
309 | atomic_int task_failed; | ||
310 | |||
311 | pthread_mutex_t schedule_lock; | ||
312 | |||
313 | atomic_int_least64_t last_dts; | ||
314 | }; | ||
315 | |||
316 | /** | ||
317 | * Wait until this task is allowed to proceed. | ||
318 | * | ||
319 | * @retval 0 the caller should proceed | ||
320 | * @retval 1 the caller should terminate | ||
321 | */ | ||
322 | 464714 | static int waiter_wait(Scheduler *sch, SchWaiter *w) | |
323 | { | ||
324 | int terminate; | ||
325 | |||
326 |
2/2✓ Branch 0 taken 464458 times.
✓ Branch 1 taken 256 times.
|
464714 | if (!atomic_load(&w->choked)) |
327 | 464458 | return 0; | |
328 | |||
329 | 256 | pthread_mutex_lock(&w->lock); | |
330 | |||
331 |
4/4✓ Branch 0 taken 262 times.
✓ Branch 1 taken 199 times.
✓ Branch 2 taken 205 times.
✓ Branch 3 taken 57 times.
|
461 | while (atomic_load(&w->choked) && !atomic_load(&sch->terminate)) |
332 | 205 | pthread_cond_wait(&w->cond, &w->lock); | |
333 | |||
334 | 256 | terminate = atomic_load(&sch->terminate); | |
335 | |||
336 | 256 | pthread_mutex_unlock(&w->lock); | |
337 | |||
338 | 256 | return terminate; | |
339 | } | ||
340 | |||
341 | 30620 | static void waiter_set(SchWaiter *w, int choked) | |
342 | { | ||
343 | 30620 | pthread_mutex_lock(&w->lock); | |
344 | |||
345 | 30620 | atomic_store(&w->choked, choked); | |
346 | 30620 | pthread_cond_signal(&w->cond); | |
347 | |||
348 | 30620 | pthread_mutex_unlock(&w->lock); | |
349 | 30620 | } | |
350 | |||
351 | 13285 | static int waiter_init(SchWaiter *w) | |
352 | { | ||
353 | int ret; | ||
354 | |||
355 | 13285 | atomic_init(&w->choked, 0); | |
356 | |||
357 | 13285 | ret = pthread_mutex_init(&w->lock, NULL); | |
358 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 13285 times.
|
13285 | if (ret) |
359 | ✗ | return AVERROR(ret); | |
360 | |||
361 | 13285 | ret = pthread_cond_init(&w->cond, NULL); | |
362 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 13285 times.
|
13285 | if (ret) |
363 | ✗ | return AVERROR(ret); | |
364 | |||
365 | 13285 | return 0; | |
366 | } | ||
367 | |||
368 | 13285 | static void waiter_uninit(SchWaiter *w) | |
369 | { | ||
370 | 13285 | pthread_mutex_destroy(&w->lock); | |
371 | 13285 | pthread_cond_destroy(&w->cond); | |
372 | 13285 | } | |
373 | |||
374 | 26291 | static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size, | |
375 | enum QueueType type) | ||
376 | { | ||
377 | ThreadQueue *tq; | ||
378 | ObjPool *op; | ||
379 | |||
380 |
1/2✓ Branch 0 taken 26291 times.
✗ Branch 1 not taken.
|
26291 | if (queue_size <= 0) { |
381 |
2/2✓ Branch 0 taken 13037 times.
✓ Branch 1 taken 13254 times.
|
26291 | if (type == QUEUE_FRAMES) |
382 | 13037 | queue_size = DEFAULT_FRAME_THREAD_QUEUE_SIZE; | |
383 | else | ||
384 | 13254 | queue_size = DEFAULT_PACKET_THREAD_QUEUE_SIZE; | |
385 | } | ||
386 | |||
387 |
2/2✓ Branch 0 taken 13037 times.
✓ Branch 1 taken 13254 times.
|
26291 | if (type == QUEUE_FRAMES) { |
388 | // This queue length is used in the decoder code to ensure that | ||
389 | // there are enough entries in fixed-size frame pools to account | ||
390 | // for frames held in queues inside the ffmpeg utility. If this | ||
391 | // can ever dynamically change then the corresponding decode | ||
392 | // code needs to be updated as well. | ||
393 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 13037 times.
|
13037 | av_assert0(queue_size == DEFAULT_FRAME_THREAD_QUEUE_SIZE); |
394 | } | ||
395 | |||
396 |
2/2✓ Branch 0 taken 13254 times.
✓ Branch 1 taken 13037 times.
|
26291 | op = (type == QUEUE_PACKETS) ? objpool_alloc_packets() : |
397 | 13037 | objpool_alloc_frames(); | |
398 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 26291 times.
|
26291 | if (!op) |
399 | ✗ | return AVERROR(ENOMEM); | |
400 | |||
401 |
2/2✓ Branch 0 taken 13254 times.
✓ Branch 1 taken 13037 times.
|
26291 | tq = tq_alloc(nb_streams, queue_size, op, |
402 | (type == QUEUE_PACKETS) ? pkt_move : frame_move); | ||
403 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 26291 times.
|
26291 | if (!tq) { |
404 | ✗ | objpool_free(&op); | |
405 | ✗ | return AVERROR(ENOMEM); | |
406 | } | ||
407 | |||
408 | 26291 | *ptq = tq; | |
409 | 26291 | return 0; | |
410 | } | ||
411 | |||
412 | static void *task_wrapper(void *arg); | ||
413 | |||
414 | 33123 | static int task_start(SchTask *task) | |
415 | { | ||
416 | int ret; | ||
417 | |||
418 | 33123 | av_log(task->func_arg, AV_LOG_VERBOSE, "Starting thread...\n"); | |
419 | |||
420 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 33123 times.
|
33123 | av_assert0(!task->thread_running); |
421 | |||
422 | 33123 | ret = pthread_create(&task->thread, NULL, task_wrapper, task); | |
423 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 33123 times.
|
33123 | if (ret) { |
424 | ✗ | av_log(task->func_arg, AV_LOG_ERROR, "pthread_create() failed: %s\n", | |
425 | strerror(ret)); | ||
426 | ✗ | return AVERROR(ret); | |
427 | } | ||
428 | |||
429 | 33123 | task->thread_running = 1; | |
430 | 33123 | return 0; | |
431 | } | ||
432 | |||
433 | 33137 | static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx, | |
434 | SchThreadFunc func, void *func_arg) | ||
435 | { | ||
436 | 33137 | task->parent = sch; | |
437 | |||
438 | 33137 | task->node.type = type; | |
439 | 33137 | task->node.idx = idx; | |
440 | |||
441 | 33137 | task->func = func; | |
442 | 33137 | task->func_arg = func_arg; | |
443 | 33137 | } | |
444 | |||
445 | 509033 | static int64_t trailing_dts(const Scheduler *sch, int count_finished) | |
446 | { | ||
447 | 509033 | int64_t min_dts = INT64_MAX; | |
448 | |||
449 |
2/2✓ Branch 0 taken 509288 times.
✓ Branch 1 taken 495137 times.
|
1004425 | for (unsigned i = 0; i < sch->nb_mux; i++) { |
450 | 509288 | const SchMux *mux = &sch->mux[i]; | |
451 | |||
452 |
2/2✓ Branch 0 taken 555576 times.
✓ Branch 1 taken 495392 times.
|
1050968 | for (unsigned j = 0; j < mux->nb_streams; j++) { |
453 | 555576 | const SchMuxStream *ms = &mux->streams[j]; | |
454 | |||
455 |
4/4✓ Branch 0 taken 40577 times.
✓ Branch 1 taken 514999 times.
✓ Branch 2 taken 33355 times.
✓ Branch 3 taken 7222 times.
|
555576 | if (ms->source_finished && !count_finished) |
456 | 33355 | continue; | |
457 |
2/2✓ Branch 0 taken 13896 times.
✓ Branch 1 taken 508325 times.
|
522221 | if (ms->last_dts == AV_NOPTS_VALUE) |
458 | 13896 | return AV_NOPTS_VALUE; | |
459 | |||
460 | 508325 | min_dts = FFMIN(min_dts, ms->last_dts); | |
461 | } | ||
462 | } | ||
463 | |||
464 |
2/2✓ Branch 0 taken 470038 times.
✓ Branch 1 taken 25099 times.
|
495137 | return min_dts == INT64_MAX ? AV_NOPTS_VALUE : min_dts; |
465 | } | ||
466 | |||
467 | 6817 | void sch_free(Scheduler **psch) | |
468 | { | ||
469 | 6817 | Scheduler *sch = *psch; | |
470 | |||
471 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6817 times.
|
6817 | if (!sch) |
472 | ✗ | return; | |
473 | |||
474 | 6817 | sch_stop(sch, NULL); | |
475 | |||
476 |
2/2✓ Branch 0 taken 6846 times.
✓ Branch 1 taken 6817 times.
|
13663 | for (unsigned i = 0; i < sch->nb_demux; i++) { |
477 | 6846 | SchDemux *d = &sch->demux[i]; | |
478 | |||
479 |
2/2✓ Branch 0 taken 7072 times.
✓ Branch 1 taken 6846 times.
|
13918 | for (unsigned j = 0; j < d->nb_streams; j++) { |
480 | 7072 | SchDemuxStream *ds = &d->streams[j]; | |
481 | 7072 | av_freep(&ds->dst); | |
482 | 7072 | av_freep(&ds->dst_finished); | |
483 | } | ||
484 | 6846 | av_freep(&d->streams); | |
485 | |||
486 | 6846 | av_packet_free(&d->send_pkt); | |
487 | |||
488 | 6846 | waiter_uninit(&d->waiter); | |
489 | } | ||
490 | 6817 | av_freep(&sch->demux); | |
491 | |||
492 |
2/2✓ Branch 0 taken 6818 times.
✓ Branch 1 taken 6817 times.
|
13635 | for (unsigned i = 0; i < sch->nb_mux; i++) { |
493 | 6818 | SchMux *mux = &sch->mux[i]; | |
494 | |||
495 |
2/2✓ Branch 0 taken 7258 times.
✓ Branch 1 taken 6818 times.
|
14076 | for (unsigned j = 0; j < mux->nb_streams; j++) { |
496 | 7258 | SchMuxStream *ms = &mux->streams[j]; | |
497 | |||
498 |
1/2✓ Branch 0 taken 7258 times.
✗ Branch 1 not taken.
|
7258 | if (ms->pre_mux_queue.fifo) { |
499 | AVPacket *pkt; | ||
500 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 7258 times.
|
7258 | while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0) |
501 | ✗ | av_packet_free(&pkt); | |
502 | 7258 | av_fifo_freep2(&ms->pre_mux_queue.fifo); | |
503 | } | ||
504 | |||
505 | 7258 | av_freep(&ms->sub_heartbeat_dst); | |
506 | } | ||
507 | 6818 | av_freep(&mux->streams); | |
508 | |||
509 | 6818 | av_packet_free(&mux->sub_heartbeat_pkt); | |
510 | |||
511 | 6818 | tq_free(&mux->queue); | |
512 | } | ||
513 | 6817 | av_freep(&sch->mux); | |
514 | |||
515 |
2/2✓ Branch 0 taken 6436 times.
✓ Branch 1 taken 6817 times.
|
13253 | for (unsigned i = 0; i < sch->nb_dec; i++) { |
516 | 6436 | SchDec *dec = &sch->dec[i]; | |
517 | |||
518 | 6436 | tq_free(&dec->queue); | |
519 | |||
520 | 6436 | av_thread_message_queue_free(&dec->queue_end_ts); | |
521 | |||
522 |
2/2✓ Branch 0 taken 6442 times.
✓ Branch 1 taken 6436 times.
|
12878 | for (unsigned j = 0; j < dec->nb_outputs; j++) { |
523 | 6442 | SchDecOutput *o = &dec->outputs[j]; | |
524 | |||
525 | 6442 | av_freep(&o->dst); | |
526 | 6442 | av_freep(&o->dst_finished); | |
527 | } | ||
528 | |||
529 | 6436 | av_freep(&dec->outputs); | |
530 | |||
531 | 6436 | av_frame_free(&dec->send_frame); | |
532 | } | ||
533 | 6817 | av_freep(&sch->dec); | |
534 | |||
535 |
2/2✓ Branch 0 taken 6598 times.
✓ Branch 1 taken 6817 times.
|
13415 | for (unsigned i = 0; i < sch->nb_enc; i++) { |
536 | 6598 | SchEnc *enc = &sch->enc[i]; | |
537 | |||
538 | 6598 | tq_free(&enc->queue); | |
539 | |||
540 | 6598 | av_packet_free(&enc->send_pkt); | |
541 | |||
542 | 6598 | av_freep(&enc->dst); | |
543 | 6598 | av_freep(&enc->dst_finished); | |
544 | } | ||
545 | 6817 | av_freep(&sch->enc); | |
546 | |||
547 |
2/2✓ Branch 0 taken 2777 times.
✓ Branch 1 taken 6817 times.
|
9594 | for (unsigned i = 0; i < sch->nb_sq_enc; i++) { |
548 | 2777 | SchSyncQueue *sq = &sch->sq_enc[i]; | |
549 | 2777 | sq_free(&sq->sq); | |
550 | 2777 | av_frame_free(&sq->frame); | |
551 | 2777 | pthread_mutex_destroy(&sq->lock); | |
552 | 2777 | av_freep(&sq->enc_idx); | |
553 | } | ||
554 | 6817 | av_freep(&sch->sq_enc); | |
555 | |||
556 |
2/2✓ Branch 0 taken 6439 times.
✓ Branch 1 taken 6817 times.
|
13256 | for (unsigned i = 0; i < sch->nb_filters; i++) { |
557 | 6439 | SchFilterGraph *fg = &sch->filters[i]; | |
558 | |||
559 | 6439 | tq_free(&fg->queue); | |
560 | |||
561 | 6439 | av_freep(&fg->inputs); | |
562 | 6439 | av_freep(&fg->outputs); | |
563 | |||
564 | 6439 | waiter_uninit(&fg->waiter); | |
565 | } | ||
566 | 6817 | av_freep(&sch->filters); | |
567 | |||
568 | 6817 | av_freep(&sch->sdp_filename); | |
569 | |||
570 | 6817 | pthread_mutex_destroy(&sch->schedule_lock); | |
571 | |||
572 | 6817 | pthread_mutex_destroy(&sch->mux_ready_lock); | |
573 | |||
574 | 6817 | pthread_mutex_destroy(&sch->mux_done_lock); | |
575 | 6817 | pthread_cond_destroy(&sch->mux_done_cond); | |
576 | |||
577 | 6817 | av_freep(psch); | |
578 | } | ||
579 | |||
580 | static const AVClass scheduler_class = { | ||
581 | .class_name = "Scheduler", | ||
582 | .version = LIBAVUTIL_VERSION_INT, | ||
583 | }; | ||
584 | |||
585 | 6817 | Scheduler *sch_alloc(void) | |
586 | { | ||
587 | Scheduler *sch; | ||
588 | int ret; | ||
589 | |||
590 | 6817 | sch = av_mallocz(sizeof(*sch)); | |
591 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6817 times.
|
6817 | if (!sch) |
592 | ✗ | return NULL; | |
593 | |||
594 | 6817 | sch->class = &scheduler_class; | |
595 | 6817 | sch->sdp_auto = 1; | |
596 | |||
597 | 6817 | ret = pthread_mutex_init(&sch->schedule_lock, NULL); | |
598 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6817 times.
|
6817 | if (ret) |
599 | ✗ | goto fail; | |
600 | |||
601 | 6817 | ret = pthread_mutex_init(&sch->mux_ready_lock, NULL); | |
602 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6817 times.
|
6817 | if (ret) |
603 | ✗ | goto fail; | |
604 | |||
605 | 6817 | ret = pthread_mutex_init(&sch->mux_done_lock, NULL); | |
606 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6817 times.
|
6817 | if (ret) |
607 | ✗ | goto fail; | |
608 | |||
609 | 6817 | ret = pthread_cond_init(&sch->mux_done_cond, NULL); | |
610 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6817 times.
|
6817 | if (ret) |
611 | ✗ | goto fail; | |
612 | |||
613 | 6817 | return sch; | |
614 | ✗ | fail: | |
615 | ✗ | sch_free(&sch); | |
616 | ✗ | return NULL; | |
617 | } | ||
618 | |||
619 | ✗ | int sch_sdp_filename(Scheduler *sch, const char *sdp_filename) | |
620 | { | ||
621 | ✗ | av_freep(&sch->sdp_filename); | |
622 | ✗ | sch->sdp_filename = av_strdup(sdp_filename); | |
623 | ✗ | return sch->sdp_filename ? 0 : AVERROR(ENOMEM); | |
624 | } | ||
625 | |||
626 | static const AVClass sch_mux_class = { | ||
627 | .class_name = "SchMux", | ||
628 | .version = LIBAVUTIL_VERSION_INT, | ||
629 | .parent_log_context_offset = offsetof(SchMux, task.func_arg), | ||
630 | }; | ||
631 | |||
632 | 6818 | int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *), | |
633 | void *arg, int sdp_auto, unsigned thread_queue_size) | ||
634 | { | ||
635 | 6818 | const unsigned idx = sch->nb_mux; | |
636 | |||
637 | SchMux *mux; | ||
638 | int ret; | ||
639 | |||
640 | 6818 | ret = GROW_ARRAY(sch->mux, sch->nb_mux); | |
641 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6818 times.
|
6818 | if (ret < 0) |
642 | ✗ | return ret; | |
643 | |||
644 | 6818 | mux = &sch->mux[idx]; | |
645 | 6818 | mux->class = &sch_mux_class; | |
646 | 6818 | mux->init = init; | |
647 | 6818 | mux->queue_size = thread_queue_size; | |
648 | |||
649 | 6818 | task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg); | |
650 | |||
651 | 6818 | sch->sdp_auto &= sdp_auto; | |
652 | |||
653 | 6818 | return idx; | |
654 | } | ||
655 | |||
656 | 7258 | int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx) | |
657 | { | ||
658 | SchMux *mux; | ||
659 | SchMuxStream *ms; | ||
660 | unsigned stream_idx; | ||
661 | int ret; | ||
662 | |||
663 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7258 times.
|
7258 | av_assert0(mux_idx < sch->nb_mux); |
664 | 7258 | mux = &sch->mux[mux_idx]; | |
665 | |||
666 | 7258 | ret = GROW_ARRAY(mux->streams, mux->nb_streams); | |
667 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7258 times.
|
7258 | if (ret < 0) |
668 | ✗ | return ret; | |
669 | 7258 | stream_idx = mux->nb_streams - 1; | |
670 | |||
671 | 7258 | ms = &mux->streams[stream_idx]; | |
672 | |||
673 | 7258 | ms->pre_mux_queue.fifo = av_fifo_alloc2(8, sizeof(AVPacket*), 0); | |
674 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7258 times.
|
7258 | if (!ms->pre_mux_queue.fifo) |
675 | ✗ | return AVERROR(ENOMEM); | |
676 | |||
677 | 7258 | ms->last_dts = AV_NOPTS_VALUE; | |
678 | |||
679 | 7258 | return stream_idx; | |
680 | } | ||
681 | |||
682 | static const AVClass sch_demux_class = { | ||
683 | .class_name = "SchDemux", | ||
684 | .version = LIBAVUTIL_VERSION_INT, | ||
685 | .parent_log_context_offset = offsetof(SchDemux, task.func_arg), | ||
686 | }; | ||
687 | |||
688 | 6846 | int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *ctx) | |
689 | { | ||
690 | 6846 | const unsigned idx = sch->nb_demux; | |
691 | |||
692 | SchDemux *d; | ||
693 | int ret; | ||
694 | |||
695 | 6846 | ret = GROW_ARRAY(sch->demux, sch->nb_demux); | |
696 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6846 times.
|
6846 | if (ret < 0) |
697 | ✗ | return ret; | |
698 | |||
699 | 6846 | d = &sch->demux[idx]; | |
700 | |||
701 | 6846 | task_init(sch, &d->task, SCH_NODE_TYPE_DEMUX, idx, func, ctx); | |
702 | |||
703 | 6846 | d->class = &sch_demux_class; | |
704 | 6846 | d->send_pkt = av_packet_alloc(); | |
705 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6846 times.
|
6846 | if (!d->send_pkt) |
706 | ✗ | return AVERROR(ENOMEM); | |
707 | |||
708 | 6846 | ret = waiter_init(&d->waiter); | |
709 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6846 times.
|
6846 | if (ret < 0) |
710 | ✗ | return ret; | |
711 | |||
712 | 6846 | return idx; | |
713 | } | ||
714 | |||
715 | 7072 | int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx) | |
716 | { | ||
717 | SchDemux *d; | ||
718 | int ret; | ||
719 | |||
720 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7072 times.
|
7072 | av_assert0(demux_idx < sch->nb_demux); |
721 | 7072 | d = &sch->demux[demux_idx]; | |
722 | |||
723 | 7072 | ret = GROW_ARRAY(d->streams, d->nb_streams); | |
724 |
1/2✓ Branch 0 taken 7072 times.
✗ Branch 1 not taken.
|
7072 | return ret < 0 ? ret : d->nb_streams - 1; |
725 | } | ||
726 | |||
727 | 6442 | int sch_add_dec_output(Scheduler *sch, unsigned dec_idx) | |
728 | { | ||
729 | SchDec *dec; | ||
730 | int ret; | ||
731 | |||
732 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6442 times.
|
6442 | av_assert0(dec_idx < sch->nb_dec); |
733 | 6442 | dec = &sch->dec[dec_idx]; | |
734 | |||
735 | 6442 | ret = GROW_ARRAY(dec->outputs, dec->nb_outputs); | |
736 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6442 times.
|
6442 | if (ret < 0) |
737 | ✗ | return ret; | |
738 | |||
739 | 6442 | return dec->nb_outputs - 1; | |
740 | } | ||
741 | |||
742 | static const AVClass sch_dec_class = { | ||
743 | .class_name = "SchDec", | ||
744 | .version = LIBAVUTIL_VERSION_INT, | ||
745 | .parent_log_context_offset = offsetof(SchDec, task.func_arg), | ||
746 | }; | ||
747 | |||
748 | 6436 | int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts) | |
749 | { | ||
750 | 6436 | const unsigned idx = sch->nb_dec; | |
751 | |||
752 | SchDec *dec; | ||
753 | int ret; | ||
754 | |||
755 | 6436 | ret = GROW_ARRAY(sch->dec, sch->nb_dec); | |
756 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6436 times.
|
6436 | if (ret < 0) |
757 | ✗ | return ret; | |
758 | |||
759 | 6436 | dec = &sch->dec[idx]; | |
760 | |||
761 | 6436 | task_init(sch, &dec->task, SCH_NODE_TYPE_DEC, idx, func, ctx); | |
762 | |||
763 | 6436 | dec->class = &sch_dec_class; | |
764 | 6436 | dec->send_frame = av_frame_alloc(); | |
765 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6436 times.
|
6436 | if (!dec->send_frame) |
766 | ✗ | return AVERROR(ENOMEM); | |
767 | |||
768 | 6436 | ret = sch_add_dec_output(sch, idx); | |
769 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6436 times.
|
6436 | if (ret < 0) |
770 | ✗ | return ret; | |
771 | |||
772 | 6436 | ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS); | |
773 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6436 times.
|
6436 | if (ret < 0) |
774 | ✗ | return ret; | |
775 | |||
776 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 6435 times.
|
6436 | if (send_end_ts) { |
777 | 1 | ret = av_thread_message_queue_alloc(&dec->queue_end_ts, 1, sizeof(Timestamp)); | |
778 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (ret < 0) |
779 | ✗ | return ret; | |
780 | } | ||
781 | |||
782 | 6436 | return idx; | |
783 | } | ||
784 | |||
785 | static const AVClass sch_enc_class = { | ||
786 | .class_name = "SchEnc", | ||
787 | .version = LIBAVUTIL_VERSION_INT, | ||
788 | .parent_log_context_offset = offsetof(SchEnc, task.func_arg), | ||
789 | }; | ||
790 | |||
791 | 6598 | int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx, | |
792 | int (*open_cb)(void *opaque, const AVFrame *frame)) | ||
793 | { | ||
794 | 6598 | const unsigned idx = sch->nb_enc; | |
795 | |||
796 | SchEnc *enc; | ||
797 | int ret; | ||
798 | |||
799 | 6598 | ret = GROW_ARRAY(sch->enc, sch->nb_enc); | |
800 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6598 times.
|
6598 | if (ret < 0) |
801 | ✗ | return ret; | |
802 | |||
803 | 6598 | enc = &sch->enc[idx]; | |
804 | |||
805 | 6598 | enc->class = &sch_enc_class; | |
806 | 6598 | enc->open_cb = open_cb; | |
807 | 6598 | enc->sq_idx[0] = -1; | |
808 | 6598 | enc->sq_idx[1] = -1; | |
809 | |||
810 | 6598 | task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx); | |
811 | |||
812 | 6598 | enc->send_pkt = av_packet_alloc(); | |
813 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6598 times.
|
6598 | if (!enc->send_pkt) |
814 | ✗ | return AVERROR(ENOMEM); | |
815 | |||
816 | 6598 | ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES); | |
817 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6598 times.
|
6598 | if (ret < 0) |
818 | ✗ | return ret; | |
819 | |||
820 | 6598 | return idx; | |
821 | } | ||
822 | |||
823 | static const AVClass sch_fg_class = { | ||
824 | .class_name = "SchFilterGraph", | ||
825 | .version = LIBAVUTIL_VERSION_INT, | ||
826 | .parent_log_context_offset = offsetof(SchFilterGraph, task.func_arg), | ||
827 | }; | ||
828 | |||
829 | 6439 | int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs, | |
830 | SchThreadFunc func, void *ctx) | ||
831 | { | ||
832 | 6439 | const unsigned idx = sch->nb_filters; | |
833 | |||
834 | SchFilterGraph *fg; | ||
835 | int ret; | ||
836 | |||
837 | 6439 | ret = GROW_ARRAY(sch->filters, sch->nb_filters); | |
838 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6439 times.
|
6439 | if (ret < 0) |
839 | ✗ | return ret; | |
840 | 6439 | fg = &sch->filters[idx]; | |
841 | |||
842 | 6439 | fg->class = &sch_fg_class; | |
843 | |||
844 | 6439 | task_init(sch, &fg->task, SCH_NODE_TYPE_FILTER_IN, idx, func, ctx); | |
845 | |||
846 |
2/2✓ Branch 0 taken 6380 times.
✓ Branch 1 taken 59 times.
|
6439 | if (nb_inputs) { |
847 | 6380 | fg->inputs = av_calloc(nb_inputs, sizeof(*fg->inputs)); | |
848 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6380 times.
|
6380 | if (!fg->inputs) |
849 | ✗ | return AVERROR(ENOMEM); | |
850 | 6380 | fg->nb_inputs = nb_inputs; | |
851 | } | ||
852 | |||
853 |
1/2✓ Branch 0 taken 6439 times.
✗ Branch 1 not taken.
|
6439 | if (nb_outputs) { |
854 | 6439 | fg->outputs = av_calloc(nb_outputs, sizeof(*fg->outputs)); | |
855 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6439 times.
|
6439 | if (!fg->outputs) |
856 | ✗ | return AVERROR(ENOMEM); | |
857 | 6439 | fg->nb_outputs = nb_outputs; | |
858 | } | ||
859 | |||
860 | 6439 | ret = waiter_init(&fg->waiter); | |
861 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6439 times.
|
6439 | if (ret < 0) |
862 | ✗ | return ret; | |
863 | |||
864 | 6439 | ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES); | |
865 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6439 times.
|
6439 | if (ret < 0) |
866 | ✗ | return ret; | |
867 | |||
868 | 6439 | return idx; | |
869 | } | ||
870 | |||
871 | 2777 | int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx) | |
872 | { | ||
873 | SchSyncQueue *sq; | ||
874 | int ret; | ||
875 | |||
876 | 2777 | ret = GROW_ARRAY(sch->sq_enc, sch->nb_sq_enc); | |
877 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2777 times.
|
2777 | if (ret < 0) |
878 | ✗ | return ret; | |
879 | 2777 | sq = &sch->sq_enc[sch->nb_sq_enc - 1]; | |
880 | |||
881 | 2777 | sq->sq = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, logctx); | |
882 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2777 times.
|
2777 | if (!sq->sq) |
883 | ✗ | return AVERROR(ENOMEM); | |
884 | |||
885 | 2777 | sq->frame = av_frame_alloc(); | |
886 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2777 times.
|
2777 | if (!sq->frame) |
887 | ✗ | return AVERROR(ENOMEM); | |
888 | |||
889 | 2777 | ret = pthread_mutex_init(&sq->lock, NULL); | |
890 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2777 times.
|
2777 | if (ret) |
891 | ✗ | return AVERROR(ret); | |
892 | |||
893 | 2777 | return sq - sch->sq_enc; | |
894 | } | ||
895 | |||
896 | 2830 | int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx, | |
897 | int limiting, uint64_t max_frames) | ||
898 | { | ||
899 | SchSyncQueue *sq; | ||
900 | SchEnc *enc; | ||
901 | int ret; | ||
902 | |||
903 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2830 times.
|
2830 | av_assert0(sq_idx < sch->nb_sq_enc); |
904 | 2830 | sq = &sch->sq_enc[sq_idx]; | |
905 | |||
906 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2830 times.
|
2830 | av_assert0(enc_idx < sch->nb_enc); |
907 | 2830 | enc = &sch->enc[enc_idx]; | |
908 | |||
909 | 2830 | ret = GROW_ARRAY(sq->enc_idx, sq->nb_enc_idx); | |
910 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2830 times.
|
2830 | if (ret < 0) |
911 | ✗ | return ret; | |
912 | 2830 | sq->enc_idx[sq->nb_enc_idx - 1] = enc_idx; | |
913 | |||
914 | 2830 | ret = sq_add_stream(sq->sq, limiting); | |
915 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2830 times.
|
2830 | if (ret < 0) |
916 | ✗ | return ret; | |
917 | |||
918 | 2830 | enc->sq_idx[0] = sq_idx; | |
919 | 2830 | enc->sq_idx[1] = ret; | |
920 | |||
921 |
2/2✓ Branch 0 taken 2651 times.
✓ Branch 1 taken 179 times.
|
2830 | if (max_frames != INT64_MAX) |
922 | 2651 | sq_limit_frames(sq->sq, enc->sq_idx[1], max_frames); | |
923 | |||
924 | 2830 | return 0; | |
925 | } | ||
926 | |||
927 | 26748 | int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst) | |
928 | { | ||
929 | int ret; | ||
930 | |||
931 |
4/5✓ Branch 0 taken 7095 times.
✓ Branch 1 taken 6494 times.
✓ Branch 2 taken 6560 times.
✓ Branch 3 taken 6599 times.
✗ Branch 4 not taken.
|
26748 | switch (src.type) { |
932 | 7095 | case SCH_NODE_TYPE_DEMUX: { | |
933 | SchDemuxStream *ds; | ||
934 | |||
935 |
2/4✓ Branch 0 taken 7095 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 7095 times.
|
7095 | av_assert0(src.idx < sch->nb_demux && |
936 | src.idx_stream < sch->demux[src.idx].nb_streams); | ||
937 | 7095 | ds = &sch->demux[src.idx].streams[src.idx_stream]; | |
938 | |||
939 | 7095 | ret = GROW_ARRAY(ds->dst, ds->nb_dst); | |
940 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7095 times.
|
7095 | if (ret < 0) |
941 | ✗ | return ret; | |
942 | |||
943 | 7095 | ds->dst[ds->nb_dst - 1] = dst; | |
944 | |||
945 | // demuxed packets go to decoding or streamcopy | ||
946 |
2/3✓ Branch 0 taken 6435 times.
✓ Branch 1 taken 660 times.
✗ Branch 2 not taken.
|
7095 | switch (dst.type) { |
947 | 6435 | case SCH_NODE_TYPE_DEC: { | |
948 | SchDec *dec; | ||
949 | |||
950 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6435 times.
|
6435 | av_assert0(dst.idx < sch->nb_dec); |
951 | 6435 | dec = &sch->dec[dst.idx]; | |
952 | |||
953 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6435 times.
|
6435 | av_assert0(!dec->src.type); |
954 | 6435 | dec->src = src; | |
955 | 6435 | break; | |
956 | } | ||
957 | 660 | case SCH_NODE_TYPE_MUX: { | |
958 | SchMuxStream *ms; | ||
959 | |||
960 |
2/4✓ Branch 0 taken 660 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 660 times.
|
660 | av_assert0(dst.idx < sch->nb_mux && |
961 | dst.idx_stream < sch->mux[dst.idx].nb_streams); | ||
962 | 660 | ms = &sch->mux[dst.idx].streams[dst.idx_stream]; | |
963 | |||
964 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 660 times.
|
660 | av_assert0(!ms->src.type); |
965 | 660 | ms->src = src; | |
966 | |||
967 | 660 | break; | |
968 | } | ||
969 | ✗ | default: av_assert0(0); | |
970 | } | ||
971 | |||
972 | 7095 | break; | |
973 | } | ||
974 | 6494 | case SCH_NODE_TYPE_DEC: { | |
975 | SchDec *dec; | ||
976 | SchDecOutput *o; | ||
977 | |||
978 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6494 times.
|
6494 | av_assert0(src.idx < sch->nb_dec); |
979 | 6494 | dec = &sch->dec[src.idx]; | |
980 | |||
981 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6494 times.
|
6494 | av_assert0(src.idx_stream < dec->nb_outputs); |
982 | 6494 | o = &dec->outputs[src.idx_stream]; | |
983 | |||
984 | 6494 | ret = GROW_ARRAY(o->dst, o->nb_dst); | |
985 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6494 times.
|
6494 | if (ret < 0) |
986 | ✗ | return ret; | |
987 | |||
988 | 6494 | o->dst[o->nb_dst - 1] = dst; | |
989 | |||
990 | // decoded frames go to filters or encoding | ||
991 |
2/3✓ Branch 0 taken 6456 times.
✓ Branch 1 taken 38 times.
✗ Branch 2 not taken.
|
6494 | switch (dst.type) { |
992 | 6456 | case SCH_NODE_TYPE_FILTER_IN: { | |
993 | SchFilterIn *fi; | ||
994 | |||
995 |
2/4✓ Branch 0 taken 6456 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6456 times.
|
6456 | av_assert0(dst.idx < sch->nb_filters && |
996 | dst.idx_stream < sch->filters[dst.idx].nb_inputs); | ||
997 | 6456 | fi = &sch->filters[dst.idx].inputs[dst.idx_stream]; | |
998 | |||
999 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6456 times.
|
6456 | av_assert0(!fi->src.type); |
1000 | 6456 | fi->src = src; | |
1001 | 6456 | break; | |
1002 | } | ||
1003 | 38 | case SCH_NODE_TYPE_ENC: { | |
1004 | SchEnc *enc; | ||
1005 | |||
1006 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 38 times.
|
38 | av_assert0(dst.idx < sch->nb_enc); |
1007 | 38 | enc = &sch->enc[dst.idx]; | |
1008 | |||
1009 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 38 times.
|
38 | av_assert0(!enc->src.type); |
1010 | 38 | enc->src = src; | |
1011 | 38 | break; | |
1012 | } | ||
1013 | ✗ | default: av_assert0(0); | |
1014 | } | ||
1015 | |||
1016 | 6494 | break; | |
1017 | } | ||
1018 | 6560 | case SCH_NODE_TYPE_FILTER_OUT: { | |
1019 | SchFilterOut *fo; | ||
1020 | |||
1021 |
2/4✓ Branch 0 taken 6560 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6560 times.
|
6560 | av_assert0(src.idx < sch->nb_filters && |
1022 | src.idx_stream < sch->filters[src.idx].nb_outputs); | ||
1023 | 6560 | fo = &sch->filters[src.idx].outputs[src.idx_stream]; | |
1024 | |||
1025 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6560 times.
|
6560 | av_assert0(!fo->dst.type); |
1026 | 6560 | fo->dst = dst; | |
1027 | |||
1028 | // filtered frames go to encoding or another filtergraph | ||
1029 |
1/3✓ Branch 0 taken 6560 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
|
6560 | switch (dst.type) { |
1030 | 6560 | case SCH_NODE_TYPE_ENC: { | |
1031 | SchEnc *enc; | ||
1032 | |||
1033 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6560 times.
|
6560 | av_assert0(dst.idx < sch->nb_enc); |
1034 | 6560 | enc = &sch->enc[dst.idx]; | |
1035 | |||
1036 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6560 times.
|
6560 | av_assert0(!enc->src.type); |
1037 | 6560 | enc->src = src; | |
1038 | 6560 | break; | |
1039 | } | ||
1040 | ✗ | case SCH_NODE_TYPE_FILTER_IN: { | |
1041 | SchFilterIn *fi; | ||
1042 | |||
1043 | ✗ | av_assert0(dst.idx < sch->nb_filters && | |
1044 | dst.idx_stream < sch->filters[dst.idx].nb_inputs); | ||
1045 | ✗ | fi = &sch->filters[dst.idx].inputs[dst.idx_stream]; | |
1046 | |||
1047 | ✗ | av_assert0(!fi->src.type); | |
1048 | ✗ | fi->src = src; | |
1049 | ✗ | break; | |
1050 | } | ||
1051 | ✗ | default: av_assert0(0); | |
1052 | } | ||
1053 | |||
1054 | |||
1055 | 6560 | break; | |
1056 | } | ||
1057 | 6599 | case SCH_NODE_TYPE_ENC: { | |
1058 | SchEnc *enc; | ||
1059 | |||
1060 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6599 times.
|
6599 | av_assert0(src.idx < sch->nb_enc); |
1061 | 6599 | enc = &sch->enc[src.idx]; | |
1062 | |||
1063 | 6599 | ret = GROW_ARRAY(enc->dst, enc->nb_dst); | |
1064 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6599 times.
|
6599 | if (ret < 0) |
1065 | ✗ | return ret; | |
1066 | |||
1067 | 6599 | enc->dst[enc->nb_dst - 1] = dst; | |
1068 | |||
1069 | // encoding packets go to muxing or decoding | ||
1070 |
2/3✓ Branch 0 taken 6598 times.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
6599 | switch (dst.type) { |
1071 | 6598 | case SCH_NODE_TYPE_MUX: { | |
1072 | SchMuxStream *ms; | ||
1073 | |||
1074 |
2/4✓ Branch 0 taken 6598 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6598 times.
|
6598 | av_assert0(dst.idx < sch->nb_mux && |
1075 | dst.idx_stream < sch->mux[dst.idx].nb_streams); | ||
1076 | 6598 | ms = &sch->mux[dst.idx].streams[dst.idx_stream]; | |
1077 | |||
1078 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6598 times.
|
6598 | av_assert0(!ms->src.type); |
1079 | 6598 | ms->src = src; | |
1080 | |||
1081 | 6598 | break; | |
1082 | } | ||
1083 | 1 | case SCH_NODE_TYPE_DEC: { | |
1084 | SchDec *dec; | ||
1085 | |||
1086 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | av_assert0(dst.idx < sch->nb_dec); |
1087 | 1 | dec = &sch->dec[dst.idx]; | |
1088 | |||
1089 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | av_assert0(!dec->src.type); |
1090 | 1 | dec->src = src; | |
1091 | |||
1092 | 1 | break; | |
1093 | } | ||
1094 | ✗ | default: av_assert0(0); | |
1095 | } | ||
1096 | |||
1097 | 6599 | break; | |
1098 | } | ||
1099 | ✗ | default: av_assert0(0); | |
1100 | } | ||
1101 | |||
1102 | 26748 | return 0; | |
1103 | } | ||
1104 | |||
1105 | 6818 | static int mux_task_start(SchMux *mux) | |
1106 | { | ||
1107 | 6818 | int ret = 0; | |
1108 | |||
1109 | 6818 | ret = task_start(&mux->task); | |
1110 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6818 times.
|
6818 | if (ret < 0) |
1111 | ✗ | return ret; | |
1112 | |||
1113 | /* flush the pre-muxing queues */ | ||
1114 |
2/2✓ Branch 0 taken 7258 times.
✓ Branch 1 taken 6818 times.
|
14076 | for (unsigned i = 0; i < mux->nb_streams; i++) { |
1115 | 7258 | SchMuxStream *ms = &mux->streams[i]; | |
1116 | AVPacket *pkt; | ||
1117 | |||
1118 |
2/2✓ Branch 1 taken 3414 times.
✓ Branch 2 taken 7258 times.
|
10672 | while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0) { |
1119 |
2/2✓ Branch 0 taken 3312 times.
✓ Branch 1 taken 102 times.
|
3414 | if (pkt) { |
1120 |
2/2✓ Branch 0 taken 3297 times.
✓ Branch 1 taken 15 times.
|
3312 | if (!ms->init_eof) |
1121 | 3297 | ret = tq_send(mux->queue, i, pkt); | |
1122 | 3312 | av_packet_free(&pkt); | |
1123 |
2/2✓ Branch 0 taken 16 times.
✓ Branch 1 taken 3296 times.
|
3312 | if (ret == AVERROR_EOF) |
1124 | 16 | ms->init_eof = 1; | |
1125 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3296 times.
|
3296 | else if (ret < 0) |
1126 | ✗ | return ret; | |
1127 | } else | ||
1128 | 102 | tq_send_finish(mux->queue, i); | |
1129 | } | ||
1130 | } | ||
1131 | |||
1132 | 6818 | atomic_store(&mux->mux_started, 1); | |
1133 | |||
1134 | 6818 | return 0; | |
1135 | } | ||
1136 | |||
1137 | int print_sdp(const char *filename); | ||
1138 | |||
1139 | 6818 | static int mux_init(Scheduler *sch, SchMux *mux) | |
1140 | { | ||
1141 | int ret; | ||
1142 | |||
1143 | 6818 | ret = mux->init(mux->task.func_arg); | |
1144 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6818 times.
|
6818 | if (ret < 0) |
1145 | ✗ | return ret; | |
1146 | |||
1147 | 6818 | sch->nb_mux_ready++; | |
1148 | |||
1149 |
2/4✓ Branch 0 taken 6818 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6818 times.
|
6818 | if (sch->sdp_filename || sch->sdp_auto) { |
1150 | ✗ | if (sch->nb_mux_ready < sch->nb_mux) | |
1151 | ✗ | return 0; | |
1152 | |||
1153 | ✗ | ret = print_sdp(sch->sdp_filename); | |
1154 | ✗ | if (ret < 0) { | |
1155 | ✗ | av_log(sch, AV_LOG_ERROR, "Error writing the SDP.\n"); | |
1156 | ✗ | return ret; | |
1157 | } | ||
1158 | |||
1159 | /* SDP is written only after all the muxers are ready, so now we | ||
1160 | * start ALL the threads */ | ||
1161 | ✗ | for (unsigned i = 0; i < sch->nb_mux; i++) { | |
1162 | ✗ | ret = mux_task_start(&sch->mux[i]); | |
1163 | ✗ | if (ret < 0) | |
1164 | ✗ | return ret; | |
1165 | } | ||
1166 | } else { | ||
1167 | 6818 | ret = mux_task_start(mux); | |
1168 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6818 times.
|
6818 | if (ret < 0) |
1169 | ✗ | return ret; | |
1170 | } | ||
1171 | |||
1172 | 6818 | return 0; | |
1173 | } | ||
1174 | |||
1175 | 7258 | void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, | |
1176 | size_t data_threshold, int max_packets) | ||
1177 | { | ||
1178 | SchMux *mux; | ||
1179 | SchMuxStream *ms; | ||
1180 | |||
1181 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7258 times.
|
7258 | av_assert0(mux_idx < sch->nb_mux); |
1182 | 7258 | mux = &sch->mux[mux_idx]; | |
1183 | |||
1184 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7258 times.
|
7258 | av_assert0(stream_idx < mux->nb_streams); |
1185 | 7258 | ms = &mux->streams[stream_idx]; | |
1186 | |||
1187 | 7258 | ms->pre_mux_queue.max_packets = max_packets; | |
1188 | 7258 | ms->pre_mux_queue.data_threshold = data_threshold; | |
1189 | 7258 | } | |
1190 | |||
1191 | 7258 | int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx) | |
1192 | { | ||
1193 | SchMux *mux; | ||
1194 | 7258 | int ret = 0; | |
1195 | |||
1196 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7258 times.
|
7258 | av_assert0(mux_idx < sch->nb_mux); |
1197 | 7258 | mux = &sch->mux[mux_idx]; | |
1198 | |||
1199 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7258 times.
|
7258 | av_assert0(stream_idx < mux->nb_streams); |
1200 | |||
1201 | 7258 | pthread_mutex_lock(&sch->mux_ready_lock); | |
1202 | |||
1203 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7258 times.
|
7258 | av_assert0(mux->nb_streams_ready < mux->nb_streams); |
1204 | |||
1205 | // this may be called during initialization - do not start | ||
1206 | // threads before sch_start() is called | ||
1207 |
2/2✓ Branch 0 taken 6818 times.
✓ Branch 1 taken 440 times.
|
7258 | if (++mux->nb_streams_ready == mux->nb_streams && |
1208 |
2/2✓ Branch 0 taken 6364 times.
✓ Branch 1 taken 454 times.
|
6818 | sch->state >= SCH_STATE_STARTED) |
1209 | 6364 | ret = mux_init(sch, mux); | |
1210 | |||
1211 | 7258 | pthread_mutex_unlock(&sch->mux_ready_lock); | |
1212 | |||
1213 | 7258 | return ret; | |
1214 | } | ||
1215 | |||
1216 | 1 | int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, | |
1217 | unsigned dec_idx) | ||
1218 | { | ||
1219 | SchMux *mux; | ||
1220 | SchMuxStream *ms; | ||
1221 | 1 | int ret = 0; | |
1222 | |||
1223 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | av_assert0(mux_idx < sch->nb_mux); |
1224 | 1 | mux = &sch->mux[mux_idx]; | |
1225 | |||
1226 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | av_assert0(stream_idx < mux->nb_streams); |
1227 | 1 | ms = &mux->streams[stream_idx]; | |
1228 | |||
1229 | 1 | ret = GROW_ARRAY(ms->sub_heartbeat_dst, ms->nb_sub_heartbeat_dst); | |
1230 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (ret < 0) |
1231 | ✗ | return ret; | |
1232 | |||
1233 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | av_assert0(dec_idx < sch->nb_dec); |
1234 | 1 | ms->sub_heartbeat_dst[ms->nb_sub_heartbeat_dst - 1] = dec_idx; | |
1235 | |||
1236 |
1/2✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
|
1 | if (!mux->sub_heartbeat_pkt) { |
1237 | 1 | mux->sub_heartbeat_pkt = av_packet_alloc(); | |
1238 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (!mux->sub_heartbeat_pkt) |
1239 | ✗ | return AVERROR(ENOMEM); | |
1240 | } | ||
1241 | |||
1242 | 1 | return 0; | |
1243 | } | ||
1244 | |||
1245 | 497336 | static void unchoke_for_stream(Scheduler *sch, SchedulerNode src) | |
1246 | { | ||
1247 | 426136 | while (1) { | |
1248 | SchFilterGraph *fg; | ||
1249 | |||
1250 | // fed directly by a demuxer (i.e. not through a filtergraph) | ||
1251 |
2/2✓ Branch 0 taken 492960 times.
✓ Branch 1 taken 430512 times.
|
923472 | if (src.type == SCH_NODE_TYPE_DEMUX) { |
1252 | 492960 | sch->demux[src.idx].waiter.choked_next = 0; | |
1253 | 492960 | return; | |
1254 | } | ||
1255 | |||
1256 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 430512 times.
|
430512 | av_assert0(src.type == SCH_NODE_TYPE_FILTER_OUT); |
1257 | 430512 | fg = &sch->filters[src.idx]; | |
1258 | |||
1259 | // the filtergraph contains internal sources and | ||
1260 | // requested to be scheduled directly | ||
1261 |
2/2✓ Branch 0 taken 4376 times.
✓ Branch 1 taken 426136 times.
|
430512 | if (fg->best_input == fg->nb_inputs) { |
1262 | 4376 | fg->waiter.choked_next = 0; | |
1263 | 4376 | return; | |
1264 | } | ||
1265 | |||
1266 | 426136 | src = fg->inputs[fg->best_input].src_sched; | |
1267 | } | ||
1268 | } | ||
1269 | |||
1270 | 505389 | static void schedule_update_locked(Scheduler *sch) | |
1271 | { | ||
1272 | int64_t dts; | ||
1273 | 505389 | int have_unchoked = 0; | |
1274 | |||
1275 | // on termination request all waiters are choked, | ||
1276 | // we are not to unchoke them | ||
1277 |
2/2✓ Branch 0 taken 3172 times.
✓ Branch 1 taken 502217 times.
|
505389 | if (atomic_load(&sch->terminate)) |
1278 | 3172 | return; | |
1279 | |||
1280 | 502217 | dts = trailing_dts(sch, 0); | |
1281 | |||
1282 | 502217 | atomic_store(&sch->last_dts, dts); | |
1283 | |||
1284 | // initialize our internal state | ||
1285 |
2/2✓ Branch 0 taken 1004434 times.
✓ Branch 1 taken 502217 times.
|
1506651 | for (unsigned type = 0; type < 2; type++) |
1286 |
4/4✓ Branch 0 taken 954386 times.
✓ Branch 1 taken 1008045 times.
✓ Branch 2 taken 957997 times.
✓ Branch 3 taken 1004434 times.
|
1962431 | for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) { |
1287 |
2/2✓ Branch 0 taken 452169 times.
✓ Branch 1 taken 505828 times.
|
957997 | SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter; |
1288 | 957997 | w->choked_prev = atomic_load(&w->choked); | |
1289 | 957997 | w->choked_next = 1; | |
1290 | } | ||
1291 | |||
1292 | // figure out the sources that are allowed to proceed | ||
1293 |
2/2✓ Branch 0 taken 502484 times.
✓ Branch 1 taken 502217 times.
|
1004701 | for (unsigned i = 0; i < sch->nb_mux; i++) { |
1294 | 502484 | SchMux *mux = &sch->mux[i]; | |
1295 | |||
1296 |
2/2✓ Branch 0 taken 557241 times.
✓ Branch 1 taken 502484 times.
|
1059725 | for (unsigned j = 0; j < mux->nb_streams; j++) { |
1297 | 557241 | SchMuxStream *ms = &mux->streams[j]; | |
1298 | |||
1299 | // unblock sources for output streams that are not finished | ||
1300 | // and not too far ahead of the trailing stream | ||
1301 |
2/2✓ Branch 0 taken 33899 times.
✓ Branch 1 taken 523342 times.
|
557241 | if (ms->source_finished) |
1302 | 33899 | continue; | |
1303 |
4/4✓ Branch 0 taken 28583 times.
✓ Branch 1 taken 494759 times.
✓ Branch 2 taken 9850 times.
✓ Branch 3 taken 18733 times.
|
523342 | if (dts == AV_NOPTS_VALUE && ms->last_dts != AV_NOPTS_VALUE) |
1304 | 9850 | continue; | |
1305 |
4/4✓ Branch 0 taken 494759 times.
✓ Branch 1 taken 18733 times.
✓ Branch 2 taken 16156 times.
✓ Branch 3 taken 478603 times.
|
513492 | if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE) |
1306 | 16156 | continue; | |
1307 | |||
1308 | // resolve the source to unchoke | ||
1309 | 497336 | unchoke_for_stream(sch, ms->src_sched); | |
1310 | 497336 | have_unchoked = 1; | |
1311 | } | ||
1312 | } | ||
1313 | |||
1314 | // make sure to unchoke at least one source, if still available | ||
1315 |
4/4✓ Branch 0 taken 55684 times.
✓ Branch 1 taken 489091 times.
✓ Branch 2 taken 42558 times.
✓ Branch 3 taken 13126 times.
|
544775 | for (unsigned type = 0; !have_unchoked && type < 2; type++) |
1316 |
4/4✓ Branch 0 taken 29960 times.
✓ Branch 1 taken 42809 times.
✓ Branch 2 taken 42184 times.
✓ Branch 3 taken 30585 times.
|
72769 | for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) { |
1317 |
2/2✓ Branch 0 taken 16834 times.
✓ Branch 1 taken 25350 times.
|
42184 | int exited = type ? sch->filters[i].task_exited : sch->demux[i].task_exited; |
1318 |
2/2✓ Branch 0 taken 16834 times.
✓ Branch 1 taken 25350 times.
|
42184 | SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter; |
1319 |
2/2✓ Branch 0 taken 11973 times.
✓ Branch 1 taken 30211 times.
|
42184 | if (!exited) { |
1320 | 11973 | w->choked_next = 0; | |
1321 | 11973 | have_unchoked = 1; | |
1322 | 11973 | break; | |
1323 | } | ||
1324 | } | ||
1325 | |||
1326 | |||
1327 |
2/2✓ Branch 0 taken 1004434 times.
✓ Branch 1 taken 502217 times.
|
1506651 | for (unsigned type = 0; type < 2; type++) |
1328 |
4/4✓ Branch 0 taken 954386 times.
✓ Branch 1 taken 1008045 times.
✓ Branch 2 taken 957997 times.
✓ Branch 3 taken 1004434 times.
|
1962431 | for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) { |
1329 |
2/2✓ Branch 0 taken 452169 times.
✓ Branch 1 taken 505828 times.
|
957997 | SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter; |
1330 |
2/2✓ Branch 0 taken 17335 times.
✓ Branch 1 taken 940662 times.
|
957997 | if (w->choked_prev != w->choked_next) |
1331 | 17335 | waiter_set(w, w->choked_next); | |
1332 | } | ||
1333 | |||
1334 | } | ||
1335 | |||
1336 | enum { | ||
1337 | CYCLE_NODE_NEW = 0, | ||
1338 | CYCLE_NODE_STARTED, | ||
1339 | CYCLE_NODE_DONE, | ||
1340 | }; | ||
1341 | |||
1342 | static int | ||
1343 | 6439 | check_acyclic_for_output(const Scheduler *sch, SchedulerNode src, | |
1344 | uint8_t *filters_visited, SchedulerNode *filters_stack) | ||
1345 | { | ||
1346 | 6439 | unsigned nb_filters_stack = 0; | |
1347 | |||
1348 | 6439 | memset(filters_visited, 0, sch->nb_filters * sizeof(*filters_visited)); | |
1349 | |||
1350 | 6458 | while (1) { | |
1351 | 12897 | const SchFilterGraph *fg = &sch->filters[src.idx]; | |
1352 | |||
1353 | 12897 | filters_visited[src.idx] = CYCLE_NODE_STARTED; | |
1354 | |||
1355 | // descend into every input, depth first | ||
1356 |
2/2✓ Branch 0 taken 6457 times.
✓ Branch 1 taken 6440 times.
|
12897 | if (src.idx_stream < fg->nb_inputs) { |
1357 | 6457 | const SchFilterIn *fi = &fg->inputs[src.idx_stream++]; | |
1358 | |||
1359 | // connected to demuxer, no cycles possible | ||
1360 |
2/2✓ Branch 0 taken 6456 times.
✓ Branch 1 taken 1 times.
|
6457 | if (fi->src_sched.type == SCH_NODE_TYPE_DEMUX) |
1361 | 6457 | continue; | |
1362 | |||
1363 | // otherwise connected to another filtergraph | ||
1364 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | av_assert0(fi->src_sched.type == SCH_NODE_TYPE_FILTER_OUT); |
1365 | |||
1366 | // found a cycle | ||
1367 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (filters_visited[fi->src_sched.idx] == CYCLE_NODE_STARTED) |
1368 | ✗ | return AVERROR(EINVAL); | |
1369 | |||
1370 | // place current position on stack and descend | ||
1371 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | av_assert0(nb_filters_stack < sch->nb_filters); |
1372 | 1 | filters_stack[nb_filters_stack++] = src; | |
1373 | 1 | src = (SchedulerNode){ .idx = fi->src_sched.idx, .idx_stream = 0 }; | |
1374 | 1 | continue; | |
1375 | } | ||
1376 | |||
1377 | 6440 | filters_visited[src.idx] = CYCLE_NODE_DONE; | |
1378 | |||
1379 | // previous search finished, | ||
1380 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 6439 times.
|
6440 | if (nb_filters_stack) { |
1381 | 1 | src = filters_stack[--nb_filters_stack]; | |
1382 | 1 | continue; | |
1383 | } | ||
1384 | 6439 | return 0; | |
1385 | } | ||
1386 | } | ||
1387 | |||
1388 | 6816 | static int check_acyclic(Scheduler *sch) | |
1389 | { | ||
1390 | 6816 | uint8_t *filters_visited = NULL; | |
1391 | 6816 | SchedulerNode *filters_stack = NULL; | |
1392 | |||
1393 | 6816 | int ret = 0; | |
1394 | |||
1395 |
2/2✓ Branch 0 taken 489 times.
✓ Branch 1 taken 6327 times.
|
6816 | if (!sch->nb_filters) |
1396 | 489 | return 0; | |
1397 | |||
1398 | 6327 | filters_visited = av_malloc_array(sch->nb_filters, sizeof(*filters_visited)); | |
1399 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6327 times.
|
6327 | if (!filters_visited) |
1400 | ✗ | return AVERROR(ENOMEM); | |
1401 | |||
1402 | 6327 | filters_stack = av_malloc_array(sch->nb_filters, sizeof(*filters_stack)); | |
1403 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6327 times.
|
6327 | if (!filters_stack) { |
1404 | ✗ | ret = AVERROR(ENOMEM); | |
1405 | ✗ | goto fail; | |
1406 | } | ||
1407 | |||
1408 | // trace the transcoding graph upstream from every filtegraph | ||
1409 |
2/2✓ Branch 0 taken 6439 times.
✓ Branch 1 taken 6327 times.
|
12766 | for (unsigned i = 0; i < sch->nb_filters; i++) { |
1410 | 6439 | ret = check_acyclic_for_output(sch, (SchedulerNode){ .idx = i }, | |
1411 | filters_visited, filters_stack); | ||
1412 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6439 times.
|
6439 | if (ret < 0) { |
1413 | ✗ | av_log(&sch->filters[i], AV_LOG_ERROR, "Transcoding graph has a cycle\n"); | |
1414 | ✗ | goto fail; | |
1415 | } | ||
1416 | } | ||
1417 | |||
1418 | 6327 | fail: | |
1419 | 6327 | av_freep(&filters_visited); | |
1420 | 6327 | av_freep(&filters_stack); | |
1421 | 6327 | return ret; | |
1422 | } | ||
1423 | |||
1424 | 6816 | static int start_prepare(Scheduler *sch) | |
1425 | { | ||
1426 | int ret; | ||
1427 | |||
1428 |
2/2✓ Branch 0 taken 6846 times.
✓ Branch 1 taken 6816 times.
|
13662 | for (unsigned i = 0; i < sch->nb_demux; i++) { |
1429 | 6846 | SchDemux *d = &sch->demux[i]; | |
1430 | |||
1431 |
2/2✓ Branch 0 taken 7072 times.
✓ Branch 1 taken 6846 times.
|
13918 | for (unsigned j = 0; j < d->nb_streams; j++) { |
1432 | 7072 | SchDemuxStream *ds = &d->streams[j]; | |
1433 | |||
1434 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7072 times.
|
7072 | if (!ds->nb_dst) { |
1435 | ✗ | av_log(d, AV_LOG_ERROR, | |
1436 | "Demuxer stream %u not connected to any sink\n", j); | ||
1437 | ✗ | return AVERROR(EINVAL); | |
1438 | } | ||
1439 | |||
1440 | 7072 | ds->dst_finished = av_calloc(ds->nb_dst, sizeof(*ds->dst_finished)); | |
1441 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7072 times.
|
7072 | if (!ds->dst_finished) |
1442 | ✗ | return AVERROR(ENOMEM); | |
1443 | } | ||
1444 | } | ||
1445 | |||
1446 |
2/2✓ Branch 0 taken 6436 times.
✓ Branch 1 taken 6816 times.
|
13252 | for (unsigned i = 0; i < sch->nb_dec; i++) { |
1447 | 6436 | SchDec *dec = &sch->dec[i]; | |
1448 | |||
1449 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6436 times.
|
6436 | if (!dec->src.type) { |
1450 | ✗ | av_log(dec, AV_LOG_ERROR, | |
1451 | "Decoder not connected to a source\n"); | ||
1452 | ✗ | return AVERROR(EINVAL); | |
1453 | } | ||
1454 | |||
1455 |
2/2✓ Branch 0 taken 6442 times.
✓ Branch 1 taken 6436 times.
|
12878 | for (unsigned j = 0; j < dec->nb_outputs; j++) { |
1456 | 6442 | SchDecOutput *o = &dec->outputs[j]; | |
1457 | |||
1458 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6442 times.
|
6442 | if (!o->nb_dst) { |
1459 | ✗ | av_log(dec, AV_LOG_ERROR, | |
1460 | "Decoder output %u not connected to any sink\n", j); | ||
1461 | ✗ | return AVERROR(EINVAL); | |
1462 | } | ||
1463 | |||
1464 | 6442 | o->dst_finished = av_calloc(o->nb_dst, sizeof(*o->dst_finished)); | |
1465 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6442 times.
|
6442 | if (!o->dst_finished) |
1466 | ✗ | return AVERROR(ENOMEM); | |
1467 | } | ||
1468 | } | ||
1469 | |||
1470 |
2/2✓ Branch 0 taken 6598 times.
✓ Branch 1 taken 6816 times.
|
13414 | for (unsigned i = 0; i < sch->nb_enc; i++) { |
1471 | 6598 | SchEnc *enc = &sch->enc[i]; | |
1472 | |||
1473 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6598 times.
|
6598 | if (!enc->src.type) { |
1474 | ✗ | av_log(enc, AV_LOG_ERROR, | |
1475 | "Encoder not connected to a source\n"); | ||
1476 | ✗ | return AVERROR(EINVAL); | |
1477 | } | ||
1478 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6598 times.
|
6598 | if (!enc->nb_dst) { |
1479 | ✗ | av_log(enc, AV_LOG_ERROR, | |
1480 | "Encoder not connected to any sink\n"); | ||
1481 | ✗ | return AVERROR(EINVAL); | |
1482 | } | ||
1483 | |||
1484 | 6598 | enc->dst_finished = av_calloc(enc->nb_dst, sizeof(*enc->dst_finished)); | |
1485 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6598 times.
|
6598 | if (!enc->dst_finished) |
1486 | ✗ | return AVERROR(ENOMEM); | |
1487 | } | ||
1488 | |||
1489 |
2/2✓ Branch 0 taken 6818 times.
✓ Branch 1 taken 6816 times.
|
13634 | for (unsigned i = 0; i < sch->nb_mux; i++) { |
1490 | 6818 | SchMux *mux = &sch->mux[i]; | |
1491 | |||
1492 |
2/2✓ Branch 0 taken 7258 times.
✓ Branch 1 taken 6818 times.
|
14076 | for (unsigned j = 0; j < mux->nb_streams; j++) { |
1493 | 7258 | SchMuxStream *ms = &mux->streams[j]; | |
1494 | |||
1495 |
2/3✓ Branch 0 taken 6598 times.
✓ Branch 1 taken 660 times.
✗ Branch 2 not taken.
|
7258 | switch (ms->src.type) { |
1496 | 6598 | case SCH_NODE_TYPE_ENC: { | |
1497 | 6598 | SchEnc *enc = &sch->enc[ms->src.idx]; | |
1498 |
2/2✓ Branch 0 taken 38 times.
✓ Branch 1 taken 6560 times.
|
6598 | if (enc->src.type == SCH_NODE_TYPE_DEC) { |
1499 | 38 | ms->src_sched = sch->dec[enc->src.idx].src; | |
1500 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 38 times.
|
38 | av_assert0(ms->src_sched.type == SCH_NODE_TYPE_DEMUX); |
1501 | } else { | ||
1502 | 6560 | ms->src_sched = enc->src; | |
1503 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6560 times.
|
6560 | av_assert0(ms->src_sched.type == SCH_NODE_TYPE_FILTER_OUT); |
1504 | } | ||
1505 | 6598 | break; | |
1506 | } | ||
1507 | 660 | case SCH_NODE_TYPE_DEMUX: | |
1508 | 660 | ms->src_sched = ms->src; | |
1509 | 660 | break; | |
1510 | ✗ | default: | |
1511 | ✗ | av_log(mux, AV_LOG_ERROR, | |
1512 | "Muxer stream #%u not connected to a source\n", j); | ||
1513 | ✗ | return AVERROR(EINVAL); | |
1514 | } | ||
1515 | } | ||
1516 | |||
1517 | 6818 | ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size, | |
1518 | QUEUE_PACKETS); | ||
1519 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6818 times.
|
6818 | if (ret < 0) |
1520 | ✗ | return ret; | |
1521 | } | ||
1522 | |||
1523 |
2/2✓ Branch 0 taken 6439 times.
✓ Branch 1 taken 6816 times.
|
13255 | for (unsigned i = 0; i < sch->nb_filters; i++) { |
1524 | 6439 | SchFilterGraph *fg = &sch->filters[i]; | |
1525 | |||
1526 |
2/2✓ Branch 0 taken 6456 times.
✓ Branch 1 taken 6439 times.
|
12895 | for (unsigned j = 0; j < fg->nb_inputs; j++) { |
1527 | 6456 | SchFilterIn *fi = &fg->inputs[j]; | |
1528 | SchDec *dec; | ||
1529 | |||
1530 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6456 times.
|
6456 | if (!fi->src.type) { |
1531 | ✗ | av_log(fg, AV_LOG_ERROR, | |
1532 | "Filtergraph input %u not connected to a source\n", j); | ||
1533 | ✗ | return AVERROR(EINVAL); | |
1534 | } | ||
1535 | |||
1536 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6456 times.
|
6456 | if (fi->src.type == SCH_NODE_TYPE_FILTER_OUT) |
1537 | ✗ | fi->src_sched = fi->src; | |
1538 | else { | ||
1539 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6456 times.
|
6456 | av_assert0(fi->src.type == SCH_NODE_TYPE_DEC); |
1540 | 6456 | dec = &sch->dec[fi->src.idx]; | |
1541 | |||
1542 |
2/3✓ Branch 0 taken 6455 times.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
6456 | switch (dec->src.type) { |
1543 | 6455 | case SCH_NODE_TYPE_DEMUX: fi->src_sched = dec->src; break; | |
1544 | 1 | case SCH_NODE_TYPE_ENC: fi->src_sched = sch->enc[dec->src.idx].src; break; | |
1545 | ✗ | default: av_assert0(0); | |
1546 | } | ||
1547 | } | ||
1548 | } | ||
1549 | |||
1550 |
2/2✓ Branch 0 taken 6560 times.
✓ Branch 1 taken 6439 times.
|
12999 | for (unsigned j = 0; j < fg->nb_outputs; j++) { |
1551 | 6560 | SchFilterOut *fo = &fg->outputs[j]; | |
1552 | |||
1553 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6560 times.
|
6560 | if (!fo->dst.type) { |
1554 | ✗ | av_log(fg, AV_LOG_ERROR, | |
1555 | "Filtergraph %u output %u not connected to a sink\n", i, j); | ||
1556 | ✗ | return AVERROR(EINVAL); | |
1557 | } | ||
1558 | } | ||
1559 | } | ||
1560 | |||
1561 | // Check that the transcoding graph has no cycles. | ||
1562 | 6816 | ret = check_acyclic(sch); | |
1563 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6816 times.
|
6816 | if (ret < 0) |
1564 | ✗ | return ret; | |
1565 | |||
1566 | 6816 | return 0; | |
1567 | } | ||
1568 | |||
1569 | 6816 | int sch_start(Scheduler *sch) | |
1570 | { | ||
1571 | int ret; | ||
1572 | |||
1573 | 6816 | ret = start_prepare(sch); | |
1574 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6816 times.
|
6816 | if (ret < 0) |
1575 | ✗ | return ret; | |
1576 | |||
1577 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6816 times.
|
6816 | av_assert0(sch->state == SCH_STATE_UNINIT); |
1578 | 6816 | sch->state = SCH_STATE_STARTED; | |
1579 | |||
1580 |
2/2✓ Branch 0 taken 6818 times.
✓ Branch 1 taken 6816 times.
|
13634 | for (unsigned i = 0; i < sch->nb_mux; i++) { |
1581 | 6818 | SchMux *mux = &sch->mux[i]; | |
1582 | |||
1583 |
2/2✓ Branch 0 taken 454 times.
✓ Branch 1 taken 6364 times.
|
6818 | if (mux->nb_streams_ready == mux->nb_streams) { |
1584 | 454 | ret = mux_init(sch, mux); | |
1585 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 454 times.
|
454 | if (ret < 0) |
1586 | ✗ | goto fail; | |
1587 | } | ||
1588 | } | ||
1589 | |||
1590 |
2/2✓ Branch 0 taken 6598 times.
✓ Branch 1 taken 6816 times.
|
13414 | for (unsigned i = 0; i < sch->nb_enc; i++) { |
1591 | 6598 | SchEnc *enc = &sch->enc[i]; | |
1592 | |||
1593 | 6598 | ret = task_start(&enc->task); | |
1594 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6598 times.
|
6598 | if (ret < 0) |
1595 | ✗ | goto fail; | |
1596 | } | ||
1597 | |||
1598 |
2/2✓ Branch 0 taken 6439 times.
✓ Branch 1 taken 6816 times.
|
13255 | for (unsigned i = 0; i < sch->nb_filters; i++) { |
1599 | 6439 | SchFilterGraph *fg = &sch->filters[i]; | |
1600 | |||
1601 | 6439 | ret = task_start(&fg->task); | |
1602 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6439 times.
|
6439 | if (ret < 0) |
1603 | ✗ | goto fail; | |
1604 | } | ||
1605 | |||
1606 |
2/2✓ Branch 0 taken 6436 times.
✓ Branch 1 taken 6816 times.
|
13252 | for (unsigned i = 0; i < sch->nb_dec; i++) { |
1607 | 6436 | SchDec *dec = &sch->dec[i]; | |
1608 | |||
1609 | 6436 | ret = task_start(&dec->task); | |
1610 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6436 times.
|
6436 | if (ret < 0) |
1611 | ✗ | goto fail; | |
1612 | } | ||
1613 | |||
1614 |
2/2✓ Branch 0 taken 6846 times.
✓ Branch 1 taken 6816 times.
|
13662 | for (unsigned i = 0; i < sch->nb_demux; i++) { |
1615 | 6846 | SchDemux *d = &sch->demux[i]; | |
1616 | |||
1617 |
2/2✓ Branch 0 taken 14 times.
✓ Branch 1 taken 6832 times.
|
6846 | if (!d->nb_streams) |
1618 | 14 | continue; | |
1619 | |||
1620 | 6832 | ret = task_start(&d->task); | |
1621 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6832 times.
|
6832 | if (ret < 0) |
1622 | ✗ | goto fail; | |
1623 | } | ||
1624 | |||
1625 | 6816 | pthread_mutex_lock(&sch->schedule_lock); | |
1626 | 6816 | schedule_update_locked(sch); | |
1627 | 6816 | pthread_mutex_unlock(&sch->schedule_lock); | |
1628 | |||
1629 | 6816 | return 0; | |
1630 | ✗ | fail: | |
1631 | ✗ | sch_stop(sch, NULL); | |
1632 | ✗ | return ret; | |
1633 | } | ||
1634 | |||
1635 | 20857 | int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts) | |
1636 | { | ||
1637 | int ret, err; | ||
1638 | |||
1639 | // convert delay to absolute timestamp | ||
1640 | 20857 | timeout_us += av_gettime(); | |
1641 | |||
1642 | 20857 | pthread_mutex_lock(&sch->mux_done_lock); | |
1643 | |||
1644 |
1/2✓ Branch 0 taken 20857 times.
✗ Branch 1 not taken.
|
20857 | if (sch->nb_mux_done < sch->nb_mux) { |
1645 | 20857 | struct timespec tv = { .tv_sec = timeout_us / 1000000, | |
1646 | 20857 | .tv_nsec = (timeout_us % 1000000) * 1000 }; | |
1647 | 20857 | pthread_cond_timedwait(&sch->mux_done_cond, &sch->mux_done_lock, &tv); | |
1648 | } | ||
1649 | |||
1650 | 20857 | ret = sch->nb_mux_done == sch->nb_mux; | |
1651 | |||
1652 | 20857 | pthread_mutex_unlock(&sch->mux_done_lock); | |
1653 | |||
1654 | 20857 | *transcode_ts = atomic_load(&sch->last_dts); | |
1655 | |||
1656 | // abort transcoding if any task failed | ||
1657 | 20857 | err = atomic_load(&sch->task_failed); | |
1658 | |||
1659 |
3/4✓ Branch 0 taken 14041 times.
✓ Branch 1 taken 6816 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 14041 times.
|
20857 | return ret || err; |
1660 | } | ||
1661 | |||
1662 | 6560 | static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame) | |
1663 | { | ||
1664 | int ret; | ||
1665 | |||
1666 | 6560 | ret = enc->open_cb(enc->task.func_arg, frame); | |
1667 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6560 times.
|
6560 | if (ret < 0) |
1668 | ✗ | return ret; | |
1669 | |||
1670 | // ret>0 signals audio frame size, which means sync queue must | ||
1671 | // have been enabled during encoder creation | ||
1672 |
2/2✓ Branch 0 taken 163 times.
✓ Branch 1 taken 6397 times.
|
6560 | if (ret > 0) { |
1673 | SchSyncQueue *sq; | ||
1674 | |||
1675 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 163 times.
|
163 | av_assert0(enc->sq_idx[0] >= 0); |
1676 | 163 | sq = &sch->sq_enc[enc->sq_idx[0]]; | |
1677 | |||
1678 | 163 | pthread_mutex_lock(&sq->lock); | |
1679 | |||
1680 | 163 | sq_frame_samples(sq->sq, enc->sq_idx[1], ret); | |
1681 | |||
1682 | 163 | pthread_mutex_unlock(&sq->lock); | |
1683 | } | ||
1684 | |||
1685 | 6560 | return 0; | |
1686 | } | ||
1687 | |||
1688 | 428169 | static int send_to_enc_thread(Scheduler *sch, SchEnc *enc, AVFrame *frame) | |
1689 | { | ||
1690 | int ret; | ||
1691 | |||
1692 |
2/2✓ Branch 0 taken 13286 times.
✓ Branch 1 taken 414883 times.
|
428169 | if (!frame) { |
1693 | 13286 | tq_send_finish(enc->queue, 0); | |
1694 | 13286 | return 0; | |
1695 | } | ||
1696 | |||
1697 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 414883 times.
|
414883 | if (enc->in_finished) |
1698 | ✗ | return AVERROR_EOF; | |
1699 | |||
1700 | 414883 | ret = tq_send(enc->queue, 0, frame); | |
1701 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 414882 times.
|
414883 | if (ret < 0) |
1702 | 1 | enc->in_finished = 1; | |
1703 | |||
1704 | 414883 | return ret; | |
1705 | } | ||
1706 | |||
1707 | 34126 | static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame) | |
1708 | { | ||
1709 | 34126 | SchSyncQueue *sq = &sch->sq_enc[enc->sq_idx[0]]; | |
1710 | 34126 | int ret = 0; | |
1711 | |||
1712 | // inform the scheduling code that no more input will arrive along this path; | ||
1713 | // this is necessary because the sync queue may not send an EOF downstream | ||
1714 | // until other streams finish | ||
1715 | // TODO: consider a cleaner way of passing this information through | ||
1716 | // the pipeline | ||
1717 |
2/2✓ Branch 0 taken 3009 times.
✓ Branch 1 taken 31117 times.
|
34126 | if (!frame) { |
1718 |
2/2✓ Branch 0 taken 3009 times.
✓ Branch 1 taken 3009 times.
|
6018 | for (unsigned i = 0; i < enc->nb_dst; i++) { |
1719 | SchMux *mux; | ||
1720 | SchMuxStream *ms; | ||
1721 | |||
1722 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3009 times.
|
3009 | if (enc->dst[i].type != SCH_NODE_TYPE_MUX) |
1723 | ✗ | continue; | |
1724 | |||
1725 | 3009 | mux = &sch->mux[enc->dst[i].idx]; | |
1726 | 3009 | ms = &mux->streams[enc->dst[i].idx_stream]; | |
1727 | |||
1728 | 3009 | pthread_mutex_lock(&sch->schedule_lock); | |
1729 | |||
1730 | 3009 | ms->source_finished = 1; | |
1731 | 3009 | schedule_update_locked(sch); | |
1732 | |||
1733 | 3009 | pthread_mutex_unlock(&sch->schedule_lock); | |
1734 | } | ||
1735 | } | ||
1736 | |||
1737 | 34126 | pthread_mutex_lock(&sq->lock); | |
1738 | |||
1739 | 34126 | ret = sq_send(sq->sq, enc->sq_idx[1], SQFRAME(frame)); | |
1740 |
2/2✓ Branch 0 taken 34125 times.
✓ Branch 1 taken 1 times.
|
34126 | if (ret < 0) |
1741 | 1 | goto finish; | |
1742 | |||
1743 | 81961 | while (1) { | |
1744 | SchEnc *enc; | ||
1745 | |||
1746 | // TODO: the SQ API should be extended to allow returning EOF | ||
1747 | // for individual streams | ||
1748 | 116086 | ret = sq_receive(sq->sq, -1, SQFRAME(sq->frame)); | |
1749 |
2/2✓ Branch 0 taken 34125 times.
✓ Branch 1 taken 81961 times.
|
116086 | if (ret < 0) { |
1750 |
2/2✓ Branch 0 taken 5577 times.
✓ Branch 1 taken 28548 times.
|
34125 | ret = (ret == AVERROR(EAGAIN)) ? 0 : ret; |
1751 | 34125 | break; | |
1752 | } | ||
1753 | |||
1754 | 81961 | enc = &sch->enc[sq->enc_idx[ret]]; | |
1755 | 81961 | ret = send_to_enc_thread(sch, enc, sq->frame); | |
1756 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 81961 times.
|
81961 | if (ret < 0) { |
1757 | ✗ | av_frame_unref(sq->frame); | |
1758 | ✗ | if (ret != AVERROR_EOF) | |
1759 | ✗ | break; | |
1760 | |||
1761 | ✗ | sq_send(sq->sq, enc->sq_idx[1], SQFRAME(NULL)); | |
1762 | ✗ | continue; | |
1763 | } | ||
1764 | } | ||
1765 | |||
1766 |
2/2✓ Branch 0 taken 28548 times.
✓ Branch 1 taken 5577 times.
|
34125 | if (ret < 0) { |
1767 | // close all encoders fed from this sync queue | ||
1768 |
2/2✓ Branch 0 taken 5789 times.
✓ Branch 1 taken 5577 times.
|
11366 | for (unsigned i = 0; i < sq->nb_enc_idx; i++) { |
1769 | 5789 | int err = send_to_enc_thread(sch, &sch->enc[sq->enc_idx[i]], NULL); | |
1770 | |||
1771 | // if the sync queue error is EOF and closing the encoder | ||
1772 | // produces a more serious error, make sure to pick the latter | ||
1773 |
2/4✓ Branch 0 taken 5789 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 5789 times.
✗ Branch 3 not taken.
|
5789 | ret = err_merge((ret == AVERROR_EOF && err < 0) ? 0 : ret, err); |
1774 | } | ||
1775 | } | ||
1776 | |||
1777 | 34125 | finish: | |
1778 | 34126 | pthread_mutex_unlock(&sq->lock); | |
1779 | |||
1780 | 34126 | return ret; | |
1781 | } | ||
1782 | |||
1783 | 374548 | static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame) | |
1784 | { | ||
1785 |
6/6✓ Branch 0 taken 373631 times.
✓ Branch 1 taken 917 times.
✓ Branch 2 taken 363163 times.
✓ Branch 3 taken 10468 times.
✓ Branch 4 taken 6560 times.
✓ Branch 5 taken 356603 times.
|
374548 | if (enc->open_cb && frame && !enc->opened) { |
1786 | 6560 | int ret = enc_open(sch, enc, frame); | |
1787 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6560 times.
|
6560 | if (ret < 0) |
1788 | ✗ | return ret; | |
1789 | 6560 | enc->opened = 1; | |
1790 | |||
1791 | // discard empty frames that only carry encoder init parameters | ||
1792 |
2/2✓ Branch 0 taken 3 times.
✓ Branch 1 taken 6557 times.
|
6560 | if (!frame->buf[0]) { |
1793 | 3 | av_frame_unref(frame); | |
1794 | 3 | return 0; | |
1795 | } | ||
1796 | } | ||
1797 | |||
1798 | 374545 | return (enc->sq_idx[0] >= 0) ? | |
1799 |
2/2✓ Branch 0 taken 34126 times.
✓ Branch 1 taken 340419 times.
|
714964 | send_to_enc_sq (sch, enc, frame) : |
1800 | 340419 | send_to_enc_thread(sch, enc, frame); | |
1801 | } | ||
1802 | |||
1803 | 3414 | static int mux_queue_packet(SchMux *mux, SchMuxStream *ms, AVPacket *pkt) | |
1804 | { | ||
1805 | 3414 | PreMuxQueue *q = &ms->pre_mux_queue; | |
1806 | 3414 | AVPacket *tmp_pkt = NULL; | |
1807 | int ret; | ||
1808 | |||
1809 |
2/2✓ Branch 1 taken 151 times.
✓ Branch 2 taken 3263 times.
|
3414 | if (!av_fifo_can_write(q->fifo)) { |
1810 | 151 | size_t packets = av_fifo_can_read(q->fifo); | |
1811 |
1/2✓ Branch 0 taken 151 times.
✗ Branch 1 not taken.
|
151 | size_t pkt_size = pkt ? pkt->size : 0; |
1812 | 151 | int thresh_reached = (q->data_size + pkt_size) > q->data_threshold; | |
1813 |
2/2✓ Branch 0 taken 146 times.
✓ Branch 1 taken 5 times.
|
151 | size_t max_packets = thresh_reached ? q->max_packets : SIZE_MAX; |
1814 | 151 | size_t new_size = FFMIN(2 * packets, max_packets); | |
1815 | |||
1816 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 151 times.
|
151 | if (new_size <= packets) { |
1817 | ✗ | av_log(mux, AV_LOG_ERROR, | |
1818 | "Too many packets buffered for output stream.\n"); | ||
1819 | ✗ | return AVERROR(ENOSPC); | |
1820 | } | ||
1821 | 151 | ret = av_fifo_grow2(q->fifo, new_size - packets); | |
1822 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 151 times.
|
151 | if (ret < 0) |
1823 | ✗ | return ret; | |
1824 | } | ||
1825 | |||
1826 |
2/2✓ Branch 0 taken 3312 times.
✓ Branch 1 taken 102 times.
|
3414 | if (pkt) { |
1827 | 3312 | tmp_pkt = av_packet_alloc(); | |
1828 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3312 times.
|
3312 | if (!tmp_pkt) |
1829 | ✗ | return AVERROR(ENOMEM); | |
1830 | |||
1831 | 3312 | av_packet_move_ref(tmp_pkt, pkt); | |
1832 | 3312 | q->data_size += tmp_pkt->size; | |
1833 | } | ||
1834 | 3414 | av_fifo_write(q->fifo, &tmp_pkt, 1); | |
1835 | |||
1836 | 3414 | return 0; | |
1837 | } | ||
1838 | |||
1839 | 483645 | static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx, | |
1840 | AVPacket *pkt) | ||
1841 | { | ||
1842 | 483645 | SchMuxStream *ms = &mux->streams[stream_idx]; | |
1843 |
2/2✓ Branch 0 taken 467180 times.
✓ Branch 1 taken 9207 times.
|
476387 | int64_t dts = (pkt && pkt->dts != AV_NOPTS_VALUE) ? |
1844 |
2/2✓ Branch 0 taken 476387 times.
✓ Branch 1 taken 7258 times.
|
960032 | av_rescale_q(pkt->dts + pkt->duration, pkt->time_base, AV_TIME_BASE_Q) : |
1845 | AV_NOPTS_VALUE; | ||
1846 | |||
1847 | // queue the packet if the muxer cannot be started yet | ||
1848 |
2/2✓ Branch 0 taken 3448 times.
✓ Branch 1 taken 480197 times.
|
483645 | if (!atomic_load(&mux->mux_started)) { |
1849 | 3448 | int queued = 0; | |
1850 | |||
1851 | // the muxer could have started between the above atomic check and | ||
1852 | // locking the mutex, then this block falls through to normal send path | ||
1853 | 3448 | pthread_mutex_lock(&sch->mux_ready_lock); | |
1854 | |||
1855 |
2/2✓ Branch 0 taken 3414 times.
✓ Branch 1 taken 34 times.
|
3448 | if (!atomic_load(&mux->mux_started)) { |
1856 | 3414 | int ret = mux_queue_packet(mux, ms, pkt); | |
1857 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3414 times.
|
3414 | queued = ret < 0 ? ret : 1; |
1858 | } | ||
1859 | |||
1860 | 3448 | pthread_mutex_unlock(&sch->mux_ready_lock); | |
1861 | |||
1862 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3448 times.
|
3448 | if (queued < 0) |
1863 | ✗ | return queued; | |
1864 |
2/2✓ Branch 0 taken 3414 times.
✓ Branch 1 taken 34 times.
|
3448 | else if (queued) |
1865 | 3414 | goto update_schedule; | |
1866 | } | ||
1867 | |||
1868 |
2/2✓ Branch 0 taken 473075 times.
✓ Branch 1 taken 7156 times.
|
480231 | if (pkt) { |
1869 | int ret; | ||
1870 | |||
1871 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 473075 times.
|
473075 | if (ms->init_eof) |
1872 | ✗ | return AVERROR_EOF; | |
1873 | |||
1874 | 473075 | ret = tq_send(mux->queue, stream_idx, pkt); | |
1875 |
2/2✓ Branch 0 taken 52 times.
✓ Branch 1 taken 473023 times.
|
473075 | if (ret < 0) |
1876 | 52 | return ret; | |
1877 | } else | ||
1878 | 7156 | tq_send_finish(mux->queue, stream_idx); | |
1879 | |||
1880 | 483593 | update_schedule: | |
1881 | // TODO: use atomics to check whether this changes trailing dts | ||
1882 | // to avoid locking unnecesarily | ||
1883 |
4/4✓ Branch 0 taken 16465 times.
✓ Branch 1 taken 467128 times.
✓ Branch 2 taken 7258 times.
✓ Branch 3 taken 9207 times.
|
483593 | if (dts != AV_NOPTS_VALUE || !pkt) { |
1884 | 474386 | pthread_mutex_lock(&sch->schedule_lock); | |
1885 | |||
1886 |
2/2✓ Branch 0 taken 467128 times.
✓ Branch 1 taken 7258 times.
|
474386 | if (pkt) ms->last_dts = dts; |
1887 | 7258 | else ms->source_finished = 1; | |
1888 | |||
1889 | 474386 | schedule_update_locked(sch); | |
1890 | |||
1891 | 474386 | pthread_mutex_unlock(&sch->schedule_lock); | |
1892 | } | ||
1893 | |||
1894 | 483593 | return 0; | |
1895 | } | ||
1896 | |||
1897 | static int | ||
1898 | 470478 | demux_stream_send_to_dst(Scheduler *sch, const SchedulerNode dst, | |
1899 | uint8_t *dst_finished, AVPacket *pkt, unsigned flags) | ||
1900 | { | ||
1901 | int ret; | ||
1902 | |||
1903 |
2/2✓ Branch 0 taken 3118 times.
✓ Branch 1 taken 467360 times.
|
470478 | if (*dst_finished) |
1904 | 3118 | return AVERROR_EOF; | |
1905 | |||
1906 |
4/4✓ Branch 0 taken 463383 times.
✓ Branch 1 taken 3977 times.
✓ Branch 2 taken 67680 times.
✓ Branch 3 taken 395703 times.
|
467360 | if (pkt && dst.type == SCH_NODE_TYPE_MUX && |
1907 |
2/2✓ Branch 0 taken 2 times.
✓ Branch 1 taken 67678 times.
|
67680 | (flags & DEMUX_SEND_STREAMCOPY_EOF)) { |
1908 | 2 | av_packet_unref(pkt); | |
1909 | 2 | pkt = NULL; | |
1910 | } | ||
1911 | |||
1912 |
2/2✓ Branch 0 taken 3979 times.
✓ Branch 1 taken 463381 times.
|
467360 | if (!pkt) |
1913 | 3979 | goto finish; | |
1914 | |||
1915 | 926762 | ret = (dst.type == SCH_NODE_TYPE_MUX) ? | |
1916 |
2/2✓ Branch 0 taken 67678 times.
✓ Branch 1 taken 395703 times.
|
463381 | send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) : |
1917 | 395703 | tq_send(sch->dec[dst.idx].queue, 0, pkt); | |
1918 |
2/2✓ Branch 0 taken 3116 times.
✓ Branch 1 taken 460265 times.
|
463381 | if (ret == AVERROR_EOF) |
1919 | 3116 | goto finish; | |
1920 | |||
1921 | 460265 | return ret; | |
1922 | |||
1923 | 7095 | finish: | |
1924 |
2/2✓ Branch 0 taken 660 times.
✓ Branch 1 taken 6435 times.
|
7095 | if (dst.type == SCH_NODE_TYPE_MUX) |
1925 | 660 | send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL); | |
1926 | else | ||
1927 | 6435 | tq_send_finish(sch->dec[dst.idx].queue, 0); | |
1928 | |||
1929 | 7095 | *dst_finished = 1; | |
1930 | 7095 | return AVERROR_EOF; | |
1931 | } | ||
1932 | |||
1933 | 470015 | static int demux_send_for_stream(Scheduler *sch, SchDemux *d, SchDemuxStream *ds, | |
1934 | AVPacket *pkt, unsigned flags) | ||
1935 | { | ||
1936 | 470015 | unsigned nb_done = 0; | |
1937 | |||
1938 |
2/2✓ Branch 0 taken 470478 times.
✓ Branch 1 taken 470015 times.
|
940493 | for (unsigned i = 0; i < ds->nb_dst; i++) { |
1939 | 470478 | AVPacket *to_send = pkt; | |
1940 | 470478 | uint8_t *finished = &ds->dst_finished[i]; | |
1941 | |||
1942 | int ret; | ||
1943 | |||
1944 | // sending a packet consumes it, so make a temporary reference if needed | ||
1945 |
4/4✓ Branch 0 taken 463383 times.
✓ Branch 1 taken 7095 times.
✓ Branch 2 taken 440 times.
✓ Branch 3 taken 462943 times.
|
470478 | if (pkt && i < ds->nb_dst - 1) { |
1946 | 440 | to_send = d->send_pkt; | |
1947 | |||
1948 | 440 | ret = av_packet_ref(to_send, pkt); | |
1949 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 440 times.
|
440 | if (ret < 0) |
1950 | ✗ | return ret; | |
1951 | } | ||
1952 | |||
1953 | 470478 | ret = demux_stream_send_to_dst(sch, ds->dst[i], finished, to_send, flags); | |
1954 |
2/2✓ Branch 0 taken 463383 times.
✓ Branch 1 taken 7095 times.
|
470478 | if (to_send) |
1955 | 463383 | av_packet_unref(to_send); | |
1956 |
2/2✓ Branch 0 taken 10213 times.
✓ Branch 1 taken 460265 times.
|
470478 | if (ret == AVERROR_EOF) |
1957 | 10213 | nb_done++; | |
1958 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 460265 times.
|
460265 | else if (ret < 0) |
1959 | ✗ | return ret; | |
1960 | } | ||
1961 | |||
1962 |
2/2✓ Branch 0 taken 10189 times.
✓ Branch 1 taken 459826 times.
|
470015 | return (nb_done == ds->nb_dst) ? AVERROR_EOF : 0; |
1963 | } | ||
1964 | |||
1965 | 11 | static int demux_flush(Scheduler *sch, SchDemux *d, AVPacket *pkt) | |
1966 | { | ||
1967 | 11 | Timestamp max_end_ts = (Timestamp){ .ts = AV_NOPTS_VALUE }; | |
1968 | |||
1969 |
3/6✓ Branch 0 taken 11 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 11 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 11 times.
|
11 | av_assert0(!pkt->buf && !pkt->data && !pkt->side_data_elems); |
1970 | |||
1971 |
2/2✓ Branch 0 taken 14 times.
✓ Branch 1 taken 11 times.
|
25 | for (unsigned i = 0; i < d->nb_streams; i++) { |
1972 | 14 | SchDemuxStream *ds = &d->streams[i]; | |
1973 | |||
1974 |
2/2✓ Branch 0 taken 14 times.
✓ Branch 1 taken 14 times.
|
28 | for (unsigned j = 0; j < ds->nb_dst; j++) { |
1975 | 14 | const SchedulerNode *dst = &ds->dst[j]; | |
1976 | SchDec *dec; | ||
1977 | int ret; | ||
1978 | |||
1979 |
3/4✓ Branch 0 taken 14 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 8 times.
✓ Branch 3 taken 6 times.
|
14 | if (ds->dst_finished[j] || dst->type != SCH_NODE_TYPE_DEC) |
1980 | 8 | continue; | |
1981 | |||
1982 | 6 | dec = &sch->dec[dst->idx]; | |
1983 | |||
1984 | 6 | ret = tq_send(dec->queue, 0, pkt); | |
1985 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
|
6 | if (ret < 0) |
1986 | ✗ | return ret; | |
1987 | |||
1988 |
2/2✓ Branch 0 taken 3 times.
✓ Branch 1 taken 3 times.
|
6 | if (dec->queue_end_ts) { |
1989 | Timestamp ts; | ||
1990 | 3 | ret = av_thread_message_queue_recv(dec->queue_end_ts, &ts, 0); | |
1991 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
|
3 | if (ret < 0) |
1992 | ✗ | return ret; | |
1993 | |||
1994 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
|
3 | if (max_end_ts.ts == AV_NOPTS_VALUE || |
1995 | ✗ | (ts.ts != AV_NOPTS_VALUE && | |
1996 | ✗ | av_compare_ts(max_end_ts.ts, max_end_ts.tb, ts.ts, ts.tb) < 0)) | |
1997 | 3 | max_end_ts = ts; | |
1998 | |||
1999 | } | ||
2000 | } | ||
2001 | } | ||
2002 | |||
2003 | 11 | pkt->pts = max_end_ts.ts; | |
2004 | 11 | pkt->time_base = max_end_ts.tb; | |
2005 | |||
2006 | 11 | return 0; | |
2007 | } | ||
2008 | |||
2009 | 463011 | int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt, | |
2010 | unsigned flags) | ||
2011 | { | ||
2012 | SchDemux *d; | ||
2013 | int terminate; | ||
2014 | |||
2015 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 463011 times.
|
463011 | av_assert0(demux_idx < sch->nb_demux); |
2016 | 463011 | d = &sch->demux[demux_idx]; | |
2017 | |||
2018 | 463011 | terminate = waiter_wait(sch, &d->waiter); | |
2019 |
2/2✓ Branch 0 taken 57 times.
✓ Branch 1 taken 462954 times.
|
463011 | if (terminate) |
2020 | 57 | return AVERROR_EXIT; | |
2021 | |||
2022 | // flush the downstreams after seek | ||
2023 |
2/2✓ Branch 0 taken 11 times.
✓ Branch 1 taken 462943 times.
|
462954 | if (pkt->stream_index == -1) |
2024 | 11 | return demux_flush(sch, d, pkt); | |
2025 | |||
2026 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 462943 times.
|
462943 | av_assert0(pkt->stream_index < d->nb_streams); |
2027 | |||
2028 | 462943 | return demux_send_for_stream(sch, d, &d->streams[pkt->stream_index], pkt, flags); | |
2029 | } | ||
2030 | |||
2031 | 6846 | static int demux_done(Scheduler *sch, unsigned demux_idx) | |
2032 | { | ||
2033 | 6846 | SchDemux *d = &sch->demux[demux_idx]; | |
2034 | 6846 | int ret = 0; | |
2035 | |||
2036 |
2/2✓ Branch 0 taken 7072 times.
✓ Branch 1 taken 6846 times.
|
13918 | for (unsigned i = 0; i < d->nb_streams; i++) { |
2037 | 7072 | int err = demux_send_for_stream(sch, d, &d->streams[i], NULL, 0); | |
2038 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7072 times.
|
7072 | if (err != AVERROR_EOF) |
2039 | ✗ | ret = err_merge(ret, err); | |
2040 | } | ||
2041 | |||
2042 | 6846 | pthread_mutex_lock(&sch->schedule_lock); | |
2043 | |||
2044 | 6846 | d->task_exited = 1; | |
2045 | |||
2046 | 6846 | schedule_update_locked(sch); | |
2047 | |||
2048 | 6846 | pthread_mutex_unlock(&sch->schedule_lock); | |
2049 | |||
2050 | 6846 | return ret; | |
2051 | } | ||
2052 | |||
2053 | 490126 | int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt) | |
2054 | { | ||
2055 | SchMux *mux; | ||
2056 | int ret, stream_idx; | ||
2057 | |||
2058 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 490126 times.
|
490126 | av_assert0(mux_idx < sch->nb_mux); |
2059 | 490126 | mux = &sch->mux[mux_idx]; | |
2060 | |||
2061 | 490126 | ret = tq_receive(mux->queue, &stream_idx, pkt); | |
2062 | 490126 | pkt->stream_index = stream_idx; | |
2063 | 490126 | return ret; | |
2064 | } | ||
2065 | |||
2066 | 199 | void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx) | |
2067 | { | ||
2068 | SchMux *mux; | ||
2069 | |||
2070 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 199 times.
|
199 | av_assert0(mux_idx < sch->nb_mux); |
2071 | 199 | mux = &sch->mux[mux_idx]; | |
2072 | |||
2073 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 199 times.
|
199 | av_assert0(stream_idx < mux->nb_streams); |
2074 | 199 | tq_receive_finish(mux->queue, stream_idx); | |
2075 | |||
2076 | 199 | pthread_mutex_lock(&sch->schedule_lock); | |
2077 | 199 | mux->streams[stream_idx].source_finished = 1; | |
2078 | |||
2079 | 199 | schedule_update_locked(sch); | |
2080 | |||
2081 | 199 | pthread_mutex_unlock(&sch->schedule_lock); | |
2082 | 199 | } | |
2083 | |||
2084 | 438927 | int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, | |
2085 | const AVPacket *pkt) | ||
2086 | { | ||
2087 | SchMux *mux; | ||
2088 | SchMuxStream *ms; | ||
2089 | |||
2090 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 438927 times.
|
438927 | av_assert0(mux_idx < sch->nb_mux); |
2091 | 438927 | mux = &sch->mux[mux_idx]; | |
2092 | |||
2093 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 438927 times.
|
438927 | av_assert0(stream_idx < mux->nb_streams); |
2094 | 438927 | ms = &mux->streams[stream_idx]; | |
2095 | |||
2096 |
2/2✓ Branch 0 taken 5 times.
✓ Branch 1 taken 438927 times.
|
438932 | for (unsigned i = 0; i < ms->nb_sub_heartbeat_dst; i++) { |
2097 | 5 | SchDec *dst = &sch->dec[ms->sub_heartbeat_dst[i]]; | |
2098 | int ret; | ||
2099 | |||
2100 | 5 | ret = av_packet_copy_props(mux->sub_heartbeat_pkt, pkt); | |
2101 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 5 times.
|
5 | if (ret < 0) |
2102 | ✗ | return ret; | |
2103 | |||
2104 | 5 | tq_send(dst->queue, 0, mux->sub_heartbeat_pkt); | |
2105 | } | ||
2106 | |||
2107 | 438927 | return 0; | |
2108 | } | ||
2109 | |||
2110 | 6818 | static int mux_done(Scheduler *sch, unsigned mux_idx) | |
2111 | { | ||
2112 | 6818 | SchMux *mux = &sch->mux[mux_idx]; | |
2113 | |||
2114 | 6818 | pthread_mutex_lock(&sch->schedule_lock); | |
2115 | |||
2116 |
2/2✓ Branch 0 taken 7258 times.
✓ Branch 1 taken 6818 times.
|
14076 | for (unsigned i = 0; i < mux->nb_streams; i++) { |
2117 | 7258 | tq_receive_finish(mux->queue, i); | |
2118 | 7258 | mux->streams[i].source_finished = 1; | |
2119 | } | ||
2120 | |||
2121 | 6818 | schedule_update_locked(sch); | |
2122 | |||
2123 | 6818 | pthread_mutex_unlock(&sch->schedule_lock); | |
2124 | |||
2125 | 6818 | pthread_mutex_lock(&sch->mux_done_lock); | |
2126 | |||
2127 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6818 times.
|
6818 | av_assert0(sch->nb_mux_done < sch->nb_mux); |
2128 | 6818 | sch->nb_mux_done++; | |
2129 | |||
2130 | 6818 | pthread_cond_signal(&sch->mux_done_cond); | |
2131 | |||
2132 | 6818 | pthread_mutex_unlock(&sch->mux_done_lock); | |
2133 | |||
2134 | 6818 | return 0; | |
2135 | } | ||
2136 | |||
2137 | 371392 | int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt) | |
2138 | { | ||
2139 | SchDec *dec; | ||
2140 | int ret, dummy; | ||
2141 | |||
2142 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 371392 times.
|
371392 | av_assert0(dec_idx < sch->nb_dec); |
2143 | 371392 | dec = &sch->dec[dec_idx]; | |
2144 | |||
2145 | // the decoder should have given us post-flush end timestamp in pkt | ||
2146 |
2/2✓ Branch 0 taken 3 times.
✓ Branch 1 taken 371389 times.
|
371392 | if (dec->expect_end_ts) { |
2147 | 3 | Timestamp ts = (Timestamp){ .ts = pkt->pts, .tb = pkt->time_base }; | |
2148 | 3 | ret = av_thread_message_queue_send(dec->queue_end_ts, &ts, 0); | |
2149 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
|
3 | if (ret < 0) |
2150 | ✗ | return ret; | |
2151 | |||
2152 | 3 | dec->expect_end_ts = 0; | |
2153 | } | ||
2154 | |||
2155 | 371392 | ret = tq_receive(dec->queue, &dummy, pkt); | |
2156 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 371392 times.
|
371392 | av_assert0(dummy <= 0); |
2157 | |||
2158 | // got a flush packet, on the next call to this function the decoder | ||
2159 | // will give us post-flush end timestamp | ||
2160 |
7/8✓ Branch 0 taken 368087 times.
✓ Branch 1 taken 3305 times.
✓ Branch 2 taken 958 times.
✓ Branch 3 taken 367129 times.
✓ Branch 4 taken 958 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 3 times.
✓ Branch 7 taken 955 times.
|
371392 | if (ret >= 0 && !pkt->data && !pkt->side_data_elems && dec->queue_end_ts) |
2161 | 3 | dec->expect_end_ts = 1; | |
2162 | |||
2163 | 371392 | return ret; | |
2164 | } | ||
2165 | |||
2166 | 399751 | static int send_to_filter(Scheduler *sch, SchFilterGraph *fg, | |
2167 | unsigned in_idx, AVFrame *frame) | ||
2168 | { | ||
2169 |
2/2✓ Branch 0 taken 393295 times.
✓ Branch 1 taken 6456 times.
|
399751 | if (frame) |
2170 | 393295 | return tq_send(fg->queue, in_idx, frame); | |
2171 | |||
2172 |
1/2✓ Branch 0 taken 6456 times.
✗ Branch 1 not taken.
|
6456 | if (!fg->inputs[in_idx].send_finished) { |
2173 | 6456 | fg->inputs[in_idx].send_finished = 1; | |
2174 | 6456 | tq_send_finish(fg->queue, in_idx); | |
2175 | |||
2176 | // close the control stream when all actual inputs are done | ||
2177 |
2/2✓ Branch 0 taken 6380 times.
✓ Branch 1 taken 76 times.
|
6456 | if (atomic_fetch_add(&fg->nb_inputs_finished_send, 1) == fg->nb_inputs - 1) |
2178 | 6380 | tq_send_finish(fg->queue, fg->nb_inputs); | |
2179 | } | ||
2180 | 6456 | return 0; | |
2181 | } | ||
2182 | |||
2183 | 403856 | static int dec_send_to_dst(Scheduler *sch, const SchedulerNode dst, | |
2184 | uint8_t *dst_finished, AVFrame *frame) | ||
2185 | { | ||
2186 | int ret; | ||
2187 | |||
2188 |
2/2✓ Branch 0 taken 6366 times.
✓ Branch 1 taken 397490 times.
|
403856 | if (*dst_finished) |
2189 | 6366 | return AVERROR_EOF; | |
2190 | |||
2191 |
2/2✓ Branch 0 taken 3316 times.
✓ Branch 1 taken 394174 times.
|
397490 | if (!frame) |
2192 | 3316 | goto finish; | |
2193 | |||
2194 | 788348 | ret = (dst.type == SCH_NODE_TYPE_FILTER_IN) ? | |
2195 |
2/2✓ Branch 0 taken 393295 times.
✓ Branch 1 taken 879 times.
|
394174 | send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame) : |
2196 | 879 | send_to_enc(sch, &sch->enc[dst.idx], frame); | |
2197 |
2/2✓ Branch 0 taken 3178 times.
✓ Branch 1 taken 390996 times.
|
394174 | if (ret == AVERROR_EOF) |
2198 | 3178 | goto finish; | |
2199 | |||
2200 | 390996 | return ret; | |
2201 | |||
2202 | 6494 | finish: | |
2203 |
2/2✓ Branch 0 taken 6456 times.
✓ Branch 1 taken 38 times.
|
6494 | if (dst.type == SCH_NODE_TYPE_FILTER_IN) |
2204 | 6456 | send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL); | |
2205 | else | ||
2206 | 38 | send_to_enc(sch, &sch->enc[dst.idx], NULL); | |
2207 | |||
2208 | 6494 | *dst_finished = 1; | |
2209 | |||
2210 | 6494 | return AVERROR_EOF; | |
2211 | } | ||
2212 | |||
2213 | 395930 | int sch_dec_send(Scheduler *sch, unsigned dec_idx, | |
2214 | unsigned out_idx, AVFrame *frame) | ||
2215 | { | ||
2216 | SchDec *dec; | ||
2217 | SchDecOutput *o; | ||
2218 | int ret; | ||
2219 | 395930 | unsigned nb_done = 0; | |
2220 | |||
2221 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 395930 times.
|
395930 | av_assert0(dec_idx < sch->nb_dec); |
2222 | 395930 | dec = &sch->dec[dec_idx]; | |
2223 | |||
2224 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 395930 times.
|
395930 | av_assert0(out_idx < dec->nb_outputs); |
2225 | 395930 | o = &dec->outputs[out_idx]; | |
2226 | |||
2227 |
2/2✓ Branch 0 taken 397362 times.
✓ Branch 1 taken 395930 times.
|
793292 | for (unsigned i = 0; i < o->nb_dst; i++) { |
2228 | 397362 | uint8_t *finished = &o->dst_finished[i]; | |
2229 | 397362 | AVFrame *to_send = frame; | |
2230 | |||
2231 | // sending a frame consumes it, so make a temporary reference if needed | ||
2232 |
2/2✓ Branch 0 taken 1432 times.
✓ Branch 1 taken 395930 times.
|
397362 | if (i < o->nb_dst - 1) { |
2233 | 1432 | to_send = dec->send_frame; | |
2234 | |||
2235 | // frame may sometimes contain props only, | ||
2236 | // e.g. to signal EOF timestamp | ||
2237 |
2/2✓ Branch 0 taken 1334 times.
✓ Branch 1 taken 98 times.
|
1432 | ret = frame->buf[0] ? av_frame_ref(to_send, frame) : |
2238 | 98 | av_frame_copy_props(to_send, frame); | |
2239 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1432 times.
|
1432 | if (ret < 0) |
2240 | ✗ | return ret; | |
2241 | } | ||
2242 | |||
2243 | 397362 | ret = dec_send_to_dst(sch, o->dst[i], finished, to_send); | |
2244 |
2/2✓ Branch 0 taken 6366 times.
✓ Branch 1 taken 390996 times.
|
397362 | if (ret < 0) { |
2245 | 6366 | av_frame_unref(to_send); | |
2246 |
1/2✓ Branch 0 taken 6366 times.
✗ Branch 1 not taken.
|
6366 | if (ret == AVERROR_EOF) { |
2247 | 6366 | nb_done++; | |
2248 | 6366 | continue; | |
2249 | } | ||
2250 | ✗ | return ret; | |
2251 | } | ||
2252 | } | ||
2253 | |||
2254 |
2/2✓ Branch 0 taken 6275 times.
✓ Branch 1 taken 389655 times.
|
395930 | return (nb_done == o->nb_dst) ? AVERROR_EOF : 0; |
2255 | } | ||
2256 | |||
2257 | 6436 | static int dec_done(Scheduler *sch, unsigned dec_idx) | |
2258 | { | ||
2259 | 6436 | SchDec *dec = &sch->dec[dec_idx]; | |
2260 | 6436 | int ret = 0; | |
2261 | |||
2262 | 6436 | tq_receive_finish(dec->queue, 0); | |
2263 | |||
2264 | // make sure our source does not get stuck waiting for end timestamps | ||
2265 | // that will never arrive | ||
2266 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 6435 times.
|
6436 | if (dec->queue_end_ts) |
2267 | 1 | av_thread_message_queue_set_err_recv(dec->queue_end_ts, AVERROR_EOF); | |
2268 | |||
2269 |
2/2✓ Branch 0 taken 6442 times.
✓ Branch 1 taken 6436 times.
|
12878 | for (unsigned i = 0; i < dec->nb_outputs; i++) { |
2270 | 6442 | SchDecOutput *o = &dec->outputs[i]; | |
2271 | |||
2272 |
2/2✓ Branch 0 taken 6494 times.
✓ Branch 1 taken 6442 times.
|
12936 | for (unsigned j = 0; j < o->nb_dst; j++) { |
2273 | 6494 | int err = dec_send_to_dst(sch, o->dst[j], &o->dst_finished[j], NULL); | |
2274 |
2/4✓ Branch 0 taken 6494 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6494 times.
|
6494 | if (err < 0 && err != AVERROR_EOF) |
2275 | ✗ | ret = err_merge(ret, err); | |
2276 | } | ||
2277 | } | ||
2278 | |||
2279 | 6436 | return ret; | |
2280 | } | ||
2281 | |||
2282 | 421477 | int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame) | |
2283 | { | ||
2284 | SchEnc *enc; | ||
2285 | int ret, dummy; | ||
2286 | |||
2287 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 421477 times.
|
421477 | av_assert0(enc_idx < sch->nb_enc); |
2288 | 421477 | enc = &sch->enc[enc_idx]; | |
2289 | |||
2290 | 421477 | ret = tq_receive(enc->queue, &dummy, frame); | |
2291 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 421477 times.
|
421477 | av_assert0(dummy <= 0); |
2292 | |||
2293 | 421477 | return ret; | |
2294 | } | ||
2295 | |||
2296 | 415402 | static int enc_send_to_dst(Scheduler *sch, const SchedulerNode dst, | |
2297 | uint8_t *dst_finished, AVPacket *pkt) | ||
2298 | { | ||
2299 | int ret; | ||
2300 | |||
2301 |
2/2✓ Branch 0 taken 46 times.
✓ Branch 1 taken 415356 times.
|
415402 | if (*dst_finished) |
2302 | 46 | return AVERROR_EOF; | |
2303 | |||
2304 |
2/2✓ Branch 0 taken 6597 times.
✓ Branch 1 taken 408759 times.
|
415356 | if (!pkt) |
2305 | 6597 | goto finish; | |
2306 | |||
2307 | 817518 | ret = (dst.type == SCH_NODE_TYPE_MUX) ? | |
2308 |
2/2✓ Branch 0 taken 408709 times.
✓ Branch 1 taken 50 times.
|
408759 | send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) : |
2309 | 50 | tq_send(sch->dec[dst.idx].queue, 0, pkt); | |
2310 |
2/2✓ Branch 0 taken 2 times.
✓ Branch 1 taken 408757 times.
|
408759 | if (ret == AVERROR_EOF) |
2311 | 2 | goto finish; | |
2312 | |||
2313 | 408757 | return ret; | |
2314 | |||
2315 | 6599 | finish: | |
2316 |
2/2✓ Branch 0 taken 6598 times.
✓ Branch 1 taken 1 times.
|
6599 | if (dst.type == SCH_NODE_TYPE_MUX) |
2317 | 6598 | send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL); | |
2318 | else | ||
2319 | 1 | tq_send_finish(sch->dec[dst.idx].queue, 0); | |