FFmpeg coverage


Directory: ../../../ffmpeg/
File: src/fftools/thread_queue.c
Date: 2025-10-19 14:07:46
Exec Total Coverage
Lines: 105 120 87.5%
Functions: 8 8 100.0%
Branches: 45 58 77.6%

Line Branch Exec Source
1 /*
2 * This file is part of FFmpeg.
3 *
4 * FFmpeg is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
8 *
9 * FFmpeg is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
13 *
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with FFmpeg; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18
19 #include <stdint.h>
20 #include <string.h>
21
22 #include "libavutil/avassert.h"
23 #include "libavutil/container_fifo.h"
24 #include "libavutil/error.h"
25 #include "libavutil/fifo.h"
26 #include "libavutil/frame.h"
27 #include "libavutil/intreadwrite.h"
28 #include "libavutil/mem.h"
29 #include "libavutil/thread.h"
30
31 #include "libavcodec/packet.h"
32
33 #include "thread_queue.h"
34
35 enum {
36 FINISHED_SEND = (1 << 0),
37 FINISHED_RECV = (1 << 1),
38 };
39
40 struct ThreadQueue {
41 int choked;
42 int *finished;
43 unsigned int nb_streams;
44
45 enum ThreadQueueType type;
46
47 AVContainerFifo *fifo;
48 AVFifo *fifo_stream_index;
49
50 pthread_mutex_t lock;
51 pthread_cond_t cond;
52 };
53
54 31496 void tq_free(ThreadQueue **ptq)
55 {
56 31496 ThreadQueue *tq = *ptq;
57
58
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 31496 times.
31496 if (!tq)
59 return;
60
61 31496 av_container_fifo_free(&tq->fifo);
62 31496 av_fifo_freep2(&tq->fifo_stream_index);
63
64 31496 av_freep(&tq->finished);
65
66 31496 pthread_cond_destroy(&tq->cond);
67 31496 pthread_mutex_destroy(&tq->lock);
68
69 31496 av_freep(ptq);
70 }
71
72 31496 ThreadQueue *tq_alloc(unsigned int nb_streams, size_t queue_size,
73 enum ThreadQueueType type)
74 {
75 ThreadQueue *tq;
76 int ret;
77
78 31496 tq = av_mallocz(sizeof(*tq));
79
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 31496 times.
31496 if (!tq)
80 return NULL;
81
82 31496 ret = pthread_cond_init(&tq->cond, NULL);
83
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 31496 times.
31496 if (ret) {
84 av_freep(&tq);
85 return NULL;
86 }
87
88 31496 ret = pthread_mutex_init(&tq->lock, NULL);
89
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 31496 times.
31496 if (ret) {
90 pthread_cond_destroy(&tq->cond);
91 av_freep(&tq);
92 return NULL;
93 }
94
95 31496 tq->finished = av_calloc(nb_streams, sizeof(*tq->finished));
96
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 31496 times.
31496 if (!tq->finished)
97 goto fail;
98 31496 tq->nb_streams = nb_streams;
99
100 31496 tq->type = type;
101
102 31496 tq->fifo = (type == THREAD_QUEUE_FRAMES) ?
103
2/2
✓ Branch 0 taken 16170 times.
✓ Branch 1 taken 15326 times.
31496 av_container_fifo_alloc_avframe(0) : av_container_fifo_alloc_avpacket(0);
104
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 31496 times.
31496 if (!tq->fifo)
105 goto fail;
106
107 31496 tq->fifo_stream_index = av_fifo_alloc2(queue_size, sizeof(unsigned), 0);
108
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 31496 times.
31496 if (!tq->fifo_stream_index)
109 goto fail;
110
111 31496 return tq;
112 fail:
113 tq_free(&tq);
114 return NULL;
115 }
116
117 1761144 int tq_send(ThreadQueue *tq, unsigned int stream_idx, void *data)
118 {
119 int *finished;
120 int ret;
121
122
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1761144 times.
1761144 av_assert0(stream_idx < tq->nb_streams);
123 1761144 finished = &tq->finished[stream_idx];
124
125 1761144 pthread_mutex_lock(&tq->lock);
126
127
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1761143 times.
1761144 if (*finished & FINISHED_SEND) {
128 1 ret = AVERROR(EINVAL);
129 1 goto finish;
130 }
131
132
4/4
✓ Branch 0 taken 2295098 times.
✓ Branch 1 taken 6803 times.
✓ Branch 3 taken 540758 times.
✓ Branch 4 taken 1754340 times.
2301901 while (!(*finished & FINISHED_RECV) && !av_fifo_can_write(tq->fifo_stream_index))
133 540758 pthread_cond_wait(&tq->cond, &tq->lock);
134
135
2/2
✓ Branch 0 taken 6803 times.
✓ Branch 1 taken 1754340 times.
1761143 if (*finished & FINISHED_RECV) {
136 6803 ret = AVERROR_EOF;
137 6803 *finished |= FINISHED_SEND;
138 } else {
139 1754340 ret = av_fifo_write(tq->fifo_stream_index, &stream_idx, 1);
140
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1754340 times.
1754340 if (ret < 0)
141 goto finish;
142
143 1754340 ret = av_container_fifo_write(tq->fifo, data, 0);
144
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1754340 times.
1754340 if (ret < 0)
145 goto finish;
146
147 1754340 pthread_cond_broadcast(&tq->cond);
148 }
149
150 1761144 finish:
151 1761144 pthread_mutex_unlock(&tq->lock);
152
153 1761144 return ret;
154 }
155
156 2694946 static int receive_locked(ThreadQueue *tq, int *stream_idx,
157 void *data)
158 {
159 2694946 unsigned int nb_finished = 0;
160
161
2/2
✓ Branch 0 taken 7060 times.
✓ Branch 1 taken 2687886 times.
2694946 if (tq->choked)
162 7060 return AVERROR(EAGAIN);
163
164
2/2
✓ Branch 1 taken 1719497 times.
✓ Branch 2 taken 968553 times.
2688050 while (av_container_fifo_read(tq->fifo, data, 0) >= 0) {
165 unsigned idx;
166 int ret;
167
168 1719497 ret = av_fifo_read(tq->fifo_stream_index, &idx, 1);
169
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1719497 times.
1719497 av_assert0(ret >= 0);
170
2/2
✓ Branch 0 taken 164 times.
✓ Branch 1 taken 1719333 times.
1719497 if (tq->finished[idx] & FINISHED_RECV) {
171 164 (tq->type == THREAD_QUEUE_FRAMES) ?
172
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 163 times.
164 av_frame_unref(data) : av_packet_unref(data);
173 164 continue;
174 }
175
176 1719333 *stream_idx = idx;
177 1719333 return 0;
178 }
179
180
2/2
✓ Branch 0 taken 1215338 times.
✓ Branch 1 taken 948160 times.
2163498 for (unsigned int i = 0; i < tq->nb_streams; i++) {
181
2/2
✓ Branch 0 taken 1180071 times.
✓ Branch 1 taken 35267 times.
1215338 if (!tq->finished[i])
182 1180071 continue;
183
184 /* return EOF to the consumer at most once for each stream */
185
2/2
✓ Branch 0 taken 20393 times.
✓ Branch 1 taken 14874 times.
35267 if (!(tq->finished[i] & FINISHED_RECV)) {
186 20393 tq->finished[i] |= FINISHED_RECV;
187 20393 *stream_idx = i;
188 20393 return AVERROR_EOF;
189 }
190
191 14874 nb_finished++;
192 }
193
194
2/2
✓ Branch 0 taken 8401 times.
✓ Branch 1 taken 939759 times.
948160 return nb_finished == tq->nb_streams ? AVERROR_EOF : AVERROR(EAGAIN);
195 }
196
197 1748127 int tq_receive(ThreadQueue *tq, int *stream_idx, void *data)
198 {
199 int ret;
200
201 1748127 *stream_idx = -1;
202
203 1748127 pthread_mutex_lock(&tq->lock);
204
205 946819 while (1) {
206 2694946 size_t can_read = av_container_fifo_can_read(tq->fifo);
207
208 2694946 ret = receive_locked(tq, stream_idx, data);
209
210 // signal other threads if the fifo state changed
211
2/2
✓ Branch 1 taken 1719367 times.
✓ Branch 2 taken 975579 times.
2694946 if (can_read != av_container_fifo_can_read(tq->fifo))
212 1719367 pthread_cond_broadcast(&tq->cond);
213
214
2/2
✓ Branch 0 taken 946819 times.
✓ Branch 1 taken 1748127 times.
2694946 if (ret == AVERROR(EAGAIN)) {
215 946819 pthread_cond_wait(&tq->cond, &tq->lock);
216 946819 continue;
217 }
218
219 1748127 break;
220 }
221
222 1748127 pthread_mutex_unlock(&tq->lock);
223
224 1748127 return ret;
225 }
226
227 49489 void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx)
228 {
229
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 49489 times.
49489 av_assert0(stream_idx < tq->nb_streams);
230
231 49489 pthread_mutex_lock(&tq->lock);
232
233 /* mark the stream as send-finished;
234 * next time the consumer thread tries to read this stream it will get
235 * an EOF and recv-finished flag will be set */
236 49489 tq->finished[stream_idx] |= FINISHED_SEND;
237 49489 tq->choked = 0;
238 49489 pthread_cond_broadcast(&tq->cond);
239
240 49489 pthread_mutex_unlock(&tq->lock);
241 49489 }
242
243 39142 void tq_receive_finish(ThreadQueue *tq, unsigned int stream_idx)
244 {
245
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 39142 times.
39142 av_assert0(stream_idx < tq->nb_streams);
246
247 39142 pthread_mutex_lock(&tq->lock);
248
249 /* mark the stream as recv-finished;
250 * next time the producer thread tries to send for this stream, it will
251 * get an EOF and send-finished flag will be set */
252 39142 tq->finished[stream_idx] |= FINISHED_RECV;
253 39142 pthread_cond_broadcast(&tq->cond);
254
255 39142 pthread_mutex_unlock(&tq->lock);
256 39142 }
257
258 25792 void tq_choke(ThreadQueue *tq, int choked)
259 {
260 25792 pthread_mutex_lock(&tq->lock);
261
262 25792 int prev_choked = tq->choked;
263 25792 tq->choked = choked;
264
2/2
✓ Branch 0 taken 22736 times.
✓ Branch 1 taken 3056 times.
25792 if (choked != prev_choked)
265 22736 pthread_cond_broadcast(&tq->cond);
266
267 25792 pthread_mutex_unlock(&tq->lock);
268 25792 }
269