Line | Branch | Exec | Source |
---|---|---|---|
1 | /* | ||
2 | * FIFO pseudo-muxer | ||
3 | * Copyright (c) 2016 Jan Sebechlebsky | ||
4 | * | ||
5 | * This file is part of FFmpeg. | ||
6 | * | ||
7 | * FFmpeg is free software; you can redistribute it and/or | ||
8 | * modify it under the terms of the GNU Lesser General Public License | ||
9 | * as published by the Free Software Foundation; either | ||
10 | * version 2.1 of the License, or (at your option) any later version. | ||
11 | * | ||
12 | * FFmpeg is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
15 | * GNU Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public License | ||
18 | * along with FFmpeg; if not, write to the Free Software * Foundation, Inc., | ||
19 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA | ||
20 | */ | ||
21 | |||
22 | #include <stdatomic.h> | ||
23 | |||
24 | #include "libavutil/avassert.h" | ||
25 | #include "libavutil/opt.h" | ||
26 | #include "libavutil/time.h" | ||
27 | #include "libavutil/thread.h" | ||
28 | #include "libavutil/threadmessage.h" | ||
29 | #include "avformat.h" | ||
30 | #include "internal.h" | ||
31 | #include "mux.h" | ||
32 | |||
33 | #define FIFO_DEFAULT_QUEUE_SIZE 60 | ||
34 | #define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS 0 | ||
35 | #define FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC 5000000 // 5 seconds | ||
36 | |||
37 | typedef struct FifoContext { | ||
38 | const AVClass *class; | ||
39 | AVFormatContext *avf; | ||
40 | |||
41 | char *format; | ||
42 | AVDictionary *format_options; | ||
43 | |||
44 | int queue_size; | ||
45 | AVThreadMessageQueue *queue; | ||
46 | |||
47 | pthread_t writer_thread; | ||
48 | |||
49 | /* Return value of last write_trailer_call */ | ||
50 | int write_trailer_ret; | ||
51 | |||
52 | /* Time to wait before next recovery attempt | ||
53 | * This can refer to the time in processed stream, | ||
54 | * or real time. */ | ||
55 | int64_t recovery_wait_time; | ||
56 | |||
57 | /* Maximal number of unsuccessful successive recovery attempts */ | ||
58 | int max_recovery_attempts; | ||
59 | |||
60 | /* Whether to attempt recovery from failure */ | ||
61 | int attempt_recovery; | ||
62 | |||
63 | /* If >0 stream time will be used when waiting | ||
64 | * for the recovery attempt instead of real time */ | ||
65 | int recovery_wait_streamtime; | ||
66 | |||
67 | /* If >0 recovery will be attempted regardless of error code | ||
68 | * (except AVERROR_EXIT, so exit request is never ignored) */ | ||
69 | int recover_any_error; | ||
70 | |||
71 | /* Whether to drop packets in case the queue is full. */ | ||
72 | int drop_pkts_on_overflow; | ||
73 | |||
74 | /* Whether to wait for keyframe when recovering | ||
75 | * from failure or queue overflow */ | ||
76 | int restart_with_keyframe; | ||
77 | |||
78 | pthread_mutex_t overflow_flag_lock; | ||
79 | int overflow_flag_lock_initialized; | ||
80 | /* Value > 0 signals queue overflow */ | ||
81 | volatile uint8_t overflow_flag; | ||
82 | |||
83 | atomic_int_least64_t queue_duration; | ||
84 | int64_t last_sent_dts; | ||
85 | int64_t timeshift; | ||
86 | } FifoContext; | ||
87 | |||
88 | typedef struct FifoThreadContext { | ||
89 | AVFormatContext *avf; | ||
90 | |||
91 | /* Timestamp of last failure. | ||
92 | * This is either pts in case stream time is used, | ||
93 | * or microseconds as returned by av_gettime_relative() */ | ||
94 | int64_t last_recovery_ts; | ||
95 | |||
96 | /* Number of current recovery process | ||
97 | * Value > 0 means we are in recovery process */ | ||
98 | int recovery_nr; | ||
99 | |||
100 | /* If > 0 all frames will be dropped until keyframe is received */ | ||
101 | uint8_t drop_until_keyframe; | ||
102 | |||
103 | /* Value > 0 means that the previous write_header call was successful | ||
104 | * so finalization by calling write_trailer and ff_io_close must be done | ||
105 | * before exiting / reinitialization of underlying muxer */ | ||
106 | uint8_t header_written; | ||
107 | |||
108 | int64_t last_received_dts; | ||
109 | |||
110 | /* If > 0 at least one of the streams is a video stream */ | ||
111 | uint8_t has_video_stream; | ||
112 | } FifoThreadContext; | ||
113 | |||
114 | typedef enum FifoMessageType { | ||
115 | FIFO_NOOP, | ||
116 | FIFO_WRITE_HEADER, | ||
117 | FIFO_WRITE_PACKET, | ||
118 | FIFO_FLUSH_OUTPUT | ||
119 | } FifoMessageType; | ||
120 | |||
121 | typedef struct FifoMessage { | ||
122 | FifoMessageType type; | ||
123 | AVPacket pkt; | ||
124 | } FifoMessage; | ||
125 | |||
126 | 51 | static int fifo_thread_write_header(FifoThreadContext *ctx) | |
127 | { | ||
128 | 51 | AVFormatContext *avf = ctx->avf; | |
129 | 51 | FifoContext *fifo = avf->priv_data; | |
130 | 51 | AVFormatContext *avf2 = fifo->avf; | |
131 | 51 | AVDictionary *format_options = NULL; | |
132 | int ret, i; | ||
133 | |||
134 | 51 | ret = av_dict_copy(&format_options, fifo->format_options, 0); | |
135 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 51 times.
|
51 | if (ret < 0) |
136 | ✗ | goto end; | |
137 | |||
138 | 51 | ret = ff_format_output_open(avf2, avf->url, &format_options); | |
139 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 51 times.
|
51 | if (ret < 0) { |
140 | ✗ | av_log(avf, AV_LOG_ERROR, "Error opening %s: %s\n", avf->url, | |
141 | ✗ | av_err2str(ret)); | |
142 | ✗ | goto end; | |
143 | } | ||
144 | |||
145 |
2/2✓ Branch 0 taken 52 times.
✓ Branch 1 taken 51 times.
|
103 | for (i = 0;i < avf2->nb_streams; i++) |
146 | 52 | ffstream(avf2->streams[i])->cur_dts = 0; | |
147 | |||
148 | 51 | ret = avformat_write_header(avf2, &format_options); | |
149 |
1/2✓ Branch 0 taken 51 times.
✗ Branch 1 not taken.
|
51 | if (!ret) |
150 | 51 | ctx->header_written = 1; | |
151 | |||
152 | // Check for options unrecognized by underlying muxer | ||
153 |
1/2✓ Branch 0 taken 51 times.
✗ Branch 1 not taken.
|
51 | if (format_options) { |
154 | ✗ | const AVDictionaryEntry *entry = NULL; | |
155 | ✗ | while ((entry = av_dict_iterate(format_options, entry))) | |
156 | ✗ | av_log(avf2, AV_LOG_ERROR, "Unknown option '%s'\n", entry->key); | |
157 | ✗ | ret = AVERROR(EINVAL); | |
158 | } | ||
159 | |||
160 | 51 | end: | |
161 | 51 | av_dict_free(&format_options); | |
162 | 51 | return ret; | |
163 | } | ||
164 | |||
165 | 3 | static int fifo_thread_flush_output(FifoThreadContext *ctx) | |
166 | { | ||
167 | 3 | AVFormatContext *avf = ctx->avf; | |
168 | 3 | FifoContext *fifo = avf->priv_data; | |
169 | 3 | AVFormatContext *avf2 = fifo->avf; | |
170 | |||
171 | 3 | return av_write_frame(avf2, NULL); | |
172 | } | ||
173 | |||
174 | ✗ | static int64_t next_duration(AVFormatContext *avf, AVPacket *pkt, int64_t *last_dts) | |
175 | { | ||
176 | ✗ | AVStream *st = avf->streams[pkt->stream_index]; | |
177 | ✗ | int64_t dts = av_rescale_q(pkt->dts, st->time_base, AV_TIME_BASE_Q); | |
178 | ✗ | int64_t duration = (*last_dts == AV_NOPTS_VALUE ? 0 : dts - *last_dts); | |
179 | ✗ | *last_dts = dts; | |
180 | ✗ | return duration; | |
181 | } | ||
182 | |||
183 | 145 | static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt) | |
184 | { | ||
185 | 145 | AVFormatContext *avf = ctx->avf; | |
186 | 145 | FifoContext *fifo = avf->priv_data; | |
187 | 145 | AVFormatContext *avf2 = fifo->avf; | |
188 | AVRational src_tb, dst_tb; | ||
189 | int ret, s_idx; | ||
190 | int64_t orig_pts, orig_dts, orig_duration; | ||
191 | 145 | enum AVMediaType stream_codec_type = avf->streams[pkt->stream_index]->codecpar->codec_type; | |
192 | |||
193 |
1/4✗ Branch 0 not taken.
✓ Branch 1 taken 145 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
|
145 | if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE) |
194 | ✗ | atomic_fetch_sub_explicit(&fifo->queue_duration, next_duration(avf, pkt, &ctx->last_received_dts), memory_order_relaxed); | |
195 | |||
196 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 145 times.
|
145 | if (ctx->drop_until_keyframe) { |
197 | ✗ | if (pkt->flags & AV_PKT_FLAG_KEY) { | |
198 | ✗ | if (!ctx->has_video_stream) { | |
199 | ✗ | ctx->drop_until_keyframe = 0; | |
200 | ✗ | av_log(avf, AV_LOG_VERBOSE, "Keyframe received, recovering...\n"); | |
201 | } else { | ||
202 | ✗ | if (stream_codec_type == AVMEDIA_TYPE_VIDEO) { | |
203 | ✗ | ctx->drop_until_keyframe = 0; | |
204 | ✗ | av_log(avf, AV_LOG_VERBOSE, "Video keyframe received, recovering...\n"); | |
205 | } else { | ||
206 | ✗ | av_log(avf, AV_LOG_VERBOSE, "Dropping non-video keyframe\n"); | |
207 | ✗ | av_packet_unref(pkt); | |
208 | ✗ | return 0; | |
209 | } | ||
210 | } | ||
211 | } else { | ||
212 | ✗ | av_log(avf, AV_LOG_VERBOSE, "Dropping non-keyframe packet\n"); | |
213 | ✗ | av_packet_unref(pkt); | |
214 | ✗ | return 0; | |
215 | } | ||
216 | } | ||
217 | |||
218 | 145 | orig_pts = pkt->pts; | |
219 | 145 | orig_dts = pkt->dts; | |
220 | 145 | orig_duration = pkt->duration; | |
221 | 145 | s_idx = pkt->stream_index; | |
222 | 145 | src_tb = avf->streams[s_idx]->time_base; | |
223 | 145 | dst_tb = avf2->streams[s_idx]->time_base; | |
224 | 145 | av_packet_rescale_ts(pkt, src_tb, dst_tb); | |
225 | |||
226 | 145 | ret = av_write_frame(avf2, pkt); | |
227 |
2/2✓ Branch 0 taken 100 times.
✓ Branch 1 taken 45 times.
|
145 | if (ret >= 0) { |
228 | 100 | av_packet_unref(pkt); | |
229 | } else { | ||
230 | // avoid scaling twice | ||
231 | 45 | pkt->pts = orig_pts; | |
232 | 45 | pkt->dts = orig_dts; | |
233 | 45 | pkt->duration = orig_duration; | |
234 | } | ||
235 | 145 | return ret; | |
236 | } | ||
237 | |||
238 | 51 | static int fifo_thread_write_trailer(FifoThreadContext *ctx) | |
239 | { | ||
240 | 51 | AVFormatContext *avf = ctx->avf; | |
241 | 51 | FifoContext *fifo = avf->priv_data; | |
242 | 51 | AVFormatContext *avf2 = fifo->avf; | |
243 | int ret; | ||
244 | |||
245 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 51 times.
|
51 | if (!ctx->header_written) |
246 | ✗ | return 0; | |
247 | |||
248 | 51 | ret = av_write_trailer(avf2); | |
249 | 51 | ff_format_io_close(avf2, &avf2->pb); | |
250 | |||
251 | 51 | return ret; | |
252 | } | ||
253 | |||
254 | 154 | static int fifo_thread_dispatch_message(FifoThreadContext *ctx, FifoMessage *msg) | |
255 | { | ||
256 | 154 | int ret = AVERROR(EINVAL); | |
257 | |||
258 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 154 times.
|
154 | if (msg->type == FIFO_NOOP) |
259 | ✗ | return 0; | |
260 | |||
261 |
2/2✓ Branch 0 taken 51 times.
✓ Branch 1 taken 103 times.
|
154 | if (!ctx->header_written) { |
262 | 51 | ret = fifo_thread_write_header(ctx); | |
263 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 51 times.
|
51 | if (ret < 0) |
264 | ✗ | return ret; | |
265 | } | ||
266 | |||
267 |
3/4✓ Branch 0 taken 6 times.
✓ Branch 1 taken 145 times.
✓ Branch 2 taken 3 times.
✗ Branch 3 not taken.
|
154 | switch(msg->type) { |
268 | 6 | case FIFO_WRITE_HEADER: | |
269 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
|
6 | av_assert0(ret >= 0); |
270 | 6 | return ret; | |
271 | 145 | case FIFO_WRITE_PACKET: | |
272 | 145 | return fifo_thread_write_packet(ctx, &msg->pkt); | |
273 | 3 | case FIFO_FLUSH_OUTPUT: | |
274 | 3 | return fifo_thread_flush_output(ctx); | |
275 | } | ||
276 | |||
277 | ✗ | av_assert0(0); | |
278 | return AVERROR(EINVAL); | ||
279 | } | ||
280 | |||
281 | 75 | static int is_recoverable(const FifoContext *fifo, int err_no) { | |
282 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 75 times.
|
75 | if (!fifo->attempt_recovery) |
283 | ✗ | return 0; | |
284 | |||
285 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 75 times.
|
75 | if (fifo->recover_any_error) |
286 | ✗ | return err_no != AVERROR_EXIT; | |
287 | |||
288 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 75 times.
|
75 | switch (err_no) { |
289 | ✗ | case AVERROR(EINVAL): | |
290 | case AVERROR(ENOSYS): | ||
291 | case AVERROR_EOF: | ||
292 | case AVERROR_EXIT: | ||
293 | case AVERROR_PATCHWELCOME: | ||
294 | ✗ | return 0; | |
295 | 75 | default: | |
296 | 75 | return 1; | |
297 | } | ||
298 | } | ||
299 | |||
300 | 3 | static void free_message(void *msg) | |
301 | { | ||
302 | 3 | FifoMessage *fifo_msg = msg; | |
303 | |||
304 |
1/2✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
|
3 | if (fifo_msg->type == FIFO_WRITE_PACKET) |
305 | 3 | av_packet_unref(&fifo_msg->pkt); | |
306 | 3 | } | |
307 | |||
308 | 30 | static int fifo_thread_process_recovery_failure(FifoThreadContext *ctx, AVPacket *pkt, | |
309 | int err_no) | ||
310 | { | ||
311 | 30 | AVFormatContext *avf = ctx->avf; | |
312 | 30 | FifoContext *fifo = avf->priv_data; | |
313 | int ret; | ||
314 | |||
315 | 30 | av_log(avf, AV_LOG_INFO, "Recovery failed: %s\n", | |
316 | 30 | av_err2str(err_no)); | |
317 | |||
318 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 30 times.
|
30 | if (fifo->recovery_wait_streamtime) { |
319 | ✗ | if (pkt->pts == AV_NOPTS_VALUE) | |
320 | ✗ | av_log(avf, AV_LOG_WARNING, "Packet does not contain presentation" | |
321 | " timestamp, recovery will be attempted immediately"); | ||
322 | ✗ | ctx->last_recovery_ts = pkt->pts; | |
323 | } else { | ||
324 | 30 | ctx->last_recovery_ts = av_gettime_relative(); | |
325 | } | ||
326 | |||
327 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 30 times.
|
30 | if (fifo->max_recovery_attempts && |
328 | ✗ | ctx->recovery_nr >= fifo->max_recovery_attempts) { | |
329 | ✗ | av_log(avf, AV_LOG_ERROR, | |
330 | "Maximal number of %d recovery attempts reached.\n", | ||
331 | fifo->max_recovery_attempts); | ||
332 | ✗ | ret = err_no; | |
333 | } else { | ||
334 | 30 | ret = AVERROR(EAGAIN); | |
335 | } | ||
336 | |||
337 | 30 | return ret; | |
338 | } | ||
339 | |||
340 | 45 | static int fifo_thread_attempt_recovery(FifoThreadContext *ctx, FifoMessage *msg, int err_no) | |
341 | { | ||
342 | 45 | AVFormatContext *avf = ctx->avf; | |
343 | 45 | FifoContext *fifo = avf->priv_data; | |
344 | 45 | AVPacket *pkt = &msg->pkt; | |
345 | int64_t time_since_recovery; | ||
346 | int ret; | ||
347 | |||
348 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 45 times.
|
45 | if (!is_recoverable(fifo, err_no)) { |
349 | ✗ | ret = err_no; | |
350 | ✗ | goto fail; | |
351 | } | ||
352 | |||
353 |
1/2✓ Branch 0 taken 45 times.
✗ Branch 1 not taken.
|
45 | if (ctx->header_written) { |
354 | 45 | fifo->write_trailer_ret = fifo_thread_write_trailer(ctx); | |
355 | 45 | ctx->header_written = 0; | |
356 | } | ||
357 | |||
358 |
2/2✓ Branch 0 taken 15 times.
✓ Branch 1 taken 30 times.
|
45 | if (!ctx->recovery_nr) { |
359 | 15 | ctx->last_recovery_ts = fifo->recovery_wait_streamtime ? | |
360 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 15 times.
|
15 | AV_NOPTS_VALUE : 0; |
361 | } else { | ||
362 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 30 times.
|
30 | if (fifo->recovery_wait_streamtime) { |
363 | ✗ | if (ctx->last_recovery_ts == AV_NOPTS_VALUE) { | |
364 | ✗ | AVRational tb = avf->streams[pkt->stream_index]->time_base; | |
365 | ✗ | time_since_recovery = av_rescale_q(pkt->pts - ctx->last_recovery_ts, | |
366 | ✗ | tb, AV_TIME_BASE_Q); | |
367 | } else { | ||
368 | /* Enforce recovery immediately */ | ||
369 | ✗ | time_since_recovery = fifo->recovery_wait_time; | |
370 | } | ||
371 | } else { | ||
372 | 30 | time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts; | |
373 | } | ||
374 | |||
375 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 30 times.
|
30 | if (time_since_recovery < fifo->recovery_wait_time) |
376 | ✗ | return AVERROR(EAGAIN); | |
377 | } | ||
378 | |||
379 | 45 | ctx->recovery_nr++; | |
380 | |||
381 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 45 times.
|
45 | if (fifo->max_recovery_attempts) { |
382 | ✗ | av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d/%d\n", | |
383 | ctx->recovery_nr, fifo->max_recovery_attempts); | ||
384 | } else { | ||
385 | 45 | av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d\n", | |
386 | ctx->recovery_nr); | ||
387 | } | ||
388 | |||
389 |
1/4✗ Branch 0 not taken.
✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
|
45 | if (fifo->restart_with_keyframe && fifo->drop_pkts_on_overflow) |
390 | ✗ | ctx->drop_until_keyframe = 1; | |
391 | |||
392 | 45 | ret = fifo_thread_dispatch_message(ctx, msg); | |
393 |
2/2✓ Branch 0 taken 30 times.
✓ Branch 1 taken 15 times.
|
45 | if (ret < 0) { |
394 |
1/2✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
|
30 | if (is_recoverable(fifo, ret)) { |
395 | 30 | return fifo_thread_process_recovery_failure(ctx, pkt, ret); | |
396 | } else { | ||
397 | ✗ | goto fail; | |
398 | } | ||
399 | } else { | ||
400 | 15 | av_log(avf, AV_LOG_INFO, "Recovery successful\n"); | |
401 | 15 | ctx->recovery_nr = 0; | |
402 | } | ||
403 | |||
404 | 15 | return 0; | |
405 | |||
406 | ✗ | fail: | |
407 | ✗ | free_message(msg); | |
408 | ✗ | return ret; | |
409 | } | ||
410 | |||
411 | 15 | static int fifo_thread_recover(FifoThreadContext *ctx, FifoMessage *msg, int err_no) | |
412 | { | ||
413 | 15 | AVFormatContext *avf = ctx->avf; | |
414 | 15 | FifoContext *fifo = avf->priv_data; | |
415 | int ret; | ||
416 | |||
417 | do { | ||
418 |
3/4✓ Branch 0 taken 45 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 30 times.
✓ Branch 3 taken 15 times.
|
45 | if (!fifo->recovery_wait_streamtime && ctx->recovery_nr > 0) { |
419 | 30 | int64_t time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts; | |
420 | 30 | int64_t time_to_wait = FFMAX(0, fifo->recovery_wait_time - time_since_recovery); | |
421 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 30 times.
|
30 | if (time_to_wait) |
422 | ✗ | av_usleep(FFMIN(10000, time_to_wait)); | |
423 | } | ||
424 | |||
425 | 45 | ret = fifo_thread_attempt_recovery(ctx, msg, err_no); | |
426 |
3/4✓ Branch 0 taken 30 times.
✓ Branch 1 taken 15 times.
✓ Branch 2 taken 30 times.
✗ Branch 3 not taken.
|
45 | } while (ret == AVERROR(EAGAIN) && !fifo->drop_pkts_on_overflow); |
427 | |||
428 |
1/4✗ Branch 0 not taken.
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
|
15 | if (ret == AVERROR(EAGAIN) && fifo->drop_pkts_on_overflow) { |
429 | ✗ | if (msg->type == FIFO_WRITE_PACKET) | |
430 | ✗ | av_packet_unref(&msg->pkt); | |
431 | ✗ | ret = 0; | |
432 | } | ||
433 | |||
434 | 15 | return ret; | |
435 | } | ||
436 | |||
437 | 6 | static void *fifo_consumer_thread(void *data) | |
438 | { | ||
439 | 6 | AVFormatContext *avf = data; | |
440 | 6 | FifoContext *fifo = avf->priv_data; | |
441 | 6 | AVThreadMessageQueue *queue = fifo->queue; | |
442 | 6 | FifoMessage msg = {fifo->timeshift ? FIFO_NOOP : FIFO_WRITE_HEADER, {0}}; | |
443 | int ret, i; | ||
444 | |||
445 | FifoThreadContext fifo_thread_ctx; | ||
446 | 6 | memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext)); | |
447 | 6 | fifo_thread_ctx.avf = avf; | |
448 | 6 | fifo_thread_ctx.last_received_dts = AV_NOPTS_VALUE; | |
449 | |||
450 | 6 | ff_thread_setname("fifo-consumer"); | |
451 | |||
452 |
2/2✓ Branch 0 taken 6 times.
✓ Branch 1 taken 5 times.
|
11 | for (i = 0; i < avf->nb_streams; i++) { |
453 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 5 times.
|
6 | if (avf->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) { |
454 | 1 | fifo_thread_ctx.has_video_stream = 1; | |
455 | 1 | break; | |
456 | } | ||
457 | } | ||
458 | |||
459 | 103 | while (1) { | |
460 | 109 | uint8_t just_flushed = 0; | |
461 | |||
462 |
1/2✓ Branch 0 taken 109 times.
✗ Branch 1 not taken.
|
109 | if (!fifo_thread_ctx.recovery_nr) |
463 | 109 | ret = fifo_thread_dispatch_message(&fifo_thread_ctx, &msg); | |
464 | |||
465 |
3/4✓ Branch 0 taken 94 times.
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 94 times.
|
109 | if (ret < 0 || fifo_thread_ctx.recovery_nr > 0) { |
466 | 15 | int rec_ret = fifo_thread_recover(&fifo_thread_ctx, &msg, ret); | |
467 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 15 times.
|
15 | if (rec_ret < 0) { |
468 | ✗ | av_thread_message_queue_set_err_send(queue, rec_ret); | |
469 | ✗ | break; | |
470 | } | ||
471 | } | ||
472 | |||
473 | /* If the queue is full at the moment when fifo_write_packet | ||
474 | * attempts to insert new message (packet) to the queue, | ||
475 | * it sets the fifo->overflow_flag to 1 and drops packet. | ||
476 | * Here in consumer thread, the flag is checked and if it is | ||
477 | * set, the queue is flushed and flag cleared. */ | ||
478 | 109 | pthread_mutex_lock(&fifo->overflow_flag_lock); | |
479 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 108 times.
|
109 | if (fifo->overflow_flag) { |
480 | 1 | av_thread_message_flush(queue); | |
481 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (fifo->restart_with_keyframe) |
482 | ✗ | fifo_thread_ctx.drop_until_keyframe = 1; | |
483 | 1 | fifo->overflow_flag = 0; | |
484 | 1 | just_flushed = 1; | |
485 | } | ||
486 | 109 | pthread_mutex_unlock(&fifo->overflow_flag_lock); | |
487 | |||
488 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 108 times.
|
109 | if (just_flushed) |
489 | 1 | av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n"); | |
490 | |||
491 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 109 times.
|
109 | if (fifo->timeshift) |
492 | ✗ | while (atomic_load_explicit(&fifo->queue_duration, memory_order_relaxed) < fifo->timeshift) | |
493 | ✗ | av_usleep(10000); | |
494 | |||
495 | 109 | ret = av_thread_message_queue_recv(queue, &msg, 0); | |
496 |
2/2✓ Branch 0 taken 6 times.
✓ Branch 1 taken 103 times.
|
109 | if (ret < 0) { |
497 | 6 | av_thread_message_queue_set_err_send(queue, ret); | |
498 | 6 | break; | |
499 | } | ||
500 | } | ||
501 | |||
502 | 6 | fifo->write_trailer_ret = fifo_thread_write_trailer(&fifo_thread_ctx); | |
503 | |||
504 | 6 | return NULL; | |
505 | } | ||
506 | |||
507 | 6 | static int fifo_mux_init(AVFormatContext *avf, const AVOutputFormat *oformat, | |
508 | const char *filename) | ||
509 | { | ||
510 | 6 | FifoContext *fifo = avf->priv_data; | |
511 | AVFormatContext *avf2; | ||
512 | 6 | int ret = 0, i; | |
513 | |||
514 | 6 | ret = avformat_alloc_output_context2(&avf2, oformat, NULL, filename); | |
515 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
|
6 | if (ret < 0) |
516 | ✗ | return ret; | |
517 | |||
518 | 6 | fifo->avf = avf2; | |
519 | |||
520 | 6 | avf2->interrupt_callback = avf->interrupt_callback; | |
521 | 6 | avf2->max_delay = avf->max_delay; | |
522 | 6 | ret = av_dict_copy(&avf2->metadata, avf->metadata, 0); | |
523 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
|
6 | if (ret < 0) |
524 | ✗ | return ret; | |
525 | 6 | avf2->opaque = avf->opaque; | |
526 | 6 | avf2->io_close2 = avf->io_close2; | |
527 | 6 | avf2->io_open = avf->io_open; | |
528 | 6 | avf2->flags = avf->flags; | |
529 | |||
530 |
2/2✓ Branch 0 taken 7 times.
✓ Branch 1 taken 6 times.
|
13 | for (i = 0; i < avf->nb_streams; ++i) { |
531 | 7 | AVStream *st = ff_stream_clone(avf2, avf->streams[i]); | |
532 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7 times.
|
7 | if (!st) |
533 | ✗ | return AVERROR(ENOMEM); | |
534 | } | ||
535 | |||
536 | 6 | return 0; | |
537 | } | ||
538 | |||
539 | 6 | static int fifo_init(AVFormatContext *avf) | |
540 | { | ||
541 | 6 | FifoContext *fifo = avf->priv_data; | |
542 | const AVOutputFormat *oformat; | ||
543 | 6 | int ret = 0; | |
544 | |||
545 |
1/4✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
|
6 | if (fifo->recovery_wait_streamtime && !fifo->drop_pkts_on_overflow) { |
546 | ✗ | av_log(avf, AV_LOG_ERROR, "recovery_wait_streamtime can be turned on" | |
547 | " only when drop_pkts_on_overflow is also turned on\n"); | ||
548 | ✗ | return AVERROR(EINVAL); | |
549 | } | ||
550 | 6 | atomic_init(&fifo->queue_duration, 0); | |
551 | 6 | fifo->last_sent_dts = AV_NOPTS_VALUE; | |
552 | |||
553 | #ifdef FIFO_TEST | ||
554 | /* This exists for the fifo_muxer test tool. */ | ||
555 |
2/4✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
|
4 | if (fifo->format && !strcmp(fifo->format, "fifo_test")) { |
556 | extern const FFOutputFormat ff_fifo_test_muxer; | ||
557 | 4 | oformat = &ff_fifo_test_muxer.p; | |
558 | } else | ||
559 | #endif | ||
560 | 2 | oformat = av_guess_format(fifo->format, avf->url, NULL); | |
561 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
|
6 | if (!oformat) { |
562 | ✗ | ret = AVERROR_MUXER_NOT_FOUND; | |
563 | ✗ | return ret; | |
564 | } | ||
565 | |||
566 | 6 | ret = fifo_mux_init(avf, oformat, avf->url); | |
567 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
|
6 | if (ret < 0) |
568 | ✗ | return ret; | |
569 | |||
570 | 6 | ret = av_thread_message_queue_alloc(&fifo->queue, (unsigned) fifo->queue_size, | |
571 | sizeof(FifoMessage)); | ||
572 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
|
6 | if (ret < 0) |
573 | ✗ | return ret; | |
574 | |||
575 | 6 | av_thread_message_queue_set_free_func(fifo->queue, free_message); | |
576 | |||
577 | 6 | ret = pthread_mutex_init(&fifo->overflow_flag_lock, NULL); | |
578 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
|
6 | if (ret < 0) |
579 | ✗ | return AVERROR(ret); | |
580 | 6 | fifo->overflow_flag_lock_initialized = 1; | |
581 | |||
582 | 6 | return 0; | |
583 | } | ||
584 | |||
585 | 6 | static int fifo_write_header(AVFormatContext *avf) | |
586 | { | ||
587 | 6 | FifoContext * fifo = avf->priv_data; | |
588 | int ret; | ||
589 | |||
590 | 6 | ret = pthread_create(&fifo->writer_thread, NULL, fifo_consumer_thread, avf); | |
591 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
|
6 | if (ret) { |
592 | ✗ | av_log(avf, AV_LOG_ERROR, "Failed to start thread: %s\n", | |
593 | ✗ | av_err2str(AVERROR(ret))); | |
594 | ✗ | ret = AVERROR(ret); | |
595 | } | ||
596 | |||
597 | 6 | return ret; | |
598 | } | ||
599 | |||
600 | 109 | static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt) | |
601 | { | ||
602 | 109 | FifoContext *fifo = avf->priv_data; | |
603 |
2/2✓ Branch 0 taken 106 times.
✓ Branch 1 taken 3 times.
|
109 | FifoMessage msg = {.type = pkt ? FIFO_WRITE_PACKET : FIFO_FLUSH_OUTPUT}; |
604 | int ret; | ||
605 | |||
606 |
2/2✓ Branch 0 taken 106 times.
✓ Branch 1 taken 3 times.
|
109 | if (pkt) { |
607 | 106 | ret = av_packet_ref(&msg.pkt,pkt); | |
608 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 106 times.
|
106 | if (ret < 0) |
609 | ✗ | return ret; | |
610 | } | ||
611 | |||
612 | 109 | ret = av_thread_message_queue_send(fifo->queue, &msg, | |
613 | 109 | fifo->drop_pkts_on_overflow ? | |
614 | AV_THREAD_MESSAGE_NONBLOCK : 0); | ||
615 |
2/2✓ Branch 0 taken 3 times.
✓ Branch 1 taken 106 times.
|
109 | if (ret == AVERROR(EAGAIN)) { |
616 | 3 | uint8_t overflow_set = 0; | |
617 | |||
618 | /* Queue is full, set fifo->overflow_flag to 1 | ||
619 | * to let consumer thread know the queue should | ||
620 | * be flushed. */ | ||
621 | 3 | pthread_mutex_lock(&fifo->overflow_flag_lock); | |
622 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 2 times.
|
3 | if (!fifo->overflow_flag) |
623 | 1 | fifo->overflow_flag = overflow_set = 1; | |
624 | 3 | pthread_mutex_unlock(&fifo->overflow_flag_lock); | |
625 | |||
626 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 2 times.
|
3 | if (overflow_set) |
627 | 1 | av_log(avf, AV_LOG_WARNING, "FIFO queue full\n"); | |
628 | 3 | ret = 0; | |
629 | 3 | goto fail; | |
630 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 106 times.
|
106 | } else if (ret < 0) { |
631 | ✗ | goto fail; | |
632 | } | ||
633 | |||
634 |
1/6✗ Branch 0 not taken.
✓ Branch 1 taken 106 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
|
106 | if (fifo->timeshift && pkt && pkt->dts != AV_NOPTS_VALUE) |
635 | ✗ | atomic_fetch_add_explicit(&fifo->queue_duration, next_duration(avf, pkt, &fifo->last_sent_dts), memory_order_relaxed); | |
636 | |||
637 | 106 | return ret; | |
638 | 3 | fail: | |
639 |
1/2✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
|
3 | if (pkt) |
640 | 3 | av_packet_unref(&msg.pkt); | |
641 | 3 | return ret; | |
642 | } | ||
643 | |||
644 | 6 | static int fifo_write_trailer(AVFormatContext *avf) | |
645 | { | ||
646 | 6 | FifoContext *fifo= avf->priv_data; | |
647 | int ret; | ||
648 | |||
649 | 6 | av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF); | |
650 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
|
6 | if (fifo->timeshift) { |
651 | ✗ | int64_t now = av_gettime_relative(); | |
652 | ✗ | int64_t elapsed = 0; | |
653 | ✗ | FifoMessage msg = {FIFO_NOOP}; | |
654 | do { | ||
655 | ✗ | int64_t delay = av_gettime_relative() - now; | |
656 | ✗ | if (delay < 0) { // Discontinuity? | |
657 | ✗ | delay = 10000; | |
658 | ✗ | now = av_gettime_relative(); | |
659 | } else { | ||
660 | ✗ | now += delay; | |
661 | } | ||
662 | ✗ | atomic_fetch_add_explicit(&fifo->queue_duration, delay, memory_order_relaxed); | |
663 | ✗ | elapsed += delay; | |
664 | ✗ | if (elapsed > fifo->timeshift) | |
665 | ✗ | break; | |
666 | ✗ | av_usleep(10000); | |
667 | ✗ | ret = av_thread_message_queue_send(fifo->queue, &msg, AV_THREAD_MESSAGE_NONBLOCK); | |
668 | ✗ | } while (ret >= 0 || ret == AVERROR(EAGAIN)); | |
669 | ✗ | atomic_store(&fifo->queue_duration, INT64_MAX); | |
670 | } | ||
671 | |||
672 | 6 | ret = pthread_join(fifo->writer_thread, NULL); | |
673 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
|
6 | if (ret < 0) { |
674 | ✗ | av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n", | |
675 | ✗ | av_err2str(AVERROR(ret))); | |
676 | ✗ | return AVERROR(ret); | |
677 | } | ||
678 | |||
679 | 6 | ret = fifo->write_trailer_ret; | |
680 | 6 | return ret; | |
681 | } | ||
682 | |||
683 | 6 | static void fifo_deinit(AVFormatContext *avf) | |
684 | { | ||
685 | 6 | FifoContext *fifo = avf->priv_data; | |
686 | |||
687 | 6 | avformat_free_context(fifo->avf); | |
688 | 6 | av_thread_message_queue_free(&fifo->queue); | |
689 |
1/2✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
|
6 | if (fifo->overflow_flag_lock_initialized) |
690 | 6 | pthread_mutex_destroy(&fifo->overflow_flag_lock); | |
691 | 6 | } | |
692 | |||
693 | #define OFFSET(x) offsetof(FifoContext, x) | ||
694 | static const AVOption options[] = { | ||
695 | {"attempt_recovery", "Attempt recovery in case of failure", OFFSET(attempt_recovery), | ||
696 | AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM}, | ||
697 | |||
698 | {"drop_pkts_on_overflow", "Drop packets on fifo queue overflow not to block encoder", OFFSET(drop_pkts_on_overflow), | ||
699 | AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM}, | ||
700 | |||
701 | {"fifo_format", "Target muxer", OFFSET(format), | ||
702 | AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM}, | ||
703 | |||
704 | {"format_opts", "Options to be passed to underlying muxer", OFFSET(format_options), | ||
705 | AV_OPT_TYPE_DICT, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM}, | ||
706 | |||
707 | {"max_recovery_attempts", "Maximal number of recovery attempts", OFFSET(max_recovery_attempts), | ||
708 | AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS}, 0, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM}, | ||
709 | |||
710 | {"queue_size", "Size of fifo queue", OFFSET(queue_size), | ||
711 | AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_QUEUE_SIZE}, 1, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM}, | ||
712 | |||
713 | {"recovery_wait_streamtime", "Use stream time instead of real time while waiting for recovery", | ||
714 | OFFSET(recovery_wait_streamtime), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM}, | ||
715 | |||
716 | {"recovery_wait_time", "Waiting time between recovery attempts", OFFSET(recovery_wait_time), | ||
717 | AV_OPT_TYPE_DURATION, {.i64 = FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM}, | ||
718 | |||
719 | {"recover_any_error", "Attempt recovery regardless of type of the error", OFFSET(recover_any_error), | ||
720 | AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM}, | ||
721 | |||
722 | {"restart_with_keyframe", "Wait for keyframe when restarting output", OFFSET(restart_with_keyframe), | ||
723 | AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM}, | ||
724 | |||
725 | {"timeshift", "Delay fifo output", OFFSET(timeshift), | ||
726 | AV_OPT_TYPE_DURATION, {.i64 = 0}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM}, | ||
727 | |||
728 | {NULL}, | ||
729 | }; | ||
730 | |||
731 | static const AVClass fifo_muxer_class = { | ||
732 | .class_name = "Fifo muxer", | ||
733 | .item_name = av_default_item_name, | ||
734 | .option = options, | ||
735 | .version = LIBAVUTIL_VERSION_INT, | ||
736 | }; | ||
737 | |||
738 | const FFOutputFormat ff_fifo_muxer = { | ||
739 | .p.name = "fifo", | ||
740 | .p.long_name = NULL_IF_CONFIG_SMALL("FIFO queue pseudo-muxer"), | ||
741 | .p.priv_class = &fifo_muxer_class, | ||
742 | .p.flags = AVFMT_NOFILE | AVFMT_TS_NEGATIVE, | ||
743 | .priv_data_size = sizeof(FifoContext), | ||
744 | .init = fifo_init, | ||
745 | .write_header = fifo_write_header, | ||
746 | .write_packet = fifo_write_packet, | ||
747 | .write_trailer = fifo_write_trailer, | ||
748 | .deinit = fifo_deinit, | ||
749 | .flags_internal = FF_OFMT_FLAG_ALLOW_FLUSH, | ||
750 | }; | ||
751 |