最終更新:

Node.js Streams完全ガイド: 大量データの効率的な処理パターン


はじめに

Node.js Streamsは、大量のデータを効率的に処理するための強力な抽象化です。ファイル全体をメモリに読み込むのではなく、小さなチャンクに分けて処理することで、メモリ効率を大幅に向上させます。

この記事では、Node.js Streamsの基礎から実践的な応用パターンまで、包括的に解説します。

Streamsの基本概念

なぜStreamsが必要か

通常のファイル読み込み:

// ❌ メモリ非効率
import fs from 'fs/promises';

const data = await fs.readFile('huge-file.txt', 'utf-8');
console.log(data.length);
// 10GBのファイルだと、10GBのメモリを消費

Streamsを使った場合:

// ✅ メモリ効率的
import fs from 'fs';

const stream = fs.createReadStream('huge-file.txt', 'utf-8');
let length = 0;

stream.on('data', (chunk) => {
  length += chunk.length;
});

stream.on('end', () => {
  console.log(length);
});
// 常に一定量のメモリしか消費しない

Streamの種類

  1. Readable Stream: データを読み取る(ファイル読み込み、HTTPリクエストボディなど)
  2. Writable Stream: データを書き込む(ファイル書き込み、HTTPレスポンスなど)
  3. Duplex Stream: 読み書き両方可能(TCP socket、WebSocketなど)
  4. Transform Stream: データを変換(圧縮、暗号化、パースなど)

Readable Stream

ファイルからの読み込み

// basic-readable.ts
import fs from 'fs';

const readableStream = fs.createReadStream('input.txt', {
  encoding: 'utf-8',
  highWaterMark: 16 * 1024, // 16KB チャンクサイズ
});

readableStream.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes`);
  console.log(chunk);
});

readableStream.on('end', () => {
  console.log('Stream ended');
});

readableStream.on('error', (error) => {
  console.error('Stream error:', error);
});

カスタムReadable Streamの作成

// custom-readable.ts
import { Readable } from 'stream';

class NumberStream extends Readable {
  private current = 1;
  private max: number;

  constructor(max: number) {
    super();
    this.max = max;
  }

  _read() {
    if (this.current <= this.max) {
      // データをプッシュ
      this.push(`${this.current}\n`);
      this.current++;
    } else {
      // ストリーム終了
      this.push(null);
    }
  }
}

// 使用例
const numberStream = new NumberStream(100);
numberStream.pipe(process.stdout);

非同期イテレータとしての使用

// async-iterator.ts
import fs from 'fs';

async function processFile() {
  const stream = fs.createReadStream('data.txt', 'utf-8');

  for await (const chunk of stream) {
    console.log(chunk);
    // 非同期処理も可能
    await processChunk(chunk);
  }
}

async function processChunk(chunk: string) {
  // データ処理
  return new Promise((resolve) => setTimeout(resolve, 100));
}

processFile();

Writable Stream

ファイルへの書き込み

// basic-writable.ts
import fs from 'fs';

const writableStream = fs.createWriteStream('output.txt', {
  encoding: 'utf-8',
});

writableStream.write('Hello, ');
writableStream.write('World!\n');

writableStream.end('Goodbye!', () => {
  console.log('Write completed');
});

writableStream.on('error', (error) => {
  console.error('Write error:', error);
});

カスタムWritable Streamの作成

// custom-writable.ts
import { Writable } from 'stream';

class ConsoleStream extends Writable {
  _write(
    chunk: Buffer,
    encoding: BufferEncoding,
    callback: (error?: Error | null) => void
  ) {
    console.log(`[LOG] ${chunk.toString()}`);
    callback();
  }

  _final(callback: (error?: Error | null) => void) {
    console.log('[LOG] Stream ended');
    callback();
  }
}

// 使用例
const consoleStream = new ConsoleStream();
consoleStream.write('Line 1\n');
consoleStream.write('Line 2\n');
consoleStream.end('Line 3\n');

バックプレッシャーの処理

// backpressure.ts
import fs from 'fs';

const readable = fs.createReadStream('large-input.txt');
const writable = fs.createWriteStream('output.txt');

readable.on('data', (chunk) => {
  const canContinue = writable.write(chunk);

  if (!canContinue) {
    // バックプレッシャー: 読み込みを一時停止
    console.log('Pausing read due to backpressure');
    readable.pause();
  }
});

writable.on('drain', () => {
  // バッファが空いたら再開
  console.log('Resuming read');
  readable.resume();
});

readable.on('end', () => {
  writable.end();
});

Transform Stream

カスタムTransform Streamの作成

// custom-transform.ts
import { Transform } from 'stream';

class UpperCaseTransform extends Transform {
  _transform(
    chunk: Buffer,
    encoding: BufferEncoding,
    callback: (error?: Error | null, data?: any) => void
  ) {
    const upperCased = chunk.toString().toUpperCase();
    this.push(upperCased);
    callback();
  }
}

// 使用例
const upperCase = new UpperCaseTransform();

process.stdin.pipe(upperCase).pipe(process.stdout);

JSONパーサー Transform Stream

// json-parser-transform.ts
import { Transform } from 'stream';

class JSONLineParser extends Transform {
  constructor() {
    super({ objectMode: true });
  }

  _transform(
    chunk: Buffer,
    encoding: BufferEncoding,
    callback: (error?: Error | null, data?: any) => void
  ) {
    const lines = chunk.toString().split('\n');

    for (const line of lines) {
      if (line.trim()) {
        try {
          const obj = JSON.parse(line);
          this.push(obj);
        } catch (error) {
          console.error('JSON parse error:', error);
        }
      }
    }

    callback();
  }
}

// 使用例
import fs from 'fs';

const readStream = fs.createReadStream('data.jsonl');
const parser = new JSONLineParser();

parser.on('data', (obj) => {
  console.log('Parsed object:', obj);
});

readStream.pipe(parser);

CSVパーサー

// csv-parser.ts
import { Transform } from 'stream';

interface CSVRow {
  [key: string]: string;
}

class CSVParser extends Transform {
  private headers: string[] | null = null;
  private buffer = '';

  constructor() {
    super({ objectMode: true });
  }

  _transform(
    chunk: Buffer,
    encoding: BufferEncoding,
    callback: (error?: Error | null, data?: any) => void
  ) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');

    // 最後の行は不完全かもしれないので残しておく
    this.buffer = lines.pop() || '';

    for (const line of lines) {
      if (!line.trim()) continue;

      const values = line.split(',').map((v) => v.trim());

      if (!this.headers) {
        // 最初の行はヘッダー
        this.headers = values;
      } else {
        // データ行をオブジェクトに変換
        const row: CSVRow = {};
        this.headers.forEach((header, index) => {
          row[header] = values[index] || '';
        });
        this.push(row);
      }
    }

    callback();
  }

  _flush(callback: (error?: Error | null, data?: any) => void) {
    // 最後の行を処理
    if (this.buffer.trim() && this.headers) {
      const values = this.buffer.split(',').map((v) => v.trim());
      const row: CSVRow = {};
      this.headers.forEach((header, index) => {
        row[header] = values[index] || '';
      });
      this.push(row);
    }
    callback();
  }
}

// 使用例
import fs from 'fs';

const csvStream = fs.createReadStream('data.csv');
const parser = new CSVParser();

parser.on('data', (row: CSVRow) => {
  console.log(row);
});

csvStream.pipe(parser);

Pipeline: Streamの組み合わせ

stream.pipelineの使用

// pipeline.ts
import { pipeline } from 'stream/promises';
import fs from 'fs';
import { createGzip } from 'zlib';

async function compressFile() {
  try {
    await pipeline(
      fs.createReadStream('input.txt'),
      createGzip(),
      fs.createWriteStream('input.txt.gz')
    );
    console.log('Compression completed');
  } catch (error) {
    console.error('Pipeline failed:', error);
  }
}

compressFile();

複数のTransformの連結

// multi-transform-pipeline.ts
import { pipeline } from 'stream/promises';
import { Transform } from 'stream';
import fs from 'fs';

// Transform 1: 行番号を追加
class AddLineNumbers extends Transform {
  private lineNumber = 1;

  _transform(
    chunk: Buffer,
    encoding: BufferEncoding,
    callback: (error?: Error | null, data?: any) => void
  ) {
    const lines = chunk.toString().split('\n');
    const numbered = lines.map((line) => {
      if (line.trim()) {
        return `${this.lineNumber++}: ${line}`;
      }
      return line;
    });
    this.push(numbered.join('\n'));
    callback();
  }
}

// Transform 2: 大文字に変換
class ToUpperCase extends Transform {
  _transform(
    chunk: Buffer,
    encoding: BufferEncoding,
    callback: (error?: Error | null, data?: any) => void
  ) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
}

// パイプライン
async function processFile() {
  await pipeline(
    fs.createReadStream('input.txt'),
    new AddLineNumbers(),
    new ToUpperCase(),
    fs.createWriteStream('output.txt')
  );
}

processFile();

実践的なユースケース

大きなファイルの行ごと処理

// process-lines.ts
import { createReadStream } from 'fs';
import { createInterface } from 'readline';

async function processLargeFile(filePath: string) {
  const fileStream = createReadStream(filePath);
  const rl = createInterface({
    input: fileStream,
    crlfDelay: Infinity,
  });

  let lineNumber = 0;

  for await (const line of rl) {
    lineNumber++;

    // 行ごとに処理
    if (line.includes('ERROR')) {
      console.log(`Error found on line ${lineNumber}: ${line}`);
    }
  }

  console.log(`Processed ${lineNumber} lines`);
}

processLargeFile('app.log');

HTTPストリーミングレスポンス

// http-streaming.ts
import http from 'http';
import fs from 'fs';

const server = http.createServer((req, res) => {
  if (req.url === '/download') {
    const filePath = 'large-file.zip';

    // ファイルサイズを取得
    const stat = fs.statSync(filePath);

    res.writeHead(200, {
      'Content-Type': 'application/zip',
      'Content-Length': stat.size,
      'Content-Disposition': 'attachment; filename="large-file.zip"',
    });

    // ファイルをストリーム
    const fileStream = fs.createReadStream(filePath);
    fileStream.pipe(res);

    fileStream.on('error', (error) => {
      console.error('Stream error:', error);
      res.statusCode = 500;
      res.end('Internal Server Error');
    });
  } else {
    res.statusCode = 404;
    res.end('Not Found');
  }
});

server.listen(3000, () => {
  console.log('Server running on http://localhost:3000');
});

データベースからのストリーミング

// db-streaming.ts
import { Readable } from 'stream';
import { prisma } from './db';

class DatabaseReadStream extends Readable {
  private skip = 0;
  private batchSize = 100;
  private tableName: string;

  constructor(tableName: string, batchSize = 100) {
    super({ objectMode: true });
    this.tableName = tableName;
    this.batchSize = batchSize;
  }

  async _read() {
    try {
      const records = await (prisma as any)[this.tableName].findMany({
        skip: this.skip,
        take: this.batchSize,
      });

      if (records.length === 0) {
        // データがなくなったらストリーム終了
        this.push(null);
        return;
      }

      // レコードを1つずつプッシュ
      records.forEach((record: any) => this.push(record));

      this.skip += records.length;
    } catch (error) {
      this.destroy(error as Error);
    }
  }
}

// 使用例
async function exportUsers() {
  const userStream = new DatabaseReadStream('user');
  const writeStream = fs.createWriteStream('users.jsonl');

  for await (const user of userStream) {
    writeStream.write(JSON.stringify(user) + '\n');
  }

  writeStream.end();
  console.log('Export completed');
}

exportUsers();

ファイルアップロードの処理

// upload-handler.ts
import { IncomingMessage } from 'http';
import { createWriteStream } from 'fs';
import { pipeline } from 'stream/promises';
import { createHash } from 'crypto';
import { Transform } from 'stream';

class HashTransform extends Transform {
  private hash = createHash('sha256');

  _transform(
    chunk: Buffer,
    encoding: BufferEncoding,
    callback: (error?: Error | null, data?: any) => void
  ) {
    this.hash.update(chunk);
    this.push(chunk);
    callback();
  }

  getHash(): string {
    return this.hash.digest('hex');
  }
}

async function handleUpload(req: IncomingMessage, uploadPath: string) {
  const hashTransform = new HashTransform();

  try {
    await pipeline(req, hashTransform, createWriteStream(uploadPath));

    const fileHash = hashTransform.getHash();
    console.log(`File uploaded successfully. SHA256: ${fileHash}`);

    return { success: true, hash: fileHash };
  } catch (error) {
    console.error('Upload failed:', error);
    return { success: false, error };
  }
}

リアルタイムログ処理

// log-processor.ts
import { Transform } from 'stream';
import WebSocket from 'ws';

class LogAnalyzer extends Transform {
  private errorCount = 0;
  private warningCount = 0;

  constructor(private ws: WebSocket) {
    super();
  }

  _transform(
    chunk: Buffer,
    encoding: BufferEncoding,
    callback: (error?: Error | null, data?: any) => void
  ) {
    const line = chunk.toString();

    if (line.includes('ERROR')) {
      this.errorCount++;
      this.ws.send(
        JSON.stringify({
          type: 'error',
          message: line,
          count: this.errorCount,
        })
      );
    } else if (line.includes('WARNING')) {
      this.warningCount++;
      this.ws.send(
        JSON.stringify({
          type: 'warning',
          message: line,
          count: this.warningCount,
        })
      );
    }

    this.push(chunk);
    callback();
  }

  _flush(callback: (error?: Error | null, data?: any) => void) {
    this.ws.send(
      JSON.stringify({
        type: 'summary',
        errors: this.errorCount,
        warnings: this.warningCount,
      })
    );
    callback();
  }
}

// WebSocketサーバー
const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', (ws) => {
  const logStream = fs.createReadStream('/var/log/app.log');
  const analyzer = new LogAnalyzer(ws);

  logStream.pipe(analyzer);
});

エラーハンドリング

エラーの適切な処理

// error-handling.ts
import { pipeline } from 'stream/promises';
import fs from 'fs';
import { Transform } from 'stream';

class SafeTransform extends Transform {
  _transform(
    chunk: Buffer,
    encoding: BufferEncoding,
    callback: (error?: Error | null, data?: any) => void
  ) {
    try {
      // 処理
      const result = processChunk(chunk);
      this.push(result);
      callback();
    } catch (error) {
      // エラーをコールバックに渡す
      callback(error as Error);
    }
  }
}

async function safeProcessing() {
  try {
    await pipeline(
      fs.createReadStream('input.txt'),
      new SafeTransform(),
      fs.createWriteStream('output.txt')
    );
    console.log('Processing completed');
  } catch (error) {
    console.error('Processing failed:', error);
    // クリーンアップ処理
    await fs.promises.unlink('output.txt').catch(() => {});
  }
}

パフォーマンス最適化

highWaterMarkの調整

// optimize-chunk-size.ts
import fs from 'fs';

// デフォルト: 64KB
const defaultStream = fs.createReadStream('file.txt');

// 大きなチャンク: 1MB(大きなファイルに適している)
const largeChunkStream = fs.createReadStream('large-file.txt', {
  highWaterMark: 1024 * 1024,
});

// 小さなチャンク: 4KB(リアルタイム処理に適している)
const smallChunkStream = fs.createReadStream('realtime.log', {
  highWaterMark: 4 * 1024,
});

オブジェクトモード

// object-mode.ts
import { Transform } from 'stream';

class ObjectTransform extends Transform {
  constructor() {
    super({ objectMode: true });
  }

  _transform(
    obj: any,
    encoding: BufferEncoding,
    callback: (error?: Error | null, data?: any) => void
  ) {
    // オブジェクトを直接扱える
    this.push({ ...obj, processed: true, timestamp: Date.now() });
    callback();
  }
}

まとめ

Node.js Streamsは、大量データの効率的な処理に不可欠なツールです。メモリ効率、バックプレッシャー、エラーハンドリング、そしてパイプラインを理解することで、スケーラブルなアプリケーションを構築できます。

主なポイント:

  • Streamsはメモリ効率的なデータ処理を実現
  • 4種類のStream(Readable、Writable、Duplex、Transform)を理解する
  • pipelineを使った安全なStream処理
  • バックプレッシャーの適切な処理
  • カスタムStreamで柔軟な処理を実装

ファイル処理、HTTPストリーミング、データベース操作など、様々な場面でStreamsを活用することで、パフォーマンスとスケーラビリティを向上させることができます。