Просмотр исходного кода

Fix normalizing/filtering user data (#1441)

* Fix normalizing/filtering user data

* Pass user data copy to filter

* Improve OAuth 2 data cleaning
Rafał Pitoń 2 лет назад
Родитель
Сommit
d1efede5e1

+ 51 - 0
misago/oauth2/tests/test_get_user_data.py

@@ -382,6 +382,57 @@ def test_user_data_json_values_are_mapped_to_result(mock_request):
     oauth2_json_id_path="id",
     oauth2_json_name_path="user.profile.name",
     oauth2_json_email_path="user.profile.email",
+    oauth2_json_avatar_path="user.profile.avatar",
+)
+def test_user_data_json_values_are_none_when_not_found(mock_request):
+    user_data = {
+        "id": "7dds8a7dd89sa",
+        "name": "Aerith",
+        "email": "aerith@example.com",
+        "avatar": "https://example.com/avatar.png",
+    }
+
+    get_mock = Mock(
+        return_value=Mock(
+            status_code=200,
+            json=Mock(
+                return_value={
+                    "profile_id": user_data["id"],
+                    "user": {
+                        "data": {
+                            "name": user_data["name"],
+                            "email": user_data["email"],
+                            "avatar": user_data["avatar"],
+                        }
+                    },
+                },
+            ),
+        ),
+    )
+
+    with patch("requests.get", get_mock):
+        assert get_user_data(mock_request, ACCESS_TOKEN) == {
+            "id": None,
+            "name": None,
+            "email": None,
+            "avatar": None,
+        }
+
+        get_mock.assert_called_once_with(
+            f"https://example.com/oauth2/data?type=user&atoken={ACCESS_TOKEN}",
+            headers={},
+            timeout=REQUESTS_TIMEOUT,
+        )
+
+
+@override_dynamic_settings(
+    oauth2_user_url="https://example.com/oauth2/data?type=user",
+    oauth2_user_method="GET",
+    oauth2_user_token_name="atoken",
+    oauth2_user_token_location="QUERY",
+    oauth2_json_id_path="id",
+    oauth2_json_name_path="user.profile.name",
+    oauth2_json_email_path="user.profile.email",
     oauth2_json_avatar_path="",
 )
 def test_user_data_skips_avatar_if_path_is_not_set(mock_request):

+ 18 - 0
misago/oauth2/tests/test_getting_json_values.py

@@ -29,3 +29,21 @@ def test_json_value_is_returned_from_nested_objects():
         )
         == "ok"
     )
+
+
+def test_none_is_returned_from_nested_objects():
+    assert (
+        get_value_from_json(
+            "val.child.val3",
+            {
+                "val2": "nope",
+                "val": {
+                    "child": {
+                        "val2": "nope",
+                        "val": "ok",
+                    },
+                },
+            },
+        )
+        is None
+    )

+ 64 - 2
misago/oauth2/tests/test_oauth2_complete_view.py

@@ -658,7 +658,7 @@ def test_oauth2_complete_view_returns_error_400_if_access_token_is_rejected(
 
 @responses.activate
 @override_dynamic_settings(**TEST_SETTINGS)
-def test_oauth2_complete_view_returns_error_400_if_user_data_didnt_validate(
+def test_oauth2_complete_view_returns_error_400_if_user_email_was_missing(
     user, client, dynamic_settings
 ):
     assert dynamic_settings.enable_oauth2_client is True
@@ -699,7 +699,69 @@ def test_oauth2_complete_view_returns_error_400_if_user_data_didnt_validate(
             "id": 1234,
             "profile": {
                 "name": "John Doe",
-                "email": "",
+            },
+        },
+        match=[
+            header_matcher({"Authorization": f"Bearer {access_token}"}),
+        ],
+    )
+
+    response = client.get(
+        "%s?state=%s&code=%s"
+        % (
+            reverse("misago:oauth2-complete"),
+            session_state,
+            code_grant,
+        )
+    )
+
+    assert_contains(response, "Enter a valid email address.", 400)
+
+
+@responses.activate
+@override_dynamic_settings(**TEST_SETTINGS)
+def test_oauth2_complete_view_returns_error_400_if_user_email_was_invalid(
+    user, client, dynamic_settings
+):
+    assert dynamic_settings.enable_oauth2_client is True
+
+    Subject.objects.create(sub="1234", user=user)
+
+    code_grant = "12345grant"
+    session_state = "12345state"
+    access_token = "12345token"
+
+    session = client.session
+    session[SESSION_STATE] = session_state
+    session.save()
+
+    responses.post(
+        "https://example.com/oauth2/token",
+        json={
+            "token": {
+                "bearer": access_token,
+            },
+        },
+        match=[
+            urlencoded_params_matcher(
+                {
+                    "grant_type": "authorization_code",
+                    "client_id": "oauth2_client_id",
+                    "client_secret": "oauth2_client_secret",
+                    "redirect_uri": "http://testserver/oauth2/complete/",
+                    "code": code_grant,
+                },
+            ),
+        ],
+    )
+
+    responses.post(
+        "https://example.com/oauth2/user",
+        json={
+            "id": 1234,
+            "profile": {
+                "name": "John Doe",
+                "email": "invalid",
             },
         },
         match=[

+ 198 - 0
misago/oauth2/tests/test_user_data_filter.py

@@ -43,6 +43,136 @@ def test_existing_user_data_is_filtered_using_default_filters(user, request):
     }
 
 
+def test_empty_user_name_is_replaced_with_placeholder_one(db, request):
+    filtered_data = filter_user_data(
+        request,
+        None,
+        {
+            "id": "242132",
+            "name": None,
+            "email": "oauth2@example.com",
+            "avatar": None,
+        },
+    )
+
+    assert len(filtered_data["name"].strip())
+
+
+def test_missing_user_name_is_replaced_with_placeholder_one(db, request):
+    filtered_data = filter_user_data(
+        request,
+        None,
+        {
+            "id": "242132",
+            "name": None,
+            "email": "oauth2@example.com",
+            "avatar": None,
+        },
+    )
+
+    assert len(filtered_data["name"].strip())
+
+
+def test_user_name_is_converted_to_str(db, request):
+    filtered_data = filter_user_data(
+        request,
+        None,
+        {
+            "id": "242132",
+            "name": 123456,
+            "email": "oauth2@example.com",
+            "avatar": None,
+        },
+    )
+
+    assert filtered_data == {
+        "id": "242132",
+        "name": "123456",
+        "email": "oauth2@example.com",
+        "avatar": None,
+    }
+
+
+def test_user_email_is_converted_to_str(db, request):
+    filtered_data = filter_user_data(
+        request,
+        None,
+        {
+            "id": "242132",
+            "name": "New User",
+            "email": [1, 2, 4, "d"],
+            "avatar": None,
+        },
+    )
+
+    assert filtered_data == {
+        "id": "242132",
+        "name": "New_User",
+        "email": "[1, 2, 4, 'd']",
+        "avatar": None,
+    }
+
+
+def test_missing_user_email_is_set_as_empty_str(db, request):
+    filtered_data = filter_user_data(
+        request,
+        None,
+        {
+            "id": "242132",
+            "name": "New User",
+            "email": None,
+            "avatar": None,
+        },
+    )
+
+    assert filtered_data == {
+        "id": "242132",
+        "name": "New_User",
+        "email": "",
+        "avatar": None,
+    }
+
+
+def test_user_avatar_is_converted_to_str(db, request):
+    filtered_data = filter_user_data(
+        request,
+        None,
+        {
+            "id": "242132",
+            "name": "New User",
+            "email": "oauth2@example.com",
+            "avatar": 123456,
+        },
+    )
+
+    assert filtered_data == {
+        "id": "242132",
+        "name": "New_User",
+        "email": "oauth2@example.com",
+        "avatar": "123456",
+    }
+
+
+def test_empty_user_avatar_is_filtered_to_none(db, request):
+    filtered_data = filter_user_data(
+        request,
+        None,
+        {
+            "id": "242132",
+            "name": "New User",
+            "email": "oauth2@example.com",
+            "avatar": "",
+        },
+    )
+
+    assert filtered_data == {
+        "id": "242132",
+        "name": "New_User",
+        "email": "oauth2@example.com",
+        "avatar": None,
+    }
+
+
 def user_request_filter(request, user, user_data):
     assert request
 
@@ -144,3 +274,71 @@ def test_existing_user_data_is_filtered_using_custom_filters(user, request):
             "email": "filtered_oauth2@example.com",
             "avatar": None,
         }
+
+
+def test_original_user_data_is_not_mutated_by_default_filters(user, request):
+    user_data = {
+        "id": "1234",
+        "name": "New User",
+        "email": "oauth2@example.com",
+        "avatar": None,
+    }
+
+    filtered_data = filter_user_data(request, user, user_data)
+    assert filtered_data == {
+        "id": "1234",
+        "name": "New_User",
+        "email": "oauth2@example.com",
+        "avatar": None,
+    }
+
+    assert user_data == {
+        "id": "1234",
+        "name": "New User",
+        "email": "oauth2@example.com",
+        "avatar": None,
+    }
+
+
+def test_original_user_data_is_not_mutated_by_custom_filters(user, request):
+    def user_is_set_filter(request, user_obj, user_data):
+        assert user_obj == user
+        return {
+            "id": str(user_obj.id),
+            "name": user_data["name"],
+            "email": user_data["email"],
+            "avatar": user_data["avatar"],
+        }
+
+    user_data = {
+        "id": "1234",
+        "name": "New User",
+        "email": "oauth2@example.com",
+        "avatar": None,
+    }
+
+    with patch(
+        "misago.oauth2.validation.oauth2_user_data_filters",
+        [
+            user_request_filter,
+            user_is_set_filter,
+            user_id_filter,
+            user_name_filter,
+            user_email_filter,
+        ],
+    ):
+        filtered_data = filter_user_data(request, user, user_data)
+
+        assert filtered_data == {
+            "id": "".join(reversed(str(user.id))),
+            "name": "resU weN",
+            "email": "filtered_oauth2@example.com",
+            "avatar": None,
+        }
+
+    assert user_data == {
+        "id": "1234",
+        "name": "New User",
+        "email": "oauth2@example.com",
+        "avatar": None,
+    }

+ 1 - 0
misago/oauth2/user.py

@@ -15,6 +15,7 @@ def get_user_from_data(request, user_data):
     if not user_data["id"]:
         raise OAuth2UserIdNotProvidedError()
 
+    user_data["id"] = str(user_data["id"])
     user = get_user_by_subject(user_data["id"])
     if not user and user_data["email"]:
         user = get_user_by_email(user_data["id"], user_data["email"])

+ 21 - 8
misago/oauth2/validation.py

@@ -26,6 +26,10 @@ def validate_user_data(request, user, user_data):
     filtered_data = filter_user_data(request, user, user_data)
 
     try:
+        validate_username_content(filtered_data["name"])
+        validate_username_length(UsernameSettings, filtered_data["name"])
+        validate_email(filtered_data["email"])
+
         errors_list = []
 
         def add_error(_field_unused: str | None, error: str | ValidationError):
@@ -34,10 +38,6 @@ def validate_user_data(request, user, user_data):
 
             errors_list.append(str(error))
 
-        validate_username_content(user_data["name"])
-        validate_username_length(UsernameSettings, user_data["name"])
-        validate_email(user_data["email"])
-
         validate_new_registration(request, filtered_data, add_error)
     except ValidationError as exc:
         raise OAuth2UserDataValidationError(error_list=[str(exc.message)])
@@ -49,19 +49,32 @@ def validate_user_data(request, user, user_data):
 
 
 def filter_user_data(request, user, user_data):
+    filtered_data = {
+        "id": user_data["id"],
+        "name": str(user_data["name"] or "").strip(),
+        "email": str(user_data["email"] or "").strip(),
+        "avatar": filter_user_avatar(user_data["avatar"]),
+    }
+
     if oauth2_user_data_filters:
         return filter_user_data_with_filters(
-            request, user, user_data, oauth2_user_data_filters
+            request, user, filtered_data, oauth2_user_data_filters
         )
     else:
-        user_data["name"] = filter_name(user, user_data["name"])
+        filtered_data["name"] = filter_name(user, filtered_data["name"])
 
-    return user_data
+    return filtered_data
+
+
+def filter_user_avatar(user_avatar):
+    if user_avatar:
+        return str(user_avatar).strip() or None
+    return None
 
 
 def filter_user_data_with_filters(request, user, user_data, filters):
     for filter_user_data in filters:
-        user_data = filter_user_data(request, user, user_data) or user_data
+        user_data = filter_user_data(request, user, user_data.copy()) or user_data
     return user_data