| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365 |
- import re
- import html
- import requests
- import json
- from typing import List, Dict, Any, Optional, Callable
- from pathlib import Path
def html_to_text(html_content: str) -> str:
    """Convert an HTML fragment to plain text.

    Tags are removed first, then HTML entities are decoded, and finally
    every run of whitespace is collapsed to a single space.

    Args:
        html_content: HTML markup. A falsy value (empty string, ``None``)
            is accepted.

    Returns:
        The plain text with surrounding whitespace stripped, or ``""``
        when the input is falsy.
    """
    if not html_content:
        return ""

    # Entities are decoded only after tag removal, so an escaped "&lt;b&gt;"
    # survives as the literal text "<b>" instead of being stripped as a tag.
    without_tags = re.sub(r"<[^>]+>", "", html_content)
    decoded = html.unescape(without_tags)
    collapsed = re.sub(r"\s+", " ", decoded)
    return collapsed.strip()
def get_unique_match_count(search_text: str, filter_words: List[str]) -> int:
    """Count the keywords that match distinct parts of ``search_text``.

    Keywords are tried longest first (case-insensitively). Each keyword that
    is found is counted once and its first occurrence is consumed, so a
    shorter keyword cannot be counted for text that a longer keyword already
    claimed.

    Args:
        search_text: Text to search in. A falsy value is treated as ``""``.
        filter_words: Keywords to look for. Empty keywords are ignored.

    Returns:
        The number of keywords that matched.
    """
    # A consumed span is replaced by a marker rather than deleted, so the text
    # on either side of it cannot fuse into a new, spurious match
    # (e.g. "abcd" minus "bc" must not suddenly contain "ad").
    consumed_marker = "\x00"

    remaining_text = (search_text or "").lower()
    match_count = 0
    for keyword in sorted(filter_words, key=len, reverse=True):
        needle = keyword.lower()
        if not needle:
            # "" is a substring of everything and would always count.
            continue
        if needle in remaining_text:
            match_count += 1
            remaining_text = remaining_text.replace(needle, consumed_marker, 1)
    return match_count
def calculate_relevance_score(
    doc_name: str, doc_keywords: str, search_keywords: List[str]
) -> float:
    """Score how well a document matches a list of search keywords.

    All comparisons are case-insensitive substring tests. The score is built
    in this order:

    1. Title score: each keyword found in the title earns
       ``min(2 * len(keyword), 10)`` points, multiplied by 1.5 if the title
       starts with it (otherwise by 1.3 if the title ends with it), and by
       1.5 again if it appears as a whole space-delimited word.
    2. Keyword score: each keyword found in the document keywords earns
       ``min(1.5 * len(keyword), 8)`` points.
    3. The two are combined as ``0.7 * title + 0.3 * keywords``.
    4. Coverage bonus: multiplied by ``1 + 0.5 * matched / total``.
    5. Every priority keyword that was searched for and is present in the
       document multiplies the score by 1.2.
    6. Titles longer than 30 characters are multiplied by 0.9.

    Args:
        doc_name: Document title. ``None`` is treated as ``""``.
        doc_keywords: Document keyword text. ``None`` is treated as ``""``.
        search_keywords: Keywords to search for. Empty entries are ignored
            so they cannot inflate the coverage ratio.

    Returns:
        A score capped at 100.0; ``0.0`` when no keyword matches at all.
    """
    TITLE_WEIGHT = 0.7
    KEYWORD_WEIGHT = 0.3
    EXACT_MATCH_BONUS = 0.5
    PRIORITY_KEYWORDS = (
        "交期", "审核", "修改", "终止", "失败", "错误", "明细", "功能",
    )

    doc_name = doc_name or ""
    doc_keywords = doc_keywords or ""

    # Lowercase each haystack once instead of once per keyword.
    title_lower = doc_name.lower()
    keywords_lower = doc_keywords.lower()
    search_text = f"{doc_name} {doc_keywords}".lower()
    terms = [kw.lower() for kw in search_keywords if kw]

    matched_count = sum(1 for term in terms if term in search_text)
    if matched_count == 0:
        return 0.0

    # 1. Title score.
    title_score = 0.0
    for term in terms:
        if term not in title_lower:
            continue
        base_score = min(len(term) * 2, 10)
        # Matches at the start or end of the title weigh more.
        if title_lower.startswith(term):
            base_score *= 1.5
        elif title_lower.endswith(term):
            base_score *= 1.3
        # Padding both sides with a space turns this into a whole-word test
        # that also works at the title's edges.
        if f" {term} " in f" {title_lower} ":
            base_score *= 1 + EXACT_MATCH_BONUS
        title_score += base_score

    # 2. Keyword score. Accumulated with a plain loop so the floating-point
    # result does not depend on how sum() adds floats.
    keyword_score = 0.0
    for term in terms:
        if term in keywords_lower:
            keyword_score += min(len(term) * 1.5, 8)

    # 3. Weighted combination.
    total_score = (title_score * TITLE_WEIGHT) + (keyword_score * KEYWORD_WEIGHT)

    # 4. Coverage bonus: the more of the query matched, the larger the boost.
    coverage_ratio = matched_count / len(terms)
    total_score *= 1 + coverage_ratio * 0.5

    # 5. Priority keywords that were both requested and found.
    requested = set(terms)
    for priority in PRIORITY_KEYWORDS:
        if priority in requested and priority in search_text:
            total_score *= 1.2

    # 6. Mild penalty for long titles.
    if len(doc_name) > 30:
        total_score *= 0.9

    return min(total_score, 100.0)
def find_most_relevant_document(
    doc_list: List[dict], search_keywords: List[str], max_matches: int = 10
) -> List[dict]:
    """Return the documents that best match the search keywords.

    A document is kept when at least one keyword occurs (case-insensitively)
    in its title or keyword text. Kept documents are scored with
    ``calculate_relevance_score`` and sorted by that score, highest first;
    documents with equal scores keep their input order.

    Args:
        doc_list: Documents as dicts with required ``"DocID"`` and
            ``"DocName"`` entries and an optional ``"keyword"`` entry.
            A missing or ``None`` ``"keyword"`` is treated as ``""``.
        search_keywords: Keywords to search for.
        max_matches: Maximum number of documents to return.

    Returns:
        A list of dicts with the keys ``doc_id``, ``doc_name``, ``keywords``,
        ``relevance_score`` and ``match_count``.

    Raises:
        KeyError: If a document lacks ``"DocID"`` or ``"DocName"``.
    """
    # Lowercase the query once rather than once per document.
    lowered_terms = [kw.lower() for kw in search_keywords]

    scored_docs = []
    for doc in doc_list:
        doc_id = doc["DocID"]
        doc_name = doc["DocName"]
        # `or ""` also covers an explicit None value, which the scorer
        # cannot lowercase.
        doc_keywords = doc.get("keyword") or ""

        haystack = f"{doc_name} {doc_keywords}".lower()
        match_count = sum(1 for term in lowered_terms if term in haystack)
        if match_count == 0:
            # Not a candidate; skip the scoring work entirely.
            continue

        scored_docs.append(
            {
                "doc_id": doc_id,
                "doc_name": doc_name,
                "keywords": doc_keywords,
                "relevance_score": calculate_relevance_score(
                    doc_name, doc_keywords, search_keywords
                ),
                "match_count": match_count,
            }
        )

    scored_docs.sort(key=lambda entry: entry["relevance_score"], reverse=True)
    return scored_docs[:max_matches]
def call_csharp_api(
    backend_url: str, token: str, uoName: str, functionName: str, SParms: dict
) -> str:
    """Send one function call to the C# backend and return the result text.

    Debug information is printed to stdout along the way. Failures are never
    raised; they are printed and returned as a message string.

    Args:
        backend_url: URL the request is POSTed to.
        token: Auth token, sent both as the ``X-TOKEN`` header and inside
            the JSON body.
        uoName: Value for the ``uoName`` field of the call entry.
        functionName: Value for the ``functionName`` field of the call entry.
        SParms: Value for the ``SParms`` field of the call entry.

    Returns:
        The output of ``process_api_response`` for an HTTP 200 reply,
        otherwise an error message.
    """
    token_state = "已配置" if token else "未配置"

    print("🔧 API调用调试信息:")
    print(f" - 后端地址: {backend_url}")
    print(f" - Token: {token_state}")
    print(f" - 功能: {functionName}")
    print(f" - 参数: {SParms}")

    # Guard: both the endpoint and the token are required.
    if not (backend_url and token):
        failure = (
            f"错误:未配置后端地址或认证令牌。后端: {backend_url or '未配置'}, "
            f"Token: {token_state}"
        )
        print(f"❌ {failure}")
        return failure

    request_headers = {
        "Accept": "application/json, text/plain, */*",
        "Content-Type": "application/json",
        "X-TOKEN": token,
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    }
    call_entry = {
        "uoName": uoName,
        "functionName": functionName,
        "SParms": SParms,
        "ifcommit": True,
        "returnStrList": [],
    }
    request_body = {
        "token": token,
        "CList": [call_entry],
        "language": "zh-cn",
    }

    try:
        print(f"🌐 发送API请求到: {backend_url}")
        response = requests.post(
            backend_url, headers=request_headers, json=request_body, timeout=30
        )
        print(f"📡 响应状态码: {response.status_code}")
        if response.status_code != 200:
            failure = f"API请求失败,状态码: {response.status_code}"
            print(f"❌ {failure}")
            return failure
        return process_api_response(response.json())
    except Exception as exc:
        # Boundary handler: any transport or decoding problem becomes a message.
        failure = f"API调用异常: {str(exc)}"
        print(f"❌ {failure}")
        return failure
def process_api_response(data: Dict[str, Any]) -> str:
    """Turn a backend response envelope into text.

    The payload is read from ``data["reJob"]["0"]``, which is expected to
    hold a JSON document encoded as a string.

    Args:
        data: Decoded response body.

    Returns:
        * ``"API返回错误: ..."`` when the payload has an ``err_msg`` entry.
        * ``"NO_DATA"`` when the payload's ``data`` entry is empty.
        * CSV text (header line plus one line per row, no trailing newline)
          when ``data`` is a list of dicts. Columns come from the first
          row's keys; keys missing from later rows become empty fields, and
          fields containing commas, quotes or line breaks are quoted.
        * Otherwise the whole ``data`` argument serialised as JSON.
        * ``"响应处理错误: ..."`` if anything goes wrong while processing.
    """
    import csv
    import io

    try:
        inner_json_str = data.get("reJob", {}).get("0", "{}")
        inner_data = json.loads(inner_json_str)

        if "err_msg" in inner_data:
            return f"API返回错误: {inner_data['err_msg']}"

        if "data" in inner_data:
            rows = inner_data["data"]
            if not rows:
                return "NO_DATA"
            if isinstance(rows[0], dict):
                headers = list(rows[0].keys())
                buffer = io.StringIO()
                # csv.writer quotes fields that contain the delimiter, a
                # quote or a line break, so such values cannot shift columns.
                writer = csv.writer(buffer, lineterminator="\n")
                writer.writerow(headers)
                for row in rows:
                    # str() keeps the textual form of every value
                    # (e.g. None -> "None").
                    writer.writerow([str(row.get(h, "")) for h in headers])
                # Drop the terminator written after the final row.
                return buffer.getvalue()[:-1]

        # Unknown payload shape: hand back the raw envelope.
        return json.dumps(data, ensure_ascii=False)
    except Exception as e:
        # Best-effort boundary: report the problem as text instead of raising.
        return f"响应处理错误: {str(e)}"
- # 工具配置管理函数
def load_tool_config(
    config_path: Path, get_default_config: Optional[Callable] = None
) -> Dict[str, Any]:
    """Load the tool configuration from a JSON file.

    Args:
        config_path: Location of the JSON configuration file.
        get_default_config: Optional zero-argument callable whose result is
            returned when the file is missing or cannot be read or parsed.

    Returns:
        The parsed JSON content. When the file is unavailable, the result of
        ``get_default_config()`` if a callable was given, otherwise ``{}``.
        A warning or error line is printed in that case.
    """

    def _fallback() -> Dict[str, Any]:
        # Shared result for every failure path.
        return get_default_config() if get_default_config else {}

    if not config_path.exists():
        print(f"警告: 配置文件不存在: {config_path}")
        return _fallback()

    try:
        with open(config_path, "r", encoding="utf-8") as handle:
            loaded = json.load(handle)
    except json.JSONDecodeError as exc:
        print(f"错误: 配置文件格式不正确: {exc}")
    except Exception as exc:
        print(f"错误: 读取配置文件失败: {exc}")
    else:
        return loaded

    # Reached only after one of the handlers above reported a failure.
    return _fallback()
def assemble_tool_description(tool_config: Dict[str, Any]) -> str:
    """Render a tool configuration as one newline-joined description string.

    The recognised sections are emitted in a fixed order, each only if its
    key is present: base description, function summary, parameters, return
    value, output-format requirements, usage example. Parameter and return
    sections given as dicts are expanded one entry per line (list values in
    the return section become bullet lines); output requirements given as a
    list become bullet lines. Any other value is formatted inline.

    Args:
        tool_config: Configuration for a single tool.

    Returns:
        The assembled description, or ``""`` when ``tool_config`` is falsy.
    """
    if not tool_config:
        return ""

    lines = []

    # Base description
    if "基础描述" in tool_config:
        lines.append(tool_config["基础描述"])

    # Function summary
    if "功能说明" in tool_config:
        lines.append(f"\n功能: {tool_config['功能说明']}")

    # Parameters
    if "入参说明" in tool_config:
        params = tool_config["入参说明"]
        if not isinstance(params, dict):
            lines.append(f"\n参数说明: {params}")
        else:
            lines.append("\n参数:")
            lines.extend(f" {name}: {text}" for name, text in params.items())

    # Return value
    if "返回值说明" in tool_config:
        returns = tool_config["返回值说明"]
        if not isinstance(returns, dict):
            lines.append(f"\n返回结果: {returns}")
        else:
            lines.append("\n返回:")
            for field, detail in returns.items():
                if isinstance(detail, list):
                    lines.append(f" {field}:")
                    lines.extend(f" - {entry}" for entry in detail)
                else:
                    lines.append(f" {field}: {detail}")

    # Output-format requirements
    if "输出格式要求" in tool_config:
        requirements = tool_config["输出格式要求"]
        if not isinstance(requirements, list):
            lines.append(f"\n注意: {requirements}")
        else:
            lines.append("\n输出要求:")
            lines.extend(f" - {entry}" for entry in requirements)

    # Usage example
    if "使用示例" in tool_config:
        lines.append(f"\n示例: {tool_config['使用示例']}")

    return "\n".join(lines)
def get_tool_prompt(
    tool_name: str, default_config_func: Optional[Callable] = None
) -> str:
    """Build the full prompt text for a tool.

    The configuration is read from ``config/tool_config.json``, located two
    directory levels above this file, via ``load_tool_config``. If the file
    yields no entry for ``tool_name`` and a default-config callable was
    given, the entry is looked up in that callable's result instead.

    Args:
        tool_name: Key of the tool in the configuration mapping.
        default_config_func: Optional zero-argument callable returning the
            default configuration.

    Returns:
        The description produced by ``assemble_tool_description``, or the
        generic ``"执行 <tool_name> 功能"`` text when no configuration is
        found for the tool.
    """
    config_path = Path(__file__).parent.parent / "config" / "tool_config.json"
    all_configs = load_tool_config(config_path, default_config_func)
    tool_config = all_configs.get(tool_name, {})

    # Nothing usable in the file: consult the defaults directly.
    if not tool_config and default_config_func:
        defaults = default_config_func()
        has_entry = isinstance(defaults, dict) and tool_name in defaults
        # NOTE(review): only a dict keyed by tool name is honoured. An empty
        # dict, or a callback that returns the tool's own config directly,
        # ends up as "no configuration" - confirm that is intended.
        tool_config = defaults[tool_name] if has_entry else {}

    if not tool_config:
        return f"执行 {tool_name} 功能"
    return assemble_tool_description(tool_config)
|