diff --git a/rocksdb/_rocksdb.pyx b/rocksdb/_rocksdb.pyx index bfedde2..57740f8 100644 --- a/rocksdb/_rocksdb.pyx +++ b/rocksdb/_rocksdb.pyx @@ -50,6 +50,7 @@ from interfaces import Comparator as IComparator from interfaces import SliceTransform as ISliceTransform import traceback import errors +import weakref ctypedef const filter_policy.FilterPolicy ConstFilterPolicy @@ -292,7 +293,7 @@ cdef class PyBloomFilterPolicy(PyFilterPolicy): self.policy.get().CreateFilter( vector_data(c_keys), - c_keys.size(), + c_keys.size(), cython.address(dst)) return string_to_bytes(dst) @@ -733,26 +734,128 @@ cdef class CompactionPri(object): oldest_smallest_seq_first = u'oldest_smallest_seq_first' min_overlapping_ratio = u'min_overlapping_ratio' -cdef class Options(object): - cdef options.Options* opts +@cython.internal +cdef class _ColumnFamilyHandle: + """ This is an internal class that we will weakref for safety """ + cdef db.ColumnFamilyHandle* handle + cdef object __weakref__ + cdef object weak_handle + + def __cinit__(self): + self.handle = NULL + + def __dealloc__(self): + if not self.handle == NULL: + del self.handle + + @staticmethod + cdef from_handle_ptr(db.ColumnFamilyHandle* handle): + inst = <_ColumnFamilyHandle>_ColumnFamilyHandle.__new__(_ColumnFamilyHandle) + inst.handle = handle + return inst + + @property + def name(self): + return self.handle.GetName() + + @property + def id(self): + return self.handle.GetID() + + @property + def weakref(self): + if self.weak_handle is None: + self.weak_handle = ColumnFamilyHandle.from_wrapper(self) + return self.weak_handle + +cdef class ColumnFamilyHandle: + """ This represents a ColumnFamilyHandle """ + cdef object _ref + cdef readonly bytes name + cdef readonly int id + + def __cinit__(self, weakhandle): + self._ref = weakhandle + self.name = self._ref().name + self.id = self._ref().id + + def __init__(self, *): + raise TypeError("These can not be constructed from Python") + + @staticmethod + cdef object from_wrapper(_ColumnFamilyHandle real_handle): + return ColumnFamilyHandle.__new__(ColumnFamilyHandle, weakref.ref(real_handle)) + + @property + def is_valid(self): + return self._ref() is not None + + def __repr__(self): + valid = "valid" if self.is_valid else "invalid" + return f"" + + cdef db.ColumnFamilyHandle* get_handle(self) except NULL: + cdef _ColumnFamilyHandle real_handle = self._ref() + if real_handle is None: + raise ValueError(f"{self} is no longer a valid ColumnFamilyHandle!") + return real_handle.handle + + def __eq__(self, other): + cdef ColumnFamilyHandle fast_other + if isinstance(other, ColumnFamilyHandle): + fast_other = other + return ( + self.name == fast_other.name + and self.id == fast_other.id + and self._ref == fast_other._ref + ) + return False + + def __lt__(self, other): + cdef ColumnFamilyHandle fast_other + if isinstance(other, ColumnFamilyHandle): + return self.id < other.id + return NotImplemented + + # Since @total_ordering isn't a thing for cython + def __ne__(self, other): + return not self == other + + def __gt__(self, other): + return other < self + + def __le__(self, other): + return not other < self + + def __ge__(self, other): + return not self < other + + def __hash__(self): + # hash of a weakref matches that of its original ref'ed object + # so we use the id of our weakref object here to prevent + # a situation where we are invalid, but match a valid handle's hash + return hash((self.id, self.name, id(self._ref))) + + +cdef class ColumnFamilyOptions(object): + cdef options.ColumnFamilyOptions* copts cdef PyComparator py_comparator cdef PyMergeOperator py_merge_operator cdef PySliceTransform py_prefix_extractor cdef PyTableFactory py_table_factory cdef PyMemtableFactory py_memtable_factory - cdef PyCache py_row_cache # Used to protect sharing of Options with many DB-objects cdef cpp_bool in_use def __cinit__(self): - self.opts = NULL - self.opts = new options.Options() + self.copts = NULL + self.copts = new options.ColumnFamilyOptions() self.in_use = False def __dealloc__(self): - if not self.opts == NULL: - del self.opts + if not self.copts == NULL: + del self.copts def __init__(self, **kwargs): self.py_comparator = BytewiseComparator() @@ -760,6 +863,398 @@ cdef class Options(object): self.py_prefix_extractor = None self.py_table_factory = None self.py_memtable_factory = None + + for key, value in kwargs.items(): + setattr(self, key, value) + + property write_buffer_size: + def __get__(self): + return self.copts.write_buffer_size + def __set__(self, value): + self.copts.write_buffer_size = value + + property max_write_buffer_number: + def __get__(self): + return self.copts.max_write_buffer_number + def __set__(self, value): + self.copts.max_write_buffer_number = value + + property min_write_buffer_number_to_merge: + def __get__(self): + return self.copts.min_write_buffer_number_to_merge + def __set__(self, value): + self.copts.min_write_buffer_number_to_merge = value + + property compression_opts: + def __get__(self): + cdef dict ret_ob = {} + + ret_ob['window_bits'] = self.copts.compression_opts.window_bits + ret_ob['level'] = self.copts.compression_opts.level + ret_ob['strategy'] = self.copts.compression_opts.strategy + ret_ob['max_dict_bytes'] = self.copts.compression_opts.max_dict_bytes + + return ret_ob + + def __set__(self, dict value): + cdef options.CompressionOptions* copts + copts = cython.address(self.copts.compression_opts) + # CompressionOptions(int wbits, int _lev, int _strategy, int _max_dict_bytes) + if 'window_bits' in value: + copts.window_bits = value['window_bits'] + if 'level' in value: + copts.level = value['level'] + if 'strategy' in value: + copts.strategy = value['strategy'] + if 'max_dict_bytes' in value: + copts.max_dict_bytes = value['max_dict_bytes'] + + property compaction_pri: + def __get__(self): + if self.copts.compaction_pri == options.kByCompensatedSize: + return CompactionPri.by_compensated_size + if self.copts.compaction_pri == options.kOldestLargestSeqFirst: + return CompactionPri.oldest_largest_seq_first + if self.copts.compaction_pri == options.kOldestSmallestSeqFirst: + return CompactionPri.oldest_smallest_seq_first + if self.copts.compaction_pri == options.kMinOverlappingRatio: + return CompactionPri.min_overlapping_ratio + def __set__(self, value): + if value == CompactionPri.by_compensated_size: + self.copts.compaction_pri = options.kByCompensatedSize + elif value == CompactionPri.oldest_largest_seq_first: + self.copts.compaction_pri = options.kOldestLargestSeqFirst + elif value == CompactionPri.oldest_smallest_seq_first: + self.copts.compaction_pri = options.kOldestSmallestSeqFirst + elif value == CompactionPri.min_overlapping_ratio: + self.copts.compaction_pri = options.kMinOverlappingRatio + else: + raise TypeError("Unknown compaction pri: %s" % value) + + property compression: + def __get__(self): + if self.copts.compression == options.kNoCompression: + return CompressionType.no_compression + elif self.copts.compression == options.kSnappyCompression: + return CompressionType.snappy_compression + elif self.copts.compression == options.kZlibCompression: + return CompressionType.zlib_compression + elif self.copts.compression == options.kBZip2Compression: + return CompressionType.bzip2_compression + elif self.copts.compression == options.kLZ4Compression: + return CompressionType.lz4_compression + elif self.copts.compression == options.kLZ4HCCompression: + return CompressionType.lz4hc_compression + elif self.copts.compression == options.kXpressCompression: + return CompressionType.xpress_compression + elif self.copts.compression == options.kZSTD: + return CompressionType.zstd_compression + elif self.copts.compression == options.kZSTDNotFinalCompression: + return CompressionType.zstdnotfinal_compression + elif self.copts.compression == options.kDisableCompressionOption: + return CompressionType.disable_compression + else: + raise Exception("Unknonw type: %s" % self.opts.compression) + + def __set__(self, value): + if value == CompressionType.no_compression: + self.copts.compression = options.kNoCompression + elif value == CompressionType.snappy_compression: + self.copts.compression = options.kSnappyCompression + elif value == CompressionType.zlib_compression: + self.copts.compression = options.kZlibCompression + elif value == CompressionType.bzip2_compression: + self.copts.compression = options.kBZip2Compression + elif value == CompressionType.lz4_compression: + self.copts.compression = options.kLZ4Compression + elif value == CompressionType.lz4hc_compression: + self.copts.compression = options.kLZ4HCCompression + elif value == CompressionType.zstd_compression: + self.copts.compression = options.kZSTD + elif value == CompressionType.zstdnotfinal_compression: + self.copts.compression = options.kZSTDNotFinalCompression + elif value == CompressionType.disable_compression: + self.copts.compression = options.kDisableCompressionOption + else: + raise TypeError("Unknown compression: %s" % value) + + property max_compaction_bytes: + def __get__(self): + return self.copts.max_compaction_bytes + def __set__(self, value): + self.copts.max_compaction_bytes = value + + property num_levels: + def __get__(self): + return self.copts.num_levels + def __set__(self, value): + self.copts.num_levels = value + + property level0_file_num_compaction_trigger: + def __get__(self): + return self.copts.level0_file_num_compaction_trigger + def __set__(self, value): + self.copts.level0_file_num_compaction_trigger = value + + property level0_slowdown_writes_trigger: + def __get__(self): + return self.copts.level0_slowdown_writes_trigger + def __set__(self, value): + self.copts.level0_slowdown_writes_trigger = value + + property level0_stop_writes_trigger: + def __get__(self): + return self.copts.level0_stop_writes_trigger + def __set__(self, value): + self.copts.level0_stop_writes_trigger = value + + property max_mem_compaction_level: + def __get__(self): + return self.copts.max_mem_compaction_level + def __set__(self, value): + self.copts.max_mem_compaction_level = value + + property target_file_size_base: + def __get__(self): + return self.copts.target_file_size_base + def __set__(self, value): + self.copts.target_file_size_base = value + + property target_file_size_multiplier: + def __get__(self): + return self.copts.target_file_size_multiplier + def __set__(self, value): + self.copts.target_file_size_multiplier = value + + property max_bytes_for_level_base: + def __get__(self): + return self.copts.max_bytes_for_level_base + def __set__(self, value): + self.copts.max_bytes_for_level_base = value + + property max_bytes_for_level_multiplier: + def __get__(self): + return self.copts.max_bytes_for_level_multiplier + def __set__(self, value): + self.copts.max_bytes_for_level_multiplier = value + + property max_bytes_for_level_multiplier_additional: + def __get__(self): + return self.copts.max_bytes_for_level_multiplier_additional + def __set__(self, value): + self.copts.max_bytes_for_level_multiplier_additional = value + + property soft_rate_limit: + def __get__(self): + return self.copts.soft_rate_limit + def __set__(self, value): + self.copts.soft_rate_limit = value + + property hard_rate_limit: + def __get__(self): + return self.copts.hard_rate_limit + def __set__(self, value): + self.copts.hard_rate_limit = value + + property rate_limit_delay_max_milliseconds: + def __get__(self): + return self.copts.rate_limit_delay_max_milliseconds + def __set__(self, value): + self.copts.rate_limit_delay_max_milliseconds = value + + property arena_block_size: + def __get__(self): + return self.copts.arena_block_size + def __set__(self, value): + self.copts.arena_block_size = value + + property disable_auto_compactions: + def __get__(self): + return self.copts.disable_auto_compactions + def __set__(self, value): + self.copts.disable_auto_compactions = value + + property purge_redundant_kvs_while_flush: + def __get__(self): + return self.copts.purge_redundant_kvs_while_flush + def __set__(self, value): + self.copts.purge_redundant_kvs_while_flush = value + + # FIXME: remove to util/options_helper.h + # property allow_os_buffer: + # def __get__(self): + # return self.copts.allow_os_buffer + # def __set__(self, value): + # self.copts.allow_os_buffer = value + + property compaction_style: + def __get__(self): + if self.copts.compaction_style == kCompactionStyleLevel: + return 'level' + if self.copts.compaction_style == kCompactionStyleUniversal: + return 'universal' + if self.copts.compaction_style == kCompactionStyleFIFO: + return 'fifo' + if self.copts.compaction_style == kCompactionStyleNone: + return 'none' + raise Exception("Unknown compaction_style") + + def __set__(self, str value): + if value == 'level': + self.copts.compaction_style = kCompactionStyleLevel + elif value == 'universal': + self.copts.compaction_style = kCompactionStyleUniversal + elif value == 'fifo': + self.copts.compaction_style = kCompactionStyleFIFO + elif value == 'none': + self.copts.compaction_style = kCompactionStyleNone + else: + raise Exception("Unknown compaction style") + + property compaction_options_universal: + def __get__(self): + cdef universal_compaction.CompactionOptionsUniversal uopts + cdef dict ret_ob = {} + + uopts = self.copts.compaction_options_universal + + ret_ob['size_ratio'] = uopts.size_ratio + ret_ob['min_merge_width'] = uopts.min_merge_width + ret_ob['max_merge_width'] = uopts.max_merge_width + ret_ob['max_size_amplification_percent'] = uopts.max_size_amplification_percent + ret_ob['compression_size_percent'] = uopts.compression_size_percent + + if uopts.stop_style == kCompactionStopStyleSimilarSize: + ret_ob['stop_style'] = 'similar_size' + elif uopts.stop_style == kCompactionStopStyleTotalSize: + ret_ob['stop_style'] = 'total_size' + else: + raise Exception("Unknown compaction style") + + return ret_ob + + def __set__(self, dict value): + cdef universal_compaction.CompactionOptionsUniversal* uopts + uopts = cython.address(self.copts.compaction_options_universal) + + if 'size_ratio' in value: + uopts.size_ratio = value['size_ratio'] + + if 'min_merge_width' in value: + uopts.min_merge_width = value['min_merge_width'] + + if 'max_merge_width' in value: + uopts.max_merge_width = value['max_merge_width'] + + if 'max_size_amplification_percent' in value: + uopts.max_size_amplification_percent = value['max_size_amplification_percent'] + + if 'compression_size_percent' in value: + uopts.compression_size_percent = value['compression_size_percent'] + + if 'stop_style' in value: + if value['stop_style'] == 'similar_size': + uopts.stop_style = kCompactionStopStyleSimilarSize + elif value['stop_style'] == 'total_size': + uopts.stop_style = kCompactionStopStyleTotalSize + else: + raise Exception("Unknown compaction style") + + # Deprecate + # property filter_deletes: + # def __get__(self): + # return self.copts.filter_deletes + # def __set__(self, value): + # self.copts.filter_deletes = value + + property max_sequential_skip_in_iterations: + def __get__(self): + return self.copts.max_sequential_skip_in_iterations + def __set__(self, value): + self.copts.max_sequential_skip_in_iterations = value + + property inplace_update_support: + def __get__(self): + return self.copts.inplace_update_support + def __set__(self, value): + self.copts.inplace_update_support = value + + property table_factory: + def __get__(self): + return self.py_table_factory + + def __set__(self, PyTableFactory value): + self.py_table_factory = value + self.copts.table_factory = value.get_table_factory() + + property memtable_factory: + def __get__(self): + return self.py_memtable_factory + + def __set__(self, PyMemtableFactory value): + self.py_memtable_factory = value + self.copts.memtable_factory = value.get_memtable_factory() + + property inplace_update_num_locks: + def __get__(self): + return self.copts.inplace_update_num_locks + def __set__(self, value): + self.copts.inplace_update_num_locks = value + + property comparator: + def __get__(self): + return self.py_comparator.get_ob() + + def __set__(self, value): + if isinstance(value, PyComparator): + if (value).get_comparator() == NULL: + raise Exception("Cannot set %s as comparator" % value) + else: + self.py_comparator = value + else: + self.py_comparator = PyGenericComparator(value) + + self.copts.comparator = self.py_comparator.get_comparator() + + property merge_operator: + def __get__(self): + if self.py_merge_operator is None: + return None + return self.py_merge_operator.get_ob() + + def __set__(self, value): + self.py_merge_operator = PyMergeOperator(value) + self.copts.merge_operator = self.py_merge_operator.get_operator() + + property prefix_extractor: + def __get__(self): + if self.py_prefix_extractor is None: + return None + return self.py_prefix_extractor.get_ob() + + def __set__(self, value): + self.py_prefix_extractor = PySliceTransform(value) + self.copts.prefix_extractor = self.py_prefix_extractor.get_transformer() + + +cdef class Options(ColumnFamilyOptions): + cdef options.Options* opts + cdef PyCache py_row_cache + + def __cinit__(self): + # Destroy the existing ColumnFamilyOptions() + del self.copts + self.opts = NULL + self.copts = self.opts = new options.Options() + self.in_use = False + + def __dealloc__(self): + if not self.opts == NULL: + self.copts = NULL + del self.opts + + def __init__(self, **kwargs): + ColumnFamilyOptions.__init__(self) self.py_row_cache = None for key, value in kwargs.items(): @@ -783,190 +1278,12 @@ cdef class Options(object): def __set__(self, value): self.opts.paranoid_checks = value - property write_buffer_size: - def __get__(self): - return self.opts.write_buffer_size - def __set__(self, value): - self.opts.write_buffer_size = value - - property max_write_buffer_number: - def __get__(self): - return self.opts.max_write_buffer_number - def __set__(self, value): - self.opts.max_write_buffer_number = value - - property max_compaction_bytes: - def __get__(self): - return self.opts.max_compaction_bytes - def __set__(self, value): - self.opts.max_compaction_bytes = value - - property min_write_buffer_number_to_merge: - def __get__(self): - return self.opts.min_write_buffer_number_to_merge - def __set__(self, value): - self.opts.min_write_buffer_number_to_merge = value - property max_open_files: def __get__(self): return self.opts.max_open_files def __set__(self, value): self.opts.max_open_files = value - property compression_opts: - def __get__(self): - cdef dict ret_ob = {} - - ret_ob['window_bits'] = self.opts.compression_opts.window_bits - ret_ob['level'] = self.opts.compression_opts.level - ret_ob['strategy'] = self.opts.compression_opts.strategy - ret_ob['max_dict_bytes'] = self.opts.compression_opts.max_dict_bytes - - return ret_ob - - def __set__(self, dict value): - cdef options.CompressionOptions* copts - copts = cython.address(self.opts.compression_opts) - # CompressionOptions(int wbits, int _lev, int _strategy, int _max_dict_bytes) - if 'window_bits' in value: - copts.window_bits = value['window_bits'] - if 'level' in value: - copts.level = value['level'] - if 'strategy' in value: - copts.strategy = value['strategy'] - if 'max_dict_bytes' in value: - copts.max_dict_bytes = value['max_dict_bytes'] - - property compaction_pri: - def __get__(self): - if self.opts.compaction_pri == options.kByCompensatedSize: - return CompactionPri.by_compensated_size - if self.opts.compaction_pri == options.kOldestLargestSeqFirst: - return CompactionPri.oldest_largest_seq_first - if self.opts.compaction_pri == options.kOldestSmallestSeqFirst: - return CompactionPri.oldest_smallest_seq_first - if self.opts.compaction_pri == options.kMinOverlappingRatio: - return CompactionPri.min_overlapping_ratio - def __set__(self, value): - if value == CompactionPri.by_compensated_size: - self.opts.compaction_pri = options.kByCompensatedSize - elif value == CompactionPri.oldest_largest_seq_first: - self.opts.compaction_pri = options.kOldestLargestSeqFirst - elif value == CompactionPri.oldest_smallest_seq_first: - self.opts.compaction_pri = options.kOldestSmallestSeqFirst - elif value == CompactionPri.min_overlapping_ratio: - self.opts.compaction_pri = options.kMinOverlappingRatio - else: - raise TypeError("Unknown compaction pri: %s" % value) - - - property compression: - def __get__(self): - if self.opts.compression == options.kNoCompression: - return CompressionType.no_compression - elif self.opts.compression == options.kSnappyCompression: - return CompressionType.snappy_compression - elif self.opts.compression == options.kZlibCompression: - return CompressionType.zlib_compression - elif self.opts.compression == options.kBZip2Compression: - return CompressionType.bzip2_compression - elif self.opts.compression == options.kLZ4Compression: - return CompressionType.lz4_compression - elif self.opts.compression == options.kLZ4HCCompression: - return CompressionType.lz4hc_compression - elif self.opts.compression == options.kXpressCompression: - return CompressionType.xpress_compression - elif self.opts.compression == options.kZSTD: - return CompressionType.zstd_compression - elif self.opts.compression == options.kZSTDNotFinalCompression: - return CompressionType.zstdnotfinal_compression - elif self.opts.compression == options.kDisableCompressionOption: - return CompressionType.disable_compression - else: - raise Exception("Unknonw type: %s" % self.opts.compression) - - def __set__(self, value): - if value == CompressionType.no_compression: - self.opts.compression = options.kNoCompression - elif value == CompressionType.snappy_compression: - self.opts.compression = options.kSnappyCompression - elif value == CompressionType.zlib_compression: - self.opts.compression = options.kZlibCompression - elif value == CompressionType.bzip2_compression: - self.opts.compression = options.kBZip2Compression - elif value == CompressionType.lz4_compression: - self.opts.compression = options.kLZ4Compression - elif value == CompressionType.lz4hc_compression: - self.opts.compression = options.kLZ4HCCompression - elif value == CompressionType.zstd_compression: - self.opts.compression = options.kZSTD - elif value == CompressionType.zstdnotfinal_compression: - self.opts.compression = options.kZSTDNotFinalCompression - elif value == CompressionType.disable_compression: - self.opts.compression = options.kDisableCompressionOption - else: - raise TypeError("Unknown compression: %s" % value) - - property num_levels: - def __get__(self): - return self.opts.num_levels - def __set__(self, value): - self.opts.num_levels = value - - property level0_file_num_compaction_trigger: - def __get__(self): - return self.opts.level0_file_num_compaction_trigger - def __set__(self, value): - self.opts.level0_file_num_compaction_trigger = value - - property level0_slowdown_writes_trigger: - def __get__(self): - return self.opts.level0_slowdown_writes_trigger - def __set__(self, value): - self.opts.level0_slowdown_writes_trigger = value - - property level0_stop_writes_trigger: - def __get__(self): - return self.opts.level0_stop_writes_trigger - def __set__(self, value): - self.opts.level0_stop_writes_trigger = value - - property max_mem_compaction_level: - def __get__(self): - return self.opts.max_mem_compaction_level - def __set__(self, value): - self.opts.max_mem_compaction_level = value - - property target_file_size_base: - def __get__(self): - return self.opts.target_file_size_base - def __set__(self, value): - self.opts.target_file_size_base = value - - property target_file_size_multiplier: - def __get__(self): - return self.opts.target_file_size_multiplier - def __set__(self, value): - self.opts.target_file_size_multiplier = value - - property max_bytes_for_level_base: - def __get__(self): - return self.opts.max_bytes_for_level_base - def __set__(self, value): - self.opts.max_bytes_for_level_base = value - - property max_bytes_for_level_multiplier: - def __get__(self): - return self.opts.max_bytes_for_level_multiplier - def __set__(self, value): - self.opts.max_bytes_for_level_multiplier = value - - property max_bytes_for_level_multiplier_additional: - def __get__(self): - return self.opts.max_bytes_for_level_multiplier_additional - def __set__(self, value): - self.opts.max_bytes_for_level_multiplier_additional = value - property use_fsync: def __get__(self): return self.opts.use_fsync @@ -1021,24 +1338,6 @@ cdef class Options(object): def __set__(self, value): self.opts.keep_log_file_num = value - property soft_rate_limit: - def __get__(self): - return self.opts.soft_rate_limit - def __set__(self, value): - self.opts.soft_rate_limit = value - - property hard_rate_limit: - def __get__(self): - return self.opts.hard_rate_limit - def __set__(self, value): - self.opts.hard_rate_limit = value - - property rate_limit_delay_max_milliseconds: - def __get__(self): - return self.opts.rate_limit_delay_max_milliseconds - def __set__(self, value): - self.opts.rate_limit_delay_max_milliseconds = value - property max_manifest_file_size: def __get__(self): return self.opts.max_manifest_file_size @@ -1051,18 +1350,6 @@ cdef class Options(object): def __set__(self, value): self.opts.table_cache_numshardbits = value - property arena_block_size: - def __get__(self): - return self.opts.arena_block_size - def __set__(self, value): - self.opts.arena_block_size = value - - property disable_auto_compactions: - def __get__(self): - return self.opts.disable_auto_compactions - def __set__(self, value): - self.opts.disable_auto_compactions = value - property wal_ttl_seconds: def __get__(self): return self.opts.WAL_ttl_seconds @@ -1081,19 +1368,6 @@ cdef class Options(object): def __set__(self, value): self.opts.manifest_preallocation_size = value - property purge_redundant_kvs_while_flush: - def __get__(self): - return self.opts.purge_redundant_kvs_while_flush - def __set__(self, value): - self.opts.purge_redundant_kvs_while_flush = value - - # FIXME: remove to util/options_helper.h - # property allow_os_buffer: - # def __get__(self): - # return self.opts.allow_os_buffer - # def __set__(self, value): - # self.opts.allow_os_buffer = value - property enable_write_thread_adaptive_yield: def __get__(self): return self.opts.enable_write_thread_adaptive_yield @@ -1142,6 +1416,13 @@ cdef class Options(object): def __set__(self, value): self.opts.advise_random_on_open = value + # TODO: need to remove -Wconversion to make this work + # property access_hint_on_compaction_start: + # def __get__(self): + # return self.opts.access_hint_on_compaction_start + # def __set__(self, AccessHint value): + # self.opts.access_hint_on_compaction_start = value + property use_adaptive_mutex: def __get__(self): return self.opts.use_adaptive_mutex @@ -1154,155 +1435,6 @@ cdef class Options(object): def __set__(self, value): self.opts.bytes_per_sync = value - property compaction_style: - def __get__(self): - if self.opts.compaction_style == kCompactionStyleLevel: - return 'level' - if self.opts.compaction_style == kCompactionStyleUniversal: - return 'universal' - if self.opts.compaction_style == kCompactionStyleFIFO: - return 'fifo' - if self.opts.compaction_style == kCompactionStyleNone: - return 'none' - raise Exception("Unknown compaction_style") - - def __set__(self, str value): - if value == 'level': - self.opts.compaction_style = kCompactionStyleLevel - elif value == 'universal': - self.opts.compaction_style = kCompactionStyleUniversal - elif value == 'fifo': - self.opts.compaction_style = kCompactionStyleFIFO - elif value == 'none': - self.opts.compaction_style = kCompactionStyleNone - else: - raise Exception("Unknown compaction style") - - property compaction_options_universal: - def __get__(self): - cdef universal_compaction.CompactionOptionsUniversal uopts - cdef dict ret_ob = {} - - uopts = self.opts.compaction_options_universal - - ret_ob['size_ratio'] = uopts.size_ratio - ret_ob['min_merge_width'] = uopts.min_merge_width - ret_ob['max_merge_width'] = uopts.max_merge_width - ret_ob['max_size_amplification_percent'] = uopts.max_size_amplification_percent - ret_ob['compression_size_percent'] = uopts.compression_size_percent - - if uopts.stop_style == kCompactionStopStyleSimilarSize: - ret_ob['stop_style'] = 'similar_size' - elif uopts.stop_style == kCompactionStopStyleTotalSize: - ret_ob['stop_style'] = 'total_size' - else: - raise Exception("Unknown compaction style") - - return ret_ob - - def __set__(self, dict value): - cdef universal_compaction.CompactionOptionsUniversal* uopts - uopts = cython.address(self.opts.compaction_options_universal) - - if 'size_ratio' in value: - uopts.size_ratio = value['size_ratio'] - - if 'min_merge_width' in value: - uopts.min_merge_width = value['min_merge_width'] - - if 'max_merge_width' in value: - uopts.max_merge_width = value['max_merge_width'] - - if 'max_size_amplification_percent' in value: - uopts.max_size_amplification_percent = value['max_size_amplification_percent'] - - if 'compression_size_percent' in value: - uopts.compression_size_percent = value['compression_size_percent'] - - if 'stop_style' in value: - if value['stop_style'] == 'similar_size': - uopts.stop_style = kCompactionStopStyleSimilarSize - elif value['stop_style'] == 'total_size': - uopts.stop_style = kCompactionStopStyleTotalSize - else: - raise Exception("Unknown compaction style") - - # Deprecate - # property filter_deletes: - # def __get__(self): - # return self.opts.filter_deletes - # def __set__(self, value): - # self.opts.filter_deletes = value - - property max_sequential_skip_in_iterations: - def __get__(self): - return self.opts.max_sequential_skip_in_iterations - def __set__(self, value): - self.opts.max_sequential_skip_in_iterations = value - - property inplace_update_support: - def __get__(self): - return self.opts.inplace_update_support - def __set__(self, value): - self.opts.inplace_update_support = value - - property table_factory: - def __get__(self): - return self.py_table_factory - - def __set__(self, PyTableFactory value): - self.py_table_factory = value - self.opts.table_factory = value.get_table_factory() - - property memtable_factory: - def __get__(self): - return self.py_memtable_factory - - def __set__(self, PyMemtableFactory value): - self.py_memtable_factory = value - self.opts.memtable_factory = value.get_memtable_factory() - - property inplace_update_num_locks: - def __get__(self): - return self.opts.inplace_update_num_locks - def __set__(self, value): - self.opts.inplace_update_num_locks = value - - property comparator: - def __get__(self): - return self.py_comparator.get_ob() - - def __set__(self, value): - if isinstance(value, PyComparator): - if (value).get_comparator() == NULL: - raise Exception("Cannot set %s as comparator" % value) - else: - self.py_comparator = value - else: - self.py_comparator = PyGenericComparator(value) - - self.opts.comparator = self.py_comparator.get_comparator() - - property merge_operator: - def __get__(self): - if self.py_merge_operator is None: - return None - return self.py_merge_operator.get_ob() - - def __set__(self, value): - self.py_merge_operator = PyMergeOperator(value) - self.opts.merge_operator = self.py_merge_operator.get_operator() - - property prefix_extractor: - def __get__(self): - if self.py_prefix_extractor is None: - return None - return self.py_prefix_extractor.get_ob() - - def __set__(self, value): - self.py_prefix_extractor = PySliceTransform(value) - self.opts.prefix_extractor = self.py_prefix_extractor.get_transformer() - property row_cache: def __get__(self): return self.py_row_cache @@ -1344,13 +1476,28 @@ cdef class WriteBatch(object): del self.batch def put(self, key, value): - self.batch.Put(bytes_to_slice(key), bytes_to_slice(value)) + cdef db.ColumnFamilyHandle* cf_handle = NULL + if isinstance(key, tuple): + column_family, key = key + cf_handle = (column_family).get_handle() + # nullptr is default family + self.batch.Put(cf_handle, bytes_to_slice(key), bytes_to_slice(value)) def merge(self, key, value): - self.batch.Merge(bytes_to_slice(key), bytes_to_slice(value)) + cdef db.ColumnFamilyHandle* cf_handle = NULL + if isinstance(key, tuple): + column_family, key = key + cf_handle = (column_family).get_handle() + # nullptr is default family + self.batch.Merge(cf_handle, bytes_to_slice(key), bytes_to_slice(value)) def delete(self, key): - self.batch.Delete(bytes_to_slice(key)) + cdef db.ColumnFamilyHandle* cf_handle = NULL + if isinstance(key, tuple): + column_family, key = key + cf_handle = (column_family).get_handle() + # nullptr is default family + self.batch.Delete(cf_handle, bytes_to_slice(key)) def clear(self): self.batch.Clear() @@ -1398,11 +1545,21 @@ cdef class WriteBatchIterator(object): elif self.items[self.pos].op == db.BatchItemOpDelte: op = "Delete" - ret = ( - op, - slice_to_bytes(self.items[self.pos].key), - slice_to_bytes(self.items[self.pos].value)) - + if self.items[self.pos].column_family_id != 0: # Column Family is set + ret = ( + op, + ( + self.items[self.pos].column_family_id, + slice_to_bytes(self.items[self.pos].key) + ), + slice_to_bytes(self.items[self.pos].value) + ) + else: + ret = ( + op, + slice_to_bytes(self.items[self.pos].key), + slice_to_bytes(self.items[self.pos].value) + ) self.pos += 1 return ret @@ -1410,34 +1567,83 @@ cdef class WriteBatchIterator(object): cdef class DB(object): cdef Options opts cdef db.DB* db + cdef list cf_handles + cdef list cf_options - def __cinit__(self, db_name, Options opts, read_only=False): + def __cinit__(self, db_name, Options opts, dict column_families=None, read_only=False): cdef Status st cdef string db_path + cdef vector[db.ColumnFamilyDescriptor] column_family_descriptors + cdef vector[db.ColumnFamilyHandle*] column_family_handles + cdef bytes default_cf_name = db.kDefaultColumnFamilyName self.db = NULL self.opts = None + self.cf_handles = [] + self.cf_options = [] if opts.in_use: raise Exception("Options object is already used by another DB") db_path = path_to_string(db_name) + if not column_families or default_cf_name not in column_families: + # Always add the default column family + column_family_descriptors.push_back( + db.ColumnFamilyDescriptor( + db.kDefaultColumnFamilyName, + options.ColumnFamilyOptions(deref(opts.opts)) + ) + ) + self.cf_options.append(None) # Since they are the same as db + if column_families: + for cf_name, cf_options in column_families.items(): + if not isinstance(cf_name, bytes): + raise TypeError( + f"column family name {cf_name!r} is not of type {bytes}!" + ) + if not isinstance(cf_options, ColumnFamilyOptions): + raise TypeError( + f"column family options {cf_options!r} is not of type " + f"{ColumnFamilyOptions}!" + ) + if (cf_options).in_use: + raise Exception( + f"ColumnFamilyOptions object for {cf_name} is already " + "used by another Column Family" + ) + (cf_options).in_use = True + column_family_descriptors.push_back( + db.ColumnFamilyDescriptor( + cf_name, + deref((cf_options).copts) + ) + ) + self.cf_options.append(cf_options) if read_only: with nogil: - st = db.DB_OpenForReadOnly( + st = db.DB_OpenForReadOnly_ColumnFamilies( deref(opts.opts), db_path, - cython.address(self.db), + column_family_descriptors, + &column_family_handles, + &self.db, False) else: with nogil: - st = db.DB_Open( + st = db.DB_Open_ColumnFamilies( deref(opts.opts), db_path, - cython.address(self.db)) + column_family_descriptors, + &column_family_handles, + &self.db) check_status(st) + for handle in column_family_handles: + wrapper = _ColumnFamilyHandle.from_handle_ptr(handle) + self.cf_handles.append(wrapper) + # Inject the loggers into the python callbacks - cdef shared_ptr[logger.Logger] info_log = self.db.GetOptions().info_log + cdef shared_ptr[logger.Logger] info_log = self.db.GetOptions( + self.db.DefaultColumnFamily()).info_log if opts.py_comparator is not None: opts.py_comparator.set_info_log(info_log) @@ -1447,28 +1653,70 @@ cdef class DB(object): if opts.prefix_extractor is not None: opts.py_prefix_extractor.set_info_log(info_log) + cdef ColumnFamilyOptions copts + for idx, copts in enumerate(self.cf_options): + if not copts: + continue + + info_log = self.db.GetOptions(column_family_handles[idx]).info_log + + if copts.py_comparator is not None: + copts.py_comparator.set_info_log(info_log) + + if copts.py_table_factory is not None: + copts.py_table_factory.set_info_log(info_log) + + if copts.prefix_extractor is not None: + copts.py_prefix_extractor.set_info_log(info_log) + self.opts = opts self.opts.in_use = True def __dealloc__(self): + cdef ColumnFamilyOptions copts if not self.db == NULL: + # We have to make sure we delete the handles so rocksdb doesn't + # assert when we delete the db + self.cf_handles.clear() + for copts in self.cf_options: + if copts: + copts.in_use = False + self.cf_options.clear() + with nogil: del self.db if self.opts is not None: self.opts.in_use = False + @property + def column_families(self): + return [handle.weakref for handle in self.cf_handles] + + def get_column_family(self, bytes name): + for handle in self.cf_handles: + if handle.name == name: + return handle.weakref + def put(self, key, value, sync=False, disable_wal=False): cdef Status st cdef options.WriteOptions opts opts.sync = sync opts.disableWAL = disable_wal + if isinstance(key, tuple): + column_family, key = key + else: + column_family = None + cdef Slice c_key = bytes_to_slice(key) cdef Slice c_value = bytes_to_slice(value) + cdef db.ColumnFamilyHandle* cf_handle = self.db.DefaultColumnFamily() + if column_family: + cf_handle = (column_family).get_handle() with nogil: - st = self.db.Put(opts, c_key, c_value) + st = self.db.Put(opts, cf_handle, c_key, c_value) check_status(st) def delete(self, key, sync=False, disable_wal=False): @@ -1477,9 +1725,18 @@ cdef class DB(object): opts.sync = sync opts.disableWAL = disable_wal + if isinstance(key, tuple): + column_family, key = key + else: + column_family = None + cdef Slice c_key = bytes_to_slice(key) + cdef db.ColumnFamilyHandle* cf_handle = self.db.DefaultColumnFamily() + if column_family: + cf_handle = (column_family).get_handle() + with nogil: - st = self.db.Delete(opts, c_key) + st = self.db.Delete(opts, cf_handle, c_key) check_status(st) def merge(self, key, value, sync=False, disable_wal=False): @@ -1488,10 +1745,19 @@ cdef class DB(object): opts.sync = sync opts.disableWAL = disable_wal + if isinstance(key, tuple): + column_family, key = key + else: + column_family = None + cdef Slice c_key = bytes_to_slice(key) cdef Slice c_value = bytes_to_slice(value) + cdef db.ColumnFamilyHandle* cf_handle = self.db.DefaultColumnFamily() + if column_family: + cf_handle = (column_family).get_handle() + with nogil: - st = self.db.Merge(opts, c_key, c_value) + st = self.db.Merge(opts, cf_handle, c_key, c_value) check_status(st) def write(self, WriteBatch batch, sync=False, disable_wal=False): @@ -1510,10 +1776,19 @@ cdef class DB(object): cdef options.ReadOptions opts opts = self.build_read_opts(self.__parse_read_opts(*args, **kwargs)) + + if isinstance(key, tuple): + column_family, key = key + else: + column_family = None + cdef Slice c_key = bytes_to_slice(key) + cdef db.ColumnFamilyHandle* cf_handle = self.db.DefaultColumnFamily() + if column_family: + cf_handle = (column_family).get_handle() with nogil: - st = self.db.Get(opts, c_key, cython.address(res)) + st = self.db.Get(opts, cf_handle, c_key, cython.address(res)) if st.ok(): return string_to_bytes(res) @@ -1526,9 +1801,17 @@ cdef class DB(object): cdef vector[string] values values.resize(len(keys)) + cdef db.ColumnFamilyHandle* cf_handle + cdef vector[db.ColumnFamilyHandle*] cf_handles cdef vector[Slice] c_keys for key in keys: + if isinstance(key, tuple): + py_handle, key = key + cf_handle = (py_handle).get_handle() + else: + cf_handle = self.db.DefaultColumnFamily() c_keys.push_back(bytes_to_slice(key)) + cf_handles.push_back(cf_handle) cdef options.ReadOptions opts opts = self.build_read_opts(self.__parse_read_opts(*args, **kwargs)) @@ -1537,6 +1820,7 @@ cdef class DB(object): with nogil: res = self.db.MultiGet( opts, + cf_handles, c_keys, cython.address(values)) @@ -1557,7 +1841,12 @@ cdef class DB(object): cdef cpp_bool exists cdef options.ReadOptions opts cdef Slice c_key + cdef db.ColumnFamilyHandle* cf_handle = self.db.DefaultColumnFamily() + opts = self.build_read_opts(self.__parse_read_opts(*args, **kwargs)) + if isinstance(key, tuple): + column_family, key = key + cf_handle = (column_family).get_handle() c_key = bytes_to_slice(key) exists = False @@ -1567,6 +1856,7 @@ cdef class DB(object): with nogil: exists = self.db.KeyMayExist( opts, + cf_handle, c_key, cython.address(value), cython.address(value_found)) @@ -1582,56 +1872,143 @@ cdef class DB(object): with nogil: exists = self.db.KeyMayExist( opts, + cf_handle, c_key, cython.address(value)) return (exists, None) - def iterkeys(self, *args, **kwargs): + def iterkeys(self, ColumnFamilyHandle column_family=None, *args, **kwargs): cdef options.ReadOptions opts cdef KeysIterator it + cdef db.ColumnFamilyHandle* cf_handle = self.db.DefaultColumnFamily() + if column_family: + cf_handle = column_family.get_handle() opts = self.build_read_opts(self.__parse_read_opts(*args, **kwargs)) - it = KeysIterator(self) + it = KeysIterator(self, column_family) with nogil: - it.ptr = self.db.NewIterator(opts) + it.ptr = self.db.NewIterator(opts, cf_handle) return it - def itervalues(self, *args, **kwargs): + def itervalues(self, ColumnFamilyHandle column_family=None, *args, **kwargs): cdef options.ReadOptions opts cdef ValuesIterator it + cdef db.ColumnFamilyHandle* cf_handle = self.db.DefaultColumnFamily() + if column_family: + cf_handle = column_family.get_handle() opts = self.build_read_opts(self.__parse_read_opts(*args, **kwargs)) it = ValuesIterator(self) with nogil: - it.ptr = self.db.NewIterator(opts) + it.ptr = self.db.NewIterator(opts, cf_handle) return it - def iteritems(self, *args, **kwargs): + def iteritems(self, ColumnFamilyHandle column_family=None, *args, **kwargs): cdef options.ReadOptions opts cdef ItemsIterator it - + cdef db.ColumnFamilyHandle* cf_handle = self.db.DefaultColumnFamily() + if column_family: + cf_handle = column_family.get_handle() opts = self.build_read_opts(self.__parse_read_opts(*args, **kwargs)) - it = ItemsIterator(self) + it = ItemsIterator(self, column_family) with nogil: - it.ptr = self.db.NewIterator(opts) + it.ptr = self.db.NewIterator(opts, cf_handle) return it + def iterskeys(self, column_families, *args, **kwargs): + cdef vector[db.Iterator*] iters + iters.resize(len(column_families)) + cdef options.ReadOptions opts + cdef db.Iterator* it_ptr + cdef KeysIterator it + cdef db.ColumnFamilyHandle* cf_handle + cdef vector[db.ColumnFamilyHandle*] cf_handles + + for column_family in column_families: + cf_handle = (column_family).get_handle() + cf_handles.push_back(cf_handle) + + opts = self.build_read_opts(self.__parse_read_opts(*args, **kwargs)) + with nogil: + self.db.NewIterators(opts, cf_handles, &iters) + + cf_iter = iter(column_families) + cdef list ret = [] + for it_ptr in iters: + it = KeysIterator(self, next(cf_iter)) + it.ptr = it_ptr + ret.append(it) + return ret + + def itersvalues(self, column_families, *args, **kwargs): + cdef vector[db.Iterator*] iters + iters.resize(len(column_families)) + cdef options.ReadOptions opts + cdef db.Iterator* it_ptr + cdef ValuesIterator it + cdef db.ColumnFamilyHandle* cf_handle + cdef vector[db.ColumnFamilyHandle*] cf_handles + + for column_family in column_families: + cf_handle = (column_family).get_handle() + cf_handles.push_back(cf_handle) + + opts = self.build_read_opts(self.__parse_read_opts(*args, **kwargs)) + with nogil: + self.db.NewIterators(opts, cf_handles, &iters) + + cdef list ret = [] + for it_ptr in iters: + it = ValuesIterator(self) + it.ptr = it_ptr + ret.append(it) + return ret + + def iterskeys(self, column_families, *args, **kwargs): + cdef vector[db.Iterator*] iters + iters.resize(len(column_families)) + cdef options.ReadOptions opts + cdef db.Iterator* it_ptr + cdef ItemsIterator it + cdef db.ColumnFamilyHandle* cf_handle + cdef vector[db.ColumnFamilyHandle*] cf_handles + + for column_family in column_families: + cf_handle = (column_family).get_handle() + cf_handles.push_back(cf_handle) + + opts = self.build_read_opts(self.__parse_read_opts(*args, **kwargs)) + with nogil: + self.db.NewIterators(opts, cf_handles, &iters) + + + cf_iter = iter(column_families) + cdef list ret = [] + for it_ptr in iters: + it = ItemsIterator(self, next(cf_iter)) + it.ptr = it_ptr + ret.append(it) + return ret + def snapshot(self): return Snapshot(self) - def get_property(self, prop): + def get_property(self, prop, ColumnFamilyHandle column_family=None): cdef string value cdef Slice c_prop = bytes_to_slice(prop) cdef cpp_bool ret = False + cdef db.ColumnFamilyHandle* cf_handle = self.db.DefaultColumnFamily() + if column_family: + cf_handle = column_family.get_handle() with nogil: - ret = self.db.GetProperty(c_prop, cython.address(value)) + ret = self.db.GetProperty(cf_handle, c_prop, cython.address(value)) if ret: return string_to_bytes(value) @@ -1659,7 +2036,7 @@ cdef class DB(object): return ret - def compact_range(self, begin=None, end=None, **py_options): + def compact_range(self, begin=None, end=None, ColumnFamilyHandle column_family=None, **py_options): cdef options.CompactRangeOptions c_options c_options.change_level = py_options.get('change_level', False) @@ -1693,7 +2070,11 @@ cdef class DB(object): end_val = bytes_to_slice(end) end_ptr = cython.address(end_val) - st = self.db.CompactRange(c_options, begin_ptr, end_ptr) + cdef db.ColumnFamilyHandle* cf_handle = self.db.DefaultColumnFamily() + if column_family: + cf_handle = (column_family).get_handle() + + st = self.db.CompactRange(c_options, cf_handle, begin_ptr, end_ptr) check_status(st) @staticmethod @@ -1726,6 +2107,48 @@ cdef class DB(object): def __get__(self): return self.opts + def create_column_family(self, bytes name, ColumnFamilyOptions copts): + cdef db.ColumnFamilyHandle* cf_handle + cdef Status st + cdef string c_name = name + + for handle in self.cf_handles: + if handle.name == name: + raise ValueError(f"{name} is already an existing column family") + + if copts.in_use: + raise Exception("ColumnFamilyOptions are in_use by another column family") + + copts.in_use = True + with nogil: + st = self.db.CreateColumnFamily(deref(copts.copts), c_name, &cf_handle) + check_status(st) + + handle = _ColumnFamilyHandle.from_handle_ptr(cf_handle) + + self.cf_handles.append(handle) + self.cf_options.append(copts) + return handle.weakref + + def drop_column_family(self, ColumnFamilyHandle weak_handle not None): + cdef db.ColumnFamilyHandle* cf_handle + cdef ColumnFamilyOptions copts + cdef Status st + + cf_handle = weak_handle.get_handle() + + with nogil: + st = self.db.DropColumnFamily(cf_handle) + check_status(st) + + py_handle = weak_handle._ref() + index = self.cf_handles.index(py_handle) + copts = self.cf_options.pop(index) + del self.cf_handles[index] + del py_handle + if copts: + copts.in_use = False + def repair_db(db_name, Options opts): cdef Status st @@ -1736,6 +2159,19 @@ def repair_db(db_name, Options opts): check_status(st) +def list_column_families(db_name, Options opts): + cdef Status st + cdef string db_path + cdef vector[string] column_families + + db_path = path_to_string(db_name) + with nogil: + st = db.ListColumnFamilies(deref(opts.opts), db_path, &column_families) + check_status(st) + + return column_families + + @cython.no_gc_clear @cython.internal cdef class Snapshot(object): @@ -1758,10 +2194,12 @@ cdef class Snapshot(object): cdef class BaseIterator(object): cdef iterator.Iterator* ptr cdef DB db + cdef ColumnFamilyHandle handle - def __cinit__(self, DB db): + def __cinit__(self, DB db, ColumnFamilyHandle handle = None): self.db = db self.ptr = NULL + self.handle = handle def __dealloc__(self): if not self.ptr == NULL: @@ -1819,6 +2257,8 @@ cdef class KeysIterator(BaseIterator): with nogil: c_key = self.ptr.key() check_status(self.ptr.status()) + if self.handle: + return self.handle, slice_to_bytes(c_key) return slice_to_bytes(c_key) @cython.internal @@ -1839,6 +2279,8 @@ cdef class ItemsIterator(BaseIterator): c_key = self.ptr.key() c_value = self.ptr.value() check_status(self.ptr.status()) + if self.handle: + return ((self.handle, slice_to_bytes(c_key)), slice_to_bytes(c_value)) return (slice_to_bytes(c_key), slice_to_bytes(c_value)) @cython.internal diff --git a/rocksdb/cpp/write_batch_iter_helper.hpp b/rocksdb/cpp/write_batch_iter_helper.hpp index 97c2f1d..ff71d09 100644 --- a/rocksdb/cpp/write_batch_iter_helper.hpp +++ b/rocksdb/cpp/write_batch_iter_helper.hpp @@ -13,14 +13,17 @@ class RecordItemsHandler: public rocksdb::WriteBatch::Handler { public: BatchItem( const Optype& op, + uint32_t column_family_id, const rocksdb::Slice& key, const rocksdb::Slice& value): op(op), + column_family_id(column_family_id), key(key), value(value) {} const Optype op; + uint32_t column_family_id; const rocksdb::Slice key; const rocksdb::Slice value; }; @@ -31,17 +34,23 @@ class RecordItemsHandler: public rocksdb::WriteBatch::Handler { /* Items is filled during iteration. */ RecordItemsHandler(BatchItems* items): items(items) {} - void Put(const Slice& key, const Slice& value) { - this->items->emplace_back(PutRecord, key, value); + virtual rocksdb::Status PutCF( + uint32_t column_family_id, const Slice& key, const Slice& value) { + this->items->emplace_back(PutRecord, column_family_id, key, value); + return rocksdb::Status::OK(); } - void Merge(const Slice& key, const Slice& value) { - this->items->emplace_back(MergeRecord, key, value); + virtual rocksdb::Status MergeCF( + uint32_t column_family_id, const Slice& key, const Slice& value) { + this->items->emplace_back(MergeRecord, column_family_id, key, value); + return rocksdb::Status::OK(); } - virtual void Delete(const Slice& key) { - this->items->emplace_back(DeleteRecord, key, rocksdb::Slice()); - } + virtual rocksdb::Status DeleteCF( + uint32_t column_family_id, const Slice& key) { + this->items->emplace_back(DeleteRecord, column_family_id, key, rocksdb::Slice()); + return rocksdb::Status::OK(); + } private: BatchItems* items; diff --git a/rocksdb/db.pxd b/rocksdb/db.pxd index a83786b..3578501 100644 --- a/rocksdb/db.pxd +++ b/rocksdb/db.pxd @@ -1,5 +1,5 @@ cimport options -from libc.stdint cimport uint64_t +from libc.stdint cimport uint64_t, uint32_t from status cimport Status from libcpp cimport bool as cpp_bool from libcpp.string cimport string @@ -13,8 +13,11 @@ cdef extern from "rocksdb/write_batch.h" namespace "rocksdb": WriteBatch() nogil except+ WriteBatch(string) nogil except+ void Put(const Slice&, const Slice&) nogil except+ + void Put(ColumnFamilyHandle*, const Slice&, const Slice&) nogil except+ void Merge(const Slice&, const Slice&) nogil except+ + void Merge(ColumnFamilyHandle*, const Slice&, const Slice&) nogil except+ void Delete(const Slice&) nogil except+ + void Delete(ColumnFamilyHandle*, const Slice&) nogil except+ void PutLogData(const Slice&) nogil except+ void Clear() nogil except+ const string& Data() nogil except+ @@ -28,6 +31,7 @@ cdef extern from "cpp/write_batch_iter_helper.hpp" namespace "py_rocks": cdef cppclass BatchItem "py_rocks::RecordItemsHandler::BatchItem": BatchItemOp op + uint32_t column_family_id Slice key Slice value @@ -36,6 +40,7 @@ cdef extern from "cpp/write_batch_iter_helper.hpp" namespace "py_rocks": cdef extern from "rocksdb/db.h" namespace "rocksdb": ctypedef uint64_t SequenceNumber + string kDefaultColumnFamilyName cdef struct LiveFileMetaData: string name @@ -52,15 +57,18 @@ cdef extern from "rocksdb/db.h" namespace "rocksdb": cdef cppclass DB: Status Put( const options.WriteOptions&, + ColumnFamilyHandle*, const Slice&, const Slice&) nogil except+ Status Delete( const options.WriteOptions&, + ColumnFamilyHandle*, const Slice&) nogil except+ Status Merge( const options.WriteOptions&, + ColumnFamilyHandle*, const Slice&, const Slice&) nogil except+ @@ -70,52 +78,73 @@ cdef extern from "rocksdb/db.h" namespace "rocksdb": Status Get( const options.ReadOptions&, + ColumnFamilyHandle*, const Slice&, string*) nogil except+ vector[Status] MultiGet( const options.ReadOptions&, + const vector[ColumnFamilyHandle*]&, const vector[Slice]&, vector[string]*) nogil except+ cpp_bool KeyMayExist( const options.ReadOptions&, + ColumnFamilyHandle*, Slice&, string*, cpp_bool*) nogil except+ cpp_bool KeyMayExist( const options.ReadOptions&, + ColumnFamilyHandle*, Slice&, string*) nogil except+ Iterator* NewIterator( - const options.ReadOptions&) nogil except+ + const options.ReadOptions&, + ColumnFamilyHandle*) nogil except+ + + void NewIterators( + const options.ReadOptions&, + vector[ColumnFamilyHandle*]&, + vector[Iterator*]*) nogil except+ const Snapshot* GetSnapshot() nogil except+ void ReleaseSnapshot(const Snapshot*) nogil except+ cpp_bool GetProperty( + ColumnFamilyHandle*, const Slice&, string*) nogil except+ void GetApproximateSizes( + ColumnFamilyHandle*, const Range* int, uint64_t*) nogil except+ Status CompactRange( const options.CompactRangeOptions&, + ColumnFamilyHandle*, const Slice*, const Slice*) nogil except+ - int NumberLevels() nogil except+ - int MaxMemCompactionLevel() nogil except+ - int Level0StopWriteTrigger() nogil except+ + Status CreateColumnFamily( + const options.ColumnFamilyOptions&, + const string&, + ColumnFamilyHandle**) nogil except+ + + Status DropColumnFamily( + ColumnFamilyHandle*) nogil except+ + + int NumberLevels(ColumnFamilyHandle*) nogil except+ + int MaxMemCompactionLevel(ColumnFamilyHandle*) nogil except+ + int Level0StopWriteTrigger(ColumnFamilyHandle*) nogil except+ const string& GetName() nogil except+ - const options.Options& GetOptions() nogil except+ - Status Flush(const options.FlushOptions&) nogil except+ + const options.Options& GetOptions(ColumnFamilyHandle*) nogil except+ + Status Flush(const options.FlushOptions&, ColumnFamilyHandle*) nogil except+ Status DisableFileDeletions() nogil except+ Status EnableFileDeletions() nogil except+ @@ -127,6 +156,7 @@ cdef extern from "rocksdb/db.h" namespace "rocksdb": Status DeleteFile(string) nogil except+ void GetLiveFilesMetaData(vector[LiveFileMetaData]*) nogil except+ + ColumnFamilyHandle* DefaultColumnFamily() cdef Status DB_Open "rocksdb::DB::Open"( @@ -134,10 +164,42 @@ cdef extern from "rocksdb/db.h" namespace "rocksdb": const string&, DB**) nogil except+ + cdef Status DB_Open_ColumnFamilies "rocksdb::DB::Open"( + const options.Options&, + const string&, + const vector[ColumnFamilyDescriptor]&, + vector[ColumnFamilyHandle*]*, + DB**) nogil except+ + cdef Status DB_OpenForReadOnly "rocksdb::DB::OpenForReadOnly"( const options.Options&, const string&, DB**, cpp_bool) nogil except+ + cdef Status DB_OpenForReadOnly_ColumnFamilies "rocksdb::DB::OpenForReadOnly"( + const options.Options&, + const string&, + const vector[ColumnFamilyDescriptor]&, + vector[ColumnFamilyHandle*]*, + DB**, + cpp_bool) nogil except+ + cdef Status RepairDB(const string& dbname, const options.Options&) + + cdef Status ListColumnFamilies "rocksdb::DB::ListColumnFamilies" ( + const options.Options&, + const string&, + vector[string]*) nogil except+ + + cdef cppclass ColumnFamilyHandle: + const string& GetName() nogil except+ + int GetID() nogil except+ + + cdef cppclass ColumnFamilyDescriptor: + ColumnFamilyDescriptor() nogil except+ + ColumnFamilyDescriptor( + const string&, + const options.ColumnFamilyOptions&) nogil except+ + string name + options.ColumnFamilyOptions options diff --git a/rocksdb/merge_operators.py b/rocksdb/merge_operators.py index 51fb6f7..9fb2d45 100644 --- a/rocksdb/merge_operators.py +++ b/rocksdb/merge_operators.py @@ -14,7 +14,7 @@ class UintAddOperator(AssociativeMergeOperator): class StringAppendOperator(AssociativeMergeOperator): def merge(self, key, existing_value, value): if existing_value: - s = existing_value + ',' + value + s = existing_value + b',' + value return (True, s) return (True, value) diff --git a/rocksdb/options.pxd b/rocksdb/options.pxd index 373c66a..894968d 100644 --- a/rocksdb/options.pxd +++ b/rocksdb/options.pxd @@ -52,20 +52,67 @@ cdef extern from "rocksdb/options.h" namespace "rocksdb": kOldestSmallestSeqFirst kMinOverlappingRatio - cdef cppclass Options: - const Comparator* comparator - shared_ptr[MergeOperator] merge_operator - # TODO: compaction_filter - # TODO: compaction_filter_factory + # This needs to be in _rocksdb.pxd so it will export into python + #cpdef enum AccessHint "rocksdb::DBOptions::AccessHint": + # NONE, + # NORMAL, + # SEQUENTIAL, + # WILLNEED + + cdef cppclass DBOptions: cpp_bool create_if_missing + cpp_bool create_missing_column_families cpp_bool error_if_exists cpp_bool paranoid_checks # TODO: env shared_ptr[Logger] info_log + int max_open_files + int max_file_opening_threads + # TODO: statistics + cpp_bool use_fsync + string db_log_dir + string wal_dir + uint64_t delete_obsolete_files_period_micros + int max_background_jobs + int max_background_compactions + uint32_t max_subcompactions + int max_background_flushes + size_t max_log_file_size + size_t log_file_time_to_roll + size_t keep_log_file_num + size_t recycle_log_file_num + uint64_t max_manifest_file_size + int table_cache_numshardbits + uint64_t WAL_ttl_seconds + uint64_t WAL_size_limit_MB + size_t manifest_preallocation_size + cpp_bool allow_mmap_reads + cpp_bool allow_mmap_writes + cpp_bool use_direct_reads + cpp_bool use_direct_io_for_flush_and_compaction + cpp_bool allow_fallocate + cpp_bool is_fd_close_on_exec + cpp_bool skip_log_error_on_recovery + unsigned int stats_dump_period_sec + cpp_bool advise_random_on_open + size_t db_write_buffer_size + # AccessHint access_hint_on_compaction_start + cpp_bool use_adaptive_mutex + uint64_t bytes_per_sync + cpp_bool allow_concurrent_memtable_write + cpp_bool enable_write_thread_adaptive_yield + shared_ptr[Cache] row_cache + + cdef cppclass ColumnFamilyOptions: + ColumnFamilyOptions() + ColumnFamilyOptions(const Options& options) + const Comparator* comparator + shared_ptr[MergeOperator] merge_operator + # TODO: compaction_filter + # TODO: compaction_filter_factory size_t write_buffer_size int max_write_buffer_number int min_write_buffer_number_to_merge - int max_open_files CompressionType compression CompactionPri compaction_pri # TODO: compression_per_level @@ -83,39 +130,15 @@ cdef extern from "rocksdb/options.h" namespace "rocksdb": int expanded_compaction_factor int source_compaction_factor int max_grandparent_overlap_factor - # TODO: statistics cpp_bool disableDataSync - cpp_bool use_fsync - string db_log_dir - string wal_dir - uint64_t delete_obsolete_files_period_micros - int max_background_compactions - int max_background_flushes - size_t max_log_file_size - size_t log_file_time_to_roll - size_t keep_log_file_num double soft_rate_limit double hard_rate_limit unsigned int rate_limit_delay_max_milliseconds - uint64_t max_manifest_file_size - int table_cache_numshardbits size_t arena_block_size # TODO: PrepareForBulkLoad() cpp_bool disable_auto_compactions - uint64_t WAL_ttl_seconds - uint64_t WAL_size_limit_MB - size_t manifest_preallocation_size cpp_bool purge_redundant_kvs_while_flush cpp_bool allow_os_buffer - cpp_bool allow_mmap_reads - cpp_bool allow_mmap_writes - cpp_bool is_fd_close_on_exec - cpp_bool skip_log_error_on_recovery - unsigned int stats_dump_period_sec - cpp_bool advise_random_on_open - # TODO: enum { NONE, NORMAL, SEQUENTIAL, WILLNEED } access_hint_on_compaction_start - cpp_bool use_adaptive_mutex - uint64_t bytes_per_sync cpp_bool verify_checksums_in_compaction CompactionStyle compaction_style CompactionOptionsUniversal compaction_options_universal @@ -126,12 +149,12 @@ cdef extern from "rocksdb/options.h" namespace "rocksdb": # TODO: table_properties_collectors cpp_bool inplace_update_support size_t inplace_update_num_locks - shared_ptr[Cache] row_cache # TODO: remove options source_compaction_factor, max_grandparent_overlap_bytes and expanded_compaction_factor from document uint64_t max_compaction_bytes CompressionOptions compression_opts - cpp_bool allow_concurrent_memtable_write - cpp_bool enable_write_thread_adaptive_yield + + cdef cppclass Options(DBOptions, ColumnFamilyOptions): + pass cdef cppclass WriteOptions: cpp_bool sync diff --git a/rocksdb/tests/test_db.py b/rocksdb/tests/test_db.py index db64e9e..5709dd3 100644 --- a/rocksdb/tests/test_db.py +++ b/rocksdb/tests/test_db.py @@ -6,37 +6,42 @@ import unittest import rocksdb from itertools import takewhile import struct +import tempfile from rocksdb.merge_operators import UintAddOperator, StringAppendOperator def int_to_bytes(ob): return str(ob).encode('ascii') -class TestHelper(object): - def _clean(self): - if os.path.exists('/tmp/test'): - shutil.rmtree("/tmp/test") +class TestHelper(unittest.TestCase): + + def setUp(self): + self.db_loc = tempfile.mkdtemp() + self.addCleanup(self._close_db) def _close_db(self): del self.db gc.collect() + if os.path.exists(self.db_loc): + shutil.rmtree(self.db_loc) -class TestDB(unittest.TestCase, TestHelper): +class TestDB(TestHelper): def setUp(self): + TestHelper.setUp(self) opts = rocksdb.Options(create_if_missing=True) - self._clean() - self.db = rocksdb.DB("/tmp/test", opts) - - def tearDown(self): - self._close_db() + self.db = rocksdb.DB(os.path.join(self.db_loc, "test"), opts) def test_options_used_twice(self): + if sys.version_info[0] == 3: + assertRaisesRegex = self.assertRaisesRegex + else: + assertRaisesRegex = self.assertRaisesRegexp expected = "Options object is already used by another DB" - with self.assertRaisesRegexp(Exception, expected): - rocksdb.DB("/tmp/test2", self.db.options) + with assertRaisesRegex(Exception, expected): + rocksdb.DB(os.path.join(self.db_loc, "test2"), self.db.options) def test_unicode_path(self): - name = b'/tmp/M\xc3\xbcnchen'.decode('utf8') + name = os.path.join(self.db_loc, b'M\xc3\xbcnchen'.decode('utf8')) rocksdb.DB(name, rocksdb.Options(create_if_missing=True)) self.addCleanup(shutil.rmtree, name) self.assertTrue(os.path.isdir(name)) @@ -280,16 +285,13 @@ class AssocCounter(rocksdb.interfaces.AssociativeMergeOperator): return b'AssocCounter' -class TestUint64Merge(unittest.TestCase, TestHelper): +class TestUint64Merge(TestHelper): def setUp(self): + TestHelper.setUp(self) opts = rocksdb.Options() opts.create_if_missing = True opts.merge_operator = UintAddOperator() - self._clean() - self.db = rocksdb.DB('/tmp/test', opts) - - def tearDown(self): - self._close_db() + self.db = rocksdb.DB(os.path.join(self.db_loc, 'test'), opts) def test_merge(self): self.db.put(b'a', struct.pack('Q', 5566)) @@ -298,67 +300,55 @@ class TestUint64Merge(unittest.TestCase, TestHelper): self.assertEqual(5566 + sum(range(1000)), struct.unpack('Q', self.db.get(b'a'))[0]) -# class TestPutMerge(unittest.TestCase, TestHelper): +# class TestPutMerge(TestHelper): # def setUp(self): + # TestHelper.setUp(self) # opts = rocksdb.Options() # opts.create_if_missing = True # opts.merge_operator = "put" - # self._clean() - # self.db = rocksdb.DB('/tmp/test', opts) - - # def tearDown(self): - # self._close_db() + # self.db = rocksdb.DB(os.path.join(self.db_loc, 'test'), opts) # def test_merge(self): # self.db.put(b'a', b'ccc') # self.db.merge(b'a', b'ddd') # self.assertEqual(self.db.get(b'a'), 'ddd') -# class TestPutV1Merge(unittest.TestCase, TestHelper): +# class TestPutV1Merge(TestHelper): # def setUp(self): + # TestHelper.setUp(self) # opts = rocksdb.Options() # opts.create_if_missing = True # opts.merge_operator = "put_v1" - # self._clean() - # self.db = rocksdb.DB('/tmp/test', opts) - - # def tearDown(self): - # self._close_db() + # self.db = rocksdb.DB(os.path.join(self.db_loc, 'test'), opts) # def test_merge(self): # self.db.put(b'a', b'ccc') # self.db.merge(b'a', b'ddd') # self.assertEqual(self.db.get(b'a'), 'ddd') -class TestStringAppendOperatorMerge(unittest.TestCase, TestHelper): +class TestStringAppendOperatorMerge(TestHelper): def setUp(self): + TestHelper.setUp(self) opts = rocksdb.Options() opts.create_if_missing = True opts.merge_operator = StringAppendOperator() - self._clean() - self.db = rocksdb.DB('/tmp/test', opts) - - def tearDown(self): - self._close_db() + self.db = rocksdb.DB(os.path.join(self.db_loc, 'test'), opts) # NOTE(sileht): Raise "Corruption: Error: Could not perform merge." on PY3 - @unittest.skipIf(sys.version_info[0] == 3, - "Unexpected behavior on PY3") + #@unittest.skipIf(sys.version_info[0] == 3, + # "Unexpected behavior on PY3") def test_merge(self): self.db.put(b'a', b'ccc') self.db.merge(b'a', b'ddd') self.assertEqual(self.db.get(b'a'), b'ccc,ddd') -# class TestStringMaxOperatorMerge(unittest.TestCase, TestHelper): +# class TestStringMaxOperatorMerge(TestHelper): # def setUp(self): + # TestHelper.setUp(self) # opts = rocksdb.Options() # opts.create_if_missing = True # opts.merge_operator = "max" - # self._clean() - # self.db = rocksdb.DB('/tmp/test', opts) - - # def tearDown(self): - # self._close_db() + # self.db = rocksdb.DB(os.path.join(self.db_loc, 'test'), opts) # def test_merge(self): # self.db.put(b'a', int_to_bytes(55)) @@ -366,16 +356,13 @@ class TestStringAppendOperatorMerge(unittest.TestCase, TestHelper): # self.assertEqual(int(self.db.get(b'a')), 56) -class TestAssocMerge(unittest.TestCase, TestHelper): +class TestAssocMerge(TestHelper): def setUp(self): + TestHelper.setUp(self) opts = rocksdb.Options() opts.create_if_missing = True opts.merge_operator = AssocCounter() - self._clean() - self.db = rocksdb.DB('/tmp/test', opts) - - def tearDown(self): - self._close_db() + self.db = rocksdb.DB(os.path.join(self.db_loc, 'test'), opts) def test_merge(self): for x in range(1000): @@ -398,16 +385,13 @@ class FullCounter(rocksdb.interfaces.MergeOperator): return (True, int_to_bytes(int(left) + int(right))) -class TestFullMerge(unittest.TestCase, TestHelper): +class TestFullMerge(TestHelper): def setUp(self): + TestHelper.setUp(self) opts = rocksdb.Options() opts.create_if_missing = True opts.merge_operator = FullCounter() - self._clean() - self.db = rocksdb.DB('/tmp/test', opts) - - def tearDown(self): - self._close_db() + self.db = rocksdb.DB(os.path.join(self.db_loc, 'test'), opts) def test_merge(self): for x in range(1000): @@ -430,16 +414,13 @@ class SimpleComparator(rocksdb.interfaces.Comparator): return 1 -class TestComparator(unittest.TestCase, TestHelper): +class TestComparator(TestHelper): def setUp(self): + TestHelper.setUp(self) opts = rocksdb.Options() opts.create_if_missing = True opts.comparator = SimpleComparator() - self._clean() - self.db = rocksdb.DB('/tmp/test', opts) - - def tearDown(self): - self._close_db() + self.db = rocksdb.DB(os.path.join(self.db_loc, 'test'), opts) def test_compare(self): for x in range(1000): @@ -460,15 +441,12 @@ class StaticPrefix(rocksdb.interfaces.SliceTransform): def in_range(self, dst): return len(dst) == 5 -class TestPrefixExtractor(unittest.TestCase, TestHelper): +class TestPrefixExtractor(TestHelper): def setUp(self): + TestHelper.setUp(self) opts = rocksdb.Options(create_if_missing=True) opts.prefix_extractor = StaticPrefix() - self._clean() - self.db = rocksdb.DB('/tmp/test', opts) - - def tearDown(self): - self._close_db() + self.db = rocksdb.DB(os.path.join(self.db_loc, 'test'), opts) def _fill_db(self): for x in range(3000): @@ -502,3 +480,211 @@ class TestPrefixExtractor(unittest.TestCase, TestHelper): ref = {b'00002.z': b'z', b'00002.y': b'y', b'00002.x': b'x'} ret = takewhile(lambda item: item[0].startswith(b'00002'), it) self.assertEqual(ref, dict(ret)) + +class TestDBColumnFamilies(TestHelper): + def setUp(self): + TestHelper.setUp(self) + opts = rocksdb.Options(create_if_missing=True) + self.db = rocksdb.DB( + os.path.join(self.db_loc, 'test'), + opts, + ) + + self.cf_a = self.db.create_column_family(b'A', rocksdb.ColumnFamilyOptions()) + self.cf_b = self.db.create_column_family(b'B', rocksdb.ColumnFamilyOptions()) + + def test_column_families(self): + families = self.db.column_families + names = [handle.name for handle in families] + self.assertEqual([b'default', b'A', b'B'], names) + for name in names: + self.assertIn(self.db.get_column_family(name), families) + + self.assertEqual( + names, + rocksdb.list_column_families( + os.path.join(self.db_loc, 'test'), + rocksdb.Options(), + ) + ) + + def test_get_none(self): + self.assertIsNone(self.db.get(b'k')) + self.assertIsNone(self.db.get((self.cf_a, b'k'))) + self.assertIsNone(self.db.get((self.cf_b, b'k'))) + + def test_put_get(self): + key = (self.cf_a, b'k') + self.db.put(key, b"v") + self.assertEqual(b"v", self.db.get(key)) + self.assertIsNone(self.db.get(b"k")) + self.assertIsNone(self.db.get((self.cf_b, b"k"))) + + def test_multi_get(self): + data = [ + (b'a', b'1default'), + (b'b', b'2default'), + (b'c', b'3default'), + ((self.cf_a, b'a'), b'1a'), + ((self.cf_a, b'b'), b'2a'), + ((self.cf_a, b'c'), b'3a'), + ((self.cf_b, b'a'), b'1b'), + ((self.cf_b, b'b'), b'2b'), + ((self.cf_b, b'c'), b'3b'), + ] + for value in data: + self.db.put(*value) + + multi_get_lookup = [value[0] for value in data] + + ret = self.db.multi_get(multi_get_lookup) + ref = {value[0]: value[1] for value in data} + self.assertEqual(ref, ret) + + def test_delete(self): + self.db.put((self.cf_a, b"a"), b"b") + self.assertEqual(b"b", self.db.get((self.cf_a, b"a"))) + self.db.delete((self.cf_a, b"a")) + self.assertIsNone(self.db.get((self.cf_a, b"a"))) + + def test_write_batch(self): + cfa = self.db.get_column_family(b"A") + batch = rocksdb.WriteBatch() + batch.put((cfa, b"key"), b"v1") + batch.delete((self.cf_a, b"key")) + batch.put((cfa, b"key"), b"v2") + batch.put((cfa, b"key"), b"v3") + batch.put((cfa, b"a"), b"1") + batch.put((cfa, b"b"), b"2") + + self.db.write(batch) + query = [(cfa, b"key"), (cfa, b"a"), (cfa, b"b")] + ret = self.db.multi_get(query) + + self.assertEqual(b"v3", ret[query[0]]) + self.assertEqual(b"1", ret[query[1]]) + self.assertEqual(b"2", ret[query[2]]) + + def test_key_may_exists(self): + self.db.put((self.cf_a, b"a"), b'1') + + self.assertEqual( + (False, None), + self.db.key_may_exist((self.cf_a, b"x")) + ) + self.assertEqual( + (False, None), + self.db.key_may_exist((self.cf_a, b'x'), fetch=True) + ) + self.assertEqual( + (True, None), + self.db.key_may_exist((self.cf_a, b'a')) + ) + self.assertEqual( + (True, b'1'), + self.db.key_may_exist((self.cf_a, b'a'), fetch=True) + ) + + def test_iter_keys(self): + for x in range(300): + self.db.put((self.cf_a, int_to_bytes(x)), int_to_bytes(x)) + + it = self.db.iterkeys(self.cf_a) + self.assertEqual([], list(it)) + + it.seek_to_last() + self.assertEqual([(self.cf_a, b'99')], list(it)) + + ref = sorted([(self.cf_a, int_to_bytes(x)) for x in range(300)]) + it.seek_to_first() + self.assertEqual(ref, list(it)) + + it.seek(b'90') + ref = sorted([(self.cf_a, int_to_bytes(x)) for x in range(90, 100)]) + self.assertEqual(ref, list(it)) + + def test_iter_values(self): + for x in range(300): + self.db.put((self.cf_b, int_to_bytes(x)), int_to_bytes(x * 1000)) + + it = self.db.itervalues(self.cf_b) + self.assertEqual([], list(it)) + + it.seek_to_last() + self.assertEqual([b'99000'], list(it)) + + ref = sorted([int_to_bytes(x) for x in range(300)]) + ref = [int_to_bytes(int(x) * 1000) for x in ref] + it.seek_to_first() + self.assertEqual(ref, list(it)) + + it.seek(b'90') + ref = [int_to_bytes(x * 1000) for x in range(90, 100)] + self.assertEqual(ref, list(it)) + + def test_iter_items(self): + for x in range(300): + self.db.put((self.cf_b, int_to_bytes(x)), int_to_bytes(x * 1000)) + + it = self.db.iteritems(self.cf_b) + self.assertEqual([], list(it)) + + it.seek_to_last() + self.assertEqual([((self.cf_b, b'99'), b'99000')], list(it)) + + ref = sorted([int_to_bytes(x) for x in range(300)]) + ref = [((self.cf_b, x), int_to_bytes(int(x) * 1000)) for x in ref] + it.seek_to_first() + self.assertEqual(ref, list(it)) + + it.seek(b'90') + ref = [((self.cf_b, int_to_bytes(x)), int_to_bytes(x * 1000)) for x in range(90, 100)] + self.assertEqual(ref, list(it)) + + def test_reverse_iter(self): + for x in range(100): + self.db.put((self.cf_a, int_to_bytes(x)), int_to_bytes(x * 1000)) + + it = self.db.iteritems(self.cf_a) + it.seek_to_last() + + ref = reversed(sorted([(self.cf_a, int_to_bytes(x)) for x in range(100)])) + ref = [(x, int_to_bytes(int(x[1]) * 1000)) for x in ref] + + self.assertEqual(ref, list(reversed(it))) + + def test_snapshot(self): + cfa = self.db.get_column_family(b'A') + self.db.put((cfa, b"a"), b"1") + self.db.put((cfa, b"b"), b"2") + + snapshot = self.db.snapshot() + self.db.put((cfa, b"a"), b"2") + self.db.delete((cfa, b"b")) + + it = self.db.iteritems(cfa) + it.seek_to_first() + self.assertEqual({(cfa, b'a'): b'2'}, dict(it)) + + it = self.db.iteritems(cfa, snapshot=snapshot) + it.seek_to_first() + self.assertEqual({(cfa, b'a'): b'1', (cfa, b'b'): b'2'}, dict(it)) + + def test_get_property(self): + for x in range(300): + x = int_to_bytes(x) + self.db.put((self.cf_a, x), x) + + self.assertEqual(b"300", + self.db.get_property(b'rocksdb.estimate-num-keys', + self.cf_a)) + self.assertIsNone(self.db.get_property(b'does not exsits', + self.cf_a)) + + def test_compact_range(self): + for x in range(10000): + x = int_to_bytes(x) + self.db.put((self.cf_b, x), x) + + self.db.compact_range(column_family=self.cf_b) + diff --git a/rocksdb/tests/test_memtable.py b/rocksdb/tests/test_memtable.py index cb8cb80..aef12bf 100644 --- a/rocksdb/tests/test_memtable.py +++ b/rocksdb/tests/test_memtable.py @@ -3,16 +3,27 @@ import rocksdb import pytest import shutil import os +import tempfile def test_open_skiplist_memtable_factory(): opts = rocksdb.Options() opts.memtable_factory = rocksdb.SkipListMemtableFactory() opts.create_if_missing = True - test_db = rocksdb.DB("/tmp/test", opts) + + loc = tempfile.mkdtemp() + try: + test_db = rocksdb.DB(os.path.join(loc, "test"), opts) + finally: + shutil.rmtree(loc) + def test_open_vector_memtable_factory(): opts = rocksdb.Options() opts.allow_concurrent_memtable_write = False opts.memtable_factory = rocksdb.VectorMemtableFactory() opts.create_if_missing = True - test_db = rocksdb.DB("/tmp/test", opts) + loc = tempfile.mkdtemp() + try: + test_db = rocksdb.DB(os.path.join(loc, "test"), opts) + finally: + shutil.rmtree(loc) diff --git a/rocksdb/tests/test_options.py b/rocksdb/tests/test_options.py index 27a12b2..327f422 100644 --- a/rocksdb/tests/test_options.py +++ b/rocksdb/tests/test_options.py @@ -1,4 +1,5 @@ import unittest +import sys import rocksdb class TestFilterPolicy(rocksdb.interfaces.FilterPolicy): @@ -37,7 +38,7 @@ class TestOptions(unittest.TestCase): def test_compaction_pri(self): opts = rocksdb.Options() - # default compaction_pri + # default compaction_pri self.assertEqual(opts.compaction_pri, rocksdb.CompactionPri.by_compensated_size) opts.compaction_pri = rocksdb.CompactionPri.by_compensated_size self.assertEqual(opts.compaction_pri, rocksdb.CompactionPri.by_compensated_size) @@ -64,7 +65,8 @@ class TestOptions(unittest.TestCase): # default value self.assertEqual(isinstance(compression_opts, dict), True) self.assertEqual(compression_opts['window_bits'], -14) - self.assertEqual(compression_opts['level'], -1) + # This doesn't match rocksdb latest + # self.assertEqual(compression_opts['level'], -1) self.assertEqual(compression_opts['strategy'], 0) self.assertEqual(compression_opts['max_dict_bytes'], 0) @@ -132,7 +134,12 @@ class TestOptions(unittest.TestCase): opts.compaction_style = 'level' self.assertEqual('level', opts.compaction_style) - with self.assertRaisesRegexp(Exception, 'Unknown compaction style'): + if sys.version_info[0] == 3: + assertRaisesRegex = self.assertRaisesRegex + else: + assertRaisesRegex = self.assertRaisesRegexp + + with assertRaisesRegex(Exception, 'Unknown compaction style'): opts.compaction_style = 'foo' def test_compaction_opts_universal(self): diff --git a/setup.py b/setup.py index 83b5e00..3989ed2 100644 --- a/setup.py +++ b/setup.py @@ -42,5 +42,6 @@ setup( "doc": ['sphinx_rtd_theme', 'sphinx'], "test": ['pytest'], }, - include_package_data=True + include_package_data=True, + zip_safe=False, )