Server : Apache System : Linux iad1-shared-b8-43 6.6.49-grsec-jammy+ #10 SMP Thu Sep 12 23:23:08 UTC 2024 x86_64 User : dh_edsupp ( 6597262) PHP Version : 8.2.26 Disable Function : NONE Directory : /lib/python3/dist-packages/trac/ticket/ |
Upload File : |
# -*- coding: utf-8 -*- # # Copyright (C) 2006-2021 Edgewall Software # Copyright (C) 2006 Alec Thomas # Copyright (C) 2007 Eli Carter # Copyright (C) 2007 Christian Boos <cboos@edgewall.org> # All rights reserved. # # This software is licensed as described in the file COPYING, which # you should have received as part of this distribution. The terms # are also available at https://trac.edgewall.org/wiki/TracLicense. # # This software consists of voluntary contributions made by many # individuals. For the exact contribution history, see the revision # history and logs, available at https://trac.edgewall.org/log/. # # Author: Eli Carter import io from configparser import ParsingError, RawConfigParser from collections import defaultdict from functools import partial from pkg_resources import resource_filename from trac.api import IEnvironmentSetupParticipant from trac.config import ConfigSection, Configuration, ConfigurationError from trac.core import * from trac.perm import PermissionCache, PermissionSystem from trac.resource import ResourceNotFound from trac.ticket.api import ITicketActionController, TicketSystem from trac.ticket.model import Component as TicketComponent, Resolution from trac.util import exception_to_unicode, get_reporter_id, sub_val, to_list from trac.util.html import tag from trac.util.presentation import separated from trac.util.translation import _, tag_, cleandoc_ from trac.versioncontrol.api import RepositoryManager from trac.web.chrome import Chrome, add_script, add_script_data from trac.wiki.formatter import MacroError, ProcessorError from trac.wiki.macros import WikiMacroBase, parse_args # -- Utilities for the ConfigurableTicketWorkflow def parse_workflow_config(rawactions): """Given a list of options from [ticket-workflow]""" required_attrs = { 'oldstates': [], 'newstate': '', 'name': '', 'label': '', 'default': 0, 'operations': [], 'permissions': [], } optional_attrs = { 'set_owner': [], 'set_resolution': [], } known_attrs = required_attrs.copy() known_attrs.update(optional_attrs) actions = defaultdict(dict) for option, value in rawactions: parts = option.split('.') name = parts[0] if len(parts) == 1: try: # Base name, of the syntax: old,states,here -> newstate oldstates, newstate = [x.strip() for x in value.split('->')] except ValueError: continue # Syntax error, a warning will be logged later actions[name]['oldstates'] = to_list(oldstates) actions[name]['newstate'] = newstate else: attribute = parts[1] if attribute not in known_attrs or \ isinstance(known_attrs[attribute], str): actions[name][attribute] = value elif isinstance(known_attrs[attribute], int): actions[name][attribute] = int(value) elif isinstance(known_attrs[attribute], list): actions[name][attribute] = to_list(value) for action, attributes in actions.items(): if 'label' not in attributes: if 'name' in attributes: # backwards-compatibility, #11828 attributes['label'] = attributes['name'] else: attributes['label'] = action.replace("_", " ").strip() for key, val in required_attrs.items(): attributes.setdefault(key, val) for val in ('<none>', '< none >'): sub_val(attributes['oldstates'], val, None) return actions def get_workflow_config(config): """Usually passed self.config, this will return the parsed ticket-workflow section. """ raw_actions = list(config.options('ticket-workflow')) actions = parse_workflow_config(raw_actions) return actions def load_workflow_config_snippet(config, filename): """Loads the ticket-workflow section from the given file (expected to be in the 'workflows' tree) into the provided config. """ filename = resource_filename('trac.ticket', 'workflows/%s' % filename) new_config = Configuration(filename) for name, value in new_config.options('ticket-workflow'): config.set('ticket-workflow', name, value) class ConfigurableTicketWorkflow(Component): """Ticket action controller which provides actions according to a workflow defined in trac.ini. The workflow is defined in the `[ticket-workflow]` section of the [wiki:TracIni#ticket-workflow-section trac.ini] configuration file. """ implements(IEnvironmentSetupParticipant, ITicketActionController) ticket_workflow_section = ConfigSection('ticket-workflow', """The workflow for tickets is controlled by plugins. By default, there's only a `ConfigurableTicketWorkflow` component in charge. That component allows the workflow to be configured via this section in the `trac.ini` file. See TracWorkflow for more details. """) operations = ('del_owner', 'set_owner', 'set_owner_to_self', 'may_set_owner', 'set_resolution', 'del_resolution', 'leave_status', 'reset_workflow') def __init__(self): self.actions = self.get_all_actions() self.log.debug('Workflow actions at initialization: %s\n', self.actions) # IEnvironmentSetupParticipant methods def environment_created(self): """When an environment is created, we provide the basic-workflow, unless a ticket-workflow section already exists. """ if 'ticket-workflow' not in self.config.sections(): load_workflow_config_snippet(self.config, 'basic-workflow.ini') self.config.save() self.actions = self.get_all_actions() def environment_needs_upgrade(self): pass def upgrade_environment(self): pass # ITicketActionController methods def get_ticket_actions(self, req, ticket): """Returns a list of (weight, action) tuples that are valid for this request and this ticket.""" # Get the list of actions that can be performed # Determine the current status of this ticket. If this ticket is in # the process of being modified, we need to base our information on the # pre-modified state so that we don't try to do two (or more!) steps at # once and get really confused. ticket_status = ticket._old.get('status', ticket['status']) exists = ticket_status is not None ticket_owner = ticket._old.get('owner', ticket['owner']) author = get_reporter_id(req, 'author') resource = ticket.resource allowed_actions = [] for action_name, action_info in self.actions.items(): operations = action_info['operations'] newstate = action_info['newstate'] if 'leave_status' not in operations and newstate == '*': # Some other controller handles this operation continue # Exclude action that is effectively a No-op. if len(operations) == 1 and \ 'set_owner_to_self' in operations and \ ticket_owner == author and ticket_status == newstate: continue oldstates = action_info['oldstates'] if exists and oldstates == ['*'] or ticket_status in oldstates: # This action is valid in this state. Check permissions. if self._is_action_allowed(req, action_info, resource): allowed_actions.append((action_info['default'], action_name)) # Append special `_reset` action if status is invalid. if exists and '_reset' in self.actions and \ ticket_status not in TicketSystem(self.env).get_all_status(): reset = self.actions['_reset'] if self._is_action_allowed(req, reset, resource): allowed_actions.append((reset['default'], '_reset')) return allowed_actions def _is_action_allowed(self, req, action, resource): """Returns `True` if the workflow action is allowed for the `resource`. """ perm_cache = req.perm(resource) required_perms = action['permissions'] if required_perms: for permission in required_perms: if permission in perm_cache: break else: return False return True def get_all_status(self): """Return a list of all states described by the configuration. """ all_status = set() for attributes in self.actions.values(): all_status.update(attributes['oldstates']) all_status.add(attributes['newstate']) all_status.discard('*') all_status.discard('') all_status.discard(None) return all_status def render_ticket_action_control(self, req, ticket, action): self.log.debug('render_ticket_action_control: action "%s"', action) this_action = self.actions[action] label = this_action['label'] operations = this_action['operations'] ticket_owner = ticket._old.get('owner', ticket['owner']) ticket_status = ticket._old.get('status', ticket['status']) next_status = this_action['newstate'] author = get_reporter_id(req, 'author') author_info = partial(Chrome(self.env).authorinfo, req, resource=ticket.resource) format_author = partial(Chrome(self.env).format_author, req, resource=ticket.resource) formatted_current_owner = author_info(ticket_owner) exists = ticket_status is not None ticket_system = TicketSystem(self.env) control = [] # default to nothing hints = [] if 'reset_workflow' in operations: control.append(_("from invalid state")) hints.append(_("Current state no longer exists")) if 'del_owner' in operations: hints.append(_("The ticket will be disowned")) if 'set_owner' in operations or 'may_set_owner' in operations: owners = self.get_allowed_owners(req, ticket, this_action) if 'set_owner' in operations: default_owner = author elif 'may_set_owner' in operations: if not exists: default_owner = ticket_system.default_owner else: default_owner = ticket_owner or None if owners is not None and default_owner not in owners: owners.insert(0, default_owner) else: # Protect against future modification for case that another # operation is added to the outer conditional raise AssertionError(operations) id = 'action_%s_reassign_owner' % action if not owners: owner = req.args.get(id, default_owner) control.append( tag_("to %(owner)s", owner=tag.input(type='text', id=id, name=id, value=owner))) if not exists or ticket_owner is None: hints.append(_("The owner will be the specified user")) else: hints.append(tag_("The owner will be changed from " "%(current_owner)s to the specified " "user", current_owner=formatted_current_owner)) elif len(owners) == 1: owner = tag.input(type='hidden', id=id, name=id, value=owners[0]) formatted_new_owner = author_info(owners[0]) control.append(tag_("to %(owner)s", owner=tag(formatted_new_owner, owner))) if not exists or ticket_owner is None: hints.append(tag_("The owner will be %(new_owner)s", new_owner=formatted_new_owner)) elif ticket['owner'] != owners[0]: hints.append(tag_("The owner will be changed from " "%(current_owner)s to %(new_owner)s", current_owner=formatted_current_owner, new_owner=formatted_new_owner)) else: selected_owner = req.args.get(id, default_owner) control.append(tag_("to %(owner)s", owner=tag.select( [tag.option(text, value=value if value is not None else '', selected=(value == selected_owner or None)) for text, value in sorted((format_author(owner), owner) for owner in owners)], id=id, name=id))) if not exists or ticket_owner is None: hints.append(_("The owner will be the selected user")) else: hints.append(tag_("The owner will be changed from " "%(current_owner)s to the selected user", current_owner=formatted_current_owner)) elif 'set_owner_to_self' in operations: formatted_author = author_info(author) if not exists or ticket_owner is None: hints.append(tag_("The owner will be %(new_owner)s", new_owner=formatted_author)) elif ticket_owner != author: hints.append(tag_("The owner will be changed from " "%(current_owner)s to %(new_owner)s", current_owner=formatted_current_owner, new_owner=formatted_author)) elif ticket_status != next_status: hints.append(tag_("The owner will remain %(current_owner)s", current_owner=formatted_current_owner)) if 'set_resolution' in operations: resolutions = [r.name for r in Resolution.select(self.env)] if 'set_resolution' in this_action: valid_resolutions = set(resolutions) resolutions = this_action['set_resolution'] if any(x not in valid_resolutions for x in resolutions): raise ConfigurationError(_( "Your workflow attempts to set a resolution but uses " "undefined resolutions (configuration issue, please " "contact your Trac admin).")) if not resolutions: raise ConfigurationError(_( "Your workflow attempts to set a resolution but none is " "defined (configuration issue, please contact your Trac " "admin).")) id = 'action_%s_resolve_resolution' % action if len(resolutions) == 1: resolution = tag.input(type='hidden', id=id, name=id, value=resolutions[0]) control.append(tag_("as %(resolution)s", resolution=tag(resolutions[0], resolution))) hints.append(tag_("The resolution will be set to %(name)s", name=resolutions[0])) else: selected_option = req.args.get(id, ticket_system.default_resolution) control.append(tag_("as %(resolution)s", resolution=tag.select( [tag.option(x, value=x, selected=(x == selected_option or None)) for x in resolutions], id=id, name=id))) hints.append(_("The resolution will be set")) if 'del_resolution' in operations: hints.append(_("The resolution will be deleted")) if 'leave_status' in operations: if len(operations) == 1: control.append(tag_("as %(status)s", status=ticket_status)) hints.append(tag_("The owner will remain %(current_owner)s", current_owner=formatted_current_owner) if ticket_owner else _("The ticket will remain with no owner")) elif ticket['status'] is None: # New ticket hints.append(tag_("The status will be '%(name)s'", name=next_status)) elif next_status != ticket_status: hints.append(tag_("Next status will be '%(name)s'", name=next_status)) return (label, tag(separated(control, ' ')), tag(separated(hints, '. ', '.') if hints else '')) def get_ticket_changes(self, req, ticket, action): this_action = self.actions[action] # Enforce permissions if not self._is_action_allowed(req, this_action, ticket.resource): # The user does not have any of the listed permissions, so we won't # do anything. return {} updated = {} # Status changes status = this_action['newstate'] if status != '*': updated['status'] = status for operation in this_action['operations']: if operation == 'del_owner': updated['owner'] = '' elif operation in ('set_owner', 'may_set_owner'): set_owner = this_action.get('set_owner') newowner = req.args.get('action_%s_reassign_owner' % action, set_owner[0] if set_owner else '') # If there was already an owner, we get a list, [new, old], # but if there wasn't we just get new. if type(newowner) == list: newowner = newowner[0] updated['owner'] = self._sub_owner_keyword(newowner, ticket) elif operation == 'set_owner_to_self': updated['owner'] = get_reporter_id(req, 'author') elif operation == 'del_resolution': updated['resolution'] = '' elif operation == 'set_resolution': set_resolution = this_action.get('set_resolution') newresolution = req.args.get('action_%s_resolve_resolution' % action, set_resolution[0] if set_resolution else '') updated['resolution'] = newresolution # reset_workflow is just a no-op here, so we don't look for it. # leave_status is just a no-op here, so we don't look for it. # Set owner to component owner for 'new' ticket if: # - ticket doesn't exist and owner is < default > # - component is changed # - owner isn't explicitly changed # - ticket owner is equal to owner of previous component # - new component has an owner if not ticket.exists and 'owner' not in updated: updated['owner'] = self._sub_owner_keyword(ticket['owner'], ticket) elif ticket['status'] == 'new' and \ 'component' in ticket.values and \ 'component' in ticket._old and \ 'owner' not in updated: try: old_comp = TicketComponent(self.env, ticket._old['component']) except ResourceNotFound: # If the old component has been removed from the database # we just leave the owner as is. pass else: old_owner = old_comp.owner or '' current_owner = ticket['owner'] or '' if old_owner == current_owner: new_comp = TicketComponent(self.env, ticket['component']) if new_comp.owner: updated['owner'] = new_comp.owner return updated def apply_action_side_effects(self, req, ticket, action): pass # Public methods (for other ITicketActionControllers that want to use # our config file and provide an operation for an action) def get_all_actions(self): actions = parse_workflow_config(self.ticket_workflow_section.options()) has_new_state = any('new' in [a['newstate']] + a['oldstates'] for a in actions.values()) if has_new_state: # Special action that gets enabled if the current status no # longer exists, as no other action can then change its state. # (#5307/#11850) reset = { 'default': 0, 'label': 'Reset', 'newstate': 'new', 'oldstates': [], 'operations': ['reset_workflow'], 'permissions': ['TICKET_ADMIN'] } for key, val in reset.items(): actions['_reset'].setdefault(key, val) for name, info in actions.items(): if not info['newstate']: self.log.warning("Ticket workflow action '%s' doesn't define " "any transitions", name) return actions def get_actions_by_operation(self, operation): """Return a list of all actions with a given operation (for use in the controller's get_all_status()) """ actions = [(info['default'], action) for action, info in self.actions.items() if operation in info['operations']] return actions def get_actions_by_operation_for_req(self, req, ticket, operation): """Return list of all actions with a given operation that are valid in the given state for the controller's get_ticket_actions(). If state='*' (the default), all actions with the given operation are returned. """ # Be sure to look at the original status. status = ticket._old.get('status', ticket['status']) actions = [(info['default'], action) for action, info in self.actions.items() if operation in info['operations'] and ('*' in info['oldstates'] or status in info['oldstates']) and self._is_action_allowed(req, info, ticket.resource)] return actions # Public methods def get_allowed_owners(self, req, ticket, action): """Returns users listed in the `set_owner` field of the action or possessing the `TICKET_MODIFY` permission if `set_owner` is not specified. This method can be overridden in a subclasses in order to customize the list of users that populate the assign-to select box. :since: 1.3.2 """ if 'set_owner' in action: return self._to_users(action['set_owner'], ticket) elif TicketSystem(self.env).restrict_owner: users = PermissionSystem(self.env)\ .get_users_with_permission('TICKET_MODIFY') cache = partial(PermissionCache, self.env, resource=ticket.resource) return sorted(u for u in users if 'TICKET_MODIFY' in cache(username=u)) # Internal methods def _sub_owner_keyword(self, owner, ticket): """Substitute keywords from the default_owner field. < default > -> component owner """ if owner in ('< default >', '<default>'): default_owner = '' if ticket['component']: try: component = TicketComponent(self.env, ticket['component']) except ResourceNotFound: pass # No such component exists else: default_owner = component.owner # May be empty return default_owner return owner def _to_users(self, users_perms_and_groups, ticket): """Finds all users contained in the list of `users_perms_and_groups` by recursive lookup of users when a `group` is encountered. """ ps = PermissionSystem(self.env) groups = ps.get_groups_dict() def append_owners(users_perms_and_groups): for user_perm_or_group in users_perms_and_groups: if user_perm_or_group == 'authenticated': owners.update({u[0] for u in self.env.get_known_users()}) elif user_perm_or_group.isupper(): perm = user_perm_or_group for user in ps.get_users_with_permission(perm): if perm in PermissionCache(self.env, user, ticket.resource): owners.add(user) elif user_perm_or_group not in groups: owners.add(user_perm_or_group) else: append_owners(groups[user_perm_or_group]) owners = set() append_owners(users_perms_and_groups) return sorted(owners) class WorkflowMacro(WikiMacroBase): _domain = 'messages' _description = cleandoc_( """Render a workflow graph. This macro accepts a TracWorkflow configuration and renders the states and transitions as a directed graph. If no parameters are given, the current ticket workflow is rendered. In [WikiProcessors WikiProcessor] mode the `width` and `height` arguments can be specified (Defaults: `width = 800` and `height = 600`). The repository-scoped path of a workflow file can be specified as either a macro or !WikiProcessor `file` argument. The file revision can be specified by appending `@<rev>` to the path. The `file` argument value must be enclosed in single or double quotes. //(Since 1.3.2)//. Examples: {{{ [[Workflow()]] [[Workflow(go = here -> there; return = there -> here)]] [[Workflow(file=/contrib/workflow/enterprise-workflow.ini@1)]] {{{#!Workflow file="/contrib/workflow/enterprise-workflow.ini" }}} {{{#!Workflow width=700 height=700 leave = * -> * leave.operations = leave_status leave.default = 1 create = <none> -> new create.default = 1 create_and_assign = <none> -> assigned create_and_assign.label = assign create_and_assign.permissions = TICKET_MODIFY create_and_assign.operations = may_set_owner accept = new,assigned,accepted,reopened -> accepted accept.permissions = TICKET_MODIFY accept.operations = set_owner_to_self resolve = new,assigned,accepted,reopened -> closed resolve.permissions = TICKET_MODIFY resolve.operations = set_resolution reassign = new,assigned,accepted,reopened -> assigned reassign.permissions = TICKET_MODIFY reassign.operations = set_owner reopen = closed -> reopened reopen.permissions = TICKET_CREATE reopen.operations = del_resolution }}} }}} """) def expand_macro(self, formatter, name, content, args=None): if content is not None: content = content.strip() if not args and not content: raw_actions = self.config.options('ticket-workflow') else: is_macro = args is None if is_macro: kwargs = parse_args(content)[1] file = kwargs.get('file') else: file = args.get('file') if not file and not content: raise ProcessorError("Invalid argument(s).") if file: print(file) text = RepositoryManager(self.env).read_file_by_path(file) if text is None: raise ProcessorError( tag_("The file %(file)s does not exist.", file=tag.code(file))) elif is_macro: text = '\n'.join(line.lstrip() for line in content.split(';')) else: text = content if '[ticket-workflow]' not in text: text = '[ticket-workflow]\n' + text parser = RawConfigParser() try: parser.readfp(io.StringIO(text)) except ParsingError as e: if is_macro: raise MacroError(exception_to_unicode(e)) from e else: raise ProcessorError(exception_to_unicode(e)) from e raw_actions = list(parser.items('ticket-workflow')) actions = parse_workflow_config(raw_actions) states = list( {state for action in actions.values() for state in action['oldstates']} | {action['newstate'] for action in actions.values()}) action_labels = [attrs['label'] for attrs in actions.values()] action_names = list(actions) edges = [] for name, action in actions.items(): new_index = states.index(action['newstate']) name_index = action_names.index(name) for old_state in action['oldstates']: old_index = states.index(old_state) edges.append((old_index, new_index, name_index)) args = args or {} width = args.get('width', 800) height = args.get('height', 600) graph = {'nodes': states, 'actions': action_labels, 'edges': edges, 'width': width, 'height': height} graph_id = '%012x' % id(graph) req = formatter.req add_script(req, 'common/js/workflow_graph.js') add_script_data(req, {'graph_%s' % graph_id: graph}) return tag( tag.div('', class_='trac-workflow-graph trac-noscript', id='trac-workflow-graph-%s' % graph_id, style="display:inline-block;width:%spx;height:%spx" % (width, height)), tag.noscript( tag.div(_("Enable JavaScript to display the workflow graph."), class_='system-message')))