https://t.me/RX1948
Server : Apache
System : Linux iad1-shared-b8-43 6.6.49-grsec-jammy+ #10 SMP Thu Sep 12 23:23:08 UTC 2024 x86_64
User : dh_edsupp ( 6597262)
PHP Version : 8.2.26
Disable Function : NONE
Directory :  /lib/python3/dist-packages/duplicity/backends/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //lib/python3/dist-packages/duplicity/backends/_boto_multi.py
# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4; encoding:utf8 -*-
#
# Copyright 2002 Ben Escoto <ben@emerose.org>
# Copyright 2007 Kenneth Loafman <kenneth@loafman.com>
# Copyright 2011 Henrique Carvalho Alves <hcarvalhoalves@gmail.com>
#
# This file is part of duplicity.
#
# Duplicity is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# Duplicity is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with duplicity; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

from __future__ import division
from future import standard_library
standard_library.install_aliases()
from builtins import range

import os
import psutil
import queue
import socket
import sys
import threading
import time
import traceback

from duplicity import config
from duplicity import log
from duplicity import progress
from duplicity.errors import *  # pylint: disable=unused-wildcard-import
from duplicity.filechunkio import FileChunkIO

from ._boto_single import BotoBackend as BotoSingleBackend
from ._boto_single import get_connection

BOTO_MIN_VERSION = u"2.1.1"

# Multiprocessing is not supported on *BSD
if sys.platform not in (u'darwin', u'linux2'):
    from multiprocessing import dummy as multiprocessing
    log.Debug(u'Multiprocessing is not supported on %s, will use threads instead.' % sys.platform)
else:
    import multiprocessing


class ConsumerThread(threading.Thread):
    u"""
    A background thread that collects all written bytes from all
    the pool workers, and reports it to the progress module.
    Wakes up every second to check for termination
    """
    def __init__(self, queue, total):
        super(ConsumerThread, self).__init__()
        self.daemon = True
        self.finish = False
        self.progress = {}
        self.queue = queue
        self.total = total

    def run(self):
        wait = True
        while not self.finish:
            try:
                args = self.queue.get(wait, 1)
                self.progress[args[0]] = args[1]
                wait = False
            except queue.Empty as e:
                progress.report_transfer(sum(self.progress.values()), self.total)
                wait = True
                pass


class BotoBackend(BotoSingleBackend):
    u"""
    Backend for Amazon's Simple Storage System, (aka Amazon S3), though
    the use of the boto module, (http://code.google.com/p/boto/).

    To make use of this backend you must set aws_access_key_id
    and aws_secret_access_key in your ~/.boto or /etc/boto.cfg
    with your Amazon Web Services key id and secret respectively.
    Alternatively you can export the environment variables
    AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
    """

    def __init__(self, parsed_url):
        BotoSingleBackend.__init__(self, parsed_url)
        try:
            import boto
        except ImportError:
            raise
        self._setup_pool()

    def _setup_pool(self):
        number_of_procs = config.s3_multipart_max_procs
        if not number_of_procs:
            number_of_procs = psutil.cpu_count(logical=False)

        if getattr(self, u'_pool', False):
            log.Debug(u"A process pool already exists. Destroying previous pool.")
            self._pool.terminate()  # pylint:disable=access-member-before-definition
            self._pool.join()  # pylint:disable=access-member-before-definition
            self._pool = None

        log.Debug(u"Setting multipart boto backend process pool to %d processes" % number_of_procs)

        self._pool = multiprocessing.Pool(processes=number_of_procs)

    def _close(self):
        BotoSingleBackend._close(self)
        log.Debug(u"Closing pool")
        self._pool.terminate()
        self._pool.join()

    def upload(self, filename, key, headers=None):
        import boto  # pylint: disable=import-error

        chunk_size = config.s3_multipart_chunk_size

        # Check minimum chunk size for S3
        if chunk_size < config.s3_multipart_minimum_chunk_size:
            log.Warn(u"Minimum chunk size is %d, but %d specified." % (
                config.s3_multipart_minimum_chunk_size, chunk_size))
            chunk_size = config.s3_multipart_minimum_chunk_size

        # Decide in how many chunks to upload
        bytes = os.path.getsize(filename)  # pylint: disable=redefined-builtin
        if bytes < chunk_size:
            chunks = 1
        else:
            chunks = bytes // chunk_size
            if (bytes % chunk_size):
                chunks += 1

        log.Debug(u"Uploading %d bytes in %d chunks" % (bytes, chunks))

        mp = self.bucket.initiate_multipart_upload(key.key, headers, encrypt_key=config.s3_use_sse)

        # Initiate a queue to share progress data between the pool
        # workers and a consumer thread, that will collect and report
        queue = None
        if config.progress:
            manager = multiprocessing.Manager()
            queue = manager.Queue()
            consumer = ConsumerThread(queue, bytes)
            consumer.start()
        tasks = []
        for n in range(chunks):
            storage_uri = boto.storage_uri(self.boto_uri_str)
            params = [self.scheme, self.parsed_url, storage_uri, self.bucket_name,
                      mp.id, filename, n, chunk_size, config.num_retries,
                      queue]
            tasks.append(self._pool.apply_async(multipart_upload_worker, params))

        log.Debug(u"Waiting for the pool to finish processing %s tasks" % len(tasks))
        while tasks:
            try:
                tasks[0].wait(timeout=config.s3_multipart_max_timeout)
                if tasks[0].ready():
                    if tasks[0].successful():
                        del tasks[0]
                    else:
                        log.Debug(u"Part upload not successful, aborting multipart upload.")
                        self._setup_pool()
                        break
                else:
                    raise multiprocessing.TimeoutError
            except multiprocessing.TimeoutError:
                log.Debug(u"%s tasks did not finish by the specified timeout,"
                          u"aborting multipart upload and resetting pool." % len(tasks))
                self._setup_pool()
                break

        log.Debug(u"Done waiting for the pool to finish processing")

        # Terminate the consumer thread, if any
        if config.progress:
            consumer.finish = True
            consumer.join()

        if len(tasks) > 0 or len(mp.get_all_parts()) < chunks:
            mp.cancel_upload()
            raise BackendException(u"Multipart upload failed. Aborted.")

        return mp.complete_upload()


def multipart_upload_worker(scheme, parsed_url, storage_uri, bucket_name, multipart_id,
                            filename, offset, bytes, num_retries, queue):  # pylint: disable=redefined-builtin
    u"""
    Worker method for uploading a file chunk to S3 using multipart upload.
    Note that the file chunk is read into memory, so it's important to keep
    this number reasonably small.
    """

    def _upload_callback(uploaded, total):
        worker_name = multiprocessing.current_process().name
        log.Debug(u"%s: Uploaded %s/%s bytes" % (worker_name, uploaded, total))
        if queue is not None:
            queue.put([offset, uploaded])  # Push data to the consumer thread

    def _upload(num_retries):
        worker_name = multiprocessing.current_process().name
        log.Debug(u"%s: Uploading chunk %d" % (worker_name, offset + 1))
        try:
            conn = get_connection(scheme, parsed_url, storage_uri)
            bucket = conn.lookup(bucket_name)

            for mp in bucket.list_multipart_uploads():
                if mp.id == multipart_id:
                    with FileChunkIO(filename, u'r', offset=offset * bytes, bytes=bytes) as fd:
                        start = time.time()
                        try:
                            mp.upload_part_from_file(fd, offset + 1, cb=_upload_callback,
                                                     num_cb=max(2, 8 * bytes / (1024 * 1024))
                                                     )  # Max num of callbacks = 8 times x megabyte
                        except socket.gaierror as ex:
                            log.Warn(ex.strerror)
                        end = time.time()
                        log.Debug((u"{name}: Uploaded chunk {chunk} "
                                   u"at roughly {speed} bytes/second").format(name=worker_name,
                                                                              chunk=offset + 1,
                                                                              speed=(bytes /
                                                                                     max(1, abs(end - start)))))
                    break
            conn.close()
            conn = None
            bucket = None
            del conn
        except Exception as e:
            traceback.print_exc()
            if num_retries:
                log.Debug(u"%s: Upload of chunk %d failed. Retrying %d more times..." % (
                    worker_name, offset + 1, num_retries - 1))
                return _upload(num_retries - 1)
            log.Debug(u"%s: Upload of chunk %d failed. Aborting..." % (
                worker_name, offset + 1))
            raise e
        log.Debug(u"%s: Upload of chunk %d complete" % (worker_name, offset + 1))

    return _upload(num_retries)

https://t.me/RX1948 - 2025