FFmpeg coverage


Directory: ../../../ffmpeg/
File: src/libavutil/executor.c
Date: 2025-01-20 09:27:23
Exec Total Coverage
Lines: 0 97 0.0%
Functions: 0 8 0.0%
Branches: 0 68 0.0%

Line Branch Exec Source
1 /*
2 * Copyright (C) 2023 Nuo Mi
3 *
4 * This file is part of FFmpeg.
5 *
6 * FFmpeg is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2.1 of the License, or (at your option) any later version.
10 *
11 * FFmpeg is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with FFmpeg; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 */
20
21 #include "config.h"
22
23 #include <stdbool.h>
24
25 #include "mem.h"
26 #include "thread.h"
27
28 #include "executor.h"
29
#if HAVE_THREADS

/* Threaded build: the executor's thread primitives map directly onto pthreads. */
#define ExecutorThread pthread_t

#define executor_thread_create(t, a, s, ar) pthread_create(t, a, s, ar)
#define executor_thread_join(t, r) pthread_join(t, r)

#else

/*
 * Build without thread support: the thread handle is a placeholder type,
 * "creating" a thread always reports success (0) without evaluating its
 * arguments, and joining is an empty statement.
 */
#define ExecutorThread char

#define executor_thread_create(t, a, s, ar) 0
#define executor_thread_join(t, r) do {} while(0)

#endif /* HAVE_THREADS */
45
46 typedef struct ThreadInfo {
47 AVExecutor *e;
48 ExecutorThread thread;
49 } ThreadInfo;
50
51 struct AVExecutor {
52 AVTaskCallbacks cb;
53 int thread_count;
54 bool recursive;
55
56 ThreadInfo *threads;
57 uint8_t *local_contexts;
58
59 AVMutex lock;
60 AVCond cond;
61 int die;
62
63 AVTask *tasks;
64 };
65
66 static AVTask* remove_task(AVTask **prev, AVTask *t)
67 {
68 *prev = t->next;
69 t->next = NULL;
70 return t;
71 }
72
73 static void add_task(AVTask **prev, AVTask *t)
74 {
75 t->next = *prev;
76 *prev = t;
77 }
78
79 static int run_one_task(AVExecutor *e, void *lc)
80 {
81 AVTaskCallbacks *cb = &e->cb;
82 AVTask **prev;
83
84 for (prev = &e->tasks; *prev && !cb->ready(*prev, cb->user_data); prev = &(*prev)->next)
85 /* nothing */;
86 if (*prev) {
87 AVTask *t = remove_task(prev, *prev);
88 if (e->thread_count > 0)
89 ff_mutex_unlock(&e->lock);
90 cb->run(t, lc, cb->user_data);
91 if (e->thread_count > 0)
92 ff_mutex_lock(&e->lock);
93 return 1;
94 }
95 return 0;
96 }
97
#if HAVE_THREADS
/**
 * Worker thread entry point.
 *
 * Runs ready tasks until e->die is set. When a pass over the list finds
 * nothing to run, the thread waits on e->cond.
 *
 * @param data the ThreadInfo of this worker
 * @return always NULL
 */
static void *executor_worker_task(void *data)
{
    ThreadInfo *self = data;
    AVExecutor *e    = self->e;
    /* this worker's slot: its index in e->threads times the per-thread context size */
    void *lc = e->local_contexts + (self - e->threads) * e->cb.local_context_size;

    ff_mutex_lock(&e->lock);
    while (!e->die) {
        /* nothing was runnable in this pass: wait for a signal */
        if (!run_one_task(e, lc))
            ff_cond_wait(&e->cond, &e->lock);
    }
    ff_mutex_unlock(&e->lock);

    return NULL;
}
#endif
118
119 static void executor_free(AVExecutor *e, const int has_lock, const int has_cond)
120 {
121 if (e->thread_count) {
122 //signal die
123 ff_mutex_lock(&e->lock);
124 e->die = 1;
125 ff_cond_broadcast(&e->cond);
126 ff_mutex_unlock(&e->lock);
127
128 for (int i = 0; i < e->thread_count; i++)
129 executor_thread_join(e->threads[i].thread, NULL);
130 }
131 if (has_cond)
132 ff_cond_destroy(&e->cond);
133 if (has_lock)
134 ff_mutex_destroy(&e->lock);
135
136 av_free(e->threads);
137 av_free(e->local_contexts);
138
139 av_free(e);
140 }
141
142 AVExecutor* av_executor_alloc(const AVTaskCallbacks *cb, int thread_count)
143 {
144 AVExecutor *e;
145 int has_lock = 0, has_cond = 0;
146 if (!cb || !cb->user_data || !cb->ready || !cb->run || !cb->priority_higher)
147 return NULL;
148
149 e = av_mallocz(sizeof(*e));
150 if (!e)
151 return NULL;
152 e->cb = *cb;
153
154 e->local_contexts = av_calloc(FFMAX(thread_count, 1), e->cb.local_context_size);
155 if (!e->local_contexts)
156 goto free_executor;
157
158 e->threads = av_calloc(FFMAX(thread_count, 1), sizeof(*e->threads));
159 if (!e->threads)
160 goto free_executor;
161
162 if (!thread_count)
163 return e;
164
165 has_lock = !ff_mutex_init(&e->lock, NULL);
166 has_cond = !ff_cond_init(&e->cond, NULL);
167
168 if (!has_lock || !has_cond)
169 goto free_executor;
170
171 for (/* nothing */; e->thread_count < thread_count; e->thread_count++) {
172 ThreadInfo *ti = e->threads + e->thread_count;
173 ti->e = e;
174 if (executor_thread_create(&ti->thread, NULL, executor_worker_task, ti))
175 goto free_executor;
176 }
177 return e;
178
179 free_executor:
180 executor_free(e, has_lock, has_cond);
181 return NULL;
182 }
183
184 void av_executor_free(AVExecutor **executor)
185 {
186 int thread_count;
187
188 if (!executor || !*executor)
189 return;
190 thread_count = (*executor)->thread_count;
191 executor_free(*executor, thread_count, thread_count);
192 *executor = NULL;
193 }
194
195 void av_executor_execute(AVExecutor *e, AVTask *t)
196 {
197 AVTaskCallbacks *cb = &e->cb;
198 AVTask **prev;
199
200 if (e->thread_count)
201 ff_mutex_lock(&e->lock);
202 if (t) {
203 for (prev = &e->tasks; *prev && cb->priority_higher(*prev, t); prev = &(*prev)->next)
204 /* nothing */;
205 add_task(prev, t);
206 }
207 if (e->thread_count) {
208 ff_cond_signal(&e->cond);
209 ff_mutex_unlock(&e->lock);
210 }
211
212 if (!e->thread_count || !HAVE_THREADS) {
213 if (e->recursive)
214 return;
215 e->recursive = true;
216 // We are running in a single-threaded environment, so we must handle all tasks ourselves
217 while (run_one_task(e, e->local_contexts))
218 /* nothing */;
219 e->recursive = false;
220 }
221 }
222