Open way modify
This commit is contained in:
@@ -3,7 +3,7 @@ import * as fs from 'node:fs';
|
|||||||
import { DATA_HEADER_SIZE } from './constants.js';
|
import { DATA_HEADER_SIZE } from './constants.js';
|
||||||
import { DataProtocol, DataHeader } from './protocol.js';
|
import { DataProtocol, DataHeader } from './protocol.js';
|
||||||
import { IndexReader } from '../idx/index.js';
|
import { IndexReader } from '../idx/index.js';
|
||||||
import type { Serializer, DataEntry } from './types.js';
|
import type { Serializer, DataEntry, DataFileReaderOptions } from './types.js';
|
||||||
|
|
||||||
export class DataReader<T> {
|
export class DataReader<T> {
|
||||||
private fd: number | null = null;
|
private fd: number | null = null;
|
||||||
@@ -12,17 +12,18 @@ export class DataReader<T> {
|
|||||||
private indexReader: IndexReader;
|
private indexReader: IndexReader;
|
||||||
private serializer: Serializer<T>;
|
private serializer: Serializer<T>;
|
||||||
|
|
||||||
readonly dataPath: string;
|
private dataPath: string | null = null;
|
||||||
readonly indexPath: string;
|
private indexPath: string | null = null;
|
||||||
|
|
||||||
constructor(basePath: string, serializer: Serializer<T>) {
|
constructor(options: DataFileReaderOptions<T>) {
|
||||||
this.dataPath = `${basePath}.dat`;
|
this.serializer = options.serializer;
|
||||||
this.indexPath = `${basePath}.idx`;
|
this.indexReader = new IndexReader();
|
||||||
this.serializer = serializer;
|
|
||||||
this.indexReader = new IndexReader(this.indexPath);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
open(): void {
|
open(basePath: string): void {
|
||||||
|
this.dataPath = `${basePath}.dat`;
|
||||||
|
this.indexPath = `${basePath}.idx`;
|
||||||
|
|
||||||
this.fd = fs.openSync(this.dataPath, 'r');
|
this.fd = fs.openSync(this.dataPath, 'r');
|
||||||
|
|
||||||
// Read header only
|
// Read header only
|
||||||
@@ -30,7 +31,7 @@ export class DataReader<T> {
|
|||||||
fs.readSync(this.fd, headerBuf, 0, DATA_HEADER_SIZE, 0);
|
fs.readSync(this.fd, headerBuf, 0, DATA_HEADER_SIZE, 0);
|
||||||
this.header = DataProtocol.readHeader(headerBuf);
|
this.header = DataProtocol.readHeader(headerBuf);
|
||||||
|
|
||||||
this.indexReader.open();
|
this.indexReader.open(this.indexPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
private readRecord(offset: bigint, length: number): Buffer {
|
private readRecord(offset: bigint, length: number): Buffer {
|
||||||
@@ -204,6 +205,10 @@ export class DataReader<T> {
|
|||||||
return this.indexReader.getHeader().latestSequence;
|
return this.indexReader.getHeader().latestSequence;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
getFlags(): number {
|
||||||
|
return this.indexReader.getFlags();
|
||||||
|
}
|
||||||
|
|
||||||
close(): void {
|
close(): void {
|
||||||
if (this.fd !== null) {
|
if (this.fd !== null) {
|
||||||
fs.closeSync(this.fd);
|
fs.closeSync(this.fd);
|
||||||
|
|||||||
@@ -17,3 +17,7 @@ export interface DataFileOptions<T> {
|
|||||||
forceTruncate?: boolean;
|
forceTruncate?: boolean;
|
||||||
indexFileOpt: IndexFileOptions;
|
indexFileOpt: IndexFileOptions;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export interface DataFileReaderOptions<T> {
|
||||||
|
serializer: Serializer<T>;
|
||||||
|
}
|
||||||
|
|||||||
@@ -117,6 +117,7 @@ export class DataWriter<T> {
|
|||||||
this.currentOffset += BigInt(buf.length);
|
this.currentOffset += BigInt(buf.length);
|
||||||
++this.recordCount;
|
++this.recordCount;
|
||||||
|
|
||||||
|
this.writeHeader();
|
||||||
return this.latestSequence;
|
return this.latestSequence;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -149,8 +150,11 @@ export class DataWriter<T> {
|
|||||||
return this.latestSequence;
|
return this.latestSequence;
|
||||||
}
|
}
|
||||||
|
|
||||||
getNextSequence(): number {
|
writeHeader(): void {
|
||||||
return this.latestSequence + 1;
|
if (this.fd === null || !this.headerBuf) return;
|
||||||
|
|
||||||
|
DataProtocol.updateHeader(this.headerBuf, this.currentOffset, this.recordCount);
|
||||||
|
fs.writeSync(this.fd, this.headerBuf, 0, DATA_HEADER_SIZE, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
sync(): void {
|
sync(): void {
|
||||||
@@ -159,8 +163,6 @@ export class DataWriter<T> {
|
|||||||
DataProtocol.updateHeader(this.headerBuf, this.currentOffset, this.recordCount);
|
DataProtocol.updateHeader(this.headerBuf, this.currentOffset, this.recordCount);
|
||||||
fs.writeSync(this.fd, this.headerBuf, 0, DATA_HEADER_SIZE, 0);
|
fs.writeSync(this.fd, this.headerBuf, 0, DATA_HEADER_SIZE, 0);
|
||||||
fs.fsyncSync(this.fd);
|
fs.fsyncSync(this.fd);
|
||||||
|
|
||||||
this.indexWriter.syncAll();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
close(): void {
|
close(): void {
|
||||||
@@ -175,6 +177,10 @@ export class DataWriter<T> {
|
|||||||
this.headerBuf = null;
|
this.headerBuf = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
writeFlags(flags: number): void {
|
||||||
|
this.indexWriter.writeFlags(flags);
|
||||||
|
}
|
||||||
|
|
||||||
getStats() {
|
getStats() {
|
||||||
return {
|
return {
|
||||||
dataPath: this.dataPath,
|
dataPath: this.dataPath,
|
||||||
|
|||||||
@@ -38,6 +38,7 @@ export class IndexProtocol {
|
|||||||
buf.writeBigUInt64LE(0n, 28); // dataFileSize
|
buf.writeBigUInt64LE(0n, 28); // dataFileSize
|
||||||
buf.writeUInt32LE(0, 36); // latestSequence
|
buf.writeUInt32LE(0, 36); // latestSequence
|
||||||
buf.writeUInt8(autoIncrementSequence ? 1 : 0, 40); // autoIncrementSequence
|
buf.writeUInt8(autoIncrementSequence ? 1 : 0, 40); // autoIncrementSequence
|
||||||
|
buf.writeUInt32LE(0, 41); // flags
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -52,10 +53,15 @@ export class IndexProtocol {
|
|||||||
dataFileSize: buf.readBigUInt64LE(28),
|
dataFileSize: buf.readBigUInt64LE(28),
|
||||||
latestSequence: buf.readUInt32LE(36),
|
latestSequence: buf.readUInt32LE(36),
|
||||||
autoIncrementSequence: buf.readUInt8(40) === 1,
|
autoIncrementSequence: buf.readUInt8(40) === 1,
|
||||||
reserved: buf.subarray(41, 64),
|
flags: buf.readUInt32LE(41),
|
||||||
|
reserved: buf.subarray(45, 64),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static writeFlags(buf: Buffer, flags: number): void {
|
||||||
|
buf.writeUInt32LE(flags, 41);
|
||||||
|
}
|
||||||
|
|
||||||
static updateHeaderCounts(
|
static updateHeaderCounts(
|
||||||
buf: Buffer,
|
buf: Buffer,
|
||||||
writtenCnt: number,
|
writtenCnt: number,
|
||||||
|
|||||||
@@ -7,14 +7,14 @@ export class IndexReader {
|
|||||||
private buffer: Buffer | null = null;
|
private buffer: Buffer | null = null;
|
||||||
private header: IndexHeader | null = null;
|
private header: IndexHeader | null = null;
|
||||||
|
|
||||||
readonly path: string;
|
private path: string | null = null;
|
||||||
|
|
||||||
constructor(path: string) {
|
constructor() {
|
||||||
this.path = path;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
open(): void {
|
open(idxFilePath: string): void {
|
||||||
// Read entire file into buffer (simpler than mmap for read-only access)
|
// Read entire file into buffer (simpler than mmap for read-only access)
|
||||||
|
this.path = idxFilePath;
|
||||||
this.buffer = fs.readFileSync(this.path);
|
this.buffer = fs.readFileSync(this.path);
|
||||||
this.header = IndexProtocol.readHeader(this.buffer);
|
this.header = IndexProtocol.readHeader(this.buffer);
|
||||||
}
|
}
|
||||||
@@ -24,6 +24,11 @@ export class IndexReader {
|
|||||||
return this.header;
|
return this.header;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
getFlags(): number {
|
||||||
|
if (!this.header) throw new Error('Index file not opened');
|
||||||
|
return this.header.flags;
|
||||||
|
}
|
||||||
|
|
||||||
getEntry(index: number): IndexEntry | null {
|
getEntry(index: number): IndexEntry | null {
|
||||||
if (!this.buffer || !this.header) throw new Error('Index file not opened');
|
if (!this.buffer || !this.header) throw new Error('Index file not opened');
|
||||||
if (index < 0 || index >= this.header.entryCount) return null;
|
if (index < 0 || index >= this.header.entryCount) return null;
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ export interface IndexHeader {
|
|||||||
dataFileSize: bigint;
|
dataFileSize: bigint;
|
||||||
latestSequence: number;
|
latestSequence: number;
|
||||||
autoIncrementSequence: boolean;
|
autoIncrementSequence: boolean;
|
||||||
|
flags: number;
|
||||||
reserved: Buffer;
|
reserved: Buffer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -151,6 +151,9 @@ export class IndexWriter {
|
|||||||
this.dataFileSize = newDataEnd;
|
this.dataFileSize = newDataEnd;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Sync immediately after write
|
||||||
|
this.writeHeader();
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -158,7 +161,7 @@ export class IndexWriter {
|
|||||||
return this.latestSequence;
|
return this.latestSequence;
|
||||||
}
|
}
|
||||||
|
|
||||||
syncHeader(): void {
|
writeHeader(): void {
|
||||||
if (!this.headerBuf || this.fd === null) return;
|
if (!this.headerBuf || this.fd === null) return;
|
||||||
|
|
||||||
// Update header counts
|
// Update header counts
|
||||||
@@ -177,7 +180,7 @@ export class IndexWriter {
|
|||||||
if (this.fd === null) return;
|
if (this.fd === null) return;
|
||||||
|
|
||||||
// Sync header first
|
// Sync header first
|
||||||
this.syncHeader();
|
this.writeHeader();
|
||||||
|
|
||||||
// Sync all file changes to disk
|
// Sync all file changes to disk
|
||||||
fs.fsyncSync(this.fd);
|
fs.fsyncSync(this.fd);
|
||||||
@@ -198,6 +201,16 @@ export class IndexWriter {
|
|||||||
this.entryBuf = null;
|
this.entryBuf = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
writeFlags(flags: number): void {
|
||||||
|
if (!this.headerBuf || this.fd === null) {
|
||||||
|
throw new Error('Index file not opened');
|
||||||
|
}
|
||||||
|
|
||||||
|
IndexProtocol.writeFlags(this.headerBuf, flags);
|
||||||
|
fs.writeSync(this.fd, this.headerBuf, 41, 4, 41);
|
||||||
|
fs.fsyncSync(this.fd);
|
||||||
|
}
|
||||||
|
|
||||||
getStats() {
|
getStats() {
|
||||||
return {
|
return {
|
||||||
path: this.path,
|
path: this.path,
|
||||||
|
|||||||
Reference in New Issue
Block a user