feat(spark): support ExistenceJoin internal join type #333
base: main
Conversation
Force-pushed from ea009b1 to cc17adb.
Thanks! A couple of comments, some on the implementation, but the main one is on the approach: rather than converting the InPredicate expression into an ExistenceJoin logical plan, could we convert it to an Exists expression and let Spark do the conversion into ExistenceJoin?
@@ -21,7 +21,7 @@ import io.substrait.spark.DefaultExpressionVisitor
 import org.apache.spark.sql.catalyst.util.DateTimeUtils

 import io.substrait.expression.{Expression, FieldReference}
-import io.substrait.expression.Expression.{DateLiteral, DecimalLiteral, I32Literal, StrLiteral}
+import io.substrait.expression.Expression.{DateLiteral, DecimalLiteral, I32Literal, I64Literal, StrLiteral}
Unrelated to this PR, but FWIW I'd vote for removing these /debug/ things (and have done so in our fork); it's a lot of boilerplate code to maintain for not that much value 😅
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I have mixed feelings on this. I might remove it in a follow-up PR.
require(sparkPlan2.resolved)

// and back to substrait again
val substraitPlan3 = new ToSubstraitRel().visit(sparkPlan2)
why this additional conversion?
I'm not really adding an extra conversion, I'm just moving the conversion to/from protobuf into the critical path. At the moment it just invokes the protobuf conversion but doesn't check that it did the right thing.
That said, this is not really core to this PR, so I'm happy to remove it if you prefer :)
val protoPlan = io.substrait.proto.Rel.parseFrom(bytes)
val substraitPlan2 =
  new ProtoRelConverter(extensionCollector, SparkExtension.COLLECTION).from(protoPlan)
Can we add a substraitPlan2.shouldEqualPlainly(substraitPlan) here?
Could do, although the overall roundtrip is already checked at the end.
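Pulling the snippets in this thread together, the suggested check would sit roughly as follows; this is a sketch assembled from the code quoted above, where bytes is assumed to hold the serialized form of substraitPlan (the serialization call itself is not shown in this thread) and shouldEqualPlainly is the matcher the reviewer names:

// Roundtrip sketch: parse the serialized plan back from protobuf and
// assert the protobuf leg preserved the Substrait plan exactly.
val protoPlan = io.substrait.proto.Rel.parseFrom(bytes)
val substraitPlan2 =
  new ProtoRelConverter(extensionCollector, SparkExtension.COLLECTION).from(protoPlan)
substraitPlan2.shouldEqualPlainly(substraitPlan)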
@@ -68,6 +68,12 @@ private class ToSparkType

   override def visit(expr: Type.IntervalYear): DataType = YearMonthIntervalType.DEFAULT

+  override def visit(expr: Type.Struct): DataType = {
+    StructType(
+      expr.fields.asScala.map(f => StructField(f.toString, f.accept(this), f.nullable()))
what kind of names does this result in for the fields?
Might be better to do something like adding a nameIdx field and then:

// Default to "col1", "col2", ... like Spark
s"col${nameIdx + 1}"
nameIdx += 1
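A minimal sketch of that suggestion, using zipWithIndex in place of a mutable nameIdx field (this override and the generated names are illustrative, not the PR's final code; Substrait struct types carry no field names of their own):

// Sketch: give Substrait struct fields Spark-style default names.
override def visit(expr: Type.Struct): DataType =
  StructType(
    expr.fields.asScala.zipWithIndex.map {
      case (f, i) =>
        // Default to "col1", "col2", ... like Spark does for unnamed columns
        StructField(s"col${i + 1}", f.accept(this), f.nullable())
    })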
    Filter(condition, child)
  }
}

private def findExistenceJoins(
    expression: Expression,
    attributes: mutable.ListBuffer[AttributeReference]): Unit = {
Just return the list instead of using a mutable arg? The recursion can be done by concatenating/flatMap, I think. Alternatively, this is just trying to recursively find all AttributeReferences? I think you could also use expr.collect to do that; see the sketch below.
Though does this work correctly if the expression is more complicated than just a pure AttributeReference? I guess this PR doesn't produce such plans, but someone else might, so preferably they'd still either work correctly or fail loudly.
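For illustration, a sketch of the collect-based alternative (assuming expression is a Catalyst Expression; TreeNode.collect traverses the tree and returns every node matched by the partial function):

// Sketch: gather every AttributeReference in the expression tree with
// Catalyst's collect, instead of threading a mutable ListBuffer through.
private def findExistenceJoins(expression: Expression): Seq[AttributeReference] =
  expression.collect { case a: AttributeReference => a }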
This is gone now - see below...
arg.accept(expr.declaration(), i, this)
arg match {
  case ip: SExpression.InPredicate =>
    existenceJoin(ip)
Hmm, I think I'd prefer to map this into Spark's Exists expression, and then let the optimizer do its thing to convert it into an ExistenceJoin. While that's not a 1-to-1 mapping for the case you have here, it feels more general. Some other system might be producing SExpression.InPredicates for other reasons, and maybe Spark still wants to turn them into ExistenceJoins, but in general that should be Spark's decision, not ours, IMO.
I think that'd also simplify the code here a lot, since you wouldn't need to do the mixing of joins.
Yeah, I think you're absolutely right. I've amended the commit accordingly.
(It's an InSubquery rather than an Exists - I'll get to that one later 😅)
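For context, a rough sketch of that mapping (the helper name and arguments are illustrative, not the PR's code; Catalyst's InSubquery pairs the tested expressions with a ListQuery over the subquery plan, which Spark's optimizer can later rewrite into an ExistenceJoin):

import org.apache.spark.sql.catalyst.expressions.{Expression, InSubquery, ListQuery}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Sketch: translate a Substrait InPredicate into Catalyst's InSubquery,
// leaving the ExistenceJoin rewrite to Spark's optimizer.
def toInSubquery(needles: Seq[Expression], haystack: LogicalPlan): Expression =
  InSubquery(needles, ListQuery(haystack))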
For certain filter expressions that embed subqueries, the Spark optimiser replaces these with a Join relation of type ‘ExistenceJoin’. This internal join type does not map directly to any standard SQL join type, or to a Substrait join type. To address this, it needs to be converted to a Substrait ‘InPredicate’ within a filter condition.
Signed-off-by: Andrew Coleman <andrew_coleman@uk.ibm.com>
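As an illustration, a hypothetical query of the kind described (table names invented): because the IN subquery sits under an OR, Spark cannot rewrite it as a plain semi-join and plans an ExistenceJoin instead.

// Hypothetical example: the IN subquery under OR yields an ExistenceJoin
// in Spark's optimized plan rather than a LEFT SEMI join.
spark.sql("""
  SELECT * FROM orders o
  WHERE o.status = 'OPEN'
     OR o.customer_id IN (SELECT c.id FROM vip_customers c)
""")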
Force-pushed from cc17adb to 151d013.