Server : Apache System : Linux iad1-shared-b8-43 6.6.49-grsec-jammy+ #10 SMP Thu Sep 12 23:23:08 UTC 2024 x86_64 User : dh_edsupp ( 6597262) PHP Version : 8.2.26 Disable Function : NONE Directory : /lib/python3/dist-packages/trac/admin/ |
Upload File : |
# -*- coding: utf-8 -*- # # Copyright (C) 2005-2021 Edgewall Software # Copyright (C) 2005 Jonas Borgström <jonas@edgewall.com> # All rights reserved. # # This software is licensed as described in the file COPYING, which # you should have received as part of this distribution. The terms # are also available at https://trac.edgewall.org/wiki/TracLicense. # # This software consists of voluntary contributions made by many # individuals. For the exact contribution history, see the revision # history and logs, available at https://trac.edgewall.org/. # # Author: Jonas Borgström <jonas@edgewall.com> import itertools import os import pkg_resources import re import shutil from functools import partial from trac import log from trac.admin.api import IAdminPanelProvider from trac.core import * from trac.loader import get_plugin_info from trac.log import LOG_LEVELS, LOG_LEVEL_ALIASES, LOG_LEVEL_ALIASES_MAP from trac.perm import IPermissionRequestor, PermissionExistsError, \ PermissionSystem from trac.util.datefmt import all_timezones, pytz from trac.util.html import tag from trac.util.text import exception_to_unicode, unicode_from_base64, \ unicode_to_base64 from trac.util.translation import _, Locale, get_available_locales, \ ngettext, tag_ from trac.web.api import HTTPNotFound, IRequestHandler, \ is_valid_default_handler from trac.web.chrome import Chrome, INavigationContributor, \ ITemplateProvider, add_notice, add_stylesheet, \ add_warning from trac.wiki.formatter import format_to_html _valid_log_levels = set() _valid_log_levels.update(log.LOG_LEVELS) _valid_log_levels.update(log.LOG_LEVEL_ALIASES) class AdminModule(Component): """Web administration interface provider and panel manager.""" implements(INavigationContributor, IRequestHandler, ITemplateProvider) panel_providers = ExtensionPoint(IAdminPanelProvider) # INavigationContributor methods def get_active_navigation_item(self, req): return 'admin' def get_navigation_items(self, req): # The 'Admin' navigation item is only visible if at least one # admin panel is available panels, providers = self._get_panels(req) if panels: yield 'mainnav', 'admin', tag.a(_("Admin"), href=req.href.admin()) # IRequestHandler methods def match_request(self, req): match = re.match('/admin(?:/([^/]+)(?:/([^/]+)(?:/(.+))?)?)?$', req.path_info) if match: req.args['cat_id'] = match.group(1) req.args['panel_id'] = match.group(2) req.args['path_info'] = match.group(3) return True def process_request(self, req): panels, providers = self._get_panels(req) if not panels: raise HTTPNotFound(_("No administration panels available")) def _panel_order(panel): items = panel[::2] return items[0] != 'general', items != ('general', 'basics'), items panels.sort(key=_panel_order) cat_id = req.args.get('cat_id') or panels[0][0] panel_id = req.args.get('panel_id') path_info = req.args.get('path_info') if not panel_id: try: panel_id = list(filter(lambda panel: panel[0] == cat_id, panels))[0][2] except IndexError: raise HTTPNotFound(_("Unknown administration panel")) provider = providers.get((cat_id, panel_id)) if not provider: raise HTTPNotFound(_("Unknown administration panel")) resp = provider.render_admin_panel(req, cat_id, panel_id, path_info) template, data = resp[:2] data.update({ 'active_cat': cat_id, 'active_panel': panel_id, 'panel_href': partial(req.href, 'admin', cat_id, panel_id), 'panels': itertools.groupby([{ 'category': {'id': panel[0], 'label': panel[1]}, 'panel': {'id': panel[2], 'label': panel[3]} } for panel in panels], lambda k: k['category']), }) add_stylesheet(req, 'common/css/admin.css') return resp # ITemplateProvider methods def get_htdocs_dirs(self): return [] def get_templates_dirs(self): return [pkg_resources.resource_filename('trac.admin', 'templates')] # Internal methods def _get_panels(self, req): """Return a list of available admin panels.""" panels = [] providers = {} for provider in self.panel_providers: p = list(provider.get_admin_panels(req) or []) for panel in p: providers[(panel[0], panel[2])] = provider panels += p return panels, providers def _save_config(config, req, log, notices=None): """Try to save the config, and display either a success notice or a failure warning. """ try: config.save() if notices is None: notices = [_("Your changes have been saved.")] for notice in notices: add_notice(req, notice) except Exception as e: log.error("Error writing to trac.ini: %s", exception_to_unicode(e)) add_warning(req, _("Error writing to trac.ini, make sure it is " "writable by the web server. Your changes have " "not been saved.")) class BasicsAdminPanel(Component): implements(IAdminPanelProvider) request_handlers = ExtensionPoint(IRequestHandler) # IAdminPanelProvider methods def get_admin_panels(self, req): if 'TRAC_ADMIN' in req.perm('admin', 'general/basics'): yield ('general', _("General"), 'basics', _("Basic Settings")) def render_admin_panel(self, req, cat, page, path_info): valid_default_handlers = [handler.__class__.__name__ for handler in self.request_handlers if is_valid_default_handler(handler)] if Locale: locale_ids = get_available_locales() locales = [Locale.parse(locale) for locale in locale_ids] # don't use str(locale) to prevent storing expanded locale # identifier, see #11258 languages = sorted((id, locale.display_name) for id, locale in zip(locale_ids, locales)) else: locale_ids, locales, languages = [], [], [] if req.method == 'POST': for option in ('name', 'url', 'descr'): self.config.set('project', option, req.args.get(option)) default_handler = req.args.get('default_handler') self.config.set('trac', 'default_handler', default_handler) default_timezone = req.args.get('default_timezone') if default_timezone not in all_timezones: default_timezone = '' self.config.set('trac', 'default_timezone', default_timezone) default_language = req.args.get('default_language') if default_language not in locale_ids: default_language = '' self.config.set('trac', 'default_language', default_language) default_date_format = req.args.get('default_date_format') if default_date_format != 'iso8601': default_date_format = '' self.config.set('trac', 'default_date_format', default_date_format) default_dateinfo_format = req.args.get('default_dateinfo_format') if default_dateinfo_format not in ('relative', 'absolute'): default_dateinfo_format = 'relative' self.config.set('trac', 'default_dateinfo_format', default_dateinfo_format) _save_config(self.config, req, self.log) req.redirect(req.href.admin(cat, page)) default_handler = self.config.get('trac', 'default_handler') default_timezone = self.config.get('trac', 'default_timezone') default_language = self.config.get('trac', 'default_language') default_date_format = self.config.get('trac', 'default_date_format') default_dateinfo_format = self.config.get('trac', 'default_dateinfo_format') data = { 'default_handler': default_handler, 'valid_default_handlers': sorted(valid_default_handlers), 'default_timezone': default_timezone, 'timezones': all_timezones, 'has_pytz': pytz is not None, 'default_language': default_language.replace('-', '_'), 'languages': languages, 'default_date_format': default_date_format, 'default_dateinfo_format': default_dateinfo_format, 'has_babel': Locale is not None, } Chrome(self.env).add_textarea_grips(req) return 'admin_basics.html', data class LoggingAdminPanel(Component): implements(IAdminPanelProvider) # IAdminPanelProvider methods def get_admin_panels(self, req): if 'TRAC_ADMIN' in req.perm('admin', 'general/logging'): yield ('general', _("General"), 'logging', _("Logging")) def render_admin_panel(self, req, cat, page, path_info): log_type = self.env.log_type log_level = self.env.log_level log_file = self.env.log_file log_dir = self.env.log_dir log_types = [ dict(name='none', label=_("None"), selected=log_type == 'none', disabled=False), dict(name='stderr', label=_("Console"), selected=log_type == 'stderr', disabled=False), dict(name='file', label=_("File"), selected=log_type == 'file', disabled=False), dict(name='syslog', label=_("Syslog"), selected=log_type in ('unix', 'syslog'), disabled=os.name != 'posix'), dict(name='eventlog', label=_("Windows event log"), selected=log_type in ('winlog', 'eventlog', 'nteventlog'), disabled=os.name != 'nt'), ] if req.method == 'POST': changed = False new_type = req.args.get('log_type') if new_type not in [t['name'] for t in log_types]: raise TracError( _("Unknown log type %(type)s", type=new_type), _("Invalid log type") ) new_file = req.args.get('log_file', log_file) if not new_file: raise TracError(_("You must specify a log file"), _("Missing field")) new_level = req.args.get('log_level', log_level) if new_level not in _valid_log_levels: raise TracError( _("Unknown log level %(level)s", level=new_level), _("Invalid log level")) # Create logger to be sure the configuration is valid. new_file_path = new_file if not os.path.isabs(new_file_path): new_file_path = os.path.join(self.env.log_dir, new_file) try: logger, handler = \ self.env.create_logger(new_type, new_file_path, new_level, self.env.log_format) except Exception as e: add_warning(req, tag_("Changes not saved. Logger configuration " "error: %(error)s. Inspect the log for more " "information.", error=tag.code(exception_to_unicode(e)))) self.log.error("Logger configuration error: %s", exception_to_unicode(e, traceback=True)) else: handler.close() if new_type != log_type: self.config.set('logging', 'log_type', new_type) changed = True log_type = new_type if new_level != log_level: self.config.set('logging', 'log_level', new_level) changed = True log_level = new_level if new_file != log_file: self.config.set('logging', 'log_file', new_file) changed = True log_file = new_file if changed: _save_config(self.config, req, self.log), req.redirect(req.href.admin(cat, page)) # Order log levels by priority value, with aliases excluded. all_levels = sorted(log.LOG_LEVEL_MAP, key=log.LOG_LEVEL_MAP.get, reverse=True) log_levels = [level for level in all_levels if level in log.LOG_LEVELS] log_level = LOG_LEVEL_ALIASES_MAP.get(log_level, log_level) data = { 'type': log_type, 'types': log_types, 'level': log_level, 'levels': log_levels, 'file': log_file, 'dir': log_dir } return 'admin_logging.html', {'log': data} class PermissionAdminPanel(Component): implements(IAdminPanelProvider, IPermissionRequestor) # IPermissionRequestor methods def get_permission_actions(self): actions = ['PERMISSION_GRANT', 'PERMISSION_REVOKE'] return actions + [('PERMISSION_ADMIN', actions)] # IAdminPanelProvider methods def get_admin_panels(self, req): perm = req.perm('admin', 'general/perm') if 'PERMISSION_GRANT' in perm or 'PERMISSION_REVOKE' in perm: yield ('general', _("General"), 'perm', _("Permissions")) def render_admin_panel(self, req, cat, page, path_info): perm = PermissionSystem(self.env) all_actions = perm.get_actions() if req.method == 'POST': subject = req.args.get('subject', '').strip() target = req.args.get('target', '').strip() action = req.args.get('action') group = req.args.get('group', '').strip() if subject and subject.isupper() or \ group and group.isupper() or \ target and target.isupper(): raise TracError(_("All upper-cased tokens are reserved for " "permission names.")) # Grant permission to subject if 'add' in req.args and subject and action: req.perm('admin', 'general/perm').require('PERMISSION_GRANT') if action not in all_actions: raise TracError(_("Unknown action")) req.perm.require(action) try: perm.grant_permission(subject, action) except TracError as e: add_warning(req, e) else: add_notice(req, _("The subject %(subject)s has been " "granted the permission %(action)s.", subject=subject, action=action)) # Add subject to group elif 'add' in req.args and subject and group: req.perm('admin', 'general/perm').require('PERMISSION_GRANT') for action in sorted( perm.get_user_permissions(group, expand_meta=False)): req.perm.require(action, message=tag_( "The subject %(subject)s was not added to the " "group %(group)s. The group has %(perm)s " "permission and you cannot grant permissions you " "don't possess.", subject=tag.strong(subject), group=tag.strong(group), perm=tag.strong(action))) try: perm.grant_permission(subject, group) except TracError as e: add_warning(req, e) else: add_notice(req, _("The subject %(subject)s has been " "added to the group %(group)s.", subject=subject, group=group)) # Copy permissions to subject elif 'copy' in req.args and subject and target: req.perm('admin', 'general/perm').require('PERMISSION_GRANT') subject_permissions = perm.get_users_dict().get(subject, []) if not subject_permissions: add_warning(req, _("The subject %(subject)s does not " "have any permissions.", subject=subject)) for action in subject_permissions: if action not in all_actions: # plugin disabled? self.log.warning("Skipped granting %s to %s: " "permission unavailable.", action, target) else: if action not in req.perm: add_warning(req, _("The permission %(action)s was " "not granted to %(subject)s " "because users cannot grant " "permissions they don't possess.", action=action, subject=subject)) continue try: perm.grant_permission(target, action) except PermissionExistsError: pass else: add_notice(req, _("The subject %(subject)s has " "been granted the permission " "%(action)s.", subject=target, action=action)) req.redirect(req.href.admin(cat, page)) # Remove permissions action elif 'remove' in req.args and 'sel' in req.args: req.perm('admin', 'general/perm').require('PERMISSION_REVOKE') for key in req.args.getlist('sel'): subject, action = key.split(':', 1) subject = unicode_from_base64(subject) action = unicode_from_base64(action) if (subject, action) in perm.get_all_permissions(): perm.revoke_permission(subject, action) add_notice(req, _("The selected permissions have been " "revoked.")) req.redirect(req.href.admin(cat, page)) return 'admin_perms.html', { 'actions': all_actions, 'allowed_actions': [a for a in all_actions if a in req.perm], 'perms': perm.get_users_dict(), 'groups': perm.get_groups_dict(), 'unicode_to_base64': unicode_to_base64 } class PluginAdminPanel(Component): implements(IAdminPanelProvider) # IAdminPanelProvider methods def get_admin_panels(self, req): if 'TRAC_ADMIN' in req.perm('admin', 'general/plugin'): yield ('general', _("General"), 'plugin', _("Plugins")) def render_admin_panel(self, req, cat, page, path_info): if req.method == 'POST': if 'install' in req.args: self._do_install(req) elif 'uninstall' in req.args: self._do_uninstall(req) else: self._do_update(req) anchor = '' if 'plugin' in req.args: anchor = '#no%d' % (req.args.getint('plugin') + 1) req.redirect(req.href.admin(cat, page) + anchor) return self._render_view(req) # Internal methods def _do_install(self, req): """Install a plugin.""" if 'plugin_file' not in req.args: raise TracError(_("No file uploaded")) upload = req.args['plugin_file'] if isinstance(upload, str) or not upload.filename: raise TracError(_("No file uploaded")) plugin_filename = upload.filename.replace('\\', '/').replace(':', '/') plugin_filename = os.path.basename(plugin_filename) if not plugin_filename: raise TracError(_("No file uploaded")) if not plugin_filename.endswith('.egg') and \ not plugin_filename.endswith('.py'): raise TracError(_("Uploaded file is not a Python source file or " "egg")) target_path = os.path.join(self.env.plugins_dir, plugin_filename) if os.path.isfile(target_path): raise TracError(_("Plugin %(name)s already installed", name=plugin_filename)) self.log.info("Installing plugin %s", plugin_filename) flags = os.O_CREAT + os.O_WRONLY + os.O_EXCL try: flags += os.O_BINARY except AttributeError: # OS_BINARY not available on every platform pass with os.fdopen(os.open(target_path, flags, 0o666), 'wb') as target_file: shutil.copyfileobj(upload.file, target_file) self.log.info("Plugin %s installed to %s", plugin_filename, target_path) # TODO: Validate that the uploaded file is a valid Trac plugin # Make the environment reset itself on the next request self.env.config.touch() def _do_uninstall(self, req): """Uninstall a plugin.""" plugin_filename = req.args.get('plugin_filename') if not plugin_filename: return plugin_path = os.path.join(self.env.plugins_dir, plugin_filename) if not os.path.isfile(plugin_path): return self.log.info("Uninstalling plugin %s", plugin_filename) os.remove(plugin_path) # Make the environment reset itself on the next request self.env.config.touch() def _do_update(self, req): """Update component enable state.""" components = req.args.getlist('component') enabled = req.args.getlist('enable') added, removed = [], [] # FIXME: this needs to be more intelligent and minimize multiple # component names to prefix rules for component in components: is_enabled = bool(self.env.is_component_enabled(component)) must_enable = component in enabled if is_enabled != must_enable: self.config.set('components', component, 'disabled' if is_enabled else 'enabled') self.log.info("%sabling component %s", "Dis" if is_enabled else "En", component) if must_enable: added.append(component) else: removed.append(component) if added or removed: def make_list(items): parts = [item.rsplit('.', 1) for item in items] return tag.table(tag.tbody( tag.tr(tag.td(c, class_='trac-name'), tag.td('(%s.*)' % m, class_='trac-name')) for m, c in parts), class_='trac-pluglist') added.sort() removed.sort() notices = [] if removed: msg = ngettext("The following component has been disabled:", "The following components have been disabled:", len(removed)) notices.append(tag(msg, make_list(removed))) if added: msg = ngettext("The following component has been enabled:", "The following components have been enabled:", len(added)) notices.append(tag(msg, make_list(added))) # set the default value of options for only the enabled components for component in added: self.config.set_defaults(component=component) _save_config(self.config, req, self.log, notices) def _render_view(self, req): plugins = get_plugin_info(self.env, include_core=True) def safe_wiki_to_html(context, text): try: return format_to_html(self.env, context, text) except Exception as e: self.log.error("Unable to render component documentation: %s", exception_to_unicode(e, traceback=True)) return tag.pre(text) data = { 'plugins': plugins, 'show': req.args.get('show'), 'readonly': not os.access(self.env.plugins_dir, os.F_OK + os.W_OK), 'safe_wiki_to_html': safe_wiki_to_html, } return 'admin_plugins.html', data