
    zj                          d Z ddlZddlmZ ddlmZ g Z G d d          Z G d de          Z G d	 d
          Z	 G d d          Z
dS )z
Communicator is used for async distribute training in distribute_transpiler mode.
It's a wrapper of a cpp class Communicator and should be used inside fleet API.
    N)DistributedMode)corec                   f    e Zd ZddZ	 ddZ	 	 	 ddZd Zd	 Zd
 Zd Z	d Z
d Zd Zd ZddZdS )CommunicatorNc                    ||i }n|t           j        k    rd                    |d                   |d<   t          |d                   |d<   t          |d                   |d<   t          |d                   |d<   t          |d                   |d<   d}|t           j        k    rd}n8|t           j        k    rd	}n%|t           j        k    rd
}n|t           j        k    rd}|| _        || _        d| _	        d| _
        d| _        dS )a  
        Communicator is used for async distribute training in distribute_transpiler mode.
        It's a wrapper of a cpp class Communicator and should be used inside fleet API.

        Args:
            program(Program): the trainers program after transpile of distribute_transpiler.
            It's used by communicator to extract the information to do communication.

        Returns:
            None

        Examples:
            .. code-block:: python

                >>> import paddle

                >>> prog = paddle.static.Program()
                >>> comm = paddle.distributed.communicator.Communicator(prog)
                >>> comm.start()
                >>> comm.stop()
        N,pserver_endpointstrainers
trainer_idneed_global_stepbarrier_table_idSYNCASYNC
HALF_ASYNCGEO)r   r   joinstrr   r   r   modeenvscommunicator_	send_ctx_	recv_ctx_)selfr   kwargsr   mode_strs        o/lsinfo/ai/hellotax_ai/data_center/backend/venv/lib/python3.11/site-packages/paddle/distributed/communicator.py__init__zCommunicator.__init__*   s%   0 >|+++,/HH./- -()  #6*#566D!$VL%9!:!:D'*62D+E'F'FD#$'*62D+E'F'FD#$?'''HH_***HH_///#HH_(((H		!    c           	          |t           j                                        }t          j        | j        |||||| j                  | _        || _        || _	        d S N)
paddlestaticglobal_scoper   DistCommunicatorr   r   r   r   r   )r   send_ctxrecv_ctx	proto_txtunit64_hostsscopes         r   init_with_ctxzCommunicator.init_with_ctxa   s^     =M..00E!2II
 
 "!r     '     c                 >    | j                             |||           d S r    )r   "create_client_to_client_connection)r   pserver_timeout_mspserver_connect_timeout_ms	max_retrys       r   r/   z/Communicator.create_client_to_client_connectionr   s1     	== :I	
 	
 	
 	
 	
r   c                 4    | j                                         S r    )r   get_client_infor   s    r   r4   zCommunicator.get_client_info|   s    !11333r   c                 :    | j                             |           d S r    )r   set_clients)r   	host_lists     r   r7   zCommunicator.set_clients   s    &&y11111r   c                 h    | j         t          d           dS | j                                          dS )a  
        Start communicator. Should call before training process.

        Returns:
            None

        Examples:
            .. code-block:: python

                >>> import paddle

                >>> prog = paddle.static.Program()
                >>> comm = paddle.distributed.communicator.Communicator(prog)
                >>> comm.start()
                >>> comm.stop()
        Nz;you must call init_with_ctx first to init comm before start)r   printstartr5   s    r   r;   zCommunicator.start   s<    " %OPPPF  """""r   c                 h    | j         t          d           dS | j                                          dS )a  
        Stop communicator. Should call after training process.

        Returns:
            None

        Examples:
            .. code-block:: python

                >>> import paddle

                >>> prog = paddle.static.Program()
                >>> comm = paddle.distributed.communicator.Communicator(prog)
                >>> comm.start()
                >>> comm.stop()
        N:you must call init_with_ctx first to init comm before stop)r   r:   stopr5   s    r   r>   zCommunicator.stop   s<    " %NOOOF!!!!!r   c                 h    | j         t          d           dS | j                                          dS )aZ  
        Get communicator is running or stop.

        Returns:
            bool

        Examples:
            .. code-block:: python

                >>> import paddle

                >>> prog = paddle.static.Program()
                >>> comm = paddle.distributed.communicator.Communicator(prog)
                >>> comm.is_running()
        Nr=   )r   r:   
is_runningr5   s    r   r@   zCommunicator.is_running   s<      %NOOOF%%'''''r   c                 8    | j                                          d S r    )r   recvr5   s    r   rB   zCommunicator.recv       !!!!!r   c                 :    | j                             |           d S r    )r   init_paramsr   contexts     r   rE   zCommunicator.init_params   s    &&w/////r   c                 :    | j                             |           d S r    )r   
pull_denserF   s     r   rI   zCommunicator.pull_dense   s    %%g.....r   c                 j   |t           j                                        }|                                 st	          d          t          |t                    sJ t          |t                    sJ |dk    r| j        |         	                                }| j
                            |||           d S )NzTCommunicator should init first. Using fleet.init_worker() before push_sparse_param()rJ   )r!   r"   r#   r@   
ValueError
isinstancer   intr   table_idr   push_sparse_param)r   var_namerO   r)   s       r   rP   zCommunicator.push_sparse_param   s    =M..00E   	f   (C((((((C(((((r>>~h/88::H,,XxGGGGGr   )NNr    )r+   r,   r-   )rJ   N)__name__
__module____qualname__r   r*   r/   r4   r7   r;   r>   r@   rB   rE   rI   rP    r   r   r   r   )   s        5 5 5 5p BF" " " "& "#(	
 
 
 
4 4 42 2 2# # #," " ",( ( (*" " "0 0 0/ / /H H H H H Hr   r   c                   2     e Zd Zd fd	Zd Zd Zd Z xZS )FLCommunicatorNc                     d }t                                          ||           i }i }d}d| _        |                     ||||           d S )N WITH_COORDINATOR)superr   r   r*   )r   ps_hostsr   r   r%   	dense_mapprototxt	__class__s          r   r   zFLCommunicator.__init__   sX    v&&&	&	8Y(CCCCCr   c                 N    | j         | j                             ||           d S d S r    )r   start_coordinator)r   self_endpointtrainer_endpointss      r   ra   z FLCommunicator.start_coordinator   s>    )000     *)r   c                 f    | j         | j                             |           d S t          d          )Nzself.communicator_ is null)r   save_fl_strategyrL   )r   mps     r   re   zFLCommunicator.save_fl_strategy   s7    )//333339:::r   c                 J    i }| j         | j                                         }|S r    )r   query_fl_clients_info)r   info_mps     r   rh   z$FLCommunicator.query_fl_clients_info   s)    )(>>@@Gr   r    )rR   rS   rT   r   ra   re   rh   __classcell__)r_   s   @r   rW   rW      sp        D D D D D D  ; ; ;      r   rW   c                   &    e Zd Zd Zd Zd Zd ZdS )LargeScaleKVc                 6    t          j                    | _        d S r    )r   rl   scale_kvr5   s    r   r   zLargeScaleKV.__init__   s    )++r   c                 <    | j                             ||           d S r    )rn   saver   varnamedirnames      r   rp   zLargeScaleKV.save        7G,,,,,r   c                 <    | j                             ||           d S r    )rn   loadrq   s      r   rv   zLargeScaleKV.load   rt   r   c                 6    | j                             |          S r    )rn   size)r   rr   s     r   rx   zLargeScaleKV.size  s    }!!'***r   N)rR   rS   rT   r   rp   rv   rx   rU   r   r   rl   rl      sP        , , ,- - -- - -+ + + + +r   rl   c                       e Zd Zd Zd ZdS )HeterClientc                 <    t          j        |||          | _        d S r    )r   rz   heter_client_)r   endpointprevious_endpointr   s       r   r   zHeterClient.__init__  s%    !-'
 
r   c                 8    | j                                          d S r    )r|   r>   r5   s    r   r>   zHeterClient.stop  rC   r   N)rR   rS   rT   r   r>   rU   r   r   rz   rz     s2        
 
 

" " " " "r   rz   )__doc__r!   "paddle.distributed.ps.utils.publicr   paddle.frameworkr   __all__r   rW   rl   rz   rU   r   r   <module>r      s   : 
  > > > > > > ! ! ! ! ! !
nH nH nH nH nH nH nH nHb    \   :+ + + + + + + +" " " " " " " " " "r   