Commit

nameexhaustion committed Mar 6, 2025
1 parent 684b2fa commit b7860c4
Showing 11 changed files with 41 additions and 42 deletions.
6 changes: 3 additions & 3 deletions crates/polars-io/src/cloud/adaptors.rs
@@ -78,7 +78,7 @@ impl BlockingCloudWriter {
/// the writer is guaranteed to be in an error state.
pub fn close(&mut self) -> std::io::Result<()> {
match self
.try_with_writer(|writer| get_runtime().block_on_potential_spawn(writer.shutdown()))
.try_with_writer(|writer| get_runtime().block_in_place_on(writer.shutdown()))
{
Ok(_) => {
self.state = Err(std::io::Error::new(std::io::ErrorKind::Other, "closed"));
@@ -112,12 +112,12 @@ impl std::io::Write for BlockingCloudWriter {

self.try_with_writer(|writer| {
get_runtime()
.block_on_potential_spawn(async { writer.write_all(buf).await.map(|_t| buf.len()) })
.block_in_place_on(async { writer.write_all(buf).await.map(|_t| buf.len()) })
})
}

fn flush(&mut self) -> std::io::Result<()> {
self.try_with_writer(|writer| get_runtime().block_on_potential_spawn(writer.flush()))
self.try_with_writer(|writer| get_runtime().block_in_place_on(writer.flush()))
}
}

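The hunk above bridges the synchronous `std::io::Write` interface to an async object-store writer by blocking on each call through the shared runtime. A minimal sketch of that pattern, assuming a multi-threaded tokio runtime and using illustrative types rather than the real `BlockingCloudWriter`:

```rust
use std::io::Write;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::runtime::Handle;

// Hypothetical sync facade over an async writer; not the actual Polars type.
struct BlockingWriter<W: AsyncWrite + Unpin> {
    handle: Handle, // handle to a shared multi-threaded tokio runtime
    inner: W,
}

impl<W: AsyncWrite + Unpin> Write for BlockingWriter<W> {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let handle = self.handle.clone();
        let inner = &mut self.inner;
        // Sync -> async bridge: `block_in_place` lets this worker thread block
        // without starving the scheduler, then `block_on` drives the write.
        tokio::task::block_in_place(|| {
            handle.block_on(async { inner.write_all(buf).await.map(|_| buf.len()) })
        })
    }

    fn flush(&mut self) -> std::io::Result<()> {
        let handle = self.handle.clone();
        let inner = &mut self.inner;
        tokio::task::block_in_place(|| handle.block_on(inner.flush()))
    }
}
```
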
4 changes: 2 additions & 2 deletions crates/polars-io/src/file_cache/file_fetcher.rs
@@ -94,7 +94,7 @@ impl FileFetcher for CloudFileFetcher {

fn fetch_metadata(&self) -> PolarsResult<RemoteMetadata> {
let metadata = pl_async::get_runtime()
.block_on_potential_spawn(self.object_store.head(&self.cloud_path))?;
.block_in_place_on(self.object_store.head(&self.cloud_path))?;

Ok(RemoteMetadata {
size: metadata.size as u64,
@@ -108,7 +108,7 @@ impl FileFetcher for CloudFileFetcher {
}

fn fetch(&self, local_path: &std::path::Path) -> PolarsResult<()> {
pl_async::get_runtime().block_on_potential_spawn(async {
pl_async::get_runtime().block_in_place_on(async {
let file = &mut tokio::fs::OpenOptions::new()
.write(true)
.truncate(true)
2 changes: 1 addition & 1 deletion crates/polars-io/src/file_cache/utils.rs
@@ -64,7 +64,7 @@ pub fn init_entries_from_uri_list(
.unwrap_or_else(get_env_file_cache_ttl);

if is_cloud_url(first_uri) {
let object_stores = pl_async::get_runtime().block_on_potential_spawn(async {
let object_stores = pl_async::get_runtime().block_in_place_on(async {
futures::future::try_join_all(
(0..if first_uri.starts_with("http") {
// Object stores for http are tied to the path.
14 changes: 7 additions & 7 deletions crates/polars-io/src/path_utils/mod.rs
@@ -300,13 +300,14 @@ pub fn expand_paths_hive(
use crate::cloud::object_path_from_str;

if first_path.starts_with("hf://") {
let (expand_start_idx, paths) = crate::pl_async::get_runtime()
.block_on_potential_spawn(hugging_face::expand_paths_hf(
let (expand_start_idx, paths) = crate::pl_async::get_runtime().block_in_place_on(
hugging_face::expand_paths_hf(
paths,
check_directory_level,
cloud_options,
glob,
))?;
),
)?;

return Ok((Arc::from(paths), expand_start_idx));
}
@@ -322,7 +323,7 @@ pub fn expand_paths_hive(
let expand_path_cloud = |path: &str,
cloud_options: Option<&CloudOptions>|
-> PolarsResult<(usize, Vec<PathBuf>)> {
crate::pl_async::get_runtime().block_on_potential_spawn(async {
crate::pl_async::get_runtime().block_in_place_on(async {
let (cloud_location, store) =
crate::cloud::build_object_store(path, cloud_options, glob).await?;
let prefix = object_path_from_str(&cloud_location.prefix)?;
@@ -425,9 +426,8 @@ pub fn expand_paths_hive(

hive_idx_tracker.update(0, path_idx)?;

let iter = crate::pl_async::get_runtime().block_on_potential_spawn(
crate::async_glob(path.to_str().unwrap(), cloud_options),
)?;
let iter = crate::pl_async::get_runtime()
.block_in_place_on(crate::async_glob(path.to_str().unwrap(), cloud_options))?;

if is_cloud {
out_paths.extend(iter.into_iter().map(PathBuf::from));
14 changes: 7 additions & 7 deletions crates/polars-io/src/pl_async.rs
@@ -295,21 +295,21 @@ impl RuntimeManager {
Self { rt }
}

/// Keep track of rayon threads that drive the runtime. Every thread
/// only allows a single runtime. If this thread calls block_on and this
/// rayon thread is already driving an async execution we must start a new thread
/// otherwise we panic. This can happen when we parallelize reads over 100s of files.
/// Shorthand for `tokio::task::block_in_place(|| block_on(f))`. This is a variant of `block_on`
/// that is safe to call if the current thread has already entered the async runtime, or
/// is a rayon thread.
///
/// # Safety
/// The tokio runtime flavor is multi-threaded.
pub fn block_on_potential_spawn<F>(&'static self, future: F) -> F::Output
pub fn block_in_place_on<F>(&self, future: F) -> F::Output
where
F: Future + Send,
F::Output: Send,
F: Future,
{
tokio::task::block_in_place(|| self.rt.block_on(future))
}

/// Note: `block_in_place_on` should be used instead if the current thread is a rayon thread or
/// has already entered the async runtime.
pub fn block_on<F>(&self, future: F) -> F::Output
where
F: Future,
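
The renamed helper is a thin wrapper over `tokio::task::block_in_place` plus `block_on`. A standalone sketch of the same idea, assuming a multi-threaded tokio runtime (illustrative only, not the `RuntimeManager` itself):

```rust
use std::future::Future;
use tokio::runtime::Runtime;

// Drive a future to completion from a thread that may already be inside the
// multi-threaded runtime (or be a rayon worker) without panicking.
fn block_in_place_on<F: Future>(rt: &Runtime, future: F) -> F::Output {
    // `block_in_place` hands this worker's queued tasks to another thread
    // before blocking; on a thread outside the runtime it simply runs the
    // closure directly.
    tokio::task::block_in_place(|| rt.block_on(future))
}

fn main() {
    let rt = Runtime::new().unwrap(); // multi-threaded flavor by default
    let out = block_in_place_on(&rt, async { 1 + 1 });
    assert_eq!(out, 2);
}
```
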
4 changes: 2 additions & 2 deletions crates/polars-io/src/utils/file.rs
@@ -44,7 +44,7 @@ impl Writeable {
}

let writer = crate::pl_async::get_runtime()
.block_on_potential_spawn(BlockingCloudWriter::new(path, cloud_options))?;
.block_in_place_on(BlockingCloudWriter::new(path, cloud_options))?;
Ok(Self::Cloud(writer))
})
} else if config::force_async() {
@@ -79,7 +79,7 @@ impl Writeable {
}

let writer = crate::pl_async::get_runtime()
.block_on_potential_spawn(BlockingCloudWriter::new(&path, cloud_options))?;
.block_in_place_on(BlockingCloudWriter::new(&path, cloud_options))?;
Ok(Self::Cloud(writer))
})
} else {
2 changes: 1 addition & 1 deletion crates/polars-mem-engine/src/executors/scan/ipc.rs
@@ -38,7 +38,7 @@ impl IpcExec {
eprintln!("ASYNC READING FORCED");
}

polars_io::pl_async::get_runtime().block_on_potential_spawn(self.read_async())?
polars_io::pl_async::get_runtime().block_in_place_on(self.read_async())?
})
} else {
self.read_sync().map_err(|e| match &self.sources {
4 changes: 2 additions & 2 deletions crates/polars-mem-engine/src/executors/scan/parquet.rs
@@ -517,7 +517,7 @@ impl ParquetExec {
eprintln!("ASYNC READING FORCED");
}

polars_io::pl_async::get_runtime().block_on_potential_spawn(self.read_async())?
polars_io::pl_async::get_runtime().block_in_place_on(self.read_async())?
})
} else {
self.read_par()?
@@ -567,7 +567,7 @@ impl ParquetExec {
#[cfg(feature = "cloud")]
if self.sources.is_cloud_url() {
return polars_io::pl_async::get_runtime()
.block_on_potential_spawn(self.metadata_async());
.block_in_place_on(self.metadata_async());
}

self.metadata_sync()
6 changes: 3 additions & 3 deletions crates/polars-pipe/src/executors/sources/parquet.rs
@@ -329,14 +329,14 @@ impl ParquetSource {

let batched_readers = if needs_exact_processed_rows_count {
// We run serially to ensure we have a correct processed_rows count.
polars_io::pl_async::get_runtime().block_on_potential_spawn(async {
polars_io::pl_async::get_runtime().block_in_place_on(async {
futures::stream::iter(init_iter)
.then(|x| x)
.try_collect()
.await
})?
} else {
polars_io::pl_async::get_runtime().block_on_potential_spawn(async {
polars_io::pl_async::get_runtime().block_in_place_on(async {
futures::future::try_join_all(init_iter).await
})?
};
@@ -365,7 +365,7 @@ impl Source for ParquetSource {
};

let batches =
get_runtime().block_on_potential_spawn(reader.next_batches(self.n_threads))?;
get_runtime().block_in_place_on(reader.next_batches(self.n_threads))?;

Ok(match batches {
None => {
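
The two branches above differ only in how the reader futures are driven: the serial path awaits them one at a time so the running row count stays exact, while the other polls them concurrently; both preserve input order. A small illustration with stand-in futures (hypothetical, not the Polars readers):

```rust
use futures::{StreamExt, TryStreamExt};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Hypothetical fallible "initializer" futures standing in for the readers.
    let make_futs = || (0..4).map(|i| async move { Ok::<_, std::io::Error>(i * 10) });

    // Serial: each future is awaited to completion before the next one starts,
    // so a shared counter updated inside them would be exact at every step.
    let serial: Vec<i32> = futures::stream::iter(make_futs())
        .then(|fut| fut)
        .try_collect()
        .await?;

    // Concurrent: all futures are polled at once; results keep input order.
    let concurrent: Vec<i32> = futures::future::try_join_all(make_futs()).await?;

    assert_eq!(serial, concurrent);
    Ok(())
}
```
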
2 changes: 1 addition & 1 deletion crates/polars-plan/src/plans/conversion/scans.rs
@@ -41,7 +41,7 @@ pub(super) fn parquet_file_info(
let first_path = &sources.as_paths().unwrap()[0];
feature_gated!("cloud", {
let uri = first_path.to_string_lossy();
get_runtime().block_on_potential_spawn(async {
get_runtime().block_in_place_on(async {
let mut reader =
ParquetAsyncReader::from_uri(&uri, cloud_options, None).await?;

25 changes: 12 additions & 13 deletions crates/polars-python/src/catalog/unity.rs
@@ -60,7 +60,7 @@ impl PyCatalogClient {

pub fn list_catalogs(&self, py: Python) -> PyResult<PyObject> {
let v = py.enter_polars(|| {
pl_async::get_runtime().block_on_potential_spawn(self.client().list_catalogs())
pl_async::get_runtime().block_in_place_on(self.client().list_catalogs())
})?;

let mut opt_err = None;
@@ -86,8 +86,7 @@ impl PyCatalogClient {
#[pyo3(signature = (catalog_name))]
pub fn list_namespaces(&self, py: Python, catalog_name: &str) -> PyResult<PyObject> {
let v = py.enter_polars(|| {
pl_async::get_runtime()
.block_on_potential_spawn(self.client().list_namespaces(catalog_name))
pl_async::get_runtime().block_in_place_on(self.client().list_namespaces(catalog_name))
})?;

let mut opt_err = None;
@@ -120,7 +119,7 @@ impl PyCatalogClient {
) -> PyResult<PyObject> {
let v = py.enter_polars(|| {
pl_async::get_runtime()
.block_on_potential_spawn(self.client().list_tables(catalog_name, namespace))
.block_in_place_on(self.client().list_tables(catalog_name, namespace))
})?;

let mut opt_err = None;
@@ -155,7 +154,7 @@ impl PyCatalogClient {
) -> PyResult<PyObject> {
let table_info = py
.enter_polars(|| {
pl_async::get_runtime().block_on_potential_spawn(self.client().get_table_info(
pl_async::get_runtime().block_in_place_on(self.client().get_table_info(
table_name,
catalog_name,
namespace,
@@ -176,7 +175,7 @@ impl PyCatalogClient {
let table_credentials = py
.enter_polars(|| {
pl_async::get_runtime()
.block_on_potential_spawn(self.client().get_table_credentials(table_id, write))
.block_in_place_on(self.client().get_table_credentials(table_id, write))
})
.map_err(to_py_err)?;

@@ -245,7 +244,7 @@ impl PyCatalogClient {
retries: usize,
) -> PyResult<PyLazyFrame> {
let table_info = py.enter_polars(|| {
pl_async::get_runtime().block_on_potential_spawn(self.client().get_table_info(
pl_async::get_runtime().block_in_place_on(self.client().get_table_info(
catalog_name,
namespace,
table_name,
@@ -282,7 +281,7 @@ impl PyCatalogClient {
) -> PyResult<PyObject> {
let catalog_info = py
.allow_threads(|| {
pl_async::get_runtime().block_on_potential_spawn(self.client().create_catalog(
pl_async::get_runtime().block_in_place_on(self.client().create_catalog(
catalog_name,
comment,
storage_root,
@@ -297,7 +296,7 @@ impl PyCatalogClient {
pub fn delete_catalog(&self, py: Python, catalog_name: &str, force: bool) -> PyResult<()> {
py.allow_threads(|| {
pl_async::get_runtime()
.block_on_potential_spawn(self.client().delete_catalog(catalog_name, force))
.block_in_place_on(self.client().delete_catalog(catalog_name, force))
})
.map_err(to_py_err)
}
@@ -313,7 +312,7 @@ impl PyCatalogClient {
) -> PyResult<PyObject> {
let namespace_info = py
.allow_threads(|| {
pl_async::get_runtime().block_on_potential_spawn(self.client().create_namespace(
pl_async::get_runtime().block_in_place_on(self.client().create_namespace(
catalog_name,
namespace,
comment,
@@ -334,7 +333,7 @@ impl PyCatalogClient {
force: bool,
) -> PyResult<()> {
py.allow_threads(|| {
pl_async::get_runtime().block_on_potential_spawn(self.client().delete_namespace(
pl_async::get_runtime().block_in_place_on(self.client().delete_namespace(
catalog_name,
namespace,
force,
@@ -362,7 +361,7 @@ impl PyCatalogClient {
) -> PyResult<PyObject> {
let table_info = py.allow_threads(|| {
pl_async::get_runtime()
.block_on_potential_spawn(
.block_in_place_on(
self.client().create_table(
catalog_name,
namespace,
@@ -395,7 +394,7 @@ impl PyCatalogClient {
table_name: &str,
) -> PyResult<()> {
py.allow_threads(|| {
pl_async::get_runtime().block_on_potential_spawn(self.client().delete_table(
pl_async::get_runtime().block_in_place_on(self.client().delete_table(
catalog_name,
namespace,
table_name,
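
Throughout the bindings above, each call is wrapped in `py.allow_threads` or `py.enter_polars` before blocking on the runtime — the usual pyo3 pattern for releasing the GIL around blocking work so other Python threads keep running. A rough sketch of the `allow_threads` variant, with a hypothetical `fetch_catalogs` standing in for the real async Unity client call:

```rust
use pyo3::prelude::*;

// Hypothetical stand-in for an async catalog-client call.
async fn fetch_catalogs() -> Vec<String> {
    vec!["main".to_string(), "dev".to_string()]
}

fn list_catalogs_blocking(py: Python<'_>, rt: &tokio::runtime::Runtime) -> Vec<String> {
    // Release the GIL for the duration of the blocking call, then bridge to
    // the async runtime the same way as `block_in_place_on`.
    py.allow_threads(|| tokio::task::block_in_place(|| rt.block_on(fetch_catalogs())))
}
```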
