forked from LBRYCommunity/lbry-sdk
Merge pull request #3125 from lbryio/fix_order42
fix wallet db integrity / sync bugs
This commit is contained in:
commit
7f1f4eeac6
6 changed files with 240 additions and 197 deletions
|
@ -57,11 +57,11 @@ test:other-integration:
|
|||
- pip install tox-travis
|
||||
- tox -e other
|
||||
|
||||
test:json-api:
|
||||
stage: test
|
||||
script:
|
||||
- make install tools
|
||||
- HOME=/tmp coverage run -p --source=lbry scripts/generate_json_api.py
|
||||
#test:json-api:
|
||||
# stage: test
|
||||
# script:
|
||||
# - make install tools
|
||||
# - HOME=/tmp coverage run -p --source=lbry scripts/generate_json_api.py
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -740,4 +740,145 @@ HASHES = {
|
|||
738000: 'aebdf15b23eb7a37600f67d45bf6586b1d5bff3d5f3459adc2f6211ab3dd0bcb',
|
||||
739000: '3f5a894ac42f95f7d54ce25c42ea0baf1a05b2da0e9406978de0dc53484d8b04',
|
||||
740000: '55debc22f995d844eafa0a90296c9f4f433e2b7f38456fff45dd3c66cef04e37',
|
||||
741000: '927b47fc909b4b55c067bbd75d8638af1400fac076cb642e9500a747d849e458',
|
||||
742000: '97fa3d83eb94114496e418c118f549ebfb8f6d123d0b40a12ecb093239557646',
|
||||
743000: '482b66d8d5084703079c28e3ae69e5dee735f762d6fcf9743e75f04e139fd181',
|
||||
744000: 'f406890d5c70808a58fb14429bad812a3185bdb9dace1aa57de76663f92b5013',
|
||||
745000: '2bd0802cbb8aa4441a159104d39515a4ff6fc8dfe616bc83e88197847c78bcff',
|
||||
746000: '24d090a7b6359db3d5d714a69ddc9a6f2e8ff8f044b723220a8ba32df785fd54',
|
||||
747000: '07c4ce9ce5310ee472cf753ddb03c39c5fee6c910d491daffd38615205411633',
|
||||
748000: 'ea913798c0f09d0a27eae7c852954c2c88b8c3b7f23f8fba26b68a3952d0ffde',
|
||||
749000: '23f256adebfe35d49ba84ad49f3f71fc67f7745091c91f22e65f1cc2e23b8f2c',
|
||||
750000: '96db12ee3a295f3d5c56d244e6e7493f58c08d3427e379940e5d4f891a41ec26',
|
||||
751000: 'cedaf12415dac1314942e58ced80830b92fbfabc41f42a0b0f054f0672ef9822',
|
||||
752000: '293606bcd9fbbee5584724301b2cf86bb69204820023e1fb46c238ddfbc660ab',
|
||||
753000: 'f4d43cbb38b7d97919dedc0f5a6dc8007896c4f443b76f3e5693e25bc46760cf',
|
||||
754000: 'fcaad22fd815311280fe451086516375d1d9d92b2990c7c351407df5aa19011e',
|
||||
755000: 'b9276f10d1844cb5b0308766c8db960490ac34a73c4653d0a91202789a6ccb9b',
|
||||
756000: '2fe5581f1110c1c8dcea46cad647551bd6bd640cb37738d863e189bd8f368347',
|
||||
757000: 'b9d915f366f0b010429a52245b0fb02774157eb9fd8f66bce32dcd3acc71c2a1',
|
||||
758000: '62d1854fc15db56b5d0e05ceeb54c1297966bf9dc7f7a0a14b42c059fc485d1b',
|
||||
759000: 'f4ca9f69d16d092f4a0ea5102e6343b21204c4ea9cd9b22cddd77dbb5d68ade3',
|
||||
760000: 'df3bb86641330d8cc7f55a2fd0da28251219e95babe960a308b18e08a7d88fc8',
|
||||
761000: 'a93029475de4bc7569b6ae802d658cd91c84cc253772712a279f140a6c3b91b1',
|
||||
762000: '307e289dc6ec8bcd62ca8831e4159d5edd780f2fae55ba55dd446225450f46f8',
|
||||
763000: '293f73514abca24f374473bd0394179812952a04ea13dc60ef5ada5331fa274f',
|
||||
764000: 'dd8b082db9281e3d9bacf15d6b352fda186d2d2923c7731844d0d4764dd71db8',
|
||||
765000: '201239e562d2571bf47347b3522fff89632aecea3b2d8cef05151f88b2b0bcdb',
|
||||
766000: '4a55a538b51b5650979e64521998cd5c5ad055ba9f3ac0e3e2a28febc6cc2798',
|
||||
767000: '3916666f2adbb05ea98ec1961f9546b9afa0f6910ec95e42ce37267f2ae4f79c',
|
||||
768000: 'dc0ad881eedcb5fd4954238f462080d6e7636b058d481698ed1c077e0ce2207e',
|
||||
769000: 'eaf10a1e1ec6e129289b8479a05df03e0808f1f0946f1995de6524e9ebe7a461',
|
||||
770000: '7200c64f22e32de7f999583361c933680fc9a2ffcb9a5ab73d3076fd49ec7537',
|
||||
771000: 'd883111a2eeacff80ce31df35ab6c943805b9e48877b413fccf371e5dbfa7fb2',
|
||||
772000: '3977d3c60edb9c80c97bb2b759b1659cbb650ad2d3a6f61d2caec83f1b2ae84c',
|
||||
773000: '9c7175fb8646a1a82383b4c534fd01bcf92d65c43d87ae854d51a784b04dc77e',
|
||||
774000: 'e0e92485f86e5fffa87b3497424e43b02a37710517d9d3f272392e8cdc56e5e9',
|
||||
775000: '6395229113d3aa2105afbaeb8b59621a536fc61fe272314b2fc3bdda98dd66cc',
|
||||
776000: 'b4b00207328b5f032bd4f0b634f91323ff520ada8c8bfec241b23c8e4bfd5a4e',
|
||||
777000: '14cdc6f5f7b4bd5bad745dfe6fcd114e9194026412a2e1b3f345be2eef433d16',
|
||||
778000: 'd3cd7b68be504c32117b670d38d59d44b02dcf3d65811efc2ca5531d902623cc',
|
||||
779000: 'afcd220e4040cb5f92d4b38fc204e59822df2218f767f2c4b33597b238a35f77',
|
||||
780000: '78252a9cfc289a70192ed8dd3dddeb1b9a4f9b8eff9a5d0ac259b3254472cf68',
|
||||
781000: '02ebc3f17d947481a311b4771c254f1e002b6a9198d4a5258ce6c13165aadddc',
|
||||
782000: '8dd9f1f372ee6d688a0bcdc3b342c77804ba5a646a218be4bc2aa02d846206c0',
|
||||
783000: 'e46b0d02ec2ef488fae455665e107520e1bd2b4f35ca52af7ad8addd2f72fa73',
|
||||
784000: '9ee8a8de94231e3ae3a610b82fdbca48dc14d9b80791d20af6c365a31822df6f',
|
||||
785000: '21e1cc12def8173a50158b2833bd91a62140c61646f5e08aecaee3e6da20735e',
|
||||
786000: 'b3e659f84d73de42888cc0f2b69bae71dd5fa6756a437a4b21958b182faa316e',
|
||||
787000: 'a9be7ba00ea6a9ea6bd03d8412ec014ca7e8cda6bdc33382f165e702811b8836',
|
||||
788000: 'a4c14729f8a68c03f5a0ccd890ac6a92b39c143f1f752fe81ad051eb52d8dce0',
|
||||
789000: '5cf66d224e5645097efc9c3c0392b51c8ca8ea1295151921a7912a2f04ee1274',
|
||||
790000: '676769ade71c33bc102bce416e66eb2c6794b03d7b8f5a590c87c380da463775',
|
||||
791000: '0228e074451797bf6bfbc941bcafcbadc972d32e4e1e0c5da015513f65714217',
|
||||
792000: '0fa3d00a1f19c5ac060e10a410cf7cea18eac5f89018d79ce51ac3fc66bbb365',
|
||||
793000: '5f68d0868b424e32f5ce3d8e7d9f18979da7b831b8ef4e3974d62fb20ff53a97',
|
||||
794000: '34508c56423739c00a837801b654b07decb274d02b383eff396d23c4d64bc0e9',
|
||||
795000: '7f70910c855d1fd88cd7f9be8a3b94314ee408a31a2da6301404bf8deb07c12c',
|
||||
796000: 'b74ab8813b1d2a0967fea0e66597572e5f0b5a285e21f5150fcc9d5f757de130',
|
||||
797000: 'bba27b1491d907ab1baa456cb651dc5b071231b1b6ad27b62d351ca12c25dbfd',
|
||||
798000: 'e75dcb15b2fc91f02e75e600dde9f6f46c09672533bc82a5d6916c4a2cd8613a',
|
||||
799000: 'adf62c826a3e0b33af439a7881918ae4ce19c5fb2ca37d21243415f7d716aa65',
|
||||
800000: 'd8f0ca13a8c8a19c254a3a6ba15150a34711dca96f2d877162cc44aa2acfb268',
|
||||
801000: '2a8c7104c4040a2bc31913ae25e9361df5bac9477368c708f86c1ca640480887',
|
||||
802000: '1f3b09d3561c4a8a056b263289bd492dc6c0d604c3fa195935e735d1c0ddc40e',
|
||||
803000: '037769628c40a701fdb4b16d79084b8fbb319fde79770a7ac842f3cdc813099e',
|
||||
804000: 'a0c6a089e5fa1e3589ca282085fe7201a5705776d81b257ffd252b2947fa6428',
|
||||
805000: 'b2ac99bfc4a488e7b7624b31ee061991a6dd0881bb005cd13f3dd2e66a08fe19',
|
||||
806000: 'ffe63cb999a278280b80a667d2dcb60c40e43a53f733914d8bec808b694ebf83',
|
||||
807000: 'eddb09fc6c4869a59b520d0befb1fb6ac952333f3cc5de086539c85ea8558778',
|
||||
808000: '0f4fb3f9172e52897ea992d9f3a2024126c4d2e63e9888739f11fb1f5e4c1f46',
|
||||
809000: '9641dd720d23ced2f1cb6e5cf46ac4e547afb9f56263c4cf58e3b19d407cf401',
|
||||
810000: 'de6dc953acd7e5ef213b3aaf1c4a9ee1d5b756bfce5525ee105214647e243a85',
|
||||
811000: 'c52c83712ca12b24b2db1b4a575e7f352b1d560cbf702e121a03bdca9e8be23d',
|
||||
812000: '83143734bb965318a53a38a7e403dcdb3e3fadedb01ab12c370417fc2a0655c0',
|
||||
813000: 'e480deff10c5a84fc957e3aed936690e24b74dd08fa8858a8a953c2f7383b914',
|
||||
814000: '810d33afcee07b9abe16c6cdc3a041038daa131c476b0daf48a080007f08b490',
|
||||
815000: 'b4aeb9e16fddd27844b2d56bc2b221134039bb5642c9e9ba88372afbdeac3972',
|
||||
816000: '86e73b67aae3d248011b8f66ed414cb8a9ba4b2a3cf7e32773cfbff055d719b7',
|
||||
817000: '3ebb8b83752b48242016cb682f0f6bd14e15371bf1163a5933193eaa0edeb351',
|
||||
818000: '4d925e17f642f220bbf317d3d5355d2f41fbce325f190f8c3b32dc0b337d24d6',
|
||||
819000: 'b9cc126d620f6b99d90a00d35957b0e428aaaa7c986bc9e816a60e4334572961',
|
||||
820000: '9c2f8c142bed1f94dca29276f7c83958be8cfe11773bb9b56c808fbcf7d3b1f8',
|
||||
821000: 'e5509eb98895cfa12a8da5d54c1df3f52472ffcbdf707adbf84a4a9c5d356203',
|
||||
822000: '764aada4802ebfe4ef935ab50af06a4f83aa556c49fdde3d9e12e1abd230c16b',
|
||||
823000: '1dbd745c2e96a365d865f990d109137d32d42977f503af55d8c00b109d31d3c3',
|
||||
824000: '954304a0b0c8f549c3bffd5ff46b5b8f05b0f0fde2a36f24fd5af9d774fb3079',
|
||||
825000: '17808b14f2056c1a5d46cb7617e9de9be6a1a6084edbc1bdb778586467a72297',
|
||||
826000: '3ca1167d4cac8b187829b23001b438617c43704b42462c4eb001b0d434cb9651',
|
||||
827000: '246d1607245e4a202f420393ac2e30e9cbf5eb5570dc997073b897f6d8643023',
|
||||
828000: '1764730a8dc3e89d02d168ff6bb54e8c903820b74711af6ff27bd0c8545577e7',
|
||||
829000: 'd9f3ab0cd823c6305bd8b95a96188bb4f2ca90b4d66c5d12293e8b6192bac0f2',
|
||||
830000: 'd4ff51f0092b04aedf8d39937680d8e8309b1be21d36e7833ed36f8e30aad6ea',
|
||||
831000: '3e92e76721b962396dce52993fa7606552f0907b38f7b2bd7b21ada98c145f47',
|
||||
832000: 'df12fcdb4cbe53ba627ace6de898298de175f8671d3d90170732d110fcdc34b8',
|
||||
833000: '25167ff38ae4a5964b618cabe0a12d4de62ac7a4c47448cdb4499e09e108d5b9',
|
||||
834000: 'd31f5309ea179a1e386e835fc372e47dcda6871a3a239abfba50c4f368994f13',
|
||||
835000: 'aff7e8dd3e55ea807fcbe284014075f420b3a23f1b0eb47bacdc1c91d2899813',
|
||||
836000: '3b5ac6d64c470739bb17d1544a285affb40f2d33e92687e5ba7c5ac602e0d72a',
|
||||
837000: 'd5619cbfe4f27c55f2bf9351b4891636cf64fef88212a5eeeae7bd3de47fe0bd',
|
||||
838000: '1f9102a49c6ac470cb5d0050e5300b1443840d6d65719b835e3bea484aafb2ec',
|
||||
839000: '3f63e391f0fbc5787fbe4ace3bada3816261294ea1c6ee435001801023682f90',
|
||||
840000: '777894fd12bd0d6dee7bcde2995c68e55e7094e3122da38571e4b6c4304b75e0',
|
||||
841000: 'ceb0c598c788e25e43e25aa4beff5c7377035824844cf1675eaea537074df028',
|
||||
842000: '8661cf2065dc713d2ba043f0b81f0effcc940eeb3e91906a21ff22c210561dcd',
|
||||
843000: '0dc2766f90415009d0c86bedffee6ebcf58042eb08262c0c67c4e9ed86b2aec8',
|
||||
844000: '26d072da864cab268a12794977b04ec44fb69ef3978e2342e82225974dac54dd',
|
||||
845000: '95e93bb60be8d5f07a1f4d26290c914957a82fc9d26ae8a3f20082eda27406ff',
|
||||
846000: 'f1bdc39af7705e58ab8b6c31dc70dce1e115db1cfd8cc9b037949dfbec82a59a',
|
||||
847000: 'f5f10f06396ecf2765d8a081141d489737c1d8d57c281f28f57c4cb2f90db883',
|
||||
848000: '331b8ef08605bae8d749893af9ed54f0df4f07a5a002108a2a0aea82d0360979',
|
||||
849000: '75b5f6233ab9a1bbc3c8b2893e5b22a0aa98e7ea635261255dc3c281f67d2260',
|
||||
850000: '5d7e6fe83e0ea1910a54a00090704737671d6f44df4228e21440ad1fc15e595f',
|
||||
851000: '7822db25d3ff0f6695ee38bad91edf317b5c6611673d28f1d22053110bb558be',
|
||||
852000: '2f0effad83a3561fc1a2806a562786a641d9ddb18d16bb9308006e7d324a21e9',
|
||||
853000: 'f603b2eaff11d5296377d990651317d40a1b2599ad2c5250eab131090f4b9458',
|
||||
854000: '34d59b26a50f18a9f250736d0f2e69d28b7e196fbef9b8a26c6b0b75c16aa194',
|
||||
855000: '76dd1ffff3946c0878969886fcf177ce5ab5560df19ddf006f9bcb02ae3e4e4f',
|
||||
856000: '74ff0b6f64e9dd5802fec2aac1d3ae194d28b9264114adaf0a882b46c8c918fe',
|
||||
857000: '7b5badfa2e4f40aa597a504d7ebe83c3705a2c6169a8c168ce293db223bc2d32',
|
||||
858000: '2bb0767a0f72b20d45ecfc3e34517dbda16d85758e040cf0e147f4cbd0cc57ac',
|
||||
859000: '3d741b9c365a91ed76f85824b94d19ec19b608d232660840ba59c7aa4b2cb67f',
|
||||
860000: 'd481a5a117878c0e3acd1f5844e150fb30e617577947d9846b1d214d703b71b0',
|
||||
861000: '54033424e488a3f1ad6946d4a6d9acb48465d6b1dbe8e1c2504a54cc84d7cad4',
|
||||
862000: '464bc3820a8cc8844dc9e26c388009e9982c656d46ef4b4fd0a2cb0e4eea0aaa',
|
||||
863000: 'd1aa94be2174f66780c4f226b9da3f6712b0f37af8dec33360bea83ca261b342',
|
||||
864000: '8c16008f11de5bc395d88cd802514ff647450f1bc136724b9aaf2ccce10a494f',
|
||||
865000: '3dae86012e97a201e2e1a47c899001ac00f78dc108026ed7c4194858c6c6dd5a',
|
||||
866000: 'afe5b0ccab995e1a1fa25fbc24c1d4b1a92c43042d03395f8743dcd806e72fd8',
|
||||
867000: 'c83716ac171aa9ab0d414833db340fa30e82bfda6cc616d3038529caab9b5600',
|
||||
868000: '8c409fe03cd35ef2d8e366818788b40eaeb4c8f6ae91450d75f4a66ca5f69cad',
|
||||
869000: '1d47909ceba790b8e1ce2e9902ee2775ea99e58efdb95668f9803a8ccf95f286',
|
||||
870000: '9adf5da1476388f053aa42de636da169d1cf1c9652cdf7cd9ad4fb18a0eb3388',
|
||||
871000: '8ad57fb1e74bcba0b5614fbac003be2bb32275dd85b38f2d28a0585005a99cfc',
|
||||
872000: '84a32e92012a356106e9657da8dab1a5491ea588fc29d411c69b20680c666420',
|
||||
873000: 'adf5921bbbfaa43929f67e6a070975313b77b456e262c700a27be611fceb17ae',
|
||||
874000: '09eaa7c4b18c79a46a2895190333f72336826223d5c986849a06f5153f49f2a5',
|
||||
875000: '235d7e4f31966507312149ea4c5e294aa84c695cf840117f0ef5963be7a0bda1',
|
||||
876000: '9aa9cb806ccbec0475ac330b496c5b2edeba38ba3f1e13ddd54a01457634a288',
|
||||
877000: 'c1e7f9b2b20bb1c4c0deadbc786d31fdf36f262325342aa23d1a66e2846b22bc',
|
||||
878000: 'ee0d2b20ac28ce23ab38698a57c6beff14f12b7af9d027c05cc92f652695f46b',
|
||||
879000: '0eb0810f4b81d1845b0a88f05449408df2e45715c9210a656f45278c5fdf7956',
|
||||
880000: 'e7d613027e3b4ca38d09bbef07998b57db237c6d67f1e8ea50024d2e0d9a1a72',
|
||||
881000: '21af4d355d8756b8bf0369b2d79b5c824148ae069026ba5c14f9dd6b7555e1db',
|
||||
}
|
||||
|
|
|
@ -430,7 +430,7 @@ class SQLiteMixin:
|
|||
return
|
||||
await self.db.executescript('\n'.join(
|
||||
f"DROP TABLE {table};" for table in tables
|
||||
))
|
||||
) + '\n' + 'PRAGMA WAL_CHECKPOINT(FULL);' + '\n' + 'VACUUM;')
|
||||
await self.db.execute(self.CREATE_VERSION_TABLE)
|
||||
await self.db.execute("INSERT INTO version VALUES (?)", (self.SCHEMA_VERSION,))
|
||||
await self.db.executescript(self.CREATE_TABLES_QUERY)
|
||||
|
@ -574,7 +574,7 @@ def get_and_reserve_spendable_utxos(transaction: sqlite3.Connection, accounts: L
|
|||
|
||||
class Database(SQLiteMixin):
|
||||
|
||||
SCHEMA_VERSION = "1.4"
|
||||
SCHEMA_VERSION = "1.5"
|
||||
|
||||
PRAGMAS = """
|
||||
pragma journal_mode=WAL;
|
||||
|
|
|
@ -123,7 +123,6 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
self.network: Network = self.config.get('network') or Network(self)
|
||||
self.network.on_header.listen(self.receive_header)
|
||||
self.network.on_status.listen(self.process_status_update)
|
||||
self.network.on_connected.listen(self.join_network)
|
||||
|
||||
self.accounts = []
|
||||
self.fee_per_byte: int = self.config.get('fee_per_byte', self.default_fee_per_byte)
|
||||
|
@ -156,7 +155,7 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
self._on_ready_controller = StreamController()
|
||||
self.on_ready = self._on_ready_controller.stream
|
||||
|
||||
self._tx_cache = pylru.lrucache(self.config.get("tx_cache_size", 10_000))
|
||||
self._tx_cache = pylru.lrucache(self.config.get("tx_cache_size", 1024))
|
||||
self._update_tasks = TaskGroup()
|
||||
self._other_tasks = TaskGroup() # that we dont need to start
|
||||
self._utxo_reservation_lock = asyncio.Lock()
|
||||
|
@ -329,6 +328,8 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
await self.network.on_connected.first
|
||||
async with self._header_processing_lock:
|
||||
await self._update_tasks.add(self.initial_headers_sync())
|
||||
self.network.on_connected.listen(self.join_network)
|
||||
asyncio.ensure_future(self.join_network())
|
||||
await fully_synced
|
||||
await self.db.release_all_outputs()
|
||||
await asyncio.gather(*(a.maybe_migrate_certificates() for a in self.accounts))
|
||||
|
@ -414,6 +415,7 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
"Blockchain Reorganization: attempting rewind to height %s from starting height %s",
|
||||
height, height+rewound
|
||||
)
|
||||
self._tx_cache.clear()
|
||||
|
||||
else:
|
||||
raise IndexError(f"headers.connect() returned negative number ({added})")
|
||||
|
@ -512,10 +514,9 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
)
|
||||
return True
|
||||
|
||||
acquire_lock_tasks = []
|
||||
synced_txs = []
|
||||
to_request = {}
|
||||
pending_synced_history = {}
|
||||
updated_cached_items = {}
|
||||
already_synced = set()
|
||||
|
||||
already_synced_offset = 0
|
||||
|
@ -525,24 +526,6 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
already_synced.add((txid, remote_height))
|
||||
already_synced_offset += 1
|
||||
continue
|
||||
cache_item = self._tx_cache.get(txid)
|
||||
if cache_item is None:
|
||||
cache_item = TransactionCacheItem()
|
||||
self._tx_cache[txid] = cache_item
|
||||
|
||||
unsynced_offset = already_synced_offset
|
||||
for txid, remote_height in remote_history[already_synced_offset:]:
|
||||
cache_item = self._tx_cache[txid]
|
||||
if cache_item.tx is not None and cache_item.tx.height >= remote_height \
|
||||
and (not cache_item.tx.is_verified or remote_height < 1):
|
||||
pending_synced_history[unsynced_offset] = f'{txid}:{cache_item.tx.height}:'
|
||||
already_synced.add((txid, cache_item.tx.height))
|
||||
else:
|
||||
acquire_lock_tasks.append(asyncio.create_task(cache_item.lock.acquire()))
|
||||
unsynced_offset += 1
|
||||
|
||||
if acquire_lock_tasks:
|
||||
await asyncio.wait(acquire_lock_tasks)
|
||||
|
||||
tx_indexes = {}
|
||||
|
||||
|
@ -550,22 +533,25 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
tx_indexes[txid] = i
|
||||
if (txid, remote_height) in already_synced:
|
||||
continue
|
||||
cache_item = self._tx_cache.get(txid)
|
||||
cache_item.pending_verifications += 1
|
||||
updated_cached_items[txid] = cache_item
|
||||
|
||||
assert cache_item is not None, 'cache item is none'
|
||||
assert cache_item.lock.locked(), 'cache lock is not held?'
|
||||
|
||||
to_request[i] = (txid, remote_height)
|
||||
|
||||
log.debug(
|
||||
"request %i transactions, %i/%i for %s are already synced", len(to_request),
|
||||
len(pending_synced_history), len(remote_history), address
|
||||
"request %i transactions, %i/%i for %s are already synced", len(to_request), len(synced_txs),
|
||||
len(remote_history), address
|
||||
)
|
||||
requested_txes = await self._request_transaction_batch(to_request, len(remote_history), address)
|
||||
for tx in requested_txes:
|
||||
remote_history_txids = set(txid for txid, _ in remote_history)
|
||||
async for tx in self.request_synced_transactions(to_request, remote_history_txids, address):
|
||||
pending_synced_history[tx_indexes[tx.id]] = f"{tx.id}:{tx.height}:"
|
||||
synced_txs.append(tx)
|
||||
if len(synced_txs) >= 100:
|
||||
log.info("Syncing address %s: %d/%d", address, len(pending_synced_history), len(to_request))
|
||||
await self.db.save_transaction_io_batch(
|
||||
synced_txs, address, self.address_to_hash160(address), ""
|
||||
)
|
||||
while synced_txs:
|
||||
tx = synced_txs.pop()
|
||||
self._on_transaction_controller.add(TransactionEvent(address, tx))
|
||||
log.info("Sync finished for address %s: %d/%d", address, len(pending_synced_history), len(to_request))
|
||||
|
||||
assert len(pending_synced_history) == len(remote_history), \
|
||||
f"{len(pending_synced_history)} vs {len(remote_history)}"
|
||||
|
@ -576,21 +562,11 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
if f"{txid}:{height}:" != pending_synced_history[i]:
|
||||
log.warning("history mismatch: %s vs %s", remote_history[remote_i], pending_synced_history[i])
|
||||
synced_history += pending_synced_history[i]
|
||||
|
||||
cache_size = self.config.get("tx_cache_size", 10_000)
|
||||
for txid, cache_item in updated_cached_items.items():
|
||||
cache_item.pending_verifications -= 1
|
||||
if cache_item.pending_verifications < 0:
|
||||
log.warning("config value tx cache size %i needs to be increased", cache_size)
|
||||
cache_item.pending_verifications = 0
|
||||
try:
|
||||
cache_item.lock.release()
|
||||
except RuntimeError:
|
||||
log.warning("lock was already released?")
|
||||
|
||||
await self.db.save_transaction_io_batch(
|
||||
[], address, self.address_to_hash160(address), synced_history
|
||||
synced_txs, address, self.address_to_hash160(address), synced_history
|
||||
)
|
||||
while synced_txs:
|
||||
self._on_transaction_controller.add(TransactionEvent(address, synced_txs.pop()))
|
||||
|
||||
if address_manager is None:
|
||||
address_manager = await self.get_address_manager_for_address(address)
|
||||
|
@ -598,13 +574,6 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
if address_manager is not None:
|
||||
await address_manager.ensure_address_gap()
|
||||
|
||||
for txid, cache_item in updated_cached_items.items():
|
||||
if self._tx_cache.get(txid) is not cache_item:
|
||||
log.warning("tx cache corrupted while syncing %s, reattempt sync=%s", address, reattempt_update)
|
||||
if reattempt_update:
|
||||
return await self.update_history(address, remote_status, address_manager, False)
|
||||
return False
|
||||
|
||||
local_status, local_history = \
|
||||
await self.get_local_status_and_history(address, synced_history)
|
||||
|
||||
|
@ -638,151 +607,88 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
|
||||
async def maybe_verify_transaction(self, tx, remote_height, merkle=None):
|
||||
tx.height = remote_height
|
||||
cached = self._tx_cache.get(tx.id)
|
||||
if not cached:
|
||||
# cache txs looked up by transaction_show too
|
||||
cached = TransactionCacheItem()
|
||||
self._tx_cache[tx.id] = cached
|
||||
cached.tx = tx
|
||||
if 0 < remote_height < len(self.headers) and cached.pending_verifications <= 1:
|
||||
if 0 < remote_height < len(self.headers):
|
||||
# can't be tx.pending_verifications == 1 because we have to handle the transaction_show case
|
||||
if not merkle:
|
||||
merkle = await self.network.retriable_call(self.network.get_merkle, tx.id, remote_height)
|
||||
if 'merkle' not in merkle:
|
||||
return
|
||||
merkle_root = self.get_root_of_merkle_tree(merkle['merkle'], merkle['pos'], tx.hash)
|
||||
header = await self.headers.get(remote_height)
|
||||
tx.position = merkle['pos']
|
||||
tx.is_verified = merkle_root == header['merkle_root']
|
||||
|
||||
async def _single_batch(self, batch, remote_heights, header_cache, transactions):
|
||||
batch_result = await self.network.retriable_call(
|
||||
self.network.get_transaction_batch, batch
|
||||
)
|
||||
for txid, (raw, merkle) in batch_result.items():
|
||||
remote_height = remote_heights[txid]
|
||||
merkle_height = merkle['block_height']
|
||||
cache_item = self._tx_cache.get(txid)
|
||||
if cache_item is None:
|
||||
cache_item = TransactionCacheItem()
|
||||
self._tx_cache[txid] = cache_item
|
||||
tx = cache_item.tx or Transaction(bytes.fromhex(raw.decode() if isinstance(raw, bytes) else raw),
|
||||
height=remote_height)
|
||||
tx.height = remote_height
|
||||
cache_item.tx = tx
|
||||
if 'merkle' in merkle and remote_heights[txid] > 0:
|
||||
merkle_root = self.get_root_of_merkle_tree(merkle['merkle'], merkle['pos'], tx.hash)
|
||||
try:
|
||||
header = header_cache.get(remote_heights[txid]) or (await self.headers.get(merkle_height))
|
||||
except IndexError:
|
||||
log.warning("failed to verify %s at height %i", tx.id, merkle_height)
|
||||
else:
|
||||
header_cache[remote_heights[txid]] = header
|
||||
tx.position = merkle['pos']
|
||||
tx.is_verified = merkle_root == header['merkle_root']
|
||||
transactions.append(tx)
|
||||
return transactions
|
||||
|
||||
async def request_transactions_for_inflate(self, to_request: Tuple[Tuple[str, int], ...]):
|
||||
header_cache = {}
|
||||
async def request_transactions(self, to_request: Tuple[Tuple[str, int], ...], cached=False):
|
||||
batches = [[]]
|
||||
remote_heights = {}
|
||||
transactions = []
|
||||
heights_in_batch = 0
|
||||
last_height = 0
|
||||
|
||||
for txid, height in sorted(to_request, key=lambda x: x[1]):
|
||||
if cached:
|
||||
if txid in self._tx_cache:
|
||||
if self._tx_cache[txid].tx is not None and self._tx_cache[txid].tx.is_verified:
|
||||
yield self._tx_cache[txid].tx
|
||||
continue
|
||||
else:
|
||||
self._tx_cache[txid] = TransactionCacheItem()
|
||||
remote_heights[txid] = height
|
||||
if height != last_height:
|
||||
heights_in_batch += 1
|
||||
last_height = height
|
||||
if len(batches[-1]) == 100 or heights_in_batch == 20:
|
||||
if len(batches[-1]) == 100:
|
||||
batches.append([])
|
||||
heights_in_batch = 1
|
||||
batches[-1].append(txid)
|
||||
if not batches[-1]:
|
||||
batches.pop()
|
||||
|
||||
for batch in batches:
|
||||
await self._single_batch(batch, remote_heights, header_cache, transactions)
|
||||
return transactions
|
||||
async for tx in self._single_batch(batch, remote_heights):
|
||||
if cached:
|
||||
self._tx_cache[tx.id].tx = tx
|
||||
yield tx
|
||||
|
||||
async def _request_transaction_batch(self, to_request, remote_history_size, address):
|
||||
header_cache = {}
|
||||
batches = [[]]
|
||||
remote_heights = {}
|
||||
synced_txs = []
|
||||
heights_in_batch = 0
|
||||
last_height = 0
|
||||
for idx in sorted(to_request):
|
||||
txid = to_request[idx][0]
|
||||
height = to_request[idx][1]
|
||||
remote_heights[txid] = height
|
||||
if height != last_height:
|
||||
heights_in_batch += 1
|
||||
last_height = height
|
||||
if len(batches[-1]) == 100 or heights_in_batch == 20:
|
||||
batches.append([])
|
||||
heights_in_batch = 1
|
||||
batches[-1].append(txid)
|
||||
if not batches[-1]:
|
||||
batches.pop()
|
||||
async def request_synced_transactions(self, to_request, remote_history, address):
|
||||
pending_sync = {}
|
||||
async for tx in self.request_transactions(((txid, height) for txid, height in to_request.values())):
|
||||
pending_sync[tx.id] = tx
|
||||
for f in asyncio.as_completed([self._sync(tx, remote_history, pending_sync) for tx in pending_sync.values()]):
|
||||
yield await f
|
||||
|
||||
last_showed_synced_count = 0
|
||||
|
||||
async def _single_batch(batch):
|
||||
transactions = await self._single_batch(batch, remote_heights, header_cache, [])
|
||||
this_batch_synced = []
|
||||
|
||||
for tx in transactions:
|
||||
check_db_for_txos = []
|
||||
async def _single_batch(self, batch, remote_heights):
|
||||
heights = {remote_heights[txid] for txid in batch}
|
||||
unrestriced = 0 < min(heights) < max(heights) < max(self.headers.checkpoints or [0])
|
||||
batch_result = await self.network.retriable_call(self.network.get_transaction_batch, batch, not unrestriced)
|
||||
for txid, (raw, merkle) in batch_result.items():
|
||||
remote_height = remote_heights[txid]
|
||||
tx = Transaction(unhexlify(raw), height=remote_height)
|
||||
await self.maybe_verify_transaction(tx, remote_height, merkle)
|
||||
yield tx
|
||||
|
||||
async def _sync(self, tx, remote_history, pending_txs):
|
||||
check_db_for_txos = {}
|
||||
for txi in tx.inputs:
|
||||
if txi.txo_ref.txo is not None:
|
||||
continue
|
||||
cache_item = self._tx_cache.get(txi.txo_ref.tx_ref.id)
|
||||
if cache_item is not None:
|
||||
if cache_item.tx is not None:
|
||||
txi.txo_ref = cache_item.tx.outputs[txi.txo_ref.position].ref
|
||||
wanted_txid = txi.txo_ref.tx_ref.id
|
||||
if wanted_txid not in remote_history:
|
||||
continue
|
||||
if wanted_txid in pending_txs:
|
||||
txi.txo_ref = pending_txs[wanted_txid].outputs[txi.txo_ref.position].ref
|
||||
else:
|
||||
check_db_for_txos.append(txi.txo_ref.id)
|
||||
check_db_for_txos[txi] = txi.txo_ref.id
|
||||
|
||||
referenced_txos = {} if not check_db_for_txos else {
|
||||
txo.id: txo for txo in await self.db.get_txos(
|
||||
txoid__in=check_db_for_txos, order_by='txo.txoid', no_tx=True
|
||||
txoid__in=list(check_db_for_txos.values()), order_by='txo.txoid', no_tx=True
|
||||
)
|
||||
}
|
||||
|
||||
for txi in tx.inputs:
|
||||
if txi.txo_ref.txo is not None:
|
||||
continue
|
||||
referenced_txo = referenced_txos.get(txi.txo_ref.id)
|
||||
if referenced_txo is not None:
|
||||
txi.txo_ref = referenced_txo.ref
|
||||
continue
|
||||
cache_item = self._tx_cache.get(txi.txo_ref.id)
|
||||
if cache_item is None:
|
||||
cache_item = self._tx_cache[txi.txo_ref.id] = TransactionCacheItem()
|
||||
if cache_item.tx is not None:
|
||||
txi.txo_ref = cache_item.tx.ref
|
||||
|
||||
synced_txs.append(tx)
|
||||
this_batch_synced.append(tx)
|
||||
|
||||
await self.db.save_transaction_io_batch(
|
||||
this_batch_synced, address, self.address_to_hash160(address), ""
|
||||
)
|
||||
await asyncio.wait([
|
||||
self._on_transaction_controller.add(TransactionEvent(address, tx))
|
||||
for tx in this_batch_synced
|
||||
])
|
||||
nonlocal last_showed_synced_count
|
||||
if last_showed_synced_count + 100 < len(synced_txs):
|
||||
log.info("synced %i/%i transactions for %s", len(synced_txs), remote_history_size, address)
|
||||
last_showed_synced_count = len(synced_txs)
|
||||
|
||||
for batch in batches:
|
||||
await _single_batch(batch)
|
||||
|
||||
return synced_txs
|
||||
for txi in check_db_for_txos:
|
||||
if txi.txo_ref.id in referenced_txos:
|
||||
txi.txo_ref = referenced_txos[txi.txo_ref.id].ref
|
||||
else:
|
||||
tx_from_db = await self.db.get_transaction(txid=txi.txo_ref.tx_ref.id)
|
||||
if tx_from_db is None:
|
||||
log.warning("%s not on db, not on cache, but on remote history!", txi.txo_ref.id)
|
||||
else:
|
||||
txi.txo_ref = tx_from_db.outputs[txi.txo_ref.position].ref
|
||||
return tx
|
||||
|
||||
async def get_address_manager_for_address(self, address) -> Optional[AddressManager]:
|
||||
details = await self.db.get_address(address=address)
|
||||
|
@ -850,10 +756,10 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
include_received_tips=False) -> Tuple[List[Output], dict, int, int]:
|
||||
encoded_outputs = await query
|
||||
outputs = Outputs.from_base64(encoded_outputs or b'') # TODO: why is the server returning None?
|
||||
txs = []
|
||||
if len(outputs.txs) > 0:
|
||||
txs: List[Transaction] = []
|
||||
txs.extend((await self.request_transactions_for_inflate(tuple(outputs.txs))))
|
||||
if len(outputs.txs) > 0:
|
||||
async for tx in self.request_transactions(tuple(outputs.txs), cached=True):
|
||||
txs.append(tx)
|
||||
|
||||
_txos, blocked = outputs.inflate(txs)
|
||||
|
||||
|
@ -1071,7 +977,7 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
return self.db.get_channel_count(**constraints)
|
||||
|
||||
async def resolve_collection(self, collection, offset=0, page_size=1):
|
||||
claim_ids = collection.claim.collection.claims.ids[offset:page_size+offset]
|
||||
claim_ids = collection.claim.collection.claims.ids[offset:page_size + offset]
|
||||
try:
|
||||
resolve_results, _, _, _ = await self.claim_search([], claim_ids=claim_ids)
|
||||
except Exception as err:
|
||||
|
@ -1120,7 +1026,7 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
'txid': tx.id,
|
||||
'timestamp': ts,
|
||||
'date': datetime.fromtimestamp(ts).isoformat(' ')[:-3] if tx.height > 0 else None,
|
||||
'confirmations': (headers.height+1) - tx.height if tx.height > 0 else 0,
|
||||
'confirmations': (headers.height + 1) - tx.height if tx.height > 0 else 0,
|
||||
'claim_info': [],
|
||||
'update_info': [],
|
||||
'support_info': [],
|
||||
|
@ -1130,7 +1036,7 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
is_my_inputs = all([txi.is_my_input for txi in tx.inputs])
|
||||
if is_my_inputs:
|
||||
# fees only matter if we are the ones paying them
|
||||
item['value'] = dewies_to_lbc(tx.net_account_balance+tx.fee)
|
||||
item['value'] = dewies_to_lbc(tx.net_account_balance + tx.fee)
|
||||
item['fee'] = dewies_to_lbc(-tx.fee)
|
||||
else:
|
||||
# someone else paid the fees
|
||||
|
@ -1159,7 +1065,7 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
if previous is not None:
|
||||
item['update_info'].append({
|
||||
'address': txo.get_address(self),
|
||||
'balance_delta': dewies_to_lbc(previous.amount-txo.amount),
|
||||
'balance_delta': dewies_to_lbc(previous.amount - txo.amount),
|
||||
'amount': dewies_to_lbc(txo.amount),
|
||||
'claim_id': txo.claim_id,
|
||||
'claim_name': txo.claim_name,
|
||||
|
@ -1237,7 +1143,7 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
for account in accounts:
|
||||
balance = self._balance_cache.get(account.id)
|
||||
if not balance:
|
||||
balance = self._balance_cache[account.id] =\
|
||||
balance = self._balance_cache[account.id] = \
|
||||
await account.get_detailed_balance(confirmations, reserved_subtotals=True)
|
||||
for key, value in balance.items():
|
||||
if key == 'reserved_subtotals':
|
||||
|
@ -1247,7 +1153,6 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
result[key] += value
|
||||
return result
|
||||
|
||||
|
||||
class TestNetLedger(Ledger):
|
||||
network_name = 'testnet'
|
||||
pubkey_address_prefix = bytes((111,))
|
||||
|
@ -1256,7 +1161,6 @@ class TestNetLedger(Ledger):
|
|||
extended_private_key_prefix = unhexlify('04358394')
|
||||
checkpoints = {}
|
||||
|
||||
|
||||
class RegTestLedger(Ledger):
|
||||
network_name = 'regtest'
|
||||
headers_class = UnvalidatedHeaders
|
||||
|
|
|
@ -260,9 +260,9 @@ class Network:
|
|||
restricted = known_height in (None, -1, 0) or 0 > known_height > self.remote_height - 10
|
||||
return self.rpc('blockchain.transaction.get', [tx_hash], restricted)
|
||||
|
||||
def get_transaction_batch(self, txids, restricted=True, session=None):
|
||||
def get_transaction_batch(self, txids, restricted=True):
|
||||
# use any server if its old, otherwise restrict to who gave us the history
|
||||
return self.rpc('blockchain.transaction.get_batch', txids, restricted, session)
|
||||
return self.rpc('blockchain.transaction.get_batch', txids, restricted)
|
||||
|
||||
def get_transaction_and_merkle(self, tx_hash, known_height=None):
|
||||
# use any server if its old, otherwise restrict to who gave us the history
|
||||
|
|
|
@ -40,7 +40,7 @@ class MockNetwork:
|
|||
merkle = await self.get_merkle(tx_hash, known_height)
|
||||
return tx, merkle
|
||||
|
||||
async def get_transaction_batch(self, txids):
|
||||
async def get_transaction_batch(self, txids, restricted):
|
||||
return {
|
||||
txid: await self.get_transaction_and_merkle(txid)
|
||||
for txid in txids
|
||||
|
@ -126,9 +126,7 @@ class TestSynchronization(LedgerTestCase):
|
|||
|
||||
self.ledger.network.get_history_called = []
|
||||
self.ledger.network.get_transaction_called = []
|
||||
self.assertFalse(self.ledger._tx_cache[txid1].tx.is_verified)
|
||||
self.assertFalse(self.ledger._tx_cache[txid2].tx.is_verified)
|
||||
self.assertFalse(self.ledger._tx_cache[txid3].tx.is_verified)
|
||||
self.assertEqual(0, len(self.ledger._tx_cache))
|
||||
await self.ledger.update_history(address, '')
|
||||
self.assertListEqual(self.ledger.network.get_history_called, [address])
|
||||
self.assertListEqual(self.ledger.network.get_transaction_called, [])
|
||||
|
|
Loading…
Add table
Reference in a new issue