๐Ÿ 

websockets

broadcast mode and actor-per-connection mode.

two modes, same building blocks

pick your mode
modeuse case
topic modebroadcast/chatroom โ€” everyone sees everything
actor modeper-connection state โ€” auth, sessions, stateful protocols

mode 1 โ€” topic mode (chatroom)

All clients subscribe to one topic AND push into the same topic โ†’ instant broadcast.

chat = ET.Topic('๐Ÿƒ-87f83f06bfec2ba79b96')

FX.StartHTTPServer(
    routes={
        '/ws': ET.WebSocket(
            clients_subscribe_to=[chat],   # server โ†’ all clients
            clients_push_into_=[chat],     # any client โ†’ the topic
        ),
    },
    port=8000,
) | run

That's a complete chatroom. Connect, anything you send is broadcast to everyone.

asymmetric example

Server pushes updates, clients don't push back:

news_feed = ET.Topic(generate_uid())

FX.StartHTTPServer(
    routes={
        '/feed': ET.WebSocket(
            clients_subscribe_to=[news_feed],
            # no clients_push_into_ โ€” one-way
        ),
    },
    port=8000,
) | run

# elsewhere โ€” emit updates
FX.Publish(target=news_feed, content='breaking news!') | run

mode 2 โ€” actor per connection

Each client gets its own private actor with its own state:

FX.StartHTTPServer(
    routes={
        '/ws': ET.WebSocket(
            actor=FX.StartActor(
                initial_state=ET.Guest(),
                rules={
                    (ET.WSClientConnected, ET.Guest):       on_connect,
                    (String,                ET.Connected):   on_message,
                    (ET.WSClientDisconnected, Any):         on_disconnect,
                },
            ),
        ),
    },
    port=8000,
) | run

When a client connects, Zef spawns a fresh actor for that connection. When the client leaves, the actor stops.

the connection lifecycle

Every per-connection actor sees (at minimum) three kinds of message:

ET.WSClientConnected(
    client_ip='127.0.0.1',
    path='/ws',
    time=Time(...),
    topic_in=EntityUid(...),    # actor's input topic
    topic_out=EntityUid(...),   # publish here โ†’ sends to THIS client
    origin='http://...',
)

# regular text frames
String('hello from client')

# regular binary frames
Bytes(b'...')

ET.WSClientDisconnected(
    client_ip='127.0.0.1',
    path='/ws',
    time=Time(...),
    origin='http://...',
)

the topic_out trick

On connect, you get a topic_out that's unique to THIS client. Stash it in the actor's state so subsequent handlers know where to publish replies. This is how you "reply to the sender."

worked example โ€” a polite echo server

@zef_function
def on_connect(input: Any) -> Any:
    msg, state = input[0], input[1]
    return [
        [FX.Publish(target=msg.topic_out, content='welcome!')],
        ET.Connected(topic_out=msg.topic_out),
    ]

@zef_function
def on_message(input: Any) -> Any:
    msg, state = input[0], input[1]
    return [
        [FX.Publish(target=state.topic_out, content=f'you said: {msg}')],
        state,
    ]

@zef_function
def on_disconnect(input: Any) -> Any:
    msg, state = input[0], input[1]
    print(f'client {msg.client_ip} left')
    return [[], ET.Done()]

FX.StartHTTPServer(
    routes={
        '/ws': ET.WebSocket(
            actor=FX.StartActor(
                initial_state=ET.Guest(),
                rules={
                    (ET.WSClientConnected,    ET.Guest):     on_connect,
                    (String,                  ET.Connected): on_message,
                    (ET.WSClientDisconnected, Any):          on_disconnect,
                },
            ),
        ),
    },
    port=8000,
) | run

binary frames

Text frames arrive as String, binary as Bytes. If you're using the topic mode, binary arrives as Array1[u8] โ€” you have to convert:

FX.SubscribeFX(
    topic=from_clients,
    op=array_to_bytes | insert_into(FX.Publish(target=to_clients), 'content'),
) | run

shared state via initial_state

Thread shared resources through the initial state:

db = FX.CreateDB(type=DictDatabase, persistence='in_memory') | run

FX.StartHTTPServer(
    routes={
        '/ws': ET.WebSocket(
            actor=FX.StartActor(
                initial_state=ET.Guest(db=db),   # pass db through state
                rules={
                    (ET.WSClientConnected, ET.Guest):      on_connect,
                    (ET.StoreRequest,      ET.Connected):  on_store,
                },
            ),
        ),
    },
    port=8000,
) | run

Every connection's actor starts with a reference to the shared db handle. They all write to the same store.

monitoring clients

FX.QueryAllActiveHTTPServers() | run

FX.QueryAllWebsocketEndpoints(http_server=server) | run

FX.QueryAllConnectedWebsocketClients(http_server=server, path='/ws') | run
# [
#   {'channel_id': 0, 'client_ip': '127.0.0.1', 'connect_time': Time(...)},
#   ...
# ]

Built-in introspection โ€” great for debugging or building an admin panel.

connecting OUTWARD โ€” FX.StartWebSocketClient

Need to talk to an external WS server? Classic mode:

inbox  = ET.Topic(generate_uid())
outbox = ET.Topic(generate_uid())

client = FX.StartWebSocketClient(
    url='wss://echo.example.com/ws',
    in_stream_into=[inbox],
    out_stream_from=[outbox],
) | run

# subscribe to see what comes back
FX.SubscribeFX(topic=inbox, op=log) | run

# send a message
FX.Publish(target=outbox, content='hello remote!') | run

# cleanup
FX.StopWebSocketClient(client=client) | run

when to use which mode

topic mode
  • chatrooms, broadcast
  • live dashboards
  • fan-out events
  • no per-client state
actor mode
  • auth / sessions
  • stateful protocols
  • per-client rate limits
  • game lobbies

build a "chat with username" server

First message from client = their username. All subsequent messages are broadcast with "[username] says: ...".

solution sketch
chat = ET.Topic(generate_uid())

@zef_function
def on_connect(input: Any) -> Any:
    msg, state = input[0], input[1]
    return [
        [FX.Publish(target=msg.topic_out, content='send your name')],
        ET.AwaitingName(topic_out=msg.topic_out),
    ]

@zef_function
def on_name(input: Any) -> Any:
    name, state = input[0], input[1]
    # subscribe client to chat
    return [
        [FX.Publish(target=chat, content=f'* {name} joined'),
         FX.SubscribeFX(topic=chat, op=insert_into(FX.Publish(target=state.topic_out), 'content'))],
        ET.Chatting(name=name, topic_out=state.topic_out),
    ]

@zef_function
def on_msg(input: Any) -> Any:
    msg, state = input[0], input[1]
    return [[FX.Publish(target=chat, content=f'[{state.name}]: {msg}')], state]

# wire up rules
rules={
    (ET.WSClientConnected, Any):         on_connect,
    (String, ET.AwaitingName):           on_name,
    (String, ET.Chatting):                on_msg,
}

Next up: auth + HTTPS โ€” OAuth with one parameter, Let's Encrypt in two lines. โ†’