Server : Apache System : Linux iad1-shared-b8-43 6.6.49-grsec-jammy+ #10 SMP Thu Sep 12 23:23:08 UTC 2024 x86_64 User : dh_edsupp ( 6597262) PHP Version : 8.2.26 Disable Function : NONE Directory : /lib/python3/dist-packages/trac/db/ |
Upload File : |
# -*- coding: utf-8 -*- # # Copyright (C) 2005-2021 Edgewall Software # Copyright (C) 2005 Christopher Lenz <cmlenz@gmx.de> # All rights reserved. # # This software is licensed as described in the file COPYING, which # you should have received as part of this distribution. The terms # are also available at https://trac.edgewall.org/wiki/TracLicense. # # This software consists of voluntary contributions made by many # individuals. For the exact contribution history, see the revision # history and logs, available at https://trac.edgewall.org/log/. # # Author: Christopher Lenz <cmlenz@gmx.de> import errno import os import re import weakref from contextlib import closing from trac.config import ConfigurationError, ListOption from trac.core import Component, TracError, implements from trac.db.api import ConnectionBase, IDatabaseConnector from trac.db.schema import Table, Column, Index from trac.db.util import ConnectionWrapper, IterableCursor from trac.util import get_pkginfo, getuser, lazy from trac.util.html import tag from trac.util.translation import _, tag_ _like_escape_re = re.compile(r'([/_%])') _glob_escape_re = re.compile(r'[*?\[]') try: import pysqlite2.dbapi2 as sqlite except ImportError: import sqlite3 as sqlite sqlite_version = sqlite.sqlite_version_info sqlite_version_string = sqlite.sqlite_version pysqlite_version = sqlite.version_info pysqlite_version_string = get_pkginfo(sqlite).get('version', '%d.%d.%s' % pysqlite_version) min_sqlite_version = (3, 0, 0) min_pysqlite_version = (2, 6, 0) # version provided by Python 2.7 class PyFormatCursor(sqlite.Cursor): def _rollback_on_error(self, function, *args, **kwargs): try: return function(self, *args, **kwargs) except sqlite.DatabaseError: self.cnx.rollback() raise def execute(self, sql, args=None): if args: sql = sql % (('?',) * len(args)) return self._rollback_on_error(sqlite.Cursor.execute, sql, args or []) def executemany(self, sql, args): if not args: return sql = sql % (('?',) * len(args[0])) return self._rollback_on_error(sqlite.Cursor.executemany, sql, args) # EagerCursor taken from the example in pysqlite's repository: # # https://github.com/ghaering/pysqlite/blob/master/misc/eager.py # # Only change is to subclass it from PyFormatCursor instead of # sqlite.Cursor. class EagerCursor(PyFormatCursor): def __init__(self, con): PyFormatCursor.__init__(self, con) self.rows = [] self.pos = 0 def execute(self, *args): result = PyFormatCursor.execute(self, *args) self.rows = PyFormatCursor.fetchall(self) self.pos = 0 return result def fetchone(self): try: row = self.rows[self.pos] except IndexError: return None else: self.pos += 1 return row def fetchmany(self, num=None): if num is None: num = self.arraysize result = self.rows[self.pos:self.pos + num] self.pos += num return result def fetchall(self): result = self.rows[self.pos:] self.pos = len(self.rows) return result # Mapping from "abstract" SQL types to DB-specific types _type_map = { 'int': 'integer', 'int64': 'integer', } def _to_sql(table): sql = ["CREATE TABLE %s (" % table.name] coldefs = [] for column in table.columns: ctype = column.type.lower() ctype = _type_map.get(ctype, ctype) if column.auto_increment: ctype = "integer PRIMARY KEY" elif len(table.key) == 1 and column.name in table.key: ctype += " PRIMARY KEY" coldefs.append(" %s %s" % (column.name, ctype)) if len(table.key) > 1: coldefs.append(" UNIQUE (%s)" % ','.join(table.key)) sql.append(',\n'.join(coldefs) + '\n);') yield '\n'.join(sql) for index in table.indices: unique = 'UNIQUE' if index.unique else '' yield "CREATE %s INDEX %s_%s_idx ON %s (%s);" % (unique, table.name, '_'.join(index.columns), table.name, ','.join(index.columns)) class SQLiteConnector(Component): """Database connector for SQLite. Database URLs should be of the form: {{{ sqlite:path/to/trac.db }}} """ implements(IDatabaseConnector) required = False extensions = ListOption('sqlite', 'extensions', doc="""Paths to [https://sqlite.org/loadext.html sqlite extensions]. The paths may be absolute or relative to the Trac environment. """) memory_cnx = None def __init__(self): self.error = None # IDatabaseConnector methods def get_supported_schemes(self): if sqlite_version < min_sqlite_version: self.error = _("SQLite version is %(version)s. Minimum required " "version is %(min_version)s.", version=sqlite_version_string, min_version='%d.%d.%d' % min_sqlite_version) elif pysqlite_version < min_pysqlite_version: self.error = _("Need at least PySqlite %(version)s or higher", version='%d.%d.%d' % min_pysqlite_version) yield 'sqlite', -1 if self.error else 1 def get_connection(self, path, log=None, params={}): self.required = True params['extensions'] = self._extensions if path == ':memory:': try: self.memory_cnx.cursor() except (AttributeError, sqlite.DatabaseError): # memory_cnx is None or database connection closed. self.memory_cnx = SQLiteConnection(path, log, params) return self.memory_cnx else: return SQLiteConnection(path, log, params) def get_exceptions(self): return sqlite def init_db(self, path, schema=None, log=None, params={}): def insert_schema(cursor, schema): if schema is None: from trac.db_default import schema for table in schema: for stmt in self.to_sql(table): cursor.execute(stmt) if path != ':memory:': # make the directory to hold the database if self.db_exists(path): raise TracError(_("Database already exists at %(path)s", path=path)) dir = os.path.dirname(path) if not os.path.exists(dir): os.makedirs(dir) # this direct connect will create the database if needed cnx = sqlite.connect(path, isolation_level=None, timeout=int(params.get('timeout', 10000))) with closing(cnx.cursor()) as cursor: _set_journal_mode(cursor, params.get('journal_mode')) set_synchronous(cursor, params.get('synchronous')) insert_schema(cursor, schema) cnx.isolation_level = 'DEFERRED' else: cnx = self.get_connection(path, log, params) with closing(cnx.cursor()) as cursor: insert_schema(cursor, schema) cnx.commit() def destroy_db(self, path, log=None, params={}): if path != ':memory:': if not os.path.isabs(path): path = os.path.join(self.env.path, path) try: os.remove(path) except OSError as e: if e.errno != errno.ENOENT: raise def db_exists(self, path, log=None, params={}): return os.path.exists(path) def to_sql(self, table): return _to_sql(table) def alter_column_types(self, table, columns): """Yield SQL statements altering the type of one or more columns of a table. Type changes are specified as a `columns` dict mapping column names to `(from, to)` SQL type tuples. """ for name, (from_, to) in sorted(columns.items()): if _type_map.get(to, to) != _type_map.get(from_, from_): raise NotImplementedError("Conversion from %s to %s is not " "implemented" % (from_, to)) return () def backup(self, dest_file): """Simple SQLite-specific backup of the database. :param dest_file: Destination file basename """ import shutil db_str = self.config.get('trac', 'database') try: db_str = db_str[:db_str.index('?')] except ValueError: pass db_name = os.path.join(self.env.path, db_str[7:]) shutil.copy(db_name, dest_file) if not os.path.exists(dest_file): raise TracError(_("No destination file created")) return dest_file def get_system_info(self): yield 'SQLite', sqlite_version_string yield 'pysqlite', pysqlite_version_string @lazy def _extensions(self): _extensions = [] for extpath in self.extensions: if not os.path.isabs(extpath): extpath = os.path.join(self.env.path, extpath) _extensions.append(extpath) return _extensions class SQLiteConnection(ConnectionBase, ConnectionWrapper): """Connection wrapper for SQLite.""" __slots__ = ['_active_cursors', '_eager'] poolable = sqlite_version >= (3, 3, 8) def __init__(self, path, log=None, params={}): self.cnx = None if path != ':memory:': if not os.access(path, os.F_OK): raise ConfigurationError(_('Database "%(path)s" not found.', path=path)) dbdir = os.path.dirname(path) if not os.access(path, os.R_OK + os.W_OK) or \ not os.access(dbdir, os.R_OK + os.W_OK): raise ConfigurationError(tag_( "The user %(user)s requires read _and_ write permissions " "to the database file %(path)s and the directory it is " "located in.", user=tag.code(getuser()), path=tag.code(path))) self._active_cursors = weakref.WeakKeyDictionary() timeout = int(params.get('timeout', 10.0)) self._eager = params.get('cursor', 'eager') == 'eager' # eager is default, can be turned off by specifying ?cursor= cnx = sqlite.connect(path, detect_types=sqlite.PARSE_DECLTYPES, isolation_level=None, check_same_thread=sqlite_version < (3, 3, 1), timeout=timeout) # load extensions extensions = params.get('extensions', []) if len(extensions) > 0: cnx.enable_load_extension(True) for ext in extensions: cnx.load_extension(ext) cnx.enable_load_extension(False) with closing(cnx.cursor()) as cursor: _set_journal_mode(cursor, params.get('journal_mode')) set_synchronous(cursor, params.get('synchronous')) cnx.isolation_level = 'DEFERRED' ConnectionWrapper.__init__(self, cnx, log) def cursor(self): cursor = self.cnx.cursor((PyFormatCursor, EagerCursor)[self._eager]) self._active_cursors[cursor] = True cursor.cnx = self return IterableCursor(cursor, self.log) def rollback(self): for cursor in self._active_cursors: cursor.close() self.cnx.rollback() def cast(self, column, type): if sqlite_version >= (3, 2, 3): return 'CAST(%s AS %s)' % (column, _type_map.get(type, type)) elif type == 'int': # hack to force older SQLite versions to convert column to an int return '1*' + column else: return column def concat(self, *args): return '||'.join(args) def drop_column(self, table, column): column_names = self.get_column_names(table) if column in column_names: table_schema = self._get_table_schema(table) table_schema.remove_columns([column]) temp_table = table + '_old' table_name = self.quote(table) temp_table_name = self.quote(temp_table) column_names.remove(column) cols_to_copy = ','.join(self.quote(col) for col in column_names) cursor = self.cursor() cursor.execute(""" CREATE TEMPORARY TABLE %s AS SELECT * FROM %s """ % (temp_table_name, table_name)) self.drop_table(table) for sql in _to_sql(table_schema): cursor.execute(sql) cursor.execute(""" INSERT INTO %s (%s) SELECT %s FROM %s """ % (table_name, cols_to_copy, cols_to_copy, temp_table_name)) self.drop_table(temp_table) def drop_table(self, table): cursor = self.cursor() if sqlite_version < (3, 7, 6): # SQLite versions at least between 3.6.21 and 3.7.5 have a # buggy behavior with DROP TABLE IF EXISTS (#12298) try: cursor.execute("DROP TABLE " + self.quote(table)) except sqlite.OperationalError: # "no such table" pass else: cursor.execute("DROP TABLE IF EXISTS " + self.quote(table)) def get_column_names(self, table): return [row[1] for row in self._get_table_info(table)] def get_last_id(self, cursor, table, column='id'): return cursor.lastrowid def get_sequence_names(self): return [] def get_table_names(self): rows = self.execute(""" SELECT name FROM sqlite_master WHERE type='table' """) return [row[0] for row in rows] def has_table(self, table): return bool(self._get_table_info(table)) def like(self): if sqlite_version >= (3, 1, 0): return "LIKE %s ESCAPE '/'" else: return 'LIKE %s' def like_escape(self, text): if sqlite_version >= (3, 1, 0): return _like_escape_re.sub(r'/\1', text) else: return text def prefix_match(self): return 'GLOB %s' def prefix_match_value(self, prefix): return _glob_escape_re.sub(lambda m: '[%s]' % m.group(0), prefix) + '*' def quote(self, identifier): return _quote(identifier) def reset_tables(self): cursor = self.cursor() table_names = self.get_table_names() for name in table_names: cursor.execute("DELETE FROM %s" % name) return table_names def update_sequence(self, cursor, table, column='id'): # SQLite handles sequence updates automatically # https://www.sqlite.org/autoinc.html pass def _get_table_info(self, table): cursor = self.cursor() cursor.execute("PRAGMA table_info(%s)" % self.quote(table)) return list(cursor) def _get_table_schema(self, table): key = None items = [] for row in self._get_table_info(table): column = row[1] type_ = row[2] pk = row[5] if pk == 1 and type_ == 'integer': key = [column] auto_increment = True else: auto_increment = False items.append(Column(column, type=type_, auto_increment=auto_increment)) cursor = self.cursor() cursor.execute("PRAGMA index_list(%s)" % self.quote(table)) for row in cursor.fetchall(): index = row[1] unique = row[2] cursor.execute("PRAGMA index_info(%s)" % self.quote(index)) columns = [row[2] for row in cursor] if key is None and index.startswith('sqlite_autoindex_'): key = columns else: items.append(Index(columns, unique=bool(unique))) return Table(table, key=key or [])[items] def _quote(identifier): return "`%s`" % identifier.replace('`', '``') def _set_journal_mode(cursor, value): if not value: return value = value.upper() if value == 'OFF': raise TracError(_("PRAGMA journal_mode `%(value)s` cannot be used " "in SQLite", value=value)) cursor.execute('PRAGMA journal_mode = %s' % _quote(value)) row = cursor.fetchone() if not row: raise TracError(_("PRAGMA journal_mode isn't supported by SQLite " "%(version)s", version=sqlite_version_string)) if (row[0] or '').upper() != value: raise TracError(_("PRAGMA journal_mode `%(value)s` isn't supported " "by SQLite %(version)s", value=value, version=sqlite_version_string)) def set_synchronous(cursor, value): if not value: return if value.isdigit(): value = str(int(value)) cursor.execute('PRAGMA synchronous = %s' % _quote(value))