Java 逆引き集 | Windowing(スライディング窓)パターン実装 — 時系列処理

Java Java
スポンサーリンク

目的と前提

時系列処理で「連続 N 件」「直近 N 秒」のように、一定範囲のデータを滑らせながら集計するのがスライディング窓(windowing)です。Java 標準の Stream API には専用ウィンドウ関数はありませんが、インデックス走査やバッファ(Deque)を組み合わせれば、固定サイズ窓・時間窓を安全に実装できます。重要な設計ポイントは「1回の走査で必要な状態だけ保持する」「順序を維持する」「短絡(limit / takeWhile)で無駄を減らす」の3つです。

固定サイズ窓の基本パターン

インデックス走査で List を窓切りする

連続 N 件ずつの窓を作る最もシンプルな方法は、List のインデックスを IntStream で走査し、範囲 i..i+N を切り出すことです。これは順序を保ちつつ、O(1) のランダムアクセスを活かせるため ArrayList に向いています。

List<Integer> series = List.of(1, 2, 3, 4, 5, 6);
int N = 3;

List<List<Integer>> windows =
    IntStream.range(0, Math.max(0, series.size() - N + 1))
             .mapToObj(i -> series.subList(i, i + N))
             .toList();

System.out.println(windows); // [[1,2,3],[2,3,4],[3,4,5],[4,5,6]]
Java

メモリ効率を保ちたいなら「サブリストをその場で集計」して結果のみを保存すると良いです。

List<Double> movingAvg =
    IntStream.range(0, Math.max(0, series.size() - N + 1))
             .mapToDouble(i -> series.subList(i, i + N).stream()
                                      .mapToDouble(x -> x)
                                      .average().orElse(Double.NaN))
             .boxed()
             .toList();

System.out.println(movingAvg); // [2.0, 3.0, 4.0, 5.0]
Java

ストリームの逐次走査+Deque(1回のパスで窓更新)

List 以外(Iterator や I/O)を扱う場合は、走査しながら固定長のバッファを更新して、窓が満たされたタイミングだけ集計します。これが最も現場向きで、メモリ・速度ともに安定します。

Deque<Integer> buf = new ArrayDeque<>();
int N = 3;
List<Double> avg = new ArrayList<>();

Stream.of(1,2,3,4,5,6).forEach(x -> {
    buf.addLast(x);
    if (buf.size() > N) buf.removeFirst();
    if (buf.size() == N) {
        double a = buf.stream().mapToDouble(i -> i).average().orElse(Double.NaN);
        avg.add(a);
    }
});

System.out.println(avg); // [2.0, 3.0, 4.0, 5.0]
Java

計算を高速化したい場合は「合計をインクリメンタル更新」して O(1) で移動平均を出せます。

int N = 3;
Deque<Integer> q = new ArrayDeque<>();
long sum = 0;
List<Double> avg2 = new ArrayList<>();

for (int x : List.of(1,2,3,4,5,6)) {
    q.addLast(x); sum += x;
    if (q.size() > N) sum -= q.removeFirst();
    if (q.size() == N) avg2.add(sum / (double) N);
}
Java

時間窓(直近 N 秒/分)の設計

タイムスタンプ付きイベントの直近 N 秒

時刻で滑らせる場合は、到着順に処理しつつ「窓開始時刻を超える古い要素を落とす」だけです。Deque と while ループで古いものを排除します。

record Event(long ts, double value) {}
long windowMs = 5_000;

Deque<Event> w = new ArrayDeque<>();
List<Double> sums = new ArrayList<>();

List<Event> events = List.of(
    new Event(1_000, 1.0),
    new Event(3_000, 2.0),
    new Event(6_000, 3.0), // ここで 1_000 は窓外へ
    new Event(8_000, 4.0)
);

for (Event e : events) {
    w.addLast(e);
    long start = e.ts() - windowMs;
    while (!w.isEmpty() && w.peekFirst().ts() < start) {
        w.removeFirst();
    }
    double sum = w.stream().mapToDouble(Event::value).sum();
    sums.add(sum);
}

System.out.println(sums); // 到着時点の直近5秒合計
Java

時間窓では「到着順が時刻順」前提が重要です。前提が崩れる(遅延到着)場合は、到着ごとにソート挿入するか、バッチ材質化→ソート→窓処理に切り替えます。

タンブリング窓(非オーバーラップ)との違い

スライディング窓は「毎ステップ更新」されますが、タンブリング窓は固定区間(例:毎1分)で非オーバーラップに集計します。タンブリングは「バケット分け→各バケットを reduce/collect」が定石で、滑らせない分だけ処理もリソースも軽くなります。

Map<Long, Double> perMin =
    events.stream()
          .collect(Collectors.groupingBy(e -> e.ts() / 60_000,
                     Collectors.summingDouble(Event::value)));
Java

実例で身につける設計

価格の移動平均(SMA)と移動最大(rolling max)

固定サイズ N の移動平均・最大は、Deque とインクリメンタル更新で高速に書けます。

List<Double> prices = List.of(100,101,102,98,97,99,103);
int N = 3;

Deque<Double> win = new ArrayDeque<>();
double sum = 0;
List<Double> sma = new ArrayList<>();
List<Double> rmax = new ArrayList<>();

for (double p : prices) {
    win.addLast(p); sum += p;
    if (win.size() > N) sum -= win.removeFirst();
    if (win.size() == N) {
        sma.add(sum / N);
        rmax.add(win.stream().mapToDouble(x -> x).max().orElse(Double.NaN));
    }
}

System.out.println(sma);  // 移動平均
System.out.println(rmax); // 移動最大(単純版)
Java

移動最大を高速にしたいなら「モノトニックキュー(値が降順になるように末尾を削る)」で O(1) 平均にできます。必要になったら差し替え可能です。

文字列ログの直近 K 行のエラー率

固定サイズ K の「最新ログ窓」で、ERROR の割合を出します。直近 K 行の更新だけで算出できるため、一回走査で十分です。

int K = 5;
Deque<String> w = new ArrayDeque<>();
int errCount = 0;
List<Double> ratio = new ArrayList<>();

for (String line : List.of("INFO","ERROR","WARN","ERROR","INFO","ERROR","INFO")) {
    w.addLast(line);
    if ("ERROR".equals(line)) errCount++;
    if (w.size() > K) {
        String old = w.removeFirst();
        if ("ERROR".equals(old)) errCount--;
    }
    if (w.size() == K) ratio.add(errCount / (double) K);
}

System.out.println(ratio); // 直近5行のエラー率
Java

深掘り:正しさと性能の勘所

一度の走査で状態を最小保持する

時系列処理は「順序通りに一度流す」が基本です。窓に必要なのは「中身(Deque)」「インクリメンタルで更新するための補助状態(合計・カウントなど)」のみ。余計な材質化や再走査を避けると、I/O でもメモリでも安定します。

LinkedList・Iterator・I/O に強い実装を選ぶ

List.subList での窓切りは ArrayList なら速いですが、LinkedList や Iterator には向きません。Deque バッファのパターンは「どんなソースでも一回の走査」で動くため、外部 API 統合やストリーム生成元が Iterator のときに有利です。

並列化は基本的に難しい(順序依存のため)

スライディング窓は「前の要素と次の要素で状態が重なる」ため、素直な並列化は効果が出づらいです。大規模データなら、まず前処理(map/filter)を並列で軽くし、窓の集計部は順次で行うか、タンブリング窓へ設計変更するのが安全です。

窓の境界条件と欠損の扱い

開始直後は窓が満たされるまで出力を遅らせるか、サイズ不足の窓を許容(小さい窓で集計)するかを仕様で決めます。欠損(null)や外れ値は事前に filter / map で適切に整形して、窓ロジックを純粋化するとバグが減ります。

テンプレート(すぐ使える雛形)

List の固定サイズ窓(インデックス版)

static <T> Stream<List<T>> sliding(List<T> list, int size) {
    int n = Math.max(0, list.size() - size + 1);
    return IntStream.range(0, n).mapToObj(i -> list.subList(i, i + size));
}
Java

ストリーム入力の固定サイズ窓(Deque+Consumer)

static <T> void sliding(Stream<T> in, int size, java.util.function.Consumer<List<T>> onWindow) {
    Deque<T> buf = new ArrayDeque<>(size);
    in.forEach(x -> {
        buf.addLast(x);
        if (buf.size() > size) buf.removeFirst();
        if (buf.size() == size) onWindow.accept(new ArrayList<>(buf));
    });
}
Java

時間窓(直近 durationMs)

static <T> List<T> pruneOld(java.util.function.Function<T, Long> ts, Deque<T> w, long nowMs, long durationMs) {
    long start = nowMs - durationMs;
    while (!w.isEmpty() && ts.apply(w.peekFirst()) < start) w.removeFirst();
    return new ArrayList<>(w);
}
Java

移動平均(インクリメンタル更新)

static List<Double> movingAvg(List<Double> xs, int size) {
    Deque<Double> q = new ArrayDeque<>(size);
    double sum = 0;
    List<Double> out = new ArrayList<>();
    for (double x : xs) {
        q.addLast(x); sum += x;
        if (q.size() > size) sum -= q.removeFirst();
        if (q.size() == size) out.add(sum / size);
    }
    return out;
}
Java

まとめ

Java の時系列 windowing は、専用 API がなくても「インデックス走査」か「Deque+インクリメンタル更新」で堅実に書けます。固定サイズ窓は List に対してインデックス走査が最短、Iterator や I/O では Deque を使った逐次処理が王道。時間窓は「古いものを落とす」ロジックを明確にし、順序前提と境界条件を仕様化。並列よりも一回の走査と短絡で無駄を削り、状態を最小化すれば、正しく速い時系列処理が組めます。

タイトルとURLをコピーしました