libuvに関する覚書(4) : スレッド uv_work_t, uv_async_t

関連投稿

libuvに関する覚書(1) : タイマー uv_timer_t
libuvに関する覚書(2) : ファイル監視 uv_fs_event_t
libuvに関する覚書(3) : パイプ uv_pipe_t

uv_queue_workuv_work_tを使ってスレッドを複数生成して、並列処理を行う。また、uv_async_tで途中の進捗状況をコールバックするサンプル。

uv_work_tuv_async_tはdataというvoidポインタのメンバがあるので、データの受け渡しはこの変数を通じて行う。サンプルでは3つのスレッドを生成してループでスレッドを開始している。

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <uv.h>

uv_loop_t *loop;

typedef struct values_ {
   int no;
   int input;
   int result;
   uv_async_t async;
} values_t;

int main(int argc, char **argv) {
  printf("---main start\n");

  // initialize rand
  srand(time(NULL));

  loop = uv_default_loop();

  // スレッド開始
  uv_work_t requests[3];
  values_t values[3];
  for( int i = 0; i < 3; i++ ){

    values[i].no = i+1;
    values[i].input = rand() % 5 + 1; // 1 - 5
    values[i].result = 0;
    values[i].async.data = (void*)malloc(64);
    memset(values[i].async.data, 0, 64);
    requests[i].data = (void*)&values[i];

    printf("---thread start\n");
    uv_async_init(loop, &values[i].async, worker_progress );
    uv_queue_work( loop, &requests[i], worker, worker_after );
  }

  printf("---wait...\n");
  uv_run(loop, UV_RUN_DEFAULT);
  
  printf("---main end\n");
  return 0;
}

各コールバック関数の実装。サンプルでは、ランダムの値だけスリープして、途中の進捗状況を送信している。進捗状況は別のコールバック関数内でprintfしている。

//size_t bufsize = 10;
void worker(uv_work_t *req) {
  printf("---worker\n");

  values_t * values = (values_t *) req->data;

  for(int i = 0; i < values->input; i ++ ){
    
    values->result += values->input;
    sprintf( values->async.data, "no=%d, %d / %d progressed.",
      values->no, i+1, values->input );

    uv_async_send(&values->async);

    sleep(1);
  }

  printf("no=%d, input=%d, result=%d\n",
    values->no, values->input, values->result);
  return;
}

void worker_progress(uv_async_t *handle){
  printf("---worker_progress\n");
  char* str = (char *)(handle->data);
  printf("%s\n", str);
  
}

void worker_after(uv_work_t *req, int status) {
  printf("---worker_after\n");

  values_t * values = (values_t *) req->data;
  printf("status=%d, no=%d, input=%d, result=%d\n",
    status, values->no, values->input, values->result );
  
  uv_close((uv_handle_t*) &values->async, NULL);

  return;
}

実際の実行結果。ゴチャゴチャしているが、並列処理していることがわかる。

> clang -luv thread.c -o thread && ./thread
---main start
---thread start
---thread start
---worker
---thread start
---wait...
---worker
---worker
---worker_progress
no=1, 1 / 1 progressed.
---worker_progress
no=2, 1 / 5 progressed.
---worker_progress
no=3, 1 / 3 progressed.
no=1, input=1, result=1
---worker_after
status=0, no=1, input=1, result=1
---worker_progress
no=2, 2 / 5 progressed.
---worker_progress
no=3, 2 / 3 progressed.
---worker_progress
no=2, 3 / 5 progressed.
---worker_progress
no=3, 3 / 3 progressed.
no=3, input=3, result=9
---worker_after
status=0, no=3, input=3, result=9
---worker_progress
no=2, 4 / 5 progressed.
---worker_progress
no=2, 5 / 5 progressed.
no=2, input=5, result=25
---worker_after
status=0, no=2, input=5, result=25
---main end