db-index.js

import { create, load } from 'prolly-trees/db-index'
import { sha256 as hasher } from 'multiformats/hashes/sha2'
import { nocache as cache } from 'prolly-trees/cache'
import { bf, simpleCompare } from 'prolly-trees/utils'
import { makeGetBlock } from './prolly.js'
import { cidsToProof } from './fireproof.js'
import * as codec from '@ipld/dag-cbor'
// import { create as createBlock } from 'multiformats/block'
import { doTransaction } from './blockstore.js'
import charwise from 'charwise'

const ALWAYS_REBUILD = true // todo: remove this

const arrayCompare = (a, b) => {
  if (Array.isArray(a) && Array.isArray(b)) {
    const len = Math.min(a.length, b.length)
    for (let i = 0; i < len; i++) {
      const comp = simpleCompare(a[i], b[i])
      if (comp !== 0) {
        return comp
      }
    }
    return simpleCompare(a.length, b.length)
  } else {
    return simpleCompare(a, b)
  }
}

const opts = { cache, chunker: bf(3), codec, hasher, compare: arrayCompare }

const makeDoc = ({ key, value }) => ({ _id: key, ...value })

/**
 * JDoc for the result row type.
 * @typedef {Object} ChangeEvent
 * @property {string} key - The key of the document.
 * @property {Object} value - The new value of the document.
 * @property {boolean} [del] - Is the row deleted?
 * @memberof DbIndex
 */

/**
 * JDoc for the result row type.
 * @typedef {Object} DbIndexEntry
 * @property {string[]} key - The key for the DbIndex entry.
 * @property {Object} value - The value of the document.
 * @property {boolean} [del] - Is the row deleted?
 * @memberof DbIndex
 */

/**
 * Transforms a set of changes to DbIndex entries using a map function.
 *
 * @param {ChangeEvent[]} changes
 * @param {Function} mapFun
 * @returns {DbIndexEntry[]} The DbIndex entries generated by the map function.
 * @private
 * @memberof DbIndex
 */
const indexEntriesForChanges = (changes, mapFun) => {
  const indexEntries = []
  changes.forEach(({ key, value, del }) => {
    if (del || !value) return
    mapFun(makeDoc({ key, value }), (k, v) => {
      indexEntries.push({
        key: [charwise.encode(k), key],
        value: v
      })
    })
  })
  return indexEntries
}

const indexEntriesForOldChanges = async (blocks, byIDindexRoot, ids, mapFun) => {
  const { getBlock } = makeGetBlock(blocks)
  const byIDindex = await load({ cid: byIDindexRoot.cid, get: getBlock, ...opts })

  const result = await byIDindex.getMany(ids)
  return result
}

/**
 * Represents an DbIndex for a Fireproof database.
 *
 * @class DbIndex
 * @classdesc An DbIndex can be used to order and filter the documents in a Fireproof database.
 *
 * @param {Fireproof} database - The Fireproof database instance to DbIndex.
 * @param {Function} mapFun - The map function to apply to each entry in the database.
 *
 */
export default class DbIndex {
  constructor (database, mapFun) {
    /**
     * The database instance to DbIndex.
     * @type {Fireproof}
     */
    this.database = database
    /**
     * The map function to apply to each entry in the database.
     * @type {Function}
     */
    this.mapFun = mapFun

    this.dbIndexRoot = null
    this.dbIndex = null

    this.byIDindexRoot = null
    this.dbHead = null
  }

  /**
   * JSDoc for Query type.
   * @typedef {Object} DbQuery
   * @property {string[]} [range] - The range to query.
   * @memberof DbIndex
   */

  /**
   * Query object can have {range}
   * @param {DbQuery} query - the query range to use
   * @returns {Promise<{rows: Array<{id: string, key: string, value: any}>}>}
   * @memberof DbIndex
   * @instance
   */
  async query (query) {
    // if (!root) {
    // pass a root to query a snapshot
    await doTransaction('#updateIndex', this.database.blocks, async (blocks) => {
      await this.#updateIndex(blocks)
    })
    // }
    const response = await doIndexQuery(this.database.blocks, this.dbIndexRoot, this.dbIndex, query)
    return {
      proof: { index: await cidsToProof(response.cids) },
      // TODO fix this naming upstream in prolly/db-DbIndex?
      rows: response.result.map(({ id, key, row }) => ({ id: key, key: charwise.decode(id), value: row }))
    }
  }

  /**
   * Update the DbIndex with the latest changes
   * @private
   * @returns {Promise<void>}
   */
  async #updateIndex (blocks) {
    // todo remove this hack
    if (ALWAYS_REBUILD) {
      this.dbHead = null // hack
      this.dbIndex = null // hack
      this.dbIndexRoot = null
    }
    const result = await this.database.changesSince(this.dbHead) // {key, value, del}
    if (this.dbHead) {
      const oldChangeEntries = await indexEntriesForOldChanges(
        blocks,
        this.byIDindexRoot,
        result.rows.map(({ key }) => key),
        this.mapFun
      )
      const oldIndexEntries = oldChangeEntries.result.map((key) => ({ key, del: true }))
      const removalResult = await bulkIndex(blocks, this.dbIndexRoot, this.dbIndex, oldIndexEntries, opts)
      this.dbIndexRoot = removalResult.root
      this.dbIndex = removalResult.dbIndex

      const removeByIdIndexEntries = oldIndexEntries.map(({ key }) => ({ key: key[1], del: true }))
      const purgedRemovalResults = await bulkIndex(
        blocks,
        this.byIDindexRoot,
        this.byIDIndex,
        removeByIdIndexEntries,
        opts
      )
      this.byIDindexRoot = purgedRemovalResults.root
      this.byIDIndex = purgedRemovalResults.dbIndex
    }
    const indexEntries = indexEntriesForChanges(result.rows, this.mapFun)
    const byIdIndexEntries = indexEntries.map(({ key }) => ({ key: key[1], value: key }))
    const addFutureRemovalsResult = await bulkIndex(blocks, this.byIDindexRoot, this.byIDIndex, byIdIndexEntries, opts)
    this.byIDindexRoot = addFutureRemovalsResult.root
    this.byIDIndex = addFutureRemovalsResult.dbIndex

    // console.log('indexEntries', indexEntries)

    const updateIndexResult = await bulkIndex(blocks, this.dbIndexRoot, this.dbIndex, indexEntries, opts)
    this.dbIndexRoot = updateIndexResult.root
    this.dbIndex = updateIndexResult.dbIndex

    this.dbHead = result.clock
  }
}

/**
 * Update the DbIndex with the given entries
 * @param {Blockstore} blocks
 * @param {Block} inRoot
 * @param {DbIndexEntry[]} indexEntries
 * @private
 */
async function bulkIndex (blocks, inRoot, inDBindex, indexEntries) {
  if (!indexEntries.length) return { dbIndex: inDBindex, root: inRoot }
  const putBlock = blocks.put.bind(blocks)
  const { getBlock } = makeGetBlock(blocks)
  let returnRootBlock
  let returnNode
  if (!inDBindex) {
    for await (const node of await create({ get: getBlock, list: indexEntries, ...opts })) {
      const block = await node.block
      await putBlock(block.cid, block.bytes)
      returnRootBlock = block
      returnNode = node
    }
  } else {
    // const dbIndex = await load({ cid: inRoot.cid, get: getBlock, ...opts }) // todo load from root on refresh
    const { root, blocks } = await inDBindex.bulk(indexEntries)
    returnRootBlock = await root.block
    returnNode = root
    for await (const block of blocks) {
      await putBlock(block.cid, block.bytes)
    }
    await putBlock(returnRootBlock.cid, returnRootBlock.bytes)
  }
  return { dbIndex: returnNode, root: returnRootBlock }
}

async function doIndexQuery (blocks, dbIndexRoot, dbIndex, query) {
  if (!dbIndex) {
    const cid = dbIndexRoot && dbIndexRoot.cid
    if (!cid) return { result: [] }
    const { getBlock } = makeGetBlock(blocks)
    dbIndex = await load({ cid, get: getBlock, ...opts })
  }
  if (query.range) {
    const encodedRange = query.range.map((key) => charwise.encode(key))
    return dbIndex.range(...encodedRange)
  } else if (query.key) {
    const encodedKey = charwise.encode(query.key)
    return dbIndex.get(encodedKey)
  }
}