Event Indexer
Index and query blockchain events efficiently.
event-indexer.ts
import { createPublicClient, webSocket } from 'viem'
import { riseTestnet } from 'viem/chains'
import { watchShreds } from 'shreds/viem'
import { PrismaClient } from '@prisma/client'
// Public client used for both log queries and live subscriptions.
// No URL is passed to webSocket(), so the endpoint presumably comes from the
// riseTestnet chain definition — confirm against your viem version.
const wsClient = createPublicClient({
  transport: webSocket(),
  chain: riseTestnet
})

// Single Prisma client reused for every database call in this file.
const prisma = new PrismaClient()
/**
 * Minimal log shape the indexer persists. Both the logs returned by the
 * client and the provisional logs assembled from shreds are handled through
 * this shape.
 */
interface IndexableLog {
  blockNumber: bigint
  blockHash: string
  transactionHash: string
  address: string
  topics: readonly string[]
  data: string
  logIndex: number
}

/**
 * Indexes event logs into the `event` table.
 *
 * On start it backfills every block after the persisted checkpoint, then
 * subscribes to confirmed logs and to shreds. Shred logs are written
 * immediately as provisional rows (with a placeholder block hash); when the
 * matching confirmed log arrives, the existing row is patched in place
 * instead of being inserted a second time.
 */
class EventIndexer {
  /**
   * Blocks requested per `getLogs` call during backfill.
   * NOTE(review): tune to the range limit of the RPC provider in use.
   */
  private static readonly BACKFILL_CHUNK_SIZE = 2000n

  /** Upper bound on remembered provisional keys, so memory stays bounded if confirmations never arrive. */
  private static readonly MAX_PROVISIONAL_KEYS = 50000

  /** Keys (`transactionHash:logIndex`) of rows written from shreds and not yet confirmed. */
  private readonly provisionalKeys = new Set<string>()

  /** Tail of the serialized write chain; live writes run one at a time, in arrival order. */
  private writeQueue: Promise<void> = Promise.resolve()

  /**
   * Backfills missed blocks, then starts the live watchers.
   *
   * NOTE(review): logs mined after the backfill reads the chain head but
   * before the subscriptions are established are not picked up until the next
   * restart, and the live path never advances the persisted checkpoint.
   */
  async start(): Promise<void> {
    console.log('Starting event indexer...')
    // Index historical events
    await this.indexHistoricalEvents()
    // Watch for new events
    this.watchNewEvents()
  }

  /**
   * Indexes every block after the persisted checkpoint up to the current head.
   * Works in fixed-size block ranges and persists the checkpoint after each
   * range, so an interrupted backfill resumes where it stopped instead of
   * starting over.
   */
  private async indexHistoricalEvents(): Promise<void> {
    const latestBlock: bigint = await wsClient.getBlockNumber()
    let fromBlock = (await this.getLastIndexedBlock()) + 1n
    if (fromBlock > latestBlock) {
      return
    }

    console.log(`Indexing blocks ${fromBlock} to ${latestBlock}`)
    while (fromBlock <= latestBlock) {
      const chunkEnd = fromBlock + EventIndexer.BACKFILL_CHUNK_SIZE - 1n
      const toBlock = chunkEnd < latestBlock ? chunkEnd : latestBlock
      const logs = await wsClient.getLogs({ fromBlock, toBlock })
      await this.processLogs(logs)
      await this.updateLastIndexedBlock(toBlock)
      fromBlock = toBlock + 1n
    }
  }

  /** Subscribes to confirmed logs and to shreds; both feed the serialized write queue. */
  private watchNewEvents(): void {
    // Watch all contract events
    wsClient.watchEvent({
      onLogs: (logs) => {
        // The watcher does not await this callback, so failures must be handled here.
        this.enqueue(() => this.reconcileConfirmedLogs(logs)).catch((error) => {
          console.error('Error processing confirmed logs:', error)
        })
      },
      onError: (error) => {
        console.error('Event watching error:', error)
      }
    })

    // Watch shreds for instant updates - avoid RPC calls inside!
    watchShreds(wsClient, {
      onShred: (shred) => {
        console.log(`Processing shred at block ${shred.blockNumber}, index ${shred.shredIndex}`)
        // Process shred transactions efficiently using provided data
        const provisionalLogs: IndexableLog[] = []
        // Running index across ALL transactions of the shred. A per-transaction
        // index would hand the same logIndex to logs of different transactions.
        // Assumes startingLogIndex is the index of the shred's first log — TODO confirm.
        let nextLogIndex = shred.startingLogIndex
        for (const tx of shred.transactions) {
          console.log(`Transaction ${tx.hash} with ${tx.logs.length} logs`)
          // Collect logs for batch processing
          for (const log of tx.logs) {
            provisionalLogs.push({
              blockNumber: shred.blockNumber,
              blockHash: `0x${shred.blockNumber.toString(16)}`, // Placeholder until the confirmed log arrives
              transactionHash: tx.hash,
              address: log.address,
              topics: log.topics,
              data: log.data,
              logIndex: nextLogIndex
            })
            nextLogIndex += 1
          }
        }
        // Queue the write without awaiting so shred handling is never blocked
        if (provisionalLogs.length > 0) {
          this.enqueue(() => this.indexProvisionalLogs(provisionalLogs)).catch((error) => {
            console.error('Error processing shred logs:', error)
          })
        }
      },
      onError: (error) => {
        console.error('Shred watching error:', error)
      }
    })
  }

  /**
   * Runs `task` after every previously queued write has settled.
   * The returned promise rejects if `task` fails; the queue itself keeps going.
   */
  private enqueue(task: () => Promise<void>): Promise<void> {
    const result = this.writeQueue.then(task)
    this.writeQueue = result.catch(() => undefined)
    return result
  }

  /** Identity of a log within the chain, used to pair shred logs with confirmed logs. */
  private static logKey(log: IndexableLog): string {
    return `${log.transactionHash}:${log.logIndex}`
  }

  /** Inserts shred logs and remembers their keys so the confirmed copies are not inserted again. */
  private async indexProvisionalLogs(logs: readonly IndexableLog[]): Promise<void> {
    await this.processLogs(logs)
    // Keys are recorded only after a successful insert: if the insert failed,
    // the confirmed log must still be written as a new row.
    for (const log of logs) {
      this.provisionalKeys.add(EventIndexer.logKey(log))
    }
    // A Set iterates in insertion order, so the first key is the oldest.
    while (this.provisionalKeys.size > EventIndexer.MAX_PROVISIONAL_KEYS) {
      const oldest = this.provisionalKeys.values().next().value
      if (oldest === undefined) {
        break
      }
      this.provisionalKeys.delete(oldest)
    }
  }

  /**
   * Handles confirmed logs: rows already written from a shred get their real
   * block number and hash, everything else is inserted as a new batch.
   */
  private async reconcileConfirmedLogs(logs: readonly IndexableLog[]): Promise<void> {
    const unseen: IndexableLog[] = []
    for (const log of logs) {
      const key = EventIndexer.logKey(log)
      if (!this.provisionalKeys.has(key)) {
        unseen.push(log)
        continue
      }
      await prisma.event.updateMany({
        where: { transactionHash: log.transactionHash, logIndex: log.logIndex },
        data: { blockNumber: Number(log.blockNumber), blockHash: log.blockHash }
      })
      this.provisionalKeys.delete(key)
    }
    await this.processLogs(unseen)
  }

  /**
   * Persists `logs` with a single batched insert. Does nothing for an empty list.
   * Rejects with the Prisma error if the insert fails.
   */
  private async processLogs(logs: readonly IndexableLog[]): Promise<void> {
    if (logs.length === 0) {
      return
    }
    await prisma.event.createMany({
      data: logs.map((log) => ({
        blockNumber: Number(log.blockNumber),
        blockHash: log.blockHash,
        transactionHash: log.transactionHash,
        address: log.address,
        topics: [...log.topics],
        data: log.data,
        logIndex: log.logIndex
      }))
    })
    console.log(`Indexed ${logs.length} events`)
  }

  /** Returns the persisted checkpoint, or 0n when none has been stored yet. */
  private async getLastIndexedBlock(): Promise<bigint> {
    const last = await prisma.metadata.findUnique({
      where: { key: 'lastIndexedBlock' }
    })
    return BigInt(last?.value ?? '0')
  }

  /** Stores `block` as the checkpoint (as a decimal string), creating the row on first use. */
  private async updateLastIndexedBlock(block: bigint): Promise<void> {
    const value = block.toString()
    await prisma.metadata.upsert({
      where: { key: 'lastIndexedBlock' },
      create: { key: 'lastIndexedBlock', value },
      update: { value }
    })
  }
}
// Run indexer
const indexer = new EventIndexer()
// start() returns a promise. Without a handler, a failed startup surfaces only
// as an unhandled rejection, so report it and shut down deliberately instead.
indexer.start().catch(async (error: unknown) => {
  console.error('Event indexer failed to start:', error)
  try {
    await prisma.$disconnect()
  } finally {
    process.exit(1)
  }
})
Database Schema
// schema.prisma
/// One row per indexed log, as written by the indexer's processLogs method.
/// NOTE(review): nothing here prevents the same log from being stored twice;
/// a uniqueness rule on (transactionHash, logIndex) would, but the writer must
/// tolerate duplicates before one is added.
model Event {
id String @id @default(cuid())
/// Block number of the log; the indexer converts the bigint with Number() before writing.
blockNumber Int
/// Block hash. Rows built from shreds carry a placeholder derived from the block number.
blockHash String
transactionHash String
/// Emitting contract address, stored exactly as the log provided it.
address String
/// Log topics in order; topics[0] is what the examples compare against an event signature hash.
topics String[]
/// Log data payload, stored as provided.
data String
logIndex Int
createdAt DateTime @default(now())
/// Secondary indexes on address, blockNumber and transactionHash.
@@index([address])
@@index([blockNumber])
@@index([transactionHash])
}
/// Key/value store for indexer state. The indexer keeps its resume point under
/// the key 'lastIndexedBlock', with the block number stored as a decimal string.
model Metadata {
key String @id
value String
}
Key Features
- Historical Sync: On startup, indexes past events from the block after the last indexed one (stored in Metadata) up to the current chain head
- Real-Time Updates: Processes new events instantly via shreds
- Efficient Processing: Batch inserts for better performance
- Error Recovery: Continues indexing after errors
Query Examples
// Find all events for a specific contract.
// Addresses are stored exactly as the log provided them (nodes typically
// return lowercase hex — verify for your RPC), so match case-insensitively
// rather than relying on the checksum casing of this literal.
const contractEvents = await prisma.event.findMany({
  where: {
    address: {
      equals: '0x742d35Cc6634C0532925a3b844Bc9e7595f6E98d',
      mode: 'insensitive'
    }
  },
  // Newest first; logIndex breaks ties between events of the same block.
  orderBy: [{ blockNumber: 'desc' }, { logIndex: 'desc' }]
})
// Find events by topic (e.g., Transfer events).
// `has` matches the value at ANY topic position, not only topics[0].
const transferTopic = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
const transfers = await prisma.event.findMany({
  where: {
    topics: {
      has: transferTopic
    }
  }
})
// Get events in block range (both bounds inclusive)
const rangeEvents = await prisma.event.findMany({
  where: {
    blockNumber: {
      gte: 1000000,
      lte: 1001000
    }
  }
})
Performance Tips
- Avoid RPC Calls in Shred Handlers: Use the data delivered with each shred instead of making extra requests
- Batch Operations: Insert multiple events at once
- Index Wisely: Only index events you need
- Use Filters: Filter events at the source when possible
Advanced Features
// Decode and index specific event types
import { decodeEventLog } from 'viem'
// ABI fragment for the ERC-20 Transfer event: `from` and `to` are indexed, `value` is not.
const erc20TransferAbi = [{
  type: 'event',
  name: 'Transfer',
  inputs: [
    { indexed: true, name: 'from', type: 'address' },
    { indexed: true, name: 'to', type: 'address' },
    { indexed: false, name: 'value', type: 'uint256' }
  ]
}] as const
// In processLogs method
// An ERC-20 Transfer has exactly three topics (signature, from, to).
// ERC-721 Transfer shares the same signature hash but also indexes tokenId
// (four topics); decoding it with the ERC-20 ABI throws, so skip such logs.
// NOTE(review): prisma.transfer needs a Transfer model, which the schema shown
// on this page does not define — add it before using this snippet.
if (log.topics[0] === transferTopic && log.topics.length === 3) {
  const decoded = decodeEventLog({
    abi: erc20TransferAbi,
    data: log.data,
    topics: log.topics
  })
  await prisma.transfer.create({
    data: {
      from: decoded.args.from,
      to: decoded.args.to,
      // uint256 does not fit a JS number, so the amount is stored as a decimal string.
      value: decoded.args.value.toString(),
      transactionHash: log.transactionHash
    }
  })
}
Next Steps
- Add GraphQL API for queries
- Implement event notifications
- Create analytics dashboards
- Add support for custom decoders