- fixed #75 .take issue: now loads extData

- fixed 2+ index rebuilds failing
- moved unused imports to jsdoc
- async .build
This commit is contained in:
Ewout Stortenbeker 2022-02-23 13:56:18 +01:00
parent 1851fb42fb
commit 01cd330a4c

View file

@ -1,7 +1,6 @@
'use strict';
const { Storage } = require('./storage');
const { Node } = require('./node');
const { BPlusTreeBuilder, BPlusTree, BinaryBPlusTree, BinaryWriter, BinaryBPlusTreeLeafEntry, BinaryReader, BlacklistingSearchOperator } = require('./btree');
const { BPlusTreeBuilder, BPlusTree, BinaryBPlusTree, BinaryWriter, BinaryReader, BlacklistingSearchOperator } = require('./btree');
const { PathInfo, Utils, ID, ColorStyle } = require('acebase-core');
const { compareValues, getChildValues, numberToBytes, bytesToNumber, encodeString, decodeString } = Utils;
const Geohash = require('./geohash');
@ -10,6 +9,12 @@ const ThreadSafe = require('./thread-safe');
const unidecode = require('unidecode');
const { getValueType } = require('./node-value-types');
/**
* @typedef {import('./storage').Storage} Storage
* @typedef {import('./btree').BinaryBPlusTreeLeaf} BinaryBPlusTreeLeaf
* @typedef {import('./btree').BinaryBPlusTreeLeafEntry} BinaryBPlusTreeLeafEntry
*/
const DISK_BLOCK_SIZE = 4096; // use 512 for older disks
const FILL_FACTOR = 50; // leave room for inserts
@ -533,6 +538,7 @@ class DataIndex {
// const oldEntry = tree.find(keyValues.oldValue);
const go = async (retry = 0) => {
const opsCount = operations.length;
try {
await idx.tree.transaction(operations);
// Index updated
@ -543,9 +549,11 @@ class DataIndex {
// Could not update index --> leaf full?
this.storage.debug.verbose(`Could not update index ${this.description}: ${err.message}`.colorize(ColorStyle.yellow));
console.assert(retry === 0, `unable to process operations because tree was rebuilt, and it didn't help?!`);
if (retry > 0 && opsCount === operations.length) {
throw new Error(`DEV ERROR: unable to process operations because tree was rebuilt, and that didn't help?!`);
}
await this._rebuild(idx); // rebuild calls idx.close() and release()
await this._rebuild(idx); // rebuild calls idx.close() and .release()
// Process left-over operations
this.storage.debug.verbose(`Index was rebuilt, retrying pending operations`);
@ -704,16 +712,23 @@ class DataIndex {
const results = new IndexQueryResults(); //[];
results.filterKey = this.key;
let skipped = 0;
const processLeaf = (leaf) => {
/**
* @param {BinaryBPlusTreeLeaf} leaf
* @returns
*/
const processLeaf = async (leaf) => {
if (!ascending) { leaf.entries.reverse(); }
for (let i = 0; i < leaf.entries.length; i++) {
const entry = leaf.entries[i];
const value = entry.key;
for (let j = 0; j < entry.values.length; j++) {
for (let j = 0; j < entry.totalValues; j++) { //entry.values.length
if (skipped < skip) {
skipped++;
continue;
}
if (leaf.hasExtData && !leaf.extData.loaded) {
await leaf.extData.load();
}
const entryValue = entry.values[j];
const recordPointer = _parseRecordPointer(this.path, entryValue.recordPointer);
const metadata = entryValue.metadata;
@ -725,10 +740,10 @@ class DataIndex {
}
}
if (ascending && leaf.getNext) {
if (ascending && leaf.hasNext) {
return leaf.getNext().then(processLeaf);
}
else if (!ascending && leaf.getPrevious) {
else if (!ascending && leaf.hasPrevious) {
return leaf.getPrevious().then(processLeaf);
}
else {
@ -919,9 +934,9 @@ class DataIndex {
* @param {(tree: BPlusTreeBuilder, value: any, recordPointer: number[], metadata?: object, env: { path: string, wildcards: string[], key: string, locale: string }) => void} [options.addCallback]
* @param {number[]} [options.valueTypes]
*/
build(options) {
if (~[DataIndex.STATE.BUILD, DataIndex.STATE.REBUILD].indexOf(this.state)) {
return Promise.reject(new Error(`Index is already being built`));
async build(options) {
if ([DataIndex.STATE.BUILD, DataIndex.STATE.REBUILD].includes(this.state)) {
throw new Error(`Index is already being built`);
}
this.state = this.state === DataIndex.STATE.READY
? DataIndex.STATE.REBUILD // Existing index file has to be overwritten in the last phase
@ -964,31 +979,29 @@ class DataIndex {
console.error(err);
reject(err);
});
buildWriteStream.on('open', () => {
return getAll('', 0)
.then(() => {
// if (indexedValues === 0) {
// const err = new Error('No values found to index');
// err.code = 'NO_DATA';
// buildWriteStream.close(() => {
// pfs.rm(buildFile)
// .then(() => {
// reject(err);
// })
// return reject(err);
// });
// return;
// }
this.storage.debug.log(`done writing values to ${buildFile}`);
if (streamState.wait) {
buildWriteStream.once('drain', () => {
buildWriteStream.end(resolve);
});
}
else {
buildWriteStream.on('open', async () => {
await getAll('', 0)
// if (indexedValues === 0) {
// const err = new Error('No values found to index');
// err.code = 'NO_DATA';
// buildWriteStream.close(() => {
// pfs.rm(buildFile)
// .then(() => {
// reject(err);
// })
// return reject(err);
// });
// return;
// }
this.storage.debug.log(`done writing values to ${buildFile}`);
if (streamState.wait) {
buildWriteStream.once('drain', () => {
buildWriteStream.end(resolve);
}
});
});
}
else {
buildWriteStream.end(resolve);
}
});
buildWriteStream.on('drain', () => {
// Write queued chunks
@ -1017,7 +1030,7 @@ class DataIndex {
}
}
const isWildcardKey = key => key === '*' || key.startsWith('$');
const getAll = (currentPath, keyIndex) => {
const getAll = async (currentPath, keyIndex) => {
// "users/*/posts"
// --> Get all children of "users",
// --> get their "posts" children,