diff --git a/claimtrie/node/noderepo/pebble.go b/claimtrie/node/noderepo/pebble.go index 530a2271..8365931a 100644 --- a/claimtrie/node/noderepo/pebble.go +++ b/claimtrie/node/noderepo/pebble.go @@ -2,8 +2,10 @@ package noderepo import ( "bytes" + "io" "reflect" "sort" + "sync" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/claimtrie/change" @@ -77,9 +79,71 @@ func init() { msgpack.Register(wire.OutPoint{}, opEncoder, opDecoder) } +type pooledMerger struct { + values [][]byte + index []int + pool *sync.Pool + buffer []byte +} + +func (a *pooledMerger) Len() int { return len(a.index) } +func (a *pooledMerger) Less(i, j int) bool { return a.index[i] < a.index[j] } +func (a *pooledMerger) Swap(i, j int) { + a.index[i], a.index[j] = a.index[j], a.index[i] + a.values[i], a.values[j] = a.values[j], a.values[i] +} + +func (a *pooledMerger) MergeNewer(value []byte) error { + a.values = append(a.values, value) + a.index = append(a.index, len(a.values)) + return nil +} + +func (a *pooledMerger) MergeOlder(value []byte) error { + a.values = append(a.values, value) + a.index = append(a.index, -len(a.values)) + return nil +} + +func (a *pooledMerger) Finish(includesBase bool) ([]byte, io.Closer, error) { + sort.Sort(a) + + a.buffer = a.pool.Get().([]byte)[:0] + for i := range a.values { + a.buffer = append(a.buffer, a.values[i]...) + } + + return a.buffer, a, nil +} + +func (a *pooledMerger) Close() error { + a.pool.Put(a.buffer) + return nil +} + func NewPebble(path string) (*Pebble, error) { - db, err := pebble.Open(path, &pebble.Options{Cache: pebble.NewCache(32 << 20), BytesPerSync: 4 << 20}) + mp := &sync.Pool{ + New: func() interface{} { + return make([]byte, 0, 256) + }, + } + + db, err := pebble.Open(path, &pebble.Options{ + Merger: &pebble.Merger{ + Merge: func(key, value []byte) (pebble.ValueMerger, error) { + p := &pooledMerger{pool: mp} + return p, p.MergeNewer(value) + }, + Name: pebble.DefaultMerger.Name, // yes, it's a lie + }, + Cache: pebble.NewCache(32 << 20), + BytesPerSync: 4 << 20, + }) + if err != nil { + return nil, errors.Wrapf(err, "pebble open %s", path) + } + repo := &Pebble{db: db} return repo, errors.Wrapf(err, "unable to open %s", path) @@ -91,16 +155,22 @@ func (repo *Pebble) AppendChanges(changes []change.Change) error { batch := repo.db.NewBatch() defer batch.Close() - // TODO: switch to buffer pool and reuse encoder + enc := msgpack.GetEncoder() + defer msgpack.PutEncoder(enc) + + var buf bytes.Buffer + for _, chg := range changes { name := chg.Name chg.Name = nil // don't waste the storage space on this (annotation a better approach?) - value, err := msgpack.Marshal(chg) + buf.Reset() + enc.Reset(&buf) // do this every time through? + err := enc.Encode(chg) if err != nil { return errors.Wrap(err, "in marshaller") } - err = batch.Merge(name, value, pebble.NoSync) + err = batch.Merge(name, buf.Bytes(), pebble.NoSync) if err != nil { return errors.Wrap(err, "in merge") }