Skip to content

Commit

Permalink
optimize in memory workflow
Browse files (browse the repository at this point in the history)
  • Loading branch information
nokonoko1203 committed Feb 21, 2025
1 parent c80045f commit fff8e2e
Showing 1 changed file with 94 additions and 40 deletions.
134 changes: 94 additions & 40 deletions app/src/main.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -383,6 +383,7 @@ fn in_memory_workflow(
output_path: &Path,
) -> std::io::Result<()> {
let jgd2wgs = Arc::new(Jgd2011ToWgs84::default());

let extension = check_and_get_extension(&input_files).unwrap();
let mut reader: Box<dyn PointReader> = match extension {
Extension::Las | Extension::Laz => {
Expand All @@ -392,75 +393,128 @@ fn in_memory_workflow(
Box::new(CsvPointReader::new(input_files.clone()).unwrap())
}
};

log::info!("start parse and transform and tiling...");
let start_local = std::time::Instant::now();

let mut all_points = Vec::new();
while let Ok(Some(p)) = reader.next_point() {
let transformed = transform_point(p, args.input_epsg, args.output_epsg, &jgd2wgs);
all_points.push(transformed);
// let transformed = transform_point(p, args.input_epsg, args.output_epsg, &jgd2wgs);
// all_points.push(transformed);
all_points.push(p);
}

log::info!(
"Finish transforming and tiling in {:?}",
start_local.elapsed()
);

log::info!("start sorting...");
// log::info!("start sorting...");
// let start_local = std::time::Instant::now();
// let max_zoom = args.max;
// let mut keyed_points: Vec<(SortKey, Point)> = all_points
// .into_iter()
// .map(|p| {
// let (z, x, y) = tiling::scheme::zxy_from_lng_lat(max_zoom, p.x, p.y);
// let tile_id = TileIdMethod::Hilbert.zxy_to_id(z, x, y);
// (SortKey { tile_id }, p)
// })
// .collect();
// keyed_points.sort_by_key(|(k, _)| k.tile_id);

log::info!("start grouping...");
let start_local = std::time::Instant::now();

let epsg_in = args.input_epsg;
let epsg_out = args.output_epsg;
let max_zoom = args.max;
let mut keyed_points: Vec<(SortKey, Point)> = all_points
.into_iter()
.map(|p| {
let (z, x, y) = tiling::scheme::zxy_from_lng_lat(max_zoom, p.x, p.y);
let tile_id = TileIdMethod::Hilbert.zxy_to_id(z, x, y);
(SortKey { tile_id }, p)
})
.collect();
keyed_points.sort_by_key(|(k, _)| k.tile_id);

let jgd2wgs_clone = Arc::clone(&jgd2wgs);
let map_init = || HashMap::<u64, Vec<Point>>::new();
let map_fold = move |mut map: HashMap<u64, Vec<Point>>, p: &Point| {
let transformed = transform_point(p.clone(), epsg_in, epsg_out, &jgd2wgs_clone);
let (z, x, y) = tiling::scheme::zxy_from_lng_lat(max_zoom, transformed.x, transformed.y);
let tile_id = TileIdMethod::Hilbert.zxy_to_id(z, x, y);
map.entry(tile_id).or_default().push(transformed);
map
};
let map_reduce = |mut a: HashMap<u64, Vec<Point>>, b: HashMap<u64, Vec<Point>>| {
for (k, mut v) in b {
a.entry(k).or_default().append(&mut v);
}
a
};

let tile_map = all_points
.par_iter()
.fold(map_init, map_fold)
.reduce(map_init, map_reduce);

log::info!(
"Transformed & grouped into {} tiles in {:?}",
tile_map.len(),
start_local.elapsed()
);

let tmp_tiled_file_dir_path = tempdir().unwrap();

{
let mut current_tile_id = None;
let mut buffer = Vec::new();
log::info!("start writing tile files...");
let start_local = std::time::Instant::now();
tile_map
.into_par_iter()
.try_for_each(|(tile_id, points)| -> std::io::Result<()> {
let (z, x, y) = TileIdMethod::Hilbert.id_to_zxy(tile_id);

for (sort_key, point) in keyed_points {
match current_tile_id {
Some(tid) if tid == sort_key.tile_id => {
buffer.push(point);
}
_ => {
if let Some(tid) = current_tile_id {
let tile_coords = TileIdMethod::Hilbert.id_to_zxy(tid);
write_points_to_tile(tmp_tiled_file_dir_path.path(), tile_coords, &buffer)?;
}
current_tile_id = Some(sort_key.tile_id);
buffer.clear();
buffer.push(point);
}
}
}
if let Some(tid) = current_tile_id {
let tile_coords = TileIdMethod::Hilbert.id_to_zxy(tid);
write_points_to_tile(tmp_tiled_file_dir_path.path(), tile_coords, &buffer)?;
}
}
let tile_path = tmp_tiled_file_dir_path
.path()
.join(format!("{}/{}/{}.bin", z, x, y));
fs::create_dir_all(tile_path.parent().unwrap())?;
let file = File::create(tile_path)?;
let mut writer = BufWriter::new(file);
let encoded = bitcode::encode(&points);
writer.write_all(&encoded)?;
Ok(())
})?;

log::info!("Finish sorting in {:?}", start_local.elapsed());
log::info!("Wrote tile files in {:?}", start_local.elapsed());

// {
// let mut current_tile_id = None;
// let mut buffer = Vec::new();

// for (sort_key, point) in keyed_points {
// match current_tile_id {
// Some(tid) if tid == sort_key.tile_id => {
// buffer.push(point);
// }
// _ => {
// if let Some(tid) = current_tile_id {
// let tile_coords = TileIdMethod::Hilbert.id_to_zxy(tid);
// write_points_to_tile(tmp_tiled_file_dir_path.path(), tile_coords, &buffer)?;
// }
// current_tile_id = Some(sort_key.tile_id);
// buffer.clear();
// buffer.push(point);
// }
// }
// }
// if let Some(tid) = current_tile_id {
// let tile_coords = TileIdMethod::Hilbert.id_to_zxy(tid);
// write_points_to_tile(tmp_tiled_file_dir_path.path(), tile_coords, &buffer)?;
// }
// }

// log::info!("Finish sorting in {:?}", start_local.elapsed());

log::info!("start zoom aggregation...");
let start_local = std::time::Instant::now();

for z in (args.min..max_zoom).rev() {
log::info!("aggregating zoom level: {}", z);
aggregate_zoom_level(tmp_tiled_file_dir_path.path(), z)?;
}

log::info!("Finish zoom aggregation in {:?}", start_local.elapsed());

log::info!("start exporting tiles (GLB)...");
let start_local = std::time::Instant::now();

let tile_contents = export_tiles_to_glb(
tmp_tiled_file_dir_path.path(),
Expand Down

0 comments on commit fff8e2e

Please sign in to comment.