Files
remote-volume-monitor/Test/Scripts/test_rdp_detection.py
Agent 45e7d9553a Initial commit - 按新规范整理目录结构
- Code/: 源代码、配置文件、文档、工具
- Releases/: 发布包(v1.0)
- Test/: 测试用例和测试脚本
2026-03-20 06:54:40 +08:00

230 lines
6.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
RDP 连接检测测试工具
用于诊断 RDP 检测问题
"""
import os
import subprocess
import sys
def print_header(title):
print("\n" + "=" * 60)
print(f" {title}")
print("=" * 60)
def check_environment_variables():
"""检查环境变量"""
print_header("1. 环境变量检查")
vars_to_check = [
'SESSIONNAME',
'USERNAME',
'USERDOMAIN',
'COMPUTERNAME',
]
for var in vars_to_check:
value = os.environ.get(var, '(未设置)')
print(f" {var}: {value}")
session_name = os.environ.get('SESSIONNAME', '')
if session_name.startswith('RDP'):
print(f"\n ✓ SESSIONNAME 以 RDP 开头,检测到远程会话")
return True
else:
print(f"\n ⚠ SESSIONNAME 不以 RDP 开头")
return False
def check_query_user():
"""检查 query user 命令输出"""
print_header("2. query user 命令检查")
try:
result = subprocess.run(
['query', 'user'],
capture_output=True,
text=True,
shell=True,
timeout=5
)
print(f" 返回码:{result.returncode}")
print(f"\n 标准输出:")
for line in result.stdout.split('\n'):
print(f" {line}")
if result.stderr:
print(f"\n 错误输出:")
for line in result.stderr.split('\n'):
print(f" {line}")
# 分析输出
output_lower = result.stdout.lower()
print(f"\n 分析结果:")
# 检查 RDP/TCP 关键字
if 'rdp' in output_lower or 'tcp' in output_lower:
print(f" ✓ 包含 'rdp''tcp' 关键字")
# 逐行检查
for line in result.stdout.strip().split('\n'):
line_lower = line.lower()
if 'rdp' in line_lower or 'tcp' in line_lower:
if 'active' in line_lower:
print(f" ✓ 检测到活跃的 RDP/TCP 会话:{line.strip()}")
elif '>' in line:
print(f" ✓ 当前会话是 RDP/TCP{line.strip()}")
else:
print(f" ⚠ 未包含 'rdp''tcp' 关键字")
# 检查会话数量
lines = [l for l in result.stdout.strip().split('\n') if l.strip() and not l.startswith(' ')]
if len(lines) > 1:
print(f" ⚠ 检测到 {len(lines)-1} 个会话(可能有多用户)")
return True
except FileNotFoundError:
print(f" ✗ query 命令不存在(仅在 Windows 上可用)")
return False
except Exception as e:
print(f" ✗ 执行失败:{e}")
return False
def check_registry():
"""检查注册表"""
print_header("3. 注册表检查")
try:
import winreg
# 检查 Terminal Server 设置
try:
key = winreg.OpenKey(
winreg.HKEY_LOCAL_MACHINE,
r"SYSTEM\CurrentControlSet\Control\Terminal Server"
)
try:
val, _ = winreg.QueryValueEx(key, "fDenyTSConnections")
if val == 0:
print(f" ✓ 终端服务已启用")
else:
print(f" ⚠ 终端服务被禁用")
except:
print(f" ⚠ 无法读取 fDenyTSConnections")
winreg.CloseKey(key)
except Exception as e:
print(f" ⚠ Terminal Server 键值访问失败:{e}")
# 检查当前会话
try:
key = winreg.OpenKey(
winreg.HKEY_CURRENT_USER,
r"Volatile Environment"
)
print(f" ✓ 当前用户环境键可访问")
winreg.CloseKey(key)
except:
print(f" ⚠ 当前用户环境键访问失败")
return True
except ImportError:
print(f" ⚠ winreg 模块不可用(非 Windows 系统?)")
return False
except Exception as e:
print(f" ✗ 检查失败:{e}")
return False
def check_network():
"""检查网络连接"""
print_header("4. 网络连接检查")
try:
result = subprocess.run(
['netstat', '-an'],
capture_output=True,
text=True,
shell=True,
timeout=5
)
output_lower = result.stdout.lower()
# 检查 RDP 端口 3389
if '3389' in output_lower:
print(f" ✓ 检测到 RDP 端口 (3389) 活动")
# 统计连接数
lines = output_lower.split('\n')
rdp_connections = [l for l in lines if '3389' in l and 'established' in l]
if rdp_connections:
print(f" ✓ 发现 {len(rdp_connections)} 个 RDP 连接:")
for conn in rdp_connections[:5]: # 最多显示 5 个
print(f" {conn.strip()}")
else:
print(f" ⚠ 未检测到 RDP 端口 (3389) 活动")
return True
except Exception as e:
print(f" ✗ 检查失败:{e}")
return False
def main():
print("\n")
print("" + "" * 58 + "")
print("" + " " * 15 + "RDP 连接检测诊断工具" + " " * 15 + "")
print("" + "" * 58 + "")
print(f"\n 计算机名:{os.environ.get('COMPUTERNAME', 'Unknown')}")
print(f" 用户名:{os.environ.get('USERNAME', 'Unknown')}")
print(f" 时间:{subprocess.run(['date'], capture_output=True, text=True, shell=True).stdout.strip()}")
# 执行各项检查
env_result = check_environment_variables()
query_result = check_query_user()
registry_result = check_registry()
network_result = check_network()
# 总结
print_header("诊断总结")
if env_result:
print(" ✓ 环境变量检测到 RDP 会话")
print("\n 建议:程序应该能检测到 RDP 连接")
elif query_result:
print(" ⚠ 环境变量未检测到,但 query user 可能有信息")
print("\n 建议:检查 query user 输出中的 RDP/TCP 关键字")
else:
print(" ✗ 未检测到 RDP 会话特征")
print("\n 可能原因:")
print(" 1. 当前是本地登录,不是 RDP 远程连接")
print(" 2. RDP 连接已断开")
print(" 3. 终端服务被禁用")
print(" 4. 使用了其他远程工具(如 TeamViewer、AnyDesk")
print("\n 测试完成!")
print("\n")
# 返回结果
if env_result or query_result:
sys.exit(0) # 检测到 RDP
else:
sys.exit(1) # 未检测到 RDP
if __name__ == '__main__':
main()