test_attachments_middleware.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. from unittest.mock import Mock
  2. from rest_framework import serializers
  3. from misago.acl import useracl
  4. from misago.acl.test import patch_user_acl
  5. from misago.categories.models import Category
  6. from misago.conf import settings
  7. from misago.threads import testutils
  8. from misago.threads.api.postingendpoint import PostingEndpoint
  9. from misago.threads.api.postingendpoint.attachments import (
  10. AttachmentsMiddleware, validate_attachments_count)
  11. from misago.threads.models import Attachment, AttachmentType
  12. from misago.users.testutils import AuthenticatedUserTestCase
  13. cache_versions = {"acl": "abcdefgh"}
  14. def patch_attachments_acl(acl_patch=None):
  15. acl_patch = acl_patch or {}
  16. acl_patch.setdefault("max_attachment_size", 1024)
  17. return patch_user_acl(acl_patch)
  18. class AttachmentsMiddlewareTests(AuthenticatedUserTestCase):
  19. def setUp(self):
  20. super().setUp()
  21. self.category = Category.objects.get(slug='first-category')
  22. self.thread = testutils.post_thread(category=self.category)
  23. self.post = self.thread.first_post
  24. self.post.update_fields = []
  25. self.filetype = AttachmentType.objects.order_by('id').last()
  26. def mock_attachment(self, user=True, post=None):
  27. return Attachment.objects.create(
  28. secret=Attachment.generate_new_secret(),
  29. filetype=self.filetype,
  30. post=post,
  31. size=1000,
  32. uploader=self.user if user else None,
  33. uploader_name=self.user.username,
  34. uploader_slug=self.user.slug,
  35. filename='testfile_%s.zip' % (Attachment.objects.count() + 1),
  36. )
  37. def test_use_this_middleware(self):
  38. """use_this_middleware returns False if we can't upload attachments"""
  39. with patch_user_acl({'max_attachment_size': 0}):
  40. user_acl = useracl.get_user_acl(self.user, cache_versions)
  41. middleware = AttachmentsMiddleware(user=self.user, user_acl=user_acl)
  42. self.assertFalse(middleware.use_this_middleware())
  43. with patch_user_acl({'max_attachment_size': 1024}):
  44. user_acl = useracl.get_user_acl(self.user, cache_versions)
  45. middleware = AttachmentsMiddleware(user=self.user, user_acl=user_acl)
  46. self.assertTrue(middleware.use_this_middleware())
  47. @patch_attachments_acl()
  48. def test_middleware_is_optional(self):
  49. """middleware is optional"""
  50. INPUTS = [{}, {'attachments': []}]
  51. user_acl = useracl.get_user_acl(self.user, cache_versions)
  52. for test_input in INPUTS:
  53. middleware = AttachmentsMiddleware(
  54. request=Mock(data=test_input),
  55. mode=PostingEndpoint.START,
  56. user=self.user,
  57. user_acl=user_acl,
  58. post=self.post,
  59. )
  60. serializer = middleware.get_serializer()
  61. self.assertTrue(serializer.is_valid())
  62. @patch_attachments_acl()
  63. def test_middleware_validates_ids(self):
  64. """middleware validates attachments ids"""
  65. INPUTS = ['none', ['a', 'b', 123], range(settings.MISAGO_POST_ATTACHMENTS_LIMIT + 1)]
  66. user_acl = useracl.get_user_acl(self.user, cache_versions)
  67. for test_input in INPUTS:
  68. middleware = AttachmentsMiddleware(
  69. request=Mock(data={
  70. 'attachments': test_input
  71. }),
  72. mode=PostingEndpoint.START,
  73. user=self.user,
  74. user_acl=user_acl,
  75. post=self.post,
  76. )
  77. serializer = middleware.get_serializer()
  78. self.assertFalse(serializer.is_valid(), "%r shouldn't validate" % test_input)
  79. @patch_attachments_acl()
  80. def test_get_initial_attachments(self):
  81. """get_initial_attachments returns list of attachments already existing on post"""
  82. user_acl = useracl.get_user_acl(self.user, cache_versions)
  83. middleware = AttachmentsMiddleware(
  84. request=Mock(data={}),
  85. mode=PostingEndpoint.EDIT,
  86. user=self.user,
  87. user_acl=user_acl,
  88. post=self.post,
  89. )
  90. serializer = middleware.get_serializer()
  91. attachments = serializer.get_initial_attachments(
  92. middleware.mode, middleware.user, middleware.post
  93. )
  94. self.assertEqual(attachments, [])
  95. attachment = self.mock_attachment(post=self.post)
  96. attachments = serializer.get_initial_attachments(
  97. middleware.mode, middleware.user_acl, middleware.post
  98. )
  99. self.assertEqual(attachments, [attachment])
  100. @patch_attachments_acl()
  101. def test_get_new_attachments(self):
  102. """get_initial_attachments returns list of attachments already existing on post"""
  103. user_acl = useracl.get_user_acl(self.user, cache_versions)
  104. middleware = AttachmentsMiddleware(
  105. request=Mock(data={}),
  106. mode=PostingEndpoint.EDIT,
  107. user=self.user,
  108. user_acl=user_acl,
  109. post=self.post,
  110. )
  111. serializer = middleware.get_serializer()
  112. attachments = serializer.get_new_attachments(middleware.user, [1, 2, 3])
  113. self.assertEqual(attachments, [])
  114. attachment = self.mock_attachment()
  115. attachments = serializer.get_new_attachments(middleware.user, [attachment.pk])
  116. self.assertEqual(attachments, [attachment])
  117. # only own orphaned attachments may be assigned to posts
  118. other_user_attachment = self.mock_attachment(user=False)
  119. attachments = serializer.get_new_attachments(middleware.user, [other_user_attachment.pk])
  120. self.assertEqual(attachments, [])
  121. @patch_attachments_acl({'can_delete_other_users_attachments': False})
  122. def test_cant_delete_attachment(self):
  123. """middleware validates if we have permission to delete other users attachments"""
  124. attachment = self.mock_attachment(user=False, post=self.post)
  125. self.assertIsNone(attachment.uploader)
  126. user_acl = useracl.get_user_acl(self.user, cache_versions)
  127. serializer = AttachmentsMiddleware(
  128. request=Mock(data={
  129. 'attachments': []
  130. }),
  131. mode=PostingEndpoint.EDIT,
  132. user=self.user,
  133. user_acl=user_acl,
  134. post=self.post,
  135. ).get_serializer()
  136. self.assertFalse(serializer.is_valid())
  137. @patch_attachments_acl()
  138. def test_add_attachments(self):
  139. """middleware adds attachments to post"""
  140. attachments = [
  141. self.mock_attachment(),
  142. self.mock_attachment(),
  143. ]
  144. user_acl = useracl.get_user_acl(self.user, cache_versions)
  145. middleware = AttachmentsMiddleware(
  146. request=Mock(data={
  147. 'attachments': [a.pk for a in attachments]
  148. }),
  149. mode=PostingEndpoint.EDIT,
  150. user=self.user,
  151. user_acl=user_acl,
  152. post=self.post,
  153. )
  154. serializer = middleware.get_serializer()
  155. self.assertTrue(serializer.is_valid())
  156. middleware.save(serializer)
  157. # attachments were associated with post
  158. self.assertEqual(self.post.update_fields, ['attachments_cache'])
  159. self.assertEqual(self.post.attachment_set.count(), 2)
  160. attachments_filenames = list(reversed([a.filename for a in attachments]))
  161. self.assertEqual([a['filename'] for a in self.post.attachments_cache],
  162. attachments_filenames)
  163. @patch_attachments_acl()
  164. def test_remove_attachments(self):
  165. """middleware removes attachment from post and db"""
  166. attachments = [
  167. self.mock_attachment(post=self.post),
  168. self.mock_attachment(post=self.post),
  169. ]
  170. user_acl = useracl.get_user_acl(self.user, cache_versions)
  171. middleware = AttachmentsMiddleware(
  172. request=Mock(data={
  173. 'attachments': [attachments[0].pk]
  174. }),
  175. mode=PostingEndpoint.EDIT,
  176. user=self.user,
  177. user_acl=user_acl,
  178. post=self.post,
  179. )
  180. serializer = middleware.get_serializer()
  181. self.assertTrue(serializer.is_valid())
  182. middleware.save(serializer)
  183. # attachments were associated with post
  184. self.assertEqual(self.post.update_fields, ['attachments_cache'])
  185. self.assertEqual(self.post.attachment_set.count(), 1)
  186. self.assertEqual(Attachment.objects.count(), 1)
  187. attachments_filenames = [attachments[0].filename]
  188. self.assertEqual([a['filename'] for a in self.post.attachments_cache],
  189. attachments_filenames)
  190. @patch_attachments_acl()
  191. def test_steal_attachments(self):
  192. """middleware validates if attachments are already assigned to other posts"""
  193. other_post = testutils.reply_thread(self.thread)
  194. attachments = [
  195. self.mock_attachment(post=other_post),
  196. self.mock_attachment(),
  197. ]
  198. user_acl = useracl.get_user_acl(self.user, cache_versions)
  199. middleware = AttachmentsMiddleware(
  200. request=Mock(data={
  201. 'attachments': [attachments[0].pk, attachments[1].pk]
  202. }),
  203. mode=PostingEndpoint.EDIT,
  204. user=self.user,
  205. user_acl=user_acl,
  206. post=self.post,
  207. )
  208. serializer = middleware.get_serializer()
  209. self.assertTrue(serializer.is_valid())
  210. middleware.save(serializer)
  211. # only unassociated attachment was associated with post
  212. self.assertEqual(self.post.update_fields, ['attachments_cache'])
  213. self.assertEqual(self.post.attachment_set.count(), 1)
  214. self.assertEqual(Attachment.objects.get(pk=attachments[0].pk).post, other_post)
  215. self.assertEqual(Attachment.objects.get(pk=attachments[1].pk).post, self.post)
  216. @patch_attachments_acl()
  217. def test_edit_attachments(self):
  218. """middleware removes and adds attachments to post"""
  219. attachments = [
  220. self.mock_attachment(post=self.post),
  221. self.mock_attachment(post=self.post),
  222. self.mock_attachment(),
  223. ]
  224. user_acl = useracl.get_user_acl(self.user, cache_versions)
  225. middleware = AttachmentsMiddleware(
  226. request=Mock(data={
  227. 'attachments': [attachments[0].pk, attachments[2].pk]
  228. }),
  229. mode=PostingEndpoint.EDIT,
  230. user=self.user,
  231. user_acl=user_acl,
  232. post=self.post,
  233. )
  234. serializer = middleware.get_serializer()
  235. self.assertTrue(serializer.is_valid())
  236. middleware.save(serializer)
  237. # attachments were associated with post
  238. self.assertEqual(self.post.update_fields, ['attachments_cache'])
  239. self.assertEqual(self.post.attachment_set.count(), 2)
  240. attachments_filenames = [attachments[2].filename, attachments[0].filename]
  241. self.assertEqual([a['filename'] for a in self.post.attachments_cache],
  242. attachments_filenames)
  243. class ValidateAttachmentsCountTests(AuthenticatedUserTestCase):
  244. def test_validate_attachments_count(self):
  245. """too large count of attachments is rejected"""
  246. validate_attachments_count(range(settings.MISAGO_POST_ATTACHMENTS_LIMIT))
  247. with self.assertRaises(serializers.ValidationError):
  248. validate_attachments_count(range(settings.MISAGO_POST_ATTACHMENTS_LIMIT + 1))