Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(Draft) Gvfs-fuse implementation and code structure layout #5738

Closed
wants to merge 29 commits into from
Closed
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
30581af
Init gvfs-fuse
diqiu50 Dec 3, 2024
18cd375
Update
diqiu50 Dec 3, 2024
b7cbeab
improve Fuse server stop
diqiu50 Dec 3, 2024
fe4ad01
Remove invalid comments
diqiu50 Dec 3, 2024
2ae9427
Retrieve gid and uid from system.
diqiu50 Dec 3, 2024
c9459aa
Add workflow for gvfs-fuse
diqiu50 Dec 3, 2024
ae1e591
Update code format
diqiu50 Dec 3, 2024
9d584d7
Fix ut
diqiu50 Dec 3, 2024
b13c594
Update test log level
diqiu50 Dec 3, 2024
5156f9d
Fix error log
diqiu50 Dec 3, 2024
36282f2
Remove log_filesystem.rs
diqiu50 Dec 4, 2024
cd11e47
Optimize error handlling
diqiu50 Dec 4, 2024
4f6e980
Update interfaces
diqiu50 Dec 5, 2024
5eba1f0
Change the interface to async
diqiu50 Dec 5, 2024
bf2b1d2
Use the static dispatch for fs interface
diqiu50 Dec 6, 2024
a5bb26d
Merge branch 'branch-gvfs-fuse-dev' into gvfs-2
diqiu50 Dec 6, 2024
7d08dae
refact reade and write interfaces
diqiu50 Dec 8, 2024
a2b7ad4
Update reader and writer error handling
diqiu50 Dec 9, 2024
8f76564
Make Reader Writer interface async
diqiu50 Dec 9, 2024
a9db9a6
fix error in the SimpleFilesystem
diqiu50 Dec 10, 2024
3b2fb12
Update rust toolchains
diqiu50 Dec 10, 2024
9abd45d
Add close file
diqiu50 Dec 10, 2024
0631316
Add fuse debug log
diqiu50 Dec 4, 2024
a48ba1a
Fix ci failed
diqiu50 Dec 10, 2024
bebcd73
Temp
diqiu50 Dec 10, 2024
f6b0ceb
Merge branch 'gvfs-3' into gvfs-2
diqiu50 Dec 13, 2024
b0734f1
Support config
diqiu50 Dec 16, 2024
7640a8f
set rust build directory to target
diqiu50 Dec 16, 2024
8ac961b
Update
diqiu50 Dec 17, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions .github/workflows/gvfs-fuse-build-test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
name: Build gvfs-fuse and testing

# Controls when the workflow will run
on:
push:
branches: [ "main", "branch-*" ]
pull_request:
branches: [ "main", "branch-*" ]
workflow_dispatch:

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
changes:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: dorny/paths-filter@v2
id: filter
with:
filters: |
source_changes:
- .github/**
- api/**
- bin/**
- catalogs/**
- clients/filesystem-fuse/**
- common/**
- conf/**
- core/**
- dev/**
- gradle/**
- meta/**
- scripts/**
- server/**
- server-common/**
- build.gradle.kts
- gradle.properties
- gradlew
- setting.gradle.kts
outputs:
source_changes: ${{ steps.filter.outputs.source_changes }}

# Build for AMD64 architecture
Gvfs-Build:
needs: changes
if: needs.changes.outputs.source_changes == 'true'
runs-on: ubuntu-latest
timeout-minutes: 60
strategy:
matrix:
architecture: [linux/amd64]
java-version: [ 17 ]
env:
PLATFORM: ${{ matrix.architecture }}
steps:
- uses: actions/checkout@v3

- uses: actions/setup-java@v4
with:
java-version: ${{ matrix.java-version }}
distribution: 'temurin'
cache: 'gradle'

- name: Set up QEMU
uses: docker/setup-qemu-action@v2

- name: Check required command
run: |
dev/ci/check_commands.sh

- name: Build and test Gravitino
run: |
./gradlew :clients:filesystem-fuse:build -PenableFuse=true

- name: Package Gravitino
run: |
./gradlew compileDistribution -x test -PjdkVersion=${{ matrix.java-version }} -PenableFuse=true
Comment on lines +74 to +80
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the compileDistribution after Build and test Gravitino ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

compileDistribution is use to running the integration test, build and test only run uts

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you mean you mix fuseIT in compileDistribution?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, fuseITs depend on the package of compileDistribution

Copy link
Contributor

@mchades mchades Dec 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the function of -PenableFuse=true here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plz remove this and the below steps since it's unnecessary now


- name: Free up disk space
run: |
dev/ci/util_free_space.sh

- name: Upload tests reports
uses: actions/upload-artifact@v3
if: ${{ (failure() && steps.integrationTest.outcome == 'failure') || contains(github.event.pull_request.labels.*.name, 'upload log') }}
with:
name: trino-connector-integrate-test-reports-${{ matrix.java-version }}
path: |
clients/filesystem-fuse/build/test/log/*.log

17 changes: 16 additions & 1 deletion clients/filesystem-fuse/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,24 @@ repository = "https://github.com/apache/gravitino"
name = "gvfs-fuse"
path = "src/main.rs"

[lib]
name="gvfs_fuse"

[dependencies]
bytes = "1.6.0"
dashmap = "5.5.3"
futures-util = "0.3.30"
fuse3 = { version = "0.8.1", "features" = ["tokio-runtime", "unprivileged"] }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you plan to use fuse3?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the thread mode of fuse3 is better than fuser. It uses multiple threads with asynchronous I/O.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does fuse3 have some disadvantages?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FUSE3 is more modern, with fewer legacy burdens, but it has a smaller user base. On the other hand, fuser has been around for a long time, its implementation and technologies are older, but it has a larger user base, making it more stable in practice.

I'm not sure which one would be better to use right now. What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concern about FUSE3 is the stability and smaller user community. It may hard to resolve if encountering some underlying problems.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think we should use fuser?

libc = "0.2.164"
log = "0.4.22"
opendal = { version = "0.46.0", features = ["services-s3"] }
tokio = { version = "1.38.0", features = ["full"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
regex = "1.11.1"
async-trait = "0.1"
reqwest = { version = "0.12.9", features = ["json"] }
serde = { version = "1.0.215", features = ["derive"] }
urlencoding = "2.1.3"

[dev-dependencies]
mockito = "0.31"
3 changes: 2 additions & 1 deletion clients/filesystem-fuse/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ val checkRustProject by tasks.registering(Exec::class) {
cargo fmt --all -- --check

echo "Running clippy"
cargo clippy --all-targets --all-features --workspace -- -D warnings
#cargo clippy --all-targets --all-features --workspace -- -D warnings
cargo clippy --all-targets --all-features --workspace --
""".trimIndent()
)
}
Expand Down
21 changes: 21 additions & 0 deletions clients/filesystem-fuse/rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[toolchain]
channel = "1.82.0"
components = ["rustfmt", "clippy", "rust-src"]
profile = "default"
236 changes: 236 additions & 0 deletions clients/filesystem-fuse/src/cloud_storage_filesystem.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
use crate::filesystem::{
FileReader, FileStat, FileSystemCapacity, FileWriter, OpenFileFlags, OpenedFile,
PathFileSystem, Result,
};
use crate::filesystem_metadata::DefaultFileSystemMetadata;
use crate::utils::join_file_path;
use async_trait::async_trait;
use bytes::Bytes;
use fuse3::{Errno, FileType, Timestamp};
use futures_util::{TryFutureExt, TryStreamExt};
use log::debug;
use opendal::layers::LoggingLayer;
use opendal::{services, EntryMode, ErrorKind, Metadata, Operator};
use std::ops::Range;
use std::sync::{Mutex, RwLock};
use std::time::SystemTime;

pub(crate) struct CloudStorageFileSystem {
op: Operator,
}

impl CloudStorageFileSystem {
pub fn new(op: Operator) -> Self {
Self {
op: op,
}
}
}

#[async_trait]
impl PathFileSystem for CloudStorageFileSystem {
async fn init(&self) {}

async fn stat(&self, name: &str) -> Result<FileStat> {
let meta = self.op.stat(name).await.map_err(opendal_error_to_errno)?;
let mut file_stat = FileStat::new_file_with_path(name, 0);
opdal_meta_to_file_stat(&meta, &mut file_stat);
Ok(file_stat)
}

async fn lookup(&self, parent: &str, name: &str) -> Result<FileStat> {
let path = join_file_path(parent, name);
self.stat(&path).await
}

async fn read_dir(&self, name: &str) -> Result<Vec<FileStat>> {
let entries = self.op.list(name).await.map_err(opendal_error_to_errno)?;
entries
.iter()
.map(|entry| {
let path = entry.path().trim_end_matches('/');
let mut file_stat = FileStat::new_file_with_path(&path, 0);
opdal_meta_to_file_stat(entry.metadata(), &mut file_stat);
debug!("read dir file stat: {:?}", file_stat);
Ok(file_stat)
})
.collect()
}

async fn open_file(&self, name: &str, flags: OpenFileFlags) -> Result<OpenedFile> {
let file_stat = self.stat(name).await?;
debug_assert!(file_stat.kind == FileType::RegularFile);
let mut file = OpenedFile::new(file_stat);
if flags.is_read() {
let reader = self
.op
.reader_with(name)
.await
.map_err(opendal_error_to_errno)?;
file.reader = Some(Box::new(FileReaderImpl { reader }));
}
if flags.is_write() {
let writer = self
.op
.writer_with(name)
.await
.map_err(opendal_error_to_errno)?;
file.writer = Some(Box::new(FileWriterImpl { writer }));
}
Ok(file)
}

async fn open_dir(&self, name: &str, flags: OpenFileFlags) -> Result<OpenedFile> {
let file_stat = self.stat(name).await?;
debug_assert!(file_stat.kind == FileType::Directory);
let mut file = OpenedFile::new(file_stat);
Ok(file)
}

async fn create_file(
&self,
parent: &str,
name: &str,
flags: OpenFileFlags,
) -> Result<OpenedFile> {
let mut file = OpenedFile::new(FileStat::new_file_with_path(name, 0));

if flags.is_read() {
let reader = self
.op
.reader_with(name)
.await
.map_err(opendal_error_to_errno)?;
file.reader = Some(Box::new(FileReaderImpl { reader }));
}
if flags.is_write() {
let writer = self
.op
.writer_with(name)
.await
.map_err(opendal_error_to_errno)?;
file.writer = Some(Box::new(FileWriterImpl { writer }));
}
Ok(file)
}

async fn create_dir(&self, parent: &str, name: &str) -> Result<OpenedFile> {
let path = join_file_path(parent, name);
self.op
.create_dir(&path)
.await
.map_err(opendal_error_to_errno)?;
let file_stat = self.stat(&path).await?;
Ok(OpenedFile::new(file_stat))
}

async fn set_attr(&self, name: &str, file_stat: &FileStat, flush: bool) -> Result<()> {
Ok(())
}

async fn remove_file(&self, parent: &str, name: &str) -> Result<()> {
self.op
.remove(vec![join_file_path(parent, name)])
.await
.map_err(opendal_error_to_errno)
}

async fn remove_dir(&self, parent: &str, name: &str) -> Result<()> {
//todo:: need to consider keeping the behavior of posix remove dir when the dir is not empty
self.op
.remove_all(&join_file_path(parent, name))
.await
.map_err(opendal_error_to_errno)
}

fn get_capacity(&self) -> Result<FileSystemCapacity> {
Ok(FileSystemCapacity {})
}
}

struct FileReaderImpl {
reader: opendal::Reader,
}

#[async_trait]
impl FileReader for FileReaderImpl {
async fn read(&mut self, offset: u64, size: u32) -> Result<Bytes> {
let end = offset + size as u64;
let v = self
.reader
.read(offset..end)
.await
.map_err(opendal_error_to_errno)?;
Ok(v.to_bytes())
}
}

struct FileWriterImpl {
writer: opendal::Writer,
}

#[async_trait]
impl FileWriter for FileWriterImpl {
async fn write(&mut self, offset: u64, data: &[u8]) -> Result<u32> {
self.writer
.write(data.to_vec())
.await
.map_err(opendal_error_to_errno)?;
Ok(data.len() as u32)
}

async fn close(&mut self) -> Result<()> {
self.writer.close().await.map_err(opendal_error_to_errno)?;
Ok(())
}
}

fn opendal_error_to_errno(err: opendal::Error) -> fuse3::Errno {
debug!("opendal_error2errno: {:?}", err);
match err.kind() {
ErrorKind::Unsupported => Errno::from(libc::EOPNOTSUPP),
ErrorKind::IsADirectory => Errno::from(libc::EISDIR),
ErrorKind::NotFound => Errno::from(libc::ENOENT),
ErrorKind::PermissionDenied => Errno::from(libc::EACCES),
ErrorKind::AlreadyExists => Errno::from(libc::EEXIST),
ErrorKind::NotADirectory => Errno::from(libc::ENOTDIR),
ErrorKind::RateLimited => Errno::from(libc::EBUSY),
_ => Errno::from(libc::ENOENT),
}
}

fn opendal_filemode_to_filetype(mode: EntryMode) -> FileType {
match mode {
EntryMode::DIR => FileType::Directory,
_ => FileType::RegularFile,
}
}

fn opdal_meta_to_file_stat(meta: &Metadata, file_stat: &mut FileStat) {
let now = SystemTime::now();
let mtime = meta.last_modified().map(|x| x.into()).unwrap_or(now);

file_stat.size = meta.content_length();
file_stat.kind = opendal_filemode_to_filetype(meta.mode());
file_stat.ctime = Timestamp::from(mtime);
file_stat.atime = Timestamp::from(now);
file_stat.mtime = Timestamp::from(mtime);
}
Loading
Loading