txn_store.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. package keyvalue
  2. import (
  3. "context"
  4. "sync"
  5. "sync/atomic"
  6. "github.com/hack-pad/hackpadfs/keyvalue/blob"
  7. )
// TransactionStore is a Store that can also create a Transaction.
// Stores are not required to implement it; TransactionOrSerial falls back to
// a non-transactional implementation for Stores that do not.
type TransactionStore interface {
	Store
	// Transaction starts a new Transaction configured by 'options'.
	Transaction(options TransactionOptions) (Transaction, error)
}
// TransactionMode selects the access mode of a transaction, i.e. read-only or read-write.
// Its zero value is TransactionReadOnly.
type TransactionMode int
// Transaction modes
const (
	// TransactionReadOnly is the read-only mode. It is the zero value of TransactionMode.
	// NOTE(review): how (or whether) read-only is enforced is up to each TransactionStore — confirm per store.
	TransactionReadOnly TransactionMode = iota
	// TransactionReadWrite is the read-write mode.
	TransactionReadWrite
)
// TransactionOptions contains the options used to construct a Transaction from a Store.
type TransactionOptions struct {
	// Mode is the requested transaction mode. The zero value is TransactionReadOnly.
	Mode TransactionMode
}
// OpID is a unique ID within the transaction that generated it.
// It's used to correlate which Get/Set operation produced which result:
// each Get/Set call returns an OpID, and each OpResult carries the OpID of its operation.
type OpID int64
// OpResult is returned from Transaction.Commit(), representing an operation's result
// with any data or error it produced. It is also the value passed to an OpHandler.
type OpResult struct {
	// Record is the record produced by the operation, if any.
	Record FileRecord
	// Err is the error produced by the operation, or nil.
	Err error
	// Op identifies the operation this result belongs to.
	Op OpID
}
// OpHandler processes 'result' during the commit process of 'txn'.
// If the transaction should not proceed, the handler should call txn.Abort().
type OpHandler interface {
	// Handle receives the result of a single operation together with the transaction that ran it.
	// NOTE(review): what a returned error means is implementation-defined — confirm against each Transaction.
	Handle(txn Transaction, result OpResult) error
}
// OpHandlerFunc is a convenient func wrapper for implementing OpHandler:
// any function with this signature can be converted to OpHandlerFunc and used as an OpHandler.
type OpHandlerFunc func(txn Transaction, result OpResult) error
  39. // Handle implements OpHandler
  40. func (o OpHandlerFunc) Handle(txn Transaction, result OpResult) error {
  41. return o(txn, result)
  42. }
// Transaction behaves like a Store but only returns results after running Commit().
// GetHandler and SetHandler can be used to interrupt transaction processing and handle the response,
// permitting an opportunity to Abort() or perform more operations.
type Transaction interface {
	// Get requests the record at 'path' and returns the OpID identifying that operation's result.
	Get(path string) OpID
	// GetHandler is like Get, and additionally passes the operation's result to 'handler'.
	GetHandler(path string, handler OpHandler) OpID
	// Set requests that 'src' be stored at 'path' and returns the OpID identifying that operation's result.
	// NOTE(review): how 'contents' relates to 'src' is not visible from this file — confirm with implementations.
	Set(path string, src FileRecord, contents blob.Blob) OpID
	// SetHandler is like Set, and additionally passes the operation's result to 'handler'.
	SetHandler(path string, src FileRecord, contents blob.Blob, handler OpHandler) OpID
	// Commit finishes the transaction and returns the results of its operations.
	Commit(ctx context.Context) ([]OpResult, error)
	// Abort stops the transaction from proceeding.
	Abort() error
}
// unsafeSerialTransaction is the fallback Transaction used by TransactionOrSerial for
// Stores without transaction support. Each operation is executed against 'store' right
// away, and only its result is kept until Commit.
type unsafeSerialTransaction struct {
	// ctx is passed to every store call; once it is done, new operations and Commit fail with its error.
	ctx context.Context
	// abort cancels ctx. It is called by Abort and by a successful Commit.
	abort context.CancelFunc
	// store is the underlying Store operations are run against.
	store Store
	// results holds the recorded result of each operation, keyed by its OpID. Guarded by resultsMu.
	results   map[OpID]OpResult
	resultsMu sync.Mutex
	// nextOp is the next OpID to hand out. It is only accessed through sync/atomic.
	nextOp OpID
}
  62. // TransactionOrSerial attempts to produce a Transaction from 'store'.
  63. // If unsupported, returns an unsafe transaction instead, which runs each action serially without transactional safety.
  64. //
  65. // This is used in FS to attempt transactions whenever possible.
  66. // Since some Stores don't need transactions, they aren't required to implement TransactionStore.
  67. func TransactionOrSerial(store Store, options TransactionOptions) (Transaction, error) {
  68. if store, ok := store.(TransactionStore); ok {
  69. return store.Transaction(options)
  70. }
  71. ctx, cancel := context.WithCancel(context.Background())
  72. return &unsafeSerialTransaction{
  73. ctx: ctx,
  74. abort: cancel,
  75. store: store,
  76. results: make(map[OpID]OpResult),
  77. }, nil
  78. }
  79. func (u *unsafeSerialTransaction) newOp() OpID {
  80. nextOp := atomic.AddInt64((*int64)(&u.nextOp), 1)
  81. return OpID(nextOp - 1)
  82. }
  83. func (u *unsafeSerialTransaction) setResult(op OpID, result OpResult) {
  84. u.resultsMu.Lock()
  85. u.results[op] = result
  86. u.resultsMu.Unlock()
  87. }
  88. func abortErr(ctx, extraCtx context.Context) error {
  89. if extraCtx == nil {
  90. extraCtx = context.Background()
  91. }
  92. select {
  93. case <-extraCtx.Done():
  94. return extraCtx.Err()
  95. case <-ctx.Done():
  96. return ctx.Err()
  97. default:
  98. return nil
  99. }
  100. }
  101. func (u *unsafeSerialTransaction) Get(path string) OpID {
  102. return u.GetHandler(path, OpHandlerFunc(func(_ Transaction, _ OpResult) error {
  103. return nil
  104. }))
  105. }
  106. func (u *unsafeSerialTransaction) GetHandler(path string, handler OpHandler) OpID {
  107. op := u.newOp()
  108. if err := abortErr(u.ctx, nil); err != nil {
  109. u.setResult(op, OpResult{Op: op, Err: err})
  110. return op
  111. }
  112. record, err := u.store.Get(u.ctx, path)
  113. result := OpResult{Op: op, Record: record, Err: err}
  114. err = handler.Handle(u, result)
  115. if result.Err == nil && err != nil {
  116. result.Err = err
  117. }
  118. u.setResult(op, result)
  119. return op
  120. }
  121. func (u *unsafeSerialTransaction) Set(path string, src FileRecord, contents blob.Blob) OpID {
  122. return u.SetHandler(path, src, contents, OpHandlerFunc(func(_ Transaction, _ OpResult) error {
  123. return nil
  124. }))
  125. }
  126. func (u *unsafeSerialTransaction) SetHandler(path string, src FileRecord, _ blob.Blob, handler OpHandler) OpID {
  127. op := u.newOp()
  128. if err := abortErr(u.ctx, nil); err != nil {
  129. u.setResult(op, OpResult{Op: op, Err: err})
  130. return op
  131. }
  132. err := u.store.Set(u.ctx, path, src)
  133. result := OpResult{Op: op, Err: err}
  134. err = handler.Handle(u, result)
  135. if result.Err == nil && err != nil {
  136. result.Err = err
  137. }
  138. u.setResult(op, result)
  139. return op
  140. }
  141. func (u *unsafeSerialTransaction) Commit(ctx context.Context) ([]OpResult, error) {
  142. if err := abortErr(u.ctx, ctx); err != nil {
  143. return nil, err
  144. }
  145. u.abort()
  146. opCount := atomic.LoadInt64((*int64)(&u.nextOp))
  147. results := make([]OpResult, opCount)
  148. u.resultsMu.Lock()
  149. for op, result := range u.results {
  150. results[op] = result
  151. }
  152. u.resultsMu.Unlock()
  153. return results, nil
  154. }
  155. func (u *unsafeSerialTransaction) Abort() error {
  156. u.abort()
  157. return nil
  158. }