diff --git a/CHANGELOG.md b/CHANGELOG.md index 7c8577674..0003f7647 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Added +- `tt cluster`: worker publish subcommand. +- `tt cluster`: worker show subcommand. + ### Changed ### Fixed diff --git a/cli/cluster/cmd/storage.go b/cli/cluster/cmd/storage.go new file mode 100644 index 000000000..182f8d69c --- /dev/null +++ b/cli/cluster/cmd/storage.go @@ -0,0 +1,198 @@ +package cmd + +import ( + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "io/fs" + "os" + "strings" + "time" + + "github.com/tarantool/go-storage" + etcdDriver "github.com/tarantool/go-storage/driver/etcd" + tcsDriver "github.com/tarantool/go-storage/driver/tcs" + "github.com/tarantool/go-tarantool/v2" + libconnect "github.com/tarantool/tt/lib/connect" + "github.com/tarantool/tt/lib/dial" + "go.etcd.io/etcd/client/pkg/v3/transport" + clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +// ConnectStorage connects to etcd or tarantool config storage and returns +// a storage.Storage instance. It tries etcd first, then tarantool. +func ConnectStorage( + opts libconnect.UriOpts, + username, password string, +) (storage.Storage, func(), error) { + etcdcli, errEtcd := connectEtcd(opts, username, password) + if errEtcd == nil { + drv := etcdDriver.New(etcdcli) + stg := storage.NewStorage(drv) + return stg, func() { etcdcli.Close() }, nil + } + + conn, errTarantool := connectTarantool(opts, username, password) + if errTarantool == nil { + drv := tcsDriver.New(conn) + stg := storage.NewStorage(drv) + return stg, func() { conn.Close() }, nil + } + + return nil, nil, fmt.Errorf("failed to connect to etcd or tarantool: %w, %w", + errTarantool, errEtcd) +} + +// connectEtcd creates an etcd client from URI options. +func connectEtcd( + opts libconnect.UriOpts, + username, password string, +) (*clientv3.Client, error) { + var endpoints []string + if opts.Endpoint != "" { + endpoints = []string{opts.Endpoint} + } + + etcdUsername := username + etcdPassword := password + if etcdUsername == "" { + etcdUsername = os.Getenv(libconnect.EtcdUsernameEnv) + } + if etcdPassword == "" { + etcdPassword = os.Getenv(libconnect.EtcdPasswordEnv) + } + + var tlsConfig *tls.Config + if opts.KeyFile != "" || opts.CertFile != "" || opts.CaFile != "" || + opts.CaPath != "" || opts.SkipHostVerify { + + tlsInfo := transport.TLSInfo{ + CertFile: opts.CertFile, + KeyFile: opts.KeyFile, + TrustedCAFile: opts.CaFile, + } + + var err error + tlsConfig, err = tlsInfo.ClientConfig() + if err != nil { + return nil, fmt.Errorf("failed to create tls client config: %w", err) + } + + if opts.CaPath != "" { + roots, err := loadRootCACerts(opts.CaPath) + if err != nil { + return nil, fmt.Errorf("failed to load CA directory: %w", err) + } + tlsConfig.RootCAs = roots + } + + if opts.SkipHostVerify { + tlsConfig.InsecureSkipVerify = true + } + } + + client, err := clientv3.New(clientv3.Config{ + Endpoints: endpoints, + DialTimeout: opts.Timeout, + Username: etcdUsername, + Password: etcdPassword, + TLS: tlsConfig, + Logger: zap.NewNop(), + DialOptions: []grpc.DialOption{grpc.WithBlock()}, //nolint:staticcheck + }) + if err != nil { + return nil, fmt.Errorf("failed to create etcd client: %w", err) + } + + return client, nil +} + +// connectTarantool creates a tarantool connection from URI options. +func connectTarantool( + opts libconnect.UriOpts, + username, password string, +) (*tarantool.Connection, error) { + tarantoolUsername := username + tarantoolPassword := password + if tarantoolUsername == "" { + tarantoolUsername = os.Getenv(libconnect.TarantoolUsernameEnv) + } + if tarantoolPassword == "" { + tarantoolPassword = os.Getenv(libconnect.TarantoolPasswordEnv) + } + + dialOpts := dial.Opts{ + Address: fmt.Sprintf("tcp://%s", opts.Host), + User: tarantoolUsername, + Password: tarantoolPassword, + SslKeyFile: opts.KeyFile, + SslCertFile: opts.CertFile, + SslCaFile: opts.CaFile, + SslCiphers: opts.Ciphers, + } + + dialer, err := dial.New(dialOpts) + if err != nil { + return nil, fmt.Errorf("failed to create dialer: %w", err) + } + + connectorOpts := tarantool.Opts{ + Timeout: opts.Timeout, + } + + ctx, cancel := contextWithTimeout(connectorOpts.Timeout) + defer cancel() + + conn, err := tarantool.Connect(ctx, dialer, connectorOpts) + if err != nil { + return nil, fmt.Errorf("failed to connect to tarantool: %w", err) + } + + return conn, nil +} + +// contextWithTimeout creates a context with optional timeout. +func contextWithTimeout(timeout time.Duration) (context.Context, context.CancelFunc) { + if timeout > 0 { + return context.WithTimeout(context.Background(), timeout) + } + return context.Background(), func() {} +} + +// loadRootCACerts loads root CA certificates from a directory. +func loadRootCACerts(caPath string) (*x509.CertPool, error) { + roots := x509.NewCertPool() + + files, err := os.ReadDir(caPath) + if err != nil { + return nil, fmt.Errorf("failed to read CA directory: %w", err) + } + + for _, f := range files { + if f.IsDir() || isSameDirSymlink(f, caPath) { + continue + } + + data, err := os.ReadFile(caPath + "/" + f.Name()) + if err != nil { + continue + } + + roots.AppendCertsFromPEM(data) + } + + return roots, nil +} + +// isSameDirSymlink checks if a directory entry is a symlink pointing to the same directory. +func isSameDirSymlink(f fs.DirEntry, dir string) bool { + if f.Type()&fs.ModeSymlink == 0 { + return false + } + + target, err := os.Readlink(dir + "/" + f.Name()) + return err == nil && !strings.Contains(target, "/") +} diff --git a/cli/cluster/cmd/worker.go b/cli/cluster/cmd/worker.go index 79be267e7..b3238343f 100644 --- a/cli/cluster/cmd/worker.go +++ b/cli/cluster/cmd/worker.go @@ -1,34 +1,38 @@ package cmd import ( + "context" "errors" "fmt" "os" "strings" + "github.com/tarantool/go-storage" + "github.com/tarantool/go-storage/operation" + "github.com/tarantool/go-storage/predicate" libconnect "github.com/tarantool/tt/lib/connect" ) // WorkerPublishCtx contains information about cluster worker publish command // execution context. type WorkerPublishCtx struct { - // Username defines a username for connection. - Username string - // Password defines a password for connection. - Password string - // Force defines whether the publish should be forced. - Force bool + // Storage is the storage instance for the operation. + Storage storage.Storage + // Key is the key in storage for the worker configuration. + Key string // Src is a raw data to publish. Src []byte + // Force defines whether the publish should be forced. + Force bool } // WorkerShowCtx contains information about cluster worker show command // execution context. type WorkerShowCtx struct { - // Username defines a username for connection. - Username string - // Password defines a password for connection. - Password string + // Storage is the storage instance for the operation. + Storage storage.Storage + // Key is the key in storage for the worker configuration. + Key string } // WorkerDeleteCtx contains information about cluster worker delete command @@ -109,14 +113,58 @@ func ResolveWorkerCredentials( return username, password } -// WorkerPublish publishes a worker configuration. Unimplemented. -func WorkerPublish(url string, ctx WorkerPublishCtx) error { - return errors.New("unimplemented") +// WorkerPublish publishes a worker configuration to storage. +// Without Force flag, it atomically checks that the key does not exist and +// publishes the configuration. If the key already exists, an error is returned. +// With Force flag, it overwrites the existing configuration unconditionally. +func WorkerPublish(publishCtx WorkerPublishCtx) error { + ctx := context.Background() + key := []byte(publishCtx.Key) + value := publishCtx.Src + + if publishCtx.Force { + _, err := publishCtx.Storage.Tx(ctx).Then(operation.Put(key, value)).Commit() + if err != nil { + return fmt.Errorf("failed to publish worker configuration: %w", err) + } + return nil + } + + resp, err := publishCtx.Storage.Tx(ctx). + If(predicate.VersionEqual(key, 0)). + Then(operation.Put(key, value)). + Commit() + if err != nil { + return fmt.Errorf("failed to publish worker configuration: %w", err) + } + + if !resp.Succeeded { + return fmt.Errorf( + "worker configuration already exists at %q, use --force to overwrite", + publishCtx.Key) + } + + return nil } -// WorkerShow shows a worker configuration. Unimplemented. -func WorkerShow(url string, ctx WorkerShowCtx) error { - return errors.New("unimplemented") +// WorkerShow shows a worker configuration from storage. +// It returns an error if the configuration is not found. +func WorkerShow(showCtx WorkerShowCtx) ([]byte, error) { + ctx := context.Background() + key := []byte(showCtx.Key) + + resp, err := showCtx.Storage.Tx(ctx).Then(operation.Get(key)).Commit() + if err != nil { + return nil, fmt.Errorf("failed to get worker configuration: %w", err) + } + + if len(resp.Results) == 0 || len(resp.Results[0].Values) == 0 { + return nil, fmt.Errorf("worker configuration not found at %q", showCtx.Key) + } + + value := resp.Results[0].Values[0].Value + + return value, nil } // WorkerDelete deletes a worker configuration. Unimplemented. diff --git a/cli/cluster/cmd/worker_test.go b/cli/cluster/cmd/worker_test.go index 17bd74372..20a4f9d92 100644 --- a/cli/cluster/cmd/worker_test.go +++ b/cli/cluster/cmd/worker_test.go @@ -1,9 +1,17 @@ package cmd import ( + "context" + "errors" "testing" "github.com/stretchr/testify/require" + "github.com/tarantool/go-storage" + "github.com/tarantool/go-storage/kv" + "github.com/tarantool/go-storage/operation" + "github.com/tarantool/go-storage/predicate" + "github.com/tarantool/go-storage/tx" + "github.com/tarantool/go-storage/watch" libconnect "github.com/tarantool/tt/lib/connect" ) @@ -239,14 +247,132 @@ func TestResolveWorkerCredentials(t *testing.T) { } } -func TestWorkerPublish(t *testing.T) { - err := WorkerPublish("http://localhost:2379/prefix/host/worker", WorkerPublishCtx{}) - require.EqualError(t, err, "unimplemented") +type mockTx struct { + results []tx.RequestResponse + succeeded bool + err error + operations []operation.Operation +} + +func (m *mockTx) If(predicates ...predicate.Predicate) tx.Tx { + return m +} + +func (m *mockTx) Then(operations ...operation.Operation) tx.Tx { + m.operations = operations + return m +} + +func (m *mockTx) Else(operations ...operation.Operation) tx.Tx { + return m +} + +func (m *mockTx) Commit() (tx.Response, error) { + if m.err != nil { + return tx.Response{}, m.err + } + return tx.Response{ + Succeeded: m.succeeded, + Results: m.results, + }, nil +} + +type mockStorage struct { + tx *mockTx +} + +func (m *mockStorage) Watch( + ctx context.Context, + key []byte, + opts ...watch.Option, +) <-chan watch.Event { + return nil +} + +func (m *mockStorage) Tx(ctx context.Context) tx.Tx { + return m.tx +} + +func (m *mockStorage) Range( + ctx context.Context, + opts ...storage.RangeOption, +) ([]kv.KeyValue, error) { + return nil, nil } func TestWorkerShow(t *testing.T) { - err := WorkerShow("http://localhost:2379/prefix/host/worker", WorkerShowCtx{}) - require.EqualError(t, err, "unimplemented") + workerCfg := `type: nontarantool +instrumentation: + url: host1:8080 + metrics_url: /metrics + metrics_format: prometheus +config: + addr: host1:9080 +` + + cases := []struct { + name string + key string + value []byte + txErr error + expectedErr string + }{ + { + name: "key found", + key: "/prefix/instances/host1/worker1", + value: []byte(workerCfg), + }, + { + name: "key not found", + key: "/prefix/instances/host1/worker1", + value: nil, + expectedErr: "worker configuration not found", + }, + { + name: "storage error", + key: "/prefix/instances/host1/worker1", + txErr: errors.New("connection refused"), + expectedErr: "failed to get worker configuration", + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + var results []tx.RequestResponse + if tc.value != nil { + results = []tx.RequestResponse{ + { + Values: []kv.KeyValue{ + {Key: []byte(tc.key), Value: tc.value}, + }, + }, + } + } + + mockStg := &mockStorage{ + tx: &mockTx{ + results: results, + err: tc.txErr, + }, + } + + showCtx := WorkerShowCtx{ + Storage: mockStg, + Key: tc.key, + } + + output, err := WorkerShow(showCtx) + + if tc.expectedErr != "" { + require.ErrorContains(t, err, tc.expectedErr) + return + } + + require.NoError(t, err) + require.Contains(t, string(output), "type: nontarantool") + require.Contains(t, string(output), "host1:8080") + }) + } } func TestWorkerDelete(t *testing.T) { diff --git a/cli/cmd/cluster.go b/cli/cmd/cluster.go index b288d0481..16ed6d29b 100644 --- a/cli/cmd/cluster.go +++ b/cli/cmd/cluster.go @@ -66,11 +66,21 @@ var switchStatusCtx = clustercmd.SwitchStatusCtx{ var rolesChangeCtx = clustercmd.RolesChangeCtx{} -var workerPublishCtx clustercmd.WorkerPublishCtx +var workerPublishCtx = clustercmd.WorkerPublishCtx{} -var workerShowCtx clustercmd.WorkerShowCtx +var ( + workerPublishUsername string + workerPublishPassword string +) + +var workerShowCtx = clustercmd.WorkerShowCtx{} + +var ( + workerShowUsername string + workerShowPassword string +) -var workerDeleteCtx clustercmd.WorkerDeleteCtx +var workerDeleteCtx = clustercmd.WorkerDeleteCtx{} var ( defaultSwitchTimeout uint64 = 30 @@ -295,9 +305,9 @@ func newClusterWorkerCmd() *cobra.Command { Run: RunModuleFunc(internalClusterWorkerPublishModule), Args: cobra.ExactArgs(2), } - publishCmd.Flags().StringVarP(&workerPublishCtx.Username, "username", "u", "", + publishCmd.Flags().StringVarP(&workerPublishUsername, "username", "u", "", "username (used as etcd/tarantool config storage credentials)") - publishCmd.Flags().StringVarP(&workerPublishCtx.Password, "password", "p", "", + publishCmd.Flags().StringVarP(&workerPublishPassword, "password", "p", "", "password (used as etcd/tarantool config storage credentials)") publishCmd.Flags().BoolVar(&workerPublishCtx.Force, "force", false, "force publish and skip checking existence") @@ -312,9 +322,9 @@ func newClusterWorkerCmd() *cobra.Command { Run: RunModuleFunc(internalClusterWorkerShowModule), Args: cobra.ExactArgs(1), } - showCmd.Flags().StringVarP(&workerShowCtx.Username, "username", "u", "", + showCmd.Flags().StringVarP(&workerShowUsername, "username", "u", "", "username (used as etcd/tarantool config storage credentials)") - showCmd.Flags().StringVarP(&workerShowCtx.Password, "password", "p", "", + showCmd.Flags().StringVarP(&workerShowPassword, "password", "p", "", "password (used as etcd/tarantool config storage credentials)") deleteCmd := &cobra.Command{ @@ -606,10 +616,23 @@ func internalClusterWorkerPublishModule(cmdCtx *cmdcontext.CmdCtx, args []string } workerPublishCtx.Src = data - workerPublishCtx.Username, workerPublishCtx.Password = clustercmd.ResolveWorkerCredentials( - opts, workerPublishCtx.Username, workerPublishCtx.Password) + prefix, hostName, workerName, err := clustercmd.ParseWorkerPath(opts.Prefix) + if err != nil { + return fmt.Errorf("failed to parse URL path: %w", err) + } + workerPublishCtx.Key = clustercmd.BuildWorkerStorageKey(prefix, hostName, workerName) + + username, password := clustercmd.ResolveWorkerCredentials( + opts, workerPublishUsername, workerPublishPassword) + + stg, closeFunc, err := clustercmd.ConnectStorage(opts, username, password) + if err != nil { + return fmt.Errorf("failed to connect to storage: %w", err) + } + defer closeFunc() + workerPublishCtx.Storage = stg - if err := clustercmd.WorkerPublish(args[0], workerPublishCtx); err != nil { + if err := clustercmd.WorkerPublish(workerPublishCtx); err != nil { return fmt.Errorf("failed to publish worker configuration: %w", err) } return nil @@ -622,12 +645,28 @@ func internalClusterWorkerShowModule(cmdCtx *cmdcontext.CmdCtx, args []string) e return fmt.Errorf("invalid URL %q: %w", args[0], err) } - workerShowCtx.Username, workerShowCtx.Password = clustercmd.ResolveWorkerCredentials( - opts, workerShowCtx.Username, workerShowCtx.Password) + prefix, hostName, workerName, err := clustercmd.ParseWorkerPath(opts.Prefix) + if err != nil { + return fmt.Errorf("failed to parse URL path: %w", err) + } + workerShowCtx.Key = clustercmd.BuildWorkerStorageKey(prefix, hostName, workerName) + + username, password := clustercmd.ResolveWorkerCredentials( + opts, workerShowUsername, workerShowPassword) - if err := clustercmd.WorkerShow(args[0], workerShowCtx); err != nil { + stg, closeFunc, err := clustercmd.ConnectStorage(opts, username, password) + if err != nil { + return fmt.Errorf("failed to connect to storage: %w", err) + } + defer closeFunc() + workerShowCtx.Storage = stg + + output, err := clustercmd.WorkerShow(workerShowCtx) + if err != nil { return fmt.Errorf("failed to show worker configuration: %w", err) } + + fmt.Print(string(output)) return nil } diff --git a/go.mod b/go.mod index 082852d28..1134a0908 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,7 @@ require ( github.com/stretchr/testify v1.11.1 github.com/tarantool/cartridge-cli v0.0.0-00010101000000-000000000000 github.com/tarantool/go-prompt v1.0.1 + github.com/tarantool/go-storage v1.2.0 github.com/tarantool/go-tarantool v1.12.3 github.com/tarantool/go-tarantool/v2 v2.4.2 github.com/tarantool/tt/lib/cluster v0.0.0 @@ -40,6 +41,7 @@ require ( go.etcd.io/etcd/client/pkg/v3 v3.6.8 go.etcd.io/etcd/client/v3 v3.6.8 go.etcd.io/etcd/tests/v3 v3.6.8 + go.uber.org/zap v1.27.1 golang.org/x/crypto v0.49.0 golang.org/x/exp v0.0.0-20260218203240-3dfff04db8fa golang.org/x/sys v0.42.0 @@ -127,9 +129,8 @@ require ( github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 // indirect github.com/spf13/pflag v1.0.10 // indirect github.com/tarantool/go-iproto v1.1.0 // indirect - github.com/tarantool/go-openssl v1.2.1 // indirect + github.com/tarantool/go-openssl v1.2.2 // indirect github.com/tarantool/go-option v1.0.0 // indirect - github.com/tarantool/go-storage v1.1.2 // indirect github.com/tarantool/go-tlsdialer v1.0.2 // indirect github.com/tklauser/go-sysconf v0.3.4 // indirect github.com/tklauser/numcpus v0.2.1 // indirect @@ -155,7 +156,6 @@ require ( go.opentelemetry.io/otel/trace v1.42.0 // indirect go.opentelemetry.io/proto/otlp v1.9.0 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.27.1 // indirect go.yaml.in/yaml/v2 v2.4.3 // indirect golang.org/x/net v0.52.0 // indirect golang.org/x/sync v0.20.0 // indirect diff --git a/go.sum b/go.sum index 19ee9bd60..54917bbaf 100644 --- a/go.sum +++ b/go.sum @@ -114,6 +114,8 @@ github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiU github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/gojuno/minimock/v3 v3.4.7 h1:vhE5zpniyPDRT0DXd5s3DbtZJVlcbmC5k80izYtj9lY= +github.com/gojuno/minimock/v3 v3.4.7/go.mod h1:QxJk4mdPrVyYUmEZGc2yD2NONpqM/j4dWhsy9twjFHg= github.com/golang-jwt/jwt/v5 v5.3.0 h1:pv4AsKCKKZuqlgs5sUmn4x8UlGa0kEVt/puTpKx9vvo= github.com/golang-jwt/jwt/v5 v5.3.0/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= @@ -313,16 +315,16 @@ github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD github.com/tarantool/go-iproto v1.1.0 h1:HULVOIHsiehI+FnHfM7wMDntuzUddO09DKqu2WnFQ5A= github.com/tarantool/go-iproto v1.1.0/go.mod h1:LNCtdyZxojUed8SbOiYHoc3v9NvaZTB7p96hUySMlIo= github.com/tarantool/go-openssl v0.0.8-0.20230307065445-720eeb389195/go.mod h1:M7H4xYSbzqpW/ZRBMyH0eyqQBsnhAMfsYk5mv0yid7A= -github.com/tarantool/go-openssl v1.2.1 h1:WUVTeEPuBAXbrBjvJZ3ynzk/Sv4DL47V/ehWea9czjA= -github.com/tarantool/go-openssl v1.2.1/go.mod h1:EwX1pKIGypLxkY49vKIIR4LTT+94DhKiunCqU2gEzLQ= +github.com/tarantool/go-openssl v1.2.2 h1:GaGMNsa68ZqoNgrMF7Ke4s9+tXGfc5ulfCUm3/Jb7/k= +github.com/tarantool/go-openssl v1.2.2/go.mod h1:EwX1pKIGypLxkY49vKIIR4LTT+94DhKiunCqU2gEzLQ= github.com/tarantool/go-option v1.0.0 h1:+Etw0i3TjsXvADTo5rfZNCfsXe3BfHOs+iVfIrl0Nlo= github.com/tarantool/go-option v1.0.0/go.mod h1:lXzzeZtL+rPUtLOCDP6ny3FemFBjruG9aHKzNN2bS08= github.com/tarantool/go-prompt v0.2.6-tarantool h1:/dYMRBuM5nE3mleka/mqJWPf8SrJ151U+OqDlTzvES0= github.com/tarantool/go-prompt v0.2.6-tarantool/go.mod h1:8enZKIgoGFEQu2XPBK79TguJG2XF3SR4QU2iYI28NSo= github.com/tarantool/go-prompt v1.0.1 h1:88Yer6gCFylqGRrdWwikNFVbklRQsqKF7mycvGdDcj0= github.com/tarantool/go-prompt v1.0.1/go.mod h1:9Vuvi60Bk+3yaXqgYaXNTpLbwPPaaEOeaUgpFW1jqTU= -github.com/tarantool/go-storage v1.1.2 h1:I+fQtVnivSy6M2xhedn73s1604jx6ZDRWjqE2AOgRsc= -github.com/tarantool/go-storage v1.1.2/go.mod h1:lM/UPkuzeggynwtmIHD5OCqdz5H2RHsXX6HaOzYBCzk= +github.com/tarantool/go-storage v1.2.0 h1:BxsMZxVw3spD4xXv9Svws2A4NM74v4nE3drUjeN+CGg= +github.com/tarantool/go-storage v1.2.0/go.mod h1:cDp+ffxTLRRn7lhy1q8m9IUIzFD1uUhY2OqWLUzrfCM= github.com/tarantool/go-tarantool v1.12.3 h1:GXabowmrTSW225xFEjX4t+8PlccVDCeGB5OM1VLbBXE= github.com/tarantool/go-tarantool v1.12.3/go.mod h1:QRiXv0jnxwgxHtr9ZmifSr/eRba76gTUBgp69pDMX1U= github.com/tarantool/go-tarantool/v2 v2.4.2 h1:rkzYtFhLJLA9RDIhjzN93MJBN5PBxHW4+soq+RB90gE= diff --git a/test/integration/cluster/test_cluster_worker.py b/test/integration/cluster/test_cluster_worker.py index b3e50a894..e20e0866d 100644 --- a/test/integration/cluster/test_cluster_worker.py +++ b/test/integration/cluster/test_cluster_worker.py @@ -1,5 +1,26 @@ +import os import subprocess +import pytest + +worker_cfg = """type: nontarantool +instrumentation: + url: host1:8080 + metrics_url: /metrics + metrics_format: prometheus +config: + addr: host1:9080 +""" + +worker_cfg_updated = """type: nontarantool +instrumentation: + url: host1:8080 + metrics_url: /metrics + metrics_format: prometheus +config: + addr: host1:9081 +""" + def test_cluster_worker_help(tt_cmd, tmp_path): help_cmd = [tt_cmd, "cluster", "worker", "--help"] @@ -95,65 +116,1061 @@ def test_cluster_worker_delete_help(tt_cmd, tmp_path): assert "-p" in help_output or "--password" in help_output -def test_cluster_worker_publish_unimplemented(tt_cmd, tmp_path): - worker_cfg = tmp_path / "worker.yaml" - worker_cfg.write_text("type: nontarantool\n") +def test_cluster_worker_delete_unimplemented(tt_cmd, tmp_path): + delete_cmd = [ + tt_cmd, + "cluster", + "worker", + "delete", + "https://localhost:2379/prefix/host/worker", + ] + instance_process = subprocess.Popen( + delete_cmd, + cwd=tmp_path, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + text=True, + ) + output = instance_process.stdout.read() + + assert "unimplemented" in output + +def test_cluster_worker_publish_missing_file(tt_cmd, tmpdir_with_cfg): + tmpdir = tmpdir_with_cfg publish_cmd = [ tt_cmd, "cluster", "worker", "publish", - "https://localhost:2379/prefix/host/worker", - str(worker_cfg), + "http://localhost:2379/prefix/host1/worker1", + "nonexistent.yaml", ] instance_process = subprocess.Popen( publish_cmd, - cwd=tmp_path, + cwd=tmpdir, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, text=True, ) output = instance_process.stdout.read() - assert "unimplemented" in output + assert "failed to read file" in output + + +def test_cluster_worker_publish_invalid_url(tt_cmd, tmpdir_with_cfg): + tmpdir = tmpdir_with_cfg + worker_cfg_path = os.path.join(tmpdir, "worker.yaml") + with open(worker_cfg_path, "w") as f: + f.write(worker_cfg) + + publish_cmd = [ + tt_cmd, + "cluster", + "worker", + "publish", + "not-a-valid-url", + "worker.yaml", + ] + instance_process = subprocess.Popen( + publish_cmd, + cwd=tmpdir, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + text=True, + ) + output = instance_process.stdout.read() + + assert "invalid URL" in output -def test_cluster_worker_show_unimplemented(tt_cmd, tmp_path): +def test_cluster_worker_publish_invalid_path(tt_cmd, tmpdir_with_cfg): + tmpdir = tmpdir_with_cfg + worker_cfg_path = os.path.join(tmpdir, "worker.yaml") + with open(worker_cfg_path, "w") as f: + f.write(worker_cfg) + + publish_cmd = [ + tt_cmd, + "cluster", + "worker", + "publish", + "http://localhost:2379/onlyhost?timeout=0.1", + "worker.yaml", + ] + instance_process = subprocess.Popen( + publish_cmd, + cwd=tmpdir, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + text=True, + ) + output = instance_process.stdout.read() + + assert output == ( + " ⨯ failed to parse URL path:" + " URL path must contain at least a host-name and a worker-name," + ' got: "/onlyhost"\n' + ) + + +def test_cluster_worker_publish_connection_failed(tt_cmd, tmpdir_with_cfg): + tmpdir = tmpdir_with_cfg + worker_cfg_path = os.path.join(tmpdir, "worker.yaml") + with open(worker_cfg_path, "w") as f: + f.write(worker_cfg) + + publish_cmd = [ + tt_cmd, + "cluster", + "worker", + "publish", + "https://localhost:12344/prefix/host1/worker1?timeout=0.1", + "worker.yaml", + ] + instance_process = subprocess.Popen( + publish_cmd, + cwd=tmpdir, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + text=True, + ) + output = instance_process.stdout.read() + + assert "failed to connect to storage: failed to connect to etcd or tarantool" in output + + +@pytest.mark.parametrize("instance_name", ["etcd", "tcs"]) +def test_cluster_worker_publish(tt_cmd, tmpdir_with_cfg, instance_name, request): + instance = request.getfixturevalue(instance_name) + tmpdir = tmpdir_with_cfg + worker_cfg_path = os.path.join(tmpdir, "worker.yaml") + with open(worker_cfg_path, "w") as f: + f.write(worker_cfg) + + conn = instance.conn() + creds = ( + f"{instance.connection_username}:{instance.connection_password}@" + if instance_name == "tcs" + else "" + ) + publish_cmd = [ + tt_cmd, + "cluster", + "worker", + "publish", + "http://" + creds + f"{instance.host}:{instance.port}/prefix/host1/worker1?timeout=5", + "worker.yaml", + ] + instance_process = subprocess.Popen( + publish_cmd, + cwd=tmpdir, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + text=True, + ) + publish_output = instance_process.stdout.read() + + assert "" == publish_output + + content = "" + storage_key = "/prefix/instances/host1/worker1" + if instance_name == "etcd": + content, _ = conn.get(storage_key) + content = content.decode("utf-8") + else: + content = conn.call("config.storage.get", storage_key) + if len(content) > 0: + content = content[0]["data"][0]["value"] + + assert worker_cfg == content + + +@pytest.mark.parametrize("instance_name", ["etcd", "tcs"]) +def test_cluster_worker_publish_nested_prefix(tt_cmd, tmpdir_with_cfg, instance_name, request): + instance = request.getfixturevalue(instance_name) + tmpdir = tmpdir_with_cfg + worker_cfg_path = os.path.join(tmpdir, "worker.yaml") + with open(worker_cfg_path, "w") as f: + f.write(worker_cfg) + + conn = instance.conn() + creds = ( + f"{instance.connection_username}:{instance.connection_password}@" + if instance_name == "tcs" + else "" + ) + publish_cmd = [ + tt_cmd, + "cluster", + "worker", + "publish", + "http://" + + creds + + f"{instance.host}:{instance.port}/tdb-workers/cluster1/host1/worker1?timeout=5", + "worker.yaml", + ] + instance_process = subprocess.Popen( + publish_cmd, + cwd=tmpdir, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + text=True, + ) + publish_output = instance_process.stdout.read() + + assert "" == publish_output + + content = "" + storage_key = "/tdb-workers/cluster1/instances/host1/worker1" + if instance_name == "etcd": + content, _ = conn.get(storage_key) + content = content.decode("utf-8") + else: + content = conn.call("config.storage.get", storage_key) + if len(content) > 0: + content = content[0]["data"][0]["value"] + + assert worker_cfg == content + + +@pytest.mark.parametrize("instance_name", ["etcd", "tcs"]) +def test_cluster_worker_publish_exists_no_force(tt_cmd, tmpdir_with_cfg, instance_name, request): + instance = request.getfixturevalue(instance_name) + tmpdir = tmpdir_with_cfg + worker_cfg_path = os.path.join(tmpdir, "worker.yaml") + with open(worker_cfg_path, "w") as f: + f.write(worker_cfg) + + conn = instance.conn() + creds = ( + f"{instance.connection_username}:{instance.connection_password}@" + if instance_name == "tcs" + else "" + ) + url = "http://" + creds + f"{instance.host}:{instance.port}/prefix/host1/worker1?timeout=5" + + publish_cmd = [tt_cmd, "cluster", "worker", "publish", url, "worker.yaml"] + instance_process = subprocess.Popen( + publish_cmd, + cwd=tmpdir, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + text=True, + ) + publish_output = instance_process.stdout.read() + assert "" == publish_output + + with open(worker_cfg_path, "w") as f: + f.write(worker_cfg_updated) + + publish_cmd = [tt_cmd, "cluster", "worker", "publish", url, "worker.yaml"] + instance_process = subprocess.Popen( + publish_cmd, + cwd=tmpdir, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + text=True, + ) + publish_output = instance_process.stdout.read() + + assert publish_output == ( + " ⨯ failed to publish worker configuration:" + " worker configuration already exists at" + ' "/prefix/instances/host1/worker1",' + " use --force to overwrite\n" + ) + + content = "" + storage_key = "/prefix/instances/host1/worker1" + if instance_name == "etcd": + content, _ = conn.get(storage_key) + content = content.decode("utf-8") + else: + content = conn.call("config.storage.get", storage_key) + if len(content) > 0: + content = content[0]["data"][0]["value"] + + assert worker_cfg == content + + +@pytest.mark.parametrize("instance_name", ["etcd", "tcs"]) +def test_cluster_worker_publish_force_overwrite(tt_cmd, tmpdir_with_cfg, instance_name, request): + instance = request.getfixturevalue(instance_name) + tmpdir = tmpdir_with_cfg + worker_cfg_path = os.path.join(tmpdir, "worker.yaml") + with open(worker_cfg_path, "w") as f: + f.write(worker_cfg) + + conn = instance.conn() + creds = ( + f"{instance.connection_username}:{instance.connection_password}@" + if instance_name == "tcs" + else "" + ) + url = "http://" + creds + f"{instance.host}:{instance.port}/prefix/host1/worker1?timeout=5" + + publish_cmd = [tt_cmd, "cluster", "worker", "publish", url, "worker.yaml"] + instance_process = subprocess.Popen( + publish_cmd, + cwd=tmpdir, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + text=True, + ) + publish_output = instance_process.stdout.read() + assert "" == publish_output + + with open(worker_cfg_path, "w") as f: + f.write(worker_cfg_updated) + + publish_cmd = [tt_cmd, "cluster", "worker", "publish", "--force", url, "worker.yaml"] + instance_process = subprocess.Popen( + publish_cmd, + cwd=tmpdir, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + text=True, + ) + publish_output = instance_process.stdout.read() + assert "" == publish_output + + content = "" + storage_key = "/prefix/instances/host1/worker1" + if instance_name == "etcd": + content, _ = conn.get(storage_key) + content = content.decode("utf-8") + else: + content = conn.call("config.storage.get", storage_key) + if len(content) > 0: + content = content[0]["data"][0]["value"] + + assert worker_cfg_updated == content + + +@pytest.mark.parametrize("instance_name", ["etcd", "tcs"]) +def test_cluster_worker_publish_force_new_key(tt_cmd, tmpdir_with_cfg, instance_name, request): + instance = request.getfixturevalue(instance_name) + tmpdir = tmpdir_with_cfg + worker_cfg_path = os.path.join(tmpdir, "worker.yaml") + with open(worker_cfg_path, "w") as f: + f.write(worker_cfg) + + conn = instance.conn() + creds = ( + f"{instance.connection_username}:{instance.connection_password}@" + if instance_name == "tcs" + else "" + ) + url = "http://" + creds + f"{instance.host}:{instance.port}/prefix/host1/worker1?timeout=5" + + publish_cmd = [tt_cmd, "cluster", "worker", "publish", "--force", url, "worker.yaml"] + instance_process = subprocess.Popen( + publish_cmd, + cwd=tmpdir, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + text=True, + ) + publish_output = instance_process.stdout.read() + assert "" == publish_output + + content = "" + storage_key = "/prefix/instances/host1/worker1" + if instance_name == "etcd": + content, _ = conn.get(storage_key) + content = content.decode("utf-8") + else: + content = conn.call("config.storage.get", storage_key) + if len(content) > 0: + content = content[0]["data"][0]["value"] + + assert worker_cfg == content + + +@pytest.mark.parametrize( + "auth, instance_name", + [ + pytest.param("url", "etcd"), + pytest.param("flag", "etcd"), + pytest.param("env", "etcd"), + pytest.param("url", "tcs"), + pytest.param("flag", "tcs"), + pytest.param("env", "tcs"), + ], +) +def test_cluster_worker_publish_auth(tt_cmd, tmpdir_with_cfg, auth, instance_name, request): + instance = request.getfixturevalue(instance_name) + tmpdir = tmpdir_with_cfg + worker_cfg_path = os.path.join(tmpdir, "worker.yaml") + with open(worker_cfg_path, "w") as f: + f.write(worker_cfg) + + if instance_name == "etcd": + instance.enable_auth() + + try: + if auth == "url": + env = None + url = ( + f"http://{instance.connection_username}:{instance.connection_password}@" + f"{instance.host}:{instance.port}/prefix/host1/worker1?timeout=5" + ) + publish_cmd = [tt_cmd, "cluster", "worker", "publish", url, "worker.yaml"] + elif auth == "flag": + env = None + url = f"{instance.endpoint}/prefix/host1/worker1?timeout=5" + publish_cmd = [ + tt_cmd, + "cluster", + "worker", + "publish", + url, + "worker.yaml", + "-u", + instance.connection_username, + "-p", + instance.connection_password, + ] + elif auth == "env": + env = { + ( + "TT_CLI_ETCD_USERNAME" if instance_name == "etcd" else "TT_CLI_USERNAME" + ): instance.connection_username, + ( + "TT_CLI_ETCD_PASSWORD" if instance_name == "etcd" else "TT_CLI_PASSWORD" + ): instance.connection_password, + } + url = f"{instance.endpoint}/prefix/host1/worker1?timeout=5" + publish_cmd = [tt_cmd, "cluster", "worker", "publish", url, "worker.yaml"] + + instance_process = subprocess.Popen( + publish_cmd, + env=env, + cwd=tmpdir, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + text=True, + ) + publish_output = instance_process.stdout.read() + + assert "" == publish_output + + if instance_name == "etcd": + instance.disable_auth() + + conn = instance.conn() + content = "" + storage_key = "/prefix/instances/host1/worker1" + if instance_name == "etcd": + content, _ = conn.get(storage_key) + content = content.decode("utf-8") + else: + content = conn.call("config.storage.get", storage_key) + if len(content) > 0: + content = content[0]["data"][0]["value"] + + assert worker_cfg == content + finally: + if instance_name == "etcd": + instance.disable_auth() + + +@pytest.mark.parametrize("instance_name", ["etcd", "tcs"]) +def test_cluster_worker_publish_auth_priority_url_over_flag( + tt_cmd, + tmpdir_with_cfg, + instance_name, + request, +): + instance = request.getfixturevalue(instance_name) + tmpdir = tmpdir_with_cfg + worker_cfg_path = os.path.join(tmpdir, "worker.yaml") + with open(worker_cfg_path, "w") as f: + f.write(worker_cfg) + + if instance_name == "etcd": + instance.enable_auth() + + try: + url = ( + f"http://{instance.connection_username}:{instance.connection_password}@" + f"{instance.host}:{instance.port}/prefix/host1/worker1?timeout=5" + ) + publish_cmd = [ + tt_cmd, + "cluster", + "worker", + "publish", + url, + "worker.yaml", + "-u", + "invalid_user", + "-p", + "invalid_pass", + ] + instance_process = subprocess.Popen( + publish_cmd, + cwd=tmpdir, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + text=True, + ) + publish_output = instance_process.stdout.read() + + assert "" == publish_output + + if instance_name == "etcd": + instance.disable_auth() + + conn = instance.conn() + content = "" + storage_key = "/prefix/instances/host1/worker1" + if instance_name == "etcd": + content, _ = conn.get(storage_key) + content = content.decode("utf-8") + else: + content = conn.call("config.storage.get", storage_key) + if len(content) > 0: + content = content[0]["data"][0]["value"] + + assert worker_cfg == content + finally: + if instance_name == "etcd": + instance.disable_auth() + + +@pytest.mark.parametrize("instance_name", ["etcd", "tcs"]) +def test_cluster_worker_publish_auth_priority_flag_over_env( + tt_cmd, + tmpdir_with_cfg, + instance_name, + request, +): + instance = request.getfixturevalue(instance_name) + tmpdir = tmpdir_with_cfg + worker_cfg_path = os.path.join(tmpdir, "worker.yaml") + with open(worker_cfg_path, "w") as f: + f.write(worker_cfg) + + if instance_name == "etcd": + instance.enable_auth() + + try: + env = { + "TT_CLI_ETCD_USERNAME" + if instance_name == "etcd" + else "TT_CLI_USERNAME": "invalid_env_user", + "TT_CLI_ETCD_PASSWORD" + if instance_name == "etcd" + else "TT_CLI_PASSWORD": "invalid_env_pass", + } + url = f"{instance.endpoint}/prefix/host1/worker1?timeout=5" + publish_cmd = [ + tt_cmd, + "cluster", + "worker", + "publish", + url, + "worker.yaml", + "-u", + instance.connection_username, + "-p", + instance.connection_password, + ] + instance_process = subprocess.Popen( + publish_cmd, + env=env, + cwd=tmpdir, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + text=True, + ) + publish_output = instance_process.stdout.read() + + assert "" == publish_output + + if instance_name == "etcd": + instance.disable_auth() + + conn = instance.conn() + content = "" + storage_key = "/prefix/instances/host1/worker1" + if instance_name == "etcd": + content, _ = conn.get(storage_key) + content = content.decode("utf-8") + else: + content = conn.call("config.storage.get", storage_key) + if len(content) > 0: + content = content[0]["data"][0]["value"] + + assert worker_cfg == content + finally: + if instance_name == "etcd": + instance.disable_auth() + + +@pytest.mark.parametrize("instance_name", ["etcd", "tcs"]) +def test_cluster_worker_publish_auth_bad_credentials( + tt_cmd, + tmpdir_with_cfg, + instance_name, + request, +): + instance = request.getfixturevalue(instance_name) + tmpdir = tmpdir_with_cfg + worker_cfg_path = os.path.join(tmpdir, "worker.yaml") + with open(worker_cfg_path, "w") as f: + f.write(worker_cfg) + + if instance_name == "etcd": + instance.enable_auth() + + try: + url = f"http://invalid_user:invalid_pass@{instance.host}:{instance.port}/prefix/host1/worker1?timeout=1" + publish_cmd = [tt_cmd, "cluster", "worker", "publish", url, "worker.yaml"] + instance_process = subprocess.Popen( + publish_cmd, + cwd=tmpdir, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + text=True, + ) + publish_output = instance_process.stdout.read() + + assert ( + "failed to connect to storage: failed to connect to etcd or tarantool" in publish_output + ) + finally: + if instance_name == "etcd": + instance.disable_auth() + + +def test_cluster_worker_show_invalid_url(tt_cmd, tmpdir_with_cfg): + tmpdir = tmpdir_with_cfg show_cmd = [ tt_cmd, "cluster", "worker", "show", - "https://localhost:2379/prefix/host/worker", + "not-a-valid-url", ] instance_process = subprocess.Popen( show_cmd, - cwd=tmp_path, + cwd=tmpdir, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, text=True, ) output = instance_process.stdout.read() - assert "unimplemented" in output + assert "invalid URL" in output -def test_cluster_worker_delete_unimplemented(tt_cmd, tmp_path): - delete_cmd = [ +def test_cluster_worker_show_invalid_path(tt_cmd, tmpdir_with_cfg): + tmpdir = tmpdir_with_cfg + show_cmd = [ tt_cmd, "cluster", "worker", - "delete", - "https://localhost:2379/prefix/host/worker", + "show", + "http://localhost:2379/onlyhost?timeout=0.1", ] instance_process = subprocess.Popen( - delete_cmd, - cwd=tmp_path, + show_cmd, + cwd=tmpdir, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, text=True, ) output = instance_process.stdout.read() - assert "unimplemented" in output + assert output == ( + " ⨯ failed to parse URL path:" + " URL path must contain at least a host-name and a worker-name," + ' got: "/onlyhost"\n' + ) + + +def test_cluster_worker_show_connection_failed(tt_cmd, tmpdir_with_cfg): + tmpdir = tmpdir_with_cfg + show_cmd = [ + tt_cmd, + "cluster", + "worker", + "show", + "https://localhost:12344/prefix/host1/worker1?timeout=0.1", + ] + instance_process = subprocess.Popen( + show_cmd, + cwd=tmpdir, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + text=True, + ) + output = instance_process.stdout.read() + + assert "failed to connect to storage: failed to connect to etcd or tarantool" in output + + +@pytest.mark.parametrize("instance_name", ["etcd", "tcs"]) +def test_cluster_worker_show(tt_cmd, tmpdir_with_cfg, instance_name, request): + instance = request.getfixturevalue(instance_name) + tmpdir = tmpdir_with_cfg + + conn = instance.conn() + storage_key = "/prefix/instances/host1/worker1" + if instance_name == "etcd": + conn.put(storage_key, worker_cfg) + else: + conn.call("config.storage.put", storage_key, worker_cfg) + + creds = ( + f"{instance.connection_username}:{instance.connection_password}@" + if instance_name == "tcs" + else "" + ) + show_cmd = [ + tt_cmd, + "cluster", + "worker", + "show", + "http://" + creds + f"{instance.host}:{instance.port}/prefix/host1/worker1?timeout=5", + ] + instance_process = subprocess.Popen( + show_cmd, + cwd=tmpdir, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + text=True, + ) + show_output = instance_process.stdout.read() + + assert show_output.strip() == worker_cfg.strip() + + +@pytest.mark.parametrize("instance_name", ["etcd", "tcs"]) +def test_cluster_worker_show_nested_prefix(tt_cmd, tmpdir_with_cfg, instance_name, request): + instance = request.getfixturevalue(instance_name) + tmpdir = tmpdir_with_cfg + + conn = instance.conn() + storage_key = "/tdb-workers/cluster1/instances/host1/worker1" + if instance_name == "etcd": + conn.put(storage_key, worker_cfg) + else: + conn.call("config.storage.put", storage_key, worker_cfg) + + creds = ( + f"{instance.connection_username}:{instance.connection_password}@" + if instance_name == "tcs" + else "" + ) + show_cmd = [ + tt_cmd, + "cluster", + "worker", + "show", + "http://" + + creds + + f"{instance.host}:{instance.port}/tdb-workers/cluster1/host1/worker1?timeout=5", + ] + instance_process = subprocess.Popen( + show_cmd, + cwd=tmpdir, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + text=True, + ) + show_output = instance_process.stdout.read() + + assert show_output.strip() == worker_cfg.strip() + + +@pytest.mark.parametrize("instance_name", ["etcd", "tcs"]) +def test_cluster_worker_show_not_found(tt_cmd, tmpdir_with_cfg, instance_name, request): + instance = request.getfixturevalue(instance_name) + tmpdir = tmpdir_with_cfg + + creds = ( + f"{instance.connection_username}:{instance.connection_password}@" + if instance_name == "tcs" + else "" + ) + show_cmd = [ + tt_cmd, + "cluster", + "worker", + "show", + "http://" + creds + f"{instance.host}:{instance.port}/prefix/host1/worker1?timeout=5", + ] + instance_process = subprocess.Popen( + show_cmd, + cwd=tmpdir, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + text=True, + ) + show_output = instance_process.stdout.read() + + assert "worker configuration not found" in show_output + + +@pytest.mark.parametrize( + "auth, instance_name", + [ + pytest.param("url", "etcd"), + pytest.param("flag", "etcd"), + pytest.param("env", "etcd"), + pytest.param("url", "tcs"), + pytest.param("flag", "tcs"), + pytest.param("env", "tcs"), + ], +) +def test_cluster_worker_show_auth(tt_cmd, tmpdir_with_cfg, auth, instance_name, request): + instance = request.getfixturevalue(instance_name) + tmpdir = tmpdir_with_cfg + + conn = instance.conn() + storage_key = "/prefix/instances/host1/worker1" + if instance_name == "etcd": + conn.put(storage_key, worker_cfg) + else: + conn.call("config.storage.put", storage_key, worker_cfg) + + if instance_name == "etcd": + instance.enable_auth() + + try: + if auth == "url": + env = None + url = ( + f"http://{instance.connection_username}:{instance.connection_password}@" + f"{instance.host}:{instance.port}/prefix/host1/worker1?timeout=5" + ) + show_cmd = [tt_cmd, "cluster", "worker", "show", url] + elif auth == "flag": + env = None + url = f"{instance.endpoint}/prefix/host1/worker1?timeout=5" + show_cmd = [ + tt_cmd, + "cluster", + "worker", + "show", + url, + "-u", + instance.connection_username, + "-p", + instance.connection_password, + ] + elif auth == "env": + env = { + ( + "TT_CLI_ETCD_USERNAME" if instance_name == "etcd" else "TT_CLI_USERNAME" + ): instance.connection_username, + ( + "TT_CLI_ETCD_PASSWORD" if instance_name == "etcd" else "TT_CLI_PASSWORD" + ): instance.connection_password, + } + url = f"{instance.endpoint}/prefix/host1/worker1?timeout=5" + show_cmd = [tt_cmd, "cluster", "worker", "show", url] + + instance_process = subprocess.Popen( + show_cmd, + env=env, + cwd=tmpdir, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + text=True, + ) + show_output = instance_process.stdout.read() + + assert show_output.strip() == worker_cfg.strip() + finally: + if instance_name == "etcd": + instance.disable_auth() + + +@pytest.mark.parametrize("instance_name", ["etcd", "tcs"]) +def test_cluster_worker_show_auth_priority_url_over_flag( + tt_cmd, + tmpdir_with_cfg, + instance_name, + request, +): + instance = request.getfixturevalue(instance_name) + tmpdir = tmpdir_with_cfg + + conn = instance.conn() + storage_key = "/prefix/instances/host1/worker1" + if instance_name == "etcd": + conn.put(storage_key, worker_cfg) + else: + conn.call("config.storage.put", storage_key, worker_cfg) + + if instance_name == "etcd": + instance.enable_auth() + + try: + url = ( + f"http://{instance.connection_username}:{instance.connection_password}@" + f"{instance.host}:{instance.port}/prefix/host1/worker1?timeout=5" + ) + show_cmd = [ + tt_cmd, + "cluster", + "worker", + "show", + url, + "-u", + "invalid_user", + "-p", + "invalid_pass", + ] + instance_process = subprocess.Popen( + show_cmd, + cwd=tmpdir, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + text=True, + ) + show_output = instance_process.stdout.read() + + assert show_output.strip() == worker_cfg.strip() + finally: + if instance_name == "etcd": + instance.disable_auth() + + +@pytest.mark.parametrize("instance_name", ["etcd", "tcs"]) +def test_cluster_worker_show_auth_priority_flag_over_env( + tt_cmd, + tmpdir_with_cfg, + instance_name, + request, +): + instance = request.getfixturevalue(instance_name) + tmpdir = tmpdir_with_cfg + + conn = instance.conn() + storage_key = "/prefix/instances/host1/worker1" + if instance_name == "etcd": + conn.put(storage_key, worker_cfg) + else: + conn.call("config.storage.put", storage_key, worker_cfg) + + if instance_name == "etcd": + instance.enable_auth() + + try: + env = { + "TT_CLI_ETCD_USERNAME" + if instance_name == "etcd" + else "TT_CLI_USERNAME": "invalid_env_user", + "TT_CLI_ETCD_PASSWORD" + if instance_name == "etcd" + else "TT_CLI_PASSWORD": "invalid_env_pass", + } + url = f"{instance.endpoint}/prefix/host1/worker1?timeout=5" + show_cmd = [ + tt_cmd, + "cluster", + "worker", + "show", + url, + "-u", + instance.connection_username, + "-p", + instance.connection_password, + ] + instance_process = subprocess.Popen( + show_cmd, + env=env, + cwd=tmpdir, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + text=True, + ) + show_output = instance_process.stdout.read() + + assert show_output.strip() == worker_cfg.strip() + finally: + if instance_name == "etcd": + instance.disable_auth() + + +@pytest.mark.parametrize("instance_name", ["etcd", "tcs"]) +def test_cluster_worker_show_auth_bad_credentials( + tt_cmd, + tmpdir_with_cfg, + instance_name, + request, +): + instance = request.getfixturevalue(instance_name) + tmpdir = tmpdir_with_cfg + + conn = instance.conn() + storage_key = "/prefix/instances/host1/worker1" + if instance_name == "etcd": + conn.put(storage_key, worker_cfg) + else: + conn.call("config.storage.put", storage_key, worker_cfg) + + if instance_name == "etcd": + instance.enable_auth() + + try: + url = f"http://invalid_user:invalid_pass@{instance.host}:{instance.port}/prefix/host1/worker1?timeout=1" + show_cmd = [tt_cmd, "cluster", "worker", "show", url] + instance_process = subprocess.Popen( + show_cmd, + cwd=tmpdir, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + text=True, + ) + show_output = instance_process.stdout.read() + + assert "failed to connect to storage: failed to connect to etcd or tarantool" in show_output + finally: + if instance_name == "etcd": + instance.disable_auth() + + +@pytest.mark.parametrize("instance_name", ["etcd", "tcs"]) +def test_cluster_worker_publish_then_show(tt_cmd, tmpdir_with_cfg, instance_name, request): + instance = request.getfixturevalue(instance_name) + tmpdir = tmpdir_with_cfg + worker_cfg_path = os.path.join(tmpdir, "worker.yaml") + with open(worker_cfg_path, "w") as f: + f.write(worker_cfg) + + creds = ( + f"{instance.connection_username}:{instance.connection_password}@" + if instance_name == "tcs" + else "" + ) + url = "http://" + creds + f"{instance.host}:{instance.port}/prefix/host1/worker1?timeout=5" + + publish_cmd = [tt_cmd, "cluster", "worker", "publish", url, "worker.yaml"] + instance_process = subprocess.Popen( + publish_cmd, + cwd=tmpdir, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + text=True, + ) + publish_output = instance_process.stdout.read() + assert "" == publish_output + + show_cmd = [tt_cmd, "cluster", "worker", "show", url] + instance_process = subprocess.Popen( + show_cmd, + cwd=tmpdir, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + text=True, + ) + show_output = instance_process.stdout.read() + + assert show_output.strip() == worker_cfg.strip()