FFmpeg coverage


Directory: ../../../ffmpeg/
File: src/tests/api/api-threadmessage-test.c
Date: 2025-01-20 09:27:23
Exec Total Coverage
Lines: 88 105 83.8%
Functions: 5 5 100.0%
Branches: 45 72 62.5%

Line Branch Exec Source
1 /*
2 * Permission is hereby granted, free of charge, to any person obtaining a copy
3 * of this software and associated documentation files (the "Software"), to deal
4 * in the Software without restriction, including without limitation the rights
5 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
6 * copies of the Software, and to permit persons to whom the Software is
7 * furnished to do so, subject to the following conditions:
8 *
9 * The above copyright notice and this permission notice shall be included in
10 * all copies or substantial portions of the Software.
11 *
12 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
13 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
14 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
15 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
16 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
17 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
18 * THE SOFTWARE.
19 */
20
21 /**
22 * Thread message API test
23 */
24
25 #include "libavutil/avassert.h"
26 #include "libavutil/avstring.h"
27 #include "libavutil/frame.h"
28 #include "libavutil/mem.h"
29 #include "libavutil/threadmessage.h"
30 #include "libavutil/thread.h" // not public
31
32 struct sender_data {
33 int id;
34 pthread_t tid;
35 int workload;
36 AVThreadMessageQueue *queue;
37 };
38
39 /* same as sender_data but shuffled for testing purpose */
40 struct receiver_data {
41 pthread_t tid;
42 int workload;
43 int id;
44 AVThreadMessageQueue *queue;
45 };
46
47 struct message {
48 AVFrame *frame;
49 // we add some junk in the message to make sure the message size is >
50 // sizeof(void*)
51 int magic;
52 };
53
54 #define MAGIC 0xdeadc0de
55
56 12 static void free_frame(void *arg)
57 {
58 12 struct message *msg = arg;
59
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
12 av_assert0(msg->magic == MAGIC);
60 12 av_frame_free(&msg->frame);
61 12 }
62
63 10 static void *sender_thread(void *arg)
64 {
65 10 int i, ret = 0;
66 10 struct sender_data *wd = arg;
67
68 10 av_log(NULL, AV_LOG_INFO, "sender #%d: workload=%d\n", wd->id, wd->workload);
69
1/2
✓ Branch 0 taken 64 times.
✗ Branch 1 not taken.
64 for (i = 0; i < wd->workload; i++) {
70
2/2
✓ Branch 1 taken 4 times.
✓ Branch 2 taken 60 times.
64 if (rand() % wd->workload < wd->workload / 10) {
71 4 av_log(NULL, AV_LOG_INFO, "sender #%d: flushing the queue\n", wd->id);
72 4 av_thread_message_flush(wd->queue);
73 } else {
74 char *val;
75 60 AVDictionary *meta = NULL;
76 60 struct message msg = {
77 .magic = MAGIC,
78 60 .frame = av_frame_alloc(),
79 };
80
81
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
60 if (!msg.frame) {
82 ret = AVERROR(ENOMEM);
83 10 break;
84 }
85
86 /* we add some metadata to identify the frames */
87 60 val = av_asprintf("frame %d/%d from sender %d",
88 i + 1, wd->workload, wd->id);
89
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
60 if (!val) {
90 av_frame_free(&msg.frame);
91 ret = AVERROR(ENOMEM);
92 break;
93 }
94 60 ret = av_dict_set(&meta, "sig", val, AV_DICT_DONT_STRDUP_VAL);
95
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
60 if (ret < 0) {
96 av_frame_free(&msg.frame);
97 break;
98 }
99 60 msg.frame->metadata = meta;
100
101 /* allocate a real frame in order to simulate "real" work */
102 60 msg.frame->format = AV_PIX_FMT_RGBA;
103 60 msg.frame->width = 320;
104 60 msg.frame->height = 240;
105 60 ret = av_frame_get_buffer(msg.frame, 0);
106
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
60 if (ret < 0) {
107 av_frame_free(&msg.frame);
108 break;
109 }
110
111 /* push the frame in the common queue */
112 60 av_log(NULL, AV_LOG_INFO, "sender #%d: sending my work (%d/%d frame:%p)\n",
113 wd->id, i + 1, wd->workload, msg.frame);
114 60 ret = av_thread_message_queue_send(wd->queue, &msg, 0);
115
2/2
✓ Branch 0 taken 10 times.
✓ Branch 1 taken 50 times.
60 if (ret < 0) {
116 10 av_frame_free(&msg.frame);
117 10 break;
118 }
119 }
120 }
121 10 av_log(NULL, AV_LOG_INFO, "sender #%d: my work is done here (%s)\n",
122 10 wd->id, av_err2str(ret));
123
1/2
✓ Branch 0 taken 10 times.
✗ Branch 1 not taken.
10 av_thread_message_queue_set_err_recv(wd->queue, ret < 0 ? ret : AVERROR_EOF);
124 10 return NULL;
125 }
126
127 2 static void *receiver_thread(void *arg)
128 {
129 2 int i, ret = 0;
130 2 struct receiver_data *rd = arg;
131
132
2/2
✓ Branch 0 taken 42 times.
✓ Branch 1 taken 1 times.
43 for (i = 0; i < rd->workload; i++) {
133
2/2
✓ Branch 1 taken 3 times.
✓ Branch 2 taken 39 times.
42 if (rand() % rd->workload < rd->workload / 10) {
134 3 av_log(NULL, AV_LOG_INFO, "receiver #%d: flushing the queue, "
135 "discarding %d message(s)\n", rd->id,
136 av_thread_message_queue_nb_elems(rd->queue));
137 3 av_thread_message_flush(rd->queue);
138 } else {
139 struct message msg;
140 AVDictionary *meta;
141 AVDictionaryEntry *e;
142
143 39 ret = av_thread_message_queue_recv(rd->queue, &msg, 0);
144
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 38 times.
39 if (ret < 0)
145 1 break;
146
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 38 times.
38 av_assert0(msg.magic == MAGIC);
147 38 meta = msg.frame->metadata;
148 38 e = av_dict_get(meta, "sig", NULL, 0);
149 38 av_log(NULL, AV_LOG_INFO, "got \"%s\" (%p)\n", e->value, msg.frame);
150 38 av_frame_free(&msg.frame);
151 }
152 }
153
154 2 av_log(NULL, AV_LOG_INFO, "consumed enough (%d), stop\n", i);
155
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
2 av_thread_message_queue_set_err_send(rd->queue, ret < 0 ? ret : AVERROR_EOF);
156
157 2 return NULL;
158 }
159
160 12 static int get_workload(int minv, int maxv)
161 {
162
1/2
✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
12 return maxv == minv ? maxv : rand() % (maxv - minv) + minv;
163 }
164
165 1 int main(int ac, char **av)
166 {
167 1 int i, ret = 0;
168 int max_queue_size;
169 int nb_senders, sender_min_load, sender_max_load;
170 int nb_receivers, receiver_min_load, receiver_max_load;
171 struct sender_data *senders;
172 struct receiver_data *receivers;
173 1 AVThreadMessageQueue *queue = NULL;
174
175
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (ac != 8) {
176 av_log(NULL, AV_LOG_ERROR, "%s <max_queue_size> "
177 "<nb_senders> <sender_min_send> <sender_max_send> "
178 "<nb_receivers> <receiver_min_recv> <receiver_max_recv>\n", av[0]);
179 return 1;
180 }
181
182 1 max_queue_size = atoi(av[1]);
183 1 nb_senders = atoi(av[2]);
184 1 sender_min_load = atoi(av[3]);
185 1 sender_max_load = atoi(av[4]);
186 1 nb_receivers = atoi(av[5]);
187 1 receiver_min_load = atoi(av[6]);
188 1 receiver_max_load = atoi(av[7]);
189
190
2/4
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
1 if (max_queue_size <= 0 ||
191
3/6
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
1 nb_senders <= 0 || sender_min_load <= 0 || sender_max_load <= 0 ||
192
2/4
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 times.
1 nb_receivers <= 0 || receiver_min_load <= 0 || receiver_max_load <= 0) {
193 av_log(NULL, AV_LOG_ERROR, "negative values not allowed\n");
194 return 1;
195 }
196
197 1 av_log(NULL, AV_LOG_INFO, "qsize:%d / %d senders sending [%d-%d] / "
198 "%d receivers receiving [%d-%d]\n", max_queue_size,
199 nb_senders, sender_min_load, sender_max_load,
200 nb_receivers, receiver_min_load, receiver_max_load);
201
202 1 senders = av_calloc(nb_senders, sizeof(*senders));
203 1 receivers = av_calloc(nb_receivers, sizeof(*receivers));
204
2/4
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 times.
1 if (!senders || !receivers) {
205 ret = AVERROR(ENOMEM);
206 goto end;
207 }
208
209 1 ret = av_thread_message_queue_alloc(&queue, max_queue_size, sizeof(struct message));
210
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (ret < 0)
211 goto end;
212
213 1 av_thread_message_queue_set_free_func(queue, free_frame);
214
215 #define SPAWN_THREADS(type) do { \
216 for (i = 0; i < nb_##type##s; i++) { \
217 struct type##_data *td = &type##s[i]; \
218 \
219 td->id = i; \
220 td->queue = queue; \
221 td->workload = get_workload(type##_min_load, type##_max_load); \
222 \
223 ret = pthread_create(&td->tid, NULL, type##_thread, td); \
224 if (ret) { \
225 const int err = AVERROR(ret); \
226 av_log(NULL, AV_LOG_ERROR, "Unable to start " AV_STRINGIFY(type) \
227 " thread: %s\n", av_err2str(err)); \
228 goto end; \
229 } \
230 } \
231 } while (0)
232
233 #define WAIT_THREADS(type) do { \
234 for (i = 0; i < nb_##type##s; i++) { \
235 struct type##_data *td = &type##s[i]; \
236 \
237 ret = pthread_join(td->tid, NULL); \
238 if (ret) { \
239 const int err = AVERROR(ret); \
240 av_log(NULL, AV_LOG_ERROR, "Unable to join " AV_STRINGIFY(type) \
241 " thread: %s\n", av_err2str(err)); \
242 goto end; \
243 } \
244 } \
245 } while (0)
246
247
3/4
✗ Branch 2 not taken.
✓ Branch 3 taken 2 times.
✓ Branch 6 taken 2 times.
✓ Branch 7 taken 1 times.
3 SPAWN_THREADS(receiver);
248
3/4
✗ Branch 2 not taken.
✓ Branch 3 taken 10 times.
✓ Branch 6 taken 10 times.
✓ Branch 7 taken 1 times.
11 SPAWN_THREADS(sender);
249
250
3/4
✗ Branch 1 not taken.
✓ Branch 2 taken 10 times.
✓ Branch 5 taken 10 times.
✓ Branch 6 taken 1 times.
11 WAIT_THREADS(sender);
251
3/4
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
✓ Branch 5 taken 2 times.
✓ Branch 6 taken 1 times.
3 WAIT_THREADS(receiver);
252
253 1 end:
254 1 av_thread_message_queue_free(&queue);
255 1 av_freep(&senders);
256 1 av_freep(&receivers);
257
258
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
1 if (ret < 0 && ret != AVERROR_EOF) {
259 av_log(NULL, AV_LOG_ERROR, "Error: %s\n", av_err2str(ret));
260 return 1;
261 }
262 1 return 0;
263 }
264