state machines, fan-out, chatter, and more.
Classic traffic light. Different message types trigger different transitions:
actor = FX.StartActor(
    input=topic,
    initial_state=ET.Green(),
    rules={
        ('tick', ET.Green): constant_func([[FX.Print(content='🟢→🟡')], ET.Yellow()]),
        ('tick', ET.Yellow): constant_func([[FX.Print(content='🟡→🔴')], ET.Red()]),
        ('tick', ET.Red): constant_func([[FX.Print(content='🔴→🟢')], ET.Green()]),
        ('emergency', Any): constant_func([[FX.Print(content='🚨→🔴')], ET.Red()]),
    },
) | run
Each rule key is a (msg_pattern, state_pattern) tuple; the value is the handler to run. Rules are tried in order and the first match wins, so put narrow rules first and catch-alls last.
| pattern | matches when… |
|---|---|
| Any | anything |
| Int | value is an Int |
| ET.Dog | value is an ET.Dog entity |
| greater_than(5) | op returns True for value |
| 'tick' | equality |
| {1, 2, 3} | value is in set |
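The pattern table and the first-match rule can be modelled in a few lines of plain Python. This is only a sketch of the semantics described above, not zef's implementation: `ANY`, `matches`, `first_match`, and this `greater_than` are hypothetical stand-ins, Python's `int` stands in for `Int`, and the rules are held in an ordered list of pairs because plain Python sets cannot appear inside dict keys.

```python
class _AnyPattern:
    """Hypothetical stand-in for zef's `Any` wildcard."""

ANY = _AnyPattern()

def greater_than(n):
    # Hypothetical predicate pattern: true for numbers strictly above n.
    return lambda v: isinstance(v, (int, float)) and v > n

def matches(pattern, value):
    """Return True if `value` satisfies `pattern` (one row of the table each)."""
    if pattern is ANY:                          # wildcard: anything
        return True
    if isinstance(pattern, type):               # type pattern, e.g. int
        return isinstance(value, pattern)
    if isinstance(pattern, (set, frozenset)):   # membership in a set
        try:
            return value in pattern
        except TypeError:                       # unhashable value cannot be a member
            return False
    if callable(pattern):                       # predicate pattern
        return bool(pattern(value))
    return pattern == value                     # literal: equality

def first_match(rules, msg, state):
    """Scan rules in order and return the first handler whose patterns match."""
    for (msg_pattern, state_pattern), handler in rules:
        if matches(msg_pattern, msg) and matches(state_pattern, state):
            return handler
    return None

# Narrow rules first, catch-all last. Handlers are just labels here.
rules = [
    (('tick', ANY), 'on_tick'),
    ((greater_than(5), ANY), 'big'),
    (({1, 2, 3}, ANY), 'small'),
    ((int, ANY), 'any_int'),
    ((ANY, ANY), 'fallback'),
]
```

Moving the `(ANY, ANY)` entry to the top would make every other rule unreachable, which is why catch-alls belong at the end.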
Running stats, counters, log aggregation:
@zef_function
def running_sum(msg: Int, state: Int) -> Array:
    total = state + msg
    return [[FX.Print(content=f'total={total}')], total]

actor = FX.StartActor(input=topic, initial_state=0, handler=running_sum) | run
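The handler contract used here, `(msg, state)` in and `[effects, new_state]` out, behaves like a fold over the message stream. A minimal pure-Python sketch of that idea, assuming effects are plain strings rather than `FX.Print` values; `drive` is a hypothetical helper and not part of zef.

```python
def running_sum(msg, state):
    # Same shape as the handler above: return [effects, new_state].
    total = state + msg
    return [[f'total={total}'], total]

def drive(handler, initial_state, messages):
    """Feed messages to a handler one at a time, threading the state through."""
    state = initial_state
    effects_log = []
    for msg in messages:
        effects, state = handler(msg, state)
        effects_log.extend(effects)   # a real runtime would execute these
    return effects_log, state

log, final_state = drive(running_sum, 0, [3, 4, 5])
```

The handler itself holds no state; everything it needs arrives as an argument, which makes it easy to test in isolation.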
Multiple actors subscribe to the same topic. Each processes every message independently.
topic = ET.Topic('π-c3c3c3c3c3c3c3c3c3c3')
a1 = FX.StartActor(input=topic, initial_state=0, handler=doubler) | run
a2 = FX.StartActor(input=topic, initial_state=0, handler=squarer) | run
FX.Publish(target=topic, content=5) | run
# doubler prints 10
# squarer prints 25
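The `doubler` and `squarer` handlers are not defined in the text. The sketch below models fan-out in plain Python under that assumption: `Topic`, `subscribe`, and `publish` are hypothetical stand-ins for the zef runtime, both handlers are guesses at what the names imply, and "printing" is recorded in a list so it can be inspected.

```python
class Topic:
    """Hypothetical in-memory topic: every subscriber sees every message."""

    def __init__(self):
        self.subscribers = []

    def subscribe(self, handler, initial_state):
        actor = {'handler': handler, 'state': initial_state, 'printed': []}
        self.subscribers.append(actor)
        return actor

    def publish(self, content):
        # Each actor processes the message independently with its own state.
        for actor in self.subscribers:
            effects, new_state = actor['handler'](content, actor['state'])
            actor['state'] = new_state
            actor['printed'].extend(effects)

def doubler(msg, state):
    # Assumed behaviour: emit msg * 2, count messages in the state.
    return [[msg * 2], state + 1]

def squarer(msg, state):
    # Assumed behaviour: emit msg squared, count messages in the state.
    return [[msg ** 2], state + 1]

topic = Topic()
a1 = topic.subscribe(doubler, 0)
a2 = topic.subscribe(squarer, 0)
topic.publish(5)
```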
An actor can publish to another topic as part of its effects list:
topic_a = ET.Topic('π-a1a1a1a1a1a1a1a1a1a1')
topic_b = ET.Topic('π-b2b2b2b2b2b2b2b2b2b2')
player_a = FX.StartActor(
    input=topic_a,
    initial_state=ET.Ready(),
    rules={
        (Any, Any): constant_func([
            [FX.Print(content='🏓 A'),
             FX.Publish(target=topic_b, content='ball')],
            ET.Ready(),
        ]),
    },
) | run

player_b = FX.StartActor(
    input=topic_b,
    initial_state=ET.Ready(),
    rules={
        (Any, Any): constant_func([
            [FX.Print(content='🏓 B'),
             FX.Publish(target=topic_a, content='ball')],
            ET.Ready(),
        ]),
    },
) | run

# serve
FX.Publish(target=topic_a, content='ball') | run
# 🏓 A
# 🏓 B
# 🏓 A
# 🏓 B
# ... bouncing at ~100k msg/s in Rust
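Because each player republishes on every message, the rally never stops on its own. The pure-Python sketch below models the exchange with a message queue and an explicit cap; `rally`, the queue, and the `routes` table are hypothetical and only illustrate the message flow, not how zef schedules actors.

```python
from collections import deque

def rally(max_messages):
    """Bounce a 'ball' between two topics until max_messages have been handled."""
    # topic name -> (label the actor prints, topic it publishes to)
    routes = {'topic_a': ('A', 'topic_b'), 'topic_b': ('B', 'topic_a')}
    queue = deque([('topic_a', 'ball')])   # the serve
    printed = []
    while queue and len(printed) < max_messages:
        topic, content = queue.popleft()
        label, next_topic = routes[topic]
        printed.append(label)                  # the Print effect
        queue.append((next_topic, content))    # the Publish effect
    return printed

first_four = rally(4)
```

In a real system the stopping condition would live in the rules, for example a counter in the state and a rule that stops publishing once it reaches a limit.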
A coordinator receives high-level commands and fans out to subordinate topics:
coordinator = FX.StartActor(
    input=t_control,
    initial_state=ET.Normal(),
    rules={
        ('fire', ET.Normal): constant_func([
            [FX.Publish(target=t_alarm, content='sound'),
             FX.Publish(target=t_sprinkler, content='activate'),
             FX.Publish(target=t_doors, content='unlock')],
            ET.Emergency(),
        ]),
        ('all_clear', ET.Emergency): constant_func([
            [FX.Publish(target=t_alarm, content='silence'),
             FX.Publish(target=t_sprinkler, content='deactivate'),
             FX.Publish(target=t_doors, content='lock')],
            ET.Normal(),
        ]),
    },
) | run
Each subordinate (alarm, sprinkler, doors) is its own actor on its own topic. The coordinator only publishes messages; it does not need to know how each subordinate reacts.
When someone connects to your WebSocket, spawn a fresh actor just for them:
FX.StartHTTPServer(
    routes={
        '/ws': ET.WebSocket(
            actor=FX.StartActor(
                initial_state=ET.Guest(),
                rules={
                    (ET.WSClientConnected, ET.Guest): on_connect,
                    (String, ET.Connected): on_message,
                    (ET.WSClientDisconnected, Any): on_disconnect,
                },
            ),
        ),
    },
    port=8000,
) | run
Each client = one actor. Private state per connection. The actor dies when the client disconnects.
More on WebSocket actors in chapter 22.
@zef_function
def fizzbuzz(n: Int, state: Int) -> Array:
    if n % 15 == 0: out = 'FizzBuzz!'
    elif n % 3 == 0: out = 'Fizz'
    elif n % 5 == 0: out = 'Buzz'
    else: out = str(n)
    return [[FX.Print(content=out)], state + 1]

actor = FX.StartActor(input=topic, initial_state=0, handler=fizzbuzz) | run

for i in range(1, 16):
    FX.Publish(target=topic, content=i) | run
The state can be any zef value. Use the shape that fits your model:
Int
ET.Green() | ET.Yellow() | ET.Red()
union[a, b]
list
Dict with fields
# What's the current state of the traffic light?
FX.QueryActorState(actor=traffic_light) | run
# All logins, with just the username field extracted
FX.QueryActorState(transform=F.username) | run
# How many items does each actor have in its state?
FX.QueryActorState(transform=length) | run
States: ET.Locked(), ET.Unlocked(), ET.Open(). Messages: 'unlock', 'lock', 'open', 'close'. Rules:
actor = FX.StartActor(
    input=topic,
    initial_state=ET.Locked(),
    rules={
        ('unlock', ET.Locked): constant_func([[FX.Print(content='🔓')], ET.Unlocked()]),
        ('lock', ET.Unlocked): constant_func([[FX.Print(content='🔒')], ET.Locked()]),
        ('open', ET.Unlocked): constant_func([[FX.Print(content='🚪')], ET.Open()]),
        ('close', ET.Open): constant_func([[FX.Print(content='β₯')], ET.Unlocked()]),
        (Any, Any): constant_func([[FX.Print(content='illegal')], Z]),  # keep state
    },
) | run
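To check that the rule set behaves as intended, it helps to trace a message sequence by hand. The sketch below is a plain-Python model of the same transition table, with strings standing in for the `ET` states; `TRANSITIONS`, `step`, and `trace` are hypothetical names, and the dictionary's default plays the role of the catch-all rule that keeps the state.

```python
LOCKED, UNLOCKED, OPEN = 'Locked', 'Unlocked', 'Open'

# (message, current state) -> next state, mirroring the four specific rules.
TRANSITIONS = {
    ('unlock', LOCKED): UNLOCKED,
    ('lock', UNLOCKED): LOCKED,
    ('open', UNLOCKED): OPEN,
    ('close', OPEN): UNLOCKED,
}

def step(state, msg):
    # Any pair not in the table is illegal and leaves the state unchanged.
    return TRANSITIONS.get((msg, state), state)

def trace(messages, state=LOCKED):
    """Return the list of states visited, starting with the initial state."""
    visited = [state]
    for msg in messages:
        state = step(state, msg)
        visited.append(state)
    return visited

states = trace(['open', 'unlock', 'open', 'lock', 'close', 'lock'])
```

Here `'open'` on a locked door and `'lock'` on an open door are both illegal, so the state repeats at those two steps.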
Next up: topics, SubscribeFX and timers. →