Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/z fabric mirroring #86

Draft
wants to merge 6 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions CluedIn.Connector.AzureDataLake.sln
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,13 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Connector.DataLake.Common",
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Connector.OneLake", "src\Connector.OneLake\Connector.OneLake.csproj", "{1AA8B845-9762-47DD-B0E4-3B2C20C7486A}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Connector.AzureDatabricks", "src\Connector.AzureDatabricks\Connector.AzureDatabricks.csproj", "{B0A8FAB9-8809-492F-A2B1-C141CE53A724}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Connector.AzureDatabricks", "src\Connector.AzureDatabricks\Connector.AzureDatabricks.csproj", "{B0A8FAB9-8809-492F-A2B1-C141CE53A724}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Connector.SynapseDataEngineering", "src\Connector.SynapseDataEngineering\Connector.SynapseDataEngineering.csproj", "{61607CFA-7A18-4DD6-9512-83B620B288EB}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Connector.SynapseDataEngineering", "src\Connector.SynapseDataEngineering\Connector.SynapseDataEngineering.csproj", "{61607CFA-7A18-4DD6-9512-83B620B288EB}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Connector.AzureAIStudio", "src\Connector.AzureAIStudio\Connector.AzureAIStudio.csproj", "{5F8EDA0E-5F95-4A7A-A7F3-217210674FDE}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Connector.AzureAIStudio", "src\Connector.AzureAIStudio\Connector.AzureAIStudio.csproj", "{5F8EDA0E-5F95-4A7A-A7F3-217210674FDE}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Connector.FabricMirroring", "src\Connector.FabricMirroring\Connector.FabricMirroring.csproj", "{B6B64EFA-397F-467F-984E-B7FE73ED92A4}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down Expand Up @@ -97,6 +99,10 @@ Global
{5F8EDA0E-5F95-4A7A-A7F3-217210674FDE}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5F8EDA0E-5F95-4A7A-A7F3-217210674FDE}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5F8EDA0E-5F95-4A7A-A7F3-217210674FDE}.Release|Any CPU.Build.0 = Release|Any CPU
{B6B64EFA-397F-467F-984E-B7FE73ED92A4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B6B64EFA-397F-467F-984E-B7FE73ED92A4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B6B64EFA-397F-467F-984E-B7FE73ED92A4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B6B64EFA-397F-467F-984E-B7FE73ED92A4}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -116,6 +122,7 @@ Global
{B0A8FAB9-8809-492F-A2B1-C141CE53A724} = {5256D9B9-8A1D-480D-A8F0-1A69AFA59B31}
{61607CFA-7A18-4DD6-9512-83B620B288EB} = {5256D9B9-8A1D-480D-A8F0-1A69AFA59B31}
{5F8EDA0E-5F95-4A7A-A7F3-217210674FDE} = {5256D9B9-8A1D-480D-A8F0-1A69AFA59B31}
{B6B64EFA-397F-467F-984E-B7FE73ED92A4} = {5256D9B9-8A1D-480D-A8F0-1A69AFA59B31}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {E6A866CB-824C-4271-8EA6-053B7FC4B134}
Expand Down
1 change: 1 addition & 0 deletions Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
<PackageReference Update="CsvHelper" Version="15.0.0" />
<PackageReference Update="xunit" Version="2.4.1" />
<PackageReference Update="xunit.runner.visualstudio" Version="2.4.1" />
<PackageReference Update="Microsoft.Fabric.Api" Version="1.0.0-beta.16" />
<PackageReference Update="Moq" Version="4.13.1" />
<PackageReference Update="NCrontab" Version="3.3.3" />
<PackageReference Update="Parquet.Net" Version="4.23.5" />
Expand Down
88 changes: 66 additions & 22 deletions src/Connector.DataLake.Common/Connector/DataLakeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ namespace CluedIn.Connector.DataLake.Common.Connector
{
public abstract class DataLakeClient : IDataLakeClient
{
public async Task<DataLakeDirectoryClient> EnsureDataLakeDirectoryExist(IDataLakeJobData configuration)
public Task<DataLakeDirectoryClient> EnsureDataLakeDirectoryExist(IDataLakeJobData configuration)
{
var fileSystemClient = await GetFileSystemClientAsync(configuration);
var directory = GetDirectory(configuration);
var directoryClient = fileSystemClient.GetDirectoryClient(directory);
if (!await directoryClient.ExistsAsync())
{
directoryClient = await fileSystemClient.CreateDirectoryAsync(directory);
}
return EnsureDataLakeDirectoryExist(configuration, string.Empty);
}

public async Task<DataLakeDirectoryClient> EnsureDataLakeDirectoryExist(IDataLakeJobData configuration, string subDirectory)
{
var fileSystemClient = await GetFileSystemClientAsync(configuration, ensureExists: true);
var directoryClient = await GetDirectoryClientAsync(configuration, fileSystemClient, subDirectory, ensureExists: true);

return directoryClient;
}
Expand Down Expand Up @@ -68,19 +68,20 @@ protected static TJobData CastJobData<TJobData>(IDataLakeJobData jobData) where
return castedJobData;
}

public async Task<bool> FileInPathExists(IDataLakeJobData configuration, string fileName)
public Task<bool> FileInPathExists(IDataLakeJobData configuration, string fileName)
{
var serviceClient = GetDataLakeServiceClient(configuration);
var fileSystemName = GetFileSystemName(configuration);
var fileSystemClient = serviceClient.GetFileSystemClient(fileSystemName);
return FileInPathExists(configuration, fileName, string.Empty);
}

public async Task<bool> FileInPathExists(IDataLakeJobData configuration, string fileName, string subDirectory)
{
var fileSystemClient = await GetFileSystemClientAsync(configuration, ensureExists: false);
if (!await fileSystemClient.ExistsAsync())
{
return false;
}

var directory = GetDirectory(configuration);
var directoryClient = fileSystemClient.GetDirectoryClient(directory);
var directoryClient = await GetDirectoryClientAsync(configuration, fileSystemClient, subDirectory, ensureExists: false);
if (!await directoryClient.ExistsAsync())
{
return false;
Expand All @@ -90,19 +91,38 @@ public async Task<bool> FileInPathExists(IDataLakeJobData configuration, string
return await dataLakeFileClient.ExistsAsync();
}

public async Task<PathProperties> GetFilePathProperties(IDataLakeJobData configuration, string fileName)
public Task<bool> DirectoryExists(IDataLakeJobData configuration)
{
var serviceClient = GetDataLakeServiceClient(configuration);
var fileSystemName = GetFileSystemName(configuration);
var fileSystemClient = serviceClient.GetFileSystemClient(fileSystemName);
return DirectoryExists(configuration, string.Empty);
}

public async Task<bool> DirectoryExists(IDataLakeJobData configuration, string subDirectory)
{
var fileSystemClient = await GetFileSystemClientAsync(configuration, ensureExists: false);
if (!await fileSystemClient.ExistsAsync())
{
return false;
}

var directoryClient = await GetDirectoryClientAsync(configuration, fileSystemClient, subDirectory, ensureExists: false);
return await directoryClient.ExistsAsync();
}

public Task<PathProperties> GetFilePathProperties(IDataLakeJobData configuration, string fileName)
{
return GetFilePathProperties(configuration, fileName, string.Empty);
}

public async Task<PathProperties> GetFilePathProperties(IDataLakeJobData configuration, string fileName, string subDirectory)
{
var fileSystemClient = await GetFileSystemClientAsync(configuration, ensureExists: false);

if (!await fileSystemClient.ExistsAsync())
{
return null;
}

var directory = GetDirectory(configuration);
var directoryClient = fileSystemClient.GetDirectoryClient(directory);
var directoryClient = await GetDirectoryClientAsync(configuration, fileSystemClient, subDirectory, ensureExists: false);
if (!await directoryClient.ExistsAsync())
{
return null;
Expand All @@ -117,13 +137,37 @@ public async Task<PathProperties> GetFilePathProperties(IDataLakeJobData configu
return await dataLakeFileClient.GetPropertiesAsync();
}

private async Task<DataLakeDirectoryClient> GetDirectoryClientAsync(
IDataLakeJobData configuration,
DataLakeFileSystemClient fileSystemClient,
string subDirectory,
bool ensureExists)
{
var directory = GetDirectory(configuration);
var directoryClient = fileSystemClient.GetDirectoryClient(directory);
if (string.IsNullOrWhiteSpace(subDirectory))
{
return directoryClient;
}

directoryClient = directoryClient.GetSubDirectoryClient(subDirectory);

if (ensureExists && !await directoryClient.ExistsAsync())
{
directoryClient = await fileSystemClient.CreateDirectoryAsync(directoryClient.Path);
}

return directoryClient;
}

private async Task<DataLakeFileSystemClient> GetFileSystemClientAsync(
IDataLakeJobData configuration)
IDataLakeJobData configuration,
bool ensureExists)
{
var dataLakeServiceClient = GetDataLakeServiceClient(configuration);
var fileSystemName = GetFileSystemName(configuration);
var dataLakeFileSystemClient = dataLakeServiceClient.GetFileSystemClient(fileSystemName);
if (!await dataLakeFileSystemClient.ExistsAsync())
if (ensureExists && !await dataLakeFileSystemClient.ExistsAsync())
{
dataLakeFileSystemClient = await dataLakeServiceClient.CreateFileSystemAsync(fileSystemName);
}
Expand Down
Loading