#!/usr/bin/python3

'''
Check MD5 sums for 'Multi-Arch: same' packages.
'''

import argparse
import collections
import os
import re
import shlex
import sys
import subprocess as ipc

import apt_pkg

default_mirror = 'http://ftp.debian.org/debian'
default_distribution = 'unstable'


def setup_proxies():
    '''
    Export the proxies configured for APT to the environment.

    The downloads are performed by a wget child process, which reads the
    http_proxy and ftp_proxy environment variables.
    '''
    apt_pkg.init_config()
    for variable, option in (
        ('http_proxy', 'Acquire::http::Proxy'),
        ('ftp_proxy', 'Acquire::ftp::Proxy'),
    ):
        proxy = apt_pkg.config.get(option, '')
        # Only override the environment when APT actually has a proxy
        # configured; otherwise a proxy exported by the user would be
        # silently replaced with an empty string.
        if proxy:
            os.environ[variable] = proxy


def log_download(url):
    '''Report on stderr that the given URL is about to be fetched.'''
    print('D: {url}'.format(url=url), file=sys.stderr)


def log_action(package, version, action):
    '''Report on stderr what is being done with the given package version.'''
    print(
        'I: {pkg} {ver} => {action}'.format(pkg=package, ver=version, action=action),
        file=sys.stderr
    )
    sys.stderr.flush()


def log_error(package, version, message):
    '''Report on stderr a problem with the given package version.'''
    print(
        'E: {pkg} {ver} => {message}'.format(pkg=package, ver=version, message=message),
        file=sys.stderr
    )


class download:
    '''
    Context manager yielding a binary stream with the content of a URL.

    URLs starting with '/' or '.' are treated as local paths and read
    directly; anything else is fetched with wget. If pipe is given, it is a
    shell command through which the content is filtered before it is
    handed to the caller.

    Leaving the context raises IOError if the underlying command exited
    with a non-zero status.
    '''

    def __init__(self, url, pipe=None):
        self._url = url
        self._pipe = pipe
        self._commandline = None
        self._child = None

    def __enter__(self):
        log_download(self._url)
        # The URL is the only externally supplied part of the command line,
        # so it is the only part that needs shell quoting.
        quoted_url = shlex.quote(self._url)
        if self._url.startswith(('/', '.')):
            if self._pipe is not None:
                commandline = '< {url} {pipe}'.format(url=quoted_url, pipe=self._pipe)
            else:
                commandline = 'cat {url}'.format(url=quoted_url)
        else:
            commandline = 'wget -O- -q {url}'.format(url=quoted_url)
            if self._pipe is not None:
                commandline += ' | ' + self._pipe
        self._commandline = commandline
        self._child = ipc.Popen(commandline, shell=True, stdout=ipc.PIPE)
        return self._child.stdout

    def __exit__(self, exc_type, exc_val, exc_tb):
        child = self._child
        if exc_type is not None:
            # The body failed, possibly before draining the pipe. Close our
            # end first so that the child cannot block forever on a full
            # pipe, then reap it. Let the original exception propagate
            # instead of masking it with an IOError.
            child.stdout.close()
            child.wait()
            return False
        try:
            status = child.wait()
        finally:
            child.stdout.close()
        if status != 0:
            raise IOError(
                'command failed with exit status {status}: {cmd}'.format(
                    status=status, cmd=self._commandline
                )
            )
        return False


def _release_architectures(options):
    '''
    Return the list of architectures named in the Release file.

    For unstable and experimental (and their aliases) the Release file of
    testing is consulted instead.
    '''
    release_dist = options.distribution
    if release_dist in ('unstable', 'sid', 'experimental', 'rc-buggy'):
        release_dist = 'testing'
    url = '{mirror}/dists/{dist}/Release'.format(
        mirror=options.mirror, dist=release_dist
    )
    architectures = None
    with download(url) as release_tags:
        for para in apt_pkg.TagFile(release_tags):
            architectures = para['Architectures'].split()
    if architectures is None:
        # Without this check the caller would fail with an opaque
        # "'NoneType' object is not iterable".
        raise RuntimeError(
            'no architectures found in {url}'.format(url=url)
        )
    return architectures


def _collect_package_urls(options):
    '''
    Scan the Packages indices for 'Multi-Arch: same' packages.

    Return a mapping (package name, version) -> {architecture: .deb URL},
    restricted to the packages selected by options.packages.
    '''
    data = collections.defaultdict(dict)
    for architecture in options.architectures:
        for section in 'main', 'contrib', 'non-free':
            url = '{mirror}/dists/{dist}/{section}/binary-{arch}/Packages.gz'.format(
                mirror=options.mirror,
                dist=options.distribution,
                section=section,
                arch=architecture
            )
            with download(url, pipe='gzip -dc') as package_tags:
                for pkgdata in apt_pkg.TagFile(package_tags):
                    if pkgdata.get('Multi-Arch', '') != 'same':
                        continue
                    pkgname = pkgdata['Package']
                    if pkgname not in options.packages:
                        continue
                    pkgversion = pkgdata['Version']
                    data[pkgname, pkgversion][architecture] = '{mirror}/{path}'.format(
                        mirror=options.mirror, path=pkgdata['Filename']
                    )
    return data


def _collect_md5sums(pkgname, pkgversion, urls):
    '''
    Download the md5sums control file of every architecture's .deb.

    Return a mapping file name -> {MD5 sum: set of architectures}; file
    names and MD5 sums are bytes. Architectures whose md5sums file cannot
    be retrieved are reported with log_error() and left out.
    '''
    pkgdata = collections.defaultdict(
        lambda: collections.defaultdict(set)
    )
    for architecture, url in urls.items():
        try:
            with download(url, pipe='dpkg-deb -I /dev/stdin md5sums 2>/dev/null') as md5sums_file:
                for line in md5sums_file:
                    # Strip the line terminator only if it is really there,
                    # so that a final line without a newline does not lose
                    # the last character of its file name.
                    if line.endswith(b'\n'):
                        line = line[:-1]
                    if not line:
                        continue
                    # Each line is a 32-character MD5 sum, two separator
                    # characters, and then the file name.
                    md5sum = line[:32]
                    filename = line[34:]
                    pkgdata[filename][md5sum].add(architecture)
        except IOError:
            log_error(pkgname, pkgversion, 'missing md5sums for {arch}'.format(arch=architecture))
    return pkgdata


def do_qa(options):
    '''
    Print the files whose MD5 sums differ between architectures.

    options is the namespace built by main(); the attributes used are
    mirror, distribution, architectures, packages and compact. If
    options.architectures is None, it is replaced by the architectures
    listed in the Release file.
    '''
    if options.architectures is None:
        options.architectures = _release_architectures(options)
    data = _collect_package_urls(options)
    last = None
    for (pkgname, pkgversion), urls in data.items():
        if len(urls) <= 1:
            # Available for a single architecture only: nothing to compare.
            log_action(pkgname, pkgversion, 'skip')
            continue
        log_action(pkgname, pkgversion, 'download ({})'.format(' '.join(urls)))
        pkgdata = _collect_md5sums(pkgname, pkgversion, urls)
        for filename, md5sums in pkgdata.items():
            if len(md5sums) <= 1:
                continue
            if last != (pkgname, pkgversion):
                # Print the package header once, separated from the
                # previous package's report by an empty line.
                if last is not None:
                    print()
                print('[{name} {ver}]'.format(name=pkgname, ver=pkgversion))
                last = (pkgname, pkgversion)
            print(filename.decode('UTF-8', 'replace'))
            if options.compact:
                if all(len(x) == 1 for x in md5sums.values()):
                    continue
            for md5sum, architectures in sorted(md5sums.items()):
                # Sort the architectures so that the report does not depend
                # on set iteration order.
                print(' {md5sum} {arch}'.format(
                    md5sum=md5sum.decode('ASCII'),
                    arch=' '.join(sorted(architectures))
                ))


class Universum(object):
    '''
    Container that reports every object as its member.

    Used as the default package filter, so that "check all packages" needs
    no special-casing in the membership test.
    '''

    def __contains__(self, other):
        return True


def main():
    '''Parse the command line and run the check.'''
    setup_proxies()
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('--mirror', default=default_mirror,
        metavar='<mirror>',
        help='use this mirror (default: {mirror})'.format(mirror=default_mirror)
    )
    parser.add_argument('--distribution', default=default_distribution,
        metavar='<dist>',
        help='check this distribution (default: {dist})'.format(dist=default_distribution)
    )
    parser.add_argument('--architectures', nargs='+',
        metavar='<arch>',
        help='check these architectures (default: all release architectures)'
    )
    parser.add_argument('--packages', nargs='+', default=Universum(),
        metavar='<package>',
        help='check only these packages (default: check all)'
    )
    parser.add_argument('--compact', action='store_true',
        help='don\'t print MD5 sums if they are all different'
    )
    options = parser.parse_args()
    if isinstance(options.packages, list):
        # Membership is tested once per package in the archive.
        options.packages = frozenset(options.packages)
    do_qa(options)


if __name__ == '__main__':
    main()

# vim:ts=4 sw=4 et