Add DB.iterator helper api #1

Merged
jackrobison merged 7 commits from iterator-api into master 2022-01-16 20:34:07 +01:00
5 changed files with 352 additions and 14 deletions

View file

@ -6,6 +6,7 @@ on:
push:
branches:
- master
- iterator-api
jobs:
manylinux:
runs-on: ubuntu-latest
@ -14,6 +15,7 @@ jobs:
with:
submodules: recursive
- name: Cache .a files
id: build-cache
uses: actions/cache@v2
with:
key: ${{ runner.os }}
@ -25,8 +27,9 @@ jobs:
src/rocksdb/libz.a
src/rocksdb/librocksdb.a
- name: Install requirements
run: sudo apt-get install build-essential binutils cmake
run: sudo apt-get install build-essential binutils cmake python3-cffi
- name: Make static library files
if: steps.build-cache.outputs.cache-hit != 'true'
run: make clean && make
- name: Build wheels
run: /bin/bash scripts/build.sh

View file

@ -1,4 +1,4 @@
include README.rst
include README.md
include rocksdb/cpp/*.hpp
recursive-include rocksdb *.pxd
recursive-include rocksdb *.pyx

View file

@ -323,7 +323,6 @@ BloomFilterPolicy = PyBloomFilterPolicy
#############################################
## Here comes the stuff for the merge operator
@cython.internal
cdef class PyMergeOperator(object):
@ -1950,6 +1949,83 @@ cdef class DB(object):
st = self.db.Write(opts, batch.batch)
check_status(st)
def iterator(self, start: bytes, column_family: ColumnFamilyHandle = None, iterate_lower_bound: bytes = None,
iterate_upper_bound: bytes = None, reverse: bool = False, include_key: bool = True,
include_value: bool = True, fill_cache: bool = True, prefix_same_as_start: bool = False,
auto_prefix_mode: bool = False):
"""
RocksDB Iterator
Args:
column_family (ColumnFamilyHandle): column family handle
start (bytes): prefix to seek to
iterate_lower_bound (bytes): defines the smallest key at which the backward iterator can return an entry.
Once the bound is passed, Valid() will be false. `iterate_lower_bound` is
inclusive ie the bound value is a valid entry.
If prefix_extractor is not null, the Seek target and `iterate_lower_bound`
need to have the same prefix. This is because ordering is not guaranteed
outside of prefix domain.
iterate_upper_bound: (bytes): defines the extent up to which the forward iterator
can returns entries. Once the bound is reached, Valid() will be false.
"iterate_upper_bound" is exclusive ie the bound value is
not a valid entry. If prefix_extractor is not null:
1. If auto_prefix_mode = true, iterate_upper_bound will be used
to infer whether prefix iterating (e.g. applying prefix bloom filter)
can be used within RocksDB. This is done by comparing
iterate_upper_bound with the seek key.
2. If auto_prefix_mode = false, iterate_upper_bound only takes
effect if it shares the same prefix as the seek key. If
iterate_upper_bound is outside the prefix of the seek key, then keys
returned outside the prefix range will be undefined, just as if
iterate_upper_bound = null.
If iterate_upper_bound is not null, SeekToLast() will position the iterator
at the first key smaller than iterate_upper_bound.
reverse: (bool): run the iteration in reverse - using `reversed` is also supported
include_key (bool): the iterator should include the key in each iteration
include_value (bool): the iterator should include the value in each iteration
fill_cache (bool): Should the "data block"/"index block" read for this iteration be placed in
block cache? Callers may wish to set this field to false for bulk scans.
This would help not to the change eviction order of existing items in the
block cache. Default: true
prefix_same_as_start (bool): Enforce that the iterator only iterates over the same prefix as the seek.
This option is effective only for prefix seeks, i.e. prefix_extractor is
non-null for the column family and total_order_seek is false. Unlike
iterate_upper_bound, prefix_same_as_start only works within a prefix
but in both directions. Default: false
auto_prefix_mode (bool): When true, by default use total_order_seek = true, and RocksDB can
selectively enable prefix seek mode if won't generate a different result
from total_order_seek, based on seek key, and iterator upper bound.
Not supported in ROCKSDB_LITE mode, in the way that even with value true
prefix mode is not used. Default: false
Returns:
BaseIterator: An iterator that yields key/value pairs or keys or values alone depending on the arguments.
The iterator supports being `reversed`
"""
if not include_value:
iterator = self.iterkeys(
column_family=column_family, fill_cache=fill_cache, prefix_same_as_start=prefix_same_as_start,
iterate_lower_bound=iterate_lower_bound, iterate_upper_bound=iterate_upper_bound,
auto_prefix_mode=auto_prefix_mode
)
elif not include_key:
iterator = self.itervalues(
column_family=column_family, fill_cache=fill_cache, prefix_same_as_start=prefix_same_as_start,
iterate_lower_bound=iterate_lower_bound, iterate_upper_bound=iterate_upper_bound,
auto_prefix_mode=auto_prefix_mode
)
else:
iterator = self.iteritems(
column_family=column_family, fill_cache=fill_cache, prefix_same_as_start=prefix_same_as_start,
iterate_lower_bound=iterate_lower_bound, iterate_upper_bound=iterate_upper_bound,
auto_prefix_mode=auto_prefix_mode
)
iterator.seek(start)
if reverse:
iterator = reversed(iterator)
return iterator
def get(self, key, *args, **kwargs):
cdef string res
cdef Status st
@ -2277,18 +2353,30 @@ cdef class DB(object):
@staticmethod
def __parse_read_opts(
iterate_lower_bound=None,
iterate_upper_bound=None,
readahead_size=0,
prefix_same_as_start=False,
verify_checksums=False,
fill_cache=True,
snapshot=None,
read_tier="all"):
read_tier="all",
auto_prefix_mode=False):
# TODO: Is this really effiencet ?
return locals()
cdef options.ReadOptions build_read_opts(self, dict py_opts):
cdef options.ReadOptions opts
cdef Slice iterate_lower_bound
cdef Slice iterate_upper_bound
opts.verify_checksums = py_opts['verify_checksums']
opts.fill_cache = py_opts['fill_cache']
opts.readahead_size = py_opts['readahead_size']
opts.prefix_same_as_start = py_opts['prefix_same_as_start']
opts.auto_prefix_mode = py_opts['auto_prefix_mode']
if py_opts['snapshot'] is not None:
opts.snapshot = (<Snapshot?>(py_opts['snapshot'])).ptr
@ -2298,7 +2386,10 @@ cdef class DB(object):
opts.read_tier = options.kBlockCacheTier
else:
raise ValueError("Invalid read_tier")
if py_opts['iterate_lower_bound'] is not None:
opts.iterate_lower_bound = new Slice(PyBytes_AsString(py_opts['iterate_lower_bound']), PyBytes_Size(py_opts['iterate_lower_bound']))
if py_opts['iterate_upper_bound'] is not None:
opts.iterate_upper_bound = new Slice(PyBytes_AsString(py_opts['iterate_upper_bound']), PyBytes_Size(py_opts['iterate_upper_bound']))
return opts
property options:
@ -2347,6 +2438,9 @@ cdef class DB(object):
if copts:
copts.in_use = False
def write_batch(self, py_bool disable_wal = False, py_bool sync = False) -> RocksDBWriteBatch:
return RocksDBWriteBatch(self, sync=sync, disable_wal=disable_wal)
def repair_db(db_name, Options opts):
cdef Status st
@ -2369,7 +2463,6 @@ def list_column_families(db_name, Options opts):
return column_families
@cython.no_gc_clear
@cython.internal
cdef class Snapshot(object):
@ -2422,6 +2515,7 @@ cdef class BaseIterator(object):
return ret
def __reversed__(self):
self.seek_to_last()
return ReversedIterator(self)
cpdef seek_to_first(self):
@ -2622,3 +2716,23 @@ cdef class BackupEngine(object):
ret.append(t)
return ret
cdef class RocksDBWriteBatch(object):
cdef DB db
cdef py_bool sync
cdef py_bool disable_wal
cdef WriteBatch batch
def __cinit__(self, DB db, sync: bool = False, disable_wal: bool = False):
self.batch = WriteBatch()
self.db = db
self.sync = sync
self.disable_wal = disable_wal
def __enter__(self):
return self.batch
def __exit__(self, exc_type, exc_val, exc_tb):
if not exc_val:
self.db.write(self.batch, sync=self.sync, disable_wal=self.disable_wal)

View file

@ -172,10 +172,15 @@ cdef extern from "rocksdb/options.h" namespace "rocksdb":
cpp_bool disableWAL
cdef cppclass ReadOptions:
const Snapshot* snapshot
const Slice* iterate_lower_bound
const Slice* iterate_upper_bound
size_t readahead_size
cpp_bool verify_checksums
cpp_bool fill_cache
const Snapshot* snapshot
ReadTier read_tier
cpp_bool prefix_same_as_start
cpp_bool auto_prefix_mode
cdef cppclass FlushOptions:
cpp_bool wait

View file

@ -9,9 +9,11 @@ import struct
import tempfile
from rocksdb.merge_operators import UintAddOperator, StringAppendOperator
def int_to_bytes(ob):
return str(ob).encode('ascii')
class TestHelper(unittest.TestCase):
def setUp(self):
@ -69,6 +71,24 @@ class TestDB(TestHelper):
secondary.try_catch_up_with_primary()
self.assertEqual(b"b", secondary.get(b"a"))
secondary2_location = os.path.join(self.db_loc, "secondary2")
secondary2 = rocksdb.DB(
os.path.join(self.db_loc, "test"),
rocksdb.Options(create_if_missing=True, max_open_files=-1),
secondary_name=secondary2_location
)
self.addCleanup(secondary2.close)
self.assertEqual(b"b", secondary2.get(b"a"))
self.db.put(b"a", b"c")
self.assertEqual(b"b", secondary.get(b"a"))
self.assertEqual(b"b", secondary2.get(b"a"))
self.assertEqual(b"c", self.db.get(b"a"))
secondary.try_catch_up_with_primary()
secondary2.try_catch_up_with_primary()
self.assertEqual(b"c", secondary.get(b"a"))
self.assertEqual(b"c", secondary2.get(b"a"))
def test_multi_get(self):
self.db.put(b"a", b"1")
self.db.put(b"b", b"2")
@ -97,6 +117,18 @@ class TestDB(TestHelper):
ret = self.db.multi_get([b'key', b'a'])
self.assertEqual(ref, ret)
def test_write_batch_context(self):
with self.db.write_batch() as batch:
batch.put(b"key", b"v1")
batch.delete(b"key")
batch.put(b"key", b"v2")
batch.put(b"key", b"v3")
batch.put(b"a", b"b")
ref = {b'a': b'b', b'key': b'v3'}
ret = self.db.multi_get([b'key', b'a'])
self.assertEqual(ref, ret)
def test_write_batch_iter(self):
batch = rocksdb.WriteBatch()
self.assertEqual([], list(batch))
@ -120,7 +152,6 @@ class TestDB(TestHelper):
]
self.assertEqual(ref, list(it))
def test_key_may_exists(self):
self.db.put(b"a", b'1')
@ -174,7 +205,6 @@ class TestDB(TestHelper):
it.seek_for_prev(b'c3')
self.assertEqual(it.get(), (b'c2', b'c2_value'))
def test_iter_keys(self):
for x in range(300):
self.db.put(int_to_bytes(x), int_to_bytes(x))
@ -457,6 +487,7 @@ class StaticPrefix(rocksdb.interfaces.SliceTransform):
def in_range(self, dst):
return len(dst) == 5
class TestPrefixExtractor(TestHelper):
def setUp(self):
TestHelper.setUp(self)
@ -687,15 +718,30 @@ class TestDBColumnFamilies(TestHelper):
self.assertEqual({(cfa, b'a'): b'1', (cfa, b'b'): b'2'}, dict(it))
def test_get_property(self):
secondary_location = os.path.join(self.db_loc, "secondary")
cf = {
b'A': rocksdb.ColumnFamilyOptions(),
b'B': rocksdb.ColumnFamilyOptions()
}
secondary = rocksdb.DB(
os.path.join(self.db_loc, "test"),
rocksdb.Options(create_if_missing=True, max_open_files=-1),
secondary_name=secondary_location, column_families=cf
)
self.addCleanup(secondary.close)
for x in range(300):
x = int_to_bytes(x)
self.db.put((self.cf_a, x), x)
self.assertEqual(b"300",
self.db.get_property(b'rocksdb.estimate-num-keys',
self.cf_a))
self.assertIsNone(self.db.get_property(b'does not exsits',
self.cf_a))
self.assertIsNone(self.db.get_property(b'does not exsits', self.cf_a))
self.assertEqual(b"0", secondary.get_property(b'rocksdb.estimate-num-keys', secondary.get_column_family(b'A')))
self.assertEqual(b"300", self.db.get_property(b'rocksdb.estimate-num-keys', self.cf_a))
secondary.try_catch_up_with_primary()
self.assertEqual(b"300", secondary.get_property(b'rocksdb.estimate-num-keys', secondary.get_column_family(b'A')))
self.assertEqual(b"300", self.db.get_property(b'rocksdb.estimate-num-keys', self.cf_a))
def test_compact_range(self):
for x in range(10000):
@ -704,3 +750,173 @@ class TestDBColumnFamilies(TestHelper):
self.db.compact_range(column_family=self.cf_b)
class OneCharacterPrefix(rocksdb.interfaces.SliceTransform):
def name(self):
return b'test prefix'
def transform(self, src):
return (0, 1)
def in_domain(self, src):
return len(src) >= 1
def in_range(self, dst):
return len(dst) == 1
class TestPrefixIterator(TestHelper):
def setUp(self):
TestHelper.setUp(self)
opts = rocksdb.Options(create_if_missing=True)
self.db = rocksdb.DB(os.path.join(self.db_loc, 'test'), opts)
def test_iterator(self):
self.db.put(b'a0', b'a0_value')
self.db.put(b'a1', b'a1_value')
self.db.put(b'a1b', b'a1b_value')
self.db.put(b'a2b', b'a2b_value')
self.db.put(b'a3', b'a3_value')
self.db.put(b'a4', b'a4_value')
self.db.put(b'b0', b'b0_value')
self.assertListEqual(
[(b'a0', b'a0_value'), (b'a1', b'a1_value'), (b'a1b', b'a1b_value'), (b'a2b', b'a2b_value'),
(b'a3', b'a3_value'), (b'a4', b'a4_value')],
list(self.db.iterator(start=b'a', iterate_upper_bound=b'b'))
)
self.assertListEqual(
[b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'],
list(self.db.iterator(start=b'a', iterate_upper_bound=b'b', include_value=False))
)
self.assertListEqual(
[b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'],
list(self.db.iterator(start=b'a0', iterate_upper_bound=b'a5', include_value=False))
)
self.assertListEqual(
[b'a4', b'a3', b'a2b', b'a1b', b'a1', b'a0'],
list(reversed(self.db.iterator(start=b'a0', iterate_upper_bound=b'a5', include_value=False)))
)
self.assertListEqual(
[b'a0', b'a1', b'a1b', b'a2b', b'a3'],
list(self.db.iterator(start=b'a0', iterate_upper_bound=b'a4', include_value=False))
)
self.assertListEqual(
[b'a0', b'a1', b'a1b'],
list(self.db.iterator(start=b'a0', iterate_upper_bound=b'a2', include_value=False))
)
self.assertListEqual(
[b'a0', b'a1', b'a1b'],
list(self.db.iterator(start=b'a0', iterate_upper_bound=b'a2', include_value=False))
)
self.assertListEqual(
[b'a1b', b'a1', b'a0'],
list(reversed(self.db.iterator(start=b'a0', iterate_upper_bound=b'a2', include_value=False)))
)
self.assertListEqual(
[b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'],
list(self.db.iterator(start=b'a', iterate_upper_bound=b'b0', include_value=False))
)
class TestPrefixIteratorWithExtractor(TestHelper):
def setUp(self):
TestHelper.setUp(self)
opts = rocksdb.Options(create_if_missing=True)
opts.prefix_extractor = OneCharacterPrefix()
self.db = rocksdb.DB(os.path.join(self.db_loc, 'test'), opts)
def test_iterator(self):
self.db.put(b'a0', b'a0_value')
self.db.put(b'a1', b'a1_value')
self.db.put(b'a1b', b'a1b_value')
self.db.put(b'a2b', b'a2b_value')
self.db.put(b'a3', b'a3_value')
self.db.put(b'a4', b'a4_value')
self.db.put(b'b0', b'b0_value')
self.assertListEqual(
[(b'a0', b'a0_value'), (b'a1', b'a1_value'), (b'a1b', b'a1b_value'), (b'a2b', b'a2b_value'),
(b'a3', b'a3_value'), (b'a4', b'a4_value')],
list(self.db.iterator(start=b'a', prefix_same_as_start=True))
)
self.assertListEqual(
[b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'],
list(self.db.iterator(start=b'a', include_value=False, prefix_same_as_start=True))
)
self.assertListEqual(
[b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'],
list(self.db.iterator(start=b'a0', iterate_upper_bound=b'a5', include_value=False))
)
self.assertListEqual(
[b'a4', b'a3', b'a2b', b'a1b', b'a1', b'a0'],
list(reversed(self.db.iterator(start=b'a0', iterate_upper_bound=b'a5', include_value=False)))
)
self.assertListEqual(
[b'a0', b'a1', b'a1b', b'a2b', b'a3'],
list(self.db.iterator(start=b'a0', iterate_upper_bound=b'a4', include_value=False))
)
self.assertListEqual(
[b'a0', b'a1', b'a1b'],
list(self.db.iterator(start=b'a0', iterate_upper_bound=b'a2', include_value=False))
)
self.assertListEqual(
[b'a0', b'a1', b'a1b'],
list(self.db.iterator(start=b'a0', iterate_upper_bound=b'a2', include_value=False))
)
self.assertListEqual(
[b'a1b', b'a1', b'a0'],
list(reversed(self.db.iterator(start=b'a0', iterate_upper_bound=b'a2', include_value=False)))
)
self.assertListEqual(
[b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'],
list(self.db.iterator(start=b'a', iterate_upper_bound=b'b0', include_value=False))
)
def test_column_family_iterator(self):
cf_a = self.db.create_column_family(b'first', rocksdb.ColumnFamilyOptions())
cf_b = self.db.create_column_family(b'second', rocksdb.ColumnFamilyOptions())
self.db.put((cf_a, b'a0'), b'a0_value')
self.db.put((cf_a, b'a1'), b'a1_value')
self.db.put((cf_a, b'a1b'), b'a1b_value')
self.db.put((cf_a, b'a2b'), b'a2b_value')
self.db.put((cf_a, b'a3'), b'a3_value')
self.db.put((cf_a, b'a4'), b'a4_value')
self.db.put((cf_b, b'b0'), b'b0_value')
self.assertListEqual(
[(b'a0', b'a0_value'), (b'a1', b'a1_value'), (b'a1b', b'a1b_value'), (b'a2b', b'a2b_value'),
(b'a3', b'a3_value'), (b'a4', b'a4_value')],
list(map(lambda x: (x[0][-1], x[1]), self.db.iterator(column_family=cf_a, start=b'a', prefix_same_as_start=True)))
)
self.assertListEqual(
[b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'],
list(map(lambda x: x[-1], self.db.iterator(column_family=cf_a, start=b'a', include_value=False, prefix_same_as_start=True)))
)
self.assertListEqual(
[b'a0', b'a1', b'a1b', b'a2b', b'a3', b'a4'],
list(map(lambda x: x[-1], self.db.iterator(column_family=cf_a, start=b'a0', iterate_upper_bound=b'a5', include_value=False)))
)
self.assertListEqual(
[b'a4', b'a3', b'a2b', b'a1b', b'a1', b'a0'],
list(map(lambda x: x[-1],
reversed(self.db.iterator(
column_family=cf_a, start=b'a0', iterate_upper_bound=b'a5', include_value=False
))))
)
self.assertListEqual(
[b'a0', b'a1', b'a1b', b'a2b', b'a3'],
list(map(lambda x: x[-1], self.db.iterator(column_family=cf_a, start=b'a0', iterate_upper_bound=b'a4', include_value=False)))
)
self.assertListEqual(
[b'a0', b'a1', b'a1b'],
list(map(lambda x: x[-1], self.db.iterator(column_family=cf_a, start=b'a0', iterate_upper_bound=b'a2', include_value=False)))
)
self.assertListEqual(
[b'a1b', b'a1', b'a0'],
list(map(lambda x: x[-1], reversed(
self.db.iterator(column_family=cf_a, start=b'a0', iterate_upper_bound=b'a2', include_value=False))))
)
self.assertListEqual(
[b'b0'],
list(map(lambda x: x[-1], self.db.iterator(column_family=cf_b, start=b'b', include_value=False)))
)