diff --git a/data_chain/apps/service/chunk_service.py b/data_chain/apps/service/chunk_service.py index a1c352de77b4287724c7a14b224edcceb29623f5..3e2896f278bba0e185184d98ae8663a24bb323fb 100644 --- a/data_chain/apps/service/chunk_service.py +++ b/data_chain/apps/service/chunk_service.py @@ -4,6 +4,7 @@ import random import time import jieba import traceback +import asyncio from data_chain.logger.logger import logger as logging from data_chain.apps.service.llm_service import get_question_chunk_relation from data_chain.models.constant import ChunkRelevance @@ -155,14 +156,24 @@ async def get_similar_chunks(content, kb_id=None, temporary_document_ids=None, m st = time.time() target_vector = await Vectorize.vectorize_embedding(content) logging.info(f"向量化耗时: {time.time()-st}") + retry_times = 3 if target_vector is not None: st = time.time() if temporary_document_ids: - chunk_id_list = await TemporaryVectorItemsManager.find_top_k_similar_temporary_vectors( - target_vector, - temporary_document_ids, - topk-len(chunk_tuple_list) - ) + chunk_id_list=[] + for i in range(retry_times): + try: + chunk_id_list = await asyncio.wait_for(TemporaryVectorItemsManager.find_top_k_similar_temporary_vectors( + target_vector, + temporary_document_ids, + topk-len(chunk_tuple_list) + ), + timeout=1 + ) + break + except Exception as e: + logging.error(f"检索临时向量时出错: {e}") + continue chunk_entity_list = await TemporaryChunkManager.select_by_temporary_chunk_ids(chunk_id_list) elif kb_id: kb_entity = await KnowledgeBaseManager.select_by_id(kb_id) @@ -172,7 +183,14 @@ async def get_similar_chunks(content, kb_id=None, temporary_document_ids=None, m vector_items_id = kb_entity.vector_items_id dim = embedding_model_out_dimensions[embedding_model] vector_items_table = await PostgresDB.get_dynamic_vector_items_table(vector_items_id, dim) - chunk_id_list = await VectorItemsManager.find_top_k_similar_vectors(vector_items_table, target_vector, kb_id, topk-len(chunk_tuple_list)) + chunk_id_list=[] + for i in range(retry_times): + try: + chunk_id_list = await asyncio.wait_for(VectorItemsManager.find_top_k_similar_vectors(vector_items_table, target_vector, kb_id, topk-len(chunk_tuple_list)),timeout=1) + break + except Exception as e: + logging.error(f"检索向量时出错: {e}") + continue chunk_entity_list = await ChunkManager.select_by_chunk_ids(chunk_id_list) logging.info(f"向量化检索耗时: {time.time()-st}") st = time.time() diff --git a/data_chain/manager/vector_items_manager.py b/data_chain/manager/vector_items_manager.py index 22f4a4c61b40bde15a26f446a1760ef27045f45e..a602501ece69e95bde9b7110eb9b6688cd67d793 100644 --- a/data_chain/manager/vector_items_manager.py +++ b/data_chain/manager/vector_items_manager.py @@ -125,7 +125,6 @@ class VectorItemsManager: f"WHERE v.kb_id = :kb_id AND v.enabled = true AND document.enabled = true " f"ORDER BY v.vector <=> :target_vector " f"LIMIT :topk") - # 获取会话并执行查询 async with await PostgresDB.get_session() as session: # 使用execute执行原始SQL语句,并传递参数 result = await session.execute(