base_tool.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386
  1. import re
  2. import html
  3. import requests
  4. import json
  5. from typing import List, Dict, Any, Optional, Callable
  6. from pathlib import Path
  7. def html_to_text(html_content: str) -> str:
  8. """HTML转文本"""
  9. if not html_content:
  10. return ""
  11. clean = re.compile(r"<[^>]+>")
  12. text = clean.sub("", html_content)
  13. text = html.unescape(text)
  14. return re.sub(r"\s+", " ", text).strip()
  15. def get_unique_match_count(search_text: str, filter_words: List[str]) -> int:
  16. """获取唯一匹配计数"""
  17. sorted_keywords = sorted(filter_words, key=len, reverse=True)
  18. match_count = 0
  19. remaining_text = search_text.lower()
  20. for keyword in sorted_keywords:
  21. kw_lower = keyword.lower()
  22. if kw_lower in remaining_text:
  23. match_count += 1
  24. remaining_text = remaining_text.replace(kw_lower, "", 1)
  25. return match_count
  26. def calculate_relevance_score(
  27. doc_name: str, doc_keywords: str, search_keywords: List[str]
  28. ) -> float:
  29. """
  30. 计算文档与搜索关键词的相关性得分
  31. Args:
  32. doc_name: 文档标题
  33. doc_keywords: 文档关键词
  34. search_keywords: 搜索关键词列表
  35. Returns:
  36. 相关性得分(0-100)
  37. """
  38. # 合并搜索文本
  39. search_text = f"{doc_name} {doc_keywords}".lower()
  40. search_keywords_lower = [kw.lower() for kw in search_keywords]
  41. # 权重设置
  42. TITLE_WEIGHT = 0.7 # 标题权重(提高)
  43. KEYWORD_WEIGHT = 0.3 # 关键词权重
  44. EXACT_MATCH_BONUS = 0.5 # 精确匹配奖励(提高)
  45. PARTIAL_MATCH_FACTOR = 0.3 # 部分匹配因子
  46. total_score = 0.0
  47. # 1. 标题匹配得分(重新设计)
  48. title_score = 0.0
  49. doc_name_lower = doc_name.lower()
  50. for keyword in search_keywords_lower:
  51. if keyword in doc_name_lower:
  52. # 基础得分:基于关键词长度和位置
  53. base_score = min(len(keyword) * 2, 10) # 每个字符2分,最多10分
  54. # 位置权重:标题开头和结尾的匹配更重要
  55. if doc_name_lower.startswith(keyword):
  56. base_score *= 1.5
  57. elif doc_name_lower.endswith(keyword):
  58. base_score *= 1.3
  59. # 精确匹配奖励:完全包含关键词
  60. if f" {keyword} " in f" {doc_name_lower} ":
  61. base_score *= 1 + EXACT_MATCH_BONUS
  62. title_score += base_score
  63. # 2. 关键词匹配得分
  64. keyword_score = 0.0
  65. for keyword in search_keywords_lower:
  66. if keyword in doc_keywords.lower():
  67. keyword_score += min(len(keyword) * 1.5, 8) # 每个字符1.5分,最多8分
  68. # 3. 计算总得分
  69. total_score = (title_score * TITLE_WEIGHT) + (keyword_score * KEYWORD_WEIGHT)
  70. # 4. 匹配数量奖励(重要改进)
  71. matched_count = sum(1 for kw in search_keywords_lower if kw in search_text)
  72. if matched_count > 0:
  73. coverage_ratio = matched_count / len(search_keywords_lower)
  74. # 匹配越多,奖励越大
  75. total_score *= 1 + coverage_ratio * 0.5
  76. # 5. 特殊关键词优先级(针对你的具体需求)
  77. priority_keywords = ["交期", "审核", "修改", "终止", "失败", "错误", "明细", "功能"]
  78. for keyword in priority_keywords:
  79. if keyword in search_keywords_lower and keyword in search_text:
  80. total_score *= 1.2 # 提高优先级关键词奖励
  81. # 6. 长标题惩罚调整(避免长标题得分过低)
  82. if len(doc_name) > 30:
  83. # 长标题轻微惩罚,但不要过度惩罚
  84. total_score *= 0.9
  85. # 7. 确保至少匹配一个关键词就有基础分
  86. if matched_count == 0:
  87. return 0.0
  88. return min(total_score, 100.0)
  89. def find_most_relevant_document(
  90. doc_list: List[dict], search_keywords: List[str], max_matches: int = 10
  91. ) -> List[dict]:
  92. """
  93. 找到最相关的文档
  94. Args:
  95. doc_list: 文档列表
  96. search_keywords: 搜索关键词
  97. max_matches: 最大返回数量(增加到10)
  98. Returns:
  99. 按相关性排序的文档列表
  100. """
  101. scored_docs = []
  102. for doc in doc_list:
  103. doc_id = doc["DocID"]
  104. doc_name = doc["DocName"]
  105. doc_keywords = doc.get("keyword", "")
  106. # 计算相关性得分
  107. score = calculate_relevance_score(doc_name, doc_keywords, search_keywords)
  108. # 降低过滤门槛,只要匹配至少一个关键词就考虑
  109. match_count = sum(
  110. 1
  111. for kw in search_keywords
  112. if kw.lower() in f"{doc_name} {doc_keywords}".lower()
  113. )
  114. if match_count > 0:
  115. scored_docs.append(
  116. {
  117. "doc_id": doc_id,
  118. "doc_name": doc_name,
  119. "keywords": doc_keywords,
  120. "relevance_score": score,
  121. "match_count": match_count,
  122. }
  123. )
  124. # 按相关性得分降序排序
  125. scored_docs.sort(key=lambda x: x["relevance_score"], reverse=True)
  126. return scored_docs[:max_matches]
  127. def call_csharp_api(
  128. backend_url: str, token: str, uoName: str, functionName: str, SParms: dict
  129. ) -> str:
  130. """调用C# API的通用方法"""
  131. print(f"🔧 API调用调试信息:")
  132. print(f" - 后端地址: {backend_url}")
  133. print(f" - Token: {'已配置' if token else '未配置'}")
  134. print(f" - 功能: {functionName}")
  135. print(f" - 参数: {SParms}")
  136. if not backend_url or not token:
  137. error_msg = f"错误:未配置后端地址或认证令牌。后端: {backend_url or '未配置'}, Token: {'已配置' if token else '未配置'}"
  138. print(f"❌ {error_msg}")
  139. return error_msg
  140. headers = {
  141. "Accept": "application/json, text/plain, */*",
  142. "Content-Type": "application/json",
  143. "X-TOKEN": token,
  144. "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
  145. }
  146. payload = {
  147. "token": token,
  148. "CList": [
  149. {
  150. "uoName": uoName,
  151. "functionName": functionName,
  152. "SParms": SParms,
  153. "ifcommit": True,
  154. "returnStrList": [],
  155. }
  156. ],
  157. "language": "zh-cn",
  158. }
  159. try:
  160. print(f"🌐 发送API请求到: {backend_url}")
  161. response = requests.post(backend_url, headers=headers, json=payload, timeout=30)
  162. print(f"📡 响应状态码: {response.status_code}")
  163. if response.status_code == 200:
  164. data = response.json()
  165. # 检查是否存在ErrMsg字段,如果有则直接返回错误信息
  166. if "ErrMsg" in data and data["ErrMsg"]:
  167. error_msg = f"API返回错误: {data['ErrMsg']}"
  168. print(f"❌ {error_msg}")
  169. return error_msg
  170. return process_api_response(data)
  171. else:
  172. error_msg = f"API请求失败,状态码: {response.status_code}"
  173. print(f"❌ {error_msg}")
  174. return error_msg
  175. except Exception as e:
  176. error_msg = f"API调用异常: {str(e)}"
  177. print(f"❌ {error_msg}")
  178. return error_msg
  179. def process_api_response(data: Dict[str, Any]) -> str:
  180. """处理API响应"""
  181. try:
  182. inner_json_str = data.get("reJob", {}).get("0", "{}")
  183. print("inner_json_str:" + inner_json_str)
  184. inner_data = json.loads(inner_json_str)
  185. if "err_msg" in inner_data:
  186. return f"API返回错误: {inner_data['err_msg']}"
  187. warning_msg = None
  188. if "warning_msg" in inner_data:
  189. warning_msg = inner_data["warning_msg"]
  190. if "data" in inner_data:
  191. data_list = inner_data["data"]
  192. if not data_list:
  193. return "没有数据"
  194. if isinstance(data_list[0], dict):
  195. headers = list(data_list[0].keys())
  196. result = [",".join(headers)]
  197. for row in data_list:
  198. result.append(",".join([str(row.get(h, "")) for h in headers]))
  199. if warning_msg:
  200. result.append(f"# 警告: {warning_msg}")
  201. print(result)
  202. return "\n".join(result)
  203. return json.dumps(data, ensure_ascii=False)
  204. except Exception as e:
  205. return f"响应处理错误: {str(e)}"
  206. # 工具配置管理函数
  207. def load_tool_config(
  208. config_path: Path, get_default_config: Optional[Callable] = None
  209. ) -> Dict[str, Any]:
  210. """
  211. 加载工具配置的通用函数
  212. Args:
  213. config_path: 配置文件路径
  214. get_default_config: 获取默认配置的回调函数,如果不提供则返回空字典
  215. """
  216. if not config_path.exists():
  217. print(f"警告: 配置文件不存在: {config_path}")
  218. if get_default_config:
  219. return get_default_config()
  220. return {}
  221. try:
  222. with open(config_path, "r", encoding="utf-8") as f:
  223. return json.load(f)
  224. except json.JSONDecodeError as e:
  225. print(f"错误: 配置文件格式不正确: {e}")
  226. if get_default_config:
  227. return get_default_config()
  228. return {}
  229. except Exception as e:
  230. print(f"错误: 读取配置文件失败: {e}")
  231. if get_default_config:
  232. return get_default_config()
  233. return {}
  234. def assemble_tool_description(tool_config: Dict[str, Any]) -> str:
  235. """组装工具描述,将所有键值组合成一个完整的字符串"""
  236. if not tool_config:
  237. return ""
  238. description_parts = []
  239. # 基础描述
  240. if "基础描述" in tool_config:
  241. description_parts.append(tool_config["基础描述"])
  242. # 功能说明
  243. if "功能说明" in tool_config:
  244. description_parts.append(f"\n功能: {tool_config['功能说明']}")
  245. # 入参说明
  246. if "入参说明" in tool_config:
  247. if isinstance(tool_config["入参说明"], dict):
  248. description_parts.append("\n参数:")
  249. for param, desc in tool_config["入参说明"].items():
  250. description_parts.append(f" {param}: {desc}")
  251. else:
  252. description_parts.append(f"\n参数说明: {tool_config['入参说明']}")
  253. # 返回值说明
  254. if "返回值说明" in tool_config:
  255. if isinstance(tool_config["返回值说明"], dict):
  256. description_parts.append("\n返回:")
  257. for key, value in tool_config["返回值说明"].items():
  258. if isinstance(value, list):
  259. description_parts.append(f" {key}:")
  260. for item in value:
  261. description_parts.append(f" - {item}")
  262. else:
  263. description_parts.append(f" {key}: {value}")
  264. else:
  265. description_parts.append(f"\n返回结果: {tool_config['返回值说明']}")
  266. # 输出格式要求
  267. if "输出格式要求" in tool_config:
  268. if isinstance(tool_config["输出格式要求"], list):
  269. description_parts.append("\n输出要求:")
  270. for requirement in tool_config["输出格式要求"]:
  271. description_parts.append(f" - {requirement}")
  272. else:
  273. description_parts.append(f"\n注意: {tool_config['输出格式要求']}")
  274. # 使用示例
  275. if "使用示例" in tool_config:
  276. if isinstance(tool_config["使用示例"], list):
  277. description_parts.append("\n示例:")
  278. for example in tool_config["使用示例"]:
  279. description_parts.append(f" - {example}")
  280. else:
  281. description_parts.append(f"\n示例: {tool_config['使用示例']}")
  282. return "\n".join(description_parts)
  283. def get_tool_prompt(
  284. tool_name: str, default_config_func: Optional[Callable] = None
  285. ) -> str:
  286. """
  287. 获取工具的完整提示词
  288. Args:
  289. tool_name: 工具名称
  290. default_config_func: 获取默认配置的函数
  291. """
  292. # 计算配置文件路径
  293. current_file = Path(__file__)
  294. config_path = current_file.parent.parent / "config" / "tool_config.json"
  295. # 加载配置
  296. config = load_tool_config(config_path, default_config_func)
  297. # 获取工具配置
  298. tool_config = config.get(tool_name, {})
  299. # 如果配置为空且提供了默认配置函数,使用默认配置
  300. if not tool_config and default_config_func:
  301. default_config = default_config_func()
  302. if isinstance(default_config, dict) and tool_name in default_config:
  303. tool_config = default_config[tool_name]
  304. elif isinstance(default_config, dict) and not default_config:
  305. # 如果返回的是整个配置字典
  306. tool_config = default_config
  307. else:
  308. tool_config = {}
  309. # 组装描述
  310. if tool_config:
  311. return assemble_tool_description(tool_config)
  312. else:
  313. return f"执行 {tool_name} 功能"