Skip to content

Commit

Permalink
Added initial mitol-django-scim app
Browse files Browse the repository at this point in the history
  • Loading branch information
rhysyngsun committed Feb 26, 2025
1 parent fbdbe19 commit 2f6ae25
Show file tree
Hide file tree
Showing 34 changed files with 1,832 additions and 4 deletions.
8 changes: 4 additions & 4 deletions .secrets.baseline
Original file line number Diff line number Diff line change
Expand Up @@ -292,21 +292,21 @@
"filename": "testapp/main/settings/shared.py",
"hashed_secret": "8f2581750096043a1c68bedea8cfa6e13ad1a2e4",
"is_verified": false,
"line_number": 40
"line_number": 41
},
{
"type": "Basic Auth Credentials",
"filename": "testapp/main/settings/shared.py",
"hashed_secret": "afc848c316af1a89d49826c5ae9d00ed769415f3",
"is_verified": false,
"line_number": 120
"line_number": 123
},
{
"type": "Secret Keyword",
"filename": "testapp/main/settings/shared.py",
"hashed_secret": "9bc34549d565d9505b287de0cd20ac77be1d3f2c",
"is_verified": false,
"line_number": 200
"line_number": 203
}
],
"testapp/main/settings/test.py": [
Expand All @@ -319,5 +319,5 @@
}
]
},
"generated_at": "2025-02-26T21:06:50Z"
"generated_at": "2025-02-26T18:42:24Z"
}
9 changes: 9 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dependencies = [
"mitol-django-payment_gateway",
"mitol-django-uvtestapp",
"mitol-django-transcoding",
"mitol-django-scim",
]
readme = "README.md"
requires-python = ">= 3.10"
Expand All @@ -33,11 +34,13 @@ build-backend = "hatchling.build"
managed = true
dev-dependencies = [
"GitPython",
"anys>=0.3.1",
"bumpver",
"click",
"click-log",
"cloup",
"coverage<=7.6.1",
"deepmerge>=2.0",
"dj-database-url",
"django-stubs[compatible-mypy]",
"factory-boy~=3.2",
Expand Down Expand Up @@ -85,6 +88,7 @@ mitol-django-openedx = { workspace = true }
mitol-django-payment_gateway = { workspace = true }
mitol-django-uvtestapp = { workspace = true }
mitol-django-transcoding = { workspace = true }
mitol-django-scim = { workspace = true }

[tool.hatch.build.targets.sdist]
include = ["CHANGELOG.md", "README.md", "py.typed", "**/*.py"]
Expand Down Expand Up @@ -216,6 +220,11 @@ convention = "pep257"
[tool.ruff.lint.flake8-quotes]
inline-quotes = "double"


[tool.ruff.lint.flake8-tidy-imports.banned-api]
"django.contrib.auth.models.User".msg = "use get_user_model() or settings.AUTH_USER_MODEL"


[tool.ruff.lint.per-file-ignores]
"tests/**" = ["S101"]
"test_*.py" = ["S101"]
Expand Down
7 changes: 7 additions & 0 deletions src/scim/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Changelog
All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project uses date-based versioning.

<!-- scriv-insert-here -->
36 changes: 36 additions & 0 deletions src/scim/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
## SCIM

## Prerequisites

- You need the following a local [Keycloak](https://www.keycloak.org/) instance running. Note which major version you are running (should be at least 26.x).
- You should have custom user profile fields setup on your `olapps` realm:
- `fullName`: required, otherwise defaults
- `emailOptIn`: defaults

## Install the scim-for-keycloak plugin

Sign up for an account on https://scim-for-keycloak.de and follow the instructions here: https://scim-for-keycloak.de/documentation/installation/install

## Configure SCIM

In the SCIM admin console, do the following:

### Configure Remote SCIM Provider

- In django-admin, go to OAuth Toolkit and create a new access token
- Go to Remote SCIM Provider
- Click the `+` button
- Specify a base URL for your learn API backend: `http://<IP_OR_HOSTNAME>:8063/scim/v2/`
- At the bottom of the page, click "Use default configuration"
- Add a new authentication method:
- Type: Long Life Bearer Token
- Bearer Token: the access token you created above
- On the Schemas tab, edit the User schema and add these custom attributes:
- Add a `fullName` attribute and set the Custom Attribute Name to `fullName`
- Add an attribute named `emailOptIn` with the following settings:
- Type: integer
- Custom Attribute Name: `emailOptIn`
- On the Realm Assignments tab, assign to the `olapps` realm
- Go to the Synchronization tab and perform one:
- Identifier attribute: email
- Synchronization strategy: Search and Bulk
5 changes: 5 additions & 0 deletions src/scim/changelog.d/scriv.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[scriv]
format = md
md_header_level = 2
entry_title_template = file: ../../scripts/scriv/entry_title.${config:format}.j2
version = literal: __init__.py: __version__
1 change: 1 addition & 0 deletions src/scim/mitol/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__import__("pkg_resources").declare_namespace(__name__)
6 changes: 6 additions & 0 deletions src/scim/mitol/scim/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""mitol.scim"""

default_app_config = "mitol.scim.apps.ScimApp"

__version__ = "0.0.0"
__distributionname__ = "mitol-django-scim"
259 changes: 259 additions & 0 deletions src/scim/mitol/scim/adapters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
import json
import logging
from typing import Optional, Union

from django.contrib.auth import get_user_model
from django.db import transaction
from django_scim import constants
from django_scim.adapters import SCIMUser
from scim2_filter_parser.attr_paths import AttrPath

User = get_user_model()


logger = logging.getLogger(__name__)


def get_user_model_for_scim():
"""
Get function for the django_scim library configuration (USER_MODEL_GETTER).
Returns:
model: User model.
"""
return User


class LearnSCIMUser(SCIMUser):
"""
Custom adapter to extend django_scim library. This is required in order
to extend the profiles.models.Profile model to work with the
django_scim library.
"""

password_changed = False
activity_changed = False

resource_type = "User"

id_field = "scim_id"

ATTR_MAP = {
("active", None, None): "is_active",
("name", "givenName", None): "first_name",
("name", "familyName", None): "last_name",
("userName", None, None): "username",
}

IGNORED_PATHS = {
("schemas", None, None),
}

@property
def is_new_user(self):
"""_summary_
Returns:
bool: True is the user does not currently exist,
False if the user already exists.
"""
return not bool(self.obj.id)

@property
def id(self):
"""
Return the SCIM id
"""
return self.obj.scim_id

@property
def emails(self):
"""
Return the email of the user per the SCIM spec.
"""
return [{"value": self.obj.email, "primary": True}]

@property
def display_name(self):
"""
Return the displayName of the user per the SCIM spec.
"""
return self.obj.profile.name

@property
def meta(self):
"""
Return the meta object of the user per the SCIM spec.
"""
return {
"resourceType": self.resource_type,
"created": self.obj.created_on.isoformat(timespec="milliseconds"),
"lastModified": self.obj.updated_on.isoformat(timespec="milliseconds"),
"location": self.location,
}

def to_dict(self):
"""
Return a ``dict`` conforming to the SCIM User Schema,
ready for conversion to a JSON object.
"""
return {
"id": self.id,
"externalId": self.obj.scim_external_id,
"schemas": [constants.SchemaURI.USER],
"userName": self.obj.username,
"name": {
"givenName": self.obj.first_name,
"familyName": self.obj.last_name,
},
"displayName": self.display_name,
"emails": self.emails,
"active": self.obj.is_active,
"groups": [],
"meta": self.meta,
}

def from_dict(self, d):
"""
Consume a ``dict`` conforming to the SCIM User Schema, updating the
internal user object with data from the ``dict``.
Please note, the user object is not saved within this method. To
persist the changes made by this method, please call ``.save()`` on the
adapter. Eg::
scim_user.from_dict(d)
scim_user.save()
"""
self.parse_emails(d.get("emails"))

self.obj.is_active = d.get("active", True)
self.obj.username = d.get("userName")
self.obj.first_name = d.get("name", {}).get("givenName", "")
self.obj.last_name = d.get("name", {}).get("familyName", "")
self.obj.scim_username = d.get("userName")
self.obj.scim_external_id = d.get("externalId")

def _save(self):
self.obj.save()

def _save_related(self):
pass

def save(self):
"""
Save instances of the Profile and User models.
"""
with transaction.atomic():
self._save()
self._save_related()
logger.info("User saved. User id %i", self.obj.id)

def delete(self):
"""
Update User's is_active to False.
"""
self.obj.is_active = False
self.obj.save()
logger.info("Deactivated user id %i", self.obj.id)

def handle_add(
self,
path: Optional[AttrPath],
value: Union[str, list, dict],
operation: dict, # noqa: ARG002
):
"""
Handle add operations per:
https://tools.ietf.org/html/rfc7644#section-3.5.2.1
Args:
path (AttrPath)
value (Union[str, list, dict])
"""
if path is None:
return

if path.first_path == ("externalId", None, None):
self.obj.scim_external_id = value
self.obj.save()

def parse_scim_for_keycloak_payload(self, payload: str) -> dict:
"""
Parse the payload sent from scim-for-keycloak and normalize it
"""
result = {}

for key, value in json.loads(payload).items():
if key == "schema":
continue

if isinstance(value, dict):
for nested_key, nested_value in value.items():
result[self.split_path(f"{key}.{nested_key}")] = nested_value
else:
result[key] = value

return result

def parse_path_and_values(
self, path: Optional[str], value: Union[str, list, dict]
) -> list:
"""Parse the incoming value(s)"""
if isinstance(value, str):
# scim-for-keycloak sends this as a noncompliant JSON-encoded string
if path is None:
val = json.loads(value)
else:
msg = "Called with a non-null path and a str value"
raise ValueError(msg)
else:
val = value

results = []

for attr_path, attr_value in val.items():
if isinstance(attr_value, dict):
# nested object, we want to recursively flatten it to `first.second`
results.extend(self.parse_path_and_values(attr_path, attr_value))
else:
flattened_path = (
f"{path}.{attr_path}" if path is not None else attr_path
)
new_path = self.split_path(flattened_path)
new_value = attr_value
results.append((new_path, new_value))

return results

def handle_replace(
self,
path: Optional[AttrPath],
value: Union[str, list, dict],
operation: dict, # noqa: ARG002
):
"""
Handle the replace operations.
All operations happen within an atomic transaction.
"""

if not isinstance(value, dict):
# Restructure for use in loop below.
value = {path: value}

for nested_path, nested_value in (value or {}).items():
if nested_path.first_path in self.ATTR_MAP:
setattr(self.obj, self.ATTR_MAP[nested_path.first_path], nested_value)
elif nested_path.first_path == ("fullName", None, None):
self.obj.profile.name = nested_value
elif nested_path.first_path == ("emailOptIn", None, None):
self.obj.profile.email_optin = nested_value == 1
elif nested_path.first_path == ("emails", None, None):
self.parse_emails(nested_value)
elif nested_path.first_path not in self.IGNORED_PATHS:
logger.debug(
"Ignoring SCIM update for path: %s", nested_path.first_path
)

self.save()
Loading

0 comments on commit 2f6ae25

Please sign in to comment.