diff --git a/app/src/main.rs b/app/src/main.rs
index b71e78e..a3e392f 100644
--- a/app/src/main.rs
+++ b/app/src/main.rs
@@ -16,11 +16,11 @@ use chrono::Local;
 use clap::Parser;
 use env_logger::Builder;
 use glob::glob;
-use gzp::MgzipSyncReader;
-use gzp::{
-    deflate::Mgzip,
-    par::compress::{ParCompress, ParCompressBuilder},
-};
+// use gzp::MgzipSyncReader;
+// use gzp::{
+//     deflate::Mgzip,
+//     par::compress::{ParCompress, ParCompressBuilder},
+// };
 use itertools::Itertools as _;
 use log::LevelFilter;
 use pcd_exporter::gltf::generate_glb;
@@ -29,7 +29,7 @@ use pcd_parser::reader::las::LasPointReader;
 use pcd_parser::reader::PointReader;
 use pcd_transformer::projection::transform_point;
 use projection_transform::vshift::Jgd2011ToWgs84;
-use rayon::iter::{IntoParallelRefIterator as _, ParallelIterator as _};
+use rayon::iter::{IntoParallelIterator as _, IntoParallelRefIterator as _, ParallelIterator as _};
 use tempfile::tempdir;
 use tinymvt::tileid::hilbert;

@@ -46,7 +46,7 @@ use pcd_exporter::{
 use pcd_parser::parser::{get_extension, Extension};
 use projection_transform::cartesian::geodetic_to_geocentric;

-#[derive(Parser, Debug)]
+#[derive(Parser, Debug, Clone)]
 #[command(
     name = "Point Tiler",
     about = "A tool for converting point cloud data into 3D Tiles",
@@ -129,7 +129,8 @@ fn write_points_to_tile(
     fs::create_dir_all(tile_path.parent().unwrap())?;
     let file = File::create(tile_path)?;

-    let mut writer: ParCompress<Mgzip> = ParCompressBuilder::new().from_writer(file);
+    // let mut writer: ParCompress<Mgzip> = ParCompressBuilder::new().from_writer(file);
+    let mut writer = BufWriter::new(file);

     let encoded = bitcode::encode(points);
     writer.write_all(&encoded)?;
@@ -139,7 +140,8 @@ fn read_points_from_tile(file_path: &Path) -> std::io::Result<Vec<Point>> {
     let file = File::open(file_path)?;

-    let mut buf_reader = MgzipSyncReader::new(file);
+    // let mut buf_reader = MgzipSyncReader::new(file);
+    let mut buf_reader = file;
     let mut buffer = Vec::new();
     buf_reader.read_to_end(&mut buffer).unwrap();
     let points = bitcode::decode(&buffer).unwrap();
@@ -206,9 +208,12 @@ fn aggregate_zoom_level(base_path: &Path, z: u8) -> std::io::Result<()> {
         }
     }

-    for (parent_tile, pts) in parent_map {
-        write_points_to_tile(base_path, parent_tile, &pts)?;
-    }
+    parent_map
+        .into_par_iter()
+        .try_for_each(|(parent_tile, pts)| -> std::io::Result<()> {
+            write_points_to_tile(base_path, parent_tile, &pts)?;
+            Ok(())
+        })?;

     Ok(())
 }
@@ -272,7 +277,8 @@ fn export_tiles_to_glb(
     if gzip_compress {
         let file = File::create(glb_path).unwrap();
-        let writer: ParCompress<Mgzip> = ParCompressBuilder::new().from_writer(file);
+        // let writer: ParCompress<Mgzip> = ParCompressBuilder::new().from_writer(file);
+        let writer = BufWriter::new(file);
         glb.to_writer_with_alignment(writer, 8).unwrap();
     } else {
         let file = File::create(glb_path).unwrap();
@@ -327,7 +333,8 @@ impl RunFileIterator {
     fn read_run_file(path: PathBuf) -> Result<Vec<(SortKey, Point)>, Infallible> {
         let file = File::open(path).unwrap();
-        let mut buf_reader = MgzipSyncReader::new(file);
+        // let mut buf_reader = MgzipSyncReader::new(file);
+        let mut buf_reader = file;
         let mut buffer = Vec::new();
         buf_reader.read_to_end(&mut buffer).unwrap();
         let data: Vec<(SortKey, Point)> = bitcode::decode(&buffer[..]).unwrap();
@@ -359,203 +366,118 @@ impl Iterator for RunFileIterator {
     }
 }

-fn main() {
-    Builder::new()
-        .format(|buf, record| {
-            writeln!(
-                buf,
-                "{} [{}] - {}",
-                Local::now().format("%Y-%m-%d %H:%M:%S"),
-                record.level(),
-                record.args()
-            )
-        })
-        .filter(None, LevelFilter::Info)
-        .init();
-
-    let args = Cli::parse();
-
-    log::info!("input files: {:?}", args.input);
-    log::info!("output folder: {}", args.output);
-    log::info!("input EPSG: {}", args.input_epsg);
-    log::info!("output EPSG: {}", args.output_epsg);
-    log::info!("min zoom: {}", args.min);
-    log::info!("max zoom: {}", args.max);
-    log::info!("max memory mb: {}", args.max_memory_mb);
-    log::info!("quantize: {}", args.quantize);
-    log::info!("gzip compress: {}", args.gzip_compress);
-
-    let start = std::time::Instant::now();
-
-    log::info!("start processing...");
-    let input_files = expand_globs(args.input);
-    log::info!("Expanded input files: {:?}", input_files);
-
-    let output_path = PathBuf::from(args.output);
-    std::fs::create_dir_all(&output_path).unwrap();
-
-    let tmp_run_file_dir_path = tempdir().unwrap();
-
-    let min_zoom = args.min;
-    let max_zoom = args.max;
-
-    log::info!(
-        "start parse and transform and tiling at max_zoom ({})...",
-        max_zoom
-    );
-    let start_local = std::time::Instant::now();
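+/// Sums the on-disk size of every input file, counting files whose metadata
+/// cannot be read as 0 bytes. `main` compares this estimate against the
+/// `--max-memory-mb` budget to choose between the two workflows below.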
+fn estimate_total_size(paths: &[PathBuf]) -> u64 {
+    paths
+        .iter()
+        .map(|p| p.metadata().map(|m| m.len()).unwrap_or(0))
+        .sum()
+}
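+
+/// Fast path for inputs that fit within the memory budget: reads all points
+/// into memory, transforms and groups them by tile id with a rayon
+/// fold/reduce, then writes the tiles and exports GLBs in parallel.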
+fn in_memory_workflow(
+    input_files: Vec<PathBuf>,
+    args: &Cli,
+    output_path: &Path,
+) -> std::io::Result<()> {
     let jgd2wgs = Arc::new(Jgd2011ToWgs84::default());

-    let max_memory_mb: usize = args.max_memory_mb;
-    let max_memory_mb_bytes = max_memory_mb * 1024 * 1024;
-    let point_size = 96;
-    let default_chunk_points_len = 10_000_000;
-    let one_chunk_mem = default_chunk_points_len * point_size;
-    let mut channel_capacity = max_memory_mb_bytes / one_chunk_mem;
-    if channel_capacity == 0 {
-        channel_capacity = 1;
-    }
-
-    log::info!("max_memory_mb_bytes: {}", max_memory_mb_bytes);
-    log::info!("one_chunk_mem: {}", one_chunk_mem);
-    log::info!("channel_capacity: {}", channel_capacity);
-
-    let (tx, rx) = mpsc::sync_channel::<Vec<Point>>(channel_capacity);
-    let handle = thread::spawn(move || {
-        let mut buffer = Vec::with_capacity(default_chunk_points_len);
-
-        let extension = check_and_get_extension(&input_files).unwrap();
-        let mut reader: Box<dyn PointReader> = match extension {
-            Extension::Las => Box::new(LasPointReader::new(input_files).unwrap()),
-            Extension::Laz => Box::new(LasPointReader::new(input_files).unwrap()),
-            Extension::Csv => Box::new(CsvPointReader::new(input_files).unwrap()),
-            Extension::Txt => Box::new(CsvPointReader::new(input_files).unwrap()),
-        };
-
-        while let Ok(Some(p)) = reader.next_point() {
-            buffer.push(p);
-            if buffer.len() >= default_chunk_points_len {
-                if tx.send(buffer.clone()).is_err() {
-                    break;
-                }
-                buffer = Vec::with_capacity(default_chunk_points_len);
-            }
+    let extension = check_and_get_extension(&input_files).unwrap();
+    let mut reader: Box<dyn PointReader> = match extension {
+        Extension::Las | Extension::Laz => {
+            Box::new(LasPointReader::new(input_files.clone()).unwrap())
         }
-        if !buffer.is_empty() {
-            tx.send(buffer.clone()).unwrap();
-            buffer.clear();
+        Extension::Csv | Extension::Txt => {
+            Box::new(CsvPointReader::new(input_files.clone()).unwrap())
         }
-    });
-
-    for (current_run_index, chunk) in rx.into_iter().enumerate() {
-        let mut keyed_points: Vec<(SortKey, Point)> = chunk
-            .into_iter()
-            .map(|p| {
-                let transformed = transform_point(p, args.input_epsg, args.output_epsg, &jgd2wgs);
-
-                let tile_coords =
-                    tiling::scheme::zxy_from_lng_lat(max_zoom, transformed.x, transformed.y);
-                let tile_id =
-                    TileIdMethod::Hilbert.zxy_to_id(tile_coords.0, tile_coords.1, tile_coords.2);
-
-                (SortKey { tile_id }, transformed)
-            })
-            .collect();
-
-        keyed_points.sort_by_key(|(k, _)| k.tile_id);
+    };

-        let run_file_path = tmp_run_file_dir_path
-            .path()
-            .join(format!("run_{}.bin", current_run_index));
-        let file = fs::File::create(run_file_path).unwrap();
-        let mut writer: ParCompress<Mgzip> = ParCompressBuilder::new().from_writer(file);
+    log::info!("start parse and transform and tiling...");
+    let start_local = std::time::Instant::now();

-        let encoded = bitcode::encode(&keyed_points);
-        writer.write_all(&encoded).unwrap();
+    let mut all_points = Vec::new();
+    while let Ok(Some(p)) = reader.next_point() {
+        all_points.push(p);
     }

-    handle.join().expect("Reading thread panicked");
-
     log::info!(
         "Finish transforming and tiling in {:?}",
         start_local.elapsed()
     );

-    log::info!("start sorting...");
+    log::info!("start grouping...");
     let start_local = std::time::Instant::now();

-    let pattern = tmp_run_file_dir_path.path().join("run_*.bin");
-    let run_files = glob::glob(pattern.to_str().unwrap())
-        .unwrap()
-        .map(|r| r.unwrap())
-        .collect::<Vec<_>>();
-
-    let tile_id_iter = RunFileIterator::new(run_files);
-
-    let config = kv_extsort::SortConfig::default().max_chunk_bytes(max_memory_mb_bytes);
-    let sorted_iter = kv_extsort::sort(
-        tile_id_iter.map(|(key, point)| {
-            let encoded_point = bitcode::encode(&point);
-            std::result::Result::<_, Infallible>::Ok((key, encoded_point))
-        }),
-        config,
-    );
-
-    let grouped_iter = sorted_iter.chunk_by(|res| match res {
-        Ok((key, _)) => (false, *key),
-        Err(_) => (true, SortKey { tile_id: 0 }),
-    });
-
-    let tmp_tiled_file_dir_path = tempdir().unwrap();
-
-    for ((_, key), group) in &grouped_iter {
-        let points = group
-            .into_iter()
-            .map(|r| r.map(|(_, p)| bitcode::decode::<Point>(&p).unwrap()))
-            .collect::<Result<Vec<Point>, _>>()
-            .unwrap();
-
-        let tile_id = key.tile_id;
-        let tile = TileIdMethod::Hilbert.id_to_zxy(tile_id);
+    let epsg_in = args.input_epsg;
+    let epsg_out = args.output_epsg;
+    let max_zoom = args.max;

-        let (z, x, y) = tile;
-        let tile_path = tmp_tiled_file_dir_path
-            .path()
-            .join(format!("{}/{}/{}.bin", z, x, y));
+    let jgd2wgs_clone = Arc::clone(&jgd2wgs);
+    let map_init = || HashMap::<u64, Vec<Point>>::new();
+    let map_fold = move |mut map: HashMap<u64, Vec<Point>>, p: &Point| {
+        let transformed = transform_point(p.clone(), epsg_in, epsg_out, &jgd2wgs_clone);
+        let (z, x, y) = tiling::scheme::zxy_from_lng_lat(max_zoom, transformed.x, transformed.y);
+        let tile_id = TileIdMethod::Hilbert.zxy_to_id(z, x, y);
+        map.entry(tile_id).or_default().push(transformed);
+        map
+    };
+    let map_reduce = |mut a: HashMap<u64, Vec<Point>>, b: HashMap<u64, Vec<Point>>| {
+        for (k, mut v) in b {
+            a.entry(k).or_default().append(&mut v);
+        }
+        a
+    };

-        fs::create_dir_all(tile_path.parent().unwrap()).unwrap();
+    let tile_map = all_points
+        .par_iter()
+        .fold(map_init, map_fold)
+        .reduce(map_init, map_reduce);

-        let file = fs::File::create(tile_path).unwrap();
-        let mut writer: ParCompress<Mgzip> = ParCompressBuilder::new().from_writer(file);
+    log::info!(
+        "Transformed & grouped into {} tiles in {:?}",
+        tile_map.len(),
+        start_local.elapsed()
+    );

-        let encoded = bitcode::encode(&points);
-        writer.write_all(&encoded).unwrap();
-    }
-    log::info!("Finish sorting in {:?}", start_local.elapsed());
+    let tmp_tiled_file_dir_path = tempdir().unwrap();

-    drop(tmp_run_file_dir_path);
+    log::info!("start writing tile files...");
+    let start_local = std::time::Instant::now();
+    tile_map
+        .into_par_iter()
+        .try_for_each(|(tile_id, points)| -> std::io::Result<()> {
+            let (z, x, y) = TileIdMethod::Hilbert.id_to_zxy(tile_id);
+
+            let tile_path = tmp_tiled_file_dir_path
+                .path()
+                .join(format!("{}/{}/{}.bin", z, x, y));
+
+            fs::create_dir_all(tile_path.parent().unwrap())?;
+            let file = File::create(tile_path)?;
+            let mut writer = BufWriter::new(file);
+            let encoded = bitcode::encode(&points);
+            writer.write_all(&encoded)?;
+            Ok(())
+        })?;
+
+    log::info!("Wrote tile files in {:?}", start_local.elapsed());

     log::info!("start zoom aggregation...");
     let start_local = std::time::Instant::now();
-
-    // The parent tile coordinates are calculated from the file with the maximum zoom level
-    for z in (min_zoom..max_zoom).rev() {
+    for z in (args.min..max_zoom).rev() {
         log::info!("aggregating zoom level: {}", z);
-        aggregate_zoom_level(tmp_tiled_file_dir_path.path(), z).unwrap();
+        aggregate_zoom_level(tmp_tiled_file_dir_path.path(), z)?;
     }
     log::info!("Finish zoom aggregation in {:?}", start_local.elapsed());

     log::info!("start exporting tiles (GLB)...");
     let start_local = std::time::Instant::now();
+
     let tile_contents = export_tiles_to_glb(
         tmp_tiled_file_dir_path.path(),
-        &output_path,
-        min_zoom,
+        output_path,
+        args.min,
         max_zoom,
         args.quantize,
         args.gzip_compress,
-    )
-    .unwrap();
+    )?;
+
     log::info!("Finish exporting tiles in {:?}", start_local.elapsed());

     drop(tmp_tiled_file_dir_path);
@@ -564,7 +486,6 @@ fn main() {
     for content in tile_contents {
         tree.add_content(content);
     }
-
     let tileset = cesiumtiles::tileset::Tileset {
         asset: cesiumtiles::tileset::Asset {
             version: "1.1".to_string(),
@@ -574,16 +495,275 @@ fn main() {
         geometric_error: 1e+100,
         ..Default::default()
     };
-
     let root_tileset_path = output_path.join("tileset.json");
     log::info!("write tileset.json: {:?}", root_tileset_path);
-    fs::create_dir_all(root_tileset_path.parent().unwrap()).unwrap();
+    fs::create_dir_all(root_tileset_path.parent().unwrap())?;
     fs::write(
         root_tileset_path,
         serde_json::to_string_pretty(&tileset).unwrap(),
-    )
-    .unwrap();
+    )?;
+
+    Ok(())
+}
+
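+/// Fallback for inputs larger than the memory budget: a reader thread streams
+/// transformed points through a bounded channel, each chunk is sorted by tile
+/// id and spilled to a run file, and the runs are then merged externally.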
+fn external_sort_workflow(
+    input_files: Vec<PathBuf>,
+    args: &Cli,
+    output_path: &Path,
+) -> std::io::Result<()> {
+    log::info!("start parse and transform and tiling...");
+
+    let start_local = std::time::Instant::now();
+
+    let jgd2wgs = Arc::new(Jgd2011ToWgs84::default());
+    let tmp_run_file_dir_path = tempdir().unwrap();
+
+    {
+        let max_memory_mb: usize = args.max_memory_mb;
+        let max_memory_mb_bytes = max_memory_mb * 1024 * 1024;
+        let point_size = 96;
+        let default_chunk_points_len = 10_000_000;
+        let one_chunk_mem = default_chunk_points_len * point_size;
+        let mut channel_capacity = max_memory_mb_bytes / one_chunk_mem;
+        if channel_capacity == 0 {
+            channel_capacity = 1;
+        }
+
+        let extension = check_and_get_extension(&input_files).unwrap();
+        let input_files_clone = input_files.clone();
+        let epsg_in = args.input_epsg;
+        let epsg_out = args.output_epsg;
+
+        log::info!("max_memory_mb_bytes: {}", max_memory_mb_bytes);
+        log::info!("one_chunk_mem: {}", one_chunk_mem);
+        log::info!("channel_capacity: {}", channel_capacity);
+
+        let (tx, rx) = mpsc::sync_channel::<Vec<Point>>(channel_capacity);
+
+        let handle = thread::spawn(move || {
+            let mut buffer = Vec::with_capacity(default_chunk_points_len);
+            let mut reader: Box<dyn PointReader> = match extension {
+                Extension::Las | Extension::Laz => {
+                    Box::new(LasPointReader::new(input_files_clone).unwrap())
+                }
+                Extension::Csv | Extension::Txt => {
+                    Box::new(CsvPointReader::new(input_files_clone).unwrap())
+                }
+            };
+            while let Ok(Some(p)) = reader.next_point() {
+                let transformed = transform_point(p, epsg_in, epsg_out, &jgd2wgs);
+                buffer.push(transformed);
+                if buffer.len() >= default_chunk_points_len {
+                    if tx.send(buffer.clone()).is_err() {
+                        break;
+                    }
+                    buffer = Vec::with_capacity(default_chunk_points_len);
+                }
+            }
+            if !buffer.is_empty() {
+                tx.send(buffer.clone()).unwrap();
+            }
+        });
+
+        for (current_run_index, chunk) in rx.into_iter().enumerate() {
+            let mut keyed_points: Vec<(SortKey, Point)> = chunk
+                .into_iter()
+                .map(|p| {
+                    // let transformed = transform_point(p, args.input_epsg, args.output_epsg, &jgd2wgs);
+
+                    let tile_coords = tiling::scheme::zxy_from_lng_lat(args.max, p.x, p.y);
+                    let tile_id = TileIdMethod::Hilbert.zxy_to_id(
+                        tile_coords.0,
+                        tile_coords.1,
+                        tile_coords.2,
+                    );
+
+                    (SortKey { tile_id }, p)
+                })
+                .collect();
+
+            keyed_points.sort_by_key(|(k, _)| k.tile_id);
+
+            let run_file_path = tmp_run_file_dir_path
+                .path()
+                .join(format!("run_{}.bin", current_run_index));
+            let file = fs::File::create(run_file_path).unwrap();
+            // let mut writer: ParCompress<Mgzip> = ParCompressBuilder::new().from_writer(file);
+            let mut writer = BufWriter::new(file);
+
+            let encoded = bitcode::encode(&keyed_points);
+            writer.write_all(&encoded).unwrap();
+        }
+
+        handle.join().expect("Reading thread panicked");
+
+        log::info!(
+            "Finish transforming and tiling in {:?}",
+            start_local.elapsed()
+        );
+    }
+
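+    // Merge phase: the sorted run files are combined with kv_extsort, grouped
+    // by tile id, and written out as one bitcode-encoded file per tile before
+    // zoom aggregation and GLB export.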
+    {
+        log::info!("start sorting...");
+        let start_local = std::time::Instant::now();
+
+        let pattern = tmp_run_file_dir_path.path().join("run_*.bin");
+        let run_files = glob::glob(pattern.to_str().unwrap())
+            .unwrap()
+            .map(|r| r.unwrap())
+            .collect::<Vec<_>>();
+
+        let tile_id_iter = RunFileIterator::new(run_files);
+
+        let config =
+            kv_extsort::SortConfig::default().max_chunk_bytes(args.max_memory_mb * 1024 * 1024);
+        let sorted_iter = kv_extsort::sort(
+            tile_id_iter.map(|(key, point)| {
+                let encoded_point = bitcode::encode(&point);
+                std::result::Result::<_, Infallible>::Ok((key, encoded_point))
+            }),
+            config,
+        );
+
+        let grouped_iter = sorted_iter.chunk_by(|res| match res {
+            Ok((key, _)) => (false, *key),
+            Err(_) => (true, SortKey { tile_id: 0 }),
+        });
+
+        let tmp_tiled_file_dir_path = tempdir().unwrap();
+
+        for ((_, key), group) in &grouped_iter {
+            let points = group
+                .into_iter()
+                .map(|r| r.map(|(_, p)| bitcode::decode::<Point>(&p).unwrap()))
+                .collect::<Result<Vec<Point>, _>>()
+                .unwrap();
+
+            let tile_id = key.tile_id;
+            let tile = TileIdMethod::Hilbert.id_to_zxy(tile_id);
+
+            let (z, x, y) = tile;
+            let tile_path = tmp_tiled_file_dir_path
+                .path()
+                .join(format!("{}/{}/{}.bin", z, x, y));
+
+            fs::create_dir_all(tile_path.parent().unwrap()).unwrap();
+
+            let file = fs::File::create(tile_path).unwrap();
+            // let mut writer: ParCompress<Mgzip> = ParCompressBuilder::new().from_writer(file);
+            let mut writer = BufWriter::new(file);
+
+            let encoded = bitcode::encode(&points);
+            writer.write_all(&encoded).unwrap();
+        }
+        log::info!("Finish sorting in {:?}", start_local.elapsed());
+
+        drop(tmp_run_file_dir_path);
+
+        log::info!("start zoom aggregation...");
+        let start_local = std::time::Instant::now();
+
+        // The parent tile coordinates are calculated from the file with the maximum zoom level
+        for z in (args.min..args.max).rev() {
+            log::info!("aggregating zoom level: {}", z);
+            aggregate_zoom_level(tmp_tiled_file_dir_path.path(), z).unwrap();
+        }
+        log::info!("Finish zoom aggregation in {:?}", start_local.elapsed());
+
+        log::info!("start exporting tiles (GLB)...");
+        let start_local = std::time::Instant::now();
+        let tile_contents = export_tiles_to_glb(
+            tmp_tiled_file_dir_path.path(),
+            output_path,
+            args.min,
+            args.max,
+            args.quantize,
+            args.gzip_compress,
+        )
+        .unwrap();
+        log::info!("Finish exporting tiles in {:?}", start_local.elapsed());
+
+        drop(tmp_tiled_file_dir_path);
+
+        let mut tree = TileTree::default();
+        for content in tile_contents {
+            tree.add_content(content);
+        }
+
+        let tileset = cesiumtiles::tileset::Tileset {
+            asset: cesiumtiles::tileset::Asset {
+                version: "1.1".to_string(),
+                ..Default::default()
+            },
+            root: tree.into_tileset_root(),
+            geometric_error: 1e+100,
+            ..Default::default()
+        };
+
+        let root_tileset_path = output_path.join("tileset.json");
+        log::info!("write tileset.json: {:?}", root_tileset_path);
+        fs::create_dir_all(root_tileset_path.parent().unwrap()).unwrap();
+        fs::write(
+            root_tileset_path,
+            serde_json::to_string_pretty(&tileset).unwrap(),
+        )
+        .unwrap();
+    }
+    Ok(())
+}
+
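+/// Initializes logging, parses the CLI arguments, and dispatches to
+/// `in_memory_workflow` or `external_sort_workflow` based on the estimated
+/// total size of the expanded input files.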
+fn main() -> std::io::Result<()> {
+    Builder::new()
+        .format(|buf, record| {
+            writeln!(
+                buf,
+                "{} [{}] - {}",
+                Local::now().format("%Y-%m-%d %H:%M:%S"),
+                record.level(),
+                record.args()
+            )
+        })
+        .filter(None, LevelFilter::Info)
+        .init();
+
+    let args = Cli::parse();
+
+    log::info!("input files: {:?}", args.input);
+    log::info!("output folder: {}", args.output);
+    log::info!("input EPSG: {}", args.input_epsg);
+    log::info!("output EPSG: {}", args.output_epsg);
+    log::info!("min zoom: {}", args.min);
+    log::info!("max zoom: {}", args.max);
+    log::info!("max memory mb: {}", args.max_memory_mb);
+    log::info!("quantize: {}", args.quantize);
+    log::info!("gzip compress: {}", args.gzip_compress);
+
+    let start = std::time::Instant::now();
+
+    log::info!("start processing...");
+    let input_files = expand_globs(args.input.clone());
+    log::info!("Expanded input files: {:?}", input_files);
+
+    let output_path = PathBuf::from(args.output.clone());
+    std::fs::create_dir_all(&output_path).unwrap();
+
+    let total_size = estimate_total_size(&input_files);
+    let max_memory_bytes = args.max_memory_mb as u64 * 1024 * 1024;
+    log::info!(
+        "Total input size: {}, threshold: {}",
+        total_size,
+        max_memory_bytes
+    );
+
+    if total_size <= max_memory_bytes {
+        log::info!("Using in-memory workflow");
+        in_memory_workflow(input_files, &args, &output_path)?;
+    } else {
+        log::info!("Using external sort workflow");
+        external_sort_workflow(input_files, &args, &output_path)?;
+    }

     log::info!("Elapsed: {:?}", start.elapsed());
     log::info!("Finish processing");
+
+    Ok(())
 }