FFmpeg coverage


Directory: ../../../ffmpeg/
File: src/fftools/thread_queue.c
Date: 2025-10-10 03:51:19
Exec Total Coverage
Lines: 105 120 87.5%
Functions: 8 8 100.0%
Branches: 44 58 75.9%

Line Branch Exec Source
1 /*
2 * This file is part of FFmpeg.
3 *
4 * FFmpeg is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
8 *
9 * FFmpeg is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
13 *
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with FFmpeg; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18
19 #include <stdint.h>
20 #include <string.h>
21
22 #include "libavutil/avassert.h"
23 #include "libavutil/container_fifo.h"
24 #include "libavutil/error.h"
25 #include "libavutil/fifo.h"
26 #include "libavutil/frame.h"
27 #include "libavutil/intreadwrite.h"
28 #include "libavutil/mem.h"
29 #include "libavutil/thread.h"
30
31 #include "libavcodec/packet.h"
32
33 #include "thread_queue.h"
34
/* Per-stream state bits stored in the ThreadQueue.finished[] array. */
35 enum {
/* Set by tq_send_finish(), and by tq_send() once it sees FINISHED_RECV. */
36 FINISHED_SEND = (1 << 0),
/* Set by tq_receive_finish(), and by receive_locked() when it reports
 * EOF for the stream. */
37 FINISHED_RECV = (1 << 1),
38 };
39
/*
 * State of one queue. The send/receive/finish/choke functions in this file
 * take `lock` before reading or writing choked, the finished[] entries and
 * the two FIFOs.
 */
40 struct ThreadQueue {
/* While nonzero, receive_locked() returns AVERROR(EAGAIN) without reading. */
41 int choked;
/* Array of nb_streams bitmasks built from FINISHED_SEND / FINISHED_RECV. */
42 int *finished;
43 unsigned int nb_streams;
44
/* Selects between the frame and the packet variants of the container FIFO
 * allocator and of the unref call used when discarding queued items. */
45 enum ThreadQueueType type;
46
/* Queued payloads. */
47 AVContainerFifo *fifo;
/* One unsigned stream index per queued payload, written and read in the
 * same order as fifo; allocated with queue_size elements. */
48 AVFifo *fifo_stream_index;
49
50 pthread_mutex_t lock;
/* Broadcast after a payload is written or removed, after a finish flag is
 * set, and when the choked state changes. */
51 pthread_cond_t cond;
52 };
53
/**
 * Release a queue and everything it owns.
 *
 * @param ptq address of the queue pointer; must not be NULL itself.
 *            Nothing is done when *ptq is NULL. The pointer is handed to
 *            av_freep() at the end, which frees it and resets *ptq to NULL.
 */
54 31496 void tq_free(ThreadQueue **ptq)
55 {
56 31496 ThreadQueue *tq = *ptq;
57
58
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 31496 times.
31496 if (!tq)
59 return;
60
61 31496 av_container_fifo_free(&tq->fifo);
62 31496 av_fifo_freep2(&tq->fifo_stream_index);
63
64 31496 av_freep(&tq->finished);
65
/* Both primitives are destroyed unconditionally, so callers must only pass
 * queues on which both were successfully initialised. */
66 31496 pthread_cond_destroy(&tq->cond);
67 31496 pthread_mutex_destroy(&tq->lock);
68
69 31496 av_freep(ptq);
70 }
71
/**
 * Allocate and initialise a queue.
 *
 * @param nb_streams number of entries in the per-stream finished[] array
 * @param queue_size element count passed to av_fifo_alloc2() for the
 *                   stream-index FIFO; tq_send() waits while that FIFO
 *                   cannot be written
 * @param type       THREAD_QUEUE_FRAMES selects the AVFrame container FIFO,
 *                   any other value the AVPacket one
 * @return the new queue, or NULL if any allocation or initialisation fails
 */
72 31496 ThreadQueue *tq_alloc(unsigned int nb_streams, size_t queue_size,
73 enum ThreadQueueType type)
74 {
75 ThreadQueue *tq;
76 int ret;
77
78 31496 tq = av_mallocz(sizeof(*tq));
79
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 31496 times.
31496 if (!tq)
80 return NULL;
81
/* The condition variable and mutex are set up first and their failures are
 * unwound by hand: the shared `fail` path calls tq_free(), which destroys
 * both unconditionally. */
82 31496 ret = pthread_cond_init(&tq->cond, NULL);
83
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 31496 times.
31496 if (ret) {
84 av_freep(&tq);
85 return NULL;
86 }
87
88 31496 ret = pthread_mutex_init(&tq->lock, NULL);
89
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 31496 times.
31496 if (ret) {
90 pthread_cond_destroy(&tq->cond);
91 av_freep(&tq);
92 return NULL;
93 }
94
/* From here on every failure may go through tq_free(). */
95 31496 tq->finished = av_calloc(nb_streams, sizeof(*tq->finished));
96
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 31496 times.
31496 if (!tq->finished)
97 goto fail;
98 31496 tq->nb_streams = nb_streams;
99
100 31496 tq->type = type;
101
102 31496 tq->fifo = (type == THREAD_QUEUE_FRAMES) ?
103
2/2
✓ Branch 0 taken 16170 times.
✓ Branch 1 taken 15326 times.
31496 av_container_fifo_alloc_avframe(0) : av_container_fifo_alloc_avpacket(0);
104
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 31496 times.
31496 if (!tq->fifo)
105 goto fail;
106
/* Each element is one unsigned stream index; flags are 0. */
107 31496 tq->fifo_stream_index = av_fifo_alloc2(queue_size, sizeof(unsigned), 0);
108
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 31496 times.
31496 if (!tq->fifo_stream_index)
109 goto fail;
110
111 31496 return tq;
112 fail:
113 tq_free(&tq);
114 return NULL;
115 }
116
/**
 * Queue one item for a stream, waiting while the stream-index FIFO is full.
 *
 * @param tq         queue to write to
 * @param stream_idx stream the item belongs to; asserted to be below
 *                   tq->nb_streams
 * @param data       item passed to av_container_fifo_write() with flags 0;
 *                   presumably an AVFrame or AVPacket matching the queue
 *                   type - confirm against container_fifo.h
 * @return AVERROR(EINVAL) if the stream is already marked FINISHED_SEND,
 *         AVERROR_EOF if the stream is marked FINISHED_RECV (the stream is
 *         then also marked FINISHED_SEND), otherwise the result of the FIFO
 *         writes (negative on failure)
 */
117 1761150 int tq_send(ThreadQueue *tq, unsigned int stream_idx, void *data)
118 {
119 int *finished;
120 int ret;
121
122
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1761150 times.
1761150 av_assert0(stream_idx < tq->nb_streams);
/* Only the address is taken here; the flags themselves are read and
 * written after the lock is acquired. */
123 1761150 finished = &tq->finished[stream_idx];
124
125 1761150 pthread_mutex_lock(&tq->lock);
126
127
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1761149 times.
1761150 if (*finished & FINISHED_SEND) {
128 1 ret = AVERROR(EINVAL);
129 1 goto finish;
130 }
131
/* Wait until either the receiving side has finished this stream or the
 * index FIFO has room for one more entry. */
132
4/4
✓ Branch 0 taken 2299088 times.
✓ Branch 1 taken 6817 times.
✓ Branch 3 taken 544756 times.
✓ Branch 4 taken 1754332 times.
2305905 while (!(*finished & FINISHED_RECV) && !av_fifo_can_write(tq->fifo_stream_index))
133 544756 pthread_cond_wait(&tq->cond, &tq->lock);
134
135
2/2
✓ Branch 0 taken 6817 times.
✓ Branch 1 taken 1754332 times.
1761149 if (*finished & FINISHED_RECV) {
136 6817 ret = AVERROR_EOF;
137 6817 *finished |= FINISHED_SEND;
138 } else {
/* NOTE(review): the stream index is written before the payload. If
 * av_container_fifo_write() fails after av_fifo_write() succeeded, the index
 * FIFO looks like it keeps an entry with no matching payload, which would
 * mispair later reads in receive_locked() - confirm whether this needs
 * handling. */
139 1754332 ret = av_fifo_write(tq->fifo_stream_index, &stream_idx, 1);
140
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1754332 times.
1754332 if (ret < 0)
141 goto finish;
142
143 1754332 ret = av_container_fifo_write(tq->fifo, data, 0);
144
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1754332 times.
1754332 if (ret < 0)
145 goto finish;
146
/* Wake any thread waiting in tq_receive() for an item. */
147 1754332 pthread_cond_broadcast(&tq->cond);
148 }
149
150 1761150 finish:
151 1761150 pthread_mutex_unlock(&tq->lock);
152
153 1761150 return ret;
154 }
155
/**
 * Try once to fetch an item or an EOF notification, without blocking.
 * It does no locking itself; its caller in this file, tq_receive(), invokes
 * it with tq->lock held.
 *
 * @param tq         queue to read from
 * @param stream_idx written with the stream index when an item or a
 *                   per-stream EOF is returned; otherwise left untouched
 * @param data       destination passed to av_container_fifo_read(); it is
 *                   also handed to av_frame_unref() or av_packet_unref()
 *                   (by queue type) when a read item is discarded
 * @return 0 when an item was read, AVERROR_EOF for a per-stream EOF or when
 *         every stream is receive-finished, AVERROR(EAGAIN) when the queue
 *         is choked or nothing is available yet
 */
156 2688621 static int receive_locked(ThreadQueue *tq, int *stream_idx,
157 void *data)
158 {
159 2688621 unsigned int nb_finished = 0;
160
161
2/2
✓ Branch 0 taken 7002 times.
✓ Branch 1 taken 2681619 times.
2688621 if (tq->choked)
162 7002 return AVERROR(EAGAIN);
163
/* Drain queued items until one belongs to a stream that is still being
 * received. */
164
2/2
✓ Branch 1 taken 1719500 times.
✓ Branch 2 taken 962274 times.
2681774 while (av_container_fifo_read(tq->fifo, data, 0) >= 0) {
165 unsigned idx;
166 int ret;
167
/* Every payload must have a stream index queued alongside it. */
168 1719500 ret = av_fifo_read(tq->fifo_stream_index, &idx, 1);
169
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1719500 times.
1719500 av_assert0(ret >= 0);
170
/* Items for a stream already marked FINISHED_RECV are dropped. */
2/2
✓ Branch 0 taken 155 times.
✓ Branch 1 taken 1719345 times.
1719500 if (tq->finished[idx] & FINISHED_RECV) {
171 155 (tq->type == THREAD_QUEUE_FRAMES) ?
172
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 155 times.
155 av_frame_unref(data) : av_packet_unref(data);
173 155 continue;
174 }
175
176 1719345 *stream_idx = idx;
177 1719345 return 0;
178 }
179
/* Nothing is queued: look for a stream whose EOF has not been reported yet,
 * counting the streams that are already fully finished. */
180
2/2
✓ Branch 0 taken 1211352 times.
✓ Branch 1 taken 941880 times.
2153232 for (unsigned int i = 0; i < tq->nb_streams; i++) {
181
2/2
✓ Branch 0 taken 1175291 times.
✓ Branch 1 taken 36061 times.
1211352 if (!tq->finished[i])
182 1175291 continue;
183
184 /* return EOF to the consumer at most once for each stream */
185
2/2
✓ Branch 0 taken 20394 times.
✓ Branch 1 taken 15667 times.
36061 if (!(tq->finished[i] & FINISHED_RECV)) {
186 20394 tq->finished[i] |= FINISHED_RECV;
187 20394 *stream_idx = i;
188 20394 return AVERROR_EOF;
189 }
190
191 15667 nb_finished++;
192 }
193
/* EOF for the whole queue only once every stream is receive-finished. */
194
2/2
✓ Branch 0 taken 8401 times.
✓ Branch 1 taken 933479 times.
941880 return nb_finished == tq->nb_streams ? AVERROR_EOF : AVERROR(EAGAIN);
195 }
196
/**
 * Fetch the next item or EOF notification, blocking until one is available.
 *
 * @param tq         queue to read from
 * @param stream_idx set to -1 on entry, then updated by receive_locked()
 *                   when an item or a per-stream EOF is returned; it stays
 *                   -1 when AVERROR_EOF is returned for the whole queue
 * @param data       destination for the item, forwarded to receive_locked()
 * @return the first receive_locked() result that is not AVERROR(EAGAIN)
 */
197 1748140 int tq_receive(ThreadQueue *tq, int *stream_idx, void *data)
198 {
199 int ret;
200
201 1748140 *stream_idx = -1;
202
203 1748140 pthread_mutex_lock(&tq->lock);
204
205 940481 while (1) {
/* Snapshot the item count so a change made by receive_locked(), including
 * discarded items, can be detected afterwards. */
206 2688621 size_t can_read = av_container_fifo_can_read(tq->fifo);
207
208 2688621 ret = receive_locked(tq, stream_idx, data);
209
210 // signal other threads if the fifo state changed
211
2/2
✓ Branch 1 taken 1719375 times.
✓ Branch 2 taken 969246 times.
2688621 if (can_read != av_container_fifo_can_read(tq->fifo))
212 1719375 pthread_cond_broadcast(&tq->cond);
213
214
/* Nothing available (or choked): sleep until another thread broadcasts,
 * then retry. */
2/2
✓ Branch 0 taken 940481 times.
✓ Branch 1 taken 1748140 times.
2688621 if (ret == AVERROR(EAGAIN)) {
215 940481 pthread_cond_wait(&tq->cond, &tq->lock);
216 940481 continue;
217 }
218
219 1748140 break;
220 }
221
222 1748140 pthread_mutex_unlock(&tq->lock);
223
224 1748140 return ret;
225 }
226
/**
 * Mark a stream as finished on the sending side and wake all waiters.
 * Also clears the choked flag, so receive_locked() is no longer held back
 * by it.
 *
 * @param tq         queue to update
 * @param stream_idx stream to mark; asserted to be below tq->nb_streams
 */
227 49489 void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx)
228 {
229
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 49489 times.
49489 av_assert0(stream_idx < tq->nb_streams);
230
231 49489 pthread_mutex_lock(&tq->lock);
232
233 /* mark the stream as send-finished;
234 * next time the consumer thread tries to read this stream it will get
235 * an EOF and recv-finished flag will be set */
236 49489 tq->finished[stream_idx] |= FINISHED_SEND;
237 49489 tq->choked = 0;
238 49489 pthread_cond_broadcast(&tq->cond);
239
240 49489 pthread_mutex_unlock(&tq->lock);
241 49489 }
242
/**
 * Mark a stream as finished on the receiving side and wake all waiters,
 * including any tq_send() call blocked on a full queue for that stream.
 *
 * @param tq         queue to update
 * @param stream_idx stream to mark; asserted to be below tq->nb_streams
 */
243 39142 void tq_receive_finish(ThreadQueue *tq, unsigned int stream_idx)
244 {
245
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 39142 times.
39142 av_assert0(stream_idx < tq->nb_streams);
246
247 39142 pthread_mutex_lock(&tq->lock);
248
249 /* mark the stream as recv-finished;
250 * next time the producer thread tries to send for this stream, it will
251 * get an EOF and send-finished flag will be set */
252 39142 tq->finished[stream_idx] |= FINISHED_RECV;
253 39142 pthread_cond_broadcast(&tq->cond);
254
255 39142 pthread_mutex_unlock(&tq->lock);
256 39142 }
257
/**
 * Set the queue's choked flag. While it is nonzero, receive_locked()
 * returns AVERROR(EAGAIN) instead of reading, so tq_receive() keeps waiting.
 *
 * @param tq     queue to update
 * @param choked new value of the flag; stored as given
 */
258 25803 void tq_choke(ThreadQueue *tq, int choked)
259 {
260 25803 pthread_mutex_lock(&tq->lock);
261
262 25803 int prev_choked = tq->choked;
263 25803 tq->choked = choked;
264
/* Waiters are only woken when the stored value actually changed. */
2/2
✓ Branch 0 taken 22734 times.
✓ Branch 1 taken 3069 times.
25803 if (choked != prev_choked)
265 22734 pthread_cond_broadcast(&tq->cond);
266
267 25803 pthread_mutex_unlock(&tq->lock);
268 25803 }
269