[ETL-593] Record sort ordering for duplicate records (#101)
* Record sort ordering for duplicate records
BryanFauble authored Jan 9, 2024
1 parent dd285e8 commit 84bbffb
Showing 3 changed files with 937 additions and 863 deletions.
108 changes: 108 additions & 0 deletions CONTRIBUTING.md
@@ -91,3 +91,111 @@ Once the pull request has been approved, we expect _you_ to merge. Although this
Code ought to conform to [`PEP8`](https://peps.python.org/pep-0008/), but may deviate if it helps with readability. There are many popular libraries for validating against PEP8, including [pylint](https://pypi.org/project/pylint/) and [black](https://pypi.org/project/black/) (which is a code formatter).
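As a quick illustration (a hypothetical snippet, not taken from this codebase), `black` normalizes spacing and argument defaults automatically:

```python
# Before formatting: inconsistent spacing around operators and defaults
def scale(values,factor = 2):
    return [v*factor for v in values]

# After running `black`: normalized spacing throughout
def scale(values, factor=2):
    return [v * factor for v in values]
```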

We also encourage [never nesting](https://www.youtube.com/watch?v=CFRhGnuXG-4), a slight misnomer which refers to never nesting code beyond three levels. This helps with readability and testing.
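For example (a hypothetical sketch), deeply nested conditionals can usually be flattened with early returns:

```python
# Nested version: three levels deep, harder to read and test
def process(record):
    if record is not None:
        if record.get("active"):
            if record.get("score", 0) > 10:
                return "accepted"
    return "rejected"

# Never-nested version: guard clauses keep the happy path at one level
def process_flat(record):
    if record is None:
        return "rejected"
    if not record.get("active"):
        return "rejected"
    if record.get("score", 0) <= 10:
        return "rejected"
    return "accepted"
```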

## Test scripts
The following is a small collection of test scripts used for various kinds of ad hoc testing; they are meant to aid contributions to this project.


### Use pandas to print out a specific record from a Parquet file
```python
import json
import os

import pandas as pd

# Example Parquet part files to inspect (swap in whichever file you need)
test_data = "part-00000-cf1aa98f-2ea2-48ea-81d5-2c0359437bf0.c000.snappy.parquet"
test_data_2 = "part-00000-2038617e-8480-4f63-ba05-f5046a7da0d1.c000.snappy.parquet"

# Load the Parquet file into a DataFrame
data = pd.read_parquet(
    os.path.expanduser(f"~/recover-data/{test_data_2}"),
    engine="pyarrow",
)

# Filter to a single participant and print the matching records as JSON
result = data[data["ParticipantIdentifier"] == "**********"].to_dict(
    orient="records"
)
print(json.dumps(result, indent=4))
```
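If you only need to see the columns without loading any row data, pyarrow can read the schema from the file footer alone (a minimal sketch, reusing the same hypothetical file location as above):

```python
import os

import pyarrow.parquet as pq

# Reads only the file footer, which contains the schema, not the row data
schema = pq.read_schema(
    os.path.expanduser(
        "~/recover-data/part-00000-2038617e-8480-4f63-ba05-f5046a7da0d1.c000.snappy.parquet"
    )
)
print(schema)
```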

### Use PySpark to read, query, and print an NDJSON file
This relies on PySpark being installed. See the installation instructions:
https://spark.apache.org/docs/3.1.2/api/python/getting_started/install.html

```python
import os

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

RECORD_TO_RETRIEVE = "********"
INDEX_FIELD_MAP = {
    "enrolledparticipants": ["ParticipantIdentifier"],
}


def drop_table_duplicates(
    spark_df,
    table_name: str,
):
    """Drop duplicate records from a Spark DataFrame, keeping the most
    recently inserted (or exported) copy of each record."""
    table_name_components = table_name.split("_")
    table_data_type = table_name_components[1]
    # Sort newest-first so that dropDuplicates keeps the latest record
    if "InsertedDate" in spark_df.columns:
        sorted_spark_df = spark_df.sort(spark_df.InsertedDate.desc())
    else:
        sorted_spark_df = spark_df.sort(spark_df.export_end_date.desc())
    table = sorted_spark_df.dropDuplicates(subset=INDEX_FIELD_MAP[table_data_type])
    return table


original_table = spark.read.json(
    os.path.expanduser(
        "~/recover/etl-593-data-drop/prod/EnrolledParticipants_20240107.part0.ndjson"
    ),
)
# Register the DataFrame as a temp view so it can be queried with spark.sql
original_table.createOrReplaceTempView("enrolledParticipants")

# Display the CustomFields for the chosen participant
record_to_compare = spark.sql(
    f"SELECT CustomFields FROM enrolledParticipants WHERE ParticipantIdentifier = '{RECORD_TO_RETRIEVE}'"
)
json_result = record_to_compare.toJSON().collect()
# Print each JSON string in the list to the console
print(f"Record count: {original_table.count()}")
for json_str in json_result:
    print(json_str)

# Check to see if there are actually any duplicates on this table
record_to_compare = spark.sql(
    "SELECT * FROM (SELECT ParticipantIdentifier, count(*) AS partCount FROM enrolledParticipants GROUP BY ParticipantIdentifier ORDER BY partCount DESC) WHERE partCount > 1"
)
json_result = record_to_compare.toJSON().collect()
# Print each JSON string in the list to the console
for json_str in json_result:
    print(json_str)

# De-dupe the table and compare results
modified_table = drop_table_duplicates(
    spark_df=original_table, table_name="dataset_enrolledparticipants"
)
modified_table.createOrReplaceTempView("enrolledParticipants_no_dupe")
no_dupe_record_to_compare = spark.sql(
    f"SELECT CustomFields FROM enrolledParticipants_no_dupe WHERE ParticipantIdentifier = '{RECORD_TO_RETRIEVE}'"
)
json_result = no_dupe_record_to_compare.toJSON().collect()
print(f"Record count: {modified_table.count()}")
# Print each JSON string in the list to the console
for json_str in json_result:
    print(json_str)
```
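One caveat worth noting: Spark does not guarantee which row `dropDuplicates` keeps after a global `sort`, since row ordering can be lost when the data is shuffled across partitions. If deterministic selection matters, a window function is a safer pattern (a sketch assuming the same `ParticipantIdentifier` and `InsertedDate` columns and the `original_table` DataFrame from above):

```python
from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

# Rank records within each participant, newest InsertedDate first
window = Window.partitionBy("ParticipantIdentifier").orderBy(
    col("InsertedDate").desc()
)
# Keep only the top-ranked (most recent) record per participant
deduped = (
    original_table.withColumn("row_num", row_number().over(window))
    .filter(col("row_num") == 1)
    .drop("row_num")
)
print(f"Record count: {deduped.count()}")
```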
