Skip to content

Commit

Permalink
Merge pull request #14 from FHIR-Aggregator/development
Browse files Browse the repository at this point in the history
Development
  • Loading branch information
teslajoy authored Feb 19, 2025
2 parents d6a5d37 + 6d07c52 commit 7a62f40
Show file tree
Hide file tree
Showing 3 changed files with 933 additions and 789 deletions.
24 changes: 17 additions & 7 deletions fhir_query/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ def visualize(db_path: str, output_path: str, ignored_edges: list[str]) -> None:
try:
db = ResourceDB(db_path=db_path)
visualize_aggregation(db.aggregate(ignored_edges), output_path)
click.echo(f"Wrote: {output_path}", file=sys.stderr)
except Exception as e:
logging.error(f"Error: {e}", exc_info=True)
click.echo(f"Error: {e}", file=sys.stderr)
Expand Down Expand Up @@ -197,28 +198,37 @@ def summarize(db_path: str) -> None:
help="Open the graph in a browser using the dtale package for interactive data exploration.",
)
@click.argument(
"data_type", required=True, type=click.Choice(["Specimen", "DocumentReference", "ResearchSubject"]), default="Specimen"
"data_type",
required=True,
type=click.Choice(["Specimen", "DocumentReference", "ResearchSubject", "Patient"]),
default="Specimen",
)
def dataframe(db_path: str, output_path: str, launch_dtale: bool, data_type: str) -> None:
"""Create dataframes from the local db."""

try:
db = Dataframer(db_path=db_path)
# TODO - add more data types
assert data_type in ["Specimen"], f"Sorry {data_type} dataframe is not supported yet."
df = pd.DataFrame(db.flattened_specimens())
# TODO - add more data types - including condition
assert data_type in ["Specimen", "Patient"], f"Sorry {data_type} dataframe is not supported yet."

df: pd.DataFrame | None = None
if data_type == "Specimen":
df = pd.DataFrame(db.flattened_specimens())
if data_type == "Patient":
df = pd.DataFrame(db.flattened_patients())

if launch_dtale:
# TODO - add check that dtale is installed
import dtale

dtale.show(df, subprocess=False, open_browser=True, port=40000)
else:
elif df is not None:
# export to csv
file_name = output_path if output_path else f"{data_type}.csv"
df.to_csv(file_name, index=False)
click.secho(f"Saved {file_name}", file=sys.stderr)
df.to_csv(output_path, index=False)
click.secho(f"Saved {output_path}", file=sys.stderr)
else:
click.secho(f"No data found for {data_type}", file=sys.stderr)

except Exception as e:
logging.error(f"Error: {e}", exc_info=True)
Expand Down
93 changes: 71 additions & 22 deletions fhir_query/dataframer.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,14 @@ def values(self) -> dict:
return {}

# update the key if code information is available

if self.resource.get("code", {}).get("text", None):
source = self.resource["code"]["text"]
else:
source = self.resource["code"]["coding"][0].get("display", self.resource["code"]["coding"][0].get("code"))
if not source:
source = "NA"
source = inflection.underscore(inflection.parameterize(source))
return {source: value}


Expand Down Expand Up @@ -329,7 +335,6 @@ def values(self) -> dict:

# get top-level value in dict if it exists
_values = super().values

if len(_values) == 0:
assert "component" in self.resource, "no component nor top-level value found"

Expand All @@ -343,28 +348,30 @@ def values(self) -> dict:
continue
_values[source] = value

# knowing there's now at least 1 item in _values
if "component" in self.resource:
# ensure no top-level value is not duplicating a component code value
# TODO: ensure this value_key corresponds to percent_tumor on some runs due to getting display
value_key = [k for k in _values][0]
assert (
value_key not in self.resource["component"]
), """duplicate code value found, only specify the code value in the component, see Rule obs-7
https://build.fhir.org/observation.html#invs"""

# get component codes
# knowing there's now at least 1 item in _values
if "component" in self.resource:
for component in self.resource["component"]:
value, source = normalize_value(component)
if component.get("code", {}).get("text", None):
source = component["code"]["text"]
if not value:
continue
_values[source] = value
# ensure no top-level value is not duplicating a component code value
# TODO: ensure this value_key corresponds to percent_tumor on some runs due to getting display
value_key = [k for k in _values][0]
assert (
value_key not in self.resource["component"]
), """duplicate code value found, only specify the code value in the component, see Rule obs-7
https://build.fhir.org/observation.html#invs"""

# get component codes
if "component" in self.resource:
for component in self.resource["component"]:
value, source = normalize_value(component)
if component.get("code", {}).get("text", None):
source = component["code"]["text"]
if not value:
continue
_values[source] = value

if "code" in self.resource and "text" in self.resource["code"]:
_values["observation_code"] = self.resource["code"]["text"]

assert not [_ for _ in _values.keys() if _.startswith("value")], f"key misnamed {_values}\n {self.resource}"
assert len(_values) > 0, f"no values found in Observation: {self.resource}"

return _values
Expand Down Expand Up @@ -515,7 +522,7 @@ def flattened_specimens(self) -> Generator[dict, None, None]:
cursor = self.connection.cursor()

# get a dict mapping focus ID to its associated observations
observations_by_focus_id = self.get_observations_by_focus(resource_type)
specimen_observations_by_focus_id = self.get_observations_by_focus(resource_type)
service_requests_by_specimen_id = self.get_resources_by_reference("ServiceRequest", "specimen", "Specimen")
document_references_by_based_on_id = self.get_resources_by_reference("DocumentReference", "basedOn", "ServiceRequest")

Expand All @@ -524,7 +531,7 @@ def flattened_specimens(self) -> Generator[dict, None, None]:
for _, _, _, resource in cursor.fetchall():
specimen = json.loads(resource)
yield self.flattened_specimen(
specimen, observations_by_focus_id, service_requests_by_specimen_id, document_references_by_based_on_id
specimen, specimen_observations_by_focus_id, service_requests_by_specimen_id, document_references_by_based_on_id
)

def flattened_specimen(
Expand All @@ -536,7 +543,17 @@ def flattened_specimen(
flat_specimen = traverse(specimen)

# extract its .subject and append its fields (including id)
flat_specimen.update(self.get_subject(specimen))
subject = self.get_subject(specimen)
if "patient_id" in subject:
assert len(self.flattened_patients()) > 1, f"Length of flattened_patients is {len(self.flattened_patients())}"
_flattened_patient = next(
iter([_ for _ in self.flattened_patients() if _["patient_id"] == subject["patient_id"]]), None
)
if not _flattened_patient:
print(f"Patient not found {subject['patient_id']} {[_['patient_id'] for _ in self.flattened_patients()]}")
else:
subject = {f"patient_{k}".replace("patient_patient_", "patient_"): v for k, v in _flattened_patient.items()}
flat_specimen.update(subject)

# populate observation codes for each associated observation
if specimen["id"] in observation_by_id:
Expand All @@ -558,3 +575,35 @@ def flattened_specimen(
flat_specimen.update(traverse(document_reference))

return flat_specimen

def flattened_patients(self) -> list[dict]:
    """Return the list of flattened Patient records.

    Each flattened Patient merges in fields from:
      - Observations that reference the Patient via the ``focus`` field

    The result is computed once and memoized on the instance. (This replaces
    ``@lru_cache`` on the method: caching on a bound method keys on ``self``
    and keeps every instance alive for the cache's lifetime — ruff B019.)

    Returns:
        list[dict]: one flat dict per Patient row found in the ``resources``
        table, with related Observation fields folded in.
    """
    # Per-instance memoization; getattr guard avoids requiring __init__ changes.
    cached = getattr(self, "_flattened_patients_cache", None)
    if cached is not None:
        return cached

    resource_type = "Patient"
    cursor = self.connection.cursor()

    # Observations that point at a Patient through their `focus` reference.
    observations_by_focus = self.get_resources_by_reference("Observation", "focus", "Patient")
    cursor.execute("SELECT * FROM resources WHERE resource_type = ?", (resource_type,))
    _flattened_patients = []
    for _, _, _, resource in cursor.fetchall():
        patient = json.loads(resource)
        _flattened_patients.append(self.flattened_patient(patient, observations_by_focus))

    self._flattened_patients_cache = _flattened_patients
    return _flattened_patients

@staticmethod
def flattened_patient(patient: dict, observations_by_subject: dict) -> dict:
    """Flatten a Patient resource and fold in its related Observations.

    Every Observation associated with the Patient's id is simplified and its
    keys are namespaced with an ``observation_`` prefix before being merged;
    on key collisions, later observations overwrite earlier ones.
    """
    flat = traverse(patient)

    # Observations are keyed by the Patient id they reference.
    for observation in observations_by_subject.get(patient["id"], []):
        simplified = SimplifiedResource.build(resource=observation).values
        for key, value in simplified.items():
            flat[f"observation_{key}"] = value

    return flat
Loading

0 comments on commit 7a62f40

Please sign in to comment.