【A9】请求加密后的渗透测试方法

文摘   科技   2023-11-27 09:43   广东  

“A9 Team 甲方攻防团队,成员来自某证券、微步、青藤、长亭、安全狗等公司。成员能力涉及安全运营、威胁情报、攻防对抗、渗透测试、数据安全、安全产品开发等领域,持续分享安全运营和攻防的思考和实践。”


01

前言


在渗透测试过程中经常会遇到请求的数据被加密的情况,本篇介绍几个在前端加密情况下的渗透测试的方法。


02

jsEncrypter

一个由 c0ny1 大佬编写的用于前端加密 Fuzz 的 Burp Suite 插件。

项目地址:https://github.com/c0ny1/jsEncrypter


使用步骤如下:


1、定位加密方法


定位方法有很多,对前端做了混淆的可以尝试全局搜索 enc 来找加密函数,也可以尝试搜索请求参数然后调试找到加密函数。作为演示这里的加密函数如下:




该项目中提供了两种启动前端加密函数对数据进行加密,一个为 `phantomjs`,另一个是 `Nodejs`。在本例中,前端使用了 AES 的加密方法,需要导入第三方的 crypto-js.js,这要自行寻找。AES 加密在 python 中有一个现成的库,直接 `pip install pycryptodome` 安装即可使用,所以这里给大家提供使用 `python` 启动的方法。


2、编写加密方法


本例中 AES 的加解密的 python 脚本如下:


from Crypto.Cipher import AESfrom Crypto.Util.Padding import padfrom Crypto.Util.Padding import unpadimport base64import binascii

def AES_encrypt(plain_text, key, mode, iv="", output_type="base64"): """ AES 加密 :param output_type: 密文输出方式 - base64 / hex :param plain_text: 明文 :param key: 密钥 :param mode: 加密模式 :param iv: 偏移量 :return: 密文 """ # 填充字符 data = pad(data_to_pad=plain_text.encode(), block_size=AES.block_size)
# 字符串补位 if mode is AES.MODE_ECB: cipher = AES.new(key.encode(), mode) else: cipher = AES.new(key.encode(), mode, iv.encode()) encrypted_bytes = cipher.encrypt(data) # # 加密后得到的是 bytes 类型的数据,使用 Base64 进行编码, 返回 byte 字符串, 对 byte 字符串按 utf-8 进行解码 if output_type.lower() == "base64": crypto_text = str(base64.b64encode(encrypted_bytes), encoding='utf-8') elif output_type.lower() == "hex": crypto_text = binascii.b2a_hex(encrypted_bytes).decode() else: crypto_text = encrypted_bytes.decode() return crypto_text

def AES_decrypt(crypto_text, key, mode, iv="", input_type="base64"): """ AES 解密 :param input_type: 密文输出方式 - base64 / hex, 默认 base64 :param crypto_text: 密文 :param key: 密钥 :param mode: 解密模式 :param iv: 偏移量 :return: 明文 """ if mode is AES.MODE_ECB: cipher = AES.new(key.encode(), mode) else: cipher = AES.new(key.encode(), mode, iv.encode()) try: if input_type.lower() == "hex": data_byte = binascii.a2b_hex(crypto_text) else: data_byte = base64.b64decode(crypto_text) padded_data = cipher.decrypt(data_byte) plain_text = unpad(padded_data=padded_data, block_size=AES.block_size).decode()
return plain_text except Exception: return None


可以运行调试一下确认与前端加密结果是否一致





3、启动加密函数,与插件联动


JsEncrypt 与脚本的交流是通过 http 协议进行传输。在源码中,插件与脚本端口(默认 1664)连接时会发送一个 GET 请求,要求脚本返回 200 的响应码,以及一串指定的字符:hello




加密前的明文 Payload 则是通过 POST 请求发送 payload 字段给脚本



综合以上,与 jsEncrypter 联动的 python 脚本如下:


#!/usr/bin/env python# -*- coding: utf-8 -*-# @Author : Fa11en丶R0ck# @File : jsEncrypter.py# @Description: jsEncrypter
from flask import Flask, request, make_responsefrom Crypto.Cipher import AESfrom Crypto.Util.Padding import padfrom Crypto.Util.Padding import unpadimport base64import binasciiimport sysimport logging
app = Flask(__name__)log = logging.getLogger('werkzeug')log.disabled = True
@app.route('/', methods=['POST', 'GET'])def get_connect(): if request.method == 'POST': # 加密 payload = request.form['payload'] plain_text = payload
crypt_text = do_encrypt(plain_text) print(str(plain_text) + "::" + crypt_text, file=sys.stderr) response = make_response(crypt_text, 200) return response else: response = make_response('hello', 200) return response

def do_encrypt(plain_text): return AES_encrypt(plain_text=plain_text, key="MpWsyseHtJywNON8", iv="", mode=AES.MODE_ECB, output_type="hex")

def AES_encrypt(plain_text, key, mode, iv="", output_type="base64"): """ AES 加密 :param output_type: 密文输出方式 - base64 / hex :param plain_text: 明文 :param key: 密钥 :param mode: 加密模式 :param iv: 偏移量 :return: 密文 """ # 填充字符 data = pad(data_to_pad=plain_text.encode(), block_size=AES.block_size)
# 字符串补位 if mode is AES.MODE_ECB: cipher = AES.new(key.encode(), mode) else: cipher = AES.new(key.encode(), mode, iv.encode()) encrypted_bytes = cipher.encrypt(data) # 加密后得到的是 bytes 类型的数据,需要进行编码 if output_type.lower() == "base64": crypto_text = str(base64.b64encode(encrypted_bytes), encoding='utf-8') elif output_type.lower() == "hex": crypto_text = binascii.b2a_hex(encrypted_bytes).decode() else: crypto_text = encrypted_bytes.decode() return crypto_text

def AES_decrypt(crypto_text, key, mode, iv="", input_type="base64"): """ AES 解密 :param input_type: 密文输出方式 - base64 / hex, 默认 base64 :param crypto_text: 密文 :param key: 密钥 :param mode: 解密模式 :param iv: 偏移量 :return: 明文 """ if mode is AES.MODE_ECB: cipher = AES.new(key.encode(), mode) else: cipher = AES.new(key.encode(), mode, iv.encode()) try: if input_type.lower() == "hex": data_byte = binascii.a2b_hex(crypto_text) else: data_byte = base64.b64decode(crypto_text) padded_data = cipher.decrypt(data_byte) plain_text = unpad(padded_data=padded_data, block_size=AES.block_size).decode()
return plain_text except Exception: return None

if __name__ == '__main__': app.run(debug=False, host='0.0.0.0', port=1664)




可以看到插件成功将明文加密成密文。


4、后续利用


后续利用就是可绕过前端加密直接进行暴力破解






03

autoDecoder

项目地址:https://github.com/f0ng/autoDecoder

autoDecoder 处理密文的流程如下:




autoDecoder 处理明文的流程如下:





简单演示:


某站登录请求如下







请求数据被全部加密了,从 js 中定位到加密方法




04

自带算法加解密


算法为 AES/CBC/PKCS7Padding 算法,key 为 j**************2,iv 为 0**************8。autoDecoder 中支持 AES 算法,可使用“自带算法加解密”






配置完后,可以直接对请求包解密


可以用直接发送明文,会自动调用插件加密发给服务器





可在 Logger 或 Logger++ 中查看实际请求数据




05

接口加解密

当插件自带算法不支持时(比如 3DES、SM4 等),可调用外部接口进行加解密

加解密接口代码大致如下:


@app.route('/encode', methods=["POST"])def encrypt():    param = request.form.get('dataBody').strip("\n")  # 获取  post 参数    encryptData = json.loads(param)['encryptData']    # print("[*] encrypt : " + param)    encrypt_param = AES_encrypt(plain_text=encryptData, key="j**************2", iv="0**************8", mode=AES.MODE_CBC, padding=pkcs7_padding, output_type='hex').upper()    if encrypt_param is not None:        encrypt_param = quote(encrypt_param)        # print("{} : {}".format(param, encrypt_param))        return {'encryptData': encrypt_param}    else:        return "Encrypt Warning !!!"

@app.route('/decode', methods=["POST"])def decrypt(): param = request.form.get('dataBody').strip("\n") # 获取 post 参数 if "encryptData" in param: encryptData = json.loads(param)['encryptData'] # print("[*] decrypt : " + param) decryptData = AES_decrypt(crypto_text=encryptData, key="j**************2", iv="0**************8", mode=AES.MODE_CBC, padding=pkcs7_unpadding, input_type='hex') if decryptData is not None: # print("{} : {}".format(encryptData, decryptData)) return {'encryptData': decryptData} else: return "Decrypt Warning !!!" else:        return param





同样可以实现请求数据自动解密





也可以发明文让插件自动加密





06

JSRPC

JSRPC(JavaScript Remote Procedure Call)是一种基于 JavaScript 的远程过程调用协议,用于实现客户端和服务器之间的通信和函数调用。它允许开发人员在客户端 JavaScript 代码中调用远程服务器上的函数,以便获取数据、执行操作或获取服务。

优点:可以让我们直接调用浏览器环境下的 js 加密或解密函数,免去了扣加密逻辑的时间,和避免很多本地用 node 去执行 js 所出现的各种问题,比如说环境缺失等问题。

原理:在网站的控制台新建一个 WebSocket 客户端链接到服务器通信,调用服务器的接口。服务器会发送信息给客户端,客户端接收到要执行的方法执行完 js 代码后把获得想要的内容发回给服务器,服务器接收到后再显示出来。简单图示如下:




07

JSRPC 简单实现

1、设置全局加密函数


通过浏览器的“保存并覆盖”本地替换功能,将加密方法设置为全局变量


if (!window._pwd_Encrypt) window._pwd_Encrypt = Object(f.h);if (!window._postdata_Encrypt) window._postdata_Encrypt = Object(f.c);


当运行到 _login 方法时就会将加密方法注册为全局变量,控制台验证一下是否设置成功


2、建立 websocket 服务端和 Web 服务器


思路如下:


Web 服务负责接收明文,通过 websocket 发送给浏览器获取密文,再通过 Web 返回给前端。

import loggingimport asyncioimport threadingimport websocketsfrom flask import Flask, request, jsonify
app = Flask(__name__)log = logging.getLogger('werkzeug')log.disabled = Trueclients = set()message_queue = asyncio.Queue()loop = asyncio.get_event_loop()

async def handler(websocket, path): # 添加新连接到客户端集合 clients.add(websocket) try: # 处理消息 async for message in websocket: # print(f"Received message: {message}") await message_queue.put(message) print("end") finally: # 从客户端集合中移除断开的连接 clients.remove(websocket)

def start_server(host, port): start_server = websockets.serve(handler, host, port) print(f"WebSocket server started at ws://{host}:{port}") loop.run_until_complete(start_server) loop.run_forever()

async def send_data_to_clients(message): response_text = "" # 发送消息给所有连接的客户端 if len(clients) > 0: for client in clients: if client.open: await client.send(message) # response_text = await client.recv() response_text = await message_queue.get() # print("加密结果:", response_text) print(f"[Enc] {message} : {response_text}") else: print("连接已经关闭") else: response_text = "请先建立 Websocket 连接!" return response_text

def start_web(): # 启动 Flask 应用 print("Http server started at http://0.0.0.0:1664") app.run(debug=False, host='0.0.0.0', port=1664)

@app.route('/send', methods=['GET'])def send_message(): global loop asyncio.set_event_loop(loop) payload = request.values.get('payload') # print("[send_message] " + payload) result = asyncio.run_coroutine_threadsafe(send_data_to_clients(payload), loop).result(timeout=10) # print("[send_message] " + result) return jsonify({"response": result})

# 示例用法if __name__ == "__main__": web_thread = threading.Thread(target=start_web, args=()) web_thread.start() start_server("localhost", 5678) web_thread.join()

3、连接 Websocket 服务


先运行以上程序,再运行以下代码 JS 代码与 Python 程序建立一个 WebSocket 链接。

var ws = new WebSocket("ws://127.0.0.1:5678");    ws.onmessage = function (msg) {    console.log("[+] 收到消息:" + msg.data);    if (msg.data == "exit") {        console.log("[+] WebSocket closed!");        ws.send("exit")        ws.close();    } else {        ws.send(window._pwd_Encrypt(msg.data));    }}







4、发送明文加密


访问 http://<Your_IP>/send?payload=123456,即可得到 123456 的密文



通过调试可以看到密文与 JS 加密的结果一致。后续利用就和上面 jsEncrypt 和 autoDecoder 一样联动 Burp 即可。

原理啥的可以看官网,这里直接说怎么使用。


1、启动 Sekiro 服务


启动脚本下载地址:https://oss.iinti.cn/sekiro/sekiro-demo(推荐下 sekiro-open-demo-20230704,最新版本测试时未成功建立 Websocket 连接,不知什么问题)


$ bash bin/sekiro.sh

备注:默认 Sekiro 服务开启端口是 5612,需要修改编辑 `conf/config.properties` 即可


2、油猴(浏览器插件)创建以下脚本:


// ==UserScript==// @name         SekiroClient for JSRPC// @version      0.1// @description  注入 SekiroClient// @author       Anonymous ( I Don't want to tell you:) )// @match        *://*/*// @icon         https://sekiro.iinti.cn/favicon.ico// @grant        unsafewindow// @run-at       document-start// ==/UserScript==
(function () { "use strict"; console.log("[*] SekiroClient for JSRPC loaded...");
unsafeWindow.SekiroClient = function (e) { if (((this.wsURL = e), (this.handlers = {}), (this.socket = {}), !e)) throw new Error("wsURL can not be empty!!"); (this.webSocketFactory = this.resolveWebSocketFactory()), this.connect(); }; (unsafeWindow.SekiroClient.prototype.resolveWebSocketFactory = function () { if ("object" == typeof window) { var e = window.WebSocket ? window.WebSocket : window.MozWebSocket; return function (o) { function t(o) { this.mSocket = new e(o); } return ( (t.prototype.close = function () { this.mSocket.close(); }), (t.prototype.onmessage = function (e) { this.mSocket.onmessage = e; }), (t.prototype.onopen = function (e) { this.mSocket.onopen = e; }), (t.prototype.onclose = function (e) { this.mSocket.onclose = e; }), (t.prototype.send = function (e) { this.mSocket.send(e); }), new t(o) ); }; } if ("object" == typeof weex) try { console.log("test webSocket for weex"); var o = weex.requireModule("webSocket"); return ( console.log("find webSocket for weex:" + o), function (e) { try { o.close(); } catch (e) {} return o.WebSocket(e, ""), o; } ); } catch (e) { console.log(e); } if ("object" == typeof WebSocket) return function (o) { return new e(o); }; throw new Error("the js environment do not support websocket"); }), (unsafeWindow.SekiroClient.prototype.connect = function () { console.log("sekiro: begin of connect to wsURL: " + this.wsURL); var e = this; try { this.socket = this.webSocketFactory(this.wsURL); } catch (o) { return ( console.log("sekiro: create connection failed,reconnect after 2s:" + o), void setTimeout(function () { e.connect(); }, 2e3) ); } this.socket.onmessage(function (o) { e.handleSekiroRequest(o.data); }), this.socket.onopen(function (e) { console.log("sekiro: open a sekiro client connection"); }), this.socket.onclose(function (o) { console.log("sekiro: disconnected ,reconnection after 2s"), setTimeout(function () { e.connect(); }, 2e3); }); }), (unsafeWindow.SekiroClient.prototype.handleSekiroRequest = function (e) { console.log("receive sekiro request: " + e); var o = JSON.parse(e), t = o.__sekiro_seq__; if (o.action) { var n = o.action; if (this.handlers[n]) { var s = this.handlers[n], i = this; try { s( o, function (e) { try { i.sendSuccess(t, e); } catch (e) { i.sendFailed(t, "e:" + e); } }, function (e) { i.sendFailed(t, e); } ); } catch (e) { console.log("error: " + e), i.sendFailed(t, ":" + e); } } else this.sendFailed(t, "no action handler: " + n + " defined"); } else this.sendFailed(t, "need request param {action}"); }), (unsafeWindow.SekiroClient.prototype.sendSuccess = function (e, o) { var t; if ("string" == typeof o) try { t = JSON.parse(o); } catch (e) { (t = {}).data = o; } else "object" == typeof o ? (t = o) : ((t = {}).data = o); (Array.isArray(t) || "string" == typeof t) && (t = { data: t, code: 0 }), t.code ? (t.code = 0) : (t.status, (t.status = 0)), (t.__sekiro_seq__ = e); var n = JSON.stringify(t); console.log("response :" + n), this.socket.send(n); }), (unsafeWindow.SekiroClient.prototype.sendFailed = function (e, o) { "string" != typeof o && (o = JSON.stringify(o)); var t = {}; (t.message = o), (t.status = -1), (t.__sekiro_seq__ = e); var n = JSON.stringify(t); console.log("sekiro: response :" + n), this.socket.send(n); }), (unsafeWindow.SekiroClient.prototype.registerAction = function (e, o) { if ("string" != typeof e) throw new Error("an action must be string"); if ("function" != typeof o) throw new Error("a handler must be function"); return console.log("sekiro: register action: " + e), (this.handlers[e] = o), this; });})();


成功后刷新页面会在浏览器控制台显示如下信息:



3、控制台运行以下代码,注册加密方法 encrypt

var client = new SekiroClient("ws://127.0.0.1:5612/business/register?group=test&clientId=" + Math.random());client.registerAction("encrypt", function (request, resolve, reject) {    try {        var data = request["data"];        resolve(window._pwd_Encrypt(data)); // 这里修改自己的加密方法,这里演示用 window._pwd_Encrypt    } catch (e) {        reject("[encrypt] error: " + e);    }});


4、访问以下 URL 获取加密结果


http://127.0.0.1:5612/business-demo/invoke?group=test&action=encrypt&data=admin123


说明:


  • `group`:第三步创建 SekiroClient 对象时定义的 `group`

  •  `action`:第三步中注册的 action

  •  `data`:想加密的明文




08

参考链接

[1]https://cloud.tencent.com/developer/article/2021745

[2]https://mp.weixin.qq.com/s/hvJHgmV-0oEt992fnTTzBg

[3]https://blog.csdn.net/lilongsy/article/details/127862693

[4]http://sekiro.iinti.cn/sekiro-doc/










A9 Team
A9 Team 甲方攻防团队,成员来自某证券、微步、青藤、长亭、安全狗等公司。成员能力涉及安全运营、威胁情报、攻防对抗、渗透测试、数据安全、安全产品开发等领域,持续分享安全运营和攻防的思考和实践,期望和朋友们共同进步,守望相助,合作共赢。
 最新文章