LCOV - code coverage report
Current view: top level - libavutil - slicethread.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 107 119 89.9 %
Date: 2017-10-20 23:03:07 Functions: 5 5 100.0 %

          Line data    Source code
       1             : /*
       2             :  * This file is part of FFmpeg.
       3             :  *
       4             :  * FFmpeg is free software; you can redistribute it and/or
       5             :  * modify it under the terms of the GNU Lesser General Public
       6             :  * License as published by the Free Software Foundation; either
       7             :  * version 2.1 of the License, or (at your option) any later version.
       8             :  *
       9             :  * FFmpeg is distributed in the hope that it will be useful,
      10             :  * but WITHOUT ANY WARRANTY; without even the implied warranty of
      11             :  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
      12             :  * Lesser General Public License for more details.
      13             :  *
      14             :  * You should have received a copy of the GNU Lesser General Public
      15             :  * License along with FFmpeg; if not, write to the Free Software
      16             :  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
      17             :  */
      18             : 
      19             : #include <stdatomic.h>
      20             : #include "slicethread.h"
      21             : #include "mem.h"
      22             : #include "thread.h"
      23             : #include "avassert.h"
      24             : 
      25             : #if HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS
      26             : 
      /*             : State belonging to one worker thread.
       *             :
       *             : `done` is the parking flag protected by `mutex`: thread_worker() sets
       *             : it to 1 and sleeps on `cond` for as long as it stays nonzero; the
       *             : controlling thread clears it and signals `cond` to wake the worker.
       */
      27             : typedef struct WorkerContext {
      28             :     AVSliceThread   *ctx; /* pool this worker was created for */
      29             :     pthread_mutex_t mutex; /* protects `done` */
      30             :     pthread_cond_t  cond; /* signalled whenever `done` changes */
      31             :     pthread_t       thread; /* handle filled in by pthread_create() */
      32             :     int             done; /* nonzero while the worker is parked */
      33             : } WorkerContext;
      34             : 
      /*             : Slice-thread pool: the worker array, the per-execute job counters,
       *             : the completion handshake and the user callbacks.
       */
      35             : struct AVSliceThread {
      36             :     WorkerContext   *workers; /* array allocated in avpriv_slicethread_create() */
      37             :     int             nb_threads; /* thread count chosen in avpriv_slicethread_create() */
      38             :     int             nb_active_threads; /* FFMIN(nb_jobs, nb_threads) for the current execute call */
      39             :     int             nb_jobs; /* job count of the current execute call */
      40             : 
      41             :     atomic_uint     first_job; /* hands each participating thread its first job index */
      42             :     atomic_uint     current_job; /* next job index to claim after the first one */
      43             :     pthread_mutex_t done_mutex; /* protects `done` */
      44             :     pthread_cond_t  done_cond; /* signalled when `done` is set */
      45             :     int             done; /* set by the worker that finishes last, cleared by execute */
      46             :     int             finished; /* set to 1 by avpriv_slicethread_free() so workers exit */
      47             : 
      48             :     void            *priv; /* opaque pointer handed to both callbacks */
      /*             :     Called once per job; run_jobs() passes the thread's first job index as threadnr. */
      49             :     void            (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads);
      /*             :     Optional; when set and requested, execute calls it on the calling thread instead of running jobs there. */
      50             :     void            (*main_func)(void *priv);
      51             : };
      52             : 
      /*             : Run jobs on the calling thread until the shared counter is exhausted.
       *             :
       *             : The first job index is drawn from ctx->first_job and is also passed to
       *             : worker_func as its threadnr argument. Further indices are drawn from
       *             : ctx->current_job until a drawn value is >= nb_jobs.
       *             :
       *             : Returns nonzero when the last value this thread drew from current_job
       *             : equals nb_jobs + nb_active_threads - 1. avpriv_slicethread_execute()
       *             : starts current_job at nb_active_threads, so that value is the one drawn
       *             : by the final thread to leave the loop, provided nb_active_threads
       *             : threads take part.
       */
      53       13205 : static int run_jobs(AVSliceThread *ctx)
      54             : {
      55       13205 :     unsigned nb_jobs    = ctx->nb_jobs;
      56       13205 :     unsigned nb_active_threads = ctx->nb_active_threads;
      57       13205 :     unsigned first_job    = atomic_fetch_add_explicit(&ctx->first_job, 1, memory_order_acq_rel);
      58       13205 :     unsigned current_job  = first_job;
      59             : 
      /*             :     The body runs before the first draw from current_job, so every thread executes its first job unconditionally. */
      60             :     do {
      61       13205 :         ctx->worker_func(ctx->priv, current_job, first_job, nb_jobs, nb_active_threads);
      62       13205 :     } while ((current_job = atomic_fetch_add_explicit(&ctx->current_job, 1, memory_order_acq_rel)) < nb_jobs);
      63             : 
      64       13205 :     return current_job == nb_jobs + nb_active_threads - 1;
      65             : }
      66             : 
      /*             : Entry point of a worker thread; `v` is that thread's WorkerContext.
       *             :
       *             : The thread locks w->mutex, signals w->cond once, and then loops:
       *             : mark itself parked (done = 1), sleep until done is cleared, return
       *             : NULL if ctx->finished is set, otherwise call run_jobs(). If
       *             : run_jobs() reports that this thread finished last, it sets ctx->done
       *             : and signals ctx->done_cond under ctx->done_mutex.
       *             :
       *             : w->mutex is only unlocked explicitly on the exit path; otherwise it
       *             : is given up only inside pthread_cond_wait().
       */
      67       20137 : static void *attribute_align_arg thread_worker(void *v)
      68             : {
      69       20137 :     WorkerContext *w = v;
      70       20137 :     AVSliceThread *ctx = w->ctx;
      71             : 
      72       20137 :     pthread_mutex_lock(&w->mutex);
      73       20137 :     pthread_cond_signal(&w->cond);
      74             : 
      75             :     while (1) {
      76       41805 :         w->done = 1; /* parked; avpriv_slicethread_create() waits for this to become nonzero */
      77       92913 :         while (w->done)
      78       30971 :             pthread_cond_wait(&w->cond, &w->mutex);
      79             : 
      80       30971 :         if (ctx->finished) {
      81       20137 :             pthread_mutex_unlock(&w->mutex);
      82       40274 :             return NULL;
      83             :         }
      84             : 
      85       10834 :         if (run_jobs(ctx)) {
      /*             :             This thread was last: tell the thread waiting in avpriv_slicethread_execute(). */
      86        1221 :             pthread_mutex_lock(&ctx->done_mutex);
      87        1221 :             ctx->done = 1;
      88        1221 :             pthread_cond_signal(&ctx->done_cond);
      89        1221 :             pthread_mutex_unlock(&ctx->done_mutex);
      90             :         }
      91             :     }
      92             : }
      93             : 
      /*             : Allocate a slice-thread pool and start its worker threads.
       *             :
       *             : pctx        receives the new context; it is also written when the
       *             :             context allocation fails (with the NULL result).
       *             : priv        opaque pointer stored and later passed to the callbacks.
       *             : worker_func per-job callback.
       *             : main_func   optional callback; when NULL, one fewer worker thread is
       *             :             created than nb_threads.
       *             : nb_threads  requested thread count, asserted to be >= 0. 0 selects
       *             :             av_cpu_count() + 1 when more than one CPU is reported,
       *             :             otherwise 1.
       *             :
       *             : Returns nb_threads (after the substitution for 0) on success,
       *             : AVERROR(ENOMEM) if an allocation fails, or AVERROR(ret) with the
       *             : nonzero return value of pthread_create() if a thread cannot be
       *             : started.
       *             :
       *             : NOTE(review): the return values of pthread_mutex_init() and
       *             : pthread_cond_init() are not checked anywhere in this function.
       */
      94        2540 : int avpriv_slicethread_create(AVSliceThread **pctx, void *priv,
      95             :                               void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
      96             :                               void (*main_func)(void *priv),
      97             :                               int nb_threads)
      98             : {
      99             :     AVSliceThread *ctx;
     100             :     int nb_workers, i;
     101             : 
     102             : #if HAVE_W32THREADS
     103             :     w32thread_init();
     104             : #endif
     105             : 
     106        2540 :     av_assert0(nb_threads >= 0);
     107        2540 :     if (!nb_threads) {
     108        2513 :         int nb_cpus = av_cpu_count();
     109        2513 :         if (nb_cpus > 1)
     110        2513 :             nb_threads = nb_cpus + 1;
     111             :         else
     112           0 :             nb_threads = 1;
     113             :     }
     114             : 
     /*              :     Without a main_func the calling thread runs jobs itself, so it needs one worker fewer. */
     115        2540 :     nb_workers = nb_threads;
     116        2540 :     if (!main_func)
     117        2540 :         nb_workers--;
     118             : 
     119        2540 :     *pctx = ctx = av_mallocz(sizeof(*ctx));
     120        2540 :     if (!ctx)
     121           0 :         return AVERROR(ENOMEM);
     122             : 
     123        2540 :     if (nb_workers && !(ctx->workers = av_calloc(nb_workers, sizeof(*ctx->workers)))) {
     124           0 :         av_freep(pctx);
     125           0 :         return AVERROR(ENOMEM);
     126             :     }
     127             : 
     128        2540 :     ctx->priv        = priv;
     129        2540 :     ctx->worker_func = worker_func;
     130        2540 :     ctx->main_func   = main_func;
     131        2540 :     ctx->nb_threads  = nb_threads;
     132        2540 :     ctx->nb_active_threads = 0;
     133        2540 :     ctx->nb_jobs     = 0;
     134        2540 :     ctx->finished    = 0;
     135             : 
     136        2540 :     atomic_init(&ctx->first_job, 0);
     137        2540 :     atomic_init(&ctx->current_job, 0);
     138        2540 :     pthread_mutex_init(&ctx->done_mutex, NULL);
     139        2540 :     pthread_cond_init(&ctx->done_cond, NULL);
     140        2540 :     ctx->done        = 0;
     141             : 
     142       22677 :     for (i = 0; i < nb_workers; i++) {
     143       20137 :         WorkerContext *w = &ctx->workers[i];
     144             :         int ret;
     145       20137 :         w->ctx = ctx;
     146       20137 :         pthread_mutex_init(&w->mutex, NULL);
     147       20137 :         pthread_cond_init(&w->cond, NULL);
     /*              :         Hold the worker's mutex across pthread_create() so the new thread blocks in its own lock until we wait below. */
     148       20137 :         pthread_mutex_lock(&w->mutex);
     149       20137 :         w->done = 0;
     150             : 
     151       20137 :         if (ret = pthread_create(&w->thread, NULL, thread_worker, w)) {
     /*              :             Shrink nb_threads so avpriv_slicethread_free() computes i workers, i.e. only those already started. */
     152           0 :             ctx->nb_threads = main_func ? i : i + 1;
     153           0 :             pthread_mutex_unlock(&w->mutex);
     154           0 :             pthread_cond_destroy(&w->cond);
     155           0 :             pthread_mutex_destroy(&w->mutex);
     156           0 :             avpriv_slicethread_free(pctx);
     157           0 :             return AVERROR(ret);
     158             :         }
     159             : 
     /*              :         Wait until the new thread has parked itself (it sets w->done to 1). */
     160       60411 :         while (!w->done)
     161       20137 :             pthread_cond_wait(&w->cond, &w->mutex);
     162       20137 :         pthread_mutex_unlock(&w->mutex);
     163             :     }
     164             : 
     165        2540 :     return nb_threads;
     166             : }
     167             : 
     /*              : Run nb_jobs jobs on the pool and return once all of them are done.
      *              :
      *              : ctx          pool created by avpriv_slicethread_create().
      *              : nb_jobs      number of jobs, asserted to be > 0.
      *              : execute_main when nonzero and ctx->main_func is set, the calling
      *              :              thread runs main_func instead of taking jobs, and all
      *              :              active threads are workers.
      *              :
      *              : At most FFMIN(nb_jobs, ctx->nb_threads) threads take part. Unless the
      *              : calling thread's own run_jobs() call reports that it finished last,
      *              : the function blocks until a worker sets ctx->done, then clears it.
      */
     168        2371 : void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
     169             : {
     170        2371 :     int nb_workers, i, is_last = 0;
     171             : 
     172        2371 :     av_assert0(nb_jobs > 0);
     173        2371 :     ctx->nb_jobs           = nb_jobs;
     174        2371 :     ctx->nb_active_threads = FFMIN(nb_jobs, ctx->nb_threads);
     /*              :     Each active thread gets one job from first_job (0 .. nb_active_threads - 1); current_job continues from there. */
     175        2371 :     atomic_store_explicit(&ctx->first_job, 0, memory_order_relaxed);
     176        2371 :     atomic_store_explicit(&ctx->current_job, ctx->nb_active_threads, memory_order_relaxed);
     177        2371 :     nb_workers             = ctx->nb_active_threads;
     178        2371 :     if (!ctx->main_func || !execute_main)
     179        2371 :         nb_workers--; /* the calling thread is one of the active threads */
     180             : 
     /*              :     Wake the participating workers by clearing their parking flag. */
     181       13205 :     for (i = 0; i < nb_workers; i++) {
     182       10834 :         WorkerContext *w = &ctx->workers[i];
     183       10834 :         pthread_mutex_lock(&w->mutex);
     184       10834 :         w->done = 0;
     185       10834 :         pthread_cond_signal(&w->cond);
     186       10834 :         pthread_mutex_unlock(&w->mutex);
     187             :     }
     188             : 
     189        2371 :     if (ctx->main_func && execute_main)
     190           0 :         ctx->main_func(ctx->priv);
     191             :     else
     192        2371 :         is_last = run_jobs(ctx);
     193             : 
     /*              :     is_last stays 0 on the main_func path, so that path always waits for a worker. */
     194        2371 :     if (!is_last) {
     195        1221 :         pthread_mutex_lock(&ctx->done_mutex);
     196        3663 :         while (!ctx->done)
     197        1221 :             pthread_cond_wait(&ctx->done_cond, &ctx->done_mutex);
     198        1221 :         ctx->done = 0;
     199        1221 :         pthread_mutex_unlock(&ctx->done_mutex);
     200             :     }
     201        2371 : }
     202             : 
     /*              : Stop and join all worker threads, destroy the synchronisation
      *              : objects, and free the worker array and the context through
      *              : av_freep().
      *              :
      *              : Does nothing when pctx or *pctx is NULL. The number of workers
      *              : joined is derived from ctx->nb_threads and ctx->main_func, the same
      *              : way avpriv_slicethread_create() sized the worker array.
      */
     203        2540 : void avpriv_slicethread_free(AVSliceThread **pctx)
     204             : {
     205             :     AVSliceThread *ctx;
     206             :     int nb_workers, i;
     207             : 
     208        2540 :     if (!pctx || !*pctx)
     209           0 :         return;
     210             : 
     211        2540 :     ctx = *pctx;
     212        2540 :     nb_workers = ctx->nb_threads;
     213        2540 :     if (!ctx->main_func)
     214        2540 :         nb_workers--;
     215             : 
     /*              :     Set the exit flag first, then wake every worker so it sees `finished` and returns. */
     216        2540 :     ctx->finished = 1;
     217       22677 :     for (i = 0; i < nb_workers; i++) {
     218       20137 :         WorkerContext *w = &ctx->workers[i];
     219       20137 :         pthread_mutex_lock(&w->mutex);
     220       20137 :         w->done = 0;
     221       20137 :         pthread_cond_signal(&w->cond);
     222       20137 :         pthread_mutex_unlock(&w->mutex);
     223             :     }
     224             : 
     /*              :     Join each worker before destroying its condition variable and mutex. */
     225       22677 :     for (i = 0; i < nb_workers; i++) {
     226       20137 :         WorkerContext *w = &ctx->workers[i];
     227       20137 :         pthread_join(w->thread, NULL);
     228       20137 :         pthread_cond_destroy(&w->cond);
     229       20137 :         pthread_mutex_destroy(&w->mutex);
     230             :     }
     231             : 
     232        2540 :     pthread_cond_destroy(&ctx->done_cond);
     233        2540 :     pthread_mutex_destroy(&ctx->done_mutex);
     234        2540 :     av_freep(&ctx->workers);
     235        2540 :     av_freep(pctx);
     236             : }
     237             : 
     238             : #else /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS */
     239             : 
     /*              : Variant built when none of the thread backends tested by the
      *              : enclosing #if is available: no pool is created. Stores NULL in
      *              : *pctx and returns AVERROR(EINVAL); the other arguments are unused.
      */
     240             : int avpriv_slicethread_create(AVSliceThread **pctx, void *priv,
     241             :                               void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
     242             :                               void (*main_func)(void *priv),
     243             :                               int nb_threads)
     244             : {
     245             :     *pctx = NULL;
     246             :     return AVERROR(EINVAL);
     247             : }
     248             : 
     /*              : Variant built without a thread backend. The matching create function
      *              : never produces a context, so any call here trips the assertion.
      */
     249             : void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
     250             : {
     251             :     av_assert0(0);
     252             : }
     253             : 
     /*              : Variant built without a thread backend. Frees nothing; it only
      *              : asserts that the caller passed no context (pctx or *pctx is NULL).
      */
     254             : void avpriv_slicethread_free(AVSliceThread **pctx)
     255             : {
     256             :     av_assert0(!pctx || !*pctx);
     257             : }
     258             : 
     259             : #endif /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS */

Generated by: LCOV version 1.13