Library for API system tests. #116

This commit is contained in:
Andrey Smirnov
2014-10-10 17:43:04 +04:00
parent 53f7fef4cf
commit 523d0d0945
3 changed files with 99 additions and 21 deletions

52
system/api_lib.py Normal file
View File

@@ -0,0 +1,52 @@
from lib import BaseTest
import time
import json
import random
import string
try:
import requests
except ImportError:
requests = None
class APITest(BaseTest):
"""
BaseTest + testing aptly API
"""
aptly_server = None
base_url = "127.0.0.1:8765"
def fixture_available(self):
return super(APITest, self).fixture_available() and requests is not None
def prepare(self):
if APITest.aptly_server is None:
super(APITest, self).prepare()
APITest.aptly_server = self._start_process("aptly api serve -listen=%s" % (self.base_url),)
time.sleep(1)
def run(self):
pass
def get(self, uri, *args, **kwargs):
return requests.get("http://%s%s" % (self.base_url, uri), *args, **kwargs)
def post(self, uri, *args, **kwargs):
if "json" in kwargs:
kwargs["data"] = json.dumps(kwargs.pop("json"))
if not "headers" in kwargs:
kwargs["headers"] = {}
kwargs["headers"]["Content-Type"] = "application/json"
return requests.post("http://%s%s" % (self.base_url, uri), *args, **kwargs)
@classmethod
def shutdown_class(cls):
if cls.aptly_server is not None:
cls.aptly_server.terminate()
cls.aptly_server.wait()
cls.aptly_server = None
def random_name(self):
return ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(15))

View File

@@ -14,6 +14,7 @@ import string
import threading
import urllib
#import time
import pprint
import SocketServer
import SimpleHTTPServer
@@ -149,25 +150,28 @@ class BaseTest(object):
def run(self):
self.output = self.output_processor(self.run_cmd(self.runCmd, self.expectedCode))
def _start_process(self, command, stderr=subprocess.STDOUT, stdout=None):
if not hasattr(command, "__iter__"):
params = {
'files': os.path.join(os.path.dirname(inspect.getsourcefile(BaseTest)), "files"),
'udebs': os.path.join(os.path.dirname(inspect.getsourcefile(BaseTest)), "udebs"),
'testfiles': os.path.join(os.path.dirname(inspect.getsourcefile(self.__class__)), self.__class__.__name__),
}
if self.fixtureWebServer:
params['url'] = self.webServerUrl
command = string.Template(command).substitute(params)
command = shlex.split(command)
environ = os.environ.copy()
environ["LC_ALL"] = "C"
environ.update(self.environmentOverride)
return subprocess.Popen(command, stderr=stderr, stdout=stdout, env=environ)
def run_cmd(self, command, expected_code=0):
try:
#start = time.time()
if not hasattr(command, "__iter__"):
params = {
'files': os.path.join(os.path.dirname(inspect.getsourcefile(BaseTest)), "files"),
'udebs': os.path.join(os.path.dirname(inspect.getsourcefile(BaseTest)), "udebs"),
'testfiles': os.path.join(os.path.dirname(inspect.getsourcefile(self.__class__)), self.__class__.__name__),
}
if self.fixtureWebServer:
params['url'] = self.webServerUrl
command = string.Template(command).substitute(params)
command = shlex.split(command)
environ = os.environ.copy()
environ["LC_ALL"] = "C"
environ.update(self.environmentOverride)
proc = subprocess.Popen(command, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, env=environ)
proc = self._start_process(command, stdout=subprocess.PIPE)
output, _ = proc.communicate()
#print "CMD %s: %.2f" % (" ".join(command), time.time()-start)
if proc.returncode != expected_code:
@@ -254,6 +258,10 @@ class BaseTest(object):
if os.path.exists(os.path.join(os.environ["HOME"], ".aptly", path)):
raise Exception("path %s exists" % (path, ))
def check_equal(self, a, b):
if a != b:
self.verify_match(a, b, match_prepare=pprint.pformat)
def verify_match(self, a, b, match_prepare=None):
if match_prepare is not None:
a = match_prepare(a)
@@ -287,3 +295,7 @@ class BaseTest(object):
def shutdown_webserver(self):
self.webserver.shutdown()
@classmethod
def shutdown_class(cls):
pass

View File

@@ -6,9 +6,11 @@ import os
import inspect
import fnmatch
import sys
import traceback
from lib import BaseTest
from s3_lib import S3Test
from api_lib import APITest
try:
from termcolor import colored
@@ -25,6 +27,7 @@ def run(include_long_tests=False, capture_results=False, tests=None, filters=Non
tests = glob.glob("t*_*")
fails = []
numTests = numFailed = numSkipped = 0
lastBase = None
for test in tests:
@@ -34,9 +37,15 @@ def run(include_long_tests=False, capture_results=False, tests=None, filters=Non
o = getattr(testModule, name)
if not (inspect.isclass(o) and issubclass(o, BaseTest) and o is not BaseTest and
o is not S3Test):
o is not S3Test and o is not APITest):
continue
newBase = o.__bases__[0]
if lastBase is not None and lastBase is not newBase:
lastBase.shutdown_class()
lastBase = newBase
if filters:
matches = False
@@ -60,23 +69,28 @@ def run(include_long_tests=False, capture_results=False, tests=None, filters=Non
try:
t.captureResults = capture_results
t.test()
except BaseException, e:
except BaseException:
numFailed += 1
fails.append((test, t, e, testModule))
typ, val, tb = sys.exc_info()
fails.append((test, t, typ, val, tb, testModule))
sys.stdout.write(colored("FAIL\n", color="red"))
else:
sys.stdout.write(colored("OK\n", color="green"))
t.shutdown()
if lastBase is not None:
lastBase.shutdown_class()
print "TESTS: %d SUCCESS: %d FAIL: %d SKIP: %d" % (numTests, numTests - numFailed, numFailed, numSkipped)
if len(fails) > 0:
print "\nFAILURES (%d):" % (len(fails), )
for (test, t, e, testModule) in fails:
for (test, t, typ, val, tb, testModule) in fails:
print "%s:%s %s" % (test, t.__class__.__name__, testModule.__doc__.strip() + ": " + t.__doc__.strip())
print "ERROR: %s" % (e, )
#print "ERROR: %s" % (val, )
traceback.print_exception(typ, val, tb)
print "=" * 60
sys.exit(1)