FFmpeg coverage


Directory: ../../../ffmpeg/
File: src/fftools/thread_queue.c
Date: 2026-04-05 01:03:18
Coverage summary (executed / total / coverage):
Lines: 106 / 120 (88.3%)
Functions: 8 / 8 (100.0%)
Branches: 45 / 58 (77.6%)

Line Branch Exec Source
1 /*
2 * This file is part of FFmpeg.
3 *
4 * FFmpeg is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
8 *
9 * FFmpeg is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
13 *
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with FFmpeg; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18
19 #include <stdint.h>
20 #include <string.h>
21
22 #include "libavutil/avassert.h"
23 #include "libavutil/container_fifo.h"
24 #include "libavutil/error.h"
25 #include "libavutil/fifo.h"
26 #include "libavutil/frame.h"
27 #include "libavutil/intreadwrite.h"
28 #include "libavutil/mem.h"
29 #include "libavutil/thread.h"
30
31 #include "libavcodec/packet.h"
32
33 #include "thread_queue.h"
34
/* Per-stream state bits kept in ThreadQueue.finished[]. */
enum {
    /* set by tq_send_finish(), or by tq_send() once it observes
     * FINISHED_RECV for the stream */
    FINISHED_SEND = 0x1,
    /* set by tq_receive_finish(), or by receive_locked() when it reports
     * EOF for the stream */
    FINISHED_RECV = 0x2,
};
39
40 struct ThreadQueue {
41 int choked;
42 int *finished;
43 unsigned int nb_streams;
44
45 enum ThreadQueueType type;
46
47 AVContainerFifo *fifo;
48 AVFifo *fifo_stream_index;
49
50 pthread_mutex_t lock;
51 pthread_cond_t cond;
52 };
53
54 31885 void tq_free(ThreadQueue **ptq)
55 {
56 31885 ThreadQueue *tq = *ptq;
57
58
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 31882 times.
31885 if (!tq)
59 3 return;
60
61 31882 av_container_fifo_free(&tq->fifo);
62 31882 av_fifo_freep2(&tq->fifo_stream_index);
63
64 31882 av_freep(&tq->finished);
65
66 31882 pthread_cond_destroy(&tq->cond);
67 31882 pthread_mutex_destroy(&tq->lock);
68
69 31882 av_freep(ptq);
70 }
71
72 31882 ThreadQueue *tq_alloc(unsigned int nb_streams, size_t queue_size,
73 enum ThreadQueueType type)
74 {
75 ThreadQueue *tq;
76 int ret;
77
78 31882 tq = av_mallocz(sizeof(*tq));
79
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 31882 times.
31882 if (!tq)
80 return NULL;
81
82 31882 ret = pthread_cond_init(&tq->cond, NULL);
83
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 31882 times.
31882 if (ret) {
84 av_freep(&tq);
85 return NULL;
86 }
87
88 31882 ret = pthread_mutex_init(&tq->lock, NULL);
89
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 31882 times.
31882 if (ret) {
90 pthread_cond_destroy(&tq->cond);
91 av_freep(&tq);
92 return NULL;
93 }
94
95 31882 tq->finished = av_calloc(nb_streams, sizeof(*tq->finished));
96
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 31882 times.
31882 if (!tq->finished)
97 goto fail;
98 31882 tq->nb_streams = nb_streams;
99
100 31882 tq->type = type;
101
102 31882 tq->fifo = (type == THREAD_QUEUE_FRAMES) ?
103
2/2
✓ Branch 0 taken 16373 times.
✓ Branch 1 taken 15509 times.
31882 av_container_fifo_alloc_avframe(0) : av_container_fifo_alloc_avpacket(0);
104
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 31882 times.
31882 if (!tq->fifo)
105 goto fail;
106
107 31882 tq->fifo_stream_index = av_fifo_alloc2(queue_size, sizeof(unsigned), 0);
108
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 31882 times.
31882 if (!tq->fifo_stream_index)
109 goto fail;
110
111 31882 return tq;
112 fail:
113 tq_free(&tq);
114 return NULL;
115 }
116
117 1974924 int tq_send(ThreadQueue *tq, unsigned int stream_idx, void *data)
118 {
119 int *finished;
120 int ret;
121
122
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1974924 times.
1974924 av_assert0(stream_idx < tq->nb_streams);
123 1974924 finished = &tq->finished[stream_idx];
124
125 1974924 pthread_mutex_lock(&tq->lock);
126
127
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1974923 times.
1974924 if (*finished & FINISHED_SEND) {
128 1 ret = AVERROR(EINVAL);
129 1 goto finish;
130 }
131
132
4/4
✓ Branch 0 taken 2579786 times.
✓ Branch 1 taken 6848 times.
✓ Branch 3 taken 611711 times.
✓ Branch 4 taken 1968075 times.
2586634 while (!(*finished & FINISHED_RECV) && !av_fifo_can_write(tq->fifo_stream_index))
133 611711 pthread_cond_wait(&tq->cond, &tq->lock);
134
135
2/2
✓ Branch 0 taken 6848 times.
✓ Branch 1 taken 1968075 times.
1974923 if (*finished & FINISHED_RECV) {
136 6848 ret = AVERROR_EOF;
137 6848 *finished |= FINISHED_SEND;
138 } else {
139 1968075 ret = av_fifo_write(tq->fifo_stream_index, &stream_idx, 1);
140
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1968075 times.
1968075 if (ret < 0)
141 goto finish;
142
143 1968075 ret = av_container_fifo_write(tq->fifo, data, 0);
144
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1968075 times.
1968075 if (ret < 0)
145 goto finish;
146
147 1968075 pthread_cond_broadcast(&tq->cond);
148 }
149
150 1974924 finish:
151 1974924 pthread_mutex_unlock(&tq->lock);
152
153 1974924 return ret;
154 }
155
156 3006945 static int receive_locked(ThreadQueue *tq, int *stream_idx,
157 void *data)
158 {
159 3006945 unsigned int nb_finished = 0;
160
161
2/2
✓ Branch 0 taken 7170 times.
✓ Branch 1 taken 2999775 times.
3006945 if (tq->choked)
162 7170 return AVERROR(EAGAIN);
163
164
2/2
✓ Branch 1 taken 1933171 times.
✓ Branch 2 taken 1066763 times.
2999934 while (av_container_fifo_read(tq->fifo, data, 0) >= 0) {
165 unsigned idx;
166 int ret;
167
168 1933171 ret = av_fifo_read(tq->fifo_stream_index, &idx, 1);
169
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1933171 times.
1933171 av_assert0(ret >= 0);
170
2/2
✓ Branch 0 taken 159 times.
✓ Branch 1 taken 1933012 times.
1933171 if (tq->finished[idx] & FINISHED_RECV) {
171 159 (tq->type == THREAD_QUEUE_FRAMES) ?
172
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 159 times.
159 av_frame_unref(data) : av_packet_unref(data);
173 159 continue;
174 }
175
176 1933012 *stream_idx = idx;
177 1933012 return 0;
178 }
179
180
2/2
✓ Branch 0 taken 1337108 times.
✓ Branch 1 taken 1046099 times.
2383207 for (unsigned int i = 0; i < tq->nb_streams; i++) {
181
2/2
✓ Branch 0 taken 1301271 times.
✓ Branch 1 taken 35837 times.
1337108 if (!tq->finished[i])
182 1301271 continue;
183
184 /* return EOF to the consumer at most once for each stream */
185
2/2
✓ Branch 0 taken 20664 times.
✓ Branch 1 taken 15173 times.
35837 if (!(tq->finished[i] & FINISHED_RECV)) {
186 20664 tq->finished[i] |= FINISHED_RECV;
187 20664 *stream_idx = i;
188 20664 return AVERROR_EOF;
189 }
190
191 15173 nb_finished++;
192 }
193
194
2/2
✓ Branch 0 taken 8517 times.
✓ Branch 1 taken 1037582 times.
1046099 return nb_finished == tq->nb_streams ? AVERROR_EOF : AVERROR(EAGAIN);
195 }
196
197 1962193 int tq_receive(ThreadQueue *tq, int *stream_idx, void *data)
198 {
199 int ret;
200
201 1962193 *stream_idx = -1;
202
203 1962193 pthread_mutex_lock(&tq->lock);
204
205 1044752 while (1) {
206 3006945 size_t can_read = av_container_fifo_can_read(tq->fifo);
207
208 3006945 ret = receive_locked(tq, stream_idx, data);
209
210 // signal other threads if the fifo state changed
211
2/2
✓ Branch 1 taken 1933048 times.
✓ Branch 2 taken 1073897 times.
3006945 if (can_read != av_container_fifo_can_read(tq->fifo))
212 1933048 pthread_cond_broadcast(&tq->cond);
213
214
2/2
✓ Branch 0 taken 1044752 times.
✓ Branch 1 taken 1962193 times.
3006945 if (ret == AVERROR(EAGAIN)) {
215 1044752 pthread_cond_wait(&tq->cond, &tq->lock);
216 1044752 continue;
217 }
218
219 1962193 break;
220 }
221
222 1962193 pthread_mutex_unlock(&tq->lock);
223
224 1962193 return ret;
225 }
226
227 50040 void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx)
228 {
229
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 50040 times.
50040 av_assert0(stream_idx < tq->nb_streams);
230
231 50040 pthread_mutex_lock(&tq->lock);
232
233 /* mark the stream as send-finished;
234 * next time the consumer thread tries to read this stream it will get
235 * an EOF and recv-finished flag will be set */
236 50040 tq->finished[stream_idx] |= FINISHED_SEND;
237 50040 tq->choked = 0;
238 50040 pthread_cond_broadcast(&tq->cond);
239
240 50040 pthread_mutex_unlock(&tq->lock);
241 50040 }
242
243 39715 void tq_receive_finish(ThreadQueue *tq, unsigned int stream_idx)
244 {
245
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 39715 times.
39715 av_assert0(stream_idx < tq->nb_streams);
246
247 39715 pthread_mutex_lock(&tq->lock);
248
249 /* mark the stream as recv-finished;
250 * next time the producer thread tries to send for this stream, it will
251 * get an EOF and send-finished flag will be set */
252 39715 tq->finished[stream_idx] |= FINISHED_RECV;
253 39715 pthread_cond_broadcast(&tq->cond);
254
255 39715 pthread_mutex_unlock(&tq->lock);
256 39715 }
257
258 26270 void tq_choke(ThreadQueue *tq, int choked)
259 {
260 26270 pthread_mutex_lock(&tq->lock);
261
262 26270 int prev_choked = tq->choked;
263 26270 tq->choked = choked;
264
2/2
✓ Branch 0 taken 23192 times.
✓ Branch 1 taken 3078 times.
26270 if (choked != prev_choked)
265 23192 pthread_cond_broadcast(&tq->cond);
266
267 26270 pthread_mutex_unlock(&tq->lock);
268 26270 }
269