Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(java-sdk): add utils classes to give equivalence with python uti… #12002

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion metadata-integration/java/datahub-client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ dependencies {
api project(':entity-registry')
api project(':metadata-integration:java:datahub-event')
implementation project(':metadata-integration:java:datahub-schematron:lib')

implementation(externalDependency.kafkaAvroSerializer) {
exclude group: "org.apache.avro"
}
Expand Down Expand Up @@ -60,10 +61,35 @@ task copyAvroSchemas {
compileJava.dependsOn copyAvroSchemas


// Verifies that the Python virtual environment referenced by the
// `python.venv.path` system property (default: ../../../metadata-ingestion/venv)
// contains a runnable interpreter, and fails the build with a readable message
// when it does not.
task validatePythonEnv {
  // NOTE(review): the venv is presumably created by :metadata-ingestion:installDev.
  // `test` depends on both tasks but that alone does not order them, so make sure
  // this check never runs before the install step when both are scheduled.
  mustRunAfter ':metadata-ingestion:installDev'

  doFirst {
    def venvPath = System.getProperty('python.venv.path', '../../../metadata-ingestion/venv')
    def isWindows = System.getProperty('os.name').toLowerCase().contains('windows')
    // file() resolves a relative venv path against this project's directory,
    // giving exec an unambiguous absolute path to launch.
    def pythonExe = file(isWindows ? "${venvPath}/Scripts/python.exe" : "${venvPath}/bin/python")

    // ignoreExitValue only covers a process that started and exited non-zero;
    // a missing executable makes exec itself throw. Check for it explicitly so
    // the user gets the same actionable message in both cases.
    if (!pythonExe.exists()) {
      throw new GradleException("Python virtual environment not properly set up at ${venvPath}")
    }

    def result = exec {
      commandLine pythonExe.absolutePath, "-c", "import sys; print(sys.executable)"
      ignoreExitValue = true
      // Capture both streams so the probe does not clutter the build output.
      standardOutput = new ByteArrayOutputStream()
      errorOutput = new ByteArrayOutputStream()
    }

    if (result.exitValue != 0) {
      throw new GradleException("Python virtual environment not properly set up at ${venvPath}")
    }
  }
}

// Test task configuration: ordering relative to other modules, the test
// framework, the Python environment prerequisites, and coverage reporting.
test {
  // to avoid simultaneous executions of tests when complete build is run
  mustRunAfter(":metadata-io:test")
  useJUnit()
  // Python environment configuration: the venv check and the
  // metadata-ingestion installDev task must both have run before the tests.
  // NOTE(review): installDev presumably creates the venv that
  // validatePythonEnv inspects -- confirm in metadata-ingestion's build file.
  dependsOn validatePythonEnv
  dependsOn tasks.getByPath(":metadata-ingestion:installDev")
  // Forward the venv location to the test JVM, using the same default path
  // that validatePythonEnv falls back to.
  systemProperty 'python.venv.path', System.getProperty('python.venv.path', '../../../metadata-ingestion/venv')
  // Run the jacocoTestReport task once the tests have finished.
  finalizedBy jacocoTestReport
}

task checkShadowJar(type: Exec) {
Expand Down Expand Up @@ -111,7 +137,6 @@ shadowJar {
relocate 'org.checkerframework', 'datahub.shaded.org.checkerframework'
relocate 'com.google.errorprone', 'datahub.shaded.com.google.errorprone'
// Below jars added for kafka emitter only
// relocate 'org.apache.avro', 'datahub.shaded.org.apache.avro'
relocate 'com.thoughtworks.paranamer', 'datahub.shaded.com.thoughtworks.paranamer'
relocate 'org.xerial.snappy', 'datahub.shaded.org.xerial.snappy'
relocate 'org.apache.kafka', 'datahub.shaded.org.apache.kafka'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import click
from typing import Dict, Any
import json
from dataclasses import dataclass
from abc import ABC, abstractmethod
from datahub.emitter.mcp_builder import DatabaseKey, SchemaKey


class URNGenerator(ABC):
    """Interface for objects that turn a dict of parameters into a URN string."""

    @abstractmethod
    def generate(self, args: Dict[str, Any]) -> str:
        """Build the URN described by ``args``.

        Args:
            args: Parameter names mapped to their values.

        Returns:
            The generated URN as a string.
        """


class DatabaseURNGenerator(URNGenerator):
    """Generates URNs for database containers through ``DatabaseKey``."""

    def generate(self, args: Dict[str, Any]) -> str:
        """Validate ``args`` and return the URN of the database they describe.

        ``platform`` and ``database`` are mandatory and ``instance`` is
        optional; every other key is rejected.

        Args:
            args: Parameter names mapped to their values.

        Returns:
            The result of ``DatabaseKey(...).as_urn()``.

        Raises:
            ValueError: If a mandatory key is absent or an unknown key is present.
        """
        mandatory = ["platform", "database"]
        absent = [name for name in mandatory if name not in args]
        if absent:
            # Report the first missing field, in declaration order.
            raise ValueError(f"Missing required field: {absent[0]}")

        permitted = [*mandatory, "instance"]
        unexpected = [name for name in args if name not in permitted]
        if unexpected:
            # Report the first unknown field, in the order the caller gave them.
            raise ValueError(f"Invalid field: {unexpected[0]}")

        return DatabaseKey(
            platform=args["platform"],
            instance=args.get("instance"),
            database=args["database"],
        ).as_urn()


class SchemaURNGenerator(URNGenerator):
    """Generates URNs for schema containers through ``SchemaKey``."""

    def generate(self, args: Dict[str, Any]) -> str:
        """Validate ``args`` and return the URN of the schema they describe.

        ``platform``, ``database`` and ``schema`` are mandatory; ``instance``
        and ``env`` are optional; every other key is rejected.

        Args:
            args: Parameter names mapped to their values.

        Returns:
            The result of ``SchemaKey(...).as_urn()``.

        Raises:
            ValueError: If a mandatory key is absent or an unknown key is present.
        """
        mandatory = ["platform", "database", "schema"]
        permitted = [*mandatory, "instance", "env"]

        absent = [name for name in mandatory if name not in args]
        if absent:
            # Report the first missing field, in declaration order.
            raise ValueError(f"Missing required field: {absent[0]}")

        unexpected = [name for name in args if name not in permitted]
        if unexpected:
            # Report the first unknown field, in the order the caller gave them.
            raise ValueError(f"Invalid field: {unexpected[0]}")

        return SchemaKey(
            platform=args["platform"],
            instance=args.get("instance"),
            env=args.get("env"),
            database=args["database"],
            schema=args["schema"],
        ).as_urn()


# Registry mapping each --container-type choice to the generator instance that
# builds URNs for it; generate_urn looks generators up here by name.
URN_GENERATORS: Dict[str, URNGenerator] = {
    "database": DatabaseURNGenerator(),
    "schema": SchemaURNGenerator(),
}


def validate_key_value(ctx, param, value) -> Dict[str, str]:
    """Click option callback that parses repeated ``key=value`` items into a dict.

    Args:
        ctx: Click context passed by the callback protocol (unused).
        param: Click parameter being processed (unused).
        value: Iterable of raw ``key=value`` strings, one per option occurrence.

    Returns:
        A dict mapping each stripped key to its stripped value. Only the first
        ``=`` separates key from value, so values may themselves contain ``=``.
        A key given more than once keeps its last value. An empty dict is
        returned when ``value`` is falsy.

    Raises:
        click.BadParameter: If an item has no ``=`` separator or an empty key.
    """
    if not value:
        return {}

    result = {}
    for item in value:
        # partition never raises, so malformed input is detected with a plain
        # check instead of catching (and implicitly chaining) a ValueError.
        key, separator, val = item.partition("=")
        key = key.strip()
        # An empty key can never name a valid field; reject it here with the
        # format hint rather than letting it surface later as "Invalid field: ".
        if not separator or not key:
            raise click.BadParameter(
                f"Invalid key-value pair: {item}. Format should be key=value"
            )
        result[key] = val.strip()
    return result


@click.command()
@click.option(
    "--container-type",
    type=click.Choice(["database", "schema"]),
    required=True,
    help="The type of container to generate a URN for",
)
@click.option(
    "--param",
    "-p",
    multiple=True,
    callback=validate_key_value,
    help="Parameters in key=value format. Can be used multiple times.",
)
@click.option(
    "--output-format",
    type=click.Choice(["text", "json"]),
    default="text",
    help="Output format for the URN",
)
def generate_urn(container_type: str, param: Dict[str, str], output_format: str):
    """Generate URNs for different types of containers.

    Example usage:
    ./container_urn_generator.py --container-type database -p platform=test-platform -p instance=DEV -p database=test-database
    """
    # The docstring above doubles as the command's --help text, so developer
    # notes live in comments instead.
    #
    # container_type selects a generator from URN_GENERATORS, param is the dict
    # produced by validate_key_value, and output_format picks between printing
    # the bare URN and a JSON document with the URN, type and parameters.
    # Failures are reported as click.UsageError.

    # Look the generator up in its own try block so that a KeyError raised while
    # building the URN is not misreported as an unknown container type.
    try:
        generator = URN_GENERATORS[container_type]
    except KeyError as e:
        raise click.UsageError(f"Unknown container type: {container_type}") from e

    # Validation problems in the supplied parameters surface as ValueError.
    try:
        urn = generator.generate(param)
    except ValueError as e:
        raise click.UsageError(str(e)) from e

    if output_format == "json":
        result = {"urn": urn, "container_type": container_type, "parameters": param}
        click.echo(json.dumps(result, indent=2))
    else:
        click.echo(urn)


# Invoke the click command only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    generate_urn()
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.datahubproject.models.util;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.linkedin.common.urn.Urn;
import java.util.HashMap;
import java.util.Map;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.SuperBuilder;

/**
 * Base key for container entities. Holds the platform and optional instance and exposes the key
 * as a container URN built from the GUID that {@link #guid()} computes over {@link #guidDict()}.
 */
@Data
@SuperBuilder
@EqualsAndHashCode(callSuper = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public abstract class ContainerKey extends DataHubKey {
  private static final String URN_PREFIX = "urn:li:container:";
  private static final String URN_ENTITY = "container";
  private static final String PLATFORM_MAP_FIELD = "platform";
  private static final String INSTANCE_MAP_FIELD = "instance";

  private String platform;
  private String instance;

  /**
   * Collects the non-null fields of this key.
   *
   * @return a new mutable map containing {@code platform} and {@code instance} when they are set
   */
  @Override
  public Map<String, String> guidDict() {
    Map<String, String> entries = new HashMap<>();
    if (platform != null) {
      entries.put(PLATFORM_MAP_FIELD, platform);
    }
    if (instance != null) {
      entries.put(INSTANCE_MAP_FIELD, instance);
    }
    return entries;
  }

  /**
   * @return the container URN of this key in string form, i.e. {@code urn:li:container:<guid>}
   */
  public String asUrnString() {
    return URN_PREFIX + guid();
  }

  /**
   * @return the container URN of this key, created from the {@code container} entity type and
   *     this key's GUID
   */
  public Urn asUrn() {
    return Urn.createFromTuple(URN_ENTITY, guid());
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.datahubproject.models.util;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.security.MessageDigest;
import java.util.Map;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/** Computes DataHub GUIDs: the MD5 hex digest of a key map serialized as key-sorted JSON. */
@Slf4j
public class DataHubGuidGenerator {
  // Configured once at construction: sorting map entries by key makes the serialized form
  // independent of the caller's map ordering. Reconfiguring a shared mapper on every call
  // (as was done previously) mutates shared state for no benefit.
  private static final ObjectMapper objectMapper =
      new ObjectMapper()
          .configure(
              com.fasterxml.jackson.databind.SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);

  /**
   * Generates the GUID for a key map.
   *
   * <p>Checked exceptions from JSON serialization and from the MD5 algorithm lookup are rethrown
   * unchanged via {@code @SneakyThrows}.
   *
   * @param obj the key fields to hash
   * @return the lowercase, zero-padded hexadecimal MD5 digest of the key-sorted JSON form of
   *     {@code obj}
   */
  @SneakyThrows
  public static String dataHubGuid(Map<String, String> obj) {
    // Convert map to JSON string with sorted keys
    String jsonKey = objectMapper.writeValueAsString(obj);

    // Hash the UTF-8 bytes explicitly; the no-arg getBytes() uses the platform default charset,
    // which would make the GUID of non-ASCII keys depend on the host configuration.
    MessageDigest md = MessageDigest.getInstance("MD5");
    byte[] hashBytes = md.digest(jsonKey.getBytes(java.nio.charset.StandardCharsets.UTF_8));

    // Convert byte array to hexadecimal string, two digits per byte
    StringBuilder hexString = new StringBuilder(hashBytes.length * 2);
    for (byte hashByte : hashBytes) {
      String hex = Integer.toHexString(0xff & hashByte);
      if (hex.length() == 1) {
        hexString.append('0');
      }
      hexString.append(hex);
    }

    String guid = hexString.toString();
    log.debug("DataHub Guid for {} is : {}", jsonKey, guid);
    return guid;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.datahubproject.models.util;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
import lombok.Data;
import lombok.experimental.SuperBuilder;

/**
 * Base class for keys that can be reduced to a GUID. The key is flattened to a string map by
 * {@link #guidDict()} and that map is hashed by {@link DataHubGuidGenerator}.
 */
@Data
@SuperBuilder
@JsonInclude(JsonInclude.Include.NON_NULL)
public abstract class DataHubKey {
  // One mapper shared by every key instead of building a new one per conversion;
  // null-valued properties are left out of anything it serializes.
  protected static final ObjectMapper MAPPER =
      new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);

  // Target type for convertValue; constant, so created once.
  private static final TypeReference<Map<String, String>> MAP_TYPE_REFERENCE =
      new TypeReference<Map<String, String>>() {};

  /**
   * Flattens this key into a map by converting it with the shared mapper.
   *
   * @return this key's properties as a string-to-string map
   */
  public Map<String, String> guidDict() {
    return MAPPER.convertValue(this, MAP_TYPE_REFERENCE);
  }

  /**
   * @return the GUID computed by {@link DataHubGuidGenerator#dataHubGuid(Map)} over {@link
   *     #guidDict()}
   */
  public String guid() {
    return DataHubGuidGenerator.dataHubGuid(guidDict());
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.datahubproject.models.util;

import com.fasterxml.jackson.annotation.JsonInclude;
import java.util.Map;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.SuperBuilder;

/** Container key for a database: the {@link ContainerKey} fields plus the database name. */
@Data
@SuperBuilder
@EqualsAndHashCode(callSuper = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public class DatabaseKey extends ContainerKey {
  private static final String DATABASE_MAP_FIELD = "database";

  private String database;

  /**
   * Extends the parent's map with this key's database.
   *
   * @return the map produced by {@link ContainerKey#guidDict()}, with a {@code database} entry
   *     added when the database is set
   */
  @Override
  public Map<String, String> guidDict() {
    Map<String, String> entries = super.guidDict();
    if (database == null) {
      // Nothing to add; the parent's entries are the whole key.
      return entries;
    }
    entries.put(DATABASE_MAP_FIELD, database);
    return entries;
  }
}
Loading
Loading