BatchProcessor.ts - The Core Business Logic Engine
What It Does
The BatchProcessor is the core business logic engine that:
- Processes blockchain events (Swap and Transfer)
- Manages state across multiple pools using utility functions
- Calculates time-weighted balances with periodic flushes
- Handles data transmission to Absinthe API
Class Structure & Dependencies
import { ActiveBalances, PoolState, PoolProcessState, PoolConfig } from './model';
import {
AbsintheApiClient,
ActiveBalance,
BatchContext,
Chain,
ProtocolType,
ValidatedDexProtocolConfig,
ValidatedEnvBase,
} from '@absinthe/common';
import { processor } from './processor';
import { createHash } from 'crypto';
import { TypeormDatabase } from '@subsquid/typeorm-store';
import { initPoolConfigIfNeeded, loadActiveBalancesFromDb, loadPoolConfigFromDb } from './utils/pool';
import { ProtocolStateUniv2 } from './utils/types';
import * as univ2Abi from './abi/univ2';
import { computeLpTokenPrice, computePricedSwapVolume } from './utils/pricing';
import { processValueChange, toTimeWeightedBalance, toTransaction, pricePosition } from '@absinthe/common';
export class UniswapV2Processor {
private readonly protocols: ProtocolConfig[];
private readonly protocolType: ProtocolType;
private readonly schemaName: string;
private readonly refreshWindow: number;
private readonly apiClient: AbsintheApiClient;
private readonly chainConfig: Chain;
private readonly env: ValidatedEnvBase;
constructor(
dexProtocol: ValidatedDexProtocolConfig,
refreshWindow: number,
apiClient: AbsintheApiClient,
env: ValidatedEnvBase,
chainConfig: Chain,
) {
this.protocols = dexProtocol.protocols;
this.protocolType = dexProtocol.type;
this.refreshWindow = refreshWindow;
this.apiClient = apiClient;
this.env = env;
this.chainConfig = chainConfig;
this.schemaName = this.generateSchemaName();
}
}
Schema Name Generation
private generateSchemaName(): string {
const uniquePoolCombination = this.protocols
.reduce((acc, protocol) => acc + protocol.contractAddress.toLowerCase(), '')
.concat(this.chainConfig.networkId.toString());
const hash = createHash('md5').update(uniquePoolCombination).digest('hex').slice(0, 8);
return `univ2-${hash}`;
}
- Creates a unique database schema name for this processor
- Combines all pool addresses + chain network ID
- Uses MD5 hash with
univ2-
prefix - Allows multiple processors to use the same database without conflicts
Main Processing Flow
async run(): Promise<void> {
processor.run(
new TypeormDatabase({ supportHotBlocks: false, stateSchema: this.schemaName }),
async (ctx) => {
try {
await this.processBatch(ctx);
} catch (error) {
logger.error('Error processing batch:', (error as Error).message);
throw error;
}
},
);
}
Error Handling
- Comprehensive try/catch with logging
- Re-throws errors after logging for upstream handling
- Uses structured logging with batch context
Batch Processing Pipeline
private async processBatch(ctx: any): Promise<void> {
logger.info(`Processing batch with ${ctx.blocks.length} blocks`, {
blockRange: `${ctx.blocks[0]?.header.height} - ${ctx.blocks[ctx.blocks.length - 1]?.header.height}`,
});
// Step 1: Initialize protocol states using utility functions
const protocolStates = await this.initializeProtocolStates(ctx);
// Step 2: Process each block
for (const block of ctx.blocks) {
logger.info(`Processing block ${block.header.height}`, {
timestamp: new Date(block.header.timestamp).toISOString(),
});
await this.processBlock({ ctx, block, protocolStates });
}
// Step 3: Finalize and send data
await this.finalizeBatch(ctx, protocolStates);
}
Step 1: Initialize Protocol States
private async initializeProtocolStates(ctx: any): Promise<Map<string, ProtocolStateUniv2>> {
const protocolStates = new Map<string, ProtocolStateUniv2>();
for (const protocol of this.protocols) {
const contractAddress = protocol.contractAddress.toLowerCase();
// Load existing state from database using utility functions
let poolConfig = await loadPoolConfigFromDb(ctx, contractAddress);
let poolState = await loadPoolStateFromDb(ctx, contractAddress);
let poolProcessState = await loadPoolProcessStateFromDb(ctx, contractAddress);
let activeBalances = await loadActiveBalancesFromDb(ctx, contractAddress);
// Initialize config if missing
if (!poolConfig?.id) {
const initBlock = this.findInitializationBlock(ctx.blocks, protocol.fromBlock);
if (initBlock) {
poolConfig = await initPoolConfigIfNeeded(
ctx,
initBlock,
contractAddress,
poolConfig || new PoolConfig({}),
protocol,
);
}
}
// Initialize state if missing (requires config)
if (!poolState?.id && poolConfig?.id) {
const initBlock = this.findInitializationBlock(ctx.blocks, protocol.fromBlock);
if (initBlock) {
poolState = await initPoolStateIfNeeded(
ctx,
initBlock,
contractAddress,
poolState || new PoolState({}),
poolConfig,
);
}
}
// Initialize process state if missing
if (!poolProcessState?.id) {
poolProcessState = await initPoolProcessStateIfNeeded(
contractAddress,
poolConfig || new PoolConfig({}),
poolProcessState || new PoolProcessState({}),
);
}
// Create protocol state object
protocolStates.set(contractAddress, {
config: poolConfig || new PoolConfig({}),
state: poolState || new PoolState({}),
processState: poolProcessState || new PoolProcessState({}),
activeBalances: activeBalances || new Map<string, ActiveBalance>(),
balanceWindows: [],
transactions: [],
});
}
return protocolStates;
}
loadPoolConfigFromDb()
- Loads token metadata from databaseloadPoolStateFromDb()
- Loads current reserves and supplyloadActiveBalancesFromDb()
- Loads user balance stateinitPoolConfigIfNeeded()
- Initializes pool configuration on-chaininitPoolStateIfNeeded()
- Initializes pool state from blockchaininitPoolProcessStateIfNeeded()
- Creates processing state
💰 Swap Event Processing
private async processSwapEvent(
ctx: any,
block: any,
log: any,
protocol: ProtocolConfig,
protocolState: ProtocolStateUniv2,
): Promise<void> {
// Step 1: Decode event
const { sender, amount0In, amount0Out, amount1In, amount1Out } = univ2Abi.events.Swap.decode(log);
// Step 2: Calculate amounts (simple addition, not BigNumber)
const token0Amount = amount0In + amount0Out;
const token1Amount = amount1In + amount1Out;
// Step 3: Calculate gas fee in USD
const { gasPrice, gasUsed } = log.transaction;
const gasFee = Number(gasUsed) * Number(gasPrice);
const displayGasFee = gasFee / 10 ** 18;
const ethPriceUsd = await fetchHistoricalUsd(
'ethereum',
block.header.timestamp,
this.env.coingeckoApiKey,
);
const gasFeeUsd = displayGasFee * ethPriceUsd;
// Step 4: Calculate swap volume using utility function
const pricedSwapVolume = protocol.preferredTokenCoingeckoId === 'token0'
? await computePricedSwapVolume(
token0Amount,
protocolState.config.token0.coingeckoId as string,
protocolState.config.token0.decimals,
block.header.timestamp,
this.env.coingeckoApiKey,
)
: await computePricedSwapVolume(
token1Amount,
protocolState.config.token1.coingeckoId as string,
protocolState.config.token1.decimals,
block.header.timestamp,
this.env.coingeckoApiKey,
);
// Step 5: Create transaction schema with detailed token info
const transactionSchema = {
eventType: MessageType.TRANSACTION,
eventName: 'Swap',
tokens: {
token0Symbol: {
value: ChainShortName.HEMI,
type: 'string',
},
// ... extensive token metadata
},
rawAmount: protocol.preferredTokenCoingeckoId === 'token0'
? token0Amount.toString()
: token1Amount.toString(),
displayAmount: protocol.preferredTokenCoingeckoId === 'token0'
? Number(BigInt(token0Amount) / BigInt(10 ** protocolState.config.token0.decimals))
: Number(BigInt(token1Amount) / BigInt(10 ** protocolState.config.token1.decimals)),
unixTimestampMs: block.header.timestamp,
txHash: log.transactionHash,
logIndex: log.logIndex,
blockNumber: block.header.height,
blockHash: block.header.hash,
userId: sender,
currency: Currency.USD,
valueUsd: pricedSwapVolume,
gasUsed: Number(gasUsed),
gasFeeUsd: gasFeeUsd,
};
protocolState.transactions.push(transactionSchema);
}
Transfer Event Processing
private async processTransferEvent(
ctx: any,
block: any,
log: any,
protocol: ProtocolConfig,
protocolState: ProtocolStateUniv2,
): Promise<void> {
// Step 1: Decode event
const { from, to, value } = univ2Abi.events.Transfer.decode(log);
// Step 2: Calculate LP token price using utility function
const {
price: lpTokenPrice,
token0Price,
token1Price,
token0Value,
token1Value,
totalPoolValue,
totalSupplyBig,
reserve0,
reserve1,
} = await computeLpTokenPrice(
ctx,
block,
protocolState.config,
protocolState.state,
this.env.coingeckoApiKey,
block.header.timestamp,
);
// Step 3: Calculate USD value using utility function
const lpTokenSwapUsdValue = pricePosition(
lpTokenPrice,
value,
protocolState.config.lpToken.decimals,
);
// Step 4: Process value change using common utility
const newHistoryWindows = processValueChange({
from,
to,
amount: value,
usdValue: lpTokenSwapUsdValue,
blockTimestamp: block.header.timestamp,
blockHeight: block.header.height,
txHash: log.transactionHash,
activeBalances: protocolState.activeBalances,
windowDurationMs: this.refreshWindow,
tokenPrice: lpTokenPrice,
tokenDecimals: protocolState.config.lpToken.decimals,
tokens: {
// Extensive metadata including all pricing data
lpTokenPrice: { value: lpTokenPrice.toString(), type: 'string' },
token0Price: { value: token0Price.toString(), type: 'string' },
reserve0: { value: reserve0.toString(), type: 'string' },
// ... complete token and pool data
},
contractAddress: protocol.contractAddress,
});
protocolState.balanceWindows.push(...newHistoryWindows);
}
computeLpTokenPrice()
- Calculates LP token value from reserves and CoinGecko pricespricePosition()
- Converts LP token amount to USD valueprocessValueChange()
- Handles balance updates and creates time windows
⏰ Periodic Balance Flush
private async processPeriodicBalanceFlush(
ctx: any,
block: any,
protocolState: ProtocolStateUniv2,
): Promise<void> {
const currentTs = block.header.timestamp;
// Initialize if first time
if (!protocolState.processState?.lastInterpolatedTs) {
protocolState.processState.lastInterpolatedTs = currentTs;
}
// Process all missed windows using while loop
while (
protocolState.processState.lastInterpolatedTs &&
Number(protocolState.processState.lastInterpolatedTs) + this.refreshWindow < currentTs
) {
const windowsSinceEpoch = Math.floor(
Number(protocolState.processState.lastInterpolatedTs) / this.refreshWindow,
);
const nextBoundaryTs: number = (windowsSinceEpoch + 1) * this.refreshWindow;
// Create balance windows for all active users
for (const [userAddress, data] of protocolState.activeBalances.entries()) {
const oldStart = data.updatedBlockTs;
if (data.balance > 0n && oldStart < nextBoundaryTs) {
// Get current LP token price
const { price: lpTokenPrice /* ... other prices */ } = await computeLpTokenPrice(
ctx, block, protocolState.config, protocolState.state,
this.env.coingeckoApiKey, currentTs,
);
const lpTokenSwapUsdValue = pricePosition(
lpTokenPrice,
data.balance,
protocolState.config.lpToken.decimals,
);
// Create time window for this period
protocolState.balanceWindows.push({
userAddress: userAddress,
deltaAmount: 0, // No change, just time passage
trigger: TimeWindowTrigger.EXHAUSTED,
startTs: oldStart,
endTs: nextBoundaryTs,
windowDurationMs: this.refreshWindow,
balanceBefore: data.balance.toString(),
balanceAfter: data.balance.toString(), // Same balance
valueUsd: lpTokenSwapUsdValue,
// ... complete token metadata
});
// Update balance timestamp
protocolState.activeBalances.set(userAddress, {
balance: data.balance,
updatedBlockTs: nextBoundaryTs,
updatedBlockHeight: block.header.height,
});
}
}
// Move to next window
protocolState.processState.lastInterpolatedTs = BigInt(nextBoundaryTs);
}
}
- Uses
while
loop to handle multiple missed windows - Tracks
lastInterpolatedTs
for precise timing - Uses
TimeWindowTrigger.EXHAUSTED
for periodic flushes - Updates balance timestamps without changing amounts
- User adds $1000 LP tokens at 10:00 AM (window starts)
- No transfers happen for 6 hours
- At 4:00 PM, system processes a new block
- Creates EXHAUSTED event:
startTs: 10:00 AM, endTs: 4:00 PM, deltaAmount: 0, valueUsd: $1000
- This captures $1000 × 6 hours = $6000 USD-hours of time-weighted balance
- User's balance remains $1000, but timestamp updates to 4:00 PM for next window
Finalization & API Transmission
private async finalizeBatch(
ctx: any,
protocolStates: Map<string, ProtocolStateUniv2>,
): Promise<void> {
for (const protocol of this.protocols) {
const protocolState = protocolStates.get(protocol.contractAddress.toLowerCase())!;
// Skip if not properly initialized
if (!protocolState.config.id || !protocolState.config.token0 ||
!protocolState.config.token1 || !protocolState.config.lpToken) {
logger.warn(`Skipping finalize for ${protocol.contractAddress} - config not initialized`);
continue;
}
// Transform data using utility functions
const balances = toTimeWeightedBalance(
protocolState.balanceWindows,
{ ...protocol, type: this.protocolType },
this.env,
this.chainConfig,
);
const transactions = toTransaction(
protocolState.transactions,
{ ...protocol, type: this.protocolType },
this.env,
this.chainConfig,
);
// Send to Absinthe API
await this.apiClient.send(balances);
await this.apiClient.send(transactions);
// Save state to database (conditional upserts)
if (protocolState.config.token0) await ctx.store.upsert(protocolState.config.token0);
if (protocolState.config.token1) await ctx.store.upsert(protocolState.config.token1);
if (protocolState.config.lpToken) await ctx.store.upsert(protocolState.config.lpToken);
if (protocolState.config.id) await ctx.store.upsert(protocolState.config);
if (protocolState.state.id) await ctx.store.upsert(protocolState.state);
if (protocolState.processState.id) await ctx.store.upsert(protocolState.processState);
// Save active balances as JSON
await ctx.store.upsert(new ActiveBalances({
id: `${protocol.contractAddress.toLowerCase()}-active-balances`,
activeBalancesMap: mapToJson(protocolState.activeBalances),
}));
}
}
toTimeWeightedBalance()
- Transforms balance windows to API formattoTransaction()
- Transforms transaction events to API formatmapToJson()
- Serializes active balances for database storage