Skip to content

Commit

Permalink
Add an option to abort/ignore server when hitting data corruption
Browse files Browse the repository at this point in the history
Upstream commit ID: facebook/mysql-5.6@6ea4040
PS-8494: Merge percona-202206 (https://jira.percona.com/browse/PS-8494)

Summary:
When hitting the HA_ERR_ROCKSDB_CORRUPT_DATA error in myrocks, the current behavior is to fail the query. We want to change the behavior to provide a choice between:
1. Fail the query with error HA_ERR_ROCKSDB_CORRUPT_DATA
2. Crash the server
3. Pass the query with warning

We introduce one enum corrupt_data_action in myrocks to control the behavior.

The reason for the behavior change is:
- [ABORT_SERVER]
When the corrupted data is introduced in myrocks layer due to some reason (e.g random hardware error), we want to crash the server to prevent the corrupted data being copied to other instances in the replicaset.
- [WARNING]
We also want to provide the ability for quick mitigation. We can quickly delete the corrupted data if we pass the query with warning. This is only used for issue mitigation reason.

The diff is to fix the issue of S301551.

Reviewed By: yoshinorim

Differential Revision: D40390954

fbshipit-source-id: f46ff698e552c0cac2ca8fd65aaa80d654d47480
  • Loading branch information
Chun Ni authored and inikep committed Apr 26, 2023
1 parent a171a14 commit f5a2d9a
Show file tree
Hide file tree
Showing 7 changed files with 226 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
SET SESSION debug="+d, stimulate_corrupt_data_read";
CREATE DATABASE a;
USE a;
CREATE TABLE t1 (a INT PRIMARY KEY, b CHAR(8)) ENGINE=rocksdB;
INSERT INTO t1 VALUES (3, 'bar');
INSERT INTO t1 VALUES (4, 'bar');
read
fail query
SET GLOBAL rocksdb_corrupt_data_action = ERROR;
select * from t1;
ERROR HY000: Got error 505 'Found data corruption.' from ROCKSDB
fail server
SET GLOBAL rocksdb_corrupt_data_action = ABORT_SERVER;
select * from t1;
SET SESSION debug="+d, stimulate_corrupt_data_read";
pass query with warning
SET GLOBAL rocksdb_corrupt_data_action = WARNING;
select * from t1;
a b
3 bar
4 bar
SET SESSION debug="-d, stimulate_corrupt_data_read";
write
USE a;
CREATE table t2 (
pk0 int primary key auto_increment,
sk int,
val int default 0,
unique(sk)
) engine=rocksdb;
insert into t2 (sk) values (1), (2);
SET SESSION debug="+d, stimulate_corrupt_data_update";
fail query
SET GLOBAL rocksdb_corrupt_data_action = ERROR;
insert into t2 (sk) values (1), (2) on duplicate key update val = val + 1;
ERROR HY000: Got error 505 'Found data corruption.' from ROCKSDB
fail server
SET GLOBAL rocksdb_corrupt_data_action = ABORT_SERVER;
insert into t2 (sk) values (1), (2) on duplicate key update val = val + 1;
SET SESSION debug="+d, stimulate_corrupt_data_update";
pass query with warning
SET GLOBAL rocksdb_corrupt_data_action = WARNING;
insert into t2 (sk) values (1), (2) on duplicate key update val = val + 1;
SET SESSION debug="-d, stimulate_corrupt_data_update";
select * from t2;
pk0 sk val
1 1 1
2 2 1
DROP database a;
SET GLOBAL rocksdb_corrupt_data_action = default;
1 change: 1 addition & 0 deletions mysql-test/suite/rocksdb/r/rocksdb.result
Original file line number Diff line number Diff line change
Expand Up @@ -932,6 +932,7 @@ rocksdb_compaction_sequential_deletes_count_sd OFF
rocksdb_compaction_sequential_deletes_file_size 0
rocksdb_compaction_sequential_deletes_window 0
rocksdb_concurrent_prepare ON
rocksdb_corrupt_data_action ERROR
rocksdb_create_checkpoint
rocksdb_create_if_missing ON
rocksdb_create_missing_column_families OFF
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
--nowarnings
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
--source include/have_debug.inc
--source include/not_parallel.inc
--source include/have_rocksdb.inc
--source suite/rocksdb/include/have_write_committed.inc
--source include/parse_rocksdb_fs_uri.inc

if (!$rocksdb_zenfs_disabled)
{
--skip Test doesn't support ZenFS
}

let DATADIR_LOCATION=$MYSQLTEST_VARDIR/mysqld.1/data;

SET SESSION debug="+d, stimulate_corrupt_data_read";

CREATE DATABASE a;
USE a;
CREATE TABLE t1 (a INT PRIMARY KEY, b CHAR(8)) ENGINE=rocksdB;

INSERT INTO t1 VALUES (3, 'bar');
INSERT INTO t1 VALUES (4, 'bar');

--echo read

--echo fail query
SET GLOBAL rocksdb_corrupt_data_action = ERROR;
--error 1296
select * from t1;

--echo fail server
SET GLOBAL rocksdb_corrupt_data_action = ABORT_SERVER;
--error 0,CR_SERVER_LOST,ER_INTERNAL_ERROR,1296
select * from t1;

--remove_file $DATADIR_LOCATION/.rocksdb/ROCKSDB_CORRUPTED

--exec echo "restart" > $MYSQLTEST_VARDIR/tmp/mysqld.1.expect
--enable_reconnect
let $WAIT_COUNT=6000;
--source include/wait_time_until_connected_again.inc

SET SESSION debug="+d, stimulate_corrupt_data_read";

--echo pass query with warning
SET GLOBAL rocksdb_corrupt_data_action = WARNING;
select * from t1;

SET SESSION debug="-d, stimulate_corrupt_data_read";

--echo write

USE a;
CREATE table t2 (
pk0 int primary key auto_increment,
sk int,
val int default 0,
unique(sk)
) engine=rocksdb;

insert into t2 (sk) values (1), (2);

SET SESSION debug="+d, stimulate_corrupt_data_update";

--echo fail query
SET GLOBAL rocksdb_corrupt_data_action = ERROR;
--error 1296
insert into t2 (sk) values (1), (2) on duplicate key update val = val + 1;

--echo fail server
SET GLOBAL rocksdb_corrupt_data_action = ABORT_SERVER;
--error 0,CR_SERVER_LOST,ER_INTERNAL_ERROR,1296
insert into t2 (sk) values (1), (2) on duplicate key update val = val + 1;

--remove_file $DATADIR_LOCATION/.rocksdb/ROCKSDB_CORRUPTED

--exec echo "restart" > $MYSQLTEST_VARDIR/tmp/mysqld.1.expect
--enable_reconnect
let $WAIT_COUNT=6000;
--source include/wait_time_until_connected_again.inc

SET SESSION debug="+d, stimulate_corrupt_data_update";

--echo pass query with warning
SET GLOBAL rocksdb_corrupt_data_action = WARNING;
insert into t2 (sk) values (1), (2) on duplicate key update val = val + 1;

SET SESSION debug="-d, stimulate_corrupt_data_update";

select * from t2;

DROP database a;

SET GLOBAL rocksdb_corrupt_data_action = default;
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
SET GLOBAL rocksdb_corrupt_data_action = ERROR;
SET GLOBAL rocksdb_corrupt_data_action = ABORT_SERVER;
SET GLOBAL rocksdb_corrupt_data_action = WARNING;
== wrong argument type
SET GLOBAL rocksdb_corrupt_data_action = "abc";
ERROR 42000: Variable 'rocksdb_corrupt_data_action' can't be set to the value of 'abc'
== wrong argument type
SET GLOBAL rocksdb_corrupt_data_action = 4;
ERROR 42000: Variable 'rocksdb_corrupt_data_action' can't be set to the value of '4'
SET GLOBAL rocksdb_corrupt_data_action = default;
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
--source include/have_rocksdb.inc

SET GLOBAL rocksdb_corrupt_data_action = ERROR;
SET GLOBAL rocksdb_corrupt_data_action = ABORT_SERVER;
SET GLOBAL rocksdb_corrupt_data_action = WARNING;

--echo == wrong argument type
--error 1231
SET GLOBAL rocksdb_corrupt_data_action = "abc";

--echo == wrong argument type
--error 1231
SET GLOBAL rocksdb_corrupt_data_action = 4;

SET GLOBAL rocksdb_corrupt_data_action = default;
65 changes: 56 additions & 9 deletions storage/rocksdb/ha_rocksdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,7 @@ static int rocksdb_validate_max_bottom_pri_background_compactions(
THD *thd MY_ATTRIBUTE((__unused__)),
struct SYS_VAR *const var MY_ATTRIBUTE((__unused__)), void *var_ptr,
struct st_mysql_value *value);
static int handle_rocksdb_corrupt_data_error();
//////////////////////////////////////////////////////////////////////////////
// Options definitions
//////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -743,6 +744,8 @@ Regex_list_handler rdb_read_free_regex_handler;
#endif
enum read_free_rpl_type { OFF = 0, PK_ONLY, PK_SK };
static ulong rocksdb_read_free_rpl = read_free_rpl_type::OFF;
enum corrupt_data_action { ERROR = 0, ABORT_SERVER, WARNING };
static ulong rocksdb_corrupt_data_action = corrupt_data_action::ERROR;
static bool rocksdb_error_on_suboptimal_collation = false;
static uint32_t rocksdb_stats_recalc_rate = 0;
static bool rocksdb_no_create_column_family = false;
Expand Down Expand Up @@ -910,6 +913,22 @@ static int rocksdb_tracing(THD *const thd MY_ATTRIBUTE((__unused__)),
return HA_EXIT_SUCCESS;
}

static int handle_rocksdb_corrupt_data_error() {
switch (rocksdb_corrupt_data_action) {
case corrupt_data_action::ABORT_SERVER:
LogPluginErrMsg(ERROR_LEVEL, 0,
"Aborting on HA_ERR_ROCKSDB_CORRUPT_DATA error.");
rdb_persist_corruption_marker();
abort();
case corrupt_data_action::WARNING:
LogPluginErrMsg(WARNING_LEVEL, 0,
"Hit error HA_ERR_ROCKSDB_CORRUPT_DATA.");
return 0;
default:
return HA_ERR_ROCKSDB_CORRUPT_DATA;
}
}

static std::unique_ptr<rocksdb::DBOptions> rdb_init_rocksdb_db_options(void) {
auto o = std::unique_ptr<rocksdb::DBOptions>(new rocksdb::DBOptions());

Expand Down Expand Up @@ -984,6 +1003,14 @@ static TYPELIB bottommost_level_compaction_typelib = {
"bottommost_level_compaction_typelib", bottommost_level_compaction_names,
nullptr};

/* This enum needs to be kept up to date with corrupt_data_action */
static const char *corrupt_data_action_names[] = {"ERROR", "ABORT_SERVER",
"WARNING", NullS};

static TYPELIB corrupt_data_action_typelib = {
array_elements(corrupt_data_action_names) - 1,
"corrupt_data_action_typelib", corrupt_data_action_names, nullptr};

static void rocksdb_set_rocksdb_info_log_level(
THD *const thd MY_ATTRIBUTE((__unused__)),
struct SYS_VAR *const var MY_ATTRIBUTE((__unused__)),
Expand Down Expand Up @@ -1291,6 +1318,12 @@ static MYSQL_SYSVAR_BOOL(
"Use write batches for replication thread instead of tx api", nullptr,
nullptr, false);

static MYSQL_SYSVAR_ENUM(
corrupt_data_action, rocksdb_corrupt_data_action, PLUGIN_VAR_RQCMDARG,
"Control behavior when hitting data corruption. We can fail the query, "
"crash the server or pass the query and give users a warning. ",
nullptr, nullptr, corrupt_data_action::ERROR, &corrupt_data_action_typelib);

static MYSQL_THDVAR_BOOL(skip_bloom_filter_on_read,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_HINTUPDATEABLE,
"Skip using bloom filter for reads", nullptr, nullptr,
Expand Down Expand Up @@ -2661,6 +2694,7 @@ static struct SYS_VAR *rocksdb_system_variables[] = {
MYSQL_SYSVAR(alter_table_comment_inplace),
MYSQL_SYSVAR(column_default_value_as_expression),
MYSQL_SYSVAR(enable_delete_range_for_drop_index),
MYSQL_SYSVAR(corrupt_data_action),
nullptr};

static bool is_tmp_table(const std::string &tablename) {
Expand Down Expand Up @@ -3437,12 +3471,12 @@ class Rdb_transaction {
// Determine the length of the new group prefix
Rdb_string_reader reader(&merge_key);
if ((!reader.read(Rdb_key_def::INDEX_NUMBER_SIZE))) {
rc2 = HA_ERR_ROCKSDB_CORRUPT_DATA;
rc2 = handle_rocksdb_corrupt_data_error();
break;
}
for (uint i = 0; i < keydef->partial_index_keyparts(); i++) {
if (keydef->read_memcmp_key_part(&reader, i) > 0) {
rc2 = HA_ERR_ROCKSDB_CORRUPT_DATA;
rc2 = handle_rocksdb_corrupt_data_error();
break;
}
}
Expand Down Expand Up @@ -7188,14 +7222,14 @@ int ha_rocksdb::read_hidden_pk_id_from_rowkey(longlong *const hidden_pk_id) {
// Get hidden primary key from old key slice
Rdb_string_reader reader(&rowkey_slice);
if ((!reader.read(Rdb_key_def::INDEX_NUMBER_SIZE))) {
return HA_ERR_ROCKSDB_CORRUPT_DATA;
return handle_rocksdb_corrupt_data_error();
}

const int length = Field_longlong::PACK_LENGTH;
const uchar *from = reinterpret_cast<const uchar *>(reader.read(length));
if (from == nullptr) {
/* Mem-comparable image doesn't have enough bytes */
return HA_ERR_ROCKSDB_CORRUPT_DATA;
return handle_rocksdb_corrupt_data_error();
}

*hidden_pk_id = rdb_netbuf_read_uint64(&from);
Expand Down Expand Up @@ -7469,7 +7503,13 @@ int ha_rocksdb::convert_record_from_storage_format(
assert(key != nullptr);
assert(buf != nullptr);

return m_converter->decode(m_pk_descr, buf, key, value);
int rc = m_converter->decode(m_pk_descr, buf, key, value);

DBUG_EXECUTE_IF("stimulate_corrupt_data_read",
{ rc = HA_ERR_ROCKSDB_CORRUPT_DATA; });

return rc == HA_ERR_ROCKSDB_CORRUPT_DATA ? handle_rocksdb_corrupt_data_error()
: rc;
}

int ha_rocksdb::alloc_key_buffers(const TABLE *const table_arg,
Expand Down Expand Up @@ -8797,7 +8837,7 @@ int ha_rocksdb::create(const char *const name, TABLE *const table_arg,
table_def));
} else {
my_error(ER_METADATA_INCONSISTENCY, MYF(0), str.c_str());
DBUG_RETURN(HA_ERR_ROCKSDB_CORRUPT_DATA);
DBUG_RETURN(handle_rocksdb_corrupt_data_error());
}
}
DBUG_RETURN(create_table(str, create_info->actual_user_table_name, table_arg,
Expand Down Expand Up @@ -9641,8 +9681,9 @@ int ha_rocksdb::get_row_by_sk(uchar *buf, const Rdb_key_def &kd,

const uint size =
kd.get_primary_key_tuple(*m_pk_descr, key, m_pk_packed_tuple);

if (size == RDB_INVALID_KEY_LEN) {
DBUG_RETURN(HA_ERR_ROCKSDB_CORRUPT_DATA);
DBUG_RETURN(handle_rocksdb_corrupt_data_error());
}

m_last_rowkey.copy((const char *)m_pk_packed_tuple, size, &my_charset_bin);
Expand Down Expand Up @@ -9842,6 +9883,10 @@ int ha_rocksdb::index_next_with_direction_intern(uchar *const buf,
}
}

if (rc == HA_ERR_ROCKSDB_CORRUPT_DATA) {
rc = handle_rocksdb_corrupt_data_error();
}

DBUG_RETURN(rc);
}

Expand Down Expand Up @@ -10548,8 +10593,10 @@ int ha_rocksdb::check_and_lock_sk(
const rocksdb::Slice &rkey = all_parts_used ? new_slice : iter.key();
uint pk_size =
kd.get_primary_key_tuple(*m_pk_descr, &rkey, m_pk_packed_tuple);
DBUG_EXECUTE_IF("stimulate_corrupt_data_update",
{ pk_size = RDB_INVALID_KEY_LEN; });
if (pk_size == RDB_INVALID_KEY_LEN) {
rc = HA_ERR_ROCKSDB_CORRUPT_DATA;
rc = handle_rocksdb_corrupt_data_error();
} else {
m_dup_key_found = true;
m_last_rowkey.copy((const char *)m_pk_packed_tuple, pk_size,
Expand Down Expand Up @@ -11654,7 +11701,7 @@ int ha_rocksdb::rnd_pos(uchar *const buf, uchar *const pos) {
len = m_pk_descr->key_length(table,
rocksdb::Slice((const char *)pos, ref_length));
if (len == size_t(-1)) {
DBUG_RETURN(HA_ERR_ROCKSDB_CORRUPT_DATA); /* Data corruption? */
DBUG_RETURN(handle_rocksdb_corrupt_data_error()); /* Data corruption? */
}

rc = get_row_by_rowid(buf, pos, len);
Expand Down

0 comments on commit f5a2d9a

Please sign in to comment.