Skip to content

Commit 1ec1c01

Browse files
committed
Add
1 parent bfd3945 commit 1ec1c01

File tree

10 files changed

+653
-101
lines changed

10 files changed

+653
-101
lines changed

clients/filesystem-fuse/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,5 @@ libc = "0.2.168"
4242
log = "0.4.22"
4343
tokio = { version = "1.38.0", features = ["full"] }
4444
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
45+
regex = "1.11.1"
4546

clients/filesystem-fuse/src/filesystem.rs

+25-21
Original file line numberDiff line numberDiff line change
@@ -94,39 +94,38 @@ pub(crate) trait PathFileSystem: Send + Sync {
9494
async fn init(&self) -> Result<()>;
9595

9696
/// Get the file stat by file path, if the file is exist, return the file stat
97-
async fn stat(&self, name: &str) -> Result<FileStat>;
97+
async fn stat(&self, path: &str) -> Result<FileStat>;
9898

99-
/// Get the file stat by parent file path and file name, if the file is exist, return the file stat
100-
async fn lookup(&self, parent: &str, name: &str) -> Result<FileStat>;
99+
/// Get the file stat by file path, if the file is exist, return the file stat
100+
async fn lookup(&self, path: &str) -> Result<FileStat>;
101101

102102
/// Read the directory by file path, if the file is a valid directory, return the file stat list
103-
async fn read_dir(&self, name: &str) -> Result<Vec<FileStat>>;
103+
async fn read_dir(&self, path: &str) -> Result<Vec<FileStat>>;
104104

105105
/// Open the file by file path and flags, if the file is exist, return the opened file
106-
async fn open_file(&self, name: &str, flags: OpenFileFlags) -> Result<OpenedFile>;
106+
async fn open_file(&self, path: &str, flags: OpenFileFlags) -> Result<OpenedFile>;
107107

108108
/// Open the directory by file path and flags, if the file is exist, return the opened file
109-
async fn open_dir(&self, name: &str, flags: OpenFileFlags) -> Result<OpenedFile>;
109+
async fn open_dir(&self, path: &str, flags: OpenFileFlags) -> Result<OpenedFile>;
110110

111-
/// Create the file by parent file path and file name and flags, if successful, return the opened file
111+
/// Create the file by file path and flags, if successful, return the opened file
112112
async fn create_file(
113113
&self,
114-
parent: &str,
115-
name: &str,
114+
path: &str,
116115
flags: OpenFileFlags,
117116
) -> Result<OpenedFile>;
118117

119-
/// Create the directory by parent file path and file name, if successful, return the file stat
120-
async fn create_dir(&self, parent: &str, name: &str) -> Result<FileStat>;
118+
/// Create the directory by file path , if successful, return the file stat
119+
async fn create_dir(&self, path: &str) -> Result<FileStat>;
121120

122121
/// Set the file attribute by file path and file stat
123-
async fn set_attr(&self, name: &str, file_stat: &FileStat, flush: bool) -> Result<()>;
122+
async fn set_attr(&self, path: &str, file_stat: &FileStat, flush: bool) -> Result<()>;
124123

125-
/// Remove the file by parent file path and file name
126-
async fn remove_file(&self, parent: &str, name: &str) -> Result<()>;
124+
/// Remove the file by file path
125+
async fn remove_file(&self, path: &str) -> Result<()>;
127126

128-
/// Remove the directory by parent file path and file name
129-
async fn remove_dir(&self, parent: &str, name: &str) -> Result<()>;
127+
/// Remove the directory by file path
128+
async fn remove_dir(&self, path: &str) -> Result<()>;
130129
}
131130

132131
// FileSystemContext is the system environment for the fuse file system.
@@ -505,7 +504,8 @@ impl<T: PathFileSystem> RawFileSystem for DefaultRawFileSystem<T> {
505504

506505
async fn lookup(&self, parent_file_id: u64, name: &str) -> Result<FileStat> {
507506
let parent_file_entry = self.get_file_entry(parent_file_id).await?;
508-
let mut file_stat = self.fs.lookup(&parent_file_entry.path, name).await?;
507+
let path = join_file_path(&parent_file_entry.path, name);
508+
let mut file_stat = self.fs.lookup(&path).await?;
509509
// fill the file id to file stat
510510
self.resolve_file_id_to_filestat(&mut file_stat, parent_file_id)
511511
.await;
@@ -533,9 +533,10 @@ impl<T: PathFileSystem> RawFileSystem for DefaultRawFileSystem<T> {
533533

534534
async fn create_file(&self, parent_file_id: u64, name: &str, flags: u32) -> Result<FileHandle> {
535535
let parent_file_entry = self.get_file_entry(parent_file_id).await?;
536+
let path = join_file_path(&parent_file_entry.path, name);
536537
let mut opened_file = self
537538
.fs
538-
.create_file(&parent_file_entry.path, name, OpenFileFlags(flags))
539+
.create_file(&path, OpenFileFlags(flags))
539540
.await?;
540541

541542
opened_file.set_file_id(parent_file_id, self.next_file_id());
@@ -558,7 +559,8 @@ impl<T: PathFileSystem> RawFileSystem for DefaultRawFileSystem<T> {
558559

559560
async fn create_dir(&self, parent_file_id: u64, name: &str) -> Result<u64> {
560561
let parent_file_entry = self.get_file_entry(parent_file_id).await?;
561-
let mut filestat = self.fs.create_dir(&parent_file_entry.path, name).await?;
562+
let path = join_file_path(&parent_file_entry.path, name);
563+
let mut filestat = self.fs.create_dir(&path).await?;
562564

563565
filestat.set_file_id(parent_file_id, self.next_file_id());
564566

@@ -577,7 +579,8 @@ impl<T: PathFileSystem> RawFileSystem for DefaultRawFileSystem<T> {
577579

578580
async fn remove_file(&self, parent_file_id: u64, name: &str) -> Result<()> {
579581
let parent_file_entry = self.get_file_entry(parent_file_id).await?;
580-
self.fs.remove_file(&parent_file_entry.path, name).await?;
582+
let path = join_file_path(&parent_file_entry.path, name);
583+
self.fs.remove_file(&path).await?;
581584

582585
// remove the file from file entry manager
583586
{
@@ -589,7 +592,8 @@ impl<T: PathFileSystem> RawFileSystem for DefaultRawFileSystem<T> {
589592

590593
async fn remove_dir(&self, parent_file_id: u64, name: &str) -> Result<()> {
591594
let parent_file_entry = self.get_file_entry(parent_file_id).await?;
592-
self.fs.remove_dir(&parent_file_entry.path, name).await?;
595+
let path = join_file_path(&parent_file_entry.path, name);
596+
self.fs.remove_dir(&path).await?;
593597

594598
// remove the dir from file entry manager
595599
{

clients/filesystem-fuse/src/fuse_api_handle.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use std::ffi::{OsStr, OsString};
3434
use std::num::NonZeroU32;
3535
use std::time::{Duration, SystemTime};
3636

37-
pub(crate) struct FuseApiHandle<T: RawFileSystem> {
37+
pub struct FuseApiHandle<T: RawFileSystem> {
3838
fs: T,
3939
default_ttl: Duration,
4040
fs_context: FileSystemContext,
+115
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
use fuse3::raw::{Filesystem, MountHandle, Session};
20+
use fuse3::{MountOptions, Result};
21+
use log::{error, info};
22+
use std::process::exit;
23+
use std::sync::Arc;
24+
use std::time::Duration;
25+
use tokio::sync::{Mutex, Notify};
26+
use tokio::time::timeout;
27+
28+
/// Represents a FUSE server capable of starting and stopping the FUSE filesystem.
29+
pub struct FuseServer {
30+
// Notification for stop
31+
close_notify: Arc<Notify>,
32+
33+
// Shared handle to manage FUSE unmounting
34+
mount_handle: Arc<Mutex<Option<MountHandle>>>, // Shared handle to manage FUSE unmounting
35+
36+
// Mount point of the FUSE filesystem
37+
mount_point: String,
38+
}
39+
40+
impl FuseServer {
41+
/// Creates a new instance of `FuseServer`.
42+
pub fn new(mount_point: &str) -> Self {
43+
Self {
44+
close_notify: Arc::new(Notify::new()),
45+
mount_handle: Arc::new(Mutex::new(None)),
46+
mount_point: mount_point.to_string(),
47+
}
48+
}
49+
50+
/// Starts the FUSE filesystem and blocks until it is stopped.
51+
pub async fn start(&self, fuse_fs: impl Filesystem + Sync + 'static) -> Result<()> {
52+
//check if the mount point exists
53+
if !std::path::Path::new(&self.mount_point).exists() {
54+
error!("Mount point {} does not exist", self.mount_point);
55+
exit(libc::ENOENT);
56+
}
57+
58+
info!(
59+
"Starting FUSE filesystem and mounting at {}",
60+
self.mount_point
61+
);
62+
63+
let mount_options = MountOptions::default();
64+
let mount_handle = Session::new(mount_options)
65+
.mount_with_unprivileged(fuse_fs, &self.mount_point)
66+
.await?;
67+
68+
{
69+
let mut handle_guard = self.mount_handle.lock().await;
70+
*handle_guard = Some(mount_handle);
71+
}
72+
73+
// Wait for stop notification
74+
self.close_notify.notified().await;
75+
76+
info!("Received stop notification, FUSE filesystem will be unmounted.");
77+
Ok(())
78+
}
79+
80+
/// Stops the FUSE filesystem and waits for unmounting to complete.
81+
pub async fn stop(&self) -> Result<()> {
82+
// Notify stop
83+
self.close_notify.notify_one();
84+
85+
info!("Stopping FUSE filesystem...");
86+
let timeout_duration = Duration::from_secs(5);
87+
88+
let handle = {
89+
let mut handle_guard = self.mount_handle.lock().await;
90+
handle_guard.take() // Take the handle out to unmount
91+
};
92+
93+
if let Some(mount_handle) = handle {
94+
let res = timeout(timeout_duration, mount_handle.unmount()).await;
95+
96+
match res {
97+
Ok(Ok(())) => {
98+
info!("FUSE filesystem unmounted successfully.");
99+
Ok(())
100+
}
101+
Ok(Err(e)) => {
102+
error!("Failed to unmount FUSE filesystem: {:?}", e);
103+
Err(e.into())
104+
}
105+
Err(_) => {
106+
error!("Unmount timed out.");
107+
Err(libc::ETIMEDOUT.into())
108+
}
109+
}
110+
} else {
111+
error!("No active mount handle to unmount.");
112+
Err(libc::EBADF.into())
113+
}
114+
}
115+
}

clients/filesystem-fuse/src/lib.rs

+11
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,14 @@ mod filesystem;
2020
mod fuse_api_handle;
2121
mod opened_file_manager;
2222
mod utils;
23+
mod memory_filesystem;
24+
mod fuse_server;
25+
mod mount;
26+
27+
pub async fn gvfs_mount() -> fuse3::Result<()> {
28+
mount::mount().await
29+
}
30+
31+
pub fn gvfs_unmount() {
32+
33+
}

clients/filesystem-fuse/src/main.rs

+8-56
Original file line numberDiff line numberDiff line change
@@ -19,64 +19,16 @@
1919
mod filesystem;
2020
mod fuse_api_handle;
2121
mod opened_file_manager;
22+
mod memory_filesystem;
23+
mod fuse_server;
2224
mod utils;
25+
mod mount;
2326

24-
use log::debug;
25-
use log::info;
26-
use std::process::exit;
27+
use gvfs_fuse::gvfs_mount;
2728

2829
#[tokio::main]
29-
async fn main() {
30+
async fn main() -> fuse3::Result<()> {
3031
tracing_subscriber::fmt().init();
31-
info!("Starting filesystem...");
32-
debug!("Shutdown filesystem...");
33-
exit(0);
34-
}
35-
36-
async fn create_gvfs_fuse_filesystem() {
37-
// Gvfs-fuse filesystem structure:
38-
// FuseApiHandle
39-
// ├─ DefaultRawFileSystem (RawFileSystem)
40-
// │ └─ FileSystemLog (PathFileSystem)
41-
// │ ├─ GravitinoComposedFileSystem (PathFileSystem)
42-
// │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem)
43-
// │ │ │ └─ S3FileSystem (PathFileSystem)
44-
// │ │ │ └─ OpenDALFileSystem (PathFileSystem)
45-
// │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem)
46-
// │ │ │ └─ HDFSFileSystem (PathFileSystem)
47-
// │ │ │ └─ OpenDALFileSystem (PathFileSystem)
48-
// │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem)
49-
// │ │ │ └─ JuiceFileSystem (PathFileSystem)
50-
// │ │ │ └─ NasFileSystem (PathFileSystem)
51-
// │ │ ├─ GravitinoFilesetFileSystem (PathFileSystem)
52-
// │ │ │ └─ XXXFileSystem (PathFileSystem)
53-
//
54-
// `SimpleFileSystem` is a low-level filesystem designed to communicate with FUSE APIs.
55-
// It manages file and directory relationships, as well as file mappings.
56-
// It delegates file operations to the PathFileSystem
57-
//
58-
// `FileSystemLog` is a decorator that adds extra debug logging functionality to file system APIs.
59-
// Similar implementations include permissions, caching, and metrics.
60-
//
61-
// `GravitinoComposeFileSystem` is a composite file system that can combine multiple `GravitinoFilesetFileSystem`.
62-
// It use the part of catalog and schema of fileset path to a find actual GravitinoFilesetFileSystem. delegate the operation to the real storage.
63-
// If the user only mounts a fileset, this layer is not present. There will only be one below layer.
64-
//
65-
// `GravitinoFilesetFileSystem` is a file system that can access a fileset.It translates the fileset path to the real storage path.
66-
// and delegate the operation to the real storage.
67-
//
68-
// `OpenDALFileSystem` is a file system that use the OpenDAL to access real storage.
69-
// it can assess the S3, HDFS, gcs, azblob and other storage.
70-
//
71-
// `S3FileSystem` is a file system that use `OpenDALFileSystem` to access S3 storage.
72-
//
73-
// `HDFSFileSystem` is a file system that use `OpenDALFileSystem` to access HDFS storage.
74-
//
75-
// `NasFileSystem` is a filesystem that uses a locally accessible path mounted by NAS tools, such as JuiceFS.
76-
//
77-
// `JuiceFileSystem` is a file that use `NasFileSystem` to access JuiceFS storage.
78-
//
79-
// `XXXFileSystem is a filesystem that allows you to implement file access through your own extensions.
80-
81-
todo!("Implement the createGvfsFuseFileSystem function");
82-
}
32+
gvfs_mount().await?;
33+
Ok(())
34+
}

0 commit comments

Comments
 (0)