use DeferredLock instead of threading.Lock() as it can cause deadlocks

This commit is contained in:
Kay Kurokawa 2017-10-06 17:05:10 -04:00
parent 2115919c55
commit d9cc81766d
No known key found for this signature in database
GPG key ID: 2D65FB1698958B8D
2 changed files with 18 additions and 15 deletions

View file

@ -1,6 +1,5 @@
import logging import logging
import os import os
import threading
from twisted.internet import defer, threads from twisted.internet import defer, threads
from twisted.protocols.basic import FileSender from twisted.protocols.basic import FileSender
from twisted.web.client import FileBodyProducer from twisted.web.client import FileBodyProducer
@ -40,7 +39,7 @@ class BlobFile(object):
self.readers = 0 self.readers = 0
self.blob_dir = blob_dir self.blob_dir = blob_dir
self.file_path = os.path.join(blob_dir, self.blob_hash) self.file_path = os.path.join(blob_dir, self.blob_hash)
self.setting_verified_blob_lock = threading.Lock() self.blob_write_lock = defer.DeferredLock()
self.moved_verified_blob = False self.moved_verified_blob = False
if os.path.isfile(self.file_path): if os.path.isfile(self.file_path):
self.set_length(os.path.getsize(self.file_path)) self.set_length(os.path.getsize(self.file_path))
@ -201,7 +200,7 @@ class BlobFile(object):
if err is None: if err is None:
if writer.len_so_far == self.length and writer.blob_hash == self.blob_hash: if writer.len_so_far == self.length and writer.blob_hash == self.blob_hash:
if self._verified is False: if self._verified is False:
d = self._save_verified_blob(writer) d = self.save_verified_blob(writer)
d.addCallbacks(lambda _: fire_finished_deferred(), errback_finished_deferred) d.addCallbacks(lambda _: fire_finished_deferred(), errback_finished_deferred)
d.addCallback(lambda _: cancel_other_downloads()) d.addCallback(lambda _: cancel_other_downloads())
else: else:
@ -219,15 +218,19 @@ class BlobFile(object):
d.addBoth(lambda _: writer.close_handle()) d.addBoth(lambda _: writer.close_handle())
return d return d
def save_verified_blob(self, writer):
# we cannot have multiple _save_verified_blob interrupting
# each other, can happen since startProducing is a deferred
return self.blob_write_lock.run(self._save_verified_blob, writer)
@defer.inlineCallbacks @defer.inlineCallbacks
def _save_verified_blob(self, writer): def _save_verified_blob(self, writer):
with self.setting_verified_blob_lock: if self.moved_verified_blob is False:
if self.moved_verified_blob is False: writer.write_handle.seek(0)
writer.write_handle.seek(0) out_path = os.path.join(self.blob_dir, self.blob_hash)
out_path = os.path.join(self.blob_dir, self.blob_hash) producer = FileBodyProducer(writer.write_handle)
producer = FileBodyProducer(writer.write_handle) yield producer.startProducing(open(out_path, 'wb'))
yield producer.startProducing(open(out_path, 'wb')) self.moved_verified_blob = True
self.moved_verified_blob = True defer.returnValue(True)
defer.returnValue(True) else:
else: raise DownloadCanceledError()
raise DownloadCanceledError()

View file

@ -150,7 +150,7 @@ class BlobFileTest(unittest.TestCase):
writer_1, finished_d_1 = blob_file.open_for_writing(peer=1) writer_1, finished_d_1 = blob_file.open_for_writing(peer=1)
writer_2, finished_d_2 = blob_file.open_for_writing(peer=2) writer_2, finished_d_2 = blob_file.open_for_writing(peer=2)
blob_file._save_verified_blob(writer_1) blob_file.save_verified_blob(writer_1)
# second write should fail to save # second write should fail to save
yield self.assertFailure(blob_file._save_verified_blob(writer_2), DownloadCanceledError) yield self.assertFailure(blob_file.save_verified_blob(writer_2), DownloadCanceledError)