
the chat agent

an end-to-end example that ties everything together.

We'll build a multi-turn LLM chat agent. Under 50 lines. Every idea from the book shows up: actors, state, FX effects, @zef_function, topics, HTTPRequest. It's the capstone.

what we're building

        FX.Publish                      FX.Print
User ────────────▶ topic ────▶ Actor ──────────▶ stdout
                                 │
                                 │ holds conversation state
                                 │ calls LLM over HTTP
                                 │ appends response to history
                                 ▼
                          state = {
                            history: [...],
                            system_prompt: '...',
                          }

the whole thing

from zef import *
import builtins, time

topic = ET.Topic('🍃-cafe000000000000a011')

@zef_function
def chat(msg: String, state: Any) -> Array:
    # append the user's turn
    history = state['history'] | append({'role': 'user', 'content': msg}) | collect

    # build the full messages list for the API
    messages = [{'role': 'system', 'content': state['system_prompt']}] + list(history)
    body = {'model': 'qwen2.5:32b', 'stream': False, 'messages': messages} | to_json | collect

    # fire the HTTP request as an FX
    resp = FX.HTTPRequest(
        url='http://localhost:11434/api/chat',
        method='POST',
        headers={'Content-Type': 'application/json'},
        body=body,
    ) | run

    # extract the reply
    reply = resp | F.body | parse_json | F.message | F.content | collect

    # append assistant turn
    history = history | append({'role': 'assistant', 'content': reply}) | collect

    # emit a print effect and store new state
    return [
        [FX.Print(content=f'🤖  {reply}')],
        {**state, 'history': history},
    ]

# start the actor
actor = FX.StartActor(
    input=topic,
    initial_state={
        'history':       [],
        'system_prompt': 'Reply warmly in one short sentence.',
    },
    handler=chat,
) | run

# REPL loop: feed user input into the topic
while (line := builtins.input('🧑  ').strip()):
    FX.Publish(target=topic, content=line) | run
    time.sleep(6.0)    # let the LLM reply before next prompt

FX.StopActor(actor=actor) | run

what's happening, line by line

the mental playback
  1. You type "hi, I'm Kai"
  2. FX.Publish drops it on the topic
  3. The actor receives (msg, state): msg = "hi, I'm Kai", state = the dict
  4. Handler appends user turn to history
  5. Builds the API payload
  6. Fires FX.HTTPRequest → Ollama
  7. Parses the response, extracts the assistant reply
  8. Returns [[FX.Print(...)], new_state]
  9. Runtime fires the Print; stores the new state
  10. Next message, state now has both turns
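The playback above can be mimicked in plain Python without Zef installed. The sketch below is a stand-in, not Zef's API: `chat_step`, `fake_llm`, and the `('print', ...)` effect tuple are hypothetical names, and the model call is replaced by an echo function. It only illustrates the handler contract: take `(msg, state)`, return effects plus a new state, and leave the old state untouched.

```python
def fake_llm(messages):
    """Hypothetical stand-in for the HTTP call: echoes the last user turn."""
    return f"echo: {messages[-1]['content']}"


def chat_step(msg, state, llm=fake_llm):
    """Mirror of the handler's shape: (msg, state) -> [effects, new_state]."""
    # steps 4-5: append the user's turn and build the payload
    history = state["history"] + [{"role": "user", "content": msg}]
    messages = [{"role": "system", "content": state["system_prompt"]}] + history
    # steps 6-7: call the model and extract the reply
    reply = llm(messages)
    history = history + [{"role": "assistant", "content": reply}]
    # step 8: effects are returned as data, not performed here
    return [[("print", reply)], {**state, "history": history}]


state0 = {"history": [], "system_prompt": "Reply warmly in one short sentence."}
effects1, state1 = chat_step("hi, I'm Kai", state0)
effects2, state2 = chat_step("what's my name?", state1)

# history length before any message, after one, and after two
print(len(state0["history"]), len(state1["history"]), len(state2["history"]))
```

Because each call builds a new history list instead of appending in place, `state0` is still empty after both turns, which is what makes replaying or inspecting earlier states cheap.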

what each zef feature buys you

feature          what it gives
Actor            private state per session: no locks, no globals
Topic            decouples "how messages arrive" from "how they're processed"
FX.HTTPRequest   provider calls are data: swap Ollama → Gemini by changing the URL and payload
FX.Print         side effect labeled as one; function stays pure-ish
@zef_function    handler is content-addressed: reproducible and shippable
ZefOps           parsing (parse_json | F.message | F.content) is a one-liner
dict state       history grows naturally; any Zef value would work too

swapping providers: the big payoff

Want Gemini instead of Ollama? Change the URL + payload:

resp = FX.HTTPRequest(
    url=f'https://generativelanguage.googleapis.com/v1beta/models/gemini-pro:generateContent?key={api_key}',
    method='POST',
    headers={'Content-Type': 'application/json'},
    body={'contents': [{'parts': [{'text': msg}]}]} | to_json | collect,
) | run

No rewrite. The actor, the topic, and the state shape all stay exactly the same; only the request body and the reply extraction follow the new provider's JSON format. That's the point of "effects as data": provider integration is closer to a config change than a refactor.
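The Gemini snippet above sends only the latest `msg`. To keep the conversation multi-turn, the stored history would need translating into Gemini's request shape. The sketch below shows one way to do that in plain Python. The helper names `to_gemini_body` and `gemini_reply` are hypothetical, and the field names (`system_instruction`, `contents`, the `model` role, `candidates[0].content.parts[0].text`) reflect the `generateContent` REST format as we understand it, so check them against the current API reference before relying on them.

```python
import json


def to_gemini_body(system_prompt, history):
    """Translate the actor's history into a generateContent request body."""
    contents = [
        {
            # assumed mapping: Gemini names the assistant role 'model'
            "role": "model" if turn["role"] == "assistant" else "user",
            "parts": [{"text": turn["content"]}],
        }
        for turn in history
    ]
    return json.dumps({
        "system_instruction": {"parts": [{"text": system_prompt}]},
        "contents": contents,
    })


def gemini_reply(response_text):
    """Pull the first candidate's text out of a generateContent response."""
    data = json.loads(response_text)
    return data["candidates"][0]["content"]["parts"][0]["text"]


history = [
    {"role": "user", "content": "hi, I'm Kai"},
    {"role": "assistant", "content": "Lovely to meet you, Kai!"},
    {"role": "user", "content": "what's my name?"},
]
body = to_gemini_body("Reply warmly in one short sentence.", history)

# a hand-written response in the assumed shape, for illustration only
sample_response = json.dumps(
    {"candidates": [{"content": {"role": "model", "parts": [{"text": "Kai!"}]}}]}
)
print(gemini_reply(sample_response))
```

In the handler, `body` would take the place of the Ollama payload and `gemini_reply` the place of the `F.message | F.content` extraction; the state shape is untouched.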

enhancements

persistent history

db = FX.CreateDB(
    type=DictDatabase,
    persistence='local_disk',
    path='~/.chat/history.zef',
) | run

# in the handler, after updating history:
db['last_session'] = history

expose as an HTTP endpoint

@zef_function
def api_chat(req):
    user_msg = str(req.body) | parse_json | F.message | collect
    FX.Publish(target=topic, content=user_msg) | run
    # (return an ack; the actor handles printing/storing)
    return ET.JSON(content={'ok': True})

FX.StartHTTPServer(
    routes={('POST', '/chat'): api_chat},
    port=8000,
) | run

multiple conversation sessions

Use one actor per session. Each has its own state. Route by session ID:

sessions = {}   # session_id → (topic, actor)

# when a new user arrives:
def new_session(id):
    t = ET.Topic(generate_uid())
    a = FX.StartActor(input=t, initial_state={'history': [], 'system_prompt': '...'}, handler=chat) | run
    sessions[id] = (t, a)

# route a message:
def send(id, text):
    t, _ = sessions[id]
    FX.Publish(target=t, content=text) | run

introspect live state

# how long is this session's history?
FX.QueryActorState(actor=actor, transform=F.history | length) | run

# all running sessions
FX.QueryActorState() | run

reflect on how many layers are in play

count the zef ideas in use

Every single chapter, folded into 50 lines of code. That's the framework.

what this suggests for bigger systems

Think of every app as:

  1. Topics for communication channels
  2. Actors holding stateful logic
  3. Databases/signals for persistence
  4. HTTP server(s) exposing endpoints
  5. Pure ZefOps for data transformations
  6. FX values for effects at the boundary

Wire them up with pipes and topics. Observe them with FX.Query*. Persist them to files with FX.Save*. The primitives are small; the compositions are large.
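To make the topic, actor, and effect roles concrete, here is a toy, single-threaded model of that wiring in plain Python. It is a sketch under heavy assumptions and not how Zef is implemented: `Topic`, `Actor`, `Print`, and `run_effect` are hypothetical names, and delivery is synchronous where a real runtime would be concurrent.

```python
from dataclasses import dataclass, field


@dataclass
class Print:
    """An effect described as data; nothing happens until it is run."""
    content: str


@dataclass
class Topic:
    """A channel that forwards each published message to its subscribers."""
    subscribers: list = field(default_factory=list)

    def publish(self, msg):
        for deliver in self.subscribers:
            deliver(msg)


class Actor:
    """Owns private state; the handler maps (msg, state) to [effects, state]."""

    def __init__(self, topic, initial_state, handler, run_effect):
        self.state = initial_state
        self.handler = handler
        self.run_effect = run_effect
        topic.subscribers.append(self.receive)

    def receive(self, msg):
        effects, self.state = self.handler(msg, self.state)
        for fx in effects:  # effects run at the boundary, after the handler returns
            self.run_effect(fx)


def count_words(msg, state):
    """Example handler: keeps a running word count across messages."""
    total = state["words"] + len(msg.split())
    return [[Print(f"{total} words so far")], {"words": total}]


printed = []  # capture output here instead of writing to stdout


def run_effect(fx):
    """Interpreter for effect values; only Print is modeled."""
    if isinstance(fx, Print):
        printed.append(fx.content)


topic = Topic()
actor = Actor(topic, {"words": 0}, count_words, run_effect)
topic.publish("hello there")
topic.publish("one two three")
print(printed, actor.state)
```

The handler never prints anything itself. Swapping `run_effect` for one that writes to a log or a socket changes where the output goes without touching `count_words`, which is the same separation the chat agent relies on.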

modify it

Pick one:

Next up: the zen of zef, the 10-line manifesto. →