BAC-9129: create icobridge compatible policies + docs
Merge in BAC/icometrix-sdk from feature/align-ib-anonymization to master

Squashed commit of the following:

commit 330c8e961c5aff4256e3f839937491667d1e780a
Author: Jeroen Pinxten <jeroen.pinxten@icometrix.com>
Date:   Fri Apr 12 11:02:41 2024 +0200

    BAC-9129: typos

commit 8377b100be7f34b83a82c192264a3146597ad226
Author: Jeroen Pinxten <jeroen.pinxten@icometrix.com>
Date:   Fri Apr 12 09:47:58 2024 +0200

    BAC-9129: create icobridge compatible policies + docs
jpinxten committed Apr 12, 2024
1 parent b2c915e commit d82e66c
Showing 21 changed files with 399 additions and 130 deletions.
72 changes: 72 additions & 0 deletions docs/developer_guide/anonymization.rst
@@ -0,0 +1,72 @@
Anonymization
=============

The :class:`~icometrix_sdk.anonymizer.anonymizer.Anonymizer` provides functionality for anonymizing DICOM datasets
according to specified policies.


Policies
--------

A Policy is a dictionary with the key being a DICOM tag and the value a :class:`~icometrix_sdk.anonymizer.models.TagPolicy`.
This defines how individual tags or groups should be anonymized.

.. code-block:: python

    from pydicom import DataElement

    from icometrix_sdk.anonymizer.models import TagPolicy, Policy
    from icometrix_sdk.anonymizer.hash_factory import SHA3


    def replace_vl(el: DataElement, _):
        el.value = "Jane^Doe"


    def sha3_hash(element: DataElement, _):
        element.value = SHA3(size=512).calculate_hash(element.value)[:64]


    # A policy defining how tags should be anonymized
    policy: Policy = {
        0x00080020: TagPolicy("keep", "StudyDate"),  # Don't change the StudyDate
        0x0020000d: TagPolicy("hash", "StudyInstanceUID"),  # Hash the StudyInstanceUID with the default hash function
        0x00100010: TagPolicy("replace", "PatientName", replace_fn=replace_vl),  # Replace the PatientName with "Jane^Doe"
        0x00100020: TagPolicy("replace", "PatientID", replace_fn=sha3_hash),  # Replace the PatientID with a SHA3 hash of the PatientID
        0x00100030: TagPolicy("round", "PatientBirthDate"),  # Round the PatientBirthDate to YYYY0101
    }

    # A policy defining how groups should be anonymized
    group_policy: Policy = {
        0x0018: TagPolicy("keep", "Acquisition: image acquisition device and imaging procedure"),
        0x5200: TagPolicy("keep", "Multi-frame Functional Groups"),
    }

Anonymizer
----------

The :class:`~icometrix_sdk.anonymizer.anonymizer.Anonymizer` requires 3 parameters:

- policy: a Policy for DICOM tags
- group_policy: a Policy for DICOM groups
- hash_algo: the hash algorithm used for tags whose :attr:`~icometrix_sdk.anonymizer.models.TagPolicy.action` is ``hash``


.. code-block:: python

    import pydicom
    from pydicom.data import get_testdata_file

    from icometrix_sdk.anonymizer.anonymizer import Anonymizer
    from icometrix_sdk.anonymizer.hash_factory import HashFactory

    hash_algo = HashFactory.create_hash_method("md5")

    # You can use the policy examples above, make your own or import one:
    # from icometrix_sdk.anonymizer.policy import policy, group_policy
    anonymizer = Anonymizer(policy, group_policy, hash_algo)

    dataset = pydicom.dcmread(get_testdata_file("MR_small.dcm"))
    anonymizer.anonymize(dataset).save_as("anonymized_MR_small.dcm")

Settings
--------
Some default behaviour can be overridden by setting environment variables:

- ROOT_UID: The root UID used when hashing DICOM tags with the VR UI (https://dicom.nema.org/dicom/2013/output/chtml/part05/chapter_B.html)
- VALIDATION_MODE: an int defining how validation should be done (0: ignore, 1: warn, 2: raise)
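
As a rough illustration of the two settings above, the sketch below parses them from a mapping. The helper name `load_settings` and the range check are assumptions made for this example; the defaults are copied from the `config.py` added in this commit, where `RAISE` is pydicom's validation constant with value 2.

```python
import os

# Defaults as they appear in icometrix_sdk/anonymizer/config.py in this commit.
DEFAULT_ROOT_UID = "1.2.826.0.1.3680043.9.5542"
DEFAULT_VALIDATION_MODE = 2  # same value as pydicom.config.RAISE


def load_settings(env=None):
    """Hypothetical helper: read ROOT_UID and VALIDATION_MODE from a mapping."""
    env = os.environ if env is None else env
    root_uid = env.get("ROOT_UID", DEFAULT_ROOT_UID)
    mode = int(env.get("VALIDATION_MODE", DEFAULT_VALIDATION_MODE))
    # The range check is an addition of this sketch, not something the SDK is known to do.
    if mode not in (0, 1, 2):
        raise ValueError(f"VALIDATION_MODE must be 0, 1 or 2, got {mode}")
    return root_uid, mode


# Example: warn instead of raising on invalid values, keep the default root UID.
root_uid, mode = load_settings({"VALIDATION_MODE": "1"})
```

Because `config.py` calls `os.getenv` at module level, the variables presumably need to be set before the anonymizer package is first imported.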
3 changes: 2 additions & 1 deletion docs/developer_guide/index.rst
@@ -5,4 +5,5 @@ Developer Guide

paginators
session
models
models
anonymization
26 changes: 26 additions & 0 deletions docs/models/anonymizer.rst
@@ -0,0 +1,26 @@
Anonymizer
==========

.. automodule:: icometrix_sdk.anonymizer.anonymizer
   :members:
   :undoc-members:

.. automodule:: icometrix_sdk.anonymizer.hash_factory
   :members:
   :undoc-members:

.. automodule:: icometrix_sdk.anonymizer.models
   :members:
   :undoc-members:

.. automodule:: icometrix_sdk.anonymizer.policy
   :members:
   :undoc-members:

.. automodule:: icometrix_sdk.anonymizer.utils
   :members:
   :undoc-members:

.. automodule:: icometrix_sdk.anonymizer.exceptions
   :members:
   :undoc-members:
1 change: 1 addition & 0 deletions docs/models/index.rst
@@ -9,4 +9,5 @@ Models
upload
customer_result
customer_report
anonymizer
base
9 changes: 6 additions & 3 deletions examples/anonymize.py
@@ -1,11 +1,14 @@
from pathlib import Path

import pydicom
from pydicom.config import WARN
from pydicom.data import get_testdata_files

from icometrix_sdk.anonymizer.anonymizer import Anonymizer
from icometrix_sdk.anonymizer.hash_factory import HashFactory
from icometrix_sdk.anonymizer.policy import policy, group_policy
from icometrix_sdk.anonymizer.policy import policy_sha, group_policy

# set env VALIDATION_MODE to 1 or 0, there are some invalid DICOMs in this set

# These files are included in the pydicom test dataset to test failed dcmread
INVALID_FILES = [
@@ -22,8 +25,8 @@ def get_dicom_test_files():
    return [x for x in all_files if Path(x).name not in INVALID_FILES]


hash_algo = HashFactory.create_hash_method("short_md5")
anon = Anonymizer(policy, group_policy, hash_algo)
hash_algo = HashFactory.create_hash_method("md5")
anon = Anonymizer(policy_sha, group_policy, hash_algo)

for file_path in get_dicom_test_files():
    dataset = pydicom.dcmread(f"{file_path}")
24 changes: 13 additions & 11 deletions icometrix_sdk/anonymizer/anonymizer.py
@@ -6,13 +6,13 @@
from icometrix_sdk.anonymizer.hash_factory import HashMethod
from icometrix_sdk.anonymizer.models import Policy, TagPolicy
from icometrix_sdk.anonymizer.utils import remove_tag, replace_tag, hash_tag, _is_pixel_data, round_tag, \
    add_de_identification_tags, is_tag, is_group
    add_de_identification_tags, is_tag, is_group, empty_tag

logger = logging.getLogger(__name__)


class Anonymizer:
    default_policy: TagPolicy = TagPolicy("remove", "Default")
    default_policy: TagPolicy = TagPolicy("empty", "Default")

    def __init__(self, policy: Policy, group_policy: Policy, hash_algo: HashMethod):
        for tag in policy:
@@ -28,44 +28,46 @@ def __init__(self, policy: Policy, group_policy: Policy, hash_algo: HashMethod):
        self.hash_algo = hash_algo

    def anonymize(self, dataset: Dataset) -> Dataset:
        for element in dataset:
        for element in dataset.iterall():
            # Keep the pixel data
            if _is_pixel_data(element.tag):
                continue

            # Apply the tag policy
            elif element.tag in self.policy:
                tag_policy = self.policy[element.tag]
                self._apply_policy_to_tag(element, tag_policy)
                self._apply_policy_to_tag(element, tag_policy, dataset)

            # Apply the group policy
            elif element.tag.group in self.group_policy:
                tag_policy = self.group_policy[element.tag.group]
                self._apply_policy_to_group(element, tag_policy)
                self._apply_policy_to_group(element, tag_policy, dataset)

            # Apply the default policy
            else:
                self._apply_policy_to_tag(element, self.default_policy)
                self._apply_policy_to_tag(element, self.default_policy, dataset)

        return add_de_identification_tags(dataset)

    def _apply_policy_to_group(self, element: DataElement, tag_policy: TagPolicy):
    def _apply_policy_to_group(self, element: DataElement, tag_policy: TagPolicy, dataset: Dataset):
        try:
            self._apply_policy_to_tag(element, tag_policy)
            self._apply_policy_to_tag(element, tag_policy, dataset)
        except (AttributeError, ValueError):
            # tag_policy.action is a string, so it is formatted with %s
            logger.debug("Failed to apply group action '%s' to %s %s.", tag_policy.action,
                         element.tag, element.name)
            return

    def _apply_policy_to_tag(self, element: DataElement, tag_policy: TagPolicy):
    def _apply_policy_to_tag(self, element: DataElement, tag_policy: TagPolicy, dataset: Dataset):
        logger.debug('%d %s: %s', element.tag, element.name, tag_policy.action)

        if tag_policy.action == "keep":
            return
        elif tag_policy.action == "empty":
            empty_tag(element)
        elif tag_policy.action == "remove":
            remove_tag(element)
            remove_tag(element, dataset)
        elif tag_policy.action == "replace":
            replace_tag(element, tag_policy.value)
            replace_tag(element, dataset, tag_policy.replace_fn)
        elif tag_policy.action == "hash":
            hash_tag(element, self.hash_algo)
        elif tag_policy.action == "round":
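
The lookup order in `anonymize` (tag policy first, then group policy, then the default) can be sketched without pydicom. This is a simplified stand-in, not the SDK's code: `resolve_policy` is a hypothetical name, tags are plain 32-bit integers, and the pixel-data check that the real method performs first is left out.

```python
from dataclasses import dataclass


@dataclass
class TagPolicy:
    """Reduced stand-in for the SDK's TagPolicy (action and description only)."""
    action: str
    description: str


# Mirrors the new class-level default introduced by this commit.
DEFAULT_POLICY = TagPolicy("empty", "Default")


def resolve_policy(tag: int, policy: dict, group_policy: dict) -> TagPolicy:
    """Pick the policy for a tag: exact tag match, then group match, then default."""
    if tag in policy:
        return policy[tag]
    group = tag >> 16  # the upper 16 bits of a DICOM tag hold the group number
    if group in group_policy:
        return group_policy[group]
    return DEFAULT_POLICY


policy = {0x00100010: TagPolicy("replace", "PatientName")}
group_policy = {0x0018: TagPolicy("keep", "Acquisition")}

print(resolve_policy(0x00100010, policy, group_policy).action)  # replace
print(resolve_policy(0x00180050, policy, group_policy).action)  # keep
print(resolve_policy(0x00080080, policy, group_policy).action)  # empty
```

A tag-level entry always wins over its group entry, which lets a policy keep a whole group while still hashing or replacing individual tags inside it.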
10 changes: 10 additions & 0 deletions icometrix_sdk/anonymizer/config.py
@@ -0,0 +1,10 @@
import os
from pydicom.config import RAISE

# Organization root, default to icometrix (https://dicom.nema.org/dicom/2013/output/chtml/part05/chapter_B.html)
ROOT_UID: str = os.getenv("ROOT_UID", "1.2.826.0.1.3680043.9.5542")
VALIDATION_MODE: int = int(os.getenv("VALIDATION_MODE", RAISE))

PATIENT_IDENTITY_REMOVED_TAG: int = 0x00120062
DE_IDENTIFICATION_METHOD_TAG: int = 0x00120063
PRIVATE_ICOMETRIX_GROUPS: list[int] = [0x0009, 0x0015, 0x0017]
3 changes: 0 additions & 3 deletions icometrix_sdk/anonymizer/constants.py

This file was deleted.

12 changes: 7 additions & 5 deletions icometrix_sdk/anonymizer/hash_factory.py
@@ -8,13 +8,13 @@

class HashFactory:
    @staticmethod
    def create_hash_method(algo: HashAlgo, size=256, salt=None):
    def create_hash_method(algo: HashAlgo, size=512, salt=None):
        if algo == "sha3":
            return SHA3(size)
        elif algo == "md5":
            return MD5()
        elif algo == "short_md5":
            return IcometrixMD5()
            return ShortMD5()
        else:
            supported = ", ".join(get_args(HashAlgo))
            raise HashAlgorithmException(f"No algorithm named {algo} is supported, valid values are {supported}")
@@ -58,12 +58,14 @@ def calculate_hash_from_bytes(self, input_obj: bytes):
        return hashlib.md5(input_obj, usedforsecurity=True).hexdigest()


class IcometrixMD5(HashMethod):
class ShortMD5(HashMethod):
    """
    MD5 that is re-based to base10.
    """

    def calculate_hash(self, input_obj: str, encoding='utf-8') -> str:
        return self.calculate_hash_from_bytes(input_obj.encode(encoding))

    def calculate_hash_from_bytes(self, input_obj: bytes):
        md5_hash = MD5().calculate_hash_from_bytes(input_obj)
        decimized = str(int(md5_hash, base=16))[:10]
        return decimized
        return str(int(md5_hash, base=16))[:10]
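
The `ShortMD5` transformation can be reproduced with the standard library alone. The sketch below follows the same steps as the hunk above (hex digest, base-16 integer, decimal string, first ten characters); the function name `short_md5` and the `length` parameter are made up for this example.

```python
import hashlib


def short_md5(value: str, encoding: str = "utf-8", length: int = 10) -> str:
    """Hex MD5 digest -> integer -> decimal string -> first `length` digits."""
    hex_digest = hashlib.md5(value.encode(encoding)).hexdigest()
    return str(int(hex_digest, 16))[:length]


short_id = short_md5("PATIENT-001")
print(short_id)  # a 10-character, digits-only identifier
```

Ten decimal digits carry roughly 33 bits, so the short form is far more collision-prone than the full 128-bit digest. It seems best suited to fields that need a compact numeric value.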
26 changes: 22 additions & 4 deletions icometrix_sdk/anonymizer/models.py
@@ -1,16 +1,34 @@
from dataclasses import dataclass
from typing import Literal, get_args
from typing import Literal, Callable
from pydicom import DataElement, Dataset

Action = Literal["keep", "remove", "replace", "hash", "round"]
Action = Literal["keep", "empty", "remove", "replace", "hash", "round"]
HashAlgo = Literal["sha3", "md5", "short_md5"]
ReplaceFn = Callable[[DataElement, Dataset], None]


@dataclass
class TagPolicy:
    """Class for defining a DICOM tag anonymization policy"""
    """
    Represents a DICOM tag anonymization policy, defining how individual tags or groups should be anonymized.

    :param action: Action to be performed on the tag (keep, empty, remove, replace, hash, round).
    :param description: Description of the tag anonymization policy.
    :param replace_fn: Replace function to be used when the action is "replace" (optional).

    Actions:
    - keep: Keep the original tag value.
    - empty: Empty the tag value.
    - remove: Remove the tag from the DICOM dataset.
    - replace: Replace the tag value using the specified :attr:`~icometrix_sdk.anonymizer.models.TagPolicy.replace_fn`.
    - hash: Hash the tag value using a specified algorithm.
    - round: Round the tag value to the nearest value (currently only date VRs are supported).
    """
    action: Action
    description: str
    value: int | str = None

    # value: int | str = None
    replace_fn: ReplaceFn = None


Policy = dict[int, TagPolicy]
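
To show how a `replace_fn` with the `ReplaceFn` signature might be invoked, here is a small sketch that avoids the pydicom dependency. `FakeElement` and `apply_replace` are hypothetical stand-ins, and raising when `replace_fn` is missing is an assumption of this example; the commit does not show how `replace_tag` handles that case.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class FakeElement:
    """Hypothetical stand-in for pydicom's DataElement (only the fields used here)."""
    tag: int
    value: str


# Same shape as ReplaceFn above: (element, dataset) -> None, mutating the element in place.
ReplaceFn = Callable[[FakeElement, dict], None]


@dataclass
class TagPolicy:
    action: str
    description: str
    replace_fn: Optional[ReplaceFn] = None


def replace_name(element: FakeElement, _dataset: dict) -> None:
    element.value = "Jane^Doe"


def apply_replace(element: FakeElement, dataset: dict, tag_policy: TagPolicy) -> None:
    """Call the policy's replace_fn; the error on a missing function is an assumption."""
    if tag_policy.action != "replace" or tag_policy.replace_fn is None:
        raise ValueError("a 'replace' policy needs a replace_fn")
    tag_policy.replace_fn(element, dataset)


element = FakeElement(0x00100010, "Smith^John")
apply_replace(element, {}, TagPolicy("replace", "PatientName", replace_fn=replace_name))
print(element.value)  # Jane^Doe
```

Passing the dataset alongside the element lets a replacement depend on other tags, which the earlier `value: int | str` field could not express.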