| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821 |
- /*
- * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
- * SPDX-License-Identifier: Apache-2.0
- */
- package badger
- import (
- "bytes"
- "context"
- "encoding/hex"
- "errors"
- "fmt"
- "math"
- "sort"
- "strconv"
- "sync"
- "sync/atomic"
- "github.com/dgraph-io/badger/v4/y"
- "github.com/dgraph-io/ristretto/v2/z"
- )
// oracle hands out read timestamps (readTs) and commit timestamps
// (newCommitTs) and, when conflict detection is enabled, checks committing
// transactions against the write sets of recently committed ones.
type oracle struct {
	isManaged       bool // Does not change value, so no locking required.
	detectConflicts bool // Determines if the txns should be checked for conflicts.

	sync.Mutex // For nextTxnTs and commits.
	// writeChLock lock is for ensuring that transactions go to the write
	// channel in the same order as their commit timestamps.
	writeChLock sync.Mutex
	// nextTxnTs is the commit timestamp the next unmanaged commit receives;
	// readTs hands out nextTxnTs-1 as a read timestamp.
	nextTxnTs uint64

	// Used to block NewTransaction, so all previous commits are visible to a new read.
	txnMark *y.WaterMark

	// Either of these is used to determine which versions can be permanently
	// discarded during compaction.
	discardTs uint64       // Used by ManagedDB.
	readMark  *y.WaterMark // Used by DB.

	// committedTxns contains all committed writes (contains fingerprints
	// of keys written and their latest commit counter).
	committedTxns []committedTxn
	// lastCleanupTs is the maxReadTs used by the most recent pass of
	// cleanupCommittedTransactions; cleanup is skipped until it advances.
	lastCleanupTs uint64

	// closer is used to stop watermarks.
	closer *z.Closer
}
// committedTxn records the write set of one committed transaction so that
// hasConflict can test later committers' reads against it.
type committedTxn struct {
	ts uint64 // commit timestamp assigned by newCommitTs.
	// ConflictKeys Keeps track of the entries written at timestamp ts.
	conflictKeys map[uint64]struct{}
}
- func newOracle(opt Options) *oracle {
- orc := &oracle{
- isManaged: opt.managedTxns,
- detectConflicts: opt.DetectConflicts,
- // We're not initializing nextTxnTs and readOnlyTs. It would be done after replay in Open.
- //
- // WaterMarks must be 64-bit aligned for atomic package, hence we must use pointers here.
- // See https://golang.org/pkg/sync/atomic/#pkg-note-BUG.
- readMark: &y.WaterMark{Name: "badger.PendingReads"},
- txnMark: &y.WaterMark{Name: "badger.TxnTimestamp"},
- closer: z.NewCloser(2),
- }
- orc.readMark.Init(orc.closer)
- orc.txnMark.Init(orc.closer)
- return orc
- }
- func (o *oracle) Stop() {
- o.closer.SignalAndWait()
- }
- func (o *oracle) readTs() uint64 {
- if o.isManaged {
- panic("ReadTs should not be retrieved for managed DB")
- }
- var readTs uint64
- o.Lock()
- readTs = o.nextTxnTs - 1
- o.readMark.Begin(readTs)
- o.Unlock()
- // Wait for all txns which have no conflicts, have been assigned a commit
- // timestamp and are going through the write to value log and LSM tree
- // process. Not waiting here could mean that some txns which have been
- // committed would not be read.
- y.Check(o.txnMark.WaitForMark(context.Background(), readTs))
- return readTs
- }
- func (o *oracle) nextTs() uint64 {
- o.Lock()
- defer o.Unlock()
- return o.nextTxnTs
- }
- func (o *oracle) incrementNextTs() {
- o.Lock()
- defer o.Unlock()
- o.nextTxnTs++
- }
- // Any deleted or invalid versions at or below ts would be discarded during
- // compaction to reclaim disk space in LSM tree and thence value log.
- func (o *oracle) setDiscardTs(ts uint64) {
- o.Lock()
- defer o.Unlock()
- o.discardTs = ts
- o.cleanupCommittedTransactions()
- }
- func (o *oracle) discardAtOrBelow() uint64 {
- if o.isManaged {
- o.Lock()
- defer o.Unlock()
- return o.discardTs
- }
- return o.readMark.DoneUntil()
- }
- // hasConflict must be called while having a lock.
- func (o *oracle) hasConflict(txn *Txn) bool {
- if len(txn.reads) == 0 {
- return false
- }
- for _, committedTxn := range o.committedTxns {
- // If the committedTxn.ts is less than txn.readTs that implies that the
- // committedTxn finished before the current transaction started.
- // We don't need to check for conflict in that case.
- // This change assumes linearizability. Lack of linearizability could
- // cause the read ts of a new txn to be lower than the commit ts of
- // a txn before it (@mrjn).
- if committedTxn.ts <= txn.readTs {
- continue
- }
- for _, ro := range txn.reads {
- if _, has := committedTxn.conflictKeys[ro]; has {
- return true
- }
- }
- }
- return false
- }
// newCommitTs checks txn for conflicts under the oracle lock and, if there
// are none, assigns it a commit timestamp.
//
// It returns (0, true) on conflict and (ts, false) otherwise. In managed mode
// ts is the caller-supplied txn.commitTs. Otherwise ts is taken from
// o.nextTxnTs, which is then incremented, and ts is registered with txnMark
// so that readTs can wait for it. When conflict detection is enabled, txn's
// write set is appended to committedTxns.
func (o *oracle) newCommitTs(txn *Txn) (uint64, bool) {
	o.Lock()
	defer o.Unlock()

	if o.hasConflict(txn) {
		return 0, true
	}

	var ts uint64
	if !o.isManaged {
		// Mark txn's read as done before cleaning up, so that readMark (which
		// cleanup consults) can account for this transaction.
		o.doneRead(txn)
		o.cleanupCommittedTransactions()

		// This is the general case, when user doesn't specify the read and commit ts.
		ts = o.nextTxnTs
		o.nextTxnTs++
		o.txnMark.Begin(ts)
	} else {
		// If commitTs is set, use it instead.
		ts = txn.commitTs
	}

	// Sanity check: a new commit must not be older than the last point up to
	// which committedTxns has already been pruned.
	y.AssertTrue(ts >= o.lastCleanupTs)

	if o.detectConflicts {
		// We should ensure that txns are not added to o.committedTxns slice when
		// conflict detection is disabled otherwise this slice would keep growing.
		o.committedTxns = append(o.committedTxns, committedTxn{
			ts:           ts,
			conflictKeys: txn.conflictKeys,
		})
	}

	return ts, false
}
- func (o *oracle) doneRead(txn *Txn) {
- if !txn.doneRead {
- txn.doneRead = true
- o.readMark.Done(txn.readTs)
- }
- }
- func (o *oracle) cleanupCommittedTransactions() { // Must be called under o.Lock
- if !o.detectConflicts {
- // When detectConflicts is set to false, we do not store any
- // committedTxns and so there's nothing to clean up.
- return
- }
- // Same logic as discardAtOrBelow but unlocked
- var maxReadTs uint64
- if o.isManaged {
- maxReadTs = o.discardTs
- } else {
- maxReadTs = o.readMark.DoneUntil()
- }
- y.AssertTrue(maxReadTs >= o.lastCleanupTs)
- // do not run clean up if the maxReadTs (read timestamp of the
- // oldest transaction that is still in flight) has not increased
- if maxReadTs == o.lastCleanupTs {
- return
- }
- o.lastCleanupTs = maxReadTs
- tmp := o.committedTxns[:0]
- for _, txn := range o.committedTxns {
- if txn.ts <= maxReadTs {
- continue
- }
- tmp = append(tmp, txn)
- }
- o.committedTxns = tmp
- }
- func (o *oracle) doneCommit(cts uint64) {
- if o.isManaged {
- // No need to update anything.
- return
- }
- o.txnMark.Done(cts)
- }
// Txn represents a Badger transaction.
type Txn struct {
	// readTs is the snapshot timestamp: Get seeks y.KeyWithTs(key, readTs) and
	// hasConflict ignores commits at or below it.
	readTs uint64
	// commitTs is the caller-supplied commit timestamp that newCommitTs uses
	// in managed mode.
	commitTs uint64
	// size and count accumulate the estimated footprint of staged writes (see
	// checkSize); newTransaction seeds them to reserve room for the extra
	// transaction-finish entry.
	size  int64
	count int64
	db    *DB

	reads []uint64 // contains fingerprints of keys read.
	// contains fingerprints of keys written. This is used for conflict detection.
	// It is only allocated for update txns when DetectConflicts is enabled.
	conflictKeys map[uint64]struct{}
	readsLock    sync.Mutex // guards the reads slice. See addReadKey.

	pendingWrites   map[string]*Entry // cache stores any writes done by txn.
	duplicateWrites []*Entry          // Used in managed mode to store duplicate entries.

	numIterators atomic.Int32 // open iterators; Discard panics if this is non-zero.
	discarded    bool         // set by Discard; later Get/modify calls return ErrDiscardedTxn.
	doneRead     bool         // true once readTs has been marked done via oracle.doneRead.
	update       bool         // update is used to conditionally keep track of reads.
}
// pendingWritesIterator walks a transaction's uncommitted writes in key order
// (descending when reversed is set), reporting every entry at version readTs.
type pendingWritesIterator struct {
	entries  []*Entry // pending writes, pre-sorted by newPendingWritesIterator.
	nextIdx  int      // index of the current entry in entries.
	readTs   uint64   // version attached to every key and value returned.
	reversed bool     // entries are sorted in descending key order when true.
}
- func (pi *pendingWritesIterator) Next() {
- pi.nextIdx++
- }
- func (pi *pendingWritesIterator) Rewind() {
- pi.nextIdx = 0
- }
- func (pi *pendingWritesIterator) Seek(key []byte) {
- key = y.ParseKey(key)
- pi.nextIdx = sort.Search(len(pi.entries), func(idx int) bool {
- cmp := bytes.Compare(pi.entries[idx].Key, key)
- if !pi.reversed {
- return cmp >= 0
- }
- return cmp <= 0
- })
- }
- func (pi *pendingWritesIterator) Key() []byte {
- y.AssertTrue(pi.Valid())
- entry := pi.entries[pi.nextIdx]
- return y.KeyWithTs(entry.Key, pi.readTs)
- }
- func (pi *pendingWritesIterator) Value() y.ValueStruct {
- y.AssertTrue(pi.Valid())
- entry := pi.entries[pi.nextIdx]
- return y.ValueStruct{
- Value: entry.Value,
- Meta: entry.meta,
- UserMeta: entry.UserMeta,
- ExpiresAt: entry.ExpiresAt,
- Version: pi.readTs,
- }
- }
- func (pi *pendingWritesIterator) Valid() bool {
- return pi.nextIdx < len(pi.entries)
- }
- func (pi *pendingWritesIterator) Close() error {
- return nil
- }
- func (txn *Txn) newPendingWritesIterator(reversed bool) *pendingWritesIterator {
- if !txn.update || len(txn.pendingWrites) == 0 {
- return nil
- }
- entries := make([]*Entry, 0, len(txn.pendingWrites))
- for _, e := range txn.pendingWrites {
- entries = append(entries, e)
- }
- // Number of pending writes per transaction shouldn't be too big in general.
- sort.Slice(entries, func(i, j int) bool {
- cmp := bytes.Compare(entries[i].Key, entries[j].Key)
- if !reversed {
- return cmp < 0
- }
- return cmp > 0
- })
- return &pendingWritesIterator{
- readTs: txn.readTs,
- entries: entries,
- reversed: reversed,
- }
- }
- func (txn *Txn) checkSize(e *Entry) error {
- count := txn.count + 1
- // Extra bytes for the version in key.
- size := txn.size + e.estimateSizeAndSetThreshold(txn.db.valueThreshold()) + 10
- if count >= txn.db.opt.maxBatchCount || size >= txn.db.opt.maxBatchSize {
- return ErrTxnTooBig
- }
- txn.count, txn.size = count, size
- return nil
- }
// exceedsSize builds the error returned when a key or value is larger than
// allowed.
//
// prefix names what was too large ("Key" or "Value"), max is the limit that
// was exceeded, and key holds the offending bytes. Only the first 1 KiB of
// key is hex-dumped into the message, to keep it bounded.
func exceedsSize(prefix string, max int64, key []byte) error {
	// Clamp the dump length. Slicing key[:1<<10] unconditionally panics when
	// cap(key) < 1 KiB (e.g. short values rejected by the in-memory threshold
	// check). It also dumps bytes beyond len(key) when only the capacity is
	// large enough.
	const maxDump = 1 << 10
	dump := key
	if len(dump) > maxDump {
		dump = dump[:maxDump]
	}
	return fmt.Errorf("%s with size %d exceeded %d limit. %s:\n%s",
		prefix, len(key), max, prefix, hex.Dump(dump))
}
// modify validates e and stages it in txn.pendingWrites. It is the common
// path behind Set, SetEntry and Delete.
//
// Checks run in the order written and the first failure is returned:
//   - ErrReadOnlyTxn, ErrDiscardedTxn, ErrEmptyKey;
//   - ErrInvalidKey for keys carrying the reserved badgerPrefix;
//   - an oversize key/value error from exceedsSize;
//   - any error from db.isBanned;
//   - ErrTxnTooBig from checkSize.
//
// On success, e's key fingerprint is added to conflictKeys (when conflict
// detection is enabled) and e replaces any earlier entry for the same key.
func (txn *Txn) modify(e *Entry) error {
	const maxKeySize = 65000

	switch {
	case !txn.update:
		return ErrReadOnlyTxn
	case txn.discarded:
		return ErrDiscardedTxn
	case len(e.Key) == 0:
		return ErrEmptyKey
	case bytes.HasPrefix(e.Key, badgerPrefix):
		return ErrInvalidKey
	case len(e.Key) > maxKeySize:
		// Key length can't be more than uint16, as determined by table::header. To
		// keep things safe and allow badger move prefix and a timestamp suffix, let's
		// cut it down to 65000, instead of using 65536.
		return exceedsSize("Key", maxKeySize, e.Key)
	case int64(len(e.Value)) > txn.db.opt.ValueLogFileSize:
		return exceedsSize("Value", txn.db.opt.ValueLogFileSize, e.Value)
	case txn.db.opt.InMemory && int64(len(e.Value)) > txn.db.valueThreshold():
		// In-memory mode: values larger than the value threshold are rejected here.
		return exceedsSize("Value", txn.db.valueThreshold(), e.Value)
	}

	if err := txn.db.isBanned(e.Key); err != nil {
		return err
	}

	// checkSize updates txn.count/txn.size only when the entry fits.
	if err := txn.checkSize(e); err != nil {
		return err
	}

	// The txn.conflictKeys is used for conflict detection. If conflict detection
	// is disabled, we don't need to store key hashes in this map.
	if txn.db.opt.DetectConflicts {
		fp := z.MemHash(e.Key) // Avoid dealing with byte arrays.
		txn.conflictKeys[fp] = struct{}{}
	}
	// If a duplicate entry was inserted in managed mode, move it to the duplicate writes slice.
	// Add the entry to duplicateWrites only if both the entries have different versions. For
	// same versions, we will overwrite the existing entry.
	if oldEntry, ok := txn.pendingWrites[string(e.Key)]; ok && oldEntry.version != e.version {
		txn.duplicateWrites = append(txn.duplicateWrites, oldEntry)
	}
	txn.pendingWrites[string(e.Key)] = e
	return nil
}
- // Set adds a key-value pair to the database.
- // It will return ErrReadOnlyTxn if update flag was set to false when creating the transaction.
- //
- // The current transaction keeps a reference to the key and val byte slice
- // arguments. Users must not modify key and val until the end of the transaction.
- func (txn *Txn) Set(key, val []byte) error {
- return txn.SetEntry(NewEntry(key, val))
- }
- // SetEntry takes an Entry struct and adds the key-value pair in the struct,
- // along with other metadata to the database.
- //
- // The current transaction keeps a reference to the entry passed in argument.
- // Users must not modify the entry until the end of the transaction.
- func (txn *Txn) SetEntry(e *Entry) error {
- return txn.modify(e)
- }
- // Delete deletes a key.
- //
- // This is done by adding a delete marker for the key at commit timestamp. Any
- // reads happening before this timestamp would be unaffected. Any reads after
- // this commit would see the deletion.
- //
- // The current transaction keeps a reference to the key byte slice argument.
- // Users must not modify the key until the end of the transaction.
- func (txn *Txn) Delete(key []byte) error {
- e := &Entry{
- Key: key,
- meta: bitDelete,
- }
- return txn.modify(e)
- }
// Get looks for key and returns corresponding Item.
// If key is not found, ErrKeyNotFound is returned.
//
// In an update transaction, a key written earlier in the same transaction is
// served from pendingWrites. Otherwise the key is recorded as read (for
// conflict detection) and looked up in the DB at txn.readTs. Deleted or
// expired entries, from either source, yield ErrKeyNotFound.
func (txn *Txn) Get(key []byte) (item *Item, rerr error) {
	if len(key) == 0 {
		return nil, ErrEmptyKey
	} else if txn.discarded {
		return nil, ErrDiscardedTxn
	}

	if err := txn.db.isBanned(key); err != nil {
		return nil, err
	}

	item = new(Item)
	if txn.update {
		// NOTE(review): the bytes.Equal check presumably guards against an
		// entry whose Key has already been rewritten with a version suffix
		// (commitAndSend does this in place) — confirm.
		if e, has := txn.pendingWrites[string(key)]; has && bytes.Equal(key, e.Key) {
			if isDeletedOrExpired(e.meta, e.ExpiresAt) {
				return nil, ErrKeyNotFound
			}
			// Fulfill from cache.
			item.meta = e.meta
			item.val = e.Value
			item.userMeta = e.UserMeta
			item.key = key
			item.status = prefetched
			item.version = txn.readTs
			item.expiresAt = e.ExpiresAt
			// We probably don't need to set db on item here.
			return item, nil
		}
		// Only track reads if this is update txn. No need to track read if txn serviced it
		// internally.
		txn.addReadKey(key)
	}

	// Look up the newest version visible at readTs.
	seek := y.KeyWithTs(key, txn.readTs)
	vs, err := txn.db.get(seek)
	if err != nil {
		return nil, y.Wrapf(err, "DB::Get key: %q", key)
	}
	// An empty ValueStruct (no value, no meta) means no version was found.
	if vs.Value == nil && vs.Meta == 0 {
		return nil, ErrKeyNotFound
	}
	if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) {
		return nil, ErrKeyNotFound
	}

	item.key = key
	item.version = vs.Version
	item.meta = vs.Meta
	item.userMeta = vs.UserMeta
	// Copy the value bytes into the item's own buffer.
	item.vptr = y.SafeCopy(item.vptr, vs.Value)
	item.txn = txn
	item.expiresAt = vs.ExpiresAt
	return item, nil
}
- func (txn *Txn) addReadKey(key []byte) {
- if txn.update {
- fp := z.MemHash(key)
- // Because of the possibility of multiple iterators it is now possible
- // for multiple threads within a read-write transaction to read keys at
- // the same time. The reads slice is not currently thread-safe and
- // needs to be locked whenever we mark a key as read.
- txn.readsLock.Lock()
- txn.reads = append(txn.reads, fp)
- txn.readsLock.Unlock()
- }
- }
- // Discard discards a created transaction. This method is very important and must be called. Commit
- // method calls this internally, however, calling this multiple times doesn't cause any issues. So,
- // this can safely be called via a defer right when transaction is created.
- //
- // NOTE: If any operations are run on a discarded transaction, ErrDiscardedTxn is returned.
- func (txn *Txn) Discard() {
- if txn.discarded { // Avoid a re-run.
- return
- }
- if txn.numIterators.Load() > 0 {
- panic("Unclosed iterator at time of Txn.Discard.")
- }
- txn.discarded = true
- if !txn.db.orc.isManaged {
- txn.db.orc.doneRead(txn)
- }
- }
// commitAndSend obtains a commit timestamp for txn, stamps every pending and
// duplicate write with its version, and pushes the batch to the DB's write
// channel. It returns ErrConflict if newCommitTs detects a conflict.
//
// On success it returns a function that waits for the write request and only
// then marks commitTs done in the oracle. The caller is expected to invoke
// it; until then commitTs is not marked done. If sending fails, commitTs is
// marked done immediately and the error is returned.
func (txn *Txn) commitAndSend() (func() error, error) {
	orc := txn.db.orc
	// Ensure that the order in which we get the commit timestamp is the same as
	// the order in which we push these updates to the write channel. So, we
	// acquire a writeChLock before getting a commit timestamp, and only release
	// it after pushing the entries to it.
	orc.writeChLock.Lock()
	defer orc.writeChLock.Unlock()

	commitTs, conflict := orc.newCommitTs(txn)
	if conflict {
		return nil, ErrConflict
	}

	// keepTogether stays true only if no entry carried its own (non-zero)
	// version; entries without one are stamped with commitTs.
	keepTogether := true
	setVersion := func(e *Entry) {
		if e.version == 0 {
			e.version = commitTs
		} else {
			keepTogether = false
		}
	}
	for _, e := range txn.pendingWrites {
		setVersion(e)
	}
	// The duplicateWrites slice will be non-empty only if there are duplicate
	// entries with different versions.
	for _, e := range txn.duplicateWrites {
		setVersion(e)
	}

	// +1 leaves room for the transaction-finish marker appended below.
	entries := make([]*Entry, 0, len(txn.pendingWrites)+len(txn.duplicateWrites)+1)

	processEntry := func(e *Entry) {
		// Suffix the keys with commit ts, so the key versions are sorted in
		// descending order of commit timestamp.
		// Note: this rewrites e.Key in place.
		e.Key = y.KeyWithTs(e.Key, e.version)
		// Add bitTxn only if these entries are part of a transaction. We
		// support SetEntryAt(..) in managed mode which means a single
		// transaction can have entries with different timestamps. If entries
		// in a single transaction have different timestamps, we don't add the
		// transaction markers.
		if keepTogether {
			e.meta |= bitTxn
		}
		entries = append(entries, e)
	}

	// The following debug information is what led to determining the cause of
	// bank txn violation bug, and it took a whole bunch of effort to narrow it
	// down to here. So, keep this around for at least a couple of months.
	// var b strings.Builder
	// fmt.Fprintf(&b, "Read: %d. Commit: %d. reads: %v. writes: %v. Keys: ",
	// 	txn.readTs, commitTs, txn.reads, txn.conflictKeys)
	for _, e := range txn.pendingWrites {
		processEntry(e)
	}
	for _, e := range txn.duplicateWrites {
		processEntry(e)
	}

	if keepTogether {
		// CommitTs should not be zero if we're inserting transaction markers.
		y.AssertTrue(commitTs != 0)
		// Transaction-finish marker: key txnKey at commitTs, value is commitTs
		// in decimal.
		e := &Entry{
			Key:   y.KeyWithTs(txnKey, commitTs),
			Value: []byte(strconv.FormatUint(commitTs, 10)),
			meta:  bitFinTxn,
		}
		entries = append(entries, e)
	}

	req, err := txn.db.sendToWriteCh(entries)
	if err != nil {
		orc.doneCommit(commitTs)
		return nil, err
	}
	ret := func() error {
		err := req.Wait()
		// Wait before marking commitTs as done.
		// We can't defer doneCommit above, because it is being called from a
		// callback here.
		orc.doneCommit(commitTs)
		return err
	}
	return ret, nil
}
- func (txn *Txn) commitPrecheck() error {
- if txn.discarded {
- return errors.New("Trying to commit a discarded txn")
- }
- keepTogether := true
- for _, e := range txn.pendingWrites {
- if e.version != 0 {
- keepTogether = false
- }
- }
- // If keepTogether is True, it implies transaction markers will be added.
- // In that case, commitTs should not be never be zero. This might happen if
- // someone uses txn.Commit instead of txn.CommitAt in managed mode. This
- // should happen only in managed mode. In normal mode, keepTogether will
- // always be true.
- if keepTogether && txn.db.opt.managedTxns && txn.commitTs == 0 {
- return errors.New("CommitTs cannot be zero. Please use commitAt instead")
- }
- return nil
- }
- // Commit commits the transaction, following these steps:
- //
- // 1. If there are no writes, return immediately.
- //
- // 2. Check if read rows were updated since txn started. If so, return ErrConflict.
- //
- // 3. If no conflict, generate a commit timestamp and update written rows' commit ts.
- //
- // 4. Batch up all writes, write them to value log and LSM tree.
- //
- // 5. If callback is provided, Badger will return immediately after checking
- // for conflicts. Writes to the database will happen in the background. If
- // there is a conflict, an error will be returned and the callback will not
- // run. If there are no conflicts, the callback will be called in the
- // background upon successful completion of writes or any error during write.
- //
- // If error is nil, the transaction is successfully committed. In case of a non-nil error, the LSM
- // tree won't be updated, so there's no need for any rollback.
- func (txn *Txn) Commit() error {
- // txn.conflictKeys can be zero if conflict detection is turned off. So we
- // should check txn.pendingWrites.
- if len(txn.pendingWrites) == 0 {
- // Discard the transaction so that the read is marked done.
- txn.Discard()
- return nil
- }
- // Precheck before discarding txn.
- if err := txn.commitPrecheck(); err != nil {
- return err
- }
- defer txn.Discard()
- txnCb, err := txn.commitAndSend()
- if err != nil {
- return err
- }
- // If batchSet failed, LSM would not have been updated. So, no need to rollback anything.
- // TODO: What if some of the txns successfully make it to value log, but others fail.
- // Nothing gets updated to LSM, until a restart happens.
- return txnCb()
- }
// txnCb bundles what runTxnCallback needs to finish a CommitWith call.
type txnCb struct {
	commit func() error // waits for the write and marks the commit done; may be nil.
	user   func(error)  // the callback supplied to CommitWith; must be non-nil.
	err    error        // error to hand straight to user, skipping commit.
}
- func runTxnCallback(cb *txnCb) {
- switch {
- case cb == nil:
- panic("txn callback is nil")
- case cb.user == nil:
- panic("Must have caught a nil callback for txn.CommitWith")
- case cb.err != nil:
- cb.user(cb.err)
- case cb.commit != nil:
- err := cb.commit()
- cb.user(err)
- default:
- cb.user(nil)
- }
- }
- // CommitWith acts like Commit, but takes a callback, which gets run via a
- // goroutine to avoid blocking this function. The callback is guaranteed to run,
- // so it is safe to increment sync.WaitGroup before calling CommitWith, and
- // decrementing it in the callback; to block until all callbacks are run.
- func (txn *Txn) CommitWith(cb func(error)) {
- if cb == nil {
- panic("Nil callback provided to CommitWith")
- }
- if len(txn.pendingWrites) == 0 {
- // Do not run these callbacks from here, because the CommitWith and the
- // callback might be acquiring the same locks. Instead run the callback
- // from another goroutine.
- go runTxnCallback(&txnCb{user: cb, err: nil})
- // Discard the transaction so that the read is marked done.
- txn.Discard()
- return
- }
- // Precheck before discarding txn.
- if err := txn.commitPrecheck(); err != nil {
- cb(err)
- return
- }
- defer txn.Discard()
- commitCb, err := txn.commitAndSend()
- if err != nil {
- go runTxnCallback(&txnCb{user: cb, err: err})
- return
- }
- go runTxnCallback(&txnCb{user: cb, commit: commitCb})
- }
- // ReadTs returns the read timestamp of the transaction.
- func (txn *Txn) ReadTs() uint64 {
- return txn.readTs
- }
- // NewTransaction creates a new transaction. Badger supports concurrent execution of transactions,
- // providing serializable snapshot isolation, avoiding write skews. Badger achieves this by tracking
- // the keys read and at Commit time, ensuring that these read keys weren't concurrently modified by
- // another transaction.
- //
- // For read-only transactions, set update to false. In this mode, we don't track the rows read for
- // any changes. Thus, any long running iterations done in this mode wouldn't pay this overhead.
- //
- // Running transactions concurrently is OK. However, a transaction itself isn't thread safe, and
- // should only be run serially. It doesn't matter if a transaction is created by one goroutine and
- // passed down to other, as long as the Txn APIs are called serially.
- //
- // When you create a new transaction, it is absolutely essential to call
- // Discard(). This should be done irrespective of what the update param is set
- // to. Commit API internally runs Discard, but running it twice wouldn't cause
- // any issues.
- //
- // txn := db.NewTransaction(false)
- // defer txn.Discard()
- // // Call various APIs.
- func (db *DB) NewTransaction(update bool) *Txn {
- return db.newTransaction(update, false)
- }
- func (db *DB) newTransaction(update, isManaged bool) *Txn {
- if db.opt.ReadOnly && update {
- // DB is read-only, force read-only transaction.
- update = false
- }
- txn := &Txn{
- update: update,
- db: db,
- count: 1, // One extra entry for BitFin.
- size: int64(len(txnKey) + 10), // Some buffer for the extra entry.
- }
- if update {
- if db.opt.DetectConflicts {
- txn.conflictKeys = make(map[uint64]struct{})
- }
- txn.pendingWrites = make(map[string]*Entry)
- }
- if !isManaged {
- txn.readTs = db.orc.readTs()
- }
- return txn
- }
- // View executes a function creating and managing a read-only transaction for the user. Error
- // returned by the function is relayed by the View method.
- // If View is used with managed transactions, it would assume a read timestamp of MaxUint64.
- func (db *DB) View(fn func(txn *Txn) error) error {
- if db.IsClosed() {
- return ErrDBClosed
- }
- var txn *Txn
- if db.opt.managedTxns {
- txn = db.NewTransactionAt(math.MaxUint64, false)
- } else {
- txn = db.NewTransaction(false)
- }
- defer txn.Discard()
- return fn(txn)
- }
- // Update executes a function, creating and managing a read-write transaction
- // for the user. Error returned by the function is relayed by the Update method.
- // Update cannot be used with managed transactions.
- func (db *DB) Update(fn func(txn *Txn) error) error {
- if db.IsClosed() {
- return ErrDBClosed
- }
- if db.opt.managedTxns {
- panic("Update can only be used with managedDB=false.")
- }
- txn := db.NewTransaction(true)
- defer txn.Discard()
- if err := fn(txn); err != nil {
- return err
- }
- return txn.Commit()
- }
|