diff --git a/go.mod b/go.mod index b42240f7b..2a649e0c4 100644 --- a/go.mod +++ b/go.mod @@ -115,6 +115,8 @@ require ( github.com/spf13/pflag v1.0.10 // indirect github.com/tarantool/go-iproto v1.1.0 // indirect github.com/tarantool/go-openssl v1.2.2 // indirect + github.com/tarantool/go-option v1.0.0 // indirect + github.com/tarantool/go-storage v1.2.0 // indirect github.com/tarantool/go-tlsdialer v1.0.2 // indirect github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect diff --git a/go.sum b/go.sum index 72b71a5ef..b822241e1 100644 --- a/go.sum +++ b/go.sum @@ -102,6 +102,8 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/gojuno/minimock/v3 v3.4.7 h1:vhE5zpniyPDRT0DXd5s3DbtZJVlcbmC5k80izYtj9lY= +github.com/gojuno/minimock/v3 v3.4.7/go.mod h1:QxJk4mdPrVyYUmEZGc2yD2NONpqM/j4dWhsy9twjFHg= github.com/golang-jwt/jwt/v5 v5.3.0 h1:pv4AsKCKKZuqlgs5sUmn4x8UlGa0kEVt/puTpKx9vvo= github.com/golang-jwt/jwt/v5 v5.3.0/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= @@ -288,8 +290,12 @@ github.com/tarantool/go-iproto v1.1.0/go.mod h1:LNCtdyZxojUed8SbOiYHoc3v9NvaZTB7 github.com/tarantool/go-openssl v0.0.8-0.20230307065445-720eeb389195/go.mod h1:M7H4xYSbzqpW/ZRBMyH0eyqQBsnhAMfsYk5mv0yid7A= github.com/tarantool/go-openssl v1.2.2 h1:GaGMNsa68ZqoNgrMF7Ke4s9+tXGfc5ulfCUm3/Jb7/k= github.com/tarantool/go-openssl v1.2.2/go.mod h1:EwX1pKIGypLxkY49vKIIR4LTT+94DhKiunCqU2gEzLQ= +github.com/tarantool/go-option v1.0.0 h1:+Etw0i3TjsXvADTo5rfZNCfsXe3BfHOs+iVfIrl0Nlo= +github.com/tarantool/go-option v1.0.0/go.mod h1:lXzzeZtL+rPUtLOCDP6ny3FemFBjruG9aHKzNN2bS08= github.com/tarantool/go-prompt v1.0.1 h1:88Yer6gCFylqGRrdWwikNFVbklRQsqKF7mycvGdDcj0= github.com/tarantool/go-prompt v1.0.1/go.mod h1:9Vuvi60Bk+3yaXqgYaXNTpLbwPPaaEOeaUgpFW1jqTU= +github.com/tarantool/go-storage v1.2.0 h1:BxsMZxVw3spD4xXv9Svws2A4NM74v4nE3drUjeN+CGg= +github.com/tarantool/go-storage v1.2.0/go.mod h1:cDp+ffxTLRRn7lhy1q8m9IUIzFD1uUhY2OqWLUzrfCM= github.com/tarantool/go-tarantool v1.12.3 h1:GXabowmrTSW225xFEjX4t+8PlccVDCeGB5OM1VLbBXE= github.com/tarantool/go-tarantool v1.12.3/go.mod h1:QRiXv0jnxwgxHtr9ZmifSr/eRba76gTUBgp69pDMX1U= github.com/tarantool/go-tarantool/v2 v2.4.2 h1:rkzYtFhLJLA9RDIhjzN93MJBN5PBxHW4+soq+RB90gE= diff --git a/lib/cluster/datacollector.go b/lib/cluster/datacollector.go index d719ba5ec..57ce5a21c 100644 --- a/lib/cluster/datacollector.go +++ b/lib/cluster/datacollector.go @@ -3,6 +3,9 @@ package cluster import ( "time" + "github.com/tarantool/go-storage" + "github.com/tarantool/go-storage/driver/etcd" + "github.com/tarantool/go-storage/driver/tcs" "github.com/tarantool/go-tarantool/v2" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -53,10 +56,10 @@ func (factory collectorsFactory) NewFile(path string) (DataCollector, error) { func (factory collectorsFactory) NewEtcd(etcdcli *clientv3.Client, prefix, key string, timeout time.Duration, ) (DataCollector, error) { - if key == "" { - return NewEtcdAllCollector(etcdcli, prefix, timeout), nil - } - return NewEtcdKeyCollector(etcdcli, prefix, key, timeout), nil + driver := etcd.New(etcdcli) + storage := storage.NewStorage(driver) + + return NewStorage(storage, prefix, timeout, key, etcdStorageType), nil } // NewTarantool creates creates a new tarantool config storage configuration @@ -64,10 +67,10 @@ func (factory collectorsFactory) NewEtcd(etcdcli *clientv3.Client, func (factory collectorsFactory) NewTarantool(conn tarantool.Doer, prefix, key string, timeout time.Duration, ) (DataCollector, error) { - if key == "" { - return NewTarantoolAllCollector(conn, prefix, timeout), nil - } - return NewTarantoolKeyCollector(conn, prefix, key, timeout), nil + driver := tcs.New(dummyDoerWatcher{conn}) + storage := storage.NewStorage(driver) + + return NewStorage(storage, prefix, timeout, key, tcsStorageType), nil } // integrityCollectorsFactory is a type that implements a default CollectorFactory. diff --git a/lib/cluster/datacollector_test.go b/lib/cluster/datacollector_test.go index e8b76dc47..1a7b93add 100644 --- a/lib/cluster/datacollector_test.go +++ b/lib/cluster/datacollector_test.go @@ -7,57 +7,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - clientv3 "go.etcd.io/etcd/client/v3" - - "github.com/tarantool/go-tarantool/v2" "github.com/tarantool/tt/lib/cluster" ) -func TestDataCollectorFactory(t *testing.T) { - etcdcli := &clientv3.Client{} - conn := &tarantool.Connection{} - factory := cluster.NewDataCollectorFactory() - - noErr := func(collector cluster.DataCollector, err error) cluster.DataCollector { - require.NoError(t, err) - return collector - } - - cases := []struct { - Name string - Collector cluster.DataCollector - Expected cluster.DataCollector - }{ - { - Name: "etcd_all", - Collector: noErr(factory.NewEtcd(etcdcli, "foo", "", 1)), - Expected: cluster.NewEtcdAllCollector(etcdcli, "foo", 1), - }, - { - Name: "etcd_key", - Collector: noErr(factory.NewEtcd(etcdcli, "foo", "bar", 2)), - Expected: cluster.NewEtcdKeyCollector(etcdcli, "foo", "bar", 2), - }, - { - Name: "tarantool_all", - Collector: noErr(factory.NewTarantool(conn, "foo", "", 1)), - Expected: cluster.NewTarantoolAllCollector(conn, "foo", 1), - }, - { - Name: "tarantool_key", - Collector: noErr(factory.NewTarantool(conn, "foo", "bar", 2)), - Expected: cluster.NewTarantoolKeyCollector(conn, "foo", "bar", 2), - }, - } - - for _, tc := range cases { - t.Run(tc.Name, func(t *testing.T) { - assert.Equal(t, tc.Expected, tc.Collector) - }) - } -} - func TestDataCollectorFactory_NewFile_not_exist(t *testing.T) { cases := []struct { Name string diff --git a/lib/cluster/datapublisher.go b/lib/cluster/datapublisher.go index 04968e96c..fb6722f13 100644 --- a/lib/cluster/datapublisher.go +++ b/lib/cluster/datapublisher.go @@ -6,6 +6,9 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" + "github.com/tarantool/go-storage" + "github.com/tarantool/go-storage/driver/etcd" + "github.com/tarantool/go-storage/driver/tcs" "github.com/tarantool/go-tarantool/v2" ) @@ -45,20 +48,20 @@ func (factory publishersFactory) NewFile(path string) (DataPublisher, error) { func (factory publishersFactory) NewEtcd(etcdcli *clientv3.Client, prefix, key string, timeout time.Duration, ) (DataPublisher, error) { - if key == "" { - return NewEtcdAllDataPublisher(etcdcli, prefix, timeout), nil - } - return NewEtcdKeyDataPublisher(etcdcli, prefix, key, timeout), nil + driver := etcd.New(etcdcli) + storage := storage.NewStorage(driver) + + return NewStorage(storage, prefix, timeout, key, etcdStorageType), nil } // NewTarantool creates creates a new tarantool config storage data publisher. func (factory publishersFactory) NewTarantool(conn tarantool.Doer, prefix, key string, timeout time.Duration, ) (DataPublisher, error) { - if key == "" { - return NewTarantoolAllDataPublisher(conn, prefix, timeout), nil - } - return NewTarantoolKeyDataPublisher(conn, prefix, key, timeout), nil + driver := tcs.New(dummyDoerWatcher{conn}) + storage := storage.NewStorage(driver) + + return NewStorage(storage, prefix, timeout, key, tcsStorageType), nil } // integrityPublishersFactory is a type that implements a default diff --git a/lib/cluster/datapublisher_test.go b/lib/cluster/datapublisher_test.go index c128b5cfb..d5067431e 100644 --- a/lib/cluster/datapublisher_test.go +++ b/lib/cluster/datapublisher_test.go @@ -4,62 +4,10 @@ import ( "testing" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - clientv3 "go.etcd.io/etcd/client/v3" - "github.com/tarantool/go-tarantool/v2" "github.com/tarantool/tt/lib/cluster" ) -func TestDataPublisherFactory(t *testing.T) { - etcdcli := &clientv3.Client{} - conn := &tarantool.Connection{} - factory := cluster.NewDataPublisherFactory() - - noErr := func(publisher cluster.DataPublisher, err error) cluster.DataPublisher { - require.NoError(t, err) - return publisher - } - - cases := []struct { - Name string - Publisher cluster.DataPublisher - Expected cluster.DataPublisher - }{ - { - Name: "file", - Publisher: noErr(factory.NewFile("foo")), - Expected: cluster.NewFileDataPublisher("foo"), - }, - { - Name: "etcd_all", - Publisher: noErr(factory.NewEtcd(etcdcli, "foo", "", 1)), - Expected: cluster.NewEtcdAllDataPublisher(etcdcli, "foo", 1), - }, - { - Name: "etcd_key", - Publisher: noErr(factory.NewEtcd(etcdcli, "foo", "bar", 2)), - Expected: cluster.NewEtcdKeyDataPublisher(etcdcli, "foo", "bar", 2), - }, - { - Name: "tarantool_all", - Publisher: noErr(factory.NewTarantool(conn, "foo", "", 1)), - Expected: cluster.NewTarantoolAllDataPublisher(conn, "foo", 1), - }, - { - Name: "tarantool_key", - Publisher: noErr(factory.NewTarantool(conn, "foo", "bar", 2)), - Expected: cluster.NewTarantoolKeyDataPublisher(conn, "foo", "bar", 2), - }, - } - - for _, tc := range cases { - t.Run(tc.Name, func(t *testing.T) { - assert.Equal(t, tc.Expected, tc.Publisher) - }) - } -} - func TestIntegrityDataPublisherFactory_NewFile(t *testing.T) { factory := cluster.NewIntegrityDataPublisherFactory(nil) publisher, err := factory.NewFile("any") diff --git a/lib/cluster/errors.go b/lib/cluster/errors.go index e1972b2bc..8ae14da14 100644 --- a/lib/cluster/errors.go +++ b/lib/cluster/errors.go @@ -1,6 +1,7 @@ package cluster import ( + "errors" "fmt" ) @@ -26,3 +27,8 @@ func (e CollectEmptyError) Error() string { return fmt.Sprintf("a configuration data not found in %s for prefix %q", e.storage, e.prefix) } + +var ( + errDataMissing = errors.New("data does not exist") + errWrongRevision = errors.New("wrong revision") +) diff --git a/lib/cluster/etcd.go b/lib/cluster/etcd.go index a236a6ec0..c054d86ab 100644 --- a/lib/cluster/etcd.go +++ b/lib/cluster/etcd.go @@ -11,9 +11,10 @@ import ( "strings" "time" + "github.com/tarantool/go-storage" + "github.com/tarantool/go-storage/driver/etcd" "github.com/tarantool/go-tarantool/v2" libconnect "github.com/tarantool/tt/lib/connect" - "go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/client/pkg/v3/transport" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" @@ -886,77 +887,20 @@ func MakeEtcdOptsFromUriOpts(src libconnect.UriOpts) EtcdOpts { } } -type etcdCSConnection struct { - cli *clientv3.Client -} - func connectEtcdCS(uriOpts libconnect.UriOpts, connOpts ConnectOpts) (CSConnection, error) { cli, err := ConnectEtcdUriOpts(uriOpts, connOpts) if err != nil { return nil, err } - return &etcdCSConnection{ - cli: cli, - }, nil -} - -// Close implements ConnectCStorage interface. -func (c *etcdCSConnection) Close() error { - return c.cli.Close() -} - -// Get implements ConnectCStorage interface. -func (c *etcdCSConnection) Get(ctx context.Context, key string) ([]Data, error) { - resp, err := c.cli.Get(ctx, key) - if err != nil { - return nil, fmt.Errorf("failed to fetch data from etcd: %w", err) - } - switch { - case len(resp.Kvs) == 0: - // It should not happen, but we need to be sure to avoid a null pointer - // dereference. - return nil, fmt.Errorf("a configuration data not found in etcd for key %q", - key) - case len(resp.Kvs) > 1: - return nil, fmt.Errorf("too many responses (%v) from etcd for key %q", - resp.Kvs, key) - } + driver := etcd.New(cli) + storage := storage.NewStorage(driver) - collected := []Data{ - { - Source: string(resp.Kvs[0].Key), - Value: resp.Kvs[0].Value, - Revision: resp.Kvs[0].ModRevision, + return &RawStorage{ + storage: storage, + storageType: etcdStorageType, + close: func() error { + return cli.Close() }, - } - return collected, nil -} - -// Put implements ConnectCStorage interface. -func (c *etcdCSConnection) Put(ctx context.Context, key, value string) error { - _, err := c.cli.Put(ctx, key, value) - return err -} - -// Watch implements ConnectCStorage interface. -func (c *etcdCSConnection) Watch(ctx context.Context, key string) <-chan CSWatchEvent { - ch := make(chan CSWatchEvent) - innerCh := c.cli.Watch(ctx, key) - go func() { - defer close(ch) - - for resp := range innerCh { - for _, ev := range resp.Events { - switch ev.Type { - case mvccpb.PUT: - ch <- CSWatchEvent{ - Key: string(ev.Kv.Value), - Value: ev.Kv.Value, - } - } - } - } - }() - return ch + }, nil } diff --git a/lib/cluster/go.mod b/lib/cluster/go.mod index c92dadd25..59d426b3d 100644 --- a/lib/cluster/go.mod +++ b/lib/cluster/go.mod @@ -5,6 +5,7 @@ go 1.25.7 require ( github.com/mitchellh/mapstructure v1.5.0 github.com/stretchr/testify v1.11.1 + github.com/tarantool/go-storage v1.2.0 github.com/tarantool/go-tarantool/v2 v2.4.2 github.com/tarantool/tt/lib/connect v0.0.0-0 github.com/tarantool/tt/lib/dial v0.0.0-0 @@ -57,7 +58,8 @@ require ( github.com/spf13/cobra v1.9.1 // indirect github.com/spf13/pflag v1.0.6 // indirect github.com/tarantool/go-iproto v1.1.0 // indirect - github.com/tarantool/go-openssl v1.2.1 // indirect + github.com/tarantool/go-openssl v1.2.2 // indirect + github.com/tarantool/go-option v1.0.0 // indirect github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect go.etcd.io/bbolt v1.4.3 // indirect diff --git a/lib/cluster/go.sum b/lib/cluster/go.sum index 023fdc796..80cbd0b6f 100644 --- a/lib/cluster/go.sum +++ b/lib/cluster/go.sum @@ -43,6 +43,8 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/gojuno/minimock/v3 v3.4.7 h1:vhE5zpniyPDRT0DXd5s3DbtZJVlcbmC5k80izYtj9lY= +github.com/gojuno/minimock/v3 v3.4.7/go.mod h1:QxJk4mdPrVyYUmEZGc2yD2NONpqM/j4dWhsy9twjFHg= github.com/golang-jwt/jwt/v5 v5.2.2 h1:Rl4B7itRWVtYIHFrSNd7vhTiz9UpLdi6gZhZ3wEeDy8= github.com/golang-jwt/jwt/v5 v5.2.2/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= @@ -149,8 +151,12 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/tarantool/go-iproto v1.1.0 h1:HULVOIHsiehI+FnHfM7wMDntuzUddO09DKqu2WnFQ5A= github.com/tarantool/go-iproto v1.1.0/go.mod h1:LNCtdyZxojUed8SbOiYHoc3v9NvaZTB7p96hUySMlIo= -github.com/tarantool/go-openssl v1.2.1 h1:WUVTeEPuBAXbrBjvJZ3ynzk/Sv4DL47V/ehWea9czjA= -github.com/tarantool/go-openssl v1.2.1/go.mod h1:EwX1pKIGypLxkY49vKIIR4LTT+94DhKiunCqU2gEzLQ= +github.com/tarantool/go-openssl v1.2.2 h1:GaGMNsa68ZqoNgrMF7Ke4s9+tXGfc5ulfCUm3/Jb7/k= +github.com/tarantool/go-openssl v1.2.2/go.mod h1:EwX1pKIGypLxkY49vKIIR4LTT+94DhKiunCqU2gEzLQ= +github.com/tarantool/go-option v1.0.0 h1:+Etw0i3TjsXvADTo5rfZNCfsXe3BfHOs+iVfIrl0Nlo= +github.com/tarantool/go-option v1.0.0/go.mod h1:lXzzeZtL+rPUtLOCDP6ny3FemFBjruG9aHKzNN2bS08= +github.com/tarantool/go-storage v1.2.0 h1:BxsMZxVw3spD4xXv9Svws2A4NM74v4nE3drUjeN+CGg= +github.com/tarantool/go-storage v1.2.0/go.mod h1:cDp+ffxTLRRn7lhy1q8m9IUIzFD1uUhY2OqWLUzrfCM= github.com/tarantool/go-tarantool/v2 v2.4.2 h1:rkzYtFhLJLA9RDIhjzN93MJBN5PBxHW4+soq+RB90gE= github.com/tarantool/go-tarantool/v2 v2.4.2/go.mod h1:MTbhdjFc3Jl63Lgi/UJr5D+QbT+QegqOzsNJGmaw7VM= github.com/tarantool/go-tlsdialer v1.0.2 h1:TiOkihvC2ufLbOqJcJLuQ9I7W5bsZtmnT7KHF/t8n4s= diff --git a/lib/cluster/integration_test.go b/lib/cluster/integration_test.go index 0a131d15f..81fa968fb 100644 --- a/lib/cluster/integration_test.go +++ b/lib/cluster/integration_test.go @@ -1,5 +1,3 @@ -//go:build integration - package cluster_test import ( @@ -176,6 +174,28 @@ func etcdGet(t *testing.T, etcd *clientv3.Client, key string) ([]byte, int64) { return resp.Kvs[0].Value, resp.Kvs[0].ModRevision } +func newEtcdCollector(t *testing.T, etcd *clientv3.Client, + prefix, key string, timeout time.Duration, +) cluster.DataCollector { + t.Helper() + + collector, err := cluster.NewDataCollectorFactory().NewEtcd(etcd, prefix, key, timeout) + require.NoError(t, err) + + return collector +} + +func newEtcdPublisher(t *testing.T, etcd *clientv3.Client, + prefix, key string, timeout time.Duration, +) cluster.DataPublisher { + t.Helper() + + publisher, err := cluster.NewDataPublisherFactory().NewEtcd(etcd, prefix, key, timeout) + require.NoError(t, err) + + return publisher +} + type connectEtcdOpts struct { ServerOpts etcdOpts ClientOpts cluster.EtcdOpts @@ -309,9 +329,9 @@ func TestEtcdCollectors_single(t *testing.T) { Collector cluster.Collector }{ {"all", cluster.NewYamlCollectorDecorator( - cluster.NewEtcdAllCollector(etcd, "/foo/", timeout))}, + newEtcdCollector(t, etcd, "/foo/", "", timeout))}, {"key", cluster.NewYamlCollectorDecorator( - cluster.NewEtcdKeyCollector(etcd, "/foo/", "bar", timeout))}, + newEtcdCollector(t, etcd, "/foo/", "bar", timeout))}, } for _, tc := range cases { @@ -339,7 +359,7 @@ func TestEtcdAllCollector_merge(t *testing.T) { etcdPut(t, etcd, "/foo/config/b", "foo: car\nzoo: car") config, err := cluster.NewYamlCollectorDecorator( - cluster.NewEtcdAllCollector(etcd, "/foo/", timeout)).Collect() + newEtcdCollector(t, etcd, "/foo/", "", timeout)).Collect() require.NoError(t, err) value, err := config.Get([]string{"foo"}) require.NoError(t, err) @@ -364,9 +384,9 @@ func TestEtcdCollectors_empty(t *testing.T) { Collector cluster.Collector }{ {"all", cluster.NewYamlCollectorDecorator( - cluster.NewEtcdAllCollector(etcd, "/foo/", timeout))}, + newEtcdCollector(t, etcd, "/foo/", "", timeout))}, {"key", cluster.NewYamlCollectorDecorator( - cluster.NewEtcdKeyCollector(etcd, "/foo/", "bar", timeout))}, + newEtcdCollector(t, etcd, "/foo/", "bar", timeout))}, } for _, tc := range cases { @@ -394,8 +414,8 @@ func TestEtcdDataPublishers_Publish_single(t *testing.T) { Key string Publisher cluster.DataPublisher }{ - {"all", "all", cluster.NewEtcdAllDataPublisher(etcd, "/foo/", timeout)}, - {"key", "key", cluster.NewEtcdKeyDataPublisher(etcd, "/foo/", "key", timeout)}, + {"all", "all", newEtcdPublisher(t, etcd, "/foo/", "", timeout)}, + {"key", "key", newEtcdPublisher(t, etcd, "/foo/", "key", timeout)}, } for _, tc := range cases { @@ -426,8 +446,8 @@ func TestEtcdDataPublishers_Publish_rewrite(t *testing.T) { Key string Publisher cluster.DataPublisher }{ - {"all", "all", cluster.NewEtcdAllDataPublisher(etcd, "/foo/", timeout)}, - {"key", "key", cluster.NewEtcdKeyDataPublisher(etcd, "/foo/", "key", timeout)}, + {"all", "all", newEtcdPublisher(t, etcd, "/foo/", "", timeout)}, + {"key", "key", newEtcdPublisher(t, etcd, "/foo/", "key", timeout)}, } for _, tc := range cases { @@ -452,14 +472,14 @@ func TestEtcdAllDataPublisher_Publish_rewrite_prefix(t *testing.T) { require.NotNil(t, etcd) defer etcd.Close() - etcdPut(t, etcd, "/foo/config/", "foo") - etcdPut(t, etcd, "/foo/config/foo", "zoo") + etcdPut(t, etcd, "/foo/config/foo", "foo") + etcdPut(t, etcd, "/foo/config/zoo", "zoo") data := []byte("zoo bar foo") - err = cluster.NewEtcdAllDataPublisher(etcd, "/foo/", timeout).Publish(0, data) + err = newEtcdPublisher(t, etcd, "/foo/", "", timeout).Publish(0, data) require.NoError(t, err) - actual, _ := etcdGet(t, etcd, "/foo/config/") + actual, _ := etcdGet(t, etcd, "/foo/config/zoo") assert.Equal(t, []byte(""), actual) actual, _ = etcdGet(t, etcd, "/foo/config/foo") @@ -484,7 +504,7 @@ func TestEtcdKeyDataPublisher_Publish_modRevision_specified(t *testing.T) { data := []byte("baz") - publisher := cluster.NewEtcdKeyDataPublisher(etcd, "/foo", "key", timeout) + publisher := newEtcdPublisher(t, etcd, "/foo", "key", timeout) // Use wrong revision. err = publisher.Publish(modRevision-1, data) assert.Errorf(t, err, "failed to put data into etcd: wrong revision") @@ -512,7 +532,7 @@ func TestEtcdAllDataPublisher_Publish_ignore_prefix(t *testing.T) { etcdPut(t, etcd, "/foo/config/foo", "zoo") data := []byte("zoo bar foo") - err = cluster.NewEtcdKeyDataPublisher(etcd, "/foo/", "all", timeout).Publish(0, data) + err = newEtcdPublisher(t, etcd, "/foo/", "all", timeout).Publish(0, data) assert.NoError(t, err) @@ -539,10 +559,10 @@ func TestEtcdAllDataPublisher_collect_publish_collect(t *testing.T) { etcdPut(t, etcd, "/foo/config/foo", "zoo: bar") prefix := "/foo/" - dataPublisher := cluster.NewEtcdAllDataPublisher(etcd, prefix, timeout) + dataPublisher := newEtcdPublisher(t, etcd, prefix, "", timeout) publisher := cluster.NewYamlConfigPublisher(dataPublisher) collector := cluster.NewYamlCollectorDecorator( - cluster.NewEtcdAllCollector(etcd, prefix, timeout)) + newEtcdCollector(t, etcd, prefix, "", timeout)) config, err := collector.Collect() require.NoError(t, err) diff --git a/lib/cluster/storage.go b/lib/cluster/storage.go new file mode 100644 index 000000000..59de19d7c --- /dev/null +++ b/lib/cluster/storage.go @@ -0,0 +1,287 @@ +package cluster + +import ( + "context" + "fmt" + "time" + + gstorage "github.com/tarantool/go-storage" + "github.com/tarantool/go-storage/operation" + "github.com/tarantool/go-storage/predicate" + "github.com/tarantool/go-tarantool/v2" +) + +const ( + etcdStorageType = "etcd" + tcsStorageType = "tarantool" +) + +// dummyDoerWatcher implements DoerWatcher without watcher function. +type dummyDoerWatcher struct { + conn tarantool.Doer +} + +// Do performs a request. +func (b dummyDoerWatcher) Do(req tarantool.Request) (fut *tarantool.Future) { + return b.conn.Do(req) +} + +// NewWatcher is just a stub for DoeWatcher. +func (b dummyDoerWatcher) NewWatcher(key string, callback tarantool.WatchCallback) (tarantool.Watcher, error) { + return nil, nil +} + +// IRawStorage is an interface that includes Collector, Publisher and CSConnection interfaces. +type IRawStorage interface { + // Collect collects data from a source. + Collect() ([]Data, error) + // Publish publishes the interface or returns an error. + Publish(revision int64, data []byte) error + // Close closes connection. + Close() error + // Get retrieves value for key. + Get(ctx context.Context, key string) ([]Data, error) + // Put puts a key-value pair into config storage. + Put(ctx context.Context, key, value string) error + // Watch watches on a key and return watched events through the returned channel. + Watch(ctx context.Context, key string) <-chan CSWatchEvent +} + +// RawStorage implements IRawStorage. +type RawStorage struct { + close func() error + storage gstorage.Storage + key string + storageType string + prefix string + timeout time.Duration +} + +// collectByRange collects kvs by prefix. +func (r *RawStorage) collectByRange(ctx context.Context, prefix string) ([]Data, error) { + kvs, err := r.storage.Range(ctx, gstorage.WithPrefix(prefix)) + if err != nil { + return nil, fmt.Errorf("failed to fetch data from %s: %w", r.storageType, err) + } + + if len(kvs) == 0 { + return nil, fmt.Errorf("a configuration data not found in %s for prefix %q", + r.storageType, r.prefix) + } + + data := make([]Data, 0, len(kvs)) + for _, kv := range kvs { + data = append(data, Data{ + Source: string(kv.Key), + Value: kv.Value, + Revision: kv.ModRevision, + }) + } + + return data, nil +} + +// collectByRange collects kvs by key. +func (r *RawStorage) collectByKey(ctx context.Context, key string) ([]Data, error) { + resp, err := r.storage.Tx(ctx).Then(operation.Get([]byte(key))).Commit() + if err != nil { + return nil, fmt.Errorf("failed to fetch data from %s: %w", r.storageType, err) + } + + data := make([]Data, 0, len(resp.Results)) + for _, result := range resp.Results { + for _, kv := range result.Values { + data = append(data, Data{ + Source: string(kv.Key), + Value: kv.Value, + Revision: kv.ModRevision, + }) + } + } + + switch len(data) { + case 0: + return nil, fmt.Errorf("a configuration data not found in %s for key %q", r.storageType, key) + case 1: + return []Data{data[0]}, nil + default: + return nil, fmt.Errorf("too many responses (%v) from etcd for key %q", data, key) + } +} + +// Collect collects values from storage by prefix or key. +func (r RawStorage) Collect() ([]Data, error) { + ctx := context.Background() + + if r.timeout != 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(context.Background(), r.timeout) + defer cancel() + } + + switch r.key { + case "": + return r.collectByRange(ctx, getConfigPrefix(r.prefix)) + default: + return r.collectByKey(ctx, getConfigKey(r.prefix, r.key)) + } +} + +// publishByKey put data by specific key. +func (r *RawStorage) publishByKey(ctx context.Context, key string, revision int64, data []byte) error { + if data == nil { + return fmt.Errorf("failed to publish data into %s: %w", r.storageType, errDataMissing) + } + + var predicates []predicate.Predicate + if revision != 0 { + predicates = append(predicates, predicate.VersionEqual([]byte(key), revision)) + } + + txn := r.storage.Tx(ctx) + if predicates != nil { + txn = txn.If(predicates...) + } + resp, err := txn.Then(operation.Put([]byte(key), data)).Commit() + if err != nil { + return fmt.Errorf("failed to publish data into %s: %w", r.storageType, err) + } + if !resp.Succeeded { + return fmt.Errorf("failed to publish data into %s: %w", r.storageType, errWrongRevision) + } + + return nil +} + +// publishByKey put data by prefix. +func (r *RawStorage) publishByRange(ctx context.Context, prefix string, targetKey string, revision int64, data []byte) error { + if data == nil { + return fmt.Errorf("failed to publish data into %s: %w", r.storageType, errDataMissing) + } + + if revision != 0 { + return fmt.Errorf("failed to publish data into %s: target revision %d is not supported", + r.storageType, revision) + } + + for { + // The code tries to put data with the key and remove all other + // data with the prefix. We need to remove all other data with the + // prefix because actually the cluster config could be split + // into several parts with the same prefix. + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + // First of all we need to get all paths with the prefix. + kvs, err := r.storage.Range(ctx, gstorage.WithPrefix(prefix)) + if err != nil { + return fmt.Errorf("failed to fetch data from %s: %w", r.storageType, err) + } + + // Then we need to delete all other paths and put the configuration + // into the target key. We do it in a single transaction to avoid + // concurrent updates and collisions. + + var ( + predicates []predicate.Predicate + ops []operation.Operation + ) + for _, kv := range kvs { + // We need to skip the target key since some storage backends do not + // support delete + put for the same key in a single transaction. + if string(kv.Key) != targetKey { + predicates = append(predicates, + predicate.VersionEqual(kv.Key, kv.ModRevision)) + ops = append(ops, operation.Delete(kv.Key)) + } + } + + // Fill the put part of the transaction. + ops = append(ops, operation.Put([]byte(targetKey), data)) + + txn := r.storage.Tx(ctx) + if len(predicates) > 0 { + txn = txn.If(predicates...) + } + + // And try to execute the transaction. + resp, err := txn.Then(ops...).Commit() + if err != nil { + return fmt.Errorf("failed to publish data into %s: %w", r.storageType, err) + } + if resp.Succeeded { + return nil + } + // Transaction failed due to concurrent modification, retry. + } +} + +// Publish publishes the configuration data into the storage. +// If the collector key is empty, it publishes to all keys with the prefix +// (deleting other keys under the prefix). Otherwise, it publishes to a +// specific key. +func (r RawStorage) Publish(revision int64, data []byte) error { + ctx := context.Background() + + if r.timeout != 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(context.Background(), r.timeout) + defer cancel() + } + + switch r.key { + case "": + return r.publishByRange(ctx, getConfigPrefix(r.prefix), getConfigKey(r.prefix, "all"), revision, data) + default: + return r.publishByKey(ctx, getConfigKey(r.prefix, r.key), revision, data) + } +} + +// Close closes a storage driver connection. +func (r *RawStorage) Close() error { + return r.close() +} + +// Get retrieves value for key. +func (r *RawStorage) Get(ctx context.Context, key string) ([]Data, error) { + return r.collectByKey(ctx, key) +} + +// Put puts a key-value pair into config storage. +func (r *RawStorage) Put(ctx context.Context, key, value string) error { + _, err := r.storage.Tx(ctx).Then(operation.Put([]byte(key), []byte(value))).Commit() + return err +} + +// Watch watches on a key and return watched events through the returned channel. +func (r *RawStorage) Watch(ctx context.Context, key string) <-chan CSWatchEvent { + ch := make(chan CSWatchEvent) + innerCh := r.storage.Watch(ctx, []byte(key)) + + go func() { + defer close(ch) + + for resp := range innerCh { + value, _ := r.Get(ctx, string(resp.Prefix)) + ch <- CSWatchEvent{ + Key: key, + Value: value[0].Value, + } + } + }() + return ch +} + +// NewStorage returns RawStorage with specified storageType. +func NewStorage(storage gstorage.Storage, prefix string, timeout time.Duration, key string, storageType string) RawStorage { + return RawStorage{ + storage: storage, + key: key, + storageType: storageType, + prefix: prefix, + timeout: timeout, + } +} diff --git a/lib/cluster/tarantool.go b/lib/cluster/tarantool.go index 68797b383..911257b55 100644 --- a/lib/cluster/tarantool.go +++ b/lib/cluster/tarantool.go @@ -9,6 +9,8 @@ import ( "github.com/mitchellh/mapstructure" + "github.com/tarantool/go-storage" + "github.com/tarantool/go-storage/driver/tcs" "github.com/tarantool/go-tarantool/v2" libconnect "github.com/tarantool/tt/lib/connect" "github.com/tarantool/tt/lib/dial" @@ -604,121 +606,20 @@ func ConnectTarantool(uriOpts libconnect.UriOpts, return conn, nil } -type tarantoolCSConnection struct { - conn tarantool.Connector -} - func connectTarantoolCS(uriOpts libconnect.UriOpts, connOpts ConnectOpts) (CSConnection, error) { conn, err := ConnectTarantool(uriOpts, connOpts) if err != nil { return nil, err } - return &tarantoolCSConnection{ - conn: conn, - }, nil -} - -// Close implements ConnectCStorage interface. -func (c *tarantoolCSConnection) Close() error { - return c.conn.Close() -} - -// Get implements ConnectCStorage interface. -func (c *tarantoolCSConnection) Get(ctx context.Context, key string) ([]Data, error) { - data, err := c.call(ctx, "config.storage.get", []any{key}) - if err != nil { - return nil, fmt.Errorf("failed to fetch data from tarantool: %w", err) - } - if len(data) != 1 { - return nil, fmt.Errorf("unexpected response from tarantool: %q", data) - } - - var resp tarantoolGetResponse - if err := mapstructure.Decode(data[0], &resp); err != nil { - return nil, fmt.Errorf("failed to map response from tarantool: %q", data[0]) - } - switch { - case len(resp.Data) == 0: - return nil, fmt.Errorf("a configuration data not found in tarantool for key %q", - key) - case len(resp.Data) > 1: - // It should not happen, but we need to be sure to avoid a null pointer - // dereference. - return nil, fmt.Errorf("too many responses (%v) from tarantool for key %q", - resp, key) - } + driver := tcs.New(conn) + storage := storage.NewStorage(driver) - return []Data{ - { - Source: key, - Value: []byte(resp.Data[0].Value), - Revision: resp.Data[0].ModRevision, + return &RawStorage{ + storage: storage, + storageType: tcsStorageType, + close: func() error { + return conn.Close() }, - }, err -} - -// Put implements ConnectCStorage interface. -func (c *tarantoolCSConnection) Put(ctx context.Context, key, value string) error { - _, err := c.call(ctx, "config.storage.put", []any{key, value}) - if err != nil { - return fmt.Errorf("failed to put data into tarantool: %w", err) - } - return nil -} - -// Watch implements ConnectCStorage interface. -func (c *tarantoolCSConnection) Watch(ctx context.Context, key string) <-chan CSWatchEvent { - ch := make(chan CSWatchEvent) - - // To watch for config storage key "config.storage:" prefix should be used. - watcher, err := c.conn.NewWatcher("config.storage:"+key, func(ev tarantool.WatchEvent) { - // WatchEvent only contains revision number, so separate request is needed to get data. - if ev.Value != nil { - // NOTE: At the moment watcher is only used to catch the fact that value has been - // changed and it doesn't matter if the value obtained with the subsequent request - // has revision number other than revision that triggered this event, so for now - // no consistency checking is implemented. - value, err := c.Get(ctx, key) - if err == nil { - ch <- CSWatchEvent{ - Key: key, - Value: value[0].Value, - } - } - } - }) - if err != nil { - close(ch) - return ch - } - - go func() { - defer close(ch) - defer watcher.Unregister() - for { - select { - case <-time.After(10 * time.Millisecond): - case <-ctx.Done(): - return - } - } - }() - - return ch -} - -func (c *tarantoolCSConnection) call( - ctx context.Context, - fun string, - args []any, -) ([]any, error) { - req := tarantool.NewCallRequest(fun).Args(args) - - var result []any - if err := c.conn.Do(req.Context(ctx)).GetTyped(&result); err != nil { - return nil, err - } - - return result, nil + }, nil } diff --git a/test/integration/cluster/test_cluster_demote.py b/test/integration/cluster/test_cluster_demote.py index 7bbd16c13..16cee7004 100644 --- a/test/integration/cluster/test_cluster_demote.py +++ b/test/integration/cluster/test_cluster_demote.py @@ -154,12 +154,14 @@ def test_cluster_demote_many_keys( ( "etcd", "⨯ failed to collect cluster config: " - + "failed to fetch data from etcd: etcdserver: user name is empty", + + "failed to fetch data from etcd: failed to execute ops: " + + "transaction failed: etcdserver: user name is empty", ), ( "tcs", - "⨯ failed to collect cluster config: failed to fetch data from tarantool:" - + " Execute access to function 'config.storage.get' is denied for user 'guest'", + "⨯ failed to collect cluster config: failed to fetch data from tarantool: " + + "failed to execute ops: failed to execute transaction: " + + "Execute access to function 'config.storage.txn' is denied for user 'guest'", ), ], ) diff --git a/test/integration/cluster/test_cluster_promote.py b/test/integration/cluster/test_cluster_promote.py index f109e5fa6..efa554878 100644 --- a/test/integration/cluster/test_cluster_promote.py +++ b/test/integration/cluster/test_cluster_promote.py @@ -375,12 +375,14 @@ def test_cluster_promote_key_specified( ( "etcd", "⨯ failed to collect cluster config: " - + "failed to fetch data from etcd: etcdserver: user name is empty", + + "failed to fetch data from etcd: failed to execute ops: " + + "transaction failed: etcdserver: user name is empty", ), ( "tcs", - "⨯ failed to collect cluster config: failed to fetch data from tarantool:" - + " Execute access to function 'config.storage.get' is denied for user 'guest'", + "⨯ failed to collect cluster config: failed to fetch data from tarantool: " + + "failed to execute ops: failed to execute transaction: " + + "Execute access to function 'config.storage.txn' is denied for user 'guest'", ), ], ) diff --git a/test/integration/cluster/test_cluster_publish.py b/test/integration/cluster/test_cluster_publish.py index eebd6fb3a..490634211 100644 --- a/test/integration/cluster/test_cluster_publish.py +++ b/test/integration/cluster/test_cluster_publish.py @@ -710,12 +710,14 @@ def test_cluster_publish_config_key_not_exist(tt_cmd, tmpdir_with_cfg): [ pytest.param( "etcd", - r" ⨯ failed to fetch data from etcd: etcdserver: user name is empty", + r" ⨯ failed to fetch data from etcd: failed to execute ops: " + + "transaction failed: etcdserver: user name is empty", ), pytest.param( "tcs", - r" ⨯ failed to put data into tarantool: Execute access to" - " function 'config.storage.txn' is denied for user", + r" ⨯ failed to fetch data from tarantool: failed to execute ops: " + + "failed to execute transaction: Execute access to function " + + "'config.storage.txn' is denied for user", ), ], ) diff --git a/test/integration/cluster/test_cluster_roles_add.py b/test/integration/cluster/test_cluster_roles_add.py index 410005983..3530ba388 100644 --- a/test/integration/cluster/test_cluster_roles_add.py +++ b/test/integration/cluster/test_cluster_roles_add.py @@ -42,12 +42,14 @@ def test_cluster_rs_roles_add_missing_args(tt_cmd, tmpdir_with_cfg, role_args, e pytest.param( "etcd", r" ⨯ failed to collect cluster config: failed to fetch" - + " data from etcd: etcdserver: user name is empty", + + " data from etcd: failed to execute ops: transaction failed: " + + "etcdserver: user name is empty", ), pytest.param( "tcs", r"⨯ failed to collect cluster config: failed to fetch data" - + " from tarantool: Execute access to function 'config.storage.get'" + + " from tarantool: failed to execute ops: failed to execute " + + "transaction: Execute access to function 'config.storage.txn'" + " is denied for user 'guest'", ), ], diff --git a/test/integration/cluster/test_cluster_roles_remove.py b/test/integration/cluster/test_cluster_roles_remove.py index b72c81a90..12503ad08 100644 --- a/test/integration/cluster/test_cluster_roles_remove.py +++ b/test/integration/cluster/test_cluster_roles_remove.py @@ -45,12 +45,14 @@ def test_cluster_rs_roles_remove_missing_args(tt_cmd, tmpdir_with_cfg, role_args pytest.param( "etcd", r" ⨯ failed to collect cluster config: failed to fetch" - + " data from etcd: etcdserver: user name is empty", + + " data from etcd: failed to execute ops: transaction failed: " + + "etcdserver: user name is empty", ), pytest.param( "tcs", r"⨯ failed to collect cluster config: failed to fetch data" - + " from tarantool: Execute access to function 'config.storage.get'" + + " from tarantool: failed to execute ops: failed to execute " + + "transaction: Execute access to function 'config.storage.txn'" + " is denied for user 'guest'", ), ], diff --git a/test/integration/cluster/test_cluster_show.py b/test/integration/cluster/test_cluster_show.py index aa6fa218d..d241d72e5 100644 --- a/test/integration/cluster/test_cluster_show.py +++ b/test/integration/cluster/test_cluster_show.py @@ -272,7 +272,7 @@ def test_cluster_show_config_no_prefix( expected = ( r" ⨯ failed to collect a configuration: " - + f'a configuration data not found in {storage_name} for prefix "/prefix/config/"' + + f'a configuration data not found in {storage_name} for prefix "/prefix"' ) assert expected in show_output @@ -323,13 +323,15 @@ def test_cluster_show_config_no_key( pytest.param( "etcd", " ⨯ failed to collect a configuration: " - + "failed to fetch data from etcd: etcdserver: user name is empty", + + "failed to fetch data from etcd: failed to execute ops: " + + "transaction failed: etcdserver: user name is empty", ), pytest.param( "tcs", " ⨯ failed to collect a configuration: " - + "failed to fetch data from tarantool: Execute access to function " - + "'config.storage.get' is denied for user", + + "failed to fetch data from tarantool: failed to execute ops: " + + "failed to execute transaction: Execute access to function " + + "'config.storage.txn' is denied for user", ), ], ) diff --git a/test/integration/pack/test_pack.py b/test/integration/pack/test_pack.py index 7f7d126d3..9b628042b 100644 --- a/test/integration/pack/test_pack.py +++ b/test/integration/pack/test_pack.py @@ -1943,7 +1943,7 @@ def test_pack_app_local_tarantool(tt_cmd, tmpdir_with_tarantool, tmp_path): dirs_exist_ok=True, ) - build_cmd = [tt_cmd, "create", "basic", "--name", "app", "--non-interactive"] + build_cmd = [tt_cmd, "create", "single_instance", "--name", "app", "--non-interactive"] tt_process = subprocess.Popen( build_cmd, cwd=tmp_path, diff --git a/test/integration/ttbuild/test_build.py b/test/integration/ttbuild/test_build.py index efff2a0c6..88bacfe82 100644 --- a/test/integration/ttbuild/test_build.py +++ b/test/integration/ttbuild/test_build.py @@ -203,13 +203,15 @@ def test_build_app_local_tarantool(tt_cmd, tmpdir_with_tarantool): app_name = "app1" app_dir = os.path.join(tmpdir_with_tarantool, app_name) - cmd = [tt_cmd, "create", "basic", "--name", app_name, "--non-interactive"] - p = subprocess.run( - cmd, - cwd=tmpdir_with_tarantool, + shutil.copytree( + os.path.join(os.path.dirname(__file__), "apps", app_name), + app_dir, + ) + # Create a symlink in instances.enabled so tt can find the app. + instances_enabled = os.path.join(tmpdir_with_tarantool, "instances.enabled") + os.symlink( + os.path.relpath(app_dir, instances_enabled), os.path.join(instances_enabled, app_name) ) - assert p.returncode == 0 - assert os.path.exists(app_dir) cmd = [tt_cmd, "build", app_name] p = subprocess.run(