memtable.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628
  1. /*
  2. * Copyright 2020 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package badger
  17. import (
  18. "bufio"
  19. "bytes"
  20. "crypto/aes"
  21. cryptorand "crypto/rand"
  22. "encoding/binary"
  23. "fmt"
  24. "hash/crc32"
  25. "io"
  26. "os"
  27. "path/filepath"
  28. "sort"
  29. "strconv"
  30. "strings"
  31. "sync"
  32. "sync/atomic"
  33. "github.com/pkg/errors"
  34. "github.com/dgraph-io/badger/v4/pb"
  35. "github.com/dgraph-io/badger/v4/skl"
  36. "github.com/dgraph-io/badger/v4/y"
  37. "github.com/dgraph-io/ristretto/v2/z"
  38. )
// memTable structure stores a skiplist and a corresponding WAL. Writes to memTable are written
// both to the WAL and the skiplist. On a crash, the WAL is replayed to bring the skiplist back to
// its pre-crash form.
type memTable struct {
	// TODO: Give skiplist z.Calloc'd []byte.
	// sl is the skiplist that holds the entries of this memtable.
	sl *skl.Skiplist
	// wal is the write-ahead log backing sl. It is left nil in in-memory mode.
	wal *logFile
	// maxVersion is the largest timestamp parsed from any key put or replayed into sl.
	maxVersion uint64
	// opt is the copy of the DB options this memtable was created with.
	opt Options
	// buf is a scratch buffer handed to the WAL when encoding an entry.
	buf *bytes.Buffer
}
  50. func (db *DB) openMemTables(opt Options) error {
  51. // We don't need to open any tables in in-memory mode.
  52. if db.opt.InMemory {
  53. return nil
  54. }
  55. files, err := os.ReadDir(db.opt.Dir)
  56. if err != nil {
  57. return errFile(err, db.opt.Dir, "Unable to open mem dir.")
  58. }
  59. var fids []int
  60. for _, file := range files {
  61. if !strings.HasSuffix(file.Name(), memFileExt) {
  62. continue
  63. }
  64. fsz := len(file.Name())
  65. fid, err := strconv.ParseInt(file.Name()[:fsz-len(memFileExt)], 10, 64)
  66. if err != nil {
  67. return errFile(err, file.Name(), "Unable to parse log id.")
  68. }
  69. fids = append(fids, int(fid))
  70. }
  71. // Sort in ascending order.
  72. sort.Slice(fids, func(i, j int) bool {
  73. return fids[i] < fids[j]
  74. })
  75. for _, fid := range fids {
  76. flags := os.O_RDWR
  77. if db.opt.ReadOnly {
  78. flags = os.O_RDONLY
  79. }
  80. mt, err := db.openMemTable(fid, flags)
  81. if err != nil {
  82. return y.Wrapf(err, "while opening fid: %d", fid)
  83. }
  84. // If this memtable is empty we don't need to add it. This is a
  85. // memtable that was completely truncated.
  86. if mt.sl.Empty() {
  87. mt.DecrRef()
  88. continue
  89. }
  90. // These should no longer be written to. So, make them part of the imm.
  91. db.imm = append(db.imm, mt)
  92. }
  93. if len(fids) != 0 {
  94. db.nextMemFid = fids[len(fids)-1]
  95. }
  96. db.nextMemFid++
  97. return nil
  98. }
  99. const memFileExt string = ".mem"
  100. func (db *DB) openMemTable(fid, flags int) (*memTable, error) {
  101. filepath := db.mtFilePath(fid)
  102. s := skl.NewSkiplist(arenaSize(db.opt))
  103. mt := &memTable{
  104. sl: s,
  105. opt: db.opt,
  106. buf: &bytes.Buffer{},
  107. }
  108. // We don't need to create the wal for the skiplist in in-memory mode so return the mt.
  109. if db.opt.InMemory {
  110. return mt, z.NewFile
  111. }
  112. mt.wal = &logFile{
  113. fid: uint32(fid),
  114. path: filepath,
  115. registry: db.registry,
  116. writeAt: vlogHeaderSize,
  117. opt: db.opt,
  118. }
  119. lerr := mt.wal.open(filepath, flags, 2*db.opt.MemTableSize)
  120. if lerr != z.NewFile && lerr != nil {
  121. return nil, y.Wrapf(lerr, "While opening memtable: %s", filepath)
  122. }
  123. // Have a callback set to delete WAL when skiplist reference count goes down to zero. That is,
  124. // when it gets flushed to L0.
  125. s.OnClose = func() {
  126. if err := mt.wal.Delete(); err != nil {
  127. db.opt.Errorf("while deleting file: %s, err: %v", filepath, err)
  128. }
  129. }
  130. if lerr == z.NewFile {
  131. return mt, lerr
  132. }
  133. err := mt.UpdateSkipList()
  134. return mt, y.Wrapf(err, "while updating skiplist")
  135. }
  136. func (db *DB) newMemTable() (*memTable, error) {
  137. mt, err := db.openMemTable(db.nextMemFid, os.O_CREATE|os.O_RDWR)
  138. if err == z.NewFile {
  139. db.nextMemFid++
  140. return mt, nil
  141. }
  142. if err != nil {
  143. db.opt.Errorf("Got error: %v for id: %d\n", err, db.nextMemFid)
  144. return nil, y.Wrapf(err, "newMemTable")
  145. }
  146. return nil, errors.Errorf("File %s already exists", mt.wal.Fd.Name())
  147. }
  148. func (db *DB) mtFilePath(fid int) string {
  149. return filepath.Join(db.opt.Dir, fmt.Sprintf("%05d%s", fid, memFileExt))
  150. }
  151. func (mt *memTable) SyncWAL() error {
  152. return mt.wal.Sync()
  153. }
  154. func (mt *memTable) isFull() bool {
  155. if mt.sl.MemSize() >= mt.opt.MemTableSize {
  156. return true
  157. }
  158. if mt.opt.InMemory {
  159. // InMemory mode doesn't have any WAL.
  160. return false
  161. }
  162. return int64(mt.wal.writeAt) >= mt.opt.MemTableSize
  163. }
  164. func (mt *memTable) Put(key []byte, value y.ValueStruct) error {
  165. entry := &Entry{
  166. Key: key,
  167. Value: value.Value,
  168. UserMeta: value.UserMeta,
  169. meta: value.Meta,
  170. ExpiresAt: value.ExpiresAt,
  171. }
  172. // wal is nil only when badger in running in in-memory mode and we don't need the wal.
  173. if mt.wal != nil {
  174. // If WAL exceeds opt.ValueLogFileSize, we'll force flush the memTable. See logic in
  175. // ensureRoomForWrite.
  176. if err := mt.wal.writeEntry(mt.buf, entry, mt.opt); err != nil {
  177. return y.Wrapf(err, "cannot write entry to WAL file")
  178. }
  179. }
  180. // We insert the finish marker in the WAL but not in the memtable.
  181. if entry.meta&bitFinTxn > 0 {
  182. return nil
  183. }
  184. // Write to skiplist and update maxVersion encountered.
  185. mt.sl.Put(key, value)
  186. if ts := y.ParseTs(entry.Key); ts > mt.maxVersion {
  187. mt.maxVersion = ts
  188. }
  189. y.NumBytesWrittenToL0Add(mt.opt.MetricsEnabled, entry.estimateSizeAndSetThreshold(mt.opt.ValueThreshold))
  190. return nil
  191. }
  192. func (mt *memTable) UpdateSkipList() error {
  193. if mt.wal == nil || mt.sl == nil {
  194. return nil
  195. }
  196. endOff, err := mt.wal.iterate(true, 0, mt.replayFunction(mt.opt))
  197. if err != nil {
  198. return y.Wrapf(err, "while iterating wal: %s", mt.wal.Fd.Name())
  199. }
  200. if endOff < mt.wal.size.Load() && mt.opt.ReadOnly {
  201. return y.Wrapf(ErrTruncateNeeded, "end offset: %d < size: %d", endOff, mt.wal.size.Load())
  202. }
  203. return mt.wal.Truncate(int64(endOff))
  204. }
  205. // IncrRef increases the refcount
  206. func (mt *memTable) IncrRef() {
  207. mt.sl.IncrRef()
  208. }
  209. // DecrRef decrements the refcount, deallocating the Skiplist when done using it
  210. func (mt *memTable) DecrRef() {
  211. mt.sl.DecrRef()
  212. }
  213. func (mt *memTable) replayFunction(opt Options) func(Entry, valuePointer) error {
  214. first := true
  215. return func(e Entry, _ valuePointer) error { // Function for replaying.
  216. if first {
  217. opt.Debugf("First key=%q\n", e.Key)
  218. }
  219. first = false
  220. if ts := y.ParseTs(e.Key); ts > mt.maxVersion {
  221. mt.maxVersion = ts
  222. }
  223. v := y.ValueStruct{
  224. Value: e.Value,
  225. Meta: e.meta,
  226. UserMeta: e.UserMeta,
  227. ExpiresAt: e.ExpiresAt,
  228. }
  229. // This is already encoded correctly. Value would be either a vptr, or a full value
  230. // depending upon how big the original value was. Skiplist makes a copy of the key and
  231. // value.
  232. mt.sl.Put(e.Key, v)
  233. return nil
  234. }
  235. }
// logFile is a memory-mapped log file (used here as the memtable WAL) together with the
// bookkeeping needed to append entries to it and, optionally, encrypt them.
type logFile struct {
	// MmapFile provides the file descriptor (Fd) and the mapped bytes (Data).
	*z.MmapFile
	// path is the file-system path of the log file.
	path string
	// This is a lock on the log file. It guards the fd's value, the file's
	// existence and the file's memory map.
	//
	// Use shared ownership when reading/writing the file or memory map, use
	// exclusive ownership to open/close the descriptor, unmap or remove the file.
	lock sync.RWMutex
	// fid is the numeric id of this log file.
	fid uint32
	// size is set to the mapped length by open and updated by Truncate.
	size atomic.Uint32
	// dataKey is the encryption key for this file; nil means encryption is disabled.
	dataKey *pb.DataKey
	// baseIV is the 12-byte IV prefix stored in the file header; generateIV appends the
	// entry offset to it.
	baseIV []byte
	// registry is used to look up data keys when opening or bootstrapping the file.
	registry *KeyRegistry
	// writeAt is the offset in Data at which the next entry will be written.
	writeAt uint32
	// opt is the copy of the DB options this log file was created with.
	opt Options
}
  253. func (lf *logFile) Truncate(end int64) error {
  254. if fi, err := lf.Fd.Stat(); err != nil {
  255. return fmt.Errorf("while file.stat on file: %s, error: %v\n", lf.Fd.Name(), err)
  256. } else if fi.Size() == end {
  257. return nil
  258. }
  259. y.AssertTrue(!lf.opt.ReadOnly)
  260. lf.size.Store(uint32(end))
  261. return lf.MmapFile.Truncate(end)
  262. }
  263. // encodeEntry will encode entry to the buf
  264. // layout of entry
  265. // +--------+-----+-------+-------+
  266. // | header | key | value | crc32 |
  267. // +--------+-----+-------+-------+
  268. func (lf *logFile) encodeEntry(buf *bytes.Buffer, e *Entry, offset uint32) (int, error) {
  269. h := header{
  270. klen: uint32(len(e.Key)),
  271. vlen: uint32(len(e.Value)),
  272. expiresAt: e.ExpiresAt,
  273. meta: e.meta,
  274. userMeta: e.UserMeta,
  275. }
  276. hash := crc32.New(y.CastagnoliCrcTable)
  277. writer := io.MultiWriter(buf, hash)
  278. // encode header.
  279. var headerEnc [maxHeaderSize]byte
  280. sz := h.Encode(headerEnc[:])
  281. y.Check2(writer.Write(headerEnc[:sz]))
  282. // we'll encrypt only key and value.
  283. if lf.encryptionEnabled() {
  284. // TODO: no need to allocate the bytes. we can calculate the encrypted buf one by one
  285. // since we're using ctr mode of AES encryption. Ordering won't changed. Need some
  286. // refactoring in XORBlock which will work like stream cipher.
  287. eBuf := make([]byte, 0, len(e.Key)+len(e.Value))
  288. eBuf = append(eBuf, e.Key...)
  289. eBuf = append(eBuf, e.Value...)
  290. if err := y.XORBlockStream(
  291. writer, eBuf, lf.dataKey.Data, lf.generateIV(offset)); err != nil {
  292. return 0, y.Wrapf(err, "Error while encoding entry for vlog.")
  293. }
  294. } else {
  295. // Encryption is disabled so writing directly to the buffer.
  296. y.Check2(writer.Write(e.Key))
  297. y.Check2(writer.Write(e.Value))
  298. }
  299. // write crc32 hash.
  300. var crcBuf [crc32.Size]byte
  301. binary.BigEndian.PutUint32(crcBuf[:], hash.Sum32())
  302. y.Check2(buf.Write(crcBuf[:]))
  303. // return encoded length.
  304. return len(headerEnc[:sz]) + len(e.Key) + len(e.Value) + len(crcBuf), nil
  305. }
  306. func (lf *logFile) writeEntry(buf *bytes.Buffer, e *Entry, opt Options) error {
  307. buf.Reset()
  308. plen, err := lf.encodeEntry(buf, e, lf.writeAt)
  309. if err != nil {
  310. return err
  311. }
  312. y.AssertTrue(plen == copy(lf.Data[lf.writeAt:], buf.Bytes()))
  313. lf.writeAt += uint32(plen)
  314. lf.zeroNextEntry()
  315. return nil
  316. }
  317. func (lf *logFile) decodeEntry(buf []byte, offset uint32) (*Entry, error) {
  318. var h header
  319. hlen := h.Decode(buf)
  320. kv := buf[hlen:]
  321. if lf.encryptionEnabled() {
  322. var err error
  323. // No need to worry about mmap. because, XORBlock allocates a byte array to do the
  324. // xor. So, the given slice is not being mutated.
  325. if kv, err = lf.decryptKV(kv, offset); err != nil {
  326. return nil, err
  327. }
  328. }
  329. e := &Entry{
  330. meta: h.meta,
  331. UserMeta: h.userMeta,
  332. ExpiresAt: h.expiresAt,
  333. offset: offset,
  334. Key: kv[:h.klen],
  335. Value: kv[h.klen : h.klen+h.vlen],
  336. }
  337. return e, nil
  338. }
  339. func (lf *logFile) decryptKV(buf []byte, offset uint32) ([]byte, error) {
  340. return y.XORBlockAllocate(buf, lf.dataKey.Data, lf.generateIV(offset))
  341. }
  342. // KeyID returns datakey's ID.
  343. func (lf *logFile) keyID() uint64 {
  344. if lf.dataKey == nil {
  345. // If there is no datakey, then we'll return 0. Which means no encryption.
  346. return 0
  347. }
  348. return lf.dataKey.KeyId
  349. }
  350. func (lf *logFile) encryptionEnabled() bool {
  351. return lf.dataKey != nil
  352. }
  353. // Acquire lock on mmap/file if you are calling this
  354. func (lf *logFile) read(p valuePointer) (buf []byte, err error) {
  355. offset := p.Offset
  356. // Do not convert size to uint32, because the lf.Data can be of size
  357. // 4GB, which overflows the uint32 during conversion to make the size 0,
  358. // causing the read to fail with ErrEOF. See issue #585.
  359. size := int64(len(lf.Data))
  360. valsz := p.Len
  361. lfsz := lf.size.Load()
  362. if int64(offset) >= size || int64(offset+valsz) > size ||
  363. // Ensure that the read is within the file's actual size. It might be possible that
  364. // the offset+valsz length is beyond the file's actual size. This could happen when
  365. // dropAll and iterations are running simultaneously.
  366. int64(offset+valsz) > int64(lfsz) {
  367. err = y.ErrEOF
  368. } else {
  369. buf = lf.Data[offset : offset+valsz]
  370. }
  371. return buf, err
  372. }
  373. // generateIV will generate IV by appending given offset with the base IV.
  374. func (lf *logFile) generateIV(offset uint32) []byte {
  375. iv := make([]byte, aes.BlockSize)
  376. // baseIV is of 12 bytes.
  377. y.AssertTrue(12 == copy(iv[:12], lf.baseIV))
  378. // remaining 4 bytes is obtained from offset.
  379. binary.BigEndian.PutUint32(iv[12:], offset)
  380. return iv
  381. }
  382. func (lf *logFile) doneWriting(offset uint32) error {
  383. if lf.opt.SyncWrites {
  384. if err := lf.Sync(); err != nil {
  385. return y.Wrapf(err, "Unable to sync value log: %q", lf.path)
  386. }
  387. }
  388. // Before we were acquiring a lock here on lf.lock, because we were invalidating the file
  389. // descriptor due to reopening it as read-only. Now, we don't invalidate the fd, but unmap it,
  390. // truncate it and remap it. That creates a window where we have segfaults because the mmap is
  391. // no longer valid, while someone might be reading it. Therefore, we need a lock here again.
  392. lf.lock.Lock()
  393. defer lf.lock.Unlock()
  394. if err := lf.Truncate(int64(offset)); err != nil {
  395. return y.Wrapf(err, "Unable to truncate file: %q", lf.path)
  396. }
  397. // Previously we used to close the file after it was written and reopen it in read-only mode.
  398. // We no longer open files in read-only mode. We keep all vlog files open in read-write mode.
  399. return nil
  400. }
// iterate reads entries from the log file starting at offset and passes them to fn. Per the
// original author's note, it does not allocate new memory for every kv pair, so the kv pair is
// only valid for the duration of the fn call.
//
// An offset of 0 is replaced by vlogHeaderSize so that reading starts after the file header.
// Entries flagged bitTxn are buffered until a bitFinTxn entry whose value parses to the same
// timestamp arrives; only then are they handed to fn. Entries with neither flag are handed to fn
// immediately. Reading stops at end of file, at a truncated or all-zero entry, or at an entry
// that does not fit the expected transaction sequence.
//
// It returns the offset just past the last entry (or completed transaction) that was accepted,
// or 0 and an error if reading or fn failed. The readOnly parameter is not referenced in this
// function body.
func (lf *logFile) iterate(readOnly bool, offset uint32, fn logEntry) (uint32, error) {
	if offset == 0 {
		// If offset is set to zero, let's advance past the encryption key header.
		offset = vlogHeaderSize
	}

	// For now, read directly from file, because it allows
	// NOTE(review): the sentence above is cut off in the original source; its intended ending
	// is not recoverable from this file.
	reader := bufio.NewReader(lf.NewReader(int(offset)))
	read := &safeRead{
		k:            make([]byte, 10),
		v:            make([]byte, 10),
		recordOffset: offset,
		lf:           lf,
	}

	// lastCommit is the timestamp of the transaction currently being collected; 0 means none.
	var lastCommit uint64
	// validEndOffset only advances once an entry or a whole transaction has been accepted.
	var validEndOffset uint32 = offset

	// entries and vptrs hold the entries of the in-flight transaction, index-aligned.
	var entries []*Entry
	var vptrs []valuePointer

loop:
	for {
		e, err := read.Entry(reader)
		switch {
		case err == io.EOF:
			break loop
		case err == io.ErrUnexpectedEOF || err == errTruncate:
			break loop
		case err != nil:
			return 0, err
		case e == nil:
			continue
		// We have not reached the end of the file but the entry we read is
		// zero. This happens because we have truncated the file and
		// zero'ed it out.
		case e.isZero():
			break loop
		}

		// Build the value pointer for this entry and advance the reader's record offset past it.
		var vp valuePointer
		vp.Len = uint32(e.hlen + len(e.Key) + len(e.Value) + crc32.Size)
		read.recordOffset += vp.Len
		vp.Offset = e.offset
		vp.Fid = lf.fid

		switch {
		case e.meta&bitTxn > 0:
			// Part of a transaction: buffer it until the finish marker is seen.
			txnTs := y.ParseTs(e.Key)
			if lastCommit == 0 {
				lastCommit = txnTs
			}
			// An entry from a different transaction before the finish marker ends iteration.
			if lastCommit != txnTs {
				break loop
			}
			entries = append(entries, e)
			vptrs = append(vptrs, vp)

		case e.meta&bitFinTxn > 0:
			// The finish marker carries the transaction timestamp as a decimal string.
			txnTs, err := strconv.ParseUint(string(e.Value), 10, 64)
			if err != nil || lastCommit != txnTs {
				break loop
			}
			// Got the end of txn. Now we can store them.
			lastCommit = 0
			validEndOffset = read.recordOffset

			for i, e := range entries {
				vp := vptrs[i]
				if err := fn(*e, vp); err != nil {
					// NOTE(review): this break leaves only the inner range loop; the outer
					// read loop keeps going after errStop. Confirm that is intended.
					if err == errStop {
						break
					}
					return 0, errFile(err, lf.path, "Iteration function")
				}
			}
			// Reuse the backing arrays for the next transaction.
			entries = entries[:0]
			vptrs = vptrs[:0]

		default:
			if lastCommit != 0 {
				// This is most likely an entry which was moved as part of GC.
				// We shouldn't get this entry in the middle of a transaction.
				break loop
			}
			validEndOffset = read.recordOffset

			if err := fn(*e, vp); err != nil {
				// NOTE(review): this break exits only the enclosing switch, so iteration
				// continues with the next entry after errStop. Confirm that is intended.
				if err == errStop {
					break
				}
				return 0, errFile(err, lf.path, "Iteration function")
			}
		}
	}
	return validEndOffset, nil
}
  490. // Zero out the next entry to deal with any crashes.
  491. func (lf *logFile) zeroNextEntry() {
  492. z.ZeroOut(lf.Data, int(lf.writeAt), int(lf.writeAt+maxHeaderSize))
  493. }
  494. func (lf *logFile) open(path string, flags int, fsize int64) error {
  495. mf, ferr := z.OpenMmapFile(path, flags, int(fsize))
  496. lf.MmapFile = mf
  497. if ferr == z.NewFile {
  498. if err := lf.bootstrap(); err != nil {
  499. os.Remove(path)
  500. return err
  501. }
  502. lf.size.Store(vlogHeaderSize)
  503. } else if ferr != nil {
  504. return y.Wrapf(ferr, "while opening file: %s", path)
  505. }
  506. lf.size.Store(uint32(len(lf.Data)))
  507. if lf.size.Load() < vlogHeaderSize {
  508. // Every vlog file should have at least vlogHeaderSize. If it is less than vlogHeaderSize
  509. // then it must have been corrupted. But no need to handle here. log replayer will truncate
  510. // and bootstrap the logfile. So ignoring here.
  511. return nil
  512. }
  513. // Copy over the encryption registry data.
  514. buf := make([]byte, vlogHeaderSize)
  515. y.AssertTruef(vlogHeaderSize == copy(buf, lf.Data),
  516. "Unable to copy from %s, size %d", path, lf.size.Load())
  517. keyID := binary.BigEndian.Uint64(buf[:8])
  518. // retrieve datakey.
  519. if dk, err := lf.registry.DataKey(keyID); err != nil {
  520. return y.Wrapf(err, "While opening vlog file %d", lf.fid)
  521. } else {
  522. lf.dataKey = dk
  523. }
  524. lf.baseIV = buf[8:]
  525. y.AssertTrue(len(lf.baseIV) == 12)
  526. // Preserved ferr so we can return if this was a new file.
  527. return ferr
  528. }
  529. // bootstrap will initialize the log file with key id and baseIV.
  530. // The below figure shows the layout of log file.
  531. // +----------------+------------------+------------------+
  532. // | keyID(8 bytes) | baseIV(12 bytes)| entry... |
  533. // +----------------+------------------+------------------+
  534. func (lf *logFile) bootstrap() error {
  535. var err error
  536. // generate data key for the log file.
  537. var dk *pb.DataKey
  538. if dk, err = lf.registry.LatestDataKey(); err != nil {
  539. return y.Wrapf(err, "Error while retrieving datakey in logFile.bootstarp")
  540. }
  541. lf.dataKey = dk
  542. // We'll always preserve vlogHeaderSize for key id and baseIV.
  543. buf := make([]byte, vlogHeaderSize)
  544. // write key id to the buf.
  545. // key id will be zero if the logfile is in plain text.
  546. binary.BigEndian.PutUint64(buf[:8], lf.keyID())
  547. // generate base IV. It'll be used with offset of the vptr to encrypt the entry.
  548. if _, err := cryptorand.Read(buf[8:]); err != nil {
  549. return y.Wrapf(err, "Error while creating base IV, while creating logfile")
  550. }
  551. // Initialize base IV.
  552. lf.baseIV = buf[8:]
  553. y.AssertTrue(len(lf.baseIV) == 12)
  554. // Copy over to the logFile.
  555. y.AssertTrue(vlogHeaderSize == copy(lf.Data[0:], buf))
  556. // Zero out the next entry.
  557. lf.zeroNextEntry()
  558. return nil
  559. }