1、循环取数据,数据可重复使用
csvData.csv
1,1
2,2
3,3
4,4
5,5
6,6
7,7
8,8
9,9
locust2.py
import csv

from locust import HttpUser, TaskSet, between, constant, tag, task


class UserBehavior(TaskSet):
    """Request the paged joke endpoint, cycling through the shared CSV values.

    The values live on the parent user class (``share_data``) and may be
    reused any number of times; each simulated user walks the list with its
    own cursor and wraps around at the end.
    """

    def on_start(self):
        # Per-user cursor into the shared data list.
        self.index = 0

    # @tag('getJoke分页查问')
    @task
    def test_visit(self):
        """Issue one GET using the current value, then advance the cursor."""
        values = self.parent.share_data
        # Read first, advance afterwards, so the first request uses the
        # first value instead of skipping it until the cursor wraps around.
        value = values[self.index]
        self.index = (self.index + 1) % len(values)
        url = "/getJoke?page=" + value + "&count=10&type=video" + value
        print('visit url: %s' % url, self.parent.share_data)
        # A fixed request name groups every page under one statistics entry.
        self.client.get(url, name='getJoke分页查问')


class WebsiteUser(HttpUser):
    """Simulated user; loads the second CSV column once at class creation."""

    host = 'https://api.apiopen.top'
    # task_set = UserBehavior
    tasks = [UserBehavior]
    # my_data = [1, 3, 5]
    # The with-statement closes the file automatically once reading is done.
    with open('csvData.csv', newline='') as csvfile:
        # Rows without a second column (e.g. blank lines) are skipped
        # rather than raising IndexError.
        share_data = [row[1] for row in csv.reader(csvfile) if len(row) > 1]
    # min_wait = 1000
    # max_wait = 3000
    wait_time = between(1, 3)
2、不循环取数据,测试数据唯一
使用队列存储数据
import queue

from locust import HttpUser, TaskSet, between, task


class UserBehavior(TaskSet):
    """Register accounts, consuming each queued record exactly once."""

    @task
    def test_register(self):
        """Take one record from the shared queue and POST it to /register.

        When the queue is exhausted the runner is asked to quit, ending
        the test.
        """
        try:
            # get_nowait() raises queue.Empty on an empty queue; a plain
            # get() would block forever and the handler below would never run.
            data = self.parent.user_data_queue.get_nowait()
        except queue.Empty:
            print('account data run out, test ended.')
            self.user.environment.runner.quit()
            return
        print('register with user: {}, pwd: {}'.format(data['username'], data['password']))
        payload = {
            'username': data['username'],
            'password': data['password']
        }
        self.client.post('/register', data=payload)


class WebsiteUser(HttpUser):
    """Simulated user owning a queue of 100 one-shot account records."""

    host = 'http://debugtalk.com'
    # task_set = UserBehavior
    tasks = [UserBehavior]
    # Filled once at class creation and shared by every simulated user.
    user_data_queue = queue.Queue()
    for index in range(100):
        data = {
            "username": "test%04d" % index,
            "password": "pwd%04d" % index,
            "email": "test%04d@debugtalk.test" % index,
            "phone": "186%08d" % index,
        }
        user_data_queue.put_nowait(data)
    # Wait 1-3 seconds between tasks (replaces min_wait/max_wait = 1000/3000 ms).
    wait_time = between(1, 3)
3、循环取数据,测试数据唯一
import queue

from locust import HttpUser, TaskSet, between, task


class UserBehavior(TaskSet):
    """Register accounts with records that are unique while in use.

    A record is taken from the shared queue for the duration of one request
    and then put back, so the data cycles but no two users hold the same
    record at the same time.
    """

    @task
    def test_register(self):
        """Borrow one record, POST it to /register, then return it."""
        # A blocking get() waits for a record instead of raising
        # queue.Empty, so no "data run out" branch is needed here.
        data = self.parent.user_data_queue.get()
        try:
            print('register with user: {}, pwd: {}'.format(data['username'], data['password']))
            payload = {
                'username': data['username'],
                'password': data['password']
            }
            self.client.post('/register', data=payload)
        finally:
            # Put the record back even if the request raised, so it is not lost.
            self.parent.user_data_queue.put_nowait(data)


class WebsiteUser(HttpUser):
    """Simulated user owning a queue of 100 reusable account records."""

    host = 'http://debugtalk.com'
    # task_set = UserBehavior
    tasks = [UserBehavior]
    # Filled once at class creation and shared by every simulated user.
    user_data_queue = queue.Queue()
    for index in range(100):
        data = {
            "username": "test%04d" % index,
            "password": "pwd%04d" % index,
            "email": "test%04d@debugtalk.test" % index,
            "phone": "186%08d" % index,
        }
        user_data_queue.put_nowait(data)
    # Wait 1-3 seconds between tasks (replaces min_wait/max_wait = 1000/3000 ms).
    wait_time = between(1, 3)
个人博客 蜗牛