Line | Branch | Exec | Source |
---|---|---|---|
1 | /* | ||
2 | * FIFO pseudo-muxer | ||
3 | * Copyright (c) 2016 Jan Sebechlebsky | ||
4 | * | ||
5 | * This file is part of FFmpeg. | ||
6 | * | ||
7 | * FFmpeg is free software; you can redistribute it and/or | ||
8 | * modify it under the terms of the GNU Lesser General Public License | ||
9 | * as published by the Free Software Foundation; either | ||
10 | * version 2.1 of the License, or (at your option) any later version. | ||
11 | * | ||
12 | * FFmpeg is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
15 | * GNU Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public License | ||
18 | * along with FFmpeg; if not, write to the Free Software Foundation, Inc., | ||
19 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA | ||
20 | */ | ||
21 | |||
22 | #include <stdatomic.h> | ||
23 | |||
24 | #include "libavutil/avassert.h" | ||
25 | #include "libavutil/opt.h" | ||
26 | #include "libavutil/time.h" | ||
27 | #include "libavutil/thread.h" | ||
28 | #include "libavutil/threadmessage.h" | ||
29 | #include "avformat.h" | ||
30 | #include "internal.h" | ||
31 | #include "mux.h" | ||
32 | |||
/* Default value of the "queue_size" option (capacity of the message queue). */
#define FIFO_DEFAULT_QUEUE_SIZE 60
/* Default value of "max_recovery_attempts"; 0 disables the attempt limit. */
#define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS 0
/* Default value of "recovery_wait_time": 5 seconds, in microseconds. */
#define FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC 5000000
36 | |||
37 | typedef struct FifoContext { | ||
38 | const AVClass *class; | ||
39 | AVFormatContext *avf; | ||
40 | |||
41 | char *format; | ||
42 | AVDictionary *format_options; | ||
43 | |||
44 | int queue_size; | ||
45 | AVThreadMessageQueue *queue; | ||
46 | |||
47 | pthread_t writer_thread; | ||
48 | |||
49 | /* Return value of last write_trailer_call */ | ||
50 | int write_trailer_ret; | ||
51 | |||
52 | /* Time to wait before next recovery attempt | ||
53 | * This can refer to the time in processed stream, | ||
54 | * or real time. */ | ||
55 | int64_t recovery_wait_time; | ||
56 | |||
57 | /* Maximal number of unsuccessful successive recovery attempts */ | ||
58 | int max_recovery_attempts; | ||
59 | |||
60 | /* Whether to attempt recovery from failure */ | ||
61 | int attempt_recovery; | ||
62 | |||
63 | /* If >0 stream time will be used when waiting | ||
64 | * for the recovery attempt instead of real time */ | ||
65 | int recovery_wait_streamtime; | ||
66 | |||
67 | /* If >0 recovery will be attempted regardless of error code | ||
68 | * (except AVERROR_EXIT, so exit request is never ignored) */ | ||
69 | int recover_any_error; | ||
70 | |||
71 | /* Whether to drop packets in case the queue is full. */ | ||
72 | int drop_pkts_on_overflow; | ||
73 | |||
74 | /* Whether to wait for keyframe when recovering | ||
75 | * from failure or queue overflow */ | ||
76 | int restart_with_keyframe; | ||
77 | |||
78 | pthread_mutex_t overflow_flag_lock; | ||
79 | int overflow_flag_lock_initialized; | ||
80 | /* Value > 0 signals queue overflow */ | ||
81 | volatile uint8_t overflow_flag; | ||
82 | |||
83 | atomic_int_least64_t queue_duration; | ||
84 | int64_t last_sent_dts; | ||
85 | int64_t timeshift; | ||
86 | } FifoContext; | ||
87 | |||
88 | typedef struct FifoThreadContext { | ||
89 | AVFormatContext *avf; | ||
90 | |||
91 | /* Timestamp of last failure. | ||
92 | * This is either pts in case stream time is used, | ||
93 | * or microseconds as returned by av_gettime_relative() */ | ||
94 | int64_t last_recovery_ts; | ||
95 | |||
96 | /* Number of current recovery process | ||
97 | * Value > 0 means we are in recovery process */ | ||
98 | int recovery_nr; | ||
99 | |||
100 | /* If > 0 all frames will be dropped until keyframe is received */ | ||
101 | uint8_t drop_until_keyframe; | ||
102 | |||
103 | /* Value > 0 means that the previous write_header call was successful | ||
104 | * so finalization by calling write_trailer and ff_io_close must be done | ||
105 | * before exiting / reinitialization of underlying muxer */ | ||
106 | uint8_t header_written; | ||
107 | |||
108 | int64_t last_received_dts; | ||
109 | } FifoThreadContext; | ||
110 | |||
/* Kind of request carried by a FifoMessage. */
typedef enum FifoMessageType {
    FIFO_NOOP,          /* nothing to do; the dispatcher returns 0 */
    FIFO_WRITE_HEADER,  /* only ensure the header has been written */
    FIFO_WRITE_PACKET,  /* write the attached packet */
    FIFO_FLUSH_OUTPUT   /* flush the wrapped muxer */
} FifoMessageType;
117 | |||
118 | typedef struct FifoMessage { | ||
119 | FifoMessageType type; | ||
120 | AVPacket pkt; | ||
121 | } FifoMessage; | ||
122 | |||
123 | 51 | static int fifo_thread_write_header(FifoThreadContext *ctx) | |
124 | { | ||
125 | 51 | AVFormatContext *avf = ctx->avf; | |
126 | 51 | FifoContext *fifo = avf->priv_data; | |
127 | 51 | AVFormatContext *avf2 = fifo->avf; | |
128 | 51 | AVDictionary *format_options = NULL; | |
129 | int ret, i; | ||
130 | |||
131 | 51 | ret = av_dict_copy(&format_options, fifo->format_options, 0); | |
132 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 51 times.
|
51 | if (ret < 0) |
133 | ✗ | goto end; | |
134 | |||
135 | 51 | ret = ff_format_output_open(avf2, avf->url, &format_options); | |
136 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 51 times.
|
51 | if (ret < 0) { |
137 | ✗ | av_log(avf, AV_LOG_ERROR, "Error opening %s: %s\n", avf->url, | |
138 | ✗ | av_err2str(ret)); | |
139 | ✗ | goto end; | |
140 | } | ||
141 | |||
142 |
2/2✓ Branch 0 taken 52 times.
✓ Branch 1 taken 51 times.
|
103 | for (i = 0;i < avf2->nb_streams; i++) |
143 | 52 | ffstream(avf2->streams[i])->cur_dts = 0; | |
144 | |||
145 | 51 | ret = avformat_write_header(avf2, &format_options); | |
146 |
1/2✓ Branch 0 taken 51 times.
✗ Branch 1 not taken.
|
51 | if (!ret) |
147 | 51 | ctx->header_written = 1; | |
148 | |||
149 | // Check for options unrecognized by underlying muxer | ||
150 |
1/2✓ Branch 0 taken 51 times.
✗ Branch 1 not taken.
|
51 | if (format_options) { |
151 | ✗ | const AVDictionaryEntry *entry = NULL; | |
152 | ✗ | while ((entry = av_dict_iterate(format_options, entry))) | |
153 | ✗ | av_log(avf2, AV_LOG_ERROR, "Unknown option '%s'\n", entry->key); | |
154 | ✗ | ret = AVERROR(EINVAL); | |
155 | } | ||
156 | |||
157 | 51 | end: | |
158 | 51 | av_dict_free(&format_options); | |
159 | 51 | return ret; | |
160 | } | ||
161 | |||
162 | 3 | static int fifo_thread_flush_output(FifoThreadContext *ctx) | |
163 | { | ||
164 | 3 | AVFormatContext *avf = ctx->avf; | |
165 | 3 | FifoContext *fifo = avf->priv_data; | |
166 | 3 | AVFormatContext *avf2 = fifo->avf; | |
167 | |||
168 | 3 | return av_write_frame(avf2, NULL); | |
169 | } | ||
170 | |||
171 | ✗ | static int64_t next_duration(AVFormatContext *avf, AVPacket *pkt, int64_t *last_dts) | |
172 | { | ||
173 | ✗ | AVStream *st = avf->streams[pkt->stream_index]; | |
174 | ✗ | int64_t dts = av_rescale_q(pkt->dts, st->time_base, AV_TIME_BASE_Q); | |
175 | ✗ | int64_t duration = (*last_dts == AV_NOPTS_VALUE ? 0 : dts - *last_dts); | |
176 | ✗ | *last_dts = dts; | |
177 | ✗ | return duration; | |
178 | } | ||
179 | |||
180 | 145 | static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt) | |
181 | { | ||
182 | 145 | AVFormatContext *avf = ctx->avf; | |
183 | 145 | FifoContext *fifo = avf->priv_data; | |
184 | 145 | AVFormatContext *avf2 = fifo->avf; | |
185 | AVRational src_tb, dst_tb; | ||
186 | int ret, s_idx; | ||
187 | int64_t orig_pts, orig_dts, orig_duration; | ||
188 | |||
189 |
1/4✗ Branch 0 not taken.
✓ Branch 1 taken 145 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
|
145 | if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE) |
190 | ✗ | atomic_fetch_sub_explicit(&fifo->queue_duration, next_duration(avf, pkt, &ctx->last_received_dts), memory_order_relaxed); | |
191 | |||
192 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 145 times.
|
145 | if (ctx->drop_until_keyframe) { |
193 | ✗ | if (pkt->flags & AV_PKT_FLAG_KEY) { | |
194 | ✗ | ctx->drop_until_keyframe = 0; | |
195 | ✗ | av_log(avf, AV_LOG_VERBOSE, "Keyframe received, recovering...\n"); | |
196 | } else { | ||
197 | ✗ | av_log(avf, AV_LOG_VERBOSE, "Dropping non-keyframe packet\n"); | |
198 | ✗ | av_packet_unref(pkt); | |
199 | ✗ | return 0; | |
200 | } | ||
201 | } | ||
202 | |||
203 | 145 | orig_pts = pkt->pts; | |
204 | 145 | orig_dts = pkt->dts; | |
205 | 145 | orig_duration = pkt->duration; | |
206 | 145 | s_idx = pkt->stream_index; | |
207 | 145 | src_tb = avf->streams[s_idx]->time_base; | |
208 | 145 | dst_tb = avf2->streams[s_idx]->time_base; | |
209 | 145 | av_packet_rescale_ts(pkt, src_tb, dst_tb); | |
210 | |||
211 | 145 | ret = av_write_frame(avf2, pkt); | |
212 |
2/2✓ Branch 0 taken 100 times.
✓ Branch 1 taken 45 times.
|
145 | if (ret >= 0) { |
213 | 100 | av_packet_unref(pkt); | |
214 | } else { | ||
215 | // avoid scaling twice | ||
216 | 45 | pkt->pts = orig_pts; | |
217 | 45 | pkt->dts = orig_dts; | |
218 | 45 | pkt->duration = orig_duration; | |
219 | } | ||
220 | 145 | return ret; | |
221 | } | ||
222 | |||
223 | 51 | static int fifo_thread_write_trailer(FifoThreadContext *ctx) | |
224 | { | ||
225 | 51 | AVFormatContext *avf = ctx->avf; | |
226 | 51 | FifoContext *fifo = avf->priv_data; | |
227 | 51 | AVFormatContext *avf2 = fifo->avf; | |
228 | int ret; | ||
229 | |||
230 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 51 times.
|
51 | if (!ctx->header_written) |
231 | ✗ | return 0; | |
232 | |||
233 | 51 | ret = av_write_trailer(avf2); | |
234 | 51 | ff_format_io_close(avf2, &avf2->pb); | |
235 | |||
236 | 51 | return ret; | |
237 | } | ||
238 | |||
239 | 154 | static int fifo_thread_dispatch_message(FifoThreadContext *ctx, FifoMessage *msg) | |
240 | { | ||
241 | 154 | int ret = AVERROR(EINVAL); | |
242 | |||
243 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 154 times.
|
154 | if (msg->type == FIFO_NOOP) |
244 | ✗ | return 0; | |
245 | |||
246 |
2/2✓ Branch 0 taken 51 times.
✓ Branch 1 taken 103 times.
|
154 | if (!ctx->header_written) { |
247 | 51 | ret = fifo_thread_write_header(ctx); | |
248 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 51 times.
|
51 | if (ret < 0) |
249 | ✗ | return ret; | |
250 | } | ||
251 | |||
252 |
3/4✓ Branch 0 taken 6 times.
✓ Branch 1 taken 145 times.
✓ Branch 2 taken 3 times.
✗ Branch 3 not taken.
|
154 | switch(msg->type) { |
253 | 6 | case FIFO_WRITE_HEADER: | |
254 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
|
6 | av_assert0(ret >= 0); |
255 | 6 | return ret; | |
256 | 145 | case FIFO_WRITE_PACKET: | |
257 | 145 | return fifo_thread_write_packet(ctx, &msg->pkt); | |
258 | 3 | case FIFO_FLUSH_OUTPUT: | |
259 | 3 | return fifo_thread_flush_output(ctx); | |
260 | } | ||
261 | |||
262 | ✗ | av_assert0(0); | |
263 | return AVERROR(EINVAL); | ||
264 | } | ||
265 | |||
266 | 75 | static int is_recoverable(const FifoContext *fifo, int err_no) { | |
267 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 75 times.
|
75 | if (!fifo->attempt_recovery) |
268 | ✗ | return 0; | |
269 | |||
270 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 75 times.
|
75 | if (fifo->recover_any_error) |
271 | ✗ | return err_no != AVERROR_EXIT; | |
272 | |||
273 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 75 times.
|
75 | switch (err_no) { |
274 | ✗ | case AVERROR(EINVAL): | |
275 | case AVERROR(ENOSYS): | ||
276 | case AVERROR_EOF: | ||
277 | case AVERROR_EXIT: | ||
278 | case AVERROR_PATCHWELCOME: | ||
279 | ✗ | return 0; | |
280 | 75 | default: | |
281 | 75 | return 1; | |
282 | } | ||
283 | } | ||
284 | |||
285 | 3 | static void free_message(void *msg) | |
286 | { | ||
287 | 3 | FifoMessage *fifo_msg = msg; | |
288 | |||
289 |
1/2✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
|
3 | if (fifo_msg->type == FIFO_WRITE_PACKET) |
290 | 3 | av_packet_unref(&fifo_msg->pkt); | |
291 | 3 | } | |
292 | |||
293 | 30 | static int fifo_thread_process_recovery_failure(FifoThreadContext *ctx, AVPacket *pkt, | |
294 | int err_no) | ||
295 | { | ||
296 | 30 | AVFormatContext *avf = ctx->avf; | |
297 | 30 | FifoContext *fifo = avf->priv_data; | |
298 | int ret; | ||
299 | |||
300 | 30 | av_log(avf, AV_LOG_INFO, "Recovery failed: %s\n", | |
301 | 30 | av_err2str(err_no)); | |
302 | |||
303 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 30 times.
|
30 | if (fifo->recovery_wait_streamtime) { |
304 | ✗ | if (pkt->pts == AV_NOPTS_VALUE) | |
305 | ✗ | av_log(avf, AV_LOG_WARNING, "Packet does not contain presentation" | |
306 | " timestamp, recovery will be attempted immediately"); | ||
307 | ✗ | ctx->last_recovery_ts = pkt->pts; | |
308 | } else { | ||
309 | 30 | ctx->last_recovery_ts = av_gettime_relative(); | |
310 | } | ||
311 | |||
312 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 30 times.
|
30 | if (fifo->max_recovery_attempts && |
313 | ✗ | ctx->recovery_nr >= fifo->max_recovery_attempts) { | |
314 | ✗ | av_log(avf, AV_LOG_ERROR, | |
315 | "Maximal number of %d recovery attempts reached.\n", | ||
316 | fifo->max_recovery_attempts); | ||
317 | ✗ | ret = err_no; | |
318 | } else { | ||
319 | 30 | ret = AVERROR(EAGAIN); | |
320 | } | ||
321 | |||
322 | 30 | return ret; | |
323 | } | ||
324 | |||
325 | 45 | static int fifo_thread_attempt_recovery(FifoThreadContext *ctx, FifoMessage *msg, int err_no) | |
326 | { | ||
327 | 45 | AVFormatContext *avf = ctx->avf; | |
328 | 45 | FifoContext *fifo = avf->priv_data; | |
329 | 45 | AVPacket *pkt = &msg->pkt; | |
330 | int64_t time_since_recovery; | ||
331 | int ret; | ||
332 | |||
333 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 45 times.
|
45 | if (!is_recoverable(fifo, err_no)) { |
334 | ✗ | ret = err_no; | |
335 | ✗ | goto fail; | |
336 | } | ||
337 | |||
338 |
1/2✓ Branch 0 taken 45 times.
✗ Branch 1 not taken.
|
45 | if (ctx->header_written) { |
339 | 45 | fifo->write_trailer_ret = fifo_thread_write_trailer(ctx); | |
340 | 45 | ctx->header_written = 0; | |
341 | } | ||
342 | |||
343 |
2/2✓ Branch 0 taken 15 times.
✓ Branch 1 taken 30 times.
|
45 | if (!ctx->recovery_nr) { |
344 | 15 | ctx->last_recovery_ts = fifo->recovery_wait_streamtime ? | |
345 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 15 times.
|
15 | AV_NOPTS_VALUE : 0; |
346 | } else { | ||
347 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 30 times.
|
30 | if (fifo->recovery_wait_streamtime) { |
348 | ✗ | if (ctx->last_recovery_ts == AV_NOPTS_VALUE) { | |
349 | ✗ | AVRational tb = avf->streams[pkt->stream_index]->time_base; | |
350 | ✗ | time_since_recovery = av_rescale_q(pkt->pts - ctx->last_recovery_ts, | |
351 | ✗ | tb, AV_TIME_BASE_Q); | |
352 | } else { | ||
353 | /* Enforce recovery immediately */ | ||
354 | ✗ | time_since_recovery = fifo->recovery_wait_time; | |
355 | } | ||
356 | } else { | ||
357 | 30 | time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts; | |
358 | } | ||
359 | |||
360 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 30 times.
|
30 | if (time_since_recovery < fifo->recovery_wait_time) |
361 | ✗ | return AVERROR(EAGAIN); | |
362 | } | ||
363 | |||
364 | 45 | ctx->recovery_nr++; | |
365 | |||
366 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 45 times.
|
45 | if (fifo->max_recovery_attempts) { |
367 | ✗ | av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d/%d\n", | |
368 | ctx->recovery_nr, fifo->max_recovery_attempts); | ||
369 | } else { | ||
370 | 45 | av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d\n", | |
371 | ctx->recovery_nr); | ||
372 | } | ||
373 | |||
374 |
1/4✗ Branch 0 not taken.
✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
|
45 | if (fifo->restart_with_keyframe && fifo->drop_pkts_on_overflow) |
375 | ✗ | ctx->drop_until_keyframe = 1; | |
376 | |||
377 | 45 | ret = fifo_thread_dispatch_message(ctx, msg); | |
378 |
2/2✓ Branch 0 taken 30 times.
✓ Branch 1 taken 15 times.
|
45 | if (ret < 0) { |
379 |
1/2✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
|
30 | if (is_recoverable(fifo, ret)) { |
380 | 30 | return fifo_thread_process_recovery_failure(ctx, pkt, ret); | |
381 | } else { | ||
382 | ✗ | goto fail; | |
383 | } | ||
384 | } else { | ||
385 | 15 | av_log(avf, AV_LOG_INFO, "Recovery successful\n"); | |
386 | 15 | ctx->recovery_nr = 0; | |
387 | } | ||
388 | |||
389 | 15 | return 0; | |
390 | |||
391 | ✗ | fail: | |
392 | ✗ | free_message(msg); | |
393 | ✗ | return ret; | |
394 | } | ||
395 | |||
396 | 15 | static int fifo_thread_recover(FifoThreadContext *ctx, FifoMessage *msg, int err_no) | |
397 | { | ||
398 | 15 | AVFormatContext *avf = ctx->avf; | |
399 | 15 | FifoContext *fifo = avf->priv_data; | |
400 | int ret; | ||
401 | |||
402 | do { | ||
403 |
3/4✓ Branch 0 taken 45 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 30 times.
✓ Branch 3 taken 15 times.
|
45 | if (!fifo->recovery_wait_streamtime && ctx->recovery_nr > 0) { |
404 | 30 | int64_t time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts; | |
405 | 30 | int64_t time_to_wait = FFMAX(0, fifo->recovery_wait_time - time_since_recovery); | |
406 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 30 times.
|
30 | if (time_to_wait) |
407 | ✗ | av_usleep(FFMIN(10000, time_to_wait)); | |
408 | } | ||
409 | |||
410 | 45 | ret = fifo_thread_attempt_recovery(ctx, msg, err_no); | |
411 |
3/4✓ Branch 0 taken 30 times.
✓ Branch 1 taken 15 times.
✓ Branch 2 taken 30 times.
✗ Branch 3 not taken.
|
45 | } while (ret == AVERROR(EAGAIN) && !fifo->drop_pkts_on_overflow); |
412 | |||
413 |
1/4✗ Branch 0 not taken.
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
|
15 | if (ret == AVERROR(EAGAIN) && fifo->drop_pkts_on_overflow) { |
414 | ✗ | if (msg->type == FIFO_WRITE_PACKET) | |
415 | ✗ | av_packet_unref(&msg->pkt); | |
416 | ✗ | ret = 0; | |
417 | } | ||
418 | |||
419 | 15 | return ret; | |
420 | } | ||
421 | |||
422 | 6 | static void *fifo_consumer_thread(void *data) | |
423 | { | ||
424 | 6 | AVFormatContext *avf = data; | |
425 | 6 | FifoContext *fifo = avf->priv_data; | |
426 | 6 | AVThreadMessageQueue *queue = fifo->queue; | |
427 | 6 | FifoMessage msg = {fifo->timeshift ? FIFO_NOOP : FIFO_WRITE_HEADER, {0}}; | |
428 | int ret; | ||
429 | |||
430 | FifoThreadContext fifo_thread_ctx; | ||
431 | 6 | memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext)); | |
432 | 6 | fifo_thread_ctx.avf = avf; | |
433 | 6 | fifo_thread_ctx.last_received_dts = AV_NOPTS_VALUE; | |
434 | |||
435 | 6 | ff_thread_setname("fifo-consumer"); | |
436 | |||
437 | 103 | while (1) { | |
438 | 109 | uint8_t just_flushed = 0; | |
439 | |||
440 |
1/2✓ Branch 0 taken 109 times.
✗ Branch 1 not taken.
|
109 | if (!fifo_thread_ctx.recovery_nr) |
441 | 109 | ret = fifo_thread_dispatch_message(&fifo_thread_ctx, &msg); | |
442 | |||
443 |
3/4✓ Branch 0 taken 94 times.
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 94 times.
|
109 | if (ret < 0 || fifo_thread_ctx.recovery_nr > 0) { |
444 | 15 | int rec_ret = fifo_thread_recover(&fifo_thread_ctx, &msg, ret); | |
445 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 15 times.
|
15 | if (rec_ret < 0) { |
446 | ✗ | av_thread_message_queue_set_err_send(queue, rec_ret); | |
447 | ✗ | break; | |
448 | } | ||
449 | } | ||
450 | |||
451 | /* If the queue is full at the moment when fifo_write_packet | ||
452 | * attempts to insert new message (packet) to the queue, | ||
453 | * it sets the fifo->overflow_flag to 1 and drops packet. | ||
454 | * Here in consumer thread, the flag is checked and if it is | ||
455 | * set, the queue is flushed and flag cleared. */ | ||
456 | 109 | pthread_mutex_lock(&fifo->overflow_flag_lock); | |
457 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 108 times.
|
109 | if (fifo->overflow_flag) { |
458 | 1 | av_thread_message_flush(queue); | |
459 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (fifo->restart_with_keyframe) |
460 | ✗ | fifo_thread_ctx.drop_until_keyframe = 1; | |
461 | 1 | fifo->overflow_flag = 0; | |
462 | 1 | just_flushed = 1; | |
463 | } | ||
464 | 109 | pthread_mutex_unlock(&fifo->overflow_flag_lock); | |
465 | |||
466 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 108 times.
|
109 | if (just_flushed) |
467 | 1 | av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n"); | |
468 | |||
469 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 109 times.
|
109 | if (fifo->timeshift) |
470 | ✗ | while (atomic_load_explicit(&fifo->queue_duration, memory_order_relaxed) < fifo->timeshift) | |
471 | ✗ | av_usleep(10000); | |
472 | |||
473 | 109 | ret = av_thread_message_queue_recv(queue, &msg, 0); | |
474 |
2/2✓ Branch 0 taken 6 times.
✓ Branch 1 taken 103 times.
|
109 | if (ret < 0) { |
475 | 6 | av_thread_message_queue_set_err_send(queue, ret); | |
476 | 6 | break; | |
477 | } | ||
478 | } | ||
479 | |||
480 | 6 | fifo->write_trailer_ret = fifo_thread_write_trailer(&fifo_thread_ctx); | |
481 | |||
482 | 6 | return NULL; | |
483 | } | ||
484 | |||
485 | 6 | static int fifo_mux_init(AVFormatContext *avf, const AVOutputFormat *oformat, | |
486 | const char *filename) | ||
487 | { | ||
488 | 6 | FifoContext *fifo = avf->priv_data; | |
489 | AVFormatContext *avf2; | ||
490 | 6 | int ret = 0, i; | |
491 | |||
492 | 6 | ret = avformat_alloc_output_context2(&avf2, oformat, NULL, filename); | |
493 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
|
6 | if (ret < 0) |
494 | ✗ | return ret; | |
495 | |||
496 | 6 | fifo->avf = avf2; | |
497 | |||
498 | 6 | avf2->interrupt_callback = avf->interrupt_callback; | |
499 | 6 | avf2->max_delay = avf->max_delay; | |
500 | 6 | ret = av_dict_copy(&avf2->metadata, avf->metadata, 0); | |
501 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
|
6 | if (ret < 0) |
502 | ✗ | return ret; | |
503 | 6 | avf2->opaque = avf->opaque; | |
504 | 6 | avf2->io_close2 = avf->io_close2; | |
505 | 6 | avf2->io_open = avf->io_open; | |
506 | 6 | avf2->flags = avf->flags; | |
507 | |||
508 |
2/2✓ Branch 0 taken 7 times.
✓ Branch 1 taken 6 times.
|
13 | for (i = 0; i < avf->nb_streams; ++i) { |
509 | 7 | AVStream *st = ff_stream_clone(avf2, avf->streams[i]); | |
510 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7 times.
|
7 | if (!st) |
511 | ✗ | return AVERROR(ENOMEM); | |
512 | } | ||
513 | |||
514 | 6 | return 0; | |
515 | } | ||
516 | |||
517 | 6 | static int fifo_init(AVFormatContext *avf) | |
518 | { | ||
519 | 6 | FifoContext *fifo = avf->priv_data; | |
520 | const AVOutputFormat *oformat; | ||
521 | 6 | int ret = 0; | |
522 | |||
523 |
1/4✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
|
6 | if (fifo->recovery_wait_streamtime && !fifo->drop_pkts_on_overflow) { |
524 | ✗ | av_log(avf, AV_LOG_ERROR, "recovery_wait_streamtime can be turned on" | |
525 | " only when drop_pkts_on_overflow is also turned on\n"); | ||
526 | ✗ | return AVERROR(EINVAL); | |
527 | } | ||
528 | 6 | atomic_init(&fifo->queue_duration, 0); | |
529 | 6 | fifo->last_sent_dts = AV_NOPTS_VALUE; | |
530 | |||
531 | #ifdef FIFO_TEST | ||
532 | /* This exists for the fifo_muxer test tool. */ | ||
533 |
2/4✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
|
4 | if (fifo->format && !strcmp(fifo->format, "fifo_test")) { |
534 | extern const FFOutputFormat ff_fifo_test_muxer; | ||
535 | 4 | oformat = &ff_fifo_test_muxer.p; | |
536 | } else | ||
537 | #endif | ||
538 | 2 | oformat = av_guess_format(fifo->format, avf->url, NULL); | |
539 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
|
6 | if (!oformat) { |
540 | ✗ | ret = AVERROR_MUXER_NOT_FOUND; | |
541 | ✗ | return ret; | |
542 | } | ||
543 | |||
544 | 6 | ret = fifo_mux_init(avf, oformat, avf->url); | |
545 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
|
6 | if (ret < 0) |
546 | ✗ | return ret; | |
547 | |||
548 | 6 | ret = av_thread_message_queue_alloc(&fifo->queue, (unsigned) fifo->queue_size, | |
549 | sizeof(FifoMessage)); | ||
550 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
|
6 | if (ret < 0) |
551 | ✗ | return ret; | |
552 | |||
553 | 6 | av_thread_message_queue_set_free_func(fifo->queue, free_message); | |
554 | |||
555 | 6 | ret = pthread_mutex_init(&fifo->overflow_flag_lock, NULL); | |
556 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
|
6 | if (ret < 0) |
557 | ✗ | return AVERROR(ret); | |
558 | 6 | fifo->overflow_flag_lock_initialized = 1; | |
559 | |||
560 | 6 | return 0; | |
561 | } | ||
562 | |||
563 | 6 | static int fifo_write_header(AVFormatContext *avf) | |
564 | { | ||
565 | 6 | FifoContext * fifo = avf->priv_data; | |
566 | int ret; | ||
567 | |||
568 | 6 | ret = pthread_create(&fifo->writer_thread, NULL, fifo_consumer_thread, avf); | |
569 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
|
6 | if (ret) { |
570 | ✗ | av_log(avf, AV_LOG_ERROR, "Failed to start thread: %s\n", | |
571 | ✗ | av_err2str(AVERROR(ret))); | |
572 | ✗ | ret = AVERROR(ret); | |
573 | } | ||
574 | |||
575 | 6 | return ret; | |
576 | } | ||
577 | |||
578 | 109 | static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt) | |
579 | { | ||
580 | 109 | FifoContext *fifo = avf->priv_data; | |
581 |
2/2✓ Branch 0 taken 106 times.
✓ Branch 1 taken 3 times.
|
109 | FifoMessage msg = {.type = pkt ? FIFO_WRITE_PACKET : FIFO_FLUSH_OUTPUT}; |
582 | int ret; | ||
583 | |||
584 |
2/2✓ Branch 0 taken 106 times.
✓ Branch 1 taken 3 times.
|
109 | if (pkt) { |
585 | 106 | ret = av_packet_ref(&msg.pkt,pkt); | |
586 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 106 times.
|
106 | if (ret < 0) |
587 | ✗ | return ret; | |
588 | } | ||
589 | |||
590 | 109 | ret = av_thread_message_queue_send(fifo->queue, &msg, | |
591 | 109 | fifo->drop_pkts_on_overflow ? | |
592 | AV_THREAD_MESSAGE_NONBLOCK : 0); | ||
593 |
2/2✓ Branch 0 taken 3 times.
✓ Branch 1 taken 106 times.
|
109 | if (ret == AVERROR(EAGAIN)) { |
594 | 3 | uint8_t overflow_set = 0; | |
595 | |||
596 | /* Queue is full, set fifo->overflow_flag to 1 | ||
597 | * to let consumer thread know the queue should | ||
598 | * be flushed. */ | ||
599 | 3 | pthread_mutex_lock(&fifo->overflow_flag_lock); | |
600 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 2 times.
|
3 | if (!fifo->overflow_flag) |
601 | 1 | fifo->overflow_flag = overflow_set = 1; | |
602 | 3 | pthread_mutex_unlock(&fifo->overflow_flag_lock); | |
603 | |||
604 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 2 times.
|
3 | if (overflow_set) |
605 | 1 | av_log(avf, AV_LOG_WARNING, "FIFO queue full\n"); | |
606 | 3 | ret = 0; | |
607 | 3 | goto fail; | |
608 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 106 times.
|
106 | } else if (ret < 0) { |
609 | ✗ | goto fail; | |
610 | } | ||
611 | |||
612 |
1/6✗ Branch 0 not taken.
✓ Branch 1 taken 106 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
|
106 | if (fifo->timeshift && pkt && pkt->dts != AV_NOPTS_VALUE) |
613 | ✗ | atomic_fetch_add_explicit(&fifo->queue_duration, next_duration(avf, pkt, &fifo->last_sent_dts), memory_order_relaxed); | |
614 | |||
615 | 106 | return ret; | |
616 | 3 | fail: | |
617 |
1/2✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
|
3 | if (pkt) |
618 | 3 | av_packet_unref(&msg.pkt); | |
619 | 3 | return ret; | |
620 | } | ||
621 | |||
622 | 6 | static int fifo_write_trailer(AVFormatContext *avf) | |
623 | { | ||
624 | 6 | FifoContext *fifo= avf->priv_data; | |
625 | int ret; | ||
626 | |||
627 | 6 | av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF); | |
628 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
|
6 | if (fifo->timeshift) { |
629 | ✗ | int64_t now = av_gettime_relative(); | |
630 | ✗ | int64_t elapsed = 0; | |
631 | ✗ | FifoMessage msg = {FIFO_NOOP}; | |
632 | do { | ||
633 | ✗ | int64_t delay = av_gettime_relative() - now; | |
634 | ✗ | if (delay < 0) { // Discontinuity? | |
635 | ✗ | delay = 10000; | |
636 | ✗ | now = av_gettime_relative(); | |
637 | } else { | ||
638 | ✗ | now += delay; | |
639 | } | ||
640 | ✗ | atomic_fetch_add_explicit(&fifo->queue_duration, delay, memory_order_relaxed); | |
641 | ✗ | elapsed += delay; | |
642 | ✗ | if (elapsed > fifo->timeshift) | |
643 | ✗ | break; | |
644 | ✗ | av_usleep(10000); | |
645 | ✗ | ret = av_thread_message_queue_send(fifo->queue, &msg, AV_THREAD_MESSAGE_NONBLOCK); | |
646 | ✗ | } while (ret >= 0 || ret == AVERROR(EAGAIN)); | |
647 | ✗ | atomic_store(&fifo->queue_duration, INT64_MAX); | |
648 | } | ||
649 | |||
650 | 6 | ret = pthread_join(fifo->writer_thread, NULL); | |
651 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
|
6 | if (ret < 0) { |
652 | ✗ | av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n", | |
653 | ✗ | av_err2str(AVERROR(ret))); | |
654 | ✗ | return AVERROR(ret); | |
655 | } | ||
656 | |||
657 | 6 | ret = fifo->write_trailer_ret; | |
658 | 6 | return ret; | |
659 | } | ||
660 | |||
661 | 6 | static void fifo_deinit(AVFormatContext *avf) | |
662 | { | ||
663 | 6 | FifoContext *fifo = avf->priv_data; | |
664 | |||
665 | 6 | avformat_free_context(fifo->avf); | |
666 | 6 | av_thread_message_queue_free(&fifo->queue); | |
667 |
1/2✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
|
6 | if (fifo->overflow_flag_lock_initialized) |
668 | 6 | pthread_mutex_destroy(&fifo->overflow_flag_lock); | |
669 | 6 | } | |
670 | |||
671 | #define OFFSET(x) offsetof(FifoContext, x) | ||
672 | static const AVOption options[] = { | ||
673 | {"attempt_recovery", "Attempt recovery in case of failure", OFFSET(attempt_recovery), | ||
674 | AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM}, | ||
675 | |||
676 | {"drop_pkts_on_overflow", "Drop packets on fifo queue overflow not to block encoder", OFFSET(drop_pkts_on_overflow), | ||
677 | AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM}, | ||
678 | |||
679 | {"fifo_format", "Target muxer", OFFSET(format), | ||
680 | AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM}, | ||
681 | |||
682 | {"format_opts", "Options to be passed to underlying muxer", OFFSET(format_options), | ||
683 | AV_OPT_TYPE_DICT, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM}, | ||
684 | |||
685 | {"max_recovery_attempts", "Maximal number of recovery attempts", OFFSET(max_recovery_attempts), | ||
686 | AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS}, 0, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM}, | ||
687 | |||
688 | {"queue_size", "Size of fifo queue", OFFSET(queue_size), | ||
689 | AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_QUEUE_SIZE}, 1, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM}, | ||
690 | |||
691 | {"recovery_wait_streamtime", "Use stream time instead of real time while waiting for recovery", | ||
692 | OFFSET(recovery_wait_streamtime), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM}, | ||
693 | |||
694 | {"recovery_wait_time", "Waiting time between recovery attempts", OFFSET(recovery_wait_time), | ||
695 | AV_OPT_TYPE_DURATION, {.i64 = FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM}, | ||
696 | |||
697 | {"recover_any_error", "Attempt recovery regardless of type of the error", OFFSET(recover_any_error), | ||
698 | AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM}, | ||
699 | |||
700 | {"restart_with_keyframe", "Wait for keyframe when restarting output", OFFSET(restart_with_keyframe), | ||
701 | AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM}, | ||
702 | |||
703 | {"timeshift", "Delay fifo output", OFFSET(timeshift), | ||
704 | AV_OPT_TYPE_DURATION, {.i64 = 0}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM}, | ||
705 | |||
706 | {NULL}, | ||
707 | }; | ||
708 | |||
709 | static const AVClass fifo_muxer_class = { | ||
710 | .class_name = "Fifo muxer", | ||
711 | .item_name = av_default_item_name, | ||
712 | .option = options, | ||
713 | .version = LIBAVUTIL_VERSION_INT, | ||
714 | }; | ||
715 | |||
716 | const FFOutputFormat ff_fifo_muxer = { | ||
717 | .p.name = "fifo", | ||
718 | .p.long_name = NULL_IF_CONFIG_SMALL("FIFO queue pseudo-muxer"), | ||
719 | .p.priv_class = &fifo_muxer_class, | ||
720 | #if FF_API_ALLOW_FLUSH | ||
721 | .p.flags = AVFMT_NOFILE | AVFMT_ALLOW_FLUSH | AVFMT_TS_NEGATIVE, | ||
722 | #else | ||
723 | .p.flags = AVFMT_NOFILE | AVFMT_TS_NEGATIVE, | ||
724 | #endif | ||
725 | .priv_data_size = sizeof(FifoContext), | ||
726 | .init = fifo_init, | ||
727 | .write_header = fifo_write_header, | ||
728 | .write_packet = fifo_write_packet, | ||
729 | .write_trailer = fifo_write_trailer, | ||
730 | .deinit = fifo_deinit, | ||
731 | .flags_internal = FF_OFMT_FLAG_ALLOW_FLUSH, | ||
732 | }; | ||
733 |