Most of federation is written; need to finish UDP and test #20
Also includes some code reorganization. There's a lot of new potential for metrics that I should take advantage of as well. Needs much, much more testing.
I'm having trouble understanding some of federation.go. Could you add comments to your functions in godoc style, or at least to the exported ones?
In general I'd like good comments on exported functions and at least a one-liner on unexported ones (except when they're short and it's super obvious what's going on). You should think of comments as part of the API: they explain the contract in a way that a function name and argument list alone cannot.
Also, I don't see tests for any of this. I'm not an expert on testing, but my understanding is that it's good to write tests first, or at the same time as the code, and that writing them afterwards is less valuable. What's your take on tests?
Why do servers subscribe to peers? What messages are they listening for?
I don't think we want every hub to subscribe to every other hub.
@ -136,0 +157,4 @@
opts := []elastic.ClientOptionFunc{
elastic.SetSniff(true),
elastic.SetSnifferTimeoutStartup(time.Second * 60),
elastic.SetSnifferTimeout(time.Second * 60),
You're starting a bunch of goroutines. Do you have a graceful way of shutting them down when the server shuts down? Or maybe we don't need one?
@ -0,0 +13,4 @@
const maxBufferSize = 1024
// genesis blocktime (which is actually wrong)
const magic = 1446058291
This needs a comment. What's magic?
@ -136,0 +157,4 @@
opts := []elastic.ClientOptionFunc{
elastic.SetSniff(true),
elastic.SetSnifferTimeoutStartup(time.Second * 60),
elastic.SetSnifferTimeout(time.Second * 60),
You're right. These should probably all have another case in the select on ctx.Done() to clean up.
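Roughly like this, I think (the channel and handler names here are made up, just to show the shape):

```go
// peerWorker sketches the cleanup case: every long-lived goroutine selects on
// ctx.Done() alongside its work channel, so cancelling the server's root
// context shuts them all down. The updates channel is a made-up example.
func (s *Server) peerWorker(ctx context.Context, updates <-chan *FederatedServer) {
	for {
		select {
		case <-ctx.Done(): // server shutting down
			return
		case peer := <-updates:
			s.handlePeerUpdate(peer) // stand-in for the goroutine's real work
		}
	}
}
```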
So my idea here is to select the fastest other hub that this hub knows about and subscribe to it for peer updates. The current getFastestPeer is just a placeholder; it will be implemented with UDP so we don't DoS other servers by trying to open connections to all of them. So each hub will be subscribed to one other hub.
@ -0,0 +13,4 @@
const maxBufferSize = 1024
// genesis blocktime (which is actually wrong)
const magic = 1446058291
This file definitely needs more comments and tests, but it's basically all lifted from https://github.com/lbryio/lbry-sdk/blob/master/lbry/wallet/server/udp.py#L12
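For context, the magic check would look something like this in Go (a sketch, assuming encoding/binary and network byte order, which is what the Python struct format implies):

```go
// hasMagic shows what the magic number is for: every packet starts with these
// four bytes, so any datagram that doesn't carry them can be dropped cheaply
// without parsing the rest.
func hasMagic(packet []byte) bool {
	if len(packet) < 4 {
		return false
	}
	return binary.BigEndian.Uint32(packet[:4]) == magic
}
```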
Do you also test for race conditions (-race)? It makes tests way slower, but it's worth doing manually for parts that may have races, e.g. go test -race ./...
@ -25,0 +26,4 @@
Name: "peers_known",
Help: "Number of peers we know about.",
})
PeersSubscribed = promauto.NewGauge(prometheus.GaugeOpts{
Is this still useful if we're not subscribing to peers anymore?
I think this method is typically called server.New() or server.NewServer(). I prefer the former, since the package is already called server.
@ -10,1 +10,4 @@
rpc Ping (EmptyMessage) returns (StringValue) {}
rpc Hello (HelloMessage) returns (HelloMessage) {}
rpc AddPeer (ServerMessage) returns (StringValue) {}
rpc PeerSubscribe (ServerMessage) returns (StringValue) {}
We dropped this, right?
@ -0,0 +1,201 @@
package server
Good idea putting this stuff in a separate file.
@ -0,0 +118,4 @@
// subscribeToPeer subscribes us to a peer so we'll get updates about their
// known peers.
func (s *Server) subscribeToPeer(peer *FederatedServer) error {
More subscribing stuff. Maybe I'm confused about what we decided to do with it? I thought you were putting it into a separate branch to keep for later.
@ -108,0 +114,4 @@
s.PeerServersMut.RUnlock()
s.PeerServersMut.Lock()
s.PeerServers[key] = peer
s.PeerServersMut.Unlock()
I don't think this is necessary, since it's the default value.
@ -0,0 +12,4 @@
)
const maxBufferSize = 1024
// genesis blocktime (which is actually wrong)
lol
@ -25,0 +26,4 @@
Name: "peers_known",
Help: "Number of peers we know about.",
})
PeersSubscribed = promauto.NewGauge(prometheus.GaugeOpts{
We are still subscribing to peers; we're just not holding the connection open. So this basically just mirrors the number of elements in the internal subscribed-peers data structure.
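Something like this keeps it in lockstep (a sketch; the Subscribed field name is an assumption):

```go
// recordSubscriber stores a subscriber and bumps the gauge only when the key
// is actually new, so the gauge always equals the number of entries.
func (s *Server) recordSubscriber(key string, peer *FederatedServer) {
	if _, loaded := s.Subscribed.LoadOrStore(key, peer); !loaded {
		PeersSubscribed.Inc()
	}
}
```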
Makes sense. I think I have some deep-down avoidance measures against naming functions "new" because of the years of abuse I've endured from OO languages.
@ -10,1 +10,4 @@
rpc Ping (EmptyMessage) returns (StringValue) {}
rpc Hello (HelloMessage) returns (HelloMessage) {}
rpc AddPeer (ServerMessage) returns (StringValue) {}
rpc PeerSubscribe (ServerMessage) returns (StringValue) {}
Nope, we're still subscribing to peers, just not holding the connection open anymore; we need a way to tell a server whom to send its updates to. The only other option would be to just broadcast to the list of all known peers, but these are TCP connections with probably quite a bit of overhead, so I don't think that would be very scalable.
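The shape of it would be roughly this (a sketch; the pb package name and ServerMessage field names are assumptions):

```go
// PeerSubscribe records who to notify instead of holding the TCP connection
// open; the server dials subscribers later when it has updates to send.
func (s *Server) PeerSubscribe(ctx context.Context, msg *pb.ServerMessage) (*pb.StringValue, error) {
	key := msg.Address + ":" + msg.Port
	s.Subscribed.Store(key, msg) // remember the peer, drop the connection
	return &pb.StringValue{Value: "subscribed"}, nil
}
```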
@ -0,0 +118,4 @@
// subscribeToPeer subscribes us to a peer so we'll get updates about their
// known peers.
func (s *Server) subscribeToPeer(peer *FederatedServer) error {
Nope, just the streaming pieces. We still need to subscribe.
@ -108,0 +114,4 @@
s.PeerServersMut.RUnlock()
s.PeerServersMut.Lock()
s.PeerServers[key] = peer
s.PeerServersMut.Unlock()
I think the compiler complains about it possibly being accessed without being set... but I could just initialize it at declaration. That would simplify this.
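Something like this (a sketch, reusing the fields from the diff and the New() name from the earlier comment):

```go
// New builds the map up front so it can never be nil, which makes the
// RUnlock/Lock upgrade dance above unnecessary.
func New() *Server {
	return &Server{
		PeerServers: make(map[string]*FederatedServer),
	}
}

// addPeer then only ever needs a single write lock.
func (s *Server) addPeer(key string, peer *FederatedServer) {
	s.PeerServersMut.Lock()
	defer s.PeerServersMut.Unlock()
	s.PeerServers[key] = peer
}
```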
@ -0,0 +12,4 @@
)
const maxBufferSize = 1024
// genesis blocktime (which is actually wrong)
This comment was stolen verbatim from the Python code, by the way.
@ -136,0 +157,4 @@
opts := []elastic.ClientOptionFunc{
elastic.SetSniff(true),
elastic.SetSnifferTimeoutStartup(time.Second * 60),
elastic.SetSnifferTimeout(time.Second * 60),
I ultimately got rid of most of these goroutines. I was essentially just using the channel as a lock so that a bunch of other logic could run asynchronously from the endpoint, but on more careful consideration that logic needed to be synchronous, and there was no longer any justification for the increased complexity and error-prone code that come with channels. So I'm using a sync.Map as the shared data structure.
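Roughly like so (a sketch; the key and value types are assumptions):

```go
// peerServers is effectively a map[string]*FederatedServer. Note the
// interface{} dance on reads, which is part of why sync.Map is awkward
// as a general-purpose map.
var peerServers sync.Map

func storePeer(key string, peer *FederatedServer) {
	peerServers.Store(key, peer)
}

func loadPeer(key string) (*FederatedServer, bool) {
	v, ok := peerServers.Load(key)
	if !ok {
		return nil, false
	}
	return v.(*FederatedServer), true
}
```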
@ -136,0 +157,4 @@
opts := []elastic.ClientOptionFunc{
elastic.SetSniff(true),
elastic.SetSnifferTimeoutStartup(time.Second * 60),
elastic.SetSnifferTimeout(time.Second * 60),
Have you read the docs for sync.Map? It's kind of an unfortunate name, because it's not meant to be a generic synchronized map; it's meant for a narrower use case.
@ -136,0 +157,4 @@
opts := []elastic.ClientOptionFunc{
elastic.SetSniff(true),
elastic.SetSnifferTimeoutStartup(time.Second * 60),
elastic.SetSnifferTimeout(time.Second * 60),
Of course not; I found an API that looked like it fit the use case and barreled on ahead. I'll take a look at that. The alternative would just be a mutex that you lock/unlock manually around a regular map, which isn't difficult either.
Good call, I'll add that. They do pass with it, but I've already removed most of the goroutine/channel stuff, which is probably where those bugs would have been.
@ -136,0 +157,4 @@
opts := []elastic.ClientOptionFunc{
elastic.SetSniff(true),
elastic.SetSnifferTimeoutStartup(time.Second * 60),
elastic.SetSnifferTimeout(time.Second * 60),
Yeah, that's what they want you to do most of the time: roll your own map + mutex. It's annoying, but in line with their overall philosophy.
@ -0,0 +119,4 @@
// subscribeToPeer subscribes us to a peer so we'll get updates about their
// known peers.
func (s *Server) subscribeToPeer(peer *FederatedServer) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
Does this just return the first peer every time? I'm confused.
@ -0,0 +119,4 @@
// subscribeToPeer subscribes us to a peer so we'll get updates about their
// known peers.
func (s *Server) subscribeToPeer(peer *FederatedServer) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
If yes, is this actually getting the fastest peer?
@ -0,0 +119,4 @@
// subscribeToPeer subscribes us to a peer so we'll get updates about their
// known peers.
func (s *Server) subscribeToPeer(peer *FederatedServer) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
Yeah, it's just a placeholder right now, for two reasons: we're currently subscribing to all the peers anyway, and this should be implemented with the UDP ping protocol that I haven't finished yet.
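Once UDP is done, I'm imagining something like this (pure sketch; udpPing doesn't exist yet):

```go
// getFastestPeer pings every known peer concurrently and returns whichever
// answers first; first responder is, by definition, the fastest.
func getFastestPeer(peers []*FederatedServer, timeout time.Duration) *FederatedServer {
	fastest := make(chan *FederatedServer, len(peers)) // buffered so stragglers don't block
	for _, p := range peers {
		go func(p *FederatedServer) {
			if err := udpPing(p, timeout); err == nil { // udpPing is hypothetical
				fastest <- p
			}
		}(p)
	}
	select {
	case p := <-fastest:
		return p
	case <-time.After(timeout):
		return nil // nobody answered in time
	}
}
```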