Skip to content

Commit

Permalink
feat(java-sdk): add utils classes to give equivalence with python uti… (
Browse files Browse the repository at this point in the history
  • Loading branch information
shirshanka authored and sleeperdeep committed Dec 17, 2024
1 parent 2816062 commit 2b0a059
Show file tree
Hide file tree
Showing 13 changed files with 782 additions and 1 deletion.
27 changes: 26 additions & 1 deletion metadata-integration/java/datahub-client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ dependencies {
api project(':entity-registry')
api project(':metadata-integration:java:datahub-event')
implementation project(':metadata-integration:java:datahub-schematron:lib')

implementation(externalDependency.kafkaAvroSerializer) {
exclude group: "org.apache.avro"
}
Expand Down Expand Up @@ -60,10 +61,35 @@ task copyAvroSchemas {
compileJava.dependsOn copyAvroSchemas


// Add Python environment validation task
task validatePythonEnv {
  // The test task depends on both this task and :metadata-ingestion:installDev.
  // Without an explicit ordering rule Gradle is free to run this check first.
  // NOTE(review): assumes installDev is the task that creates the venv - confirm.
  mustRunAfter(":metadata-ingestion:installDev")

  doFirst {
    // Location of the virtualenv; overridable with -Dpython.venv.path=...
    def venvPath = System.getProperty('python.venv.path', '../../../metadata-ingestion/venv')
    // Windows venvs keep the interpreter under Scripts/, other platforms under bin/.
    def isWindows = System.getProperty('os.name').toLowerCase().contains('windows')
    def pythonExe = isWindows ? "${venvPath}/Scripts/python.exe" : "${venvPath}/bin/python"

    // Run a trivial script with the venv interpreter; output is captured and discarded,
    // only the exit code is inspected below.
    def result = exec {
      commandLine pythonExe, "-c", "import sys; print(sys.executable)"
      ignoreExitValue = true
      standardOutput = new ByteArrayOutputStream()
      errorOutput = new ByteArrayOutputStream()
    }

    if (result.exitValue != 0) {
      throw new GradleException("Python virtual environment not properly set up at ${venvPath}")
    }
  }
}

test {
  // Keep these tests from running at the same time as :metadata-io:test in a full build.
  mustRunAfter(":metadata-io:test")
  useJUnit()

  // Python environment: validate the venv and make sure the dev install has run.
  dependsOn(validatePythonEnv, tasks.getByPath(":metadata-ingestion:installDev"))
  // Forward the venv location to the test JVM as a system property.
  systemProperty(
      'python.venv.path',
      System.getProperty('python.venv.path', '../../../metadata-ingestion/venv'))

  finalizedBy(jacocoTestReport)
}

task checkShadowJar(type: Exec) {
Expand Down Expand Up @@ -111,7 +137,6 @@ shadowJar {
relocate 'org.checkerframework', 'datahub.shaded.org.checkerframework'
relocate 'com.google.errorprone', 'datahub.shaded.com.google.errorprone'
// Below jars added for kafka emitter only
// relocate 'org.apache.avro', 'datahub.shaded.org.apache.avro'
relocate 'com.thoughtworks.paranamer', 'datahub.shaded.com.thoughtworks.paranamer'
relocate 'org.xerial.snappy', 'datahub.shaded.org.xerial.snappy'
relocate 'org.apache.kafka', 'datahub.shaded.org.apache.kafka'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import click
from typing import Dict, Any
import json
from dataclasses import dataclass
from abc import ABC, abstractmethod
from datahub.emitter.mcp_builder import DatabaseKey, SchemaKey


class URNGenerator(ABC):
    """Abstract base for objects that turn a dict of arguments into a URN string."""

    @abstractmethod
    def generate(self, args: Dict[str, Any]) -> str:
        """Build and return a URN from ``args``.

        Args:
            args: Mapping of field name to value; the accepted keys are defined
                by each concrete subclass.

        Returns:
            The generated URN as a string.
        """
        pass


class DatabaseURNGenerator(URNGenerator):
    """Generates a URN for a database container via ``DatabaseKey``."""

    def generate(self, args: Dict[str, Any]) -> str:
        """Validate ``args`` and return the URN of the matching ``DatabaseKey``.

        Args:
            args: Must contain ``platform`` and ``database``; may contain
                ``instance``. No other keys are accepted.

        Returns:
            The value of ``DatabaseKey(...).as_urn()``.

        Raises:
            ValueError: If a required field is missing or an unknown field is
                present.
        """
        required_fields = ["platform", "database"]
        all_fields = required_fields + ["instance"]

        # Missing required fields are reported before unknown ones.
        for field in required_fields:
            if field not in args:
                raise ValueError(f"Missing required field: {field}")

        for arg in args:
            if arg not in all_fields:
                raise ValueError(f"Invalid field: {arg}")

        database_key = DatabaseKey(
            platform=args["platform"],
            instance=args.get("instance"),  # optional; None when not supplied
            database=args["database"],
        )
        return database_key.as_urn()


class SchemaURNGenerator(URNGenerator):
    """Generates a URN for a schema container via ``SchemaKey``."""

    def generate(self, args: Dict[str, Any]) -> str:
        """Validate ``args`` and return the URN of the matching ``SchemaKey``.

        Args:
            args: Must contain ``platform``, ``database`` and ``schema``; may
                contain ``instance`` and ``env``. No other keys are accepted.

        Returns:
            The value of ``SchemaKey(...).as_urn()``.

        Raises:
            ValueError: If a required field is missing or an unknown field is
                present.
        """
        required_fields = ["platform", "database", "schema"]
        all_fields = required_fields + ["instance", "env"]

        # Missing required fields are reported before unknown ones.
        for field in required_fields:
            if field not in args:
                raise ValueError(f"Missing required field: {field}")

        for arg in args:
            if arg not in all_fields:
                raise ValueError(f"Invalid field: {arg}")

        schema_key = SchemaKey(
            platform=args["platform"],
            instance=args.get("instance"),  # optional; None when not supplied
            env=args.get("env"),  # optional; None when not supplied
            database=args["database"],
            schema=args["schema"],
        )
        return schema_key.as_urn()


# Registry mapping each supported --container-type value to its generator instance.
URN_GENERATORS: Dict[str, URNGenerator] = dict(
    database=DatabaseURNGenerator(),
    schema=SchemaURNGenerator(),
)


def validate_key_value(ctx, param, value):
    """Click callback that turns repeated ``key=value`` strings into a dict.

    Args:
        ctx: Click context (unused).
        param: Click parameter object (unused).
        value: Iterable of ``key=value`` strings, or a falsy value when the
            option was not given.

    Returns:
        A dict mapping each stripped key to its stripped value; empty when
        ``value`` is falsy. Only the first ``=`` separates key from value, so
        values may themselves contain ``=``. Later duplicates overwrite
        earlier ones.

    Raises:
        click.BadParameter: If an item contains no ``=``.
    """
    if not value:
        return {}

    result = {}
    for item in value:
        # partition() yields an empty separator when "=" is absent, which lets
        # us reject the item without using an exception for control flow.
        key, sep, val = item.partition("=")
        if not sep:
            raise click.BadParameter(
                f"Invalid key-value pair: {item}. Format should be key=value"
            )
        result[key.strip()] = val.strip()
    return result


@click.command()
@click.option(
    "--container-type",
    type=click.Choice(["database", "schema"]),
    required=True,
    help="The type of container to generate a URN for",
)
@click.option(
    "--param",
    "-p",
    multiple=True,
    callback=validate_key_value,
    help="Parameters in key=value format. Can be used multiple times.",
)
@click.option(
    "--output-format",
    type=click.Choice(["text", "json"]),
    default="text",
    help="Output format for the URN",
)
def generate_urn(container_type: str, param: Dict[str, str], output_format: str):
    """Generate URNs for different types of containers.
    Example usage:
    ./container_urn_generator.py --container-type database -p platform=test-platform -p instance=DEV -p database=test-database
    """
    # Look the generator up on its own so that a KeyError raised from inside
    # generate() is not misreported as an unknown container type.
    generator = URN_GENERATORS.get(container_type)
    if generator is None:
        raise click.UsageError(f"Unknown container type: {container_type}")

    try:
        urn = generator.generate(param)
    except ValueError as e:
        # Validation failures from the generator become CLI usage errors;
        # chain the cause so the original traceback is preserved.
        raise click.UsageError(str(e)) from e

    if output_format == "json":
        result = {"urn": urn, "container_type": container_type, "parameters": param}
        click.echo(json.dumps(result, indent=2))
    else:
        click.echo(urn)


if __name__ == "__main__":
    # Run the click command when this file is executed as a script.
    generate_urn()
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.datahubproject.models.util;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.linkedin.common.urn.Urn;
import java.util.HashMap;
import java.util.Map;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.SuperBuilder;

/**
 * Base key for container entities, identified by a platform and an optional instance.
 *
 * <p>The container URN is formed from the GUID that {@link #guid()} computes over {@link
 * #guidDict()}.
 */
@Data
@SuperBuilder
@EqualsAndHashCode(callSuper = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public abstract class ContainerKey extends DataHubKey {
  private static final String URN_PREFIX = "urn:li:container:";
  private static final String URN_ENTITY = "container";
  private static final String PLATFORM_MAP_FIELD = "platform";
  private static final String INSTANCE_MAP_FIELD = "instance";

  private String platform;
  private String instance;

  /**
   * Returns a fresh, mutable map holding the non-null {@code platform} and {@code instance}
   * values.
   *
   * @return the fields that contribute to this key's GUID
   */
  @Override
  public Map<String, String> guidDict() {
    final Map<String, String> fields = new HashMap<>();
    if (platform != null) {
      fields.put(PLATFORM_MAP_FIELD, platform);
    }
    if (instance != null) {
      fields.put(INSTANCE_MAP_FIELD, instance);
    }
    return fields;
  }

  /**
   * Returns the container URN in string form.
   *
   * @return {@code "urn:li:container:"} followed by this key's GUID
   */
  public String asUrnString() {
    return URN_PREFIX + guid();
  }

  /**
   * Returns the container URN as a {@link Urn}.
   *
   * @return a URN of entity type {@code container} whose single key part is this key's GUID
   */
  public Urn asUrn() {
    return Urn.createFromTuple(URN_ENTITY, guid());
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.datahubproject.models.util;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.security.MessageDigest;
import java.util.Map;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/** Computes DataHub GUIDs: the hex MD5 digest of a map serialized as key-sorted JSON. */
@Slf4j
public class DataHubGuidGenerator {
  // Configured exactly once at class initialisation. Reconfiguring a shared ObjectMapper on
  // every call is unnecessary and is not safe once the mapper is in use by other threads.
  private static final ObjectMapper objectMapper =
      new ObjectMapper()
          .configure(
              com.fasterxml.jackson.databind.SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);

  /**
   * Generates the GUID for the given field map.
   *
   * @param obj the fields to hash; entries are serialized to JSON ordered by key
   * @return the lower-case, 32-character hexadecimal MD5 digest of the UTF-8 encoded JSON
   */
  @SneakyThrows
  public static String dataHubGuid(Map<String, String> obj) {
    // Serialize with sorted keys so equal maps always produce the same JSON text.
    String jsonKey = objectMapper.writeValueAsString(obj);

    // Hash the UTF-8 bytes explicitly; the no-arg getBytes() uses the platform default
    // charset, which would make the GUID machine-dependent for non-ASCII values.
    MessageDigest md = MessageDigest.getInstance("MD5");
    byte[] hashBytes = md.digest(jsonKey.getBytes(java.nio.charset.StandardCharsets.UTF_8));

    // Two lower-case hex digits per byte, high nibble first.
    StringBuilder hexString = new StringBuilder(hashBytes.length * 2);
    for (byte hashByte : hashBytes) {
      hexString.append(Character.forDigit((hashByte >> 4) & 0xf, 16));
      hexString.append(Character.forDigit(hashByte & 0xf, 16));
    }

    if (log.isDebugEnabled()) {
      log.debug("DataHub Guid for {} is : {}", jsonKey, hexString);
    }
    return hexString.toString();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.datahubproject.models.util;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
import lombok.Data;
import lombok.experimental.SuperBuilder;

/**
 * Base class for keys whose identity is a GUID derived from their fields.
 *
 * <p>Subclasses may override {@link #guidDict()} to control which fields take part in the GUID.
 */
@Data
@SuperBuilder
@JsonInclude(JsonInclude.Include.NON_NULL)
public abstract class DataHubKey {
  // Shared mapper, set up once to leave out null-valued properties.
  protected static final ObjectMapper MAPPER =
      new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);

  // Target type for converting a key object into a String-to-String map.
  private static final TypeReference<Map<String, String>> MAP_TYPE_REFERENCE =
      new TypeReference<Map<String, String>>() {};

  /**
   * Converts this key into a map of field name to value using the shared mapper.
   *
   * @return the fields that contribute to this key's GUID
   */
  public Map<String, String> guidDict() {
    return MAPPER.convertValue(this, MAP_TYPE_REFERENCE);
  }

  /**
   * Computes this key's GUID from {@link #guidDict()}.
   *
   * @return the value returned by {@link DataHubGuidGenerator#dataHubGuid(Map)} for this key
   */
  public String guid() {
    return DataHubGuidGenerator.dataHubGuid(guidDict());
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.datahubproject.models.util;

import com.fasterxml.jackson.annotation.JsonInclude;
import java.util.Map;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.SuperBuilder;

/** Container key for a database: the parent's platform/instance plus a database name. */
@Data
@SuperBuilder
@EqualsAndHashCode(callSuper = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public class DatabaseKey extends ContainerKey {
  private static final String DATABASE_MAP_FIELD = "database";

  private String database;

  /**
   * Returns the parent's GUID fields, extended with {@code database} when it is set.
   *
   * @return the fields that contribute to this key's GUID
   */
  @Override
  public Map<String, String> guidDict() {
    final Map<String, String> fields = super.guidDict();
    if (database == null) {
      // Nothing to add; the parent's entries are the whole key.
      return fields;
    }
    fields.put(DATABASE_MAP_FIELD, database);
    return fields;
  }
}
Loading

0 comments on commit 2b0a059

Please sign in to comment.