Post 877: Pure Flux - Push-Only Communication Paradigm

Never Wait, Just Push and Process Streams

From Post 874: Intent-based foundation

Key insight: PUSH intent (fire & forget), DATA flows in asynchronously

Result: Pure reactive streams - no blocking anywhere


Part 1: The Core Insight

Push, Don’t Request

Traditional (❌ wrong):

# Request and WAIT for response
response = node.request(data)  # BLOCKS
if response.success:
    process(response.data)

Pure Flux (✅ correct):

# Push (fire & forget)
dht.push({'want': 'chunk_x'})
series.append({'pushed': 'chunk_x', 't': now()})

# Data flows in separately (async)
@stream_handler
def on_data_arrives(data):
    series.append(data)  # Just append whatever comes

# Derive state anytime (no waiting)
state = derive_from_series(series)

No waiting. No blocking. Pure flux.


Part 2: How It Works

Push Intent → Data Flows In

class PureFluxNode:
    def __init__(self):
        # Local series (append-only)
        self.series = []
        
        # Incoming data stream
        self.stream = AsyncDataStream()
        
        # Hook up stream handler
        self.stream.on_data(self._handle_incoming)
    
    def push_intent(self, intent):
        """Push intent (no waiting!)"""
        # 1. Push to DHT (fire & forget)
        dht.push(intent)
        
        # 2. Append to series
        self.series.append({
            't': time.time(),
            'event': 'pushed_intent',
            'intent': intent
        })
        
        # 3. Return immediately (NO WAITING)
        return
    
    def _handle_incoming(self, data):
        """Async handler for incoming data"""
        # Whatever comes in, just append
        self.series.append({
            't': time.time(),
            'event': 'data_received',
            'data': data,
            'from': data.get('source')
        })
    
    def derive_state(self):
        """Derive from series anytime"""
        # No waiting needed
        return self._reduce(self.series)

Flow:

1. Push intent → DHT (immediate return)
2. Data flows in → Stream handler appends
3. Derive state → From series (no waiting)
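
A minimal usage sketch of PureFluxNode, assuming the dht object and AsyncDataStream referenced in the class above are wired up elsewhere:

# Hypothetical usage of PureFluxNode
node = PureFluxNode()

# 1. Push intents (each call returns immediately)
node.push_intent({'want': 'chunk_x'})
node.push_intent({'want': 'chunk_y'})

# 2. Incoming data is appended by _handle_incoming as it arrives (async)

# 3. Derive state whenever convenient, no waiting
state = node.derive_state()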

Part 3: Data Flows In Asynchronously

Event Stream Processing

class StreamProcessor:
    """Process incoming data stream"""
    
    def __init__(self):
        self.series = []
        
        # Register handlers for different data types
        self.handlers = {
            'chunk': self._on_chunk,
            'status': self._on_status,
            'error': self._on_error
        }
    
    def _on_chunk(self, data):
        """Chunk data arrived"""
        self.series.append({
            't': time.time(),
            'event': 'chunk_received',
            'chunk_id': data['id'],
            'data': data['content'],
            'from': data['source']
        })
    
    def _on_status(self, data):
        """Status update arrived (including NO)"""
        self.series.append({
            't': time.time(),
            'event': 'status_received',
            'status': data['status'],  # Could be "busy", "rate_limited", etc
            'reason': data.get('reason'),
            'from': data['source']
        })
        
        # Even "NO" contains useful data!
        # → "I have it but rate limited" = valuable info
    
    def _on_error(self, data):
        """Error info arrived"""
        self.series.append({
            't': time.time(),
            'event': 'error_received',
            'error': data['error'],
            'from': data['source']
        })

Key: All data just flows in and gets appended. No decisions about waiting.
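
The class above registers handlers but the dispatch step isn't shown. A minimal sketch of a dispatch method on StreamProcessor, assuming each incoming message carries a 'type' field matching the handler keys:

    def handle(self, data):
        """Dispatch incoming data to the matching handler (hypothetical method)"""
        handler = self.handlers.get(data.get('type'))
        if handler:
            handler(data)
        else:
            # Unknown type: still append it, nothing gets dropped
            self.series.append({
                't': time.time(),
                'event': 'unknown_received',
                'data': data
            })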


Part 4: Even “NO” Contains Data

Information in Every Response

# When you push intent:
dht.push({'want': 'chunk_x'})

# Responses that might flow in:
@stream_handler
def process_response(data):
    if data['type'] == 'chunk':
        # Got the data
        series.append({
            'event': 'chunk',
            'data': data['content']
        })
    
    elif data['type'] == 'status_no':
        # "NO" with valuable info
        series.append({
            'event': 'status_no',
            'reason': data['reason'],  # "rate_limited", "busy", "low_space"
            'node': data['from'],
            'has_data': data['has_it'],  # YES, they have it!
            'available_in': data['retry_after']  # When to retry
        })
        
        # This is DATA, not rejection!
        # Tells you: node has it, just temporarily busy
    
    elif data['type'] == 'status_yes_but_later':
        # "YES but not now"
        series.append({
            'event': 'delayed_yes',
            'eta': data['eta'],
            'node': data['from']
        })

“NO” responses are information:

  • Node has the data (good to know!)
  • Node is rate limited (explains why)
  • Retry after X seconds (actionable)
  • Network congestion indicator (system health)
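
One way to act on that information is to fold the "NO" entries into a retry schedule. A sketch, assuming each status_no entry also carries the chunk_id it refers to and a timestamp 't', like the other series entries:

def derive_retry_schedule(series):
    """Map chunk_id -> earliest sensible retry time, from 'NO' entries (sketch)"""
    schedule = {}
    for entry in series:
        if entry.get('event') == 'status_no' and entry.get('has_data'):
            # Node has the chunk but is temporarily unable to serve it
            chunk_id = entry.get('chunk_id')
            schedule[chunk_id] = entry.get('t', 0) + entry.get('available_in', 0)
    return schedule

# Later, re-push intents whose retry time has passed:
# for chunk_id, retry_at in derive_retry_schedule(series).items():
#     if time.time() >= retry_at:
#         dht.push({'want': chunk_id, 'retry': True})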

Part 5: No Blocking Anywhere

Pure Asynchronous Flow

class NoBlockingNode:
    """Demonstrates no blocking anywhere"""
    
    def get_chunk(self, chunk_id):
        """Get chunk (no blocking!)"""
        # Push intent
        dht.push({
            'want': chunk_id,
            'from': self.address
        })
        
        # Append push event
        self.series.append({
            't': time.time(),
            'event': 'requested',
            'chunk': chunk_id
        })
        
        # Return IMMEDIATELY
        # No waiting!
        return
    
    def check_if_arrived(self, chunk_id):
        """Check series to see if it arrived"""
        # Scan series newest-first for the most recent matching entry
        for entry in reversed(self.series):
            if entry.get('chunk_id') == chunk_id:
                if entry['event'] == 'chunk_received':
                    return entry['data']
        
        return None  # Not yet
    
    def derive_chunks(self):
        """Derive all chunks from series"""
        chunks = {}
        for entry in self.series:
            if entry['event'] == 'chunk_received':
                chunks[entry['chunk_id']] = entry['data']
        return chunks

Usage:

# Push multiple intents (all immediate)
node.get_chunk('chunk_1')
node.get_chunk('chunk_2')
node.get_chunk('chunk_3')

# Do other work
node.process_local_data()
node.run_computations()

# Later, check what arrived
chunks = node.derive_chunks()
# Process whatever came in

Part 6: DHT vs P2P Communication

Intents via DHT, Responses via P2P

class CommunicationPattern:
    """
    DHT: Broadcast intents only
    P2P: Direct responses
    """
    
    def push_intent(self, intent):
        """Intent goes through DHT (broadcast)"""
        # DHT broadcasts intent to all relevant nodes
        dht.broadcast({
            'from': self.address,  # Include return address!
            'intent': intent
        })
        
        # Return immediately
        return
    
    def _handle_incoming_intent(self, intent):
        """Received intent from DHT"""
        # Decide how to respond
        if self._should_respond(intent):
            # Respond DIRECTLY via P2P (not through DHT!)
            self._send_p2p_response(
                to=intent['from'],  # Direct to requester
                data=self._prepare_response(intent)
            )
    
    def _send_p2p_response(self, to, data):
        """Send response directly P2P"""
        # Direct connection to requester
        socket = connect_to(to)
        socket.send(data)
        socket.close()
        
        # Fire & forget (no waiting for ACK)
    
    def _handle_incoming_p2p(self, data):
        """Received P2P response"""
        # Append to series
        self.series.append({
            't': time.time(),
            'event': 'p2p_response',
            'data': data
        })

Key distinction:

  • DHT: Intent broadcast only (one-to-many)
  • P2P: Direct responses (one-to-one)
  • Both are push-only (no blocking)

Why this design?

Intent broadcast:
  Node A → DHT → [N1, N2, N3, ...]
  Efficient: one message reaches many

Responses:
  N1 → Node A (direct P2P)
  N2 → Node A (direct P2P)
  Efficient: no DHT overhead for responses

Part 7: Series Derivation

State from Event Log

def derive_from_series(series):
    """Derive current state from series"""
    state = {
        'chunks': {},
        'pending': set(),
        'status': {}
    }
    
    for entry in series:
        if entry['event'] == 'requested':
            # We pushed a request
            state['pending'].add(entry['chunk'])
        
        elif entry['event'] == 'chunk_received':
            # Data arrived
            chunk_id = entry['chunk_id']
            state['chunks'][chunk_id] = entry['data']
            state['pending'].discard(chunk_id)
        
        elif entry['event'] == 'status_no':
            # "NO" with info
            chunk_id = entry.get('chunk_id')
            state['status'][chunk_id] = {
                'has_it': entry['has_data'],
                'reason': entry['reason'],
                'retry_after': entry.get('retry_after')
            }
    
    return state

Derive anytime, no waiting:

# Push some intents
node.get_chunk('chunk_1')   # From NoBlockingNode (appends a 'requested' entry)
node.get_chunk('chunk_2')

# Immediately derive (even though responses haven't arrived yet)
state = derive_from_series(node.series)
# state['pending'] = {'chunk_1', 'chunk_2'}
# state['chunks'] = {}

# Later, after some data flows in
state = derive_from_series(node.series)
# state['pending'] = {'chunk_2'}
# state['chunks'] = {'chunk_1': <data>}
# state['status'] = {'chunk_2': {'has_it': True, 'reason': 'rate_limited'}}

Part 8: Complete Example

End-to-End Flux Flow

class FluxExample:
    def __init__(self):
        self.series = []
        self.stream = AsyncDataStream()
        self.stream.on_data(self._handle_data)
    
    def want_multiple_chunks(self, chunk_ids):
        """Request multiple chunks (no waiting)"""
        for chunk_id in chunk_ids:
            # Push intent
            dht.push({
                'want': chunk_id,
                'from': self.address
            })
            
            # Append to series
            self.series.append({
                't': time.time(),
                'event': 'pushed_want',
                'chunk_id': chunk_id
            })
        
        # Return immediately
        # Data will flow in asynchronously
    
    def _handle_data(self, data):
        """Handle whatever flows in"""
        # Just append everything
        self.series.append({
            't': time.time(),
            'event': f"received_{data['type']}",
            'data': data
        })
    
    def process_continuously(self):
        """Main loop - no blocking"""
        while True:
            # Derive current state
            state = self._derive_state()
            
            # Process whatever we have
            for chunk_id, chunk_data in state['chunks'].items():
                self._process_chunk(chunk_data)
            
            # Check status of pending
            for chunk_id in state['pending']:
                status = state['status'].get(chunk_id)
                if status and status.get('retry_after', 0) < time.time():
                    # Retry (re-push the intent)
                    self.want_multiple_chunks([chunk_id])
            
            # Sleep briefly (not blocking on I/O!)
            time.sleep(0.01)
    
    def _derive_state(self):
        """Derive from series"""
        state = {
            'chunks': {},
            'pending': set(),
            'status': {}
        }
        
        for entry in self.series:
            if entry['event'] == 'pushed_want':
                state['pending'].add(entry['chunk_id'])
            elif entry['event'] == 'received_chunk':
                chunk_id = entry['data']['chunk_id']
                state['chunks'][chunk_id] = entry['data']['content']
                state['pending'].discard(chunk_id)
            elif entry['event'] == 'received_status':
                chunk_id = entry['data']['chunk_id']
                state['status'][chunk_id] = entry['data']
        
        return state

Result:

  • Push all intents immediately
  • Data flows in asynchronously
  • Continuously process whatever arrived
  • No blocking anywhere
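
A possible way to run it, assuming responses are delivered on a background thread by AsyncDataStream (FluxExample and dht are the sketches above):

import threading

# Hypothetical usage of FluxExample
flux = FluxExample()

# Push all intents up front (returns immediately)
flux.want_multiple_chunks(['chunk_1', 'chunk_2', 'chunk_3'])

# Run the processing loop; _handle_data appends responses as they arrive
threading.Thread(target=flux.process_continuously, daemon=True).start()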

Part 9: Comparison

Request-Response vs Pure Flux

Request-Response (blocking):

# Blocks until response
chunk = node.request('chunk_1')  # WAITS
process(chunk)

chunk = node.request('chunk_2')  # WAITS
process(chunk)

# Sequential, slow

Pure Flux (non-blocking):

# Push all (immediate)
node.push_intent({'want': 'chunk_1'})
node.push_intent({'want': 'chunk_2'})
node.push_intent({'want': 'chunk_3'})

# Process as data flows in
while True:
    state = node.derive_state()
    for chunk_id, data in state['chunks'].items():
        process(data)
    time.sleep(0.01)

# Parallel, fast

Part 10: Benefits

Why Pure Flux?

1. No blocking:

# Can push thousands of intents
for i in range(10000):
    dht.push({'want': f'chunk_{i}'})
    
# All pushes return immediately
# Data flows in as available

2. Parallel by default:

# All requests "in flight" simultaneously
# Responses arrive in any order
# Process as they come

3. Resilient to failures:

# If a node doesn't respond, no problem
# Other nodes will
# No timeout issues

4. Natural backpressure:

# If you push too fast, series grows
# Can monitor series size
# Self-regulating

5. Simple mental model:

# Push (immediate)
# Append (as data arrives)
# Derive (from series)
# 
# That's it!

Part 11: Implementation Notes

Practical Considerations

class PracticalFlux:
    """Practical flux implementation"""
    
    def __init__(self):
        self.series = []
        self.max_pending = 1000  # Backpressure limit
    
    def push_with_backpressure(self, intent):
        """Push with backpressure check"""
        # Check pending count
        pending = self._count_pending()
        
        if pending > self.max_pending:
            # Too many pending, wait a bit
            time.sleep(0.1)
        
        # Push
        dht.push(intent)
        self.series.append({
            't': time.time(),
            'event': 'pushed_want',          # Matches what _count_pending scans for
            'chunk_id': intent.get('want'),
            'intent': intent
        })
    
    def _count_pending(self):
        """Count pending requests from series"""
        pending = set()
        completed = set()
        
        for entry in self.series:
            if entry['event'] == 'pushed_want':
                pending.add(entry['chunk_id'])
            elif entry['event'] == 'received_chunk':
                completed.add(entry['chunk_id'])
        
        return len(pending - completed)
    
    def retry_pending(self):
        """Retry old pending requests"""
        state = self._derive_state()
        now = time.time()
        
        for chunk_id in state['pending']:
            # Find when we first requested
            first_request = self._find_first_request(chunk_id)
            
            if now - first_request['t'] > 60:  # 60 seconds timeout
                # Retry
                dht.push({
                    'want': chunk_id,
                    'retry': True
                })
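
retry_pending calls two helpers that aren't shown above. Under the same series conventions, they might look like this (a hypothetical sketch, not part of the original class):

    def _derive_state(self):
        """Pending = chunks with a pushed_want entry but no received_chunk yet"""
        wanted, received = set(), set()
        for entry in self.series:
            if entry['event'] == 'pushed_want':
                wanted.add(entry['chunk_id'])
            elif entry['event'] == 'received_chunk':
                received.add(entry['chunk_id'])
        return {'pending': wanted - received}
    
    def _find_first_request(self, chunk_id):
        """Earliest series entry that requested this chunk"""
        for entry in self.series:
            if entry['event'] == 'pushed_want' and entry.get('chunk_id') == chunk_id:
                return entry
        return None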

Part 12: Summary

Pure Flux Paradigm

Core principles:

  1. Push (never wait):

dht.push(intent)  # Returns immediately

  2. Append (as data flows in):

@stream_handler
def on_data(data):
    series.append(data)

  3. Derive (from series):

state = derive_from_series(series)

Key insights:

  • No blocking anywhere
  • Data flows in asynchronously
  • Even “NO” contains valuable data
  • Derive state anytime from series
  • Pure reactive streams
  • Like UDP broadcast, not TCP request-response

Result: Simple, fast, resilient distributed communication

From Post 874: Intent foundation

From Post 876: iR³ Alpha architecture

This post: Pure flux/push-only paradigm

∞


Links:

  • Post 876: iR³ Alpha - Complete architecture
  • Post 875: Money Emission - Application example
  • Post 874: Intent Paradigm - Original paradigm
  • universal-model - Reference implementation

Announcement: 2026-02-19
Paradigm: Pure Flux - Push Only, No Waiting, Async Streams
Status: 🌊 Data Flows Like Water

∞
