Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent accessing freed XA-committed RocksDB transactions #1513

Open
wants to merge 1 commit into
base: fb-mysql-8.0.32
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions mysql-test/suite/rocksdb/r/ddse.result
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,15 @@ DROP TABLE t1;
SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;
CREATE TABLE t1 (a INT PRIMARY KEY);
DROP TABLE t1;
#
# heap-use-after-free on DDL after XA
#
CREATE TABLE t1(c1 INT NOT NULL) ENGINE=RocksDB;
XA START 'test1';
SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA='test';
COUNT(*)
1
XA END 'test1';
XA PREPARE 'test1';
XA COMMIT 'test1';
DROP TABLE t1;
13 changes: 13 additions & 0 deletions mysql-test/suite/rocksdb/t/ddse.test
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,16 @@ SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;

CREATE TABLE t1 (a INT PRIMARY KEY);
DROP TABLE t1;

--echo #
--echo # heap-use-after-free on DDL after XA
--echo #
CREATE TABLE t1(c1 INT NOT NULL) ENGINE=RocksDB;

XA START 'test1';
SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA='test';
XA END 'test1';
XA PREPARE 'test1';
XA COMMIT 'test1';

DROP TABLE t1;
44 changes: 37 additions & 7 deletions storage/rocksdb/ha_rocksdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,10 @@ static Rdb_binlog_manager binlog_manager;
static std::unique_ptr<Rdb_io_watchdog> io_watchdog;
#endif

// The prepared transactions found during crash recovery. Populated in
// rocksdb_recover and cleared in rocskdb_post_recover.
static std::vector<rocksdb::Transaction *> recovered_transactions;

/**
MyRocks background thread control
N.B. This is besides RocksDB's own background threads
Expand Down Expand Up @@ -3977,6 +3981,8 @@ class Rdb_transaction {
}
}

virtual void restart_if_committed_by_xid() {}

void set_params(THD *thd, TABLE_TYPE table_type) {
if (thd_tx_is_dd_trx(thd)) {
assert(is_autocommit(*thd));
Expand Down Expand Up @@ -5350,6 +5356,25 @@ class Rdb_transaction_impl : public Rdb_transaction {
nullptr, nullptr};

public:
// Restart the owned RocksDB transaction if it was committed outside the
// owning Rdb_transaction_impl object. Currently this happens with XA
// transactions followed by DDL when DDSE is MyRocks.
void restart_if_committed_by_xid() override {
if (default_dd_system_storage_engine != DEFAULT_DD_ROCKSDB) return;

auto *const rdb_tx = m_rocksdb_tx[TABLE_TYPE::USER_TABLE].get();

if (rdb_tx == nullptr ||
likely(rdb_tx->GetState() ==
rocksdb::Transaction::TransactionState::STARTED)) {
return;
}

release_snapshot(TABLE_TYPE::USER_TABLE);
release_tx(rdb_tx->GetWriteBatch()->GetDataSize());
start_tx(TABLE_TYPE::USER_TABLE);
}

void set_lock_timeout(int timeout_sec_arg, TABLE_TYPE table_type) override {
assert(!is_ac_nl_ro_rc_transaction());

Expand Down Expand Up @@ -7186,8 +7211,6 @@ static xa_status_code rocksdb_commit_by_xid(
DBUG_RETURN(XAER_RMERR);
}

delete trx;

// `Add()` is implemented in a thread-safe manner.
commit_latency_stats->Add(timer.ElapsedNanos() / 1000);

Expand Down Expand Up @@ -7219,8 +7242,6 @@ static xa_status_code rocksdb_rollback_by_xid(
DBUG_RETURN(XAER_RMERR);
}

delete trx;

DBUG_RETURN(XA_OK);
}

Expand Down Expand Up @@ -7357,11 +7378,10 @@ static int rocksdb_recover(handlerton *const hton [[maybe_unused]],
return HA_EXIT_SUCCESS;
}

std::vector<rocksdb::Transaction *> trans_list;
rdb->GetAllPreparedTransactions(&trans_list);
rdb->GetAllPreparedTransactions(&recovered_transactions);

uint count = 0;
for (auto &trans : trans_list) {
for (const auto &trans : recovered_transactions) {
if (count >= len) {
break;
}
Expand All @@ -7372,6 +7392,13 @@ static int rocksdb_recover(handlerton *const hton [[maybe_unused]],
return count;
}

static void rocksdb_post_recover() {
for (auto &trans : recovered_transactions) {
delete trans;
}
recovered_transactions.clear();
}

static int rocksdb_commit(handlerton *const hton MY_ATTRIBUTE((__unused__)),
THD *const thd, bool all) {
DBUG_ENTER_FUNC();
Expand Down Expand Up @@ -8214,6 +8241,8 @@ static inline void rocksdb_register_tx(
Rdb_transaction *const tx) {
assert(tx != nullptr);

tx->restart_if_committed_by_xid();

trans_register_ha(thd, false, rocksdb_hton, NULL);
if (rocksdb_write_policy == rocksdb::TxnDBWritePolicy::WRITE_UNPREPARED) {
// Some internal operations will call trans_register_ha, but they do not
Expand Down Expand Up @@ -8964,6 +8993,7 @@ static int rocksdb_init_internal(void *const p) {
rocksdb_hton->recover_binlog_pos = rocksdb_recover_binlog_pos;
rocksdb_hton->update_binlog_pos = rocksdb_update_binlog_pos;
rocksdb_hton->recover = rocksdb_recover;
rocksdb_hton->post_recover = rocksdb_post_recover;
rocksdb_hton->commit = rocksdb_commit;
rocksdb_hton->rollback = rocksdb_rollback;
rocksdb_hton->db_type = DB_TYPE_ROCKSDB;
Expand Down