Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ require (
github.com/spf13/pflag v1.0.10 // indirect
github.com/tarantool/go-iproto v1.1.0 // indirect
github.com/tarantool/go-openssl v1.2.2 // indirect
github.com/tarantool/go-option v1.0.0 // indirect
github.com/tarantool/go-storage v1.2.0 // indirect
github.com/tarantool/go-tlsdialer v1.0.2 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/gojuno/minimock/v3 v3.4.7 h1:vhE5zpniyPDRT0DXd5s3DbtZJVlcbmC5k80izYtj9lY=
github.com/gojuno/minimock/v3 v3.4.7/go.mod h1:QxJk4mdPrVyYUmEZGc2yD2NONpqM/j4dWhsy9twjFHg=
github.com/golang-jwt/jwt/v5 v5.3.0 h1:pv4AsKCKKZuqlgs5sUmn4x8UlGa0kEVt/puTpKx9vvo=
github.com/golang-jwt/jwt/v5 v5.3.0/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
Expand Down Expand Up @@ -288,8 +290,12 @@ github.com/tarantool/go-iproto v1.1.0/go.mod h1:LNCtdyZxojUed8SbOiYHoc3v9NvaZTB7
github.com/tarantool/go-openssl v0.0.8-0.20230307065445-720eeb389195/go.mod h1:M7H4xYSbzqpW/ZRBMyH0eyqQBsnhAMfsYk5mv0yid7A=
github.com/tarantool/go-openssl v1.2.2 h1:GaGMNsa68ZqoNgrMF7Ke4s9+tXGfc5ulfCUm3/Jb7/k=
github.com/tarantool/go-openssl v1.2.2/go.mod h1:EwX1pKIGypLxkY49vKIIR4LTT+94DhKiunCqU2gEzLQ=
github.com/tarantool/go-option v1.0.0 h1:+Etw0i3TjsXvADTo5rfZNCfsXe3BfHOs+iVfIrl0Nlo=
github.com/tarantool/go-option v1.0.0/go.mod h1:lXzzeZtL+rPUtLOCDP6ny3FemFBjruG9aHKzNN2bS08=
github.com/tarantool/go-prompt v1.0.1 h1:88Yer6gCFylqGRrdWwikNFVbklRQsqKF7mycvGdDcj0=
github.com/tarantool/go-prompt v1.0.1/go.mod h1:9Vuvi60Bk+3yaXqgYaXNTpLbwPPaaEOeaUgpFW1jqTU=
github.com/tarantool/go-storage v1.2.0 h1:BxsMZxVw3spD4xXv9Svws2A4NM74v4nE3drUjeN+CGg=
github.com/tarantool/go-storage v1.2.0/go.mod h1:cDp+ffxTLRRn7lhy1q8m9IUIzFD1uUhY2OqWLUzrfCM=
github.com/tarantool/go-tarantool v1.12.3 h1:GXabowmrTSW225xFEjX4t+8PlccVDCeGB5OM1VLbBXE=
github.com/tarantool/go-tarantool v1.12.3/go.mod h1:QRiXv0jnxwgxHtr9ZmifSr/eRba76gTUBgp69pDMX1U=
github.com/tarantool/go-tarantool/v2 v2.4.2 h1:rkzYtFhLJLA9RDIhjzN93MJBN5PBxHW4+soq+RB90gE=
Expand Down
19 changes: 11 additions & 8 deletions lib/cluster/datacollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package cluster
import (
"time"

"github.com/tarantool/go-storage"
"github.com/tarantool/go-storage/driver/etcd"
"github.com/tarantool/go-storage/driver/tcs"
"github.com/tarantool/go-tarantool/v2"
clientv3 "go.etcd.io/etcd/client/v3"
)
Expand Down Expand Up @@ -53,21 +56,21 @@ func (factory collectorsFactory) NewFile(path string) (DataCollector, error) {
func (factory collectorsFactory) NewEtcd(etcdcli *clientv3.Client,
prefix, key string, timeout time.Duration,
) (DataCollector, error) {
if key == "" {
return NewEtcdAllCollector(etcdcli, prefix, timeout), nil
}
return NewEtcdKeyCollector(etcdcli, prefix, key, timeout), nil
driver := etcd.New(etcdcli)
storage := storage.NewStorage(driver)

return NewStorage(storage, prefix, timeout, key, etcdStorageType), nil
}

// NewTarantool creates creates a new tarantool config storage configuration
// collector.
func (factory collectorsFactory) NewTarantool(conn tarantool.Doer,
prefix, key string, timeout time.Duration,
) (DataCollector, error) {
if key == "" {
return NewTarantoolAllCollector(conn, prefix, timeout), nil
}
return NewTarantoolKeyCollector(conn, prefix, key, timeout), nil
driver := tcs.New(dummyDoerWatcher{conn})
storage := storage.NewStorage(driver)

return NewStorage(storage, prefix, timeout, key, tcsStorageType), nil
}

// integrityCollectorsFactory is a type that implements a default CollectorFactory.
Expand Down
47 changes: 0 additions & 47 deletions lib/cluster/datacollector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,57 +7,10 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"

"github.com/tarantool/go-tarantool/v2"

"github.com/tarantool/tt/lib/cluster"
)

func TestDataCollectorFactory(t *testing.T) {
etcdcli := &clientv3.Client{}
conn := &tarantool.Connection{}
factory := cluster.NewDataCollectorFactory()

noErr := func(collector cluster.DataCollector, err error) cluster.DataCollector {
require.NoError(t, err)
return collector
}

cases := []struct {
Name string
Collector cluster.DataCollector
Expected cluster.DataCollector
}{
{
Name: "etcd_all",
Collector: noErr(factory.NewEtcd(etcdcli, "foo", "", 1)),
Expected: cluster.NewEtcdAllCollector(etcdcli, "foo", 1),
},
{
Name: "etcd_key",
Collector: noErr(factory.NewEtcd(etcdcli, "foo", "bar", 2)),
Expected: cluster.NewEtcdKeyCollector(etcdcli, "foo", "bar", 2),
},
{
Name: "tarantool_all",
Collector: noErr(factory.NewTarantool(conn, "foo", "", 1)),
Expected: cluster.NewTarantoolAllCollector(conn, "foo", 1),
},
{
Name: "tarantool_key",
Collector: noErr(factory.NewTarantool(conn, "foo", "bar", 2)),
Expected: cluster.NewTarantoolKeyCollector(conn, "foo", "bar", 2),
},
}

for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
assert.Equal(t, tc.Expected, tc.Collector)
})
}
}

func TestDataCollectorFactory_NewFile_not_exist(t *testing.T) {
cases := []struct {
Name string
Expand Down
19 changes: 11 additions & 8 deletions lib/cluster/datapublisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (

clientv3 "go.etcd.io/etcd/client/v3"

"github.com/tarantool/go-storage"
"github.com/tarantool/go-storage/driver/etcd"
"github.com/tarantool/go-storage/driver/tcs"
"github.com/tarantool/go-tarantool/v2"
)

Expand Down Expand Up @@ -45,20 +48,20 @@ func (factory publishersFactory) NewFile(path string) (DataPublisher, error) {
func (factory publishersFactory) NewEtcd(etcdcli *clientv3.Client,
prefix, key string, timeout time.Duration,
) (DataPublisher, error) {
if key == "" {
return NewEtcdAllDataPublisher(etcdcli, prefix, timeout), nil
}
return NewEtcdKeyDataPublisher(etcdcli, prefix, key, timeout), nil
driver := etcd.New(etcdcli)
storage := storage.NewStorage(driver)

return NewStorage(storage, prefix, timeout, key, etcdStorageType), nil
}

// NewTarantool creates creates a new tarantool config storage data publisher.
func (factory publishersFactory) NewTarantool(conn tarantool.Doer,
prefix, key string, timeout time.Duration,
) (DataPublisher, error) {
if key == "" {
return NewTarantoolAllDataPublisher(conn, prefix, timeout), nil
}
return NewTarantoolKeyDataPublisher(conn, prefix, key, timeout), nil
driver := tcs.New(dummyDoerWatcher{conn})
storage := storage.NewStorage(driver)

return NewStorage(storage, prefix, timeout, key, tcsStorageType), nil
}

// integrityPublishersFactory is a type that implements a default
Expand Down
52 changes: 0 additions & 52 deletions lib/cluster/datapublisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,62 +4,10 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"

"github.com/tarantool/go-tarantool/v2"
"github.com/tarantool/tt/lib/cluster"
)

func TestDataPublisherFactory(t *testing.T) {
etcdcli := &clientv3.Client{}
conn := &tarantool.Connection{}
factory := cluster.NewDataPublisherFactory()

noErr := func(publisher cluster.DataPublisher, err error) cluster.DataPublisher {
require.NoError(t, err)
return publisher
}

cases := []struct {
Name string
Publisher cluster.DataPublisher
Expected cluster.DataPublisher
}{
{
Name: "file",
Publisher: noErr(factory.NewFile("foo")),
Expected: cluster.NewFileDataPublisher("foo"),
},
{
Name: "etcd_all",
Publisher: noErr(factory.NewEtcd(etcdcli, "foo", "", 1)),
Expected: cluster.NewEtcdAllDataPublisher(etcdcli, "foo", 1),
},
{
Name: "etcd_key",
Publisher: noErr(factory.NewEtcd(etcdcli, "foo", "bar", 2)),
Expected: cluster.NewEtcdKeyDataPublisher(etcdcli, "foo", "bar", 2),
},
{
Name: "tarantool_all",
Publisher: noErr(factory.NewTarantool(conn, "foo", "", 1)),
Expected: cluster.NewTarantoolAllDataPublisher(conn, "foo", 1),
},
{
Name: "tarantool_key",
Publisher: noErr(factory.NewTarantool(conn, "foo", "bar", 2)),
Expected: cluster.NewTarantoolKeyDataPublisher(conn, "foo", "bar", 2),
},
}

for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
assert.Equal(t, tc.Expected, tc.Publisher)
})
}
}

func TestIntegrityDataPublisherFactory_NewFile(t *testing.T) {
factory := cluster.NewIntegrityDataPublisherFactory(nil)
publisher, err := factory.NewFile("any")
Expand Down
6 changes: 6 additions & 0 deletions lib/cluster/errors.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cluster

import (
"errors"
"fmt"
)

Expand All @@ -26,3 +27,8 @@ func (e CollectEmptyError) Error() string {
return fmt.Sprintf("a configuration data not found in %s for prefix %q",
e.storage, e.prefix)
}

var (
errDataMissing = errors.New("data does not exist")
errWrongRevision = errors.New("wrong revision")
)
76 changes: 10 additions & 66 deletions lib/cluster/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ import (
"strings"
"time"

"github.com/tarantool/go-storage"
"github.com/tarantool/go-storage/driver/etcd"
"github.com/tarantool/go-tarantool/v2"
libconnect "github.com/tarantool/tt/lib/connect"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/client/pkg/v3/transport"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
Expand Down Expand Up @@ -886,77 +887,20 @@ func MakeEtcdOptsFromUriOpts(src libconnect.UriOpts) EtcdOpts {
}
}

type etcdCSConnection struct {
cli *clientv3.Client
}

func connectEtcdCS(uriOpts libconnect.UriOpts, connOpts ConnectOpts) (CSConnection, error) {
cli, err := ConnectEtcdUriOpts(uriOpts, connOpts)
if err != nil {
return nil, err
}
return &etcdCSConnection{
cli: cli,
}, nil
}

// Close implements ConnectCStorage interface.
func (c *etcdCSConnection) Close() error {
return c.cli.Close()
}

// Get implements ConnectCStorage interface.
func (c *etcdCSConnection) Get(ctx context.Context, key string) ([]Data, error) {
resp, err := c.cli.Get(ctx, key)
if err != nil {
return nil, fmt.Errorf("failed to fetch data from etcd: %w", err)
}

switch {
case len(resp.Kvs) == 0:
// It should not happen, but we need to be sure to avoid a null pointer
// dereference.
return nil, fmt.Errorf("a configuration data not found in etcd for key %q",
key)
case len(resp.Kvs) > 1:
return nil, fmt.Errorf("too many responses (%v) from etcd for key %q",
resp.Kvs, key)
}
driver := etcd.New(cli)
storage := storage.NewStorage(driver)

collected := []Data{
{
Source: string(resp.Kvs[0].Key),
Value: resp.Kvs[0].Value,
Revision: resp.Kvs[0].ModRevision,
return &RawStorage{
storage: storage,
storageType: etcdStorageType,
close: func() error {
return cli.Close()
},
}
return collected, nil
}

// Put implements ConnectCStorage interface.
func (c *etcdCSConnection) Put(ctx context.Context, key, value string) error {
_, err := c.cli.Put(ctx, key, value)
return err
}

// Watch implements ConnectCStorage interface.
func (c *etcdCSConnection) Watch(ctx context.Context, key string) <-chan CSWatchEvent {
ch := make(chan CSWatchEvent)
innerCh := c.cli.Watch(ctx, key)
go func() {
defer close(ch)

for resp := range innerCh {
for _, ev := range resp.Events {
switch ev.Type {
case mvccpb.PUT:
ch <- CSWatchEvent{
Key: string(ev.Kv.Value),
Value: ev.Kv.Value,
}
}
}
}
}()
return ch
}, nil
}
4 changes: 3 additions & 1 deletion lib/cluster/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.25.7
require (
github.com/mitchellh/mapstructure v1.5.0
github.com/stretchr/testify v1.11.1
github.com/tarantool/go-storage v1.2.0
github.com/tarantool/go-tarantool/v2 v2.4.2
github.com/tarantool/tt/lib/connect v0.0.0-0
github.com/tarantool/tt/lib/dial v0.0.0-0
Expand Down Expand Up @@ -57,7 +58,8 @@ require (
github.com/spf13/cobra v1.9.1 // indirect
github.com/spf13/pflag v1.0.6 // indirect
github.com/tarantool/go-iproto v1.1.0 // indirect
github.com/tarantool/go-openssl v1.2.1 // indirect
github.com/tarantool/go-openssl v1.2.2 // indirect
github.com/tarantool/go-option v1.0.0 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
go.etcd.io/bbolt v1.4.3 // indirect
Expand Down
10 changes: 8 additions & 2 deletions lib/cluster/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/gojuno/minimock/v3 v3.4.7 h1:vhE5zpniyPDRT0DXd5s3DbtZJVlcbmC5k80izYtj9lY=
github.com/gojuno/minimock/v3 v3.4.7/go.mod h1:QxJk4mdPrVyYUmEZGc2yD2NONpqM/j4dWhsy9twjFHg=
github.com/golang-jwt/jwt/v5 v5.2.2 h1:Rl4B7itRWVtYIHFrSNd7vhTiz9UpLdi6gZhZ3wEeDy8=
github.com/golang-jwt/jwt/v5 v5.2.2/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
Expand Down Expand Up @@ -149,8 +151,12 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/tarantool/go-iproto v1.1.0 h1:HULVOIHsiehI+FnHfM7wMDntuzUddO09DKqu2WnFQ5A=
github.com/tarantool/go-iproto v1.1.0/go.mod h1:LNCtdyZxojUed8SbOiYHoc3v9NvaZTB7p96hUySMlIo=
github.com/tarantool/go-openssl v1.2.1 h1:WUVTeEPuBAXbrBjvJZ3ynzk/Sv4DL47V/ehWea9czjA=
github.com/tarantool/go-openssl v1.2.1/go.mod h1:EwX1pKIGypLxkY49vKIIR4LTT+94DhKiunCqU2gEzLQ=
github.com/tarantool/go-openssl v1.2.2 h1:GaGMNsa68ZqoNgrMF7Ke4s9+tXGfc5ulfCUm3/Jb7/k=
github.com/tarantool/go-openssl v1.2.2/go.mod h1:EwX1pKIGypLxkY49vKIIR4LTT+94DhKiunCqU2gEzLQ=
github.com/tarantool/go-option v1.0.0 h1:+Etw0i3TjsXvADTo5rfZNCfsXe3BfHOs+iVfIrl0Nlo=
github.com/tarantool/go-option v1.0.0/go.mod h1:lXzzeZtL+rPUtLOCDP6ny3FemFBjruG9aHKzNN2bS08=
github.com/tarantool/go-storage v1.2.0 h1:BxsMZxVw3spD4xXv9Svws2A4NM74v4nE3drUjeN+CGg=
github.com/tarantool/go-storage v1.2.0/go.mod h1:cDp+ffxTLRRn7lhy1q8m9IUIzFD1uUhY2OqWLUzrfCM=
github.com/tarantool/go-tarantool/v2 v2.4.2 h1:rkzYtFhLJLA9RDIhjzN93MJBN5PBxHW4+soq+RB90gE=
github.com/tarantool/go-tarantool/v2 v2.4.2/go.mod h1:MTbhdjFc3Jl63Lgi/UJr5D+QbT+QegqOzsNJGmaw7VM=
github.com/tarantool/go-tlsdialer v1.0.2 h1:TiOkihvC2ufLbOqJcJLuQ9I7W5bsZtmnT7KHF/t8n4s=
Expand Down
Loading
Loading