-
Notifications
You must be signed in to change notification settings - Fork 77
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(spark): support ExistenceJoin internal join type #333
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -86,19 +86,30 @@ trait SubstraitPlanTestBase { self: SharedSparkSession => | |
|
||
def assertSqlSubstraitRelRoundTrip(query: String): LogicalPlan = { | ||
// TODO need a more robust way of testing this than round-tripping. | ||
val logicalPlan = plan(query) | ||
val pojoRel = new ToSubstraitRel().visit(logicalPlan) | ||
val converter = new ToLogicalPlan(spark = spark); | ||
val logicalPlan2 = pojoRel.accept(converter); | ||
require(logicalPlan2.resolved); | ||
val pojoRel2 = new ToSubstraitRel().visit(logicalPlan2) | ||
|
||
val extensionCollector = new ExtensionCollector; | ||
val proto = new RelProtoConverter(extensionCollector).toProto(pojoRel) | ||
new ProtoRelConverter(extensionCollector, SparkExtension.COLLECTION).from(proto) | ||
|
||
pojoRel2.shouldEqualPlainly(pojoRel) | ||
logicalPlan2 | ||
val sparkPlan = plan(query) | ||
|
||
// convert spark logical plan to substrait | ||
val substraitPlan = new ToSubstraitRel().visit(sparkPlan) | ||
|
||
// Serialize to protobuf byte array | ||
val extensionCollector = new ExtensionCollector | ||
val bytes = new RelProtoConverter(extensionCollector).toProto(substraitPlan).toByteArray | ||
|
||
// Read it back | ||
val protoPlan = io.substrait.proto.Rel.parseFrom(bytes) | ||
val substraitPlan2 = | ||
new ProtoRelConverter(extensionCollector, SparkExtension.COLLECTION).from(protoPlan) | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we add a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could do, although it's checking the overall roundtrip at the end |
||
// convert substrait back to spark plan | ||
val sparkPlan2 = substraitPlan2.accept(new ToLogicalPlan(spark)) | ||
require(sparkPlan2.resolved) | ||
|
||
// and back to substrait again | ||
val substraitPlan3 = new ToSubstraitRel().visit(sparkPlan2) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why this additional conversion? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not really adding an extra conversion, I'm just moving the conversion to/from protobuf into the critical path. At the moment it just invokes the protobuf conversion but doesn't check it did the right thing. |
||
|
||
// compare with original substrait plan to ensure it round-tripped (via proto bytes) correctly | ||
substraitPlan3.shouldEqualPlainly(substraitPlan) | ||
sparkPlan2 | ||
} | ||
|
||
def plan(sql: String): LogicalPlan = { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unrelated to this PR, but FWIW I'd vote for removing these `/debug/` things (and have done so in our fork); it's a lot of boilerplate code to maintain for not that much value 😅
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I have mixed feelings on this. I might remove this in a followup PR.