libevの使い方(自分用)

libev まとめ
koblas/echo.cxx

上記の素敵なサンプルを自分でコーディングしただけの備忘録

ev::statでファイルの書き込みを監視する

指定したファイルを監視して書き込まれたらコンソールに表示するtailコマンドのようなサンプル。

ファイルのopenreadをクラス内部で行い、コールバック用のメソッドvoid evstat_t::read( ev::stat & watcher, int event )を定義する。

// evstat_t.h
#ifndef EVSTAT_T_
#define EVSTAT_T_
#include <array>
#include <ev++.h>

class evstat_t {
  public:

  evstat_t(const char * path);
  ~evstat_t();
  void read(ev::stat & watcher, int event);

  private:
  
  const static std::size_t BUFSIZE = 4096;
  std::array<char, BUFSIZE> buffer_;
  int fd_;
};
#endif
// evstat_t.cpp
#include "evstat_t.h"
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <iostream>

using namespace std;

//---------------------------------
// evstat_t
//---------------------------------
evstat_t::evstat_t(const char * path) 
{
  fd_ = open(path, O_RDONLY);
}
//---------------------------------
// ~evstat_t
//---------------------------------
evstat_t::~evstat_t(){
  cout<<"-- destructor --"<<endl;
  if( fd_ > 0 ){
      close( fd_ );
  }
}
//---------------------------------
// read
//---------------------------------
void evstat_t::read( ev::stat & watcher, int event ){
  cout<<"-- read --"<<endl;
  cout<<"event="<<event<<endl;
  cout<<"ev::stat.attr.st_nlink="<<watcher.attr.st_nlink<<endl;
  cout<<"ev::stat.attr.st_size="<<watcher.attr.st_size<<endl;
  // read file
  ssize_t read_bytes = ::read(fd_, buffer_.data(), buffer_.size() - 1);
  cout<<"read_bytes="<<read_bytes<<endl;
  
  if (read_bytes == -1) {
    cout<<"failed to read()"<<endl;
    return;
  }
  if (read_bytes == 0) {
    cout<<"close file"<<endl;
    return;
  }
  buffer_[read_bytes] = '\0';
  cout << buffer_.data() <<flush;
  return;
}

ev::default_loopを生成して、ev::stat::setで監視対象とコールバックを登録する。その後、ev::stat::start()ev::defalut_loop.run(0)でイベントループを開始する。

//test.cpp
#include <iostream>
#include "evstat_t.h"
#include "evtimer_t.h"
#include "evioserv_t.h"

using namespace std;
//-----------------------
// main
//-----------------------
int main(int argc, char **argv) 
{
  const char* file_path = argv[1];
  evstat_t evstat( file_path );

  // create loop and watcher
  ev::default_loop loop;
  ev::stat watcher(loop);

  // set callback and target descriptor
  watcher.set<evstat_t, &evstat_t::read>( &evstat );
  ev::tstamp interval = 0;
  watcher.set( file_path, interval );

  // start watch and loop
  cout<<"watcher.start"<<endl;
  watcher.start();
  cout<<"loop.run"<<endl;
  loop.run(0);
}

ev::timerを使ったタイマー機能

statよりも単純な実装になる。基本的にタイマーのコールバックvoid awake(ev::timer & watcher, int event)を定義するだけ。

//evtimer_t.h
#ifndef EVTIMER_T_
#define EVTIMER_T_
#include <array>
#include <ev++.h>

class evtimer_t {
  public:

  evtimer_t( const double & timeout );
  ~evtimer_t();

  void awake(ev::timer & watcher, int event);

  private:
  double timeout_;
};

#endif
//evtimer_t.cpp
#include "evtimer_t.h"
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <iostream>

using namespace std;

//---------------------------------
// evtimer_t
//---------------------------------
evtimer_t::evtimer_t( const double & timeout ) :
timeout_(timeout){
}

//---------------------------------
// ~evtimer_t
//---------------------------------
evtimer_t::~evtimer_t(){
}
//---------------------------------
// awake
//---------------------------------
void evtimer_t::awake( ev::timer & watcher, int event ){
  cout<<"-- awake --"<<endl;
  
  // next timeout
  cout<<"count="<<timeout_<<endl;
  watcher.set(timeout_);
  watcher.start();

  return;
}

使い方もev::statとほぼ同じ。下記例だと2つのタイマーを同時に実行している。

//test.cpp
#include <iostream>
#include "evstat_t.h"
#include "evtimer_t.h"
#include "evioserv_t.h"

using namespace std;
//-----------------------
// main
//-----------------------
int main(int argc, char **argv) 
{
  ev::default_loop loop;

  const double count1 = 1.5;
  ev::timer watcher1(loop);
  evtimer_t evtimer1(count1);
  watcher1.set<evtimer_t, &evtimer_t::awake>( &evtimer1 );
  watcher1.set(count1);
  watcher1.start();

  const double count2 = 2.0;
  ev::timer watcher2(loop);
  evtimer_t evtimer2(count2);
  watcher2.set<evtimer_t, &evtimer_t::awake>( &evtimer2 );
  watcher2.set(count2);
  watcher2.start();

  loop.run(0);
}

ev::ioを使ったEchoサーバー

ev::ioはデスクリプタのI/Oを監視してイベントを発生させることができる。

例では、AF_INET(IPV4)でTCP/IPのソケットを生成してev::ioに監視させて、ソケットの入力を読み取って、そのまま出力に書き込んでいる。バッファ用のbuffer_tクラス、個々のクライアントとの接続をインスタンス化するevio_tクラス、Echoサーバーになるevioserv_tの3つのクラスで構成されている。

buffer_tクラス

#ifndef EVIOSERV_T_
#define EVIOSERV_T_
#include <array>
#include <ev++.h>
#include <string>
#include <sys/socket.h>
#include <netinet/in.h>
#include <map>
#include <memory>
#include <list>

class buffer_t{
  private:
  char *data;
  ssize_t len;

  public:
  ssize_t pos;
  buffer_t(const char *bytes, ssize_t nbytes);
  virtual ~buffer_t();
  char *dpos();
  ssize_t nbytes();
  const char* to_str();
};
#include "evioserv_t.h"
#include <unistd.h>
#include <fcntl.h>
#include <iostream>

using namespace std;

int evio_t::total_clients = 0;
//-----------------------
// buffer_t class
//-----------------------
buffer_t::buffer_t(const char *bytes, ssize_t nbytes){
  pos = 0;
  len = nbytes;
  data = new char[nbytes];
  memcpy(data, bytes, nbytes);
}
//-----------------------
// ~buffer_t
//-----------------------
buffer_t::~buffer_t(){
  delete [] data;
  data = NULL;
}
//-----------------------
// dpos
//-----------------------
char * buffer_t::dpos(){
  return data + pos;
}
//-----------------------
// nbytes
//-----------------------
ssize_t buffer_t::nbytes(){
  return len - pos;
}
//-----------------------
// to_str
//-----------------------
const char* buffer_t::to_str(){
  return data;
}

evio_tクラス

入力されたデータは、std::list<T>でキューとして保持される。ソケットに書き込むときにキューから取り出して送信する。入出力はcallbackですべてコールバックさせて、reventsの値を見てREADなのかWRITEなのかを判定してから、各種イベント処理を行う。

class evio_t{
  private:

  ev::io io;
  static int total_clients;
  int sfd;

  std::list<buffer_t*> write_queue;

  public:

  evio_t( int socketid );
  virtual ~evio_t();

  void callback( ev::io & watcher, int revents );
  void write_callback(ev::io &watcher);
  void read_callback(ev::io &watcher);
};

ソース内のコメントにもあるが、外部からの入力のみを監視するだけなら、io.set(ev::READ)だけでよい。以下の例だと、キューにデータがないときはev::READで待ち構えており、データが入力された時だけev::WRITEを有効にして書き込みイベントを発生させる?ような動きになっている。

//-----------------------
// evio_t class
//-----------------------
evio_t::evio_t( int socketid )
: sfd( socketid ){

  cout<<"create new connection. descriptor="<<socketid
    <<" "<<total_clients<<" clients connected."<<endl;

  fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL, 0) | O_NONBLOCK);
  total_clients++;
  io.set<evio_t, &evio_t::callback>(this);
  io.start(sfd, ev::READ);
}
//-----------------------
// ~evio_t
//-----------------------
evio_t::~evio_t(){
  
  cout<<"delete this connection. descriptor="<<sfd
    <<" "<<total_clients<<" clients connected."<<endl;

  io.stop();
  close(sfd);
}
//-----------------------
// callback
//-----------------------
void evio_t::callback( ev::io & watcher, int revents ){
  cout<<"[evio_t::callback]"<<endl;
  cout<<"revents="<<revents<<endl;

  if(EV_ERROR & revents){
    perror("[error] revents has EV_ERROR");
    return;
  }

  if( revents & EV_READ ){
    read_callback(watcher);
  }
  if( revents & EV_WRITE ){
    write_callback(watcher);
  }

#if 1
  // 基本的にev::READだが、
  // キューがあるときだけev::WRITEにする
  if(write_queue.empty()){
    cout<<"set ev::READ"<<endl;
    io.set(ev::READ);
  }else{
    cout<<"set ev::READ | ev::WRITE"<<endl;
    io.set(ev::READ|ev::WRITE);
  }
#else
  // ev::READだけだと、常に外部から書き込まれたデータを
  // Readするだけになる
  io.set(ev::READ);
#endif
}
//-----------------------
// read_callback
//-----------------------
void evio_t::read_callback(ev::io &watcher){
  cout<<"-- evio_t::read_callback --"<<endl;
  char buffer[1024];
  ssize_t nread = recv(watcher.fd, buffer, sizeof(buffer), 0);
  if( nread < 0 ){
    perror("nread < 0, read error");
    return;
  }
  if( nread == 0 ){
    cout<<"nread = 0, close this connection."<<endl;
    delete this;
  }else{
    cout<<"[input]"<<endl;
    cout<<"buffer size = "<<nread<<" bytes."<<endl;
    buffer_t *buf = new buffer_t(buffer, nread);
    cout<<buf->to_str()<<endl;
    write_queue.push_back( buf );
    cout<<"queue size = "<<write_queue.size()<<endl;
  }
}
//-----------------------
// write_callback
//-----------------------
void evio_t::write_callback(ev::io &watcher){
  cout<<"-- evio_t::write_callback --"<<endl;
  if(write_queue.empty()){
    cout<<"empty queue."<<endl;
    io.set(ev::READ);
    return;
  }
  
  buffer_t * buffer = write_queue.front();
  ssize_t written = write(watcher.fd, buffer->dpos(), buffer->nbytes());
  if( written < 0 ){
    perror("written < 0, write error");
    return;
  }
  
  buffer->pos += written;

  cout<<"buffer :"<<buffer->nbytes()<<"/"<<written<<endl;

  if( buffer->nbytes() == 0 ){
    write_queue.pop_front();
    delete buffer;
    buffer = NULL;
  }
}

evioserv_tクラス

class evioserv_t{
  private:

  int socketid;
  ev::io io;
  ev::sig sio;

  public:

  evioserv_t(int port);
  virtual ~evioserv_t();

  void accept(ev::io &watcher, int revents);
  static void signal_callback(ev::sig &signal, int revents);

};

ソケットを生成するときに、familyにAF_INETを指定してIPV4アドレスでTCP/IP通信できるようにする。ポート番号はテストコマンドで指定できるようにしている。

//-----------------------
// evioserv_t class
//-----------------------
evioserv_t::evioserv_t(int port){

  socketid = socket(AF_INET, SOCK_STREAM, 0);
  struct sockaddr_in addr;
  addr.sin_family = AF_INET;
  addr.sin_port = htons(port);
  addr.sin_addr.s_addr = INADDR_ANY;

  // bind
  int ret = ::bind(socketid, (struct sockaddr *)&addr, sizeof(addr));
  if( ret != 0 ){
    perror("bind error");
  }

  fcntl(socketid, F_SETFL, fcntl(socketid, F_GETFL, 0) | O_NONBLOCK);
  listen(socketid, 5);

  io.set<evioserv_t, &evioserv_t::accept>(this);
  io.start(socketid, ev::READ);

  sio.set<&evioserv_t::signal_callback>();
  sio.start(SIGINT);

  cout<<"Listening on port "<<port<<endl;
}
//-----------------------
// ~evioserv_t
//-----------------------
evioserv_t::~evioserv_t(){
  cout<<"evioserv_t::~evioserv_t"<<endl;
  ::shutdown(socketid, SHUT_RDWR);
  ::close(socketid);
}
//-----------------------
// accept
//-----------------------
void evioserv_t::accept(ev::io &watcher, int revents){
  if (EV_ERROR & revents) {
    cout<<"[error] invalid event = "<<revents<<endl;
    return;
  }
  struct sockaddr_in client_addr;
  socklen_t client_len = sizeof(client_addr);
  // accept
  int client_sd = ::accept(watcher.fd, (struct sockaddr*)&client_addr, &client_len);
  if( client_sd < 0 ){
    cout<<"[error] failed to accept."<<endl;
    return;
  }

  evio_t * evio = new evio_t(client_sd);
}
//-----------------------
// signal_callback
//-----------------------
void evioserv_t::signal_callback(ev::sig &signal, int revents){
  cout<<"[signal_callback]"<<endl;
  signal.loop.break_loop();
  cout<<"break loop"<<endl;
}

ev::ioのテストツール

int main(int argc, char **argv) 
{
  int port = 8192;
  ev::default_loop loop;
  evioserv_t echo(port);
  loop.run(0);
  return 0;
}

テストツールを起動したら、telnetで接続して文字列を入力&Enterすると、サーバー側に送信される。そしてサーバー側の応答として入力した文字列がそのまま返ってくるサンプルで、対話式のアプリケーションなどに応用できそう。

> ./test
-- start test --
Listening on port 8192

別のターミナルを起動して、telnetで接続する。

> telnet localhost 8192
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
abc              <-- 入力した文字
abc              <-- サーバーから応答された文字列

サーバー側のコンソールログ

create new connection. descriptor=6 0 clients connected.

[evio_t::callback]
revents=1
-- evio_t::read_callback --
[input]
buffer size = 10 bytes.
fdasfdsa

queue size = 1
set ev::READ | ev::WRITE

[evio_t::callback]
revents=2
-- evio_t::write_callback --
buffer :0/10
set ev::READ