Line | Branch | Exec | Source |
---|---|---|---|
1 | /* | ||
2 | * Permission is hereby granted, free of charge, to any person obtaining a copy | ||
3 | * of this software and associated documentation files (the "Software"), to deal | ||
4 | * in the Software without restriction, including without limitation the rights | ||
5 | * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
6 | * copies of the Software, and to permit persons to whom the Software is | ||
7 | * furnished to do so, subject to the following conditions: | ||
8 | * | ||
9 | * The above copyright notice and this permission notice shall be included in | ||
10 | * all copies or substantial portions of the Software. | ||
11 | * | ||
12 | * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
13 | * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
14 | * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL | ||
15 | * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
16 | * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
17 | * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | ||
18 | * THE SOFTWARE. | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * Thread message API test | ||
23 | */ | ||
24 | |||
25 | #include "libavutil/avassert.h" | ||
26 | #include "libavutil/avstring.h" | ||
27 | #include "libavutil/frame.h" | ||
28 | #include "libavutil/mem.h" | ||
29 | #include "libavutil/threadmessage.h" | ||
30 | #include "libavutil/thread.h" // not public | ||
31 | |||
32 | struct sender_data { | ||
33 | int id; | ||
34 | pthread_t tid; | ||
35 | int workload; | ||
36 | AVThreadMessageQueue *queue; | ||
37 | }; | ||
38 | |||
39 | /* same as sender_data but shuffled for testing purpose */ | ||
40 | struct receiver_data { | ||
41 | pthread_t tid; | ||
42 | int workload; | ||
43 | int id; | ||
44 | AVThreadMessageQueue *queue; | ||
45 | }; | ||
46 | |||
47 | struct message { | ||
48 | AVFrame *frame; | ||
49 | // we add some junk in the message to make sure the message size is > | ||
50 | // sizeof(void*) | ||
51 | int magic; | ||
52 | }; | ||
53 | |||
54 | #define MAGIC 0xdeadc0de | ||
55 | |||
56 | 12 | static void free_frame(void *arg) | |
57 | { | ||
58 | 12 | struct message *msg = arg; | |
59 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
|
12 | av_assert0(msg->magic == MAGIC); |
60 | 12 | av_frame_free(&msg->frame); | |
61 | 12 | } | |
62 | |||
63 | 10 | static void *sender_thread(void *arg) | |
64 | { | ||
65 | 10 | int i, ret = 0; | |
66 | 10 | struct sender_data *wd = arg; | |
67 | |||
68 | 10 | av_log(NULL, AV_LOG_INFO, "sender #%d: workload=%d\n", wd->id, wd->workload); | |
69 |
1/2✓ Branch 0 taken 64 times.
✗ Branch 1 not taken.
|
64 | for (i = 0; i < wd->workload; i++) { |
70 |
2/2✓ Branch 1 taken 4 times.
✓ Branch 2 taken 60 times.
|
64 | if (rand() % wd->workload < wd->workload / 10) { |
71 | 4 | av_log(NULL, AV_LOG_INFO, "sender #%d: flushing the queue\n", wd->id); | |
72 | 4 | av_thread_message_flush(wd->queue); | |
73 | } else { | ||
74 | char *val; | ||
75 | 60 | AVDictionary *meta = NULL; | |
76 | 60 | struct message msg = { | |
77 | .magic = MAGIC, | ||
78 | 60 | .frame = av_frame_alloc(), | |
79 | }; | ||
80 | |||
81 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
|
60 | if (!msg.frame) { |
82 | ✗ | ret = AVERROR(ENOMEM); | |
83 | 10 | break; | |
84 | } | ||
85 | |||
86 | /* we add some metadata to identify the frames */ | ||
87 | 60 | val = av_asprintf("frame %d/%d from sender %d", | |
88 | i + 1, wd->workload, wd->id); | ||
89 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
|
60 | if (!val) { |
90 | ✗ | av_frame_free(&msg.frame); | |
91 | ✗ | ret = AVERROR(ENOMEM); | |
92 | ✗ | break; | |
93 | } | ||
94 | 60 | ret = av_dict_set(&meta, "sig", val, AV_DICT_DONT_STRDUP_VAL); | |
95 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
|
60 | if (ret < 0) { |
96 | ✗ | av_frame_free(&msg.frame); | |
97 | ✗ | break; | |
98 | } | ||
99 | 60 | msg.frame->metadata = meta; | |
100 | |||
101 | /* allocate a real frame in order to simulate "real" work */ | ||
102 | 60 | msg.frame->format = AV_PIX_FMT_RGBA; | |
103 | 60 | msg.frame->width = 320; | |
104 | 60 | msg.frame->height = 240; | |
105 | 60 | ret = av_frame_get_buffer(msg.frame, 0); | |
106 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
|
60 | if (ret < 0) { |
107 | ✗ | av_frame_free(&msg.frame); | |
108 | ✗ | break; | |
109 | } | ||
110 | |||
111 | /* push the frame in the common queue */ | ||
112 | 60 | av_log(NULL, AV_LOG_INFO, "sender #%d: sending my work (%d/%d frame:%p)\n", | |
113 | wd->id, i + 1, wd->workload, msg.frame); | ||
114 | 60 | ret = av_thread_message_queue_send(wd->queue, &msg, 0); | |
115 |
2/2✓ Branch 0 taken 10 times.
✓ Branch 1 taken 50 times.
|
60 | if (ret < 0) { |
116 | 10 | av_frame_free(&msg.frame); | |
117 | 10 | break; | |
118 | } | ||
119 | } | ||
120 | } | ||
121 | 10 | av_log(NULL, AV_LOG_INFO, "sender #%d: my work is done here (%s)\n", | |
122 | 10 | wd->id, av_err2str(ret)); | |
123 |
1/2✓ Branch 0 taken 10 times.
✗ Branch 1 not taken.
|
10 | av_thread_message_queue_set_err_recv(wd->queue, ret < 0 ? ret : AVERROR_EOF); |
124 | 10 | return NULL; | |
125 | } | ||
126 | |||
127 | 2 | static void *receiver_thread(void *arg) | |
128 | { | ||
129 | 2 | int i, ret = 0; | |
130 | 2 | struct receiver_data *rd = arg; | |
131 | |||
132 |
2/2✓ Branch 0 taken 42 times.
✓ Branch 1 taken 1 times.
|
43 | for (i = 0; i < rd->workload; i++) { |
133 |
2/2✓ Branch 1 taken 3 times.
✓ Branch 2 taken 39 times.
|
42 | if (rand() % rd->workload < rd->workload / 10) { |
134 | 3 | av_log(NULL, AV_LOG_INFO, "receiver #%d: flushing the queue, " | |
135 | "discarding %d message(s)\n", rd->id, | ||
136 | av_thread_message_queue_nb_elems(rd->queue)); | ||
137 | 3 | av_thread_message_flush(rd->queue); | |
138 | } else { | ||
139 | struct message msg; | ||
140 | AVDictionary *meta; | ||
141 | AVDictionaryEntry *e; | ||
142 | |||
143 | 39 | ret = av_thread_message_queue_recv(rd->queue, &msg, 0); | |
144 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 38 times.
|
39 | if (ret < 0) |
145 | 1 | break; | |
146 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 38 times.
|
38 | av_assert0(msg.magic == MAGIC); |
147 | 38 | meta = msg.frame->metadata; | |
148 | 38 | e = av_dict_get(meta, "sig", NULL, 0); | |
149 | 38 | av_log(NULL, AV_LOG_INFO, "got \"%s\" (%p)\n", e->value, msg.frame); | |
150 | 38 | av_frame_free(&msg.frame); | |
151 | } | ||
152 | } | ||
153 | |||
154 | 2 | av_log(NULL, AV_LOG_INFO, "consumed enough (%d), stop\n", i); | |
155 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
|
2 | av_thread_message_queue_set_err_send(rd->queue, ret < 0 ? ret : AVERROR_EOF); |
156 | |||
157 | 2 | return NULL; | |
158 | } | ||
159 | |||
/**
 * Pick a pseudo-random workload between two bounds.
 *
 * The bounds may be given in either order. When they are equal that value is
 * returned; otherwise the result is drawn with rand() from the half-open
 * range [low, high), i.e. the larger bound itself is never returned.
 *
 * @param minv one bound of the range
 * @param maxv the other bound of the range
 * @return a value v with low <= v < high, or the bound itself if both are equal
 */
static int get_workload(int minv, int maxv)
{
    /* a negative modulus operand would yield values outside the range */
    if (maxv < minv) {
        const int tmp = minv;
        minv = maxv;
        maxv = tmp;
    }

    if (maxv == minv)
        return maxv;

    return rand() % (maxv - minv) + minv;
}
164 | |||
165 | 1 | int main(int ac, char **av) | |
166 | { | ||
167 | 1 | int i, ret = 0; | |
168 | int max_queue_size; | ||
169 | int nb_senders, sender_min_load, sender_max_load; | ||
170 | int nb_receivers, receiver_min_load, receiver_max_load; | ||
171 | struct sender_data *senders; | ||
172 | struct receiver_data *receivers; | ||
173 | 1 | AVThreadMessageQueue *queue = NULL; | |
174 | |||
175 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (ac != 8) { |
176 | ✗ | av_log(NULL, AV_LOG_ERROR, "%s <max_queue_size> " | |
177 | "<nb_senders> <sender_min_send> <sender_max_send> " | ||
178 | "<nb_receivers> <receiver_min_recv> <receiver_max_recv>\n", av[0]); | ||
179 | ✗ | return 1; | |
180 | } | ||
181 | |||
182 | 1 | max_queue_size = atoi(av[1]); | |
183 | 1 | nb_senders = atoi(av[2]); | |
184 | 1 | sender_min_load = atoi(av[3]); | |
185 | 1 | sender_max_load = atoi(av[4]); | |
186 | 1 | nb_receivers = atoi(av[5]); | |
187 | 1 | receiver_min_load = atoi(av[6]); | |
188 | 1 | receiver_max_load = atoi(av[7]); | |
189 | |||
190 |
2/4✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
|
1 | if (max_queue_size <= 0 || |
191 |
3/6✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
|
1 | nb_senders <= 0 || sender_min_load <= 0 || sender_max_load <= 0 || |
192 |
2/4✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 times.
|
1 | nb_receivers <= 0 || receiver_min_load <= 0 || receiver_max_load <= 0) { |
193 | ✗ | av_log(NULL, AV_LOG_ERROR, "negative values not allowed\n"); | |
194 | ✗ | return 1; | |
195 | } | ||
196 | |||
197 | 1 | av_log(NULL, AV_LOG_INFO, "qsize:%d / %d senders sending [%d-%d] / " | |
198 | "%d receivers receiving [%d-%d]\n", max_queue_size, | ||
199 | nb_senders, sender_min_load, sender_max_load, | ||
200 | nb_receivers, receiver_min_load, receiver_max_load); | ||
201 | |||
202 | 1 | senders = av_calloc(nb_senders, sizeof(*senders)); | |
203 | 1 | receivers = av_calloc(nb_receivers, sizeof(*receivers)); | |
204 |
2/4✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 times.
|
1 | if (!senders || !receivers) { |
205 | ✗ | ret = AVERROR(ENOMEM); | |
206 | ✗ | goto end; | |
207 | } | ||
208 | |||
209 | 1 | ret = av_thread_message_queue_alloc(&queue, max_queue_size, sizeof(struct message)); | |
210 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (ret < 0) |
211 | ✗ | goto end; | |
212 | |||
213 | 1 | av_thread_message_queue_set_free_func(queue, free_frame); | |
214 | |||
215 | #define SPAWN_THREADS(type) do { \ | ||
216 | for (i = 0; i < nb_##type##s; i++) { \ | ||
217 | struct type##_data *td = &type##s[i]; \ | ||
218 | \ | ||
219 | td->id = i; \ | ||
220 | td->queue = queue; \ | ||
221 | td->workload = get_workload(type##_min_load, type##_max_load); \ | ||
222 | \ | ||
223 | ret = pthread_create(&td->tid, NULL, type##_thread, td); \ | ||
224 | if (ret) { \ | ||
225 | const int err = AVERROR(ret); \ | ||
226 | av_log(NULL, AV_LOG_ERROR, "Unable to start " AV_STRINGIFY(type) \ | ||
227 | " thread: %s\n", av_err2str(err)); \ | ||
228 | goto end; \ | ||
229 | } \ | ||
230 | } \ | ||
231 | } while (0) | ||
232 | |||
233 | #define WAIT_THREADS(type) do { \ | ||
234 | for (i = 0; i < nb_##type##s; i++) { \ | ||
235 | struct type##_data *td = &type##s[i]; \ | ||
236 | \ | ||
237 | ret = pthread_join(td->tid, NULL); \ | ||
238 | if (ret) { \ | ||
239 | const int err = AVERROR(ret); \ | ||
240 | av_log(NULL, AV_LOG_ERROR, "Unable to join " AV_STRINGIFY(type) \ | ||
241 | " thread: %s\n", av_err2str(err)); \ | ||
242 | goto end; \ | ||
243 | } \ | ||
244 | } \ | ||
245 | } while (0) | ||
246 | |||
247 |
3/4✗ Branch 2 not taken.
✓ Branch 3 taken 2 times.
✓ Branch 6 taken 2 times.
✓ Branch 7 taken 1 times.
|
3 | SPAWN_THREADS(receiver); |
248 |
3/4✗ Branch 2 not taken.
✓ Branch 3 taken 10 times.
✓ Branch 6 taken 10 times.
✓ Branch 7 taken 1 times.
|
11 | SPAWN_THREADS(sender); |
249 | |||
250 |
3/4✗ Branch 1 not taken.
✓ Branch 2 taken 10 times.
✓ Branch 5 taken 10 times.
✓ Branch 6 taken 1 times.
|
11 | WAIT_THREADS(sender); |
251 |
3/4✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
✓ Branch 5 taken 2 times.
✓ Branch 6 taken 1 times.
|
3 | WAIT_THREADS(receiver); |
252 | |||
253 | 1 | end: | |
254 | 1 | av_thread_message_queue_free(&queue); | |
255 | 1 | av_freep(&senders); | |
256 | 1 | av_freep(&receivers); | |
257 | |||
258 |
1/4✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
|
1 | if (ret < 0 && ret != AVERROR_EOF) { |
259 | ✗ | av_log(NULL, AV_LOG_ERROR, "Error: %s\n", av_err2str(ret)); | |
260 | ✗ | return 1; | |
261 | } | ||
262 | 1 | return 0; | |
263 | } | ||
264 |