awaitTxId(txid, timeout?)
awaitMatch(matchFn, timeout?)
Electric collections provide seamless integration between TanStack DB and ElectricSQL, enabling real-time data synchronization with your Postgres database through Electric's sync engine.
The @tanstack/electric-db-collection package allows you to create collections that:
npm install @tanstack/electric-db-collection @tanstack/react-db
npm install @tanstack/electric-db-collection @tanstack/react-db
import { createCollection } from '@tanstack/react-db'
import { electricCollectionOptions } from '@tanstack/electric-db-collection'
const todosCollection = createCollection(
electricCollectionOptions({
shapeOptions: {
url: '/api/todos',
},
getKey: (item) => item.id,
})
)
import { createCollection } from '@tanstack/react-db'
import { electricCollectionOptions } from '@tanstack/electric-db-collection'
const todosCollection = createCollection(
electricCollectionOptions({
shapeOptions: {
url: '/api/todos',
},
getKey: (item) => item.id,
})
)
The electricCollectionOptions function accepts the following options:
shapeOptions: Configuration for the ElectricSQL ShapeStream
getKey: Function to extract the unique key from an item
Handlers are called before mutations to persist changes to your backend:
Each handler should return { txid } to wait for synchronization. For cases where your API can not return txids, use the awaitMatch utility function.
Handlers persist mutations to the backend and wait for Electric to sync the changes back. This prevents UI glitches where optimistic updates would be removed and then re-added. TanStack DB blocks sync data until the mutation is confirmed, ensuring smooth user experience.
The recommended approach uses PostgreSQL transaction IDs (txids) for precise matching. The backend returns a txid, and the client waits for that specific txid to appear in the Electric stream.
const todosCollection = createCollection(
electricCollectionOptions({
id: 'todos',
schema: todoSchema,
getKey: (item) => item.id,
shapeOptions: {
url: '/api/todos',
params: { table: 'todos' },
},
onInsert: async ({ transaction }) => {
const newItem = transaction.mutations[0].modified
const response = await api.todos.create(newItem)
// Return txid to wait for sync
return { txid: response.txid }
},
onUpdate: async ({ transaction }) => {
const { original, changes } = transaction.mutations[0]
const response = await api.todos.update({
where: { id: original.id },
data: changes
})
return { txid: response.txid }
}
})
)
const todosCollection = createCollection(
electricCollectionOptions({
id: 'todos',
schema: todoSchema,
getKey: (item) => item.id,
shapeOptions: {
url: '/api/todos',
params: { table: 'todos' },
},
onInsert: async ({ transaction }) => {
const newItem = transaction.mutations[0].modified
const response = await api.todos.create(newItem)
// Return txid to wait for sync
return { txid: response.txid }
},
onUpdate: async ({ transaction }) => {
const { original, changes } = transaction.mutations[0]
const response = await api.todos.update({
where: { id: original.id },
data: changes
})
return { txid: response.txid }
}
})
)
For cases where txids aren't available, use the awaitMatch utility function to wait for synchronization with custom matching logic:
import { isChangeMessage } from '@tanstack/electric-db-collection'
const todosCollection = createCollection(
electricCollectionOptions({
id: 'todos',
getKey: (item) => item.id,
shapeOptions: {
url: '/api/todos',
params: { table: 'todos' },
},
onInsert: async ({ transaction, collection }) => {
const newItem = transaction.mutations[0].modified
await api.todos.create(newItem)
// Use awaitMatch utility for custom matching
await collection.utils.awaitMatch(
(message) => {
return isChangeMessage(message) &&
message.headers.operation === 'insert' &&
message.value.text === newItem.text
},
5000 // timeout in ms (optional, defaults to 3000)
)
}
})
)
import { isChangeMessage } from '@tanstack/electric-db-collection'
const todosCollection = createCollection(
electricCollectionOptions({
id: 'todos',
getKey: (item) => item.id,
shapeOptions: {
url: '/api/todos',
params: { table: 'todos' },
},
onInsert: async ({ transaction, collection }) => {
const newItem = transaction.mutations[0].modified
await api.todos.create(newItem)
// Use awaitMatch utility for custom matching
await collection.utils.awaitMatch(
(message) => {
return isChangeMessage(message) &&
message.headers.operation === 'insert' &&
message.value.text === newItem.text
},
5000 // timeout in ms (optional, defaults to 3000)
)
}
})
)
For quick prototyping or when you're confident about timing, you can use a simple timeout. This is crude but works as almost always the data will be synced back in under 2 seconds:
const todosCollection = createCollection(
electricCollectionOptions({
id: 'todos',
getKey: (item) => item.id,
shapeOptions: {
url: '/api/todos',
params: { table: 'todos' },
},
onInsert: async ({ transaction }) => {
const newItem = transaction.mutations[0].modified
await api.todos.create(newItem)
// Simple timeout approach
await new Promise(resolve => setTimeout(resolve, 2000))
}
})
)
const todosCollection = createCollection(
electricCollectionOptions({
id: 'todos',
getKey: (item) => item.id,
shapeOptions: {
url: '/api/todos',
params: { table: 'todos' },
},
onInsert: async ({ transaction }) => {
const newItem = transaction.mutations[0].modified
await api.todos.create(newItem)
// Simple timeout approach
await new Promise(resolve => setTimeout(resolve, 2000))
}
})
)
On the backend, you can extract the txid for a transaction by querying Postgres directly.
async function generateTxId(tx) {
// The ::xid cast strips off the epoch, giving you the raw 32-bit value
// that matches what PostgreSQL sends in logical replication streams
// (and then exposed through Electric which we'll match against
// in the client).
const result = await tx.execute(
sql`SELECT pg_current_xact_id()::xid::text as txid`
)
const txid = result.rows[0]?.txid
if (txid === undefined) {
throw new Error(`Failed to get transaction ID`)
}
return parseInt(txid as string, 10)
}
async function generateTxId(tx) {
// The ::xid cast strips off the epoch, giving you the raw 32-bit value
// that matches what PostgreSQL sends in logical replication streams
// (and then exposed through Electric which we'll match against
// in the client).
const result = await tx.execute(
sql`SELECT pg_current_xact_id()::xid::text as txid`
)
const txid = result.rows[0]?.txid
if (txid === undefined) {
throw new Error(`Failed to get transaction ID`)
}
return parseInt(txid as string, 10)
}
Electric is typically deployed behind a proxy server that handles shape configuration, authentication and authorization. This provides better security and allows you to control what data users can access without exposing Electric to the client.
Here is an example proxy implementation using TanStack Starter:
import { createServerFileRoute } from "@tanstack/react-start/server"
import { ELECTRIC_PROTOCOL_QUERY_PARAMS } from "@electric-sql/client"
// Electric URL
const baseUrl = 'http://.../v1/shape'
const serve = async ({ request }: { request: Request }) => {
// ...check user authorization
const url = new URL(request.url)
const originUrl = new URL(baseUrl)
// passthrough parameters from electric client
url.searchParams.forEach((value, key) => {
if (ELECTRIC_PROTOCOL_QUERY_PARAMS.includes(key)) {
originUrl.searchParams.set(key, value)
}
})
// set shape parameters
// full spec: https://github.com/electric-sql/electric/blob/main/website/electric-api.yaml
originUrl.searchParams.set("table", "todos")
// Where clause to filter rows in the table (optional).
// originUrl.searchParams.set("where", "completed = true")
// Select the columns to sync (optional)
// originUrl.searchParams.set("columns", "id,text,completed")
const response = await fetch(originUrl)
const headers = new Headers(response.headers)
headers.delete("content-encoding")
headers.delete("content-length")
return new Response(response.body, {
status: response.status,
statusText: response.statusText,
headers,
})
}
export const ServerRoute = createServerFileRoute("/api/todos").methods({
GET: serve,
})
import { createServerFileRoute } from "@tanstack/react-start/server"
import { ELECTRIC_PROTOCOL_QUERY_PARAMS } from "@electric-sql/client"
// Electric URL
const baseUrl = 'http://.../v1/shape'
const serve = async ({ request }: { request: Request }) => {
// ...check user authorization
const url = new URL(request.url)
const originUrl = new URL(baseUrl)
// passthrough parameters from electric client
url.searchParams.forEach((value, key) => {
if (ELECTRIC_PROTOCOL_QUERY_PARAMS.includes(key)) {
originUrl.searchParams.set(key, value)
}
})
// set shape parameters
// full spec: https://github.com/electric-sql/electric/blob/main/website/electric-api.yaml
originUrl.searchParams.set("table", "todos")
// Where clause to filter rows in the table (optional).
// originUrl.searchParams.set("where", "completed = true")
// Select the columns to sync (optional)
// originUrl.searchParams.set("columns", "id,text,completed")
const response = await fetch(originUrl)
const headers = new Headers(response.headers)
headers.delete("content-encoding")
headers.delete("content-length")
return new Response(response.body, {
status: response.status,
statusText: response.statusText,
headers,
})
}
export const ServerRoute = createServerFileRoute("/api/todos").methods({
GET: serve,
})
For more advanced use cases, you can create custom actions that can do multiple mutations across collections transactionally. You can use the utility methods to wait for synchronization with different strategies:
const addTodoAction = createOptimisticAction({
onMutate: ({ text }) => {
// optimistically insert with a temporary ID
const tempId = crypto.randomUUID()
todosCollection.insert({
id: tempId,
text,
completed: false,
created_at: new Date(),
})
// ... mutate other collections
},
mutationFn: async ({ text }) => {
const response = await api.todos.create({
data: { text, completed: false }
})
// Wait for the specific txid
await todosCollection.utils.awaitTxId(response.txid)
}
})
const addTodoAction = createOptimisticAction({
onMutate: ({ text }) => {
// optimistically insert with a temporary ID
const tempId = crypto.randomUUID()
todosCollection.insert({
id: tempId,
text,
completed: false,
created_at: new Date(),
})
// ... mutate other collections
},
mutationFn: async ({ text }) => {
const response = await api.todos.create({
data: { text, completed: false }
})
// Wait for the specific txid
await todosCollection.utils.awaitTxId(response.txid)
}
})
import { isChangeMessage } from '@tanstack/electric-db-collection'
const addTodoAction = createOptimisticAction({
onMutate: ({ text }) => {
const tempId = crypto.randomUUID()
todosCollection.insert({
id: tempId,
text,
completed: false,
created_at: new Date(),
})
},
mutationFn: async ({ text }) => {
await api.todos.create({
data: { text, completed: false }
})
// Wait for matching message
await todosCollection.utils.awaitMatch(
(message) => {
return isChangeMessage(message) &&
message.headers.operation === 'insert' &&
message.value.text === text
}
)
}
})
import { isChangeMessage } from '@tanstack/electric-db-collection'
const addTodoAction = createOptimisticAction({
onMutate: ({ text }) => {
const tempId = crypto.randomUUID()
todosCollection.insert({
id: tempId,
text,
completed: false,
created_at: new Date(),
})
},
mutationFn: async ({ text }) => {
await api.todos.create({
data: { text, completed: false }
})
// Wait for matching message
await todosCollection.utils.awaitMatch(
(message) => {
return isChangeMessage(message) &&
message.headers.operation === 'insert' &&
message.value.text === text
}
)
}
})
The collection provides these utility methods via collection.utils:
Manually wait for a specific transaction ID to be synchronized:
// Wait for specific txid
await todosCollection.utils.awaitTxId(12345)
// With custom timeout (default is 30 seconds)
await todosCollection.utils.awaitTxId(12345, 10000)
// Wait for specific txid
await todosCollection.utils.awaitTxId(12345)
// With custom timeout (default is 30 seconds)
await todosCollection.utils.awaitTxId(12345, 10000)
This is useful when you need to ensure a mutation has been synchronized before proceeding with other operations.
Manually wait for a custom match function to find a matching message:
import { isChangeMessage } from '@tanstack/electric-db-collection'
// Wait for a specific message pattern
await todosCollection.utils.awaitMatch(
(message) => {
return isChangeMessage(message) &&
message.headers.operation === 'insert' &&
message.value.text === 'New Todo'
},
5000 // timeout in ms
)
import { isChangeMessage } from '@tanstack/electric-db-collection'
// Wait for a specific message pattern
await todosCollection.utils.awaitMatch(
(message) => {
return isChangeMessage(message) &&
message.headers.operation === 'insert' &&
message.value.text === 'New Todo'
},
5000 // timeout in ms
)
The package exports helper functions for use in custom match functions:
import { isChangeMessage, isControlMessage } from '@tanstack/electric-db-collection'
// Use in custom match functions
const matchFn = (message) => {
if (isChangeMessage(message)) {
return message.headers.operation === 'insert'
}
return false
}
import { isChangeMessage, isControlMessage } from '@tanstack/electric-db-collection'
// Use in custom match functions
const matchFn = (message) => {
if (isChangeMessage(message)) {
return message.headers.operation === 'insert'
}
return false
}
A frequent issue developers encounter is that awaitTxId (or the transaction's isPersisted.promise) stalls indefinitely, eventually timing out with no error messages. The data persists correctly to the database, but the optimistic mutation never resolves.
Root Cause: This happens when the transaction ID (txid) returned from your API doesn't match the actual transaction ID of the mutation in Postgres. This mismatch occurs when you query pg_current_xact_id() outside the same transaction that performs the mutation.
To diagnose txid issues, enable debug logging in your browser console:
localStorage.debug = 'ts/db:electric'
localStorage.debug = 'ts/db:electric'
This will show you when mutations start waiting for txids and when txids arrive from Electric's sync stream.
This is powered by the debug package.
When txids DON'T match (common bug):
ts/db:electric awaitTxId called with txid 124
ts/db:electric new txids synced from pg [123]
// Stalls forever - 124 never arrives!
ts/db:electric awaitTxId called with txid 124
ts/db:electric new txids synced from pg [123]
// Stalls forever - 124 never arrives!
In this example, the mutation happened in transaction 123, but you queried pg_current_xact_id() in a separate transaction (124) that ran after the mutation. The client waits for 124 which will never arrive.
When txids DO match (correct):
ts/db:electric awaitTxId called with txid 123
ts/db:electric new txids synced from pg [123]
ts/db:electric awaitTxId found match for txid 123
// Resolves immediately!
ts/db:electric awaitTxId called with txid 123
ts/db:electric new txids synced from pg [123]
ts/db:electric awaitTxId found match for txid 123
// Resolves immediately!
You must call pg_current_xact_id() inside the same transaction as your mutation:
❌ Wrong - txid queried outside transaction:
// DON'T DO THIS
async function createTodo(data) {
const txid = await generateTxId(sql) // Wrong: separate transaction
await sql.begin(async (tx) => {
await tx`INSERT INTO todos ${tx(data)}`
})
return { txid } // This txid won't match!
}
// DON'T DO THIS
async function createTodo(data) {
const txid = await generateTxId(sql) // Wrong: separate transaction
await sql.begin(async (tx) => {
await tx`INSERT INTO todos ${tx(data)}`
})
return { txid } // This txid won't match!
}
✅ Correct - txid queried inside transaction:
// DO THIS
async function createTodo(data) {
let txid!: Txid
const result = await sql.begin(async (tx) => {
// Call generateTxId INSIDE the transaction
txid = await generateTxId(tx)
const [todo] = await tx`
INSERT INTO todos ${tx(data)}
RETURNING *
`
return todo
})
return { todo: result, txid } // txid matches the mutation
}
async function generateTxId(tx: any): Promise<Txid> {
const result = await tx`SELECT pg_current_xact_id()::xid::text as txid`
const txid = result[0]?.txid
if (txid === undefined) {
throw new Error(`Failed to get transaction ID`)
}
return parseInt(txid, 10)
}
// DO THIS
async function createTodo(data) {
let txid!: Txid
const result = await sql.begin(async (tx) => {
// Call generateTxId INSIDE the transaction
txid = await generateTxId(tx)
const [todo] = await tx`
INSERT INTO todos ${tx(data)}
RETURNING *
`
return todo
})
return { todo: result, txid } // txid matches the mutation
}
async function generateTxId(tx: any): Promise<Txid> {
const result = await tx`SELECT pg_current_xact_id()::xid::text as txid`
const txid = result[0]?.txid
if (txid === undefined) {
throw new Error(`Failed to get transaction ID`)
}
return parseInt(txid, 10)
}
See working examples in:
Your weekly dose of JavaScript news. Delivered every Monday to over 100,000 devs, for free.