diff --git a/CHANGELOG.md b/CHANGELOG.md index 321155371..5971ea95b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,7 @@ - #1007 Fix arrow and spdlog compilation issues - #1068 Just adds a docs important links and avoid the message about filesystem authority not found - #1074: Remove lock inside grow() method from PinnedBufferProvider +- #1071 Fix crash when loading an empty folder # BlazingSQL 0.15.0 (August 31, 2020) diff --git a/engine/src/io/data_provider/DataProvider.h b/engine/src/io/data_provider/DataProvider.h index 992a29ff4..f94591e33 100644 --- a/engine/src/io/data_provider/DataProvider.h +++ b/engine/src/io/data_provider/DataProvider.h @@ -36,12 +36,6 @@ struct data_handle { Uri uri; // in case the data was loaded from a file bool is_valid(){ - // sometimes parquet directories have a `_metadata` file that have not the same schema as the *.parquet files - // we don't want the data provider handle this one. - std::string file_name = uri.toString(true); - std::string metadata_str = file_name.substr(file_name.size() - 9); - if (metadata_str == "_metadata") return false; - return fileHandle != nullptr || !uri.isEmpty() ; } }; diff --git a/engine/src/io/data_provider/UriDataProvider.cpp b/engine/src/io/data_provider/UriDataProvider.cpp index 68eb15e85..5e5cfbc83 100644 --- a/engine/src/io/data_provider/UriDataProvider.cpp +++ b/engine/src/io/data_provider/UriDataProvider.cpp @@ -150,18 +150,50 @@ data_handle uri_data_provider::get_next(bool open_file) { this->directory_uris = BlazingContext::getInstance()->getFileSystemManager()->list(target_uri); } - std::string ender = ".crc"; + // sometimes parquet directories contain some files that + // do not have the same schema as the *.parquet files; + // we don't want the data provider to handle those + std::vector ignored_suffixes { + ".crc", + "_metadata", + "_SUCCESS", + ".ipynb_checkpoints" + }; + std::vector new_uris; for(int i = 0; i < this->directory_uris.size(); i++) { std::string fileName = 
this->directory_uris[i].getPath().toString(); - if(!StringUtil::endsWith(fileName, ender)) { + bool is_valid=true; + for(std::string ender : ignored_suffixes) { + if(StringUtil::endsWith(fileName, ender)) { + is_valid=false; + break; + } + } + + if(is_valid){ new_uris.push_back(this->directory_uris[i]); } } - this->directory_uris = new_uris; + this->directory_uris = new_uris; this->directory_current_file = 0; + + // If this->directory_uris is empty after the suffix filtering above, + // the folder has no usable files, so we just skip it + if(this->directory_uris.size()==0){ + this->current_file++; + + auto logger = spdlog::get("batch_logger"); + if(logger != nullptr) { + logger->warn("|||{info}|||||", "info"_a="Folder is empty"); + } + + data_handle empty_handle; + return empty_handle; + } + return get_next(open_file); } else if(fileStatus.isFile()) { diff --git a/engine/tests/CMakeLists.txt b/engine/tests/CMakeLists.txt index 17fa7e23d..850619341 100644 --- a/engine/tests/CMakeLists.txt +++ b/engine/tests/CMakeLists.txt @@ -86,5 +86,6 @@ add_subdirectory(skipdata) add_subdirectory(sort) add_subdirectory(waiting_queue) add_subdirectory(kernel_tests) +add_subdirectory(provider) message(STATUS "******** Tests are ready ********") diff --git a/engine/tests/provider/CMakeLists.txt b/engine/tests/provider/CMakeLists.txt new file mode 100644 index 000000000..b78f55cd0 --- /dev/null +++ b/engine/tests/provider/CMakeLists.txt @@ -0,0 +1,5 @@ +set(provider_sources + provider_test.cpp +) + +configure_test(provider_test "${provider_sources}") \ No newline at end of file diff --git a/engine/tests/provider/provider_test.cpp b/engine/tests/provider/provider_test.cpp new file mode 100644 index 000000000..6636090de --- /dev/null +++ b/engine/tests/provider/provider_test.cpp @@ -0,0 +1,92 @@ +#include +#include "tests/utilities/BlazingUnitTest.h" +#include "io/data_provider/UriDataProvider.h" +#include "FileSystem/LocalFileSystem.h" +#include "Util/StringUtil.h" + +struct ProviderTest : public BlazingUnitTest {}; + 
+// A URI pointing at a directory that does not exist must make get_next()
+// fail with a std::runtime_error (or a type derived from it).
+TEST_F(ProviderTest, non_existent_directory) {
+
+    std::string filename = "/fake/";
+    std::vector<Uri> uris = {Uri{filename}};
+
+    auto provider = std::make_shared<ral::io::uri_data_provider>(uris);
+
+    bool open_file = false;
+    if(provider->has_next()){
+        // Passes only when a std::runtime_error (or subclass) is thrown;
+        // no exception, or any other exception type, fails the test.
+        EXPECT_THROW(provider->get_next(open_file), std::runtime_error);
+    }
+}
+
+// Writes `content` followed by a newline into `filename`, truncating any
+// existing file. The stream is flushed and closed when it goes out of scope.
+void create_dummy_file(const std::string & content, const std::string & filename){
+    std::ofstream outfile(filename, std::ofstream::out);
+    outfile << content << '\n';
+}
+
+// Files whose names end in one of the ignored suffixes (.crc, _SUCCESS,
+// _metadata) must be skipped, so only the CSV file is handed out.
+// NOTE(review): the "/tmp/file*" wildcard can also match unrelated files
+// already present in /tmp, and the dummy files are never removed; consider
+// a dedicated scratch directory.
+TEST_F(ProviderTest, ignoring_dummy_files) {
+
+    std::vector<std::string> test_files = {"/tmp/file.crc", "/tmp/file_SUCCESS", "/tmp/file_metadata", "/tmp/file.csv"};
+
+    create_dummy_file("some crc", test_files[0]);
+    create_dummy_file("some flag", test_files[1]);
+    create_dummy_file("some meta", test_files[2]);
+    create_dummy_file("a|b\n0|0", test_files[3]);
+
+    std::vector<Uri> uris = {Uri{"/tmp/file*"}};
+
+    auto provider = std::make_shared<ral::io::uri_data_provider>(uris);
+
+    bool open_file = false;
+
+    std::vector<std::string> result;
+
+    while(provider->has_next()){
+        ral::io::data_handle new_handle = provider->get_next(open_file);
+        std::string file_name = new_handle.uri.toString(true);
+        result.push_back(file_name);
+    }
+
+    // Fatal assertion: result[0] below must not be evaluated on an empty vector.
+    ASSERT_EQ(result.size(), 1u);
+    EXPECT_EQ(result[0], "/tmp/file.csv");
+}
+
+// Listing an empty directory must not crash; the provider is expected to
+// answer with a handle whose uri is empty.
+TEST_F(ProviderTest, empty_dir) {
+
+    std::unique_ptr<LocalFileSystem> localFileSystem(new LocalFileSystem(Path("/")));
+
+    const int length = 10;
+    std::string dirname = "/tmp/" + randomString(length);
+
+    bool dir_create_ok = localFileSystem->makeDirectory(Uri{dirname});
+    ASSERT_TRUE(dir_create_ok);
+
+    std::vector<Uri> uris = {Uri{dirname}};
+    auto provider = std::make_shared<ral::io::uri_data_provider>(uris);
+
+    bool open_file = false;
+
+    if(provider->has_next()){
+        ral::io::data_handle new_handle = provider->get_next(open_file);
+        // an empty folder must return an empty handle
+        EXPECT_TRUE(new_handle.uri.isEmpty());
+    }
+
+    // Clean up the scratch directory created above.
+    bool dir_remove_ok = localFileSystem->remove(Uri{dirname});
+    ASSERT_TRUE(dir_remove_ok);
+}