
    j                     R   d dl Z d dlZd dlZd dlZd dlZd dlmZm	Z	 d dl
mZmZ d dlmZ d dlmZ d dlZd dlmZ d dlmZ d dlmZ d d	lmZmZmZmZmZmZ d d
lm Z m!Z! e G d de                      Z"e G d dee                      Z#e G d de#                      Z$e G d de#                      Z%e G d de#                      Z&e G d de#e                      Z'e G d de#                      Z(e G d de#                      Z)e G d de#                      Z*dS )    N)ABCabstractmethod)	dataclassfield)	DictProxy)Path)DataClassJsonMixin)SourceConnectionNetworkError)RetryHandler)BaseDestinationConnectorBaseSourceConnectorPartitionConfigProcessorConfig
ReadConfigRetryStrategyConfig)ingest_log_streaming_initloggerc                   Z    e Zd ZdZd Zedefd            Zej        defd            ZdS )PipelineContextz9
    Data that gets shared across each pipeline node
    c                     d | _         d S N_ingest_docs_mapselfs    q/lsinfo/ai/hellotax_ai/base_platform/venv/lib/python3.11/site-packages/unstructured/ingest/pipeline/interfaces.py__post_init__zPipelineContext.__post_init__!   s    7;    returnc                 <    | j         t          d          | j         S )Nz!ingest_docs_map never initialized)r   
ValueErrorr   s    r   ingest_docs_mapzPipelineContext.ingest_docs_map$   s#     (@AAA$$r   valuec                     || _         d S r   r   )r   r#   s     r   r"   zPipelineContext.ingest_docs_map*   s     %r   N)	__name__
__module____qualname____doc__r   propertyr   r"   setter r   r   r   r      s         < < < % % % % X%
 &Y & & & & & &r   r   c                       e Zd ZU dZeed<   ddej        ej        ej	                          dej	        fdZ
defdZedej        ej	                 fd            Zd	 Zdej        e         fd
ZdS )PipelineNodezL
    Class that encapsulates logic to run during a single pipeline step
    pipeline_contextNiterabler   c                 0    |r|ng }|r2t          j        d j        j         dt	          |           d                                                                              s7|r                     |           _        nۉ                                  _        n j	        j
        dk    r0|r fd|D              _        n                                  _        nt          j         j	        j
        t           j	        j        rt          j        nt          j        f          5 }|                     j        |           _        d d d            n# 1 swxY w Y   t'           j        t(          j                  rd  j        D              _         j        S )NzCalling z with z docs   c                 :    g | ]}                     |          S r+   )run).0itr   s     r   
<listcomp>z)PipelineNode.__call__.<locals>.<listcomp>F   s#    ???txx||???r   )	processesinitializerinitargsc                     g | ]}||S r   r+   )r4   rs     r   r6   z)PipelineNode.__call__.<locals>.<listcomp>R   s    CCCQ]1]]]r   )r   info	__class__r%   len
initializesupported_multiprocessingr3   resultr.   num_processesmpPoolr   verboseloggingDEBUGINFOmap
isinstancetIterable)r   r/   pools   `  r   __call__zPipelineNode.__call__7   s   '/88R 	KR4>2RRc(mmRRR   	--// 	; )"hhx00"hhjj"0A55 )????h???"hhjj/=5+/+@+HZ'--gl\   ; "hhtx::; ; ; ; ; ; ; ; ; ; ; ; ; ; ; dk1:.. 	DCCdkCCCDK{s   $!EEEc                     dS )NTr+   r   s    r   r@   z&PipelineNode.supported_multiprocessingU       tr   c                     d S r   r+   r   argskwargss      r   r3   zPipelineNode.runX       r   c                     |                                  x}r.t          j        d|            |                    dd           t	          | j        j        rt          j        nt          j	                   d S )Nz	Creating T)parentsexist_ok)
get_pathr   r<   mkdirr   r.   rE   rF   rG   rH   )r   paths     r   r?   zPipelineNode.initialize\   so    ==??"4 	4K*D**+++JJtdJ333!43H3P"b'--V]Vbcccccr   c                     d S r   r+   r   s    r   rY   zPipelineNode.get_pathb   rP   r   r   )r%   r&   r'   r(   r   __annotations__rK   OptionalrL   AnyrN   boolr@   r   r3   r?   r   rY   r+   r   r   r-   r-   /   s           &%%% AJqu,=!> !%    <4     aj&7    ^d d d!*T*      r   r-   c                   l     e Zd ZU dZeed<    fdZedej	        e
         fd            ZdefdZ xZS )DocFactoryNodez>
    Encapsulated logic to generate a list of ingest docs
    source_doc_connectorc                     t          j        d| j                                                    t	                                                       | j                                         d S )Nz?Running doc factory to generate ingest docs. Source connector: )r   r<   rc   to_jsonsuperr?   r   r=   s    r   r?   zDocFactoryNode.initializen   sn    G!%!:!B!B!D!DG G	
 	
 	
 	!,,.....r   r   c                     d S r   r+   rR   s      r   r3   zDocFactoryNode.runv   rU   r   c                     dS NFr+   r   s    r   r@   z(DocFactoryNode.supported_multiprocessingz       ur   )r%   r&   r'   r(   r   r]   r?   r   rK   rL   dictr3   r`   r@   __classcell__r=   s   @r   rb   rb   f   s           .---/ / / / / aj&6    ^4        r   rb   c                        e Zd ZU dZeed<   dZej        e	         ed<   e
dej        e         fd            Z fdZededej        e         fd	            Z xZS )

SourceNodez
    Encapsulated logic to pull from a data source via base ingest docs
    Output of logic expected to be the json outputs of the data itself
    read_configNretry_strategy_configr   c           	          | j         x}rHt          t          j        t          |j        |j        t          t          j        t          j                  S d S )N)max_time	max_triesr   start_log_levelbackoff_log_level)	rr   r   backoffexpor
   max_retry_timemax_retriesr   level)r   rr   s     r   retry_strategyzSourceNode.retry_strategy   sR    $($>>  		,.=/; &"(,    tr   c                 p    t          j        d           t                                                       d S )Nz@Running source node to download data associated with ingest docsr   r<   rf   r?   rg   s    r   r?   zSourceNode.initialize   s0    VWWWr   ingest_doc_jsonc                     d S r   r+   )r   r   s     r   r3   zSourceNode.run   rU   r   )r%   r&   r'   r(   r   r]   rr   rK   r^   r   r)   r   r}   r?   r   strr3   rm   rn   s   @r   rp   rp   ~   s          
 =A1:&9:AAA
< 8    X     3 1:c?    ^    r   rp   c                        e Zd ZU dZeed<    ee          Zeed<    fdZ	de
fdZede
dej        e
         fd	            Zdefd
Z xZS )PartitionNodez`
    Encapsulates logic to run partition on the json files as the output of the source node
    partition_config)default_factorypartition_kwargsc                     t          j        d| j                                         dt	          j        | j                   d           t                                                       d S )NzCRunning partition node to extract content from json files. Config: z, partition kwargs: ])	r   r<   r   re   jsondumpsr   rf   r?   rg   s    r   r?   zPartitionNode.initialize   s{    F,4466F F!%D,A!B!BF F F	
 	
 	

 	r   r   c                     | j                                         }| j        |d<   t          j        t          j        |d                                                                                    d d         S )Nr   T)	sort_keys    )	r   to_dictr   hashlibsha256r   r   encode	hexdigest)r   	hash_dicts     r   create_hashzPartitionNode.create_hash   sg    )1133	(,(=	$%~djdCCCJJLLMMWWYYZ][]Z]^^r   	json_pathc                     d S r   r+   r   r   s     r   r3   zPartitionNode.run   rU   r   c                 ^    t          | j        j                  dz                                  S )Npartitioned)r   r.   work_dirresolver   s    r   rY   zPartitionNode.get_path   s'    T*344}DMMOOOr   )r%   r&   r'   r(   r   r]   r   rl   r   r?   r   r   r   rK   r^   r3   r   rY   rm   rn   s   @r   r   r      s           &%%%"U4888d888    _S _ _ _ _
 S QZ_    ^P$ P P P P P P P Pr   r   c                   H    e Zd ZdZededej        e         fd            ZdS )ReformatNodezr
    Encapsulated any logic to reformat the output List[Element]
    content from partition before writing it
    elements_jsonr   c                     d S r   r+   )r   r   s     r   r3   zReformatNode.run   rU   r   N)	r%   r&   r'   r(   r   r   rK   r^   r3   r+   r   r   r   r      sR         
  C    ^  r   r   c                   l     e Zd ZU dZeed<   edej        e	         fd            Z
 fdZdefdZ xZS )	WriteNodezV
    Encapsulated logic to write the final result to a downstream data connection
    dest_doc_connector
json_pathsc                     d S r   r+   )r   r   s     r   r3   zWriteNode.run   rU   r   c                     t          j        d| j                                         d           t	                                                       | j                                         d S )Nz=Running write node to upload content. Destination connector: r   )r   r<   r   re   rf   r?   rg   s    r   r?   zWriteNode.initialize   st    K&*&=&E&E&G&GK K K	
 	
 	
 	**,,,,,r   r   c                     dS rj   r+   r   s    r   r@   z#WriteNode.supported_multiprocessing   rk   r   )r%   r&   r'   r(   r   r]   r   rK   Listr   r3   r?   r`   r@   rm   rn   s   @r   r   r      s           1000afSk    ^- - - - -4        r   r   c                   >     e Zd ZdZ fdZedefd            Z xZS )CopyNodezh
    Encapsulated logic to copy the final result of the pipeline to the designated output location.
    c                 p    t          j        d           t                                                       d S )Nz<Running copy node to move content to desired output locationr   rg   s    r   r?   zCopyNode.initialize   s0    RSSSr   r   c                     d S r   r+   r   s     r   r3   zCopyNode.run   rU   r   )	r%   r&   r'   r(   r?   r   r   r3   rm   rn   s   @r   r   r      si              S    ^    r   r   c                   8     e Zd ZdZ fdZed             Z xZS )PermissionsNodezJ
    Encapsulated logic to do operations on permissions related data.
    c                 p    t          j        d           t                                                       d S )Nz:Running permissions node to cleanup the permissions folderr   rg   s    r   r?   zPermissionsNode.initialize   s0    PQQQr   c                     d S r   r+   r   s    r   r3   zPermissionsNode.run   rU   r   )r%   r&   r'   r(   r?   r   r3   rm   rn   s   @r   r   r      s]                ^    r   r   )+r   r   rF   multiprocessingrC   typingrK   abcr   r   dataclassesr   r   multiprocessing.managersr   pathlibr   rx   dataclasses_jsonr	   unstructured.ingest.errorr
   "unstructured.ingest.ingest_backoffr   unstructured.ingest.interfacesr   r   r   r   r   r   unstructured.ingest.loggerr   r   r   r-   rb   rp   r   r   r   r   r   r+   r   r   <module>r      s5              # # # # # # # # ( ( ( ( ( ( ( ( . . . . . .        / / / / / / B B B B B B ; ; ; ; ; ;                I H H H H H H H & & & & &o & & && 3 3 3 3 3%s 3 3 3l     \   .        @ P P P P PL P P P:     <           .     |        l     r   