LCOV - code coverage report
Current view: top level - tests/api - api-threadmessage-test.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 88 105 83.8 %
Date: 2018-05-20 11:54:08 Functions: 5 5 100.0 %

          Line data    Source code
       1             : /*
       2             :  * Permission is hereby granted, free of charge, to any person obtaining a copy
       3             :  * of this software and associated documentation files (the "Software"), to deal
       4             :  * in the Software without restriction, including without limitation the rights
       5             :  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
       6             :  * copies of the Software, and to permit persons to whom the Software is
       7             :  * furnished to do so, subject to the following conditions:
       8             :  *
       9             :  * The above copyright notice and this permission notice shall be included in
      10             :  * all copies or substantial portions of the Software.
      11             :  *
      12             :  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
      13             :  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
      14             :  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
      15             :  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
      16             :  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
      17             :  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
      18             :  * THE SOFTWARE.
      19             :  */
      20             : 
      21             : /**
      22             :  * Thread message API test
      23             :  */
      24             : 
      25             : #include "libavutil/avassert.h"
      26             : #include "libavutil/avstring.h"
      27             : #include "libavutil/frame.h"
      28             : #include "libavutil/threadmessage.h"
      29             : #include "libavutil/thread.h" // not public
      30             : 
      31             : struct sender_data {
      32             :     int id;
      33             :     pthread_t tid;
      34             :     int workload;
      35             :     AVThreadMessageQueue *queue;
      36             : };
      37             : 
      38             : /* same as sender_data but shuffled for testing purpose */
      39             : struct receiver_data {
      40             :     pthread_t tid;
      41             :     int workload;
      42             :     int id;
      43             :     AVThreadMessageQueue *queue;
      44             : };
      45             : 
      46             : struct message {
      47             :     AVFrame *frame;
      48             :     // we add some junk in the message to make sure the message size is >
      49             :     // sizeof(void*)
      50             :     int magic;
      51             : };
      52             : 
      53             : #define MAGIC 0xdeadc0de
      54             : 
      55          27 : static void free_frame(void *arg)
      56             : {
      57          27 :     struct message *msg = arg;
      58          27 :     av_assert0(msg->magic == MAGIC);
      59          27 :     av_frame_free(&msg->frame);
      60          27 : }
      61             : 
      62          10 : static void *sender_thread(void *arg)
      63             : {
      64          10 :     int i, ret = 0;
      65          10 :     struct sender_data *wd = arg;
      66             : 
      67          10 :     av_log(NULL, AV_LOG_INFO, "sender #%d: workload=%d\n", wd->id, wd->workload);
      68          73 :     for (i = 0; i < wd->workload; i++) {
      69          71 :         if (rand() % wd->workload < wd->workload / 10) {
      70          13 :             av_log(NULL, AV_LOG_INFO, "sender #%d: flushing the queue\n", wd->id);
      71          13 :             av_thread_message_flush(wd->queue);
      72             :         } else {
      73             :             char *val;
      74          58 :             AVDictionary *meta = NULL;
      75          58 :             struct message msg = {
      76             :                 .magic = MAGIC,
      77          58 :                 .frame = av_frame_alloc(),
      78             :             };
      79             : 
      80          58 :             if (!msg.frame) {
      81           0 :                 ret = AVERROR(ENOMEM);
      82           8 :                 break;
      83             :             }
      84             : 
      85             :             /* we add some metadata to identify the frames */
      86          58 :             val = av_asprintf("frame %d/%d from sender %d",
      87             :                               i + 1, wd->workload, wd->id);
      88          58 :             if (!val) {
      89           0 :                 av_frame_free(&msg.frame);
      90           0 :                 ret = AVERROR(ENOMEM);
      91           0 :                 break;
      92             :             }
      93          58 :             ret = av_dict_set(&meta, "sig", val, AV_DICT_DONT_STRDUP_VAL);
      94          58 :             if (ret < 0) {
      95           0 :                 av_frame_free(&msg.frame);
      96           0 :                 break;
      97             :             }
      98          58 :             msg.frame->metadata = meta;
      99             : 
     100             :             /* allocate a real frame in order to simulate "real" work */
     101          58 :             msg.frame->format = AV_PIX_FMT_RGBA;
     102          58 :             msg.frame->width  = 320;
     103          58 :             msg.frame->height = 240;
     104          58 :             ret = av_frame_get_buffer(msg.frame, 32);
     105          58 :             if (ret < 0) {
     106           0 :                 av_frame_free(&msg.frame);
     107           0 :                 break;
     108             :             }
     109             : 
     110             :             /* push the frame in the common queue */
     111          58 :             av_log(NULL, AV_LOG_INFO, "sender #%d: sending my work (%d/%d frame:%p)\n",
     112             :                    wd->id, i + 1, wd->workload, msg.frame);
     113          58 :             ret = av_thread_message_queue_send(wd->queue, &msg, 0);
     114          58 :             if (ret < 0) {
     115           8 :                 av_frame_free(&msg.frame);
     116           8 :                 break;
     117             :             }
     118             :         }
     119             :     }
     120          10 :     av_log(NULL, AV_LOG_INFO, "sender #%d: my work is done here (%s)\n",
     121          10 :            wd->id, av_err2str(ret));
     122          10 :     av_thread_message_queue_set_err_recv(wd->queue, ret < 0 ? ret : AVERROR_EOF);
     123          10 :     return NULL;
     124             : }
     125             : 
     126           2 : static void *receiver_thread(void *arg)
     127             : {
     128           2 :     int i, ret = 0;
     129           2 :     struct receiver_data *rd = arg;
     130             : 
     131          28 :     for (i = 0; i < rd->workload; i++) {
     132          27 :         if (rand() % rd->workload < rd->workload / 10) {
     133           3 :             av_log(NULL, AV_LOG_INFO, "receiver #%d: flushing the queue, "
     134             :                    "discarding %d message(s)\n", rd->id,
     135             :                    av_thread_message_queue_nb_elems(rd->queue));
     136           3 :             av_thread_message_flush(rd->queue);
     137             :         } else {
     138             :             struct message msg;
     139             :             AVDictionary *meta;
     140             :             AVDictionaryEntry *e;
     141             : 
     142          24 :             ret = av_thread_message_queue_recv(rd->queue, &msg, 0);
     143          24 :             if (ret < 0)
     144           1 :                 break;
     145          23 :             av_assert0(msg.magic == MAGIC);
     146          23 :             meta = msg.frame->metadata;
     147          23 :             e = av_dict_get(meta, "sig", NULL, 0);
     148          23 :             av_log(NULL, AV_LOG_INFO, "got \"%s\" (%p)\n", e->value, msg.frame);
     149          23 :             av_frame_free(&msg.frame);
     150             :         }
     151             :     }
     152             : 
     153           2 :     av_log(NULL, AV_LOG_INFO, "consumed enough (%d), stop\n", i);
     154           2 :     av_thread_message_queue_set_err_send(rd->queue, ret < 0 ? ret : AVERROR_EOF);
     155             : 
     156           2 :     return NULL;
     157             : }
     158             : 
     159          12 : static int get_workload(int minv, int maxv)
     160             : {
     161          12 :     return maxv == minv ? maxv : rand() % (maxv - minv) + minv;
     162             : }
     163             : 
     164           1 : int main(int ac, char **av)
     165             : {
     166           1 :     int i, ret = 0;
     167             :     int max_queue_size;
     168             :     int nb_senders, sender_min_load, sender_max_load;
     169             :     int nb_receivers, receiver_min_load, receiver_max_load;
     170             :     struct sender_data *senders;
     171             :     struct receiver_data *receivers;
     172           1 :     AVThreadMessageQueue *queue = NULL;
     173             : 
     174           1 :     if (ac != 8) {
     175           0 :         av_log(NULL, AV_LOG_ERROR, "%s <max_queue_size> "
     176             :                "<nb_senders> <sender_min_send> <sender_max_send> "
     177             :                "<nb_receivers> <receiver_min_recv> <receiver_max_recv>\n", av[0]);
     178           0 :         return 1;
     179             :     }
     180             : 
     181           1 :     max_queue_size    = atoi(av[1]);
     182           1 :     nb_senders        = atoi(av[2]);
     183           1 :     sender_min_load   = atoi(av[3]);
     184           1 :     sender_max_load   = atoi(av[4]);
     185           1 :     nb_receivers      = atoi(av[5]);
     186           1 :     receiver_min_load = atoi(av[6]);
     187           1 :     receiver_max_load = atoi(av[7]);
     188             : 
     189           1 :     if (max_queue_size <= 0 ||
     190           1 :         nb_senders <= 0 || sender_min_load <= 0 || sender_max_load <= 0 ||
     191           1 :         nb_receivers <= 0 || receiver_min_load <= 0 || receiver_max_load <= 0) {
     192           0 :         av_log(NULL, AV_LOG_ERROR, "negative values not allowed\n");
     193           0 :         return 1;
     194             :     }
     195             : 
     196           1 :     av_log(NULL, AV_LOG_INFO, "qsize:%d / %d senders sending [%d-%d] / "
     197             :            "%d receivers receiving [%d-%d]\n", max_queue_size,
     198             :            nb_senders, sender_min_load, sender_max_load,
     199             :            nb_receivers, receiver_min_load, receiver_max_load);
     200             : 
     201           1 :     senders = av_mallocz_array(nb_senders, sizeof(*senders));
     202           1 :     receivers = av_mallocz_array(nb_receivers, sizeof(*receivers));
     203           1 :     if (!senders || !receivers) {
     204           0 :         ret = AVERROR(ENOMEM);
     205           0 :         goto end;
     206             :     }
     207             : 
     208           1 :     ret = av_thread_message_queue_alloc(&queue, max_queue_size, sizeof(struct message));
     209           1 :     if (ret < 0)
     210           0 :         goto end;
     211             : 
     212           1 :     av_thread_message_queue_set_free_func(queue, free_frame);
     213             : 
     214             : #define SPAWN_THREADS(type) do {                                                \
     215             :     for (i = 0; i < nb_##type##s; i++) {                                        \
     216             :         struct type##_data *td = &type##s[i];                                   \
     217             :                                                                                 \
     218             :         td->id = i;                                                             \
     219             :         td->queue = queue;                                                      \
     220             :         td->workload = get_workload(type##_min_load, type##_max_load);          \
     221             :                                                                                 \
     222             :         ret = pthread_create(&td->tid, NULL, type##_thread, td);                \
     223             :         if (ret) {                                                              \
     224             :             const int err = AVERROR(ret);                                       \
     225             :             av_log(NULL, AV_LOG_ERROR, "Unable to start " AV_STRINGIFY(type)    \
     226             :                    " thread: %s\n", av_err2str(err));                           \
     227             :             goto end;                                                           \
     228             :         }                                                                       \
     229             :     }                                                                           \
     230             : } while (0)
     231             : 
     232             : #define WAIT_THREADS(type) do {                                                 \
     233             :     for (i = 0; i < nb_##type##s; i++) {                                        \
     234             :         struct type##_data *td = &type##s[i];                                   \
     235             :                                                                                 \
     236             :         ret = pthread_join(td->tid, NULL);                                      \
     237             :         if (ret) {                                                              \
     238             :             const int err = AVERROR(ret);                                       \
     239             :             av_log(NULL, AV_LOG_ERROR, "Unable to join " AV_STRINGIFY(type)     \
     240             :                    " thread: %s\n", av_err2str(err));                           \
     241             :             goto end;                                                           \
     242             :         }                                                                       \
     243             :     }                                                                           \
     244             : } while (0)
     245             : 
     246           1 :     SPAWN_THREADS(receiver);
     247           1 :     SPAWN_THREADS(sender);
     248             : 
     249           1 :     WAIT_THREADS(sender);
     250           1 :     WAIT_THREADS(receiver);
     251             : 
     252           1 : end:
     253           1 :     av_thread_message_queue_free(&queue);
     254           1 :     av_freep(&senders);
     255           1 :     av_freep(&receivers);
     256             : 
     257           1 :     if (ret < 0 && ret != AVERROR_EOF) {
     258           0 :         av_log(NULL, AV_LOG_ERROR, "Error: %s\n", av_err2str(ret));
     259           0 :         return 1;
     260             :     }
     261           1 :     return 0;
     262             : }

Generated by: LCOV version 1.13