Skip to content

Fix issue #912: Support data_schema in CREATE EXTERNAL STREAM (Kafka) #913

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Single-broker Kafka backed by ZooKeeper; both ports are published on the host.
version: '3.8'

services:
  zookeeper:
    # Pinned instead of `latest` so the environment is reproducible.
    image: confluentinc/cp-zookeeper:7.5.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"

  kafka:
    # Pinned to a release that still supports ZooKeeper mode: the broker below is
    # configured through KAFKA_ZOOKEEPER_CONNECT, which newer KRaft-only image
    # majors no longer honour, so a floating `latest` tag can break this stack.
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      # Advertised as localhost: clients are expected to connect from the host
      # machine through the published 9092 port.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      # Required with a single broker; the default replication factor is higher.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      # NOTE(review): this looks like a variable of a different Kafka image and is
      # presumably ignored by the Confluent image - confirm before removing.
      ALLOW_PLAINTEXT_LISTENER: "yes"

9 changes: 9 additions & 0 deletions my_schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"type": "record",
"name": "Data",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "score", "type": "float"}
]
}
37 changes: 37 additions & 0 deletions src/Formats/FormatSchemaInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,33 @@
#include <Interpreters/Context.h>
#include <Common/Exception.h>
#include <filesystem>
#include <unordered_map>


namespace DB
{
/// Maps an Avro type name to the name of the column type used to store it.
///
/// Only the bare type name is inspected, so the complex Avro types get a fixed
/// approximation: `array` -> array(string), `map` -> map(string, string),
/// `fixed` -> fixed_string(16). `null` is mapped to nullable(string).
/// Any name that is not in the table falls back to `string`.
///
/// Every returned name uses the same lowercase spelling; the table previously mixed
/// `Nullable(String)` / `Bool` / `Array(...)` / `Map(...)` / `FixedString(...)` with
/// lowercase `int32` / `string`, so part of the results could not be resolved by the
/// same type naming scheme as the rest.
String avroTypeToClickHouseType(const String & avro_type)
{
    static const std::unordered_map<String, String> avro_to_clickhouse = {
        {"null", "nullable(string)"},
        {"boolean", "bool"},
        {"int", "int32"},
        {"long", "int64"},
        {"float", "float32"},
        {"double", "float64"},
        {"bytes", "string"},
        {"string", "string"},
        {"array", "array(string)"},
        {"map", "map(string, string)"},
        {"fixed", "fixed_string(16)"}
    };

    if (const auto it = avro_to_clickhouse.find(avro_type); it != avro_to_clickhouse.end())
        return it->second;

    /// Unknown or unsupported Avro type: keep the value readable as plain text.
    return "string";
}
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
Expand All @@ -32,6 +55,7 @@ namespace
}



FormatSchemaInfo::FormatSchemaInfo(const String & format_schema, const String & format, bool require_message, bool is_server, const std::string & format_schema_path)
{
if (format_schema.empty())
Expand Down Expand Up @@ -100,6 +124,19 @@ FormatSchemaInfo::FormatSchemaInfo(const String & format_schema, const String &
schema_path = path.filename();
schema_directory = path.parent_path() / "";
}
else if (path.has_parent_path() && !fs::weakly_canonical(default_schema_directory_path / path).string().starts_with(fs::weakly_canonical(default_schema_directory_path).string()))
{
if (is_server)
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Path in the 'format_schema' setting shouldn't go outside the 'format_schema_path' directory: {} ({} not in {})",
default_schema_directory(),
path.string(),
default_schema_directory());
path = default_schema_directory_path / path;
schema_path = path.filename();
schema_directory = path.parent_path() / "";
}
else
{
schema_path = path;
Expand Down
2 changes: 2 additions & 0 deletions src/Formats/FormatSchemaInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

namespace DB
{
String avroTypeToClickHouseType(const String & avro_type);

class Context;

/// Extracts information about where the format schema file is from passed context and keep it.
Expand Down
75 changes: 73 additions & 2 deletions src/Interpreters/InterpreterCreateQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@

#include <Columns/ColumnString.h>

#include <Formats/FormatSchemaInfo.h>

#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>

Expand All @@ -26,6 +30,11 @@
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>

#include <Poco/JSON/Parser.h>
#include <Poco/Dynamic/Var.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Array.h>

#include <Storages/StorageFactory.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Storages/Streaming/StorageStream.h>
Expand Down Expand Up @@ -1458,8 +1467,70 @@ BlockIO InterpreterCreateQuery::execute()
/// CREATE|ATTACH DATABASE
if (create.database && !create.table)
return createDatabase(create);
else
return createTable(create);
// else
// return createTable(create);

/// CREATE EXTERNAL STREAM
if (create.is_external && create.storage && create.storage->settings)
{
// Extract data_schema from SETTINGS
auto * set_query = create.storage->settings;
for (const auto & change : set_query->changes)
{
if (change.name == "data_schema")
create.data_schema = change.value.safeGet<String>();
}
// Automatically derive schema
if (!create.columns_list && create.data_schema)
{
// Resolve the schema file through FormatSchemaInfo, then read it and parse it as an Avro (JSON) schema
FormatSchemaInfo schema_info(*create.data_schema, "Avro", false, getContext()->getApplicationType() == Context::ApplicationType::SERVER, getContext()->getFormatSchemaPath());
String schema_path = schema_info.absoluteSchemaPath();

std::string avro_schema_json;
{
ReadBufferFromFile in(schema_path);
readStringUntilEOF(avro_schema_json, in);
}

Poco::JSON::Parser parser;
Poco::Dynamic::Var parsed_result = parser.parse(avro_schema_json);
Poco::JSON::Object::Ptr schema_obj = parsed_result.extract<Poco::JSON::Object::Ptr>();

if (!schema_obj->has("fields"))
{
throw Exception("Invalid Avro schema: 'fields' not found in schema: " + *create.data_schema,
ErrorCodes::BAD_ARGUMENTS);
}

Poco::JSON::Array::Ptr fields = schema_obj->getArray("fields");
auto columns_list = std::make_shared<ASTColumns>();
auto expr_list = std::make_shared<ASTExpressionList>();
columns_list->columns = expr_list.get();
columns_list->children.push_back(expr_list);

for (size_t i = 0; i < fields->size(); ++i)
{
Poco::JSON::Object::Ptr field = fields->getObject(i);
String column_name = field->getValue<String>("name");
String column_type = field->getValue<String>("type");

// Create ASTColumnDeclaration
auto column_decl = std::make_shared<ASTColumnDeclaration>();
column_decl->name = column_name;

// Map the Avro type name to the corresponding column type name
String clickhouse_type = DB::avroTypeToClickHouseType(column_type);
auto type_ast = std::make_shared<ASTIdentifier>(clickhouse_type);
column_decl->type = type_ast;

expr_list->children.push_back(column_decl);
}

create.set(create.columns_list, columns_list);
}
}
return createTable(create);
}


Expand Down
9 changes: 9 additions & 0 deletions src/Parsers/ASTCreateQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,8 @@ ASTPtr ASTCreateQuery::clone() const
if (comment)
res->set(res->comment, comment->clone());

res->data_schema = data_schema;

cloneOutputOptions(*res);
cloneTableOptions(*res);

Expand Down Expand Up @@ -254,6 +256,13 @@ void ASTCreateQuery::formatQueryImpl(const FormatSettings & settings, FormatStat
comment->formatImpl(settings, state, frame);
}

if (data_schema)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << settings.nl_or_ws
<< "data_schema = " << (settings.hilite ? hilite_none : "")
<< quoteString(*data_schema);
}

return;
}

Expand Down
3 changes: 3 additions & 0 deletions src/Parsers/ASTCreateQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ class ASTCreateQuery : public ASTQueryWithTableAndOutput, public ASTQueryWithOnC
/// CREATE EXTERNAL STREAM
bool is_external = false;

/// Value of the `data_schema` setting of CREATE EXTERNAL STREAM, if one was specified
std::optional<String> data_schema;

ASTColumns * columns_list = nullptr;
ASTExpressionList * tables = nullptr;

Expand Down
8 changes: 8 additions & 0 deletions src/Storages/ExecutableSettings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ void ExecutableSettings::loadFromQuery(ASTStorage & storage_def)
{
try
{
for (const auto & change : storage_def.settings->changes)
{
if (change.name == "data_schema")
{
this->data_schema = change.value.safeGet<String>();
}
}

applyChanges(storage_def.settings->changes);
}
catch (Exception & e)
Expand Down
1 change: 1 addition & 0 deletions src/Storages/ExecutableSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace DB
class ASTStorage;

#define LIST_OF_EXECUTABLE_SETTINGS(M) \
M(String, data_schema, "", "Avro schema identifier for the stream", 0) \
M(Bool, send_chunk_header, false, "Send number_of_rows\n before sending chunk to process.", 0) \
M(UInt64, pool_size, 16, "Processes pool size. If size == 0, then there is no size restrictions.", 0) \
M(UInt64, max_command_execution_time, 10, "Max command execution time in seconds.", 0) \
Expand Down
1 change: 1 addition & 0 deletions src/Storages/ExternalStream/ExternalStreamSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class ASTStorage;

#define ALL_EXTERNAL_STREAM_SETTINGS(M) \
M(String, type, "", "External stream type", 0) \
M(String, data_schema, "", "Avro schema identifier for the stream", 0) \
KAFKA_EXTERNAL_STREAM_SETTINGS(M) \
LOG_FILE_EXTERNAL_STREAM_SETTINGS(M) \
PULSAR_EXTERNAL_STREAM_SETTINGS(M) \
Expand Down