import asyncio
import base64
import functools
from typing import Any, Dict, Optional

import aiohttp
import requests

from utils.logger import chat_logger


class PaddleOCRService:
    """Thin client for a PaddleOCR HTTP API.

    The service posts a base64-encoded file as JSON to ``api_url`` and
    returns the ``"result"`` member of the JSON response.
    """

    # Seconds to wait for the OCR endpoint before giving up.
    _REQUEST_TIMEOUT = 30
    # Recognized lines must score strictly above this to be kept.
    _MIN_SCORE = 0.5

    def __init__(self, api_url: str, token: str):
        """Store the endpoint URL and build the request headers.

        Args:
            api_url: URL the OCR requests are posted to.
            token: Credential sent as ``Authorization: token <token>``.
        """
        self.api_url = api_url
        self.token = token
        self.headers = {
            "Authorization": f"token {token}",
            "Content-Type": "application/json",
        }

    async def recognize_image_async(
        self,
        image_bytes: bytes,
        file_type: int = 1,  # 1: image, 0: PDF
        use_doc_orientation_classify: bool = False,
        use_doc_unwarping: bool = False,
        use_textline_orientation: bool = False,
    ) -> Dict[str, Any]:
        """Call the OCR API without blocking the running event loop.

        The blocking HTTP request made by :meth:`recognize_image_sync` is
        executed in the event loop's default executor, so other coroutines
        keep running while the request is in flight.

        Args:
            image_bytes: Raw bytes of the file to recognize.
            file_type: ``1`` for an image, ``0`` for a PDF.
            use_doc_orientation_classify: Forwarded to the API as
                ``useDocOrientationClassify``.
            use_doc_unwarping: Forwarded to the API as ``useDocUnwarping``.
            use_textline_orientation: Forwarded to the API as
                ``useTextlineOrientation``.

        Returns:
            The ``"result"`` member of the API's JSON response.

        Raises:
            Exception: Whatever :meth:`recognize_image_sync` raises.
        """
        # run_in_executor only forwards positional arguments, so the keyword
        # arguments are bound up front with functools.partial.
        blocking_call = functools.partial(
            self.recognize_image_sync,
            image_bytes=image_bytes,
            file_type=file_type,
            use_doc_orientation_classify=use_doc_orientation_classify,
            use_doc_unwarping=use_doc_unwarping,
            use_textline_orientation=use_textline_orientation,
        )
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, blocking_call)

    def recognize_image_sync(
        self,
        image_bytes: bytes,
        file_type: int = 1,
        use_doc_orientation_classify: bool = False,
        use_doc_unwarping: bool = False,
        use_textline_orientation: bool = False,
    ) -> Dict[str, Any]:
        """Call the OCR API synchronously (blocks until the response arrives).

        Args:
            image_bytes: Raw bytes of the file to recognize.
            file_type: ``1`` for an image, ``0`` for a PDF.
            use_doc_orientation_classify: Sent as ``useDocOrientationClassify``.
            use_doc_unwarping: Sent as ``useDocUnwarping``.
            use_textline_orientation: Sent as ``useTextlineOrientation``.

        Returns:
            The ``"result"`` member of the API's JSON response.

        Raises:
            Exception: On timeout, on a non-200 status code, or when the
                response body is not a JSON object containing ``"result"``.
                Other errors raised while sending the request or decoding
                the body are logged and re-raised unchanged.
        """
        payload = {
            # The API expects the file content as base64 text.
            "file": base64.b64encode(image_bytes).decode("ascii"),
            "fileType": file_type,
            "useDocOrientationClassify": use_doc_orientation_classify,
            "useDocUnwarping": use_doc_unwarping,
            "useTextlineOrientation": use_textline_orientation,
        }

        # Only the network call sits in this try block: errors raised by the
        # checks further down are logged where they are detected, so each
        # failure produces exactly one log entry.
        try:
            response = requests.post(
                self.api_url,
                json=payload,
                headers=self.headers,
                timeout=self._REQUEST_TIMEOUT,
            )
        except requests.exceptions.Timeout as exc:
            chat_logger.error("OCR API调用超时")
            raise Exception("OCR识别超时") from exc
        except Exception as exc:
            chat_logger.error(f"OCR识别异常: {str(exc)}")
            raise

        if response.status_code != 200:
            chat_logger.error(
                f"OCR API调用失败: {response.status_code}, {response.text}"
            )
            raise Exception(f"OCR识别失败: {response.status_code}")

        try:
            result = response.json()
        except ValueError as exc:
            # requests raises a ValueError subclass when the body is not JSON.
            chat_logger.error(f"OCR识别异常: {str(exc)}")
            raise

        # A JSON body that is not an object (e.g. null) cannot carry "result".
        if not isinstance(result, dict) or "result" not in result:
            chat_logger.error(f"OCR返回格式错误: {result}")
            raise Exception("OCR返回格式错误")

        return result["result"]

    def extract_text_from_result(self, json_data: Dict) -> str:
        """Format an OCR result as a plain-text table of text and boxes.

        Only the first entry of ``json_data["ocrResults"]`` is read. From its
        ``"prunedResult"`` the parallel lists ``rec_texts``, ``rec_scores``
        and ``rec_boxes`` are combined, keeping entries whose score is
        strictly greater than 0.5.

        Args:
            json_data: OCR result as returned by the recognize methods.

        Returns:
            CRLF-separated lines: a header, a 50-dash rule, then one
            ``"<text> | <box>"`` line per kept entry.

        Raises:
            Exception: Any error hit while reading ``json_data`` (for example
                a missing key) is logged and re-raised unchanged.
        """
        try:
            pruned = json_data["ocrResults"][0]["prunedResult"]
            texts = pruned["rec_texts"]
            scores = pruned["rec_scores"]
            boxes = pruned["rec_boxes"]

            lines = ["识别文本 | 识别框坐标", "-" * 50]
            lines.extend(
                f"{text} | {box}"
                for text, score, box in zip(texts, scores, boxes)
                if score > self._MIN_SCORE
            )
            return "\r\n".join(lines)
        except Exception as e:
            chat_logger.error(f"提取OCR文本失败: {str(e)}")
            raise