from Library.LocustBasic import * Account = CsvDataReader('accounts.csv') class Test(FastHttpUser): token = '' wait_time = between(0.5, 0.8) def on_start(self): account = Account.read() with self.client.request( method='POST', url='/api/user/login', data={ 'username': account[0], 'password': account[1] }, catch_response=True ) as response: pass if response.status_code == 200 and response.json()['code'] == 0: self.token = response.json()['data']['token'] else: response.failure(Exception('Login failed.')) self.stop() @task(30) def api_task(self): with self.client.request( method='GET', url='/api/task?page_size=20&page=1&id=&protocol=&name=&tag=&host_id=&status=', headers={ 'Auth-Token': self.token }, catch_response=True ) as response: if response.status_code == 200 and response.json()['code'] == 0: pass else: response.failure(Exception('%s: %.64s' % (response.status_code, response.text))) @task(30) def api_logs(self): with self.client.request( method='GET', url='/api/task/log?page_size=20&page=1&task_id=&protocol=&status=', headers={ 'Auth-Token': self.token }, catch_response=True ) as response: if response.status_code == 200 and response.json()['code'] == 0: pass else: response.failure(Exception('%s: %.64s' % (response.status_code, response.text))) @task(25) def api_host(self): with self.client.request( method='GET', url='/api/host?page_size=20&page=1&id=&name=&alias=', headers={ 'Auth-Token': self.token }, catch_response=True ) as response: if response.status_code == 200 and response.json()['code'] == 0: pass else: response.failure(Exception('%s: %.64s' % (response.status_code, response.text))) @task(15) def api_user(self): with self.client.request( method='GET', url='/api/user', headers={ 'Auth-Token': self.token }, catch_response=True ) as response: if response.status_code == 200 and response.json()['code'] == 0: pass else: response.failure(Exception('%s: %.64s' % (response.status_code, response.text))) if __name__ == '__main__': # Locust file, default self. sys.argv.append('--locustfile=%s' % __file__) # Host to load test. sys.argv.append('--host=http://fanscloud.net:5920') # Host to bind the web interface to. Defaults to '*' (all interfaces) sys.argv.append('--web-host=127.0.0.1') # Peak number of concurrent Locust users. # Primarily used together with --headless or --autostart. sys.argv.append('--users=250') # Rate to spawn users at (users per second). # Primarily used together with --headless or --autostart sys.argv.append('--spawn-rate=10') # Primarily used together with --headless or --autostart sys.argv.append('--run-time=3m') sys.exit(main())