Skip to content
Merged
25 changes: 21 additions & 4 deletions protocol/comm/kauri.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@
eventloop.Register(el, func(event WaitTimerExpiredEvent) {
k.onWaitTimerExpired(event)
})
eventloop.Register(el, func(event WaitForConnectedEvent) {
k.onWaitForConnected(event)
})
return k
}

Expand All @@ -84,10 +87,9 @@
func (k *Kauri) begin(p *hotstuff.ProposeMsg, pc hotstuff.PartialCert) error {
if !k.initDone {
// TODO(meling): This is not correct use of DelayUntil, see issue #267
eventloop.DelayUntil[network.ConnectedEvent](k.eventLoop, func() {
if err := k.begin(p, pc); err != nil {
k.logger.Error(err)
}
eventloop.DelayUntil[network.ConnectedEvent](k.eventLoop, WaitForConnectedEvent{
pc: pc,
p: p,
})
return nil
}
Expand Down Expand Up @@ -128,6 +130,16 @@
k.eventLoop.AddEvent(WaitTimerExpiredEvent{currentView: view})
}

// onWaitForConnected is invoked when begin is called before the replica is connected.
func (k *Kauri) onWaitForConnected(event WaitForConnectedEvent) {
k.logger.Debugf("WaitForConnectedEvent: %v", event)
if k.currentView > hotstuff.View(event.p.Block.View()) {
k.logger.Debug("Current view is higher than event view, not starting kauri")
return
}
k.begin(event.p, event.pc)

Check failure on line 140 in protocol/comm/kauri.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `k.begin` is not checked (errcheck)
}

// onContributionRecv is invoked upon receiving the vote for aggregation.
func (k *Kauri) onContributionRecv(event kauri.ContributionRecvEvent) {
if k.currentView != hotstuff.View(event.Contribution.View) {
Expand Down Expand Up @@ -196,4 +208,9 @@
currentView hotstuff.View
}

type WaitForConnectedEvent struct {
pc hotstuff.PartialCert
p *hotstuff.ProposeMsg
}

var _ Communication = (*Kauri)(nil)
Loading