Supabase Realtime Event System: Typesafe Event Handling with Supabase

A Practical Approach to Typesafe Client Messaging with Supabase Realtime

Introduction to Supabase Realtime:

Real-time communication has become a cornerstone of dynamic, interactive user experiences. Supabase Realtime is a feature of Supabase that lets developers integrate real-time functionality into their applications without the complexity of building a custom solution. This guide focuses on the practical application of Supabase Realtime by building a custom typesafe event system on top of it.

Supabase Realtime - A Primer:

Supabase, often hailed as the open-source alternative to Firebase, is a robust platform that simplifies the process of building scalable and performant applications. At the heart of Supabase is its Realtime feature, a game-changer in the realm of database communication. Realtime is designed to deliver instant updates to connected clients whenever changes occur within the database, providing a dynamic and responsive user experience.

How It Works:

Supabase Realtime operates on the principles of Pub/Sub (Publish/Subscribe), a messaging pattern where senders (publishers) push messages to a central hub, and subscribers receive these messages. In the context of Supabase, this translates to the database acting as the hub, broadcasting changes to connected clients.
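
As a rough illustration of the Pub/Sub pattern itself, here is a minimal in-memory hub. It is not Supabase code; the Hub class and its method names are made up for this sketch.

```typescript
// Minimal in-memory Pub/Sub hub (illustrative only, not Supabase code).
type Handler<T> = (message: T) => void;

class Hub<T> {
    private topics = new Map<string, Set<Handler<T>>>();

    // Register a handler for a topic and return an unsubscribe function.
    subscribe(topic: string, handler: Handler<T>): () => void {
        const handlers = this.topics.get(topic) ?? new Set<Handler<T>>();
        handlers.add(handler);
        this.topics.set(topic, handlers);
        return () => { handlers.delete(handler); };
    }

    // Deliver a message to every handler on the topic; returns the handler count.
    publish(topic: string, message: T): number {
        const handlers = this.topics.get(topic);
        if (!handlers) return 0;
        handlers.forEach((handler) => handler(message));
        return handlers.size;
    }
}

const hub = new Hub<string>();
const received: string[] = [];
const unsubscribe = hub.subscribe("rows", (m) => received.push(m));
hub.publish("rows", "INSERT");
unsubscribe();
hub.publish("rows", "DELETE"); // no subscribers left, nothing is delivered
```

Publishers and subscribers only share a topic name and never reference each other directly, which is the property the event system later relies on.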

The process begins when a change occurs in the database, such as an INSERT, UPDATE, or DELETE operation. Supabase Realtime picks these changes up from PostgreSQL's logical replication stream (rather than the NOTIFY/LISTEN mechanism) and forwards them to the connected clients. Because this communication is asynchronous, clients learn about modifications as they happen and can update their local state in real time.

Beyond broadcasting database changes to connected clients, Supabase Realtime opens up another possibility: using a table to sync user-defined events. Arbitrary messages can be sent to clients through unique identifiers and a synchronization table. By assigning a unique ID to each event, clients can accurately reconstruct and retrieve the associated data, adding a layer of flexibility to the real-time communication process.

Typesafe Messaging System

First, some foundational definitions are needed: a class to represent a real-time event, a configuration for a real-time table containing the table name and an optional filter, and a payload type that defines the database event payload for table change events. Additionally, there are type definitions for a constructor and for a subscriber callback function.

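export interface RtTableDef {
    name: string
    // Optional Postgres Changes filter, e.g. "id=eq.1"
    filter?: string
}

// Database and Row<T> are assumed to come from the project's generated Supabase types.
export interface DbEventPayload<T extends keyof Database['public']['Tables']> {
    data: Row<T>,
    type: "INSERT" | "UPDATE" | "DELETE"
}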

export type Constructor<T> = new (...args: any[]) => T;
export type Callback<T> = (event: T, info?: { senderId: string, isSelf: boolean }) => void

export class RealtimeEvent {}

It’s worth noting that the Callback type also carries optional information about the sender of the event and whether the receiving client sent it itself. This information matters because events should also be executable locally, not only when received from the server.

The event system created encapsulates the Supabase realtime client and realtime channels, abstracting Supabase to allow extensibility in the system. The channels are categorized into two types: the RtChannel for the event system and the DbChannel for database changes.

To construct the RtChannel some extra types are needed. A PresenceClient and a PresenceCb to represent and manage user presence in a channel and an EventPayload that acts as an internal wrapper, allowing additional metadata to be sent through the event system that the RtChannel relies on behind the scenes.

interface EventPayload<T extends RealtimeEvent> {
    senderId: string,
    data: T
}

export interface PresenceClient<T> {
    id: string
    data: T
}

export type PresenceCb<T> = (client: PresenceClient<T>) => void
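
The presence bookkeeping is not shown in this article, so the following is only a sketch of how these two types could be used. PresenceTracker and its join, leave and isOnline methods are hypothetical names; onJoined and onLeft mirror the channel methods used in the Usage section.

```typescript
interface PresenceClient<T> { id: string; data: T }
type PresenceCb<T> = (client: PresenceClient<T>) => void;

// Hypothetical helper: tracks who is present and notifies listeners.
class PresenceTracker<T> {
    private clients = new Map<string, PresenceClient<T>>();
    private joinedCbs: PresenceCb<T>[] = [];
    private leftCbs: PresenceCb<T>[] = [];

    onJoined(cb: PresenceCb<T>) { this.joinedCbs.push(cb); }
    onLeft(cb: PresenceCb<T>) { this.leftCbs.push(cb); }

    // Called when the channel reports that a client joined.
    join(client: PresenceClient<T>) {
        this.clients.set(client.id, client);
        for (const cb of this.joinedCbs) cb(client);
    }

    // Called when the channel reports that a client left.
    leave(id: string) {
        const client = this.clients.get(id);
        if (!client) return; // unknown client, nothing to report
        this.clients.delete(id);
        for (const cb of this.leftCbs) cb(client);
    }

    isOnline(id: string): boolean { return this.clients.has(id); }
}

const tracker = new PresenceTracker<{ room: string }>();
const log: string[] = [];
tracker.onJoined((c) => log.push(`joined:${c.id}`));
tracker.onLeft((c) => log.push(`left:${c.id}`));
tracker.join({ id: "device-1", data: { room: "lab" } });
tracker.leave("device-1");
```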

The RtChannel - Realtime Events

The RtChannel serves as the central component of the event system, responsible for handling real-time events and reconstructing the correct classes on the client. It also syncs client presence state and notifies all observers once an event is received. The core of the RtChannel can be broken down into a few significant pieces.

The subscribe function is utilized to subscribe to a specific event type:

public subscribe<T>(ctr: Constructor<T>, cb: Callback<T>) {
    if (!this.constructors.has(ctr.name)) {
        this.constructors.set(ctr.name, ctr);
        this.channel.on(
            'broadcast',
            {event: ctr.name},
            this.handleOnEvent.bind(this)
        )
        this.channel.on(
            'broadcast',
            {event: this.encodeEventName(ctr.name, this.client.clientId)},
            this.handleOnEvent.bind(this)
        )
    }
    this.safeAddObserver(ctr.name, cb)
}

This function validates and registers a constructor and, on first registration, creates two Supabase broadcast listeners that use handleOnEvent as their callback. One listens for the plain event name and receives events broadcast to everyone; the other listens for an event name that encodes this client's ID and receives events addressed to this client only. This works around a limitation of Supabase Realtime, which has no built-in mechanism for targeting a single client. The targeting is a convention rather than a security boundary: in theory, any client could listen for another client's encoded event name and receive events intended for someone else.
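
The encodeEventName and decodeEventName helpers are not shown here. One possible implementation, assuming a "::" separator that never occurs in event names or client IDs, could look like this; the separator and the returned object shape are assumptions of this sketch, apart from the name field that handleOnEvent reads.

```typescript
const SEPARATOR = "::"; // assumed delimiter; must not occur in event names

// Build the event name for a broadcast (no clientId) or a targeted event.
function encodeEventName(name: string, clientId?: string): string {
    return clientId === undefined ? name : `${name}${SEPARATOR}${clientId}`;
}

// Split an encoded event name back into its parts.
function decodeEventName(encoded: string): { name: string; clientId?: string } {
    const index = encoded.indexOf(SEPARATOR);
    if (index === -1) return { name: encoded };
    return {
        name: encoded.slice(0, index),
        clientId: encoded.slice(index + SEPARATOR.length),
    };
}

const targeted = encodeEventName("ToggleEvent", "device-1");
const decoded = decodeEventName(targeted);
```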

The handleOnEvent function receives the event payload and executes the steps necessary to reconstruct the event on the client, while also adding additional metadata that was sent along.

private handleOnEvent(received: any) {
    const payload: EventPayload<any> = received.payload;
    // Drop the optional client ID part to get the registered event name
    const eventName = this.decodeEventName(received.event).name
    const ctr = this.constructors.get(eventName)!
    const observers = this.observers.get(eventName)!

    // Rebuild a class instance from the plain payload data
    // @ts-ignore
    const instance = new ctr();
    Object.assign(instance, payload.data)
    for (const observer of observers) {
        observer(instance, {
            isSelf: payload.senderId === this.client.clientId,
            senderId: payload.senderId,
        })
    }
}

It decodes the event name, retrieves the constructor from the map, creates a new instance of the event, assigns the payload data to the instance, and then calls all observers with the event and additional metadata.
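
The reconstruction step can be tried in isolation. The sketch below simulates the wire with JSON.stringify and JSON.parse instead of a Supabase channel; the describe method is added only to show that methods are available again after reconstruction.

```typescript
class RealtimeEvent {}

class ToggleEvent extends RealtimeEvent {
    // Default values keep the constructor callable without arguments.
    constructor(public deviceId = "", public state = false) { super(); }
    describe(): string { return `${this.deviceId} -> ${this.state ? "on" : "off"}`; }
}

type Constructor<T> = new (...args: any[]) => T;
const constructors = new Map<string, Constructor<RealtimeEvent>>();
constructors.set(ToggleEvent.name, ToggleEvent);

// Simulate the wire: class instance -> plain JSON -> plain object.
const wire = JSON.stringify({
    event: ToggleEvent.name,
    data: { ...new ToggleEvent("lamp", true) },
});
const received = JSON.parse(wire) as { event: string; data: object };

// Look up the constructor by name and copy the plain data onto a new instance.
const ctr = constructors.get(received.event);
if (!ctr) throw new Error(`no constructor registered for ${received.event}`);
const instance = Object.assign(new ctr(), received.data);
```

Because the instance is created with no arguments, event classes need a constructor that works without them. The lookup also depends on class names being identical on every client, which may not hold if a bundler renames classes during minification.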

The send function is responsible for sending events:

public async send<T extends RealtimeEvent>(event: T, receivers = new Array<string>()) {
    if (receivers.length == 0) {
        await this.broadcast(event.constructor.name, event)
    } else { //ToDo: rate limit if list gets long
        for (const receiver of receivers)
            await this.broadcast(this.encodeEventName(event.constructor.name, receiver), event)
    }
}

public async sendTo<T extends RealtimeEvent>(event: T, to: string) {
    await this.send(event, [to])
}

The send function passes events to the server, which then broadcasts them to the clients on the channel. It takes an event and a list of receivers; if the list is empty, the event goes out under its plain name to all clients. The sendTo function is shorthand for sending to a single client. Keep in mind that a targeted event is still broadcast to every client subscribed to the channel; only the addressed client listens for its encoded event name.

private async broadcast<T extends RealtimeEvent>(to: string, event: T) {
    const payload: EventPayload<T> = {
        data: {
            ...event
        },
        senderId: this.client.clientId
    }
    await this.channel.send({
        type: 'broadcast',
        event: to,
        payload: payload
    });
}

The broadcast function is a thin wrapper around the Supabase channel.send function with type 'broadcast', sending the sender ID along with the event data.

The initialization process takes care of subscribing to presence events and a Supabase channel.

constructor(private client: RtClient, private channel: RealtimeChannel) {
    this.channel.on('presence', {event: 'sync'}, this.onPresenceSync.bind(this))
    this.channel.on('presence', {event: 'join'}, this.onPresenceJoin.bind(this))
    this.channel.on('presence', {event: 'leave'}, this.onPresenceLeave.bind(this))
    this.channel.subscribe(this.onSubscribed.bind(this));
}

The onSubscribed function is invoked once the channel is successfully subscribed, notifying the user of the subscription connection state.
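
The body of onSubscribed is not shown here. A minimal sketch, assuming the status strings that supabase-js v2 passes to the subscribe callback, might track the connection state like this; ConnectionState and onChange are hypothetical names.

```typescript
// Status strings passed to the subscribe callback by supabase-js v2.
type SubscribeStatus = "SUBSCRIBED" | "TIMED_OUT" | "CLOSED" | "CHANNEL_ERROR";

// Hypothetical connection tracker; the article's onSubscribed may differ.
class ConnectionState {
    connected = false;
    lastError: string | null = null;
    private listeners: Array<(connected: boolean) => void> = [];

    onChange(cb: (connected: boolean) => void) { this.listeners.push(cb); }

    // Intended to be passed as channel.subscribe(state.onSubscribed.bind(state)).
    onSubscribed(status: SubscribeStatus, err?: Error) {
        this.connected = status === "SUBSCRIBED";
        this.lastError = this.connected ? null : (err?.message ?? status);
        for (const cb of this.listeners) cb(this.connected);
    }
}

const state = new ConnectionState();
const history: boolean[] = [];
state.onChange((connected) => history.push(connected));
state.onSubscribed("SUBSCRIBED");
state.onSubscribed("CHANNEL_ERROR", new Error("boom"));
```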

The DbChannel - Listening to Database Changes

The DbChannel, much like the RtChannel, is responsible for handling events. However, instead of sending and receiving user events, it listens to database changes. The code structure is similar to that of the RtChannel, but it is initialized with a different configuration.

constructor(private client: RtClient, private channel: RealtimeChannel, tables: RtTableDef[]) {
    for (const table of tables) {
        channel.on(
            'postgres_changes',
            {
                event: '*',
                schema: 'public',
                table: table.name,
                filter: table.filter
            },
            (payload) => this.handleOnPayload(payload)
        )
    }
    this.channel.subscribe(this.onSubscribed.bind(this));
}

Rather than subscribing to the broadcast channel and building event channels on top, the DbChannel subscribes to the Postgres changes channel on specific tables that match a defined filter. This provides a more fine-grained approach to listening to the desired database changes. It’s important to note that any Postgres change events adhere to row-level security, requiring the client using these events to be logged in and authorized, if RLS is enabled.
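
The handleOnPayload callback is not shown above. As a sketch, assuming the eventType, new and old fields that supabase-js v2 delivers for postgres_changes, the mapping into a DbEventPayload-like shape could look as follows. The types are simplified to a generic row type R instead of the generated Database types, and toDbEvent and DeviceRow are hypothetical names.

```typescript
// Relevant fields of a postgres_changes payload (simplified).
interface ChangePayload<R> {
    eventType: "INSERT" | "UPDATE" | "DELETE";
    new: Partial<R>;
    old: Partial<R>;
}

// Simplified stand-in for the article's DbEventPayload.
interface DbEventPayload<R> {
    data: R;
    type: "INSERT" | "UPDATE" | "DELETE";
}

// DELETE events carry the row in `old`; INSERT and UPDATE carry it in `new`.
function toDbEvent<R>(payload: ChangePayload<R>): DbEventPayload<R> {
    const row = payload.eventType === "DELETE" ? payload.old : payload.new;
    return { data: row as R, type: payload.eventType };
}

interface DeviceRow { id: number; name: string }

const inserted = toDbEvent<DeviceRow>({
    eventType: "INSERT",
    new: { id: 1, name: "lamp" },
    old: {},
});
const deleted = toDbEvent<DeviceRow>({
    eventType: "DELETE",
    new: {},
    old: { id: 1 },
});
```

For DELETE events the old record typically contains only the primary key columns unless the table's replica identity is set to full, so the cast to R is optimistic in that case.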

The RtClient - Managing Channels

The RtChannels and DbChannels are created inside a RtClient, which is responsible for their creation and management.


export class RtClient {
    //@ts-ignore
    private rtChannels = new Map<string, RtChannel>();
    private dbChannels = new Map<string, DbChannel>();

    constructor(public clientId: string, public sb: SupabaseClient<any, "public", any>) {
    }

    public createDbChannel(id: string, tables: RtTableDef[]) {
        const channel = this.sb.channel("db-changes")
        const dbChannel = new DbChannel(this, channel, tables);
        this.dbChannels.set(id, dbChannel);
        return dbChannel;
    }

    public createRtChannel<T>(name: string): RtChannel<T> {
        const channel = this.sb.channel(name, {
            config: {
                broadcast: {
                    self: true,
                    ack: true
                },
                presence: {
                    key: this.clientId
                }
            }
        })
        const rtChannel = new RtChannel<T>(this, channel)
        this.rtChannels.set(name, rtChannel);
        return rtChannel;
    }
}

The RtClient is initialized with a unique client ID and a Supabase client. It provides functions to create RtChannels and DbChannels. Notably, the configuration for an RtChannel sets self to true, so the sender receives its own broadcasts, and ack (acknowledge) to true, so the server confirms that it received each broadcast message.
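
With ack enabled, the promise returned by channel.send resolves to a result string that can be checked. The sketch below assumes the values "ok", "timed out" and "error" and uses a fake channel in place of a real RealtimeChannel; sendChecked and BroadcastSender are hypothetical names.

```typescript
type SendResult = "ok" | "timed out" | "error";

// Structural stand-in for the part of RealtimeChannel used here.
interface BroadcastSender {
    send(message: { type: "broadcast"; event: string; payload: unknown }): Promise<SendResult>;
}

// Hypothetical helper: turns a non-"ok" result into a rejected promise.
async function sendChecked(channel: BroadcastSender, event: string, payload: unknown): Promise<void> {
    const result = await channel.send({ type: "broadcast", event, payload });
    if (result !== "ok") {
        throw new Error(`broadcast "${event}" was not acknowledged: ${result}`);
    }
}

// Fake channel for illustration; it only acknowledges ToggleEvent.
const flakyChannel: BroadcastSender = {
    send: async (message) => (message.event === "ToggleEvent" ? "ok" : "timed out"),
};
```

A caller could then write await sendChecked(channel, "ToggleEvent", payload) and handle a rejection by retrying or reporting the failure.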

Usage

Implementing events functionality on the user side is now simplified. Users only need to create an event that inherits from RealtimeEvent and include the desired properties.

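export class ToggleEvent extends RealtimeEvent {
    // Defaults keep the constructor callable without arguments,
    // which handleOnEvent relies on when it rebuilds the event.
    constructor(public deviceId: string = "", public state: boolean = false) {
        super()
    }
}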

export class ToggleAllOffEvent extends RealtimeEvent {
}

In this example, the scenario involves multiple IoT devices connected to a central self-hosted Supabase instance. The server running this instance serves a webpage for interacting with these devices. Some devices are connected to physical switches that can be toggled on or off. The ToggleEvent is utilized to toggle a specific device connected to the client on or off, identified by its deviceId. The ToggleAllOffEvent is employed to toggle off all physical devices across all connected client devices.

On the server side, a new client is instantiated. In this example, it doesn’t subscribe to any domain events as it’s solely interested in client join and leave events.

// connectedClients is assumed to be a Map<string, { online: boolean }> maintained elsewhere.
const client = new RtClient("server", sb)
const eventChannel = client.createRtChannel<{}>("events");

eventChannel.onJoined((joined) => {
    if (joined.id != "server") {
        connectedClients.get(joined.id)!.online = true;
    }
});

eventChannel.onLeft((left) => {
    const entry = connectedClients.get(left.id);
    if (entry) {
        entry.online = false;
    }
});

Sending events is straightforward: call eventChannel.sendTo(new ToggleEvent(deviceId, true), clientId).

The client-side implementation mirrors the server’s, with the only difference being that the client subscribes to the ToggleEvent and ToggleAllOffEvent events.

const client = new RtClient(clientId, sb)
const eventChannel = client.createRtChannel<{}>("events");

eventChannel.subscribe(ToggleEvent, (event) => {});
eventChannel.subscribe(ToggleAllOffEvent, (event) => {});

Source

Source Code