"use strict"; Object.defineProperty(exports, "__esModule", { value: true }); exports.NodeLock = exports.NodeLocker = exports.LOCK_STATE = void 0; const acebase_core_1 = require("acebase-core"); const assert_1 = require("./assert"); const DEBUG_MODE = false; const DEFAULT_LOCK_TIMEOUT = 120; // in seconds exports.LOCK_STATE = { PENDING: 'pending', LOCKED: 'locked', EXPIRED: 'expired', DONE: 'done', }; class NodeLocker { /** * Provides locking mechanism for nodes, ensures no simultanious read and writes happen to overlapping paths */ constructor(debug, lockTimeout = DEFAULT_LOCK_TIMEOUT) { this._locks = []; this._lastTid = 0; this.debug = debug; this.timeout = lockTimeout * 1000; } setTimeout(timeout) { this.timeout = timeout * 1000; } createTid() { return DEBUG_MODE ? ++this._lastTid : acebase_core_1.ID.generate(); } _allowLock(path, tid, forWriting) { /** * Disabled path locking because of the following issue: * * Process 1 requests WRITE lock on "/users/ewout", is GRANTED * Process 2 requests READ lock on "", is DENIED (process 1 writing to a descendant) * Process 3 requests WRITE lock on "/posts/post1", is GRANTED * Process 1 requests READ lock on "/" because of bound events, is DENIED (3 is writing to a descendant) * Process 3 requests READ lock on "/" because of bound events, is DENIED (1 is writing to a descendant) * * --> DEADLOCK! * * Now simply makes sure one transaction has write access at the same time, * might change again in the future... */ const conflict = this._locks .find(otherLock => { return (otherLock.tid !== tid && otherLock.state === exports.LOCK_STATE.LOCKED && (forWriting || otherLock.forWriting)); }); return { allow: !conflict, conflict }; } quit() { return new Promise(resolve => { if (this._locks.length === 0) { return resolve(); } this._quit = resolve; }); } /** * Safely reject a pending lock, catching any unhandled promise rejections (that should not happen in the first place, obviously) * @param lock */ _rejectLock(lock, err) { this._locks.splice(this._locks.indexOf(lock), 1); // Remove from queue clearTimeout(lock.timeout); try { lock.reject(err); } catch (err) { console.error(`Unhandled promise rejection:`, err); } } _processLockQueue() { if (this._quit) { // Reject all pending locks const quitError = new Error('Quitting'); this._locks .filter(lock => lock.state === exports.LOCK_STATE.PENDING) .forEach(lock => this._rejectLock(lock, quitError)); // Resolve quit promise if queue is empty: if (this._locks.length === 0) { this._quit(); } } const pending = this._locks .filter(lock => lock.state === exports.LOCK_STATE.PENDING) .sort((a, b) => { // // Writes get higher priority so all reads get the most recent data // if (a.forWriting === b.forWriting) { // if (a.requested < b.requested) { return -1; } // else { return 1; } // } // else if (a.forWriting) { return -1; } if (a.priority && !b.priority) { return -1; } else if (!a.priority && b.priority) { return 1; } return a.requested - b.requested; }); pending.forEach(lock => { const check = this._allowLock(lock.path, lock.tid, lock.forWriting); lock.waitingFor = check.conflict || null; if (check.allow) { this.lock(lock) .then(lock.resolve) .catch(err => this._rejectLock(lock, err)); } }); } async lock(path, tid, forWriting = true, comment = '', options = { withPriority: false, noTimeout: false }) { let lock, proceed; if (path instanceof NodeLock) { lock = path; //lock.comment = `(retry: ${lock.comment})`; proceed = true; } else if (this._locks.findIndex((l => l.tid === tid && l.state === exports.LOCK_STATE.EXPIRED)) >= 0) { throw new Error(`lock on tid ${tid} has expired, not allowed to continue`); } else if (this._quit && !options.withPriority) { throw new Error(`Quitting`); } else { DEBUG_MODE && console.error(`${forWriting ? 'write' : 'read'} lock requested on "${path}" by tid ${tid} (${comment})`); // // Test the requested lock path // let duplicateKeys = getPathKeys(path) // .reduce((r, key) => { // let i = r.findIndex(c => c.key === key); // if (i >= 0) { r[i].count++; } // else { r.push({ key, count: 1 }) } // return r; // }, []) // .filter(c => c.count > 1) // .map(c => c.key); // if (duplicateKeys.length > 0) { // console.log(`ALERT: Duplicate keys found in path "/${path}"`.colorize([ColorStyle.dim, ColorStyle.bgRed]); // } lock = new NodeLock(this, path, tid, forWriting, options.withPriority === true); lock.comment = comment; this._locks.push(lock); const check = this._allowLock(path, tid, forWriting); lock.waitingFor = check.conflict || null; proceed = check.allow; } if (proceed) { DEBUG_MODE && console.error(`${lock.forWriting ? 'write' : 'read'} lock ALLOWED on "${lock.path}" by tid ${lock.tid} (${lock.comment})`); lock.state = exports.LOCK_STATE.LOCKED; if (typeof lock.granted === 'number') { //debug.warn(`lock :: ALLOWING ${lock.forWriting ? "write" : "read" } lock on path "/${lock.path}" by tid ${lock.tid}; ${lock.comment}`); } else { lock.granted = Date.now(); if (options.noTimeout !== true) { lock.expires = Date.now() + this.timeout; //debug.warn(`lock :: GRANTED ${lock.forWriting ? "write" : "read" } lock on path "/${lock.path}" by tid ${lock.tid}; ${lock.comment}`); let timeoutCount = 0; const timeoutHandler = () => { // Autorelease timeouts must only fire when there is something wrong in the // executing (AceBase) code, eg an unhandled promise rejection causing a lock not // to be released. To guard against programming errors, we will issue 3 warning // messages before releasing the lock. if (lock.state !== exports.LOCK_STATE.LOCKED) { return; } timeoutCount++; if (timeoutCount <= 3) { // Warn first. this.debug.warn(`${lock.forWriting ? 'write' : 'read'} lock on path "/${lock.path}" by tid ${lock.tid} (${lock.comment}) is taking a long time to complete [${timeoutCount}]`); lock.timeout = setTimeout(timeoutHandler, this.timeout / 4); return; } this.debug.error(`lock :: ${lock.forWriting ? 'write' : 'read'} lock on path "/${lock.path}" by tid ${lock.tid} (${lock.comment}) took too long`); lock.state = exports.LOCK_STATE.EXPIRED; // let allTransactionLocks = _locks.filter(l => l.tid === lock.tid).sort((a,b) => a.requested < b.requested ? -1 : 1); // let transactionsDebug = allTransactionLocks.map(l => `${l.state} ${l.forWriting ? "WRITE" : "read"} ${l.comment}`).join("\n"); // debug.error(transactionsDebug); this._processLockQueue(); }; lock.timeout = setTimeout(timeoutHandler, this.timeout / 4); } } return lock; } else { // Keep pending until clashing lock(s) is/are removed //debug.warn(`lock :: QUEUED ${lock.forWriting ? "write" : "read" } lock on path "/${lock.path}" by tid ${lock.tid}; ${lock.comment}`); (0, assert_1.assert)(lock.state === exports.LOCK_STATE.PENDING); return new Promise((resolve, reject) => { lock.resolve = resolve; lock.reject = reject; }); } } unlock(lockOrId, comment, processQueue = true) { let lock, i; if (lockOrId instanceof NodeLock) { lock = lockOrId; i = this._locks.indexOf(lock); } else { const id = lockOrId; i = this._locks.findIndex(l => l.id === id); lock = this._locks[i]; } if (i < 0) { const msg = `lock on "/${lock.path}" for tid ${lock.tid} wasn't found; ${comment}`; // debug.error(`unlock :: ${msg}`); throw new Error(msg); } lock.state = exports.LOCK_STATE.DONE; clearTimeout(lock.timeout); this._locks.splice(i, 1); DEBUG_MODE && console.error(`${lock.forWriting ? 'write' : 'read'} lock RELEASED on "${lock.path}" by tid ${lock.tid}`); //debug.warn(`unlock :: RELEASED ${lock.forWriting ? "write" : "read" } lock on "/${lock.path}" for tid ${lock.tid}; ${lock.comment}; ${comment}`); processQueue && this._processLockQueue(); return lock; } list() { return this._locks || []; } isAllowed(path, tid, forWriting) { return this._allowLock(path, tid, forWriting).allow; } } exports.NodeLocker = NodeLocker; let lastid = 0; class NodeLock { /** * Constructor for a record lock * @param {NodeLocker} locker * @param {string} path * @param {string} tid * @param {boolean} forWriting * @param {boolean} priority */ constructor(locker, path, tid, forWriting, priority = false) { this.locker = locker; this.path = path; this.tid = tid; this.forWriting = forWriting; this.priority = priority; this.state = exports.LOCK_STATE.PENDING; this.requested = Date.now(); this.comment = ''; this.waitingFor = null; this.id = ++lastid; this.history = []; } static get LOCK_STATE() { return exports.LOCK_STATE; } async release(comment) { //return this.storage.unlock(this.path, this.tid, comment); this.history.push({ action: 'release', path: this.path, forWriting: this.forWriting, comment }); return this.locker.unlock(this, comment || this.comment); } async moveToParent() { const parentPath = acebase_core_1.PathInfo.get(this.path).parentPath; //getPathInfo(this.path).parent; const allowed = this.locker.isAllowed(parentPath, this.tid, this.forWriting); //_allowLock(parentPath, this.tid, this.forWriting); if (allowed) { DEBUG_MODE && console.error(`moveToParent ALLOWED for ${this.forWriting ? 'write' : 'read'} lock on "${this.path}" by tid ${this.tid} (${this.comment})`); this.history.push({ path: this.path, forWriting: this.forWriting, action: 'moving to parent' }); this.waitingFor = null; this.path = parentPath; // this.comment = `moved to parent: ${this.comment}`; return this; } else { // Unlock without processing the queue DEBUG_MODE && console.error(`moveToParent QUEUED for ${this.forWriting ? 'write' : 'read'} lock on "${this.path}" by tid ${this.tid} (${this.comment})`); this.locker.unlock(this, `moveLockToParent: ${this.comment}`, false); // Lock parent node with priority to jump the queue const newLock = await this.locker.lock(parentPath, this.tid, this.forWriting, this.comment, { withPriority: true }); DEBUG_MODE && console.error(`QUEUED moveToParent ALLOWED for ${this.forWriting ? 'write' : 'read'} lock on "${this.path}" by tid ${this.tid} (${this.comment})`); newLock.history = this.history; newLock.history.push({ path: this.path, forWriting: this.forWriting, action: 'moving to parent through queue (priority)' }); return newLock; } } } exports.NodeLock = NodeLock; //# sourceMappingURL=node-lock.js.map