bans.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. """
  2. API for checking values for bans
  3. Calling this instead of Ban.objects.find_ban is preferred, if you don't want
  4. to use validate_X_banned validators
  5. """
  6. from datetime import timedelta
  7. from django.utils import timezone
  8. from django.utils.dateparse import parse_datetime
  9. from misago.core import cachebuster
  10. from .models import Ban, BanCache
# Session key under which the result of the request IP ban check is cached
CACHE_SESSION_KEY = 'misago_ip_check'
# Key passed to cachebuster to version the cached ban data
VERSION_KEY = 'misago_bans'
  13. def get_username_ban(username, registration_only=False):
  14. try:
  15. return Ban.objects.get_username_ban(username, registration_only)
  16. except Ban.DoesNotExist:
  17. return None
  18. def get_email_ban(email, registration_only=False):
  19. try:
  20. return Ban.objects.get_email_ban(email, registration_only)
  21. except Ban.DoesNotExist:
  22. return None
  23. def get_ip_ban(ip, registration_only=False):
  24. try:
  25. return Ban.objects.get_ip_ban(ip, registration_only)
  26. except Ban.DoesNotExist:
  27. return None
  28. def get_user_ban(user):
  29. """
  30. This function checks if user is banned
  31. When user model is available, this is preffered to calling
  32. get_email_ban(user.email) and get_username_ban(user.username)
  33. because it sets ban cache on user model
  34. """
  35. try:
  36. ban_cache = user.ban_cache
  37. if not ban_cache.is_valid:
  38. _set_user_ban_cache(user)
  39. except BanCache.DoesNotExist:
  40. user.ban_cache = BanCache(user=user)
  41. user.ban_cache = _set_user_ban_cache(user)
  42. if user.ban_cache.ban:
  43. return user.ban_cache
  44. else:
  45. return None
  46. def _set_user_ban_cache(user):
  47. ban_cache = user.ban_cache
  48. ban_cache.bans_version = cachebuster.get_version(VERSION_KEY)
  49. try:
  50. user_ban = Ban.objects.get_ban(
  51. username=user.username,
  52. email=user.email,
  53. registration_only=False,
  54. )
  55. ban_cache.ban = user_ban
  56. ban_cache.expires_on = user_ban.expires_on
  57. ban_cache.user_message = user_ban.user_message
  58. ban_cache.staff_message = user_ban.staff_message
  59. except Ban.DoesNotExist:
  60. ban_cache.ban = None
  61. ban_cache.expires_on = None
  62. ban_cache.user_message = None
  63. ban_cache.staff_message = None
  64. ban_cache.save()
  65. return ban_cache
  66. def get_request_ip_ban(request):
  67. """
  68. Utility for checking if request came from banned IP
  69. This check may be performed frequently, which is why there is extra
  70. boilerplate that caches ban check result in session
  71. """
  72. session_ban_cache = _get_session_bancache(request)
  73. if session_ban_cache:
  74. if session_ban_cache['is_banned']:
  75. return session_ban_cache
  76. else:
  77. return False
  78. found_ban = get_ip_ban(request.user_ip)
  79. ban_cache = request.session[CACHE_SESSION_KEY] = {
  80. 'version': cachebuster.get_version(VERSION_KEY),
  81. 'ip': request.user_ip,
  82. }
  83. if found_ban:
  84. if found_ban.expires_on:
  85. ban_cache['expires_on'] = found_ban.expires_on.isoformat()
  86. else:
  87. ban_cache['expires_on'] = None
  88. ban_cache.update({'is_banned': True, 'message': found_ban.user_message})
  89. request.session[CACHE_SESSION_KEY] = ban_cache
  90. return _hydrate_session_cache(request.session[CACHE_SESSION_KEY])
  91. else:
  92. ban_cache['is_banned'] = False
  93. request.session[CACHE_SESSION_KEY] = ban_cache
  94. return None
  95. def _get_session_bancache(request):
  96. try:
  97. ban_cache = request.session[CACHE_SESSION_KEY]
  98. ban_cache = _hydrate_session_cache(ban_cache)
  99. if ban_cache['ip'] != request.user_ip:
  100. return None
  101. if not cachebuster.is_valid(VERSION_KEY, ban_cache['version']):
  102. return None
  103. if ban_cache.get('expires_on'):
  104. if ban_cache['expires_on'] < timezone.today():
  105. return None
  106. return ban_cache
  107. except KeyError:
  108. return None
  109. def _hydrate_session_cache(ban_cache):
  110. hydrated = ban_cache.copy()
  111. if hydrated.get('expires_on'):
  112. hydrated['expires_on'] = parse_datetime(hydrated['expires_on'])
  113. return hydrated
  114. # Utilities for front-end based bans
  115. def ban_user(user, user_message=None, staff_message=None, length=None, expires_on=None):
  116. if not expires_on and length:
  117. expires_on = timezone.now() + timedelta(**length)
  118. ban = Ban.objects.create(
  119. banned_value=user.username.lower(),
  120. user_message=user_message,
  121. staff_message=staff_message,
  122. expires_on=expires_on
  123. )
  124. Ban.objects.invalidate_cache()
  125. return ban
  126. def ban_ip(ip, user_message=None, staff_message=None, length=None, expires_on=None):
  127. if not expires_on and length:
  128. expires_on = timezone.now() + timedelta(**length)
  129. ban = Ban.objects.create(
  130. check_type=Ban.IP,
  131. banned_value=ip,
  132. user_message=user_message,
  133. staff_message=staff_message,
  134. expires_on=expires_on
  135. )
  136. Ban.objects.invalidate_cache()
  137. return ban