Java 逆引き集 | ConcurrentLinkedQueue(ロックフリーキュー) — 高スループット

Java Java
スポンサーリンク

ConcurrentLinkedQueue(ロックフリーキュー) — 高スループット

複数スレッドから同時に追加・取得しても待ち時間がほぼ発生しない、ロックフリーな待ち行列が ConcurrentLinkedQueue。FIFO順を保ちつつ、非ブロッキングで高いスループットを発揮します。初心者向けに、使い方・落とし穴・実用テンプレートをまとめます。


特性と前提

  • 非ブロッキング: 追加・取得が基本的に待たない。ロックで直列化しないためスループットが高い。
  • FIFO順: 先に入れた要素が先に取り出される(先頭は滞在時間が最長の要素)。
  • スレッドセーフ: 複数スレッドからの add/offer/poll/peek を安全に扱える。
  • null禁止: 要素に null は入れられない。
  • サイズ近似: size() は正確でない可能性がある(並行更新中は近似値)。サイズに依存した待機ロジックは避ける。
  • イテレーション: 走査は弱一貫性で、ConcurrentModificationException を出さず進むが「最新すべて」を見られる保証はない。
  • ブロッキングしない: 空でも poll() は即座に null を返す。待ちたいなら LinkedBlockingQueue などの「BlockingQueue」を使う。

基本コード例(プロデューサー・コンシューマー)

import java.util.concurrent.ConcurrentLinkedQueue;

public class CLQBasics {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<>();

        // 追加(複数スレッドから呼んでOK)
        q.offer("Task-1");
        q.offer("Task-2");

        // 取り出し(FIFO)
        System.out.println(q.poll()); // Task-1
        System.out.println(q.poll()); // Task-2
        System.out.println(q.poll()); // null(空)
    }
}
Java
  • ポイント: offer で追加、poll で先頭取得(空なら null)、peek は参照のみ。

複数スレッドでの利用例(実用ミニサンプル)

import java.util.concurrent.*;

public class CLQWorkers {
    private final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
    private volatile boolean running = true;

    public void submit(Runnable task) { queue.offer(task); }

    public void startWorkers(int nThreads) {
        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread(() -> {
                while (running) {
                    Runnable task = queue.poll();
                    if (task != null) {
                        task.run();
                    } else {
                        // 空なら短い休憩(スピンしすぎ防止)
                        try { Thread.sleep(1); } catch (InterruptedException ignored) {}
                    }
                }
            }, "worker-" + i);
            t.start();
        }
    }

    public void stop() { running = false; }

    public static void main(String[] args) {
        CLQWorkers ex = new CLQWorkers();
        ex.startWorkers(2);

        ex.submit(() -> System.out.println("A"));
        ex.submit(() -> System.out.println("B"));
        ex.submit(() -> System.out.println("C"));

        try { Thread.sleep(50); } catch (InterruptedException ignored) {}
        ex.stop();
    }
}
Java
  • ポイント: 非ブロッキングゆえ「待機」は自前で調整。空の場合は短いスリープなどでCPU燃焼を防ぐ。

操作テンプレート集

  • 基本宣言
ConcurrentLinkedQueue<Type> q = new ConcurrentLinkedQueue<>();
Java
  • 追加・取得
q.offer(x);   // 末尾へ追加(失敗しない設計)
Type v = q.poll();  // 先頭から取得(空なら null)
Type h = q.peek();  // 先頭参照のみ(空なら null)
Java
  • 空判定・サイズ(近似)
boolean empty = q.isEmpty();
int approx = q.size(); // 近似値。厳密ロジックには使わない
Java
  • バルク取り出し(安全なドレイン)
Type v;
while ((v = q.poll()) != null) {
    // v を処理
}
Java
  • for-each(弱一貫性)
for (Type t : q) { /* 途中で追加・削除されても例外は出ないが完全性は保証されない */ }
Java

例題で理解する

例題1: 高速ログバッファ(非ブロッキング投入→定期的にドレイン)

import java.util.concurrent.*;

class LogBuffer {
    private final ConcurrentLinkedQueue<String> buf = new ConcurrentLinkedQueue<>();

    public void record(String line) { buf.offer(line); }

    public void flush() {
        String s;
        while ((s = buf.poll()) != null) {
            System.out.println(s); // 実務ではファイルやシンクへ
        }
    }
}

public class LoggerDemo {
    public static void main(String[] args) throws Exception {
        LogBuffer lb = new LogBuffer();
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        ses.scheduleAtFixedRate(lb::flush, 0, 100, TimeUnit.MILLISECONDS);

        // 複数スレッドから投入しても低待機
        ExecutorService es = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 10; i++) {
            int id = i;
            es.submit(() -> lb.record("event-" + id));
        }

        es.shutdown();
        es.awaitTermination(1, TimeUnit.SECONDS);
        Thread.sleep(300);
        ses.shutdown();
    }
}
Java

例題2: 単純ワークキュー(プロデューサーと単一コンシューマー)

ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<>();

// producer
new Thread(() -> {
    for (int i = 0; i < 5; i++) q.offer("Job-" + i);
}).start();

// consumer
new Thread(() -> {
    for (;;) {
        String job = q.poll();
        if (job != null) {
            // 処理
            System.out.println("Do: " + job);
        } else {
            try { Thread.sleep(2); } catch (InterruptedException ignored) {}
        }
    }
}).start();
Java

よくある落とし穴と回避策

  • 落とし穴: 空時に待たないため、CPUを回し続けるスピン
    • 回避: 空のときは短時間スリープやバックオフ、あるいは「待機が必要」なら LinkedBlockingQueue.take() を検討。
  • 落とし穴: size() を待機判定に使う
    • 回避: size() は近似。待機ロジックに使わず、poll() の結果で分岐。
  • 落とし穴: null を入れる
    • 回避: nullは不可。nullを「空の印」には使えない。必要なら明示的な終端トークンを定義。
  • 落とし穴: 走査の完全性を期待
    • 回避: 走査中の変更は許容されるが完全に反映されないことがある。完全処理が必要なら poll() で逐次取り出す。
  • 落とし穴: ブロッキングが欲しいのに CLQ を選ぶ
    • 回避: 待機が必要なら BlockingQueue 系(LinkedBlockingQueueArrayBlockingQueuePriorityBlockingQueue)。

選定の目安と比較

  • ConcurrentLinkedQueue を選ぶ場面
    • 非ブロッキングでとにかく速く入出力したい。
    • 複数プロデューサー/複数コンシューマーでも高スループットを維持したい。
    • 「空でも待たずにすぐ返る」性質が欲しい。
  • 代替の検討
    • LinkedBlockingQueue: 待機・上限・バックプレッシャーが必要。
    • ArrayDeque + 外部同期: 単スレッド、または自前同期で十分なケース。
    • ConcurrentLinkedDeque: 両端操作が必要な並行処理。

ミニテンプレート集

  • プロデューサー/コンシューマーの基本
ConcurrentLinkedQueue<T> q = new ConcurrentLinkedQueue<>();
// producer
q.offer(task);
// consumer
T t = q.poll();
if (t != null) handle(t);
Java
  • 空時バックオフ(CPU燃焼防止)
T t = q.poll();
if (t == null) { Thread.onSpinWait(); /* or Thread.sleep(1) */ }
Java
  • 定期ドレイン処理
T t;
while ((t = q.poll()) != null) process(t);
Java
  • 終了トークン(毒薬)による停止シグナル
final T POISON = /* 特別値 */;
if (t == POISON) break;
Java

まとめ

  • ConcurrentLinkedQueue は非ブロッキングでスレッドセーフな FIFO キュー。高並行・高スループットの「入出力を待たない」場面で力を発揮する。
  • 空時は待たずに null を返すため、待機が必要な設計には BlockingQueue が適切。size() は近似、走査は弱一貫性という性質を理解し、offer/poll 中心のシンプルなループで安全に扱うのが基本。
タイトルとURLをコピーしました