LCOV - code coverage report
Current view: top level - libavformat - fifo.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 226 295 76.6 %
Date: 2017-12-13 10:57:33 Functions: 17 17 100.0 %

          Line data    Source code
       1             : /*
       2             :  * FIFO pseudo-muxer
       3             :  * Copyright (c) 2016 Jan Sebechlebsky
       4             :  *
       5             :  * This file is part of FFmpeg.
       6             :  *
       7             :  * FFmpeg is free software; you can redistribute it and/or
       8             :  * modify it under the terms of the GNU Lesser General Public License
       9             :  * as published by the Free Software Foundation; either
      10             :  * version 2.1 of the License, or (at your option) any later version.
      11             :  *
      12             :  * FFmpeg is distributed in the hope that it will be useful,
      13             :  * but WITHOUT ANY WARRANTY; without even the implied warranty of
      14             :  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      15             :  * GNU Lesser General Public License for more details.
      16             :  *
      17             :  * You should have received a copy of the GNU Lesser General Public License
      18             :  * along with FFmpeg; if not, write to the Free Software Foundation, Inc.,
      19             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
      20             :  */
      21             : 
      22             : #include "libavutil/avassert.h"
      23             : #include "libavutil/opt.h"
      24             : #include "libavutil/time.h"
      25             : #include "libavutil/thread.h"
      26             : #include "libavutil/threadmessage.h"
      27             : #include "avformat.h"
      28             : #include "internal.h"
      29             : 
      30             : #define FIFO_DEFAULT_QUEUE_SIZE              60
      31             : #define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS   0
      32             : #define FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC 5000000 // 5 seconds
      33             : 
      34             : typedef struct FifoContext {
      35             :     const AVClass *class;
      36             :     AVFormatContext *avf;
      37             : 
      38             :     char *format;
      39             :     char *format_options_str;
      40             :     AVDictionary *format_options;
      41             : 
      42             :     int queue_size;
      43             :     AVThreadMessageQueue *queue;
      44             : 
      45             :     pthread_t writer_thread;
      46             : 
      47             :     /* Return value of the last write_trailer call */
      48             :     int write_trailer_ret;
      49             : 
      50             :     /* Time to wait before next recovery attempt
      51             :      * This can refer to the time in processed stream,
      52             :      * or real time. */
      53             :     int64_t recovery_wait_time;
      54             : 
      55             :     /* Maximal number of unsuccessful successive recovery attempts */
      56             :     int max_recovery_attempts;
      57             : 
      58             :     /* Whether to attempt recovery from failure */
      59             :     int attempt_recovery;
      60             : 
      61             :     /* If >0 stream time will be used when waiting
      62             :      * for the recovery attempt instead of real time */
      63             :     int recovery_wait_streamtime;
      64             : 
      65             :     /* If >0 recovery will be attempted regardless of error code
      66             :      * (except AVERROR_EXIT, so exit request is never ignored) */
      67             :     int recover_any_error;
      68             : 
      69             :     /* Whether to drop packets in case the queue is full. */
      70             :     int drop_pkts_on_overflow;
      71             : 
      72             :     /* Whether to wait for keyframe when recovering
      73             :      * from failure or queue overflow */
      74             :     int restart_with_keyframe;
      75             : 
      76             :     pthread_mutex_t overflow_flag_lock;
      77             :     int overflow_flag_lock_initialized;
      78             :     /* Value > 0 signals queue overflow */
      79             :     volatile uint8_t overflow_flag;
      80             : 
      81             : } FifoContext;
      82             : 
      83             : typedef struct FifoThreadContext {
      84             :     AVFormatContext *avf;
      85             : 
      86             :     /* Timestamp of last failure.
      87             :      * This is either pts in case stream time is used,
      88             :      * or microseconds as returned by av_gettime_relative() */
      89             :     int64_t last_recovery_ts;
      90             : 
      91             :     /* Number of the current recovery attempt.
      92             :      * Value > 0 means a recovery is in progress */
      93             :     int recovery_nr;
      94             : 
      95             :     /* If > 0 all frames will be dropped until keyframe is received */
      96             :     uint8_t drop_until_keyframe;
      97             : 
      98             :     /* Value > 0 means that the previous write_header call was successful
      99             :      * so finalization by calling write_trailer and ff_io_close must be done
     100             :      * before exiting / reinitialization of underlying muxer */
     101             :     uint8_t header_written;
     102             : } FifoThreadContext;
     103             : 
     104             : typedef enum FifoMessageType {
     105             :     FIFO_WRITE_HEADER,
     106             :     FIFO_WRITE_PACKET,
     107             :     FIFO_FLUSH_OUTPUT
     108             : } FifoMessageType;
     109             : 
     110             : typedef struct FifoMessage {
     111             :     FifoMessageType type;
     112             :     AVPacket pkt;
     113             : } FifoMessage;
     114             : 
     115          51 : static int fifo_thread_write_header(FifoThreadContext *ctx)
     116             : {
     117          51 :     AVFormatContext *avf = ctx->avf;
     118          51 :     FifoContext *fifo = avf->priv_data;
     119          51 :     AVFormatContext *avf2 = fifo->avf;
     120          51 :     AVDictionary *format_options = NULL;
     121             :     int ret, i;
     122             : 
     123          51 :     ret = av_dict_copy(&format_options, fifo->format_options, 0);
     124          51 :     if (ret < 0)
     125           0 :         return ret;
     126             : 
     127          51 :     ret = ff_format_output_open(avf2, avf->filename, &format_options);
     128          51 :     if (ret < 0) {
     129           0 :         av_log(avf, AV_LOG_ERROR, "Error opening %s: %s\n", avf->filename,
     130           0 :                av_err2str(ret));
     131           0 :         goto end;
     132             :     }
     133             : 
     134         103 :     for (i = 0;i < avf2->nb_streams; i++)
     135          52 :         avf2->streams[i]->cur_dts = 0;
     136             : 
     137          51 :     ret = avformat_write_header(avf2, &format_options);
     138          51 :     if (!ret)
     139          51 :         ctx->header_written = 1;
     140             : 
     141             :     // Check for options unrecognized by underlying muxer
     142          51 :     if (format_options) {
     143           0 :         AVDictionaryEntry *entry = NULL;
     144           0 :         while ((entry = av_dict_get(format_options, "", entry, AV_DICT_IGNORE_SUFFIX)))
     145           0 :             av_log(avf2, AV_LOG_ERROR, "Unknown option '%s'\n", entry->key);
     146           0 :         ret = AVERROR(EINVAL);
     147             :     }
     148             : 
     149         102 : end:
     150          51 :     av_dict_free(&format_options);
     151          51 :     return ret;
     152             : }
     153             : 
     154           3 : static int fifo_thread_flush_output(FifoThreadContext *ctx)
     155             : {
     156           3 :     AVFormatContext *avf = ctx->avf;
     157           3 :     FifoContext *fifo = avf->priv_data;
     158           3 :     AVFormatContext *avf2 = fifo->avf;
     159             : 
     160           3 :     return av_write_frame(avf2, NULL);
     161             : }
     162             : 
     163         217 : static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt)
     164             : {
     165         217 :     AVFormatContext *avf = ctx->avf;
     166         217 :     FifoContext *fifo = avf->priv_data;
     167         217 :     AVFormatContext *avf2 = fifo->avf;
     168             :     AVRational src_tb, dst_tb;
     169             :     int ret, s_idx;
     170             : 
     171         217 :     if (ctx->drop_until_keyframe) {
     172           0 :         if (pkt->flags & AV_PKT_FLAG_KEY) {
     173           0 :             ctx->drop_until_keyframe = 0;
     174           0 :             av_log(avf, AV_LOG_VERBOSE, "Keyframe received, recovering...\n");
     175             :         } else {
     176           0 :             av_log(avf, AV_LOG_VERBOSE, "Dropping non-keyframe packet\n");
     177           0 :             av_packet_unref(pkt);
     178           0 :             return 0;
     179             :         }
     180             :     }
     181             : 
     182         217 :     s_idx = pkt->stream_index;
     183         217 :     src_tb = avf->streams[s_idx]->time_base;
     184         217 :     dst_tb = avf2->streams[s_idx]->time_base;
     185         217 :     av_packet_rescale_ts(pkt, src_tb, dst_tb);
     186             : 
     187         217 :     ret = av_write_frame(avf2, pkt);
     188         217 :     if (ret >= 0)
     189         172 :         av_packet_unref(pkt);
     190         217 :     return ret;
     191             : }
     192             : 
     193          51 : static int fifo_thread_write_trailer(FifoThreadContext *ctx)
     194             : {
     195          51 :     AVFormatContext *avf = ctx->avf;
     196          51 :     FifoContext *fifo = avf->priv_data;
     197          51 :     AVFormatContext *avf2 = fifo->avf;
     198             :     int ret;
     199             : 
     200          51 :     if (!ctx->header_written)
     201           0 :         return 0;
     202             : 
     203          51 :     ret = av_write_trailer(avf2);
     204          51 :     ff_format_io_close(avf2, &avf2->pb);
     205             : 
     206          51 :     return ret;
     207             : }
     208             : 
     209         226 : static int fifo_thread_dispatch_message(FifoThreadContext *ctx, FifoMessage *msg)
     210             : {
     211         226 :     int ret = AVERROR(EINVAL);
     212             : 
     213         226 :     if (!ctx->header_written) {
     214          51 :         ret = fifo_thread_write_header(ctx);
     215          51 :         if (ret < 0)
     216           0 :             return ret;
     217             :     }
     218             : 
     219         226 :     switch(msg->type) {
     220           6 :     case FIFO_WRITE_HEADER:
     221           6 :         av_assert0(ret >= 0);
     222           6 :         return ret;
     223         217 :     case FIFO_WRITE_PACKET:
     224         217 :         return fifo_thread_write_packet(ctx, &msg->pkt);
     225           3 :     case FIFO_FLUSH_OUTPUT:
     226           3 :         return fifo_thread_flush_output(ctx);
     227             :     }
     228             : 
     229           0 :     av_assert0(0);
     230             :     return AVERROR(EINVAL);
     231             : }
     232             : 
     233          75 : static int is_recoverable(const FifoContext *fifo, int err_no) {
     234          75 :     if (!fifo->attempt_recovery)
     235           0 :         return 0;
     236             : 
     237          75 :     if (fifo->recover_any_error)
     238           0 :         return err_no != AVERROR_EXIT;
     239             : 
     240          75 :     switch (err_no) {
     241           0 :     case AVERROR(EINVAL):
     242             :     case AVERROR(ENOSYS):
     243             :     case AVERROR_EOF:
     244             :     case AVERROR_EXIT:
     245             :     case AVERROR_PATCHWELCOME:
     246           0 :         return 0;
     247          75 :     default:
     248          75 :         return 1;
     249             :     }
     250             : }
     251             : 
     252           3 : static void free_message(void *msg)
     253             : {
     254           3 :     FifoMessage *fifo_msg = msg;
     255             : 
     256           3 :     if (fifo_msg->type == FIFO_WRITE_PACKET)
     257           3 :         av_packet_unref(&fifo_msg->pkt);
     258           3 : }
     259             : 
     260          30 : static int fifo_thread_process_recovery_failure(FifoThreadContext *ctx, AVPacket *pkt,
     261             :                                                 int err_no)
     262             : {
     263          30 :     AVFormatContext *avf = ctx->avf;
     264          30 :     FifoContext *fifo = avf->priv_data;
     265             :     int ret;
     266             : 
     267          30 :     av_log(avf, AV_LOG_INFO, "Recovery failed: %s\n",
     268          30 :            av_err2str(err_no));
     269             : 
     270          30 :     if (fifo->recovery_wait_streamtime) {
     271           0 :         if (pkt->pts == AV_NOPTS_VALUE)
     272           0 :             av_log(avf, AV_LOG_WARNING, "Packet does not contain presentation"
     273             :                    " timestamp, recovery will be attempted immediately");
     274           0 :         ctx->last_recovery_ts = pkt->pts;
     275             :     } else {
     276          30 :         ctx->last_recovery_ts = av_gettime_relative();
     277             :     }
     278             : 
     279          30 :     if (fifo->max_recovery_attempts &&
     280           0 :         ctx->recovery_nr >= fifo->max_recovery_attempts) {
     281           0 :         av_log(avf, AV_LOG_ERROR,
     282             :                "Maximal number of %d recovery attempts reached.\n",
     283             :                fifo->max_recovery_attempts);
     284           0 :         ret = err_no;
     285             :     } else {
     286          30 :         ret = AVERROR(EAGAIN);
     287             :     }
     288             : 
     289          30 :     return ret;
     290             : }
     291             : 
     292          45 : static int fifo_thread_attempt_recovery(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
     293             : {
     294          45 :     AVFormatContext *avf = ctx->avf;
     295          45 :     FifoContext *fifo = avf->priv_data;
     296          45 :     AVPacket *pkt = &msg->pkt;
     297             :     int64_t time_since_recovery;
     298             :     int ret;
     299             : 
     300          45 :     if (!is_recoverable(fifo, err_no)) {
     301           0 :         ret = err_no;
     302           0 :         goto fail;
     303             :     }
     304             : 
     305          45 :     if (ctx->header_written) {
     306          45 :         fifo->write_trailer_ret = fifo_thread_write_trailer(ctx);
     307          45 :         ctx->header_written = 0;
     308             :     }
     309             : 
     310          45 :     if (!ctx->recovery_nr) {
     311          30 :         ctx->last_recovery_ts = fifo->recovery_wait_streamtime ?
     312          15 :                                 AV_NOPTS_VALUE : 0;
     313             :     } else {
     314          30 :         if (fifo->recovery_wait_streamtime) {
     315           0 :             if (ctx->last_recovery_ts == AV_NOPTS_VALUE) {
     316           0 :                 AVRational tb = avf->streams[pkt->stream_index]->time_base;
     317           0 :                 time_since_recovery = av_rescale_q(pkt->pts - ctx->last_recovery_ts,
     318           0 :                                                    tb, AV_TIME_BASE_Q);
     319             :             } else {
     320             :                 /* Enforce recovery immediately */
     321           0 :                 time_since_recovery = fifo->recovery_wait_time;
     322             :             }
     323             :         } else {
     324          30 :             time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
     325             :         }
     326             : 
     327          30 :         if (time_since_recovery < fifo->recovery_wait_time)
     328           0 :             return AVERROR(EAGAIN);
     329             :     }
     330             : 
     331          45 :     ctx->recovery_nr++;
     332             : 
     333          45 :     if (fifo->max_recovery_attempts) {
     334           0 :         av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d/%d\n",
     335             :                ctx->recovery_nr, fifo->max_recovery_attempts);
     336             :     } else {
     337          45 :         av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d\n",
     338             :                ctx->recovery_nr);
     339             :     }
     340             : 
     341          45 :     if (fifo->restart_with_keyframe && fifo->drop_pkts_on_overflow)
     342           0 :         ctx->drop_until_keyframe = 1;
     343             : 
     344          45 :     ret = fifo_thread_dispatch_message(ctx, msg);
     345          45 :     if (ret < 0) {
     346          30 :         if (is_recoverable(fifo, ret)) {
     347          30 :             return fifo_thread_process_recovery_failure(ctx, pkt, ret);
     348             :         } else {
     349           0 :             goto fail;
     350             :         }
     351             :     } else {
     352          15 :         av_log(avf, AV_LOG_INFO, "Recovery successful\n");
     353          15 :         ctx->recovery_nr = 0;
     354             :     }
     355             : 
     356          15 :     return 0;
     357             : 
     358           0 : fail:
     359           0 :     free_message(msg);
     360           0 :     return ret;
     361             : }
     362             : 
     363          15 : static int fifo_thread_recover(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
     364             : {
     365          15 :     AVFormatContext *avf = ctx->avf;
     366          15 :     FifoContext *fifo = avf->priv_data;
     367             :     int ret;
     368             : 
     369             :     do {
     370          45 :         if (!fifo->recovery_wait_streamtime && ctx->recovery_nr > 0) {
     371          30 :             int64_t time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
     372          30 :             int64_t time_to_wait = FFMAX(0, fifo->recovery_wait_time - time_since_recovery);
     373          30 :             if (time_to_wait)
     374           0 :                 av_usleep(FFMIN(10000, time_to_wait));
     375             :         }
     376             : 
     377          45 :         ret = fifo_thread_attempt_recovery(ctx, msg, err_no);
     378          45 :     } while (ret == AVERROR(EAGAIN) && !fifo->drop_pkts_on_overflow);
     379             : 
     380          15 :     if (ret == AVERROR(EAGAIN) && fifo->drop_pkts_on_overflow) {
     381           0 :         if (msg->type == FIFO_WRITE_PACKET)
     382           0 :             av_packet_unref(&msg->pkt);
     383           0 :         ret = 0;
     384             :     }
     385             : 
     386          15 :     return ret;
     387             : }
     388             : 
     389           6 : static void *fifo_consumer_thread(void *data)
     390             : {
     391           6 :     AVFormatContext *avf = data;
     392           6 :     FifoContext *fifo = avf->priv_data;
     393           6 :     AVThreadMessageQueue *queue = fifo->queue;
     394           6 :     FifoMessage msg = {FIFO_WRITE_HEADER, {0}};
     395             :     int ret;
     396             : 
     397             :     FifoThreadContext fifo_thread_ctx;
     398           6 :     memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext));
     399           6 :     fifo_thread_ctx.avf = avf;
     400             : 
     401         175 :     while (1) {
     402         181 :         uint8_t just_flushed = 0;
     403             : 
     404         181 :         if (!fifo_thread_ctx.recovery_nr)
     405         181 :             ret = fifo_thread_dispatch_message(&fifo_thread_ctx, &msg);
     406             : 
     407         181 :         if (ret < 0 || fifo_thread_ctx.recovery_nr > 0) {
     408          15 :             int rec_ret = fifo_thread_recover(&fifo_thread_ctx, &msg, ret);
     409          15 :             if (rec_ret < 0) {
     410           0 :                 av_thread_message_queue_set_err_send(queue, rec_ret);
     411           0 :                 break;
     412             :             }
     413             :         }
     414             : 
     415             :         /* If the queue is full at the moment when fifo_write_packet
     416             :          * attempts to insert new message (packet) to the queue,
     417             :          * it sets the fifo->overflow_flag to 1 and drops packet.
     418             :          * Here in consumer thread, the flag is checked and if it is
     419             :          * set, the queue is flushed and flag cleared. */
     420         181 :         pthread_mutex_lock(&fifo->overflow_flag_lock);
     421         181 :         if (fifo->overflow_flag) {
     422           1 :             av_thread_message_flush(queue);
     423           1 :             if (fifo->restart_with_keyframe)
     424           0 :                 fifo_thread_ctx.drop_until_keyframe = 1;
     425           1 :             fifo->overflow_flag = 0;
     426           1 :             just_flushed = 1;
     427             :         }
     428         181 :         pthread_mutex_unlock(&fifo->overflow_flag_lock);
     429             : 
     430         181 :         if (just_flushed)
     431           1 :             av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n");
     432             : 
     433         181 :         ret = av_thread_message_queue_recv(queue, &msg, 0);
     434         181 :         if (ret < 0) {
     435           6 :             av_thread_message_queue_set_err_send(queue, ret);
     436           6 :             break;
     437             :         }
     438             :     }
     439             : 
     440           6 :     fifo->write_trailer_ret = fifo_thread_write_trailer(&fifo_thread_ctx);
     441             : 
     442           6 :     return NULL;
     443             : }
     444             : 
     445           6 : static int fifo_mux_init(AVFormatContext *avf, AVOutputFormat *oformat,
     446             :                          const char *filename)
     447             : {
     448           6 :     FifoContext *fifo = avf->priv_data;
     449             :     AVFormatContext *avf2;
     450           6 :     int ret = 0, i;
     451             : 
     452           6 :     ret = avformat_alloc_output_context2(&avf2, oformat, NULL, filename);
     453           6 :     if (ret < 0)
     454           0 :         return ret;
     455             : 
     456           6 :     fifo->avf = avf2;
     457             : 
     458           6 :     avf2->interrupt_callback = avf->interrupt_callback;
     459           6 :     avf2->max_delay = avf->max_delay;
     460           6 :     ret = av_dict_copy(&avf2->metadata, avf->metadata, 0);
     461           6 :     if (ret < 0)
     462           0 :         return ret;
     463           6 :     avf2->opaque = avf->opaque;
     464           6 :     avf2->io_close = avf->io_close;
     465           6 :     avf2->io_open = avf->io_open;
     466           6 :     avf2->flags = avf->flags;
     467             : 
     468          13 :     for (i = 0; i < avf->nb_streams; ++i) {
     469           7 :         AVStream *st = avformat_new_stream(avf2, NULL);
     470           7 :         if (!st)
     471           0 :             return AVERROR(ENOMEM);
     472             : 
     473           7 :         ret = ff_stream_encode_params_copy(st, avf->streams[i]);
     474           7 :         if (ret < 0)
     475           0 :             return ret;
     476             :     }
     477             : 
     478           6 :     return 0;
     479             : }
     480             : 
     481           6 : static int fifo_init(AVFormatContext *avf)
     482             : {
     483           6 :     FifoContext *fifo = avf->priv_data;
     484             :     AVOutputFormat *oformat;
     485           6 :     int ret = 0;
     486             : 
     487           6 :     if (fifo->recovery_wait_streamtime && !fifo->drop_pkts_on_overflow) {
     488           0 :         av_log(avf, AV_LOG_ERROR, "recovery_wait_streamtime can be turned on"
     489             :                " only when drop_pkts_on_overflow is also turned on\n");
     490           0 :         return AVERROR(EINVAL);
     491             :     }
     492             : 
     493           6 :     if (fifo->format_options_str) {
     494           4 :         ret = av_dict_parse_string(&fifo->format_options, fifo->format_options_str,
     495             :                                    "=", ":", 0);
     496           4 :         if (ret < 0) {
     497           0 :             av_log(avf, AV_LOG_ERROR, "Could not parse format options list '%s'\n",
     498             :                    fifo->format_options_str);
     499           0 :             return ret;
     500             :         }
     501             :     }
     502             : 
     503           6 :     oformat = av_guess_format(fifo->format, avf->filename, NULL);
     504           6 :     if (!oformat) {
     505           0 :         ret = AVERROR_MUXER_NOT_FOUND;
     506           0 :         return ret;
     507             :     }
     508             : 
     509           6 :     ret = fifo_mux_init(avf, oformat, avf->filename);
     510           6 :     if (ret < 0)
     511           0 :         return ret;
     512             : 
     513           6 :     ret = av_thread_message_queue_alloc(&fifo->queue, (unsigned) fifo->queue_size,
     514             :                                         sizeof(FifoMessage));
     515           6 :     if (ret < 0)
     516           0 :         return ret;
     517             : 
     518           6 :     av_thread_message_queue_set_free_func(fifo->queue, free_message);
     519             : 
     520           6 :     ret = pthread_mutex_init(&fifo->overflow_flag_lock, NULL);
     521           6 :     if (ret < 0)
     522           0 :         return AVERROR(ret);
     523           6 :     fifo->overflow_flag_lock_initialized = 1;
     524             : 
     525           6 :     return 0;
     526             : }
     527             : 
     528           6 : static int fifo_write_header(AVFormatContext *avf)
     529             : {
     530           6 :     FifoContext * fifo = avf->priv_data;
     531             :     int ret;
     532             : 
     533           6 :     ret = pthread_create(&fifo->writer_thread, NULL, fifo_consumer_thread, avf);
     534           6 :     if (ret) {
     535           0 :         av_log(avf, AV_LOG_ERROR, "Failed to start thread: %s\n",
     536           0 :                av_err2str(AVERROR(ret)));
     537           0 :         ret = AVERROR(ret);
     538             :     }
     539             : 
     540           6 :     return ret;
     541             : }
     542             : 
     543         181 : static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
     544             : {
     545         181 :     FifoContext *fifo = avf->priv_data;
     546         181 :     FifoMessage msg = {.type = pkt ? FIFO_WRITE_PACKET : FIFO_FLUSH_OUTPUT};
     547             :     int ret;
     548             : 
     549         181 :     if (pkt) {
     550         178 :         av_init_packet(&msg.pkt);
     551         178 :         ret = av_packet_ref(&msg.pkt,pkt);
     552         178 :         if (ret < 0)
     553           0 :             return ret;
     554             :     }
     555             : 
     556         181 :     ret = av_thread_message_queue_send(fifo->queue, &msg,
     557         181 :                                        fifo->drop_pkts_on_overflow ?
     558             :                                        AV_THREAD_MESSAGE_NONBLOCK : 0);
     559         181 :     if (ret == AVERROR(EAGAIN)) {
     560           3 :         uint8_t overflow_set = 0;
     561             : 
     562             :         /* Queue is full, set fifo->overflow_flag to 1
     563             :          * to let consumer thread know the queue should
     564             :          * be flushed. */
     565           3 :         pthread_mutex_lock(&fifo->overflow_flag_lock);
     566           3 :         if (!fifo->overflow_flag)
     567           1 :             fifo->overflow_flag = overflow_set = 1;
     568           3 :         pthread_mutex_unlock(&fifo->overflow_flag_lock);
     569             : 
     570           3 :         if (overflow_set)
     571           1 :             av_log(avf, AV_LOG_WARNING, "FIFO queue full\n");
     572           3 :         ret = 0;
     573           3 :         goto fail;
     574         178 :     } else if (ret < 0) {
     575           0 :         goto fail;
     576             :     }
     577             : 
     578         178 :     return ret;
     579           3 : fail:
     580           3 :     if (pkt)
     581           3 :         av_packet_unref(&msg.pkt);
     582           3 :     return ret;
     583             : }
     584             : 
     585           6 : static int fifo_write_trailer(AVFormatContext *avf)
     586             : {
     587           6 :     FifoContext *fifo= avf->priv_data;
     588             :     int ret;
     589             : 
     590           6 :     av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF);
     591             : 
     592           6 :     ret = pthread_join(fifo->writer_thread, NULL);
     593           6 :     if (ret < 0) {
     594           0 :         av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
     595           0 :                av_err2str(AVERROR(ret)));
     596           0 :         return AVERROR(ret);
     597             :     }
     598             : 
     599           6 :     ret = fifo->write_trailer_ret;
     600           6 :     return ret;
     601             : }
     602             : 
     603           6 : static void fifo_deinit(AVFormatContext *avf) /* deinit callback: releases what the FifoContext in avf->priv_data holds (options dict, inner muxer context, message queue, overflow mutex). */
     604             : {
     605           6 :     FifoContext *fifo = avf->priv_data;
     606             : 
     607           6 :     av_dict_free(&fifo->format_options);
     608           6 :     avformat_free_context(fifo->avf); /* fifo->avf is the context owned by this muxer, distinct from the avf argument */
     609           6 :     av_thread_message_queue_free(&fifo->queue);
     610           6 :     if (fifo->overflow_flag_lock_initialized) /* destroy the mutex only when the flag records that it was initialized */
     611           6 :         pthread_mutex_destroy(&fifo->overflow_flag_lock);
     612           6 : }
     613             : 
     614             : #define OFFSET(x) offsetof(FifoContext, x) /* byte offset of field x inside FifoContext, used by the option table below */
     615             : static const AVOption options[] = { /* user-settable options; each entry: name, help text, field offset, type, default, min, max, flags */
     616             :         {"fifo_format", "Target muxer", OFFSET(format),
     617             :          AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
     618             : 
     619             :         {"queue_size", "Size of fifo queue", OFFSET(queue_size),
     620             :          AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_QUEUE_SIZE}, 1, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM}, /* minimum of 1: a zero-length queue is rejected by the option range */
     621             : 
     622             :         {"format_opts", "Options to be passed to underlying muxer", OFFSET(format_options_str),
     623             :          AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
     624             : 
     625             :         {"drop_pkts_on_overflow", "Drop packets on fifo queue overflow not to block encoder", OFFSET(drop_pkts_on_overflow),
     626             :          AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
     627             : 
     628             :         {"restart_with_keyframe", "Wait for keyframe when restarting output", OFFSET(restart_with_keyframe),
     629             :          AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
     630             : 
     631             :         {"attempt_recovery", "Attempt recovery in case of failure", OFFSET(attempt_recovery),
     632             :         AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
     633             : 
     634             :         {"max_recovery_attempts", "Maximal number of recovery attempts", OFFSET(max_recovery_attempts),
     635             :          AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS}, 0, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM},
     636             : 
     637             :         {"recovery_wait_time", "Waiting time between recovery attempts", OFFSET(recovery_wait_time),
     638             :          AV_OPT_TYPE_DURATION, {.i64 = FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM}, /* NOTE(review): default constant is named *_USEC, so the stored value is presumably microseconds -- confirm */
     639             : 
     640             :         {"recovery_wait_streamtime", "Use stream time instead of real time while waiting for recovery",
     641             :          OFFSET(recovery_wait_streamtime), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
     642             : 
     643             :         {"recover_any_error", "Attempt recovery regardless of type of the error", OFFSET(recover_any_error),
     644             :          AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
     645             : 
     646             :         {NULL}, /* terminating entry */
     647             : };
     648             : 
     649             : static const AVClass fifo_muxer_class = { /* AVClass for the muxer's private context; ties the options[] table to FifoContext */
     650             :     .class_name = "Fifo muxer",
     651             :     .item_name  = av_default_item_name,
     652             :     .option     = options,
     653             :     .version    = LIBAVUTIL_VERSION_INT,
     654             : };
     655             : 
     656             : AVOutputFormat ff_fifo_muxer = { /* muxer definition for the "fifo" pseudo-muxer: wires the callbacks defined in this file into the AVOutputFormat interface */
     657             :     .name           = "fifo",
     658             :     .long_name      = NULL_IF_CONFIG_SMALL("FIFO queue pseudo-muxer"),
     659             :     .priv_data_size = sizeof(FifoContext), /* size of the private context exposed as avf->priv_data in the callbacks */
     660             :     .init           = fifo_init,
     661             :     .write_header   = fifo_write_header,
     662             :     .write_packet   = fifo_write_packet,
     663             :     .write_trailer  = fifo_write_trailer,
     664             :     .deinit         = fifo_deinit,
     665             :     .priv_class     = &fifo_muxer_class,
     666             :     .flags          = AVFMT_NOFILE | AVFMT_ALLOW_FLUSH | AVFMT_TS_NEGATIVE, /* per avformat.h: caller provides no opened file; write_packet may receive a NULL (flush) packet; negative timestamps are accepted */
     667             : };

Generated by: LCOV version 1.13