From 01cd330a4c0dae90bfd2e7f09cb8b0effdf1efcc Mon Sep 17 00:00:00 2001 From: Ewout Stortenbeker <4ewout@gmail.com> Date: Wed, 23 Feb 2022 13:56:18 +0100 Subject: [PATCH] - fixed #75 .take issue: now loads extData - fixed 2+ index rebuilds failing - moved unused imports to jsdoc - async .build --- src/data-index.js | 85 +++++++++++++++++++++++++++-------------------- 1 file changed, 49 insertions(+), 36 deletions(-) diff --git a/src/data-index.js b/src/data-index.js index 81809f9..ecc6bf0 100644 --- a/src/data-index.js +++ b/src/data-index.js @@ -1,7 +1,6 @@ 'use strict'; -const { Storage } = require('./storage'); const { Node } = require('./node'); -const { BPlusTreeBuilder, BPlusTree, BinaryBPlusTree, BinaryWriter, BinaryBPlusTreeLeafEntry, BinaryReader, BlacklistingSearchOperator } = require('./btree'); +const { BPlusTreeBuilder, BPlusTree, BinaryBPlusTree, BinaryWriter, BinaryReader, BlacklistingSearchOperator } = require('./btree'); const { PathInfo, Utils, ID, ColorStyle } = require('acebase-core'); const { compareValues, getChildValues, numberToBytes, bytesToNumber, encodeString, decodeString } = Utils; const Geohash = require('./geohash'); @@ -10,6 +9,12 @@ const ThreadSafe = require('./thread-safe'); const unidecode = require('unidecode'); const { getValueType } = require('./node-value-types'); +/** + * @typedef {import('./storage').Storage} Storage + * @typedef {import('./btree').BinaryBPlusTreeLeaf} BinaryBPlusTreeLeaf + * @typedef {import('./btree').BinaryBPlusTreeLeafEntry} BinaryBPlusTreeLeafEntry + */ + const DISK_BLOCK_SIZE = 4096; // use 512 for older disks const FILL_FACTOR = 50; // leave room for inserts @@ -533,6 +538,7 @@ class DataIndex { // const oldEntry = tree.find(keyValues.oldValue); const go = async (retry = 0) => { + const opsCount = operations.length; try { await idx.tree.transaction(operations); // Index updated @@ -543,9 +549,11 @@ class DataIndex { // Could not update index --> leaf full? this.storage.debug.verbose(`Could not update index ${this.description}: ${err.message}`.colorize(ColorStyle.yellow)); - console.assert(retry === 0, `unable to process operations because tree was rebuilt, and it didn't help?!`); + if (retry > 0 && opsCount === operations.length) { + throw new Error(`DEV ERROR: unable to process operations because tree was rebuilt, and that didn't help?!`); + } - await this._rebuild(idx); // rebuild calls idx.close() and release() + await this._rebuild(idx); // rebuild calls idx.close() and .release() // Process left-over operations this.storage.debug.verbose(`Index was rebuilt, retrying pending operations`); @@ -704,16 +712,23 @@ class DataIndex { const results = new IndexQueryResults(); //[]; results.filterKey = this.key; let skipped = 0; - const processLeaf = (leaf) => { + /** + * @param {BinaryBPlusTreeLeaf} leaf + * @returns + */ + const processLeaf = async (leaf) => { if (!ascending) { leaf.entries.reverse(); } for (let i = 0; i < leaf.entries.length; i++) { const entry = leaf.entries[i]; const value = entry.key; - for (let j = 0; j < entry.values.length; j++) { + for (let j = 0; j < entry.totalValues; j++) { //entry.values.length if (skipped < skip) { skipped++; continue; } + if (leaf.hasExtData && !leaf.extData.loaded) { + await leaf.extData.load(); + } const entryValue = entry.values[j]; const recordPointer = _parseRecordPointer(this.path, entryValue.recordPointer); const metadata = entryValue.metadata; @@ -725,10 +740,10 @@ class DataIndex { } } - if (ascending && leaf.getNext) { + if (ascending && leaf.hasNext) { return leaf.getNext().then(processLeaf); } - else if (!ascending && leaf.getPrevious) { + else if (!ascending && leaf.hasPrevious) { return leaf.getPrevious().then(processLeaf); } else { @@ -919,9 +934,9 @@ class DataIndex { * @param {(tree: BPlusTreeBuilder, value: any, recordPointer: number[], metadata?: object, env: { path: string, wildcards: string[], key: string, locale: string }) => void} [options.addCallback] * @param {number[]} [options.valueTypes] */ - build(options) { - if (~[DataIndex.STATE.BUILD, DataIndex.STATE.REBUILD].indexOf(this.state)) { - return Promise.reject(new Error(`Index is already being built`)); + async build(options) { + if ([DataIndex.STATE.BUILD, DataIndex.STATE.REBUILD].includes(this.state)) { + throw new Error(`Index is already being built`); } this.state = this.state === DataIndex.STATE.READY ? DataIndex.STATE.REBUILD // Existing index file has to be overwritten in the last phase @@ -964,31 +979,29 @@ class DataIndex { console.error(err); reject(err); }); - buildWriteStream.on('open', () => { - return getAll('', 0) - .then(() => { - // if (indexedValues === 0) { - // const err = new Error('No values found to index'); - // err.code = 'NO_DATA'; - // buildWriteStream.close(() => { - // pfs.rm(buildFile) - // .then(() => { - // reject(err); - // }) - // return reject(err); - // }); - // return; - // } - this.storage.debug.log(`done writing values to ${buildFile}`); - if (streamState.wait) { - buildWriteStream.once('drain', () => { - buildWriteStream.end(resolve); - }); - } - else { + buildWriteStream.on('open', async () => { + await getAll('', 0) + // if (indexedValues === 0) { + // const err = new Error('No values found to index'); + // err.code = 'NO_DATA'; + // buildWriteStream.close(() => { + // pfs.rm(buildFile) + // .then(() => { + // reject(err); + // }) + // return reject(err); + // }); + // return; + // } + this.storage.debug.log(`done writing values to ${buildFile}`); + if (streamState.wait) { + buildWriteStream.once('drain', () => { buildWriteStream.end(resolve); - } - }); + }); + } + else { + buildWriteStream.end(resolve); + } }); buildWriteStream.on('drain', () => { // Write queued chunks @@ -1017,7 +1030,7 @@ class DataIndex { } } const isWildcardKey = key => key === '*' || key.startsWith('$'); - const getAll = (currentPath, keyIndex) => { + const getAll = async (currentPath, keyIndex) => { // "users/*/posts" // --> Get all children of "users", // --> get their "posts" children,