o
    3i                     @   s   d dl Z d dlZd dlZd dlmZ d dlmZmZmZ zd dl	m
Z
 W n ey3   d dlm
Z
 Y nw dZdZG dd deZdS )	    N)Thread)APIErrorDatetimeSerializer
batch_post)Emptyi  i  P c                   @   s\   e Zd ZdZedZ								dd	d
Zdd Zdd Z	dd Z
dd Zdd ZdS )Consumerz.Consumes the messages from the client's queue.posthogd   N      ?F
      c                 C   sV   t |  d| _|| _|| _|| _|| _|| _|| _|| _	d| _
|| _|	| _|
| _dS )zCreate a consumer thread.TN)r   __init__daemonflush_atflush_intervalapi_keyhoston_errorqueuegziprunningretriestimeouthistorical_migration)selfr   r   r   r   r   r   r   r   r   r    r   ^/lsinfo/ai/hellotax_ai/llm_service/venv_embed/lib/python3.10/site-packages/posthog/consumer.pyr      s   

zConsumer.__init__c                 C   s0   | j d | jr|   | js	| j d dS )zRuns the consumer.zconsumer is running...zconsumer exited.N)logdebugr   uploadr   r   r   r   run;   s
   zConsumer.runc                 C   s
   d| _ dS )zPause the consumer.FN)r   r    r   r   r   pauseC   s   
zConsumer.pausec                 C   s   d}|   }t|dkrdS zWz	| | d}W n@ tyX } z4| jd| d}| jrNz| || W n tyM } z| jd| W Y d}~nd}~ww W Y d}~nd}~ww W |D ]}| j  q\|S |D ]}| j  qhw )z:Upload the next batch of items, return whether successful.Fr   Tzerror uploading: %szon_error handler failed: %sN)	nextlenrequest	Exceptionr   errorr   r   	task_done)r   successbatcheitemr   r   r   r   G   s4   

zConsumer.uploadc                 C   s   | j }g }t }d}t|| jk rot | }|| jkr 	 |S z=|jd| j| d}ttj|t	d
 }|tkrE| jdt| W q|| ||7 }|tkr\| jd| W |S W n
 tyg   Y |S w t|| jk s|S )z)Return the next batch of items to upload.r   T)blockr   )clsz)Item exceeds 900kib limit, dropping. (%s)zhit batch size limit (size: %d))r   time	monotonicr$   r   r   getjsondumpsr   encodeMAX_MSG_SIZEr   r'   strappendBATCH_SIZE_LIMITr   r   )r   r   items
start_time
total_sizeelapsedr,   	item_sizer   r   r   r#   `   s<   

zConsumer.nextc                 C   s   dd }d}t | jd D ]O}zt| j| j| j| j|| jd W  dS  ty\ } z.|}||s1 || jk rRt	|dd}|rH|dkrHt
| n
t
td| d	 W Y d}~qd}~ww |ra|dS )
z=Attempt to upload the batch and retry before raising an errorc                 S   sB   t | tr| jdkrdS d| j  kodk n  o| jdv S dS )NzN/AFi  i  )i  i  T)
isinstancer   status)excr   r   r   is_retryable   s
   

&z&Consumer.request.<locals>.is_retryableN   )r   r   r*   r   retry_afterr         )ranger   r   r   r   r   r   r   r&   getattrr/   sleepmin)r   r*   rA   last_excattemptr+   rC   r   r   r   r%   ~   s6   
zConsumer.request)r	   NNr
   Fr   r   F)__name__
__module____qualname____doc__logging	getLoggerr   r   r!   r"   r   r#   r%   r   r   r   r   r      s"    

!r   )r2   rP   r/   	threadingr   posthog.requestr   r   r   r   r   ImportErrorQueuer5   r8   r   r   r   r   r   <module>   s    