diff --git a/CHANGELOG.md b/CHANGELOG.md index dddcb7821..741e6ec1e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - `tt rs rebootstrap`: re-bootstraps an instance. - `-s (--self)` flag to execute `tt` itself and don't search for other `tt`s in bin_dir provided in config. +- `tt start` interactive mode with `-i` option. ### Fixed diff --git a/cli/cmd/start.go b/cli/cmd/start.go index 2d5ed7c2a..8c2f4a760 100644 --- a/cli/cmd/start.go +++ b/cli/cmd/start.go @@ -1,9 +1,13 @@ package cmd import ( + "context" "fmt" "os" + "os/signal" "strconv" + "sync" + "syscall" "time" "github.com/spf13/cobra" @@ -11,6 +15,7 @@ import ( "github.com/tarantool/tt/cli/cmdcontext" "github.com/tarantool/tt/cli/modules" "github.com/tarantool/tt/cli/running" + "github.com/tarantool/tt/cli/tail" "github.com/tarantool/tt/cli/util" "github.com/tarantool/tt/lib/integrity" ) @@ -23,6 +28,10 @@ var ( // integrityCheckPeriod is a flag enables periodic integrity checks. // The default period is 1 day. integrityCheckPeriod = 24 * 60 * 60 + // startInteractive is startInteractive mode flag. If set, the main process does not exit after + // watchdog children start and waits for them to complete. Also all logging is performed + // to standard output. + startInteractive bool ) // NewStartCmd creates start command. @@ -49,6 +58,7 @@ func NewStartCmd() *cobra.Command { startCmd.Flags().BoolVar(&watchdog, "watchdog", false, "") startCmd.Flags().MarkHidden("watchdog") + startCmd.Flags().BoolVarP(&startInteractive, "interactive", "i", false, "") integrity.RegisterIntegrityCheckPeriodFlag(startCmd.Flags(), &integrityCheckPeriod) @@ -76,6 +86,36 @@ func startInstancesUnderWatchdog(cmdCtx *cmdcontext.CmdCtx, instances []running. return nil } +// startInstancesInteractive starts tarantool instances and waits for them to complete. +func startInstancesInteractive(cmdCtx *cmdcontext.CmdCtx, instances []running.InstanceCtx) error { + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + wg := sync.WaitGroup{} + pickColor := tail.DefaultColorPicker() + for _, instCtx := range instances { + clr := pickColor() + prefix := running.GetAppInstanceName(instCtx) + " " + wg.Add(1) + go func(inst running.InstanceCtx) { + running.RunInstance(ctx, cmdCtx, inst, + running.NewColorizedPrefixWriter(os.Stdout, clr, prefix), + running.NewColorizedPrefixWriter(os.Stderr, clr, prefix)) + wg.Done() + }(instCtx) + } + wg.Wait() + return nil +} + +// startInstances starts tarantool instances. +func startInstances(cmdCtx *cmdcontext.CmdCtx, instances []running.InstanceCtx) error { + if startInteractive { + return startInstancesInteractive(cmdCtx, instances) + } + return startInstancesUnderWatchdog(cmdCtx, instances) +} + // internalStartModule is a default start module. func internalStartModule(cmdCtx *cmdcontext.CmdCtx, args []string) error { if !isConfigExist(cmdCtx) { @@ -97,7 +137,7 @@ func internalStartModule(cmdCtx *cmdcontext.CmdCtx, args []string) error { } if !watchdog { - if err := startInstancesUnderWatchdog(cmdCtx, runningCtx.Instances); err != nil { + if err := startInstances(cmdCtx, runningCtx.Instances); err != nil { return err } return nil diff --git a/cli/daemon/process.go b/cli/daemon/process.go index 90ef52b36..0416b12b0 100644 --- a/cli/daemon/process.go +++ b/cli/daemon/process.go @@ -99,7 +99,7 @@ func (process *Process) Start() error { return fmt.Errorf("failed to create log: %s", err) } - if err := process_utils.CreatePIDFile(process.pidFileName); err != nil { + if err := process_utils.CreatePIDFile(process.pidFileName, os.Getpid()); err != nil { return err } diff --git a/cli/process_utils/process_utils.go b/cli/process_utils/process_utils.go index 1cee6e89b..c49e274a1 100644 --- a/cli/process_utils/process_utils.go +++ b/cli/process_utils/process_utils.go @@ -106,7 +106,7 @@ func CheckPIDFile(pidFileName string) error { // CreatePIDFile checks that the instance PID file is absent or // deprecated and creates a new one. Returns an error on failure. -func CreatePIDFile(pidFileName string) error { +func CreatePIDFile(pidFileName string, pid int) error { if err := CheckPIDFile(pidFileName); err != nil { return err } @@ -135,7 +135,7 @@ func CreatePIDFile(pidFileName string) error { } defer pidFile.Close() - if _, err = pidFile.WriteString(strconv.Itoa(os.Getpid())); err != nil { + if _, err = pidFile.WriteString(strconv.Itoa(pid)); err != nil { return err } diff --git a/cli/running/base_instance.go b/cli/running/base_instance.go new file mode 100644 index 000000000..44868ad27 --- /dev/null +++ b/cli/running/base_instance.go @@ -0,0 +1,165 @@ +package running + +import ( + "fmt" + "io" + "os" + "strings" + "syscall" + "time" + + "github.com/apex/log" + "github.com/tarantool/tt/cli/ttlog" + "github.com/tarantool/tt/lib/integrity" +) + +// baseInstance represents a tarantool instance. +type baseInstance struct { + // processController is a child process controller. + *processController + // logger represents an active logging object. + logger ttlog.Logger + // tarantoolPath describes the path to the tarantool binary + // that will be used to launch the Instance. + tarantoolPath string + // appPath describes the path to the "init" file of an application. + appPath string + // appDir is an application directory. + appDir string + // appName describes the application name (the name of the directory + // where the application files are present). + appName string + // instName describes the instance name. + instName string + // walDir is a directory where write-ahead log (.xlog) files are stored. + walDir string + // memtxDir is a directory where memtx stores snapshot (.snap) files. + memtxDir string `mapstructure:"memtx_dir" yaml:"memtx_dir"` + // vinylDir is a directory where vinyl files or subdirectories will be stored. + vinylDir string `mapstructure:"vinyl_dir" yaml:"vinyl_dir"` + // consoleSocket is a Unix domain socket to be used as "admin port". + consoleSocket string + // binaryPort is a Unix socket to be used as "binary port". + binaryPort string + // logDir is log files location. + logDir string + // IntegrityCtx contains information necessary to perform integrity checks. + integrityCtx integrity.IntegrityCtx + // integrityChecks tells whether integrity checks are turned on. + integrityChecks bool + // stdOut is a standard output writer. + stdOut io.Writer + // stdErr is a standard error writer. + stdErr io.Writer +} + +func newBaseInstance(tarantoolPath string, instanceCtx InstanceCtx, + opts ...InstanceOption) baseInstance { + baseInst := baseInstance{ + tarantoolPath: tarantoolPath, + appPath: instanceCtx.InstanceScript, + appName: instanceCtx.AppName, + appDir: instanceCtx.AppDir, + instName: instanceCtx.InstName, + consoleSocket: instanceCtx.ConsoleSocket, + walDir: instanceCtx.WalDir, + vinylDir: instanceCtx.VinylDir, + memtxDir: instanceCtx.MemtxDir, + logDir: instanceCtx.LogDir, + binaryPort: instanceCtx.BinaryPort, + stdOut: os.Stdout, + stdErr: os.Stderr, + } + for _, opt := range opts { + opt(&baseInst) + } + return baseInst +} + +// InstanceOption is a functional option to configure tarantool instance. +type InstanceOption func(inst *baseInstance) error + +// IntegrityOpt sets integrity context. +func IntegrityOpt(integrityCtx integrity.IntegrityCtx) InstanceOption { + return func(inst *baseInstance) error { + inst.integrityChecks = true + inst.integrityCtx = integrityCtx + return nil + } +} + +// StdOutOpt sets stdout writer for the child process. +func StdOutOpt(writer io.Writer) InstanceOption { + return func(inst *baseInstance) error { + inst.stdOut = writer + return nil + } +} + +// StdErrOpt sets stderr writer for the child process. +func StdErrOpt(writer io.Writer) InstanceOption { + return func(inst *baseInstance) error { + inst.stdErr = writer + return nil + } +} + +// StdLoggerOpt sets logger for the instance and standard out FDs to logger writer. +func StdLoggerOpt(logger ttlog.Logger) InstanceOption { + return func(inst *baseInstance) error { + inst.logger = logger + inst.stdOut = logger.Writer() + inst.stdErr = logger.Writer() + return nil + } +} + +// Wait waits for the child process to complete. +func (inst *baseInstance) Wait() error { + if inst.processController == nil { + return fmt.Errorf("instance is not started") + } + return inst.processController.Wait() +} + +// SendSignal sends a signal to tarantool instance. +func (inst *baseInstance) SendSignal(sig os.Signal) error { + if inst.processController == nil { + return fmt.Errorf("instance is not started") + } + return inst.processController.SendSignal(sig) +} + +// IsAlive verifies that the instance is alive by sending a "0" signal. +func (inst *baseInstance) IsAlive() bool { + if inst.processController == nil { + return false + } + return inst.processController.IsAlive() +} + +// StopWithSignal terminates the process with a specific signal. +func (inst *baseInstance) StopWithSignal(waitTimeout time.Duration, usedSignal os.Signal) error { + if inst.processController == nil { + return nil + } + return inst.processController.StopWithSignal(waitTimeout, usedSignal) +} + +// Run runs tarantool instance. +func (inst *baseInstance) Run(opts RunOpts) error { + f, err := inst.integrityCtx.Repository.Read(inst.tarantoolPath) + if err != nil { + return err + } + f.Close() + newInstanceEnv := os.Environ() + args := []string{inst.tarantoolPath} + args = append(args, opts.RunArgs...) + log.Debugf("Running Tarantool with args: %s", strings.Join(args[1:], " ")) + execErr := syscall.Exec(inst.tarantoolPath, args, newInstanceEnv) + if execErr != nil { + return execErr + } + return nil +} diff --git a/cli/running/cluster_instance.go b/cli/running/cluster_instance.go index 4cac6af8d..677aa48f4 100644 --- a/cli/running/cluster_instance.go +++ b/cli/running/cluster_instance.go @@ -1,66 +1,29 @@ package running import ( + "context" "fmt" "os" "os/exec" "path/filepath" - "strings" - "syscall" "time" - "github.com/apex/log" "github.com/tarantool/tt/cli/cmdcontext" - "github.com/tarantool/tt/cli/ttlog" "github.com/tarantool/tt/cli/util" "github.com/tarantool/tt/lib/integrity" ) // clusterInstance describes tarantool 3 instance running using cluster config. type clusterInstance struct { - // processController is a child process controller. - processController *processController - // logger represents an active logging object. - logger ttlog.Logger - // tarantoolPath describes the path to the tarantool binary - // that will be used to launch the Instance. - tarantoolPath string - // appPath describes the path to the "init" file of an application. - appPath string - // appName describes the application name (the name of the directory - // where the application files are present). - appName string - // instName describes the instance name. - instName string - // walDir is a directory where write-ahead log (.xlog) files are stored. - walDir string - // memtxDir is a directory where memtx stores snapshot (.snap) files. - memtxDir string - // vinylDir is a directory where vinyl files or subdirectories will be stored. - vinylDir string - // consoleSocket is a Unix domain socket to be used as "admin port". - consoleSocket string - // binaryPort is a Unix socket to be used as "binary port" - binaryPort string - // appDir is an application directory. - appDir string - // runDir is a directory that stores various instance runtime artifacts like - // console socket, PID file, etc. - runDir string + baseInstance // clusterConfigPath is a path of the cluster config. clusterConfigPath string - // logDir is log files location. - logDir string - // IntegrityCtx contains information necessary to perform integrity checks. - integrityCtx integrity.IntegrityCtx - // integrityChecks tells whether integrity checks are turned on. - integrityChecks bool + runDir string } // newClusterInstance creates a clusterInstance. func newClusterInstance(tarantoolCli cmdcontext.TarantoolCli, instanceCtx InstanceCtx, - logger ttlog.Logger, integrityCtx integrity.IntegrityCtx, - integrityChecks bool) (*clusterInstance, error) { + opts ...InstanceOption) (*clusterInstance, error) { // Check if tarantool binary exists. if _, err := exec.LookPath(tarantoolCli.Executable); err != nil { return nil, err @@ -75,22 +38,9 @@ func newClusterInstance(tarantoolCli cmdcontext.TarantoolCli, instanceCtx Instan } return &clusterInstance{ - tarantoolPath: tarantoolCli.Executable, - appPath: instanceCtx.InstanceScript, - appName: instanceCtx.AppName, - instName: instanceCtx.InstName, - consoleSocket: instanceCtx.ConsoleSocket, - binaryPort: instanceCtx.BinaryPort, - logger: logger, - walDir: instanceCtx.WalDir, - vinylDir: instanceCtx.VinylDir, - memtxDir: instanceCtx.MemtxDir, - appDir: instanceCtx.AppDir, + baseInstance: newBaseInstance(tarantoolCli.Executable, instanceCtx, opts...), runDir: instanceCtx.RunDir, clusterConfigPath: instanceCtx.ClusterConfigPath, - logDir: instanceCtx.LogDir, - integrityCtx: integrityCtx, - integrityChecks: integrityChecks, }, nil } @@ -103,16 +53,20 @@ func appendEnvIfNotEmpty(env []string, envVarName string, value string) []string } // Start starts tarantool instance with cluster config. -func (inst *clusterInstance) Start() error { +func (inst *clusterInstance) Start(ctx context.Context) error { cmdArgs := []string{"-n", inst.instName, "-c", inst.clusterConfigPath} if inst.integrityChecks { cmdArgs = append(cmdArgs, "--integrity-check", filepath.Join(inst.appDir, integrity.HashesFileName)) } - cmd := exec.Command(inst.tarantoolPath, cmdArgs...) - cmd.Stdout = inst.logger.Writer() - cmd.Stderr = inst.logger.Writer() + cmd := exec.CommandContext(ctx, inst.tarantoolPath, cmdArgs...) + cmd.Cancel = func() error { + return cmd.Process.Signal(os.Interrupt) + } + cmd.WaitDelay = 30 * time.Second + cmd.Stdout = inst.stdOut + cmd.Stderr = inst.stdErr cmd.Env = os.Environ() cmd.Env = appendEnvIfNotEmpty(cmd.Env, "TT_VINYL_DIR_DEFAULT", inst.vinylDir) @@ -143,63 +97,3 @@ func (inst *clusterInstance) Start() error { return nil } - -// Run runs tarantool instance. -func (inst *clusterInstance) Run(opts RunOpts) error { - newInstanceEnv := os.Environ() - args := []string{inst.tarantoolPath} - - f, err := inst.integrityCtx.Repository.Read(inst.tarantoolPath) - if err != nil { - return err - } - f.Close() - - args = append(args, opts.RunArgs...) - log.Debugf("Running Tarantool with args: %s", strings.Join(args[1:], " ")) - execErr := syscall.Exec(inst.tarantoolPath, args, newInstanceEnv) - if execErr != nil { - return execErr - } - return nil -} - -// Wait waits for the process to complete. -func (inst *clusterInstance) Wait() error { - if inst.processController == nil { - return fmt.Errorf("instance is not started") - } - return inst.processController.Wait() -} - -// SendSignal sends a signal to the process. -func (inst *clusterInstance) SendSignal(sig os.Signal) error { - if inst.processController == nil { - return fmt.Errorf("instance is not started") - } - return inst.processController.SendSignal(sig) -} - -// IsAlive verifies that the instance is alive. -func (inst *clusterInstance) IsAlive() bool { - if inst.processController == nil { - return false - } - return inst.processController.IsAlive() -} - -// Stop terminates the process. -// -// timeout - the time that was provided to the process -// to terminate correctly before killing it. -func (inst *clusterInstance) Stop(waitTimeout time.Duration) error { - return inst.StopWithSignal(waitTimeout, os.Interrupt) -} - -// StopWithSignal terminates the process with specific signal. -func (inst *clusterInstance) StopWithSignal(waitTimeout time.Duration, usedSignal os.Signal) error { - if inst.processController == nil { - return nil - } - return inst.processController.StopWithSignal(waitTimeout, usedSignal) -} diff --git a/cli/running/cluster_instance_test.go b/cli/running/cluster_instance_test.go index f9f195003..ed4f0019f 100644 --- a/cli/running/cluster_instance_test.go +++ b/cli/running/cluster_instance_test.go @@ -3,6 +3,7 @@ package running import ( "bufio" "bytes" + "context" "fmt" "io" "os" @@ -16,7 +17,6 @@ import ( "github.com/tarantool/tt/cli/cmdcontext" "github.com/tarantool/tt/cli/ttlog" "github.com/tarantool/tt/cli/util" - "github.com/tarantool/tt/lib/integrity" ) var tntCli cmdcontext.TarantoolCli @@ -77,13 +77,11 @@ func TestClusterInstance_Start(t *testing.T) { InstName: "instance-001", AppDir: tmpDir, BinaryPort: "localhost:3013", - }, ttlog.NewCustomLogger(&outputBuf, "test", 0), integrity.IntegrityCtx{ - Repository: &mockRepository{}, - }, false) + }, StdLoggerOpt(ttlog.NewCustomLogger(&outputBuf, "test", 0))) require.NoError(t, err) require.NotNil(t, clusterInstance) - require.NoError(t, clusterInstance.Start()) + require.NoError(t, clusterInstance.Start(context.Background())) t.Cleanup(func() { require.NoError(t, clusterInstance.Stop(stopTimeout)) }) @@ -121,15 +119,13 @@ func TestClusterInstance_StartChangeDefaults(t *testing.T) { ConsoleSocket: "run/tt.control", AppDir: tmpAppDir, BinaryPort: "localhost:3013", - }, ttlog.NewCustomLogger(&outputBuf, "test", 0), integrity.IntegrityCtx{ - Repository: &mockRepository{}, - }, false) + }, StdLoggerOpt(ttlog.NewCustomLogger(&outputBuf, "test", 0))) require.NoError(t, err) require.NotNil(t, clusterInstance) require.NoError(t, os.Mkdir(filepath.Join(tmpAppDir, "run"), 0755)) - require.NoError(t, clusterInstance.Start()) + require.NoError(t, clusterInstance.Start(context.Background())) t.Cleanup(func() { require.NoError(t, clusterInstance.Stop(stopTimeout)) }) @@ -171,15 +167,13 @@ func TestClusterInstance_StartChangeSomeDefaults(t *testing.T) { AppDir: tmpAppDir, LogDir: tmpAppDir, BinaryPort: "localhost:3013", - }, ttlog.NewCustomLogger(&outputBuf, "test", 0), integrity.IntegrityCtx{ - Repository: &mockRepository{}, - }, false) + }, StdLoggerOpt(ttlog.NewCustomLogger(&outputBuf, "test", 0))) require.NoError(t, err) require.NotNil(t, clusterInstance) require.NoError(t, os.Mkdir(filepath.Join(tmpAppDir, "run"), 0755)) - require.NoError(t, clusterInstance.Start()) + require.NoError(t, clusterInstance.Start(context.Background())) t.Cleanup(func() { require.NoError(t, clusterInstance.Stop(stopTimeout)) }) @@ -200,3 +194,37 @@ func TestClusterInstance_StartChangeSomeDefaults(t *testing.T) { assert.DirExists(t, filepath.Join(tmpAppDir, "vinyl_dir")) assert.NoDirExists(t, filepath.Join(tmpAppDir, "instance-002")) } + +func TestClusterInstance_StopByContext(t *testing.T) { + SkipForTntMajorBefore3(t) + + configPath, err := filepath.Abs(filepath.Join("testdata", "instances_enabled", + "cluster_app", "config.yml")) + require.NoError(t, err) + + tmpDir := t.TempDir() + cancelChdir, err := util.Chdir(tmpDir) + require.NoError(t, err) + defer cancelChdir() + + outputBuf := bytes.Buffer{} + outputBuf.Grow(1024) + clusterInstance, err := newClusterInstance(tntCli, InstanceCtx{ + ClusterConfigPath: configPath, + InstName: "instance-001", + AppDir: tmpDir, + BinaryPort: "localhost:3013", + }, StdLoggerOpt(ttlog.NewCustomLogger(&outputBuf, "test", 0))) + + require.NoError(t, err) + require.NotNil(t, clusterInstance) + ctx, cancel := context.WithCancel(context.Background()) + require.NoError(t, clusterInstance.Start(ctx)) + t.Cleanup(func() { + require.NoError(t, clusterInstance.Stop(stopTimeout)) + }) + require.NoError(t, waitForMsgInBuffer(&outputBuf, "entering the event loop", 10*time.Second)) + cancel() + assert.Error(t, clusterInstance.Wait(), context.Canceled) + assert.True(t, clusterInstance.ProcessState().Success()) +} diff --git a/cli/running/instance.go b/cli/running/instance.go index a51d56d84..f1c99805c 100644 --- a/cli/running/instance.go +++ b/cli/running/instance.go @@ -1,6 +1,7 @@ package running import ( + "context" "os" "time" ) @@ -8,7 +9,7 @@ import ( // Instance describes a running tarantool instance. type Instance interface { // Start starts the Instance with the specified parameters. - Start() error + Start(context.Context) error // Run runs tarantool interpreter. Run(opts RunOpts) error @@ -30,4 +31,10 @@ type Instance interface { // StopWithSignal terminates the process with specific signal. StopWithSignal(waitTimeout time.Duration, usedSignal os.Signal) error + + // GetPid returns instance process PID. + GetPid() int + + // ProcessState returns completed process state. + ProcessState() *os.ProcessState } diff --git a/cli/running/process_controller.go b/cli/running/process_controller.go index 016e1f41b..e0af36460 100644 --- a/cli/running/process_controller.go +++ b/cli/running/process_controller.go @@ -11,7 +11,7 @@ import ( // newProcessController create new process controller. func newProcessController(cmd *exec.Cmd) (*processController, error) { - dpc := processController{cmd: cmd} + dpc := processController{Cmd: cmd} if err := dpc.start(); err != nil { return nil, err } @@ -21,7 +21,7 @@ func newProcessController(cmd *exec.Cmd) (*processController, error) { // processController represents a command being run. type processController struct { // Cmd represents an external command to run. - cmd *exec.Cmd + *exec.Cmd // waitMutex is used to prevent several invokes of the "Wait" // for the same process. // https://github.com/golang/go/issues/28461 @@ -33,7 +33,7 @@ type processController struct { // start starts the process. func (pc *processController) start() error { // Start an Instance. - if err := pc.cmd.Start(); err != nil { + if err := pc.Cmd.Start(); err != nil { return err } pc.done = false @@ -50,7 +50,7 @@ func (pc *processController) Wait() error { // https://github.com/golang/go/issues/28461 pc.waitMutex.Lock() defer pc.waitMutex.Unlock() - err := pc.cmd.Wait() + err := pc.Cmd.Wait() if err == nil { pc.done = true } @@ -59,10 +59,10 @@ func (pc *processController) Wait() error { // SendSignal sends a signal to tarantool instance. func (pc *processController) SendSignal(sig os.Signal) error { - if pc.cmd == nil || pc.cmd.Process == nil { + if pc.Cmd == nil || pc.Cmd.Process == nil { return fmt.Errorf("the instance hasn't started yet") } - return pc.cmd.Process.Signal(sig) + return pc.Cmd.Process.Signal(sig) } // IsAlive verifies that the Instance is alive by sending a "0" signal. @@ -103,7 +103,7 @@ func (pc *processController) StopWithSignal(waitTimeout time.Duration, stopSigna select { case <-time.After(waitTimeout): // Send "SIGKILL" signal - if err := pc.cmd.Process.Kill(); err != nil { + if err := pc.Cmd.Process.Kill(); err != nil { return fmt.Errorf("failed to send SIGKILL to instance: %s", err) } else { // Wait for the process to terminate. @@ -114,3 +114,13 @@ func (pc *processController) StopWithSignal(waitTimeout time.Duration, stopSigna return err } } + +// GetPid returns process PID. +func (pc *processController) GetPid() int { + return pc.Process.Pid +} + +// ProcessState returns completed process state. +func (pc *processController) ProcessState() *os.ProcessState { + return pc.Cmd.ProcessState +} diff --git a/cli/running/running.go b/cli/running/running.go index 7762283a3..5a25f68be 100644 --- a/cli/running/running.go +++ b/cli/running/running.go @@ -2,6 +2,7 @@ package running import ( "bytes" + "context" "errors" "fmt" "io" @@ -158,24 +159,33 @@ func (provider *providerImpl) updateCtx() error { return nil } +// createInstance creates an Instance. +func createInstance(cmdCtx cmdcontext.CmdCtx, instanceCtx InstanceCtx, + opts ...InstanceOption) (inst Instance, err error) { + if instanceCtx.ClusterConfigPath != "" { + return newClusterInstance(cmdCtx.Cli.TarantoolCli, instanceCtx, opts...) + } + return newScriptInstance(cmdCtx.Cli.TarantoolCli.Executable, instanceCtx, opts...) +} + // createInstance reads config and creates an Instance. func (provider *providerImpl) CreateInstance(logger ttlog.Logger) (inst Instance, err error) { if err = provider.updateCtx(); err != nil { return } - integrityChecks := provider.cmdCtx.Cli.IntegrityCheck != "" + opts := []InstanceOption{StdLoggerOpt(logger)} + if provider.cmdCtx.Cli.IntegrityCheck != "" { + opts = append(opts, IntegrityOpt(provider.cmdCtx.Integrity)) + } if provider.instanceCtx.ClusterConfigPath != "" { logger.Printf("(INFO): using %q cluster config for instance %q", provider.instanceCtx.ClusterConfigPath, provider.instanceCtx.InstName, ) - return newClusterInstance(provider.cmdCtx.Cli.TarantoolCli, *provider.instanceCtx, - logger, provider.cmdCtx.Integrity, integrityChecks) } - return newScriptInstance(provider.cmdCtx.Cli.TarantoolCli.Executable, *provider.instanceCtx, - logger, provider.cmdCtx.Integrity, integrityChecks) + return createInstance(*provider.cmdCtx, *provider.instanceCtx, opts...) } // isLoggerChanged checks if any of the logging parameters has been changed. @@ -703,6 +713,45 @@ func FillCtx(cliOpts *config.CliOpts, cmdCtx *cmdcontext.CmdCtx, return nil } +// RunInstance runs tarantool instance and waits for completion. +func RunInstance(ctx context.Context, cmdCtx *cmdcontext.CmdCtx, inst InstanceCtx, + stdOut, stdErr io.Writer) error { + for _, dataDir := range [...]string{inst.WalDir, inst.VinylDir, inst.MemtxDir, inst.RunDir} { + if err := util.CreateDirectory(dataDir, defaultDirPerms); err != nil { + return err + } + } + + logger := ttlog.NewCustomLogger(stdOut, "", 0) + opts := []InstanceOption{ + StdLoggerOpt(logger), + StdOutOpt(stdOut), + StdErrOpt(stdErr), + } + if cmdCtx.Cli.IntegrityCheck != "" { + opts = append(opts, IntegrityOpt(cmdCtx.Integrity)) + } + instance, err := createInstance(*cmdCtx, inst, opts...) + if err != nil { + return fmt.Errorf("failed to create the instance %q: %s", inst.InstName, err) + } + logger.Println("(INFO) Start") + if err = instance.Start(ctx); err != nil { + return fmt.Errorf("failed to start the instance %q: %s", inst.InstName, err) + } + + defer func() { + cleanup(&inst) + }() + + if err := process_utils.CreatePIDFile(inst.PIDFile, instance.GetPid()); err != nil { + instance.Stop(10 * time.Second) + return fmt.Errorf("cannot create the pid file %q: %s", inst.PIDFile, err) + } + + return instance.Wait() +} + // Start an Instance. func Start(cmdCtx *cmdcontext.CmdCtx, inst *InstanceCtx, integrityCheckPeriod time.Duration) error { if err := createInstanceDataDirectories(*inst); err != nil { @@ -716,7 +765,7 @@ func Start(cmdCtx *cmdcontext.CmdCtx, inst *InstanceCtx, integrityCheckPeriod ti provider := providerImpl{cmdCtx: cmdCtx, instanceCtx: inst} preStartAction := func() error { - if err := process_utils.CreatePIDFile(inst.PIDFile); err != nil { + if err := process_utils.CreatePIDFile(inst.PIDFile, os.Getpid()); err != nil { return err } return nil @@ -793,8 +842,9 @@ func Quit(run InstanceCtx) error { } func Run(runInfo *RunInfo) error { - inst := scriptInstance{tarantoolPath: runInfo.CmdCtx.Cli.TarantoolCli.Executable, - integrityCtx: runInfo.CmdCtx.Integrity} + inst := scriptInstance{baseInstance: baseInstance{ + tarantoolPath: runInfo.CmdCtx.Cli.TarantoolCli.Executable, + integrityCtx: runInfo.CmdCtx.Integrity}} err := inst.Run(runInfo.RunOpts) return err } diff --git a/cli/running/script_instance.go b/cli/running/script_instance.go index 33e6a380c..31bcda6fe 100644 --- a/cli/running/script_instance.go +++ b/cli/running/script_instance.go @@ -1,18 +1,15 @@ package running import ( + "context" _ "embed" "fmt" "os" "os/exec" "path/filepath" "runtime" - "strings" - "syscall" "time" - "github.com/apex/log" - "github.com/tarantool/tt/cli/ttlog" "github.com/tarantool/tt/cli/util" "github.com/tarantool/tt/lib/integrity" ) @@ -24,46 +21,15 @@ const ( // scriptInstance represents a tarantool invoked with an instance script provided. type scriptInstance struct { - // processController is a child process controller. - processController *processController - // logger represents an active logging object. - logger ttlog.Logger - // tarantoolPath describes the path to the tarantool binary - // that will be used to launch the Instance. - tarantoolPath string - // appPath describes the path to the "init" file of an application. - appPath string - // appDir is an application directory. - appDir string - // appName describes the application name (the name of the directory - // where the application files are present). - appName string - // instName describes the instance name. - instName string - // walDir is a directory where write-ahead log (.xlog) files are stored. - walDir string - // memtxDir is a directory where memtx stores snapshot (.snap) files. - memtxDir string `mapstructure:"memtx_dir" yaml:"memtx_dir"` - // vinylDir is a directory where vinyl files or subdirectories will be stored. - vinylDir string `mapstructure:"vinyl_dir" yaml:"vinyl_dir"` - // consoleSocket is a Unix domain socket to be used as "admin port". - consoleSocket string - // binaryPort is a Unix socket to be used as "binary port" - binaryPort string - // logDir is log files location. - logDir string - // IntegrityCtx contains information necessary to perform integrity checks. - integrityCtx integrity.IntegrityCtx - // integrityChecks tells whether integrity checks are turned on. - integrityChecks bool + baseInstance } //go:embed lua/launcher.lua var instanceLauncher []byte // newScriptInstance creates an Instance. -func newScriptInstance(tarantoolPath string, instanceCtx InstanceCtx, logger ttlog.Logger, - integrityCtx integrity.IntegrityCtx, integrityChecks bool) (*scriptInstance, error) { +func newScriptInstance(tarantoolPath string, instanceCtx InstanceCtx, opts ...InstanceOption) ( + *scriptInstance, error) { // Check if tarantool binary exists. if _, err := exec.LookPath(tarantoolPath); err != nil { return nil, err @@ -75,20 +41,7 @@ func newScriptInstance(tarantoolPath string, instanceCtx InstanceCtx, logger ttl } return &scriptInstance{ - tarantoolPath: tarantoolPath, - appPath: instanceCtx.InstanceScript, - appName: instanceCtx.AppName, - appDir: instanceCtx.AppDir, - instName: instanceCtx.InstName, - consoleSocket: instanceCtx.ConsoleSocket, - logger: logger, - walDir: instanceCtx.WalDir, - vinylDir: instanceCtx.VinylDir, - memtxDir: instanceCtx.MemtxDir, - logDir: instanceCtx.LogDir, - integrityCtx: integrityCtx, - integrityChecks: integrityChecks, - binaryPort: instanceCtx.BinaryPort, + baseInstance: newBaseInstance(tarantoolPath, instanceCtx, opts...), }, nil } @@ -137,12 +90,14 @@ func (inst *scriptInstance) setTarantoolLog(cmd *exec.Cmd) { } // Start starts the Instance with the specified parameters. -func (inst *scriptInstance) Start() error { - f, err := inst.integrityCtx.Repository.Read(inst.tarantoolPath) - if err != nil { - return err +func (inst *scriptInstance) Start(ctx context.Context) error { + if inst.integrityChecks { + f, err := inst.integrityCtx.Repository.Read(inst.tarantoolPath) + if err != nil { + return err + } + f.Close() } - f.Close() cmdArgs := []string{} @@ -152,9 +107,13 @@ func (inst *scriptInstance) Start() error { cmdArgs = append(cmdArgs, "-") - cmd := exec.Command(inst.tarantoolPath, cmdArgs...) - cmd.Stdout = inst.logger.Writer() - cmd.Stderr = inst.logger.Writer() + cmd := exec.CommandContext(ctx, inst.tarantoolPath, cmdArgs...) + cmd.Cancel = func() error { + return cmd.Process.Signal(os.Interrupt) + } + cmd.WaitDelay = 30 * time.Second + cmd.Stdout = inst.stdOut + cmd.Stderr = inst.stdErr StdinPipe, err := cmd.StdinPipe() if err != nil { return err @@ -218,64 +177,3 @@ func (inst *scriptInstance) Start() error { return nil } - -// Run runs tarantool instance. -func (inst *scriptInstance) Run(opts RunOpts) error { - f, err := inst.integrityCtx.Repository.Read(inst.tarantoolPath) - if err != nil { - return err - } - f.Close() - newInstanceEnv := os.Environ() - args := []string{inst.tarantoolPath} - args = append(args, opts.RunArgs...) - log.Debugf("Running Tarantool with args: %s", strings.Join(args[1:], " ")) - execErr := syscall.Exec(inst.tarantoolPath, args, newInstanceEnv) - if execErr != nil { - return execErr - } - return nil -} - -// Wait waits for the process completion. -func (inst *scriptInstance) Wait() error { - if inst.processController == nil { - return fmt.Errorf("instance is not started") - } - return inst.processController.Wait() -} - -// SendSignal sends a signal to tarantool instance. -func (inst *scriptInstance) SendSignal(sig os.Signal) error { - if inst.processController == nil { - return fmt.Errorf("instance is not started") - } - return inst.processController.SendSignal(sig) -} - -// IsAlive verifies that the instance is alive by sending a "0" signal. -func (inst *scriptInstance) IsAlive() bool { - if inst.processController == nil { - return false - } - return inst.processController.IsAlive() -} - -// Stop terminates the process. -// -// timeout - the time that was provided to the process -// to terminate correctly before the "SIGKILL" signal is used. -func (inst *scriptInstance) Stop(waitTimeout time.Duration) error { - if inst.processController == nil { - return nil - } - return inst.processController.Stop(waitTimeout) -} - -// StopWithSignal terminates the process with specific signal. -func (inst *scriptInstance) StopWithSignal(waitTimeout time.Duration, usedSignal os.Signal) error { - if inst.processController == nil { - return nil - } - return inst.processController.StopWithSignal(waitTimeout, usedSignal) -} diff --git a/cli/running/script_instance_test.go b/cli/running/script_instance_test.go index 12882b5b5..b88acf035 100644 --- a/cli/running/script_instance_test.go +++ b/cli/running/script_instance_test.go @@ -2,6 +2,7 @@ package running import ( "bytes" + "context" "io" "net" "os" @@ -14,7 +15,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tarantool/tt/cli/ttlog" - "github.com/tarantool/tt/lib/integrity" ) const ( @@ -22,8 +22,8 @@ const ( ) // startTestInstance starts instance for the test. -func startTestInstance(t *testing.T, app string, consoleSock string, binaryPort string, - logger ttlog.Logger) *scriptInstance { +func startTestInstance(t *testing.T, ctx context.Context, app string, consoleSock string, + binaryPort string, logger ttlog.Logger) *scriptInstance { assert := assert.New(t) // Need absolute path to the script, because working dir is changed on start. @@ -45,16 +45,13 @@ func startTestInstance(t *testing.T, app string, consoleSock string, binaryPort VinylDir: instTestDataDir, MemtxDir: instTestDataDir, BinaryPort: binaryPort, - }, - logger, integrity.IntegrityCtx{ - Repository: &mockRepository{}, - }, false) + }, StdLoggerOpt(logger)) assert.Nilf(err, `Can't create an instance. Error: "%v".`, err) require.NoErrorf(t, err, `Can't get the path to the executable. Error: "%v".`, err) os.Setenv("started_flag_file", filepath.Join(binDir, app)) defer os.Remove(os.Getenv("started_flag_file")) - err = inst.Start() + err = inst.Start(ctx) assert.Nilf(err, `Can't start the instance. Error: "%v".`, err) require.NotZero(t, waitForFile(os.Getenv("started_flag_file")), "Instance is not started") @@ -85,7 +82,8 @@ func TestInstanceBase(t *testing.T) { binaryPort := filepath.Join(filepath.Dir(binPath), "testbin.sock") logger := ttlog.NewCustomLogger(io.Discard, "", 0) - inst := startTestInstance(t, "dumb_test_app", consoleSock, binaryPort, logger) + inst := startTestInstance(t, context.Background(), "dumb_test_app", consoleSock, + binaryPort, logger) t.Cleanup(func() { cleanupTestInstance(t, inst) }) conn, err := net.Dial("unix", consoleSock) @@ -101,7 +99,8 @@ func TestInstanceLogger(t *testing.T) { defer reader.Close() logger := ttlog.NewCustomLogger(writer, "", 0) consoleSock := "" - inst := startTestInstance(t, "log_check_test_app", consoleSock, "", logger) + inst := startTestInstance(t, context.Background(), "log_check_test_app", consoleSock, "", + logger) t.Cleanup(func() { cleanupTestInstance(t, inst) }) msg := "Check Log.\n" @@ -214,7 +213,6 @@ func TestInstanceLogs(t *testing.T) { tarantoolBin, err := exec.LookPath("tarantool") require.NoError(t, err) - logger := ttlog.NewCustomLogger(os.Stdout, "", 0) instTestDataDir := t.TempDir() binDir := filepath.Dir(binPath) @@ -227,10 +225,7 @@ func TestInstanceLogs(t *testing.T) { MemtxDir: instTestDataDir, LogDir: instTestDataDir, BinaryPort: binaryPort, - }, - logger, integrity.IntegrityCtx{ - Repository: &mockRepository{}, - }, false) + }) require.NoError(t, err) t.Cleanup(func() { cleanupTestInstance(t, inst) }) @@ -238,7 +233,7 @@ func TestInstanceLogs(t *testing.T) { require.NoErrorf(t, err, `Can't get the path to the executable. Error: "%v".`, err) os.Setenv("started_flag_file", filepath.Join(binDir, app)) defer os.Remove(os.Getenv("started_flag_file")) - err = inst.Start() + err = inst.Start(context.Background()) require.NoError(t, err) require.NotZero(t, waitForFile(os.Getenv("started_flag_file")), "Instance is not started") @@ -248,3 +243,19 @@ func TestInstanceLogs(t *testing.T) { assert.FileExists(t, filepath.Join(filepath.Dir(binPath), "test.sock")) assert.FileExists(t, filepath.Join(filepath.Dir(binPath), "testbin.sock")) } + +func TestInstanceStopByContext(t *testing.T) { + tmpdir := t.TempDir() + + consoleSock := filepath.Join(tmpdir, "test.sock") + binaryPort := filepath.Join(tmpdir, "testbin.sock") + + logger := ttlog.NewCustomLogger(io.Discard, "", 0) + ctx, cancel := context.WithCancel(context.Background()) + inst := startTestInstance(t, ctx, "dumb_test_app", consoleSock, binaryPort, logger) + t.Cleanup(func() { cleanupTestInstance(t, inst) }) + + cancel() + assert.Error(t, inst.Wait(), context.Canceled) + assert.True(t, inst.ProcessState().Success()) +} diff --git a/cli/running/watchdog.go b/cli/running/watchdog.go index 0ce1d5c57..3311edd4d 100644 --- a/cli/running/watchdog.go +++ b/cli/running/watchdog.go @@ -112,7 +112,7 @@ func (wd *Watchdog) Start() error { return nil } // Start the Instance. - if err := wd.instance.Start(); err != nil { + if err := wd.instance.Start(context.Background()); err != nil { wd.logger.Printf(`(ERROR): instance start failed: %v.`, err) wd.stopMutex.Unlock() break diff --git a/cli/running/watchdog_test.go b/cli/running/watchdog_test.go index e846bc713..598066625 100644 --- a/cli/running/watchdog_test.go +++ b/cli/running/watchdog_test.go @@ -43,9 +43,7 @@ func (provider *providerTestImpl) CreateInstance(logger ttlog.Logger) (Instance, InstanceScript: provider.appPath, AppDir: provider.t.TempDir(), }, - logger, integrity.IntegrityCtx{ - Repository: &mockRepository{}, - }, false) + StdLoggerOpt(logger)) } // UpdateLogger updates the logger settings or creates a new logger, if passed nil. diff --git a/cli/tail/color.go b/cli/tail/color.go index 6a0ada096..424cfdcab 100644 --- a/cli/tail/color.go +++ b/cli/tail/color.go @@ -8,11 +8,12 @@ type ColorPicker func() color.Color // DefaultColorPicker create a color picker to get a color from a default colors set. func DefaultColorPicker() ColorPicker { var colorTable = []color.Color{ + *color.New(color.FgHiBlue), + *color.New(color.FgHiCyan), + *color.New(color.FgHiMagenta), + *color.New(color.FgBlue), *color.New(color.FgCyan), - *color.New(color.FgGreen), *color.New(color.FgMagenta), - *color.New(color.FgYellow), - *color.New(color.FgBlue), } i := 0 diff --git a/test/integration/log/test_log.py b/test/integration/log/test_log.py index 59c5a7b38..06a3c87e8 100644 --- a/test/integration/log/test_log.py +++ b/test/integration/log/test_log.py @@ -4,7 +4,7 @@ import pytest -from utils import config_name +from utils import config_name, wait_for_lines_in_output @pytest.fixture(scope="function") @@ -189,31 +189,6 @@ def test_log_no_inst(tt_cmd, mock_env_dir): assert 'app0:inst4: instance(s) not found' in output -def wait_for_lines_in_output(stdout, expected_lines): - output = '' - retries = 10 - found = 0 - while True: - line = stdout.readline() - if line == '': - if retries == 0: - break - time.sleep(0.2) - retries -= 1 - else: - retries = 10 - output += line - for expected in expected_lines: - if expected in line: - found += 1 - break - - if found == len(expected_lines): - break - - return output - - def test_log_output_default_follow(tt_cmd, mock_env_dir): cmd = [tt_cmd, 'log', '-f'] process = subprocess.Popen( diff --git a/test/integration/running/test_running.py b/test/integration/running/test_running.py index cb3fa568c..8bd643bcf 100644 --- a/test/integration/running/test_running.py +++ b/test/integration/running/test_running.py @@ -1,6 +1,7 @@ import os import re import shutil +import signal import subprocess import tempfile @@ -12,7 +13,8 @@ from utils import (config_name, control_socket, extract_status, initial_snap, initial_xlog, kill_child_process, lib_path, log_file, log_path, pid_file, run_command_and_get_output, run_path, - wait_file, wait_instance_start, wait_instance_stop) + wait_file, wait_for_lines_in_output, wait_instance_start, + wait_instance_stop) def test_running_base_functionality(tt_cmd, tmpdir_with_cfg): @@ -927,3 +929,43 @@ def test_kill_without_app_name(tt_cmd, tmp_path, cmd, input): finally: stop = [tt_cmd, "stop"] run_command_and_get_output(stop, cwd=test_app_path) + + +def test_start_interactive(tt_cmd, tmp_path): + test_app_path_src = os.path.join(os.path.dirname(__file__), "multi_inst_app") + + tmp_path /= "multi_inst_app" + shutil.copytree(test_app_path_src, tmp_path) + + start_cmd = [tt_cmd, "start", "-i"] + instance_process = subprocess.Popen( + start_cmd, + cwd=tmp_path, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + text=True + ) + try: + wait_for_lines_in_output(instance_process.stdout, [ + "multi_inst_app:router custom init file...", + "multi_inst_app:router multi_inst_app:router", + "multi_inst_app:master multi_inst_app:master", + "multi_inst_app:replica multi_inst_app:replica", + "multi_inst_app:stateboard unknown instance", + ]) + + instance_process.send_signal(signal.SIGTERM) + + wait_for_lines_in_output(instance_process.stdout, [ + "multi_inst_app:router stopped", + "multi_inst_app:master stopped", + "multi_inst_app:replica stopped", + "multi_inst_app:stateboard stopped", + ]) + + # Make sure no log dir created. + assert not (tmp_path / "var" / "log").exists() + + finally: + run_command_and_get_output([tt_cmd, "stop"], cwd=tmp_path) + assert instance_process.wait(5) == 0 diff --git a/test/utils.py b/test/utils.py index 03dce38e1..119b74c84 100644 --- a/test/utils.py +++ b/test/utils.py @@ -526,3 +526,27 @@ def wait_string_in_file(file, text): break lines = fp.readlines(100) assert found + + +def wait_for_lines_in_output(stdout, expected_lines: list): + output = '' + retries = 10 + while True: + line = stdout.readline() + if line == '': + if retries == 0: + break + time.sleep(0.2) + retries -= 1 + else: + retries = 10 + output += line + for expected in expected_lines: + if expected in line: + expected_lines.remove(expected) + break + + if len(expected_lines) == 0: + break + + return output