基于any4any智能体的NL2SQL智能问数

本教程详细介绍如何基于any4any企业级多模态AI系统构建完整的NL2SQL智能问数系统。通过智能体架构设计,实现了8步工作流程的自然语言转SQL功能,包括用户上下文获取、历史对话检索、智能表选择、表结构分析、SQL生成、安全执行和结果格式化。教程涵盖ToolRegistry工具注册器、NL2SQL智能体、SQL执行器、上下文管理器和表管理器等核心组件的实现细节,并提供完整的项目部署、环境配置和数据库安全设置指南。通过本教程,开发者可以掌握如何构建安全、高效、智能的数据库查询系统,实现自然语言与数据库之间的无缝交互。

源码地址

前置教程

如想顺利完成本智能体NL2SQL项目,你可能需要先完成以下前置教程:

1. any4any NL2SQL智能体架构概述

1.1 智能体架构设计

any4any采用基于智能体的NL2SQL实现,核心架构包含以下组件:

核心智能体组件

  1. ToolRegistry(工具注册器):智能工具选择和管理

  2. NL2SQLTool(NL2SQL智能体):8步智能SQL处理工作流

  3. SQLContextManager(上下文管理器):智能历史对话检索

  4. SQLExecutor(SQL执行器):安全SQL验证和执行

  5. TableManager(表管理器):异步表结构获取

1.2 智能体工作流程

any4any的NL2SQL智能体实现了8步完整处理流程:

sequenceDiagram
    participant User as 用户
    participant TR as ToolRegistry
    participant NT as NL2SQL智能体
    participant CM as 上下文管理器
    participant TM as 表管理器
    participant SE as SQL执行器
    participant LLM as LLM服务
    User->>TR: 自然语言查询
    TR->>TR: LLM智能工具选择
    TR->>NT: 选择NL2SQL智能体
    Note over NT: 步骤0: 获取用户上下文
    NT->>LLM: 生成用户上下文
    Note over NT: 步骤0.5: 获取增强上下文
    NT->>CM: 检索历史对话
    CM->>CM: 计算相关性得分
    CM-->>NT: 返回增强上下文
    Note over NT: 步骤1: 获取表信息
    NT->>TM: 获取所有表基本信息
    TM-->>NT: 返回表列表
    Note over NT: 步骤2: 智能表选择
    NT->>LLM: 分析需要的表
    LLM-->>NT: 返回相关表名
    Note over NT: 步骤3: 获取表结构
    NT->>TM: 获取详细表结构
    TM-->>NT: 返回表结构信息
    Note over NT: 步骤4: SQL生成
    NT->>LLM: 基于上下文生成SQL
    LLM-->>NT: 返回SQL语句
    Note over NT: 步骤5: 安全执行
    NT->>SE: 验证并执行SQL
    SE->>SE: 多层安全验证
    SE-->>NT: 返回查询结果
    Note over NT: 步骤6: 生成回答
    NT->>LLM: 基于结果生成回答
    LLM-->>NT: 返回最终回答
    NT-->>User: 返回智能查询结果

LLM-->>NT: 返回相关表名 Note over NT: 步骤3: 获取表结构 NT->>TM: 获取详细表结构 TM-->>NT: 返回表结构信息 Note over NT: 步骤4: SQL生成 NT->>LLM: 基于上下文生成SQL LLM-->>NT: 返回SQL语句 Note over NT: 步骤5: 安全执行 NT->>SE: 验证并执行SQL SE->>SE: 多层安全验证 SE-->>NT: 返回查询结果 Note over NT: 步骤6: 生成回答 NT->>LLM: 基于结果生成回答 LLM-->>NT: 返回最终回答 NT-->>User: 返回智能查询结果

2. 项目安装与配置

2.1 环境准备

2.1.1 系统要求

# 推荐系统配置- Python 3.10+- MySQL 8.0+- 内存: 8GB+- 硬盘: 10GB+- GPU: 可选,用于本地LLM加速

2.1.2 数据库配置

WSL2下安装MySQL服务及基本使用教程:WSL(Ubuntu)环境下安装MySQL服务及基本使用教程

你可以直接使用any4any项目仓库中的any4any.sql文件来创建数据库。并在orders表中插入示例数据。

2.2 any4any项目部署

2.2.1 克隆和配置项目

# 克隆项目git clone https://github.com/eogee/any4any.git# 你也可以直接从网盘下载项目代码
# 链接:https://pan.quark.cn/s/5a195d6acfc7cd any4any
# 创建Python虚拟环境conda create -n any4any python=3.10conda activate any4any
# 安装依赖pip install -r requirements.txt

2.2.2 环境变量配置

创建并配置 .env 文件:

# 数据库配置
MYSQL_HOST=localhost
MYSQL_PORT=3306
MYSQL_USER=root
MYSQL_PASSWORD=your_password
MYSQL_DATABASE=any4any
# SQL专用配置
SQL_DB_TYPE=mysql
SQL_DB_HOST=localhost
SQL_DB_PORT=3306
SQL_DB_USERNAME=root
SQL_DB_PASSWORD=your_password
SQL_DB_DATABASE=any4any
# 智能体系统配置
TOOLS_ENABLED=true
# LLM模型配置(选择一种)
# 本地模型配置
LLM_MODEL_DIR=/path/to/your/llm/model
LLM_MODEL_ENABLED=true
# 外部API配置(可选)
OPENAI_API_KEY=your_openai_api_key
OPENAI_BASE_URL=https://api.openai.com/v1
# 应用配置
HOST=0.0.0.0
PORT=8888
API_KEY=your_api_key
# 数据库连接池配置
DB_POOL_ENABLED=true
DB_POOL_SIZE=10
DB_POOL_PRE_PING=true
DB_POOL_RECYCLE=3600
DB_RETRY_ENABLED=true
DB_CIRCUIT_BREAKER_ENABLED=true

配置注意事项

  1. 数据库配置:确保MySQL服务正在运行,用户有足够的权限

  2. 模型路径:如果使用本地模型,确保路径正确且模型文件完整

  3. API密钥:如果使用外部API,确保密钥有效且有足够配额

  4. 端口配置:确保配置的端口未被占用

  5. 权限配置:建议为NL2SQL创建只读数据库用户

2.2.3 数据库用户权限配置

为了安全起见,创建专用的只读用户:

-- 创建NL2SQL专用用户
CREATE USER 'any4any_nl2sql'@'localhost' IDENTIFIED BY 'strong_password';
-- 授予只读权限
GRANT SELECT ON any4any.* TO 'any4any_nl2sql'@'localhost';
-- 禁止修改操作
REVOKE INSERT, UPDATE, DELETE, CREATE, DROP, ALTER ON any4any.* FROM 'any4any_nl2sql'@'localhost';
-- 刷新权限
FLUSH PRIVILEGES;

更新 .env 文件使用专用用户:

SQL_DB_USERNAME=any4any_nl2sql
SQL_DB_PASSWORD=strong_password
MYSQL_USER=any4any_nl2sql
MYSQL_PASSWORD=strong_password

3. 核心智能体组件实现

3.1 NL2SQL智能体核心代码

3.1.1 工具注册器(ToolRegistry)

# core/tools/tool_registry.py
class ToolRegistry:
    """工具注册器 - 智能工具选择和管理"""
    def __init__(self):
        self._tools: List[BaseTool] = []
        self._load_tools()
    def _load_tools(self):
        """加载所有智能体工具"""
        # 加载NL2SQL智能体
        from .nl2sql.workflow import NL2SQLTool
        self.register_tool(NL2SQLTool())
        # 按优先级排序
        self._tools.sort(key=lambda tool: tool.priority)
    async def process_with_tools(self, user_message: str, generate_response_func: Callable,
                               conversation_manager=None, user_id: str = None,
                               platform: str = None) -> Optional[str]:
        """使用智能体处理用户消息"""
        # 使用LLM智能选择工具
        selected_tool = await self._select_tool_with_llm(
            user_message, generate_response_func,
            conversation_manager, user_id, platform
        )
        if selected_tool and selected_tool.name == "nl2sql":
            # 使用NL2SQL智能体处理
            result = await selected_tool.process(
                user_message, generate_response_func,
                conversation_manager, user_id, platform
            )
            return result
        return None
    async def _select_tool_with_llm(self, user_message: str, generate_response_func: Callable,
                                  conversation_manager=None, user_id: str = None,
                                  platform: str = None) -> Optional[BaseTool]:
        """使用LLM智能选择最合适的工具"""
        prompt = f"""你是一个智能工具选择器。请根据用户的需求,选择最合适的工具。
用户消息: {user_message}
可用工具:
1. **nl2sql** (优先级: 1)
   描述: 自然语言转SQL查询工具,支持智能表选择和SQL生成
   适用场景: 用户需要查询、统计、分析数据库中的数据
   关键词: 查询、统计、多少、几个、总数、平均、最高、最低、列表等
2. **web_search** (优先级: 2)
   描述: 网络搜索工具
   适用场景: 用户需要搜索网络信息、最新资讯
   关键词: 搜索、查找、搜、查一下、找一找、检索等
选择标准:
- 如果用户需要查询数据库信息,选择nl2sql
- 如果用户需要搜索网络信息,选择web_search
- 如果都不匹配,回答"无工具"
请直接回答工具名称(nl2sql、web_search)或"无工具":"""
        response = await generate_response_func(prompt)
        # 解析LLM选择结果
        if "nl2sql" in response.lower():
            return self.get_tool_by_name("nl2sql")
        return None

3.1.2 NL2SQL智能体(NL2SQLTool)

# core/tools/nl2sql/workflow.py
class NL2SQLTool(BaseTool):
    """NL2SQL智能体 - 完整的8步SQL处理工作流"""
    def __init__(self, enabled: bool = True):
        super().__init__(enabled)
        self.table_manager = get_table_manager()
        self.sql_executor = get_sql_executor()
        self.context_manager = SQLContextManager()
        self.user_context_enhancer = get_user_context_enhancer()
    @property
    def priority(self) -> int:
        return 1  # 高优先级
    @property
    def name(self) -> str:
        return "nl2sql"
    @property
    def description(self) -> str:
        return "自然语言转SQL查询工具,支持智能表选择和SQL生成"
    async def can_handle(self, user_message: str) -> bool:
        """检测是否为SQL相关问题"""
        if not user_message or not user_message.strip():
            return False
        # 检查配置
        try:
            from config import Config
            if not getattr(Config, 'NL2SQL_ENABLED', True):
                return False
        except Exception:
            return False
        # 现在由LLM进行功能识别,工具本身不做关键词检测
        return True
    async def process(self, user_message: str, generate_response_func: Callable,
                     conversation_manager=None, user_id: str = None,
                     platform: str = None) -> Optional[str]:
        """处理SQL问题 - 智能体主入口"""
        try:
            workflow_result = await self.process_sql_question(
                question=user_message,
                context="",
                conversation_manager=conversation_manager,
                user_id=user_id,
                platform=platform
            )
            if workflow_result['success']:
                return workflow_result['final_answer']
            else:
                self.logger.warning(f"NL2SQL workflow failed: {workflow_result.get('error', '')}")
                return None
        except Exception as e:
            self.logger.error(f"NL2SQL processing failed: {e}")
            return None
    async def process_sql_question(self, question: str, context: str = "",
                                 conversation_manager=None, user_id: str = None,
                                 platform: str = None) -> Dict[str, Any]:
        """完整的8步NL2SQL智能工作流程"""
        try:
            # 步骤0: 获取用户上下文
            user_context = ""
            if user_id:
                try:
                    user_context = await self.user_context_enhancer.get_user_context(user_id, question)
                except Exception as e:
                    logger.error(f"Failed to generate user context: {e}")
                    user_context = ""
            # 步骤0.5: 自动获取增强上下文(对话历史)
            history_context = ""
            if conversation_manager and user_id:
                try:
                    history_context = await self.context_manager.get_enhanced_context(
                        current_question=question,
                        conversation_manager=conversation_manager,
                        user_id=user_id,
                        platform=platform,
                        manual_context=context,
                        max_history=self.context_manager.max_history_items
                    )
                except Exception as e:
                    logger.error(f"Failed to generate history context: {e}")
                    history_context = context or ""
            else:
                history_context = context or ""
            # 合并所有上下文:用户上下文 + 历史上下文 + 手动上下文
            enhanced_context = "\n\n".join(filter(None, [
                user_context,     # 用户元数据上下文
                history_context,  # 对话历史上下文
                context           # 手动提供的上下文
            ]))
            # 步骤1: 获取所有表的基本信息
            all_tables_result = await self._get_all_tables_async()
            if not all_tables_result['success']:
                return {
                    'success': False,
                    'error': f"获取表信息失败: {all_tables_result['error']}",
                    'step': 'get_tables'
                }
            all_tables = all_tables_result['data']['tables']
            # 步骤2: LLM分析问题并确定需要的表(基于增强上下文)
            required_tables = await self._analyze_tables_needed(question, all_tables, enhanced_context)
            if not required_tables:
                return {
                    'success': False,
                    'error': "LLM无法确定需要查询哪些表",
                    'step': 'analyze_tables'
                }
            # 步骤3: 获取详细表结构
            table_schemas_result = await self._get_table_schemas_async(required_tables)
            if not table_schemas_result['success']:
                return {
                    'success': False,
                    'error': f"获取表结构失败: {table_schemas_result['error']}",
                    'step': 'get_schemas'
                }
            # 步骤4: LLM生成SQL语句(基于增强上下文)
            sql_result = await self._generate_sql(question, table_schemas_result['formatted_output'], enhanced_context)
            if not sql_result['success']:
                return {
                    'success': False,
                    'error': f"SQL生成失败: {sql_result['error']}",
                    'step': 'generate_sql'
                }
            # 记录生成的SQL语句
            logger.info(f"NL2SQL Question: {question} | Generated SQL: {sql_result['generated_sql']}")
            # 步骤5: 执行SQL查询
            execution_result = await self._execute_sql_async(sql_result['generated_sql'])
            if not execution_result['success']:
                return {
                    'success': False,
                    'error': f"SQL执行失败: {execution_result['error']}",
                    'step': 'execute_sql',
                    'generated_sql': sql_result['generated_sql']
                }
            # 步骤6: LLM生成最终回答(基于完整上下文)
            final_answer = await self._generate_final_answer(
                question,
                sql_result['generated_sql'],
                execution_result['formatted_table'],
                enhanced_context
            )
            # 分析上下文来源
            context_sources = []
            if user_context:
                context_sources.append('user_metadata')
            if history_context:
                context_sources.append('conversation_history')
            if context:
                context_sources.append('manual')
            return {
                'success': True,
                'question': question,
                'generated_sql': sql_result['generated_sql'],
                'query_result': execution_result['formatted_table'],
                'final_answer': final_answer,
                'row_count': execution_result['row_count'],
                'step': 'completed',
                'context_used': bool(enhanced_context),
                'context_sources': context_sources,
                'user_context_used': bool(user_context),
                'history_context_used': bool(history_context),
                'manual_context_used': bool(context),
                'enhanced_context_length': len(enhanced_context) if enhanced_context else 0
            }
        except Exception as e:
            logger.error(f"NL2SQL workflow processing failed: {e}")
            return {
                'success': False,
                'error': f"工作流程执行失败: {str(e)}",
                'step': 'workflow_error'
            }

3.1.3 SQL执行器(SQLExecutor)

# core/tools/nl2sql/sql_executor.py
class SQLExecutor:
    """SQL执行器 - 负责SQL安全验证和执行"""
    def __init__(self):
        self.engine = None
        self.connection_pool = None
        self.db_type = getattr(Config, 'SQL_DB_TYPE', 'mysql').lower()
        self._initialize_connection()
    def _initialize_connection(self):
        """初始化数据库连接池"""
        try:
            # 优先使用统一连接池
            if Config.DB_POOL_ENABLED:
                try:
                    from core.database.connection_pool import get_connection_pool
                    self.connection_pool = get_connection_pool()
                    return
                except ImportError:
                    logger.warning("Unified connection pool not available, falling back to SQLAlchemy pool")
            # 回退到SQLAlchemy连接池
            if self.db_type == 'mysql':
                connection_string = (
                    f"mysql+mysqlconnector://"
                    f"{getattr(Config, 'SQL_DB_USERNAME', 'root')}:"
                    f"{getattr(Config, 'SQL_DB_PASSWORD', 'root')}@"
                    f"{getattr(Config, 'SQL_DB_HOST', 'localhost')}:"
                    f"{getattr(Config, 'SQL_DB_PORT', '3306')}/"
                    f"{getattr(Config, 'SQL_DB_DATABASE', 'any4any')}"
                )
            else:
                raise ValueError(f"不支持的数据库类型: {self.db_type}")
            # 使用配置的连接池参数
            self.engine = create_engine(
                connection_string,
                poolclass=QueuePool,
                pool_size=Config.DB_POOL_SIZE,
                pool_pre_ping=Config.DB_POOL_PRE_PING,
                pool_recycle=Config.DB_POOL_RECYCLE,
                echo=False
            )
            logger.info("SQL Executor initialized successfully with SQLAlchemy connection pool.")
        except Exception as e:
            logger.error(f"SQL Executor database connection initialization failed: {e}")
            self.engine = None
    def _validate_sql_safety(self, sql_query: str) -> Tuple[bool, str]:
        """多层SQL安全验证"""
        try:
            # 转换为大写进行检查
            sql_upper = sql_query.strip().upper()
            # 1. 只允许SELECT语句
            if not sql_upper.startswith('SELECT'):
                return False, "出于安全考虑,仅允许SELECT查询"
            # 2. 检查危险关键词
            dangerous_keywords = [
                'DROP', 'DELETE', 'UPDATE', 'INSERT', 'ALTER', 'CREATE',
                'TRUNCATE', 'EXEC', 'EXECUTE', 'UNION', 'MERGE', 'GRANT',
                'REVOKE', 'COMMIT', 'ROLLBACK', 'SAVEPOINT'
            ]
            for keyword in dangerous_keywords:
                if re.search(r'\b' + keyword + r'\b', sql_upper):
                    return False, f"发现危险关键词 '{keyword}'"
            # 3. 检查注释技巧
            if '--' in sql_query or '/*' in sql_query or '*/' in sql_query:
                return False, "不允许在SQL查询中使用注释"
            # 4. 检查多语句
            if ';' in sql_query.rstrip():
                return False, "不允许执行多个SQL语句"
            return True, ""
        except Exception as e:
            return False, f"SQL验证错误: {str(e)}"
    def execute_sql_query(self, sql_query: str) -> Dict[str, Any]:
        """执行已验证的SQL查询"""
        try:
            # 验证SQL安全性
            is_safe, error_msg = self._validate_sql_safety(sql_query)
            if not is_safe:
                return {
                    'success': False,
                    'error': error_msg,
                    'formatted_output': f'SQL安全验证失败: {error_msg}'
                }
            # 优先使用统一连接池
            if self.connection_pool:
                return self._execute_with_unified_pool(sql_query)
            # 回退到SQLAlchemy连接池
            if not self.engine:
                return {
                    'success': False,
                    'error': '数据库连接不可用',
                    'formatted_output': '数据库连接不可用'
                }
            # 执行查询
            with self.engine.connect() as conn:
                result = conn.execute(text(sql_query))
                if result.returns_rows:
                    rows = result.fetchall()
                    columns = list(result.keys())
                    formatted_results = self._format_query_results(rows, columns)
                    return {
                        'success': True,
                        'sql_query': sql_query,
                        'execution_time': '暂无',
                        **formatted_results
                    }
                else:
                    return {
                        'success': True,
                        'sql_query': sql_query,
                        'execution_time': '暂无',
                        'data': [],
                        'columns': [],
                        'row_count': result.rowcount if hasattr(result, 'rowcount') else 0,
                        'formatted_table': f'查询执行成功。{result.rowcount} 行受影响。' if hasattr(result, 'rowcount') else '查询执行成功。'
                    }
        except Exception as e:
            logger.error(f"SQL执行失败: {e}")
            return {
                'success': False,
                'error': str(e),
                'sql_query': sql_query,
                'formatted_output': f'SQL执行失败: {str(e)}'
            }# core/tools/nl2sql/sql_executor.py
class SQLExecutor:
    """SQL执行器 - 负责SQL安全验证和执行"""
    def __init__(self):
        self.engine = None
        self.connection_pool = None
        self.db_type = getattr(Config, 'SQL_DB_TYPE', 'mysql').lower()
        self._initialize_connection()
    def _initialize_connection(self):
        """初始化数据库连接池"""
        try:
            # 优先使用统一连接池
            if Config.DB_POOL_ENABLED:
                try:
                    from core.database.connection_pool import get_connection_pool
                    self.connection_pool = get_connection_pool()
                    return
                except ImportError:
                    logger.warning("Unified connection pool not available, falling back to SQLAlchemy pool")
            # 回退到SQLAlchemy连接池
            if self.db_type == 'mysql':
                connection_string = (
                    f"mysql+mysqlconnector://"
                    f"{getattr(Config, 'SQL_DB_USERNAME', 'root')}:"
                    f"{getattr(Config, 'SQL_DB_PASSWORD', 'root')}@"
                    f"{getattr(Config, 'SQL_DB_HOST', 'localhost')}:"
                    f"{getattr(Config, 'SQL_DB_PORT', '3306')}/"
                    f"{getattr(Config, 'SQL_DB_DATABASE', 'any4any')}"
                )
            else:
                raise ValueError(f"不支持的数据库类型: {self.db_type}")
            # 使用配置的连接池参数
            self.engine = create_engine(
                connection_string,
                poolclass=QueuePool,
                pool_size=Config.DB_POOL_SIZE,
                pool_pre_ping=Config.DB_POOL_PRE_PING,
                pool_recycle=Config.DB_POOL_RECYCLE,
                echo=False
            )
            logger.info("SQL Executor initialized successfully with SQLAlchemy connection pool.")
        except Exception as e:
            logger.error(f"SQL Executor database connection initialization failed: {e}")
            self.engine = None
    def _validate_sql_safety(self, sql_query: str) -> Tuple[bool, str]:
        """多层SQL安全验证"""
        try:
            # 转换为大写进行检查
            sql_upper = sql_query.strip().upper()
            # 1. 只允许SELECT语句
            if not sql_upper.startswith('SELECT'):
                return False, "出于安全考虑,仅允许SELECT查询"
            # 2. 检查危险关键词
            dangerous_keywords = [
                'DROP', 'DELETE', 'UPDATE', 'INSERT', 'ALTER', 'CREATE',
                'TRUNCATE', 'EXEC', 'EXECUTE', 'UNION', 'MERGE', 'GRANT',
                'REVOKE', 'COMMIT', 'ROLLBACK', 'SAVEPOINT'
            ]
            for keyword in dangerous_keywords:
                if re.search(r'\b' + keyword + r'\b', sql_upper):
                    return False, f"发现危险关键词 '{keyword}'"
            # 3. 检查注释技巧
            if '--' in sql_query or '/*' in sql_query or '*/' in sql_query:
                return False, "不允许在SQL查询中使用注释"
            # 4. 检查多语句
            if ';' in sql_query.rstrip():
                return False, "不允许执行多个SQL语句"
            return True, ""
        except Exception as e:
            return False, f"SQL验证错误: {str(e)}"
    def execute_sql_query(self, sql_query: str) -> Dict[str, Any]:
        """执行已验证的SQL查询"""
        try:
            # 验证SQL安全性
            is_safe, error_msg = self._validate_sql_safety(sql_query)
            if not is_safe:
                return {
                    'success': False,
                    'error': error_msg,
                    'formatted_output': f'SQL安全验证失败: {error_msg}'
                }
            # 优先使用统一连接池
            if self.connection_pool:
                return self._execute_with_unified_pool(sql_query)
            # 回退到SQLAlchemy连接池
            if not self.engine:
                return {
                    'success': False,
                    'error': '数据库连接不可用',
                    'formatted_output': '数据库连接不可用'
                }
            # 执行查询
            with self.engine.connect() as conn:
                result = conn.execute(text(sql_query))
                if result.returns_rows:
                    rows = result.fetchall()
                    columns = list(result.keys())
                    formatted_results = self._format_query_results(rows, columns)
                    return {
                        'success': True,
                        'sql_query': sql_query,
                        'execution_time': '暂无',
                        **formatted_results
                    }
                else:
                    return {
                        'success': True,
                        'sql_query': sql_query,
                        'execution_time': '暂无',
                        'data': [],
                        'columns': [],
                        'row_count': result.rowcount if hasattr(result, 'rowcount') else 0,
                        'formatted_table': f'查询执行成功。{result.rowcount} 行受影响。' if hasattr(result, 'rowcount') else '查询执行成功。'
                    }
        except Exception as e:
            logger.error(f"SQL执行失败: {e}")
            return {
                'success': False,
                'error': str(e),
                'sql_query': sql_query,
                'formatted_output': f'SQL执行失败: {str(e)}'
            }

3.1.4 上下文管理器(SQLContextManager)

# core/tools/nl2sql/context_manager.py
class SQLContextManager:
    """上下文管理器 - 支持智能历史检索和上下文感知"""
    def __init__(self):
        self.max_history_items = 5
        self.max_context_length = 2000
        self.sql_keywords = [
            '查询', '统计', '多少', '几个', '总数', '平均', '最高', '最低', '列表',
            '显示', '找出', '计算', '汇总', '排序', '分组', '分别', '都', '谁', '什么'
        ]
    async def get_enhanced_context(
        self,
        current_question: str,
        conversation_manager,
        user_id: str,
        platform: str,
        manual_context: str = "",
        max_history: int = 5
    ) -> str:
        """获取增强的上下文信息"""
        try:
            # 获取用户的历史对话
            if not conversation_manager or not user_id:
                return manual_context
            # 获取最近的对话记录
            recent_messages = conversation_manager.get_recent_messages(
                user_id=user_id,
                platform=platform,
                limit=max_history * 2  # 获取更多消息以便筛选
            )
            if not recent_messages:
                return manual_context
            # 计算每条消息与当前问题的相关性得分
            scored_messages = []
            for message in recent_messages:
                content = message.get('content', '')
                if not content:
                    continue
                # 计算相关性得分
                relevance_score = self._calculate_relevance_score(content, current_question)
                if relevance_score > 0.1:  # 只保留有一定相关性的消息
                    scored_messages.append({
                        'content': content,
                        'role': message.get('role', 'user'),
                        'timestamp': message.get('timestamp', 0),
                        'relevance_score': relevance_score
                    })
            # 按相关性得分和时间排序
            scored_messages.sort(key=lambda x: (x['relevance_score'], x['timestamp']), reverse=True)
            # 选择最相关的消息
            selected_messages = scored_messages[:max_history]
            # 构建上下文文本
            context_parts = []
            for msg in selected_messages:
                role_label = "用户" if msg['role'] == 'user' else "助手"
                context_parts.append(f"{role_label}: {msg['content']}")
            # 添加手动上下文
            if manual_context:
                context_parts.append(f"手动上下文: {manual_context}")
            # 合并并控制长度
            enhanced_context = "\n\n".join(context_parts)
            # 如果上下文过长,进行截断
            if len(enhanced_context) > self.max_context_length:
                # 按行截断,保留完整的对话轮次
                lines = enhanced_context.split('\n\n')
                truncated_context = ""
                current_length = 0
                for line in lines:
                    if current_length + len(line) + 2 <= self.max_context_length:
                        truncated_context += line + "\n\n"
                        current_length += len(line) + 2
                    else:
                        break
                enhanced_context = truncated_context.rstrip()
            return enhanced_context
        except Exception as e:
            logger.error(f"Failed to generate enhanced context: {e}")
            return manual_context
    def _calculate_relevance_score(self, message: str, current_question: str) -> float:
        """计算消息与当前问题的相关性得分"""
        try:
            score = 0.0
            # 1. 时间衰减因子
            # 这里简化处理,实际应该根据消息时间戳计算
            # score *= time_decay_factor
            # 2. 关键词匹配度
            message_lower = message.lower()
            question_lower = current_question.lower()
            # SQL相关关键词匹配
            sql_matches = 0
            for keyword in self.sql_keywords:
                if keyword in message_lower and keyword in question_lower:
                    sql_matches += 1
            if sql_matches > 0:
                score += sql_matches * 0.3
            # 3. 语义相似度(基于词汇重叠度)
            message_words = set(message_lower.split())
            question_words = set(question_lower.split())
            if message_words and question_words:
                overlap = len(message_words & question_words)
                similarity = overlap / len(message_words | question_words)
                score += similarity * 0.4
            # 4. 特殊模式匹配
            # 如果消息包含SQL查询结果,提高相关性
            if 'sql:' in message_lower or '查询结果' in message_lower:
                score += 0.3
            return min(score, 1.0)  # 限制最大得分为1.0
        except Exception as e:
            logger.error(f"Failed to calculate relevance score: {e}")
            return 0.0

3.2 启动和测试智能体

3.2.1 启动any4any服务

# 激活虚拟环境conda activate any4any
# 启动服务python cli.py

服务启动后,将看到以下输出:

Starting servers...
Fastapi server started
Mcp server started...

3.2.2 测试NL2SQL智能体

方法1:通过API接口测试

# 健康检查
curl http://localhost:8888/health
# 测试NL2SQL智能体
curl -X POST http://localhost:8888/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{
    "model": "default",
    "messages": [
      {"role": "user", "content": "查询所有用户的数量"}
    ],
    "user": "test_user_001"
  }'
# 测试复杂查询
curl -X POST http://localhost:8888/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{
    "model": "default",
    "messages": [
      {"role": "user", "content": "统计所有订单数量"}
    ],
    "user": "test_user_001"
  }'

方法2:通过Web界面测试

访问 http://localhost:8888/docs 查看自动生成的API文档,或访问 http://localhost:8888/chat/ 使用Web聊天界面。

4. 配置注意事项和最佳实践

4.1 关键配置项详解

4.1.1 数据库连接池配置

# 连接池配置
DB_POOL_ENABLED=true          # 启用连接池
DB_POOL_SIZE=10              # 连接池大小
DB_POOL_PRE_PING=true        # 连接预检
DB_POOL_RECYCLE=3600         # 连接回收时间(秒)
DB_RETRY_ENABLED=true        # 启用重试机制
DB_CIRCUIT_BREAKER_THRESHOLD=5  # 熔断器失败阈值
DB_CIRCUIT_BREAKER_TIMEOUT=60   # 熔断器恢复时间(秒)

配置建议

  • 生产环境建议 DB_POOL_SIZE 设置为 20-50

  • 开发环境可以设置为 5-10

  • DB_POOL_RECYCLE 建议设置为 MySQL 的 wait_timeout 的 80%

4.1.2 安全配置

# API安全配置
API_KEY=your_secure_api_key   # API访问密钥
SESSION_MAX_AGE=3600         # 会话有效期(秒)
# 数据库安全配置
SQL_DB_USERNAME=any4any_nl2sql    # 只读用户
SQL_DB_PASSWORD=strong_password    # 强密码

安全建议

  1. 使用强密码作为API密钥

  2. 为NL2SQL创建专用的只读数据库用户

  3. 定期轮换数据库密码

  4. 监控异常查询行为

4.2 性能优化建议

4.2.1 数据库优化

-- 为常用查询字段创建索引
CREATE INDEX idx_users_city ON users(city);
CREATE INDEX idx_users_age ON users(age);
CREATE INDEX idx_orders_user_id ON orders(user_id);
-- 分析表统计信息
ANALYZE TABLE users, orders;

4.3 故障排除

4.3.1 常见问题及解决方案

问题1:智能体无法识别SQL查询

症状:用户输入SQL相关问题,但系统没有调用NL2SQL智能体

排查步骤

  1. 检查 TOOLS_ENABLED=true

  2. 查看日志中的工具选择过程

  3. 确认LLM服务正常工作

问题2:SQL生成不准确

症状:生成的SQL语句语法错误或逻辑错误

排查步骤

  1. 检查表结构信息是否正确获取

  2. 查看SQL生成的提示词

  3. 确认LLM模型能力

  4. 检查上下文信息是否干扰

解决方案

# 在workflow.py中添加调试信息
logger.debug(f"Table schemas: {table_schemas_result['formatted_output']}")
logger.debug(f"Enhanced context: {enhanced_context}")
logger.debug(f"SQL generation prompt: {prompt}")

问题3:上下文感知功能不工作

症状:多轮对话中无法理解上下文

排查步骤

  1. 确认用户信息和平台信息传递

  2. 检查会话管理器是否正常

  3. 查看上下文检索日志

  4. 验证历史对话存储

问题4:数据库连接问题

症状:SQL执行失败,提示数据库连接错误

排查步骤

  1. 检查数据库服务状态

  2. 验证连接参数配置

  3. 测试数据库用户权限

  4. 检查防火墙设置

解决方案

# 测试数据库连接
mysql -h localhost -u any4any_nl2sql -p any4any

5. 总结

通过本教程,您已经掌握了基于any4any智能体架构的完整NL2SQL实现方案。相比传统的基于工作流或提示词的方法,智能体架构具有以下核心优势:

5.1 技术创新特点

  1. 智能工具选择:通过LLM智能选择合适的工具,而不是硬编码规则

  2. 8步完整工作流:从上下文获取到结果生成的完整处理流程

  3. 上下文感知能力:支持多轮对话和智能追问处理

  4. 多层安全验证:确保SQL查询的安全性

  5. 异步处理架构:支持高并发查询处理

5.2 架构优势

  • 模块化设计:每个组件职责明确,易于维护和扩展

  • 智能决策:通过LLM进行智能决策,提高系统灵活性

  • 安全可靠:多层安全机制确保系统稳定运行

  • 性能优化:连接池、缓存等机制保证高性能

5.3 应用价值

  • 降低技术门槛:用户无需学习SQL即可进行数据查询

  • 提高查询效率:智能上下文感知减少重复信息

  • 增强用户体验:自然的对话式查询交互

  • 保障数据安全:完善的权限控制和安全机制

any4any的NL2SQL智能体实现代表了当前智能问数技术的先进水平,为构建企业级智能数据查询系统提供了完整的解决方案。通过本教程的学习,您可以快速部署和使用这套系统,并根据实际需求进行定制化扩展。


基于any4any智能体的NL2SQL智能问数
http://localhost:8090//archives/1765679536338
作者
昊昱天合
发布于
2025年12月14日
更新于
2025年12月14日
许可协议