# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import logging
import subprocess
import sys
import time
import warnings
from threading import Thread

import requests

from ._models import (
    ChartParsing,
    DocImgOrientationClassification,
    DocVLM,
    FormulaRecognition,
    LayoutDetection,
    SealTextDetection,
    TableCellsDetection,
    TableClassification,
    TableStructureRecognition,
    TextDetection,
    TextImageUnwarping,
    TextLineOrientationClassification,
    TextRecognition,
)
from ._pipelines import (
    DocPreprocessor,
    DocUnderstanding,
    FormulaRecognitionPipeline,
    PaddleOCR,
    PaddleOCRVL,
    PPChatOCRv4Doc,
    PPDocTranslation,
    PPStructureV3,
    SealRecognition,
    TableRecognitionPipelineV2,
)
from ._version import version
from ._utils.deprecation import CLIDeprecationWarning
from ._utils.logging import logger


def _register_pipelines(subparsers):
    for cls in [
        DocPreprocessor,
        DocUnderstanding,
        FormulaRecognitionPipeline,
        PaddleOCR,
        PaddleOCRVL,
        PPChatOCRv4Doc,
        PPDocTranslation,
        PPStructureV3,
        SealRecognition,
        TableRecognitionPipelineV2,
    ]:
        subcommand_executor = cls.get_cli_subcommand_executor()
        subparser = subcommand_executor.add_subparser(subparsers)
        subparser.set_defaults(executor=subcommand_executor.execute_with_args)


def _register_models(subparsers):
    for cls in [
        ChartParsing,
        DocImgOrientationClassification,
        DocVLM,
        FormulaRecognition,
        LayoutDetection,
        SealTextDetection,
        TableCellsDetection,
        TableClassification,
        TableStructureRecognition,
        TextDetection,
        TextImageUnwarping,
        TextLineOrientationClassification,
        TextRecognition,
    ]:
        subcommand_executor = cls.get_cli_subcommand_executor()
        subparser = subcommand_executor.add_subparser(subparsers)
        subparser.set_defaults(executor=subcommand_executor.execute_with_args)


def _register_install_hpi_deps_command(subparsers):
    def _install_hpi_deps(args):
        hpip = f"hpi-{args.variant}"
        try:
            subprocess.check_call(["paddlex", "--install", hpip])
            subprocess.check_call(["paddlex", "--install", "paddle2onnx"])
        except subprocess.CalledProcessError:
            sys.exit("Failed to install dependencies")

    subparser = subparsers.add_parser("install_hpi_deps")
    subparser.add_argument("variant", type=str, choices=["cpu", "gpu", "npu"])
    subparser.set_defaults(executor=_install_hpi_deps)


def _register_install_genai_server_deps_command(subparsers):
    def _install_genai_server_deps(args):
        try:
            subprocess.check_call(
                ["paddlex", "--install", f"genai-{args.variant}-server"]
            )
        except subprocess.CalledProcessError:
            sys.exit("Failed to install dependencies")

    subparser = subparsers.add_parser("install_genai_server_deps")
    subparser.add_argument(
        "variant", type=str, choices=["vllm", "sglang", "fastdeploy"]
    )
    subparser.set_defaults(executor=_install_genai_server_deps)


def _register_genai_server_command(subparsers):
    # TODO: Register the subparser whether the plugin is installed or not
    try:
        from paddlex.inference.genai.server import get_arg_parser, run_genai_server
    except RuntimeError:
        return

    def _show_prompt_when_server_is_running(host, port, backend):
        if host == "0.0.0.0":
            host = "localhost"
        while True:
            try:
                resp = requests.get(f"http://{host}:{port}/health", timeout=1)
                if resp.status_code == 200:
                    break
            except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
                pass
            time.sleep(1)
        prompt = f"""The PaddleOCR GenAI server has been started. You can either:
    1. Set the server URL in the module or pipeline configuration and call the PaddleOCR CLI or Python API. For example:
        paddleocr doc_parser --input demo.png --vl_rec_backend {backend}-server --vl_rec_server_url http://{host}:{port}/v1
    2. Make HTTP requests directly, or using the OpenAI client library."""
        logger.info(prompt)

    def _run_genai_server(args):
        Thread(
            target=_show_prompt_when_server_is_running,
            args=(args.host, args.port, args.backend),
            daemon=True,
        ).start()
        try:
            run_genai_server(args)
        except subprocess.CalledProcessError:
            sys.exit("Failed to run the server")

    paddlex_parser = get_arg_parser()
    subparser = subparsers.add_parser(
        "genai_server", parents=[paddlex_parser], conflict_handler="resolve"
    )
    subparser.set_defaults(executor=_run_genai_server)


def _register_doc2md_command(subparsers):
    """Register the doc2md subcommand."""

    def _execute_doc2md(args):
        if args.formats:
            from ._doc2md import supported_formats

            fmts = supported_formats()
            print("Supported formats: " + ", ".join(f".{f}" for f in fmts))
            return

        if not args.input:
            logger.error("--input is required when --formats is not set")
            sys.exit(2)

        from ._doc2md import convert
        from pathlib import Path

        output = args.output
        quiet = args.quiet

        # Build converter kwargs from CLI args
        converter_kwargs = {}
        if args.no_drawings:
            converter_kwargs["extract_drawings"] = False
        if args.no_headers_footers:
            converter_kwargs["extract_headers_footers"] = False
        if args.sheet_name is not None:
            converter_kwargs["sheet_name"] = args.sheet_name
        if args.max_rows is not None:
            converter_kwargs["max_rows"] = args.max_rows

        t1 = time.time()
        try:
            result = convert(args.input, output=output, **converter_kwargs)
        except Exception as e:
            logger.error(f"Conversion failed: {e}")
            sys.exit(1)

        elapsed = (time.time() - t1) * 1000
        if not quiet:
            logger.info(f"Conversion done in {elapsed:.0f} ms")

        if output:
            if not quiet:
                logger.info(f"Saved to: {output}")
                if result.images:
                    logger.info(f"Images saved to: {Path(output).parent / 'images'}/")
        else:
            print(result.markdown)

    subparser = subparsers.add_parser(
        "doc2md",
        help="Convert office documents (docx/xlsx/pptx) to Markdown",
    )
    subparser.add_argument(
        "-i",
        "--input",
        type=str,
        default=None,
        help="Input file path (.docx/.xlsx/.pptx)",
    )
    subparser.add_argument(
        "-o",
        "--output",
        type=str,
        default=None,
        help="Output Markdown file path (prints to stdout if omitted)",
    )
    subparser.add_argument(
        "-q",
        "--quiet",
        action="store_true",
        help="Suppress informational output",
    )
    subparser.add_argument(
        "--formats",
        action="store_true",
        help="List supported formats and exit",
    )
    # docx options
    subparser.add_argument(
        "--no-drawings",
        action="store_true",
        help="[docx/xlsx] Skip text box / drawing layer content extraction",
    )
    subparser.add_argument(
        "--no-headers-footers",
        action="store_true",
        help="[docx] Skip header and footer content extraction",
    )
    # xlsx options
    subparser.add_argument(
        "--sheet-name",
        type=str,
        default=None,
        help="[xlsx] Convert only the specified sheet (by name)",
    )
    subparser.add_argument(
        "--max-rows",
        type=int,
        default=None,
        help="[xlsx] Maximum number of rows to convert per sheet",
    )
    subparser.set_defaults(executor=_execute_doc2md)


def _get_parser():
    parser = argparse.ArgumentParser(prog="paddleocr")
    parser.add_argument(
        "-v", "--version", action="version", version=f"%(prog)s {version}"
    )
    subparsers = parser.add_subparsers(dest="subcommand")
    _register_pipelines(subparsers)
    _register_models(subparsers)
    _register_install_hpi_deps_command(subparsers)
    _register_install_genai_server_deps_command(subparsers)
    _register_genai_server_command(subparsers)
    _register_doc2md_command(subparsers)
    return parser


def _execute(args):
    args.executor(args)


def main():
    logger.setLevel(logging.INFO)
    warnings.filterwarnings("default", category=CLIDeprecationWarning)
    parser = _get_parser()
    args = parser.parse_args()
    if args.subcommand is None:
        parser.print_usage(sys.stderr)
        sys.exit(2)
    _execute(args)
