test_attachments_middleware.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. from unittest.mock import Mock
  2. from rest_framework import serializers
  3. from .. import test
  4. from ...acl import useracl
  5. from ...acl.test import patch_user_acl
  6. from ...categories.models import Category
  7. from ...conf import settings
  8. from ...conftest import get_cache_versions
  9. from ...users.test import AuthenticatedUserTestCase
  10. from ..api.postingendpoint import PostingEndpoint
  11. from ..api.postingendpoint.attachments import (
  12. AttachmentsMiddleware,
  13. validate_attachments_count,
  14. )
  15. from ..models import Attachment, AttachmentType
  # Cache versions are read once at import time and passed to every
  # useracl.get_user_acl() call made by the tests in this module.
  cache_versions = get_cache_versions()
  def patch_attachments_acl(acl_patch=None):
      """Wrap ``patch_user_acl`` with a default ``max_attachment_size``.

      The given overrides are passed through unchanged, and
      ``max_attachment_size`` is set to 1024 unless the caller already
      supplied a value for it.

      Args:
          acl_patch: optional dict of ACL overrides. It is copied, so the
              caller's dict is never modified.

      Returns:
          The result of ``patch_user_acl`` for the merged patch (this module
          applies it as a decorator on test methods).
      """
      # Copy first: setdefault() on the caller's own dict would leak the
      # default back into it.
      merged_patch = dict(acl_patch or {})
      merged_patch.setdefault("max_attachment_size", 1024)
      return patch_user_acl(merged_patch)
  class AttachmentsMiddlewareTests(AuthenticatedUserTestCase):
      """Tests for AttachmentsMiddleware and the serializer it returns."""

      def setUp(self):
          """Create a thread in "first-category" and pick a file type to use."""
          super().setUp()

          self.category = Category.objects.get(slug="first-category")
          self.thread = test.post_thread(category=self.category)
          self.post = self.thread.first_post

          # Tests compare this list after middleware.save() to see which post
          # fields were scheduled for update.
          self.post.update_fields = []

          self.filetype = AttachmentType.objects.order_by("id").last()

      def mock_attachment(self, user=True, post=None):
          """Create and return an Attachment row for use in a test.

          Args:
              user: when truthy the attachment's uploader is ``self.user``,
                  otherwise the uploader is ``None``. Uploader name and slug
                  are taken from ``self.user`` in both cases.
              post: post the attachment is assigned to; ``None`` leaves it
                  unassigned.
          """
          return Attachment.objects.create(
              secret=Attachment.generate_new_secret(),
              filetype=self.filetype,
              post=post,
              size=1000,
              uploader=self.user if user else None,
              uploader_name=self.user.username,
              uploader_slug=self.user.slug,
              filename="testfile_%s.zip" % (Attachment.objects.count() + 1),
          )

      def test_use_this_middleware(self):
          """use_this_middleware reflects whether max_attachment_size is non-zero"""
          with patch_user_acl({"max_attachment_size": 0}):
              user_acl = useracl.get_user_acl(self.user, cache_versions)
              middleware = AttachmentsMiddleware(user=self.user, user_acl=user_acl)
              self.assertFalse(middleware.use_this_middleware())

          with patch_user_acl({"max_attachment_size": 1024}):
              user_acl = useracl.get_user_acl(self.user, cache_versions)
              middleware = AttachmentsMiddleware(user=self.user, user_acl=user_acl)
              self.assertTrue(middleware.use_this_middleware())

      @patch_attachments_acl()
      def test_middleware_is_optional(self):
          """middleware is optional"""
          INPUTS = [{}, {"attachments": []}]

          user_acl = useracl.get_user_acl(self.user, cache_versions)

          for test_input in INPUTS:
              middleware = AttachmentsMiddleware(
                  request=Mock(data=test_input),
                  mode=PostingEndpoint.START,
                  user=self.user,
                  user_acl=user_acl,
                  post=self.post,
              )

              serializer = middleware.get_serializer()
              # Name the offending input so a failure inside the loop can be
              # traced back to it.
              self.assertTrue(
                  serializer.is_valid(), "%r should validate" % test_input
              )

      @patch_attachments_acl()
      def test_middleware_validates_ids(self):
          """middleware validates attachments ids"""
          INPUTS = [
              "none",
              ["a", "b", 123],
              range(settings.MISAGO_POST_ATTACHMENTS_LIMIT + 1),
          ]

          user_acl = useracl.get_user_acl(self.user, cache_versions)

          for test_input in INPUTS:
              middleware = AttachmentsMiddleware(
                  request=Mock(data={"attachments": test_input}),
                  mode=PostingEndpoint.START,
                  user=self.user,
                  user_acl=user_acl,
                  post=self.post,
              )

              serializer = middleware.get_serializer()
              self.assertFalse(
                  serializer.is_valid(), "%r shouldn't validate" % test_input
              )

      @patch_attachments_acl()
      def test_get_initial_attachments(self):
          """
          get_initial_attachments returns list of attachments already existing on post
          """
          user_acl = useracl.get_user_acl(self.user, cache_versions)

          middleware = AttachmentsMiddleware(
              request=Mock(data={}),
              mode=PostingEndpoint.EDIT,
              user=self.user,
              user_acl=user_acl,
              post=self.post,
          )

          serializer = middleware.get_serializer()

          # Both calls pass the ACL as second argument, so the empty-post case
          # exercises the same call shape as the populated one below.
          attachments = serializer.get_initial_attachments(
              middleware.mode, middleware.user_acl, middleware.post
          )
          self.assertEqual(attachments, [])

          attachment = self.mock_attachment(post=self.post)
          attachments = serializer.get_initial_attachments(
              middleware.mode, middleware.user_acl, middleware.post
          )
          self.assertEqual(attachments, [attachment])

      @patch_attachments_acl()
      def test_get_new_attachments(self):
          """
          get_new_attachments returns user's own unassigned attachments for given ids
          """
          user_acl = useracl.get_user_acl(self.user, cache_versions)

          middleware = AttachmentsMiddleware(
              request=Mock(data={}),
              mode=PostingEndpoint.EDIT,
              user=self.user,
              user_acl=user_acl,
              post=self.post,
          )

          serializer = middleware.get_serializer()

          # ids that match no attachment yield nothing
          attachments = serializer.get_new_attachments(middleware.user, [1, 2, 3])
          self.assertEqual(attachments, [])

          attachment = self.mock_attachment()
          attachments = serializer.get_new_attachments(middleware.user, [attachment.pk])
          self.assertEqual(attachments, [attachment])

          # only own orphaned attachments may be assigned to posts
          other_user_attachment = self.mock_attachment(user=False)
          attachments = serializer.get_new_attachments(
              middleware.user, [other_user_attachment.pk]
          )
          self.assertEqual(attachments, [])

      @patch_attachments_acl({"can_delete_other_users_attachments": False})
      def test_cant_delete_attachment(self):
          """
          middleware validates if we have permission to delete other users attachments
          """
          attachment = self.mock_attachment(user=False, post=self.post)
          self.assertIsNone(attachment.uploader)

          user_acl = useracl.get_user_acl(self.user, cache_versions)

          # An empty attachments list omits the attachment already on the post.
          serializer = AttachmentsMiddleware(
              request=Mock(data={"attachments": []}),
              mode=PostingEndpoint.EDIT,
              user=self.user,
              user_acl=user_acl,
              post=self.post,
          ).get_serializer()

          self.assertFalse(serializer.is_valid())

      @patch_attachments_acl()
      def test_add_attachments(self):
          """middleware adds attachments to post"""
          attachments = [self.mock_attachment(), self.mock_attachment()]

          user_acl = useracl.get_user_acl(self.user, cache_versions)

          middleware = AttachmentsMiddleware(
              request=Mock(data={"attachments": [a.pk for a in attachments]}),
              mode=PostingEndpoint.EDIT,
              user=self.user,
              user_acl=user_acl,
              post=self.post,
          )

          serializer = middleware.get_serializer()
          self.assertTrue(serializer.is_valid())
          middleware.save(serializer)

          # attachments were associated with post
          self.assertEqual(self.post.update_fields, ["attachments_cache"])
          self.assertEqual(self.post.attachment_set.count(), 2)

          # the cache is expected to list attachments in reverse creation order
          attachments_filenames = list(reversed([a.filename for a in attachments]))
          self.assertEqual(
              [a["filename"] for a in self.post.attachments_cache], attachments_filenames
          )

      @patch_attachments_acl()
      def test_remove_attachments(self):
          """middleware removes attachment from post and db"""
          attachments = [
              self.mock_attachment(post=self.post),
              self.mock_attachment(post=self.post),
          ]

          user_acl = useracl.get_user_acl(self.user, cache_versions)

          middleware = AttachmentsMiddleware(
              request=Mock(data={"attachments": [attachments[0].pk]}),
              mode=PostingEndpoint.EDIT,
              user=self.user,
              user_acl=user_acl,
              post=self.post,
          )

          serializer = middleware.get_serializer()
          self.assertTrue(serializer.is_valid())
          middleware.save(serializer)

          # attachment missing from the request was removed from post and db
          self.assertEqual(self.post.update_fields, ["attachments_cache"])
          self.assertEqual(self.post.attachment_set.count(), 1)
          self.assertEqual(Attachment.objects.count(), 1)

          attachments_filenames = [attachments[0].filename]
          self.assertEqual(
              [a["filename"] for a in self.post.attachments_cache], attachments_filenames
          )

      @patch_attachments_acl()
      def test_steal_attachments(self):
          """middleware validates if attachments are already assigned to other posts"""
          other_post = test.reply_thread(self.thread)

          attachments = [self.mock_attachment(post=other_post), self.mock_attachment()]

          user_acl = useracl.get_user_acl(self.user, cache_versions)

          middleware = AttachmentsMiddleware(
              request=Mock(data={"attachments": [attachments[0].pk, attachments[1].pk]}),
              mode=PostingEndpoint.EDIT,
              user=self.user,
              user_acl=user_acl,
              post=self.post,
          )

          serializer = middleware.get_serializer()
          self.assertTrue(serializer.is_valid())
          middleware.save(serializer)

          # only unassociated attachment was associated with post
          self.assertEqual(self.post.update_fields, ["attachments_cache"])
          self.assertEqual(self.post.attachment_set.count(), 1)

          self.assertEqual(Attachment.objects.get(pk=attachments[0].pk).post, other_post)
          self.assertEqual(Attachment.objects.get(pk=attachments[1].pk).post, self.post)

      @patch_attachments_acl()
      def test_edit_attachments(self):
          """middleware removes and adds attachments to post"""
          attachments = [
              self.mock_attachment(post=self.post),
              self.mock_attachment(post=self.post),
              self.mock_attachment(),
          ]

          user_acl = useracl.get_user_acl(self.user, cache_versions)

          # keep the first attachment, drop the second, add the unassigned third
          middleware = AttachmentsMiddleware(
              request=Mock(data={"attachments": [attachments[0].pk, attachments[2].pk]}),
              mode=PostingEndpoint.EDIT,
              user=self.user,
              user_acl=user_acl,
              post=self.post,
          )

          serializer = middleware.get_serializer()
          self.assertTrue(serializer.is_valid())
          middleware.save(serializer)

          # post ends up with the kept and the newly added attachment
          self.assertEqual(self.post.update_fields, ["attachments_cache"])
          self.assertEqual(self.post.attachment_set.count(), 2)

          attachments_filenames = [attachments[2].filename, attachments[0].filename]
          self.assertEqual(
              [a["filename"] for a in self.post.attachments_cache], attachments_filenames
          )
  class ValidateAttachmentsCountTests(AuthenticatedUserTestCase):
      """Tests for the validate_attachments_count validator."""

      def test_validate_attachments_count(self):
          """count at the limit passes, one above the limit is rejected"""
          limit = settings.MISAGO_POST_ATTACHMENTS_LIMIT

          # exactly `limit` items must not raise
          validate_attachments_count(range(limit))

          over_limit = range(limit + 1)
          with self.assertRaises(serializers.ValidationError):
              validate_attachments_count(over_limit)