Skip to content

Commit

Permalink
play: add timestamp flag to filter output
Browse files Browse the repository at this point in the history
After this commit user can filter output of the `play` command by record
timestamp.

Closes tarantool/tt-ee#227
  • Loading branch information
patapenka-alexey authored and oleg-jukovec committed Nov 20, 2024
1 parent 4a19155 commit c5c65e1
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 23 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- `tt replicaset upgrade`: command to upgrade the schema on a Tarantool cluster.
* `-r (--replicaset)`: specify the replicaset name(s) to upgrade.
* `-t (--timeout)`: timeout for waiting the LSN synchronization (in seconds) (default 5).
- New flag `--timestamp` of `tt cat` command is added to specify
operations ending with the given timestamp. It value can be specified
- New flag `--timestamp` of `tt cat` and `tt play` commands is added to specify
operations ending with the given timestamp. This value can be specified
as a number or using [RFC3339/RFC3339Nano](https://go.dev/src/time/format.go) time format.

### Changed
Expand Down
27 changes: 18 additions & 9 deletions cli/checkpoint/lua/play.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
-- The --space flags passes through 'TT_CLI_PLAY_SPACES'.
-- The --from flag passes through 'TT_CLI_PLAY_FROM'.
-- The --to flag passes through 'TT_CLI_PLAY_TO'.
-- The --timestamp flag passes through 'TT_CLI_PLAY_TIMESTAMP'.
-- The --replica flags passes through 'TT_CLI_PLAY_REPLICAS'.

local log = require('log')
Expand All @@ -24,20 +25,21 @@ local function find_in_list(id, list)
end

local function filter_xlog(gen, param, state, opts, cb)
local from, to, spaces = opts.from, opts.to, opts.space
local from, to, timestamp, spaces = opts.from, opts.to, opts.timestamp, opts.space
local show_system, replicas = opts['show-system'], opts.replica

for lsn, record in gen, param, state do
local sid = record.BODY and record.BODY.space_id
local rid = record.HEADER.replica_id
if replicas and #replicas == 1 and replicas[1] == rid and lsn >= to then
local ts = record.HEADER.timestamp or 0
if replicas and #replicas == 1 and replicas[1] == rid and (lsn >= to or ts >= timestamp) then
-- Stop, as we've finished reading tuple with lsn == to
-- and the next lsn's will be bigger.
break
elseif (lsn < from) or (lsn >= to) or
(not spaces and sid and sid < 512 and not show_system) or
(spaces and (sid == nil or not find_in_list(sid, spaces))) or
(replicas and not find_in_list(rid, replicas)) then
elseif (lsn < from) or (lsn >= to) or (ts >= timestamp) or
(not spaces and sid and sid < 512 and not show_system) or
(spaces and (sid == nil or not find_in_list(sid, spaces))) or
(replicas and not find_in_list(rid, replicas)) then
-- Pass this tuple, luacheck: ignore.
else
cb(record)
Expand Down Expand Up @@ -97,7 +99,7 @@ local function main()

local files_and_uri = os.getenv('TT_CLI_PLAY_FILES_AND_URI')
if files_and_uri == nil then
log.error('Internal error: failed to get cat params from TT_CLI_PLAY_FILES_AND_URI')
log.error('Internal error: failed to get play params from TT_CLI_PLAY_FILES_AND_URI')
os.exit(1)
end
positional_arguments = json.decode(files_and_uri)
Expand All @@ -117,18 +119,25 @@ local function main()

local from = os.getenv('TT_CLI_PLAY_FROM')
if from == nil then
log.error('Internal error: failed to get cat params from TT_CLI_PLAY_FROM')
log.error('Internal error: failed to get play params from TT_CLI_PLAY_FROM')
os.exit(1)
end
keyword_arguments['from'] = tonumber(from)

local to = os.getenv('TT_CLI_PLAY_TO')
if to == nil then
log.error('Internal error: failed to get cat params from TT_CLI_PLAY_TO')
log.error('Internal error: failed to get play params from TT_CLI_PLAY_TO')
os.exit(1)
end
keyword_arguments['to'] = tonumber(to)

local timestamp = os.getenv('TT_CLI_PLAY_TIMESTAMP')
if timestamp == nil then
log.error('Internal error: failed to get play params from TT_CLI_PLAY_TIMESTAMP')
os.exit(1)
end
keyword_arguments['timestamp'] = tonumber(timestamp)

local replicas = os.getenv('TT_CLI_PLAY_REPLICAS')
if replicas ~= nil then
keyword_arguments['replica'] = {}
Expand Down
30 changes: 18 additions & 12 deletions cli/cmd/play.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
var playFlags = checkpoint.Opts{
From: 0,
To: math.MaxUint64,
Timestamp: "",
Space: nil,
Replica: nil,
ShowSystem: false,
Expand All @@ -46,12 +47,16 @@ func NewPlayCmd() *cobra.Command {
internalPlayModule, args)
util.HandleCmdErr(cmd, err)
},
Example: "tt play uri /path/to/xlog --timestamp 2024-11-13T14:02:36.818700000+00:00\n" +
" tt play uri /path/to/xlog --timestamp=1731592956.818",
}

playCmd.Flags().StringVarP(&playUsername, "username", "u", "", "username")
playCmd.Flags().StringVarP(&playPassword, "password", "p", "", "password")
playCmd.Flags().Uint64Var(&playFlags.To, "to", playFlags.To,
"Show operations ending with the given lsn")
playCmd.Flags().StringVar(&playFlags.Timestamp, "timestamp", playFlags.Timestamp,
"Show operations ending with the given timestamp")
playCmd.Flags().Uint64Var(&playFlags.From, "from", playFlags.From,
"Show operations starting from the given lsn")
playCmd.Flags().IntSliceVar(&playFlags.Space, "space", playFlags.Space,
Expand All @@ -67,16 +72,15 @@ func NewPlayCmd() *cobra.Command {
// internalPlayModule is a default play module.
func internalPlayModule(cmdCtx *cmdcontext.CmdCtx, args []string) error {
if len(args) < 2 {
return fmt.Errorf("it is required to specify an URI and at least one .xlog or .snap file")
return errors.New("it is required to specify an URI and at least one .xlog or .snap file")
}

// List of files and URI is passed to lua play script via environment variable in json format.
filesAndUriJson, err := json.Marshal(args)
if err != nil {
util.InternalError(
return util.InternalError(
"Internal error: problem with creating json params with files and uri: %s",
version.GetVersion, err,
)
version.GetVersion, err)
}

if libconnect.IsCredentialsURI(args[0]) {
Expand Down Expand Up @@ -105,11 +109,9 @@ func internalPlayModule(cmdCtx *cmdcontext.CmdCtx, args []string) error {
// List of spaces is passed to lua play script via environment variable in json format.
spacesJson, err := json.Marshal(playFlags.Space)
if err != nil {
util.InternalError(
return util.InternalError(
"Internal error: problem with creating json params with spaces: %s",
version.GetVersion,
err,
)
version.GetVersion, err)
}
if string(spacesJson) != "null" {
os.Setenv("TT_CLI_PLAY_SPACES", string(spacesJson))
Expand All @@ -118,14 +120,18 @@ func internalPlayModule(cmdCtx *cmdcontext.CmdCtx, args []string) error {
os.Setenv("TT_CLI_PLAY_FROM", strconv.FormatUint(playFlags.From, 10))
os.Setenv("TT_CLI_PLAY_TO", strconv.FormatUint(playFlags.To, 10))

timestamp, err := util.StringToTimestamp(playFlags.Timestamp)
if err != nil {
return fmt.Errorf("failed to parse a timestamp: %s", err)
}
os.Setenv("TT_CLI_PLAY_TIMESTAMP", timestamp)

// List of replicas is passed to lua play script via environment variable in json format.
replicasJson, err := json.Marshal(playFlags.Replica)
if err != nil {
util.InternalError(
return util.InternalError(
"Internal error: problem with creating json params with replicas: %s",
version.GetVersion,
err,
)
version.GetVersion, err)
}
if string(replicasJson) != "null" {
os.Setenv("TT_CLI_PLAY_REPLICAS", string(replicasJson))
Expand Down
71 changes: 71 additions & 0 deletions test/integration/play/test_play.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,77 @@ def test_play_test_remote_instance(tt_cmd, test_instance):
assert re.search(r"[3, 'Ace of Base', 1993]", output)


TEST_PLAY_TIMESTAMP_PARAMS_CCONFIG = ("input, play_result, found, not_found")


def make_test_play_timestamp_param(
input="",
play_result=0,
found={},
not_found={},
):
return pytest.param(input, play_result, found, not_found)


@pytest.mark.parametrize(TEST_PLAY_TIMESTAMP_PARAMS_CCONFIG, [
make_test_play_timestamp_param(
input="abcdef",
play_result=1,
found={"failed to parse a timestamp: parsing time \"abcdef\""},
),
make_test_play_timestamp_param(
input="2024-11-14T14:02:36.abc",
play_result=1,
found={"failed to parse a timestamp: parsing time \"2024-11-14T14:02:36.abc\""},
),
make_test_play_timestamp_param(
input="",
play_result=0,
found={"[3, 'Ace of Base', 1993]"},
),
make_test_play_timestamp_param(
input="1651130533.1534",
play_result=0,
found={"space_id: 999",
"[1, 'Roxette', 1986]",
"[2, 'Scorpions', 2015]"},
not_found={"Ace of Base"},
),
make_test_play_timestamp_param(
input="2022-04-28T07:22:13.1534+00:00",
play_result=0,
found={"space_id: 999",
"[1, 'Roxette', 1986]",
"[2, 'Scorpions', 2015]"},
not_found={"Ace of Base"},
),
make_test_play_timestamp_param(
input="2022-04-28T07:22:12+00:00",
play_result=0,
found={"space_id: 999",
"[1, 'Roxette', 1986]"},
not_found={"Scorpions",
"Ace of Base"},
),
])
def test_play_test_remote_instance_timestamp(tt_cmd, test_instance, input,
play_result, found, not_found):
# Play .xlog file to the remote instance.
cmd = [tt_cmd, "play", "127.0.0.1:" + test_instance.port, "test.xlog",
"--timestamp={0}".format(input), "--space=999"]
rc, output = run_command_and_get_output(cmd, cwd=test_instance._tmpdir)
assert rc == play_result
if play_result == 0:
# Testing played .xlog file from the remote instance.
cmd = [tt_cmd, "cat", "00000000000000000000.xlog", "--space=999"]
rc, output = run_command_and_get_output(cmd, cwd=test_instance._tmpdir)
assert rc == 0
for item in found:
assert re.search(r"{0}".format(item), output)
for item in not_found:
assert not re.search(r"{0}".format(item), output)


@pytest.mark.parametrize("opts", [
pytest.param({"flags": ["--username=test_user", "--password=4"]}),
pytest.param({"flags": ["--username=fry"]}),
Expand Down

0 comments on commit c5c65e1

Please sign in to comment.