Browse Source

Implement auth handlers and spec handlers

Alec Nikolas Reiter 7 years ago
parent
commit
78482422a9

+ 19 - 0
flaskbb/auth/plugins.py

@@ -14,6 +14,9 @@ from flask_login import login_user
 from . import impl
 from . import impl
 from ..user.models import User
 from ..user.models import User
 from ..utils.settings import flaskbb_config
 from ..utils.settings import flaskbb_config
+from .services.authentication import (BlockUnactivatedUser, ClearFailedLogins,
+                                      DefaultFlaskBBAuthProvider,
+                                      MarkFailedLogin)
 from .services.factories import account_activator_factory
 from .services.factories import account_activator_factory
 
 
 
 
@@ -34,3 +37,19 @@ def flaskbb_event_user_registered(username):
     else:
     else:
         login_user(user)
         login_user(user)
         flash(_("Thanks for registering."), "success")
         flash(_("Thanks for registering."), "success")
+
+
+@impl(trylast=True)
+def flaskbb_authenticate(identifier, secret):
+    return DefaultFlaskBBAuthProvider().authenticate(identifier, secret)
+
+
+@impl(tryfirst=True)
+def flaskbb_post_authenticate(user):
+    ClearFailedLogins().handle_post_auth(user)
+    BlockUnactivatedUser().handle_post_auth(user)
+
+
+@impl
+def flaskbb_authentication_failed(identifier):
+    MarkFailedLogin().handle_authentication_failure(identifier)

+ 108 - 0
flaskbb/auth/services/authentication.py

@@ -0,0 +1,108 @@
+# -*- coding: utf-8 -*-
+"""
+    flaskbb.auth.services.authentication
+    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+    Authentication providers, handlers and post-processors
+    in FlaskBB
+
+    :copyright: (c) 2014-2018 the FlaskBB Team.
+    :license: BSD, see LICENSE for more details
+"""
+
+from datetime import datetime
+
+import attr
+from flask_babelplus import gettext as _
+from pytz import UTC
+from werkzeug import check_password_hash
+
+from ...core.auth.authentication import (AuthenticationFailureHandler,
+                                         AuthenticationProvider,
+                                         PostAuthenticationHandler,
+                                         StopAuthentication)
+from ...extensions import db
+from ...user.models import User
+from ...utils.helpers import time_utcnow
+
+
+@attr.s(frozen=True)
+class FailedLoginConfiguration(object):
+    limit = attr.ib()
+    lockout_window = attr.ib()
+
+
+class BlockTooManyFailedLogins(AuthenticationProvider):
+
+    def __init__(self, configuration):
+        self.configuration = configuration
+
+    def authenticate(self, identifier, secret):
+        user = User.query.filter(
+            db.or_(User.username == identifier, User.email == identifier)
+        ).first()
+
+        if user is not None:
+            attempts = user.login_attempts
+            last_attempt = user.last_failed_login or datetime.min.replace(
+                tzinfo=UTC
+            )
+            reached_attempt_limit = attempts >= self.configuration.limit
+            inside_lockout = (
+                last_attempt + self.configuration.lockout_window
+            ) >= time_utcnow()
+
+            if reached_attempt_limit and inside_lockout:
+                raise StopAuthentication(
+                    _(
+                        "Your account is currently locked out due to too many "
+                        "failed login attempts"
+                    )
+                )
+
+
+class DefaultFlaskBBAuthProvider(AuthenticationProvider):
+
+    def authenticate(self, identifier, secret):
+        user = User.query.filter(
+            db.or_(User.username == identifier, User.email == identifier)
+        ).first()
+
+        if user is not None:
+            if check_password_hash(user.password, secret):
+                return user
+            return None
+
+        check_password_hash("dummy password", secret)
+        return None
+
+
+class MarkFailedLogin(AuthenticationFailureHandler):
+
+    def handle_authentication_failure(self, identifier):
+        user = User.query.filter(
+            db.or_(User.username == identifier, User.email == identifier)
+        ).first()
+
+        if user is not None:
+            user.login_attempts += 1
+            user.last_failed_login = time_utcnow()
+
+
+class BlockUnactivatedUser(PostAuthenticationHandler):
+
+    def handle_post_auth(self, user):
+        if not user.activated:  # pragma: no branch
+            raise StopAuthentication(
+                _(
+                    "In order to use your account you have to "
+                    "activate it through the link we have sent to "
+                    "your email address."
+                )
+            )
+
+
+class ClearFailedLogins(PostAuthenticationHandler):
+
+    def handle_post_auth(self, user):
+        user.login_attempts = 0

+ 7 - 5
flaskbb/auth/services/registration.py

@@ -9,8 +9,7 @@
     :license: BSD, see LICENSE for more details
     :license: BSD, see LICENSE for more details
 """
 """
 
 
-from collections import namedtuple
-
+import attr
 from flask_babelplus import gettext as _
 from flask_babelplus import gettext as _
 from sqlalchemy import func
 from sqlalchemy import func
 
 
@@ -22,9 +21,12 @@ __all__ = (
     "UsernameUniquenessValidator"
     "UsernameUniquenessValidator"
 )
 )
 
 
-UsernameRequirements = namedtuple(
-    'UsernameRequirements', ['min', 'max', 'blacklist']
-)
+
+@attr.s(hash=False, repr=True, frozen=True, cmp=False)
+class UsernameRequirements(object):
+    min = attr.ib()
+    max = attr.ib()
+    blacklist = attr.ib()
 
 
 
 
 class UsernameValidator(UserValidator):
 class UsernameValidator(UserValidator):

+ 18 - 0
flaskbb/core/auth/authentication.py

@@ -30,3 +30,21 @@ class AuthenticationProvider(ABC):
 
 
     def __call__(self, identifier, secret):
     def __call__(self, identifier, secret):
         return self.authenticate(identifier, secret)
         return self.authenticate(identifier, secret)
+
+
+class AuthenticationFailureHandler(ABC):
+    @abstractmethod
+    def handle_authentication_failure(self, identifier):
+        pass
+
+    def __call__(self, identifier):
+        self.handle_authentication_failure(identifier)
+
+
+class PostAuthenticationHandler(ABC):
+    @abstractmethod
+    def handle_post_auth(self, user):
+        pass
+
+    def __call__(self, user):
+        self.handle_post_auth(user)

+ 0 - 1
flaskbb/core/auth/registration.py

@@ -15,7 +15,6 @@ from abc import abstractmethod
 import attr
 import attr
 
 
 from ..._compat import ABC
 from ..._compat import ABC
-from ..exceptions import ValidationError, StopValidation
 
 
 
 
 @attr.s(hash=True, cmp=False, repr=True, frozen=True)
 @attr.s(hash=True, cmp=False, repr=True, frozen=True)

+ 107 - 0
tests/unit/auth/test_authentication.py

@@ -0,0 +1,107 @@
+from datetime import datetime, timedelta
+
+import pytest
+from freezegun import freeze_time
+from pytz import UTC
+
+from flaskbb.auth.services import authentication as auth
+from flaskbb.core.auth.authentication import StopAuthentication
+
+pytestmark = pytest.mark.usefixtures('default_settings')
+
+
+class TestBlockTooManyFailedLogins(object):
+    provider = auth.BlockTooManyFailedLogins(
+        auth.
+        FailedLoginConfiguration(limit=1, lockout_window=timedelta(hours=1))
+    )
+
+    @freeze_time(datetime(2018, 1, 1, 13, 30))
+    def test_raises_StopAuthentication_if_user_is_at_limit_and_inside_window(
+            self, Fred
+    ):
+        Fred.last_failed_login = datetime(2018, 1, 1, 14, tzinfo=UTC)
+        Fred.login_attempts = 1
+
+        with pytest.raises(StopAuthentication) as excinfo:
+            self.provider.authenticate(Fred.email, 'not considered')
+
+        assert 'too many failed login attempts' in excinfo.value.reason
+
+    @freeze_time(datetime(2018, 1, 1, 14))
+    def test_doesnt_raise_if_user_is_at_limit_but_outside_window(self, Fred):
+        Fred.last_failed_login = datetime(2018, 1, 1, 12, tzinfo=UTC)
+        Fred.login_attempts = 1
+
+        self.provider.authenticate(Fred.email, 'not considered')
+
+    def test_doesnt_raise_if_user_is_below_limit_but_inside_window(self, Fred):
+        Fred.last_failed_login = datetime(2018, 1, 1, 12, tzinfo=UTC)
+        Fred.login_attempts = 0
+        self.provider.authenticate(Fred.email, 'not considered')
+
+    def test_handles_if_user_has_no_failed_login_attempts(self, Fred):
+        Fred.login_attempts = 0
+        Fred.last_failed_login = None
+
+        self.provider.authenticate(Fred.email, 'not considered')
+
+    def test_handles_if_user_doesnt_exist(self, Fred):
+        self.provider.authenticate('completely@made.up', 'not considered')
+
+
+class TestDefaultFlaskBBAuthProvider(object):
+    provider = auth.DefaultFlaskBBAuthProvider()
+
+    def test_returns_None_if_user_doesnt_exist(self, Fred):
+        result = self.provider.authenticate('completely@made.up', 'lolnope')
+
+        assert result is None
+
+    def test_returns_None_if_password_doesnt_match(self, Fred):
+        result = self.provider.authenticate(Fred.email, 'stillnotit')
+
+        assert result is None
+
+    def test_returns_user_if_identifer_and_password_match(self, Fred):
+        result = self.provider.authenticate(Fred.email, 'fred')
+
+        assert result.username == Fred.username
+
+
+class TestMarkFailedLoginAttempt(object):
+    handler = auth.MarkFailedLogin()
+
+    @freeze_time(datetime(2018, 1, 1, 12))
+    def test_increments_users_failed_logins_and_sets_last_fail_date(
+            self, Fred
+    ):
+        Fred.login_attempts = 0
+        Fred.last_failed_login = datetime.min.replace(tzinfo=UTC)
+        self.handler.handle_authentication_failure(Fred.email)
+        assert Fred.login_attempts == 1
+        assert Fred.last_failed_login == datetime.now(UTC)
+
+    def test_handles_if_user_doesnt_exist(self, Fred):
+        self.handler.handle_authentication_failure('completely@made.up')
+
+
+class TestClearFailedLogins(object):
+    handler = auth.ClearFailedLogins()
+
+    def test_clears_failed_logins_attempts(self, Fred):
+        Fred.login_attempts = 1000
+        self.handler.handle_post_auth(Fred)
+        assert Fred.login_attempts == 0
+
+
+class TestBlockUnactivatedUser(object):
+    handler = auth.BlockUnactivatedUser()
+
+    def test_raises_StopAuthentication_if_user_is_unactivated(
+            self, unactivated_user
+    ):
+        with pytest.raises(StopAuthentication) as excinfo:
+            self.handler.handle_post_auth(unactivated_user)
+
+        assert 'In order to use your account' in excinfo.value.reason