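AI Skill Hub highly recommends: agentic-rag-for-dummies is a high-quality agent workflow. It has earned 3.3k GitHub stars and an AI composite score of 8.2, a solid showing among comparable tools. If you are looking for a reliable agent workflow solution, it is worth a closer look.
A modular agentic RAG system built on LangGraph that provides a complete retrieval-augmented generation workflow. It teaches through detailed Jupyter Notebooks, so developers who want to learn agentic AI and RAG can get started and practice quickly.
agentic-rag-for-dummies is a complete AI agent automation workflow. Through visual node orchestration, it breaks complex multi-step tasks into clear automated flows that run fully unattended. It integrates with hundreds of external services and APIs and suits data processing pipelines, business automation, and AI-assisted decision systems.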
```bash
# Clone the repository
git clone https://github.com/GiovanniPasq/agentic-rag-for-dummies
cd agentic-rag-for-dummies

# Read the installation instructions
cat README.md
# Once the dependencies are installed per the README, the project is ready to use
```

```bash
# Show help
agentic-rag-for-dummies --help

# Basic run
agentic-rag-for-dummies [options] <input>

# For detailed usage, see the documentation:
# https://github.com/GiovanniPasq/agentic-rag-for-dummies
```

```bash
# agentic-rag-for-dummies configuration
# View the configuration options
agentic-rag-for-dummies --config-example > config.yml

# Common options
# output_dir: ./output
# log_level: info
# workers: 4

# Environment variable (overrides the configuration file)
export AGENTIC_RAG_FOR_DUMMIES_CONFIG="/path/to/config.yml"
```
<p align="center"> <img alt="Agentic RAG for Dummies Logo" src="assets/logo.png" width="350px"> </p>
<p align="center"> <strong>Build a modular Agentic RAG system with LangGraph, conversation memory, and human-in-the-loop query clarification</strong> </p>
<p align="center"> <a href="#overview">Overview</a> • <a href="#how-it-works">How It Works</a> • <a href="#llm-provider-configuration">LLM Providers</a> • <a href="#implementation">Implementation</a> • <a href="#installation--usage">Installation & Usage</a> • <a href="#troubleshooting">Troubleshooting</a> </p>
<p align="center"> <img src="https://img.shields.io/github/stars/GiovanniPasq/agentic-rag-for-dummies?style=social" alt="GitHub Stars"/> <img src="https://img.shields.io/github/forks/GiovanniPasq/agentic-rag-for-dummies?style=social" alt="GitHub Forks"/> <img src="https://img.shields.io/badge/license-MIT-green" alt="License"/> <a href="https://github.com/von-development/awesome-langgraph"> <img src="https://awesome.re/badge.svg" alt="Awesome LangGraph"/> </a> </p>
<p align="center"> <img src="https://img.shields.io/badge/python-3.11%2B-blue?logo=python&logoColor=white" alt="Python"/> <img src="https://img.shields.io/badge/LangGraph-1.2%2B-orange?logo=langchain&logoColor=white" alt="LangGraph"/> <img src="https://img.shields.io/badge/Qdrant-vector%20db-DC244C" alt="Qdrant"/> <img src="https://img.shields.io/badge/LLM%20Providers-Ollama%20%7C%20OpenAI%20%7C%20Anthropic%20%7C%20Google-purple" alt="LLM Providers"/> </p>
<p align="center"> <a href="https://colab.research.google.com/github/GiovanniPasq/agentic-rag-for-dummies/blob/main/notebooks/agentic_rag.ipynb"> <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/> </a> </p>
<p align="center"> <img alt="Agentic RAG Demo" src="assets/demo.gif" width="650px"> </p>
<p align="center"> <strong>If you like this project, a star ⭐️ would mean a lot :)</strong><br> </p>
This repository demonstrates how to build an Agentic RAG (Retrieval-Augmented Generation) system using LangGraph with minimal code. Most RAG tutorials show basic concepts but lack guidance on building modular, agent-driven systems — this project bridges that gap by providing both learning materials and an extensible architecture.
| Feature | Description |
|---|---|
| 🗂️ **Hierarchical Indexing** | Search small chunks for precision, retrieve large Parent chunks for context |
| 🧠 **Conversation Memory** | Maintains context across questions for natural dialogue |
| ❓ **Query Clarification** | Rewrites ambiguous queries or pauses to ask the user for details |
| 🤖 **Agent Orchestration** | LangGraph coordinates the full retrieval and reasoning workflow |
| 🔀 **Multi-Agent Map-Reduce** | Decomposes complex queries into parallel sub-queries |
| ✅ **Self-Correction** | Re-queries automatically if initial results are insufficient |
| 🗜️ **Context Compression** | Keeps working memory lean across long retrieval loops |
| 🔍 **Observability** | Track LLM calls, tool usage, and graph execution with Langfuse |
| 📊 **Evaluation** | Evaluate retrieval and answer quality with RAGAS metrics |
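
The Hierarchical Indexing row above can be illustrated with a toy sketch: small child chunks are what gets searched, and each hit is mapped back to its larger parent chunk for context. Everything here is hypothetical (the function names, the word-overlap scoring, and the sample texts); the project itself searches child chunks in Qdrant rather than by word overlap.

```python
# Toy sketch of parent/child indexing; names and scoring are illustrative assumptions.

def build_index(parents: dict[str, str], child_size: int = 8) -> list[dict]:
    """Split each parent chunk into small child chunks that remember their parent_id."""
    children = []
    for parent_id, text in parents.items():
        words = text.split()
        for start in range(0, len(words), child_size):
            children.append({
                "parent_id": parent_id,
                "text": " ".join(words[start:start + child_size]),
            })
    return children


def search_children(children: list[dict], query: str, k: int = 1) -> list[dict]:
    """Rank child chunks by naive word overlap (a stand-in for vector search)."""
    query_words = set(query.lower().split())
    scored = [
        (len(query_words & set(child["text"].lower().split())), child)
        for child in children
    ]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [child for score, child in scored[:k] if score > 0]


parents = {
    "p1": "JavaScript is a scripting language that runs in browsers and on servers through Node.js runtimes",
    "p2": "A blockchain is a distributed ledger where blocks of transactions are linked with cryptographic hashes",
}
children = build_index(parents)
hits = search_children(children, "distributed ledger")
# Search matched a small child chunk, but the full parent text is what gets returned.
context = [parents[hit["parent_id"]] for hit in hits]
```

The small chunk gives a precise match, while the parent lookup restores the surrounding text that the model needs to answer.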
```bash
ollama pull granite4.1:8b
```

```python
from langchain_ollama import ChatOllama

llm = ChatOllama(model="granite4.1:8b", temperature=0, seed=42)
```

> ⚠️ For reliable tool calling and instruction following, prefer models 8B+. Smaller models may ignore retrieval instructions or hallucinate. See Troubleshooting.
---
Define paths and initialize core components.
```python
import os
from pathlib import Path
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_qdrant.fastembed_sparse import FastEmbedSparse
from qdrant_client import QdrantClient

DOCS_DIR = "docs"  # Directory containing your PDF files
MARKDOWN_DIR = "markdown_docs"  # Directory containing the PDFs converted to Markdown
PARENT_STORE_PATH = "parent_store"  # Directory for parent chunk JSON files
CHILD_COLLECTION = "document_child_chunks"
DEFAULT_RETRIEVAL_K = 7
CHILD_CHUNK_SEPARATOR = "\n\n<CHILD_CHUNK_BOUNDARY>\n\n"

os.makedirs(DOCS_DIR, exist_ok=True)
os.makedirs(MARKDOWN_DIR, exist_ok=True)
os.makedirs(PARENT_STORE_PATH, exist_ok=True)

from langchain_ollama import ChatOllama

llm = ChatOllama(model="granite4.1:8b", temperature=0, seed=42)
dense_embeddings = HuggingFaceEmbeddings(model_name="Qwen/Qwen3-Embedding-0.6B")
sparse_embeddings = FastEmbedSparse(model_name="Qdrant/bm25")
client = QdrantClient(path="qdrant_db")
```
---
Create the processing nodes and edges for the LangGraph workflow.
#### Main Graph Nodes & Edges
```python
from langgraph.types import Send, Command
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, RemoveMessage, ToolMessage
from typing import Literal, Set

MAIN_HISTORY_MESSAGES_TO_KEEP = 4
if MAIN_HISTORY_MESSAGES_TO_KEEP < 2:
    raise ValueError("MAIN_HISTORY_MESSAGES_TO_KEEP must be at least 2.")
PRE_ANSWER_HISTORY_MESSAGES_TO_KEEP = max(MAIN_HISTORY_MESSAGES_TO_KEEP - 1, 0)

def _is_plain_conversation_message(msg) -> bool:
    return (
        isinstance(msg, (HumanMessage, AIMessage))
        and not getattr(msg, "tool_calls", None)
        and not getattr(msg, "name", None)
    )

def _name_internal_message(message, name):
    """Tag a subgraph-only message so it is not treated as chat history."""
    return message.model_copy(update={"name": name})

def _retrieval_contexts(messages) -> list[str]:
    contexts = []
    ignored_prefixes = (
        "NO_RELEVANT_CHUNKS",
        "NO_PARENT_DOCUMENT",
        "RETRIEVAL_ERROR:",
        "PARENT_RETRIEVAL_ERROR:",
    )
    for message in messages:
        if not isinstance(message, ToolMessage):
            continue
        content = str(message.content).strip()
        if content and not content.startswith(ignored_prefixes):
            parts = content.split(CHILD_CHUNK_SEPARATOR) if message.name == "search_child_chunks" else [content]
            contexts.extend(part for part in parts if part)
    return list(dict.fromkeys(contexts))

def _format_conversation(messages) -> str:
    lines = []
    for msg in messages:
        role = "User" if isinstance(msg, HumanMessage) else "Assistant"
        lines.append(f"{role}: {msg.content}")
    return "\n".join(lines)

def _remove_messages_not_in(messages, keep_ids):
    removals = []
    for msg in messages:
        msg_id = getattr(msg, "id", None)
        if isinstance(msg, SystemMessage) or not msg_id:
            continue
        if msg_id not in keep_ids:
            removals.append(RemoveMessage(id=msg_id))
    return removals

def _recent_conversation(messages, pending_query="") -> list:
    """Return recent context before the current user message."""
    plain_messages = [msg for msg in messages if _is_plain_conversation_message(msg)]
    recent_messages = plain_messages[:-1]
    if pending_query:
        for index in range(len(recent_messages) - 1, -1, -1):
            msg = recent_messages[index]
            if isinstance(msg, HumanMessage) and str(msg.content).strip() == pending_query:
                return recent_messages[:index]
    return recent_messages

def summarize_history(state: State):
    messages = state.get("messages", [])
    updates = {"agent_answers": [{"__reset__": True}]}
    if not messages:
        return updates

    plain_messages = [msg for msg in messages if _is_plain_conversation_message(msg)]
    keep_count = PRE_ANSWER_HISTORY_MESSAGES_TO_KEEP
    messages_to_summarize = plain_messages[:-keep_count] if len(plain_messages) > keep_count else []
    keep_ids = {getattr(msg, "id", None) for msg in plain_messages[-keep_count:]}
    keep_ids.discard(None)
    removals = _remove_messages_not_in(messages, keep_ids)
    if removals:
        updates["messages"] = removals
    if not messages_to_summarize:
        return updates

    existing_summary = state.get("conversation_summary", "").strip()
    conversation = "Existing summary:\n"
    conversation += f"{existing_summary or '(none)'}\n\n"
    conversation += "New messages to merge into the summary:\n"
    conversation += _format_conversation(messages_to_summarize)
    summary_response = llm.invoke([
        SystemMessage(content=get_conversation_summary_prompt()),
        HumanMessage(content=conversation),
    ])
    updates["conversation_summary"] = summary_response.content.strip()
    return updates

def rewrite_query(state: State):
    last_message = state["messages"][-1]
    current_query = str(last_message.content).strip()
    conversation_summary = state.get("conversation_summary", "").strip()
    pending_query = state.get("pendingQuery", "").strip()
    pending_clarifications = state.get("pendingClarifications", [])
    recent_messages = _recent_conversation(state["messages"], pending_query)

    context_parts = []
    if conversation_summary:
        context_parts.append(f"Conversation Summary:\n{conversation_summary}")
    if recent_messages:
        context_parts.append(f"Recent Conversation:\n{_format_conversation(recent_messages)}")
    if pending_query:
        clarifications = [*pending_clarifications, current_query]
        clarification_text = "\n".join(
            f"{index}. {value}" for index, value in enumerate(clarifications, start=1)
        )
        context_parts.append(
            f"Unresolved User Query:\n{pending_query}\n\n"
            f"User Clarifications:\n{clarification_text}"
        )
        original_query = f"{pending_query}\nClarifications:\n{clarification_text}"
    else:
        clarifications = []
        context_parts.append(f"User Query:\n{current_query}")
        original_query = current_query
    context_section = "\n\n".join(context_parts)

    llm_with_structure = llm.with_structured_output(QueryAnalysis)
    response = llm_with_structure.invoke([SystemMessage(content=get_rewrite_query_prompt()), HumanMessage(content=context_section)])
    clarification_message_update = (
        [_name_internal_message(last_message, "clarification_response")]
        if pending_query else []
    )

    if response.questions and response.is_clear:
        return {
            "questionIsClear": True,
            "originalQuery": original_query,
            "pendingQuery": "",
            "pendingClarifications": [],
            "rewrittenQuestions": response.questions,
            "messages": clarification_message_update,
        }

    clarification = response.clarification_needed if response.clarification_needed and len(response.clarification_needed.strip()) > 10 else "I need more information to understand your question."
    return {
        "questionIsClear": False,
        "originalQuery": "",
        "pendingQuery": pending_query or current_query,
        "pendingClarifications": clarifications,
        "rewrittenQuestions": [],
        "messages": clarification_message_update + [
            AIMessage(content=clarification, name="clarification")
        ],
    }

def request_clarification(state: State):
    return {}

def route_after_rewrite(state: State) -> Literal["request_clarification", "agent"]:
    if not state.get("questionIsClear", False):
        return "request_clarification"
    else:
        return [
            Send("agent", {"question": query, "question_index": idx, "messages": []})
            for idx, query in enumerate(state["rewrittenQuestions"])
        ]

def aggregate_answers(state: State):
    messages = state.get("messages", [])
    plain_messages = [msg for msg in messages if _is_plain_conversation_message(msg)]
    keep_ids = {getattr(msg, "id", None) for msg in plain_messages[-PRE_ANSWER_HISTORY_MESSAGES_TO_KEEP:]}
    keep_ids.discard(None)
    removals = _remove_messages_not_in(messages, keep_ids)

    if not state.get("agent_answers"):
        return {"messages": removals + [AIMessage(content="No answers were generated.")]}

    sorted_answers = sorted(state["agent_answers"], key=lambda x: x["index"])
    formatted_answers = ""
    for i, ans in enumerate(sorted_answers, start=1):
        formatted_answers += (f"\nRetrieved response {i}:\n"f"{ans['answer']}\n")

    user_message = HumanMessage(content=f"""Original user question: {state["originalQuery"]}\nRetrieved answers:{formatted_answers}""")
    synthesis_response = llm.invoke([SystemMessage(content=get_aggregation_prompt()), user_message])
    return {"messages": removals + [AIMessage(content=synthesis_response.content)]}
```
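
The history split used by `summarize_history` can be checked in isolation. The sketch below uses plain strings in place of LangChain message objects, and `split_history` is a hypothetical helper name; only the slicing logic and the keep count of 3 (from `max(4 - 1, 0)`) mirror the code above.

```python
# Keep count taken from the code above: max(MAIN_HISTORY_MESSAGES_TO_KEEP - 1, 0) with 4.
KEEP_COUNT = 3


def split_history(plain_messages: list[str], keep_count: int = KEEP_COUNT):
    """Return (messages to fold into the summary, messages kept verbatim)."""
    to_summarize = plain_messages[:-keep_count] if len(plain_messages) > keep_count else []
    kept = plain_messages[-keep_count:]
    return to_summarize, kept


long_history = ["q1", "a1", "q2", "a2", "q3"]
short_history = ["q1", "a1"]

long_result = split_history(long_history)
short_result = split_history(short_history)
```

With five plain messages the two oldest are summarized and the last three are kept; with two messages nothing is summarized, so no LLM call is needed for that turn.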
---
#### Agent Subgraph Nodes & Edges
```python
def orchestrator(state: AgentState):
    context_summary = state.get("context_summary", "").strip()
    sys_msg = SystemMessage(content=get_orchestrator_prompt())
    summary_injection = (
        [HumanMessage(content=f"[COMPRESSED CONTEXT FROM PRIOR RESEARCH]\n\n{context_summary}")]
        if context_summary else []
    )
    if not state.get("messages"):
        human_msg = HumanMessage(content=state["question"], name="agent_question")
        force_search = HumanMessage(content="YOU MUST CALL 'search_child_chunks' AS THE FIRST STEP TO ANSWER THIS QUESTION.")
        response = llm_with_tools.invoke([sys_msg] + summary_injection + [human_msg, force_search])
        response = _name_internal_message(response, "agent_response")
        return {"messages": [human_msg, response], "tool_call_count": len(response.tool_calls or []), "iteration_count": 1}

    response = llm_with_tools.invoke([sys_msg] + summary_injection + state["messages"])
    response = _name_internal_message(response, "agent_response")
    tool_calls = response.tool_calls if hasattr(response, "tool_calls") else []
    return {"messages": [response], "tool_call_count": len(tool_calls) if tool_calls else 0, "iteration_count": 1}

def route_after_orchestrator_call(state: AgentState) -> Literal["tools", "fallback_response", "collect_answer"]:
    iteration = state.get("iteration_count", 0)
    tool_count = state.get("tool_call_count", 0)
    last_message = state["messages"][-1]
    tool_calls = getattr(last_message, "tool_calls", None) or []

    if not tool_calls:
        return "collect_answer"

    # Accept a final answer at the iteration boundary, but do not execute
    # tool calls that would exceed the configured research budget.
    if iteration >= MAX_ITERATIONS or tool_count > MAX_TOOL_CALLS:
        return "fallback_response"

    return "tools"

def fallback_response(state: AgentState):
    seen = set()
    unique_contents = []
    for m in state["messages"]:
        if isinstance(m, ToolMessage) and m.content not in seen:
            unique_contents.append(m.content)
            seen.add(m.content)

    context_summary = state.get("context_summary", "").strip()

    context_parts = []
    if context_summary:
        context_parts.append(f"## Compressed Research Context (from prior iterations)\n\n{context_summary}")
    if unique_contents:
        context_parts.append(
            "## Retrieved Data (current iteration)\n\n" +
            "\n\n".join(f"--- DATA SOURCE {i} ---\n{content}" for i, content in enumerate(unique_contents, 1))
        )

    context_text = "\n\n".join(context_parts) if context_parts else "No data was retrieved from the documents."

    prompt_content = (
        f"USER QUERY: {state.get('question')}\n\n"
        f"{context_text}\n\n"
        f"INSTRUCTION:\nProvide the best possible answer using only the data above."
    )
    response = llm.invoke([SystemMessage(content=get_fallback_response_prompt()), HumanMessage(content=prompt_content)])
    response = _name_internal_message(response, "agent_response")
    return {"messages": [response]}

def should_compress_context(state: AgentState) -> Command[Literal["compress_context", "orchestrator"]]:
    messages = state["messages"]

    new_ids: Set[str] = set()
    for msg in reversed(messages):
        if isinstance(msg, AIMessage) and getattr(msg, "tool_calls", None):
            for tc in msg.tool_calls:
                if tc["name"] == "retrieve_parent_chunks":
                    raw = tc["args"].get("parent_id") or tc["args"].get("id") or tc["args"].get("ids") or []
                    if isinstance(raw, str):
                        new_ids.add(f"parent::{raw}")
                    else:
                        new_ids.update(f"parent::{r}" for r in raw)
                elif tc["name"] == "search_child_chunks":
                    query = tc["args"].get("query", "")
                    if query:
                        new_ids.add(f"search::{query}")
            break

    updated_ids = state.get("retrieval_keys", set()) | new_ids

    current_token_messages = estimate_context_tokens(messages)
    current_token_summary = estimate_context_tokens([HumanMessage(content=state.get("context_summary", ""))])
    current_tokens = current_token_messages + current_token_summary

    max_allowed = BASE_TOKEN_THRESHOLD + int(current_token_summary * TOKEN_GROWTH_FACTOR)

    goto = "compress_context" if current_tokens > max_allowed else "orchestrator"
    return Command(
        update={
            "retrieval_keys": updated_ids,
            "retrieved_contexts": _retrieval_contexts(messages),
        },
        goto=goto,
    )

def compress_context(state: AgentState):
    messages = state["messages"]
    existing_summary = state.get("context_summary", "").strip()

    if not messages:
        return {}

    conversation_text = f"USER QUESTION:\n{state.get('question')}\n\nConversation to compress:\n\n"
    if existing_summary:
        conversation_text += f"[PRIOR COMPRESSED CONTEXT]\n{existing_summary}\n\n"

    for msg in messages[1:]:
        if isinstance(msg, AIMessage):
            tool_calls_info = ""
            if getattr(msg, "tool_calls", None):
                calls = ", ".join(f"{tc['name']}({tc['args']})" for tc in msg.tool_calls)
                tool_calls_info = f" | Tool calls: {calls}"
            conversation_text += f"[ASSISTANT{tool_calls_info}]\n{msg.content or '(tool call only)'}\n\n"
        elif isinstance(msg, ToolMessage):
            tool_name = getattr(msg, "name", "tool")
            conversation_text += f"[TOOL RESULT — {tool_name}]\n{msg.content}\n\n"

    summary_response = llm.invoke([SystemMessage(content=get_context_compression_prompt()), HumanMessage(content=conversation_text)])
    new_summary = summary_response.content

    retrieved_ids: Set[str] = state.get("retrieval_keys", set())
    if retrieved_ids:
        parent_ids = sorted(r for r in retrieved_ids if r.startswith("parent::"))
        search_queries = sorted(r.replace("search::", "") for r in retrieved_ids if r.startswith("search::"))

        block = "\n\n---\n**Already executed (do NOT repeat):**\n"
        if parent_ids:
            block += "Parent chunks retrieved:\n" + "\n".join(f"- {p.replace('parent::', '')}" for p in parent_ids) + "\n"
        if search_queries:
            block += "Search queries already run:\n" + "\n".join(f"- {q}" for q in search_queries) + "\n"
        new_summary += block

    return {"context_summary": new_summary, "messages": [RemoveMessage(id=m.id) for m in messages[1:]]}

def collect_answer(state: AgentState):
    last_message = state["messages"][-1]
    is_valid = isinstance(last_message, AIMessage) and last_message.content and not last_message.tool_calls
    answer = last_message.content if is_valid else "Unable to generate an answer."
    return {
        "final_answer": answer,
        "agent_answers": [{
            "index": state["question_index"],
            "question": state["question"],
            "answer": answer,
            "contexts": state.get("retrieved_contexts", []),
        }]
    }
```
Why this architecture?

- Summarization maintains conversational context without overwhelming the LLM
- Query rewriting ensures search queries are precise and unambiguous, using context intelligently
- Human-in-the-loop catches unclear queries before wasting any retrieval resources
- Parallel execution via Send API spawns independent agent subgraphs for each sub-question simultaneously
- Context compression keeps the agent's working memory lean across long retrieval loops, preventing redundant fetches
- Fallback response ensures graceful degradation: the agent always returns something useful even when the budget runs out
- Answer collection & aggregation extracts clean final answers from agents and aggregates them into a single coherent response

---
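
The budget check behind the fallback path can be read as a small pure function. This is a sketch only: `route` is a hypothetical name, and it takes plain values instead of the graph state, but the three outcomes and the comparison operators follow `route_after_orchestrator_call`, with the limits set to the values this document uses (8 tool calls, 10 iterations).

```python
MAX_TOOL_CALLS = 8   # same value as the agent limits in this document
MAX_ITERATIONS = 10


def route(has_tool_calls: bool, iteration: int, tool_count: int) -> str:
    """Decide the next node after the orchestrator has produced a message."""
    if not has_tool_calls:
        # A message without tool calls is treated as the final answer.
        return "collect_answer"
    if iteration >= MAX_ITERATIONS or tool_count > MAX_TOOL_CALLS:
        # More tools were requested but the budget is spent: answer from what we have.
        return "fallback_response"
    return "tools"


decisions = {
    "final answer at the limit": route(False, 10, 9),
    "iteration budget spent": route(True, 10, 0),
    "tool budget exceeded": route(True, 3, 9),
    "within budget": route(True, 3, 8),
}
```

Note the asymmetry: the iteration check uses `>=` while the tool check uses `>`, so exactly 8 tool calls is still within budget.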
Assemble the complete workflow graph with conversation memory and multi-agent architecture.
```python
from langgraph.graph import START, END, StateGraph
from langgraph.prebuilt import ToolNode
from langgraph.checkpoint.memory import InMemorySaver

checkpointer = InMemorySaver()

agent_builder = StateGraph(AgentState)
agent_builder.add_node(orchestrator)
agent_builder.add_node("tools", ToolNode([search_child_chunks, retrieve_parent_chunks]))
agent_builder.add_node(compress_context)
agent_builder.add_node(fallback_response)
agent_builder.add_node(should_compress_context)
agent_builder.add_node(collect_answer)

agent_builder.add_edge(START, "orchestrator")
agent_builder.add_conditional_edges("orchestrator", route_after_orchestrator_call, {"tools": "tools", "fallback_response": "fallback_response", "collect_answer": "collect_answer"})
agent_builder.add_edge("tools", "should_compress_context")
agent_builder.add_edge("compress_context", "orchestrator")
agent_builder.add_edge("fallback_response", "collect_answer")
agent_builder.add_edge("collect_answer", END)
agent_subgraph = agent_builder.compile()

graph_builder = StateGraph(State)
graph_builder.add_node(summarize_history)
graph_builder.add_node(rewrite_query)
graph_builder.add_node(request_clarification)
graph_builder.add_node("agent", agent_subgraph)
graph_builder.add_node(aggregate_answers)

graph_builder.add_edge(START, "summarize_history")
graph_builder.add_edge("summarize_history", "rewrite_query")
graph_builder.add_conditional_edges("rewrite_query", route_after_rewrite)
graph_builder.add_edge("request_clarification", "rewrite_query")
graph_builder.add_edge(["agent"], "aggregate_answers")
graph_builder.add_edge("aggregate_answers", END)

agent_graph = graph_builder.compile(checkpointer=checkpointer, interrupt_before=["request_clarification"])
```
Graph architecture explained:
The architecture flow diagram can be viewed here.
Agent Subgraph (processes individual questions):

- START → orchestrator (invoke LLM with tools)
- orchestrator → tools (if tool calls needed) OR fallback_response (if budget exhausted) OR collect_answer (if done)
- tools → should_compress_context (check token budget)
- should_compress_context → compress_context (if threshold exceeded) OR orchestrator (otherwise)
- compress_context → orchestrator (resume with compressed memory)
- fallback_response → collect_answer (package best-effort answer)
- collect_answer → END (clean final answer with index)

Main Graph (orchestrates complete workflow):

- START → summarize_history (roll older chat into summary and keep only recent exchanges)
- summarize_history → rewrite_query (rewrite query with context, check clarity)
- rewrite_query → request_clarification (if unclear) OR spawn parallel agent subgraphs via Send (if clear)
- request_clarification → rewrite_query (after user provides clarification)
- All agent subgraphs → aggregate_answers (merge all responses)
- aggregate_answers → END (return final synthesized answer)
---
Sample PDF files can be found here: javascript, blockchain, fortinet_TMP.pdf.
See project/README.md for full Docker instructions and system requirements.
Google Colab: The notebook clones the repository and installs its requirements. Upload PDFs to docs/. Standard hosted Colab does not provide Ollama, so replace the default Ollama model cell with one of the documented cloud-provider examples before running the remaining cells.
Local (Jupyter/VSCode): Optionally create and activate a virtual environment, install dependencies with pip install -r requirements.txt or uv pip install -r requirements.txt, add your PDFs to docs/, then run all cells top to bottom.
The chat interface will appear at the end.
With Conversation Memory:
User: "How do I install SQL?"
Agent: [Provides installation steps from documentation]
User: "How do I update it?"
Agent: [Understands "it" = SQL, provides update instructions]
With Query Clarification:
User: "Tell me about that thing"
Agent: "I need more information. What specific topic are you asking about?"
User: "The installation process for PostgreSQL"
Agent: [Retrieves and answers with specific information]
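
In the clarification exchange above, the vague first message is not discarded: it is kept as the pending query and merged with the user's follow-up before rewriting. The sketch below reproduces only that string assembly, following the format used in `rewrite_query`; `merge_clarifications` is a hypothetical helper name.

```python
def merge_clarifications(pending_query: str, clarifications: list[str]) -> str:
    """Combine the unresolved query with the numbered clarifications the user supplied."""
    numbered = "\n".join(
        f"{index}. {value}" for index, value in enumerate(clarifications, start=1)
    )
    return f"{pending_query}\nClarifications:\n{numbered}"


merged = merge_clarifications(
    "Tell me about that thing",
    ["The installation process for PostgreSQL"],
)
```

Numbering the clarifications lets several rounds of follow-up accumulate in order if the first answer is still not specific enough.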
---
This system is provider-agnostic: the runnable app uses Ollama by default, and the chat model initialization can be adapted to any LLM provider available in LangChain. The examples below cover the most common options, but the same pattern applies to any other supported provider.
Note: Model names change frequently. Always check the official documentation for the latest available models and their identifiers before deploying.
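
One way to keep the provider switch in a single place is a small lookup table, sketched below under some assumptions. `PROVIDERS` and `build_llm` are hypothetical names that are not part of the project. The four integration packages and class names are real LangChain chat model classes, but each package has to be installed separately and cloud providers also need an API key in the environment.

```python
import importlib

# Provider name -> (integration package, chat model class).
PROVIDERS = {
    "ollama": ("langchain_ollama", "ChatOllama"),
    "openai": ("langchain_openai", "ChatOpenAI"),
    "anthropic": ("langchain_anthropic", "ChatAnthropic"),
    "google": ("langchain_google_genai", "ChatGoogleGenerativeAI"),
}


def build_llm(provider: str, model: str, **kwargs):
    """Instantiate the chat model for a provider, importing its package lazily."""
    if provider not in PROVIDERS:
        raise ValueError(f"Unknown provider: {provider!r}")
    module_name, class_name = PROVIDERS[provider]
    # Raises ImportError if the integration package is not installed.
    module = importlib.import_module(module_name)
    return getattr(module, class_name)(model=model, **kwargs)


# Example (not executed here because it needs langchain_ollama and a running Ollama):
# llm = build_llm("ollama", "granite4.1:8b", temperature=0)
```

The lazy import means only the package for the selected provider has to be present, which fits environments such as hosted Colab where Ollama is unavailable.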
Set up Qdrant to store child chunks with hybrid search capabilities.
```python
from qdrant_client.http import models as qmodels
from langchain_qdrant import QdrantVectorStore
from langchain_qdrant.qdrant import RetrievalMode

embedding_dimension = len(dense_embeddings.embed_query("test"))

def ensure_collection(collection_name):
    if not client.collection_exists(collection_name):
        client.create_collection(
            collection_name=collection_name,
            vectors_config=qmodels.VectorParams(
                size=embedding_dimension,
                distance=qmodels.Distance.COSINE
            ),
            sparse_vectors_config={
                "sparse": qmodels.SparseVectorParams()
            },
        )
```
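
The dense vectors in the collection above are compared with the cosine metric. As a reminder of what that measures, here is a minimal standard-library sketch; `cosine_similarity` is a hypothetical helper and the two-dimensional vectors are made up, whereas real embeddings have `embedding_dimension` components.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either vector is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


same_direction = cosine_similarity([1.0, 0.0], [2.0, 0.0])
orthogonal = cosine_similarity([1.0, 0.0], [0.0, 3.0])
```

Because only direction matters, a vector and a scaled copy of it score the same, which is why cosine is a common choice for text embeddings.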
---
Hard limits on tool calls and iterations prevent infinite loops. Token counting (via tiktoken) drives context compression decisions.
```python
import tiktoken
from functools import lru_cache

MAX_TOOL_CALLS = 8           # Maximum tool calls per agent run
MAX_ITERATIONS = 10          # Maximum agent loop iterations
BASE_TOKEN_THRESHOLD = 2000  # Initial token threshold for compression
TOKEN_GROWTH_FACTOR = 0.9    # Extra headroom granted per token of existing compressed summary

@lru_cache(maxsize=1)
def _get_token_encoding():
    try:
        return tiktoken.encoding_for_model("gpt-4")
    except Exception:
        try:
            return tiktoken.get_encoding("cl100k_base")
        except Exception:
            return None

def estimate_context_tokens(messages: list) -> int:
    contents = [
        str(msg.content)
        for msg in messages
        if hasattr(msg, "content") and msg.content
    ]
    encoding = _get_token_encoding()
    if encoding is None:
        # No tokenizer available: fall back to roughly four characters per token.
        return sum(max(1, len(content) // 4) for content in contents)
    return sum(len(encoding.encode(content)) for content in contents)
```
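
The compression trigger is easier to follow with numbers. The sketch below reuses the formula from `should_compress_context` with the constants defined above; `compression_decision` is a hypothetical helper, and the token counts are invented inputs rather than measurements.

```python
BASE_TOKEN_THRESHOLD = 2000
TOKEN_GROWTH_FACTOR = 0.9


def compression_decision(message_tokens: int, summary_tokens: int):
    """Return (current tokens, allowed tokens, whether compression should run)."""
    current = message_tokens + summary_tokens
    max_allowed = BASE_TOKEN_THRESHOLD + int(summary_tokens * TOKEN_GROWTH_FACTOR)
    return current, max_allowed, current > max_allowed


first_pass = compression_decision(1500, 0)      # no summary yet
with_summary = compression_decision(1800, 500)  # summary raises the allowance
over_budget = compression_decision(2100, 500)   # messages outgrow the allowance
```

A 500-token summary raises the allowance from 2000 to 2450, so the existing summary alone never forces another compression pass; only new messages can push the total over the limit.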
---
#### 1. Install Dependencies

```bash
# With pip
python -m venv .venv && source .venv/bin/activate
pip install -r requirements.txt

# Or with uv
uv venv .venv
source .venv/bin/activate  # Windows: .venv\Scripts\activate
uv pip install -r requirements.txt
```

#### 2. Run the Application

```bash
python project/app.py
```
Open the local URL (e.g., http://127.0.0.1:7860) to start chatting.
---
Build a Gradio interface with conversation persistence and human-in-the-loop support. For a complete end-to-end pipeline Gradio interface, including document ingestion, please refer to project/README.md.
Note: The notebook and full project stream the final aggregated answer while showing query analysis and tool activity in separate collapsible blocks. Raw orchestrator, compression, and fallback model output remains internal. The example below is intentionally minimal.
```python
import gradio as gr
import uuid

def create_thread_id():
    """Generate a unique thread ID for each conversation"""
    return {"configurable": {"thread_id": str(uuid.uuid4())}, "recursion_limit": 50}

def clear_session():
    """Clear thread for new conversation"""
    global config
    agent_graph.checkpointer.delete_thread(config["configurable"]["thread_id"])
    config = create_thread_id()

def chat(message, history):
    current_state = agent_graph.get_state(config)

    if current_state.next:
        # The graph is paused for clarification: add the reply and resume.
        agent_graph.update_state(config, {"messages": [HumanMessage(content=message.strip())]})
        result = agent_graph.invoke(None, config)
    else:
        result = agent_graph.invoke({"messages": [HumanMessage(content=message.strip())]}, config)

    return result['messages'][-1].content

config = create_thread_id()

with gr.Blocks() as demo:
    chatbot = gr.Chatbot()
    chatbot.clear(clear_session)
    gr.ChatInterface(fn=chat, chatbot=chatbot)

demo.launch(theme=gr.themes.Citrus())
```
You're done! You now have a fully functional Agentic RAG system with conversation memory, hierarchical indexing, and human-in-the-loop query clarification.
---
User Query → Conversation Summary → Query Rewriting → Query Clarification →
Parallel Agent Reasoning → Aggregation → Final Response
Stage 1 — Conversation Understanding: Maintains a rolling summary and recent conversation history to preserve continuity without indefinitely increasing context size.
Stage 2 — Query Clarification: Resolves references ("How do I update it?" → "How do I update SQL?"), splits multi-part questions into focused sub-queries, detects unclear inputs, and rewrites queries for optimal retrieval. Pauses for human input when clarification is needed.
Stage 3 — Intelligent Retrieval (Multi-Agent Map-Reduce): Spawns parallel agent subgraphs — one per sub-query. Each agent searches child chunks, fetches parent chunks for context, self-corrects if results are insufficient, compresses context to avoid redundant fetches, and falls back gracefully if the search budget is exhausted.
Example: "What is JavaScript? What is Python?" → 2 parallel agents execute simultaneously.
Stage 4 — Response Generation: Aggregates all agent responses into a single coherent answer.
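
Stages 3 and 4 follow a map-reduce shape: run one worker per sub-query in parallel, then merge the results in question order. The sketch below shows that shape with the standard library only. It is not how the project runs agents (that happens through LangGraph's Send API), and `answer_sub_query` and `map_reduce` are hypothetical stand-ins that return canned text instead of retrieving anything.

```python
from concurrent.futures import ThreadPoolExecutor


def answer_sub_query(index: int, question: str) -> dict:
    """Stand-in for one agent subgraph run; a real agent would retrieve and reason."""
    return {"index": index, "question": question, "answer": f"(answer to: {question})"}


def map_reduce(sub_queries: list[str]) -> str:
    # Map: one worker per sub-query, executed concurrently.
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(answer_sub_query, i, q) for i, q in enumerate(sub_queries)]
        results = [future.result() for future in futures]
    # Reduce: restore question order by index, then merge into one text block.
    ordered = sorted(results, key=lambda result: result["index"])
    return "\n".join(
        f"Retrieved response {i}: {result['answer']}"
        for i, result in enumerate(ordered, start=1)
    )


combined = map_reduce(["What is JavaScript?", "What is Python?"])
```

Carrying the index with each result is what lets the merge step present answers in the order the user asked, regardless of which worker finishes first.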
---
| Area | Common Problems | Suggested Solutions |
|---|---|---|
| **Model Selection** | - Responses ignore instructions<br>- Tools (retrieval/search) used incorrectly<br>- Poor context understanding<br>- Hallucinations or incomplete aggregation | - Use more capable LLMs<br>- Prefer models 8B+ for better reasoning<br>- Consider cloud-based models if local models are limited |
| **System Prompt Behavior** | - Model answers without retrieving documents<br>- Query rewriting loses context<br>- Aggregation introduces hallucinations | - Make retrieval explicit in system prompts<br>- Keep query rewriting close to user intent |
| **Retrieval Configuration** | - Relevant documents not retrieved<br>- Too much irrelevant information | - Increase retrieved chunks (k) or lower similarity thresholds to improve recall<br>- Reduce k or increase thresholds to improve precision |
| **Chunk Size / Document Splitting** | - Answers lack context or feel fragmented<br>- Retrieval is slow or embedding costs are high | - Increase chunk & parent sizes for more context<br>- Decrease chunk sizes to improve speed and reduce costs |
| **Context Compression** | - Agent loses important details after compression<br>- Compressed summaries are too vague | - Tune the compression system prompt<br>- Increase BASE_TOKEN_THRESHOLD to delay compression<br>- Increase TOKEN_GROWTH_FACTOR |
| **Agent Configuration** | - Agent gives up too early <br>- Agent loops too long | - Increase MAX_TOOL_CALLS / MAX_ITERATIONS for complex queries<br>- Decrease them to speed up simple queries |
| **Temperature & Consistency** | - Responses inconsistent or overly creative<br>- Responses too rigid or repetitive | - Set temperature to 0 for factual, consistent output<br>- Slightly increase temperature for summarization or analysis tasks |
| **Embedding Model Quality** | - Poor semantic search<br>- Weak performance on domain-specific or multilingual docs | - Use higher-quality or domain-specific embeddings<br>- Re-index all documents after changing embeddings |
💡 For additional troubleshooting tips see the README Troubleshooting.
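This project, Agentic RAG for Dummies, aims to implement a LangGraph-based Agentic RAG (Retrieval-Augmented Generation) system with minimal code. Unlike traditional RAG tutorials that only demonstrate basic concepts, it provides an extensible architecture that helps developers build agent systems with a modular design, conversation memory, and human-in-the-loop query clarification, bridging the gap between theory and engineering practice.
The project integrates several advanced RAG features: Hierarchical Indexing searches small chunks for precision and retrieves large parent chunks for context; built-in Conversation Memory keeps dialogue context coherent; Query Clarification automatically rewrites ambiguous queries or pauses to ask the user for details; and Agent Orchestration through LangGraph coordinates the full retrieval and reasoning flow.
First, download and install Ollama from https://ollama.com. Then run `ollama pull qwen3:4b-instruct-2507-q4_K_M` on the command line to fetch a local model. In your Python environment, install the required libraries with `pip install -r requirements.txt`. For reliable tool calling and instruction following, prefer models with more than 7B parameters; smaller models may ignore retrieval instructions or hallucinate.
The project can be used in several ways. Google Colab is recommended for quick testing: click the Open in Colab badge at the top of the README, upload PDF documents to the `docs/` directory, and run all cells. To run locally (for example in Jupyter or VSCode), create and activate a virtual environment, install the dependencies, and place your PDF files in the `docs/` folder. The system supports conversations with memory and can resolve context-dependent references such as "How do I update it?".
The system follows a provider-agnostic design and supports any LLM provider integrated with LangChain. You can switch model backends by changing a single line of code. For the vector database, the system uses Qdrant to store child chunks, with hybrid search support. Be sure to configure the exact name of the model you choose so the pipeline stays stable.
The project builds a complete chat interface with Gradio that supports conversation persistence and human-in-the-loop clarification. Besides handling user questions, the interface supports full streaming, including real-time display of reasoning steps and tool calls. For a complete end-to-end pipeline with document ingestion and processing, refer to the project's detailed documentation.
The system processes queries in a four-stage workflow: Conversation Understanding analyzes the history to extract context; Query Rewriting and Query Clarification make sure the intent is accurate; Parallel Agent Reasoning handles the sub-queries; and Aggregation produces the final response, keeping the whole process both intelligent and controlled.
When troubleshooting, focus first on Model Selection. Responses that ignore instructions, misuse tools (such as retrieval or search), show poor context understanding, or hallucinate usually mean the LLM is not capable enough. Upgrading to a stronger model improves reasoning and instruction following.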
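A high-quality open-source learning resource with a clear modular design and high official recognition from LangGraph. Its 3k+ stars attest to its practical value, the notebook format lowers the barrier to entry, and it is actively maintained with stable quality.
AI Skill Hub is a third-party content aggregation platform. The information on this page is compiled from public data and does not constitute any legal endorsement of the tool's functionality or quality.
Validate the tool thoroughly in a sandbox or test environment before deploying it to production, and carry out the necessary security assessment.
✅ MIT License: one of the most permissive open-source licenses. It allows free commercial use, modification, and distribution, and only requires retaining the copyright notice.
Overall, agentic-rag-for-dummies is a high-quality agent workflow that is competitive among comparable tools. AI Skill Hub will keep tracking its updates; consider bookmarking it and adopting it when it fits your use case.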
| Field | Value |
|---|---|
| Original name | agentic-rag-for-dummies |
| Original description | Open-source AI workflow: A modular Agentic RAG built with LangGraph - learn Retrieval-Augmented Generatio. ⭐3.3k · Jupyter Notebook |
| Topics | RAG, Agent, LangGraph, Workflow, Learning, Tutorial |
| GitHub | https://github.com/GiovanniPasq/agentic-rag-for-dummies |
| License | MIT |
| Language | Jupyter Notebook |
Listed: 2026-05-18 · Updated: 2026-05-19 · License: MIT · AI Skill Hub makes no legal endorsement of the accuracy of third-party content.