-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathaether_client.py
More file actions
216 lines (183 loc) · 9.36 KB
/
aether_client.py
File metadata and controls
216 lines (183 loc) · 9.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
import argparse
import ctypes
import mmap
import os
import platform
import struct
import sys
import time
from typing import Optional
class AetherConsumer:
    """Consumer side of an Aether-IPC shared-memory ring buffer.

    The shared file is memory-mapped read/write and laid out as:

    * offset ``WRITE_CURSOR_OFFSET`` (0): producer write cursor, ``<Q``
    * offset ``READ_CURSOR_OFFSET`` (128): consumer read cursor, ``<Q``
    * offset ``CAPACITY_OFFSET`` (256): ring size in bytes, ``<I``, power of two
    * offset ``DATA_OFFSET`` (384): ring data, ``capacity`` bytes

    Cursors are ever-increasing byte counts; ``cursor & (capacity - 1)`` is the
    position inside the ring. Each record is a 16-byte header
    ``[id: <Q][length: <I][type: <I]`` followed by the payload, where ``length``
    covers header + payload and the record is padded to an 8-byte boundary.
    Records may wrap past the end of the ring.

    The read cursor is cached in this object and written back without any
    locking, so only one consumer should be attached to a file at a time.
    Instances can be used as context managers; ``close()`` runs on exit.
    """

    WRITE_CURSOR_OFFSET = 0
    READ_CURSOR_OFFSET = 128
    CAPACITY_OFFSET = 256
    DATA_OFFSET = 384
    HEADER_BYTES = 16
    ALIGNMENT_BYTES = 8
    # Offset of the u32 length field inside a record header (right after the 8-byte ID).
    _LENGTH_FIELD_OFFSET = 8
    _LENGTH_STRUCT = struct.Struct("<I")
    _CURSOR_STRUCT = struct.Struct("<Q")
    # Record header: [ID (u64), Length (u32), Type (u32)]; ID first keeps it 8-byte aligned.
    _HEADER_STRUCT = struct.Struct("<QII")
    # Memory-order argument for __atomic_thread_fence (GCC's __ATOMIC_SEQ_CST).
    _ATOMIC_SEQ_CST = 5

    def __init__(self, path: str) -> None:
        """Map the ring file at *path* and resume from its stored read cursor.

        Raises:
            OSError: if the file cannot be opened or mapped.
            ValueError: if the file is too small, or the stored capacity is not
                a power of two or cannot hold a single record header.
        """
        self._path = os.path.abspath(path)
        self._file = open(self._path, "r+b", buffering=0)
        try:
            self._mmap = mmap.mmap(self._file.fileno(), 0, access=mmap.ACCESS_WRITE)
        except BaseException:
            # Don't leak the file handle when mapping fails (e.g. empty file).
            self._file.close()
            raise
        try:
            self._load_header()
        except BaseException:
            self.close()
            raise
        # Memory barrier used on ARM (weakly ordered) hosts; a no-op elsewhere.
        self._memory_fence = self._setup_memory_fence()

    def _load_header(self) -> None:
        """Validate the mapped header; set capacity, mask, read cursor and scratch."""
        mapped = len(self._mmap)
        if mapped < self.DATA_OFFSET:
            raise ValueError(
                f"Shared memory file too small for header: {mapped} bytes < {self.DATA_OFFSET}"
            )
        capacity = self._LENGTH_STRUCT.unpack_from(self._mmap, self.CAPACITY_OFFSET)[0]
        if capacity <= 0 or capacity & (capacity - 1):
            raise ValueError(f"Capacity must be power-of-two, found {capacity}")
        # A power of two >= 16 is also a multiple of ALIGNMENT_BYTES, so an aligned
        # record never exceeds the capacity (and therefore fits the scratch buffer).
        if capacity < self.HEADER_BYTES:
            raise ValueError(f"Capacity must be at least {self.HEADER_BYTES} bytes, found {capacity}")
        if mapped < self.DATA_OFFSET + capacity:
            raise ValueError(
                f"Shared memory file too small: {mapped} bytes < "
                f"{self.DATA_OFFSET + capacity} for capacity {capacity}"
            )
        self.capacity = capacity
        self._mask = capacity - 1
        self._read_cursor = self._CURSOR_STRUCT.unpack_from(self._mmap, self.READ_CURSOR_OFFSET)[0]
        # Reassembly buffer for records that wrap the ring end; a record is at most `capacity` bytes.
        self._scratch = bytearray(capacity)

    @staticmethod
    def _setup_memory_fence():
        """Return a zero-argument callable that issues a full memory barrier.

        On x86-64 the loads and stores this consumer relies on are already
        ordered by the hardware, so non-ARM hosts get a no-op. On ARM the C
        library is searched for ``__sync_synchronize`` and then
        ``__atomic_thread_fence``. If neither can be resolved, a warning is
        printed to stderr and a no-op is returned.
        """
        machine = platform.machine().lower()
        # Covers 'arm64' (macOS/Windows), 'aarch64' (Linux) and 32-bit 'arm', 'armv7l', ...
        if not machine.startswith(("arm", "aarch64")):
            return lambda: None
        if sys.platform == "darwin":
            libc_name = "libc.dylib"
        elif sys.platform.startswith("linux"):
            libc_name = "libc.so.6"
        else:
            # e.g. Windows on ARM: no barrier is reachable through ctypes without a custom DLL.
            print(
                "[Aether-IPC] WARNING: Running on ARM without memory barriers! Data consistency is not guaranteed.",
                file=sys.stderr,
            )
            return lambda: None
        try:
            libc = ctypes.CDLL(libc_name)
        except OSError:
            libc = None
        if libc is not None:
            # Look the symbols up by string: inside a class body, the attribute spelling
            # `libc.__sync_synchronize` is name-mangled to `libc._AetherConsumer__sync_synchronize`
            # and could never resolve.
            sync_fence = getattr(libc, "__sync_synchronize", None)
            if sync_fence is not None:
                sync_fence.argtypes = []
                sync_fence.restype = None
                return sync_fence
            atomic_fence = getattr(libc, "__atomic_thread_fence", None)
            if atomic_fence is not None:
                atomic_fence.argtypes = [ctypes.c_int]
                atomic_fence.restype = None
                seq_cst = AetherConsumer._ATOMIC_SEQ_CST
                return lambda: atomic_fence(seq_cst)
        print(
            "[Aether-IPC] WARNING: Failed to load memory barrier function. Data consistency is not guaranteed.",
            file=sys.stderr,
        )
        return lambda: None

    def _memory_barrier(self) -> None:
        """Issue the memory barrier selected at construction time (may be a no-op)."""
        self._memory_fence()

    def close(self) -> None:
        """Unmap the ring and close the file; the file is closed even if unmapping fails."""
        try:
            self._mmap.close()
        finally:
            self._file.close()

    def __enter__(self) -> "AetherConsumer":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    def poll(self) -> Optional[dict]:
        """Consume and return the next complete record, or ``None`` if none is ready.

        Returns:
            ``{"id": int, "type": int, "payload": memoryview}``, or ``None`` when
            the read cursor has caught up with the write cursor or the next
            record is not fully published yet. The payload view is backed by a
            private ``bytes`` copy. It stays valid after the slot is handed back
            to the producer and after :meth:`close`.

        Raises:
            ValueError: if the record's length field is smaller than the header
                or larger than the ring.
        """
        write_cursor = self._CURSOR_STRUCT.unpack_from(self._mmap, self.WRITE_CURSOR_OFFSET)[0]
        # Acquire: on weakly ordered CPUs, the record reads below must not be
        # satisfied before the write-cursor load above.
        self._memory_barrier()
        read_cursor = self._read_cursor
        if read_cursor >= write_cursor:
            return None
        offset_in_ring = read_cursor & self._mask
        absolute = self.DATA_OFFSET + offset_in_ring
        contiguous = self.capacity - offset_in_ring
        content_length = self._read_length(absolute, contiguous)
        if content_length < self.HEADER_BYTES or content_length > self.capacity:
            raise ValueError(f"Corrupted message length: {content_length}")
        align_mask = self.ALIGNMENT_BYTES - 1
        aligned_message_length = (content_length + align_mask) & ~align_mask
        if read_cursor + aligned_message_length > write_cursor:
            return None
        # Scoped views: no buffer export on the mmap outlives this call, so close() never
        # hits BufferError.
        with memoryview(self._mmap) as ring_view:
            if aligned_message_length <= contiguous:
                record = ring_view[absolute:absolute + content_length]
            else:
                record = self._copy_wrapped(ring_view, absolute, contiguous, aligned_message_length)
            with record:
                # The length field was already read above; ignore the duplicate.
                message_id, _, message_type = self._HEADER_STRUCT.unpack_from(record, 0)
                # Copy BEFORE publishing the new read cursor. After that, the producer may
                # overwrite this slot (contiguous case), and the next poll reuses the
                # scratch buffer (wrapped case).
                payload = memoryview(bytes(record[self.HEADER_BYTES:content_length]))
        self._read_cursor = read_cursor + aligned_message_length
        # Release: all record reads above must complete before the producer can
        # observe the freed space through the published read cursor.
        self._memory_barrier()
        self._CURSOR_STRUCT.pack_into(self._mmap, self.READ_CURSOR_OFFSET, self._read_cursor)
        # NOTE: one dict is allocated per message.
        return {"id": message_id, "type": message_type, "payload": payload}

    def _read_length(self, absolute_offset: int, contiguous: int) -> int:
        """Return the record length field, reassembling it if it straddles the ring end."""
        field_end = self._LENGTH_FIELD_OFFSET + self._LENGTH_STRUCT.size
        if contiguous >= field_end:
            return self._LENGTH_STRUCT.unpack_from(self._mmap, absolute_offset + self._LENGTH_FIELD_OFFSET)[0]
        with memoryview(self._mmap) as ring_view:
            prefix = self._copy_wrapped(ring_view, absolute_offset, contiguous, field_end)
            with prefix:
                return self._LENGTH_STRUCT.unpack_from(prefix, self._LENGTH_FIELD_OFFSET)[0]

    def _copy_wrapped(self, buffer_view: memoryview, absolute_offset: int, contiguous: int, total: int) -> memoryview:
        """Copy *total* ring bytes starting at *absolute_offset*, wrapping to the ring start.

        *contiguous* is the number of bytes left before the ring end. The data
        lands in the reusable scratch buffer, so the returned view is only
        valid until the next call.
        """
        scratch_view = memoryview(self._scratch)
        first_chunk = min(contiguous, total)
        scratch_view[:first_chunk] = buffer_view[absolute_offset:absolute_offset + first_chunk]
        remaining = total - first_chunk
        if remaining:
            scratch_view[first_chunk:total] = buffer_view[self.DATA_OFFSET:self.DATA_OFFSET + remaining]
        return scratch_view[:total]
def main() -> None:
    """Attach to the Aether-IPC ring named on the command line and drain it.

    Prints the cumulative receive rate about once per second. Stops after
    ``--duration`` seconds; with the default of 0 it runs until interrupted
    (Ctrl-C).
    """
    parser = argparse.ArgumentParser(description="Python bridge for Aether-IPC")
    parser.add_argument("path", help="Path to the shared memory file")
    parser.add_argument("--duration", type=float, default=0, help="Seconds to run (0 = infinite)")
    args = parser.parse_args()
    consumer = AetherConsumer(args.path)
    print(f"[Python] Attached to {args.path}, capacity={consumer.capacity}")
    start = time.perf_counter()
    last_report = start
    received = 0
    # Idle backoff: busy-spin first (lowest latency), then yield to the scheduler,
    # then sleep briefly so a long-idle consumer does not pin a CPU core forever.
    max_spins = 100
    max_yields = 1000
    idle_sleep_s = 0.0001
    idle_polls = 0
    try:
        while True:
            message = consumer.poll()
            if message is None:
                idle_polls += 1
                if idle_polls <= max_spins:
                    pass  # spin: poll again immediately
                elif idle_polls <= max_spins + max_yields:
                    time.sleep(0)  # yield to scheduler
                else:
                    time.sleep(idle_sleep_s)
            else:
                received += 1
                idle_polls = 0  # back to low-latency spinning after a hit
            # Check the clock on every iteration so rate reports and --duration
            # keep working while no messages are arriving.
            now = time.perf_counter()
            if now - last_report >= 1.0:
                rate = received / (now - start)
                print(f"[Python] Received={received:,} avg_rate={rate:,.0f}/s")
                last_report = now
            if args.duration > 0 and (now - start) >= args.duration:
                break
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop an unbounded run; exit without a traceback.
        pass
    finally:
        consumer.close()
if __name__ == "__main__":
    # Start the consumer loop only when run as a script, never on import.
    main()