from rest_framework.exceptions import ValidationError from django.utils.translation import ugettext as _ from misago.threads.models import Poll class MergeConflictHandler(object): def __init__(self, threads): self.items = [] self.choices = {0: None} self._is_valid = False self._resolution = None self.threads = threads self.populate_from_threads(threads) if len(self.items) == 1: self._is_valid = True self._resolution = self.items[0] def populate_from_threads(self, threads): raise NotImplementedError('merge handler must define populate_from_threads') def is_merge_conflict(self): return len(self.items) > 1 def set_resolution(self, resolution): try: resolution_clean = int(resolution) except (TypeError, ValueError): return if resolution_clean in self.choices: self._resolution = self.choices[resolution_clean] self._is_valid = True def is_valid(self): return self._is_valid def get_resolution(self): return self._resolution or None class BestAnswerMergeHandler(MergeConflictHandler): data_name = 'best_answer' def populate_from_threads(self, threads): for thread in threads: if thread.best_answer_id: self.items.append(thread) self.choices[thread.pk] = thread self.items.sort(key=lambda choice: choice.id, reverse=True) def get_available_resolutions(self): resolutions = [[0, _("Unmark all best answers")]] for thread in self.items: resolutions.append([thread.pk, thread.title]) return resolutions class PollMergeHandler(MergeConflictHandler): data_name = 'poll' def populate_from_threads(self, threads): for thread in threads: try: self.items.append(thread.poll) self.choices[thread.poll.id] = thread.poll except Poll.DoesNotExist: pass self.items.sort(key=lambda choice: choice.thread_id, reverse=True) def get_available_resolutions(self): resolutions = [[0, _("Delete all polls")]] for poll in self.items: resolutions.append([poll.id, poll.question]) return resolutions class MergeConflict(object): """ Utility class single point of entry for detecting merge conflicts on different properties and validating user resolutions. """ HANDLERS = ( BestAnswerMergeHandler, PollMergeHandler, ) def __init__(self, data=None, threads=None): self.data = data or {} self._handlers = [Handler(threads) for Handler in self.HANDLERS] self._conflicts = [i for i in self._handlers if i.is_merge_conflict()] self.set_resolution(data) def is_merge_conflict(self): return bool(self._conflicts) def get_merge_conflict(self): return [i.data_name for i in self._conflicts] def set_resolution(self, data): for handler in self._conflicts: data = self.data.get(handler.data_name) handler.set_resolution(data) def is_valid(self, raise_exception=False): if raise_exception: self.raise_exception() return all([i.is_valid() for i in self._handlers]) def raise_exception(self): errors = {} for handler in self._conflicts: if self.data.get(handler.data_name) is None: key = '{}s'.format(handler.data_name) errors[key] = handler.get_available_resolutions() elif not handler.is_valid(): errors[handler.data_name] = [_("Invalid choice.")] if errors: raise ValidationError(errors) def get_resolution(self): resolved_handlers = [i for i in self._handlers if i.is_valid()] return {i.data_name: i.get_resolution() for i in resolved_handlers}