Server : Apache System : Linux iad1-shared-b8-43 6.6.49-grsec-jammy+ #10 SMP Thu Sep 12 23:23:08 UTC 2024 x86_64 User : dh_edsupp ( 6597262) PHP Version : 8.2.26 Disable Function : NONE Directory : /lib/python3/dist-packages/trac/versioncontrol/ |
Upload File : |
# -*- coding: utf-8 -*- # # Copyright (C) 2008-2021 Edgewall Software # All rights reserved. # # This software is licensed as described in the file COPYING, which # you should have received as part of this distribution. The terms # are also available at https://trac.edgewall.org/wiki/TracLicense. # # This software consists of voluntary contributions made by many # individuals. For the exact contribution history, see the revision # history and logs, available at https://trac.edgewall.org/. import os.path import sys from trac.admin import IAdminCommandProvider, IAdminPanelProvider from trac.api import IEnvironmentSetupParticipant from trac.config import ListOption from trac.core import * from trac.perm import IPermissionRequestor from trac.util import as_bool, is_path_below from trac.util.html import tag from trac.util.text import breakable_path, normalize_whitespace, print_table, \ printerr, printout from trac.util.translation import _, ngettext, tag_ from trac.versioncontrol import DbRepositoryProvider, InvalidRepository, \ NoSuchChangeset, RepositoryManager, is_default from trac.web.chrome import Chrome, add_notice, add_warning class VersionControlAdmin(Component): """trac-admin command provider for version control administration.""" implements(IAdminCommandProvider, IEnvironmentSetupParticipant, IPermissionRequestor) # IAdminCommandProvider methods def get_admin_commands(self): yield ('changeset added', '<repos> <rev> [rev] [...]', """Notify trac about changesets added to a repository This command should be called from a post-commit hook. It will trigger a cache update and notify components about the addition. """, self._complete_repos, self._do_changeset_added) yield ('changeset modified', '<repos> <rev> [rev] [...]', """Notify trac about changesets modified in a repository This command should be called from a post-revprop hook after revision properties like the commit message, author or date have been changed. It will trigger a cache update for the given revisions and notify components about the change. """, self._complete_repos, self._do_changeset_modified) yield ('repository list', '', 'List source repositories', None, self._do_list) yield ('repository resync', '<repos> [rev]', """Re-synchronize trac with repositories When [rev] is specified, only that revision is synchronized. Otherwise, the complete revision history is synchronized. Note that this operation can take a long time to complete. If synchronization gets interrupted, it can be resumed later using the `sync` command. <repos> must be the repository name, not the repository path. Use `list` to see a list of repository names and associated paths. To synchronize all repositories, specify "*" for <repos>. The default repository can be specified using "(default)". """, self._complete_repos, self._do_resync) yield ('repository sync', '<repos> [rev]', """Resume synchronization of repositories It works like `resync`, except that it doesn't clear the already synchronized changesets, so it's a better way to resume an interrupted `resync`. See `resync` help for detailed usage. """, self._complete_repos, self._do_sync) def get_reponames(self): rm = RepositoryManager(self.env) return [reponame or '(default)' for reponame in rm.get_all_repositories()] def _complete_repos(self, args): if len(args) == 1: return self.get_reponames() def _do_changeset_added(self, reponame, first_rev, *revs): if is_default(reponame): reponame = '' rm = RepositoryManager(self.env) errors = rm.notify('changeset_added', reponame, (first_rev,) + revs) for error in errors: printerr(error) return 2 if errors else 0 def _do_changeset_modified(self, reponame, first_rev, *revs): if is_default(reponame): reponame = '' rm = RepositoryManager(self.env) errors = rm.notify('changeset_modified', reponame, (first_rev,) + revs) for error in errors: printerr(error) return 2 if errors else 0 def _do_list(self): rm = RepositoryManager(self.env) values = [] for reponame, info in sorted(rm.get_all_repositories().items()): alias = '' if 'alias' in info: alias = info['alias'] or '(default)' values.append((reponame or '(default)', info.get('type', ''), alias, info.get('dir', ''))) print_table(values, [_('Name'), _('Type'), _('Alias'), _('Directory')]) def _sync(self, reponame, rev, clean): rm = RepositoryManager(self.env) if reponame == '*': if rev is not None: raise TracError(_('Cannot synchronize a single revision ' 'on multiple repositories')) repositories = rm.get_real_repositories() else: if is_default(reponame): reponame = '' repos = rm.get_repository(reponame) if repos is None: raise TracError(_("Repository \"%(repo)s\" doesn't exist", repo=reponame or '(default)')) repositories = [repos] for repos in sorted(repositories, key=lambda r: r.reponame): pretty_name = repos.reponame or '(default)' if rev is not None: repos.sync_changeset(rev) printout(_('%(rev)s resynced on %(reponame)s.', rev=rev, reponame=pretty_name)) else: printout(_('Resyncing repository history for %(reponame)s... ', reponame=pretty_name)) repos.sync(self._sync_feedback, clean=clean) self._sync_feedback(None) for cnt, in self.env.db_query( "SELECT count(rev) FROM revision WHERE repos=%s", (repos.id,)): printout(ngettext('%(num)s revision cached.', '%(num)s revisions cached.', num=cnt)) printout(_('Done.')) def _sync_feedback(self, rev): if rev is not None: sys.stdout.write(' [%s]\r' % rev) else: # Erase to end of line. sys.stdout.write('\033[K') sys.stdout.flush() def _do_resync(self, reponame, rev=None): self._sync(reponame, rev, clean=True) def _do_sync(self, reponame, rev=None): self._sync(reponame, rev, clean=False) # IEnvironmentSetupParticipant methods def environment_created(self): """Index the repositories.""" for repos in RepositoryManager(self.env).get_real_repositories(): pretty_name = repos.reponame or '(default)' printout(_(" Indexing '%(name)s' repository", name=pretty_name)) try: repos.sync(self._sync_feedback) except TracError: printerr(_(""" --------------------------------------------------------------------- Warning: couldn't index '%(pretty_name)s' repository. This can happen for a variety of reasons: wrong repository type, no appropriate third party library for this repository type, no repository at the specified repository path... You can nevertheless start using your Trac environment, but you'll need to check your `%(name)s.type` and `%(name)s.dir` option values in the [repositories] section of your trac.ini file. """, pretty_name=pretty_name, name=repos.reponame or '')) else: self._sync_feedback(None) def environment_needs_upgrade(self): pass def upgrade_environment(self): pass # IPermissionRequestor methods def get_permission_actions(self): return [('VERSIONCONTROL_ADMIN', ['BROWSER_VIEW', 'CHANGESET_VIEW', 'FILE_VIEW', 'LOG_VIEW'])] class RepositoryAdminPanel(Component): """Web admin panel for repository administration.""" implements(IAdminPanelProvider) allowed_repository_dir_prefixes = ListOption('versioncontrol', 'allowed_repository_dir_prefixes', '', doc="""Comma-separated list of allowed prefixes for repository directories when adding and editing repositories in the repository admin panel. If the list is empty, all repository directories are allowed. """) # IAdminPanelProvider methods def get_admin_panels(self, req): types = RepositoryManager(self.env).get_supported_types() if types and 'VERSIONCONTROL_ADMIN' \ in req.perm('admin', 'versioncontrol/repository'): yield ('versioncontrol', _('Version Control'), 'repository', _('Repositories')) def render_admin_panel(self, req, category, page, path_info): # Retrieve info for all repositories rm = RepositoryManager(self.env) all_repos = rm.get_all_repositories() db_provider = self.env[DbRepositoryProvider] if path_info: # Detail view reponame = path_info if not is_default(path_info) else '' info = all_repos.get(reponame) if info is None: raise TracError(_("Repository '%(repo)s' not found", repo=path_info)) if req.method == 'POST': if req.args.get('cancel'): req.redirect(req.href.admin(category, page)) elif db_provider and req.args.get('save'): # Modify repository changes = {} valid = True for field in db_provider.repository_attrs: value = normalize_whitespace(req.args.get(field)) if (value is not None or field in ('hidden', 'sync_per_request')) \ and value != info.get(field): changes[field] = value if 'dir' in changes and not \ self._check_dir(req, changes['dir']): valid = False if valid and changes: db_provider.modify_repository(reponame, changes) add_notice(req, _('Your changes have been saved.')) name = req.args.get('name') pretty_name = name or '(default)' resync = tag.code('trac-admin "%s" repository resync ' '"%s"' % (self.env.path, pretty_name)) if 'dir' in changes: msg = tag_('You should now run %(resync)s to ' 'synchronize Trac with the repository.', resync=resync) add_notice(req, msg) elif 'type' in changes: msg = tag_('You may have to run %(resync)s to ' 'synchronize Trac with the repository.', resync=resync) add_notice(req, msg) if name and name != path_info and 'alias' not in info: cset_added = tag.code('trac-admin "%s" changeset ' 'added "%s" $REV' % (self.env.path, pretty_name)) msg = tag_('You will need to update your ' 'post-commit hook to call ' '%(cset_added)s with the new ' 'repository name.', cset_added=cset_added) add_notice(req, msg) if valid: req.redirect(req.href.admin(category, page)) chrome = Chrome(self.env) chrome.add_wiki_toolbars(req) chrome.add_auto_preview(req) data = {'view': 'detail', 'reponame': reponame} else: # List view if req.method == 'POST': # Add a repository if db_provider and req.args.get('add_repos'): name = req.args.get('name') pretty_name = name or '(default)' if name in all_repos: raise TracError(_('The repository "%(name)s" already ' 'exists.', name=pretty_name)) type_ = req.args.get('type') # Avoid errors when copy/pasting paths dir = normalize_whitespace(req.args.get('dir', '')) if name is None or type_ is None or not dir: add_warning(req, _('Missing arguments to add a ' 'repository.')) elif self._check_dir(req, dir): db_provider.add_repository(name, dir, type_) add_notice(req, _('The repository "%(name)s" has been ' 'added.', name=pretty_name)) resync = tag.code('trac-admin "%s" repository resync ' '"%s"' % (self.env.path, pretty_name)) msg = tag_('You should now run %(resync)s to ' 'synchronize Trac with the repository.', resync=resync) add_notice(req, msg) cset_added = tag.code('trac-admin "%s" changeset ' 'added "%s" $REV' % (self.env.path, pretty_name)) doc = tag.a(_("documentation"), href=req.href.wiki('TracRepositoryAdmin') + '#Synchronization') msg = tag_('You should also set up a post-commit hook ' 'on the repository to call %(cset_added)s ' 'for each committed changeset. See the ' '%(doc)s for more information.', cset_added=cset_added, doc=doc) add_notice(req, msg) # Add a repository alias elif db_provider and req.args.get('add_alias'): name = req.args.get('name') pretty_name = name or '(default)' alias = req.args.get('alias') if name is not None and alias is not None: try: db_provider.add_alias(name, alias) except self.env.db_exc.IntegrityError: raise TracError(_('The alias "%(name)s" already ' 'exists.', name=pretty_name)) add_notice(req, _('The alias "%(name)s" has been ' 'added.', name=pretty_name)) else: add_warning(req, _('Missing arguments to add an ' 'alias.')) # Refresh the list of repositories elif req.args.get('refresh'): pass # Remove repositories elif db_provider and req.args.get('remove'): sel = req.args.getlist('sel') if sel: for name in sel: db_provider.remove_repository(name) add_notice(req, _('The selected repositories have ' 'been removed.')) else: add_warning(req, _('No repositories were selected.')) req.redirect(req.href.admin(category, page)) data = {'view': 'list'} # Find repositories that are editable db_repos = {} if db_provider is not None: db_repos = dict(db_provider.get_repositories()) # Prepare common rendering data repositories = {reponame: self._extend_info(reponame, info.copy(), reponame in db_repos) for (reponame, info) in all_repos.items()} types = sorted([''] + rm.get_supported_types()) data.update( {'types': types, 'default_type': rm.default_repository_type, 'repositories': repositories, 'can_add_alias': any('alias' not in info for info in repositories.values())}) return 'admin_repositories.html', data def _extend_info(self, reponame, info, editable): """Extend repository info for rendering.""" info['name'] = reponame info['hidden'] = as_bool(info.get('hidden')) info['sync_per_request'] = as_bool(info.get('sync_per_request')) info['editable'] = editable if 'alias' not in info: if info.get('dir') is not None: info['prettydir'] = breakable_path(info['dir']) or '' try: repos = RepositoryManager(self.env).get_repository(reponame) except InvalidRepository as e: info['error'] = e except TracError: pass # Probably "unsupported connector" else: youngest_rev = repos.get_youngest_rev() info['rev'] = youngest_rev try: info['display_rev'] = repos.display_rev(youngest_rev) except NoSuchChangeset: pass return info def _check_dir(self, req, dir): """Check that a repository directory is valid, and add a warning message if not. """ if not os.path.isabs(dir): add_warning(req, _('The repository directory must be an absolute ' 'path.')) return False prefixes = [os.path.join(self.env.path, prefix) for prefix in self.allowed_repository_dir_prefixes] if prefixes and not any(is_path_below(dir, prefix) for prefix in prefixes): add_warning(req, _('The repository directory must be located ' 'below one of the following directories: ' '%(dirs)s', dirs=', '.join(prefixes))) return False return True