Skip to content

Pool Utility Functions - Database & Blockchain Integration

Overview

These utility functions handle the critical task of managing pool state between the database, blockchain, and in-memory processing. They ensure data consistency and handle initialization of new pools.

Core Imports & Dependencies

// Subsquid database and processor types
import { Store } from '@subsquid/typeorm-store';
import { PoolConfig, PoolState, Token, PoolProcessState, ActiveBalances } from '../model';
import { BlockData, DataHandlerContext } from '@subsquid/evm-processor';
import { ActiveBalance, ProtocolConfig } from '@absinthe/common';
 
// Contract ABIs for blockchain calls
import * as univ2Abi from '../abi/univ2';
import * as univ2LpAbi from '../abi/univ2LP';
import { jsonToMap } from '@absinthe/common';
Key Dependencies:
  • TypeORM Store: Database operations with strong typing
  • Model Entities: PoolConfig, PoolState, Token database entities
  • Contract ABIs: For making on-chain calls to get current state
  • Subsquid Types: Block data and processor context

🔄 State Update Functions

updatePoolStateFromOnChain()

Updates an existing pool's state with fresh blockchain data.

export async function updatePoolStateFromOnChain(
  ctx: DataHandlerContext<Store>,
  block: BlockData,
  contractAddress: string,
  poolConfig: PoolConfig,
  existingPoolState: PoolState,
): Promise<PoolState> {
  // Validate pool config exists
  if (!poolConfig.id || !poolConfig.lpToken || !poolConfig.token0 || !poolConfig.token1)
    throw new Error('Pool config not found');
 
  // Create contract instance for on-chain calls
  const contract = new univ2Abi.Contract(ctx, block.header, contractAddress);
 
  // Fetch current reserves and total supply from blockchain
  const reserve = await contract.getReserves();
  const totalSupply = await contract.totalSupply();
  const r0 = reserve._reserve0;
  const r1 = reserve._reserve1;
 
  // Update the existing pool state (mutates in place)
  existingPoolState.reserve0 = r0;
  existingPoolState.reserve1 = r1;
  existingPoolState.totalSupply = totalSupply;
  existingPoolState.lastBlock = block.header.height;
  existingPoolState.lastTsMs = BigInt(block.header.timestamp);
  existingPoolState.updatedAt = new Date();
 
  return existingPoolState;
}
Purpose:
  • Real-time sync: Keeps pool reserves in sync with blockchain
  • Price calculations: Updated reserves are needed for accurate LP token pricing
  • State tracking: Records the last processed block and timestamp
When called:
  • During regular block processing to update pool state
  • Before calculating LP token prices for transfers

🚀 Initialization Functions

initPoolStateIfNeeded()

Creates initial pool state from blockchain data if it doesn't exist.

export async function initPoolStateIfNeeded(
  ctx: DataHandlerContext<Store>,
  block: BlockData,
  contractAddress: string,
  poolState: PoolState,
  poolConfig: PoolConfig,
): Promise<PoolState> {
  // Return existing state if already initialized
  if (poolState.id) return poolState;
 
  // Validate dependencies
  if (!poolConfig.id || !poolConfig.lpToken || !poolConfig.token0 || !poolConfig.token1)
    throw new Error('Pool config not found');
 
  // Fetch initial state from blockchain
  const contract = new univ2Abi.Contract(ctx, block.header, contractAddress);
  const reserve = await contract.getReserves();
  const totalSupply = await contract.totalSupply();
  const r0 = reserve._reserve0;
  const r1 = reserve._reserve1;
 
  // Create new pool state entity
  const newPoolState = new PoolState({
    id: `${contractAddress}-state`,
    pool: poolConfig,
    reserve0: r0,
    reserve1: r1,
    totalSupply,
    lastBlock: block.header.height,
    lastTsMs: BigInt(block.header.timestamp),
    updatedAt: new Date(),
  });
 
  return newPoolState;
}
Purpose:
  • First-time setup: Initializes pool state when first encountered
  • Blockchain bootstrap: Gets initial reserves and supply from contract
  • Database consistency: Creates properly linked entity relationships

initPoolConfigIfNeeded()

The most complex function - discovers and initializes pool configuration from blockchain.

export async function initPoolConfigIfNeeded(
  ctx: DataHandlerContext<Store>,
  block: BlockData,
  contractAddress: string,
  poolConfig: PoolConfig,
  protocol: ProtocolConfig,
): Promise<PoolConfig> {
  // Return existing config if fully initialized
  if (poolConfig.id && poolConfig.lpToken && poolConfig.token0 && poolConfig.token1) return poolConfig;
 
  try {
    // Step 1: Get pool contract info
    const contract = new univ2Abi.Contract(ctx, block.header, contractAddress);
    const lpDecimals = await contract.decimals(); // LP token decimals
    const token0Address = await contract.token0(); // Token0 address
    const token1Address = await contract.token1(); // Token1 address
 
    // Step 2: Get token0 contract info
    const token0Contract = new univ2LpAbi.Contract(ctx, block.header, token0Address);
    const token0Decimals = await token0Contract.decimals();
 
    // Step 3: Get token1 contract info
    const token1Contract = new univ2LpAbi.Contract(ctx, block.header, token1Address);
    const token1Decimals = await token1Contract.decimals();
 
    // Step 4: Create token entities with CoinGecko IDs from config
    const lpToken = new Token({
      id: `${contractAddress}-lp`,
      address: contractAddress,
      decimals: lpDecimals,
      coingeckoId: null, // LP tokens don't have CoinGecko IDs
    });
 
    const token0 = new Token({
      id: `${token0Address}-token0`,
      address: token0Address,
      decimals: token0Decimals,
      coingeckoId: protocol.token0.coingeckoId, // From user config
    });
 
    const token1 = new Token({
      id: `${token1Address}-token1`,
      address: token1Address,
      decimals: token1Decimals,
      coingeckoId: protocol.token1.coingeckoId, // From user config
    });
 
    // Step 5: Create pool config linking all tokens
    const newPoolConfig = new PoolConfig({
      id: `${contractAddress}-config`,
      lpToken,
      token0,
      token1,
    });
 
    return newPoolConfig;
  } catch (error) {
    console.warn(
      `Failed to initialize pool config for ${contractAddress} at block ${block.header.height}:`,
      (error as Error).message,
    );
    // Return existing (possibly incomplete) config instead of crashing
    return poolConfig;
  }
}
Purpose:
  • Auto-discovery: Automatically discovers token addresses and decimals from pool contract
  • Metadata linking: Connects blockchain data with user-provided CoinGecko IDs
  • Error handling: Gracefully handles initialization failures
Discovery Process:
  1. Pool Contract: Get LP token decimals and underlying token addresses
  2. Token Contracts: Get decimals for both underlying tokens
  3. Metadata Merge: Combine blockchain data with user config (CoinGecko IDs)
  4. Entity Creation: Create linked database entities

📖 Database Loading Functions

loadPoolStateFromDb()

export async function loadPoolStateFromDb(
  ctx: DataHandlerContext<Store>,
  contractAddress: string,
): Promise<PoolState | void> {
  const poolState = await ctx.store.findOne(PoolState, {
    where: { id: `${contractAddress}-state` },
    relations: { pool: true }, // Load related pool config
  });
  return poolState || undefined;
}

loadPoolConfigFromDb()

export async function loadPoolConfigFromDb(
  ctx: DataHandlerContext<Store>,
  contractAddress: string,
): Promise<PoolConfig | void> {
  const poolConfig = await ctx.store.findOne(PoolConfig, {
    where: { id: `${contractAddress}-config` },
    relations: { token0: true, token1: true, lpToken: true }, // Load all related tokens
  });
  return poolConfig || undefined;
}

loadActiveBalancesFromDb()

export async function loadActiveBalancesFromDb(
  ctx: DataHandlerContext<Store>,
  contractAddress: string,
): Promise<Map<string, ActiveBalance> | undefined> {
  const activeBalancesEntity = await ctx.store.findOne(ActiveBalances, {
    where: { id: `${contractAddress}-active-balances` },
  });
 
  // Convert JSON back to Map for in-memory processing
  return activeBalancesEntity
    ? jsonToMap(activeBalancesEntity.activeBalancesMap as Record<string, ActiveBalance>)
    : undefined;
}
Key Features:
  • Eager loading: Uses relations to load linked entities in single query
  • Type safety: Returns strongly typed entities or undefined
  • JSON deserialization: Converts stored JSON back to usable Map structure

initPoolProcessStateIfNeeded()

export async function initPoolProcessStateIfNeeded(
  contractAddress: string,
  poolConfig: PoolConfig,
  poolProcessState: PoolProcessState | undefined,
): Promise<PoolProcessState> {
  if (poolProcessState?.id) return poolProcessState;
 
  return new PoolProcessState({
    id: `${contractAddress}-process-state`,
    pool: poolConfig,
    lastInterpolatedTs: undefined, // Will be set on first use
  });
}
Purpose:
  • Processing state: Tracks when balance flushes were last performed
  • Time boundaries: Maintains lastInterpolatedTs for periodic operations

💡 Error Handling Strategy

  • Graceful degradation: Functions return existing/partial state on errors
  • Detailed logging: Warns about initialization failures with context
  • Dependency validation: Checks required fields before proceeding
  • Try-catch boundaries: Prevents single pool failures from crashing processor

🔧 Performance Considerations

  • Eager loading: Uses relations to minimize database queries
  • Caching: Loads state once per batch, reuses for all blocks
  • Conditional updates: Only calls blockchain when initialization needed
  • Batch operations: Upserts happen during finalization, not per block

These utility functions form the foundation that allows the BatchProcessor to seamlessly handle both existing pools and newly discovered ones, maintaining consistency between database, blockchain, and in-memory state.