import asyncio
import time

from sqlalchemy import text
from sqlalchemy.orm import Session

from app.database import SessionLocal
from app.models.tax_data import DataSource, TaxDocument
from app.services.tax_data_processor.knowledge_base_client import (

    KnowledgeBaseClient,
    _parse_doc_number_parts,
)

from .config import BATCH_SIZE, DOC_TYPE_MAP, REQUEST_DELAY
from .state import ImportState

from common_logging import get_logger

logger = get_logger(__name__)


def _resolve_kb_category(doc, category_map: dict, source_category_map: dict):
    if doc.category_id and doc.category_id != 0:
        return category_map.get(doc.category_id)
    if doc.source_id:
        return source_category_map.get(doc.source_id)
    return None

def run_phase2(state: ImportState, base_url: str, token: str, source_category_map: dict | None = None) -> None:
    """Phase 2: import processed tax documents into the knowledge base.

    Eligible ``TaxDocument`` rows (see ``_fetch_batch``) are walked in
    ascending id order, starting after the cursor persisted in *state*.

    - Documents with blank ``content_text`` are skipped.
    - Every other document is sent through ``_import_one``. On success its
      knowledge-base id is written back with ``_writeback``.

    The cursor is persisted after every handled document (imported, failed
    or skipped), so an interrupted run resumes after the last one handled. A
    document whose import reports failure is logged and not retried by this
    run. When the loop drains, the phase is marked done in *state*.

    Args:
        state: Persistent import state (phase flags, kb id, category map, cursor).
        base_url: Base URL handed to ``KnowledgeBaseClient``.
        token: API key handed to ``KnowledgeBaseClient``.
        source_category_map: Optional ``source_id -> kb category id`` mapping,
            used for documents without a tax category. Defaults to empty.
    """
    src_category_map = {} if source_category_map is None else source_category_map
    if state.is_phase_done('2'):
        logger.info('Phase 2 已完成，跳过')
        return

    kb_id = state.get_kb_id()
    tax_category_map = state.get_category_map()
    kb_client = KnowledgeBaseClient(base_url=base_url, api_key=token)
    # Set up the metadata fields once, before any document is created.
    asyncio.run(kb_client.ensure_metadata_fields(kb_id))

    last_seen_id = state.get_phase2_cursor()
    imported_count = 0
    skipped_count = 0
    logger.info(f'Phase 2 开始，cursor={last_seen_id}')

    while True:
        # Short-lived session per batch; the rows are used after it closes.
        with SessionLocal() as db:
            pending = _fetch_batch(db, after_id=last_seen_id, batch_size=BATCH_SIZE)
        if not pending:
            break

        for doc in pending:
            has_body = bool(doc.content_text and doc.content_text.strip())
            if has_body:
                target_cat = _resolve_kb_category(doc, tax_category_map, src_category_map)
                outcome = asyncio.run(_import_one(kb_client, doc, kb_id, target_cat))
                if not (outcome and outcome.get('success')):
                    reason = outcome.get('error') if outcome else 'unknown'
                    logger.error(f'导入失败: id={doc.id} title={doc.title} error={reason}')
                else:
                    new_kb_doc_id = outcome.get('knowledge_doc_id') or outcome.get('document_id')
                    with SessionLocal() as db:
                        _writeback(db, doc.id, new_kb_doc_id)
                    imported_count += 1
                    logger.info(f'[{imported_count}] 导入成功: id={doc.id} → knowledge_doc_id={new_kb_doc_id}')
            else:
                logger.warning(f'跳过空内容文档: id={doc.id} title={doc.title}')
                skipped_count += 1

            # Advance and persist the resume point for every handled document.
            last_seen_id = doc.id
            state.set_phase2_cursor(last_seen_id)
            # Throttle only after real requests to the knowledge base.
            if has_body:
                time.sleep(REQUEST_DELAY)

    logger.info(f'✓ Phase 2 完成：导入 {imported_count} 条，跳过 {skipped_count} 条')
    state.mark_phase_done('2')

def _build_annotation_meta(doc: TaxDocument) -> dict:
    """Collect the non-empty descriptive fields of *doc* as string metadata for annotation."""
    candidates = (
        ('doc_number', doc.doc_number),
        ('issuing_authority', doc.issuing_authority),
        ('issue_date', doc.issue_date.isoformat() if doc.issue_date else None),
        ('effective_date', doc.effective_date.isoformat() if doc.effective_date else None),
        ('doc_status', doc.doc_status),
        ('superseded_by', doc.superseded_by_doc_number),
        ('superseded_by_title', doc.superseded_by_title),
        ('source_url', doc.source_url),
        # str(0) == '0' is truthy, so category 0 is kept (only None is dropped).
        ('tax_category_id', str(doc.category_id) if doc.category_id is not None else None),
    )
    return {key: value for key, value in candidates if value}

async def _import_one(client: KnowledgeBaseClient, doc: TaxDocument, kb_id: int, category_id: int | None) -> dict | None:
    """Create one tax document in the knowledge base, then enrich it.

    Steps:
      1. Call ``client.create_document`` with the document body and its
         structured fields: doc-number parts, dates, status and an
         attachment summary.
      2. If creation succeeds, annotate the new document with its non-empty
         metadata and import its attachments via ``_import_attachments``.

    Step 2 is best-effort. The main document already exists at that point,
    so a failure there is logged and the creation result is still returned.
    That lets the caller write the new id back instead of losing track of it
    (and creating a duplicate on the next run).

    Args:
        client: Knowledge-base API client.
        doc: Tax document to import.
        kb_id: Target knowledge base id; forwarded to ``_import_attachments``.
        category_id: Knowledge-base category for the document, or ``None``.

    Returns:
        Whatever ``client.create_document`` returned. Callers treat a truthy
        dict with ``success`` as success.
    """
    num_parts = _parse_doc_number_parts(doc.doc_number)
    attachments = doc.attachments or []
    has_att = bool(attachments)
    # Sorted so the payload is deterministic; set iteration order is not.
    att_types = sorted({a.get('type', '').lower() for a in attachments if a.get('type')}) if has_att else None

    # QA documents are rendered as a question/answer pair instead of the raw text.
    if getattr(doc, 'doc_type', None) == 'qa' and doc.qa_question and doc.qa_answer:
        full_content = f'问：{doc.qa_question}\n\n答：{doc.qa_answer}'
    else:
        full_content = doc.content_text

    metadata = {
        'region_code': getattr(doc, 'region_code', None) or 'CN',
        'doc_type': getattr(doc, 'doc_type', None) or 'regulation',
        'source_id': getattr(doc, 'source_id', None),
    }
    # NOTE(review): `doc_type` below comes from DOC_TYPE_MAP keyed by category, while
    # metadata['doc_type'] comes from doc.doc_type; confirm the two are meant to differ.
    result = await client.create_document(
        category_id=category_id,
        title=doc.title,
        content=full_content,
        source_url=doc.source_url,
        doc_number=doc.doc_number,
        doc_number_year=num_parts['doc_number_year'],
        doc_number_serial=num_parts['doc_number_serial'],
        issuing_authority=doc.issuing_authority,
        issue_date=doc.issue_date.isoformat() if doc.issue_date else None,
        effective_date=doc.effective_date.isoformat() if doc.effective_date else None,
        doc_status=doc.doc_status or 'effective',
        has_attachment=has_att,
        attachment_types=att_types,
        content_hash=doc.content_hash,
        version_number=1,
        doc_type=DOC_TYPE_MAP.get(doc.category_id, 'notice'),
        metadata=metadata,
    )
    if not (result and result.get('success')):
        return result

    # Same key precedence as run_phase2 uses when writing the id back.
    knowledge_doc_id = result.get('knowledge_doc_id') or result.get('document_id')

    meta = _build_annotation_meta(doc)
    if meta:
        try:
            await client.annotate_document_metadata(knowledge_doc_id, meta)
        except Exception:
            logger.exception(f'[phase2] 元数据标注失败: doc_id={doc.id} knowledge_doc_id={knowledge_doc_id}')

    try:
        await _import_attachments(client, doc, knowledge_doc_id, category_id, kb_id)
    except Exception:
        logger.exception(f'[phase2] 附件导入异常: doc_id={doc.id} knowledge_doc_id={knowledge_doc_id}')

    return result

async def _import_attachments(client: KnowledgeBaseClient, doc: TaxDocument, parent_knowledge_doc_id: int, category_id: int | None, kb_id: int) -> None:
    """Convert each local attachment of *doc* to Markdown and store it as its own KB document.

    Each attachment is handled independently and best-effort. A conversion
    failure, an empty conversion or an upload failure is logged or skipped,
    and the loop moves on to the next attachment.

    Args:
        client: Knowledge-base API client.
        doc: Source tax document. ``doc.attachments`` is read as a list of
            dicts with a ``path`` (joined onto ``settings.project_root``) and
            optional ``name`` / ``type``.
        parent_knowledge_doc_id: KB id of the main document, stored in each
            attachment's metadata as ``parent_doc_id``.
        category_id: KB category for the attachment documents.
        kb_id: Not used here; kept for interface compatibility.
    """
    attachments = doc.attachments or []
    if not attachments:
        # Skip settings/converter setup entirely when there is nothing to import.
        return

    from pathlib import PurePath

    from app.config import get_settings
    from app.services.tax_data_processor.converters.doc_converter import DocConverter

    settings = get_settings()
    doc_converter = DocConverter()
    for att in attachments:
        local_path = att.get('path', '')
        if not local_path:
            continue
        # 'name' is optional; fall back to the file name instead of raising KeyError.
        att_name = att.get('name') or PurePath(local_path).name
        abs_path = str(settings.project_root / local_path)
        try:
            conv = await doc_converter.convert_local(abs_path)
        except Exception as e:
            logger.warning(f"[phase2] 附件转换失败 doc_id={doc.id} att={att_name}: {e}")
            continue
        if not conv or not conv.get('success'):
            continue
        # Guard against a None 'markdown' value as well as a missing key.
        markdown = conv.get('markdown') or ''
        if not markdown.strip():
            continue
        try:
            att_result = await client.create_document(
                category_id=category_id,
                title=f"{doc.title} — {att_name}",
                content=markdown,
                source_url=doc.source_url,
                doc_type='attachment',
                metadata={
                    'parent_doc_id': str(parent_knowledge_doc_id),
                    'attachment_name': att_name,
                    'attachment_type': att.get('type') or '',
                },
            )
        except Exception as e:
            # One failing upload must not abort the remaining attachments.
            logger.warning(f"[phase2] 附件独立入库失败: doc_id={doc.id} att={att_name}: {e}")
            continue
        if att_result and att_result.get('success'):
            logger.info(f"[phase2] 附件独立入库成功: doc_id={doc.id} att={att_name}")
        else:
            logger.warning(f"[phase2] 附件独立入库失败: doc_id={doc.id} att={att_name}")

def _fetch_batch(db: Session, after_id: int, batch_size: int) -> list[TaxDocument]:
    """Return the next page of tax documents eligible for knowledge-base import.

    A row is eligible when all of the following hold:

    - ``processing_status == 'completed'``;
    - it is not yet imported (``is_imported`` false or NULL);
    - ``content_text`` is non-NULL;
    - it has either a non-zero ``category_id`` or a ``source_id``;
    - it does not belong to the ``qa_12366`` data source.

    Pagination is keyset-based: ascending ``id`` strictly greater than
    *after_id*, at most *batch_size* rows.
    """
    from sqlalchemy import or_

    return (
        db.query(TaxDocument)
        .outerjoin(DataSource, TaxDocument.source_id == DataSource.id)
        .filter(
            TaxDocument.processing_status == 'completed',
            # Must be a SQL expression: Python `not <column>` is evaluated eagerly to a
            # constant, which made this filter match nothing. NULL counts as "not imported".
            or_(TaxDocument.is_imported.is_(False), TaxDocument.is_imported.is_(None)),
            TaxDocument.content_text.isnot(None),
            TaxDocument.id > after_id,
            or_(TaxDocument.category_id != 0, TaxDocument.source_id.isnot(None)),
            # Source-less rows have NULL DataSource.code after the outer join, and
            # `NULL != 'qa_12366'` is not true in SQL, so keep them explicitly.
            or_(DataSource.code.is_(None), DataSource.code != 'qa_12366'),
        )
        .order_by(TaxDocument.id.asc())
        .limit(batch_size)
        .all()
    )

def _writeback(db: Session, tax_doc_id: int, knowledge_doc_id: int) -> None:
    """Mark tax document *tax_doc_id* as imported and record its knowledge-base id.

    Runs a single parameterized UPDATE on ``tax_documents`` and commits *db*.
    """
    update_stmt = text('UPDATE tax_documents SET is_imported=true, knowledge_doc_id=:kid WHERE id=:id')
    bind_params = {'kid': knowledge_doc_id, 'id': tax_doc_id}
    db.execute(update_stmt, bind_params)
    db.commit()
