Conversation
|
The latest Buf updates on your PR. Results from workflow Buf / buf (pull_request).
|
| for _, vs := range i.votes[lane].q[n].byHeader { | ||
| if len(vs) >= c.LaneQuorum() { | ||
| return types.NewLaneQC(vs[:c.LaneQuorum()]), true | ||
| } | ||
| } |
Check warning
Code scanning / CodeQL
Iteration over map Warning
| for lane := range i.votes { | ||
| lr := commitQC.LaneRange(lane) | ||
| i.votes[lr.Lane()].prune(lr.First()) | ||
| i.blocks[lr.Lane()].prune(lr.First()) | ||
| } |
Check warning
Code scanning / CodeQL
Iteration over map Warning
| for lane, bq := range inner.blocks { | ||
| for i := max(bq.first, r.next[lane]); i < bq.next; i++ { | ||
| batch = append(batch, bq.q[i].Msg().Block().Header()) | ||
| } | ||
| r.next[lane] = bq.next | ||
| } |
Check warning
Code scanning / CodeQL
Iteration over map Warning
| for _, v := range cv.byHash[h] { | ||
| votes = append(votes, v) | ||
| } |
Check warning
Code scanning / CodeQL
Iteration over map Warning
| for _, v := range pv.byHash[h] { | ||
| votes = append(votes, v) | ||
| } |
Check warning
Code scanning / CodeQL
Iteration over map Warning
| for _, qc := range m.laneQCs { | ||
| laneQCs = append(laneQCs, qc) | ||
| } |
Check warning
Code scanning / CodeQL
Iteration over map Warning
|
|
||
| // GenProposal generates a random Proposal. | ||
| func GenProposal(rng utils.Rng) *Proposal { | ||
| return newProposal(GenView(rng), time.Now(), utils.GenSlice(rng, GenLaneRange), utils.Some(GenAppProposal(rng))) |
Check warning
Code scanning / CodeQL
Calling the system time Warning test
| for _, l := range p.laneRanges { | ||
| laneRanges = append(laneRanges, l) | ||
| } |
Check warning
Code scanning / CodeQL
Iteration over map Warning
| pb "github.com/tendermint/tendermint/internal/autobahn/pb" | ||
| protoreflect "google.golang.org/protobuf/reflect/protoreflect" | ||
| protoimpl "google.golang.org/protobuf/runtime/protoimpl" | ||
| reflect "reflect" |
Check notice
Code scanning / CodeQL
Sensitive package import Note
| protoimpl "google.golang.org/protobuf/runtime/protoimpl" | ||
| reflect "reflect" | ||
| sync "sync" | ||
| unsafe "unsafe" |
Check notice
Code scanning / CodeQL
Sensitive package import Note
| } | ||
| time.Sleep(time.Second) | ||
| t.Logf("wait for the blocks used in test") | ||
| utils.OrPanic1(waitForBlock(ctx, primary, 3)) |
There was a problem hiding this comment.
Just curious, why do we need to change this test?
There was a problem hiding this comment.
The test is flaky, it is not related to the autobahn in any sense.
| // Equivalent of `google.protobuf.Timestamp` but supports canonical encoding. | ||
| // See `google.protobuf.Timestamp` for more detailed specification. | ||
| message Timestamp { | ||
| option (hashable.hashable) = true; |
There was a problem hiding this comment.
this is documented on the hashable package.
| } | ||
| } | ||
| } | ||
| // TODO(gprusak): this is counterintuitive asymmetric behavior: |
There was a problem hiding this comment.
Sorry I'm confused, didn't find this in v3 code, Is this migrated from some other repo?
There was a problem hiding this comment.
No this is my own code. I've found a bug when plugging the mux into autobahn, which is a result of this inconsistency.
|
|
||
| // Ping implements pb.StreamAPIServer. | ||
| // Note that we use streaming RPC, because unary RPC apparently causes 10ms extra delay on avg (empirically tested). | ||
| func (x *Service) serverPing(ctx context.Context, server rpc.Server[API]) error { |
There was a problem hiding this comment.
And from here on is from stream/consensus/server.go?
Why don't we keep the original client/server separation?
There was a problem hiding this comment.
the client and server implementation are tightly coupled, given that we use streaming RPCs. However, the way I've distributed the code across the files is kinda arbitrary, we can cleanup it later. A lot of it will need to be moved around anyway.
| "github.com/tendermint/tendermint/internal/p2p/rpc" | ||
| ) | ||
|
|
||
| func (x *Service) serverStreamLaneProposals(ctx context.Context, server rpc.Server[API]) error { |
There was a problem hiding this comment.
So from here on it's stream/consensus/avail/server.go?
| }) | ||
| } | ||
|
|
||
| func (x *Service) clientStreamLaneProposals(ctx context.Context, c rpc.Client[API]) error { |
There was a problem hiding this comment.
nit: If we have to put client and server in the same file, can we always do client first or always do server first maybe?
There was a problem hiding this comment.
sure, this is a work-in-progress layout, definitely will need a cleanup.
| @@ -0,0 +1,33 @@ | |||
| package avail | |||
|
|
|||
There was a problem hiding this comment.
Since queue sounds like a general enough data structure, can you maybe comment why we need it defined in avail/, not at some more generic location?
There was a problem hiding this comment.
my rule of thumb is to write general enough abstractions to avoid tangling logic. The question of visibility (where the code should be located, which packages should have access to it) is a totally separate question. So far this struct was under a lot of code churn, that's why it is in the package that is making use of it.
| @@ -0,0 +1,103 @@ | |||
| package avail | |||
|
|
|||
There was a problem hiding this comment.
I believe this file is new, can you maybe explain a little bit why it's organized this way? How many receiver instances do we plan to have for each kind?
There was a problem hiding this comment.
There is 1 receiver per connection currently. I wanted to separate autobahn from the transport layer. Autobahn api surface is way too large atm (number of receivers, custom receivers for each consensus message type, proper buffer sizes in rpcs, etc), it will require some work to simplify it. Or we can fallback to merge back the transport layer into autobahn (which is currently under p2p/giga). This is TBD.
| for _, v := range cv.byHash[h] { | ||
| votes = append(votes, v) | ||
| } | ||
| cv.qc.Store(utils.Some(types.NewCommitQC(votes))) |
There was a problem hiding this comment.
Why are we changing from cv.qc.Update to cv.qc.Load and cv.qc.Store now?
There was a problem hiding this comment.
thank for reminding me. The AtomicWatch has evolved since I implemented it in sei-v3. Now it is separated into AtomicSend and AtomicRecv and does not provide atomic updated by default - you need to wrap AtomicSend into a mutex - it simplifies the AtomicSend api AND allows AtomicSend to share mutex with other data, which is usually very convenient.
There was a problem hiding this comment.
I've left a bunch of TODOs, because this change introduces some race conditions. Let me address them.
| return types.NewLaneQC(votes) | ||
| } | ||
|
|
||
| func TestCommitQC( |
There was a problem hiding this comment.
Nice, I like that the helpers are all moved here.
| MaxGasPerBlock uint64 | ||
| MaxTxsPerBlock uint64 | ||
| MaxTxsPerSecond utils.Option[uint64] | ||
| MempoolSize uint64 |
There was a problem hiding this comment.
the block production pipeline in giga needs to be designed and implemented. I've copied over the producer for reference.
| ) | ||
|
|
||
| // Sends a consensus message to the peer whenever atomic watch is updated. | ||
| func sendUpdates[T interface { |
There was a problem hiding this comment.
Okay, this one is moved from stream/consensus/client.go?
| if qc.Proposal().Index() < types.NextIndexOpt(i.viewSpec.CommitQC) { | ||
| return nil | ||
| for i := range s.inner.Lock() { | ||
| if qc.Proposal().Index() < types.NextIndexOpt(i.Load().viewSpec.CommitQC) { |
There was a problem hiding this comment.
nit: is this the same check as the one on line 26:
i.viewSpec.View().Index > qc.Proposal().Index()
Would help readability if we use the same expression to show that we are just rechecking the same condition.
| inner := utils.Alloc(utils.NewAtomicSend(inner{})) | ||
| return &State{ | ||
| cfg: cfg, | ||
| // metrics: NewMetrics(), |
There was a problem hiding this comment.
We do plan to add metrics back right?
Migrated consensus,avail,data,producer and types modules. Migrated corresponding tests. Replaced gRPC usage with custom rpc layer on top of mux. GigaRouter is still hardcoded to be disabled.
NOTE: there is a lot of missing parts and coverage, but this pr will allow us to stop development of autobahn in sei-v3, so that we can start making it production-ready in sei-chain.