Column Family Support

Add support for Column Families in a runtime safe way.
Add unittests to test functionality
Insure all unittests are passing.
Cleaned up unittests to not use a fixed directory in tmp, but use tempfile
This commit is contained in:
Jason Fried 2018-11-02 18:27:14 +00:00
parent 98910c2dce
commit 2a66e20ca3
9 changed files with 1274 additions and 533 deletions

File diff suppressed because it is too large Load diff

View file

@ -13,14 +13,17 @@ class RecordItemsHandler: public rocksdb::WriteBatch::Handler {
public:
BatchItem(
const Optype& op,
uint32_t column_family_id,
const rocksdb::Slice& key,
const rocksdb::Slice& value):
op(op),
column_family_id(column_family_id),
key(key),
value(value)
{}
const Optype op;
uint32_t column_family_id;
const rocksdb::Slice key;
const rocksdb::Slice value;
};
@ -31,17 +34,23 @@ class RecordItemsHandler: public rocksdb::WriteBatch::Handler {
/* Items is filled during iteration. */
RecordItemsHandler(BatchItems* items): items(items) {}
void Put(const Slice& key, const Slice& value) {
this->items->emplace_back(PutRecord, key, value);
virtual rocksdb::Status PutCF(
uint32_t column_family_id, const Slice& key, const Slice& value) {
this->items->emplace_back(PutRecord, column_family_id, key, value);
return rocksdb::Status::OK();
}
void Merge(const Slice& key, const Slice& value) {
this->items->emplace_back(MergeRecord, key, value);
virtual rocksdb::Status MergeCF(
uint32_t column_family_id, const Slice& key, const Slice& value) {
this->items->emplace_back(MergeRecord, column_family_id, key, value);
return rocksdb::Status::OK();
}
virtual void Delete(const Slice& key) {
this->items->emplace_back(DeleteRecord, key, rocksdb::Slice());
}
virtual rocksdb::Status DeleteCF(
uint32_t column_family_id, const Slice& key) {
this->items->emplace_back(DeleteRecord, column_family_id, key, rocksdb::Slice());
return rocksdb::Status::OK();
}
private:
BatchItems* items;

View file

@ -1,5 +1,5 @@
cimport options
from libc.stdint cimport uint64_t
from libc.stdint cimport uint64_t, uint32_t
from status cimport Status
from libcpp cimport bool as cpp_bool
from libcpp.string cimport string
@ -13,8 +13,11 @@ cdef extern from "rocksdb/write_batch.h" namespace "rocksdb":
WriteBatch() nogil except+
WriteBatch(string) nogil except+
void Put(const Slice&, const Slice&) nogil except+
void Put(ColumnFamilyHandle*, const Slice&, const Slice&) nogil except+
void Merge(const Slice&, const Slice&) nogil except+
void Merge(ColumnFamilyHandle*, const Slice&, const Slice&) nogil except+
void Delete(const Slice&) nogil except+
void Delete(ColumnFamilyHandle*, const Slice&) nogil except+
void PutLogData(const Slice&) nogil except+
void Clear() nogil except+
const string& Data() nogil except+
@ -28,6 +31,7 @@ cdef extern from "cpp/write_batch_iter_helper.hpp" namespace "py_rocks":
cdef cppclass BatchItem "py_rocks::RecordItemsHandler::BatchItem":
BatchItemOp op
uint32_t column_family_id
Slice key
Slice value
@ -36,6 +40,7 @@ cdef extern from "cpp/write_batch_iter_helper.hpp" namespace "py_rocks":
cdef extern from "rocksdb/db.h" namespace "rocksdb":
ctypedef uint64_t SequenceNumber
string kDefaultColumnFamilyName
cdef struct LiveFileMetaData:
string name
@ -52,15 +57,18 @@ cdef extern from "rocksdb/db.h" namespace "rocksdb":
cdef cppclass DB:
Status Put(
const options.WriteOptions&,
ColumnFamilyHandle*,
const Slice&,
const Slice&) nogil except+
Status Delete(
const options.WriteOptions&,
ColumnFamilyHandle*,
const Slice&) nogil except+
Status Merge(
const options.WriteOptions&,
ColumnFamilyHandle*,
const Slice&,
const Slice&) nogil except+
@ -70,52 +78,73 @@ cdef extern from "rocksdb/db.h" namespace "rocksdb":
Status Get(
const options.ReadOptions&,
ColumnFamilyHandle*,
const Slice&,
string*) nogil except+
vector[Status] MultiGet(
const options.ReadOptions&,
const vector[ColumnFamilyHandle*]&,
const vector[Slice]&,
vector[string]*) nogil except+
cpp_bool KeyMayExist(
const options.ReadOptions&,
ColumnFamilyHandle*,
Slice&,
string*,
cpp_bool*) nogil except+
cpp_bool KeyMayExist(
const options.ReadOptions&,
ColumnFamilyHandle*,
Slice&,
string*) nogil except+
Iterator* NewIterator(
const options.ReadOptions&) nogil except+
const options.ReadOptions&,
ColumnFamilyHandle*) nogil except+
void NewIterators(
const options.ReadOptions&,
vector[ColumnFamilyHandle*]&,
vector[Iterator*]*) nogil except+
const Snapshot* GetSnapshot() nogil except+
void ReleaseSnapshot(const Snapshot*) nogil except+
cpp_bool GetProperty(
ColumnFamilyHandle*,
const Slice&,
string*) nogil except+
void GetApproximateSizes(
ColumnFamilyHandle*,
const Range*
int,
uint64_t*) nogil except+
Status CompactRange(
const options.CompactRangeOptions&,
ColumnFamilyHandle*,
const Slice*,
const Slice*) nogil except+
int NumberLevels() nogil except+
int MaxMemCompactionLevel() nogil except+
int Level0StopWriteTrigger() nogil except+
Status CreateColumnFamily(
const options.ColumnFamilyOptions&,
const string&,
ColumnFamilyHandle**) nogil except+
Status DropColumnFamily(
ColumnFamilyHandle*) nogil except+
int NumberLevels(ColumnFamilyHandle*) nogil except+
int MaxMemCompactionLevel(ColumnFamilyHandle*) nogil except+
int Level0StopWriteTrigger(ColumnFamilyHandle*) nogil except+
const string& GetName() nogil except+
const options.Options& GetOptions() nogil except+
Status Flush(const options.FlushOptions&) nogil except+
const options.Options& GetOptions(ColumnFamilyHandle*) nogil except+
Status Flush(const options.FlushOptions&, ColumnFamilyHandle*) nogil except+
Status DisableFileDeletions() nogil except+
Status EnableFileDeletions() nogil except+
@ -127,6 +156,7 @@ cdef extern from "rocksdb/db.h" namespace "rocksdb":
Status DeleteFile(string) nogil except+
void GetLiveFilesMetaData(vector[LiveFileMetaData]*) nogil except+
ColumnFamilyHandle* DefaultColumnFamily()
cdef Status DB_Open "rocksdb::DB::Open"(
@ -134,10 +164,42 @@ cdef extern from "rocksdb/db.h" namespace "rocksdb":
const string&,
DB**) nogil except+
cdef Status DB_Open_ColumnFamilies "rocksdb::DB::Open"(
const options.Options&,
const string&,
const vector[ColumnFamilyDescriptor]&,
vector[ColumnFamilyHandle*]*,
DB**) nogil except+
cdef Status DB_OpenForReadOnly "rocksdb::DB::OpenForReadOnly"(
const options.Options&,
const string&,
DB**,
cpp_bool) nogil except+
cdef Status DB_OpenForReadOnly_ColumnFamilies "rocksdb::DB::OpenForReadOnly"(
const options.Options&,
const string&,
const vector[ColumnFamilyDescriptor]&,
vector[ColumnFamilyHandle*]*,
DB**,
cpp_bool) nogil except+
cdef Status RepairDB(const string& dbname, const options.Options&)
cdef Status ListColumnFamilies "rocksdb::DB::ListColumnFamilies" (
const options.Options&,
const string&,
vector[string]*) nogil except+
cdef cppclass ColumnFamilyHandle:
const string& GetName() nogil except+
int GetID() nogil except+
cdef cppclass ColumnFamilyDescriptor:
ColumnFamilyDescriptor() nogil except+
ColumnFamilyDescriptor(
const string&,
const options.ColumnFamilyOptions&) nogil except+
string name
options.ColumnFamilyOptions options

View file

@ -14,7 +14,7 @@ class UintAddOperator(AssociativeMergeOperator):
class StringAppendOperator(AssociativeMergeOperator):
def merge(self, key, existing_value, value):
if existing_value:
s = existing_value + ',' + value
s = existing_value + b',' + value
return (True, s)
return (True, value)

View file

@ -52,20 +52,67 @@ cdef extern from "rocksdb/options.h" namespace "rocksdb":
kOldestSmallestSeqFirst
kMinOverlappingRatio
cdef cppclass Options:
const Comparator* comparator
shared_ptr[MergeOperator] merge_operator
# TODO: compaction_filter
# TODO: compaction_filter_factory
# This needs to be in _rocksdb.pxd so it will export into python
#cpdef enum AccessHint "rocksdb::DBOptions::AccessHint":
# NONE,
# NORMAL,
# SEQUENTIAL,
# WILLNEED
cdef cppclass DBOptions:
cpp_bool create_if_missing
cpp_bool create_missing_column_families
cpp_bool error_if_exists
cpp_bool paranoid_checks
# TODO: env
shared_ptr[Logger] info_log
int max_open_files
int max_file_opening_threads
# TODO: statistics
cpp_bool use_fsync
string db_log_dir
string wal_dir
uint64_t delete_obsolete_files_period_micros
int max_background_jobs
int max_background_compactions
uint32_t max_subcompactions
int max_background_flushes
size_t max_log_file_size
size_t log_file_time_to_roll
size_t keep_log_file_num
size_t recycle_log_file_num
uint64_t max_manifest_file_size
int table_cache_numshardbits
uint64_t WAL_ttl_seconds
uint64_t WAL_size_limit_MB
size_t manifest_preallocation_size
cpp_bool allow_mmap_reads
cpp_bool allow_mmap_writes
cpp_bool use_direct_reads
cpp_bool use_direct_io_for_flush_and_compaction
cpp_bool allow_fallocate
cpp_bool is_fd_close_on_exec
cpp_bool skip_log_error_on_recovery
unsigned int stats_dump_period_sec
cpp_bool advise_random_on_open
size_t db_write_buffer_size
# AccessHint access_hint_on_compaction_start
cpp_bool use_adaptive_mutex
uint64_t bytes_per_sync
cpp_bool allow_concurrent_memtable_write
cpp_bool enable_write_thread_adaptive_yield
shared_ptr[Cache] row_cache
cdef cppclass ColumnFamilyOptions:
ColumnFamilyOptions()
ColumnFamilyOptions(const Options& options)
const Comparator* comparator
shared_ptr[MergeOperator] merge_operator
# TODO: compaction_filter
# TODO: compaction_filter_factory
size_t write_buffer_size
int max_write_buffer_number
int min_write_buffer_number_to_merge
int max_open_files
CompressionType compression
CompactionPri compaction_pri
# TODO: compression_per_level
@ -83,39 +130,15 @@ cdef extern from "rocksdb/options.h" namespace "rocksdb":
int expanded_compaction_factor
int source_compaction_factor
int max_grandparent_overlap_factor
# TODO: statistics
cpp_bool disableDataSync
cpp_bool use_fsync
string db_log_dir
string wal_dir
uint64_t delete_obsolete_files_period_micros
int max_background_compactions
int max_background_flushes
size_t max_log_file_size
size_t log_file_time_to_roll
size_t keep_log_file_num
double soft_rate_limit
double hard_rate_limit
unsigned int rate_limit_delay_max_milliseconds
uint64_t max_manifest_file_size
int table_cache_numshardbits
size_t arena_block_size
# TODO: PrepareForBulkLoad()
cpp_bool disable_auto_compactions
uint64_t WAL_ttl_seconds
uint64_t WAL_size_limit_MB
size_t manifest_preallocation_size
cpp_bool purge_redundant_kvs_while_flush
cpp_bool allow_os_buffer
cpp_bool allow_mmap_reads
cpp_bool allow_mmap_writes
cpp_bool is_fd_close_on_exec
cpp_bool skip_log_error_on_recovery
unsigned int stats_dump_period_sec
cpp_bool advise_random_on_open
# TODO: enum { NONE, NORMAL, SEQUENTIAL, WILLNEED } access_hint_on_compaction_start
cpp_bool use_adaptive_mutex
uint64_t bytes_per_sync
cpp_bool verify_checksums_in_compaction
CompactionStyle compaction_style
CompactionOptionsUniversal compaction_options_universal
@ -126,12 +149,12 @@ cdef extern from "rocksdb/options.h" namespace "rocksdb":
# TODO: table_properties_collectors
cpp_bool inplace_update_support
size_t inplace_update_num_locks
shared_ptr[Cache] row_cache
# TODO: remove options source_compaction_factor, max_grandparent_overlap_bytes and expanded_compaction_factor from document
uint64_t max_compaction_bytes
CompressionOptions compression_opts
cpp_bool allow_concurrent_memtable_write
cpp_bool enable_write_thread_adaptive_yield
cdef cppclass Options(DBOptions, ColumnFamilyOptions):
pass
cdef cppclass WriteOptions:
cpp_bool sync

View file

@ -6,37 +6,42 @@ import unittest
import rocksdb
from itertools import takewhile
import struct
import tempfile
from rocksdb.merge_operators import UintAddOperator, StringAppendOperator
def int_to_bytes(ob):
return str(ob).encode('ascii')
class TestHelper(object):
def _clean(self):
if os.path.exists('/tmp/test'):
shutil.rmtree("/tmp/test")
class TestHelper(unittest.TestCase):
def setUp(self):
self.db_loc = tempfile.mkdtemp()
self.addCleanup(self._close_db)
def _close_db(self):
del self.db
gc.collect()
if os.path.exists(self.db_loc):
shutil.rmtree(self.db_loc)
class TestDB(unittest.TestCase, TestHelper):
class TestDB(TestHelper):
def setUp(self):
TestHelper.setUp(self)
opts = rocksdb.Options(create_if_missing=True)
self._clean()
self.db = rocksdb.DB("/tmp/test", opts)
def tearDown(self):
self._close_db()
self.db = rocksdb.DB(os.path.join(self.db_loc, "test"), opts)
def test_options_used_twice(self):
if sys.version_info[0] == 3:
assertRaisesRegex = self.assertRaisesRegex
else:
assertRaisesRegex = self.assertRaisesRegexp
expected = "Options object is already used by another DB"
with self.assertRaisesRegexp(Exception, expected):
rocksdb.DB("/tmp/test2", self.db.options)
with assertRaisesRegex(Exception, expected):
rocksdb.DB(os.path.join(self.db_loc, "test2"), self.db.options)
def test_unicode_path(self):
name = b'/tmp/M\xc3\xbcnchen'.decode('utf8')
name = os.path.join(self.db_loc, b'M\xc3\xbcnchen'.decode('utf8'))
rocksdb.DB(name, rocksdb.Options(create_if_missing=True))
self.addCleanup(shutil.rmtree, name)
self.assertTrue(os.path.isdir(name))
@ -280,16 +285,13 @@ class AssocCounter(rocksdb.interfaces.AssociativeMergeOperator):
return b'AssocCounter'
class TestUint64Merge(unittest.TestCase, TestHelper):
class TestUint64Merge(TestHelper):
def setUp(self):
TestHelper.setUp(self)
opts = rocksdb.Options()
opts.create_if_missing = True
opts.merge_operator = UintAddOperator()
self._clean()
self.db = rocksdb.DB('/tmp/test', opts)
def tearDown(self):
self._close_db()
self.db = rocksdb.DB(os.path.join(self.db_loc, 'test'), opts)
def test_merge(self):
self.db.put(b'a', struct.pack('Q', 5566))
@ -298,67 +300,55 @@ class TestUint64Merge(unittest.TestCase, TestHelper):
self.assertEqual(5566 + sum(range(1000)), struct.unpack('Q', self.db.get(b'a'))[0])
# class TestPutMerge(unittest.TestCase, TestHelper):
# class TestPutMerge(TestHelper):
# def setUp(self):
# TestHelper.setUp(self)
# opts = rocksdb.Options()
# opts.create_if_missing = True
# opts.merge_operator = "put"
# self._clean()
# self.db = rocksdb.DB('/tmp/test', opts)
# def tearDown(self):
# self._close_db()
# self.db = rocksdb.DB(os.path.join(self.db_loc, 'test'), opts)
# def test_merge(self):
# self.db.put(b'a', b'ccc')
# self.db.merge(b'a', b'ddd')
# self.assertEqual(self.db.get(b'a'), 'ddd')
# class TestPutV1Merge(unittest.TestCase, TestHelper):
# class TestPutV1Merge(TestHelper):
# def setUp(self):
# TestHelper.setUp(self)
# opts = rocksdb.Options()
# opts.create_if_missing = True
# opts.merge_operator = "put_v1"
# self._clean()
# self.db = rocksdb.DB('/tmp/test', opts)
# def tearDown(self):
# self._close_db()
# self.db = rocksdb.DB(os.path.join(self.db_loc, 'test'), opts)
# def test_merge(self):
# self.db.put(b'a', b'ccc')
# self.db.merge(b'a', b'ddd')
# self.assertEqual(self.db.get(b'a'), 'ddd')
class TestStringAppendOperatorMerge(unittest.TestCase, TestHelper):
class TestStringAppendOperatorMerge(TestHelper):
def setUp(self):
TestHelper.setUp(self)
opts = rocksdb.Options()
opts.create_if_missing = True
opts.merge_operator = StringAppendOperator()
self._clean()
self.db = rocksdb.DB('/tmp/test', opts)
def tearDown(self):
self._close_db()
self.db = rocksdb.DB(os.path.join(self.db_loc, 'test'), opts)
# NOTE(sileht): Raise "Corruption: Error: Could not perform merge." on PY3
@unittest.skipIf(sys.version_info[0] == 3,
"Unexpected behavior on PY3")
#@unittest.skipIf(sys.version_info[0] == 3,
# "Unexpected behavior on PY3")
def test_merge(self):
self.db.put(b'a', b'ccc')
self.db.merge(b'a', b'ddd')
self.assertEqual(self.db.get(b'a'), b'ccc,ddd')
# class TestStringMaxOperatorMerge(unittest.TestCase, TestHelper):
# class TestStringMaxOperatorMerge(TestHelper):
# def setUp(self):
# TestHelper.setUp(self)
# opts = rocksdb.Options()
# opts.create_if_missing = True
# opts.merge_operator = "max"
# self._clean()
# self.db = rocksdb.DB('/tmp/test', opts)
# def tearDown(self):
# self._close_db()
# self.db = rocksdb.DB(os.path.join(self.db_loc, 'test'), opts)
# def test_merge(self):
# self.db.put(b'a', int_to_bytes(55))
@ -366,16 +356,13 @@ class TestStringAppendOperatorMerge(unittest.TestCase, TestHelper):
# self.assertEqual(int(self.db.get(b'a')), 56)
class TestAssocMerge(unittest.TestCase, TestHelper):
class TestAssocMerge(TestHelper):
def setUp(self):
TestHelper.setUp(self)
opts = rocksdb.Options()
opts.create_if_missing = True
opts.merge_operator = AssocCounter()
self._clean()
self.db = rocksdb.DB('/tmp/test', opts)
def tearDown(self):
self._close_db()
self.db = rocksdb.DB(os.path.join(self.db_loc, 'test'), opts)
def test_merge(self):
for x in range(1000):
@ -398,16 +385,13 @@ class FullCounter(rocksdb.interfaces.MergeOperator):
return (True, int_to_bytes(int(left) + int(right)))
class TestFullMerge(unittest.TestCase, TestHelper):
class TestFullMerge(TestHelper):
def setUp(self):
TestHelper.setUp(self)
opts = rocksdb.Options()
opts.create_if_missing = True
opts.merge_operator = FullCounter()
self._clean()
self.db = rocksdb.DB('/tmp/test', opts)
def tearDown(self):
self._close_db()
self.db = rocksdb.DB(os.path.join(self.db_loc, 'test'), opts)
def test_merge(self):
for x in range(1000):
@ -430,16 +414,13 @@ class SimpleComparator(rocksdb.interfaces.Comparator):
return 1
class TestComparator(unittest.TestCase, TestHelper):
class TestComparator(TestHelper):
def setUp(self):
TestHelper.setUp(self)
opts = rocksdb.Options()
opts.create_if_missing = True
opts.comparator = SimpleComparator()
self._clean()
self.db = rocksdb.DB('/tmp/test', opts)
def tearDown(self):
self._close_db()
self.db = rocksdb.DB(os.path.join(self.db_loc, 'test'), opts)
def test_compare(self):
for x in range(1000):
@ -460,15 +441,12 @@ class StaticPrefix(rocksdb.interfaces.SliceTransform):
def in_range(self, dst):
return len(dst) == 5
class TestPrefixExtractor(unittest.TestCase, TestHelper):
class TestPrefixExtractor(TestHelper):
def setUp(self):
TestHelper.setUp(self)
opts = rocksdb.Options(create_if_missing=True)
opts.prefix_extractor = StaticPrefix()
self._clean()
self.db = rocksdb.DB('/tmp/test', opts)
def tearDown(self):
self._close_db()
self.db = rocksdb.DB(os.path.join(self.db_loc, 'test'), opts)
def _fill_db(self):
for x in range(3000):
@ -502,3 +480,211 @@ class TestPrefixExtractor(unittest.TestCase, TestHelper):
ref = {b'00002.z': b'z', b'00002.y': b'y', b'00002.x': b'x'}
ret = takewhile(lambda item: item[0].startswith(b'00002'), it)
self.assertEqual(ref, dict(ret))
class TestDBColumnFamilies(TestHelper):
def setUp(self):
TestHelper.setUp(self)
opts = rocksdb.Options(create_if_missing=True)
self.db = rocksdb.DB(
os.path.join(self.db_loc, 'test'),
opts,
)
self.cf_a = self.db.create_column_family(b'A', rocksdb.ColumnFamilyOptions())
self.cf_b = self.db.create_column_family(b'B', rocksdb.ColumnFamilyOptions())
def test_column_families(self):
families = self.db.column_families
names = [handle.name for handle in families]
self.assertEqual([b'default', b'A', b'B'], names)
for name in names:
self.assertIn(self.db.get_column_family(name), families)
self.assertEqual(
names,
rocksdb.list_column_families(
os.path.join(self.db_loc, 'test'),
rocksdb.Options(),
)
)
def test_get_none(self):
self.assertIsNone(self.db.get(b'k'))
self.assertIsNone(self.db.get((self.cf_a, b'k')))
self.assertIsNone(self.db.get((self.cf_b, b'k')))
def test_put_get(self):
key = (self.cf_a, b'k')
self.db.put(key, b"v")
self.assertEqual(b"v", self.db.get(key))
self.assertIsNone(self.db.get(b"k"))
self.assertIsNone(self.db.get((self.cf_b, b"k")))
def test_multi_get(self):
data = [
(b'a', b'1default'),
(b'b', b'2default'),
(b'c', b'3default'),
((self.cf_a, b'a'), b'1a'),
((self.cf_a, b'b'), b'2a'),
((self.cf_a, b'c'), b'3a'),
((self.cf_b, b'a'), b'1b'),
((self.cf_b, b'b'), b'2b'),
((self.cf_b, b'c'), b'3b'),
]
for value in data:
self.db.put(*value)
multi_get_lookup = [value[0] for value in data]
ret = self.db.multi_get(multi_get_lookup)
ref = {value[0]: value[1] for value in data}
self.assertEqual(ref, ret)
def test_delete(self):
self.db.put((self.cf_a, b"a"), b"b")
self.assertEqual(b"b", self.db.get((self.cf_a, b"a")))
self.db.delete((self.cf_a, b"a"))
self.assertIsNone(self.db.get((self.cf_a, b"a")))
def test_write_batch(self):
cfa = self.db.get_column_family(b"A")
batch = rocksdb.WriteBatch()
batch.put((cfa, b"key"), b"v1")
batch.delete((self.cf_a, b"key"))
batch.put((cfa, b"key"), b"v2")
batch.put((cfa, b"key"), b"v3")
batch.put((cfa, b"a"), b"1")
batch.put((cfa, b"b"), b"2")
self.db.write(batch)
query = [(cfa, b"key"), (cfa, b"a"), (cfa, b"b")]
ret = self.db.multi_get(query)
self.assertEqual(b"v3", ret[query[0]])
self.assertEqual(b"1", ret[query[1]])
self.assertEqual(b"2", ret[query[2]])
def test_key_may_exists(self):
self.db.put((self.cf_a, b"a"), b'1')
self.assertEqual(
(False, None),
self.db.key_may_exist((self.cf_a, b"x"))
)
self.assertEqual(
(False, None),
self.db.key_may_exist((self.cf_a, b'x'), fetch=True)
)
self.assertEqual(
(True, None),
self.db.key_may_exist((self.cf_a, b'a'))
)
self.assertEqual(
(True, b'1'),
self.db.key_may_exist((self.cf_a, b'a'), fetch=True)
)
def test_iter_keys(self):
for x in range(300):
self.db.put((self.cf_a, int_to_bytes(x)), int_to_bytes(x))
it = self.db.iterkeys(self.cf_a)
self.assertEqual([], list(it))
it.seek_to_last()
self.assertEqual([(self.cf_a, b'99')], list(it))
ref = sorted([(self.cf_a, int_to_bytes(x)) for x in range(300)])
it.seek_to_first()
self.assertEqual(ref, list(it))
it.seek(b'90')
ref = sorted([(self.cf_a, int_to_bytes(x)) for x in range(90, 100)])
self.assertEqual(ref, list(it))
def test_iter_values(self):
for x in range(300):
self.db.put((self.cf_b, int_to_bytes(x)), int_to_bytes(x * 1000))
it = self.db.itervalues(self.cf_b)
self.assertEqual([], list(it))
it.seek_to_last()
self.assertEqual([b'99000'], list(it))
ref = sorted([int_to_bytes(x) for x in range(300)])
ref = [int_to_bytes(int(x) * 1000) for x in ref]
it.seek_to_first()
self.assertEqual(ref, list(it))
it.seek(b'90')
ref = [int_to_bytes(x * 1000) for x in range(90, 100)]
self.assertEqual(ref, list(it))
def test_iter_items(self):
for x in range(300):
self.db.put((self.cf_b, int_to_bytes(x)), int_to_bytes(x * 1000))
it = self.db.iteritems(self.cf_b)
self.assertEqual([], list(it))
it.seek_to_last()
self.assertEqual([((self.cf_b, b'99'), b'99000')], list(it))
ref = sorted([int_to_bytes(x) for x in range(300)])
ref = [((self.cf_b, x), int_to_bytes(int(x) * 1000)) for x in ref]
it.seek_to_first()
self.assertEqual(ref, list(it))
it.seek(b'90')
ref = [((self.cf_b, int_to_bytes(x)), int_to_bytes(x * 1000)) for x in range(90, 100)]
self.assertEqual(ref, list(it))
def test_reverse_iter(self):
for x in range(100):
self.db.put((self.cf_a, int_to_bytes(x)), int_to_bytes(x * 1000))
it = self.db.iteritems(self.cf_a)
it.seek_to_last()
ref = reversed(sorted([(self.cf_a, int_to_bytes(x)) for x in range(100)]))
ref = [(x, int_to_bytes(int(x[1]) * 1000)) for x in ref]
self.assertEqual(ref, list(reversed(it)))
def test_snapshot(self):
cfa = self.db.get_column_family(b'A')
self.db.put((cfa, b"a"), b"1")
self.db.put((cfa, b"b"), b"2")
snapshot = self.db.snapshot()
self.db.put((cfa, b"a"), b"2")
self.db.delete((cfa, b"b"))
it = self.db.iteritems(cfa)
it.seek_to_first()
self.assertEqual({(cfa, b'a'): b'2'}, dict(it))
it = self.db.iteritems(cfa, snapshot=snapshot)
it.seek_to_first()
self.assertEqual({(cfa, b'a'): b'1', (cfa, b'b'): b'2'}, dict(it))
def test_get_property(self):
for x in range(300):
x = int_to_bytes(x)
self.db.put((self.cf_a, x), x)
self.assertEqual(b"300",
self.db.get_property(b'rocksdb.estimate-num-keys',
self.cf_a))
self.assertIsNone(self.db.get_property(b'does not exsits',
self.cf_a))
def test_compact_range(self):
for x in range(10000):
x = int_to_bytes(x)
self.db.put((self.cf_b, x), x)
self.db.compact_range(column_family=self.cf_b)

View file

@ -3,16 +3,27 @@ import rocksdb
import pytest
import shutil
import os
import tempfile
def test_open_skiplist_memtable_factory():
opts = rocksdb.Options()
opts.memtable_factory = rocksdb.SkipListMemtableFactory()
opts.create_if_missing = True
test_db = rocksdb.DB("/tmp/test", opts)
loc = tempfile.mkdtemp()
try:
test_db = rocksdb.DB(os.path.join(loc, "test"), opts)
finally:
shutil.rmtree(loc)
def test_open_vector_memtable_factory():
opts = rocksdb.Options()
opts.allow_concurrent_memtable_write = False
opts.memtable_factory = rocksdb.VectorMemtableFactory()
opts.create_if_missing = True
test_db = rocksdb.DB("/tmp/test", opts)
loc = tempfile.mkdtemp()
try:
test_db = rocksdb.DB(os.path.join(loc, "test"), opts)
finally:
shutil.rmtree(loc)

View file

@ -1,4 +1,5 @@
import unittest
import sys
import rocksdb
class TestFilterPolicy(rocksdb.interfaces.FilterPolicy):
@ -37,7 +38,7 @@ class TestOptions(unittest.TestCase):
def test_compaction_pri(self):
opts = rocksdb.Options()
# default compaction_pri
# default compaction_pri
self.assertEqual(opts.compaction_pri, rocksdb.CompactionPri.by_compensated_size)
opts.compaction_pri = rocksdb.CompactionPri.by_compensated_size
self.assertEqual(opts.compaction_pri, rocksdb.CompactionPri.by_compensated_size)
@ -64,7 +65,8 @@ class TestOptions(unittest.TestCase):
# default value
self.assertEqual(isinstance(compression_opts, dict), True)
self.assertEqual(compression_opts['window_bits'], -14)
self.assertEqual(compression_opts['level'], -1)
# This doesn't match rocksdb latest
# self.assertEqual(compression_opts['level'], -1)
self.assertEqual(compression_opts['strategy'], 0)
self.assertEqual(compression_opts['max_dict_bytes'], 0)
@ -132,7 +134,12 @@ class TestOptions(unittest.TestCase):
opts.compaction_style = 'level'
self.assertEqual('level', opts.compaction_style)
with self.assertRaisesRegexp(Exception, 'Unknown compaction style'):
if sys.version_info[0] == 3:
assertRaisesRegex = self.assertRaisesRegex
else:
assertRaisesRegex = self.assertRaisesRegexp
with assertRaisesRegex(Exception, 'Unknown compaction style'):
opts.compaction_style = 'foo'
def test_compaction_opts_universal(self):

View file

@ -42,5 +42,6 @@ setup(
"doc": ['sphinx_rtd_theme', 'sphinx'],
"test": ['pytest'],
},
include_package_data=True
include_package_data=True,
zip_safe=False,
)