FFmpeg coverage


Directory: ../../../ffmpeg/
File: src/libavformat/async.c
Date: 2025-01-20 09:27:23
Exec Total Coverage
Lines: 0 243 0.0%
Functions: 0 17 0.0%
Branches: 0 84 0.0%

Line Branch Exec Source
1 /*
2 * Input async protocol.
3 * Copyright (c) 2015 Zhang Rui <bbcallen@gmail.com>
4 *
5 * This file is part of FFmpeg.
6 *
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with FFmpeg; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 *
21 * Based on libavformat/cache.c by Michael Niedermayer
22 */
23
24 /**
25 * @TODO
26 * support timeout
27 * support work with concatdec, hls
28 */
29
30 #include "libavutil/avassert.h"
31 #include "libavutil/avstring.h"
32 #include "libavutil/error.h"
33 #include "libavutil/fifo.h"
34 #include "libavutil/log.h"
35 #include "libavutil/opt.h"
36 #include "libavutil/thread.h"
37 #include "url.h"
38 #include <stdint.h>
39
40 #if HAVE_UNISTD_H
41 #include <unistd.h>
42 #endif
43
/* Read-ahead capacity, in bytes, requested for the ring buffer by async_open(). */
#define BUFFER_CAPACITY         (4 * 1024 * 1024)
/* Bytes of already-consumed data the ring keeps so backward seeks can be served from memory. */
#define READ_BACK_CAPACITY      (4 * 1024 * 1024)
/* How far past the buffered data async_seek() still seeks by reading and discarding. */
#define SHORT_SEEK_THRESHOLD    (256 * 1024)
47
48 typedef struct RingBuffer
49 {
50 AVFifo *fifo;
51 int read_back_capacity;
52
53 int read_pos;
54 } RingBuffer;
55
56 typedef struct AsyncContext {
57 AVClass *class;
58 URLContext *inner;
59
60 int seek_request;
61 int64_t seek_pos;
62 int seek_whence;
63 int seek_completed;
64 int64_t seek_ret;
65
66 int inner_io_error;
67 int io_error;
68 int io_eof_reached;
69
70 int64_t logical_pos;
71 int64_t logical_size;
72 RingBuffer ring;
73
74 pthread_cond_t cond_wakeup_main;
75 pthread_cond_t cond_wakeup_background;
76 pthread_mutex_t mutex;
77 pthread_t async_buffer_thread;
78
79 int abort_request;
80 AVIOInterruptCB interrupt_callback;
81 } AsyncContext;
82
83 static int ring_init(RingBuffer *ring, unsigned int capacity, int read_back_capacity)
84 {
85 memset(ring, 0, sizeof(RingBuffer));
86 ring->fifo = av_fifo_alloc2(capacity + read_back_capacity, 1, 0);
87 if (!ring->fifo)
88 return AVERROR(ENOMEM);
89
90 ring->read_back_capacity = read_back_capacity;
91 return 0;
92 }
93
94 static void ring_destroy(RingBuffer *ring)
95 {
96 av_fifo_freep2(&ring->fifo);
97 }
98
99 static void ring_reset(RingBuffer *ring)
100 {
101 av_fifo_reset2(ring->fifo);
102 ring->read_pos = 0;
103 }
104
105 static int ring_size(RingBuffer *ring)
106 {
107 return av_fifo_can_read(ring->fifo) - ring->read_pos;
108 }
109
110 static int ring_space(RingBuffer *ring)
111 {
112 return av_fifo_can_write(ring->fifo);
113 }
114
115 static int ring_read(RingBuffer *ring, void *dest, int buf_size)
116 {
117 int ret = 0;
118
119 av_assert2(buf_size <= ring_size(ring));
120 if (dest)
121 ret = av_fifo_peek(ring->fifo, dest, buf_size, ring->read_pos);
122 ring->read_pos += buf_size;
123
124 if (ring->read_pos > ring->read_back_capacity) {
125 av_fifo_drain2(ring->fifo, ring->read_pos - ring->read_back_capacity);
126 ring->read_pos = ring->read_back_capacity;
127 }
128
129 return ret;
130 }
131
132 static int wrapped_url_read(void *src, void *dst, size_t *size)
133 {
134 URLContext *h = src;
135 AsyncContext *c = h->priv_data;
136 int ret;
137
138 ret = ffurl_read(c->inner, dst, *size);
139 *size = ret > 0 ? ret : 0;
140 c->inner_io_error = ret < 0 ? ret : 0;
141
142 return c->inner_io_error;
143 }
144
145 static int ring_write(RingBuffer *ring, URLContext *h, size_t size)
146 {
147 int ret;
148
149 av_assert2(size <= ring_space(ring));
150 ret = av_fifo_write_from_cb(ring->fifo, wrapped_url_read, h, &size);
151 if (ret < 0)
152 return ret;
153
154 return size;
155 }
156
157 static int ring_size_of_read_back(RingBuffer *ring)
158 {
159 return ring->read_pos;
160 }
161
162 static int ring_drain(RingBuffer *ring, int offset)
163 {
164 av_assert2(offset >= -ring_size_of_read_back(ring));
165 av_assert2(offset <= ring_size(ring));
166 ring->read_pos += offset;
167 return 0;
168 }
169
170 static int async_check_interrupt(void *arg)
171 {
172 URLContext *h = arg;
173 AsyncContext *c = h->priv_data;
174
175 if (c->abort_request)
176 return 1;
177
178 if (ff_check_interrupt(&c->interrupt_callback))
179 c->abort_request = 1;
180
181 return c->abort_request;
182 }
183
184 static void *async_buffer_task(void *arg)
185 {
186 URLContext *h = arg;
187 AsyncContext *c = h->priv_data;
188 RingBuffer *ring = &c->ring;
189 int ret = 0;
190 int64_t seek_ret;
191
192 ff_thread_setname("async");
193
194 while (1) {
195 int fifo_space, to_copy;
196
197 pthread_mutex_lock(&c->mutex);
198 if (async_check_interrupt(h)) {
199 c->io_eof_reached = 1;
200 c->io_error = AVERROR_EXIT;
201 pthread_cond_signal(&c->cond_wakeup_main);
202 pthread_mutex_unlock(&c->mutex);
203 break;
204 }
205
206 if (c->seek_request) {
207 seek_ret = ffurl_seek(c->inner, c->seek_pos, c->seek_whence);
208 if (seek_ret >= 0) {
209 c->io_eof_reached = 0;
210 c->io_error = 0;
211 ring_reset(ring);
212 }
213
214 c->seek_completed = 1;
215 c->seek_ret = seek_ret;
216 c->seek_request = 0;
217
218
219 pthread_cond_signal(&c->cond_wakeup_main);
220 pthread_mutex_unlock(&c->mutex);
221 continue;
222 }
223
224 fifo_space = ring_space(ring);
225 if (c->io_eof_reached || fifo_space <= 0) {
226 pthread_cond_signal(&c->cond_wakeup_main);
227 pthread_cond_wait(&c->cond_wakeup_background, &c->mutex);
228 pthread_mutex_unlock(&c->mutex);
229 continue;
230 }
231 pthread_mutex_unlock(&c->mutex);
232
233 to_copy = FFMIN(4096, fifo_space);
234 ret = ring_write(ring, h, to_copy);
235
236 pthread_mutex_lock(&c->mutex);
237 if (ret <= 0) {
238 c->io_eof_reached = 1;
239 if (c->inner_io_error < 0)
240 c->io_error = c->inner_io_error;
241 }
242
243 pthread_cond_signal(&c->cond_wakeup_main);
244 pthread_mutex_unlock(&c->mutex);
245 }
246
247 return NULL;
248 }
249
250 static int async_open(URLContext *h, const char *arg, int flags, AVDictionary **options)
251 {
252 AsyncContext *c = h->priv_data;
253 int ret;
254 AVIOInterruptCB interrupt_callback = {.callback = async_check_interrupt, .opaque = h};
255
256 av_strstart(arg, "async:", &arg);
257
258 ret = ring_init(&c->ring, BUFFER_CAPACITY, READ_BACK_CAPACITY);
259 if (ret < 0)
260 goto fifo_fail;
261
262 /* wrap interrupt callback */
263 c->interrupt_callback = h->interrupt_callback;
264 ret = ffurl_open_whitelist(&c->inner, arg, flags, &interrupt_callback, options, h->protocol_whitelist, h->protocol_blacklist, h);
265 if (ret != 0) {
266 av_log(h, AV_LOG_ERROR, "ffurl_open failed : %s, %s\n", av_err2str(ret), arg);
267 goto url_fail;
268 }
269
270 c->logical_size = ffurl_size(c->inner);
271 h->is_streamed = c->inner->is_streamed;
272
273 ret = pthread_mutex_init(&c->mutex, NULL);
274 if (ret != 0) {
275 ret = AVERROR(ret);
276 av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", av_err2str(ret));
277 goto mutex_fail;
278 }
279
280 ret = pthread_cond_init(&c->cond_wakeup_main, NULL);
281 if (ret != 0) {
282 ret = AVERROR(ret);
283 av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", av_err2str(ret));
284 goto cond_wakeup_main_fail;
285 }
286
287 ret = pthread_cond_init(&c->cond_wakeup_background, NULL);
288 if (ret != 0) {
289 ret = AVERROR(ret);
290 av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", av_err2str(ret));
291 goto cond_wakeup_background_fail;
292 }
293
294 ret = pthread_create(&c->async_buffer_thread, NULL, async_buffer_task, h);
295 if (ret) {
296 ret = AVERROR(ret);
297 av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", av_err2str(ret));
298 goto thread_fail;
299 }
300
301 return 0;
302
303 thread_fail:
304 pthread_cond_destroy(&c->cond_wakeup_background);
305 cond_wakeup_background_fail:
306 pthread_cond_destroy(&c->cond_wakeup_main);
307 cond_wakeup_main_fail:
308 pthread_mutex_destroy(&c->mutex);
309 mutex_fail:
310 ffurl_closep(&c->inner);
311 url_fail:
312 ring_destroy(&c->ring);
313 fifo_fail:
314 return ret;
315 }
316
317 static int async_close(URLContext *h)
318 {
319 AsyncContext *c = h->priv_data;
320 int ret;
321
322 pthread_mutex_lock(&c->mutex);
323 c->abort_request = 1;
324 pthread_cond_signal(&c->cond_wakeup_background);
325 pthread_mutex_unlock(&c->mutex);
326
327 ret = pthread_join(c->async_buffer_thread, NULL);
328 if (ret != 0)
329 av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", av_err2str(ret));
330
331 pthread_cond_destroy(&c->cond_wakeup_background);
332 pthread_cond_destroy(&c->cond_wakeup_main);
333 pthread_mutex_destroy(&c->mutex);
334 ffurl_closep(&c->inner);
335 ring_destroy(&c->ring);
336
337 return 0;
338 }
339
340 static int async_read_internal(URLContext *h, void *dest, int size)
341 {
342 AsyncContext *c = h->priv_data;
343 RingBuffer *ring = &c->ring;
344 int read_complete = !dest;
345 int to_read = size;
346 int ret = 0;
347
348 pthread_mutex_lock(&c->mutex);
349
350 while (to_read > 0) {
351 int fifo_size, to_copy;
352 if (async_check_interrupt(h)) {
353 ret = AVERROR_EXIT;
354 break;
355 }
356 fifo_size = ring_size(ring);
357 to_copy = FFMIN(to_read, fifo_size);
358 if (to_copy > 0) {
359 ring_read(ring, dest, to_copy);
360 if (dest)
361 dest = (uint8_t *)dest + to_copy;
362 c->logical_pos += to_copy;
363 to_read -= to_copy;
364 ret = size - to_read;
365
366 if (to_read <= 0 || !read_complete)
367 break;
368 } else if (c->io_eof_reached) {
369 if (ret <= 0) {
370 if (c->io_error)
371 ret = c->io_error;
372 else
373 ret = AVERROR_EOF;
374 }
375 break;
376 }
377 pthread_cond_signal(&c->cond_wakeup_background);
378 pthread_cond_wait(&c->cond_wakeup_main, &c->mutex);
379 }
380
381 pthread_cond_signal(&c->cond_wakeup_background);
382 pthread_mutex_unlock(&c->mutex);
383
384 return ret;
385 }
386
387 static int async_read(URLContext *h, unsigned char *buf, int size)
388 {
389 return async_read_internal(h, buf, size);
390 }
391
392 static int64_t async_seek(URLContext *h, int64_t pos, int whence)
393 {
394 AsyncContext *c = h->priv_data;
395 RingBuffer *ring = &c->ring;
396 int64_t ret;
397 int64_t new_logical_pos;
398 int fifo_size;
399 int fifo_size_of_read_back;
400
401 if (whence == AVSEEK_SIZE) {
402 av_log(h, AV_LOG_TRACE, "async_seek: AVSEEK_SIZE: %"PRId64"\n", (int64_t)c->logical_size);
403 return c->logical_size;
404 } else if (whence == SEEK_CUR) {
405 av_log(h, AV_LOG_TRACE, "async_seek: %"PRId64"\n", pos);
406 new_logical_pos = pos + c->logical_pos;
407 } else if (whence == SEEK_SET){
408 av_log(h, AV_LOG_TRACE, "async_seek: %"PRId64"\n", pos);
409 new_logical_pos = pos;
410 } else {
411 return AVERROR(EINVAL);
412 }
413 if (new_logical_pos < 0)
414 return AVERROR(EINVAL);
415
416 fifo_size = ring_size(ring);
417 fifo_size_of_read_back = ring_size_of_read_back(ring);
418 if (new_logical_pos == c->logical_pos) {
419 /* current position */
420 return c->logical_pos;
421 } else if ((new_logical_pos >= (c->logical_pos - fifo_size_of_read_back)) &&
422 (new_logical_pos < (c->logical_pos + fifo_size + SHORT_SEEK_THRESHOLD))) {
423 int pos_delta = (int)(new_logical_pos - c->logical_pos);
424 /* fast seek */
425 av_log(h, AV_LOG_TRACE, "async_seek: fask_seek %"PRId64" from %d dist:%d/%d\n",
426 new_logical_pos, (int)c->logical_pos,
427 (int)(new_logical_pos - c->logical_pos), fifo_size);
428
429 if (pos_delta > 0) {
430 // fast seek forwards
431 async_read_internal(h, NULL, pos_delta);
432 } else {
433 // fast seek backwards
434 ring_drain(ring, pos_delta);
435 c->logical_pos = new_logical_pos;
436 }
437
438 return c->logical_pos;
439 } else if (c->logical_size <= 0) {
440 /* can not seek */
441 return AVERROR(EINVAL);
442 } else if (new_logical_pos > c->logical_size) {
443 /* beyond end */
444 return AVERROR(EINVAL);
445 }
446
447 pthread_mutex_lock(&c->mutex);
448
449 c->seek_request = 1;
450 c->seek_pos = new_logical_pos;
451 c->seek_whence = SEEK_SET;
452 c->seek_completed = 0;
453 c->seek_ret = 0;
454
455 while (1) {
456 if (async_check_interrupt(h)) {
457 ret = AVERROR_EXIT;
458 break;
459 }
460 if (c->seek_completed) {
461 if (c->seek_ret >= 0)
462 c->logical_pos = c->seek_ret;
463 ret = c->seek_ret;
464 break;
465 }
466 pthread_cond_signal(&c->cond_wakeup_background);
467 pthread_cond_wait(&c->cond_wakeup_main, &c->mutex);
468 }
469
470 pthread_mutex_unlock(&c->mutex);
471
472 return ret;
473 }
474
475 #define OFFSET(x) offsetof(AsyncContext, x)
476 #define D AV_OPT_FLAG_DECODING_PARAM
477
478 static const AVOption options[] = {
479 {NULL},
480 };
481
482 #undef D
483 #undef OFFSET
484
485 static const AVClass async_context_class = {
486 .class_name = "Async",
487 .item_name = av_default_item_name,
488 .option = options,
489 .version = LIBAVUTIL_VERSION_INT,
490 };
491
492 const URLProtocol ff_async_protocol = {
493 .name = "async",
494 .url_open2 = async_open,
495 .url_read = async_read,
496 .url_seek = async_seek,
497 .url_close = async_close,
498 .priv_data_size = sizeof(AsyncContext),
499 .priv_data_class = &async_context_class,
500 };
501
502 #if 0
503
/* Position main() seeks to in the "test normal seek" step. */
#define TEST_SEEK_POS    (1536)
/* Size in bytes of the synthetic stream produced by the async-test protocol. */
#define TEST_STREAM_SIZE (2048)
506
507 typedef struct TestContext {
508 AVClass *class;
509 int64_t logical_pos;
510 int64_t logical_size;
511
512 /* options */
513 int opt_read_error;
514 } TestContext;
515
516 static int async_test_open(URLContext *h, const char *arg, int flags, AVDictionary **options)
517 {
518 TestContext *c = h->priv_data;
519 c->logical_pos = 0;
520 c->logical_size = TEST_STREAM_SIZE;
521 return 0;
522 }
523
524 static int async_test_close(URLContext *h)
525 {
526 return 0;
527 }
528
529 static int async_test_read(URLContext *h, unsigned char *buf, int size)
530 {
531 TestContext *c = h->priv_data;
532 int i;
533 int read_len = 0;
534
535 if (c->opt_read_error)
536 return c->opt_read_error;
537
538 if (c->logical_pos >= c->logical_size)
539 return AVERROR_EOF;
540
541 for (i = 0; i < size; ++i) {
542 buf[i] = c->logical_pos & 0xFF;
543
544 c->logical_pos++;
545 read_len++;
546
547 if (c->logical_pos >= c->logical_size)
548 break;
549 }
550
551 return read_len;
552 }
553
554 static int64_t async_test_seek(URLContext *h, int64_t pos, int whence)
555 {
556 TestContext *c = h->priv_data;
557 int64_t new_logical_pos;
558
559 if (whence == AVSEEK_SIZE) {
560 return c->logical_size;
561 } else if (whence == SEEK_CUR) {
562 new_logical_pos = pos + c->logical_pos;
563 } else if (whence == SEEK_SET){
564 new_logical_pos = pos;
565 } else {
566 return AVERROR(EINVAL);
567 }
568 if (new_logical_pos < 0)
569 return AVERROR(EINVAL);
570
571 c->logical_pos = new_logical_pos;
572 return new_logical_pos;
573 }
574
575 #define OFFSET(x) offsetof(TestContext, x)
576 #define D AV_OPT_FLAG_DECODING_PARAM
577
578 static const AVOption async_test_options[] = {
579 { "async-test-read-error", "cause read fail",
580 OFFSET(opt_read_error), AV_OPT_TYPE_INT, { .i64 = 0 }, INT_MIN, INT_MAX, .flags = D },
581 {NULL},
582 };
583
584 #undef D
585 #undef OFFSET
586
587 static const AVClass async_test_context_class = {
588 .class_name = "Async-Test",
589 .item_name = av_default_item_name,
590 .option = async_test_options,
591 .version = LIBAVUTIL_VERSION_INT,
592 };
593
594 const URLProtocol ff_async_test_protocol = {
595 .name = "async-test",
596 .url_open2 = async_test_open,
597 .url_read = async_test_read,
598 .url_seek = async_test_seek,
599 .url_close = async_test_close,
600 .priv_data_size = sizeof(TestContext),
601 .priv_data_class = &async_test_context_class,
602 };
603
604 int main(void)
605 {
606 URLContext *h = NULL;
607 int i;
608 int ret;
609 int64_t size;
610 int64_t pos;
611 int64_t read_len;
612 unsigned char buf[4096];
613 AVDictionary *opts = NULL;
614
615 ffurl_register_protocol(&ff_async_protocol);
616 ffurl_register_protocol(&ff_async_test_protocol);
617
618 /*
619 * test normal read
620 */
621 ret = ffurl_open_whitelist(&h, "async:async-test:", AVIO_FLAG_READ,
622 NULL, NULL, NULL, NULL, NULL);
623 printf("open: %d\n", ret);
624
625 size = ffurl_size(h);
626 printf("size: %"PRId64"\n", size);
627
628 pos = ffurl_seek(h, 0, SEEK_CUR);
629 read_len = 0;
630 while (1) {
631 ret = ffurl_read(h, buf, sizeof(buf));
632 if (ret == AVERROR_EOF) {
633 printf("read-error: AVERROR_EOF at %"PRId64"\n", ffurl_seek(h, 0, SEEK_CUR));
634 break;
635 }
636 else if (ret == 0)
637 break;
638 else if (ret < 0) {
639 printf("read-error: %d at %"PRId64"\n", ret, ffurl_seek(h, 0, SEEK_CUR));
640 goto fail;
641 } else {
642 for (i = 0; i < ret; ++i) {
643 if (buf[i] != (pos & 0xFF)) {
644 printf("read-mismatch: actual %d, expecting %d, at %"PRId64"\n",
645 (int)buf[i], (int)(pos & 0xFF), pos);
646 break;
647 }
648 pos++;
649 }
650 }
651
652 read_len += ret;
653 }
654 printf("read: %"PRId64"\n", read_len);
655
656 /*
657 * test normal seek
658 */
659 ret = ffurl_read(h, buf, 1);
660 printf("read: %d\n", ret);
661
662 pos = ffurl_seek(h, TEST_SEEK_POS, SEEK_SET);
663 printf("seek: %"PRId64"\n", pos);
664
665 read_len = 0;
666 while (1) {
667 ret = ffurl_read(h, buf, sizeof(buf));
668 if (ret == AVERROR_EOF)
669 break;
670 else if (ret == 0)
671 break;
672 else if (ret < 0) {
673 printf("read-error: %d at %"PRId64"\n", ret, ffurl_seek(h, 0, SEEK_CUR));
674 goto fail;
675 } else {
676 for (i = 0; i < ret; ++i) {
677 if (buf[i] != (pos & 0xFF)) {
678 printf("read-mismatch: actual %d, expecting %d, at %"PRId64"\n",
679 (int)buf[i], (int)(pos & 0xFF), pos);
680 break;
681 }
682 pos++;
683 }
684 }
685
686 read_len += ret;
687 }
688 printf("read: %"PRId64"\n", read_len);
689
690 ret = ffurl_read(h, buf, 1);
691 printf("read: %d\n", ret);
692
693 /*
694 * test read error
695 */
696 ffurl_close(h);
697 av_dict_set_int(&opts, "async-test-read-error", -10000, 0);
698 ret = ffurl_open_whitelist(&h, "async:async-test:", AVIO_FLAG_READ,
699 NULL, &opts, NULL, NULL, NULL);
700 printf("open: %d\n", ret);
701
702 ret = ffurl_read(h, buf, 1);
703 printf("read: %d\n", ret);
704
705 fail:
706 av_dict_free(&opts);
707 ffurl_close(h);
708 return 0;
709 }
710
711 #endif
712