From Post 874: Intent-based foundation
Key insight: PUSH intent (fire & forget), DATA flows in asynchronously
Result: Pure reactive streams - no blocking anywhere
Traditional (❌ wrong):
# Request and WAIT for response
response = node.request(data)  # BLOCKS
if response.success:
    # Only reached after the blocking call above has returned.
    process(response.data)
Pure Flux (✅ correct):
# Push (fire & forget)
dht.push({'want': 'chunk_x'})
series.append({'pushed': 'chunk_x', 't': now()})


# Data flows in separately (async)
@stream_handler
def on_data_arrives(data):
    """Append an incoming payload to the shared ``series`` unchanged."""
    series.append(data)  # Just append whatever comes


# Derive state anytime (no waiting)
state = derive_from_series(series)
No waiting. No blocking. Pure flux.
class PureFluxNode:
    """Node that pushes intents and logs every event to an append-only list.

    NOTE(review): ``dht``, ``AsyncDataStream`` and ``time`` are used but not
    defined in this block; they must be in scope at module level.
    NOTE(review): ``derive_state`` delegates to ``self._reduce``, which is not
    defined in this class as shown -- confirm where it is supplied.
    """

    def __init__(self):
        # Local series (append-only)
        self.series = []
        # Incoming data stream
        self.stream = AsyncDataStream()
        # Hook up stream handler
        self.stream.on_data(self._handle_incoming)

    def push_intent(self, intent):
        """Push ``intent`` to the DHT and record the push; returns ``None``.

        Nothing is awaited here: the method only calls ``dht.push`` and
        appends a ``pushed_intent`` entry to ``self.series``.
        """
        # 1. Push to DHT (fire & forget)
        dht.push(intent)
        # 2. Append to series
        self.series.append({
            't': time.time(),
            'event': 'pushed_intent',
            'intent': intent,
        })
        # 3. Fall through and return immediately (NO WAITING)

    def _handle_incoming(self, data):
        """Record an incoming payload as a ``data_received`` entry.

        ``from`` is taken from ``data['source']`` when ``data`` is a dict and
        is ``None`` otherwise, so non-dict payloads are recorded instead of
        raising ``AttributeError``.
        """
        source = data.get('source') if isinstance(data, dict) else None
        # Whatever comes in, just append
        self.series.append({
            't': time.time(),
            'event': 'data_received',
            'data': data,
            'from': source,
        })

    def derive_state(self):
        """Return ``self._reduce(self.series)``; no I/O is performed here."""
        return self._reduce(self.series)
Flow:
1. Push intent → DHT (immediate return)
2. Data flows in → Stream handler appends
3. Derive state → From series (no waiting)
class StreamProcessor:
    """Process incoming data stream.

    Each handler turns one kind of incoming message into a timestamped entry
    appended to ``self.series``. ``self.handlers`` maps a data-type name to
    its handler; no dispatch method is defined in this class.

    NOTE(review): ``time`` must be imported at module level.
    """

    def __init__(self):
        self.series = []
        # Register handlers for different data types
        self.handlers = {
            'chunk': self._on_chunk,
            'status': self._on_status,
            'error': self._on_error,
        }

    def _record(self, event, **fields):
        """Append one entry stamped with the current time and ``event``."""
        self.series.append({'t': time.time(), 'event': event, **fields})

    def _on_chunk(self, data):
        """Chunk data arrived; requires ``id``, ``content`` and ``source``."""
        self._record(
            'chunk_received',
            chunk_id=data['id'],
            data=data['content'],
            **{'from': data['source']},  # 'from' is a keyword, hence the dict
        )

    def _on_status(self, data):
        """Status update arrived (including NO).

        Requires ``status`` and ``source``; ``reason`` is optional and is
        recorded as ``None`` when absent.
        """
        # Even "NO" contains useful data!
        # -> "I have it but rate limited" = valuable info
        self._record(
            'status_received',
            status=data['status'],  # Could be "busy", "rate_limited", etc
            reason=data.get('reason'),
            **{'from': data['source']},
        )

    def _on_error(self, data):
        """Error info arrived; requires ``error`` and ``source``."""
        self._record(
            'error_received',
            error=data['error'],
            **{'from': data['source']},
        )
Key: All data just flows in and gets appended. No decisions about waiting.
# When you push intent:
dht.push({'want': 'chunk_x'})
# Responses that might flow in:
@stream_handler
def process_response(data):
    """Append one series entry for each recognised response type.

    Handles ``chunk``, ``status_no`` and ``status_yes_but_later``; any other
    ``type`` appends nothing. Optional fields are read with ``.get`` so an
    incomplete response is still recorded (with ``None`` for what is missing)
    instead of raising ``KeyError``.

    NOTE(review): ``series`` and ``stream_handler`` come from the enclosing
    module; neither is defined in this block.
    """
    kind = data.get('type')
    if kind == 'chunk':
        # Got the data
        series.append({
            'event': 'chunk',
            'data': data['content'],
        })
    elif kind == 'status_no':
        # "NO" with valuable info
        # This is DATA, not rejection!
        # Tells you: node has it, just temporarily busy
        series.append({
            'event': 'status_no',
            'reason': data.get('reason'),  # "rate_limited", "busy", "low_space"
            'node': data.get('from'),
            'has_data': data.get('has_it'),  # YES, they have it!
            'available_in': data.get('retry_after'),  # When to retry
        })
    elif kind == 'status_yes_but_later':
        # "YES but not now"
        series.append({
            'event': 'delayed_yes',
            'eta': data.get('eta'),
            'node': data.get('from'),
        })
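“NO” responses are information: a refusal still reports which node holds the data, why it is unavailable right now, and when to retry.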
class NoBlockingNode:
    """Demonstrates no blocking anywhere.

    NOTE(review): ``dht`` and ``time`` must be in scope at module level;
    neither is defined in this block.
    """

    def __init__(self, address=None):
        """Create the node with an empty series.

        Args:
            address: Value sent as ``from`` in pushed intents.
        """
        self.address = address
        self.series = []

    def get_chunk(self, chunk_id):
        """Push a ``want`` intent for ``chunk_id`` and log it; returns ``None``.

        The chunk itself is never returned from here; use
        ``check_if_arrived`` or ``derive_chunks`` to read it from the series.
        """
        # Push intent
        dht.push({
            'want': chunk_id,
            'from': self.address,
        })
        # Append push event
        self.series.append({
            't': time.time(),
            'event': 'requested',
            'chunk': chunk_id,
        })
        # Fall through: return IMMEDIATELY, no waiting

    def check_if_arrived(self, chunk_id):
        """Return the most recent data recorded for ``chunk_id``, else ``None``.

        Scans ``self.series`` newest-first for a ``chunk_received`` entry
        whose ``chunk_id`` matches.
        """
        for entry in reversed(self.series):
            if (entry.get('chunk_id') == chunk_id
                    and entry.get('event') == 'chunk_received'):
                return entry['data']
        return None  # Not yet

    def derive_chunks(self):
        """Return ``{chunk_id: data}`` built from all ``chunk_received`` entries.

        When a chunk id occurs more than once, the later entry wins.
        """
        return {
            entry['chunk_id']: entry['data']
            for entry in self.series
            if entry.get('event') == 'chunk_received'
        }
Usage:
# Push multiple intents (all immediate)
node.get_chunk('chunk_1')
node.get_chunk('chunk_2')
node.get_chunk('chunk_3')
# Do other work
node.process_local_data()
node.run_computations()
# Later, check what arrived
chunks = node.derive_chunks()
# Process whatever came in
class CommunicationPattern:
    """
    DHT: Broadcast intents only
    P2P: Direct responses

    NOTE(review): ``dht``, ``connect_to`` and ``time`` come from the enclosing
    module. ``_should_respond`` and ``_prepare_response`` are called but not
    defined in this class as shown.
    """

    def __init__(self, address=None):
        """Create the pattern with an empty series.

        Args:
            address: Return address included in every broadcast intent.
        """
        self.address = address
        self.series = []

    def push_intent(self, intent):
        """Intent goes through DHT (broadcast); returns ``None``."""
        # DHT broadcasts intent to all relevant nodes
        dht.broadcast({
            'from': self.address,  # Include return address!
            'intent': intent,
        })
        # Fall through: return immediately

    def _handle_incoming_intent(self, intent):
        """Received intent from DHT; reply to ``intent['from']`` if warranted."""
        # Decide how to respond
        if not self._should_respond(intent):
            return
        # Respond DIRECTLY via P2P (not through DHT!)
        self._send_p2p_response(
            to=intent['from'],  # Direct to requester
            data=self._prepare_response(intent),
        )

    def _send_p2p_response(self, to, data):
        """Send ``data`` to ``to`` over a connection from ``connect_to``.

        The connection is closed even when ``send`` raises, so a failed send
        does not leak it. No acknowledgement is read back.
        """
        # Direct connection to requester
        connection = connect_to(to)
        try:
            connection.send(data)
        finally:
            connection.close()
        # Fire & forget (no waiting for ACK)

    def _handle_incoming_p2p(self, data):
        """Received P2P response; append it to ``self.series``."""
        self.series.append({
            't': time.time(),
            'event': 'p2p_response',
            'data': data,
        })
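Key distinction: intents are broadcast through the DHT, while responses travel directly peer-to-peer to the requester.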
Why this design?
Intent broadcast:
Node A → DHT → [N1, N2, N3, ...]
Efficient: one message reaches many
Responses:
N1 → Node A (direct P2P)
N2 → Node A (direct P2P)
Efficient: no DHT overhead for responses
def derive_from_series(series):
    """Derive current state from series.

    Args:
        series: Iterable of recorded entries. Entries that are not dicts, or
            whose ``event`` is missing or unrecognised, are ignored, so raw
            payloads appended to the series do not break the derivation.

    Returns:
        A dict with three keys:
        ``chunks``  -- ``{chunk_id: data}`` from ``chunk_received`` entries;
        ``pending`` -- set of requested chunk ids not yet received;
        ``status``  -- ``{chunk_id: {'has_it', 'reason', 'retry_after'}}``
        from ``status_no`` entries (keyed by ``None`` when the entry carries
        no ``chunk_id``).
    """
    state = {
        'chunks': {},
        'pending': set(),
        'status': {},
    }
    for entry in series:
        event = entry.get('event') if isinstance(entry, dict) else None
        if event == 'requested':
            # We pushed a request
            state['pending'].add(entry['chunk'])
        elif event == 'chunk_received':
            # Data arrived
            chunk_id = entry['chunk_id']
            state['chunks'][chunk_id] = entry['data']
            state['pending'].discard(chunk_id)
        elif event == 'status_no':
            # "NO" with info. Producers may record the retry time under
            # either 'retry_after' or 'available_in'; accept both.
            retry_after = entry.get('retry_after')
            if retry_after is None:
                retry_after = entry.get('available_in')
            state['status'][entry.get('chunk_id')] = {
                'has_it': entry.get('has_data'),
                'reason': entry.get('reason'),
                'retry_after': retry_after,
            }
    return state
Derive anytime, no waiting:
# Push some intents
node.push_intent({'want': 'chunk_1'})
node.push_intent({'want': 'chunk_2'})
# Immediately derive (even though responses haven't arrived yet)
state = node.derive_state()
# state['pending'] = {'chunk_1', 'chunk_2'}
# state['chunks'] = {}
# Later, after some data flows in
state = node.derive_state()
# state['pending'] = {'chunk_2'}
# state['chunks'] = {'chunk_1': <data>}
# state['status'] = {'chunk_2': {'has_it': True, 'reason': 'rate_limited'}}
class FluxExample:
    """End-to-end example: push wants, log what arrives, derive and process.

    NOTE(review): ``dht``, ``AsyncDataStream`` and ``time`` come from the
    enclosing module. ``_process_chunk`` is called but not defined in this
    class as shown.
    """

    def __init__(self, address=None):
        """Create the node and subscribe ``_handle_data`` to a new stream.

        Args:
            address: Value sent as ``from`` in pushed intents.
        """
        self.address = address
        self.series = []
        # Chunk ids already handed to _process_chunk, so the main loop does
        # not process the same chunk again on every pass.
        self._processed = set()
        self.stream = AsyncDataStream()
        self.stream.on_data(self._handle_data)

    def want_multiple_chunks(self, chunk_ids):
        """Request multiple chunks (no waiting).

        For each id, pushes a ``want`` intent and appends a ``pushed_want``
        entry to ``self.series``. Returns ``None``.
        """
        for chunk_id in chunk_ids:
            # Push intent
            dht.push({
                'want': chunk_id,
                'from': self.address,
            })
            # Append to series
            self.series.append({
                't': time.time(),
                'event': 'pushed_want',
                'chunk_id': chunk_id,
            })
        # Return immediately
        # Data will flow in asynchronously

    def _handle_data(self, data):
        """Handle whatever flows in: log it as ``received_<data['type']>``."""
        self.series.append({
            't': time.time(),
            'event': f"received_{data['type']}",
            'data': data,
        })

    def process_continuously(self):
        """Main loop: run one processing pass every 10 ms, forever."""
        while True:
            self._process_once()
            # Sleep briefly (not blocking on I/O!)
            time.sleep(0.01)

    def _process_once(self):
        """Process newly arrived chunks and re-request overdue pending ones."""
        # Derive current state
        state = self._derive_state()
        # Process whatever we have, but each chunk only once
        for chunk_id, chunk_data in state['chunks'].items():
            if chunk_id in self._processed:
                continue
            self._process_chunk(chunk_data)
            self._processed.add(chunk_id)
        # Check status of pending
        now = time.time()
        for chunk_id in state['pending']:
            status = state['status'].get(chunk_id)
            retry_after = status.get('retry_after') if status else None
            # A status without 'retry_after' gives no retry time; skip it.
            # NOTE(review): the status stays in the series after a retry, so
            # an overdue chunk is re-requested on every pass until it arrives.
            if retry_after is not None and retry_after < now:
                self.want_multiple_chunks([chunk_id])

    def _derive_state(self):
        """Derive from series.

        Returns:
            Dict with ``chunks`` (``{chunk_id: content}``), ``pending`` (set
            of wanted ids not yet received) and ``status`` (``{chunk_id:
            latest status payload}``).
        """
        state = {
            'chunks': {},
            'pending': set(),
            'status': {},
        }
        for entry in self.series:
            event = entry['event']
            if event == 'pushed_want':
                state['pending'].add(entry['chunk_id'])
            elif event == 'received_chunk':
                payload = entry['data']
                chunk_id = payload['chunk_id']
                state['chunks'][chunk_id] = payload['content']
                state['pending'].discard(chunk_id)
            elif event == 'received_status':
                payload = entry['data']
                state['status'][payload['chunk_id']] = payload
        return state
Result:
Request-Response (blocking):
# Blocks until response
chunk = node.request('chunk_1') # WAITS
process(chunk)
chunk = node.request('chunk_2') # WAITS
process(chunk)
# Sequential, slow
Pure Flux (non-blocking):
# Push all (immediate)
node.push_intent({'want': 'chunk_1'})
node.push_intent({'want': 'chunk_2'})
node.push_intent({'want': 'chunk_3'})
# Process as data flows in
handled = set()  # chunk ids already processed, so each is handled once
while True:
    state = node.derive_state()
    for chunk_id, data in state['chunks'].items():
        if chunk_id not in handled:
            process(data)
            handled.add(chunk_id)
    time.sleep(0.01)
# Parallel, fast
1. No blocking:
# Can push thousands of intents
for i in range(10000):
    dht.push({'want': f'chunk_{i}'})
# All push immediately
# Data flows in as available
2. Parallel by default:
# All requests "in flight" simultaneously
# Responses arrive in any order
# Process as they come
3. Resilient to failures:
# If a node doesn't respond, no problem
# Other nodes will
# No timeout issues
4. Natural backpressure:
# If you push too fast, series grows
# Can monitor series size
# Self-regulating
5. Simple mental model:
# Push (immediate)
# Append (as data arrives)
# Derive (from series)
#
# That's it!
class PracticalFlux:
    """Practical flux implementation.

    Adds a cap on outstanding chunk requests and a retry for requests older
    than 60 seconds. Pending requests are derived from ``self.series``.

    NOTE(review): ``dht`` and ``time`` must be in scope at module level;
    neither is defined in this block.
    """

    def __init__(self):
        self.series = []
        self.max_pending = 1000  # Backpressure limit

    def push_with_backpressure(self, intent):
        """Push ``intent``, pausing once for 0.1 s if too much is pending.

        The pause happens at most once per call; the method does not loop
        until the backlog drains. The push is logged as a ``pushed`` entry.
        """
        if self._count_pending() > self.max_pending:
            # Too many pending, wait a bit
            time.sleep(0.1)
        dht.push(intent)
        self.series.append({
            't': time.time(),
            'event': 'pushed',
            'intent': intent,
        })

    @staticmethod
    def _requested_chunk(entry):
        """Return the chunk id an entry requested, or ``None`` if none.

        Recognises ``pushed`` entries (as written by
        ``push_with_backpressure``, id under ``intent['want']``) and
        ``pushed_want`` entries (id under ``chunk_id``).
        """
        event = entry.get('event')
        if event == 'pushed_want':
            return entry.get('chunk_id')
        if event == 'pushed':
            intent = entry.get('intent')
            if isinstance(intent, dict):
                return intent.get('want')
        return None

    @staticmethod
    def _received_chunk(entry):
        """Return the chunk id of a ``received_chunk`` entry, else ``None``.

        The id is read from ``entry['chunk_id']`` or, failing that, from
        ``entry['data']['chunk_id']``.
        """
        if entry.get('event') != 'received_chunk':
            return None
        chunk_id = entry.get('chunk_id')
        payload = entry.get('data')
        if chunk_id is None and isinstance(payload, dict):
            chunk_id = payload.get('chunk_id')
        return chunk_id

    def _derive_state(self):
        """Return ``{'pending': set}`` of requested-but-not-received ids."""
        requested = set()
        completed = set()
        for entry in self.series:
            wanted = self._requested_chunk(entry)
            if wanted is not None:
                requested.add(wanted)
                continue
            arrived = self._received_chunk(entry)
            if arrived is not None:
                completed.add(arrived)
        return {'pending': requested - completed}

    def _count_pending(self):
        """Count pending requests from series."""
        return len(self._derive_state()['pending'])

    def _find_first_request(self, chunk_id):
        """Return the earliest series entry requesting ``chunk_id``, or ``None``."""
        for entry in self.series:
            if self._requested_chunk(entry) == chunk_id:
                return entry
        return None

    def retry_pending(self):
        """Retry old pending requests.

        Re-pushes a ``want`` (flagged ``retry``) for every pending chunk whose
        first request is more than 60 seconds old. The retry itself is not
        logged to the series.
        """
        now = time.time()
        for chunk_id in self._derive_state()['pending']:
            # Find when we first requested
            first_request = self._find_first_request(chunk_id)
            if first_request is None:
                continue
            if now - first_request['t'] > 60:  # 60 seconds timeout
                dht.push({
                    'want': chunk_id,
                    'retry': True,
                })
Core principles:
dht.push(intent)  # Returns immediately


@stream_handler
def on_data(data):
    """Append the incoming payload to the shared ``series`` unchanged."""
    series.append(data)


state = derive_from_series(series)
Key insights:
Result: Simple, fast, resilient distributed communication
From Post 874: Intent foundation
From Post 876: iR³ Alpha architecture
This post: Pure flux/push-only paradigm
∞
Links:
Announcement: 2026-02-19
Paradigm: Pure Flux - Push Only, No Waiting, Async Streams
Status: 🌊 Data Flows Like Water
∞