Skip to content

Commit 3b58f99

Browse files
Dom Del Nanocopybaranaut
authored andcommitted
Add mux_frame_type_name UDF to ease developing mux pxl scripts
Summary: This adds a mux_frame_type_name UDF function that will be used in #607. It seems there is prior art for mysql, amqp and other protocols. Closes #609. Test Plan: New unit tests pass Reviewers: zasgar, philkuz Reviewed By: philkuz Signed-off-by: Dom Del Nano <ddelnano@twitter.com> Differential Revision: https://phab.corp.pixielabs.ai/D12340 GitOrigin-RevId: b1b54121dbf80f84fbadd646ffe8cde205286564
1 parent 05df293 commit 3b58f99

4 files changed

Lines changed: 99 additions & 0 deletions

File tree

src/carnot/funcs/protocols/mux.h

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright 2018- The Pixie Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* SPDX-License-Identifier: Apache-2.0
17+
*/
18+
#pragma once
19+
20+
#include <string>
21+
22+
namespace px {
23+
namespace carnot {
24+
namespace funcs {
25+
namespace protocols {
26+
namespace mux {
27+
28+
inline std::string FrameTypeName(int frame_type) {
29+
switch (frame_type) {
30+
case 1:
31+
return "Treq";
32+
case -1:
33+
return "Rreq";
34+
case 2:
35+
return "Tdispatch";
36+
case -2:
37+
return "Rdispatch";
38+
case 64:
39+
return "Tdrain";
40+
case -64:
41+
return "Rdrain";
42+
case 65:
43+
return "Tping";
44+
case -65:
45+
return "Rping";
46+
case 66:
47+
return "Tdiscarded";
48+
case -66:
49+
return "Rdiscarded";
50+
case 67:
51+
return "Tlease";
52+
case 68:
53+
return "Tinit";
54+
case -68:
55+
return "Rinit";
56+
case -128:
57+
return "Rerr";
58+
case 127:
59+
return "Rerr (legacy)";
60+
case -62:
61+
return "Tdiscarded (legacy)";
62+
default:
63+
return absl::Substitute("Unknown ($0)", frame_type);
64+
}
65+
}
66+
67+
} // namespace mux
68+
69+
} // namespace protocols
70+
} // namespace funcs
71+
} // namespace carnot
72+
} // namespace px

src/carnot/funcs/protocols/protocol_ops.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include "src/carnot/funcs/protocols/amqp.h"
2222
#include "src/carnot/funcs/protocols/http.h"
2323
#include "src/carnot/funcs/protocols/kafka.h"
24+
#include "src/carnot/funcs/protocols/mux.h"
2425
#include "src/carnot/funcs/protocols/mysql.h"
2526
#include "src/carnot/funcs/protocols/protocols.h"
2627
#include "src/carnot/udf/registry.h"
@@ -42,6 +43,7 @@ void RegisterProtocolOpsOrDie(px::carnot::udf::Registry* registry) {
4243
registry->RegisterOrDie<MySQLCommandNameUDF>("mysql_command_name");
4344
registry->RegisterOrDie<AMQPFrameTypeUDF>("amqp_frame_type_name");
4445
registry->RegisterOrDie<AMQPMethodTypeUDF>("amqp_method_name");
46+
registry->RegisterOrDie<MuxFrameTypeUDF>("mux_frame_type_name");
4547
}
4648

4749
types::StringValue ProtocolNameUDF::Exec(FunctionContext*, Int64Value protocol) {
@@ -69,6 +71,10 @@ types::StringValue MySQLCommandNameUDF::Exec(FunctionContext*, Int64Value api_ke
6971
return mysql::CommandName(api_key.val);
7072
}
7173

74+
types::StringValue MuxFrameTypeUDF::Exec(FunctionContext*, Int64Value frame_type) {
75+
return mux::FrameTypeName(frame_type.val);
76+
}
77+
7278
} // namespace protocols
7379
} // namespace funcs
7480
} // namespace carnot

src/carnot/funcs/protocols/protocol_ops.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,19 @@ class AMQPMethodTypeUDF : public px::carnot::udf::ScalarUDF {
107107
}
108108
};
109109

110+
class MuxFrameTypeUDF : public px::carnot::udf::ScalarUDF {
111+
public:
112+
StringValue Exec(FunctionContext*, Int64Value frame_type);
113+
114+
static udf::ScalarUDFDocBuilder Doc() {
115+
return udf::ScalarUDFDocBuilder("Convert a Mux frame type to its name.")
116+
.Details("UDF to convert Mux frame type into their corresponding human-readable names.")
117+
.Arg("frame_type", "A Mux frame_type in numeric value")
118+
.Example("df.frame_type_name = px.mux_frame_type_name(df.req_cmd)")
119+
.Returns("The mux frame type name.");
120+
}
121+
};
122+
110123
void RegisterProtocolOpsOrDie(px::carnot::udf::Registry* registry);
111124

112125
} // namespace protocols

src/carnot/funcs/protocols/protocol_ops_test.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,14 @@ TEST(ProtocolOps, AMQPMethodTypeUDF) {
6767
udf_tester.ForInput(60, 40).Expect("BasicPublish");
6868
}
6969

70+
TEST(ProtocolOps, MuxFrameTypeUDF) {
71+
auto udf_tester = udf::UDFTester<MuxFrameTypeUDF>();
72+
udf_tester.ForInput(1).Expect("Treq");
73+
udf_tester.ForInput(-1).Expect("Rreq");
74+
udf_tester.ForInput(68).Expect("Tinit");
75+
udf_tester.ForInput(-68).Expect("Rinit");
76+
}
77+
7078
} // namespace protocols
7179
} // namespace funcs
7280
} // namespace carnot

0 commit comments

Comments
 (0)