FFmpeg coverage


Directory: ../../../ffmpeg/
File: src/libavformat/udp.c
Date: 2025-07-28 20:30:09
Exec Total Coverage
Lines: 0 550 0.0%
Functions: 0 19 0.0%
Branches: 0 371 0.0%

Line Branch Exec Source
1 /*
2 * UDP prototype streaming system
3 * Copyright (c) 2000, 2001, 2002 Fabrice Bellard
4 *
5 * This file is part of FFmpeg.
6 *
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with FFmpeg; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22 /**
23 * @file
24 * UDP protocol
25 */
26
27 #define _DEFAULT_SOURCE
28 #define _BSD_SOURCE /* Needed for using struct ip_mreq with recent glibc */
29
30 #include "avformat.h"
31 #include "libavutil/avassert.h"
32 #include "libavutil/mem.h"
33 #include "libavutil/parseutils.h"
34 #include "libavutil/fifo.h"
35 #include "libavutil/intreadwrite.h"
36 #include "libavutil/opt.h"
37 #include "libavutil/log.h"
38 #include "libavutil/time.h"
39 #include "network.h"
40 #include "os_support.h"
41 #include "url.h"
42 #include "ip.h"
43
44 #ifdef __APPLE__
45 #include "TargetConditionals.h"
46 #endif
47
48 #if HAVE_UDPLITE_H
49 #include "udplite.h"
50 #else
51 /* On many Linux systems, udplite.h is missing but the kernel supports UDP-Lite.
52 * So, we provide a fallback here.
53 */
54 #define UDPLITE_SEND_CSCOV 10
55 #define UDPLITE_RECV_CSCOV 11
56 #endif
57
58 #ifndef IPPROTO_UDPLITE
59 #define IPPROTO_UDPLITE 136
60 #endif
61
62 #if HAVE_W32THREADS
63 #undef HAVE_PTHREAD_CANCEL
64 #define HAVE_PTHREAD_CANCEL 1
65 #endif
66
67 #if HAVE_PTHREAD_CANCEL
68 #include "libavutil/thread.h"
69 #endif
70
71 #ifndef IPV6_ADD_MEMBERSHIP
72 #define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
73 #define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
74 #endif
75
76 #define UDP_TX_BUF_SIZE 32768
77 #define UDP_RX_BUF_SIZE 393216
78 #define UDP_MAX_PKT_SIZE 65536
79 #define UDP_HEADER_SIZE 8
80
/**
 * Header stored in front of every datagram that the receive thread
 * pushes into the RX FIFO.
 */
typedef struct UDPQueuedPacketHeader {
    int                     pkt_size; ///< value returned by recvfrom() for this datagram
    struct sockaddr_storage addr;     ///< sender address as filled in by recvfrom()
    socklen_t               addr_len; ///< length of the address stored in addr
} UDPQueuedPacketHeader;
86
87 typedef struct UDPContext {
88 const AVClass *class;
89 int udp_fd;
90 int ttl;
91 int udplite_coverage;
92 int buffer_size;
93 int pkt_size;
94 int is_multicast;
95 int is_broadcast;
96 int local_port;
97 int reuse_socket;
98 int overrun_nonfatal;
99 struct sockaddr_storage dest_addr;
100 int dest_addr_len;
101 int is_connected;
102
103 /* Circular Buffer variables for use in UDP receive code */
104 int circular_buffer_size;
105 AVFifo *rx_fifo;
106 AVFifo *tx_fifo;
107 int circular_buffer_error;
108 int64_t bitrate; /* number of bits to send per second */
109 int64_t burst_bits;
110 int close_req;
111 #if HAVE_PTHREAD_CANCEL
112 pthread_t circular_buffer_thread;
113 pthread_mutex_t mutex;
114 pthread_cond_t cond;
115 int thread_started;
116 #endif
117 uint8_t tmp[UDP_MAX_PKT_SIZE + sizeof(UDPQueuedPacketHeader)];
118 int remaining_in_dg;
119 char *localaddr;
120 int timeout;
121 struct sockaddr_storage local_addr_storage;
122 char *sources;
123 char *block;
124 IPSourceFilters filters;
125 struct sockaddr_storage last_recv_addr;
126 socklen_t last_recv_addr_len;
127 } UDPContext;
128
129 #define OFFSET(x) offsetof(UDPContext, x)
130 #define D AV_OPT_FLAG_DECODING_PARAM
131 #define E AV_OPT_FLAG_ENCODING_PARAM
132 static const AVOption options[] = {
133 { "buffer_size", "System data size (in bytes)", OFFSET(buffer_size), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E },
134 { "bitrate", "Bits to send per second", OFFSET(bitrate), AV_OPT_TYPE_INT64, { .i64 = 0 }, 0, INT64_MAX, .flags = E },
135 { "burst_bits", "Max length of bursts in bits (when using bitrate)", OFFSET(burst_bits), AV_OPT_TYPE_INT64, { .i64 = 0 }, 0, INT64_MAX, .flags = E },
136 { "localport", "Local port", OFFSET(local_port), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, D|E },
137 { "local_port", "Local port", OFFSET(local_port), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E },
138 { "localaddr", "Local address", OFFSET(localaddr), AV_OPT_TYPE_STRING, { .str = NULL }, .flags = D|E },
139 { "udplite_coverage", "choose UDPLite head size which should be validated by checksum", OFFSET(udplite_coverage), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, D|E },
140 { "pkt_size", "Maximum UDP packet size", OFFSET(pkt_size), AV_OPT_TYPE_INT, { .i64 = 1472 }, -1, INT_MAX, .flags = D|E },
141 { "reuse", "explicitly allow reusing UDP sockets", OFFSET(reuse_socket), AV_OPT_TYPE_BOOL, { .i64 = -1 }, -1, 1, D|E },
142 { "reuse_socket", "explicitly allow reusing UDP sockets", OFFSET(reuse_socket), AV_OPT_TYPE_BOOL, { .i64 = -1 }, -1, 1, .flags = D|E },
143 { "broadcast", "explicitly allow or disallow broadcast destination", OFFSET(is_broadcast), AV_OPT_TYPE_BOOL, { .i64 = 0 }, 0, 1, E },
144 { "ttl", "Time to live (multicast only)", OFFSET(ttl), AV_OPT_TYPE_INT, { .i64 = 16 }, 0, 255, E },
145 { "connect", "set if connect() should be called on socket", OFFSET(is_connected), AV_OPT_TYPE_BOOL, { .i64 = 0 }, 0, 1, .flags = D|E },
146 { "fifo_size", "set the UDP receiving circular buffer size, expressed as a number of packets with size of 188 bytes", OFFSET(circular_buffer_size), AV_OPT_TYPE_INT, {.i64 = 7*4096}, 0, INT_MAX, D },
147 { "overrun_nonfatal", "survive in case of UDP receiving circular buffer overrun", OFFSET(overrun_nonfatal), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, D },
148 { "timeout", "set raise error timeout, in microseconds (only in read mode)",OFFSET(timeout), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, D },
149 { "sources", "Source list", OFFSET(sources), AV_OPT_TYPE_STRING, { .str = NULL }, .flags = D|E },
150 { "block", "Block list", OFFSET(block), AV_OPT_TYPE_STRING, { .str = NULL }, .flags = D|E },
151 { NULL }
152 };
153
154 static const AVClass udp_class = {
155 .class_name = "udp",
156 .item_name = av_default_item_name,
157 .option = options,
158 .version = LIBAVUTIL_VERSION_INT,
159 };
160
161 static const AVClass udplite_context_class = {
162 .class_name = "udplite",
163 .item_name = av_default_item_name,
164 .option = options,
165 .version = LIBAVUTIL_VERSION_INT,
166 };
167
168 static int udp_set_multicast_ttl(int sockfd, int mcastTTL,
169 struct sockaddr *addr,
170 void *logctx)
171 {
172 int protocol, cmd;
173
174 /* There is some confusion in the world whether IP_MULTICAST_TTL
175 * takes a byte or an int as an argument.
176 * BSD seems to indicate byte so we are going with that and use
177 * int and fall back to byte to be safe */
178 switch (addr->sa_family) {
179 #ifdef IP_MULTICAST_TTL
180 case AF_INET:
181 protocol = IPPROTO_IP;
182 cmd = IP_MULTICAST_TTL;
183 break;
184 #endif
185 #ifdef IPV6_MULTICAST_HOPS
186 case AF_INET6:
187 protocol = IPPROTO_IPV6;
188 cmd = IPV6_MULTICAST_HOPS;
189 break;
190 #endif
191 default:
192 return 0;
193 }
194
195 if (setsockopt(sockfd, protocol, cmd, &mcastTTL, sizeof(mcastTTL)) < 0) {
196 /* BSD compatibility */
197 unsigned char ttl = (unsigned char) mcastTTL;
198
199 ff_log_net_error(logctx, AV_LOG_DEBUG, "setsockopt(IPV4/IPV6 MULTICAST TTL)");
200 if (setsockopt(sockfd, protocol, cmd, &ttl, sizeof(ttl)) < 0) {
201 ff_log_net_error(logctx, AV_LOG_ERROR, "setsockopt(IPV4/IPV6 MULTICAST TTL)");
202 return ff_neterrno();
203 }
204 }
205
206 return 0;
207 }
208
209 static int udp_join_multicast_group(int sockfd, struct sockaddr *addr,
210 struct sockaddr *local_addr, void *logctx)
211 {
212 #ifdef IP_ADD_MEMBERSHIP
213 if (addr->sa_family == AF_INET) {
214 struct ip_mreq mreq;
215
216 mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
217 if (local_addr)
218 mreq.imr_interface= ((struct sockaddr_in *)local_addr)->sin_addr;
219 else
220 mreq.imr_interface.s_addr = INADDR_ANY;
221 if (setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) {
222 ff_log_net_error(logctx, AV_LOG_ERROR, "setsockopt(IP_ADD_MEMBERSHIP)");
223 return ff_neterrno();
224 }
225 }
226 #endif
227 #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
228 if (addr->sa_family == AF_INET6) {
229 struct ipv6_mreq mreq6;
230
231 memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr));
232 //TODO: Interface index should be looked up from local_addr
233 mreq6.ipv6mr_interface = 0;
234 if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, &mreq6, sizeof(mreq6)) < 0) {
235 ff_log_net_error(logctx, AV_LOG_ERROR, "setsockopt(IPV6_ADD_MEMBERSHIP)");
236 return ff_neterrno();
237 }
238 }
239 #endif
240 return 0;
241 }
242
243 static int udp_leave_multicast_group(int sockfd, struct sockaddr *addr,
244 struct sockaddr *local_addr, void *logctx)
245 {
246 #ifdef IP_DROP_MEMBERSHIP
247 if (addr->sa_family == AF_INET) {
248 struct ip_mreq mreq;
249
250 mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
251 if (local_addr)
252 mreq.imr_interface = ((struct sockaddr_in *)local_addr)->sin_addr;
253 else
254 mreq.imr_interface.s_addr = INADDR_ANY;
255 if (setsockopt(sockfd, IPPROTO_IP, IP_DROP_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) {
256 ff_log_net_error(logctx, AV_LOG_ERROR, "setsockopt(IP_DROP_MEMBERSHIP)");
257 return -1;
258 }
259 }
260 #endif
261 #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
262 if (addr->sa_family == AF_INET6) {
263 struct ipv6_mreq mreq6;
264
265 memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr));
266 //TODO: Interface index should be looked up from local_addr
267 mreq6.ipv6mr_interface = 0;
268 if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_DROP_MEMBERSHIP, &mreq6, sizeof(mreq6)) < 0) {
269 ff_log_net_error(logctx, AV_LOG_ERROR, "setsockopt(IPV6_DROP_MEMBERSHIP)");
270 return -1;
271 }
272 }
273 #endif
274 return 0;
275 }
276
277 static int udp_set_multicast_sources(URLContext *h,
278 int sockfd, struct sockaddr *addr,
279 int addr_len, struct sockaddr_storage *local_addr,
280 struct sockaddr_storage *sources,
281 int nb_sources, int include)
282 {
283 int i;
284 if (addr->sa_family != AF_INET) {
285 #if HAVE_STRUCT_GROUP_SOURCE_REQ && defined(MCAST_BLOCK_SOURCE)
286 /* For IPv4 prefer the old approach, as that alone works reliably on
287 * Windows and it also supports supplying the interface based on its
288 * address. */
289 int i;
290 for (i = 0; i < nb_sources; i++) {
291 struct group_source_req mreqs;
292 int level = addr->sa_family == AF_INET ? IPPROTO_IP : IPPROTO_IPV6;
293
294 //TODO: Interface index should be looked up from local_addr
295 mreqs.gsr_interface = 0;
296 memcpy(&mreqs.gsr_group, addr, addr_len);
297 memcpy(&mreqs.gsr_source, &sources[i], sizeof(*sources));
298
299 if (setsockopt(sockfd, level,
300 include ? MCAST_JOIN_SOURCE_GROUP : MCAST_BLOCK_SOURCE,
301 (const void *)&mreqs, sizeof(mreqs)) < 0) {
302 if (include)
303 ff_log_net_error(h, AV_LOG_ERROR, "setsockopt(MCAST_JOIN_SOURCE_GROUP)");
304 else
305 ff_log_net_error(h, AV_LOG_ERROR, "setsockopt(MCAST_BLOCK_SOURCE)");
306 return ff_neterrno();
307 }
308 }
309 return 0;
310 #else
311 av_log(h, AV_LOG_ERROR,
312 "Setting multicast sources only supported for IPv4\n");
313 return AVERROR(EINVAL);
314 #endif
315 }
316 #if HAVE_STRUCT_IP_MREQ_SOURCE && defined(IP_BLOCK_SOURCE)
317 for (i = 0; i < nb_sources; i++) {
318 struct ip_mreq_source mreqs;
319 if (sources[i].ss_family != AF_INET) {
320 av_log(h, AV_LOG_ERROR, "Source/block address %d is of incorrect protocol family\n", i + 1);
321 return AVERROR(EINVAL);
322 }
323
324 mreqs.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
325 if (local_addr)
326 mreqs.imr_interface = ((struct sockaddr_in *)local_addr)->sin_addr;
327 else
328 mreqs.imr_interface.s_addr = INADDR_ANY;
329 mreqs.imr_sourceaddr.s_addr = ((struct sockaddr_in *)&sources[i])->sin_addr.s_addr;
330
331 if (setsockopt(sockfd, IPPROTO_IP,
332 include ? IP_ADD_SOURCE_MEMBERSHIP : IP_BLOCK_SOURCE,
333 (const void *)&mreqs, sizeof(mreqs)) < 0) {
334 if (include)
335 ff_log_net_error(h, AV_LOG_ERROR, "setsockopt(IP_ADD_SOURCE_MEMBERSHIP)");
336 else
337 ff_log_net_error(h, AV_LOG_ERROR, "setsockopt(IP_BLOCK_SOURCE)");
338 return ff_neterrno();
339 }
340 }
341 #else
342 return AVERROR(ENOSYS);
343 #endif
344 return 0;
345 }
346 static int udp_set_url(URLContext *h,
347 struct sockaddr_storage *addr,
348 const char *hostname, int port)
349 {
350 struct addrinfo *res0;
351 int addr_len;
352
353 res0 = ff_ip_resolve_host(h, hostname, port, SOCK_DGRAM, AF_UNSPEC, 0);
354 if (!res0) return AVERROR(EIO);
355 memcpy(addr, res0->ai_addr, res0->ai_addrlen);
356 addr_len = res0->ai_addrlen;
357 freeaddrinfo(res0);
358
359 return addr_len;
360 }
361
362 static int udp_socket_create(URLContext *h, struct sockaddr_storage *addr,
363 socklen_t *addr_len, const char *localaddr)
364 {
365 UDPContext *s = h->priv_data;
366 int udp_fd = -1;
367 struct addrinfo *res0, *res;
368 int family = AF_UNSPEC;
369
370 if (((struct sockaddr *) &s->dest_addr)->sa_family)
371 family = ((struct sockaddr *) &s->dest_addr)->sa_family;
372 res0 = ff_ip_resolve_host(h, (localaddr && localaddr[0]) ? localaddr : NULL,
373 s->local_port,
374 SOCK_DGRAM, family, AI_PASSIVE);
375 if (!res0)
376 goto fail;
377 for (res = res0; res; res=res->ai_next) {
378 if (s->udplite_coverage)
379 udp_fd = ff_socket(res->ai_family, SOCK_DGRAM, IPPROTO_UDPLITE, h);
380 else
381 udp_fd = ff_socket(res->ai_family, SOCK_DGRAM, 0, h);
382 if (udp_fd != -1) break;
383 ff_log_net_error(h, AV_LOG_ERROR, "socket");
384 }
385
386 if (udp_fd < 0)
387 goto fail;
388
389 memcpy(addr, res->ai_addr, res->ai_addrlen);
390 *addr_len = res->ai_addrlen;
391
392 freeaddrinfo(res0);
393
394 return udp_fd;
395
396 fail:
397 if (udp_fd >= 0)
398 closesocket(udp_fd);
399 if(res0)
400 freeaddrinfo(res0);
401 return -1;
402 }
403
404 static int udp_port(struct sockaddr_storage *addr, int addr_len)
405 {
406 char sbuf[sizeof(int)*3+1];
407 int error;
408
409 if ((error = getnameinfo((struct sockaddr *)addr, addr_len, NULL, 0, sbuf, sizeof(sbuf), NI_NUMERICSERV)) != 0) {
410 av_log(NULL, AV_LOG_ERROR, "getnameinfo: %s\n", gai_strerror(error));
411 return -1;
412 }
413
414 return strtol(sbuf, NULL, 10);
415 }
416
417
418 /**
419 * If no filename is given to av_open_input_file because you want to
420 * get the local port first, then you must call this function to set
421 * the remote server address.
422 *
423 * url syntax: udp://host:port[?option=val...]
424 * option: 'ttl=n' : set the ttl value (for multicast only)
425 * 'localport=n' : set the local port
426 * 'pkt_size=n' : set max packet size
427 * 'reuse=1' : enable reusing the socket
428 * 'overrun_nonfatal=1': survive in case of circular buffer overrun
429 *
430 * @param h media file context
431 * @param uri of the remote server
432 * @return zero if no error.
433 */
434 int ff_udp_set_remote_url(URLContext *h, const char *uri)
435 {
436 UDPContext *s = h->priv_data;
437 char hostname[256], buf[10];
438 int port;
439 const char *p;
440
441 av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri);
442
443 /* set the destination address */
444 s->dest_addr_len = udp_set_url(h, &s->dest_addr, hostname, port);
445 if (s->dest_addr_len < 0) {
446 return AVERROR(EIO);
447 }
448 s->is_multicast = ff_is_multicast_address((struct sockaddr*) &s->dest_addr);
449 p = strchr(uri, '?');
450 if (p) {
451 if (av_find_info_tag(buf, sizeof(buf), "connect", p)) {
452 int was_connected = s->is_connected;
453 s->is_connected = strtol(buf, NULL, 10);
454 if (s->is_connected && !was_connected) {
455 if (connect(s->udp_fd, (struct sockaddr *) &s->dest_addr,
456 s->dest_addr_len)) {
457 s->is_connected = 0;
458 ff_log_net_error(h, AV_LOG_ERROR, "connect");
459 return AVERROR(EIO);
460 }
461 }
462 }
463 }
464
465 return 0;
466 }
467
468 /**
469 * This function is identical to ff_udp_set_remote_url, except that it takes a sockaddr directly.
470 */
471 int ff_udp_set_remote_addr(URLContext *h, const struct sockaddr *dest_addr, socklen_t dest_addr_len, int do_connect)
472 {
473 UDPContext *s = h->priv_data;
474
475 /* set the destination address */
476 if ((size_t)dest_addr_len > sizeof(s->dest_addr))
477 return AVERROR(EIO);
478 s->dest_addr_len = dest_addr_len;
479 memcpy(&s->dest_addr, dest_addr, dest_addr_len);
480
481 s->is_multicast = ff_is_multicast_address((struct sockaddr*) &s->dest_addr);
482 if (do_connect >= 0) {
483 int was_connected = s->is_connected;
484 s->is_connected = do_connect;
485 if (s->is_connected && !was_connected) {
486 if (connect(s->udp_fd, (struct sockaddr *) &s->dest_addr,
487 s->dest_addr_len)) {
488 s->is_connected = 0;
489 ff_log_net_error(h, AV_LOG_ERROR, "connect");
490 return AVERROR(EIO);
491 }
492 }
493 }
494
495 return 0;
496 }
497
498 /**
499 * Return the local port used by the UDP connection
500 * @param h media file context
501 * @return the local port number
502 */
503 int ff_udp_get_local_port(URLContext *h)
504 {
505 UDPContext *s = h->priv_data;
506 return s->local_port;
507 }
508
509 void ff_udp_get_last_recv_addr(URLContext *h, struct sockaddr_storage *addr, socklen_t *addr_len)
510 {
511 UDPContext *s = h->priv_data;
512 *addr = s->last_recv_addr;
513 *addr_len = s->last_recv_addr_len;
514 }
515
516 /**
517 * Return the udp file handle for select() usage to wait for several RTP
518 * streams at the same time.
519 * @param h media file context
520 */
521 static int udp_get_file_handle(URLContext *h)
522 {
523 UDPContext *s = h->priv_data;
524 return s->udp_fd;
525 }
526
527 #if HAVE_PTHREAD_CANCEL
528 static void *circular_buffer_task_rx( void *_URLContext)
529 {
530 URLContext *h = _URLContext;
531 UDPContext *s = h->priv_data;
532 int old_cancelstate;
533
534 ff_thread_setname("udp-rx");
535
536 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
537 pthread_mutex_lock(&s->mutex);
538 if (ff_socket_nonblock(s->udp_fd, 0) < 0) {
539 av_log(h, AV_LOG_ERROR, "Failed to set blocking mode");
540 s->circular_buffer_error = AVERROR(EIO);
541 goto end;
542 }
543 while(1) {
544 UDPQueuedPacketHeader pkt_header;
545
546 pthread_mutex_unlock(&s->mutex);
547 /* Blocking operations are always cancellation points;
548 see "General Information" / "Thread Cancelation Overview"
549 in Single Unix. */
550 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
551 pkt_header.pkt_size = recvfrom(s->udp_fd, s->tmp + sizeof(pkt_header), sizeof(s->tmp) - sizeof(pkt_header), 0, (struct sockaddr *)&pkt_header.addr, &pkt_header.addr_len);
552 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
553 pthread_mutex_lock(&s->mutex);
554 if (pkt_header.pkt_size < 0) {
555 if (ff_neterrno() != AVERROR(EAGAIN) && ff_neterrno() != AVERROR(EINTR)) {
556 s->circular_buffer_error = ff_neterrno();
557 goto end;
558 }
559 continue;
560 }
561 if (ff_ip_check_source_lists(&pkt_header.addr, &s->filters))
562 continue;
563 memcpy(s->tmp, &pkt_header, sizeof(pkt_header));
564
565 if (av_fifo_can_write(s->rx_fifo) < pkt_header.pkt_size + sizeof(pkt_header)) {
566 /* No Space left */
567 if (s->overrun_nonfatal) {
568 av_log(h, AV_LOG_WARNING, "Circular buffer overrun. "
569 "Surviving due to overrun_nonfatal option\n");
570 continue;
571 } else {
572 av_log(h, AV_LOG_ERROR, "Circular buffer overrun. "
573 "To avoid, increase fifo_size URL option. "
574 "To survive in such case, use overrun_nonfatal option\n");
575 s->circular_buffer_error = AVERROR(EIO);
576 goto end;
577 }
578 }
579 av_fifo_write(s->rx_fifo, s->tmp, pkt_header.pkt_size + sizeof(pkt_header));
580 pthread_cond_signal(&s->cond);
581 }
582
583 end:
584 pthread_cond_signal(&s->cond);
585 pthread_mutex_unlock(&s->mutex);
586 return NULL;
587 }
588
589 static void *circular_buffer_task_tx( void *_URLContext)
590 {
591 URLContext *h = _URLContext;
592 UDPContext *s = h->priv_data;
593 int64_t target_timestamp = av_gettime_relative();
594 int64_t start_timestamp = av_gettime_relative();
595 int64_t sent_bits = 0;
596 int64_t burst_interval = s->bitrate ? (s->burst_bits * 1000000 / s->bitrate) : 0;
597 int64_t max_delay = s->bitrate ? ((int64_t)h->max_packet_size * 8 * 1000000 / s->bitrate + 1) : 0;
598
599 ff_thread_setname("udp-tx");
600
601 pthread_mutex_lock(&s->mutex);
602
603 if (ff_socket_nonblock(s->udp_fd, 0) < 0) {
604 av_log(h, AV_LOG_ERROR, "Failed to set blocking mode");
605 s->circular_buffer_error = AVERROR(EIO);
606 goto end;
607 }
608
609 for(;;) {
610 int len;
611 const uint8_t *p;
612 uint8_t tmp[4];
613 int64_t timestamp;
614
615 len = av_fifo_can_read(s->tx_fifo);
616
617 while (len<4) {
618 if (s->close_req)
619 goto end;
620 pthread_cond_wait(&s->cond, &s->mutex);
621 len = av_fifo_can_read(s->tx_fifo);
622 }
623
624 av_fifo_read(s->tx_fifo, tmp, 4);
625 len = AV_RL32(tmp);
626
627 av_assert0(len >= 0);
628 av_assert0(len <= sizeof(s->tmp));
629
630 av_fifo_read(s->tx_fifo, s->tmp, len);
631
632 pthread_mutex_unlock(&s->mutex);
633
634 if (s->bitrate) {
635 timestamp = av_gettime_relative();
636 if (timestamp < target_timestamp) {
637 int64_t delay = target_timestamp - timestamp;
638 if (delay > max_delay) {
639 delay = max_delay;
640 start_timestamp = timestamp + delay;
641 sent_bits = 0;
642 }
643 av_usleep(delay);
644 } else {
645 if (timestamp - burst_interval > target_timestamp) {
646 start_timestamp = timestamp - burst_interval;
647 sent_bits = 0;
648 }
649 }
650 sent_bits += len * 8;
651 target_timestamp = start_timestamp + sent_bits * 1000000 / s->bitrate;
652 }
653
654 p = s->tmp;
655 while (len) {
656 int ret;
657 av_assert0(len > 0);
658 if (!s->is_connected) {
659 ret = sendto (s->udp_fd, p, len, 0,
660 (struct sockaddr *) &s->dest_addr,
661 s->dest_addr_len);
662 } else
663 ret = send(s->udp_fd, p, len, 0);
664 if (ret >= 0) {
665 len -= ret;
666 p += ret;
667 } else {
668 ret = ff_neterrno();
669 if (ret != AVERROR(EAGAIN) && ret != AVERROR(EINTR)) {
670 pthread_mutex_lock(&s->mutex);
671 s->circular_buffer_error = ret;
672 pthread_mutex_unlock(&s->mutex);
673 return NULL;
674 }
675 }
676 }
677
678 pthread_mutex_lock(&s->mutex);
679 }
680
681 end:
682 pthread_mutex_unlock(&s->mutex);
683 return NULL;
684 }
685
686
687 #endif
688
689 /* put it in UDP context */
690 /* return non zero if error */
691 static int udp_open(URLContext *h, const char *uri, int flags)
692 {
693 char hostname[1024];
694 int port, udp_fd = -1, tmp, bind_ret = -1, dscp = -1;
695 UDPContext *s = h->priv_data;
696 int is_output;
697 const char *p;
698 char buf[256];
699 struct sockaddr_storage my_addr;
700 socklen_t len;
701 int ret;
702
703 h->is_streamed = 1;
704
705 is_output = !(flags & AVIO_FLAG_READ);
706 if (s->buffer_size < 0)
707 s->buffer_size = is_output ? UDP_TX_BUF_SIZE : UDP_RX_BUF_SIZE;
708
709 if (s->sources) {
710 if ((ret = ff_ip_parse_sources(h, s->sources, &s->filters)) < 0)
711 goto fail;
712 }
713
714 if (s->block) {
715 if ((ret = ff_ip_parse_blocks(h, s->block, &s->filters)) < 0)
716 goto fail;
717 }
718
719 p = strchr(uri, '?');
720 if (p) {
721 if (av_find_info_tag(buf, sizeof(buf), "reuse", p)) {
722 char *endptr = NULL;
723 s->reuse_socket = strtol(buf, &endptr, 10);
724 /* assume if no digits were found it is a request to enable it */
725 if (buf == endptr)
726 s->reuse_socket = 1;
727 }
728 if (av_find_info_tag(buf, sizeof(buf), "overrun_nonfatal", p)) {
729 char *endptr = NULL;
730 s->overrun_nonfatal = strtol(buf, &endptr, 10);
731 /* assume if no digits were found it is a request to enable it */
732 if (buf == endptr)
733 s->overrun_nonfatal = 1;
734 if (!HAVE_PTHREAD_CANCEL)
735 av_log(h, AV_LOG_WARNING,
736 "'overrun_nonfatal' option was set but it is not supported "
737 "on this build (pthread support is required)\n");
738 }
739 if (av_find_info_tag(buf, sizeof(buf), "ttl", p)) {
740 s->ttl = strtol(buf, NULL, 10);
741 if (s->ttl < 0 || s->ttl > 255) {
742 av_log(h, AV_LOG_ERROR, "ttl(%d) should be in range [0,255]\n", s->ttl);
743 ret = AVERROR(EINVAL);
744 goto fail;
745 }
746 }
747 if (av_find_info_tag(buf, sizeof(buf), "udplite_coverage", p)) {
748 s->udplite_coverage = strtol(buf, NULL, 10);
749 }
750 if (av_find_info_tag(buf, sizeof(buf), "localport", p)) {
751 s->local_port = strtol(buf, NULL, 10);
752 }
753 if (av_find_info_tag(buf, sizeof(buf), "pkt_size", p)) {
754 s->pkt_size = strtol(buf, NULL, 10);
755 }
756 if (av_find_info_tag(buf, sizeof(buf), "buffer_size", p)) {
757 s->buffer_size = strtol(buf, NULL, 10);
758 }
759 if (av_find_info_tag(buf, sizeof(buf), "connect", p)) {
760 s->is_connected = strtol(buf, NULL, 10);
761 }
762 if (av_find_info_tag(buf, sizeof(buf), "dscp", p)) {
763 dscp = strtol(buf, NULL, 10);
764 }
765 if (av_find_info_tag(buf, sizeof(buf), "fifo_size", p)) {
766 s->circular_buffer_size = strtol(buf, NULL, 10);
767 if (!HAVE_PTHREAD_CANCEL)
768 av_log(h, AV_LOG_WARNING,
769 "'circular_buffer_size' option was set but it is not supported "
770 "on this build (pthread support is required)\n");
771 }
772 if (av_find_info_tag(buf, sizeof(buf), "bitrate", p)) {
773 s->bitrate = strtoll(buf, NULL, 10);
774 if (!HAVE_PTHREAD_CANCEL)
775 av_log(h, AV_LOG_WARNING,
776 "'bitrate' option was set but it is not supported "
777 "on this build (pthread support is required)\n");
778 }
779 if (av_find_info_tag(buf, sizeof(buf), "burst_bits", p)) {
780 s->burst_bits = strtoll(buf, NULL, 10);
781 }
782 if (av_find_info_tag(buf, sizeof(buf), "localaddr", p)) {
783 av_freep(&s->localaddr);
784 s->localaddr = av_strdup(buf);
785 if (!s->localaddr) {
786 ret = AVERROR(ENOMEM);
787 goto fail;
788 }
789 }
790 if (av_find_info_tag(buf, sizeof(buf), "sources", p)) {
791 if ((ret = ff_ip_parse_sources(h, buf, &s->filters)) < 0)
792 goto fail;
793 }
794 if (av_find_info_tag(buf, sizeof(buf), "block", p)) {
795 if ((ret = ff_ip_parse_blocks(h, buf, &s->filters)) < 0)
796 goto fail;
797 }
798 if (!is_output && av_find_info_tag(buf, sizeof(buf), "timeout", p))
799 s->timeout = strtol(buf, NULL, 10);
800 if (is_output && av_find_info_tag(buf, sizeof(buf), "broadcast", p))
801 s->is_broadcast = strtol(buf, NULL, 10);
802 }
803 /* handling needed to support options picking from both AVOption and URL */
804 s->circular_buffer_size *= 188;
805 if (flags & AVIO_FLAG_WRITE) {
806 h->max_packet_size = s->pkt_size;
807 } else {
808 h->max_packet_size = UDP_MAX_PKT_SIZE;
809 }
810 h->rw_timeout = s->timeout;
811
812 /* fill the dest addr */
813 av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri);
814
815 /* XXX: fix av_url_split */
816 if (hostname[0] == '\0' || hostname[0] == '?') {
817 /* only accepts null hostname if input */
818 if (!(flags & AVIO_FLAG_READ)) {
819 ret = AVERROR(EINVAL);
820 goto fail;
821 }
822 } else {
823 if ((ret = ff_udp_set_remote_url(h, uri)) < 0)
824 goto fail;
825 }
826
827 if ((s->is_multicast || s->local_port < 0) && (h->flags & AVIO_FLAG_READ))
828 s->local_port = port;
829
830 udp_fd = udp_socket_create(h, &my_addr, &len, s->localaddr);
831 if (udp_fd < 0) {
832 ret = AVERROR(EIO);
833 goto fail;
834 }
835
836 s->local_addr_storage=my_addr; //store for future multicast join
837
838 /* Follow the requested reuse option, unless it's multicast in which
839 * case enable reuse unless explicitly disabled.
840 */
841 if (s->reuse_socket > 0 || (s->is_multicast && s->reuse_socket < 0)) {
842 s->reuse_socket = 1;
843 if (setsockopt (udp_fd, SOL_SOCKET, SO_REUSEADDR, &(s->reuse_socket), sizeof(s->reuse_socket)) != 0) {
844 ret = ff_neterrno();
845 goto fail;
846 }
847 }
848
849 if (s->is_broadcast) {
850 #ifdef SO_BROADCAST
851 if (setsockopt (udp_fd, SOL_SOCKET, SO_BROADCAST, &(s->is_broadcast), sizeof(s->is_broadcast)) != 0) {
852 ret = ff_neterrno();
853 goto fail;
854 }
855 #else
856 ret = AVERROR(ENOSYS);
857 goto fail;
858 #endif
859 }
860
861 /* Set the checksum coverage for UDP-Lite (RFC 3828) for sending and receiving.
862 * The receiver coverage has to be less than or equal to the sender coverage.
863 * Otherwise, the receiver will drop all packets.
864 */
865 if (s->udplite_coverage) {
866 if (setsockopt (udp_fd, IPPROTO_UDPLITE, UDPLITE_SEND_CSCOV, &(s->udplite_coverage), sizeof(s->udplite_coverage)) != 0)
867 av_log(h, AV_LOG_WARNING, "socket option UDPLITE_SEND_CSCOV not available");
868
869 if (setsockopt (udp_fd, IPPROTO_UDPLITE, UDPLITE_RECV_CSCOV, &(s->udplite_coverage), sizeof(s->udplite_coverage)) != 0)
870 av_log(h, AV_LOG_WARNING, "socket option UDPLITE_RECV_CSCOV not available");
871 }
872
873 if (dscp >= 0) {
874 dscp <<= 2;
875 if (setsockopt (udp_fd, IPPROTO_IP, IP_TOS, &dscp, sizeof(dscp)) != 0) {
876 ret = ff_neterrno();
877 goto fail;
878 }
879 }
880
881 /* If multicast, try binding the multicast address first, to avoid
882 * receiving UDP packets from other sources aimed at the same UDP
883 * port. This fails on windows. This makes sending to the same address
884 * using sendto() fail, so only do it if we're opened in read-only mode. */
885 if (s->is_multicast && (h->flags & AVIO_FLAG_READ)) {
886 bind_ret = bind(udp_fd,(struct sockaddr *)&s->dest_addr, len);
887 }
888 /* bind to the local address if not multicast or if the multicast
889 * bind failed */
890 /* the bind is needed to give a port to the socket now */
891 if (bind_ret < 0 && bind(udp_fd,(struct sockaddr *)&my_addr, len) < 0) {
892 ff_log_net_error(h, AV_LOG_ERROR, "bind failed");
893 ret = ff_neterrno();
894 goto fail;
895 }
896
897 len = sizeof(my_addr);
898 getsockname(udp_fd, (struct sockaddr *)&my_addr, &len);
899 s->local_port = udp_port(&my_addr, len);
900
901 if (s->is_multicast) {
902 if (h->flags & AVIO_FLAG_WRITE) {
903 /* output */
904 if ((ret = udp_set_multicast_ttl(udp_fd, s->ttl, (struct sockaddr *)&s->dest_addr, h)) < 0)
905 goto fail;
906 }
907 if (h->flags & AVIO_FLAG_READ) {
908 /* input */
909 if (s->filters.nb_include_addrs) {
910 if ((ret = udp_set_multicast_sources(h, udp_fd,
911 (struct sockaddr *)&s->dest_addr,
912 s->dest_addr_len, &s->local_addr_storage,
913 s->filters.include_addrs,
914 s->filters.nb_include_addrs, 1)) < 0)
915 goto fail;
916 } else {
917 if ((ret = udp_join_multicast_group(udp_fd, (struct sockaddr *)&s->dest_addr,
918 (struct sockaddr *)&s->local_addr_storage, h)) < 0)
919 goto fail;
920 }
921 if (s->filters.nb_exclude_addrs) {
922 if ((ret = udp_set_multicast_sources(h, udp_fd,
923 (struct sockaddr *)&s->dest_addr,
924 s->dest_addr_len, &s->local_addr_storage,
925 s->filters.exclude_addrs,
926 s->filters.nb_exclude_addrs, 0)) < 0)
927 goto fail;
928 }
929 }
930 }
931
932 if (is_output) {
933 /* limit the tx buf size to limit latency */
934 tmp = s->buffer_size;
935 if (setsockopt(udp_fd, SOL_SOCKET, SO_SNDBUF, &tmp, sizeof(tmp)) < 0) {
936 ff_log_net_error(h, AV_LOG_ERROR, "setsockopt(SO_SNDBUF)");
937 ret = ff_neterrno();
938 goto fail;
939 }
940 } else {
941 /* set udp recv buffer size to the requested value (default UDP_RX_BUF_SIZE) */
942 tmp = s->buffer_size;
943 if (setsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &tmp, sizeof(tmp)) < 0) {
944 ff_log_net_error(h, AV_LOG_WARNING, "setsockopt(SO_RECVBUF)");
945 }
946 len = sizeof(tmp);
947 if (getsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &tmp, &len) < 0) {
948 ff_log_net_error(h, AV_LOG_WARNING, "getsockopt(SO_RCVBUF)");
949 } else {
950 av_log(h, AV_LOG_DEBUG, "end receive buffer size reported is %d\n", tmp);
951 if(tmp < s->buffer_size)
952 av_log(h, AV_LOG_WARNING, "attempted to set receive buffer to size %d but it only ended up set as %d\n", s->buffer_size, tmp);
953 }
954
955 /* make the socket non-blocking */
956 ff_socket_nonblock(udp_fd, 1);
957 }
958 if (s->is_connected) {
959 if (connect(udp_fd, (struct sockaddr *) &s->dest_addr, s->dest_addr_len)) {
960 ff_log_net_error(h, AV_LOG_ERROR, "connect");
961 ret = ff_neterrno();
962 goto fail;
963 }
964 }
965
966 s->udp_fd = udp_fd;
967
968 #if HAVE_PTHREAD_CANCEL
969 /*
970 Create thread in case of:
971 1. Input and circular_buffer_size is set
972 2. Output and bitrate and circular_buffer_size is set
973 */
974
975 if (is_output && s->bitrate && !s->circular_buffer_size) {
976 /* Warn user in case of 'circular_buffer_size' is not set */
977 av_log(h, AV_LOG_WARNING,"'bitrate' option was set but 'circular_buffer_size' is not, but required\n");
978 }
979
980 if ((!is_output && s->circular_buffer_size) || (is_output && s->bitrate && s->circular_buffer_size)) {
981 /* start the task going */
982 AVFifo *fifo = av_fifo_alloc2(s->circular_buffer_size, 1, 0);
983 if (!fifo) {
984 ret = AVERROR(ENOMEM);
985 goto fail;
986 }
987 if (is_output)
988 s->tx_fifo = fifo;
989 else
990 s->rx_fifo = fifo;
991 ret = pthread_mutex_init(&s->mutex, NULL);
992 if (ret != 0) {
993 av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", strerror(ret));
994 ret = AVERROR(ret);
995 goto fail;
996 }
997 ret = pthread_cond_init(&s->cond, NULL);
998 if (ret != 0) {
999 av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
1000 ret = AVERROR(ret);
1001 goto cond_fail;
1002 }
1003 ret = pthread_create(&s->circular_buffer_thread, NULL, is_output?circular_buffer_task_tx:circular_buffer_task_rx, h);
1004 if (ret != 0) {
1005 av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
1006 ret = AVERROR(ret);
1007 goto thread_fail;
1008 }
1009 s->thread_started = 1;
1010 }
1011 #endif
1012
1013 return 0;
1014 #if HAVE_PTHREAD_CANCEL
1015 thread_fail:
1016 pthread_cond_destroy(&s->cond);
1017 cond_fail:
1018 pthread_mutex_destroy(&s->mutex);
1019 #endif
1020 fail:
1021 if (udp_fd >= 0)
1022 closesocket(udp_fd);
1023 av_fifo_freep2(&s->rx_fifo);
1024 av_fifo_freep2(&s->tx_fifo);
1025 ff_ip_reset_filters(&s->filters);
1026 return ret;
1027 }
1028
1029 static int udplite_open(URLContext *h, const char *uri, int flags)
1030 {
1031 UDPContext *s = h->priv_data;
1032
1033 // set default checksum coverage
1034 s->udplite_coverage = UDP_HEADER_SIZE;
1035
1036 return udp_open(h, uri, flags);
1037 }
1038
1039 static int udp_read(URLContext *h, uint8_t *buf, int size)
1040 {
1041 UDPContext *s = h->priv_data;
1042 int ret;
1043 #if HAVE_PTHREAD_CANCEL
1044 int avail, nonblock = h->flags & AVIO_FLAG_NONBLOCK;
1045
1046 if (s->rx_fifo) {
1047 pthread_mutex_lock(&s->mutex);
1048 do {
1049 avail = av_fifo_can_read(s->rx_fifo);
1050 if (avail) { // >=size) {
1051 UDPQueuedPacketHeader header;
1052
1053 av_fifo_read(s->rx_fifo, &header, sizeof(header));
1054
1055 s->last_recv_addr = header.addr;
1056 s->last_recv_addr_len = header.addr_len;
1057
1058 avail = header.pkt_size;
1059 if(avail > size){
1060 av_log(h, AV_LOG_WARNING, "Part of datagram lost due to insufficient buffer size\n");
1061 avail = size;
1062 }
1063
1064 av_fifo_read(s->rx_fifo, buf, avail);
1065 av_fifo_drain2(s->rx_fifo, header.pkt_size - avail);
1066 pthread_mutex_unlock(&s->mutex);
1067 return avail;
1068 } else if(s->circular_buffer_error){
1069 int err = s->circular_buffer_error;
1070 pthread_mutex_unlock(&s->mutex);
1071 return err;
1072 } else if(nonblock) {
1073 pthread_mutex_unlock(&s->mutex);
1074 return AVERROR(EAGAIN);
1075 } else {
1076 /* FIXME: using the monotonic clock would be better,
1077 but it does not exist on all supported platforms. */
1078 int64_t t = av_gettime() + 100000;
1079 struct timespec tv = { .tv_sec = t / 1000000,
1080 .tv_nsec = (t % 1000000) * 1000 };
1081 int err = pthread_cond_timedwait(&s->cond, &s->mutex, &tv);
1082 if (err) {
1083 pthread_mutex_unlock(&s->mutex);
1084 return AVERROR(err == ETIMEDOUT ? EAGAIN : err);
1085 }
1086 nonblock = 1;
1087 }
1088 } while(1);
1089 }
1090 #endif
1091
1092 if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
1093 ret = ff_network_wait_fd(s->udp_fd, 0);
1094 if (ret < 0)
1095 return ret;
1096 }
1097 s->last_recv_addr_len = sizeof(s->last_recv_addr);
1098 ret = recvfrom(s->udp_fd, buf, size, 0, (struct sockaddr *)&s->last_recv_addr, &s->last_recv_addr_len);
1099 if (ret < 0)
1100 return ff_neterrno();
1101 if (ff_ip_check_source_lists(&s->last_recv_addr, &s->filters))
1102 return AVERROR(EINTR);
1103 return ret;
1104 }
1105
1106 static int udp_write(URLContext *h, const uint8_t *buf, int size)
1107 {
1108 UDPContext *s = h->priv_data;
1109 int ret;
1110
1111 #if HAVE_PTHREAD_CANCEL
1112 if (s->tx_fifo) {
1113 uint8_t tmp[4];
1114
1115 pthread_mutex_lock(&s->mutex);
1116
1117 /*
1118 Return error if last tx failed.
1119 Here we can't know on which packet error was, but it needs to know that error exists.
1120 */
1121 if (s->circular_buffer_error<0) {
1122 int err = s->circular_buffer_error;
1123 pthread_mutex_unlock(&s->mutex);
1124 return err;
1125 }
1126
1127 if (av_fifo_can_write(s->tx_fifo) < size + 4) {
1128 /* What about a partial packet tx ? */
1129 pthread_mutex_unlock(&s->mutex);
1130 return AVERROR(ENOMEM);
1131 }
1132 AV_WL32(tmp, size);
1133 av_fifo_write(s->tx_fifo, tmp, 4); /* size of packet */
1134 av_fifo_write(s->tx_fifo, buf, size); /* the data */
1135 pthread_cond_signal(&s->cond);
1136 pthread_mutex_unlock(&s->mutex);
1137 return size;
1138 }
1139 #endif
1140 if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
1141 ret = ff_network_wait_fd(s->udp_fd, 1);
1142 if (ret < 0)
1143 return ret;
1144 }
1145
1146 if (!s->is_connected) {
1147 ret = sendto (s->udp_fd, buf, size, 0,
1148 (struct sockaddr *) &s->dest_addr,
1149 s->dest_addr_len);
1150 } else
1151 ret = send(s->udp_fd, buf, size, 0);
1152
1153 return ret < 0 ? ff_neterrno() : ret;
1154 }
1155
1156 static int udp_close(URLContext *h)
1157 {
1158 UDPContext *s = h->priv_data;
1159
1160 #if HAVE_PTHREAD_CANCEL
1161 // Request close once writing is finished
1162 if (s->thread_started && !(h->flags & AVIO_FLAG_READ)) {
1163 pthread_mutex_lock(&s->mutex);
1164 s->close_req = 1;
1165 pthread_cond_signal(&s->cond);
1166 pthread_mutex_unlock(&s->mutex);
1167 }
1168 #endif
1169
1170 if (s->is_multicast && (h->flags & AVIO_FLAG_READ))
1171 udp_leave_multicast_group(s->udp_fd, (struct sockaddr *)&s->dest_addr,
1172 (struct sockaddr *)&s->local_addr_storage, h);
1173 #if HAVE_PTHREAD_CANCEL
1174 if (s->thread_started) {
1175 int ret;
1176 // Cancel only read, as write has been signaled as success to the user
1177 if (h->flags & AVIO_FLAG_READ) {
1178 #ifdef _WIN32
1179 /* recvfrom() is not a cancellation point for win32, so we shutdown
1180 * the socket and abort pending IO, subsequent recvfrom() calls
1181 * will fail with WSAESHUTDOWN causing the thread to exit. */
1182 shutdown(s->udp_fd, SD_RECEIVE);
1183 CancelIoEx((HANDLE)(SOCKET)s->udp_fd, NULL);
1184 #else
1185 pthread_cancel(s->circular_buffer_thread);
1186 #endif
1187 }
1188 ret = pthread_join(s->circular_buffer_thread, NULL);
1189 if (ret != 0)
1190 av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(ret));
1191 pthread_mutex_destroy(&s->mutex);
1192 pthread_cond_destroy(&s->cond);
1193 }
1194 #endif
1195 closesocket(s->udp_fd);
1196 av_fifo_freep2(&s->rx_fifo);
1197 av_fifo_freep2(&s->tx_fifo);
1198 ff_ip_reset_filters(&s->filters);
1199 return 0;
1200 }
1201
1202 const URLProtocol ff_udp_protocol = {
1203 .name = "udp",
1204 .url_open = udp_open,
1205 .url_read = udp_read,
1206 .url_write = udp_write,
1207 .url_close = udp_close,
1208 .url_get_file_handle = udp_get_file_handle,
1209 .priv_data_size = sizeof(UDPContext),
1210 .priv_data_class = &udp_class,
1211 .flags = URL_PROTOCOL_FLAG_NETWORK,
1212 };
1213
1214 const URLProtocol ff_udplite_protocol = {
1215 .name = "udplite",
1216 .url_open = udplite_open,
1217 .url_read = udp_read,
1218 .url_write = udp_write,
1219 .url_close = udp_close,
1220 .url_get_file_handle = udp_get_file_handle,
1221 .priv_data_size = sizeof(UDPContext),
1222 .priv_data_class = &udplite_context_class,
1223 .flags = URL_PROTOCOL_FLAG_NETWORK,
1224 };
1225