stream_writer.go

/*
 * Copyright 2019 Dgraph Labs, Inc. and Contributors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package badger

import (
	"encoding/hex"
	"fmt"
	"sync"

	humanize "github.com/dustin/go-humanize"
	"github.com/pkg/errors"
	"google.golang.org/protobuf/proto"

	"github.com/dgraph-io/badger/v4/pb"
	"github.com/dgraph-io/badger/v4/table"
	"github.com/dgraph-io/badger/v4/y"
	"github.com/dgraph-io/ristretto/v2/z"
)

// StreamWriter is used to write data coming from multiple streams. The streams must not have any
// overlapping key ranges. Within each stream, the keys must be sorted. The Badger Stream framework
// is capable of generating such output. So, this StreamWriter can be used at the other end to
// build BadgerDB at a much faster pace, by writing SSTables (and value logs) directly to LSM tree
// levels without causing any compactions at all. This is way faster than using a batched writer or
// transactions, but it is only applicable in situations where the keys are pre-sorted and the DB
// is being bootstrapped. Existing data would get deleted when using this writer. So, this is only
// useful when restoring from backup or replicating a DB across servers.
//
// StreamWriter should not be called on in-use DB instances. It is designed only to bootstrap new
// DBs.
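//
// A minimal usage sketch (hypothetical; error handling is elided and buf is assumed to be a
// *z.Buffer of marshalled pb.KVs, as described on Write):
//
//	sw := db.NewStreamWriter()
//	defer sw.Cancel() // unblocks pending writes on early return; safe even after Flush
//	if err := sw.Prepare(); err != nil {
//		return err
//	}
//	if err := sw.Write(buf); err != nil {
//		return err
//	}
//	if err := sw.Flush(); err != nil {
//		return err
//	}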
type StreamWriter struct {
	writeLock  sync.Mutex
	db         *DB
	done       func()
	throttle   *y.Throttle
	maxVersion uint64
	writers    map[uint32]*sortedWriter
	prevLevel  int
}

// NewStreamWriter creates a StreamWriter. Right after creating a StreamWriter, Prepare must be
// called. The memory usage of a StreamWriter is directly proportional to the number of streams
// possible. So, efforts must be made to keep the number of streams low. The Stream framework
// would typically use 16 goroutines and hence create 16 streams.
func (db *DB) NewStreamWriter() *StreamWriter {
	return &StreamWriter{
		db: db,
		// throttle shouldn't make much difference. Memory consumption is based on the number of
		// concurrent streams being processed.
		throttle: y.NewThrottle(16),
		writers:  make(map[uint32]*sortedWriter),
	}
}

// Prepare should be called before writing any entry to StreamWriter. It deletes all data present
// in the existing DB, and stops compactions and any writes being done by other means. Be very
// careful when calling Prepare, because it could result in permanent data loss. Not calling
// Prepare would result in a corrupt Badger instance. Use PrepareIncremental to do an incremental
// stream write.
func (sw *StreamWriter) Prepare() error {
	sw.writeLock.Lock()
	defer sw.writeLock.Unlock()

	done, err := sw.db.dropAll()
	// Ensure that done() is never called more than once.
	var once sync.Once
	sw.done = func() { once.Do(done) }
	return err
}

// PrepareIncremental should be called before writing any entry to StreamWriter incrementally.
// In an incremental stream write, the tables are written at one level above the current base
// level.
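//
// A sketch of the incremental path (hypothetical; PrepareIncremental itself verifies that the
// memtables are empty before proceeding):
//
//	sw := db.NewStreamWriter()
//	if err := sw.PrepareIncremental(); err != nil {
//		return err // e.g. fails if a MemTable still holds data
//	}
//	// Subsequent Write/Flush calls place the new tables one level above the
//	// current base level.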
func (sw *StreamWriter) PrepareIncremental() error {
	sw.writeLock.Lock()
	defer sw.writeLock.Unlock()

	// Ensure that done() is never called more than once.
	var once sync.Once

	// prepareToDrop will stop all the incoming writes and process any pending flush tasks.
	// Before we start writing, we'll stop the compactions because no one else should be writing
	// to the same level as the stream writer is writing to.
	f, err := sw.db.prepareToDrop()
	if err != nil {
		sw.done = func() { once.Do(f) }
		return err
	}
	sw.db.stopCompactions()
	done := func() {
		sw.db.startCompactions()
		f()
	}
	sw.done = func() { once.Do(done) }

	mts, decr := sw.db.getMemTables()
	defer decr()
	for _, m := range mts {
		if !m.sl.Empty() {
			return fmt.Errorf("unable to do incremental writes because MemTable has data")
		}
	}

	isEmptyDB := true
	for _, level := range sw.db.Levels() {
		if level.NumTables > 0 {
			sw.prevLevel = level.Level
			isEmptyDB = false
			break
		}
	}
	if isEmptyDB {
		// If the DB is empty, we should allow doing an incremental stream write.
		return nil
	}
	if sw.prevLevel == 0 {
		// It seems that data is present in all levels from Lmax to L0. If we call flatten
		// on the tree, all the data will go to Lmax. All the levels above will be empty
		// after the flatten call. Now, we should be able to use the incremental stream
		// writer again.
		if err := sw.db.Flatten(3); err != nil {
			return errors.Wrapf(err, "error during flatten in StreamWriter")
		}
		sw.prevLevel = len(sw.db.Levels()) - 1
	}
	return nil
}

// Write writes a KVList to the DB. Each KV within the list contains the stream id, which
// StreamWriter uses to demux the writes. Write is thread safe and can be called concurrently by
// multiple goroutines.
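//
// A sketch of how a caller might encode KVs into the buffer (hypothetical values; KVToBuffer is
// the helper the Stream framework uses to append a length-prefixed, marshalled pb.KV):
//
//	buf := z.NewBuffer(1<<20, "StreamWriter.Example")
//	defer func() { _ = buf.Release() }()
//	KVToBuffer(&pb.KV{
//		Key:      []byte("key"), // keys must arrive sorted within a stream
//		Value:    []byte("value"),
//		Version:  1, // Write appends this timestamp to the key itself
//		StreamId: 1,
//	}, buf)
//	// A KV with StreamDone set to true marks stream 1 as closed.
//	err := sw.Write(buf)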
func (sw *StreamWriter) Write(buf *z.Buffer) error {
	if buf.LenNoPadding() == 0 {
		return nil
	}

	// closedStreams keeps track of all streams which are going to be marked as done. We are
	// keeping track of all streams so that we can close them at the end, after inserting all
	// the valid kvs.
	closedStreams := make(map[uint32]struct{})
	streamReqs := make(map[uint32]*request)

	err := buf.SliceIterate(func(s []byte) error {
		var kv pb.KV
		if err := proto.Unmarshal(s, &kv); err != nil {
			return err
		}
		if kv.StreamDone {
			closedStreams[kv.StreamId] = struct{}{}
			return nil
		}
		// Panic if some kv comes after the stream has been marked as closed.
		if _, ok := closedStreams[kv.StreamId]; ok {
			panic(fmt.Sprintf("write performed on closed stream: %d", kv.StreamId))
		}

		sw.writeLock.Lock()
		if sw.maxVersion < kv.Version {
			sw.maxVersion = kv.Version
		}
		if sw.prevLevel == 0 {
			// If prevLevel is 0, that means we have not written anything yet.
			// So, we can write to the max level. newWriter writes to prevLevel - 1,
			// so we can set prevLevel to len(levels).
			sw.prevLevel = len(sw.db.lc.levels)
		}
		sw.writeLock.Unlock()

		var meta, userMeta byte
		if len(kv.Meta) > 0 {
			meta = kv.Meta[0]
		}
		if len(kv.UserMeta) > 0 {
			userMeta = kv.UserMeta[0]
		}
		e := &Entry{
			Key:       y.KeyWithTs(kv.Key, kv.Version),
			Value:     y.Copy(kv.Value),
			UserMeta:  userMeta,
			ExpiresAt: kv.ExpiresAt,
			meta:      meta,
		}
		// If the value can be collocated with the key in the LSM tree, we can skip
		// writing the value to the value log.
		req := streamReqs[kv.StreamId]
		if req == nil {
			req = &request{}
			streamReqs[kv.StreamId] = req
		}
		req.Entries = append(req.Entries, e)
		return nil
	})
	if err != nil {
		return err
	}

	all := make([]*request, 0, len(streamReqs))
	for _, req := range streamReqs {
		all = append(all, req)
	}

	sw.writeLock.Lock()
	defer sw.writeLock.Unlock()

	// We are writing all requests to the vlog even if some requests belong to already closed
	// streams. It is safe to do so because we panic while writing to the sorted writer, which
	// will be nil for a closed stream. At restart, the stream writer will drop all the data in
	// the Prepare function.
	if err := sw.db.vlog.write(all); err != nil {
		return err
	}

	for streamID, req := range streamReqs {
		writer, ok := sw.writers[streamID]
		if !ok {
			var err error
			writer, err = sw.newWriter(streamID)
			if err != nil {
				return y.Wrapf(err, "failed to create writer with ID %d", streamID)
			}
			sw.writers[streamID] = writer
		}

		if writer == nil {
			panic(fmt.Sprintf("write performed on closed stream: %d", streamID))
		}

		writer.reqCh <- req
	}

	// Now we can close any streams if required. We set the writer for
	// the closed streams to nil.
	for streamId := range closedStreams {
		writer, ok := sw.writers[streamId]
		if !ok {
			sw.db.opt.Warningf("Trying to close stream: %d, but no sorted "+
				"writer found for it", streamId)
			continue
		}

		writer.closer.SignalAndWait()
		if err := writer.Done(); err != nil {
			return err
		}
		sw.writers[streamId] = nil
	}
	return nil
}
// Flush is called once we are done writing all the entries. It syncs DB directories. It also
// updates Oracle with the maxVersion found in all entries (if the DB is not managed).
func (sw *StreamWriter) Flush() error {
	sw.writeLock.Lock()
	defer sw.writeLock.Unlock()

	defer sw.done()

	for _, writer := range sw.writers {
		if writer != nil {
			writer.closer.SignalAndWait()
		}
	}

	for _, writer := range sw.writers {
		if writer == nil {
			continue
		}
		if err := writer.Done(); err != nil {
			return err
		}
	}

	if !sw.db.opt.managedTxns {
		if sw.db.orc != nil {
			sw.db.orc.Stop()
		}
		if curMax := sw.db.orc.readTs(); curMax >= sw.maxVersion {
			sw.maxVersion = curMax
		}
		sw.db.orc = newOracle(sw.db.opt)
		sw.db.orc.nextTxnTs = sw.maxVersion
		sw.db.orc.txnMark.Done(sw.maxVersion)
		sw.db.orc.readMark.Done(sw.maxVersion)
		sw.db.orc.incrementNextTs()
	}

	// Wait for all files to be written.
	if err := sw.throttle.Finish(); err != nil {
		return err
	}

	// Sort tables at the end.
	for _, l := range sw.db.lc.levels {
		l.sortTables()
	}

	// Now sync the directories, so all the files are registered.
	if sw.db.opt.ValueDir != sw.db.opt.Dir {
		if err := sw.db.syncDir(sw.db.opt.ValueDir); err != nil {
			return err
		}
	}
	if err := sw.db.syncDir(sw.db.opt.Dir); err != nil {
		return err
	}
	return sw.db.lc.validate()
}

// Cancel signals all goroutines to exit. Calling defer sw.Cancel() immediately after creating a
// new StreamWriter ensures that writes are unblocked even upon early return. Note that dropAll()
// is not called here, so any partially written data will not be erased until a new StreamWriter
// is initialized.
func (sw *StreamWriter) Cancel() {
	sw.writeLock.Lock()
	defer sw.writeLock.Unlock()

	for _, writer := range sw.writers {
		if writer != nil {
			writer.closer.Signal()
		}
	}
	for _, writer := range sw.writers {
		if writer != nil {
			writer.closer.Wait()
		}
	}

	if err := sw.throttle.Finish(); err != nil {
		sw.db.opt.Errorf("error in throttle.Finish: %+v", err)
	}

	// Handle Cancel() being called before Prepare().
	if sw.done != nil {
		sw.done()
	}
}

type sortedWriter struct {
	db       *DB
	throttle *y.Throttle
	opts     table.Options

	builder  *table.Builder
	lastKey  []byte
	level    int
	streamID uint32
	reqCh    chan *request
	// Have a separate closer for each writer, as it can be closed at any time.
	closer *z.Closer
}
func (sw *StreamWriter) newWriter(streamID uint32) (*sortedWriter, error) {
	bopts := buildTableOptions(sw.db)
	// Scale the table size up to what it would be at the bottommost level, since these tables
	// are written directly to the lower levels of the tree.
	for i := 2; i < sw.db.opt.MaxLevels; i++ {
		bopts.TableSize *= uint64(sw.db.opt.TableSizeMultiplier)
	}
	w := &sortedWriter{
		db:       sw.db,
		opts:     bopts,
		streamID: streamID,
		throttle: sw.throttle,
		builder:  table.NewTableBuilder(bopts),
		reqCh:    make(chan *request, 3),
		closer:   z.NewCloser(1),
		level:    sw.prevLevel - 1, // Write at the level just above the one we were writing to.
	}
	go w.handleRequests()
	return w, nil
}
func (w *sortedWriter) handleRequests() {
	defer w.closer.Done()

	process := func(req *request) {
		for i, e := range req.Entries {
			// If badger is running in InMemory mode, len(req.Ptrs) == 0.
			var vs y.ValueStruct
			if e.skipVlogAndSetThreshold(w.db.valueThreshold()) {
				vs = y.ValueStruct{
					Value:     e.Value,
					Meta:      e.meta,
					UserMeta:  e.UserMeta,
					ExpiresAt: e.ExpiresAt,
				}
			} else {
				vptr := req.Ptrs[i]
				vs = y.ValueStruct{
					Value:     vptr.Encode(),
					Meta:      e.meta | bitValuePointer,
					UserMeta:  e.UserMeta,
					ExpiresAt: e.ExpiresAt,
				}
			}
			if err := w.Add(e.Key, vs); err != nil {
				panic(err)
			}
		}
	}

	for {
		select {
		case req := <-w.reqCh:
			process(req)
		case <-w.closer.HasBeenClosed():
			close(w.reqCh)
			for req := range w.reqCh {
				process(req)
			}
			return
		}
	}
}

// Add adds a key and vs to the sortedWriter.
func (w *sortedWriter) Add(key []byte, vs y.ValueStruct) error {
	if len(w.lastKey) > 0 && y.CompareKeys(key, w.lastKey) <= 0 {
		return errors.Errorf("keys not in sorted order (last key: %s, key: %s)",
			hex.Dump(w.lastKey), hex.Dump(key))
	}

	sameKey := y.SameKey(key, w.lastKey)
	// Same keys should go into the same SSTable.
	if !sameKey && w.builder.ReachedCapacity() {
		if err := w.send(false); err != nil {
			return err
		}
	}

	w.lastKey = y.SafeCopy(w.lastKey, key)
	var vp valuePointer
	if vs.Meta&bitValuePointer > 0 {
		vp.Decode(vs.Value)
	}
	w.builder.Add(key, vs, vp.Len)
	return nil
}

func (w *sortedWriter) send(done bool) error {
	if err := w.throttle.Do(); err != nil {
		return err
	}
	go func(builder *table.Builder) {
		err := w.createTable(builder)
		w.throttle.Done(err)
	}(w.builder)
	// If done is true, this indicates we can close the writer.
	// No need to allocate an underlying TableBuilder now.
	if done {
		w.builder = nil
		return nil
	}
	w.builder = table.NewTableBuilder(w.opts)
	return nil
}

// Done is called once we are done writing all keys and valueStructs
// to sortedWriter. It completes writing the current SST to disk.
func (w *sortedWriter) Done() error {
	if w.builder.Empty() {
		w.builder.Close()
		// Assign builder as nil, so that the underlying memory can be garbage collected.
		w.builder = nil
		return nil
	}
	return w.send(true)
}

func (w *sortedWriter) createTable(builder *table.Builder) error {
	defer builder.Close()
	if builder.Empty() {
		builder.Finish()
		return nil
	}

	fileID := w.db.lc.reserveFileID()
	var tbl *table.Table
	if w.db.opt.InMemory {
		data := builder.Finish()
		var err error
		if tbl, err = table.OpenInMemoryTable(data, fileID, builder.Opts()); err != nil {
			return err
		}
	} else {
		var err error
		fname := table.NewFilename(fileID, w.db.opt.Dir)
		if tbl, err = table.CreateTable(fname, builder); err != nil {
			return err
		}
	}

	lc := w.db.lc
	lhandler := lc.levels[w.level]

	// Now that the table can be opened successfully, let's add it to the MANIFEST.
	change := &pb.ManifestChange{
		Id:          tbl.ID(),
		KeyId:       tbl.KeyID(),
		Op:          pb.ManifestChange_CREATE,
		Level:       uint32(lhandler.level),
		Compression: uint32(tbl.CompressionType()),
	}
	if err := w.db.manifest.addChanges([]*pb.ManifestChange{change}); err != nil {
		return err
	}

	// We are not calling lhandler.replaceTables() here, as it sorts tables on every addition.
	// We can sort all tables only once, during the Flush() call.
	lhandler.addTable(tbl)

	// Release the ref held by OpenTable.
	_ = tbl.DecrRef()
	w.db.opt.Infof("Table created: %d at level: %d for stream: %d. Size: %s\n",
		fileID, lhandler.level, w.streamID, humanize.IBytes(uint64(tbl.Size())))
	return nil
}