基于any4any智能体的NL2SQL智能问数

本教程详细介绍如何基于any4any企业级多模态AI系统构建完整的NL2SQL智能问数系统。通过智能体架构设计,实现了8步工作流程的自然语言转SQL功能,包括用户上下文获取、历史对话检索、智能表选择、表结构分析、SQL生成、安全执行和结果格式化。教程涵盖ToolRegistry工具注册器、NL2SQL智能体、SQL执行器、上下文管理器和表管理器等核心组件的实现细节,并提供完整的项目部署、环境配置和数据库安全设置指南。通过本教程,开发者可以掌握如何构建安全、高效、智能的数据库查询系统,实现自然语言与数据库之间的无缝交互。
源码地址
前置教程
如想顺利完成本智能体NL2SQL项目,你可能需要先完成以下前置教程:
1. any4any NL2SQL智能体架构概述
1.1 智能体架构设计
any4any采用基于智能体的NL2SQL实现,核心架构包含以下组件:

核心智能体组件:
ToolRegistry(工具注册器):智能工具选择和管理
NL2SQLTool(NL2SQL智能体):8步智能SQL处理工作流
SQLContextManager(上下文管理器):智能历史对话检索
SQLExecutor(SQL执行器):安全SQL验证和执行
TableManager(表管理器):异步表结构获取
1.2 智能体工作流程
any4any的NL2SQL智能体实现了8步完整处理流程:
sequenceDiagram
participant User as 用户
participant TR as ToolRegistry
participant NT as NL2SQL智能体
participant CM as 上下文管理器
participant TM as 表管理器
participant SE as SQL执行器
participant LLM as LLM服务
User->>TR: 自然语言查询
TR->>TR: LLM智能工具选择
TR->>NT: 选择NL2SQL智能体
Note over NT: 步骤0: 获取用户上下文
NT->>LLM: 生成用户上下文
Note over NT: 步骤0.5: 获取增强上下文
NT->>CM: 检索历史对话
CM->>CM: 计算相关性得分
CM-->>NT: 返回增强上下文
Note over NT: 步骤1: 获取表信息
NT->>TM: 获取所有表基本信息
TM-->>NT: 返回表列表
Note over NT: 步骤2: 智能表选择
NT->>LLM: 分析需要的表
LLM-->>NT: 返回相关表名
Note over NT: 步骤3: 获取表结构
NT->>TM: 获取详细表结构
TM-->>NT: 返回表结构信息
Note over NT: 步骤4: SQL生成
NT->>LLM: 基于上下文生成SQL
LLM-->>NT: 返回SQL语句
Note over NT: 步骤5: 安全执行
NT->>SE: 验证并执行SQL
SE->>SE: 多层安全验证
SE-->>NT: 返回查询结果
Note over NT: 步骤6: 生成回答
NT->>LLM: 基于结果生成回答
LLM-->>NT: 返回最终回答
NT-->>User: 返回智能查询结果LLM-->>NT: 返回相关表名 Note over NT: 步骤3: 获取表结构 NT->>TM: 获取详细表结构 TM-->>NT: 返回表结构信息 Note over NT: 步骤4: SQL生成 NT->>LLM: 基于上下文生成SQL LLM-->>NT: 返回SQL语句 Note over NT: 步骤5: 安全执行 NT->>SE: 验证并执行SQL SE->>SE: 多层安全验证 SE-->>NT: 返回查询结果 Note over NT: 步骤6: 生成回答 NT->>LLM: 基于结果生成回答 LLM-->>NT: 返回最终回答 NT-->>User: 返回智能查询结果
2. 项目安装与配置
2.1 环境准备
2.1.1 系统要求
# 推荐系统配置- Python 3.10+- MySQL 8.0+- 内存: 8GB+- 硬盘: 10GB+- GPU: 可选,用于本地LLM加速2.1.2 数据库配置
WSL2下安装MySQL服务及基本使用教程:WSL(Ubuntu)环境下安装MySQL服务及基本使用教程
你可以直接使用any4any项目仓库中的any4any.sql文件来创建数据库。并在orders表中插入示例数据。
2.2 any4any项目部署
2.2.1 克隆和配置项目
# 克隆项目git clone https://github.com/eogee/any4any.git# 你也可以直接从网盘下载项目代码
# 链接:https://pan.quark.cn/s/5a195d6acfc7cd any4any
# 创建Python虚拟环境conda create -n any4any python=3.10conda activate any4any
# 安装依赖pip install -r requirements.txt2.2.2 环境变量配置
创建并配置 .env 文件:
# 数据库配置
MYSQL_HOST=localhost
MYSQL_PORT=3306
MYSQL_USER=root
MYSQL_PASSWORD=your_password
MYSQL_DATABASE=any4any
# SQL专用配置
SQL_DB_TYPE=mysql
SQL_DB_HOST=localhost
SQL_DB_PORT=3306
SQL_DB_USERNAME=root
SQL_DB_PASSWORD=your_password
SQL_DB_DATABASE=any4any
# 智能体系统配置
TOOLS_ENABLED=true
# LLM模型配置(选择一种)
# 本地模型配置
LLM_MODEL_DIR=/path/to/your/llm/model
LLM_MODEL_ENABLED=true
# 外部API配置(可选)
OPENAI_API_KEY=your_openai_api_key
OPENAI_BASE_URL=https://api.openai.com/v1
# 应用配置
HOST=0.0.0.0
PORT=8888
API_KEY=your_api_key
# 数据库连接池配置
DB_POOL_ENABLED=true
DB_POOL_SIZE=10
DB_POOL_PRE_PING=true
DB_POOL_RECYCLE=3600
DB_RETRY_ENABLED=true
DB_CIRCUIT_BREAKER_ENABLED=true配置注意事项:
数据库配置:确保MySQL服务正在运行,用户有足够的权限
模型路径:如果使用本地模型,确保路径正确且模型文件完整
API密钥:如果使用外部API,确保密钥有效且有足够配额
端口配置:确保配置的端口未被占用
权限配置:建议为NL2SQL创建只读数据库用户
2.2.3 数据库用户权限配置
为了安全起见,创建专用的只读用户:
-- 创建NL2SQL专用用户
CREATE USER 'any4any_nl2sql'@'localhost' IDENTIFIED BY 'strong_password';
-- 授予只读权限
GRANT SELECT ON any4any.* TO 'any4any_nl2sql'@'localhost';
-- 禁止修改操作
REVOKE INSERT, UPDATE, DELETE, CREATE, DROP, ALTER ON any4any.* FROM 'any4any_nl2sql'@'localhost';
-- 刷新权限
FLUSH PRIVILEGES;更新 .env 文件使用专用用户:
SQL_DB_USERNAME=any4any_nl2sql
SQL_DB_PASSWORD=strong_password
MYSQL_USER=any4any_nl2sql
MYSQL_PASSWORD=strong_password3. 核心智能体组件实现
3.1 NL2SQL智能体核心代码
3.1.1 工具注册器(ToolRegistry)
# core/tools/tool_registry.py
class ToolRegistry:
"""工具注册器 - 智能工具选择和管理"""
def __init__(self):
self._tools: List[BaseTool] = []
self._load_tools()
def _load_tools(self):
"""加载所有智能体工具"""
# 加载NL2SQL智能体
from .nl2sql.workflow import NL2SQLTool
self.register_tool(NL2SQLTool())
# 按优先级排序
self._tools.sort(key=lambda tool: tool.priority)
async def process_with_tools(self, user_message: str, generate_response_func: Callable,
conversation_manager=None, user_id: str = None,
platform: str = None) -> Optional[str]:
"""使用智能体处理用户消息"""
# 使用LLM智能选择工具
selected_tool = await self._select_tool_with_llm(
user_message, generate_response_func,
conversation_manager, user_id, platform
)
if selected_tool and selected_tool.name == "nl2sql":
# 使用NL2SQL智能体处理
result = await selected_tool.process(
user_message, generate_response_func,
conversation_manager, user_id, platform
)
return result
return None
async def _select_tool_with_llm(self, user_message: str, generate_response_func: Callable,
conversation_manager=None, user_id: str = None,
platform: str = None) -> Optional[BaseTool]:
"""使用LLM智能选择最合适的工具"""
prompt = f"""你是一个智能工具选择器。请根据用户的需求,选择最合适的工具。
用户消息: {user_message}
可用工具:
1. **nl2sql** (优先级: 1)
描述: 自然语言转SQL查询工具,支持智能表选择和SQL生成
适用场景: 用户需要查询、统计、分析数据库中的数据
关键词: 查询、统计、多少、几个、总数、平均、最高、最低、列表等
2. **web_search** (优先级: 2)
描述: 网络搜索工具
适用场景: 用户需要搜索网络信息、最新资讯
关键词: 搜索、查找、搜、查一下、找一找、检索等
选择标准:
- 如果用户需要查询数据库信息,选择nl2sql
- 如果用户需要搜索网络信息,选择web_search
- 如果都不匹配,回答"无工具"
请直接回答工具名称(nl2sql、web_search)或"无工具":"""
response = await generate_response_func(prompt)
# 解析LLM选择结果
if "nl2sql" in response.lower():
return self.get_tool_by_name("nl2sql")
return None3.1.2 NL2SQL智能体(NL2SQLTool)
# core/tools/nl2sql/workflow.py
class NL2SQLTool(BaseTool):
"""NL2SQL智能体 - 完整的8步SQL处理工作流"""
def __init__(self, enabled: bool = True):
super().__init__(enabled)
self.table_manager = get_table_manager()
self.sql_executor = get_sql_executor()
self.context_manager = SQLContextManager()
self.user_context_enhancer = get_user_context_enhancer()
@property
def priority(self) -> int:
return 1 # 高优先级
@property
def name(self) -> str:
return "nl2sql"
@property
def description(self) -> str:
return "自然语言转SQL查询工具,支持智能表选择和SQL生成"
async def can_handle(self, user_message: str) -> bool:
"""检测是否为SQL相关问题"""
if not user_message or not user_message.strip():
return False
# 检查配置
try:
from config import Config
if not getattr(Config, 'NL2SQL_ENABLED', True):
return False
except Exception:
return False
# 现在由LLM进行功能识别,工具本身不做关键词检测
return True
async def process(self, user_message: str, generate_response_func: Callable,
conversation_manager=None, user_id: str = None,
platform: str = None) -> Optional[str]:
"""处理SQL问题 - 智能体主入口"""
try:
workflow_result = await self.process_sql_question(
question=user_message,
context="",
conversation_manager=conversation_manager,
user_id=user_id,
platform=platform
)
if workflow_result['success']:
return workflow_result['final_answer']
else:
self.logger.warning(f"NL2SQL workflow failed: {workflow_result.get('error', '')}")
return None
except Exception as e:
self.logger.error(f"NL2SQL processing failed: {e}")
return None
async def process_sql_question(self, question: str, context: str = "",
conversation_manager=None, user_id: str = None,
platform: str = None) -> Dict[str, Any]:
"""完整的8步NL2SQL智能工作流程"""
try:
# 步骤0: 获取用户上下文
user_context = ""
if user_id:
try:
user_context = await self.user_context_enhancer.get_user_context(user_id, question)
except Exception as e:
logger.error(f"Failed to generate user context: {e}")
user_context = ""
# 步骤0.5: 自动获取增强上下文(对话历史)
history_context = ""
if conversation_manager and user_id:
try:
history_context = await self.context_manager.get_enhanced_context(
current_question=question,
conversation_manager=conversation_manager,
user_id=user_id,
platform=platform,
manual_context=context,
max_history=self.context_manager.max_history_items
)
except Exception as e:
logger.error(f"Failed to generate history context: {e}")
history_context = context or ""
else:
history_context = context or ""
# 合并所有上下文:用户上下文 + 历史上下文 + 手动上下文
enhanced_context = "\n\n".join(filter(None, [
user_context, # 用户元数据上下文
history_context, # 对话历史上下文
context # 手动提供的上下文
]))
# 步骤1: 获取所有表的基本信息
all_tables_result = await self._get_all_tables_async()
if not all_tables_result['success']:
return {
'success': False,
'error': f"获取表信息失败: {all_tables_result['error']}",
'step': 'get_tables'
}
all_tables = all_tables_result['data']['tables']
# 步骤2: LLM分析问题并确定需要的表(基于增强上下文)
required_tables = await self._analyze_tables_needed(question, all_tables, enhanced_context)
if not required_tables:
return {
'success': False,
'error': "LLM无法确定需要查询哪些表",
'step': 'analyze_tables'
}
# 步骤3: 获取详细表结构
table_schemas_result = await self._get_table_schemas_async(required_tables)
if not table_schemas_result['success']:
return {
'success': False,
'error': f"获取表结构失败: {table_schemas_result['error']}",
'step': 'get_schemas'
}
# 步骤4: LLM生成SQL语句(基于增强上下文)
sql_result = await self._generate_sql(question, table_schemas_result['formatted_output'], enhanced_context)
if not sql_result['success']:
return {
'success': False,
'error': f"SQL生成失败: {sql_result['error']}",
'step': 'generate_sql'
}
# 记录生成的SQL语句
logger.info(f"NL2SQL Question: {question} | Generated SQL: {sql_result['generated_sql']}")
# 步骤5: 执行SQL查询
execution_result = await self._execute_sql_async(sql_result['generated_sql'])
if not execution_result['success']:
return {
'success': False,
'error': f"SQL执行失败: {execution_result['error']}",
'step': 'execute_sql',
'generated_sql': sql_result['generated_sql']
}
# 步骤6: LLM生成最终回答(基于完整上下文)
final_answer = await self._generate_final_answer(
question,
sql_result['generated_sql'],
execution_result['formatted_table'],
enhanced_context
)
# 分析上下文来源
context_sources = []
if user_context:
context_sources.append('user_metadata')
if history_context:
context_sources.append('conversation_history')
if context:
context_sources.append('manual')
return {
'success': True,
'question': question,
'generated_sql': sql_result['generated_sql'],
'query_result': execution_result['formatted_table'],
'final_answer': final_answer,
'row_count': execution_result['row_count'],
'step': 'completed',
'context_used': bool(enhanced_context),
'context_sources': context_sources,
'user_context_used': bool(user_context),
'history_context_used': bool(history_context),
'manual_context_used': bool(context),
'enhanced_context_length': len(enhanced_context) if enhanced_context else 0
}
except Exception as e:
logger.error(f"NL2SQL workflow processing failed: {e}")
return {
'success': False,
'error': f"工作流程执行失败: {str(e)}",
'step': 'workflow_error'
}3.1.3 SQL执行器(SQLExecutor)
# core/tools/nl2sql/sql_executor.py
class SQLExecutor:
"""SQL执行器 - 负责SQL安全验证和执行"""
def __init__(self):
self.engine = None
self.connection_pool = None
self.db_type = getattr(Config, 'SQL_DB_TYPE', 'mysql').lower()
self._initialize_connection()
def _initialize_connection(self):
"""初始化数据库连接池"""
try:
# 优先使用统一连接池
if Config.DB_POOL_ENABLED:
try:
from core.database.connection_pool import get_connection_pool
self.connection_pool = get_connection_pool()
return
except ImportError:
logger.warning("Unified connection pool not available, falling back to SQLAlchemy pool")
# 回退到SQLAlchemy连接池
if self.db_type == 'mysql':
connection_string = (
f"mysql+mysqlconnector://"
f"{getattr(Config, 'SQL_DB_USERNAME', 'root')}:"
f"{getattr(Config, 'SQL_DB_PASSWORD', 'root')}@"
f"{getattr(Config, 'SQL_DB_HOST', 'localhost')}:"
f"{getattr(Config, 'SQL_DB_PORT', '3306')}/"
f"{getattr(Config, 'SQL_DB_DATABASE', 'any4any')}"
)
else:
raise ValueError(f"不支持的数据库类型: {self.db_type}")
# 使用配置的连接池参数
self.engine = create_engine(
connection_string,
poolclass=QueuePool,
pool_size=Config.DB_POOL_SIZE,
pool_pre_ping=Config.DB_POOL_PRE_PING,
pool_recycle=Config.DB_POOL_RECYCLE,
echo=False
)
logger.info("SQL Executor initialized successfully with SQLAlchemy connection pool.")
except Exception as e:
logger.error(f"SQL Executor database connection initialization failed: {e}")
self.engine = None
def _validate_sql_safety(self, sql_query: str) -> Tuple[bool, str]:
"""多层SQL安全验证"""
try:
# 转换为大写进行检查
sql_upper = sql_query.strip().upper()
# 1. 只允许SELECT语句
if not sql_upper.startswith('SELECT'):
return False, "出于安全考虑,仅允许SELECT查询"
# 2. 检查危险关键词
dangerous_keywords = [
'DROP', 'DELETE', 'UPDATE', 'INSERT', 'ALTER', 'CREATE',
'TRUNCATE', 'EXEC', 'EXECUTE', 'UNION', 'MERGE', 'GRANT',
'REVOKE', 'COMMIT', 'ROLLBACK', 'SAVEPOINT'
]
for keyword in dangerous_keywords:
if re.search(r'\b' + keyword + r'\b', sql_upper):
return False, f"发现危险关键词 '{keyword}'"
# 3. 检查注释技巧
if '--' in sql_query or '/*' in sql_query or '*/' in sql_query:
return False, "不允许在SQL查询中使用注释"
# 4. 检查多语句
if ';' in sql_query.rstrip():
return False, "不允许执行多个SQL语句"
return True, ""
except Exception as e:
return False, f"SQL验证错误: {str(e)}"
def execute_sql_query(self, sql_query: str) -> Dict[str, Any]:
"""执行已验证的SQL查询"""
try:
# 验证SQL安全性
is_safe, error_msg = self._validate_sql_safety(sql_query)
if not is_safe:
return {
'success': False,
'error': error_msg,
'formatted_output': f'SQL安全验证失败: {error_msg}'
}
# 优先使用统一连接池
if self.connection_pool:
return self._execute_with_unified_pool(sql_query)
# 回退到SQLAlchemy连接池
if not self.engine:
return {
'success': False,
'error': '数据库连接不可用',
'formatted_output': '数据库连接不可用'
}
# 执行查询
with self.engine.connect() as conn:
result = conn.execute(text(sql_query))
if result.returns_rows:
rows = result.fetchall()
columns = list(result.keys())
formatted_results = self._format_query_results(rows, columns)
return {
'success': True,
'sql_query': sql_query,
'execution_time': '暂无',
**formatted_results
}
else:
return {
'success': True,
'sql_query': sql_query,
'execution_time': '暂无',
'data': [],
'columns': [],
'row_count': result.rowcount if hasattr(result, 'rowcount') else 0,
'formatted_table': f'查询执行成功。{result.rowcount} 行受影响。' if hasattr(result, 'rowcount') else '查询执行成功。'
}
except Exception as e:
logger.error(f"SQL执行失败: {e}")
return {
'success': False,
'error': str(e),
'sql_query': sql_query,
'formatted_output': f'SQL执行失败: {str(e)}'
}# core/tools/nl2sql/sql_executor.py
class SQLExecutor:
"""SQL执行器 - 负责SQL安全验证和执行"""
def __init__(self):
self.engine = None
self.connection_pool = None
self.db_type = getattr(Config, 'SQL_DB_TYPE', 'mysql').lower()
self._initialize_connection()
def _initialize_connection(self):
"""初始化数据库连接池"""
try:
# 优先使用统一连接池
if Config.DB_POOL_ENABLED:
try:
from core.database.connection_pool import get_connection_pool
self.connection_pool = get_connection_pool()
return
except ImportError:
logger.warning("Unified connection pool not available, falling back to SQLAlchemy pool")
# 回退到SQLAlchemy连接池
if self.db_type == 'mysql':
connection_string = (
f"mysql+mysqlconnector://"
f"{getattr(Config, 'SQL_DB_USERNAME', 'root')}:"
f"{getattr(Config, 'SQL_DB_PASSWORD', 'root')}@"
f"{getattr(Config, 'SQL_DB_HOST', 'localhost')}:"
f"{getattr(Config, 'SQL_DB_PORT', '3306')}/"
f"{getattr(Config, 'SQL_DB_DATABASE', 'any4any')}"
)
else:
raise ValueError(f"不支持的数据库类型: {self.db_type}")
# 使用配置的连接池参数
self.engine = create_engine(
connection_string,
poolclass=QueuePool,
pool_size=Config.DB_POOL_SIZE,
pool_pre_ping=Config.DB_POOL_PRE_PING,
pool_recycle=Config.DB_POOL_RECYCLE,
echo=False
)
logger.info("SQL Executor initialized successfully with SQLAlchemy connection pool.")
except Exception as e:
logger.error(f"SQL Executor database connection initialization failed: {e}")
self.engine = None
def _validate_sql_safety(self, sql_query: str) -> Tuple[bool, str]:
"""多层SQL安全验证"""
try:
# 转换为大写进行检查
sql_upper = sql_query.strip().upper()
# 1. 只允许SELECT语句
if not sql_upper.startswith('SELECT'):
return False, "出于安全考虑,仅允许SELECT查询"
# 2. 检查危险关键词
dangerous_keywords = [
'DROP', 'DELETE', 'UPDATE', 'INSERT', 'ALTER', 'CREATE',
'TRUNCATE', 'EXEC', 'EXECUTE', 'UNION', 'MERGE', 'GRANT',
'REVOKE', 'COMMIT', 'ROLLBACK', 'SAVEPOINT'
]
for keyword in dangerous_keywords:
if re.search(r'\b' + keyword + r'\b', sql_upper):
return False, f"发现危险关键词 '{keyword}'"
# 3. 检查注释技巧
if '--' in sql_query or '/*' in sql_query or '*/' in sql_query:
return False, "不允许在SQL查询中使用注释"
# 4. 检查多语句
if ';' in sql_query.rstrip():
return False, "不允许执行多个SQL语句"
return True, ""
except Exception as e:
return False, f"SQL验证错误: {str(e)}"
def execute_sql_query(self, sql_query: str) -> Dict[str, Any]:
"""执行已验证的SQL查询"""
try:
# 验证SQL安全性
is_safe, error_msg = self._validate_sql_safety(sql_query)
if not is_safe:
return {
'success': False,
'error': error_msg,
'formatted_output': f'SQL安全验证失败: {error_msg}'
}
# 优先使用统一连接池
if self.connection_pool:
return self._execute_with_unified_pool(sql_query)
# 回退到SQLAlchemy连接池
if not self.engine:
return {
'success': False,
'error': '数据库连接不可用',
'formatted_output': '数据库连接不可用'
}
# 执行查询
with self.engine.connect() as conn:
result = conn.execute(text(sql_query))
if result.returns_rows:
rows = result.fetchall()
columns = list(result.keys())
formatted_results = self._format_query_results(rows, columns)
return {
'success': True,
'sql_query': sql_query,
'execution_time': '暂无',
**formatted_results
}
else:
return {
'success': True,
'sql_query': sql_query,
'execution_time': '暂无',
'data': [],
'columns': [],
'row_count': result.rowcount if hasattr(result, 'rowcount') else 0,
'formatted_table': f'查询执行成功。{result.rowcount} 行受影响。' if hasattr(result, 'rowcount') else '查询执行成功。'
}
except Exception as e:
logger.error(f"SQL执行失败: {e}")
return {
'success': False,
'error': str(e),
'sql_query': sql_query,
'formatted_output': f'SQL执行失败: {str(e)}'
}3.1.4 上下文管理器(SQLContextManager)
# core/tools/nl2sql/context_manager.py
class SQLContextManager:
"""上下文管理器 - 支持智能历史检索和上下文感知"""
def __init__(self):
self.max_history_items = 5
self.max_context_length = 2000
self.sql_keywords = [
'查询', '统计', '多少', '几个', '总数', '平均', '最高', '最低', '列表',
'显示', '找出', '计算', '汇总', '排序', '分组', '分别', '都', '谁', '什么'
]
async def get_enhanced_context(
self,
current_question: str,
conversation_manager,
user_id: str,
platform: str,
manual_context: str = "",
max_history: int = 5
) -> str:
"""获取增强的上下文信息"""
try:
# 获取用户的历史对话
if not conversation_manager or not user_id:
return manual_context
# 获取最近的对话记录
recent_messages = conversation_manager.get_recent_messages(
user_id=user_id,
platform=platform,
limit=max_history * 2 # 获取更多消息以便筛选
)
if not recent_messages:
return manual_context
# 计算每条消息与当前问题的相关性得分
scored_messages = []
for message in recent_messages:
content = message.get('content', '')
if not content:
continue
# 计算相关性得分
relevance_score = self._calculate_relevance_score(content, current_question)
if relevance_score > 0.1: # 只保留有一定相关性的消息
scored_messages.append({
'content': content,
'role': message.get('role', 'user'),
'timestamp': message.get('timestamp', 0),
'relevance_score': relevance_score
})
# 按相关性得分和时间排序
scored_messages.sort(key=lambda x: (x['relevance_score'], x['timestamp']), reverse=True)
# 选择最相关的消息
selected_messages = scored_messages[:max_history]
# 构建上下文文本
context_parts = []
for msg in selected_messages:
role_label = "用户" if msg['role'] == 'user' else "助手"
context_parts.append(f"{role_label}: {msg['content']}")
# 添加手动上下文
if manual_context:
context_parts.append(f"手动上下文: {manual_context}")
# 合并并控制长度
enhanced_context = "\n\n".join(context_parts)
# 如果上下文过长,进行截断
if len(enhanced_context) > self.max_context_length:
# 按行截断,保留完整的对话轮次
lines = enhanced_context.split('\n\n')
truncated_context = ""
current_length = 0
for line in lines:
if current_length + len(line) + 2 <= self.max_context_length:
truncated_context += line + "\n\n"
current_length += len(line) + 2
else:
break
enhanced_context = truncated_context.rstrip()
return enhanced_context
except Exception as e:
logger.error(f"Failed to generate enhanced context: {e}")
return manual_context
def _calculate_relevance_score(self, message: str, current_question: str) -> float:
"""计算消息与当前问题的相关性得分"""
try:
score = 0.0
# 1. 时间衰减因子
# 这里简化处理,实际应该根据消息时间戳计算
# score *= time_decay_factor
# 2. 关键词匹配度
message_lower = message.lower()
question_lower = current_question.lower()
# SQL相关关键词匹配
sql_matches = 0
for keyword in self.sql_keywords:
if keyword in message_lower and keyword in question_lower:
sql_matches += 1
if sql_matches > 0:
score += sql_matches * 0.3
# 3. 语义相似度(基于词汇重叠度)
message_words = set(message_lower.split())
question_words = set(question_lower.split())
if message_words and question_words:
overlap = len(message_words & question_words)
similarity = overlap / len(message_words | question_words)
score += similarity * 0.4
# 4. 特殊模式匹配
# 如果消息包含SQL查询结果,提高相关性
if 'sql:' in message_lower or '查询结果' in message_lower:
score += 0.3
return min(score, 1.0) # 限制最大得分为1.0
except Exception as e:
logger.error(f"Failed to calculate relevance score: {e}")
return 0.03.2 启动和测试智能体
3.2.1 启动any4any服务
# 激活虚拟环境conda activate any4any
# 启动服务python cli.py服务启动后,将看到以下输出:
Starting servers...
Fastapi server started
Mcp server started...3.2.2 测试NL2SQL智能体
方法1:通过API接口测试
# 健康检查
curl http://localhost:8888/health
# 测试NL2SQL智能体
curl -X POST http://localhost:8888/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "default",
"messages": [
{"role": "user", "content": "查询所有用户的数量"}
],
"user": "test_user_001"
}'
# 测试复杂查询
curl -X POST http://localhost:8888/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "default",
"messages": [
{"role": "user", "content": "统计所有订单数量"}
],
"user": "test_user_001"
}'方法2:通过Web界面测试
访问 http://localhost:8888/docs 查看自动生成的API文档,或访问 http://localhost:8888/chat/ 使用Web聊天界面。
4. 配置注意事项和最佳实践
4.1 关键配置项详解
4.1.1 数据库连接池配置
# 连接池配置
DB_POOL_ENABLED=true # 启用连接池
DB_POOL_SIZE=10 # 连接池大小
DB_POOL_PRE_PING=true # 连接预检
DB_POOL_RECYCLE=3600 # 连接回收时间(秒)
DB_RETRY_ENABLED=true # 启用重试机制
DB_CIRCUIT_BREAKER_THRESHOLD=5 # 熔断器失败阈值
DB_CIRCUIT_BREAKER_TIMEOUT=60 # 熔断器恢复时间(秒)配置建议:
生产环境建议
DB_POOL_SIZE设置为 20-50开发环境可以设置为 5-10
DB_POOL_RECYCLE建议设置为 MySQL 的wait_timeout的 80%
4.1.2 安全配置
# API安全配置
API_KEY=your_secure_api_key # API访问密钥
SESSION_MAX_AGE=3600 # 会话有效期(秒)
# 数据库安全配置
SQL_DB_USERNAME=any4any_nl2sql # 只读用户
SQL_DB_PASSWORD=strong_password # 强密码安全建议:
使用强密码作为API密钥
为NL2SQL创建专用的只读数据库用户
定期轮换数据库密码
监控异常查询行为
4.2 性能优化建议
4.2.1 数据库优化
-- 为常用查询字段创建索引
CREATE INDEX idx_users_city ON users(city);
CREATE INDEX idx_users_age ON users(age);
CREATE INDEX idx_orders_user_id ON orders(user_id);
-- 分析表统计信息
ANALYZE TABLE users, orders;4.3 故障排除
4.3.1 常见问题及解决方案
问题1:智能体无法识别SQL查询
症状:用户输入SQL相关问题,但系统没有调用NL2SQL智能体
排查步骤:
检查
TOOLS_ENABLED=true查看日志中的工具选择过程
确认LLM服务正常工作
问题2:SQL生成不准确
症状:生成的SQL语句语法错误或逻辑错误
排查步骤:
检查表结构信息是否正确获取
查看SQL生成的提示词
确认LLM模型能力
检查上下文信息是否干扰
解决方案:
# 在workflow.py中添加调试信息
logger.debug(f"Table schemas: {table_schemas_result['formatted_output']}")
logger.debug(f"Enhanced context: {enhanced_context}")
logger.debug(f"SQL generation prompt: {prompt}")问题3:上下文感知功能不工作
症状:多轮对话中无法理解上下文
排查步骤:
确认用户信息和平台信息传递
检查会话管理器是否正常
查看上下文检索日志
验证历史对话存储
问题4:数据库连接问题
症状:SQL执行失败,提示数据库连接错误
排查步骤:
检查数据库服务状态
验证连接参数配置
测试数据库用户权限
检查防火墙设置
解决方案:
# 测试数据库连接
mysql -h localhost -u any4any_nl2sql -p any4any5. 总结
通过本教程,您已经掌握了基于any4any智能体架构的完整NL2SQL实现方案。相比传统的基于工作流或提示词的方法,智能体架构具有以下核心优势:
5.1 技术创新特点
智能工具选择:通过LLM智能选择合适的工具,而不是硬编码规则
8步完整工作流:从上下文获取到结果生成的完整处理流程
上下文感知能力:支持多轮对话和智能追问处理
多层安全验证:确保SQL查询的安全性
异步处理架构:支持高并发查询处理
5.2 架构优势
模块化设计:每个组件职责明确,易于维护和扩展
智能决策:通过LLM进行智能决策,提高系统灵活性
安全可靠:多层安全机制确保系统稳定运行
性能优化:连接池、缓存等机制保证高性能
5.3 应用价值
降低技术门槛:用户无需学习SQL即可进行数据查询
提高查询效率:智能上下文感知减少重复信息
增强用户体验:自然的对话式查询交互
保障数据安全:完善的权限控制和安全机制
any4any的NL2SQL智能体实现代表了当前智能问数技术的先进水平,为构建企业级智能数据查询系统提供了完整的解决方案。通过本教程的学习,您可以快速部署和使用这套系统,并根据实际需求进行定制化扩展。