Ittt #52

Merged
anbsky merged 62 commits from ittt into master 2021-07-24 02:35:22 +02:00
2 changed files with 11 additions and 11 deletions
Showing only changes of commit 2b458a6bd0 - Show all commits

View file

@ -31,7 +31,7 @@ var (
//port configuration //port configuration
tcpPeerPort int tcpPeerPort int
http3PeerPort int http3PeerPort int
httpPort int httpPeerPort int
receiverPort int receiverPort int
metricsPort int metricsPort int
@ -73,7 +73,7 @@ func init() {
cmd.Flags().IntVar(&tcpPeerPort, "tcp-peer-port", 5567, "The port reflector will distribute content from for the TCP (LBRY) protocol") cmd.Flags().IntVar(&tcpPeerPort, "tcp-peer-port", 5567, "The port reflector will distribute content from for the TCP (LBRY) protocol")
cmd.Flags().IntVar(&http3PeerPort, "http3-peer-port", 5568, "The port reflector will distribute content from over HTTP3 protocol") cmd.Flags().IntVar(&http3PeerPort, "http3-peer-port", 5568, "The port reflector will distribute content from over HTTP3 protocol")
cmd.Flags().IntVar(&httpPort, "http-port", 5569, "The port reflector will distribute content from over HTTP protocol") cmd.Flags().IntVar(&httpPeerPort, "http-peer-port", 5569, "The port reflector will distribute content from over HTTP protocol")
cmd.Flags().IntVar(&receiverPort, "receiver-port", 5566, "The port reflector will receive content from") cmd.Flags().IntVar(&receiverPort, "receiver-port", 5566, "The port reflector will receive content from")
cmd.Flags().IntVar(&metricsPort, "metrics-port", 2112, "The port reflector will use for prometheus metrics") cmd.Flags().IntVar(&metricsPort, "metrics-port", 2112, "The port reflector will use for prometheus metrics")
@ -82,7 +82,7 @@ func init() {
cmd.Flags().BoolVar(&useDB, "use-db", true, "Whether to connect to the reflector db or not") cmd.Flags().BoolVar(&useDB, "use-db", true, "Whether to connect to the reflector db or not")
cmd.Flags().StringVar(&upstreamReflector, "upstream-reflector", "", "host:port of a reflector server where blobs are fetched from") cmd.Flags().StringVar(&upstreamReflector, "upstream-reflector", "", "host:port of a reflector server where blobs are fetched from")
cmd.Flags().StringVar(&upstreamProtocol, "proxy-protocol", "http", "protocol used to fetch blobs from another reflector server (tcp/http3/http)") cmd.Flags().StringVar(&upstreamProtocol, "upstream-protocol", "http", "protocol used to fetch blobs from another upstream reflector server (tcp/http3/http)")
cmd.Flags().IntVar(&requestQueueSize, "request-queue-size", 200, "How many concurrent requests from downstream should be handled at once (the rest will wait)") cmd.Flags().IntVar(&requestQueueSize, "request-queue-size", 200, "How many concurrent requests from downstream should be handled at once (the rest will wait)")
@ -130,7 +130,7 @@ func reflectorCmd(cmd *cobra.Command, args []string) {
defer http3PeerServer.Shutdown() defer http3PeerServer.Shutdown()
httpServer := http.NewServer(underlyingStoreWithCaches, requestQueueSize) httpServer := http.NewServer(underlyingStoreWithCaches, requestQueueSize)
err = httpServer.Start(":" + strconv.Itoa(httpPort)) err = httpServer.Start(":" + strconv.Itoa(httpPeerPort))
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }

View file

@ -39,12 +39,9 @@ type SdBlob struct {
type trackAccess int type trackAccess int
const ( const (
//TrackAccessNone Don't track accesses TrackAccessNone trackAccess = iota // Don't track accesses
TrackAccessNone trackAccess = iota TrackAccessStreams // Track accesses at the stream level
//TrackAccessStreams Track accesses at the stream level TrackAccessBlobs // Track accesses at the blob level
TrackAccessStreams
//TrackAccessBlobs Track accesses at the blob level
TrackAccessBlobs
) )
// SQL implements the DB interface // SQL implements the DB interface
@ -106,13 +103,14 @@ func (s *SQL) AddBlobs(hash []string) error {
if s.conn == nil { if s.conn == nil {
return errors.Err("not connected") return errors.Err("not connected")
} }
// Split the slice into batches of 20 items.
batch := 10000 batch := 10000
totalBlobs := int64(len(hash)) totalBlobs := int64(len(hash))
work := make(chan []string, 1000) work := make(chan []string, 1000)
stopper := stop.New() stopper := stop.New()
var totalInserted atomic.Int64 var totalInserted atomic.Int64
start := time.Now() start := time.Now()
go func() { go func() {
for i := 0; i < len(hash); i += batch { for i := 0; i < len(hash); i += batch {
j := i + batch j := i + batch
@ -124,6 +122,7 @@ func (s *SQL) AddBlobs(hash []string) error {
log.Infof("done loading %d hashes in the work queue", len(hash)) log.Infof("done loading %d hashes in the work queue", len(hash))
close(work) close(work)
}() }()
for i := 0; i < runtime.NumCPU(); i++ { for i := 0; i < runtime.NumCPU(); i++ {
stopper.Add(1) stopper.Add(1)
go func(worker int) { go func(worker int) {
@ -145,6 +144,7 @@ func (s *SQL) AddBlobs(hash []string) error {
} }
}(i) }(i)
} }
stopper.Wait() stopper.Wait()
return nil return nil
} }