Realtime — API Reference
supabase-realtime opens a single WebSocket connection to a Supabase project’s
Realtime service and multiplexes many Phoenix channels over it. Each channel can
carry three kinds of traffic: postgres changes (rows inserted, updated or
deleted in your database), broadcast (client-to-client messages, including
raw binary frames), and presence (who is currently on the channel). The
socket speaks the Phoenix protocol at version 2.0.0, in which every frame is an
array [join_ref, ref, topic, event, payload] — the array encoding is what lets
binary broadcasts travel as real binary frames rather than base64 text.
The connection auto-reconnects with exponential backoff (and jitter) when it
drops, replaying its channel joins, and exposes its lifecycle as a StateFlow.
- Maven artifact:
io.github.androidpoet:supabase-realtime - Entry point:
createRealtimeClient(client)
Methods that touch the network are either Result-first (SupabaseResult<T>) or
suspend until the operation settles. Query methods (getSubscription,
getActiveChannelNames, …) are synchronous snapshots, and event streams are
exposed as Flow/StateFlow. Each symbol below notes which it is.
Creating a client
createRealtimeClient
fun createRealtimeClient(
supabaseClient: SupabaseClient,
config: RealtimeConfig = RealtimeConfig(),
engineFactory: HttpClientEngineFactory<*> = platformEngine(),
): RealtimeClientThe module entry point. Builds a RealtimeClient bound to a
SupabaseClient.
supabaseClient— the core client; supplies the project URL, API key and the current session token used for channel authorization.config— reconnect and heartbeat tuning; seeRealtimeConfig.engineFactory— the Ktor engine used for the WebSocket. Defaults to the platform’s WebSocket-capable engine; pass your own (e.g. a mock engine) in tests.
Returns a RealtimeClient. The socket is not opened here — it connects lazily on
the first subscription, or eagerly when you call connect().
val realtime = createRealtimeClient(client)RealtimeConfig
data class RealtimeConfig(
val autoReconnect: Boolean = true,
val initialReconnectDelayMs: Long = 1_000L,
val maxReconnectDelayMs: Long = 30_000L,
val backoffMultiplier: Double = 2.0,
val reconnectJitter: Boolean = true,
val maxReconnectAttempts: Int = 0,
val heartbeatIntervalMs: Long = 25_000L,
val connectionTimeoutMs: Long = 10_000L,
val logLevel: String? = null,
)Tuning for reconnect and heartbeat behavior. Defaults are production-sane (exponential backoff with jitter, 25s heartbeat); override only what you need.
autoReconnect— reconnect automatically when the socket drops.initialReconnectDelayMs— delay before the first reconnect attempt.maxReconnectDelayMs— upper bound the backoff is clamped to.backoffMultiplier— factor the delay grows by after each failed attempt.reconnectJitter— whentrue, the computed backoff is randomized between half and the full exponential delay (“equal jitter”), so a fleet of clients that all dropped at once don’t reconnect in lockstep and stampede the server.maxReconnectAttempts— give up after this many tries (0= retry forever), transitioning toConnectionState.Failed.heartbeatIntervalMs— how often to send a Phoenix heartbeat to keep the socket alive.connectionTimeoutMs— how long to wait for a connection or channel join to settle before failing it.logLevel— server-side log verbosity, passed as thelog_levelquery param when opening the socket (e.g.info,warn,error);nullomits the param and leaves the server default in effect.
val realtime = createRealtimeClient(
client,
RealtimeConfig(heartbeatIntervalMs = 15_000L, maxReconnectAttempts = 10),
)Connection lifecycle
RealtimeClient
interface RealtimeClientOne WebSocket connection to /realtime/v1/ multiplexing many Phoenix channels.
Build channels with channel, and the socket connects lazily on the
first subscription. Subscriptions are deduplicated by topic — subscribing to the
same channel name twice returns the existing one. The members below cover the
connection; channel and subscription members are documented in later sections.
connectionState
val connectionState: StateFlow<ConnectionState>Cold-start-safe StateFlow of the socket lifecycle; emits the current
ConnectionState on collection and on every transition.
Drive reconnect indicators off this.
realtime.connectionState.collect { state ->
println("realtime: $state")
}isConnected / isConnecting / isDisconnecting
val isConnected: Boolean
val isConnecting: Boolean
val isDisconnecting: BooleanSynchronous snapshots of the socket state: isConnected is true in
ConnectionState.Connected; isConnecting is true while establishing or
re-establishing a connection; isDisconnecting is true while tearing the
connection down.
connect
suspend fun connect()Opens the WebSocket eagerly. Optional — the socket connects on the first
subscribe; call this to pre-warm it or to reconnect after
disconnect(). Suspends until the attempt settles.
realtime.connect()disconnect
suspend fun disconnect()Closes the WebSocket but keeps the client (and its engine) reusable: active
subscriptions are retained and rejoined on the next connect() or subscription.
For a permanent teardown that frees the engine, use close.
close
suspend fun close()Disconnects and releases the underlying HTTP/WebSocket engine. The client cannot
be reused afterwards. Call this when you are done with the client to avoid
leaking the engine’s connection pool and threads. Prefer disconnect() if you
intend to connect() again later.
setAuth
suspend fun setAuth(token: String? = null)Updates the auth token used for channel authorization and rejoins active channels
with it (Phoenix access_token), so RLS-gated postgres_changes and private
channels stay authorized after a session refresh. Pass null to fall back to the
wrapped client’s current session token.
realtime.setAuth(newAccessToken)sendHeartbeat
suspend fun sendHeartbeat()Sends one Phoenix heartbeat immediately, outside the periodic schedule. Rarely
needed — heartbeats are sent automatically per RealtimeConfig.
awaitConnected
suspend fun RealtimeClient.awaitConnected(): ConnectionState.ConnectedSuspends until connectionState reaches ConnectionState.Connected and returns
it — useful right after connect() to gate setup work.
realtime.connect()
realtime.awaitConnected()awaitDisconnected
suspend fun RealtimeClient.awaitDisconnected(): ConnectionState.DisconnectedSuspends until connectionState reaches ConnectionState.Disconnected and
returns it — useful to confirm teardown after disconnect().
ConnectionState
sealed interface ConnectionState {
data object Disconnected : ConnectionState
data object Connecting : ConnectionState
data object Connected : ConnectionState
data object Disconnecting : ConnectionState
data class Reconnecting(val attempt: Int, val nextRetryMs: Long) : ConnectionState
data class Failed(val reason: String, val attempts: Int) : ConnectionState
}The lifecycle state of the Realtime WebSocket, exposed as
connectionState. Reconnecting and Failed carry the
attempt count from the backoff loop.
Disconnected— no socket open and no reconnect in progress (the initial state, and the resting state afterdisconnect()).Connecting— the first connection attempt is in flight.Connected— the socket is open and joins can proceed.Disconnecting— a graceful shutdown is in progress.Reconnecting— the socket dropped and an automatic reconnect is scheduled;attemptis the 1-based retry number andnextRetryMsthe backoff delay before it.Failed— reconnection gave up afterattemptstries (whenmaxReconnectAttemptsis exceeded), withreasondescribing the last failure. A terminal state until a manualconnect().
when (val s = realtime.connectionState.value) {
is ConnectionState.Reconnecting -> showRetry(s.attempt, s.nextRetryMs)
is ConnectionState.Failed -> showError(s.reason)
else -> Unit
}Channels and subscriptions
A channel is built and configured before joining; a subscription is the live handle you get back after joining.
channel
fun RealtimeClient.channel(name: String): RealtimeChannelBuilderBegins configuring a channel topic name (the part after realtime:), returning
a RealtimeChannelBuilder on which to register
postgres_changes/broadcast/presence callbacks before subscribe(). Building
a channel does not join it.
val sub = realtime.channel("room1")
.onBroadcast("cursor") { payload -> /* … */ }
.subscribe()subscribe (extension)
suspend fun RealtimeClient.subscribe(
channelName: String,
configure: RealtimeChannelBuilder.() -> Unit = {},
): RealtimeSubscriptionOne-call channel subscribe: builds the channel, applies configure (where you
register onPostgresChange/onBroadcast/onPresence and channel options), and
joins it. The lambda form of channel(name) + subscribe(); returns as soon as
the join is sent.
val sub = realtime.subscribe("room1") {
onBroadcast("cursor") { payload -> render(payload) }
}Querying active subscriptions
These are synchronous snapshots — no network, no suspension.
getSubscription
fun getSubscription(name: String): RealtimeSubscription?Returns the active subscription for channel name, or null if not subscribed.
getSubscriptionByTopic
fun getSubscriptionByTopic(topic: String): RealtimeSubscription?Returns the active subscription for the fully-qualified Phoenix topic (e.g.
realtime:room1), or null.
getSubscriptions
fun getSubscriptions(): Set<RealtimeSubscription>Returns a snapshot of all currently active subscriptions.
getActiveChannelNames
fun getActiveChannelNames(): Set<String>Returns the names of all currently subscribed channels.
getActiveChannels
fun getActiveChannels(): Set<RealtimeChannel>Returns name+topic details for all active channels. See
RealtimeChannel.
realtime.getActiveChannelNames() // e.g. setOf("room1", "room2")Removing subscriptions
Each remove* call unsubscribes (leaves the Phoenix channel, status
UNSUBSCRIBED) and drops the subscription from the client. All suspend until the
leave is sent; the socket stays open.
removeSubscription
suspend fun removeSubscription(subscription: RealtimeSubscription)
suspend fun removeSubscription(name: String)Removes a single subscription, either by its handle or by channel name.
realtime.removeSubscription("room1")removeSubscriptions
suspend fun removeSubscriptions(subscriptions: List<RealtimeSubscription>)Removes each subscription in the list.
removeSubscriptionByTopic
suspend fun removeSubscriptionByTopic(topic: String)Removes the subscription on the fully-qualified Phoenix topic (e.g.
realtime:room1), if any.
removeSubscriptionsByTopic
suspend fun removeSubscriptionsByTopic(topics: List<String>)Removes the subscriptions on each of topics.
removeAllSubscriptions
suspend fun removeAllSubscriptions()Unsubscribes and removes every active subscription, leaving the socket open.
RealtimeChannelBuilder
class RealtimeChannelBuilderConfigures one Realtime channel — its postgres_changes, broadcast and
presence callbacks plus channel options — before joining it. Obtained from
channel(name); the onXxx/configureXxx methods return this so they chain,
and nothing is sent until subscribe() (or subscribeWithResult()).
onPostgresChange
fun onPostgresChange(
schema: String = "public",
table: String? = null,
filter: String? = null,
event: PostgresChangeEvent = PostgresChangeEvent.ALL,
callback: suspend (JsonObject) -> Unit,
): RealtimeChannelBuilder
fun onPostgresChange(
schema: String = "public",
table: String? = null,
filter: String? = null,
event: PostgresChangeEvent = PostgresChangeEvent.ALL,
callback: suspend (PostgresChangeEvent, JsonObject) -> Unit,
): RealtimeChannelBuilderSubscribes to database postgres_changes and delivers each affected row as a raw
JsonObject (the new row for INSERT/UPDATE, the old row for DELETE).
schema— database schema, defaultpublic.table— table to scope to;nullwatches the whole schema.filter— a singlecolumn=op.valuestring; build it withrealtimeFilterrather than by hand. Realtime allows only one filter per subscription.event— narrow to one change type, defaultALL.callback— receives the row; the second overload also passes the resolvedPostgresChangeEventso one handler can branch on the change type.
For typed rows, see the reified onPostgresChange
extension. Returns the builder for chaining.
realtime.channel("rooms")
.onPostgresChange(table = "messages", event = PostgresChangeEvent.INSERT) { row ->
println("new message: $row")
}
.subscribe()onPostgresChange (typed)
inline fun <reified T> RealtimeChannelBuilder.onPostgresChange(
schema: String = "public",
table: String? = null,
filter: String? = null,
event: PostgresChangeEvent = PostgresChangeEvent.ALL,
crossinline onRow: suspend (T) -> Unit,
): RealtimeChannelBuilderLike the raw onPostgresChange, but decodes each changed row into your model T
before calling onRow. Rows that don’t deserialize into T (e.g. a partial
DELETE payload missing non-replica-identity columns) are skipped rather than
throwing, so one bad event can’t tear down the subscription. Returns the builder.
@Serializable data class Message(val id: Long, val body: String)
realtime.channel("rooms")
.onPostgresChange<Message>(table = "messages") { msg -> store(msg) }
.subscribe()onBroadcast
fun onBroadcast(
event: String,
callback: suspend (JsonObject) -> Unit,
): RealtimeChannelBuilderRegisters callback for broadcast messages whose event name equals event,
delivering the payload as a JsonObject. Send broadcasts with
RealtimeSubscription.broadcast or
RealtimeClient.broadcast. Returns the builder.
onPresence
fun onPresence(
callback: suspend (PresenceState) -> Unit,
): RealtimeChannelBuilderRegisters callback for presence sync, invoked with the full cumulative
PresenceState (every tracked member, keyed by presence key) on
each presence_state/presence_diff. Publish your own state with
track. Returns the builder.
configureBroadcast
fun configureBroadcast(
receiveOwnBroadcasts: Boolean = false,
acknowledgeBroadcasts: Boolean = false,
): RealtimeChannelBuilderTunes broadcast behavior for this channel: receiveOwnBroadcasts echoes the
sender’s own messages back to it (off by default), and acknowledgeBroadcasts
asks the server to ack each broadcast (required for
broadcastWithAck). Returns the builder.
configureBroadcastReplay
fun configureBroadcastReplay(
sinceMs: Long,
limit: Int? = null,
): RealtimeChannelBuilderRequests replay of recent broadcasts on join: messages from the last sinceMs
milliseconds, capped at limit if given. Lets a late joiner catch up on missed
broadcasts the server still retains (replayed events arrive with replayed = true).
Returns the builder.
configurePresence
fun configurePresence(key: String = ""): RealtimeChannelBuilderSets the presence key identifying this client among channel members (defaults
to a server-assigned key). Use a stable per-user key to dedupe a user across
reconnects or devices. Returns the builder.
configurePrivate
fun configurePrivate(enabled: Boolean = true): RealtimeChannelBuilderMarks the channel as private, so the server applies Realtime authorization (RLS)
using the client’s session JWT. Required for RLS-protected postgres_changes and
private broadcast/presence channels. Returns the builder.
subscribe
suspend fun subscribe(): RealtimeSubscriptionJoins the Phoenix channel with the configured callbacks and returns its
RealtimeSubscription, connecting the socket first if
needed. Returns as soon as the join is sent — the subscription may still be
SUBSCRIBING; use subscribeWithResult() or awaitSubscribed
to wait for the server’s reply.
subscribeWithResult
suspend fun subscribeWithResult(): SupabaseResult<RealtimeSubscription>Like subscribe(), but suspends until the channel join resolves and reports the
outcome as a SupabaseResult instead of handing back a subscription that may
still be joining. Returns SupabaseResult.Failure if the join is rejected by the
server or times out (after connectionTimeoutMs).
when (val result = realtime.channel("room1").subscribeWithResult()) {
is SupabaseResult.Success -> use(result.data)
is SupabaseResult.Failure -> handle(result.error)
}RealtimeSubscription
interface RealtimeSubscriptionA live subscription to one joined channel: the handle returned by subscribe()
for observing events and sending on the channel. It stays valid across reconnects
— the client rejoins automatically.
channelName
val channelName: StringThe channel name (the part after realtime:).
status
val status: StateFlow<Status>The channel’s join lifecycle as a StateFlow. Await SUBSCRIBED or use
awaitSubscribed.
asFlow
fun asFlow(): Flow<RealtimeEvent>A hot Flow of every decoded inbound RealtimeEvent for
this channel — postgres changes, broadcasts and presence events interleaved. This
is a shared, best-effort multicast, not a cold flow: collecting it does not
start or stop the subscription, and events delivered before a collector attaches
are not replayed (replay = 0). Under backpressure a slow collector loses oldest
events (signaled by RealtimeDebugEvent.InboundEventDropped)
rather than stalling the socket. For one event kind, prefer the typed flows
below.
sub.asFlow().collect { event ->
when (event) {
is RealtimeEvent.Broadcast -> render(event.payload)
else -> Unit
}
}presenceState
fun presenceState(): PresenceStateThe current cumulative presence state — every member currently tracked on this
channel, keyed by presence key — as of the last presence_state/presence_diff.
Returns an empty map before the first sync.
send
suspend fun send(type: SendType, event: String, payload: JsonObject? = null)Low-level send of a Phoenix message of type with the given event name and
optional payload on this channel. Prefer broadcast/track for the common
cases; reach for this only for events they don’t cover.
broadcast
suspend fun broadcast(event: String, payload: JsonObject)Broadcasts payload under the event name to other subscribers of this channel
(and to this client too if receiveOwnBroadcasts was set). Fire-and-forget;
received by onBroadcast handlers / broadcastFlow.
sub.broadcast("cursor", buildJsonObject { put("x", 12); put("y", 40) })broadcastBinary
suspend fun broadcastBinary(event: String, payload: ByteArray)Broadcasts a raw binary payload under the event name, sent over the WebSocket
as a binary frame instead of JSON (enabled by the Phoenix 2.0.0 array frames).
Received as RealtimeEvent.BinaryBroadcast /
binaryBroadcastFlow. Prefer this over base64-in-broadcast
for compact, high-frequency or non-textual data (sensor streams, image frames,
encrypted bytes). Fire-and-forget: if the socket is momentarily disconnected the
frame is dropped rather than buffered. Requires the channel to be subscribed to
reach other clients.
sub.broadcastBinary("frame", imageBytes)broadcastWithAck
suspend fun broadcastWithAck(
event: String,
payload: JsonObject,
timeoutMillis: Long = 5_000,
): SupabaseResult<Unit>Broadcasts payload like broadcast, but awaits the server’s acknowledgement
instead of returning fire-and-forget. Requires the channel to have been configured
with acknowledgeBroadcasts (see configureBroadcast).
Returns SupabaseResult.Success once the server acknowledges, or
SupabaseResult.Failure if the channel is not subscribed, the server rejects the
push, the connection drops before the ack arrives, or no ack is received within
timeoutMillis. Never throws for these expected failures; genuine caller
cancellation still propagates.
track
suspend fun track(state: JsonObject)Publishes this client’s presence state on the channel, making it visible to
other members’ presence handlers under this subscription’s presence key. Call
again to replace the tracked state.
sub.track(buildJsonObject { put("user", "ada"); put("online_at", now) })untrack
suspend fun untrack()Removes this client’s presence from the channel (the inverse of track),
emitting a leave to other members.
unsubscribe
suspend fun unsubscribe()Leaves the channel (UNSUBSCRIBED) and stops delivery; ends asFlow collectors.
Equivalent to RealtimeClient.removeSubscription for this
one.
RealtimeSubscription.Status
enum class Status { SUBSCRIBING, SUBSCRIBED, UNSUBSCRIBING, UNSUBSCRIBED, ERROR }The lifecycle of the channel join, observed via status.
RealtimeSubscription.SendType
enum class SendType(val wireValue: String) {
BROADCAST("broadcast"),
PRESENCE("presence"),
POSTGRES_CHANGES("postgres_changes"),
}The kind of message send dispatches, with its Phoenix wireValue.
awaitSubscribed
suspend fun RealtimeSubscription.awaitSubscribed(
timeoutMs: Long = 10_000,
): RealtimeSubscription.StatusSuspends until this subscription reaches SUBSCRIBED (or ERROR) and returns
that status, throwing TimeoutCancellationException after timeoutMs. Turns the
fire-and-forget subscribe() into an await.
val sub = realtime.channel("room1").subscribe()
sub.awaitSubscribed()awaitUnsubscribed
suspend fun RealtimeSubscription.awaitUnsubscribed(
timeoutMs: Long = 10_000,
): RealtimeSubscription.StatusSuspends until this subscription reaches UNSUBSCRIBED (or ERROR) and returns
that status, throwing TimeoutCancellationException after timeoutMs. Use to
confirm a leave completed after unsubscribe().
Postgres changes
Beyond the builder callbacks above, these one-call helpers and filter DSL cover the common “watch this table” cases.
subscribeToPostgresChanges
suspend fun RealtimeClient.subscribeToPostgresChanges(
channelName: String,
schema: String = "public",
table: String? = null,
filter: String? = null,
event: PostgresChangeEvent = PostgresChangeEvent.ALL,
callback: suspend (JsonObject) -> Unit,
): RealtimeSubscription
suspend fun RealtimeClient.subscribeToPostgresChanges(
channelName: String,
schema: String = "public",
table: String? = null,
filter: String? = null,
event: PostgresChangeEvent = PostgresChangeEvent.ALL,
callback: suspend (PostgresChangeEvent, JsonObject) -> Unit,
): RealtimeSubscription
suspend inline fun <reified T> RealtimeClient.subscribeToPostgresChanges(
channelName: String,
schema: String = "public",
table: String? = null,
filter: String? = null,
event: PostgresChangeEvent = PostgresChangeEvent.ALL,
crossinline onRow: suspend (T) -> Unit,
): RealtimeSubscriptionSubscribes to channelName and registers a single postgres_changes callback in one
call. The three overloads deliver, respectively: the raw row (JsonObject); the
resolved change type plus the row; and the row decoded into your model T (with
the same skip-on-mismatch semantics as the reified onPostgresChange). See
onPostgresChange for the parameters and realtimeFilter
for building filter.
val sub = realtime.subscribeToPostgresChanges<Message>(
channelName = "rooms",
table = "messages",
filter = realtimeFilter { eq("room_id", roomId) },
) { msg -> store(msg) }postgresInsertsFlow
fun RealtimeSubscription.postgresInsertsFlow(): Flow<JsonObject>Narrows asFlow to postgres_changes INSERTs, yielding each inserted row
(PostgresInsert.record).
postgresUpdatesFlow
fun RealtimeSubscription.postgresUpdatesFlow(): Flow<RealtimeEvent.PostgresUpdate>Narrows asFlow to postgres_changes UPDATEs as
RealtimeEvent.PostgresUpdate, carrying both new and old row
(when replica identity provides the old one).
postgresDeletesFlow
fun RealtimeSubscription.postgresDeletesFlow(): Flow<JsonObject>Narrows asFlow to postgres_changes DELETEs, yielding the deleted row
(PostgresDelete.oldRecord).
sub.postgresInsertsFlow().collect { row -> append(row) }realtimeFilter
inline fun realtimeFilter(block: RealtimeFilter.() -> Unit): String?Builds the single column=op.value filter string for a postgres_changes
subscription. Returns the built string, or null if no operator was set.
val f = realtimeFilter { gte("age", 18) } // "age=gte.18"RealtimeFilter
class RealtimeFilter {
fun eq(column: String, value: String)
fun eq(column: String, value: Number)
fun eq(column: String, value: Boolean)
fun neq(column: String, value: String)
fun neq(column: String, value: Number)
fun neq(column: String, value: Boolean)
fun gt(column: String, value: Number)
fun gte(column: String, value: Number)
fun lt(column: String, value: Number)
fun lte(column: String, value: Number)
fun isIn(column: String, values: List<String>)
fun build(): String?
}Builds the single server-side filter for a postgres_changes subscription
without hand-writing the wire string. Realtime supports exactly one filter per
subscription and a fixed operator set (eq, neq, gt, gte, lt, lte,
in) — setting more than one throws, as does a blank column.
eq/neq— equality and inequality, for strings, numbers or booleans.gt/gte/lt/lte— numeric comparisons.isIn—column IN (…), e.g.isIn("status", listOf("open", "pending")).build()— returns thecolumn=op.valuestring, ornullif no operator was set.
Values are passed through verbatim — the DSL does not quote or escape them. Avoid
values containing characters the column=op.value grammar is sensitive to,
notably a , inside an in (...) list and the =/. separators.
RealtimeFilterDsl
annotation class RealtimeFilterDsl@DslMarker for RealtimeFilter, so its member operators don’t leak into nested
scopes.
PostgresChange
@Serializable
data class PostgresChange(
val schema: String = "public",
val table: String? = null,
val filter: String? = null,
val event: PostgresChangeEvent = PostgresChangeEvent.ALL,
)A postgres_changes subscription descriptor: which schema/table to watch, for
which event type, optionally narrowed by a single filter (column=op.value,
built with realtimeFilter). Serializable.
PostgresChangeEvent
@Serializable
enum class PostgresChangeEvent { INSERT, UPDATE, DELETE, ALL }The database change type a postgres_changes subscription listens for. ALL
(the wire *) matches every type. Serializable, with each entry mapped to its
wire name (INSERT/UPDATE/DELETE/*).
Broadcast
Broadcast sends messages between clients on a channel, bypassing the database.
Sending over a subscription is covered above (broadcast,
broadcastBinary, broadcastWithAck);
the helpers here cover HTTP send, one-call subscribe, and consuming flows.
broadcast (over HTTP)
suspend fun RealtimeClient.broadcast(
channelName: String,
event: String,
payload: JsonObject,
private: Boolean = false,
): SupabaseResult<Unit>Sends a broadcast over HTTP to the realtime broadcast endpoint
(/realtime/v1/api/broadcast) without joining a channel or opening a WebSocket.
Useful for fire-and-forget server-to-client fan-out where the sender doesn’t need
to subscribe.
channelName— the channel name (without therealtime:prefix).event— the broadcast event name.payload— the message body.private— whentrue, targets a private channel; the caller’s session JWT (or apikey) is attached automatically.
Returns SupabaseResult<Unit>.
realtime.broadcast("room1", "notice", buildJsonObject { put("text", "hi") })subscribeToBroadcast
suspend fun RealtimeClient.subscribeToBroadcast(
channelName: String,
event: String,
callback: suspend (JsonObject) -> Unit,
): RealtimeSubscriptionSubscribes to channelName and registers a single broadcast callback for the named
event in one call.
broadcastFlow
fun RealtimeSubscription.broadcastFlow(event: String? = null): Flow<JsonObject>Narrows asFlow to broadcast payloads, optionally only those whose event name
equals event (all broadcasts when null). Yields each message’s JsonObject
payload — the typed view over RealtimeEvent.Broadcast.
sub.broadcastFlow("cursor").collect { payload -> moveCursor(payload) }binaryBroadcastFlow
fun RealtimeSubscription.binaryBroadcastFlow(event: String? = null): Flow<ByteArray>Narrows asFlow to binary broadcasts, optionally only those whose event name
equals event (all when null). Yields each message’s raw ByteArray payload —
the typed view over RealtimeEvent.BinaryBroadcast. Pair with
broadcastBinary for sending.
BroadcastPayload
@Serializable
data class BroadcastPayload(
val event: String,
val payload: JsonObject,
)A broadcast envelope pairing an event name with its payload; the shape of a
broadcast message body on the wire. Serializable.
Presence
Presence tracks which clients are currently on a channel. Publishing is covered
above (track / untrack); the helpers here cover one-call
subscribe and consuming flows.
subscribeToPresence
suspend fun RealtimeClient.subscribeToPresence(
channelName: String,
callback: suspend (PresenceState) -> Unit,
): RealtimeSubscriptionSubscribes to channelName and registers a single presence sync callback, invoked
with the full PresenceState.
presenceSyncFlow
fun RealtimeSubscription.presenceSyncFlow(): Flow<PresenceState>Narrows asFlow to presence syncs, yielding the full cumulative PresenceState
on each presence_state/presence_diff.
presenceJoinFlow
fun RealtimeSubscription.presenceJoinFlow(): Flow<RealtimeEvent.PresenceJoin>Narrows asFlow to presence join events, one per member that joins the channel.
presenceLeaveFlow
fun RealtimeSubscription.presenceLeaveFlow(): Flow<RealtimeEvent.PresenceLeave>Narrows asFlow to presence leave events, one per member that leaves the channel.
presenceDataFlow
inline fun <reified T> RealtimeSubscription.presenceDataFlow(
json: Json = Json { ignoreUnknownKeys = true },
ignoreDecodeErrors: Boolean = true,
): Flow<List<T>>Like presenceSyncFlow, but decodes each member’s presence payload into T,
emitting the list of decoded members on every sync.
json— the serializer used to decode each member payload.ignoreDecodeErrors— whentrue(default), members whose payload doesn’t fitTare skipped so one malformed member can’t break the flow; setfalseto fail fast withIllegalArgumentException.CancellationExceptionis always rethrown so collection stays cancellable.
@Serializable data class Member(val user: String)
sub.presenceDataFlow<Member>().collect { members -> showOnline(members) }PresenceState
typealias PresenceState = Map<String, JsonObject>Cumulative presence membership: each presence key mapped to that member’s last
tracked payload. Delivered by RealtimeEvent.PresenceSync and
RealtimeSubscription.presenceState(); published with track.
Events
RealtimeEvent
sealed interface RealtimeEvent {
data class PostgresInsert(
val record: JsonObject,
val oldRecord: JsonObject? = null,
val commitTimestamp: String? = null,
val schema: String? = null,
val table: String? = null,
) : RealtimeEvent
data class PostgresUpdate(
val record: JsonObject,
val oldRecord: JsonObject? = null,
val commitTimestamp: String? = null,
val schema: String? = null,
val table: String? = null,
) : RealtimeEvent
data class PostgresDelete(
val oldRecord: JsonObject,
val commitTimestamp: String? = null,
val schema: String? = null,
val table: String? = null,
) : RealtimeEvent
data class Broadcast(
val event: String,
val payload: JsonObject,
val replayed: Boolean = false,
) : RealtimeEvent
class BinaryBroadcast(
val event: String,
val payload: ByteArray,
val replayed: Boolean = false,
) : RealtimeEvent
data class PresenceSync(val state: PresenceState) : RealtimeEvent
data class PresenceJoin(val key: String, val newPresence: JsonObject) : RealtimeEvent
data class PresenceLeave(val key: String, val leftPresence: JsonObject) : RealtimeEvent
data class SystemEvent(val status: String, val message: String? = null) : RealtimeEvent
}A decoded inbound event on a Realtime channel, as delivered by asFlow. Spans the
three channel features plus channel SystemEvents. The typed flows above narrow
this union to one case so you can avoid the is-checks.
PostgresInsert— a row was inserted;recordis the new row.commitTimestampis the server commit time (ISO-8601),schema/tableidentify the relation; all three arenullif the server omits them.PostgresUpdate— a row was updated;recordis the new row andoldRecordthe previous one when the table’s replica identity provides it.PostgresDelete— a row was deleted;oldRecordis the removed row (may be partial, depending on replica identity).Broadcast— a broadcast message namedeventcarryingpayload.replayedistruewhen the server is replaying a retained broadcast to a late joiner.BinaryBroadcast— a broadcast namedeventcarrying raw binarypayload, delivered as a binary frame. Implements structuralequals/hashCodeover the byte content.replayedmirrorsBroadcast.replayed.PresenceSync—stateis the full cumulative membership after a sync.PresenceJoin/PresenceLeave— a member identified bykeyjoined (carryingnewPresence) or left (carrying its lastleftPresence).SystemEvent— a channel-level system message (e.g. subscription status changes), with an optional human-readablemessage.
systemEventFlow
fun RealtimeSubscription.systemEventFlow(status: String? = null): Flow<RealtimeEvent.SystemEvent>Narrows asFlow to channel system events, optionally only those with the given
status (all when null).
Models and diagnostics
RealtimeChannel
data class RealtimeChannel(val name: String, val topic: String)Identifies an active channel by its name and fully-qualified Phoenix topic
(e.g. realtime:room1); returned by getActiveChannels.
RealtimeMessage
@Serializable(with = RealtimeMessageSerializer::class)
data class RealtimeMessage(
val topic: String,
val event: String,
val payload: JsonObject,
val joinRef: String? = null,
val ref: String? = null,
)A raw Phoenix protocol frame. On the wire (Realtime Protocol 2.0.0) this is the
array [join_ref, ref, topic, event, payload]. Surfaced via
RealtimeDebugEvent for diagnostics; application code
normally works with decoded RealtimeEvents instead.
topic— the channel topic (e.g.realtime:room1) orphoenixfor socket control frames.event— the Phoenix event name (phx_join,phx_reply,broadcast, …).payload— the message body.joinRef— the ref of the join that scopes this message.ref— the per-message correlation ref used to match replies.
RealtimeClient.debugState
val debugState: StateFlow<RealtimeDebugState>Running counters (messages in/out, heartbeats, last refs) for diagnostics and
tests. See RealtimeDebugState.
RealtimeClient.debugEvents
val debugEvents: Flow<RealtimeDebugEvent>A hot stream of low-level protocol events (RealtimeDebugEvent)
— including dropped-message signals — for diagnostics; not needed for normal use.
RealtimeDebugState
data class RealtimeDebugState(
val outboundMessageCount: Long = 0,
val inboundMessageCount: Long = 0,
val heartbeatSentCount: Long = 0,
val heartbeatReceivedCount: Long = 0,
val lastOutboundRef: String? = null,
val lastInboundRef: String? = null,
)A snapshot of cumulative socket counters, exposed via debugState. Counts every
message and heartbeat sent and received since the client was created, plus the
last Phoenix message ref in each direction.
RealtimeDebugEvent
sealed interface RealtimeDebugEvent {
data class OutboundMessage(val message: RealtimeMessage) : RealtimeDebugEvent
data class OutboundMessageDropped(val message: RealtimeMessage, val capacity: Int) : RealtimeDebugEvent
data class InboundMessage(val message: RealtimeMessage) : RealtimeDebugEvent
data class InboundEventDropped(val topic: String, val event: RealtimeEvent, val capacity: Int) : RealtimeDebugEvent
data class HeartbeatSent(val ref: String) : RealtimeDebugEvent
data class HeartbeatReceived(val ref: String?) : RealtimeDebugEvent
}A low-level protocol event observed on the socket, streamed by debugEvents.
Covers every raw RealtimeMessage in and out, heartbeats, and the buffer-overflow
drop signals — for diagnostics and tests, not normal application logic.
OutboundMessage/InboundMessage— a raw frame sent to / received from the server.OutboundMessageDropped— an outbound application message was dropped from the full offline send buffer (capacitymessages); the oldest buffered message is discarded to make room for the newest.InboundEventDropped— a decoded inbound event was dropped from a subscription’s event flow because a slow collector left the buffer full (capacityevents); Realtime delivery is best-effort under backpressure. The inbound mirror ofOutboundMessageDropped.HeartbeatSent— a heartbeat was sent, tagged with its Phoenixref.HeartbeatReceived— a heartbeat reply was received, echoing the sentref(nullif the server omitted it).