test_thread_postmerge_api.py 20 KB


  1. import json
  2. from django.urls import reverse
  3. from misago.acl.testutils import override_acl
  4. from misago.categories.models import Category
  5. from misago.readtracker import poststracker
  6. from misago.threads import testutils
  7. from misago.threads.models import Post, Thread
  8. from misago.threads.serializers.moderation import POSTS_LIMIT
  9. from misago.users.testutils import AuthenticatedUserTestCase
  10. class ThreadPostMergeApiTestCase(AuthenticatedUserTestCase):
  11. def setUp(self):
  12. super().setUp()
  13. self.category = Category.objects.get(slug='first-category')
  14. self.thread = testutils.post_thread(category=self.category)
  15. self.post = testutils.reply_thread(self.thread, poster=self.user)
  16. self.api_link = reverse(
  17. 'misago:api:thread-post-merge', kwargs={
  18. 'thread_pk': self.thread.pk,
  19. }
  20. )
  21. self.override_acl()
  22. def refresh_thread(self):
  23. self.thread = Thread.objects.get(pk=self.thread.pk)
  24. def override_acl(self, extra_acl=None):
  25. new_acl = self.user.acl_cache
  26. new_acl['categories'][self.category.pk].update({
  27. 'can_see': 1,
  28. 'can_browse': 1,
  29. 'can_start_threads': 0,
  30. 'can_reply_threads': 0,
  31. 'can_edit_posts': 1,
  32. 'can_approve_content': 0,
  33. 'can_merge_posts': 1,
  34. })
  35. if extra_acl:
  36. new_acl['categories'][self.category.pk].update(extra_acl)
  37. override_acl(self.user, new_acl)
  38. def test_anonymous_user(self):
  39. """you need to authenticate to merge posts"""
  40. self.logout_user()
  41. response = self.client.post(
  42. self.api_link,
  43. json.dumps({}),
  44. content_type="application/json",
  45. )
  46. self.assertEqual(response.status_code, 403)
  47. def test_no_permission(self):
  48. """api validates permission to merge"""
  49. self.override_acl({'can_merge_posts': 0})
  50. response = self.client.post(
  51. self.api_link,
  52. json.dumps({}),
  53. content_type="application/json",
  54. )
  55. self.assertContains(response, "You can't merge posts in this thread.", status_code=403)
  56. def test_empty_data_json(self):
  57. """api handles empty json data"""
  58. response = self.client.post(
  59. self.api_link, json.dumps({}), content_type="application/json"
  60. )
  61. self.assertContains(
  62. response, "You have to select at least two posts to merge.", status_code=400
  63. )
  64. def test_empty_data_form(self):
  65. """api handles empty form data"""
  66. response = self.client.post(self.api_link, {})
  67. self.assertContains(
  68. response, "You have to select at least two posts to merge.", status_code=400
  69. )
  70. def test_invalid_data(self):
  71. """api handles post that is invalid type"""
  72. self.override_acl()
  73. response = self.client.post(self.api_link, '[]', content_type="application/json")
  74. self.assertContains(response, "Invalid data. Expected a dictionary", status_code=400)
  75. self.override_acl()
  76. response = self.client.post(self.api_link, '123', content_type="application/json")
  77. self.assertContains(response, "Invalid data. Expected a dictionary", status_code=400)
  78. self.override_acl()
  79. response = self.client.post(self.api_link, '"string"', content_type="application/json")
  80. self.assertContains(response, "Invalid data. Expected a dictionary", status_code=400)
  81. self.override_acl()
  82. response = self.client.post(self.api_link, 'malformed', content_type="application/json")
  83. self.assertContains(response, "JSON parse error", status_code=400)
  84. def test_no_posts_ids(self):
  85. """api rejects no posts ids"""
  86. response = self.client.post(
  87. self.api_link,
  88. json.dumps({
  89. 'posts': []
  90. }),
  91. content_type="application/json",
  92. )
  93. self.assertContains(
  94. response, "You have to select at least two posts to merge.", status_code=400
  95. )
  96. def test_invalid_posts_data(self):
  97. """api handles invalid data"""
  98. response = self.client.post(
  99. self.api_link,
  100. json.dumps({
  101. 'posts': 'string'
  102. }),
  103. content_type="application/json",
  104. )
  105. self.assertContains(
  106. response, "Expected a list of items but got type", status_code=400
  107. )
  108. def test_invalid_posts_ids(self):
  109. """api handles invalid post id"""
  110. response = self.client.post(
  111. self.api_link,
  112. json.dumps({
  113. 'posts': [1, 2, 'string']
  114. }),
  115. content_type="application/json",
  116. )
  117. self.assertContains(
  118. response, "One or more post ids received were invalid.", status_code=400
  119. )
  120. def test_one_post_id(self):
  121. """api rejects one post id"""
  122. response = self.client.post(
  123. self.api_link,
  124. json.dumps({
  125. 'posts': [1]
  126. }),
  127. content_type="application/json",
  128. )
  129. self.assertContains(
  130. response, "You have to select at least two posts to merge.", status_code=400
  131. )
  132. def test_merge_limit(self):
  133. """api rejects more posts than merge limit"""
  134. response = self.client.post(
  135. self.api_link,
  136. json.dumps({
  137. 'posts': list(range(POSTS_LIMIT + 1))
  138. }),
  139. content_type="application/json",
  140. )
  141. self.assertContains(
  142. response, "No more than {} posts can be merged".format(POSTS_LIMIT), status_code=400
  143. )
  144. def test_merge_event(self):
  145. """api recjects events"""
  146. event = testutils.reply_thread(self.thread, is_event=True, poster=self.user)
  147. response = self.client.post(
  148. self.api_link,
  149. json.dumps({
  150. 'posts': [self.post.pk, event.pk]
  151. }),
  152. content_type="application/json",
  153. )
  154. self.assertContains(response, "Events can't be merged.", status_code=400)
  155. def test_merge_notfound_pk(self):
  156. """api recjects nonexistant pk's"""
  157. response = self.client.post(
  158. self.api_link,
  159. json.dumps({
  160. 'posts': [self.post.pk, self.post.pk * 1000]
  161. }),
  162. content_type="application/json",
  163. )
  164. self.assertContains(
  165. response, "One or more posts to merge could not be found.", status_code=400
  166. )
  167. def test_merge_cross_threads(self):
  168. """api recjects attempt to merge with post made in other thread"""
  169. other_thread = testutils.post_thread(category=self.category)
  170. other_post = testutils.reply_thread(other_thread, poster=self.user)
  171. response = self.client.post(
  172. self.api_link,
  173. json.dumps({
  174. 'posts': [self.post.pk, other_post.pk]
  175. }),
  176. content_type="application/json",
  177. )
  178. self.assertContains(
  179. response, "One or more posts to merge could not be found.", status_code=400
  180. )
  181. def test_merge_authenticated_with_guest_post(self):
  182. """api recjects attempt to merge with post made by deleted user"""
  183. other_post = testutils.reply_thread(self.thread)
  184. response = self.client.post(
  185. self.api_link,
  186. json.dumps({
  187. 'posts': [self.post.pk, other_post.pk]
  188. }),
  189. content_type="application/json",
  190. )
  191. self.assertContains(
  192. response, "Posts made by different users can't be merged.", status_code=400
  193. )
  194. def test_merge_guest_with_authenticated_post(self):
  195. """api recjects attempt to merge with post made by deleted user"""
  196. other_post = testutils.reply_thread(self.thread)
  197. response = self.client.post(
  198. self.api_link,
  199. json.dumps({
  200. 'posts': [other_post.pk, self.post.pk]
  201. }),
  202. content_type="application/json",
  203. )
  204. self.assertContains(
  205. response, "Posts made by different users can't be merged.", status_code=400
  206. )
  207. def test_merge_guest_posts_different_usernames(self):
  208. """api recjects attempt to merge posts made by different guests"""
  209. response = self.client.post(
  210. self.api_link,
  211. json.dumps({
  212. 'posts': [
  213. testutils.reply_thread(self.thread, poster="Bob").pk,
  214. testutils.reply_thread(self.thread, poster="Miku").pk,
  215. ]
  216. }),
  217. content_type="application/json",
  218. )
  219. self.assertContains(
  220. response, "Posts made by different users can't be merged.", status_code=400
  221. )
  222. def test_merge_different_visibility(self):
  223. """api recjects attempt to merge posts with different visibility"""
  224. self.override_acl({'can_hide_posts': 1})
  225. response = self.client.post(
  226. self.api_link,
  227. json.dumps({
  228. 'posts': [
  229. testutils.reply_thread(self.thread, poster=self.user, is_hidden=True).pk,
  230. testutils.reply_thread(self.thread, poster=self.user, is_hidden=False).pk,
  231. ]
  232. }),
  233. content_type="application/json",
  234. )
  235. self.assertContains(
  236. response, "Posts with different visibility can't be merged.", status_code=400
  237. )
  238. def test_merge_different_approval(self):
  239. """api recjects attempt to merge posts with different approval"""
  240. self.override_acl({'can_approve_content': 1})
  241. response = self.client.post(
  242. self.api_link,
  243. json.dumps({
  244. 'posts': [
  245. testutils.reply_thread(self.thread, poster=self.user, is_unapproved=True).pk,
  246. testutils.reply_thread(self.thread, poster=self.user, is_unapproved=False).pk,
  247. ]
  248. }),
  249. content_type="application/json",
  250. )
  251. self.assertContains(
  252. response, "Posts with different visibility can't be merged.", status_code=400
  253. )
  254. def test_closed_thread(self):
  255. """api validates permission to merge in closed thread"""
  256. self.thread.is_closed = True
  257. self.thread.save()
  258. posts = [
  259. testutils.reply_thread(self.thread, poster=self.user).pk,
  260. testutils.reply_thread(self.thread, poster=self.user).pk,
  261. ]
  262. response = self.client.post(
  263. self.api_link,
  264. json.dumps({'posts': posts}),
  265. content_type="application/json",
  266. )
  267. self.assertContains(
  268. response,
  269. "This thread is closed. You can't merge posts in it.",
  270. status_code=400,
  271. )
  272. # allow closing threads
  273. self.override_acl({'can_close_threads': 1})
  274. response = self.client.post(
  275. self.api_link,
  276. json.dumps({'posts': posts}),
  277. content_type="application/json",
  278. )
  279. self.assertEqual(response.status_code, 200)
  280. def test_closed_category(self):
  281. """api validates permission to merge in closed category"""
  282. self.category.is_closed = True
  283. self.category.save()
  284. posts = [
  285. testutils.reply_thread(self.thread, poster=self.user).pk,
  286. testutils.reply_thread(self.thread, poster=self.user).pk,
  287. ]
  288. response = self.client.post(
  289. self.api_link,
  290. json.dumps({'posts': posts}),
  291. content_type="application/json",
  292. )
  293. self.assertContains(
  294. response,
  295. "This category is closed. You can't merge posts in it.",
  296. status_code=400,
  297. )
  298. # allow closing threads
  299. self.override_acl({'can_close_threads': 1})
  300. response = self.client.post(
  301. self.api_link,
  302. json.dumps({'posts': posts}),
  303. content_type="application/json",
  304. )
  305. self.assertEqual(response.status_code, 200)
  306. def test_merge_best_answer_first_post(self):
  307. """api recjects attempt to merge best_answer with first post"""
  308. self.thread.first_post.poster = self.user
  309. self.thread.first_post.save()
  310. self.post.poster = self.user
  311. self.post.save()
  312. self.thread.set_best_answer(self.user, self.post)
  313. self.thread.save()
  314. response = self.client.post(
  315. self.api_link,
  316. json.dumps({
  317. 'posts': [
  318. self.thread.first_post.pk,
  319. self.post.pk,
  320. ]
  321. }),
  322. content_type="application/json",
  323. )
  324. self.assertEqual(response.status_code, 400)
  325. self.assertEqual(response.json(), {
  326. 'detail': "Post marked as best answer can't be merged with thread's first post."
  327. })
  328. def test_merge_posts(self):
  329. """api merges two posts"""
  330. post_a = testutils.reply_thread(self.thread, poster=self.user, message="Battęry")
  331. post_b = testutils.reply_thread(self.thread, poster=self.user, message="Hórse")
  332. thread_replies = self.thread.replies
  333. response = self.client.post(
  334. self.api_link,
  335. json.dumps({
  336. 'posts': [post_a.pk, post_b.pk]
  337. }),
  338. content_type="application/json",
  339. )
  340. self.assertEqual(response.status_code, 200)
  341. self.refresh_thread()
  342. self.assertEqual(self.thread.replies, thread_replies - 1)
  343. with self.assertRaises(Post.DoesNotExist):
  344. Post.objects.get(pk=post_b.pk)
  345. merged_post = Post.objects.get(pk=post_a.pk)
  346. self.assertEqual(merged_post.parsed, '{}\n{}'.format(post_a.parsed, post_b.parsed))
  347. def test_merge_guest_posts(self):
  348. """api recjects attempt to merge posts made by same guest"""
  349. response = self.client.post(
  350. self.api_link,
  351. json.dumps({
  352. 'posts': [
  353. testutils.reply_thread(self.thread, poster="Bob").pk,
  354. testutils.reply_thread(self.thread, poster="Bob").pk,
  355. ]
  356. }),
  357. content_type="application/json",
  358. )
  359. self.assertEqual(response.status_code, 200)
  360. def test_merge_hidden_posts(self):
  361. """api merges two hidden posts"""
  362. self.override_acl({'can_hide_posts': 1})
  363. response = self.client.post(
  364. self.api_link,
  365. json.dumps({
  366. 'posts': [
  367. testutils.reply_thread(self.thread, poster=self.user, is_hidden=True).pk,
  368. testutils.reply_thread(self.thread, poster=self.user, is_hidden=True).pk,
  369. ]
  370. }),
  371. content_type="application/json",
  372. )
  373. self.assertEqual(response.status_code, 200)
  374. def test_merge_unapproved_posts(self):
  375. """api merges two unapproved posts"""
  376. self.override_acl({'can_approve_content': 1})
  377. response = self.client.post(
  378. self.api_link,
  379. json.dumps({
  380. 'posts': [
  381. testutils.reply_thread(self.thread, poster=self.user, is_unapproved=True).pk,
  382. testutils.reply_thread(self.thread, poster=self.user, is_unapproved=True).pk,
  383. ]
  384. }),
  385. content_type="application/json",
  386. )
  387. self.assertEqual(response.status_code, 200)
  388. def test_merge_with_hidden_thread(self):
  389. """api excludes thread's first post from visibility checks"""
  390. self.thread.first_post.is_hidden = True
  391. self.thread.first_post.poster = self.user
  392. self.thread.first_post.save()
  393. post_visible = testutils.reply_thread(self.thread, poster=self.user, is_hidden=False)
  394. self.override_acl({'can_hide_threads': 1})
  395. response = self.client.post(
  396. self.api_link,
  397. json.dumps({
  398. 'posts': [self.thread.first_post.pk, post_visible.pk]
  399. }),
  400. content_type="application/json",
  401. )
  402. self.assertEqual(response.status_code, 200)
  403. def test_merge_protected(self):
  404. """api preserves protected status after merge"""
  405. response = self.client.post(
  406. self.api_link,
  407. json.dumps({
  408. 'posts': [
  409. testutils.reply_thread(self.thread, poster="Bob", is_protected=True).pk,
  410. testutils.reply_thread(self.thread, poster="Bob", is_protected=False).pk,
  411. ]
  412. }),
  413. content_type="application/json",
  414. )
  415. self.assertEqual(response.status_code, 200)
  416. merged_post = self.thread.post_set.order_by('-id')[0]
  417. self.assertTrue(merged_post.is_protected)
  418. def test_merge_best_answer(self):
  419. """api merges best answer with other post"""
  420. best_answer = testutils.reply_thread(self.thread, poster="Bob")
  421. self.thread.set_best_answer(self.user, best_answer)
  422. self.thread.save()
  423. response = self.client.post(
  424. self.api_link,
  425. json.dumps({
  426. 'posts': [
  427. best_answer.pk,
  428. testutils.reply_thread(self.thread, poster="Bob").pk,
  429. ]
  430. }),
  431. content_type="application/json",
  432. )
  433. self.assertEqual(response.status_code, 200)
  434. self.refresh_thread()
  435. self.assertEqual(self.thread.best_answer, best_answer)
  436. def test_merge_best_answer_in(self):
  437. """api merges best answer into other post"""
  438. other_post = testutils.reply_thread(self.thread, poster="Bob")
  439. best_answer = testutils.reply_thread(self.thread, poster="Bob")
  440. self.thread.set_best_answer(self.user, best_answer)
  441. self.thread.save()
  442. response = self.client.post(
  443. self.api_link,
  444. json.dumps({
  445. 'posts': [
  446. best_answer.pk,
  447. other_post.pk,
  448. ]
  449. }),
  450. content_type="application/json",
  451. )
  452. self.assertEqual(response.status_code, 200)
  453. self.refresh_thread()
  454. self.assertEqual(self.thread.best_answer, other_post)
  455. def test_merge_best_answer_in_protected(self):
  456. """api merges best answer into protected post"""
  457. best_answer = testutils.reply_thread(self.thread, poster="Bob")
  458. self.thread.set_best_answer(self.user, best_answer)
  459. self.thread.save()
  460. response = self.client.post(
  461. self.api_link,
  462. json.dumps({
  463. 'posts': [
  464. best_answer.pk,
  465. testutils.reply_thread(self.thread, poster="Bob", is_protected=True).pk,
  466. ]
  467. }),
  468. content_type="application/json",
  469. )
  470. self.assertEqual(response.status_code, 200)
  471. self.refresh_thread()
  472. self.assertEqual(self.thread.best_answer, best_answer)
  473. self.assertTrue(self.thread.best_answer.is_protected)
  474. self.assertTrue(self.thread.best_answer_is_protected)
  475. def test_merge_remove_reads(self):
  476. """two posts merge removes read tracker from post"""
  477. post_a = testutils.reply_thread(self.thread, poster=self.user, message="Battęry")
  478. post_b = testutils.reply_thread(self.thread, poster=self.user, message="Hórse")
  479. poststracker.save_read(self.user, post_a)
  480. poststracker.save_read(self.user, post_b)
  481. response = self.client.post(
  482. self.api_link,
  483. json.dumps({
  484. 'posts': [post_a.pk, post_b.pk]
  485. }),
  486. content_type="application/json",
  487. )
  488. self.assertEqual(response.status_code, 200)
  489. # both post's were removed from readtracker
  490. self.assertEqual(self.user.postread_set.count(), 0)