
     j                     L   d Z ddlZddlZddlZddlmZmZmZmZm	Z	m
Z
mZmZmZmZmZ ddlmZmZ ddlmZmZ ddlmZ erddlmZ  G d d	          Z G d
 de          Z G d de          Z G d d          Z G d d          Z G d d          Z  G d d          Z!dS )ag  Batch evaluation functionality for Langfuse.

This module provides comprehensive batch evaluation capabilities for running evaluations
on traces and observations fetched from Langfuse. It includes type definitions,
protocols, result classes, and the implementation for large-scale evaluation workflows
with error handling, retry logic, and resume capability.
    N)TYPE_CHECKINGAny	AwaitableDictListOptionalProtocolSetTupleUnioncast)ObservationsViewTraceWithFullDetails)
EvaluationEvaluatorFunction)langfuse_logger)Langfusec                   N    e Zd ZdZddddedededeeeef                  fdZdS )	EvaluatorInputsaF
  Input data structure for evaluators, returned by mapper functions.

    This class provides a strongly-typed container for transforming API response
    objects (traces, observations) into the standardized format expected
    by evaluator functions. It ensures consistent access to input, output, expected
    output, and metadata regardless of the source entity type.

    Attributes:
        input: The input data that was provided to generate the output being evaluated.
            For traces, this might be the initial prompt or request. For observations,
            this could be the span's input. The exact meaning depends on your use case.
        output: The actual output that was produced and needs to be evaluated.
            For traces, this is typically the final response. For observations,
            this might be the generation output or span result.
        expected_output: Optional ground truth or expected result for comparison.
            Used by evaluators to assess correctness. May be None if no ground truth
            is available for the entity being evaluated.
        metadata: Optional structured metadata providing additional context for evaluation.
            Can include information about the entity, execution context, user attributes,
            or any other relevant data that evaluators might use.

    Examples:
        Simple mapper for traces:
        ```python
        from langfuse import EvaluatorInputs

        def trace_mapper(trace):
            return EvaluatorInputs(
                input=trace.input,
                output=trace.output,
                expected_output=None,  # No ground truth available
                metadata={"user_id": trace.user_id, "tags": trace.tags}
            )
        ```

        Mapper for observations extracting specific fields:
        ```python
        def observation_mapper(observation):
            # Extract input/output from observation's data
            input_data = observation.input if hasattr(observation, 'input') else None
            output_data = observation.output if hasattr(observation, 'output') else None

            return EvaluatorInputs(
                input=input_data,
                output=output_data,
                expected_output=None,
                metadata={
                    "observation_type": observation.type,
                    "model": observation.model,
                    "latency_ms": observation.end_time - observation.start_time
                }
            )
        ```
        ```

    Note:
        All arguments must be passed as keywords when instantiating this class.
    N)expected_outputmetadatainputoutputr   r   c                >    || _         || _        || _        || _        dS )a}  Initialize EvaluatorInputs with the provided data.

        Args:
            input: The input data for evaluation.
            output: The output data to be evaluated.
            expected_output: Optional ground truth for comparison.
            metadata: Optional additional context for evaluation.

        Note:
            All arguments must be provided as keywords.
        Nr   r   r   r   )selfr   r   r   r   s        c/lsinfo/ai/hellotax_ai/base_platform/venv/lib/python3.11/site-packages/langfuse/batch_evaluation.py__init__zEvaluatorInputs.__init__a   s%    & 
.     )	__name__
__module____qualname____doc__r   r   r   strr    r   r   r   r   %   sz        9 9@  $-1! ! ! ! 	!
 ! 4S>*! ! ! ! ! !r   r   c            	       ^    e Zd ZdZded         deeef         deee	e         f         fdZ
dS )MapperFunctionas  Protocol defining the interface for mapper functions in batch evaluation.

    Mapper functions transform API response objects (traces or observations)
    into the standardized EvaluatorInputs format that evaluators expect. This abstraction
    allows you to define how to extract and structure evaluation data from different
    entity types.

    Mapper functions must:
    - Accept a single item parameter (trace, observation)
    - Return an EvaluatorInputs instance with input, output, expected_output, metadata
    - Can be either synchronous or asynchronous
    - Should handle missing or malformed data gracefully
    item)r   r   kwargsreturnc                    dS )a
  Transform an API response object into evaluator inputs.

        This method defines how to extract evaluation-relevant data from the raw
        API response object. The implementation should map entity-specific fields
        to the standardized input/output/expected_output/metadata structure.

        Args:
            item: The API response object to transform. The type depends on the scope:
                - TraceWithFullDetails: When evaluating traces
                - ObservationsView: When evaluating observations

        Returns:
            EvaluatorInputs: A structured container with:
                - input: The input data that generated the output
                - output: The output to be evaluated
                - expected_output: Optional ground truth for comparison
                - metadata: Optional additional context

            Can return either a direct EvaluatorInputs instance or an awaitable
            (for async mappers that need to fetch additional data).

        Examples:
            Basic trace mapper:
            ```python
            def map_trace(trace):
                return EvaluatorInputs(
                    input=trace.input,
                    output=trace.output,
                    expected_output=None,
                    metadata={"trace_id": trace.id, "user": trace.user_id}
                )
            ```

            Observation mapper with conditional logic:
            ```python
            def map_observation(observation):
                # Extract fields based on observation type
                if observation.type == "GENERATION":
                    input_data = observation.input
                    output_data = observation.output
                else:
                    # For other types, use different fields
                    input_data = observation.metadata.get("input")
                    output_data = observation.metadata.get("output")

                return EvaluatorInputs(
                    input=input_data,
                    output=output_data,
                    expected_output=None,
                    metadata={"obs_id": observation.id, "type": observation.type}
                )
            ```

            Async mapper (if additional processing needed):
            ```python
            async def map_trace_async(trace):
                # Could do async processing here if needed
                processed_output = await some_async_transformation(trace.output)

                return EvaluatorInputs(
                    input=trace.input,
                    output=processed_output,
                    expected_output=None,
                    metadata={"trace_id": trace.id}
                )
            ```
        Nr%   )r   r(   r)   s      r   __call__zMapperFunction.__call__   s
    R 	r   N)r    r!   r"   r#   r   r   r$   r   r   r   r,   r%   r   r   r'   r'   z   sv         I >?I sCx.	I
 
	/ ::	;I I I I I Ir   r'   c                      e Zd ZdZddddddee         dee         dee         deeeef                  dee	         d	eeef         d
e
e	ee	         eeef         ee	         eee	                  eeeef                  f         fdZdS )CompositeEvaluatorFunctiona  Protocol defining the interface for composite evaluator functions.

    Composite evaluators create aggregate scores from multiple item-level evaluations.
    This is commonly used to compute weighted averages, combined metrics, or other
    composite assessments based on individual evaluation results.

    Composite evaluators:
    - Accept the same inputs as item-level evaluators (input, output, expected_output, metadata)
      plus the list of evaluations
    - Return either a single Evaluation, a list of Evaluations, or a dict
    - Can be either synchronous or asynchronous
    - Have access to both raw item data and evaluation results
    Nr   r   r   r   r   evaluationsr)   r*   c                    dS )a  Create a composite evaluation from item-level evaluation results.

        This method combines multiple evaluation scores into a single composite metric.
        Common use cases include weighted averages, pass/fail decisions based on multiple
        criteria, or custom scoring logic that considers multiple dimensions.

        Args:
            input: The input data that was provided to the system being evaluated.
            output: The output generated by the system being evaluated.
            expected_output: The expected/reference output for comparison (if available).
            metadata: Additional metadata about the evaluation context.
            evaluations: List of evaluation results from item-level evaluators.
                Each evaluation contains name, value, comment, and metadata.

        Returns:
            Can return any of:
            - Evaluation: A single composite evaluation result
            - List[Evaluation]: Multiple composite evaluations
            - Dict: A dict that will be converted to an Evaluation
                - name: Identifier for the composite metric (e.g., "composite_score")
                - value: The computed composite value
                - comment: Optional explanation of how the score was computed
                - metadata: Optional details about the composition logic

            Can return either a direct Evaluation instance or an awaitable
            (for async composite evaluators).

        Examples:
            Simple weighted average:
            ```python
            def weighted_composite(*, input, output, expected_output, metadata, evaluations):
                weights = {
                    "accuracy": 0.5,
                    "relevance": 0.3,
                    "safety": 0.2
                }

                total_score = 0.0
                total_weight = 0.0

                for eval in evaluations:
                    if eval.name in weights and isinstance(eval.value, (int, float)):
                        total_score += eval.value * weights[eval.name]
                        total_weight += weights[eval.name]

                final_score = total_score / total_weight if total_weight > 0 else 0.0

                return Evaluation(
                    name="composite_score",
                    value=final_score,
                    comment=f"Weighted average of {len(evaluations)} metrics"
                )
            ```

            Pass/fail composite based on thresholds:
            ```python
            def pass_fail_composite(*, input, output, expected_output, metadata, evaluations):
                # Must pass all criteria
                thresholds = {
                    "accuracy": 0.7,
                    "safety": 0.9,
                    "relevance": 0.6
                }

                passes = True
                failing_metrics = []

                for metric, threshold in thresholds.items():
                    eval_result = next((e for e in evaluations if e.name == metric), None)
                    if eval_result and isinstance(eval_result.value, (int, float)):
                        if eval_result.value < threshold:
                            passes = False
                            failing_metrics.append(metric)

                return Evaluation(
                    name="passes_all_checks",
                    value=passes,
                    comment=f"Failed: {', '.join(failing_metrics)}" if failing_metrics else "All checks passed",
                    data_type="BOOLEAN"
                )
            ```

            Async composite with external scoring:
            ```python
            async def llm_composite(*, input, output, expected_output, metadata, evaluations):
                # Use LLM to synthesize multiple evaluation results
                eval_summary = "\n".join(
                    f"- {e.name}: {e.value}" for e in evaluations
                )

                prompt = f"Given these evaluation scores:\n{eval_summary}\n"
                prompt += f"For the output: {output}\n"
                prompt += "Provide an overall quality score from 0-1."

                response = await openai.chat.completions.create(
                    model="gpt-4",
                    messages=[{"role": "user", "content": prompt}]
                )

                score = float(response.choices[0].message.content.strip())

                return Evaluation(
                    name="llm_composite_score",
                    value=score,
                    comment="LLM-synthesized composite score"
                )
            ```

            Context-aware composite:
            ```python
            def context_composite(*, input, output, expected_output, metadata, evaluations):
                # Adjust weighting based on metadata
                base_weights = {"accuracy": 0.5, "speed": 0.3, "cost": 0.2}

                # If metadata indicates high importance, prioritize accuracy
                if metadata and metadata.get('importance') == 'high':
                    weights = {"accuracy": 0.7, "speed": 0.2, "cost": 0.1}
                else:
                    weights = base_weights

                total = sum(
                    e.value * weights.get(e.name, 0)
                    for e in evaluations
                    if isinstance(e.value, (int, float))
                )

                return Evaluation(
                    name="weighted_composite",
                    value=total,
                    comment="Context-aware weighted composite"
                )
            ```
        Nr%   )r   r   r   r   r   r/   r)   s          r   r,   z#CompositeEvaluatorFunction.__call__   s
    l 	r   )r    r!   r"   r#   r   r   r   r$   r   r   r   r   r,   r%   r   r   r.   r.      s
        "  $ $)--1V V V }V 	V
 "#V 4S>*V *%V sCx.V 
ZS#X*$z"#$sCx.!	#
V V V V V Vr   r.   c                   :    e Zd ZdZddddddededededef
d	Zd
S )EvaluatorStatsu  Statistics for a single evaluator's performance during batch evaluation.

    This class tracks detailed metrics about how a specific evaluator performed
    across all items in a batch evaluation run. It helps identify evaluator issues,
    understand reliability, and optimize evaluation pipelines.

    Attributes:
        name: The name of the evaluator function (extracted from __name__).
        total_runs: Total number of times the evaluator was invoked.
        successful_runs: Number of times the evaluator completed successfully.
        failed_runs: Number of times the evaluator raised an exception or failed.
        total_scores_created: Total number of evaluation scores created by this evaluator.
            Can be higher than successful_runs if the evaluator returns multiple scores.

    Examples:
        Accessing evaluator stats from batch evaluation result:
        ```python
        result = client.run_batched_evaluation(...)

        for stats in result.evaluator_stats:
            print(f"Evaluator: {stats.name}")
            print(f"  Success rate: {stats.successful_runs / stats.total_runs:.1%}")
            print(f"  Scores created: {stats.total_scores_created}")

            if stats.failed_runs > 0:
                print(f"  ⚠️  Failed {stats.failed_runs} times")
        ```

        Identifying problematic evaluators:
        ```python
        result = client.run_batched_evaluation(...)

        # Find evaluators with high failure rates
        for stats in result.evaluator_stats:
            failure_rate = stats.failed_runs / stats.total_runs
            if failure_rate > 0.1:  # More than 10% failures
                print(f"⚠️  {stats.name} has {failure_rate:.1%} failure rate")
                print(f"    Consider debugging or removing this evaluator")
        ```

    Note:
        All arguments must be passed as keywords when instantiating this class.
    r   )
total_runssuccessful_runsfailed_runstotal_scores_creatednamer3   r4   r5   r6   c                L    || _         || _        || _        || _        || _        dS )a  Initialize EvaluatorStats with the provided metrics.

        Args:
            name: The evaluator function name.
            total_runs: Total number of evaluator invocations.
            successful_runs: Number of successful completions.
            failed_runs: Number of failures.
            total_scores_created: Total scores created by this evaluator.

        Note:
            All arguments must be provided as keywords.
        N)r7   r3   r4   r5   r6   )r   r7   r3   r4   r5   r6   s         r   r   zEvaluatorStats.__init__  s0    * 	$.&$8!!!r   N)r    r!   r"   r#   r$   intr   r%   r   r   r2   r2   }  s}        * *`  $%9 9 9 9 	9
 9 9 "9 9 9 9 9 9r   r2   c            
       :    e Zd ZdZdedee         dededef
dZdS )	BatchEvaluationResumeTokena  Token for resuming a failed batch evaluation run.

    This class encapsulates all the information needed to resume a batch evaluation
    that was interrupted or failed partway through. It uses timestamp-based filtering
    to avoid re-processing items that were already evaluated, even if the underlying
    dataset changed between runs.

    Attributes:
        scope: The type of items being evaluated ("traces", "observations").
        filter: The original JSON filter string used to query items.
        last_processed_timestamp: ISO 8601 timestamp of the last successfully processed item.
            Used to construct a filter that only fetches items after this timestamp.
        last_processed_id: The ID of the last successfully processed item, for reference.
        items_processed: Count of items successfully processed before interruption.

    Examples:
        Resuming a failed batch evaluation:
        ```python
        # Initial run that fails partway through
        try:
            result = client.run_batched_evaluation(
                scope="traces",
                mapper=my_mapper,
                evaluators=[evaluator1, evaluator2],
                filter='{"tags": ["production"]}',
                max_items=10000
            )
        except Exception as e:
            print(f"Evaluation failed: {e}")

            # Save the resume token
            if result.resume_token:
                # Store resume token for later (e.g., in a file or database)
                import json
                with open("resume_token.json", "w") as f:
                    json.dump({
                        "scope": result.resume_token.scope,
                        "filter": result.resume_token.filter,
                        "last_timestamp": result.resume_token.last_processed_timestamp,
                        "last_id": result.resume_token.last_processed_id,
                        "items_done": result.resume_token.items_processed
                    }, f)

        # Later, resume from where it left off
        with open("resume_token.json") as f:
            token_data = json.load(f)

        resume_token = BatchEvaluationResumeToken(
            scope=token_data["scope"],
            filter=token_data["filter"],
            last_processed_timestamp=token_data["last_timestamp"],
            last_processed_id=token_data["last_id"],
            items_processed=token_data["items_done"]
        )

        # Resume the evaluation
        result = client.run_batched_evaluation(
            scope="traces",
            mapper=my_mapper,
            evaluators=[evaluator1, evaluator2],
            resume_from=resume_token
        )

        print(f"Processed {result.total_items_processed} additional items")
        ```

        Handling partial completion:
        ```python
        result = client.run_batched_evaluation(...)

        if not result.completed:
            print(f"Evaluation incomplete. Processed {result.resume_token.items_processed} items")
            print(f"Last item: {result.resume_token.last_processed_id}")
            print(f"Resume from: {result.resume_token.last_processed_timestamp}")

            # Optionally retry automatically
            if result.resume_token:
                print("Retrying...")
                result = client.run_batched_evaluation(
                    scope=result.resume_token.scope,
                    mapper=my_mapper,
                    evaluators=my_evaluators,
                    resume_from=result.resume_token
                )
        ```

    Note:
        All arguments must be passed as keywords when instantiating this class.
        The timestamp-based approach means that items created after the initial run
        but before the timestamp will be skipped. This is intentional to avoid
        duplicates and ensure consistent evaluation.
    scopefilterlast_processed_timestamplast_processed_iditems_processedc                L    || _         || _        || _        || _        || _        dS )a  Initialize BatchEvaluationResumeToken with the provided state.

        Args:
            scope: The scope type ("traces", "observations").
            filter: The original JSON filter string.
            last_processed_timestamp: ISO 8601 timestamp of last processed item.
            last_processed_id: ID of last processed item.
            items_processed: Count of items processed before interruption.

        Note:
            All arguments must be provided as keywords.
        Nr<   r=   r>   r?   r@   )r   r<   r=   r>   r?   r@   s         r   r   z#BatchEvaluationResumeToken.__init__$  s0    * 
(@%!2.r   N)r    r!   r"   r#   r$   r   r9   r   r%   r   r   r;   r;     sk        [ [z/ / 	/
 #&/ / / / / / / /r   r;   c                       e Zd ZdZdededededededee         d	ee         d
e	de
dee         deeef         de	deeed         f         fdZdefdZdS )BatchEvaluationResultu;  Complete result structure for batch evaluation execution.

    This class encapsulates comprehensive statistics and metadata about a batch
    evaluation run, including counts, evaluator-specific metrics, timing information,
    error details, and resume capability.

    Attributes:
        total_items_fetched: Total number of items fetched from the API.
        total_items_processed: Number of items successfully evaluated.
        total_items_failed: Number of items that failed during evaluation.
        total_scores_created: Total scores created by all item-level evaluators.
        total_composite_scores_created: Scores created by the composite evaluator.
        total_evaluations_failed: Number of individual evaluator failures across all items.
        evaluator_stats: List of per-evaluator statistics (success/failure rates, scores created).
        resume_token: Token for resuming if evaluation was interrupted (None if completed).
        completed: True if all items were processed, False if stopped early or failed.
        duration_seconds: Total time taken to execute the batch evaluation.
        failed_item_ids: List of IDs for items that failed evaluation.
        error_summary: Dictionary mapping error types to occurrence counts.
        has_more_items: True if max_items limit was reached but more items exist.
        item_evaluations: Dictionary mapping item IDs to their evaluation results (both regular and composite).

    Examples:
        Basic result inspection:
        ```python
        result = client.run_batched_evaluation(...)

        print(f"Processed: {result.total_items_processed}/{result.total_items_fetched}")
        print(f"Scores created: {result.total_scores_created}")
        print(f"Duration: {result.duration_seconds:.2f}s")
        print(f"Success rate: {result.total_items_processed / result.total_items_fetched:.1%}")
        ```

        Detailed analysis with evaluator stats:
        ```python
        result = client.run_batched_evaluation(...)

        print(f"\n📊 Batch Evaluation Results")
        print(f"{'='*50}")
        print(f"Items processed: {result.total_items_processed}")
        print(f"Items failed: {result.total_items_failed}")
        print(f"Scores created: {result.total_scores_created}")

        if result.total_composite_scores_created > 0:
            print(f"Composite scores: {result.total_composite_scores_created}")

        print(f"\n📈 Evaluator Performance:")
        for stats in result.evaluator_stats:
            success_rate = stats.successful_runs / stats.total_runs if stats.total_runs > 0 else 0
            print(f"\n  {stats.name}:")
            print(f"    Success rate: {success_rate:.1%}")
            print(f"    Scores created: {stats.total_scores_created}")
            if stats.failed_runs > 0:
                print(f"    ⚠️  Failures: {stats.failed_runs}")

        if result.error_summary:
            print(f"\n⚠️  Errors encountered:")
            for error_type, count in result.error_summary.items():
                print(f"    {error_type}: {count}")
        ```

        Handling incomplete runs:
        ```python
        result = client.run_batched_evaluation(...)

        if not result.completed:
            print("⚠️  Evaluation incomplete!")

            if result.resume_token:
                print(f"Processed {result.resume_token.items_processed} items before failure")
                print(f"Use resume_from parameter to continue from:")
                print(f"  Timestamp: {result.resume_token.last_processed_timestamp}")
                print(f"  Last ID: {result.resume_token.last_processed_id}")

        if result.has_more_items:
            print(f"ℹ️  More items available beyond max_items limit")
        ```

        Performance monitoring:
        ```python
        result = client.run_batched_evaluation(...)

        items_per_second = result.total_items_processed / result.duration_seconds
        avg_scores_per_item = result.total_scores_created / result.total_items_processed

        print(f"Performance metrics:")
        print(f"  Throughput: {items_per_second:.2f} items/second")
        print(f"  Avg scores/item: {avg_scores_per_item:.2f}")
        print(f"  Total duration: {result.duration_seconds:.2f}s")

        if result.total_evaluations_failed > 0:
            failure_rate = result.total_evaluations_failed / (
                result.total_items_processed * len(result.evaluator_stats)
            )
            print(f"  Evaluation failure rate: {failure_rate:.1%}")
        ```

    Note:
        All arguments must be passed as keywords when instantiating this class.
    total_items_fetchedtotal_items_processedtotal_items_failedr6   total_composite_scores_createdtotal_evaluations_failedevaluator_statsresume_token	completedduration_secondsfailed_item_idserror_summaryhas_more_itemsitem_evaluationsr   c                    || _         || _        || _        || _        || _        || _        || _        || _        |	| _        |
| _	        || _
        || _        || _        || _        dS )a  Initialize BatchEvaluationResult with comprehensive statistics.

        Args:
            total_items_fetched: Total items fetched from API.
            total_items_processed: Items successfully evaluated.
            total_items_failed: Items that failed evaluation.
            total_scores_created: Scores from item-level evaluators.
            total_composite_scores_created: Scores from composite evaluator.
            total_evaluations_failed: Individual evaluator failures.
            evaluator_stats: Per-evaluator statistics.
            resume_token: Token for resuming (None if completed).
            completed: Whether all items were processed.
            duration_seconds: Total execution time.
            failed_item_ids: IDs of failed items.
            error_summary: Error types and counts.
            has_more_items: Whether more items exist beyond max_items.
            item_evaluations: Dictionary mapping item IDs to their evaluation results.

        Note:
            All arguments must be provided as keywords.
        NrE   rF   rG   r6   rH   rI   rJ   rK   rL   rM   rN   rO   rP   rQ   )r   rE   rF   rG   r6   rH   rI   rJ   rK   rL   rM   rN   rO   rP   rQ   s                  r   r   zBatchEvaluationResult.__init__  sz    N $7 %:""4$8!.L+(@%.(" 0.*, 0r   r*   c           
         g }|                     d           |                     d           |                     d           |                     d| j        rdnd            |                     d| j        dd           |                     d	| j                    |                     d
| j                    | j        dk    r|                     d| j                    | j        dk    r,| j        | j        z  dz  }|                     d|dd           |                     d| j                    | j        dk    r|                     d| j                    | j        | j        z   }|                     d|            | j        r|                     d           | j        D ]}|                     d|j	         d           |j
        dk    r|j
        dk    r|j        |j
        z  dz  nd}|                     d|j         d|j
         d|dd           |                     d|j                    |j        dk    r|                     d|j                    | j        dk    r|| j        dk    rq| j        | j        z  }|                     d           |                     d|dd           | j        dk    r(| j        | j        z  }|                     d |d           | j        rO|                     d!           | j                                        D ] \  }}|                     d| d"|            !| j        su|                     d#           | j        rY|                     d$| j        j                    |                     d%| j        j                    |                     d&           | j        r|                     d'           |                     d           d(                    |          S ))zReturn a formatted string representation of the batch evaluation results.

        Returns:
            A multi-line string with a summary of the evaluation results.
        z<============================================================zBatch Evaluation Resultsz	
Status: 	Completed
Incompletez
Duration: .2fsz
Items fetched: zItems processed: r   zItems failed: d   zSuccess rate: .1f%z
Scores created: zComposite scores: zTotal scores: z
Evaluator Performance:z  :z
    Runs: / (z
% success)z    Scores created: z    Failed runs: z
Performance:z  Throughput: z items/secondz  Avg scores per item: z
Errors encountered:: z
Warning: Evaluation incompletez  Last processed: z  Items processed: z'  Use resume_from parameter to continuez2
Note: More items available beyond max_items limit
)appendrL   rM   rE   rF   rG   r6   rH   rJ   r7   r3   r4   r5   rO   itemsrK   r>   r@   rP   join)	r   linessuccess_ratetotal_scoresstatsitems_per_sec
avg_scores
error_typecounts	            r   __str__zBatchEvaluationResult.__str__  s    X/000X 	S"Q++\SSTTT>$"7>>>>???C)ACCDDDE)CEEFFF"Q&&LLC$*ACCDDD #a''58PPSVVLLL=,====>>> 	E$*CEEFFF.22LLSd.QSSTTT043VV4l44555  	NLL3444- N N/%*///000#a'' !+a// -0@@3FF !
 LL9U%: 9 9U=M 9 9(89 9 9   LL!T8R!T!TUUU(1,,%L9J%L%LMMM %))d.Ca.G.G 69NNMLL)***LLJ-JJJJKKK(1,,!69SS
GzGGGHHH  	9LL0111%)%7%=%=%?%? 9 9!
E7*77778888 ~ 	HLL;<<<  HU):)SUU   V43D3TVVWWWFGGG 	PLLNOOOXyyr   N)r    r!   r"   r#   r9   r   r2   r   r;   boolfloatr$   r   r   rl   r%   r   r   rD   rD   @  s
       c cJ41 !41  #	41
  41 "41 ),41 #&41 n-41 9:41 41  41 c41 CH~41 41  sD$667!41 41 41 41lQ  Q  Q  Q  Q  Q  Q r   rD   c            !          e Zd ZdZdIdZdddddddd	dd
d	dddededee         de	e         de
de	e         de	e
         de
de	e         de	eeef                  dede	ee                  de
dede	e         def dZdede	e         de
de
de
de	e         deeeef                  fd Zd!eeef         dededee         de	e         de	eeef                  ded"eeef         dee
e
e
ee         f         fd#Zd$ed%edee         fd&Zded!eeef         defd'Zded(e	e         d)e	e         d*e	e         de	eeef                  d+ee         dee         fd,Zdd	d-ded.ed/e	e         d0ed1e	eeef                  d2ede
fd3Zd4e	e         de	e         de	e         fd5Ze d!eeef         dedefd6            Z!e d!eeef         dedefd7            Z"e dedefd8            Z#e d9e	ee                  dee         fd:            Z$d;e
d<e
d=e
d>e
d?e
d@e
d"eeef         dAe	e         dBedCe%dDee         dEeee
f         dFedGeeee         f         defdHZ&dS )JBatchEvaluationRunneraM  Handles batch evaluation execution for a Langfuse client.

    This class encapsulates all the logic for fetching items, running evaluators,
    creating scores, and managing the evaluation lifecycle. It provides a clean
    separation of concerns from the main Langfuse client class.

    The runner uses a streaming/pipeline approach to process items in batches,
    avoiding loading the entire dataset into memory. This makes it suitable for
    evaluating large numbers of items.

    Attributes:
        client: The Langfuse client instance used for API calls and score creation.
    clientr   c                     || _         dS )zqInitialize the batch evaluation runner.

        Args:
            client: The Langfuse client instance.
        N)rq   )r   rq   s     r   r   zBatchEvaluationRunner.__init__?  s     r   N2   io   F   )r=   fetch_batch_sizefetch_trace_fields	max_itemsmax_concurrencycomposite_evaluatorr    _add_observation_scores_to_trace_additional_trace_tagsmax_retriesverboseresume_fromr<   mapper
evaluatorsr=   rw   rx   ry   rz   r{   r   r|   r}   r~   r   r   r*   c                j
   	
567K   t          j                     }d}d}d}d}d}d}g }i }i }d D             5                     ||          }|                     |          ng }t                      }t	          j        |          7d}d}d}d} |r]t          j        d            dk    r|rt          j        d|            |r%t          j        d	|j         d
|j	         d           |r|$||k    r|rt          j        d| d           d}ny	  
                    |||||           d{V }!nq# t          $ rd}"d| d}#t          j        |# d|"            t          ||pd| pd|          }$                     ||||||5|$d|||||          cY d}"~"S d}"~"ww xY w|!sd}|rt          j        d           n|t          |!          z  }|r(t          j        d| d
t          |!           d           |!}%|K||z
  }&t          |!          |&k    r3|!d|&         }%|r't          j        dt          |%           d|            dt           t"          t$          f         dt&          t(          t           t&          t*          t*          t*          t,          t.                   f         t          f         f         f	5
 7f	d66fd|%D             }'t	          j        |'  d{V }(t3          |%|(          D ]\  })\  }*}+t5          |+t                    re|dz  }|                    |*           t9          |+          j        },|                    |,d          dz   ||,<   t          j        d|* d |+            |dz  }|+\  }-}.}/}0||-z  }||.z  }||/z  }|0||*<   |rYdk    r|*ntA          t$          |)          j!        }1|1r5|1|vr1 j"        #                    |1|!           |$                    |1            %                    |)          }|*} |rN|1|dk    r+||z  d"z  }2t          j        d#| d$| d%|2d&d'| d(	           nt          j        d#| d)| d(           t          |!          |k     rd}n|dz  }|	||k    rd}n||rt          j        d*            j"        &                                 t          j                     |z
  }3|rt          j        d+| d,|3d-d.           | p	|duo||k    }4                     ||||||5d|4||||o	|duo||k    |          S )/ar  Run batch evaluation asynchronously.

        This is the main implementation method that orchestrates the entire batch
        evaluation process: fetching items, mapping, evaluating, creating scores,
        and tracking statistics.

        Args:
            scope: The type of items to evaluate ("traces", "observations").
            mapper: Function to transform API response items to evaluator inputs.
            evaluators: List of evaluation functions to run on each item.
            filter: JSON filter string for querying items.
            fetch_batch_size: Number of items to fetch per API call.
            fetch_trace_fields: Comma-separated list of fields to include when fetching traces. Available field groups: 'core' (always included), 'io' (input, output, metadata), 'scores', 'observations', 'metrics'. If not specified, all fields are returned. Example: 'core,scores,metrics'. Note: Excluded 'observations' or 'scores' fields return empty arrays; excluded 'metrics' returns -1 for 'totalCost' and 'latency'. Only relevant if scope is 'traces'. Default: 'io'
            max_items: Maximum number of items to process (None = all).
            max_concurrency: Maximum number of concurrent evaluations.
            composite_evaluator: Optional function to create composite scores.
            metadata: Metadata to add to all created scores.
            _add_observation_scores_to_trace: Private option to duplicate
                observation-level scores onto the parent trace.
            _additional_trace_tags: Private option to add tags on traces via
                ingestion trace-create events.
            max_retries: Maximum retries for failed batch fetches.
            verbose: If True, log progress to console.
            resume_from: Resume token from a previous failed run.

        Returns:
            BatchEvaluationResult with comprehensive statistics.
        r   c           
      l    i | ]1}t          |d d          t          t          |d d                    2S )r    unknown_evaluator)r7   )getattrr2   ).0	evaluators     r   
<dictcomp>z3BatchEvaluationRunner.run_async.<locals>.<dictcomp>  s[      
  
  
  Iz+>??Y
4GHHB B B 
  
  
r   N   TzStarting batch evaluation on traceszFetching trace fields: zResuming from r^   z items already processed)zReached max_items limit ())r<   r=   pagelimitr~   fieldszFailed to fetch batch after z retriesr_    rB   F)rE   rF   rG   r6   rH   rI   evaluator_stats_dictrK   rL   
start_timerN   rO   rP   rQ   zNo more items to fetchzFetched batch z items)zLimiting batch to z items to respect max_items=r(   r*   c                 :  	K   4 d{V                       | 
          }	                     | 
	           d{V }||fcddd          d{V  S # t          $ r }||fcY d}~cddd          d{V  S d}~ww xY w# 1 d{V swxY w Y   dS )z3Process a single item and return (item_id, result).N)r(   r<   r   r   r{   r   r|   r   )_get_item_id_process_batch_evaluation_item	Exception)r(   item_idresulter|   r{   r   r   r   r   r<   r   	semaphores       r   process_itemz5BatchEvaluationRunner.run_async.<locals>.process_item  s      % , , , , , , , ,"//e<<G,'+'J'J!%"'#)'10C%-=]1E (K 	( 	( 	" 	" 	" 	" 	" 	" !(0, , , , , , , , , , , , , , % , , , '|+++++, , , , , , , , , , , , , ,,, , , , , , , , , , , , , , , ,s:   B
&A
B'B*B+B
BB


BBc                 &    g | ]} |          S r%   r%   )r   r(   r   s     r   
<listcomp>z3BatchEvaluationRunner.run_async.<locals>.<listcomp>  s#    EEED\\$''EEEr   zItem z	 failed: )trace_idtagsrY   z
Progress: r]   z items (rZ   z%), z scores createdz items processed, zFlushing scores to Langfuse...zBatch evaluation complete: z items processed in rW   rX   )'time_build_timestamp_filter_dedupe_tagssetasyncio	Semaphoreloggerinfor>   r@   _fetch_batch_with_retryr   errorr;   _build_resultlenr   r   r   r   r$   r9   r   r   gatherzip
isinstancera   typer    getwarningr   r   rq    _create_trace_tags_via_ingestionadd_get_item_timestampflush)8r   r<   r   r   r=   rw   rx   ry   rz   r{   r   r|   r}   r~   r   r   r   rE   rF   rG   r6   rH   rI   rN   rO   rQ   effective_filter normalized_additional_trace_tagsupdated_trace_idsr   has_morelast_item_timestamplast_item_idrb   r   	error_msgrK   items_to_processremaining_capacitytasksresultsr(   r   r   rj   scores_createdcomposite_createdevals_failedr/   r   progress_pctdurationcompleted_successfullyr   r   r   s8   ````     ```                                         @@@r   	run_asynczBatchEvaluationRunner.run_asyncG  sf     ^ Y[[
   ! )*&#$ %'(*8: 
  
 (	 
  
  
  77LL &1 4555 	)
 '*ee %o66	 -1&* 	K???@@@  %7 J6HJJKKK O[%I O O#3O O O    a	$)<	)I)I JK HI H H HIII%"::+* +- ;            P;PPP		00Q001119!-@-FB&2&8b$9      ))(;*?'9)=3Q-E)=!-#)$3"/#+%5 *        >    :K 89993u::- JHTHHSZZHHHIII  %$%.1F%F"u:: 222',-@.@-@'A$ @5E1F1F @ @4=@ @  ,02BBC,sE%S#tJ7G(G"H)"STTU, , , , , , , , , , , , , ,, FEEE4DEEEE#NE2222222G ,//?+I+I %+ %+''wfi00 $+&!+&#**7333!%f!6J0=0A0A*a0P0PST0TM*-N#E7#E#EV#E#EFFFF *Q.) QN$5|[ )N:(26GG2,<, 1<$W-7 <  %00 $G!%&6!=!=!F ! $ <8I(I(I KHH)1%E I    .11(;;; +/*B*B4*O*O'#*LL (Y]]#89#Ds#JLKX%: X XY X X(WX X2FX X X   
 KA%: A A/A A A   5zz,,, 	 (-@I-M-M#HC  a	H  	:K8999 9;;+ 	K&.C & &%& & &   &. "
T!F&9Y&F 	 !! 3"71!5+I%=!5,!+'WYd2W7Ji7W-! " 
 
 	
s    !E 
F0AF+%F0+F0r   r   r   c                J  K   |dk    r?| j         j        j                            |||d|i|          }t          |j                  S |dk    rC| j         j        j        j                            |||d|i          }t          |j                  S d| }t          |          )a%  Fetch a batch of items with retry logic.

        Args:
            scope: The type of items ("traces", "observations").
            filter: JSON filter string for querying.
            page: Page number (1-indexed).
            limit: Number of items per page.
            max_retries: Maximum number of retry attempts.
            verbose: Whether to log retry attempts.
            fields: Trace fields to fetch

        Returns:
            List of items from the API.

        Raises:
            Exception: If all retry attempts fail.
        r   r~   )r   r   r=   request_optionsr   observations)r   r   r=   r   zInvalid scope: )	rq   apitracelistdatalegacyobservations_v1get_many
ValueError)	r   r<   r=   r   r   r~   r   responseerror_messages	            r   r   z-BatchEvaluationRunner._fetch_batch_with_retrys  s      6 H{,11!. < 2  H &&&n$${-=FF!. <	 G  H &&&5e55M]+++r   r(   r   c	                   K   d}	d}
d}|                      ||           d{V }g }|D ]}t          |dd          }||         }|xj        dz  c_        	 |                     ||j        |j        |j        |j                   d{V }|xj        dz  c_        |xj	        t          |          z  c_	        |                    |           # t          $ rP}|xj        dz  c_        |dz  }t          j        d| d|                     ||           d	|            Y d}~d}~ww xY w|                     ||          }|D ]@}|	|                     |||d
k    rt%          t&          |          j        nd|||          z  }	A|r|r	 |                     ||j        |j        |j        |j        |           d{V }|D ]@}|
|                     |||d
k    rt%          t&          |          j        nd|||          z  }
A|                    |           n1# t          $ r$}t          j        d| d	|            Y d}~nd}~ww xY w|	|
||fS )ad  Process a single item: map, evaluate, create scores.

        Args:
            item: The API response object to evaluate.
            scope: The type of item ("traces", "observations").
            mapper: Function to transform item to evaluator inputs.
            evaluators: List of evaluator functions.
            composite_evaluator: Optional composite evaluator function.
            metadata: Additional metadata to add to scores.
            _add_observation_scores_to_trace: Whether to duplicate
                observation-level scores at trace level.
            evaluator_stats_dict: Dictionary tracking evaluator statistics.

        Returns:
            Tuple of (scores_created, composite_scores_created, evaluations_failed, all_evaluations).

        Raises:
            Exception: If mapping fails or item processing encounters fatal error.
        r   Nr    r   r   r   z
Evaluator z failed on item r_   r   )r<   r   r   
evaluationadditional_metadataadd_observation_score_to_tracer   r   r   r   r/   z#Composite evaluator failed on item )_run_mapperr   r3   _run_evaluator_internalr   r   r   r   r4   r6   r   extendr   r5   r   r   r   _create_score_for_scoper   r   r   _run_composite_evaluator)r   r(   r<   r   r   r{   r   r|   r   r   composite_scores_createdevaluations_failedevaluator_inputsr/   r   evaluator_namerg   eval_resultsr   r   r   composite_evalscomposite_evals                          r   r   z4BatchEvaluationRunner._process_batch_evaluation_item  s     < #$  "&!1!1&$!?!??????? )+# 	 	I$Y
<OPPN(8E!%)%A%A*0+2$4$D-6 &B & &             %%*%%**c,.?.??**""<0000   !!Q&!!"a'"= = =((u55= =9:= =       	 ##D%00% 
	 
	Jd::N** .55>>%$,/O ; 	 	 	NN  	U; 	UU(,(E(E'*0+2$4$D-6 + )F ) ) # # # # # # '6 
 
N,0L0L# ' N22 "&&6!=!=!F!F!#1,47W 1M 	1 	1 	,, ""?3333 U U USWSSPQSSTTTTTTTTU $	
 	
s3   A6C
D&AD!!D&BH 
IH>>Ir   r)   c                    K    |di |}t          j        |          r| d{V }t          |t          t          f          r|gS t          |t
                    r|S g S )a  Run an evaluator function and normalize the result.

        Unlike experiment._run_evaluator, this version raises exceptions
        so we can track failures in our statistics.

        Args:
            evaluator: The evaluator function to run.
            **kwargs: Arguments to pass to the evaluator.

        Returns:
            List of Evaluation objects.

        Raises:
            Exception: If evaluator raises an exception (not caught).
        Nr%   r   iscoroutiner   dictr   r   )r   r   r)   r   s       r   r   z-BatchEvaluationRunner._run_evaluator_internal  s      ( $$V$$ v&& 	"!\\\\\\F ftZ011 	8O%% 	MIr   c                 Z   K    ||          }t          j        |          r| d{V S |S )a3  Run mapper function (handles both sync and async mappers).

        Args:
            mapper: The mapper function to run.
            item: The API response object to map.

        Returns:
            EvaluatorInputs instance.

        Raises:
            Exception: If mapper raises an exception.
        )r(   N)r   r   )r   r   r(   r   s       r   r   z!BatchEvaluationRunner._run_mapper;  sD      " T"""v&& 	 <<<<<<r   r   r   r   r/   c                    K    ||||||          }t          j        |          r| d{V }t          |t          t          f          r|gS t          |t
                    r|S g S )a  Run composite evaluator function (handles both sync and async).

        Args:
            composite_evaluator: The composite evaluator function.
            input: The input data provided to the system.
            output: The output generated by the system.
            expected_output: The expected/reference output.
            metadata: Additional metadata about the evaluation context.
            evaluations: List of item-level evaluations.

        Returns:
            List of Evaluation objects (normalized from single or list return).

        Raises:
            Exception: If composite evaluator raises an exception.
        r   Nr   )r   r{   r   r   r   r   r/   r   s           r   r   z.BatchEvaluationRunner._run_composite_evaluatorQ  s      2 %$+#
 
 
 v&& 	"!\\\\\\F ftZ011 	8O%% 	MIr   )r   r   r   r   r   r   r   c          
         i |j         pi |pi }|dk    r<| j                            ||j        |j        |j        ||j        |j                   dS |dk    r| j                            |||j        |j        |j        ||j        |j                   d}|rA|r?| j                            ||j        |j        |j        ||j        |j                   |dz  }|S dS )am  Create a score linked to the appropriate entity based on scope.

        Args:
            scope: The type of entity ("traces", "observations").
            item_id: The ID of the entity.
            trace_id: The trace ID of the entity; required if scope=observations
            evaluation: The evaluation result to create a score from.
            additional_metadata: Additional metadata to merge with evaluation metadata.
            add_observation_score_to_trace: Whether to duplicate observation
                score on parent trace as well.

        Returns:
            Number of score events created.
        r   )r   r7   valuecommentr   	data_type	config_idr   r   )observation_idr   r7   r   r   r   r   r   r   )r   rq   create_scorer7   r   r   r   r   )	r   r<   r   r   r   r   r   score_metadatascore_counts	            r   r   z-BatchEvaluationRunner._create_score_for_scope|  s;   2
"(b
"(b

 HK$$ _ &"*'$.$. %    1n$$K$$&!_ &"*'$.$. % 	 	 	 K- 
!( 
!((%#$*&.+(2(2 )    q qr   original_filterc                    |s|S 	 |rt          j        |          ng }t          |t                    s+t	          j        dt          |          j                    g }n.# t           j        $ r t	          j        d|            g }Y nw xY w| 	                    |j
                  }d|d|j        d}|                    |           t          j        |          S )aB  Build filter with timestamp constraint for resume capability.

        Args:
            original_filter: The original JSON filter string.
            resume_from: Optional resume token with timestamp information.

        Returns:
            Modified filter string with timestamp constraint, or original filter.
        z$Filter should be a JSON array, got: z+Invalid JSON in original filter, ignoring: datetime>)r   columnoperatorr   )jsonloadsr   r   r   r   r   r    JSONDecodeError_get_timestamp_field_for_scoper<   r>   ra   dumps)r   r   r   filter_listtimestamp_fieldtimestamp_filters         r   r   z-BatchEvaluationRunner._build_timestamp_filter  s     	#""	9HP$*_555bKk400 !W4;L;L;UWW   !# 	 	 	NOoOO   KKK		 ==k>OPP% 9	
 
 	+,,,z+&&&s   AA (B
	B
c                     | j         S )zExtract ID from item based on scope.

        Args:
            item: The API response object.
            scope: The type of item.

        Returns:
            The item's ID.
        )idr(   r<   s     r   r   z"BatchEvaluationRunner._get_item_id  s     wr   c                     |dk    r*t          | d          r| j                                        S n/|dk    r)t          | d          r| j                                        S dS )zExtract timestamp from item based on scope.

        Args:
            item: The API response object.
            scope: The type of item.

        Returns:
            ISO 8601 timestamp string.
        r   	timestampr   r   r   )hasattrr  	isoformatr   r  s     r   r   z)BatchEvaluationRunner._get_item_timestamp  sp     Ht[)) 2~//1112n$$t\** 300222rr   c                 &    | dk    rdS | dk    rdS dS )zGet the timestamp field name for filtering based on scope.

        Args:
            scope: The type of items.

        Returns:
            The field name to use in filters.
        r   r  r   r   r%   )r<   s    r   r   z4BatchEvaluationRunner._get_timestamp_field_for_scope  s)     H;n$$<{r   r   c                     | g S g }t                      }| D ]0}||vr*|                    |           |                    |           1|S )z(Deduplicate tags while preserving order.)r   ra   r   )r   dedupedseentags       r   r   z"BatchEvaluationRunner._dedupe_tags%  s[     <Iuu 	 	C$s###r   rE   rF   rG   r6   rH   rI   rK   rL   r   rN   rO   rP   rQ   c                     t          j                     |
z
  }t          ||||||t          |                                          ||	|||||          S )a  Build the final BatchEvaluationResult.

        Args:
            total_items_fetched: Total items fetched.
            total_items_processed: Items successfully processed.
            total_items_failed: Items that failed.
            total_scores_created: Scores from item evaluators.
            total_composite_scores_created: Scores from composite evaluator.
            total_evaluations_failed: Individual evaluator failures.
            evaluator_stats_dict: Per-evaluator statistics.
            resume_token: Resume token if incomplete.
            completed: Whether evaluation completed fully.
            start_time: Start time (unix timestamp).
            failed_item_ids: IDs of failed items.
            error_summary: Error type counts.
            has_more_items: Whether more items exist.
            item_evaluations: Dictionary mapping item IDs to their evaluation results.

        Returns:
            BatchEvaluationResult instance.
        rS   )r   rD   r   values)r   rE   rF   rG   r6   rH   rI   r   rK   rL   r   rN   rO   rP   rQ   r   s                   r   r   z#BatchEvaluationRunner._build_result4  sk    L 9;;+$ 3"71!5+I%= !5!<!<!>!>??%%+')-
 
 
 	
r   )rq   r   )'r    r!   r"   r#   r   r$   r'   r   r   r   r9   r.   r   r   rm   r;   rD   r   r   r   r   r   r2   r   r   r   r   r   r   r   r   r   staticmethodr   r   r   r   rn   r   r%   r   r   rp   rp   0  s            !% ",0#' DH-1166:<@#j
 j
 j
 j
 	j

 *+j
 j
 j
 %SMj
 C=j
 j
 &&@Aj
 4S>*j
 +/j
 !)c 3j
 j
  !j
" 89#j
$ 
%j
 j
 j
 j
X	., ., 	.,
 ., ., ., ., 
e(*::;	<., ., ., .,`t
(*::;t
 t
 	t

 *+t
 &&@At
 4S>*t
 +/t
 #3#67t
 
sCd:..	/t
 t
 t
 t
l $    
j		       D (*::; 
	   ,)7) }) 	)
 "#) 4S>*) *%) 
j	) ) ) )` #' 05D D D D 	D
 3-D D &d38n5D )-D 
D D D DL)'!#)' 89)' 
#	)' )' )' )'V (*::; 
   \ (*::; 
   \. c c    \ 8DI. 49    \7
 7
  #7
  	7

 "7
 ),7
 #&7
 #3#677
 9:7
 7
 7
 c7
 CH~7
 7
 sD$4457
  
!7
 7
 7
 7
 7
 7
r   rp   )"r#   r   r   r   typingr   r   r   r   r   r   r	   r
   r   r   r   langfuse.apir   r   langfuse.experimentr   r   langfuse.loggerr   r   langfuse._client.clientr   r   r'   r.   r2   r;   rD   rp   r%   r   r   <module>r     s                                       > = = = = = = = 5 5 5 5 5 5 1000000R! R! R! R! R! R! R! R!jX X X X XX X X Xve e e e e e e ePF9 F9 F9 F9 F9 F9 F9 F9Rw/ w/ w/ w/ w/ w/ w/ w/tm  m  m  m  m  m  m  m `{
 {
 {
 {
 {
 {
 {
 {
 {
 {
r   