add default merge operator

This commit is contained in:
twmht 2017-04-16 14:13:01 +08:00
parent 66c053bb92
commit 9fcafffbf4
5 changed files with 211 additions and 4 deletions

View file

@ -318,8 +318,8 @@ cdef class PyMergeOperator(object):
cdef object ob cdef object ob
def __cinit__(self, object ob): def __cinit__(self, object ob):
if isinstance(ob, IAssociativeMergeOperator):
self.ob = ob self.ob = ob
if isinstance(ob, IAssociativeMergeOperator):
self.merge_op.reset( self.merge_op.reset(
<merge_operator.MergeOperator*> <merge_operator.MergeOperator*>
new merge_operator.AssociativeMergeOperatorWrapper( new merge_operator.AssociativeMergeOperatorWrapper(
@ -328,7 +328,6 @@ cdef class PyMergeOperator(object):
merge_callback)) merge_callback))
elif isinstance(ob, IMergeOperator): elif isinstance(ob, IMergeOperator):
self.ob = ob
self.merge_op.reset( self.merge_op.reset(
<merge_operator.MergeOperator*> <merge_operator.MergeOperator*>
new merge_operator.MergeOperatorWrapper( new merge_operator.MergeOperatorWrapper(
@ -337,11 +336,29 @@ cdef class PyMergeOperator(object):
<void*>ob, <void*>ob,
full_merge_callback, full_merge_callback,
partial_merge_callback)) partial_merge_callback))
elif isinstance(ob, str):
if ob == "put":
self.merge_op = merge_operator.MergeOperators.CreatePutOperator()
elif ob == "put_v1":
self.merge_op = merge_operator.MergeOperators.CreateDeprecatedPutOperator()
elif ob == "uint64add":
self.merge_op = merge_operator.MergeOperators.CreateUInt64AddOperator()
elif ob == "stringappend":
self.merge_op = merge_operator.MergeOperators.CreateStringAppendOperator()
#TODO: necessary?
# elif ob == "stringappendtest":
# self.merge_op = merge_operator.MergeOperators.CreateStringAppendTESTOperator()
elif ob == "max":
self.merge_op = merge_operator.MergeOperators.CreateMaxOperator()
else:
msg = "{0} is not the default type".format(ob)
raise TypeError(msg)
else: else:
msg = "%s is not of this types %s" msg = "%s is not of this types %s"
msg %= (ob, (IAssociativeMergeOperator, IMergeOperator)) msg %= (ob, (IAssociativeMergeOperator, IMergeOperator))
raise TypeError(msg) raise TypeError(msg)
cdef object get_ob(self): cdef object get_ob(self):
return self.ob return self.ob
@ -695,6 +712,7 @@ cdef class HashLinkListMemtableFactory(PyMemtableFactory):
self.factory.reset(memtablerep.NewHashLinkListRepFactory(bucket_count)) self.factory.reset(memtablerep.NewHashLinkListRepFactory(bucket_count))
################################## ##################################
cdef class CompressionType(object): cdef class CompressionType(object):
no_compression = u'no_compression' no_compression = u'no_compression'
snappy_compression = u'snappy_compression' snappy_compression = u'snappy_compression'
@ -787,6 +805,32 @@ cdef class Options(object):
def __set__(self, value): def __set__(self, value):
self.opts.max_open_files = value self.opts.max_open_files = value
property compression_opts:
def __get__(self):
cdef dict ret_ob = {}
ret_ob['window_bits'] = self.opts.compression_opts.window_bits
ret_ob['level'] = self.opts.compression_opts.level
ret_ob['strategy'] = self.opts.compression_opts.strategy
ret_ob['max_dict_bytes'] = self.opts.compression_opts.max_dict_bytes
return ret_ob
def __set__(self, dict value):
cdef options.CompressionOptions* copts
copts = cython.address(self.opts.compression_opts)
# CompressionOptions(int wbits, int _lev, int _strategy, int _max_dict_bytes)
if 'window_bits' in value:
copts.window_bits = value['window_bits']
if 'level' in value:
copts.level = value['level']
if 'strategy' in value:
copts.strategy = value['strategy']
if 'max_dict_bytes' in value:
copts.max_dict_bytes = value['max_dict_bytes']
property compression: property compression:
def __get__(self): def __get__(self):
if self.opts.compression == options.kNoCompression: if self.opts.compression == options.kNoCompression:

View file

@ -3,11 +3,32 @@ from libcpp cimport bool as cpp_bool
from libcpp.deque cimport deque from libcpp.deque cimport deque
from slice_ cimport Slice from slice_ cimport Slice
from logger cimport Logger from logger cimport Logger
from std_memory cimport shared_ptr
cdef extern from "rocksdb/merge_operator.h" namespace "rocksdb": cdef extern from "rocksdb/merge_operator.h" namespace "rocksdb":
cdef cppclass MergeOperator: cdef cppclass MergeOperator:
pass pass
# cdef extern from "utilities/merge_operators.h" namespace "rocksdb::MergeOperators":
cdef extern from "utilities/merge_operators.h" namespace "rocksdb":
cdef cppclass MergeOperators:
@staticmethod
shared_ptr[MergeOperator] CreatePutOperator()
@staticmethod
shared_ptr[MergeOperator] CreateDeprecatedPutOperator()
@staticmethod
shared_ptr[MergeOperator] CreateUInt64AddOperator()
@staticmethod
shared_ptr[MergeOperator] CreateStringAppendOperator()
@staticmethod
shared_ptr[MergeOperator] CreateStringAppendTESTOperator()
@staticmethod
shared_ptr[MergeOperator] CreateMaxOperator()
@staticmethod
shared_ptr[MergeOperator] CreateFromStringId(const string &)
ctypedef cpp_bool (*merge_func)( ctypedef cpp_bool (*merge_func)(
void*, void*,
const Slice&, const Slice&,

View file

@ -16,6 +16,14 @@ from universal_compaction cimport CompactionOptionsUniversal
from cache cimport Cache from cache cimport Cache
cdef extern from "rocksdb/options.h" namespace "rocksdb": cdef extern from "rocksdb/options.h" namespace "rocksdb":
cdef cppclass CompressionOptions:
int window_bits;
int level;
int strategy;
uint32_t max_dict_bytes
CompressionOptions() except +
CompressionOptions(int, int, int, int) except +
ctypedef enum CompactionStyle: ctypedef enum CompactionStyle:
kCompactionStyleLevel kCompactionStyleLevel
kCompactionStyleUniversal kCompactionStyleUniversal
@ -61,7 +69,6 @@ cdef extern from "rocksdb/options.h" namespace "rocksdb":
CompressionType compression CompressionType compression
CompactionPri compaction_pri CompactionPri compaction_pri
# TODO: compression_per_level # TODO: compression_per_level
# TODO: compression_opts
shared_ptr[SliceTransform] prefix_extractor shared_ptr[SliceTransform] prefix_extractor
int num_levels int num_levels
int level0_file_num_compaction_trigger int level0_file_num_compaction_trigger
@ -121,7 +128,8 @@ cdef extern from "rocksdb/options.h" namespace "rocksdb":
size_t inplace_update_num_locks size_t inplace_update_num_locks
shared_ptr[Cache] row_cache shared_ptr[Cache] row_cache
# TODO: remove options source_compaction_factor, max_grandparent_overlap_bytes and expanded_compaction_factor from document # TODO: remove options source_compaction_factor, max_grandparent_overlap_bytes and expanded_compaction_factor from document
uint64_t max_compaction_bytes; uint64_t max_compaction_bytes
CompressionOptions compression_opts
cdef cppclass WriteOptions: cdef cppclass WriteOptions:
cpp_bool sync cpp_bool sync

View file

@ -4,6 +4,7 @@ import gc
import unittest import unittest
import rocksdb import rocksdb
from itertools import takewhile from itertools import takewhile
import struct
def int_to_bytes(ob): def int_to_bytes(ob):
return str(ob).encode('ascii') return str(ob).encode('ascii')
@ -230,6 +231,106 @@ class AssocCounter(rocksdb.interfaces.AssociativeMergeOperator):
def name(self): def name(self):
return b'AssocCounter' return b'AssocCounter'
class TestUint64Merge(unittest.TestCase, TestHelper):
def setUp(self):
opts = rocksdb.Options()
opts.create_if_missing = True
opts.merge_operator = "uint64add"
self._clean()
self.db = rocksdb.DB('/tmp/test', opts)
def tearDown(self):
self._close_db()
def test_merge(self):
self.db.put(b'a', struct.pack('Q', 5566))
for x in range(1000):
self.db.merge(b"a", struct.pack('Q', x))
print ('value', struct.unpack('Q', self.db.get(b'a'))[0])
self.assertEqual(5566 + sum(range(1000)), struct.unpack('Q', self.db.get(b'a'))[0])
class TestUint64Merge(unittest.TestCase, TestHelper):
def setUp(self):
opts = rocksdb.Options()
opts.create_if_missing = True
opts.merge_operator = "uint64add"
self._clean()
self.db = rocksdb.DB('/tmp/test', opts)
def tearDown(self):
self._close_db()
def test_merge(self):
self.db.put(b'a', struct.pack('Q', 5566))
for x in range(1000):
self.db.merge(b"a", struct.pack('Q', x))
# print ('value', struct.unpack('Q', self.db.get(b'a'))[0])
self.assertEqual(5566 + sum(range(1000)), struct.unpack('Q', self.db.get(b'a'))[0])
class TestPutMerge(unittest.TestCase, TestHelper):
def setUp(self):
opts = rocksdb.Options()
opts.create_if_missing = True
opts.merge_operator = "put"
self._clean()
self.db = rocksdb.DB('/tmp/test', opts)
def tearDown(self):
self._close_db()
def test_merge(self):
self.db.put(b'a', b'ccc')
self.db.merge(b'a', b'ddd')
self.assertEqual(self.db.get(b'a'), 'ddd')
class TestPutV1Merge(unittest.TestCase, TestHelper):
def setUp(self):
opts = rocksdb.Options()
opts.create_if_missing = True
opts.merge_operator = "put_v1"
self._clean()
self.db = rocksdb.DB('/tmp/test', opts)
def tearDown(self):
self._close_db()
def test_merge(self):
self.db.put(b'a', b'ccc')
self.db.merge(b'a', b'ddd')
self.assertEqual(self.db.get(b'a'), 'ddd')
class TestStringAppendOperatorMerge(unittest.TestCase, TestHelper):
def setUp(self):
opts = rocksdb.Options()
opts.create_if_missing = True
opts.merge_operator = "stringappend"
self._clean()
self.db = rocksdb.DB('/tmp/test', opts)
def tearDown(self):
self._close_db()
def test_merge(self):
self.db.put(b'a', b'ccc')
self.db.merge(b'a', b'ddd')
self.assertEqual(self.db.get(b'a'), 'ccc,ddd')
class TestStringMaxOperatorMerge(unittest.TestCase, TestHelper):
def setUp(self):
opts = rocksdb.Options()
opts.create_if_missing = True
opts.merge_operator = "max"
self._clean()
self.db = rocksdb.DB('/tmp/test', opts)
def tearDown(self):
self._close_db()
def test_merge(self):
self.db.put(b'a', int_to_bytes(55))
self.db.merge(b'a', int_to_bytes(56))
self.assertEqual(int(self.db.get(b'a')), 56)
class TestAssocMerge(unittest.TestCase, TestHelper): class TestAssocMerge(unittest.TestCase, TestHelper):
def setUp(self): def setUp(self):

View file

@ -22,6 +22,39 @@ class TestMergeOperator(rocksdb.interfaces.MergeOperator):
return b'testmergeop' return b'testmergeop'
class TestOptions(unittest.TestCase): class TestOptions(unittest.TestCase):
def test_default_merge_operator(self):
opts = rocksdb.Options()
self.assertEqual(True, opts.paranoid_checks)
opts.paranoid_checks = False
self.assertEqual(False, opts.paranoid_checks)
self.assertIsNone(opts.merge_operator)
opts.merge_operator = "uint64add"
self.assertIsNotNone(opts.merge_operator)
self.assertEqual(opts.merge_operator, "uint64add")
with self.assertRaises(TypeError):
opts.merge_operator = "not an operator"
def test_compression_opts(self):
opts = rocksdb.Options()
compression_opts = opts.compression_opts
# default value
self.assertEqual(isinstance(compression_opts, dict), True)
self.assertEqual(compression_opts['window_bits'], -14)
self.assertEqual(compression_opts['level'], -1)
self.assertEqual(compression_opts['strategy'], 0)
self.assertEqual(compression_opts['max_dict_bytes'], 0)
with self.assertRaises(TypeError):
opts.compression_opts = list(1,2)
opts.compression_opts = {'window_bits': 1, 'level': 2, 'strategy': 3, 'max_dict_bytes': 4}
compression_opts = opts.compression_opts
self.assertEqual(compression_opts['window_bits'], 1)
self.assertEqual(compression_opts['level'], 2)
self.assertEqual(compression_opts['strategy'], 3)
self.assertEqual(compression_opts['max_dict_bytes'], 4)
def test_simple(self): def test_simple(self):
opts = rocksdb.Options() opts = rocksdb.Options()
self.assertEqual(True, opts.paranoid_checks) self.assertEqual(True, opts.paranoid_checks)