mirror of
https://github.com/aptly-dev/aptly.git
synced 2026-05-06 22:18:28 +00:00
support systemd activation for api serve
systemd has a feature called socket activation where initially systemd manages and listens on ports/uds and only invokes a service when traffic appears. to then hand over the involved sockets, systemd will pass the relevant FDs into the invoked process and defines them in the environment. use coreos/go-systemd to grab the active listeners passed by systemd and use them to serve the api routes. only one listener may be specified right now as we also only support one -listen argument for the binary. this allows admins to craft a systemd socket and service file for aptly where systemd manages the socket, its permission and its live time, and lazy start aptly when needed.
This commit is contained in:
@@ -10,3 +10,4 @@ from .graph import *
|
||||
from .snapshots import *
|
||||
from .packages import *
|
||||
from .unix_socket import *
|
||||
from .systemd_handover import *
|
||||
|
||||
@@ -0,0 +1,45 @@
|
||||
import requests_unixsocket
|
||||
import time
|
||||
import urllib
|
||||
import os.path
|
||||
|
||||
from lib import BaseTest
|
||||
|
||||
class SystemdAPIHandoverTest(BaseTest):
|
||||
aptly_server = None
|
||||
socket_path = "/tmp/_aptly_systemdapihandovertest.sock"
|
||||
|
||||
def prepare(self):
|
||||
# On Debian they use /lib on other systems /usr/lib.
|
||||
systemd_activate = "/usr/lib/systemd/systemd-activate"
|
||||
if not os.path.exists(systemd_activate):
|
||||
systemd_activate = "/lib/systemd/systemd-activate"
|
||||
if not os.path.exists(systemd_activate):
|
||||
print("Could not find systemd-activate")
|
||||
return
|
||||
self.aptly_server = self._start_process("%s -l %s aptly api serve -no-lock" %
|
||||
(systemd_activate, self.socket_path),)
|
||||
super(SystemdAPIHandoverTest, self).prepare()
|
||||
|
||||
def shutdown(self):
|
||||
if self.aptly_server is not None:
|
||||
self.aptly_server.terminate()
|
||||
self.aptly_server.wait()
|
||||
self.aptly_server = None
|
||||
if os.path.exists(self.socket_path):
|
||||
os.remove(self.socket_path)
|
||||
super(SystemdAPIHandoverTest, self).shutdown()
|
||||
|
||||
def run(self):
|
||||
pass
|
||||
|
||||
"""
|
||||
Verify we can listen on a unix domain socket.
|
||||
"""
|
||||
def check(self):
|
||||
if self.aptly_server == None:
|
||||
print("Skipping test as we failed to setup a listener.")
|
||||
return
|
||||
session = requests_unixsocket.Session()
|
||||
r = session.get('http+unix://%s/api/version' % urllib.quote(self.socket_path, safe=''))
|
||||
self.check_equal(r.json(), {'Version': '0.9.8~dev'})
|
||||
Reference in New Issue
Block a user