Skip to content

Commit

Permalink
Pass ColumnFamilyHandle around by reference; wrap Iterator in unique_ptr
Browse files Browse the repository at this point in the history
In preparation for the range locking patch, clean up some code related to
iterators:
- Pass rocksdb::ColumnFamilyHandle around and store in classes by references
  instead of pointers. It is never nullptr, nor is ever reseated in the classes
  using it.
- Wrap the rocksdb::Iterator objects in std::unique_ptr: change the factory
  method return types and class fields.
- In both cases above mark the classes containing reference fields as
  non-copyable and non-moveable as needed, matching their current usage.
- Mark touched methods [[nodiscard]] as applicable. In rdb_index_merge.h, remove
  many instances of redundant MY_ATTRIBUTE((__nonnull__)) too.
- Make touched local vars auto, but with reference or pointer pulled out,
  sometimes avoiding redundant returned object copies.
- Remove redundant instances of const from passed-by-value arguments.
  • Loading branch information
laurynas-biveinis committed Apr 10, 2024
1 parent 55e2d62 commit e763c6b
Show file tree
Hide file tree
Showing 14 changed files with 264 additions and 269 deletions.
255 changes: 126 additions & 129 deletions storage/rocksdb/ha_rocksdb.cc

Large diffs are not rendered by default.

50 changes: 23 additions & 27 deletions storage/rocksdb/ha_rocksdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
/* C++ standard header files */
#include <deque>
#include <map>
#include <memory>
#include <string>
#include <string_view>
#include <unordered_map>
Expand Down Expand Up @@ -653,11 +654,10 @@ class ha_rocksdb : public my_core::handler, public blob_buffer {
int delete_row(const uchar *const buf) override
MY_ATTRIBUTE((__warn_unused_result__));
void update_table_stats_if_needed();
rocksdb::Status delete_or_singledelete(uint index, Rdb_transaction *const tx,
rocksdb::ColumnFamilyHandle *const cf,
const rocksdb::Slice &key,
TABLE_TYPE table_type)
MY_ATTRIBUTE((__warn_unused_result__));

[[nodiscard]] rocksdb::Status delete_or_singledelete(
uint index, Rdb_transaction *tx, rocksdb::ColumnFamilyHandle &cf,
const rocksdb::Slice &key, TABLE_TYPE table_type);

int index_next(uchar *const buf) override
MY_ATTRIBUTE((__warn_unused_result__));
Expand Down Expand Up @@ -1205,18 +1205,17 @@ void remove_tmp_table_handler(THD *const thd, ha_rocksdb *rocksdb_handler);

const rocksdb::ReadOptions &rdb_tx_acquire_snapshot(Rdb_transaction *tx);

rocksdb::Iterator *rdb_tx_get_iterator(
THD *thd, rocksdb::ColumnFamilyHandle *const cf, bool skip_bloom_filter,
[[nodiscard]] std::unique_ptr<rocksdb::Iterator> rdb_tx_get_iterator(
THD *thd, rocksdb::ColumnFamilyHandle &cf, bool skip_bloom_filter,
const rocksdb::Slice &eq_cond_lower_bound,
const rocksdb::Slice &eq_cond_upper_bound,
const rocksdb::Snapshot **snapshot, TABLE_TYPE table_type,
bool read_current = false, bool create_snapshot = true);

rocksdb::Status rdb_tx_get(Rdb_transaction *tx,
rocksdb::ColumnFamilyHandle *const column_family,
const rocksdb::Slice &key,
rocksdb::PinnableSlice *const value,
TABLE_TYPE table_type);
[[nodiscard]] rocksdb::Status rdb_tx_get(
Rdb_transaction *tx, rocksdb::ColumnFamilyHandle &column_family,
const rocksdb::Slice &key, rocksdb::PinnableSlice *const value,
TABLE_TYPE table_type);

rocksdb::Status rdb_tx_get_for_update(Rdb_transaction *tx,
const Rdb_key_def &kd,
Expand All @@ -1229,36 +1228,33 @@ void rdb_tx_release_lock(Rdb_transaction *tx, const Rdb_key_def &kd,
const rocksdb::Slice &key, bool force);

void rdb_tx_multi_get(Rdb_transaction *tx,
rocksdb::ColumnFamilyHandle *const column_family,
const size_t num_keys, const rocksdb::Slice *keys,
rocksdb::ColumnFamilyHandle &column_family,
size_t num_keys, const rocksdb::Slice *keys,
rocksdb::PinnableSlice *values, TABLE_TYPE table_type,
rocksdb::Status *statuses, const bool sorted_input);
rocksdb::Status *statuses, bool sorted_input);

inline void rocksdb_smart_seek(bool seek_backward,
rocksdb::Iterator *const iter,
inline void rocksdb_smart_seek(bool seek_backward, rocksdb::Iterator &iter,
const rocksdb::Slice &key_slice) {
if (seek_backward) {
iter->SeekForPrev(key_slice);
iter.SeekForPrev(key_slice);
} else {
iter->Seek(key_slice);
iter.Seek(key_slice);
}
}

inline void rocksdb_smart_next(bool seek_backward,
rocksdb::Iterator *const iter) {
inline void rocksdb_smart_next(bool seek_backward, rocksdb::Iterator &iter) {
if (seek_backward) {
iter->Prev();
iter.Prev();
} else {
iter->Next();
iter.Next();
}
}

inline void rocksdb_smart_prev(bool seek_backward,
rocksdb::Iterator *const iter) {
inline void rocksdb_smart_prev(bool seek_backward, rocksdb::Iterator &iter) {
if (seek_backward) {
iter->Next();
iter.Next();
} else {
iter->Prev();
iter.Prev();
}
}

Expand Down
13 changes: 6 additions & 7 deletions storage/rocksdb/nosql_access.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1505,17 +1505,16 @@ class select_exec {
return false;
}

rocksdb::Iterator *get_iterator(rocksdb::ColumnFamilyHandle *cf,
bool use_bloom,
const rocksdb::Slice &lower_bound,
const rocksdb::Slice &upper_bound) {
[[nodiscard]] std::unique_ptr<rocksdb::Iterator> get_iterator(
rocksdb::ColumnFamilyHandle &cf, bool use_bloom,
const rocksdb::Slice &lower_bound, const rocksdb::Slice &upper_bound) {
return rdb_tx_get_iterator(m_thd, cf, !use_bloom, lower_bound,
upper_bound, nullptr, m_table_type);
}

rocksdb::Status get(rocksdb::ColumnFamilyHandle *cf,
const rocksdb::Slice &key_slice,
rocksdb::PinnableSlice *value_slice) {
[[nodiscard]] rocksdb::Status get(rocksdb::ColumnFamilyHandle &cf,
const rocksdb::Slice &key_slice,
rocksdb::PinnableSlice *value_slice) {
rocksdb::Status s;
return rdb_tx_get(m_tx, cf, key_slice, value_slice, m_table_type);
}
Expand Down
2 changes: 1 addition & 1 deletion storage/rocksdb/rdb_cf_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ struct Rdb_cf_scanner : public Rdb_tables_scanner {
for (uint i = 0; i < tdef->m_key_count; i++) {
const Rdb_key_def &kd = *tdef->m_key_descr_arr[i];

if (kd.get_cf()->GetID() == m_cf_id) {
if (kd.get_cf().GetID() == m_cf_id) {
return HA_EXIT_FAILURE;
}
}
Expand Down
8 changes: 4 additions & 4 deletions storage/rocksdb/rdb_datadic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ uint Rdb_key_def::setup(const TABLE &tbl, const Rdb_tbl_def &tbl_def,
m_stats.m_distinct_keys_per_prefix.resize(get_key_parts());

/* Cache prefix extractor for bloom filter usage later */
rocksdb::Options opt = rdb_get_rocksdb_db()->GetOptions(get_cf());
const auto opt = rdb_get_rocksdb_db()->GetOptions(&get_cf());
m_prefix_extractor = opt.prefix_extractor;

uint rtn = setup_vector_index(tbl, tbl_def, cmd_srv_helper);
Expand Down Expand Up @@ -4205,15 +4205,15 @@ bool Rdb_tbl_def::put_dict(Rdb_dict_manager *const dict,
for (uint i = 0; i < m_key_count; i++) {
const Rdb_key_def &kd = *m_key_descr_arr[i];

const uint cf_id = kd.get_cf()->GetID();
const auto cf_id = kd.get_cf().GetID();
/*
If cf_id already exists, cf_flags must be the same.
To prevent race condition, reading/modifying/committing CF flags
need to be protected by mutex (dict_manager->lock()).
When RocksDB supports transaction with pessimistic concurrency
control, we can switch to use it and removing mutex.
*/
const std::string cf_name = kd.get_cf()->GetName();
const auto &cf_name = kd.get_cf().GetName();

std::shared_ptr<rocksdb::ColumnFamilyHandle> cfh =
cf_manager->get_cf(cf_name);
Expand Down Expand Up @@ -4366,7 +4366,7 @@ int Rdb_ddl_manager::find_in_uncommitted_keydef(const uint32_t cf_id) {
for (const auto &pr : m_index_num_to_uncommitted_keydef) {
const auto &kd = pr.second;

if (kd->get_cf()->GetID() == cf_id) {
if (kd->get_cf().GetID() == cf_id) {
mysql_rwlock_unlock(&m_rwlock);
return HA_EXIT_FAILURE;
}
Expand Down
5 changes: 4 additions & 1 deletion storage/rocksdb/rdb_datadic.h
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,10 @@ class Rdb_key_def {
const Rdb_tbl_def &tbl_def_arg, bool &per_part_match_found,
const char *const qualifier);

rocksdb::ColumnFamilyHandle *get_cf() const { return m_cf_handle.get(); }
[[nodiscard]] rocksdb::ColumnFamilyHandle &get_cf() const {
return *m_cf_handle;
}

std::shared_ptr<rocksdb::ColumnFamilyHandle> get_shared_cf() const {
return m_cf_handle;
}
Expand Down
2 changes: 1 addition & 1 deletion storage/rocksdb/rdb_i_s.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1494,7 +1494,7 @@ int Rdb_ddl_scanner::add_table(Rdb_tbl_def *tdef) {
field[RDB_DDL_FIELD::TTL_DURATION]->store(kd.m_ttl_duration, true);
field[RDB_DDL_FIELD::INDEX_FLAGS]->store(kd.m_index_flags_bitmap, true);

std::string cf_name = kd.get_cf()->GetName();
const auto &cf_name = kd.get_cf().GetName();
field[RDB_DDL_FIELD::CF]->store(cf_name.c_str(), cf_name.size(),
system_charset_info);
ulonglong auto_incr;
Expand Down
8 changes: 4 additions & 4 deletions storage/rocksdb/rdb_index_merge.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@

namespace myrocks {

Rdb_index_merge::Rdb_index_merge(const char *const tmpfile_path,
Rdb_index_merge::Rdb_index_merge(const char *tmpfile_path,
const ulonglong merge_buf_size,
const ulonglong merge_combine_read_size,
const ulonglong merge_tmp_file_removal_delay,
rocksdb::ColumnFamilyHandle *cf)
rocksdb::ColumnFamilyHandle &cf)
: m_tmpfile_path(tmpfile_path),
m_merge_buf_size(merge_buf_size),
m_merge_combine_read_size(merge_combine_read_size),
Expand Down Expand Up @@ -198,7 +198,7 @@ int Rdb_index_merge::add(const rocksdb::Slice &key, const rocksdb::Slice &val) {
/* Find sort order of the new record */
auto res =
m_offset_tree.emplace(m_rec_buf_unsorted->m_block.get() + rec_offset,
m_cf_handle->GetComparator());
m_cf_handle.GetComparator());
if (!res.second) {
my_printf_error(ER_DUP_ENTRY,
"Failed to insert the record: the key already exists",
Expand Down Expand Up @@ -309,7 +309,7 @@ int Rdb_index_merge::merge_heap_prepare() {
/* Allocate buffers for each chunk */
for (ulonglong i = 0; i < m_merge_file.m_num_sort_buffers; i++) {
const auto entry =
std::make_shared<merge_heap_entry>(m_cf_handle->GetComparator());
std::make_shared<merge_heap_entry>(m_cf_handle.GetComparator());

/*
Read chunk_size bytes from each chunk on disk, and place inside
Expand Down
74 changes: 36 additions & 38 deletions storage/rocksdb/rdb_index_merge.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ class Rdb_key_def;
class Rdb_tbl_def;

class Rdb_index_merge {
Rdb_index_merge(const Rdb_index_merge &p) = delete;
Rdb_index_merge &operator=(const Rdb_index_merge &p) = delete;
Rdb_index_merge(const Rdb_index_merge &) = delete;
Rdb_index_merge &operator=(const Rdb_index_merge &) = delete;
Rdb_index_merge(const Rdb_index_merge &&) = delete;
Rdb_index_merge &operator=(Rdb_index_merge &&) = delete;

public:
/* Information about temporary files used in external merge sort */
Expand All @@ -67,15 +69,13 @@ class Rdb_index_merge {
ulonglong m_disk_curr_offset; /* current offset on disk */
uint64 m_total_size; /* total # of data bytes in chunk */

void store_key_value(const rocksdb::Slice &key, const rocksdb::Slice &val)
MY_ATTRIBUTE((__nonnull__));
void store_key_value(const rocksdb::Slice &key, const rocksdb::Slice &val);

void store_slice(const rocksdb::Slice &slice) MY_ATTRIBUTE((__nonnull__));
void store_slice(const rocksdb::Slice &slice);

size_t prepare(File fd, ulonglong f_offset) MY_ATTRIBUTE((__nonnull__));
[[nodiscard]] size_t prepare(File fd, ulonglong f_offset);

int read_next_chunk_from_disk(File fd)
MY_ATTRIBUTE((__nonnull__, __warn_unused_result__));
[[nodiscard]] int read_next_chunk_from_disk(File fd);

inline bool is_chunk_finished() const {
return m_curr_offset + m_disk_curr_offset - m_disk_start_offset ==
Expand Down Expand Up @@ -109,17 +109,16 @@ class Rdb_index_merge {
rocksdb::Slice m_key; /* current key pointed to by block ptr */
rocksdb::Slice m_val;

size_t prepare(File fd, ulonglong f_offset, ulonglong chunk_size)
MY_ATTRIBUTE((__nonnull__));
[[nodiscard]] size_t prepare(File fd, ulonglong f_offset,
ulonglong chunk_size);

int read_next_chunk_from_disk(File fd)
MY_ATTRIBUTE((__nonnull__, __warn_unused_result__));
[[nodiscard]] int read_next_chunk_from_disk(File fd);

int read_rec(rocksdb::Slice *const key, rocksdb::Slice *const val)
MY_ATTRIBUTE((__nonnull__, __warn_unused_result__));
[[nodiscard]] int read_rec(rocksdb::Slice *const key,
rocksdb::Slice *const val);

int read_slice(rocksdb::Slice *const slice, const uchar **block_ptr)
MY_ATTRIBUTE((__nonnull__, __warn_unused_result__));
[[nodiscard]] int read_slice(rocksdb::Slice *const slice,
const uchar **block_ptr);

explicit merge_heap_entry(const rocksdb::Comparator *const comparator)
: m_chunk_info(nullptr), m_block(nullptr), m_comparator(comparator) {}
Expand Down Expand Up @@ -152,7 +151,7 @@ class Rdb_index_merge {
const ulonglong m_merge_buf_size;
const ulonglong m_merge_combine_read_size;
const ulonglong m_merge_tmp_file_removal_delay;
rocksdb::ColumnFamilyHandle *m_cf_handle;
rocksdb::ColumnFamilyHandle &m_cf_handle;
struct merge_file_info m_merge_file;
std::shared_ptr<merge_buf_info> m_rec_buf_unsorted;
std::shared_ptr<merge_buf_info> m_output_buf;
Expand Down Expand Up @@ -180,9 +179,9 @@ class Rdb_index_merge {
return rocksdb::Slice(reinterpret_cast<const char *>(block), len);
}

static int merge_record_compare(const uchar *a_block, const uchar *b_block,
const rocksdb::Comparator *const comparator)
MY_ATTRIBUTE((__nonnull__, __warn_unused_result__));
[[nodiscard]] static int merge_record_compare(
const uchar *a_block, const uchar *b_block,
const rocksdb::Comparator *const comparator) MY_ATTRIBUTE((__nonnull__));

void merge_read_rec(const uchar *const block, rocksdb::Slice *const key,
rocksdb::Slice *const val) MY_ATTRIBUTE((__nonnull__));
Expand All @@ -191,37 +190,36 @@ class Rdb_index_merge {
MY_ATTRIBUTE((__nonnull__));

public:
Rdb_index_merge(const char *const tmpfile_path,
const ulonglong merge_buf_size,
const ulonglong merge_combine_read_size,
const ulonglong merge_tmp_file_removal_delay,
rocksdb::ColumnFamilyHandle *cf);
Rdb_index_merge(const char *tmpfile_path, ulonglong merge_buf_size,
ulonglong merge_combine_read_size,
ulonglong merge_tmp_file_removal_delay,
rocksdb::ColumnFamilyHandle &cf);
~Rdb_index_merge();

int init() MY_ATTRIBUTE((__nonnull__, __warn_unused_result__));
[[nodiscard]] int init();

int merge_file_create() MY_ATTRIBUTE((__nonnull__, __warn_unused_result__));
[[nodiscard]] int merge_file_create();

int add(const rocksdb::Slice &key, const rocksdb::Slice &val)
MY_ATTRIBUTE((__nonnull__, __warn_unused_result__));
[[nodiscard]] int add(const rocksdb::Slice &key, const rocksdb::Slice &val);

int merge_buf_write() MY_ATTRIBUTE((__nonnull__, __warn_unused_result__));
[[nodiscard]] int merge_buf_write();

int next(rocksdb::Slice *const key, rocksdb::Slice *const val)
MY_ATTRIBUTE((__nonnull__, __warn_unused_result__));
[[nodiscard]] int next(rocksdb::Slice *const key, rocksdb::Slice *const val);

int merge_heap_prepare() MY_ATTRIBUTE((__nonnull__, __warn_unused_result__));
[[nodiscard]] int merge_heap_prepare();

void merge_heap_top(rocksdb::Slice *key, rocksdb::Slice *val)
MY_ATTRIBUTE((__nonnull__));

int merge_heap_pop_and_get_next(rocksdb::Slice *const key,
rocksdb::Slice *const val)
MY_ATTRIBUTE((__nonnull__, __warn_unused_result__));
[[nodiscard]] int merge_heap_pop_and_get_next(rocksdb::Slice *const key,
rocksdb::Slice *const val)
MY_ATTRIBUTE((__nonnull__));

void merge_reset();

rocksdb::ColumnFamilyHandle *get_cf() const { return m_cf_handle; }
};
[[nodiscard]] rocksdb::ColumnFamilyHandle &get_cf() const {
return m_cf_handle;
}
};

} // namespace myrocks
Loading

0 comments on commit e763c6b

Please sign in to comment.