
    zjG                        d dl Z d dlZd dlmZ d dlmZmZ d dlZd dl	Z	d dl
mZmZ d dlmZmZmZmZ g Z G d d          ZdS )    N)defaultdict)
cmp_to_keyreduce)coreunique_name)	ParameterProgramdefault_startup_programin_dygraph_modec                      e Zd ZdZd0dZd Zd Zd Zd Zd	 Z	d
 Z
d Zd Zd Zd Zd Zd Zd Zd Zd1dZd Zd Zd Zd Zd Zd Zd Zd Zd Zd Z	 d2d Zd! Z	 d1d"Z d# Z!d$ Z"d% Z#d& Z$d' Z%d( Z&d) Z'd* Z(d+ Z)d, Z*d- Z+d. Z,	 d3d/Z-dS )4PipelineOptimizera
  
        :api_attr: Static Graph

    Pipeline Optimizer: Make a program to run as pipeline, that is splitting a
    program into multiple sections (sub-programs) and each section run on a
    device to enable the training of large scale models and the use of
    heterogeneous devices. Meanwhile, all sections run in the stype of pipeline.

    Args:
        optimizer (Optimizer): The optimizer to use, such as SGD.
        num_microbatches (int): Number of microbatches. [Optional. Default:1].
        start_cpu_core_id (int): The first cpu core id to use. [Optional. Default:0].

    Examples:
        .. code-block:: python

            >>> import paddle
            >>> import paddle.base as base
            >>> import paddle.base.layers as layers
            >>> import numpy as np

            >>> paddle.enable_static()
            >>> with base.device_guard("gpu:0"):
            ...     x = paddle.static.data(name='x', shape=[-1, 1], dtype='int64')
            ...     y = paddle.static.data(name='y', shape=[-1, 1], dtype='int64')
            ...     data_loader = base.io.DataLoader.from_generator(
            ...         feed_list=[x, y],
            ...         capacity=64,
            ...         use_double_buffer=True,
            ...         iterable=False)

            ...     emb_x = layers.embedding(input=x, param_attr=base.ParamAttr(name="embx"), size=[10,2], is_sparse=False)
            ...     emb_y = layers.embedding(input=y, param_attr=base.ParamAttr(name="emby",learning_rate=0.9), size=[10,2], is_sparse=False)

            >>> with base.device_guard("gpu:1"):
            ...     concat = layers.concat([emb_x, emb_y], axis=1)
            ...     fc = paddle.static.nn.fc(x=concat, name="fc", size=1, num_flatten_dims=1, bias_attr=False)
            ...     loss = paddle.mean(fc)
            >>> optimizer = paddle.optimizer.SGD(learning_rate=0.5)
            >>> optimizer = paddle.incubate.optimizer.PipelineOptimizer(optimizer)
            >>> optimizer.minimize(loss)

            >>> def train_reader():
            ...     for _ in range(4):
            ...         x = np.random.random(size=[1]).astype('int64')
            ...         y = np.random.random(size=[1]).astype('int64')
            ...         yield x, y
            >>> data_loader.set_sample_generator(train_reader, batch_size=1)

            >>> place = paddle.CUDAPlace(0)
            >>> exe = paddle.static.Executor(place)
            >>> exe.run(paddle.static.default_startup_program())
            >>> batch_size = 1
            >>> data_loader.start()
            >>> exe.train_from_dataset(
            ...         paddle.static.default_main_program())
            >>> data_loader.reset()
       r   c                 R   d| _         t          j                    rd| _         t                      rt	          d          t
          j        j        t
          j        j	        j
        j        f}t          ||          s#t          d| dt          |           d          || _        | j        | _        t#          | j        d          r&| j        j        | _        t#          | j        d          &|dk    s
J d	            || _        |d
k    s
J d            || _        d | _        t          j        }|j        | _        |                                | _        |                                | _        |                                | _        d | _        g | _         i | _!        d | _"        d | _#        d S )Ncpugpuz,In dygraph, don't support PipelineOptimizer.zGThe 'optimizer' parameter for PipelineOptimizer must be an instance of z, but the given type is .	inner_optr   z*num_microbatches must be a positive value.r   z1start_cpu_core_id must be a non-negative integer.)$_devicer   is_compiled_with_cudar   	Exceptionpaddle	optimizer	Optimizerstaticamp	decoratorOptimizerWithMixedPrecision
isinstance
ValueErrortype
_optimizer_origin_optimizerhasattrr   _num_microbatches_start_cpu_core_id_place_listop_proto_and_checker_makerOpRole_op_rolekOpRoleAttrName_op_role_keykOpRoleVarAttrName_op_role_var_keykOpDeviceAttrName_op_device_key_param_device_map_pipeline_pair_pp_ring_mapoutput_var_to_opinput_var_to_op)selfr   num_microbatchesstart_cpu_core_idvalid_optimizersop_makers         r/lsinfo/ai/hellotax_ai/data_center/backend/venv/lib/python3.11/site-packages/paddle/incubate/optimizer/pipeline.py__init__zPipelineOptimizer.__init__^   s   %'' 	! DL 	LJKKK&M'C
 )%566 	P#P P=A)__P P P  
 $ "&d,k:: 	F%)%;%ED" d,k:: 	F  1$$$8 %$$ "2 A%%%? &%% #42 $4466 ( ; ; = =&88::!%  $#    c                 \   |j         |         }|j                                        d         }|                    |          }d}|j        dk    rxt          j        |dz             }|                    |dgd          }|                    |dz   |z   dd|id	|id
|j	        d|j	        | j
        | j        j        i           |dz  }|                    |dz   |z   dd|j        dk    r|n|id|j        dk    r|n|id| j        | j
        | j        j        d|j        dk    rt          j        j        j        nt          j        j        j        i           |dz  }|j        dk    rH|                    |dz   |z   dd|id	|id
|j	        d|j	        | j
        | j        j        i           |dz  }|S )zj
        Insert allreduce op to sync global information for global
        gradient clip and amp.
        r   
reduce_any_cast_int32r   int32)nameshapedtypecastXOutin_dtype	out_dtyper    inputsoutputsattrs
all_reducexoutring_idreduce_type)opsdescoutput_arg_namesvarr    r   generate
create_var
_insert_oprC   r+   r)   Optimizeglobal_ring_idr   distributedReduceOpMAXSUM)	r5   op_idxblockopout_nameout_varoffsettemp_var_nametemp_vars	            r:   _insert_allreduce_opz&PipelineOptimizer._insert_allreduce_op   s   
 Yv7++--a0))H%%7l""'0M1IJJM''"1#W (  H 
V#W~)%t}'=  
 
 
 aKFQJRW%<%<'J<(?(?HHWM4.!4=#9w,.. &/33+48 	 	
 	
 	
 	!7l""
V#X(%t}'=  
 
 
 aKFr<   c                 	   t                      }d}d}|j                                        }|||z   k     r=d}|j        |         }g }	|j        dk    r|                     |          rd}nJ|j        dk    ry|                     |          rd|j                            d          D ],}
|                    |
          r|	                    |
           -|j        	                    d|	           n|j        dk    r|j                            d          D ],}
|                    |
          r|	                    |
           -|j        	                    d|	           |j        
                    d|	           n<|j        d	k    r|j                            d          D ],}
|                    |
          r|	                    |
           -|j        	                    d|	           |j        
                    d|	           t          |	          dk    r|                    |           |d
z  }n|j        dk    ry|                     |          rd|j                            d          D ],}
|                    |
          r|	                    |
           -|j        	                    d|	           d}|j                                        |j                                        z   }|D ]J}||v sd|v r|                    |           |                    t#          |                    rD|                    t#          |                    }|j        t&          j        j        j        k    r2|                    |t&          j        j        j        |j                  }n}t3          |t4                    rR|                    |j        |j        |j        |j        |j        |j         |j!        |j"        |j#        |j$        
  
        }n|%                    |d          }| &                    ||           L|d
z  }| j'        s|s| (                    |d
z
  |          }||z  }||z  }|||z   k     =|)                                 d S )Nr   Fr>   TconcatrE   update_loss_scalingrF   check_finite_and_unscaler   sum_blocking_queue)rA   r    persistable)
rA   rB   rC   r    	lod_levelstop_gradient	trainableoptimize_attrregularizer
error_clip)*setrS   op_sizerR   r    _is_optimize_opinput_find_var_recursiveappend	set_input
set_outputlen
_remove_op_is_gradient_clip_opinput_arg_namesrT   addstr_var_recursiver   VarDescVarTypeREADERrW   rn   r   r   create_parameterrA   rB   rC   ro   rp   rq   rr   rs   rt   _clone_variable_clone_var_attruse_shardingrg   _sync_with_cpp)r5   r`   	ori_blockused_var_setadded_op_numr_   rv   should_insertra   
reserved_x
input_namevarsrU   
source_vardest_varinserted_opss                   r:   _create_varszPipelineOptimizer._create_vars   s   uu*$$&&w--- "M6"B Jw,&&4+?+?+C+C& $H$$)=)=b)A)A$"$'--"4"4 6 6J00<< 6"))*555!!#z2222111"$'--"4"4 6 6J00<< 6"))*555!!#z222""5*5555666"$'--"4"4 6 6J00<< 6"))*555!!#z222""5*555z??a''$$V,,,qLG ( E!!d&?&?&C&C!"$'--"4"4 6 6J00<< 6"))*555!!#z222 $7**,,rw/G/G/I/IID ; ; ,&&*;s*B*B  %%%,,SXX66 &55c#hh??
?dl&:&AAA$// !\18$.$:  0    HH
  
I66 H$55'_(.(.'_","6&0&>","6&0&>$.$:#-#8  6    HH  %44ZGGH$$Xz:::: aKF   44VaZGGLL(Ll"F] w---^ 	r<   c                     | j         |j        v sJ t          |                    | j                             }|t          | j        j                  z  o|t          | j        j                  z  S N)r+   
attr_namesintattrr)   BackwardLoss)r5   ra   op_roles      r:   _is_loss_grad_opz"PipelineOptimizer._is_loss_grad_op  sp     BM1111bggd/0011T]3444 
3MD
 D
 :
 	
r<   c                     | j         |j        v oAt          |                    | j                             t          | j        j                  k    S r   )r+   r   r   r   r)   Forwardr5   ra   s     r:   _is_forward_opz PipelineOptimizer._is_forward_op#  sE     BM1 
)**++s4=3H/I/II	
r<   c                     | j         |j        v o@t          |                    | j                             t          | j        j                  z  S r   )r+   r   r   r   r)   r   r   s     r:   _is_backward_opz!PipelineOptimizer._is_backward_op(  E     BM1 
)**++c$-2H.I.II	
r<   c                     | j         |j        v sJ t          |                    | j                             t          | j        j                  k    S r   )r+   r   r   r   r)   r   r   s     r:   _is_loss_opzPipelineOptimizer._is_loss_op-  sG     BM11112774,--..#dm6H2I2IIIr<   c                     | j         |j        v o@t          |                    | j                             t          | j        j                  z  S r   )r+   r   r   r   r)   rY   r   s     r:   rw   z!PipelineOptimizer._is_optimize_op1  r   r<   c                 8    d|j         v od|j         v od|j         v S )NParamGradLearningRate)input_namesr   s     r:   _is_update_opzPipelineOptimizer._is_update_op6  s/    r~% 3".(32>1	
r<   c                    t          t                    }|                    d          }|j        D ]}|                    | j                  }|| j         dk    rp|D ]l}||         }|j        }|                                j        	                                }	|	
                    |           |	                    | j        d           m||         }|j        }|                                j        	                                }	|	
                    |           |	                    | j        d           g }
|D ]3}||         }|                                 |
                    |           4|
S )a  
        Split a program into sections according to devices that ops run on.
        The op whose op_device attr is "gpu:all" is copied to all sections.

        Args:
            main_program (Program): the main program
            devices: all used devices
        r   :all )r   r	   r`   rR   r   r/   r   rS   global_block	append_op	copy_from	_set_attrr   rz   )r5   main_programdevicesdevice_program_mapr`   ra   deviceprogramop_descap_opprogram_listkeys               r:   _split_programz PipelineOptimizer._split_program=  s}    )11""1%%) 	9 	9BWWT011FDL.....% = =F08G gG#00227AACCEOOG,,,OOD$7<<<<= -V4',,..3==??((( 3R8888 	) 	)C(-G""$$$((((r<   c                 ~    d|v sd|v s
J d            |d|                     d                   }| j        |         }|S )a  
        For adam optimizer, it will add accumulators and initialize them
        with fill_constant, and force the op device to cpu. Hence, we should
        get the real op_device attribute of the fill_constant as the device
        where the corresponding parameters on.
        beta1_pow_accbeta2_pow_acczPFor accumulators for Adam, the name must contain beta1_pow_acc or beta2_pow_acc.r   _beta)indexr0   )r5   var_name
param_namer   s       r:   "_get_op_device_for_startup_programz4PipelineOptimizer._get_op_device_for_startup_programc  s\     (**o.I.I.I  /J.II a(.."9"99:
'
3r<   c                    |                                 }t                      }|j        D ]}|                    | j                  }|dk    r7|j        dk    s
J d            |j        d         }|                     |          }|r)t          |	                    d          d                   }nd }|r||k    r|j
        }	|                                 j
                                        }
|
                    |	           |
                    | j        d           |                                 |                     |                                 |           |S )Nr   fill_constantzcFor ops in startup program with the op_device attribute of cpu, they must be of type fill_constant.r   :r   r   )r   r	   rR   r   r/   r    rT   r   r   splitrS   r   r   r   r   r   )r5   startup_program	device_idr`   new_startup_programra   r   
output_vardevice_indexr   r   s              r:   _split_startup_programz(PipelineOptimizer._split_startup_programr  s`   ,,..%ii) 	5 	5BWWT011Fw/111B 211  03
@@LL "6<<#4#4Q#788  ,)33gG'4466;EEGGEOOG$$$OOD/4444**,,,-::<<eDDD""r<   c                     d|v r|                     dd          }d|v r|                     dd          }| j        |         }|dS d}t          |          D ]\  }}||k    r|} n|S )zM
        Find the post op that has variable named var_name as input.
        z
.cast_fp32r   
.cast_fp16N)replacer4   reversed)r5   r   r   post_ops	result_oppost_oppost_idxs          r:   _find_post_opzPipelineOptimizer._find_post_op  s    
 8##''b99H8##''b99H'14	!)(!3!3 	 	GX%#	   r<   c                 j    | j         |         }|dS d}t          |          D ]\  }}||k     r|} n|S )ze
        Find the previous op of op with index that outputs
        variable named var_name.
        N)r3   r   )r5   r   r   prev_opsr   prev_opprev_idxs          r:   _find_prev_opzPipelineOptimizer._find_prev_op  s_    
 (24	!)(!3!3 	 	GX%#	   r<   c                 ^    |                     ||           |                    ||           d S r   )_rename_input_rename_output)r5   ra   old_namenew_names       r:   _rename_argzPipelineOptimizer._rename_arg  s4    
8,,,
(H-----r<   Nc                     |                     ||j        ||j        n||j        |j        |j        |j        |j                                                  }| 	                    ||           |S )z
        Create a new var for block, which has the same type,
        shape and dtype as ref_var, then rename it with the
        name `name`.
        N)rA   rB   rC   r    ro   rn   is_dataneed_check_feed)
rW   rB   rC   r    ro   rn   r   rS   r   r   )r5   r`   ref_varrA   rC   new_vars         r:   _create_varzPipelineOptimizer._create_var  sw     ""-#(='--e'+O#L88:: # 	
 	
 	Wg...r<   c                 Z    |j         |_         t          |d          r|j        |_        d S d S )Nis_distributed)rp   r#   r   )r5   destsrcs      r:   r   z!PipelineOptimizer._clone_var_attr  s;     .3()) 	5"%"4D	5 	5r<   c                 r    |                     t          j                              }|dk    r
|d|         n|S )zD
        Strip the grad suffix from the given variable name
        N)findr   grad_var_suffix)r5   rA   poss      r:   _strip_grad_suffixz$PipelineOptimizer._strip_grad_suffix  s8     ii,..// BYYtDSDzzD0r<   c                 .    |t          j                    z   S )z?
        Append grad suffix to the given variable name
        )r   r   )r5   rA   s     r:   _append_grad_suffixz%PipelineOptimizer._append_grad_suffix  s     d*,,,,r<   c                     |                     | j                  r|                    | j                  nd}|r|dd         dk    s
J d            |S )z6
        Get the op_device attribute of a op.
        Nr      r   z<Now, only gpu devices are supported in pipeline parallelism.)has_attrr/   r   )r5   ra   r   s      r:   _get_op_device_attrz%PipelineOptimizer._get_op_device_attr  sl     {{4.//BGGD'((( 	
  	!A#;%'''N ('' r<   c                    t          | j        j                  }|                    | j                  |k    r%|                    | j        | j         d           dS |j        dk    r| 	                    |          r|j
                                        D ]}d|v s
J d            t          |j
                                                  dk    sJ |j
                                        d         }|                     ||          }|                    d          sJ |j         d|             |                    | j                  }|s
J d	            |                    | j        |           dS |j        d
k    s|j        dk    r| 	                    |          s|                     |          ri|                     ||j
                            d          d                   }	|                    | j        |	                    | j                             dS |j        dk    r|                     |          st          |j                  dk    rt          |j                  dk    sJ |j        d         }
|j        d         }d|v rK|                     ||          }|                    | j        |                    | j                             dS |                     ||j
                            d          d                   }	|                    | j        |	                    | j                             dS |                     |          rd}|j        ||z                                | j                  r(|j        ||z                                | j                  sU|dz  }|j        ||z                                | j                  -|j        ||z                                | j                  U|j        ||z                                | j                  }|s
J d            t-          |          D ]+}|j        ||z                                | j        |           ,dS |                     |          rz|j        d
k    ro|                    d          }t          |          dk    sJ |                     |d                   }| j        |         }|                    | j        |           dS |                     |          s|                     |          r| j        |j        v s
J d            |                    | j                  }t          |          dk    s
J d            |d         }| j        |         }|j        dk    s,|j        dk    s!|j        dk    s|j        dk    s|j        dk    r
| j         d}|                    | j        |           dS |j        dk    s|j        dk    rs|                    | j        | j         d           |                    | j        | j        j                   |j        d         }|                    |          }d|_        dS g d}|j        |v sJ d| d|j                     |                     |          sJ |                    | j        | j         d           dS )a
  
        Add op_device attribute for ops that have not that attribute set.
        We use "gpu:all" to represent the op should be put on all
        sub-programs, such as lr-related ops. Note that: "gpu:all"
        is only used by pipeline as an indicator.
        r   rl   z@RENAME@z3The op must be sum used to accumulate renamed vars.r   r   	op_devicez has no op_device attr for var z$The post op must have op_device set.rD   scalerE   memcpyz@Fetchz1Please put you program within device_guard scope.rF   zEgradient_clip and regularization ops must have op_role_var attribute.   zHop_role_var for gradient_clip regularization ops must have two elements.sqrtr   elementwise_maxelementwise_divalloc_float_statusclear_float_statusT)rj   r>   ri   rl   rk   r   z9For other ops without op_device set, they must be one of z, but it is N) r   r)   LRSchedr   r+   r   r/   r   r    r   rS   r   r}   rT   r   r   r   r   rx   rw   r   rR   rangeoutputr   r0   r   _is_regularization_opr-   r   rU   rn   )r5   ra   idxr`   lrsched_rolerA   rb   r   r   r   r   output_namerd   i	grad_namer   op_role_varfloat_status_namefloat_status_varother_known_opss                       r:   _add_op_device_attr_for_opz,PipelineOptimizer._add_op_device_attr_for_op  sZ    4=011774$%%55 LL,.C.C.CDDDDDW$"6"6r":"://11  !T)))I *))) rw//1122a7777w//11!4H((h77G##K00  <JJJJ 0 \\$"566FAAAAA6LL,f55555g27g#5#5  $$ $6(,(;(;B(?(? $6 ((bgmmC.@.@.CDDGLL,gll4;N.O.OPPPPPW  )=)=b)A)A  B&''1,,R5H1I1IQ1N1N1NN+A.J-a0K;&&,,S+>>'d6I)J)J     ,,S"'--2D2DQ2GHH'd6I)J)J     b!! F	EFif-66#  YsV|,11$2EFF ! if-66#  YsV|,11$2EFF YsV|,11$2EFFFNNNNN66]] J J	#'",,T-@&IIIIJ J!!"%% ;	E"'V*;*;		%((Iy>>Q&&&&001>>J+J7FLL,f55555&&r** 4	Ed.H.H.L.L 4	E (BM999J :99 ''$"788K{##q(((= )(( %QJ+J7F 5  7f$$7o--7///7/// L...LL,f55555W,,,;O0O0OLL,.C.C.CDDD LL*DM,ABBB " 3A 6$yy):;; ,0(((  O 7o--- 6E   g    .--
 ''+++++LL,.C.C.CDDDDDr<   c                 8   t          t          |j                            D ]w\  }}|j        dk    s|j        dk    s|j        dk    r$|                    | j        | j         d           J|                     |          r`|                     |||           xdS )zd
        Add op_device attribute for ops in block that have
        not that attribute set.
        create_py_readerreadcreate_double_buffer_readerr   N)		enumeratelistrR   r    r   r/   r   r   r  )r5   r`   r  ra   s       r:   _add_op_device_attrz%PipelineOptimizer._add_op_device_attrc  s    
 !ei11 	< 	<GC---7f$$7;;; T0T\2G2G2GHHH''++ ++BU;;;;!	< 	<r<   c           	         g }t          | j        j                  t          | j        j                  t          | j        j                  t          | j        j                  t          | j        j                  t          | j        j                  t          | j        j                  z  g}|j        D ]}|                    |j	                  sJ|j	        dk    r5|
                    | j                  t          | j        j                  k    s
J d            |                    | j                  sJ d|j	         d| j         d            |
                    | j                  }t          |          |v sJ d| d|j	         d|             |                    | j                  sJ d|j	         d| j         d            |
                    | j                  }|sJ d	|j	         d
            || j         dk    rS|                    d          d         }|dk    s
J d            ||vr|                    |           |S )z
        Check whether ops in a block have both the op_device and the
        op_role attributes set.
        Then, return all devices in order.
        conditional_blockz`Now, the only supported op without kernel is conditional_block, and its op role must be LRSched.zop (z	) has no z attribute.zop_role z for op z must be one of zop_device attribute for op z has not been set.r   r   r   r   z<Now only gpu devices are supported for pipeline parallelism.)r   r)   r  r   r   r   rY   rR   _has_kernelr    r   r+   r   r/   r   r   rz   )r5   r`   device_listvalid_op_role_valuera   r   r   dev_types           r:   _check_validationz#PipelineOptimizer._check_validationz  s     %&&%&&&''"##&''&''#dm.@*A*AA
 ) !	+ !	+B>>"'** w"555GGD-..#dm6K2L2LLLLJ ML ;;t011  GrwGG):GGG 1 ggd/00Gw<<#6666Z7ZZBGZZEXZZ 766 ;;t233  IrwII)<III 3 WWT011F  IbgIII 6 DL.....||C((+Hu$$$N %$$ [((""6***r<   c                    	
 i d}t          t          j                            D ]\                                 r} nd|d
t          t          j                            D ]a\                       j                  }| j         dk    r/j        D ])                              }|j	        r d} 
                              }| j        vrD j                 }|s|r|                     j                  nd}|| j         dk    r||k    rvrg <   ||f         v r|                    d          d         dz   	 fd	
 f
d t          |                    d          d                   t          |                    d          d                              +c                                 dS )	zp
        Insert a pair of send and recv ops for every two
        consecutive ops on different devices.
        Nr   )r   first_optimize_indexr   r   c                                                    }                              }|s|sJ d             |r|| k     sJ d| d|  d             d S |r|| k    sJ d| d|  d             d S d S )Nzdsend/recv in pipeline should only be inserted in forward or backward,please check the op_role of op=zEIn forward, send/recv can only be passed forward, but now prev_stage=z great than cur_stage=z, please check op_device of op=zGIn backward, send/recv can only be passed backward, but now prev_stage=z less than cur_stage=)r   r   )cur_idprev_id
is_forwardis_backwardra   r5   s       r:   _check_stagezKPipelineOptimizer._insert_sendrecv_ops_for_boundaries.<locals>._check_stage  s   !%!4!4R!8!8J"&"6"6r":":K%   ?:<? ? 4
 " 	&///u*1u uIOu upru u  0//// % &///t*1t tHNt toqt t  0// //r<   c                 &  
 t          |           z   }t          |          z   }||f         v rd S | |z
  dk    r= | dz
  |            | | dz
                                          ||f           d S | |z
  dk     r= | dz   |            | | dz                                           ||f           d S t          | |z
            dk    sJ                              ||f                               j                  }j                 }|| f}|dz  | z   }|j        vrAj                            |           j        j        |<   j        }xj        dz  c_        nj        |         }j	        dk    rΉ
                    d         z   dd|ij        |j        |dd	d
dd|i           dxx         dz  cc<   t          |j                  }	|	d         dk     rj        n|	d         |	d<   
                    d         z   dd|gid|	d|j        j        |j        |dd	d
dd|i           dxx         dz  cc<   d S j	        dk    rt          |j                  }	|	d         dk     rj        n|	d         |	d<   t!          j        |	          }
j        dk    o|
j        z  dk    }d|j        v r|j                            d          d         dd         }                    |          }
                    d         z   dd|gid|gid|	d|j        j        |j        |dd	i           dxx         dz  cc<   d S  | |           
                    d         z   dd|gid|gij        |j        |i           dxx         dz  cc<   |j                            d          d         }                    |          }t-          |t.                    rd	nd}
                    d         z   |r|rdndd|ij        |j        |ddd|d
ddj        dj        i           dxx         dz  cc<   d }t3          |          t3          j        j                  k    rd         }j        j        }n}j        j        }
                    |d         z   dd|gid|gij        |j        |d|i          }t3          |          t3          j        j                  k    r&|                    dd            dxx         dz  cc<   
                    d         z   |r|rdnd!d|gid|	d|j        j        |j        |dd	d
dd|dj        dj        i	           dxx         dz  cc<   |r]|s]
                    d         z   d"d|gid|gij        |j        |dd	ddd#j        d$j        i           dxx         dz  cc<   d S d S d S t?          d%j	         d&          )'Nr   r     zF-then-Br   send_v2rE   use_calc_streamTpeerrP   r   r    rJ   rL   r   recv_v2rF   	out_shaperC   r   r    rK   rL   1F1Bsubprogassignr   r    rJ   rK   rL   c_sync_calc_stream@Fpartial_sendnumidr$  c_sync_comm_streampipeline_flagr   partial_recvpartial_allgathernranksrankz@Now only 'F-then-B' and '1F1B' are supported.The given value is r   ) r   rz   absr   r+   r   r1   rP   r2   schedule_mode_insert_op_without_syncr/   r  rB   micro_batch_sizerC   npprod	mp_degreerA   r   rU   r   r   mp_rankr   r)   r   rY   r   r   r   )r&  r'  cur_devprev_devr   rU   pairpair_keyrP   	var_shapenumeluse_mporigin_nameassociate_varprefix_name
prefix_varis_paraminsert_indexnew_op_rolesync_comm_opr*  _insert_send_recvr`   device_typeextra_index_infor   input_var_to_devicera   r5   r   s                       r:   rZ  zPPipelineOptimizer._insert_sendrecv_ops_for_boundaries.<locals>._insert_send_recv  s_   )CKK7G*S\\9H*.A(.KKK'!++))&1*g>>>))&&1*===+H5<<$h/   ')B..))&1*g>>>))&&1*===+H5<<$h/   v/00A5555'188'89LMMM ggd&788G*X.C#V,D&~6H4#666+2248886:l)(3"&,)"&"3H"=)Z7755"'*:7*C"C!*$': $ 3X $ 17 14 & )7#	 6    )111Q6111$(OO	  )|a// !11!*1 "!
 55"'*:7*C"C!*%*SEN +Y ' $ 3W $ 17 14 & )7#	 6    )111Q611111+v55$(OO	  )|a// !11!*1 "! !#	 2 2"&.1"4 "!DN2a7  %00 +.(..*C*CA*Fqt*LK,1IIk,B,BM!99&+.>w.G&G%-(+m_'=).$/$+SY$($7$($5w$5t'" :    -W555:555"F$VW55555"'*:7*C"C!5$'#<%*SEN $ 3X $ 17# 6 	 	 	 )111Q6111&)hnnS&9&9!&<%*YY{%;%;
$.z9$E$EPDD5 ! 55"'*:7*C"C (.!419!4		%3$': $ 3X $ 17 15 )7 & %t~ $dl	# 6   & )111Q6111'+w<<3t}/E+F+FFF+; 6,L +/-*@KK+0L*.-*@K','D'D".1A'1J"J!5$'#<%*SEN $ 3X $ 1; )7# (E 
( 
( w<<3t}/D+E+EEE(22?BGGG,W555:55555"'*:7*C"C (.!419!4		%3%*SEN +Y ' $ 3W $ 17 14 & )7 %t~ $dl# 6   * )111Q6111! ;( ;!99&+.>w.G&G%8(+cU|).$($7$($5w$5t$-q$,dn$*DL'" :    -W555:55555!; ; ; ;$ )H262DH H H  r<   r   )r  r  rR   rw   r   r/   r   r   rU   r   r   r0   r   r   r   )r5   r`   r$  
cur_devicerU   prev_devicer   r*  rZ  r[  r\  r   r]  ra   r   s   ``     @@@@@@@@r:   #_insert_sendrecv_ops_for_boundariesz5PipelineOptimizer._insert_sendrecv_ops_for_boundaries  s    !#"4	??33 	 	IE2##B'' ',$ $8
 

 #4	??33 S	 S	IE2!455J22222. O Oii)); ",,UH==?t'=== "&"8"BK" =DNT%8999$   &+DL9N9N9N*N*N*,,#66646'1,0CH0MMM(..s33A6<     (W W W W W W W W W W W W W Wr "!
((--a011))#..q122   YO` 	r<   c           	         | j         dk    rdS t          t          t          t	          |j                                                D ]\  }}|                     |          ry|j        dk    sJ d|j                     |                    d          sJ t          |
                    d                    }|| j         z  }|                    d|            dS dS )zJ
        Scale the loss corresponding to number of micro-batches.
        r   Nr   z6loss_grad_op must be fill_constant op, but this op is value)r$   r   tupler  r  rR   r   r    r   floatr   r   )r5   r`   r   ra   
loss_scales        r:   _insert_loss_scalez$PipelineOptimizer._insert_loss_scale  s     !Q&&F!%	$uy//(B(B"C"CDD 
	 
	IE2$$R(( 	w/1110&(g0 0 211 {{7+++++"2777#3#344
'$*@@
Wj111	
	 
	r<   c                 t   t          |j                  D ]\  }}|                     |          s|j        }|j        }||z   }|j        dk    s|j        dk    rE|D ]Z}t          j                    |vr|                    t          j                              }|dz   }	| 	                    |||	           [d S )NrD   r=  @MERGED)
r  rR   rw   r   rT   r    r   r   stripr   )
r5   r`   r   ra   r   output_namesin_out_namesrA   r   new_grad_names
             r:   _rename_gradient_var_namez+PipelineOptimizer._rename_gradient_var_name  s    "59-- 	: 	:IE2''++ ,K.L&5Lw&  BG/C$C$C % : :'))55!ZZ(<(>(>??
 $y 0  T=9999:	: 	:r<   Fc                 
   |r|j         nd}|r&|j        r|                     |||j        |          }|S g }d}|rdnd}	|rt          j        nd}
t          t          t          t          |j
                                                D ]+\  }}|                     |          rs|j        dk    rh|j        d         }|j        d         }|                    d          | j        v r2|                    dd	          |k    sJ |                    |           |                     |          r||d
z   }|                     |          rk| j        |j        v r\|                    | j                  }t/          |          dk    rt/          |          dz  dk    sJ t1          dt/          |          d          D ]}d}||         }|                    |          s#d|v r(|t5          j                    z   }||	z   }|                    |          s#|                     ||j        |         ||
           |                    |          sJ |                    |          }|                    |          }d|_        |                     ||z   di d|gid|j!        d|j"        dtG          d          | j$        | j%        j&        j'        i           |d
z  }||d
z            }|j        |         }d|v }||u}|rk|dz   }|                     ||||
          }d|_        |                     ||z   dd|id|id|j"        d|j"        | j$        | j%        j(        i           |d
z  }|}|                     ||z   dd||gid|i| j$        | j%        j(        i           |d
z  }|)                    |           -|s|S d}t          t          t          t          |j
                                                D ]#\  }}|                     |          r	||d
z   } n$|J |D ]}|                    dd	          }|                    dd	          }|                    |          s"|                     ||j        |         |           |                    |          sJ |                    |          }|                    |          }d|_        |                     |dd|id|id|j"        d|j"        | j$        | j%        j&        i           |S )zz
        Create a new merged gradient for each parameter and accumulate the
        corresponding gradient to it.
        FN@MERGED@FP16rh  rD   r   @GRADr   r   r   r  
@BroadCastTr   rF   rB   rC   rb  r7  	cast_fp16@TMPrE   rG   rH   rl   z@FP16z@GRAD@MERGED@FP16)*fp16_allreducefuse_grad_merge_accumulate_gradients_with_fusefuse_grad_size_in_MBr   float16r   rc  r  r  rR   rw   r    r   rT   ri  r0   r   r~   r   r-   r   r   r}   r  has_varr   r   r   r   rU   rn   rX   rB   rC   rd  r+   r)   rY   r  r   rz   )r5   r`   pp_allreduce_in_optimizestrategyshardrt  fused_gradient_namesmerged_gradient_namesfirst_opt_op_idxmerged_suffixrC   r   ra   in_namerb   r  r  rd   r   param_grad_namemerged_param_grad_nameparam_grad_varmerged_param_grad_varr  grad_varis_fp16_grad	need_castcast_grad_var_namecast_grad_varfp16_grad_namefp16_grad_vars                                  r:   _accumulate_gradientsz'PipelineOptimizer._accumulate_gradients  s&    5=G00% 	(0 	(#'#G#G~x'De$ $  (' "*8Gi"0:d!%	$uy//(B(B"C"CDD b	I b	IIE2##B'' BGv,=,=,Q/.q1>>'**d.DDD"??<<<HHHH$$U+++##B'' -,<,D#(19  ##B'' SI%66 ggd&;<<{##q((;''!+q0000q#k"2"2A66 LI LIAF!,QJ ==44 ! #z11 &043G3I3I&IO-<}-L* ==)?@@ ((!!Jz22!	   !==)?@@@@@%*YY%?%?N,1II6L,M,M)8<)5$$.7,!!&)>(? @#%:%@#%:%@#U1XX -t}/E/M %    aKF +AE 2I$z)4H#.)#;L ,N BI  1
 .=v-E*(,(8(8!>3Eu) ) 5:1(("2V";!'$'?%*M$: *HN +]-@ $ 14=3I# ) 
 
 
 !#0$$.7" #&;X%FG!&(= > -t}/E %    aKF)001GHHHH 	)((!%	$uy//(B(B"C"CDD 	 	IE2##B'' ,<,D#(19 +++
 4 	 	N&..w;;I'//0CRHHJ==++ K  
:(>	JJJ==+++++!IIn55Myy++H#(H &]+) 3%t}'=  
 
 
 
 %$r<   c                    |                      ||          }g }|rdnd}|rt          j        nt          j        }d}	d }
|D ]K\  }}|                    |          }|                    |t          j                    z   |z   ||j        dd          }|                    |          }t          |d          r|j
        |_
        |                     |          }t          |          dk    s|	|z   |k    s|j        |
k    r%|                    |g|g|gf           |j        }
d}	|d	         d                             |           |d	         d
                             |           |d	         d                             |           |	|z  }	Mg }g }|D ]}|d         }|d         }|                    d|d         j         |d         j        dd          }|d         j        t          j        k    rdnd}|d|d         j         z   }|                    ||d         j        dd          }|                    |           |                    |           t          |          t          |          k    sJ t          |          t          |          k    sJ d }t!          |j                  D ] \  }}|                     |          r||} n!|J d}t'          t          |                    D ]}||         }||         }||         d         }||         d
         } ||         d         }!|                    ||z   dd| i||dddddddd|d         j        | j        | j        j        dt          j        d          ddi           |d
z  }|                    ||z   dd| i|!|dddddddddddd|!d         j        | j        | j        j        j        i           |d
z  }||z  }d}t'          t          |                    D ]}||         }||         }d|j        v }"|"|u}#|#rj|j        dz   }$|                    |$|dd          }%|                    ||z   dd|id |%id!|j        d"|%j        | j        | j        j        i#           |d
z  }|%}|                    ||z   d$d||gid |i| j        | j        j        i#           |d
z  }|r|D ]\  }}|                    |          }|t          j                    z   dz   }&|                    |&          sJ |                    |&          }'|t          j                    z   dz   }(|                    |(t          j        |j        dd          })|                    ||z   dd|'id |)id!t          j        d"t          j        | j        | j        j        i#           |d
z  }t'          t          |                    D ]}||         j        ||<   ||fS )%Nro  rh  g        TF)rA   rC   rB   rn   rp   r   r   r   r   r  
FusedGrad_)rA   rC   rn   rp   zFusedMergedGrad.cast_fp16.FusedMergedGrad_coalesce_tensorInput)OutputFusedOutputuser_defined_size_of_dtype	copy_data	use_alignrC   set_constantnpuconstantrI   rr  rs  rD   rE   rF   rG   rH   r7  rl   )_sort_grad_param_by_dtyper   rx  float32rU   rW   r   r   rB   r#   r   _get_var_sizer}   rC   rz   rA   r  rR   r   r  rE  r+   r)   r   is_compiled_with_custom_devicerY   r  rX   ry  )*r5   
main_blockfp16
fused_sizegrad_param_pairsr  grad_param_segmentsr  rC   cur_size
last_dtypegradparam	real_gradmerged_grad_var
real_paramtmp_sizefused_gradientsfused_merged_gradientsgrad_param_segmentgrad_segmentmerged_grad_segment
fused_gradfused_merged_grad_name_prefixfused_merged_grad_namefused_merged_gradfirst_back_op_idxr   ra   rd   r  gradsparamsmerged_gradsr  r  r  r  r  	fp16_gradfp32_grad_name	fp32_grads*                                             r:   &_insert_accumulate_gradients_with_fusez8PipelineOptimizer._insert_accumulate_gradients_with_fuse  s     99(
 
 !*.=I"&:FN
+ 	% 	%KD%"t,,I(33T1333mCo # 4  O $..Jz#344 K1;1J.)))44H
 '((A--h&33?j00#**[:,0AB   '_
#B'*11)<<<#B'*11*===#B'*11/BBBH$!#"5 	= 	=-a0L"4Q"7#..8,q/"688"1o+!#	 /  J 'q)/6>AA -,& * .3)!,1334 # !+ 5 5+)!,2 #	 !6 ! ! "":..."))*;<<<<?##s+>'?'?????)**c2E.F.FFFFF !":>22 	 	IE2##B'' ,=,E$)! ,,,s.//00 :	 :	A(+J 6q 9'*1-E(+A.F.q1!4L..!F*&(#(DD 1!U1X^%t}'=
 #D$G$N$N- /   < aKF ..!F*&(*#4 
 1!"D\!_2%t}'='E /   $ aKFF 	F"s?++,, #	 #	A(+J 6q 9&*/9L$D0I + &0_v%=" * 5 5+ %"'	 !6 ! ! %%*V3,"M2"J$4#]%8)4=+A & 
 
 
 !*
!!&//<= 12($-*@A "    aKFF 	/  e&NN400	!&)=)?)?!?.!P!)).99999&NN>::	!&)=)?)?!?)!K&11' .#/ %"' 2  	 %%*V3+"I."FN#V^)4=+A & 
 
 
 ! s12233 	G 	GA(>q(A(F"1%%%'777r<   c           	      D   d }g }t          t          t          t          |j                                                D ]\  }}|                     |          rs|j        dk    rh|j        d         }	|j        d         }
|
	                    d          | j
        v r2|	                    dd          |
k    sJ |                    |           |                     |          r"| |dz   }|t          |j                  k    r d S |                     |          r| j        |j        v r|                    | j                  }t          |          dk    rt          |          dz  dk    sJ t%          dt          |          d          D ]K}||         }|                    |          s d|v r%|                    ||dz            ||         f           Lt          |          dk    rd S |r|j        nd}d	 t%          |          D             }|D ]M}|r|                    |d                   nd}d|cxk    r|k     sn J ||                             |           Ng }|D ]#}|                     |||||          \  }}||z  }$|                                 |S )
NrD   r   rp  r   r   r   r  rq  c                     g | ]}g S  r  ).0r  s     r:   
<listcomp>zEPipelineOptimizer._accumulate_gradients_with_fuse.<locals>.<listcomp>  s    555!2555r<   )r   rc  r  r  rR   rw   r    r   rT   ri  r0   r   r~   r   r}   r-   r   r   r  ry  rz   
worker_numr   r  r   )r5   r  r  r  r|  r  r  r   ra   r  rb   r  r  r   rA  device_to_pairsrM  root_idall_fused_merged_gradientspairsr  s                        r:   rv  z1PipelineOptimizer._accumulate_gradients_with_fusez  s(     !%	$z~2F2F(G(G"H"HII 	 	IE2##B'' BGv,=,=,Q/.q1>>'**d.DDD"??<<<HHHH))%000##B'' ,<,D#(19 #s:>':':::FF##B'' %66 ggd&;<<{##q((;''!+q0000q#k"2"2A66  A!,QJ%--j99 ! #z11 $++$QU+[^<      A%%F%*1!!55uV}}555$ 	2 	2D/4;ell47+++!G((((&((((((G$++D1111%'"$ 	A 	AE ;;D*e5E &  '*@@&&!!###))r<   c                 v   g }g }g }|D ]}|                     |d                   j        }|t          j        k    r|                    |           H|t          j        k    r|                    |           n|                    |           |}|                    |           |                    |           |S )Nr   )rU   rC   r   r  rz   rx  extend)	r5   r  r  
fp16_pairs
fp32_pairsother_pairsr  rC   sorted_pairss	            r:   r  z+PipelineOptimizer._sort_grad_param_by_dtype  s    

% 	* 	*ENN58,,2E&&!!%((((&.((!!%((((""5))))!J'''K(((r<   c                    t           j        j        j        dt           j        j        j        dt           j        j        j        dt           j        j        j        dt           j        j        j        dt           j        j        j        dt           j        j        j	        dt           j        j        j
        dt           j        j        j        di	}d|j        vsJ t          d |j        d          ||j                 z  dz  dz  S )Nr        r   r   c                     | |z  S r   r  )rN   ys     r:   <lambda>z1PipelineOptimizer._get_var_size.<locals>.<lambda>  s
    A r<   g      @)r   r   r   FP16BF16FP32FP64INT16INT32INT64BOOLUINT8rB   r   rC   )r5   rU   dtype_to_sizes      r:   r  zPipelineOptimizer._get_var_size  s    L %qL %qL %qL %qL &L &L &L %qL &

 """"%%sy!44CI&' 	
r<   c                    |j         }|D ]}|                    d          j        D ]}|                    d          s|                    d          j        }|                    |          }|                    d          }|j        D ]7}	|	j        }
|j                                        }|	                    |
           8|
                                 |                     ||           |                    d|           ݌d S )Nr   	sub_block)
parent_idx)r   r`   rR   r   r   r<  _create_blockrS   r   r   r   r   r   )r5   r  r   r   progra   origin_sub_block_idorigin_sub_blocknew_sub_blocksub_opr   r   s               r:   _add_sub_blocksz!PipelineOptimizer._add_sub_blocks  s   !)  	9 	9Djjmm' 9 9{{;// &(ggk&:&:&=##/#5#56I#J#J  $ 2 2a 2 @ @.2 - -F$kG).88::EOOG,,,,,,...!!-1ABBB[-88889	9 	9r<   c                     |j         D ]:}|                    |j                  s|                    | j                  }|c S d S r   )rR   r  r    r   r/   )r5   r`   ra   r   s       r:   _get_device_infoz"PipelineOptimizer._get_device_info  sV    ) 	 	B>>"'**  344I		 	r<   c                 >   i }|D ]s}|                     d          }|j        D ]T}|dk    r	|                    |          }|j        s&||vrg ||<   |||         vr||                             |           Utt          |                                          D ]0}t          ||                   dk    r|                    |           1i }	|                                D ]}||         D ]}|                     d          }|j	        D ]}
|
j
        dk    s!|
j
        dk    s|
j
        dk    s|
j
        dk    r/|
                    | j                  t          | j        j        j                  k    rj||
j                                        v r||	vsJ d| d	|
 d
            ||	|<    n|                                D ]Z}||	vr|	|         }|                     d          }|                     |          }t          |                    d          d                   }||         }|D ]}||k    r
|                     d          }|                     |          }t          |                    d          d                   }||f}|dz  |z   }|| j        vrA| j                            |           | j        | j        |<   | j        }| xj        dz  c_        n| j        |         }|                    ddd|                    |          i| j        |dd| j        | j        j        d|d|i           |                    ddd|                    |          gid|                    |          j        d|                    |          j        | j        |dd| j        | j        j        d|d|i           |                    ddd|                    |          gid|                    |          gi| j        || j        | j        j        d|i           \dS )zu
        Special Case: process persistable vars that exist in
        multiple sections, e.g., shared weight
        r   double_buffer_0r   r1  r  r  rj   z two sections write the same var(z): second op r   r   r,  r-  rE   r.  Fr/  rP   r0  rF   r2  rC   r3  r=  r7  N)r`   r   rU   rn   rz   r  keysr}   poprR   r    r   r+   r   r)   rY   r  rS   rT   r  r   r1   rP   r2   rX   r/   rB   rC   )r5   r   startup_progr   var_infor  r`   r   rU   
write_infora   
write_progwrite_blockwrite_devicewrite_dev_index	all_progs
read_blockread_deviceread_dev_indexrM  rN  rP   s                         r:   +_process_persistable_vars_in_multi_sectionsz=PipelineOptimizer._process_persistable_vars_in_multi_sections  s       	4 	4DJJqMME!J 	4 	4000ii)) 8++)+HX&x111X&--d333	4 X]]__-- 	' 	'H8H%&&!++X&&& 
  	 	H *  

1)  B9,,7&8887f,,7&;;; wwt011S.66 6   !27#;#;#=#==='z999(x ( ("$( ( (  :99 04
8, >. ! G	 G	Hz)) $H-J$**1--K00==L!,"4"4S"9"9!"<==O *I! < <:%%!ZZ]]
"33J??!$[%6%6s%;%;A%>!?!?'8*T1NBt222'..t44426,D%h/"lGLLA%LLL"/9G&&"[__X66 +\)5 )4=+@!7 '     %%""Z^^H%=%=$>?#Z^^H%=%=%C!9!9!?+[)5 )4=+@!7
	 &     %%-*..":":!;<"Z^^H%=%=$>?+[ )4=+@!7 &    a<G	 G	r<   c                     |j                             d          o,|j                             d                              d          S )Nop_namescopez/gradient_cliprS   r   r   
startswithr   s     r:   r   z&PipelineOptimizer._is_gradient_clip_opt  sB    w// 'BGLL5
 5

*%
&
&	'r<   c                     |j                             d          o,|j                             d                              d          S )Nr  z/regularizationr  r   s     r:   r
  z'PipelineOptimizer._is_regularization_opy  sB    w// (BGLL5
 5

*&
'
'	(r<   c                 n    |j                             d          od|j                             d          v S )Nr  zweight decay)rS   r   r   r   s     r:   _is_weight_decay_opz%PipelineOptimizer._is_weight_decay_op~  s:    w
 
 =^ < <<	=r<   c                 *   t          t                    }t          t                    }t          |j                  D ]S\  }}|j        D ]}||                             ||g            |j        D ]}||                             ||g            T||fS )z2
        Get info of op input and output.
        )r   r  r  rR   r   rz   rT   )r5   r`   r3   r4   r   ra   r   s          r:   _get_input_output_infoz(PipelineOptimizer._get_input_output_info  s    
 't,,%d++"59-- 	? 	?IE2. > >)00"e====/ ? ? *112u+>>>>?  00r<   c           	         | j         dk    rdS |                    d          }| j        dk    rdnd}d}t          |j                  D ])\  }}|j        |k    r|                     |          r|} n*|dS d}t          t          |j                            D ]\  }}||k    r n|j        dk    r|                    d          rr|j	        d         }|
                    |          }	|                    ||z   d	
           |dz  }|                    |dd|	gid|	gi| j        | j        j        i           |                                 dS )zC
        optimize forward send's sync_comm_stream schedule
        r4  Nr   r   r1  r?  r=  r>  F)syncnoprE   rF   r7  )rD  r`   rI  r  rR   r    r   r  r   r   rU   r~   rE  r+   r)   r   r   )
r5   r   r`   	recv_typebackward_recv_indexr   ra   rd   r   rU   s
             r:   _optimize_forward_send_syncz-PipelineOptimizer._optimize_forward_send_sync  s    ''Fa  !%1!4!4II.	""59-- 	 	IE2w)##(<(<R(@(@#&+# &F"4	??33 	 	IE2+++w...2;;3O3O.-a0ii))  e <<<! ---#<"SEN,dm.DE .    	r<   c           	         d}d}|                                 }t          |                                 j                  }t          |          D ]}d}|                                 j        |         }t	          |                    | j                            }	|	t	          | j        j                  k    r||}|j	        dk    r"|j	        dk    r|j	        dk    r|j	        dk    r|	t	          | j        j
                  k    r||k    r|dz  }|}n>|	t	          | j        j                  k    r||k    r|dz  }|}nt          d|	           i }
|j        D ]}|                    |          |
|<   i }|j        D ]}|                    |          ||<   |                    ||j	        |
||                                	           |                    |dz              |	t	          | j        j
                  k    r|dz  }|	t	          | j        j                  k    r|dz  }|                                 dS )
zc
        A pass to move the recv op to the beginning of
        the forward/backward phase
        r   Nr?  r@  r  r1  r   zUnknown op_role: r7  )r   r}   rR   r  r   r   r+   r)   r   r    r   r   r   rx   rj  r	  rE  	all_attrsr~   r   )r5   r   forward_insert_indexbackward_insert_indexr`   num_opsr  rW  ra   r   	op_inputsrA   
op_outputss                r:   _mv_head_recvzPipelineOptimizer._mv_head_recv  s{   
  ! $$$&&g**,,011w -	+ -	+AL%%''+A.B"''$"34455G3t}56666)1()%>))G222Gu$$Gy((#dm34444,,,(A-(3C 67777---)Q.)4 !>W!>!>???I 1 1"$((4..	$J 3 3#%99T??
4  ))"W "llnn *    QU####dm34444$)$$C 67777%*%r<   c                    |                                 }t                      }t                      }|j        D ]}|                     |          r4|j        D ]+}|j        |         }|j        r|                    |           ,K|                     |          r#|j	        D ]}||v r|                    |           t          |          dk    rdS t          j        d|            dS )z;
        Pipeline may need multiple forward before
        r   Na  The pipeline requires multiple forward calculations before backward, so when the persistable var is changed in the forward, it may cause errors in the backward calculation who using this persistable var. However, some backward op don't need this var(NoNeedBufferVars), there will be no error at this time.
So please check these persistable vars which changed in forward and used in backward:
)r   ru   rR   r   rT   r   rn   r   r   r   r}   warningswarn)r5   r   r`   persist_outputused_in_backwardra   r   rU   s           r:   _check_pipeline_persist_varz-PipelineOptimizer._check_pipeline_persist_var  s*    $$&&55) 		7 		7B""2&& 7 " 3 5 5H*X.C 5&**84445 %%b)) 7 " 2 7 7H>11(,,X666  A%%FA /?A A	
 	
 	
 	
 	
r<   c                 ~	   |j         }|| _        |j        }|t                      }|j        }|s
J d            g d}|D ]}	|	|v sJ d|	 d            |d         | _        |d         | _        |d         | _        |d         | _        |d	         | _	        |d
         | _
        |d         | _        |d         | _        |                    dd          | _        | j        dk    sJ d| j        cxk    r| j        k     sn J | j                            ||||          \  }
}| j        j        | _        |                     |          \  | _        | _        |                     |           |                     |          }d }t1          |t3          |                    }||k    s
J d            |                     |           |j        }|                     ||          }|D ]*}|                     |                                |           +t=          j        dd           rItA          t=          j        d                    | _        | j        tC          |          k     s
J d            n| xj        tC          |          z  c_        | "                    || j                            | #                    ||           g }|D ]g}tA          |$                    d          d                   }tK          j&                    r*|'                    tK          j(        |dz                       h| )                    || j                  }d|i|_        || j                                                 }| j        s| *                    |           | j        sR| +                    |           |,                                 | -                    |           |,                                 tK          j&                    r"tA          t=          j        dd                    }| .                    || j                            | /                    || j                            dd| j        tC          |          | j        tC          |          || j                 || j                 |d| j0        | j1        d|_        |
||| j2        | j3        fS )NzPlease use pipeline with fleet.)
local_rankrD  rF  rP   rZ   r   rI  rJ  z&Please use pipeline with fleet to use r   r  rD  rF  r   rP   rZ   rI  rJ  scale_gradientFr   r   c                     t          |                     d          d                   }t          |                    d          d                   }||k     rdS ||k    rdS dS )Nr   r   r   r   )r   r   )device1device2dev1_iddev2_ids       r:   
device_cmpz.PipelineOptimizer.minimize.<locals>.device_cmpJ  sc    '--,,Q/00G'--,,Q/00G  r7""qqr<   )r   z`With pipeline parallelism, you must use gpu devices one after another in the order of their ids.PADDLE_MANUAL_PIPELINE_STAGEzTManually specified pipeline stage must be less than total number of pipeline stages.r   r   FLAGS_selected_gpus0PipelineTrainerSectionr   )trainerdevice_workerpipeline_stagenum_pipeline_stagesrD  inner_parallelismsection_programplaceplace_id
sync_stepsr6   r7   )4r`   origin_main_blockr   r
   _pipeline_optr  rD  rF  r   rP   rZ   rI  rJ  getr  r!   minimizer"   r0   r  r3   r4   r  r"  sortedr   r`  r   r   r   osgetenvr   r}   r  r  r   r   r   rz   	CUDAPlacer   rf  rm  r   r  r  r  r$   r%   r1   r2   )r5   lossr   parameter_listno_grad_setr  r   pipeline_optrequired_keysr   optimize_opsparams_gradsr  r  sorted_device_listr   p
place_listdev	dev_indexr   
real_blockr,  s                          r:   r1  zPipelineOptimizer.minimize  s:    Z
!+!)"577O#1>>>>>|	
 	
 	
 ! 	 	C,&&&???? '&&& '|4)/: ,-? @(8#I.*+;<%k2#I.*../?GG~""""DL11114>111111%)_%=%=/>;&
 &
"l "&!7!I
 ''
33	
!  	  ,,,,,Z88	 	 	 $KZ
5K5KLLL![0001 100
 	00<<< ")**<EE 	< 	<Aann..
;;;;93T:: 	0!"),J"K"KLLDO?S%5%5555 6555 OOs;///OO((do)FGGG 	Z666
 	A 	ACCIIcNN1-..I)++ A!!$.Q"?"?@@@ #99T_
 

 2)
% "$/2??AA
" 	0##J///  	( **:666%%'''&&z222%%'''%'' 	B29%:C@@AAH 	<8999 	((do)FGGG )&"o#&{#3#3!/!$[!1!1+DO<0  $ 6!%!8&
 &
" 
 	
r<   )r   r   r   )FNN)NNN).__name__
__module____qualname____doc__r;   rg   r   r   r   r   r   rw   r   r   r   r   r   r   r   r   r   r   r   r   r  r  r"  r`  rf  rm  r  r  rv  r  r  r  r  r  r   r
  r   r  r  r  r  r1  r  r<   r:   r   r   "   s       9 9v)$ )$ )$ )$Z8 8 8tU U Un
 
 

 
 


 
 

J J J
 
 


 
 
$ $ $L  # # #8  (  . . .   &5 5 5
1 1 1- - -  xE xE xEt< < <.3 3 3jg g gR	  $: : :& KO]% ]% ]% ]%~]8 ]8 ]8@ 37<* <* <* <*|  $
 
 
(9 9 9"  | | ||' ' '
( ( (
= = =1 1 1"* * *X7 7 7r
 
 
> LPW
 W
 W
 W
 W
 W
r<   r   )r3  r  collectionsr   	functoolsr   r   numpyrG  r   paddle.baser   r   paddle.base.frameworkr   r	   r
   r   __all__r   r  r<   r:   <module>rM     s    
			  # # # # # # ( ( ( ( ( ( ( (      ) ) ) ) ) ) ) )            M
 M
 M
 M
 M
 M
 M
 M
 M
 M
r<   