From fd0e5c58c295fd181ec884ba39e7b9d263d5375f Mon Sep 17 00:00:00 2001 From: Jeffrey Picard Date: Wed, 21 Sep 2022 09:21:13 +0000 Subject: [PATCH 1/8] integration testing scripts some scripts for integration testing and a docker file for an action. Still need to figure out how to properly run a more realistic version in ci. --- docker/Dockerfile.action.integration | 13 ++ scripts/cicd_integration_test_runner.sh | 9 + scripts/integration_tests.sh | 208 ++++++++++++++++++++++++ server/jsonrpc_blockchain.go | 22 +++ 4 files changed, 252 insertions(+) create mode 100644 docker/Dockerfile.action.integration create mode 100644 scripts/cicd_integration_test_runner.sh create mode 100755 scripts/integration_tests.sh diff --git a/docker/Dockerfile.action.integration b/docker/Dockerfile.action.integration new file mode 100644 index 0000000..a846493 --- /dev/null +++ b/docker/Dockerfile.action.integration @@ -0,0 +1,13 @@ +FROM jeffreypicard/hub-github-env:dev + +COPY scripts/integration_tests.sh /integration_tests.sh +COPY scripts/cicd_integration_test_runner.sh /cicd_integration_test_runner.sh +COPY herald /herald + +RUN apt install -y jq curl + +ENV CGO_LDFLAGS "-L/usr/local/lib -lrocksdb -lstdc++ -lm -lz -lsnappy -llz4 -lzstd" +ENV CGO_CFLAGS "-I/usr/local/include/rocksdb" +ENV LD_LIBRARY_PATH /usr/local/lib + +ENTRYPOINT ["/cicd_integration_test_runner.sh"] diff --git a/scripts/cicd_integration_test_runner.sh b/scripts/cicd_integration_test_runner.sh new file mode 100644 index 0000000..7df0337 --- /dev/null +++ b/scripts/cicd_integration_test_runner.sh @@ -0,0 +1,9 @@ +#!/bin/bash +# +# cicd_integration_test_runner.sh +# +# simple script to kick off herald and call the integration testing +# script +# + + diff --git a/scripts/integration_tests.sh b/scripts/integration_tests.sh new file mode 100755 index 0000000..382e4db --- /dev/null +++ b/scripts/integration_tests.sh @@ -0,0 +1,208 @@ +#!/bin/bash +# +# integration_testing.sh +# +# GitHub Action CI/CD based integration tests for herald.go +# These are smoke / sanity tests for the server behaving correctly on a "live" +# system, and looks for reasonable response codes, not specific correct +# behavior. Those are covered in unit tests. +# +# N.B. +# For the curl based json tests the `id` field existing is needed. +# + +# global variables + +RES=(0) +FINALRES=0 +CHUNK_TEST_RES="010000000000000000000000000000000000000000000000000000000000000000000000cc59e59ff97ac092b55e423aa549" + +# functions + + +function logical_or { + for res in ${RES[@]}; do + if [ $res -eq 1 -o $FINALRES -eq 1 ]; then + FINALRES=1 + return + fi + done +} + +function want_got { + if [ "${WANT}" != "${GOT}" ]; then + echo "WANT: ${WANT}" + echo "GOT: ${GOT}" + RES+=(1) + else + RES+=(0) + fi +} + +function want_greater { + if [ "${WANT}" -gt "${GOT}" ]; then + echo "WANT: ${WANT}" + echo "GOT: ${GOT}" + RES+=(1) + else + RES+=(0) + fi +} + +function test_command_with_want { + echo $CMD + GOT=`eval $CMD` + + want_got +} + +# grpc endpoint testing + + +read -r -d '' CMD <<- EOM + grpcurl -plaintext -d '{"value": ["@Styxhexenhammer666:2"]}' 127.0.0.1:50051 pb.Hub.Resolve + | jq .txos[0].txHash | sed 's/"//g' +EOM +WANT="VOFP8MQEwps9Oa5NJJQ18WfVzUzlpCjst0Wz3xyOPd4=" +test_command_with_want + +# GOT=`eval $CMD` + +#want_got + +## +## N.B. This is a degenerate case that takes a long time to run. +## The runtime should be fixed, but in the meantime, we definitely should +## ensure this behaves as expected. +## +## TODO: Test runtime doesn't exceed worst case. +## + +#WANT=806389 +#read -r -d '' CMD <<- EOM +# grpcurl -plaintext -d '{"value": ["foo"]}' 127.0.0.1:50051 pb.Hub.Resolve | jq .txos[0].height +#EOM +# test_command_with_want + +# json rpc endpoint testing + +## blockchain.block + +### blockchain.block.get_chunk +read -r -d '' CMD <<- EOM + curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json" + --data '{"id": 1, "method": "blockchain.block.get_chunk", "params": [0]}' + | jq .result | sed 's/"//g' | head -c 100 +EOM +WANT="${CHUNK_TEST_RES}" +test_command_with_want + +### blockchain.block.get_header +read -r -d '' CMD <<- EOM + curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json" + --data '{"id": 1, "method": "blockchain.block.get_header", "params": []}' + | jq .result.timestamp +EOM +WANT=1446058291 +test_command_with_want + +### blockchain.block.headers +read -r -d '' CMD <<- EOM + curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json" + --data '{"id": 1, "method": "blockchain.block.headers", "params": []}' + | jq .result.count +EOM +WANT=0 +test_command_with_want + +## blockchain.claimtrie + +read -r -d '' CMD <<- EOM + curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json" + --data '{"id": 1, "method": "blockchain.claimtrie.resolve", "params":[{"Data": ["@Styxhexenhammer666:2"]}]}' + | jq .result.txos[0].tx_hash | sed 's/"//g' +EOM +WANT="VOFP8MQEwps9Oa5NJJQ18WfVzUzlpCjst0Wz3xyOPd4=" +test_command_with_want + +## blockchain.address + +### blockchain.address.get_balance + +read -r -d '' CMD <<- EOM + curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json" + --data '{"id": 1, "method": "blockchain.address.get_balance", "params":[{"Address": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}' + | jq .result.confirmed +EOM +WANT=44415602186 +test_command_with_want + +## blockchain.address.get_history + +read -r -d '' CMD <<- EOM + curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json" + --data '{"id": 1, "method": "blockchain.address.get_history", "params":[{"Address": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}' + | jq '.result.confirmed | length' +EOM +WANT=82 +test_command_with_want + +## blockchain.address.listunspent + +read -r -d '' CMD <<- EOM + curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json" + --data '{"id": 1, "method": "blockchain.address.listunspent", "params":[{"Address": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}' + | jq '.result | length' +EOM +WANT=32 +test_command_with_want + +# blockchain.scripthash + +## blockchain.scripthash.get_mempool + +read -r -d '' CMD <<- EOM + curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json" + --data '{"id": 1, "method": "blockchain.scripthash.get_mempool", "params":[{"scripthash": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}' + | jq .error | sed 's/"//g' +EOM +WANT="encoding/hex: invalid byte: U+0047 'G'" +test_command_with_want + +## blockchain.scripthash.get_history + +read -r -d '' CMD <<- EOM + curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json" + --data '{"id": 1, "method": "blockchain.scripthash.get_history", "params":[{"scripthash": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}' + | jq .error | sed 's/"//g' +EOM +WANT="encoding/hex: invalid byte: U+0047 'G'" +test_command_with_want + +## blockchain.scripthash.listunspent + +read -r -d '' CMD <<- EOM + curl http://127.0.0.1:50001/rpc -s -H "Content-Type: application/json" + --data '{"id": 1, "method": "blockchain.scripthash.listunspent", "params":[{"scripthash": "bGqWuXRVm5bBqLvLPEQQpvsNxJ5ubc6bwN"}]}' + | jq .error | sed 's/"//g' +EOM +WANT="encoding/hex: invalid byte: U+0047 'G'" +test_command_with_want + +# metrics endpoint testing + +WANT=0 +GOT=$(curl http://127.0.0.1:2112/metrics -s | grep requests | grep resolve | awk '{print $NF}') +want_greater + +# caclulate return value + +logical_or $RES + +if [ $FINALRES -eq 1 ]; then + echo "Failed!" + exit 1 +else + echo "Passed!" + exit 0 +fi diff --git a/server/jsonrpc_blockchain.go b/server/jsonrpc_blockchain.go index de5c457..02be53b 100644 --- a/server/jsonrpc_blockchain.go +++ b/server/jsonrpc_blockchain.go @@ -18,6 +18,8 @@ import ( "github.com/lbryio/lbcd/wire" "github.com/lbryio/lbcutil" "golang.org/x/exp/constraints" + + log "github.com/sirupsen/logrus" ) // BlockchainBlockService methods handle "blockchain.block.*" RPCs @@ -120,6 +122,7 @@ func (s *BlockchainBlockService) Get_chunk(req *BlockGetChunkReq, resp **BlockGe index := uint32(*req) db_headers, err := s.DB.GetHeaders(index*CHUNK_SIZE, CHUNK_SIZE) if err != nil { + log.Warn(err) return err } raw := make([]byte, 0, HEADER_SIZE*len(db_headers)) @@ -141,6 +144,7 @@ func (s *BlockchainBlockService) Get_header(req *BlockGetHeaderReq, resp **Block height := uint32(*req) headers, err := s.DB.GetHeaders(height, 1) if err != nil { + log.Warn(err) return err } if len(headers) < 1 { @@ -171,6 +175,7 @@ func (s *BlockchainBlockService) Headers(req *BlockHeadersReq, resp **BlockHeade count := min(req.Count, MAX_CHUNK_SIZE) db_headers, err := s.DB.GetHeaders(req.StartHeight, count) if err != nil { + log.Warn(err) return err } count = uint32(len(db_headers)) @@ -283,18 +288,22 @@ type AddressGetBalanceResp struct { func (s *BlockchainAddressService) Get_balance(req *AddressGetBalanceReq, resp **AddressGetBalanceResp) error { address, err := lbcutil.DecodeAddress(req.Address, s.Chain) if err != nil { + log.Warn(err) return err } script, err := txscript.PayToAddrScript(address) if err != nil { + log.Warn(err) return err } hashX := hashXScript(script, s.Chain) confirmed, unconfirmed, err := s.DB.GetBalance(hashX) if err != nil { + log.Warn(err) return err } *resp = &AddressGetBalanceResp{confirmed, unconfirmed} + return err } @@ -310,11 +319,13 @@ type ScripthashGetBalanceResp struct { func (s *BlockchainScripthashService) Get_balance(req *scripthashGetBalanceReq, resp **ScripthashGetBalanceResp) error { scripthash, err := decodeScriptHash(req.ScriptHash) if err != nil { + log.Warn(err) return err } hashX := hashX(scripthash) confirmed, unconfirmed, err := s.DB.GetBalance(hashX) if err != nil { + log.Warn(err) return err } *resp = &ScripthashGetBalanceResp{confirmed, unconfirmed} @@ -341,15 +352,18 @@ type AddressGetHistoryResp struct { func (s *BlockchainAddressService) Get_history(req *AddressGetHistoryReq, resp **AddressGetHistoryResp) error { address, err := lbcutil.DecodeAddress(req.Address, s.Chain) if err != nil { + log.Warn(err) return err } script, err := txscript.PayToAddrScript(address) if err != nil { + log.Warn(err) return err } hashX := hashXScript(script, s.Chain) dbTXs, err := s.DB.GetHistory(hashX) if err != nil { + log.Warn(err) return err } confirmed := make([]TxInfo, 0, len(dbTXs)) @@ -380,11 +394,13 @@ type ScripthashGetHistoryResp struct { func (s *BlockchainScripthashService) Get_history(req *ScripthashGetHistoryReq, resp **ScripthashGetHistoryResp) error { scripthash, err := decodeScriptHash(req.ScriptHash) if err != nil { + log.Warn(err) return err } hashX := hashX(scripthash) dbTXs, err := s.DB.GetHistory(hashX) if err != nil { + log.Warn(err) return err } confirmed := make([]TxInfo, 0, len(dbTXs)) @@ -412,10 +428,12 @@ type AddressGetMempoolResp []TxInfoFee func (s *BlockchainAddressService) Get_mempool(req *AddressGetMempoolReq, resp **AddressGetMempoolResp) error { address, err := lbcutil.DecodeAddress(req.Address, s.Chain) if err != nil { + log.Warn(err) return err } script, err := txscript.PayToAddrScript(address) if err != nil { + log.Warn(err) return err } hashX := hashXScript(script, s.Chain) @@ -436,6 +454,7 @@ type ScripthashGetMempoolResp []TxInfoFee func (s *BlockchainScripthashService) Get_mempool(req *ScripthashGetMempoolReq, resp **ScripthashGetMempoolResp) error { scripthash, err := decodeScriptHash(req.ScriptHash) if err != nil { + log.Warn(err) return err } hashX := hashX(scripthash) @@ -462,10 +481,12 @@ type AddressListUnspentResp []TXOInfo func (s *BlockchainAddressService) Listunspent(req *AddressListUnspentReq, resp **AddressListUnspentResp) error { address, err := lbcutil.DecodeAddress(req.Address, s.Chain) if err != nil { + log.Warn(err) return err } script, err := txscript.PayToAddrScript(address) if err != nil { + log.Warn(err) return err } hashX := hashXScript(script, s.Chain) @@ -494,6 +515,7 @@ type ScripthashListUnspentResp []TXOInfo func (s *BlockchainScripthashService) Listunspent(req *ScripthashListUnspentReq, resp **ScripthashListUnspentResp) error { scripthash, err := decodeScriptHash(req.ScriptHash) if err != nil { + log.Warn(err) return err } hashX := hashX(scripthash) -- 2.43.4 From 76e639b757f1a914be802c7df9c660d51a289903 Mon Sep 17 00:00:00 2001 From: Jeffrey Picard Date: Fri, 23 Sep 2022 13:51:25 +0000 Subject: [PATCH 2/8] update --- scripts/cicd_integration_test_runner.sh | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/scripts/cicd_integration_test_runner.sh b/scripts/cicd_integration_test_runner.sh index 7df0337..bf26893 100644 --- a/scripts/cicd_integration_test_runner.sh +++ b/scripts/cicd_integration_test_runner.sh @@ -5,5 +5,10 @@ # simple script to kick off herald and call the integration testing # script # +# N.B. this currently just works locally until we figure a way to have +# the data in the cicd environment. +# +./herald serve --db-path /mnt/sdb1/wallet_server/_data/lbry-rocksdb & +./integration_tests.sh -- 2.43.4 From fedd27d3d6568e1f02f3f612656ad958f90f13f3 Mon Sep 17 00:00:00 2001 From: Jeffrey Picard Date: Sat, 24 Sep 2022 06:28:49 +0000 Subject: [PATCH 3/8] changes --- scripts/integration_tests.sh | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/scripts/integration_tests.sh b/scripts/integration_tests.sh index 382e4db..c9d1cc4 100755 --- a/scripts/integration_tests.sh +++ b/scripts/integration_tests.sh @@ -15,8 +15,6 @@ RES=(0) FINALRES=0 -CHUNK_TEST_RES="010000000000000000000000000000000000000000000000000000000000000000000000cc59e59ff97ac092b55e423aa549" - # functions @@ -40,7 +38,7 @@ function want_got { } function want_greater { - if [ "${WANT}" -gt "${GOT}" ]; then + if [ ${WANT} -ge ${GOT} ]; then echo "WANT: ${WANT}" echo "GOT: ${GOT}" RES+=(1) @@ -94,7 +92,7 @@ read -r -d '' CMD <<- EOM --data '{"id": 1, "method": "blockchain.block.get_chunk", "params": [0]}' | jq .result | sed 's/"//g' | head -c 100 EOM -WANT="${CHUNK_TEST_RES}" +WANT="010000000000000000000000000000000000000000000000000000000000000000000000cc59e59ff97ac092b55e423aa549" test_command_with_want ### blockchain.block.get_header -- 2.43.4 From d343d88d34a35b2ebc03f0c011a22e020085d1fd Mon Sep 17 00:00:00 2001 From: Jeffrey Picard Date: Sat, 24 Sep 2022 12:22:22 +0000 Subject: [PATCH 4/8] db shutdown racecondition fix --- db/db.go | 43 +++++++++++++++++++++++++++++++++++++++++++ db/iteroptions.go | 11 +++++++++++ 2 files changed, 54 insertions(+) diff --git a/db/db.go b/db/db.go index 00cc9a7..1c21f9f 100644 --- a/db/db.go +++ b/db/db.go @@ -8,6 +8,7 @@ import ( "encoding/hex" "fmt" "os" + "sync" "time" "github.com/lbryio/herald.go/db/prefixes" @@ -58,6 +59,8 @@ type ReadOnlyDBColumnFamily struct { BlockedChannels map[string][]byte FilteredStreams map[string][]byte FilteredChannels map[string][]byte + OpenIterators map[string][]chan struct{} + ItMut sync.RWMutex ShutdownChan chan struct{} DoneChan chan struct{} Cleanup func() @@ -318,6 +321,20 @@ func intMin(a, b int) int { return b } +// FIXME: This was copied from the signal.go file, maybe move it to a more common place? +// interruptRequested returns true when the channel returned by +// interruptListener was closed. This simplifies early shutdown slightly since +// the caller can just use an if statement instead of a select. +func interruptRequested(interrupted <-chan struct{}) bool { + select { + case <-interrupted: + return true + default: + } + + return false +} + func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV { ch := make(chan *prefixes.PrefixRowKV) @@ -325,14 +342,27 @@ func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV { ro.SetFillCache(opts.FillCache) it := db.NewIteratorCF(ro, opts.CfHandle) opts.It = it + iterKey := fmt.Sprintf("%p", opts) it.Seek(opts.Prefix) if opts.Start != nil { it.Seek(opts.Start) } + if opts.DB != nil { + opts.DB.ItMut.Lock() + opts.DB.OpenIterators[iterKey] = []chan struct{}{opts.DoneChan, opts.ShutdownChan} + opts.DB.ItMut.Unlock() + } + go func() { defer func() { + if opts.DB != nil { + opts.DB.ItMut.Lock() + delete(opts.DB.OpenIterators, iterKey) + opts.DB.ItMut.Unlock() + opts.DoneChan <- struct{}{} + } it.Close() close(ch) ro.Destroy() @@ -355,6 +385,9 @@ func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV { if kv = opts.ReadRow(&prevKey); kv != nil { ch <- kv } + if interruptRequested(opts.ShutdownChan) { + return + } } }() @@ -514,6 +547,7 @@ func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func } db, err := GetDBColumnFamilies(name, secondaryPath, cfNames) + db.OpenIterators = make(map[string][]chan struct{}) cleanupFiles := func() { err = os.RemoveAll(secondaryPath) @@ -642,7 +676,16 @@ func (db *ReadOnlyDBColumnFamily) Unwind() { // Shutdown shuts down the db. func (db *ReadOnlyDBColumnFamily) Shutdown() { + // FIXME: Do we need to shutdown the iterators first? db.ShutdownChan <- struct{}{} + db.ItMut.Lock() + for _, it := range db.OpenIterators { + it[0] <- struct{}{} + } + for _, it := range db.OpenIterators { + <-it[1] + } + db.ItMut.Unlock() <-db.DoneChan db.Cleanup() } diff --git a/db/iteroptions.go b/db/iteroptions.go index 4508cec..aa728a3 100644 --- a/db/iteroptions.go +++ b/db/iteroptions.go @@ -22,6 +22,9 @@ type IterOptions struct { IncludeValue bool RawKey bool RawValue bool + ShutdownChan chan struct{} + DoneChan chan struct{} + DB *ReadOnlyDBColumnFamily CfHandle *grocksdb.ColumnFamilyHandle It *grocksdb.Iterator Serializer *prefixes.SerializationAPI @@ -40,6 +43,9 @@ func NewIterateOptions() *IterOptions { IncludeValue: false, RawKey: false, RawValue: false, + ShutdownChan: make(chan struct{}), + DoneChan: make(chan struct{}), + DB: nil, CfHandle: nil, It: nil, Serializer: prefixes.ProductionAPI, @@ -101,6 +107,11 @@ func (o *IterOptions) WithRawValue(rawValue bool) *IterOptions { return o } +func (o *IterOptions) WithDB(db *ReadOnlyDBColumnFamily) *IterOptions { + o.DB = db + return o +} + func (o *IterOptions) WithSerializer(serializer *prefixes.SerializationAPI) *IterOptions { o.Serializer = serializer return o -- 2.43.4 From f303907acd81aa7ffde87bdb552cd41c4246ea0d Mon Sep 17 00:00:00 2001 From: Jeffrey Picard Date: Wed, 28 Sep 2022 11:26:26 +0000 Subject: [PATCH 5/8] changes per pr --- db/db.go | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/db/db.go b/db/db.go index 1c21f9f..776a4c4 100644 --- a/db/db.go +++ b/db/db.go @@ -338,34 +338,34 @@ func interruptRequested(interrupted <-chan struct{}) bool { func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV { ch := make(chan *prefixes.PrefixRowKV) - ro := grocksdb.NewDefaultReadOptions() - ro.SetFillCache(opts.FillCache) - it := db.NewIteratorCF(ro, opts.CfHandle) - opts.It = it iterKey := fmt.Sprintf("%p", opts) - - it.Seek(opts.Prefix) - if opts.Start != nil { - it.Seek(opts.Start) - } - if opts.DB != nil { opts.DB.ItMut.Lock() opts.DB.OpenIterators[iterKey] = []chan struct{}{opts.DoneChan, opts.ShutdownChan} opts.DB.ItMut.Unlock() } + ro := grocksdb.NewDefaultReadOptions() + ro.SetFillCache(opts.FillCache) + it := db.NewIteratorCF(ro, opts.CfHandle) + opts.It = it + + it.Seek(opts.Prefix) + if opts.Start != nil { + it.Seek(opts.Start) + } + go func() { defer func() { - if opts.DB != nil { - opts.DB.ItMut.Lock() - delete(opts.DB.OpenIterators, iterKey) - opts.DB.ItMut.Unlock() - opts.DoneChan <- struct{}{} - } it.Close() close(ch) ro.Destroy() + if opts.DB != nil { + opts.DB.ItMut.Lock() + delete(opts.DB.OpenIterators, iterKey) + opts.DoneChan <- struct{}{} + opts.DB.ItMut.Unlock() + } }() var prevKey []byte @@ -680,10 +680,10 @@ func (db *ReadOnlyDBColumnFamily) Shutdown() { db.ShutdownChan <- struct{}{} db.ItMut.Lock() for _, it := range db.OpenIterators { - it[0] <- struct{}{} + it[1] <- struct{}{} } for _, it := range db.OpenIterators { - <-it[1] + <-it[0] } db.ItMut.Unlock() <-db.DoneChan -- 2.43.4 From 0bcb069e0434726a1ea9da643c3971e5099b6b5b Mon Sep 17 00:00:00 2001 From: Jeffrey Picard Date: Mon, 3 Oct 2022 09:41:20 +0000 Subject: [PATCH 6/8] changes per code review --- db/db.go | 14 +++++++------- db/db_get.go | 16 ++++++++-------- db/db_resolve.go | 2 +- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/db/db.go b/db/db.go index 776a4c4..4ec12e3 100644 --- a/db/db.go +++ b/db/db.go @@ -361,9 +361,9 @@ func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV { close(ch) ro.Destroy() if opts.DB != nil { + opts.DoneChan <- struct{}{} opts.DB.ItMut.Lock() delete(opts.DB.OpenIterators, iterKey) - opts.DoneChan <- struct{}{} opts.DB.ItMut.Unlock() } }() @@ -445,7 +445,7 @@ func (db *ReadOnlyDBColumnFamily) selectFrom(prefix []byte, startKey, stopKey pr return nil, err } // Prefix and handle - options := NewIterateOptions().WithPrefix(prefix).WithCfHandle(handle) + options := NewIterateOptions().WithDB(db).WithPrefix(prefix).WithCfHandle(handle) // Start and stop bounds options = options.WithStart(startKey.PackKey()).WithStop(stopKey.PackKey()).WithIncludeStop(true) // Don't include the key @@ -606,8 +606,8 @@ func GetDBColumnFamilies(name string, secondayPath string, cfNames []string) (*R LastState: nil, Height: 0, Headers: nil, - ShutdownChan: make(chan struct{}), - DoneChan: make(chan struct{}), + ShutdownChan: make(chan struct{}, 1), + DoneChan: make(chan struct{}, 1), } err = myDB.ReadDBState() //TODO: Figure out right place for this @@ -833,7 +833,7 @@ func (db *ReadOnlyDBColumnFamily) InitHeaders() error { // endKey := prefixes.NewHeaderKey(db.LastState.Height) startKeyRaw := startKey.PackKey() // endKeyRaw := endKey.PackKey() - options := NewIterateOptions().WithPrefix([]byte{prefixes.Header}).WithCfHandle(handle) + options := NewIterateOptions().WithDB(db).WithPrefix([]byte{prefixes.Header}).WithCfHandle(handle) options = options.WithIncludeKey(false).WithIncludeValue(true) //.WithIncludeStop(true) options = options.WithStart(startKeyRaw) //.WithStop(endKeyRaw) @@ -856,7 +856,7 @@ func (db *ReadOnlyDBColumnFamily) InitTxCounts() error { db.TxCounts = stack.NewSliceBacked[uint32](InitialTxCountSize) - options := NewIterateOptions().WithPrefix([]byte{prefixes.TxCount}).WithCfHandle(handle) + options := NewIterateOptions().WithDB(db).WithPrefix([]byte{prefixes.TxCount}).WithCfHandle(handle) options = options.WithIncludeKey(false).WithIncludeValue(true).WithIncludeStop(true) ch := IterCF(db.DB, options) @@ -1080,7 +1080,7 @@ func GenerateTestData(prefix byte, fileName string) { log.Fatalln(err) } - options := NewIterateOptions() + options := NewIterateOptions().WithDB(db) options.WithRawKey(true).WithRawValue(true).WithIncludeValue(true) options.WithPrefix([]byte{prefix}) diff --git a/db/db_get.go b/db/db_get.go index d630c90..478aaf9 100644 --- a/db/db_get.go +++ b/db/db_get.go @@ -94,7 +94,7 @@ func (db *ReadOnlyDBColumnFamily) GetHeaders(height uint32, count uint32) ([][11 startKeyRaw := prefixes.NewHeaderKey(height).PackKey() endKeyRaw := prefixes.NewHeaderKey(height + count).PackKey() - options := NewIterateOptions().WithPrefix([]byte{prefixes.Header}).WithCfHandle(handle) + options := NewIterateOptions().WithDB(db).WithPrefix([]byte{prefixes.Header}).WithCfHandle(handle) options = options.WithIncludeKey(false).WithIncludeValue(true) //.WithIncludeStop(true) options = options.WithStart(startKeyRaw).WithStop(endKeyRaw) @@ -130,7 +130,7 @@ func (db *ReadOnlyDBColumnFamily) GetBalance(hashX []byte) (uint64, uint64, erro startKeyRaw := startKey.PackKey() endKeyRaw := endKey.PackKey() // Prefix and handle - options := NewIterateOptions().WithPrefix([]byte{prefixes.UTXO}).WithCfHandle(handle) + options := NewIterateOptions().WithDB(db).WithPrefix([]byte{prefixes.UTXO}).WithCfHandle(handle) // Start and stop bounds options = options.WithStart(startKeyRaw).WithStop(endKeyRaw).WithIncludeStop(true) // Don't include the key @@ -346,7 +346,7 @@ func (db *ReadOnlyDBColumnFamily) GetStreamsAndChannelRepostedByChannelHashes(re for _, reposterChannelHash := range reposterChannelHashes { key := prefixes.NewChannelToClaimKeyWHash(reposterChannelHash) rawKeyPrefix := key.PartialPack(1) - options := NewIterateOptions().WithCfHandle(handle).WithPrefix(rawKeyPrefix) + options := NewIterateOptions().WithDB(db).WithCfHandle(handle).WithPrefix(rawKeyPrefix) options = options.WithIncludeKey(false).WithIncludeValue(true) ch := IterCF(db.DB, options) // for stream := range Iterate(db.DB, prefixes.ChannelToClaim, []byte{reposterChannelHash}, false) { @@ -420,7 +420,7 @@ func (db *ReadOnlyDBColumnFamily) GetShortClaimIdUrl(name string, normalizedName log.Printf("partialKey: %#v\n", partialKey) keyPrefix := partialKey.PartialPack(2) // Prefix and handle - options := NewIterateOptions().WithPrefix(prefix).WithCfHandle(handle) + options := NewIterateOptions().WithDB(db).WithPrefix(prefix).WithCfHandle(handle) // Start and stop bounds options = options.WithStart(keyPrefix).WithStop(keyPrefix) // Don't include the key @@ -518,7 +518,7 @@ func (db *ReadOnlyDBColumnFamily) GetActiveAmount(claimHash []byte, txoType uint startKeyRaw := startKey.PartialPack(3) endKeyRaw := endKey.PartialPack(3) // Prefix and handle - options := NewIterateOptions().WithPrefix([]byte{prefixes.ActiveAmount}).WithCfHandle(handle) + options := NewIterateOptions().WithDB(db).WithPrefix([]byte{prefixes.ActiveAmount}).WithCfHandle(handle) // Start and stop bounds options = options.WithStart(startKeyRaw).WithStop(endKeyRaw) // Don't include the key @@ -674,7 +674,7 @@ func (db *ReadOnlyDBColumnFamily) ControllingClaimIter() <-chan *prefixes.Prefix key := prefixes.NewClaimTakeoverKey("") var rawKeyPrefix []byte = nil rawKeyPrefix = key.PartialPack(0) - options := NewIterateOptions().WithCfHandle(handle).WithPrefix(rawKeyPrefix) + options := NewIterateOptions().WithDB(db).WithCfHandle(handle).WithPrefix(rawKeyPrefix) options = options.WithIncludeValue(true) //.WithIncludeStop(true) ch := IterCF(db.DB, options) return ch @@ -785,7 +785,7 @@ func (db *ReadOnlyDBColumnFamily) BidOrderNameIter(normalizedName string) <-chan key := prefixes.NewBidOrderKey(normalizedName) var rawKeyPrefix []byte = nil rawKeyPrefix = key.PartialPack(1) - options := NewIterateOptions().WithCfHandle(handle).WithPrefix(rawKeyPrefix) + options := NewIterateOptions().WithDB(db).WithCfHandle(handle).WithPrefix(rawKeyPrefix) options = options.WithIncludeValue(true) //.WithIncludeStop(true) ch := IterCF(db.DB, options) return ch @@ -803,7 +803,7 @@ func (db *ReadOnlyDBColumnFamily) ClaimShortIdIter(normalizedName string, claimI } else { rawKeyPrefix = key.PartialPack(1) } - options := NewIterateOptions().WithCfHandle(handle).WithPrefix(rawKeyPrefix) + options := NewIterateOptions().WithDB(db).WithCfHandle(handle).WithPrefix(rawKeyPrefix) options = options.WithIncludeValue(true) //.WithIncludeStop(true) ch := IterCF(db.DB, options) return ch diff --git a/db/db_resolve.go b/db/db_resolve.go index a2f34e0..c5fde64 100644 --- a/db/db_resolve.go +++ b/db/db_resolve.go @@ -326,7 +326,7 @@ func (db *ReadOnlyDBColumnFamily) ResolveClaimInChannel(channelHash []byte, norm key := prefixes.NewChannelToClaimKey(channelHash, normalizedName) rawKeyPrefix := key.PartialPack(2) - options := NewIterateOptions().WithCfHandle(handle).WithPrefix(rawKeyPrefix) + options := NewIterateOptions().WithDB(db).WithCfHandle(handle).WithPrefix(rawKeyPrefix) options = options.WithIncludeValue(true) //.WithIncludeStop(true) ch := IterCF(db.DB, options) // TODO: what's a good default size for this? -- 2.43.4 From f086dd0d57cf3ad55b950b98b37e62750ac9994e Mon Sep 17 00:00:00 2001 From: Jeffrey Picard Date: Mon, 3 Oct 2022 10:16:23 +0000 Subject: [PATCH 7/8] fix testing --- db/db.go | 4 +++- db/db_test.go | 5 +++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/db/db.go b/db/db.go index 4ec12e3..f78bcdd 100644 --- a/db/db.go +++ b/db/db.go @@ -606,6 +606,8 @@ func GetDBColumnFamilies(name string, secondayPath string, cfNames []string) (*R LastState: nil, Height: 0, Headers: nil, + OpenIterators: make(map[string][]chan struct{}), + ItMut: sync.RWMutex{}, ShutdownChan: make(chan struct{}, 1), DoneChan: make(chan struct{}, 1), } @@ -1080,7 +1082,7 @@ func GenerateTestData(prefix byte, fileName string) { log.Fatalln(err) } - options := NewIterateOptions().WithDB(db) + options := NewIterateOptions() options.WithRawKey(true).WithRawValue(true).WithIncludeValue(true) options.WithPrefix([]byte{prefix}) diff --git a/db/db_test.go b/db/db_test.go index 85ed7ec..d6ebc9f 100644 --- a/db/db_test.go +++ b/db/db_test.go @@ -7,6 +7,7 @@ import ( "log" "os" "strings" + "sync" "testing" dbpkg "github.com/lbryio/herald.go/db" @@ -93,6 +94,10 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami LastState: nil, Height: 0, Headers: nil, + OpenIterators: make(map[string][]chan struct{}), + ItMut: sync.RWMutex{}, + ShutdownChan: make(chan struct{}, 1), + DoneChan: make(chan struct{}, 1), } // err = dbpkg.ReadDBState(myDB) //TODO: Figure out right place for this -- 2.43.4 From 354973aee4f6f8090ad56e655370f322c06d6194 Mon Sep 17 00:00:00 2001 From: Jeffrey Picard Date: Tue, 4 Oct 2022 04:43:27 +0000 Subject: [PATCH 8/8] add shutdowncalled bool to db --- db/db.go | 16 +++++++++++++--- db/db_test.go | 1 + 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/db/db.go b/db/db.go index f78bcdd..debff0d 100644 --- a/db/db.go +++ b/db/db.go @@ -63,6 +63,7 @@ type ReadOnlyDBColumnFamily struct { ItMut sync.RWMutex ShutdownChan chan struct{} DoneChan chan struct{} + ShutdownCalled bool Cleanup func() } @@ -341,8 +342,16 @@ func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV { iterKey := fmt.Sprintf("%p", opts) if opts.DB != nil { opts.DB.ItMut.Lock() - opts.DB.OpenIterators[iterKey] = []chan struct{}{opts.DoneChan, opts.ShutdownChan} - opts.DB.ItMut.Unlock() + // There is a tiny chance that we were wating on the above lock while shutdown was + // being called and by the time we get it the db has already notified all active + // iterators to shutdown. In this case we go to the else branch. + if !opts.DB.ShutdownCalled { + opts.DB.OpenIterators[iterKey] = []chan struct{}{opts.DoneChan, opts.ShutdownChan} + opts.DB.ItMut.Unlock() + } else { + opts.DB.ItMut.Unlock() + return ch + } } ro := grocksdb.NewDefaultReadOptions() @@ -609,6 +618,7 @@ func GetDBColumnFamilies(name string, secondayPath string, cfNames []string) (*R OpenIterators: make(map[string][]chan struct{}), ItMut: sync.RWMutex{}, ShutdownChan: make(chan struct{}, 1), + ShutdownCalled: false, DoneChan: make(chan struct{}, 1), } @@ -678,9 +688,9 @@ func (db *ReadOnlyDBColumnFamily) Unwind() { // Shutdown shuts down the db. func (db *ReadOnlyDBColumnFamily) Shutdown() { - // FIXME: Do we need to shutdown the iterators first? db.ShutdownChan <- struct{}{} db.ItMut.Lock() + db.ShutdownCalled = true for _, it := range db.OpenIterators { it[1] <- struct{}{} } diff --git a/db/db_test.go b/db/db_test.go index d6ebc9f..2855753 100644 --- a/db/db_test.go +++ b/db/db_test.go @@ -98,6 +98,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami ItMut: sync.RWMutex{}, ShutdownChan: make(chan struct{}, 1), DoneChan: make(chan struct{}, 1), + ShutdownCalled: false, } // err = dbpkg.ReadDBState(myDB) //TODO: Figure out right place for this -- 2.43.4