From 8daa2b30585c2d082cb1cb6f3fb9bb39e73dc151 Mon Sep 17 00:00:00 2001 From: Ewout Stortenbeker Date: Mon, 1 May 2023 17:16:09 +0200 Subject: [PATCH] Work on socket IPC implementation: - use spawn instead of fork to start service, pass additional args - do not bind to service stdout and stderr to allow process exit - use db logger and level - use Buffer.subarray instead of deprecated Buffer.slice --- src/ipc/socket.ts | 62 ++++++++++++++++++++++++++--------------------- 1 file changed, 34 insertions(+), 28 deletions(-) diff --git a/src/ipc/socket.ts b/src/ipc/socket.ts index a535c02..2a559a0 100644 --- a/src/ipc/socket.ts +++ b/src/ipc/socket.ts @@ -1,11 +1,11 @@ import { Socket, connect, Server } from 'net'; import { resolve as resolvePath } from 'path'; -import { fork } from 'child_process'; +import { spawn } from 'child_process'; import { AceBaseIPCPeer, IHelloMessage, IMessage } from './ipc'; import { Storage } from '../storage'; import { ID, Transport } from 'acebase-core'; import { getSocketPath, MSG_DELIMITER } from './service/shared'; -import { startServer } from './service'; +// import { startServer } from './service'; export { Server as NetIPCServer } from 'net'; const masterPeerId = '[master]'; @@ -18,9 +18,12 @@ interface EventEmitterLike { } /** - * Node cluster functionality - enables vertical scaling with forked processes. AceBase will enable IPC at startup, so - * any forked process will communicate database changes and events automatically. Locking of resources will be done by - * the cluster's primary (previously master) process. NOTE: if the master process dies, all peers stop working + * Socket IPC implementation. Peers will attempt starting up a dedicated service process for the target database, + * or connect to an already running process. The service acts as the IPC master and governs over locks, space allocation + * and communication between peers. Communication between the processes is done using (very fast in-memory) Unix sockets. + * This IPC implementation allows different processes on a single machine to access the same database simultaniously without + * them having to explicitly configure their IPC settings. + * Currently can be used by passing `ipc: 'socket'` in AceBase's `storage` settings, will become the default soon. */ export class IPCSocketPeer extends AceBaseIPCPeer { @@ -51,16 +54,18 @@ export class IPCSocketPeer extends AceBaseIPCPeer { }); if (!isMaster) { - // Try starting IPC service if it is not running yet - const service = fork(__dirname + '/service/start.js', [dbFile], { detached: true, stdio: 'inherit' }); - service.unref(); // Process is detached and allowed to keep running after we exit - bindEventHandler(service, 'exit', (code, signal) => { - console.log(`Service exited with code ${code}`); - }); + // Try starting IPC service if it is not running yet. + // Use maxIdleTime 0 to allow tests to remove database files when done, make this configurable! + const service = spawn('node', [__dirname + '/service/start.js', dbFile, '--loglevel', storage.debug.level, '--maxidletime', '0'], { detached: true, stdio: 'ignore' }); + service.unref(); // Process is detached and allowed to keep running after we exit. Do not keep a reference to it, possibly preventing app exit. - // // For testing: - // startServer(dbFile, (code) => { - // console.log(`Service exited with code ${code}`); + // For testing: + // startServer(dbFile, { + // maxIdleTime: 0, + // logLevel: storage.debug.level, + // exit: (code) => { + // storage.debug.log(`[IPC ${ipcSettings.ipcName}] service exited with code ${code}`); + // }, // }); } @@ -117,18 +122,18 @@ export class IPCSocketPeer extends AceBaseIPCPeer { } // Extract message from buffer - const message = buffer.slice(0, delimiterIndex); - buffer = buffer.slice(delimiterIndex + MSG_DELIMITER.length); + const message = buffer.subarray(0, delimiterIndex); + buffer = buffer.subarray(delimiterIndex + MSG_DELIMITER.length); try { const json = message.toString('utf-8'); - // console.log(`Received socket message: `, json); + // storage.debug.log(`[IPC ${ipcSettings.ipcName}] Received socket message: `, json); const serialized = JSON.parse(json); const msg = Transport.deserialize2(serialized); handleMessage(socket, msg); } catch (err) { - console.error(`Error parsing message: ${err}`); + storage.debug.error(`[IPC ${ipcSettings.ipcName}] Error parsing message: ${err}`); } } }); @@ -152,12 +157,13 @@ export class IPCSocketPeer extends AceBaseIPCPeer { const connectSocket = async (path: string) => { const tryConnect = async (tries: number): Promise => { try { + if (this._exiting) { return; } const s = connect({ path }); await new Promise((resolve, reject) => { - s.once('error', reject); - s.once('connect', resolve); + s.once('error', reject).unref(); + s.once('connect', resolve).unref(); }); - console.log(`IPC peer ${this.id} successfully established connection to the server`); + storage.debug.log(`[IPC ${ipcSettings.ipcName}] peer ${this.id} successfully established connection to the service`); socket = s; connected = true; } @@ -167,19 +173,19 @@ export class IPCSocketPeer extends AceBaseIPCPeer { await new Promise(resolve => setTimeout(resolve, 100)); return tryConnect(tries + 1); } - console.error(err.message); + storage.debug.error(`[IPC ${ipcSettings.ipcName}] peer ${this.id} cannot connect to service: ${err.message}`); throw err; } }; await tryConnect(1); this.once('exit', () => { - socket.destroy(); + socket?.destroy(); }); bindEventHandler(socket, 'close', (hadError) => { // Connection to server closed - console.log(`IPC peer ${this.id} lost its connection to the server${hadError ? ' because of an error' : ''}`); + storage.debug.log(`IPC peer ${this.id} lost its connection to the service${hadError ? ' because of an error' : ''}`); }); let buffer = Buffer.alloc(0); // Buffer to store incomplete messages @@ -194,18 +200,18 @@ export class IPCSocketPeer extends AceBaseIPCPeer { } // Extract message from buffer - const message = buffer.slice(0, delimiterIndex); - buffer = buffer.slice(delimiterIndex + MSG_DELIMITER.length); + const message = buffer.subarray(0, delimiterIndex); + buffer = buffer.subarray(delimiterIndex + MSG_DELIMITER.length); try { const json = message.toString('utf-8'); - // console.log(`Received server message: `, json); + // storage.debug.log(`Received server message: `, json); const serialized = JSON.parse(json); const msg = Transport.deserialize2(serialized); handleMessage(socket, msg); } catch (err) { - console.error(`Error parsing message: ${err}`); + storage.debug.error(`Error parsing message: ${err}`); } } });