Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Added

- `tt cluster`: worker publish subcommand.
- `tt cluster`: worker show subcommand.

### Changed

### Fixed
Expand Down
198 changes: 198 additions & 0 deletions cli/cluster/cmd/storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
package cmd

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io/fs"
"os"
"strings"
"time"

"github.com/tarantool/go-storage"
etcdDriver "github.com/tarantool/go-storage/driver/etcd"
tcsDriver "github.com/tarantool/go-storage/driver/tcs"
"github.com/tarantool/go-tarantool/v2"
libconnect "github.com/tarantool/tt/lib/connect"
"github.com/tarantool/tt/lib/dial"
"go.etcd.io/etcd/client/pkg/v3/transport"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"google.golang.org/grpc"
)

// ConnectStorage connects to etcd or tarantool config storage and returns
// a storage.Storage instance. It tries etcd first, then tarantool.
func ConnectStorage(
opts libconnect.UriOpts,
username, password string,
) (storage.Storage, func(), error) {
etcdcli, errEtcd := connectEtcd(opts, username, password)
if errEtcd == nil {
drv := etcdDriver.New(etcdcli)
stg := storage.NewStorage(drv)
return stg, func() { etcdcli.Close() }, nil
}

conn, errTarantool := connectTarantool(opts, username, password)
if errTarantool == nil {
drv := tcsDriver.New(conn)
stg := storage.NewStorage(drv)
return stg, func() { conn.Close() }, nil
}

return nil, nil, fmt.Errorf("failed to connect to etcd or tarantool: %w, %w",
errTarantool, errEtcd)
}

// connectEtcd creates an etcd client from URI options.
func connectEtcd(
opts libconnect.UriOpts,
username, password string,
) (*clientv3.Client, error) {
var endpoints []string
if opts.Endpoint != "" {
endpoints = []string{opts.Endpoint}
}

etcdUsername := username
etcdPassword := password
if etcdUsername == "" {
etcdUsername = os.Getenv(libconnect.EtcdUsernameEnv)
}
if etcdPassword == "" {
etcdPassword = os.Getenv(libconnect.EtcdPasswordEnv)
}

var tlsConfig *tls.Config
if opts.KeyFile != "" || opts.CertFile != "" || opts.CaFile != "" ||
opts.CaPath != "" || opts.SkipHostVerify {

tlsInfo := transport.TLSInfo{
CertFile: opts.CertFile,
KeyFile: opts.KeyFile,
TrustedCAFile: opts.CaFile,
}

var err error
tlsConfig, err = tlsInfo.ClientConfig()
if err != nil {
return nil, fmt.Errorf("failed to create tls client config: %w", err)
}

if opts.CaPath != "" {
roots, err := loadRootCACerts(opts.CaPath)
if err != nil {
return nil, fmt.Errorf("failed to load CA directory: %w", err)
}
tlsConfig.RootCAs = roots
}

if opts.SkipHostVerify {
tlsConfig.InsecureSkipVerify = true
}
}

client, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: opts.Timeout,
Username: etcdUsername,
Password: etcdPassword,
TLS: tlsConfig,
Logger: zap.NewNop(),
DialOptions: []grpc.DialOption{grpc.WithBlock()}, //nolint:staticcheck
})
if err != nil {
return nil, fmt.Errorf("failed to create etcd client: %w", err)
}

return client, nil
}

// connectTarantool creates a tarantool connection from URI options.
func connectTarantool(
opts libconnect.UriOpts,
username, password string,
) (*tarantool.Connection, error) {
tarantoolUsername := username
tarantoolPassword := password
if tarantoolUsername == "" {
tarantoolUsername = os.Getenv(libconnect.TarantoolUsernameEnv)
}
if tarantoolPassword == "" {
tarantoolPassword = os.Getenv(libconnect.TarantoolPasswordEnv)
}

dialOpts := dial.Opts{
Address: fmt.Sprintf("tcp://%s", opts.Host),
User: tarantoolUsername,
Password: tarantoolPassword,
SslKeyFile: opts.KeyFile,
SslCertFile: opts.CertFile,
SslCaFile: opts.CaFile,
SslCiphers: opts.Ciphers,
}

dialer, err := dial.New(dialOpts)
if err != nil {
return nil, fmt.Errorf("failed to create dialer: %w", err)
}

connectorOpts := tarantool.Opts{
Timeout: opts.Timeout,
}

ctx, cancel := contextWithTimeout(connectorOpts.Timeout)
defer cancel()

conn, err := tarantool.Connect(ctx, dialer, connectorOpts)
if err != nil {
return nil, fmt.Errorf("failed to connect to tarantool: %w", err)
}

return conn, nil
}

// contextWithTimeout creates a context with optional timeout.
func contextWithTimeout(timeout time.Duration) (context.Context, context.CancelFunc) {
if timeout > 0 {
return context.WithTimeout(context.Background(), timeout)
}
return context.Background(), func() {}
}

// loadRootCACerts loads root CA certificates from a directory.
func loadRootCACerts(caPath string) (*x509.CertPool, error) {
roots := x509.NewCertPool()

files, err := os.ReadDir(caPath)
if err != nil {
return nil, fmt.Errorf("failed to read CA directory: %w", err)
}

for _, f := range files {
if f.IsDir() || isSameDirSymlink(f, caPath) {
continue
}

data, err := os.ReadFile(caPath + "/" + f.Name())
if err != nil {
continue
}

roots.AppendCertsFromPEM(data)
}

return roots, nil
}

// isSameDirSymlink checks if a directory entry is a symlink pointing to the same directory.
func isSameDirSymlink(f fs.DirEntry, dir string) bool {
if f.Type()&fs.ModeSymlink == 0 {
return false
}

target, err := os.Readlink(dir + "/" + f.Name())
return err == nil && !strings.Contains(target, "/")
}
80 changes: 64 additions & 16 deletions cli/cluster/cmd/worker.go
Original file line number Diff line number Diff line change
@@ -1,34 +1,38 @@
package cmd

import (
"context"
"errors"
"fmt"
"os"
"strings"

"github.com/tarantool/go-storage"
"github.com/tarantool/go-storage/operation"
"github.com/tarantool/go-storage/predicate"
libconnect "github.com/tarantool/tt/lib/connect"
)

// WorkerPublishCtx contains information about cluster worker publish command
// execution context.
type WorkerPublishCtx struct {
// Username defines a username for connection.
Username string
// Password defines a password for connection.
Password string
// Force defines whether the publish should be forced.
Force bool
// Storage is the storage instance for the operation.
Storage storage.Storage
// Key is the key in storage for the worker configuration.
Key string
// Src is a raw data to publish.
Src []byte
// Force defines whether the publish should be forced.
Force bool
}

// WorkerShowCtx contains information about cluster worker show command
// execution context.
type WorkerShowCtx struct {
// Username defines a username for connection.
Username string
// Password defines a password for connection.
Password string
// Storage is the storage instance for the operation.
Storage storage.Storage
// Key is the key in storage for the worker configuration.
Key string
}

// WorkerDeleteCtx contains information about cluster worker delete command
Expand Down Expand Up @@ -109,14 +113,58 @@ func ResolveWorkerCredentials(
return username, password
}

// WorkerPublish publishes a worker configuration. Unimplemented.
func WorkerPublish(url string, ctx WorkerPublishCtx) error {
return errors.New("unimplemented")
// WorkerPublish publishes a worker configuration to storage.
// Without Force flag, it atomically checks that the key does not exist and
// publishes the configuration. If the key already exists, an error is returned.
// With Force flag, it overwrites the existing configuration unconditionally.
func WorkerPublish(publishCtx WorkerPublishCtx) error {
ctx := context.Background()
key := []byte(publishCtx.Key)
value := publishCtx.Src

if publishCtx.Force {
_, err := publishCtx.Storage.Tx(ctx).Then(operation.Put(key, value)).Commit()
if err != nil {
return fmt.Errorf("failed to publish worker configuration: %w", err)
}
return nil
}

resp, err := publishCtx.Storage.Tx(ctx).
If(predicate.VersionEqual(key, 0)).
Then(operation.Put(key, value)).
Commit()
if err != nil {
return fmt.Errorf("failed to publish worker configuration: %w", err)
}

if !resp.Succeeded {
return fmt.Errorf(
"worker configuration already exists at %q, use --force to overwrite",
publishCtx.Key)
}

return nil
}

// WorkerShow shows a worker configuration. Unimplemented.
func WorkerShow(url string, ctx WorkerShowCtx) error {
return errors.New("unimplemented")
// WorkerShow shows a worker configuration from storage.
// It returns an error if the configuration is not found.
func WorkerShow(showCtx WorkerShowCtx) ([]byte, error) {
ctx := context.Background()
key := []byte(showCtx.Key)

resp, err := showCtx.Storage.Tx(ctx).Then(operation.Get(key)).Commit()
if err != nil {
return nil, fmt.Errorf("failed to get worker configuration: %w", err)
}

if len(resp.Results) == 0 || len(resp.Results[0].Values) == 0 {
return nil, fmt.Errorf("worker configuration not found at %q", showCtx.Key)
}

value := resp.Results[0].Values[0].Value

return value, nil
}

// WorkerDelete deletes a worker configuration. Unimplemented.
Expand Down
Loading
Loading