Posted on 2014-03-27 tags: java, concurrent
QPS = Queries per Second, RPS = Requests per Second という言葉がある。読んで字の如し、秒間クエリ数を表す。例えば次のような HTTP アクセスログがあったとする。
1 | 127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326 |
このとき、ある一秒間を含むようなログの行数を数えれば QPS を出すことができる。
1 2 | $ grep '10/Oct/2000:13:55:36 -0700' access_log | wc -l 123 |
リクエストを送る側が受ける側の性能の問題だったりで「500 QPS くらいで送って」と頼まれるときがある。そこでうまい具合に QPS をコントロールしたいのだが、そんなに自明な問題ではないことに気づいた。素朴には「メインスレッドが 500 個スレッドを作ってそれぞれがリクエストを飛ばし、メインスレッドは1秒寝る」みたいなことをやればいいと考えるかもしれない。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | class Main { public static void main(String[] args) throws Exception { int arg = 0; while (true) { for (int i = 0; i < 500; ++i) { new Thread(new Task(arg++)).run(); } Thread.sleep(1000); } } static class Task implements Runnable { private int arg; public Task(int arg) { this.arg = arg; } public void run() { sendRequest(arg); } } } |
しかし、これではメインスレッドが Thread
や Runnable
のインスタンスを作る分遅れ、遅れたところから 1 秒間眠ることになる。これでは正しく QPS コントロールされているとはいえない。ちょっとしたテストならそれでも良いかもしれないが、規模が大きくなるにつれて無視できないことになる。
また、リクエストを送ったがネットワークの気まぐれで失敗した場合にリトライしたいこともある。リトライのロジックを同じスレッドの中で行うとすると次のようになる。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | class Main { public static void main(String[] args) throws Exception { int arg = 0; while (true) { for (int i = 0; i < 500; ++i) { new Thread(new Task(arg++)).run(); } Thread.sleep(1000); } } static class Task implements Runnable { private int arg; public Task(int arg) { this.arg = arg; } public void run() { try { sendRequest(arg); } catch (Exception e) { run(); } } } } |
しかしこれではリクエストが失敗したらすぐに同じリクエストが飛ぶことになり、これも QPS が狂ってしまうことになる。
改めてきちんと問題を定義する。
java.util.Timer
の scheduleAtFixedRate
というメソッドを使う。これを使って 1
秒ごとに実行されるようにしておけば、毎秒きっちり 1 回の頻度で実行されるようになる (イレギュラーなスケジューリングは考えない) 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | import java.util.Timer; import java.util.TimerTask; class Main { public static void main(String[] args) { Timer timer = new Timer(); Task task = new Task(); timer.scheduleAtFixedRate(task, 0L, 1000L); } static class Task extends TimerTask { public void run() { System.out.println("run!"); } } } |
これで 1 秒に 1 回の割合でタスクが実行されることになる。実際にはこのタスクの中で n 回のリクエストを行いたいので、固定頻度で呼ばれる Runner と実際にリクエストを行う Task を分け、次のようになる。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | import java.util.Timer; import java.util.TimerTask; class Main { static final int n = 500; static int arg = 0; public static void main(String[] args) { Timer timer = new Timer(); Runner runner = new Runner(); timer.scheduleAtFixedRate(runner, 0L, 1000L); } static class Runner extends TimerTask { public void run() { for (int i = 0; i < n; ++i) { new Thread(new Task(arg++)).run(); } } } static class Task implements Runnable { private int arg; public Task(int arg) { this.arg = arg; } public void run() { sendRequest(arg); } } } |
失敗した時のリトライを同じスレッドで行おうとするとどうしても QPS が狂ってしまうことは明らかである。そこでタスクキューにタスクを追加していき、次の実行の際にまとめて実行するという手段が考えられる。タスクのインスタンスはリクエストを送る際の引数とリトライ回数を保持している。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 | import java.util.LinkedList; import java.util.Timer; import java.util.TimerTask; import java.util.Queue; class Main { static final int n = 500; static final int m = 3; static int arg = 0; static Queue<Thread> tasks = new LinkedList<Thread>(); public static void main(String[] args) { Timer timer = new Timer(); Runner runner = new Runner(); timer.scheduleAtFixedRate(runner, 0L, 1000L); } static class Runner extends TimerTask { public void run() { while (tasks.size() < n) { tasks.offer(new Thread(new Task(arg++, m))); } for (int i = 0; i < n; ++i) { Thread task = tasks.poll(); task.run(); } } } static class Task implements Runnable { private int arg; private int nRetry; public Task(int arg, int nRetry) { this.arg = arg; this.nRetry = nRetry; } public void run() { try { sendRequest(arg); } catch (Exception e) { if (nRetry > 0) { nRetry--; tasks.offer(new Thread(this)); } else { System.out.println("arg = " + arg + " tried " + m + " times, but failed."); } } } } } |
しかしこの実装では、メインスレッドの他にタスクに失敗したスレッドからもアクセスされるため、それが同時に発生するとまずい。そこで同期化されたタスクキューを使う。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 | import java.util.concurrent.LinkedBlockingQueue; import java.util.Timer; import java.util.TimerTask; import java.util.Queue; class Main { static final int n = 500; static final int m = 3; static int arg = 0; static Queue<Thread> tasks = new LinkedBlockingQueue<Thread>(); public static void main(String[] args) { Timer timer = new Timer(); Runner runner = new Runner(); timer.scheduleAtFixedRate(runner, 0L, 1000L); } static class Runner extends TimerTask { public void run() { while (tasks.size() < n) { tasks.offer(new Thread(new Task(arg++, m))); } for (int i = 0; i < n; ++i) { Thread thread = tasks.poll(); thread.run(); } } } static class Task implements Runnable { private int arg; private int nRetry; public Task(int arg, int nRetry) { this.arg = arg; this.nRetry = nRetry; } public void run() { try { sendRequest(arg); } catch (Exception e) { if (nRetry > 0) { nRetry--; tasks.offer(new Thread(this)); } else { System.out.println("arg = " + arg + " tried " + m + " times, but failed."); } } } } } |
以上で QPS コントロールの要件は満たされたはずだが、さらに改善の余地がある。何度もスレッドを作っている点だ。これまでの実装ではスレッドが使い捨てになっており、毎回新しくスレッドを作るという点でパフォーマンス的に問題がある。そこで数の固定されたスレッドプールを使う。
java.util.concurrent.ExecutorService
はスレッドプールを扱う API である。この
invokeAll
メソッドを使えば、引数の Iterable
に入ったタスクを一気に発火することができる。その後タスクキューをクリアすればよい。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 | import java.util.concurrent.Callable; import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.Timer; import java.util.TimerTask; import java.util.Queue; class Main { static final int n = 500; static final int m = 3; static int arg = 0; static LinkedBlockingQueue<Task> tasks = new LinkedBlockingQueue<Task>(); static ExecutorService exe = Executors.newFixedThreadPool(n); public static void main(String[] args) { Timer timer = new Timer(); Runner runner = new Runner(); timer.scheduleAtFixedRate(runner, 0L, 1000L); } static class Runner extends TimerTask { public void run() { while (tasks.size() < n) { tasks.offer(new Task(arg++, m)); } try { exe.invokeAll(tasks); tasks.clear(); } catch (Exception e) { e.printStackTrace(); } } } static class Task implements Callable<Void> { private int arg; private int nRetry; public Task(int arg, int nRetry) { this.arg = arg; this.nRetry = nRetry; } public Void call() { try { sendRequest(arg); } catch (Exception e) { if (nRetry > 0) { nRetry--; tasks.offer(this); } else { System.out.println("arg = " + arg + " tried " + m + " times, but failed."); } } return null; } } } |
しかしこれでも、失敗時のタスクキュー追加のタイミングによって次の問題がある。
invokeAll
の直前に追加された場合、追加されたぶんも含めて invoke
されてしま
うinvokeAll
の直後に追加された場合、そのタスクが消えてしまう解決策として次の2通りがある。
synchronized
で同期化するここでは後者を採用する。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 | import java.util.concurrent.Callable; import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.ArrayList; import java.util.Timer; import java.util.TimerTask; import java.util.Random; class Main { static final int n = 500; static final int m = 3; static int arg = 0; static LinkedBlockingQueue<Task> tasks = new LinkedBlockingQueue<Task>(); static ExecutorService exe = Executors.newFixedThreadPool(n); public static void main(String[] args) { Timer timer = new Timer(); Runner runner = new Runner(); timer.scheduleAtFixedRate(runner, 0L, 1000L); } static class Runner extends TimerTask { public void run() { while (tasks.size() < n) { tasks.offer(new Task(arg++, m)); } ArrayList<Task> toInvoke = new ArrayList<Task>(); for (int i = 0; i < n; ++i) { toInvoke.add(tasks.poll()); } try { exe.invokeAll(toInvoke); } catch (Exception e) { e.printStackTrace(); } } } static class Task implements Callable<Void> { private int arg; private int nRetry; public Task(int arg, int nRetry) { this.arg = arg; this.nRetry = nRetry; } public Void call() { try { sendRequest(arg); } catch (Exception e) { if (nRetry > 0) { nRetry--; tasks.offer(this); } else { System.out.println("arg = " + arg + " tried " + m + " times, but failed."); } } return null; } } } |
失敗したときに何度かやり直すことを許すようなリクエストを送るときの QPS コントロールについて、問題を考察した。
これらについてそれぞれ解決策を示し、最終的にこの問題における設計パターンとコーディング例を示した。動作確認に使った動く完全なコード例を Gist にあげておく。
この実装の問題として、多くの場合毎秒処理されるタスクは1秒間のうち最初のほうに固まってしまい、いびつな間隔で実行されてしまうということがある。できれば 1 秒間のうちでうまく分散して実行されるようにしたい。
comments powered by Disqus