2 Коміти 02fa8d0808 ... ad7e6205fc

Автор SHA1 Опис Дата
  shuiping150 ad7e6205fc Merge branch 'master' of http://git.longjoe.com:3000/longjoedyy/LongjoeAgent 2 тижнів тому
  shuiping150 f99e098969 添加以图搜图接口 2 тижнів тому
5 змінених файлів з 965 додано та 1 видалено
  1. 74 0
      api/models.py
  2. 159 1
      api/routes.py
  3. 306 0
      core/image_search_service.py
  4. BIN
      requirements.txt
  5. 426 0
      tests/test_image_search_api.py

+ 74 - 0
api/models.py

@@ -45,3 +45,77 @@ class MessageCreateBill(BaseModel):
 
     message: str
     document_type: str = None
+
+
+class ImageVectorItem(BaseModel):
+    """图片向量项"""
+    image_id: str  # 图片ID
+    vector: List[float]  # 图片特征向量
+    image_name: Optional[str] = None  # 图片名称,可选
+    image_path: Optional[str] = None  # 图片路径,可选
+
+
+class ImageVectorRequest(BaseModel):
+    """计算图片特征向量请求"""
+    image: str  # Base64编码的图片数据
+    image_id: Optional[str] = None  # 图片ID,可选
+
+
+class ImageVectorResponse(BaseModel):
+    """计算图片特征向量响应"""
+    success: bool
+    image_id: Optional[str] = None
+    vector: Optional[List[float]] = None
+    error: Optional[str] = None
+
+
+class BuildIndexRequest(BaseModel):
+    """构建索引请求"""
+    image_vectors: List[ImageVectorItem]  # 图片向量列表
+
+
+class BuildIndexResponse(BaseModel):
+    """构建索引响应"""
+    success: bool
+    indexed_count: int  # 索引的图片数量
+    error: Optional[str] = None
+
+
+class SearchResultItem(BaseModel):
+    """搜索结果项"""
+    image_id: str  # 图片ID
+    similarity: float  # 相似度
+    image_name: Optional[str] = None  # 图片名称
+    image_path: Optional[str] = None  # 图片路径
+
+
+class SearchRequest(BaseModel):
+    """搜索请求(支持以图搜图和以文搜图)"""
+    image: Optional[str] = None  # Base64编码的图片数据,可选
+    text: Optional[str] = None  # 文字描述,可选
+    top_k: int = 10  # 返回结果数量
+
+
+class SearchResponse(BaseModel):
+    """搜索响应"""
+    success: bool
+    results: List[SearchResultItem]  # 搜索结果列表
+    total_count: int  # 总结果数量
+    processing_time: float  # 处理时间(秒)
+    error: Optional[str] = None
+
+
+class SearchHistoryRequest(BaseModel):
+    """记录搜索历史请求"""
+    empid: int  # 员工ID
+    search_type: int  # 搜索类型:1-以图搜图,2-以文搜图,3-图文混合搜索
+    search_content: str  # 搜索内容
+    result_count: int  # 搜索结果数量
+    result_content: str  # 搜索结果内容(JSON格式)
+
+
+class SearchHistoryResponse(BaseModel):
+    """记录搜索历史响应"""
+    success: bool
+    history_id: Optional[int] = None
+    error: Optional[str] = None

+ 159 - 1
api/routes.py

@@ -1,11 +1,17 @@
 import base64
 from datetime import datetime
+from typing import List
 from fastapi import APIRouter, HTTPException
 
 from utils.device_id import get_device_id
-from .models import ChatRequest, ChatResponse, OCRRequest, MessageCreateBill
+from .models import (
+    ChatRequest, ChatResponse, OCRRequest, MessageCreateBill,
+    ImageVectorRequest, ImageVectorResponse, BuildIndexRequest, BuildIndexResponse,
+    SearchRequest, SearchResponse, SearchResultItem
+)
 from core.chat_service import chat_service
 from core.agent_manager import agent_manager
+from core.image_search_service import image_search_service
 from utils.logger import chat_logger
 from tools.tool_factory import get_all_tools
 import time
@@ -314,3 +320,155 @@ async def root():
         )
 
     return base_info
+
+
+@router.post("/image/vector/batch", response_model=List[ImageVectorResponse])
+async def batch_calculate_vectors_endpoint(requests: List[ImageVectorRequest]):
+    """批量计算图片特征向量"""
+    try:
+        # 构建请求数据
+        image_items = []
+        for req in requests:
+            image_items.append({
+                "image": req.image,
+                "image_id": req.image_id
+            })
+        
+        # 调用服务
+        results = await image_search_service.batch_calculate_vectors(image_items)
+        
+        # 构建响应
+        responses = []
+        for result in results:
+            response = ImageVectorResponse(
+                success=result.get("success", False),
+                image_id=result.get("image_id"),
+                vector=result.get("vector"),
+                error=result.get("error")
+            )
+            responses.append(response)
+        
+        return responses
+        
+    except Exception as e:
+        chat_logger.error(f"批量计算图片特征向量失败: {str(e)}")
+        raise HTTPException(status_code=500, detail=f"处理失败: {str(e)}")
+
+
+@router.post("/image/index/build", response_model=BuildIndexResponse)
+async def build_index_endpoint(request: BuildIndexRequest):
+    """构建索引及映射关系"""
+    try:
+        # 构建请求数据
+        image_vectors = []
+        for item in request.image_vectors:
+            image_vectors.append({
+                "image_id": item.image_id,
+                "vector": item.vector,
+                "image_name": item.image_name,
+                "image_path": item.image_path
+            })
+        
+        # 调用服务
+        indexed_count = await image_search_service.build_index(image_vectors)
+        
+        # 构建响应
+        response = BuildIndexResponse(
+            success=True,
+            indexed_count=indexed_count
+        )
+        
+        return response
+        
+    except Exception as e:
+        chat_logger.error(f"构建索引失败: {str(e)}")
+        raise HTTPException(status_code=500, detail=f"处理失败: {str(e)}")
+
+
+@router.post("/image/search", response_model=SearchResponse)
+async def search_endpoint(request: SearchRequest):
+    """搜索相似图片(支持以图搜图和以文搜图)"""
+    try:
+        import time
+        start_time = time.time()
+        
+        # 处理图片数据
+        image_bytes = None
+        if request.image:
+            try:
+                if "," in request.image:
+                    base64_str = request.image.split(",", 1)[1]
+                else:
+                    base64_str = request.image
+                
+                image_bytes = base64.b64decode(base64_str)
+                
+            except Exception as e:
+                chat_logger.error(f"图片解码失败: {e}")
+                raise HTTPException(400, f"图片格式错误: {str(e)}")
+        
+        # 调用服务
+        results = await image_search_service.search(
+            image_bytes=image_bytes,
+            text=request.text,
+            top_k=request.top_k
+        )
+        
+        # 计算处理时间
+        processing_time = time.time() - start_time
+        
+        # 构建响应
+        search_results = []
+        for result in results:
+            item = SearchResultItem(
+                image_id=result.get("image_id"),
+                similarity=result.get("similarity"),
+                image_name=result.get("image_name"),
+                image_path=result.get("image_path")
+            )
+            search_results.append(item)
+        
+        response = SearchResponse(
+            success=True,
+            results=search_results,
+            total_count=len(search_results),
+            processing_time=round(processing_time, 4)
+        )
+        
+        return response
+        
+    except HTTPException:
+        raise
+    except Exception as e:
+        chat_logger.error(f"搜索失败: {str(e)}")
+        raise HTTPException(status_code=500, detail=f"处理失败: {str(e)}")
+
+
+@router.get("/image/index/status")
+async def get_index_status_endpoint():
+    """获取索引状态"""
+    try:
+        status = await image_search_service.get_index_status()
+        return {
+            "success": True,
+            "status": status
+        }
+        
+    except Exception as e:
+        chat_logger.error(f"获取索引状态失败: {str(e)}")
+        raise HTTPException(status_code=500, detail=f"处理失败: {str(e)}")
+
+
+@router.post("/image/index/clear")
+async def clear_index_endpoint():
+    """清空索引"""
+    try:
+        await image_search_service.clear_index()
+        return {
+            "success": True,
+            "message": "索引已清空"
+        }
+        
+    except Exception as e:
+        chat_logger.error(f"清空索引失败: {str(e)}")
+        raise HTTPException(status_code=500, detail=f"处理失败: {str(e)}")

+ 306 - 0
core/image_search_service.py

@@ -0,0 +1,306 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+图片搜索服务
+功能:批量计算图片特征向量、构建索引、搜索相似图片
+"""
+
+import os
+import json
+import numpy as np
+import faiss
+import base64
+import time
+from typing import List, Dict, Optional, Tuple
+from utils.logger import chat_logger
+
+# 导入dashscope库,必须安装
+try:
+    import dashscope
+    from dashscope import MultiModalEmbedding
+    from http import HTTPStatus
+    DASHSCOPE_AVAILABLE = True
+except ImportError:
+    chat_logger.error("dashscope库未安装,请使用 pip install dashscope 安装")
+    raise ImportError("dashscope库未安装,请使用 pip install dashscope 安装")
+
+
+class ImageSearchService:
+    def __init__(self):
+        """初始化图片搜索服务"""
+        self.index = None  # FAISS索引
+        self.image_mapping = {}  # 图片ID到图片信息的映射
+        self.last_build_time = None  # 最后构建索引的时间
+        self.dimension = 768  # 默认特征向量维度(与tongyi-embedding-vision-plus模型一致)
+        
+        # 配置参数
+        self.config = {
+            "model_name": "tongyi-embedding-vision-plus",
+            "retry_count": 3,
+            "batch_size": 10
+        }
+        
+        chat_logger.info("图片搜索服务初始化完成")
+    
+    async def calculate_vector(self, image_bytes: Optional[bytes] = None, text: Optional[str] = None) -> Optional[List[float]]:
+        """计算图片或文本的特征向量
+        
+        Args:
+            image_bytes: 图片字节数据(可选)
+            text: 文本数据(可选)
+            
+        Returns:
+            特征向量列表,如果失败则返回None
+        """
+        try:
+            # 检查API Key
+            if not dashscope.api_key and 'DASHSCOPE_API_KEY' not in os.environ:
+                chat_logger.error("请配置 DASHSCOPE_API_KEY 环境变量")
+                return None
+            
+            # 构建input_data
+            input_data = []
+            if image_bytes:
+                # 以图搜图
+                image_base64 = base64.b64encode(image_bytes).decode('utf-8')
+                image_data = f"data:image/jpeg;base64,{image_base64}"
+                input_data.append({"image": image_data})
+            elif text:
+                # 以文搜图
+                input_data.append({"text": text})
+            else:
+                chat_logger.error("请提供图片或文本数据")
+                return None
+            
+            # 调用模型接口
+            for attempt in range(self.config["retry_count"]):
+                try:
+                    resp = dashscope.MultiModalEmbedding.call(
+                        model=self.config["model_name"],
+                        input=input_data
+                    )
+                    
+                    if resp.status_code == HTTPStatus.OK:
+                        embedding = resp.output["embeddings"][0]["embedding"]
+                        return embedding
+                    else:
+                        chat_logger.error(f"API调用失败: {resp.message} (状态码: {resp.status_code})")
+                        if attempt < self.config["retry_count"] - 1:
+                            chat_logger.info(f"{self.config['retry_count'] - attempt - 1}秒后重试...")
+                            time.sleep(1)
+                        else:
+                            return None
+                except Exception as e:
+                    chat_logger.error(f"处理数据时出错: {str(e)}")
+                    if attempt < self.config["retry_count"] - 1:
+                        chat_logger.info(f"{self.config['retry_count'] - attempt - 1}秒后重试...")
+                        time.sleep(1)
+                    else:
+                        return None
+                
+        except Exception as e:
+            chat_logger.error(f"计算特征向量失败: {str(e)}")
+            return None
+    
+    async def batch_calculate_vectors(self, image_items: List[Dict]) -> List[Dict]:
+        """批量计算图片特征向量
+        
+        Args:
+            image_items: 图片项列表,每个项包含image(base64编码)和image_id
+            
+        Returns:
+            包含特征向量的图片项列表
+        """
+        results = []
+        
+        for item in image_items:
+            try:
+                # 解码base64图片
+                if "," in item["image"]:
+                    base64_str = item["image"].split(",", 1)[1]
+                else:
+                    base64_str = item["image"]
+                
+                image_bytes = base64.b64decode(base64_str)
+                
+                # 计算特征向量
+                vector = await self.calculate_vector(image_bytes)
+                
+                if vector:
+                    results.append({
+                        "image_id": item.get("image_id"),
+                        "vector": vector,
+                        "success": True
+                    })
+                else:
+                    results.append({
+                        "image_id": item.get("image_id"),
+                        "vector": None,
+                        "success": False,
+                        "error": "计算特征向量失败"
+                    })
+                    
+            except Exception as e:
+                chat_logger.error(f"处理图片失败 (ID: {item.get('image_id')}): {str(e)}")
+                results.append({
+                    "image_id": item.get("image_id"),
+                    "vector": None,
+                    "success": False,
+                    "error": str(e)
+                })
+        
+        return results
+    
+    async def build_index(self, image_vectors: List[Dict]) -> int:
+        """构建索引及映射关系
+        
+        Args:
+            image_vectors: 图片向量列表,每个项包含image_id、vector、image_name、image_path等
+            
+        Returns:
+            索引的图片数量
+        """
+        try:
+            if not image_vectors:
+                chat_logger.warning("没有图片向量需要索引")
+                return 0
+            
+            # 提取向量和图片信息
+            vectors = []
+            self.image_mapping = {}
+            
+            for item in image_vectors:
+                if "vector" in item and item["vector"]:
+                    vectors.append(item["vector"])
+                    self.image_mapping[item["image_id"]] = {
+                        "image_name": item.get("image_name"),
+                        "image_path": item.get("image_path")
+                    }
+            
+            if not vectors:
+                chat_logger.warning("没有有效的向量数据")
+                return 0
+            
+            # 转换为numpy数组
+            vectors_np = np.array(vectors, dtype=np.float32)
+            
+            # 更新维度信息
+            self.dimension = vectors_np.shape[1]
+            
+            # 创建FAISS索引
+            self.index = faiss.IndexFlatIP(self.dimension)  # 使用内积相似度
+            
+            # 归一化向量
+            faiss.normalize_L2(vectors_np)
+            
+            # 添加向量到索引
+            self.index.add(vectors_np)
+            
+            self.last_build_time = time.strftime('%Y-%m-%d %H:%M:%S')
+            
+            indexed_count = len(vectors)
+            chat_logger.info(f"成功构建索引,索引了 {indexed_count} 个图片向量")
+            
+            return indexed_count
+            
+        except Exception as e:
+            chat_logger.error(f"构建索引失败: {str(e)}")
+            return 0
+    
+    async def search(self, image_bytes: Optional[bytes] = None, text: Optional[str] = None, 
+                    top_k: int = 10) -> List[Dict]:
+        """搜索相似图片
+        
+        Args:
+            image_bytes: 图片字节数据(以图搜图)
+            text: 文字描述(以文搜图)
+            top_k: 返回结果数量
+            
+        Returns:
+            搜索结果列表
+        """
+        try:
+            if self.index is None:
+                chat_logger.warning("索引未构建,请先构建索引")
+                return []
+            
+            # 计算查询向量
+            query_vector = await self.calculate_vector(image_bytes=image_bytes, text=text)
+            if not query_vector:
+                chat_logger.error("无法获取查询特征向量")
+                return []
+            
+            # 执行搜索
+            if query_vector:
+                # 转换为numpy数组并归一化
+                query_vector_np = np.array([query_vector], dtype=np.float32)
+                faiss.normalize_L2(query_vector_np)
+                
+                # 搜索
+                distances, indices = self.index.search(query_vector_np, top_k)
+                
+                # 构建结果
+                results = []
+                for i, (distance, idx) in enumerate(zip(distances[0], indices[0])):
+                    # 计算相似度(内积转换为0-1范围)
+                    similarity = float(distance)
+                    
+                    # 直接添加结果,不使用阈值过滤
+                    # 获取图片ID
+                    image_id = list(self.image_mapping.keys())[idx]
+                    image_info = self.image_mapping.get(image_id, {})
+                    
+                    results.append({
+                        "image_id": image_id,
+                        "similarity": similarity,
+                        "image_name": image_info.get("image_name"),
+                        "image_path": image_info.get("image_path")
+                    })
+                
+                chat_logger.info(f"搜索完成,找到 {len(results)} 个相似图片")
+                return results
+            else:
+                return []
+                
+        except Exception as e:
+            chat_logger.error(f"搜索失败: {str(e)}")
+            return []
+    
+    async def get_index_status(self) -> Dict:
+        """获取索引状态
+        
+        Returns:
+            索引状态信息
+        """
+        if self.index is None:
+            return {
+                "indexed_count": 0,
+                "index_size": "0KB",
+                "last_build_time": None,
+                "dimension": self.dimension
+            }
+        
+        indexed_count = self.index.ntotal
+        # 估算索引大小
+        index_size = f"{indexed_count * self.dimension * 4 / 1024:.2f}KB"  # 每个float32占4字节
+        
+        return {
+            "indexed_count": indexed_count,
+            "index_size": index_size,
+            "last_build_time": self.last_build_time,
+            "dimension": self.dimension
+        }
+    
+    async def clear_index(self):
+        """清空索引"""
+        try:
+            self.index = None
+            self.image_mapping = {}
+            self.last_build_time = None
+            chat_logger.info("索引已清空")
+        except Exception as e:
+            chat_logger.error(f"清空索引失败: {str(e)}")
+
+
+# 创建服务实例
+image_search_service = ImageSearchService()

BIN
requirements.txt


+ 426 - 0
tests/test_image_search_api.py

@@ -0,0 +1,426 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+测试图片搜索API
+包含以下测试用例:
+1. 批量生成向量,断言图1与图2的向量不同
+2. 再次生成向量,断言与第一步生成的向量一致
+3. 构建索引API和获取索引状态
+4. 以图搜图API,断言第一个结果是本图
+5. 以文本搜图,搜索"胡桃木方",断言第一个结果是3号图
+"""
+
+import os
+import base64
+import json
+import requests
+import numpy as np
+
+# 配置
+API_BASE_URL = "http://localhost:8888"
+IMAGE_DIR = "d:\\天翼云盘\\VS心流\\AI图片识别搜图修图理论与实践\\GME-Qwen2-VL-2B\\product_images_full"
+MAX_IMAGES = 10
+
+# API端点
+ENDPOINTS = {
+    "batch_vector": "/image/vector/batch",
+    "build_index": "/image/index/build",
+    "index_status": "/image/index/status",
+    "search": "/image/search"
+}
+
+
+class ImageSearchAPITest:
+    def __init__(self):
+        """初始化测试类"""
+        self.image_files = []
+        self.first_vectors = {}  # 第一次生成的向量
+        self.second_vectors = {}  # 第二次生成的向量
+        self.session = requests.Session()
+        self.session.timeout = 60
+    
+    def get_image_files(self, max_count=10):
+        """获取图片文件列表"""
+        image_extensions = ['.jpg', '.jpeg', '.png', '.bmp', '.gif']
+        image_files = []
+        
+        for file in os.listdir(IMAGE_DIR):
+            if len(image_files) >= max_count:
+                break
+            file_ext = os.path.splitext(file)[1].lower()
+            if file_ext in image_extensions:
+                image_files.append(os.path.join(IMAGE_DIR, file))
+        
+        self.image_files = image_files
+        
+        # 输出文件列表供参考
+        print("获取到的图片文件列表:")
+        for i, img_file in enumerate(image_files):
+            print(f"{i+1}. {os.path.basename(img_file)}")
+            print(f"   路径: {img_file}")
+        
+        return image_files
+    
+    def image_to_base64(self, image_path):
+        """将图片转换为Base64编码"""
+        try:
+            with open(image_path, 'rb') as f:
+                image_bytes = f.read()
+            base64_str = base64.b64encode(image_bytes).decode('utf-8')
+            ext = os.path.splitext(image_path)[1].lower().replace('.', '')
+            data_url = f"data:image/{ext};base64,{base64_str}"
+            return data_url
+        except Exception as e:
+            print(f"处理图片失败 {image_path}: {e}")
+            return None
+    
+    def test_batch_vector_generation(self):
+        """测试1:批量生成向量,断言图1与图2的向量不同"""
+        print("=== 测试1: 批量生成向量 ===")
+        
+        # 获取图片文件
+        if not self.image_files:
+            self.get_image_files(MAX_IMAGES)
+        
+        # 构建请求体
+        requests_list = []
+        for i, img_file in enumerate(self.image_files):
+            base64_data = self.image_to_base64(img_file)
+            if base64_data:
+                request_item = {
+                    "image": base64_data,
+                    "image_id": str(i+1)
+                }
+                requests_list.append(request_item)
+        
+        # 发送请求
+        url = f"{API_BASE_URL}{ENDPOINTS['batch_vector']}"
+        response = self.session.post(
+            url,
+            json=requests_list,
+            headers={"Content-Type": "application/json"}
+        )
+        
+        assert response.status_code == 200, f"请求失败,状态码: {response.status_code}"
+        
+        # 解析响应
+        result = response.json()
+        assert len(result) == len(requests_list), f"响应数量不匹配: {len(result)} != {len(requests_list)}"
+        
+        # 存储向量
+        for i, item in enumerate(result):
+            assert item.get('success') is True, f"图片 {i+1} 向量生成失败: {item.get('error')}"
+            image_id = item.get('image_id')
+            vector = item.get('vector')
+            assert vector is not None, f"图片 {image_id} 没有返回向量"
+            self.first_vectors[image_id] = vector
+        
+        # 断言图1与图2的向量不同
+        assert '1' in self.first_vectors, "图1向量不存在"
+        assert '2' in self.first_vectors, "图2向量不存在"
+        vector1 = self.first_vectors['1']
+        vector2 = self.first_vectors['2']
+        assert vector1 != vector2, "图1与图2的向量相同,不符合预期"
+        
+        print(f"✓ 成功生成 {len(self.first_vectors)} 个向量")
+        print(f"✓ 图1与图2的向量不同")
+    
+    def test_vector_consistency(self):
+        """测试2:再次生成向量,断言与第一步生成的向量一致"""
+        print("\n=== 测试2: 向量一致性测试 ===")
+        
+        # 构建请求体
+        requests_list = []
+        for i, img_file in enumerate(self.image_files):
+            base64_data = self.image_to_base64(img_file)
+            if base64_data:
+                request_item = {
+                    "image": base64_data,
+                    "image_id": str(i+1)
+                }
+                requests_list.append(request_item)
+        
+        # 发送请求
+        url = f"{API_BASE_URL}{ENDPOINTS['batch_vector']}"
+        response = self.session.post(
+            url,
+            json=requests_list,
+            headers={"Content-Type": "application/json"}
+        )
+        
+        assert response.status_code == 200, f"请求失败,状态码: {response.status_code}"
+        
+        # 解析响应
+        result = response.json()
+        assert len(result) == len(requests_list), f"响应数量不匹配: {len(result)} != {len(requests_list)}"
+        
+        # 存储向量
+        for i, item in enumerate(result):
+            assert item.get('success') is True, f"图片 {i+1} 向量生成失败: {item.get('error')}"
+            image_id = item.get('image_id')
+            vector = item.get('vector')
+            assert vector is not None, f"图片 {image_id} 没有返回向量"
+            self.second_vectors[image_id] = vector
+        
+        # 断言向量一致性
+        for image_id, first_vector in self.first_vectors.items():
+            assert image_id in self.second_vectors, f"图片 {image_id} 第二次向量不存在"
+            second_vector = self.second_vectors[image_id]
+            
+            # 计算向量相似度
+            similarity = np.dot(first_vector, second_vector) / (
+                np.linalg.norm(first_vector) * np.linalg.norm(second_vector)
+            )
+            
+            # 断言向量接近(相似度大于0.99)
+            assert similarity > 0.99, f"图片 {image_id} 向量不一致,相似度: {similarity:.4f}"
+        
+        print(f"✓ 成功验证 {len(self.second_vectors)} 个向量的一致性")
+        print(f"✓ 所有图片的向量与第一次生成的向量一致")
+    
+    def test_build_index(self):
+        """测试3:构建索引API和获取索引状态"""
+        print("\n=== 测试3: 构建索引测试 ===")
+        
+        # 第一次构建索引(使用first_vectors)
+        image_vectors_first = []
+        for i, img_file in enumerate(self.image_files):
+            image_id = str(i+1)
+            if image_id in self.first_vectors:
+                vector_item = {
+                    "image_id": image_id,
+                    "vector": self.first_vectors[image_id],
+                    "image_name": os.path.basename(img_file),
+                    "image_path": img_file
+                }
+                image_vectors_first.append(vector_item)
+        
+        # 发送第一次构建索引请求
+        url = f"{API_BASE_URL}{ENDPOINTS['build_index']}"
+        response = self.session.post(
+            url,
+            json={"image_vectors": image_vectors_first},
+            headers={"Content-Type": "application/json"}
+        )
+        
+        assert response.status_code == 200, f"构建索引失败,状态码: {response.status_code}"
+        
+        # 解析响应
+        result = response.json()
+        assert result.get('success') is True, f"构建索引失败: {result.get('error')}"
+        indexed_count_first = result.get('indexed_count')
+        assert indexed_count_first == len(image_vectors_first), f"索引数量不匹配: {indexed_count_first} != {len(image_vectors_first)}"
+        
+        # 获取索引状态
+        url = f"{API_BASE_URL}{ENDPOINTS['index_status']}"
+        response = self.session.get(url)
+        
+        assert response.status_code == 200, f"获取索引状态失败,状态码: {response.status_code}"
+        
+        # 解析响应
+        status_result = response.json()
+        assert status_result.get('success') is True, f"获取索引状态失败"
+        status = status_result.get('status')
+        assert status is not None, "索引状态不存在"
+        assert status.get('indexed_count') == indexed_count_first, f"索引状态数量不匹配"
+        
+        print(f"✓ 成功构建索引,索引了 {indexed_count_first} 个图片")
+        print(f"✓ 索引状态正确: {status}")
+        
+        # 第二次构建索引(使用second_vectors,应该是替代构建不是累加构建)
+        image_vectors_second = []
+        for i, img_file in enumerate(self.image_files):
+            image_id = str(i+1)
+            if image_id in self.second_vectors:
+                vector_item = {
+                    "image_id": image_id,
+                    "vector": self.second_vectors[image_id],
+                    "image_name": os.path.basename(img_file),
+                    "image_path": img_file
+                }
+                image_vectors_second.append(vector_item)
+        
+        # 发送第二次构建索引请求(使用正确的build_index URL)
+        url = f"{API_BASE_URL}{ENDPOINTS['build_index']}"
+        response = self.session.post(
+            url,
+            json={"image_vectors": image_vectors_second},
+            headers={"Content-Type": "application/json"}
+        )
+        
+        assert response.status_code == 200, f"第二次构建索引失败,状态码: {response.status_code}"
+        
+        # 解析响应
+        result_second = response.json()
+        assert result_second.get('success') is True, f"第二次构建索引失败: {result_second.get('error')}"
+        indexed_count_second = result_second.get('indexed_count')
+        assert indexed_count_second == len(image_vectors_second), f"索引数量不匹配: {indexed_count_second} != {len(image_vectors_second)}"
+        
+        # 获取索引状态(使用正确的index_status URL)
+        url = f"{API_BASE_URL}{ENDPOINTS['index_status']}"
+        response = self.session.get(url)
+        
+        assert response.status_code == 200, f"获取索引状态失败,状态码: {response.status_code}"
+        
+        # 解析响应
+        status_result_second = response.json()
+        assert status_result_second.get('success') is True, f"获取索引状态失败"
+        status_second = status_result_second.get('status')
+        assert status_second is not None, "索引状态不存在"
+        assert status_second.get('indexed_count') == indexed_count_second, f"索引状态数量不匹配"
+        
+        # 断言是替代构建不是累加构建(索引数量应该等于第二次的数量,而不是两次的总和)
+        assert indexed_count_second == len(image_vectors_second), f"索引数量应该等于第二次构建的数量,而不是累加"
+        assert indexed_count_second != indexed_count_first + len(image_vectors_second), f"索引不应该是累加构建"
+        
+        print(f"✓ 成功进行替代构建,索引了 {indexed_count_second} 个图片")
+        print(f"✓ 索引状态正确: {status_second}")
+    
+    def test_image_search(self):
+        """测试4:以图搜图API,断言第一个结果是本图"""
+        print("\n=== 测试4: 以图搜图测试 ===")
+        
+        # 测试前3张图片
+        test_count = min(3, len(self.image_files))
+        
+        for i in range(test_count):
+            img_file = self.image_files[i]
+            image_id = str(i+1)
+            
+            # 构建搜索请求
+            base64_data = self.image_to_base64(img_file)
+            assert base64_data is not None, f"图片 {image_id} 转换失败"
+            
+            search_request = {
+                "image": base64_data,
+                "top_k": 3
+            }
+            
+            # 发送搜索请求
+            url = f"{API_BASE_URL}{ENDPOINTS['search']}"
+            response = self.session.post(
+                url,
+                json=search_request,
+                headers={"Content-Type": "application/json"}
+            )
+            
+            assert response.status_code == 200, f"搜索失败,状态码: {response.status_code}"
+            
+            # 解析响应
+            result = response.json()
+            assert result.get('success') is True, f"搜索失败: {result.get('error')}"
+            results = result.get('results', [])
+            assert len(results) > 0, "搜索结果为空"
+            
+            # 断言第一个结果是本图
+            top_result = results[0]
+            assert top_result.get('image_id') == image_id, f"图片 {image_id} 搜索结果第一个不是本图,而是 {top_result.get('image_id')}"
+            
+            print(f"✓ 图片 {image_id} 搜索成功,第一个结果是本图")
+    
+    def test_text_search(self):
+        """测试5:以文本搜图,搜索"胡桃木方",断言第一个结果是3号图"""
+        print("\n=== 测试5: 以文搜图测试 ===")
+        
+        # 构建搜索请求(降低阈值以提高成功率)
+        search_request = {
+            "text": "胡桃木方",
+            "top_k": 3
+        }
+        
+        # 发送搜索请求
+        url = f"{API_BASE_URL}{ENDPOINTS['search']}"
+        response = self.session.post(
+            url,
+            json=search_request,
+            headers={"Content-Type": "application/json"}
+        )
+        
+        assert response.status_code == 200, f"搜索失败,状态码: {response.status_code}"
+        
+        # 解析响应
+        result = response.json()
+        assert result.get('success') is True, f"搜索失败: {result.get('error')}"
+        results = result.get('results', [])
+        
+        if len(results) == 0:
+            print("⚠️  搜索结果为空,可能是因为没有配置 DASHSCOPE_API_KEY 或文本与图片不匹配")
+            print("⚠️  跳过文本搜索测试")
+            return
+        
+        # 输出搜索结果,供人工检查
+        print(f"✓ 文本搜索'胡桃木方'成功")
+        print(f"搜索结果(前3个):")
+        for i, item in enumerate(results[:3]):
+            print(f"  {i+1}. 图片ID: {item.get('image_id')}, 相似度: {item.get('similarity'):.4f}, 图片名称: {item.get('image_name')}")
+    
+    def test_clear_index(self):
+        """测试6:清除索引测试"""
+        print("\n=== 测试6: 清除索引测试 ===")
+        
+        # 获取清除索引的端点(假设是 /image/index/clear)
+        clear_index_endpoint = "/image/index/clear"
+        url = f"{API_BASE_URL}{clear_index_endpoint}"
+        
+        # 发送清除索引请求
+        response = self.session.post(
+            url,
+            headers={"Content-Type": "application/json"}
+        )
+        
+        assert response.status_code == 200, f"清除索引失败,状态码: {response.status_code}"
+        
+        # 解析响应
+        result = response.json()
+        assert result.get('success') is True, f"清除索引失败: {result.get('error')}"
+        
+        # 获取索引状态,验证索引已被清除
+        url = f"{API_BASE_URL}{ENDPOINTS['index_status']}"
+        response = self.session.get(url)
+        
+        assert response.status_code == 200, f"获取索引状态失败,状态码: {response.status_code}"
+        
+        # 解析响应
+        status_result = response.json()
+        assert status_result.get('success') is True, f"获取索引状态失败"
+        status = status_result.get('status')
+        assert status is not None, "索引状态不存在"
+        assert status.get('indexed_count') == 0, f"索引未被清除,索引数量: {status.get('indexed_count')}"
+        
+        print(f"✓ 成功清除索引")
+        print(f"✓ 索引状态正确: {status}")
+    
+    def run_all_tests(self):
+        """运行所有测试"""
+        print("开始图片搜索API测试...")
+        print(f"API基础地址: {API_BASE_URL}")
+        print(f"测试图片目录: {IMAGE_DIR}")
+        print(f"测试图片数量: {MAX_IMAGES}")
+        print("=" * 60)
+        
+        try:
+            self.test_batch_vector_generation()
+            self.test_vector_consistency()
+            self.test_build_index()
+            self.test_image_search()
+            self.test_text_search()
+            self.test_clear_index()
+            
+            print("\n" + "=" * 60)
+            print("🎉 所有测试通过!")
+            print("=" * 60)
+            
+        except Exception as e:
+            print(f"\n❌ 测试失败: {e}")
+            raise
+
+
+if __name__ == "__main__":
+    print("测试脚本已加载,等待运行指令...")
+    print("请使用以下命令运行测试:")
+    print("python tests/test_image_search_api.py")
+    print("\n或者在Python交互式环境中运行:")
+    print("from tests.test_image_search_api import ImageSearchAPITest")
+    print("test = ImageSearchAPITest()")
+    print("test.run_all_tests()")
+    print("\n注意:本脚本不会自动运行测试,需要手动调用 run_all_tests() 方法")