txn.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821
  1. /*
  2. * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
  3. * SPDX-License-Identifier: Apache-2.0
  4. */
  5. package badger
  6. import (
  7. "bytes"
  8. "context"
  9. "encoding/hex"
  10. "math"
  11. "sort"
  12. "strconv"
  13. "sync"
  14. "sync/atomic"
  15. "github.com/pkg/errors"
  16. "github.com/dgraph-io/badger/v4/y"
  17. "github.com/dgraph-io/ristretto/v2/z"
  18. )
  19. type oracle struct {
  20. isManaged bool // Does not change value, so no locking required.
  21. detectConflicts bool // Determines if the txns should be checked for conflicts.
  22. sync.Mutex // For nextTxnTs and commits.
  23. // writeChLock lock is for ensuring that transactions go to the write
  24. // channel in the same order as their commit timestamps.
  25. writeChLock sync.Mutex
  26. nextTxnTs uint64
  27. // Used to block NewTransaction, so all previous commits are visible to a new read.
  28. txnMark *y.WaterMark
  29. // Either of these is used to determine which versions can be permanently
  30. // discarded during compaction.
  31. discardTs uint64 // Used by ManagedDB.
  32. readMark *y.WaterMark // Used by DB.
  33. // committedTxns contains all committed writes (contains fingerprints
  34. // of keys written and their latest commit counter).
  35. committedTxns []committedTxn
  36. lastCleanupTs uint64
  37. // closer is used to stop watermarks.
  38. closer *z.Closer
  39. }
  40. type committedTxn struct {
  41. ts uint64
  42. // ConflictKeys Keeps track of the entries written at timestamp ts.
  43. conflictKeys map[uint64]struct{}
  44. }
  45. func newOracle(opt Options) *oracle {
  46. orc := &oracle{
  47. isManaged: opt.managedTxns,
  48. detectConflicts: opt.DetectConflicts,
  49. // We're not initializing nextTxnTs and readOnlyTs. It would be done after replay in Open.
  50. //
  51. // WaterMarks must be 64-bit aligned for atomic package, hence we must use pointers here.
  52. // See https://golang.org/pkg/sync/atomic/#pkg-note-BUG.
  53. readMark: &y.WaterMark{Name: "badger.PendingReads"},
  54. txnMark: &y.WaterMark{Name: "badger.TxnTimestamp"},
  55. closer: z.NewCloser(2),
  56. }
  57. orc.readMark.Init(orc.closer)
  58. orc.txnMark.Init(orc.closer)
  59. return orc
  60. }
  61. func (o *oracle) Stop() {
  62. o.closer.SignalAndWait()
  63. }
  64. func (o *oracle) readTs() uint64 {
  65. if o.isManaged {
  66. panic("ReadTs should not be retrieved for managed DB")
  67. }
  68. var readTs uint64
  69. o.Lock()
  70. readTs = o.nextTxnTs - 1
  71. o.readMark.Begin(readTs)
  72. o.Unlock()
  73. // Wait for all txns which have no conflicts, have been assigned a commit
  74. // timestamp and are going through the write to value log and LSM tree
  75. // process. Not waiting here could mean that some txns which have been
  76. // committed would not be read.
  77. y.Check(o.txnMark.WaitForMark(context.Background(), readTs))
  78. return readTs
  79. }
  80. func (o *oracle) nextTs() uint64 {
  81. o.Lock()
  82. defer o.Unlock()
  83. return o.nextTxnTs
  84. }
  85. func (o *oracle) incrementNextTs() {
  86. o.Lock()
  87. defer o.Unlock()
  88. o.nextTxnTs++
  89. }
  90. // Any deleted or invalid versions at or below ts would be discarded during
  91. // compaction to reclaim disk space in LSM tree and thence value log.
  92. func (o *oracle) setDiscardTs(ts uint64) {
  93. o.Lock()
  94. defer o.Unlock()
  95. o.discardTs = ts
  96. o.cleanupCommittedTransactions()
  97. }
  98. func (o *oracle) discardAtOrBelow() uint64 {
  99. if o.isManaged {
  100. o.Lock()
  101. defer o.Unlock()
  102. return o.discardTs
  103. }
  104. return o.readMark.DoneUntil()
  105. }
  106. // hasConflict must be called while having a lock.
  107. func (o *oracle) hasConflict(txn *Txn) bool {
  108. if len(txn.reads) == 0 {
  109. return false
  110. }
  111. for _, committedTxn := range o.committedTxns {
  112. // If the committedTxn.ts is less than txn.readTs that implies that the
  113. // committedTxn finished before the current transaction started.
  114. // We don't need to check for conflict in that case.
  115. // This change assumes linearizability. Lack of linearizability could
  116. // cause the read ts of a new txn to be lower than the commit ts of
  117. // a txn before it (@mrjn).
  118. if committedTxn.ts <= txn.readTs {
  119. continue
  120. }
  121. for _, ro := range txn.reads {
  122. if _, has := committedTxn.conflictKeys[ro]; has {
  123. return true
  124. }
  125. }
  126. }
  127. return false
  128. }
  129. func (o *oracle) newCommitTs(txn *Txn) (uint64, bool) {
  130. o.Lock()
  131. defer o.Unlock()
  132. if o.hasConflict(txn) {
  133. return 0, true
  134. }
  135. var ts uint64
  136. if !o.isManaged {
  137. o.doneRead(txn)
  138. o.cleanupCommittedTransactions()
  139. // This is the general case, when user doesn't specify the read and commit ts.
  140. ts = o.nextTxnTs
  141. o.nextTxnTs++
  142. o.txnMark.Begin(ts)
  143. } else {
  144. // If commitTs is set, use it instead.
  145. ts = txn.commitTs
  146. }
  147. y.AssertTrue(ts >= o.lastCleanupTs)
  148. if o.detectConflicts {
  149. // We should ensure that txns are not added to o.committedTxns slice when
  150. // conflict detection is disabled otherwise this slice would keep growing.
  151. o.committedTxns = append(o.committedTxns, committedTxn{
  152. ts: ts,
  153. conflictKeys: txn.conflictKeys,
  154. })
  155. }
  156. return ts, false
  157. }
  158. func (o *oracle) doneRead(txn *Txn) {
  159. if !txn.doneRead {
  160. txn.doneRead = true
  161. o.readMark.Done(txn.readTs)
  162. }
  163. }
  164. func (o *oracle) cleanupCommittedTransactions() { // Must be called under o.Lock
  165. if !o.detectConflicts {
  166. // When detectConflicts is set to false, we do not store any
  167. // committedTxns and so there's nothing to clean up.
  168. return
  169. }
  170. // Same logic as discardAtOrBelow but unlocked
  171. var maxReadTs uint64
  172. if o.isManaged {
  173. maxReadTs = o.discardTs
  174. } else {
  175. maxReadTs = o.readMark.DoneUntil()
  176. }
  177. y.AssertTrue(maxReadTs >= o.lastCleanupTs)
  178. // do not run clean up if the maxReadTs (read timestamp of the
  179. // oldest transaction that is still in flight) has not increased
  180. if maxReadTs == o.lastCleanupTs {
  181. return
  182. }
  183. o.lastCleanupTs = maxReadTs
  184. tmp := o.committedTxns[:0]
  185. for _, txn := range o.committedTxns {
  186. if txn.ts <= maxReadTs {
  187. continue
  188. }
  189. tmp = append(tmp, txn)
  190. }
  191. o.committedTxns = tmp
  192. }
  193. func (o *oracle) doneCommit(cts uint64) {
  194. if o.isManaged {
  195. // No need to update anything.
  196. return
  197. }
  198. o.txnMark.Done(cts)
  199. }
  200. // Txn represents a Badger transaction.
  201. type Txn struct {
  202. readTs uint64
  203. commitTs uint64
  204. size int64
  205. count int64
  206. db *DB
  207. reads []uint64 // contains fingerprints of keys read.
  208. // contains fingerprints of keys written. This is used for conflict detection.
  209. conflictKeys map[uint64]struct{}
  210. readsLock sync.Mutex // guards the reads slice. See addReadKey.
  211. pendingWrites map[string]*Entry // cache stores any writes done by txn.
  212. duplicateWrites []*Entry // Used in managed mode to store duplicate entries.
  213. numIterators atomic.Int32
  214. discarded bool
  215. doneRead bool
  216. update bool // update is used to conditionally keep track of reads.
  217. }
  218. type pendingWritesIterator struct {
  219. entries []*Entry
  220. nextIdx int
  221. readTs uint64
  222. reversed bool
  223. }
  224. func (pi *pendingWritesIterator) Next() {
  225. pi.nextIdx++
  226. }
  227. func (pi *pendingWritesIterator) Rewind() {
  228. pi.nextIdx = 0
  229. }
  230. func (pi *pendingWritesIterator) Seek(key []byte) {
  231. key = y.ParseKey(key)
  232. pi.nextIdx = sort.Search(len(pi.entries), func(idx int) bool {
  233. cmp := bytes.Compare(pi.entries[idx].Key, key)
  234. if !pi.reversed {
  235. return cmp >= 0
  236. }
  237. return cmp <= 0
  238. })
  239. }
  240. func (pi *pendingWritesIterator) Key() []byte {
  241. y.AssertTrue(pi.Valid())
  242. entry := pi.entries[pi.nextIdx]
  243. return y.KeyWithTs(entry.Key, pi.readTs)
  244. }
  245. func (pi *pendingWritesIterator) Value() y.ValueStruct {
  246. y.AssertTrue(pi.Valid())
  247. entry := pi.entries[pi.nextIdx]
  248. return y.ValueStruct{
  249. Value: entry.Value,
  250. Meta: entry.meta,
  251. UserMeta: entry.UserMeta,
  252. ExpiresAt: entry.ExpiresAt,
  253. Version: pi.readTs,
  254. }
  255. }
  256. func (pi *pendingWritesIterator) Valid() bool {
  257. return pi.nextIdx < len(pi.entries)
  258. }
  259. func (pi *pendingWritesIterator) Close() error {
  260. return nil
  261. }
  262. func (txn *Txn) newPendingWritesIterator(reversed bool) *pendingWritesIterator {
  263. if !txn.update || len(txn.pendingWrites) == 0 {
  264. return nil
  265. }
  266. entries := make([]*Entry, 0, len(txn.pendingWrites))
  267. for _, e := range txn.pendingWrites {
  268. entries = append(entries, e)
  269. }
  270. // Number of pending writes per transaction shouldn't be too big in general.
  271. sort.Slice(entries, func(i, j int) bool {
  272. cmp := bytes.Compare(entries[i].Key, entries[j].Key)
  273. if !reversed {
  274. return cmp < 0
  275. }
  276. return cmp > 0
  277. })
  278. return &pendingWritesIterator{
  279. readTs: txn.readTs,
  280. entries: entries,
  281. reversed: reversed,
  282. }
  283. }
  284. func (txn *Txn) checkSize(e *Entry) error {
  285. count := txn.count + 1
  286. // Extra bytes for the version in key.
  287. size := txn.size + e.estimateSizeAndSetThreshold(txn.db.valueThreshold()) + 10
  288. if count >= txn.db.opt.maxBatchCount || size >= txn.db.opt.maxBatchSize {
  289. return ErrTxnTooBig
  290. }
  291. txn.count, txn.size = count, size
  292. return nil
  293. }
  294. func exceedsSize(prefix string, max int64, key []byte) error {
  295. return errors.Errorf("%s with size %d exceeded %d limit. %s:\n%s",
  296. prefix, len(key), max, prefix, hex.Dump(key[:1<<10]))
  297. }
  298. func (txn *Txn) modify(e *Entry) error {
  299. const maxKeySize = 65000
  300. switch {
  301. case !txn.update:
  302. return ErrReadOnlyTxn
  303. case txn.discarded:
  304. return ErrDiscardedTxn
  305. case len(e.Key) == 0:
  306. return ErrEmptyKey
  307. case bytes.HasPrefix(e.Key, badgerPrefix):
  308. return ErrInvalidKey
  309. case len(e.Key) > maxKeySize:
  310. // Key length can't be more than uint16, as determined by table::header. To
  311. // keep things safe and allow badger move prefix and a timestamp suffix, let's
  312. // cut it down to 65000, instead of using 65536.
  313. return exceedsSize("Key", maxKeySize, e.Key)
  314. case int64(len(e.Value)) > txn.db.opt.ValueLogFileSize:
  315. return exceedsSize("Value", txn.db.opt.ValueLogFileSize, e.Value)
  316. case txn.db.opt.InMemory && int64(len(e.Value)) > txn.db.valueThreshold():
  317. return exceedsSize("Value", txn.db.valueThreshold(), e.Value)
  318. }
  319. if err := txn.db.isBanned(e.Key); err != nil {
  320. return err
  321. }
  322. if err := txn.checkSize(e); err != nil {
  323. return err
  324. }
  325. // The txn.conflictKeys is used for conflict detection. If conflict detection
  326. // is disabled, we don't need to store key hashes in this map.
  327. if txn.db.opt.DetectConflicts {
  328. fp := z.MemHash(e.Key) // Avoid dealing with byte arrays.
  329. txn.conflictKeys[fp] = struct{}{}
  330. }
  331. // If a duplicate entry was inserted in managed mode, move it to the duplicate writes slice.
  332. // Add the entry to duplicateWrites only if both the entries have different versions. For
  333. // same versions, we will overwrite the existing entry.
  334. if oldEntry, ok := txn.pendingWrites[string(e.Key)]; ok && oldEntry.version != e.version {
  335. txn.duplicateWrites = append(txn.duplicateWrites, oldEntry)
  336. }
  337. txn.pendingWrites[string(e.Key)] = e
  338. return nil
  339. }
  340. // Set adds a key-value pair to the database.
  341. // It will return ErrReadOnlyTxn if update flag was set to false when creating the transaction.
  342. //
  343. // The current transaction keeps a reference to the key and val byte slice
  344. // arguments. Users must not modify key and val until the end of the transaction.
  345. func (txn *Txn) Set(key, val []byte) error {
  346. return txn.SetEntry(NewEntry(key, val))
  347. }
  348. // SetEntry takes an Entry struct and adds the key-value pair in the struct,
  349. // along with other metadata to the database.
  350. //
  351. // The current transaction keeps a reference to the entry passed in argument.
  352. // Users must not modify the entry until the end of the transaction.
  353. func (txn *Txn) SetEntry(e *Entry) error {
  354. return txn.modify(e)
  355. }
  356. // Delete deletes a key.
  357. //
  358. // This is done by adding a delete marker for the key at commit timestamp. Any
  359. // reads happening before this timestamp would be unaffected. Any reads after
  360. // this commit would see the deletion.
  361. //
  362. // The current transaction keeps a reference to the key byte slice argument.
  363. // Users must not modify the key until the end of the transaction.
  364. func (txn *Txn) Delete(key []byte) error {
  365. e := &Entry{
  366. Key: key,
  367. meta: bitDelete,
  368. }
  369. return txn.modify(e)
  370. }
  371. // Get looks for key and returns corresponding Item.
  372. // If key is not found, ErrKeyNotFound is returned.
  373. func (txn *Txn) Get(key []byte) (item *Item, rerr error) {
  374. if len(key) == 0 {
  375. return nil, ErrEmptyKey
  376. } else if txn.discarded {
  377. return nil, ErrDiscardedTxn
  378. }
  379. if err := txn.db.isBanned(key); err != nil {
  380. return nil, err
  381. }
  382. item = new(Item)
  383. if txn.update {
  384. if e, has := txn.pendingWrites[string(key)]; has && bytes.Equal(key, e.Key) {
  385. if isDeletedOrExpired(e.meta, e.ExpiresAt) {
  386. return nil, ErrKeyNotFound
  387. }
  388. // Fulfill from cache.
  389. item.meta = e.meta
  390. item.val = e.Value
  391. item.userMeta = e.UserMeta
  392. item.key = key
  393. item.status = prefetched
  394. item.version = txn.readTs
  395. item.expiresAt = e.ExpiresAt
  396. // We probably don't need to set db on item here.
  397. return item, nil
  398. }
  399. // Only track reads if this is update txn. No need to track read if txn serviced it
  400. // internally.
  401. txn.addReadKey(key)
  402. }
  403. seek := y.KeyWithTs(key, txn.readTs)
  404. vs, err := txn.db.get(seek)
  405. if err != nil {
  406. return nil, y.Wrapf(err, "DB::Get key: %q", key)
  407. }
  408. if vs.Value == nil && vs.Meta == 0 {
  409. return nil, ErrKeyNotFound
  410. }
  411. if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) {
  412. return nil, ErrKeyNotFound
  413. }
  414. item.key = key
  415. item.version = vs.Version
  416. item.meta = vs.Meta
  417. item.userMeta = vs.UserMeta
  418. item.vptr = y.SafeCopy(item.vptr, vs.Value)
  419. item.txn = txn
  420. item.expiresAt = vs.ExpiresAt
  421. return item, nil
  422. }
  423. func (txn *Txn) addReadKey(key []byte) {
  424. if txn.update {
  425. fp := z.MemHash(key)
  426. // Because of the possibility of multiple iterators it is now possible
  427. // for multiple threads within a read-write transaction to read keys at
  428. // the same time. The reads slice is not currently thread-safe and
  429. // needs to be locked whenever we mark a key as read.
  430. txn.readsLock.Lock()
  431. txn.reads = append(txn.reads, fp)
  432. txn.readsLock.Unlock()
  433. }
  434. }
  435. // Discard discards a created transaction. This method is very important and must be called. Commit
  436. // method calls this internally, however, calling this multiple times doesn't cause any issues. So,
  437. // this can safely be called via a defer right when transaction is created.
  438. //
  439. // NOTE: If any operations are run on a discarded transaction, ErrDiscardedTxn is returned.
  440. func (txn *Txn) Discard() {
  441. if txn.discarded { // Avoid a re-run.
  442. return
  443. }
  444. if txn.numIterators.Load() > 0 {
  445. panic("Unclosed iterator at time of Txn.Discard.")
  446. }
  447. txn.discarded = true
  448. if !txn.db.orc.isManaged {
  449. txn.db.orc.doneRead(txn)
  450. }
  451. }
  452. func (txn *Txn) commitAndSend() (func() error, error) {
  453. orc := txn.db.orc
  454. // Ensure that the order in which we get the commit timestamp is the same as
  455. // the order in which we push these updates to the write channel. So, we
  456. // acquire a writeChLock before getting a commit timestamp, and only release
  457. // it after pushing the entries to it.
  458. orc.writeChLock.Lock()
  459. defer orc.writeChLock.Unlock()
  460. commitTs, conflict := orc.newCommitTs(txn)
  461. if conflict {
  462. return nil, ErrConflict
  463. }
  464. keepTogether := true
  465. setVersion := func(e *Entry) {
  466. if e.version == 0 {
  467. e.version = commitTs
  468. } else {
  469. keepTogether = false
  470. }
  471. }
  472. for _, e := range txn.pendingWrites {
  473. setVersion(e)
  474. }
  475. // The duplicateWrites slice will be non-empty only if there are duplicate
  476. // entries with different versions.
  477. for _, e := range txn.duplicateWrites {
  478. setVersion(e)
  479. }
  480. entries := make([]*Entry, 0, len(txn.pendingWrites)+len(txn.duplicateWrites)+1)
  481. processEntry := func(e *Entry) {
  482. // Suffix the keys with commit ts, so the key versions are sorted in
  483. // descending order of commit timestamp.
  484. e.Key = y.KeyWithTs(e.Key, e.version)
  485. // Add bitTxn only if these entries are part of a transaction. We
  486. // support SetEntryAt(..) in managed mode which means a single
  487. // transaction can have entries with different timestamps. If entries
  488. // in a single transaction have different timestamps, we don't add the
  489. // transaction markers.
  490. if keepTogether {
  491. e.meta |= bitTxn
  492. }
  493. entries = append(entries, e)
  494. }
  495. // The following debug information is what led to determining the cause of
  496. // bank txn violation bug, and it took a whole bunch of effort to narrow it
  497. // down to here. So, keep this around for at least a couple of months.
  498. // var b strings.Builder
  499. // fmt.Fprintf(&b, "Read: %d. Commit: %d. reads: %v. writes: %v. Keys: ",
  500. // txn.readTs, commitTs, txn.reads, txn.conflictKeys)
  501. for _, e := range txn.pendingWrites {
  502. processEntry(e)
  503. }
  504. for _, e := range txn.duplicateWrites {
  505. processEntry(e)
  506. }
  507. if keepTogether {
  508. // CommitTs should not be zero if we're inserting transaction markers.
  509. y.AssertTrue(commitTs != 0)
  510. e := &Entry{
  511. Key: y.KeyWithTs(txnKey, commitTs),
  512. Value: []byte(strconv.FormatUint(commitTs, 10)),
  513. meta: bitFinTxn,
  514. }
  515. entries = append(entries, e)
  516. }
  517. req, err := txn.db.sendToWriteCh(entries)
  518. if err != nil {
  519. orc.doneCommit(commitTs)
  520. return nil, err
  521. }
  522. ret := func() error {
  523. err := req.Wait()
  524. // Wait before marking commitTs as done.
  525. // We can't defer doneCommit above, because it is being called from a
  526. // callback here.
  527. orc.doneCommit(commitTs)
  528. return err
  529. }
  530. return ret, nil
  531. }
  532. func (txn *Txn) commitPrecheck() error {
  533. if txn.discarded {
  534. return errors.New("Trying to commit a discarded txn")
  535. }
  536. keepTogether := true
  537. for _, e := range txn.pendingWrites {
  538. if e.version != 0 {
  539. keepTogether = false
  540. }
  541. }
  542. // If keepTogether is True, it implies transaction markers will be added.
  543. // In that case, commitTs should not be never be zero. This might happen if
  544. // someone uses txn.Commit instead of txn.CommitAt in managed mode. This
  545. // should happen only in managed mode. In normal mode, keepTogether will
  546. // always be true.
  547. if keepTogether && txn.db.opt.managedTxns && txn.commitTs == 0 {
  548. return errors.New("CommitTs cannot be zero. Please use commitAt instead")
  549. }
  550. return nil
  551. }
  552. // Commit commits the transaction, following these steps:
  553. //
  554. // 1. If there are no writes, return immediately.
  555. //
  556. // 2. Check if read rows were updated since txn started. If so, return ErrConflict.
  557. //
  558. // 3. If no conflict, generate a commit timestamp and update written rows' commit ts.
  559. //
  560. // 4. Batch up all writes, write them to value log and LSM tree.
  561. //
  562. // 5. If callback is provided, Badger will return immediately after checking
  563. // for conflicts. Writes to the database will happen in the background. If
  564. // there is a conflict, an error will be returned and the callback will not
  565. // run. If there are no conflicts, the callback will be called in the
  566. // background upon successful completion of writes or any error during write.
  567. //
  568. // If error is nil, the transaction is successfully committed. In case of a non-nil error, the LSM
  569. // tree won't be updated, so there's no need for any rollback.
  570. func (txn *Txn) Commit() error {
  571. // txn.conflictKeys can be zero if conflict detection is turned off. So we
  572. // should check txn.pendingWrites.
  573. if len(txn.pendingWrites) == 0 {
  574. // Discard the transaction so that the read is marked done.
  575. txn.Discard()
  576. return nil
  577. }
  578. // Precheck before discarding txn.
  579. if err := txn.commitPrecheck(); err != nil {
  580. return err
  581. }
  582. defer txn.Discard()
  583. txnCb, err := txn.commitAndSend()
  584. if err != nil {
  585. return err
  586. }
  587. // If batchSet failed, LSM would not have been updated. So, no need to rollback anything.
  588. // TODO: What if some of the txns successfully make it to value log, but others fail.
  589. // Nothing gets updated to LSM, until a restart happens.
  590. return txnCb()
  591. }
  592. type txnCb struct {
  593. commit func() error
  594. user func(error)
  595. err error
  596. }
  597. func runTxnCallback(cb *txnCb) {
  598. switch {
  599. case cb == nil:
  600. panic("txn callback is nil")
  601. case cb.user == nil:
  602. panic("Must have caught a nil callback for txn.CommitWith")
  603. case cb.err != nil:
  604. cb.user(cb.err)
  605. case cb.commit != nil:
  606. err := cb.commit()
  607. cb.user(err)
  608. default:
  609. cb.user(nil)
  610. }
  611. }
  612. // CommitWith acts like Commit, but takes a callback, which gets run via a
  613. // goroutine to avoid blocking this function. The callback is guaranteed to run,
  614. // so it is safe to increment sync.WaitGroup before calling CommitWith, and
  615. // decrementing it in the callback; to block until all callbacks are run.
  616. func (txn *Txn) CommitWith(cb func(error)) {
  617. if cb == nil {
  618. panic("Nil callback provided to CommitWith")
  619. }
  620. if len(txn.pendingWrites) == 0 {
  621. // Do not run these callbacks from here, because the CommitWith and the
  622. // callback might be acquiring the same locks. Instead run the callback
  623. // from another goroutine.
  624. go runTxnCallback(&txnCb{user: cb, err: nil})
  625. // Discard the transaction so that the read is marked done.
  626. txn.Discard()
  627. return
  628. }
  629. // Precheck before discarding txn.
  630. if err := txn.commitPrecheck(); err != nil {
  631. cb(err)
  632. return
  633. }
  634. defer txn.Discard()
  635. commitCb, err := txn.commitAndSend()
  636. if err != nil {
  637. go runTxnCallback(&txnCb{user: cb, err: err})
  638. return
  639. }
  640. go runTxnCallback(&txnCb{user: cb, commit: commitCb})
  641. }
  642. // ReadTs returns the read timestamp of the transaction.
  643. func (txn *Txn) ReadTs() uint64 {
  644. return txn.readTs
  645. }
  646. // NewTransaction creates a new transaction. Badger supports concurrent execution of transactions,
  647. // providing serializable snapshot isolation, avoiding write skews. Badger achieves this by tracking
  648. // the keys read and at Commit time, ensuring that these read keys weren't concurrently modified by
  649. // another transaction.
  650. //
  651. // For read-only transactions, set update to false. In this mode, we don't track the rows read for
  652. // any changes. Thus, any long running iterations done in this mode wouldn't pay this overhead.
  653. //
  654. // Running transactions concurrently is OK. However, a transaction itself isn't thread safe, and
  655. // should only be run serially. It doesn't matter if a transaction is created by one goroutine and
  656. // passed down to other, as long as the Txn APIs are called serially.
  657. //
  658. // When you create a new transaction, it is absolutely essential to call
  659. // Discard(). This should be done irrespective of what the update param is set
  660. // to. Commit API internally runs Discard, but running it twice wouldn't cause
  661. // any issues.
  662. //
  663. // txn := db.NewTransaction(false)
  664. // defer txn.Discard()
  665. // // Call various APIs.
  666. func (db *DB) NewTransaction(update bool) *Txn {
  667. return db.newTransaction(update, false)
  668. }
  669. func (db *DB) newTransaction(update, isManaged bool) *Txn {
  670. if db.opt.ReadOnly && update {
  671. // DB is read-only, force read-only transaction.
  672. update = false
  673. }
  674. txn := &Txn{
  675. update: update,
  676. db: db,
  677. count: 1, // One extra entry for BitFin.
  678. size: int64(len(txnKey) + 10), // Some buffer for the extra entry.
  679. }
  680. if update {
  681. if db.opt.DetectConflicts {
  682. txn.conflictKeys = make(map[uint64]struct{})
  683. }
  684. txn.pendingWrites = make(map[string]*Entry)
  685. }
  686. if !isManaged {
  687. txn.readTs = db.orc.readTs()
  688. }
  689. return txn
  690. }
  691. // View executes a function creating and managing a read-only transaction for the user. Error
  692. // returned by the function is relayed by the View method.
  693. // If View is used with managed transactions, it would assume a read timestamp of MaxUint64.
  694. func (db *DB) View(fn func(txn *Txn) error) error {
  695. if db.IsClosed() {
  696. return ErrDBClosed
  697. }
  698. var txn *Txn
  699. if db.opt.managedTxns {
  700. txn = db.NewTransactionAt(math.MaxUint64, false)
  701. } else {
  702. txn = db.NewTransaction(false)
  703. }
  704. defer txn.Discard()
  705. return fn(txn)
  706. }
  707. // Update executes a function, creating and managing a read-write transaction
  708. // for the user. Error returned by the function is relayed by the Update method.
  709. // Update cannot be used with managed transactions.
  710. func (db *DB) Update(fn func(txn *Txn) error) error {
  711. if db.IsClosed() {
  712. return ErrDBClosed
  713. }
  714. if db.opt.managedTxns {
  715. panic("Update can only be used with managedDB=false.")
  716. }
  717. txn := db.NewTransaction(true)
  718. defer txn.Discard()
  719. if err := fn(txn); err != nil {
  720. return err
  721. }
  722. return txn.Commit()
  723. }