test_participants.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. from django.contrib.auth import get_user_model
  2. from django.test import TestCase
  3. from django.utils import timezone
  4. from misago.categories.models import Category
  5. from ..models import Post, Thread, ThreadParticipant
  6. from ..participants import (
  7. has_participants,
  8. make_participants_aware,
  9. set_owner,
  10. set_users_unread_private_threads_sync
  11. )
  12. class ParticipantsTests(TestCase):
  13. def setUp(self):
  14. datetime = timezone.now()
  15. self.category = Category.objects.all_categories()[:1][0]
  16. self.thread = Thread(
  17. category=self.category,
  18. started_on=datetime,
  19. starter_name='Tester',
  20. starter_slug='tester',
  21. last_post_on=datetime,
  22. last_poster_name='Tester',
  23. last_poster_slug='tester'
  24. )
  25. self.thread.set_title("Test thread")
  26. self.thread.save()
  27. post = Post.objects.create(
  28. category=self.category,
  29. thread=self.thread,
  30. poster_name='Tester',
  31. poster_ip='127.0.0.1',
  32. original="Hello! I am test message!",
  33. parsed="<p>Hello! I am test message!</p>",
  34. checksum="nope",
  35. posted_on=datetime,
  36. updated_on=datetime
  37. )
  38. self.thread.first_post = post
  39. self.thread.last_post = post
  40. self.thread.save()
  41. def test_has_participants(self):
  42. """has_participants returns true if thread has participants"""
  43. User = get_user_model()
  44. users = [
  45. User.objects.create_user("Bob", "bob@boberson.com", "Pass.123"),
  46. User.objects.create_user("Bob2", "bob2@boberson.com", "Pass.123"),
  47. ]
  48. self.assertFalse(has_participants(self.thread))
  49. ThreadParticipant.objects.add_participants(self.thread, users)
  50. self.assertTrue(has_participants(self.thread))
  51. self.thread.threadparticipant_set.all().delete()
  52. self.assertFalse(has_participants(self.thread))
  53. def test_make_participants_aware(self):
  54. """
  55. make_participants_aware sets participants_list and participant
  56. annotations on thread model
  57. """
  58. User = get_user_model()
  59. user = User.objects.create_user("Bob", "bob@boberson.com", "Pass.123")
  60. other_user = User.objects.create_user("Bob2", "bob2@boberson.com", "Pass.123")
  61. self.assertFalse(hasattr(self.thread, 'participants_list'))
  62. self.assertFalse(hasattr(self.thread, 'participant'))
  63. make_participants_aware(user, self.thread)
  64. self.assertTrue(hasattr(self.thread, 'participants_list'))
  65. self.assertTrue(hasattr(self.thread, 'participant'))
  66. self.assertEqual(self.thread.participants_list, [])
  67. self.assertIsNone(self.thread.participant)
  68. ThreadParticipant.objects.set_owner(self.thread, user)
  69. ThreadParticipant.objects.add_participants(self.thread, [other_user])
  70. make_participants_aware(user, self.thread)
  71. self.assertEqual(self.thread.participant.user, user)
  72. for participant in self.thread.participants_list:
  73. if participant.user == user:
  74. break
  75. else:
  76. self.fail("thread.participants_list didn't contain user")
  77. def test_set_owner(self):
  78. """set_owner sets user as thread owner"""
  79. User = get_user_model()
  80. user = User.objects.create_user("Bob", "bob@boberson.com", "Pass.123")
  81. set_owner(self.thread, user)
  82. owner = self.thread.threadparticipant_set.get(is_owner=True)
  83. self.assertEqual(user, owner.user)
  84. def test_set_users_unread_private_threads_sync(self):
  85. """
  86. set_users_unread_private_threads_sync sets sync_unread_private_threads
  87. flag on users provided to true
  88. """
  89. User = get_user_model()
  90. users = [
  91. User.objects.create_user("Bob1", "bob1@boberson.com", "Pass.123"),
  92. User.objects.create_user("Bob2", "bob2@boberson.com", "Pass.123"),
  93. ]
  94. set_users_unread_private_threads_sync(users=users)
  95. for user in users:
  96. User.objects.get(pk=user.pk, sync_unread_private_threads=True)
  97. def test_set_participants_unread_private_threads_sync(self):
  98. """
  99. set_users_unread_private_threads_sync sets sync_unread_private_threads
  100. flag on participants provided to true
  101. """
  102. User = get_user_model()
  103. users = [
  104. User.objects.create_user("Bob1", "bob1@boberson.com", "Pass.123"),
  105. User.objects.create_user("Bob2", "bob2@boberson.com", "Pass.123"),
  106. ]
  107. participants = [ThreadParticipant(user=u) for u in users]
  108. set_users_unread_private_threads_sync(participants=participants)
  109. for user in users:
  110. User.objects.get(pk=user.pk, sync_unread_private_threads=True)
  111. def test_set_participants_users_unread_private_threads_sync(self):
  112. """
  113. set_users_unread_private_threads_sync sets sync_unread_private_threads
  114. flag on users and participants provided to true
  115. """
  116. User = get_user_model()
  117. users = [
  118. User.objects.create_user("Bob1", "bob1@boberson.com", "Pass.123"),
  119. ]
  120. participants = [ThreadParticipant(user=u) for u in users]
  121. users.append(User.objects.create_user("Bob2", "bob2@boberson.com", "Pass.123"))
  122. set_users_unread_private_threads_sync(users=users, participants=participants)
  123. for user in users:
  124. User.objects.get(pk=user.pk, sync_unread_private_threads=True)
  125. def test_set_users_unread_private_threads_sync_exclude_user(self):
  126. """exclude_user kwarg works"""
  127. User = get_user_model()
  128. users = [
  129. User.objects.create_user("Bob1", "bob1@boberson.com", "Pass.123"),
  130. User.objects.create_user("Bob2", "bob2@boberson.com", "Pass.123")
  131. ]
  132. set_users_unread_private_threads_sync(users=users, exclude_user=users[0])
  133. self.assertFalse(User.objects.get(pk=users[0].pk).sync_unread_private_threads)
  134. self.assertTrue(User.objects.get(pk=users[1].pk).sync_unread_private_threads)
  135. def test_set_users_unread_private_threads_sync_noop(self):
  136. """excluding only user is noop"""
  137. User = get_user_model()
  138. user = User.objects.create_user("Bob1", "bob1@boberson.com", "Pass.123")
  139. with self.assertNumQueries(0):
  140. set_users_unread_private_threads_sync(users=[user], exclude_user=user)
  141. self.assertFalse(User.objects.get(pk=user.pk).sync_unread_private_threads)