Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions sqldb/v2/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ import (
)

const (
// DefaultSqliteMaxConns is the default number of maximum open
// connections for SQLite. SQLite only supports a single writer, so a
// low default reduces contention on the busy_timeout and limits
// resource usage.
DefaultSqliteMaxConns = 2

// defaultMaxConns is the number of permitted active and idle
// connections. We want to limit this so it isn't unlimited. We use the
// same value for the number of idle connections as, this can speed up
Expand Down Expand Up @@ -62,6 +68,25 @@ func (s *SqliteConfig) busyTimeoutMs() int64 {
return DefaultSqliteBusyTimeout.Milliseconds()
}

// MaxConns returns the effective maximum number of SQLite connections.
func (s *SqliteConfig) MaxConns() int {
if s.MaxConnections > 0 {
return s.MaxConnections
}

return DefaultSqliteMaxConns
}

// MaxIdleConns returns the effective maximum number of idle SQLite
// connections.
func (s *SqliteConfig) MaxIdleConns() int {
if s.MaxIdleConnections > 0 {
return s.MaxIdleConnections
}

return s.MaxConns()
}

// Validate checks that the SqliteConfig values are valid.
func (p *SqliteConfig) Validate() error {
if err := p.QueryConfig.Validate(true); err != nil {
Expand Down
88 changes: 88 additions & 0 deletions sqldb/v2/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package sqldb

import (
"testing"

"github.com/stretchr/testify/require"
)

// TestSqliteConfigMaxConns verifies that SQLite keeps the low default
// connection limit unless the caller overrides it explicitly.
func TestSqliteConfigMaxConns(t *testing.T) {
t.Parallel()

testCases := []struct {
name string
maxConns int
expectedConn int
}{
{
name: "default limit",
expectedConn: DefaultSqliteMaxConns,
},
{
name: "explicit limit",
maxConns: 7,
expectedConn: 7,
},
}

for _, testCase := range testCases {
testCase := testCase

t.Run(testCase.name, func(t *testing.T) {
t.Parallel()

cfg := &SqliteConfig{
MaxConnections: testCase.maxConns,
}

require.Equal(t, testCase.expectedConn, cfg.MaxConns())
})
}
}

// TestSqliteConfigMaxIdleConns verifies that SQLite defaults its idle
// connections to the open connection limit unless the caller overrides it.
func TestSqliteConfigMaxIdleConns(t *testing.T) {
t.Parallel()

testCases := []struct {
name string
maxConns int
maxIdleConns int
expectedIdleConn int
}{
{
name: "default idle limit",
expectedIdleConn: DefaultSqliteMaxConns,
},
{
name: "inherits explicit open limit",
maxConns: 4,
expectedIdleConn: 4,
},
{
name: "explicit idle limit",
maxConns: 4,
maxIdleConns: 3,
expectedIdleConn: 3,
},
}

for _, testCase := range testCases {
testCase := testCase

t.Run(testCase.name, func(t *testing.T) {
t.Parallel()

cfg := &SqliteConfig{
MaxConnections: testCase.maxConns,
MaxIdleConnections: testCase.maxIdleConns,
}

require.Equal(t, testCase.expectedIdleConn,
cfg.MaxIdleConns())
})
}
}
132 changes: 71 additions & 61 deletions sqldb/v2/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,12 @@ func ReadTxOpt() TxOptions {
}
}

// BaseQuerier is a generic interface that represents the base methods that any
// database backend implementation which uses a Querier for its operations must
// implement.
type BaseQuerier interface {
// Backend returns the type of the database backend used.
Backend() BackendType
}

// BatchedTx is a generic interface that represents the ability to execute
// several operations to a given storage interface in a single atomic
// transaction. Typically, Q here will be some subset of the main sqlc.Querier
// interface allowing it to only depend on the routines it needs to implement
// any additional business logic.
type BatchedTx[Q BaseQuerier] interface {
type BatchedTx[Q any] interface {
// ExecTx will execute the passed txBody, operating upon generic
// parameter Q (usually a storage interface) in a single transaction.
//
Expand Down Expand Up @@ -137,6 +129,9 @@ type BatchedQuerier interface {
// BeginTx creates a new database transaction given the set of
// transaction options.
BeginTx(ctx context.Context, options TxOptions) (*sql.Tx, error)

// Backend returns the type of the database backend used.
Backend() BackendType
}

// txExecutorOptions is a struct that holds the options for the transaction
Expand All @@ -158,12 +153,6 @@ func defaultTxExecutorOptions() *txExecutorOptions {
}
}

// randRetryDelay returns a random retry delay between 0 and the configured max
// delay.
func (t *txExecutorOptions) randRetryDelay() time.Duration {
return time.Duration(rand.Int63n(int64(t.maxRetryDelay))) //nolint:gosec
}

// TxExecutorOption is a functional option that allows us to pass in optional
// argument when creating the executor.
type TxExecutorOption func(*txExecutorOptions)
Expand All @@ -188,18 +177,22 @@ func WithTxRetryDelay(delay time.Duration) TxExecutorOption {
// query a type needs to run under a database transaction, and also the set of
// options for that transaction. The QueryCreator is used to create a query
// given a database transaction created by the BatchedQuerier.
type TransactionExecutor[Query BaseQuerier] struct {
type TransactionExecutor[Query any] struct {
BatchedQuerier

createQuery QueryCreator[Query]

opts *txExecutorOptions
}

// A compile-time assertion to ensure TransactionExecutor satisfies the
// batched transaction interface.
var _ BatchedTx[any] = (*TransactionExecutor[any])(nil)

// NewTransactionExecutor creates a new instance of a TransactionExecutor given
// a Querier query object and a concrete type for the type of transactions the
// Querier understands.
func NewTransactionExecutor[Querier BaseQuerier](db BatchedQuerier,
func NewTransactionExecutor[Querier any](db BatchedQuerier,
createQuery QueryCreator[Querier],
opts ...TxExecutorOption) *TransactionExecutor[Querier] {

Expand All @@ -215,6 +208,11 @@ func NewTransactionExecutor[Querier BaseQuerier](db BatchedQuerier,
}
}

// Backend returns the type of database backend used by the executor.
func (t *TransactionExecutor[Q]) Backend() BackendType {
return t.BatchedQuerier.Backend()
}

// randRetryDelay returns a random retry delay between -50% and +50% of the
// configured delay that is doubled for each attempt and capped at a max value.
func randRetryDelay(initialRetryDelay, maxRetryDelay time.Duration,
Expand Down Expand Up @@ -268,6 +266,55 @@ type RollbackTx func(tx Tx) error
// the delay before the next retry.
type OnBackoff func(retry int, delay time.Duration)

// executeTxAttempt runs a single transaction attempt and reports whether the
// caller should retry it.
func executeTxAttempt(tx Tx, txBody TxBody, rollbackTx RollbackTx,
waitBeforeRetry func(int) bool, attempt int) (bool, error) {

// Rollback is safe to call even if the tx is already closed, so if the tx
// commits successfully, this is a no-op.
defer func() {
_ = tx.Rollback()
}()

if bodyErr := txBody(tx); bodyErr != nil {
log.Tracef("Error in txBody: %v", bodyErr)

// Roll back the transaction, then attempt a random backoff and try
// again if the error was a serialization error.
if err := rollbackTx(tx); err != nil {
return false, MapSQLError(err)
}

dbErr := MapSQLError(bodyErr)
if IsSerializationOrDeadlockError(dbErr) {
return waitBeforeRetry(attempt), dbErr
}

return false, dbErr
}

// Commit transaction.
if commitErr := tx.Commit(); commitErr != nil {
log.Tracef("Failed to commit tx: %v", commitErr)

// Roll back the transaction, then attempt a random backoff and try
// again if the error was a serialization error.
if err := rollbackTx(tx); err != nil {
return false, MapSQLError(err)
}

dbErr := MapSQLError(commitErr)
if IsSerializationOrDeadlockError(dbErr) {
return waitBeforeRetry(attempt), dbErr
}

return false, dbErr
}

return false, nil
}

// ExecuteSQLTransactionWithRetry is a helper function that executes a
// transaction with retry logic. It will retry the transaction if it fails with
// a serialization error. The function will return an error if the transaction
Expand Down Expand Up @@ -316,51 +363,14 @@ func ExecuteSQLTransactionWithRetry(ctx context.Context, makeTx MakeTx,
return dbErr
}

// Rollback is safe to call even if the tx is already closed,
// so if the tx commits successfully, this is a no-op.
defer func() {
_ = tx.Rollback()
}()

if bodyErr := txBody(tx); bodyErr != nil {
log.Tracef("Error in txBody: %v", bodyErr)

// Roll back the transaction, then attempt a random
// backoff and try again if the error was a
// serialization error.
if err := rollbackTx(tx); err != nil {
return MapSQLError(err)
}

dbErr := MapSQLError(bodyErr)
if IsSerializationOrDeadlockError(dbErr) {
if waitBeforeRetry(i) {
continue
}
}

return dbErr
retry, err := executeTxAttempt(
tx, txBody, rollbackTx, waitBeforeRetry, i,
)
if retry {
continue
}

// Commit transaction.
if commitErr := tx.Commit(); commitErr != nil {
log.Tracef("Failed to commit tx: %v", commitErr)

// Roll back the transaction, then attempt a random
// backoff and try again if the error was a
// serialization error.
if err := rollbackTx(tx); err != nil {
return MapSQLError(err)
}

dbErr := MapSQLError(commitErr)
if IsSerializationOrDeadlockError(dbErr) {
if waitBeforeRetry(i) {
continue
}
}

return dbErr
if err != nil {
return err
}

return nil
Expand Down
48 changes: 48 additions & 0 deletions sqldb/v2/interfaces_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package sqldb

import (
"context"
"database/sql"
"testing"

"github.com/stretchr/testify/require"
)

// testQuerier is a minimal query wrapper used to instantiate the generic
// transaction executor in tests.
type testQuerier struct {
}

// testBatchedQuerier is a minimal BatchedQuerier implementation used to verify
// that TransactionExecutor forwards backend identity.
type testBatchedQuerier struct {
backend BackendType
}

// BeginTx is a stub implementation used to satisfy the BatchedQuerier
// interface in tests.
func (t testBatchedQuerier) BeginTx(context.Context,
TxOptions) (*sql.Tx, error) {

return nil, nil
}

// Backend returns the backend type used by the test batched querier.
func (t testBatchedQuerier) Backend() BackendType {
return t.backend
}

// TestTransactionExecutorBackend verifies that the executor forwards the
// backend type from its batched querier.
func TestTransactionExecutorBackend(t *testing.T) {
t.Parallel()

executor := NewTransactionExecutor[testQuerier](
testBatchedQuerier{backend: BackendTypePostgres},
func(*sql.Tx) testQuerier {
return testQuerier{}
},
)

require.Equal(t, BackendTypePostgres, executor.Backend())
}
Loading
Loading