https://t.me/RX1948
Server : Apache
System : Linux iad1-shared-b8-43 6.6.49-grsec-jammy+ #10 SMP Thu Sep 12 23:23:08 UTC 2024 x86_64
User : dh_edsupp ( 6597262)
PHP Version : 8.2.26
Disable Function : NONE
Directory :  /opt/prometheus-monitoring-scripts/lib/python3.10/site-packages/storable/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //opt/prometheus-monitoring-scripts/lib/python3.10/site-packages/storable/core.py
#
# License
#
# python storable is distributed under the zlib/libpng license, which is OSS
# (Open Source Software) compliant.
#
# Copyright (C) 2009 Tim Aerts
#
# This software is provided 'as-is', without any express or implied
# warranty.  In no event will the authors be held liable for any damages
# arising from the use of this software.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
# 1. The origin of this software must not be misrepresented; you must not
#    claim that you wrote the original software. If you use this software
#    in a product, an acknowledgment in the product documentation would be
#    appreciated but is not required.
# 2. Altered source versions must be plainly marked as such, and must not be
#    misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.
#
# Tim Aerts <aardbeiplantje@gmail.com>
#

from functools import wraps
from io import BytesIO
from struct import calcsize, unpack
import logging
import sys


if sys.version_info > (3, 0):
    xrange = range


def id_():
    n = 0
    while True:
        n += 1
        yield n


ID_GENERATOR = id_()
LOG = logging.getLogger(__name__)
DEBUG = False


def _guess_type(data):
    """
    In Perl, the "scalar" type maps to different Python types. Strictly
    speaking, the only *correct* output would be bytes objects. But this causes
    a discrepancy when using "frozen" storables and non-frozen storables (unless
    the generated test-data is wrong). For now, we will use the conversion
    functions below to "guess" the type.
    """
    try:
        converted_result = float(data)
        if converted_result.is_integer():
            # use "data" again to avoid rounding errors
            converted_result = int(data)
    except ValueError:
        converted_result = None

    if converted_result is None:
        try:
            converted_result = data.decode('ascii')
        except UnicodeDecodeError:
            converted_result = None

    return data if converted_result is None else converted_result


def maybelogged(f):
    """
    If the DEBUG flag is set in this module (must be set before importing),
    deserialisation functions will be logged.
    """

    if not DEBUG:
        return f

    @wraps(f)
    def fun(*args, **kwargs):
        id_ = next(ID_GENERATOR)
        LOG.debug('[%s] Entering %s with args=%r, kwargs=%r',
                  id_, f.__name__, args, kwargs)
        output = f(*args, **kwargs)
        LOG.debug('[%s] Result: %r', id_, output)
        return output
    return fun


@maybelogged
def _read_size(fh, cache):
    fmt = cache['size_unpack_fmt']
    return unpack(fmt, fh.read(calcsize(fmt)))[0]


@maybelogged
def SX_OBJECT(fh, cache):
    # From Storable.xs store function:
    # * The tag is always written in network order.
    i = SX_NETINT(fh, cache)
    cache['has_sx_object'] = True
    return (0, i)


@maybelogged
def SX_LSCALAR(fh, cache):
    raw_result = fh.read(_read_size(fh, cache))
    return _guess_type(raw_result)


@maybelogged
def SX_LUTF8STR(fh, cache):
    return fh.read(_read_size(fh, cache)).decode('utf-8')


@maybelogged
def SX_ARRAY(fh, cache):
    return [process_item(fh, cache) for _ in xrange(_read_size(fh, cache))]


@maybelogged
def SX_HASH(fh, cache):
    data = {}
    sz = _read_size(fh, cache)
    for _ in xrange(sz):
        value = process_item(fh, cache)
        key = _guess_type(fh.read(_read_size(fh, cache)))
        data[key] = value
    return data


@maybelogged
def SX_REF(fh, cache):
    return process_item(fh, cache)


@maybelogged
def SX_UNDEF(fh, cache):
    return None


@maybelogged
def SX_INTEGER(fh, cache):
    fmt = cache['int_unpack_fmt']
    return unpack(fmt, fh.read(calcsize(fmt)))[0]


@maybelogged
def SX_DOUBLE(fh, cache):
    fmt = cache['double_unpack_fmt']
    return unpack(fmt, fh.read(calcsize(fmt)))[0]


@maybelogged
def SX_BYTE(fh, cache):
    return _read_unsigned_byte(fh) - 128


@maybelogged
def SX_NETINT(fh, cache):
    fmt = '!I'
    return unpack(fmt, fh.read(calcsize(fmt)))[0]


@maybelogged
def SX_SCALAR(fh, cache):
    size = _read_unsigned_byte(fh)
    raw_result = fh.read(size)
    return _guess_type(raw_result)


@maybelogged
def SX_UTF8STR(fh, cache):
    return fh.read(_read_unsigned_byte(fh)).decode('utf-8')


@maybelogged
def SX_TIED_ARRAY(fh, cache):
    return process_item(fh, cache)


@maybelogged
def SX_TIED_HASH(fh, cache):
    return SX_TIED_ARRAY(fh, cache)


@maybelogged
def SX_TIED_SCALAR(fh, cache):
    return SX_TIED_ARRAY(fh, cache)


@maybelogged
def SX_SV_UNDEF(fh, cache):
    return None


@maybelogged
def SX_SV_YES(fh, cache):
    return True


@maybelogged
def SX_SV_NO(fh, cache):
    return False


@maybelogged
def SX_BLESS(fh, cache):
    size = _read_unsigned_byte(fh)
    package_name = fh.read(size)
    cache['classes'].append(package_name)
    return process_item(fh, cache)


@maybelogged
def SX_IX_BLESS(fh, cache):
    indx = _read_unsigned_byte(fh)
    package_name = cache['classes'][indx]
    return process_item(fh, cache)


@maybelogged
def SX_OVERLOAD(fh, cache):
    return process_item(fh, cache)


@maybelogged
def SX_TIED_KEY(fh, cache):
    data = process_item(fh, cache)
    key = process_item(fh, cache)
    return data


@maybelogged
def SX_TIED_IDX(fh, cache):
    data = process_item(fh, cache)
    # idx's are always big-endian dumped by storable's freeze/nfreeze I think
    indx_in_array = SX_NETINT(fh, cache)
    return data


@maybelogged
def SX_HOOK(fh, cache):
    flags = _read_unsigned_byte(fh)

    while flags & 0x40:   # SHF_NEED_RECURSE
        dummy = process_item(fh, cache)
        flags = _read_unsigned_byte(fh)

    if flags & 0x20:   # SHF_IDX_CLASSNAME
        if flags & 0x04:   # SHF_LARGE_CLASSLEN
            # TODO: test
            fmt = '>I'
            indx = unpack(fmt, fh.read(calcsize(fmt)))[0]
        else:
            indx = _read_unsigned_byte(fh)
        package_name = cache['classes'][indx]
    else:
        if flags & 0x04:   # SHF_LARGE_CLASSLEN
            # TODO: test
            # FIXME: is this actually possible?
            class_size = _read_size(fh, cache)
        else:
            class_size = _read_unsigned_byte(fh)

        package_name = fh.read(class_size)
        cache['classes'].append(package_name)

    arguments = {}

    if flags & 0x08:   # SHF_LARGE_STRLEN
        str_size = _read_size(fh, cache)
    else:
        str_size = _read_unsigned_byte(fh)

    if str_size:
        frozen_str = _guess_type(fh.read(str_size))
        arguments[0] = frozen_str

    if flags & 0x80:   # SHF_HAS_LIST
        if flags & 0x10:   # SHF_LARGE_LISTLEN
            list_size = _read_size(fh, cache)
        else:
            list_size = _read_unsigned_byte(fh)

        for i in xrange(list_size):
            fmt = '>I'
            indx_in_array = unpack(fmt, fh.read(calcsize(fmt)))[0]
            arguments[i + 1] = cache['objects'].get(indx_in_array)

    # FIXME: implement the real callback STORABLE_thaw() still, for now, just
    # return the dictionary 'arguments' as data
    type = flags & 0x03  # SHF_TYPE_MASK 0x03
    data = arguments
    if type == 3:  # SHT_EXTRA
        # TODO
        pass
    if type == 0:  # SHT_SCALAR
        # TODO
        pass
    if type == 1:  # SHT_ARRAY
        # TODO
        pass
    if type == 2:  # SHT_HASH
        # TODO
        pass

    return data


@maybelogged
def SX_FLAG_HASH(fh, cache):
    # TODO: NOT YET IMPLEMENTED!!!!!!
    flags = _read_unsigned_byte(fh)
    size = _read_size(fh, cache)
    data = {}
    for i in xrange(size):
        value = process_item(fh, cache)
        flags = _read_unsigned_byte(fh)
        keysize = _read_size(fh, cache)
        key = None
        if keysize:
            key = fh.read(keysize)
        data[key] = value

    return data


def SX_VSTRING(fh, cache):
    value = SX_SCALAR(fh, cache)
    return tuple(x for x in value[1:].split('.'))


def SX_LVSTRING(fh, cache):
    value = SX_LSCALAR(fh, cache)
    return tuple(x for x in value[1:].split('.'))


# *AFTER* all the subroutines
engine = {
    b'\x00': SX_OBJECT,      # ( 0): Already stored object
    b'\x01': SX_LSCALAR,     # ( 1): Scalar (large binary) follows (length, data)
    b'\x02': SX_ARRAY,       # ( 2): Array forthcoming (size, item list)
    b'\x03': SX_HASH,        # ( 3): Hash forthcoming (size, key/value pair list)
    b'\x04': SX_REF,         # ( 4): Reference to object forthcoming
    b'\x05': SX_UNDEF,       # ( 5): Undefined scalar
    b'\x06': SX_INTEGER,     # ( 6): Integer forthcoming
    b'\x07': SX_DOUBLE,      # ( 7): Double forthcoming
    b'\x08': SX_BYTE,        # ( 8): (signed) byte forthcoming
    b'\x09': SX_NETINT,      # ( 9): Integer in network order forthcoming
    b'\x0a': SX_SCALAR,      # (10): Scalar (binary, small) follows (length, data)
    b'\x0b': SX_TIED_ARRAY,  # (11): Tied array forthcoming
    b'\x0c': SX_TIED_HASH,   # (12): Tied hash forthcoming
    b'\x0d': SX_TIED_SCALAR, # (13): Tied scalar forthcoming
    b'\x0e': SX_SV_UNDEF,    # (14): Perl's immortal PL_sv_undef
    b'\x0f': SX_SV_YES,      # (15): Perl's immortal PL_sv_yes
    b'\x10': SX_SV_NO,       # (16): Perl's immortal PL_sv_no
    b'\x11': SX_BLESS,       # (17): Object is blessed
    b'\x12': SX_IX_BLESS,    # (18): Object is blessed, classname given by index
    b'\x13': SX_HOOK,        # (19): Stored via hook, user-defined
    b'\x14': SX_OVERLOAD,    # (20): Overloaded reference
    b'\x15': SX_TIED_KEY,    # (21): Tied magic key forthcoming
    b'\x16': SX_TIED_IDX,    # (22): Tied magic index forthcoming
    b'\x17': SX_UTF8STR,     # (23): UTF-8 string forthcoming (small)
    b'\x18': SX_LUTF8STR,    # (24): UTF-8 string forthcoming (large)
    b'\x19': SX_FLAG_HASH,   # (25): Hash with flags forthcoming (size, flags, key/flags/value triplet list)
    b'\x1d': SX_VSTRING,     # (29): vstring forthcoming (small)
    b'\x1e': SX_LVSTRING,    # (30): vstring forthcoming (large)
}


exclude_for_cache = {
    b'\x00',
    b'\x0b',
    b'\x0c',
    b'\x0d',
    b'\x11',
    b'\x12',
}


@maybelogged
def handle_sx_object_refs(cache, data):
    iterateelements = None
    if type(data) is list:
        iterateelements = enumerate(data)
    elif type(data) is dict:
        iterateelements = iter(data.items())
    else:
        return

    for k, item in iterateelements:
        if type(item) is list or type(item) is dict:
            handle_sx_object_refs(cache, item)
        elif type(item) is tuple:
            data[k] = cache['objects'][item[1]]
    return data


@maybelogged
def process_item(fh, cache):
    magic_type = fh.read(1)
    if magic_type in exclude_for_cache:
        data = engine[magic_type](fh, cache)
    else:
        i = cache['objectnr']
        cache['objectnr'] += 1
        data = engine[magic_type](fh, cache)
        cache['objects'][i] = data
    return data


@maybelogged
def thaw(frozen_data):
    fh = BytesIO(frozen_data)
    data = deserialize(fh)
    fh.close()
    return data


@maybelogged
def retrieve(filepath):
    data = None
    with open(filepath, 'rb') as fh:
        file_magic = fh.read(4)
        if file_magic == b'pst0':
            data = deserialize(fh)
    return data


def _read_unsigned_byte(fh):
    return unpack('B', fh.read(1))[0]


def skip_magic_header_if_present(fh):
    curr_pos = fh.tell()
    file_magic = fh.read(4)
    if file_magic != b'pst0':
        fh.seek(curr_pos)


@maybelogged
def deserialize(fh):
    skip_magic_header_if_present(fh)
    magic_byte = _read_unsigned_byte(fh)

    is_network_byte_order = (magic_byte & 1) == 1
    major_version_number = magic_byte >> 1
    minor_version_number = _read_unsigned_byte(fh)

    nvsize = 8  # Size of double in bytes
    integer_formats = {
        2: 'H',
        4: 'I',
        8: 'Q',
    }
    double_formats = {
        4: 'f',
        8: 'd',
    }
    if is_network_byte_order:
        byteorder = '!'
        # TODO: unsure what these values should be when reading a net-order
        # file
        intsize = 4
        longsize = 8
        ptrsize = 4
    else:
        size = _read_unsigned_byte(fh)
        archsize = fh.read(size)

        # 32-bit ppc:     4321
        # 32-bit x86:     1234
        # 64-bit x86_64:  12345678
        # 64-bit ppc:     87654321

        if archsize == b'1234' or archsize == b'12345678':
            byteorder = '<'
        else:
            byteorder = '>'

        x = fh.read(3)
        intsize, longsize, ptrsize = unpack('3B', x)
        if (major_version_number, minor_version_number) >= (2, 2):
            nvsize = _read_unsigned_byte(fh)
            if nvsize > 8:
                raise ValueError('Cannot handle 16 byte doubles')

    cache = {
        'objects': {},
        'objectnr': 0,
        'classes': [],
        'has_sx_object': False,
        'size_unpack_fmt': byteorder + integer_formats[intsize],
        'int_unpack_fmt': byteorder + integer_formats[longsize],
        'double_unpack_fmt': byteorder + double_formats[nvsize],
    }
    data = process_item(fh, cache)

    if cache['has_sx_object']:
        handle_sx_object_refs(cache, data)

    return data

https://t.me/RX1948 - 2025