Server : Apache System : Linux iad1-shared-b8-43 6.6.49-grsec-jammy+ #10 SMP Thu Sep 12 23:23:08 UTC 2024 x86_64 User : dh_edsupp ( 6597262) PHP Version : 8.2.26 Disable Function : NONE Directory : /lib/python3/dist-packages/trac/ticket/ |
Upload File : |
# -*- coding: utf-8 -*- # # Copyright (C) 2004-2021 Edgewall Software # Copyright (C) 2004-2005 Christopher Lenz <cmlenz@gmx.de> # Copyright (C) 2006-2007 Christian Boos <cboos@edgewall.org> # All rights reserved. # # This software is licensed as described in the file COPYING, which # you should have received as part of this distribution. The terms # are also available at https://trac.edgewall.org/wiki/TracLicense. # # This software consists of voluntary contributions made by many # individuals. For the exact contribution history, see the revision # history and logs, available at https://trac.edgewall.org/log/. # # Author: Christopher Lenz <cmlenz@gmx.de> from datetime import datetime, timedelta import io import itertools import re from trac.attachment import Attachment, AttachmentModule from trac.config import ConfigSection, ExtensionOption, Option from trac.core import * from trac.notification.api import NotificationSystem from trac.perm import IPermissionRequestor from trac.resource import * from trac.search import ISearchSource, search_to_regexps, shorten_result from trac.util import as_bool, partition from trac.util.datefmt import (datetime_now, format_date, format_datetime, from_utimestamp, get_datetime_format_hint, parse_date, pretty_timedelta, to_datetime, user_time, utc) from trac.util.html import tag from trac.util.presentation import classes from trac.util.text import CRLF, exception_to_unicode, to_unicode from trac.util.translation import _, tag_ from trac.ticket.api import TicketSystem from trac.ticket.notification import BatchTicketChangeEvent from trac.ticket.model import Milestone, MilestoneCache, Ticket from trac.timeline.api import ITimelineEventProvider from trac.web.api import HTTPBadRequest, IRequestHandler, RequestDone from trac.web.chrome import (Chrome, INavigationContributor, accesskey, add_link, add_notice, add_stylesheet, add_warning, auth_link, prevnext_nav, web_context) from trac.wiki.api import IWikiSyntaxProvider from trac.wiki.formatter import format_to class ITicketGroupStatsProvider(Interface): def get_ticket_group_stats(ticket_ids): """ Gather statistics on a group of tickets. This method returns a valid `TicketGroupStats` object. """ class TicketGroupStats(object): """Encapsulates statistics on a group of tickets.""" def __init__(self, title, unit): """ :param title: the display name of this group of stats (e.g. ``'ticket status'``) :param unit: is the units for these stats in plural form, e.g. ``_('hours'``) """ self.title = title self.unit = unit self.count = 0 self.qry_args = {} self.intervals = [] self.done_percent = 0 self.done_count = 0 def add_interval(self, title, count, qry_args, css_class, overall_completion=None): """Adds a division to this stats' group's progress bar. :param title: the display name (e.g. ``'closed'``, ``'spent effort'``) of this interval that will be displayed in front of the unit name :param count: the number of units in the interval :param qry_args: a dict of extra params that will yield the subset of tickets in this interval on a query. :param css_class: is the css class that will be used to display the division :param overall_completion: can be set to true to make this interval count towards overall completion of this group of tickets. .. versionchanged :: 0.12 deprecated `countsToProg` argument was removed, use `overall_completion` instead """ self.intervals.append({ 'title': title, 'count': count, 'qry_args': qry_args, 'css_class': css_class, 'percent': None, 'overall_completion': overall_completion, }) self.count = self.count + count def refresh_calcs(self): if self.count < 1: return total_percent = 0 self.done_percent = 0 self.done_count = 0 for interval in self.intervals: interval['percent'] = round(float(interval['count'] / float(self.count) * 100)) total_percent = total_percent + interval['percent'] if interval['overall_completion']: self.done_percent += interval['percent'] self.done_count += interval['count'] # We want the percentages to add up to 100%. To do that, we fudge one # of the intervals. If we're < 100%, we add to the smallest non-zero # interval. If we're > 100%, we subtract from the largest interval. # The interval is adjusted to make the intervals sum to 100%. if self.done_count and total_percent != 100: fudge_amt = 100 - total_percent fudge_int = [i for i in sorted(self.intervals, key=lambda k: k['percent'], reverse=(fudge_amt < 0)) if i['percent']][0] fudge_int['percent'] += fudge_amt self.done_percent += fudge_amt class DefaultTicketGroupStatsProvider(Component): """Configurable ticket group statistics provider. See :teo:`TracIni#milestone-groups-section` for a detailed example configuration. """ implements(ITicketGroupStatsProvider) milestone_groups_section = ConfigSection('milestone-groups', """As the workflow for tickets is now configurable, there can be many ticket states, and simply displaying closed tickets vs. all the others is maybe not appropriate in all cases. This section enables one to easily create ''groups'' of states that will be shown in different colors in the milestone progress bar. Note that the groups can only be based on the ticket //status//, nothing else. In particular, it's not possible to distinguish between different closed tickets based on the //resolution//. Example configuration with three groups, //closed//, //new// and //active// (the default only has closed and active): {{{ # the 'closed' group correspond to the 'closed' tickets closed = closed # .order: sequence number in the progress bar closed.order = 0 # .query_args: optional parameters for the corresponding # query. In this example, the changes from the # default are two additional columns ('created' and # 'modified'), and sorting is done on 'created'. closed.query_args = group=resolution,order=time,col=id,col=summary,col=owner,col=type,col=priority,col=component,col=severity,col=time,col=changetime # .overall_completion: indicates groups that count for overall # completion percentage closed.overall_completion = true new = new new.order = 1 new.css_class = new new.label = new # Note: one catch-all group for other statuses is allowed active = * active.order = 2 # .css_class: CSS class for this interval active.css_class = open # .label: displayed label for this group active.label = in progress }}} The definition consists in a comma-separated list of accepted status. Also, '*' means any status and could be used to associate all remaining states to one catch-all group. The CSS class can be one of: new (yellow), open (no color) or closed (green). Other styles can easily be added using custom CSS rule: `table.progress td.<class> { background: <color> }` to a [TracInterfaceCustomization#SiteAppearance site/style.css] file for example. """) default_milestone_groups = [ {'name': 'closed', 'status': 'closed', 'query_args': 'group=resolution', 'overall_completion': 'true'}, {'name': 'active', 'status': '*', 'css_class': 'open'} ] def _get_ticket_groups(self): """Returns a list of dict describing the ticket groups in the expected order of appearance in the milestone progress bars. """ if 'milestone-groups' in self.config: groups = {} order = 0 for groupname, value in self.milestone_groups_section.options(): qualifier = 'status' if '.' in groupname: groupname, qualifier = groupname.split('.', 1) group = groups.setdefault(groupname, {'name': groupname, 'order': order}) group[qualifier] = value order = max(order, int(group['order'])) + 1 return sorted(groups.values(), key=lambda g: int(g['order'])) else: return self.default_milestone_groups def get_ticket_group_stats(self, ticket_ids): total_cnt = len(ticket_ids) all_statuses = set(TicketSystem(self.env).get_all_status()) status_cnt = {} for s in all_statuses: status_cnt[s] = 0 if total_cnt: for status, count in self.env.db_query(""" SELECT status, count(status) FROM ticket WHERE id IN (%s) GROUP BY status """ % ",".join(str(x) for x in sorted(ticket_ids))): status_cnt[status] = count stat = TicketGroupStats(_("ticket status"), _("tickets")) remaining_statuses = set(all_statuses) groups = self._get_ticket_groups() catch_all_group = None # we need to go through the groups twice, so that the catch up group # doesn't need to be the last one in the sequence for group in groups: status_str = group['status'].strip() if status_str == '*': if catch_all_group: raise TracError(_( "'%(group1)s' and '%(group2)s' milestone groups " "both are declared to be \"catch-all\" groups. " "Please check your configuration.", group1=group['name'], group2=catch_all_group['name'])) catch_all_group = group else: group_statuses = {s.strip() for s in status_str.split(',')} \ & all_statuses if group_statuses - remaining_statuses: raise TracError(_( "'%(groupname)s' milestone group reused status " "'%(status)s' already taken by other groups. " "Please check your configuration.", groupname=group['name'], status=', '.join(group_statuses - remaining_statuses))) else: remaining_statuses -= group_statuses group['statuses'] = group_statuses if catch_all_group: catch_all_group['statuses'] = remaining_statuses for group in groups: group_cnt = 0 query_args = {} for s, cnt in status_cnt.items(): if s in group['statuses']: group_cnt += cnt query_args.setdefault('status', []).append(s) if 'status' in query_args: query_args['status'].sort() for arg in [kv for kv in group.get('query_args', '').split(',') if '=' in kv]: k, v = [a.strip() for a in arg.split('=', 1)] query_args.setdefault(k, []).append(v) stat.add_interval(group.get('label', group['name']), group_cnt, query_args, group.get('css_class', group['name']), as_bool(group.get('overall_completion'))) stat.refresh_calcs() return stat def get_ticket_stats(provider, tickets): return provider.get_ticket_group_stats([t['id'] for t in tickets]) def get_tickets_for_milestone(env, milestone=None, field='component'): """Retrieve all tickets associated with the given `milestone`. """ fields = TicketSystem(env).get_ticket_fields() if field in [f['name'] for f in fields if not f.get('custom')]: sql = """SELECT id, status, %s FROM ticket WHERE milestone=%%s ORDER BY %s, id""" % (field, field) args = (milestone,) else: sql = """SELECT id, status, value FROM ticket LEFT OUTER JOIN ticket_custom ON (id=ticket AND name=%s) WHERE milestone=%s ORDER BY value, id""" args = (field, milestone) return [{'id': tkt_id, 'status': status, field: fieldval} for tkt_id, status, fieldval in env.db_query(sql, args)] def get_tickets_for_all_milestones(env, field='component'): with env.db_query as db: fields = TicketSystem(env).get_ticket_fields() if any(field == f['name'] and not f.get('custom') for f in fields): sql = """SELECT id, status, %(field)s, milestone FROM ticket WHERE milestone != '' ORDER BY milestone, %(field)s, id """ % {'field': db.quote(field)} args = () else: sql = """SELECT t.id, t.status, c.value, t.milestone FROM ticket AS t LEFT OUTER JOIN ticket_custom AS c ON (t.id=c.ticket AND c.name=%s) WHERE t.milestone != '' ORDER BY t.milestone, c.value, t.id""" args = (field,) cursor = db.cursor() cursor.execute(sql, args) results = {} for milestone, group in itertools.groupby(cursor, lambda row: row[3]): results[milestone] = [{'id': row[0], 'status': row[1], field: row[2]} for row in group] return results def get_num_tickets_for_milestone(env, milestone, exclude_closed=False): """Returns the number of tickets associated with the milestone. :param milestone: name of a milestone or a Milestone instance. :param exclude_closed: whether tickets with status 'closed' should be excluded from the count. Defaults to False. :since: 1.2 """ name = milestone.name if isinstance(milestone, Milestone) else milestone sql = "SELECT COUNT(*) FROM ticket WHERE milestone=%s" if exclude_closed: sql += " AND status != 'closed'" return env.db_query(sql, (name,))[0][0] def apply_ticket_permissions(env, req, tickets): """Apply permissions to a set of milestone tickets as returned by `get_tickets_for_milestone()`.""" return [t for t in tickets if 'TICKET_VIEW' in req.perm('ticket', t['id'])] def milestone_stats_data(env, req, stat, name, grouped_by='component', group=None): from trac.ticket.query import QueryModule has_query = env[QueryModule] is not None def query_href(extra_args): if not has_query: return None args = {'milestone': name, grouped_by: group, 'group': 'status'} args.update(extra_args) return req.href.query(args) return {'stats': stat, 'stats_href': query_href(stat.qry_args), 'interval_hrefs': [query_href(interval['qry_args']) for interval in stat.intervals]} def grouped_stats_data(env, stats_provider, tickets, by, per_group_stats_data): """Get the `tickets` stats data grouped by ticket field `by`. `per_group_stats_data(gstat, group_name)` should return a data dict to include for the group with field value `group_name`. """ group_names = [] for field in TicketSystem(env).get_ticket_fields(): if field['name'] == by: if 'options' in field: group_names = field['options'] if field.get('optional'): group_names.insert(0, '') elif field.get('custom'): group_names = [name for name, in env.db_query(""" SELECT DISTINCT COALESCE(c.value, '') FROM ticket_custom c WHERE c.name=%s ORDER BY COALESCE(c.value, '') """, (by, ))] if '' not in group_names: group_names.insert(0, '') else: group_names = [name for name, in env.db_query(""" SELECT DISTINCT COALESCE(%s, '') FROM ticket ORDER BY COALESCE(%s, '') """ % (by, by))] max_count = 0 data = [] for name in group_names: values = (name,) if name else (None, name) group_tickets = [t for t in tickets if t[by] in values] if not group_tickets: continue gstat = get_ticket_stats(stats_provider, group_tickets) if gstat.count > max_count: max_count = gstat.count gs_dict = {'name': name} gs_dict.update(per_group_stats_data(gstat, name)) data.append(gs_dict) for gs_dict in data: percent = 1.0 if max_count: gstat = gs_dict['stats'] percent = float(gstat.count) / float(max_count) * 100 gs_dict['percent_of_max_total'] = percent return data def group_milestones(milestones, include_completed): """Group milestones into "open with due date", "open with no due date", and possibly "completed". Return a list of (label, milestones) tuples.""" def category(m): return 1 if m.is_completed else 2 if m.due else 3 open_due_milestones, open_not_due_milestones, \ closed_milestones = partition([(m, category(m)) for m in milestones], (2, 3, 1)) groups = [ (_("Open (by due date)"), open_due_milestones), (_("Open (no due date)"), open_not_due_milestones), ] if include_completed: groups.append((_('Closed'), closed_milestones)) return groups class RoadmapModule(Component): """Overview of all the milestones.""" implements(INavigationContributor, IPermissionRequestor, IRequestHandler) stats_provider = ExtensionOption('roadmap', 'stats_provider', ITicketGroupStatsProvider, 'DefaultTicketGroupStatsProvider', """Name of the component implementing `ITicketGroupStatsProvider`, which is used to collect statistics on groups of tickets for display in the roadmap views.""") # INavigationContributor methods def get_active_navigation_item(self, req): return 'roadmap' def get_navigation_items(self, req): if 'ROADMAP_VIEW' in req.perm: yield ('mainnav', 'roadmap', tag.a(_('Roadmap'), href=req.href.roadmap(), accesskey=accesskey(req, 3))) # IPermissionRequestor methods def get_permission_actions(self): actions = ['MILESTONE_CREATE', 'MILESTONE_DELETE', 'MILESTONE_MODIFY', 'MILESTONE_VIEW', 'ROADMAP_VIEW'] return ['ROADMAP_VIEW'] + [('ROADMAP_ADMIN', actions)] # IRequestHandler methods def match_request(self, req): return req.path_info == '/roadmap' def process_request(self, req): req.perm.require('ROADMAP_VIEW') show = req.args.getlist('show') if 'all' in show: show = ['completed'] milestones = Milestone.select(self.env, 'completed' in show) if 'noduedate' in show: milestones = [m for m in milestones if m.due is not None or m.completed] milestones = [m for m in milestones if 'MILESTONE_VIEW' in req.perm(m.resource)] stats = [] queries = [] all_tickets = get_tickets_for_all_milestones(self.env, field='owner') for milestone in milestones: tickets = all_tickets.get(milestone.name) or [] tickets = apply_ticket_permissions(self.env, req, tickets) stat = get_ticket_stats(self.stats_provider, tickets) stats.append(milestone_stats_data(self.env, req, stat, milestone.name)) # milestone['tickets'] = tickets # for the iCalendar view if req.args.get('format') == 'ics': self._render_ics(req, milestones) return # FIXME should use the 'webcal:' scheme, probably username = None if req.is_authenticated: username = req.authname icshref = req.href.roadmap(show=show, user=username, format='ics') add_link(req, 'alternate', auth_link(req, icshref), _("iCalendar"), 'text/calendar', 'ics') data = { 'milestones': milestones, 'milestone_stats': stats, 'queries': queries, 'show': show, } add_stylesheet(req, 'common/css/roadmap.css') return 'roadmap.html', data # Internal methods def _render_ics(self, req, milestones): req.send_response(200) req.send_header('Content-Type', 'text/calendar;charset=utf-8') buf = io.StringIO() from trac.ticket import Priority priorities = {} for priority in Priority.select(self.env): priorities[priority.name] = float(priority.value) def get_priority(ticket): value = priorities.get(ticket['priority']) if value: return int((len(priorities) + 8 * value - 9) / (len(priorities) - 1)) def get_status(ticket): status = ticket['status'] if status == 'new' or status == 'reopened' and \ not ticket['owner']: return 'NEEDS-ACTION' elif status in ('assigned', 'reopened'): return 'IN-PROCESS' elif status == 'closed': if ticket['resolution'] == 'fixed': return 'COMPLETED' else: return 'CANCELLED' else: return '' def escape_value(text): s = ''.join(map(lambda c: '\\' + c if c in ';,\\' else c, text)) return '\\n'.join(re.split(r'[\r\n]+', s)) def write_prop(name, value, params={}): text = ';'.join([name] + [k + '=' + v for k, v in params.items()]) + \ ':' + escape_value(value) firstline = 1 text = to_unicode(text) while text: if not firstline: text = ' ' + text else: firstline = 0 buf.write(text[:75] + CRLF) text = text[75:] def write_date(name, value, params={}): params['VALUE'] = 'DATE' write_prop(name, format_date(value, '%Y%m%d', req.tz), params) def write_utctime(name, value, params={}): write_prop(name, format_datetime(value, '%Y%m%dT%H%M%SZ', utc), params) host = req.base_url[req.base_url.find('://') + 3:] user = req.args.get('user', 'anonymous') write_prop('BEGIN', 'VCALENDAR') write_prop('VERSION', '2.0') write_prop('PRODID', '-//Edgewall Software//NONSGML Trac %s//EN' % self.env.trac_version) write_prop('METHOD', 'PUBLISH') write_prop('X-WR-CALNAME', self.env.project_name + ' - ' + _("Roadmap")) write_prop('X-WR-CALDESC', self.env.project_description) write_prop('X-WR-TIMEZONE', str(req.tz)) all_tickets = get_tickets_for_all_milestones(self.env, field='owner') for milestone in milestones: uid = '<%s/milestone/%s@%s>' % (req.base_path, milestone.name, host) if milestone.due: write_prop('BEGIN', 'VEVENT') write_prop('UID', uid) write_utctime('DTSTAMP', milestone.due) write_date('DTSTART', milestone.due) write_prop('SUMMARY', _("Milestone %(name)s", name=milestone.name)) write_prop('URL', req.abs_href.milestone(milestone.name)) if milestone.description: write_prop('DESCRIPTION', milestone.description) write_prop('END', 'VEVENT') tickets = all_tickets.get(milestone.name) or [] tickets = apply_ticket_permissions(self.env, req, tickets) for tkt_id in [ticket['id'] for ticket in tickets if ticket['owner'] == user]: ticket = Ticket(self.env, tkt_id) write_prop('BEGIN', 'VTODO') write_prop('UID', '<%s/ticket/%s@%s>' % (req.base_path, tkt_id, host)) if milestone.due: write_prop('RELATED-TO', uid) write_date('DUE', milestone.due) write_prop('SUMMARY', _("Ticket #%(num)s: %(summary)s", num=ticket.id, summary=ticket['summary'])) write_prop('URL', req.abs_href.ticket(ticket.id)) write_prop('DESCRIPTION', ticket['description']) priority = get_priority(ticket) if priority: write_prop('PRIORITY', str(priority)) write_prop('STATUS', get_status(ticket)) if ticket['status'] == 'closed': for time, in self.env.db_query(""" SELECT time FROM ticket_change WHERE ticket=%s AND field='status' ORDER BY time desc LIMIT 1 """, (ticket.id,)): write_utctime('COMPLETED', from_utimestamp(time)) write_prop('END', 'VTODO') write_prop('END', 'VCALENDAR') ics_str = buf.getvalue().encode('utf-8') req.send_header('Content-Length', len(ics_str)) req.end_headers() req.write(ics_str) raise RequestDone class MilestoneModule(Component): """View and edit individual milestones.""" implements(INavigationContributor, IPermissionRequestor, IRequestHandler, IResourceManager, ISearchSource, ITimelineEventProvider, IWikiSyntaxProvider) realm = 'milestone' stats_provider = ExtensionOption('milestone', 'stats_provider', ITicketGroupStatsProvider, 'DefaultTicketGroupStatsProvider', """Name of the component implementing `ITicketGroupStatsProvider`, which is used to collect statistics on groups of tickets for display in the milestone views.""") default_retarget_to = Option('milestone', 'default_retarget_to', doc="""Default milestone to which tickets are retargeted when closing or deleting a milestone. (''since 1.1.2'')""") default_group_by = Option('milestone', 'default_group_by', 'component', """Default field to use for grouping tickets in the grouped progress bar. (''since 1.2'')""") # INavigationContributor methods def get_active_navigation_item(self, req): return 'roadmap' def get_navigation_items(self, req): return [] # IPermissionRequestor methods def get_permission_actions(self): actions = ['MILESTONE_CREATE', 'MILESTONE_DELETE', 'MILESTONE_MODIFY', 'MILESTONE_VIEW'] return actions + [('MILESTONE_ADMIN', actions)] # ITimelineEventProvider methods def get_timeline_filters(self, req): if 'MILESTONE_VIEW' in req.perm: yield ('milestone', _("Milestones completed")) def get_timeline_events(self, req, start, stop, filters): if 'milestone' in filters: milestone_realm = Resource(self.realm) for name, due, completed, description \ in MilestoneCache(self.env).milestones.values(): if completed and start <= completed <= stop: # TODO: creation and (later) modifications should also be # reported milestone = milestone_realm(id=name) if 'MILESTONE_VIEW' in req.perm(milestone): yield ('milestone', completed, '', # FIXME: author? (milestone, description)) # Attachments for event in AttachmentModule(self.env).get_timeline_events( req, milestone_realm, start, stop): yield event def render_timeline_event(self, context, field, event): milestone, description = event[3] if field == 'url': return context.href.milestone(milestone.id) elif field == 'title': return tag_("Milestone %(name)s completed", name=tag.em(milestone.id)) elif field == 'description': child_resource = context.child(resource=milestone) return format_to(self.env, None, child_resource, description) # IRequestHandler methods def match_request(self, req): match = re.match(r'/milestone(?:/(.+))?$', req.path_info) if match: if match.group(1): req.args['id'] = match.group(1) return True def process_request(self, req): milestone_id = req.args.get('id') action = req.args.get('action', 'view') if not milestone_id and action == 'view': req.redirect(req.href.roadmap()) req.perm(self.realm, milestone_id).require('MILESTONE_VIEW') add_link(req, 'up', req.href.roadmap(), _("Roadmap")) try: milestone = Milestone(self.env, milestone_id) except ResourceNotFound: if 'MILESTONE_CREATE' not in req.perm(self.realm, milestone_id): raise milestone = Milestone(self.env) milestone.name = milestone_id action = 'edit' # rather than 'new', so it works for POST/save if req.method == 'POST': if 'cancel' in req.args: if milestone.exists: req.redirect(req.href.milestone(milestone.name)) else: req.redirect(req.href.roadmap()) elif action == 'edit': return self._do_save(req, milestone) elif action == 'delete': self._do_delete(req, milestone) else: raise HTTPBadRequest(_("Invalid request arguments.")) elif action in ('new', 'edit'): return self._render_editor(req, milestone) elif action == 'delete': return self._render_confirm(req, milestone) if not milestone.name: req.redirect(req.href.roadmap()) return self._render_view(req, milestone) # Public methods def get_default_due(self, req): """Returns a `datetime` object representing the default due date in the user's timezone. The default due time is 18:00 in the user's time zone. """ now = datetime_now(req.tz) default_due = datetime(now.year, now.month, now.day, 18) if now.hour > 18: default_due += timedelta(days=1) return to_datetime(default_due, req.tz) def save_milestone(self, req, milestone): # Instead of raising one single error, check all the constraints # and let the user fix them by going back to edit mode and showing # the warnings warnings = [] def warn(msg): add_warning(req, msg) warnings.append(msg) milestone.description = req.args.get('description', '') if 'due' in req.args: duedate = req.args.get('duedate') milestone.due = user_time(req, parse_date, duedate, hint='datetime') \ if duedate else None else: milestone.due = None # -- check completed date if 'completed' in req.args: completed = req.args.get('completeddate', '') completed = user_time(req, parse_date, completed, hint='datetime') if completed else None if completed and completed > datetime_now(utc): warn(_("Completion date may not be in the future")) else: completed = None milestone.completed = completed # -- check the name # If the name has changed, check that the milestone doesn't already # exist # FIXME: the whole .exists business needs to be clarified # (#4130) and should behave like a WikiPage does in # this respect. new_name = req.args.get('name') try: new_milestone = Milestone(self.env, new_name) except ResourceNotFound: milestone.name = new_name else: if new_milestone.name != milestone.name: if new_milestone.name: warn(_('Milestone "%(name)s" already exists, please ' 'choose another name.', name=new_milestone.name)) else: warn(_("You must provide a name for the milestone.")) if warnings: return False # -- actually save changes if milestone.exists: try: milestone.update(author=req.authname) except ResourceExistsError: raise ResourceExistsError(_('Milestone "%(name)s" already ' 'exists, please choose another ' 'name.', name=milestone.name)) if completed and 'retarget' in req.args: comment = req.args.get('comment', '') retarget_to = req.args.get('target') or None retargeted_tickets = \ milestone.move_tickets(retarget_to, req.authname, comment, exclude_closed=True) add_notice(req, _('The open tickets associated with ' 'milestone "%(name)s" have been retargeted ' 'to milestone "%(retarget)s".', name=milestone.name, retarget=retarget_to)) new_values = {'milestone': retarget_to} comment = comment or \ _("Open tickets retargeted after milestone closed") event = BatchTicketChangeEvent(retargeted_tickets, None, req.authname, comment, new_values, None) try: NotificationSystem(self.env).notify(event) except Exception as e: self.log.error("Failure sending notification on ticket " "batch change: %s", exception_to_unicode(e)) add_warning(req, tag_("The changes have been saved, but " "an error occurred while sending " "notifications: %(message)s", message=to_unicode(e))) add_notice(req, _("Your changes have been saved.")) else: try: milestone.insert() except ResourceExistsError: raise ResourceExistsError(_('Milestone "%(name)s" already ' 'exists, please choose another ' 'name.', name=milestone.name)) add_notice(req, _('The milestone "%(name)s" has been added.', name=milestone.name)) return True # Internal methods _default_retarget_to = default_retarget_to @property def default_retarget_to(self): if self._default_retarget_to and \ not any(self._default_retarget_to == m.name for m in Milestone.select(self.env)): self.log.warning('Milestone "%s" does not exist. Update the ' '"default_retarget_to" option in the ' '[milestone] section of trac.ini', self._default_retarget_to) return self._default_retarget_to def _do_delete(self, req, milestone): req.perm(milestone.resource).require('MILESTONE_DELETE') retarget_to = req.args.get('target') or None # Don't translate ticket comment (comment:40:ticket:5658) retargeted_tickets = \ milestone.move_tickets(retarget_to, req.authname, "Ticket retargeted after milestone deleted") milestone.delete() add_notice(req, _('The milestone "%(name)s" has been deleted.', name=milestone.name)) if retargeted_tickets: add_notice(req, _('The tickets associated with milestone ' '"%(name)s" have been retargeted to milestone ' '"%(retarget)s".', name=milestone.name, retarget=retarget_to)) new_values = {'milestone': retarget_to} comment = _("Tickets retargeted after milestone deleted") event = BatchTicketChangeEvent(retargeted_tickets, None, req.authname, comment, new_values, None) try: NotificationSystem(self.env).notify(event) except Exception as e: self.log.error("Failure sending notification on ticket batch " "change: %s", exception_to_unicode(e)) add_warning(req, tag_("The changes have been saved, but an " "error occurred while sending " "notifications: %(message)s", message=to_unicode(e))) req.redirect(req.href.roadmap()) def _do_save(self, req, milestone): if milestone.exists: req.perm(milestone.resource).require('MILESTONE_MODIFY') else: req.perm(milestone.resource).require('MILESTONE_CREATE') if self.save_milestone(req, milestone): req.redirect(req.href.milestone(milestone.name)) return self._render_editor(req, milestone) def _render_confirm(self, req, milestone): req.perm(milestone.resource).require('MILESTONE_DELETE') milestones = [m for m in Milestone.select(self.env) if m.name != milestone.name and 'MILESTONE_VIEW' in req.perm(m.resource)] attachments = Attachment.select(self.env, self.realm, milestone.name) data = { 'milestone': milestone, 'milestone_groups': group_milestones(milestones, 'TICKET_ADMIN' in req.perm), 'num_tickets': get_num_tickets_for_milestone(self.env, milestone), 'retarget_to': self.default_retarget_to, 'attachments': list(attachments) } add_stylesheet(req, 'common/css/roadmap.css') return 'milestone_delete.html', data def _render_editor(self, req, milestone): data = { 'milestone': milestone, 'datetime_hint': get_datetime_format_hint(req.lc_time), 'default_due': self.get_default_due(req), 'milestone_groups': [], } if milestone.exists: req.perm(milestone.resource).require('MILESTONE_MODIFY') milestones = [m for m in Milestone.select(self.env) if m.name != milestone.name and 'MILESTONE_VIEW' in req.perm(m.resource)] data['milestone_groups'] = \ group_milestones(milestones, 'TICKET_ADMIN' in req.perm) data['num_open_tickets'] = \ get_num_tickets_for_milestone(self.env, milestone, exclude_closed=True) data['retarget_to'] = self.default_retarget_to else: req.perm(milestone.resource).require('MILESTONE_CREATE') if milestone.name: add_notice(req, _("Milestone %(name)s does not exist. You " "can create it here.", name=milestone.name)) chrome = Chrome(self.env) chrome.add_jquery_ui(req) chrome.add_wiki_toolbars(req) chrome.add_auto_preview(req) add_stylesheet(req, 'common/css/roadmap.css') return 'milestone_edit.html', data def _render_view(self, req, milestone): milestone_groups = [] available_groups = [] default_group_by_available = False ticket_fields = TicketSystem(self.env).get_ticket_fields() # collect fields that can be used for grouping for field in ticket_fields: if field['type'] == 'select' and field['name'] != 'milestone' \ or field['name'] in ('owner', 'reporter'): available_groups.append({'name': field['name'], 'label': field['label']}) if field['name'] == self.default_group_by: default_group_by_available = True # determine the field currently used for grouping by = None if default_group_by_available: by = self.default_group_by elif available_groups: by = available_groups[0]['name'] by = req.args.getfirst('by', by) tickets = get_tickets_for_milestone(self.env, milestone=milestone.name, field=by) tickets = apply_ticket_permissions(self.env, req, tickets) stat = get_ticket_stats(self.stats_provider, tickets) context = web_context(req, milestone.resource) data = { 'context': context, 'milestone': milestone, 'attachments': AttachmentModule(self.env).attachment_data(context), 'available_groups': available_groups, 'grouped_by': by, 'groups': milestone_groups } data.update(milestone_stats_data(self.env, req, stat, milestone.name)) if by: def per_group_stats_data(gstat, group_name): return milestone_stats_data(self.env, req, gstat, milestone.name, by, group_name) milestone_groups.extend( grouped_stats_data(self.env, self.stats_provider, tickets, by, per_group_stats_data)) add_stylesheet(req, 'common/css/roadmap.css') def add_milestone_link(rel, milestone): href = req.href.milestone(milestone.name, by=req.args.get('by')) add_link(req, rel, href, _('Milestone "%(name)s"', name=milestone.name)) milestones = [m for m in Milestone.select(self.env) if 'MILESTONE_VIEW' in req.perm(m.resource)] idx = [i for i, m in enumerate(milestones) if m.name == milestone.name] if idx: idx = idx[0] if idx > 0: add_milestone_link('first', milestones[0]) add_milestone_link('prev', milestones[idx - 1]) if idx < len(milestones) - 1: add_milestone_link('next', milestones[idx + 1]) add_milestone_link('last', milestones[-1]) prevnext_nav(req, _("Previous Milestone"), _("Next Milestone"), _("Back to Roadmap")) return 'milestone_view.html', data # IWikiSyntaxProvider methods def get_wiki_syntax(self): return [] def get_link_resolvers(self): yield ('milestone', self._format_link) def _format_link(self, formatter, ns, name, label): name, query, fragment = formatter.split_link(name) return self._render_link(formatter.context, name, label, query + fragment) def _render_link(self, context, name, label, extra=''): if not (name or extra): return tag() try: milestone = Milestone(self.env, name) except ResourceNotFound: milestone = None # Note: the above should really not be needed, `Milestone.exists` # should simply be false if the milestone doesn't exist in the db # (related to #4130) href = context.href.milestone(name) exists = milestone and milestone.exists if exists: if 'MILESTONE_VIEW' in context.perm(milestone.resource): title = None if hasattr(context, 'req'): if milestone.is_completed: title = _( "Completed %(duration)s ago (%(date)s)", duration=pretty_timedelta(milestone.completed), date=user_time(context.req, format_datetime, milestone.completed)) elif milestone.is_late: title = _("%(duration)s late (%(date)s)", duration=pretty_timedelta(milestone.due), date=user_time(context.req, format_datetime, milestone.due)) elif milestone.due: title = _("Due in %(duration)s (%(date)s)", duration=pretty_timedelta(milestone.due), date=user_time(context.req, format_datetime, milestone.due)) else: title = _("No date set") closed = 'closed ' if milestone.is_completed else '' return tag.a(label, class_='%smilestone' % closed, href=href + extra, title=title) elif 'MILESTONE_CREATE' in context.perm(self.realm, name): return tag.a(label, class_='missing milestone', href=href + extra, rel='nofollow') return tag.a(label, class_=classes('milestone', missing=not exists)) # IResourceManager methods def get_resource_realms(self): yield self.realm def get_resource_description(self, resource, format=None, context=None, **kwargs): desc = resource.id if format != 'compact': desc = _("Milestone %(name)s", name=resource.id) if context: return self._render_link(context, resource.id, desc) else: return desc def resource_exists(self, resource): """ >>> from trac.test import EnvironmentStub >>> env = EnvironmentStub() >>> m1 = Milestone(env) >>> m1.name = 'M1' >>> m1.insert() >>> MilestoneModule(env).resource_exists(Resource('milestone', 'M1')) True >>> MilestoneModule(env).resource_exists(Resource('milestone', 'M2')) False """ return resource.id in MilestoneCache(self.env).milestones # ISearchSource methods def get_search_filters(self, req): if 'MILESTONE_VIEW' in req.perm: yield ('milestone', _("Milestones")) def get_search_results(self, req, terms, filters): if 'milestone' not in filters: return term_regexps = search_to_regexps(terms) milestone_realm = Resource(self.realm) for name, due, completed, description \ in MilestoneCache(self.env).milestones.values(): if all(r.search(description) or r.search(name) for r in term_regexps): milestone = milestone_realm(id=name) if 'MILESTONE_VIEW' in req.perm(milestone): dt = (completed if completed else due if due else datetime_now(utc)) yield (get_resource_url(self.env, milestone, req.href), get_resource_name(self.env, milestone), dt, '', shorten_result(description, terms)) # Attachments for result in AttachmentModule(self.env).get_search_results( req, milestone_realm, terms): yield result