Skip to content

Event Indexer

Index and query blockchain events efficiently.

event-indexer.ts
import { createPublicClient, webSocket } from 'viem'
import { riseTestnet } from 'viem/chains'
import { watchShreds } from 'shreds/viem'
import { PrismaClient } from '@prisma/client'
 
// Shared Prisma client (DB writes) and a WebSocket-backed viem client
// (chain reads + subscriptions) used by the indexer below.
const prisma = new PrismaClient()

const wsClient = createPublicClient({
  transport: webSocket(),
  chain: riseTestnet
})
 
class EventIndexer {
  /** Max block span per getLogs call — many RPC providers reject or truncate large ranges. */
  private static readonly MAX_BLOCK_RANGE = 10_000n

  /**
   * Entry point: back-fills any blocks missed while offline, then
   * subscribes to confirmed logs and shreds for live updates.
   */
  async start(): Promise<void> {
    console.log('Starting event indexer...')

    // Catch up on historical events first so the DB is current before streaming.
    await this.indexHistoricalEvents()

    // Watch for new events (subscriptions run until the process exits).
    this.watchNewEvents()
  }

  /** Back-fills events from the last indexed block up to the current head. */
  private async indexHistoricalEvents(): Promise<void> {
    // The two reads are independent — fetch them in parallel.
    const [latestBlock, lastIndexed] = await Promise.all([
      wsClient.getBlockNumber(),
      this.getLastIndexedBlock()
    ])

    if (lastIndexed >= latestBlock) return

    console.log(`Indexing blocks ${lastIndexed + 1n} to ${latestBlock}`)

    // Fetch in bounded chunks: a single getLogs over a huge range is
    // routinely rejected by RPC providers. Progress is persisted per chunk
    // so a crash mid-backfill resumes where it left off.
    let from = lastIndexed + 1n
    while (from <= latestBlock) {
      const chunkEnd = from + EventIndexer.MAX_BLOCK_RANGE - 1n
      const to = chunkEnd < latestBlock ? chunkEnd : latestBlock

      const logs = await wsClient.getLogs({
        fromBlock: from,
        toBlock: to
      })

      await this.processLogs(logs)
      await this.updateLastIndexedBlock(to)

      from = to + 1n
    }
  }

  /** Subscribes to confirmed logs, and to shreds for sub-block latency. */
  private watchNewEvents(): void {
    // Watch all confirmed contract events.
    wsClient.watchEvent({
      onLogs: async (logs) => {
        try {
          await this.processLogs(logs)
        } catch (error) {
          // An async onLogs rejection is otherwise unhandled — log and keep watching.
          console.error('Error processing watched logs:', error)
        }
      },
      onError: (error) => {
        console.error('Event watching error:', error)
      }
    })

    // Watch shreds for instant updates - avoid RPC calls inside!
    watchShreds(wsClient, {
      onShred: (shred) => {
        console.log(`Processing shred at block ${shred.blockNumber}, index ${shred.shredIndex}`)

        // Build rows only from the data the shred already carries (no RPC).
        const logsToIndex: any[] = []

        for (const tx of shred.transactions) {
          console.log(`Transaction ${tx.hash} with ${tx.logs.length} logs`)

          // Collect logs for one batched insert.
          tx.logs.forEach((log, index) => {
            logsToIndex.push({
              blockNumber: shred.blockNumber,
              // NOTE(review): this is NOT the real block hash — shreds here
              // don't provide one. Confirm downstream consumers don't rely on it.
              blockHash: `0x${shred.blockNumber.toString(16)}`, // Placeholder
              transactionHash: tx.hash,
              address: log.address,
              topics: log.topics,
              data: log.data,
              logIndex: shred.startingLogIndex + index
            })
          })
        }

        // Fire-and-forget so DB latency never blocks shred delivery;
        // `void` marks the promise as intentionally unawaited.
        if (logsToIndex.length > 0) {
          void this.processLogs(logsToIndex).catch(error => {
            console.error('Error processing shred logs:', error)
          })
        }
      },
      onError: (error) => {
        console.error('Shred watching error:', error)
      }
    })
  }

  /**
   * Persists a batch of logs in a single insert.
   * Accepts both viem Log objects and the shred-derived rows built in
   * watchNewEvents, hence the loose element type.
   */
  private async processLogs(logs: any[]): Promise<void> {
    if (logs.length === 0) return

    // One createMany instead of N create round-trips (matches the doc's
    // "batch inserts" claim). skipDuplicates makes re-delivery — the same
    // log arriving via a shred and again via the confirmed block —
    // idempotent; it only takes effect if the schema declares a matching
    // unique constraint (e.g. on [transactionHash, logIndex]) — verify.
    await prisma.event.createMany({
      data: logs.map(log => ({
        blockNumber: Number(log.blockNumber),
        blockHash: log.blockHash,
        transactionHash: log.transactionHash,
        address: log.address,
        topics: log.topics,
        data: log.data,
        logIndex: log.logIndex
      })),
      skipDuplicates: true
    })

    console.log(`Indexed ${logs.length} events`)
  }

  /** Reads the backfill cursor; 0n when the indexer has never run. */
  private async getLastIndexedBlock(): Promise<bigint> {
    const last = await prisma.metadata.findUnique({
      where: { key: 'lastIndexedBlock' }
    })
    return BigInt(last?.value || '0')
  }

  /** Advances the backfill cursor (stored as a decimal string). */
  private async updateLastIndexedBlock(block: bigint): Promise<void> {
    await prisma.metadata.upsert({
      where: { key: 'lastIndexedBlock' },
      create: { key: 'lastIndexedBlock', value: block.toString() },
      update: { value: block.toString() }
    })
  }
}
 
// Run indexer — start() is async; without a catch, a startup failure
// (DB down, RPC unreachable) becomes a silent unhandled rejection.
const indexer = new EventIndexer()
indexer.start().catch(error => {
  console.error('Indexer failed to start:', error)
  process.exit(1)
})

Database Schema

// schema.prisma
model Event {
  id              String   @id @default(cuid())
  blockNumber     Int
  blockHash       String
  transactionHash String
  address         String
  topics          String[]
  data            String
  logIndex        Int
  createdAt       DateTime @default(now())
  
  @@index([address])
  @@index([blockNumber])
  @@index([transactionHash])
  // A log is uniquely identified by its transaction + log index. Without
  // this constraint the two ingestion paths (shreds + confirmed logs)
  // can insert the same event twice; with it, createMany(skipDuplicates)
  // makes re-delivery idempotent.
  @@unique([transactionHash, logIndex])
}
 
// Generic key/value store for indexer bookkeeping,
// e.g. the 'lastIndexedBlock' backfill cursor.
model Metadata {
  key   String @id
  value String
}

Key Features

  • Historical Sync: Indexes past events on startup
  • Real-Time Updates: Processes new events instantly via shreds
  • Efficient Processing: Batch inserts for better performance
  • Error Recovery: Continues indexing after errors

Query Examples

// All events emitted by one contract, newest block first
const contractEvents = await prisma.event.findMany({
  where: { address: '0x742d35Cc6634C0532925a3b844Bc9e7595f6E98d' },
  orderBy: { blockNumber: 'desc' }
})
 
// Events whose topics contain the ERC-20 Transfer signature hash
const transferTopic = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
const transfers = await prisma.event.findMany({
  where: { topics: { has: transferTopic } }
})
 
// Events inside an inclusive block-number window
const rangeEvents = await prisma.event.findMany({
  where: { blockNumber: { gte: 1000000, lte: 1001000 } }
})

Performance Tips

  1. Avoid RPC in Shreds: Use provided data instead of making calls
  2. Batch Operations: Insert multiple events at once
  3. Index Wisely: Only index events you need
  4. Use Filters: Filter events at the source when possible

Advanced Features

// Decode and index specific event types
import { decodeEventLog } from 'viem'
 
// Minimal ABI fragment for the ERC-20 Transfer event; `as const` keeps
// the literal types so decodeEventLog can infer the argument names.
const erc20TransferAbi = [{
  type: 'event',
  name: 'Transfer',
  inputs: [
    { indexed: true, name: 'from', type: 'address' },
    { indexed: true, name: 'to', type: 'address' },
    { indexed: false, name: 'value', type: 'uint256' }
  ]
}] as const
 
// In processLogs method
if (log.topics[0] === transferTopic) {
  // Decode the raw topics/data against the Transfer ABI fragment.
  const { args } = decodeEventLog({
    abi: erc20TransferAbi,
    data: log.data,
    topics: log.topics
  })
  
  // Persist the decoded transfer; the uint256 amount is stored as a string.
  await prisma.transfer.create({
    data: {
      from: args.from,
      to: args.to,
      value: args.value.toString(),
      transactionHash: log.transactionHash
    }
  })
}

Next Steps

  • Add GraphQL API for queries
  • Implement event notifications
  • Create analytics dashboards
  • Add support for custom decoders