o
    3/iY                     @   s~  d dl mZ d dlmZ d dlmZ d dlmZmZm	Z	 d dl
mZmZmZ d dlmZmZmZ d dlmZmZ d dlmZmZ d d	lmZ d d
lmZ d dlmZ d dlmZ d dlmZ d dlm Z! d dlm"Z# dd Z$dededefddZ%dd Z&G dd deZ'dede(fddZ)G dd  d Z*ed!d"G d#d$ d$eZe$eee_+ed!d"G d%d& d&e*eZed!d"G d'd( d(e*eZed!d"G d)d* d*e*eZe$eee_+ed!d"G d+d, d,e*eZe$eee_+ed!d"G d-d. d.e*e!Z e$e!e e _+ed!d"G d/d0 d0e*e#Z"e$e#e"e"_+ed!d"G d1d2 d2e*eZ,e$ee,e,_+G d3d4 d4eZ-ed!d"G d5d6 d6eZ.dS )7    )Optional)	dataclassN)BooleanInt32
const_expr)if_generateand_dsl_user_op)MbarrierArrayCooperativeGroup
PipelineOp)PipelineStatePipelineUserType)Agent
agent_sync)NamedBarrier)PipelineAsync)PipelineCpAsync)PipelineTmaAsync)PipelineTmaUmma)PipelineUmmaAsync)PipelineAsyncUmmac                    s   t  fdd}|S )zPCreate a static factory that constructs parent_cls then re-classes to child_cls.c                     s"   j | i |}t|d  |S )N	__class__)createobject__setattr__)argskwargsobj	child_cls
parent_cls [/lsinfo/ai/hellotax_ai/llm_service/venv_vllm/lib/python3.10/site-packages/quack/pipeline.pyr      s   z _override_create.<locals>.create)staticmethod)r!   r    r   r"   r   r#   _override_create   s   r%   indexphasereturnc                 C   s   t dtd| |dS )zPConstruct a PipelineState from index and phase (count/stages unused by callers).r   )stagescountr&   r'   )r   r   )r&   r'   r"   r"   r#   _make_state%   s   r+   c                 C   sn   t |r-t |rtj  tj  | ||||d W d   dS 1 s&w   Y  dS | ||||d dS )zGOptionally wrap a parent pipeline method call in sync_warp + elect_one.locipN)r   cutearch	sync_warp	elect_one)parent_methodselfstater2   syncwarpr-   r.   r"   r"   r#   _call_with_elect_one*   s   
"r7   c                   @   s.   e Zd ZeddddefddZdd ZdS )PipelineStateWAdvanceNr,   num_iterationsc                C   sH   |  j t|7  _ | jt| }|| j }|  j|N  _|| j | _d S N)_countr   _indexr)   _phase)r4   r9   r-   r.   	new_indexnum_crossingsr"   r"   r#   advance_iters9   s
   
z#PipelineStateWAdvance.advance_itersc                 C   s(   t | jt|d t|d t|d S )Nr         )r8   r)   r   r4   valuesr"   r"   r#   __new_from_mlir_values__C   s   "z.PipelineStateWAdvance.__new_from_mlir_values__)__name__
__module____qualname__r	   r   r@   rE   r"   r"   r"   r#   r8   8   s    	r8   typer)   c                 C   sP   | t ju rt|tdtdtdS | t ju r$t|tdtdtdS J d)zz
    Creates a pipeline state. Producers are assumed to start with an empty buffer and have a flipped phase bit of 1.
    r   rA   FzBError: invalid PipelineUserType specified for make_pipeline_state.)r   Producerr8   r   Consumer)rI   r)   r"   r"   r#   make_pipeline_stateI   s
   

rL   c                
   @   s   e Zd ZdZe	dddddededee fddZeddddefd	d
Z	e	dddddededee fddZ
eddddefddZdS )_PipelineIndexPhaseMixinz_Mixin providing _w_index_phase / _w_index methods that delegate to PipelineState-based parents.Nr,   r&   r'   try_acquire_tokenc                C       t ||}| j||||d d S Nr,   )r+   producer_acquire)r4   r&   r'   rN   r-   r.   r5   r"   r"   r#   producer_acquire_w_index_phase[      

z7_PipelineIndexPhaseMixin.producer_acquire_w_index_phasec                C   "   t |td}| j|||d d S Nr   r,   )r+   r   producer_commitr4   r&   r-   r.   r5   r"   r"   r#   producer_commit_w_indexh      z0_PipelineIndexPhaseMixin.producer_commit_w_indextry_wait_tokenc                C   rO   rP   )r+   consumer_wait)r4   r&   r'   rZ   r-   r.   r5   r"   r"   r#   consumer_wait_w_index_phasem   rS   z4_PipelineIndexPhaseMixin.consumer_wait_w_index_phasec                C   rT   rU   )r+   r   consumer_releaserW   r"   r"   r#   consumer_release_w_indexz   rY   z1_PipelineIndexPhaseMixin.consumer_release_w_indexr:   )rF   rG   rH   __doc__r	   r   r   r   rR   rX   r\   r^   r"   r"   r"   r#   rM   X   s<    rM   T)frozenc                   @   sR   e Zd ZeedZeddddeddfddZeddddeddfddZ	dS )	r   Nr,   r&   r(   c                C   s    t jj| j| | j||d dS )z
        The aligned flavor of arrive is used when all threads in the CTA will execute the
        same instruction. See PTX documentation.
        
barrier_idnumber_of_threadsr-   r.   N)r/   r0   barrier_arriverb   num_threadsr4   r&   r-   r.   r"   r"   r#   arrive_w_index   s   
zNamedBarrier.arrive_w_indexc                C   s    t jj| j| | j||d d S )Nra   )r/   r0   barrierrb   re   rf   r"   r"   r#   arrive_and_wait_w_index   s   
z$NamedBarrier.arrive_and_wait_w_index)
rF   rG   rH   r%   NamedBarrierOgr   r	   r   rg   ri   r"   r"   r"   r#   r      s    
 r   c                
   @   s   e Zd ZU dZdZeed< dZeed< dZeed< dZ	eed< e
dddddd	ed
ededefddZeddddefddZeddddefddZdS )r   a  
    PipelineAsync with optional elect_one for producer_commit and consumer_release.

    When elect_one_*=True (set at create time), only one elected thread per warp
    signals the barrier arrive. This is useful when the mask count is set to 1 per warp.

    Args (to create):
        elect_one_commit: If True, only elected thread signals producer_commit.
        syncwarp_before_commit: If True (default), issue syncwarp before elect_one.
        elect_one_release: If True, only elected thread signals consumer_release.
        syncwarp_before_release: If True (default), issue syncwarp before elect_one.
            Set syncwarp to False when threads are already converged (e.g. after wgmma wait_group).
    F_elect_one_commitT_syncwarp_before_commit_elect_one_release_syncwarp_before_release)elect_one_commitsyncwarp_before_commitelect_one_releasesyncwarp_before_releasero   rp   rq   rr   c                 O   sZ   t j|i |}t|dt t|d|  t|d| t|d| t|d| |S )Nr   rk   rl   rm   rn   )PipelineAsyncOgr   r   r   r   )ro   rp   rq   rr   r   r   r   r"   r"   r#   r      s   	zPipelineAsync.createNr,   r5   c                C      t tj| || j| j|| d S r:   )r7   rs   rV   rk   rl   r4   r5   r-   r.   r"   r"   r#   rV         zPipelineAsync.producer_commitc                C   rt   r:   )r7   rs   r]   rm   rn   ru   r"   r"   r#   r]      rv   zPipelineAsync.consumer_release)rF   rG   rH   r_   rk   bool__annotations__rl   rm   rn   r$   r   r	   r   rV   r]   r"   r"   r"   r#   r      s0   
 r   c                   @   s^   e Zd ZU dZeed< dZeed< eddddedefdd	Ze	d
d
dde
fddZd
S )r   Frm   Trn   )rq   rr   rq   rr   c                 O   s>   t j|i |}t|dt t|d|  t|d| |S )Nr   rm   rn   )PipelineCpAsyncOgr   r   r   r   )rq   rr   r   r   r   r"   r"   r#   r      s
   zPipelineCpAsync.createNr,   r5   c                C   rt   r:   )r7   ry   r]   rm   rn   ru   r"   r"   r#   r]      rv   z PipelineCpAsync.consumer_release)rF   rG   rH   rm   rw   rx   rn   r$   r   r	   r   r]   r"   r"   r"   r#   r      s   
 r   c                	   @   <   e Zd ZdZe		d
ddddedee defdd	Z	dS )r   >Override producer_acquire to take in extra_tx_count parameter.Nr   r,   r5   rN   extra_tx_countc                   sx   t |du p|dk fdd d t|dkr)jjjj d dS jj| }jjj| d dS )
        TMA producer commit conditionally waits on buffer empty and sets the transaction barrier for leader threadblocks.
        Nr   c                         j jjj dS rP   sync_object_emptywaitr&   r'   r"   r.   r-   r4   r5   r"   r#   <lambda>       z3PipelineTmaAsync.producer_acquire.<locals>.<lambda>r,   )r   r   sync_object_fullarriver&   producer_masktx_countarrive_and_expect_tx)r4   r5   rN   r|   r-   r.   r   r"   r   r#   rQ     s   z!PipelineTmaAsync.producer_acquireNr   
rF   rG   rH   r_   r	   r   r   r   intrQ   r"   r"   r"   r#   r         r   c                	   @   rz   )r   r{   Nr   r,   r5   rN   r|   c                   s   t |du p|dk fdd d t|dkr-t j fdd d dS jj| t j fdd d dS )r}   Nr   c                      r~   rP   r   r"   r   r"   r#   r   D  r   z2PipelineTmaUmma.producer_acquire.<locals>.<lambda>r,   c                         j jjj dS rP   r   r   r&   r   r"   r   r"   r#   r   K  s    c                      s   j jj dS rP   )r   r   r&   r"   r.   r-   r4   r5   r   r"   r#   r   U  s    
)r   r   is_leader_ctar   r   )r4   r5   rN   r|   r-   r.   r"   r   r#   rQ   5  s(   
	
z PipelineTmaUmma.producer_acquirer   r   r"   r"   r"   r#   r   1  r   r   c                   @      e Zd ZdS )r   NrF   rG   rH   r"   r"   r"   r#   r   c      r   c                   @   r   )r   Nr   r"   r"   r"   r#   r   n  r   r   c                
   @   sZ   e Zd ZdZe		dddddedee dee fdd	Zeddddefd
dZ	dS )PipelineTmaCpAsyncz
    PipelineTmaCpAsync is used for CpAsync + TMA producers and AsyncThread consumers.
    Compared to PipelineTmaAsync, producer_acquire gates the full-barrier arrive on is_tma_warp.
    NTr,   r5   rN   is_tma_warpc                   sL   t |d u p|dk fdd d t | fdd d d S )Nr   c                      r~   rP   r   r"   r   r"   r#   r     r   z5PipelineTmaCpAsync.producer_acquire.<locals>.<lambda>r,   c                      r   rP   r   r"   r   r"   r#   r     r   )r   r4   r5   rN   r   r-   r.   r"   r   r#   rQ     s   

z#PipelineTmaCpAsync.producer_acquirec                C   "   t jj| j|||d||d dS )z9We need the mbarrier to track the completion of cp.async.r,   Nr/   r0   cp_async_mbarrier_arrive_noincproducer_get_barrierru   r"   r"   r#   producer_cpasync_commit  s   
z*PipelineTmaCpAsync.producer_cpasync_commitNT)
rF   rG   rH   r_   r	   r   r   r   rQ   r   r"   r"   r"   r#   r   y  s"    r   c                   @   s^   e Zd Ze		dddddejdedeee	f dede
e d	dfd
dZdd Zdd ZdS )MbarrierArrayWDropCountr   Nr,   barrier_storage
num_stagesagentr   
drop_countr(   c                C   s   || _ || _|| _|\| _| _| jj| _|| _| jdkr td| jdkr)td| jt	j
u r8| jdk r8tdt|d urD| j| | _| j | _| j||d d S )Nr   z3Error: Mbarrier stage count must be greater than 0.z4Error: Mbarrier arrive count must be greater than 0.z=Error: Mbarrier tx count must not be less than 0 for TMA ops.r,   )r   r   r   op_typecgsizearrive_countr   
ValueErrorr   TmaLoadr   mbarrier_basembarrier_init)r4   r   r   r   r   r   r-   r.   r"   r"   r#   __init__  s    


z MbarrierArrayWDropCount.__init__c                 C   s   | j | jgS r:   )r   r   )r4   r"   r"   r#   __extract_mlir_values__  s   z/MbarrierArrayWDropCount.__extract_mlir_values__c                 C   s$   t |d | j| j| jf| j|d S )Nr   rA   )r   r   r   r   r   rC   r"   r"   r#   rE     s   z0MbarrierArrayWDropCount.__new_from_mlir_values__)r   N)rF   rG   rH   r	   r/   Pointerr   tupler   r   r   r   r   r   rE   r"   r"   r"   r#   r     s,    

"r   c                   @   s   e Zd ZdZeedddddddddededed	ed
ej	de
ej deeef dede
e fddZe		dddddede
e de
e fddZeddddefddZdS )PipelineTmaCpAsyncUmmazr
    PipelineTmaCpAsync is used for CpAsync + TMA producers and UMMA consumers
    (e.g. Blackwell mainloops)
    N)rA   rA   F)r   cta_layout_vmnkmcast_mode_mn
defer_syncproducer_drop_countr-   r.   r   producer_groupconsumer_groupr   r   r   r   r   r   c              	   C   sT  t |tjstdt| tj}tj}||f}||f}t|j	dd| ||||	|
d}t
j|j	dd|  | ||	|
d}|du sJtj||	|
ddkrOd}d}nt
j|||	|
d}t
j||	|
d}|du sptj|d	g|	|
d
dkrvtjjjjntjjjj}|}|stj  |du stj||	|
ddkrttj nttjdd t||| ||||S )a  Creates and initializes a new PipelineTmaUmma instance.

        :param num_stages: Number of buffer stages for this pipeline
        :type num_stages: int
        :param producer_group: CooperativeGroup for the producer agent
        :type producer_group: CooperativeGroup
        :param consumer_group: CooperativeGroup for the consumer agent
        :type consumer_group: CooperativeGroup
        :param tx_count: Number of bytes expected to be written to the transaction barrier for one stage
        :type tx_count: int
        :param barrier_storage: Pointer to the shared memory address for this pipeline's mbarriers
        :type barrier_storage: cute.Pointer, optional
        :param cta_layout_vmnk: Layout of the cluster shape
        :type cta_layout_vmnk: cute.Layout, optional
        :param mcast_mode_mn: Tuple specifying multicast modes for m and n dimensions (each 0 or 1)
        :type mcast_mode_mn: tuple[int, int], optional
        :raises ValueError: If barrier_storage is not a cute.Pointer instance
        :return: A new PipelineTmaUmma instance configured with the provided parameters
        :rtype: PipelineTmaUmma
        z7Expected barrier_storage to be a cute.Pointer, but got    )	min_align)r   r-   r.   r,   NrA   Tr   )moder-   r.   )
is_relaxed)
isinstancer/   r   	TypeErrorrI   r   r   
TCGen05Mmar   alignPipelineTmaUmmaOg_make_sync_objectr   _compute_mcast_arrival_mask_compute_is_leader_ctanvgputcgen05CtaGroupONETWOr0   mbarrier_init_fencer   r   ThreadBlockThreadBlockClusterr   )r   r   r   r   r   r   r   r   r   r-   r.   producer_typeconsumer_typeproducerconsumerr   r   r   r   	cta_groupconsumer_maskr"   r"   r#   r     sh   $
	 

zPipelineTmaCpAsyncUmma.createTr,   r5   rN   r   c                   sT   t |du p|dk fdd d t tj| fdd d dS )z
        TMA producer commit conditionally waits on buffer empty and sets the
        transaction barrier for leader threadblocks.
        Nr   c                      r~   rP   r   r"   r   r"   r#   r   T  r   z9PipelineTmaCpAsyncUmma.producer_acquire.<locals>.<lambda>r,   c                      r   rP   r   r"   r   r"   r#   r   \  r   )r   r   r   r   r"   r   r#   rQ   D  s   

z'PipelineTmaCpAsyncUmma.producer_acquirec                C   r   )zJ
        We need the mbarrier to track the completion of cp.async
        r,   Nr   ru   r"   r"   r#   r   a  s   
z.PipelineTmaCpAsyncUmma.producer_cpasync_commitr   )rF   rG   rH   r_   r	   r$   r   r   r/   r   r   Layoutr   rw   r   r   r   r   rQ   r   r"   r"   r"   r#   r     sZ    
	
dr   )/typingr   dataclassesr   cutlass.cuter/   cutlassr   r   r   cutlass.cutlass_dslr   r   r	   cutlass.pipeliner
   r   r   r   r   r   r   r   rj   r   rs   r   ry   r   PipelineTmaAsyncOgr   r   r   PipelineUmmaAsyncOgr   PipelineAsyncUmmaOgr%   r+   r7   r8   r   rL   rM   r   r   r   r   r"   r"   r"   r#   <module>   s\   +D#+'0