Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions crates/openshell-supervisor-network/src/l7/jsonrpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,17 @@ use crate::l7::provider::{L7Provider, L7Request};

pub const DEFAULT_MAX_BODY_BYTES: usize = 64 * 1024;

/// Stable classification reason for a top-level array on an MCP endpoint.
pub(crate) const UNSUPPORTED_MCP_BATCH_REASON: &str = "unsupported_mcp_batch";

/// Preserve stable classifier reasons while contextualizing ordinary JSON-RPC parse failures.
pub(crate) fn jsonrpc_parse_rejection_reason(error: &str) -> String {
if error == UNSUPPORTED_MCP_BATCH_REASON {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to special-case UNSUPPORTED_MCP_BATCH_REASON, but I don't think this is a good pattern. I would prefer it if the jsonrpc_info was the only source of this info.

return error.to_string();
}
format!("JSON-RPC request rejected: {error}")
}

/// Selects whether the parser should treat a JSON-RPC message as generic
/// JSON-RPC 2.0 or as an MCP message with MCP method/params validation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -177,6 +188,17 @@ pub fn parse_jsonrpc_body_with_options(
};

if let serde_json::Value::Array(items) = value {
// Streamable HTTP carries one MCP message per POST. Reject the array
// itself before parsing members so every batch shape fails uniformly.
if inspection_options.mode == JsonRpcInspectionMode::Mcp {
return JsonRpcRequestInfo {
calls: Vec::new(),
is_batch: true,
receive_stream: false,
has_response: false,
error: Some(UNSUPPORTED_MCP_BATCH_REASON.to_string()),
};
}
if items.is_empty() {
return JsonRpcRequestInfo {
calls: Vec::new(),
Expand Down Expand Up @@ -491,6 +513,64 @@ mod tests {
assert_eq!(call.params.len(), 1);
}

#[test]
fn mcp_batch_rejects_every_top_level_array_shape() {
let batches: &[&[u8]] = &[
br"[]",
br#"[{"jsonrpc":"2.0","id":1,"method":"ping"}]"#,
br#"[{"jsonrpc":"2.0","method":"notifications/initialized"}]"#,
br#"[{"jsonrpc":"2.0","id":1,"result":{}}]"#,
br#"[{"jsonrpc":"2.0","id":1,"method":"ping"},{"jsonrpc":"2.0","id":1,"result":{}}]"#,
];

for body in batches {
let info = parse_jsonrpc_body(body, JsonRpcInspectionMode::Mcp);
assert!(info.calls.is_empty(), "batch calls escaped: {info:?}");
assert!(
info.is_batch,
"array was not classified as a batch: {info:?}"
);
assert!(!info.has_response, "batch members were inspected: {info:?}");
assert_eq!(info.error.as_deref(), Some(UNSUPPORTED_MCP_BATCH_REASON));
}
}

#[test]
fn mcp_batch_rejection_preserves_single_mcp_and_generic_batch_controls() {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this test would have failed before this change, and it seems to duplicate parses_valid_batch_without_error. I would remove it.

let singles: &[(&[u8], Option<&str>, bool)] = &[
(
br#"{"jsonrpc":"2.0","id":1,"method":"ping"}"#,
Some("ping"),
false,
),
(
br#"{"jsonrpc":"2.0","method":"notifications/initialized"}"#,
Some("notifications/initialized"),
false,
),
(br#"{"jsonrpc":"2.0","id":1,"result":{}}"#, None, true),
];

for (body, expected_method, expected_response) in singles {
let mcp = parse_jsonrpc_body(body, JsonRpcInspectionMode::Mcp);
assert!(mcp.error.is_none(), "single MCP message failed: {mcp:?}");
assert!(!mcp.is_batch);
assert_eq!(
mcp.calls.first().map(|call| call.method.as_str()),
*expected_method
);
assert_eq!(mcp.has_response, *expected_response);
}

let generic = parse_jsonrpc_body(
br#"[{"jsonrpc":"2.0","id":1,"method":"reports.list"},{"jsonrpc":"2.0","method":"reports.observed"}]"#,
JsonRpcInspectionMode::JsonRpc,
);
assert!(generic.error.is_none(), "generic batch failed: {generic:?}");
assert!(generic.is_batch);
assert_eq!(generic.calls.len(), 2);
}

#[test]
fn mcp_mode_rejects_non_recommended_tool_names_by_default() {
let body = br#"{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"read status","arguments":{}}}"#;
Expand Down
177 changes: 175 additions & 2 deletions crates/openshell-supervisor-network/src/l7/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ where
jsonrpc_info
.as_ref()
.and_then(|info| info.error.as_deref())
.map(|error| format!("JSON-RPC request rejected: {error}"))
.map(crate::l7::jsonrpc::jsonrpc_parse_rejection_reason)
});
let force_deny = parse_error_reason.is_some();
let (allowed, reason) = if let Some(reason) = parse_error_reason {
Expand Down Expand Up @@ -1082,7 +1082,7 @@ where
let parse_error_reason = jsonrpc_info
.error
.as_deref()
.map(|e| format!("JSON-RPC request rejected: {e}"));
.map(crate::l7::jsonrpc::jsonrpc_parse_rejection_reason);
let response_frame_reason =
jsonrpc_response_frame_hard_deny_reason(config.protocol, &jsonrpc_info);
let force_deny = parse_error_reason.is_some() || response_frame_reason.is_some();
Expand Down Expand Up @@ -3307,6 +3307,121 @@ network_policies:
);
}

#[derive(Debug, Clone, Copy)]
enum McpBatchEntryPath {
Direct,
RouteSelected,
}

async fn assert_mcp_batch_denied(
body: &[u8],
entry_path: McpBatchEntryPath,
enforcement: EnforcementMode,
) {
let (mut config, tunnel_engine, ctx) = mcp_test_relay_context();
config.enforcement = enforcement;
let (mut app, mut relay_client) = tokio::io::duplex(8192);
let (mut relay_upstream, mut upstream) = tokio::io::duplex(8192);
let relay = tokio::spawn(async move {
match entry_path {
McpBatchEntryPath::Direct => {
relay_with_inspection(
&config,
tunnel_engine,
&mut relay_client,
&mut relay_upstream,
&ctx,
)
.await
}
McpBatchEntryPath::RouteSelected => {
let configs = [config];
relay_with_route_selection(
&configs,
tunnel_engine,
&mut relay_client,
&mut relay_upstream,
&ctx,
)
.await
}
}
});

let request = format!(
"POST /mcp HTTP/1.1\r\nHost: mcp.example.test:8000\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
body.len()
);
app.write_all(request.as_bytes()).await.unwrap();
app.write_all(body).await.unwrap();

let mut response = Vec::new();
tokio::time::timeout(
std::time::Duration::from_secs(1),
app.read_to_end(&mut response),
)
.await
.expect("relay should close after denying an MCP batch")
.unwrap();
let response = String::from_utf8(response).expect("denial response should be UTF-8");
assert!(
response.starts_with("HTTP/1.1 403 Forbidden\r\n"),
"{entry_path:?}/{enforcement:?} returned an unexpected response: {response:?}"
);
let (_, body) = response
.split_once("\r\n\r\n")
.expect("denial response should contain a body");
let body: serde_json::Value =
serde_json::from_str(body).expect("denial body should be JSON");
assert_eq!(
body.get("detail").and_then(serde_json::Value::as_str),
Some(crate::l7::jsonrpc::UNSUPPORTED_MCP_BATCH_REASON),
"{entry_path:?}/{enforcement:?} changed the stable denial detail"
);

tokio::time::timeout(std::time::Duration::from_secs(1), relay)
.await
.expect("relay should finish after denying an MCP batch")
.unwrap()
.unwrap();
let mut byte = [0u8; 1];
let read =
tokio::time::timeout(std::time::Duration::from_secs(1), upstream.read(&mut byte))
.await
.expect("upstream side should close")
.unwrap();
assert_eq!(
read, 0,
"{entry_path:?}/{enforcement:?} forwarded MCP batch bytes upstream"
);
}

#[tokio::test]
async fn mcp_rejects_jsonrpc_batches() {
let batches: &[&[u8]] = &[
br"[]",
br#"[{"jsonrpc":"2.0","id":1,"method":"ping"}]"#,
br#"[{"jsonrpc":"2.0","method":"notifications/initialized"}]"#,
br#"[{"jsonrpc":"2.0","id":1,"result":{}}]"#,
br#"[{"jsonrpc":"2.0","id":1,"method":"ping"},{"jsonrpc":"2.0","id":1,"result":{}}]"#,
];

for body in batches {
assert_mcp_batch_denied(body, McpBatchEntryPath::Direct, EnforcementMode::Enforce)
.await;
}
}

#[tokio::test]
async fn mcp_batch_entry_path_matrix() {
let body = br#"[{"jsonrpc":"2.0","id":1,"method":"ping"}]"#;
for entry_path in [McpBatchEntryPath::Direct, McpBatchEntryPath::RouteSelected] {
for enforcement in [EnforcementMode::Audit, EnforcementMode::Enforce] {
assert_mcp_batch_denied(body, entry_path, enforcement).await;
}
}
}

#[tokio::test]
async fn jsonrpc_relay_forwards_allowed_method() {
let (config, tunnel_engine, ctx) = jsonrpc_test_relay_context();
Expand Down Expand Up @@ -3373,6 +3488,64 @@ network_policies:
.unwrap();
}

#[tokio::test]
async fn jsonrpc_relay_forwards_allowed_batch_separately_from_mcp() {
let (config, tunnel_engine, ctx) = jsonrpc_test_relay_context();
let (mut app, mut relay_client) = tokio::io::duplex(8192);
let (mut relay_upstream, mut upstream) = tokio::io::duplex(8192);
let relay = tokio::spawn(async move {
relay_with_inspection(
&config,
tunnel_engine,
&mut relay_client,
&mut relay_upstream,
&ctx,
)
.await
});

let body = br#"[{"jsonrpc":"2.0","id":1,"method":"initialize"},{"jsonrpc":"2.0","method":"initialize"}]"#;
let request = format!(
"POST /rpc HTTP/1.1\r\nHost: jsonrpc.example.test:8000\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
body.len()
);
app.write_all(request.as_bytes()).await.unwrap();
app.write_all(body).await.unwrap();

let mut upstream_bytes = Vec::new();
let mut upstream_buf = [0u8; 1024];
while !upstream_bytes.ends_with(body) {
let read = tokio::time::timeout(
std::time::Duration::from_secs(1),
upstream.read(&mut upstream_buf),
)
.await
.expect("allowed generic JSON-RPC batch should reach upstream")
.unwrap();
assert_ne!(read, 0, "upstream closed before the full batch arrived");
upstream_bytes.extend_from_slice(&upstream_buf[..read]);
}

upstream
.write_all(b"HTTP/1.1 204 No Content\r\nContent-Length: 0\r\nConnection: close\r\n\r\n")
.await
.unwrap();

let mut response = [0u8; 512];
let read = tokio::time::timeout(std::time::Duration::from_secs(1), app.read(&mut response))
.await
.expect("generic JSON-RPC batch response should reach client")
.unwrap();
assert!(String::from_utf8_lossy(&response[..read]).contains("204 No Content"));

drop(app);
tokio::time::timeout(std::time::Duration::from_secs(1), relay)
.await
.expect("relay should complete")
.unwrap()
.unwrap();
}

#[tokio::test]
async fn mcp_relay_forwards_jsonrpc_response_frame() {
let (config, tunnel_engine, ctx) = mcp_test_relay_context();
Expand Down
61 changes: 54 additions & 7 deletions crates/openshell-supervisor-network/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,14 +467,30 @@ fn forward_l7_hard_deny_reason(
request_info.jsonrpc.as_ref().and_then(|info| {
info.error
.as_deref()
.map(|error| format!("JSON-RPC request rejected: {error}"))
.map(crate::l7::jsonrpc::jsonrpc_parse_rejection_reason)
.or_else(|| {
crate::l7::relay::jsonrpc_response_frame_hard_deny_reason(protocol, info)
})
})
})
}

fn forward_l7_denial_detail(
force_deny: bool,
method: &str,
host: &str,
port: u16,
path: &str,
reason: &str,
) -> String {
// Stable classifier reasons are machine-readable API details. Policy and
// ordinary parse denials retain request context for human diagnostics.
if force_deny && reason == crate::l7::jsonrpc::UNSUPPORTED_MCP_BATCH_REASON {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is another special-casing of UNSUPPORTED_MCP_BATCH_REASON which I think could be resolved through better design.

return reason.to_string();
}
format!("{method} {host}:{port}{path} denied by L7 policy: {reason}")
}

/// Emit a denial event to the aggregator channel (if configured).
/// Used by `handle_tcp_connection` which owns `Option<Sender>`.
fn emit_denial(
Expand Down Expand Up @@ -3791,14 +3807,11 @@ async fn handle_forward_proxy(
&reason,
"forward-l7-deny",
);
let detail =
forward_l7_denial_detail(force_deny, method, &host_lc, port, &path, &reason);
respond(
client,
&build_json_error_response(
403,
"Forbidden",
"policy_denied",
&format!("{method} {host_lc}:{port}{path} denied by L7 policy: {reason}"),
),
&build_json_error_response(403, "Forbidden", "policy_denied", &detail),
)
.await?;
return Ok(());
Expand Down Expand Up @@ -4577,6 +4590,40 @@ network_policies:
);
}

#[test]
fn forward_mcp_batch_denial_preserves_stable_detail() {
let jsonrpc = crate::l7::jsonrpc::parse_jsonrpc_body(
br#"[{"jsonrpc":"2.0","id":1,"method":"ping"}]"#,
crate::l7::jsonrpc::JsonRpcInspectionMode::Mcp,
);
let request_info = crate::l7::L7RequestInfo {
action: "POST".to_string(),
target: "/mcp".to_string(),
query_params: std::collections::HashMap::new(),
graphql: None,
jsonrpc: Some(jsonrpc),
};
let reason = forward_l7_hard_deny_reason(crate::l7::L7Protocol::Mcp, &request_info)
.expect("MCP batch should be a forward-proxy hard denial");
let detail =
forward_l7_denial_detail(true, "POST", "mcp.example.test", 8000, "/mcp", &reason);
assert_eq!(detail, crate::l7::jsonrpc::UNSUPPORTED_MCP_BATCH_REASON);

let response = build_json_error_response(403, "Forbidden", "policy_denied", &detail);
let response = String::from_utf8(response).expect("denial response should be UTF-8");
assert!(response.starts_with("HTTP/1.1 403 Forbidden\r\n"));
let (_, body) = response
.split_once("\r\n\r\n")
.expect("denial response should contain a body");
let body: serde_json::Value =
serde_json::from_str(body).expect("denial body should be JSON");
assert_eq!(body["error"], "policy_denied");
assert_eq!(
body["detail"],
crate::l7::jsonrpc::UNSUPPORTED_MCP_BATCH_REASON
);
}

#[test]
fn forward_l7_hard_deny_reason_includes_jsonrpc_response_frames() {
let request_info = crate::l7::L7RequestInfo {
Expand Down
Loading
Loading