o
    Ղi >                    @   s  d Z ddlZddlZddlmZ ddlmZ ddlmZmZ ddl	m
Z
 ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlm Z  ddl!m"Z" ddl#m$Z$ ddl%m&Z& e& Z'G dd de
Z(de)de*de)defddZ+defddZ,de)d e-fd!d"Z.d#e-de/fd$d%Z0d&e*d'e1ddfd(d)Z2	*	dxd+e d,e1e* d-e*d.e-dB de*f
d/d0Z3ej4d1e(d2d3d4dyde)de*de)d7e/fd8d9Z5ej4e(d1d:d;d<e)de*fd=d>Z6ej4e(d1d?d;d@e*fdAdBZ7ej4e(d1dCd;d@e*fdDdEZ8ej4e(d1dFd;dzdGeee*  fdHdIZ9ej4e(d1dJd;d{dLe*fdMdNZ:ej4e(d1dOd;		P	d|dQeee*  dRe*dSeee*  fdTdUZ;ej4e(d1dVd;d{dLe*fdWdXZ<ej4e(d1dYd;d{dLe*fdZd[Z=ej4e(d1d\d;d}dLe*fd^d_Z>ej4e(d1d`d;dadb Z?ej4e(d1dcd;ddde Z@ej4e(d1dfd;dgdh ZAej4e(d1did;d{dLe*fdjdkZBdle*de/fdmdnZCej4d1e(d2dod4	6	d~de)dle*de)d7e/dpe*dqe*fdrdsZDej4d1e(d2dtd4ddLe*fdvdwZEdS )u   Celery 处理任务    NdatetimePath)ListOptional)Task)logger)insert)IntegrityError)
celery_app)TaxDocumentDataProcessingTaskProcessorEngineCategoryProcessor)IncrementalUpdater)CheckpointManager)KnowledgeBaseClient)normalize_legal_doc_number)StagingPackageBuilder)get_settingsc                   @   s(   e Zd ZdZdZedd Zdd ZdS )DatabaseTasku!   带数据库会话的任务基类Nc                 C   s$   | j d u rddlm} | | _ | j S )Nr   )SessionLocal)_dbapp.databaser   )selfr    r   G/lsinfo/ai/hellotax_ai/data_center/backend/app/tasks/processor_tasks.pydb"   s   
zDatabaseTask.dbc                 O   s"   | j d ur| j   d | _ d S d S N)r   close)r   argskwargsr   r   r   after_return)   s   


zDatabaseTask.after_return)__name__
__module____qualname____doc__r   propertyr    r%   r   r   r   r   r      s    
r   task_idcategory_idmodereturnc              
   C   sV   |  ttj|k }|s)t|||dddddd}| | |   | | |S )u6   获取已存在的任务记录，如不存在则创建pendingr   )r+   r,   r-   statusprogresstotal_countsuccess_countfailed_count)queryr   filterr+   firstaddcommitrefresh)r    r+   r,   r-   taskr   r   r   _get_or_create_task/   s    


r<   r;   c                 K   s4   |  D ]
\}}t||| qt |_|   dS )u   更新任务字段并提交N)itemssetattrr   utcnow
updated_atr9   )r    r;   r$   kvr   r   r   _update_taskC   s   
rC   
source_url
new_resultc                 C   s  |  ttj|k }|sdS |j}|d}|r!|r!||kr#dS zddl}ddlm	} t
 d}tjd t|j }	|	jddd |	| d	 }
|j|jt
  |j|j|j|jrd|j nd|jrm|j nd||j|j|j|jr}|j ndd

||ddd}t|
ddd}|j||ddd W d   n1 sw   Y  td|
  W dS  t y } zt!d| d|  W Y d}~dS d}~ww )uz   
    当文档 hash 变化时归档旧版本到:
      {settings.raw_data_dir}/versions/{tax_doc_id}/{timestamp}.json
    Ncontent_hashr   r   z%Y%m%d%H%M%SversionsTparentsexist_okz.json)
title
doc_numberissuing_authority
issue_dateeffective_daterF   content_htmlcontent_markdownattachmentsr@   rK   rF   rK   )
tax_doc_idrD   archived_atpreviousincomingwutf-8encodingF   ensure_asciiindentu   [version] 已归档旧版本: u+   [version] 归档旧版本失败 source_url=: )"r5   r   r6   rD   r7   rF   getjsonpathlibr   r   r?   strftimesettingsraw_data_dirstridmkdir	isoformatrK   rL   rM   rN   rO   rP   rQ   rR   r@   opendumpr	   info	Exceptionwarning)r    rD   rE   old_docold_hashnew_hashrb   r   tsarchive_dirarchive_filepayloadfer   r   r   _archive_previous_versionK   sV   

$ry   resultc              
   C   s  z,| dr|d dd|d< | dr |d dd|d< | dr/|d dd|d< t| |d | ttjd"i d|d d|d d|d d	| d	d
| d
d| dd| dd| dpkdd| dd| dd| dd| dd| dd| dd| dd| dpdd| dd| dddjdgi d|d d	| d	d
| d
d| dd| dd| dpdd| dd| dd| dd| dd| dd| dd| dd| dp	dd| dd| ddddt id}| 	| | 
  W dS  tyS } z|   td| d d|  W Y d}~dS d}~w ty } z$|   ddl}td | d  td!|   W Y d}~dS d}~ww )#u   
    原子写入/更新 TaxDocument，利用 source_url UNIQUE 约束。
    INSERT ... ON CONFLICT DO UPDATE，避免并发重复写入。
    返回 True 表示成功（新增或更新），False 表示跳过/失败。
    rP     rQ   content_textrD   r,   rK   rL   rM   rN   rO   
doc_statusNrF   rR   inline_imagesinline_videosfetch_strategynormal
supersedes
referencesprocessing_status	completedr@   index_elementsset_Tu    [upsert] 写入冲突已忽略:     — Fr   u   [upsert] 写入失败: u   [upsert] 异常详情: r   )ra   replacery   	pg_insertr   valueson_conflict_do_updater   r?   executer9   r   rollbackr	   ro   rn   	tracebackerror
format_exc)r    rz   stmtrx   r   r   r   r   _upsert_tax_document   s   






	
















	







/r   doc_idrR   c                 C   s
  |sdS |  ttj|k }|r|jsdS |j}d}t|D ]V\}}|dd}|dd}	|r;d| d|vr<q!|	rCtj	|	 nd}
|
rU|

 rUd| d	| d
}n|}||krw|d| dd| d}d}td| d| d q!|r||_|   dS dS )u"  
    将 content_markdown 中附件的源 URL 替换为本地下载端点。
    - 有本地文件 → /api/v1/documents/{id}/attachments/{idx}/download
    - 无本地文件 → 保留源 URL（降级）
    不产生额外查询：直接 UPDATE，仅当内容有变化时才写库。
    NFurlr|   pathz]()z/api/v1/documents/z/attachments/z	/downloadTz[attachment-url] doc_id=    附件[u   ] URL 已替换为本地端点)r5   r   r6   rh   r7   rQ   	enumeratera   re   project_rootis_filer   r	   rm   r9   )r    r   rR   tax_doccontentchangedidxattrD   
local_path	abs_localnew_urlr   r   r   _rewrite_attachment_urls   s6   
r      	kb_clientbatch_doc_idskb_idcategory_id_mapc                 C   s.  |sdS |pi }d}|D ]{}|  ttj|k }|sq|jr&|d7 }q||j}z8t	|j
|||d}	|	rQ|	drQd|_|	d|_|   |d7 }ntd| d|	r^|	d	nd
  W q ty }
 z|   tjd| d|
 dd W Y d}
~
qd}
~
ww tdt| d|  |S )u  
    批量推送知识库（每500条触发）。

    Args:
        db: 数据库会话
        kb_client: 知识库客户端
        batch_doc_ids: 待推送的 TaxDocument ID 列表
        kb_id: base_platform 知识库 ID（用于图谱构建）
        category_id_map: {data_center_category_id: base_platform_category_id} 映射

    Returns:
        本批成功导入数量
    r   r   r   r,   successTknowledge_doc_idu#   [batch-import] 导入失败 doc_id=z error=r   unknownu#   [batch-import] 导入异常 doc_id=r`   exc_infoNu'   [batch-import] 本批完成: requested=z
 imported=)r5   r   r6   rh   r7   is_importedra   r,   asynciorunimport_documentr   r9   r	   ro   rn   r   r   rm   len)r    r   r   r   r   importedr   r   kb_category_idrz   rx   r   r   r   _flush_import_batch   sL   
&r   Tdata_center_queuez/app.tasks.processor_tasks.process_category_task)bindbasequeuenamefullFforcec           +      C   s  t d| d| d|  t| j|||}t||d}| }| }zt| j|dt	 dd t
 }	|	|}
|
sLt| j|dd	dd
 dd	dW S ttjtjtjtjd}|	|}g }|d }t||d D ]>}|	||}t d| d| d| d|  t|||}|r|| t|| d }t| j||d || qjt|}t d| d| dt| d t| j||t| dd |dkr|st| j|ddt	 d |  d|ddW S ttj|
d  }t|j ddd t|}d}t!tj"tj#d}tj$}tj%}t& }d }g }d}|D ]9}|d! }|'|r,|d7 }q|d"krQ|sQ| j(t)j*+t)j,|k- } | rQ|.| |d7 }qzt|/|||}!|!r|!0d#rt1| j|! |.| |d7 }| j(t)+t)j,|k- }"|"rz|2|" W n" t3y }# zt 4d| d$|"j* d|#  W Y d%}#~#nd%}#~#ww |!0d&rt5| j|"j*|!d&  |6|"j* t||krt d| d't| d( t7| j||||d)}$||$7 }g }n|d7 }|!r|!0d*d+nd,}%t 4d| d-| d.|%  W n' t3y0 }& z|d7 }t j8d| d/| d.|& dd0 W Y d%}&~&nd%}&~&ww |t| }'|| }(dt|(t9|'d d1  })t| j||||)d2 q|:  |ryt d| d3t| d( t7| j||||d)}$||$7 }g }t d| d4|  t| j|ddt	 d |dkr|  t d| d5|t|  d6| d7|  d|||t| ||d8W S  t3y }* z,t j8d| d9|* dd0 |:  t| j|dt|*t	 d: dt|*dW  Y d%}*~*S d%}*~*ww );u0   处理单个分类的任务（含断点续爬）[u   ] 开始处理分类 u   ，模式: r+   r,   runningr   r0   
started_atr1   failed   未知的分类ID)r0   error_messager1   Fr   r   timeoutmax_retries	delay_min	delay_maxr   u   ] 获取列表页 /r`      )r1   u   ] 共获取 u)    个新文档链接（已有断点记录 u    条）)r2   r1   r   d   r0   r1   completed_atT)r   r+   totalr   rH   base_urlapi_key  r   r   r   u!   ] staging 包生成失败 doc_id=NrR   u   ] 触发批量推送:     条)r   r   r   u   未知错误   处理失败u   ] 文档处理失败: r   u   ] 文档异常: r   P   r3   r4   r1   u   ] 推送最终批次: u(   ] 知识库导入完成: total_imported=u   ] 完成: total=	 success= failed=)r   r+   r,   r   r3   r4   u   ] 处理分类任务失败: r0   r   r   );r	   rm   r<   r    r   load_completed_urlsload_page_cursorrC   r   r?   r   get_category_configr   re   crawler_request_timeoutcrawler_max_retriescrawler_delay_mincrawler_delay_maxcalculate_total_pagesrangebuild_page_urlr   r   process_list_pageextendintsave_page_cursorr   clearrg   rf   r   ri   r   base_platform_urlbase_platform_api_keybase_platform_kb_idkb_category_id_mapr   is_doner5   r   rh   r6   rD   r7   	mark_doneprocess_documentra   r   save_packagern   ro   r   appendr   r   maxflush)+r   r+   r,   r-   r   r;   
checkpointcompleted_urlsresume_pagecategory_processorconfigprocessor_enginetotal_pagesall_documents
start_pagepagepage_urldocslist_progressr   save_dirr3   r4   r   r   r   staging_builderIMPORT_BATCH_SIZEpending_import_idstotal_importeddocdoc_urlexistingrz   	saved_docpkg_errr   errdoc_err	all_total	processedr1   rx   r   r   r   process_category_task=  sR  

"




,


	r  z/app.tasks.processor_tasks.process_document_task)r   r   r   document_urlc           
   
   C   sV  t d|  zttjtjtjtjd}t }|	|}|s%dddW S t
tj|d  }t|jddd t||||}|rw|d	rwt| j| | jttj|k }|rm|d
rmt| j|j|d
  d||ddW S d|r|dddW S ddW S  ty }	 zt jd|	 dd dt
|	dW  Y d}	~	S d}	~	ww )u-   处理单个文档的任务（原子写入）u   开始处理文档: r   Fr   r   r   TrH   r   rR   rK   )r   r  rK   r   r   u   处理文档任务失败: r   N)r	   rm   r   re   r   r   r   r   r   r   rg   rf   r   ri   r   r   r   ra   r   r    r5   r   r6   rD   r7   r   rh   rn   r   )
r   r  r,   r   r   r   r  rz   r  rx   r   r   r   process_document_task  s@   

"r  z3app.tasks.processor_tasks.reprocess_from_local_taskrT   c                 C   s&  | j ttj|k }|sdddS ttjtj	tj
tjd}t }||j}|r5ttj|d  nttj}t|jddd |jrUt||j|j|j|}nt||j|j|}|r|dr|d	su|jru|j|d	< t| j | d||d
dS d|r|dddS ddS )us   从本地存储的原始 HTML 重处理文档，不发网络请求。content_html 为空时回退到网络抓取。F   文档不存在r   r   r   TrH   r   rR   rK   )r   r   rK   r   r   )r    r5   r   r6   rh   r7   r   re   r   r   r   r   r   r   r,   rg   rf   r   ri   rP   r   r   reprocess_from_htmlrD   r   ra   rR   r   )r   rT   r  r   r   r   r  rz   r   r   r   reprocess_from_local_taskA  s4   
 
r  z7app.tasks.processor_tasks.import_to_knowledge_base_taskc              
   C   s6  t d| d zn| jttj|k }|s dddW S |jr+dd|j	dW S t
tjtjd	}tj}tj|j}t|j|||d
}|rg|drgd|_|d|_	| j  d||ddW S d|rs|dddW S ddW S  ty } zt jd| dd dt|dW  Y d}~S d}~ww )u!   导入文档到知识库的任务u   开始导入文档 u    到知识库Fr  r   Tu   文档已导入)r   messager   r   r   r   r   )r   rT   r   r   u   导入失败u   导入文档任务失败: r   N)r	   rm   r    r5   r   r6   rh   r7   r   r   r   re   r   r   r   r   ra   r,   r   r   r   r9   rn   r   rg   )r   rT   r   r   r   r   rz   rx   r   r   r   import_to_knowledge_base_taskh  s6   
"r  z:app.tasks.processor_tasks.backfill_attachment_content_taskdoc_idsc           )      C   st  ddl m} ddlm} ddl}ddlm} d}g }|du rFddlm} | j	t
jt
jd|d|d	t
jd
 }	dd |	D }t|}
td|
  | }| }d}d}d}d}|D ]c}| j	t
t
j|k }|sz|d7 }qb|jp~g }|s|d7 }qbg }h d}dh}d}t|D ]\}}|dd}|rtj|std| d|  qtj|d  }||v rtd| d|d d| d |||d|d|tj|rtj |d d ndd q||v r,td| d|d d| d |||d|d|tj|r&tj |d d ndd q||v rdtd| d|d d| d |||d|d|tj|r^tj |d d ndd qtj |}||krt!d| d |d d!|d d d"d#|d d d$d%	 |||d|d&|d d d"d'|d d d$d%||d d d qzAt"#|$|}|d(r|d)d% r||dd*|d  |d) d+ td| d,|d d-|d. d"d W q t&y } zt!d| d/|d d0|  W Y d}~qd}~ww |s!|d7 }qbt|dk}g } t|D ]8\}!}"|rP|!t|k r?||! nt'|!d }#d*|# d1|"d  }$nd2|"d  }$| d3|$ d4|"d)   q-d(| }%|jppd|% }&|)|&*d5d}'z#|&*d5d|_|'|_+| j,  |d7 }td| d6t| d7 W qb t&y } z| j-  |d7 }t.d| d8|  W Y d}~qbd}~ww |r!z;t/|d9d:d;}(|j0|1 2 t||d<|(d=d>d? W d   n	1 sw   Y  td@| dAt| dB W n t&y  } zt.dC|  W Y d}~nd}~ww tdD|
 dE| dF| dG|  dH|
|||dIS )Ju   
    对有附件但 content_markdown 中缺少附件内容的文档，直接读取本地附件文件转换并追加。
    不重新抓取网页，不重新下载附件，仅对已有本地文件做 convert_local。
    doc_ids 为 None 时自动查找所有符合条件的文档。
    r   )DocConverterDocumentCleanerNr   z!/tmp/skipped_ocr_attachments.jsontextattachments::text != 'null'attachments::text != '[]'u   %## 附件%c                 S   s   g | ]}|d  qS )r   r   .0rr   r   r   
<listcomp>  s    z4backfill_attachment_content_task.<locals>.<listcomp>u*   [backfill_attachment] 待补填文档数: u   一二三四五六七八九十r   >   .bmp.gif.png.tif.jpeg.tiff.webp.jpgz.pdfi   r   r|   z[backfill_attachment] doc_id=u    附件文件不存在: u    跳过扫描附件: r   z (r   u   扫描附件（图片）i   )r   attachment_name	file_pathreasonfile_extensionfile_size_mbu    跳过PDF文件: u$   PDF文件（需要OCR，处理慢）u    跳过大文件: u
    (大小: z.2fu   MB, 阈值: z.0fzMB)u   文件过大 (zMB > r   markdownu   附件)r   r6  u    附件转换成功: z
 (quality=parse_quality_scoreu    附件转换异常: r   u   ：u	   附件：z


---

## z

r{   u    补填完成，追加 u
    个附件u    写库失败: rX   rY   rZ   )	timestamptotal_skippedskipped_itemsFr\   r]   u0   [backfill_attachment] 跳过记录已保存到: u    (共 u    条)u0   [backfill_attachment] 保存跳过记录失败: u$   [backfill_attachment] 完成: total=r   r   z	 skipped=T)r   r   r3   r4   skipped_count)38app.services.tax_data_processor.converters.doc_converterr  0app.services.tax_data_processor.document_cleanerr   rb   r   
sqlalchemyr"  r    r5   r   rh   r6   rR   isnotrQ   notlikeallr   r	   rm   r7   r   ra   osr   isfiledebugsplitextlowerr   existsgetsizero   r   r   convert_localstriprn   rg   joinmarkdown_to_rag_textr   r}   r9   r   r   rk   rl   nowrj   ))r   r  r  r   rb   r   skipped_log_pathskipped_recordssa_textrowsr   doc_convertercleanerlabelsr3   r4   r;  r   r  rR   attachment_sectionsimage_extensionsskipped_extensionsLARGE_FILE_THRESHOLDir   r   file_ext	file_sizeconv_resultrx   use_ordinalpartsjseclabelheadingextra_mdnew_markdownnew_textrw   r   r   r    backfill_attachment_content_task  sT  




$"
$$
$$
$
."

 
"
 rf  z0app.tasks.processor_tasks.fill_content_text_task   
batch_sizec                 C   sN  ddl m} | }| jttjdtj	d
 }td|  d}d}	 | jttjdtj	dtj| }|sHnP|D ]6}z||j}	|	r[|	ddnd|_W qJ ty }
 ztd|j d	|
  d|_W Y d}
~
qJd}
~
ww | j  |t|7 }td
| d|  q*td| d d||dS )u   
    全库扫描，为 content_text 为空但 content_markdown 不为空的文档填充 RAG 纯文本。
    每 batch_size 条提交一次，避免长事务。
    r   r  Nu(   [fill_content_text] 待填充文档数: Tr{   r|   z[fill_content_text] doc_id=u    转换失败: u   [fill_content_text] 进度: r   u'   [fill_content_text] 完成，共处理 r   )r   r   r  )r=  r   r    r5   r   r6   rQ   r?  r}   is_countr	   rm   order_byrh   limitrA  rL  r   rn   ro   r9   r   )r   rh  r   rS  r   r  offsetr  r  r"  rx   r   r   r   fill_content_text_taskQ  sB   


rn  z1app.tasks.processor_tasks.incremental_update_task   category_idswindow_days
source_idsc           !         sf  d|pd d| d}t d|  zt| j}t|j||d}d}d}d}	d d	tf fd
d}
|D ]`}|drQt 	d|d d|d  q7|d }|dg D ]}t
|d | |d7 }q[|dg D ]&}|d }| jttj|k }|r|jr|jrzddlm} ddlm} ddlm} | }||}|rttj|d  nttjd }||jddd |tjtjtj tj!d}t|"|||}|rU|drU|dp|dpd}|r-t|
 j#|j|d }|r|drt d!| d"|j  |	d7 }	nt $d#| d$|r(|dnd%  t%| j| | jttj|k }|rT|d&rTt&| j|j'|d&  n|r^|dd'nd'}t $d(| d)|  W n& t(y } zt j	d*| d)| dd+ W Y d}~nd}~ww t
|| |d7 }qpq7|r|D ]=}t|)||}|drt 	d,| d|d   q|dg D ]}t*jtt+d-, |d.d/ |d7 }qĐq|pt-t.dd0D ]}|/| qt d1| d2| d3| d4|	  d||||	d5d6 |D d7W S  t(y2 }  zt j	d8|  dd+ d9t| d:W  Y d} ~ S d} ~ ww );u   
    增量更新任务

    Args:
        category_ids: 要检查的分类ID列表；None = 全部 8 类
        window_days:  增量时间窗口（天）；0 = 不过滤日期（全量扫描）
    zcategories=rA  z window=u   天u   开始执行增量更新: )rp  rq  r   Nr.   c                      s    d u rt tjtjd  S )Nr   )r   re   r   r   r   kb_client_incrementalr   r   _get_kb_client  s   z/incremental_update_task.<locals>._get_kb_clientr   u   [增量] 分类 r,   u    检查失败: new_documentsr   r   updated_documentsr   r   r   r   incrementalTrH   r   r   r}   rQ   r|   )r   new_contentu-   [增量] 增量重向量化成功 source_url=z knowledge_doc_id=u-   [增量] 增量重向量化失败 source_url=r`   r   rR   r   u,   [增量] 重爬失败（已导入文档）: r   u&   [增量] 已导入文档更新异常: r   u   [增量] 数据源 uuidr   )r-   	   u   [增量] 完成: u    — 新增=u    更新=u    重向量化=c              
   S   sP   g | ]$}| d | dt| dg t| dg | dd| ddqS )r,   category_namerv  rw  skippedr   r   )r,   r|  newupdatedr}  r   )ra   r   r%  r   r   r   r(    s    	
z+incremental_update_task.<locals>.<listcomp>)r   scope	total_newtotal_updatedtotal_revectorizedresultsu   增量更新任务失败: Fr   )0r	   rm   r   r    r   r   check_categoriesr   ra   r   r  delayr5   r   r6   rD   r7   r   r   0app.services.tax_data_processor.processor_enginer   2app.services.tax_data_processor.category_processorr   rc   r   r   rg   re   rf   ri   r   r   r   r   r   update_document_contentro   r   r   rh   rn   check_sourceprocess_source_task
__import__uuid4listr   update_last_check_time)!r   rp  rq  rr  r  updaterr  r  r  r  ru  rz   r,   r  r  r  r   r   r   r   r   r  enginecrawl_resultry  	rv_resultinc_docr  inc_errsid
src_resultcidrx   r   rs  r   incremental_update_task  s   

	



$


Q	r  z.app.tasks.processor_tasks.build_relations_taskc                 C   sB   ddl m} || j|dtd}td|   ddi| S )u   
    批量填充已有文档的关联关系字段。

    统一走 relation_builder，确保后台任务和全库修复脚本使用同一套
    匹配、去重、自引用过滤和反向 superseded_by 回填规则。
    r   )rebuild_document_relationsF)rh  dry_runr	   u   [build_relations] 完成: r   T)0app.services.tax_data_processor.relation_builderr  r    r	   rm   as_dict)r   rh  r  statsr   r   r   build_relations_task1  s   r  z1app.tasks.processor_tasks.fill_superseded_by_taskc              
      s  ddl   d d dg}| j}|ttjdktjdtj	dtj
d }tdt|  |tjtjtjtjd }i |D ]}t|j}|rd|g | qRdtd	td
tf fdd}d}	|D ][}
|
j
p}ddd tfdddD sqwd}|D ]}|}|r|d } nq|sqw|tjtj|ktj|
jk }|s|||
j}|r|j|
_n||
_	|	d7 }	qwz|  W n ty } z|  t d|  W Y d}~nd}~ww td|	 dt| d dt||	dS )u  
    从被废止文档自身正文提取 superseded_by 信息。

    逻辑：
    1. 遍历 doc_status=obsolete 且无 superseded_by_doc_id/doc_number 的文档
    2. 从 content_markdown 开头提取"根据XXX，本文废止"中的废止依据文号
    3. 按文号查本库（含安全归一化匹配）→ 写 superseded_by_doc_id；未找到 → 写 superseded_by_doc_number
    r   NuF   根据[^，,。\n]{0,10}?《[^》]{2,60}》[（(]([^）)]{3,80})[）)]uH   根据([\u4e00-\u9fa5]{2,15}[〔\[（(]\d{4}[〕\]）)]第?\s*\d+\s*号)u1   根据((?:国务院令|主席令)第\s*\d+\s*号)obsoleteu#   [fill_superseded_by] 候选文档: rL   headcurrent_doc_idc                    s   t | }|sd S  fdd|g D }|sd S fdd|D }t|dkr,|d S t|dkrBd| s>t|dkrB|d S d S )Nc                    s   g | ]	}|j  kr|qS r   )rh   r&  item)r  r   r   r(  r  s
    
zKfill_superseded_by_task.<locals>._safe_normalized_match.<locals>.<listcomp>c                    s    g | ]}|j r|j  v r|qS r   )rK   r  r  r   r   r(  y  s     r   r   z\d{4}   )r   ra   r   search)rL   r  r  
normalizedmatchestitle_matches)_redocs_by_normalized_number)r  r  r   _safe_normalized_matchn  s   

$z7fill_superseded_by_task.<locals>._safe_normalized_matchr|   r   c                 3   s    | ]}| v V  qd S r!   r   )r&  rA   r  r   r   	<genexpr>  s    z*fill_superseded_by_task.<locals>.<genexpr>)u   废止u   失效r   u$   [fill_superseded_by] commit 失败: u$   [fill_superseded_by] 完成: 填充 r   r   Tr   r   filled)!recompiler    r5   r   r6   r~   superseded_by_doc_idri  superseded_by_doc_numberrQ   r?  rA  r	   rm   r   rh   rK   rL   r   
setdefaultr   rg   r   anyr  grouprJ  r7   r9   rn   r   r   )r   rh  	_PATTERNSr    
candidatesindexed_docsindexed_docr  r  r  r  rL   patmmatchedrx   r   )r  r  r  r   fill_superseded_by_task@  sx   
	








r  z3app.tasks.processor_tasks.fill_effective_dates_taskr   c              
      s  ddl m} h d}|  dtdtdB f fdd}| j}|ttj|tj	
d }td	| d
 d}d}	 |ttj|tj	
dtj| }	|	s[nV|	D ]}
|
jpcd}|||
j}|rs||
_	|d7 }q]z|  W n! ty } z|  td| d|  W Y d}~nd}~ww |t|	7 }td| d| d|  q>td| d|  d||dS )u  
    批量回填缺失的 effective_date 字段。

    策略（优先级从高到低）：
    1. 从 content_markdown 中提取具体日期（自YYYY年MM月DD日起施行等）
    2. 正文含"发布/印发/公布/颁布之日起施行" → effective_date = issue_date
    3. 上述均未命中时：effective_date = issue_date（行政惯例：通知类文件自发文日起执行）
    r   MetadataExtractor>   	normative
regulationlocal_policymdr.   Nc                    s0     | |r	t|nd}|r|S |rt|S dS )uM   从 content_markdown 提取生效日期，无法提取时返回 issue_date。N)_extract_effective_daterg   )r  rN   explicit_datemetadata_extractorr   r   _extract  s   z+fill_effective_dates_task.<locals>._extractu%   [fill_effective_dates] 需要回填: r   Tr|   r   u,   [fill_effective_dates] commit 失败 offset=r`   u   [fill_effective_dates] 进度: r   	    已填: u%   [fill_effective_dates] 完成: total= filled=r  ):app.services.tax_data_processor.parsers.metadata_extractorr  rg   r    r5   r   r6   doc_typein_rO   ri  rj  r	   rm   rk  rh   rl  rA  rQ   rN   r9   rn   r   r   r   )r   rh  r  regulation_doc_typesr  r    r   r  rm  batchr  r  rz   rx   r   r  r   fill_effective_dates_task  s\   






"r  z0app.tasks.processor_tasks.strip_ocr_content_taskc              
   C   s   ddl m} ddlm} | j}|t|tj	dtj	d
 }t|}td| d d}|D ]}|jp;d}||}	|	|krL|	|_|d	7 }q5z|  W n tyl }
 z|  td
|
   d}
~
ww td| d|  d||dS )uD   批量清除 content_markdown 中残留的 OCR 图片内容段落。r   r  )or_u   %正文图片内容%u   %<!-- 图片%u   [strip_ocr_content] 找到 u    条含 OCR 内容的文档r|   r   u#   [strip_ocr_content] commit 失败: Nu"   [strip_ocr_content] 完成: total=z	 cleaned=T)r   r   cleaned)r=  r   r>  r  r    r5   r   r6   rQ   likerA  r   r	   rm   strip_ocr_contentr9   rn   r   r   )r   r   r  r    r  r   r  r  originalstrippedrx   r   r   r   strip_ocr_content_task  s@   




r  z=app.tasks.processor_tasks.redownload_missing_attachments_taskc                    s  ddl m} | j}|ddd}ddlm} |ttj	d|d|d	
 }d}d}d}|D ]G}	|	jp8g }
d
}t|
D ]\}}|ddrRtjrRq?|dp\|ddrddrztd|	j d| d |d7 }|d7 }q?ddlm} dv r|	jrd}t|dkr|	jddd }||d |d stjtjd|	j}tj|dd |dptj d}tj|||d7 }t!d|	j d| d  ddl"d|	jd z fdd }t#$| }W n t%y } zd
t&|d!}W Y d}~nd}~ww |d" r%|d< |d7 }d}q?|d7 }td|	j d| d#|d$  q?|ryzdd%l'm(} ||	d |)|	 |*  W q1 t%yx } z|+  t,d|	j d&|  W Y d}~q1d}~ww q1t!d'| d(| d)|  d|||d*S )+uB   扫描全库，找出本地文件缺失的附件并重新下载。r   AttachmentParser<      r   r   r!  Nr#  r$  Fr   r|   original_urlr   z/api/z[redownload] doc_id=r   u   ] 无有效 URL，跳过r   )urljoinz/zcfgk/r\   r   rR   T)rJ   r   u   ] 缺失，重新下载: zBMozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36)z
User-AgentRefererc               	      s   j ddd4 I d H L} | j dI d H }|jdkrEtd}||j W d    n1 s2w   Y  ddiW  d   I d H  S dd	|j d
W  d   I d H  S 1 I d H s^w   Y  d S )NT   )follow_redirectsr   )headersrg  wbr   FzHTTP r   )AsyncClientra   status_coderk   writer   )clientr'  rw   r  httpxr   r   r   r   	_downloadS  s   
0z6redownload_missing_attachments_task.<locals>._downloadr   r   u   ] 下载失败: r   flag_modifiedu    commit 失败: u   [redownload] 完成: missing=z downloaded=r   )r   missing_total
downloadedr   )-9app.services.tax_data_processor.parsers.attachment_parserr  r    r>  r"  r5   r   r6   rR   r?  rA  r   ra   rB  r   rC  
startswithr	   ro   rh   urllib.parser  rD   splitr   rsplitrK  re   rf   rK   makedirsbasenamerstriprm   r  r   r   rn   rg   sqlalchemy.orm.attributesr  r8   r9   r   r   )r   r  r    parserrP  r  r  r  r   r  rR   r   r   r   r  r^  r   att_dirr   r  rz   rx   r  r   r  r   #redownload_missing_attachments_task  s   




	
&

$
r  z,app.tasks.processor_tasks.fix_json_null_taskc              
   C   s   ddl m} | j}||d}||d}z|  W n ty6 } z|  td|   d}~ww |j	}|j	}t
d| d|  d	||d
S )u   
    清理 supersedes / references 字段中存储的 JSON null 值（jsonb 'null'），
    将其改为 SQL NULL，避免 jsonb_array_elements 报错。
    r   r!  z
        UPDATE tax_documents
        SET supersedes = NULL
        WHERE supersedes IS NOT NULL AND jsonb_typeof(supersedes) = 'null'
    z
        UPDATE tax_documents
        SET "references" = NULL
        WHERE "references" IS NOT NULL AND jsonb_typeof("references") = 'null'
    u   [fix_json_null] commit 失败: Nu#   [fix_json_null] 完成: supersedes=z references=T)r   supersedes_fixedreferences_fixed)r>  r"  r    r   r9   rn   r   r	   r   rowcountrm   )r   r"  r    r1r2rx   	sup_fixed	ref_fixedr   r   r   fix_json_null_tasku  s    r  z5app.tasks.processor_tasks.fill_issuing_authority_taskc              
   C   s  ddl m} | }| j}|ttjd }t	
d| d d}d}	 |ttjdtj| }|s@nx|D ]8}	d}
|	jr]ddl}|d|	j}|r]|d }
|
sq|	jpcd	dd
 }||	jpnd	|}
|
rz|
|	_|d7 }qBz|  W n! ty } z|  t	d| d|  W Y d}~nd}~ww |t|7 }t	
d| d| d|  q(t	
d| d|  d||dS )u   
    对 issuing_authority 为 NULL 的文档，尝试从 doc_number / title / content_markdown 推断发文机关。
    复用 MetadataExtractor._extract_issuing_authority()。
    r   r  Nu'   [fill_issuing_authority] 需要回填: r   TuM   ^([\u4e00-\u9fa5][\u4e00-\u9fa5 ]{3,60}?)(?:公告|令|函|发|〔|\[|（|\()r   r|   r   u.   [fill_issuing_authority] commit 失败 offset=r`   u!   [fill_issuing_authority] 进度: r   r  u'   [fill_issuing_authority] 完成: total=r  r  )r  r  r    r5   r   r6   rM   ri  rj  r	   rm   rk  rh   rl  rA  rL   r  matchr  rJ  rQ   _extract_issuing_authorityrK   r9   rn   r   r   r   )r   rh  r  	extractorr    r   r  rm  r  r  	authorityr  r  r   rx   r   r   r   fill_issuing_authority_task  sT   "(r	  	source_idc           	      C   s  zrt |ddp	d}t |ddpd}t |ddpd}|dd}|dd}|dd}t| |jt |dd|jd ttjd)i d	|jd
dd|d|jdt |dddt |dddt |dddt |dddt |ddpsddt |dddt |dddt |dddt |dddt |ddd|d|d|dt |dddt |dddt |dddt |dddt |ddpddt |dddt |dddd jd	gi d|d|jdt |dddt |dddt |dddt |dddt |ddpddt |dddt |dddt |dddt |dddt |ddd|d|d|dt |dddt |ddt |ddt |ddt |ddpTdt |ddt |ddd t	
 d!d"}| | |   W d#S  ty } z|   td$|j d%|  W Y d}~d&S d}~w ty } z"|   ddl}td'|j  td(|   W Y d}~d&S d}~ww )*u   
    与 _upsert_tax_document 相同，但额外写入 source_id / doc_type / region_code /
    qa_question / qa_answer，且 category_id 固定为 0。
    result 为 adapter 返回的 ParsedDocument 对象（有属性访问）。
    rP   Nr|   rQ   r}   r{   rF   rS   rD   r,   r   r
  rK   rL   rM   rN   rO   r~   r  region_codeqa_question	qa_answerinterpretation_formrR   r   r   r   r   r   r   r   r   )r   r   r   r   r   r   r@   r   Tu#   [upsert_v2] 写入冲突已忽略: r   Fu   [upsert_v2] 写入失败: u   [upsert_v2] 异常详情: r   )getattrr   ry   rD   rK   r   r   r   r   r   r?   r   r9   r   r   r	   ro   rn   r   r   r   )	r    rz   r
  rP   rQ   r}   r   rx   r   r   r   r   _upsert_tax_document_v2  s  
	
	





;r  z-app.tasks.processor_tasks.process_source_taskr  end_pagec                    s   ddl m} ddlm} j||jk s!dddS t	jd_
j  ttjjjjd}	||	td }
ztjd	t dd
 tdd jtjtj
k D td dt d ttjtj d	tj!
d g }d}t|
}d}d}t"dd pd|||||dddl#m$} |dddt%tj&j' t(j)ddd t*+ 	
fddfdd}t*,|  d }d }d  }d! }-  |r|t.j	|
d"7 }tjd#d$t d% |dkr)/  d||d&W S  t0y } zAdd l1}|2 }t3d d't4|j5 d(| d)|  -  tjd*t4|j5 d(| t d+ dt%|dW  Y d }~S d }~ww ),Nr   )
DataSource)get_adapterFu   数据源不存在r   r   r   r   r   c                 s   s    | ]\}|V  qd S r!   r   )r&  r   r   r   r   r  L  s    
z&process_source_task.<locals>.<genexpr>r   u   ] 已加载 u    条已有 URLr   r   concurrencyr  )r3   r4   r  r  r
  r  r  r  TrH   c                    sN  	4 I d H  z | I d H }|r|jrWg }|jD ]7}j|d tt|d  tjdI d H }||d |d |d rD|d nd|dd|d	dd
 q||_t	j
| | j | j d  d7  < j
ttj| jk }|r|jrtj
|j|j 
jdkrd |j td  krd  tj
d d7  < d   nd  d7  < W n+ ty } zd  d7  < tjd d| j d| dd W Y d }~nd }~ww d  d7  < tj
d d tdd  dd W d   I d H  d S 1 I d H s w   Y  d S )Nr   r   base_dirr   r   r|   sizer   type)r   r   r   r  r  r3   r   qa_12366r
  r  r   r4   r   u   ] 文档处理异常: r   Tr   r  r   c   r   )parse_documentrR   download_attachmentrg   r   re   r   r   ra   r  r    r   r   r8   r5   r   r6   rD   r7   r   rh   coder   r   r   rn   r	   r   rC   min)r  parsedr  r   dlsavedrx   )r	  adapterattachment_parserr   existing_urlsr   r   save_dir_sourcer   semsourcer
  stater;   r+   r   r   _process_onen  sn   







.0z)process_source_task.<locals>._process_onec                     s6  g } j d2 zg3 d H W }|jr	d  d7  < q
dkr9s9|jv r9|j 	d  d7  < q
| t | t| d krqtj| ddiI d H }|D ]}t	|t
rltjd
 d	| |d
 qX|   q
6 | rtj| ddiI d H }|D ]}t	|t
rtjd
 d	| |d
 qd S d S )N)r  r  r3   r   r      return_exceptionsTr   u   ] gather异常: r   )list_all_documentsr   r   r   r   r   ensure_futurer   gather
isinstancern   r	   r   r   )tasksr  r  r'  )r*  r#  r   r  r  r%  r   r-   r  r)  r+   r   r   _run  s:   


z!process_source_task.<locals>._runr3   r4   r  r
  r  r   r   r   )r   r+   r
  r3   r4   u   ] process_source_task 异常: r`   
r   r   )6app.models.tax_datar  (app.services.tax_data_processor.adaptersr  r    r5   r6   rh   r7   r<   r
  r9   r   re   r   r   request_delay_minrequest_delay_maxr   r   rC   r   r?   setr   rD   rA  r	   rm   r   r   r   r   r   r  r  r  rg   rf   r  r   ri   r   	Semaphorer   r   r   r   BaseExceptionr   r   r   r  r&   )r   r+   r
  r-   r   r  r  r  r  r  r   r
  r  r3   r4   r  r  r2  rx   r   tbr   )r	  r*  r#  r$  r   r  r  r%  r   r   r   r-   r&  r   r'  r(  r
  r  r)  r;   r+   r   r  /  s   



( 2

(r  z-app.tasks.processor_tasks.backfill_media_task2   c           *      C   s  ddl m} ddlm} ddlm} ddlm} ddlm	} ddl
m} | j}|dd	ddd
}	| }
|ddd}| }|ttjdtjd }td|  d }}	 |ttjdtjdtj| }|syn7|D ]'}z|jr||jnd}ttj|r|d nd }t |j!ddd t"#|	$|j|j%|jpd|}|r|&ds|d	7 }g |_|'  W q{|j(dkrg |_|'  |d	7 }W q{|
)|j|j%}||d d}|j*}|dkrddl+m,} ddl-}ttjd |.|j%/ 0 dd  d }t"#|||}g }n|dkrKddl+m1} ddl-}ttjd |.|j%/ 0 dd  d }g }t"#|||}n|
2||j%}g }|d rt |d|j  d }|j!ddd t3|d D ]A\} }!|!d  }"t |"4d!d j56 pd"}#t"#|j7|"t|d#| d$|#  tj8d%}$|$d r|9|"|$d& |!d' d(d) qrg }|d rt |d|j  d }|j!ddd t3|d D ]?\} }%|%d  }"t |"4d!d j56 pd*}#t"#|j7|"t|d+| d$|#  tj8d%}$|$d r|9|"|$d& d(g d, qt:|j;pg }&d-}'|&D ].}(|(&d&r*q t"#|j7|(d. tt ||(d  tj8d%}$|$d rM|$d& |(d&< d}'q ||_|pVd|_<|j(dkrh|d/ |_=|d0 |_>|'rs|&|_;||d1 |'  |d	7 }W q{ t?y }) z|@  |d	7 }tAd2|j d3|)  W Y d})~)q{d})~)ww td4| d5|  q[td6| d7| d5|  d|||d8S )9u   从 content_html 回填存量文档的 inline_images/inline_videos/content_markdown/content_text。
    不重新爬取，复用 reprocess_from_html 去噪流程。ocr_text/transcript 留空，等 ECS 填充。
    r   r   r   )
HTMLParserr  )BeautifulSoupr  r  r   r   r\   r  Nu   [backfill_media] 待回填: Tr   backfillrH   r   qarP   lxmlimage)_download_inline_imageschinatax_zcjdr  imagesvideo)_download_inline_videosvideos	backfill_src?r0  img_03dr  r   altr|   )src_originalr   rN  ocr_textz.mp4vid_)rO  r   
transcript	keyframesFr   rQ   r}   rR   z[backfill_media] doc_id=u	    失败: u    [backfill_media] 进度 success=r   u   [backfill_media] 完成: total=r   )r   r   r3   r4   )Br  r   r  r   3app.services.tax_data_processor.parsers.html_parserr=  r  r  bs4r>  r  r  r    r5   r   r6   rP   r?  r   ri  rj  r	   rm   rk  rh   rl  rA  r,   r   rg   re   rf   r   ri   r   r   r  rD   ra   r9   r  parser  >app.services.tax_data_processor.adapters.chinatax_zcjd.adapterrC  hashlibsha256encode	hexdigestrG  extract_mediar   r  suffixrF  r  r   r   r  rR   r   rQ   r}   rn   r   r   )*r   rh  r   r   r=  r  r>  r  r    r  html_parserr$  r   r   r3   r4   r  r  r   r  rz   r   cleaned_soupinterprC  _hashlibimg_dirr   r   rG  vid_dirmediar   img_inforJ  extr!  vid_inforR   att_changedr   rx   r   r   r   backfill_media_task  s  





.
.







$xri  )r   N)r   Fr!   )rg  )Nro  N)r   )r   FNN)r<  )Fr)   r   rB  r   rc   r   typingr   r   celeryr   logurur	   sqlalchemy.dialects.postgresqlr
   r   sqlalchemy.excr   app.celery_appr   r4  r   r   r  r   r  r   3app.services.tax_data_processor.incremental_updaterr   2app.services.tax_data_processor.checkpoint_managerr   5app.services.tax_data_processor.knowledge_base_clientr   1app.services.tax_data_processor.relation_identityr   7app.services.tax_data_processor.staging_package_builderr   
app.configr   re   r   rg   r   r<   rC   dictry   boolr   r  r   r   r;   r  r  r  r  rf  rn  r  r  r  r  r  r  r  r	  r  r  ri  r   r   r   r   <module>   s    9P-
@ U.&& B-

 2hE
&
^
<
] &