Skip to content
Retour au Blog

Construire des systèmes multi-agents : séquentiel vs parallèle

· 11 min de lecture

Lorsque j’ai commencé à construire des agents IA chez rooguys.com, je pensais que plus d’agents signifiait de meilleurs résultats. J’avais tort. La vraie question n’est pas de savoir combien d’agents il vous faut, mais comment ils doivent collaborer.

Les systèmes multi-agents sont partout en 2026. Le rapport de Google montre que 65 % des entreprises expérimentent les agents IA, mais moins de 25 % les ont passés à l’échelle en production. Une raison ? Elles ne comprennent pas comment orchestrer efficacement plusieurs agents.

Dans cet article, je vais décortiquer les deux schémas fondamentaux des systèmes multi-agents : l’exécution séquentielle et l’exécution parallèle. Vous apprendrez quand utiliser chacun, verrez de vrais exemples de code, et comprendrez les compromis qui comptent en production.

Qu’est-ce qu’un système multi-agents ?

Un système multi-agents est exactement ce que son nom indique : plusieurs agents IA qui collaborent pour accomplir une tâche. Au lieu d’avoir un agent qui fait tout, vous disposez d’agents spécialisés qui coopèrent.

Voyez cela comme une équipe. Vous ne demanderiez pas à une seule personne de concevoir, développer, tester et déployer une fonctionnalité. Vous auriez un designer, un développeur, un ingénieur QA et un spécialiste DevOps. Chacun apporte son expertise. Chacun se concentre sur son domaine.

Les systèmes multi-agents fonctionnent de la même manière. Vous pourriez avoir :

  • Un agent de recherche qui collecte des informations
  • Un agent de planification qui élabore une stratégie
  • Un agent d’exécution qui réalise les tâches
  • Un agent de revue qui valide les résultats

La magie ne réside pas dans les agents eux-mêmes. Elle réside dans la façon dont ils communiquent et se coordonnent.

Exécution séquentielle : l’un après l’autre

L’exécution séquentielle est le schéma le plus simple. Les agents s’exécutent l’un après l’autre, chaque agent recevant la sortie du précédent.

Quand utiliser le séquentiel

L’exécution séquentielle fonctionne le mieux lorsque :

  1. Chaque étape dépend de la précédente. Vous ne pouvez pas relire du code qui n’a pas été écrit. Vous ne pouvez pas déployer une application qui n’a pas été testée.

  2. L’ordre est important. La recherche doit précéder la planification. La planification doit précéder l’exécution.

  3. Vous devez détecter les erreurs tôt. Si l’agent 1 échoue, vous vous arrêtez avant de gaspiller des ressources sur les agents 2, 3 et 4.

  4. Vous construisez un pipeline. Création de contenu, traitement de données, analyse de documents. Ce sont des pipelines naturels.

Exemple séquentiel : pipeline de création de contenu

Laissez-moi vous montrer un exemple réel. Chez rooguys.com, nous avons construit un système de création de contenu avec trois agents :

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator

class ContentState(TypedDict):
topic: str
research: str
outline: str
draft: str
final: str

def research_agent(state: ContentState) -> dict:
"""Agent 1: Research the topic"""
topic = state["topic"]
# Simulate research - in production, this would call search APIs
research = f"Key findings about {topic}:\n- Point 1\n- Point 2\n- Point 3"
return {"research": research}

def outline_agent(state: ContentState) -> dict:
"""Agent 2: Create outline from research"""
research = state["research"]
# Create structured outline
outline = f"Outline based on research:\n1. Introduction\n2. Main Points\n3. Conclusion"
return {"outline": outline}

def writer_agent(state: ContentState) -> dict:
"""Agent 3: Write content from outline"""
outline = state["outline"]
research = state["research"]
# Generate full content
draft = f"Full article based on outline and research..."
return {"draft": draft}

def editor_agent(state: ContentState) -> dict:
"""Agent 4: Edit and polish"""
draft = state["draft"]
# Polish the content
final = f"Edited and polished version of the draft..."
return {"final": final}

# Build the sequential graph
workflow = StateGraph(ContentState)
workflow.add_node("research", research_agent)
workflow.add_node("outline", outline_agent)
workflow.add_node("writer", writer_agent)
workflow.add_node("editor", editor_agent)

# Define sequential edges
workflow.set_entry_point("research")
workflow.add_edge("research", "outline")
workflow.add_edge("outline", "writer")
workflow.add_edge("writer", "editor")
workflow.add_edge("editor", END)

app = workflow.compile()

C’est propre et prévisible. Chaque agent fait une chose bien. La sortie circule naturellement de l’un à l’autre.

Avantages et inconvénients du séquentiel

Avantages :

  • Simple à comprendre et à déboguer
  • Gestion claire des erreurs à chaque étape
  • Utilisation prévisible des ressources
  • Facile à instrumenter (logs et monitoring)

Inconvénients :

  • Exécution globale plus lente
  • Un agent lent bloque tout le reste
  • Aucun bénéfice de la parallélisation
  • Peut sembler rigide pour des workflows complexes

Exécution parallèle : tous en même temps

L’exécution parallèle lance plusieurs agents simultanément. Chaque agent travaille de manière indépendante, et les résultats sont combinés à la fin.

Quand utiliser le parallèle

L’exécution parallèle fonctionne le mieux lorsque :

  1. Les tâches sont indépendantes. Analyser trois documents différents. Interroger trois API différentes. Traiter trois sources de données différentes.

  2. La vitesse est critique. Vous avez besoin de résultats rapidement, et attendre séquentiellement prendrait trop de temps.

  3. Vous agrégez de l’information. Collecter des données depuis plusieurs sources pour prendre une décision.

  4. Vous avez besoin de vérification redondante. Plusieurs agents vérifient la même chose pour garantir l’exactitude.

Exemple parallèle : recherche multi-sources

Voici un système parallèle qui effectue une recherche sur un sujet à partir de plusieurs sources :

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, List
import operator

def merge_lists(left: list, right: list) -> list:
"""Custom reducer to merge lists from parallel agents"""
return left + right

class ResearchState(TypedDict):
topic: str
web_findings: Annotated[List[str], merge_lists]
academic_findings: Annotated[List[str], merge_lists]
news_findings: Annotated[List[str], merge_lists]
social_findings: Annotated[List[str], merge_lists]
summary: str

def web_research_agent(state: ResearchState) -> dict:
"""Agent 1: Search the web"""
topic = state["topic"]
findings = [f"Web finding 1 about {topic}", f"Web finding 2 about {topic}"]
return {"web_findings": findings}

def academic_research_agent(state: ResearchState) -> dict:
"""Agent 2: Search academic papers"""
topic = state["topic"]
findings = [f"Academic paper 1 on {topic}", f"Academic paper 2 on {topic}"]
return {"academic_findings": findings}

def news_research_agent(state: ResearchState) -> dict:
"""Agent 3: Search news sources"""
topic = state["topic"]
findings = [f"News article 1 about {topic}", f"News article 2 about {topic}"]
return {"news_findings": findings}

def social_research_agent(state: ResearchState) -> dict:
"""Agent 4: Analyze social media"""
topic = state["topic"]
findings = [f"Social trend 1 about {topic}", f"Social trend 2 about {topic}"]
return {"social_findings": findings}

def synthesis_agent(state: ResearchState) -> dict:
"""Agent 5: Combine all findings"""
all_findings = (
state["web_findings"] +
state["academic_findings"] +
state["news_findings"] +
state["social_findings"]
)
summary = f"Synthesized summary from {len(all_findings)} sources"
return {"summary": summary}

# Build the parallel graph
workflow = StateGraph(ResearchState)
workflow.add_node("web", web_research_agent)
workflow.add_node("academic", academic_research_agent)
workflow.add_node("news", news_research_agent)
workflow.add_node("social", social_research_agent)
workflow.add_node("synthesis", synthesis_agent)

# All research agents run in parallel, then synthesis
workflow.set_entry_point("web")  # Entry doesn't matter much for parallel
workflow.add_edge("web", "synthesis")
workflow.add_edge("academic", "synthesis")
workflow.add_edge("news", "synthesis")
workflow.add_edge("social", "synthesis")
workflow.add_edge("synthesis", END)

app = workflow.compile()

Dans LangGraph, lorsque plusieurs arêtes pointent vers le même nœud, elles s’exécutent en parallèle. L’agent de synthèse attend que les quatre agents de recherche aient terminé avant de s’exécuter.

Avantages et inconvénients du parallèle

Avantages :

  • Bien plus rapide pour des tâches indépendantes
  • Meilleure utilisation des ressources
  • Permet d’agréger diverses perspectives
  • Naturel pour les workflows de collecte de données

Inconvénients :

  • Gestion des erreurs plus complexe
  • Plus difficile à déboguer en cas de problème
  • Nécessite de gérer les défaillances partielles
  • La gestion de l’état devient délicate

Schémas hybrides : le meilleur des deux mondes

Les vrais systèmes en production utilisent rarement du séquentiel pur ou du parallèle pur. Ils combinent les deux.

Schéma 1 : recherche en parallèle, exécution séquentielle

Recherche en parallèle, puis exécution séquentielle :

[Research Agent 1] ─┐
[Research Agent 2] ─┼─> [Planning Agent] -> [Execution Agent] -> [Review Agent]
[Research Agent 3] ─┘

C’est courant dans les workflows automatisés. Vous collectez l’information rapidement, puis vous la traitez avec soin.

Schéma 2 : séquentiel avec vérification parallèle

Exécution séquentielle, mais vérification en parallèle :

[Research] -> [Planning] -> [Execution] -> [Review Agent 1]
└> [Review Agent 2]
                                           [Review Agent 3]

v
                                            [Final Decision]

Plusieurs relecteurs détectent davantage d’erreurs. C’est ainsi que nous gérons les opérations critiques chez rooguys.com.

Exemple hybride : système de contenu en production

Voici un exemple plus réaliste qui combine les deux schémas :

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, List, Optional
import operator

class ProductionContentState(TypedDict):
topic: str
# Parallel research outputs
competitor_analysis: str
keyword_research: str
trend_analysis: str
# Sequential outputs
brief: str
outline: str
draft: str
# Parallel review outputs
seo_score: int
quality_score: int
fact_check_results: str
# Final output
final_content: str
approved: bool

# Parallel research agents
def competitor_agent(state: ProductionContentState) -> dict:
return {"competitor_analysis": "Competitor insights..."}

def keyword_agent(state: ProductionContentState) -> dict:
return {"keyword_research": "Target keywords..."}

def trend_agent(state: ProductionContentState) -> dict:
return {"trend_analysis": "Current trends..."}

# Sequential creation agents
def brief_agent(state: ProductionContentState) -> dict:
# Combines all research into a brief
brief = f"Content brief based on research..."
return {"brief": brief}

def outline_agent(state: ProductionContentState) -> dict:
return {"outline": "Detailed outline..."}

def writer_agent(state: ProductionContentState) -> dict:
return {"draft": "Full draft content..."}

# Parallel review agents
def seo_reviewer(state: ProductionContentState) -> dict:
return {"seo_score": 85}

def quality_reviewer(state: ProductionContentState) -> dict:
return {"quality_score": 90}

def fact_checker(state: ProductionContentState) -> dict:
return {"fact_check_results": "All facts verified"}

# Final decision agent
def final_reviewer(state: ProductionContentState) -> dict:
seo_ok = state["seo_score"] >= 80
quality_ok = state["quality_score"] >= 80
facts_ok = "verified" in state["fact_check_results"].lower()

approved = seo_ok and quality_ok and facts_ok

if approved:
return {
"final_content": state["draft"],
"approved": True
}
else:
return {
"final_content": "Needs revision",
"approved": False
}

# Build the hybrid graph
workflow = StateGraph(ProductionContentState)

# Add all nodes
workflow.add_node("competitor", competitor_agent)
workflow.add_node("keyword", keyword_agent)
workflow.add_node("trend", trend_agent)
workflow.add_node("brief", brief_agent)
workflow.add_node("outline", outline_agent)
workflow.add_node("writer", writer_agent)
workflow.add_node("seo_review", seo_reviewer)
workflow.add_node("quality_review", quality_reviewer)
workflow.add_node("fact_check", fact_checker)
workflow.add_node("final_review", final_reviewer)

# Parallel research phase
workflow.set_entry_point("competitor")
workflow.add_edge("competitor", "brief")
workflow.add_edge("keyword", "brief")
workflow.add_edge("trend", "brief")

# Sequential creation phase
workflow.add_edge("brief", "outline")
workflow.add_edge("outline", "writer")

# Parallel review phase
workflow.add_edge("writer", "seo_review")
workflow.add_edge("writer", "quality_review")
workflow.add_edge("writer", "fact_check")

# Final decision
workflow.add_edge("seo_review", "final_review")
workflow.add_edge("quality_review", "final_review")
workflow.add_edge("fact_check", "final_review")
workflow.add_edge("final_review", END)

app = workflow.compile()

Cette approche hybride vous offre la vitesse là où elle compte (recherche, revue) et le contrôle là où c’est essentiel (création de contenu).

Utiliser l’Agent Development Kit (ADK) de Google

L’Agent Development Kit (ADK) de Google est un framework plus récent qui gagne du terrain en 2026. Il est conçu pour rendre le développement d’agents plus proche du développement logiciel traditionnel, avec un support intégré pour les schémas séquentiels, parallèles et en boucle.

ADK est agnostique en matière de modèle (fonctionne avec Gemini, OpenAI, Anthropic, et d’autres) et agnostique en matière de déploiement (exécution locale, sur Cloud Run, ou Vertex AI Agent Engine). Laissez-moi vous montrer à quoi ressemblent les mêmes schémas dans ADK.

Pipeline séquentiel avec ADK

ADK fournit une primitive SequentialAgent qui gère l’orchestration automatiquement. La clé est d’utiliser output_key pour transmettre les données entre les agents :

from google.adk.agents import LlmAgent, SequentialAgent
from google.adk.tools import FunctionTool

# Define tools for each agent
def parse_pdf(file_path: str) -> str:
"""Parse PDF and extract text"""
# Your PDF parsing logic
return "Extracted text from PDF..."

def extract_data(text: str) -> dict:
"""Extract structured data from text"""
# Your extraction logic
return {"entities": [], "dates": [], "amounts": []}

def generate_summary(data: dict) -> str:
"""Generate summary from structured data"""
# Your summarization logic
return "Summary of extracted data..."

# Step 1: Parser Agent
parser = LlmAgent(
name="ParserAgent",
model="gemini-2.0-flash",
instruction="Parse the PDF file and extract all text content.",
tools=[FunctionTool(parse_pdf)],
output_key="raw_text"  # Writes to session.state["raw_text"]
)

# Step 2: Extractor Agent
extractor = LlmAgent(
name="ExtractorAgent",
model="gemini-2.0-flash",
instruction="Extract structured data from {raw_text}. Look for entities, dates, and amounts.",
tools=[FunctionTool(extract_data)],
output_key="structured_data"
)

# Step 3: Summarizer Agent
summarizer = LlmAgent(
name="SummarizerAgent",
model="gemini-2.0-flash",
instruction="Generate a concise summary from {structured_data}.",
tools=[FunctionTool(generate_summary)],
output_key="final_summary"
)

# Orchestrate with SequentialAgent
pipeline = SequentialAgent(
name="DocumentProcessingPipeline",
sub_agents=[parser, extractor, summarizer]
)

Remarquez comment ADK utilise la syntaxe {variable} dans les instructions pour référencer les valeurs d’état. Cela rend le flux de données explicite et lisible.

Fan-out/gather parallèle avec ADK

Pour l’exécution parallèle, ADK fournit ParallelAgent. Chaque agent écrit dans une clé unique pour éviter les conditions de concurrence :

from google.adk.agents import LlmAgent, ParallelAgent, SequentialAgent

# Parallel review agents
security_auditor = LlmAgent(
name="SecurityAuditor",
model="gemini-2.0-flash",
instruction="Review the code for security vulnerabilities: injection attacks, auth issues, data exposure.",
output_key="security_report"
)

style_checker = LlmAgent(
name="StyleEnforcer",
model="gemini-2.0-flash",
instruction="Check code for style compliance, formatting issues, and best practices.",
output_key="style_report"
)

performance_analyst = LlmAgent(
name="PerformanceAnalyst",
model="gemini-2.0-flash",
instruction="Analyze time complexity, memory usage, and potential bottlenecks.",
output_key="performance_report"
)

# Fan-out: Run all reviews in parallel
parallel_reviews = ParallelAgent(
name="CodeReviewSwarm",
sub_agents=[security_auditor, style_checker, performance_analyst]
)

# Gather: Synthesize all reports
pr_summarizer = LlmAgent(
name="PRSummarizer",
model="gemini-2.0-flash",
instruction="""Create a consolidated Pull Request review using:
    - Security: {security_report}
    - Style: {style_report}
    - Performance: {performance_report}

Provide actionable feedback organized by priority.""",
output_key="final_review"
)

# Combine: Parallel reviews followed by synthesis
workflow = SequentialAgent(
name="CodeReviewWorkflow",
sub_agents=[parallel_reviews, pr_summarizer]
)

Le ParallelAgent exécute tous les sous-agents simultanément dans des threads distincts, mais ils partagent le même état de session. C’est pourquoi des valeurs output_key uniques sont essentielles.

Schéma coordinateur/répartiteur avec ADK

ADK prend en charge un routage piloté par LLM dans lequel un agent coordinateur décide quel spécialiste invoquer :

from google.adk.agents import LlmAgent

# Specialist agents
billing_specialist = LlmAgent(
name="BillingSpecialist",
description="Handles billing inquiries, invoices, payment issues, and refund requests.",
model="gemini-2.0-flash",
instruction="Help the user with billing-related questions. Access the billing system as needed."
)

tech_support = LlmAgent(
name="TechSupportSpecialist",
description="Troubleshoots technical issues, bugs, errors, and platform problems.",
model="gemini-2.0-flash",
instruction="Help the user resolve technical issues. Use diagnostic tools when appropriate."
)

account_manager = LlmAgent(
name="AccountManager",
description="Handles account settings, profile changes, subscriptions, and access issues.",
model="gemini-2.0-flash",
instruction="Help the user with account-related requests."
)

# Coordinator with auto-routing
coordinator = LlmAgent(
name="SupportCoordinator",
model="gemini-2.0-flash",
instruction="""You are a support coordinator. Analyze the user's request and route to the appropriate specialist.

    - Billing issues -> BillingSpecialist
    - Technical problems -> TechSupportSpecialist  
    - Account changes -> AccountManager

If unsure, ask clarifying questions first.""",
sub_agents=[billing_specialist, tech_support, account_manager]
)

Le mécanisme AutoFlow d’ADK utilise le champ description des sous-agents pour prendre des décisions de routage. Soyez précis avec vos descriptions, elles constituent en pratique la documentation API destinée au LLM.

Générateur-critique avec boucle dans ADK

Le LoopAgent d’ADK permet des schémas de raffinement itératif :

from google.adk.agents import LlmAgent, LoopAgent, SequentialAgent

# Generator: Creates SQL queries
generator = LlmAgent(
name="SQLGenerator",
model="gemini-2.0-flash",
instruction="""Generate a SQL query based on the user's request.

If you receive {feedback}, fix the errors and regenerate the query.

Output only the SQL query, nothing else.""",
output_key="draft_query"
)

# Critic: Validates the SQL
critic = LlmAgent(
name="SQLCritic",
model="gemini-2.0-flash",
instruction="""Review {draft_query} for:
    1. SQL syntax correctness
    2. Valid table and column names
    3. No SQL injection vulnerabilities

If the query is valid, output exactly: PASS
If invalid, output the specific errors to fix.""",
output_key="feedback"
)

# Loop until PASS or max iterations
validation_loop = LoopAgent(
name="SQLValidationLoop",
sub_agents=[generator, critic],
max_iterations=5,
exit_condition="PASS"  # Exits when critic outputs "PASS"
)

# Complete workflow
sql_workflow = SequentialAgent(
name="SQLQueryWorkflow",
sub_agents=[validation_loop]
)

Ce schéma est puissant pour la génération de code, la création de contenu, ou toute tâche où la qualité prime sur la vitesse.

ADK vs LangGraph : que choisir et quand

Caractéristique

ADK

LangGraph

Courbe d’apprentissage

Plus douce, plus opinionée

Plus raide, plus flexible

Gestion d’état

session.state intégré

Configuration manuelle de StateGraph

Déploiement

Intégration native GCP

N’importe quelle plateforme

Support des boucles

LoopAgent natif

Nécessite des arêtes conditionnelles

Débogage

Tracing intégré

Instrumentation manuelle

Support des modèles

Agnostique

Agnostique

Choisissez ADK si :

  • Vous êtes déjà dans l’écosystème Google Cloud
  • Vous voulez un déploiement intégré vers Vertex AI
  • Vous préférez la convention plutôt que la configuration
  • Vous avez besoin de prototypage rapide

Choisissez LangGraph si :

  • Vous avez besoin d’un contrôle maximal sur le flux d’exécution
  • Vous construisez des workflows cycliques complexes
  • Vous voulez un déploiement agnostique du framework
  • Vous avez des investissements existants dans LangChain

Les deux sont prêts pour la production. Le meilleur choix dépend de votre stack existante et de la familiarité de votre équipe.

Gestion des erreurs dans les systèmes multi-agents

C’est là que la plupart des systèmes échouent en production. Vous devez gérer les erreurs avec élégance.

Gestion d’erreurs en séquentiel

Dans les systèmes séquentiels, vous pouvez vous arrêter tôt :

def safe_sequential_agent(state, agent_func, agent_name):
"""Wrapper that handles errors in sequential execution"""
try:
result = agent_func(state)
return result
except Exception as e:
print(f"Agent {agent_name} failed: {e}")
return {"error": str(e), "failed_at": agent_name}

# In your workflow, check for errors
def check_for_errors(state):
if "error" in state:
return END  # Stop the workflow
return "next_agent"

Gestion d’erreurs en parallèle

Dans les systèmes parallèles, vous devez décider : tout faire échouer, ou continuer avec des résultats partiels ?

from concurrent.futures import ThreadPoolExecutor, as_completed

def run_parallel_agents(agents, state):
"""Run agents in parallel with error handling"""
results = {}
errors = []

with ThreadPoolExecutor(max_workers=len(agents)) as executor:
futures = {
executor.submit(agent.run, state): name
for name, agent in agents.items()
}

for future in as_completed(futures):
agent_name = futures[future]
try:
results[agent_name] = future.result()
except Exception as e:
errors.append((agent_name, str(e)))
# Decide: continue or fail?
# Option 1: Continue with partial results
results[agent_name] = None
# Option 2: Fail everything
# raise e

return results, errors

Chez rooguys.com, nous adoptons une approche de « dégradation progressive ». Si un agent de recherche échoue, nous continuons avec les autres. Mais si un agent critique échoue, nous arrêtons tout.

Considérations de coût

Les systèmes multi-agents peuvent devenir coûteux. Voici comment maîtriser les coûts :

Le séquentiel est moins cher (en général)

Avec une exécution séquentielle, vous pouvez vous arrêter tôt en cas d’échec. Vous ne payez pas pour des agents qui ne s’exécutent jamais.

Les coûts du parallèle s’additionnent

Exécuter 5 agents en parallèle signifie 5 appels API simultanés. Si chaque appel coûte 0,01 $ et que vous traitez 1000 requêtes par jour, cela représente 50 $/jour pour un seul workflow.

Stratégies d’optimisation des coûts

  1. Mettez en cache de manière agressive. Si l’entrée d’un agent n’a pas changé, réutilisez sa sortie.
import hashlib
import json

def get_cached_result(agent_name, input_state, cache):
"""Check cache before running agent"""
cache_key = hashlib.md5(
json.dumps(input_state, sort_keys=True).encode()
).hexdigest()

full_key = f"{agent_name}:{cache_key}"

if full_key in cache:
return cache[full_key]

return None
  1. Utilisez des modèles moins chers pour les agents simples. Tous les agents n’ont pas besoin de GPT-4. Utilisez GPT-3.5 ou des modèles locaux pour la recherche et la collecte de données.

  2. Mettez en place des timeouts. Ne laissez pas les agents s’exécuter indéfiniment.

import signal
from contextlib import contextmanager

class TimeoutError(Exception):
pass

@contextmanager
def timeout(seconds):
def handler(signum, frame):
raise TimeoutError("Agent timed out")
signal.signal(signal.SIGALRM, handler)
signal.alarm(seconds)
try:
yield
finally:
signal.alarm(0)

# Usage
try:
with timeout(30):  # 30 second timeout
result = agent.run(state)
except TimeoutError:
result = {"error": "timeout"}

Observabilité : voir ce qui se passe

Vous ne pouvez pas déboguer ce que vous ne voyez pas. Les systèmes multi-agents nécessitent une bonne observabilité.

Que suivre

  1. Le temps d’exécution des agents. Quels agents sont lents ?
  2. L’utilisation de tokens par agent. Quels agents sont coûteux ?
  3. Les taux de succès/échec. Quels agents échouent le plus ?
  4. Les transitions d’état. Quelles données circulent entre les agents ?

Approche simple de logging

import time
import json
from datetime import datetime

class AgentLogger:
def __init__(self, workflow_name):
self.workflow_name = workflow_name
self.logs = []

def log_agent_start(self, agent_name, input_state):
self.logs.append({
"timestamp": datetime.now().isoformat(),
"event": "agent_start",
"agent": agent_name,
"input_keys": list(input_state.keys())
})

def log_agent_end(self, agent_name, output, duration_ms, tokens_used=0):
self.logs.append({
"timestamp": datetime.now().isoformat(),
"event": "agent_end",
"agent": agent_name,
"duration_ms": duration_ms,
"tokens_used": tokens_used,
"output_keys": list(output.keys()) if output else None
})

def log_error(self, agent_name, error):
self.logs.append({
"timestamp": datetime.now().isoformat(),
"event": "error",
"agent": agent_name,
"error": str(error)
})

# Usage in agent wrapper
def logged_agent(agent_func, agent_name, logger):
def wrapper(state):
logger.log_agent_start(agent_name, state)
start_time = time.time()
try:
result = agent_func(state)
duration = (time.time() - start_time) * 1000
logger.log_agent_end(agent_name, result, duration)
return result
except Exception as e:
logger.log_error(agent_name, e)
raise
return wrapper

Choisir le bon schéma

Voici un cadre de décision :

Situation

Schéma

Pourquoi

Chaque étape a besoin de la sortie précédente

Séquentiel

Les dépendances exigent un ordre

Les tâches sont totalement indépendantes

Parallèle

Aucune raison d’attendre

La vitesse est critique

Parallèle

Maximiser le débit

Le coût est critique

Séquentiel

Possibilité d’arrêt anticipé en cas d’échec

Besoin de plusieurs perspectives

Parallèle

Rassembler des entrées variées

Construction d’un pipeline

Séquentiel

Flux naturel

Workflow complexe

Hybride

Le meilleur des deux mondes

Leçons tirées de la production

Après avoir exécuté des systèmes multi-agents en production chez rooguys.com, voici mes principaux enseignements :

1. Commencez simple

Ne construisez pas un système à 10 agents dès le premier jour. Commencez avec 2 ou 3 agents. Ajoutez-en seulement quand vous comprenez le flux.

2. La spécialisation des agents compte

Les agents généralistes semblent flexibles, mais créent de la confusion. Les agents spécialisés avec des responsabilités claires sont plus faciles à déboguer et à améliorer.

3. La gestion d’état est difficile

À mesure que votre système grandit, l’état devient complexe. Utilisez des classes d’état typées et validez les transitions d’état.

from pydantic import BaseModel, validator
from typing import List, Optional

class AgentState(BaseModel):
topic: str
research: Optional[str] = None
outline: Optional[str] = None
draft: Optional[str] = None

@validator('research')
def research_must_not_be_empty(cls, v):
if v is not None and len(v.strip()) == 0:
raise ValueError('Research cannot be empty string')
return v

4. Testez chaque agent indépendamment

Avant de tester le système complet, testez chaque agent isolément. Mockez les entrées et vérifiez les sorties.

def test_research_agent():
state = {"topic": "AI Agents"}
result = research_agent(state)
assert "research" in result
assert len(result["research"]) > 0
print("Research agent test passed")

5. Surveillez l’utilisation des tokens

Les coûts des LLM peuvent exploser rapidement. Définissez des budgets et des alertes.

Frameworks à considérer

Plusieurs frameworks facilitent la construction de systèmes multi-agents :

  • Google ADK : Conçu pour les systèmes multi-agents avec des primitives natives séquentielles, parallèles et en boucle. Idéal pour les déploiements GCP.
  • LangGraph : Excellent pour les workflows complexes avec cycles et gestion d’état. Flexibilité maximale.
  • CrewAI : Idéal pour les équipes d’agents basées sur des rôles, avec des schémas de collaboration proches de l’humain.
  • AutoGen : Le framework de Microsoft pour les agents conversationnels et les dialogues multi-agents.
  • LlamaIndex : Performant pour les systèmes multi-agents basés sur du RAG, avec des workflows axés sur les documents.

J’utilise à la fois ADK et LangGraph en production. ADK pour les déploiements Google Cloud et le prototypage rapide. LangGraph pour les workflows complexes nécessitant un contrôle fin.

Conclusion

Les systèmes multi-agents sont puissants, mais ils exigent une conception soignée. L’exécution séquentielle vous donne contrôle et simplicité. L’exécution parallèle vous donne vitesse et diversité. Les schémas hybrides vous offrent les deux.

La clé est de comprendre votre cas d’usage. Posez-vous ces questions :

  1. Mes agents dépendent-ils de la sortie des autres ?
  2. La vitesse compte-t-elle plus que le coût ?
  3. Ai-je besoin de plusieurs perspectives ou d’un pipeline unique ?

Répondez à ces questions, et le bon schéma deviendra évident.

Commencez petit. Testez souvent. Surveillez tout. Et rappelez-vous : plus d’agents ne signifie pas de meilleurs résultats. C’est la bonne architecture qui fait la différence.

*Vous avez des questions sur la construction de systèmes multi-agents ? Connectez-vous avec moi sur LinkedIn ou consultez mes autres articles sur mbakayoko.com.*

Articles similaires