-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Kernel]Add simple crc post commit for incremental crc writing. #4134
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the change in this file mainly parameterized certain test case for verifying crc writing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! Left some feedback on some of the code comments + organziation. Thanks!
Also -- please update the PR description.
kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/checksum/ChecksumUtils.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/checksum/ChecksumWriter.java
Outdated
Show resolved
Hide resolved
|
||
private final Path logPath; | ||
// Constants for schema field names | ||
private static final String TABLE_SIZE_BYTES = "tableSizeBytes"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
static fields should come before member fields
kernel/kernel-api/src/main/java/io/delta/kernel/internal/checksum/ChecksumWriter.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/checksum/ChecksumWriter.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/test/scala/io/delta/kernel/internal/checksum/ChecksumWriterSuite.scala
Outdated
Show resolved
Hide resolved
.../kernel-defaults/src/test/scala/io/delta/kernel/defaults/ChecksumSimpleComparisonSuite.scala
Outdated
Show resolved
Hide resolved
.../kernel-defaults/src/test/scala/io/delta/kernel/defaults/ChecksumSimpleComparisonSuite.scala
Outdated
Show resolved
Hide resolved
.../kernel-defaults/src/test/scala/io/delta/kernel/defaults/ChecksumSimpleComparisonSuite.scala
Outdated
Show resolved
Hide resolved
} | ||
} | ||
|
||
test("create table as select and verify checksum") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry - what's the benefit of testing "create table as select" vs just "create table and insert to"?
are you expecting the CRC to be/look different between the two cases?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The only difference is the 00000.crc, ctas has added file while create table doesn't, so I added this test for completeness.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed the CTAS case, we believe it is a bit redundant here.
.withSchema(engine, new StructType().add("id", INTEGER)) | ||
.build(engine) | ||
|
||
copyAndCommitTxnForUnpartitionedTable(txn, engine, sparkTablePath, versionAtCommit = 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I couldn't quite understand what this method is doing -- can you explain to me (on this PR, not in the code)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was trying to make the test logic like
1.commit to spark table using ctas/dml, which produces a log with add file
2.read the log in step 1 and commit to the kernel table (make sore both table's log pointing to the same sets of parquet file to validate table size is correct in crc)
3.verify crc written stats are aligned
This method is mainly for step 2.
Does "copyDeltaLogFromSparkToKernel" a better name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added some more documentation to the test class and method as follow up
long version, ColumnarBatch batch, int rowId, String crcFilePath) { | ||
Protocol protocol = | ||
Protocol.fromColumnVector( | ||
batch.getColumnVector(CRC_FILE_SCHEMA.indexOf("protocol")), rowId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: if you define PROTOCOL
then you might as well use it here, eh?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, my bad, I forgot to update them.
public static final String PROTOCOL = "protocol"; | ||
public static final String TXN_ID = "txnId"; | ||
|
||
public static StructType CRC_FILE_SCHEMA = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make this final
kernel/kernel-api/src/main/java/io/delta/kernel/internal/checksum/ChecksumWriter.java
Show resolved
Hide resolved
newChecksumPath); | ||
} | ||
|
||
private Row buildCheckSumRow(CRCInfo crcInfo) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not just have toRow
on CRCInfo
? I think that's a better spot for this than inside of ChecksumWriter
. i.e. lower coupling, higher cohesion
} | ||
} | ||
|
||
private def verifyMetadataAndProtocol(row: Row, metadata: Metadata, protocol: Protocol): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
expectedMetadata
expectedProtocol
) | ||
} | ||
|
||
private def checkMetadata(metadata: Metadata, metadataRow: Row): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
expectedMetadata
actualMetadataRow
Which Delta project/connector is this regarding?
Description
This PR introduces a new post commit hook - ChecksumSimple, for writing CRC file after txn commit.
CRC file will only be written only commit version - 1's snapshot reads CRC during state construction
Other case will be handled in a separate PR
How was this patch tested?
E2e test
Does this PR introduce any user-facing changes?
No