test_thread_merge_api.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398
  1. from django.urls import reverse
  2. from misago.acl.testutils import override_acl
  3. from misago.categories.models import Category
  4. from misago.threads import testutils
  5. from misago.threads.models import Poll, PollVote, Thread
  6. from .test_threads_api import ThreadsApiTestCase
  7. class ThreadMergeApiTests(ThreadsApiTestCase):
  8. def setUp(self):
  9. super(ThreadMergeApiTests, self).setUp()
  10. Category(
  11. name='Category B',
  12. slug='category-b',
  13. ).insert_at(
  14. self.category,
  15. position='last-child',
  16. save=True,
  17. )
  18. self.category_b = Category.objects.get(slug='category-b')
  19. self.api_link = reverse(
  20. 'misago:api:thread-merge', kwargs={
  21. 'pk': self.thread.pk,
  22. }
  23. )
  24. def override_other_acl(self, acl=None):
  25. other_category_acl = self.user.acl_cache['categories'][self.category.pk].copy()
  26. other_category_acl.update({
  27. 'can_see': 1,
  28. 'can_browse': 1,
  29. 'can_see_all_threads': 1,
  30. 'can_see_own_threads': 0,
  31. 'can_hide_threads': 0,
  32. 'can_approve_content': 0,
  33. 'can_edit_posts': 0,
  34. 'can_hide_posts': 0,
  35. 'can_hide_own_posts': 0,
  36. 'can_merge_threads': 0,
  37. })
  38. if acl:
  39. other_category_acl.update(acl)
  40. categories_acl = self.user.acl_cache['categories']
  41. categories_acl[self.category_b.pk] = other_category_acl
  42. visible_categories = [self.category.pk]
  43. if other_category_acl['can_see']:
  44. visible_categories.append(self.category_b.pk)
  45. override_acl(
  46. self.user, {
  47. 'visible_categories': visible_categories,
  48. 'categories': categories_acl,
  49. }
  50. )
  51. def test_merge_no_permission(self):
  52. """api validates if thread can be merged with other one"""
  53. self.override_acl({'can_merge_threads': 0})
  54. response = self.client.post(self.api_link)
  55. self.assertContains(
  56. response,
  57. "You don't have permission to merge this thread with others.",
  58. status_code=403
  59. )
  60. def test_merge_no_url(self):
  61. """api validates if thread url was given"""
  62. self.override_acl({'can_merge_threads': 1})
  63. response = self.client.post(self.api_link)
  64. self.assertContains(response, "This is not a valid thread link.", status_code=400)
  65. def test_invalid_url(self):
  66. """api validates thread url"""
  67. self.override_acl({'can_merge_threads': 1})
  68. response = self.client.post(self.api_link, {
  69. 'thread_url': self.user.get_absolute_url(),
  70. })
  71. self.assertContains(response, "This is not a valid thread link.", status_code=400)
  72. def test_current_thread_url(self):
  73. """api validates if thread url given is to current thread"""
  74. self.override_acl({'can_merge_threads': 1})
  75. response = self.client.post(
  76. self.api_link, {
  77. 'thread_url': self.thread.get_absolute_url(),
  78. }
  79. )
  80. self.assertContains(response, "You can't merge thread with itself.", status_code=400)
  81. def test_other_thread_exists(self):
  82. """api validates if other thread exists"""
  83. self.override_acl({'can_merge_threads': 1})
  84. self.override_other_acl()
  85. other_thread = testutils.post_thread(self.category_b)
  86. other_thread_url = other_thread.get_absolute_url()
  87. other_thread.delete()
  88. response = self.client.post(self.api_link, {
  89. 'thread_url': other_thread_url,
  90. })
  91. self.assertContains(
  92. response, "The thread you have entered link to doesn't exist", status_code=400
  93. )
  94. def test_other_thread_is_invisible(self):
  95. """api validates if other thread is visible"""
  96. self.override_acl({'can_merge_threads': 1})
  97. self.override_other_acl({'can_see': 0})
  98. other_thread = testutils.post_thread(self.category_b)
  99. response = self.client.post(
  100. self.api_link, {
  101. 'thread_url': other_thread.get_absolute_url(),
  102. }
  103. )
  104. self.assertContains(
  105. response, "The thread you have entered link to doesn't exist", status_code=400
  106. )
  107. def test_other_thread_isnt_mergeable(self):
  108. """api validates if other thread can be merged"""
  109. self.override_acl({'can_merge_threads': 1})
  110. self.override_other_acl({'can_merge_threads': 0})
  111. other_thread = testutils.post_thread(self.category_b)
  112. response = self.client.post(
  113. self.api_link, {
  114. 'thread_url': other_thread.get_absolute_url(),
  115. }
  116. )
  117. self.assertContains(
  118. response, "You don't have permission to merge this thread", status_code=400
  119. )
  120. def test_other_thread_isnt_replyable(self):
  121. """api validates if other thread can be replied, which is condition for merg"""
  122. self.override_acl({'can_merge_threads': 1})
  123. self.override_other_acl({'can_reply_threads': 0})
  124. other_thread = testutils.post_thread(self.category_b)
  125. response = self.client.post(
  126. self.api_link, {
  127. 'thread_url': other_thread.get_absolute_url(),
  128. }
  129. )
  130. self.assertContains(
  131. response, "You can't merge this thread into thread you can't reply.", status_code=400
  132. )
  133. def test_merge_threads(self):
  134. """api merges two threads successfully"""
  135. self.override_acl({'can_merge_threads': 1})
  136. self.override_other_acl({'can_merge_threads': 1})
  137. other_thread = testutils.post_thread(self.category_b)
  138. response = self.client.post(
  139. self.api_link, {
  140. 'thread_url': other_thread.get_absolute_url(),
  141. }
  142. )
  143. self.assertContains(response, other_thread.get_absolute_url(), status_code=200)
  144. # other thread has two posts now
  145. self.assertEqual(other_thread.post_set.count(), 3)
  146. # first thread is gone
  147. with self.assertRaises(Thread.DoesNotExist):
  148. Thread.objects.get(pk=self.thread.pk)
  149. def test_merge_threads_kept_poll(self):
  150. """api merges two threads successfully, keeping poll from old thread"""
  151. self.override_acl({'can_merge_threads': 1})
  152. self.override_other_acl({'can_merge_threads': 1})
  153. other_thread = testutils.post_thread(self.category_b)
  154. poll = testutils.post_poll(other_thread, self.user)
  155. response = self.client.post(
  156. self.api_link, {
  157. 'thread_url': other_thread.get_absolute_url(),
  158. }
  159. )
  160. self.assertContains(response, other_thread.get_absolute_url(), status_code=200)
  161. # other thread has two posts now
  162. self.assertEqual(other_thread.post_set.count(), 3)
  163. # first thread is gone
  164. with self.assertRaises(Thread.DoesNotExist):
  165. Thread.objects.get(pk=self.thread.pk)
  166. # poll and its votes were kept
  167. self.assertEqual(Poll.objects.filter(pk=poll.pk, thread=other_thread).count(), 1)
  168. self.assertEqual(PollVote.objects.filter(poll=poll, thread=other_thread).count(), 4)
  169. def test_merge_threads_moved_poll(self):
  170. """api merges two threads successfully, moving poll from other thread"""
  171. self.override_acl({'can_merge_threads': 1})
  172. self.override_other_acl({'can_merge_threads': 1})
  173. other_thread = testutils.post_thread(self.category_b)
  174. poll = testutils.post_poll(self.thread, self.user)
  175. response = self.client.post(
  176. self.api_link, {
  177. 'thread_url': other_thread.get_absolute_url(),
  178. }
  179. )
  180. self.assertContains(response, other_thread.get_absolute_url(), status_code=200)
  181. # other thread has two posts now
  182. self.assertEqual(other_thread.post_set.count(), 3)
  183. # first thread is gone
  184. with self.assertRaises(Thread.DoesNotExist):
  185. Thread.objects.get(pk=self.thread.pk)
  186. # poll and its votes were moved
  187. self.assertEqual(Poll.objects.filter(pk=poll.pk, thread=other_thread).count(), 1)
  188. self.assertEqual(PollVote.objects.filter(poll=poll, thread=other_thread).count(), 4)
  189. def test_threads_merge_conflict(self):
  190. """api errors on merge conflict, returning list of available polls"""
  191. self.override_acl({'can_merge_threads': 1})
  192. self.override_other_acl({'can_merge_threads': 1})
  193. other_thread = testutils.post_thread(self.category_b)
  194. poll = testutils.post_poll(self.thread, self.user)
  195. other_poll = testutils.post_poll(other_thread, self.user)
  196. response = self.client.post(
  197. self.api_link, {
  198. 'thread_url': other_thread.get_absolute_url(),
  199. }
  200. )
  201. self.assertEqual(response.status_code, 400)
  202. self.assertEqual(
  203. response.json(), {
  204. 'polls': [
  205. [0, "Delete all polls"],
  206. [poll.pk, poll.question],
  207. [other_poll.pk, other_poll.question],
  208. ]
  209. }
  210. )
  211. # polls and votes were untouched
  212. self.assertEqual(Poll.objects.count(), 2)
  213. self.assertEqual(PollVote.objects.count(), 8)
  214. def test_threads_merge_conflict_invalid_resolution(self):
  215. """api errors on invalid merge conflict resolution"""
  216. self.override_acl({'can_merge_threads': 1})
  217. self.override_other_acl({'can_merge_threads': 1})
  218. other_thread = testutils.post_thread(self.category_b)
  219. testutils.post_poll(self.thread, self.user)
  220. testutils.post_poll(other_thread, self.user)
  221. response = self.client.post(
  222. self.api_link, {
  223. 'thread_url': other_thread.get_absolute_url(),
  224. 'poll': 'jhdkajshdsak',
  225. }
  226. )
  227. self.assertEqual(response.status_code, 400)
  228. self.assertEqual(response.json(), {'detail': "Invalid choice."})
  229. # polls and votes were untouched
  230. self.assertEqual(Poll.objects.count(), 2)
  231. self.assertEqual(PollVote.objects.count(), 8)
  232. def test_threads_merge_conflict_delete_all(self):
  233. """api deletes all polls when delete all choice is selected"""
  234. self.override_acl({'can_merge_threads': 1})
  235. self.override_other_acl({'can_merge_threads': 1})
  236. other_thread = testutils.post_thread(self.category_b)
  237. testutils.post_poll(self.thread, self.user)
  238. testutils.post_poll(other_thread, self.user)
  239. response = self.client.post(
  240. self.api_link, {
  241. 'thread_url': other_thread.get_absolute_url(),
  242. 'poll': 0,
  243. }
  244. )
  245. self.assertContains(response, other_thread.get_absolute_url(), status_code=200)
  246. # other thread has two posts now
  247. self.assertEqual(other_thread.post_set.count(), 3)
  248. # first thread is gone
  249. with self.assertRaises(Thread.DoesNotExist):
  250. Thread.objects.get(pk=self.thread.pk)
  251. # polls and votes are gone
  252. self.assertEqual(Poll.objects.count(), 0)
  253. self.assertEqual(PollVote.objects.count(), 0)
  254. def test_threads_merge_conflict_keep_first_poll(self):
  255. """api deletes other poll on merge"""
  256. self.override_acl({'can_merge_threads': 1})
  257. self.override_other_acl({'can_merge_threads': 1})
  258. other_thread = testutils.post_thread(self.category_b)
  259. poll = testutils.post_poll(self.thread, self.user)
  260. other_poll = testutils.post_poll(other_thread, self.user)
  261. response = self.client.post(
  262. self.api_link, {
  263. 'thread_url': other_thread.get_absolute_url(),
  264. 'poll': poll.pk,
  265. }
  266. )
  267. self.assertContains(response, other_thread.get_absolute_url(), status_code=200)
  268. # other thread has two posts now
  269. self.assertEqual(other_thread.post_set.count(), 3)
  270. # first thread is gone
  271. with self.assertRaises(Thread.DoesNotExist):
  272. Thread.objects.get(pk=self.thread.pk)
  273. # other poll and its votes are gone
  274. self.assertEqual(Poll.objects.filter(thread=self.thread).count(), 0)
  275. self.assertEqual(PollVote.objects.filter(thread=self.thread).count(), 0)
  276. self.assertEqual(Poll.objects.filter(thread=other_thread).count(), 1)
  277. self.assertEqual(PollVote.objects.filter(thread=other_thread).count(), 4)
  278. Poll.objects.get(pk=poll.pk)
  279. with self.assertRaises(Poll.DoesNotExist):
  280. Poll.objects.get(pk=other_poll.pk)
  281. def test_threads_merge_conflict_keep_other_poll(self):
  282. """api deletes first poll on merge"""
  283. self.override_acl({'can_merge_threads': 1})
  284. self.override_other_acl({'can_merge_threads': 1})
  285. other_thread = testutils.post_thread(self.category_b)
  286. poll = testutils.post_poll(self.thread, self.user)
  287. other_poll = testutils.post_poll(other_thread, self.user)
  288. response = self.client.post(
  289. self.api_link, {
  290. 'thread_url': other_thread.get_absolute_url(),
  291. 'poll': other_poll.pk,
  292. }
  293. )
  294. self.assertContains(response, other_thread.get_absolute_url(), status_code=200)
  295. # other thread has two posts now
  296. self.assertEqual(other_thread.post_set.count(), 3)
  297. # first thread is gone
  298. with self.assertRaises(Thread.DoesNotExist):
  299. Thread.objects.get(pk=self.thread.pk)
  300. # other poll and its votes are gone
  301. self.assertEqual(Poll.objects.filter(thread=self.thread).count(), 0)
  302. self.assertEqual(PollVote.objects.filter(thread=self.thread).count(), 0)
  303. self.assertEqual(Poll.objects.filter(thread=other_thread).count(), 1)
  304. self.assertEqual(PollVote.objects.filter(thread=other_thread).count(), 4)
  305. Poll.objects.get(pk=other_poll.pk)
  306. with self.assertRaises(Poll.DoesNotExist):
  307. Poll.objects.get(pk=poll.pk)