
signals

thread-safe variables with automatic history.

the quick start

counter = FX.CreateSignal(value=0) | run
# → Signal('🍃-a48ba7b311a6cbb2fd6d')

FX.ReadSignal(signal=counter) | run            # 0
FX.SetSignal(signal=counter, value=42) | run
FX.ReadSignal(signal=counter) | run            # 42

counter is a handle (not the value). Every read/write is thread-safe. Fine.

why signals exist

You've got multiple threads or handlers needing to share state. Options:

| option | fast? | thread-safe? | history? |
| --- | --- | --- | --- |
| plain variable | ✅ | ❌ | ❌ |
| variable + threading.Lock | ✅ | you hope | ❌ |
| DictDatabase | meh | ✅ | ✅ (full) |
| Signal | ✅ | ✅ (automatic) | ✅ (ring buffer) |

mental model: a signal is a named cell with a ring buffer

┌─────────────────────────────────────────┐
│ Global Signal Registry (process-wide)   │
│                                         │
│ Signal('🍃-abc...') ──▶ RingBuffer      │
│                            ┌───┬───┬───┐│
│                            │v0 │v1 │v2 ││
│                            └───┴───┴───┘│
│                             ↑ oldest    │
│                               newest ↑  │
│                                         │
│ multiple readers (shared lock)          │
│ one writer at a time (exclusive lock)   │
└─────────────────────────────────────────┘

Named by UID. Lives in a process-wide registry. Every thread can read/write safely.

atomicity: the one rule

| operation | atomic? | why |
| --- | --- | --- |
| ReadSignal | ✅ | holds read lock for duration |
| SetSignal | ✅ | holds write lock for duration |
| Read → then → Write | ❌ | lock released between the two |
| UpdateSignal(transform=f) | ✅ | optimistic CAS + retry |

never do read-then-write under concurrency

If two threads both do:

v = FX.ReadSignal(signal=counter) | run
FX.SetSignal(signal=counter, value=v + 1) | run

...one increment can be lost. Between one thread's read and its write, the other thread can slip in and read the same value. Both then write v + 1, so the counter goes up by one instead of two.
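To make the race concrete, here is a minimal sketch in plain Python. It does not use the Zef API: the `cell` dict and the `read`/`write` helpers are stand-ins invented for illustration, and the two "threads" are interleaved by hand so the outcome is reproducible.

```python
# Plain-Python stand-in for a signal: one shared cell (not the Zef API).
cell = {"value": 0}

def read():
    return cell["value"]

def write(v):
    cell["value"] = v

# Interleave two "threads" A and B by hand so the race is reproducible.
a_seen = read()        # A reads 0
b_seen = read()        # B reads 0 before A has written
write(a_seen + 1)      # A writes 1
write(b_seen + 1)      # B also writes 1, overwriting A's increment

lost_update_result = read()
print(lost_update_result)  # 1, although two increments ran
```

Each individual read and write is atomic here too. The problem is the gap between them, which is exactly what the table above flags.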

Fix: always use UpdateSignal with a pure transform.

UpdateSignal: safe read-modify-write

FX.UpdateSignal(signal=counter, transform=add(1)) | run

How it works: reads the value, releases the lock, runs your transform, then re-acquires the lock to commit, but only if no concurrent write landed. Otherwise it retries automatically with the fresh value.

@zef_function
def increment_by_5(x: Int) -> Int:
    return x + 5

FX.UpdateSignal(signal=counter, transform=increment_by_5) | run
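The read, transform, commit-if-unchanged loop described above can be sketched in plain Python. This is an illustration of the general optimistic-retry technique, not Zef's actual implementation: the `VersionedCell` class, its version counter, and the `hammer` helper are all assumptions made for the example.

```python
import threading

class VersionedCell:
    """Toy stand-in for a signal: a value plus a version counter."""

    def __init__(self, value):
        self._lock = threading.Lock()
        self._value = value
        self._version = 0

    def read(self):
        with self._lock:
            return self._value

    def update(self, transform):
        while True:
            # 1. Snapshot value and version, then release the lock.
            with self._lock:
                seen_value, seen_version = self._value, self._version
            # 2. Run the transform without holding the lock.
            new_value = transform(seen_value)
            # 3. Commit only if nobody wrote in the meantime, else retry.
            with self._lock:
                if self._version == seen_version:
                    self._value = new_value
                    self._version += 1
                    return new_value


def hammer(cell, n):
    # Apply n increments through the retrying update path.
    for _ in range(n):
        cell.update(lambda x: x + 1)


cell = VersionedCell(0)
threads = [threading.Thread(target=hammer, args=(cell, 1000)) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(cell.read())  # 8000: no increment is lost
```

Because a commit only succeeds when the version is unchanged, every successful write is based on the value that was actually current, which is why the eight threads always add up to 8000.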

transforms MUST be pure

Since UpdateSignal retries on conflict, a transform that sends an email or writes a file may do so multiple times. Keep transforms to arithmetic and dict/list construction: pure data shuffling.
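A small single-threaded simulation shows why retries and side effects do not mix. Everything here is hypothetical scaffolding rather than Zef code: `TinyCell` is a lock-free toy, `emails_sent` stands in for an external side effect, and the conflicting write is injected from inside the transform to force exactly one retry.

```python
class TinyCell:
    """Toy retrying cell (single-threaded, no locks; for illustration only)."""

    def __init__(self, value):
        self.value = value
        self.version = 0

    def set(self, value):
        self.value = value
        self.version += 1

    def update(self, transform):
        while True:
            seen_value, seen_version = self.value, self.version
            new_value = transform(seen_value)
            if self.version == seen_version:   # no write landed meanwhile
                self.set(new_value)
                return new_value
            # otherwise loop: the transform runs again with the fresh value


cell = TinyCell(0)
emails_sent = []          # stands in for an external side effect

def impure_increment(x):
    emails_sent.append(x)             # side effect: "send an email"
    if len(emails_sent) == 1:
        cell.set(100)                 # simulate another thread writing mid-transform
    return x + 1

result = cell.update(impure_increment)
print(result)        # 101
print(emails_sent)   # [0, 100]: the side effect happened twice
```

The stored value ends up correct, but the "email" went out once per attempt, not once per update.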

history for free

sig = FX.CreateSignal(value=0, history=100) | run
# keep last 100 values

# ... many updates ...

hist = FX.ReadSignalHistory(signal=sig, count=10) | run
# [
#   {'time': Time('2026-01-15 10:30:01 +0000'), 'value': 1},
#   {'time': Time('2026-01-15 10:30:02 +0000'), 'value': 2},
#   ...
# ]

Ring buffer: once it fills up, each new value overwrites the oldest one. Well suited to metrics, short-lived audit trails, and "what changed recently."
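The overwrite behaviour is the same as a bounded deque from the Python standard library. The sketch below uses `collections.deque` purely as an analogy; it says nothing about how Zef stores history internally.

```python
from collections import deque

history = deque(maxlen=3)      # keep only the last 3 values
for value in range(5):         # write 0, 1, 2, 3, 4
    history.append(value)      # once full, the oldest entry is dropped

print(list(history))           # [2, 3, 4]
```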

type constraints

sig = FX.CreateSignal(value=0, type=Int) | run

FX.SetSignal(signal=sig, value=42) | run         # ok
FX.SetSignal(signal=sig, value='hi') | run       # error! type mismatch

# union types
sig = FX.CreateSignal(value=0, type=Int | String) | run
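As a rough mental model, a type constraint is a check that runs on every write. The plain-Python sketch below assumes that behaviour; `TypedCell` and its `allowed` parameter are invented for illustration and are not part of Zef.

```python
class TypedCell:
    """Toy stand-in for a type-constrained signal (not the Zef API)."""

    def __init__(self, value, allowed):
        self._allowed = allowed       # a type or a tuple of types
        self._value = None
        self.set(value)               # the initial value is checked too

    def set(self, value):
        if not isinstance(value, self._allowed):
            raise TypeError(f"expected {self._allowed}, got {type(value).__name__}")
        self._value = value

    def read(self):
        return self._value


ints_only = TypedCell(0, int)
ints_only.set(42)                      # ok
try:
    ints_only.set("hi")                # rejected
    rejected = False
except TypeError:
    rejected = True

int_or_str = TypedCell(0, (int, str))  # union expressed as a tuple
int_or_str.set("hi")                   # ok

print(rejected, ints_only.read(), int_or_str.read())  # True 42 hi
```

A rejected write leaves the previous value in place, so readers never observe a value of the wrong type.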

the gotcha: you get back zef types, not python types

| you write | you read back | works | breaks |
| --- | --- | --- | --- |
| int | Int32_ | x+1, x*2 | isinstance(x, int) |
| str | StringShort_ | x == 'foo' | x.upper(), x.split() |
| None | Nil_ | x == None | x is None |
| dict | Dict1_ | x['k'], x.items() | isinstance(x, dict) |

Most operations "just work" because Zef types implement most Python protocols. But isinstance checks against Python built-ins and Python-only methods such as x.upper() need a different approach.
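The split between "works" and "breaks" is ordinary Python behaviour for any wrapper type. The sketch below uses two hypothetical classes, `WrappedInt` and `WrappedNil`, which are not Zef types; they only show why operator-based code succeeds while identity and isinstance checks fail.

```python
class WrappedInt:
    """Hypothetical wrapper: behaves like a number but is not a Python int."""

    def __init__(self, n):
        self._n = n

    def __add__(self, other):
        return self._n + other

    def __mul__(self, other):
        return self._n * other

    def __eq__(self, other):
        return self._n == other


class WrappedNil:
    """Hypothetical wrapper that compares equal to None but is a distinct object."""

    def __eq__(self, other):
        return other is None or isinstance(other, WrappedNil)


x = WrappedInt(41)
print(x + 1, x * 2)            # 42 82
print(isinstance(x, int))      # False

nil = WrappedNil()
print(nil == None)             # True
print(nil is None)             # False
```

Operators dispatch to methods the wrapper defines, whereas `isinstance` and `is` look at the object's actual class and identity.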

metrics-collection example

# Request and error counters; the history keeps the last 60 snapshots
metrics = FX.CreateSignal(
    value={'requests': 0, 'errors': 0},
    history=60,     # 60 snapshots
) | run

@zef_function
def inc_requests(m: Dict) -> Dict:
    return {'requests': m['requests'] + 1, 'errors': m['errors']}

@zef_function
def inc_errors(m: Dict) -> Dict:
    return {'requests': m['requests'], 'errors': m['errors'] + 1}

# From any handler thread:
FX.UpdateSignal(signal=metrics, transform=inc_requests) | run

# Read 10 snapshots from the history
FX.ReadSignalHistory(signal=metrics, count=10) | run

when to use what

| need | use |
| --- | --- |
| thread-safe variable in one process | Signal |
| persistent across restarts | Database |
| cross-process sharing | Database (vault) |
| full history | Database (append log) |
| last N values | Signal (ring buffer) |
| single-threaded state | plain variable |

signals are for NOW

In-memory, process-scoped. When the process exits, they vanish. Think of them as the "configuration / current metrics / application state" layer. For "saved for later" data, use databases.

snapshot: all signals at once

snapshot = FX.SignalsSnapshot() | run
# {Signal('🍃-...'): <value>, Signal('🍃-...'): <value>, ...}

# delete a signal you no longer need
FX.DeleteSignal(signal=counter) | run

SignalsSnapshot is handy for debugging: "what's the current state of everything?"

exercise

Build a signal-based "last 10 requests" log. Every time a request comes in, append its timestamp + path to the stored list. Never grow beyond 10.

solution
reqs = FX.CreateSignal(value=[], history=50) | run

@zef_function
def append_cap10(lst: Array, entry: Any) -> Array:
    return (list(lst) + [entry])[-10:]

# each request:
entry = {'t': FX.CurrentTime() | run, 'p': req.path}
FX.UpdateSignal(signal=reqs, transform=lambda lst: append_cap10(lst, entry)) | run
# (or wrap in a zef_function to avoid lambda restriction)
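The capping logic can be checked on its own in plain Python, without any signal. The sketch below also shows `functools.partial` as one way to bind `entry` and get a one-argument transform. Whether UpdateSignal accepts a partial object is not something this page confirms, so treat that part as an assumption; the integer timestamps and `/page/...` paths are made-up test data.

```python
from functools import partial

def append_cap10(lst, entry):
    # Pure: builds a new list, never mutates the input.
    return (list(lst) + [entry])[-10:]

log = []
for i in range(25):
    # partial binds `entry`, leaving a one-argument transform like the lambda above
    transform = partial(append_cap10, entry={"t": i, "p": f"/page/{i}"})
    log = transform(log)

print(len(log))                   # 10
print(log[0]["p"], log[-1]["p"])  # /page/15 /page/24
```

Because the function returns a new list each time, it is safe to re-run on retry, which is the property UpdateSignal transforms need.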

Next up: actors, the concurrency model. →