diff --git a/lib/dat/reader.ts b/lib/dat/reader.ts index 9440bd2..f91e889 100644 --- a/lib/dat/reader.ts +++ b/lib/dat/reader.ts @@ -3,7 +3,7 @@ import * as fs from 'node:fs'; import { DATA_HEADER_SIZE } from './constants.js'; import { DataProtocol, DataHeader } from './protocol.js'; import { IndexReader } from '../idx/index.js'; -import type { Serializer, DataEntry } from './types.js'; +import type { Serializer, DataEntry, DataFileReaderOptions } from './types.js'; export class DataReader { private fd: number | null = null; @@ -12,17 +12,18 @@ export class DataReader { private indexReader: IndexReader; private serializer: Serializer; - readonly dataPath: string; - readonly indexPath: string; + private dataPath: string | null = null; + private indexPath: string | null = null; - constructor(basePath: string, serializer: Serializer) { - this.dataPath = `${basePath}.dat`; - this.indexPath = `${basePath}.idx`; - this.serializer = serializer; - this.indexReader = new IndexReader(this.indexPath); + constructor(options: DataFileReaderOptions) { + this.serializer = options.serializer; + this.indexReader = new IndexReader(); } - open(): void { + open(basePath: string): void { + this.dataPath = `${basePath}.dat`; + this.indexPath = `${basePath}.idx`; + this.fd = fs.openSync(this.dataPath, 'r'); // Read header only @@ -30,7 +31,7 @@ export class DataReader { fs.readSync(this.fd, headerBuf, 0, DATA_HEADER_SIZE, 0); this.header = DataProtocol.readHeader(headerBuf); - this.indexReader.open(); + this.indexReader.open(this.indexPath); } private readRecord(offset: bigint, length: number): Buffer { @@ -204,6 +205,10 @@ export class DataReader { return this.indexReader.getHeader().latestSequence; } + getFlags(): number { + return this.indexReader.getFlags(); + } + close(): void { if (this.fd !== null) { fs.closeSync(this.fd); diff --git a/lib/dat/types.ts b/lib/dat/types.ts index acc00d8..7041b99 100644 --- a/lib/dat/types.ts +++ b/lib/dat/types.ts @@ -17,3 +17,7 @@ export interface DataFileOptions { forceTruncate?: boolean; indexFileOpt: IndexFileOptions; } + +export interface DataFileReaderOptions { + serializer: Serializer; +} diff --git a/lib/dat/writer.ts b/lib/dat/writer.ts index 9dbe1fd..4062e14 100644 --- a/lib/dat/writer.ts +++ b/lib/dat/writer.ts @@ -117,6 +117,7 @@ export class DataWriter { this.currentOffset += BigInt(buf.length); ++this.recordCount; + this.writeHeader(); return this.latestSequence; } @@ -149,8 +150,11 @@ export class DataWriter { return this.latestSequence; } - getNextSequence(): number { - return this.latestSequence + 1; + writeHeader(): void { + if (this.fd === null || !this.headerBuf) return; + + DataProtocol.updateHeader(this.headerBuf, this.currentOffset, this.recordCount); + fs.writeSync(this.fd, this.headerBuf, 0, DATA_HEADER_SIZE, 0); } sync(): void { @@ -159,8 +163,6 @@ export class DataWriter { DataProtocol.updateHeader(this.headerBuf, this.currentOffset, this.recordCount); fs.writeSync(this.fd, this.headerBuf, 0, DATA_HEADER_SIZE, 0); fs.fsyncSync(this.fd); - - this.indexWriter.syncAll(); } close(): void { @@ -175,6 +177,10 @@ export class DataWriter { this.headerBuf = null; } + writeFlags(flags: number): void { + this.indexWriter.writeFlags(flags); + } + getStats() { return { dataPath: this.dataPath, diff --git a/lib/idx/protocol.ts b/lib/idx/protocol.ts index 33d36ad..4031c5d 100644 --- a/lib/idx/protocol.ts +++ b/lib/idx/protocol.ts @@ -38,6 +38,7 @@ export class IndexProtocol { buf.writeBigUInt64LE(0n, 28); // dataFileSize buf.writeUInt32LE(0, 36); // latestSequence buf.writeUInt8(autoIncrementSequence ? 1 : 0, 40); // autoIncrementSequence + buf.writeUInt32LE(0, 41); // flags return buf; } @@ -52,10 +53,15 @@ export class IndexProtocol { dataFileSize: buf.readBigUInt64LE(28), latestSequence: buf.readUInt32LE(36), autoIncrementSequence: buf.readUInt8(40) === 1, - reserved: buf.subarray(41, 64), + flags: buf.readUInt32LE(41), + reserved: buf.subarray(45, 64), }; } + static writeFlags(buf: Buffer, flags: number): void { + buf.writeUInt32LE(flags, 41); + } + static updateHeaderCounts( buf: Buffer, writtenCnt: number, diff --git a/lib/idx/reader.ts b/lib/idx/reader.ts index 3498565..653348c 100644 --- a/lib/idx/reader.ts +++ b/lib/idx/reader.ts @@ -7,14 +7,14 @@ export class IndexReader { private buffer: Buffer | null = null; private header: IndexHeader | null = null; - readonly path: string; + private path: string | null = null; - constructor(path: string) { - this.path = path; + constructor() { } - open(): void { + open(idxFilePath: string): void { // Read entire file into buffer (simpler than mmap for read-only access) + this.path = idxFilePath; this.buffer = fs.readFileSync(this.path); this.header = IndexProtocol.readHeader(this.buffer); } @@ -24,6 +24,11 @@ export class IndexReader { return this.header; } + getFlags(): number { + if (!this.header) throw new Error('Index file not opened'); + return this.header.flags; + } + getEntry(index: number): IndexEntry | null { if (!this.buffer || !this.header) throw new Error('Index file not opened'); if (index < 0 || index >= this.header.entryCount) return null; diff --git a/lib/idx/types.ts b/lib/idx/types.ts index 19191fa..fd38dfc 100644 --- a/lib/idx/types.ts +++ b/lib/idx/types.ts @@ -9,6 +9,7 @@ export interface IndexHeader { dataFileSize: bigint; latestSequence: number; autoIncrementSequence: boolean; + flags: number; reserved: Buffer; } diff --git a/lib/idx/writer.ts b/lib/idx/writer.ts index e9ffb13..5fa3d73 100644 --- a/lib/idx/writer.ts +++ b/lib/idx/writer.ts @@ -151,6 +151,9 @@ export class IndexWriter { this.dataFileSize = newDataEnd; } + // Sync immediately after write + this.writeHeader(); + return true; } @@ -158,7 +161,7 @@ export class IndexWriter { return this.latestSequence; } - syncHeader(): void { + writeHeader(): void { if (!this.headerBuf || this.fd === null) return; // Update header counts @@ -177,7 +180,7 @@ export class IndexWriter { if (this.fd === null) return; // Sync header first - this.syncHeader(); + this.writeHeader(); // Sync all file changes to disk fs.fsyncSync(this.fd); @@ -198,6 +201,16 @@ export class IndexWriter { this.entryBuf = null; } + writeFlags(flags: number): void { + if (!this.headerBuf || this.fd === null) { + throw new Error('Index file not opened'); + } + + IndexProtocol.writeFlags(this.headerBuf, flags); + fs.writeSync(this.fd, this.headerBuf, 41, 4, 41); + fs.fsyncSync(this.fd); + } + getStats() { return { path: this.path,