diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b13db2f0..4d67a0f39 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added * #1637 Support for Django 6.0 +* #670 Dynamic Client Registration Protocol (RFC 7591 / RFC 7592) — `DynamicClientRegistrationView` and `DynamicClientRegistrationManagementView` with configurable permission classes and registration access tokens ### Removed * #1636 Remove support for Python 3.8 and 3.9 diff --git a/docs/views/dynamic_client_registration.rst b/docs/views/dynamic_client_registration.rst new file mode 100644 index 000000000..2bed140cc --- /dev/null +++ b/docs/views/dynamic_client_registration.rst @@ -0,0 +1,158 @@ +Dynamic Client Registration +=========================== + +Django OAuth Toolkit includes support for the OAuth 2.0 Dynamic Client Registration Protocol +(`RFC 7591 `_) and the OAuth 2.0 Dynamic Client +Registration Management Protocol (`RFC 7592 `_). + +These views are automatically included in ``base_urlpatterns`` when you use +``include("oauth2_provider.urls")``. + + +Endpoints +--------- + +POST /o/register/ +~~~~~~~~~~~~~~~~~ + +Creates a new OAuth2 application (RFC 7591). Authentication is controlled by +``DCR_REGISTRATION_PERMISSION_CLASSES``. + +**Request body (JSON):** + +.. code-block:: json + + { + "redirect_uris": ["https://example.com/callback"], + "grant_types": ["authorization_code"], + "client_name": "My Application", + "token_endpoint_auth_method": "client_secret_basic" + } + +**Response (201):** + +.. code-block:: json + + { + "client_id": "abc123", + "client_secret": "...", + "redirect_uris": ["https://example.com/callback"], + "grant_types": ["authorization_code", "refresh_token"], + "token_endpoint_auth_method": "client_secret_basic", + "client_name": "My Application", + "registration_access_token": "...", + "registration_client_uri": "https://example.com/o/register/abc123/" + } + +GET/PUT/DELETE /o/register/{client_id}/ +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Read, update, or delete the client configuration (RFC 7592). Requires a +``Bearer {registration_access_token}`` header issued during registration. + +- **GET** — returns current client metadata (same format as registration response) +- **PUT** — accepts the same JSON body as POST; updates the application +- **DELETE** — deletes the application and all associated tokens; returns 204 + + +Field Mapping +------------- + ++-------------------------------------+-----------------------------------+----------------------------------+ +| RFC 7591 field | DOT Application field | Notes | ++=====================================+===================================+==================================+ +| ``redirect_uris`` (array) | ``redirect_uris`` (space-joined) | | ++-------------------------------------+-----------------------------------+----------------------------------+ +| ``client_name`` | ``name`` | | ++-------------------------------------+-----------------------------------+----------------------------------+ +| ``grant_types`` (array) | ``authorization_grant_type`` | ``refresh_token`` is ignored; | +| | | only one non-refresh grant type | +| | | is supported per application | ++-------------------------------------+-----------------------------------+----------------------------------+ +| ``token_endpoint_auth_method: none``| ``client_type = "public"`` | | ++-------------------------------------+-----------------------------------+----------------------------------+ +| ``token_endpoint_auth_method: ...`` | ``client_type = "confidential"`` | Default | ++-------------------------------------+-----------------------------------+----------------------------------+ + + +Configuration +------------- + +Add the following keys to ``OAUTH2_PROVIDER`` in your Django settings. All are optional and have +sensible defaults. + +``DCR_ENABLED`` + Set to ``True`` to activate the Dynamic Client Registration endpoints. + When ``False`` (the default), both endpoints return ``404`` even though the + URL patterns are always registered. + + Default: ``False`` + +``DCR_REGISTRATION_PERMISSION_CLASSES`` + A tuple of importable class paths whose instances are instantiated and called as + ``instance.has_permission(request) -> bool``. All classes must pass (AND logic). + + Default: ``("oauth2_provider.dcr.IsAuthenticatedDCRPermission",)`` + + Built-in classes: + + * ``oauth2_provider.dcr.IsAuthenticatedDCRPermission`` — requires Django session authentication. + * ``oauth2_provider.dcr.AllowAllDCRPermission`` — open registration; no authentication required. + +``DCR_REGISTRATION_SCOPE`` + The scope string stored on the registration ``AccessToken`` used to protect the RFC 7592 + management endpoints. + + Default: ``"oauth2_provider:registration"`` + +``DCR_REGISTRATION_TOKEN_EXPIRE_SECONDS`` + Number of seconds until the registration access token expires, or ``None`` for a + far-future expiry (year 9999, effectively non-expiring). + + Default: ``None`` + +``DCR_ROTATE_REGISTRATION_TOKEN_ON_UPDATE`` + When ``True``, a PUT request to the management endpoint revokes the current registration + access token and issues a new one, returning it in the response. + + Default: ``True`` + + +Examples +-------- + +Open registration (no auth required): + +.. code-block:: python + + OAUTH2_PROVIDER = { + "DCR_REGISTRATION_PERMISSION_CLASSES": ("oauth2_provider.dcr.AllowAllDCRPermission",), + } + +Custom permission class (e.g. initial-access token): + +.. code-block:: python + + # myapp/permissions.py + class InitialAccessTokenPermission: + def has_permission(self, request) -> bool: + token = request.META.get("HTTP_AUTHORIZATION", "").removeprefix("Bearer ").strip() + return MyInitialToken.objects.filter(token=token, active=True).exists() + + # settings.py + OAUTH2_PROVIDER = { + "DCR_REGISTRATION_PERMISSION_CLASSES": ("myapp.permissions.InitialAccessTokenPermission",), + } + +Smoke test with ``curl``: + +.. code-block:: bash + + # Register (open mode) + curl -X POST https://example.com/o/register/ \\ + -H "Content-Type: application/json" \\ + -d '{"redirect_uris":["https://app.example.com/cb"],"grant_types":["authorization_code"]}' + + # Read configuration + curl https://example.com/o/register/{client_id}/ \\ + -H "Authorization: Bearer {registration_access_token}" diff --git a/docs/views/views.rst b/docs/views/views.rst index 262f9d20a..61a2e7ffd 100644 --- a/docs/views/views.rst +++ b/docs/views/views.rst @@ -11,3 +11,4 @@ Django OAuth Toolkit provides a set of pre-defined views for different purposes: application token mixins + dynamic_client_registration diff --git a/oauth2_provider/dcr.py b/oauth2_provider/dcr.py new file mode 100644 index 000000000..6e7bdccbe --- /dev/null +++ b/oauth2_provider/dcr.py @@ -0,0 +1,20 @@ +""" +Permission classes for the Dynamic Client Registration endpoint (RFC 7591). + +Each class must implement ``has_permission(request) -> bool``. +Configure via ``OAUTH2_PROVIDER["DCR_REGISTRATION_PERMISSION_CLASSES"]``. +""" + + +class IsAuthenticatedDCRPermission: + """Allow registration only to session-authenticated users (default).""" + + def has_permission(self, request) -> bool: + return bool(request.user and request.user.is_authenticated) + + +class AllowAllDCRPermission: + """Allow registration to anyone (open registration).""" + + def has_permission(self, request) -> bool: + return True diff --git a/oauth2_provider/settings.py b/oauth2_provider/settings.py index 216f36ba8..c100553ed 100644 --- a/oauth2_provider/settings.py +++ b/oauth2_provider/settings.py @@ -123,6 +123,12 @@ "ALWAYS_RELOAD_OAUTHLIB_CORE": False, "CLEAR_EXPIRED_TOKENS_BATCH_SIZE": 10000, "CLEAR_EXPIRED_TOKENS_BATCH_INTERVAL": 0, + # Dynamic Client Registration (RFC 7591/7592) + "DCR_ENABLED": False, + "DCR_REGISTRATION_PERMISSION_CLASSES": ("oauth2_provider.dcr.IsAuthenticatedDCRPermission",), + "DCR_REGISTRATION_SCOPE": "oauth2_provider:registration", + "DCR_REGISTRATION_TOKEN_EXPIRE_SECONDS": None, # None = year 9999 (no expiry) + "DCR_ROTATE_REGISTRATION_TOKEN_ON_UPDATE": True, } # List of settings that cannot be empty @@ -154,6 +160,7 @@ "GRANT_ADMIN_CLASS", "ID_TOKEN_ADMIN_CLASS", "REFRESH_TOKEN_ADMIN_CLASS", + "DCR_REGISTRATION_PERMISSION_CLASSES", ) diff --git a/oauth2_provider/urls.py b/oauth2_provider/urls.py index ea974e045..7e1cc2065 100644 --- a/oauth2_provider/urls.py +++ b/oauth2_provider/urls.py @@ -57,5 +57,13 @@ path("logout/", views.RPInitiatedLogoutView.as_view(), name="rp-initiated-logout"), ] +dcr_urlpatterns = [ + path("register/", views.DynamicClientRegistrationView.as_view(), name="dcr-register"), + path( + "register//", + views.DynamicClientRegistrationManagementView.as_view(), + name="dcr-register-management", + ), +] -urlpatterns = base_urlpatterns + management_urlpatterns + oidc_urlpatterns +urlpatterns = base_urlpatterns + management_urlpatterns + oidc_urlpatterns + dcr_urlpatterns diff --git a/oauth2_provider/views/__init__.py b/oauth2_provider/views/__init__.py index 24022f55e..781a40fc8 100644 --- a/oauth2_provider/views/__init__.py +++ b/oauth2_provider/views/__init__.py @@ -18,3 +18,7 @@ from .oidc import ConnectDiscoveryInfoView, JwksInfoView, RPInitiatedLogoutView, UserInfoView from .token import AuthorizedTokenDeleteView, AuthorizedTokensListView from .device import DeviceAuthorizationView, DeviceUserCodeView, DeviceConfirmView, DeviceGrantStatusView +from .dynamic_client_registration import ( + DynamicClientRegistrationView, + DynamicClientRegistrationManagementView, +) diff --git a/oauth2_provider/views/dynamic_client_registration.py b/oauth2_provider/views/dynamic_client_registration.py new file mode 100644 index 000000000..5751116cc --- /dev/null +++ b/oauth2_provider/views/dynamic_client_registration.py @@ -0,0 +1,342 @@ +""" +Views implementing OAuth 2.0 Dynamic Client Registration Protocol. + +RFC 7591 — POST /register/ +RFC 7592 — GET/PUT/DELETE /register/{client_id}/ +""" + +import json +import logging +from datetime import datetime, timedelta, timezone as dt_timezone + +from django.http import JsonResponse +from django.urls import reverse +from django.utils import timezone +from django.utils.decorators import method_decorator +from django.views import View +from django.views.decorators.csrf import csrf_exempt + +from ..compat import login_not_required +from ..models import get_access_token_model, get_application_model +from ..settings import oauth2_settings + + +logger = logging.getLogger(__name__) + +# RFC 7591 grant type name → DOT AbstractApplication constant +GRANT_TYPE_MAP = { + "authorization_code": "authorization-code", + "implicit": "implicit", + "password": "password", + "client_credentials": "client-credentials", + "urn:ietf:params:oauth:grant-type:device_code": "urn:ietf:params:oauth:grant-type:device_code", +} + +# Grant types that are handled automatically by DOT alongside authorization_code +IGNORED_GRANT_TYPES = {"refresh_token"} + + +def _error_response(error, description, status=400): + return JsonResponse({"error": error, "error_description": description}, status=status) + + +def _check_permissions(request): + """Run all DCR_REGISTRATION_PERMISSION_CLASSES; return True if all pass.""" + permission_classes = oauth2_settings.DCR_REGISTRATION_PERMISSION_CLASSES + for cls in permission_classes: + instance = cls() + if not instance.has_permission(request): + return False + return True + + +def _parse_metadata(body): + """ + Parse JSON body and return (data_dict, error_response). + + Returns (None, JsonResponse) on parse failure, (dict, None) on success. + """ + try: + data = json.loads(body) + except (json.JSONDecodeError, ValueError): + return None, _error_response("invalid_client_metadata", "Request body must be valid JSON") + if not isinstance(data, dict): + return None, _error_response("invalid_client_metadata", "Request body must be a JSON object") + return data, None + + +def _resolve_grant_type(grant_types): + """ + Resolve RFC 7591 grant_types list to a single DOT grant type constant. + + Returns (dot_grant_type, error_response). + """ + if not grant_types: + return None, _error_response("invalid_client_metadata", "grant_types must not be empty") + + meaningful = [g for g in grant_types if g not in IGNORED_GRANT_TYPES] + + if not meaningful: + # Only refresh_token (or empty after filtering) is invalid + return None, _error_response( + "invalid_client_metadata", + "grant_types must contain at least one grant type other than refresh_token", + ) + + if len(meaningful) > 1: + return None, _error_response( + "invalid_client_metadata", + "DOT only supports one grant type per application; " + "multiple non-refresh_token grant types are not supported", + ) + + grant_type = meaningful[0] + dot_grant = GRANT_TYPE_MAP.get(grant_type) + if dot_grant is None: + return None, _error_response( + "invalid_client_metadata", + f"Unsupported grant_type: {grant_type!r}", + ) + return dot_grant, None + + +def _build_application_kwargs(data): + """ + Convert RFC 7591 metadata dict to Application field kwargs. + + Returns (kwargs_dict, error_response). + """ + kwargs = {} + + # redirect_uris + redirect_uris = data.get("redirect_uris", []) + if not isinstance(redirect_uris, list): + return None, _error_response("invalid_client_metadata", "redirect_uris must be an array") + kwargs["redirect_uris"] = " ".join(redirect_uris) + + # client_name + if "client_name" in data: + kwargs["name"] = data["client_name"] + + # grant_types → authorization_grant_type + grant_types = data.get("grant_types", ["authorization_code"]) + if not isinstance(grant_types, list): + return None, _error_response("invalid_client_metadata", "grant_types must be an array") + + dot_grant, err = _resolve_grant_type(grant_types) + if err: + return None, err + kwargs["authorization_grant_type"] = dot_grant + + # token_endpoint_auth_method → client_type + auth_method = data.get("token_endpoint_auth_method", "client_secret_basic") + if auth_method == "none": + kwargs["client_type"] = "public" + else: + kwargs["client_type"] = "confidential" + + return kwargs, None + + +def _issue_registration_token(application, user): + """ + Create and return a new registration AccessToken for *application*. + + Token scope is ``oauth2_settings.DCR_REGISTRATION_SCOPE``. + Expiry: far-future (year 9999) when ``DCR_REGISTRATION_TOKEN_EXPIRE_SECONDS`` is None, + otherwise ``now + DCR_REGISTRATION_TOKEN_EXPIRE_SECONDS`` seconds. + """ + from ..generators import generate_client_secret # reuse secret-quality token generator + + AccessToken = get_access_token_model() + + expire_seconds = oauth2_settings.DCR_REGISTRATION_TOKEN_EXPIRE_SECONDS + if expire_seconds is None: + expires = datetime(9999, 12, 31, 23, 59, 59, tzinfo=dt_timezone.utc) + else: + expires = timezone.now() + timedelta(seconds=expire_seconds) + + token = AccessToken.objects.create( + application=application, + user=user, + token=generate_client_secret(), + expires=expires, + scope=oauth2_settings.DCR_REGISTRATION_SCOPE, + ) + return token + + +def _application_to_response(application, registration_token, request): + """Build the RFC 7591 response dict for *application*.""" + data = { + "client_id": application.client_id, + "redirect_uris": application.redirect_uris.split() if application.redirect_uris else [], + "grant_types": _dot_grant_to_rfc_grant_types(application.authorization_grant_type), + "token_endpoint_auth_method": ( + "none" if application.client_type == "public" else "client_secret_basic" + ), + "registration_access_token": registration_token.token, + "registration_client_uri": request.build_absolute_uri( + reverse("oauth2_provider:dcr-register-management", kwargs={"client_id": application.client_id}) + ), + } + if application.name: + data["client_name"] = application.name + return data + + +def _dot_grant_to_rfc_grant_types(dot_grant): + """Return the RFC 7591 grant_types list for a DOT grant type constant.""" + reverse_map = {v: k for k, v in GRANT_TYPE_MAP.items()} + rfc_grant = reverse_map.get(dot_grant, dot_grant) + # For authorization_code, also surface refresh_token per RFC 7591 convention + result = [rfc_grant] + if dot_grant == "authorization-code": + result.append("refresh_token") + return result + + +@method_decorator(csrf_exempt, name="dispatch") +@method_decorator(login_not_required, name="dispatch") +class DynamicClientRegistrationView(View): + """ + RFC 7591 — Dynamic Client Registration endpoint. + + POST /register/ + """ + + def dispatch(self, request, *args, **kwargs): + if not oauth2_settings.DCR_ENABLED: + return JsonResponse({"error": "not_found"}, status=404) + return super().dispatch(request, *args, **kwargs) + + def post(self, request, *args, **kwargs): + # Permission check + if not _check_permissions(request): + return _error_response( + "access_denied", + "Authentication required to register a client", + status=401, + ) + + data, err = _parse_metadata(request.body) + if err: + return err + + app_kwargs, err = _build_application_kwargs(data) + if err: + return err + + Application = get_application_model() + user = request.user if request.user.is_authenticated else None + application = Application(user=user, **app_kwargs) + + # Capture the raw secret before save() hashes it + raw_secret = application.client_secret if application.client_type == "confidential" else None + + try: + application.full_clean() + except Exception as exc: + return _error_response("invalid_client_metadata", str(exc)) + + application.save() + + registration_token = _issue_registration_token(application, user) + response_data = _application_to_response(application, registration_token, request) + if raw_secret: + response_data["client_secret"] = raw_secret + + return JsonResponse(response_data, status=201) + + +@method_decorator(csrf_exempt, name="dispatch") +@method_decorator(login_not_required, name="dispatch") +class DynamicClientRegistrationManagementView(View): + """ + RFC 7592 — Client Configuration Endpoint. + + GET/PUT/DELETE /register/{client_id}/ + """ + + def dispatch(self, request, *args, **kwargs): + if not oauth2_settings.DCR_ENABLED: + return JsonResponse({"error": "not_found"}, status=404) + return super().dispatch(request, *args, **kwargs) + + def _get_application_from_registration_token(self, request, client_id): + """ + Validate Bearer token, check scope, check client_id match. + + Returns (application, registration_token) or (None, error_response). + """ + auth_header = request.META.get("HTTP_AUTHORIZATION", "") + if not auth_header.startswith("Bearer "): + return None, _error_response( + "invalid_token", + "Registration access token required", + status=401, + ) + + raw_token = auth_header[len("Bearer "):] + AccessToken = get_access_token_model() + try: + token = AccessToken.objects.get(token=raw_token) + except AccessToken.DoesNotExist: + return None, _error_response("invalid_token", "Invalid registration access token", status=401) + + if not token.is_valid([oauth2_settings.DCR_REGISTRATION_SCOPE]): + return None, _error_response("invalid_token", "Registration access token is expired or invalid", status=401) + + application = token.application + if application is None or application.client_id != client_id: + return None, _error_response("invalid_token", "Token does not match client_id", status=403) + + return application, token + + def get(self, request, client_id, *args, **kwargs): + application, result = self._get_application_from_registration_token(request, client_id) + if application is None: + return result # error response + + registration_token = result + return JsonResponse(_application_to_response(application, registration_token, request)) + + def put(self, request, client_id, *args, **kwargs): + application, result = self._get_application_from_registration_token(request, client_id) + if application is None: + return result + + registration_token = result + + data, err = _parse_metadata(request.body) + if err: + return err + + app_kwargs, err = _build_application_kwargs(data) + if err: + return err + + for field, value in app_kwargs.items(): + setattr(application, field, value) + + try: + application.full_clean() + except Exception as exc: + return _error_response("invalid_client_metadata", str(exc)) + + application.save() + + if oauth2_settings.DCR_ROTATE_REGISTRATION_TOKEN_ON_UPDATE: + registration_token.delete() + user = application.user + registration_token = _issue_registration_token(application, user) + + return JsonResponse(_application_to_response(application, registration_token, request)) + + def delete(self, request, client_id, *args, **kwargs): + application, result = self._get_application_from_registration_token(request, client_id) + if application is None: + return result + + application.delete() + return JsonResponse({}, status=204) diff --git a/tests/presets.py b/tests/presets.py index 4538c64eb..9d7a4ea07 100644 --- a/tests/presets.py +++ b/tests/presets.py @@ -65,3 +65,16 @@ ALLOWED_SCHEMES_HTTP = { "ALLOWED_SCHEMES": ["https", "http"], } + +DCR_SETTINGS = { + "SCOPES": { + "read": "Read scope", + "write": "Write scope", + }, + "READ_SCOPE": "read", + "WRITE_SCOPE": "write", + "DCR_ENABLED": True, + "DCR_REGISTRATION_PERMISSION_CLASSES": ("oauth2_provider.dcr.IsAuthenticatedDCRPermission",), + "DCR_REGISTRATION_SCOPE": "oauth2_provider:registration", + "ALLOWED_REDIRECT_URI_SCHEMES": ["https", "http"], +} diff --git a/tests/test_dcr_views.py b/tests/test_dcr_views.py new file mode 100644 index 000000000..4b6a85a2e --- /dev/null +++ b/tests/test_dcr_views.py @@ -0,0 +1,515 @@ +""" +Tests for Dynamic Client Registration views (RFC 7591 / RFC 7592). +""" + +import json + +import pytest +from django.contrib.auth import get_user_model +from django.urls import reverse + +from oauth2_provider.models import get_access_token_model, get_application_model +from oauth2_provider.settings import oauth2_settings + +from . import presets +from .common_testing import OAuth2ProviderTestCase as TestCase + + +UserModel = get_user_model() +Application = get_application_model() +AccessToken = get_access_token_model() + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _register_url(): + return reverse("oauth2_provider:dcr-register") + + +def _post_register(client, data, **kwargs): + return client.post( + _register_url(), + data=json.dumps(data), + content_type="application/json", + **kwargs, + ) + + +def _management_url(client_id): + return reverse("oauth2_provider:dcr-register-management", kwargs={"client_id": client_id}) + + +def _bearer(token): + return {"HTTP_AUTHORIZATION": f"Bearer {token}"} + + +# --------------------------------------------------------------------------- +# RFC 7591 — Registration endpoint tests +# --------------------------------------------------------------------------- + + +@pytest.mark.usefixtures("oauth2_settings") +@pytest.mark.oauth2_settings(presets.DCR_SETTINGS) +class TestDynamicClientRegistration(TestCase): + def setUp(self): + self.user = UserModel.objects.create_user("dcr_user", "dcr@example.com", "pass") + + # -- success cases ------------------------------------------------------- + + def test_register_minimal_authenticated(self): + """POST with minimal valid metadata by an authenticated user → 201.""" + self.client.force_login(self.user) + data = { + "redirect_uris": ["https://example.com/cb"], + "grant_types": ["authorization_code"], + } + response = _post_register(self.client, data) + assert response.status_code == 201 + body = response.json() + assert "client_id" in body + assert "registration_access_token" in body + assert "registration_client_uri" in body + assert body["grant_types"] == ["authorization_code", "refresh_token"] + assert Application.objects.filter(client_id=body["client_id"]).exists() + + def test_register_with_client_name(self): + """client_name is mapped to Application.name.""" + self.client.force_login(self.user) + data = { + "redirect_uris": ["https://example.com/cb"], + "grant_types": ["authorization_code"], + "client_name": "My Test App", + } + response = _post_register(self.client, data) + assert response.status_code == 201 + body = response.json() + assert body["client_name"] == "My Test App" + app = Application.objects.get(client_id=body["client_id"]) + assert app.name == "My Test App" + + def test_register_public_client(self): + """token_endpoint_auth_method=none → client_type=public.""" + self.client.force_login(self.user) + data = { + "redirect_uris": ["https://example.com/cb"], + "grant_types": ["authorization_code"], + "token_endpoint_auth_method": "none", + } + response = _post_register(self.client, data) + assert response.status_code == 201 + body = response.json() + assert body["token_endpoint_auth_method"] == "none" + app = Application.objects.get(client_id=body["client_id"]) + assert app.client_type == Application.CLIENT_PUBLIC + + def test_register_confidential_client(self): + """token_endpoint_auth_method=client_secret_basic → client_type=confidential.""" + self.client.force_login(self.user) + data = { + "redirect_uris": ["https://example.com/cb"], + "grant_types": ["authorization_code"], + "token_endpoint_auth_method": "client_secret_basic", + } + response = _post_register(self.client, data) + assert response.status_code == 201 + body = response.json() + assert body["token_endpoint_auth_method"] == "client_secret_basic" + assert "client_secret" in body + app = Application.objects.get(client_id=body["client_id"]) + assert app.client_type == Application.CLIENT_CONFIDENTIAL + + def test_register_authorization_code_with_refresh_token(self): + """[authorization_code, refresh_token] → maps cleanly, refresh_token ignored.""" + self.client.force_login(self.user) + data = { + "redirect_uris": ["https://example.com/cb"], + "grant_types": ["authorization_code", "refresh_token"], + } + response = _post_register(self.client, data) + assert response.status_code == 201 + body = response.json() + app = Application.objects.get(client_id=body["client_id"]) + assert app.authorization_grant_type == Application.GRANT_AUTHORIZATION_CODE + + def test_register_client_credentials(self): + """client_credentials grant type.""" + self.client.force_login(self.user) + data = { + "grant_types": ["client_credentials"], + } + response = _post_register(self.client, data) + assert response.status_code == 201 + body = response.json() + app = Application.objects.get(client_id=body["client_id"]) + assert app.authorization_grant_type == Application.GRANT_CLIENT_CREDENTIALS + + def test_response_includes_registration_token_and_uri(self): + """Registration response includes registration_access_token and registration_client_uri.""" + self.client.force_login(self.user) + data = {"redirect_uris": ["https://example.com/cb"], "grant_types": ["authorization_code"]} + response = _post_register(self.client, data) + assert response.status_code == 201 + body = response.json() + assert body["registration_access_token"] + assert body["registration_client_uri"].endswith(f"/o/register/{body['client_id']}/") + + # -- auth failures ------------------------------------------------------- + + def test_register_unauthenticated_is_401(self): + """Unauthenticated POST when IsAuthenticatedDCRPermission is active → 401.""" + data = {"redirect_uris": ["https://example.com/cb"], "grant_types": ["authorization_code"]} + response = _post_register(self.client, data) + assert response.status_code == 401 + assert response.json()["error"] == "access_denied" + + # -- validation failures ------------------------------------------------- + + def test_register_multiple_grant_types_is_400(self): + """Multiple non-refresh_token grant types → 400.""" + self.client.force_login(self.user) + data = { + "redirect_uris": ["https://example.com/cb"], + "grant_types": ["authorization_code", "implicit"], + } + response = _post_register(self.client, data) + assert response.status_code == 400 + body = response.json() + assert body["error"] == "invalid_client_metadata" + + def test_register_only_refresh_token_is_400(self): + """grant_types=[refresh_token] only → 400.""" + self.client.force_login(self.user) + data = { + "redirect_uris": ["https://example.com/cb"], + "grant_types": ["refresh_token"], + } + response = _post_register(self.client, data) + assert response.status_code == 400 + assert response.json()["error"] == "invalid_client_metadata" + + def test_register_invalid_redirect_uri_is_400(self): + """Invalid redirect_uri → 400.""" + self.client.force_login(self.user) + data = { + "redirect_uris": ["not-a-valid-uri!"], + "grant_types": ["authorization_code"], + } + response = _post_register(self.client, data) + assert response.status_code == 400 + assert response.json()["error"] == "invalid_client_metadata" + + def test_register_invalid_json_is_400(self): + """Non-JSON body → 400.""" + self.client.force_login(self.user) + response = self.client.post( + _register_url(), data="not-json", content_type="application/json" + ) + assert response.status_code == 400 + assert response.json()["error"] == "invalid_client_metadata" + + +# --------------------------------------------------------------------------- +# Open registration (AllowAllDCRPermission) +# --------------------------------------------------------------------------- + + +@pytest.mark.usefixtures("oauth2_settings") +@pytest.mark.oauth2_settings( + { + **presets.DCR_SETTINGS, + "DCR_REGISTRATION_PERMISSION_CLASSES": ("oauth2_provider.dcr.AllowAllDCRPermission",), + } +) +class TestOpenRegistration(TestCase): + def test_register_without_auth_succeeds(self): + """AllowAllDCRPermission → unauthenticated POST → 201.""" + data = {"redirect_uris": ["https://example.com/cb"], "grant_types": ["authorization_code"]} + response = _post_register(self.client, data) + assert response.status_code == 201 + body = response.json() + assert "client_id" in body + # user should be None on the application + app = Application.objects.get(client_id=body["client_id"]) + assert app.user is None + + +# --------------------------------------------------------------------------- +# RFC 7592 — Management endpoint tests +# --------------------------------------------------------------------------- + + +@pytest.mark.usefixtures("oauth2_settings") +@pytest.mark.oauth2_settings(presets.DCR_SETTINGS) +class TestDynamicClientRegistrationManagement(TestCase): + def setUp(self): + self.user = UserModel.objects.create_user("mgmt_user", "mgmt@example.com", "pass") + self.client.force_login(self.user) + # Register a client to use in management tests + data = { + "redirect_uris": ["https://example.com/cb"], + "grant_types": ["authorization_code"], + "client_name": "Managed App", + } + response = _post_register(self.client, data) + assert response.status_code == 201 + body = response.json() + self.client_id = body["client_id"] + self.registration_token = body["registration_access_token"] + self.management_url = _management_url(self.client_id) + self.client.logout() + + # -- GET ----------------------------------------------------------------- + + def test_get_returns_current_config(self): + """GET with valid token → 200 with current config.""" + response = self.client.get(self.management_url, **_bearer(self.registration_token)) + assert response.status_code == 200 + body = response.json() + assert body["client_id"] == self.client_id + assert body["client_name"] == "Managed App" + assert "https://example.com/cb" in body["redirect_uris"] + + def test_get_wrong_token_is_401(self): + """GET without token → 401.""" + response = self.client.get(self.management_url) + assert response.status_code == 401 + + def test_get_token_wrong_client_is_403(self): + """GET with token for a different client → 403.""" + # Create a second application with its own token + self.client.force_login(self.user) + data2 = {"redirect_uris": ["https://other.com/cb"], "grant_types": ["authorization_code"]} + r2 = _post_register(self.client, data2) + other_token = r2.json()["registration_access_token"] + self.client.logout() + + response = self.client.get(self.management_url, **_bearer(other_token)) + assert response.status_code == 403 + + # -- PUT ----------------------------------------------------------------- + + def test_put_updates_application(self): + """PUT → updates Application fields.""" + update_data = { + "redirect_uris": ["https://updated.example.com/cb"], + "grant_types": ["authorization_code"], + "client_name": "Updated App", + } + response = self.client.put( + self.management_url, + data=json.dumps(update_data), + content_type="application/json", + **_bearer(self.registration_token), + ) + assert response.status_code == 200 + body = response.json() + assert body["client_name"] == "Updated App" + assert "https://updated.example.com/cb" in body["redirect_uris"] + app = Application.objects.get(client_id=self.client_id) + assert app.name == "Updated App" + + def test_put_rotates_token_by_default(self): + """PUT with DCR_ROTATE_REGISTRATION_TOKEN_ON_UPDATE=True → new token issued.""" + update_data = { + "redirect_uris": ["https://example.com/cb"], + "grant_types": ["authorization_code"], + } + response = self.client.put( + self.management_url, + data=json.dumps(update_data), + content_type="application/json", + **_bearer(self.registration_token), + ) + assert response.status_code == 200 + body = response.json() + new_token = body["registration_access_token"] + assert new_token != self.registration_token + # Old token should be gone + assert not AccessToken.objects.filter(token=self.registration_token).exists() + # New token should exist + assert AccessToken.objects.filter(token=new_token).exists() + + def test_put_no_rotate_keeps_token(self): + """PUT with DCR_ROTATE_REGISTRATION_TOKEN_ON_UPDATE=False → same token.""" + self.oauth2_settings.DCR_ROTATE_REGISTRATION_TOKEN_ON_UPDATE = False + update_data = { + "redirect_uris": ["https://example.com/cb"], + "grant_types": ["authorization_code"], + } + response = self.client.put( + self.management_url, + data=json.dumps(update_data), + content_type="application/json", + **_bearer(self.registration_token), + ) + assert response.status_code == 200 + body = response.json() + assert body["registration_access_token"] == self.registration_token + + # -- DELETE -------------------------------------------------------------- + + def test_delete_removes_application(self): + """DELETE → 204, application deleted.""" + response = self.client.delete(self.management_url, **_bearer(self.registration_token)) + assert response.status_code == 204 + assert not Application.objects.filter(client_id=self.client_id).exists() + # Registration token should also be gone (cascade) + assert not AccessToken.objects.filter(token=self.registration_token).exists() + + +# --------------------------------------------------------------------------- +# Settings coverage +# --------------------------------------------------------------------------- + + +@pytest.mark.usefixtures("oauth2_settings") +@pytest.mark.oauth2_settings( + { + **presets.DCR_SETTINGS, + "DCR_REGISTRATION_PERMISSION_CLASSES": ("oauth2_provider.dcr.AllowAllDCRPermission",), + "DCR_REGISTRATION_SCOPE": "my:custom:scope", + } +) +class TestDCRCustomScope(TestCase): + def test_custom_scope_on_registration_token(self): + """DCR_REGISTRATION_SCOPE custom value → management token uses custom scope.""" + data = {"redirect_uris": ["https://example.com/cb"], "grant_types": ["authorization_code"]} + response = _post_register(self.client, data) + assert response.status_code == 201 + body = response.json() + token = AccessToken.objects.get(token=body["registration_access_token"]) + assert token.scope == "my:custom:scope" + + +@pytest.mark.usefixtures("oauth2_settings") +@pytest.mark.oauth2_settings( + { + **presets.DCR_SETTINGS, + "DCR_REGISTRATION_PERMISSION_CLASSES": ("oauth2_provider.dcr.AllowAllDCRPermission",), + "DCR_REGISTRATION_TOKEN_EXPIRE_SECONDS": 3600, + } +) +class TestDCRTokenExpiry(TestCase): + def test_token_expires_after_set_seconds(self): + """DCR_REGISTRATION_TOKEN_EXPIRE_SECONDS=3600 → token expires ~1 hour from now.""" + from django.utils import timezone + + data = {"redirect_uris": ["https://example.com/cb"], "grant_types": ["authorization_code"]} + response = _post_register(self.client, data) + assert response.status_code == 201 + body = response.json() + token = AccessToken.objects.get(token=body["registration_access_token"]) + delta = (token.expires - timezone.now()).total_seconds() + # Should be close to 3600 seconds (within 30s tolerance) + assert 3570 <= delta <= 3630 + + def test_token_no_expire_is_far_future(self): + """DCR_REGISTRATION_TOKEN_EXPIRE_SECONDS=None → expiry is year 9999.""" + # Use the default DCR_SETTINGS (None expiry) + self.oauth2_settings.DCR_REGISTRATION_TOKEN_EXPIRE_SECONDS = None + data = {"redirect_uris": ["https://example.com/cb"], "grant_types": ["authorization_code"]} + response = _post_register(self.client, data) + assert response.status_code == 201 + body = response.json() + token = AccessToken.objects.get(token=body["registration_access_token"]) + assert token.expires.year == 9999 + + +@pytest.mark.usefixtures("oauth2_settings") +@pytest.mark.oauth2_settings(presets.DCR_SETTINGS) +class TestDCRCustomPermissionClass(TestCase): + def test_custom_permission_class_applied(self): + """DCR_REGISTRATION_PERMISSION_CLASSES with always-deny class → 401.""" + from unittest.mock import patch + + with patch( + "oauth2_provider.views.dynamic_client_registration._check_permissions", + return_value=False, + ): + data = { + "redirect_uris": ["https://example.com/cb"], + "grant_types": ["authorization_code"], + } + response = _post_register(self.client, data) + assert response.status_code == 401 + + +# --------------------------------------------------------------------------- +# DCR_ENABLED=False — endpoints return 404 +# --------------------------------------------------------------------------- + + +@pytest.mark.usefixtures("oauth2_settings") +@pytest.mark.oauth2_settings({**presets.DCR_SETTINGS, "DCR_ENABLED": False}) +class TestDCRDisabled(TestCase): + def test_register_returns_404_when_disabled(self): + response = self.client.post( + _register_url(), + data=json.dumps({"redirect_uris": ["https://example.com/cb"], "grant_types": ["authorization_code"]}), + content_type="application/json", + ) + assert response.status_code == 404 + + def test_management_returns_404_when_disabled(self): + response = self.client.get(_management_url("any-client-id")) + assert response.status_code == 404 + + +# --------------------------------------------------------------------------- +# Full roundtrip test +# --------------------------------------------------------------------------- + + +@pytest.mark.usefixtures("oauth2_settings") +@pytest.mark.oauth2_settings( + { + **presets.DCR_SETTINGS, + "DCR_REGISTRATION_PERMISSION_CLASSES": ("oauth2_provider.dcr.AllowAllDCRPermission",), + "DCR_ROTATE_REGISTRATION_TOKEN_ON_UPDATE": True, + } +) +class TestDCRFullRoundtrip(TestCase): + def test_register_get_put_delete(self): + """Full roundtrip: register → GET → PUT → DELETE.""" + # 1. Register + reg_data = { + "redirect_uris": ["https://example.com/cb"], + "grant_types": ["authorization_code"], + "client_name": "Roundtrip App", + } + reg_response = _post_register(self.client, reg_data) + assert reg_response.status_code == 201 + reg_body = reg_response.json() + client_id = reg_body["client_id"] + token = reg_body["registration_access_token"] + mgmt_url = _management_url(client_id) + + # 2. GET + get_response = self.client.get(mgmt_url, **_bearer(token)) + assert get_response.status_code == 200 + assert get_response.json()["client_name"] == "Roundtrip App" + + # 3. PUT + put_data = { + "redirect_uris": ["https://updated.example.com/cb"], + "grant_types": ["authorization_code"], + "client_name": "Updated Roundtrip App", + } + put_response = self.client.put( + mgmt_url, + data=json.dumps(put_data), + content_type="application/json", + **_bearer(token), + ) + assert put_response.status_code == 200 + put_body = put_response.json() + new_token = put_body["registration_access_token"] + assert new_token != token # token was rotated + assert put_body["client_name"] == "Updated Roundtrip App" + + # 4. DELETE (use new token) + delete_response = self.client.delete(mgmt_url, **_bearer(new_token)) + assert delete_response.status_code == 204 + assert not Application.objects.filter(client_id=client_id).exists()