“A9 Team 甲方攻防团队,成员来自某证券、微步、青藤、长亭、安全狗等公司。成员能力涉及安全运营、威胁情报、攻防对抗、渗透测试、数据安全、安全产品开发等领域,持续分享安全运营和攻防的思考和实践。”
01
—
前言
一个由 c0ny1 大佬编写的用于前端加密 Fuzz 的 Burp Suite 插件。
项目地址:
https://github.com/c0ny1/jsEncrypter
使用步骤如下:
1、定位加密方法
2、编写加密方法
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from Crypto.Util.Padding import unpad
import base64
import binascii
def AES_encrypt(plain_text, key, mode, iv="", output_type="base64"):
    """
    Encrypt ``plain_text`` with AES and return the ciphertext as text.

    The plaintext is UTF-8 encoded and padded to ``AES.block_size`` with
    ``Crypto.Util.Padding.pad`` before encryption.

    :param plain_text: plaintext string to encrypt
    :param key: key string; it is UTF-8 encoded before use
    :param mode: AES mode constant, e.g. ``AES.MODE_ECB`` or ``AES.MODE_CBC``
    :param iv: IV string, UTF-8 encoded; not used when ``mode`` is ECB
    :param output_type: ciphertext encoding, "base64" or "hex" (case-insensitive)
    :return: the encoded ciphertext string
    :raises ValueError: if ``output_type`` is neither "base64" nor "hex"
    """
    encoding = output_type.lower()
    if encoding not in ("base64", "hex"):
        # Ciphertext is arbitrary bytes and cannot be returned as UTF-8 text;
        # the old fallback (bytes.decode()) failed with UnicodeDecodeError,
        # itself a ValueError, so callers see the same exception family.
        raise ValueError(
            "unsupported output_type %r (expected 'base64' or 'hex')" % (output_type,)
        )

    # Pad the encoded plaintext up to a whole number of AES blocks.
    data = pad(data_to_pad=plain_text.encode(), block_size=AES.block_size)

    # Mode constants are plain integers: compare by value, not identity.
    if mode == AES.MODE_ECB:
        cipher = AES.new(key.encode(), mode)
    else:
        cipher = AES.new(key.encode(), mode, iv.encode())
    encrypted_bytes = cipher.encrypt(data)

    # The cipher yields bytes; encode them into a printable string.
    if encoding == "hex":
        return binascii.b2a_hex(encrypted_bytes).decode()
    return str(base64.b64encode(encrypted_bytes), encoding='utf-8')
def AES_decrypt(crypto_text, key, mode, iv="", input_type="base64"):
    """
    Decrypt AES ciphertext text and return the plaintext string.

    :param crypto_text: encoded ciphertext
    :param key: key string; it is UTF-8 encoded before use
    :param mode: AES mode constant, e.g. ``AES.MODE_ECB`` or ``AES.MODE_CBC``
    :param iv: IV string, UTF-8 encoded; not used when ``mode`` is ECB
    :param input_type: ciphertext encoding; "hex" (case-insensitive) selects
        hex decoding, any other value is treated as base64 (the default)
    :return: the plaintext, or ``None`` when the ciphertext cannot be decoded,
        decrypted, unpadded or UTF-8 decoded
    """
    # Mode constants are plain integers: compare by value, not identity.
    # The cipher is built outside the try block, so a bad key/IV still raises.
    if mode == AES.MODE_ECB:
        cipher = AES.new(key.encode(), mode)
    else:
        cipher = AES.new(key.encode(), mode, iv.encode())
    try:
        if input_type.lower() == "hex":
            data_byte = binascii.a2b_hex(crypto_text)
        else:
            data_byte = base64.b64decode(crypto_text)
        padded_data = cipher.decrypt(data_byte)
        return unpad(padded_data=padded_data, block_size=AES.block_size).decode()
    except (ValueError, TypeError):
        # ValueError covers binascii.Error, bad length/padding and
        # UnicodeDecodeError; TypeError covers a ciphertext of the wrong type.
        # Callers rely on None as the "could not decrypt" signal.
        return None
可以运行调试一下确认与前端加密结果是否一致
3、启动加密函数,与插件联动
综合以上,与 jsEncrypter 联动的 python 脚本如下:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : Fa11en丶R0ck
# @File : jsEncrypter.py
# @Description: jsEncrypter
from flask import Flask, request, make_response
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from Crypto.Util.Padding import unpad
import base64
import binascii
import sys
import logging
# Flask application object used by the HTTP endpoint and by app.run() below.
app = Flask(__name__)
# Disable the 'werkzeug' logger (presumably to keep per-request log lines out
# of the console so only the plaintext::ciphertext output is visible).
log = logging.getLogger('werkzeug')
log.disabled = True
@app.route('/', methods=['GET', 'POST'])
def get_connect():
    """
    HTTP endpoint called by the jsEncrypter Burp plugin.

    The view must be registered with Flask to be reachable; it is mounted on
    the root path for both methods it handles.

    POST: reads the ``payload`` form field, encrypts it with ``do_encrypt``,
    echoes ``plaintext::ciphertext`` to stderr and returns the ciphertext.
    Any other method: returns the literal body ``hello`` (connectivity check).

    :return: a Flask response with status 200
    """
    if request.method != 'POST':
        return make_response('hello', 200)

    plain_text = request.form['payload']
    crypt_text = do_encrypt(plain_text)
    # stderr is used so the pair shows up even though werkzeug logging is off.
    print(str(plain_text) + "::" + crypt_text, file=sys.stderr)
    return make_response(crypt_text, 200)
def do_encrypt(plain_text):
    """
    Encrypt ``plain_text`` with the settings of the target site.

    Uses AES in ECB mode with a fixed key and no IV, and returns the
    ciphertext hex-encoded.

    :param plain_text: plaintext string received from the plugin
    :return: hex-encoded ciphertext string
    """
    site_settings = {
        "key": "MpWsyseHtJywNON8",
        "iv": "",
        "mode": AES.MODE_ECB,
        "output_type": "hex",
    }
    return AES_encrypt(plain_text=plain_text, **site_settings)
def AES_encrypt(plain_text, key, mode, iv="", output_type="base64"):
    """
    Encrypt ``plain_text`` with AES and return the ciphertext as text.

    The plaintext is UTF-8 encoded and padded to ``AES.block_size`` with
    ``Crypto.Util.Padding.pad`` before encryption.

    :param plain_text: plaintext string to encrypt
    :param key: key string; it is UTF-8 encoded before use
    :param mode: AES mode constant, e.g. ``AES.MODE_ECB`` or ``AES.MODE_CBC``
    :param iv: IV string, UTF-8 encoded; not used when ``mode`` is ECB
    :param output_type: ciphertext encoding, "base64" or "hex" (case-insensitive)
    :return: the encoded ciphertext string
    :raises ValueError: if ``output_type`` is neither "base64" nor "hex"
    """
    encoding = output_type.lower()
    if encoding not in ("base64", "hex"):
        # Ciphertext is arbitrary bytes and cannot be returned as UTF-8 text;
        # the old fallback (bytes.decode()) failed with UnicodeDecodeError,
        # itself a ValueError, so callers see the same exception family.
        raise ValueError(
            "unsupported output_type %r (expected 'base64' or 'hex')" % (output_type,)
        )

    # Pad the encoded plaintext up to a whole number of AES blocks.
    data = pad(data_to_pad=plain_text.encode(), block_size=AES.block_size)

    # Mode constants are plain integers: compare by value, not identity.
    if mode == AES.MODE_ECB:
        cipher = AES.new(key.encode(), mode)
    else:
        cipher = AES.new(key.encode(), mode, iv.encode())
    encrypted_bytes = cipher.encrypt(data)

    # The cipher yields bytes; encode them into a printable string.
    if encoding == "hex":
        return binascii.b2a_hex(encrypted_bytes).decode()
    return str(base64.b64encode(encrypted_bytes), encoding='utf-8')
def AES_decrypt(crypto_text, key, mode, iv="", input_type="base64"):
    """
    Decrypt AES ciphertext text and return the plaintext string.

    :param crypto_text: encoded ciphertext
    :param key: key string; it is UTF-8 encoded before use
    :param mode: AES mode constant, e.g. ``AES.MODE_ECB`` or ``AES.MODE_CBC``
    :param iv: IV string, UTF-8 encoded; not used when ``mode`` is ECB
    :param input_type: ciphertext encoding; "hex" (case-insensitive) selects
        hex decoding, any other value is treated as base64 (the default)
    :return: the plaintext, or ``None`` when the ciphertext cannot be decoded,
        decrypted, unpadded or UTF-8 decoded
    """
    # Mode constants are plain integers: compare by value, not identity.
    # The cipher is built outside the try block, so a bad key/IV still raises.
    if mode == AES.MODE_ECB:
        cipher = AES.new(key.encode(), mode)
    else:
        cipher = AES.new(key.encode(), mode, iv.encode())
    try:
        if input_type.lower() == "hex":
            data_byte = binascii.a2b_hex(crypto_text)
        else:
            data_byte = base64.b64decode(crypto_text)
        padded_data = cipher.decrypt(data_byte)
        return unpad(padded_data=padded_data, block_size=AES.block_size).decode()
    except (ValueError, TypeError):
        # ValueError covers binascii.Error, bad length/padding and
        # UnicodeDecodeError; TypeError covers a ciphertext of the wrong type.
        # Callers rely on None as the "could not decrypt" signal.
        return None
if __name__ == '__main__':
    # Start the Flask server on port 1664, bound to every interface
    # ('0.0.0.0'), with Flask's debug mode switched off.
    app.run(host='0.0.0.0', port=1664, debug=False)
可以看到插件成功将明文加密成密文。
4、后续利用
后续利用就是可绕过前端加密直接进行暴力破解
autoDecoder
项目地址:https://github.com/f0ng/autoDecoder
autoDecoder 处理密文的流程如下:
autoDecoder 处理明文的流程如下:
简单演示:
某站登录请求如下
请求数据被全部加密了,从 js 中定位到加密方法
自带算法加解密
可以直接发送明文,插件会自动将其加密后再发给服务器
可在 Logger 或 Logger++ 中查看实际请求数据
接口加解密
加解密接口代码大致如下:
def encrypt():
    """
    Encryption endpoint: encrypt the ``encryptData`` field of the posted JSON.

    Reads the ``dataBody`` form field, parses it as JSON, encrypts its
    ``encryptData`` value with AES-CBC, upper-cases the hex ciphertext and
    URL-quotes it.

    NOTE(review): the route registration for this view, and the
    ``AES_encrypt`` variant that accepts ``padding=``, are not shown in this
    snippet; confirm both against the full script.

    :return: ``{'encryptData': <ciphertext>}`` on success, otherwise the
        string ``"Encrypt Warning !!!"``
    """
    # A missing form field yields None; treat it like an empty body instead of
    # crashing on None.strip().
    param = (request.form.get('dataBody') or "").strip("\n")
    if not param:
        return "Encrypt Warning !!!"

    encryptData = json.loads(param)['encryptData']
    encrypt_param = AES_encrypt(plain_text=encryptData, key="j**************2", iv="0**************8", mode=AES.MODE_CBC, padding=pkcs7_padding, output_type='hex')
    # Check for failure before touching the result: calling .upper() first
    # made the None check unreachable.
    if encrypt_param is None:
        return "Encrypt Warning !!!"
    return {'encryptData': quote(encrypt_param.upper())}
def decrypt():
    """
    Decryption endpoint: decrypt the ``encryptData`` field of the posted JSON.

    Reads the ``dataBody`` form field. When the text does not contain
    ``encryptData`` it is returned unchanged; otherwise it is parsed as JSON
    and its ``encryptData`` value is decrypted with AES-CBC from hex.

    NOTE(review): the route registration for this view, and the
    ``AES_decrypt`` variant that accepts ``padding=``, are not shown in this
    snippet; confirm both against the full script.

    :return: ``{'encryptData': <plaintext>}`` on success, the original body
        when there is nothing to decrypt, or ``"Decrypt Warning !!!"`` when
        decryption fails
    """
    # A missing form field yields None; fall back to an empty body instead of
    # crashing on None.strip().
    param = (request.form.get('dataBody') or "").strip("\n")
    if "encryptData" not in param:
        return param

    encryptData = json.loads(param)['encryptData']
    decryptData = AES_decrypt(crypto_text=encryptData, key="j**************2", iv="0**************8", mode=AES.MODE_CBC, padding=pkcs7_unpadding, input_type='hex')
    if decryptData is None:
        return "Decrypt Warning !!!"
    return {'encryptData': decryptData}
同样可以实现请求数据自动解密
也可以发明文让插件自动加密
JSRPC
JSRPC 简单实现
1、设置全局加密函数
通过浏览器的“保存并覆盖”本地替换功能,将加密方法设置为全局变量。
// Publish the page's encryption functions on `window` so they can be called
// from outside the bundle's scope. Each global is assigned only when it is
// not already set.
// NOTE(review): `f.h` / `f.c` are the names in scope at the patched location
// of this particular site's script; they will differ on other targets.
if (!window._pwd_Encrypt) window._pwd_Encrypt = Object(f.h);
if (!window._postdata_Encrypt) window._postdata_Encrypt = Object(f.c);
2、建立 websocket 服务端和 Web 服务器
思路如下:
import logging
import asyncio
import threading
import websockets
from flask import Flask, request, jsonify
# Flask application object; started by start_web() on a separate thread.
app = Flask(__name__)
# Disable the 'werkzeug' logger (presumably to keep per-request log lines out
# of the console so only the "[Enc] ..." output is visible).
log = logging.getLogger('werkzeug')
log.disabled = True
# WebSocket connections of the currently connected browser clients.
clients = set()
# Create the event loop explicitly and register it as the current loop.
# asyncio.get_event_loop() with no current loop is deprecated and raises
# RuntimeError on recent Python versions.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Replies received from the browser, consumed by send_data_to_clients().
# Created after the loop is current so that, on Python versions where a Queue
# binds to the current loop at construction, it binds to `loop`.
message_queue = asyncio.Queue()
async def handler(websocket, path=None):
    """
    Per-connection WebSocket handler.

    Registers the connection in ``clients``, forwards every message received
    from it to ``message_queue``, and unregisters it when the connection ends.

    :param websocket: the connection object supplied by the websockets server
    :param path: request path; unused. It is optional so the handler works
        whether the library calls it with one argument or two.
    """
    clients.add(websocket)
    try:
        async for message in websocket:
            await message_queue.put(message)
        # Reached once the message stream ends.
        print("end")
    finally:
        # discard() cannot raise if the entry is already gone.
        clients.discard(websocket)
def start_server(host, port):
    """
    Start the WebSocket server on ``host:port`` and run the event loop forever.

    This call blocks: it drives the module-level ``loop`` until the process
    is stopped.

    :param host: interface/hostname to listen on
    :param port: TCP port to listen on
    """
    ws_server = websockets.serve(handler, host, port)
    print(f"WebSocket server started at ws://{host}:{port}")
    # Bind the listening socket, then keep serving connections.
    loop.run_until_complete(ws_server)
    loop.run_forever()
async def send_data_to_clients(message):
    """
    Send ``message`` to every connected client and collect the reply.

    For each open connection the message is sent and the next item from
    ``message_queue`` is awaited as its reply. When several clients are
    connected, the reply of the last one is returned.

    :param message: plaintext to hand to the browser for encryption
    :return: the last reply received, ``""`` if no connection was open, or a
        hint string when no client is connected at all
    """
    if not clients:
        return "请先建立 Websocket 连接!"

    response_text = ""
    # Iterate over a snapshot: handler() adds/removes entries while this
    # coroutine is suspended in an await, and mutating a set during iteration
    # raises RuntimeError.
    for client in tuple(clients):
        # NOTE(review): `.open` is read as an attribute of the connection
        # object; confirm it exists in the installed websockets version.
        if not client.open:
            print("连接已经关闭")
            continue
        await client.send(message)
        response_text = await message_queue.get()
        print(f"[Enc] {message} : {response_text}")
    return response_text
def start_web():
    """Run the Flask HTTP server on port 1664, bound to every interface."""
    bind_host, bind_port = '0.0.0.0', 1664
    print("Http server started at http://0.0.0.0:1664")
    app.run(host=bind_host, port=bind_port, debug=False)
@app.route('/send', methods=['GET', 'POST'])
def send_message():
    """
    HTTP endpoint ``/send``: forward ``payload`` to the browser and return its reply.

    The ``payload`` value is read from the query string or the form body,
    handed to ``send_data_to_clients`` on the WebSocket event loop, and the
    result is returned as JSON.

    :return: ``{"response": <reply>}``; with status 504 when no reply arrives
        within 10 seconds
    """
    import concurrent.futures

    payload = request.values.get('payload')
    # The coroutine must run on the loop owned by the WebSocket thread; this
    # Flask worker thread only waits for its result.
    future = asyncio.run_coroutine_threadsafe(send_data_to_clients(payload), loop)
    try:
        result = future.result(timeout=10)
    except concurrent.futures.TimeoutError:
        # Cancel the pending coroutine, otherwise it keeps waiting on the
        # queue and would consume the reply that belongs to a later request.
        future.cancel()
        return jsonify({"response": "等待浏览器返回结果超时"}), 504
    return jsonify({"response": result})
# 示例用法
if __name__ == "__main__":
    # The HTTP server runs on its own thread; the WebSocket server (and the
    # asyncio loop) occupies the main thread.
    web_thread = threading.Thread(target=start_web)
    web_thread.start()
    # Blocks in loop.run_forever(); the join below is only reached if that
    # call returns.
    start_server("localhost", 5678)
    web_thread.join()
3、连接 Websocket 服务
// Connect the page to the local WebSocket server. Every message received is
// treated as plaintext: it is encrypted with the page's own function and the
// ciphertext is sent back. The literal message "exit" closes the connection.
var ws = new WebSocket("ws://127.0.0.1:5678");
ws.onmessage = function (msg) {
    var text = msg.data;
    console.log("[+] 收到消息:" + text);
    if (text != "exit") {
        ws.send(window._pwd_Encrypt(text));
        return;
    }
    console.log("[+] WebSocket closed!");
    ws.send("exit");
    ws.close();
};
4、发送明文加密
访问 http://<Your_IP>:1664/send?payload=123456(1664 为上面 Flask 服务监听的端口),即可得到 123456 的密文
Sekiro 的原理可以参考官方文档,这里直接说明使用方法。
1、启动 Sekiro 服务
启动脚本下载地址:https://oss.iinti.cn/sekiro/sekiro-demo(推荐下载 sekiro-open-demo-20230704;笔者测试最新版本时未能成功建立 WebSocket 连接,原因未明)
bash bin/sekiro.sh
备注:Sekiro 服务默认监听端口为 5612,如需修改,编辑 `conf/config.properties` 即可
2、油猴(浏览器插件)创建以下脚本:
// ==UserScript==
// @name SekiroClient for JSRPC
// @version 0.1
// @description 注入 SekiroClient
// @author Anonymous ( I Don't want to tell you:) )
// @match *://*/*
// @icon https://sekiro.iinti.cn/favicon.ico
// @grant unsafeWindow
// @run-at document-start
// ==/UserScript==
// Userscript body: defines a `SekiroClient` class on the page's window
// (via unsafeWindow) that connects to a WebSocket URL, receives JSON
// requests, dispatches them to registered action handlers and sends the
// handlers' results back over the same socket.
(function () {
// NOTE(review): bare empty statement; possibly a directive that was lost
// when the script was pasted. Confirm against the upstream script.
;
console.log("[*] SekiroClient for JSRPC loaded...");
// Constructor. Stores the URL, initialises the handler table and socket
// placeholder, throws when no URL is given, then picks a WebSocket factory
// and connects immediately.
unsafeWindow.SekiroClient = function (e) {
if (((this.wsURL = e), (this.handlers = {}), (this.socket = {}), !e)) throw new Error("wsURL can not be empty!!");
(this.webSocketFactory = this.resolveWebSocketFactory()), this.connect();
};
// Returns a function(url) that creates a socket wrapper for the current
// environment; throws when no WebSocket implementation is found.
(unsafeWindow.SekiroClient.prototype.resolveWebSocketFactory = function () {
// Browser branch: wrap the native WebSocket (or MozWebSocket).
if ("object" == typeof window) {
var e = window.WebSocket ? window.WebSocket : window.MozWebSocket;
return function (o) {
// Thin wrapper: onmessage/onopen/onclose are methods that take a
// callback and install it on the underlying socket.
function t(o) {
this.mSocket = new e(o);
}
return (
(t.prototype.close = function () {
this.mSocket.close();
}),
(t.prototype.onmessage = function (e) {
this.mSocket.onmessage = e;
}),
(t.prototype.onopen = function (e) {
this.mSocket.onopen = e;
}),
(t.prototype.onclose = function (e) {
this.mSocket.onclose = e;
}),
(t.prototype.send = function (e) {
this.mSocket.send(e);
}),
new t(o)
);
};
}
// weex branch: use the "webSocket" module; errors are logged and the
// function falls through to the next check.
if ("object" == typeof weex)
try {
console.log("test webSocket for weex");
var o = weex.requireModule("webSocket");
return (
console.log("find webSocket for weex:" + o),
function (e) {
// Close any previous connection (errors ignored) before reopening.
try {
o.close();
} catch (e) {}
return o.WebSocket(e, ""), o;
}
);
} catch (e) {
console.log(e);
}
// NOTE(review): `e` is only assigned inside the `typeof window` branch
// above, which always returns; if execution reaches this point `e` is
// undefined, so `new e(o)` would throw. Confirm the intended constructor.
if ("object" == typeof WebSocket)
return function (o) {
return new e(o);
};
throw new Error("the js environment do not support websocket");
}),
// Opens the socket and installs the message/open/close callbacks. A failure
// to create the socket, or a later close, schedules another connect() after
// 2e3 ms.
(unsafeWindow.SekiroClient.prototype.connect = function () {
console.log("sekiro: begin of connect to wsURL: " + this.wsURL);
var e = this;
try {
this.socket = this.webSocketFactory(this.wsURL);
} catch (o) {
return (
console.log("sekiro: create connection failed,reconnect after 2s:" + o),
void setTimeout(function () {
e.connect();
}, 2e3)
);
}
this.socket.onmessage(function (o) {
e.handleSekiroRequest(o.data);
}),
this.socket.onopen(function (e) {
console.log("sekiro: open a sekiro client connection");
}),
this.socket.onclose(function (o) {
console.log("sekiro: disconnected ,reconnection after 2s"),
setTimeout(function () {
e.connect();
}, 2e3);
});
}),
// Handles one incoming request: parses the JSON text, looks up the handler
// registered for `action`, and calls it as handler(request, resolve, reject).
// `__sekiro_seq__` from the request is echoed back in every response.
(unsafeWindow.SekiroClient.prototype.handleSekiroRequest = function (e) {
console.log("receive sekiro request: " + e);
var o = JSON.parse(e),
t = o.__sekiro_seq__;
if (o.action) {
var n = o.action;
if (this.handlers[n]) {
var s = this.handlers[n],
i = this;
try {
s(
o,
// resolve callback: a failure while sending the success response is
// reported as a failed response instead.
function (e) {
try {
i.sendSuccess(t, e);
} catch (e) {
i.sendFailed(t, "e:" + e);
}
},
// reject callback.
function (e) {
i.sendFailed(t, e);
}
);
} catch (e) {
// Exception thrown synchronously by the handler itself.
console.log("error: " + e), i.sendFailed(t, ":" + e);
}
} else this.sendFailed(t, "no action handler: " + n + " defined");
} else this.sendFailed(t, "need request param {action}");
}),
// Sends a success response for sequence `e` carrying result `o`.
// Strings are parsed as JSON when possible, otherwise wrapped as {data: o};
// objects are used as-is; any other value is wrapped as {data: o}.
(unsafeWindow.SekiroClient.prototype.sendSuccess = function (e, o) {
var t;
if ("string" == typeof o)
try {
t = JSON.parse(o);
} catch (e) {
(t = {}).data = o;
}
else "object" == typeof o ? (t = o) : ((t = {}).data = o);
// Arrays and strings are wrapped as {data, code: 0}. Then: a truthy `code`
// is reset to 0, otherwise `status` is set to 0; the sequence id is attached.
(Array.isArray(t) || "string" == typeof t) && (t = { data: t, code: 0 }), t.code ? (t.code = 0) : (t.status, (t.status = 0)), (t.__sekiro_seq__ = e);
var n = JSON.stringify(t);
console.log("response :" + n), this.socket.send(n);
}),
// Sends a failure response for sequence `e`: {message, status: -1}.
// Non-string messages are JSON-stringified first.
(unsafeWindow.SekiroClient.prototype.sendFailed = function (e, o) {
"string" != typeof o && (o = JSON.stringify(o));
var t = {};
(t.message = o), (t.status = -1), (t.__sekiro_seq__ = e);
var n = JSON.stringify(t);
console.log("sekiro: response :" + n), this.socket.send(n);
}),
// Registers handler `o` for action name `e`; validates both argument types
// and returns `this` so calls can be chained.
(unsafeWindow.SekiroClient.prototype.registerAction = function (e, o) {
if ("string" != typeof e) throw new Error("an action must be string");
if ("function" != typeof o) throw new Error("a handler must be function");
return console.log("sekiro: register action: " + e), (this.handlers[e] = o), this;
});
})();
成功后刷新页面会在浏览器控制台显示如下信息:
3、在浏览器控制台执行以下代码,创建 SekiroClient 并注册加密 action(其中的 `group` 与 action 名称会在第 4 步的 URL 中用到):
// Register this page as a Sekiro client in group "test" with a random
// clientId, and expose an "encrypt" action that runs the page's own
// encryption function on the request's `data` field.
var client = new SekiroClient("ws://127.0.0.1:5612/business/register?group=test&clientId=" + Math.random());
client.registerAction("encrypt", function (request, resolve, reject) {
    var cipherText;
    try {
        // Replace window._pwd_Encrypt with the target site's own encryption
        // function; it is used here for demonstration.
        cipherText = window._pwd_Encrypt(request["data"]);
        resolve(cipherText);
    } catch (e) {
        reject("[encrypt] error: " + e);
    }
});
4、访问以下 URL 获取加密结果
http://127.0.0.1:5612/business-demo/invoke?group=test&action=encrypt&data=admin123
说明:
`group`:第三步创建 SekiroClient 对象时定义的 `group`
`action`:第三步中注册的 action
`data`:想加密的明文
参考链接
[1]https://cloud.tencent.com/developer/article/2021745
[2]https://mp.weixin.qq.com/s/hvJHgmV-0oEt992fnTTzBg
[3]https://blog.csdn.net/lilongsy/article/details/127862693
[4]http://sekiro.iinti.cn/sekiro-doc/