
    zj                         d dl Z d dlZd dlmZ d dlmZ d dlmZ d dlm	Z	m
Z
 ej        j        Z G d d          Z G d d	e          Z G d
 de          Z G d de          Z G d de          ZdS )    N)unique_name)wait_server_ready)core)default_main_programdefault_startup_programc                   R    e Zd ZdZd Zd Zd Zd Z	 ddZd Z	d	 Z
d
 Zd Zd ZdS )
Collective c                     || _         d | _        d | _        d | _        d | _        d | _        d | _        d | _        t          j	        }|
                                | _        |                                | _        d S N)nrings	endpointscurrent_endpointother_endpointsnranksrankstartup_programmain_programr   op_proto_and_checker_makerkOpRoleAttrNameop_role_keykOpRoleVarAttrNameop_role_var_key)selfr   op_makers      x/lsinfo/ai/hellotax_ai/data_center/backend/venv/lib/python3.11/site-packages/paddle/distributed/transpiler/collective.py__init__zCollective.__init__   st     $#	# 2#3355'::<<    c                 B   t          |t                    r|                    d          }|| _        |t	                      | _        || _        |t                      | _        t          |          | _        | j        dk    r%| j	        dk    r| j	        dk    rt          d          |dk     rt          d          || _        ||vrt          d|t          |                    || _        || _        |r5t          |          }|d d          }|                    |           || _        || _        | j                                        | j        _        |                                  | j                                        | j        _        |                                  d S )	N,   single_process_multi_threadboxz the number of endpoints must > 1r   zrank must >= 0z current endpoint %s is not in %s)
isinstancestrsplitr   r   r   r   lenr   mode
ValueErrorr   r   r   remover   	wait_portclone_origin_program_transpile_startup_program_transpile_main_program)	r   r   r   r   r   r   r+   r   r   s	            r   	transpilezCollective.transpile,   s    i%% 	-!,,I."#:#<#<D ( 4 6 6D)nnK1	:::	U""?@@@!88-...	9,,2 I   # 0 	3^^F'lO""#3444#2D "/3/C/I/I/K/K,''))),0,=,C,C,E,E)$$&&&&&r   c                      t          d          )Nz'call the inherited method of subclasses)NotImplementedErrorr   s    r   r/   z"Collective._transpile_main_programe   s    !"KLLLr   c           	          t          | j                  D ]5}|                     | j        | j        | j        | j        || j                   6|                                  d S r   )	ranger   _init_communicatorr   r   r   r   r+   _broadcast_params)r   ring_ids     r   r.   z%Collective._transpile_startup_programh   sp    T[)) 	 	G##$%	    	     r   Fc                 $   d                     |          }t          |          }	|d d          }
|
                    |           |                                }|dk    r|rt	          |
           |                                }t          j                    r|                    t          j	        d          dt
          j
        j        j                  }|                    di d|id|d	|d
|
| j        t          j        i           |s4|                    dd|ii d|	d|d|| j        t          j        i           d S |                    dd|ii d|	d|d|| j        t          j        i           d S t          j                    r|                    t          j	        d          dt
          j
        j        j                  }|                    di d|id|d	|d
|
| j        t          j        i           |                    dd|ii d|	d|d|d|| j        t          j        i           d S t$          j                                        j        t$          j                                        v r|                    t          j	        d          dt
          j
        j        j                  }|                    di d|id|d	|d
|
| j        t          j        i           |                    dd|ii d|	d|d|d|| j        t          j        i           d S d S )Nr    r   nccl_idT)namepersistabletypec_gen_nccl_idOutr   endpointr   r=   inputsoutputsattrsc_comm_initXr   r8   c_comm_init_multitrainer	ntrainers
trainer_idbkcl_idc_gen_bkcl_idr   xccl_idc_gen_xccl_id)joinr'   r*   global_blockr   r   is_compiled_with_cuda
create_varr   generateVarDescVarTypeRAW	append_opr   OpRoleForwardis_compiled_with_xpupaddledistributedParallelEnvdevice_typedeviceget_all_custom_device_type)r   programr   r   r   r8   r+   has_multitrainerendpoints_strr   r   blocknccl_id_varbkcl_id_varxccl_id_vars                  r   r6   zCollective._init_communicatort   s    ++Y#AAA,/000$$&&1999o...$$&&%'' f	** ))44 \)- +  K
 OO$,D 0%$fn		  
 
 
 $ &- &!7(&.		   
 
 
 
 
 3-#V$d!7(&.		   
 
 
 
 
 &(( <	** ))44 \)- +  K
 OO$,D 0%$fn		  
 
 
 OO"[)fDw$fn	       **,,8}7799: :  ** ))44 \)- +  K
 OO$,D 0%$fn		  
 
 
 OO"[)fDw$fn	      ': :r   c                    | j                                         }d}|                                D ]I}|j        r
|dz   | j        z  }|                    dd|id|id|dd| j        t          j        i	           Jt          | j                  D ]2}|                    d
d|id|id|| j        t          j        i	           3d S )Nr!   	broadcastxoutr8   rootr   rA   c_sync_comm_streamrF   r?   )
r   rO   iter_parametersis_distributedr   rV   r   rW   rX   r5   )r   rc   r8   params       r   r7   zCollective._broadcast_params   s   $1133**,, 	 	E# {dk1GOO U|wA$fn	  	 	 	 	 T[)) 	 	GOO)U| '4+;V^L	     	 	r   c                     | j         |j        vrdS t          |                                | j                            }|t          t          j                  z  o|t          t          j                  z  S )NF)r   
attr_namesint	all_attrsrW   BackwardLoss)r   opop_roles      r   _is_loss_grad_opzCollective._is_loss_grad_op  s]    2=005bllnnT%5677V_---L'C<L<L2LLr   c                     | j         |j        v oEt          |                                | j                            t          t          j                  z  S r   )r   rr   rs   rt   rW   ru   r   rw   s     r   _is_backward_opzCollective._is_backward_op  J    2=0 !SLLNN4+,6
 6
  6! 	!r   c                 8    d|j         v od|j         v od|j         v S )NParamGradLearningRate)input_namesr{   s     r   _is_update_opzCollective._is_update_op  s/    r~% 1".(1".0	
r   c                     | j         |j        v oEt          |                                | j                            t          t          j                  z  S r   )r   rr   rs   rt   rW   Optimizer{   s     r   _is_optimizer_opzCollective._is_optimizer_op  r}   r   N)F)__name__
__module____qualname____doc__r   r0   r/   r.   r6   r7   ry   r|   r   r    r   r   r	   r	      s        G= = =7' 7' 7'rM M M
! 
! 
!( z z z zx  6M M M! ! !

 
 
! ! ! ! !r   r	   c                   ,    e Zd ZdZddZd Zd Zd ZdS )	GradAllReducer
      c                 J    t                               | |           d| _        d S )Ngrad_allreduce)r	   r   r(   r   r   s     r   r   zGradAllReduce.__init__&  s#    D&)))$			r   c                 V    |                                   |                                  d S r   )_insert_scale_loss_grad_ops_insert_allreduce_opsr3   s    r   r/   z%GradAllReduce._transpile_main_program*  s,    ((***""$$$$$r   c                 r   | j                                         }t          t          t	          |j                                      D ]n\  }}|                     |          rT|j        |j        d                  }|	                    |dz   dd|id|idd| j
        z  | j        t          j        i           odS )	
        In order to keep the learning rate consistent in different numbers of
        training workers, we scale the loss grad by the number of workers
        r   r!   scalerF   r?         ?rA   N)r   rO   reversedlist	enumerateopsry   varsoutput_arg_names
_insert_opr   r   rW   ru   )r   rc   idxrw   loss_grad_vars        r   r   z)GradAllReduce._insert_scale_loss_grad_ops.  s    
 !..00Yuy%9%9 : :;; 	 	GC$$R((  %
2+>q+A B  !G /"M2t{!2(&/ ! 	 	 		 	r   c                 \   | j                                         }d}d }t          t          t	          |j                                      D ]]\  }}|                     |          rA| j        |j        v r2|	                                | j                 }t          |          dk    r^t          |          dz  dk    sJ |}t          dt          |          d          D ]}|j        ||                  }	|j        ||dz                     }|	j        r3||k    r9|dz  }|                    |dd|id|i| j        t           j        i           |dz  }|dz   | j        z  }|                    |d	d
|id|id|dt&          j        j        j        | j        t           j        i           ǐ_|d S t	          |j                  D ]h\  }}|                     |          rNt          | j                  D ]6}|                    ||z   dd|id|id|| j        t           j        i           7 d S id S )Nrh   r   r   r!   c_sync_calc_streamrF   r?   rA   
all_reducerj   rk   r8   reduce_typerm   )r   rO   r   r   r   r   r|   r   rr   rt   r'   r5   r   ro   r   r   rW   ru   r   rZ   r[   ReduceOpSUMr   )
r   rc   r8   gradr   rw   op_role_varoffsetirp   s
             r   r   z#GradAllReduce._insert_allreduce_opsB  s   !..00Yuy%9%9 : :;; *	 *	GC$$R(()(BM99 llnnT-AB{##q((;''!+q0000q#k"2"2A66  A!J{1~6E :k!a%&89D+ ! }}!(("!5$';%*DM#'#3V_"E )    !  '{dk9G$$) #T{!&%w)6+=+F+J ,fo % 
 
 
 
 <F ++ 	 	GC$$R(( $T[11 
 
G$$g1 #T{!&%w ,fo % 	 	 	 	 	 	r   Nr   )r   r   r   r   r   r/   r   r   r   r   r   r   r   #  s_        G% % % %% % %  (@ @ @ @ @r   r   c                   ,    e Zd ZdZddZd Zd Zd ZdS )	LocalSGDr
   r   c                 X    t                               | |           d| _        d| _        d S )Nz	@SNAPSHOT	local_sgd)r	   r   snapshot_keyr(   r   s     r   r   zLocalSGD.__init__  s+    D&)))'			r   c                    t                               |            | j                                        }g }|                                D ]}|j        s|                    |           |D ]h}|                    |                     |j	                  |j
        dd          }|                    dd|gid|gi| j        t          j        i           id S )NT)r;   shaper<   stop_gradientassignrF   r?   rA   )r	   r.   r   rO   rn   ro   appendrQ   snapshot_namer;   r   rV   r   rW   rX   )r   rc   non_dist_paramsrp   snapshots        r   r.   z#LocalSGD._transpile_startup_program  s    --d333$1133**,, 	. 	.E' .&&u---$ 	 	E''''
33k "	 (  H OOeW~
+'8	     	 	r   c                     || j         z   S r   )r   )r   
param_names     r   r   zLocalSGD.snapshot_name  s    D---r   c                 f   | j                                         }g }d}t          t          t	          |j                                      D ]^\  }}|                     |          rB|j        |                    d          d                  }|j	        rJ|
                    |                     |j                  |j        dd|j                  }|                    |dz   d|g|gdd	|gi| j        t"          j        i
           |                    |dz   dd|id	|i| j        t"          j        i
           |dz   | j        z  }|                    |dz   dd|gid|gid|dt(          j        j        j        | j        t"          j        i
           |                    ||f           `t3          | j                  D ]2}|                    dd|id	|id|| j        t"          j        i
           3t          |          D ]}|d         }|d         }|                    dd|gid	|gidd| j        z  | j        t"          j        i
           |                    d|g|gdd	|gi| j        t"          j        i
           |                    dd|gid	|gi| j        t"          j        i
           d S )Nrh   r   r   T)r;   r   r<   r   dtyper!   elementwise_sub)rF   Yr?   rA   r   r   rF      r   rj   rk   r8   r   rm   r   r   r   )r   rO   r   r   r   r   r   r   inputro   rQ   r   r;   r   r   r   r   rW   r   r   rZ   r[   r   r   r   r5   rV   r   )	r   rc   ordered_param_snapshotr8   r   rw   rp   r   param_snapshots	            r   r/   z LocalSGD._transpile_main_program  sV   !..00!#Yuy%9%9 : :;; )	A )	AGC!!"%% (A
288G#4#4Q#78'  ++++EJ77+ $"&+ ,     !G*"*5'::"UG,+V_= !      !G-<"EN+V_= !    #Q;$+5  !G%%>"UG,!7%v'9'B'F(&/ ! 
 
 
 '--uh.?@@@T[)) 	 	GOO)U| '4+;V_M	      ''=>> 	 	N"1%E%a(HOOeW~(S4;.$fo	     OO&&Zug66('9	     OOeW~
+'9	     %	 	r   Nr   )r   r   r   r   r   r.   r   r/   r   r   r   r   r     s_        G       
  .. . .N N N N Nr   r   c                   6    e Zd ZdZd Zd Zd Zd Zd Zd Z	dS )	SingleProcessMultiThreadz*
    single process multi thread mode
    c                 Z   t                               | d           d| _        t          t	          j        dd                    | _        t          t	          j        dd                    | _        t          t	          j        dd          	                    d                    | _
        d S )	Nr!   r"   PADDLE_FUSE_ALLREDUCE1PADDLE_LOSS_SCALEFLAGS_selected_gpusz0,1,2,3,4,5,6,7r    )r   r   r(   rs   osgetenvfuse_allreduce
loss_scaler'   r&   gpu_numsr3   s    r   r   z!SingleProcessMultiThread.__init__  s    tQ'''1	!"),CS"I"IJJbi(;SAABBI+->??EEcJJ
 
r   c           
      j   d}t          | j                  dk    rt          d | j        D                       }|dk    r|| _        t          d           t          d| j                   t          d| j                   t          d| j         d| j                    t          | j                  D ]6}|                     | j	        | j        | j        | j        || j
        d	           7d S d| _        t          d
           | j	                                        }|                    dddi           d S )Nr   r!   c                 D    h | ]}|                     d           d         S ):r   )r&   ).0rj   s     r   	<setcomp>zFSingleProcessMultiThread._transpile_startup_program.<locals>.<setcomp>	  s&    EEEQWWS\\!_EEEr   2begin to _transpile_startup_program for multi-nodecurrent_endpoint: total endpoints: rank: , ring_id: T3begin to _transpile_startup_program for single-nodecomm_init_allr8   r=   rD   )r'   r   r   printr   r   r   r5   r6   r   r+   rO   rV   )r   	nodes_numr8   rc   s       r   r.   z3SingleProcessMultiThread._transpile_startup_program  sP   	t~""EEdnEEEFFIq==#DKFGGG&(=>>>%t~666>49>>>>??? -- 	 	''()NIN   	 	 DKGHHH(5577EOOAOGGGGGr   c                 *   |                                  }| j        dk    r|dk    rd S | j        r|                     |           |dk    rd S | j        dk    r(t	          d|            |                                  d S |                                  d S )Nr   z(begin used fuse_allreduce param count = )_get_update_param_countr   r   r   r   _insert_fuse_allreduce_opsr   )r   	param_cnts     r   r/   z0SingleProcessMultiThread._transpile_main_program!  s    0022	?aINNF? 	8,,Y777>>F""HYHHIII++-----&&(((((r   c                    d}| j                                         }t          t          t	          |j                                      D ]\  }}|                     |          s| j        |j        vr*|	                                | j                 }t          |          dk    r]t          |          dz  dk    sJ t          dt          |          d          D ]"}|j        ||                  }|j        r|dz   }#|S )z-
        get need update param count
        r   r   r!   )r   rO   r   r   r   r   r|   r   rr   rt   r'   r5   r   ro   )r   param_countrc   r   rw   r   r   rp   s           r   r   z0SingleProcessMultiThread._get_update_param_count4  s    !..00Yuy%9%9 : :;; 	. 	.GC''++ #2=88,,..)=>K;1$${##a'1,,,,1c+..22 . .
;q>2' )Ao	. r   c                    |dk    rd| j         z  | j        z  }n
d| j        z  }t          d|            | j                                        }t          t          t          |j                                      D ]g\  }}| 	                    |          s|j
        |j        d                  }|                    |dz   dd|id|id|| j        t          j        i           hd	S )
r   r   r   z*begin _insert_scale_loss_grad_ops scale = r!   r   rF   r?   rA   N)r   r   r   r   rO   r   r   r   r   ry   r   r   r   r   rW   ru   )r   r   r   rc   r   rw   r   s          r   r   z4SingleProcessMultiThread._insert_scale_loss_grad_opsL  s
   
 q==$+%5EE$-'EB5BBCCC!..00Yuy%9%9 : :;; 
	 
	GC((,, !Jr':1'=>Ma]+.t'7I     	
	 
	r   c                    | j                                         }d}d}g }d}t          t          t	          |j                                      D ]\  }}|                     |          r| j        |j        v r|	                                | j                 }t          |          dk    r[t          |          dz  dk    sJ |}	t          dt          |          d          D ]a}
|j        ||
                  }|j        ||
dz                     }|j        r3|	|k    r(|                    |           t          ||	dz             }b|dS | j        dk    r|                    |dd|d         id|d         i| j        t&          j        i	           |dz  }|dz   | j        z  }|                    |d
d|id|id|| j        t&          j        i	           |dz  }|                    |dd|d         id|d         id|| j        t&          j        i	           |dz  }dS |}|                    ddgdt.          j        j        j        d          }ddt.          j        j        j        d}|                    |dd|i||d|	           |dz  }|                    |dd|id|i| j        t&          j        i	           |dz  }|dz   | j        z  }|                    |dd|id|id|| j        t&          j        dt6          j        j        j        i	           |dz  }|                    |dd|id|id|| j        t&          j        i	           |dz  }dS );
        insert coalesce_tensor and all reduce ops
        rh   Nr   r   r!   r   rF   r?   rA   c_allreduce_xsumr8   rm   fused_outputFTr;   r   r<   r   r   )	copy_dataset_constantr   coalesce_tensorInputOutputFusedOutputr   rj   rk   r   )r   rO   r   r   r   r   r|   r   rr   rt   r'   r5   r   ro   r   maxr   r   r   rW   ru   r   rQ   r   rS   rT   FP32rZ   r[   r   r   )r   rc   r8   r   input_gradsglobal_offsetr   rw   r   r   r   rp   output_gradsr   coalesce_tensor_attrss                  r   r   z3SingleProcessMultiThread._insert_fuse_allreduce_opsc  s6    !..00Yuy%9%9 : :;; 	G 	GGC$$R((G(BM99 llnnT-AB{##q((;''!+q0000q#k"2"2A66 G GA!J{1~6E :k!a%&89D+ ! }}#**4000(+M6A:(F(F<F!##)[^,A/'9     QM{dk1G'[), '4+;V_M     QM)[^,A/ '4+;V_M     QMMM 'L ++#c!l*/" ,  L " %-2% %!
 &-#/MM+     QM)\*-'9     QM{dk1G!\*-w$fo!6#5#>#B  
 
 
 QM )\*- '4+;V_M     QMMMr   N)
r   r   r   r   r   r.   r/   r   r   r   r   r   r   r   r     s         
 
 
H H H6) ) )&  0  .r r r r rr   r   c                   8    e Zd ZdZddZd Zd Zd Zd Zd	 Z	d
S )MultiThreadr
   r!   fuse_all_reducec                     t                               | |           d| _        || _        d| _        t          j        dd                              d          }t          |          | _	        d S )Nr#      r   z0,1,2,3,4,5,6,7,8r    )
r   r   r(   
trans_modefuse_grad_size_in_numr   r   r&   r'   gpu_num)r   r   r   r   s       r   r   zMultiThread.__init__  sg    tV,,,	$%("924GHHNN
 
 8}}r   c                 &   t          | j                  dk    rt          d           t          d| j                   t          d| j                   t          d| j         d| j                    t          | j                  D ]6}|                     | j        | j        | j        | j        || j	        d           7d S d| j
        v rt          d	           | j                                        }|                    d
t          t          t          t!          j        d                              d                              dd           d S t          d           | j                                        }|                    d
ddi           d S )Nr!   r   r   r   r   r   Txpuz:begin to _transpile_startup_program for single-node in XPUr   r   r    r   )devicesr8   r   r   r8   )r'   r   r   r   r   r   r5   r6   r   r+   r   rO   rV   r   maprs   r   r   r&   )r   r8   rc   s      r   r.   z&MultiThread._transpile_startup_program  s   t~""FGGG&(=>>>%t~666>49>>>>??? -- 	 	''()NIN   	 	 ''P   ,99;;(#' #RY/D%E%E%K%KC%P%P $ $
 $%    
 
 
 
 
 KLLL,99;;_YNKKKKKr   c                 ,   |                                   | j        dk    rMt          d           | j        | j        z  | _        |                                  |                                  d S | j        dk    r%t          d           |                                  d S | j        dk    rIt          t          j        d                              d                    dk    rt          d	           d S t          d
           |                                  d S )N
all_gatherz%begin to transpile in all-gather moder   z*begin to transpile in fuse all-reduce modeall_reduce_xpur   r    r!   zHskip transpile in all-reduce-xpu mode when number of devices is only onez%begin to transpile in all-reduce mode)r   r   r   r   r   allgather_ranks_insert_allgather_ops_update_adam_opsr   r'   r   r   r&   r   r3   s    r   r/   z#MultiThread._transpile_main_program  s"   ((***?l**9:::#';#=D &&(((!!#####_ 111>???++-----O///BI344::3??@@AEEZ     9:::&&(((((r   c                    | j                                         }d}d}t          t          t	          |j                                      D ]\  }}|                     |          r| j        |j        v rv|	                                | j                 }t          |          dk    r^t          |          dz  dk    sJ |}t          dt          |          d          D ]	}|j        ||                  }	|                    ||         dz   | j        gt          |	j                  dt           j        j        j        d          }
|j        ||d	z                     }|	j        r||k    r9|d	z  }|                    |d
d|id|i| j        t.          j        i           |d	z  }|d	z   | j        z  }|                    |dd|id|
id| j        d|| j        t.          j        i           |dS t	          |j                  D ]h\  }}|                     |          rNt          | j                  D ]6}|                    ||z   dd|id|id|| j        t.          j        i           7 dS idS )z9
        insert allgather op to the main_program
        rh   Nr   r   
_allgatherFTr   r!   r   rF   r?   rA   r  rj   rk   r   r8   rm   )r   rO   r   r   r   r   r|   r   rr   rt   r'   r5   r   rQ   r  r   r   rS   rT   r   ro   r   r   rW   ru   r   r   )r   rc   r8   r   r   rw   r   r   r   rp   new_grad_vars              r   r  z!MultiThread._insert_allgather_ops!  s    !..00Yuy%9%9 : :;; 0	 0	GC$$R((/(BM99 llnnT-AB{##q((;''!+q0000q#k"2"2A66 % %A!J{1~6E#(#3#3(^l:#3Hd5;6G6GH$)"l27&* $4 $ $L !:k!a%&89D+ ! }}!(("!5$';%*DM#'#3V_"E )    !  '{dk9G$$) #T{!& 5$d&:%w ,fo % 
 
 
 
 <F ++ 	 	GC$$R(( $T[11 
 
G$$g1 #T{!&%w ,fo % 	 	 	 	 	 	r   c           
        	
 | j                                         t          t          t	          j                                      D ]#\  }	|                     	          r|}	j        dk    r	j        dk    r5	                    d          d         
j	        	                    d          d                  j	        	                    d          d                  j	        	                    d          d                  j	        	                    d          d                  j	        	                    d          d                  j	        	                    d	          d                  d
}j	        	
                    d          d                  j	        	
                    d          d                  j	        	
                    d          d                  j	        	
                    d          d                  j	        	
                    d          d                  d}	                    d          	                    d          	                    d          	                    d          	                    d          d}	
fdt          | j                  D             }                    |ddj	        	                    d          d         dz            id|i| j        dd           |dz  }t          | j                  D ]1}||         |d<                       |	j        |||           |dz  }2                    |           %d S )!zC
        remove the original adam op, and add new adam ops
        adamlambr   r   r   Moment1Moment2Beta1PowBeta2Pow)r   r   r  r  r  r  ParamOut
Moment1Out
Moment2OutBeta1PowOutBeta2PowOut)r  r  r  r  r  epsilonbeta1beta2	lazy_modemin_row_size_to_use_multithread)r  r  r  r  r  c           	          g | ]l}                     d z   t          |          z   j                            d          d                  j        dt
          j        j        j        d          mS )_r   r   FTr   )	rQ   r%   r   r   r   r   rS   rT   r   )r   r   rc   rw   r   s     r   
<listcomp>z0MultiThread._update_adam_ops.<locals>.<listcomp>  s     	 	 	  $$'#-A6#j'):):1)=>D$)"l27&* %  	 	 	r   r&   rF   r  r?   )numaxisrA   r!   r   N)r   rO   r   r   r   r   r   r=   r   r   outputattrr5   r  r   
_remove_op)r   r   r   rB   rC   rD   
split_varsr   rc   rw   r   s           @@@r   r	  zMultiThread._update_adam_opsl  sj    !..00Yuy%9%9 : :;; @	) @	)GC$$R(( ?)Gv%%"'V*;*;XXg..q1
"Z(9(9!(<=$)Jrxx/G/G/J$K$z"((9*=*=a*@A$z"((9*=*=a*@A %
288J+?+?+B C %
288J+?+?+B C  !&
299Z+@+@+C D"'*RYY|-D-DQ-G"H"'*RYY|-D-DQ-G"H#(:bii.F.Fq.I#J#(:bii.F.Fq.I#J   "wwy11WWW--WWW--!#!5!579ww98 8 	 	 	 	 	 	 #4#788	 	 	
    UZ(9(9!(<|(KL #J/"&"6BB !    !t344 	  	 A%/]F6N$$W% '# %    aKFF  (((A@	) @	)r   c                    | j                                         }d| j        z  }d}g }t          |j                  D ]}|                     |          r| j        |j        v r|                                | j                 }t          |          dk    rXt          |          dz  dk    s
J d            t          dt          |          d          D ]\}||         }|                    |          }	||dz            }
|                    |
          }|	j        rG|                    |           ]|dS g }d}|D ]w}t          |          dk    s)t          |d                   | j        k    s|j        |k    r|                    |g           |j        }\|d                             |           xg }t!          |j                  D ]\  }}|                     |          r|D ]}|                    t'          j        d|d         j                   |d         j        dd	
          }|                    |           |                    |dd|i||ddd	dd	d|d         j        | j        t0          j        i            nt!          |j                  D ]\  }}|                     |          r}|D ]x}|                    |dd|id|id|dt4          j        j        j        | j        t0          j        i           |                    |dd|id|i| j        t0          j        i           y nt          |          dk    r|                                 dS t!          |j                  D ]Y\  }}|                     |          r?|                    |dd|d         id|d         id|| j        t0          j        i            nZ|                                 dS )r   r   Nr   zRvars need to be one param var followed by one grad var, but got odd number of varsr!   rh   FusedOutput_FT)r;   r   r<   r   r   r   r   r   	use_alignr   rA   r   rj   rk   r8   r   r   rF   r?   rm   )r   rO   r   r   r   r|   r   rr   rt   r'   r5   varro   r   r   r   r   r   rQ   r   rR   r;   r   r   rW   ru   rZ   r[   r   r   _sync_with_cpp)r   rc   r8   r   param_gradsrw   r   r   r   rp   	grad_namesegments
last_dtyper*  
fused_varsr   segmenttmp_var	fused_vars                      r   r   z&MultiThread._insert_fuse_allreduce_ops  s    !..00dk/59%% 	- 	-B$$R((-(BM99 llnnT-AB{##q((;''!+q0001 100 q#k"2"2A66 - -A!,QJ!IIj11E +AE 2I 99Y//D+ ! &&t,,,,<F
 		) 		)CH""x|$$(BBB9
**&&& Y

##C((((
 ++ 	 	GC$$R(( '  G#..(1<71:?<<  &aj.$)&* /  G %%g...$$. '1+27 K K''#WQZ%5 ,fo	 %     16 !++ 	 	GC$$R(( !+  I$$) #Y/!&	 2%w)6+=+F+J ,fo % 
 
 
 $$1 #Y/!&	 2#/A %     ), z??a  """F !++ 	 	GC$$R((   -A/"JqM2!7(&/ ! 	 	 	  	r   N)r!   r   )
r   r   r   r   r   r.   r/   r  r	  r   r   r   r   r   r     s        G% % % %%L %L %LN) ) )*I I IVF) F) F)Pt t t t tr   r   )r   rZ   paddle.baser   5paddle.distributed.fleet.base.private_helper_functionr   paddle.frameworkr   paddle.staticr   r   r   rW   r	   r   r   r   r   r   r   r   <module>r8     s   
			  # # # # # #      " ! ! ! ! ! G G G G G G G G		(	/D! D! D! D! D! D! D! D!N_ _ _ _ _J _ _ _Dp p p p pz p p pf] ] ] ] ]} ] ] ]@P P P P P- P P P P Pr   