GCC Code Coverage Report
Directory: ../../../ffmpeg/ Exec Total Coverage
File: src/libavcodec/pthread_frame.c Lines: 381 548 69.5 %
Date: 2021-04-19 06:27:07 Branches: 201 351 57.3 %

Line Branch Exec Source
1
/*
2
 * This file is part of FFmpeg.
3
 *
4
 * FFmpeg is free software; you can redistribute it and/or
5
 * modify it under the terms of the GNU Lesser General Public
6
 * License as published by the Free Software Foundation; either
7
 * version 2.1 of the License, or (at your option) any later version.
8
 *
9
 * FFmpeg is distributed in the hope that it will be useful,
10
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12
 * Lesser General Public License for more details.
13
 *
14
 * You should have received a copy of the GNU Lesser General Public
15
 * License along with FFmpeg; if not, write to the Free Software
16
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
 */
18
19
/**
20
 * @file
21
 * Frame multithreading support functions
22
 * @see doc/multithreading.txt
23
 */
24
25
#include "config.h"
26
27
#include <stdatomic.h>
28
#include <stdint.h>
29
30
#include "avcodec.h"
31
#include "hwconfig.h"
32
#include "internal.h"
33
#include "pthread_internal.h"
34
#include "thread.h"
35
#include "version.h"
36
37
#include "libavutil/avassert.h"
38
#include "libavutil/buffer.h"
39
#include "libavutil/common.h"
40
#include "libavutil/cpu.h"
41
#include "libavutil/frame.h"
42
#include "libavutil/internal.h"
43
#include "libavutil/log.h"
44
#include "libavutil/mem.h"
45
#include "libavutil/opt.h"
46
#include "libavutil/thread.h"
47
48
/* Values stored in PerThreadContext.state. */
enum {
    /** Set when the thread is awaiting a packet. */
    STATE_INPUT_READY,

    /** Set before the codec has called ff_thread_finish_setup(). */
    STATE_SETTING_UP,

    /**
     * Set when the codec calls get_buffer().
     * State is returned to STATE_SETTING_UP afterwards.
     */
    STATE_GET_BUFFER,

    /**
     * Set when the codec calls get_format().
     * State is returned to STATE_SETTING_UP afterwards.
     */
    STATE_GET_FORMAT,

    /** Set after the codec has called ff_thread_finish_setup(). */
    STATE_SETUP_FINISHED,
};
66
67
/* Values stored in PerThreadContext.thread_init. */
enum {
    /** Thread has not been created, AVCodec->close mustn't be called. */
    UNINITIALIZED,
    /** AVCodec->close needs to be called. */
    NEEDS_CLOSE,
    /** Thread has been properly set up. */
    INITIALIZED,
};
72
73
/**
74
 * Context used by codec threads and stored in their AVCodecInternal thread_ctx.
75
 */
76
typedef struct PerThreadContext {
77
    struct FrameThreadContext *parent;
78
79
    pthread_t      thread;
80
    int            thread_init;
81
    unsigned       pthread_init_cnt;///< Number of successfully initialized mutexes/conditions
82
    pthread_cond_t input_cond;      ///< Used to wait for a new packet from the main thread.
83
    pthread_cond_t progress_cond;   ///< Used by child threads to wait for progress to change.
84
    pthread_cond_t output_cond;     ///< Used by the main thread to wait for frames to finish.
85
86
    pthread_mutex_t mutex;          ///< Mutex used to protect the contents of the PerThreadContext.
87
    pthread_mutex_t progress_mutex; ///< Mutex used to protect frame progress values and progress_cond.
88
89
    AVCodecContext *avctx;          ///< Context used to decode packets passed to this thread.
90
91
    AVPacket       *avpkt;          ///< Input packet (for decoding) or output (for encoding).
92
93
    AVFrame *frame;                 ///< Output frame (for decoding) or input (for encoding).
94
    int     got_frame;              ///< The output of got_picture_ptr from the last avcodec_decode_video() call.
95
    int     result;                 ///< The result of the last codec decode/encode() call.
96
97
    atomic_int state;
98
99
#if FF_API_THREAD_SAFE_CALLBACKS
100
    /**
101
     * Array of frames passed to ff_thread_release_buffer().
102
     * Frames are released after all threads referencing them are finished.
103
     */
104
    AVFrame **released_buffers;
105
    int   num_released_buffers;
106
    int       released_buffers_allocated;
107
108
    AVFrame *requested_frame;       ///< AVFrame the codec passed to get_buffer()
109
    int      requested_flags;       ///< flags passed to get_buffer() for requested_frame
110
111
    const enum AVPixelFormat *available_formats; ///< Format array for get_format()
112
    enum AVPixelFormat result_format;            ///< get_format() result
113
#endif
114
115
    int die;                        ///< Set when the thread should exit.
116
117
    int hwaccel_serializing;
118
    int async_serializing;
119
120
    atomic_int debug_threads;       ///< Set if the FF_DEBUG_THREADS option is set.
121
} PerThreadContext;
122
123
/**
124
 * Context stored in the client AVCodecInternal thread_ctx.
125
 */
126
typedef struct FrameThreadContext {
127
    PerThreadContext *threads;     ///< The contexts for each thread.
128
    PerThreadContext *prev_thread; ///< The last thread submit_packet() was called on.
129
130
    unsigned    pthread_init_cnt;  ///< Number of successfully initialized mutexes/conditions
131
    pthread_mutex_t buffer_mutex;  ///< Mutex used to protect get/release_buffer().
132
    /**
133
     * This lock is used for ensuring threads run in serial when hwaccel
134
     * is used.
135
     */
136
    pthread_mutex_t hwaccel_mutex;
137
    pthread_mutex_t async_mutex;
138
    pthread_cond_t async_cond;
139
    int async_lock;
140
141
    int next_decoding;             ///< The next context to submit a packet to.
142
    int next_finished;             ///< The next context to return output from.
143
144
    int delaying;                  /**<
145
                                    * Set for the first N packets, where N is the number of threads.
146
                                    * While it is set, ff_thread_en/decode_frame won't return any results.
147
                                    */
148
} FrameThreadContext;
149
150
#if FF_API_THREAD_SAFE_CALLBACKS
/* Non-zero when the user declared the callbacks thread-safe, or when
 * get_buffer2 is still the library default. */
#define THREAD_SAFE_CALLBACKS(avctx)                          \
    ((avctx)->thread_safe_callbacks ||                        \
     (avctx)->get_buffer2 == avcodec_default_get_buffer2)
#endif
154
155
594
/**
 * Acquire the async lock: block until fctx->async_lock is clear, then set it.
 *
 * @param fctx shared frame-thread context owning the lock
 */
static void async_lock(FrameThreadContext *fctx)
{
    pthread_mutex_lock(&fctx->async_mutex);
    for (;;) {
        if (!fctx->async_lock)
            break;
        /* someone else holds the lock; sleep until async_unlock() wakes us */
        pthread_cond_wait(&fctx->async_cond, &fctx->async_mutex);
    }
    fctx->async_lock = 1;
    pthread_mutex_unlock(&fctx->async_mutex);
}
163
164
594
/**
 * Release the async lock and wake every thread blocked in async_lock().
 * Asserts that the lock is currently held.
 *
 * @param fctx shared frame-thread context owning the lock
 */
static void async_unlock(FrameThreadContext *fctx)
{
    pthread_mutex_t *const guard = &fctx->async_mutex;

    pthread_mutex_lock(guard);
    av_assert0(fctx->async_lock);
    fctx->async_lock = 0;
    pthread_cond_broadcast(&fctx->async_cond);
    pthread_mutex_unlock(guard);
}
172
173
/**
174
 * Codec worker thread.
175
 *
176
 * Automatically calls ff_thread_finish_setup() if the codec does
177
 * not provide an update_thread_context method, or if the codec returns
178
 * before calling it.
179
 */
180
94
static attribute_align_arg void *frame_worker_thread(void *arg)
181
{
182
94
    PerThreadContext *p = arg;
183
94
    AVCodecContext *avctx = p->avctx;
184
94
    const AVCodec *codec = avctx->codec;
185
186
94
    pthread_mutex_lock(&p->mutex);
187
    while (1) {
188

1238
        while (atomic_load(&p->state) == STATE_INPUT_READY && !p->die)
189
611
            pthread_cond_wait(&p->input_cond, &p->mutex);
190
191
627
        if (p->die) break;
192
193
FF_DISABLE_DEPRECATION_WARNINGS
194
533
        if (!codec->update_thread_context
195
#if FF_API_THREAD_SAFE_CALLBACKS
196

453
            && THREAD_SAFE_CALLBACKS(avctx)
197
#endif
198
            )
199
453
            ff_thread_finish_setup(avctx);
200
FF_ENABLE_DEPRECATION_WARNINGS
201
202
        /* If a decoder supports hwaccel, then it must call ff_get_format().
203
         * Since that call must happen before ff_thread_finish_setup(), the
204
         * decoder is required to implement update_thread_context() and call
205
         * ff_thread_finish_setup() manually. Therefore the above
206
         * ff_thread_finish_setup() call did not happen and hwaccel_serializing
207
         * cannot be true here. */
208
533
        av_assert0(!p->hwaccel_serializing);
209
210
        /* if the previous thread uses hwaccel then we take the lock to ensure
211
         * the threads don't run concurrently */
212
533
        if (avctx->hwaccel) {
213
            pthread_mutex_lock(&p->parent->hwaccel_mutex);
214
            p->hwaccel_serializing = 1;
215
        }
216
217
533
        av_frame_unref(p->frame);
218
533
        p->got_frame = 0;
219
533
        p->result = codec->decode(avctx, p->frame, &p->got_frame, p->avpkt);
220
221

533
        if ((p->result < 0 || !p->got_frame) && p->frame->buf[0]) {
222
            if (avctx->codec->caps_internal & FF_CODEC_CAP_ALLOCATE_PROGRESS)
223
                av_log(avctx, AV_LOG_ERROR, "A frame threaded decoder did not "
224
                       "free the frame on failure. This is a bug, please report it.\n");
225
            av_frame_unref(p->frame);
226
        }
227
228
533
        if (atomic_load(&p->state) == STATE_SETTING_UP)
229
12
            ff_thread_finish_setup(avctx);
230
231
533
        if (p->hwaccel_serializing) {
232
            p->hwaccel_serializing = 0;
233
            pthread_mutex_unlock(&p->parent->hwaccel_mutex);
234
        }
235
236
533
        if (p->async_serializing) {
237
            p->async_serializing = 0;
238
239
            async_unlock(p->parent);
240
        }
241
242
533
        pthread_mutex_lock(&p->progress_mutex);
243
244
533
        atomic_store(&p->state, STATE_INPUT_READY);
245
246
533
        pthread_cond_broadcast(&p->progress_cond);
247
533
        pthread_cond_signal(&p->output_cond);
248
533
        pthread_mutex_unlock(&p->progress_mutex);
249
    }
250
94
    pthread_mutex_unlock(&p->mutex);
251
252
94
    return NULL;
253
}
254
255
/**
256
 * Update the next thread's AVCodecContext with values from the reference thread's context.
257
 *
258
 * @param dst The destination context.
259
 * @param src The source context.
260
 * @param for_user 0 if the destination is a codec thread, 1 if the destination is the user's thread
261
 * @return 0 on success, negative error code on failure
262
 */
263
1074
static int update_context_from_thread(AVCodecContext *dst, AVCodecContext *src, int for_user)
264
{
265
1074
    int err = 0;
266
267

1074
    if (dst != src && (for_user || src->codec->update_thread_context)) {
268
621
        dst->time_base = src->time_base;
269
621
        dst->framerate = src->framerate;
270
621
        dst->width     = src->width;
271
621
        dst->height    = src->height;
272
621
        dst->pix_fmt   = src->pix_fmt;
273
621
        dst->sw_pix_fmt = src->sw_pix_fmt;
274
275
621
        dst->coded_width  = src->coded_width;
276
621
        dst->coded_height = src->coded_height;
277
278
621
        dst->has_b_frames = src->has_b_frames;
279
621
        dst->idct_algo    = src->idct_algo;
280
281
621
        dst->bits_per_coded_sample = src->bits_per_coded_sample;
282
621
        dst->sample_aspect_ratio   = src->sample_aspect_ratio;
283
284
621
        dst->profile = src->profile;
285
621
        dst->level   = src->level;
286
287
621
        dst->bits_per_raw_sample = src->bits_per_raw_sample;
288
621
        dst->ticks_per_frame     = src->ticks_per_frame;
289
621
        dst->color_primaries     = src->color_primaries;
290
291
621
        dst->color_trc   = src->color_trc;
292
621
        dst->colorspace  = src->colorspace;
293
621
        dst->color_range = src->color_range;
294
621
        dst->chroma_sample_location = src->chroma_sample_location;
295
296
621
        dst->hwaccel = src->hwaccel;
297
621
        dst->hwaccel_context = src->hwaccel_context;
298
299
621
        dst->channels       = src->channels;
300
621
        dst->sample_rate    = src->sample_rate;
301
621
        dst->sample_fmt     = src->sample_fmt;
302
621
        dst->channel_layout = src->channel_layout;
303
621
        dst->internal->hwaccel_priv_data = src->internal->hwaccel_priv_data;
304
305
621
        if (!!dst->hw_frames_ctx != !!src->hw_frames_ctx ||
306

621
            (dst->hw_frames_ctx && dst->hw_frames_ctx->data != src->hw_frames_ctx->data)) {
307
            av_buffer_unref(&dst->hw_frames_ctx);
308
309
            if (src->hw_frames_ctx) {
310
                dst->hw_frames_ctx = av_buffer_ref(src->hw_frames_ctx);
311
                if (!dst->hw_frames_ctx)
312
                    return AVERROR(ENOMEM);
313
            }
314
        }
315
316
621
        dst->hwaccel_flags = src->hwaccel_flags;
317
318
621
        err = av_buffer_replace(&dst->internal->pool, src->internal->pool);
319
621
        if (err < 0)
320
            return err;
321
    }
322
323
1074
    if (for_user) {
324
#if FF_API_CODED_FRAME
325
FF_DISABLE_DEPRECATION_WARNINGS
326
546
        dst->coded_frame = src->coded_frame;
327
FF_ENABLE_DEPRECATION_WARNINGS
328
#endif
329
546
        if (dst->codec->update_thread_context_for_user)
330
36
            err = dst->codec->update_thread_context_for_user(dst, src);
331
    } else {
332
528
        if (dst->codec->update_thread_context)
333
75
            err = dst->codec->update_thread_context(dst, src);
334
    }
335
336
1074
    return err;
337
}
338
339
/**
340
 * Update the next thread's AVCodecContext with values set by the user.
341
 *
342
 * @param dst The destination context.
343
 * @param src The source context.
344
 * @return 0 on success, negative error code on failure
345
 */
346
533
static int update_context_from_user(AVCodecContext *dst, AVCodecContext *src)
347
{
348
533
    dst->flags          = src->flags;
349
350
533
    dst->draw_horiz_band= src->draw_horiz_band;
351
533
    dst->get_buffer2    = src->get_buffer2;
352
353
533
    dst->opaque   = src->opaque;
354
533
    dst->debug    = src->debug;
355
356
533
    dst->slice_flags = src->slice_flags;
357
533
    dst->flags2      = src->flags2;
358
533
    dst->export_side_data = src->export_side_data;
359
360
533
    dst->skip_loop_filter = src->skip_loop_filter;
361
533
    dst->skip_idct        = src->skip_idct;
362
533
    dst->skip_frame       = src->skip_frame;
363
364
533
    dst->frame_number     = src->frame_number;
365
533
    dst->reordered_opaque = src->reordered_opaque;
366
#if FF_API_THREAD_SAFE_CALLBACKS
367
FF_DISABLE_DEPRECATION_WARNINGS
368
533
    dst->thread_safe_callbacks = src->thread_safe_callbacks;
369
FF_ENABLE_DEPRECATION_WARNINGS
370
#endif
371
372

533
    if (src->slice_count && src->slice_offset) {
373
        if (dst->slice_count < src->slice_count) {
374
            int err = av_reallocp_array(&dst->slice_offset, src->slice_count,
375
                                        sizeof(*dst->slice_offset));
376
            if (err < 0)
377
                return err;
378
        }
379
        memcpy(dst->slice_offset, src->slice_offset,
380
               src->slice_count * sizeof(*dst->slice_offset));
381
    }
382
533
    dst->slice_count = src->slice_count;
383
533
    return 0;
384
}
385
386
#if FF_API_THREAD_SAFE_CALLBACKS
387
/// Releases the buffers that this decoding thread was the last user of.
388
627
static void release_delayed_buffers(PerThreadContext *p)
389
{
390
627
    FrameThreadContext *fctx = p->parent;
391
392
627
    while (p->num_released_buffers > 0) {
393
        AVFrame *f;
394
395
        pthread_mutex_lock(&fctx->buffer_mutex);
396
397
        // fix extended data in case the caller screwed it up
398
        av_assert0(p->avctx->codec_type == AVMEDIA_TYPE_VIDEO ||
399
                   p->avctx->codec_type == AVMEDIA_TYPE_AUDIO);
400
        f = p->released_buffers[--p->num_released_buffers];
401
        f->extended_data = f->data;
402
        av_frame_unref(f);
403
404
        pthread_mutex_unlock(&fctx->buffer_mutex);
405
    }
406
627
}
407
#endif
408
409
582
static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx,
410
                         AVPacket *avpkt)
411
{
412
582
    FrameThreadContext *fctx = p->parent;
413
582
    PerThreadContext *prev_thread = fctx->prev_thread;
414
582
    const AVCodec *codec = p->avctx->codec;
415
    int ret;
416
417

582
    if (!avpkt->size && !(codec->capabilities & AV_CODEC_CAP_DELAY))
418
49
        return 0;
419
420
533
    pthread_mutex_lock(&p->mutex);
421
422
533
    ret = update_context_from_user(p->avctx, user_avctx);
423
533
    if (ret) {
424
        pthread_mutex_unlock(&p->mutex);
425
        return ret;
426
    }
427
533
    atomic_store_explicit(&p->debug_threads,
428
                          (p->avctx->debug & FF_DEBUG_THREADS) != 0,
429
                          memory_order_relaxed);
430
431
#if FF_API_THREAD_SAFE_CALLBACKS
432
533
    release_delayed_buffers(p);
433
#endif
434
435
533
    if (prev_thread) {
436
        int err;
437
521
        if (atomic_load(&prev_thread->state) == STATE_SETTING_UP) {
438
97
            pthread_mutex_lock(&prev_thread->progress_mutex);
439
194
            while (atomic_load(&prev_thread->state) == STATE_SETTING_UP)
440
97
                pthread_cond_wait(&prev_thread->progress_cond, &prev_thread->progress_mutex);
441
97
            pthread_mutex_unlock(&prev_thread->progress_mutex);
442
        }
443
444
521
        err = update_context_from_thread(p->avctx, prev_thread->avctx, 0);
445
521
        if (err) {
446
            pthread_mutex_unlock(&p->mutex);
447
            return err;
448
        }
449
    }
450
451
533
    av_packet_unref(p->avpkt);
452
533
    ret = av_packet_ref(p->avpkt, avpkt);
453
533
    if (ret < 0) {
454
        pthread_mutex_unlock(&p->mutex);
455
        av_log(p->avctx, AV_LOG_ERROR, "av_packet_ref() failed in submit_packet()\n");
456
        return ret;
457
    }
458
459
533
    atomic_store(&p->state, STATE_SETTING_UP);
460
533
    pthread_cond_signal(&p->input_cond);
461
533
    pthread_mutex_unlock(&p->mutex);
462
463
#if FF_API_THREAD_SAFE_CALLBACKS
464
FF_DISABLE_DEPRECATION_WARNINGS
465
    /*
466
     * If the client doesn't have a thread-safe get_buffer(),
467
     * then decoding threads call back to the main thread,
468
     * and it calls back to the client here.
469
     */
470
471
533
    if (!p->avctx->thread_safe_callbacks && (
472
368
         p->avctx->get_format != avcodec_default_get_format ||
473
368
         p->avctx->get_buffer2 != avcodec_default_get_buffer2)) {
474
        while (atomic_load(&p->state) != STATE_SETUP_FINISHED && atomic_load(&p->state) != STATE_INPUT_READY) {
475
            int call_done = 1;
476
            pthread_mutex_lock(&p->progress_mutex);
477
            while (atomic_load(&p->state) == STATE_SETTING_UP)
478
                pthread_cond_wait(&p->progress_cond, &p->progress_mutex);
479
480
            switch (atomic_load_explicit(&p->state, memory_order_acquire)) {
481
            case STATE_GET_BUFFER:
482
                p->result = ff_get_buffer(p->avctx, p->requested_frame, p->requested_flags);
483
                break;
484
            case STATE_GET_FORMAT:
485
                p->result_format = ff_get_format(p->avctx, p->available_formats);
486
                break;
487
            default:
488
                call_done = 0;
489
                break;
490
            }
491
            if (call_done) {
492
                atomic_store(&p->state, STATE_SETTING_UP);
493
                pthread_cond_signal(&p->progress_cond);
494
            }
495
            pthread_mutex_unlock(&p->progress_mutex);
496
        }
497
    }
498
FF_ENABLE_DEPRECATION_WARNINGS
499
#endif
500
501
533
    fctx->prev_thread = p;
502
533
    fctx->next_decoding++;
503
504
533
    return 0;
505
}
506
507
582
int ff_thread_decode_frame(AVCodecContext *avctx,
508
                           AVFrame *picture, int *got_picture_ptr,
509
                           AVPacket *avpkt)
510
{
511
582
    FrameThreadContext *fctx = avctx->internal->thread_ctx;
512
582
    int finished = fctx->next_finished;
513
    PerThreadContext *p;
514
    int err;
515
516
    /* release the async lock, permitting blocked hwaccel threads to
517
     * go forward while we are in this function */
518
582
    async_unlock(fctx);
519
520
    /*
521
     * Submit a packet to the next decoding thread.
522
     */
523
524
582
    p = &fctx->threads[fctx->next_decoding];
525
582
    err = submit_packet(p, avctx, avpkt);
526
582
    if (err)
527
        goto finish;
528
529
    /*
530
     * If we're still receiving the initial packets, don't return a frame.
531
     */
532
533
582
    if (fctx->next_decoding > (avctx->thread_count-1-(avctx->codec_id == AV_CODEC_ID_FFV1)))
534
118
        fctx->delaying = 0;
535
536
582
    if (fctx->delaying) {
537
68
        *got_picture_ptr=0;
538
68
        if (avpkt->size) {
539
48
            err = avpkt->size;
540
48
            goto finish;
541
        }
542
    }
543
544
    /*
545
     * Return the next available frame from the oldest thread.
546
     * If we're at the end of the stream, then we have to skip threads that
547
     * didn't output a frame/error, because we don't want to accidentally signal
548
     * EOF (avpkt->size == 0 && *got_picture_ptr == 0 && err >= 0).
549
     */
550
551
    do {
552
615
        p = &fctx->threads[finished++];
553
554
615
        if (atomic_load(&p->state) != STATE_INPUT_READY) {
555
130
            pthread_mutex_lock(&p->progress_mutex);
556
260
            while (atomic_load_explicit(&p->state, memory_order_relaxed) != STATE_INPUT_READY)
557
130
                pthread_cond_wait(&p->output_cond, &p->progress_mutex);
558
130
            pthread_mutex_unlock(&p->progress_mutex);
559
        }
560
561
615
        av_frame_move_ref(picture, p->frame);
562
615
        *got_picture_ptr = p->got_frame;
563
615
        picture->pkt_dts = p->avpkt->dts;
564
615
        err = p->result;
565
566
        /*
567
         * A later call with avkpt->size == 0 may loop over all threads,
568
         * including this one, searching for a frame/error to return before being
569
         * stopped by the "finished != fctx->next_finished" condition.
570
         * Make sure we don't mistakenly return the same frame/error again.
571
         */
572
615
        p->got_frame = 0;
573
615
        p->result = 0;
574
575
615
        if (finished >= avctx->thread_count) finished = 0;
576


615
    } while (!avpkt->size && !*got_picture_ptr && err >= 0 && finished != fctx->next_finished);
577
578
534
    update_context_from_thread(avctx, p->avctx, 1);
579
580
534
    if (fctx->next_decoding >= avctx->thread_count) fctx->next_decoding = 0;
581
582
534
    fctx->next_finished = finished;
583
584
    /* return the size of the consumed packet if no error occurred */
585
534
    if (err >= 0)
586
534
        err = avpkt->size;
587
finish:
588
582
    async_lock(fctx);
589
582
    return err;
590
}
591
592
412834
/**
 * Publish decoding progress n for the given field of frame f and wake any
 * thread waiting on it. Does nothing when the frame carries no progress
 * buffer or the stored progress is already >= n.
 *
 * @param f     frame whose progress is updated
 * @param n     new progress value
 * @param field index into the frame's progress array and owner array
 */
void ff_thread_report_progress(ThreadFrame *f, int n, int field)
{
    PerThreadContext *p;
    atomic_int *progress = NULL;

    if (f->progress)
        progress = (atomic_int*)f->progress->data;

    if (!progress)
        return;
    if (atomic_load_explicit(&progress[field], memory_order_relaxed) >= n)
        return;

    p = f->owner[field]->internal->thread_ctx;

    if (atomic_load_explicit(&p->debug_threads, memory_order_relaxed))
        av_log(f->owner[field], AV_LOG_DEBUG,
               "%p finished %d field %d\n", progress, n, field);

    /* Store under the progress mutex so waiters cannot miss the wakeup. */
    pthread_mutex_lock(&p->progress_mutex);

    atomic_store_explicit(&progress[field], n, memory_order_release);

    pthread_cond_broadcast(&p->progress_cond);
    pthread_mutex_unlock(&p->progress_mutex);
}
614
615
2681406
/**
 * Block until the progress of the given field of frame f reaches at least n.
 * Returns immediately when the frame carries no progress buffer or the
 * progress is already sufficient.
 *
 * @param f     frame to wait on
 * @param n     progress value to wait for
 * @param field index into the frame's progress array and owner array
 */
void ff_thread_await_progress(ThreadFrame *f, int n, int field)
{
    PerThreadContext *p;
    atomic_int *progress = NULL;

    if (f->progress)
        progress = (atomic_int*)f->progress->data;

    if (!progress)
        return;
    /* fast path: no locking when the data is already available */
    if (atomic_load_explicit(&progress[field], memory_order_acquire) >= n)
        return;

    p = f->owner[field]->internal->thread_ctx;

    if (atomic_load_explicit(&p->debug_threads, memory_order_relaxed))
        av_log(f->owner[field], AV_LOG_DEBUG,
               "thread awaiting %d field %d from %p\n", n, field, progress);

    pthread_mutex_lock(&p->progress_mutex);
    for (;;) {
        if (atomic_load_explicit(&progress[field], memory_order_relaxed) >= n)
            break;
        pthread_cond_wait(&p->progress_cond, &p->progress_mutex);
    }
    pthread_mutex_unlock(&p->progress_mutex);
}
635
636
29190
/**
 * Mark the calling worker's setup phase as finished.
 *
 * A no-op unless frame threading is active. With a hwaccel in use, the
 * worker first takes the hwaccel mutex (if not already held) and, for
 * hwaccels without HWACCEL_CAP_ASYNC_SAFE, the async lock. Then the state
 * becomes STATE_SETUP_FINISHED and progress waiters are woken.
 *
 * @param avctx the worker's codec context
 */
void ff_thread_finish_setup(AVCodecContext *avctx) {
    PerThreadContext *p = avctx->internal->thread_ctx;

    if (!(avctx->active_thread_type & FF_THREAD_FRAME))
        return;

    if (avctx->hwaccel) {
        if (!p->hwaccel_serializing) {
            pthread_mutex_lock(&p->parent->hwaccel_mutex);
            p->hwaccel_serializing = 1;
        }

        /* this assumes that no hwaccel calls happen before ff_thread_finish_setup() */
        if (!(avctx->hwaccel->caps_internal & HWACCEL_CAP_ASYNC_SAFE)) {
            p->async_serializing = 1;

            async_lock(p->parent);
        }
    }

    pthread_mutex_lock(&p->progress_mutex);
    if (atomic_load(&p->state) == STATE_SETUP_FINISHED)
        av_log(avctx, AV_LOG_WARNING, "Multiple ff_thread_finish_setup() calls\n");

    atomic_store(&p->state, STATE_SETUP_FINISHED);

    pthread_cond_broadcast(&p->progress_cond);
    pthread_mutex_unlock(&p->progress_mutex);
}
664
665
/// Waits for all threads to finish.
666
12
static void park_frame_worker_threads(FrameThreadContext *fctx, int thread_count)
667
{
668
    int i;
669
670
12
    async_unlock(fctx);
671
672
106
    for (i = 0; i < thread_count; i++) {
673
94
        PerThreadContext *p = &fctx->threads[i];
674
675
94
        if (atomic_load(&p->state) != STATE_INPUT_READY) {
676
            pthread_mutex_lock(&p->progress_mutex);
677
            while (atomic_load(&p->state) != STATE_INPUT_READY)
678
                pthread_cond_wait(&p->output_cond, &p->progress_mutex);
679
            pthread_mutex_unlock(&p->progress_mutex);
680
        }
681
94
        p->got_frame = 0;
682
    }
683
684
12
    async_lock(fctx);
685
12
}
686
687
#define SENTINEL 0 // This forbids putting a mutex/condition variable at the front.
688
#define OFFSET_ARRAY(...) __VA_ARGS__, SENTINEL
689
#define DEFINE_OFFSET_ARRAY(type, name, mutexes, conds)                       \
690
static const unsigned name ## _offsets[] = { offsetof(type, pthread_init_cnt),\
691
                                             OFFSET_ARRAY mutexes,            \
692
                                             OFFSET_ARRAY conds }
693
694
#define OFF(member) offsetof(FrameThreadContext, member)
695
DEFINE_OFFSET_ARRAY(FrameThreadContext, thread_ctx,
696
                    (OFF(buffer_mutex), OFF(hwaccel_mutex), OFF(async_mutex)),
697
                    (OFF(async_cond)));
698
#undef OFF
699
700
#define OFF(member) offsetof(PerThreadContext, member)
701
DEFINE_OFFSET_ARRAY(PerThreadContext, per_thread,
702
                    (OFF(progress_mutex), OFF(mutex)),
703
                    (OFF(input_cond), OFF(progress_cond), OFF(output_cond)));
704
#undef OFF
705
706
106
static av_cold void free_pthread(void *obj, const unsigned offsets[])
707
{
708
106
    unsigned cnt = *(unsigned*)((char*)obj + offsets[0]);
709
106
    const unsigned *cur_offset = offsets;
710
711

330
    for (; *(++cur_offset) != SENTINEL && cnt; cnt--)
712
224
        pthread_mutex_destroy((pthread_mutex_t*)((char*)obj + *cur_offset));
713

400
    for (; *(++cur_offset) != SENTINEL && cnt; cnt--)
714
294
        pthread_cond_destroy ((pthread_cond_t *)((char*)obj + *cur_offset));
715
106
}
716
717
106
static av_cold int init_pthread(void *obj, const unsigned offsets[])
718
{
719
106
    const unsigned *cur_offset = offsets;
720
106
    unsigned cnt = 0;
721
    int err;
722
723
#define PTHREAD_INIT_LOOP(type)                                               \
724
    for (; *(++cur_offset) != SENTINEL; cnt++) {                              \
725
        pthread_ ## type ## _t *dst = (void*)((char*)obj + *cur_offset);      \
726
        err = pthread_ ## type ## _init(dst, NULL);                           \
727
        if (err) {                                                            \
728
            err = AVERROR(err);                                               \
729
            goto fail;                                                        \
730
        }                                                                     \
731
    }
732

330
    PTHREAD_INIT_LOOP(mutex)
733

400
    PTHREAD_INIT_LOOP(cond)
734
735
106
fail:
736
106
    *(unsigned*)((char*)obj + offsets[0]) = cnt;
737
106
    return err;
738
}
739
740
12
/**
 * Stop all frame worker threads and free everything owned by the
 * frame-threading context.
 *
 * Workers are parked first; state from the last-used worker is copied
 * back to the user context and to the first worker where applicable; then
 * each worker is joined, its codec closed and its resources released.
 * On return avctx->internal->thread_ctx is freed and avctx->codec is NULL.
 *
 * @param avctx        the user-facing codec context
 * @param thread_count number of PerThreadContexts to tear down
 */
void ff_frame_thread_free(AVCodecContext *avctx, int thread_count)
{
    FrameThreadContext *fctx = avctx->internal->thread_ctx;
    const AVCodec *codec = avctx->codec;
    PerThreadContext *last;
    int i;

    park_frame_worker_threads(fctx, thread_count);

    last = fctx->prev_thread;

    if (last && avctx->internal->hwaccel_priv_data !=
                last->avctx->internal->hwaccel_priv_data) {
        if (update_context_from_thread(avctx, last->avctx, 1) < 0)
            av_log(avctx, AV_LOG_ERROR, "Failed to update user thread.\n");
    }

    if (last && last != fctx->threads) {
        if (update_context_from_thread(fctx->threads->avctx, last->avctx, 0) < 0) {
            av_log(avctx, AV_LOG_ERROR, "Final thread update failed\n");
            last->avctx->internal->is_copy = fctx->threads->avctx->internal->is_copy;
            fctx->threads->avctx->internal->is_copy = 1;
        }
    }

    for (i = 0; i < thread_count; i++) {
        PerThreadContext *p = &fctx->threads[i];
        AVCodecContext *ctx = p->avctx;

        if (ctx->internal) {
            /* Ask a running worker to exit and wait for it. */
            if (p->thread_init == INITIALIZED) {
                pthread_mutex_lock(&p->mutex);
                p->die = 1;
                pthread_cond_signal(&p->input_cond);
                pthread_mutex_unlock(&p->mutex);

                pthread_join(p->thread, NULL);
            }
            if (codec->close && p->thread_init != UNINITIALIZED)
                codec->close(ctx);

#if FF_API_THREAD_SAFE_CALLBACKS
            release_delayed_buffers(p);
            for (int j = 0; j < p->released_buffers_allocated; j++)
                av_frame_free(&p->released_buffers[j]);
            av_freep(&p->released_buffers);
#endif
            if (ctx->priv_data) {
                if (codec->priv_class)
                    av_opt_free(ctx->priv_data);
                av_freep(&ctx->priv_data);
            }

            av_freep(&ctx->slice_offset);

            av_buffer_unref(&ctx->internal->pool);
            av_freep(&ctx->internal);
            av_buffer_unref(&ctx->hw_frames_ctx);
        }

        av_frame_free(&p->frame);

        free_pthread(p, per_thread_offsets);
        av_packet_free(&p->avpkt);

        av_freep(&p->avctx);
    }

    av_freep(&fctx->threads);
    free_pthread(fctx, thread_ctx_offsets);

    /* fctx itself is released here. */
    av_freep(&avctx->internal->thread_ctx);

    if (avctx->priv_data && avctx->codec && avctx->codec->priv_class)
        av_opt_free(avctx->priv_data);
    avctx->codec = NULL;
}
814
815
94
static av_cold int init_thread(PerThreadContext *p, int *threads_to_free,
816
                               FrameThreadContext *fctx, AVCodecContext *avctx,
817
                               AVCodecContext *src, const AVCodec *codec, int first)
818
{
819
    AVCodecContext *copy;
820
    int err;
821
822
94
    atomic_init(&p->state, STATE_INPUT_READY);
823
824
94
    copy = av_memdup(src, sizeof(*src));
825
94
    if (!copy)
826
        return AVERROR(ENOMEM);
827
94
    copy->priv_data = NULL;
828
829
    /* From now on, this PerThreadContext will be cleaned up by
830
     * ff_frame_thread_free in case of errors. */
831
94
    (*threads_to_free)++;
832
833
94
    p->parent = fctx;
834
94
    p->avctx  = copy;
835
836
94
    copy->internal = av_memdup(src->internal, sizeof(*src->internal));
837
94
    if (!copy->internal)
838
        return AVERROR(ENOMEM);
839
94
    copy->internal->thread_ctx = p;
840
841
94
    copy->delay = avctx->delay;
842
843
94
    if (codec->priv_data_size) {
844
94
        copy->priv_data = av_mallocz(codec->priv_data_size);
845
94
        if (!copy->priv_data)
846
            return AVERROR(ENOMEM);
847
848
94
        if (codec->priv_class) {
849
40
            *(const AVClass **)copy->priv_data = codec->priv_class;
850
40
            err = av_opt_copy(copy->priv_data, src->priv_data);
851
40
            if (err < 0)
852
                return err;
853
        }
854
    }
855
856
94
    err = init_pthread(p, per_thread_offsets);
857
94
    if (err < 0)
858
        return err;
859
860
94
    if (!(p->frame = av_frame_alloc()) ||
861
94
        !(p->avpkt = av_packet_alloc()))
862
        return AVERROR(ENOMEM);
863
94
    copy->internal->last_pkt_props = p->avpkt;
864
865
94
    if (!first)
866
82
        copy->internal->is_copy = 1;
867
868
94
    if (codec->init) {
869
94
        err = codec->init(copy);
870
94
        if (err < 0) {
871
            if (codec->caps_internal & FF_CODEC_CAP_INIT_CLEANUP)
872
                p->thread_init = NEEDS_CLOSE;
873
            return err;
874
        }
875
    }
876
94
    p->thread_init = NEEDS_CLOSE;
877
878
94
    if (first)
879
12
        update_context_from_thread(avctx, copy, 1);
880
881
94
    atomic_init(&p->debug_threads, (copy->debug & FF_DEBUG_THREADS) != 0);
882
883
94
    err = AVERROR(pthread_create(&p->thread, NULL, frame_worker_thread, p));
884
94
    if (err < 0)
885
        return err;
886
94
    p->thread_init = INITIALIZED;
887
888
94
    return 0;
889
}
890
891
12
/**
 * Create the frame-threading state for a codec context.
 *
 * A thread_count of 0 requests auto-detection based on av_cpu_count().
 * With a resulting count of 1 or less, active_thread_type is cleared and
 * nothing else is set up. Otherwise a FrameThreadContext is allocated and
 * stored in avctx->internal->thread_ctx, and one PerThreadContext per thread
 * is initialized with init_thread().
 *
 * @param avctx codec context to set up
 * @return 0 on success, a negative AVERROR code on failure; failures after
 *         the FrameThreadContext has been set up are cleaned up through
 *         ff_frame_thread_free()
 */
int ff_frame_thread_init(AVCodecContext *avctx)
{
    const AVCodec *codec = avctx->codec;
    FrameThreadContext *fctx;
    int nb_threads = avctx->thread_count;
    int nb_to_free = 0;
    int ret;

    if (!nb_threads) {
        int nb_cpus = av_cpu_count();
        /* With more than one core, use core count + 1 (capped at
         * MAX_AUTO_THREADS); otherwise stay single-threaded. */
        nb_threads = nb_cpus > 1 ? FFMIN(nb_cpus + 1, MAX_AUTO_THREADS) : 1;
        avctx->thread_count = nb_threads;
    }

    if (nb_threads <= 1) {
        avctx->active_thread_type = 0;
        return 0;
    }

    fctx = av_mallocz(sizeof(*fctx));
    avctx->internal->thread_ctx = fctx;
    if (!fctx)
        return AVERROR(ENOMEM);

    ret = init_pthread(fctx, thread_ctx_offsets);
    if (ret < 0) {
        free_pthread(fctx, thread_ctx_offsets);
        av_freep(&avctx->internal->thread_ctx);
        return ret;
    }

    fctx->delaying   = 1;
    fctx->async_lock = 1;

    if (codec->type == AVMEDIA_TYPE_VIDEO)
        avctx->delay = avctx->thread_count - 1;

    fctx->threads = av_mallocz_array(nb_threads, sizeof(*fctx->threads));
    if (!fctx->threads) {
        ret = AVERROR(ENOMEM);
        goto error;
    }

    /* init_thread() itself advances nb_to_free once the thread's context
     * exists, so the loop has no increment of its own. */
    while (nb_to_free < nb_threads) {
        PerThreadContext *p = &fctx->threads[nb_to_free];
        int is_first        = nb_to_free == 0;

        ret = init_thread(p, &nb_to_free, fctx, avctx, avctx, codec, is_first);
        if (ret < 0)
            goto error;
    }

    return 0;

error:
    ff_frame_thread_free(avctx, nb_to_free);
    return ret;
}
951
952
/**
 * Reset the frame-threading state of a codec context.
 *
 * Does nothing when no thread context is attached. Otherwise
 * park_frame_worker_threads() is called, the first thread's codec context is
 * updated from the previously used thread (if that is a different thread),
 * the decoding/finished counters are rewound, and each per-thread context has
 * its pending output dropped and the codec's flush() callback invoked.
 *
 * @param avctx codec context whose threads are flushed
 */
void ff_thread_flush(AVCodecContext *avctx)
{
    FrameThreadContext *fctx = avctx->internal->thread_ctx;
    PerThreadContext *prev;

    if (!fctx)
        return;

    park_frame_worker_threads(fctx, avctx->thread_count);

    prev = fctx->prev_thread;
    if (prev && prev != &fctx->threads[0])
        update_context_from_thread(fctx->threads[0].avctx, prev->avctx, 0);

    fctx->next_finished = 0;
    fctx->next_decoding = 0;
    fctx->delaying      = 1;
    fctx->prev_thread   = NULL;

    for (int n = 0; n < avctx->thread_count; n++) {
        PerThreadContext *p = &fctx->threads[n];

        /* Drop any pending output so that a later flush-decode call with
         * size=0 cannot hand out a stale frame. */
        p->got_frame = 0;
        av_frame_unref(p->frame);
        p->result = 0;

#if FF_API_THREAD_SAFE_CALLBACKS
        release_delayed_buffers(p);
#endif

        if (avctx->codec->flush)
            avctx->codec->flush(p->avctx);
    }
}
983
984
34216
/**
 * Report whether the calling decoder may start a frame right now.
 *
 * Returns 0 only when frame threading is active, the thread's state is no
 * longer STATE_SETTING_UP, and the codec either has an update_thread_context
 * callback or (with FF_API_THREAD_SAFE_CALLBACKS) the callbacks are not
 * thread safe. In every other case 1 is returned.
 *
 * @param avctx codec context of the calling thread
 * @return 1 if a frame may be started, 0 otherwise
 */
int ff_thread_can_start_frame(AVCodecContext *avctx)
{
    PerThreadContext *p = avctx->internal->thread_ctx;

    /* The per-thread state is only consulted when frame threading is on. */
    if (!(avctx->active_thread_type & FF_THREAD_FRAME))
        return 1;

    if (atomic_load(&p->state) == STATE_SETTING_UP)
        return 1;

    if (avctx->codec->update_thread_context)
        return 0;

#if FF_API_THREAD_SAFE_CALLBACKS
FF_DISABLE_DEPRECATION_WARNINGS
    if (!THREAD_SAFE_CALLBACKS(avctx))
        return 0;
FF_ENABLE_DEPRECATION_WARNINGS
#endif

    return 1;
}
999
1000
79869
/**
 * Obtain a buffer for a ThreadFrame.
 *
 * Without frame threading this simply forwards to ff_get_buffer(). With
 * frame threading the request is refused once the thread has left
 * STATE_SETTING_UP (for codecs with update_thread_context, or when the
 * callbacks are not thread safe). For codecs flagged
 * FF_CODEC_CAP_ALLOCATE_PROGRESS a two-entry progress buffer is allocated
 * with both entries set to -1. The buffer itself is then obtained while
 * holding the parent's buffer_mutex: either directly through ff_get_buffer(),
 * or, when callbacks are not thread safe, by publishing the request through
 * STATE_GET_BUFFER and waiting on progress_cond until the state is back at
 * STATE_SETTING_UP.
 *
 * @param avctx codec context of the calling thread
 * @param f     thread frame to fill; both owner entries are set to @p avctx
 * @param flags flags passed on to ff_get_buffer()
 * @return 0 on success, a negative value on failure
 */
static int thread_get_buffer_internal(AVCodecContext *avctx, ThreadFrame *f, int flags)
{
    PerThreadContext *p = avctx->internal->thread_ctx;
    FrameThreadContext *parent;
    int too_late;
    int ret;

    f->owner[0] = avctx;
    f->owner[1] = avctx;

    if (!(avctx->active_thread_type & FF_THREAD_FRAME))
        return ff_get_buffer(avctx, f->f, flags);

FF_DISABLE_DEPRECATION_WARNINGS
    too_late = atomic_load(&p->state) != STATE_SETTING_UP &&
               (avctx->codec->update_thread_context
#if FF_API_THREAD_SAFE_CALLBACKS
                || !THREAD_SAFE_CALLBACKS(avctx)
#endif
               );
FF_ENABLE_DEPRECATION_WARNINGS
    if (too_late) {
        av_log(avctx, AV_LOG_ERROR, "get_buffer() cannot be called after ff_thread_finish_setup()\n");
        return -1;
    }

    if (avctx->codec->caps_internal & FF_CODEC_CAP_ALLOCATE_PROGRESS) {
        atomic_int *counters;

        f->progress = av_buffer_alloc(2 * sizeof(*counters));
        if (!f->progress)
            return AVERROR(ENOMEM);

        /* Both progress counters start out at -1. */
        counters = (atomic_int*)f->progress->data;
        for (int k = 0; k < 2; k++)
            atomic_init(&counters[k], -1);
    }

    parent = p->parent;
    pthread_mutex_lock(&parent->buffer_mutex);
#if FF_API_THREAD_SAFE_CALLBACKS
FF_DISABLE_DEPRECATION_WARNINGS
    if (!THREAD_SAFE_CALLBACKS(avctx)) {
        /* Publish the request and wait until the state has returned to
         * STATE_SETTING_UP; the result is then read from p->result. */
        pthread_mutex_lock(&p->progress_mutex);
        p->requested_flags = flags;
        p->requested_frame = f->f;
        atomic_store_explicit(&p->state, STATE_GET_BUFFER, memory_order_release);
        pthread_cond_broadcast(&p->progress_cond);

        while (atomic_load(&p->state) != STATE_SETTING_UP)
            pthread_cond_wait(&p->progress_cond, &p->progress_mutex);

        ret = p->result;

        pthread_mutex_unlock(&p->progress_mutex);
    } else {
        ret = ff_get_buffer(avctx, f->f, flags);
    }

    if (!THREAD_SAFE_CALLBACKS(avctx) && !avctx->codec->update_thread_context)
        ff_thread_finish_setup(avctx);
FF_ENABLE_DEPRECATION_WARNINGS
#else
    ret = ff_get_buffer(avctx, f->f, flags);
#endif

    /* Any nonzero result drops the progress buffer again. */
    if (ret)
        av_buffer_unref(&f->progress);

    pthread_mutex_unlock(&parent->buffer_mutex);

    return ret;
}
1067
1068
#if FF_API_THREAD_SAFE_CALLBACKS
1069
FF_DISABLE_DEPRECATION_WARNINGS
1070
1643
/**
 * Select a pixel format from within a (possibly frame-threaded) decoder.
 *
 * ff_get_format() is called directly when frame threading is inactive, when
 * avctx->thread_safe_callbacks is set, or when the get_format callback is
 * avcodec_default_get_format. Otherwise the request is published by storing
 * the candidate list and switching the thread to STATE_GET_FORMAT; the
 * function then waits on progress_cond until the state is back at
 * STATE_SETTING_UP and returns p->result_format.
 *
 * @param avctx codec context of the calling thread
 * @param fmt   candidate formats, passed on unchanged
 * @return the chosen format, or -1 if called after the thread has left
 *         STATE_SETTING_UP
 */
enum AVPixelFormat ff_thread_get_format(AVCodecContext *avctx, const enum AVPixelFormat *fmt)
{
    PerThreadContext *p = avctx->internal->thread_ctx;
    enum AVPixelFormat chosen;

    /* Cases in which the callback is invoked directly from this thread. */
    if (!(avctx->active_thread_type & FF_THREAD_FRAME))
        return ff_get_format(avctx, fmt);
    if (avctx->thread_safe_callbacks)
        return ff_get_format(avctx, fmt);
    if (avctx->get_format == avcodec_default_get_format)
        return ff_get_format(avctx, fmt);

    if (atomic_load(&p->state) != STATE_SETTING_UP) {
        av_log(avctx, AV_LOG_ERROR, "get_format() cannot be called after ff_thread_finish_setup()\n");
        return -1;
    }

    pthread_mutex_lock(&p->progress_mutex);

    p->available_formats = fmt;
    atomic_store(&p->state, STATE_GET_FORMAT);
    pthread_cond_broadcast(&p->progress_cond);

    /* Wait until the request has been served and the state reset. */
    while (atomic_load(&p->state) != STATE_SETTING_UP)
        pthread_cond_wait(&p->progress_cond, &p->progress_mutex);

    chosen = p->result_format;

    pthread_mutex_unlock(&p->progress_mutex);

    return chosen;
}
1095
FF_ENABLE_DEPRECATION_WARNINGS
1096
#endif
1097
1098
79869
/**
 * Wrapper around thread_get_buffer_internal() that logs an error message
 * when the internal call returns a negative value.
 *
 * @param avctx codec context of the calling thread
 * @param f     thread frame to obtain a buffer for
 * @param flags flags forwarded to the internal implementation
 * @return the unmodified return value of thread_get_buffer_internal()
 */
int ff_thread_get_buffer(AVCodecContext *avctx, ThreadFrame *f, int flags)
{
    const int status = thread_get_buffer_internal(avctx, f, flags);

    if (status >= 0)
        return status;

    av_log(avctx, AV_LOG_ERROR, "thread_get_buffer() failed\n");
    return status;
}
1105
1106
904541
/**
 * Release the buffers held by a ThreadFrame.
 *
 * The progress buffer is unreferenced and the owner entries are cleared.
 * Without FF_API_THREAD_SAFE_CALLBACKS the frame is then simply unreferenced.
 * With it, the frame is unreferenced directly when frame threading is
 * inactive, the callbacks are thread safe, or the frame has no buffer;
 * otherwise its reference is moved into the per-thread released_buffers
 * array (grown by one entry when full) while holding the parent's
 * buffer_mutex.
 *
 * @param avctx codec context of the calling thread
 * @param f     thread frame to release; nothing is done if f->f is NULL
 */
void ff_thread_release_buffer(AVCodecContext *avctx, ThreadFrame *f)
{
#if FF_API_THREAD_SAFE_CALLBACKS
FF_DISABLE_DEPRECATION_WARNINGS
    PerThreadContext *p = avctx->internal->thread_ctx;
    FrameThreadContext *fctx;
    int queue_failed = 0;
    int can_direct_free = !(avctx->active_thread_type & FF_THREAD_FRAME) ||
                          THREAD_SAFE_CALLBACKS(avctx);
FF_ENABLE_DEPRECATION_WARNINGS
#endif

    if (!f->f)
        return;

    if (avctx->debug & FF_DEBUG_BUFFERS)
        av_log(avctx, AV_LOG_DEBUG, "thread_release_buffer called on pic %p\n", f);

    av_buffer_unref(&f->progress);
    f->owner[0] = NULL;
    f->owner[1] = NULL;

#if FF_API_THREAD_SAFE_CALLBACKS
    /* A frame without allocated buffers only needs resetting to a clean
     * state, which av_frame_unref() does as well. */
    if (can_direct_free || !f->f->buf[0]) {
        av_frame_unref(f->f);
        return;
    }

    fctx = p->parent;
    pthread_mutex_lock(&fctx->buffer_mutex);

    if (p->num_released_buffers == p->released_buffers_allocated) {
        /* The queue is full: grow the array by one slot and allocate the
         * AVFrame that will live in it. */
        AVFrame **grown = av_realloc_array(p->released_buffers, p->released_buffers_allocated + 1,
                                           sizeof(*p->released_buffers));
        if (grown) {
            grown[p->released_buffers_allocated] = av_frame_alloc();
            p->released_buffers = grown;
        }

        if (!grown || !grown[p->released_buffers_allocated])
            queue_failed = 1;
        else
            p->released_buffers_allocated++;
    }

    if (!queue_failed) {
        av_frame_move_ref(p->released_buffers[p->num_released_buffers], f->f);
        p->num_released_buffers++;
    }

    pthread_mutex_unlock(&fctx->buffer_mutex);

    if (queue_failed) {
        /* The frame could not be queued. Leave it in a clean state anyway:
         * the buffer references are dropped without being freed, which
         * leaks but is preferable to crashing. */
        av_log(avctx, AV_LOG_ERROR, "Could not queue a frame for freeing, this will leak\n");
        memset(f->f->buf, 0, sizeof(f->f->buf));
        if (f->f->extended_buf)
            memset(f->f->extended_buf, 0, f->f->nb_extended_buf * sizeof(*f->f->extended_buf));
        av_frame_unref(f->f);
    }
#else
    av_frame_unref(f->f);
#endif
}