Files
aptly/system/lib.py
Andrey Smirnov 1b2fccb615 Compatibility with GnuPG 1.x and 2.x, auto-detect GnuPG version
* aptly can sign and verify without issues with GnuPG 1.x and 2.x
* aptly auto-detects GnuPG version and adapts accordingly
* aptly automatically finds suitable GnuPG version

Majority of the work was to get unit-tests which can work with GnuPG 1.x & 2.x.
Locally I've verified that aptly supports GnuPG 1.4.x & 2.2.x. Travis CI
environment is based on trusty, so it runs gpg2 tests with GnuPG 2.0.x.

Configuration parameter gpgProvider now supports three values for GnuPG:

* gpg (same as before, default): use GnuPG 1.x if available (checks gpg, gpg1),
otherwise uses GnuPG 2.x; for aptly users who already have GnuPG 1.x
environment (as it was the only supported version) nothing should change; new
users might start with GnuPG 2.x if that's their installed version

* gpg1 looks for GnuPG 1.x only, fails otherwise

* gpg2 looks for GnuPG 2.x only, fails otherwise
2018-10-10 01:34:00 +03:00

341 lines
12 KiB
Python

"""
Test library.
"""
import difflib
import inspect
import json
import subprocess
import os
import posixpath
import shlex
import shutil
import string
import threading
import urllib
import pprint
import SocketServer
import SimpleHTTPServer
class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
pass
class FileHTTPServerRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
def translate_path(self, path):
"""Translate a /-separated PATH to the local filename syntax.
Components that mean special things to the local file system
(e.g. drive or directory names) are ignored. (XXX They should
probably be diagnosed.)
"""
# abandon query parameters
path = path.split('?', 1)[0]
path = path.split('#', 1)[0]
path = posixpath.normpath(urllib.unquote(path))
words = path.split('/')
words = filter(None, words)
path = self.rootPath
for word in words:
drive, word = os.path.splitdrive(word)
head, word = os.path.split(word)
if word in (os.curdir, os.pardir):
continue
path = os.path.join(path, word)
return path
def log_message(self, format, *args):
pass
class BaseTest(object):
"""
Base class for all tests.
"""
longTest = False
fixturePool = False
fixturePoolCopy = False
fixtureDB = False
fixtureGpg = False
fixtureWebServer = False
requiresFTP = False
expectedCode = 0
configFile = {
"rootDir": "%s/.aptly" % os.environ["HOME"],
"downloadConcurrency": 4,
"downloadSpeedLimit": 0,
"architectures": [],
"dependencyFollowSuggests": False,
"dependencyFollowRecommends": False,
"dependencyFollowAllVariants": False,
"dependencyFollowSource": False,
"gpgDisableVerify": False,
"gpgDisableSign": False,
"ppaDistributorID": "ubuntu",
"ppaCodename": "",
}
configOverride = {}
environmentOverride = {}
fixtureDBDir = os.path.join(os.environ["HOME"], "aptly-fixture-db")
fixturePoolDir = os.path.join(os.environ["HOME"], "aptly-fixture-pool")
fixtureGpgKeys = ["debian-archive-keyring.gpg",
"ubuntu-archive-keyring.gpg",
"launchpad.key",
"flat.key",
"pagerduty.key",
"nvidia.key",
"jenkins.key"]
outputMatchPrepare = None
captureResults = False
def test(self):
self.prepare()
self.run()
self.check()
def prepare_remove_all(self):
if os.path.exists(os.path.join(os.environ["HOME"], ".aptly")):
shutil.rmtree(os.path.join(os.environ["HOME"], ".aptly"))
if os.path.exists(os.path.join(os.environ["HOME"], ".aptly.conf")):
os.remove(os.path.join(os.environ["HOME"], ".aptly.conf"))
if os.path.exists(os.path.join(os.environ["HOME"], ".gnupg", "aptlytest.gpg")):
os.remove(os.path.join(os.environ["HOME"], ".gnupg", "aptlytest.gpg"))
def prepare_default_config(self):
cfg = self.configFile.copy()
cfg.update(**self.configOverride)
f = open(os.path.join(os.environ["HOME"], ".aptly.conf"), "w")
f.write(json.dumps(cfg))
f.close()
def fixture_available(self):
if self.fixturePool and not os.path.exists(self.fixturePoolDir):
return False
if self.fixtureDB and not os.path.exists(self.fixtureDBDir):
return False
if self.requiresFTP and os.environ.get('NO_FTP_ACCESS', '') == 'yes':
return False
return True
def prepare_fixture(self):
if self.fixturePool:
os.makedirs(os.path.join(os.environ["HOME"], ".aptly"), 0755)
os.symlink(self.fixturePoolDir, os.path.join(os.environ["HOME"], ".aptly", "pool"))
if self.fixturePoolCopy:
os.makedirs(os.path.join(os.environ["HOME"], ".aptly"), 0755)
shutil.copytree(self.fixturePoolDir, os.path.join(os.environ["HOME"], ".aptly", "pool"), ignore=shutil.ignore_patterns(".git"))
if self.fixtureDB:
shutil.copytree(self.fixtureDBDir, os.path.join(os.environ["HOME"], ".aptly", "db"))
if self.fixtureWebServer:
self.webServerUrl = self.start_webserver(os.path.join(os.path.dirname(inspect.getsourcefile(self.__class__)),
self.fixtureWebServer))
if self.fixtureGpg:
# try to find gpg1 as that's what aptly prefers by default to build trusted keys in DB
# in lowest supported format
gpg = "gpg1"
try:
subprocess.check_output(["gpg1", "--version"])
except Exception:
gpg = "gpg"
# TODO: fixme
self.run_cmd([gpg, "--no-default-keyring", "--trust-model", "always", "--batch", "--keyring", "aptlytest.gpg", "--import"] +
[os.path.join(os.path.dirname(inspect.getsourcefile(BaseTest)), "files", key) for key in self.fixtureGpgKeys])
if hasattr(self, "fixtureCmds"):
for cmd in self.fixtureCmds:
self.run_cmd(cmd)
def run(self):
self.output = self.output_processor(self.run_cmd(self.runCmd, self.expectedCode))
def _start_process(self, command, stderr=subprocess.STDOUT, stdout=None):
if not hasattr(command, "__iter__"):
params = {
'files': os.path.join(os.path.dirname(inspect.getsourcefile(BaseTest)), "files"),
'changes': os.path.join(os.path.dirname(inspect.getsourcefile(BaseTest)), "changes"),
'udebs': os.path.join(os.path.dirname(inspect.getsourcefile(BaseTest)), "udebs"),
'testfiles': os.path.join(os.path.dirname(inspect.getsourcefile(self.__class__)), self.__class__.__name__),
'aptlyroot': os.path.join(os.environ["HOME"], ".aptly"),
}
if self.fixtureWebServer:
params['url'] = self.webServerUrl
command = string.Template(command).substitute(params)
command = shlex.split(command)
environ = os.environ.copy()
environ["LC_ALL"] = "C"
environ.update(self.environmentOverride)
return subprocess.Popen(command, stderr=stderr, stdout=stdout, env=environ)
def run_cmd(self, command, expected_code=0):
try:
proc = self._start_process(command, stdout=subprocess.PIPE)
output, _ = proc.communicate()
if proc.returncode != expected_code:
raise Exception("exit code %d != %d (output: %s)" % (proc.returncode, expected_code, output))
return output
except Exception, e:
raise Exception("Running command %s failed: %s" % (command, str(e)))
def gold_processor(self, gold):
return gold
def output_processor(self, output):
return output
def expand_environ(self, gold):
return string.Template(gold).substitute(os.environ)
def get_gold_filename(self, gold_name="gold"):
return os.path.join(os.path.dirname(inspect.getsourcefile(self.__class__)), self.__class__.__name__ + "_" + gold_name)
def get_gold(self, gold_name="gold"):
return self.gold_processor(open(self.get_gold_filename(gold_name), "r").read())
def check_output(self):
try:
self.verify_match(self.get_gold(), self.output, match_prepare=self.outputMatchPrepare)
except: # noqa: E722
if self.captureResults:
if self.outputMatchPrepare is not None:
self.output = self.outputMatchPrepare(self.output)
with open(self.get_gold_filename(), "w") as f:
f.write(self.output)
else:
raise
def check_cmd_output(self, command, gold_name, match_prepare=None, expected_code=0):
output = self.run_cmd(command, expected_code=expected_code)
try:
self.verify_match(self.get_gold(gold_name), output, match_prepare)
except: # noqa: E722
if self.captureResults:
if match_prepare is not None:
output = match_prepare(output)
with open(self.get_gold_filename(gold_name), "w") as f:
f.write(output)
else:
raise
def read_file(self, path):
with open(os.path.join(os.environ["HOME"], ".aptly", path), "r") as f:
return f.read()
def delete_file(self, path):
os.unlink(os.path.join(os.environ["HOME"], ".aptly", path))
def check_file_contents(self, path, gold_name, match_prepare=None):
contents = self.read_file(path)
try:
self.verify_match(self.get_gold(gold_name), contents, match_prepare=match_prepare)
except: # noqa: E722
if self.captureResults:
if match_prepare is not None:
contents = match_prepare(contents)
with open(self.get_gold_filename(gold_name), "w") as f:
f.write(contents)
else:
raise
def check_file(self):
contents = open(self.checkedFile, "r").read()
try:
self.verify_match(self.get_gold(), contents)
except: # noqa: E722
if self.captureResults:
with open(self.get_gold_filename(), "w") as f:
f.write(contents)
else:
raise
def check_exists(self, path):
if not os.path.exists(os.path.join(os.environ["HOME"], ".aptly", path)):
raise Exception("path %s doesn't exist" % (path, ))
def check_not_exists(self, path):
if os.path.exists(os.path.join(os.environ["HOME"], ".aptly", path)):
raise Exception("path %s exists" % (path, ))
def check_file_not_empty(self, path):
if os.stat(os.path.join(os.environ["HOME"], ".aptly", path))[6] == 0:
raise Exception("file %s is empty" % (path, ))
def check_equal(self, a, b):
if a != b:
self.verify_match(a, b, match_prepare=pprint.pformat)
def check_ge(self, a, b):
if not a >= b:
raise Exception("%s is not greater or equal to %s" % (a, b))
def check_gt(self, a, b):
if not a > b:
raise Exception("%s is not greater to %s" % (a, b))
def check_in(self, item, l):
if item not in l:
raise Exception("item %r not in %r", item, l)
def check_subset(self, a, b):
diff = ''
for k, v in a.items():
if k not in b:
diff += "unexpected key '%s'\n" % (k,)
elif b[k] != v:
diff += "wrong value '%s' for key '%s', expected '%s'\n" % (v, k, b[k])
if diff:
raise Exception("content doesn't match:\n" + diff)
def verify_match(self, a, b, match_prepare=None):
if match_prepare is not None:
a = match_prepare(a)
b = match_prepare(b)
if a != b:
diff = "".join(difflib.unified_diff([l + "\n" for l in a.split("\n")], [l + "\n" for l in b.split("\n")]))
raise Exception("content doesn't match:\n" + diff + "\n")
check = check_output
def prepare(self):
self.prepare_remove_all()
self.prepare_default_config()
self.prepare_fixture()
def start_webserver(self, directory):
FileHTTPServerRequestHandler.rootPath = directory
self.webserver = ThreadedTCPServer(("localhost", 0), FileHTTPServerRequestHandler)
server_thread = threading.Thread(target=self.webserver.serve_forever)
server_thread.daemon = True
server_thread.start()
return "http://%s:%d/" % self.webserver.server_address
def shutdown(self):
if hasattr(self, 'webserver'):
self.shutdown_webserver()
def shutdown_webserver(self):
self.webserver.shutdown()
@classmethod
def shutdown_class(cls):
pass