txn.go 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832
  1. /*
  2. * Copyright 2017 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package badger
  17. import (
  18. "bytes"
  19. "context"
  20. "encoding/hex"
  21. "math"
  22. "sort"
  23. "strconv"
  24. "sync"
  25. "sync/atomic"
  26. "github.com/pkg/errors"
  27. "github.com/dgraph-io/badger/v4/y"
  28. "github.com/dgraph-io/ristretto/v2/z"
  29. )
  // oracle hands out read and commit timestamps for transactions and, when
  // conflict detection is enabled, remembers recently committed writes so that
  // committing transactions can be checked against them.
  type oracle struct {
  	isManaged       bool // Does not change value, so no locking required.
  	detectConflicts bool // Determines if the txns should be checked for conflicts.

  	// The embedded mutex guards nextTxnTs and commit bookkeeping: discardTs,
  	// committedTxns and lastCleanupTs (see setDiscardTs, newCommitTs and
  	// cleanupCommittedTransactions).
  	sync.Mutex
  	// writeChLock lock is for ensuring that transactions go to the write
  	// channel in the same order as their commit timestamps.
  	writeChLock sync.Mutex
  	// nextTxnTs is the commit timestamp handed to the next non-managed commit;
  	// readTs uses nextTxnTs-1 as the read timestamp of new transactions.
  	nextTxnTs uint64

  	// Used to block NewTransaction, so all previous commits are visible to a new read.
  	txnMark *y.WaterMark

  	// Either of these is used to determine which versions can be permanently
  	// discarded during compaction.
  	discardTs uint64       // Used by ManagedDB.
  	readMark  *y.WaterMark // Used by DB.

  	// committedTxns contains all committed writes (contains fingerprints
  	// of keys written and their latest commit counter). Only populated when
  	// detectConflicts is true.
  	committedTxns []committedTxn
  	// lastCleanupTs is the read watermark at which committedTxns was last
  	// pruned by cleanupCommittedTransactions.
  	lastCleanupTs uint64

  	// closer is used to stop watermarks.
  	closer *z.Closer
  }
  // committedTxn is the record kept for a committed transaction so that later
  // commits can be checked for read-write conflicts against it.
  type committedTxn struct {
  	// ts is the commit timestamp of the transaction.
  	ts uint64
  	// conflictKeys keeps track of the entries written at timestamp ts, as
  	// z.MemHash fingerprints of their keys.
  	conflictKeys map[uint64]struct{}
  }
  56. func newOracle(opt Options) *oracle {
  57. orc := &oracle{
  58. isManaged: opt.managedTxns,
  59. detectConflicts: opt.DetectConflicts,
  60. // We're not initializing nextTxnTs and readOnlyTs. It would be done after replay in Open.
  61. //
  62. // WaterMarks must be 64-bit aligned for atomic package, hence we must use pointers here.
  63. // See https://golang.org/pkg/sync/atomic/#pkg-note-BUG.
  64. readMark: &y.WaterMark{Name: "badger.PendingReads"},
  65. txnMark: &y.WaterMark{Name: "badger.TxnTimestamp"},
  66. closer: z.NewCloser(2),
  67. }
  68. orc.readMark.Init(orc.closer)
  69. orc.txnMark.Init(orc.closer)
  70. return orc
  71. }
  72. func (o *oracle) Stop() {
  73. o.closer.SignalAndWait()
  74. }
  75. func (o *oracle) readTs() uint64 {
  76. if o.isManaged {
  77. panic("ReadTs should not be retrieved for managed DB")
  78. }
  79. var readTs uint64
  80. o.Lock()
  81. readTs = o.nextTxnTs - 1
  82. o.readMark.Begin(readTs)
  83. o.Unlock()
  84. // Wait for all txns which have no conflicts, have been assigned a commit
  85. // timestamp and are going through the write to value log and LSM tree
  86. // process. Not waiting here could mean that some txns which have been
  87. // committed would not be read.
  88. y.Check(o.txnMark.WaitForMark(context.Background(), readTs))
  89. return readTs
  90. }
  91. func (o *oracle) nextTs() uint64 {
  92. o.Lock()
  93. defer o.Unlock()
  94. return o.nextTxnTs
  95. }
  96. func (o *oracle) incrementNextTs() {
  97. o.Lock()
  98. defer o.Unlock()
  99. o.nextTxnTs++
  100. }
  101. // Any deleted or invalid versions at or below ts would be discarded during
  102. // compaction to reclaim disk space in LSM tree and thence value log.
  103. func (o *oracle) setDiscardTs(ts uint64) {
  104. o.Lock()
  105. defer o.Unlock()
  106. o.discardTs = ts
  107. o.cleanupCommittedTransactions()
  108. }
  109. func (o *oracle) discardAtOrBelow() uint64 {
  110. if o.isManaged {
  111. o.Lock()
  112. defer o.Unlock()
  113. return o.discardTs
  114. }
  115. return o.readMark.DoneUntil()
  116. }
  117. // hasConflict must be called while having a lock.
  118. func (o *oracle) hasConflict(txn *Txn) bool {
  119. if len(txn.reads) == 0 {
  120. return false
  121. }
  122. for _, committedTxn := range o.committedTxns {
  123. // If the committedTxn.ts is less than txn.readTs that implies that the
  124. // committedTxn finished before the current transaction started.
  125. // We don't need to check for conflict in that case.
  126. // This change assumes linearizability. Lack of linearizability could
  127. // cause the read ts of a new txn to be lower than the commit ts of
  128. // a txn before it (@mrjn).
  129. if committedTxn.ts <= txn.readTs {
  130. continue
  131. }
  132. for _, ro := range txn.reads {
  133. if _, has := committedTxn.conflictKeys[ro]; has {
  134. return true
  135. }
  136. }
  137. }
  138. return false
  139. }
  140. func (o *oracle) newCommitTs(txn *Txn) (uint64, bool) {
  141. o.Lock()
  142. defer o.Unlock()
  143. if o.hasConflict(txn) {
  144. return 0, true
  145. }
  146. var ts uint64
  147. if !o.isManaged {
  148. o.doneRead(txn)
  149. o.cleanupCommittedTransactions()
  150. // This is the general case, when user doesn't specify the read and commit ts.
  151. ts = o.nextTxnTs
  152. o.nextTxnTs++
  153. o.txnMark.Begin(ts)
  154. } else {
  155. // If commitTs is set, use it instead.
  156. ts = txn.commitTs
  157. }
  158. y.AssertTrue(ts >= o.lastCleanupTs)
  159. if o.detectConflicts {
  160. // We should ensure that txns are not added to o.committedTxns slice when
  161. // conflict detection is disabled otherwise this slice would keep growing.
  162. o.committedTxns = append(o.committedTxns, committedTxn{
  163. ts: ts,
  164. conflictKeys: txn.conflictKeys,
  165. })
  166. }
  167. return ts, false
  168. }
  169. func (o *oracle) doneRead(txn *Txn) {
  170. if !txn.doneRead {
  171. txn.doneRead = true
  172. o.readMark.Done(txn.readTs)
  173. }
  174. }
  175. func (o *oracle) cleanupCommittedTransactions() { // Must be called under o.Lock
  176. if !o.detectConflicts {
  177. // When detectConflicts is set to false, we do not store any
  178. // committedTxns and so there's nothing to clean up.
  179. return
  180. }
  181. // Same logic as discardAtOrBelow but unlocked
  182. var maxReadTs uint64
  183. if o.isManaged {
  184. maxReadTs = o.discardTs
  185. } else {
  186. maxReadTs = o.readMark.DoneUntil()
  187. }
  188. y.AssertTrue(maxReadTs >= o.lastCleanupTs)
  189. // do not run clean up if the maxReadTs (read timestamp of the
  190. // oldest transaction that is still in flight) has not increased
  191. if maxReadTs == o.lastCleanupTs {
  192. return
  193. }
  194. o.lastCleanupTs = maxReadTs
  195. tmp := o.committedTxns[:0]
  196. for _, txn := range o.committedTxns {
  197. if txn.ts <= maxReadTs {
  198. continue
  199. }
  200. tmp = append(tmp, txn)
  201. }
  202. o.committedTxns = tmp
  203. }
  204. func (o *oracle) doneCommit(cts uint64) {
  205. if o.isManaged {
  206. // No need to update anything.
  207. return
  208. }
  209. o.txnMark.Done(cts)
  210. }
  // Txn represents a Badger transaction.
  type Txn struct {
  	readTs   uint64 // timestamp at which this txn reads the DB
  	commitTs uint64 // used as the commit ts in managed mode (see newCommitTs)
  	size     int64  // running size estimate of the txn's writes, checked by checkSize
  	count    int64  // running entry count of the txn's writes, checked by checkSize
  	db       *DB

  	reads []uint64 // contains fingerprints of keys read.
  	// contains fingerprints of keys written. This is used for conflict detection.
  	// Only allocated for update txns when DetectConflicts is on.
  	conflictKeys map[uint64]struct{}
  	readsLock    sync.Mutex // guards the reads slice. See addReadKey.

  	pendingWrites   map[string]*Entry // cache stores any writes done by txn.
  	duplicateWrites []*Entry          // Used in managed mode to store duplicate entries.

  	numIterators atomic.Int32 // open iterators; Discard panics if this is non-zero
  	discarded    bool         // set by Discard; most operations then return ErrDiscardedTxn
  	doneRead     bool         // set once readMark.Done has been called for readTs
  	update       bool         // update is used to conditionally keep track of reads.
  }
  // pendingWritesIterator iterates over a transaction's uncommitted writes in key
  // order, reporting every entry at the transaction's read timestamp.
  type pendingWritesIterator struct {
  	entries  []*Entry // sorted ascending by key, or descending when reversed
  	nextIdx  int      // index of the current entry in entries
  	readTs   uint64   // version attached to every key and value returned
  	reversed bool     // true when iterating in descending key order
  }
  235. func (pi *pendingWritesIterator) Next() {
  236. pi.nextIdx++
  237. }
  238. func (pi *pendingWritesIterator) Rewind() {
  239. pi.nextIdx = 0
  240. }
  // Seek positions the iterator on the first entry that is at or beyond key in
  // iteration order. Going forward, that is the first key >= key. When
  // reversed, it is the first key <= key. key is first passed through
  // y.ParseKey, presumably to strip its version suffix.
  func (pi *pendingWritesIterator) Seek(key []byte) {
  	target := y.ParseKey(key)
  	forward := !pi.reversed
  	pi.nextIdx = sort.Search(len(pi.entries), func(i int) bool {
  		c := bytes.Compare(pi.entries[i].Key, target)
  		if forward {
  			return c >= 0
  		}
  		return c <= 0
  	})
  }
  251. func (pi *pendingWritesIterator) Key() []byte {
  252. y.AssertTrue(pi.Valid())
  253. entry := pi.entries[pi.nextIdx]
  254. return y.KeyWithTs(entry.Key, pi.readTs)
  255. }
  256. func (pi *pendingWritesIterator) Value() y.ValueStruct {
  257. y.AssertTrue(pi.Valid())
  258. entry := pi.entries[pi.nextIdx]
  259. return y.ValueStruct{
  260. Value: entry.Value,
  261. Meta: entry.meta,
  262. UserMeta: entry.UserMeta,
  263. ExpiresAt: entry.ExpiresAt,
  264. Version: pi.readTs,
  265. }
  266. }
  267. func (pi *pendingWritesIterator) Valid() bool {
  268. return pi.nextIdx < len(pi.entries)
  269. }
  270. func (pi *pendingWritesIterator) Close() error {
  271. return nil
  272. }
  // newPendingWritesIterator returns an iterator over txn's uncommitted writes.
  // Entries are sorted by key, in descending order when reversed is true, and
  // each is reported at txn.readTs. It returns nil for a read-only txn or one
  // without pending writes.
  func (txn *Txn) newPendingWritesIterator(reversed bool) *pendingWritesIterator {
  	if !txn.update || len(txn.pendingWrites) == 0 {
  		return nil
  	}

  	// A txn normally has few pending writes, so copying and sorting is cheap.
  	sorted := make([]*Entry, len(txn.pendingWrites))
  	i := 0
  	for _, e := range txn.pendingWrites {
  		sorted[i] = e
  		i++
  	}

  	less := func(a, b int) bool {
  		return bytes.Compare(sorted[a].Key, sorted[b].Key) < 0
  	}
  	if reversed {
  		less = func(a, b int) bool {
  			return bytes.Compare(sorted[a].Key, sorted[b].Key) > 0
  		}
  	}
  	sort.Slice(sorted, less)

  	return &pendingWritesIterator{
  		readTs:   txn.readTs,
  		entries:  sorted,
  		reversed: reversed,
  	}
  }
  295. func (txn *Txn) checkSize(e *Entry) error {
  296. count := txn.count + 1
  297. // Extra bytes for the version in key.
  298. size := txn.size + e.estimateSizeAndSetThreshold(txn.db.valueThreshold()) + 10
  299. if count >= txn.db.opt.maxBatchCount || size >= txn.db.opt.maxBatchSize {
  300. return ErrTxnTooBig
  301. }
  302. txn.count, txn.size = count, size
  303. return nil
  304. }
  305. func exceedsSize(prefix string, max int64, key []byte) error {
  306. return errors.Errorf("%s with size %d exceeded %d limit. %s:\n%s",
  307. prefix, len(key), max, prefix, hex.Dump(key[:1<<10]))
  308. }
  // modify validates e and stages it as one of the txn's pending writes.
  //
  // It returns:
  //   - ErrReadOnlyTxn, ErrDiscardedTxn, ErrEmptyKey or ErrInvalidKey for an
  //     unusable txn or key;
  //   - a size error when the key exceeds 65000 bytes, or the value exceeds
  //     ValueLogFileSize (or valueThreshold for an in-memory DB);
  //   - the isBanned error for a banned key;
  //   - ErrTxnTooBig when the txn would grow too large.
  func (txn *Txn) modify(e *Entry) error {
  	const maxKeySize = 65000

  	if !txn.update {
  		return ErrReadOnlyTxn
  	}
  	if txn.discarded {
  		return ErrDiscardedTxn
  	}
  	if len(e.Key) == 0 {
  		return ErrEmptyKey
  	}
  	if bytes.HasPrefix(e.Key, badgerPrefix) {
  		return ErrInvalidKey
  	}
  	if len(e.Key) > maxKeySize {
  		// table::header limits key length to a uint16. 65000 rather than
  		// 65536 leaves room for badger's move prefix and a timestamp suffix.
  		return exceedsSize("Key", maxKeySize, e.Key)
  	}
  	valueLen := int64(len(e.Value))
  	if valueLen > txn.db.opt.ValueLogFileSize {
  		return exceedsSize("Value", txn.db.opt.ValueLogFileSize, e.Value)
  	}
  	if txn.db.opt.InMemory && valueLen > txn.db.valueThreshold() {
  		return exceedsSize("Value", txn.db.valueThreshold(), e.Value)
  	}

  	if err := txn.db.isBanned(e.Key); err != nil {
  		return err
  	}
  	if err := txn.checkSize(e); err != nil {
  		return err
  	}

  	// Key fingerprints are only needed when conflicts are detected.
  	if txn.db.opt.DetectConflicts {
  		txn.conflictKeys[z.MemHash(e.Key)] = struct{}{}
  	}

  	// In managed mode a key may be written again with a different version.
  	// Keep the earlier entry in duplicateWrites so it is committed too. A
  	// rewrite at the same version simply replaces the earlier entry.
  	k := string(e.Key)
  	if prev, exists := txn.pendingWrites[k]; exists && prev.version != e.version {
  		txn.duplicateWrites = append(txn.duplicateWrites, prev)
  	}
  	txn.pendingWrites[k] = e
  	return nil
  }
  351. // Set adds a key-value pair to the database.
  352. // It will return ErrReadOnlyTxn if update flag was set to false when creating the transaction.
  353. //
  354. // The current transaction keeps a reference to the key and val byte slice
  355. // arguments. Users must not modify key and val until the end of the transaction.
  356. func (txn *Txn) Set(key, val []byte) error {
  357. return txn.SetEntry(NewEntry(key, val))
  358. }
  359. // SetEntry takes an Entry struct and adds the key-value pair in the struct,
  360. // along with other metadata to the database.
  361. //
  362. // The current transaction keeps a reference to the entry passed in argument.
  363. // Users must not modify the entry until the end of the transaction.
  364. func (txn *Txn) SetEntry(e *Entry) error {
  365. return txn.modify(e)
  366. }
  367. // Delete deletes a key.
  368. //
  369. // This is done by adding a delete marker for the key at commit timestamp. Any
  370. // reads happening before this timestamp would be unaffected. Any reads after
  371. // this commit would see the deletion.
  372. //
  373. // The current transaction keeps a reference to the key byte slice argument.
  374. // Users must not modify the key until the end of the transaction.
  375. func (txn *Txn) Delete(key []byte) error {
  376. e := &Entry{
  377. Key: key,
  378. meta: bitDelete,
  379. }
  380. return txn.modify(e)
  381. }
  // Get returns the Item for key as seen by this transaction. It returns
  // ErrKeyNotFound if the key is absent, deleted or expired.
  //
  // In an update transaction, the txn's own pending write for key (if any) is
  // served directly. Otherwise the key is recorded as read and looked up at
  // txn.readTs. It returns ErrEmptyKey for an empty key, ErrDiscardedTxn on a
  // discarded transaction, and the isBanned error for a banned key.
  func (txn *Txn) Get(key []byte) (item *Item, rerr error) {
  	switch {
  	case len(key) == 0:
  		return nil, ErrEmptyKey
  	case txn.discarded:
  		return nil, ErrDiscardedTxn
  	}
  	if err := txn.db.isBanned(key); err != nil {
  		return nil, err
  	}

  	item = new(Item)
  	if txn.update {
  		e, has := txn.pendingWrites[string(key)]
  		if has && bytes.Equal(key, e.Key) {
  			if isDeletedOrExpired(e.meta, e.ExpiresAt) {
  				return nil, ErrKeyNotFound
  			}
  			// Serve the txn's own uncommitted write. item.db is left unset.
  			item.meta = e.meta
  			item.val = e.Value
  			item.userMeta = e.UserMeta
  			item.key = key
  			item.status = prefetched
  			item.version = txn.readTs
  			item.expiresAt = e.ExpiresAt
  			return item, nil
  		}
  		// Only reads that reach the DB need tracking for conflict detection.
  		txn.addReadKey(key)
  	}

  	vs, err := txn.db.get(y.KeyWithTs(key, txn.readTs))
  	if err != nil {
  		return nil, y.Wrapf(err, "DB::Get key: %q", key)
  	}
  	if (vs.Value == nil && vs.Meta == 0) || isDeletedOrExpired(vs.Meta, vs.ExpiresAt) {
  		return nil, ErrKeyNotFound
  	}

  	item.key = key
  	item.version = vs.Version
  	item.meta = vs.Meta
  	item.userMeta = vs.UserMeta
  	item.vptr = y.SafeCopy(item.vptr, vs.Value)
  	item.txn = txn
  	item.expiresAt = vs.ExpiresAt
  	return item, nil
  }
  434. func (txn *Txn) addReadKey(key []byte) {
  435. if txn.update {
  436. fp := z.MemHash(key)
  437. // Because of the possibility of multiple iterators it is now possible
  438. // for multiple threads within a read-write transaction to read keys at
  439. // the same time. The reads slice is not currently thread-safe and
  440. // needs to be locked whenever we mark a key as read.
  441. txn.readsLock.Lock()
  442. txn.reads = append(txn.reads, fp)
  443. txn.readsLock.Unlock()
  444. }
  445. }
  446. // Discard discards a created transaction. This method is very important and must be called. Commit
  447. // method calls this internally, however, calling this multiple times doesn't cause any issues. So,
  448. // this can safely be called via a defer right when transaction is created.
  449. //
  450. // NOTE: If any operations are run on a discarded transaction, ErrDiscardedTxn is returned.
  451. func (txn *Txn) Discard() {
  452. if txn.discarded { // Avoid a re-run.
  453. return
  454. }
  455. if txn.numIterators.Load() > 0 {
  456. panic("Unclosed iterator at time of Txn.Discard.")
  457. }
  458. txn.discarded = true
  459. if !txn.db.orc.isManaged {
  460. txn.db.orc.doneRead(txn)
  461. }
  462. }
  // commitAndSend obtains a commit timestamp for txn and returns ErrConflict if
  // the oracle detects a conflict. It then stamps every pending and duplicate
  // write with its version and queues the batch on the write channel.
  //
  // On success it returns a function that waits for the write to complete and
  // then marks the commit timestamp done. If queueing fails, the timestamp is
  // marked done immediately and the error is returned.
  //
  // writeChLock is held from timestamp assignment until the batch is queued,
  // so batches reach the write channel in commit-timestamp order.
  func (txn *Txn) commitAndSend() (func() error, error) {
  	orc := txn.db.orc
  	orc.writeChLock.Lock()
  	defer orc.writeChLock.Unlock()

  	commitTs, conflict := orc.newCommitTs(txn)
  	if conflict {
  		return nil, ErrConflict
  	}

  	// Unversioned entries get commitTs. If any entry already carries its own
  	// version (SetEntryAt in managed mode), the batch mixes timestamps and
  	// must not be marked as a single transaction.
  	keepTogether := true
  	assignVersion := func(e *Entry) {
  		if e.version != 0 {
  			keepTogether = false
  			return
  		}
  		e.version = commitTs
  	}
  	for _, e := range txn.pendingWrites {
  		assignVersion(e)
  	}
  	// duplicateWrites is non-empty only when a key was written with
  	// different versions.
  	for _, e := range txn.duplicateWrites {
  		assignVersion(e)
  	}

  	batch := make([]*Entry, 0, len(txn.pendingWrites)+len(txn.duplicateWrites)+1)
  	addToBatch := func(e *Entry) {
  		// Suffixing the key with its version sorts versions of a key in
  		// descending commit-ts order.
  		e.Key = y.KeyWithTs(e.Key, e.version)
  		// bitTxn marks entries that belong to one transaction at one ts.
  		if keepTogether {
  			e.meta |= bitTxn
  		}
  		batch = append(batch, e)
  	}

  	// The following debug information is what led to determining the cause of
  	// bank txn violation bug, and it took a whole bunch of effort to narrow it
  	// down to here. So, keep this around for at least a couple of months.
  	// var b strings.Builder
  	// fmt.Fprintf(&b, "Read: %d. Commit: %d. reads: %v. writes: %v. Keys: ",
  	// txn.readTs, commitTs, txn.reads, txn.conflictKeys)
  	for _, e := range txn.pendingWrites {
  		addToBatch(e)
  	}
  	for _, e := range txn.duplicateWrites {
  		addToBatch(e)
  	}

  	if keepTogether {
  		// The txn-finished marker is keyed by commitTs, so it must be non-zero.
  		y.AssertTrue(commitTs != 0)
  		batch = append(batch, &Entry{
  			Key:   y.KeyWithTs(txnKey, commitTs),
  			Value: []byte(strconv.FormatUint(commitTs, 10)),
  			meta:  bitFinTxn,
  		})
  	}

  	req, err := txn.db.sendToWriteCh(batch)
  	if err != nil {
  		orc.doneCommit(commitTs)
  		return nil, err
  	}

  	// doneCommit cannot be deferred in this function. It must run only after
  	// the write finishes, which happens when the caller invokes this callback.
  	wait := func() error {
  		werr := req.Wait()
  		orc.doneCommit(commitTs)
  		return werr
  	}
  	return wait, nil
  }
  543. func (txn *Txn) commitPrecheck() error {
  544. if txn.discarded {
  545. return errors.New("Trying to commit a discarded txn")
  546. }
  547. keepTogether := true
  548. for _, e := range txn.pendingWrites {
  549. if e.version != 0 {
  550. keepTogether = false
  551. }
  552. }
  553. // If keepTogether is True, it implies transaction markers will be added.
  554. // In that case, commitTs should not be never be zero. This might happen if
  555. // someone uses txn.Commit instead of txn.CommitAt in managed mode. This
  556. // should happen only in managed mode. In normal mode, keepTogether will
  557. // always be true.
  558. if keepTogether && txn.db.opt.managedTxns && txn.commitTs == 0 {
  559. return errors.New("CommitTs cannot be zero. Please use commitAt instead")
  560. }
  561. return nil
  562. }
  563. // Commit commits the transaction, following these steps:
  564. //
  565. // 1. If there are no writes, return immediately.
  566. //
  567. // 2. Check if read rows were updated since txn started. If so, return ErrConflict.
  568. //
  569. // 3. If no conflict, generate a commit timestamp and update written rows' commit ts.
  570. //
  571. // 4. Batch up all writes, write them to value log and LSM tree.
  572. //
  573. // 5. If callback is provided, Badger will return immediately after checking
  574. // for conflicts. Writes to the database will happen in the background. If
  575. // there is a conflict, an error will be returned and the callback will not
  576. // run. If there are no conflicts, the callback will be called in the
  577. // background upon successful completion of writes or any error during write.
  578. //
  579. // If error is nil, the transaction is successfully committed. In case of a non-nil error, the LSM
  580. // tree won't be updated, so there's no need for any rollback.
  581. func (txn *Txn) Commit() error {
  582. // txn.conflictKeys can be zero if conflict detection is turned off. So we
  583. // should check txn.pendingWrites.
  584. if len(txn.pendingWrites) == 0 {
  585. // Discard the transaction so that the read is marked done.
  586. txn.Discard()
  587. return nil
  588. }
  589. // Precheck before discarding txn.
  590. if err := txn.commitPrecheck(); err != nil {
  591. return err
  592. }
  593. defer txn.Discard()
  594. txnCb, err := txn.commitAndSend()
  595. if err != nil {
  596. return err
  597. }
  598. // If batchSet failed, LSM would not have been updated. So, no need to rollback anything.
  599. // TODO: What if some of the txns successfully make it to value log, but others fail.
  600. // Nothing gets updated to LSM, until a restart happens.
  601. return txnCb()
  602. }
  // txnCb carries what runTxnCallback needs to finish a CommitWith call.
  type txnCb struct {
  	commit func() error // from commitAndSend: waits for the write, marks the commit done; may be nil
  	user   func(error)  // the caller's callback; runTxnCallback panics if nil
  	err    error        // error found before the write was queued, if any
  }
  608. func runTxnCallback(cb *txnCb) {
  609. switch {
  610. case cb == nil:
  611. panic("txn callback is nil")
  612. case cb.user == nil:
  613. panic("Must have caught a nil callback for txn.CommitWith")
  614. case cb.err != nil:
  615. cb.user(cb.err)
  616. case cb.commit != nil:
  617. err := cb.commit()
  618. cb.user(err)
  619. default:
  620. cb.user(nil)
  621. }
  622. }
  623. // CommitWith acts like Commit, but takes a callback, which gets run via a
  624. // goroutine to avoid blocking this function. The callback is guaranteed to run,
  625. // so it is safe to increment sync.WaitGroup before calling CommitWith, and
  626. // decrementing it in the callback; to block until all callbacks are run.
  627. func (txn *Txn) CommitWith(cb func(error)) {
  628. if cb == nil {
  629. panic("Nil callback provided to CommitWith")
  630. }
  631. if len(txn.pendingWrites) == 0 {
  632. // Do not run these callbacks from here, because the CommitWith and the
  633. // callback might be acquiring the same locks. Instead run the callback
  634. // from another goroutine.
  635. go runTxnCallback(&txnCb{user: cb, err: nil})
  636. // Discard the transaction so that the read is marked done.
  637. txn.Discard()
  638. return
  639. }
  640. // Precheck before discarding txn.
  641. if err := txn.commitPrecheck(); err != nil {
  642. cb(err)
  643. return
  644. }
  645. defer txn.Discard()
  646. commitCb, err := txn.commitAndSend()
  647. if err != nil {
  648. go runTxnCallback(&txnCb{user: cb, err: err})
  649. return
  650. }
  651. go runTxnCallback(&txnCb{user: cb, commit: commitCb})
  652. }
  653. // ReadTs returns the read timestamp of the transaction.
  654. func (txn *Txn) ReadTs() uint64 {
  655. return txn.readTs
  656. }
  657. // NewTransaction creates a new transaction. Badger supports concurrent execution of transactions,
  658. // providing serializable snapshot isolation, avoiding write skews. Badger achieves this by tracking
  659. // the keys read and at Commit time, ensuring that these read keys weren't concurrently modified by
  660. // another transaction.
  661. //
  662. // For read-only transactions, set update to false. In this mode, we don't track the rows read for
  663. // any changes. Thus, any long running iterations done in this mode wouldn't pay this overhead.
  664. //
  665. // Running transactions concurrently is OK. However, a transaction itself isn't thread safe, and
  666. // should only be run serially. It doesn't matter if a transaction is created by one goroutine and
  667. // passed down to other, as long as the Txn APIs are called serially.
  668. //
  669. // When you create a new transaction, it is absolutely essential to call
  670. // Discard(). This should be done irrespective of what the update param is set
  671. // to. Commit API internally runs Discard, but running it twice wouldn't cause
  672. // any issues.
  673. //
  674. // txn := db.NewTransaction(false)
  675. // defer txn.Discard()
  676. // // Call various APIs.
  677. func (db *DB) NewTransaction(update bool) *Txn {
  678. return db.newTransaction(update, false)
  679. }
  680. func (db *DB) newTransaction(update, isManaged bool) *Txn {
  681. if db.opt.ReadOnly && update {
  682. // DB is read-only, force read-only transaction.
  683. update = false
  684. }
  685. txn := &Txn{
  686. update: update,
  687. db: db,
  688. count: 1, // One extra entry for BitFin.
  689. size: int64(len(txnKey) + 10), // Some buffer for the extra entry.
  690. }
  691. if update {
  692. if db.opt.DetectConflicts {
  693. txn.conflictKeys = make(map[uint64]struct{})
  694. }
  695. txn.pendingWrites = make(map[string]*Entry)
  696. }
  697. if !isManaged {
  698. txn.readTs = db.orc.readTs()
  699. }
  700. return txn
  701. }
  // View runs fn inside a read-only transaction that it creates and discards on
  // the caller's behalf. It returns fn's error, or ErrDBClosed if the DB is
  // closed. With managed transactions, the read timestamp is math.MaxUint64.
  func (db *DB) View(fn func(txn *Txn) error) error {
  	if db.IsClosed() {
  		return ErrDBClosed
  	}

  	var txn *Txn
  	if !db.opt.managedTxns {
  		txn = db.NewTransaction(false)
  	} else {
  		txn = db.NewTransactionAt(math.MaxUint64, false)
  	}
  	defer txn.Discard()

  	return fn(txn)
  }
  718. // Update executes a function, creating and managing a read-write transaction
  719. // for the user. Error returned by the function is relayed by the Update method.
  720. // Update cannot be used with managed transactions.
  721. func (db *DB) Update(fn func(txn *Txn) error) error {
  722. if db.IsClosed() {
  723. return ErrDBClosed
  724. }
  725. if db.opt.managedTxns {
  726. panic("Update can only be used with managedDB=false.")
  727. }
  728. txn := db.NewTransaction(true)
  729. defer txn.Discard()
  730. if err := fn(txn); err != nil {
  731. return err
  732. }
  733. return txn.Commit()
  734. }