o
    Q5i                     @   s`   d dl Z d dlZd dlmZmZ d dlmZ d dlmZm	Z	m
Z
 G dd dZG dd dZdS )	    N)Optional	Generator)Cache)EmbeddingReturnTypePrioritizedQueueItemQueueItemInnerc                
   @   s\   e Zd ZdZdddZdd Zdee fd	d
Z	dde	de	de
ee ddf fddZdS )CustomFIFOQueuez%Class which defines a custom orderingreturnNc                 C   s   t  | _g | _t  | _dS ) N)	threadingLock_lock_queue_event_queueEvent_sync_eventself r   j/lsinfo/ai/hellotax_ai/llm_service/venv_embed/lib/python3.10/site-packages/infinity_emb/inference/queue.py__init__   s   
zCustomFIFOQueue.__init__c                 C   s
   t | jS N)lenr   r   r   r   r   __len__   s   
zCustomFIFOQueue.__len__itemsc                 C   s@   | j  | j| W d    n1 sw   Y  | j  d S r   )r   r   extendr   set)r   r   r   r   r   r      s   zCustomFIFOQueue.extend   皙?sizemax_n_batchesc           	      k   s    | j s| j|sdS || }| j | j d| }| j |d | _ | j s+| j  W d   n1 s5w   Y  t||krD|  dd |D }tdt||D ]}||||  V  qSdS )a  
        pop batch `up to size` + `continuous (sorted)` from queue

        Args:
            size (int): max size of batch
            max_n_batches: number of batches to be popped and sorted.
            timeout (float, optional): timeout until None is returned. Defaults to 0.2.
            latest_first (bool, optional): guarantees processing of oldest item in list.
                As latest first requires getting argmin of created timestamps,
                which is slow.  Defaults to False.

        returns:
            None: if there is not a single item in self._queue after timeout
            else: list[EmbeddingInner] with len(1<=size)
        Nc                 S   s   g | ]}|j j s|j qS r   )itemfuturedone).0mir   r   r   
<listcomp>G   s
    z7CustomFIFOQueue.pop_optimal_batches.<locals>.<listcomp>r   )r   r   waitr   clearr   sortrange)	r   r   r   timeoutkwargssize_batchesnew_items_l	new_itemsir   r   r   pop_optimal_batches#   s(   
z#CustomFIFOQueue.pop_optimal_batches)r	   N)r   r   )__name__
__module____qualname____doc__r   r   listr   r   intr   r   r0   r   r   r   r   r      s    
r   c                   @   s>   e Zd Zddee ddfddZdd Zdedefd	d
Z	dS )ResultKVStoreFutureNcacher	   c                 C   s
   || _ dS )zholds instance of CacheN)_cache)r   r8   r   r   r   r   P   s   
zResultKVStoreFuture.__init__c                 C   s   dS )
deprecatedr   r   r   r   r   r   r   T   s   zResultKVStoreFuture.__len__r    c                    s&   | j rt| j | |jI dH S )zwait for future to returnN)r9   asynciocreate_taskaget_completer!   )r   r    r   r   r   wait_for_responseX   s   z%ResultKVStoreFuture.wait_for_responser   )
r1   r2   r3   r   r   r   r   r   r   r>   r   r   r   r   r7   O   s    r7   )r;   r   typingr   r   $infinity_emb.inference.caching_layerr   infinity_emb.primitivesr   r   r   r   r7   r   r   r   r   <module>   s   ?