test_participants.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. from django.contrib.auth import get_user_model
  2. from django.test import TestCase
  3. from django.utils import timezone
  4. from misago.categories.models import Category
  5. from ..models import Post, Thread, ThreadParticipant
  6. from ..participants import (
  7. has_participants,
  8. make_participants_aware,
  9. remove_participant,
  10. set_owner,
  11. set_users_unread_private_threads_sync,
  12. )
  13. class ParticipantsTests(TestCase):
  14. def setUp(self):
  15. datetime = timezone.now()
  16. self.category = Category.objects.all_categories()[:1][0]
  17. self.thread = Thread(
  18. category=self.category,
  19. started_on=datetime,
  20. starter_name='Tester',
  21. starter_slug='tester',
  22. last_post_on=datetime,
  23. last_poster_name='Tester',
  24. last_poster_slug='tester'
  25. )
  26. self.thread.set_title("Test thread")
  27. self.thread.save()
  28. post = Post.objects.create(
  29. category=self.category,
  30. thread=self.thread,
  31. poster_name='Tester',
  32. poster_ip='127.0.0.1',
  33. original="Hello! I am test message!",
  34. parsed="<p>Hello! I am test message!</p>",
  35. checksum="nope",
  36. posted_on=datetime,
  37. updated_on=datetime
  38. )
  39. self.thread.first_post = post
  40. self.thread.last_post = post
  41. self.thread.save()
  42. def test_has_participants(self):
  43. """has_participants returns true if thread has participants"""
  44. User = get_user_model()
  45. users = [
  46. User.objects.create_user("Bob", "bob@boberson.com", "Pass.123"),
  47. User.objects.create_user("Bob2", "bob2@boberson.com", "Pass.123"),
  48. ]
  49. self.assertFalse(has_participants(self.thread))
  50. ThreadParticipant.objects.add_participants(self.thread, users)
  51. self.assertTrue(has_participants(self.thread))
  52. self.thread.threadparticipant_set.all().delete()
  53. self.assertFalse(has_participants(self.thread))
  54. def test_make_participants_aware(self):
  55. """
  56. make_participants_aware sets participants_list and participant
  57. annotations on thread model
  58. """
  59. User = get_user_model()
  60. user = User.objects.create_user("Bob", "bob@boberson.com", "Pass.123")
  61. other_user = User.objects.create_user("Bob2", "bob2@boberson.com", "Pass.123")
  62. self.assertFalse(hasattr(self.thread, 'participants_list'))
  63. self.assertFalse(hasattr(self.thread, 'participant'))
  64. make_participants_aware(user, self.thread)
  65. self.assertTrue(hasattr(self.thread, 'participants_list'))
  66. self.assertTrue(hasattr(self.thread, 'participant'))
  67. self.assertEqual(self.thread.participants_list, [])
  68. self.assertIsNone(self.thread.participant)
  69. ThreadParticipant.objects.set_owner(self.thread, user)
  70. ThreadParticipant.objects.add_participants(self.thread, [other_user])
  71. make_participants_aware(user, self.thread)
  72. self.assertEqual(self.thread.participant.user, user)
  73. for participant in self.thread.participants_list:
  74. if participant.user == user:
  75. break
  76. else:
  77. self.fail("thread.participants_list didn't contain user")
  78. def test_remove_participant(self):
  79. """remove_participant removes user from thread"""
  80. User = get_user_model()
  81. user = User.objects.create_user("Bob", "bob@boberson.com", "Pass.123")
  82. set_owner(self.thread, user)
  83. remove_participant(self.thread, user)
  84. self.assertEqual(self.thread.participants.count(), 0)
  85. with self.assertRaises(ThreadParticipant.DoesNotExist):
  86. self.thread.threadparticipant_set.get(user=user)
  87. def test_set_owner(self):
  88. """set_owner sets user as thread owner"""
  89. User = get_user_model()
  90. user = User.objects.create_user("Bob", "bob@boberson.com", "Pass.123")
  91. set_owner(self.thread, user)
  92. owner = self.thread.threadparticipant_set.get(is_owner=True)
  93. self.assertEqual(user, owner.user)
  94. def test_set_users_unread_private_threads_sync(self):
  95. """
  96. set_users_unread_private_threads_sync sets sync_unread_private_threads
  97. flag on user model to true
  98. """
  99. User = get_user_model()
  100. users = [
  101. User.objects.create_user("Bob1", "bob1@boberson.com", "Pass.123"),
  102. User.objects.create_user("Bob2", "bob2@boberson.com", "Pass.123"),
  103. ]
  104. set_users_unread_private_threads_sync(users)
  105. for user in users:
  106. User.objects.get(pk=user.pk, sync_unread_private_threads=True)