あなたのモノリスのAIサービスが文書に窒息を起こし始めたとき PDFのアップロードを処理し、テキストを抽出し、感情分析を実行し、概要を生成するドキュメント処理サービスを想像してください。通常の使用では、ユーザーが一度に500のドキュメントをアップロードする日までうまく機能します!突然、Node.jsサーバは20分間停止し、他のリクエストをブロックし、ユーザーにステータスアップデートや進捗に関するフィードバックがありません。 このような失敗は、共通の教訓を教えます: AI workloads should not run inside your web server. 解決策は? 複数のAI従業員に仕事を配布し、各言語が最善を尽くし、メッセージ列を用いてすべてを調整する方法です。ここでは、Node.jsをオーケストラとして、Node.jsとPythonをAI従業員として使用して、RabbitMQとCeleryを通じて通信する文書を並行して処理するシステムを構築しました。 なぜこのアーキテクチャは意味があるのか コードに潜入する前に、この特定の組み合わせがなぜ機能するのかについて話しましょう。 : 迅速でイベント主導で、HTTP リクエストの処理やタスクの調整に最適です。 Node.js as the orchestrator : ほとんどのAI/ML ライブラリ (spaCy、トランスフォーマー、sikit-learn) は Python ネイティブです。 Python ワーカーでハード AI 処理を実行すると、Node.js API が軽量になります。 Python AI workers : JavaScript が優れているタスク(PDF パッチング、OCR プレプロセッサ、特定の NLP タスク)では、ノードワーカーも使用できます。 Node.js AI workers : タスクが失われないことを保証する信頼性の高いメッセージブローカー. If a worker crashes, the task gets requeued automatically. RabbitMQ : Battle-tested distributed task queue for Python. Handles retries, priorities, and worker scaling out of the box. Pythonのためのバトルテストされた分散タスク列。 Celery 結果は、以下のようなシステムです。 Your API responds in milliseconds, not minutes Workers can scale independently (add 10 Python workers if you need more NLP power) Failed tasks automatically retry You can monitor everything in real-time アーキテクチャ 一目で 以下、我々が建設しているもの: ユーザーは REST API を通じてドキュメントをアップロードします。Node.js サーバーはそれを保存し、タスクを RabbitMQ に投稿し、すぐにタスク ID を返します。Python および Node.js 従業員は自分の能力に基づいてタスクを取得します。各従業員はその部分(テキスト抽出、翻訳、感情分析)を処理し、結果を返します。 財団を設立する Docker Compose を使用して、ステック全体をローカルに実行しています. これにより、労働者を追加したり、生産に似た条件でテストしたりすることは無意味です。 以下は、すべてを組み合わせるDocker Composeの設定です。 version: '3.8' services: rabbitmq: image: rabbitmq:3-management ports: - "5672:5672" - "15672:15672" environment: RABBITMQ_DEFAULT_USER: admin RABBITMQ_DEFAULT_PASS: admin123 redis: image: redis:7-alpine ports: - "6379:6379" api: build: context: ./api ports: - "3000:3000" depends_on: - rabbitmq - redis environment: RABBITMQ_URL: amqp://admin:admin123@rabbitmq:5672 REDIS_URL: redis://redis:6379 volumes: - ./uploads:/app/uploads python-worker: build: context: ./workers/python depends_on: - rabbitmq - redis environment: CELERY_BROKER_URL: amqp://admin:admin123@rabbitmq:5672 CELERY_RESULT_BACKEND: redis://redis:6379 deploy: replicas: 2 node-worker: build: context: ./workers/node depends_on: - rabbitmq - redis environment: RABBITMQ_URL: amqp://admin:admin123@rabbitmq:5672 REDIS_URL: redis://redis:6379 deploy: replicas: 2 Start the entire stack with そして、あなたはあなたのマシン上で動作する分散型AIパイプラインを持っています。 docker-compose up -d Node.js API:それを速くする API サーバーは、リクエストを受信し、タスクを送信し、結果を返します。 // api/src/server.js import express from 'express'; import amqp from 'amqplib'; import { createClient } from 'redis'; import multer from 'multer'; import { v4 as uuidv4 } from 'uuid'; const app = express(); const upload = multer({ dest: 'uploads/' }); let channel, redisClient; async function initializeConnections() { const connection = await amqp.connect(process.env.RABBITMQ_URL); channel = await connection.createChannel(); await channel.assertQueue('document_analysis', { durable: true }); redisClient = createClient({ url: process.env.REDIS_URL }); await redisClient.connect(); } app.post('/analyze', upload.single('document'), async (req, res) => { const jobId = uuidv4(); const { analysisType = 'full' } = req.body; const task = { jobId, filePath: req.file.path, fileName: req.file.originalname, analysisType, createdAt: new Date().toISOString() }; // Publish to RabbitMQ channel.sendToQueue('document_analysis', Buffer.from(JSON.stringify(task)), { persistent: true } ); // Store initial status in Redis await redisClient.set(`job:${jobId}`, JSON.stringify({ status: 'queued', progress: 0, createdAt: task.createdAt }), { EX: 86400 }); // 24h expiry res.json({ jobId, status: 'queued' }); }); app.get('/status/:jobId', async (req, res) => { const data = await redisClient.get(`job:${req.params.jobId}`); if (!data) { return res.status(404).json({ error: 'Job not found' }); } res.json(JSON.parse(data)); }); await initializeConnections(); app.listen(3000, () => console.log('API running on port 3000')); 注意 How 仕事IDで即座に戻ります. 実際の処理は従業員の中で非同期的に行われます. /analyze Python Workers: Heavy Lifting with Celery (Python労働者:セレリーで重いリフト) 私はCeleryを使用しています、それはRabbitMQと完璧に統合し、すべての分散列の複雑さを処理します。 # workers/python/tasks.py from celery import Celery import redis import json from transformers import pipeline import spacy # Initialize Celery app = Celery('tasks', broker=os.getenv('CELERY_BROKER_URL'), backend=os.getenv('CELERY_RESULT_BACKEND')) # Load models once at startup nlp = spacy.load('en_core_web_sm') sentiment_analyzer = pipeline('sentiment-analysis') redis_client = redis.from_url(os.getenv('CELERY_RESULT_BACKEND')) @app.task(bind=True) def analyze_sentiment(self, job_id, text): """Analyze sentiment of document text""" try: update_progress(job_id, 30, 'Analyzing sentiment') # Process in chunks if text is large chunk_size = 512 chunks = [text[i:i+chunk_size] for i in range(0, len(text), chunk_size)] sentiments = [] for chunk in chunks: result = sentiment_analyzer(chunk)[0] sentiments.append(result) # Aggregate results avg_score = sum(s['score'] for s in sentiments) / len(sentiments) dominant_label = max(set(s['label'] for s in sentiments), key=lambda x: sum(1 for s in sentiments if s['label'] == x)) return { 'sentiment': dominant_label, 'confidence': avg_score, 'details': sentiments[:5] # First 5 chunks for detail } except Exception as e: self.retry(exc=e, countdown=60, max_retries=3) @app.task(bind=True) def extract_entities(self, job_id, text): """Extract named entities using spaCy""" try: update_progress(job_id, 50, 'Extracting entities') doc = nlp(text) entities = {} for ent in doc.ents: entity_type = ent.label_ if entity_type not in entities: entities[entity_type] = [] entities[entity_type].append({ 'text': ent.text, 'start': ent.start_char, 'end': ent.end_char }) return entities except Exception as e: self.retry(exc=e, countdown=60, max_retries=3) def update_progress(job_id, progress, message): """Update job progress in Redis""" job_key = f'job:{job_id}' data = redis_client.get(job_key) if data: job_data = json.loads(data) job_data['progress'] = progress job_data['message'] = message redis_client.set(job_key, json.dumps(job_data), ex=86400) セレリーの美しさは、これらの労働者を独立してスケールできるということです. もっと感情分析のパワーが必要ですか? もっと労働者コンテナを追加するだけです: . docker-compose up -d --scale python-worker=5 Node.js ワーカー:JavaScript が何を最もよくするかを処理する PDF パッシングやプレプロセッサなどのタスクでは、Node.js ワーカーは完璧です. 彼らは同じ RabbitMQ 列から消費し、Python ワーカーと共に作業します。 // workers/node/worker.js import amqp from 'amqplib'; import { createClient } from 'redis'; import pdf from 'pdf-parse'; import fs from 'fs/promises'; const redisClient = createClient({ url: process.env.REDIS_URL }); await redisClient.connect(); async function processDocument(task) { const { jobId, filePath, analysisType } = task; try { await updateProgress(jobId, 10, 'Extracting text from PDF'); // Read and parse PDF const dataBuffer = await fs.readFile(filePath); const pdfData = await pdf(dataBuffer); const result = { text: pdfData.text, pages: pdfData.numpages, metadata: pdfData.info, wordCount: pdfData.text.split(/\s+/).length }; await updateProgress(jobId, 20, 'Text extracted, queuing analysis tasks'); // Publish extracted text to Python workers for AI analysis if (analysisType === 'full' || analysisType === 'sentiment') { await publishTask('sentiment_analysis', { jobId, text: result.text }); } if (analysisType === 'full' || analysisType === 'entities') { await publishTask('entity_extraction', { jobId, text: result.text }); } // Store extraction results await storeResult(jobId, 'extraction', result); } catch (error) { console.error(`Error processing document ${jobId}:`, error); await updateProgress(jobId, -1, `Error: ${error.message}`); } } async function updateProgress(jobId, progress, message) { const jobKey = `job:${jobId}`; const data = await redisClient.get(jobKey); if (data) { const jobData = JSON.parse(data); jobData.progress = progress; jobData.message = message; jobData.status = progress < 0 ? 'failed' : progress === 100 ? 'completed' : 'processing'; await redisClient.set(jobKey, JSON.stringify(jobData), { EX: 86400 }); } } // Connect to RabbitMQ and start consuming const connection = await amqp.connect(process.env.RABBITMQ_URL); const channel = await connection.createChannel(); await channel.assertQueue('document_analysis', { durable: true }); channel.prefetch(1); channel.consume('document_analysis', async (msg) => { if (msg) { const task = JSON.parse(msg.content.toString()); await processDocument(task); channel.ack(msg); } }); console.log('Node.js worker waiting for documents...'); 連携結果: aggregator pattern 分散型労働者の課題の一つは、結果を収集することです.I use an aggregator pattern where workers store their results in Redis with a consistent key structure, and the API aggregates them on request. 私は、従業員が結果を一貫したキー構造でRedisに格納するアグレガーパターンを使用します。 // api/src/aggregator.js export async function getJobResults(jobId) { const jobData = await redisClient.get(`job:${jobId}`); if (!jobData) return null; const job = JSON.parse(jobData); // Gather all result keys for this job const resultKeys = await redisClient.keys(`job:${jobId}:*`); const results = {}; for (const key of resultKeys) { const [, , resultType] = key.split(':'); const data = await redisClient.get(key); results[resultType] = JSON.parse(data); } return { jobId, status: job.status, progress: job.progress, message: job.message, createdAt: job.createdAt, results }; } クライアントが呼ぶとき , 彼らはすべての完了した分析の完全なイメージを得る。 /status/:jobId 失敗を優しく扱う 分散型システムでは、物事が失敗します。労働者が崩壊し、ネットワークが崩壊し、モデルがタイムアウトします。 : Celery automatic retries @app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 5}) def risky_task(self, data): # If this fails, Celery will retry 3 times with 5-second delays pass 繰り返し失敗するメッセージの場合: Dead letter queues await channel.assertQueue('document_analysis_dlq', { durable: true }); await channel.assertQueue('document_analysis', { durable: true, arguments: { 'x-dead-letter-exchange': '', 'x-dead-letter-routing-key': 'document_analysis_dlq' } }); 従業員が永久に吊るされるのを防ぐために、 Timeouts @app.task(time_limit=300, soft_time_limit=270) def long_running_task(data): # Hard limit at 5 minutes, soft limit warning at 4.5 minutes pass あなたの分散パイプラインの監視 複数の作業員と列が存在する場合、視覚化が重要です。私はRabbitMQ管理コンソールを使用しています( ) 列の深さとメッセージレートを監視する。 http://localhost:15672 http://localhost:15672 アプリケーションレベルのモニタリングのために、私はメトリックエンドポイントを追加しました: app.get('/metrics', async (req, res) => { const connection = await amqp.connect(process.env.RABBITMQ_URL); const channel = await connection.createChannel(); const queue = await channel.checkQueue('document_analysis'); res.json({ queueDepth: queue.messageCount, consumers: queue.consumerCount, // Could add more metrics from Redis, worker health checks, etc. }); await connection.close(); }); 生産のために、プロメテウス・メトリックとグラファナ・ダッシュボードが必要ですが、開発中に迅速な洞察を得ることができます。 RabbitMQの地元ダッシュボードはこちら: システムのスケール化 このアーキテクチャの美しい点は、独立したスケーリングです。 AI処理パワーを増やすには: Scale Python workers docker-compose up -d --scale python-worker=10 より多くのPDF処理パスポート: Scale Node workers docker-compose up -d --scale node-worker=5 要求量が大きい場合: Scale the API docker-compose up -d --scale api=3 各コンポーネントはボトルネックに基づいて独立してスケールします. RabbitMQ は負荷バランスを自動的に処理します。 実践でのパフォーマンス 私はこのシステムを1000のPDF文書(平均10ページ)でテストしました。 47分、無反応。 : 8分, API responsive throughout 時間: 3.5 分 API Response Monolith (single Node.js process) Distributed (2 Node workers, 4 Python workers) Distributed (5 Node workers, 10 Python workers) スケールは、I/O またはネットワークの制限に到達するまで、従業員とほぼ線形にスケールします。 リアルワールドGOTCHAS I LEARN : トランスフォーマーモデルをロードするには10〜20秒かかります。 作業員がスタートするときに1回、タスクごとにではなく、毎回モデルを再充電していることに気づく前に「遅い労働者」をデバッグするのに時間を費やしました。 Model loading time RabbitMQ には、デフォルトのメッセージサイズ制限があります。 大きなドキュメントの場合、ファイルを保存して、コンテンツそのものではなく、メッセージ内のパスを渡します。 Message size limits Python AI 従業員は、モデルに応じて 2 ~ 4 GB をそれぞれ使用できます。 Worker memory RabbitMQ 接続を 1 人の従業員につき 1 つ作成し、再利用します。 Connection pooling このアーキテクチャを使用するとき このパターンは、次のように意味があります: あなたは数秒以上かかるタスクを処理しています。 パイプラインの異なる部分のための異なる言語/ランタイムが必要です。 コンポーネントを独立してスケールしたい 信頼性が重要(RabbitMQは負けたタスクを保証しない) 後でより多くの従業員タイプを追加できます。 それは単純なCRUD APIやミリ秒で完了するタスクのための過剰なものですが、AIワークロード、データ処理パイプライン、またはCPU密集したバックグラウンドの仕事の場合、このアーキテクチャは私を無数回救いました。 コード GitHubの完全なコードをご覧ください: https://github.com/ivmarcos/distributed-ai-document-analyzer https://github.com/ivmarcos/distributed-ai-document-analyzer リポジトリには、Docker Compose の完全な設定、API サーバ、両方のワークラータイプ、サンプルドキュメント、テスト スイートが含まれています。 システム全体が動いているのを見る。 docker-compose up