o
    Ղit                     @   s
  d Z ddlmZmZmZmZmZ ddlZddlZddl	Z	ddl
mZ ddlmZmZmZmZmZmZmZ ddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZ ee Z!G dd deZ"G dd dZ#					ddedee$ de%de%de&dee$ de#fddZ'dS )ur   
Milvus 向量存储服务（支持分区管理）
基于 Milvus 实现向量存储和检索，支持分区优化
    )ListDictOptionalAnyTupleN)Session)connections
CollectionFieldSchemaCollectionSchemaDataTypeutility	Partition)Document)
Embeddings)KnowledgeDocument)get_embedding_factory)VectorStoreConnectionErrorc                   @   s\   e Zd ZdZddedee fddZdee	 deee
  fd	d
Zde	dee
 fddZdS )CustomEmbeddingsu<   自定义嵌入类，适配现有的多提供商嵌入服务Ndbmodel_idc                 C   s   || _ || _t | _dS )ux   
        初始化自定义嵌入

        Args:
            db: 数据库会话
            model_id: 模型ID
        N)r   r   r   embedding_factory)selfr   r    r   C/lsinfo/ai/hellotax_ai/base_platform/app/services/storage/milvus.py__init__   s   zCustomEmbeddings.__init__textsreturnc                 C   s\   g }|D ]'}| j j|| j| jd}|r|| q|r!t|d nd}|dg|  q|S )u   嵌入文档列表textr   r   r              )r   generate_embeddingr   r   appendlen)r   r   
embeddingsr   	embeddingdimr   r   r   embed_documents%   s   z CustomEmbeddings.embed_documentsr   c                 C   s(   | j j|| j| jd}|r|S dgd S )u   嵌入查询文本r   r!   r    )r   r"   r   r   )r   r   r&   r   r   r   embed_query6   s   zCustomEmbeddings.embed_queryN)__name__
__module____qualname____doc__r   r   intr   r   strfloatr(   r)   r   r   r   r   r      s
    r   c                   @   s  e Zd ZdZ						d7dedee d	ed
edededee fddZ	dedee defddZ
dd Zdd ZdefddZ		d8dedeeeef  deee  dedeee  deee  dee fdd Z	!	"	d9d#ee d$ed%ed&ee deeeef  f
d'd(Z	!	"	)	"d:d*ed#ee d$ed%ed+ed,edeeeef  fd-d.Zdedefd/d0Zdeeef fd1d2Zdee fd3d4Zd5d6 ZdS );MilvusVectorStoreu0   Milvus向量存储服务（支持分区管理）Ndocument_vectors	localhost19530Tr   knowledge_base_idcollection_namehostportuse_partition	tenant_idc           	   
   C   s   || _ || _|| _|| _|| _| ||| _z!dt v r#t	
d ntjd||d t	
d| d|  W n tyN } zt	d|  t d}~ww |   | jr_| jra|   dS dS dS )u  
        初始化Milvus向量存储

        Args:
            db: 数据库会话
            knowledge_base_id: 知识库ID（用于过滤和分区）
            collection_name: 集合名称
            host: Milvus服务器地址
            port: Milvus端口
            use_partition: 是否使用分区（提升查询性能）
            tenant_id: 租户ID（用于多租户隔离）
        defaultz Using existing Milvus connection)aliasr8   r9   z"Successfully connected to Milvus: :zFailed to connect to Milvus: N)r   r6   r7   r:   r;   _get_embedding_dimensionr'   r   list_connectionsloggerinfoconnect	Exceptionerrorr   _ensure_collection_ensure_partition)	r   r   r6   r7   r8   r9   r:   r;   er   r   r   r   C   s2   zMilvusVectorStore.__init__r   c              
   C   s2  z||st d W dS ddlm} ddlm} |||j|k	 }|r)|j
s5t d| d W dS |j
 rK|||jt|j
k	 }n|||j
|j
k	 }|rp|jrpt d|j d	|j d
 |jW S t d|j
 d W dS  ty } zt d| d W Y d}~dS d}~ww )u   
        动态获取向量维度

        Args:
            db: 数据库会话
            knowledge_base_id: 知识库ID

        Returns:
            向量维度，默认 1536
        z<No knowledge_base_id provided, using default dimension: 1536r    r   )KnowledgeBase)ModelzKnowledge base z> not found or has no model code, using default dimension: 1536z%Using model dimension from database: z	 (model: )z#Model dimension not found for code z, using default dimension: 1536z#Failed to get embedding dimension: z, using default: 1536N)rA   rB   
app.modelsrI   app.models.providerrJ   queryfilteridfirstcodewarningisdigitr/   	dimensionnamerD   rE   )r   r   r6   rI   rJ   kbmodelrH   r   r   r   r?   y   s.   


"
z*MilvusVectorStore._get_embedding_dimensionc                 C   s  zt | jrt| j| _td| j  ntdtj	dddtdtj	dtdtj	dtdtj
d	d
tdtj| jdtdtj
dd
tdtj	dtdtj	dtdtj
dd
tdtj
dd
tdtjdtdtj
dd
tdtj
dd
tdtj
dd
tdtj	dg}t|dd}t| j|d| _dddd id!}| jjd|d" td#| j  | j  W d%S  ty } z	td$|   d%}~ww )&u-   确保集合存在，如果不存在则创建zUsing existing collection: rP   T)rV   dtype
is_primaryauto_iddocument_id)rV   rY   chunk_index
chunk_text  )rV   rY   
max_lengthvector)rV   rY   r'   
model_named   r6   r;   chunk_idparent_chunk_id	is_parentchunk_level   
references  
doc_statusissue_date_intz:Document vectors for knowledge base with partition support)fieldsdescription)rV   schemaCOSINEIVF_FLATnlisti   )metric_type
index_typeparams)
field_nameindex_paramszCreated new collection: z$Failed to ensure collection exists: N)r   has_collectionr7   r	   
collectionrA   rB   r
   r   INT64VARCHARFLOAT_VECTORr'   BOOLr   create_indexloadrD   rE   )r   rm   ro   rw   rH   r   r   r   rF      sV   z$MilvusVectorStore._ensure_collectionc              
   C   s   z)d| j  }| j|s| j| td|  ntd|  || _W dS  tyJ } ztd|  d| _	d| _W Y d}~dS d}~ww )u   确保分区存在kb_zCreated partition: zUsing existing partition: z#Failed to ensure partition exists: FN)
r6   ry   has_partitioncreate_partitionrA   rB   partition_namerD   rE   r:   r   r   rH   r   r   r   rG      s   z#MilvusVectorStore._ensure_partitionc              
   C   s   | j s
td dS d| j  }z(| j|r-| j  | j| td|  W dS td| d W dS  tyP } zt	d| d	|   d
}~ww )u'  
        删除当前知识库对应的 Milvus 分区（kb_{knowledge_base_id}）

        比逐文档调用 delete_document_vectors() 高效得多，一次性删除分区内所有向量。

        Returns:
            True 表示成功，False 表示分区不存在（视为成功）
        z9drop_partition called without knowledge_base_id, skippingFr   zDropped Milvus partition: TzMilvus partition z does not exist, skipping dropz Failed to drop Milvus partition z: N)
r6   rA   rS   ry   r   releasedrop_partitionrB   rD   rE   r   r   r   r   r      s"   	

z MilvusVectorStore.drop_partitionr\   chunksr%   rb   doc_status_listissue_date_int_listc                 C   sP  zg }g }g }	g }
g }|D ]e}| dtt }| d}|du r%d}| dd}| dd}| d	g }t|trF|rCt|nd}n|rLt|nd}|| || |	| |
| ||	d
dd j
d
dd qdd | jjjD }|gt| dd |D dd |D ||gt| | jpdgt| | jpdgt| |||	|
|g}d|v r||dur|ndgt|  d|v r||dur|ndgt|  | jr| jr| jj|| jd}tdt| d| j  n| j|}tdt| d|  | j  |jW S  ty' } z	td|   d}~ww )uZ  
        添加文档向量到Milvus（支持分区）

        Args:
            document_id: 文档ID
            chunks: 文档块列表 [{text, metadata, chunk_index, chunk_id, parent_chunk_id, is_parent, chunk_level, references}]
            embeddings: 对应的嵌入向量列表
            model_name: 向量模型名称
            doc_status_list: 每个 chunk 对应的文档状态（若为 None 则全部填 "effective"）
            issue_date_int_list: 每个 chunk 对应的 YYYYMMDD 整数（若为 None 则全部填 0）

        Returns:
            插入的向量ID列表
        rd   re   N rf   Frg   leafri   utf-8rj   ignoreerrorsc                 S   s   h | ]}|j qS r   rV   .0fr   r   r   	<setcomp>A      z2MilvusVectorStore.add_documents.<locals>.<setcomp>c                 S   s   g | ]}|d  qS )r]   r   r   chunkr   r   r   
<listcomp>D  s    z3MilvusVectorStore.add_documents.<locals>.<listcomp>c                 S   s,   g | ]}|d   ddd jdddqS )r   r   Nr_   r   r   )encodedecoder   r   r   r   r   E  s   , r   rk   	effectiverl   )r   zSuccessfully added z vectors to partition z vectors to document z Failed to add document vectors: )getr0   uuiduuid4
isinstancelistjsondumpsr#   r   r   ry   ro   rm   r$   r6   r;   r:   r   insertrA   rB   flushprimary_keysrD   rE   )r   r\   r   r%   rb   r   r   	chunk_idsparent_chunk_ids
is_parentschunk_levelsreferences_listr   rd   re   rf   rg   ri   references_strexisting_fieldsentitiesinsert_resultrH   r   r   r   add_documents  sn   





$  
zMilvusVectorStore.add_documents   ffffff?query_embeddingk	thresholdfilter_dictc                 C   s  zg }| j r|d| j   | jr|d| j  |r[| D ]8\}}|dv r+q"t|tr?|| dt|   q"t|trP|| d| d q"|| d|  q"zdd | jj	j
D }d	|v ro|d
 W n	 tyy   Y nw |r|dr|d|d   |dr|d|d   |rd|nd}	dddid}
|dkr|n|d }g d}|gd|
||	r|	nd|d}| jr| jr| jg|d< td| j  | jjd/i |}g }|D ]}|D ]}t|j}||kr| jttj|jdktjdktjdktjdk }|du rq|jd p$d}z|r.t |ng }W n tj!t"fy@   g }Y nw |j|jd|jd!|jd"|jd#|r]|j#nd$|rd|j$nd|rk|j%nd|jd%ptd|jd&p|d|jd'pd(|jd)pd*|d+}|||f t&||kr nqt&||kr nqt'd,t&| d- |d| W S  ty } z	t(d.|   d}~ww )0uI  
        相似度搜索（支持分区加速）

        Args:
            query_embedding: 查询向量
            k: 返回结果数量
            threshold: 相似度阈值 (0-1)
            filter_dict: 过滤条件 (支持 is_parent 等字段)

        Returns:
            [(结果字典, 相似度分数), ...]
        ztenant_id == zknowledge_base_id == )issue_date_gteissue_date_ltez == z == ""c                 S   s   g | ]}|j qS r   r   r   r   r   r   r     r   z7MilvusVectorStore.similarity_search.<locals>.<listcomp>rk   z)doc_status not in ["obsolete", "expired"]r   zissue_date_int >= r   zissue_date_int <= z && r   rp   nprobe
   )rs   ru      )	r\   r]   r^   rb   rd   re   rf   rg   ri   ra   N)data
anns_fieldparamlimitexproutput_fieldspartition_nameszSearching in partition r\   	publishedTobsoleteri   r]   r^   rb   Unknownrd   re   rf   Frg   r   )rP   r\   r]   r   rb   titlecategory_idreference_urlrd   re   rf   rg   ri   z#Similarity search completed, found  resultszSimilarity search failed: r   ))r;   r#   r6   itemsr   boolr0   lowerry   ro   rm   rD   r   joinr:   r   rA   debugsearchr1   scorer   rN   r   rO   rP   entitystatusis_vectorizedrk   rQ   r   loadsJSONDecodeError	TypeErrorr   r   r   r$   rB   rE   )r   r   r   r   r   
conditionskeyvaluefield_namesr   search_paramssearch_limitr   search_kwargsresultsformatted_resultshitshitr   docr   ri   doc_dictrH   r   r   r   similarity_searchi  s   





	






z#MilvusVectorStore.similarity_search333333?rN   keyword_weightsemantic_weightc              
   C   s@  z|dkr|n|d }| j |||d d}ddlm}	 dg}
||d}| jr1|
d	 | j|d
< | jdur@|
d | j|d< |
d |
d |
d |	dd|
 d}t| j	||}t
dd |D dd}i }|D ]\}}|d |d f}||dd||< qr|D ]1}|j|jf}t|j}||vr|j|j|j|j|j|j|j|jdd|d||< q||| d< qg }| D ])}|d }|dkr|d dkr|d | nd}|| ||  }||d |f q|jdd  d!d" td#t|d|  d$ |d| W S  ty } z	td%|   d}~ww )&up  
        混合搜索（关键词 + 语义）

        Args:
            query: 查询文本
            query_embedding: 查询向量
            k: 返回结果数量
            threshold: 相似度阈值
            keyword_weight: 关键词权重
            semantic_weight: 语义权重

        Returns:
            [(结果字典, 综合分数), ...]
        r   r   g?)r   r   r   r   )r   zdv.id IN (SELECT id FROM document_vectors WHERE to_tsvector('simple', COALESCE(chunk_text, '')) @@ plainto_tsquery('simple', :query)))rN   r   zkd.tenant_id = :tenant_idr;   Nz)kc.knowledge_base_id = :knowledge_base_idr6   zkd.status = 'published'zkd.is_vectorized = truez6(kd.doc_status IS NULL OR kd.doc_status != 'obsolete')a$  
                SELECT
                    dv.id,
                    dv.document_id,
                    dv.chunk_index,
                    dv.chunk_text,
                    dv.model_name,
                    kd.title,
                    kd.category_id,
                    kd.reference_url,
                    ts_rank(
                        setweight(to_tsvector('simple', COALESCE(kd.title, '')), 'A') ||
                        setweight(to_tsvector('simple', COALESCE(dv.chunk_text, '')), 'B'),
                        plainto_tsquery('simple', :query)
                    ) as rank
                FROM document_vectors dv
                JOIN knowledge_documents kd ON dv.document_id = kd.id
                LEFT JOIN knowledge_categories kc ON kd.category_id = kc.id
                WHERE z AND zI
                ORDER BY rank DESC
                LIMIT :k
            c                 s   s    | ]}t |jV  qd S r*   )r1   rank)r   rowr   r   r   	<genexpr>K  s    z2MilvusVectorStore.hybrid_search.<locals>.<genexpr>r!   )r<   r\   r]   )r   semantic_scorekeyword_score)rP   r\   r]   r   rb   r   r   r   r   r   r   c                 S   s   | d S )N   r   )xr   r   r   <lambda>  s    z1MilvusVectorStore.hybrid_search.<locals>.<lambda>T)r   reversezHybrid search completed, found r   zHybrid search failed: )r   
sqlalchemyr   r;   r#   r6   r   r   r   executemaxr\   r]   r1   r   rP   r^   rb   r   r   r   valuessortrA   rB   r$   rD   rE   )r   rN   r   r   r   r   r   search_ksemantic_resultssql_textwhere_clausesru   keyword_querykeyword_rowsmax_keyword_rankcombined_resultsr   r   
result_keyr   r   final_resultsr   semantic_normkeyword_normcombined_scorerH   r   r   r   hybrid_search  s   











zMilvusVectorStore.hybrid_searchc              
   C   sb   zd| }| j | | j   td|  W dS  ty0 } z	td|   d}~ww )u   
        删除文档的所有向量

        Args:
            document_id: 文档ID

        Returns:
            删除的向量数量
        zdocument_id == zDeleted vectors for document r   z#Failed to delete document vectors: N)ry   deleter   rA   rB   rD   rE   )r   r\   r   rH   r   r   r   delete_document_vectors  s   


z)MilvusVectorStore.delete_document_vectorsc              
   C   s   z$| j j| j| jd}| jr"| jr"t| j | j}| j|d< |j|d< |W S  tyF } ztd|  d| j| jdW  Y d}~S d}~ww )u[   
        获取集合统计信息

        Returns:
            统计信息字典
        )total_vectorsr7   r:   r   partition_vectorszFailed to get statistics: r   N)	ry   num_entitiesr7   r:   r   r   rD   rA   rE   )r   stats	partitionrH   r   r   r   get_collection_stats  s$   

z&MilvusVectorStore.get_collection_statsc              
   C   sD   z| j jW S  ty! } ztd|  g W  Y d}~S d}~ww )u   列出所有分区zFailed to list partitions: N)ry   
partitionsrD   rA   rE   r   rH   r   r   r   list_partitions  s   
z!MilvusVectorStore.list_partitionsc              
   C   st   z| j r| jr| j  td| j d W dS W dS W dS  ty9 } ztd|  W Y d}~dS d}~ww )u   压缩分区（优化存储）z
Partition z compaction completedzFailed to compact partition: N)r:   r   ry   compactrA   rB   rD   rE   r  r   r   r   compact_partition  s   
z#MilvusVectorStore.compact_partition)Nr3   r4   r5   TN)NN)r   r   N)r   r   r   r   )r+   r,   r-   r.   r   r   r/   r0   r   r   r?   rF   rG   r   r   r   r   r1   r   r   r   r  r  r  r  r  r   r   r   r   r2   @   s    
6):"



a
 
 r2   r4   r5   Tr   r6   r8   r9   r:   r;   r   c                 C   s.   |du rddl m} | }t| |||||dS )uJ  
    获取Milvus向量存储实例

    Args:
        db: 数据库会话
        knowledge_base_id: 知识库ID
        host: Milvus服务器地址
        port: Milvus端口
        use_partition: 是否使用分区优化
        tenant_id: 租户ID（用于多租户隔离）

    Returns:
        MilvusVectorStore实例
    Nr   )get_current_tenant_id)r8   r9   r:   r;   )app.core.tenant_contextr  r2   )r   r6   r8   r9   r:   r;   r  r   r   r   get_vector_store  s   r  )Nr4   r5   TN)(r.   typingr   r   r   r   r   loggingr   r   sqlalchemy.ormr   pymilvusr   r	   r
   r   r   r   r   langchain_core.documentsr   LangChainDocumentlangchain_core.embeddingsr   rL   r   3app.services.llm.backends.embedding_backend_factoryr   app.core.exceptionsr   	getLoggerr+   rA   r   r2   r/   r0   r   r  r   r   r   r   <module>   sP    $
*     