浸透测试之红包激励模块逻辑破绽测试总结
下图是本次App逻辑破绽测试集体总结的领导准则和测试方法,领导准则前面对应的是发现的破绽,有些破绽形容做了脱敏泛化解决,依据这些破绽根本能够反推出测试用例。这些是自己认为App逻辑破绽测试应该重点关注的,可能不全,如有补充欢送探讨(其余sql注入、XSS等其余类型破绽不是本次测试的重点,测试的对象App,另外看了后端代码都是用的orm框架个别也没有什么问题)。
背景
公司不同App产品线的红包激励模块曾好几次被破解,呈现过几次针对我司产品的autojs破解协定apk,自己作为风控部门业余做逆向的技术人员,曾逆向剖析过这些黑产app,大部分都是抓包获取用户token后间接发包获取处分,绕过看广告、做工作等环节。
而本次工作的次要目标就是主动出击,对公司外围产品的红包激励模块进行逻辑破绽测试,看是否有容易被薅羊毛的的逻辑破绽,对发现的破绽督促业务线进行整改。尽管自己始终做的逆向方向的,但也长期关注平安相干的其余畛域,对浸透测试的办法和工具等也是近朱者赤;近墨者黑。本文次要是做自己本次所做浸透的测试简要总结。
浸透测试要点总结
浸透测试和代码审计相辅相成
通常须要编写burpsuite插件实现主动sign计算,这样在批改参数重放或并发测试时不须要关怀sign计算问题,再评估完sign算法爱护强度后,能够间接看代码算法实现,而不须要齐全逆向还原能够进步测试效率。
很多破绽,如并发破绽等是能够通过代码审计一眼看进去的,不须要齐全黑盒测试,这也能够极大进步测试效率。当然也有因为对代码逻辑不熟或代码逻辑过于简单,导致看代码还不是间接黑盒测试来的快得状况。集体感觉两者是相辅相成的。
竞争/并发破绽
服务端对客户端申请都是并发解决的,而且很多服务端还有负载平衡多节点解决,这样当对数据库某字段进行加减操作(如红包发放)如果没有加锁就会导致竞争/并发破绽,分布式锁个别用数据库锁或redis锁。
做好提现最初关卡审核
本次测试的一个重点就是看是否齐全脱离app间接通过发包就能够实现提现的,教训证在有提现微信账号openid的前提下是能够全程脱离APP通过发包实现提现1元到账的。但微信的openid是跟app绑定的,不同app对应的openid不同,须要获取微信的openid,须要调⽤微信受权登录接⼝来获取,并且调⽤时会校验调⽤⽅app的签名,若app的签名与后盾配置的签名不⼀致,⽆法胜利调⽤,试过各种签名坑骗未失效,预计不是在app内校验签名的而是在微信里校验的,⽬前openid是抓包获取的。
设想一下如果能够齐全脱离app就能够全程发包获取处分并提现(如领取号手机号间接提现),那么即便限度了每个账号的提现次数那也是很恐怖的。比方开发一个在线的薅羊毛工具,而后到处流传,每个人只有扫个码或者填个手机号就能够提现一元,流传量大也很恐怖。
解决办法就是当时在app内做好各个用户行为的埋点数据和用户环境检测数据等,而后在提现重要关卡验证这些数据是否失常,不失常就回绝提现。
burpsuite脚本
对于如何应用burpsuite来进行测试,看官网帮忙文档或网上材料根本就能学会这边就不开展细说,附上解决过burpsuite插件脚本供参考。
# -*- coding: utf-8 -*-from burp import IBurpExtenderfrom burp import IHttpListenerfrom burp import IRequestInfofrom burp import IParameterfrom burp import IBurpExtenderCallbacksfrom java.io import PrintWriterimport base64from hashlib import md5import timeHOST_FROM = "yy.xx.com"# https://portswigger.net/burp/extender/api/index.htmlclass BurpExtender(IBurpExtender, IHttpListener): # # implement IBurpExtender # def registerExtenderCallbacks(self, callbacks): # obtain an extension helpers object self._helpers = callbacks.getHelpers() self._stdout = PrintWriter(callbacks.getStdout(), True) # set our extension name callbacks.setExtensionName("zz resign plugin") # register ourselves as an HTTP listener callbacks.registerHttpListener(self) # # implement IHttpListener # def processHttpMessage(self, toolFlag, messageIsRequest, messageInfo): # only process requests if not messageIsRequest: return # get the HTTP service for the request httpService = messageInfo.getHttpService() # only process the host that is HOST_FROM if (HOST_FROM != httpService.getHost()): return # self._stdout.println(toolFlag) # self._stdout.println(IBurpExtenderCallbacks.TOOL_REPEATER) if (IBurpExtenderCallbacks.TOOL_REPEATER != toolFlag) or (IBurpExtenderCallbacks.TOOL_INTRUDER != toolFlag): return requestInfo = self._helpers.analyzeRequest(messageInfo) path = requestInfo.getUrl().getPath() self._stdout.println("path=%s" % path) if not path.startswith("/xx/"): return self.resign(messageInfo, check_sign=False) def resign(self, messageInfo, check_sign=False): requestInfo = self._helpers.analyzeRequest(messageInfo) method = requestInfo.getMethod() # self._stdout.println("method=%s" % method) path = requestInfo.getUrl().getPath() # self._stdout.println("path=%s" % path) parameters = requestInfo.getParameters() body_offset = requestInfo.getBodyOffset() url_param_dist = {} for param in parameters: if IParameter.PARAM_URL == param.getType(): k = param.getName() v = param.getValue() v = self._helpers.urlDecode(v) # 须要转化一下有些%号没有decode,签名算法是谬误的 self._stdout.println("%s=%s" % (k, v)) # ... request = messageInfo.getRequest() body = request[body_offset:] body = self._helpers.bytesToString(body) self._stdout.println("body=%s" % body) s = '{}-{}'.format(method, path) # params old_sign = "" new_time_sign = None new_ts = str(int(time.time())) if not check_sign: url_param_dist["_ts"] = new_ts #更新工夫戳 if "XX" in url_param_dist: s = str(url_param_dist["XX"]) + time.strftime('%Y%m%d') new_time_sign = md5(s.encode()).hexdigest() url_param_dist["sign"] = new_time_sign #更新sign arguments = url_param_dist keys = list(arguments.keys()) keys.sort() for k in keys: if k == '_sign': old_sign = arguments[k] continue val = arguments[k] if type(val) == str or type(val) == unicode or type(val) == int: s += '&{}={}'.format(k, val) elif type(val) == list: for v in val: s += '&{}={}'.format(k, v) else: self._stdout.println('params unknown type:{}, key:{}'.format(type(val), k)) return # json body s += '&json={}'.format(body or '') digest = md5(s.encode()).hexdigest() new_sign = base64.urlsafe_b64encode(digest)[:-2] # self._stdout.println('new_sign:{}, old_sign:{}'.format(new_sign, old_sign)) request = self._helpers.urlDecode(request) if not check_sign: new_request = self._helpers.updateParameter(request, self._helpers.buildParameter("ts", new_ts, IParameter.PARAM_URL)) new_request = self._helpers.updateParameter(new_request, self._helpers.buildParameter("_sign", new_sign, IParameter.PARAM_URL)) if new_time_sign: new_request = self._helpers.updateParameter(new_request, self._helpers.buildParameter("sign", new_time_sign, IParameter.PARAM_URL)) # self._stdout.println(self._helpers.bytesToString(new_request)) messageInfo.setRequest(new_request) else: if new_sign != old_sign: self._stdout.println("error! new_sign != old_sign") else: self._stdout.println("success! new_sign == old_sign") # messageInfo.setHttpService(self._helpers.buildHttpService(HOST_TO, # httpService.getPort(), httpService.getProtocol()))