Release the GIL if calls into rocksdb are made

This commit is contained in:
hofmockel 2014-01-18 12:24:49 +01:00
parent 86e6aef6cb
commit ecb6e26546

View file

@ -15,13 +15,13 @@ cimport options
cimport merge_operator cimport merge_operator
cimport filter_policy cimport filter_policy
cimport comparator cimport comparator
cimport slice_
cimport cache cimport cache
cimport logger cimport logger
cimport snapshot cimport snapshot
cimport db cimport db
cimport iterator cimport iterator
from slice_ cimport Slice
from status cimport Status from status cimport Status
import sys import sys
@ -33,7 +33,7 @@ import traceback
import errors import errors
cdef extern from "cpp/utils.hpp" namespace "py_rocks": cdef extern from "cpp/utils.hpp" namespace "py_rocks":
cdef const slice_.Slice* vector_data(vector[slice_.Slice]&) cdef const Slice* vector_data(vector[Slice]&)
## Here comes the stuff to wrap the status to exception ## Here comes the stuff to wrap the status to exception
cdef check_status(const Status& st): cdef check_status(const Status& st):
@ -71,10 +71,10 @@ cdef string bytes_to_string(path) except *:
cdef string_to_bytes(string ob): cdef string_to_bytes(string ob):
return PyBytes_FromStringAndSize(ob.c_str(), ob.size()) return PyBytes_FromStringAndSize(ob.c_str(), ob.size())
cdef slice_.Slice bytes_to_slice(ob) except *: cdef Slice bytes_to_slice(ob) except *:
return slice_.Slice(PyBytes_AsString(ob), PyBytes_Size(ob)) return Slice(PyBytes_AsString(ob), PyBytes_Size(ob))
cdef slice_to_bytes(slice_.Slice sl): cdef slice_to_bytes(Slice sl):
return PyBytes_FromStringAndSize(sl.data(), sl.size()) return PyBytes_FromStringAndSize(sl.data(), sl.size())
## only for filsystem paths ## only for filsystem paths
@ -149,8 +149,8 @@ cdef class PyBytewiseComparator(PyComparator):
cdef int compare_callback( cdef int compare_callback(
void* ctx, void* ctx,
const slice_.Slice& a, const Slice& a,
const slice_.Slice& b) with gil: const Slice& b) with gil:
return (<object>ctx).compare(slice_to_bytes(a), slice_to_bytes(b)) return (<object>ctx).compare(slice_to_bytes(a), slice_to_bytes(b))
@ -197,7 +197,7 @@ cdef class PyGenericFilterPolicy(PyFilterPolicy):
cdef void create_filter_callback( cdef void create_filter_callback(
void* ctx, void* ctx,
const slice_.Slice* keys, const Slice* keys,
int n, int n,
string* dst) with gil: string* dst) with gil:
@ -207,8 +207,8 @@ cdef void create_filter_callback(
cdef cpp_bool key_may_match_callback( cdef cpp_bool key_may_match_callback(
void* ctx, void* ctx,
const slice_.Slice& key, const Slice& key,
const slice_.Slice& filt) with gil: const Slice& filt) with gil:
return (<object>ctx).key_may_match( return (<object>ctx).key_may_match(
slice_to_bytes(key), slice_to_bytes(key),
@ -229,7 +229,7 @@ cdef class PyBloomFilterPolicy(PyFilterPolicy):
def create_filter(self, keys): def create_filter(self, keys):
cdef string dst cdef string dst
cdef vector[slice_.Slice] c_keys cdef vector[Slice] c_keys
for key in keys: for key in keys:
c_keys.push_back(bytes_to_slice(key)) c_keys.push_back(bytes_to_slice(key))
@ -294,9 +294,9 @@ cdef class PyMergeOperator(object):
cdef cpp_bool merge_callback( cdef cpp_bool merge_callback(
void* ctx, void* ctx,
const slice_.Slice& key, const Slice& key,
const slice_.Slice* existing_value, const Slice* existing_value,
const slice_.Slice& value, const Slice& value,
string* new_value, string* new_value,
logger.Logger* log) with gil: logger.Logger* log) with gil:
@ -326,8 +326,8 @@ cdef cpp_bool merge_callback(
cdef cpp_bool full_merge_callback( cdef cpp_bool full_merge_callback(
void* ctx, void* ctx,
const slice_.Slice& key, const Slice& key,
const slice_.Slice* existing_value, const Slice* existing_value,
const deque[string]& op_list, const deque[string]& op_list,
string* new_value, string* new_value,
logger.Logger* log) with gil: logger.Logger* log) with gil:
@ -358,9 +358,9 @@ cdef cpp_bool full_merge_callback(
cdef cpp_bool partial_merge_callback( cdef cpp_bool partial_merge_callback(
void* ctx, void* ctx,
const slice_.Slice& key, const Slice& key,
const slice_.Slice& left_op, const Slice& left_op,
const slice_.Slice& right_op, const Slice& right_op,
string* new_value, string* new_value,
logger.Logger* log) with gil: logger.Logger* log) with gil:
@ -966,65 +966,85 @@ cdef class DB(object):
cdef db.DB* db cdef db.DB* db
def __cinit__(self, db_name, Options opts, read_only=False): def __cinit__(self, db_name, Options opts, read_only=False):
if read_only: cdef Status st
check_status( cdef string db_path = path_to_string(db_name)
db.DB_OpenForReadOnly(
deref(opts.opts),
path_to_string(db_name),
cython.address(self.db),
False))
else:
check_status(
db.DB_Open(
deref(opts.opts),
path_to_string(db_name),
cython.address(self.db)))
if read_only:
with nogil:
st = db.DB_OpenForReadOnly(
deref(opts.opts),
db_path,
cython.address(self.db),
False)
else:
with nogil:
st = db.DB_Open(
deref(opts.opts),
db_path,
cython.address(self.db))
check_status(st)
self.opts = opts self.opts = opts
def __dealloc__(self): def __dealloc__(self):
del self.db del self.db
def put(self, key, value, sync=False, disable_wal=False): def put(self, key, value, sync=False, disable_wal=False):
cdef Status st
cdef options.WriteOptions opts cdef options.WriteOptions opts
opts.sync = sync opts.sync = sync
opts.disableWAL = disable_wal opts.disableWAL = disable_wal
check_status( cdef Slice c_key = bytes_to_slice(key)
self.db.Put(opts, bytes_to_slice(key), bytes_to_slice(value))) cdef Slice c_value = bytes_to_slice(value)
with nogil:
st = self.db.Put(opts, c_key, c_value)
check_status(st)
def delete(self, key, sync=False, disable_wal=False): def delete(self, key, sync=False, disable_wal=False):
cdef Status st
cdef options.WriteOptions opts cdef options.WriteOptions opts
opts.sync = sync opts.sync = sync
opts.disableWAL = disable_wal opts.disableWAL = disable_wal
check_status( cdef Slice c_key = bytes_to_slice(key)
self.db.Delete(opts, bytes_to_slice(key))) with nogil:
st = self.db.Delete(opts, c_key)
check_status(st)
def merge(self, key, value, sync=False, disable_wal=False): def merge(self, key, value, sync=False, disable_wal=False):
cdef Status st
cdef options.WriteOptions opts cdef options.WriteOptions opts
opts.sync = sync opts.sync = sync
opts.disableWAL = disable_wal opts.disableWAL = disable_wal
check_status( cdef Slice c_key = bytes_to_slice(key)
self.db.Merge(opts, bytes_to_slice(key), bytes_to_slice(value))) cdef Slice c_value = bytes_to_slice(value)
with nogil:
st = self.db.Merge(opts, c_key, c_value)
check_status(st)
def write(self, WriteBatch batch, sync=False, disable_wal=False): def write(self, WriteBatch batch, sync=False, disable_wal=False):
cdef Status st
cdef options.WriteOptions opts cdef options.WriteOptions opts
opts.sync = sync opts.sync = sync
opts.disableWAL = disable_wal opts.disableWAL = disable_wal
check_status( with nogil:
self.db.Write(opts, batch.batch)) st = self.db.Write(opts, batch.batch)
check_status(st)
def get(self, key, *args, **kwargs): def get(self, key, *args, **kwargs):
cdef string res cdef string res
cdef Status st cdef Status st
cdef options.ReadOptions opts
st = self.db.Get( opts = self.build_read_opts(self.__parse_read_opts(*args, **kwargs))
self.build_read_opts(self.__parse_read_opts(*args, **kwargs)), cdef Slice c_key = bytes_to_slice(key)
bytes_to_slice(key),
cython.address(res)) with nogil:
st = self.db.Get(opts, c_key, cython.address(res))
if st.ok(): if st.ok():
return string_to_bytes(res) return string_to_bytes(res)
@ -1037,14 +1057,19 @@ cdef class DB(object):
cdef vector[string] values cdef vector[string] values
values.resize(len(keys)) values.resize(len(keys))
cdef vector[slice_.Slice] c_keys cdef vector[Slice] c_keys
for key in keys: for key in keys:
c_keys.push_back(bytes_to_slice(key)) c_keys.push_back(bytes_to_slice(key))
cdef vector[Status] res = self.db.MultiGet( cdef options.ReadOptions opts
self.build_read_opts(self.__parse_read_opts(*args, **kwargs)), opts = self.build_read_opts(self.__parse_read_opts(*args, **kwargs))
c_keys,
cython.address(values)) cdef vector[Status] res
with nogil:
res = self.db.MultiGet(
opts,
c_keys,
cython.address(values))
cdef dict ret_dict = {} cdef dict ret_dict = {}
for index in range(len(keys)): for index in range(len(keys)):
@ -1062,15 +1087,20 @@ cdef class DB(object):
cdef cpp_bool value_found cdef cpp_bool value_found
cdef cpp_bool exists cdef cpp_bool exists
cdef options.ReadOptions opts cdef options.ReadOptions opts
cdef Slice c_key
opts = self.build_read_opts(self.__parse_read_opts(*args, **kwargs)) opts = self.build_read_opts(self.__parse_read_opts(*args, **kwargs))
c_key = bytes_to_slice(key)
exists = False
if fetch: if fetch:
value_found = False value_found = False
exists = self.db.KeyMayExist( with nogil:
opts, exists = self.db.KeyMayExist(
bytes_to_slice(key), opts,
cython.address(value), c_key,
cython.address(value_found)) cython.address(value),
cython.address(value_found))
if exists: if exists:
if value_found: if value_found:
@ -1080,10 +1110,11 @@ cdef class DB(object):
else: else:
return (False, None) return (False, None)
else: else:
exists = self.db.KeyMayExist( with nogil:
opts, exists = self.db.KeyMayExist(
bytes_to_slice(key), opts,
cython.address(value)) c_key,
cython.address(value))
return (exists, None) return (exists, None)
@ -1092,7 +1123,8 @@ cdef class DB(object):
cdef KeysIterator it cdef KeysIterator it
opts = self.build_read_opts(self.__parse_read_opts(*args, **kwargs)) opts = self.build_read_opts(self.__parse_read_opts(*args, **kwargs))
it = KeysIterator(self) it = KeysIterator(self)
it.ptr = self.db.NewIterator(opts) with nogil:
it.ptr = self.db.NewIterator(opts)
return it return it
def itervalues(self, prefix=None, *args, **kwargs): def itervalues(self, prefix=None, *args, **kwargs):
@ -1100,7 +1132,8 @@ cdef class DB(object):
cdef ValuesIterator it cdef ValuesIterator it
opts = self.build_read_opts(self.__parse_read_opts(*args, **kwargs)) opts = self.build_read_opts(self.__parse_read_opts(*args, **kwargs))
it = ValuesIterator(self) it = ValuesIterator(self)
it.ptr = self.db.NewIterator(opts) with nogil:
it.ptr = self.db.NewIterator(opts)
return it return it
def iteritems(self, prefix=None, *args, **kwargs): def iteritems(self, prefix=None, *args, **kwargs):
@ -1108,7 +1141,8 @@ cdef class DB(object):
cdef ItemsIterator it cdef ItemsIterator it
opts = self.build_read_opts(self.__parse_read_opts(*args, **kwargs)) opts = self.build_read_opts(self.__parse_read_opts(*args, **kwargs))
it = ItemsIterator(self) it = ItemsIterator(self)
it.ptr = self.db.NewIterator(opts) with nogil:
it.ptr = self.db.NewIterator(opts)
return it return it
def snapshot(self): def snapshot(self):
@ -1116,8 +1150,13 @@ cdef class DB(object):
def get_property(self, prop): def get_property(self, prop):
cdef string value cdef string value
cdef Slice c_prop = bytes_to_slice(prop)
cdef cpp_bool ret = False
if self.db.GetProperty(bytes_to_slice(prop), cython.address(value)): with nogil:
ret = self.db.GetProperty(c_prop, cython.address(value))
if ret:
return string_to_bytes(value) return string_to_bytes(value)
else: else:
return None return None
@ -1125,7 +1164,8 @@ cdef class DB(object):
def get_live_files_metadata(self): def get_live_files_metadata(self):
cdef vector[db.LiveFileMetaData] metadata cdef vector[db.LiveFileMetaData] metadata
self.db.GetLiveFilesMetaData(cython.address(metadata)) with nogil:
self.db.GetLiveFilesMetaData(cython.address(metadata))
ret = [] ret = []
for ob in metadata: for ob in metadata:
@ -1181,10 +1221,12 @@ cdef class Snapshot(object):
def __cinit__(self, DB db): def __cinit__(self, DB db):
self.db = db self.db = db
self.ptr = db.db.GetSnapshot() with nogil:
self.ptr = db.db.GetSnapshot()
def __dealloc__(self): def __dealloc__(self):
self.db.db.ReleaseSnapshot(self.ptr) with nogil:
self.db.db.ReleaseSnapshot(self.ptr)
@cython.internal @cython.internal
@ -1208,20 +1250,25 @@ cdef class BaseIterator(object):
raise StopIteration() raise StopIteration()
cdef object ret = self.get_ob() cdef object ret = self.get_ob()
self.ptr.Next() with nogil:
self.ptr.Next()
return ret return ret
def __reversed__(self): def __reversed__(self):
return ReversedIterator(self) return ReversedIterator(self)
cpdef seek_to_first(self): cpdef seek_to_first(self):
self.ptr.SeekToFirst() with nogil:
self.ptr.SeekToFirst()
cpdef seek_to_last(self): cpdef seek_to_last(self):
self.ptr.SeekToLast() with nogil:
self.ptr.SeekToLast()
cpdef seek(self, key): cpdef seek(self, key):
self.ptr.Seek(bytes_to_slice(key)) cdef Slice c_key = bytes_to_slice(key)
with nogil:
self.ptr.Seek(c_key)
cdef object get_ob(self): cdef object get_ob(self):
return None return None
@ -1229,17 +1276,28 @@ cdef class BaseIterator(object):
@cython.internal @cython.internal
cdef class KeysIterator(BaseIterator): cdef class KeysIterator(BaseIterator):
cdef object get_ob(self): cdef object get_ob(self):
return slice_to_bytes(self.ptr.key()) cdef Slice c_key
with nogil:
c_key = self.ptr.key()
return slice_to_bytes(c_key)
@cython.internal @cython.internal
cdef class ValuesIterator(BaseIterator): cdef class ValuesIterator(BaseIterator):
cdef object get_ob(self): cdef object get_ob(self):
return slice_to_bytes(self.ptr.value()) cdef Slice c_value
with nogil:
c_value = self.ptr.value()
return slice_to_bytes(c_value)
@cython.internal @cython.internal
cdef class ItemsIterator(BaseIterator): cdef class ItemsIterator(BaseIterator):
cdef object get_ob(self): cdef object get_ob(self):
return (slice_to_bytes(self.ptr.key()), slice_to_bytes(self.ptr.value())) cdef Slice c_key
cdef Slice c_value
with nogil:
c_key = self.ptr.key()
c_value = self.ptr.value()
return (slice_to_bytes(c_key), slice_to_bytes(c_value))
@cython.internal @cython.internal
cdef class ReversedIterator(object): cdef class ReversedIterator(object):