o
    ~ri(                     @   s2  d Z ddlZddlZddlZddlZddlmZ ddlm	Z	 ddl
mZmZmZ e r5ddlZddlmZ eeZdd	 Ze rKe rKdd
lmZ nddlmZ G dd deZG dd deZdadd Zdd Zdd Zdd Zdd Zdd Z d*ddZ!dd Z"d+d!d"Z#d,d$d%Z$d+d&d'Z%d(d) Z&dS )-z
Integration with Deepspeed
    N)partialmethod   )dep_version_check)is_accelerate_availableis_torch_availablelogging)nnc                  C   sD   t jdd u} | r z	t jd}W dS  t jjy   Y dS w d S )N	deepspeedTF)	importlibutil	find_specmetadataPackageNotFoundError)package_exists_ r   q/lsinfo/ai/hellotax_ai/llm_service/venv_embed/lib/python3.10/site-packages/transformers/integrations/deepspeed.pyis_deepspeed_available$   s   r   )HfDeepSpeedConfig)objectc                       s    e Zd ZdZ fddZ  ZS )r   aJ  
    This object contains a DeepSpeed configuration dictionary and can be quickly queried for things like zero stage.

    A `weakref` of this object is stored in the module's globals to be able to access the config from areas where
    things like the Trainer object is not available (e.g. `from_pretrained` and `_get_resized_embeddings`). Therefore
    it's important that this object remains alive while the program is still running.

    [`Trainer`] uses the `HfTrainerDeepSpeedConfig` subclass instead. That subclass has logic to sync the configuration
    with values of [`TrainingArguments`] by replacing special placeholder values: `"auto"`. Without this special logic
    the DeepSpeed configuration is not modified in any way.

    Args:
        config_file_or_dict (`Union[str, Dict]`): path to DeepSpeed config file or dict.

    c                    s(   t |  td td t | d S )N
accelerater	   )set_hf_deepspeed_configr   super__init__selfconfig_file_or_dict	__class__r   r   r   J   s   zHfDeepSpeedConfig.__init__)__name__
__module____qualname____doc__r   __classcell__r   r   r   r   r   9   s    r   c                       sX   e Zd ZdZ fddZdd Zdd Zdd
dZeeddZ	dddZ
dd Z  ZS )HfTrainerDeepSpeedConfigz
    The `HfTrainerDeepSpeedConfig` object is meant to be created during `TrainingArguments` object creation and has the
    same lifespan as the latter.
    c                    s   t  | d | _g | _d S N)r   r   _dtype
mismatchesr   r   r   r   r   X   s   
z!HfTrainerDeepSpeedConfig.__init__c                 C   s   | j d u r	td| j S )Nz8trainer_config_process() wasn't called yet to tell dtype)r&   
ValueErrorr   r   r   r   dtype]   s   
zHfTrainerDeepSpeedConfig.dtypec                 C   s   |  |}|d u rdS |dkS )NFauto)	get_value)r   ds_key_longvalr   r   r   is_autob   s   
z HfTrainerDeepSpeedConfig.is_autoNTc              
   C   s   |  |\}}|du rdS ||dkr|||< dS |sdS ||}|dur?||krA| jd| d| d| d|  dS dS dS )a  
        A utility method that massages the config file and can optionally verify that the values match.

        1. Replace "auto" values with `TrainingArguments` value.

        2. If it wasn't "auto" and `must_match` is true, then check that DS config matches Trainer
        config values and if mismatched add the entry to `self.mismatched` - will assert during
        `trainer_config_finalize` for one or more mismatches.

        Nr+   z- ds =z vs hf )find_config_nodegetr'   append)r   r-   hf_valhf_key
must_matchconfigds_keyds_valr   r   r   
fill_matchi   s   
(z#HfTrainerDeepSpeedConfig.fill_matchF)r6   c                 C   sH  |j |j |j }| d|jd|  | d|jd | d|d|  | d|jd | d|jd	 | d
|j|jgd | d|jd | d|j	d | 
dd | d|jd	 |jrr| jdi | jd< |j| jd d< | d|jpz|jd | d|jp|jd | drtj| _dS | drtj| _dS tj| _dS )z
        Adjust the config with `TrainingArguments` values. This stage is run during `TrainingArguments` object
        creation.
        train_micro_batch_size_per_gpuper_device_train_batch_sizegradient_accumulation_stepstrain_batch_sizeztrain_batch_size (calculated)gradient_clippingmax_grad_normzoptimizer.params.lrlearning_ratezoptimizer.params.betaszadam_beta1+adam_beta2zoptimizer.params.epsadam_epsilonzoptimizer.params.weight_decayweight_decayzscheduler.params.warmup_min_lrr   zscheduler.params.warmup_max_lr
checkpointuse_node_local_storagezfp16.enabledzfp16|fp16_full_evalzbf16.enabledzbf16|bf16_full_evalN)
world_sizer<   r=   r:   r@   rA   
adam_beta1
adam_beta2rB   rC   	fill_onlysave_on_each_noder7   r2   fp16fp16_full_evalbf16bf16_full_evalis_truetorchbfloat16r&   float16float32)r   argsauto_find_batch_sizer>   r   r   r   trainer_config_process   sN   


z/HfTrainerDeepSpeedConfig.trainer_config_processc                    sZ  g d} fdd|D }t |dkrd}t|drYt|jdr%|jj}n4t|jdr2t|jj}n't|jd	rEt|jjdrE|jjj}nt|jd	rYt|jjdrYt|jjj}|du retd
| d d||   	 r dt
d| |   dd|   d|d  d||d t  jdkrd j}td| ddS )z
        This stage is run after we have the model and know num_training_steps.

        Now we can complete the configuration process.
        )$zero_optimization.reduce_bucket_size-zero_optimization.stage3_prefetch_bucket_size4zero_optimization.stage3_param_persistence_thresholdc                    s   g | ]	}  |r|qS r   )r/   ).0xr)   r   r   
<listcomp>       zDHfTrainerDeepSpeedConfig.trainer_config_finalize.<locals>.<listcomp>r   Nr7   hidden_sizehidden_sizestext_configzThe model's config file has neither `hidden_size` nor `hidden_sizes` entry, therefore it's not possible to automatically fill out the following `auto` entries in the DeepSpeed config file: zb. You can fix that by replacing `auto` values for these keys with an integer value of your choice.rW   rX   g?rY   
   z scheduler.params.total_num_stepsznum_training_steps (calculated)z!scheduler.params.warmup_num_stepswarmup_steps
z]Please correct the following DeepSpeed config values that mismatch TrainingArguments values:
zF
The easiest method is to set these DeepSpeed config values to 'auto'.)lenhasattrr7   r^   maxr_   r`   r(   rI   is_zero3intr:   get_warmup_stepsr'   join)r   rT   modelnum_training_stepshidden_size_based_keyshidden_size_auto_keysr^   r'   r   r)   r   trainer_config_finalize   s^   	

z0HfTrainerDeepSpeedConfig.trainer_config_finalize)NTF)r   r    r!   r"   r   r*   r/   r:   r   rI   rV   ro   r#   r   r   r   r   r$   R   s    

:r$   c                 C   s   t | ad S r%   )weakrefref_hf_deepspeed_config_weak_ref)hf_deepspeed_config_objr   r   r   r   	  s   r   c                   C   s   d a d S r%   )rs   r   r   r   r   unset_hf_deepspeed_config  s   ru   c                   C   s    t d urt  d urt   S dS )NF)rs   rg   r   r   r   r   is_deepspeed_zero3_enabled  s   
rv   c                   C   s   t d urt  d urt  jS d S r%   )rs   r7   r   r   r   r   deepspeed_config  s   rw   c              	      s   ddl ddl}ddlm} ddlm  |   fdd| * |  | | j W d   n1 s:w   Y  W d   dS W d   dS 1 sRw   Y  dS )aA  
    DeepSpeed ZeRO-3 variant of `PreTrainedModel.initialize_weights`. Mirrors the `smart_apply`
    dispatch logic but gathers each module's partitioned parameters before calling
    `_initialize_weights`, so initialization operates on full tensors instead of empty shards.
    Only rank 0 performs the actual init.
    r   Nr   )guard_torch_init_functions)PreTrainedModelc                    s   |   D ]}t| r||j q|| qt| jdd}|rQjj|dd j dkr?||  W d    d S W d    d S 1 sJw   Y  d S ||  d S )NF)recurser   modifier_rank)	children
isinstance_initialize_weightslist
parameterszeroGatheredParameterscommget_rank)model_or_modulefnchildparamsry   _apply_zero3r	   is_remote_coder   r   r   4  s   
"z.initialize_weights_zero3.<locals>._apply_zero3)	r	   rP   initializationrx   modeling_utilsry   r   no_gradr   )rk   rP   rx   r   r   r   initialize_weights_zero3%  s   
"r   c                    s  t  }|dur1|di dd}|di }t|tr)t||di dd}|dkr1tddd	lm mm	m
} t|d
d}| j}i }	|   D ]\}
}tj|j|jdd|	|
< qNfdd|D } fdd|D }t|dkri }| D ]\}}|||g ||	\}}||	v r|||< q}|dur||_|S dd |D }i }i }t| fddd}|D ]:}||}||||||	\}}||	v r|dur|| } |j|j|jd}|||}||||| q|||< q| D ]A\}}z%|j|| | jd}| D ]\}}t|tr|d n|}|||< qW q t y6 } zt!d| d| |d}~ww |dur?||_|S )z
    Apply weight conversions (renaming and merging/splitting operations) to a state dict.
    This is a simplified version that handles the conversion without loading into the model.
    Ntensor_parallelautotp_size   	inferencetp_sizezWeight conversions (e.g., MoE expert fusion) with DeepSpeed Tensor Parallelism are not yet implemented but support is coming soon. Please disable tensor_parallel in your DeepSpeed config or convert your checkpoint to the expected format first.r   )WeightConverterWeightRenamingdot_natural_keyrename_source_key	_metadatameta)r*   devicec                       g | ]	}t | r|qS r   r~   rZ   entry)r   r   r   r\   k  r]   z;_apply_weight_conversions_to_state_dict.<locals>.<listcomp>c                    r   r   r   r   )r   r   r   r\   l  r]   r   c                 S   s   i | ]}|j D ]}||qqS r   )source_patterns)rZ   	converterkr   r   r   
<dictcomp>{  s    z;_apply_weight_conversions_to_state_dict.<locals>.<dictcomp>c                    s    | S r%   r   )r   )r   r   r   <lambda>  s    z9_apply_weight_conversions_to_state_dict.<locals>.<lambda>)key)r   target_patterns
operations)rk   r7   z'Failed to apply weight conversion for 'zb'. This likely means the checkpoint format is incompatible with the current model version. Error: )"rw   r2   r~   dictrf   NotImplementedErrorcore_model_loadingr   r   r   r   getattrbase_model_prefix
state_dictitemsrP   emptyshaper*   rd   r   sortedkeyspopr   r   r   
setdefault
add_tensorconvertr7   r   	ExceptionRuntimeError)rk   r   weight_mapping	ds_configr   inference_configr   r   prefixmodel_state_dictr   param	renamings
convertersnew_state_dictoriginal_keytensorrenamed_keyr   pattern_to_converterconversion_mappingsorted_keyssource_patternr   new_convertermappingrealized_valuetarget_nameer   )r   r   r   r   '_apply_weight_conversions_to_state_dictH  s   


r   c                    s   t |dd| }dur|_d}|durt |dd}|dur0t|dkr0t| ||}|| _g  |  t t | ddfdd|	 D }dd
t
jf fdd| |d	d  fS )a  
    Loads state dict into a model specifically for Zero3, since DeepSpeed does not support the `transformers`
    tensor parallelism API.

    Nearly identical code to PyTorch's `_load_from_state_dict`

    Args:
        model_to_load: The model to load weights into
        state_dict: The state dict containing the weights
        load_config: Optional LoadStateDictConfig containing weight_mapping and other loading options
    r   Nr   r   r   c                    s<   i | ]\}}   d | dur d | n||qS ).N)r2   )rZ   r   v)meta_model_state_dictprefix_modelr   r   r     s    *z5_load_state_dict_into_zero3_model.<locals>.<dictcomp> Fmodulec                    s$  d u ri n	 |d d i }||d< |||dg g  f}t rwdd l}t| j|d d dd}g }|D ]}	|	|v rL||	 }
d|
_||
 |	 q5t|dkrw|j	j
|dd tj dkrh| j|  W d    n1 srw   Y  | j D ]\}}|d ur|||| d | q|d S )	Nassign_to_params_buffersTr   F)r   rz   r{   r   )r2   rv   r	   r   named_parameters_is_hf_initializedr3   discardrd   r   r   rP   distributedr   _load_from_state_dict_modulesr   )r   r   r   r   local_metadatarT   r	   r   params_to_gatherr   r   namer   )
error_msgsloadr   missing_keysr   r   r     s2    


z/_load_state_dict_into_zero3_model.<locals>.load)r   )r   F)r   copyr   rd   r   _weight_conversionsr   setr   r   r   Module)model_to_loadr   load_configr   r   )r   r   r   r   r   r   r   !_load_state_dict_into_zero3_model  s(   "r   c                    s   ddl m}m} |j}d}d|v r||d}n| r td  }d|d< d}	d	|v r6||}	||	fS t||rH fd
d}
|||
d}	||	fS )zY
    A convenience wrapper that deals with optimizer and lr scheduler configuration.
    r   )
DummyOptimDummySchedulerN	optimizer)r   zDetected ZeRO Offload and non-DeepSpeed optimizers: This combination should work as long as the custom optimizer has both CPU and GPU implementation (except LAMB)Tzero_allow_untested_optimizer	schedulerc                    s"   t  }d |_|j | d}|S )N)rl   r   )r   lr_schedulercreate_scheduler)r   trainer_copyr   rl   trainerr   r   _lr_scheduler_callable*  s   
z5deepspeed_optim_sched.<locals>._lr_scheduler_callable)lr_scheduler_callable)	accelerate.utilsr   r   r7   
is_offloadloggerinfocreate_optimizerr~   )r   hf_deepspeed_configrT   rl   model_parametersr   r   r7   r   r   r   r   r   r   deepspeed_optim_sched  s&   

r   Fc                 C   s   ddl m} | j}| j}| jjjj}|||| |	|
  |r>| s*td|d |d d\}}d}	||fS d| _|jdi d	d
}
|
d
kr`ddl}|j||
| |jd}ttdd | }	t| ||||	\}}||fS )a  
    Init DeepSpeed, after updating the DeepSpeed configuration with any relevant Trainer's args.

    If `resume_from_checkpoint` was passed then an attempt to resume from a previously saved checkpoint will be made.

    Args:
        trainer: Trainer object
        num_training_steps: per single gpu
        resume_from_checkpoint: path to a checkpoint if to resume from after normal DeepSpeedEngine load
        inference: launch in inference mode (no optimizer and no lr scheduler)
        auto_find_batch_size: whether to ignore the `train_micro_batch_size_per_gpu` argument as it's being
            set automatically by the auto batch size finder

    Returns: optimizer, lr_scheduler

    We may use `deepspeed_init` more than once during the life of Trainer, when we do - it's a temp hack based on:
    https://github.com/deepspeedai/DeepSpeed/issues/1394#issuecomment-937405374 until Deepspeed fixes a bug where it
    can't resume from a checkpoint after it did some stepping https://github.com/deepspeedai/DeepSpeed/issues/1612

    r   )r   zMZeRO inference only makes sense with ZeRO Stage 3 - please adjust your configr   r   )NNNr   r   r   )rk   r   r*   r7   c                 S   s   | j S r%   )requires_grad)pr   r   r   r   r  s    z deepspeed_init.<locals>.<lambda>)deepspeed.utilsr   rk   rT   acceleratorstatedeepspeed_pluginhf_ds_configro   setLevelget_process_log_levelrg   r(   del_config_sub_treer   r7   r2   r	   tp_model_initr*   r   filterr   r   )r   rl   r   	ds_loggerrk   rT   r   r   r   r   deepspeed_tp_sizer	   r   r   r   deepspeed_init:  s:   


r  Tc                 C   sv   dd l }t| | d}t|dkr4td|  | j||ddd\}}|d u r2td| d S td| )Nr   z/global_step*zAttempting to resume from T)load_module_strictload_optimizer_statesload_lr_scheduler_statesz-[deepspeed] failed to resume from checkpoint z!Can't find a valid checkpoint at )globr   rd   r   r   load_checkpointr(   )deepspeed_enginecheckpoint_pathr  r  deepspeed_checkpoint_dirs	load_pathr   r   r   r   deepspeed_load_checkpoint}  s   
r  c                 C   s2   | j j}t|jj|_|jj|_|j|| dS )a  
    Sets values in the deepspeed plugin based on the TrainingArguments.

    Args:
        accelerator (`Accelerator`): The Accelerator object.
        args (`TrainingArguments`): The training arguments to propagate to DeepSpeed config.
        auto_find_batch_size (`bool`, *optional*, defaults to `False`):
            Whether batch size was auto-discovered by trying increasingly smaller sizes.
    N)r  r  r$   r  r7   rw   rV   )r  rT   rU   	ds_pluginr   r   r   propagate_args_to_deepspeed  s   

r  c                    s  d|vrd|v r|d |d< |di |}|j }|jdkr-|jdkr-ddlm} | }n| jdur:| jd  }ntd	|j}	t	j
jjj||d
|d dkd }
t	j
jjj|
|d
 t fddt|	D }t }|t|d }|r||fS |S )aq  
    Computes the loss under sequence parallelism with `sp_backend="deepspeed"` and `sp_size > 1`.

    Performs weighted loss aggregation across SP ranks, accounting for varying numbers of valid tokens per rank
    (e.g., when some ranks receive only padding or prompt tokens that are masked with -100).

    Args:
        accelerator (`Accelerator`): The accelerator instance with `torch_device_mesh` support.
        model (`torch.nn.Module`): The model to compute the loss for.
        inputs (`dict[str, torch.Tensor | Any]`): The input data for the model. Must include `"shift_labels"` key.
        return_outputs (`bool`): Whether to return the model outputs along with the loss.
        pc (`accelerate.parallelism_config.ParallelismConfig`): The parallelism configuration.

    Returns:
        The loss, or a tuple of `(loss, outputs)` if `return_outputs` is `True`.
    labelsshift_labelsr	   r   r   )groupsNspzSequence parallelism is enabled but no SP process group is available. Ensure torch_device_mesh is initialized or sp_backend='deepspeed' with sp_size > 1.)groupir   c                 3   s,    | ]} | d kr|  |  V  qdS )r   Nr   )rZ   rankgood_tokens_per_ranklosses_per_rankr   r   	<genexpr>  s    z,deepspeed_sp_compute_loss.<locals>.<genexpr>r   )loss
sp_backendsp_sizer  r  _get_sequence_parallel_grouptorch_device_mesh	get_groupr(   rP   r   r   
functional
all_gatherviewsumrangerf   )r  rk   inputsreturn_outputspcoutputsr$  r  sp_groupsp_world_sizegood_tokens
total_losstotal_good_tokensr   r   r   deepspeed_sp_compute_loss  s,   

r8  r%   rp   )T)'r"   r   importlib.metadatar
   importlib.utilrq   	functoolsr   dependency_versions_checkr   utilsr   r   r   rP   r   
get_loggerr   r   r   accelerate.utils.deepspeedr   DeepSpeedConfigbuiltinsr   r$   rs   r   ru   rv   rw   r   r   r   r   r  r  r  r8  r   r   r   r   <module>   s@   
 5#
kQ
6
C
