diff --git a/sweep/mock_test.go b/sweep/mock_test.go index e6e254e8e11..6d6f1b1c27c 100644 --- a/sweep/mock_test.go +++ b/sweep/mock_test.go @@ -233,10 +233,13 @@ func (m *MockInputSet) FeeRate() chainfee.SatPerKWeight { // AddWalletInputs adds wallet inputs to the set until a non-dust // change output can be made. Return an error if there are not enough // wallet inputs. -func (m *MockInputSet) AddWalletInputs(wallet Wallet) error { - args := m.Called(wallet) +func (m *MockInputSet) AddWalletInputs(wallet Wallet, + excludeUtxos fn.Set[wire.OutPoint], +) ([]wire.OutPoint, error) { - return args.Error(0) + args := m.Called(wallet, excludeUtxos) + + return args.Get(0).([]wire.OutPoint), args.Error(1) } // NeedWalletInput returns true if the input set needs more wallet diff --git a/sweep/sweeper.go b/sweep/sweeper.go index e2163f6373c..c3f5ab49765 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -1587,40 +1587,55 @@ func (s *UtxoSweeper) sweepPendingInputs(inputs InputsMap) { // Cluster all of our inputs based on the specific Aggregator. sets := s.cfg.Aggregator.ClusterInputs(inputs) - // sweepWithLock is a helper closure that executes the sweep within a - // coin select lock to prevent the coins being selected for other - // transactions like funding of a channel. - sweepWithLock := func(set InputSet) error { - return s.cfg.Wallet.WithCoinSelectLock(func() error { - // Try to add inputs from our wallet. - err := set.AddWalletInputs(s.cfg.Wallet) + // Track wallet UTXOs claimed by previous sets to prevent + // duplicate selection across InputSets. + usedUtxos := fn.NewSet[wire.OutPoint]() + + // sweepWithLock is a helper closure that executes the sweep + // within a coin select lock to prevent the coins being + // selected for other transactions like funding of a channel. + // On success it returns the wallet outpoints that were added + // to the set. + sweepWithLock := func(set InputSet) ([]wire.OutPoint, error) { + var walletOPs []wire.OutPoint + + err := s.cfg.Wallet.WithCoinSelectLock(func() error { + ops, err := set.AddWalletInputs( + s.cfg.Wallet, usedUtxos, + ) if err != nil { return err } - // Create sweeping transaction for each set. - err = s.sweep(set) - if err != nil { - return err - } + walletOPs = ops - return nil + return s.sweep(set) }) + + return walletOPs, err } for _, set := range sets { - var err error + var ( + walletOPs []wire.OutPoint + err error + ) + if set.NeedWalletInput() { - // Sweep the set of inputs that need the wallet inputs. - err = sweepWithLock(set) + walletOPs, err = sweepWithLock(set) } else { - // Sweep the set of inputs that don't need the wallet - // inputs. err = s.sweep(set) } if err != nil { log.Errorf("Failed to sweep %v: %v", set, err) + continue + } + + // Record wallet UTXOs claimed by this set so + // subsequent sets won't reuse them. + for _, op := range walletOPs { + usedUtxos.Add(op) } } } diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index d97fd992504..42a9fe55206 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -644,7 +644,9 @@ func TestSweepPendingInputs(t *testing.T) { // Mock this set to ask for wallet input. setNeedWallet.On("NeedWalletInput").Return(true).Once() - setNeedWallet.On("AddWalletInputs", wallet).Return(nil).Once() + setNeedWallet.On( + "AddWalletInputs", wallet, mock.Anything, + ).Return([]wire.OutPoint(nil), nil).Once() // Mock the wallet to require the lock once. wallet.On("WithCoinSelectLock", mock.Anything).Return(nil).Once() @@ -686,6 +688,96 @@ func TestSweepPendingInputs(t *testing.T) { s.sweepPendingInputs(pis) } +// TestSweepPendingInputsExcludesUsedUtxos verifies that when two +// InputSets both need wallet inputs in the same sweep cycle, the +// second set receives an exclusion set containing the wallet UTXOs +// claimed by the first. +func TestSweepPendingInputsExcludesUsedUtxos(t *testing.T) { + t.Parallel() + + wallet := &MockWallet{} + defer wallet.AssertExpectations(t) + + aggregator := &mockUtxoAggregator{} + defer aggregator.AssertExpectations(t) + + publisher := &MockBumper{} + defer publisher.AssertExpectations(t) + + s := New(&UtxoSweeperConfig{ + Wallet: wallet, + Aggregator: aggregator, + Publisher: publisher, + GenSweepScript: func() fn.Result[lnwallet.AddrWithKey] { + //nolint:ll + return fn.Ok(lnwallet.AddrWithKey{ + DeliveryAddress: testPubKey.SerializeCompressed(), + }) + }, + NoDeadlineConfTarget: uint32(DefaultDeadlineDelta), + }) + s.currentHeight = testHeight + + // A wallet UTXO that the first set will claim. + walletOP := wire.OutPoint{ + Hash: chainhash.Hash{0xaa}, + Index: 0, + } + + // --- First set: needs wallet input, claims walletOP --- + set1 := &MockInputSet{} + defer set1.AssertExpectations(t) + + set1.On("NeedWalletInput").Return(true).Once() + set1.On("AddWalletInputs", wallet, + mock.MatchedBy(func(s fn.Set[wire.OutPoint]) bool { + return s.IsEmpty() + }), + ).Return([]wire.OutPoint{walletOP}, nil).Once() + + set1.On("Inputs").Return(nil).Maybe() + set1.On("DeadlineHeight").Return(testHeight).Once() + set1.On("Budget").Return(btcutil.Amount(1)).Once() + set1.On("StartingFeeRate").Return( + fn.None[chainfee.SatPerKWeight](), + ).Once() + set1.On("Immediate").Return(false).Once() + + // --- Second set: needs wallet input, must see walletOP + // excluded --- + set2 := &MockInputSet{} + defer set2.AssertExpectations(t) + + set2.On("NeedWalletInput").Return(true).Once() + set2.On("AddWalletInputs", wallet, + mock.MatchedBy(func(s fn.Set[wire.OutPoint]) bool { + return s.Contains(walletOP) + }), + ).Return([]wire.OutPoint(nil), nil).Once() + + set2.On("Inputs").Return(nil).Maybe() + set2.On("DeadlineHeight").Return(testHeight).Once() + set2.On("Budget").Return(btcutil.Amount(1)).Once() + set2.On("StartingFeeRate").Return( + fn.None[chainfee.SatPerKWeight](), + ).Once() + set2.On("Immediate").Return(false).Once() + + // Both sets need the coin select lock. + wallet.On( + "WithCoinSelectLock", mock.Anything, + ).Return(nil).Twice() + + pis := make(InputsMap) + aggregator.On("ClusterInputs", pis).Return( + []InputSet{set1, set2}, + ) + + publisher.On("Broadcast", mock.Anything).Return(nil).Twice() + + s.sweepPendingInputs(pis) +} + // TestHandleBumpEventTxFailed checks that the sweeper correctly handles the // case where the bump event tx fails to be published. func TestHandleBumpEventTxFailed(t *testing.T) { diff --git a/sweep/tx_input_set.go b/sweep/tx_input_set.go index 7b533c232f2..4b31cf29a63 100644 --- a/sweep/tx_input_set.go +++ b/sweep/tx_input_set.go @@ -34,10 +34,16 @@ type InputSet interface { // Inputs returns the set of inputs that should be used to create a tx. Inputs() []input.Input - // AddWalletInputs adds wallet inputs to the set until a non-dust - // change output can be made. Return an error if there are not enough - // wallet inputs. - AddWalletInputs(wallet Wallet) error + // AddWalletInputs adds wallet inputs to the set until a + // non-dust change output can be made. Return an error if + // there are not enough wallet inputs. The excludeUtxos set + // contains outpoints already claimed by other InputSets in + // the current sweep cycle and should be skipped. On success + // it returns the outpoints of the wallet UTXOs that were + // added. + AddWalletInputs(wallet Wallet, + excludeUtxos fn.Set[wire.OutPoint], + ) ([]wire.OutPoint, error) // NeedWalletInput returns true if the input set needs more wallet // inputs. @@ -347,55 +353,77 @@ func (b *BudgetInputSet) hasNormalInput() bool { // set to its initial state by removing any wallet inputs added. // // NOTE: must be called with the wallet lock held via `WithCoinSelectLock`. -func (b *BudgetInputSet) AddWalletInputs(wallet Wallet) error { - // Retrieve wallet utxos. Only consider confirmed utxos to prevent - // problems around RBF rules for unconfirmed inputs. This currently - // ignores the configured coin selection strategy. +func (b *BudgetInputSet) AddWalletInputs(wallet Wallet, + excludeUtxos fn.Set[wire.OutPoint], +) ([]wire.OutPoint, error) { + + // Retrieve wallet utxos. Only consider confirmed utxos to + // prevent problems around RBF rules for unconfirmed inputs. + // This currently ignores the configured coin selection + // strategy. utxos, err := wallet.ListUnspentWitnessFromDefaultAccount( 1, math.MaxInt32, ) if err != nil { - return fmt.Errorf("list unspent witness: %w", err) + return nil, fmt.Errorf("list unspent witness: %w", + err) } - // Sort the UTXOs by putting smaller values at the start of the slice - // to avoid locking large UTXO for sweeping. + // Sort the UTXOs by putting smaller values at the start of + // the slice to avoid locking large UTXO for sweeping. // - // TODO(yy): add more choices to CoinSelectionStrategy and use the - // configured value here. + // TODO(yy): add more choices to CoinSelectionStrategy and + // use the configured value here. sort.Slice(utxos, func(i, j int) bool { return utxos[i].Value < utxos[j].Value }) - // Add wallet inputs to the set until the specified budget is covered. + // Add wallet inputs to the set until the specified budget + // is covered. + var addedOPs []wire.OutPoint + for _, utxo := range utxos { + // Skip UTXOs already claimed by another InputSet + // in this sweep cycle. + if excludeUtxos.Contains(utxo.OutPoint) { + log.Debugf("Skipping wallet UTXO %v: "+ + "already used by another "+ + "sweep set", utxo.OutPoint) + + continue + } + err := b.addWalletInput(utxo) if err != nil { - return err + return nil, err } - // Return if we've reached the minimum output amount. + addedOPs = append(addedOPs, utxo.OutPoint) + + // Return if we've reached the minimum output + // amount. if !b.NeedWalletInput() { - return nil + return addedOPs, nil } } // Exit if there are no inputs can contribute to the fees. if !b.hasNormalInput() { - return ErrNotEnoughInputs + return nil, ErrNotEnoughInputs } - // If there's at least one input that can contribute to fees, we allow - // the sweep to continue, even though the full budget can't be met. - // Maybe later more wallet inputs will become available and we can add - // them if needed. + // If there's at least one input that can contribute to fees, + // we allow the sweep to continue, even though the full + // budget can't be met. Maybe later more wallet inputs will + // become available and we can add them if needed. budget := b.Budget() total, spendable := b.inputAmts() - log.Warnf("Not enough wallet UTXOs: need budget=%v, has spendable=%v, "+ - "total=%v, missing at least %v, sweeping anyway...", budget, - spendable, total, budget-spendable) + log.Warnf("Not enough wallet UTXOs: need budget=%v, has "+ + "spendable=%v, total=%v, missing at least %v, "+ + "sweeping anyway...", budget, spendable, total, + budget-spendable) - return nil + return addedOPs, nil } // Budget returns the total budget of the set. diff --git a/sweep/tx_input_set_test.go b/sweep/tx_input_set_test.go index 15982487853..e65c2c57782 100644 --- a/sweep/tx_input_set_test.go +++ b/sweep/tx_input_set_test.go @@ -411,7 +411,7 @@ func TestAddWalletInputsReturnErr(t *testing.T) { // Check that the error is returned from // ListUnspentWitnessFromDefaultAccount. - err := set.AddWalletInputs(wallet) + _, err := set.AddWalletInputs(wallet, fn.NewSet[wire.OutPoint]()) require.ErrorIs(t, err, dummyErr) // Create an utxo with unknown address type to trigger an error. @@ -424,7 +424,7 @@ func TestAddWalletInputsReturnErr(t *testing.T) { min, max).Return([]*lnwallet.Utxo{utxo}, nil).Once() // Check that the error is returned from createWalletTxInput. - err = set.AddWalletInputs(wallet) + _, err = set.AddWalletInputs(wallet, fn.NewSet[wire.OutPoint]()) require.Error(t, err) // Mock the wallet to return empty utxos. @@ -432,7 +432,7 @@ func TestAddWalletInputsReturnErr(t *testing.T) { min, max).Return([]*lnwallet.Utxo{}, nil).Once() // Check that the error is returned from not having wallet inputs. - err = set.AddWalletInputs(wallet) + _, err = set.AddWalletInputs(wallet, fn.NewSet[wire.OutPoint]()) require.ErrorIs(t, err, ErrNotEnoughInputs) } @@ -484,7 +484,7 @@ func TestAddWalletInputsNotEnoughInputs(t *testing.T) { // Add wallet inputs to the input set, which should return no error // although the wallet cannot cover the budget. - err := set.AddWalletInputs(wallet) + _, err := set.AddWalletInputs(wallet, fn.NewSet[wire.OutPoint]()) require.NoError(t, err) // Check that the budget set is updated. @@ -552,7 +552,7 @@ func TestAddWalletInputsEmptyWalletSuccess(t *testing.T) { // Add wallet inputs to the input set, which should return no error // although the wallet is empty. - err := set.AddWalletInputs(wallet) + _, err := set.AddWalletInputs(wallet, fn.NewSet[wire.OutPoint]()) require.NoError(t, err) } @@ -611,7 +611,7 @@ func TestAddWalletInputsSuccess(t *testing.T) { // Add wallet inputs to the input set, which should give us an error as // the wallet cannot cover the budget. - err = set.AddWalletInputs(wallet) + _, err = set.AddWalletInputs(wallet, fn.NewSet[wire.OutPoint]()) require.NoError(t, err) // Check that the budget set is updated. @@ -633,3 +633,94 @@ func TestAddWalletInputsSuccess(t *testing.T) { // Weak check, a strong check is to open the slice and check each item. require.Len(t, set.inputs, 3) } + +// TestAddWalletInputsExcludesUsedUtxos checks that wallet UTXOs +// present in the exclusion set are skipped by AddWalletInputs. +func TestAddWalletInputsExcludesUsedUtxos(t *testing.T) { + t.Parallel() + + wallet := &MockWallet{} + defer wallet.AssertExpectations(t) + + min, max := int32(1), int32(math.MaxInt32) + + const budget = 10_000 + + // Create a mock input that has required outputs (needs + // wallet funding). + mockInput := &input.MockInput{} + mockInput.On("RequiredTxOut").Return(&wire.TxOut{}) + defer mockInput.AssertExpectations(t) + + deadline := int32(1000) + pi := &SweeperInput{ + Input: mockInput, + params: Params{ + Budget: budget, + DeadlineHeight: fn.Some(deadline), + }, + } + + mockInput.On("OutPoint").Return( + wire.OutPoint{Hash: chainhash.Hash{1}}, + ) + mockInput.On("WitnessType").Return( + input.CommitmentAnchor, + ) + + // Create two wallet UTXOs with distinct outpoints. + excludedOP := wire.OutPoint{ + Hash: chainhash.Hash{0xaa}, + Index: 0, + } + availableOP := wire.OutPoint{ + Hash: chainhash.Hash{0xbb}, + Index: 0, + } + + // Give the excluded UTXO a smaller value so it sorts + // first, guaranteeing the skip path is exercised before + // the available UTXO is considered. + excludedUtxo := &lnwallet.Utxo{ + AddressType: lnwallet.WitnessPubKey, + Value: budget / 2, + OutPoint: excludedOP, + } + availableUtxo := &lnwallet.Utxo{ + AddressType: lnwallet.WitnessPubKey, + Value: budget, + OutPoint: availableOP, + } + + // Return both UTXOs from the wallet. + wallet.On( + "ListUnspentWitnessFromDefaultAccount", min, max, + ).Return( + []*lnwallet.Utxo{excludedUtxo, availableUtxo}, nil, + ).Once() + + set, err := NewBudgetInputSet( + []SweeperInput{*pi}, deadline, + fn.None[AuxSweeper](), + ) + require.NoError(t, err) + + // Mark the first UTXO as already used. + usedUtxos := fn.NewSet(excludedOP) + + addedOPs, err := set.AddWalletInputs(wallet, usedUtxos) + require.NoError(t, err) + + // The set should contain the original input plus only the + // available (non-excluded) wallet UTXO. + require.Len(t, set.inputs, 2) + + // Verify the wallet input that was added is the available + // one, not the excluded one. + walletInput := set.inputs[1] + require.Equal(t, availableOP, walletInput.OutPoint()) + + // The returned outpoints should list only the available + // UTXO. + require.Equal(t, []wire.OutPoint{availableOP}, addedOPs) +}