diff --git a/mysql-test/combinations b/mysql-test/combinations index 9f9a4eb9ec06..b926ea840fc0 100644 --- a/mysql-test/combinations +++ b/mysql-test/combinations @@ -1,8 +1,9 @@ [innodb_intrinsic_table] enable_rocksdb_intrinsic_tmp_table= OFF -loose-rocksdb_enable_tmp_table = OFF +loose-rocksdb_enable_tmp_table= OFF [rocksdb_intrinsic_table] enable_rocksdb_intrinsic_tmp_table= ON -loose-rocksdb_enable_tmp_table = ON -loose-rocksdb_enable_delete_range_for_drop_index = ON +loose-rocksdb_enable_tmp_table= ON +loose-rocksdb_enable_delete_range_for_drop_index= ON +loose-rocksdb_max_intrinsic_tmp_table_write_count= 3 diff --git a/mysql-test/r/mysqld--help-notwin.result b/mysql-test/r/mysqld--help-notwin.result index 4ab0679e3fa7..9b6a6eb7b75d 100644 --- a/mysql-test/r/mysqld--help-notwin.result +++ b/mysql-test/r/mysqld--help-notwin.result @@ -2167,6 +2167,10 @@ The following options may be given as the first argument: Track history for at most this many completed compactions. The history is in the INFORMATION_SCHEMA.ROCKSDB_COMPACTION_HISTORY table. + --rocksdb-max-intrinsic-tmp-table-write-count=# + Intrinsic tmp table max allowed write batch size.After + this, current transaction holding write batch will commit + and newtransaction will be started. 
--rocksdb-max-latest-deadlocks=# Maximum number of recent deadlocks to store --rocksdb-max-log-file-size=# @@ -3620,6 +3624,7 @@ rocksdb-max-background-flushes -1 rocksdb-max-background-jobs 2 rocksdb-max-bottom-pri-background-compactions 0 rocksdb-max-compaction-history 64 +rocksdb-max-intrinsic-tmp-table-write-count 1000 rocksdb-max-latest-deadlocks 5 rocksdb-max-log-file-size 0 rocksdb-max-manifest-file-size 1073741824 diff --git a/mysql-test/suite/json/r/json_table.result b/mysql-test/suite/json/r/json_table.result index 8440179ff645..6482857e75a3 100644 --- a/mysql-test/suite/json/r/json_table.result +++ b/mysql-test/suite/json/r/json_table.result @@ -1033,6 +1033,7 @@ Variable_name Value Created_tmp_disk_tables 1 Created_tmp_files 0 Created_tmp_tables 1 +rocksdb_intrinsic_tmp_table_commits 0 set @@max_heap_table_size= @save_heap_size; set @@internal_tmp_mem_storage_engine= @save_mem_se; # @@ -1051,6 +1052,7 @@ Variable_name Value Created_tmp_disk_tables 0 Created_tmp_files 0 Created_tmp_tables 1 +rocksdb_intrinsic_tmp_table_commits 0 # # Bug#25525409: ASSERTION `TABLE_LIST->TABLE' FAILED IN SQL/SQL_BASE.CC # diff --git a/mysql-test/suite/rocksdb/r/rocksdb.result b/mysql-test/suite/rocksdb/r/rocksdb.result index 19e39c1101a9..fb09e222e7c9 100644 --- a/mysql-test/suite/rocksdb/r/rocksdb.result +++ b/mysql-test/suite/rocksdb/r/rocksdb.result @@ -999,6 +999,7 @@ rocksdb_max_background_flushes -1 rocksdb_max_background_jobs 2 rocksdb_max_bottom_pri_background_compactions 0 rocksdb_max_compaction_history 64 +rocksdb_max_intrinsic_tmp_table_write_count 1000 rocksdb_max_latest_deadlocks 5 rocksdb_max_log_file_size 0 rocksdb_max_manifest_file_size 1073741824 @@ -1600,6 +1601,7 @@ rocksdb_table_index_stats_success # rocksdb_table_index_stats_failure # rocksdb_table_index_stats_req_queue_length # rocksdb_covered_secondary_key_lookups # +rocksdb_intrinsic_tmp_table_commits # rocksdb_additional_compaction_triggers # rocksdb_binlog_ttl_compaction_timestamp # 
rocksdb_block_cache_add # @@ -1753,6 +1755,7 @@ ROCKSDB_GET_HIT_L1 ROCKSDB_GET_HIT_L2_AND_UP ROCKSDB_GIT_DATE ROCKSDB_GIT_HASH +ROCKSDB_INTRINSIC_TMP_TABLE_COMMITS ROCKSDB_ITER_BYTES_READ ROCKSDB_LAST_LEVEL_SEEK_DATA ROCKSDB_LAST_LEVEL_SEEK_DATA_USEFUL_FILTER_MATCH @@ -1887,6 +1890,7 @@ ROCKSDB_GET_HIT_L1 ROCKSDB_GET_HIT_L2_AND_UP ROCKSDB_GIT_DATE ROCKSDB_GIT_HASH +ROCKSDB_INTRINSIC_TMP_TABLE_COMMITS ROCKSDB_ITER_BYTES_READ ROCKSDB_LAST_LEVEL_SEEK_DATA ROCKSDB_LAST_LEVEL_SEEK_DATA_USEFUL_FILTER_MATCH diff --git a/mysql-test/suite/rocksdb/t/tmp_table.test b/mysql-test/suite/rocksdb/t/tmp_table.test index c9b34b7bb91a..9b59ae051834 100644 --- a/mysql-test/suite/rocksdb/t/tmp_table.test +++ b/mysql-test/suite/rocksdb/t/tmp_table.test @@ -360,4 +360,22 @@ show create table tmp1; select * from t1; drop table t1; +#################################################### +### [TODO] Fix bugs: Case 12: Intrinsic tmp table background commits +#################################################### +#--echo Case 12: Intrinsic tmp table background commits +#set big_tables = 1; +#Set global rocksdb_max_intrinsic_tmp_table_write_count = 2; +#let $old_value= query_get_value(show status like "rocksdb_intrinsic_tmp_table_commits", Value, 1); +#create table t1(i int, c char(5)); +#insert into t1 values (0, "aaaa"); +#insert into t1 values (1, "aaaa"); +#select i, c, count(*) from t1 group by i, c having count(*) > 0; +#let $new_value= query_get_value(show status like "rocksdb_intrinsic_tmp_table_commits", Value, 1); +#--let $assert_text = Number of intrinsic tmp table commits should be 1 +#--let $assert_cond= $new_value - $old_value = 1 +#--source include/assert.inc +#Set global rocksdb_max_intrinsic_tmp_table_write_count = default; +#drop table t1; + --source include/wait_until_count_sessions.inc diff --git a/mysql-test/suite/rocksdb_sys_vars/r/rocksdb_max_intrinsic_tmp_table_write_count_basic.result 
b/mysql-test/suite/rocksdb_sys_vars/r/rocksdb_max_intrinsic_tmp_table_write_count_basic.result new file mode 100644 index 000000000000..11b22c08e485 --- /dev/null +++ b/mysql-test/suite/rocksdb_sys_vars/r/rocksdb_max_intrinsic_tmp_table_write_count_basic.result @@ -0,0 +1,46 @@ +CREATE TABLE valid_values (value varchar(255)) ENGINE=myisam; +INSERT INTO valid_values VALUES(10); +INSERT INTO valid_values VALUES(20); +CREATE TABLE invalid_values (value varchar(255)) ENGINE=myisam; +INSERT INTO invalid_values VALUES('\'aaa\''); +SET @start_global_value = @@global.rocksdb_max_intrinsic_tmp_table_write_count; +SELECT @start_global_value; +@start_global_value +1000 +'# Setting to valid values in global scope#' +"Trying to set variable @@global.rocksdb_max_intrinsic_tmp_table_write_count to 10" +SET @@global.rocksdb_max_intrinsic_tmp_table_write_count = 10; +SELECT @@global.rocksdb_max_intrinsic_tmp_table_write_count; +@@global.rocksdb_max_intrinsic_tmp_table_write_count +10 +"Setting the global scope variable back to default" +SET @@global.rocksdb_max_intrinsic_tmp_table_write_count = DEFAULT; +SELECT @@global.rocksdb_max_intrinsic_tmp_table_write_count; +@@global.rocksdb_max_intrinsic_tmp_table_write_count +1000 +"Trying to set variable @@global.rocksdb_max_intrinsic_tmp_table_write_count to 20" +SET @@global.rocksdb_max_intrinsic_tmp_table_write_count = 20; +SELECT @@global.rocksdb_max_intrinsic_tmp_table_write_count; +@@global.rocksdb_max_intrinsic_tmp_table_write_count +20 +"Setting the global scope variable back to default" +SET @@global.rocksdb_max_intrinsic_tmp_table_write_count = DEFAULT; +SELECT @@global.rocksdb_max_intrinsic_tmp_table_write_count; +@@global.rocksdb_max_intrinsic_tmp_table_write_count +1000 +"Trying to set variable @@session.rocksdb_max_intrinsic_tmp_table_write_count to 444. It should fail because it is not session." 
+SET @@session.rocksdb_max_intrinsic_tmp_table_write_count = 444; +ERROR HY000: Variable 'rocksdb_max_intrinsic_tmp_table_write_count' is a GLOBAL variable and should be set with SET GLOBAL +'# Testing with invalid values in global scope #' +"Trying to set variable @@global.rocksdb_max_intrinsic_tmp_table_write_count to 'aaa'" +SET @@global.rocksdb_max_intrinsic_tmp_table_write_count = 'aaa'; +Got one of the listed errors +SELECT @@global.rocksdb_max_intrinsic_tmp_table_write_count; +@@global.rocksdb_max_intrinsic_tmp_table_write_count +1000 +SET @@global.rocksdb_max_intrinsic_tmp_table_write_count = @start_global_value; +SELECT @@global.rocksdb_max_intrinsic_tmp_table_write_count; +@@global.rocksdb_max_intrinsic_tmp_table_write_count +1000 +DROP TABLE valid_values; +DROP TABLE invalid_values; diff --git a/mysql-test/suite/rocksdb_sys_vars/t/rocksdb_max_intrinsic_tmp_table_write_count_basic.test b/mysql-test/suite/rocksdb_sys_vars/t/rocksdb_max_intrinsic_tmp_table_write_count_basic.test new file mode 100644 index 000000000000..9378df1687d0 --- /dev/null +++ b/mysql-test/suite/rocksdb_sys_vars/t/rocksdb_max_intrinsic_tmp_table_write_count_basic.test @@ -0,0 +1,16 @@ +--source include/have_rocksdb.inc + +CREATE TABLE valid_values (value varchar(255)) ENGINE=myisam; +INSERT INTO valid_values VALUES(10); +INSERT INTO valid_values VALUES(20); + +CREATE TABLE invalid_values (value varchar(255)) ENGINE=myisam; +INSERT INTO invalid_values VALUES('\'aaa\''); + +--let $sys_var=rocksdb_max_intrinsic_tmp_table_write_count +--let $read_only=0 +--let $session=0 +--source ../include/rocksdb_sys_var.inc + +DROP TABLE valid_values; +DROP TABLE invalid_values; diff --git a/mysql-test/t/error_simulation.test b/mysql-test/t/error_simulation.test index 73f28ead254c..c812fb2c017a 100644 --- a/mysql-test/t/error_simulation.test +++ b/mysql-test/t/error_simulation.test @@ -1,6 +1,4 @@ -- source include/have_debug.inc ---source include/have_innodb_intrinsic_table.inc - # # Bug #28499: 
crash for grouping query when tmp_table_size is too small # diff --git a/mysql-test/t/with_recursive_innodb_tmp_table.test b/mysql-test/t/with_recursive_innodb_tmp_table.test index 7b83dcaf9fcd..44af304fb5ac 100644 --- a/mysql-test/t/with_recursive_innodb_tmp_table.test +++ b/mysql-test/t/with_recursive_innodb_tmp_table.test @@ -1,4 +1,3 @@ ---source include/have_innodb_intrinsic_table.inc --source include/no_valgrind_without_big.inc set session internal_tmp_mem_storage_engine='memory'; diff --git a/mysql-test/t/with_recursive_rocksdb-master.opt b/mysql-test/t/with_recursive_rocksdb-master.opt new file mode 100644 index 000000000000..1c26bf22ebb0 --- /dev/null +++ b/mysql-test/t/with_recursive_rocksdb-master.opt @@ -0,0 +1 @@ +--loose-rocksdb_max_intrinsic_tmp_table_write_count=1000000 diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc index 55de633efddb..ed09b230fa03 100644 --- a/storage/rocksdb/ha_rocksdb.cc +++ b/storage/rocksdb/ha_rocksdb.cc @@ -261,6 +261,10 @@ void ha_rocksdb::inc_covered_sk_lookup() { global_stats.covered_secondary_key_lookups.inc(); } +void ha_rocksdb::inc_intrinsic_tmp_table_commits() { + global_stats.intrinsic_tmp_table_commits.inc(); +} + static handler *rocksdb_create_handler(my_core::handlerton *hton, my_core::TABLE_SHARE *table_arg, bool partitioned, @@ -859,6 +863,7 @@ static uint32_t rocksdb_select_bypass_rejected_query_history_size = 0; static uint32_t rocksdb_select_bypass_debug_row_delay = 0; static bool rocksdb_bypass_rpc_on = true; static bool rocksdb_bypass_rpc_log_rejected = false; +static uint32_t rocksdb_max_intrinsic_tmp_table_write_count = 0; static unsigned long long // NOLINT(runtime/int) rocksdb_select_bypass_multiget_min = 0; static bool rocksdb_skip_locks_if_skip_unique_check = false; @@ -2717,6 +2722,14 @@ static MYSQL_SYSVAR_BOOL(column_default_value_as_expression, "allow column default value expressed in function", nullptr, nullptr, true); +static MYSQL_SYSVAR_UINT( + 
max_intrinsic_tmp_table_write_count, + rocksdb_max_intrinsic_tmp_table_write_count, PLUGIN_VAR_RQCMDARG, + "Intrinsic tmp table max allowed write batch size." + "After this, current transaction holding write batch will commit and new" + "transaction will be started.", + nullptr, nullptr, /* default */ 1000, /* min */ 1, /* max */ UINT_MAX, 0); + static const int ROCKSDB_ASSUMED_KEY_VALUE_DISK_SIZE = 100; static struct SYS_VAR *rocksdb_system_variables[] = { @@ -2931,6 +2944,7 @@ static struct SYS_VAR *rocksdb_system_variables[] = { MYSQL_SYSVAR(alter_table_comment_inplace), MYSQL_SYSVAR(column_default_value_as_expression), MYSQL_SYSVAR(enable_delete_range_for_drop_index), + MYSQL_SYSVAR(max_intrinsic_tmp_table_write_count), nullptr}; static bool is_tmp_table(const std::string &tablename) { @@ -3166,7 +3180,7 @@ static int rdb_dbug_set_ttl_read_filter_ts(); */ class Rdb_transaction { protected: - ulonglong m_write_count = 0; + ulonglong m_write_count[2] = {0, 0}; ulonglong m_insert_count = 0; ulonglong m_update_count = 0; ulonglong m_delete_count = 0; @@ -3209,7 +3223,8 @@ class Rdb_transaction { // This should be used only when updating binlog information. virtual rocksdb::WriteBatchBase *get_write_batch() = 0; - virtual bool commit_no_binlog() = 0; + virtual bool commit_no_binlog( + TABLE_TYPE table_type = TABLE_TYPE::USER_TABLE) = 0; /* @detail @@ -3245,7 +3260,7 @@ class Rdb_transaction { virtual void do_rollback_to_savepoint() = 0; public: - rocksdb::ReadOptions m_read_opts; + rocksdb::ReadOptions m_read_opts[2]; const char *m_mysql_log_file_name; my_off_t m_mysql_log_offset; const char *m_mysql_gtid; @@ -3254,6 +3269,7 @@ class Rdb_transaction { int64_t m_snapshot_timestamp = 0; bool m_ddl_transaction; std::shared_ptr m_explicit_snapshot; + bool should_refresh_iterator_after_first_write = false; /* Tracks the number of tables in use through external_lock. 
@@ -3402,7 +3418,10 @@ class Rdb_transaction { virtual void set_lock_timeout(int timeout_sec_arg, TABLE_TYPE table_type) = 0; - ulonglong get_write_count() const { return m_write_count; } + ulonglong get_write_count( + TABLE_TYPE table_type = TABLE_TYPE::USER_TABLE) const { + return m_write_count[table_type]; + } ulonglong get_insert_count() const { return m_insert_count; } @@ -3489,7 +3508,8 @@ class Rdb_transaction { void snapshot_created(const rocksdb::Snapshot *const snapshot) { assert(snapshot != nullptr); - m_read_opts.snapshot = snapshot; + m_read_opts[USER_TABLE].snapshot = snapshot; + // TODO: Use snapshot timestamp from rocksdb Snapshot object itself. This // saves the extra call to fetch current time, and allows TTL compaction // (which uses rocksdb timestamp) to be consistent with TTL read filtering @@ -3505,7 +3525,10 @@ class Rdb_transaction { virtual void acquire_snapshot(bool acquire_now, TABLE_TYPE table_type) = 0; virtual void release_snapshot(TABLE_TYPE table_type) = 0; - bool has_snapshot() const { return m_read_opts.snapshot != nullptr; } + bool has_snapshot(TABLE_TYPE table_type) const { + if (table_type == INTRINSIC_TMP) return false; + return m_read_opts[table_type].snapshot != nullptr; + } private: // The Rdb_sst_info structures we are currently loading. In a partitioned @@ -4097,14 +4120,14 @@ class Rdb_transaction { Add test coverage for what happens when somebody attempts to do bulk inserts while inside a multi-statement transaction. 
*/ - bool flush_batch() { - if (get_write_count() == 0) return false; + bool flush_batch(TABLE_TYPE table_type) { + if (get_write_count(table_type) == 0) return false; /* Commit the current transaction */ - if (commit_no_binlog()) return true; + if (commit_no_binlog(table_type)) return true; /* Start another one */ - start_tx(TABLE_TYPE::USER_TABLE); + start_tx(table_type); return false; } @@ -4177,7 +4200,7 @@ class Rdb_transaction { if (create_snapshot) acquire_snapshot(true, table_type); - rocksdb::ReadOptions options = m_read_opts; + rocksdb::ReadOptions options = m_read_opts[table_type]; const bool fill_cache = !THDVAR(get_thd(), skip_fill_cache); if (skip_bloom_filter) { @@ -4209,7 +4232,8 @@ class Rdb_transaction { protected: // Non-virtual functions with actions to be done on transaction start and // commit. - void on_commit() { + void on_commit(TABLE_TYPE table_type) { + if (table_type == TABLE_TYPE::INTRINSIC_TMP) return; time_t tm; tm = time(nullptr); for (auto &it : modified_tables) { @@ -4265,7 +4289,7 @@ class Rdb_transaction { entire transaction. */ do_set_savepoint(); - m_writes_at_last_savepoint = m_write_count; + m_writes_at_last_savepoint = m_write_count[USER_TABLE]; } /* @@ -4276,7 +4300,7 @@ class Rdb_transaction { // Take another RocksDB savepoint only if we had changes since the last // one. This is very important for long transactions doing lots of // SELECTs. 
- if (m_writes_at_last_savepoint != m_write_count) { + if (m_writes_at_last_savepoint != m_write_count[USER_TABLE]) { rocksdb::Status status = rocksdb::Status::NotFound(); while ((status = do_pop_savepoint()) == rocksdb::Status::OK()) { } @@ -4286,7 +4310,7 @@ class Rdb_transaction { } do_set_savepoint(); - m_writes_at_last_savepoint = m_write_count; + m_writes_at_last_savepoint = m_write_count[USER_TABLE]; } return HA_EXIT_SUCCESS; @@ -4296,7 +4320,7 @@ class Rdb_transaction { Rollback to the savepoint we've set before the last statement */ void rollback_to_stmt_savepoint() { - if (m_writes_at_last_savepoint != m_write_count) { + if (m_writes_at_last_savepoint != m_write_count[USER_TABLE]) { do_rollback_to_savepoint(); /* RollbackToSavePoint "removes the most recent SetSavePoint()", so @@ -4306,7 +4330,7 @@ class Rdb_transaction { statement start) because setting a savepoint is cheap. */ do_set_savepoint(); - m_write_count = m_writes_at_last_savepoint; + m_write_count[USER_TABLE] = m_writes_at_last_savepoint; } } @@ -4372,7 +4396,12 @@ class Rdb_transaction { } explicit Rdb_transaction(THD *const thd) - : m_thd(thd), m_tbl_io_perf(nullptr) {} + : m_thd(thd), m_tbl_io_perf(nullptr) { + m_read_opts[INTRINSIC_TMP].ignore_range_deletions = + !rocksdb_enable_delete_range_for_drop_index; + m_read_opts[USER_TABLE].ignore_range_deletions = + !rocksdb_enable_delete_range_for_drop_index; + } virtual ~Rdb_transaction() { #ifndef NDEBUG @@ -4477,13 +4506,13 @@ class Rdb_transaction_impl : public Rdb_transaction { return true; } - bool commit_no_binlog() override { + bool commit_no_binlog(TABLE_TYPE table_type) override { bool res = false; rocksdb::Status s; s = merge_auto_incr_map( - m_rocksdb_tx[TABLE_TYPE::USER_TABLE]->GetWriteBatch()->GetWriteBatch()); -#ifndef NDEBUG + m_rocksdb_tx[table_type]->GetWriteBatch()->GetWriteBatch()); +#ifndef DBUG_OFF DBUG_EXECUTE_IF("myrocks_commit_merge_io_error", dbug_change_status_to_io_error(&s);); 
DBUG_EXECUTE_IF("myrocks_commit_merge_incomplete", @@ -4495,9 +4524,9 @@ class Rdb_transaction_impl : public Rdb_transaction { goto error; } - release_snapshot(TABLE_TYPE::USER_TABLE); - s = m_rocksdb_tx[TABLE_TYPE::USER_TABLE]->Commit(); -#ifndef NDEBUG + release_snapshot(table_type); + s = m_rocksdb_tx[table_type]->Commit(); +#ifndef DBUG_OFF DBUG_EXECUTE_IF("myrocks_commit_io_error", dbug_change_status_to_io_error(&s);); DBUG_EXECUTE_IF("myrocks_commit_incomplete", @@ -4509,26 +4538,34 @@ class Rdb_transaction_impl : public Rdb_transaction { goto error; } - on_commit(); + on_commit(table_type); error: - on_rollback(); - /* Save the transaction object to be reused */ - release_tx(); - - m_write_count = 0; - m_insert_count = 0; - m_update_count = 0; - m_delete_count = 0; - m_row_lock_count = 0; - set_tx_read_only(false); - m_rollback_only = false; + if (table_type == USER_TABLE) { + on_rollback(); + /* Save the transaction object to be reused */ + release_tx(); + m_write_count[USER_TABLE] = 0; + m_write_count[INTRINSIC_TMP] = 0; + m_insert_count = 0; + m_update_count = 0; + m_delete_count = 0; + m_row_lock_count = 0; + set_tx_read_only(false); + m_rollback_only = false; + } else { + m_write_count[INTRINSIC_TMP] = 0; + // clean up only tmp table tx + m_rocksdb_reuse_tx[INTRINSIC_TMP] = m_rocksdb_tx[INTRINSIC_TMP]; + m_rocksdb_tx[INTRINSIC_TMP] = nullptr; + } return res; } public: void rollback() override { on_rollback(); - m_write_count = 0; + m_write_count[USER_TABLE] = 0; + m_write_count[INTRINSIC_TMP] = 0; m_insert_count = 0; m_update_count = 0; m_delete_count = 0; @@ -4545,18 +4582,18 @@ class Rdb_transaction_impl : public Rdb_transaction { set_tx_read_only(false); m_rollback_only = false; + } else { + m_rocksdb_reuse_tx[INTRINSIC_TMP] = m_rocksdb_tx[INTRINSIC_TMP]; + m_rocksdb_tx[INTRINSIC_TMP] = nullptr; } } void acquire_snapshot(bool acquire_now, TABLE_TYPE table_type) override { if (table_type == INTRINSIC_TMP) { - // intrinsic tmp table iterator currently 
reads all data from local write - // batch, so there is no need for explicit snapshot. TODO(pgl): Revisit - // this later after we enabled commit in batches for big write batch. return; } - if (m_read_opts.snapshot == nullptr) { + if (m_read_opts[table_type].snapshot == nullptr) { const auto thd_ss = std::static_pointer_cast( m_thd->get_explicit_snapshot()); if (thd_ss) { @@ -4568,11 +4605,10 @@ class Rdb_transaction_impl : public Rdb_transaction { } else if (is_tx_read_only()) { snapshot_created(rdb->GetSnapshot()); } else if (acquire_now) { - m_rocksdb_tx[TABLE_TYPE::USER_TABLE]->SetSnapshot(); - snapshot_created(m_rocksdb_tx[TABLE_TYPE::USER_TABLE]->GetSnapshot()); + m_rocksdb_tx[table_type]->SetSnapshot(); + snapshot_created(m_rocksdb_tx[table_type]->GetSnapshot()); } else if (!m_is_delayed_snapshot) { - m_rocksdb_tx[TABLE_TYPE::USER_TABLE]->SetSnapshotOnNextOperation( - m_notifier); + m_rocksdb_tx[table_type]->SetSnapshotOnNextOperation(m_notifier); m_is_delayed_snapshot = true; } } @@ -4582,36 +4618,38 @@ class Rdb_transaction_impl : public Rdb_transaction { if (table_type == INTRINSIC_TMP) { return; } + bool need_clear = m_is_delayed_snapshot; - if (m_read_opts.snapshot != nullptr) { + if (m_read_opts[table_type].snapshot != nullptr) { m_snapshot_timestamp = 0; if (m_explicit_snapshot) { m_explicit_snapshot.reset(); need_clear = false; } else if (is_tx_read_only()) { - rdb->ReleaseSnapshot(m_read_opts.snapshot); + rdb->ReleaseSnapshot(m_read_opts[table_type].snapshot); need_clear = false; } else { need_clear = true; } - m_read_opts.snapshot = nullptr; + m_read_opts[table_type].snapshot = nullptr; } - if (need_clear && m_rocksdb_tx[TABLE_TYPE::USER_TABLE] != nullptr) - m_rocksdb_tx[TABLE_TYPE::USER_TABLE]->ClearSnapshot(); + if (need_clear && m_rocksdb_tx[table_type] != nullptr) + m_rocksdb_tx[table_type]->ClearSnapshot(); m_is_delayed_snapshot = false; } - bool has_snapshot() { return m_read_opts.snapshot != nullptr; } + bool has_snapshot(TABLE_TYPE 
table_type) { + if (table_type == INTRINSIC_TMP) return false; + return m_read_opts[table_type].snapshot != nullptr; + } rocksdb::Status put(rocksdb::ColumnFamilyHandle *const column_family, const rocksdb::Slice &key, const rocksdb::Slice &value, TABLE_TYPE table_type, const bool assume_tracked) override { - if (table_type == USER_TABLE) { - ++m_write_count; - } + ++m_write_count[table_type]; return m_rocksdb_tx[table_type]->Put(column_family, key, value, assume_tracked); } @@ -4619,9 +4657,7 @@ class Rdb_transaction_impl : public Rdb_transaction { rocksdb::Status delete_key(rocksdb::ColumnFamilyHandle *const column_family, const rocksdb::Slice &key, TABLE_TYPE table_type, const bool assume_tracked) override { - if (table_type == USER_TABLE) { - ++m_write_count; - } + ++m_write_count[table_type]; return m_rocksdb_tx[table_type]->Delete(column_family, key, assume_tracked); } @@ -4629,9 +4665,7 @@ class Rdb_transaction_impl : public Rdb_transaction { rocksdb::ColumnFamilyHandle *const column_family, const rocksdb::Slice &key, TABLE_TYPE table_type, const bool assume_tracked) override { - if (table_type == USER_TABLE) { - ++m_write_count; - } + ++m_write_count[table_type]; return m_rocksdb_tx[table_type]->SingleDelete(column_family, key, assume_tracked); } @@ -4662,9 +4696,7 @@ class Rdb_transaction_impl : public Rdb_transaction { */ rocksdb::WriteBatchBase *get_indexed_write_batch( TABLE_TYPE table_type) override { - if (table_type == USER_TABLE) { - ++m_write_count; - } + ++m_write_count[table_type]; return m_rocksdb_tx[table_type]->GetWriteBatch(); } @@ -4684,7 +4716,7 @@ class Rdb_transaction_impl : public Rdb_transaction { // select * from qn; rocksdb::PinnableSlice pin_val; rocksdb::Status s = m_rocksdb_tx[table_type]->Get( - m_read_opts, column_family, key, &pin_val); + m_read_opts[table_type], column_family, key, &pin_val); pin_val.Reset(); return s; } else { @@ -4692,8 +4724,8 @@ class Rdb_transaction_impl : public Rdb_transaction { if (table_type == 
USER_TABLE) { global_stats.queries[QUERIES_POINT].inc(); } - return m_rocksdb_tx[table_type]->Get(m_read_opts, column_family, key, - value); + return m_rocksdb_tx[table_type]->Get(m_read_opts[table_type], + column_family, key, value); } } @@ -4702,8 +4734,9 @@ class Rdb_transaction_impl : public Rdb_transaction { rocksdb::PinnableSlice *values, TABLE_TYPE table_type, rocksdb::Status *statuses, const bool sorted_input) const override { - m_rocksdb_tx[table_type]->MultiGet(m_read_opts, column_family, num_keys, - keys, values, statuses, sorted_input); + m_rocksdb_tx[table_type]->MultiGet(m_read_opts[table_type], column_family, + num_keys, keys, values, statuses, + sorted_input); } rocksdb::Status get_for_update(const Rdb_key_def &key_descr, @@ -4741,18 +4774,18 @@ class Rdb_transaction_impl : public Rdb_transaction { rocksdb::Status s; // If snapshot is null, pass it to GetForUpdate and snapshot is // initialized there. Snapshot validation is skipped in that case. - if (m_read_opts.snapshot == nullptr || do_validate) { + if (m_read_opts[table_type].snapshot == nullptr || do_validate) { s = m_rocksdb_tx[table_type]->GetForUpdate( - m_read_opts, column_family, key, value, exclusive, - m_read_opts.snapshot ? do_validate : false); + m_read_opts[table_type], column_family, key, value, exclusive, + m_read_opts[table_type].snapshot ? 
do_validate : false); } else { // If snapshot is set, and if skipping validation, // call GetForUpdate without validation and set back old snapshot - auto saved_snapshot = m_read_opts.snapshot; - m_read_opts.snapshot = nullptr; - s = m_rocksdb_tx[table_type]->GetForUpdate(m_read_opts, column_family, - key, value, exclusive, false); - m_read_opts.snapshot = saved_snapshot; + auto saved_snapshot = m_read_opts[table_type].snapshot; + m_read_opts[table_type].snapshot = nullptr; + s = m_rocksdb_tx[table_type]->GetForUpdate( + m_read_opts[table_type], column_family, key, value, exclusive, false); + m_read_opts[table_type].snapshot = saved_snapshot; } // row_lock_count is to track per row instead of per key @@ -4802,6 +4835,7 @@ class Rdb_transaction_impl : public Rdb_transaction { m_rocksdb_tx[table_type] = rdb->BeginTransaction( write_opts, tx_opts, m_rocksdb_reuse_tx[table_type]); m_rocksdb_reuse_tx[table_type] = nullptr; + m_read_opts[table_type] = rocksdb::ReadOptions(); } else { write_opts.sync = (rocksdb_flush_log_at_trx_commit == FLUSH_LOG_SYNC) && rdb_sync_wal_supported(); @@ -4818,7 +4852,9 @@ class Rdb_transaction_impl : public Rdb_transaction { write_opts, tx_opts, m_rocksdb_reuse_tx[table_type]); m_rocksdb_reuse_tx[table_type] = nullptr; - m_read_opts = rocksdb::ReadOptions(); + m_read_opts[table_type] = rocksdb::ReadOptions(); + m_read_opts[table_type].ignore_range_deletions = + !rocksdb_enable_delete_range_for_drop_index; set_initial_savepoint(); @@ -4887,7 +4923,7 @@ class Rdb_transaction_impl : public Rdb_transaction { if (org_snapshot != cur_snapshot) { if (org_snapshot != nullptr) m_snapshot_timestamp = 0; - m_read_opts.snapshot = cur_snapshot; + m_read_opts[TABLE_TYPE::USER_TABLE].snapshot = cur_snapshot; if (cur_snapshot != nullptr) { rdb->GetEnv()->GetCurrentTime(&m_snapshot_timestamp); } else { @@ -4942,15 +4978,20 @@ class Rdb_writebatch_impl : public Rdb_transaction { // Called after commit/rollback. 
void reset() { m_batch->Clear(); - m_read_opts = rocksdb::ReadOptions(); + m_read_opts[USER_TABLE] = rocksdb::ReadOptions(); + m_read_opts[USER_TABLE].ignore_range_deletions = + !rocksdb_enable_delete_range_for_drop_index; m_ddl_transaction = false; } private: bool prepare() override { return true; } - bool commit_no_binlog() override { + bool commit_no_binlog(TABLE_TYPE table_type) override { bool res = false; + if (table_type == INTRINSIC_TMP) { + return res; + } rocksdb::Status s; rocksdb::TransactionDBWriteOptimizations optimize; optimize.skip_concurrency_control = true; @@ -4961,7 +5002,7 @@ class Rdb_writebatch_impl : public Rdb_transaction { res = true; goto error; } - release_snapshot(TABLE_TYPE::USER_TABLE); + release_snapshot(table_type); s = rdb->Write(write_opts, optimize, m_batch->GetWriteBatch()); if (!s.ok()) { @@ -4969,12 +5010,12 @@ class Rdb_writebatch_impl : public Rdb_transaction { res = true; goto error; } - on_commit(); + on_commit(table_type); error: on_rollback(); reset(); - m_write_count = 0; + m_write_count[table_type] = 0; m_insert_count = 0; m_update_count = 0; m_delete_count = 0; @@ -5009,7 +5050,7 @@ class Rdb_writebatch_impl : public Rdb_transaction { void rollback() override { on_rollback(); - m_write_count = 0; + m_write_count[TABLE_TYPE::USER_TABLE] = 0; m_insert_count = 0; m_update_count = 0; m_delete_count = 0; @@ -5027,7 +5068,8 @@ class Rdb_writebatch_impl : public Rdb_transaction { assert(false); return; } - if (m_read_opts.snapshot == nullptr) snapshot_created(rdb->GetSnapshot()); + if (m_read_opts[table_type].snapshot == nullptr) + snapshot_created(rdb->GetSnapshot()); } void release_snapshot(TABLE_TYPE table_type) override { @@ -5035,9 +5077,9 @@ class Rdb_writebatch_impl : public Rdb_transaction { assert(false); return; } - if (m_read_opts.snapshot != nullptr) { - rdb->ReleaseSnapshot(m_read_opts.snapshot); - m_read_opts.snapshot = nullptr; + if (m_read_opts[table_type].snapshot != nullptr) { + 
rdb->ReleaseSnapshot(m_read_opts[table_type].snapshot); + m_read_opts[table_type].snapshot = nullptr; } } @@ -5050,7 +5092,7 @@ class Rdb_writebatch_impl : public Rdb_transaction { return rocksdb::Status::NotSupported( "Not supported for intrinsic tmp tables"); } - ++m_write_count; + ++m_write_count[table_type]; m_batch->Put(column_family, key, value); // Note Put/Delete in write batch doesn't return any error code. We simply // return OK here. @@ -5066,7 +5108,7 @@ class Rdb_writebatch_impl : public Rdb_transaction { return rocksdb::Status::NotSupported( "Not supported for intrinsic tmp tables"); } - ++m_write_count; + ++m_write_count[table_type]; m_batch->Delete(column_family, key); return rocksdb::Status::OK(); } @@ -5080,7 +5122,7 @@ class Rdb_writebatch_impl : public Rdb_transaction { return rocksdb::Status::NotSupported( "Not supported for intrinsic tmp tables"); } - ++m_write_count; + ++m_write_count[table_type]; m_batch->SingleDelete(column_family, key); return rocksdb::Status::OK(); } @@ -5097,7 +5139,7 @@ class Rdb_writebatch_impl : public Rdb_transaction { assert(false); return nullptr; } - ++m_write_count; + ++m_write_count[table_type]; return m_batch; } @@ -5111,8 +5153,8 @@ class Rdb_writebatch_impl : public Rdb_transaction { "Not supported for intrinsic tmp tables"); } value->Reset(); - return m_batch->GetFromBatchAndDB(rdb, m_read_opts, column_family, key, - value); + return m_batch->GetFromBatchAndDB(rdb, m_read_opts[table_type], + column_family, key, value); } void multi_get(rocksdb::ColumnFamilyHandle *const column_family, @@ -5124,8 +5166,9 @@ class Rdb_writebatch_impl : public Rdb_transaction { assert(false); return; } - m_batch->MultiGetFromBatchAndDB(rdb, m_read_opts, column_family, num_keys, - keys, values, statuses, sorted_input); + m_batch->MultiGetFromBatchAndDB(rdb, m_read_opts[table_type], column_family, + num_keys, keys, values, statuses, + sorted_input); } rocksdb::Status get_for_update(const Rdb_key_def &key_descr, @@ -5255,6 
+5298,37 @@ class Rdb_ha_data { void set_disable_file_deletions(bool d) { disable_file_deletions = d; } + void add_tmp_table_handler(ha_rocksdb *rocksdb_handler) { + m_tmp_table_handlers.insert(rocksdb_handler); + } + + void remove_tmp_table_handler(ha_rocksdb *rocksdb_handler) { + m_tmp_table_handlers.erase(m_tmp_table_handlers.find(rocksdb_handler)); + } + + bool refresh_iterator_for_all_handlers( + const std::vector *output) { + bool res = false; + int count = 0; + for (auto const &handler : m_tmp_table_handlers) { + res = handler->refresh_tmp_table_iterator((*output)[count++]); + if (res) { + return res; + } + } + return res; + } + + void extract_iterator_keys_for_all_handlers( + std::vector *output) { + for (auto const &handler : m_tmp_table_handlers) { + // current_key will be empty if the iterator is invalid. + std::string current_key = {}; + handler->extract_snapshot_keys(¤t_key); + output->push_back(current_key); + } + } + private: void clear_checkpoint_dir() { if (checkpoint_dir) { @@ -5267,6 +5341,7 @@ class Rdb_ha_data { char *checkpoint_dir; Rdb_transaction *trx; bool disable_file_deletions; + std::multiset m_tmp_table_handlers; }; static Rdb_ha_data *&get_ha_data(THD *const thd) { @@ -5288,6 +5363,14 @@ Rdb_transaction *get_tx_from_thd(THD *const thd) { return get_ha_data(thd)->get_trx(); } +void add_tmp_table_handler(THD *const thd, ha_rocksdb *rocksdb_handler) { + get_ha_data(thd)->add_tmp_table_handler(rocksdb_handler); +} + +void remove_tmp_table_handler(THD *const thd, ha_rocksdb *rocksdb_handler) { + get_ha_data(thd)->remove_tmp_table_handler(rocksdb_handler); +} + static void set_tx_on_thd(THD *const thd, Rdb_transaction *trx) { return get_ha_data(thd)->set_trx(trx); } @@ -6666,7 +6749,7 @@ static int rocksdb_start_tx_and_assign_read_view( Rdb_transaction *const tx = get_or_create_tx(thd, TABLE_TYPE::USER_TABLE); Rdb_perf_context_guard guard(tx, thd); - assert(!tx->has_snapshot()); + assert(!tx->has_snapshot(TABLE_TYPE::USER_TABLE)); 
tx->set_tx_read_only(true); rocksdb_register_tx(hton, thd, tx); tx->acquire_snapshot(true, TABLE_TYPE::USER_TABLE); @@ -6723,15 +6806,15 @@ static int rocksdb_start_tx_with_shared_read_view( tx->m_explicit_snapshot = explicit_snapshot; } - assert(!tx->has_snapshot()); + assert(!tx->has_snapshot(TABLE_TYPE::USER_TABLE)); tx->set_tx_read_only(true); rocksdb_register_tx(hton, thd, tx); tx->acquire_snapshot(true, TABLE_TYPE::USER_TABLE); // case: an explicit snapshot was not assigned to this transaction if (!tx->m_explicit_snapshot) { - tx->m_explicit_snapshot = - Rdb_explicit_snapshot::create(ss_info, rdb, tx->m_read_opts.snapshot); + tx->m_explicit_snapshot = Rdb_explicit_snapshot::create( + ss_info, rdb, tx->m_read_opts[TABLE_TYPE::USER_TABLE].snapshot); if (!tx->m_explicit_snapshot) { my_printf_error(ER_UNKNOWN_ERROR, "Could not create snapshot", MYF(0)); error = HA_EXIT_FAILURE; @@ -7849,6 +7932,7 @@ static int rocksdb_init_func(void *const p) { // to IOError or corruption. The good practice is always check it. 
// https://github.com/facebook/rocksdb/wiki/Iterator#error-handling bool is_valid_iterator(rocksdb::Iterator *scan_it) { + if (scan_it == nullptr) return false; if (scan_it->Valid()) { return true; } else { @@ -7983,14 +8067,15 @@ ulonglong ha_rocksdb::load_auto_incr_value_from_index() { assert(!m_key_descr_arr[active_index_pos()]->is_partial_index()); std::unique_ptr<Rdb_iterator> save_iterator(new Rdb_iterator_base( - ha_thd(), m_key_descr_arr[active_index_pos()], m_pk_descr, m_tbl_def)); + ha_thd(), nullptr, m_key_descr_arr[active_index_pos()], m_pk_descr, + m_tbl_def)); std::swap(m_iterator, save_iterator); ulonglong last_val = 0; Rdb_transaction *const tx = get_or_create_tx(table->in_use, m_tbl_def->get_table_type()); - const bool is_new_snapshot = !tx->has_snapshot(); + const bool is_new_snapshot = !tx->has_snapshot(m_tbl_def->get_table_type()); if (is_new_snapshot) { tx->acquire_snapshot(true, m_tbl_def->get_table_type()); } @@ -8087,12 +8172,13 @@ int ha_rocksdb::load_hidden_pk_value() { const uint8 save_table_status = table->m_status; std::unique_ptr<Rdb_iterator> save_iterator(new Rdb_iterator_base( - ha_thd(), m_key_descr_arr[active_index_pos()], m_pk_descr, m_tbl_def)); + ha_thd(), nullptr, m_key_descr_arr[active_index_pos()], m_pk_descr, + m_tbl_def)); std::swap(m_iterator, save_iterator); Rdb_transaction *const tx = get_or_create_tx(table->in_use, m_tbl_def->get_table_type()); - const bool is_new_snapshot = !tx->has_snapshot(); + const bool is_new_snapshot = !tx->has_snapshot(m_tbl_def->get_table_type()); longlong hidden_pk_id = 1; longlong old = 0; @@ -8622,7 +8708,7 @@ int ha_rocksdb::close(void) { m_pk_descr = nullptr; m_key_descr_arr = nullptr; m_converter = nullptr; - m_iterator = nullptr; + m_iterator.reset(nullptr); free_key_buffers(); if (m_table_handler != nullptr) { @@ -8666,6 +8752,7 @@ static const char *const rdb_error_messages[] = { "RocksDB status: deadlock.", "RocksDB status: expired.", "RocksDB status: try again.", + "RocksDB commit failed for intrinsic tmp 
table.", }; static_assert((sizeof(rdb_error_messages) / sizeof(rdb_error_messages[0])) == @@ -10138,7 +10225,7 @@ int ha_rocksdb::index_read_intern(uchar *const buf, const uchar *const key, Rdb_transaction *const tx = get_or_create_tx(table->in_use, m_tbl_def->get_table_type()); - const bool is_new_snapshot = !tx->has_snapshot(); + const bool is_new_snapshot = !tx->has_snapshot(m_tbl_def->get_table_type()); // Loop as long as we get a deadlock error AND we end up creating the // snapshot here (i.e. it did not exist prior to this) @@ -10312,7 +10399,6 @@ int ha_rocksdb::check(THD *const thd MY_ATTRIBUTE((__unused__)), read_hidden_pk_id_from_rowkey(&hidden_pk_id)) { goto error; } - /* Check if we get the same PK value */ uint packed_size = m_pk_descr->pack_record( table, m_pack_buffer, table->record[0], m_pk_packed_tuple, nullptr, @@ -10324,7 +10410,6 @@ int ha_rocksdb::check(THD *const thd MY_ATTRIBUTE((__unused__)), table_name, rows); goto print_and_error; } - /* Check if we get the same secondary key value */ packed_size = m_key_descr_arr[keyno]->pack_record( table, m_pack_buffer, table->record[0], m_sk_packed_tuple, @@ -10798,8 +10883,8 @@ int ha_rocksdb::index_next_with_direction_intern(uchar *const buf, Rdb_iterator_base *ha_rocksdb::get_pk_iterator() { if (!m_pk_iterator) { - m_pk_iterator.reset( - new Rdb_iterator_base(ha_thd(), m_pk_descr, m_pk_descr, m_tbl_def)); + m_pk_iterator.reset(new Rdb_iterator_base(ha_thd(), nullptr, m_pk_descr, + m_pk_descr, m_tbl_def)); } return m_pk_iterator.get(); } @@ -10977,7 +11062,89 @@ static bool commit_in_the_middle(THD *thd) { bool ha_rocksdb::do_bulk_commit(Rdb_transaction *const tx) { return commit_in_the_middle(table->in_use) && tx->get_write_count() >= THDVAR(table->in_use, bulk_load_size) && - tx->flush_batch(); + tx->flush_batch(TABLE_TYPE::USER_TABLE); +} +/* + Commits the write batch accumulated so far. + Steps: + 1) Check if the write batch is more than threshold. 
+ 2) If yes, then extract the current keys for all open iterators. After write + batch flush, iterators running on write batch will become invalid. So we + won't be able to extract the current keys. + 3) Flush write batch accumulated so far. + 4) Refresh all the iterators to the exact keys where they were before + write batch flush. +*/ +bool ha_rocksdb::do_intrinsic_table_commit(Rdb_transaction *const tx) { + bool res = false; + if (m_tbl_def->get_table_type() == USER_TABLE) return res; + + if (tx->should_refresh_iterator_after_first_write) { + // This is to handle the special case where we start rocksdb iterator on + // new transaction(with empty write batch). Then iterator will only see + // the already committed data, but ignores any new data added in write + // batch later. So we are pro-actively refreshing the iterator after first + // write in write batch. + std::vector<std::string> output; + get_ha_data(ha_thd())->extract_iterator_keys_for_all_handlers(&output); + res = get_ha_data(ha_thd())->refresh_iterator_for_all_handlers(&output); + tx->should_refresh_iterator_after_first_write = false; + return res; + } else if (tx->get_write_count(m_tbl_def->get_table_type()) < + rocksdb_max_intrinsic_tmp_table_write_count) { + return res; + } else { + std::vector<std::string> output; + get_ha_data(ha_thd())->extract_iterator_keys_for_all_handlers(&output); + res = tx->flush_batch(m_tbl_def->get_table_type()); + if (res) { + // NO_LINT_DEBUG + sql_print_error("flush_batch failed for intrinsic table commit"); + return res; + } + res = get_ha_data(ha_thd())->refresh_iterator_for_all_handlers(&output); + inc_intrinsic_tmp_table_commits(); + tx->should_refresh_iterator_after_first_write = true; + return res; + } +} + +bool ha_rocksdb::refresh_tmp_table_iterator(const std::string &key) { + bool res = false; + if (m_tbl_def == nullptr || m_tbl_def->get_table_type() == USER_TABLE) { + return res; + } + // If m_iterator is valid, then after commit reset it back to previous value. 
+ if (m_iterator != nullptr) { + if (!key.empty()) { + const rocksdb::Slice &current_key = rocksdb::Slice(key); + m_iterator->reset(); + rocksdb::Slice empty_end_slice; + if ((res = m_iterator->seek(HA_READ_KEY_OR_NEXT, current_key, + true /*using_full_key*/, empty_end_slice, + true /* read_current */))) { + return res; + } + if (m_iterator->key() != current_key) { + // Key not found in the index + res = HA_ERR_KEY_NOT_FOUND; + assert(false); + m_iterator->reset(); + return res; + } + } else { + m_iterator->reset(); + } + } + return res; +} + +void ha_rocksdb::extract_snapshot_keys(std::string *key) { + if (m_tbl_def != nullptr && m_iterator != nullptr && m_iterator->is_valid()) { + *key = m_iterator->key().ToString(); + } else { + *key = {}; + } } /* @@ -11126,12 +11293,12 @@ int ha_rocksdb::write_row(uchar *const buf) { */ ptrdiff_t ptrdiff = buf - table->record[0]; uchar *save_record_0 = nullptr; - if (ptrdiff) { + if (m_tbl_def->is_intrinsic_tmp_table() && ptrdiff) { save_record_0 = table->record[0]; table->record[0] = buf; for (uint i = 0; i < table->s->fields; i++) { - assert(table->s->field[i]); - table->s->field[i]->move_field_offset(ptrdiff); + assert(table->field[i]); + table->field[i]->move_field_offset(ptrdiff); } assert(m_tbl_def->is_intrinsic_tmp_table()); } @@ -11167,10 +11334,11 @@ int ha_rocksdb::write_row(uchar *const buf) { update_row_stats(ROWS_INSERTED); } - if (ptrdiff) { + if (m_tbl_def->is_intrinsic_tmp_table() && ptrdiff) { table->record[0] = save_record_0; for (uint i = 0; i < table->s->fields; i++) { - table->s->field[i]->move_field_offset(-ptrdiff); + assert(table->field[i]); + table->field[i]->move_field_offset(-ptrdiff); } } @@ -11487,7 +11655,7 @@ int ha_rocksdb::check_and_lock_sk( The bloom filter may need to be disabled for this lookup. 
*/ assert(!m_key_descr_arr[key_id]->is_partial_index()); - Rdb_iterator_base iter(ha_thd(), m_key_descr_arr[key_id], m_pk_descr, + Rdb_iterator_base iter(ha_thd(), nullptr, m_key_descr_arr[key_id], m_pk_descr, m_tbl_def); /* @@ -12128,6 +12296,11 @@ int ha_rocksdb::update_write_row(const uchar *const old_data, DBUG_RETURN(HA_ERR_ROCKSDB_BULK_LOAD); } + if (m_tbl_def->get_table_type() != INTRINSIC_TMP && + do_intrinsic_table_commit(row_info.tx)) { + DBUG_RETURN(HA_ERR_ROCKSDB_TMP_TABLE_COMMIT_FAILED); + } + DBUG_RETURN(HA_EXIT_SUCCESS); } @@ -12277,7 +12450,7 @@ int ha_rocksdb::index_init(uint idx, bool sorted MY_ATTRIBUTE((__unused__))) { m_pk_descr, m_tbl_def, table, dd_table)); } else { m_iterator.reset(new Rdb_iterator_base( - thd, m_key_descr_arr[active_index_pos()], m_pk_descr, m_tbl_def)); + thd, this, m_key_descr_arr[active_index_pos()], m_pk_descr, m_tbl_def)); } // If m_lock_rows is not RDB_LOCK_NONE then we will be doing a get_for_update @@ -12299,7 +12472,7 @@ int ha_rocksdb::index_end() { m_need_build_decoder = false; - m_iterator = nullptr; + m_iterator.reset(nullptr); active_index = MAX_KEY; in_range_check_pushed_down = false; @@ -12438,6 +12611,11 @@ int ha_rocksdb::delete_row(const uchar *const buf) { if (do_bulk_commit(tx)) { DBUG_RETURN(HA_ERR_ROCKSDB_BULK_LOAD); } + + if (m_tbl_def->get_table_type() != INTRINSIC_TMP && + do_intrinsic_table_commit(tx)) { + DBUG_RETURN(HA_ERR_ROCKSDB_TMP_TABLE_COMMIT_FAILED); + } /* TODO(yzha) - row stats are gone in 8.0 stats.rows_deleted++; */ @@ -12862,23 +13040,7 @@ int ha_rocksdb::update_row(const uchar *const old_data, uchar *const new_data) { old_data points to record we're updating. It is the same as the record we've just read (for multi-table UPDATE, too, because SQL layer will make an rnd_pos() call to re-read the record before calling update_row()). - - Only intrinsic table(create_ondisk_from_heap) uses record[1] as buffer to - read/write. All scenarios uses record[0] as buffer to read/write. 
- Rdb_converter uses fields pointing to record[0] for encoding, so updating - the fields to record[1] for tmp tables is required for proper encoding. */ - ptrdiff_t ptrdiff = new_data - table->record[0]; - uchar *save_record_0 = nullptr; - if (ptrdiff) { - save_record_0 = table->record[0]; - table->record[0] = new_data; - for (uint i = 0; i < table->s->fields; i++) { - assert(table->s->field[i]); - table->s->field[i]->move_field_offset(ptrdiff); - } - assert(m_tbl_def->is_intrinsic_tmp_table()); - } ha_statistic_increment(&System_status_var::ha_update_count); int err = check_disk_usage(); @@ -12893,13 +13055,6 @@ int ha_rocksdb::update_row(const uchar *const old_data, uchar *const new_data) { update_row_stats(ROWS_UPDATED); } - if (ptrdiff) { - table->record[0] = save_record_0; - for (uint i = 0; i < table->s->fields; i++) { - table->s->field[i]->move_field_offset(-ptrdiff); - } - } - DBUG_RETURN(rv); } @@ -13592,10 +13747,9 @@ int ha_rocksdb::delete_table(const char *const tablename, HA_EXIT_SUCCESS OK other HA_ERR error code (cannot be SE-specific) */ -int ha_rocksdb::rename_table( - const char *const from, const char *const to, - const dd::Table *from_table_def MY_ATTRIBUTE((__unused__)), - dd::Table *to_table_def MY_ATTRIBUTE((__unused__))) { +int ha_rocksdb::rename_table(const char *const from, const char *const to, + [[maybe_unused]] const dd::Table *from_table_def, + [[maybe_unused]] dd::Table *to_table_def) { DBUG_ENTER_FUNC(); std::string from_str; @@ -15780,6 +15934,8 @@ static void myrocks_update_status() { export_stats.covered_secondary_key_lookups = global_stats.covered_secondary_key_lookups; + export_stats.intrinsic_tmp_table_commits = + global_stats.intrinsic_tmp_table_commits; } static void myrocks_update_memory_status() { @@ -15838,7 +15994,9 @@ static SHOW_VAR myrocks_status_variables[] = { DEF_STATUS_VAR_FUNC("covered_secondary_key_lookups", &export_stats.covered_secondary_key_lookups, SHOW_LONGLONG), - + 
DEF_STATUS_VAR_FUNC("intrinsic_tmp_table_commits", + &export_stats.intrinsic_tmp_table_commits, + SHOW_LONGLONG), {NullS, NullS, SHOW_LONG, SHOW_SCOPE_GLOBAL}}; static int show_myrocks_vars(THD *thd MY_ATTRIBUTE((unused)), SHOW_VAR *var, @@ -17819,7 +17977,7 @@ unsigned long long get_partial_index_sort_max_mem(THD *thd) { const rocksdb::ReadOptions &rdb_tx_acquire_snapshot(Rdb_transaction *tx) { tx->acquire_snapshot(true, TABLE_TYPE::USER_TABLE); - return tx->m_read_opts; + return tx->m_read_opts[TABLE_TYPE::USER_TABLE]; } rocksdb::Iterator *rdb_tx_get_iterator( diff --git a/storage/rocksdb/ha_rocksdb.h b/storage/rocksdb/ha_rocksdb.h index 9a50fa9fd105..c95a15fd92c5 100644 --- a/storage/rocksdb/ha_rocksdb.h +++ b/storage/rocksdb/ha_rocksdb.h @@ -370,6 +370,8 @@ class ha_rocksdb : public my_core::handler, public blob_buffer { bool skip_unique_check() const; bool do_bulk_commit(Rdb_transaction *const tx) MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); + bool do_intrinsic_table_commit(Rdb_transaction *const tx) + MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); bool has_hidden_pk(const TABLE *const table) const MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); @@ -397,6 +399,8 @@ class ha_rocksdb : public my_core::handler, public blob_buffer { Rdb_io_perf m_io_perf; public: + bool refresh_tmp_table_iterator(const std::string &key); + void extract_snapshot_keys(std::string *key); static rocksdb::Range get_range(const Rdb_key_def &kd, uchar buf[]); /* @@ -490,9 +494,9 @@ class ha_rocksdb : public my_core::handler, public blob_buffer { bool should_store_row_debug_checksums() const; int rename_table(const char *const from, const char *const to, - const dd::Table *from_table_def MY_ATTRIBUTE((__unused__)), - dd::Table *to_table_def MY_ATTRIBUTE((__unused__))) override - MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); + const dd::Table *from_table_def, + dd::Table *to_table_def) override + MY_ATTRIBUTE((__warn_unused_result__, __nonnull__(2, 3))); int 
convert_record_from_storage_format(const rocksdb::Slice *const key, const rocksdb::Slice *const value, @@ -984,6 +988,7 @@ class ha_rocksdb : public my_core::handler, public blob_buffer { void update_row_read(ulonglong count); static void inc_covered_sk_lookup(); + static void inc_intrinsic_tmp_table_commits(); void build_decoder(); void check_build_decoder(); @@ -1152,6 +1157,8 @@ bool should_log_rejected_bypass_rpc(); unsigned long long get_partial_index_sort_max_mem(THD *thd); Rdb_transaction *get_tx_from_thd(THD *const thd); +void add_tmp_table_handler(THD *const thd, ha_rocksdb *rocksdb_handler); +void remove_tmp_table_handler(THD *const thd, ha_rocksdb *rocksdb_handler); const rocksdb::ReadOptions &rdb_tx_acquire_snapshot(Rdb_transaction *tx); diff --git a/storage/rocksdb/nosql_access.cc b/storage/rocksdb/nosql_access.cc index 6b6660edd506..8d3676b32f3f 100644 --- a/storage/rocksdb/nosql_access.cc +++ b/storage/rocksdb/nosql_access.cc @@ -2331,7 +2331,7 @@ bool INLINE_ATTR select_exec::setup_iterator(THD *thd) { m_tbl_def, m_table, m_dd_table)); } else { m_iterator.reset( - new Rdb_iterator_base(thd, m_key_def, m_pk_def, m_tbl_def)); + new Rdb_iterator_base(thd, nullptr, m_key_def, m_pk_def, m_tbl_def)); } return m_iterator == nullptr; diff --git a/storage/rocksdb/rdb_global.h b/storage/rocksdb/rdb_global.h index 8380387cc63b..6ecd53d1bdaf 100644 --- a/storage/rocksdb/rdb_global.h +++ b/storage/rocksdb/rdb_global.h @@ -353,7 +353,8 @@ static_assert(HA_ERR_ROCKSDB_FIRST > HA_ERR_LAST, #define HA_ERR_ROCKSDB_STATUS_DEADLOCK (HA_ERR_ROCKSDB_FIRST + 23) #define HA_ERR_ROCKSDB_STATUS_EXPIRED (HA_ERR_ROCKSDB_FIRST + 24) #define HA_ERR_ROCKSDB_STATUS_TRY_AGAIN (HA_ERR_ROCKSDB_FIRST + 25) -#define HA_ERR_ROCKSDB_LAST HA_ERR_ROCKSDB_STATUS_TRY_AGAIN +#define HA_ERR_ROCKSDB_TMP_TABLE_COMMIT_FAILED (HA_ERR_ROCKSDB_FIRST + 26) +#define HA_ERR_ROCKSDB_LAST HA_ERR_ROCKSDB_TMP_TABLE_COMMIT_FAILED const char *const rocksdb_hton_name = "ROCKSDB"; @@ -424,6 +425,7 @@ struct 
st_global_stats { table_index_stats_result[TABLE_INDEX_STATS_RESULT_MAX]; ib_counter_t covered_secondary_key_lookups; + ib_counter_t intrinsic_tmp_table_commits; }; /* Struct used for exporting status to MySQL */ @@ -450,6 +452,7 @@ struct st_export_stats { ulonglong table_index_stats_req_queue_length; ulonglong covered_secondary_key_lookups; + ulonglong intrinsic_tmp_table_commits; }; /* Struct used for exporting RocksDB memory status */ diff --git a/storage/rocksdb/rdb_iterator.cc b/storage/rocksdb/rdb_iterator.cc index 0a21a8a1541e..63f715919b28 100644 --- a/storage/rocksdb/rdb_iterator.cc +++ b/storage/rocksdb/rdb_iterator.cc @@ -27,7 +27,7 @@ namespace myrocks { Rdb_iterator::~Rdb_iterator() {} -Rdb_iterator_base::Rdb_iterator_base(THD *thd, +Rdb_iterator_base::Rdb_iterator_base(THD *thd, ha_rocksdb *rocksdb_handler, const std::shared_ptr kd, const std::shared_ptr pkd, const Rdb_tbl_def *tbl_def) @@ -35,13 +35,20 @@ Rdb_iterator_base::Rdb_iterator_base(THD *thd, m_pkd(pkd), m_tbl_def(tbl_def), m_thd(thd), + m_rocksdb_handler(rocksdb_handler), m_scan_it(nullptr), m_scan_it_skips_bloom(false), m_scan_it_snapshot(nullptr), m_scan_it_lower_bound(nullptr), m_scan_it_upper_bound(nullptr), m_prefix_buf(nullptr), - m_table_type(tbl_def->get_table_type()) {} + m_table_type(tbl_def->get_table_type()) { + if (tbl_def->get_table_type() == INTRINSIC_TMP) { + if (m_rocksdb_handler) { + add_tmp_table_handler(m_thd, m_rocksdb_handler); + } + } +} Rdb_iterator_base::~Rdb_iterator_base() { release_scan_iterator(); @@ -51,6 +58,11 @@ Rdb_iterator_base::~Rdb_iterator_base() { m_scan_it_upper_bound = nullptr; my_free(m_prefix_buf); m_prefix_buf = nullptr; + if (m_table_type == INTRINSIC_TMP) { + if (m_rocksdb_handler) { + remove_tmp_table_handler(m_thd, m_rocksdb_handler); + } + } } int Rdb_iterator_base::read_before_key(const bool full_key_match, @@ -366,9 +378,9 @@ Rdb_iterator_partial::Rdb_iterator_partial( THD *thd, const std::shared_ptr kd, const std::shared_ptr pkd, const 
Rdb_tbl_def *tbl_def, TABLE *table, const dd::Table *dd_table) - : Rdb_iterator_base(thd, kd, pkd, tbl_def), + : Rdb_iterator_base(thd, nullptr, kd, pkd, tbl_def), m_table(table), - m_iterator_pk(thd, pkd, pkd, tbl_def), + m_iterator_pk(thd, nullptr, pkd, pkd, tbl_def), m_converter(thd, tbl_def, table, dd_table), m_valid(false), m_materialized(false), @@ -625,7 +637,7 @@ int Rdb_iterator_partial::materialize_prefix() { // It is possible that someone else has already materialized this group // before we locked. Double check if the prefix is still empty. - Rdb_iterator_base iter(m_thd, m_kd, m_pkd, m_tbl_def); + Rdb_iterator_base iter(m_thd, nullptr, m_kd, m_pkd, m_tbl_def); m_kd->get_infimum_key(m_cur_prefix_key, &tmp); int rc = iter.seek(HA_READ_KEY_EXACT, cur_prefix_key, false, cur_prefix_key, true /* read current */); @@ -651,7 +663,7 @@ int Rdb_iterator_partial::materialize_prefix() { } m_pkd->get_infimum_key(m_cur_prefix_key, &tmp); - Rdb_iterator_base iter_pk(m_thd, m_pkd, m_pkd, m_tbl_def); + Rdb_iterator_base iter_pk(m_thd, nullptr, m_pkd, m_pkd, m_tbl_def); rc = iter_pk.seek(HA_READ_KEY_EXACT, cur_prefix_key, false, cur_prefix_key, true /* read current */); size_t num_rows = 0; diff --git a/storage/rocksdb/rdb_iterator.h b/storage/rocksdb/rdb_iterator.h index 7e58d6adcd92..b564a12a16a9 100644 --- a/storage/rocksdb/rdb_iterator.h +++ b/storage/rocksdb/rdb_iterator.h @@ -69,6 +69,7 @@ class Rdb_iterator { virtual rocksdb::Slice key() = 0; virtual rocksdb::Slice value() = 0; virtual void reset() = 0; + virtual bool is_valid() = 0; }; class Rdb_iterator_base : public Rdb_iterator { @@ -86,7 +87,8 @@ class Rdb_iterator_base : public Rdb_iterator { int next_with_direction(bool move_forward, bool skip_next); public: - Rdb_iterator_base(THD *thd, const std::shared_ptr kd, + Rdb_iterator_base(THD *thd, ha_rocksdb *rocksdb_handler, + const std::shared_ptr kd, const std::shared_ptr pkd, const Rdb_tbl_def *tbl_def); @@ -109,6 +111,8 @@ class Rdb_iterator_base : public 
Rdb_iterator { void reset() override { release_scan_iterator(); } + bool is_valid() override { return is_valid_iterator(m_scan_it); } + protected: friend class Rdb_iterator; const std::shared_ptr m_kd; @@ -120,6 +124,8 @@ class Rdb_iterator_base : public Rdb_iterator { THD *m_thd; + ha_rocksdb *m_rocksdb_handler; + /* Iterator used for range scans and for full table/index scans */ rocksdb::Iterator *m_scan_it; @@ -222,6 +228,7 @@ class Rdb_iterator_partial : public Rdb_iterator_base { rocksdb::Slice key() override; rocksdb::Slice value() override; void reset() override; + bool is_valid() override { return false; } }; } // namespace myrocks