txn.go 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821
  1. /*
  2. * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
  3. * SPDX-License-Identifier: Apache-2.0
  4. */
  5. package badger
  6. import (
  7. "bytes"
  8. "context"
  9. "encoding/hex"
  10. "errors"
  11. "fmt"
  12. "math"
  13. "sort"
  14. "strconv"
  15. "sync"
  16. "sync/atomic"
  17. "github.com/dgraph-io/badger/v4/y"
  18. "github.com/dgraph-io/ristretto/v2/z"
  19. )
  20. type oracle struct {
  21. isManaged bool // Does not change value, so no locking required.
  22. detectConflicts bool // Determines if the txns should be checked for conflicts.
  23. sync.Mutex // For nextTxnTs and commits.
  24. // writeChLock lock is for ensuring that transactions go to the write
  25. // channel in the same order as their commit timestamps.
  26. writeChLock sync.Mutex
  27. nextTxnTs uint64
  28. // Used to block NewTransaction, so all previous commits are visible to a new read.
  29. txnMark *y.WaterMark
  30. // Either of these is used to determine which versions can be permanently
  31. // discarded during compaction.
  32. discardTs uint64 // Used by ManagedDB.
  33. readMark *y.WaterMark // Used by DB.
  34. // committedTxns contains all committed writes (contains fingerprints
  35. // of keys written and their latest commit counter).
  36. committedTxns []committedTxn
  37. lastCleanupTs uint64
  38. // closer is used to stop watermarks.
  39. closer *z.Closer
  40. }
// committedTxn is one record in oracle.committedTxns: a commit timestamp
// together with the set of key fingerprints written at that timestamp.
type committedTxn struct {
	// ts is the commit timestamp of the transaction.
	ts uint64
	// conflictKeys holds the fingerprints of the keys written at timestamp ts.
	// hasConflict looks read fingerprints up in this set.
	conflictKeys map[uint64]struct{}
}
  46. func newOracle(opt Options) *oracle {
  47. orc := &oracle{
  48. isManaged: opt.managedTxns,
  49. detectConflicts: opt.DetectConflicts,
  50. // We're not initializing nextTxnTs and readOnlyTs. It would be done after replay in Open.
  51. //
  52. // WaterMarks must be 64-bit aligned for atomic package, hence we must use pointers here.
  53. // See https://golang.org/pkg/sync/atomic/#pkg-note-BUG.
  54. readMark: &y.WaterMark{Name: "badger.PendingReads"},
  55. txnMark: &y.WaterMark{Name: "badger.TxnTimestamp"},
  56. closer: z.NewCloser(2),
  57. }
  58. orc.readMark.Init(orc.closer)
  59. orc.txnMark.Init(orc.closer)
  60. return orc
  61. }
// Stop signals the oracle's closer and waits on it. The same closer is the
// one handed to both watermarks when the oracle is constructed.
func (o *oracle) Stop() {
	o.closer.SignalAndWait()
}
  65. func (o *oracle) readTs() uint64 {
  66. if o.isManaged {
  67. panic("ReadTs should not be retrieved for managed DB")
  68. }
  69. var readTs uint64
  70. o.Lock()
  71. readTs = o.nextTxnTs - 1
  72. o.readMark.Begin(readTs)
  73. o.Unlock()
  74. // Wait for all txns which have no conflicts, have been assigned a commit
  75. // timestamp and are going through the write to value log and LSM tree
  76. // process. Not waiting here could mean that some txns which have been
  77. // committed would not be read.
  78. y.Check(o.txnMark.WaitForMark(context.Background(), readTs))
  79. return readTs
  80. }
  81. func (o *oracle) nextTs() uint64 {
  82. o.Lock()
  83. defer o.Unlock()
  84. return o.nextTxnTs
  85. }
  86. func (o *oracle) incrementNextTs() {
  87. o.Lock()
  88. defer o.Unlock()
  89. o.nextTxnTs++
  90. }
  91. // Any deleted or invalid versions at or below ts would be discarded during
  92. // compaction to reclaim disk space in LSM tree and thence value log.
  93. func (o *oracle) setDiscardTs(ts uint64) {
  94. o.Lock()
  95. defer o.Unlock()
  96. o.discardTs = ts
  97. o.cleanupCommittedTransactions()
  98. }
  99. func (o *oracle) discardAtOrBelow() uint64 {
  100. if o.isManaged {
  101. o.Lock()
  102. defer o.Unlock()
  103. return o.discardTs
  104. }
  105. return o.readMark.DoneUntil()
  106. }
  107. // hasConflict must be called while having a lock.
  108. func (o *oracle) hasConflict(txn *Txn) bool {
  109. if len(txn.reads) == 0 {
  110. return false
  111. }
  112. for _, committedTxn := range o.committedTxns {
  113. // If the committedTxn.ts is less than txn.readTs that implies that the
  114. // committedTxn finished before the current transaction started.
  115. // We don't need to check for conflict in that case.
  116. // This change assumes linearizability. Lack of linearizability could
  117. // cause the read ts of a new txn to be lower than the commit ts of
  118. // a txn before it (@mrjn).
  119. if committedTxn.ts <= txn.readTs {
  120. continue
  121. }
  122. for _, ro := range txn.reads {
  123. if _, has := committedTxn.conflictKeys[ro]; has {
  124. return true
  125. }
  126. }
  127. }
  128. return false
  129. }
  130. func (o *oracle) newCommitTs(txn *Txn) (uint64, bool) {
  131. o.Lock()
  132. defer o.Unlock()
  133. if o.hasConflict(txn) {
  134. return 0, true
  135. }
  136. var ts uint64
  137. if !o.isManaged {
  138. o.doneRead(txn)
  139. o.cleanupCommittedTransactions()
  140. // This is the general case, when user doesn't specify the read and commit ts.
  141. ts = o.nextTxnTs
  142. o.nextTxnTs++
  143. o.txnMark.Begin(ts)
  144. } else {
  145. // If commitTs is set, use it instead.
  146. ts = txn.commitTs
  147. }
  148. y.AssertTrue(ts >= o.lastCleanupTs)
  149. if o.detectConflicts {
  150. // We should ensure that txns are not added to o.committedTxns slice when
  151. // conflict detection is disabled otherwise this slice would keep growing.
  152. o.committedTxns = append(o.committedTxns, committedTxn{
  153. ts: ts,
  154. conflictKeys: txn.conflictKeys,
  155. })
  156. }
  157. return ts, false
  158. }
  159. func (o *oracle) doneRead(txn *Txn) {
  160. if !txn.doneRead {
  161. txn.doneRead = true
  162. o.readMark.Done(txn.readTs)
  163. }
  164. }
  165. func (o *oracle) cleanupCommittedTransactions() { // Must be called under o.Lock
  166. if !o.detectConflicts {
  167. // When detectConflicts is set to false, we do not store any
  168. // committedTxns and so there's nothing to clean up.
  169. return
  170. }
  171. // Same logic as discardAtOrBelow but unlocked
  172. var maxReadTs uint64
  173. if o.isManaged {
  174. maxReadTs = o.discardTs
  175. } else {
  176. maxReadTs = o.readMark.DoneUntil()
  177. }
  178. y.AssertTrue(maxReadTs >= o.lastCleanupTs)
  179. // do not run clean up if the maxReadTs (read timestamp of the
  180. // oldest transaction that is still in flight) has not increased
  181. if maxReadTs == o.lastCleanupTs {
  182. return
  183. }
  184. o.lastCleanupTs = maxReadTs
  185. tmp := o.committedTxns[:0]
  186. for _, txn := range o.committedTxns {
  187. if txn.ts <= maxReadTs {
  188. continue
  189. }
  190. tmp = append(tmp, txn)
  191. }
  192. o.committedTxns = tmp
  193. }
  194. func (o *oracle) doneCommit(cts uint64) {
  195. if o.isManaged {
  196. // No need to update anything.
  197. return
  198. }
  199. o.txnMark.Done(cts)
  200. }
  201. // Txn represents a Badger transaction.
  202. type Txn struct {
  203. readTs uint64
  204. commitTs uint64
  205. size int64
  206. count int64
  207. db *DB
  208. reads []uint64 // contains fingerprints of keys read.
  209. // contains fingerprints of keys written. This is used for conflict detection.
  210. conflictKeys map[uint64]struct{}
  211. readsLock sync.Mutex // guards the reads slice. See addReadKey.
  212. pendingWrites map[string]*Entry // cache stores any writes done by txn.
  213. duplicateWrites []*Entry // Used in managed mode to store duplicate entries.
  214. numIterators atomic.Int32
  215. discarded bool
  216. doneRead bool
  217. update bool // update is used to conditionally keep track of reads.
  218. }
  219. type pendingWritesIterator struct {
  220. entries []*Entry
  221. nextIdx int
  222. readTs uint64
  223. reversed bool
  224. }
  225. func (pi *pendingWritesIterator) Next() {
  226. pi.nextIdx++
  227. }
  228. func (pi *pendingWritesIterator) Rewind() {
  229. pi.nextIdx = 0
  230. }
  231. func (pi *pendingWritesIterator) Seek(key []byte) {
  232. key = y.ParseKey(key)
  233. pi.nextIdx = sort.Search(len(pi.entries), func(idx int) bool {
  234. cmp := bytes.Compare(pi.entries[idx].Key, key)
  235. if !pi.reversed {
  236. return cmp >= 0
  237. }
  238. return cmp <= 0
  239. })
  240. }
  241. func (pi *pendingWritesIterator) Key() []byte {
  242. y.AssertTrue(pi.Valid())
  243. entry := pi.entries[pi.nextIdx]
  244. return y.KeyWithTs(entry.Key, pi.readTs)
  245. }
  246. func (pi *pendingWritesIterator) Value() y.ValueStruct {
  247. y.AssertTrue(pi.Valid())
  248. entry := pi.entries[pi.nextIdx]
  249. return y.ValueStruct{
  250. Value: entry.Value,
  251. Meta: entry.meta,
  252. UserMeta: entry.UserMeta,
  253. ExpiresAt: entry.ExpiresAt,
  254. Version: pi.readTs,
  255. }
  256. }
  257. func (pi *pendingWritesIterator) Valid() bool {
  258. return pi.nextIdx < len(pi.entries)
  259. }
  260. func (pi *pendingWritesIterator) Close() error {
  261. return nil
  262. }
  263. func (txn *Txn) newPendingWritesIterator(reversed bool) *pendingWritesIterator {
  264. if !txn.update || len(txn.pendingWrites) == 0 {
  265. return nil
  266. }
  267. entries := make([]*Entry, 0, len(txn.pendingWrites))
  268. for _, e := range txn.pendingWrites {
  269. entries = append(entries, e)
  270. }
  271. // Number of pending writes per transaction shouldn't be too big in general.
  272. sort.Slice(entries, func(i, j int) bool {
  273. cmp := bytes.Compare(entries[i].Key, entries[j].Key)
  274. if !reversed {
  275. return cmp < 0
  276. }
  277. return cmp > 0
  278. })
  279. return &pendingWritesIterator{
  280. readTs: txn.readTs,
  281. entries: entries,
  282. reversed: reversed,
  283. }
  284. }
  285. func (txn *Txn) checkSize(e *Entry) error {
  286. count := txn.count + 1
  287. // Extra bytes for the version in key.
  288. size := txn.size + e.estimateSizeAndSetThreshold(txn.db.valueThreshold()) + 10
  289. if count >= txn.db.opt.maxBatchCount || size >= txn.db.opt.maxBatchSize {
  290. return ErrTxnTooBig
  291. }
  292. txn.count, txn.size = count, size
  293. return nil
  294. }
  295. func exceedsSize(prefix string, max int64, key []byte) error {
  296. return fmt.Errorf("%s with size %d exceeded %d limit. %s:\n%s",
  297. prefix, len(key), max, prefix, hex.Dump(key[:1<<10]))
  298. }
  299. func (txn *Txn) modify(e *Entry) error {
  300. const maxKeySize = 65000
  301. switch {
  302. case !txn.update:
  303. return ErrReadOnlyTxn
  304. case txn.discarded:
  305. return ErrDiscardedTxn
  306. case len(e.Key) == 0:
  307. return ErrEmptyKey
  308. case bytes.HasPrefix(e.Key, badgerPrefix):
  309. return ErrInvalidKey
  310. case len(e.Key) > maxKeySize:
  311. // Key length can't be more than uint16, as determined by table::header. To
  312. // keep things safe and allow badger move prefix and a timestamp suffix, let's
  313. // cut it down to 65000, instead of using 65536.
  314. return exceedsSize("Key", maxKeySize, e.Key)
  315. case int64(len(e.Value)) > txn.db.opt.ValueLogFileSize:
  316. return exceedsSize("Value", txn.db.opt.ValueLogFileSize, e.Value)
  317. case txn.db.opt.InMemory && int64(len(e.Value)) > txn.db.valueThreshold():
  318. return exceedsSize("Value", txn.db.valueThreshold(), e.Value)
  319. }
  320. if err := txn.db.isBanned(e.Key); err != nil {
  321. return err
  322. }
  323. if err := txn.checkSize(e); err != nil {
  324. return err
  325. }
  326. // The txn.conflictKeys is used for conflict detection. If conflict detection
  327. // is disabled, we don't need to store key hashes in this map.
  328. if txn.db.opt.DetectConflicts {
  329. fp := z.MemHash(e.Key) // Avoid dealing with byte arrays.
  330. txn.conflictKeys[fp] = struct{}{}
  331. }
  332. // If a duplicate entry was inserted in managed mode, move it to the duplicate writes slice.
  333. // Add the entry to duplicateWrites only if both the entries have different versions. For
  334. // same versions, we will overwrite the existing entry.
  335. if oldEntry, ok := txn.pendingWrites[string(e.Key)]; ok && oldEntry.version != e.version {
  336. txn.duplicateWrites = append(txn.duplicateWrites, oldEntry)
  337. }
  338. txn.pendingWrites[string(e.Key)] = e
  339. return nil
  340. }
  341. // Set adds a key-value pair to the database.
  342. // It will return ErrReadOnlyTxn if update flag was set to false when creating the transaction.
  343. //
  344. // The current transaction keeps a reference to the key and val byte slice
  345. // arguments. Users must not modify key and val until the end of the transaction.
  346. func (txn *Txn) Set(key, val []byte) error {
  347. return txn.SetEntry(NewEntry(key, val))
  348. }
// SetEntry takes an Entry struct and adds the key-value pair in the struct,
// along with its other metadata, to the database. Validation and staging are
// delegated to modify, whose error is returned unchanged.
//
// The current transaction keeps a reference to the entry passed as argument.
// Users must not modify the entry until the end of the transaction.
func (txn *Txn) SetEntry(e *Entry) error {
	return txn.modify(e)
}
  357. // Delete deletes a key.
  358. //
  359. // This is done by adding a delete marker for the key at commit timestamp. Any
  360. // reads happening before this timestamp would be unaffected. Any reads after
  361. // this commit would see the deletion.
  362. //
  363. // The current transaction keeps a reference to the key byte slice argument.
  364. // Users must not modify the key until the end of the transaction.
  365. func (txn *Txn) Delete(key []byte) error {
  366. e := &Entry{
  367. Key: key,
  368. meta: bitDelete,
  369. }
  370. return txn.modify(e)
  371. }
  372. // Get looks for key and returns corresponding Item.
  373. // If key is not found, ErrKeyNotFound is returned.
  374. func (txn *Txn) Get(key []byte) (item *Item, rerr error) {
  375. if len(key) == 0 {
  376. return nil, ErrEmptyKey
  377. } else if txn.discarded {
  378. return nil, ErrDiscardedTxn
  379. }
  380. if err := txn.db.isBanned(key); err != nil {
  381. return nil, err
  382. }
  383. item = new(Item)
  384. if txn.update {
  385. if e, has := txn.pendingWrites[string(key)]; has && bytes.Equal(key, e.Key) {
  386. if isDeletedOrExpired(e.meta, e.ExpiresAt) {
  387. return nil, ErrKeyNotFound
  388. }
  389. // Fulfill from cache.
  390. item.meta = e.meta
  391. item.val = e.Value
  392. item.userMeta = e.UserMeta
  393. item.key = key
  394. item.status = prefetched
  395. item.version = txn.readTs
  396. item.expiresAt = e.ExpiresAt
  397. // We probably don't need to set db on item here.
  398. return item, nil
  399. }
  400. // Only track reads if this is update txn. No need to track read if txn serviced it
  401. // internally.
  402. txn.addReadKey(key)
  403. }
  404. seek := y.KeyWithTs(key, txn.readTs)
  405. vs, err := txn.db.get(seek)
  406. if err != nil {
  407. return nil, y.Wrapf(err, "DB::Get key: %q", key)
  408. }
  409. if vs.Value == nil && vs.Meta == 0 {
  410. return nil, ErrKeyNotFound
  411. }
  412. if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) {
  413. return nil, ErrKeyNotFound
  414. }
  415. item.key = key
  416. item.version = vs.Version
  417. item.meta = vs.Meta
  418. item.userMeta = vs.UserMeta
  419. item.vptr = y.SafeCopy(item.vptr, vs.Value)
  420. item.txn = txn
  421. item.expiresAt = vs.ExpiresAt
  422. return item, nil
  423. }
  424. func (txn *Txn) addReadKey(key []byte) {
  425. if txn.update {
  426. fp := z.MemHash(key)
  427. // Because of the possibility of multiple iterators it is now possible
  428. // for multiple threads within a read-write transaction to read keys at
  429. // the same time. The reads slice is not currently thread-safe and
  430. // needs to be locked whenever we mark a key as read.
  431. txn.readsLock.Lock()
  432. txn.reads = append(txn.reads, fp)
  433. txn.readsLock.Unlock()
  434. }
  435. }
  436. // Discard discards a created transaction. This method is very important and must be called. Commit
  437. // method calls this internally, however, calling this multiple times doesn't cause any issues. So,
  438. // this can safely be called via a defer right when transaction is created.
  439. //
  440. // NOTE: If any operations are run on a discarded transaction, ErrDiscardedTxn is returned.
  441. func (txn *Txn) Discard() {
  442. if txn.discarded { // Avoid a re-run.
  443. return
  444. }
  445. if txn.numIterators.Load() > 0 {
  446. panic("Unclosed iterator at time of Txn.Discard.")
  447. }
  448. txn.discarded = true
  449. if !txn.db.orc.isManaged {
  450. txn.db.orc.doneRead(txn)
  451. }
  452. }
  453. func (txn *Txn) commitAndSend() (func() error, error) {
  454. orc := txn.db.orc
  455. // Ensure that the order in which we get the commit timestamp is the same as
  456. // the order in which we push these updates to the write channel. So, we
  457. // acquire a writeChLock before getting a commit timestamp, and only release
  458. // it after pushing the entries to it.
  459. orc.writeChLock.Lock()
  460. defer orc.writeChLock.Unlock()
  461. commitTs, conflict := orc.newCommitTs(txn)
  462. if conflict {
  463. return nil, ErrConflict
  464. }
  465. keepTogether := true
  466. setVersion := func(e *Entry) {
  467. if e.version == 0 {
  468. e.version = commitTs
  469. } else {
  470. keepTogether = false
  471. }
  472. }
  473. for _, e := range txn.pendingWrites {
  474. setVersion(e)
  475. }
  476. // The duplicateWrites slice will be non-empty only if there are duplicate
  477. // entries with different versions.
  478. for _, e := range txn.duplicateWrites {
  479. setVersion(e)
  480. }
  481. entries := make([]*Entry, 0, len(txn.pendingWrites)+len(txn.duplicateWrites)+1)
  482. processEntry := func(e *Entry) {
  483. // Suffix the keys with commit ts, so the key versions are sorted in
  484. // descending order of commit timestamp.
  485. e.Key = y.KeyWithTs(e.Key, e.version)
  486. // Add bitTxn only if these entries are part of a transaction. We
  487. // support SetEntryAt(..) in managed mode which means a single
  488. // transaction can have entries with different timestamps. If entries
  489. // in a single transaction have different timestamps, we don't add the
  490. // transaction markers.
  491. if keepTogether {
  492. e.meta |= bitTxn
  493. }
  494. entries = append(entries, e)
  495. }
  496. // The following debug information is what led to determining the cause of
  497. // bank txn violation bug, and it took a whole bunch of effort to narrow it
  498. // down to here. So, keep this around for at least a couple of months.
  499. // var b strings.Builder
  500. // fmt.Fprintf(&b, "Read: %d. Commit: %d. reads: %v. writes: %v. Keys: ",
  501. // txn.readTs, commitTs, txn.reads, txn.conflictKeys)
  502. for _, e := range txn.pendingWrites {
  503. processEntry(e)
  504. }
  505. for _, e := range txn.duplicateWrites {
  506. processEntry(e)
  507. }
  508. if keepTogether {
  509. // CommitTs should not be zero if we're inserting transaction markers.
  510. y.AssertTrue(commitTs != 0)
  511. e := &Entry{
  512. Key: y.KeyWithTs(txnKey, commitTs),
  513. Value: []byte(strconv.FormatUint(commitTs, 10)),
  514. meta: bitFinTxn,
  515. }
  516. entries = append(entries, e)
  517. }
  518. req, err := txn.db.sendToWriteCh(entries)
  519. if err != nil {
  520. orc.doneCommit(commitTs)
  521. return nil, err
  522. }
  523. ret := func() error {
  524. err := req.Wait()
  525. // Wait before marking commitTs as done.
  526. // We can't defer doneCommit above, because it is being called from a
  527. // callback here.
  528. orc.doneCommit(commitTs)
  529. return err
  530. }
  531. return ret, nil
  532. }
  533. func (txn *Txn) commitPrecheck() error {
  534. if txn.discarded {
  535. return errors.New("Trying to commit a discarded txn")
  536. }
  537. keepTogether := true
  538. for _, e := range txn.pendingWrites {
  539. if e.version != 0 {
  540. keepTogether = false
  541. }
  542. }
  543. // If keepTogether is True, it implies transaction markers will be added.
  544. // In that case, commitTs should not be never be zero. This might happen if
  545. // someone uses txn.Commit instead of txn.CommitAt in managed mode. This
  546. // should happen only in managed mode. In normal mode, keepTogether will
  547. // always be true.
  548. if keepTogether && txn.db.opt.managedTxns && txn.commitTs == 0 {
  549. return errors.New("CommitTs cannot be zero. Please use commitAt instead")
  550. }
  551. return nil
  552. }
  553. // Commit commits the transaction, following these steps:
  554. //
  555. // 1. If there are no writes, return immediately.
  556. //
  557. // 2. Check if read rows were updated since txn started. If so, return ErrConflict.
  558. //
  559. // 3. If no conflict, generate a commit timestamp and update written rows' commit ts.
  560. //
  561. // 4. Batch up all writes, write them to value log and LSM tree.
  562. //
  563. // 5. If callback is provided, Badger will return immediately after checking
  564. // for conflicts. Writes to the database will happen in the background. If
  565. // there is a conflict, an error will be returned and the callback will not
  566. // run. If there are no conflicts, the callback will be called in the
  567. // background upon successful completion of writes or any error during write.
  568. //
  569. // If error is nil, the transaction is successfully committed. In case of a non-nil error, the LSM
  570. // tree won't be updated, so there's no need for any rollback.
  571. func (txn *Txn) Commit() error {
  572. // txn.conflictKeys can be zero if conflict detection is turned off. So we
  573. // should check txn.pendingWrites.
  574. if len(txn.pendingWrites) == 0 {
  575. // Discard the transaction so that the read is marked done.
  576. txn.Discard()
  577. return nil
  578. }
  579. // Precheck before discarding txn.
  580. if err := txn.commitPrecheck(); err != nil {
  581. return err
  582. }
  583. defer txn.Discard()
  584. txnCb, err := txn.commitAndSend()
  585. if err != nil {
  586. return err
  587. }
  588. // If batchSet failed, LSM would not have been updated. So, no need to rollback anything.
  589. // TODO: What if some of the txns successfully make it to value log, but others fail.
  590. // Nothing gets updated to LSM, until a restart happens.
  591. return txnCb()
  592. }
  593. type txnCb struct {
  594. commit func() error
  595. user func(error)
  596. err error
  597. }
  598. func runTxnCallback(cb *txnCb) {
  599. switch {
  600. case cb == nil:
  601. panic("txn callback is nil")
  602. case cb.user == nil:
  603. panic("Must have caught a nil callback for txn.CommitWith")
  604. case cb.err != nil:
  605. cb.user(cb.err)
  606. case cb.commit != nil:
  607. err := cb.commit()
  608. cb.user(err)
  609. default:
  610. cb.user(nil)
  611. }
  612. }
  613. // CommitWith acts like Commit, but takes a callback, which gets run via a
  614. // goroutine to avoid blocking this function. The callback is guaranteed to run,
  615. // so it is safe to increment sync.WaitGroup before calling CommitWith, and
  616. // decrementing it in the callback; to block until all callbacks are run.
  617. func (txn *Txn) CommitWith(cb func(error)) {
  618. if cb == nil {
  619. panic("Nil callback provided to CommitWith")
  620. }
  621. if len(txn.pendingWrites) == 0 {
  622. // Do not run these callbacks from here, because the CommitWith and the
  623. // callback might be acquiring the same locks. Instead run the callback
  624. // from another goroutine.
  625. go runTxnCallback(&txnCb{user: cb, err: nil})
  626. // Discard the transaction so that the read is marked done.
  627. txn.Discard()
  628. return
  629. }
  630. // Precheck before discarding txn.
  631. if err := txn.commitPrecheck(); err != nil {
  632. cb(err)
  633. return
  634. }
  635. defer txn.Discard()
  636. commitCb, err := txn.commitAndSend()
  637. if err != nil {
  638. go runTxnCallback(&txnCb{user: cb, err: err})
  639. return
  640. }
  641. go runTxnCallback(&txnCb{user: cb, commit: commitCb})
  642. }
// ReadTs returns the read timestamp of the transaction, i.e. the value of
// its readTs field.
func (txn *Txn) ReadTs() uint64 {
	return txn.readTs
}
// NewTransaction creates a new transaction. Badger supports concurrent execution of transactions,
// providing serializable snapshot isolation, avoiding write skews. Badger achieves this by tracking
// the keys read and, at Commit time, ensuring that these read keys weren't concurrently modified by
// another transaction.
//
// For read-only transactions, set update to false. In this mode, we don't track the rows read for
// any changes. Thus, any long running iterations done in this mode wouldn't pay this overhead.
//
// Running transactions concurrently is OK. However, a transaction itself isn't thread safe, and
// should only be run serially. It doesn't matter if a transaction is created by one goroutine and
// passed down to another, as long as the Txn APIs are called serially.
//
// When you create a new transaction, it is absolutely essential to call
// Discard(). This should be done irrespective of what the update param is set
// to. The Commit API internally runs Discard, but running it twice wouldn't
// cause any issues.
//
//	txn := db.NewTransaction(false)
//	defer txn.Discard()
//	// Call various APIs.
func (db *DB) NewTransaction(update bool) *Txn {
	return db.newTransaction(update, false)
}
  670. func (db *DB) newTransaction(update, isManaged bool) *Txn {
  671. if db.opt.ReadOnly && update {
  672. // DB is read-only, force read-only transaction.
  673. update = false
  674. }
  675. txn := &Txn{
  676. update: update,
  677. db: db,
  678. count: 1, // One extra entry for BitFin.
  679. size: int64(len(txnKey) + 10), // Some buffer for the extra entry.
  680. }
  681. if update {
  682. if db.opt.DetectConflicts {
  683. txn.conflictKeys = make(map[uint64]struct{})
  684. }
  685. txn.pendingWrites = make(map[string]*Entry)
  686. }
  687. if !isManaged {
  688. txn.readTs = db.orc.readTs()
  689. }
  690. return txn
  691. }
  692. // View executes a function creating and managing a read-only transaction for the user. Error
  693. // returned by the function is relayed by the View method.
  694. // If View is used with managed transactions, it would assume a read timestamp of MaxUint64.
  695. func (db *DB) View(fn func(txn *Txn) error) error {
  696. if db.IsClosed() {
  697. return ErrDBClosed
  698. }
  699. var txn *Txn
  700. if db.opt.managedTxns {
  701. txn = db.NewTransactionAt(math.MaxUint64, false)
  702. } else {
  703. txn = db.NewTransaction(false)
  704. }
  705. defer txn.Discard()
  706. return fn(txn)
  707. }
  708. // Update executes a function, creating and managing a read-write transaction
  709. // for the user. Error returned by the function is relayed by the Update method.
  710. // Update cannot be used with managed transactions.
  711. func (db *DB) Update(fn func(txn *Txn) error) error {
  712. if db.IsClosed() {
  713. return ErrDBClosed
  714. }
  715. if db.opt.managedTxns {
  716. panic("Update can only be used with managedDB=false.")
  717. }
  718. txn := db.NewTransaction(true)
  719. defer txn.Discard()
  720. if err := fn(txn); err != nil {
  721. return err
  722. }
  723. return txn.Commit()
  724. }