Source code for pkgbuilder.transaction

# -*- encoding: utf-8 -*-
# PKGBUILDer v4.3.2
# An AUR helper (and library) in Python 3.
# Copyright © 2011-2023, Chris Warrick.
# See /LICENSE for licensing information.

"""
Package installation transactions.

.. versionadded:: 4.1.0

:Copyright: © 2011-2023, Chris Warrick.
:License: BSD (see /LICENSE).
"""

import os
import os.path
import time
import json
import enum
import pkgbuilder.utils
import pyalpm
from . import DS, _, __version__

__all__ = ('generate_filename', 'Transaction', 'TransactionStatus')


def generate_filename(absolute=True):
    """Generate a filename for the transaction.

    The name has the form ``pkgbuilder-<unix timestamp>.tx``, where the
    timestamp is the current time truncated to whole seconds.

    :param bool absolute: return an absolute path (resolved against the
        current working directory) instead of the bare file name
    :return: transaction file name or path
    :rtype: str
    """
    name = "pkgbuilder-{0}.tx".format(int(time.time()))
    return os.path.abspath(name) if absolute else name
class TransactionStatus(enum.Enum):
    """Transaction status.

    The values form a small bit pattern: every "succeeded" value has the
    lowest bit set and the matching "failed" value has it cleared, while
    the number of higher bits grows with each stage reached (move, then
    install, then validate).
    """

    # Nothing has been attempted yet.
    undefined = 0b0000

    # Stage 1: moving files to the pacman cache.
    moved = 0b0011
    move_failed = 0b0010

    # Stage 2: installing with pacman.
    installed = 0b0111
    install_failed = 0b0110

    # Stage 3: validating the installation.
    validated = 0b1111
    validation_failed = 0b1110
class Transaction(object):
    """A package transaction.

    A transaction describes a set of built packages (and their signature
    files) that are moved to the pacman cache, installed with
    ``pacman -U`` and finally validated against the local database.  Its
    state can be saved to and loaded from a JSON transaction file.
    """

    def __init__(self, pkgnames, pkgpaths, sigpaths, asdeps=True,
                 uopt='', filename=None, delete=False, status=None,
                 pacmanreturn=-1, invalid=-1):
        """Initialize a transaction.

        :param list pkgnames: package names to install
        :param list pkgpaths: absolute paths to packages to install
        :param list sigpaths: absolute paths to signatures for packages
        :param bool asdeps: Install with ``--asdeps`` option
        :param str uopt: additional options to ``pacman -U``
        :param str filename: transaction file
        :param bool delete: delete transaction file if run successful
        :param TransactionStatus status: transaction status
        :param int pacmanreturn: Return code from ``pacman -U``
        :param int invalid: number of invalid packages
        """
        # All lists are deduplicated.  dict.fromkeys() keeps the first
        # occurrence of each item in its original position, so the saved
        # transaction file and the command lines are deterministic
        # (a plain set() would scramble the order between runs).
        self.pkgnames = list(dict.fromkeys(pkgnames))
        self.pkgpaths = list(dict.fromkeys(pkgpaths))
        self.sigpaths = list(dict.fromkeys(sigpaths))
        self.asdeps = asdeps
        self.uopt = uopt
        self.filename = filename
        self.delete = delete
        if status is not None:
            self.status = status
        else:
            self.status = TransactionStatus.undefined
        self.pacmanreturn = pacmanreturn
        self.invalid = invalid
        # A transaction bound to a file is persisted immediately.
        if self.filename:
            self.save()

    def __repr__(self):
        """Return the representation of a transaction.

        The transaction file name is used as the identifier if there is
        one; otherwise, the object's ``id()`` in hexadecimal is used.
        """
        s = "<Transaction {0} ({1})>"
        if self.filename:
            return s.format(self.filename, self.status)
        else:
            return s.format(hex(id(self)), self.status)

    @classmethod
    def load(cls, filename):
        """Load a transaction file.

        :param str filename: path to the transaction file
        :return: transaction with ``filename`` attached
        :rtype: Transaction
        """
        with open(filename, 'r') as fh:
            jsondata = fh.read()
        tx = cls.fromjson(jsondata)
        # The filename is assigned after construction, so loading a
        # transaction does not immediately rewrite its file.
        tx.filename = filename
        DS.log.info("Transaction loaded: {0}".format(tx.filename))
        return tx

    def save(self, filename=None):
        """Save a transaction file.

        Nothing is written if the transaction has no file name.

        :param str filename: new transaction file name; if given, it
            replaces the one currently attached to the transaction
        """
        if filename is not None:
            self.filename = filename

        if self.filename:
            with open(self.filename, 'w') as fh:
                fh.write(self.tojson())

            DS.log.info("Transaction saved: {0}".format(self.filename))

    @classmethod
    def fromjson(cls, jsondata):
        """Create a transaction from JSON data.

        The following fields exist:

        * ``__PBTX__`` — PKGBUILDer version
        * ``pkgnames`` — names of packages to install
        * ``pkgpaths`` — paths to packages to install
        * ``sigpaths`` — paths to attached signature files
        * ``asdeps`` — whether or not this is a dependency install
        * ``uopt`` — special (local) options to ``pacman -U``
        * ``status`` — status code of the transaction
        * ``pacmanreturn`` — pacman return code
        * ``invalid`` — number of invalid packages

        :param str jsondata: serialized transaction
        :return: transaction without a file name attached
        :rtype: Transaction
        """
        txdata = json.loads(jsondata)
        return cls(
            pkgnames=txdata['pkgnames'],
            pkgpaths=txdata['pkgpaths'],
            sigpaths=txdata['sigpaths'],
            asdeps=txdata['asdeps'],
            uopt=txdata['uopt'],
            status=TransactionStatus(txdata['status']),
            pacmanreturn=txdata['pacmanreturn'],
            invalid=txdata['invalid']
        )

    def tojson(self):
        """Serialize a transaction to JSON.

        :return: JSON document with the fields listed in :meth:`fromjson`
        :rtype: str
        """
        return json.dumps({
            '__PBTX__': __version__,
            'pkgnames': self.pkgnames,
            'pkgpaths': self.pkgpaths,
            'sigpaths': self.sigpaths,
            'asdeps': self.asdeps,
            'uopt': self.uopt,
            'status': self.status.value,
            'pacmanreturn': self.pacmanreturn,
            'invalid': self.invalid,
        }, sort_keys=True, indent=4)

    @property
    def exitcode(self):
        """Provide the most appropriate exit code.

        The number of invalid packages takes precedence over the pacman
        return code; if neither has been set yet (both are ``-1``),
        ``1`` is returned.
        """
        if self.invalid > -1:
            return self.invalid
        elif self.pacmanreturn > -1:
            return self.pacmanreturn
        else:
            return 1

    def run(self, standalone=True, quiet=False, validate=True):
        """Run a transaction.

        The stages are executed in order (move, install, validate) and
        the run stops at the first stage that returns a non-zero value.

        :param bool standalone: if False, an extra "Installing built
            packages..." heading is printed first
        :param bool quiet: suppress messages
        :param bool validate: run the validation stage after installing
        :return: True if all executed stages succeeded, False otherwise
        :rtype: bool
        """
        if not quiet:
            if not standalone:
                DS.fancy_msg(_('Installing built packages...'))

            if self.filename:
                DS.fancy_msg(_('Running transaction from file {0}...').format(
                    self.filename))
            else:
                DS.fancy_msg(_('Running transaction...'))

        DS.log.info("Running transaction {0!r}".format(self))
        # sudo is tested once here, so the stages are told to skip it.
        self._test_sudo()

        ret = self.move(True, quiet)
        if ret != 0:
            self._print_txfail('move', quiet)
            return False

        ret = self.install(True, quiet)
        if ret != 0:
            self._print_txfail('install', quiet)
            return False

        if validate:
            ret = self.validate(quiet)
            if ret != 0:
                self._print_txfail('validate', quiet)
                return False

        DS.log.info("Transaction {0!r} succeeded".format(self))
        if not quiet:
            DS.fancy_msg(_("Transaction succeeded."))
        if self.delete and self.filename:
            os.remove(self.filename)
            if not quiet:
                DS.fancy_msg2(_("Deleted transaction file {0}").format(
                    self.filename))
            # The file is gone; detach it so later saves are no-ops.
            self.filename = None

        return True

    def _print_txfail(self, stage, quiet):
        """Print transaction failure message.

        Nothing is printed or logged when ``quiet`` is set.

        :param str stage: name of the stage that failed
        :param bool quiet: suppress messages
        """
        if not quiet:
            DS.log.error("Transaction {0!r} failed (stage {1})".format(
                self, stage))
            if self.pacmanreturn == 0 and self.invalid > 0:
                # special case: retrying the transaction is not helpful, as it
                # won't help fix the validation status. The user should
                # investigate by reading the build logs and acting accordingly.
                DS.fancy_error(_("Some packages failed to build."))
            else:
                DS.fancy_error(_("Transaction failed!"))
                if self.filename:
                    c = 'c' if self.delete else ''
                    DS.fancy_error2(_("To retry, run:"))
                    DS.fancy_error2("pkgbuilder -X{c} {fn}".format(
                        c=c, fn=self.filename))

    def _test_sudo(self):
        """Test if sudo works.

        ``true`` is run through ``DS.sudo`` repeatedly until it returns 0.
        """
        trueexit = 256
        while trueexit != 0:
            trueexit = DS.sudo(['true'])

    def _set_status_from_return(self, returncode, success, failure):
        """Set status from return code and save the transaction.

        :param int returncode: stage return code; 0 means success
        :param TransactionStatus success: status to set if it is 0
        :param TransactionStatus failure: status to set otherwise
        """
        if returncode == 0:
            self.status = success
        else:
            self.status = failure
        self.save()

    def _pacman_pkgpath(self, pkgpath):
        """Return package path in pacman cache.

        :param str pkgpath: path to a package or signature file
        :return: path with the same base name in /var/cache/pacman/pkg/
        :rtype: str
        """
        return os.path.join('/var/cache/pacman/pkg/',
                            os.path.basename(pkgpath))

    @property
    def pacman_pkgpaths(self):
        """Return package paths, augmented for pacman."""
        return [self._pacman_pkgpath(i) for i in self.pkgpaths]

    def move(self, sudo_tested=False, quiet=False):
        """Move package and signature files to pacman cache.

        Files that are already in the cache are skipped with a warning;
        files that cannot be found anywhere are counted as failed.

        :param bool sudo_tested: if sudo was tested (password prompt)
        :param bool quiet: suppress messages
        :return: 0 on success, +mv return, -failed files
        :rtype: int
        """
        if not sudo_tested:
            self._test_sudo()

        if not quiet:
            DS.fancy_msg2(_('Moving to /var/cache/pacman/pkg/...'))
        pkgpaths = []
        sigpaths = []
        failed_files = 0

        for p in self.pkgpaths:
            pacp = self._pacman_pkgpath(p)
            if p == pacp:
                DS.log.warning("Not moving package file {0} -- "
                               "already in pacman cache".format(p))
            elif os.path.exists(p):
                pkgpaths.append(p)
            elif os.path.exists(pacp):
                DS.log.warning("Not moving package file {0} -- "
                               "found in pacman cache".format(p))
            else:
                DS.log.error("Not moving package file {0} -- "
                             "not found".format(p))
                if not quiet:
                    DS.fancy_warning2(_("Package file {0} not found").format(
                        p))
                failed_files += 1

        for s in self.sigpaths:
            # The cache path must be derived from the signature itself,
            # not from the last package path of the previous loop.
            pacs = self._pacman_pkgpath(s)
            if s == pacs:
                DS.log.warning("Not moving signature file {0} -- "
                               "already in pacman cache".format(s))
            elif os.path.exists(s):
                sigpaths.append(s)
            elif os.path.exists(pacs):
                DS.log.warning("Not moving signature file {0} -- "
                               "found in pacman cache".format(s))
            else:
                DS.log.error("Not moving signature file {0} -- "
                             "not found".format(s))
                if not quiet:
                    DS.fancy_warning2(_("Signature file {0} not found").format(
                        s))
                failed_files += 1

        DS.log.debug('mv {0} {1} /var/cache/pacman/pkg/'.format(
            pkgpaths, sigpaths))
        # NOTE(review): if anything is moved, the mv return code replaces
        # the negative failed-files count, so missing files are only
        # reported through the return value when nothing was moved.
        ret = -failed_files
        if pkgpaths or sigpaths:
            ret = DS.sudo(['mv'] + pkgpaths + sigpaths +
                          ['/var/cache/pacman/pkg/'])

        self._set_status_from_return(ret,
                                     TransactionStatus.moved,
                                     TransactionStatus.move_failed)
        return ret

    def install(self, sudo_tested=False, quiet=False):
        """Install packages through ``pacman -U``.

        ``--asdeps`` is added if :attr:`asdeps` is set, and
        ``--noconfirm`` if ``DS.confirm`` is false.

        :param bool sudo_tested: if sudo was tested (password prompt)
        :param bool quiet: suppress messages
        :return: pacman return code
        :rtype: int
        """
        if not sudo_tested:
            self._test_sudo()

        if not quiet:
            DS.fancy_msg2(_('Installing with pacman -U...'))

        npkgpaths = self.pacman_pkgpaths

        uopt = self.uopt.strip()

        if self.asdeps:
            uopt = uopt + ' --asdeps'

        if not DS.confirm:
            uopt = uopt + ' --noconfirm'

        uopt = uopt.strip()

        if uopt:
            DS.log.debug('$PACMAN -U {0} {1}'.format(uopt, npkgpaths))
            # split() without an argument collapses runs of whitespace,
            # so no empty-string arguments are passed to pacman.
            ret = DS.sudo([DS.paccommand, '-U'] + uopt.split() + npkgpaths)
        else:
            DS.log.debug('$PACMAN -U {0}'.format(npkgpaths))
            ret = DS.sudo([DS.paccommand, '-U'] + npkgpaths)

        self.pacmanreturn = ret
        self._set_status_from_return(ret,
                                     TransactionStatus.installed,
                                     TransactionStatus.install_failed)
        return ret

    def validate(self, quiet):
        """Check if packages were installed.

        A package counts as valid if it is in the local database and its
        installed version is not older than the version reported by
        ``pkgbuilder.utils.info``.  Packages unknown to that lookup count
        as valid if they are installed at all.

        :param bool quiet: suppress messages
        :return: number of packages that were not installed
        :rtype: int
        """
        if self.pkgnames:
            if not quiet:
                DS.fancy_msg(_('Validating installation status...'))
            DS.log.info('Validating: ' + '; '.join(self.pkgnames))
            DS.pycreload()
            localdb = DS.pyc.get_localdb()

            aurpkgs = {aurpkg.name: aurpkg.version for aurpkg in
                       pkgbuilder.utils.info(self.pkgnames)}

            # Start pessimistic; each package that checks out is
            # subtracted in the loop below.
            wrong = len(self.pkgnames)
        else:
            wrong = 0

        for pkgname in self.pkgnames:
            lpkg = localdb.get_pkg(pkgname)
            try:
                aurversion = aurpkgs[pkgname]
            except KeyError:
                # No version information: only presence can be checked.
                if not lpkg:
                    if not quiet:
                        DS.fancy_error2(_('{0}: not an AUR package').format(
                                        pkgname))
                else:
                    wrong -= 1
                    if not quiet:
                        DS.fancy_msg2(_('{0}: installed {1}').format(
                                      pkgname, lpkg.version))
            else:
                if not lpkg:
                    if not quiet:
                        DS.fancy_error2(_('{0}: NOT installed').format(
                                        pkgname))
                else:
                    if pyalpm.vercmp(aurversion, lpkg.version) > 0:
                        if not quiet:
                            DS.fancy_error2(_('{0}: outdated {1}').format(
                                            pkgname, lpkg.version))
                    else:
                        wrong -= 1
                        if not quiet:
                            DS.fancy_msg2(_('{0}: installed {1}').format(
                                          pkgname, lpkg.version))

        self.invalid = wrong
        self._set_status_from_return(wrong,
                                     TransactionStatus.validated,
                                     TransactionStatus.validation_failed)
        return wrong