from datetime import date, datetime, timedelta

import httpx
from common_logging import get_logger
from sqlalchemy.orm import Session

from app.models.tax_data import TaxDocument
from app.services.tax_data_processor.category_processor import CategoryProcessor
from app.services.tax_data_processor.processor_engine import ProcessorEngine

logger = get_logger(__name__)

DAILY_WINDOW_DAYS = 7
WEEKLY_WINDOW_DAYS = 30
FULL_CHECK_WINDOW_DAYS = 0
DAILY_CATEGORIES = {6, 8}
WEEKLY_CATEGORIES = {1, 2, 3}
MEDIUM_CATEGORIES = {4, 5, 7}

class IncrementalUpdater:

    def __init__(self, db: Session, request_timeout: int=15):
        self.db = db
        self.timeout = request_timeout
        self.category_processor = CategoryProcessor()
        self._engine = ProcessorEngine(timeout=request_timeout)

    async def check_category_updates(self, category_id: int, window_days: int=DAILY_WINDOW_DAYS) -> dict:
        config = self.category_processor.get_category_config(category_id)
        if not config:
            return {'category_id': category_id, 'error': f'未知分类ID: {category_id}'}
        logger.info(f"[增量] 检查分类 {category_id}（{config['name']}），窗口={window_days}天")
        cutoff: date | None = (datetime.utcnow() - timedelta(days=window_days)).date() if window_days > 0 else None
        candidates = await self._fetch_candidate_list(category_id, cutoff)
        if candidates is None:
            return {'category_id': category_id, 'error': '获取文档列表失败', 'new_documents': [], 'updated_documents': [], 'skipped': 0}
        logger.info(f"[增量] 分类 {category_id} 候选文档 {len(candidates)} 条（{('全部' if not cutoff else f'{window_days}天窗口')}）")
        new_documents: list[dict] = []
        updated_documents: list[dict] = []
        skipped = 0
        for doc in candidates:
            doc_url = doc['url']
            db_doc = self.db.query(TaxDocument).filter(TaxDocument.source_url == doc_url).first()
            if db_doc is None:
                new_documents.append(doc)
                continue
            changed = await self._is_content_changed(doc_url, db_doc, category_id=category_id)
            if changed:
                updated_documents.append(doc)
            else:
                skipped += 1
        logger.info(f'[增量] 分类 {category_id} 完成: 新增={len(new_documents)} 更新={len(updated_documents)} 跳过={skipped}')
        return {'category_id': category_id, 'category_name': config['name'], 'new_documents': new_documents, 'updated_documents': updated_documents, 'skipped': skipped, 'error': None}

    async def check_categories(self, category_ids: list[int] | None=None, window_days: int=DAILY_WINDOW_DAYS) -> list[dict]:
        if category_ids is None:
            category_ids = list(self.category_processor.CATEGORIES.keys())
        results = []
        for cid in category_ids:
            result = await self.check_category_updates(cid, window_days=window_days)
            results.append(result)
        total_new = sum(len(r.get('new_documents', [])) for r in results)
        total_updated = sum(len(r.get('updated_documents', [])) for r in results)
        logger.info(f'[增量] 全部 {len(category_ids)} 类检查完成: 新增={total_new} 更新={total_updated}')
        return results

    async def check_all_categories(self) -> list[dict]:
        return await self.check_categories(window_days=DAILY_WINDOW_DAYS)

    def update_last_check_time(self, category_id: int | None=None):
        now = datetime.utcnow()
        query = self.db.query(TaxDocument)
        if category_id is not None:
            query = query.filter(TaxDocument.category_id == category_id)
        query.update({'last_check_time': now}, synchronize_session=False)
        self.db.commit()
        logger.info(f'[增量] 更新 last_check_time: category_id={category_id}')

    async def _fetch_candidate_list(self, category_id: int, cutoff: date | None) -> list[dict] | None:
        try:
            all_docs = await self.category_processor.fetch_all_documents(category_id)
        except Exception as e:
            logger.error(f'[增量] 获取分类 {category_id} 列表失败: {e}'.opt(exception=True))
            return None
        if cutoff is None:
            return all_docs
        filtered = []
        for doc in all_docs:
            doc_date_str = doc.get('date')
            if not doc_date_str:
                filtered.append(doc)
                continue
            try:
                doc_date = _parse_date(doc_date_str)
                if doc_date >= cutoff:
                    filtered.append(doc)
            except Exception:
                filtered.append(doc)
        return filtered

    async def _is_content_changed(self, url: str, db_doc: TaxDocument, category_id: int=0) -> bool:
        try:
            async with httpx.AsyncClient(timeout=self.timeout, follow_redirects=True) as client:
                resp = await client.head(url)
                if resp.status_code in (404, 410):
                    logger.warning(f'[增量] 文档已失效 ({resp.status_code}): {url}')
                    return False
                last_modified_str = resp.headers.get('last-modified')
        except Exception as e:
            logger.warning(f'[增量] HEAD 请求失败，降级为 hash 对比: {url} — {e}')
            last_modified_str = None
        if last_modified_str and db_doc.updated_at:
            try:
                from email.utils import parsedate_to_datetime
                last_modified = parsedate_to_datetime(last_modified_str)
                diff = abs((last_modified.replace(tzinfo=None) - db_doc.updated_at).total_seconds())
                if diff < 60:
                    return False
            except Exception:
                pass
        return await self._hash_changed(url, db_doc.content_hash, category_id=category_id)

    async def check_source(self, source_id: int, window_days: int=7) -> dict:
        from app.config import get_settings
        from app.models.tax_data import DataSource, TaxDocument
        from app.services.tax_data_processor.adapters import get_adapter
        settings = get_settings()
        source = self.db.query(DataSource).filter(DataSource.id == source_id).first()
        if not source:
            return {'source_id': source_id, 'error': '数据源不存在'}
        try:
            engine = ProcessorEngine(timeout=settings.crawler_request_timeout, max_retries=source.max_retries, delay_min=source.request_delay_min, delay_max=source.request_delay_max)
            adapter = get_adapter(source, engine)
            local_urls = {r[0] for r in self.db.query(TaxDocument.source_url).filter(TaxDocument.source_id == source_id).all()}
            new_documents = []
            async for item in adapter.list_all_documents():
                if item.url not in local_urls:
                    new_documents.append({'url': item.url, 'title': item.title})
            return {'source_id': source_id, 'new_documents': new_documents, 'updated_documents': []}
        except Exception as e:
            return {'source_id': source_id, 'error': str(e)}

    async def _hash_changed(self, url: str, stored_hash: str | None, category_id: int=0) -> bool:
        if not stored_hash:
            return True
        try:
            result = await self._engine.process_document(url, category_id, '/tmp')
            if not result or not result.get('success'):
                logger.warning(f'[增量] process_document 失败，跳过 hash 对比: {url}')
                return False
            new_hash = result.get('content_hash', '')
        except Exception as e:
            logger.warning(f'[增量] hash 对比异常，跳过: {url} — {e}')
            return False
        if new_hash and new_hash != stored_hash:
            logger.info(f'[增量] hash 变化，标记更新: {url}')
            return True
        return False

def _parse_date(date_str: str) -> date:
    date_str = date_str.strip()
    for fmt in ('%Y-%m-%d', '%Y/%m/%d', '%Y年%m月%d日', '%Y.%m.%d'):
        try:
            return datetime.strptime(date_str, fmt).date()
        except ValueError:
            continue
    raise ValueError(f'无法解析日期: {date_str}')
