o
    iy!                     @   sn   d Z ddlZddlZddlmZ ddlmZmZmZ ddl	m	Z	 ddl
mZ edZdZd	ZG d
d dZdS )u   断点续爬管理器

维护 {category_id}:{page}:{doc_id} 三级游标，持久化到本地 SQLite。
支持进程崩溃/中断后从最近检查点恢复，避免从头重跑。
    N)Path)OptionalSetTuple)datetime)loggerz/tmp/tax_documents/indexzcheckpoint.db2   c                   @   s   e Zd ZdZ	d!dededee fddZde	e fd	d
Z
deddfddZd"ddZdedefddZdeddfddZdefddZd"ddZdefddZd"ddZdejfddZd"dd ZdS )#CheckpointManageru  
    断点续爬管理器

    使用示例：
        cp = CheckpointManager(task_id="task-abc", category_id=5)

        # 启动时恢复
        completed_urls = cp.load_completed_urls()

        # 处理文档时
        for doc in all_documents:
            if doc["url"] in completed_urls:
                continue  # 跳过已完成
            process(doc)
            cp.mark_done(doc["url"])  # 记录完成

        # 任务结束时清理
        cp.clear()
    Ntask_idcategory_iddb_dirc                 C   sF   || _ || _|p	t| _| jt | _t | _g | _	t
 | _|   d S N)r
   r   DEFAULT_CHECKPOINT_DIRr   CHECKPOINT_DB_NAMEdb_path	threadingLock_lock_pending_urlsset_completed_urls
_ensure_db)selfr
   r   r    r   `/lsinfo/ai/hellotax_ai/data_center/backend/app/services/tax_data_processor/checkpoint_manager.py__init__,   s   

zCheckpointManager.__init__returnc              	   C   s~   |   }|d| j| jf }W d   n1 sw   Y  dd |D | _td| j d| j dt| j d | jS )	uD   启动时加载已完成的 URL 集合，用于跳过已处理文档zESELECT doc_url FROM checkpoints WHERE task_id = ? AND category_id = ?Nc                 S   s   h | ]}|d  qS )r   r   ).0rowr   r   r   	<setcomp>H   s    z8CheckpointManager.load_completed_urls.<locals>.<setcomp>[CheckpointManager] task= cat=u    已恢复     条断点记录)	_connectexecuter
   r   fetchallr   r   infolen)r   connrowsr   r   r   load_completed_urlsA   s   

z%CheckpointManager.load_completed_urlsdoc_urlc                 C   sj   | j | | j" | j| t| jtkr#|   W d   dS W d   dS 1 s.w   Y  dS )uH   标记单条文档处理完成，每 CHECKPOINT_INTERVAL 条批量落盘N)r   addr   r   appendr'   CHECKPOINT_INTERVAL_flushr   r+   r   r   r   	mark_doneO   s   
"zCheckpointManager.mark_donec                 C   s4   | j  |   W d   dS 1 sw   Y  dS )uH   强制将缓冲区写入磁盘（任务完成或异常退出时调用）N)r   r/   r   r   r   r   flushW   s   
"zCheckpointManager.flushc                 C   s
   || j v S )u   判断某 URL 是否已完成)r   r0   r   r   r   is_done\   s   
zCheckpointManager.is_donepagec              	   C   sN   |   }|d| j| j|t  f W d   dS 1 s w   Y  dS )uH   保存当前列表页游标（可选，用于从中断的页码恢复）a  
                INSERT INTO page_cursors (task_id, category_id, last_page, updated_at)
                VALUES (?, ?, ?, ?)
                ON CONFLICT(task_id, category_id)
                DO UPDATE SET last_page = excluded.last_page, updated_at = excluded.updated_at
                N)r#   r$   r
   r   r   utcnow	isoformat)r   r5   r(   r   r   r   save_page_cursor`   s   
"z"CheckpointManager.save_page_cursorc                 C   s   |   }|d| j| jf }W d   n1 sw   Y  |r&|d nd}|dkrBtd| j d| j d|d  d| d		 |S )
u<   加载已完成的最后页码，返回 0 表示从头开始zHSELECT last_page FROM page_cursors WHERE task_id = ? AND category_id = ?Nr   r    r!   u    从第    u    页继续（已完成第 1~u    页）)r#   r$   r
   r   fetchoner   r&   )r   r(   r   r5   r   r   r   load_page_cursorm   s$   

z"CheckpointManager.load_page_cursorc                 C   sz   |   }|d| j| jf |d| j| jf W d   n1 s#w   Y  | j  td| j d| j d dS )uB   任务成功完成后清理检查点记录（节省磁盘空间）z=DELETE FROM checkpoints WHERE task_id = ? AND category_id = ?z>DELETE FROM page_cursors WHERE task_id = ? AND category_id = ?Nr    r!   u    检查点已清理)r#   r$   r
   r   r   clearr   r&   r   r(   r   r   r   r<   |   s   



	zCheckpointManager.clearc                 C   s   | j | jt| jt| jdS )u   返回当前断点统计信息)r
   r   completed_countpending_flush)r
   r   r'   r   r   r2   r   r   r   stats   s
   zCheckpointManager.statsc                 C   s\   | j jddd |  }|d |d |d W d   dS 1 s'w   Y  dS )u&   初始化 SQLite 数据库和表结构T)parentsexist_oka  
                CREATE TABLE IF NOT EXISTS checkpoints (
                    id          INTEGER PRIMARY KEY AUTOINCREMENT,
                    task_id     TEXT    NOT NULL,
                    category_id INTEGER NOT NULL,
                    doc_url     TEXT    NOT NULL,
                    created_at  TEXT    NOT NULL DEFAULT (datetime('now')),
                    UNIQUE(task_id, category_id, doc_url)
                )
                z
                CREATE INDEX IF NOT EXISTS idx_cp_task_cat
                ON checkpoints(task_id, category_id)
                af  
                CREATE TABLE IF NOT EXISTS page_cursors (
                    task_id     TEXT    NOT NULL,
                    category_id INTEGER NOT NULL,
                    last_page   INTEGER NOT NULL DEFAULT 0,
                    updated_at  TEXT    NOT NULL,
                    PRIMARY KEY (task_id, category_id)
                )
                N)r   mkdirr#   r$   r=   r   r   r   r      s   
"zCheckpointManager._ensure_dbc                 C   s,   t jt| jdd}|d |d |S )uE   返回 WAL 模式连接（上下文管理器，自动提交/回滚）
   )timeoutzPRAGMA journal_mode=WALzPRAGMA synchronous=NORMAL)sqlite3connectstrr   r$   r=   r   r   r   r#      s   

zCheckpointManager._connectc              
      s   j sdS t    fddj D }zSz( }|d| W d   n1 s-w   Y  tdtj  d W n t	yY } zt
d|  W Y d}~nd}~ww W j   dS W j   dS j   w )uE   将 _pending_urls 批量写入数据库（调用方须持有 _lock）Nc                    s   g | ]
}j j| fqS r   )r
   r   )r   urlnowr   r   r   
<listcomp>   s    z,CheckpointManager._flush.<locals>.<listcomp>z
                    INSERT OR IGNORE INTO checkpoints
                        (task_id, category_id, doc_url, created_at)
                    VALUES (?, ?, ?, ?)
                    u   [CheckpointManager] 写入 r"   u(   [CheckpointManager] 写入断点失败: )r   r   r6   r7   r#   executemanyr   debugr'   	Exceptionerrorr<   )r   r)   r(   er   rJ   r   r/      s0   
	zCheckpointManager._flushr   )r   N)__name__
__module____qualname____doc__rH   intr   r   r   r   r*   r1   r3   boolr4   r8   r;   r<   dictr@   r   rF   
Connectionr#   r/   r   r   r   r   r	      s*    



"r	   )rU   rF   r   pathlibr   typingr   r   r   r   logurur   r   r   r.   r	   r   r   r   r   <module>   s    