From 92a26812921026fe8203e2f73f50559c952fada8 Mon Sep 17 00:00:00 2001 From: Eli-Class Date: Thu, 5 Feb 2026 05:36:36 +0000 Subject: [PATCH] Read writer memory optimize --- .gitignore | 3 ++- lib/dat/protocol.ts | 27 +++++++++++++++++++++++ lib/dat/reader.ts | 6 +++-- lib/dat/writer.ts | 38 +++++++++++++++++++------------ lib/idx/protocol.ts | 40 +++++++-------------------------- lib/idx/reader.ts | 21 +++++++++++++----- lib/idx/writer.ts | 54 +++++++++++++++++++++------------------------ package.json | 16 ++++++++++---- 8 files changed, 118 insertions(+), 87 deletions(-) diff --git a/.gitignore b/.gitignore index 862f83b..12822a4 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,5 @@ build/ dist/ *.log *.node -.DS_Store \ No newline at end of file +.DS_Store +package-lock.json diff --git a/lib/dat/protocol.ts b/lib/dat/protocol.ts index 411f829..4d106b2 100644 --- a/lib/dat/protocol.ts +++ b/lib/dat/protocol.ts @@ -12,6 +12,10 @@ export interface DataHeader { reserved: Buffer; } +export interface BufferRef { + buf: Buffer; +} + export class DataProtocol { static createHeader(): Buffer { const buf = Buffer.alloc(DATA_HEADER_SIZE); @@ -54,6 +58,29 @@ export class DataProtocol { return buf; } + /** + * 기존 버퍼에 레코드를 직렬화하여 쓰기 + * 버퍼 크기 부족시 내부에서 재할당 (BufferRef로 참조 전달) + * @returns 실제 쓰여진 바이트 수 (totalLen) + */ + static serializeRecordTo(bufRef: BufferRef, data: T, serializer: Serializer): number { + const dataBytes = serializer.serialize(data); + const totalLen = RECORD_HEADER_SIZE + dataBytes.length; + + // 버퍼 크기 부족시 재할당 + if (bufRef.buf.length < totalLen) { + bufRef.buf = Buffer.alloc(totalLen); + } + + dataBytes.copy(bufRef.buf, RECORD_HEADER_SIZE); + + bufRef.buf.writeUInt32LE(dataBytes.length, 0); + const checksum = crc32(bufRef.buf, RECORD_HEADER_SIZE, totalLen); + bufRef.buf.writeUInt32LE(checksum, 4); + + return totalLen; + } + static deserializeRecord( buf: Buffer, offset: number, diff --git a/lib/dat/reader.ts b/lib/dat/reader.ts index dae9e20..aa4cce6 100644 --- a/lib/dat/reader.ts +++ b/lib/dat/reader.ts @@ -1,5 +1,6 @@ // src/data-file/reader.ts import * as fs from 'node:fs'; +import * as fsext from 'fs-ext'; import { DATA_HEADER_SIZE } from './constants.js'; import { DataProtocol, DataHeader } from './protocol.js'; import { IndexReader } from '../idx/index.js'; @@ -28,7 +29,7 @@ export class DataReader { // Read header only const headerBuf = Buffer.alloc(DATA_HEADER_SIZE); - fs.readSync(this.fd, headerBuf, 0, DATA_HEADER_SIZE, 0); + fs.readSync(this.fd, headerBuf, 0, DATA_HEADER_SIZE, null); this.header = DataProtocol.readHeader(headerBuf); this.indexReader.open(this.indexPath); @@ -36,7 +37,8 @@ export class DataReader { private readRecordAt(fd: number, offset: bigint, length: number): { data: T; length: number } | null { const buf = Buffer.alloc(length); - fs.readSync(fd, buf, 0, length, Number(offset)); + fsext.seekSync(fd, Number(offset), 0); + fs.readSync(fd, buf, 0, length, null); const result = DataProtocol.deserializeRecord( buf, 0, diff --git a/lib/dat/writer.ts b/lib/dat/writer.ts index 42f60b2..57c44e2 100644 --- a/lib/dat/writer.ts +++ b/lib/dat/writer.ts @@ -1,13 +1,17 @@ // src/data-file/writer.ts import * as fs from 'node:fs'; +import * as fsext from 'fs-ext'; import { DATA_HEADER_SIZE } from './constants.js'; -import { DataProtocol } from './protocol.js'; +import { DataProtocol, BufferRef } from './protocol.js'; import { IndexWriter, IndexFileOptionsRequired } from '../idx/index.js'; import type { Serializer, DataFileOptions } from './types.js'; +const DEFAULT_DATA_BUF_SIZE = 4096; + export class DataWriter { private fd: number | null = null; - private headerBuf: Buffer | null = null; + private headerBuf: Buffer = Buffer.alloc(DATA_HEADER_SIZE); + private dataBufRef: BufferRef = { buf: Buffer.alloc(DEFAULT_DATA_BUF_SIZE) }; private currentOffset: bigint = BigInt(DATA_HEADER_SIZE); private recordCount = 0; @@ -65,17 +69,19 @@ export class DataWriter { this.fd = fs.openSync(this.dataPath, isNew || this.forceTruncate ? 'w+' : 'r+'); - try { - this.headerBuf = Buffer.alloc(DATA_HEADER_SIZE); + // 버퍼 초기화 + this.headerBuf.fill(0); + this.dataBufRef.buf.fill(0); + try { if (isNew || this.forceTruncate) { const header = DataProtocol.createHeader(); - fs.writeSync(this.fd, header, 0, DATA_HEADER_SIZE, 0); + fs.writeSync(this.fd, header, 0, DATA_HEADER_SIZE, null); this.currentOffset = BigInt(DATA_HEADER_SIZE); this.recordCount = 0; this.latestSequence = 0; } else { - fs.readSync(this.fd, this.headerBuf, 0, DATA_HEADER_SIZE, 0); + fs.readSync(this.fd, this.headerBuf, 0, DATA_HEADER_SIZE, null); const header = DataProtocol.readHeader(this.headerBuf); // Validate: Data file recordCount must match Index file writtenCnt @@ -88,6 +94,7 @@ export class DataWriter { this.currentOffset = header.fileSize; this.recordCount = header.recordCount; this.latestSequence = this.indexWriter.getLatestSequence(); + fsext.seekSync(this.fd, 0, 2); // 가장 마지막으로 } } catch (error) { // Clean up resources on error @@ -95,7 +102,8 @@ export class DataWriter { fs.closeSync(this.fd); this.fd = null; } - this.headerBuf = null; + this.headerBuf.fill(0); + this.dataBufRef.buf.fill(0); throw error; } } @@ -103,18 +111,19 @@ export class DataWriter { append(data: T, sequence?: number, timestamp?: bigint): number { if (this.fd === null) throw new Error('Data file not opened'); - const buf = DataProtocol.serializeRecord(data, this.serializer); + // dataBufRef에 직렬화 (내부에서 필요시 재할당) + const writtenLen = DataProtocol.serializeRecordTo(this.dataBufRef, data, this.serializer); const offset = this.currentOffset; - fs.writeSync(this.fd, buf, 0, buf.length, Number(offset)); + fs.writeSync(this.fd, this.dataBufRef.buf, 0, writtenLen, null); // Write to index file - this.indexWriter.write(offset, buf.length, sequence, timestamp); + this.indexWriter.write(offset, writtenLen, sequence, timestamp); // Update latestSequence to the most recent sequence this.latestSequence = this.indexWriter.getLatestSequence(); - this.currentOffset += BigInt(buf.length); + this.currentOffset += BigInt(writtenLen); ++this.recordCount; this.writeHeader(); @@ -151,14 +160,14 @@ export class DataWriter { } writeHeader(): void { - if (this.fd === null || !this.headerBuf) return; + if (this.fd === null) return; DataProtocol.updateHeader(this.headerBuf, this.currentOffset, this.recordCount); fs.writeSync(this.fd, this.headerBuf, 0, DATA_HEADER_SIZE, 0); } sync(): void { - if (this.fd === null || !this.headerBuf) return; + if (this.fd === null) return; DataProtocol.updateHeader(this.headerBuf, this.currentOffset, this.recordCount); fs.writeSync(this.fd, this.headerBuf, 0, DATA_HEADER_SIZE, 0); @@ -174,7 +183,8 @@ export class DataWriter { } this.indexWriter.close(); - this.headerBuf = null; + this.headerBuf.fill(0); + this.dataBufRef.buf.fill(0); } writeFlags(flags: number): void { diff --git a/lib/idx/protocol.ts b/lib/idx/protocol.ts index 4031c5d..36909e3 100644 --- a/lib/idx/protocol.ts +++ b/lib/idx/protocol.ts @@ -73,39 +73,15 @@ export class IndexProtocol { buf.writeUInt32LE(latestSequence, 36); } - static writeEntry(buf: Buffer, index: number, entry: Omit): void { - const off = INDEX_HEADER_SIZE + index * INDEX_ENTRY_SIZE; + static writeEntry(buf: Buffer, entry: Omit): void { + buf.writeUInt32LE(entry.sequence, 0); + buf.writeBigUInt64LE(entry.timestamp, 4); + buf.writeBigUInt64LE(entry.offset, 12); + buf.writeUInt32LE(entry.length, 20); + buf.writeUInt32LE(entry.flags | FLAG_VALID, 24); - buf.writeUInt32LE(entry.sequence, off); - buf.writeBigUInt64LE(entry.timestamp, off + 4); - buf.writeBigUInt64LE(entry.offset, off + 12); - buf.writeUInt32LE(entry.length, off + 20); - buf.writeUInt32LE(entry.flags | FLAG_VALID, off + 24); - - const checksum = crc32(buf, off, off + 28); - buf.writeUInt32LE(checksum, off + 28); - } - - static readEntry(buf: Buffer, index: number): IndexEntry | null { - const off = INDEX_HEADER_SIZE + index * INDEX_ENTRY_SIZE; - const flags = buf.readUInt32LE(off + 24); - - if (!(flags & FLAG_VALID)) return null; - - return { - sequence: buf.readUInt32LE(off), - timestamp: buf.readBigUInt64LE(off + 4), - offset: buf.readBigUInt64LE(off + 12), - length: buf.readUInt32LE(off + 20), - flags, - checksum: buf.readUInt32LE(off + 28), - }; - } - - static isValidEntry(buf: Buffer, index: number): boolean { - const off = INDEX_HEADER_SIZE + index * INDEX_ENTRY_SIZE; - const flags = buf.readUInt32LE(off + 24); - return (flags & FLAG_VALID) !== 0; + const checksum = crc32(buf, 0, 28); + buf.writeUInt32LE(checksum, 28); } static calcFileSize(entryCount: number): number { diff --git a/lib/idx/reader.ts b/lib/idx/reader.ts index 71c201c..79e4ed3 100644 --- a/lib/idx/reader.ts +++ b/lib/idx/reader.ts @@ -1,5 +1,6 @@ // src/index-file/reader.ts import * as fs from 'node:fs'; +import * as fsext from 'fs-ext'; import { INDEX_HEADER_SIZE, INDEX_ENTRY_SIZE, FLAG_VALID } from './constants.js'; import { IndexProtocol } from './protocol.js'; import type { IndexHeader, IndexEntry } from './types.js'; @@ -14,7 +15,8 @@ export class IndexReader { private currentIndex: number = -1; private currentSequence: number = -1; - // Reusable buffer for reading entries + // Reusable buffers + private headerBuf: Buffer = Buffer.alloc(INDEX_HEADER_SIZE); private entryBuffer: Buffer = Buffer.alloc(INDEX_ENTRY_SIZE); constructor() { @@ -22,12 +24,16 @@ export class IndexReader { open(idxFilePath: string): void { this.path = idxFilePath; + + // 버퍼 초기화 + this.headerBuf.fill(0); + this.entryBuffer.fill(0); + this.fd = fs.openSync(this.path, 'r'); // Read header only (64 bytes) - const headerBuf = Buffer.alloc(INDEX_HEADER_SIZE); - fs.readSync(this.fd, headerBuf, 0, INDEX_HEADER_SIZE, 0); - this.header = IndexProtocol.readHeader(headerBuf); + fs.readSync(this.fd, this.headerBuf, 0, INDEX_HEADER_SIZE, null); + this.header = IndexProtocol.readHeader(this.headerBuf); // 아예 최초에는 -1 로 두어서 readNextEntry 할때 자동 ++ 할때 오류를 검증 this.currentIndex = -1; this.currentSequence = -1; @@ -136,6 +142,10 @@ export class IndexReader { this.header = null; this.currentIndex = -1; this.currentSequence = -1; + + // 버퍼 초기화 + this.headerBuf.fill(0); + this.entryBuffer.fill(0); } // ###################################################################### @@ -143,7 +153,8 @@ export class IndexReader { // ###################################################################### private readEntryAt(fd: number, index: number): IndexEntry | null { const offset = INDEX_HEADER_SIZE + index * INDEX_ENTRY_SIZE; - fs.readSync(fd, this.entryBuffer, 0, INDEX_ENTRY_SIZE, offset); + fsext.seekSync(fd, offset, 0); + fs.readSync(fd, this.entryBuffer, 0, INDEX_ENTRY_SIZE, null); const flags = this.entryBuffer.readUInt32LE(24); const indexEntry = this.buildIndexEntry(flags); diff --git a/lib/idx/writer.ts b/lib/idx/writer.ts index 18d451d..e5a5e51 100644 --- a/lib/idx/writer.ts +++ b/lib/idx/writer.ts @@ -1,13 +1,14 @@ // src/index-file/writer.ts import * as fs from 'node:fs'; +import * as fsext from 'fs-ext'; import { INDEX_HEADER_SIZE, INDEX_ENTRY_SIZE, FLAG_VALID } from './constants.js'; import { IndexProtocol } from './protocol.js'; import { IndexFileOptionsRequired } from './types.js'; export class IndexWriter { private fd: number | null = null; - private headerBuf: Buffer | null = null; - private entryBuf: Buffer | null = null; + private headerBuf: Buffer = Buffer.alloc(INDEX_HEADER_SIZE); + private entryBuf: Buffer = Buffer.alloc(INDEX_ENTRY_SIZE); private writtenCnt = 0; private dataFileSize = 0n; @@ -16,6 +17,7 @@ export class IndexWriter { private path: string | null = null; private fileSize: number = 0; + // see IndexFileOptions private maxEntries: number = 0; private autoIncrementSequence: boolean = false; @@ -31,6 +33,10 @@ export class IndexWriter { const isNew = !fs.existsSync(this.path); + // 버퍼 초기화 + this.headerBuf.fill(0); + this.entryBuf.fill(0); + if (isNew || forceTruncate) { // New file: use provided values this.fileSize = IndexProtocol.calcFileSize(this.maxEntries); @@ -41,25 +47,18 @@ export class IndexWriter { this.fd = fs.openSync(this.path, 'w+'); fs.ftruncateSync(this.fd, this.fileSize); - // Allocate buffers for header and entry - this.headerBuf = Buffer.alloc(INDEX_HEADER_SIZE); - this.entryBuf = Buffer.alloc(INDEX_ENTRY_SIZE); - const header = IndexProtocol.createHeader(this.maxEntries, this.autoIncrementSequence); header.copy(this.headerBuf, 0); // Write header to file - fs.writeSync(this.fd, this.headerBuf, 0, INDEX_HEADER_SIZE, 0); + fs.writeSync(this.fd, this.headerBuf, 0, INDEX_HEADER_SIZE, null); fs.fsyncSync(this.fd); } else { // Existing file: read header first this.fd = fs.openSync(this.path, 'r+'); try { - this.headerBuf = Buffer.alloc(INDEX_HEADER_SIZE); - this.entryBuf = Buffer.alloc(INDEX_ENTRY_SIZE); - - fs.readSync(this.fd, this.headerBuf, 0, INDEX_HEADER_SIZE, 0); + fs.readSync(this.fd, this.headerBuf, 0, INDEX_HEADER_SIZE, null); const header = IndexProtocol.readHeader(this.headerBuf); if (this.maxEntries !== header.entryCount) { @@ -86,14 +85,17 @@ export class IndexWriter { this.writtenCnt = header.writtenCnt; this.dataFileSize = header.dataFileSize; this.latestSequence = header.latestSequence; + // Writer 중 이미 있었으면 가장 마지막으로 이동 + const fileOffset = INDEX_HEADER_SIZE + this.writtenCnt * INDEX_ENTRY_SIZE; + fsext.seekSync(this.fd, fileOffset, 0); } catch (error) { // Clean up resources on error if (this.fd !== null) { fs.closeSync(this.fd); this.fd = null; } - this.headerBuf = null; - this.entryBuf = null; + this.headerBuf.fill(0); + this.entryBuf.fill(0); throw error; } } @@ -107,7 +109,7 @@ export class IndexWriter { sequence?: number, timestamp?: bigint ): boolean { - if (!this.entryBuf || this.fd === null) throw new Error('Index file not opened'); + if (this.fd === null) throw new Error('Index file not opened'); if (this.writtenCnt >= this.maxEntries) { throw new Error(`Data count exceed provide : ${this.writtenCnt + 1} - max : ${this.maxEntries}`); } @@ -125,11 +127,8 @@ export class IndexWriter { const ts = timestamp ?? BigInt(Date.now()) * 1000000n; - // Create a temporary buffer for this entry - const tempBuf = Buffer.alloc(INDEX_HEADER_SIZE + (this.writtenCnt + 1) * INDEX_ENTRY_SIZE); - - // Write entry to temp buffer - IndexProtocol.writeEntry(tempBuf, this.writtenCnt, { + // Write entry to entryBuf (offset 0부터) + IndexProtocol.writeEntry(this.entryBuf, { sequence: seq, timestamp: ts, offset, @@ -137,11 +136,8 @@ export class IndexWriter { flags: FLAG_VALID, }); - // Calculate file offset for this entry - const fileOffset = INDEX_HEADER_SIZE + this.writtenCnt * INDEX_ENTRY_SIZE; - - // Write entry to file - fs.writeSync(this.fd, tempBuf, fileOffset, INDEX_ENTRY_SIZE, fileOffset); + // Write entry to file (현재 fd 위치에 쓰기) + fs.writeSync(this.fd, this.entryBuf, 0, INDEX_ENTRY_SIZE, null); this.writtenCnt++; this.latestSequence = seq; @@ -162,12 +158,12 @@ export class IndexWriter { } getFlags(): number { - if (!this.headerBuf) throw new Error('Index file not opened'); + if (this.fd === null) throw new Error('Index file not opened'); return this.headerBuf.readUInt32LE(41); } writeHeader(): void { - if (!this.headerBuf || this.fd === null) return; + if (this.fd === null) return; // Update header counts IndexProtocol.updateHeaderCounts( @@ -202,12 +198,12 @@ export class IndexWriter { this.fd = null; // 3. Clean up buffers - this.headerBuf = null; - this.entryBuf = null; + this.headerBuf.fill(0); + this.entryBuf.fill(0); } writeFlags(flags: number): void { - if (!this.headerBuf || this.fd === null) { + if (this.fd === null) { throw new Error('Index file not opened'); } diff --git a/package.json b/package.json index 9212c39..52d651b 100644 --- a/package.json +++ b/package.json @@ -13,8 +13,10 @@ "tsconfig.json" ], "dependencies": { - "typescript": "^5.7.0", - "@types/node": "^22.0.0" + "@types/fs-ext": "^2.0.3", + "@types/node": "^22.0.0", + "fs-ext": "^2.1.1", + "typescript": "^5.7.0" }, "scripts": { "prepare": "tsc -p tsconfig.json", @@ -23,6 +25,12 @@ "engines": { "node": ">=18.0.0" }, - "os": ["linux", "darwin"], - "cpu": ["x64", "arm64"] + "os": [ + "linux", + "darwin" + ], + "cpu": [ + "x64", + "arm64" + ] }