Skip to content

BatchProcessor.ts - The Core Business Logic Engine

What It Does

The BatchProcessor is the core business logic engine that:

  • Processes blockchain events (Swap and Transfer)
  • Manages state across multiple pools using utility functions
  • Calculates time-weighted balances with periodic flushes
  • Handles data transmission to Absinthe API

Class Structure & Dependencies

import { ActiveBalances, PoolState, PoolProcessState, PoolConfig } from './model';
import {
  AbsintheApiClient,
  ActiveBalance,
  BatchContext,
  Chain,
  ProtocolType,
  ValidatedDexProtocolConfig,
  ValidatedEnvBase,
} from '@absinthe/common';
import { processor } from './processor';
import { createHash } from 'crypto';
import { TypeormDatabase } from '@subsquid/typeorm-store';
import { initPoolConfigIfNeeded, loadActiveBalancesFromDb, loadPoolConfigFromDb } from './utils/pool';
import { ProtocolStateUniv2 } from './utils/types';
import * as univ2Abi from './abi/univ2';
import { computeLpTokenPrice, computePricedSwapVolume } from './utils/pricing';
import { processValueChange, toTimeWeightedBalance, toTransaction, pricePosition } from '@absinthe/common';
 
/**
 * Processor for a set of Uniswap V2 style pools on a single chain.
 *
 * Holds the configuration shared by every batch: the pool list, protocol
 * type, refresh window, API client, environment and chain description.
 */
export class UniswapV2Processor {
  /** Pool configurations, taken from `dexProtocol.protocols`. */
  private readonly protocols: ProtocolConfig[];
  /** Protocol type, taken from `dexProtocol.type`. */
  private readonly protocolType: ProtocolType;
  /** Result of `generateSchemaName()`, computed once at construction. */
  private readonly schemaName: string;

  /**
   * @param dexProtocol   Validated DEX configuration supplying `protocols` and `type`.
   * @param refreshWindow Window length; handed on as `windowDurationMs` when balance windows are built.
   * @param apiClient     Client used to send data to the Absinthe API.
   * @param env           Validated environment settings.
   * @param chainConfig   Description of the chain being indexed.
   */
  constructor(
    dexProtocol: ValidatedDexProtocolConfig,
    private readonly refreshWindow: number,
    private readonly apiClient: AbsintheApiClient,
    private readonly env: ValidatedEnvBase,
    private readonly chainConfig: Chain,
  ) {
    const { protocols, type } = dexProtocol;
    this.protocols = protocols;
    this.protocolType = type;
    // Kept as the final step so every other field is already assigned
    // when the schema name is derived.
    this.schemaName = this.generateSchemaName();
  }
}

Schema Name Generation

/**
 * Derives the schema name for this processor instance.
 *
 * The lower-cased contract addresses of all configured protocols are joined
 * in configuration order, the chain's network id is appended, and the first
 * 8 hex characters of the MD5 digest of that string are used.
 *
 * @returns A name of the form `univ2-<8 hex chars>`.
 */
private generateSchemaName(): string {
  const lowercasedAddresses = this.protocols.map((protocol) =>
    protocol.contractAddress.toLowerCase(),
  );
  const fingerprintInput = `${lowercasedAddresses.join('')}${this.chainConfig.networkId.toString()}`;

  const digest = createHash('md5').update(fingerprintInput).digest('hex');
  return `univ2-${digest.slice(0, 8)}`;
}
Purpose:
  • Creates a unique database schema name for this processor
  • Combines all pool addresses + chain network ID
  • Uses the first 8 hex characters of an MD5 hash, prefixed with univ2-
  • Allows multiple processors to use the same database without conflicts

Main Processing Flow

/**
 * Entry point: hands a `TypeormDatabase` (hot blocks disabled, state stored
 * under `this.schemaName`) and a batch handler to `processor.run`.
 *
 * The handler delegates each batch to `processBatch`. Any failure is logged
 * and then re-thrown so the caller of the handler still sees it.
 */
async run(): Promise<void> {
  processor.run(
    new TypeormDatabase({ supportHotBlocks: false, stateSchema: this.schemaName }),
    async (ctx) => {
      try {
        await this.processBatch(ctx);
      } catch (error: unknown) {
        // A thrown value is not guaranteed to be an Error; fall back to its
        // string form instead of logging `undefined`.
        const message = error instanceof Error ? error.message : String(error);
        logger.error('Error processing batch:', message);
        throw error;
      }
    },
  );
}

Error Handling

  • Comprehensive try/catch with logging
  • Re-throws errors after logging for upstream handling
  • Batch context (block count and block range) is logged as structured fields at the start of processBatch

Batch Processing Pipeline

/**
 * Runs the three-stage pipeline for one batch: load/initialise the protocol
 * states, process every block in order, then finalise and send the results.
 *
 * @param ctx Batch context; `ctx.blocks` is the list of blocks to process.
 */
private async processBatch(ctx: any): Promise<void> {
  const { blocks } = ctx;
  const firstHeight = blocks[0]?.header.height;
  const lastHeight = blocks[blocks.length - 1]?.header.height;
  logger.info(`Processing batch with ${blocks.length} blocks`, {
    blockRange: `${firstHeight} - ${lastHeight}`,
  });

  // Stage 1: one state object per configured pool.
  const protocolStates = await this.initializeProtocolStates(ctx);

  // Stage 2: blocks are handled strictly one after another.
  for (const block of blocks) {
    const { height, timestamp } = block.header;
    logger.info(`Processing block ${height}`, {
      timestamp: new Date(timestamp).toISOString(),
    });
    await this.processBlock({ ctx, block, protocolStates });
  }

  // Stage 3: transmit and persist what the blocks produced.
  await this.finalizeBatch(ctx, protocolStates);
}

Step 1: Initialize Protocol States

/**
 * Builds the per-pool working state for a batch.
 *
 * For every configured protocol the stored config, state, process state and
 * active balances are loaded; missing pieces are initialised where possible
 * and empty placeholders are used for whatever is still absent.
 *
 * @param ctx Batch context, passed to the load/init helpers; `ctx.blocks` is
 *            searched for an initialisation block.
 * @returns Map from lower-cased contract address to its protocol state.
 */
private async initializeProtocolStates(ctx: any): Promise<Map<string, ProtocolStateUniv2>> {
  const statesByAddress = new Map<string, ProtocolStateUniv2>();
  const locateInitBlock = (protocol: ProtocolConfig) =>
    this.findInitializationBlock(ctx.blocks, protocol.fromBlock);

  for (const protocol of this.protocols) {
    const address = protocol.contractAddress.toLowerCase();

    // Whatever was stored by a previous batch.
    let config = await loadPoolConfigFromDb(ctx, address);
    let state = await loadPoolStateFromDb(ctx, address);
    let processState = await loadPoolProcessStateFromDb(ctx, address);
    const balances = await loadActiveBalancesFromDb(ctx, address);

    // Config: only attempted when an initialisation block is found in this batch.
    const configMissing = !config?.id;
    if (configMissing) {
      const initBlock = locateInitBlock(protocol);
      if (initBlock) {
        const seedConfig = config || new PoolConfig({});
        config = await initPoolConfigIfNeeded(ctx, initBlock, address, seedConfig, protocol);
      }
    }

    // State: depends on a usable config, so it is checked after the step above.
    const stateMissing = !state?.id;
    if (stateMissing && config?.id) {
      const initBlock = locateInitBlock(protocol);
      if (initBlock) {
        const seedState = state || new PoolState({});
        state = await initPoolStateIfNeeded(ctx, initBlock, address, seedState, config);
      }
    }

    // Process state: needs no block, so it is always initialised when absent.
    const processStateMissing = !processState?.id;
    if (processStateMissing) {
      processState = await initPoolProcessStateIfNeeded(
        address,
        config || new PoolConfig({}),
        processState || new PoolProcessState({}),
      );
    }

    const protocolState: ProtocolStateUniv2 = {
      config: config || new PoolConfig({}),
      state: state || new PoolState({}),
      processState: processState || new PoolProcessState({}),
      activeBalances: balances || new Map<string, ActiveBalance>(),
      balanceWindows: [],
      transactions: [],
    };
    statesByAddress.set(address, protocolState);
  }

  return statesByAddress;
}
Key Dependencies:
  • loadPoolConfigFromDb() - Loads token metadata from database
  • loadPoolStateFromDb() - Loads current reserves and supply
  • loadPoolProcessStateFromDb() - Loads processing state (e.g. lastInterpolatedTs)
  • loadActiveBalancesFromDb() - Loads user balance state
  • initPoolConfigIfNeeded() - Initializes pool configuration from on-chain data
  • initPoolStateIfNeeded() - Initializes pool state from blockchain
  • initPoolProcessStateIfNeeded() - Creates processing state

💰 Swap Event Processing

/**
 * Records one Swap log as a priced transaction on `protocolState.transactions`.
 *
 * The token side used for the amount and USD volume is token0 when
 * `protocol.preferredTokenCoingeckoId === 'token0'`, otherwise token1.
 *
 * @param ctx           Batch context (not read by this method).
 * @param block         Block containing the log; its header supplies timestamp, height and hash.
 * @param log           Swap log; `log.transaction` supplies `gasPrice` and `gasUsed`.
 * @param protocol      Configuration of the pool that emitted the log.
 * @param protocolState Mutable state of that pool; the new transaction is appended to it.
 */
private async processSwapEvent(
  ctx: any,
  block: any,
  log: any,
  protocol: ProtocolConfig,
  protocolState: ProtocolStateUniv2,
): Promise<void> {
  // Step 1: Decode event
  const { sender, amount0In, amount0Out, amount1In, amount1Out } = univ2Abi.events.Swap.decode(log);

  // Step 2: Total amount moved per token side (in + out)
  const token0Amount = amount0In + amount0Out;
  const token1Amount = amount1In + amount1Out;

  // Step 3: Calculate gas fee in USD
  const { gasPrice, gasUsed } = log.transaction;
  const gasFee = Number(gasUsed) * Number(gasPrice);
  const displayGasFee = gasFee / 10 ** 18;
  const ethPriceUsd = await fetchHistoricalUsd(
    'ethereum',
    block.header.timestamp,
    this.env.coingeckoApiKey,
  );
  const gasFeeUsd = displayGasFee * ethPriceUsd;

  // Step 4: Pick the preferred side once, then price the swap volume with it
  const usesToken0 = protocol.preferredTokenCoingeckoId === 'token0';
  const preferredToken = usesToken0 ? protocolState.config.token0 : protocolState.config.token1;
  const preferredAmount = usesToken0 ? token0Amount : token1Amount;
  const pricedSwapVolume = await computePricedSwapVolume(
    preferredAmount,
    preferredToken.coingeckoId as string,
    preferredToken.decimals,
    block.header.timestamp,
    this.env.coingeckoApiKey,
  );

  // Human-readable amount. BigInt division truncates, so the whole and
  // fractional parts are converted separately; otherwise any swap below one
  // full token would be reported as 0. The divisor is built with BigInt
  // exponentiation because 10 ** decimals is not exact as a double above 22.
  const rawUnits = BigInt(preferredAmount);
  const unitsPerToken = 10n ** BigInt(preferredToken.decimals);
  const displayAmount =
    Number(rawUnits / unitsPerToken) + Number(rawUnits % unitsPerToken) / Number(unitsPerToken);

  // Step 5: Create transaction schema with detailed token info
  const transactionSchema = {
    eventType: MessageType.TRANSACTION,
    eventName: 'Swap',
    tokens: {
      // NOTE(review): a chain short name is stored under `token0Symbol` — confirm this is intended.
      token0Symbol: {
        value: ChainShortName.HEMI,
        type: 'string',
      },
      // ... extensive token metadata
    },
    rawAmount: preferredAmount.toString(),
    displayAmount,
    unixTimestampMs: block.header.timestamp,
    txHash: log.transactionHash,
    logIndex: log.logIndex,
    blockNumber: block.header.height,
    blockHash: block.header.hash,
    userId: sender,
    currency: Currency.USD,
    valueUsd: pricedSwapVolume,
    gasUsed: Number(gasUsed),
    gasFeeUsd: gasFeeUsd,
  };

  protocolState.transactions.push(transactionSchema);
}

Transfer Event Processing

/**
 * Handles one LP-token Transfer log: prices the transferred amount and lets
 * `processValueChange` update the active balances, then appends the windows
 * it returns to `protocolState.balanceWindows`.
 *
 * @param ctx           Batch context, forwarded to `computeLpTokenPrice`.
 * @param block         Block containing the log; its header supplies timestamp and height.
 * @param log           Transfer log to decode.
 * @param protocol      Configuration of the pool that emitted the log.
 * @param protocolState Mutable state of that pool.
 */
private async processTransferEvent(
  ctx: any,
  block: any,
  log: any,
  protocol: ProtocolConfig,
  protocolState: ProtocolStateUniv2,
): Promise<void> {
  // Decode the log into sender, recipient and LP-token amount.
  const transfer = univ2Abi.events.Transfer.decode(log);
  const { timestamp, height } = block.header;

  // LP token price plus the pool figures it was derived from.
  const pricing = await computeLpTokenPrice(
    ctx,
    block,
    protocolState.config,
    protocolState.state,
    this.env.coingeckoApiKey,
    timestamp,
  );

  // USD value of the transferred LP tokens.
  const lpDecimals = protocolState.config.lpToken.decimals;
  const transferUsdValue = pricePosition(pricing.price, transfer.value, lpDecimals);

  // Apply the balance change and collect the windows it closes.
  const closedWindows = processValueChange({
    from: transfer.from,
    to: transfer.to,
    amount: transfer.value,
    usdValue: transferUsdValue,
    blockTimestamp: timestamp,
    blockHeight: height,
    txHash: log.transactionHash,
    activeBalances: protocolState.activeBalances,
    windowDurationMs: this.refreshWindow,
    tokenPrice: pricing.price,
    tokenDecimals: lpDecimals,
    tokens: {
      // Extensive metadata including all pricing data
      lpTokenPrice: { value: pricing.price.toString(), type: 'string' },
      token0Price: { value: pricing.token0Price.toString(), type: 'string' },
      reserve0: { value: pricing.reserve0.toString(), type: 'string' },
      // ... complete token and pool data
    },
    contractAddress: protocol.contractAddress,
  });

  protocolState.balanceWindows.push(...closedWindows);
}
Key Utilities:
  • computeLpTokenPrice() - Calculates LP token value from reserves and CoinGecko prices
  • pricePosition() - Converts LP token amount to USD value
  • processValueChange() - Handles balance updates and creates time windows

⏰ Periodic Balance Flush

/**
 * Emits EXHAUSTED balance windows for every refresh-window boundary that has
 * passed since `processState.lastInterpolatedTs`, for each user with a
 * positive balance, and advances `lastInterpolatedTs` boundary by boundary.
 *
 * On the first call (no `lastInterpolatedTs` yet) the marker is set to the
 * current block timestamp.
 *
 * @param ctx           Batch context, forwarded to `computeLpTokenPrice`.
 * @param block         Block being processed; its header supplies timestamp and height.
 * @param protocolState Mutable pool state; balance windows, active balances and
 *                      the process state are updated in place.
 */
private async processPeriodicBalanceFlush(
  ctx: any,
  block: any,
  protocolState: ProtocolStateUniv2,
): Promise<void> {
  const currentTs = block.header.timestamp;

  // Initialize if first time. Stored as bigint so the field has the same
  // type here as when it is advanced at the end of each window below.
  if (!protocolState.processState?.lastInterpolatedTs) {
    protocolState.processState.lastInterpolatedTs = BigInt(currentTs);
  }

  // Every pricing call in this method receives the same block, config, state
  // and timestamp, so the result is fetched lazily and at most once instead of
  // once per user per window.
  let lpPricing: Awaited<ReturnType<typeof computeLpTokenPrice>> | undefined;

  // Process all missed windows using while loop
  while (
    protocolState.processState.lastInterpolatedTs &&
    Number(protocolState.processState.lastInterpolatedTs) + this.refreshWindow < currentTs
  ) {
    const windowsSinceEpoch = Math.floor(
      Number(protocolState.processState.lastInterpolatedTs) / this.refreshWindow,
    );
    // Boundaries are multiples of the refresh window counted from timestamp 0.
    const nextBoundaryTs: number = (windowsSinceEpoch + 1) * this.refreshWindow;

    // Create balance windows for all active users
    for (const [userAddress, data] of protocolState.activeBalances.entries()) {
      const oldStart = data.updatedBlockTs;
      if (data.balance > 0n && oldStart < nextBoundaryTs) {
        if (!lpPricing) {
          lpPricing = await computeLpTokenPrice(
            ctx, block, protocolState.config, protocolState.state,
            this.env.coingeckoApiKey, currentTs,
          );
        }
        const { price: lpTokenPrice /* ... other prices */ } = lpPricing;

        const lpTokenSwapUsdValue = pricePosition(
          lpTokenPrice,
          data.balance,
          protocolState.config.lpToken.decimals,
        );

        // Create time window for this period
        protocolState.balanceWindows.push({
          userAddress: userAddress,
          deltaAmount: 0,  // No change, just time passage
          trigger: TimeWindowTrigger.EXHAUSTED,
          startTs: oldStart,
          endTs: nextBoundaryTs,
          windowDurationMs: this.refreshWindow,
          balanceBefore: data.balance.toString(),
          balanceAfter: data.balance.toString(),  // Same balance
          valueUsd: lpTokenSwapUsdValue,
          // ... complete token metadata
        });

        // Update balance timestamp so the next window starts at this boundary
        protocolState.activeBalances.set(userAddress, {
          balance: data.balance,
          updatedBlockTs: nextBoundaryTs,
          updatedBlockHeight: block.header.height,
        });
      }
    }

    // Move to next window
    protocolState.processState.lastInterpolatedTs = BigInt(nextBoundaryTs);
  }
}
Key Features:
  • Uses while loop to handle multiple missed windows
  • Tracks lastInterpolatedTs for precise timing
  • Uses TimeWindowTrigger.EXHAUSTED for periodic flushes
  • Updates balance timestamps without changing amounts
Example of EXHAUSTED Event:
  • User adds $1000 LP tokens at 10:00 AM (window starts)
  • No transfers happen for 6 hours
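  • Shortly after 4:00 PM, the system processes a new block (assume the refresh window is long enough that 4:00 PM is the first window boundary after 10:00 AM)
  • Creates EXHAUSTED event: startTs: 10:00 AM, endTs: 4:00 PM (the window boundary, not the block time), deltaAmount: 0, valueUsd: $1000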
  • This captures $1000 × 6 hours = $6000 USD-hours of time-weighted balance
  • User's balance remains $1000, but timestamp updates to 4:00 PM for next window

Finalization & API Transmission

/**
 * Closes out a batch for every configured pool: converts the collected
 * balance windows and transactions to API payloads, sends them, then
 * persists the pool entities and the serialised active balances.
 *
 * Pools whose config lacks an id or any of its three tokens are skipped with
 * a warning.
 *
 * @param ctx            Batch context; `ctx.store` is used for the upserts.
 * @param protocolStates Per-pool state keyed by lower-cased contract address.
 */
private async finalizeBatch(
  ctx: any,
  protocolStates: Map<string, ProtocolStateUniv2>,
): Promise<void> {
  // Upserts `entity` only when `present` is truthy.
  const upsertWhen = async (present: unknown, entity: unknown): Promise<void> => {
    if (present) await ctx.store.upsert(entity);
  };

  for (const protocol of this.protocols) {
    const poolKey = protocol.contractAddress.toLowerCase();
    const protocolState = protocolStates.get(poolKey)!;
    const { config } = protocolState;

    // Nothing can be reported for a pool whose config never got initialised.
    const configReady = Boolean(config.id && config.token0 && config.token1 && config.lpToken);
    if (!configReady) {
      logger.warn(`Skipping finalize for ${protocol.contractAddress} - config not initialized`);
      continue;
    }

    // Each transform receives its own copy of the protocol description.
    const describeProtocol = () => ({ ...protocol, type: this.protocolType });
    const balances = toTimeWeightedBalance(
      protocolState.balanceWindows,
      describeProtocol(),
      this.env,
      this.chainConfig,
    );
    const transactions = toTransaction(
      protocolState.transactions,
      describeProtocol(),
      this.env,
      this.chainConfig,
    );

    // Transmit first, balances before transactions.
    await this.apiClient.send(balances);
    await this.apiClient.send(transactions);

    // Persist entities: the three tokens, then config, state and process state.
    await upsertWhen(config.token0, config.token0);
    await upsertWhen(config.token1, config.token1);
    await upsertWhen(config.lpToken, config.lpToken);
    await upsertWhen(config.id, config);
    await upsertWhen(protocolState.state.id, protocolState.state);
    await upsertWhen(protocolState.processState.id, protocolState.processState);

    // Active balances are stored as one JSON document per pool.
    const activeBalancesRecord = new ActiveBalances({
      id: `${poolKey}-active-balances`,
      activeBalancesMap: mapToJson(protocolState.activeBalances),
    });
    await ctx.store.upsert(activeBalancesRecord);
  }
}
Key Utilities:
  • toTimeWeightedBalance() - Transforms balance windows to API format
  • toTransaction() - Transforms transaction events to API format
  • mapToJson() - Serializes active balances for database storage