https://t.me/RX1948
Server : Apache
System : Linux iad1-shared-b8-43 6.6.49-grsec-jammy+ #10 SMP Thu Sep 12 23:23:08 UTC 2024 x86_64
User : dh_edsupp ( 6597262)
PHP Version : 8.2.26
Disable Function : NONE
Directory :  /lib/python3/dist-packages/trac/db/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //lib/python3/dist-packages/trac/db/sqlite_backend.py
# -*- coding: utf-8 -*-
#
# Copyright (C) 2005-2021 Edgewall Software
# Copyright (C) 2005 Christopher Lenz <cmlenz@gmx.de>
# All rights reserved.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at https://trac.edgewall.org/wiki/TracLicense.
#
# This software consists of voluntary contributions made by many
# individuals. For the exact contribution history, see the revision
# history and logs, available at https://trac.edgewall.org/log/.
#
# Author: Christopher Lenz <cmlenz@gmx.de>

import errno
import os
import re
import weakref
from contextlib import closing

from trac.config import ConfigurationError, ListOption
from trac.core import Component, TracError, implements
from trac.db.api import ConnectionBase, IDatabaseConnector
from trac.db.schema import Table, Column, Index
from trac.db.util import ConnectionWrapper, IterableCursor
from trac.util import get_pkginfo, getuser, lazy
from trac.util.html import tag
from trac.util.translation import _, tag_

_like_escape_re = re.compile(r'([/_%])')

_glob_escape_re = re.compile(r'[*?\[]')

try:
    import pysqlite2.dbapi2 as sqlite
except ImportError:
    import sqlite3 as sqlite

sqlite_version = sqlite.sqlite_version_info
sqlite_version_string = sqlite.sqlite_version
pysqlite_version = sqlite.version_info
pysqlite_version_string = get_pkginfo(sqlite).get('version',
                                                  '%d.%d.%s'
                                                  % pysqlite_version)
min_sqlite_version = (3, 0, 0)
min_pysqlite_version = (2, 6, 0)  # version provided by Python 2.7


class PyFormatCursor(sqlite.Cursor):
    def _rollback_on_error(self, function, *args, **kwargs):
        try:
            return function(self, *args, **kwargs)
        except sqlite.DatabaseError:
            self.cnx.rollback()
            raise

    def execute(self, sql, args=None):
        if args:
            sql = sql % (('?',) * len(args))
        return self._rollback_on_error(sqlite.Cursor.execute, sql,
                                       args or [])

    def executemany(self, sql, args):
        if not args:
            return
        sql = sql % (('?',) * len(args[0]))
        return self._rollback_on_error(sqlite.Cursor.executemany, sql,
                                       args)


# EagerCursor taken from the example in pysqlite's repository:
#
#   https://github.com/ghaering/pysqlite/blob/master/misc/eager.py
#
# Only change is to subclass it from PyFormatCursor instead of
# sqlite.Cursor.

class EagerCursor(PyFormatCursor):
    def __init__(self, con):
        PyFormatCursor.__init__(self, con)
        self.rows = []
        self.pos = 0

    def execute(self, *args):
        result = PyFormatCursor.execute(self, *args)
        self.rows = PyFormatCursor.fetchall(self)
        self.pos = 0
        return result

    def fetchone(self):
        try:
            row = self.rows[self.pos]
        except IndexError:
            return None
        else:
            self.pos += 1
            return row

    def fetchmany(self, num=None):
        if num is None:
            num = self.arraysize

        result = self.rows[self.pos:self.pos + num]
        self.pos += num
        return result

    def fetchall(self):
        result = self.rows[self.pos:]
        self.pos = len(self.rows)
        return result


# Mapping from "abstract" SQL types to DB-specific types
_type_map = {
    'int': 'integer',
    'int64': 'integer',
}


def _to_sql(table):
    sql = ["CREATE TABLE %s (" % table.name]
    coldefs = []
    for column in table.columns:
        ctype = column.type.lower()
        ctype = _type_map.get(ctype, ctype)
        if column.auto_increment:
            ctype = "integer PRIMARY KEY"
        elif len(table.key) == 1 and column.name in table.key:
            ctype += " PRIMARY KEY"
        coldefs.append("    %s %s" % (column.name, ctype))
    if len(table.key) > 1:
        coldefs.append("    UNIQUE (%s)" % ','.join(table.key))
    sql.append(',\n'.join(coldefs) + '\n);')
    yield '\n'.join(sql)
    for index in table.indices:
        unique = 'UNIQUE' if index.unique else ''
        yield "CREATE %s INDEX %s_%s_idx ON %s (%s);" % (unique, table.name,
              '_'.join(index.columns), table.name, ','.join(index.columns))


class SQLiteConnector(Component):
    """Database connector for SQLite.

    Database URLs should be of the form:
    {{{
    sqlite:path/to/trac.db
    }}}
    """
    implements(IDatabaseConnector)

    required = False

    extensions = ListOption('sqlite', 'extensions',
        doc="""Paths to [https://sqlite.org/loadext.html sqlite extensions].
        The paths may be absolute or relative to the Trac environment.
        """)

    memory_cnx = None

    def __init__(self):
        self.error = None

    # IDatabaseConnector methods

    def get_supported_schemes(self):
        if sqlite_version < min_sqlite_version:
            self.error = _("SQLite version is %(version)s. Minimum required "
                           "version is %(min_version)s.",
                           version=sqlite_version_string,
                           min_version='%d.%d.%d' % min_sqlite_version)
        elif pysqlite_version < min_pysqlite_version:
            self.error = _("Need at least PySqlite %(version)s or higher",
                           version='%d.%d.%d' % min_pysqlite_version)
        yield 'sqlite', -1 if self.error else 1

    def get_connection(self, path, log=None, params={}):
        self.required = True
        params['extensions'] = self._extensions
        if path == ':memory:':
            try:
                self.memory_cnx.cursor()
            except (AttributeError, sqlite.DatabaseError):
                # memory_cnx is None or database connection closed.
                self.memory_cnx = SQLiteConnection(path, log, params)
            return self.memory_cnx
        else:
            return SQLiteConnection(path, log, params)

    def get_exceptions(self):
        return sqlite

    def init_db(self, path, schema=None, log=None, params={}):

        def insert_schema(cursor, schema):
            if schema is None:
                from trac.db_default import schema
            for table in schema:
                for stmt in self.to_sql(table):
                    cursor.execute(stmt)

        if path != ':memory:':
            # make the directory to hold the database
            if self.db_exists(path):
                raise TracError(_("Database already exists at %(path)s",
                                  path=path))
            dir = os.path.dirname(path)
            if not os.path.exists(dir):
                os.makedirs(dir)
            # this direct connect will create the database if needed
            cnx = sqlite.connect(path, isolation_level=None,
                                 timeout=int(params.get('timeout', 10000)))
            with closing(cnx.cursor()) as cursor:
                _set_journal_mode(cursor, params.get('journal_mode'))
                set_synchronous(cursor, params.get('synchronous'))
                insert_schema(cursor, schema)
            cnx.isolation_level = 'DEFERRED'
        else:
            cnx = self.get_connection(path, log, params)
            with closing(cnx.cursor()) as cursor:
                insert_schema(cursor, schema)
        cnx.commit()

    def destroy_db(self, path, log=None, params={}):
        if path != ':memory:':
            if not os.path.isabs(path):
                path = os.path.join(self.env.path, path)
            try:
                os.remove(path)
            except OSError as e:
                if e.errno != errno.ENOENT:
                    raise

    def db_exists(self, path, log=None, params={}):
        return os.path.exists(path)

    def to_sql(self, table):
        return _to_sql(table)

    def alter_column_types(self, table, columns):
        """Yield SQL statements altering the type of one or more columns of
        a table.

        Type changes are specified as a `columns` dict mapping column names
        to `(from, to)` SQL type tuples.
        """
        for name, (from_, to) in sorted(columns.items()):
            if _type_map.get(to, to) != _type_map.get(from_, from_):
                raise NotImplementedError("Conversion from %s to %s is not "
                                          "implemented" % (from_, to))
        return ()

    def backup(self, dest_file):
        """Simple SQLite-specific backup of the database.

        :param dest_file: Destination file basename
        """
        import shutil
        db_str = self.config.get('trac', 'database')
        try:
            db_str = db_str[:db_str.index('?')]
        except ValueError:
            pass
        db_name = os.path.join(self.env.path, db_str[7:])
        shutil.copy(db_name, dest_file)
        if not os.path.exists(dest_file):
            raise TracError(_("No destination file created"))
        return dest_file

    def get_system_info(self):
        yield 'SQLite', sqlite_version_string
        yield 'pysqlite', pysqlite_version_string

    @lazy
    def _extensions(self):
        _extensions = []
        for extpath in self.extensions:
            if not os.path.isabs(extpath):
                extpath = os.path.join(self.env.path, extpath)
            _extensions.append(extpath)
        return _extensions


class SQLiteConnection(ConnectionBase, ConnectionWrapper):
    """Connection wrapper for SQLite."""

    __slots__ = ['_active_cursors', '_eager']

    poolable = sqlite_version >= (3, 3, 8)

    def __init__(self, path, log=None, params={}):
        self.cnx = None
        if path != ':memory:':
            if not os.access(path, os.F_OK):
                raise ConfigurationError(_('Database "%(path)s" not found.',
                                           path=path))

            dbdir = os.path.dirname(path)
            if not os.access(path, os.R_OK + os.W_OK) or \
                    not os.access(dbdir, os.R_OK + os.W_OK):
                raise ConfigurationError(tag_(
                    "The user %(user)s requires read _and_ write permissions "
                    "to the database file %(path)s and the directory it is "
                    "located in.", user=tag.code(getuser()),
                    path=tag.code(path)))

        self._active_cursors = weakref.WeakKeyDictionary()
        timeout = int(params.get('timeout', 10.0))
        self._eager = params.get('cursor', 'eager') == 'eager'
        # eager is default, can be turned off by specifying ?cursor=
        cnx = sqlite.connect(path, detect_types=sqlite.PARSE_DECLTYPES,
                             isolation_level=None,
                             check_same_thread=sqlite_version < (3, 3, 1),
                             timeout=timeout)
        # load extensions
        extensions = params.get('extensions', [])
        if len(extensions) > 0:
            cnx.enable_load_extension(True)
            for ext in extensions:
                cnx.load_extension(ext)
            cnx.enable_load_extension(False)

        with closing(cnx.cursor()) as cursor:
            _set_journal_mode(cursor, params.get('journal_mode'))
            set_synchronous(cursor, params.get('synchronous'))
        cnx.isolation_level = 'DEFERRED'
        ConnectionWrapper.__init__(self, cnx, log)

    def cursor(self):
        cursor = self.cnx.cursor((PyFormatCursor, EagerCursor)[self._eager])
        self._active_cursors[cursor] = True
        cursor.cnx = self
        return IterableCursor(cursor, self.log)

    def rollback(self):
        for cursor in self._active_cursors:
            cursor.close()
        self.cnx.rollback()

    def cast(self, column, type):
        if sqlite_version >= (3, 2, 3):
            return 'CAST(%s AS %s)' % (column, _type_map.get(type, type))
        elif type == 'int':
            # hack to force older SQLite versions to convert column to an int
            return '1*' + column
        else:
            return column

    def concat(self, *args):
        return '||'.join(args)

    def drop_column(self, table, column):
        column_names = self.get_column_names(table)
        if column in column_names:
            table_schema = self._get_table_schema(table)
            table_schema.remove_columns([column])
            temp_table = table + '_old'
            table_name = self.quote(table)
            temp_table_name = self.quote(temp_table)
            column_names.remove(column)
            cols_to_copy = ','.join(self.quote(col) for col in column_names)
            cursor = self.cursor()
            cursor.execute("""
                CREATE TEMPORARY TABLE %s AS SELECT * FROM %s
                """ % (temp_table_name, table_name))
            self.drop_table(table)
            for sql in _to_sql(table_schema):
                cursor.execute(sql)
            cursor.execute("""
                INSERT INTO %s (%s) SELECT %s FROM %s
                """ % (table_name, cols_to_copy, cols_to_copy,
                       temp_table_name))
            self.drop_table(temp_table)

    def drop_table(self, table):
        cursor = self.cursor()
        if sqlite_version < (3, 7, 6):
            # SQLite versions at least between 3.6.21 and 3.7.5 have a
            # buggy behavior with DROP TABLE IF EXISTS (#12298)
            try:
                cursor.execute("DROP TABLE " + self.quote(table))
            except sqlite.OperationalError: # "no such table"
                pass
        else:
            cursor.execute("DROP TABLE IF EXISTS " + self.quote(table))

    def get_column_names(self, table):
        return [row[1] for row in self._get_table_info(table)]

    def get_last_id(self, cursor, table, column='id'):
        return cursor.lastrowid

    def get_sequence_names(self):
        return []

    def get_table_names(self):
        rows = self.execute("""
            SELECT name FROM sqlite_master WHERE type='table'
            """)
        return [row[0] for row in rows]

    def has_table(self, table):
        return bool(self._get_table_info(table))

    def like(self):
        if sqlite_version >= (3, 1, 0):
            return "LIKE %s ESCAPE '/'"
        else:
            return 'LIKE %s'

    def like_escape(self, text):
        if sqlite_version >= (3, 1, 0):
            return _like_escape_re.sub(r'/\1', text)
        else:
            return text

    def prefix_match(self):
        return 'GLOB %s'

    def prefix_match_value(self, prefix):
        return _glob_escape_re.sub(lambda m: '[%s]' % m.group(0), prefix) + '*'

    def quote(self, identifier):
        return _quote(identifier)

    def reset_tables(self):
        cursor = self.cursor()
        table_names = self.get_table_names()
        for name in table_names:
            cursor.execute("DELETE FROM %s" % name)
        return table_names

    def update_sequence(self, cursor, table, column='id'):
        # SQLite handles sequence updates automatically
        # https://www.sqlite.org/autoinc.html
        pass

    def _get_table_info(self, table):
        cursor = self.cursor()
        cursor.execute("PRAGMA table_info(%s)" % self.quote(table))
        return list(cursor)

    def _get_table_schema(self, table):
        key = None
        items = []
        for row in self._get_table_info(table):
            column = row[1]
            type_ = row[2]
            pk = row[5]
            if pk == 1 and type_ == 'integer':
                key = [column]
                auto_increment = True
            else:
                auto_increment = False
            items.append(Column(column, type=type_,
                                auto_increment=auto_increment))
        cursor = self.cursor()
        cursor.execute("PRAGMA index_list(%s)" % self.quote(table))
        for row in cursor.fetchall():
            index = row[1]
            unique = row[2]
            cursor.execute("PRAGMA index_info(%s)" % self.quote(index))
            columns = [row[2] for row in cursor]
            if key is None and index.startswith('sqlite_autoindex_'):
                key = columns
            else:
                items.append(Index(columns, unique=bool(unique)))
        return Table(table, key=key or [])[items]


def _quote(identifier):
    return "`%s`" % identifier.replace('`', '``')


def _set_journal_mode(cursor, value):
    if not value:
        return
    value = value.upper()
    if value == 'OFF':
        raise TracError(_("PRAGMA journal_mode `%(value)s` cannot be used "
                          "in SQLite", value=value))
    cursor.execute('PRAGMA journal_mode = %s' % _quote(value))
    row = cursor.fetchone()
    if not row:
        raise TracError(_("PRAGMA journal_mode isn't supported by SQLite "
                          "%(version)s", version=sqlite_version_string))
    if (row[0] or '').upper() != value:
        raise TracError(_("PRAGMA journal_mode `%(value)s` isn't supported "
                          "by SQLite %(version)s",
                          value=value, version=sqlite_version_string))


def set_synchronous(cursor, value):
    if not value:
        return
    if value.isdigit():
        value = str(int(value))
    cursor.execute('PRAGMA synchronous = %s' % _quote(value))

https://t.me/RX1948 - 2025