Java 非同期 I/O のデザインパターン (クライアント編)

New I/O の非同期処理を実装する時にいつも使うパターンの個人的設計まとめ。非同期 I/O はアプリ要件によって設計を柔軟に変える必要があるので定石というわけではありませんし、安全に組み替えるにはそれなりの知識が必要です。
この記事が説明のベースにしているサンプルソースMy Design Pattern for Asyncronous I/O · GitHub に置いてあります。

非同期 I/O といえば 1 つのスレッドで複数のソケット I/O を管理する方法です。

非同期 I/O を生かした設計というものは必ず Producer/Consumer 型、イベント駆動型設計となります。これはパフォーマンスと引き換えにオブジェクト指向の汎用設計化を低下させ、特定の困難なバグを生む余地を増やします。個人的には C10K 問題を想定する必要がない程度 (同時に扱うソケット数が 1000 やそこいら程度) であればソケット数分のスレッドで同期 I/O を使用する方法をお勧めしております。

仮定要件と設計

この記事で仮定するクライアントの挙動は以下の通り:

  • 送信した 1 行分のデータをそのまま返信してくる echo サービスに対するクライアント。つまりデータ単位が「行」でその区切りは改行 (CRLF, CR, LF 兼用)。
  • クライアントの実装は 1 秒ごとに任意の文字列を送信し、それが返ってくることを期待する (任意のタイミングで送信データが発生する事を想定)。
  • あるデータに対する応答が返ってくるまで次の行の送信は行わない (パイプライン化を行わない)。
  • 1スレッドで複数のクライアント (通信端点; ソケット) を操作する。クライアントは任意のタイミングで参加が可能。
  • クライアントの処理中に例外が発生したらそのクライアントのみを切り捨てる。全体をダウンさせない。


各クラスの役割は以下の通り:

Client
送信データが発生した時の通知と、送信データの参照、データ受信時の通知をアプリケーション側で実装するための抽象クラス。インスタンス内部に SocketChannel と I/O 用のバッファを保持している。通信端点。
Worker
Selector を保持し送受信可能になった Client に対して送受信処理を行わせるためのスレッド。

Worker スレッド

Worker スレッドのインスタンス変数とコンストラクタ:

public class Worker extends Thread {
  private final Selector selector;
  private final List<Client> queue = new ArrayList<Client>();
  public Worker() throws IOException {
    selector = Selector.open();
  }
  …
}
クライアントの追加

実行中の Worker スレッドへ新しいクライアントを追加するにはキューを使用します。これは Selector#select() と SelectableChannel#register() が同じ Mutex を使用しているため、Worker スレッド内で select() 待機中に別スレッドから register() を行うと処理が停止する (設計によってはデッドロックとなる) ためです。このため、キュー経由でクライアントを渡し Worker スレッド内で register() を行うよう実装しています。

public void add(Client endpoint){
  synchronized(queue){
    queue.add(endpoint);
  }
  selector.wakeup();
}
select 処理

Worker スレッドが扱うソケットのいずれかが OP_READ/OP_WRITE の状態になったかを知るには Selector#select() を使用します。ただしそれ以外のも以下のケースで呼び出しが終了します (詳しくは API リファレンス参照):

  1. Selector#wakeup() が呼び出された (この設計では新しいクライアントがキューに投入された)。
  2. スレッドが割り込まれた (この設計では Worker スレッドに終了要求が出た)。

select() から処理が戻ってきた後に Worker スレッドの終了要求が出されていないかを調べています。次にキューに保存されているクライアントを全て Selector に登録しています。try ブロックで囲っていますが、この中で発生する IOException は Selector が予期しない状態に陥っているケースです。

private boolean select(){
  try {
    selector.select();
    if(this.isInterrupted()){
        return false;
      }
    synchronized(queue){
      while(! queue.isEmpty()){
        Client endpoint = queue.remove(0);
        SelectionKey key = endpoint.join(selector);
        key.attach(endpoint);
      }
    }
    return true;
  } catch(IOException ex){
    ex.printStackTrace();
    return false;
  }
}
送受信の実行
  1. 上記 select() メソッドが false で終了したら Worker スレッドの処理を終了。
  2. selector から通知が行えるようになった SelectionKey のセットを参照。
  3. Iterator を使用して列挙。列挙済みのものは it.remove() で通知対象から削除する (これを行わないと interestOpts() で通知フラグを下ろしても次回以降に残ってしまう仕様)。
  4. SelectionKey から処理対象の Client インスタンスを参照して read() または write() を実行。
  5. 例外が発生した場合はそのクライアントのみを close() して切り離し。

Client インスタンスがそれぞれバッファを持っているため、読み出しに使用する ByteBuffer はスレッドに対して一つあればよい。また 1024 というサイズにあまり意味はなく、一度に想定する受信データサイズを想定して 数バイト〜channel.socket().getReadBufferSize() バイトの範囲にあれば良いと思います。

@Override
public void run(){
  ByteBuffer readBuffer = ByteBuffer.allocate(1024);
  while(true){
    if(! select()){
      break;
    }
    Set<SelectionKey> keys = selector.selectedKeys();
    Iterator<SelectionKey> it = keys.iterator();
    while(it.hasNext()){
      SelectionKey key = it.next();
      it.remove();
      Client endpoint = (Client)key.attachment();
      try {
        if(key.isReadable()){
          endpoint.read(readBuffer);
        } else if(key.isWritable()){
          endpoint.write();
        }
      } catch(Exception ex){
        ex.printStackTrace();
        endpoint.close();
      }
    }
  }
}

Client クラス

インスタンス変数とコンストラクタは以下の通り:

public abstract class Client {
  private final SocketChannel channel;
  private ByteBuffer out = null;
  private final ByteArrayOutputStream inBuffer = new ByteArrayOutputStream();
  private SelectionKey key = null;
  private volatile boolean writeReady = false;

  public Client(String server, int port) throws IOException{
    this(new InetSocketAddress(server, port));
  }
  public Client(SocketAddress address) throws IOException{
    this(SocketChannel.open(address));
  }
  public Client(SocketChannel channel) throws IOException{
    this.channel = channel;
    channel.configureBlocking(false);
  }
  …
}
- in は Client ではなく Worker で良い。
  • 送信用 ByteBuffer は送信ごとにデータ長が変わるため都度作成しています。
  • writeReady のフラグはサブクラス (クライアント実装) が送信データの準備ができた時に ture となるフラグです。この変数のみ Worker スレッド以外から参照/変更されるため volatile 属性を付加しています。
クライアント実装部分

abstract 宣言されているものはサブクラスで実装するものです。
select() 待機中の selector に OP_XXX を変更した場合は wakeup() で呼び起こす必要があります。

public abstract void init();
public abstract String send();
public abstract void receive(String line);
protected void writeReady(boolean ready){
  writeReady = ready;
  if(ready){
    key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
    key.selector().wakeup();
  }
}
データの送信

データ送信部分を実装する上での注意点など。

  • 一度の SocketChannel#write() 呼び出しで ByteBuffer 内の全データが受け付けられるとは限らない。渡し切れなかった残りのデータを次回以降の送信可能時のために保持しておく必要があります。
  • write() の呼び出しが完了しても ByteBuffer 内の該当データが送信されている保証はありません。ただし送信内容は OS かソケットの送信バッファにコピーされているため、送信要求が完了した ByteBuffer の領域を即時書き換えたとしても直前の送信には影響はありません。
public void write() throws IOException {
  if(out == null){
    String data = send() + "\r\n";
    out = ByteBuffer.wrap(data.getBytes("UTF-8"));
  }
  channel.write(out);
  if(out.remaining() == 0){
    out = null;
    if(! writeReady){
      key.interestOps(SelectionKey.OP_READ);
    }
  }
}
  1. 送信バッファにデータが残っていない場合はサブクラスにデータを要求 (write() が呼び出された時点でクライアント側には送信データがある)。
  2. チャネルに対して非同期出力を実行。
  3. 内部バッファのデータを送信し終えたら参照を切り離す。この時、実装側がこれ以上送信データを持っていなければ OP_WRITE フラグを下ろす。
データの受信

データ受信部分を実装する上での注意点など:

  • 一度の読み込みでアプリケーションの定義するデータ単位を読み出せるとは限らない。逆に一度の読み出しで複数のデータ単位を読み出す可能性もある。このため ByteBuffer とは別の可変長バッファ (ByteArrayOutputStream) にデータ区切りを検出するまでのデータを保持する。
  • データ単位の区切り (例えば改行など) を検出した時点で可変長バッファを文字列化し実装側へコールバックする。
public void read() throws IOException{
  channel.read(in);
  in.flip();
  while(in.remaining() > 0){
    byte ch = in.get();
    if(ch != '\r' && ch != '\n'){
      inBuffer.write(ch & 0xFF);
    } else {
      if(ch == '\r' && in.remaining() > 0 && in.get() != '\n'){
        in.position(in.position() - 1);
      }
      String line = inBuffer.toString("UTF-8");
      inBuffer.reset();
      receive(line);
    }
  }
  in.clear();
}
  1. ByteBuffer へ読み出し。
  2. 読み込んだデータを 1 バイトごとに可変長バッファへ移動。この時、改行文字を検出したらその部分までを文字列化し実装側に通知する。

ちなみにこの実装だと CR と LF の間に読み込みの分割が入ったらバグになりますね…

切断の検知

サーバ側からの切断やネットワークによる切断は OP_READ の通知として呼び出されます。今回の設計では SocketChannel#read() からの例外発生時の挙動として処理しています。
切断時の処理が必要な場合は close() をオーバーライドする必要があります。

通知のタイミング

SelectionKey.OP_READ, SelectionKey.OP_WRITE を使用した書込可能と読込可能通知に注意する必要がある。
OP_READ フラグは相手がデータを送ってきた時のみ select() が働くため常に立てっぱなしという設計でも問題になる事はあまりない。しかし OP_WRITE に関しては殆どの時刻で書き込み可能であるため上げ下げを適時行わなければならない。

監視するソケットを追加

Selector#select() と SelectableChannel#register() は共に同じ Mutex を使用するため、スレッド A が select() 中にスレッド B で register() を行うとスレッド B が停止し設計によってはデッドロックを引き起こす。このため、別スレッドからのソケット追加はキューを使用しなければならない。

private final List<SocketChannel> queue = new ArrayList<SocketChannel>();
public void join(SocketChannel channel){
  synchronized(queue){
    queue.add(channel);
  }
  selector.wakeup();
  return;
}
int count = selector.select();
if(count == 0){
  synchronized(queue){
    if(queue.size() > 0){
      while
    } else {
      break;
    }
  }
}
クライアント実装

今回の設計でクライアント側で実装する処理は以下の通り:

  1. 1 秒ごとに送信可能な状態へ遷移するためにタイマーを設定
  2. 送信データが参照されるたびに送信可能フラグを下ろす。
  3. 受信したデータを標準出力へ表示。
class TimeProducer extends Client{
  private final Timer timer = new Timer();
  public TimeProducer() throws IOException{
    super("localhost", 8976);
    timer.scheduleAtFixedRate(new TimerTask(){
      @Override
      public void run(){
        writeReady(true);
      }
    }, 1000, 1000);
  }
  @Override
  public void init(){ }
  @Override
  public String send() {
    writeReady(false);
    return DateFormat.getDateTimeInstance().format(new Date());
  }
  @Override
  public void receive(String line) {
    System.out.println(line.length());
  }
}