Skip to content

Commit

Permalink
Pull request #6: Feature/anonymization
Browse files Browse the repository at this point in the history
BAC-9129: feature/anonymization

Squashed commit of the following:

commit 8c4f721b81feae0aa3b868b54315846d35f34347
Author: Jeroen Pinxten <jeroen.pinxten@icometrix.com>
Date:   Wed Apr 10 14:18:34 2024 +0200

    BAC-9129: get supported hash algos from Literal

commit 823eae14b0f4ce381cc86260f885957bdd5c4321
Author: Jeroen Pinxten <jeroen.pinxten@icometrix.com>
Date:   Wed Apr 10 13:53:12 2024 +0200

    BAC-9129: unit test hash

commit 06f562396f6a28335cff94833b8eee579fcfa900
Author: Jeroen Pinxten <jeroen.pinxten@icometrix.com>
Date:   Wed Apr 10 10:45:35 2024 +0200

    BAC-9129: move test file code to examples

commit a9a9f2dd28b3d32aa85c7b16cbe8996a34b39495
Author: Jeroen Pinxten <jeroen.pinxten@icometrix.com>
Date:   Wed Apr 10 10:43:21 2024 +0200

    BAC-9129: unit test anonymization

commit 5eee105dd34d8ee946b3fafaa8f96807b28d57ab
Author: Jeroen Pinxten <jeroen.pinxten@icometrix.com>
Date:   Tue Apr 9 13:58:23 2024 +0200

    BAC-9129: unit test anonymization

commit 6252f8e9acc80eafea7a0426b10771d26251bc0f
Author: Jeroen Pinxten <jeroen.pinxten@icometrix.com>
Date:   Tue Apr 9 11:16:50 2024 +0200

    BAC-9129: unit test anonymization
  • Loading branch information
jpinxten committed Apr 10, 2024
1 parent 765e8d4 commit b2c915e
Show file tree
Hide file tree
Showing 19 changed files with 630 additions and 53 deletions.
26 changes: 20 additions & 6 deletions examples/anonymize.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,30 @@
import os
from pathlib import Path

import pydicom
from pydicom.data import get_testdata_files

from icometrix_sdk.anonymizer.anonymizer import Anonymizer
from icometrix_sdk.anonymizer.hash_factory import HashFactory
from icometrix_sdk.anonymizer.policy import policy, group_policy

file_paths = os.listdir("<data_dir>")
# These files are included in the pydicom test dataset to test failed dcmread
INVALID_FILES = [
"ExplVR_BigEndNoMeta.dcm",
"ExplVR_LitEndNoMeta.dcm",
"no_meta.dcm",
"rtstruct.dcm",
"OT-PAL-8-face.dcm",
]

hash_algo = HashFactory.create_hash_method("ico_md5")

def get_dicom_test_files():
    """Return the pydicom ``*.dcm`` test-data paths whose file name is not in INVALID_FILES."""
    usable_paths = []
    for candidate in get_testdata_files("*.dcm"):
        # Only the file name is compared, never the directory part.
        if Path(candidate).name in INVALID_FILES:
            continue
        usable_paths.append(candidate)
    return usable_paths


hash_algo = HashFactory.create_hash_method("short_md5")
anon = Anonymizer(policy, group_policy, hash_algo)

for file_path in file_paths:
dataset = pydicom.dcmread(f"out/{file_path}")
anon.anonymize(dataset).save_as(f"out/anon-{file_path}")
for file_path in get_dicom_test_files():
dataset = pydicom.dcmread(f"{file_path}")
anon.anonymize(dataset).save_as(f"out/anon-{Path(file_path).name}")
29 changes: 27 additions & 2 deletions icometrix_sdk/anonymizer/anonymizer.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,28 @@
import logging

from pydicom import Dataset, DataElement

from icometrix_sdk.anonymizer.exceptions import PolicyException
from icometrix_sdk.anonymizer.hash_factory import HashMethod
from icometrix_sdk.anonymizer.models import Policy, TagPolicy
from icometrix_sdk.anonymizer.utils import remove_tag, replace_tag, hash_tag, _is_pixel_data, round_tag, \
add_de_identification_tags
add_de_identification_tags, is_tag, is_group

logger = logging.getLogger(__name__)


class Anonymizer:
default_policy: TagPolicy = TagPolicy("remove", "Default")

def __init__(self, policy: Policy, group_policy: Policy, hash_algo: HashMethod):
for tag in policy:
if not is_tag(tag):
raise PolicyException("Tag policy contains an invalid tag")

for group in group_policy:
if not is_group(group):
raise PolicyException("Group policy contains an invalid group")

self.policy = policy
self.group_policy = group_policy
self.hash_algo = hash_algo
Expand All @@ -28,15 +41,25 @@ def anonymize(self, dataset: Dataset) -> Dataset:
# Apply the group policy
elif element.tag.group in self.group_policy:
tag_policy = self.group_policy[element.tag.group]
self._apply_policy_to_tag(element, tag_policy)
self._apply_policy_to_group(element, tag_policy)

# Apply the default policy
else:
self._apply_policy_to_tag(element, self.default_policy)

return add_de_identification_tags(dataset)

def _apply_policy_to_group(self, element: DataElement, tag_policy: TagPolicy):
    """Apply a group-level policy to a single element without propagating failures.

    Delegates to ``_apply_policy_to_tag``. An ``AttributeError`` or
    ``ValueError`` raised while applying the action is logged at debug level
    and swallowed, so the caller continues with the next element.

    :param element: the data element the group policy is applied to
    :param tag_policy: the policy registered for the element's group
    """
    # NOTE(review): the ValueError that _apply_policy_to_tag raises for an
    # unknown action is swallowed here as well - confirm that is intended.
    try:
        self._apply_policy_to_tag(element, tag_policy)
    except (AttributeError, ValueError):
        # `action` is a string ("keep", "remove", ...); it has to be rendered
        # with %s. With %d the logging call itself fails while formatting and
        # the message is lost.
        logger.debug("Failed to apply group action '%s' to %s %s.", tag_policy.action,
                     element.tag, element.name)

def _apply_policy_to_tag(self, element: DataElement, tag_policy: TagPolicy):
logger.debug('%d %s: %s', element.tag, element.name, tag_policy.action)

if tag_policy.action == "keep":
return
elif tag_policy.action == "remove":
Expand All @@ -47,3 +70,5 @@ def _apply_policy_to_tag(self, element: DataElement, tag_policy: TagPolicy):
hash_tag(element, self.hash_algo)
elif tag_policy.action == "round":
round_tag(element)
else:
raise ValueError("Unknown tag policy action")
14 changes: 14 additions & 0 deletions icometrix_sdk/anonymizer/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
class IcometrixException(Exception):
    """Common base class for the exceptions declared in this module."""


class PolicyException(IcometrixException):
    """Signals a problem with an anonymization policy, e.g. an invalid tag or group key."""


class HashAlgorithmException(IcometrixException):
    """Signals that a requested hash algorithm is not supported."""


class HashSizeException(IcometrixException):
    """Signals that a requested hash size is not supported."""
23 changes: 9 additions & 14 deletions icometrix_sdk/anonymizer/hash_factory.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,23 @@
import hashlib
from abc import abstractmethod
from typing import Literal
from typing import get_args


class UnsupportedAlgorithmException(Exception):
pass


class UnsupportedSizeException(Exception):
pass
from icometrix_sdk.anonymizer.exceptions import HashAlgorithmException, HashSizeException
from icometrix_sdk.anonymizer.models import HashAlgo


class HashFactory:
@staticmethod
def create_hash_method(algo: Literal["sha3", "md5", "ico_md5"], size=256, salt=None):
def create_hash_method(algo: HashAlgo, size=256, salt=None):
if algo == "sha3":
return SHA3(size)
elif algo == "md5":
return MD5()
elif algo == "ico_md5":
elif algo == "short_md5":
return IcometrixMD5()
else:
raise UnsupportedAlgorithmException(f"No algorithm named {algo} is supported, "
f"valid values are \"sha3\", \'md5\"")
supported = ", ".join(get_args(HashAlgo))
raise HashAlgorithmException(f"No algorithm named {algo} is supported, valid values are {supported}")


class HashMethod:
Expand Down Expand Up @@ -51,8 +46,8 @@ def calculate_hash_from_bytes(self, input_obj: bytes):
elif self.size == 512:
hash_obj = hashlib.sha3_512(input_obj)
else:
raise UnsupportedSizeException(f"SHA3 does not support size {self.size}, "
f"valid values are (224, 256, 384, 512)")
raise HashSizeException(f"SHA3 does not support size {self.size}, "
f"valid values are (224, 256, 384, 512)")

digest = hash_obj.hexdigest()
return digest
Expand Down
4 changes: 2 additions & 2 deletions icometrix_sdk/anonymizer/models.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from dataclasses import dataclass
from typing import Literal
from typing import Literal, get_args

Action = Literal["keep", "remove", "replace", "hash", "round"]
HashAlgo = Literal["sha3", "md5", "ico_md5"]
HashAlgo = Literal["sha3", "md5", "short_md5"]


@dataclass
Expand Down
1 change: 1 addition & 0 deletions icometrix_sdk/anonymizer/policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
0x300a0013: TagPolicy("hash", "DoseReferenceUID"),

0x00100010: TagPolicy("hash", "PatientName"),
# 0x00100010: TagPolicy("replace", "PatientName", "test"),
0x00080090: TagPolicy("keep", "ReferringPhysicianName"),

0x00080050: TagPolicy("keep", "AccessionNumber"),
Expand Down
Binary file removed icometrix_sdk/anonymizer/test_data/IM-0007-0106.dcm
Binary file not shown.
Empty file.
32 changes: 32 additions & 0 deletions icometrix_sdk/anonymizer/tests/test_anonimizer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import pytest

from icometrix_sdk.anonymizer.anonymizer import Anonymizer
from icometrix_sdk.anonymizer.exceptions import PolicyException
from icometrix_sdk.anonymizer.hash_factory import HashMethod, HashFactory
from icometrix_sdk.anonymizer.models import Policy, TagPolicy


@pytest.fixture(scope="module")
def hash_algo() -> HashMethod:
    """Module-scoped fixture providing the hash method the factory returns for "md5"."""
    md5_method = HashFactory.create_hash_method("md5")
    return md5_method


# Tag-level policy used by the constructor tests: keyed by a complete tag number.
tags: Policy = {
0x00180081: TagPolicy("keep", "EchoTime"),
}

# Group-level policy used by the constructor tests: keyed by a group number only.
groups: Policy = {
0x0018: TagPolicy("keep", "Group 18"),
}


def test_valid_constructor(hash_algo: HashMethod):
    """Building an Anonymizer from a tag policy and a group policy must not raise."""
    Anonymizer(policy=tags, group_policy=groups, hash_algo=hash_algo)


def test_invalid_constructor(hash_algo: HashMethod):
    """The constructor must raise PolicyException when a policy is passed in the wrong slot."""
    misplaced_policies = (
        (groups, groups),  # group-keyed mapping supplied as the tag policy
        (tags, tags),  # tag-keyed mapping supplied as the group policy
    )
    for tag_arg, group_arg in misplaced_policies:
        with pytest.raises(PolicyException):
            Anonymizer(tag_arg, group_arg, hash_algo)
50 changes: 50 additions & 0 deletions icometrix_sdk/anonymizer/tests/test_hash.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import pytest
from pydicom import DataElement
from pydicom.valuerep import MAX_VALUE_LEN

from icometrix_sdk.anonymizer.hash_factory import HashFactory, HashMethod
from icometrix_sdk.anonymizer.utils import hash_tag
from icometrix_sdk.anonymizer.utils import _cut_max_length


@pytest.fixture(scope="module")
def hash_algo() -> HashMethod:
    """Module-scoped fixture providing the hash method the factory returns for "md5"."""
    md5_method = HashFactory.create_hash_method("md5")
    return md5_method


def test_hash_long_value(hash_algo: HashMethod):
    """hash_tag on an "LO" element must store the hash cut to MAX_VALUE_LEN["LO"]."""
    plain_text = "Head"
    digest = hash_algo.calculate_hash(plain_text)
    truncated = _cut_max_length(digest, MAX_VALUE_LEN["LO"])

    element = DataElement(0x00080013, "LO", plain_text)
    hash_tag(element, hash_algo)

    assert element.value == truncated


def test_hash_short_value(hash_algo: HashMethod):
    """hash_tag on an "SH" element must store the hash cut to MAX_VALUE_LEN["SH"]."""
    plain_text = "4753014.1"
    digest = hash_algo.calculate_hash(plain_text)
    truncated = _cut_max_length(digest, MAX_VALUE_LEN["SH"])

    element = DataElement(0x00080050, "SH", plain_text)
    hash_tag(element, hash_algo)

    assert element.value == truncated


def test_hash_number_value(hash_algo: HashMethod):
    """hash_tag must raise ValueError for a "UL" element holding the integer 210."""
    numeric_element = DataElement(0x00020000, "UL", 210)
    # The element is built outside the context manager so that only
    # hash_tag itself can satisfy the expected ValueError.
    with pytest.raises(ValueError):
        hash_tag(numeric_element, hash_algo)


def test_hash_ui_value(hash_algo: HashMethod):
    """hash_tag on a "UI" element must produce the pinned UID below."""
    # Both literals start with the same "1.2.826.0.1.3680043.9.5542." prefix.
    uid_before = "1.2.826.0.1.3680043.9.5542.5114248473116471214116117310121520961"
    uid_after = "1.2.826.0.1.3680043.9.5542.2676173402192025550109336474686546713"

    element = DataElement(0x0020000D, "UI", uid_before)
    hash_tag(element, hash_algo)

    assert element.value == uid_after
43 changes: 43 additions & 0 deletions icometrix_sdk/anonymizer/tests/test_hash_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from icometrix_sdk.anonymizer.hash_factory import HashFactory


def test_hash_string_hash_factory():
    """The factory's "sha3" method with size 256 must reproduce the pinned digest of 'Patient Name'."""
    plain_text = 'Patient Name'
    pinned_digest = '95fa47c8fb19996e4bdeed2c3fa7cc35ab013737fd8b562a51335302773c4b29'

    sha3_256 = HashFactory().create_hash_method("sha3", 256)

    # Expected value stays on the left-hand side, as in the other hash tests.
    assert pinned_digest == sha3_256.calculate_hash(plain_text)


def test_sha3():
    """The first ten characters of the "sha3"/512 hash must match the pinned prefixes."""
    # The two inputs differ only in their last character ('.' versus ' ').
    pinned_prefixes = {
        'BART_TEST.': '3fb8369208',
        'BART_TEST ': '077979f009',
    }
    sha3_512 = HashFactory().create_hash_method("sha3", 512)

    for plain_text, prefix in pinned_prefixes.items():
        assert prefix == sha3_512.calculate_hash(plain_text)[:10]


def test_short_md5():
    """The "short_md5" method must reproduce the pinned, digit-only hashes."""
    # The two inputs differ only in their last character (' ' versus '.').
    pinned_hashes = (
        ('BART_TEST ', '1080702796'),
        ('BART_TEST.', '2994824863'),
    )
    short_md5 = HashFactory().create_hash_method("short_md5")

    for plain_text, pinned in pinned_hashes:
        assert pinned == short_md5.calculate_hash(plain_text), 'hash verification failed'
Loading

0 comments on commit b2c915e

Please sign in to comment.