Bigquery Row-Level Deletes + Erase After on Database Collections #5293

Merged · 21 commits · Sep 25, 2024 (diff below shows changes from 16 commits)

Commits
65dfd23
POC - executing a delete request on a row in a postgres db.
pattisdr Sep 10, 2024
d8d1cb1
Merge main
pattisdr Sep 13, 2024
4aeefe5
Merge branch 'main' into PROD-2744-row-deletes
pattisdr Sep 17, 2024
e07c481
Generate a delete statement for bigquery -
pattisdr Sep 17, 2024
e358379
Revert sql-connector code for now to focus on bigquery-
pattisdr Sep 17, 2024
e7f22d7
Remove masking strategy override on postgres dataset
pattisdr Sep 17, 2024
9fa25f2
Add end-to-end testing of row-level deletes as a masking strategy for…
pattisdr Sep 19, 2024
0f60402
Add query config tests for bigquery around the update or delete state…
pattisdr Sep 19, 2024
4973996
Improve docstrings
pattisdr Sep 19, 2024
c67acc4
Extend "erase_after" concept so it can be used by databases not just …
pattisdr Sep 21, 2024
2bc6a90
Mark tests using bigquery
pattisdr Sep 21, 2024
921f169
Merge branch 'main' into PROD-2744-row-deletes
pattisdr Sep 23, 2024
97a600b
Refactor tests to get rid of the second bigquery example dataset, to …
pattisdr Sep 23, 2024
aea2445
Remove unnecessary statement.
pattisdr Sep 23, 2024
7066780
Update CHANGELOG and remove breakpoints
pattisdr Sep 23, 2024
f731c73
Make tests repeatable so the order doesn't matter
pattisdr Sep 23, 2024
b3af3f0
Merge branch 'main' into PROD-2744-row-deletes
pattisdr Sep 24, 2024
1879694
Bump fideslang commit.
pattisdr Sep 24, 2024
825d104
Merge branch 'main' into PROD-2744-row-deletes
pattisdr Sep 24, 2024
e425190
Fix copy/paste issues
pattisdr Sep 24, 2024
992f9e9
Merge branch 'main' into PROD-2744-row-deletes
pattisdr Sep 25, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -17,6 +17,9 @@ The types of changes are:

## [Unreleased](https://github.com/ethyca/fides/compare/2.45.2...main)

### Added
- Support row-level deletes for BigQuery and add erase_after support for database connectors [#5293](https://github.com/ethyca/fides/pull/5293)

## [2.45.2](https://github.com/ethyca/fides/compare/2.45.1...2.45.2)

### Fixed
9 changes: 8 additions & 1 deletion data/dataset/bigquery_example_test_dataset.yml
@@ -4,6 +4,8 @@ dataset:
description: Example of a BigQuery dataset containing a variety of related tables like customers, products, addresses, etc.
collections:
- name: address
fides_meta:
erase_after: [ bigquery_example_test_dataset.employee ]
fields:
- name: city
data_categories: [user.contact.address.city]
@@ -21,6 +23,8 @@ dataset:
data_categories: [user.contact.address.postal_code]

- name: customer
fides_meta:
erase_after: [ bigquery_example_test_dataset.address ]
fields:
- name: address_id
data_categories: [system.operations]
@@ -47,14 +51,17 @@ dataset:
length: 40

- name: employee
fides_meta:
masking_strategy_override:
strategy: delete
fields:
- name: address_id
data_categories: [system.operations]
fides_meta:
references:
- dataset: bigquery_example_test_dataset
field: address.id
-            direction: to
+            direction: from
- name: email
data_categories: [user.contact.email]
fides_meta:
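For orientation, a minimal sketch (not part of the diff) of the ordering these annotations imply, assuming `erase_after` lists the collections that must be erased before the annotated one, which is how the new graph-traversal test below reads them:

```python
from graphlib import TopologicalSorter

# Each collection maps to the collections that must be erased before it,
# per the erase_after entries in the YAML above.
erase_after = {
    "customer": {"address"},   # customer is erased after address
    "address": {"employee"},   # address is erased after employee
    "employee": set(),         # employee (row-level DELETE) goes first
}

print(list(TopologicalSorter(erase_after).static_order()))
# ['employee', 'address', 'customer']
```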
2 changes: 1 addition & 1 deletion requirements.txt
@@ -16,7 +16,7 @@ types-defusedxml==0.7.0.20240218
expandvars==0.9.0
fastapi[all]==0.111.0
fastapi-pagination[sqlalchemy]==0.12.25
-fideslang==3.0.4
+fideslang @ git+https://github.com/ethyca/fideslang.git@eee7ebd0fd5ee94c88db2e70bbccebaa8876a516
fideslog==1.2.10
firebase-admin==5.3.0
GitPython==3.1.41
2 changes: 2 additions & 0 deletions src/fides/api/graph/config.py
@@ -84,6 +84,7 @@
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Literal, Optional, Set, Tuple, Union

from fideslang.models import MaskingStrategyOverride
from fideslang.validation import FidesKey
from pydantic import BaseModel, ConfigDict, field_serializer, field_validator

@@ -454,6 +455,7 @@ class Collection(BaseModel):
# An optional set of dependent fields that need to be queried together
grouped_inputs: Set[str] = set()
data_categories: Set[FidesKey] = set()
masking_strategy_override: Optional[MaskingStrategyOverride] = None

@property
def field_dict(self) -> Dict[FieldPath, Field]:
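As a quick illustration (a sketch mirroring the new test in tests/ops/graph/test_config.py further down, not additional PR code), the override rides along on the graph-level Collection model:

```python
from fideslang.models import MaskingStrategies, MaskingStrategyOverride

from fides.api.graph.config import Collection

# A collection that opts into row-level deletes instead of per-field masking.
employee = Collection(
    name="employee",
    fields=[],
    masking_strategy_override=MaskingStrategyOverride(
        strategy=MaskingStrategies.DELETE
    ),
)
assert employee.masking_strategy_override.strategy == MaskingStrategies.DELETE
```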
13 changes: 13 additions & 0 deletions src/fides/api/models/datasetconfig.py
@@ -318,10 +318,23 @@ def convert_dataset_to_graph(
CollectionAddress(*s.split(".")) for s in collection.fides_meta.after
}

collection_erase_after: Set[CollectionAddress] = set()
if collection.fides_meta and collection.fides_meta.erase_after:
collection_erase_after = {
CollectionAddress(*s.split("."))
for s in collection.fides_meta.erase_after
}

masking_override = None
if collection.fides_meta and collection.fides_meta.masking_strategy_override:
masking_override = collection.fides_meta.masking_strategy_override

graph_collection = Collection(
name=collection.name,
fields=graph_fields,
after=collection_after,
erase_after=collection_erase_after,
masking_strategy_override=masking_override,
skip_processing=collection_skip_processing,
data_categories=(
set(collection.data_categories) if collection.data_categories else set()
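A small sketch of the address parsing used above (assuming CollectionAddress exposes dataset and collection attributes, as the tests below suggest): each "dataset.collection" string in fides_meta.erase_after becomes a graph address.

```python
from fides.api.graph.config import CollectionAddress

# Mirrors CollectionAddress(*s.split(".")) from convert_dataset_to_graph.
addr = CollectionAddress(*"bigquery_example_test_dataset.employee".split("."))
assert addr.dataset == "bigquery_example_test_dataset"
assert addr.collection == "employee"
```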
56 changes: 54 additions & 2 deletions src/fides/api/service/connectors/query_config.py
@@ -1,14 +1,15 @@
# pylint: disable=too-many-lines
import re
from abc import ABC, abstractmethod
-from typing import Any, Dict, Generic, List, Optional, Tuple, TypeVar
+from typing import Any, Dict, Generic, List, Optional, Tuple, TypeVar, Union

import pydash
from boto3.dynamodb.types import TypeSerializer
from fideslang.models import MaskingStrategies
from loguru import logger
from sqlalchemy import MetaData, Table, text
from sqlalchemy.engine import Engine
-from sqlalchemy.sql import Executable, Update  # type: ignore
+from sqlalchemy.sql import Delete, Executable, Update  # type: ignore
from sqlalchemy.sql.elements import ColumnElement, TextClause

from fides.api.graph.config import (
@@ -819,6 +820,28 @@ def get_formatted_query_string(
BigQuery reserved words."""
return f'SELECT {field_list} FROM `{self.node.collection.name}` WHERE {" OR ".join(clauses)}'

def generate_masking_stmt(
self,
node: ExecutionNode,
row: Row,
policy: Policy,
request: PrivacyRequest,
client: Engine,
) -> Union[Optional[Update], Optional[Delete]]:
"""
Generate a masking statement for BigQuery.

If a masking override is present, it will take precedence over the policy masking strategy.
"""

masking_override = node.collection.masking_strategy_override
if masking_override and masking_override.strategy == MaskingStrategies.DELETE:
logger.info(
f"Masking override detected for collection {node.address.value}: {masking_override.strategy.value}"
)
return self.generate_delete(row, client)
return self.generate_update(row, policy, request, client)

def generate_update(
self, row: Row, policy: Policy, request: PrivacyRequest, client: Engine
) -> Optional[Update]:
@@ -851,6 +874,35 @@ def generate_update(
]
return table.update().where(*pk_clauses).values(**update_value_map)

def generate_delete(self, row: Row, client: Engine) -> Optional[Delete]:
"""Returns a SQLAlchemy DELETE statement for BigQuery. Does not actually execute the delete statement.

Used when a collection-level masking override is present and the masking strategy is DELETE.
"""
non_empty_primary_keys: Dict[str, Field] = filter_nonempty_values(
{
fpath.string_path: fld.cast(row[fpath.string_path])
for fpath, fld in self.primary_key_field_paths.items()
if fpath.string_path in row
}
)

valid = len(non_empty_primary_keys) > 0
if not valid:
logger.warning(
"There is not enough data to generate a valid DELETE statement for {}",
self.node.address,
)
return None

table = Table(
self.node.address.collection, MetaData(bind=client), autoload=True
)
pk_clauses: List[ColumnElement] = [
getattr(table.c, k) == v for k, v in non_empty_primary_keys.items()
]
return table.delete().where(*pk_clauses)

Comment on lines +877 to +905, pattisdr (Contributor Author):

New method (BigQuery only, to start) that generates a DELETE statement for the relevant primary keys.
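To make the statement shape concrete, here is a standalone sketch (assumed schema; the real table is reflected from BigQuery with autoload=True) of what table.delete().where(*pk_clauses) compiles to:

```python
from sqlalchemy import Column, Integer, MetaData, String, Table

metadata = MetaData()
employee = Table(
    "employee",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("email", String),
)

# Single-primary-key case: one clause per non-empty primary key value.
stmt = employee.delete().where(employee.c.id == 42)
print(stmt)
# DELETE FROM employee WHERE employee.id = :id_1
```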


MongoStatement = Tuple[Dict[str, Any], Dict[str, Any]]
"""A mongo query is expressed in the form of 2 dicts, the first of which represents
20 changes: 12 additions & 8 deletions src/fides/api/service/connectors/sql_connector.py
@@ -562,19 +562,23 @@ def mask_data(
request_task: RequestTask,
rows: List[Row],
) -> int:
"""Execute a masking request. Returns the number of records masked"""
"""Execute a masking request. Returns the number of records updated or deleted"""
query_config = self.query_config(node)
update_ct = 0
update_or_delete_ct = 0
client = self.client()
for row in rows:
update_stmt: Optional[Executable] = query_config.generate_update(
row, policy, privacy_request, client
update_or_delete_stmt: Optional[Executable] = (
query_config.generate_masking_stmt(
node, row, policy, privacy_request, client
)
pattisdr marked this conversation as resolved.
Show resolved Hide resolved
)
if update_stmt is not None:
if update_or_delete_stmt is not None:
with client.connect() as connection:
results: LegacyCursorResult = connection.execute(update_stmt)
update_ct = update_ct + results.rowcount
return update_ct
results: LegacyCursorResult = connection.execute(
update_or_delete_stmt
)
update_or_delete_ct = update_or_delete_ct + results.rowcount
return update_or_delete_ct


class SnowflakeConnector(SQLConnector):
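A standalone sketch of the counting behavior (SQLite stand-in engine, assumed purely for illustration): cursor rowcount reports affected rows for UPDATE and DELETE alike, so the renamed accumulator covers both paths.

```python
from sqlalchemy import create_engine, text

engine = create_engine("sqlite://")
with engine.connect() as connection:
    connection.execute(text("CREATE TABLE employee (id INTEGER PRIMARY KEY, email TEXT)"))
    connection.execute(text("INSERT INTO employee VALUES (1, 'a@example.com')"))
    connection.execute(text("INSERT INTO employee VALUES (2, 'b@example.com')"))
    result = connection.execute(text("DELETE FROM employee WHERE id = 1"))
    print(result.rowcount)  # 1 row deleted
```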
18 changes: 18 additions & 0 deletions tests/fixtures/bigquery_fixtures.py
@@ -120,6 +120,19 @@ def bigquery_resources(
insert into customer (id, email, name, address_id)
values ({customer_id}, '{customer_email}', '{customer_name}', {address_id});
"""

connection.execute(stmt)

stmt = "select max(id) from employee;"
res = connection.execute(stmt)
employee_id = res.all()[0][0] + 1
employee_email = f"employee-{uuid}@example.com"
employee_name = f"Jane {uuid}"

stmt = f"""
insert into employee (id, email, name, address_id)
values ({employee_id}, '{employee_email}', '{employee_name}', {address_id});
"""
connection.execute(stmt)

yield {
@@ -131,6 +144,8 @@ def bigquery_resources(
"city": city,
"state": state,
"connector": connector,
"employee_id": employee_id,
"employee_email": employee_email,
}
# Remove test data and close BigQuery connection in teardown
stmt = f"delete from customer where email = '{customer_email}';"
@@ -139,6 +154,9 @@ def bigquery_resources(
stmt = f"delete from address where id = {address_id};"
connection.execute(stmt)

stmt = f"delete from employee where address_id = {address_id};"
connection.execute(stmt)


@pytest.fixture(scope="session")
def bigquery_test_engine() -> Generator:
25 changes: 25 additions & 0 deletions tests/ops/graph/test_config.py
@@ -2,6 +2,7 @@

import pydantic
import pytest
from fideslang.models import MaskingStrategies

from fides.api.graph.config import *
from fides.api.graph.data_type import (
@@ -95,6 +96,7 @@ def test_from_string(self):
collection_to_serialize = ds = Collection(
name="t3",
skip_processing=False,
masking_strategy_override=None,
fields=[
ScalarField(
name="f1",
@@ -124,6 +126,7 @@ def test_from_string(self):
serialized_collection = {
"name": "t3",
"skip_processing": False,
"masking_strategy_override": None,
"fields": [
{
"name": "f1",
@@ -378,6 +381,28 @@ def test_parse_from_task_without_data_categories(self):
parsed = Collection.parse_from_request_task(serialized_collection)
assert parsed.data_categories == set()

def test_collection_masking_strategy_override(self):
ds = Collection(
name="t3",
masking_strategy_override=MaskingStrategyOverride(
strategy=MaskingStrategies.DELETE
),
fields=[],
)

assert ds.masking_strategy_override == MaskingStrategyOverride(
strategy=MaskingStrategies.DELETE
)

serialized_collection_with_masking_override = {
"name": "t3",
"masking_strategy_override": {"strategy": "delete"},
"fields": [],
}

coll = ds.parse_from_request_task(serialized_collection_with_masking_override)
assert coll == ds


class TestField:
def test_generate_field(self) -> None:
37 changes: 37 additions & 0 deletions tests/ops/graph/test_graph_traversal.py
@@ -1,6 +1,9 @@
import pytest
from fideslang import Dataset
from fideslang.models import MaskingStrategies

from fides.api.graph.graph import *
from fides.api.models.datasetconfig import convert_dataset_to_graph

from .graph_test_util import *

@@ -31,6 +34,40 @@ def test_graph_creation() -> None:
assert graph.identity_keys == {FieldAddress("dr_1", "ds_1", "f1"): "x"}


@pytest.mark.integration_external
@pytest.mark.integration_bigquery
def test_graph_creation_with_collection_level_meta(
example_datasets, bigquery_connection_config
):
dataset = Dataset(**example_datasets[7])
graph = convert_dataset_to_graph(dataset, bigquery_connection_config.key)
dg = DatasetGraph(*[graph])

# Assert erase_after
customer_collection = dg.nodes[
CollectionAddress("bigquery_example_test_dataset", "customer")
].collection
assert customer_collection.erase_after == {
CollectionAddress("bigquery_example_test_dataset", "address")
}

address_collection = dg.nodes[
CollectionAddress("bigquery_example_test_dataset", "address")
].collection
assert address_collection.erase_after == {
CollectionAddress("bigquery_example_test_dataset", "employee")
}
assert address_collection.masking_strategy_override is None

employee_collection = dg.nodes[
CollectionAddress("bigquery_example_test_dataset", "employee")
].collection
assert employee_collection.erase_after == set()
assert employee_collection.masking_strategy_override == MaskingStrategyOverride(
strategy=MaskingStrategies.DELETE
)


def test_extract_seed_nodes() -> None:
# TEST INIT:
t = generate_graph_resources(3)
3 changes: 2 additions & 1 deletion tests/ops/integration_tests/setup_scripts/postgres_setup.py
@@ -15,6 +15,7 @@

# Need to manually import this model because it's used in src/fides/api/models/property.py
# but that file only imports it conditionally if TYPE_CHECKING is true
from fides.api.models.experience_notices import ExperienceNotices
from fides.api.models.privacy_experience import PrivacyExperienceConfig
from fides.api.service.connectors.sql_connector import PostgreSQLConnector
from fides.config import CONFIG
@@ -29,7 +30,7 @@ def seed_postgres_data(db: Session, query_file_path: str) -> Session:
that contains the query to seed the data in the DB. e.g.,
`./docker/sample_data/postgres_example.sql`

-    Using the provided sesion, creates the database, dropping it if it
+    Using the provided session, creates the database, dropping it if it
already existed. Seeds the created database using the query found
in the relative path provided. Some processing is done on the query
text so that it can be executed properly.