浸透测试之红包激励模块逻辑破绽测试总结

下图是本次App逻辑破绽测试集体总结的领导准则和测试方法,领导准则前面对应的是发现的破绽,有些破绽形容做了脱敏泛化解决,依据这些破绽根本能够反推出测试用例。这些是自己认为App逻辑破绽测试应该重点关注的,可能不全,如有补充欢送探讨(其余sql注入、XSS等其余类型破绽不是本次测试的重点,测试的对象App,另外看了后端代码都是用的orm框架个别也没有什么问题)。

背景

公司不同App产品线的红包激励模块曾好几次被破解,呈现过几次针对我司产品的autojs破解协定apk,自己作为风控部门业余做逆向的技术人员,曾逆向剖析过这些黑产app,大部分都是抓包获取用户token后间接发包获取处分,绕过看广告、做工作等环节。
而本次工作的次要目标就是主动出击,对公司外围产品的红包激励模块进行逻辑破绽测试,看是否有容易被薅羊毛的的逻辑破绽,对发现的破绽督促业务线进行整改。尽管自己始终做的逆向方向的,但也长期关注平安相干的其余畛域,对浸透测试的办法和工具等也是近朱者赤;近墨者黑。本文次要是做自己本次所做浸透的测试简要总结。

浸透测试要点总结

浸透测试和代码审计相辅相成
通常须要编写burpsuite插件实现主动sign计算,这样在批改参数重放或并发测试时不须要关怀sign计算问题,再评估完sign算法爱护强度后,能够间接看代码算法实现,而不须要齐全逆向还原能够进步测试效率。
很多破绽,如并发破绽等是能够通过代码审计一眼看进去的,不须要齐全黑盒测试,这也能够极大进步测试效率。当然也有因为对代码逻辑不熟或代码逻辑过于简单,导致看代码还不是间接黑盒测试来的快得状况。集体感觉两者是相辅相成的。
竞争/并发破绽
服务端对客户端申请都是并发解决的,而且很多服务端还有负载平衡多节点解决,这样当对数据库某字段进行加减操作(如红包发放)如果没有加锁就会导致竞争/并发破绽,分布式锁个别用数据库锁或redis锁。
做好提现最初关卡审核
本次测试的一个重点就是看是否齐全脱离app间接通过发包就能够实现提现的,教训证在有提现微信账号openid的前提下是能够全程脱离APP通过发包实现提现1元到账的。但微信的openid是跟app绑定的,不同app对应的openid不同,须要获取微信的openid,须要调⽤微信受权登录接⼝来获取,并且调⽤时会校验调⽤⽅app的签名,若app的签名与后盾配置的签名不⼀致,⽆法胜利调⽤,试过各种签名坑骗未失效,预计不是在app内校验签名的而是在微信里校验的,⽬前openid是抓包获取的。
设想一下如果能够齐全脱离app就能够全程发包获取处分并提现(如领取号手机号间接提现),那么即便限度了每个账号的提现次数那也是很恐怖的。比方开发一个在线的薅羊毛工具,而后到处流传,每个人只有扫个码或者填个手机号就能够提现一元,流传量大也很恐怖。
解决办法就是当时在app内做好各个用户行为的埋点数据和用户环境检测数据等,而后在提现重要关卡验证这些数据是否失常,不失常就回绝提现。

burpsuite脚本

对于如何应用burpsuite来进行测试,看官网帮忙文档或网上材料根本就能学会这边就不开展细说,附上解决过burpsuite插件脚本供参考。

# -*- coding: utf-8 -*-from burp import IBurpExtenderfrom burp import IHttpListenerfrom burp import IRequestInfofrom burp import IParameterfrom burp import IBurpExtenderCallbacksfrom java.io import PrintWriterimport base64from hashlib import md5import timeHOST_FROM = "yy.xx.com"# https://portswigger.net/burp/extender/api/index.htmlclass BurpExtender(IBurpExtender, IHttpListener):    #    # implement IBurpExtender    #        def    registerExtenderCallbacks(self, callbacks):        # obtain an extension helpers object        self._helpers = callbacks.getHelpers()        self._stdout = PrintWriter(callbacks.getStdout(), True)                # set our extension name        callbacks.setExtensionName("zz resign plugin")                # register ourselves as an HTTP listener        callbacks.registerHttpListener(self)    #    # implement IHttpListener    #        def processHttpMessage(self, toolFlag, messageIsRequest, messageInfo):        # only process requests        if not messageIsRequest:            return        # get the HTTP service for the request        httpService = messageInfo.getHttpService()                # only process the host that is HOST_FROM        if (HOST_FROM != httpService.getHost()):            return        # self._stdout.println(toolFlag)        # self._stdout.println(IBurpExtenderCallbacks.TOOL_REPEATER)        if (IBurpExtenderCallbacks.TOOL_REPEATER != toolFlag) or (IBurpExtenderCallbacks.TOOL_INTRUDER != toolFlag):            return        requestInfo = self._helpers.analyzeRequest(messageInfo)        path = requestInfo.getUrl().getPath()        self._stdout.println("path=%s" % path)        if not path.startswith("/xx/"):            return        self.resign(messageInfo, check_sign=False)    def resign(self, messageInfo, check_sign=False):        requestInfo = self._helpers.analyzeRequest(messageInfo)        method = requestInfo.getMethod()        # self._stdout.println("method=%s" % method)        path = requestInfo.getUrl().getPath()        # self._stdout.println("path=%s" % path)          parameters = requestInfo.getParameters()        body_offset = requestInfo.getBodyOffset()        url_param_dist = {}        for param in parameters:            if IParameter.PARAM_URL == param.getType():                k = param.getName()                v = param.getValue()                v = self._helpers.urlDecode(v) # 须要转化一下有些%号没有decode,签名算法是谬误的                self._stdout.println("%s=%s" % (k, v))                # ...                request = messageInfo.getRequest()        body = request[body_offset:]        body = self._helpers.bytesToString(body)        self._stdout.println("body=%s" % body)        s = '{}-{}'.format(method, path)        # params        old_sign = ""        new_time_sign = None        new_ts = str(int(time.time()))        if not check_sign:            url_param_dist["_ts"] = new_ts #更新工夫戳            if "XX" in url_param_dist:                s = str(url_param_dist["XX"]) + time.strftime('%Y%m%d')                new_time_sign = md5(s.encode()).hexdigest()                url_param_dist["sign"] = new_time_sign #更新sign        arguments = url_param_dist        keys = list(arguments.keys())        keys.sort()        for k in keys:            if k == '_sign':                old_sign = arguments[k]                continue            val = arguments[k]            if type(val) == str or type(val) == unicode or type(val) == int:                s += '&{}={}'.format(k, val)            elif type(val) == list:                for v in val:                    s += '&{}={}'.format(k, v)            else:                self._stdout.println('params unknown type:{}, key:{}'.format(type(val), k))                return        # json body        s += '&json={}'.format(body or '')        digest = md5(s.encode()).hexdigest()        new_sign = base64.urlsafe_b64encode(digest)[:-2]        # self._stdout.println('new_sign:{}, old_sign:{}'.format(new_sign, old_sign))        request = self._helpers.urlDecode(request)        if not check_sign:            new_request = self._helpers.updateParameter(request, self._helpers.buildParameter("ts", new_ts, IParameter.PARAM_URL))            new_request = self._helpers.updateParameter(new_request, self._helpers.buildParameter("_sign", new_sign, IParameter.PARAM_URL))            if new_time_sign:                new_request = self._helpers.updateParameter(new_request, self._helpers.buildParameter("sign", new_time_sign, IParameter.PARAM_URL))            # self._stdout.println(self._helpers.bytesToString(new_request))            messageInfo.setRequest(new_request)        else:            if new_sign != old_sign:                self._stdout.println("error! new_sign != old_sign")            else:                self._stdout.println("success! new_sign == old_sign")        # messageInfo.setHttpService(self._helpers.buildHttpService(HOST_TO,        #     httpService.getPort(), httpService.getProtocol()))