pipeline.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. # pylint: disable=keyword-arg-before-vararg
  2. import json
  3. from django.contrib.auth import get_user_model
  4. from django.db import IntegrityError
  5. from django.http import JsonResponse
  6. from django.shortcuts import render
  7. from django.urls import reverse
  8. from django.utils.translation import gettext as _
  9. from social_core.pipeline.partial import partial
  10. from ...core.exceptions import SocialAuthBanned, SocialAuthFailed
  11. from ...legal.models import Agreement
  12. from ..bans import get_request_ip_ban, get_user_ban
  13. from ..forms.register import SocialAuthRegisterForm
  14. from ..models import Ban
  15. from ..registration import (
  16. get_registration_result_json,
  17. save_user_agreements,
  18. send_welcome_email,
  19. )
  20. from ..setupnewuser import setup_new_user
  21. from ..validators import (
  22. ValidationError,
  23. validate_email,
  24. validate_new_registration,
  25. validate_username,
  26. )
  27. from .utils import get_social_auth_backend_name, perpare_username
  28. User = get_user_model()
  29. def validate_ip_not_banned(strategy, details, backend, user=None, *args, **kwargs):
  30. """
  31. Pipeline step that interrupts pipeline if found user is non-staff and IP banned
  32. """
  33. if not user or user.is_staff:
  34. return None
  35. ban = get_request_ip_ban(strategy.request)
  36. if ban:
  37. hydrated_ban = Ban(
  38. check_type=Ban.IP, user_message=ban["message"], expires_on=ban["expires_on"]
  39. )
  40. raise SocialAuthBanned(backend, hydrated_ban)
  41. def validate_user_not_banned(strategy, details, backend, user=None, *args, **kwargs):
  42. """Pipeline step that interrupts pipeline if found user is non-staff and banned"""
  43. if not user or user.is_staff:
  44. return None
  45. user_ban = get_user_ban(user, strategy.request.cache_versions)
  46. if user_ban:
  47. raise SocialAuthBanned(backend, user_ban)
  48. def associate_by_email(strategy, details, backend, user=None, *args, **kwargs):
  49. """If user with e-mail from provider exists in database and is active,
  50. this step authenticates them.
  51. """
  52. if user:
  53. return None
  54. email = details.get("email")
  55. if not email:
  56. return None
  57. try:
  58. user = User.objects.get_by_email(email)
  59. except User.DoesNotExist:
  60. return None
  61. backend_name = get_social_auth_backend_name(backend.name)
  62. if not user.is_active:
  63. raise SocialAuthFailed(
  64. backend,
  65. _(
  66. "The e-mail address associated with your %(backend)s account is "
  67. "not available for use on this site."
  68. )
  69. % {"backend": backend_name},
  70. )
  71. if user.requires_activation_by_admin:
  72. raise SocialAuthFailed(
  73. backend,
  74. _(
  75. "Your account has to be activated by site administrator "
  76. "before you will be able to sign in with %(backend)s."
  77. )
  78. % {"backend": backend_name},
  79. )
  80. return {"user": user, "is_new": False}
  81. def get_username(strategy, details, backend, user=None, *args, **kwargs):
  82. """Resolve valid username for use in new account"""
  83. if user:
  84. return None
  85. settings = strategy.request.settings
  86. username = perpare_username(details.get("username", ""))
  87. full_name = perpare_username(details.get("full_name", ""))
  88. first_name = perpare_username(details.get("first_name", ""))
  89. last_name = perpare_username(details.get("last_name", ""))
  90. names_to_try = [username, first_name]
  91. if username:
  92. names_to_try.append(username)
  93. if first_name:
  94. names_to_try.append(first_name)
  95. if last_name:
  96. # if first name is taken, try first name + first char of last name
  97. names_to_try.append(first_name + last_name[0])
  98. if full_name:
  99. names_to_try.append(full_name)
  100. username_length_max = settings.username_length_max
  101. for name in names_to_try:
  102. if len(name) > username_length_max:
  103. names_to_try.append(name[:username_length_max])
  104. for name in filter(bool, names_to_try):
  105. try:
  106. validate_username(settings, name)
  107. return {"clean_username": name}
  108. except ValidationError:
  109. pass
  110. def create_user(strategy, details, backend, user=None, *args, **kwargs):
  111. """Aggressively attempt to register and sign in new user"""
  112. if user:
  113. return None
  114. request = strategy.request
  115. settings = request.settings
  116. email = details.get("email")
  117. username = kwargs.get("clean_username")
  118. if not email or not username:
  119. return None
  120. try:
  121. validate_email(email)
  122. validate_new_registration(request, {"email": email, "username": username})
  123. except ValidationError:
  124. return None
  125. activation_kwargs = {}
  126. if settings.account_activation == "admin":
  127. activation_kwargs = {"requires_activation": User.ACTIVATION_ADMIN}
  128. new_user = User.objects.create_user(
  129. username, email, joined_from_ip=request.user_ip, **activation_kwargs
  130. )
  131. setup_new_user(settings, new_user)
  132. send_welcome_email(request, new_user)
  133. return {"user": new_user, "is_new": True}
  134. @partial
  135. def create_user_with_form(strategy, details, backend, user=None, *args, **kwargs):
  136. """
  137. create_user lets user confirm account creation by entering final username or email
  138. """
  139. if user:
  140. return None
  141. request = strategy.request
  142. settings = request.settings
  143. backend_name = get_social_auth_backend_name(backend.name)
  144. if request.method == "POST":
  145. try:
  146. request_data = json.loads(request.body)
  147. except (TypeError, ValueError):
  148. request_data = request.POST.copy()
  149. form = SocialAuthRegisterForm(
  150. request_data, request=request, agreements=Agreement.objects.get_agreements()
  151. )
  152. if not form.is_valid():
  153. return JsonResponse(form.errors, status=400)
  154. email_verified = form.cleaned_data["email"] == details.get("email")
  155. activation_kwargs = {}
  156. if settings.account_activation == "admin":
  157. activation_kwargs = {"requires_activation": User.ACTIVATION_ADMIN}
  158. elif settings.account_activation == "user" and not email_verified:
  159. activation_kwargs = {"requires_activation": User.ACTIVATION_USER}
  160. try:
  161. new_user = User.objects.create_user(
  162. form.cleaned_data["username"],
  163. form.cleaned_data["email"],
  164. joined_from_ip=request.user_ip,
  165. **activation_kwargs
  166. )
  167. setup_new_user(settings, new_user)
  168. except IntegrityError:
  169. return JsonResponse(
  170. {"__all__": _("Please try resubmitting the form.")}, status=400
  171. )
  172. save_user_agreements(new_user, form)
  173. send_welcome_email(request, new_user)
  174. return {"user": new_user, "is_new": True}
  175. request.frontend_context["SOCIAL_AUTH"] = {
  176. "backend_name": backend_name,
  177. "step": "register",
  178. "email": details.get("email"),
  179. "username": kwargs.get("clean_username"),
  180. "url": reverse("social:complete", kwargs={"backend": backend.name}),
  181. }
  182. return render(request, "misago/socialauth.html", {"backend_name": backend_name})
  183. @partial
  184. def require_activation(
  185. strategy, details, backend, user=None, is_new=False, *args, **kwargs
  186. ):
  187. if not user:
  188. # Social auth pipeline has entered corrupted state
  189. # Remove partial auth state and redirect user to beginning
  190. partial_token = strategy.session.get("partial_pipeline_token")
  191. if partial_token:
  192. strategy.clean_partial_pipeline(partial_token)
  193. return None
  194. if not user.requires_activation:
  195. return None
  196. request = strategy.request
  197. backend_name = get_social_auth_backend_name(backend.name)
  198. response_data = get_registration_result_json(user)
  199. response_data.update({"step": "done", "backend_name": backend_name})
  200. if request.method == "POST":
  201. # we are carrying on from requestration request
  202. return JsonResponse(response_data)
  203. request.frontend_context["SOCIAL_AUTH"] = response_data
  204. request.frontend_context["SOCIAL_AUTH"].update(
  205. {"url": reverse("social:complete", kwargs={"backend": backend.name})}
  206. )
  207. return render(request, "misago/socialauth.html", {"backend_name": backend_name})