transaction.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. //go:build wasm
  2. // +build wasm
  3. package indexeddb
  4. import (
  5. "context"
  6. "sort"
  7. "sync"
  8. "sync/atomic"
  9. "github.com/hack-pad/go-indexeddb/idb"
  10. "github.com/hack-pad/hackpadfs"
  11. "github.com/hack-pad/hackpadfs/keyvalue"
  12. "github.com/hack-pad/hackpadfs/keyvalue/blob"
  13. )
  14. type transaction struct {
  15. ctx context.Context
  16. abort context.CancelFunc
  17. store *store
  18. txn *idb.Transaction
  19. results map[keyvalue.OpID]keyvalue.OpResult
  20. pendingResults []func()
  21. nextOp keyvalue.OpID
  22. resultsMu sync.Mutex
  23. }
  24. func (t *transaction) newOp() keyvalue.OpID {
  25. nextOp := atomic.AddInt64((*int64)(&t.nextOp), 1)
  26. return keyvalue.OpID(nextOp - 1)
  27. }
  28. func (t *transaction) setResult(op keyvalue.OpID, result keyvalue.OpResult) {
  29. t.resultsMu.Lock()
  30. t.results[op] = result
  31. t.resultsMu.Unlock()
  32. }
  33. func (t *transaction) setPendingResult(op keyvalue.OpID, req *getFileRequest) {
  34. t.resultsMu.Lock()
  35. t.pendingResults = append(t.pendingResults, func() {
  36. result := keyvalue.OpResult{Op: op}
  37. if req != nil {
  38. result.Record, result.Err = req.Result()
  39. }
  40. t.setResult(op, result)
  41. })
  42. t.resultsMu.Unlock()
  43. }
  44. func (t *transaction) setPendingValidateErr(op keyvalue.OpID, parentExistsReq *parentDirExistsReq) {
  45. if parentExistsReq == nil {
  46. return
  47. }
  48. t.resultsMu.Lock()
  49. t.pendingResults = append(t.pendingResults, func() {
  50. err := parentExistsReq.Err()
  51. t.setResult(op, keyvalue.OpResult{Op: op, Err: err})
  52. })
  53. t.resultsMu.Unlock()
  54. }
  55. func (t *transaction) Get(path string) (op keyvalue.OpID) {
  56. op = t.newOp()
  57. infos, err := t.txn.ObjectStore(infoStore)
  58. if err != nil {
  59. t.setResult(op, keyvalue.OpResult{Op: op, Err: err})
  60. return
  61. }
  62. req, err := t.store.getFile(infos, path)
  63. if err != nil {
  64. t.setResult(op, keyvalue.OpResult{Op: op, Err: err})
  65. return
  66. }
  67. t.setPendingResult(op, req)
  68. return
  69. }
  70. func (t *transaction) GetHandler(path string, handler keyvalue.OpHandler) (op keyvalue.OpID) {
  71. op = t.newOp()
  72. infos, err := t.txn.ObjectStore(infoStore)
  73. if err != nil {
  74. t.setResult(op, keyvalue.OpResult{Op: op, Err: err})
  75. return
  76. }
  77. req, err := t.store.getFile(infos, path)
  78. if err != nil {
  79. t.setResult(op, keyvalue.OpResult{Op: op, Err: err})
  80. return
  81. }
  82. listenErr := req.Listen(t.ctx, func() {
  83. record, err := req.Result()
  84. result := keyvalue.OpResult{
  85. Op: op,
  86. Record: record,
  87. Err: err,
  88. }
  89. if err := handler.Handle(t, result); err != nil {
  90. result.Err = err
  91. }
  92. t.setResult(op, result)
  93. }, func() {
  94. err := req.Err()
  95. t.setResult(op, keyvalue.OpResult{Op: op, Err: err})
  96. })
  97. if listenErr != nil {
  98. t.setResult(op, keyvalue.OpResult{Op: op, Err: listenErr})
  99. return
  100. }
  101. return
  102. }
  103. func (t *transaction) Set(name string, record keyvalue.FileRecord, contents blob.Blob) (op keyvalue.OpID) {
  104. op = t.newOp()
  105. _, err := t.set(op, name, record, contents)
  106. if err != nil {
  107. t.setResult(op, keyvalue.OpResult{Op: op, Err: err})
  108. return
  109. }
  110. return
  111. }
  112. func (t *transaction) set(op keyvalue.OpID, name string, record keyvalue.FileRecord, data blob.Blob) (*idb.Request, error) {
  113. infos, err := t.txn.ObjectStore(infoStore)
  114. if err != nil {
  115. return nil, err
  116. }
  117. contents, err := t.txn.ObjectStore(contentsStore)
  118. if err != nil {
  119. return nil, err
  120. }
  121. t.setResult(op, keyvalue.OpResult{Op: op}) // Ensure an op is recorded. A later result can overwrite it.
  122. if record == nil {
  123. if name == rootPath {
  124. return nil, hackpadfs.ErrNotImplemented // cannot delete root dir
  125. }
  126. req, err := deleteRecord(infos, contents, name)
  127. if err != nil {
  128. return nil, err
  129. }
  130. return req.Request, nil
  131. }
  132. if data != nil {
  133. // set file contents
  134. err := setFileContents(contents, name, data)
  135. if err != nil {
  136. return nil, err
  137. }
  138. }
  139. // always set metadata to update size when contents change
  140. req, parentExistsReq, err := validateAndSetFileMeta(t.ctx, infos, name, record, data)
  141. if err != nil {
  142. return nil, err
  143. }
  144. t.setPendingValidateErr(op, parentExistsReq)
  145. return req, nil
  146. }
  147. func (t *transaction) SetHandler(name string, record keyvalue.FileRecord, data blob.Blob, handler keyvalue.OpHandler) (op keyvalue.OpID) {
  148. op = t.newOp()
  149. req, err := t.set(op, name, record, data)
  150. if err != nil {
  151. t.setResult(op, keyvalue.OpResult{Op: op, Err: err})
  152. return
  153. }
  154. listenErr := req.Listen(t.ctx, func() {
  155. result := keyvalue.OpResult{Op: op, Err: req.Err()}
  156. if err := handler.Handle(t, result); err != nil {
  157. result.Err = err
  158. }
  159. t.setResult(op, result)
  160. }, func() {
  161. t.setResult(op, keyvalue.OpResult{Op: op, Err: req.Err()})
  162. })
  163. if listenErr != nil {
  164. t.setResult(op, keyvalue.OpResult{Op: op, Err: listenErr})
  165. return
  166. }
  167. return
  168. }
  169. func (t *transaction) Commit(ctx context.Context) ([]keyvalue.OpResult, error) {
  170. awaitErr := t.txn.Await(ctx)
  171. t.abort()
  172. for _, fn := range t.pendingResults {
  173. fn()
  174. }
  175. t.resultsMu.Lock()
  176. results := make([]keyvalue.OpResult, 0, len(t.results))
  177. for _, result := range t.results {
  178. results = append(results, result)
  179. }
  180. t.resultsMu.Unlock()
  181. sort.Slice(results, func(a, b int) bool {
  182. return results[a].Op < results[b].Op
  183. })
  184. return results, awaitErr
  185. }
  186. func (t *transaction) Abort() error {
  187. return t.txn.Abort()
  188. }