Add DB.iterator helper api #1

Merged
jackrobison merged 7 commits from iterator-api into master 2022-01-16 20:34:07 +01:00
5 changed files with 352 additions and 14 deletions

View file

@ -6,6 +6,7 @@ on:
push: push:
branches: branches:
- master - master
- iterator-api
jobs: jobs:
manylinux: manylinux:
runs-on: ubuntu-latest runs-on: ubuntu-latest
@ -14,6 +15,7 @@ jobs:
with: with:
submodules: recursive submodules: recursive
- name: Cache .a files - name: Cache .a files
id: build-cache
uses: actions/cache@v2 uses: actions/cache@v2
with: with:
key: ${{ runner.os }} key: ${{ runner.os }}
@ -25,8 +27,9 @@ jobs:
src/rocksdb/libz.a src/rocksdb/libz.a
src/rocksdb/librocksdb.a src/rocksdb/librocksdb.a
- name: Install requirements - name: Install requirements
run: sudo apt-get install build-essential binutils cmake run: sudo apt-get install build-essential binutils cmake python3-cffi
- name: Make static library files - name: Make static library files
if: steps.build-cache.outputs.cache-hit != 'true'
run: make clean && make run: make clean && make
- name: Build wheels - name: Build wheels
run: /bin/bash scripts/build.sh run: /bin/bash scripts/build.sh

View file

@ -1,4 +1,4 @@
include README.rst include README.md
include rocksdb/cpp/*.hpp include rocksdb/cpp/*.hpp
recursive-include rocksdb *.pxd recursive-include rocksdb *.pxd
recursive-include rocksdb *.pyx recursive-include rocksdb *.pyx

View file

@ -323,7 +323,6 @@ BloomFilterPolicy = PyBloomFilterPolicy
############################################# #############################################
## Here comes the stuff for the merge operator ## Here comes the stuff for the merge operator
@cython.internal @cython.internal
cdef class PyMergeOperator(object): cdef class PyMergeOperator(object):
@ -1950,6 +1949,83 @@ cdef class DB(object):
st = self.db.Write(opts, batch.batch) st = self.db.Write(opts, batch.batch)
check_status(st) check_status(st)
def iterator(self, start: bytes, column_family: ColumnFamilyHandle = None, iterate_lower_bound: bytes = None,
             iterate_upper_bound: bytes = None, reverse: bool = False, include_key: bool = True,
             include_value: bool = True, fill_cache: bool = True, prefix_same_as_start: bool = False,
             auto_prefix_mode: bool = False):
    """
    Create a RocksDB iterator, seek it to `start` and optionally reverse it.

    The kind of iterator is chosen from `include_value` / `include_key`:
    keys only when `include_value` is false (checked first), values only when
    `include_key` is false, key/value items otherwise.

    Args:
        start (bytes): key (or key prefix) the freshly created iterator is positioned at via `seek`
        column_family (ColumnFamilyHandle): column family handle
        iterate_lower_bound (bytes): the smallest key at which a backward iterator can return an entry.
                                     Once the bound is passed, Valid() will be false. The bound is
                                     inclusive, i.e. the bound value itself is a valid entry.
                                     If prefix_extractor is not null, the seek target and
                                     `iterate_lower_bound` need to share the same prefix, because
                                     ordering is not guaranteed outside of the prefix domain.
        iterate_upper_bound (bytes): the extent up to which a forward iterator can return entries.
                                     Once the bound is reached, Valid() will be false. The bound is
                                     exclusive, i.e. the bound value itself is not a valid entry.
                                     If prefix_extractor is not null:
                                     1. With auto_prefix_mode = true, the bound is compared with the
                                        seek key to infer whether prefix iterating (e.g. applying a
                                        prefix bloom filter) can be used within RocksDB.
                                     2. With auto_prefix_mode = false, the bound only takes effect if
                                        it shares the prefix of the seek key. If it lies outside that
                                        prefix, keys returned outside the prefix range are undefined,
                                        just as if no upper bound had been given.
                                     If an upper bound is set, SeekToLast() positions the iterator at
                                     the first key smaller than the bound.
        reverse (bool): run the iteration in reverse; calling `reversed` on the result is also supported
        include_key (bool): the iterator should include the key in each iteration
        include_value (bool): the iterator should include the value in each iteration
        fill_cache (bool): whether the "data block"/"index block" read for this iteration is placed in
                           the block cache. Callers may wish to pass false for bulk scans so that the
                           eviction order of items already in the block cache is not disturbed.
                           Default: true
        prefix_same_as_start (bool): enforce that the iterator only iterates over the same prefix as
                                     the seek. Only effective for prefix seeks, i.e. prefix_extractor
                                     is non-null for the column family and total_order_seek is false.
                                     Unlike iterate_upper_bound, it only works within a prefix, but in
                                     both directions. Default: false
        auto_prefix_mode (bool): when true, total_order_seek = true is used by default and RocksDB may
                                 selectively enable prefix seek mode if that won't generate a different
                                 result from total_order_seek, based on the seek key and the iterator
                                 upper bound. Not supported in ROCKSDB_LITE mode, where prefix mode is
                                 not used even when this is true. Default: false

    Returns:
        BaseIterator: An iterator that yields key/value pairs or keys or values alone depending on the
                      arguments. The iterator supports being `reversed`.
    """
    # NOTE(review): the reverse branch hands the already-seeked iterator to `reversed`; if its
    # `__reversed__` repositions the cursor, `start` presumably has no effect there - confirm.
    read_kwargs = dict(
        column_family=column_family,
        fill_cache=fill_cache,
        prefix_same_as_start=prefix_same_as_start,
        iterate_lower_bound=iterate_lower_bound,
        iterate_upper_bound=iterate_upper_bound,
        auto_prefix_mode=auto_prefix_mode,
    )

    # Same precedence as before: keys-only wins over values-only.
    if not include_value:
        make_iterator = self.iterkeys
    elif not include_key:
        make_iterator = self.itervalues
    else:
        make_iterator = self.iteritems

    cursor = make_iterator(**read_kwargs)
    cursor.seek(start)
    return reversed(cursor) if reverse else cursor
def get(self, key, *args, **kwargs): def get(self, key, *args, **kwargs):
cdef string res cdef string res
cdef Status st cdef Status st
@ -2277,18 +2353,30 @@ cdef class DB(object):
@staticmethod @staticmethod
def __parse_read_opts( def __parse_read_opts(
iterate_lower_bound=None,
iterate_upper_bound=None,
readahead_size=0,
prefix_same_as_start=False,
verify_checksums=False, verify_checksums=False,
fill_cache=True, fill_cache=True,
snapshot=None, snapshot=None,
read_tier="all"): read_tier="all",
auto_prefix_mode=False):
# TODO: Is this really effiencet ? # TODO: Is this really effiencet ?
return locals() return locals()
cdef options.ReadOptions build_read_opts(self, dict py_opts): cdef options.ReadOptions build_read_opts(self, dict py_opts):
cdef options.ReadOptions opts cdef options.ReadOptions opts
cdef Slice iterate_lower_bound
cdef Slice iterate_upper_bound
opts.verify_checksums = py_opts['verify_checksums'] opts.verify_checksums = py_opts['verify_checksums']
opts.fill_cache = py_opts['fill_cache'] opts.fill_cache = py_opts['fill_cache']
opts.readahead_size = py_opts['readahead_size']
opts.prefix_same_as_start = py_opts['prefix_same_as_start']
opts.auto_prefix_mode = py_opts['auto_prefix_mode']
if py_opts['snapshot'] is not None: if py_opts['snapshot'] is not None:
opts.snapshot = (<Snapshot?>(py_opts['snapshot'])).ptr opts.snapshot = (<Snapshot?>(py_opts['snapshot'])).ptr
@ -2298,7 +2386,10 @@ cdef class DB(object):
opts.read_tier = options.kBlockCacheTier opts.read_tier = options.kBlockCacheTier
else: else:
raise ValueError("Invalid read_tier") raise ValueError("Invalid read_tier")
if py_opts['iterate_lower_bound'] is not None:
opts.iterate_lower_bound = new Slice(PyBytes_AsString(py_opts['iterate_lower_bound']), PyBytes_Size(py_opts['iterate_lower_bound']))
if py_opts['iterate_upper_bound'] is not None:
opts.iterate_upper_bound = new Slice(PyBytes_AsString(py_opts['iterate_upper_bound']), PyBytes_Size(py_opts['iterate_upper_bound']))
return opts return opts
property options: property options:
@ -2347,6 +2438,9 @@ cdef class DB(object):
if copts: if copts:
copts.in_use = False copts.in_use = False
def write_batch(self, py_bool disable_wal = False, py_bool sync = False) -> RocksDBWriteBatch:
    """
    Build a `RocksDBWriteBatch` bound to this database.

    Args:
        disable_wal (bool): forwarded unchanged as `disable_wal` to the batch wrapper
        sync (bool): forwarded unchanged as `sync` to the batch wrapper

    Returns:
        RocksDBWriteBatch: a new wrapper constructed with this DB and the two flags
    """
    batch_context = RocksDBWriteBatch(self, disable_wal=disable_wal, sync=sync)
    return batch_context
def repair_db(db_name, Options opts): def repair_db(db_name, Options opts):
cdef Status st cdef Status st
@ -2369,7 +2463,6 @@ def list_column_families(db_name, Options opts):
return column_families return column_families
@cython.no_gc_clear @cython.no_gc_clear
@cython.internal @cython.internal
cdef class Snapshot(object): cdef class Snapshot(object):
@ -2422,6 +2515,7 @@ cdef class BaseIterator(object):
return ret return ret
def __reversed__(self): def __reversed__(self):
self.seek_to_last()
return ReversedIterator(self) return ReversedIterator(self)
cpdef seek_to_first(self): cpdef seek_to_first(self):
@ -2622,3 +2716,23 @@ cdef class BackupEngine(object):
ret.append(t) ret.append(t)
return ret return ret
cdef class RocksDBWriteBatch(object):
    """
    Context manager that collects operations in a `WriteBatch` and writes it on clean exit.

    Entering the context yields the underlying `WriteBatch`. When the `with` body finishes
    without raising, the batch is passed to `DB.write` together with the `sync` and
    `disable_wal` flags given at construction. When the body raises, nothing is written and
    the exception propagates (`__exit__` returns None).
    """
    # Database the batch is written to on exit.
    cdef DB db
    # Flags forwarded to `DB.write`.
    cdef py_bool sync
    cdef py_bool disable_wal
    # Batch handed to the `with` body.
    cdef WriteBatch batch

    def __cinit__(self, DB db, sync: bool = False, disable_wal: bool = False):
        self.batch = WriteBatch()
        self.db = db
        self.sync = sync
        self.disable_wal = disable_wal

    def __enter__(self):
        return self.batch

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Test the exception *type*, not the truthiness of the instance: an exception object
        # that evaluates falsy (e.g. one defining __bool__ or __len__) must still abort the write.
        if exc_type is None:
            self.db.write(self.batch, sync=self.sync, disable_wal=self.disable_wal)

View file

@ -172,10 +172,15 @@ cdef extern from "rocksdb/options.h" namespace "rocksdb":
cpp_bool disableWAL cpp_bool disableWAL
cdef cppclass ReadOptions: cdef cppclass ReadOptions:
const Snapshot* snapshot
const Slice* iterate_lower_bound
const Slice* iterate_upper_bound
size_t readahead_size
cpp_bool verify_checksums cpp_bool verify_checksums
cpp_bool fill_cache cpp_bool fill_cache
const Snapshot* snapshot
ReadTier read_tier ReadTier read_tier
cpp_bool prefix_same_as_start
cpp_bool auto_prefix_mode
cdef cppclass FlushOptions: cdef cppclass FlushOptions:
cpp_bool wait cpp_bool wait

View file

@ -9,9 +9,11 @@ import struct
import tempfile import tempfile
from rocksdb.merge_operators import UintAddOperator, StringAppendOperator from rocksdb.merge_operators import UintAddOperator, StringAppendOperator
def int_to_bytes(ob): def int_to_bytes(ob):
return str(ob).encode('ascii') return str(ob).encode('ascii')
class TestHelper(unittest.TestCase): class TestHelper(unittest.TestCase):
def setUp(self): def setUp(self):
@ -69,6 +71,24 @@ class TestDB(TestHelper):
secondary.try_catch_up_with_primary() secondary.try_catch_up_with_primary()
self.assertEqual(b"b", secondary.get(b"a")) self.assertEqual(b"b", secondary.get(b"a"))
secondary2_location = os.path.join(self.db_loc, "secondary2")
secondary2 = rocksdb.DB(
os.path.join(self.db_loc, "test"),
rocksdb.Options(create_if_missing=True, max_open_files=-1),
secondary_name=secondary2_location
)
self.addCleanup(secondary2.close)
self.assertEqual(b"b", secondary2.get(b"a"))
self.db.put(b"a", b"c")
self.assertEqual(b"b", secondary.get(b"a"))
self.assertEqual(b"b", secondary2.get(b"a"))
self.assertEqual(b"c", self.db.get(b"a"))
secondary.try_catch_up_with_primary()
secondary2.try_catch_up_with_primary()
self.assertEqual(b"c", secondary.get(b"a"))
self.assertEqual(b"c", secondary2.get(b"a"))
def test_multi_get(self): def test_multi_get(self):
self.db.put(b"a", b"1") self.db.put(b"a", b"1")
self.db.put(b"b", b"2") self.db.put(b"b", b"2")
@ -97,6 +117,18 @@ class TestDB(TestHelper):
ret = self.db.multi_get([b'key', b'a']) ret = self.db.multi_get([b'key', b'a'])
self.assertEqual(ref, ret) self.assertEqual(ref, ret)
def test_write_batch_context(self):
    """Operations queued inside `db.write_batch()` are visible through `multi_get` after the block."""
    with self.db.write_batch() as pending:
        pending.put(b"key", b"v1")
        pending.delete(b"key")
        pending.put(b"key", b"v2")
        pending.put(b"key", b"v3")
        pending.put(b"a", b"b")

    # Read back only after the context has exited, i.e. once the batch has been written.
    expected = {b'a': b'b', b'key': b'v3'}
    self.assertEqual(expected, self.db.multi_get([b'key', b'a']))
def test_write_batch_iter(self): def test_write_batch_iter(self):
batch = rocksdb.WriteBatch() batch = rocksdb.WriteBatch()
self.assertEqual([], list(batch)) self.assertEqual([], list(batch))
@ -120,7 +152,6 @@ class TestDB(TestHelper):
] ]
self.assertEqual(ref, list(it)) self.assertEqual(ref, list(it))
def test_key_may_exists(self): def test_key_may_exists(self):
self.db.put(b"a", b'1') self.db.put(b"a", b'1')
@ -174,7 +205,6 @@ class TestDB(TestHelper):
it.seek_for_prev(b'c3') it.seek_for_prev(b'c3')
self.assertEqual(it.get(), (b'c2', b'c2_value')) self.assertEqual(it.get(), (b'c2', b'c2_value'))
def test_iter_keys(self): def test_iter_keys(self):
for x in range(300): for x in range(300):
self.db.put(int_to_bytes(x), int_to_bytes(x)) self.db.put(int_to_bytes(x), int_to_bytes(x))
@ -457,6 +487,7 @@ class StaticPrefix(rocksdb.interfaces.SliceTransform):
def in_range(self, dst): def in_range(self, dst):
return len(dst) == 5 return len(dst) == 5
class TestPrefixExtractor(TestHelper): class TestPrefixExtractor(TestHelper):
def setUp(self): def setUp(self):
TestHelper.setUp(self) TestHelper.setUp(self)
@ -687,15 +718,30 @@ class TestDBColumnFamilies(TestHelper):
self.assertEqual({(cfa, b'a'): b'1', (cfa, b'b'): b'2'}, dict(it)) self.assertEqual({(cfa, b'a'): b'1', (cfa, b'b'): b'2'}, dict(it))
def test_get_property(self): def test_get_property(self):
secondary_location = os.path.join(self.db_loc, "secondary")
cf = {
b'A': rocksdb.ColumnFamilyOptions(),
b'B': rocksdb.ColumnFamilyOptions()
}
secondary = rocksdb.DB(
os.path.join(self.db_loc, "test"),
rocksdb.Options(create_if_missing=True, max_open_files=-1),
secondary_name=secondary_location, column_families=cf
)
self.addCleanup(secondary.close)
for x in range(300): for x in range(300):
x = int_to_bytes(x) x = int_to_bytes(x)
self.db.put((self.cf_a, x), x) self.db.put((self.cf_a, x), x)
self.assertEqual(b"300", self.assertIsNone(self.db.get_property(b'does not exsits', self.cf_a))
self.db.get_property(b'rocksdb.estimate-num-keys', self.assertEqual(b"0", secondary.get_property(b'rocksdb.estimate-num-keys', secondary.get_column_family(b'A')))
self.cf_a)) self.assertEqual(b"300", self.db.get_property(b'rocksdb.estimate-num-keys', self.cf_a))
self.assertIsNone(self.db.get_property(b'does not exsits',
self.cf_a)) secondary.try_catch_up_with_primary()
self.assertEqual(b"300", secondary.get_property(b'rocksdb.estimate-num-keys', secondary.get_column_family(b'A')))
self.assertEqual(b"300", self.db.get_property(b'rocksdb.estimate-num-keys', self.cf_a))
def test_compact_range(self): def test_compact_range(self):
for x in range(10000): for x in range(10000):
@ -704,3 +750,173 @@ class TestDBColumnFamilies(TestHelper):
self.db.compact_range(column_family=self.cf_b) self.db.compact_range(column_family=self.cf_b)
class OneCharacterPrefix(rocksdb.interfaces.SliceTransform):
    """
    Slice transform installed as `prefix_extractor` by TestPrefixIteratorWithExtractor.

    NOTE(review): `transform` always reports (0, 1), presumably (offset, length) selecting the
    first byte of the key as its prefix - confirm against the SliceTransform interface.
    """

    def name(self):
        return b'test prefix'

    def transform(self, src):
        prefix_span = (0, 1)
        return prefix_span

    def in_domain(self, src):
        # True for any source holding at least one element.
        return len(src) > 0

    def in_range(self, dst):
        # True only for results of length exactly one.
        return 1 == len(dst)
class TestPrefixIterator(TestHelper):
    """Exercise `DB.iterator` on a database opened without a prefix extractor."""

    def setUp(self):
        TestHelper.setUp(self)
        db_path = os.path.join(self.db_loc, 'test')
        self.db = rocksdb.DB(db_path, rocksdb.Options(create_if_missing=True))

    def test_iterator(self):
        """Forward and reversed scans bounded by `iterate_upper_bound` return the expected keys."""
        a_keys = [b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4']
        for key in a_keys + [b'b0']:
            self.db.put(key, key + b'_value')

        # Key/value pairs for everything below the exclusive upper bound b'b'.
        self.assertListEqual(
            [(key, key + b'_value') for key in a_keys],
            list(self.db.iterator(start=b'a', iterate_upper_bound=b'b'))
        )

        def bounded_keys(start, upper, backwards=False):
            found = self.db.iterator(start=start, iterate_upper_bound=upper, include_value=False)
            if backwards:
                found = reversed(found)
            return list(found)

        # (expected keys, start, iterate_upper_bound, wrap in reversed()) - checked in this order.
        cases = [
            ([b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'], b'a', b'b', False),
            ([b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'], b'a0', b'a5', False),
            ([b'a4', b'a3', b'a2b', b'a1b', b'a1', b'a0'], b'a0', b'a5', True),
            ([b'a0', b'a1', b'a1b', b'a2b', b'a3'], b'a0', b'a4', False),
            ([b'a0', b'a1', b'a1b'], b'a0', b'a2', False),
            ([b'a0', b'a1', b'a1b'], b'a0', b'a2', False),
            ([b'a1b', b'a1', b'a0'], b'a0', b'a2', True),
            ([b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'], b'a', b'b0', False),
        ]
        for expected, start, upper, backwards in cases:
            self.assertListEqual(expected, bounded_keys(start, upper, backwards))
class TestPrefixIteratorWithExtractor(TestHelper):
    """Exercise `DB.iterator` on a database opened with `OneCharacterPrefix` as prefix extractor."""

    def setUp(self):
        TestHelper.setUp(self)
        options = rocksdb.Options(create_if_missing=True)
        options.prefix_extractor = OneCharacterPrefix()
        self.db = rocksdb.DB(os.path.join(self.db_loc, 'test'), options)

    def test_iterator(self):
        """Prefix-restricted and upper-bounded scans on the default column family."""
        a_keys = [b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4']
        for key in a_keys + [b'b0']:
            self.db.put(key, key + b'_value')

        # Scans restricted to the prefix of the seek key instead of an explicit upper bound.
        self.assertListEqual(
            [(key, key + b'_value') for key in a_keys],
            list(self.db.iterator(start=b'a', prefix_same_as_start=True))
        )
        self.assertListEqual(
            [b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'],
            list(self.db.iterator(start=b'a', include_value=False, prefix_same_as_start=True))
        )

        def bounded_keys(start, upper, backwards=False):
            found = self.db.iterator(start=start, iterate_upper_bound=upper, include_value=False)
            if backwards:
                found = reversed(found)
            return list(found)

        # (expected keys, start, iterate_upper_bound, wrap in reversed()) - checked in this order.
        cases = [
            ([b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'], b'a0', b'a5', False),
            ([b'a4', b'a3', b'a2b', b'a1b', b'a1', b'a0'], b'a0', b'a5', True),
            ([b'a0', b'a1', b'a1b', b'a2b', b'a3'], b'a0', b'a4', False),
            ([b'a0', b'a1', b'a1b'], b'a0', b'a2', False),
            ([b'a0', b'a1', b'a1b'], b'a0', b'a2', False),
            ([b'a1b', b'a1', b'a0'], b'a0', b'a2', True),
            ([b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'], b'a', b'b0', False),
        ]
        for expected, start, upper, backwards in cases:
            self.assertListEqual(expected, bounded_keys(start, upper, backwards))

    def test_column_family_iterator(self):
        """Same scans against explicit column families; only the last element of each key is compared."""
        first = self.db.create_column_family(b'first', rocksdb.ColumnFamilyOptions())
        second = self.db.create_column_family(b'second', rocksdb.ColumnFamilyOptions())

        a_keys = [b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4']
        for key in a_keys:
            self.db.put((first, key), key + b'_value')
        self.db.put((second, b'b0'), b'b0_value')

        def bare_keys(entries):
            # Keys come back as sequences whose last element is the raw key
            # (presumably (column_family, key) - confirm); keep only that element.
            return [entry[-1] for entry in entries]

        prefix_items = self.db.iterator(column_family=first, start=b'a', prefix_same_as_start=True)
        self.assertListEqual(
            [(key, key + b'_value') for key in a_keys],
            [(entry[0][-1], entry[1]) for entry in prefix_items]
        )
        self.assertListEqual(
            [b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'],
            bare_keys(self.db.iterator(
                column_family=first, start=b'a', include_value=False, prefix_same_as_start=True
            ))
        )

        def bounded(start, upper):
            return self.db.iterator(
                column_family=first, start=start, iterate_upper_bound=upper, include_value=False
            )

        self.assertListEqual(
            [b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'],
            bare_keys(bounded(b'a0', b'a5'))
        )
        self.assertListEqual(
            [b'a4', b'a3', b'a2b', b'a1b', b'a1', b'a0'],
            bare_keys(reversed(bounded(b'a0', b'a5')))
        )
        self.assertListEqual(
            [b'a0', b'a1', b'a1b', b'a2b', b'a3'],
            bare_keys(bounded(b'a0', b'a4'))
        )
        self.assertListEqual(
            [b'a0', b'a1', b'a1b'],
            bare_keys(bounded(b'a0', b'a2'))
        )
        self.assertListEqual(
            [b'a1b', b'a1', b'a0'],
            bare_keys(reversed(bounded(b'a0', b'a2')))
        )
        self.assertListEqual(
            [b'b0'],
            bare_keys(self.db.iterator(column_family=second, start=b'b', include_value=False))
        )