Skip to content

Commit

Permalink
Updating SQL to update variable number of columns
Browse files Browse the repository at this point in the history
  • Loading branch information
kotkar-pallavi committed Feb 26, 2025
1 parent 24033c4 commit a7463f3
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -51,6 +50,7 @@
import static com.linkedin.metadata.dao.utils.EBeanDAOUtils.*;
import static com.linkedin.metadata.dao.utils.SQLIndexFilterUtils.*;
import static com.linkedin.metadata.dao.utils.SQLSchemaUtils.*;
import static com.linkedin.metadata.dao.utils.SQLStatementUtils.*;


/**
Expand Down Expand Up @@ -194,20 +194,21 @@ public <ASPECT_UNION extends RecordTemplate> int create(
.map(aspectCreateLamdba -> aspectCreateLamdba.getAspectClass().getCanonicalName())
.collect(Collectors.toList());

sqlUpdate = _server.createSqlUpdate(SQLStatementUtils.createAspectInsertSql(urn, classNames, isTestMode));

sqlUpdate.setParameter("urn", urn.toString())
.setParameter("lastmodifiedon", new Timestamp(timestamp).toString())
.setParameter("lastmodifiedby", actor);

// If a non-default UrnPathExtractor is provided, the user MUST specify in their schema generation scripts
// 'ALTER TABLE <table> ADD COLUMN a_urn JSON'.
if (urnExtraction) {
sqlUpdate.setParameter("a_urn", toJsonString(urn));
// Create insert statement with variable number of aspect columns
// For example: INSERT INTO <table_name> (<columns>)
StringBuilder insertIntoSql = new StringBuilder(SQL_INSERT_INTO_ASPECT_WITH_URN);
for (String className: classNames) {
insertIntoSql.append(getAspectColumnName(urn.getEntityType(), className));
// Add comma if not the last column
if (!className.equals(classNames.get(classNames.size() - 1))) {
insertIntoSql.append(", ");
}
}
insertIntoSql.append(CLOSING_BRACKET);

List<String> auditedAspects = new ArrayList<>();

// Create part of insert statement with variable number of aspect values
// For example: VALUES (<values>);
StringBuilder insertSqlValues = new StringBuilder(SQL_INSERT_ASPECT_VALUES_WITH_URN);
for (int i = 0; i < aspectValues.size(); i++) {
AuditedAspect auditedAspect = new AuditedAspect()
.setAspect(RecordUtils.toJsonString(aspectValues.get(i)))
Expand All @@ -219,11 +220,29 @@ public <ASPECT_UNION extends RecordTemplate> int create(
auditedAspect.setEmitTime(ingestionTrackingContext.getEmitTime(), SetMode.IGNORE_NULL);
auditedAspect.setEmitter(ingestionTrackingContext.getEmitter(), SetMode.IGNORE_NULL);
}
auditedAspects.add(toJsonString(auditedAspect));
insertSqlValues.append(encloseInSingleQuotes(toJsonString(auditedAspect)));
if (i != aspectValues.size() - 1) {
insertSqlValues.append(", ");
}
}

String metadata = String.join(",", auditedAspects);
return sqlUpdate.setParameter("metadata", metadata).execute();
insertSqlValues.append(CLOSING_BRACKET_WITH_SEMICOLON);
// Build the final insert statement
// For example: INSERT INTO <table_name> (<columns>) VALUES (<values>);
String insertStatement = insertIntoSql.toString() + insertSqlValues.toString();
insertStatement = String.format(insertStatement, getTableName(urn));

sqlUpdate = _server.createSqlUpdate(insertStatement);
// If a non-default UrnPathExtractor is provided, the user MUST specify in their schema generation scripts
// 'ALTER TABLE <table> ADD COLUMN a_urn JSON'.
if (urnExtraction) {
sqlUpdate.setParameter("a_urn", toJsonString(urn));
}
sqlUpdate.setParameter("urn", urn.toString())
.setParameter("lastmodifiedon", new Timestamp(timestamp).toString())
.setParameter("lastmodifiedby", actor);

return sqlUpdate.execute();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,16 @@ public class SQLStatementUtils {
"INSERT INTO %s (urn, a_urn, %s, lastmodifiedon, lastmodifiedby) VALUE (:urn, :a_urn, :metadata, :lastmodifiedon, :lastmodifiedby) "
+ "ON DUPLICATE KEY UPDATE %s = :metadata, lastmodifiedon = :lastmodifiedon, a_urn = :a_urn;";

// Used to create an aspect without upsert behavior.
// If the db tabled already contains a row with a given urn, the insert will fail
private static final String SQL_INSERT_ASPECT_WITH_URN_TEMPLATE =
"INSERT INTO %s (urn, a_urn, %s, lastmodifiedon, lastmodifiedby) VALUE (:urn, :a_urn, :metadata, :lastmodifiedon, :lastmodifiedby);";

// INSERT prefix of the sql statement for inserting into metadata_aspect table with multiple aspects which will be combined with the VALUES suffix
public static final String SQL_INSERT_INTO_ASPECT_WITH_URN = "INSERT INTO %s (urn, a_urn, lastmodifiedon, lastmodifiedby,";
// VALUES suffix of the sql statement for inserting into metadata_aspect table with multiple aspects which will be combined with the INSERT prefix
public static final String SQL_INSERT_ASPECT_VALUES_WITH_URN = "VALUES (:urn, :a_urn, :lastmodifiedon, :lastmodifiedby,";
// closing bracket for the sql statement INSERT prefix
// e.g. INSERT INTO metadata_aspect (urn, a_urn, lastmodifiedon, lastmodifiedby)
public static final String CLOSING_BRACKET = ") ";
// closing bracket with semicolon for the sql statement VALUES suffix
// e.g. VALUES (:urn, :a_urn, :lastmodifiedon, :lastmodifiedby);
public static final String CLOSING_BRACKET_WITH_SEMICOLON = ");";
// "JSON_EXTRACT(%s, '$.gma_deleted') IS NOT NULL" is used to exclude soft-deleted entity which has no lastmodifiedon.
// for details, see the known limitations on https://github.com/linkedin/datahub-gma/pull/311. Same reason for
// SQL_UPDATE_ASPECT_WITH_URN_TEMPLATE
Expand Down Expand Up @@ -139,6 +144,15 @@ private SQLStatementUtils() {
// Util class
}

/**
* Enclose a JSON string in single quotes and escape double quotes.
* @param json JSON string
* @return JSON string enclosed in single quotes
*/
public static String encloseInSingleQuotes(String json) {
return "'" + json.replace("\"", "\\\"") + "'";
}

/**
* Create entity exist SQL statement.
* @param urn entity urn
Expand Down Expand Up @@ -243,15 +257,6 @@ public static <ASPECT extends RecordTemplate> String createAspectUpsertSql(@Nonn
return String.format(urnExtraction ? SQL_UPSERT_ASPECT_WITH_URN_TEMPLATE : SQL_UPSERT_ASPECT_TEMPLATE, tableName, columnName, columnName);
}

public static String createAspectInsertSql(@Nonnull Urn urn,
@Nonnull List<String> aspectClassList, boolean isTestMode) {
final String tableName = isTestMode ? getTestTableName(urn) : getTableName(urn);
List<String> aspectColumns = new ArrayList<>();
aspectClassList.forEach(aspectClassName -> aspectColumns.add(getAspectColumnName(urn.getEntityType(), aspectClassName)));
String columnList = String.join(", ", aspectColumns);
return String.format(SQL_INSERT_ASPECT_WITH_URN_TEMPLATE, tableName, columnList);
}

/**
* Create Update with optimistic locking SQL statement. The SQL UPDATE use old_timestamp as a compareAndSet to check if the current update
* is made on an unchange record. For example: UPDATE table WHERE modifiedon = :oldTimestamp.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.linkedin.metadata.query.IndexSortCriterion;
import com.linkedin.metadata.query.IndexValue;
import com.linkedin.metadata.query.SortOrder;
import com.linkedin.testing.AspectBar;
import com.linkedin.testing.AspectFoo;
import com.linkedin.testing.urn.BurgerUrn;
import com.linkedin.testing.urn.FooUrn;
Expand Down Expand Up @@ -444,4 +445,21 @@ public void testCreateDuplicateAspect() {
assert (duplicateKeyException.getMessage().contains("Duplicate entry"));
}
}

@Test
public void testCreateMultipleAspect() {
FooUrn fooUrn = makeFooUrn(110);
AspectFoo aspectFoo = new AspectFoo().setValue("foo");
AspectBar aspectBar = new AspectBar().setValue("bar");
AuditStamp auditStamp = makeAuditStamp("actor", _now);
List<RecordTemplate> aspectValues = new ArrayList<>();
aspectValues.add(aspectFoo);
aspectValues.add(aspectBar);
List<BaseLocalDAO.AspectCreateLambda<? extends RecordTemplate>> aspectCreateLambdas = new ArrayList<>();
aspectCreateLambdas.add(new BaseLocalDAO.AspectCreateLambda(aspectFoo));
aspectCreateLambdas.add(new BaseLocalDAO.AspectCreateLambda(aspectBar));
int numRowsCreated = _ebeanLocalAccessFoo.create(fooUrn, aspectValues, aspectCreateLambdas, auditStamp, null, false);
// Assert that 1 record is created for asset with FooUrn
assertEquals(numRowsCreated, 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,26 +53,6 @@ public void testCreateUpsertAspectSql() {
assertEquals(SQLStatementUtils.createAspectUpsertSql(fooUrn, AspectFoo.class, false, false), expectedSql);
}

@Test
public void testCreateInsertAspectSql() {
FooUrn fooUrn = makeFooUrn(1);
String expectedSql =
"INSERT INTO metadata_entity_foo (urn, a_urn, a_aspectfoo, lastmodifiedon, lastmodifiedby) VALUE (:urn, "
+ ":a_urn, :metadata, :lastmodifiedon, :lastmodifiedby);";
assertEquals(SQLStatementUtils.createAspectInsertSql(fooUrn, new ArrayList<>(
Collections.singletonList(AspectFoo.class.getCanonicalName())), false), expectedSql);
}

@Test
public void testCreateInsertMultipleAspectSql() {
FooUrn fooUrn = makeFooUrn(1);
String expectedSql =
"INSERT INTO metadata_entity_foo (urn, a_urn, a_aspectfoo, a_aspectbar, lastmodifiedon, lastmodifiedby) VALUE (:urn, "
+ ":a_urn, :metadata, :lastmodifiedon, :lastmodifiedby);";
ArrayList<String> classNames = new ArrayList<>(Arrays.asList(AspectFoo.class.getCanonicalName(), AspectBar.class.getCanonicalName()));
assertEquals(SQLStatementUtils.createAspectInsertSql(fooUrn, classNames, false), expectedSql);
}

@Test
public void testCreateAspectReadSql() {
FooUrn fooUrn1 = makeFooUrn(1);
Expand Down

0 comments on commit a7463f3

Please sign in to comment.