https://t.me/RX1948
Server : Apache
System : Linux iad1-shared-b8-43 6.6.49-grsec-jammy+ #10 SMP Thu Sep 12 23:23:08 UTC 2024 x86_64
User : dh_edsupp ( 6597262)
PHP Version : 8.2.26
Disable Function : NONE
Directory :  /lib/python3/dist-packages/S3/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //lib/python3/dist-packages/S3/MultiPart.py
# -*- coding: utf-8 -*-

## Amazon S3 Multipart upload support
## Author: Jerome Leclanche <jerome.leclanche@gmail.com>
## License: GPL Version 2

from __future__ import absolute_import

import sys
from logging import debug, info, warning, error
from .Exceptions import ParameterError
from .S3Uri import S3UriS3
from .BaseUtils import getTextFromXml, getTreeFromXml, s3_quote, parseNodes
from .Utils import formatSize, calculateChecksum

# Number of bytes in one megabyte.  The "*_mb" chunk-size settings read from
# the configuration are multiplied by this to obtain chunk sizes in bytes.
SIZE_1MB = 1024 * 1024


class MultiPartUpload(object):
    """Supports MultiPartUpload and MultiPartUpload(Copy) operation.

    One instance drives one multipart transfer to ``dst_uri``: either the
    upload of a local file-like stream, or a copy of an existing S3 object
    (when ``src`` is an ``S3UriS3``).  The ETag of every finished part is
    collected in ``self.parts`` (part number -> ETag) and is sent back to
    the service by ``complete_multipart_upload()``.
    """
    # Limits expressed in megabytes.
    MIN_CHUNK_SIZE_MB = 5        # 5MB
    MAX_CHUNK_SIZE_MB = 5 * 1024     # 5GB
    # Also a number of megabytes, despite the missing "_MB" suffix.
    MAX_FILE_SIZE = 5 * 1024 * 1024  # 5TB

    def __init__(self, s3, src, dst_uri, headers_baseline=None,
                 src_size=None):
        """
        Prepare a multipart transfer and obtain its upload id.

        s3: client object providing config, create_request(), send_request()
            and the other helpers used by this class.
        src: an S3UriS3 (remote copy) or a file-like stream (upload).
        dst_uri: destination object uri.
        headers_baseline: headers sent with the "initiate" request.
        src_size: size of the source in bytes; mandatory for a remote copy,
            falsy when the size is unknown (stream read until EOF).

        Raises ParameterError when a remote copy is requested without a
        source size.  Note that the constructor calls
        initiate_multipart_upload(), which may send a request.
        """
        self.s3 = s3
        self.file_stream = None
        self.src_uri = None
        self.src_size = src_size
        self.dst_uri = dst_uri
        self.parts = {}
        self.headers_baseline = headers_baseline or {}

        if isinstance(src, S3UriS3):
            # Source is the uri of an object to s3-to-s3 copy with multipart.
            self.src_uri = src
            if not src_size:
                raise ParameterError("Source size is missing for "
                                     "MultipartUploadCopy operation")
            c_size = self.s3.config.multipart_copy_chunk_size_mb * SIZE_1MB
        else:
            # Source is a file_stream to upload
            self.file_stream = src
            c_size = self.s3.config.multipart_chunk_size_mb * SIZE_1MB

        self.chunk_size = c_size
        self.upload_id = self.initiate_multipart_upload()

    def get_parts_information(self, uri, upload_id):
        """
        Return the parts already known to the service for an upload.

        The result maps the integer part number to a dict with the keys
        'checksum' (the ETag as listed) and 'size'.  Listed entries lacking
        one of 'PartNumber', 'ETag' or 'Size' are ignored.
        """
        parts = {}
        for elem in self.s3.list_multipart(uri, upload_id):
            try:
                details = {
                    'checksum': elem['ETag'],
                    'size': elem['Size']
                }
                part_number = int(elem['PartNumber'])
            except KeyError:
                # Incomplete entry: nothing usable to compare against.
                continue
            parts[part_number] = details

        return parts

    def get_unique_upload_id(self, uri):
        """
        Find the single pending upload id for the object at uri.

        Returns "" when no pending upload matches the object key.
        Raises ValueError when more than one pending upload matches, since
        there is then no safe way to pick the one to continue.
        """
        upload_id = ""
        multipart_list = self.s3.get_multipart(uri)
        wanted_key = uri.object()
        for mpupload in multipart_list:
            try:
                mp_upload_id = mpupload['UploadId']
                mp_path = mpupload['Key']
            except KeyError:
                # Entry without an id or a key cannot be matched.
                continue

            info("mp_path: %s, object: %s" % (mp_path, wanted_key))
            if mp_path != wanted_key:
                continue
            if upload_id:
                raise ValueError(
                    "More than one UploadId for URI %s.  Disable "
                    "multipart upload, or use\n %s multipart %s\n"
                    "to list the Ids, then pass a unique --upload-id "
                    "into the put command." % (uri, sys.argv[0], uri))
            upload_id = mp_upload_id

        return upload_id

    def initiate_multipart_upload(self):
        """
        Begin a multipart upload
        http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadInitiate.html

        An id given in the configuration wins; otherwise, when continuing a
        put, a pending upload for the destination is looked up.  Only when
        neither yields an id is a new upload requested from the service.
        Stores the id in self.upload_id and returns it.
        """
        if self.s3.config.upload_id:
            self.upload_id = self.s3.config.upload_id
        elif self.s3.config.put_continue:
            self.upload_id = self.get_unique_upload_id(self.dst_uri)
        else:
            self.upload_id = ""

        if not self.upload_id:
            request = self.s3.create_request("OBJECT_POST", uri=self.dst_uri,
                                             headers=self.headers_baseline,
                                             uri_params={'uploads': None})
            response = self.s3.send_request(request)
            data = response["data"]
            self.upload_id = getTextFromXml(data, "UploadId")

        return self.upload_id

    def upload_all_parts(self, extra_label=''):
        """
        Execute a full multipart upload (or remote copy) of the source.

        Returns None; the part number -> ETag mapping is accumulated in
        self.parts by upload_part() / copy_part().
        Raises ParameterError when no upload id is available.  Any failure
        while sending a part is logged with a resume/abort hint and then
        re-raised unchanged.
        TODO use num_processes to thread it
        """
        if not self.upload_id:
            raise ParameterError("Attempting to use a multipart upload that "
                                 "has not been initiated.")

        remote_statuses = {}

        if self.src_uri:
            # Continue is not possible with multipart copy
            filename = self.src_uri.uri()
        else:
            filename = self.file_stream.stream_name

        if self.s3.config.put_continue:
            remote_statuses = self.get_parts_information(self.dst_uri,
                                                         self.upload_id)

        if extra_label:
            extra_label = u' ' + extra_label
        labels = {
            'source': filename,
            'destination': self.dst_uri.uri(),
        }

        seq = 1

        if self.src_size:
            size_left = self.src_size
            # A trailing partial chunk counts as one more part.
            nr_parts = self.src_size // self.chunk_size \
                + (1 if self.src_size % self.chunk_size else 0)
            debug("MultiPart: Uploading %s in %d parts" % (filename, nr_parts))

            # Local streams are uploaded, S3 sources are copied remotely.
            if self.file_stream:
                send_part = self.upload_part
            else:
                send_part = self.copy_part

            while size_left > 0:
                offset = self.chunk_size * (seq - 1)
                current_chunk_size = min(self.src_size - offset,
                                         self.chunk_size)
                size_left -= current_chunk_size
                labels['extra'] = "[part %d of %d, %s]%s" % (
                    seq, nr_parts, "%d%sB" % formatSize(current_chunk_size,
                                                        human_readable=True),
                    extra_label)
                try:
                    send_part(seq, offset, current_chunk_size, labels,
                              remote_status=remote_statuses.get(seq))
                except BaseException:
                    # Deliberately broad (covers user interruption too) so
                    # the hint is always shown; the error is re-raised.
                    error(u"\nUpload of '%s' part %d failed. Use\n  "
                          "%s abortmp %s %s\nto abort the upload, or\n  "
                          "%s --upload-id %s put ...\nto continue the upload."
                          % (filename, seq, sys.argv[0], self.dst_uri,
                             self.upload_id, sys.argv[0], self.upload_id))
                    raise
                seq += 1

            debug("MultiPart: Upload finished: %d parts", seq - 1)
            return

        # Else -> Case of u"<stdin>" source: size unknown, read until EOF
        debug("MultiPart: Uploading from %s" % filename)
        while True:
            chunk = self.file_stream.read(self.chunk_size)
            if not chunk:
                # EOF
                break
            offset = 0  # send from start of the buffer
            current_chunk_size = len(chunk)
            labels['extra'] = "[part %d of -, %s]%s" % (
                seq, "%d%sB" % formatSize(current_chunk_size,
                                          human_readable=True),
                extra_label)
            try:
                self.upload_part(seq, offset, current_chunk_size, labels,
                                 chunk,
                                 remote_status=remote_statuses.get(seq))
            except BaseException:
                # Deliberately broad, see the sized loop; always re-raised.
                error(u"\nUpload of '%s' part %d failed. Use\n  "
                      "%s abortmp %s %s\nto abort, or\n  "
                      "%s --upload-id %s put ...\nto continue the upload."
                      % (filename, seq, sys.argv[0], self.dst_uri,
                         self.upload_id, sys.argv[0], self.upload_id))
                raise
            seq += 1

        debug("MultiPart: Upload finished: %d parts", seq - 1)

    def upload_part(self, seq, offset, chunk_size, labels, buffer='',
                    remote_status=None):
        """
        Upload a file chunk
        http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadUploadPart.html

        seq: part number.
        offset, chunk_size: position and length of the chunk in the source.
        labels: progress labels handed to the sender.
        buffer: chunk data already read (stdin case); empty otherwise.
        remote_status: entry from get_parts_information() for this part, or
            None.  When its size and checksum match the local chunk the
            part is not sent again.

        Returns the response of the send, or None when the part is skipped.
        Either way the part's ETag, without surrounding quotes, is recorded
        in self.parts[seq].
        """
        # TODO implement Content-MD5
        debug("Uploading part %i of %r (%s bytes)" % (seq, self.upload_id,
                                                      chunk_size))

        if remote_status is not None:
            remote_size = int(remote_status['size'])
            if remote_size == chunk_size:
                checksum = calculateChecksum(buffer, self.file_stream, offset,
                                             chunk_size,
                                             self.s3.config.send_chunk)
                remote_checksum = remote_status['checksum'].strip('"\'')
                if remote_checksum == checksum:
                    warning("MultiPart: size and md5sum match for %s part %d, "
                            "skipping." % (self.dst_uri, seq))
                    # Keep the same unquoted form as the upload path below.
                    self.parts[seq] = remote_checksum
                    return None
                warning("MultiPart: checksum (%s vs %s) does not match for"
                        " %s part %d, reuploading."
                        % (remote_checksum, checksum, self.dst_uri, seq))
            else:
                warning("MultiPart: size (%d vs %d) does not match for %s part"
                        " %d, reuploading." % (remote_size,
                                               chunk_size, self.dst_uri, seq))

        headers = {"content-length": str(chunk_size)}
        query_string_params = {'partNumber': '%s' % seq,
                               'uploadId': self.upload_id}
        request = self.s3.create_request("OBJECT_PUT", uri=self.dst_uri,
                                         headers=headers,
                                         uri_params=query_string_params)
        response = self.s3.send_file(request, self.file_stream, labels, buffer,
                                     offset=offset, chunk_size=chunk_size)
        self.parts[seq] = response["headers"].get('etag', '').strip('"\'')
        return response

    def copy_part(self, seq, offset, chunk_size, labels, remote_status=None):
        """
        Copy a remote file chunk
        http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadUploadPart.html
        http://docs.amazonwebservices.com/AmazonS3/latest/API/mpUploadUploadPartCopy.html

        Same calling convention as upload_part() without the buffer.
        remote_status is accepted but not used here.  Sets
        labels['action'] on the dict passed in.  Records the ETag found in
        the response body in self.parts[seq] ('' when none is found) and
        returns the response.
        """
        debug("Copying part %i of %r (%s bytes)" % (seq, self.upload_id,
                                                    chunk_size))

        # set up headers with copy-params.
        # Examples:
        #    x-amz-copy-source: /source_bucket/sourceObject
        #    x-amz-copy-source-range:bytes=first-last
        #    x-amz-copy-source-if-match: etag
        #    x-amz-copy-source-if-none-match: etag
        #    x-amz-copy-source-if-unmodified-since: time_stamp
        #    x-amz-copy-source-if-modified-since: time_stamp
        headers = {
            "x-amz-copy-source": s3_quote("/%s/%s" % (self.src_uri.bucket(),
                                                      self.src_uri.object()),
                                          quote_backslashes=False,
                                          unicode_output=True)
        }

        # byte range, with end byte included. A 10 byte file has bytes=0-9
        headers["x-amz-copy-source-range"] = \
            "bytes=%d-%d" % (offset, (offset + chunk_size - 1))

        query_string_params = {'partNumber': '%s' % seq,
                               'uploadId': self.upload_id}
        request = self.s3.create_request("OBJECT_PUT", uri=self.dst_uri,
                                         headers=headers,
                                         uri_params=query_string_params)

        labels[u'action'] = u'remote copy'
        response = self.s3.send_request_with_progress(request, labels,
                                                      chunk_size)

        # NOTE: Amazon sends whitespace while upload progresses, which
        # accumulates in response body and seems to confuse XML parser.
        # Strip newlines to find ETag in XML response data
        #data = response["data"].replace("\n", '')
        self.parts[seq] = getTextFromXml(response['data'], "ETag") or ''

        return response

    def complete_multipart_upload(self):
        """
        Finish a multipart upload
        http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadComplete.html

        Sends the list of recorded parts (self.parts) for self.upload_id
        and returns the response of that request.
        """
        debug("MultiPart: Completing upload: %s" % self.upload_id)

        part_xml = "<Part><PartNumber>%i</PartNumber><ETag>%s</ETag></Part>"
        # The service expects parts in ascending part-number order; do not
        # rely on dict iteration order, which older interpreters leave
        # unspecified.
        parts_xml = [part_xml % (seq, etag)
                     for seq, etag in sorted(self.parts.items())]
        body = "<CompleteMultipartUpload>%s</CompleteMultipartUpload>" \
               % "".join(parts_xml)

        headers = {"content-length": str(len(body))}
        request = self.s3.create_request(
            "OBJECT_POST", uri=self.dst_uri, headers=headers, body=body,
            uri_params={'uploadId': self.upload_id})
        response = self.s3.send_request(request)

        return response

    def abort_upload(self):
        """
        Abort multipart upload
        http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadAbort.html

        Currently only logs and returns None: no request is sent.
        """
        debug("MultiPart: Aborting upload: %s" % self.upload_id)
        # NOTE(review): the abort request below is disabled, so the pending
        # upload is left untouched.  The draft also refers to self.uri, which
        # this class does not define (the destination is self.dst_uri).
        #request = self.s3.create_request("OBJECT_DELETE", uri = self.uri,
        #                                  uri_params = {'uploadId': self.upload_id})
        #response = self.s3.send_request(request)
        response = None
        return response


# vim:et:ts=4:sts=4:ai

https://t.me/RX1948 - 2025