Server : Apache System : Linux iad1-shared-b8-43 6.6.49-grsec-jammy+ #10 SMP Thu Sep 12 23:23:08 UTC 2024 x86_64 User : dh_edsupp ( 6597262) PHP Version : 8.2.26 Disable Function : NONE Directory : /lib/python3/dist-packages/trac/ticket/ |
Upload File : |
# -*- coding: utf-8 -*- # # Copyright (C) 2010 Brian Meeker # All rights reserved. # # This software is licensed as described in the file COPYING, which # you should have received as part of this distribution. The terms # are also available at https://trac.edgewall.org/wiki/TracLicense. # # This software consists of voluntary contributions made by many # individuals. For the exact contribution history, see the revision # history and logs, available at https://trac.edgewall.org/log/. # # Author: Brian Meeker <meeker.brian@gmail.com> import re from trac.core import * from trac.notification.api import NotificationSystem from trac.perm import IPermissionRequestor from trac.ticket.api import ITicketManipulator, TicketSystem from trac.ticket.model import Ticket from trac.ticket.notification import BatchTicketChangeEvent from trac.util import to_list from trac.util.datefmt import datetime_now, utc from trac.util.html import tag from trac.util.text import exception_to_unicode, to_unicode from trac.util.translation import _, tag_ from trac.web.api import HTTPBadRequest, IRequestFilter, IRequestHandler from trac.web.chrome import Chrome, add_script_data, add_warning class BatchModifyModule(Component): """Ticket batch modification module. This component allows multiple tickets to be modified in one request from the custom query page. For users with the TICKET_BATCH_MODIFY permission it will add a [TracBatchModify batch modify] section underneath custom query results. Users can choose which tickets and fields they wish to modify. """ implements(IPermissionRequestor, IRequestFilter, IRequestHandler) is_valid_default_handler = False ticket_manipulators = ExtensionPoint(ITicketManipulator) list_separator_re = re.compile(r'[;\s,]+') list_connector_string = ', ' # IRequestHandler methods def match_request(self, req): return req.path_info == '/batchmodify' def process_request(self, req): if req.method != 'POST': raise HTTPBadRequest(_("Invalid request arguments.")) req.perm.require('TICKET_BATCH_MODIFY') comment = req.args.get('batchmod_value_comment', '') action = req.args.get('action') # Get new ticket values from POST request. new_values = {} for field in TicketSystem(self.env).get_ticket_fields(): name = field['name'] if name not in ('id', 'resolution', 'status', 'owner', 'time', 'changetime', 'summary', 'description') + \ (('reporter',) if 'TICKET_ADMIN' not in req.perm else ()) \ and field['type'] != 'textarea': arg_name = 'batchmod_value_' + name if arg_name in req.args: new_values[name] = req.args.get(arg_name) selected_tickets = to_list(req.args.get('selected_tickets', '')) self._save_ticket_changes(req, selected_tickets, new_values, comment, action) # Always redirect back to the query page we came from req.redirect(req.args.get('query_href') or req.href.query()) # IRequestFilter methods def pre_process_request(self, req, handler): return handler def post_process_request(self, req, template, data, metadata): if req.path_info == '/query' and data is not None and \ 'TICKET_BATCH_MODIFY' in req.perm('ticket'): self.add_template_data(req, data, data['tickets']) chrome = Chrome(self.env) chrome.add_auto_preview(req) chrome.add_wiki_toolbars(req) return template, data, metadata # IPermissionRequestor methods def get_permission_actions(self): return ['TICKET_BATCH_MODIFY', ('TICKET_BATCH_MODIFY', ['TICKET_MODIFY']), ('TICKET_ADMIN', ['TICKET_BATCH_MODIFY'])] def add_template_data(self, req, data, tickets): data['batch_modify'] = True data['query_href'] = req.session['query_href'] or req.href.query() data['action_controls'] = self._get_action_controls(req, tickets) batch_list_modes = [ {'name': _("add"), 'value': "+"}, {'name': _("remove"), 'value': "-"}, {'name': _("add / remove"), 'value': "+-"}, {'name': _("set to"), 'value': "="}, ] add_script_data(req, batch_list_modes=batch_list_modes, batch_list_properties=self._get_list_fields()) def _get_list_fields(self): return [f['name'] for f in TicketSystem(self.env).get_ticket_fields() if f['type'] == 'text' and f.get('format') == 'list'] def _get_action_controls(self, req, ticket_data): tickets = [Ticket(self.env, t['id']) for t in ticket_data] action_weights = {} action_tickets = {} for t in tickets: for ctrl in TicketSystem(self.env).action_controllers: for weight, action in ctrl.get_ticket_actions(req, t) or []: if action in action_weights: action_weights[action] = max(action_weights[action], weight) action_tickets[action].append(t) else: action_weights[action] = weight action_tickets[action] = [t] sorted_actions = [a for a, w in sorted(iter(action_weights.items()), key=lambda item: (item[1], item[0]), reverse=True)] action_controls = [] for action in sorted_actions: first_label = None hints = [] widgets = [] ticket = action_tickets[action][0] for controller in self._get_action_controllers(req, ticket, action): label, widget, hint = controller.render_ticket_action_control( req, ticket, action) if not first_label: first_label = label widgets.append(widget) hints.append(hint) action_controls.append((action, first_label, tag(widgets), hints)) return action_controls def _get_action_controllers(self, req, ticket, action): """Generator yielding the controllers handling the given `action`""" for controller in TicketSystem(self.env).action_controllers: actions = [a for w, a in controller.get_ticket_actions(req, ticket) or []] if action in actions: yield controller def _get_updated_ticket_values(self, req, ticket, new_values): list_fields = self._get_list_fields() _values = new_values.copy() for field in list_fields: mode = req.args.get('batchmod_mode_' + field) if mode: old = ticket[field] if field in ticket else '' new = req.args.get('batchmod_primary_' + field, '') new2 = req.args.get('batchmod_secondary_' + field, '') _values[field] = self._change_list(old, new, new2, mode) return _values def _save_ticket_changes(self, req, selected_tickets, new_values, comment, action): """Save changes to tickets.""" valid = True for manipulator in self.ticket_manipulators: if hasattr(manipulator, 'validate_comment'): for message in manipulator.validate_comment(req, comment): valid = False add_warning(req, tag_("The ticket comment is invalid: " "%(message)s", message=message)) tickets = [] for id_ in selected_tickets: t = Ticket(self.env, id_) values = self._get_updated_ticket_values(req, t, new_values) for ctlr in self._get_action_controllers(req, t, action): values.update(ctlr.get_ticket_changes(req, t, action)) t.populate(values) for manipulator in self.ticket_manipulators: for field, message in manipulator.validate_ticket(req, t): valid = False if field: add_warning(req, tag_("The ticket field %(field)s is " "invalid: %(message)s", field=tag.strong(field), message=message)) else: add_warning(req, message) tickets.append(t) if not valid: return when = datetime_now(utc) with self.env.db_transaction: for t in tickets: t.save_changes(req.authname, comment, when=when) for ctlr in self._get_action_controllers(req, t, action): ctlr.apply_action_side_effects(req, t, action) event = BatchTicketChangeEvent(selected_tickets, when, req.authname, comment, new_values, action) try: NotificationSystem(self.env).notify(event) except Exception as e: self.log.error("Failure sending notification on ticket batch" "change: %s", exception_to_unicode(e)) add_warning(req, tag_("The changes have been saved, but an error " "occurred while sending notifications: " "%(message)s", message=to_unicode(e))) def _change_list(self, old_list, new_list, new_list2, mode): changed_list = to_list(old_list, self.list_separator_re) new_list = to_list(new_list, self.list_separator_re) new_list2 = to_list(new_list2, self.list_separator_re) if mode == '=': changed_list = new_list elif mode == '+': for entry in new_list: if entry not in changed_list: changed_list.append(entry) elif mode == '-': for entry in new_list: while entry in changed_list: changed_list.remove(entry) elif mode == '+-': for entry in new_list: if entry not in changed_list: changed_list.append(entry) for entry in new_list2: while entry in changed_list: changed_list.remove(entry) return self.list_connector_string.join(changed_list)