o
    die                     @  sT  d dl mZ d dlmZmZ d dlmZmZmZ d dl	Z	d dl
mZ d dlmZ d dlmZmZmZ ddlmZmZmZ ddlmZ dd	lmZ dd
lmZmZ ddlmZmZm Z m!Z!m"Z" ddl#m$Z$m%Z%m&Z&m'Z'm(Z( G dd deZ)G dd de)Z*G dd de*Z+G dd de*Z,G dd de)Z-G dd de)Z.dddZ/G dd dZ0dS )    )annotations)ABCabstractmethod)AnyDictListN)GraphGraphModuleNode   )ConfigParallelExecutionCtxParameterMeta)decompose_and_functionalize)scatter)REGISTRY$FallbackParallelAxisPropagateHandler)ColumnParallelLinearRowParallelLinearVocabParallelCrossEntropyLossVocabParallelEmbedding sharded_cross_entropy_wrapper_fn)is_cross_entropyis_embedding	is_linearis_shape_consumerstable_topological_sortc                   @  sD   e Zd ZU dZdZded< edddZedddZ	dddZ
dS )PassBasez9
    Base class for parallelization targeted passes.
    Tboolneed_rerun_when_recompilereturnstrc                 C  s   | j S N)__name__cls r&   o/lsinfo/ai/hellotax_ai/llm_service/venv_embed/lib/python3.10/site-packages/optimum/fx/parallelization/passes.py	signature4   s   zPassBase.signaturegraph_moduler	   ctxr   configr   c                 C  s   t )a  
        Args:
            graph_module (`GraphModule`):
                graph module before processing.
            ctx (`ParallelExecutionCtx`):
                dynamic execution context which gathers and preserves information along processing.
            config (`Config`):
                static config to include instructions which persists the whole process.

        Returns:
            GraphModule: graph module after processed by the current pass.
        )NotImplementedErrorselfr)   r*   r+   r&   r&   r'   run8   s   zPassBase.runc                 C  s@   | j s
|jdkr
|S | j|||d}|jr|j  |  |S )Nr   )r*   r+   )r   compile_timesr/   lint_and_recompilegraphlint	recompiler-   r&   r&   r'   __call__H   s   
zPassBase.__call__Nr    r!   r)   r	   r*   r   r+   r   r    r	   )r#   
__module____qualname____doc__r   __annotations__classmethodr(   r   r/   r5   r&   r&   r&   r'   r   -   s   
 r   c                   @  sZ   e Zd ZdZedddZeddddZed ddZd!ddZd"ddZ	d#ddZ
dS )$AnalyzeBasea=  
    Base class for passes which only runs for analytical purposes and preserve graph structure
    during processing. Analytical passes are often prerequisite passes which provide information
    for passes later on to actually change the graph.

    Passes inheriting from `AnalyzeBase` places the class signature as a meta key in `node.meta`,
    which is a dict storing meta information related with a fx Node, such as the shape and dtype of
    output. Look-up APIs are exposed as classmethod so that passes using them won't need to create
    concrete instances.
    r    r!   c                 C  s   |   S r"   )r(   r$   r&   r&   r'   meta_key`   s   zAnalyzeBase.meta_keyFnoder
   fieldr   	must_haver   c              	   C  s|   |  |s|s	d S td| j d| d| j d|j|   }||vr:|r8td| d| j dt|  d S || S )Nz$Can't find information related with z in the current node `z` make sure z has run and marked itzInvalid query field z for z, valid fields are )already_executed_per_nodeRuntimeErrorr#   metar>   KeyErrorlistkeys)r%   r?   r@   rA   infor&   r&   r'   get_stored_field_infoe   s   
$z!AnalyzeBase.get_stored_field_infoc                 C  s   |   |jv S r"   )r>   rD   )r%   r?   r&   r&   r'   rB   x   s   z%AnalyzeBase.already_executed_per_noderH   Dict[Any, Any]Nonec                 C  s,   |  |rtd| d||j|  < d S )NzNode zq has already been marked by the current pass, check if the current pass has already been executed in the pipeline)rB   rC   rD   r>   )r.   r?   rH   r&   r&   r'   place_marker_per_node|   s
   

z!AnalyzeBase.place_marker_per_nodec                 C  s&   |   }||jv r|j| d S d S r"   )r>   rD   pop)r.   r?   keyr&   r&   r'   clear_marker_per_node   s   
z!AnalyzeBase.clear_marker_per_noder)   r	   c                 C  s    |j }|jD ]}| | qd S r"   )r2   nodesrO   )r.   r)   gr?   r&   r&   r'   	clean_all   s   
zAnalyzeBase.clean_allNr6   )F)r?   r
   r@   r   rA   r   r    r   )r?   r
   r    r   )r?   r
   rH   rJ   r    rK   )r?   r
   r    rK   )r)   r	   r    rK   )r#   r8   r9   r:   r<   r>   rI   rB   rL   rO   rR   r&   r&   r&   r'   r=   T   s    

	r=   c                   @  s$   e Zd ZdZddd	ZdddZdS )ParallelAxisSolverPassaF  
    A pass which tries to automatically identify parallel layers in the graph. There are three steps
    involved to find a possible parallel solution given the traced graph module and process group.

        - Decompostion & Functionalization
            The vanilla graph traced by dynamo frontend is a high-level graph which contains high-level
            pytorch ops, and there could be thousands of them, which makes graph analysis hard in order
            to cover all cases. So we decompose the high-level graph into low-level graph which only
            conrtains core aten ops, which is a much smaller set. And functionalization is also needed
            to remove inplace ops in the graph so that we get `aten.Add` instead of `aten.Add_` in the
            graph, which furthur reduces the op set we need to consider.

        - Parallel Axis Propagation
            We need to write parallel axis propagation rules for aten ops in the decomposed and functionalized
            graph, note that we don't need to cover every possible parallelization strategy because in general
            only certain ops(usually involves computation) can be parallelized in transformer models. And we just
            need to write rules for a subset of core aten op set in order to support most of the transformer models.

        - Backtracking Search
            After we have defined parallel axis propagation rules for each op in the graph, we do a brute force
            backtracking search to try to find a possible solution which respects the propagation rule of every
            op in the graph.


        Note that there are several practical concerns

            - Time Complexity. Although brute force backtracking introduces an exponential time complexity, we reduces
                the search space by injecting human heuristics. First, we only consider parallelization on the head dimension
                (for tensor parallel) or the sequence dimension(to support sequence parallel), then at any time the tensor is
                parallelized on at most one dimension. Second, we only allow axis switch around certain layers(like `nn.Linear`
                or `nn.Embedding), and all other ops fall into their places by the parallel axis of their input and rules we write.

            - Optimal Solution. Note that since we return the first solution we find, then it might not be optimal in terms of
                memory consumption and communication overhead. But again we can adjust the order of search and try parallelize
                as much as we can first before fall back to non-parallelized search paths. And we don't pay too much attention
                on calculating communication overhead because in practice they are bounded under the constraint that only certain
                layers are allowed to communicate.

    Our goal is not to solve an optimization problem which tries to give a best solution of parallelizing any model under memory/hardware
    constraints, but rather a cheap solution which relieves you from writing boilerplate code for parallelizing layers of different models.
    r)   r	   decomp_graphr   r    rK   c              	   C  s   dd |j jD }|jD ]2}d|jv r>|jd d \}}||v s(J d| d|| }| | | |d| j|ddi qd S )	Nc                 S  s   i | ]}|j |qS r&   )name).0r?   r&   r&   r'   
<dictcomp>   s    z5ParallelAxisSolverPass.trace_back.<locals>.<dictcomp>traced_fromr   zun-recognized node origin z not in graph being tracedparallel_axisr@   )r2   rP   rD   rO   rL   rI   )r.   r)   rT   node_mapr?   	node_name_	orig_noder&   r&   r'   
trace_back   s   


z!ParallelAxisSolverPass.trace_backr*   r   r+   r   c                   sT   t ||j }t| t|jd fddds"td|| |S )Nidxintc                   s   | t krdS |  }|jdkrt|jrtj|j }nt}||  }| }|D ]}	|d|i | d rA dS 
| q.dS )NTcall_functionrY   r   F)lenopr   is_supportedtargetmappingr   r>   	propagaterL   rO   )r`   r?   prop_clspropaxis_candidatesaxisr+   rP   searchr.   r&   r'   rn      s   z*ParallelAxisSolverPass.run.<locals>.searchr   zRFailed to find a solution to automatically parallelize ops in graph in greedy way.)r`   ra   )r   example_inputsr   rF   rP   rC   r_   )r.   r)   r*   r+   r2   r&   rm   r'   r/      s   
zParallelAxisSolverPass.runN)r)   r	   rT   r   r    rK   r7   )r#   r8   r9   r:   r_   r/   r&   r&   r&   r'   rS      s    
*rS   c                   @     e Zd ZdZdd	d
ZdS )ParallelLayerAnnotatePassa  
    This pass annotates layers which have different parallel axis(requires communication inside the layer) in their
    input and output tensors. Since heuristics applied during the searching process respect traditional classical ways of
    parallelizing layers(like Megatron-style `ColumnLinear` or `RowLinear`), we are guaranteed to match a valid replacement
    annotation according to parallelization strategy of input and output tensors.
    r)   r	   r*   r   r+   r   r    c                 C  s  |j jD ]}t|rst|jd d}t|d}i }|d u r.d|d< |d u r)dnd|d< n>|dkrL|js9J d	d|d< d|d
< |d u rGdnd|d< n |dkrld|d< d|d< |dkrh|jscJ d	d|d
< nd|d
< | || qt|rt|jd d}t|d}|d u r|dv sJ ddi}|dkr|jsJ d	d|d
< nd|d
< | || qt	|rt|jd d}|d ur| |ddi q|S )Nr   rY   columnrl   TFgather_outputr   zBillegal parallel axis for sequence parallelism deactivated settingsequence_parallel   rowinput_is_parallel)r   Nvocab)
r2   rP   r   rS   rI   argsenable_sequence_parallelrL   r   r   )r.   r)   r*   r+   r?   axis_before
axis_afterrH   r&   r&   r'   r/      s\   

zParallelLayerAnnotatePass.runNr7   r#   r8   r9   r:   r/   r&   r&   r&   r'   rq      s    rq   c                   @  sR   e Zd ZdZeddd	Zedd
dZedddZedddZdddZ	dS )ParallelLayerReplacePassa^  
    A pass which modifies graph according to information provided by previous analytical passes, in general it does two things for now:
        1. replaces linears and embedding layers with their parallel counterparts.
        2. modifies hard-coded arguments like the number of attention heads in the graph by dividing it by parallelism level.
    r?   r
   r*   r   r    rK   c                 C  s   | j j}tj| dd}|d u rd S |dv sJ | jjddd}t|dkr1||d }|d }n|}| j}|| j}| j|j}}	||	v rL|	| }
n%|d	kr_tj| d
dd}t	|||}
ntj| ddd}t
|||}
|
|	|< t|||
 d S )Nrl   rZ   >   rv   rr   .r   maxsplitru   r   rr   rs   T)r@   rA   rw   )r2   owning_modulerq   rI   rf   rsplitrc   get_submoduleparallel_layer_cacher   r   setattr)r?   r*   r)   rl   prefix_and_field
parent_modr@   modrN   layer_cachenew_modrs   rw   r&   r&   r'   handle_linear*  s4   

z&ParallelLayerReplacePass.handle_linearc                 C  s   | j j}tj| dd}|d u rd S |dv sJ d| jjddd}t|dkr3||d	 }|d }n|}| j}|| j}| j|j}}	||	v rN|	| }
n|j	d	ksWJ d
t
||}
|
|	|< t|||
 d S )Nrl   rZ      rx   2Only support parallelization on vocab dim for now.r   r   r   ru   r   illegal path for recompilation)r2   r   rq   rI   rf   r   rc   r   r   r0   r   r   )r?   r*   r)   rl   r   r   r@   r   rN   r   r   r&   r&   r'   handle_embeddingL  s&   


z)ParallelLayerReplacePass.handle_embeddingc                 C  s   t j| dd}|d u rd S |dv sJ d| jdkro| jj}| jjddd}t|d	kr8||d
 }|d }n|}| j}|| j}| j|j	}}	||	v rS|	| }
n|j
d
ks\J dt||jd}
|
|	|< t|||
 d S t|jd| _d S )Nrl   rZ   r   r   call_moduler   r   r   ru   r   r   )	reduction)process_group)rq   rI   rd   r2   r   rf   r   rc   r   r   r0   r   r   r   r   tp_group)r?   r*   rl   r)   r   r   r@   r   rN   r   r   r&   r&   r'   handle_cross_entropyf  s*   


z-ParallelLayerReplacePass.handle_cross_entropyc                   s   ddd}d fd
d}t j| dd}|d u rd S ||  |t k s%J t | tr2 | dkr4d S |j } | | dksCJ  | |  |< ||  | d S )Nr?   r
   r    	List[Any]c                 S  s`   d| j v rt| j d S d| j v rt| j d S t| jd tr't| jd S t| jdd  S Nsizeshaper   )kwargsrF   
isinstancery   tuple)r?   r&   r&   r'   extract_shape_from_node  s   

zVParallelLayerReplacePass.handle_hard_coded_axis_param.<locals>.extract_shape_from_node	new_shaperY   ra   c                   sx   d| j v r| dt| d S d| j v r| dt| d S t| jd tr0| dt| d S | |d  |  d S r   )r   update_kwargr   r   ry   
update_arg)r?   r   rY   r   r&   r'   update  s   

zEParallelLayerReplacePass.handle_hard_coded_axis_param.<locals>.updaterZ   r   )r?   r
   r    r   )r?   r
   r   r   rY   ra   )rS   rI   rc   r   ra   r   r   )r?   r*   r   r   rY   
world_sizer&   r   r'   handle_hard_coded_axis_param  s   



z5ParallelLayerReplacePass.handle_hard_coded_axis_paramr)   r	   r+   r   c                 C  sh   |j jD ]-}t|r| || qt|r| || qt|r'| || qt|r1| 	|| q|S r"   )
r2   rP   r   r   r   r   r   r   r   r   )r.   r)   r*   r+   r?   r&   r&   r'   r/     s   zParallelLayerReplacePass.runN)r?   r
   r*   r   r    rK   r7   )
r#   r8   r9   r:   staticmethodr   r   r   r   r/   r&   r&   r&   r'   r~   #  s    !"r~   c                   @  rp   )InitializeOrLoadWeightsPassz
    Weights loading and intialization pass, will initialize parameters on current rank and load weights from disk
    if necessary.
    r)   r	   r*   r   r+   r   r    c                   s  t |jt |j}g i |j}}}t|jddD ]s\} ||v r0|||| f qt dj	rJt
 |v rJ|||t
  f q fddt jD }	jsd j|jkrd }
ntjtj|	 j|jd jd}
tj D ]u\j|jv rdd	lm} ||jj d
ddR}|j}fddt jD }fddt jD }||  }t||}t   |
j!| | W d    n1 sw   Y  W d    n1 sw   Y  q|j"rqtj D ]s\j|jv r	qjr|dkr2tj#j$ jdd}j%r$j%n|j&}|| |'|j}nd }fddt jD }t  " jrXt(|j||
j!| j)d n|
j!| | W d    n	1 skw   Y  qt*|
d t
|
t
 kr|||
f j	r|
|t
 < q|D ]2\}}
|j+ddd}t,|dkr|-|d }|d }n|}|}||vr|
||< t*|||
 q|S )NF)remove_duplicaterD   c                   s4   g | ]}|j krjr | n |qS r&   )dimis_parallelr   rV   r   )param
param_metar   r&   r'   
<listcomp>  s    &z3InitializeOrLoadWeightsPass.run.<locals>.<listcomp>)dtypedevice)requires_gradr   )	safe_openptcpu)	frameworkr   c                   *   g | ]}| j kr ntd d d qS r"   r   to_sliceslicer   r   sourcer&   r'   r         c                   s(   g | ]}| j krjntd d d qS r"   )r   indexr   r   )r   rf   r&   r'   r     s    c                   r   r"   r   r   r   r&   r'   r     r   )scatter_dimr   r   r   ru   ).distget_world_sizer   get_rankparam_cachesortednamed_parametersappendgetattris_tiedidrangendimr   r   current_devicenn	Parametertorchzerosr   r   rg   itemsr   
weight_mapsafetensorsr   	get_slice
contiguous
empty_likecopy_no_graddataneed_initializeemptyr   init_fnweight_init_fntor   r   r   r   rc   r   )r.   r)   r*   r+   tp_ranknew_parameterstied_parametersr   rU   r   	new_paramr   fptensor_slicesource_index
load_indextensorweightr   r   r   r   r@   r&   )r   r   r   rf   r   r'   r/     s   




zInitializeOrLoadWeightsPass.runNr7   r}   r&   r&   r&   r'   r     s    r   r    PassPipelinec                   C  s   t t t t t gS )a8  
    Ensemble a pass pipeline which contains the following passes:
        1. `ParallelAxisSolverPass` to find a parallelization solution of tensors in the graph.
        2. `ParallelLayerAnnotatePass` to annotate parallelized layers according to the solution found in the first step.
        3. `ParallelLinearReplacePass` to do the actual replacement and modification of hard-coded attributes.
        4. `InitializeOrLoadWeightsPass` to load or initialize weights for parameters.

    Returns:
        PassPipeline: the pipeline used for automatic parallelism.
    )r   rS   rq   r~   r   r&   r&   r&   r'   build_parallel_pass_pipeline  s   r   c                   @  s:   e Zd ZdZg fdddZdd	 ZdddZdddZdS )r   z
    `PassPipeline` ensembles a list of passes and execute them one by one as provided in the list,
    it can be iterated and appended after initialization for flexibility.
    passesList[PassBase]r    rK   c                 C  s
   || _ d S r"   )_passes)r.   r   r&   r&   r'   __init__4  s   
zPassPipeline.__init__c                 C  s
   | j  S r"   )r   __iter__)r.   r&   r&   r'   r   7  s   
zPassPipeline.__iter__PASSr   c                 C  s   | j | d S r"   )r   r   )r.   r   r&   r&   r'   r   <  s   zPassPipeline.appendr)   r	   r*   r   r+   r   c                 C  sD   | j D ]	}||||d}q|jr | j D ]}t|tr|| q|S )N)r)   r*   r+   )r   clean_markers_after_all_passesr   r=   rR   )r.   r)   r*   r+   r   r&   r&   r'   r5   ?  s   



zPassPipeline.__call__N)r   r   r    rK   )r   r   r    rK   r7   )r#   r8   r9   r:   r   r   r   r5   r&   r&   r&   r'   r   .  s    
)r    r   )1
__future__r   abcr   r   typingr   r   r   r   torch.distributeddistributedr   torch.nnr   torch.fxr   r	   r
   corer   r   r   decompr   r   op_registryr   r   parallel_layersr   r   r   r   r   utilsr   r   r   r   r   r   r=   rS   rq   r~   r   r   r   r&   r&   r&   r'   <module>   s,   	'<Y: 
e