1、循环取能够重复使用的数据

csvData.csv

1,12,23,34,45,56,67,78,89,9

locust2.py

from locust import TaskSet, task, HttpUser, HttpLocust, between, tag, constantimport csvclass UserBehavior(TaskSet):    def on_start(self):        self.index = 0    # @tag('getJoke分页查问')    @task    def test_visit(self):        self.index = (self.index + 1 ) % len(self.parent.share_data)        url = "/getJoke?page=" + self.parent.share_data[self.index] + "&count=10&type=video" + self.parent.share_data[self.index]        print('visit url: %s' % url, self.parent.share_data)        self.client.get(url, name='getJoke分页查问')class WebsiteUser(HttpUser):    host = 'https://api.apiopen.top'    # task_set = UserBehavior    tasks = [UserBehavior]    # my_data = [1, 3, 5]    share_data = []    with open('csvData.csv', newline='') as csvfile:        # 此办法:当文件不必时会主动敞开文件        csvReader = csv.reader(csvfile)        for content in csvReader:            # print(content)            share_data.append(content[1])    # min_wait = 1000    # max_wait = 3000    wait_time = between(1, 3)

2、不循环取数据,测试数据惟一

应用队列存储数据

from locust import TaskSet, task, HttpUserimport queueclass UserBehavior(TaskSet):    @task    def test_register(self):        try:            data = self.parent.user_data_queue.get()        except queue.Empty:            print('account data run out, test ended.')            exit(0)        print('register with user: {}, pwd: {}'.format(data['username'], data['password']))        payload = { 'username': data['username'], 'password': data['password'] }        self.client.post('/register', data=payload)class WebsiteUser(HttpUser):    host = 'http://debugtalk.com'    # task_set = UserBehavior    tasks = [UserBehavior]    user_data_queue = queue.Queue()    for index in range(100):        data = { "username": "test%04d" % index,                 "password": "pwd%04d" % index,                 "email": "test%04d@debugtalk.test" % index,                 "phone": "186%08d" % index,                 }        user_data_queue.put_nowait(data)    min_wait = 1000    max_wait = 3000

3、循环取数据,测试数据惟一

from locust import TaskSet, task, HttpLocustimport queueclass UserBehavior(TaskSet):    @task    def test_register(self):        try:            data = self.parent.user_data_queue.get()        except queue.Empty:            print('account data run out, test ended.')            exit(0)        print('register with user: {}, pwd: {}'.format(data['username'], data['password']))        payload = { 'username': data['username'], 'password': data['password'] }        self.client.post('/register', data=payload)        self.parent.user_data_queue.put_nowait(data) # 将取出的数据从新写入队列class WebsiteUser(HttpLocust):    host = 'http://debugtalk.com'    # task_set = UserBehavior    tasks = [UserBehavior]    user_data_queue = queue.Queue()    for index in range(100):        data = { "username": "test%04d" % index,                 "password": "pwd%04d" % index,                 "email": "test%04d@debugtalk.test" % index,                 "phone": "186%08d" % index, }        user_data_queue.put_nowait(data)    min_wait = 1000    max_wait = 3000
集体博客 蜗牛