
    zj7                       d dl mZ d dlZd dlZd dlZd dlZd dlmZ d dlm	Z	m
Z
mZmZ d dlmZ d dlmZ d dlmZmZ d dlmZ e	r'd d	lmZ  ed
d          Z G d dee                   Z edg d          ZdZdZdZdad ad Z d Z!d Z"d Z#d Z$	 	 	 d6d7d"Z%ddefd8d,Z&ddefd9d.Z'd/ Z(d0 Z)d:d1Z*d;d2Z+d<d4Z,d=d5Z-dS )>    )annotationsN)
namedtuple)TYPE_CHECKINGAnyProtocolTypeVar)core)Node)
PythonFunc
_serialize)logger)Callable_RetTT)	covariantc                      e Zd ZddZdS )_FutureWrapperreturnr   c                    d S N )selfs    j/lsinfo/ai/hellotax_ai/data_center/backend/venv/lib/python3.11/site-packages/paddle/distributed/rpc/rpc.pywaitz_FutureWrapper.wait#   s          N)r   r   )__name__
__module____qualname__r   r   r   r   r   r   "   s        $$$$$$r   r   
WorkerInfo)namerankipportiic                
    | a d S r   _barrier_store)stores    r   _set_barrier_storer(   2   s    NNNr   c                     b d S r   r%   r   r   r   _del_barrier_storer*   7   s	    r   c                    t          j        t          | |||                    }t                              t          |          |           d S r   )pickledumpsr   r&   setstr)r   r    r!   r"   	self_infos        r   _set_self_infor1   <   sA    ZdB==>>Is4yy),,,,,r   c                @   g }t                      }t          |           D ]}}t          j        t                              t          |                              }|j        |vs
J d            |                    |j                   |	                    |           ~|S )Nz:The Worker name must be unique, but name `{}` is repeated.)
r.   ranger,   loadsr&   getr/   r   addappend)
world_size	all_infossr    infos        r   _exchange_all_service_infosr<   A   s    IAj!!  |N..s4yy99::y!!!H "!! 	
dir   c                 |    t                      } |                                 }|                                 }| d| S )N:)r
   get_host_ipget_free_port)noder!   	free_ports      r   _gen_endpointrC   N   sA    66D					B""$$I9r   r   r/   r    
int | Noner8   master_endpoint
str | Noner   Nonec                
   |t          t          j        d                   n|}|t          t          j        d                   n|}t          j        dd          }|t	                      }t          j        d| d|            ||nt          j        d         }|                    d          \  }}t          |          }t          t          j        d	d
                    }t          j	        |||dk    ||          }t          |           |                    d          \  }	}
t          |
          }
t          | ||	|
           t          |          }g }|D ]B}t          j        |j        |j        |j        |j                  }|                    |           Ct          j        | |           t          j                     t+          ||           t          j                     t          j        d| d           dS )aU  
    init rpc. Warning: All RPC API should only be used internally within a secure network environment and
    must not be accessible via the public internet.

    Args:
        name (str): worker name.
        rank (int, optional): worker id, default is None.
        world_size (int, optional): number of workers, default is None.
        master_endpoint (str, optional): id address of master, other nodes communicate with the master to
            get the information of all worker nodes, default is None.

    Returns:
        None.

    Examples:
        .. code-block:: python

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> import paddle.distributed.rpc as rpc

            >>> rpc.init_rpc("worker0", rank=0, world_size=1,
            ...             master_endpoint="127.0.0.1:8001")

            >>> rpc.shutdown()

    NPADDLE_TRAINER_IDPADDLE_TRAINERS_NUMPADDLE_WORKER_ENDPOINTTrainer z: worker endpoint: PADDLE_MASTER_ENDPOINTr>   FLAGS_stop_check_timeout900r   )timeoutz: Init RPC done!)intosenvirongetenvrC   r   r;   splitr	   TCPStorer(   r1   r<   r   r   r    r!   r"   r7   init_and_set_agent_instancerpc_start_worker_barrier_never_timeoutrpc_start_client)r   r    r8   rE   worker_endpointmaster_addrmaster_portstop_check_timeoutr'   r!   r"   r9   c_infos	node_infor;   s                  r   init_rpcra   U   s   @ 48<3rz-.///TD  	BJ,-... 
 i 8$??O'//
KE4EEOEEFFF & 	Z01 
  /44S99Kk""KRY'A5IIJJM	"  E u$$S))HBt99D4r4(((+J77IG  	NINIL).
 
 	t$T73334,,,
K1411122222r   tofnCallable[..., _RetT]argstuple[Any, ...] | Nonekwargsdict[str, Any] | NonerP   rQ   c                P    t          | ||||          }|                                S )a;  
    Make a blocking RPC call to run function ``fn`` on worker ``to``. Warning: All RPC API should
    only be used internally within a secure network environment and must not be accessible via
    the public internet.

    Args:
        to (str): name of the destination worker.
        fn (fn): a callable function, such as Python callables.
        args (tuple, optional): the argument tuple for the ``fn`` invocation, default is None.
        kwargs (dict, optional): is a dictionary of keyword arguments for the ``fn``
                       invocation, default is None.
        timeout (int, optional): timeout in seconds to use for this RPC. If
                                   the RPC does not complete in this amount of
                                   time, an exception indicating it has
                                   timed out will be raised. A value less than or equal to 0
                                   indicates an infinite timeout, i.e. a timeout
                                   error will never be raised. The default value is -1.

    Returns:
        Returns the result of running ``fn`` with ``args`` and ``kwargs``.

    Examples:
        .. code-block:: python

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> import paddle.distributed.rpc as rpc

            >>> def add(a, b):
            ...     return a + b

            >>> rpc.init_rpc("worker0", rank=0, world_size=1,
            ...         master_endpoint="127.0.0.1:8002")

            >>> ret = rpc.rpc_sync("worker0", add, args=(2, 3))
            >>> rpc.shutdown()

    )_invoke_rpcr   )rb   rc   re   rg   rP   futs         r   rpc_syncrl      s(    X b"dFG
4
4C88::r   _FutureWrapper[_RetT]c                (    t          | ||||          S )a  
    Make a non-blocking RPC call to run function ``fn`` on worker ``to``. Warning: All RPC API should
    only be used internally within a secure network environment and must not be accessible via the public internet.

    Args:
        to (str): name of the destination worker.
        fn (fn): a callable function, such as Python callables.
        args (tuple, optional): the argument tuple for the ``fn`` invocation, default is None.
        kwargs (dict, optional): is a dictionary of keyword arguments for the ``fn``
                       invocation, default is None.
        timeout (int, optional): timeout in seconds to use for this RPC. If
                                   the RPC does not complete in this amount of
                                   time, an exception indicating it has
                                   timed out will be raised. A value less than or equal to 0
                                   indicates an infinite timeout, i.e. a timeout
                                   error will never be raised. The default value is -1.

    Returns:
        Returns a :class:`FutureWrapper` object that can be waited
        on. When completed, the return value of ``fn`` on ``args`` and
        ``kwargs`` can be got by `fut.wait()`.

    Examples:
        .. code-block:: python

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> import paddle.distributed.rpc as rpc

            >>> def add(a, b):
            ...     return a + b

            >>> rpc.init_rpc("worker0", rank=0, world_size=1,
            ...         master_endpoint="127.0.0.1:8003")

            >>> fut = rpc.rpc_async("worker0", add, args=(2, 3))
            >>> print(fut.wait())
            5

            >>> rpc.shutdown()

    )rj   )rb   rc   re   rg   rP   s        r   	rpc_asyncro      s    ` r2tVW555r   c                    |r|nd}|r|ni }t          t          |||                    }|dz  }|dk    rt          n|}t          j        | ||          }|S )Nr   i  r   )r   r   _MAX_RPC_TIMEOUT_MSr	   
invoke_rpc)rb   rc   re   rg   rP   
serial_obj
timeout_msfutures           r   rj   rj     sk    44RD%VV2FJr48899J4J(2a$$ZJ_RZ88FMr   c                    t          j        t                    |dk     rd S dt          t                    z   dz   t          dz  a dk    } fd}|rTfdt          d|          D             }t                              t          d          z   d            ||           d S t          d          z   g} ||           t                              t                     z   d           d S )	N)days   zBarrier//   r   c                l   t          j                     }t          |           dk    rt          j        d           t          j                     |z
  }t          j        |          k    rt          d|  d d          t          t          d |                     } t          |           dk    d S d S )Nr   g?)secondszKeys z are not ready since rank z is waiting them.c                X    t          t                              |                     dk    S )Nrz   )rQ   r&   r5   )keys    r   <lambda>zC_barrier_never_timeout.<locals>._check_keys_ready.<locals>.<lambda>$  s!    3~'9'9#'>'>#?#?1#D r   )timelensleepdatetime	timedeltaRuntimeErrorlistfilter)	wait_keys
start_timeelapse_timeglobal_rankrP   s      r   _check_keys_readyz1_barrier_never_timeout.<locals>._check_keys_ready  s    Y[[
)nnq  JsOOO)++
2K!+666@@"_I_____   DDiPP I )nnq      r   c                4    g | ]}t          |          z   S r   )r/   ).0r    barrier_prefixs     r   
<listcomp>z*_barrier_never_timeout.<locals>.<listcomp>*  s1     
 
 
+/NSYY&
 
 
r   )r   r   _BARRIER_TIMEOUT_MAX_DAYSr/   _barrier_countr3   r&   r6   )r   global_world_size	is_masterr   r   r   rP   s   `    @@r   rY   rY     s9    &?@@@G1  #n"5"55;NaNq I       A
 
 
 
38<M3N3N
 
 
	 	>CFF2A666)$$$$$#c!ff,-	)$$$>C,<,<<a@@@@@r   c                     t                      } | j        }t          t                                }t	          ||           t          j                     t                       t          j	        d| d           dS )a  
    Perform a shutdown of the RPC agent, stop the worker and destroy the agent.
    This will block until all local and remote RPC processes reach this method
    and wait for all outstanding work to complete. Warning: All RPC API should
    only be used internally within a secure network environment and must not be
    accessible via the public internet.

    Returns:
        None.

    Examples:
        .. code-block:: python

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> import paddle.distributed.rpc as rpc

            >>> rpc.init_rpc("worker0", rank=0, world_size=1,
            ...             master_endpoint="127.0.0.1:8004")

            >>> rpc.shutdown()

    rL   z: rpc shutdown!N)
get_current_worker_infor    r   get_all_worker_infosrY   r	   rpc_stop_workerr*   r   r;   )r;   r    r8   s      r   shutdownr   5  sw    . #$$D9D)++,,J4,,,
K0400011111r   c                *    t          j        |           S )aa  
    Get worker information by worker name. Warning: All RPC API should
    only be used internally within a secure network environment and must
    not be accessible via the public internet.

    Args:
        name (str): name of the worker.

    Returns:
        class `WorkerInfo` with attribute `name`, `rank`, `ip` and `port`.

    Examples:
        .. code-block:: python

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> import paddle.distributed.rpc as rpc
            >>> import os

            >>> os.environ["PADDLE_WORKER_ENDPOINT"] = "127.0.0.1:9002"
            >>> rpc.init_rpc("worker0", rank=0, world_size=1,
            ...             master_endpoint="127.0.0.1:8005")

            >>> print(rpc.get_worker_info("worker0"))
            {name: worker0, rank: 0, ip: 127.0.0.1, port: 9002}

            >>> rpc.shutdown()

    )r	   rpc_get_worker_info)r   s    r   get_worker_infor   V  s    : #D)))r   list[WorkerInfo]c                 (    t          j                    S )a  
    Get all worker information. Warning: All RPC API should only be used
    internally within a secure network environment and must not be
    accessible via the public internet.

    Returns:
        List[WorkerInfo].

    Examples:
        .. code-block:: python

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> import paddle.distributed.rpc as rpc
            >>> import os

            >>> os.environ["PADDLE_WORKER_ENDPOINT"] = "127.0.0.1:9003"
            >>> rpc.init_rpc("worker0", rank=0, world_size=1,
            ...         master_endpoint="127.0.0.1:8006")

            >>> print(rpc.get_all_worker_infos())
            [{name: worker0, rank: 0, ip: 127.0.0.1, port: 9003}]

            >>> rpc.shutdown()

    )r	   rpc_get_all_worker_infosr   r   r   r   r   v  s    4 (***r   c                 (    t          j                    S )a"  
    Get current worker information. Warning: All RPC API should only be used internally
    within a secure network environment and must not be accessible via the public internet.

    Returns:
        class `WorkerInfo` with attribute `name`, `rank`, `ip` and `port`.

    Examples:
        .. code-block:: python

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> import paddle.distributed.rpc as rpc
            >>> import os

            >>> os.environ["PADDLE_WORKER_ENDPOINT"] = "127.0.0.1:9004"
            >>> rpc.init_rpc("worker0", rank=0, world_size=1,
            ...             master_endpoint="127.0.0.1:8007")

            >>> print(rpc.get_current_worker_info())
            {name: worker0, rank: 0, ip: 127.0.0.1, port: 9004}

            >>> rpc.shutdown()

    )r	   rpc_get_current_worker_infor   r   r   r   r     s    2 +---r   )NNN)
r   r/   r    rD   r8   rD   rE   rF   r   rG   )rb   r/   rc   rd   re   rf   rg   rh   rP   rQ   r   r   )rb   r/   rc   rd   re   rf   rg   rh   rP   rQ   r   rm   )r   rG   )r   r/   r   r   )r   r   )r   r   ).
__future__r   r   rR   r,   r   collectionsr   typingr   r   r   r   paddle.baser	   !paddle.distributed.launch.contextr
   paddle.distributed.rpc.internalr   r   %paddle.distributed.utils.launch_utilsr   collections.abcr   r   r   r   _DEFAULT_RPC_TIMEOUTrq   r   r&   r   r(   r*   r1   r<   rC   ra   rl   ro   rj   rY   r   r   r   r   r   r   r   <module>r      s   # " " " " "  				   " " " " " " 8 8 8 8 8 8 8 8 8 8 8 8       2 2 2 2 2 2 B B B B B B B B 8 8 8 8 8 8 %((((((GGt,,,E% % % % %% % % % Z&D&D&DEE
   $    
  
- - -

 
 
   !"&	I3 I3 I3 I3 I3^ $($('- - - - -f $($('06 06 06 06 06f  $A $A $AN2 2 2 2B* * * *@+ + + +:. . . . . .r   