feat(spark): support ExistenceJoin internal join type #333

Open
wants to merge 1 commit into base: main

Conversation

andrew-coleman (Contributor)

For certain filter expressions that embed subqueries, the Spark optimiser replaces these with a Join relation of type ‘ExistenceJoin’. This internal join type does not map directly to any standard SQL join type or Substrait join type.
To address this, it needs to be converted to a Substrait ‘InPredicate’ within a filter condition.
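For illustration only (the rule name is Spark's, but the query and table names below are made up, not taken from this PR): Spark's RewritePredicateSubquery optimiser rule plans an IN or EXISTS subquery as an ExistenceJoin when it cannot be rewritten into a plain semi join, for example when the predicate is OR-ed with another condition.

// Hypothetical example, assuming 'orders' and 'returns' are registered tables.
// Because the IN-subquery sits inside a disjunction, the optimiser rewrites it
// into a Join of type ExistenceJoin whose boolean output feeds the Filter.
val df = spark.sql(
  """SELECT *
    |FROM orders
    |WHERE order_id IN (SELECT order_id FROM returns)
    |   OR total > 100""".stripMargin)
df.queryExecution.optimizedPlan // the optimised plan should contain an ExistenceJoin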

@andrew-coleman (Contributor, Author)

@Blizzara, @vbarua, this is ready for review... many thanks :)

@Blizzara (Contributor) left a comment

Thanks! A couple of comments, some on the implementation, but the main one is on the approach: rather than converting the InPredicate expression into an ExistenceJoin logical plan, could we convert it to an Exists expression and let Spark do the conversion into ExistenceJoin?

@@ -21,7 +21,7 @@ import io.substrait.spark.DefaultExpressionVisitor
import org.apache.spark.sql.catalyst.util.DateTimeUtils

import io.substrait.expression.{Expression, FieldReference}
-import io.substrait.expression.Expression.{DateLiteral, DecimalLiteral, I32Literal, StrLiteral}
+import io.substrait.expression.Expression.{DateLiteral, DecimalLiteral, I32Literal, I64Literal, StrLiteral}
Contributor

Unrelated to this PR, but FWIW I'd vote for removing these debug things (and have done so in our fork); it's a lot of boilerplate code to maintain for not that much value 😅

Contributor Author

Yes, I have mixed feelings on this. I might remove it in a follow-up PR.

require(sparkPlan2.resolved)

// and back to substrait again
val substraitPlan3 = new ToSubstraitRel().visit(sparkPlan2)
Contributor

why this additional conversion?

Contributor Author

I'm not really adding an extra conversion, I'm just moving the conversion to/from protobuf into the critical path. At the moment it just invokes the protobuf conversion but doesn't check that it did the right thing.
This isn't really core to the PR, though, so I'm happy to remove it if you prefer :)

val protoPlan = io.substrait.proto.Rel.parseFrom(bytes)
val substraitPlan2 =
new ProtoRelConverter(extensionCollector, SparkExtension.COLLECTION).from(protoPlan)

Contributor

can we add a substraitPlan2.shouldEqualPlainly(substraitPlan) here?

Contributor Author

Could do, although the test is already checking the overall roundtrip at the end.

@@ -68,6 +68,12 @@ private class ToSparkType

override def visit(expr: Type.IntervalYear): DataType = YearMonthIntervalType.DEFAULT

override def visit(expr: Type.Struct): DataType = {
StructType(
expr.fields.asScala.map(f => StructField(f.toString, f.accept(this), f.nullable()))
Contributor

what kind of names does this result in for the fields?

Contributor

Might be better to do something like adding a nameIdx field and then

// Default to "col1", "col2", .. like Spark
s"col${nameIdx + 1}"
nameIdx += 1
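For what it's worth, a minimal sketch of how that suggestion could look in the visit(Type.Struct) override quoted above, using zipWithIndex instead of a mutable nameIdx field to produce the same "col1", "col2", ... defaults (illustrative only, not the PR's final code):

override def visit(expr: Type.Struct): DataType =
  StructType(
    expr.fields.asScala.zipWithIndex.map {
      case (f, i) =>
        // Default to "col1", "col2", ... like Spark does for unnamed fields
        StructField(s"col${i + 1}", f.accept(this), f.nullable())
    }
  )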

Filter(condition, child)
}
}

private def findExistenceJoins(
expression: Expression,
attributes: mutable.ListBuffer[AttributeReference]): Unit = {
Contributor

just return the list instead of using a mutable arg? the recursion can be done by concatenating/flatMap I think.

Alternatively, this is just trying to recursively find all AttributeReferences? I think you could also use expr.collect to do that?

Though does this work correctly if the expression is more complicated than just a pure AttributeReference? I guess this PR doesn't produce such plans, but someone else might, so preferably they'd still either work correctly or fail loudly.
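For reference, a minimal sketch of the expr.collect variant (assuming Catalyst's Expression and AttributeReference types; illustrative, not the code that ended up in the PR):

// Gather every AttributeReference in the expression tree without threading
// a mutable ListBuffer through the recursion.
private def collectAttributeReferences(expression: Expression): Seq[AttributeReference] =
  expression.collect { case a: AttributeReference => a }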

Contributor Author

This is gone now - see below...

arg.accept(expr.declaration(), i, this)
arg match {
case ip: SExpression.InPredicate =>
existenceJoin(ip)
Contributor

Hmm, I think I'd prefer to map this into Spark's Exists expression, and then let the optimizer do its thing to convert it into an ExistenceJoin. While that's not a 1-to-1 mapping for the case you have here, it feels more general. Some other system might be producing SExpression.InPredicates for other reasons, and maybe Spark still wants to turn them into ExistenceJoins, but in general that should be Spark's decision, not ours, IMO.

I think that'd also simplify the code here a lot, since you wouldn't need to do the mixing of joins.

Contributor Author

Yeah, I think you're absolutely right. I've amended the commit accordingly.
(It's an InSubquery rather than an Exists - I'll get to that one later 😅)
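A rough sketch of the shape of that mapping, for the record (the relConverter helper and the exact method signature are assumptions for illustration; the actual conversion code may differ):

// Map the Substrait InPredicate to Spark's InSubquery expression and let
// Spark's own RewritePredicateSubquery rule decide whether it ends up as an
// ExistenceJoin during optimisation.
override def visit(expr: SExpression.InPredicate): Expression = {
  val needles = expr.needles().asScala.map(_.accept(this)).toSeq
  // assumed helper: converts the Substrait haystack Rel into a Spark LogicalPlan
  val haystack = relConverter.convert(expr.haystack())
  InSubquery(needles, ListQuery(haystack))
}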

For certain filter expressions that embed subqueries, the Spark
optimiser replaces these with a Join relation of type ‘ExistenceJoin’.
This internal join type does not map directly to any standard
SQL join type, or Substrait join type.
To address this, it needs to be converted to a Substrait
‘InPredicate’ within a filter condition.

Signed-off-by: Andrew Coleman <andrew_coleman@uk.ibm.com>