Pool Utility Functions - Database & Blockchain Integration
Overview
These utility functions handle the critical task of managing pool state between the database, blockchain, and in-memory processing. They ensure data consistency and handle initialization of new pools.
Core Imports & Dependencies
// Subsquid database and processor types
import { Store } from '@subsquid/typeorm-store';
import { PoolConfig, PoolState, Token, PoolProcessState, ActiveBalances } from '../model';
import { BlockData, DataHandlerContext } from '@subsquid/evm-processor';
import { ActiveBalance, ProtocolConfig, jsonToMap } from '@absinthe/common';
// Contract ABIs for blockchain calls
import * as univ2Abi from '../abi/univ2';
import * as univ2LpAbi from '../abi/univ2LP';
- TypeORM Store: Database operations with strong typing
- Model Entities: PoolConfig, PoolState, and Token database entities
- Contract ABIs: For making on-chain calls to get current state
- Subsquid Types: Block data and processor context
🔄 State Update Functions
updatePoolStateFromOnChain()
Updates an existing pool's state with fresh blockchain data.
export async function updatePoolStateFromOnChain(
ctx: DataHandlerContext<Store>,
block: BlockData,
contractAddress: string,
poolConfig: PoolConfig,
existingPoolState: PoolState,
): Promise<PoolState> {
// Validate pool config exists
if (!poolConfig.id || !poolConfig.lpToken || !poolConfig.token0 || !poolConfig.token1)
throw new Error('Pool config not found');
// Create contract instance for on-chain calls
const contract = new univ2Abi.Contract(ctx, block.header, contractAddress);
// Fetch current reserves and total supply from blockchain
const reserve = await contract.getReserves();
const totalSupply = await contract.totalSupply();
const r0 = reserve._reserve0;
const r1 = reserve._reserve1;
// Update the existing pool state (mutates in place)
existingPoolState.reserve0 = r0;
existingPoolState.reserve1 = r1;
existingPoolState.totalSupply = totalSupply;
existingPoolState.lastBlock = block.header.height;
existingPoolState.lastTsMs = BigInt(block.header.timestamp);
existingPoolState.updatedAt = new Date();
return existingPoolState;
}
- Real-time sync: Keeps pool reserves in sync with blockchain
- Price calculations: Updated reserves are needed for accurate LP token pricing
- State tracking: Records the last processed block and timestamp
- Called during regular block processing to refresh pool state
- Called before calculating LP token prices for transfers
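To illustrate why fresh reserves matter for pricing, here is a minimal sketch of one common way to value an LP token: total pool value divided by LP supply. The helper names (`toUnits`, `lpTokenPriceUsd`), the `PoolSnapshot` shape, and the USD prices are assumptions for illustration and are not part of the original module.

```typescript
// Hypothetical snapshot shape mirroring the reserve and supply fields on PoolState.
interface PoolSnapshot {
  reserve0: bigint;
  reserve1: bigint;
  totalSupply: bigint;
}

// Converts a raw on-chain integer amount to whole tokens as a floating-point number.
// Precision loss is acceptable in a sketch; real code may prefer a decimal library.
function toUnits(raw: bigint, decimals: number): number {
  return Number(raw) / 10 ** decimals;
}

// Values one LP token in USD: (value of reserve0 + value of reserve1) / LP supply.
function lpTokenPriceUsd(
  snapshot: PoolSnapshot,
  decimals: { token0: number; token1: number; lp: number },
  pricesUsd: { token0: number; token1: number },
): number {
  const supply = toUnits(snapshot.totalSupply, decimals.lp);
  if (supply === 0) return 0; // empty pool: nothing to price
  const poolValueUsd =
    toUnits(snapshot.reserve0, decimals.token0) * pricesUsd.token0 +
    toUnits(snapshot.reserve1, decimals.token1) * pricesUsd.token1;
  return poolValueUsd / supply;
}

// Example inputs (made up): 1,000 units of an 18-decimal token priced at $2,
// 2,000 units of a 6-decimal token priced at $1, and 100 LP tokens outstanding.
const lpPrice = lpTokenPriceUsd(
  { reserve0: 1000n * 10n ** 18n, reserve1: 2000n * 10n ** 6n, totalSupply: 100n * 10n ** 18n },
  { token0: 18, token1: 6, lp: 18 },
  { token0: 2, token1: 1 },
);
```

If the reserves are stale, `poolValueUsd` is stale too, which is why the state is refreshed before pricing transfers.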
🚀 Initialization Functions
initPoolStateIfNeeded()
Creates initial pool state from blockchain data if it doesn't exist.
export async function initPoolStateIfNeeded(
ctx: DataHandlerContext<Store>,
block: BlockData,
contractAddress: string,
poolState: PoolState,
poolConfig: PoolConfig,
): Promise<PoolState> {
// Return existing state if already initialized
if (poolState.id) return poolState;
// Validate dependencies
if (!poolConfig.id || !poolConfig.lpToken || !poolConfig.token0 || !poolConfig.token1)
throw new Error('Pool config not found');
// Fetch initial state from blockchain
const contract = new univ2Abi.Contract(ctx, block.header, contractAddress);
const reserve = await contract.getReserves();
const totalSupply = await contract.totalSupply();
const r0 = reserve._reserve0;
const r1 = reserve._reserve1;
// Create new pool state entity
const newPoolState = new PoolState({
id: `${contractAddress}-state`,
pool: poolConfig,
reserve0: r0,
reserve1: r1,
totalSupply,
lastBlock: block.header.height,
lastTsMs: BigInt(block.header.timestamp),
updatedAt: new Date(),
});
return newPoolState;
}
- First-time setup: Initializes pool state when first encountered
- Blockchain bootstrap: Gets initial reserves and supply from contract
- Database consistency: Creates properly linked entity relationships
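The "initialize only if needed" behavior can be sketched without Subsquid by swapping the contract for a small reader interface. Everything named here (`PoolStateLike`, `PoolReader`, `initStateIfNeeded`, the fake reader) is a hypothetical stand-in; the sketch only shows that an already-initialized state short-circuits before any on-chain call.

```typescript
// Stand-in for the PoolState entity; the real one comes from '../model'.
interface PoolStateLike {
  id?: string;
  reserve0?: bigint;
  reserve1?: bigint;
  totalSupply?: bigint;
}

// Hypothetical reader interface replacing the generated univ2 contract class.
interface PoolReader {
  getReserves(): Promise<{ _reserve0: bigint; _reserve1: bigint }>;
  totalSupply(): Promise<bigint>;
}

async function initStateIfNeeded(
  address: string,
  state: PoolStateLike,
  reader: PoolReader,
): Promise<PoolStateLike> {
  if (state.id) return state; // already initialized: no on-chain calls
  const { _reserve0, _reserve1 } = await reader.getReserves();
  const totalSupply = await reader.totalSupply();
  return { id: `${address}-state`, reserve0: _reserve0, reserve1: _reserve1, totalSupply };
}

// Fake reader that counts calls so the short-circuit is observable.
let readerCalls = 0;
const fakeReader: PoolReader = {
  async getReserves() {
    readerCalls += 1;
    return { _reserve0: 5n, _reserve1: 7n };
  },
  async totalSupply() {
    readerCalls += 1;
    return 3n;
  },
};

// Initializes once, then passes the result back in; the second call fetches nothing.
async function initDemo(): Promise<{ first: PoolStateLike; second: PoolStateLike; calls: number }> {
  const first = await initStateIfNeeded('0xpool', {}, fakeReader);
  const second = await initStateIfNeeded('0xpool', first, fakeReader);
  return { first, second, calls: readerCalls };
}
```

Because the function returns the same object when `id` is set, callers can invoke it unconditionally on every batch.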
initPoolConfigIfNeeded()
The most complex function: it discovers the pool's tokens on-chain and initializes the pool configuration from that data.
export async function initPoolConfigIfNeeded(
ctx: DataHandlerContext<Store>,
block: BlockData,
contractAddress: string,
poolConfig: PoolConfig,
protocol: ProtocolConfig,
): Promise<PoolConfig> {
// Return existing config if fully initialized
if (poolConfig.id && poolConfig.lpToken && poolConfig.token0 && poolConfig.token1) return poolConfig;
try {
// Step 1: Get pool contract info
const contract = new univ2Abi.Contract(ctx, block.header, contractAddress);
const lpDecimals = await contract.decimals(); // LP token decimals
const token0Address = await contract.token0(); // Token0 address
const token1Address = await contract.token1(); // Token1 address
// Step 2: Get token0 contract info
const token0Contract = new univ2LpAbi.Contract(ctx, block.header, token0Address);
const token0Decimals = await token0Contract.decimals();
// Step 3: Get token1 contract info
const token1Contract = new univ2LpAbi.Contract(ctx, block.header, token1Address);
const token1Decimals = await token1Contract.decimals();
// Step 4: Create token entities with CoinGecko IDs from config
const lpToken = new Token({
id: `${contractAddress}-lp`,
address: contractAddress,
decimals: lpDecimals,
coingeckoId: null, // LP tokens don't have CoinGecko IDs
});
const token0 = new Token({
id: `${token0Address}-token0`,
address: token0Address,
decimals: token0Decimals,
coingeckoId: protocol.token0.coingeckoId, // From user config
});
const token1 = new Token({
id: `${token1Address}-token1`,
address: token1Address,
decimals: token1Decimals,
coingeckoId: protocol.token1.coingeckoId, // From user config
});
// Step 5: Create pool config linking all tokens
const newPoolConfig = new PoolConfig({
id: `${contractAddress}-config`,
lpToken,
token0,
token1,
});
return newPoolConfig;
} catch (error) {
console.warn(
`Failed to initialize pool config for ${contractAddress} at block ${block.header.height}:`,
(error as Error).message,
);
// Return existing (possibly incomplete) config instead of crashing
return poolConfig;
}
}
- Auto-discovery: Automatically discovers token addresses and decimals from pool contract
- Metadata linking: Connects blockchain data with user-provided CoinGecko IDs
- Error handling: Gracefully handles initialization failures
- Pool Contract: Get LP token decimals and underlying token addresses
- Token Contracts: Get decimals for both underlying tokens
- Metadata Merge: Combine blockchain data with user config (CoinGecko IDs)
- Entity Creation: Create linked database entities
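The metadata merge and entity creation steps can be isolated as a pure function, which makes them easy to test without a chain connection. This is a sketch under assumptions: `TokenRecord`, `PoolConfigRecord`, `DiscoveredPool`, `ProtocolTokens`, and `buildPoolConfig` are illustrative names, and plain objects stand in for the TypeORM entities.

```typescript
// Stand-in shapes; the real Token and PoolConfig entities live in '../model'.
interface TokenRecord {
  id: string;
  address: string;
  decimals: number;
  coingeckoId: string | null;
}

interface PoolConfigRecord {
  id: string;
  lpToken: TokenRecord;
  token0: TokenRecord;
  token1: TokenRecord;
}

// Values produced by the on-chain discovery steps (hypothetical grouping).
interface DiscoveredPool {
  lpDecimals: number;
  token0: { address: string; decimals: number };
  token1: { address: string; decimals: number };
}

// The slice of user configuration this step reads (assumed shape).
interface ProtocolTokens {
  token0: { coingeckoId: string };
  token1: { coingeckoId: string };
}

// Merges discovered on-chain metadata with user-provided CoinGecko IDs,
// using the same ID suffixes as the function above.
function buildPoolConfig(
  contractAddress: string,
  discovered: DiscoveredPool,
  protocol: ProtocolTokens,
): PoolConfigRecord {
  return {
    id: `${contractAddress}-config`,
    lpToken: {
      id: `${contractAddress}-lp`,
      address: contractAddress,
      decimals: discovered.lpDecimals,
      coingeckoId: null, // LP tokens don't have CoinGecko IDs
    },
    token0: {
      id: `${discovered.token0.address}-token0`,
      address: discovered.token0.address,
      decimals: discovered.token0.decimals,
      coingeckoId: protocol.token0.coingeckoId,
    },
    token1: {
      id: `${discovered.token1.address}-token1`,
      address: discovered.token1.address,
      decimals: discovered.token1.decimals,
      coingeckoId: protocol.token1.coingeckoId,
    },
  };
}

// Example with made-up addresses and IDs.
const mergedConfig = buildPoolConfig(
  '0xpool',
  { lpDecimals: 18, token0: { address: '0xaaa', decimals: 18 }, token1: { address: '0xbbb', decimals: 6 } },
  { token0: { coingeckoId: 'weth' }, token1: { coingeckoId: 'usd-coin' } },
);
```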
📖 Database Loading Functions
loadPoolStateFromDb()
Loads the stored pool state for a contract, together with its related pool config. Returns undefined when no state has been saved yet.
export async function loadPoolStateFromDb(
ctx: DataHandlerContext<Store>,
contractAddress: string,
): Promise<PoolState | undefined> {
const poolState = await ctx.store.findOne(PoolState, {
where: { id: `${contractAddress}-state` },
relations: { pool: true }, // Load related pool config
});
return poolState || undefined;
}
loadPoolConfigFromDb()
Loads the stored pool config for a contract, together with its token0, token1, and LP token entities. Returns undefined when no config has been saved yet.
export async function loadPoolConfigFromDb(
ctx: DataHandlerContext<Store>,
contractAddress: string,
): Promise<PoolConfig | undefined> {
const poolConfig = await ctx.store.findOne(PoolConfig, {
where: { id: `${contractAddress}-config` },
relations: { token0: true, token1: true, lpToken: true }, // Load all related tokens
});
return poolConfig || undefined;
}
loadActiveBalancesFromDb()
Loads the stored active balances for a contract and converts them from JSON back into a Map. Returns undefined when none have been saved yet.
export async function loadActiveBalancesFromDb(
ctx: DataHandlerContext<Store>,
contractAddress: string,
): Promise<Map<string, ActiveBalance> | undefined> {
const activeBalancesEntity = await ctx.store.findOne(ActiveBalances, {
where: { id: `${contractAddress}-active-balances` },
});
// Convert JSON back to Map for in-memory processing
return activeBalancesEntity
? jsonToMap(activeBalancesEntity.activeBalancesMap as Record<string, ActiveBalance>)
: undefined;
}
- Eager loading: Uses relations to load linked entities in a single query
- Type safety: Returns strongly typed entities or undefined
- JSON deserialization: Converts stored JSON back to a usable Map structure
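The JSON-to-Map conversion can be pictured as a simple round trip. The sketch below uses hypothetical stand-ins (`mapToJsonSketch`, `jsonToMapSketch`, `ActiveBalanceLike`); the real `jsonToMap` in `@absinthe/common` and the real `ActiveBalance` fields may differ.

```typescript
// Assumed shape for illustration only.
interface ActiveBalanceLike {
  balance: string; // kept as a string so large integers survive JSON
  updatedBlockTs: number;
}

// Map -> plain object, suitable for a JSON column.
function mapToJsonSketch<V>(map: Map<string, V>): Record<string, V> {
  const out: Record<string, V> = {};
  map.forEach((value, key) => {
    out[key] = value;
  });
  return out;
}

// Plain object -> Map, for in-memory processing.
function jsonToMapSketch<V>(json: Record<string, V>): Map<string, V> {
  const map = new Map<string, V>();
  for (const key of Object.keys(json)) {
    map.set(key, json[key]);
  }
  return map;
}

// Round trip through a JSON string, as a database JSON column would store it.
const balancesBefore = new Map<string, ActiveBalanceLike>([
  ['0xabc', { balance: '1000', updatedBlockTs: 1700000000000 }],
  ['0xdef', { balance: '250', updatedBlockTs: 1700000060000 }],
]);
const storedJson = JSON.stringify(mapToJsonSketch(balancesBefore));
const balancesAfter = jsonToMapSketch(
  JSON.parse(storedJson) as Record<string, ActiveBalanceLike>,
);
```

A Map is convenient in memory because lookups and updates by address are direct, while the plain object form is what a JSON column can hold.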
initPoolProcessStateIfNeeded()
Returns the existing process state if it already has an ID; otherwise creates a new one linked to the pool config.
export async function initPoolProcessStateIfNeeded(
contractAddress: string,
poolConfig: PoolConfig,
poolProcessState: PoolProcessState | undefined,
): Promise<PoolProcessState> {
if (poolProcessState?.id) return poolProcessState;
return new PoolProcessState({
id: `${contractAddress}-process-state`,
pool: poolConfig,
lastInterpolatedTs: undefined, // Will be set on first use
});
}
- Processing state: Tracks when balance flushes were last performed
- Time boundaries: Maintains lastInterpolatedTs for periodic operations
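One plausible way a caller could use a stored timestamp like `lastInterpolatedTs` is to work out which fixed-size time windows have elapsed since the last run. The source does not show how the processor actually does this, so the helper below (`boundariesSince`), its millisecond `number` timestamps, and the window alignment are all assumptions.

```typescript
// Hypothetical helper: returns the window boundaries (in ms) that lie strictly after
// `lastTs` and at or before `nowTs`, for windows aligned to multiples of `windowMs`.
function boundariesSince(
  lastTs: number | undefined,
  nowTs: number,
  windowMs: number,
): number[] {
  if (windowMs <= 0) throw new Error('windowMs must be positive');
  // First use: no previous timestamp, so there is nothing to catch up on.
  if (lastTs === undefined) return [];
  const boundaries: number[] = [];
  let next = (Math.floor(lastTs / windowMs) + 1) * windowMs;
  while (next <= nowTs) {
    boundaries.push(next);
    next += windowMs;
  }
  return boundaries;
}

// Last run at t=1500 ms, now t=4200 ms, 1000 ms windows.
const crossed = boundariesSince(1500, 4200, 1000); // [2000, 3000, 4000]
```

After handling the returned boundaries, the caller would store the new timestamp so the next batch starts from there.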
💡 Error Handling Strategy
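- Graceful degradation: initPoolConfigIfNeeded returns the existing (possibly incomplete) config when on-chain discovery fails
- Detailed logging: Warns about initialization failures with the contract address and block height
- Dependency validation: updatePoolStateFromOnChain and initPoolStateIfNeeded throw if the pool config is missing its ID or any token
- Try-catch boundaries: Prevents a single pool's initialization failure from crashing the processor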
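The try-catch boundary idea generalizes to any per-pool task: catch the failure, log it with context, and substitute a fallback so the remaining pools are still processed. This is a minimal sketch; `forEachPoolSafely`, `flakyTask`, and the fallback value are hypothetical and not taken from the processor.

```typescript
// Runs `task` for each pool address. A failure is logged and replaced by
// `fallback(address)` instead of propagating and aborting the whole loop.
async function forEachPoolSafely<T>(
  addresses: string[],
  task: (address: string) => Promise<T>,
  fallback: (address: string) => T,
): Promise<Map<string, T>> {
  const results = new Map<string, T>();
  for (const address of addresses) {
    try {
      results.set(address, await task(address));
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      console.warn(`Failed to process pool ${address}: ${message}`);
      results.set(address, fallback(address));
    }
  }
  return results;
}

// Demo task: one address simulates a failing RPC call.
async function flakyTask(address: string): Promise<string> {
  if (address === '0xbad') throw new Error('RPC timeout');
  return `${address}-config`;
}

function runSafeDemo(): Promise<Map<string, string>> {
  return forEachPoolSafely(['0xa', '0xbad', '0xb'], flakyTask, () => 'uninitialized');
}
```

The trade-off is that a fallback can hide a persistent failure, so the warning should carry enough context (address, block height) to investigate later.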
🔧 Performance Considerations
- Eager loading: Uses relations to minimize database queries
- Caching: Loads state once per batch and reuses it for all blocks
- Conditional initialization: The init functions call the blockchain only when initialization is needed
- Batch operations: Upserts happen during finalization, not per block
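The caching and batch-upsert points can be sketched with an in-memory stand-in for the store. `InMemoryStore`, `BatchStateCache`, and `StateRecord` are hypothetical, and the stand-in is synchronous for brevity, whereas the real Store from `@subsquid/typeorm-store` is asynchronous.

```typescript
// Minimal record shape standing in for PoolState.
interface StateRecord {
  id: string;
  lastBlock: number;
}

// In-memory store that counts reads and writes so the savings are visible.
class InMemoryStore {
  readonly rows = new Map<string, StateRecord>();
  reads = 0;
  writes = 0;

  findById(id: string): StateRecord | undefined {
    this.reads += 1;
    return this.rows.get(id);
  }

  upsert(records: StateRecord[]): void {
    this.writes += 1;
    for (const record of records) this.rows.set(record.id, { ...record });
  }
}

// Per-batch cache: the first access reads the store, later accesses reuse the object.
class BatchStateCache {
  private readonly cache = new Map<string, StateRecord>();
  private readonly store: InMemoryStore;

  constructor(store: InMemoryStore) {
    this.store = store;
  }

  get(address: string): StateRecord {
    const id = `${address}-state`;
    let state = this.cache.get(id);
    if (!state) {
      state = this.store.findById(id) ?? { id, lastBlock: 0 };
      this.cache.set(id, state);
    }
    return state;
  }

  // Called once when the batch is finalized: a single upsert for all touched states.
  flush(): void {
    this.store.upsert(Array.from(this.cache.values()));
    this.cache.clear();
  }
}

// Three blocks touch the same pool: one read, one write in total.
const demoStore = new InMemoryStore();
const demoCache = new BatchStateCache(demoStore);
for (const height of [100, 101, 102]) {
  demoCache.get('0xpool').lastBlock = height;
}
demoCache.flush();
```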
These utility functions form the foundation that allows the BatchProcessor to seamlessly handle both existing pools and newly discovered ones, maintaining consistency between database, blockchain, and in-memory state.