|
| 1 | +import argparse |
| 2 | +import ctypes |
| 3 | +import mmap |
| 4 | +import os |
| 5 | +import platform |
| 6 | +import struct |
| 7 | +import sys |
| 8 | +import time |
| 9 | +from typing import Optional |
| 10 | + |
| 11 | + |
| 12 | +class AetherConsumer: |
| 13 | + WRITE_CURSOR_OFFSET = 0 |
| 14 | + READ_CURSOR_OFFSET = 128 |
| 15 | + CAPACITY_OFFSET = 256 |
| 16 | + DATA_OFFSET = 384 |
| 17 | + HEADER_BYTES = 16 |
| 18 | + ALIGNMENT_BYTES = 8 |
| 19 | + |
| 20 | + _LENGTH_STRUCT = struct.Struct("<I") |
| 21 | + _CURSOR_STRUCT = struct.Struct("<Q") |
| 22 | + # Header format: [ID (8B), Length (4B), Type (4B)] - ID at offset 0 for 8-byte alignment |
| 23 | + _HEADER_STRUCT = struct.Struct("<QII") # Q=long (8B), I=int (4B), I=int (4B) |
| 24 | + |
| 25 | + def __init__(self, path: str) -> None: |
| 26 | + self._path = os.path.abspath(path) |
| 27 | + self._file = open(self._path, "r+b", buffering=0) |
| 28 | + self._mmap = mmap.mmap(self._file.fileno(), 0, access=mmap.ACCESS_WRITE) |
| 29 | + |
| 30 | + self.capacity = self._LENGTH_STRUCT.unpack_from(self._mmap, self.CAPACITY_OFFSET)[0] |
| 31 | + if self.capacity <= 0 or self.capacity & (self.capacity - 1): |
| 32 | + raise ValueError(f"Capacity must be power-of-two, found {self.capacity}") |
| 33 | + self._mask = self.capacity - 1 |
| 34 | + self._read_cursor = self._CURSOR_STRUCT.unpack_from(self._mmap, self.READ_CURSOR_OFFSET)[0] |
| 35 | + # Allocate scratch buffer for max possible message size |
| 36 | + # Allocate scratch buffer for max possible message size |
| 37 | + # Max message size can be up to capacity (header + payload) |
| 38 | + self._scratch = bytearray(self.capacity) |
| 39 | + |
| 40 | + # Setup memory barrier for ARM architectures |
| 41 | + self._memory_fence = self._setup_memory_fence() |
| 42 | + |
| 43 | + @staticmethod |
| 44 | + def _setup_memory_fence(): |
| 45 | + """Setup memory barrier function for ARM architectures using ctypes.""" |
| 46 | + is_arm = platform.machine().lower() in ('arm64', 'aarch64', 'arm') |
| 47 | + is_apple = platform.system() == 'Darwin' |
| 48 | + |
| 49 | + if not (is_arm or is_apple): |
| 50 | + # x86_64 has strong memory ordering, no fence needed |
| 51 | + return lambda: None |
| 52 | + |
| 53 | + try: |
| 54 | + # Try to load system C library |
| 55 | + if sys.platform == 'darwin': |
| 56 | + libc = ctypes.CDLL('libc.dylib') |
| 57 | + elif sys.platform.startswith('linux'): |
| 58 | + libc = ctypes.CDLL('libc.so.6') |
| 59 | + else: |
| 60 | + # Windows or unknown |
| 61 | + # On Windows ARM, we might not have easy access to a barrier via ctypes without a custom DLL. |
| 62 | + # Log a warning that we are running without barriers on a weak memory model. |
| 63 | + print("[Aether-IPC] WARNING: Running on ARM without memory barriers! Data consistency is not guaranteed.") |
| 64 | + return lambda: None |
| 65 | + |
| 66 | + # Try to find memory barrier function |
| 67 | + # On ARM: __sync_synchronize or __atomic_thread_fence |
| 68 | + # On Linux: __sync_synchronize |
| 69 | + try: |
| 70 | + fence_func = libc.__sync_synchronize |
| 71 | + fence_func.argtypes = [] |
| 72 | + fence_func.restype = None |
| 73 | + return fence_func |
| 74 | + except AttributeError: |
| 75 | + try: |
| 76 | + # Try atomic_thread_fence (C11) |
| 77 | + fence_func = libc.__atomic_thread_fence |
| 78 | + fence_func.argtypes = [ctypes.c_int] |
| 79 | + fence_func.restype = None |
| 80 | + # __ATOMIC_SEQ_CST = 5 |
| 81 | + return lambda: fence_func(5) |
| 82 | + except AttributeError: |
| 83 | + # Fallback: dummy function |
| 84 | + return lambda: None |
| 85 | + except (OSError, ImportError, AttributeError): |
| 86 | + # Library not found or function not available |
| 87 | + print("[Aether-IPC] WARNING: Failed to load memory barrier function. Data consistency is not guaranteed.") |
| 88 | + return lambda: None |
| 89 | + |
| 90 | + def _memory_barrier(self) -> None: |
| 91 | + """Issue a memory barrier to ensure memory ordering on weak memory models.""" |
| 92 | + self._memory_fence() |
| 93 | + |
| 94 | + def close(self) -> None: |
| 95 | + try: |
| 96 | + self._mmap.close() |
| 97 | + finally: |
| 98 | + self._file.close() |
| 99 | + |
| 100 | + def poll(self) -> Optional[dict]: |
| 101 | + # Memory barrier before reading cursor (ARM safety) |
| 102 | + self._memory_barrier() |
| 103 | + write_cursor = self._CURSOR_STRUCT.unpack_from(self._mmap, self.WRITE_CURSOR_OFFSET)[0] |
| 104 | + if self._read_cursor >= write_cursor: |
| 105 | + return None |
| 106 | + |
| 107 | + offset_in_ring = self._read_cursor & self._mask |
| 108 | + absolute = self.DATA_OFFSET + offset_in_ring |
| 109 | + contiguous = self.capacity - offset_in_ring |
| 110 | + |
| 111 | + content_length = self._read_length(absolute, contiguous) |
| 112 | + if content_length < self.HEADER_BYTES or content_length > self.capacity: |
| 113 | + raise ValueError(f"Corrupted message length: {content_length}") |
| 114 | + |
| 115 | + # Round up to 8-byte alignment: (size + 7) & ~7 |
| 116 | + aligned_message_length = (content_length + self.ALIGNMENT_BYTES - 1) & ~(self.ALIGNMENT_BYTES - 1) |
| 117 | + |
| 118 | + if self._read_cursor + aligned_message_length > write_cursor: |
| 119 | + return None |
| 120 | + |
| 121 | + payload_length = content_length - self.HEADER_BYTES |
| 122 | + |
| 123 | + buffer_view = memoryview(self._mmap) |
| 124 | + if aligned_message_length <= contiguous: |
| 125 | + # Unpack all 3 values: ID (8B), Length (4B), Type (4B) - ignore Length since we already read it |
| 126 | + message_id, _, message_type = self._HEADER_STRUCT.unpack_from(buffer_view, absolute) |
| 127 | + payload_start = absolute + self.HEADER_BYTES |
| 128 | + payload_end = payload_start + payload_length |
| 129 | + payload_view = buffer_view[payload_start:payload_end] |
| 130 | + # For contiguous messages, return memoryview (zero-copy, but caller must not retain after close) |
| 131 | + payload = payload_view |
| 132 | + else: |
| 133 | + message_view = self._copy_wrapped(buffer_view, absolute, contiguous, aligned_message_length) |
| 134 | + # Unpack all 3 values: ID (8B), Length (4B), Type (4B) - ignore Length since we already read it |
| 135 | + message_id, _, message_type = self._HEADER_STRUCT.unpack_from(message_view, 0) |
| 136 | + # CRITICAL: Copy wrapped payload to bytes - scratch buffer gets reused! |
| 137 | + payload_bytes = bytes(message_view[self.HEADER_BYTES:self.HEADER_BYTES + payload_length]) |
| 138 | + payload = memoryview(payload_bytes) |
| 139 | + |
| 140 | + self._read_cursor += aligned_message_length |
| 141 | + # Memory barrier after updating cursor (ARM safety) |
| 142 | + self._memory_barrier() |
| 143 | + self._CURSOR_STRUCT.pack_into(self._mmap, self.READ_CURSOR_OFFSET, self._read_cursor) |
| 144 | + |
| 145 | + # NOTE: For contiguous messages, payload is a memoryview backed by the mmap (do not retain after closing). |
| 146 | + # For wrapped messages, payload is a memoryview of a bytes copy (safe to retain). |
| 147 | + # WARNING: This method allocates a new dict per message. For true zero-GC, use a reusable object pool or C extension. |
| 148 | + return {"id": message_id, "type": message_type, "payload": payload} |
| 149 | + |
| 150 | + def _read_length(self, absolute_offset: int, contiguous: int) -> int: |
| 151 | + # Length is at offset 8 (after 8-byte ID) |
| 152 | + length_offset = absolute_offset + 8 |
| 153 | + if contiguous >= 8 + self._LENGTH_STRUCT.size: |
| 154 | + return self._LENGTH_STRUCT.unpack_from(self._mmap, length_offset)[0] |
| 155 | + temp = self._copy_wrapped(memoryview(self._mmap), absolute_offset, contiguous, 8 + self._LENGTH_STRUCT.size) |
| 156 | + return self._LENGTH_STRUCT.unpack_from(temp, 8)[0] |
| 157 | + |
| 158 | + def _copy_wrapped(self, buffer_view: memoryview, absolute_offset: int, contiguous: int, total: int) -> memoryview: |
| 159 | + scratch_view = memoryview(self._scratch) |
| 160 | + first_chunk = min(contiguous, total) |
| 161 | + scratch_view[:first_chunk] = buffer_view[absolute_offset:absolute_offset + first_chunk] |
| 162 | + remaining = total - first_chunk |
| 163 | + if remaining: |
| 164 | + scratch_view[first_chunk:first_chunk + remaining] = buffer_view[self.DATA_OFFSET:self.DATA_OFFSET + remaining] |
| 165 | + return scratch_view[:total] |
| 166 | + |
| 167 | + |
| 168 | +def main() -> None: |
| 169 | + parser = argparse.ArgumentParser(description="Python bridge for Aether-IPC") |
| 170 | + parser.add_argument("path", help="Path to the shared memory file") |
| 171 | + parser.add_argument("--duration", type=float, default=0, help="Seconds to run (0 = infinite)") |
| 172 | + args = parser.parse_args() |
| 173 | + |
| 174 | + consumer = AetherConsumer(args.path) |
| 175 | + print(f"[Python] Attached to {args.path}, capacity={consumer.capacity}") |
| 176 | + |
| 177 | + start = time.perf_counter() |
| 178 | + last_report = start |
| 179 | + received = 0 |
| 180 | + |
| 181 | + # Smart backoff strategy: spin then yield then sleep |
| 182 | + spin_count = 0 |
| 183 | + max_spins = 100 |
| 184 | + try: |
| 185 | + while True: |
| 186 | + message = consumer.poll() |
| 187 | + if message is None: |
| 188 | + # Smart backoff: spin first (low latency), then yield, then sleep |
| 189 | + if spin_count < max_spins: |
| 190 | + spin_count += 1 |
| 191 | + # Busy-spin for low latency (Python GIL limits effectiveness but still helps) |
| 192 | + for _ in range(10): |
| 193 | + pass |
| 194 | + else: |
| 195 | + # After max spins, yield to scheduler |
| 196 | + time.sleep(0) # Yield to scheduler |
| 197 | + spin_count = 0 # Reset for next cycle |
| 198 | + else: |
| 199 | + received += 1 |
| 200 | + spin_count = 0 # Reset on successful poll |
| 201 | + |
| 202 | + now = time.perf_counter() |
| 203 | + if now - last_report >= 1.0: |
| 204 | + rate = received / (now - start) |
| 205 | + print(f"[Python] Received={received:,} avg_rate={rate:,.0f}/s") |
| 206 | + last_report = now |
| 207 | + |
| 208 | + if args.duration > 0 and (now - start) >= args.duration: |
| 209 | + break |
| 210 | + finally: |
| 211 | + consumer.close() |
| 212 | + |
| 213 | + |
| 214 | +if __name__ == "__main__": |
| 215 | + main() |
| 216 | + |
0 commit comments