#!/usr/bin/env python3
"""
Watch git repositories and trigger arbitrary pipelines such as docker build and run commands.
Light on runtime resources and configuration overhead.
"""
import argparse
import asyncio
import configparser
import gc
import json
import logging
import os
import re
import signal
import sys
from collections import defaultdict, UserDict
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, asdict
from enum import IntEnum
from locale import setlocale, LC_ALL
from logging.handlers import QueueHandler, QueueListener
from pathlib import Path
from queue import Queue
from string import Template
from typing import AsyncIterator, Iterator, Callable, TypeVar, Generic, Union, Optional, List, Tuple, Set, Dict, Any, \
DefaultDict
from urllib.parse import quote as urlquote
# region Config
####
@dataclass(frozen=True)
class GitConfig:
"""Per-project git configuration."""
remote: str
shallow: bool = False
@dataclass(frozen=True)
class DockerConfig:
"""Per-project docker settings."""
build: bool
run: bool
dockerfile: Path
pull: bool
cache: bool
docker_in_docker: bool
read_only: bool
tmpfs: int # percent
tag: Optional[Template]
label: Optional[Template]
@dataclass(frozen=True)
class RsyncConfig:
"""Per-project rsync settings."""
destination: Optional[str] # path or remote server if enabled
delete: bool
checksum: bool
preserve: bool # might need to run as root
include: Optional[Path]
@dataclass(frozen=True)
class ProjectConfig:
"""Project configuration."""
name: str
enabled: bool
remote: GitConfig
branches: Optional[re.Pattern[str]]
tags: Optional[re.Pattern[str]]
env: Dict[str, str]
docker: DockerConfig
rsync: RsyncConfig
timeout: Optional[float]
prepare_callback: Optional[Path]
success_callback: Optional[Path]
error_callback: Optional[Path]
def as_env(self) -> Dict[str, str]:
return {**{"BUILD_PROJECT": self.name, "BUILD_REMOTE": self.remote.remote}, **self.env}
@dataclass(frozen=True)
class PipelineConfig:
"""Static configuration from commandline."""
workdir: Path
interval: int
limit: int
class Config:
"""Parse ini file as list of project configurations."""
class Parser(configparser.ConfigParser):
T = TypeVar('T')
V = TypeVar('V')
def get_as(self, section: str, option: str, fallback: T, conv: Callable[[str], V]) -> Union[V, T]:
val: Union[str, Config.Parser.T] = self.get(section, option, fallback=fallback)
return conv(val) if isinstance(val, str) else val
@classmethod
    def _parse_env(cls, assignments: str) -> Iterator[Tuple[str, str]]:
        for raw in assignments.splitlines(keepends=False):
            line: str = raw.strip()
            if "=" in line and not line.startswith(("#", ";")):
                var, val = line.split("=", maxsplit=1)
                yield var.strip(), val.strip()
@classmethod
def _parse_env_file(cls, filename: Optional[Path]) -> Iterator[Tuple[str, str]]:
if filename is not None:
yield from cls._parse_env(filename.read_text(encoding="utf-8", errors="strict"))
@classmethod
def _parse(cls, parser: Parser) -> Iterator[ProjectConfig]:
sections: Set[str] = set()
for section in parser.sections():
if re.fullmatch(r"[a-zA-Z0-9][a-zA-Z0-9_.-]*", section) is None:
raise ValueError(f"Invalid project section name '{section}'")
elif section in sections:
raise ValueError(f"Duplicate project section '{section}'")
else:
sections.add(section)
try:
yield ProjectConfig(
name=section,
enabled=parser.getboolean(section, "enabled", fallback=True),
remote=GitConfig(
remote=parser.get(section, "remote"),
shallow=parser.getboolean(section, "shallow", fallback=True)
),
branches=parser.get_as(section, "branches", fallback=None, conv=re.compile),
tags=parser.get_as(section, "tags", fallback=None, conv=re.compile),
env={**dict(cls._parse_env_file(parser.get_as(section, "env_file", fallback=None, conv=Path))),
**dict(cls._parse_env(parser.get(section, "env", fallback="")))},
docker=DockerConfig(
build=parser.getboolean(section, "build", fallback=False),
run=parser.getboolean(section, "run",
fallback=parser.getboolean(section, "build", fallback=False)),
dockerfile=parser.get_as(section, "dockerfile", fallback=Path("Dockerfile"), conv=Path),
pull=parser.getboolean(section, "pull", fallback=True),
cache=parser.getboolean(section, "cache", fallback=True),
docker_in_docker=parser.getboolean(section, "docker_in_docker", fallback=False),
read_only=parser.getboolean(section, "read_only", fallback=True),
tmpfs=max(0, min(50, parser.getint(section, "tmpfs", fallback=50))),
tag=parser.get_as(section, "tag", fallback=None, conv=Template),
label=parser.get_as(section, "label", fallback=None, conv=Template),
),
rsync=RsyncConfig(
destination=parser.get(section, "rsync.destination", fallback=None),
delete=parser.getboolean(section, "rsync.delete", fallback=False),
checksum=parser.getboolean(section, "rsync.checksum", fallback=False),
preserve=parser.getboolean(section, "rsync.preserve", fallback=False),
include=parser.get_as(section, "rsync.include", fallback=None, conv=Path),
),
timeout=parser.getfloat(section, "timeout", fallback=None),
prepare_callback=parser.get_as(section, "prepare_callback", fallback=None, conv=Path),
success_callback=parser.get_as(section, "success_callback", fallback=None, conv=Path),
error_callback=parser.get_as(section, "error_callback", fallback=None, conv=Path),
)
except (ValueError, re.error, configparser.Error, UnicodeError, OSError) as e:
raise ValueError(f"Parsing error in '{section}': {str(e)}") from None
@classmethod
def from_ini(cls, filename: Path) -> Iterator[ProjectConfig]:
try:
parser: Config.Parser = Config.Parser()
with filename.open("r") as fp:
parser.read_file(fp)
except (OSError, configparser.Error, UnicodeError, ValueError) as e:
raise RuntimeError(f"Cannot read config '{filename}': {str(e)}") from None
try:
yield from cls._parse(parser)
except ValueError as e:
raise RuntimeError(f"Cannot parse config '{filename}': {str(e)}") from None
# endregion
# region Logging
####
class QueuedLogger:
"""Move logging i/o into a separate thread."""
def __init__(self, logger: logging.Logger, handlers: List[logging.Handler]) -> None:
self._logger = logger
self._queue: Queue[Optional[logging.LogRecord]] = Queue()
self._handler: logging.Handler = QueueHandler(self._queue)
self._listener: QueueListener = QueueListener(self._queue, *handlers)
self._waiter: ThreadPoolExecutor = ThreadPoolExecutor(max_workers=1, thread_name_prefix=self.__class__.__name__)
async def flush(self) -> None:
await asyncio.wrap_future(self._waiter.submit(self._queue.join))
async def __aenter__(self) -> logging.Logger:
self._listener.start()
self._logger.addHandler(self._handler)
return self._logger
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
self._logger.removeHandler(self._handler)
self._handler.close()
await asyncio.wrap_future(self._waiter.submit(self._listener.stop))
for h in self._listener.handlers:
await asyncio.wrap_future(self._waiter.submit(h.close))
self._listener.handlers = tuple()
self._waiter.shutdown(wait=True)
class RootLogger(QueuedLogger):
"""Move queue between root logger and its handlers."""
def __init__(self) -> None:
logger: logging.Logger = logging.getLogger()
handlers, logger.handlers = logger.handlers, []
super().__init__(logger, handlers)
class BuildLogger(QueuedLogger):
"""Standalone queued logger to file."""
def __init__(self, name: str, filename: Path) -> None:
try:
            handler: logging.FileHandler = logging.FileHandler(filename.as_posix(), delay=False, encoding="utf-8")
handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
except OSError as e:
raise OSError(f"Cannot create build logger for '{filename}': {str(e)}") from None
logger: logging.Logger = logging.Logger(f"build.{name}", level=logging.DEBUG)
logger.propagate = False
super().__init__(logger, [handler])
# endregion
# region Context
####
@dataclass(frozen=True)
class GitRef:
"""Project remote specific version to build."""
ref: str
commit: str
def __post_init__(self) -> None:
if not self.ref.startswith(("refs/heads/", "refs/tags/")):
raise ValueError(self.ref)
        if re.fullmatch(r"[0-9a-f]{40}", self.commit) is None:  # full lowercase sha-1 as printed by git
            raise ValueError(self.commit)
@property
def short_commit(self) -> str:
return self.commit[:8] # NB: 7 is also common
@property
def branch(self) -> Optional[str]:
return self.ref[11:] if self.ref.startswith("refs/heads/") else None
@property
def tag(self) -> Optional[str]:
return self.ref[10:] if self.ref.startswith("refs/tags/") else None
@property
def branch_or_tag(self) -> str:
return self.branch or self.tag or self.ref
def as_dict(self) -> Dict[str, str]:
return {"ref": self.ref, "commit": self.commit,
"short_commit": self.short_commit, "branch": self.branch or "", "tag": self.tag or ""}
def as_env(self) -> Dict[str, str]:
return {f"BUILD_{k.upper()}": v for k, v in self.as_dict().items()}
def __str__(self) -> str:
return f"<{self.ref}:{self.short_commit}>"
class BuildContext:
"""Runnable instance of project, git ref, and path configuration."""
def __init__(self, project: ProjectConfig, ref: GitRef, path: Path, target_path: Path) -> None:
self._logger: logging.Logger = logging.getLogger(self.__class__.__name__).getChild(project.name)
self._project: ProjectConfig = project
self._ref: GitRef = ref
self._path: Path = path
self._target_path: Path = target_path
@property
def project(self) -> ProjectConfig:
return self._project
@property
def ref(self) -> GitRef:
return self._ref
@classmethod
def quote_path(cls, p: str) -> str:
return urlquote(p, safe="")
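    # e.g. quote_path("refs/heads/feature/x") == "refs%2Fheads%2Ffeature%2Fx"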
@classmethod
def _ensure_dir(cls, p: Path) -> Path:
try:
p.mkdir(parents=False, exist_ok=True)
except OSError as e:
raise OSError(f"Cannot create '{p}': {str(e)}") from None
else:
return p
@property
def root_dir(self) -> Path:
return self._path
@property
def build_dir(self) -> Path:
return self._ensure_dir(self._path / "build")
@property
def drop_dir(self) -> Path:
return self._ensure_dir(self._path / "drop")
@property
def logger(self) -> logging.Logger:
return self._logger
def get_build_logger(self) -> BuildLogger:
"""Open build logfile. Raises OSError."""
return BuildLogger(self._project.name, self._path / "build.log")
def finalize(self) -> None:
"""Mark build as successfully completed (by renaming to without the .tmp suffix)."""
try:
self._path.rename(self._target_path)
except OSError as e:
raise OSError(f"Cannot rename build directory to {self._target_path}: {str(e)}") from None
else:
self._logger.info(f"{self._path} -> {self._target_path}")
def __str__(self) -> str:
return f"<{self._path.as_posix()}>"
class ProjectContext:
"""Project that is watched for git updates, possibly creating a build context to run."""
def __init__(self, project: ProjectConfig, root: Path) -> None:
self._path: Path = root / BuildContext.quote_path(project.name)
self._project: ProjectConfig = project
self._ref_cache: DefaultDict[str, Dict[str, bool]] = defaultdict(dict)
@property
def project(self) -> ProjectConfig:
return self._project
def _get_build_path(self, ref: GitRef) -> Path:
return self._path / BuildContext.quote_path(ref.ref) / BuildContext.quote_path(ref.short_commit + ".tmp")
def _get_target_path(self, ref: GitRef) -> Path:
return self._path / BuildContext.quote_path(ref.ref) / BuildContext.quote_path(ref.short_commit)
def build_exists(self, ref: GitRef) -> bool:
try:
if ref.commit not in self._ref_cache[ref.ref]:
self._ref_cache[ref.ref][ref.commit] = \
self._get_build_path(ref).is_dir() or self._get_target_path(ref).is_dir() # NB: check tmp first
return self._ref_cache[ref.ref][ref.commit]
except OSError as e:
raise OSError(f"Cannot check {str(self)} for {str(ref)}: {str(e)}") from None
def build_acquire(self, ref: GitRef) -> BuildContext:
try:
path: Path = self._get_build_path(ref)
path.mkdir(parents=True, exist_ok=False) # acts as lock, but called in sync anyway
self._ref_cache[ref.ref][ref.commit] = True
except OSError as e:
raise OSError(f"Cannot create {str(ref)} in {str(self)}: {str(e)}") from None
else:
return BuildContext(self._project, ref, path, self._get_target_path(ref))
def __str__(self) -> str:
return f"<{self._path.as_posix()}>"
# endregion
# region Commands
####
class CommandEnv(UserDict[str, str]):
pass
class OsCommandEnv(CommandEnv):
"""Pass along current environment variables, possibly overridden."""
    def __init__(self, extra: Optional[CommandEnv] = None) -> None:
        env: Dict[str, str] = os.environ.copy()
        if extra is not None:
            env.update(extra.data)
        super().__init__(env)
class BuildCommandEnv(CommandEnv):
"""Configured and build-specific environment, used also as explicit docker arguments/environment."""
def __init__(self, project: ProjectConfig, ref: GitRef, build_dir: Path) -> None:
env: Dict[str, str] = {"BUILD_DIR": build_dir.as_posix()}
env.update(ref.as_env())
env.update(project.as_env())
super().__init__(env)
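# A BuildCommandEnv thus carries BUILD_DIR plus the BUILD_* keys from GitRef.as_env() and
# ProjectConfig.as_env(), alongside any env/env_file variables configured for the project.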
class CommandProxy:
"""Call a binary and process stdout/stderr with build logger and async callbacks."""
T = TypeVar('T')
class OutputCallback(Generic[T]):
"""Registered to receive raw or parsed output lines. The default implementation discards."""
async def __call__(self, line: 'CommandProxy.T') -> None:
pass
class LoggerCallback(OutputCallback[str]):
"""Pass each output line to the build logger before passing to a chained parent."""
def __init__(self, level: int, logger: logging.Logger, parent: 'CommandProxy.OutputCallback[str]') -> None:
self._level: int = level
self._logger: logging.Logger = logger
self._parent: CommandProxy.OutputCallback[str] = parent
self._prefix: str = "! " if level >= logging.WARNING else "> "
async def __call__(self, line: str) -> None:
self._logger.log(self._level, self._prefix + line)
await self._parent(line)
class CommandProcess:
"""Subprocess context manager to ensure termination upon exit or exception."""
def __init__(self, logger: logging.Logger, args: List[str], env: CommandEnv, kill_timeout: float = 5.0) -> None:
self._logger: logging.Logger = logger
self._name: str = args[0]
self._args: List[str] = args[1:]
self._env: CommandEnv = env
self._kill_timeout: float = kill_timeout
self._process: Optional[asyncio.subprocess.Process] = None
async def _cancel(self) -> None:
if self._process is None or self._process.returncode is not None:
return
try:
self._logger.warning(f"Terminating {self._name}")
self._process.terminate()
await asyncio.wait_for(self._process.wait(), self._kill_timeout)
except (asyncio.TimeoutError, asyncio.CancelledError):
pass
except ProcessLookupError:
return
else:
return
try:
self._logger.error(f"Killing {self._name}")
self._process.kill()
except ProcessLookupError:
pass
async def __aenter__(self) -> asyncio.subprocess.Process:
"""Execute without shell, in a new process group, and with stdout/stderr pipes."""
try:
self._process = await asyncio.create_subprocess_exec(
self._name, *self._args, env=self._env,
close_fds=True, start_new_session=True,
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.DEVNULL,
)
except OSError as e:
raise RuntimeError(f"Cannot run {self._name}: {str(e)}") from None
else:
return self._process
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
if exc_type:
self._logger.error(f"Cancelling {self._name} due to {exc_type.__name__}")
await self._cancel()
if self._process is not None and await self._process.wait() != 0:
raise RuntimeError(f"{self._name} exited with {self._process.returncode}") from exc_type
def __init__(self, logger: logging.Logger, build_logger: logging.Logger,
env: CommandEnv, timeout: Optional[float] = None) -> None:
self._logger: logging.Logger = logger
self._build_logger: logging.Logger = build_logger
self._env: CommandEnv = env
self._timeout: Optional[float] = timeout
async def _readline(self, stream: Optional[asyncio.StreamReader]) -> AsyncIterator[str]:
while stream is not None:
try:
line: bytes = await stream.readline()
except ValueError as e:
self._logger.error(f"Cannot read line: {str(e)}")
continue
except OSError as e:
self._logger.error(f"Cannot read: {str(e)}")
break
if not len(line):
break
else:
line = line.rstrip()
if len(line):
yield line.decode("utf-8", errors="replace")
async def _log_and_tee(self, stream: Optional[asyncio.StreamReader], cb: OutputCallback[str]) -> None:
async for line in self._readline(stream):
await cb(line)
async def exec(self, args: List[str],
cb_out: OutputCallback[str] = OutputCallback(),
cb_err: OutputCallback[str] = OutputCallback()) -> None:
"""Run command with timeout, processing output callbacks in the background."""
self._build_logger.info(f"$ {' '.join(args)}")
cb_out = CommandProxy.LoggerCallback(logging.DEBUG, self._build_logger, parent=cb_out)
cb_err = CommandProxy.LoggerCallback(logging.WARNING, self._build_logger, parent=cb_err)
readers: List[asyncio.Task[None]] = []
try:
async with self.CommandProcess(self._logger, args, self._env) as proc:
readers = [asyncio.create_task(self._log_and_tee(proc.stdout, cb_out)),
asyncio.create_task(self._log_and_tee(proc.stderr, cb_err))]
await asyncio.wait_for(proc.wait(), self._timeout)
finally:
await asyncio.gather(*readers, return_exceptions=True)
class Git(CommandProxy):
"""Git binary commands: ls-remote, clone, and checkout."""
class LsRemoteParser(CommandProxy.OutputCallback[str]):
"""Parse the output of ls-remote into ref (branch/tag) and commit hash."""
def __init__(self, logger: logging.Logger, parent: CommandProxy.OutputCallback[GitRef]) -> None:
self._logger: logging.Logger = logger
self._parent: CommandProxy.OutputCallback[GitRef] = parent
async def __call__(self, line: str) -> None:
try:
commit, ref = line.split(maxsplit=1)
git_ref: GitRef = GitRef(ref, commit)
except ValueError as e:
self._logger.warning(f"Cannot parse git ref, got '{line}': {str(e)}")
else:
await self._parent(git_ref)
async def ls_remote(self, remote: GitConfig, cb: CommandProxy.OutputCallback[GitRef]) -> None:
await self.exec([
"git", "--no-pager", "ls-remote", "--heads", "--tags", "--refs", "--quiet", "--", remote.remote
], cb_out=Git.LsRemoteParser(self._logger, cb))
async def clone(self, remote: GitConfig, ref: GitRef, path: Path) -> None:
shallow_args: List[str] = [
"--depth", "1", "--shallow-submodules", "--no-tags", "--single-branch", "--branch", ref.branch_or_tag
] if remote.shallow else []
await self.exec(
["git", "--no-pager", "-C", path.parent.as_posix()] +
["clone", "--no-local", "--no-hardlinks", "--recurse-submodules", "--verbose"] +
shallow_args +
["--", remote.remote, path.name]
)
async def checkout(self, remote: GitConfig, ref: GitRef, path: Path) -> None:
await self.exec(
["git", "--no-pager", "-C", path.as_posix(), "-c", "advice.detachedHead=false"] +
["checkout", "--recurse-submodules", "--force", "--no-guess", "--detach", ref.commit]
)
async def clone_ref(self, remote: GitConfig, ref: GitRef, path: Path) -> None:
        # fetching an arbitrary commit might not be supported or allowed, so clone and then checkout instead
        # note that the checkout will fail on a shallow clone if the ref has moved in the meantime
await self.clone(remote, ref, path)
await self.checkout(remote, ref, path)
class Docker(CommandProxy):
"""Docker binary commands: build and run."""
class DockerBuildParser(CommandProxy.OutputCallback[str]):
"""As neither --quiet nor tagging is forced, need to parse the build output for image id."""
def __init__(self) -> None:
self._patterns: List[re.Pattern[str]] = [
re.compile(r"writing image sha256:(?P<image>[0-9a-f]+)"),
re.compile(r"Successfully built (?P<image>[0-9a-f]+)"),
]
self._last_match: Optional[str] = None
async def __call__(self, line: str) -> None:
for pattern in self._patterns:
m: Optional[re.Match[str]] = pattern.search(line)
if m is not None:
self._last_match = m.group("image")
def get(self) -> str:
if self._last_match is None:
raise ValueError("Cannot parse build log for image")
return self._last_match
def _build_command(self, config: DockerConfig, expose_env: List[str]) -> Iterator[str]:
"""Get the config-dependent first part of the docker build command. Raises ValueError."""
yield from ["docker", "build", "--rm", "--force-rm"] # always remove intermediate containers
if config.pull: # always attempt to pull a newer version of the image
yield "--pull"
if not config.cache: # do not use cache when building the image
yield "--no-cache"
if config.tag is not None: # name and optionally a tag in name:tag format
yield from ["--tag", config.tag.substitute(self._env)]
if config.label is not None: # set metadata for an image in name=value format
yield from ["--label", config.label.substitute(self._env)]
for var_name in expose_env: # set build-time variables
yield from ["--build-arg", var_name]
def _run_command(self, config: DockerConfig, expose_env: List[str]) -> Iterator[str]:
"""Get the config-dependent first part of the docker run command. Raises ValueError."""
yield from ["docker", "run", "--rm", "--pull", "never"]
if config.read_only: # mount the container's root filesystem as read only
yield "--read-only"
if config.docker_in_docker: # XXX: assumes standard unix socket
yield from ["-v", "/var/run/docker.sock:/var/run/docker.sock"]
if config.label is not None: # set metadata on a container in name=value format
yield from ["--label", config.label.substitute(self._env)]
if config.tmpfs > 0: # mount a tmpfs directory
yield from ["--tmpfs", f"/tmp:size={config.tmpfs}%,mode=1777,rw,relatime"] # nosec
for var_name in expose_env: # set environment variables
yield from ["--env", var_name]
async def build(self, config: DockerConfig, path: Path, env: List[str]) -> str:
cb: Docker.DockerBuildParser = Docker.DockerBuildParser()
await self.exec(list(self._build_command(config, env)) +
["--file", (path / config.dockerfile).as_posix(), "--", path.as_posix()],
cb_err=cb)
return cb.get()
async def run(self, config: DockerConfig, image: str, name: str, volume: Path, drop: Path, env: List[str]) -> None:
await self.exec(list(self._run_command(config, env)) +
["-v", f"{volume.as_posix()}:/build:rw", "-v", f"{drop.as_posix()}:/drop:rw"] +
["--name", name, "--", image])
class Rsync(CommandProxy):
@classmethod
def _sync_command(cls, config: RsyncConfig, checkout: Path) -> Iterator[str]:
yield from ["rsync", "--verbose", "--recursive", "--links"]
if config.checksum:
yield from ["--checksum"]
if config.preserve:
yield from ["--owner", "--group", "--perms", "--times"]
if config.delete:
yield from ["--delete-after", "--delete-excluded"]
if config.include is not None:
yield from ["--include-from", (checkout / config.include).as_posix()]
else:
yield from ["--exclude", ".git"]
async def sync(self, config: RsyncConfig, checkout: Path, source: Path) -> None:
if config.destination is not None:
await self.exec(list(self._sync_command(config, checkout)) + [
"--", source.as_posix() + "/", config.destination
])
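# With default options, a resulting sync command might read (destination illustrative):
#   rsync --verbose --recursive --links --exclude .git -- <source>/ user@host:/srv/drop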
# endregion
# region Runner
####
class Runner:
"""Perform the actual git and docker operations of a build task."""
@dataclass(frozen=True)
class RunId:
"""Unique identifier used for exclusively locking both branch/tag and commit per project."""
project: str
ref: str
commit: str
def __init__(self, build: BuildContext) -> None:
self._build: BuildContext = build
self._logger: logging.Logger = self._build.logger
self._logger.info(f"Enqueueing {self._build}")
@property
def run_id(self) -> RunId:
return Runner.RunId(project=self._build.project.name,
ref=self._build.ref.branch_or_tag,
commit=self._build.ref.short_commit)
async def run(self) -> None:
"""Main callback doing the actual work."""
self._logger.info(f"Running for {self._build}")
try:
build_logger: QueuedLogger = self._build.get_build_logger()
async with build_logger as logger:
env: CommandEnv = BuildCommandEnv(self._build.project, self._build.ref, self._build.root_dir)
command_env: CommandEnv = OsCommandEnv(env)
env_keys: List[str] = list(env.keys())
git: Git = Git(self._logger, logger, command_env, self._build.project.timeout)
docker: Docker = Docker(self._logger, logger, command_env, self._build.project.timeout)
rsync: Rsync = Rsync(self._logger, logger, command_env, self._build.project.timeout)
callback: CommandProxy = CommandProxy(self._logger, logger, command_env, self._build.project.timeout)
try:
await git.clone_ref(self._build.project.remote, self._build.ref, self._build.build_dir)
if self._build.project.prepare_callback is not None:
await callback.exec([self._build.project.prepare_callback.as_posix()])
if self._build.project.docker.build:
image: str = await docker.build(self._build.project.docker, self._build.build_dir, env_keys)
name: str = f"{self._build.project.name}-{self._build.ref.short_commit}"
if self._build.project.docker.run:
await docker.run(self._build.project.docker, image, name,
self._build.build_dir, self._build.drop_dir, env_keys)
await rsync.sync(self._build.project.rsync, self._build.build_dir, self._build.drop_dir)
else:
await rsync.sync(self._build.project.rsync, self._build.build_dir, self._build.build_dir)
else:
await rsync.sync(self._build.project.rsync, self._build.build_dir, self._build.build_dir)
if self._build.project.success_callback is not None:
await build_logger.flush()
await callback.exec([self._build.project.success_callback.as_posix()])
except Exception:
if self._build.project.error_callback is not None:
await build_logger.flush()
await callback.exec([self._build.project.error_callback.as_posix()])
raise
else:
self._build.finalize() # NB: build logger file stream still active
except asyncio.CancelledError:
self._logger.error("Run cancelled")
except asyncio.TimeoutError:
self._logger.error("Run timeout")
except (RuntimeError, ValueError, OSError) as e:
self._logger.error(f"Run failed with {e.__class__.__name__}: {str(e)}")
else:
self._logger.info("Success")
class RemoteWatcher:
"""Watch git remotes for new commits that match the project configuration."""
WorkQueue = asyncio.Queue[Optional[Tuple[str, GitRef]]]
class RefCallback(CommandProxy.OutputCallback[GitRef]):
"""Parse git output for new refs to be submitted to the given queue."""
def __init__(self, logger: logging.Logger, project: ProjectConfig, queue: 'RemoteWatcher.WorkQueue') -> None:
self._logger: logging.Logger = logger
self._project: ProjectConfig = project
self._queue: RemoteWatcher.WorkQueue = queue
self._cache: DefaultDict[str, str] = defaultdict(str)
def _match(self, ref: GitRef) -> bool:
if ref.branch is not None:
if self._project.branches is not None and self._project.branches.fullmatch(ref.branch) is not None:
return True
if ref.tag is not None:
if self._project.tags is not None and self._project.tags.fullmatch(ref.tag) is not None:
return True
return False
async def __call__(self, ref: GitRef) -> None:
if self._cache[ref.ref] != ref.commit:
self._cache[ref.ref] = ref.commit
if self._match(ref):
self._logger.info(f"{self._project.name} now at {ref}")
self._queue.put_nowait((self._project.name, ref))
def __init__(self, projects: List[ProjectContext], interval: int) -> None:
self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
self._git_logger: logging.Logger = self._logger.getChild("git")
self._interval: int = max(1, interval)
self._shutdown: asyncio.Event = asyncio.Event()
self._queue: RemoteWatcher.WorkQueue = asyncio.Queue()
self._task: Optional[asyncio.Task[None]] = None
self._projects: Dict[str, ProjectContext] = {_.project.name: _ for _ in projects}
self._callbacks: Dict[str, RemoteWatcher.RefCallback] = {
_.project.name: RemoteWatcher.RefCallback(self._logger, _.project, self._queue) for _ in projects
}
async def subscribe(self) -> AsyncIterator[Tuple[ProjectContext, GitRef]]:
"""Block on new refs to be enqueued as build tasks, until explicitly cancelled."""
while self._task is not None and not self._shutdown.is_set():
name_ref: Optional[Tuple[str, GitRef]] = await self._queue.get()
if name_ref is None:
self._shutdown.set()
else:
yield self._projects[name_ref[0]], name_ref[1]
def cancel(self) -> None:
self._shutdown.set()
self._queue.put_nowait(None)
async def _wait(self, timeout: float) -> bool:
try:
return not await asyncio.wait_for(self._shutdown.wait(), timeout)
except (asyncio.CancelledError, asyncio.TimeoutError):
return True
async def _worker(self) -> None:
git: Git = Git(self._logger, self._git_logger, OsCommandEnv())
        interval: float = self._interval / len(self._projects)  # spread polls evenly across projects
while True:
for name, project in self._projects.items():
if not await self._wait(interval):
self._queue.put_nowait(None)
return
try:
await git.ls_remote(project.project.remote, self._callbacks[name])
except Exception as e:
self._logger.error(str(e)) # but go on
async def __aenter__(self) -> 'RemoteWatcher':
if self._task is None and len(self._projects):
self._logger.info(f"Watching {len(self._projects)} remotes")
self._task = asyncio.create_task(self._worker())
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
self.cancel()
if self._task is not None:
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
self._logger.info("Stopped remote watchdog")
# endregion
# region Tasks
####
class LimitProvider:
"""Ensure the globally configured max parallelism on build tasks. Also maintain per-project ref semaphores."""
LockId = Tuple[str, ...]
def __init__(self, global_limit: int) -> None:
self._global_limit: asyncio.Semaphore = asyncio.BoundedSemaphore(max(1, global_limit))
self._ref_lock: DefaultDict[LimitProvider.LockId, asyncio.Semaphore] = defaultdict(asyncio.BoundedSemaphore)
self._commit_lock: DefaultDict[LimitProvider.LockId, asyncio.Semaphore] = defaultdict(asyncio.BoundedSemaphore)
def global_limit(self) -> asyncio.Semaphore:
return self._global_limit
def ref_limit(self, run: Runner.RunId) -> asyncio.Semaphore:
return self._ref_lock[(run.project, run.ref)]
def commit_limit(self, run: Runner.RunId) -> asyncio.Semaphore:
return self._commit_lock[(run.project, run.commit)]
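# Net effect (see TaskSet._run below): at most --limit builds run concurrently overall, and
# within a project, builds of the same branch/tag or of the same commit never overlap.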
class TaskSet:
"""Enqueue, maintain, and run all pending build tasks. No dependencies, so can outlive reconfigurations."""
def __init__(self, limits: LimitProvider) -> None:
self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
self._limits: LimitProvider = limits
self._tasks: Set[asyncio.Task[None]] = set()
async def _run(self, runner: Runner) -> None:
try:
async with self._limits.global_limit():
async with self._limits.ref_limit(runner.run_id):
async with self._limits.commit_limit(runner.run_id):
await runner.run()
except asyncio.CancelledError:
self._logger.warning("Runner task cancelled")
except BaseException as e:
self._logger.error("Runner task failed", exc_info=e)
async def __aenter__(self) -> 'TaskSet':
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
if exc_type:
self.cancel()
if len(self._tasks):
self._logger.info(f"Waiting for {len(self._tasks)} remaining tasks")
await asyncio.gather(*self._tasks, return_exceptions=True) # ignoring exceptions, logged by task
async def enqueue(self, runner: Runner) -> None:
task: asyncio.Task[None] = asyncio.create_task(self._run(runner))
self._tasks.add(task)
task.add_done_callback(self._tasks.discard)
def cancel(self) -> None:
if len(self._tasks):
self._logger.warning(f"Cancelling {len(self._tasks)} tasks")
for task in self._tasks:
task.cancel()
class SignalHandler:
"""Put received signals on a stack and call registered callbacks."""
Callback = Callable[[], None]
class Signal(IntEnum):
"""Signal values in increasing severity and (accidentally also) integer value."""
none = 0
reconfigure = signal.SIGHUP
term_soft = signal.SIGINT
term = signal.SIGQUIT
term_hard = signal.SIGTERM
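        # effect in _main/_run: reconfigure reloads the ini and keeps running, term_soft
        # exits once running builds finish, term and term_hard also cancel running builds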
class Watcher:
"""Context manager to (un-)register a callback when a signal of the given severity (or higher) occurs."""
def __init__(self, signals: 'SignalHandler', sig: 'SignalHandler.Signal', cb: 'SignalHandler.Callback') -> None:
self._signals: SignalHandler = signals
self._min_signal: SignalHandler.Signal = sig
self._callback: SignalHandler.Callback = cb
def __enter__(self) -> None:
self._signals._register(self._min_signal, self._callback)
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
self._signals._unregister(self._min_signal, self._callback)
def __init__(self) -> None:
self._logger: logging.Logger = logging.getLogger(self.__class__.__name__)
self._signals: Set[SignalHandler.Signal] = set()
self._callbacks: DefaultDict[SignalHandler.Signal, Set[SignalHandler.Callback]] = defaultdict(set)
loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
for s in SignalHandler.Signal:
if s > SignalHandler.Signal.none:
loop.add_signal_handler(s.value, self._handler, s)
def _handler(self, sig: 'SignalHandler.Signal') -> None:
self._logger.warning(f"Received {sig.name} signal {sig.value}")
self._signals.add(sig)
for cb in [_ for min_sig, cbs in self._callbacks.items() for _ in cbs if sig >= min_sig]: # NB: copy list
cb()
def _register(self, min_sig: 'SignalHandler.Signal', cb: Callable[[], None]) -> None:
self._callbacks[min_sig].add(cb)
if self.max_signal() >= min_sig:
cb()
def _unregister(self, min_sig: 'SignalHandler.Signal', cb: Callable[[], None]) -> None:
self._callbacks[min_sig].discard(cb)
def max_signal(self) -> Signal:
"""The highest-severity recorded signal."""
return max(self._signals) if len(self._signals) else SignalHandler.Signal.none
def reset(self) -> None:
"""Start recording signals from scratch."""
self._signals.clear()
# endregion
# region Main
####
def _setup_logging() -> QueuedLogger:
"""Hard-coded logging subsystem configuration to (queued) stderr."""
logging.raiseExceptions = False
logging.logThreads = False
logging.logMultiprocessing = False
logging.logProcesses = False
logging.basicConfig(level=logging.DEBUG, format="%(levelname)s %(name)s: %(message)s")
logging.getLogger(f"{RemoteWatcher.__name__}.git").setLevel(logging.WARNING)
return RootLogger()
async def _run(logger: logging.Logger, interval: int,
tasks: TaskSet, signals: SignalHandler, projects: List[ProjectContext]) -> bool:
"""Per-configuration-time main loop watching projects for new refs and enqueueing build tasks."""
async with RemoteWatcher(projects, interval) as watcher:
with SignalHandler.Watcher(signals, SignalHandler.Signal.reconfigure, watcher.cancel):
async for project, ref in watcher.subscribe():
try:
if not project.build_exists(ref):
await tasks.enqueue(Runner(project.build_acquire(ref)))
except OSError as e:
logger.error(str(e))
return False
return True
async def _main(config: PipelineConfig, project_config: Path) -> bool:
async with _setup_logging() as logger:
setlocale(LC_ALL, "C")
try:
config.workdir.mkdir(parents=True, exist_ok=True) # NB: default permissions and umask
workdir: Path = config.workdir.resolve(strict=True)
except OSError as e:
logger.error(f"Cannot create '{config.workdir}': {str(e)}")
return False
else:
logger.info(f"Using state in '{workdir}'")
signals: SignalHandler = SignalHandler()
tasks: TaskSet = TaskSet(LimitProvider(config.limit))
with SignalHandler.Watcher(signals, SignalHandler.Signal.term, tasks.cancel):
async with tasks as jobs:
while signals.max_signal() < SignalHandler.Signal.term_soft: # (re-)configure and run
signals.reset()
try:
projects: List[ProjectContext] = [
ProjectContext(_, workdir) for _ in Config.from_ini(project_config) if _.enabled
]
except RuntimeError as e:
logger.error(str(e))
return False # waits for background jobs
else:
logger.info(f"Configured {len(projects)} projects from '{project_config}'")
gc.collect()
if not await _run(logger, config.interval, jobs, signals, projects):
return False
logger.info("Done.")
return True
def _dump_config(workdir: Path, config: Path) -> int:
class ConfigEncoder(json.JSONEncoder):
def default(self, o: Any) -> Any:
if isinstance(o, re.Pattern):
return o.pattern
elif isinstance(o, Template):
return o.template
elif isinstance(o, Path):
return o.as_posix()
elif isinstance(o, CommandEnv):
return dict(o)
elif isinstance(o, ProjectConfig):
return asdict(o)
else:
return super().default(o)
print(ConfigEncoder(ensure_ascii=False, sort_keys=True, indent=2).encode({
project.name: {
"config": project,
"env": BuildCommandEnv(project, GitRef(ref="refs/heads/master", commit="0" * 40), workdir / "tmp")
} for project in Config.from_ini(config)
}))
return 0
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--limit", type=int, default=1, help="parallel build limit", metavar="NUM")
parser.add_argument("--interval", type=int, default=60, help="git remote check interval", metavar="SECONDS")
parser.add_argument("--workdir", type=Path, required=True, help="output and state directory", metavar="PATH")
parser.add_argument("--config", type=Path, required=True, help="project configuration file", metavar="CONFIG.INI")
parser.add_argument("--dump-config", action='store_const', const=True, default=False, help="dump config and exit")
args = parser.parse_args()
if args.dump_config:
return _dump_config(args.workdir, args.config)
return 0 if asyncio.run(_main(PipelineConfig(workdir=args.workdir,
limit=args.limit,
interval=args.interval), args.config)) else 1
if __name__ == "__main__":
sys.exit(main())
####
# endregion