ConcurrentLinkedQueue(ロックフリーキュー) — 高スループット
複数スレッドから同時に追加・取得しても待ち時間がほぼ発生しない、ロックフリーな待ち行列が ConcurrentLinkedQueue。FIFO順を保ちつつ、非ブロッキングで高いスループットを発揮します。初心者向けに、使い方・落とし穴・実用テンプレートをまとめます。
特性と前提
- 非ブロッキング: 追加・取得が基本的に待たない。ロックで直列化しないためスループットが高い。
- FIFO順: 先に入れた要素が先に取り出される(先頭は滞在時間が最長の要素)。
- スレッドセーフ: 複数スレッドからの add/offer/poll/peek を安全に扱える。
- null禁止: 要素に null は入れられない。
- サイズ近似:
size()は正確でない可能性がある(並行更新中は近似値)。サイズに依存した待機ロジックは避ける。 - イテレーション: 走査は弱一貫性で、ConcurrentModificationException を出さず進むが「最新すべて」を見られる保証はない。
- ブロッキングしない: 空でも
poll()は即座に null を返す。待ちたいならLinkedBlockingQueueなどの「BlockingQueue」を使う。
基本コード例(プロデューサー・コンシューマー)
import java.util.concurrent.ConcurrentLinkedQueue;
public class CLQBasics {
public static void main(String[] args) {
ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<>();
// 追加(複数スレッドから呼んでOK)
q.offer("Task-1");
q.offer("Task-2");
// 取り出し(FIFO)
System.out.println(q.poll()); // Task-1
System.out.println(q.poll()); // Task-2
System.out.println(q.poll()); // null(空)
}
}
Java- ポイント:
offerで追加、pollで先頭取得(空なら null)、peekは参照のみ。
複数スレッドでの利用例(実用ミニサンプル)
import java.util.concurrent.*;
public class CLQWorkers {
private final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
private volatile boolean running = true;
public void submit(Runnable task) { queue.offer(task); }
public void startWorkers(int nThreads) {
for (int i = 0; i < nThreads; i++) {
Thread t = new Thread(() -> {
while (running) {
Runnable task = queue.poll();
if (task != null) {
task.run();
} else {
// 空なら短い休憩(スピンしすぎ防止)
try { Thread.sleep(1); } catch (InterruptedException ignored) {}
}
}
}, "worker-" + i);
t.start();
}
}
public void stop() { running = false; }
public static void main(String[] args) {
CLQWorkers ex = new CLQWorkers();
ex.startWorkers(2);
ex.submit(() -> System.out.println("A"));
ex.submit(() -> System.out.println("B"));
ex.submit(() -> System.out.println("C"));
try { Thread.sleep(50); } catch (InterruptedException ignored) {}
ex.stop();
}
}
Java- ポイント: 非ブロッキングゆえ「待機」は自前で調整。空の場合は短いスリープなどでCPU燃焼を防ぐ。
操作テンプレート集
- 基本宣言
ConcurrentLinkedQueue<Type> q = new ConcurrentLinkedQueue<>();
Java- 追加・取得
q.offer(x); // 末尾へ追加(失敗しない設計)
Type v = q.poll(); // 先頭から取得(空なら null)
Type h = q.peek(); // 先頭参照のみ(空なら null)
Java- 空判定・サイズ(近似)
boolean empty = q.isEmpty();
int approx = q.size(); // 近似値。厳密ロジックには使わない
Java- バルク取り出し(安全なドレイン)
Type v;
while ((v = q.poll()) != null) {
// v を処理
}
Java- for-each(弱一貫性)
for (Type t : q) { /* 途中で追加・削除されても例外は出ないが完全性は保証されない */ }
Java例題で理解する
例題1: 高速ログバッファ(非ブロッキング投入→定期的にドレイン)
import java.util.concurrent.*;
class LogBuffer {
private final ConcurrentLinkedQueue<String> buf = new ConcurrentLinkedQueue<>();
public void record(String line) { buf.offer(line); }
public void flush() {
String s;
while ((s = buf.poll()) != null) {
System.out.println(s); // 実務ではファイルやシンクへ
}
}
}
public class LoggerDemo {
public static void main(String[] args) throws Exception {
LogBuffer lb = new LogBuffer();
ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
ses.scheduleAtFixedRate(lb::flush, 0, 100, TimeUnit.MILLISECONDS);
// 複数スレッドから投入しても低待機
ExecutorService es = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
int id = i;
es.submit(() -> lb.record("event-" + id));
}
es.shutdown();
es.awaitTermination(1, TimeUnit.SECONDS);
Thread.sleep(300);
ses.shutdown();
}
}
Java例題2: 単純ワークキュー(プロデューサーと単一コンシューマー)
ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<>();
// producer
new Thread(() -> {
for (int i = 0; i < 5; i++) q.offer("Job-" + i);
}).start();
// consumer
new Thread(() -> {
for (;;) {
String job = q.poll();
if (job != null) {
// 処理
System.out.println("Do: " + job);
} else {
try { Thread.sleep(2); } catch (InterruptedException ignored) {}
}
}
}).start();
Javaよくある落とし穴と回避策
- 落とし穴: 空時に待たないため、CPUを回し続けるスピン
- 回避: 空のときは短時間スリープやバックオフ、あるいは「待機が必要」なら
LinkedBlockingQueue.take()を検討。
- 回避: 空のときは短時間スリープやバックオフ、あるいは「待機が必要」なら
- 落とし穴:
size()を待機判定に使う- 回避:
size()は近似。待機ロジックに使わず、poll()の結果で分岐。
- 回避:
- 落とし穴: null を入れる
- 回避: nullは不可。nullを「空の印」には使えない。必要なら明示的な終端トークンを定義。
- 落とし穴: 走査の完全性を期待
- 回避: 走査中の変更は許容されるが完全に反映されないことがある。完全処理が必要なら
poll()で逐次取り出す。
- 回避: 走査中の変更は許容されるが完全に反映されないことがある。完全処理が必要なら
- 落とし穴: ブロッキングが欲しいのに CLQ を選ぶ
- 回避: 待機が必要なら
BlockingQueue系(LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue)。
- 回避: 待機が必要なら
選定の目安と比較
- ConcurrentLinkedQueue を選ぶ場面
- 非ブロッキングでとにかく速く入出力したい。
- 複数プロデューサー/複数コンシューマーでも高スループットを維持したい。
- 「空でも待たずにすぐ返る」性質が欲しい。
- 代替の検討
- LinkedBlockingQueue: 待機・上限・バックプレッシャーが必要。
- ArrayDeque + 外部同期: 単スレッド、または自前同期で十分なケース。
- ConcurrentLinkedDeque: 両端操作が必要な並行処理。
ミニテンプレート集
- プロデューサー/コンシューマーの基本
ConcurrentLinkedQueue<T> q = new ConcurrentLinkedQueue<>();
// producer
q.offer(task);
// consumer
T t = q.poll();
if (t != null) handle(t);
Java- 空時バックオフ(CPU燃焼防止)
T t = q.poll();
if (t == null) { Thread.onSpinWait(); /* or Thread.sleep(1) */ }
Java- 定期ドレイン処理
T t;
while ((t = q.poll()) != null) process(t);
Java- 終了トークン(毒薬)による停止シグナル
final T POISON = /* 特別値 */;
if (t == POISON) break;
Javaまとめ
- ConcurrentLinkedQueue は非ブロッキングでスレッドセーフな FIFO キュー。高並行・高スループットの「入出力を待たない」場面で力を発揮する。
- 空時は待たずに null を返すため、待機が必要な設計には BlockingQueue が適切。
size()は近似、走査は弱一貫性という性質を理解し、offer/poll中心のシンプルなループで安全に扱うのが基本。
