Work on socket IPC implementation:

- use spawn instead of fork to start service, pass additional args
- do not bind to service stdout and stderr to allow process exit
- use db logger and level
- use Buffer.subarray instead of deprecated Buffer.slice
This commit is contained in:
Ewout Stortenbeker 2023-05-01 17:16:09 +02:00
parent 22d04ca2d8
commit 8daa2b3058

View file

@ -1,11 +1,11 @@
import { Socket, connect, Server } from 'net';
import { resolve as resolvePath } from 'path';
import { fork } from 'child_process';
import { spawn } from 'child_process';
import { AceBaseIPCPeer, IHelloMessage, IMessage } from './ipc';
import { Storage } from '../storage';
import { ID, Transport } from 'acebase-core';
import { getSocketPath, MSG_DELIMITER } from './service/shared';
import { startServer } from './service';
// import { startServer } from './service';
export { Server as NetIPCServer } from 'net';
const masterPeerId = '[master]';
@ -18,9 +18,12 @@ interface EventEmitterLike {
}
/**
* Node cluster functionality - enables vertical scaling with forked processes. AceBase will enable IPC at startup, so
* any forked process will communicate database changes and events automatically. Locking of resources will be done by
* the cluster's primary (previously master) process. NOTE: if the master process dies, all peers stop working
* Socket IPC implementation. Peers will attempt starting up a dedicated service process for the target database,
* or connect to an already running process. The service acts as the IPC master and governs over locks, space allocation
* and communication between peers. Communication between the processes is done using (very fast in-memory) Unix sockets.
* This IPC implementation allows different processes on a single machine to access the same database simultaniously without
* them having to explicitly configure their IPC settings.
* Currently can be used by passing `ipc: 'socket'` in AceBase's `storage` settings, will become the default soon.
*/
export class IPCSocketPeer extends AceBaseIPCPeer {
@ -51,16 +54,18 @@ export class IPCSocketPeer extends AceBaseIPCPeer {
});
if (!isMaster) {
// Try starting IPC service if it is not running yet
const service = fork(__dirname + '/service/start.js', [dbFile], { detached: true, stdio: 'inherit' });
service.unref(); // Process is detached and allowed to keep running after we exit
bindEventHandler(service, 'exit', (code, signal) => {
console.log(`Service exited with code ${code}`);
});
// Try starting IPC service if it is not running yet.
// Use maxIdleTime 0 to allow tests to remove database files when done, make this configurable!
const service = spawn('node', [__dirname + '/service/start.js', dbFile, '--loglevel', storage.debug.level, '--maxidletime', '0'], { detached: true, stdio: 'ignore' });
service.unref(); // Process is detached and allowed to keep running after we exit. Do not keep a reference to it, possibly preventing app exit.
// // For testing:
// startServer(dbFile, (code) => {
// console.log(`Service exited with code ${code}`);
// For testing:
// startServer(dbFile, {
// maxIdleTime: 0,
// logLevel: storage.debug.level,
// exit: (code) => {
// storage.debug.log(`[IPC ${ipcSettings.ipcName}] service exited with code ${code}`);
// },
// });
}
@ -117,18 +122,18 @@ export class IPCSocketPeer extends AceBaseIPCPeer {
}
// Extract message from buffer
const message = buffer.slice(0, delimiterIndex);
buffer = buffer.slice(delimiterIndex + MSG_DELIMITER.length);
const message = buffer.subarray(0, delimiterIndex);
buffer = buffer.subarray(delimiterIndex + MSG_DELIMITER.length);
try {
const json = message.toString('utf-8');
// console.log(`Received socket message: `, json);
// storage.debug.log(`[IPC ${ipcSettings.ipcName}] Received socket message: `, json);
const serialized = JSON.parse(json);
const msg = Transport.deserialize2(serialized);
handleMessage(socket, msg);
}
catch (err) {
console.error(`Error parsing message: ${err}`);
storage.debug.error(`[IPC ${ipcSettings.ipcName}] Error parsing message: ${err}`);
}
}
});
@ -152,12 +157,13 @@ export class IPCSocketPeer extends AceBaseIPCPeer {
const connectSocket = async (path: string) => {
const tryConnect = async (tries: number): Promise<void> => {
try {
if (this._exiting) { return; }
const s = connect({ path });
await new Promise<void>((resolve, reject) => {
s.once('error', reject);
s.once('connect', resolve);
s.once('error', reject).unref();
s.once('connect', resolve).unref();
});
console.log(`IPC peer ${this.id} successfully established connection to the server`);
storage.debug.log(`[IPC ${ipcSettings.ipcName}] peer ${this.id} successfully established connection to the service`);
socket = s;
connected = true;
}
@ -167,19 +173,19 @@ export class IPCSocketPeer extends AceBaseIPCPeer {
await new Promise(resolve => setTimeout(resolve, 100));
return tryConnect(tries + 1);
}
console.error(err.message);
storage.debug.error(`[IPC ${ipcSettings.ipcName}] peer ${this.id} cannot connect to service: ${err.message}`);
throw err;
}
};
await tryConnect(1);
this.once('exit', () => {
socket.destroy();
socket?.destroy();
});
bindEventHandler(socket, 'close', (hadError) => {
// Connection to server closed
console.log(`IPC peer ${this.id} lost its connection to the server${hadError ? ' because of an error' : ''}`);
storage.debug.log(`IPC peer ${this.id} lost its connection to the service${hadError ? ' because of an error' : ''}`);
});
let buffer = Buffer.alloc(0); // Buffer to store incomplete messages
@ -194,18 +200,18 @@ export class IPCSocketPeer extends AceBaseIPCPeer {
}
// Extract message from buffer
const message = buffer.slice(0, delimiterIndex);
buffer = buffer.slice(delimiterIndex + MSG_DELIMITER.length);
const message = buffer.subarray(0, delimiterIndex);
buffer = buffer.subarray(delimiterIndex + MSG_DELIMITER.length);
try {
const json = message.toString('utf-8');
// console.log(`Received server message: `, json);
// storage.debug.log(`Received server message: `, json);
const serialized = JSON.parse(json);
const msg = Transport.deserialize2(serialized);
handleMessage(socket, msg);
}
catch (err) {
console.error(`Error parsing message: ${err}`);
storage.debug.error(`Error parsing message: ${err}`);
}
}
});