Added support for wildcards and variables in path event subscriptions

This commit is contained in:
Ewout Stortenbeker 2018-12-21 14:41:39 +01:00
parent 1219fa370e
commit 60f0bd8b2e
2 changed files with 256 additions and 101 deletions

View file

@ -125,7 +125,7 @@ class NodeAllocation {
}
toString() {
this.normalize();
// this.normalize();
return this.ranges.map(range => {
return `${range.pageNr},${range.recordNr}+${range.length-1}`;
})
@ -280,6 +280,14 @@ class NodeLock {
this.waitingFor = null;
}
static list() {
return this._locks || [];
}
static isAllowed(path, tid, forWriting) {
return _allowLock(path, tid, forWriting).allow;
}
release(comment) {
//return this.storage.unlock(this.path, this.tid, comment);
return NodeLock.unlock(this, comment || this.comment);
@ -300,7 +308,7 @@ class NodeLock {
proceed = true;
}
else if (_locks.findIndex((l => l.tid === tid && l.state === NodeLock.LOCK_STATE.EXPIRED)) >= 0) {
return Promise.reject(`lock on tid ${tid} has expired, not allowed to continue`);
return Promise.reject(new Error(`lock on tid ${tid} has expired, not allowed to continue`));
}
else {
@ -1655,6 +1663,136 @@ class Node {
return Node.getInfo(storage, path, options);
}
// /**
// * Updates or overwrite an existing node, or creates a new node. Handles storing of subnodes,
// * freeing old node and subnodes allocation, updating/creation of parent nodes, and removing
// * old cache entries. Triggers event notifications and index updates after the update succeeds.
// * @param {Storage} storage
// * @param {string} path
// * @param {any} value Any value will do. If the value is small enough to be stored in a parent record, it will take care of it
// * @param {{ merge?: boolean, tid?: string }} options
// */
// static update(storage, path, value, options = { merge: true, tid: undefined, _internal: false }) {
// const tid = options.tid;// || ID.generate();
// const pathInfo = PathInfo.get(path); // getPathInfo(path);
// if (value === null) {
// // Deletion of node is requested. Update parent
// return Node.update(storage, pathInfo.parentPath, { [pathInfo.key]: null }, { merge: true, tid });
// }
// if (path !== "" && _valueFitsInline(storage, value)) {
// // Simple value, update parent instead
// return Node.update(storage, pathInfo.parentPath, { [pathInfo.key]: value }, { merge: true, tid });
// }
// if (typeof value !== 'object' && options.merge) {
// // Make sure merge option is swtiched off if value is not an object
// options.merge = false;
// }
// if (!this._updateSync) { this._updateSync = new Map(); }
// let storageUpdates = this._updateSync.get(storage.name);
// if (!storageUpdates) {
// storageUpdates = { storage, pathUpdates: new Map() };
// this._updateSync.set(storage.name, storageUpdates);
// }
// let pathUpdates = storageUpdates.pathUpdates.get(path);
// if (!pathUpdates) {
// pathUpdates = { postponed: 0, updates: [] };
// storageUpdates.pathUpdates.set(path, pathUpdates);
// }
// else if (pathUpdates.timeout) { // && pathUpdates.postponed < 5
// clearTimeout(pathUpdates.timeout);
// delete pathUpdates.timeout;
// pathUpdates.postponed++;
// }
// const merge = () => {
// storageUpdates.pathUpdates.delete(path);
// let mergedValue = {};
// let internal = true;
// let merge = true;
// for(let i = 0; i < pathUpdates.updates.length; i++) {
// let update = pathUpdates.updates[i];
// if (!update.options.merge) {
// merge = false;
// mergedValue = update.value;
// }
// else {
// Object.keys(update.value).forEach(key => {
// mergedValue[key] = update.value[key];
// });
// }
// internal = internal && update.options._internal;
// }
// const go = (tid) => {
// pathUpdates.updates.length > 1 && console.log(`Processing ${pathUpdates.updates.length} merged updates on node "/${path}"`);
// let mergedOptions = { _internal: internal, merge, tid };
// return this._update(storage, path, mergedValue, mergedOptions)
// .then(() => {
// for(let i = 0; i < pathUpdates.updates.length; i++) {
// let update = pathUpdates.updates[i];
// update.resolve();
// }
// });
// }
// if (pathUpdates.lockPromise) {
// pathUpdates.lockPromise.then(lock => {
// go(lock.tid).then(() => lock.release());
// });
// }
// else {
// const findBestTid = () => {
// const tids = pathUpdates.updates.map(update => update.options && update.options.tid).filter(tid => tid);
// if (tids.length === 0) { return; }
// const locks = NodeLock.list().filter(lock => tids.findIndex(tid => tid === lock.tid) >= 0);
// if (locks.length > 0) {
// const exact = locks.find(lock => lock.path === path && lock.forWriting && lock.state === NodeLock.LOCK_STATE.LOCKED);
// if (exact) { return exact.tid; }
// const best = locks.find(lock => NodeLock.isAllowed(path, lock.tid, true));
// if (best) { return best.tid; }
// }
// const potential = tids.find(tid => NodeLock.isAllowed(path, tid, true));
// if (potential) { return potential; }
// return tids[0];
// }
// go(findBestTid() || ID.generate());
// }
// };
// if (pathUpdates.postponed === 10) {
// // Do it now and start afresh
// merge();
// pathUpdates = { postponed: 0, updates: [] };
// storageUpdates.pathUpdates.set(path, pathUpdates);
// }
// const update = {
// value,
// options,
// resolve: undefined,
// reject: undefined,
// };
// const promise = new Promise((res, rej) => {
// update.resolve = res;
// update.reject = rej;
// });
// pathUpdates.last = Date.now();
// pathUpdates.updates.push(update);
// if (!pathUpdates.lockPromise && tid && NodeLock.isAllowed(path, tid, true)) {
// pathUpdates.lockPromise = NodeLock.lock(path, tid, true);
// }
// // process.nextTick(merge);
// pathUpdates.timeout = setTimeout(merge, 0);
// return promise;
// }
/**
* Updates or overwrite an existing node, or creates a new node. Handles storing of subnodes,
* freeing old node and subnodes allocation, updating/creation of parent nodes, and removing
@ -1669,8 +1807,7 @@ class Node {
// debug.log(`Update request for node "/${path}"`);
const tid = options.tid || ID.generate();
const pathInfo = PathInfo.get(path); // getPathInfo(path);
// const lockPath = pathInfo.parent || path;
const pathInfo = PathInfo.get(path);
if (value === null) {
// Deletion of node is requested. Update parent
@ -1682,36 +1819,37 @@ class Node {
return Node.update(storage, pathInfo.parentPath, { [pathInfo.key]: value }, { merge: true, tid });
}
let eventSubscriptions = options._internal
? []
: storage.subscriptions.getValueSubscribersForPath(path);
let topEventPath = path;
let hasValueSubscribers = false;
if (eventSubscriptions.length > 0) {
let eventPaths = eventSubscriptions
.map(sub => { return { path: sub.dataPath, keys: PathInfo.getPathKeys(sub.dataPath) }; })
.sort((a,b) => {
if (a.keys.length < b.keys.length) return -1;
else if (a.keys.length > b.keys.length) return 1;
return 0;
});
let first = eventPaths[0];
topEventPath = first.path;
hasValueSubscribers = eventSubscriptions.length > 0;
// Now get all subscriptions that should execute on the data (includes events on child nodes as well)
let eventSubscriptions = [];
if (!options._internal) {
// Get all subscriptions that should execute on the data (includes events on child nodes as well)
eventSubscriptions = storage.subscriptions.getAllSubscribersForPath(path);
// Get all subscriptions for data on this or ancestor nodes, determines what data to load before processing
const valueSubscribers = storage.subscriptions.getValueSubscribersForPath(path);
if (valueSubscribers.length > 0) {
hasValueSubscribers = true;
let eventPaths = valueSubscribers
.map(sub => { return { path: sub.dataPath, keys: PathInfo.getPathKeys(sub.dataPath) }; })
.sort((a,b) => {
if (a.keys.length < b.keys.length) return -1;
else if (a.keys.length > b.keys.length) return 1;
return 0;
});
let first = eventPaths[0];
topEventPath = first.path;
}
}
/** @type {NodeLock} */
let eventDataLock;
/** @type {NodeLock} */
let lock;
let topEventData;
return NodeLock.lock(topEventPath, tid, true, `Node.update (get topEventPath "/${topEventPath}")`)
.then(l => {
lock = l; //eventDataLock = l;
lock = l;
return Node.getInfo(storage, topEventPath, { tid });
})
.then(eventNodeInfo => {
@ -1769,7 +1907,14 @@ class Node {
return Node.update(storage, pathInfo.parentPath, { [pathInfo.key]: new InternalNodeReference(recordInfo.valueType, recordInfo.address) }, { merge: true, tid: lock.tid, _internal: true });
})
.then(() => true);
// parentUpdatePromise = Node.update(storage, pathInfo.parentPath, { [pathInfo.key]: new InternalNodeReference(recordInfo.valueType, recordInfo.address) }, { merge: true, tid: lock.tid, _internal: true })
// .then(() => true);
// NodeLock.unlock(lock, 'Node._update: update parent', false);
}
// else {
// lock && lock.release();
// }
return parentUpdatePromise
.then(parentUpdated => {
@ -1780,6 +1925,7 @@ class Node {
if (deallocate && deallocate.totalAddresses > 0) {
// Release record allocation marked for deallocation
deallocate.normalize();
debug.log(`Releasing ${deallocate.totalAddresses} addresses (${deallocate.ranges.length} ranges) previously used by node "/${path}" and/or descendants: ${deallocate}`.gray);
// // TEMP check, remove loop when all is good:
@ -1949,7 +2095,7 @@ class Node {
});
});
const callSubscriberWithValues = (sub, oldValue, newValue, wildcardKey = undefined) => {
const callSubscriberWithValues = (sub, oldValue, newValue, variables = []) => {
let trigger = true;
let type = sub.type;
if (type.startsWith('notify_')) {
@ -1968,12 +2114,18 @@ class Node {
else if (type === "child_removed") {
trigger = oldValue !== null && newValue === null;
}
// let dataPath = sub.dataPath;
// if (dataPath.endsWith('/*')) {
// dataPath = dataPath.substr(0, dataPath.length-1);
// dataPath += wildcardKey;
// }
let dataPath = sub.dataPath;
if (dataPath.endsWith('/*')) {
dataPath = dataPath.substr(0, dataPath.length-1);
dataPath += wildcardKey;
}
trigger && storage.subscriptions.trigger(sub.type, sub.path, dataPath, oldValue, newValue);
variables.forEach((variable, index) => {
// only replaces first occurrence (so multiple *'s will be processed 1 by 1)
const safeVarName = variable.name === '*' ? '\\*' : variable.name.replace('$', '\\$');
dataPath = dataPath.replace(new RegExp(`(^|/)${safeVarName}([/\[]|$)`), `$1${variable.value}$2`);
})
trigger && storage.subscriptions.trigger(sub.type, sub.subscriptionPath, dataPath, oldValue, newValue);
};
// Now... trigger all events
@ -1988,42 +2140,42 @@ class Node {
return 0;
})
.forEach(sub => {
let trailPath = sub.dataPath.slice(topEventPath.length).replace(/^\//, '');
let trailKeys = PathInfo.getPathKeys(trailPath);
let oldValue = topEventData;
let newValue = newTopEventData;
while (trailKeys.length > 0) {
let subKey = trailKeys.shift();
if (subKey === '*') {
// Fire on all relevant child keys (compare!)
let allKeys = oldValue === null ? [] : Object.keys(oldValue);
newValue !== null && Object.keys(newValue).forEach(key => {
if (allKeys.indexOf(key) < 0) {
allKeys.push(key);
}
});
allKeys.forEach(key => {
// let val1 = oldValue === null ? null : oldValue[key];
// if (typeof val1 === 'undefined') { val1 = null; }
// let val2 = newValue === null ? null : newValue[key];
// if (typeof val2 === 'undefined') { val2 = null; }
let childValues = getChildValues(key, oldValue, newValue);
callSubscriberWithValues(sub, childValues.oldValue, childValues.newValue, key);
})
return;
const process = (currentPath, oldValue, newValue, variables = []) => {
let trailPath = sub.dataPath.slice(currentPath.length).replace(/^\//, '');
let trailKeys = PathInfo.getPathKeys(trailPath);
while (trailKeys.length > 0) {
let subKey = trailKeys.shift();
if (typeof subKey === 'string' && (subKey === '*' || subKey[0] === '$')) {
// Fire on all relevant child keys
let allKeys = oldValue === null ? [] : Object.keys(oldValue);
newValue !== null && Object.keys(newValue).forEach(key => {
if (allKeys.indexOf(key) < 0) {
allKeys.push(key);
}
});
allKeys.forEach(key => {
const childValues = getChildValues(key, oldValue, newValue);
const vars = variables.concat({ name: subKey, value: key });
if (trailKeys.length === 0) {
callSubscriberWithValues(sub, childValues.oldValue, childValues.newValue, vars);
}
else {
process(`${currentPath}/${subKey}`, childValues.oldValue, childValues.newValue, vars);
}
});
return; // We can stop processing
}
else {
currentPath = PathInfo.getChildPath(currentPath, subKey);
let childValues = getChildValues(subKey, oldValue, newValue);
oldValue = childValues.oldValue;
newValue = childValues.newValue;
}
}
else {
// oldValue = oldValue === null ? null : oldValue[subKey];
// if (typeof oldValue === 'undefined') { oldValue = null; }
// newValue = newValue === null ? null : newValue[subKey];
// if (typeof newValue === 'undefined') { newValue = null; }
let childValues = getChildValues(subKey, oldValue, newValue);
oldValue = childValues.oldValue;
newValue = childValues.newValue;
}
}
callSubscriberWithValues(sub, oldValue, newValue);
//console.warn(`Should trigger "${sub.type}" event on node "/${sub.dataPath}" with data: `, newValue);
callSubscriberWithValues(sub, oldValue, newValue, variables);
};
process(topEventPath, topEventData, newTopEventData);
});
}
@ -2034,7 +2186,6 @@ class Node {
})
.catch(err => {
debug.error(`Node.update ERROR: `, err);
eventDataLock && eventDataLock.release(`Node.update: error`);
lock && lock.release(`Node.update: error`);
return false;
});
@ -2732,10 +2883,10 @@ function _mergeNode(storage, nodeInfo, updates, lock) {
}
});
let operations = [];
let tree = nodeReader.getChildTree();
updatePromise = Promise.all(childPromises)
.then(() => {
let operations = [];
changes.deletes.forEach(change => {
let oldValue = change.oldValue;
if (oldValue instanceof NodeAddress) {
@ -2781,17 +2932,19 @@ function _mergeNode(storage, nodeInfo, updates, lock) {
return tree.toTreeBuilder(fillFactor)
.then(builder => {
// Reprocess the changes
changes.deletes.forEach(change => {
builder.remove(change.keyOrIndex, change.oldValue);
});
changes.updates.forEach(change => {
builder.remove(change.keyOrIndex, change.oldValue);
builder.add(change.keyOrIndex, change.newValue);
});
changes.inserts.forEach(change => {
builder.add(change.keyOrIndex, change.newValue);
});
// Process left-over operations:
operations.forEach(op => {
if (op.type === 'add') {
builder.add(op.key, op.recordPointer, op.metadata);
}
else if (op.type === 'update') {
builder.remove(op.key, op.recordPointer);
builder.add(op.key, op.recordPointer, op.metadata);
}
else if (op.type === 'remove') {
builder.remove(op.key, op.recordPointer);
}
});
const bytes = [];
return builder.create().toBinary(true, BinaryWriter.forArray(bytes))

View file

@ -1151,27 +1151,28 @@ class Storage extends EventEmitter {
// - "child_added", "child_removed" events on the parent path
// - "child_changed" events on the parent path and its ancestors
// - ALL events on child/descendant paths
const pathInfo = PathInfo.get(path);
const pathInfo = new PathInfo(path);
const valueSubscribers = [];
Object.keys(_subs).forEach(subscriptionPath => {
if (path === subscriptionPath || pathInfo.isDescendantOf(subscriptionPath)) {
if (pathInfo.equals(subscriptionPath) || pathInfo.isDescendantOf(subscriptionPath)) {
let pathSubs = _subs[subscriptionPath];
const eventPath = PathInfo.fillVariables(subscriptionPath, path);
pathSubs.forEach(sub => {
let dataPath;
if (sub.type === "value" || sub.type === "notify_value") {
dataPath = subscriptionPath;
dataPath = eventPath;
}
else if ((sub.type === "child_changed" || sub.type === "notify_child_changed") && path !== subscriptionPath) {
let childKey = PathInfo.getPathKeys(path.slice(subscriptionPath.length).replace(/^\//, ''))[0];
dataPath = PathInfo.getChildPath(subscriptionPath, childKey); //NodePath(subscriptionPath).childPath(childKey);
else if ((sub.type === "child_changed" || sub.type === "notify_child_changed") && path !== eventPath) {
let childKey = PathInfo.getPathKeys(path.slice(eventPath.length).replace(/^\//, ''))[0];
dataPath = PathInfo.getChildPath(eventPath, childKey);
}
else if (~["child_added", "child_removed", "notify_child_added", "notify_child_removed"].indexOf(sub.type) && pathInfo.isChildOf(subscriptionPath)) {
let childKey = PathInfo.getPathKeys(path.slice(subscriptionPath.length).replace(/^\//, ''))[0];
dataPath = PathInfo.getChildPath(subscriptionPath, childKey) //NodePath(subscriptionPath).childPath(childKey);
else if (~["child_added", "child_removed", "notify_child_added", "notify_child_removed"].indexOf(sub.type) && pathInfo.isChildOf(eventPath)) {
let childKey = PathInfo.getPathKeys(path.slice(eventPath.length).replace(/^\//, ''))[0];
dataPath = PathInfo.getChildPath(eventPath, childKey);
}
if (dataPath && valueSubscribers.findIndex(s => s.type === sub.type && s.path === subscriptionPath) < 0) {
valueSubscribers.push({ type: sub.type, path: subscriptionPath, dataPath });
if (dataPath && valueSubscribers.findIndex(s => s.type === sub.type && s.path === eventPath) < 0) {
valueSubscribers.push({ type: sub.type, eventPath, dataPath, subscriptionPath });
}
});
}
@ -1187,37 +1188,38 @@ class Storage extends EventEmitter {
const pathInfo = PathInfo.get(path);
const subscribers = [];
Object.keys(_subs).forEach(subscriptionPath => {
if (path === subscriptionPath
if (pathInfo.equals(subscriptionPath) //path === subscriptionPath
|| pathInfo.isDescendantOf(subscriptionPath)
|| pathInfo.isAncestorOf(subscriptionPath)
) {
let pathSubs = _subs[subscriptionPath];
const eventPath = PathInfo.fillVariables(subscriptionPath, path);
pathSubs.forEach(sub => {
let dataPath = null;
if (sub.type === "value" || sub.type === "notify_value") {
dataPath = subscriptionPath;
dataPath = eventPath;
}
else if (sub.type === "child_changed" || sub.type === "notify_child_changed") {
let childKey = path === subscriptionPath || pathInfo.isAncestorOf(subscriptionPath)
let childKey = path === eventPath || pathInfo.isAncestorOf(eventPath)
? "*"
: PathInfo.getPathKeys(path.slice(subscriptionPath.length).replace(/^\//, ''))[0];
dataPath = PathInfo.getChildPath(subscriptionPath, childKey); //NodePath(subscriptionPath).childPath(childKey);
: PathInfo.getPathKeys(path.slice(eventPath.length).replace(/^\//, ''))[0];
dataPath = PathInfo.getChildPath(eventPath, childKey);
}
else if (
~["child_added", "child_removed", "notify_child_added", "notify_child_removed"].indexOf(sub.type)
&& (
pathInfo.isChildOf(subscriptionPath)
|| path === subscriptionPath
|| pathInfo.isAncestorOf(subscriptionPath)
pathInfo.isChildOf(eventPath)
|| path === eventPath
|| pathInfo.isAncestorOf(eventPath)
)
) {
let childKey = path === subscriptionPath || pathInfo.isAncestorOf(subscriptionPath)
let childKey = path === eventPath || pathInfo.isAncestorOf(eventPath)
? "*"
: PathInfo.getPathKeys(path.slice(subscriptionPath.length).replace(/^\//, ''))[0];
dataPath = PathInfo.getChildPath(subscriptionPath, childKey); //NodePath(subscriptionPath).childPath(childKey);
: PathInfo.getPathKeys(path.slice(eventPath.length).replace(/^\//, ''))[0];
dataPath = PathInfo.getChildPath(eventPath, childKey); //NodePath(subscriptionPath).childPath(childKey);
}
if (dataPath && subscribers.findIndex(s => s.type === sub.type && s.path === subscriptionPath) < 0) {
subscribers.push({ type: sub.type, path: subscriptionPath, dataPath });
if (dataPath && subscribers.findIndex(s => s.type === sub.type && s.path === eventPath) < 0) {
subscribers.push({ type: sub.type, eventPath, dataPath, subscriptionPath });
}
});
}