diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 65de8df..1b6c3a1 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -6,6 +6,7 @@ on: push: branches: - master + - iterator-api jobs: manylinux: runs-on: ubuntu-latest @@ -14,6 +15,7 @@ jobs: with: submodules: recursive - name: Cache .a files + id: build-cache uses: actions/cache@v2 with: key: ${{ runner.os }} @@ -25,8 +27,9 @@ jobs: src/rocksdb/libz.a src/rocksdb/librocksdb.a - name: Install requirements - run: sudo apt-get install build-essential binutils cmake + run: sudo apt-get install build-essential binutils cmake python3-cffi - name: Make static library files + if: steps.build-cache.outputs.cache-hit != 'true' run: make clean && make - name: Build wheels run: /bin/bash scripts/build.sh diff --git a/MANIFEST.in b/MANIFEST.in index e324b5d..d8585d2 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,4 +1,4 @@ -include README.rst +include README.md include rocksdb/cpp/*.hpp recursive-include rocksdb *.pxd recursive-include rocksdb *.pyx diff --git a/rocksdb/_rocksdb.pyx b/rocksdb/_rocksdb.pyx index 7d5cfe8..8f87daa 100644 --- a/rocksdb/_rocksdb.pyx +++ b/rocksdb/_rocksdb.pyx @@ -323,7 +323,6 @@ BloomFilterPolicy = PyBloomFilterPolicy ############################################# - ## Here comes the stuff for the merge operator @cython.internal cdef class PyMergeOperator(object): @@ -1950,6 +1949,83 @@ cdef class DB(object): st = self.db.Write(opts, batch.batch) check_status(st) + def iterator(self, start: bytes, column_family: ColumnFamilyHandle = None, iterate_lower_bound: bytes = None, + iterate_upper_bound: bytes = None, reverse: bool = False, include_key: bool = True, + include_value: bool = True, fill_cache: bool = True, prefix_same_as_start: bool = False, + auto_prefix_mode: bool = False): + """ + RocksDB Iterator + + Args: + column_family (ColumnFamilyHandle): column family handle + start (bytes): prefix to seek to + iterate_lower_bound (bytes): defines 
the smallest key at which the backward iterator can return an entry. + Once the bound is passed, Valid() will be false. `iterate_lower_bound` is + inclusive, i.e. the bound value is a valid entry. + If prefix_extractor is not null, the Seek target and `iterate_lower_bound` + need to have the same prefix. This is because ordering is not guaranteed + outside of the prefix domain. + iterate_upper_bound (bytes): defines the extent up to which the forward iterator + can return entries. Once the bound is reached, Valid() will be false. + "iterate_upper_bound" is exclusive, i.e. the bound value is + not a valid entry. If prefix_extractor is not null: + 1. If auto_prefix_mode = true, iterate_upper_bound will be used + to infer whether prefix iterating (e.g. applying prefix bloom filter) + can be used within RocksDB. This is done by comparing + iterate_upper_bound with the seek key. + 2. If auto_prefix_mode = false, iterate_upper_bound only takes + effect if it shares the same prefix as the seek key. If + iterate_upper_bound is outside the prefix of the seek key, then keys + returned outside the prefix range will be undefined, just as if + iterate_upper_bound = null. + If iterate_upper_bound is not null, SeekToLast() will position the iterator + at the first key smaller than iterate_upper_bound. + reverse (bool): run the iteration in reverse - using `reversed` is also supported + include_key (bool): the iterator should include the key in each iteration + include_value (bool): the iterator should include the value in each iteration + fill_cache (bool): Should the "data block"/"index block" read for this iteration be placed in + block cache? Callers may wish to set this field to false for bulk scans. + This would help not to change the eviction order of existing items in the + block cache. Default: true + prefix_same_as_start (bool): Enforce that the iterator only iterates over the same prefix as the seek. + This option is effective only for prefix seeks, i.e. 
prefix_extractor is + non-null for the column family and total_order_seek is false. Unlike + iterate_upper_bound, prefix_same_as_start only works within a prefix + but in both directions. Default: false + auto_prefix_mode (bool): When true, by default use total_order_seek = true, and RocksDB can + selectively enable prefix seek mode if won't generate a different result + from total_order_seek, based on seek key, and iterator upper bound. + Not supported in ROCKSDB_LITE mode, in the way that even with value true + prefix mode is not used. Default: false + + Returns: + BaseIterator: An iterator that yields key/value pairs or keys or values alone depending on the arguments. + The iterator supports being `reversed` + """ + + if not include_value: + iterator = self.iterkeys( + column_family=column_family, fill_cache=fill_cache, prefix_same_as_start=prefix_same_as_start, + iterate_lower_bound=iterate_lower_bound, iterate_upper_bound=iterate_upper_bound, + auto_prefix_mode=auto_prefix_mode + ) + elif not include_key: + iterator = self.itervalues( + column_family=column_family, fill_cache=fill_cache, prefix_same_as_start=prefix_same_as_start, + iterate_lower_bound=iterate_lower_bound, iterate_upper_bound=iterate_upper_bound, + auto_prefix_mode=auto_prefix_mode + ) + else: + iterator = self.iteritems( + column_family=column_family, fill_cache=fill_cache, prefix_same_as_start=prefix_same_as_start, + iterate_lower_bound=iterate_lower_bound, iterate_upper_bound=iterate_upper_bound, + auto_prefix_mode=auto_prefix_mode + ) + iterator.seek(start) + if reverse: + iterator = reversed(iterator) + return iterator + def get(self, key, *args, **kwargs): cdef string res cdef Status st @@ -2277,18 +2353,30 @@ cdef class DB(object): @staticmethod def __parse_read_opts( + iterate_lower_bound=None, + iterate_upper_bound=None, + readahead_size=0, + prefix_same_as_start=False, verify_checksums=False, fill_cache=True, snapshot=None, - read_tier="all"): + read_tier="all", + 
auto_prefix_mode=False): # TODO: Is this really effiencet ? return locals() cdef options.ReadOptions build_read_opts(self, dict py_opts): cdef options.ReadOptions opts + cdef Slice iterate_lower_bound + cdef Slice iterate_upper_bound + opts.verify_checksums = py_opts['verify_checksums'] opts.fill_cache = py_opts['fill_cache'] + opts.readahead_size = py_opts['readahead_size'] + opts.prefix_same_as_start = py_opts['prefix_same_as_start'] + opts.auto_prefix_mode = py_opts['auto_prefix_mode'] + if py_opts['snapshot'] is not None: opts.snapshot = ((py_opts['snapshot'])).ptr @@ -2298,7 +2386,10 @@ cdef class DB(object): opts.read_tier = options.kBlockCacheTier else: raise ValueError("Invalid read_tier") - + if py_opts['iterate_lower_bound'] is not None: + opts.iterate_lower_bound = new Slice(PyBytes_AsString(py_opts['iterate_lower_bound']), PyBytes_Size(py_opts['iterate_lower_bound'])) + if py_opts['iterate_upper_bound'] is not None: + opts.iterate_upper_bound = new Slice(PyBytes_AsString(py_opts['iterate_upper_bound']), PyBytes_Size(py_opts['iterate_upper_bound'])) return opts property options: @@ -2347,6 +2438,9 @@ cdef class DB(object): if copts: copts.in_use = False + def write_batch(self, py_bool disable_wal = False, py_bool sync = False) -> RocksDBWriteBatch: + return RocksDBWriteBatch(self, sync=sync, disable_wal=disable_wal) + def repair_db(db_name, Options opts): cdef Status st @@ -2369,7 +2463,6 @@ def list_column_families(db_name, Options opts): return column_families - @cython.no_gc_clear @cython.internal cdef class Snapshot(object): @@ -2422,6 +2515,7 @@ cdef class BaseIterator(object): return ret def __reversed__(self): + self.seek_to_last() return ReversedIterator(self) cpdef seek_to_first(self): @@ -2622,3 +2716,23 @@ cdef class BackupEngine(object): ret.append(t) return ret + + +cdef class RocksDBWriteBatch(object): + cdef DB db + cdef py_bool sync + cdef py_bool disable_wal + cdef WriteBatch batch + + def __cinit__(self, DB db, sync: bool = False, 
disable_wal: bool = False): + self.batch = WriteBatch() + self.db = db + self.sync = sync + self.disable_wal = disable_wal + + def __enter__(self): + return self.batch + + def __exit__(self, exc_type, exc_val, exc_tb): + if not exc_val: + self.db.write(self.batch, sync=self.sync, disable_wal=self.disable_wal) diff --git a/rocksdb/options.pxd b/rocksdb/options.pxd index c7c1260..3bab301 100644 --- a/rocksdb/options.pxd +++ b/rocksdb/options.pxd @@ -172,10 +172,15 @@ cdef extern from "rocksdb/options.h" namespace "rocksdb": cpp_bool disableWAL cdef cppclass ReadOptions: + const Snapshot* snapshot + const Slice* iterate_lower_bound + const Slice* iterate_upper_bound + size_t readahead_size cpp_bool verify_checksums cpp_bool fill_cache - const Snapshot* snapshot ReadTier read_tier + cpp_bool prefix_same_as_start + cpp_bool auto_prefix_mode cdef cppclass FlushOptions: cpp_bool wait diff --git a/tests/test_db.py b/tests/test_db.py index c0adcd5..35ab4c4 100644 --- a/tests/test_db.py +++ b/tests/test_db.py @@ -9,9 +9,11 @@ import struct import tempfile from rocksdb.merge_operators import UintAddOperator, StringAppendOperator + def int_to_bytes(ob): return str(ob).encode('ascii') + class TestHelper(unittest.TestCase): def setUp(self): @@ -69,6 +71,24 @@ class TestDB(TestHelper): secondary.try_catch_up_with_primary() self.assertEqual(b"b", secondary.get(b"a")) + secondary2_location = os.path.join(self.db_loc, "secondary2") + secondary2 = rocksdb.DB( + os.path.join(self.db_loc, "test"), + rocksdb.Options(create_if_missing=True, max_open_files=-1), + secondary_name=secondary2_location + ) + self.addCleanup(secondary2.close) + + self.assertEqual(b"b", secondary2.get(b"a")) + self.db.put(b"a", b"c") + self.assertEqual(b"b", secondary.get(b"a")) + self.assertEqual(b"b", secondary2.get(b"a")) + self.assertEqual(b"c", self.db.get(b"a")) + secondary.try_catch_up_with_primary() + secondary2.try_catch_up_with_primary() + self.assertEqual(b"c", secondary.get(b"a")) + 
self.assertEqual(b"c", secondary2.get(b"a")) + def test_multi_get(self): self.db.put(b"a", b"1") self.db.put(b"b", b"2") @@ -97,6 +117,18 @@ class TestDB(TestHelper): ret = self.db.multi_get([b'key', b'a']) self.assertEqual(ref, ret) + def test_write_batch_context(self): + with self.db.write_batch() as batch: + batch.put(b"key", b"v1") + batch.delete(b"key") + batch.put(b"key", b"v2") + batch.put(b"key", b"v3") + batch.put(b"a", b"b") + + ref = {b'a': b'b', b'key': b'v3'} + ret = self.db.multi_get([b'key', b'a']) + self.assertEqual(ref, ret) + def test_write_batch_iter(self): batch = rocksdb.WriteBatch() self.assertEqual([], list(batch)) @@ -120,7 +152,6 @@ class TestDB(TestHelper): ] self.assertEqual(ref, list(it)) - def test_key_may_exists(self): self.db.put(b"a", b'1') @@ -174,7 +205,6 @@ class TestDB(TestHelper): it.seek_for_prev(b'c3') self.assertEqual(it.get(), (b'c2', b'c2_value')) - def test_iter_keys(self): for x in range(300): self.db.put(int_to_bytes(x), int_to_bytes(x)) @@ -457,6 +487,7 @@ class StaticPrefix(rocksdb.interfaces.SliceTransform): def in_range(self, dst): return len(dst) == 5 + class TestPrefixExtractor(TestHelper): def setUp(self): TestHelper.setUp(self) @@ -687,15 +718,30 @@ class TestDBColumnFamilies(TestHelper): self.assertEqual({(cfa, b'a'): b'1', (cfa, b'b'): b'2'}, dict(it)) def test_get_property(self): + secondary_location = os.path.join(self.db_loc, "secondary") + cf = { + b'A': rocksdb.ColumnFamilyOptions(), + b'B': rocksdb.ColumnFamilyOptions() + } + secondary = rocksdb.DB( + os.path.join(self.db_loc, "test"), + rocksdb.Options(create_if_missing=True, max_open_files=-1), + secondary_name=secondary_location, column_families=cf + ) + self.addCleanup(secondary.close) + for x in range(300): x = int_to_bytes(x) self.db.put((self.cf_a, x), x) - self.assertEqual(b"300", - self.db.get_property(b'rocksdb.estimate-num-keys', - self.cf_a)) - self.assertIsNone(self.db.get_property(b'does not exsits', - self.cf_a)) + 
self.assertIsNone(self.db.get_property(b'does not exsits', self.cf_a)) + self.assertEqual(b"0", secondary.get_property(b'rocksdb.estimate-num-keys', secondary.get_column_family(b'A'))) + self.assertEqual(b"300", self.db.get_property(b'rocksdb.estimate-num-keys', self.cf_a)) + + secondary.try_catch_up_with_primary() + + self.assertEqual(b"300", secondary.get_property(b'rocksdb.estimate-num-keys', secondary.get_column_family(b'A'))) + self.assertEqual(b"300", self.db.get_property(b'rocksdb.estimate-num-keys', self.cf_a)) def test_compact_range(self): for x in range(10000): @@ -704,3 +750,173 @@ class TestDBColumnFamilies(TestHelper): self.db.compact_range(column_family=self.cf_b) + +class OneCharacterPrefix(rocksdb.interfaces.SliceTransform): + def name(self): + return b'test prefix' + + def transform(self, src): + return (0, 1) + + def in_domain(self, src): + return len(src) >= 1 + + def in_range(self, dst): + return len(dst) == 1 + + +class TestPrefixIterator(TestHelper): + def setUp(self): + TestHelper.setUp(self) + opts = rocksdb.Options(create_if_missing=True) + self.db = rocksdb.DB(os.path.join(self.db_loc, 'test'), opts) + + def test_iterator(self): + self.db.put(b'a0', b'a0_value') + self.db.put(b'a1', b'a1_value') + self.db.put(b'a1b', b'a1b_value') + self.db.put(b'a2b', b'a2b_value') + self.db.put(b'a3', b'a3_value') + self.db.put(b'a4', b'a4_value') + self.db.put(b'b0', b'b0_value') + self.assertListEqual( + [(b'a0', b'a0_value'), (b'a1', b'a1_value'), (b'a1b', b'a1b_value'), (b'a2b', b'a2b_value'), + (b'a3', b'a3_value'), (b'a4', b'a4_value')], + list(self.db.iterator(start=b'a', iterate_upper_bound=b'b')) + ) + self.assertListEqual( + [b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'], + list(self.db.iterator(start=b'a', iterate_upper_bound=b'b', include_value=False)) + ) + self.assertListEqual( + [b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'], + list(self.db.iterator(start=b'a0', iterate_upper_bound=b'a5', include_value=False)) + ) + self.assertListEqual( + 
[b'a4', b'a3', b'a2b', b'a1b', b'a1', b'a0'], + list(reversed(self.db.iterator(start=b'a0', iterate_upper_bound=b'a5', include_value=False))) + ) + self.assertListEqual( + [b'a0', b'a1', b'a1b', b'a2b', b'a3'], + list(self.db.iterator(start=b'a0', iterate_upper_bound=b'a4', include_value=False)) + ) + self.assertListEqual( + [b'a0', b'a1', b'a1b'], + list(self.db.iterator(start=b'a0', iterate_upper_bound=b'a2', include_value=False)) + ) + self.assertListEqual( + [b'a0', b'a1', b'a1b'], + list(self.db.iterator(start=b'a0', iterate_upper_bound=b'a2', include_value=False)) + ) + self.assertListEqual( + [b'a1b', b'a1', b'a0'], + list(reversed(self.db.iterator(start=b'a0', iterate_upper_bound=b'a2', include_value=False))) + ) + self.assertListEqual( + [b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'], + list(self.db.iterator(start=b'a', iterate_upper_bound=b'b0', include_value=False)) + ) + + +class TestPrefixIteratorWithExtractor(TestHelper): + def setUp(self): + TestHelper.setUp(self) + opts = rocksdb.Options(create_if_missing=True) + opts.prefix_extractor = OneCharacterPrefix() + self.db = rocksdb.DB(os.path.join(self.db_loc, 'test'), opts) + + def test_iterator(self): + self.db.put(b'a0', b'a0_value') + self.db.put(b'a1', b'a1_value') + self.db.put(b'a1b', b'a1b_value') + self.db.put(b'a2b', b'a2b_value') + self.db.put(b'a3', b'a3_value') + self.db.put(b'a4', b'a4_value') + self.db.put(b'b0', b'b0_value') + self.assertListEqual( + [(b'a0', b'a0_value'), (b'a1', b'a1_value'), (b'a1b', b'a1b_value'), (b'a2b', b'a2b_value'), + (b'a3', b'a3_value'), (b'a4', b'a4_value')], + list(self.db.iterator(start=b'a', prefix_same_as_start=True)) + ) + self.assertListEqual( + [b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'], + list(self.db.iterator(start=b'a', include_value=False, prefix_same_as_start=True)) + ) + self.assertListEqual( + [b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'], + list(self.db.iterator(start=b'a0', iterate_upper_bound=b'a5', include_value=False)) + ) + 
self.assertListEqual( + [b'a4', b'a3', b'a2b', b'a1b', b'a1', b'a0'], + list(reversed(self.db.iterator(start=b'a0', iterate_upper_bound=b'a5', include_value=False))) + ) + self.assertListEqual( + [b'a0', b'a1', b'a1b', b'a2b', b'a3'], + list(self.db.iterator(start=b'a0', iterate_upper_bound=b'a4', include_value=False)) + ) + self.assertListEqual( + [b'a0', b'a1', b'a1b'], + list(self.db.iterator(start=b'a0', iterate_upper_bound=b'a2', include_value=False)) + ) + self.assertListEqual( + [b'a0', b'a1', b'a1b'], + list(self.db.iterator(start=b'a0', iterate_upper_bound=b'a2', include_value=False)) + ) + self.assertListEqual( + [b'a1b', b'a1', b'a0'], + list(reversed(self.db.iterator(start=b'a0', iterate_upper_bound=b'a2', include_value=False))) + ) + self.assertListEqual( + [b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'], + list(self.db.iterator(start=b'a', iterate_upper_bound=b'b0', include_value=False)) + ) + + def test_column_family_iterator(self): + cf_a = self.db.create_column_family(b'first', rocksdb.ColumnFamilyOptions()) + cf_b = self.db.create_column_family(b'second', rocksdb.ColumnFamilyOptions()) + + self.db.put((cf_a, b'a0'), b'a0_value') + self.db.put((cf_a, b'a1'), b'a1_value') + self.db.put((cf_a, b'a1b'), b'a1b_value') + self.db.put((cf_a, b'a2b'), b'a2b_value') + self.db.put((cf_a, b'a3'), b'a3_value') + self.db.put((cf_a, b'a4'), b'a4_value') + self.db.put((cf_b, b'b0'), b'b0_value') + + self.assertListEqual( + [(b'a0', b'a0_value'), (b'a1', b'a1_value'), (b'a1b', b'a1b_value'), (b'a2b', b'a2b_value'), + (b'a3', b'a3_value'), (b'a4', b'a4_value')], + list(map(lambda x: (x[0][-1], x[1]), self.db.iterator(column_family=cf_a, start=b'a', prefix_same_as_start=True))) + ) + self.assertListEqual( + [b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'], + list(map(lambda x: x[-1], self.db.iterator(column_family=cf_a, start=b'a', include_value=False, prefix_same_as_start=True))) + ) + self.assertListEqual( + [b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'], + list(map(lambda 
x: x[-1], self.db.iterator(column_family=cf_a, start=b'a0', iterate_upper_bound=b'a5', include_value=False))) + ) + self.assertListEqual( + [b'a4', b'a3', b'a2b', b'a1b', b'a1', b'a0'], + list(map(lambda x: x[-1], + reversed(self.db.iterator( + column_family=cf_a, start=b'a0', iterate_upper_bound=b'a5', include_value=False + )))) + ) + self.assertListEqual( + [b'a0', b'a1', b'a1b', b'a2b', b'a3'], + list(map(lambda x: x[-1], self.db.iterator(column_family=cf_a, start=b'a0', iterate_upper_bound=b'a4', include_value=False))) + ) + self.assertListEqual( + [b'a0', b'a1', b'a1b'], + list(map(lambda x: x[-1], self.db.iterator(column_family=cf_a, start=b'a0', iterate_upper_bound=b'a2', include_value=False))) + ) + self.assertListEqual( + [b'a1b', b'a1', b'a0'], + list(map(lambda x: x[-1], reversed( + self.db.iterator(column_family=cf_a, start=b'a0', iterate_upper_bound=b'a2', include_value=False)))) + ) + self.assertListEqual( + [b'b0'], + list(map(lambda x: x[-1], self.db.iterator(column_family=cf_b, start=b'b', include_value=False))) + )