optimze for reader

This commit is contained in:
Eli-Class
2026-02-04 07:09:00 +00:00
parent 81c5dba641
commit 339e2d37e3
3 changed files with 316 additions and 128 deletions

View File

@@ -34,12 +34,26 @@ export class DataReader<T> {
this.indexReader.open(this.indexPath);
}
private readRecord(offset: bigint, length: number): Buffer {
if (this.fd === null) throw new Error('Data file not opened');
private readRecordAt(fd: number, offset: bigint, length: number): { data: T; length: number } | null {
const buf = Buffer.alloc(length);
fs.readSync(this.fd, buf, 0, length, Number(offset));
return buf;
fs.readSync(fd, buf, 0, length, Number(offset));
const result = DataProtocol.deserializeRecord(
buf,
0,
this.serializer
);
return result;
}
private readNextRecord(fd: number, length: number): { data: T; length: number } | null {
const buf = Buffer.alloc(length);
fs.readSync(fd, buf, 0, length, null);
const result = DataProtocol.deserializeRecord(
buf,
0,
this.serializer
);
return result;
}
getHeader(): DataHeader {
@@ -53,15 +67,12 @@ export class DataReader<T> {
const found = this.indexReader.binarySearchBySequence(sequence);
if (!found) return null;
const buf = this.readRecord(found.entry.offset, found.entry.length);
const result = DataProtocol.deserializeRecord(
buf,
0,
this.serializer
);
const result = this.readRecordAt(this.fd, found.entry.offset, found.entry.length);
if (!result) return null;
return {
index: found.index,
sequence: found.entry.sequence,
timestamp: found.entry.timestamp,
data: result.data,
@@ -74,112 +85,93 @@ export class DataReader<T> {
const entry = this.indexReader.getEntry(index);
if (!entry) return null;
const buf = this.readRecord(entry.offset, entry.length);
const result = DataProtocol.deserializeRecord(
buf,
0,
this.serializer
);
const result = this.readRecordAt(this.fd, entry.offset, entry.length);
if (!result) return null;
return {
index: index,
sequence: entry.sequence,
timestamp: entry.timestamp,
data: result.data,
};
}
getBulkData(startSeq: number, endSeq: number): DataEntry<T>[] {
getBulkDataBySequence(startSeq: number, endSeq: number): DataEntry<T>[] {
if (this.fd === null) throw new Error('Data file not opened');
const results: DataEntry<T>[] = [];
const indexHeader = this.indexReader.getHeader();
let startIdx = this.findStartIndex(startSeq, indexHeader.writtenCnt);
const indexResults = this.indexReader.findBySequenceRange(startSeq, endSeq);
if (indexResults.length === 0) return [];
for (let i = startIdx; i < indexHeader.writtenCnt; i++) {
const entry = this.indexReader.getEntry(i);
if (!entry) continue;
const firstRecord = this.readRecordAt(this.fd, indexResults[0].entry.offset, indexResults[0].entry.length);
if (firstRecord) {
results.push({
index: indexResults[0].index,
sequence: indexResults[0].entry.sequence,
timestamp: indexResults[0].entry.timestamp,
data: firstRecord.data,
})
}
if (entry.sequence > endSeq) break;
for (let i = 1; i < indexResults.length; i++) {
const { index, entry } = indexResults[i];
const result = this.readNextRecord(this.fd, entry.length);
if (!result) continue;
if (entry.sequence >= startSeq) {
const buf = this.readRecord(entry.offset, entry.length);
const result = DataProtocol.deserializeRecord(
buf,
0,
this.serializer
);
if (result) {
results.push({
sequence: entry.sequence,
timestamp: entry.timestamp,
data: result.data,
});
}
}
results.push({
index: index,
sequence: entry.sequence,
timestamp: entry.timestamp,
data: result.data,
});
}
return results;
}
private findStartIndex(targetSeq: number, writtenCnt: number): number {
let left = 0;
let right = writtenCnt - 1;
let result = 0;
while (left <= right) {
const mid = Math.floor((left + right) / 2);
const entry = this.indexReader.getEntry(mid);
if (!entry) {
right = mid - 1;
continue;
}
if (entry.sequence >= targetSeq) {
result = mid;
right = mid - 1;
} else {
left = mid + 1;
}
}
return result;
}
getBulkDataByTime(startTs: bigint, endTs: bigint): DataEntry<T>[] {
if (this.fd === null) throw new Error('Data file not opened');
const indexResults = this.indexReader.findByTimeRange(startTs, endTs);
const results: DataEntry<T>[] = [];
const indexResults = this.indexReader.findByTimeRange(startTs, endTs);
if (indexResults.length === 0) return [];
for (const { entry } of indexResults) {
const buf = this.readRecord(entry.offset, entry.length);
const result = DataProtocol.deserializeRecord(
buf,
0,
this.serializer
);
if (result) {
results.push({
sequence: entry.sequence,
timestamp: entry.timestamp,
data: result.data,
});
}
const firstRecord = this.readRecordAt(this.fd, indexResults[0].entry.offset, indexResults[0].entry.length);
if (firstRecord) {
results.push({
index: indexResults[0].index,
sequence: indexResults[0].entry.sequence,
timestamp: indexResults[0].entry.timestamp,
data: firstRecord.data,
})
}
for (let i = 1; i < indexResults.length; i++) {
const { index, entry } = indexResults[i];
const result = this.readNextRecord(this.fd, entry.length);
if (!result) continue;
results.push({
index: index,
sequence: entry.sequence,
timestamp: entry.timestamp,
data: result.data,
});
}
return results;
}
/*
getAllData(): DataEntry<T>[] {
if (this.fd === null) throw new Error('Data file not opened');
const entries = this.indexReader.getAllEntries();
const results: DataEntry<T>[] = [];
for (const entry of entries) {
const buf = this.readRecord(entry.offset, entry.length);
const buf = this.readRecordAt(this.fd, entry.offset, entry.length);
const result = DataProtocol.deserializeRecord(
buf,
0,
@@ -193,9 +185,10 @@ export class DataReader<T> {
});
}
}
return results;
}
*/
getRecordCount(): number {
return this.indexReader.getHeader().writtenCnt;
@@ -209,6 +202,32 @@ export class DataReader<T> {
return this.indexReader.getFlags();
}
readNext(): DataEntry<T> | null {
if (this.fd === null) throw new Error('Data file not opened');
const next = this.indexReader.getNextEntry();
if (!next) return null;
const { index, entry } = next;
const result = this.readNextRecord(this.fd, entry.length);
if (!result) return null;
return {
index,
sequence: entry.sequence,
timestamp: entry.timestamp,
data: result.data,
};
}
getCurrentIndex(): number {
return this.indexReader.getCurrentIndex();
}
getCurrentSequence(): number {
return this.indexReader.getCurrentSequence();
}
close(): void {
if (this.fd !== null) {
fs.closeSync(this.fd);