Skip to content

Commit

Permalink
Merge pull request #1071 from rommelDB/fix/empty-folder
Browse files Browse the repository at this point in the history
[REVIEW] Fix for empty folders
  • Loading branch information
William Malpica authored Oct 9, 2020
2 parents c3b1ef7 + 0a0a578 commit 82e4e4d
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
- #1007 Fix arrow and spdlog compilation issues
- #1068 Just adds a docs important links and avoid the message about filesystem authority not found
- #1074: Remove lock inside grow() method from PinnedBufferProvider
- #1071 Fix crash when loading an empty folder


# BlazingSQL 0.15.0 (August 31, 2020)
Expand Down
6 changes: 0 additions & 6 deletions engine/src/io/data_provider/DataProvider.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,6 @@ struct data_handle {
Uri uri; // in case the data was loaded from a file

bool is_valid(){
// sometimes parquet directories have a `_metadata` file that have not the same schema as the *.parquet files
// we don't want the data provider handle this one.
std::string file_name = uri.toString(true);
std::string metadata_str = file_name.substr(file_name.size() - 9);
if (metadata_str == "_metadata") return false;

return fileHandle != nullptr || !uri.isEmpty() ;
}
};
Expand Down
38 changes: 35 additions & 3 deletions engine/src/io/data_provider/UriDataProvider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,18 +150,50 @@ data_handle uri_data_provider::get_next(bool open_file) {
this->directory_uris = BlazingContext::getInstance()->getFileSystemManager()->list(target_uri);
}

std::string ender = ".crc";
// sometimes parquet directories have somes files that
// have not the same schema as the *.parquet files
// we don't want the data provider handle this ones
std::vector<std::string> ignored_suffixes {
".crc",
"_metadata",
"_SUCCESS",
".ipynb_checkpoints"
};

std::vector<Uri> new_uris;
for(int i = 0; i < this->directory_uris.size(); i++) {
std::string fileName = this->directory_uris[i].getPath().toString();

if(!StringUtil::endsWith(fileName, ender)) {
bool is_valid=true;
for(std::string ender : ignored_suffixes) {
if(StringUtil::endsWith(fileName, ender)) {
is_valid=false;
break;
}
}

if(is_valid){
new_uris.push_back(this->directory_uris[i]);
}
}
this->directory_uris = new_uris;

this->directory_uris = new_uris;
this->directory_current_file = 0;

// If this->directory_uris is empty,
// the folder is empty, we just skip it
if(this->directory_uris.size()==0){
this->current_file++;

auto logger = spdlog::get("batch_logger");
if(logger != nullptr) {
logger->warn("|||{info}|||||", "info"_a="Folder is empty");
}

data_handle empty_handle;
return empty_handle;
}

return get_next(open_file);

} else if(fileStatus.isFile()) {
Expand Down
1 change: 1 addition & 0 deletions engine/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -86,5 +86,6 @@ add_subdirectory(skipdata)
add_subdirectory(sort)
add_subdirectory(waiting_queue)
add_subdirectory(kernel_tests)
add_subdirectory(provider)

message(STATUS "******** Tests are ready ********")
5 changes: 5 additions & 0 deletions engine/tests/provider/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
set(provider_sources
provider_test.cpp
)

configure_test(provider_test "${provider_sources}")
92 changes: 92 additions & 0 deletions engine/tests/provider/provider_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#include <fstream>
#include "tests/utilities/BlazingUnitTest.h"
#include "io/data_provider/UriDataProvider.h"
#include "FileSystem/LocalFileSystem.h"
#include "Util/StringUtil.h"

struct ProviderTest : public BlazingUnitTest {};

TEST_F(ProviderTest, non_existent_directory) {

std::string filename = "/fake/";
std::vector<Uri> uris = {Uri{filename}};

auto provider = std::make_shared<ral::io::uri_data_provider>(uris);

bool open_file = false;
if(provider->has_next()){
try{
ral::io::data_handle new_handle = provider->get_next(open_file);
FAIL();
}
catch(std::runtime_error e){
SUCCEED();
}
catch(std::exception e){
FAIL();
}
catch(...){
FAIL();
}
}
}

void create_dummy_file(std::string content, std::string filename){
std::ofstream outfile(filename, std::ofstream::out);
outfile << content << std::endl;
outfile.close();
}

TEST_F(ProviderTest, ignoring_dummy_files) {

std::vector<std::string> test_files = {"/tmp/file.crc", "/tmp/file_SUCCESS", "/tmp/file_metadata", "/tmp/file.csv"};

create_dummy_file("some crc", test_files[0]);
create_dummy_file("some flag", test_files[1]);
create_dummy_file("some meta", test_files[2]);
create_dummy_file("a|b\n0|0", test_files[3]);

std::vector<Uri> uris = {Uri{"/tmp/file*"}};

auto provider = std::make_shared<ral::io::uri_data_provider>(uris);

bool open_file = false;

std::vector<std::string> result;

while(provider->has_next()){
ral::io::data_handle new_handle = provider->get_next(open_file);
std::string file_name = new_handle.uri.toString(true);
result.push_back(file_name);
}

EXPECT_EQ(result.size(), 1);
EXPECT_EQ(result[0], "/tmp/file.csv");
}

TEST_F(ProviderTest, empty_dir) {

std::unique_ptr<LocalFileSystem> localFileSystem(new LocalFileSystem(Path("/")));

const int length = 10;
std::string dirname = "/tmp/" + randomString(length);

bool dir_create_ok = localFileSystem->makeDirectory(Uri{dirname});
ASSERT_TRUE(dir_create_ok);

std::vector<Uri> uris = {Uri{dirname}};
auto provider = std::make_shared<ral::io::uri_data_provider>(uris);

bool open_file = false;

std::vector<std::string> result;

if(provider->has_next()){
ral::io::data_handle new_handle = provider->get_next(open_file);
// an empty folder must return an empty handle
EXPECT_EQ(new_handle.uri.isEmpty(), true);
}

bool dir_remove_ok = localFileSystem->remove(Uri{dirname});
ASSERT_TRUE(dir_remove_ok);
}

0 comments on commit 82e4e4d

Please sign in to comment.