diff --git a/docs/api/database.rst b/docs/api/database.rst
index 8b3f2ca..2b2f875 100644
--- a/docs/api/database.rst
+++ b/docs/api/database.rst
@@ -364,6 +364,11 @@ WriteBatch
 
         Clear all updates buffered in this batch.
 
+        .. note::
+            Don't call this method if there is an outstanding iterator.
+            Calling :py:meth:`rocksdb.WriteBatch.clear()` with an outstanding
+            iterator leads to a SEGFAULT.
+
     .. py:method:: data()
 
         Retrieve the serialized version of this batch.
@@ -376,6 +381,49 @@ WriteBatch
 
         :rtype: int
 
+    .. py:method:: __iter__()
+
+        Returns an iterator over the current contents of the write batch.
+
+        If you add new items to the batch, they are not visible to this
+        iterator. Create a new one if you need to see them.
+
+        .. note::
+            Calling :py:meth:`rocksdb.WriteBatch.clear()` on the write batch
+            invalidates the iterator. Using an iterator whose corresponding
+            write batch has been cleared leads to a SEGFAULT.
+
+        :rtype: :py:class:`rocksdb.WriteBatchIterator`
+
+WriteBatchIterator
+==================
+
+.. py:class:: rocksdb.WriteBatchIterator
+
+    .. py:method:: __iter__()
+
+        Returns self.
+
+    .. py:method:: __next__()
+
+        Returns the next item inside the corresponding write batch.
+        The return value is always a tuple of size three.
+
+        First item (Name of the operation):
+
+            * ``"Put"``
+            * ``"Merge"``
+            * ``"Delete"``
+
+        Second item (key):
+            Key for this operation.
+
+        Third item (value):
+            The value for this operation. Empty for ``"Delete"``.
+
+    changelog
+    tutoro
+
 Errors
 ======
 
diff --git a/docs/changelog.rst b/docs/changelog.rst
index 25df699..be29e43 100644
--- a/docs/changelog.rst
+++ b/docs/changelog.rst
@@ -57,6 +57,7 @@ New:
 * Make CompactRange available: :py:meth:`rocksdb.DB.compact_range`
 * Add init options to :py:class:`rocksdb.BlockBasedTableFactory`
 * Add more option to :py:class:`rocksdb.PlainTableFactory`
+* Add :py:class:`rocksdb.WriteBatchIterator`
 
 
 Version 0.2
diff --git a/docs/tutorial/index.rst b/docs/tutorial/index.rst
index 0159876..e89988a 100644
--- a/docs/tutorial/index.rst
+++ b/docs/tutorial/index.rst
@@ -346,3 +346,22 @@ Here is an example to switch to *universal style compaction*. ::
 
 See here for more options on *universal style compaction*,
 :py:attr:`rocksdb.Options.compaction_options_universal`
+
+Iterate Over WriteBatch
+=======================
+
+In some cases you need to know what operations happened on a WriteBatch.
+The pyrocksdb WriteBatch supports the iterator protocol, see this example. ::
+
+    batch = rocksdb.WriteBatch()
+    batch.put(b"key1", b"v1")
+    batch.delete(b'a')
+    batch.merge(b'xxx', b'value')
+
+    for op, key, value in batch:
+        print op, key, value
+
+    # prints the following three lines
+    # Put key1 v1
+    # Delete a
+    # Merge xxx value
diff --git a/rocksdb/_rocksdb.pyx b/rocksdb/_rocksdb.pyx
index 6782d86..9d7dedd 100644
--- a/rocksdb/_rocksdb.pyx
+++ b/rocksdb/_rocksdb.pyx
@@ -1214,6 +1214,9 @@ cdef class ValuesIterator
 cdef class ItemsIterator
 cdef class ReversedIterator
 
+# Forward declaration
+cdef class WriteBatchIterator
+
 cdef class WriteBatch(object):
     cdef db.WriteBatch* batch
 
@@ -1246,6 +1249,51 @@ cdef class WriteBatch(object):
     def count(self):
         return self.batch.Count()
 
+    def __iter__(self):
+        return WriteBatchIterator(self)
+
+
+@cython.internal
+cdef class WriteBatchIterator(object):
+    # Keep a reference to the WriteBatch so it outlives this iterator:
+    # the BatchItems only hold pointers into the memory of the WriteBatch.
+    cdef WriteBatch batch
+    cdef vector[db.BatchItem] items
+    cdef size_t pos
+
+    def __init__(self, WriteBatch batch):
+        cdef Status st
+
+        self.batch = batch
+        self.pos = 0
+
+        st = db.get_batch_items(batch.batch, cython.address(self.items))
+        check_status(st)
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        if self.pos == self.items.size():
+            raise StopIteration()
+
+        cdef str op
+
+        if self.items[self.pos].op == db.BatchItemOpPut:
+            op = "Put"
+        elif self.items[self.pos].op == db.BatchItemOpMerge:
+            op = "Merge"
+        elif self.items[self.pos].op == db.BatchItemOpDelete:
+            op = "Delete"
+
+        ret = (
+            op,
+            slice_to_bytes(self.items[self.pos].key),
+            slice_to_bytes(self.items[self.pos].value))
+
+        self.pos += 1
+        return ret
+
 @cython.no_gc_clear
 cdef class DB(object):
     cdef Options opts
diff --git a/rocksdb/cpp/write_batch_iter_helper.hpp b/rocksdb/cpp/write_batch_iter_helper.hpp
new file mode 100644
index 0000000..97c2f1d
--- /dev/null
+++ b/rocksdb/cpp/write_batch_iter_helper.hpp
@@ -0,0 +1,56 @@
+#pragma once
+
+#include <vector>
+#include "rocksdb/write_batch.h"
+
+namespace py_rocks {
+
+class RecordItemsHandler: public rocksdb::WriteBatch::Handler {
+    public:
+        enum Optype {PutRecord, MergeRecord, DeleteRecord};
+
+        class BatchItem {
+            public:
+                BatchItem(
+                    const Optype& op,
+                    const rocksdb::Slice& key,
+                    const rocksdb::Slice& value):
+                        op(op),
+                        key(key),
+                        value(value)
+                {}
+
+            const Optype op;
+            const rocksdb::Slice key;
+            const rocksdb::Slice value;
+        };
+
+        typedef std::vector<BatchItem> BatchItems;
+
+    public:
+        /* `items` is filled during iteration; one BatchItem is appended per record. */
+        RecordItemsHandler(BatchItems* items): items(items) {}
+
+        virtual void Put(const rocksdb::Slice& key, const rocksdb::Slice& value) {
+            this->items->emplace_back(PutRecord, key, value);
+        }
+
+        virtual void Merge(const rocksdb::Slice& key, const rocksdb::Slice& value) {
+            this->items->emplace_back(MergeRecord, key, value);
+        }
+
+        virtual void Delete(const rocksdb::Slice& key) {
+            this->items->emplace_back(DeleteRecord, key, rocksdb::Slice());
+        }
+
+    private:
+        BatchItems* items;
+};
+
+inline rocksdb::Status
+get_batch_items(const rocksdb::WriteBatch* batch, RecordItemsHandler::BatchItems* items) {
+    RecordItemsHandler handler(items);
+    return batch->Iterate(&handler);
+}
+
+}
diff --git a/rocksdb/db.pxd b/rocksdb/db.pxd
index a609d1c..06a7f40 100644
--- a/rocksdb/db.pxd
+++ b/rocksdb/db.pxd
@@ -20,6 +20,20 @@ cdef extern from "rocksdb/write_batch.h" namespace "rocksdb":
         const string& Data() nogil except+
         int Count() nogil except+
 
+cdef extern from "cpp/write_batch_iter_helper.hpp" namespace "py_rocks":
+    cdef enum BatchItemOp "RecordItemsHandler::Optype":
+        BatchItemOpPut "py_rocks::RecordItemsHandler::Optype::PutRecord"
+        BatchItemOpMerge "py_rocks::RecordItemsHandler::Optype::MergeRecord"
+        BatchItemOpDelete "py_rocks::RecordItemsHandler::Optype::DeleteRecord"
+
+    cdef cppclass BatchItem "py_rocks::RecordItemsHandler::BatchItem":
+        BatchItemOp op
+        Slice key
+        Slice value
+
+    Status get_batch_items(WriteBatch* batch, vector[BatchItem]* items) nogil except+
+
+
 cdef extern from "rocksdb/db.h" namespace "rocksdb":
     ctypedef uint64_t SequenceNumber
 
diff --git a/rocksdb/tests/test_db.py b/rocksdb/tests/test_db.py
index ba92907..0a71f4c 100644
--- a/rocksdb/tests/test_db.py
+++ b/rocksdb/tests/test_db.py
@@ -73,6 +73,37 @@ class TestDB(unittest.TestCase, TestHelper):
         ret = self.db.multi_get([b'key', b'a'])
         self.assertEqual(ref, ret)
 
+    def test_write_batch_iter(self):
+        batch = rocksdb.WriteBatch()
+        batch.put(b"key1", b"v1")
+        batch.delete(b'a')
+        batch.merge(b'xxx', b'value')
+        for op, key, value in batch:
+            print(op, key, value)
+
+        batch = rocksdb.WriteBatch()
+        self.assertEqual([], list(batch))
+
+        batch.put(b"key1", b"v1")
+        batch.put(b"key2", b"v2")
+        batch.put(b"key3", b"v3")
+        batch.delete(b'a')
+        batch.delete(b'key1')
+        batch.merge(b'xxx', b'value')
+
+        it = iter(batch)
+        del batch
+        ref = [
+            ('Put', b'key1', b'v1'),
+            ('Put', b'key2', b'v2'),
+            ('Put', b'key3', b'v3'),
+            ('Delete', b'a', b''),
+            ('Delete', b'key1', b''),
+            ('Merge', b'xxx', b'value')
+        ]
+        self.assertEqual(ref, list(it))
+
+
     def test_key_may_exists(self):
         self.db.put(b"a", b'1')
 