import asyncio
import os
import uuid
from datetime import datetime
from pathlib import Path

from celery import Task
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.exc import IntegrityError

from app.celery_app import celery_app
from app.config import get_settings
from app.models.tax_data import DataProcessingTask, TaxDocument
from app.services.tax_data_processor.category_processor import CategoryProcessor
from app.services.tax_data_processor.checkpoint_manager import CheckpointManager
from app.services.tax_data_processor.incremental_updater import IncrementalUpdater
from app.services.tax_data_processor.knowledge_base_client import KnowledgeBaseClient
from app.services.tax_data_processor.processor_engine import ProcessorEngine
from app.services.tax_data_processor.relation_identity import normalize_legal_doc_number
from app.services.tax_data_processor.staging_package_builder import StagingPackageBuilder
from common_logging import get_logger

logger = get_logger(__name__)

settings = get_settings()

class DatabaseTask(Task):
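    """Celery base task that lazily opens a SQLAlchemy session and closes it after the task returns."""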
    _db = None

    @property
    def db(self):
        if self._db is None:
            from app.database import SessionLocal
            self._db = SessionLocal()
        return self._db

    def after_return(self, *args, **kwargs):
        if self._db is not None:
            self._db.close()
            self._db = None

def _get_or_create_task(db, task_id: str, category_id: int, mode: str) -> DataProcessingTask:
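    """Return the DataProcessingTask row for task_id, creating a pending record if none exists yet."""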
    task = db.query(DataProcessingTask).filter(DataProcessingTask.task_id == task_id).first()
    if not task:
        task = DataProcessingTask(task_id=task_id, category_id=category_id, mode=mode, status='pending', progress=0, total_count=0, success_count=0, failed_count=0)
        db.add(task)
        db.commit()
        db.refresh(task)
    return task

def _update_task(db, task: DataProcessingTask, **kwargs):
    for k, v in kwargs.items():
        setattr(task, k, v)
    task.updated_at = datetime.utcnow()
    db.commit()

def _archive_previous_version(db, source_url: str, new_result: dict):
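    """Archive the currently stored version of a document as a JSON file under raw_data_dir/versions/<doc id>/.

    Only runs when both the stored and the incoming content hashes exist and differ.
    """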
    old_doc = db.query(TaxDocument).filter(TaxDocument.source_url == source_url).first()
    if not old_doc:
        return
    old_hash = old_doc.content_hash
    new_hash = new_result.get('content_hash')
    if not old_hash or not new_hash or old_hash == new_hash:
        return
    try:
        import json
        ts = datetime.utcnow().strftime('%Y%m%d%H%M%S')
        archive_dir = settings.raw_data_dir / 'versions' / str(old_doc.id)
        archive_dir.mkdir(parents=True, exist_ok=True)
        archive_file = archive_dir / f'{ts}.json'
        payload = {'tax_doc_id': old_doc.id, 'source_url': old_doc.source_url, 'archived_at': datetime.utcnow().isoformat(), 'previous': {'title': old_doc.title, 'doc_number': old_doc.doc_number, 'issuing_authority': old_doc.issuing_authority, 'issue_date': old_doc.issue_date.isoformat() if old_doc.issue_date else None, 'effective_date': old_doc.effective_date.isoformat() if old_doc.effective_date else None, 'content_hash': old_hash, 'content_html': old_doc.content_html, 'content_markdown': old_doc.content_markdown, 'attachments': old_doc.attachments, 'updated_at': old_doc.updated_at.isoformat() if old_doc.updated_at else None}, 'incoming': {'content_hash': new_hash, 'title': new_result.get('title')}}
        with open(archive_file, 'w', encoding='utf-8') as f:
            json.dump(payload, f, ensure_ascii=False, indent=2)
        logger.info(f'[version] 已归档旧版本: {archive_file}')
    except Exception as e:
        logger.warning(f'[version] 归档旧版本失败 source_url={source_url}: {e}')

def _upsert_tax_document(db, result: dict) -> bool:
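    """Insert or update a TaxDocument keyed on source_url via PostgreSQL ON CONFLICT, archiving the previous
    version first and stripping NUL bytes from the content fields. Returns True on success.
    """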
    try:
        if result.get('content_html'):
            result['content_html'] = result['content_html'].replace('\x00', '')
        if result.get('content_markdown'):
            result['content_markdown'] = result['content_markdown'].replace('\x00', '')
        if result.get('content_text'):
            result['content_text'] = result['content_text'].replace('\x00', '')
        _archive_previous_version(db, result['source_url'], result)
        stmt = pg_insert(TaxDocument).values(source_url=result['source_url'], category_id=result['category_id'], title=result['title'], doc_number=result.get('doc_number'), issuing_authority=result.get('issuing_authority'), issue_date=result.get('issue_date'), effective_date=result.get('effective_date'), doc_status=result.get('doc_status') or None, content_html=result.get('content_html'), content_markdown=result.get('content_markdown'), content_text=result.get('content_text'), content_hash=result.get('content_hash'), attachments=result.get('attachments'), inline_images=result.get('inline_images'), inline_videos=result.get('inline_videos'), file_path=result.get('file_path'), file_directory=result.get('file_directory'), fetch_strategy=result.get('fetch_strategy') or 'normal', supersedes=result.get('supersedes'), references=result.get('references'), processing_status='completed').on_conflict_do_update(index_elements=['source_url'], set_={'title': result['title'], 'doc_number': result.get('doc_number'), 'issuing_authority': result.get('issuing_authority'), 'issue_date': result.get('issue_date'), 'effective_date': result.get('effective_date'), 'doc_status': result.get('doc_status') or None, 'content_html': result.get('content_html'), 'content_markdown': result.get('content_markdown'), 'content_text': result.get('content_text'), 'content_hash': result.get('content_hash'), 'attachments': result.get('attachments'), 'inline_images': result.get('inline_images'), 'inline_videos': result.get('inline_videos'), 'file_path': result.get('file_path'), 'file_directory': result.get('file_directory'), 'fetch_strategy': result.get('fetch_strategy') or 'normal', 'supersedes': result.get('supersedes'), 'references': result.get('references'), 'processing_status': 'completed', 'updated_at': datetime.utcnow()})
        db.execute(stmt)
        db.commit()
        return True
    except IntegrityError as e:
        db.rollback()
        logger.warning(f"[upsert] 写入冲突已忽略: {result.get('source_url')} — {e}")
        return False
    except Exception:
        db.rollback()
        import traceback
        logger.error(f"[upsert] 写入失败: {result.get('source_url')}")
        logger.error(f'[upsert] 异常详情: {traceback.format_exc()}')
        return False

def _rewrite_attachment_urls(db, doc_id: int, attachments: list) -> None:
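    """Rewrite attachment links in the document's markdown to the local download endpoint when the file exists on disk."""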
    if not attachments:
        return
    tax_doc = db.query(TaxDocument).filter(TaxDocument.id == doc_id).first()
    if not tax_doc or not tax_doc.content_markdown:
        return
    content = tax_doc.content_markdown
    changed = False
    for idx, att in enumerate(attachments):
        source_url = att.get('url', '')
        local_path = att.get('path', '')
        if not source_url or f']({source_url})' not in content:
            continue
        abs_local = settings.project_root / local_path if local_path else None
        if abs_local and abs_local.is_file():
            new_url = f'/api/v1/documents/{doc_id}/attachments/{idx}/download'
        else:
            new_url = source_url
        if new_url != source_url:
            content = content.replace(f']({source_url})', f']({new_url})')
            changed = True
            logger.info(f'[attachment-url] doc_id={doc_id} 附件[{idx}] URL 已替换为本地端点')
    if changed:
        tax_doc.content_markdown = content
        db.commit()

def _flush_import_batch(db, kb_client: KnowledgeBaseClient, batch_doc_ids: list[int], kb_id: int=1, category_id_map: dict | None=None) -> int:
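    """Import a batch of TaxDocument ids into the knowledge base, flagging each as imported on success.

    Returns the number counted as imported, including documents that were already flagged.
    """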
    if not batch_doc_ids:
        return 0
    category_id_map = category_id_map or {}
    imported = 0
    for doc_id in batch_doc_ids:
        tax_doc = db.query(TaxDocument).filter(TaxDocument.id == doc_id).first()
        if not tax_doc:
            continue
        if tax_doc.is_imported:
            imported += 1
            continue
        kb_category_id = category_id_map.get(tax_doc.category_id)
        try:
            result = asyncio.run(kb_client.import_document(tax_doc, kb_id=kb_id, category_id=kb_category_id))
            if result and result.get('success'):
                tax_doc.is_imported = True
                tax_doc.knowledge_doc_id = result.get('knowledge_doc_id')
                db.commit()
                imported += 1
            else:
                logger.warning(f"[batch-import] 导入失败 doc_id={doc_id} error={(result.get('error') if result else 'unknown')}")
        except Exception as e:
            db.rollback()
            logger.exception(f'[batch-import] 导入异常 doc_id={doc_id}: {e}')
    logger.info(f'[batch-import] 本批完成: requested={len(batch_doc_ids)} imported={imported}')
    return imported

@celery_app.task(bind=True, base=DatabaseTask, queue='data_center_queue', name='app.tasks.processor_tasks.process_category_task')
def process_category_task(self, task_id: str, category_id: int, mode: str='full', force: bool=False):
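    """Process one category end to end: page through the list pages, process each document, upsert it,
    build a staging package, rewrite attachment URLs, and push to the knowledge base in batches.
    Progress and checkpoints are persisted so an interrupted run can resume from the last page and URL.
    """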
    logger.info(f'[{task_id}] 开始处理分类 {category_id}，模式: {mode}')
    task = _get_or_create_task(self.db, task_id, category_id, mode)
    checkpoint = CheckpointManager(task_id=task_id, category_id=category_id)
    completed_urls = checkpoint.load_completed_urls()
    resume_page = checkpoint.load_page_cursor()
    try:
        _update_task(self.db, task, status='running', started_at=datetime.utcnow(), progress=0)
        category_processor = CategoryProcessor()
        config = category_processor.get_category_config(category_id)
        if not config:
            _update_task(self.db, task, status='failed', error_message='未知的分类ID', progress=0)
            return {'success': False, 'error': '未知的分类ID'}
        processor_engine = ProcessorEngine(timeout=settings.crawler_request_timeout, max_retries=settings.crawler_max_retries, delay_min=settings.crawler_delay_min, delay_max=settings.crawler_delay_max)
        total_pages = category_processor.calculate_total_pages(category_id)
        all_documents = []
        start_page = resume_page + 1
        for page in range(start_page, total_pages + 1):
            page_url = category_processor.build_page_url(category_id, page)
            logger.info(f'[{task_id}] 获取列表页 {page}/{total_pages}: {page_url}')
            docs = asyncio.run(processor_engine.process_list_page(page_url, category_id))
            if docs:
                all_documents.extend(docs)
            list_progress = int(page / total_pages * 20)
            _update_task(self.db, task, progress=list_progress)
            checkpoint.save_page_cursor(page)
        total = len(all_documents)
        logger.info(f'[{task_id}] 共获取 {total} 个新文档链接（已有断点记录 {len(completed_urls)} 条）')
        _update_task(self.db, task, total_count=total + len(completed_urls), progress=20)
        if total == 0 and (not completed_urls):
            _update_task(self.db, task, status='completed', progress=100, completed_at=datetime.utcnow())
            checkpoint.clear()
            return {'success': True, 'task_id': task_id, 'total': 0}
        save_dir = str(settings.raw_data_dir / config['name'])
        Path(save_dir).mkdir(parents=True, exist_ok=True)
        success_count = len(completed_urls)
        failed_count = 0
        kb_client = KnowledgeBaseClient(base_url=settings.base_platform_url, api_key=settings.base_platform_api_key)
        kb_id = settings.base_platform_kb_id
        category_id_map = settings.kb_category_id_map
        staging_builder = StagingPackageBuilder()
        IMPORT_BATCH_SIZE = 500
        pending_import_ids: list[int] = []
        total_imported = 0
        for doc in all_documents:
            doc_url = doc['url']
            if checkpoint.is_done(doc_url):
                success_count += 1
                continue
            if mode == 'full' and (not force):
                existing = self.db.query(TaxDocument.id).filter(TaxDocument.source_url == doc_url).first()
                if existing:
                    checkpoint.mark_done(doc_url)
                    success_count += 1
                    continue
            try:
                result = asyncio.run(processor_engine.process_document(doc_url, category_id, save_dir))
                if result and result.get('success'):
                    _upsert_tax_document(self.db, result)
                    checkpoint.mark_done(doc_url)
                    success_count += 1
                    saved_doc = self.db.query(TaxDocument).filter(TaxDocument.source_url == doc_url).first()
                    if saved_doc:
                        try:
                            staging_builder.save_package(saved_doc)
                        except Exception as pkg_err:
                            logger.warning(f'[{task_id}] staging 包生成失败 doc_id={saved_doc.id}: {pkg_err}')
                        if result.get('attachments'):
                            _rewrite_attachment_urls(self.db, saved_doc.id, result['attachments'])
                        pending_import_ids.append(saved_doc.id)
                    if len(pending_import_ids) >= IMPORT_BATCH_SIZE:
                        logger.info(f'[{task_id}] 触发批量推送: {len(pending_import_ids)} 条')
                        imported = _flush_import_batch(self.db, kb_client, pending_import_ids, kb_id=kb_id, category_id_map=category_id_map)
                        total_imported += imported
                        pending_import_ids = []
                else:
                    failed_count += 1
                    err = result.get('error', '未知错误') if result else '处理失败'
                    logger.warning(f'[{task_id}] 文档处理失败: {doc_url} — {err}')
            except Exception as doc_err:
                failed_count += 1
                logger.exception(f'[{task_id}] 文档异常: {doc_url} — {doc_err}')
            all_total = total + len(completed_urls)
            processed = success_count + failed_count
            progress = 20 + int(processed / max(all_total, 1) * 80)
            _update_task(self.db, task, success_count=success_count, failed_count=failed_count, progress=progress)
        checkpoint.flush()
        if pending_import_ids:
            logger.info(f'[{task_id}] 推送最终批次: {len(pending_import_ids)} 条')
            imported = _flush_import_batch(self.db, kb_client, pending_import_ids, kb_id=kb_id, category_id_map=category_id_map)
            total_imported += imported
            pending_import_ids = []
        logger.info(f'[{task_id}] 知识库导入完成: total_imported={total_imported}')
        _update_task(self.db, task, status='completed', progress=100, completed_at=datetime.utcnow())
        if failed_count == 0:
            checkpoint.clear()
        logger.info(f'[{task_id}] 完成: total={total + len(completed_urls)} success={success_count} failed={failed_count}')
        return {'success': True, 'task_id': task_id, 'category_id': category_id, 'total': total + len(completed_urls), 'success_count': success_count, 'failed_count': failed_count}
    except Exception as e:
        logger.exception(f'[{task_id}] 处理分类任务失败: {e}')
        checkpoint.flush()
        _update_task(self.db, task, status='failed', error_message=str(e), completed_at=datetime.utcnow())
        return {'success': False, 'error': str(e)}

@celery_app.task(base=DatabaseTask, bind=True, name='app.tasks.processor_tasks.process_document_task')
def process_document_task(self, document_url: str, category_id: int):
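    """Process a single document URL for the given category, upsert the result, and rewrite attachment URLs."""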
    logger.info(f'开始处理文档: {document_url}')
    try:
        processor_engine = ProcessorEngine(timeout=settings.crawler_request_timeout, max_retries=settings.crawler_max_retries, delay_min=settings.crawler_delay_min, delay_max=settings.crawler_delay_max)
        category_processor = CategoryProcessor()
        config = category_processor.get_category_config(category_id)
        if not config:
            return {'success': False, 'error': '未知的分类ID'}
        save_dir = str(settings.raw_data_dir / config['name'])
        Path(save_dir).mkdir(parents=True, exist_ok=True)
        result = asyncio.run(processor_engine.process_document(document_url, category_id, save_dir))
        if result and result.get('success'):
            _upsert_tax_document(self.db, result)
            saved_doc = self.db.query(TaxDocument).filter(TaxDocument.source_url == document_url).first()
            if saved_doc and result.get('attachments'):
                _rewrite_attachment_urls(self.db, saved_doc.id, result['attachments'])
            return {'success': True, 'document_url': document_url, 'title': result.get('title')}
        else:
            return {'success': False, 'error': result.get('error', '处理失败') if result else '处理失败'}
    except Exception as e:
        logger.exception(f'处理文档任务失败: {e}')
        return {'success': False, 'error': str(e)}

@celery_app.task(base=DatabaseTask, bind=True, name='app.tasks.processor_tasks.reprocess_from_local_task')
def reprocess_from_local_task(self, tax_doc_id: int):
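    """Re-run processing for an existing document, preferring the stored HTML over re-fetching the source URL,
    and keeping the previously stored attachments if the new result has none.
    """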
    doc = self.db.query(TaxDocument).filter(TaxDocument.id == tax_doc_id).first()
    if not doc:
        return {'success': False, 'error': '文档不存在'}
    processor_engine = ProcessorEngine(timeout=settings.crawler_request_timeout, max_retries=settings.crawler_max_retries, delay_min=settings.crawler_delay_min, delay_max=settings.crawler_delay_max)
    category_processor = CategoryProcessor()
    config = category_processor.get_category_config(doc.category_id)
    save_dir = str(settings.raw_data_dir / config['name']) if config else str(settings.raw_data_dir)
    Path(save_dir).mkdir(parents=True, exist_ok=True)
    if doc.content_html:
        result = asyncio.run(processor_engine.reprocess_from_html(doc.content_html, doc.source_url, doc.category_id, save_dir))
    else:
        result = asyncio.run(processor_engine.process_document(doc.source_url, doc.category_id, save_dir))
    if result and result.get('success'):
        if not result.get('attachments') and doc.attachments:
            result['attachments'] = doc.attachments
        _upsert_tax_document(self.db, result)
        return {'success': True, 'doc_id': tax_doc_id, 'title': result.get('title')}
    else:
        return {'success': False, 'error': result.get('error', '处理失败') if result else '处理失败'}

@celery_app.task(base=DatabaseTask, bind=True, name='app.tasks.processor_tasks.import_to_knowledge_base_task')
def import_to_knowledge_base_task(self, tax_doc_id: int):
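    """Import a single TaxDocument into the knowledge base unless it is already marked as imported."""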
    logger.info(f'开始导入文档 {tax_doc_id} 到知识库')
    try:
        tax_doc = self.db.query(TaxDocument).filter(TaxDocument.id == tax_doc_id).first()
        if not tax_doc:
            return {'success': False, 'error': '文档不存在'}
        if tax_doc.is_imported:
            return {'success': True, 'message': '文档已导入', 'knowledge_doc_id': tax_doc.knowledge_doc_id}
        kb_client = KnowledgeBaseClient(base_url=settings.base_platform_url, api_key=settings.base_platform_api_key)
        kb_id = settings.base_platform_kb_id
        kb_category_id = settings.kb_category_id_map.get(tax_doc.category_id)
        result = asyncio.run(kb_client.import_document(tax_doc, kb_id=kb_id, category_id=kb_category_id))
        if result and result.get('success'):
            tax_doc.is_imported = True
            tax_doc.knowledge_doc_id = result.get('knowledge_doc_id')
            self.db.commit()
            return {'success': True, 'tax_doc_id': tax_doc_id, 'knowledge_doc_id': result.get('knowledge_doc_id')}
        else:
            return {'success': False, 'error': result.get('error', '导入失败') if result else '导入失败'}
    except Exception as e:
        logger.exception(f'导入文档任务失败: {e}')
        return {'success': False, 'error': str(e)}

@celery_app.task(base=DatabaseTask, bind=True, name='app.tasks.processor_tasks.backfill_attachment_content_task')
def backfill_attachment_content_task(self, doc_ids: list[int] | None=None):
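    """Append converted attachment content as '## 附件' sections to content_markdown for documents that lack it.

    Image and PDF attachments and files above the size threshold are skipped; skipped items are written to a JSON log.
    """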
    import json
    from datetime import datetime

    from app.services.tax_data_processor.converters.doc_converter import DocConverter
    from app.services.tax_data_processor.document_cleaner import DocumentCleaner
    skipped_log_path = '/tmp/skipped_ocr_attachments.json'
    skipped_records = []
    if doc_ids is None:
        from sqlalchemy import text as sa_text
        rows = self.db.query(TaxDocument.id).filter(TaxDocument.attachments.isnot(None), sa_text("attachments::text != 'null'"), sa_text("attachments::text != '[]'"), TaxDocument.content_markdown.notlike('%## 附件%')).all()
        doc_ids = [r[0] for r in rows]
    total = len(doc_ids)
    logger.info(f'[backfill_attachment] 待补填文档数: {total}')
    doc_converter = DocConverter()
    cleaner = DocumentCleaner()
    labels = '一二三四五六七八九十'
    success_count = 0
    failed_count = 0
    skipped_count = 0
    for doc_id in doc_ids:
        doc = self.db.query(TaxDocument).filter(TaxDocument.id == doc_id).first()
        if not doc:
            skipped_count += 1
            continue
        attachments: list = doc.attachments or []
        if not attachments:
            skipped_count += 1
            continue
        attachment_sections = []
        image_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.tif', '.webp'}
        skipped_extensions = {'.pdf'}
        LARGE_FILE_THRESHOLD = 10 * 1024 * 1024
        for i, att in enumerate(attachments):
            local_path = att.get('path', '')
            if not local_path or not os.path.isfile(local_path):
                logger.debug(f'[backfill_attachment] doc_id={doc_id} 附件文件不存在: {local_path}')
                continue
            file_ext = os.path.splitext(local_path)[1].lower()
            if file_ext in image_extensions:
                logger.debug(f"[backfill_attachment] doc_id={doc_id} 跳过扫描附件: {att.get('name')} ({file_ext})")
                skipped_records.append({'doc_id': doc_id, 'attachment_name': att.get('name'), 'file_path': local_path, 'reason': '扫描附件（图片）', 'file_extension': file_ext, 'file_size_mb': os.path.getsize(local_path) / 1024 / 1024 if os.path.exists(local_path) else 0})
                continue
            if file_ext in skipped_extensions:
                logger.debug(f"[backfill_attachment] doc_id={doc_id} 跳过PDF文件: {att.get('name')} ({file_ext})")
                skipped_records.append({'doc_id': doc_id, 'attachment_name': att.get('name'), 'file_path': local_path, 'reason': 'PDF文件（需要OCR，处理慢）', 'file_extension': file_ext, 'file_size_mb': os.path.getsize(local_path) / 1024 / 1024 if os.path.exists(local_path) else 0})
                continue
            file_size = os.path.getsize(local_path)
            if file_size > LARGE_FILE_THRESHOLD:
                logger.warning(f"[backfill_attachment] doc_id={doc_id} 跳过大文件: {att.get('name')} (大小: {file_size / 1024 / 1024:.2f}MB, 阈值: {LARGE_FILE_THRESHOLD / 1024 / 1024:.0f}MB)")
                skipped_records.append({'doc_id': doc_id, 'attachment_name': att.get('name'), 'file_path': local_path, 'reason': f'文件过大 ({file_size / 1024 / 1024:.2f}MB > {LARGE_FILE_THRESHOLD / 1024 / 1024:.0f}MB)', 'file_extension': file_ext, 'file_size_mb': file_size / 1024 / 1024})
                continue
            try:
                conv_result = asyncio.run(doc_converter.convert_local(local_path))
                if conv_result.get('success') and conv_result.get('markdown', '').strip():
                    attachment_sections.append({'name': att.get('name', f'附件{i + 1}'), 'markdown': conv_result['markdown']})
                    logger.info(f"[backfill_attachment] doc_id={doc_id} 附件转换成功: {att.get('name')} (quality={conv_result['parse_quality_score']:.2f})")
            except Exception as e:
                logger.warning(f"[backfill_attachment] doc_id={doc_id} 附件转换异常: {att.get('name')} — {e}")
        if not attachment_sections:
            skipped_count += 1
            continue
        use_ordinal = len(attachment_sections) > 1
        parts = []
        for j, sec in enumerate(attachment_sections):
            if use_ordinal:
                label = labels[j] if j < len(labels) else str(j + 1)
                heading = f"附件{label}：{sec['name']}"
            else:
                heading = f"附件：{sec['name']}"
            parts.append(f"\n\n---\n\n## {heading}\n\n{sec['markdown']}")
        extra_md = ''.join(parts)
        new_markdown = (doc.content_markdown or '') + extra_md
        new_text = cleaner.markdown_to_rag_text(new_markdown).replace('\x00', '')
        try:
            doc.content_markdown = new_markdown.replace('\x00', '')
            doc.content_text = new_text
            self.db.commit()
            success_count += 1
            logger.info(f'[backfill_attachment] doc_id={doc_id} 补填完成，追加 {len(attachment_sections)} 个附件')
        except Exception as e:
            self.db.rollback()
            failed_count += 1
            logger.error(f'[backfill_attachment] doc_id={doc_id} 写库失败: {e}')
    if skipped_records:
        try:
            with open(skipped_log_path, 'w', encoding='utf-8') as f:
                json.dump({'timestamp': datetime.now().isoformat(), 'total_skipped': len(skipped_records), 'skipped_items': skipped_records}, f, ensure_ascii=False, indent=2)
            logger.info(f'[backfill_attachment] 跳过记录已保存到: {skipped_log_path} (共 {len(skipped_records)} 条)')
        except Exception as e:
            logger.error(f'[backfill_attachment] 保存跳过记录失败: {e}')
    logger.info(f'[backfill_attachment] 完成: total={total} success={success_count} failed={failed_count} skipped={skipped_count}')
    return {'success': True, 'total': total, 'success_count': success_count, 'failed_count': failed_count, 'skipped_count': skipped_count}

@celery_app.task(base=DatabaseTask, bind=True, name='app.tasks.processor_tasks.fill_content_text_task')
def fill_content_text_task(self, batch_size: int=200):
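    """Backfill content_text from content_markdown in batches for rows where it is still NULL."""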
    from app.services.tax_data_processor.document_cleaner import DocumentCleaner
    cleaner = DocumentCleaner()
    total = self.db.query(TaxDocument).filter(TaxDocument.content_markdown.isnot(None), TaxDocument.content_text.is_(None)).count()
    logger.info(f'[fill_content_text] 待填充文档数: {total}')
    processed = 0
    while True:
        docs = self.db.query(TaxDocument).filter(TaxDocument.content_markdown.isnot(None), TaxDocument.content_text.is_(None)).order_by(TaxDocument.id).limit(batch_size).all()
        if not docs:
            break
        for doc in docs:
            try:
                text = cleaner.markdown_to_rag_text(doc.content_markdown)
                doc.content_text = text.replace('\x00', '') if text else ''
            except Exception as e:
                logger.warning(f'[fill_content_text] doc_id={doc.id} 转换失败: {e}')
                doc.content_text = ''
        self.db.commit()
        processed += len(docs)
        logger.info(f'[fill_content_text] 进度: {processed}/{total}')
    logger.info(f'[fill_content_text] 完成，共处理 {processed} 条')
    return {'success': True, 'total': total, 'processed': processed}

@celery_app.task(base=DatabaseTask, bind=True, name='app.tasks.processor_tasks.incremental_update_task')
def incremental_update_task(self, category_ids: list[int] | None=None, window_days: int=7, source_ids: list[int] | None=None, task_id: str | None=None):
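    """Check categories (and optional extra sources) for new or changed documents within the time window.

    New documents are queued as process_document_task; documents that are already imported and changed are
    re-crawled, re-vectorized in the knowledge base, and then upserted locally.
    """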
    scope = f"categories={category_ids or 'all'} window={window_days}天"
    logger.info(f'开始执行增量更新: {scope}')
    task = None
    if task_id:
        task = self.db.query(DataProcessingTask).filter(DataProcessingTask.task_id == task_id).first()
        if task:
            _update_task(self.db, task, status='running', progress=10)
    try:
        updater = IncrementalUpdater(self.db)
        target_categories = category_ids if category_ids else list(range(1, 9))
        total_categories = len(target_categories)
        results = []
        for idx, cid in enumerate(target_categories):
            if task:
                progress = 10 + int(idx / total_categories * 80)
                _update_task(self.db, task, progress=progress)
            result = asyncio.run(updater.check_category_updates(cid, window_days=window_days))
            results.append(result)
            logger.info(f"[增量] 分类 {cid} 检查完成: 新增={len(result.get('new_documents', []))} 更新={len(result.get('updated_documents', []))}")
        total_new = 0
        total_updated = 0
        total_revectorized = 0
        kb_client_incremental: KnowledgeBaseClient | None = None

        def _get_kb_client() -> KnowledgeBaseClient:
            nonlocal kb_client_incremental
            if kb_client_incremental is None:
                kb_client_incremental = KnowledgeBaseClient(base_url=settings.base_platform_url, api_key=settings.base_platform_api_key)
            return kb_client_incremental
        if task:
            _update_task(self.db, task, progress=90)
        for result in results:
            if result.get('error'):
                logger.error(f"[增量] 分类 {result.get('category_id')} 检查失败: {result.get('error')}")
                continue
            category_id = result['category_id']
            for doc in result.get('new_documents', []):
                process_document_task.delay(doc['url'], category_id)
                total_new += 1
            for doc in result.get('updated_documents', []):
                doc_url = doc['url']
                existing = self.db.query(TaxDocument).filter(TaxDocument.source_url == doc_url).first()
                if existing and existing.is_imported and existing.knowledge_doc_id:
                    try:
                        category_processor = CategoryProcessor()
                        config = category_processor.get_category_config(category_id)
                        save_dir = str(settings.raw_data_dir / config['name']) if config else str(settings.raw_data_dir / 'incremental')
                        Path(save_dir).mkdir(parents=True, exist_ok=True)
                        engine = ProcessorEngine(timeout=settings.crawler_request_timeout, max_retries=settings.crawler_max_retries, delay_min=settings.crawler_delay_min, delay_max=settings.crawler_delay_max)
                        crawl_result = asyncio.run(engine.process_document(doc_url, category_id, save_dir))
                        if crawl_result and crawl_result.get('success'):
                            new_content = crawl_result.get('content_text') or crawl_result.get('content_markdown') or ''
                            if new_content:
                                rv_result = asyncio.run(_get_kb_client().update_document_content(knowledge_doc_id=existing.knowledge_doc_id, new_content=new_content))
                                if rv_result and rv_result.get('success'):
                                    logger.info(f'[增量] 增量重向量化成功 source_url={doc_url} knowledge_doc_id={existing.knowledge_doc_id}')
                                    total_revectorized += 1
                                else:
                                    logger.warning(f"[增量] 增量重向量化失败 source_url={doc_url}: {(rv_result.get('error') if rv_result else 'unknown')}")
                            _upsert_tax_document(self.db, crawl_result)
                            inc_doc = self.db.query(TaxDocument).filter(TaxDocument.source_url == doc_url).first()
                            if inc_doc and crawl_result.get('attachments'):
                                _rewrite_attachment_urls(self.db, inc_doc.id, crawl_result['attachments'])
                        else:
                            err = crawl_result.get('error', '处理失败') if crawl_result else '处理失败'
                            logger.warning(f'[增量] 重爬失败（已导入文档）: {doc_url} — {err}')
                    except Exception as inc_err:
                        logger.exception(f'[增量] 已导入文档更新异常: {doc_url} — {inc_err}')
                else:
                    process_document_task.delay(doc_url, category_id)
                total_updated += 1
        if source_ids:
            for sid in source_ids:
                src_result = asyncio.run(updater.check_source(sid, window_days))
                if src_result.get('error'):
                    logger.error(f"[增量] 数据源 {sid} 检查失败: {src_result['error']}")
                    continue
                for _doc in src_result.get('new_documents', []):
                    process_source_task.delay(str(uuid.uuid4()), sid, mode='full')
                    total_new += 1
        for cid in target_categories:
            updater.update_last_check_time(cid)
        logger.info(f'[增量] 完成: {scope} — 新增={total_new} 更新={total_updated} 重向量化={total_revectorized}')
        if task:
            _update_task(self.db, task, status='completed', progress=100, total_count=total_new + total_updated, success_count=total_new + total_updated, completed_at=datetime.utcnow())
        return {'success': True, 'scope': scope, 'total_new': total_new, 'total_updated': total_updated, 'total_revectorized': total_revectorized, 'results': [{'category_id': r.get('category_id'), 'category_name': r.get('category_name'), 'new': len(r.get('new_documents', [])), 'updated': len(r.get('updated_documents', [])), 'skipped': r.get('skipped', 0), 'error': r.get('error')} for r in results]}
    except Exception as e:
        logger.exception(f'增量更新任务失败: {e}')
        if task:
            _update_task(self.db, task, status='failed', error_message=str(e), completed_at=datetime.utcnow())
        return {'success': False, 'error': str(e)}

@celery_app.task(base=DatabaseTask, bind=True, name='app.tasks.processor_tasks.build_relations_task')
def build_relations_task(self, batch_size: int=200):
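    """Rebuild supersedes/references relations between documents via relation_builder."""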
    from app.services.tax_data_processor.relation_builder import rebuild_document_relations
    stats = rebuild_document_relations(self.db, batch_size=batch_size, dry_run=False, logger=logger)
    logger.info(f'[build_relations] 完成: {stats.as_dict()}')
    return {'success': True, **stats.as_dict()}

@celery_app.task(base=DatabaseTask, bind=True, name='app.tasks.processor_tasks.fill_superseded_by_task')
def fill_superseded_by_task(self, batch_size: int=200):
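    """For obsolete documents without supersession info, parse the leading text for the superseding document
    number; link it to an existing row when a safe match is found, otherwise store the raw number.
    """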
    import re as _re
    _PATTERNS = [_re.compile('根据[^，,。\\n]{0,10}?《[^》]{2,60}》[（(]([^）)]{3,80})[）)]'), _re.compile('根据([\\u4e00-\\u9fa5]{2,15}[〔\\[（(]\\d{4}[〕\\]）)]第?\\s*\\d+\\s*号)'), _re.compile('根据((?:国务院令|主席令)第\\s*\\d+\\s*号)')]
    db = self.db
    candidates = db.query(TaxDocument).filter(TaxDocument.doc_status == 'obsolete', TaxDocument.superseded_by_doc_id.is_(None), TaxDocument.superseded_by_doc_number.is_(None), TaxDocument.content_markdown.isnot(None)).all()
    logger.info(f'[fill_superseded_by] 候选文档: {len(candidates)}')
    indexed_docs = db.query(TaxDocument.id, TaxDocument.title, TaxDocument.doc_number).filter(TaxDocument.doc_number.isnot(None)).all()
    docs_by_normalized_number: dict[str, list] = {}
    for indexed_doc in indexed_docs:
        normalized = normalize_legal_doc_number(indexed_doc.doc_number)
        if normalized:
            docs_by_normalized_number.setdefault(normalized, []).append(indexed_doc)

    def _safe_normalized_match(doc_number: str, head: str, current_doc_id: int):
        normalized = normalize_legal_doc_number(doc_number)
        if not normalized:
            return None
        matches = [item for item in docs_by_normalized_number.get(normalized, []) if item.id != current_doc_id]
        if not matches:
            return None
        title_matches = [item for item in matches if item.title and item.title in head]
        if len(title_matches) == 1:
            return title_matches[0]
        if len(matches) == 1 and (_re.search('\\d{4}', doc_number) or len(normalized) >= 12):
            return matches[0]
        return None
    filled = 0
    for doc in candidates:
        head = (doc.content_markdown or '')[:500]
        if not any(k in head for k in ['废止', '失效']):
            continue
        doc_number = None
        for pat in _PATTERNS:
            m = pat.search(head)
            if m:
                doc_number = m.group(1).strip()
                break
        if not doc_number:
            continue
        matched = db.query(TaxDocument.id).filter(TaxDocument.doc_number == doc_number, TaxDocument.id != doc.id).first()
        if not matched:
            matched = _safe_normalized_match(doc_number, head, doc.id)
        if matched:
            doc.superseded_by_doc_id = matched.id
        else:
            doc.superseded_by_doc_number = doc_number
        filled += 1
    try:
        db.commit()
    except Exception as e:
        db.rollback()
        logger.error(f'[fill_superseded_by] commit 失败: {e}')
    logger.info(f'[fill_superseded_by] 完成: 填充 {filled}/{len(candidates)} 条')
    return {'success': True, 'total': len(candidates), 'filled': filled}

@celery_app.task(base=DatabaseTask, bind=True, name='app.tasks.processor_tasks.fill_effective_dates_task')
def fill_effective_dates_task(self, batch_size: int=500):
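    """Backfill effective_date for regulation-type documents, preferring an explicit date found in the
    markdown and falling back to issue_date.
    """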
    from app.services.tax_data_processor.parsers.metadata_extractor import MetadataExtractor
    regulation_doc_types = {'regulation', 'local_policy', 'normative'}
    metadata_extractor = MetadataExtractor()

    def _extract(md: str, issue_date) -> str | None:
        explicit_date = metadata_extractor._extract_effective_date(md, str(issue_date) if issue_date else None)
        if explicit_date:
            return explicit_date
        return str(issue_date) if issue_date else None
    db = self.db
    total = db.query(TaxDocument).filter(TaxDocument.doc_type.in_(regulation_doc_types), TaxDocument.effective_date.is_(None)).count()
    logger.info(f'[fill_effective_dates] 需要回填: {total} 条')
    filled = 0
    processed = 0
    last_id = 0
    while True:
        # Paginate by id so rows that cannot be filled are not re-fetched forever.
        batch = db.query(TaxDocument).filter(TaxDocument.doc_type.in_(regulation_doc_types), TaxDocument.effective_date.is_(None), TaxDocument.id > last_id).order_by(TaxDocument.id).limit(batch_size).all()
        if not batch:
            break
        last_id = batch[-1].id
        for doc in batch:
            md = doc.content_markdown or ''
            result = _extract(md, doc.issue_date)
            if result:
                doc.effective_date = result
                filled += 1
        try:
            db.commit()
        except Exception as e:
            db.rollback()
            logger.error(f'[fill_effective_dates] commit 失败 last_id={last_id}: {e}')
        processed += len(batch)
        logger.info(f'[fill_effective_dates] 进度: {processed}/{total} 已填: {filled}')
    logger.info(f'[fill_effective_dates] 完成: total={total} filled={filled}')
    return {'success': True, 'total': total, 'filled': filled}

@celery_app.task(base=DatabaseTask, bind=True, name='app.tasks.processor_tasks.strip_ocr_content_task')
def strip_ocr_content_task(self):
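    """Strip OCR-derived image content from content_markdown for documents that contain it."""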
    from sqlalchemy import or_

    from app.services.tax_data_processor.document_cleaner import DocumentCleaner
    db = self.db
    docs = db.query(TaxDocument).filter(or_(TaxDocument.content_markdown.like('%正文图片内容%'), TaxDocument.content_markdown.like('%<!-- 图片%'))).all()
    total = len(docs)
    logger.info(f'[strip_ocr_content] 找到 {total} 条含 OCR 内容的文档')
    cleaned = 0
    for doc in docs:
        original = doc.content_markdown or ''
        stripped = DocumentCleaner.strip_ocr_content(original)
        if stripped != original:
            doc.content_markdown = stripped
            cleaned += 1
    try:
        db.commit()
    except Exception as e:
        db.rollback()
        logger.error(f'[strip_ocr_content] commit 失败: {e}')
        raise
    logger.info(f'[strip_ocr_content] 完成: total={total} cleaned={cleaned}')
    return {'success': True, 'total': total, 'cleaned': cleaned}

@celery_app.task(base=DatabaseTask, bind=True, name='app.tasks.processor_tasks.redownload_missing_attachments_task')
def redownload_missing_attachments_task(self):
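    """Re-download attachments whose local file is missing and update the stored path on success."""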
    db = self.db
    from sqlalchemy import text as sa_text
    docs = db.query(TaxDocument).filter(TaxDocument.attachments.isnot(None), sa_text("attachments::text != 'null'"), sa_text("attachments::text != '[]'")).all()
    missing_total = 0
    downloaded = 0
    failed = 0
    for doc in docs:
        attachments = doc.attachments or []
        changed = False
        for idx, att in enumerate(attachments):
            path = att.get('path', '')
            if path and os.path.isfile(path):
                continue
            url = att.get('original_url') or att.get('url', '')
            if not url or url.startswith('/api/'):
                logger.warning(f'[redownload] doc_id={doc.id} 附件[{idx}] 无有效 URL，跳过')
                missing_total += 1
                failed += 1
                continue
            from urllib.parse import urljoin
            if '/zcfgk/' in url and doc.source_url:
                parts = url.rsplit('/', 1)
                if len(parts) == 2:
                    base_url = doc.source_url.rsplit('/', 1)[0]
                    url = urljoin(base_url + '/', parts[1])
            if not path:
                att_dir = os.path.join(settings.raw_data_dir, 'attachments', doc.title)
                os.makedirs(att_dir, exist_ok=True)
                name = att.get('name') or os.path.basename(url.rstrip('/'))
                path = os.path.join(att_dir, name)
            missing_total += 1
            logger.info(f'[redownload] doc_id={doc.id} 附件[{idx}] 缺失，重新下载: {url}')
            import httpx
            headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36', 'Referer': doc.source_url}
            try:

                async def _download(url=url, headers=headers, path=path):
                    async with httpx.AsyncClient(follow_redirects=True, timeout=30) as client:
                        r = await client.get(url, headers=headers)
                        if r.status_code == 200:
                            with open(path, 'wb') as f:
                                f.write(r.content)
                            return {'success': True}
                        else:
                            return {'success': False, 'error': f'HTTP {r.status_code}'}
                result = asyncio.run(_download())
            except Exception as e:
                result = {'success': False, 'error': str(e)}
            if result['success']:
                att['path'] = path
                downloaded += 1
                changed = True
            else:
                failed += 1
                logger.warning(f"[redownload] doc_id={doc.id} 附件[{idx}] 下载失败: {result.get('error')}")
        if changed:
            try:
                from sqlalchemy.orm.attributes import flag_modified
                flag_modified(doc, 'attachments')
                db.add(doc)
                db.commit()
            except Exception as e:
                db.rollback()
                logger.error(f'[redownload] doc_id={doc.id} commit 失败: {e}')
    logger.info(f'[redownload] 完成: missing={missing_total} downloaded={downloaded} failed={failed}')
    return {'success': True, 'missing_total': missing_total, 'downloaded': downloaded, 'failed': failed}

@celery_app.task(base=DatabaseTask, bind=True, name='app.tasks.processor_tasks.fix_json_null_task')
def fix_json_null_task(self):
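    """Normalize JSON 'null' values in the supersedes and "references" columns to SQL NULL."""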
    from sqlalchemy import text
    db = self.db
    r1 = db.execute(text("\n        UPDATE tax_documents\n        SET supersedes = NULL\n        WHERE supersedes IS NOT NULL AND jsonb_typeof(supersedes) = 'null'\n    "))
    r2 = db.execute(text('\n        UPDATE tax_documents\n        SET "references" = NULL\n        WHERE "references" IS NOT NULL AND jsonb_typeof("references") = \'null\'\n    '))
    try:
        db.commit()
    except Exception as e:
        db.rollback()
        logger.error(f'[fix_json_null] commit 失败: {e}')
        raise
    sup_fixed = r1.rowcount
    ref_fixed = r2.rowcount
    logger.info(f'[fix_json_null] 完成: supersedes={sup_fixed} references={ref_fixed}')
    return {'success': True, 'supersedes_fixed': sup_fixed, 'references_fixed': ref_fixed}

@celery_app.task(base=DatabaseTask, bind=True, name='app.tasks.processor_tasks.fill_issuing_authority_task')
def fill_issuing_authority_task(self, batch_size: int=200):
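    """Backfill issuing_authority, first from the document number prefix, then from the title and leading content."""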
    from app.services.tax_data_processor.parsers.metadata_extractor import MetadataExtractor
    extractor = MetadataExtractor()
    db = self.db
    total = db.query(TaxDocument).filter(TaxDocument.issuing_authority.is_(None)).count()
    logger.info(f'[fill_issuing_authority] 需要回填: {total} 条')
    import re as _re
    filled = 0
    processed = 0
    last_id = 0
    while True:
        # Paginate by id so rows where no authority can be extracted are not re-fetched forever.
        batch = db.query(TaxDocument).filter(TaxDocument.issuing_authority.is_(None), TaxDocument.id > last_id).order_by(TaxDocument.id).limit(batch_size).all()
        if not batch:
            break
        last_id = batch[-1].id
        for doc in batch:
            authority = None
            if doc.doc_number:
                m = _re.match('^([\\u4e00-\\u9fa5][\\u4e00-\\u9fa5 ]{3,60}?)(?:公告|令|函|发|〔|\\[|（|\\()', doc.doc_number)
                if m:
                    authority = m.group(1).strip()
            if not authority:
                content = (doc.content_markdown or '')[:500]
                authority = extractor._extract_issuing_authority(doc.title or '', content)
            if authority:
                doc.issuing_authority = authority
                filled += 1
        try:
            db.commit()
        except Exception as e:
            db.rollback()
            logger.error(f'[fill_issuing_authority] commit 失败 last_id={last_id}: {e}')
        processed += len(batch)
        logger.info(f'[fill_issuing_authority] 进度: {processed}/{total} 已填: {filled}')
    logger.info(f'[fill_issuing_authority] 完成: total={total} filled={filled}')
    return {'success': True, 'total': total, 'filled': filled}

def _upsert_tax_document_v2(db, result, source_id: int) -> bool:
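    """Upsert variant for adapter-based results (attribute access instead of dict keys), tagging the row with source_id."""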
    try:
        content_html = getattr(result, 'content_html', None) or ''
        content_markdown = getattr(result, 'content_markdown', None) or ''
        content_text = getattr(result, 'content_text', None) or ''
        content_html = content_html.replace('\x00', '')
        content_markdown = content_markdown.replace('\x00', '')
        content_text = content_text.replace('\x00', '')
        _archive_previous_version(db, result.source_url, {'content_hash': getattr(result, 'content_hash', None), 'title': result.title})
        stmt = pg_insert(TaxDocument).values(source_url=result.source_url, category_id=0, source_id=source_id, title=result.title, doc_number=getattr(result, 'doc_number', None), issuing_authority=getattr(result, 'issuing_authority', None), issue_date=getattr(result, 'issue_date', None), effective_date=getattr(result, 'effective_date', None), doc_status=getattr(result, 'doc_status', None) or None, doc_type=getattr(result, 'doc_type', None), region_code=getattr(result, 'region_code', None), qa_question=getattr(result, 'qa_question', None), qa_answer=getattr(result, 'qa_answer', None), interpretation_form=getattr(result, 'interpretation_form', None), content_html=content_html, content_markdown=content_markdown, content_text=content_text, content_hash=getattr(result, 'content_hash', None), attachments=getattr(result, 'attachments', None), inline_images=getattr(result, 'inline_images', None), inline_videos=getattr(result, 'inline_videos', None), fetch_strategy=getattr(result, 'fetch_strategy', None) or 'normal', supersedes=getattr(result, 'supersedes', None), references=getattr(result, 'references', None), processing_status='completed').on_conflict_do_update(index_elements=['source_url'], set_={'source_id': source_id, 'title': result.title, 'doc_number': getattr(result, 'doc_number', None), 'issuing_authority': getattr(result, 'issuing_authority', None), 'issue_date': getattr(result, 'issue_date', None), 'effective_date': getattr(result, 'effective_date', None), 'doc_status': getattr(result, 'doc_status', None) or None, 'doc_type': getattr(result, 'doc_type', None), 'region_code': getattr(result, 'region_code', None), 'qa_question': getattr(result, 'qa_question', None), 'qa_answer': getattr(result, 'qa_answer', None), 'interpretation_form': getattr(result, 'interpretation_form', None), 'content_html': content_html, 'content_markdown': content_markdown, 'content_text': content_text, 'content_hash': getattr(result, 'content_hash', None), 'attachments': getattr(result, 'attachments', None), 'inline_images': getattr(result, 'inline_images', None), 'inline_videos': getattr(result, 'inline_videos', None), 'fetch_strategy': getattr(result, 'fetch_strategy', None) or 'normal', 'supersedes': getattr(result, 'supersedes', None), 'references': getattr(result, 'references', None), 'processing_status': 'completed', 'updated_at': datetime.utcnow()})
        db.execute(stmt)
        db.commit()
        return True
    except IntegrityError as e:
        db.rollback()
        logger.warning(f'[upsert_v2] 写入冲突已忽略: {result.source_url} — {e}')
        return False
    except Exception:
        db.rollback()
        import traceback
        logger.error(f'[upsert_v2] 写入失败: {result.source_url}')
        logger.error(f'[upsert_v2] 异常详情: {traceback.format_exc()}')
        return False

@celery_app.task(bind=True, base=DatabaseTask, queue='data_center_queue', name='app.tasks.processor_tasks.process_source_task')
def process_source_task(self, task_id: str, source_id: int, mode: str='full', force: bool=False, start_page: int | None=None, end_page: int | None=None):
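    """Process a configured DataSource with its adapter: list documents (optionally within a page range),
    parse and upsert them with bounded concurrency, download attachments, and push to the knowledge base in batches.
    """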
    from app.models.tax_data import DataSource
    from app.services.tax_data_processor.adapters import get_adapter
    source = self.db.query(DataSource).filter(DataSource.id == source_id).first()
    if not source:
        return {'success': False, 'error': '数据源不存在'}
    task = _get_or_create_task(self.db, task_id, 0, mode)
    task.source_id = source_id
    self.db.commit()
    engine = ProcessorEngine(timeout=settings.crawler_request_timeout, max_retries=source.max_retries, delay_min=source.request_delay_min, delay_max=source.request_delay_max)
    adapter = get_adapter(source, engine)
    checkpoint = CheckpointManager(task_id=task_id, category_id=source_id)
    completed_urls = checkpoint.load_completed_urls()
    try:
        _update_task(self.db, task, status='running', started_at=datetime.utcnow(), progress=0)
        existing_urls: set = {url for url, in self.db.query(TaxDocument.source_url).filter(TaxDocument.source_id == source_id).all()}
        logger.info(f'[{task_id}] 已加载 {len(existing_urls)} 条已有 URL')
        kb_client = KnowledgeBaseClient(base_url=settings.base_platform_url, api_key=settings.base_platform_api_key)
        kb_id = settings.base_platform_kb_id
        IMPORT_BATCH_SIZE = 500
        pending_import_ids = []
        total_imported = 0
        success_count = len(completed_urls)
        failed_count = 0
        processed = 0
        concurrency = getattr(source, 'concurrency', None) or 3
        state = {'success_count': success_count, 'failed_count': failed_count, 'processed': processed, 'total_imported': total_imported, 'pending_import_ids': pending_import_ids}
        from app.services.tax_data_processor.parsers.attachment_parser import AttachmentParser
        attachment_parser = AttachmentParser(timeout=60, max_retries=3)
        save_dir_source = str(settings.raw_data_dir / source.code)
        Path(save_dir_source).mkdir(parents=True, exist_ok=True)
        sem = asyncio.Semaphore(concurrency)

        async def _process_one(item):
            async with sem:
                try:
                    parsed = await adapter.parse_document(item)
                    if parsed:
                        if parsed.attachments:
                            downloaded = []
                            for att in parsed.attachments:
                                dl = await attachment_parser.download_attachment(att['url'], str(Path(save_dir_source) / att['name']), base_dir=settings.project_root)
                                if dl['success']:
                                    downloaded.append({'name': att['name'], 'url': att['url'], 'path': dl['path'], 'size': dl.get('size', 0), 'type': att.get('type', '')})
                            parsed.attachments = downloaded
                        _upsert_tax_document_v2(self.db, parsed, source_id)
                        checkpoint.mark_done(item.url)
                        existing_urls.add(item.url)
                        state['success_count'] += 1
                        saved = self.db.query(TaxDocument).filter(TaxDocument.source_url == item.url).first()
                        if saved:
                            if parsed.attachments:
                                _rewrite_attachment_urls(self.db, saved.id, parsed.attachments)
                            if source.code != 'qa_12366':
                                state['pending_import_ids'].append(saved.id)
                        if len(state['pending_import_ids']) >= IMPORT_BATCH_SIZE:
                            state['total_imported'] += _flush_import_batch(self.db, kb_client, state['pending_import_ids'], kb_id=kb_id)
                            state['pending_import_ids'].clear()
                    else:
                        state['failed_count'] += 1
                except Exception as e:
                    state['failed_count'] += 1
                    logger.exception(f'[{task_id}] 文档处理异常: {item.url} — {e}')
                state['processed'] += 1
                _update_task(self.db, task, success_count=state['success_count'], failed_count=state['failed_count'], progress=min(20 + state['processed'], 99))

        async def _run():
            tasks = []
            async for item in adapter.list_all_documents(start_page=start_page, end_page=end_page):
                if checkpoint.is_done(item.url):
                    state['success_count'] += 1
                    continue
                if mode == 'full' and (not force):
                    if item.url in existing_urls:
                        checkpoint.mark_done(item.url)
                        state['success_count'] += 1
                        continue
                tasks.append(asyncio.ensure_future(_process_one(item)))
                if len(tasks) >= concurrency * 4:
                    results = await asyncio.gather(*tasks, return_exceptions=True)
                    for r in results:
                        if isinstance(r, Exception):
                            logger.error(f'[{task_id}] gather异常: {r}', exc_info=r)
                    tasks.clear()
            if tasks:
                results = await asyncio.gather(*tasks, return_exceptions=True)
                for r in results:
                    if isinstance(r, Exception):
                        logger.error(f'[{task_id}] gather异常: {r}', exc_info=r)
        asyncio.run(_run())
        success_count = state['success_count']
        failed_count = state['failed_count']
        total_imported = state['total_imported']
        pending_import_ids = state['pending_import_ids']
        checkpoint.flush()
        if pending_import_ids:
            total_imported += _flush_import_batch(self.db, kb_client, pending_import_ids, kb_id=kb_id)
        _update_task(self.db, task, status='completed', progress=100, completed_at=datetime.utcnow())
        if failed_count == 0:
            checkpoint.clear()
        return {'success': True, 'task_id': task_id, 'source_id': source_id, 'success_count': success_count, 'failed_count': failed_count}
    except BaseException as e:
        import traceback
        tb = traceback.format_exc()
        logger.error(f'[{task_id}] process_source_task 异常: {type(e).__name__}: {e}\n{tb}')
        checkpoint.flush()
        _update_task(self.db, task, status='failed', error_message=f'{type(e).__name__}: {e}', completed_at=datetime.utcnow())
        return {'success': False, 'error': str(e)}

@celery_app.task(bind=True, base=DatabaseTask, queue='data_center_queue', name='app.tasks.processor_tasks.backfill_media_task')
def backfill_media_task(self, batch_size: int=50):
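    """Backfill inline_images/inline_videos for documents whose media fields were never populated,
    re-deriving them from the stored HTML and re-downloading missing attachments.
    """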
    from bs4 import BeautifulSoup
    from sqlalchemy.orm.attributes import flag_modified

    from app.services.tax_data_processor.category_processor import CategoryProcessor
    from app.services.tax_data_processor.parsers.attachment_parser import AttachmentParser
    from app.services.tax_data_processor.parsers.html_parser import HTMLParser
    from app.services.tax_data_processor.processor_engine import ProcessorEngine
    db = self.db
    engine = ProcessorEngine(timeout=60, max_retries=1, delay_min=0, delay_max=0)
    html_parser = HTMLParser()
    attachment_parser = AttachmentParser(timeout=60, max_retries=2)
    category_processor = CategoryProcessor()
    total = db.query(TaxDocument).filter(TaxDocument.content_html.isnot(None), TaxDocument.inline_images.is_(None)).count()
    logger.info(f'[backfill_media] 待回填: {total}')
    success_count = failed_count = 0
    last_id = 0
    while True:
        # Paginate by id so documents that fail and roll back are not re-fetched in an endless loop.
        batch = db.query(TaxDocument).filter(TaxDocument.content_html.isnot(None), TaxDocument.inline_images.is_(None), TaxDocument.id > last_id).order_by(TaxDocument.id).limit(batch_size).all()
        if not batch:
            break
        last_id = batch[-1].id
        for doc in batch:
            try:
                config = category_processor.get_category_config(doc.category_id) if doc.category_id else None
                save_dir = str(settings.raw_data_dir / (config['name'] if config else 'backfill'))
                Path(save_dir).mkdir(parents=True, exist_ok=True)
                result = asyncio.run(engine.reprocess_from_html(doc.content_html, doc.source_url, doc.category_id or 0, save_dir))
                if not result or not result.get('success'):
                    failed_count += 1
                    doc.inline_images = []
                    db.commit()
                    continue
                if doc.doc_type == 'qa':
                    doc.inline_images = []
                    db.commit()
                    success_count += 1
                    continue
                parsed = html_parser.parse(doc.content_html, doc.source_url)
                cleaned_soup = BeautifulSoup(parsed['content_html'], 'lxml')
                interp = doc.interpretation_form
                if interp == 'image':
                    import hashlib as _hashlib

                    from app.services.tax_data_processor.adapters.chinatax_zcjd.adapter import (
                        _download_inline_images,
                    )
                    img_dir = str(settings.raw_data_dir / 'chinatax_zcjd' / _hashlib.sha256(doc.source_url.encode()).hexdigest()[:12] / 'images')
                    inline_images = asyncio.run(_download_inline_images(cleaned_soup, img_dir))
                    inline_videos = []
                elif interp == 'video':
                    import hashlib as _hashlib

                    from app.services.tax_data_processor.adapters.chinatax_zcjd.adapter import (
                        _download_inline_videos,
                    )


                    vid_dir = str(settings.raw_data_dir / 'chinatax_zcjd' / _hashlib.sha256(doc.source_url.encode()).hexdigest()[:12] / 'videos')
                    inline_images = []
                    inline_videos = asyncio.run(_download_inline_videos(cleaned_soup, vid_dir))
                else:
                    media = html_parser.extract_media(cleaned_soup, doc.source_url)
                    inline_images = []
                    if media['images']:
                        img_dir = Path(save_dir) / f'backfill_{doc.id}' / 'images'
                        img_dir.mkdir(parents=True, exist_ok=True)
                        for idx, img_info in enumerate(media['images']):
                            src = img_info['src']
                            ext = Path(src.split('?')[0]).suffix.lower() or '.jpg'
                            dl = asyncio.run(attachment_parser.download_attachment(src, str(img_dir / f'img_{idx:03d}{ext}'), base_dir=settings.project_root))
                            if dl['success']:
                                inline_images.append({'src_original': src, 'path': dl['path'], 'alt': img_info['alt'], 'ocr_text': ''})
                    inline_videos = []
                    if media['videos']:
                        vid_dir = Path(save_dir) / f'backfill_{doc.id}' / 'videos'
                        vid_dir.mkdir(parents=True, exist_ok=True)
                        for idx, vid_info in enumerate(media['videos']):
                            src = vid_info['src']
                            ext = Path(src.split('?')[0]).suffix.lower() or '.mp4'
                            dl = asyncio.run(attachment_parser.download_attachment(src, str(vid_dir / f'vid_{idx:03d}{ext}'), base_dir=settings.project_root))
                            if dl['success']:
                                inline_videos.append({'src_original': src, 'path': dl['path'], 'transcript': '', 'keyframes': []})
                attachments = list(doc.attachments or [])
                att_changed = False
                for att in attachments:
                    if att.get('path'):
                        continue
                    dl = asyncio.run(attachment_parser.download_attachment(att['url'], str(Path(save_dir) / att['name']), base_dir=settings.project_root))
                    if dl['success']:
                        att['path'] = dl['path']
                        att_changed = True
                doc.inline_images = inline_images
                doc.inline_videos = inline_videos or None
                if doc.doc_type != 'qa':
                    doc.content_markdown = result['content_markdown']
                    doc.content_text = result['content_text']
                if att_changed:
                    doc.attachments = attachments
                    flag_modified(doc, 'attachments')
                db.commit()
                success_count += 1
            except Exception as e:
                db.rollback()
                failed_count += 1
                logger.error(f'[backfill_media] doc_id={doc.id} 失败: {e}')
        logger.info(f'[backfill_media] 进度 success={success_count} failed={failed_count}')
    logger.info(f'[backfill_media] 完成: total={total} success={success_count} failed={failed_count}')
    return {'success': True, 'total': total, 'success_count': success_count, 'failed_count': failed_count}
