-
Notifications
You must be signed in to change notification settings - Fork 12
Expand file tree
/
Copy pathslice_mgmt_app.py
More file actions
361 lines (279 loc) · 15.5 KB
/
slice_mgmt_app.py
File metadata and controls
361 lines (279 loc) · 15.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
"""
Slice Management Application for JRTC
This application integrates with the JRTC framework to manage RAN slice
allocations. It subscribes to slice management indication streams, logs slice
allocation data, and periodically issues slice allocation updates. The app
supports both local logging and optional remote logging to Azure Log Analytics.
Key features:
- All config parameters are set in slice_mgmt_app_params.py.
- Initializes a JRTC application with request and indication streams for slice management.
- Sends an initial GET request to retrieve current slice allocations.
- Processes slice allocation indications, logs details, and stores state.
- Periodically sends SLICE_ALLOC_SET requests to update slice allocations. The periodicity is set by config parameter "slice_update_periodicity_secs".
- For the first SLICE_ALLOC_SET request, the min_prb_policy_ratio/max_prb_policy_ratio/priority fields are set to 0/30/1 and 0/70/1 for slices 0 and 1 respectively. For subsequent SLICE_ALLOC_SET requests, these values are swapped.
Note that 30 and 70 are the slice_prb_percentage_low and slice_prb_percentage_high config parameters.
- If you run an iperf test on one of the slices, you should see performance figures like these:
[ 5] 308.00-309.00 sec 63.5 MBytes 533 Mbits/sec 0.026 ms 24964/75429 (33%)
[ 5] 309.00-310.00 sec 67.2 MBytes 564 Mbits/sec 0.037 ms 22441/75827 (30%)
[ 5] 310.00-311.00 sec 64.3 MBytes 539 Mbits/sec 0.054 ms 24666/75753 (33%)
[ 5] 311.00-312.00 sec 37.2 MBytes 312 Mbits/sec 0.034 ms 45900/75417 (61%) <----- slice max_prb_policy_ratio changes here
[ 5] 312.00-313.00 sec 28.5 MBytes 239 Mbits/sec 0.061 ms 53129/75790 (70%)
[ 5] 313.00-314.00 sec 27.8 MBytes 233 Mbits/sec 0.033 ms 53764/75875 (71%)
[ 5] 314.00-315.00 sec 25.0 MBytes 210 Mbits/sec 0.030 ms 55594/75475 (74%)
[ 5] 315.00-316.00 sec 23.8 MBytes 200 Mbits/sec 0.041 ms 57172/76083 (75%)
[ 5] 316.00-317.00 sec 27.2 MBytes 229 Mbits/sec 0.061 ms 53954/75599 (71%)
[ 5] 317.00-318.00 sec 28.9 MBytes 242 Mbits/sec 0.035 ms 52808/75727 (70%)
"""
import time
import json
import os
import sys
import ctypes
import socket
import threading
import datetime as dt
from dataclasses import dataclass, asdict
from typing import Dict
from enum import Enum
import traceback
# JRTC_APP_PATH must be set in the environment; it is appended to sys.path so
# that the jrtc_* modules imported below can be found.
JRTC_APP_PATH = os.environ.get("JRTC_APP_PATH")
if JRTC_APP_PATH is None:
    # Fail at import time rather than with an obscure ImportError later.
    raise ValueError("JRTC_APP_PATH not set")
sys.path.append(f"{JRTC_APP_PATH}")
from jrtc_router_stream_id import jrtc_router_stream_id_get_device_id
from jrtc_wrapper_utils import get_ctx_from_capsule
import jrtc_app
from jrtc_app import *
import datetime as dt
# always include the logger modules
# sys.modules.get() only returns a module that has already been imported
# (None otherwise) - presumably the JRTC loader pre-loads these; TODO confirm.
logger = sys.modules.get('logger')
from logger import Logger
# always include the params file
# NOTE: if 'slice_mgmt_app_params' has not been loaded, params is None and the
# attribute access below raises AttributeError.
params = sys.modules.get('slice_mgmt_app_params')
# The Log Analytics logger is only imported when remote logging is enabled.
if params.la_enabled:
    la_logger = sys.modules.get('la_logger')
    from la_logger import LaLogger, LaLoggerConfig
# Request/indication structures and message-type constants used by this app.
slice_mgmt = sys.modules.get('slice_mgmt')
from slice_mgmt import slice_mgmt_req, slice_mgmt_ind, slice_t, slice_mgmt_msg_type_SLICE_ALLOC_GET, slice_mgmt_msg_type_SLICE_ALLOC_SET
# Module-level defaults read by app_handler and passed as the first two
# arguments of Logger.log_msg when it emits its per-message output.
rlog_enabled = False
log_enabled = True
# create lock.
# app_handler holds this lock for the whole of each callback so that the
# shared application state is accessed safely.
app_lock = threading.Lock()
##########################################################################
# Define the state variables for the application
@dataclass
class AppStateVars:
    # Mutable state shared between jrtc_start_app and every app_handler call.
    # Logger used for all log_msg / process_timeout calls.
    logger: Logger
    # Handle returned by jrtc_app_create; constructed as None until then.
    app: JrtcApp
    # Device name decoded from the first device-mapping entry.
    device: str
    # True once the initial SLICE_ALLOC_GET request has been sent successfully.
    initial_get_requested: bool
    # True once a SLICE_ALLOC_SET request has been issued; later requests swap
    # the slice 0 / slice 1 values instead of setting the initial ones.
    first_request_sent: bool
    # Time at which the next SLICE_ALLOC_SET is due. None (also the initial
    # value) means no update is currently scheduled.
    next_set_request_ts: dt.datetime
    # Most recently received slice allocation indication; None until the first
    # indication arrives.
    slice_allocation: slice_mgmt_ind
##########################################################################
def _send_slice_mgmt_req(state: AppStateVars, req) -> int:
    """Send *req* to the codelet as raw bytes on the request stream.

    Returns the result of jrtc_app_router_channel_send_input_msg; callers
    treat 0 as success.
    """
    data_len = ctypes.sizeof(req)
    data_to_send = ctypes.string_at(ctypes.byref(req), data_len)
    return jrtc_app_router_channel_send_input_msg(
        state.app, SLICE_MGMT_REQ_SIDX, data_to_send, data_len
    )


def _log_slices(state: AppStateVars, alloc) -> None:
    """Log one line per configured slice of *alloc* (first slice_count entries)."""
    for i, s in enumerate(alloc.slice[:alloc.slice_count]):
        state.logger.log_msg(True, False, "", f"slice_mgmt_app: slice {i} : pci {s.pci} nssai {s.nssai.sst}/{s.nssai.sd} min {s.min_prb_policy_ratio} max {s.max_prb_policy_ratio} priority {s.priority}")


def _send_slice_alloc_set(state: AppStateVars) -> None:
    """Update slices 0 and 1 of the stored allocation and send SLICE_ALLOC_SET.

    On the first request the slices get min/max/priority of
    0/<slice_prb_percentage_low>/1 and 0/<slice_prb_percentage_high>/1; on
    later requests the three values are swapped between the two slices.
    """
    alloc = state.slice_allocation
    # Indexing a ctypes array of structures yields objects that share the
    # array's memory, so these writes update the stored allocation in place.
    s0, s1 = alloc.slice[0], alloc.slice[1]
    if not state.first_request_sent:
        s0.min_prb_policy_ratio = 0
        s0.max_prb_policy_ratio = params.slice_prb_percentage_low
        s0.priority = 1
        s1.min_prb_policy_ratio = 0
        s1.max_prb_policy_ratio = params.slice_prb_percentage_high
        # The original assigned s0.priority a second time here, leaving
        # slice 1's priority untouched.
        s1.priority = 1
    else:
        for attr in ("min_prb_policy_ratio", "max_prb_policy_ratio", "priority"):
            v0, v1 = getattr(s0, attr), getattr(s1, attr)
            setattr(s0, attr, v1)
            setattr(s1, attr, v0)

    state.logger.log_msg(True, False, "", f"slice_mgmt_app: Sending SLICE_ALLOC_SET: sfn {alloc.sfn} slot_index {alloc.slot_index} num custom slices {alloc.slice_count}")
    _log_slices(state, alloc)

    req = slice_mgmt_req()
    req.msg_type = slice_mgmt_msg_type_SLICE_ALLOC_SET
    req.has_set_req = True
    # sfn / slot_index are optional: only populated when configured.
    req.set_req.has_sfn = (params.slice_update_sfn is not None)
    if req.set_req.has_sfn:
        req.set_req.sfn = params.slice_update_sfn
    req.set_req.has_slot_index = (params.slice_update_slot_index is not None)
    if req.set_req.has_slot_index:
        req.set_req.slot_index = params.slice_update_slot_index
    req.set_req.slice = alloc.slice
    req.set_req.slice_count = alloc.slice_count

    # Same success convention as the GET request: 0 means sent.
    if _send_slice_mgmt_req(state, req) == 0:
        state.first_request_sent = True
    else:
        state.logger.log_msg(True, False, "", "slice_mgmt_app: ERROR: failed to send SLICE_ALLOC_SET request")


def _handle_timeout(state: AppStateVars) -> None:
    """Timeout processing: issue the initial GET and any due SET request."""
    state.logger.process_timeout()

    # on first entry, issue a SLICE_ALLOC_GET request (retried on the next
    # timeout if the send fails)
    if not state.initial_get_requested:
        req = slice_mgmt_req()
        req.msg_type = slice_mgmt_msg_type_SLICE_ALLOC_GET
        req.has_set_req = False
        state.logger.log_msg(True, False, "", "slice_mgmt_app: Sending SLICE_ALLOC_GET request")
        if _send_slice_mgmt_req(state, req) == 0:
            state.initial_get_requested = True

    # Has state.next_set_request_ts been reached?
    due = state.next_set_request_ts
    if due is None or dt.datetime.now(dt.timezone.utc) < due:
        return

    if state.slice_allocation is None:
        # Should not happen: the deadline is only set when an indication is stored.
        state.logger.log_msg(True, False, "", "slice_mgmt_app: ERROR: state.slice_allocation is None !!")
    elif state.slice_allocation.slice_count >= 2:
        # only do this if there are >= 2 slices
        _send_slice_alloc_set(state)

    # clear next_set_request_ts. It will be re-set when the next indication is received
    state.next_set_request_ts = None


def _handle_indication(state: AppStateVars, data_entry) -> None:
    """Log and store a slice allocation indication and schedule the next update."""
    state.logger.log_msg(True, False, "", "slice_mgmt_app: Received INDICATION")
    data_ptr = ctypes.cast(data_entry.data, ctypes.POINTER(slice_mgmt_ind))
    # Keep a private copy: the stored allocation is read and modified on a
    # later timeout, long after this callback has returned.
    # NOTE(review): data_entry.data presumably belongs to the router and may
    # not outlive this callback - confirm; copying is safe either way.
    data = slice_mgmt_ind.from_buffer_copy(data_ptr.contents)

    state.logger.log_msg(True, False, "", f"slice_mgmt_app: SLICE_MGMT_IND_SIDX: timestamp {data.timestamp} sfn {data.sfn} slot_index {data.slot_index} num custom slices {data.slice_count}")
    _log_slices(state, data)

    # store the current slice allocation
    state.slice_allocation = data

    # trigger an update in <params.slice_update_periodicity_secs> seconds
    state.next_set_request_ts = dt.datetime.now(dt.timezone.utc) + dt.timedelta(seconds=params.slice_update_periodicity_secs)
    if data.slice_count >= 2:
        state.logger.log_msg(True, False, "", f"slice_mgmt_app: Triggering a slice allocation update in {params.slice_update_periodicity_secs} seconds, "
                             "sfn/slot:"
                             f"{'Any' if (params.slice_update_sfn is None) else params.slice_update_sfn}/"
                             f"{'Any' if (params.slice_update_slot_index is None) else params.slice_update_slot_index}")


def app_handler(timeout: bool, stream_idx: int, data_entry: struct_jrtc_router_data_entry, state: AppStateVars):
    """JRTC callback: handle a timeout tick or a message received on a stream.

    Args:
        timeout: True when called because of a timeout rather than a message.
        stream_idx: index of the stream the message arrived on (ignored on timeout).
        data_entry: router data entry carrying the message (ignored on timeout).
        state: the application state shared across calls.

    Exceptions are caught, printed with a traceback and not propagated to the
    caller. The whole callback runs under app_lock.
    """
    try:
        with app_lock:
            if timeout:
                _handle_timeout(state)
                return

            output = {}

            # Check the stream index and process the data accordingly
            if stream_idx == SLICE_MGMT_IND_SIDX:
                ### Slice allocation indication
                _handle_indication(state, data_entry)
            else:
                state.logger.log_msg(True, False, "", f"slice_mgmt_app: Unknown stream index: {stream_idx}")
                output = {
                    "stream_index": stream_idx,
                    "error": "Unknown stream index"
                }

            # Send the output to the dashboard
            state.logger.log_msg(log_enabled, rlog_enabled, "Slice-Mgmt", f"{json.dumps(output)}")

    except Exception as e:
        # Callback boundary: report and keep the app loop alive.
        print(f"app_handler: error: {e}", flush=True)
        traceback.print_exc()
##########################################################################
# Main function to start the app (converted from jrtc_start_app)
def jrtc_start_app(capsule):
    """Entry point: configure logging and streams, then run the JRTC app.

    Args:
        capsule: opaque capsule from which the app environment context is
            retrieved via get_ctx_from_capsule.

    Raises:
        ValueError: if the environment context cannot be retrieved.

    Blocks in jrtc_app_run until the app exits, then destroys the app.
    """
    # These module-level names are read by app_handler, so they must be
    # rebound as globals here (a plain assignment would only create locals).
    global SLICE_MGMT_REQ_SIDX
    global SLICE_MGMT_IND_SIDX
    global rlog_enabled
    global log_enabled

    env_ctx = get_ctx_from_capsule(capsule)
    if not env_ctx:
        raise ValueError("Failed to retrieve JrtcAppEnv from capsule")

    device_mapping = env_ctx.device_mapping
    device = device_mapping[0].value.decode("utf-8")
    print(f"Starting JRTC Slice Management app for device: {device}", flush=True)

    la_workspace_id = os.environ.get("LA_WORKSPACE_ID", "")
    la_primary_key = os.environ.get("LA_PRIMARY_KEY", "")

    # Truthiness test, matching the guard on the module-level LaLogger import:
    # LaLogger is only defined when params.la_enabled is truthy.
    if (not params.la_enabled) or la_workspace_id == "" or la_primary_key == "":
        print("Log Analytics workspace ID or primary key not set. Using local logger only.", flush=True)
        la_logger = None
    else:
        print("Log Analytics workspace ID and primary key are set. Will do remote logging to Log Analytics.", flush=True)
        # Create the Log Analytics logger
        la_logger = LaLogger(
            LaLoggerConfig(
                "slice_mgmt",  # Log type
                la_workspace_id,
                la_primary_key,
                params.la_msgs_per_batch,
                params.la_bytes_per_batch,
                params.la_tx_timeout_secs,
                params.la_stats_period_secs
            ),
            dbg=False
        )

    stream_id = "slice_mgmt"
    stream_type = "slice_mgmt"
    hostname = os.environ.get("HOSTNAME", "")

    # Initialize the app state; the app handle is filled in after creation.
    state = AppStateVars(
        logger=Logger(device, hostname, stream_id, stream_type, remote_logger=la_logger),
        app=None,
        device=device,
        initial_get_requested=False,
        first_request_sent=False,
        next_set_request_ts=None,
        slice_allocation=None)

    # if LA is configured and initialised, send to LA, and not write to console.
    # else, write to console
    rlog_enabled = (la_logger is not None)
    log_enabled = (not rlog_enabled)

    #####################################################
    ### configure the streams
    streams = []

    # Request stream (app -> codelet); its index is its position in the list.
    SLICE_MGMT_REQ_SIDX = len(streams)
    streams.append(JrtcStreamCfg_t(
        JrtcStreamIdCfg_t(
            JRTC_ROUTER_REQ_DEST_NONE,
            1,
            b"slice_mgmt://jbpf_agent/slice_mgmt/slice_mgmt",
            b"slice_request_map"),
        False,  # is_rx
        None    # No AppChannelCfg
    ))
    state.logger.log_msg(True, False, "", f"slice_mgmt_app: SLICE_MGMT_REQ_SIDX: {SLICE_MGMT_REQ_SIDX}")

    # Indication stream (codelet -> app).
    SLICE_MGMT_IND_SIDX = len(streams)
    streams.append(JrtcStreamCfg_t(
        JrtcStreamIdCfg_t(
            JRTC_ROUTER_REQ_DEST_ANY,
            JRTC_ROUTER_REQ_DEVICE_ID_ANY,
            b"slice_mgmt://jbpf_agent/slice_mgmt/slice_mgmt",
            b"slice_indication_map"),
        True,   # is_rx
        None    # No AppChannelCfg
    ))
    state.logger.log_msg(True, False, "", f"slice_mgmt_app: SLICE_MGMT_IND_SIDX: {SLICE_MGMT_IND_SIDX}")

    app_cfg = JrtcAppCfg_t(
        b"slice_mgmt",                               # context
        100,                                         # q_size
        len(streams),                                # num_streams
        (JrtcStreamCfg_t * len(streams))(*streams),  # streams
        10.0,                                        # initialization_timeout_secs
        0.25,                                        # sleep_timeout_secs
        2.0                                          # inactivity_timeout_secs
    )

    state.app = jrtc_app_create(capsule, app_cfg, app_handler, state)
    state.logger.log_msg(True, True, "", f"slice_mgmt_app: Number of subscribed streams: {len(streams)}")

    try:
        # run the app - This is blocking until the app exits
        jrtc_app_run(state.app)
    finally:
        # clean up app resources, even if the run loop raised
        jrtc_app_destroy(state.app)