Skip to content

Commit 31b9380

Browse files
feat:Stream base commands (OpenAtomFoundation#1955)
* [develop] finish StreamMetaValue, ParsedStreamMetaValue in stream_meta_value.h * [develop] add other cgroup, consumer and pel meta value * [develop] move stream metadata out of the storage layer and rewrite it. * [develop] add two class :TreeIDGenerator, StreamUtil * [develop] add some helper func * [develop] xadd cmd's most function finished * [develop] XREAD command parse * [temp] add xgroup cmmand parse * [develop] XGROUP commad parse, XGROUP CREATE, XGROUP CREATECONSUMER finished * [develop] XRANGE, XREAD finished, add some unit test * [develop] XREADGROUP finished * [develop] XLEN finished * [develop] XACK command finished * [develop] DEL command support, XGROUP DESTORY finished, registered some command. * [debug] fix the problem of CMakeFile in test/stream * Add xgroup help command * fix conflicts * [debug] fix the compile problem in debug mode * [develop] XTRIM command finished * [debug] fix some problem of XTRIM command * [develop] remove stream cpp utest, fix some bug and pass 23 unit test * [develop] ajust code structure, add 2 tests * [develop] XCLAIM command finished * [test] now pass 30 tests * [develop] ajust code structure, fix a return value bug of XRANGE * [develop] XREVRANGE cmd finished and passed tests * [develop] add 4 more tests of XADD and XDEL * [develop] Adjusting code structure * [develop] change const shared_ptr & to raw pointer * [develop] remove xgroup commands implementation * [fix] fix compile problem on mac and centos * [fix] add licence, fix compile problem * [fix] code format * [fix] fix compile problem on macos * [fix] fix macos compile problem again, add log to find ubuntu test problem * fix the change of config file * [fix] fix a bug found by sanitizer * [fix] add test: XADD large data, triggering flushing, XREAD, XLEN, XRANG, XTRIIM should work * "fix: ./stream_test.go:76:6: randomInt redeclared in this block" * fix: add replication test of stream * fix: a bug of XADD replication and add a stronger replication test. The XADD command in Redis has the capability to generate IDs based on timestamps. To ensure that repeated execution of a command results in the same timestamp, it is necessary to include the generated ID in the command when generating the binlog. This way, when replaying or replicating, the same ID will be obtained. * fix: compile bug. * fix: add concurrency test of stream. --------- Co-authored-by: Jinghui <vere.lee@foxmail.com>
1 parent 9ea7ad5 commit 31b9380

12 files changed

+3141
-2
lines changed

include/pika_command.h

+14
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,17 @@ const std::string kCmdNamePubSub = "pubsub";
231231
const std::string kCmdNamePSubscribe = "psubscribe";
232232
const std::string kCmdNamePUnSubscribe = "punsubscribe";
233233

234+
// Stream
235+
const std::string kCmdNameXAdd = "xadd";
236+
const std::string kCmdNameXDel = "xdel";
237+
const std::string kCmdNameXRead = "xread";
238+
const std::string kCmdNameXLen = "xlen";
239+
const std::string kCmdNameXRange = "xrange";
240+
const std::string kCmdNameXRevrange = "xrevrange";
241+
const std::string kCmdNameXTrim = "xtrim";
242+
const std::string kCmdNameXInfo = "xinfo";
243+
244+
234245
const std::string kClusterPrefix = "pkcluster";
235246
using PikaCmdArgsType = net::RedisCmdArgsType;
236247
static const int RAW_ARGS_LEN = 1024 * 1024;
@@ -272,6 +283,8 @@ enum CmdFlags {
272283
kCmdFlagsDoNotSpecifySlot = 0, // default do not specify slot
273284
kCmdFlagsSingleSlot = 512,
274285
kCmdFlagsMultiSlot = 1024,
286+
kCmdFlagsPreDo = 2048,
287+
kCmdFlagsStream = 1536,
275288
kCmdFlagsReadCache = 128,
276289
kCmdFlagsUpdateCache = 2048,
277290
kCmdFlagsDoThroughDB = 4096,
@@ -322,6 +335,7 @@ class CmdRes {
322335

323336
bool none() const { return ret_ == kNone && message_.empty(); }
324337
bool ok() const { return ret_ == kOk || ret_ == kNone; }
338+
CmdRet ret() const { return ret_; }
325339
void clear() {
326340
message_.clear();
327341
ret_ = kNone;

include/pika_stream.h

+167
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
// Copyright (c) 2018-present, Qihoo, Inc. All rights reserved.
2+
// This source code is licensed under the BSD-style license found in the
3+
// LICENSE file in the root directory of this source tree. An additional grant
4+
// of patent rights can be found in the PATENTS file in the same directory.
5+
6+
#ifndef PIKA_STREAM_H_
7+
#define PIKA_STREAM_H_
8+
9+
#include "include/pika_command.h"
10+
#include "include/pika_slot.h"
11+
#include "include/pika_stream_base.h"
12+
#include "include/pika_stream_meta_value.h"
13+
#include "include/pika_stream_types.h"
14+
#include "storage/storage.h"
15+
16+
/*
17+
* stream
18+
*/
19+
20+
inline void ParseAddOrTrimArgsOrReply(CmdRes& res, const PikaCmdArgsType& argv, StreamAddTrimArgs& args, int* idpos,
21+
bool is_xadd);
22+
23+
inline void ParseReadOrReadGroupArgsOrReply(CmdRes& res, const PikaCmdArgsType& argv, StreamReadGroupReadArgs& args,
24+
bool is_xreadgroup);
25+
26+
// @field_values is the result of ScanStream.
27+
// field is the serialized message id,
28+
// value is the serialized message.
29+
inline void AppendMessagesToRes(CmdRes& res, std::vector<storage::FieldValue>& field_values, const Slot* slot);
30+
31+
class XAddCmd : public Cmd {
32+
public:
33+
XAddCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag){};
34+
std::vector<std::string> current_key() const override { return {key_}; }
35+
void Do(std::shared_ptr<Slot> slot = nullptr) override;
36+
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
37+
void Merge() override{};
38+
Cmd* Clone() override { return new XAddCmd(*this); }
39+
40+
private:
41+
std::string key_;
42+
StreamAddTrimArgs args_;
43+
int field_pos_{0};
44+
45+
void DoInitial() override;
46+
inline void GenerateStreamIDOrReply(const StreamMetaValue& stream_meta);
47+
};
48+
49+
class XDelCmd : public Cmd {
50+
public:
51+
XDelCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag){};
52+
std::vector<std::string> current_key() const override { return {key_}; }
53+
void Do(std::shared_ptr<Slot> slot = nullptr) override;
54+
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
55+
void Merge() override{};
56+
Cmd* Clone() override { return new XDelCmd(*this); }
57+
58+
private:
59+
std::string key_;
60+
std::vector<streamID> ids_;
61+
62+
void DoInitial() override;
63+
void Clear() override { ids_.clear(); }
64+
inline void SetFirstOrLastIDOrReply(StreamMetaValue& stream_meta, const Slot* slot, bool is_set_first);
65+
inline void SetFirstIDOrReply(StreamMetaValue& stream_meta, const Slot* slot);
66+
inline void SetLastIDOrReply(StreamMetaValue& stream_meta, const Slot* slot);
67+
};
68+
69+
class XReadCmd : public Cmd {
70+
public:
71+
XReadCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag){};
72+
void Do(std::shared_ptr<Slot> slot = nullptr) override;
73+
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
74+
void Merge() override{};
75+
Cmd* Clone() override { return new XReadCmd(*this); }
76+
77+
private:
78+
StreamReadGroupReadArgs args_;
79+
80+
void DoInitial() override;
81+
void Clear() override {
82+
args_.unparsed_ids.clear();
83+
args_.keys.clear();
84+
}
85+
};
86+
87+
class XRangeCmd : public Cmd {
88+
public:
89+
XRangeCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag){};
90+
void Do(std::shared_ptr<Slot> slot = nullptr) override;
91+
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
92+
void Merge() override{};
93+
Cmd* Clone() override { return new XRangeCmd(*this); }
94+
95+
protected:
96+
std::string key_;
97+
streamID start_sid;
98+
streamID end_sid;
99+
int32_t count_{INT32_MAX};
100+
bool start_ex_{false};
101+
bool end_ex_{false};
102+
103+
void DoInitial() override;
104+
};
105+
106+
class XRevrangeCmd : public XRangeCmd {
107+
public:
108+
XRevrangeCmd(const std::string& name, int arity, uint16_t flag) : XRangeCmd(name, arity, flag){};
109+
void Do(std::shared_ptr<Slot> slot = nullptr) override;
110+
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
111+
void Merge() override{};
112+
Cmd* Clone() override { return new XRevrangeCmd(*this); }
113+
};
114+
115+
class XLenCmd : public Cmd {
116+
public:
117+
XLenCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag){};
118+
void Do(std::shared_ptr<Slot> slot = nullptr) override;
119+
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
120+
void Merge() override{};
121+
Cmd* Clone() override { return new XLenCmd(*this); }
122+
123+
private:
124+
std::string key_;
125+
126+
void DoInitial() override;
127+
};
128+
129+
class XTrimCmd : public Cmd {
130+
public:
131+
XTrimCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag){};
132+
std::vector<std::string> current_key() const override { return {key_}; }
133+
void Do(std::shared_ptr<Slot> slot = nullptr) override;
134+
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
135+
void Merge() override{};
136+
Cmd* Clone() override { return new XTrimCmd(*this); }
137+
138+
private:
139+
std::string key_;
140+
StreamAddTrimArgs args_;
141+
142+
void DoInitial() override;
143+
};
144+
145+
class XInfoCmd : public Cmd {
146+
public:
147+
XInfoCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag){};
148+
void Do(std::shared_ptr<Slot> slot = nullptr) override;
149+
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
150+
void Merge() override{};
151+
Cmd* Clone() override { return new XInfoCmd(*this); }
152+
153+
private:
154+
std::string key_;
155+
std::string cgroupname_;
156+
std::string consumername_;
157+
std::string subcmd_;
158+
uint64_t count_{0};
159+
bool is_full_{false};
160+
161+
void DoInitial() override;
162+
void StreamInfo(std::shared_ptr<Slot>& slot);
163+
void GroupsInfo(std::shared_ptr<Slot>& slot);
164+
void ConsumersInfo(std::shared_ptr<Slot>& slot);
165+
};
166+
167+
#endif // PIKA_STREAM_H_

0 commit comments

Comments
 (0)