import unittest
import sys
import rocksdb

class TestFilterPolicy(rocksdb.interfaces.FilterPolicy):
    """Stub filter policy whose methods all return fixed values."""

    def name(self):
        """Return the identifier of this policy as bytes."""
        return b'testfilter'

    def create_filter(self, keys):
        """Return the same placeholder filter regardless of ``keys``."""
        return b'nix'

    def key_may_match(self, key, fil):
        """Report a possible match for every key/filter pair."""
        return True

class TestMergeOperator(rocksdb.interfaces.MergeOperator):
    """Stub merge operator: both merge hooks return ``(False, None)``."""

    def name(self):
        """Return the identifier of this operator as bytes."""
        return b'testmergeop'

    def full_merge(self, *args, **kwargs):
        """Ignore all arguments and return the pair ``(False, None)``."""
        return False, None

    def partial_merge(self, *args, **kwargs):
        """Ignore all arguments and return the pair ``(False, None)``."""
        return False, None

class TestOptions(unittest.TestCase):
    """Check defaults and round-tripping of attributes on ``rocksdb.Options``."""

    #  def test_default_merge_operator(self):
        #  opts = rocksdb.Options()
        #  self.assertEqual(True, opts.paranoid_checks)
        #  opts.paranoid_checks = False
        #  self.assertEqual(False, opts.paranoid_checks)

        #  self.assertIsNone(opts.merge_operator)
        #  opts.merge_operator = "uint64add"
        #  self.assertIsNotNone(opts.merge_operator)
        #  self.assertEqual(opts.merge_operator, "uint64add")
        #  with self.assertRaises(TypeError):
            #  opts.merge_operator = "not an operator"

    # FIXME: travis test should include the latest version of rocksdb
    #  def test_compaction_pri(self):
        #  opts = rocksdb.Options()
        # default compaction_pri
        #  self.assertEqual(opts.compaction_pri, rocksdb.CompactionPri.by_compensated_size)
        #  self.assertEqual(opts.compaction_pri, rocksdb.CompactionPri.min_overlapping_ratio)
        #  opts.compaction_pri = rocksdb.CompactionPri.by_compensated_size
        #  self.assertEqual(opts.compaction_pri, rocksdb.CompactionPri.by_compensated_size)
        #  opts.compaction_pri = rocksdb.CompactionPri.oldest_largest_seq_first
        #  self.assertEqual(opts.compaction_pri, rocksdb.CompactionPri.oldest_largest_seq_first)
        #  opts.compaction_pri = rocksdb.CompactionPri.min_overlapping_ratio
        #  self.assertEqual(opts.compaction_pri, rocksdb.CompactionPri.min_overlapping_ratio)

    def test_enable_write_thread_adaptive_yield(self):
        """The flag is expected to default to True and accept False."""
        opts = rocksdb.Options()
        self.assertEqual(opts.enable_write_thread_adaptive_yield, True)
        opts.enable_write_thread_adaptive_yield = False
        self.assertEqual(opts.enable_write_thread_adaptive_yield, False)

    def test_allow_concurrent_memtable_write(self):
        """The flag is expected to default to True and accept False."""
        opts = rocksdb.Options()
        self.assertEqual(opts.allow_concurrent_memtable_write, True)
        opts.allow_concurrent_memtable_write = False
        self.assertEqual(opts.allow_concurrent_memtable_write, False)

    def test_compression_opts(self):
        """Check default compression options, type validation and round-trip."""
        opts = rocksdb.Options()
        compression_opts = opts.compression_opts
        # default value
        self.assertIsInstance(compression_opts, dict)
        self.assertEqual(compression_opts['window_bits'], -14)
        # This doesn't match rocksdb latest
        # self.assertEqual(compression_opts['level'], -1)
        self.assertEqual(compression_opts['strategy'], 0)
        self.assertEqual(compression_opts['max_dict_bytes'], 0)

        # Assign a real list so the TypeError has to come from the property
        # setter. The previous ``list(1, 2)`` raised TypeError while building
        # the value, so the setter was never reached.
        with self.assertRaises(TypeError):
            opts.compression_opts = [1, 2]

        opts.compression_opts = {'window_bits': 1, 'level': 2, 'strategy': 3, 'max_dict_bytes': 4}
        compression_opts = opts.compression_opts
        self.assertEqual(compression_opts['window_bits'], 1)
        self.assertEqual(compression_opts['level'], 2)
        self.assertEqual(compression_opts['strategy'], 3)
        self.assertEqual(compression_opts['max_dict_bytes'], 4)

    def test_simple(self):
        """Check paranoid_checks, merge_operator, comparator and compression."""
        opts = rocksdb.Options()
        self.assertEqual(True, opts.paranoid_checks)
        opts.paranoid_checks = False
        self.assertEqual(False, opts.paranoid_checks)

        self.assertIsNone(opts.merge_operator)
        ob = TestMergeOperator()
        opts.merge_operator = ob
        self.assertEqual(opts.merge_operator, ob)

        self.assertIsInstance(
            opts.comparator,
            rocksdb.BytewiseComparator)

        # Either value is accepted as the initial compression setting.
        self.assertIn(opts.compression,
                      (rocksdb.CompressionType.no_compression,
                      rocksdb.CompressionType.snappy_compression))

        opts.compression = rocksdb.CompressionType.zstd_compression
        self.assertEqual(rocksdb.CompressionType.zstd_compression, opts.compression)

    def test_block_options(self):
        """Construct a BlockBasedTableFactory; passes if nothing is raised."""
        rocksdb.BlockBasedTableFactory(
            block_size=4096,
            filter_policy=TestFilterPolicy(),
            block_cache=rocksdb.LRUCache(100))

    def test_unicode_path(self):
        """Directory options must round-trip a path with a non-ASCII character."""
        # Decoding the UTF-8 bytes yields a text string on Python 2 and 3 alike.
        name = b'/tmp/M\xc3\xbcnchen'.decode('utf8')
        opts = rocksdb.Options()
        opts.db_log_dir = name
        opts.wal_dir = name

        self.assertEqual(name, opts.db_log_dir)
        self.assertEqual(name, opts.wal_dir)

    def test_table_factory(self):
        """table_factory starts as None and accepts both factory types."""
        opts = rocksdb.Options()
        self.assertIsNone(opts.table_factory)

        # No read-back assertion: the test only requires that assignment works.
        opts.table_factory = rocksdb.BlockBasedTableFactory()
        opts.table_factory = rocksdb.PlainTableFactory()

    def test_compaction_style(self):
        """Check the default style, valid transitions and an unknown style."""
        opts = rocksdb.Options()
        self.assertEqual('level', opts.compaction_style)

        opts.compaction_style = 'universal'
        self.assertEqual('universal', opts.compaction_style)

        opts.compaction_style = 'level'
        self.assertEqual('level', opts.compaction_style)

        # Python 2 only provides the older ``assertRaisesRegexp`` spelling.
        if sys.version_info[0] == 3:
            assertRaisesRegex = self.assertRaisesRegex
        else:
            assertRaisesRegex = self.assertRaisesRegexp

        with assertRaisesRegex(Exception, 'Unknown compaction style'):
            opts.compaction_style = 'foo'

    def test_compaction_opts_universal(self):
        """Check universal-compaction defaults and a partial update."""
        opts = rocksdb.Options()
        uopts = opts.compaction_options_universal
        self.assertEqual(-1, uopts['compression_size_percent'])
        self.assertEqual(200, uopts['max_size_amplification_percent'])
        self.assertEqual('total_size', uopts['stop_style'])
        self.assertEqual(1, uopts['size_ratio'])
        self.assertEqual(2, uopts['min_merge_width'])
        # Only an upper bound (2**32 - 1) is asserted for the default.
        self.assertGreaterEqual(4294967295, uopts['max_merge_width'])

        # Update two keys only; the remaining keys must keep their values.
        new_opts = {'stop_style': 'similar_size', 'max_merge_width': 30}
        opts.compaction_options_universal = new_opts
        uopts = opts.compaction_options_universal

        self.assertEqual(-1, uopts['compression_size_percent'])
        self.assertEqual(200, uopts['max_size_amplification_percent'])
        self.assertEqual('similar_size', uopts['stop_style'])
        self.assertEqual(1, uopts['size_ratio'])
        self.assertEqual(2, uopts['min_merge_width'])
        self.assertEqual(30, uopts['max_merge_width'])

    def test_row_cache(self):
        """row_cache starts as None and returns the cache assigned to it."""
        opts = rocksdb.Options()
        self.assertIsNone(opts.row_cache)
        opts.row_cache = cache = rocksdb.LRUCache(2*1024*1024)
        self.assertEqual(cache, opts.row_cache)