diff --git a/lib/dat/reader.ts b/lib/dat/reader.ts index f91e889..c0c4706 100644 --- a/lib/dat/reader.ts +++ b/lib/dat/reader.ts @@ -34,12 +34,26 @@ export class DataReader { this.indexReader.open(this.indexPath); } - private readRecord(offset: bigint, length: number): Buffer { - if (this.fd === null) throw new Error('Data file not opened'); - + private readRecordAt(fd: number, offset: bigint, length: number): { data: T; length: number } | null { const buf = Buffer.alloc(length); - fs.readSync(this.fd, buf, 0, length, Number(offset)); - return buf; + fs.readSync(fd, buf, 0, length, Number(offset)); + const result = DataProtocol.deserializeRecord( + buf, + 0, + this.serializer + ); + return result; + } + + private readNextRecord(fd: number, length: number): { data: T; length: number } | null { + const buf = Buffer.alloc(length); + fs.readSync(fd, buf, 0, length, null); + const result = DataProtocol.deserializeRecord( + buf, + 0, + this.serializer + ); + return result; } getHeader(): DataHeader { @@ -53,15 +67,12 @@ export class DataReader { const found = this.indexReader.binarySearchBySequence(sequence); if (!found) return null; - const buf = this.readRecord(found.entry.offset, found.entry.length); - const result = DataProtocol.deserializeRecord( - buf, - 0, - this.serializer - ); + const result = this.readRecordAt(this.fd, found.entry.offset, found.entry.length); + if (!result) return null; return { + index: found.index, sequence: found.entry.sequence, timestamp: found.entry.timestamp, data: result.data, @@ -74,112 +85,93 @@ export class DataReader { const entry = this.indexReader.getEntry(index); if (!entry) return null; - const buf = this.readRecord(entry.offset, entry.length); - const result = DataProtocol.deserializeRecord( - buf, - 0, - this.serializer - ); + const result = this.readRecordAt(this.fd, entry.offset, entry.length); + if (!result) return null; return { + index: index, sequence: entry.sequence, timestamp: entry.timestamp, data: result.data, }; } - getBulkData(startSeq: number, endSeq: number): DataEntry[] { + getBulkDataBySequence(startSeq: number, endSeq: number): DataEntry[] { if (this.fd === null) throw new Error('Data file not opened'); const results: DataEntry[] = []; - const indexHeader = this.indexReader.getHeader(); - let startIdx = this.findStartIndex(startSeq, indexHeader.writtenCnt); + const indexResults = this.indexReader.findBySequenceRange(startSeq, endSeq); + if (indexResults.length === 0) return []; - for (let i = startIdx; i < indexHeader.writtenCnt; i++) { - const entry = this.indexReader.getEntry(i); - if (!entry) continue; + const firstRecord = this.readRecordAt(this.fd, indexResults[0].entry.offset, indexResults[0].entry.length); + if (firstRecord) { + results.push({ + index: indexResults[0].index, + sequence: indexResults[0].entry.sequence, + timestamp: indexResults[0].entry.timestamp, + data: firstRecord.data, + }) + } - if (entry.sequence > endSeq) break; + for (let i = 1; i < indexResults.length; i++) { + const { index, entry } = indexResults[i]; + const result = this.readNextRecord(this.fd, entry.length); + if (!result) continue; - if (entry.sequence >= startSeq) { - const buf = this.readRecord(entry.offset, entry.length); - const result = DataProtocol.deserializeRecord( - buf, - 0, - this.serializer - ); - if (result) { - results.push({ - sequence: entry.sequence, - timestamp: entry.timestamp, - data: result.data, - }); - } - } + results.push({ + index: index, + sequence: entry.sequence, + timestamp: entry.timestamp, + data: result.data, + }); } return results; } - private findStartIndex(targetSeq: number, writtenCnt: number): number { - let left = 0; - let right = writtenCnt - 1; - let result = 0; - - while (left <= right) { - const mid = Math.floor((left + right) / 2); - const entry = this.indexReader.getEntry(mid); - - if (!entry) { - right = mid - 1; - continue; - } - - if (entry.sequence >= targetSeq) { - result = mid; - right = mid - 1; - } else { - left = mid + 1; - } - } - - return result; - } - getBulkDataByTime(startTs: bigint, endTs: bigint): DataEntry[] { if (this.fd === null) throw new Error('Data file not opened'); - const indexResults = this.indexReader.findByTimeRange(startTs, endTs); const results: DataEntry[] = []; + const indexResults = this.indexReader.findByTimeRange(startTs, endTs); + if (indexResults.length === 0) return []; - for (const { entry } of indexResults) { - const buf = this.readRecord(entry.offset, entry.length); - const result = DataProtocol.deserializeRecord( - buf, - 0, - this.serializer - ); - if (result) { - results.push({ - sequence: entry.sequence, - timestamp: entry.timestamp, - data: result.data, - }); - } + const firstRecord = this.readRecordAt(this.fd, indexResults[0].entry.offset, indexResults[0].entry.length); + if (firstRecord) { + results.push({ + index: indexResults[0].index, + sequence: indexResults[0].entry.sequence, + timestamp: indexResults[0].entry.timestamp, + data: firstRecord.data, + }) + } + + for (let i = 1; i < indexResults.length; i++) { + const { index, entry } = indexResults[i]; + const result = this.readNextRecord(this.fd, entry.length); + if (!result) continue; + + results.push({ + index: index, + sequence: entry.sequence, + timestamp: entry.timestamp, + data: result.data, + }); } return results; } + /* getAllData(): DataEntry[] { if (this.fd === null) throw new Error('Data file not opened'); - + const entries = this.indexReader.getAllEntries(); const results: DataEntry[] = []; - + for (const entry of entries) { - const buf = this.readRecord(entry.offset, entry.length); + const buf = this.readRecordAt(this.fd, entry.offset, entry.length); const result = DataProtocol.deserializeRecord( buf, 0, @@ -193,9 +185,10 @@ export class DataReader { }); } } - + return results; } + */ getRecordCount(): number { return this.indexReader.getHeader().writtenCnt; @@ -209,6 +202,32 @@ export class DataReader { return this.indexReader.getFlags(); } + readNext(): DataEntry | null { + if (this.fd === null) throw new Error('Data file not opened'); + + const next = this.indexReader.getNextEntry(); + if (!next) return null; + const { index, entry } = next; + + const result = this.readNextRecord(this.fd, entry.length); + if (!result) return null; + + return { + index, + sequence: entry.sequence, + timestamp: entry.timestamp, + data: result.data, + }; + } + + getCurrentIndex(): number { + return this.indexReader.getCurrentIndex(); + } + + getCurrentSequence(): number { + return this.indexReader.getCurrentSequence(); + } + close(): void { if (this.fd !== null) { fs.closeSync(this.fd); diff --git a/lib/dat/types.ts b/lib/dat/types.ts index 7041b99..880bd53 100644 --- a/lib/dat/types.ts +++ b/lib/dat/types.ts @@ -7,6 +7,7 @@ export interface Serializer { } export interface DataEntry { + index: number; sequence: number; timestamp: bigint; data: T; diff --git a/lib/idx/reader.ts b/lib/idx/reader.ts index 653348c..b747699 100644 --- a/lib/idx/reader.ts +++ b/lib/idx/reader.ts @@ -1,22 +1,36 @@ // src/index-file/reader.ts import * as fs from 'node:fs'; +import { INDEX_HEADER_SIZE, INDEX_ENTRY_SIZE, FLAG_VALID } from './constants.js'; import { IndexProtocol } from './protocol.js'; import type { IndexHeader, IndexEntry } from './types.js'; export class IndexReader { - private buffer: Buffer | null = null; + private fd: number | null = null; private header: IndexHeader | null = null; private path: string | null = null; + // Sequential read state + private currentIndex: number = 0; + private currentSequence: number = 0; + + // Reusable buffer for reading entries + private entryBuffer: Buffer = Buffer.alloc(INDEX_ENTRY_SIZE); + constructor() { } open(idxFilePath: string): void { - // Read entire file into buffer (simpler than mmap for read-only access) this.path = idxFilePath; - this.buffer = fs.readFileSync(this.path); - this.header = IndexProtocol.readHeader(this.buffer); + this.fd = fs.openSync(this.path, 'r'); + + // Read header only (64 bytes) + const headerBuf = Buffer.alloc(INDEX_HEADER_SIZE); + fs.readSync(this.fd, headerBuf, 0, INDEX_HEADER_SIZE, 0); + this.header = IndexProtocol.readHeader(headerBuf); + // 아예 최초에는 -1 로 두어서 readNextEntry 할때 자동 ++ 할때 오류를 검증 + this.currentIndex = -1; + this.currentSequence = -1; } getHeader(): IndexHeader { @@ -30,90 +44,244 @@ export class IndexReader { } getEntry(index: number): IndexEntry | null { - if (!this.buffer || !this.header) throw new Error('Index file not opened'); - if (index < 0 || index >= this.header.entryCount) return null; - return IndexProtocol.readEntry(this.buffer, index); - } - - findBySequence(sequence: number): { index: number; entry: IndexEntry } | null { - if (!this.buffer || !this.header) throw new Error('Index file not opened'); - - for (let i = 0; i < this.header.writtenCnt; i++) { - const entry = IndexProtocol.readEntry(this.buffer, i); - if (entry && entry.sequence === sequence) { - return { index: i, entry }; - } - } - return null; + if (this.fd === null || !this.header) throw new Error('Index file not opened'); + if (index < 0 || index >= this.header.writtenCnt) return null; + return this.readEntryAt(this.fd, index); } findBySequenceRange(startSeq: number, endSeq: number): { index: number; entry: IndexEntry }[] { - if (!this.buffer || !this.header) throw new Error('Index file not opened'); + if (this.fd === null || !this.header) throw new Error('Index file not opened'); const results: { index: number; entry: IndexEntry }[] = []; - for (let i = 0; i < this.header.writtenCnt; i++) { - const entry = IndexProtocol.readEntry(this.buffer, i); - if (entry && entry.sequence >= startSeq && entry.sequence <= endSeq) { - results.push({ index: i, entry }); - } + const first = this.searchSequenceLowerBound(this.fd, this.header.writtenCnt, startSeq); + if (first == null) return []; + + results.push(first); + for (let i = this.currentIndex; i < this.header.writtenCnt; i++) { + const entry = this.readNextEntry(this.fd); + if (entry == null || entry.sequence > endSeq) break; + results.push({ index: this.currentIndex, entry }); } return results; } + /* getAllEntries(): IndexEntry[] { - if (!this.buffer || !this.header) throw new Error('Index file not opened'); + if (this.fd === null || !this.header) throw new Error('Index file not opened'); const entries: IndexEntry[] = []; for (let i = 0; i < this.header.writtenCnt; i++) { - const entry = IndexProtocol.readEntry(this.buffer, i); + const entry = this.readEntryAt(this.fd, i); if (entry) entries.push(entry); } return entries; } + */ findByTimeRange(startTs: bigint, endTs: bigint): { index: number; entry: IndexEntry }[] { - if (!this.buffer || !this.header) throw new Error('Index file not opened'); + if (this.fd === null || !this.header) throw new Error('Index file not opened'); const results: { index: number; entry: IndexEntry }[] = []; - for (let i = 0; i < this.header.writtenCnt; i++) { - const entry = IndexProtocol.readEntry(this.buffer, i); - if (entry && entry.timestamp >= startTs && entry.timestamp <= endTs) { - results.push({ index: i, entry }); - } + const first = this.searchTimestampLowerBound(this.fd, this.header.writtenCnt, startTs); + if (first === null) return []; + + results.push(first); + for (let i = this.currentIndex; i < this.header.writtenCnt; i++) { + const entry = this.readNextEntry(this.fd); + if (entry == null || entry.timestamp > endTs) break; + results.push({ index: this.currentIndex, entry }); } return results; } binarySearchBySequence(targetSeq: number): { index: number; entry: IndexEntry } | null { - if (!this.buffer || !this.header) throw new Error('Index file not opened'); + if (this.fd === null || !this.header) throw new Error('Index file not opened'); + return this.searchSequenceExact(this.fd, this.header.writtenCnt, targetSeq); + } + getNextEntry(): { index: number; entry: IndexEntry } | null { + if (this.fd === null || !this.header) throw new Error('Index file not opened'); + + if (this.currentIndex >= (this.header.writtenCnt - 1)) { + return null; + } + + const entry = this.readNextEntry(this.fd); + if (!entry) return null; + + return { index: this.currentIndex, entry }; + } + + getCurrentIndex(): number { + return this.currentIndex; + } + + getCurrentSequence(): number { + return this.currentSequence; + } + + close(): void { + if (this.fd !== null) { + fs.closeSync(this.fd); + this.fd = null; + } + this.header = null; + this.currentIndex = -1; + this.currentSequence = -1; + } + + // ###################################################################### + // Private methods + // ###################################################################### + private readEntryAt(fd: number, index: number): IndexEntry | null { + const offset = INDEX_HEADER_SIZE + index * INDEX_ENTRY_SIZE; + fs.readSync(fd, this.entryBuffer, 0, INDEX_ENTRY_SIZE, offset); + + const flags = this.entryBuffer.readUInt32LE(24); + const indexEntry = this.buildIndexEntry(flags); + + this.currentIndex = index; + this.currentSequence = indexEntry.sequence; + + if (!(flags & FLAG_VALID)) return null; + return indexEntry; + } + + private readNextEntry(fd: number): IndexEntry | null { + fs.readSync(fd, this.entryBuffer, 0, INDEX_ENTRY_SIZE, null); + + const flags = this.entryBuffer.readUInt32LE(24); + const indexEntry = this.buildIndexEntry(flags); + + ++this.currentIndex; + this.currentSequence = indexEntry.sequence; + + if (!(flags & FLAG_VALID)) return null; + return indexEntry; + } + + private buildIndexEntry(flags: number): IndexEntry { + return { + sequence: this.entryBuffer.readUInt32LE(0), + timestamp: this.entryBuffer.readBigUInt64LE(4), + offset: this.entryBuffer.readBigUInt64LE(12), + length: this.entryBuffer.readUInt32LE(20), + flags, + checksum: this.entryBuffer.readUInt32LE(28), + }; + } + + // Timestamp binary search methods + private searchTimestampExact(fd: number, writtenCnt: number, targetTs: bigint): { index: number; entry: IndexEntry } | null { let left = 0; - let right = this.header.writtenCnt - 1; + let right = writtenCnt - 1; while (left <= right) { const mid = Math.floor((left + right) / 2); - const entry = IndexProtocol.readEntry(this.buffer, mid); + const entry = this.readEntryAt(fd, mid); + if (!entry) { right = mid - 1; continue; } - if (!entry) { + if (entry.timestamp === targetTs) return { index: mid, entry }; + if (entry.timestamp < targetTs) left = mid + 1; + else right = mid - 1; + } + return null; + } + + private searchTimestampLowerBound(fd: number, writtenCnt: number, targetTs: bigint): { index: number; entry: IndexEntry } | null { + let left = 0; + let right = writtenCnt - 1; + let result: { index: number; entry: IndexEntry } | null = null; + + while (left <= right) { + const mid = Math.floor((left + right) / 2); + const entry = this.readEntryAt(fd, mid); + if (!entry) { right = mid - 1; continue; } + + if (entry.timestamp >= targetTs) { + result = { index: mid, entry }; right = mid - 1; - continue; + } else { + left = mid + 1; } + } + return result; + } - if (entry.sequence === targetSeq) { - return { index: mid, entry }; - } else if (entry.sequence < targetSeq) { + private searchTimestampUpperBound(fd: number, writtenCnt: number, targetTs: bigint): { index: number; entry: IndexEntry } | null { + let left = 0; + let right = writtenCnt - 1; + let result: { index: number; entry: IndexEntry } | null = null; + + while (left <= right) { + const mid = Math.floor((left + right) / 2); + const entry = this.readEntryAt(fd, mid); + if (!entry) { right = mid - 1; continue; } + + if (entry.timestamp <= targetTs) { + result = { index: mid, entry }; left = mid + 1; } else { right = mid - 1; } } + return result; + } + // Sequence binary search methods + private searchSequenceExact(fd: number, writtenCnt: number, targetSeq: number): { index: number; entry: IndexEntry } | null { + let left = 0; + let right = writtenCnt - 1; + + while (left <= right) { + const mid = Math.floor((left + right) / 2); + const entry = this.readEntryAt(fd, mid); + if (!entry) { right = mid - 1; continue; } + + if (entry.sequence === targetSeq) return { index: mid, entry }; + if (entry.sequence < targetSeq) left = mid + 1; + else right = mid - 1; + } return null; } - close(): void { - // Simply release buffer reference (GC will handle cleanup) - this.buffer = null; - this.header = null; + private searchSequenceLowerBound(fd: number, writtenCnt: number, targetSeq: number): { index: number; entry: IndexEntry } | null { + let left = 0; + let right = writtenCnt - 1; + let result: { index: number; entry: IndexEntry } | null = null; + + while (left <= right) { + const mid = Math.floor((left + right) / 2); + const entry = this.readEntryAt(fd, mid); + if (!entry) { right = mid - 1; continue; } + + if (entry.sequence >= targetSeq) { + result = { index: mid, entry }; + right = mid - 1; + } else { + left = mid + 1; + } + } + return result; + } + + private searchSequenceUpperBound(fd: number, writtenCnt: number, targetSeq: number): { index: number; entry: IndexEntry } | null { + let left = 0; + let right = writtenCnt - 1; + let result: { index: number; entry: IndexEntry } | null = null; + + while (left <= right) { + const mid = Math.floor((left + right) / 2); + const entry = this.readEntryAt(fd, mid); + if (!entry) { right = mid - 1; continue; } + + if (entry.sequence <= targetSeq) { + result = { index: mid, entry }; + left = mid + 1; + } else { + right = mid - 1; + } + } + return result; } }