
Commit 598bc05

[#5996] feat(python-client): Using credential in Python GVFS client. (#5997)
### What changes were proposed in this pull request?

Support using credentials in the GVFS Python client for cloud storage.

### Why are the changes needed?

The Python GVFS client should be able to access cloud storage (e.g. GCS and Azure Blob Storage) with short-lived credentials vended by the Gravitino server, instead of relying only on statically configured keys.

Fix: #5996

### Does this PR introduce _any_ user-facing change?

N/A

### How was this patch tested?

New integration tests, plus local testing.
1 parent 30e21d1 commit 598bc05

11 files changed: +942 −68 lines

api/src/main/java/org/apache/gravitino/credential/ADLSTokenCredential.java

+1
@@ -74,6 +74,7 @@ public long expireTimeInMs() {
   public Map<String, String> credentialInfo() {
     return (new ImmutableMap.Builder<String, String>())
         .put(GRAVITINO_ADLS_SAS_TOKEN, sasToken)
+        .put(GRAVITINO_AZURE_STORAGE_ACCOUNT_NAME, accountName)
         .build();
   }
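The added accountName entry means an ADLS credential now carries the storage account it belongs to alongside the SAS token, which is what a Python client needs to construct a filesystem from it. A minimal sketch of that consumption path, assuming illustrative dictionary keys and a hypothetical helper name (neither is the exact Gravitino API):

# Hypothetical sketch: build an adlfs filesystem from a credential-info map such as the
# one produced by ADLSTokenCredential.credentialInfo(). The key strings and the helper
# name are illustrative assumptions, not the exact Gravitino constants.
from adlfs import AzureBlobFileSystem


def abs_filesystem_from_credential_info(credential_info: dict) -> AzureBlobFileSystem:
    # Both values are needed; before this change the account name was missing from the map.
    return AzureBlobFileSystem(
        account_name=credential_info["azure-storage-account-name"],
        sas_token=credential_info["adls-sas-token"],
    )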

clients/client-python/gravitino/client/metadata_object_credential_operations.py

+1 −1
@@ -48,7 +48,7 @@ def __init__(
         metadata_object_type = metadata_object.type().value
         metadata_object_name = metadata_object.name()
         self._request_path = (
-            f"api/metalakes/{metalake_name}objects/{metadata_object_type}/"
+            f"api/metalakes/{metalake_name}/objects/{metadata_object_type}/"
             f"{metadata_object_name}/credentials"
         )

clients/client-python/gravitino/filesystem/gvfs.py

+255 −47
Large diffs are not rendered by default.
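The bulk of the change is in gvfs.py (+255 −47): the client can now request credentials vended by the Gravitino server for a fileset and use them to back the underlying cloud filesystem, instead of relying only on statically configured keys. From the user's side the entry point is unchanged; the following usage sketch mirrors the integration tests added in this commit, with placeholder metalake, catalog, schema, and fileset names:

# Usage sketch based on the integration tests in this commit; names are placeholders.
from gravitino.filesystem import gvfs
from gravitino.filesystem.gvfs_config import GVFSConfig

fs = gvfs.GravitinoVirtualFileSystem(
    server_uri="http://localhost:8090",
    metalake_name="my_metalake",
    options={
        # Optional: refresh a vended credential after half of its lifetime (the default ratio).
        GVFSConfig.GVFS_FILESYSTEM_CREDENTIAL_EXPIRED_TIME_RATIO: 0.5,
    },
)
fs.ls("gvfs://fileset/my_catalog/my_schema/my_fileset")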

clients/client-python/gravitino/filesystem/gvfs_config.py

+8
@@ -44,3 +44,11 @@ class GVFSConfig:

     GVFS_FILESYSTEM_AZURE_ACCOUNT_NAME = "abs_account_name"
     GVFS_FILESYSTEM_AZURE_ACCOUNT_KEY = "abs_account_key"
+
+    # This configuration controls when a credential is treated as expired. For instance, if the
+    # credential fetched from the Gravitino server expires after 3600 seconds and the
+    # credential_expiration_ratio is 0.5, the credential is considered expired after 1800 seconds
+    # and the client will try to retrieve a new one.
+    GVFS_FILESYSTEM_CREDENTIAL_EXPIRED_TIME_RATIO = "credential_expiration_ratio"
+
+    # The default value of credential_expiration_ratio is 0.5.
+    DEFAULT_CREDENTIAL_EXPIRED_TIME_RATIO = 0.5
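A worked example of the arithmetic described in that comment, written as a small sketch; the helper function is illustrative and not part of GVFSConfig:

# Illustrative sketch of the expiration-ratio arithmetic; the helper name is an
# assumption, not part of the client API.
def effective_credential_lifetime(lifetime_seconds: float, expiration_ratio: float = 0.5) -> float:
    """How long a vended credential is used before the client requests a new one."""
    return lifetime_seconds * expiration_ratio


# A credential valid for 3600 seconds with the default ratio of 0.5 is refreshed after 1800 seconds.
assert effective_credential_lifetime(3600) == 1800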

clients/client-python/tests/integration/test_gvfs_with_abs.py

+2 −4
@@ -33,8 +33,6 @@
 )
 from gravitino.exceptions.base import GravitinoRuntimeException
 from gravitino.filesystem.gvfs_config import GVFSConfig
-from gravitino.filesystem.gvfs import StorageType
-

 logger = logging.getLogger(__name__)

@@ -281,7 +279,7 @@ def test_mkdir(self):
         self.assertFalse(self.fs.exists(mkdir_actual_dir))
         self.assertFalse(fs.exists(mkdir_dir))

-        self.assertFalse(self.fs.exists(f"{StorageType.ABS.value}://{new_bucket}"))
+        self.assertFalse(self.fs.exists(f"abfss://{new_bucket}"))

     def test_makedirs(self):
         mkdir_dir = self.fileset_gvfs_location + "/test_mkdir"
@@ -309,7 +307,7 @@ def test_makedirs(self):
         self.assertFalse(self.fs.exists(mkdir_actual_dir))

         self.assertFalse(fs.exists(mkdir_dir))
-        self.assertFalse(self.fs.exists(f"{StorageType.ABS.value}://{new_bucket}"))
+        self.assertFalse(self.fs.exists(f"abfss://{new_bucket}"))

     def test_ls(self):
         ls_dir = self.fileset_gvfs_location + "/test_ls"
New test file (+171 lines): TestGvfsWithCredentialABS, GVFS integration tests against Azure Blob Storage using credentials vended by the Gravitino server.

@@ -0,0 +1,171 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import logging
import os
from random import randint
import unittest


from adlfs import AzureBlobFileSystem

from gravitino import (
    gvfs,
    GravitinoClient,
    Catalog,
    Fileset,
)
from gravitino.filesystem.gvfs_config import GVFSConfig
from tests.integration.test_gvfs_with_abs import TestGvfsWithABS


logger = logging.getLogger(__name__)


def azure_abs_with_credential_is_prepared():
    return (
        os.environ.get("ABS_ACCOUNT_NAME_FOR_CREDENTIAL")
        and os.environ.get("ABS_ACCOUNT_KEY_FOR_CREDENTIAL")
        and os.environ.get("ABS_CONTAINER_NAME_FOR_CREDENTIAL")
        and os.environ.get("ABS_TENANT_ID_FOR_CREDENTIAL")
        and os.environ.get("ABS_CLIENT_ID_FOR_CREDENTIAL")
        and os.environ.get("ABS_CLIENT_SECRET_FOR_CREDENTIAL")
    )


@unittest.skipUnless(
    azure_abs_with_credential_is_prepared(),
    "Azure Blob Storage credential test is not prepared.",
)
class TestGvfsWithCredentialABS(TestGvfsWithABS):
    # Before running this test, please make sure the azure-bundle-xxx.jar has been
    # copied to the $GRAVITINO_HOME/catalogs/hadoop/libs/ directory.
    azure_abs_account_key = os.environ.get("ABS_ACCOUNT_KEY_FOR_CREDENTIAL")
    azure_abs_account_name = os.environ.get("ABS_ACCOUNT_NAME_FOR_CREDENTIAL")
    azure_abs_container_name = os.environ.get("ABS_CONTAINER_NAME_FOR_CREDENTIAL")
    azure_abs_tenant_id = os.environ.get("ABS_TENANT_ID_FOR_CREDENTIAL")
    azure_abs_client_id = os.environ.get("ABS_CLIENT_ID_FOR_CREDENTIAL")
    azure_abs_client_secret = os.environ.get("ABS_CLIENT_SECRET_FOR_CREDENTIAL")

    metalake_name: str = "TestGvfsWithCredentialABS_metalake" + str(randint(1, 10000))

    def setUp(self):
        self.options = {
            GVFSConfig.GVFS_FILESYSTEM_AZURE_ACCOUNT_NAME: self.azure_abs_account_name,
            GVFSConfig.GVFS_FILESYSTEM_AZURE_ACCOUNT_KEY: self.azure_abs_account_key,
        }

    @classmethod
    def _init_test_entities(cls):
        cls.gravitino_admin_client.create_metalake(
            name=cls.metalake_name, comment="", properties={}
        )
        cls.gravitino_client = GravitinoClient(
            uri="http://localhost:8090", metalake_name=cls.metalake_name
        )

        cls.config = {}
        cls.conf = {}
        catalog = cls.gravitino_client.create_catalog(
            name=cls.catalog_name,
            catalog_type=Catalog.Type.FILESET,
            provider=cls.catalog_provider,
            comment="",
            properties={
                "filesystem-providers": "abs",
                "azure-storage-account-name": cls.azure_abs_account_name,
                "azure-storage-account-key": cls.azure_abs_account_key,
                "azure-tenant-id": cls.azure_abs_tenant_id,
                "azure-client-id": cls.azure_abs_client_id,
                "azure-client-secret": cls.azure_abs_client_secret,
                "credential-providers": "adls-token",
            },
        )
        catalog.as_schemas().create_schema(
            schema_name=cls.schema_name, comment="", properties={}
        )

        cls.fileset_storage_location: str = (
            f"{cls.azure_abs_container_name}/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
        )
        cls.fileset_gvfs_location = (
            f"gvfs://fileset/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
        )
        catalog.as_fileset_catalog().create_fileset(
            ident=cls.fileset_ident,
            fileset_type=Fileset.Type.MANAGED,
            comment=cls.fileset_comment,
            storage_location=(
                f"abfss://{cls.azure_abs_container_name}@{cls.azure_abs_account_name}.dfs.core.windows.net/"
                f"{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
            ),
            properties=cls.fileset_properties,
        )

        cls.fs = AzureBlobFileSystem(
            account_name=cls.azure_abs_account_name,
            account_key=cls.azure_abs_account_key,
        )

    # The permissions granted by the dynamically vended token are narrower than those granted by
    # static credentials such as the account key, so these test cases must be overridden here or
    # they would fail.
    def test_mkdir(self):
        mkdir_dir = self.fileset_gvfs_location + "/test_mkdir"
        mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir"
        fs = gvfs.GravitinoVirtualFileSystem(
            server_uri="http://localhost:8090",
            metalake_name=self.metalake_name,
            options=self.options,
            **self.conf,
        )

        # it actually takes no effect.
        self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs)

        # check whether it will automatically create the bucket if 'create_parents'
        # is set to True.
        new_bucket = self.azure_abs_container_name + "2"
        mkdir_actual_dir = mkdir_actual_dir.replace(
            self.azure_abs_container_name, new_bucket
        )
        self.fs.mkdir(mkdir_actual_dir, create_parents=True)

        self.assertFalse(self.fs.exists(mkdir_actual_dir))

        self.assertTrue(self.fs.exists(f"abfss://{new_bucket}"))

    def test_makedirs(self):
        mkdir_dir = self.fileset_gvfs_location + "/test_mkdir"
        mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir"
        fs = gvfs.GravitinoVirtualFileSystem(
            server_uri="http://localhost:8090",
            metalake_name=self.metalake_name,
            options=self.options,
            **self.conf,
        )

        # it actually takes no effect.
        self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs)

        # check whether it will automatically create the bucket if 'create_parents'
        # is set to True.
        new_bucket = self.azure_abs_container_name + "1"
        new_mkdir_actual_dir = mkdir_actual_dir.replace(
            self.azure_abs_container_name, new_bucket
        )
        self.fs.makedirs(new_mkdir_actual_dir)
        self.assertFalse(self.fs.exists(mkdir_actual_dir))

clients/client-python/tests/integration/test_gvfs_with_gcs.py

+4 −5
@@ -36,7 +36,7 @@
 logger = logging.getLogger(__name__)


-def oss_is_configured():
+def gcs_is_configured():
     return all(
         [
             os.environ.get("GCS_SERVICE_ACCOUNT_JSON_PATH") is not None,
@@ -45,7 +45,7 @@ def oss_is_configured():
     )


-@unittest.skipUnless(oss_is_configured(), "GCS is not configured.")
+@unittest.skipUnless(gcs_is_configured(), "GCS is not configured.")
 class TestGvfsWithGCS(TestGvfsWithHDFS):
     # Before running this test, please set the make sure gcp-bundle-x.jar has been
     # copy to the $GRAVITINO_HOME/catalogs/hadoop/libs/ directory
@@ -254,11 +254,10 @@ def test_mkdir(self):
         new_bucket = self.bucket_name + "1"
         mkdir_dir = mkdir_dir.replace(self.bucket_name, new_bucket)
         mkdir_actual_dir = mkdir_actual_dir.replace(self.bucket_name, new_bucket)
-        fs.mkdir(mkdir_dir, create_parents=True)

+        with self.assertRaises(OSError):
+            fs.mkdir(mkdir_dir, create_parents=True)
         self.assertFalse(self.fs.exists(mkdir_actual_dir))
-        self.assertFalse(fs.exists(mkdir_dir))
-        self.assertFalse(self.fs.exists("gs://" + new_bucket))

     def test_makedirs(self):
         mkdir_dir = self.fileset_gvfs_location + "/test_mkdir"
New test file (+112 lines): TestGvfsWithGCSCredential, GVFS integration tests against GCS using credentials vended by the Gravitino server.

@@ -0,0 +1,112 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import logging
import os
from random import randint
import unittest

from gcsfs import GCSFileSystem

from gravitino import Catalog, Fileset, GravitinoClient
from gravitino.filesystem import gvfs
from tests.integration.test_gvfs_with_gcs import TestGvfsWithGCS

logger = logging.getLogger(__name__)


def gcs_with_credential_is_configured():
    return all(
        [
            os.environ.get("GCS_SERVICE_ACCOUNT_JSON_PATH_FOR_CREDENTIAL") is not None,
            os.environ.get("GCS_BUCKET_NAME_FOR_CREDENTIAL") is not None,
        ]
    )


@unittest.skipUnless(gcs_with_credential_is_configured(), "GCS is not configured.")
class TestGvfsWithGCSCredential(TestGvfsWithGCS):
    # Before running this test, please make sure the gcp-bundle-x.jar has been
    # copied to the $GRAVITINO_HOME/catalogs/hadoop/libs/ directory.
    key_file = os.environ.get("GCS_SERVICE_ACCOUNT_JSON_PATH_FOR_CREDENTIAL")
    bucket_name = os.environ.get("GCS_BUCKET_NAME_FOR_CREDENTIAL")
    metalake_name: str = "TestGvfsWithGCSCredential_metalake" + str(randint(1, 10000))

    @classmethod
    def _init_test_entities(cls):
        cls.gravitino_admin_client.create_metalake(
            name=cls.metalake_name, comment="", properties={}
        )
        cls.gravitino_client = GravitinoClient(
            uri="http://localhost:8090", metalake_name=cls.metalake_name
        )

        cls.config = {}
        cls.conf = {}
        catalog = cls.gravitino_client.create_catalog(
            name=cls.catalog_name,
            catalog_type=Catalog.Type.FILESET,
            provider=cls.catalog_provider,
            comment="",
            properties={
                "filesystem-providers": "gcs",
                "gcs-credential-file-path": cls.key_file,
                "gcs-service-account-file": cls.key_file,
                "credential-providers": "gcs-token",
            },
        )
        catalog.as_schemas().create_schema(
            schema_name=cls.schema_name, comment="", properties={}
        )

        cls.fileset_storage_location: str = (
            f"gs://{cls.bucket_name}/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
        )
        cls.fileset_gvfs_location = (
            f"gvfs://fileset/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
        )
        catalog.as_fileset_catalog().create_fileset(
            ident=cls.fileset_ident,
            fileset_type=Fileset.Type.MANAGED,
            comment=cls.fileset_comment,
            storage_location=cls.fileset_storage_location,
            properties=cls.fileset_properties,
        )

        cls.fs = GCSFileSystem(token=cls.key_file)

    def test_mkdir(self):
        mkdir_dir = self.fileset_gvfs_location + "/test_mkdir"
        mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir"
        fs = gvfs.GravitinoVirtualFileSystem(
            server_uri="http://localhost:8090",
            metalake_name=self.metalake_name,
            options=self.options,
            **self.conf,
        )

        # it actually takes no effect.
        self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs)

        # check whether it will automatically create the bucket if 'create_parents'
        # is set to True.
        new_bucket = self.bucket_name + "1"
        mkdir_dir = mkdir_dir.replace(self.bucket_name, new_bucket)
        mkdir_actual_dir = mkdir_actual_dir.replace(self.bucket_name, new_bucket)

        fs.mkdir(mkdir_dir, create_parents=True)
        self.assertFalse(self.fs.exists(mkdir_actual_dir))
