U
    "Afbd                  	   @   s  U d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
mZmZ d dlmZ d dlmZ d dlmZ d dlmZmZmZmZmZmZmZmZmZmZmZmZ d dlZd dlm Z  d dl!m"Z"m#Z# d d	l$m%Z% d d
l&m'Z'm(Z(m)Z)m*Z* d dl+m,Z,m-Z-m.Z.m/Z/m0Z0m1Z1m2Z2m3Z3 d dl4m5Z5 d dl6m7Z7m8Z8m9Z9 d dl:m;Z; d dl<m=Z= ddddgZ>dZ?e@eAd< eG dd dZBeG dd dZCdZDe@dddZEG dd deZFG d d! d!eFZGG d"d# d#eFZHe2eId$d%d&ZJeIee2 eee2  d'd(d)ZKejLeejMej f e2e@e9d*d+d,ZNeejOejOe1eIePeIdd-d.d/ZQG d0d deZRG d1d deRZSG d2d3 d3e8ZTG d4d de7ZUG d5d deTe5ZVdS )6    N)ABCabstractmethod)contextmanager)	dataclass)Path)AnyCallablecastDict	GeneratorIOIterableIteratorListOptionalTupleUnion)Tensor)_get_available_device_type_get_device_module)narrow_tensor_by_index)MetadataMetadataIndexSTATE_DICT_TYPEStorageMeta)LoadItemTypeLoadPlanLoadPlannerReadItemSavePlanSavePlanner	WriteItemWriteItemType)BlockingAsyncStager)StorageReaderStorageWriterWriteResult)_create_file_view)FutureFileSystemWriterFileSystemReader
FileSystemFileSystemBase	.metadata_metadata_fnc                   @   s*   e Zd ZU dZeed< eed< eed< dS )_StorageInfoz#This is the per entry storage info.relative_pathoffsetlengthN)__name__
__module____qualname____doc__str__annotations__int r:   r:   K/tmp/pip-unpacked-wheel-ak3y1t3j/torch/distributed/checkpoint/filesystem.pyr/   B   s   
r/   c                   @   s   e Zd ZU eed< dS )_StoragePrefixprefixN)r3   r4   r5   r7   r8   r:   r:   r:   r;   r<   K   s   
r<   z.distcpreturnc                   C   s   t t S N)r7   uuidZuuid4r:   r:   r:   r;   _generate_uuidS   s    rB   c                   @   sT   e Zd ZeeeddddZeddddZeee	e
jef  ddd	ZdS )
_TensorLoaderNsizeobjr?   c                 C   s   d S r@   r:   selfrE   rF   r:   r:   r;   addX   s    z_TensorLoader.addr>   c                 C   s   d S r@   r:   rH   r:   r:   r;   start_loading\   s    z_TensorLoader.start_loadingc                 C   s   d S r@   r:   rJ   r:   r:   r;   values`   s    z_TensorLoader.values)r3   r4   r5   r   r9   objectrI   rK   r   r   torchr   rL   r:   r:   r:   r;   rC   W   s   rC   c                   @   sX   e Zd ZeddddZeeddddZddd	d
Ze	e
ejef  dddZdS )_SerialCpuLoaderN)resolve_funr?   c                 C   s   || _ g | _d S r@   )rP   items)rH   rP   r:   r:   r;   __init__f   s    z_SerialCpuLoader.__init__rD   c                 C   s   | j ||f d S r@   )rQ   appendrG   r:   r:   r;   rI   j   s    z_SerialCpuLoader.addr>   c                 C   s   d S r@   r:   rJ   r:   r:   r;   rK   m   s    z_SerialCpuLoader.start_loadingc                 c   sP   | j D ]D\}}| | }| }|  | kr@| }||fV  qd S r@   )rQ   rP   detachcpuZstoragerE   numelclonerH   _rF   tensorr:   r:   r;   rL   p   s    z_SerialCpuLoader.values)r3   r4   r5   r   rR   r9   rM   rI   rK   r   r   rN   r   rL   r:   r:   r:   r;   rO   e   s   rO   c                   @   s   e Zd Zdeeej eddddZe	e
dddZeeejef  dd	d
ZddddZeeejef  dddZeeddddZddddZeeejef  dddZdS )_OverlappingCpuLoaderN@B )rP   streaminflight_threshholdr?   c                 C   s   || _ g | _|| _d| _t | _d| _d| _|r8|j	nt
 | _	t| j	| _ttjj|p`| j | _| j| j kr| j| j  d S )Nr   F)rP   rQ   r^   in_flight_datacollectionsdequecurrent_itemsidxstarteddevice_typer   r   device_moduler	   rN   cudaStreamZcurrent_streamr]   Zwait_stream)rH   rP   r]   r^   r:   r:   r;   rR   }   s     
 z_OverlappingCpuLoader.__init__r>   c                 C   s   | j t| jkS r@   )rc   lenrQ   rJ   r:   r:   r;   _done   s    z_OverlappingCpuLoader._donec                 C   sb   g }| j | jkr| j  | j | jkr^| j }|  j |d  |d   8  _ || q|S Nr   )	r_   r^   r]   synchronizerb   popleftrV   element_sizerS   )rH   drainedvalr:   r:   r;   _drain   s    

"z_OverlappingCpuLoader._drainc              	   C   s   | j | j | js| j| jk r| j| j \}}|  jd7  _| | }|j	j
| jkrl|jddd}n2|j	t	dkr|  | |j kr| }| j||f |  j| |  7  _qW 5 Q R X d S )N   rU   T)deviceZnon_blocking)rf   r]   rj   r_   r^   rQ   rc   rP   rT   rs   typere   torN   Zuntyped_storagerE   rV   itemsizerW   rb   rS   rn   rX   r:   r:   r;   _refill   s&    
z_OverlappingCpuLoader._refillc                 C   s(   | j s
tt| jdkr"| j  | jS rk   )rj   AssertionErrorri   rb   r]   rl   rJ   r:   r:   r;   _finish   s    

z_OverlappingCpuLoader._finishrD   c                 C   s"   | j rtd| j||f d S )Nz&cannot add items after loading started)rd   RuntimeErrorrQ   rS   rG   r:   r:   r;   rI      s    z_OverlappingCpuLoader.addc                 C   s0   | j r
d S d| _ | jjtdd |   d S )NTr   key)rd   rQ   sortoperator
itemgetterrw   rJ   r:   r:   r;   rK      s
    z#_OverlappingCpuLoader.start_loadingc                 c   s<   |    | js*|  }|   |E d H  q|  E d H  d S r@   )rK   rj   rq   rw   ry   )rH   ro   r:   r:   r;   rL      s    z_OverlappingCpuLoader.values)Nr\   )r3   r4   r5   r   r   rN   rh   r9   rR   propertyboolrj   r   r   r   rM   rq   rw   r   ry   rI   rK   r   rL   r:   r:   r:   r;   r[   |   s      
r[   )itemr?   c                 C   sB   d}| j d k	st| j jD ]}||9 }q| j jj}|tj| S Nrr   )Ztensor_datarx   rE   Z
propertiesdtyperN   _utilsZ_element_size)r   rE   sr   r:   r:   r;   
_item_size   s    

r   )binsrQ   r?   c           	      C   s   | dkr|gS dd |D }dd |D }dd t | D }dd t | D }|jtdd t|D ]\}}|||   | qd|D ]@}tt|tdd	d
 }|| | ||  t|7  < q|S )Nrr   c                 S   s   g | ]}|j tjkr|qS r:   rt   r"   BYTE_IO.0wir:   r:   r;   
<listcomp>   s      z+_split_by_size_and_type.<locals>.<listcomp>c                 S   s   g | ]}|j tjkr|qS r:   r   r   r:   r:   r;   r      s      c                 S   s   g | ]}g qS r:   r:   r   rY   r:   r:   r;   r      s     c                 S   s   g | ]}d qS )r   r:   r   r:   r:   r;   r      s     T)r|   reverser{   r   )ranger}   r   	enumeraterS   minr~   r   )	r   rQ   bytes_wtensor_wZbucketsZbucket_sizesir   rc   r:   r:   r;   _split_by_size_and_type   s    r   )r]   data
write_itemstorage_keyr?   c                 C   s   |   }|jtjkr4t|tjs$t| |	  n:t|t
jsDt|jt
dksXtt
|ttt |  |   | }t|j|t|||dS )NrU   )indexZsize_in_bytesstorage_data)tellrt   r"   r   
isinstanceioBytesIOrx   write	getbufferrN   r   rs   saver	   r   bytesr&   r   r/   )r]   r   r   r   r1   r2   r:   r:   r;   _write_item   s    
r   )create_stream
file_queueresult_queueplannerr^   	use_fsyncthread_countr?   c              	   C   s|  z\|  \}}}	tj }
tt|
d }|dkr^tj sF|r^| r^|dkr^t|j|d}n
t	|j}dd |	D }|D ]}|
t|| qz|  dd |	D }g }| |d}|D ]"}||}|t|||| q| D ]&\}}|jst|t|||| q|rFzt|  W n tk
rD   t  Y nX W 5 Q R X || qW n tjk
rv   Y nX d S )Nrr   r   )r^   c                 S   s   g | ]}|j tjkr|qS r:   r   r   r:   r:   r;   r   6  s      z+_write_files_from_queue.<locals>.<listcomp>c                 S   s   g | ]}|j tjkr|qS r:   r   r   r:   r:   r;   r   ;  s      wb)
get_nowaitrN   Z_CZ_get_privateuse1_backend_namegetattrrg   Zis_availabler[   Zresolve_datarO   rI   r   rK   rS   r   rL   Zis_cpurx   osfsyncfilenoAttributeErrorsyncputqueueEmpty)r   r   r   r   r^   r   r   	file_namer   Zwrite_itemsZcustom_backend_nameZcustom_device_modloaderr   r   r   Zwrite_resultsr]   r   rZ   r:   r:   r;   _write_files_from_queue  s\    	


r   c                   @   s4  e Zd Zeeeeejf ee	e
jddf dddZeeeejf eeeejf dddZeeeejf eeejf ddd	d
Zeeeejf eeejf dddZeeeejf ddddZeeeeejf edddZeeeejf edddZeeeejf ddddZdS )r,   Npathmoder?   c                 C   s   d S r@   r:   )rH   r   r   r:   r:   r;   r   V  s    zFileSystemBase.create_streamr   suffixr?   c                 C   s   d S r@   r:   rH   r   r   r:   r:   r;   concat_path]  s    zFileSystemBase.concat_pathr   new_pathr?   c                 C   s   d S r@   r:   rH   r   r   r:   r:   r;   renamec  s    zFileSystemBase.renamer   r?   c                 C   s   d S r@   r:   rH   r   r:   r:   r;   	init_pathi  s    zFileSystemBase.init_pathc                 C   s   d S r@   r:   r   r:   r:   r;   mkdirm  s    zFileSystemBase.mkdircheckpoint_idr?   c                 C   s   d S r@   r:   clsr   r:   r:   r;   validate_checkpoint_idq  s    z%FileSystemBase.validate_checkpoint_idc                 C   s   d S r@   r:   r   r:   r:   r;   existsv  s    zFileSystemBase.existsc                 C   s   d S r@   r:   r   r:   r:   r;   rm_filez  s    zFileSystemBase.rm_file)r3   r4   r5   r   r   r   r7   r   PathLiker   r   IOBaser   r   r   r   r   classmethodr   r   r   r   r:   r:   r:   r;   r,   U  s6      &c                   @   s  e Zd Zeeeejf eee	j
ddf dddZeeejf eeeejf dddZeeejf eeejf dd	d
Zeeejf eeejf ddddZeeejf ddddZeeeejf edddZeeejf edddZeeejf ddddZdS )r+   Nr   c              	   c   s.   t t||}t tj|V  W 5 Q R X d S r@   )r	   r   openr   r   )rH   r   r   r]   r:   r:   r;   r     s    zFileSystem.create_streamr   c                 C   s   t t|| S r@   )r	   r   r   r:   r:   r;   r     s    zFileSystem.concat_pathr   c                 C   s   t |tst|}|S r@   )r   r   r   r:   r:   r;   r     s    
zFileSystem.init_pathr   c                 C   s   t t|t t| d S r@   )r	   r   r   r   r:   r:   r;   r     s    zFileSystem.renamec                 C   s   t t|jddd d S )NT)parentsexist_ok)r	   r   r   r   r:   r:   r;   r     s    zFileSystem.mkdirr   c                 C   sR   t |trdS dt|krdS t|jD ]$}| r(tt|tjr( dS q(dS )NTz://F)r   r   r7   r   r   r   accessW_OK)r   r   pr:   r:   r;   r     s    
z!FileSystem.validate_checkpoint_idc                 C   s   t t| S r@   )r	   r   r   r   r:   r:   r;   r     s    zFileSystem.existsc                 C   s   t t|  d S r@   )r	   r   unlinkr   r:   r:   r;   r     s    zFileSystem.rm_file)r3   r4   r5   r   r   r7   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r:   r:   r:   r;   r+     s&     $ c                       s4  e Zd ZdZd$eeejf eee	e	ee
e
dd	 fddZd%eeejdf dd	d
dZeddddZeedddZee ee dddZeeeee  dddZeeee  ddddZee dddZeeeejf dddZeeeejf dd d!Zeeeejf ed	d"d#Z  Z S )&_FileSystemWritera  
    Basic implementation of StorageWriter using file IO.

    This implementation makes the following assumptions and simplifications:

    * The checkpoint path is an empty or non-existing directory.
    * File creation is atomic

    The checkpoint consist of one file per write request plus
    a `.metadata` file with the serialized metadata.

    Trr   逖 N)	r   single_file_per_rank
sync_filesr   per_thread_copy_ahead	overwriteargskwargsr?   c           	         sJ   t    t | _| j|| _|| _|| _|| _|| _	t
 | _|| _dS )a  
        Initialize the writer pointing to `path`.

        Args:
            path: directory where the checkpoint will be written to.
            single_file_per_rank: Produce one file per rank instead of one file per tensor/blob. Default to True.
            sync_files : force files to be synced to permanent storage. Default to True.
            thread_count: Number of IO threads to use to write. Default to 1.
            per_thread_copy_ahead: How many bytes to copy from the GPU ahead of saving then. Default 10Mb.
            overwrite: Whether to allow overwriting existing checkpoints. Defaults to True.

        N. B. If sync_files is disabled, there's no guarantee that the checkpoint will be consistent in the case of a failure.
        N)superrR   r+   fsr   r   r   r   r   r   rB   save_idr   )	rH   r   r   r   r   r   r   r   r   	__class__r:   r;   rR     s    
z_FileSystemWriter.__init__r   c                 C   s   |r| j || _t | _d S r@   )r   r   r   rB   r   rH   r   r:   r:   r;   reset  s    z_FileSystemWriter.reset)is_coordinatorr?   c                 C   s   d S r@   r:   )rH   r   r:   r:   r;   set_up_storage_writer  s    z'_FileSystemWriter.set_up_storage_writerplanr?   c                 C   sV   | j | j | j | jrR| jr@td| j d| jd ntd| jd|S )Nz#Detected an existing checkpoint in z#, overwriting since self.overwrite=z. Past version 2.5 of PyTorch, `overwrite` will default to False. Set this variable to True to maintain this functionality or False to raise when an existing checkpoint is found.z-Checkpoint already exists and self.overwrite=.)	r   r   r   r   metadata_pathr   warningswarnrz   rH   r   r:   r:   r;   prepare_local_plan  s    z$_FileSystemWriter.prepare_local_planplansr?   c                 C   s   dd t |D }|S )Nc                 S   s*   g | ]"\}}t j|td | ddqS )__rY   )r   )dataclassesreplacer<   )r   r   r   r:   r:   r;   r     s   z9_FileSystemWriter.prepare_global_plan.<locals>.<listcomp>)r   )rH   r   Z	new_plansr:   r:   r;   prepare_global_plan  s    z%_FileSystemWriter.prepare_global_planr   r   r?   c              
      sj  |j d  fdd}t }| jrbt| j|jD ]*}| }| j| j	|}|
|||f q4n4|jD ],}| }| j| j	|}|
|||gf qht }	g }
td| jD ]<}tjt| jj||	|| j| j| jfd}|  |
| qt| jj||	|| j| j| jd |
D ]}|  qg }z||	 7 }q&W n, tjk
rd   t }|| | Y S X d S )Nr   c                     s   j    t }  d7  | S r   )r=   DEFAULT_SUFFIX)r   Z
file_countZstorage_planr:   r;   gen_file  s    z._FileSystemWriter.write_data.<locals>.gen_filerr   )targetr   )r   r   r   r   r^   r   r   )r   r   Queuer   r   r   rQ   r   r   r   r   r   	threadingThreadr   r   r   r   startrS   joinr   r   r(   
set_result)rH   r   r   r   r   Zbucketr   r   r   r   threadsrY   tresfutr:   r   r;   
write_data  s^    


z_FileSystemWriter.write_data)metadataresultsr?   c              	   C   s   t  }|D ]}|dd |D  q
||_|  |_tt| j| jt	 d}| j
|dH}t|| | jrzt|  W n tk
r   t  Y nX W 5 Q R X | j| jr| j| j | j|| j d S )Nc                 S   s   i | ]}|j |jqS r:   )r   r   )r   wrr:   r:   r;   
<dictcomp>C  s      z,_FileSystemWriter.finish.<locals>.<dictcomp>z.tmpr   )dictupdater   storage_metar	   r   r   r   r   r.   r   pickledumpr   r   r   r   r   r   r   r   r   r   )rH   r
  r  Z
storage_mdZwr_listZtmp_pathmetadata_filer:   r:   r;   finish@  s     
z_FileSystemWriter.finishr>   c                 C   s   t | j| jdS )N)r   r   )r   r   r   rJ   r:   r:   r;   r  W  s    z_FileSystemWriter.storage_metac                 C   s   t t| j| jtS r@   )r	   r   r   r   r   r.   rJ   r:   r:   r;   r   Z  s    z_FileSystemWriter.metadata_pathc                 C   s   | j S zT
        return the checkpoint_id that will be used to save the checkpoint.
        r   rJ   r:   r:   r;   r   ^  s    z_FileSystemWriter.checkpoint_idc                 C   s
   t |S r@   r+   r   r   r:   r:   r;   r   e  s    z(_FileSystemWriter.validate_checkpoint_id)TTrr   r   T)N)!r3   r4   r5   r6   r   r7   r   r   r   r9   r   rR   r   r   r   r   r   r   r    r(   r&   r	  r   r  r   r   r  r   r   r   r   r   __classcell__r:   r:   r   r;   r     sB        "	
Er   c                       s   e Zd Zeeejf dd fddZee	j
dddZdeeejdf ddd	d
Zeeed dddZedddZeeddddZeedddZee ee dddZeeeejf dddZeeeejf edddZ  ZS )r*   Nr   c                    s4   t    t | _| j|| _t | _t | _	d S r@   )
r   rR   r+   r   r   r   r  r   rB   load_idr   r   r:   r;   rR   k  s
    
zFileSystemReader.__init__)sinfor?   c                 C   s   t ||j|jS r@   )r'   r1   r2   )rH   filer  r:   r:   r;   _slice_filer  s    zFileSystemReader._slice_filer   c                 C   s&   t  | _|r| j|| _t | _d S r@   )r  r   r   r   r   rB   r  r   r:   r:   r;   r   u  s    zFileSystemReader.resetr   c                 C   sh  t  }|jD ](}| j|j }|j}||g | q| D ]\}}| j| j	|}	| j
|	d}
|D ]}| j|j }| |
|}|jtjkrt||j}|d ||| qltttjttt |dd}t||j|j}|| }|  |  ks0t!d|j d|   d|   |"| |#|| qlW 5 Q R X q>t$ }|%d  |S )Nrbr   rU   )Zmap_locationzreq z mismatch sizes z vs )&r  rQ   r   Zstorage_indexr0   
setdefaultrS   r   r   r   r   r  rt   r   r   r   r   readr2   seekZ
load_bytesr	   r   rN   loadr   r   r   Zstorage_offsetslengthsZresolve_tensorrT   rE   rx   Zcopy_Zcommit_tensorr(   r  )rH   r   r   Zper_fileZ	read_itemZitem_mdr   r0   reqsr   r]   reqZ
file_slice
read_bytesrZ   Ztarget_tensorr  r:   r:   r;   	read_data{  sD    

  

zFileSystemReader.read_datar>   c              	   C   sZ   | j | jd}| j |d}t|}W 5 Q R X t|dd d krLt |_| j	|j_	|S )Nr-   r  r  )
r   r   r   r   r  r!  r   r   r  r  )rH   r   r  r
  r:   r:   r;   read_metadata  s    
zFileSystemReader.read_metadata)r
  r   r?   c                 C   s   |j | _ | j d k	std S r@   )r   rx   )rH   r
  r   r:   r:   r;   set_up_storage_reader  s    z&FileSystemReader.set_up_storage_readerr   c                 C   s   |S r@   r:   r   r:   r:   r;   r     s    z#FileSystemReader.prepare_local_planr   c                 C   s   |S r@   r:   )rH   r   r:   r:   r;   r     s    z$FileSystemReader.prepare_global_planc                 C   s   | j S r  r  rJ   r:   r:   r;   r     s    zFileSystemReader.checkpoint_idc                 C   s
   t |S r@   r  r   r:   r:   r;   r     s    z'FileSystemReader.validate_checkpoint_id)N)r3   r4   r5   r   r7   r   r   rR   r/   r   r   r  r   r   r   r(   r&  r   r'  r   r(  r   r   r   r   r   r   r   r  r:   r:   r   r;   r*   j  s   (c                
       sT   e Zd ZdZdeeejf eee	e	eedd fdd	Z
eed
 fddZ  ZS )r)   r   Trr   r   FN)r   r   r   r   r   cache_staged_state_dictr   r?   c              	      s   t  j|||||||d dS )aM  
        Initialize the writer pointing to `path`.

        Args:
            path: directory where the checkpoint will be written to.
            single_file_per_rank: Produce one file per rank instead of one file per tensor/blob. Default to True.
            sync_files : force files to be synced to permanent storage. Default to True.
            thread_count: Number of IO threads to use to write. Default to 1.
            per_thread_copy_ahead: How many bytes to copy from the GPU ahead of saving then. Default 10Mb.
            cache_staged_state_dict: Whether to cache the staged state_dict. This option decreases staging latency
                at the cost of increases memory usage. Additionally, if this parameter is set to True, it's the expectation
                that the stager is maintained and re-used for multiple dcp.async_save calls. Default to False.
            overwrite: Whether to allow overwriting existing checkpoints. Defaults to True.

        N. B. If sync_files is disabled, there's no guarantee that the checkpoint will be consistent in the case of a failure.
        )r   r   r   r   r   r)  r   N)r   rR   )rH   r   r   r   r   r   r)  r   r   r:   r;   rR     s    zFileSystemWriter.__init__)
state_dictr?   c                    s   d| _ t |S )zOverride of AsyncStager.stager   )r   r   stage)rH   r*  r   r:   r;   r+    s    zFileSystemWriter.stage)TTrr   r   FT)r3   r4   r5   r6   r   r7   r   r   r   r9   rR   r   r+  r  r:   r:   r   r;   r)     s$         $)Wr`   r   r   r~   r   r  r   r   rA   r   abcr   r   
contextlibr   r   pathlibr   typingr   r   r	   r
   r   r   r   r   r   r   r   r   rN   r   Ztorch._utilsr   r   Ztorch.distributed._shard._utilsr   Z%torch.distributed.checkpoint.metadatar   r   r   r   Z$torch.distributed.checkpoint.plannerr   r   r   r   r   r    r!   r"   Z$torch.distributed.checkpoint.stagingr#   Z$torch.distributed.checkpoint.storager$   r%   r&   Z"torch.distributed.checkpoint.utilsr'   Ztorch.futuresr(   __all__r.   r7   r8   r/   r<   r   rB   rC   rO   r[   r9   r   r   r   r   r   r   r   r   r,   r+   r   r*   r)   r:   r:   r:   r;   <module>   sv    8(
ZD*/ =Z