backup.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. /*
  2. * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
  3. * SPDX-License-Identifier: Apache-2.0
  4. */
  5. package badger
  6. import (
  7. "bufio"
  8. "bytes"
  9. "context"
  10. "encoding/binary"
  11. "fmt"
  12. "io"
  13. "google.golang.org/protobuf/proto"
  14. "github.com/dgraph-io/badger/v4/pb"
  15. "github.com/dgraph-io/badger/v4/y"
  16. "github.com/dgraph-io/ristretto/v2/z"
  17. )
// flushThreshold determines when a buffer will be flushed. When performing a
// backup/restore, the entries will be batched up until the total size of batch
// is more than flushThreshold (100 MiB) or entry size (without the value size)
// is more than the maxBatchSize.
const flushThreshold = 100 << 20
  23. // Backup dumps a protobuf-encoded list of all entries in the database into the
  24. // given writer, that are newer than or equal to the specified version. It
  25. // returns a timestamp (version) indicating the version of last entry that is
  26. // dumped, which after incrementing by 1 can be passed into later invocation to
  27. // generate incremental backup of entries that have been added/modified since
  28. // the last invocation of DB.Backup().
  29. // DB.Backup is a wrapper function over Stream.Backup to generate full and
  30. // incremental backups of the DB. For more control over how many goroutines are
  31. // used to generate the backup, or if you wish to backup only a certain range
  32. // of keys, use Stream.Backup directly.
  33. func (db *DB) Backup(w io.Writer, since uint64) (uint64, error) {
  34. stream := db.NewStream()
  35. stream.LogPrefix = "DB.Backup"
  36. stream.SinceTs = since
  37. return stream.Backup(w, since)
  38. }
// Backup dumps a protobuf-encoded list of all entries in the database into the
// given writer, that are newer than or equal to the specified version. It returns a
// timestamp(version) indicating the version of last entry that was dumped, which
// after incrementing by 1 can be passed into a later invocation to generate an
// incremental dump of entries that have been added/modified since the last
// invocation of Stream.Backup().
//
// This can be used to backup the data in a database at a given point in time.
func (stream *Stream) Backup(w io.Writer, since uint64) (uint64, error) {
	// KeyToList builds, for each key, the list of KVs (one per version) that
	// belongs in the backup. Iteration stops early on a delete/expiry marker
	// or when DiscardEarlierVersions is set, since older versions are then
	// irrelevant to a restore.
	stream.KeyToList = func(key []byte, itr *Iterator) (*pb.KVList, error) {
		list := &pb.KVList{}
		// Arena-style allocator tied to the iterator; copies below must use it
		// so the data outlives the iterator's own buffers.
		a := itr.Alloc
		for ; itr.Valid(); itr.Next() {
			item := itr.Item()
			if !bytes.Equal(item.Key(), key) {
				// Iterator moved past the versions of this key; we are done.
				return list, nil
			}
			if item.Version() < since {
				// SinceTs (set by the caller) should have filtered these out;
				// seeing one here indicates an internal inconsistency.
				return nil, fmt.Errorf("Backup: Item Version: %d less than sinceTs: %d",
					item.Version(), since)
			}
			var valCopy []byte
			if !item.IsDeletedOrExpired() {
				// No need to copy value, if item is deleted or expired.
				err := item.Value(func(val []byte) error {
					valCopy = a.Copy(val)
					return nil
				})
				if err != nil {
					stream.db.opt.Errorf("Key [%x, %d]. Error while fetching value [%v]\n",
						item.Key(), item.Version(), err)
					return nil, err
				}
			}

			// clear txn bits
			meta := item.meta &^ (bitTxn | bitFinTxn)
			kv := y.NewKV(a)
			*kv = pb.KV{
				Key:       a.Copy(item.Key()),
				Value:     valCopy,
				UserMeta:  a.Copy([]byte{item.UserMeta()}),
				Version:   item.Version(),
				ExpiresAt: item.ExpiresAt(),
				Meta:      a.Copy([]byte{meta}),
			}
			list.Kv = append(list.Kv, kv)

			switch {
			case item.DiscardEarlierVersions():
				// If we need to discard earlier versions of this item, add a delete
				// marker just below the current version.
				list.Kv = append(list.Kv, &pb.KV{
					Key:     item.KeyCopy(nil),
					Version: item.Version() - 1,
					Meta:    []byte{bitDelete},
				})
				return list, nil

			case item.IsDeletedOrExpired():
				return list, nil
			}
		}
		return list, nil
	}

	// maxVersion tracks the highest version written out; it becomes the
	// return value the caller can pass (plus one) to the next backup.
	// NOTE(review): unsynchronized access assumes Send is invoked serially by
	// the stream framework — confirm against Stream's contract.
	var maxVersion uint64
	stream.Send = func(buf *z.Buffer) error {
		list, err := BufferToKVList(buf)
		if err != nil {
			return err
		}
		// Filter in place, reusing list.Kv's backing array.
		out := list.Kv[:0]
		for _, kv := range list.Kv {
			if maxVersion < kv.Version {
				maxVersion = kv.Version
			}
			if !kv.StreamDone {
				// Don't pick stream done changes.
				out = append(out, kv)
			}
		}
		list.Kv = out
		return writeTo(list, w)
	}

	if err := stream.Orchestrate(context.Background()); err != nil {
		return 0, err
	}
	return maxVersion, nil
}
  125. func writeTo(list *pb.KVList, w io.Writer) error {
  126. if err := binary.Write(w, binary.LittleEndian, uint64(proto.Size(list))); err != nil {
  127. return err
  128. }
  129. buf, err := proto.Marshal(list)
  130. if err != nil {
  131. return err
  132. }
  133. _, err = w.Write(buf)
  134. return err
  135. }
// KVLoader is used to write KVList objects in to badger. It can be used to restore a backup.
type KVLoader struct {
	db       *DB
	throttle *y.Throttle // bounds the number of in-flight async batch writes
	entries  []*Entry    // current pending batch, flushed by send()
	// entriesSize is the estimated size of the pending entries excluding
	// value bytes; compared against db.opt.maxBatchSize.
	entriesSize int64
	// totalSize additionally includes value bytes; compared against
	// flushThreshold.
	totalSize int64
}
  144. // NewKVLoader returns a new instance of KVLoader.
  145. func (db *DB) NewKVLoader(maxPendingWrites int) *KVLoader {
  146. return &KVLoader{
  147. db: db,
  148. throttle: y.NewThrottle(maxPendingWrites),
  149. entries: make([]*Entry, 0, db.opt.maxBatchCount),
  150. }
  151. }
  152. // Set writes the key-value pair to the database.
  153. func (l *KVLoader) Set(kv *pb.KV) error {
  154. var userMeta, meta byte
  155. if len(kv.UserMeta) > 0 {
  156. userMeta = kv.UserMeta[0]
  157. }
  158. if len(kv.Meta) > 0 {
  159. meta = kv.Meta[0]
  160. }
  161. e := &Entry{
  162. Key: y.KeyWithTs(kv.Key, kv.Version),
  163. Value: kv.Value,
  164. UserMeta: userMeta,
  165. ExpiresAt: kv.ExpiresAt,
  166. meta: meta,
  167. }
  168. estimatedSize := e.estimateSizeAndSetThreshold(l.db.valueThreshold())
  169. // Flush entries if inserting the next entry would overflow the transactional limits.
  170. if int64(len(l.entries))+1 >= l.db.opt.maxBatchCount ||
  171. l.entriesSize+estimatedSize >= l.db.opt.maxBatchSize ||
  172. l.totalSize >= flushThreshold {
  173. if err := l.send(); err != nil {
  174. return err
  175. }
  176. }
  177. l.entries = append(l.entries, e)
  178. l.entriesSize += estimatedSize
  179. l.totalSize += estimatedSize + int64(len(e.Value))
  180. return nil
  181. }
  182. func (l *KVLoader) send() error {
  183. if err := l.throttle.Do(); err != nil {
  184. return err
  185. }
  186. if err := l.db.batchSetAsync(l.entries, func(err error) {
  187. l.throttle.Done(err)
  188. }); err != nil {
  189. return err
  190. }
  191. l.entries = make([]*Entry, 0, l.db.opt.maxBatchCount)
  192. l.entriesSize = 0
  193. l.totalSize = 0
  194. return nil
  195. }
  196. // Finish is meant to be called after all the key-value pairs have been loaded.
  197. func (l *KVLoader) Finish() error {
  198. if len(l.entries) > 0 {
  199. if err := l.send(); err != nil {
  200. return err
  201. }
  202. }
  203. return l.throttle.Finish()
  204. }
// Load reads a protobuf-encoded list of all entries from a reader and writes
// them to the database. This can be used to restore the database from a backup
// made by calling DB.Backup(). If more complex logic is needed to restore a badger
// backup, the KVLoader interface should be used instead.
//
// DB.Load() should be called on a database that is not running any other
// concurrent transactions while it is running.
func (db *DB) Load(r io.Reader, maxPendingWrites int) error {
	br := bufio.NewReaderSize(r, 16<<10)
	// Scratch buffer for decoding; reused across records and grown on demand.
	unmarshalBuf := make([]byte, 1<<10)
	ldr := db.NewKVLoader(maxPendingWrites)
	for {
		// Each record is length-prefixed: a little-endian uint64 size followed
		// by a proto-marshalled pb.KVList (the framing written by writeTo).
		var sz uint64
		err := binary.Read(br, binary.LittleEndian, &sz)
		if err == io.EOF {
			// Clean EOF between records marks the end of the backup stream.
			break
		} else if err != nil {
			return err
		}
		if cap(unmarshalBuf) < int(sz) {
			unmarshalBuf = make([]byte, sz)
		}

		if _, err = io.ReadFull(br, unmarshalBuf[:sz]); err != nil {
			return err
		}

		list := &pb.KVList{}
		if err := proto.Unmarshal(unmarshalBuf[:sz], list); err != nil {
			return err
		}

		for _, kv := range list.Kv {
			if err := ldr.Set(kv); err != nil {
				return err
			}

			// Update nextTxnTs, memtable stores this
			// timestamp in badger head when flushed.
			if kv.Version >= db.orc.nextTxnTs {
				db.orc.nextTxnTs = kv.Version + 1
			}
		}
	}

	if err := ldr.Finish(); err != nil {
		return err
	}
	// Mark all restored versions as committed so reads can see them.
	db.orc.txnMark.Done(db.orc.nextTxnTs - 1)
	return nil
}