An actor is a tiny concurrent worker that holds state.
Topic (mailbox): topic = ET.Topic('๐-...')
State: Int, Dict, ET.Running(), [1, 2, 3], whatever
Handler: (msg, state) -> [effects, new_state]
Everything runs in Rust, on a shared Tokio runtime. You write handlers in Python, but they execute concurrently. No locks. No shared memory. No race conditions.
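To make the model concrete, here is a minimal plain-Python sketch of one actor: a mailbox, a private state, and a loop that applies the handler to one message at a time. It is only an illustration of the idea, not how Zef implements actors (those run in Rust); `run_actor`, the `None` stop sentinel, and the use of plain strings as effects are assumptions made for this sketch.

```python
import queue
import threading

def run_actor(mailbox, handler, initial_state, performed):
    """Process messages one at a time; only this loop ever touches `state`."""
    state = initial_state
    while True:
        msg = mailbox.get()
        if msg is None:              # hypothetical stop sentinel for this sketch
            break
        effects, state = handler(msg, state)
        performed.extend(effects)    # a real runtime would execute the effects

def greeter(msg, state):
    n = state + 1
    return [[f"#{n} Hello, {msg}!"], n]   # [effects, new_state]

mailbox = queue.Queue()
performed = []
worker = threading.Thread(target=run_actor, args=(mailbox, greeter, 0, performed))
worker.start()
for m in ("World", "Zef"):
    mailbox.put(m)
mailbox.put(None)
worker.join()
print(performed)   # ['#1 Hello, World!', '#2 Hello, Zef!']
```

Because the state lives inside the loop and is replaced only by the handler's return value, no lock is needed even though the worker runs on its own thread.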
from zef import *
import time
# 1. make a topic (the actor's mailbox)
topic = ET.Topic('๐-a1a1a1a1a1a1a1a1a1a2')
# 2. define the handler
@zef_function
def greeter(msg: String, state: Int) -> Array:
    n = state + 1
    return [
        [FX.Print(content=f'#{n} Hello, {msg}!')],
        n,
    ]
# 3. start it
actor = FX.StartActor(
    input=topic,
    initial_state=0,
    handler=greeter,
) | run
time.sleep(0.3) # let it register
# 4. send messages
FX.Publish(target=topic, content='World') | run
FX.Publish(target=topic, content='Zef') | run
time.sleep(0.5)
FX.StopActor(actor=actor) | run
Output:
#1 Hello, World!
#2 Hello, Zef!
Every handler returns [effects_list, new_state].
# no effects this time
return [[], new_state]
# one effect
return [[FX.Print(content='hi')], new_state]
# many effects: they execute in order
return [
    [FX.Print(content='step 1'),
     FX.Print(content='step 2'),
     FX.Publish(target=other_topic, content='ping')],
    new_state,
]
Common mistake: returning [FX.Print(...), 0] instead of [[FX.Print(...)], 0]. The first element must be a list of effects, even a list of one.
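One way to catch this mistake early is to check the shape of a handler's return value in a unit test. The helper below is a hypothetical plain-Python sketch, not part of Zef; it uses strings as stand-ins for effects and only checks the [effects_list, new_state] shape described above.

```python
def check_handler_result(result):
    """Return None if `result` looks like [effects_list, new_state], else a hint."""
    if not isinstance(result, (list, tuple)) or len(result) != 2:
        return "expected a two-element [effects_list, new_state]"
    effects, _new_state = result
    if not isinstance(effects, list):
        return "first element must be a list of effects, even for one effect"
    return None

print(check_handler_result(["print-effect", 0]))     # a hint string: effect not wrapped
print(check_handler_result([["print-effect"], 0]))   # None: shape is fine
```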
topic = ET.Topic('๐-a0b1c2d3e4f567890abc') # platonic UID
topic = ET.Topic(generate_uid()) # random
The UID is how Zef routes messages. Same UID, same topic. Multiple actors can subscribe to the same topic; each gets every message.
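The routing rule can be pictured with a small plain-Python model: a table from topic UID to subscriber inboxes, where publishing copies the message to every inbox registered under that UID. The names `subscribe` and `publish` and the UID strings are placeholders for this sketch, not Zef's API.

```python
from collections import defaultdict

subscribers = defaultdict(list)   # topic UID -> list of subscriber inboxes

def subscribe(uid):
    inbox = []
    subscribers[uid].append(inbox)
    return inbox

def publish(uid, content):
    for inbox in subscribers[uid]:   # every subscriber gets every message
        inbox.append(content)

a = subscribe("uid-1")
b = subscribe("uid-1")   # same UID, so same topic as `a`
c = subscribe("uid-2")   # different UID, different topic
publish("uid-1", "hello")
print(a, b, c)   # ['hello'] ['hello'] []
```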
FX.Publish(target=topic, content='hello') | run
FX.Publish(target=topic, content=42) | run
FX.Publish(target=topic, content=ET.Ping()) | run
FX.Publish(target=topic, content={'x': 1, 'y': 2}) | run
Any Zef value can be a message: strings, numbers, entities, dicts, arrays.
actor = FX.StartActor(
    input=topic,
    initial_state=0,
    handler=my_func,
) | run
Handler takes (msg, state) as two args. Use for simple accumulators.
actor = FX.StartActor(
    input=topic,
    initial_state=ET.Green(),
    rules={
        ('tick', ET.Green): h1,
        ('tick', ET.Yellow): h2,
        (Any, Any): h_default,
    },
) | run
Different code for different (msg, state) patterns. Perfect for state machines.
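The sketch below models rule-based dispatch in plain Python to show the idea of picking a handler from a (msg, state) pattern. The matching semantics are assumptions for illustration only: rules are tried in insertion order, literal patterns match by equality, class patterns match by isinstance, and `ANY` is a stand-in wildcard. How Zef itself orders or resolves overlapping rules is not stated in the text.

```python
class Green: pass
class Yellow: pass
class Red: pass

ANY = object()   # stand-in wildcard for this sketch

def matches(pattern, value):
    if pattern is ANY:
        return True
    if isinstance(pattern, type):
        return isinstance(value, pattern)
    return pattern == value

def dispatch(rules, msg, state):
    """Call the first handler whose (msg, state) pattern matches."""
    for (msg_pat, state_pat), handler in rules.items():
        if matches(msg_pat, msg) and matches(state_pat, state):
            return handler(msg, state)
    raise LookupError("no rule matched")

rules = {
    ("tick", Green): lambda msg, state: [["green -> yellow"], Yellow()],
    ("tick", Yellow): lambda msg, state: [["yellow -> red"], Red()],
    (ANY, ANY): lambda msg, state: [[], state],   # default: ignore
}

state = Green()
trace = []
for msg in ["tick", "noise", "tick"]:
    effects, state = dispatch(rules, msg, state)
    trace.extend(effects)
print(trace, type(state).__name__)   # ['green -> yellow', 'yellow -> red'] Red
```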
@zef_function
def counter(msg: Any, state: Int) -> Array:
    if msg == 'inc':
        return [[FX.Print(content=f'count={state+1}')], state + 1]
    elif msg == 'dec':
        return [[FX.Print(content=f'count={state-1}')], state - 1]
    elif msg == 'reset':
        return [[FX.Print(content='reset')], 0]
    else:
        return [[], state]  # ignore
actor = FX.StartActor(input=topic, initial_state=0, handler=counter) | run
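Since a handler is just a function from (msg, state) to [effects, new_state], its logic can be checked without starting an actor by replaying a list of messages through it. The following is a plain-Python sketch under that assumption: `replay` is a hypothetical helper, and strings stand in for the FX.Print effects.

```python
def counter(msg, state):
    if msg == "inc":
        return [[f"count={state + 1}"], state + 1]
    if msg == "dec":
        return [[f"count={state - 1}"], state - 1]
    if msg == "reset":
        return [["reset"], 0]
    return [[], state]   # ignore anything else

def replay(handler, initial_state, messages):
    """Feed messages through the handler in order, collecting effects."""
    state, effects = initial_state, []
    for msg in messages:
        new_effects, state = handler(msg, state)
        effects.extend(new_effects)
    return effects, state

effects, final = replay(counter, 0, ["inc", "inc", "dec", "bogus", "reset", "inc"])
print(effects)   # ['count=1', 'count=2', 'count=1', 'reset', 'count=1']
print(final)     # 1
```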
# read a snapshot of the actor's state
FX.QueryActorState(actor=actor) | run
# { ET.Actor('๐-...'): <state> }
# multiple actors
FX.QueryActorState(actors=[a1, a2, a3]) | run
# transform before returning
FX.QueryActorState(actor=actor, transform=length) | run
# all running actors
FX.QueryActorState() | run
FX.ListActors() | run
actor = FX.StartActor(
    input=topic,
    initial_state=0,
    handler=h,
    logging=True,  # automatic ET.ActorLog entries
) | run
FX.Publish(target=topic, content='hi') | run
FX.QueryLog(filter=ET.ActorLog) | run
# [ET.ActorLog(event='processed', message_size=..., ...), ...]
FX.StopActor(actor=actor) | run
Cooperative cancellation: finishes the current message, then exits cleanly.
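Cooperative cancellation can be sketched in plain Python as a stop flag that the worker loop checks only between messages, so a message that has started is always finished. This is an illustrative model, not Zef's implementation; in particular, the text does not say what happens to messages still queued at stop time, and this sketch simply leaves them in the queue.

```python
import queue
import threading

def run_until_stopped(mailbox, handler, state, stop):
    """Check `stop` only between messages, never in the middle of one."""
    while not stop.is_set():
        try:
            msg = mailbox.get(timeout=0.05)
        except queue.Empty:
            continue
        _effects, state = handler(msg, state)   # always runs to completion
    return state

stop = threading.Event()

def count_and_request_stop(msg, state):
    stop.set()                # simulate a stop request arriving mid-message
    return [[], state + 1]    # the current message still completes

mailbox = queue.Queue()
mailbox.put("a")
mailbox.put("b")
final_state = run_until_stopped(mailbox, count_and_request_stop, 0, stop)
print(final_state, mailbox.qsize())   # 1 1
```

The first message is fully processed even though the stop request arrives while it is being handled; the second is never picked up.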
The bottleneck is almost always the Python side doing the publishing. Once messages are in the system, actor-to-actor messaging runs in Rust.
time.sleep() after StartActor. Give the actor a moment to register before publishing.
[effects_list, new_state]. First element is always a list.
handler= and rules=. Pick one.
(msg, state): returns [effects, new_state]

Write an actor that tracks a running average. It should receive numbers, and every time it gets one, print the new average.
@zef_function
def avg(msg: Float, state: Dict) -> Array:
    n = state['n'] + 1
    tot = state['sum'] + msg
    a = tot / n
    return [[FX.Print(content=f'avg={a:.2f}')], {'n': n, 'sum': tot}]
FX.StartActor(input=topic, initial_state={'n': 0, 'sum': 0.0}, handler=avg) | run
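To see the arithmetic at work, the same update rule can be stepped through by hand in plain Python. This sketch mirrors the handler's logic with strings in place of FX.Print effects; `avg_step` and the sample inputs 2.0, 4.0, 9.0 are chosen here for illustration.

```python
def avg_step(msg, state):
    n = state["n"] + 1
    tot = state["sum"] + msg
    return [[f"avg={tot / n:.2f}"], {"n": n, "sum": tot}]

state = {"n": 0, "sum": 0.0}
printed = []
for x in [2.0, 4.0, 9.0]:
    effects, state = avg_step(x, state)
    printed.extend(effects)

print(printed)   # ['avg=2.00', 'avg=3.00', 'avg=5.00']
print(state)     # {'n': 3, 'sum': 15.0}
```

Keeping the count and the running sum in the state, rather than the average itself, means each new average is computed exactly from the totals.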
Next up: patterns (state machines, fan-out, actor-to-actor).