Realtime — API Reference

supabase-realtime opens a single WebSocket connection to a Supabase project’s Realtime service and multiplexes many Phoenix channels over it. Each channel can carry three kinds of traffic: postgres changes (rows inserted, updated or deleted in your database), broadcast (client-to-client messages, including raw binary frames), and presence (who is currently on the channel). The socket speaks the Phoenix protocol at version 2.0.0, in which every frame is an array [join_ref, ref, topic, event, payload] — the array encoding is what lets binary broadcasts travel as real binary frames rather than base64 text.
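To make the array framing concrete, the sketch below encodes one text frame in the [join_ref, ref, topic, event, payload] order described above. It is an illustration only: PhoenixFrame and encodeFrame are hypothetical names rather than supabase-realtime symbols, the payload is passed in as already-serialized JSON, and the ref values are invented.

```kotlin
// Hypothetical model of a Phoenix v2 text frame; not a supabase-realtime type.
data class PhoenixFrame(
    val joinRef: String?,           // ref of the join that opened the channel, or null
    val ref: String?,               // per-message ref used to match replies, or null
    val topic: String,              // e.g. "realtime:room1"
    val event: String,              // e.g. "phx_join" or "heartbeat"
    val payloadJson: String = "{}", // payload, already serialized as JSON
)

// Quotes a string as a JSON literal. Only backslash and double quote are escaped,
// which is enough for this sketch; null becomes the JSON literal null.
private fun jsonString(s: String?): String =
    if (s == null) "null"
    else "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

// Encodes the frame as the five-element array used on the wire.
fun encodeFrame(f: PhoenixFrame): String =
    listOf(
        jsonString(f.joinRef),
        jsonString(f.ref),
        jsonString(f.topic),
        jsonString(f.event),
        f.payloadJson,
    ).joinToString(separator = ",", prefix = "[", postfix = "]")
```

Calling encodeFrame(PhoenixFrame("1", "1", "realtime:room1", "phx_join")) yields a five-element JSON array with the topic in third position. Because the envelope is positional rather than a keyed object, a binary frame can carry the same five fields in a compact header followed by raw payload bytes.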

The connection auto-reconnects with exponential backoff (and jitter) when it drops, replaying its channel joins, and exposes its lifecycle as a StateFlow.

  • Maven artifact: io.github.androidpoet:supabase-realtime
  • Entry point: createRealtimeClient(client)

Methods that touch the network are either Result-first (SupabaseResult<T>) or suspend until the operation settles. Query methods (getSubscription, getActiveChannelNames, …) are synchronous snapshots, and event streams are exposed as Flow/StateFlow. Each symbol below notes which it is.

Creating a client

createRealtimeClient

fun createRealtimeClient(
    supabaseClient: SupabaseClient,
    config: RealtimeConfig = RealtimeConfig(),
    engineFactory: HttpClientEngineFactory<*> = platformEngine(),
): RealtimeClient

The module entry point. Builds a RealtimeClient bound to a SupabaseClient.

  • supabaseClient — the core client; supplies the project URL, API key and the current session token used for channel authorization.
  • config — reconnect and heartbeat tuning; see RealtimeConfig.
  • engineFactory — the Ktor engine used for the WebSocket. Defaults to the platform’s WebSocket-capable engine; pass your own (e.g. a mock engine) in tests.

Returns a RealtimeClient. The socket is not opened here — it connects lazily on the first subscription, or eagerly when you call connect().

val realtime = createRealtimeClient(client)

RealtimeConfig

data class RealtimeConfig(
    val autoReconnect: Boolean = true,
    val initialReconnectDelayMs: Long = 1_000L,
    val maxReconnectDelayMs: Long = 30_000L,
    val backoffMultiplier: Double = 2.0,
    val reconnectJitter: Boolean = true,
    val maxReconnectAttempts: Int = 0,
    val heartbeatIntervalMs: Long = 25_000L,
    val connectionTimeoutMs: Long = 10_000L,
    val logLevel: String? = null,
)

Tuning for reconnect and heartbeat behavior. Defaults are production-sane (exponential backoff with jitter, 25s heartbeat); override only what you need.

  • autoReconnect — reconnect automatically when the socket drops.
  • initialReconnectDelayMs — delay before the first reconnect attempt.
  • maxReconnectDelayMs — upper bound the backoff is clamped to.
  • backoffMultiplier — factor the delay grows by after each failed attempt.
  • reconnectJitter — when true, the computed backoff is randomized between half and the full exponential delay (“equal jitter”), so a fleet of clients that all dropped at once don’t reconnect in lockstep and stampede the server.
  • maxReconnectAttempts — give up after this many tries (0 = retry forever), transitioning to ConnectionState.Failed.
  • heartbeatIntervalMs — how often to send a Phoenix heartbeat to keep the socket alive.
  • connectionTimeoutMs — how long to wait for a connection or channel join to settle before failing it.
  • logLevel — server-side log verbosity, passed as the log_level query param when opening the socket (e.g. info, warn, error); null omits the param and leaves the server default in effect.

val realtime = createRealtimeClient(
    client,
    RealtimeConfig(heartbeatIntervalMs = 15_000L, maxReconnectAttempts = 10),
)
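
How the reconnect settings combine can be sketched as a pure function. This is a minimal model of exponential backoff with equal jitter, assuming the clamp is applied before the jitter; backoffDelayMs is a hypothetical helper whose parameter names mirror RealtimeConfig, and the library's exact arithmetic may differ.

```kotlin
import kotlin.math.min
import kotlin.math.pow
import kotlin.random.Random

// Sketch of exponential backoff with "equal jitter". attempt is 1-based.
// Hypothetical helper: parameter names mirror RealtimeConfig, nothing more.
fun backoffDelayMs(
    attempt: Int,
    initialReconnectDelayMs: Long = 1_000L,
    maxReconnectDelayMs: Long = 30_000L,
    backoffMultiplier: Double = 2.0,
    reconnectJitter: Boolean = true,
    random: Random = Random.Default,
): Long {
    require(attempt >= 1) { "attempt is 1-based" }
    // Exponential growth, clamped to the configured ceiling.
    val exponential = min(
        initialReconnectDelayMs * backoffMultiplier.pow(attempt - 1),
        maxReconnectDelayMs.toDouble(),
    ).toLong()
    if (!reconnectJitter) return exponential
    // Equal jitter: keep half of the delay and randomize the other half,
    // so the result lies between half and the full exponential delay.
    val half = exponential / 2
    return half + random.nextLong(exponential - half + 1)
}
```

With the defaults and jitter disabled, attempts 1 through 6 wait 1000, 2000, 4000, 8000, 16000 and 30000 ms: the sixth would be 32000 ms but is clamped by maxReconnectDelayMs.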

Connection lifecycle

RealtimeClient

interface RealtimeClient

One WebSocket connection to /realtime/v1/ multiplexing many Phoenix channels. Build channels with channel, and the socket connects lazily on the first subscription. Subscriptions are deduplicated by topic — subscribing to the same channel name twice returns the existing one. The members below cover the connection; channel and subscription members are documented in later sections.

connectionState

val connectionState: StateFlow<ConnectionState>

A StateFlow of the socket lifecycle. It always holds a value, so it is safe to collect before the socket has ever connected: a new collector receives the current ConnectionState immediately and then every transition. Drive reconnect indicators off this.

// collect on a StateFlow never completes, so run it in its own coroutine.
realtime.connectionState.collect { state ->
    println("realtime: $state")
}

isConnected / isConnecting / isDisconnecting

val isConnected: Boolean
val isConnecting: Boolean
val isDisconnecting: Boolean

Synchronous snapshots of the socket state: isConnected is true in ConnectionState.Connected; isConnecting is true while establishing or re-establishing a connection; isDisconnecting is true while tearing the connection down.

connect

suspend fun connect()

Opens the WebSocket eagerly. Optional — the socket connects on the first subscribe; call this to pre-warm it or to reconnect after disconnect(). Suspends until the attempt settles.

realtime.connect()

disconnect

suspend fun disconnect()

Closes the WebSocket but keeps the client (and its engine) reusable: active subscriptions are retained and rejoined on the next connect() or subscription. For a permanent teardown that frees the engine, use close.

close

suspend fun close()

Disconnects and releases the underlying HTTP/WebSocket engine. The client cannot be reused afterwards. Call this when you are done with the client to avoid leaking the engine’s connection pool and threads. Prefer disconnect() if you intend to connect() again later.

setAuth

suspend fun setAuth(token: String? = null)

Updates the auth token used for channel authorization and rejoins active channels with it (Phoenix access_token), so RLS-gated postgres_changes and private channels stay authorized after a session refresh. Pass null to fall back to the wrapped client’s current session token.

realtime.setAuth(newAccessToken)

sendHeartbeat

suspend fun sendHeartbeat()

Sends one Phoenix heartbeat immediately, outside the periodic schedule. Rarely needed — heartbeats are sent automatically per RealtimeConfig.

awaitConnected

suspend fun RealtimeClient.awaitConnected(): ConnectionState.Connected

Suspends until connectionState reaches ConnectionState.Connected and returns it — useful right after connect() to gate setup work.

realtime.connect()
realtime.awaitConnected()

awaitDisconnected

suspend fun RealtimeClient.awaitDisconnected(): ConnectionState.Disconnected

Suspends until connectionState reaches ConnectionState.Disconnected and returns it — useful to confirm teardown after disconnect().

ConnectionState

sealed interface ConnectionState {
    data object Disconnected : ConnectionState
    data object Connecting : ConnectionState
    data object Connected : ConnectionState
    data object Disconnecting : ConnectionState
    data class Reconnecting(val attempt: Int, val nextRetryMs: Long) : ConnectionState
    data class Failed(val reason: String, val attempts: Int) : ConnectionState
}

The lifecycle state of the Realtime WebSocket, exposed as connectionState. Reconnecting and Failed carry the attempt count from the backoff loop.

  • Disconnected — no socket open and no reconnect in progress (the initial state, and the resting state after disconnect()).
  • Connecting — the first connection attempt is in flight.
  • Connected — the socket is open and joins can proceed.
  • Disconnecting — a graceful shutdown is in progress.
  • Reconnecting — the socket dropped and an automatic reconnect is scheduled; attempt is the 1-based retry number and nextRetryMs the backoff delay before it.
  • Failed — reconnection gave up after attempts tries (when maxReconnectAttempts is exceeded), with reason describing the last failure. A terminal state until a manual connect().

when (val s = realtime.connectionState.value) {
    is ConnectionState.Reconnecting -> showRetry(s.attempt, s.nextRetryMs)
    is ConnectionState.Failed -> showError(s.reason)
    else -> Unit
}
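
Because ConnectionState is sealed, a when over it can be exhaustive with no else branch, so the compiler flags any state added later. The sketch below redeclares the documented type locally so that it compiles on its own; statusLabel and its wording are hypothetical.

```kotlin
// Local copy of the documented ConnectionState so the snippet stands alone;
// in an app you would use the library's type instead.
sealed interface ConnectionState {
    data object Disconnected : ConnectionState
    data object Connecting : ConnectionState
    data object Connected : ConnectionState
    data object Disconnecting : ConnectionState
    data class Reconnecting(val attempt: Int, val nextRetryMs: Long) : ConnectionState
    data class Failed(val reason: String, val attempts: Int) : ConnectionState
}

// Hypothetical helper: one status-bar label per state, with no else branch.
fun statusLabel(state: ConnectionState): String = when (state) {
    ConnectionState.Disconnected -> "Offline"
    ConnectionState.Connecting -> "Connecting"
    ConnectionState.Connected -> "Online"
    ConnectionState.Disconnecting -> "Disconnecting"
    is ConnectionState.Reconnecting ->
        "Reconnecting (attempt ${state.attempt}, retry in ${state.nextRetryMs} ms)"
    is ConnectionState.Failed ->
        "Failed after ${state.attempts} attempts: ${state.reason}"
}
```

Mapping connectionState through a function like this gives a UI a single string to render for every transition.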

Channels and subscriptions

A channel is built and configured before joining; a subscription is the live handle you get back after joining.

channel

fun RealtimeClient.channel(name: String): RealtimeChannelBuilder

Begins configuring the channel called name (the part of the topic after realtime:) and returns a RealtimeChannelBuilder on which to register postgres_changes/broadcast/presence callbacks before subscribe(). Building a channel does not join it.

val sub = realtime.channel("room1")
    .onBroadcast("cursor") { payload -> /* … */ }
    .subscribe()

subscribe (extension)

suspend fun RealtimeClient.subscribe(
    channelName: String,
    configure: RealtimeChannelBuilder.() -> Unit = {},
): RealtimeSubscription

One-call channel subscribe: builds the channel, applies configure (where you register onPostgresChange/onBroadcast/onPresence and channel options), and joins it. The lambda form of channel(name) + subscribe(); returns as soon as the join is sent.

val sub = realtime.subscribe("room1") {
    onBroadcast("cursor") { payload -> render(payload) }
}

Querying active subscriptions

These are synchronous snapshots — no network, no suspension.

getSubscription

fun getSubscription(name: String): RealtimeSubscription?

Returns the active subscription for channel name, or null if not subscribed.

getSubscriptionByTopic

fun getSubscriptionByTopic(topic: String): RealtimeSubscription?

Returns the active subscription for the fully-qualified Phoenix topic (e.g. realtime:room1), or null.

getSubscriptions

fun getSubscriptions(): Set<RealtimeSubscription>

Returns a snapshot of all currently active subscriptions.

getActiveChannelNames

fun getActiveChannelNames(): Set<String>

Returns the names of all currently subscribed channels.

getActiveChannels

fun getActiveChannels(): Set<RealtimeChannel>

Returns the name and fully-qualified topic of every active channel. See RealtimeChannel.

realtime.getActiveChannelNames() // e.g. setOf("room1", "room2")

Removing subscriptions

Each remove* call unsubscribes (leaves the Phoenix channel, status UNSUBSCRIBED) and drops the subscription from the client. All suspend until the leave is sent; the socket stays open.

removeSubscription

suspend fun removeSubscription(subscription: RealtimeSubscription)
suspend fun removeSubscription(name: String)

Removes a single subscription, either by its handle or by channel name.

realtime.removeSubscription("room1")

removeSubscriptions

suspend fun removeSubscriptions(subscriptions: List<RealtimeSubscription>)

Removes each subscription in the list.

removeSubscriptionByTopic

suspend fun removeSubscriptionByTopic(topic: String)

Removes the subscription on the fully-qualified Phoenix topic (e.g. realtime:room1), if any.

removeSubscriptionsByTopic

suspend fun removeSubscriptionsByTopic(topics: List<String>)

Removes the subscriptions on each of topics.

removeAllSubscriptions

suspend fun removeAllSubscriptions()

Unsubscribes and removes every active subscription, leaving the socket open.

RealtimeChannelBuilder

class RealtimeChannelBuilder

Configures one Realtime channel — its postgres_changes, broadcast and presence callbacks plus channel options — before joining it. Obtained from channel(name); the onXxx/configureXxx methods return this so they chain, and nothing is sent until subscribe() (or subscribeWithResult()).

onPostgresChange

fun onPostgresChange(
    schema: String = "public",
    table: String? = null,
    filter: String? = null,
    event: PostgresChangeEvent = PostgresChangeEvent.ALL,
    callback: suspend (JsonObject) -> Unit,
): RealtimeChannelBuilder
 
fun onPostgresChange(
    schema: String = "public",
    table: String? = null,
    filter: String? = null,
    event: PostgresChangeEvent = PostgresChangeEvent.ALL,
    callback: suspend (PostgresChangeEvent, JsonObject) -> Unit,
): RealtimeChannelBuilder

Subscribes to database postgres_changes and delivers each affected row as a raw JsonObject (the new row for INSERT/UPDATE, the old row for DELETE).

  • schema — database schema, default public.
  • table — table to scope to; null watches the whole schema.
  • filter — a single column=op.value string; build it with realtimeFilter rather than by hand. Realtime allows only one filter per subscription.
  • event — narrow to one change type, default ALL.
  • callback — receives the row; the second overload also passes the resolved PostgresChangeEvent so one handler can branch on the change type.

For typed rows, see the reified onPostgresChange extension. Returns the builder for chaining.

realtime.channel("rooms")
    .onPostgresChange(table = "messages", event = PostgresChangeEvent.INSERT) { row ->
        println("new message: $row")
    }
    .subscribe()

onPostgresChange (typed)

inline fun <reified T> RealtimeChannelBuilder.onPostgresChange(
    schema: String = "public",
    table: String? = null,
    filter: String? = null,
    event: PostgresChangeEvent = PostgresChangeEvent.ALL,
    crossinline onRow: suspend (T) -> Unit,
): RealtimeChannelBuilder

Like the raw onPostgresChange, but decodes each changed row into your model T before calling onRow. Rows that don’t deserialize into T (e.g. a partial DELETE payload missing non-replica-identity columns) are skipped rather than throwing, so one bad event can’t tear down the subscription. Returns the builder.

@Serializable data class Message(val id: Long, val body: String)
 
realtime.channel("rooms")
    .onPostgresChange<Message>(table = "messages") { msg -> store(msg) }
    .subscribe()

onBroadcast

fun onBroadcast(
    event: String,
    callback: suspend (JsonObject) -> Unit,
): RealtimeChannelBuilder

Registers callback for broadcast messages whose event name equals event, delivering the payload as a JsonObject. Send broadcasts with RealtimeSubscription.broadcast or RealtimeClient.broadcast. Returns the builder.

onPresence

fun onPresence(
    callback: suspend (PresenceState) -> Unit,
): RealtimeChannelBuilder

Registers callback for presence sync, invoked with the full cumulative PresenceState (every tracked member, keyed by presence key) on each presence_state/presence_diff. Publish your own state with track. Returns the builder.

configureBroadcast

fun configureBroadcast(
    receiveOwnBroadcasts: Boolean = false,
    acknowledgeBroadcasts: Boolean = false,
): RealtimeChannelBuilder

Tunes broadcast behavior for this channel: receiveOwnBroadcasts echoes the sender’s own messages back to it (off by default), and acknowledgeBroadcasts asks the server to ack each broadcast (required for broadcastWithAck). Returns the builder.

configureBroadcastReplay

fun configureBroadcastReplay(
    sinceMs: Long,
    limit: Int? = null,
): RealtimeChannelBuilder

Requests replay of recent broadcasts on join: messages from the last sinceMs milliseconds, capped at limit if given. Lets a late joiner catch up on missed broadcasts the server still retains (replayed events arrive with replayed = true). Returns the builder.

configurePresence

fun configurePresence(key: String = ""): RealtimeChannelBuilder

Sets the presence key identifying this client among channel members (defaults to a server-assigned key). Use a stable per-user key to dedupe a user across reconnects or devices. Returns the builder.

configurePrivate

fun configurePrivate(enabled: Boolean = true): RealtimeChannelBuilder

Marks the channel as private, so the server applies Realtime authorization (RLS) using the client’s session JWT. Required for RLS-protected postgres_changes and private broadcast/presence channels. Returns the builder.

subscribe

suspend fun subscribe(): RealtimeSubscription

Joins the Phoenix channel with the configured callbacks and returns its RealtimeSubscription, connecting the socket first if needed. Returns as soon as the join is sent — the subscription may still be SUBSCRIBING; use subscribeWithResult() or awaitSubscribed to wait for the server’s reply.

subscribeWithResult

suspend fun subscribeWithResult(): SupabaseResult<RealtimeSubscription>

Like subscribe(), but suspends until the channel join resolves and reports the outcome as a SupabaseResult instead of handing back a subscription that may still be joining. Returns SupabaseResult.Failure if the join is rejected by the server or times out (after connectionTimeoutMs).

when (val result = realtime.channel("room1").subscribeWithResult()) {
    is SupabaseResult.Success -> use(result.data)
    is SupabaseResult.Failure -> handle(result.error)
}

RealtimeSubscription

interface RealtimeSubscription

A live subscription to one joined channel: the handle returned by subscribe() for observing events and sending on the channel. It stays valid across reconnects — the client rejoins automatically.

channelName

val channelName: String

The channel name (the part after realtime:).

status

val status: StateFlow<Status>

The channel’s join lifecycle as a StateFlow. Await SUBSCRIBED or use awaitSubscribed.

asFlow

fun asFlow(): Flow<RealtimeEvent>

A hot Flow of every decoded inbound RealtimeEvent for this channel — postgres changes, broadcasts and presence events interleaved. This is a shared, best-effort multicast, not a cold flow: collecting it does not start or stop the subscription, and events delivered before a collector attaches are not replayed (replay = 0). Under backpressure a slow collector loses oldest events (signaled by RealtimeDebugEvent.InboundEventDropped) rather than stalling the socket. For one event kind, prefer the typed flows below.

sub.asFlow().collect { event ->
    when (event) {
        is RealtimeEvent.Broadcast -> render(event.payload)
        else -> Unit
    }
}
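
The replay = 0 behavior can be reproduced with a plain kotlinx.coroutines MutableSharedFlow. This is an analogy under assumptions, not the library's implementation: lateCollectorDemo is a hypothetical function, and the buffer size of 8 is arbitrary.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Shows replay = 0 on a hot flow: "early" is emitted before anyone collects and
// is lost; "late" is emitted after the collector attaches and is delivered.
fun lateCollectorDemo(): List<String> = runBlocking {
    val events = MutableSharedFlow<String>(
        replay = 0,
        extraBufferCapacity = 8, // arbitrary size for the sketch
        onBufferOverflow = BufferOverflow.DROP_OLDEST, // drop oldest, never suspend the emitter
    )
    events.tryEmit("early") // no collector yet, so nothing is retained

    // UNDISPATCHED runs the collector up to its first suspension, so it is
    // already subscribed when the next value is emitted.
    val received = async(start = CoroutineStart.UNDISPATCHED) {
        events.take(1).toList()
    }
    events.tryEmit("late")
    received.await()
}
```

The practical consequence for asFlow is the same: attach collectors before the events you care about can arrive, for example before triggering the action that produces them.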

presenceState

fun presenceState(): PresenceState

The current cumulative presence state — every member currently tracked on this channel, keyed by presence key — as of the last presence_state/presence_diff. Returns an empty map before the first sync.

send

suspend fun send(type: SendType, event: String, payload: JsonObject? = null)

Low-level send of a Phoenix message of type with the given event name and optional payload on this channel. Prefer broadcast/track for the common cases; reach for this only for events they don’t cover.

broadcast

suspend fun broadcast(event: String, payload: JsonObject)

Broadcasts payload under the event name to other subscribers of this channel (and to this client too if receiveOwnBroadcasts was set). Fire-and-forget; received by onBroadcast handlers / broadcastFlow.

sub.broadcast("cursor", buildJsonObject { put("x", 12); put("y", 40) })

broadcastBinary

suspend fun broadcastBinary(event: String, payload: ByteArray)

Broadcasts a raw binary payload under the event name, sent over the WebSocket as a binary frame instead of JSON (enabled by the Phoenix 2.0.0 array frames). Received as RealtimeEvent.BinaryBroadcast / binaryBroadcastFlow. Prefer this over base64-in-broadcast for compact, high-frequency or non-textual data (sensor streams, image frames, encrypted bytes). Fire-and-forget: if the socket is momentarily disconnected the frame is dropped rather than buffered. Requires the channel to be subscribed to reach other clients.

sub.broadcastBinary("frame", imageBytes)

broadcastWithAck

suspend fun broadcastWithAck(
    event: String,
    payload: JsonObject,
    timeoutMillis: Long = 5_000,
): SupabaseResult<Unit>

Broadcasts payload like broadcast, but awaits the server’s acknowledgement instead of returning fire-and-forget. Requires the channel to have been configured with acknowledgeBroadcasts (see configureBroadcast). Returns SupabaseResult.Success once the server acknowledges, or SupabaseResult.Failure if the channel is not subscribed, the server rejects the push, the connection drops before the ack arrives, or no ack is received within timeoutMillis. Never throws for these expected failures; genuine caller cancellation still propagates.

track

suspend fun track(state: JsonObject)

Publishes this client’s presence state on the channel, making it visible to other members’ presence handlers under this subscription’s presence key. Call again to replace the tracked state.

sub.track(buildJsonObject { put("user", "ada"); put("online_at", now) })

untrack

suspend fun untrack()

Removes this client’s presence from the channel (the inverse of track), emitting a leave to other members.

unsubscribe

suspend fun unsubscribe()

Leaves the channel (UNSUBSCRIBED) and stops delivery; ends asFlow collectors. Equivalent to RealtimeClient.removeSubscription for this one.

RealtimeSubscription.Status

enum class Status { SUBSCRIBING, SUBSCRIBED, UNSUBSCRIBING, UNSUBSCRIBED, ERROR }

The lifecycle of the channel join, observed via status.

RealtimeSubscription.SendType

enum class SendType(val wireValue: String) {
    BROADCAST("broadcast"),
    PRESENCE("presence"),
    POSTGRES_CHANGES("postgres_changes"),
}

The kind of message send dispatches, with its Phoenix wireValue.

awaitSubscribed

suspend fun RealtimeSubscription.awaitSubscribed(
    timeoutMs: Long = 10_000,
): RealtimeSubscription.Status

Suspends until this subscription reaches SUBSCRIBED (or ERROR) and returns that status, throwing TimeoutCancellationException after timeoutMs. Turns the fire-and-forget subscribe() into an await.

val sub = realtime.channel("room1").subscribe()
sub.awaitSubscribed()
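
The documented contract (wait for SUBSCRIBED or ERROR, throw on timeout) can be modeled with standard coroutine primitives. The sketch below is a hypothetical re-implementation for illustration, with Status redeclared locally; it is not the library's source.

```kotlin
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.withTimeout

// Local stand-in for RealtimeSubscription.Status so the snippet stands alone.
enum class Status { SUBSCRIBING, SUBSCRIBED, UNSUBSCRIBING, UNSUBSCRIBED, ERROR }

// Hypothetical helper: suspends until the status settles on SUBSCRIBED or ERROR.
// withTimeout throws TimeoutCancellationException if the deadline passes first.
suspend fun awaitJoinSettled(
    status: StateFlow<Status>,
    timeoutMs: Long = 10_000,
): Status = withTimeout(timeoutMs) {
    status.first { it == Status.SUBSCRIBED || it == Status.ERROR }
}
```

Since the await also returns on ERROR, check the returned status before treating the channel as joined.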

awaitUnsubscribed

suspend fun RealtimeSubscription.awaitUnsubscribed(
    timeoutMs: Long = 10_000,
): RealtimeSubscription.Status

Suspends until this subscription reaches UNSUBSCRIBED (or ERROR) and returns that status, throwing TimeoutCancellationException after timeoutMs. Use to confirm a leave completed after unsubscribe().

Postgres changes

Beyond the builder callbacks above, these one-call helpers and filter DSL cover the common “watch this table” cases.

subscribeToPostgresChanges

suspend fun RealtimeClient.subscribeToPostgresChanges(
    channelName: String,
    schema: String = "public",
    table: String? = null,
    filter: String? = null,
    event: PostgresChangeEvent = PostgresChangeEvent.ALL,
    callback: suspend (JsonObject) -> Unit,
): RealtimeSubscription
 
suspend fun RealtimeClient.subscribeToPostgresChanges(
    channelName: String,
    schema: String = "public",
    table: String? = null,
    filter: String? = null,
    event: PostgresChangeEvent = PostgresChangeEvent.ALL,
    callback: suspend (PostgresChangeEvent, JsonObject) -> Unit,
): RealtimeSubscription
 
suspend inline fun <reified T> RealtimeClient.subscribeToPostgresChanges(
    channelName: String,
    schema: String = "public",
    table: String? = null,
    filter: String? = null,
    event: PostgresChangeEvent = PostgresChangeEvent.ALL,
    crossinline onRow: suspend (T) -> Unit,
): RealtimeSubscription

Subscribes to channelName and registers a single postgres_changes callback in one call. The three overloads deliver, respectively: the raw row (JsonObject); the resolved change type plus the row; and the row decoded into your model T (with the same skip-on-mismatch semantics as the reified onPostgresChange). See onPostgresChange for the parameters and realtimeFilter for building filter.

val sub = realtime.subscribeToPostgresChanges<Message>(
    channelName = "rooms",
    table = "messages",
    filter = realtimeFilter { eq("room_id", roomId) },
) { msg -> store(msg) }

postgresInsertsFlow

fun RealtimeSubscription.postgresInsertsFlow(): Flow<JsonObject>

Narrows asFlow to postgres_changes INSERTs, yielding each inserted row (PostgresInsert.record).

postgresUpdatesFlow

fun RealtimeSubscription.postgresUpdatesFlow(): Flow<RealtimeEvent.PostgresUpdate>

Narrows asFlow to postgres_changes UPDATEs as RealtimeEvent.PostgresUpdate, carrying both new and old row (when replica identity provides the old one).

postgresDeletesFlow

fun RealtimeSubscription.postgresDeletesFlow(): Flow<JsonObject>

Narrows asFlow to postgres_changes DELETEs, yielding the deleted row (PostgresDelete.oldRecord).

sub.postgresInsertsFlow().collect { row -> append(row) }

realtimeFilter

inline fun realtimeFilter(block: RealtimeFilter.() -> Unit): String?

Builds the single column=op.value filter string for a postgres_changes subscription. Returns the built string, or null if no operator was set.

val f = realtimeFilter { gte("age", 18) } // "age=gte.18"

RealtimeFilter

class RealtimeFilter {
    fun eq(column: String, value: String)
    fun eq(column: String, value: Number)
    fun eq(column: String, value: Boolean)
    fun neq(column: String, value: String)
    fun neq(column: String, value: Number)
    fun neq(column: String, value: Boolean)
    fun gt(column: String, value: Number)
    fun gte(column: String, value: Number)
    fun lt(column: String, value: Number)
    fun lte(column: String, value: Number)
    fun isIn(column: String, values: List<String>)
    fun build(): String?
}

Builds the single server-side filter for a postgres_changes subscription without hand-writing the wire string. Realtime supports exactly one filter per subscription and a fixed operator set (eq, neq, gt, gte, lt, lte, in) — setting more than one throws, as does a blank column.

  • eq / neq — equality and inequality, for strings, numbers or booleans.
  • gt / gte / lt / lte — numeric comparisons.
  • isIn — column IN (…), e.g. isIn("status", listOf("open", "pending")).
  • build() — returns the column=op.value string, or null if no operator was set.

⚠️ Values are passed through verbatim: the DSL does not quote or escape them. Avoid values containing characters the column=op.value grammar is sensitive to, notably a comma inside an in (...) list and the = and . separators.
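
Since values are not escaped, user-supplied strings are worth validating before they reach the DSL. The sketch below is one conservative way to do that. Both functions are hypothetical, the reserved set (including the parentheses) is an assumption, and the in.(a,b) wire shape follows the general Supabase Realtime filter syntax rather than a confirmed output of isIn.

```kotlin
// Characters the column=op.value grammar is sensitive to, plus the parentheses
// that delimit an in (...) list. The exact set is an assumption.
private val RESERVED = setOf(',', '=', '.', '(', ')')

// Hypothetical guard: returns the values unchanged when they are safe to pass
// verbatim, and throws IllegalArgumentException otherwise.
fun requireFilterSafe(values: List<String>): List<String> {
    for (value in values) {
        val bad = value.firstOrNull { it in RESERVED }
        require(bad == null) { "filter value '$value' contains reserved character '$bad'" }
    }
    return values
}

// Hypothetical illustration of the wire shape of an IN filter.
fun inFilter(column: String, values: List<String>): String {
    require(column.isNotBlank()) { "column must not be blank" }
    return "$column=in.(${requireFilterSafe(values).joinToString(",")})"
}
```

In an app, the guard would wrap the list handed to isIn, for example isIn("status", requireFilterSafe(userChoices)), where userChoices is a placeholder. The check is deliberately strict: it also rejects harmless strings such as "1.5", which is acceptable for identifiers and enum-like values but too blunt for free text.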

RealtimeFilterDsl

annotation class RealtimeFilterDsl

@DslMarker for RealtimeFilter, so its member operators don’t leak into nested scopes.

PostgresChange

@Serializable
data class PostgresChange(
    val schema: String = "public",
    val table: String? = null,
    val filter: String? = null,
    val event: PostgresChangeEvent = PostgresChangeEvent.ALL,
)

A postgres_changes subscription descriptor: which schema/table to watch, for which event type, optionally narrowed by a single filter (column=op.value, built with realtimeFilter). Serializable.

PostgresChangeEvent

@Serializable
enum class PostgresChangeEvent { INSERT, UPDATE, DELETE, ALL }

The database change type a postgres_changes subscription listens for. ALL (the wire *) matches every type. Serializable, with each entry mapped to its wire name (INSERT/UPDATE/DELETE/*).
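
The entry-to-wire-name mapping described above can be pictured as follows. ChangeEvent is a local stand-in with a hypothetical wireName property and fromWire lookup; the library's enum achieves the mapping through its serializer.

```kotlin
// Local stand-in mirroring the documented entries, with the wire name explicit.
enum class ChangeEvent(val wireName: String) {
    INSERT("INSERT"),
    UPDATE("UPDATE"),
    DELETE("DELETE"),
    ALL("*");

    companion object {
        // Resolves a wire name back to an entry, or null if it is not recognized.
        fun fromWire(wire: String): ChangeEvent? =
            values().firstOrNull { it.wireName == wire }
    }
}
```

Only ALL differs from its Kotlin name, which is why a subscription left at the default matches every change type.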

Broadcast

Broadcast sends messages between clients on a channel, bypassing the database. Sending over a subscription is covered above (broadcast, broadcastBinary, broadcastWithAck); the helpers here cover HTTP send, one-call subscribe, and consuming flows.

broadcast (over HTTP)

suspend fun RealtimeClient.broadcast(
    channelName: String,
    event: String,
    payload: JsonObject,
    private: Boolean = false,
): SupabaseResult<Unit>

Sends a broadcast over HTTP to the realtime broadcast endpoint (/realtime/v1/api/broadcast) without joining a channel or opening a WebSocket. Useful for fire-and-forget server-to-client fan-out where the sender doesn’t need to subscribe.

  • channelName — the channel name (without the realtime: prefix).
  • event — the broadcast event name.
  • payload — the message body.
  • private — when true, targets a private channel; the caller’s session JWT (or apikey) is attached automatically.

Returns SupabaseResult<Unit>.

realtime.broadcast("room1", "notice", buildJsonObject { put("text", "hi") })

subscribeToBroadcast

suspend fun RealtimeClient.subscribeToBroadcast(
    channelName: String,
    event: String,
    callback: suspend (JsonObject) -> Unit,
): RealtimeSubscription

Subscribes to channelName and registers a single broadcast callback for the named event in one call.

broadcastFlow

fun RealtimeSubscription.broadcastFlow(event: String? = null): Flow<JsonObject>

Narrows asFlow to broadcast payloads, optionally only those whose event name equals event (all broadcasts when null). Yields each message’s JsonObject payload — the typed view over RealtimeEvent.Broadcast.

sub.broadcastFlow("cursor").collect { payload -> moveCursor(payload) }

binaryBroadcastFlow

fun RealtimeSubscription.binaryBroadcastFlow(event: String? = null): Flow<ByteArray>

Narrows asFlow to binary broadcasts, optionally only those whose event name equals event (all when null). Yields each message’s raw ByteArray payload — the typed view over RealtimeEvent.BinaryBroadcast. Pair with broadcastBinary for sending.

BroadcastPayload

@Serializable
data class BroadcastPayload(
    val event: String,
    val payload: JsonObject,
)

A broadcast envelope pairing an event name with its payload; the shape of a broadcast message body on the wire. Serializable.

Presence

Presence tracks which clients are currently on a channel. Publishing is covered above (track / untrack); the helpers here cover one-call subscribe and consuming flows.

subscribeToPresence

suspend fun RealtimeClient.subscribeToPresence(
    channelName: String,
    callback: suspend (PresenceState) -> Unit,
): RealtimeSubscription

Subscribes to channelName and registers a single presence sync callback, invoked with the full PresenceState.

presenceSyncFlow

fun RealtimeSubscription.presenceSyncFlow(): Flow<PresenceState>

Narrows asFlow to presence syncs, yielding the full cumulative PresenceState on each presence_state/presence_diff.

presenceJoinFlow

fun RealtimeSubscription.presenceJoinFlow(): Flow<RealtimeEvent.PresenceJoin>

Narrows asFlow to presence join events, one per member that joins the channel.

presenceLeaveFlow

fun RealtimeSubscription.presenceLeaveFlow(): Flow<RealtimeEvent.PresenceLeave>

Narrows asFlow to presence leave events, one per member that leaves the channel.

presenceDataFlow

inline fun <reified T> RealtimeSubscription.presenceDataFlow(
    json: Json = Json { ignoreUnknownKeys = true },
    ignoreDecodeErrors: Boolean = true,
): Flow<List<T>>

Like presenceSyncFlow, but decodes each member’s presence payload into T, emitting the list of decoded members on every sync.

  • json — the serializer used to decode each member payload.
  • ignoreDecodeErrors — when true (default), members whose payload doesn’t fit T are skipped so one malformed member can’t break the flow; set false to fail fast with IllegalArgumentException. CancellationException is always rethrown so collection stays cancellable.

@Serializable data class Member(val user: String)
 
sub.presenceDataFlow<Member>().collect { members -> showOnline(members) }

PresenceState

typealias PresenceState = Map<String, JsonObject>

Cumulative presence membership: each presence key mapped to that member’s last tracked payload. Delivered by RealtimeEvent.PresenceSync and RealtimeSubscription.presenceState(); published with track.
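
How a cumulative state evolves from joins and leaves can be sketched with plain maps. This is a simplified key-level model: Presence stands in for PresenceState with Map<String, String> replacing JsonObject, and PresenceDiff and applyDiff are hypothetical names; the real presence_diff payload carries more detail per member.

```kotlin
// Stand-in for PresenceState: JsonObject is replaced by Map<String, String>
// so the sketch needs no serialization dependency.
typealias Presence = Map<String, Map<String, String>>

// Hypothetical diff shape: members that joined (key to payload) and keys that left.
data class PresenceDiff(
    val joins: Presence = emptyMap(),
    val leaves: Set<String> = emptySet(),
)

// Applies one diff to the cumulative state. Leaves are removed first and joins
// added afterwards, so a key present in both ends up with its new payload.
fun applyDiff(state: Presence, diff: PresenceDiff): Presence =
    (state - diff.leaves) + diff.joins
```

Starting from an empty map, a join for "ada" followed by a diff in which "ada" leaves and "bob" joins produces a state containing only "bob", which is the kind of snapshot a presence sync callback would then receive.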

Events

RealtimeEvent

sealed interface RealtimeEvent {
    data class PostgresInsert(
        val record: JsonObject,
        val oldRecord: JsonObject? = null,
        val commitTimestamp: String? = null,
        val schema: String? = null,
        val table: String? = null,
    ) : RealtimeEvent
 
    data class PostgresUpdate(
        val record: JsonObject,
        val oldRecord: JsonObject? = null,
        val commitTimestamp: String? = null,
        val schema: String? = null,
        val table: String? = null,
    ) : RealtimeEvent
 
    data class PostgresDelete(
        val oldRecord: JsonObject,
        val commitTimestamp: String? = null,
        val schema: String? = null,
        val table: String? = null,
    ) : RealtimeEvent
 
    data class Broadcast(
        val event: String,
        val payload: JsonObject,
        val replayed: Boolean = false,
    ) : RealtimeEvent
 
    class BinaryBroadcast(
        val event: String,
        val payload: ByteArray,
        val replayed: Boolean = false,
    ) : RealtimeEvent
 
    data class PresenceSync(val state: PresenceState) : RealtimeEvent
    data class PresenceJoin(val key: String, val newPresence: JsonObject) : RealtimeEvent
    data class PresenceLeave(val key: String, val leftPresence: JsonObject) : RealtimeEvent
    data class SystemEvent(val status: String, val message: String? = null) : RealtimeEvent
}

A decoded inbound event on a Realtime channel, as delivered by asFlow. Spans the three channel features plus channel SystemEvents. The typed flows above narrow this union to one case so you can avoid the is-checks.

  • PostgresInsert — a row was inserted; record is the new row. commitTimestamp is the server commit time (ISO-8601), schema/table identify the relation; all three are null if the server omits them.
  • PostgresUpdate — a row was updated; record is the new row and oldRecord the previous one when the table’s replica identity provides it.
  • PostgresDelete — a row was deleted; oldRecord is the removed row (may be partial, depending on replica identity).
  • Broadcast — a broadcast message named event carrying payload. replayed is true when the server is replaying a retained broadcast to a late joiner.
  • BinaryBroadcast — a broadcast named event carrying raw binary payload, delivered as a binary frame. Implements structural equals/hashCode over the byte content. replayed mirrors Broadcast.replayed.
  • PresenceSync — state is the full cumulative membership after a sync.
  • PresenceJoin / PresenceLeave — a member identified by key joined (carrying newPresence) or left (carrying leftPresence, its last tracked payload).
  • SystemEvent — a channel-level system message (e.g. subscription status changes), with an optional human-readable message.
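
When a collector does need the whole union, an exhaustive when over the sealed interface is the usual Kotlin shape. The sketch below is illustrative only: it redeclares a local stand-in for RealtimeEvent (fields trimmed to the ones used) so it compiles without the library, and describe is a hypothetical helper name.

```kotlin
import kotlinx.serialization.json.JsonObject
import kotlinx.serialization.json.buildJsonObject
import kotlinx.serialization.json.put

// Local stand-in mirroring the RealtimeEvent declaration above, with fields trimmed.
sealed interface RealtimeEvent {
    data class PostgresInsert(val record: JsonObject, val table: String? = null) : RealtimeEvent
    data class PostgresUpdate(val record: JsonObject, val oldRecord: JsonObject? = null, val table: String? = null) : RealtimeEvent
    data class PostgresDelete(val oldRecord: JsonObject, val table: String? = null) : RealtimeEvent
    data class Broadcast(val event: String, val payload: JsonObject, val replayed: Boolean = false) : RealtimeEvent
    class BinaryBroadcast(val event: String, val payload: ByteArray, val replayed: Boolean = false) : RealtimeEvent
    data class PresenceSync(val state: Map<String, JsonObject>) : RealtimeEvent
    data class PresenceJoin(val key: String, val newPresence: JsonObject) : RealtimeEvent
    data class PresenceLeave(val key: String, val leftPresence: JsonObject) : RealtimeEvent
    data class SystemEvent(val status: String, val message: String? = null) : RealtimeEvent
}

// Hypothetical helper. Because the interface is sealed, the compiler rejects this
// `when` if any case is left unhandled, so no `else` branch is needed.
fun describe(e: RealtimeEvent): String = when (e) {
    is RealtimeEvent.PostgresInsert -> "insert into ${e.table ?: "?"}"
    is RealtimeEvent.PostgresUpdate -> "update in ${e.table ?: "?"} (old row present: ${e.oldRecord != null})"
    is RealtimeEvent.PostgresDelete -> "delete from ${e.table ?: "?"}"
    is RealtimeEvent.Broadcast -> "broadcast ${e.event} (replayed: ${e.replayed})"
    is RealtimeEvent.BinaryBroadcast -> "binary ${e.event}: ${e.payload.size} bytes"
    is RealtimeEvent.PresenceSync -> "sync: ${e.state.size} member(s)"
    is RealtimeEvent.PresenceJoin -> "join ${e.key}"
    is RealtimeEvent.PresenceLeave -> "leave ${e.key}"
    is RealtimeEvent.SystemEvent -> "system ${e.status}"
}

fun main() {
    val row = buildJsonObject { put("id", 1) }
    println(describe(RealtimeEvent.PostgresInsert(row, table = "messages")))
    println(describe(RealtimeEvent.BinaryBroadcast("frame", byteArrayOf(1, 2, 3))))
    println(describe(RealtimeEvent.PresenceJoin("alice", row)))
}
```

With the real types, the same when would sit inside the collector of the flow that asFlow delivers.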

systemEventFlow

fun RealtimeSubscription.systemEventFlow(status: String? = null): Flow<RealtimeEvent.SystemEvent>

Narrows asFlow to channel system events, optionally only those with the given status (all when null).
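
Narrowing of this kind can be expressed with standard kotlinx.coroutines operators. The following is a minimal sketch of the idea, not the library's code: it uses a trimmed local stand-in for RealtimeEvent, a hypothetical extension named systemEvents, and made-up status strings ("ok", "error") purely for illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Trimmed local stand-in for the RealtimeEvent union documented above.
sealed interface RealtimeEvent {
    data class Broadcast(val event: String) : RealtimeEvent
    data class SystemEvent(val status: String, val message: String? = null) : RealtimeEvent
}

// Hypothetical equivalent of systemEventFlow: keep only SystemEvent items,
// then optionally keep only those whose status matches.
fun Flow<RealtimeEvent>.systemEvents(status: String? = null): Flow<RealtimeEvent.SystemEvent> =
    filterIsInstance<RealtimeEvent.SystemEvent>()
        .filter { status == null || it.status == status }

fun main(): Unit = runBlocking {
    val events: Flow<RealtimeEvent> = flowOf(
        RealtimeEvent.Broadcast("cursor"),
        RealtimeEvent.SystemEvent("ok"),        // status values here are illustrative
        RealtimeEvent.SystemEvent("error", "channel error"),
    )
    println(events.systemEvents().toList())         // both system events
    println(events.systemEvents("error").toList())  // only the "error" one
}
```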

Models and diagnostics

RealtimeChannel

data class RealtimeChannel(val name: String, val topic: String)

Identifies an active channel by its name and fully-qualified Phoenix topic (e.g. realtime:room1); returned by getActiveChannels.

RealtimeMessage

@Serializable(with = RealtimeMessageSerializer::class)
data class RealtimeMessage(
    val topic: String,
    val event: String,
    val payload: JsonObject,
    val joinRef: String? = null,
    val ref: String? = null,
)

A raw Phoenix protocol frame. On the wire (Realtime Protocol 2.0.0) this is the array [join_ref, ref, topic, event, payload]. Surfaced via RealtimeDebugEvent for diagnostics; application code normally works with decoded RealtimeEvents instead.

  • topic — the channel topic (e.g. realtime:room1) or phoenix for socket control frames.
  • event — the Phoenix event name (phx_join, phx_reply, broadcast, …).
  • payload — the message body.
  • joinRef — the ref of the join that scopes this message.
  • ref — the per-message correlation ref used to match replies.
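
To make the array layout concrete, here is a rough sketch that maps the five fields to and from a JSON array in the order [join_ref, ref, topic, event, payload]. It assumes kotlinx.serialization's JSON element API; Frame, toWire and toFrame are hypothetical names, and the real RealtimeMessageSerializer may differ in details such as how it treats binary frames.

```kotlin
import kotlinx.serialization.json.JsonArray
import kotlinx.serialization.json.JsonObject
import kotlinx.serialization.json.add
import kotlinx.serialization.json.buildJsonArray
import kotlinx.serialization.json.buildJsonObject
import kotlinx.serialization.json.contentOrNull
import kotlinx.serialization.json.jsonObject
import kotlinx.serialization.json.jsonPrimitive
import kotlinx.serialization.json.put

// Local stand-in with the same fields as RealtimeMessage, minus the custom serializer.
data class Frame(
    val topic: String,
    val event: String,
    val payload: JsonObject,
    val joinRef: String? = null,
    val ref: String? = null,
)

// Encode in wire order; absent refs become JSON null.
fun Frame.toWire(): JsonArray = buildJsonArray {
    add(joinRef)
    add(ref)
    add(topic)
    add(event)
    add(payload)
}

// Decode by position; a frame with the wrong arity is rejected.
fun JsonArray.toFrame(): Frame {
    require(size == 5) { "expected 5 elements, got $size" }
    return Frame(
        joinRef = this[0].jsonPrimitive.contentOrNull,
        ref = this[1].jsonPrimitive.contentOrNull,
        topic = this[2].jsonPrimitive.content,
        event = this[3].jsonPrimitive.content,
        payload = this[4].jsonObject,
    )
}

fun main() {
    val join = Frame(
        topic = "realtime:room1",
        event = "phx_join",
        payload = buildJsonObject { put("note", "illustrative payload") },
        joinRef = "1",
        ref = "1",
    )
    val wire = join.toWire()
    println(wire)                    // the JSON array as it would appear as a text frame
    println(wire.toFrame() == join)  // round trip preserves every field
}
```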

RealtimeClient.debugState

val debugState: StateFlow<RealtimeDebugState>

Running counters (messages in/out, heartbeats, last refs) for diagnostics and tests. See RealtimeDebugState.

RealtimeClient.debugEvents

val debugEvents: Flow<RealtimeDebugEvent>

A hot stream of low-level protocol events (RealtimeDebugEvent) — including dropped-message signals — for diagnostics; not needed for normal use.

RealtimeDebugState

data class RealtimeDebugState(
    val outboundMessageCount: Long = 0,
    val inboundMessageCount: Long = 0,
    val heartbeatSentCount: Long = 0,
    val heartbeatReceivedCount: Long = 0,
    val lastOutboundRef: String? = null,
    val lastInboundRef: String? = null,
)

A snapshot of cumulative socket counters, exposed via debugState. Counts every message and heartbeat sent and received since the client was created, plus the last Phoenix message ref in each direction.
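
Because the counters are cumulative, the useful numbers in a test or dashboard are usually differences: between two snapshots, or between sent and received heartbeats. A small sketch under that assumption follows; the data class is a local copy of the declaration above, and both helper functions are hypothetical.

```kotlin
// Local copy of the RealtimeDebugState declaration above, so the sketch is standalone.
data class RealtimeDebugState(
    val outboundMessageCount: Long = 0,
    val inboundMessageCount: Long = 0,
    val heartbeatSentCount: Long = 0,
    val heartbeatReceivedCount: Long = 0,
    val lastOutboundRef: String? = null,
    val lastInboundRef: String? = null,
)

// Hypothetical helper: heartbeats sent that have not yet been answered.
fun RealtimeDebugState.unansweredHeartbeats(): Long =
    (heartbeatSentCount - heartbeatReceivedCount).coerceAtLeast(0)

// Hypothetical helper: inbound messages that arrived between two snapshots.
fun RealtimeDebugState.inboundSince(earlier: RealtimeDebugState): Long =
    inboundMessageCount - earlier.inboundMessageCount

fun main() {
    val before = RealtimeDebugState(inboundMessageCount = 10, heartbeatSentCount = 3, heartbeatReceivedCount = 3)
    val after = before.copy(inboundMessageCount = 14, heartbeatSentCount = 5, heartbeatReceivedCount = 4)
    println(after.inboundSince(before))   // 4
    println(after.unansweredHeartbeats()) // 1
}
```

With the real client, the two snapshots would come from reading debugState.value at two points in time.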

RealtimeDebugEvent

sealed interface RealtimeDebugEvent {
    data class OutboundMessage(val message: RealtimeMessage) : RealtimeDebugEvent
    data class OutboundMessageDropped(val message: RealtimeMessage, val capacity: Int) : RealtimeDebugEvent
    data class InboundMessage(val message: RealtimeMessage) : RealtimeDebugEvent
    data class InboundEventDropped(val topic: String, val event: RealtimeEvent, val capacity: Int) : RealtimeDebugEvent
    data class HeartbeatSent(val ref: String) : RealtimeDebugEvent
    data class HeartbeatReceived(val ref: String?) : RealtimeDebugEvent
}

A low-level protocol event observed on the socket, streamed by debugEvents. Covers every raw RealtimeMessage in and out, heartbeats, and the buffer-overflow drop signals — for diagnostics and tests, not normal application logic.

  • OutboundMessage / InboundMessage — a raw frame sent to / received from the server.
  • OutboundMessageDropped — an outbound application message was dropped from the full offline send buffer (capacity messages); the oldest buffered message is discarded to make room for the newest.
  • InboundEventDropped — a decoded inbound event was dropped from a subscription’s event flow because a slow collector left the buffer full (capacity events); Realtime delivery is best-effort under backpressure. The inbound mirror of OutboundMessageDropped.
  • HeartbeatSent — a heartbeat was sent, tagged with its Phoenix ref.
  • HeartbeatReceived — a heartbeat reply was received, echoing the sent ref (null if the server omitted it).
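
The drop-oldest policy described for OutboundMessageDropped can be sketched with a bounded queue. This is a simplified, single-threaded illustration using only the Kotlin standard library; DropOldestBuffer and its onDropped callback are hypothetical and stand in for whatever the client does internally before emitting the debug event.

```kotlin
// Hypothetical bounded buffer: when full, the oldest item is evicted so the
// newest can be kept, and the eviction is reported together with the capacity.
class DropOldestBuffer<T>(
    private val capacity: Int,
    private val onDropped: (dropped: T, capacity: Int) -> Unit = { _, _ -> },
) {
    init {
        require(capacity > 0) { "capacity must be positive" }
    }

    private val items = ArrayDeque<T>()

    fun offer(item: T) {
        if (items.size == capacity) {
            onDropped(items.removeFirst(), capacity)
        }
        items.addLast(item)
    }

    // Returns the buffered items in send order and empties the buffer,
    // as a reconnect would when flushing queued messages.
    fun drain(): List<T> {
        val out = items.toList()
        items.clear()
        return out
    }
}

fun main() {
    val dropped = mutableListOf<String>()
    val buffer = DropOldestBuffer<String>(capacity = 2) { msg, _ -> dropped += msg }
    listOf("a", "b", "c").forEach(buffer::offer)
    println(dropped)        // [a]
    println(buffer.drain()) // [b, c]
}
```

Dropping the oldest entry favors recent state over history, which fits messages such as cursor positions; an application that cannot tolerate loss would need its own acknowledgement or retry layer on top.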