FFmpeg coverage


Directory: ../../../ffmpeg/
File: src/fftools/thread_queue.c
Date: 2026-02-18 00:28:21
Exec Total Coverage
Lines: 106 120 88.3%
Functions: 8 8 100.0%
Branches: 46 58 79.3%

Line Branch Exec Source
1 /*
2 * This file is part of FFmpeg.
3 *
4 * FFmpeg is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
8 *
9 * FFmpeg is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
13 *
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with FFmpeg; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18
19 #include <stdint.h>
20 #include <string.h>
21
22 #include "libavutil/avassert.h"
23 #include "libavutil/container_fifo.h"
24 #include "libavutil/error.h"
25 #include "libavutil/fifo.h"
26 #include "libavutil/frame.h"
27 #include "libavutil/intreadwrite.h"
28 #include "libavutil/mem.h"
29 #include "libavutil/thread.h"
30
31 #include "libavcodec/packet.h"
32
33 #include "thread_queue.h"
34
35 enum {
36 FINISHED_SEND = (1 << 0),
37 FINISHED_RECV = (1 << 1),
38 };
39
40 struct ThreadQueue {
41 int choked;
42 int *finished;
43 unsigned int nb_streams;
44
45 enum ThreadQueueType type;
46
47 AVContainerFifo *fifo;
48 AVFifo *fifo_stream_index;
49
50 pthread_mutex_t lock;
51 pthread_cond_t cond;
52 };
53
54 31718 void tq_free(ThreadQueue **ptq)
55 {
56 31718 ThreadQueue *tq = *ptq;
57
58
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 31715 times.
31718 if (!tq)
59 3 return;
60
61 31715 av_container_fifo_free(&tq->fifo);
62 31715 av_fifo_freep2(&tq->fifo_stream_index);
63
64 31715 av_freep(&tq->finished);
65
66 31715 pthread_cond_destroy(&tq->cond);
67 31715 pthread_mutex_destroy(&tq->lock);
68
69 31715 av_freep(ptq);
70 }
71
72 31715 ThreadQueue *tq_alloc(unsigned int nb_streams, size_t queue_size,
73 enum ThreadQueueType type)
74 {
75 ThreadQueue *tq;
76 int ret;
77
78 31715 tq = av_mallocz(sizeof(*tq));
79
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 31715 times.
31715 if (!tq)
80 return NULL;
81
82 31715 ret = pthread_cond_init(&tq->cond, NULL);
83
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 31715 times.
31715 if (ret) {
84 av_freep(&tq);
85 return NULL;
86 }
87
88 31715 ret = pthread_mutex_init(&tq->lock, NULL);
89
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 31715 times.
31715 if (ret) {
90 pthread_cond_destroy(&tq->cond);
91 av_freep(&tq);
92 return NULL;
93 }
94
95 31715 tq->finished = av_calloc(nb_streams, sizeof(*tq->finished));
96
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 31715 times.
31715 if (!tq->finished)
97 goto fail;
98 31715 tq->nb_streams = nb_streams;
99
100 31715 tq->type = type;
101
102 31715 tq->fifo = (type == THREAD_QUEUE_FRAMES) ?
103
2/2
✓ Branch 0 taken 16293 times.
✓ Branch 1 taken 15422 times.
31715 av_container_fifo_alloc_avframe(0) : av_container_fifo_alloc_avpacket(0);
104
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 31715 times.
31715 if (!tq->fifo)
105 goto fail;
106
107 31715 tq->fifo_stream_index = av_fifo_alloc2(queue_size, sizeof(unsigned), 0);
108
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 31715 times.
31715 if (!tq->fifo_stream_index)
109 goto fail;
110
111 31715 return tq;
112 fail:
113 tq_free(&tq);
114 return NULL;
115 }
116
117 1768281 int tq_send(ThreadQueue *tq, unsigned int stream_idx, void *data)
118 {
119 int *finished;
120 int ret;
121
122
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1768281 times.
1768281 av_assert0(stream_idx < tq->nb_streams);
123 1768281 finished = &tq->finished[stream_idx];
124
125 1768281 pthread_mutex_lock(&tq->lock);
126
127
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1768280 times.
1768281 if (*finished & FINISHED_SEND) {
128 1 ret = AVERROR(EINVAL);
129 1 goto finish;
130 }
131
132
4/4
✓ Branch 0 taken 2308437 times.
✓ Branch 1 taken 6804 times.
✓ Branch 3 taken 546961 times.
✓ Branch 4 taken 1761476 times.
2315241 while (!(*finished & FINISHED_RECV) && !av_fifo_can_write(tq->fifo_stream_index))
133 546961 pthread_cond_wait(&tq->cond, &tq->lock);
134
135
2/2
✓ Branch 0 taken 6804 times.
✓ Branch 1 taken 1761476 times.
1768280 if (*finished & FINISHED_RECV) {
136 6804 ret = AVERROR_EOF;
137 6804 *finished |= FINISHED_SEND;
138 } else {
139 1761476 ret = av_fifo_write(tq->fifo_stream_index, &stream_idx, 1);
140
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1761476 times.
1761476 if (ret < 0)
141 goto finish;
142
143 1761476 ret = av_container_fifo_write(tq->fifo, data, 0);
144
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1761476 times.
1761476 if (ret < 0)
145 goto finish;
146
147 1761476 pthread_cond_broadcast(&tq->cond);
148 }
149
150 1768281 finish:
151 1768281 pthread_mutex_unlock(&tq->lock);
152
153 1768281 return ret;
154 }
155
156 2705323 static int receive_locked(ThreadQueue *tq, int *stream_idx,
157 void *data)
158 {
159 2705323 unsigned int nb_finished = 0;
160
161
2/2
✓ Branch 0 taken 7058 times.
✓ Branch 1 taken 2698265 times.
2705323 if (tq->choked)
162 7058 return AVERROR(EAGAIN);
163
164
2/2
✓ Branch 1 taken 1726627 times.
✓ Branch 2 taken 971802 times.
2698429 while (av_container_fifo_read(tq->fifo, data, 0) >= 0) {
165 unsigned idx;
166 int ret;
167
168 1726627 ret = av_fifo_read(tq->fifo_stream_index, &idx, 1);
169
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1726627 times.
1726627 av_assert0(ret >= 0);
170
2/2
✓ Branch 0 taken 164 times.
✓ Branch 1 taken 1726463 times.
1726627 if (tq->finished[idx] & FINISHED_RECV) {
171 164 (tq->type == THREAD_QUEUE_FRAMES) ?
172
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 163 times.
164 av_frame_unref(data) : av_packet_unref(data);
173 164 continue;
174 }
175
176 1726463 *stream_idx = idx;
177 1726463 return 0;
178 }
179
180
2/2
✓ Branch 0 taken 1217659 times.
✓ Branch 1 taken 951254 times.
2168913 for (unsigned int i = 0; i < tq->nb_streams; i++) {
181
2/2
✓ Branch 0 taken 1183132 times.
✓ Branch 1 taken 34527 times.
1217659 if (!tq->finished[i])
182 1183132 continue;
183
184 /* return EOF to the consumer at most once for each stream */
185
2/2
✓ Branch 0 taken 20548 times.
✓ Branch 1 taken 13979 times.
34527 if (!(tq->finished[i] & FINISHED_RECV)) {
186 20548 tq->finished[i] |= FINISHED_RECV;
187 20548 *stream_idx = i;
188 20548 return AVERROR_EOF;
189 }
190
191 13979 nb_finished++;
192 }
193
194
2/2
✓ Branch 0 taken 8470 times.
✓ Branch 1 taken 942784 times.
951254 return nb_finished == tq->nb_streams ? AVERROR_EOF : AVERROR(EAGAIN);
195 }
196
197 1755481 int tq_receive(ThreadQueue *tq, int *stream_idx, void *data)
198 {
199 int ret;
200
201 1755481 *stream_idx = -1;
202
203 1755481 pthread_mutex_lock(&tq->lock);
204
205 949842 while (1) {
206 2705323 size_t can_read = av_container_fifo_can_read(tq->fifo);
207
208 2705323 ret = receive_locked(tq, stream_idx, data);
209
210 // signal other threads if the fifo state changed
211
2/2
✓ Branch 1 taken 1726498 times.
✓ Branch 2 taken 978825 times.
2705323 if (can_read != av_container_fifo_can_read(tq->fifo))
212 1726498 pthread_cond_broadcast(&tq->cond);
213
214
2/2
✓ Branch 0 taken 949842 times.
✓ Branch 1 taken 1755481 times.
2705323 if (ret == AVERROR(EAGAIN)) {
215 949842 pthread_cond_wait(&tq->cond, &tq->lock);
216 949842 continue;
217 }
218
219 1755481 break;
220 }
221
222 1755481 pthread_mutex_unlock(&tq->lock);
223
224 1755481 return ret;
225 }
226
227 49785 void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx)
228 {
229
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 49785 times.
49785 av_assert0(stream_idx < tq->nb_streams);
230
231 49785 pthread_mutex_lock(&tq->lock);
232
233 /* mark the stream as send-finished;
234 * next time the consumer thread tries to read this stream it will get
235 * an EOF and recv-finished flag will be set */
236 49785 tq->finished[stream_idx] |= FINISHED_SEND;
237 49785 tq->choked = 0;
238 49785 pthread_cond_broadcast(&tq->cond);
239
240 49785 pthread_mutex_unlock(&tq->lock);
241 49785 }
242
243 39506 void tq_receive_finish(ThreadQueue *tq, unsigned int stream_idx)
244 {
245
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 39506 times.
39506 av_assert0(stream_idx < tq->nb_streams);
246
247 39506 pthread_mutex_lock(&tq->lock);
248
249 /* mark the stream as recv-finished;
250 * next time the producer thread tries to send for this stream, it will
251 * get an EOF and send-finished flag will be set */
252 39506 tq->finished[stream_idx] |= FINISHED_RECV;
253 39506 pthread_cond_broadcast(&tq->cond);
254
255 39506 pthread_mutex_unlock(&tq->lock);
256 39506 }
257
258 26034 void tq_choke(ThreadQueue *tq, int choked)
259 {
260 26034 pthread_mutex_lock(&tq->lock);
261
262 26034 int prev_choked = tq->choked;
263 26034 tq->choked = choked;
264
2/2
✓ Branch 0 taken 22950 times.
✓ Branch 1 taken 3084 times.
26034 if (choked != prev_choked)
265 22950 pthread_cond_broadcast(&tq->cond);
266
267 26034 pthread_mutex_unlock(&tq->lock);
268 26034 }
269