mirror of
https://github.com/aptly-dev/aptly.git
synced 2026-04-20 19:38:39 +00:00
@@ -13,17 +13,27 @@ import shlex
|
||||
import shutil
|
||||
import string
|
||||
import threading
|
||||
import urllib
|
||||
import urllib.error
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
import pprint
|
||||
import SocketServer
|
||||
import SimpleHTTPServer
|
||||
import socketserver
|
||||
import http.server
|
||||
import zlib
|
||||
|
||||
|
||||
class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
|
||||
def ungzip_if_required(output):
|
||||
if isinstance(output, bytes) and output.startswith(b"\x1f\x8b"):
|
||||
return zlib.decompress(output, 16 + zlib.MAX_WBITS).decode('utf-8')
|
||||
|
||||
return output
|
||||
|
||||
|
||||
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
|
||||
pass
|
||||
|
||||
|
||||
class FileHTTPServerRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
|
||||
class FileHTTPServerRequestHandler(http.server.SimpleHTTPRequestHandler):
|
||||
def translate_path(self, path):
|
||||
"""Translate a /-separated PATH to the local filename syntax.
|
||||
|
||||
@@ -35,9 +45,9 @@ class FileHTTPServerRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
|
||||
# abandon query parameters
|
||||
path = path.split('?', 1)[0]
|
||||
path = path.split('#', 1)[0]
|
||||
path = posixpath.normpath(urllib.unquote(path))
|
||||
path = posixpath.normpath(urllib.parse.unquote(path))
|
||||
words = path.split('/')
|
||||
words = filter(None, words)
|
||||
words = [_f for _f in words if _f]
|
||||
path = self.rootPath
|
||||
for word in words:
|
||||
_, word = os.path.splitdrive(word)
|
||||
@@ -70,7 +80,7 @@ class GPGFinder(object):
|
||||
def find_gpg(self, executables, expected_version):
|
||||
for executable in executables:
|
||||
try:
|
||||
output = subprocess.check_output([executable, "--version"])
|
||||
output = subprocess.check_output([executable, "--version"], text=True)
|
||||
if expected_version in output:
|
||||
return executable
|
||||
except Exception:
|
||||
@@ -90,7 +100,7 @@ class DotFinder(object):
|
||||
def find_dot(self, executables):
|
||||
for executable in executables:
|
||||
try:
|
||||
subprocess.check_output([executable, "-V"])
|
||||
subprocess.check_output([executable, "-V"], text=True)
|
||||
return executable
|
||||
except Exception:
|
||||
pass
|
||||
@@ -114,6 +124,7 @@ class BaseTest(object):
|
||||
requiresGPG1 = False
|
||||
requiresGPG2 = False
|
||||
requiresDot = False
|
||||
sortOutput = False
|
||||
|
||||
expectedCode = 0
|
||||
configFile = {
|
||||
@@ -198,12 +209,12 @@ class BaseTest(object):
|
||||
|
||||
def prepare_fixture(self):
|
||||
if self.fixturePool:
|
||||
os.makedirs(os.path.join(os.environ["HOME"], ".aptly"), 0755)
|
||||
os.makedirs(os.path.join(os.environ["HOME"], ".aptly"), 0o755)
|
||||
os.symlink(self.fixturePoolDir, os.path.join(
|
||||
os.environ["HOME"], ".aptly", "pool"))
|
||||
|
||||
if self.fixturePoolCopy:
|
||||
os.makedirs(os.path.join(os.environ["HOME"], ".aptly"), 0755)
|
||||
os.makedirs(os.path.join(os.environ["HOME"], ".aptly"), 0o755)
|
||||
shutil.copytree(self.fixturePoolDir, os.path.join(
|
||||
os.environ["HOME"], ".aptly", "pool"), ignore=shutil.ignore_patterns(".git"))
|
||||
|
||||
@@ -228,12 +239,17 @@ class BaseTest(object):
|
||||
for cmd in self.fixtureCmds:
|
||||
self.run_cmd(cmd)
|
||||
|
||||
def sort_lines(self, output):
|
||||
return "\n".join(sorted(self.ensure_utf8(output).split("\n")))
|
||||
|
||||
def run(self):
|
||||
self.output = self.output_processor(
|
||||
self.run_cmd(self.runCmd, self.expectedCode))
|
||||
output = self.run_cmd(self.runCmd, self.expectedCode)
|
||||
if self.sortOutput:
|
||||
output = self.sort_lines(output)
|
||||
self.output = self.output_processor(output)
|
||||
|
||||
def _start_process(self, command, stderr=subprocess.STDOUT, stdout=None):
|
||||
if not hasattr(command, "__iter__"):
|
||||
if isinstance(command, str):
|
||||
params = {
|
||||
'files': os.path.join(os.path.dirname(inspect.getsourcefile(BaseTest)), "files"),
|
||||
'changes': os.path.join(os.path.dirname(inspect.getsourcefile(BaseTest)), "changes"),
|
||||
@@ -245,8 +261,8 @@ class BaseTest(object):
|
||||
params['url'] = self.webServerUrl
|
||||
|
||||
command = string.Template(command).substitute(params)
|
||||
|
||||
command = shlex.split(command)
|
||||
|
||||
environ = os.environ.copy()
|
||||
environ["LC_ALL"] = "C"
|
||||
environ.update(self.environmentOverride)
|
||||
@@ -261,7 +277,7 @@ class BaseTest(object):
|
||||
raise Exception("exit code %d != %d (output: %s)" % (
|
||||
proc.returncode, expected_code, output))
|
||||
return output
|
||||
except Exception, e:
|
||||
except Exception as e:
|
||||
raise Exception("Running command %s failed: %s" %
|
||||
(command, str(e)))
|
||||
|
||||
@@ -315,19 +331,20 @@ class BaseTest(object):
|
||||
else:
|
||||
raise
|
||||
|
||||
def read_file(self, path):
|
||||
with open(os.path.join(os.environ["HOME"], ".aptly", path), "r") as f:
|
||||
def read_file(self, path, mode=''):
|
||||
with open(os.path.join(os.environ["HOME"], ".aptly", path), "r" + mode) as f:
|
||||
return f.read()
|
||||
|
||||
def delete_file(self, path):
|
||||
os.unlink(os.path.join(os.environ["HOME"], ".aptly", path))
|
||||
|
||||
def check_file_contents(self, path, gold_name, match_prepare=None):
|
||||
contents = self.read_file(path)
|
||||
def check_file_contents(self, path, gold_name, match_prepare=None, mode='', ensure_utf8=True):
|
||||
contents = self.read_file(path, mode=mode)
|
||||
try:
|
||||
|
||||
self.verify_match(self.get_gold(gold_name),
|
||||
contents, match_prepare=match_prepare)
|
||||
contents, match_prepare=match_prepare,
|
||||
ensure_utf8=ensure_utf8)
|
||||
except: # noqa: E722
|
||||
if self.captureResults:
|
||||
if match_prepare is not None:
|
||||
@@ -376,9 +393,13 @@ class BaseTest(object):
|
||||
if item not in l:
|
||||
raise Exception("item %r not in %r", item, l)
|
||||
|
||||
def check_not_in(self, item, l):
|
||||
if item in l:
|
||||
raise Exception("item %r in %r", item, l)
|
||||
|
||||
def check_subset(self, a, b):
|
||||
diff = ''
|
||||
for k, v in a.items():
|
||||
for k, v in list(a.items()):
|
||||
if k not in b:
|
||||
diff += "unexpected key '%s'\n" % (k,)
|
||||
elif b[k] != v:
|
||||
@@ -387,7 +408,16 @@ class BaseTest(object):
|
||||
if diff:
|
||||
raise Exception("content doesn't match:\n" + diff)
|
||||
|
||||
def verify_match(self, a, b, match_prepare=None):
|
||||
def ensure_utf8(self, a):
|
||||
if isinstance(a, bytes):
|
||||
return a.decode('utf-8')
|
||||
return a
|
||||
|
||||
def verify_match(self, a, b, match_prepare=None, ensure_utf8=True):
|
||||
if ensure_utf8:
|
||||
a = self.ensure_utf8(a)
|
||||
b = self.ensure_utf8(b)
|
||||
|
||||
if match_prepare is not None:
|
||||
a = match_prepare(a)
|
||||
b = match_prepare(b)
|
||||
|
||||
Reference in New Issue
Block a user