backup.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. /*
  2. * Copyright 2017 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package badger
  17. import (
  18. "bufio"
  19. "bytes"
  20. "context"
  21. "encoding/binary"
  22. "io"
  23. "github.com/pkg/errors"
  24. "google.golang.org/protobuf/proto"
  25. "github.com/dgraph-io/badger/v4/pb"
  26. "github.com/dgraph-io/badger/v4/y"
  27. "github.com/dgraph-io/ristretto/v2/z"
  28. )
  29. // flushThreshold determines when a buffer will be flushed. When performing a
  30. // backup/restore, the entries will be batched up until the total size of batch
  31. // is more than flushThreshold or entry size (without the value size) is more
  32. // than the maxBatchSize.
  33. const flushThreshold = 100 << 20
  34. // Backup dumps a protobuf-encoded list of all entries in the database into the
  35. // given writer, that are newer than or equal to the specified version. It
  36. // returns a timestamp (version) indicating the version of last entry that is
  37. // dumped, which after incrementing by 1 can be passed into later invocation to
  38. // generate incremental backup of entries that have been added/modified since
  39. // the last invocation of DB.Backup().
  40. // DB.Backup is a wrapper function over Stream.Backup to generate full and
  41. // incremental backups of the DB. For more control over how many goroutines are
  42. // used to generate the backup, or if you wish to backup only a certain range
  43. // of keys, use Stream.Backup directly.
  44. func (db *DB) Backup(w io.Writer, since uint64) (uint64, error) {
  45. stream := db.NewStream()
  46. stream.LogPrefix = "DB.Backup"
  47. stream.SinceTs = since
  48. return stream.Backup(w, since)
  49. }
// Backup dumps a protobuf-encoded list of all entries in the database into the
// given writer, that are newer than or equal to the specified version. It returns a
// timestamp(version) indicating the version of last entry that was dumped, which
// after incrementing by 1 can be passed into a later invocation to generate an
// incremental dump of entries that have been added/modified since the last
// invocation of Stream.Backup().
//
// This can be used to backup the data in a database at a given point in time.
func (stream *Stream) Backup(w io.Writer, since uint64) (uint64, error) {
	// For each key, collect every visible version. Iteration stops early once
	// a tombstone or a discard-earlier-versions marker makes older versions
	// irrelevant to a restore.
	stream.KeyToList = func(key []byte, itr *Iterator) (*pb.KVList, error) {
		list := &pb.KVList{}
		// Use the iterator's arena allocator so key/value copies share one
		// buffer tied to this batch's lifetime.
		a := itr.Alloc
		for ; itr.Valid(); itr.Next() {
			item := itr.Item()
			if !bytes.Equal(item.Key(), key) {
				// Iterator moved past the requested key; this key is done.
				return list, nil
			}
			if item.Version() < since {
				// The stream was configured with SinceTs = since, so any
				// older version showing up here indicates a bug.
				return nil, errors.Errorf("Backup: Item Version: %d less than sinceTs: %d",
					item.Version(), since)
			}
			var valCopy []byte
			if !item.IsDeletedOrExpired() {
				// No need to copy value, if item is deleted or expired.
				err := item.Value(func(val []byte) error {
					valCopy = a.Copy(val)
					return nil
				})
				if err != nil {
					stream.db.opt.Errorf("Key [%x, %d]. Error while fetching value [%v]\n",
						item.Key(), item.Version(), err)
					return nil, err
				}
			}
			// clear txn bits
			meta := item.meta &^ (bitTxn | bitFinTxn)
			kv := y.NewKV(a)
			*kv = pb.KV{
				Key:       a.Copy(item.Key()),
				Value:     valCopy,
				UserMeta:  a.Copy([]byte{item.UserMeta()}),
				Version:   item.Version(),
				ExpiresAt: item.ExpiresAt(),
				Meta:      a.Copy([]byte{meta}),
			}
			list.Kv = append(list.Kv, kv)

			switch {
			case item.DiscardEarlierVersions():
				// If we need to discard earlier versions of this item, add a delete
				// marker just below the current version.
				list.Kv = append(list.Kv, &pb.KV{
					Key:     item.KeyCopy(nil),
					Version: item.Version() - 1,
					Meta:    []byte{bitDelete},
				})
				return list, nil
			case item.IsDeletedOrExpired():
				// Tombstone recorded above; older versions need not be dumped.
				return list, nil
			}
		}
		return list, nil
	}

	var maxVersion uint64
	// Send serializes each batch to w and tracks the highest version seen, so
	// the caller can resume an incremental backup from maxVersion+1.
	stream.Send = func(buf *z.Buffer) error {
		list, err := BufferToKVList(buf)
		if err != nil {
			return err
		}
		// Filter in place, reusing the slice's backing array.
		out := list.Kv[:0]
		for _, kv := range list.Kv {
			if maxVersion < kv.Version {
				maxVersion = kv.Version
			}
			if !kv.StreamDone {
				// Don't pick stream done changes.
				out = append(out, kv)
			}
		}
		list.Kv = out
		return writeTo(list, w)
	}

	if err := stream.Orchestrate(context.Background()); err != nil {
		return 0, err
	}
	return maxVersion, nil
}
  136. func writeTo(list *pb.KVList, w io.Writer) error {
  137. if err := binary.Write(w, binary.LittleEndian, uint64(proto.Size(list))); err != nil {
  138. return err
  139. }
  140. buf, err := proto.Marshal(list)
  141. if err != nil {
  142. return err
  143. }
  144. _, err = w.Write(buf)
  145. return err
  146. }
// KVLoader is used to write KVList objects in to badger. It can be used to restore a backup.
type KVLoader struct {
	db *DB
	// throttle bounds the number of in-flight asynchronous batch writes.
	throttle *y.Throttle
	// entries is the current pending batch, flushed via send().
	entries []*Entry
	// entriesSize is the estimated size of the pending batch excluding value
	// bytes; compared against db.opt.maxBatchSize.
	entriesSize int64
	// totalSize is the estimated size including value bytes; compared against
	// flushThreshold.
	totalSize int64
}
  155. // NewKVLoader returns a new instance of KVLoader.
  156. func (db *DB) NewKVLoader(maxPendingWrites int) *KVLoader {
  157. return &KVLoader{
  158. db: db,
  159. throttle: y.NewThrottle(maxPendingWrites),
  160. entries: make([]*Entry, 0, db.opt.maxBatchCount),
  161. }
  162. }
  163. // Set writes the key-value pair to the database.
  164. func (l *KVLoader) Set(kv *pb.KV) error {
  165. var userMeta, meta byte
  166. if len(kv.UserMeta) > 0 {
  167. userMeta = kv.UserMeta[0]
  168. }
  169. if len(kv.Meta) > 0 {
  170. meta = kv.Meta[0]
  171. }
  172. e := &Entry{
  173. Key: y.KeyWithTs(kv.Key, kv.Version),
  174. Value: kv.Value,
  175. UserMeta: userMeta,
  176. ExpiresAt: kv.ExpiresAt,
  177. meta: meta,
  178. }
  179. estimatedSize := e.estimateSizeAndSetThreshold(l.db.valueThreshold())
  180. // Flush entries if inserting the next entry would overflow the transactional limits.
  181. if int64(len(l.entries))+1 >= l.db.opt.maxBatchCount ||
  182. l.entriesSize+estimatedSize >= l.db.opt.maxBatchSize ||
  183. l.totalSize >= flushThreshold {
  184. if err := l.send(); err != nil {
  185. return err
  186. }
  187. }
  188. l.entries = append(l.entries, e)
  189. l.entriesSize += estimatedSize
  190. l.totalSize += estimatedSize + int64(len(e.Value))
  191. return nil
  192. }
  193. func (l *KVLoader) send() error {
  194. if err := l.throttle.Do(); err != nil {
  195. return err
  196. }
  197. if err := l.db.batchSetAsync(l.entries, func(err error) {
  198. l.throttle.Done(err)
  199. }); err != nil {
  200. return err
  201. }
  202. l.entries = make([]*Entry, 0, l.db.opt.maxBatchCount)
  203. l.entriesSize = 0
  204. l.totalSize = 0
  205. return nil
  206. }
  207. // Finish is meant to be called after all the key-value pairs have been loaded.
  208. func (l *KVLoader) Finish() error {
  209. if len(l.entries) > 0 {
  210. if err := l.send(); err != nil {
  211. return err
  212. }
  213. }
  214. return l.throttle.Finish()
  215. }
// Load reads a protobuf-encoded list of all entries from a reader and writes
// them to the database. This can be used to restore the database from a backup
// made by calling DB.Backup(). If more complex logic is needed to restore a badger
// backup, the KVLoader interface should be used instead.
//
// DB.Load() should be called on a database that is not running any other
// concurrent transactions while it is running.
func (db *DB) Load(r io.Reader, maxPendingWrites int) error {
	br := bufio.NewReaderSize(r, 16<<10)
	unmarshalBuf := make([]byte, 1<<10)
	ldr := db.NewKVLoader(maxPendingWrites)
	for {
		// Each record is framed as a little-endian uint64 length followed by
		// a marshalled pb.KVList (the format produced by writeTo). A clean
		// EOF at a frame boundary means the backup stream is fully consumed.
		var sz uint64
		err := binary.Read(br, binary.LittleEndian, &sz)
		if err == io.EOF {
			break
		} else if err != nil {
			return err
		}
		// Grow the scratch buffer only when needed. Slicing unmarshalBuf[:sz]
		// below is valid whenever sz <= cap, even if it exceeds the length.
		if cap(unmarshalBuf) < int(sz) {
			unmarshalBuf = make([]byte, sz)
		}
		if _, err = io.ReadFull(br, unmarshalBuf[:sz]); err != nil {
			return err
		}
		list := &pb.KVList{}
		if err := proto.Unmarshal(unmarshalBuf[:sz], list); err != nil {
			return err
		}
		for _, kv := range list.Kv {
			if err := ldr.Set(kv); err != nil {
				return err
			}
			// Update nextTxnTs, memtable stores this
			// timestamp in badger head when flushed.
			if kv.Version >= db.orc.nextTxnTs {
				db.orc.nextTxnTs = kv.Version + 1
			}
		}
	}
	if err := ldr.Finish(); err != nil {
		return err
	}
	// Advance the transaction watermark so the restored versions become
	// visible to subsequent reads.
	db.orc.txnMark.Done(db.orc.nextTxnTs - 1)
	return nil
}