共计 4959 个字符,预计需要花费 13 分钟才能阅读完成。
浸透测试之红包激励模块逻辑破绽测试总结
下图是本次 App 逻辑破绽测试集体总结的领导准则和测试方法,领导准则前面对应的是发现的破绽,有些破绽形容做了脱敏泛化解决,依据这些破绽根本能够反推出测试用例。这些是自己认为 App 逻辑破绽测试应该重点关注的,可能不全,如有补充欢送探讨(其余 sql 注入、XSS 等其余类型破绽不是本次测试的重点,测试的对象 App,另外看了后端代码都是用的 orm 框架个别也没有什么问题)。
背景
公司不同 App 产品线的红包激励模块曾好几次被破解,呈现过几次针对我司产品的 autojs 破解协定 apk,自己作为风控部门业余做逆向的技术人员,曾逆向剖析过这些黑产 app,大部分都是抓包获取用户 token 后间接发包获取处分,绕过看广告、做工作等环节。
而本次工作的次要目标就是主动出击,对公司外围产品的红包激励模块进行逻辑破绽测试,看是否有容易被薅羊毛的的逻辑破绽,对发现的破绽督促业务线进行整改。尽管自己始终做的逆向方向的,但也长期关注平安相干的其余畛域,对浸透测试的办法和工具等也是近朱者赤; 近墨者黑。本文次要是做自己本次所做浸透的测试简要总结。
浸透测试要点总结
浸透测试和代码审计相辅相成
通常须要编写 burpsuite 插件实现主动 sign 计算,这样在批改参数重放或并发测试时不须要关怀 sign 计算问题,再评估完 sign 算法爱护强度后,能够间接看代码算法实现,而不须要齐全逆向还原能够进步测试效率。
很多破绽,如并发破绽等是能够通过代码审计一眼看进去的,不须要齐全黑盒测试,这也能够极大进步测试效率。当然也有因为对代码逻辑不熟或代码逻辑过于简单,导致看代码还不是间接黑盒测试来的快得状况。集体感觉两者是相辅相成的。
竞争 / 并发破绽
服务端对客户端申请都是并发解决的,而且很多服务端还有负载平衡多节点解决,这样当对数据库某字段进行加减操作(如红包发放)如果没有加锁就会导致竞争 / 并发破绽,分布式锁个别用数据库锁或 redis 锁。
做好提现最初关卡审核
本次测试的一个重点就是看是否齐全脱离 app 间接通过发包就能够实现提现的,教训证在有提现微信账号 openid 的前提下是能够全程脱离 APP 通过发包实现提现 1 元到账的。但微信的 openid 是跟 app 绑定的,不同 app 对应的 openid 不同,须要获取微信的 openid,须要调⽤微信受权登录接⼝来获取,并且调⽤时会校验调⽤⽅ app 的签名,若 app 的签名与后盾配置的签名不⼀致,⽆法胜利调⽤,试过各种签名坑骗未失效,预计不是在 app 内校验签名的而是在微信里校验的,⽬前 openid 是抓包获取的。
设想一下如果能够齐全脱离 app 就能够全程发包获取处分并提现(如领取号手机号间接提现),那么即便限度了每个账号的提现次数那也是很恐怖的。比方开发一个在线的薅羊毛工具,而后到处流传,每个人只有扫个码或者填个手机号就能够提现一元,流传量大也很恐怖。
解决办法就是当时在 app 内做好各个用户行为的埋点数据和用户环境检测数据等,而后在提现重要关卡验证这些数据是否失常,不失常就回绝提现。
burpsuite 脚本
对于如何应用 burpsuite 来进行测试,看官网帮忙文档或网上材料根本就能学会这边就不开展细说,附上解决过 burpsuite 插件脚本供参考。
# -*- coding: utf-8 -*-
from burp import IBurpExtender
from burp import IHttpListener
from burp import IRequestInfo
from burp import IParameter
from burp import IBurpExtenderCallbacks
from java.io import PrintWriter
import base64
from hashlib import md5
import time
HOST_FROM = "yy.xx.com"
# https://portswigger.net/burp/extender/api/index.html
class BurpExtender(IBurpExtender, IHttpListener):
#
# implement IBurpExtender
#
def registerExtenderCallbacks(self, callbacks):
# obtain an extension helpers object
self._helpers = callbacks.getHelpers()
self._stdout = PrintWriter(callbacks.getStdout(), True)
# set our extension name
callbacks.setExtensionName("zz resign plugin")
# register ourselves as an HTTP listener
callbacks.registerHttpListener(self)
#
# implement IHttpListener
#
def processHttpMessage(self, toolFlag, messageIsRequest, messageInfo):
# only process requests
if not messageIsRequest:
return
# get the HTTP service for the request
httpService = messageInfo.getHttpService()
# only process the host that is HOST_FROM
if (HOST_FROM != httpService.getHost()):
return
# self._stdout.println(toolFlag)
# self._stdout.println(IBurpExtenderCallbacks.TOOL_REPEATER)
if (IBurpExtenderCallbacks.TOOL_REPEATER != toolFlag) or (IBurpExtenderCallbacks.TOOL_INTRUDER != toolFlag):
return
requestInfo = self._helpers.analyzeRequest(messageInfo)
path = requestInfo.getUrl().getPath()
self._stdout.println("path=%s" % path)
if not path.startswith("/xx/"):
return
self.resign(messageInfo, check_sign=False)
def resign(self, messageInfo, check_sign=False):
requestInfo = self._helpers.analyzeRequest(messageInfo)
method = requestInfo.getMethod()
# self._stdout.println("method=%s" % method)
path = requestInfo.getUrl().getPath()
# self._stdout.println("path=%s" % path)
parameters = requestInfo.getParameters()
body_offset = requestInfo.getBodyOffset()
url_param_dist = {}
for param in parameters:
if IParameter.PARAM_URL == param.getType():
k = param.getName()
v = param.getValue()
v = self._helpers.urlDecode(v) # 须要转化一下有些 % 号没有 decode,签名算法是谬误的
self._stdout.println("%s=%s" % (k, v))
# ...
request = messageInfo.getRequest()
body = request[body_offset:]
body = self._helpers.bytesToString(body)
self._stdout.println("body=%s" % body)
s = '{}-{}'.format(method, path)
# params
old_sign = ""
new_time_sign = None
new_ts = str(int(time.time()))
if not check_sign:
url_param_dist["_ts"] = new_ts #更新工夫戳
if "XX" in url_param_dist:
s = str(url_param_dist["XX"]) + time.strftime('%Y%m%d')
new_time_sign = md5(s.encode()).hexdigest()
url_param_dist["sign"] = new_time_sign #更新 sign
arguments = url_param_dist
keys = list(arguments.keys())
keys.sort()
for k in keys:
if k == '_sign':
old_sign = arguments[k]
continue
val = arguments[k]
if type(val) == str or type(val) == unicode or type(val) == int:
s += '&{}={}'.format(k, val)
elif type(val) == list:
for v in val:
s += '&{}={}'.format(k, v)
else:
self._stdout.println('params unknown type:{}, key:{}'.format(type(val), k))
return
# json body
s += '&json={}'.format(body or '')
digest = md5(s.encode()).hexdigest()
new_sign = base64.urlsafe_b64encode(digest)[:-2]
# self._stdout.println('new_sign:{}, old_sign:{}'.format(new_sign, old_sign))
request = self._helpers.urlDecode(request)
if not check_sign:
new_request = self._helpers.updateParameter(request, self._helpers.buildParameter("ts", new_ts, IParameter.PARAM_URL))
new_request = self._helpers.updateParameter(new_request, self._helpers.buildParameter("_sign", new_sign, IParameter.PARAM_URL))
if new_time_sign:
new_request = self._helpers.updateParameter(new_request, self._helpers.buildParameter("sign", new_time_sign, IParameter.PARAM_URL))
# self._stdout.println(self._helpers.bytesToString(new_request))
messageInfo.setRequest(new_request)
else:
if new_sign != old_sign:
self._stdout.println("error! new_sign != old_sign")
else:
self._stdout.println("success! new_sign == old_sign")
# messageInfo.setHttpService(self._helpers.buildHttpService(HOST_TO,
# httpService.getPort(), httpService.getProtocol()))