最終更新:

Node.js Worker Threads実践: CPU集約タスクの並列処理


Worker Threadsとは

Node.js Worker Threadsは、JavaScriptコードを別スレッドで実行するための機能です。CPU集約的な処理を並列化し、メインスレッド(イベントループ)をブロックせずにパフォーマンスを向上させます。

なぜWorker Threadsが必要か

// ❌ A CPU-bound task blocks the main thread
const express = require('express');
const app = express();

// Computes Fibonacci synchronously, i.e. on the event loop itself.
const handleHeavy = (req, res) => {
  res.json({ result: fibonacci(45) });
};

// Trivial liveness probe.
const handleHealth = (req, res) => {
  res.json({ status: 'ok' });
};

app.get('/heavy', handleHeavy);
app.get('/health', handleHealth);

// While a /heavy request is being computed, /health cannot respond either!
// ✅ Run the computation in parallel on a worker thread
const { Worker } = require('worker_threads');

app.get('/heavy', (req, res) => {
  const worker = new Worker('./fibonacci-worker.js', {
    workerData: { n: 45 }
  });

  // Whichever event fires first answers the request; later events must not
  // try to send a second response.
  const fail = (message) => {
    if (!res.headersSent) {
      res.status(500).json({ error: message });
    }
  };

  worker.once('message', (result) => {
    if (!res.headersSent) {
      res.json({ result });
    }
  });

  worker.once('error', (error) => fail(error.message));

  // Without this, a worker that exits before posting a result would leave
  // the HTTP request hanging until the client gives up.
  worker.once('exit', (code) => {
    fail(`Worker exited with code ${code} before returning a result`);
  });
});

// The main thread is not blocked

基本的な使い方

シンプルなWorker

// main.js
const { Worker } = require('worker_threads');

/**
 * Runs ./worker.js in a new worker thread and resolves with the first
 * message it posts.
 *
 * @param {*} data Value handed to the worker as `workerData`.
 * @returns {Promise<*>} Resolves with the worker's first message. Rejects if
 *   the worker throws, exits with a non-zero code, or exits without posting
 *   anything.
 */
function runWorker(data) {
  return new Promise((resolve, reject) => {
    const worker = new Worker('./worker.js', {
      workerData: data
    });

    worker.once('message', resolve);
    worker.once('error', reject);
    worker.once('exit', (code) => {
      // A promise settles only once, so this is a no-op when a message or an
      // error was already delivered. It matters when the worker exits cleanly
      // without posting a result: otherwise the promise would stay pending
      // forever.
      reject(
        code !== 0
          ? new Error(`Worker stopped with exit code ${code}`)
          : new Error('Worker exited without posting a result')
      );
    });
  });
}

// Usage
const logResult = (result) => console.log('Result:', result);
const logError = (err) => console.error('Error:', err);

runWorker({ n: 10 }).then(logResult).catch(logError);
// worker.js
const { workerData, parentPort } = require('worker_threads');

/**
 * Naive doubly-recursive Fibonacci, used here as a CPU-heavy workload.
 * Values of n at or below 1 are returned unchanged.
 */
function fibonacci(n) {
  return n <= 1 ? n : fibonacci(n - 1) + fibonacci(n - 2);
}

// Compute the requested Fibonacci number and hand it back to the parent thread.
parentPort.postMessage(fibonacci(workerData.n));

TypeScriptでの実装

// main.ts
import { Worker } from 'worker_threads';
import path from 'path';

/** Input handed to the worker through `workerData`. */
interface WorkerData {
  /** Numeric input for the worker (the usage below passes 45). */
  n: number;
}

/** Shape of the message the worker is expected to post back. */
interface WorkerResult {
  /** Computed value. */
  result: number;
  /** Elapsed time; logged below with an `ms` suffix. */
  duration: number;
}

/**
 * Runs the script at `workerPath` in a new worker thread.
 *
 * @param workerPath Path of the worker script.
 * @param data Value handed to the worker as `workerData`.
 * @returns Resolves with the first message the worker posts. Rejects if the
 *   worker throws, exits with a non-zero code, or exits without posting a
 *   message.
 */
function runWorker<T, R>(
  workerPath: string,
  data: T
): Promise<R> {
  return new Promise<R>((resolve, reject) => {
    const worker = new Worker(workerPath, {
      workerData: data,
    });

    worker.once('message', (message: R) => {
      resolve(message);
    });

    worker.once('error', reject);

    worker.once('exit', (code) => {
      // A promise settles only once, so this is a no-op after a message or
      // error. It covers the worker that exits cleanly without ever posting
      // a result, which would otherwise leave the promise pending forever.
      reject(
        code !== 0
          ? new Error(`Worker exited with code ${code}`)
          : new Error('Worker exited without posting a result')
      );
    });
  });
}

// Usage example
/** Runs the worker once with n = 45 and logs the value and duration it reports. */
async function main(): Promise<void> {
  const { result, duration } = await runWorker<WorkerData, WorkerResult>(
    path.join(__dirname, 'worker.js'),
    { n: 45 }
  );

  console.log(`Result: ${result}`);
  console.log(`Duration: ${duration}ms`);
}

// Report a failed worker run instead of leaving the rejection unhandled.
main().catch((error: unknown) => {
  console.error('Worker run failed:', error);
  process.exitCode = 1;
});
// worker.ts
import { workerData, parentPort } from 'worker_threads';

/** Input expected in `workerData`. */
interface WorkerData {
  /** Argument passed to `fibonacci`. */
  n: number;
}

/** Message posted back to the parent thread. */
interface WorkerResult {
  /** Value returned by `fibonacci`. */
  result: number;
  /** Elapsed time in milliseconds, measured with `Date.now()`. */
  duration: number;
}

/**
 * Naive doubly-recursive Fibonacci, used here as a CPU-heavy workload.
 * Values of n at or below 1 are returned unchanged.
 */
function fibonacci(n: number): number {
  return n <= 1 ? n : fibonacci(n - 1) + fibonacci(n - 2);
}

// Time the computation, then report the value and the elapsed milliseconds
// to the parent thread in a single message.
const startedAt = Date.now();
const value = fibonacci(workerData.n);
const payload: WorkerResult = {
  result: value,
  duration: Date.now() - startedAt,
};

parentPort!.postMessage(payload);

Worker Pool実装

基本的なWorker Pool

// worker-pool.ts
import { Worker } from 'worker_threads';
import EventEmitter from 'events';
import os from 'os';

/** A unit of work waiting in the queue or currently assigned to a worker. */
interface Task<T, R> {
  /** Payload sent to the worker with `postMessage`. */
  data: T;
  /** Settles the promise returned by `exec` with the worker's reply. */
  resolve: (value: R) => void;
  /** Settles the promise returned by `exec` with a failure. */
  reject: (error: Error) => void;
}

/**
 * Fixed-size pool of worker threads. Each worker handles one task at a time;
 * extra tasks wait in a FIFO queue.
 *
 * Protocol: a task's data is sent with `postMessage`, and the next message
 * received from that worker is taken as the task's result.
 *
 * Events:
 * - `taskComplete` (result): after every successfully completed task.
 * - `error` (error): when a worker fails. Emitted only if a listener is
 *   registered, because an unhandled `error` event would throw.
 */
export class WorkerPool<T = any, R = any> extends EventEmitter {
  private workers: Worker[] = [];
  private freeWorkers: Worker[] = [];
  private queue: Task<T, R>[] = [];
  /** The task each busy worker is currently processing. */
  private running = new Map<Worker, Task<T, R>>();
  private destroyed = false;
  private workerPath: string;
  private poolSize: number;

  /**
   * @param workerPath Script every worker in the pool runs.
   * @param poolSize Number of workers to start; defaults to the CPU count.
   */
  constructor(workerPath: string, poolSize = os.cpus().length) {
    super();
    this.workerPath = workerPath;
    this.poolSize = poolSize;

    // Start all workers up front
    for (let i = 0; i < poolSize; i++) {
      this.addWorker();
    }
  }

  /** Starts one worker and wires its events into the pool's bookkeeping. */
  private addWorker(): void {
    const worker = new Worker(this.workerPath);

    // Single place where a worker is released. Keeping the release in one
    // listener guarantees a worker is never put on the free list twice.
    worker.on('message', (result: R) => {
      const task = this.running.get(worker);
      if (task === undefined) {
        return; // message from a worker that has no task assigned
      }
      this.running.delete(worker);
      this.freeWorkers.push(worker);
      task.resolve(result);
      this.processQueue();
      this.emit('taskComplete', result);
    });

    worker.on('error', (error: Error) => {
      this.retireWorker(worker, error);
      if (this.listenerCount('error') > 0) {
        this.emit('error', error);
      }
    });

    worker.on('exit', (code) => {
      this.retireWorker(worker, new Error(`Worker exited with code ${code}`));
    });

    this.workers.push(worker);
    this.freeWorkers.push(worker);
  }

  /**
   * Removes a worker that failed or exited. Its in-flight task (if any) is
   * rejected with `reason`. A worker that died while working is replaced so
   * the queue keeps draining; if the last worker dies idle, queued tasks are
   * rejected because nothing could ever run them.
   */
  private retireWorker(worker: Worker, reason: Error): void {
    const index = this.workers.indexOf(worker);
    if (index === -1) {
      return; // already retired, e.g. 'exit' following 'error'
    }
    this.workers.splice(index, 1);
    this.freeWorkers = this.freeWorkers.filter((w) => w !== worker);

    const task = this.running.get(worker);
    this.running.delete(worker);
    task?.reject(reason);

    if (this.destroyed) {
      return;
    }
    if (task !== undefined) {
      this.addWorker();
      this.processQueue();
    } else if (this.workers.length === 0) {
      this.rejectQueued(new Error('WorkerPool has no live workers'));
    }
  }

  /** Rejects and drops every task still waiting in the queue. */
  private rejectQueued(reason: Error): void {
    const pending = this.queue;
    this.queue = [];
    for (const task of pending) {
      task.reject(reason);
    }
  }

  /** Hands queued tasks to free workers until one of the two runs out. */
  private processQueue(): void {
    while (this.queue.length > 0 && this.freeWorkers.length > 0) {
      // Both arrays are non-empty here, so shift()/pop() cannot be undefined.
      const task = this.queue.shift()!;
      const worker = this.freeWorkers.pop()!;

      this.running.set(worker, task);
      try {
        worker.postMessage(task.data);
      } catch (error) {
        // e.g. the payload cannot be cloned: fail this task, keep the worker.
        this.running.delete(worker);
        this.freeWorkers.push(worker);
        task.reject(error instanceof Error ? error : new Error(String(error)));
      }
    }
  }

  /**
   * Queues `data` for the next free worker.
   *
   * @returns Resolves with the worker's reply. Rejects if the worker fails,
   *   the payload cannot be posted, the pool has no live workers, or the
   *   pool is (or gets) destroyed.
   */
  exec(data: T): Promise<R> {
    return new Promise<R>((resolve, reject) => {
      if (this.destroyed) {
        reject(new Error('WorkerPool has been destroyed'));
        return;
      }
      if (this.workers.length === 0) {
        reject(new Error('WorkerPool has no live workers'));
        return;
      }
      this.queue.push({ data, resolve, reject });
      this.processQueue();
    });
  }

  /**
   * Terminates every worker. Queued and in-flight tasks are rejected so that
   * no caller is left waiting on a promise that can never settle.
   */
  async destroy(): Promise<void> {
    this.destroyed = true;
    const reason = new Error('WorkerPool has been destroyed');

    this.rejectQueued(reason);
    for (const task of this.running.values()) {
      task.reject(reason);
    }
    this.running.clear();

    const workers = this.workers;
    this.workers = [];
    this.freeWorkers = [];
    await Promise.all(workers.map((worker) => worker.terminate()));
  }

  /** Snapshot of the configured size, busy workers and queued tasks. */
  getStats(): { poolSize: number; activeWorkers: number; queuedTasks: number } {
    return {
      poolSize: this.poolSize,
      activeWorkers: this.running.size,
      queuedTasks: this.queue.length,
    };
  }
}

使用例

// example-pool.ts
import { WorkerPool } from './worker-pool';
import path from 'path';

/** One image-processing job submitted to the pool. */
interface ImageTask {
  /** Path of the image to process. */
  imagePath: string;
  /** Names of the operations to apply (the sample tasks use 'resize' and 'compress'). */
  operations: string[];
}

/** Reply expected from the image worker for one task. */
interface ImageResult {
  /** Path of the processed image. */
  outputPath: string;
  /** Processing time reported by the worker. */
  duration: number;
}

/**
 * Processes three sample images on a four-worker pool, then logs the results
 * and the pool statistics. The pool is always destroyed, even when a task fails.
 */
async function processImages(): Promise<void> {
  const pool = new WorkerPool<ImageTask, ImageResult>(
    path.join(__dirname, 'image-worker.js'),
    4 // four workers
  );

  const tasks: ImageTask[] = [
    { imagePath: '/images/1.jpg', operations: ['resize', 'compress'] },
    { imagePath: '/images/2.jpg', operations: ['resize', 'compress'] },
    { imagePath: '/images/3.jpg', operations: ['resize', 'compress'] },
  ];

  try {
    const results = await Promise.all(
      tasks.map((task) => pool.exec(task))
    );

    console.log('Processed images:', results);
    console.log('Pool stats:', pool.getStats());
  } finally {
    await pool.destroy();
  }
}

// Report a failure instead of leaving the promise rejection unhandled.
processImages().catch((error: unknown) => {
  console.error('Image processing failed:', error);
  process.exitCode = 1;
});

実践例: 画像処理

// image-worker.ts
import { parentPort } from 'worker_threads';
import sharp from 'sharp';
import path from 'path';

/** Message the worker receives for each image to process. */
interface ImageTask {
  /** Path of the source image, passed to `sharp`. */
  imagePath: string;
  /** Operations to apply, in order. */
  operations: Array<'resize' | 'compress' | 'grayscale'>;
}

// Handles one ImageTask per incoming message and replies with a single message.
//
// Success reply: { outputPath, duration } where duration is in milliseconds.
// Failure reply: { error } - note that this is posted as an ordinary message,
// so the receiver has to inspect the payload to tell failure from success.
//
// NOTE(review): nothing here creates the `processed` directory; presumably it
// must already exist next to the source image - confirm against sharp's toFile.
parentPort!.on('message', async (task: ImageTask) => {
  const startedAt = Date.now();

  try {
    // Chain every requested operation onto the sharp pipeline, in order.
    const pipeline = task.operations.reduce((image, operation) => {
      if (operation === 'resize') {
        return image.resize(800, 600, { fit: 'inside' });
      }
      if (operation === 'compress') {
        return image.jpeg({ quality: 80 });
      }
      if (operation === 'grayscale') {
        return image.grayscale();
      }
      return image; // anything else is ignored
    }, sharp(task.imagePath));

    // <dir of source>/processed/<source file name>
    const sourceDir = path.dirname(task.imagePath);
    const fileName = path.basename(task.imagePath);
    const outputPath = path.join(sourceDir, 'processed', fileName);

    await pipeline.toFile(outputPath);

    const elapsed = Date.now() - startedAt;
    parentPort!.postMessage({ outputPath, duration: elapsed });
  } catch (error) {
    const message = error instanceof Error ? error.message : 'Unknown error';
    parentPort!.postMessage({ error: message });
  }
});

実践例: CSVデータ処理

// csv-worker.ts
import { parentPort, workerData } from 'worker_threads';
import { parse } from 'csv-parse/sync';
import fs from 'fs';

/** Description of one CSV file to process. */
interface CsvTask {
  /** Path of the CSV file to read. */
  filePath: string;
  transform: string; // transformation logic (received as source text)
}

/** Result posted back to the parent thread. */
interface CsvResult {
  /** Number of records parsed from the file. */
  rowCount: number;
  /** One transformed value per record. */
  processedData: any[];
  /** Elapsed time in milliseconds, measured with `Date.now()`. */
  duration: number;
}

/**
 * Reads and parses the CSV file named by `task`, then applies the transform
 * to every record.
 *
 * SECURITY: `task.transform` is compiled with `new Function`, i.e. it is
 * executed as code. Only pass trusted transform source; avoid this pattern
 * in production.
 *
 * @throws Whatever reading, parsing or the transform itself throws.
 */
function processCsv(task: CsvTask): CsvResult {
  const startTime = Date.now();

  // Read the CSV file
  const content = fs.readFileSync(task.filePath, 'utf-8');
  const records: unknown[] = parse(content, { columns: true });

  // Compile the transform body into a function taking one record as `data`
  const transformFn = new Function('data', task.transform) as (
    data: unknown
  ) => unknown;
  const processedData = records.map((record) => transformFn(record));

  return {
    rowCount: records.length,
    processedData,
    duration: Date.now() - startTime,
  };
}

if (parentPort === null) {
  throw new Error('csv-worker must be started as a worker thread');
}
const port = parentPort;

if (workerData != null) {
  // One-shot mode: the task was supplied when the worker was created.
  port.postMessage(processCsv(workerData as CsvTask));
} else {
  // Pool mode: a worker created without workerData receives its tasks through
  // postMessage, so handle one task per message. An exception thrown here is
  // left uncaught on purpose so the parent sees it as the worker's 'error'.
  port.on('message', (task: CsvTask) => {
    port.postMessage(processCsv(task));
  });
}
// csv-processor.ts
import { WorkerPool } from './worker-pool';
import path from 'path';

/** One CSV job submitted to the pool. */
interface CsvTask {
  /** Path of the CSV file to process. */
  filePath: string;
  /** Transform source text (the tasks below pass a function body that uses `data`). */
  transform: string;
}

/** Reply expected from the CSV worker for one task. */
interface CsvResult {
  /** Row count reported by the worker; summed below. */
  rowCount: number;
  /** Transformed rows. */
  processedData: any[];
  /** Duration reported by the worker; summed below. */
  duration: number;
}

/**
 * Processes the three quarterly sales files on a four-worker pool and logs
 * the total row count and the summed per-file durations. The pool is always
 * destroyed, even when a file fails.
 */
async function processCsvFiles(): Promise<void> {
  const pool = new WorkerPool<CsvTask, CsvResult>(
    path.join(__dirname, 'csv-worker.js'),
    4
  );

  // Function body applied to every row; `data` is the row.
  const transform = 'return { ...data, total: data.quantity * data.price }';
  const files = ['sales-2024-q1.csv', 'sales-2024-q2.csv', 'sales-2024-q3.csv'];

  try {
    const results = await Promise.all(
      files.map((file) =>
        pool.exec({
          filePath: path.join('./data', file),
          transform,
        })
      )
    );

    console.log('Total rows processed:', results.reduce((sum, r) => sum + r.rowCount, 0));
    console.log('Total duration:', results.reduce((sum, r) => sum + r.duration, 0));
  } finally {
    // Release the worker threads even if one of the files failed.
    await pool.destroy();
  }
}

// Report a failure instead of leaving the promise rejection unhandled.
processCsvFiles().catch((error: unknown) => {
  console.error('CSV processing failed:', error);
  process.exitCode = 1;
});

SharedArrayBufferでの共有メモリ

// shared-memory-example.ts
import { Worker } from 'worker_threads';

// Shared memory buffer (1024 bytes), viewed as 32-bit integers
const sharedBuffer = new SharedArrayBuffer(1024);
const sharedArray = new Int32Array(sharedBuffer);

// Set a value from the main thread
Atomics.store(sharedArray, 0, 100);

// The worker's source is given inline (`eval: true`) and receives the shared
// buffer as workerData. It reads slot 0, adds 50 to it and posts 'done'.
// The comments inside the template literal are part of the source string
// that is executed, so they are left exactly as written.
const worker = new Worker(`
  const { parentPort, workerData } = require('worker_threads');
  const sharedArray = new Int32Array(workerData);

  // ワーカーで値を読み取り
  const value = Atomics.load(sharedArray, 0);
  console.log('Worker read:', value);

  // ワーカーで値を更新
  Atomics.add(sharedArray, 0, 50);

  parentPort.postMessage('done');
`, { eval: true, workerData: sharedBuffer });

worker.on('message', () => {
  // Read the value back on the main thread after the worker's update
  console.log('Main thread read:', Atomics.load(sharedArray, 0)); // 150
});

パフォーマンス計測

// benchmark.ts
import { Worker } from 'worker_threads';
import { performance } from 'perf_hooks';

/**
 * Compares running `workerCount` Fibonacci computations one after another on
 * the current thread against running the same number in parallel, one worker
 * thread each, and logs both timings and their ratio.
 *
 * Both sides do the same amount of work, so the ratio is a real speedup.
 *
 * @param n Fibonacci index computed by every task.
 * @param workerCount Number of tasks, and of workers in the parallel run.
 */
async function benchmark(n = 42, workerCount = 4): Promise<void> {
  // Single thread: the same number of computations, sequentially
  const singleStart = performance.now();
  for (let i = 0; i < workerCount; i++) {
    fibonacci(n);
  }
  const singleDuration = performance.now() - singleStart;

  // Multi thread: one worker per computation, all at once
  const multiStart = performance.now();
  await Promise.all(
    Array.from({ length: workerCount }, () => runWorker({ n }))
  );
  const multiDuration = performance.now() - multiStart;

  console.log(`Single thread: ${singleDuration.toFixed(2)}ms`);
  console.log(`Multi thread (${workerCount} workers): ${multiDuration.toFixed(2)}ms`);
  console.log(`Speedup: ${(singleDuration / multiDuration).toFixed(2)}x`);
}

/**
 * Naive doubly-recursive Fibonacci, used here as a CPU-heavy workload.
 * Values of n at or below 1 are returned unchanged.
 */
function fibonacci(n: number): number {
  return n <= 1 ? n : fibonacci(n - 1) + fibonacci(n - 2);
}

/**
 * Runs ./fibonacci-worker.js in a new worker thread.
 *
 * @param data Value handed to the worker as `workerData`.
 * @returns Resolves with the worker's first message. Rejects if the worker
 *   throws or exits without posting a message.
 */
function runWorker(data: any): Promise<any> {
  return new Promise((resolve, reject) => {
    const worker = new Worker('./fibonacci-worker.js', {
      workerData: data,
    });
    worker.once('message', resolve);
    worker.once('error', reject);
    // No-op once the promise has settled. Otherwise the worker exited without
    // a result, and the benchmark's Promise.all would wait on it forever.
    worker.once('exit', (code) => {
      reject(new Error(`Worker exited with code ${code} without posting a result`));
    });
  });
}

// Report a failed run instead of leaving the promise rejection unhandled.
benchmark().catch((error: unknown) => {
  console.error('Benchmark failed:', error);
  process.exitCode = 1;
});

Expressとの統合

// server.ts
import express from 'express';
import { WorkerPool } from './worker-pool';
import path from 'path';

const app = express();
const port = 3000;

// Create the worker pool once, at startup
const pool = new WorkerPool(
  path.join(__dirname, 'task-worker.js'),
  4
);

app.use(express.json());

// Runs the request body as one pool task and returns the worker's reply.
app.post('/process', async (req, res) => {
  try {
    const result = await pool.exec(req.body);
    res.json({ success: true, result });
  } catch (error) {
    res.status(500).json({
      success: false,
      error: error instanceof Error ? error.message : 'Unknown error',
    });
  }
});

// Exposes the pool's current statistics.
app.get('/stats', (req, res) => {
  res.json(pool.getStats());
});

const server = app.listen(port, () => {
  console.log(`Server running on port ${port}`);
});

// Graceful shutdown: stop accepting new connections first, and only stop the
// workers once the HTTP server has closed, so requests that are still being
// served can get their result.
// NOTE(review): close() waits for open connections to end; depending on the
// Node.js version, idle keep-alive connections may delay it - confirm.
process.on('SIGTERM', () => {
  console.log('SIGTERM received, shutting down...');
  server.close(() => {
    pool.destroy().then(
      () => process.exit(0),
      (error: unknown) => {
        console.error('Failed to stop the worker pool:', error);
        process.exit(1);
      }
    );
  });
});

エラーハンドリング

// robust-worker.ts
import { Worker } from 'worker_threads';

/** Tuning knobs for `runWorkerWithRetry`. */
interface WorkerOptions {
  /** Maximum number of attempts (defaults to 3 when omitted). */
  maxRetries?: number;
  /** Per-attempt timeout in milliseconds (defaults to 30000 when omitted). */
  timeout?: number;
}

/**
 * Runs a worker with a per-attempt timeout, trying again when an attempt fails.
 *
 * Waits 1s, 2s, 4s, ... between attempts (doubling each time); there is no
 * wait after the final attempt.
 *
 * @param workerPath Path of the worker script.
 * @param data Value handed to the worker as `workerData`.
 * @param options `maxRetries` (default 3) and `timeout` in ms (default 30000).
 * @returns The result of the first attempt that succeeds.
 * @throws The error of the last failed attempt, or a generic error when no
 *   attempt recorded one.
 */
async function runWorkerWithRetry<T, R>(
  workerPath: string,
  data: T,
  options: WorkerOptions = {}
): Promise<R> {
  const { maxRetries = 3, timeout = 30000 } = options;
  let failure: Error | null = null;
  let attempt = 0;

  while (attempt < maxRetries) {
    try {
      return await runWorkerWithTimeout<T, R>(workerPath, data, timeout);
    } catch (error) {
      failure = error as Error;
      console.warn(`Worker attempt ${attempt + 1} failed:`, error);
    }

    attempt += 1;

    if (attempt < maxRetries) {
      // Exponential backoff: 2^(number of failures so far - 1) seconds
      const delayMs = 2 ** (attempt - 1) * 1000;
      await new Promise((resolve) => setTimeout(resolve, delayMs));
    }
  }

  throw failure || new Error('Worker failed after all retries');
}

/**
 * Runs the script at `workerPath` in a new worker thread and waits at most
 * `timeout` milliseconds for its first message.
 *
 * The promise settles exactly once, on whichever happens first: a message,
 * an error, the worker exiting, or the timeout. At that point the timer is
 * cleared and the worker is terminated.
 *
 * @param workerPath Path of the worker script.
 * @param data Value handed to the worker as `workerData`.
 * @param timeout Maximum time to wait, in milliseconds.
 * @returns Resolves with the worker's first message. Rejects on a worker
 *   error, on timeout, or when the worker exits without posting a result.
 */
function runWorkerWithTimeout<T, R>(
  workerPath: string,
  data: T,
  timeout: number
): Promise<R> {
  return new Promise<R>((resolve, reject) => {
    const worker = new Worker(workerPath, { workerData: data });
    let settled = false;

    // Runs `finish` only for the first outcome and releases the timer and the
    // worker. Later events (e.g. the 'exit' caused by terminate()) are ignored.
    const settle = (finish: () => void): void => {
      if (settled) {
        return;
      }
      settled = true;
      clearTimeout(timer);
      void worker.terminate();
      finish();
    };

    const timer = setTimeout(() => {
      settle(() => reject(new Error(`Worker timed out after ${timeout}ms`)));
    }, timeout);

    worker.once('message', (result: R) => {
      settle(() => resolve(result));
    });

    worker.once('error', (error) => {
      settle(() => reject(error));
    });

    // Reaching this unsettled means the worker ended without a result. That
    // includes a clean exit (code 0), which must reject too: the timer is
    // cleared here, so nothing else would ever settle the promise.
    worker.once('exit', (code) => {
      settle(() =>
        reject(
          code !== 0
            ? new Error(`Worker exited with code ${code}`)
            : new Error('Worker exited without posting a result')
        )
      );
    });
  });
}

ベストプラクティス

1. ワーカー数の最適化

import os from 'os';

// Baseline: one worker per CPU reported by the OS
const optimalWorkerCount = os.cpus().length;

// I/O-bound tasks: more workers than CPUs (twice as many)
const ioWorkerCount = optimalWorkerCount * 2;

// CPU-bound tasks: as many as the CPUs or slightly fewer (one less, never below 1)
const cpuWorkerCount = optimalWorkerCount > 1 ? optimalWorkerCount - 1 : 1;

2. メモリ効率

// ❌ Inefficient: the large value is copied
worker.postMessage(largeArray); // the data is serialized (copied) for the worker

// ✅ Efficient: list the transferable object in the transfer list
// NOTE: per the worker_threads documentation, a transferred ArrayBuffer is
// moved rather than copied and can no longer be used on the sending side.
worker.postMessage(largeArrayBuffer, [largeArrayBuffer]);

3. プール管理

// Create the pool once, at application startup
const pool = new WorkerPool('./worker.js', 4);

// Reuse it for every request
app.post('/task1', async (req, res) => {
  try {
    const result = await pool.exec(req.body);
    res.json(result);
  } catch (error) {
    // Answer here: a rejection escaping an async handler is not guaranteed
    // to reach Express's error handling (it does not in Express 4).
    res.status(500).json({
      error: error instanceof Error ? error.message : 'Unknown error',
    });
  }
});

// Clean up on shutdown. 'exit' listeners must be synchronous - asynchronous
// work started there is abandoned - so the async destroy() is run from the
// termination signals instead, and the process exits once it has finished.
const shutdown = () => {
  void pool.destroy().finally(() => process.exit(0));
};
process.once('SIGINT', shutdown);
process.once('SIGTERM', shutdown);

まとめ

Node.js Worker Threadsを活用することで、CPU集約タスクを並列処理し、アプリケーションのパフォーマンスを大幅に向上させることができます。ワーカープールパターンを実装し、適切なエラーハンドリングとリソース管理を行うことで、本番環境でも安定したマルチスレッドアプリケーションを構築できます。

Worker Threadsを使うべき場面

  • 画像/動画処理
  • 大量データの変換・集計
  • 暗号化/復号化
  • 機械学習の推論
  • PDFレンダリング
  • コード圧縮・難読化

次のステップ

  • Cluster APIとの違いを理解
  • メッセージパッシングの最適化
  • SharedArrayBufferでのロック制御
  • 分散処理への拡張(BullMQ等)