Move add and delete functionality to pkgfiles module, add tests

This commit is contained in:
Eric Torres 2019-03-25 14:20:36 -07:00
parent 9048a55c0f
commit fdc5510c46
4 changed files with 82 additions and 49 deletions

View File

@ -9,7 +9,6 @@ Functions:
import argparse import argparse
import logging import logging
import os.path
import shutil import shutil
import sys import sys
@ -17,6 +16,8 @@ import packaging_scripts.pacmanconf as pacmanconf
import packaging_scripts.pkgfiles as pkgfiles import packaging_scripts.pkgfiles as pkgfiles
import packaging_scripts.repos as repos import packaging_scripts.repos as repos
from pathlib import Path
# ========== Constants ========== # ========== Constants ==========
DB_EXT = "db.tar.xz" DB_EXT = "db.tar.xz"
LOGFORMAT = "==> %(levelname)s %(message)s" LOGFORMAT = "==> %(levelname)s %(message)s"
@ -43,19 +44,7 @@ syslog.addHandler(stdout_handler)
syslog.addHandler(stderr_handler) syslog.addHandler(stderr_handler)
# ========== Functions ========== # ========== Main Script ==========
def add_pkgfile(pkgfile, cachedir):
"""Add package file to the repository directory.
:param pkg: path of package to add
:type pkg: str, bytes, or path-like object
:param cachedir: cache directory to move package to
:type cachedir: str, bytes, or path-like object
"""
syslog.info(f"Adding {pkgfile} to {cachedir}")
shutil.move(pkgfile, os.path.join(cachedir, os.path.basename(pkgfile)))
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument( parser.add_argument(
@ -94,20 +83,9 @@ if __name__ == "__main__":
repo = args.repository repo = args.repository
pkgs = args.packages pkgs = args.packages
if args.opts is None: opts = [] if args.opts is None else args.opts
opts = [] cachedir = Path(args.cachedir) if args.cachedir else Path('/var') / "cache" / "pacman" / repo
else: db = cachedir / f"{args.db_filename}.{DB_EXT}" if args.db_filename else cachedir / f"{repo}.{DB_EXT}"
opts = args.opts
if args.cachedir is not None:
cachedir = args.cachedir
else:
cachedir = os.path.join("/var", "cache", "pacman", repo)
if args.db_filename is not None:
db = os.path.join(cachedir, f"{args.db_filename}.{DB_EXT}")
else:
db = os.path.join(cachedir, f"{repo}.{DB_EXT}")
if args.verbose: if args.verbose:
stdout_handler.setLevel(logging.DEBUG) stdout_handler.setLevel(logging.DEBUG)
@ -131,4 +109,4 @@ if __name__ == "__main__":
exit(E_REPO_ADDERR) exit(E_REPO_ADDERR)
for pkgfile in (*pkg_tarballs, *sigfiles): for pkgfile in (*pkg_tarballs, *sigfiles):
add_pkgfile(pkgfile, cachedir) pkgfiles.add_pkgfile(pkgfile, cachedir)

View File

@ -51,6 +51,7 @@ def del_pkgfile(pkg):
syslog.info(f"Removed {pkg}") syslog.info(f"Removed {pkg}")
# ========== Main Script ==========
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument( parser.add_argument(

View File

@ -1,18 +1,24 @@
"""Helper functions for dealing with package files.""" """Helper functions for dealing with package files."""
import logging
import re import re
import shutil
from pathlib import Path from pathlib import Path
# ========== Constants ========== # ========== Constants ==========
# Match any pkgfile of any name with the .pkg.tar.* extension # Match any pkgfile of any name with the .pkg.tar.* extension
PKGREGEX = "^[\w]+[.]pkg[.]tar([.][\w]+)?$" PKGREGEX = r"^\w+[.]pkg[.]tar([.]\w+)?$"
# Match any sigfile of any name with the .pkg.tar.*.sig extension # Match any sigfile of any name with the .pkg.tar.*.sig extension
SIGREGEX = "^[\w]+[.]pkg[.]tar[.][\w]*[.]sig$" SIGREGEX = r"^\w+[.]pkg[.]tar([.]\w+)?[.]sig$"
# ========== Logging Setup ===========
syslog = logging.getLogger(__name__)
# ========== Functions ========== # ========== Functions ==========
def get_pkgfiles(query=None, *, directory=None, signatures_only=False): def get(query=None, *, directory=None, signatures_only=False):
"""Return a list of package files in the current working directory. """Return a list of package files in the current working directory.
:param query: names of package files to search for :param query: names of package files to search for
@ -33,6 +39,28 @@ def get_pkgfiles(query=None, *, directory=None, signatures_only=False):
yield from _filter_by_regex(filequery, PKGREGEX, path) yield from _filter_by_regex(filequery, PKGREGEX, path)
def add(pkgfile, cachedir):
"""Add package file to the repository directory.
:param pkg: path of package to add
:type pkg: path-like object
:param cachedir: cache directory to move package to
:type cachedir: path-like object
"""
syslog.info(f"Adding {pkgfile} to {cachedir}")
shutil.move(pkgfile, cachedir / pkgfile.name)
def delete(pkg):
"""Remove package file.
:param pkg: path of package to remove
:type pkg: path-like object
"""
pkg.unlink()
syslog.info(f"Removed {pkg}")
def _filter_by_regex(query, regex_expression, path): def _filter_by_regex(query, regex_expression, path):
"""Filter package files only. """Filter package files only.

View File

@ -4,15 +4,15 @@ import unittest
from pathlib import Path from pathlib import Path
from types import GeneratorType from types import GeneratorType
from unittest.mock import patch from unittest.mock import MagicMock, patch
# ========== Constants ========== # ========== Constants ==========
TESTING_MODULE = f"packaging_scripts.pkgfiles" TESTING_MODULE = f"packaging_scripts.pkgfiles"
# Match any pkgfile of any name with the .pkg.tar.* extension # Match any pkgfile of any name with the .pkg.tar.* extension
PKGREGEX = "^[\w]+[.]pkg[.]tar([.][\w]+)?$" PKGREGEX = r"^\w+[.]pkg[.]tar([.]\w+)?$"
# Match any sigfile of any name with the .pkg.tar.*.sig extension # Match any sigfile of any name with the .pkg.tar.*.sig extension
SIGREGEX = '^.+[.]pkg[.]tar[.].+[.]sig$' SIGREGEX = r"^\w+[.]pkg[.]tar([.]\w+)?[.]sig$"
ALL_PKGFILES = [ ALL_PKGFILES = [
Path("pkg1.pkg.tar.xz"), Path("pkg1.pkg.tar.xz"),
@ -20,7 +20,7 @@ ALL_PKGFILES = [
Path("pkg2.pkg.tar.xz"), Path("pkg2.pkg.tar.xz"),
Path("pkg3.pkg.tar.xz"), Path("pkg3.pkg.tar.xz"),
Path("pkg3.pkg.tar.xz.sig"), Path("pkg3.pkg.tar.xz.sig"),
Path('notapkg.sig'), Path("notapkg.sig"),
] ]
@ -31,10 +31,7 @@ def get_all_files(*args):
def filter_files(query): def filter_files(query):
for f in [ for f in [Path("pkg3.pkg.tar.xz"), Path("pkg3.pkg.tar.xz.sig")]:
Path("pkg3.pkg.tar.xz"),
Path("pkg3.pkg.tar.xz.sig"),
]:
yield f yield f
@ -47,7 +44,7 @@ class TestFilterPkgfiles(unittest.TestCase):
self.mocked_glob.side_effect = get_all_files self.mocked_glob.side_effect = get_all_files
def test_yield_no_query(self): def test_yield_no_query(self):
result = pkgfiles.get_pkgfiles() result = pkgfiles.get()
expected = [s for s in get_all_files() if re.match(PKGREGEX, str(s))] expected = [s for s in get_all_files() if re.match(PKGREGEX, str(s))]
self.assertListEqual(list(result), expected) self.assertListEqual(list(result), expected)
@ -56,7 +53,7 @@ class TestFilterPkgfiles(unittest.TestCase):
def test_yield_with_query(self): def test_yield_with_query(self):
self.mocked_glob.side_effect = filter_files self.mocked_glob.side_effect = filter_files
result = pkgfiles.get_pkgfiles('pkg3') result = pkgfiles.get("pkg3")
expected = [Path("pkg3.pkg.tar.xz")] expected = [Path("pkg3.pkg.tar.xz")]
self.assertListEqual(list(result), expected) self.assertListEqual(list(result), expected)
@ -74,7 +71,7 @@ class TestFilterSigfiles(unittest.TestCase):
self.mocked_glob.side_effect = get_all_files self.mocked_glob.side_effect = get_all_files
def test_yield_no_query(self): def test_yield_no_query(self):
result = pkgfiles.get_pkgfiles(signatures_only=True) result = pkgfiles.get(signatures_only=True)
expected = [s for s in get_all_files() if re.match(SIGREGEX, str(s))] expected = [s for s in get_all_files() if re.match(SIGREGEX, str(s))]
self.assertListEqual(list(result), expected) self.assertListEqual(list(result), expected)
@ -83,7 +80,7 @@ class TestFilterSigfiles(unittest.TestCase):
def test_yield_with_query(self): def test_yield_with_query(self):
self.mocked_glob.side_effect = filter_files self.mocked_glob.side_effect = filter_files
result = pkgfiles.get_pkgfiles('pkg3', signatures_only=True) result = pkgfiles.get("pkg3", signatures_only=True)
expected = [Path("pkg3.pkg.tar.xz.sig")] expected = [Path("pkg3.pkg.tar.xz.sig")]
self.assertListEqual(list(result), expected) self.assertListEqual(list(result), expected)
@ -91,3 +88,32 @@ class TestFilterSigfiles(unittest.TestCase):
def tearDown(self): def tearDown(self):
self.patched_path.stop() self.patched_path.stop()
class TestAddPkgfile(unittest.TestCase):
def setUp(self):
self.patched_shutil = patch(f"{TESTING_MODULE}.shutil")
self.mocked_shutil = self.patched_shutil.start()
self.pkgfile = Path("/home/user/pkgfile")
self.cachedir = Path("/var/cache/repo")
def test_pkgfile_path(self):
pkgfiles.add(self.pkgfile, self.cachedir)
self.mocked_shutil.move.assert_called_with(
self.pkgfile, Path("/var/cache/repo/pkgfile")
)
def tearDown(self):
self.patched_shutil.stop()
class TestDeletePkgfiles(unittest.TestCase):
def setUp(self):
self.pkgfile = MagicMock("/var/cache/repo/pkgfile", spec_set=Path)
def test_delete(self):
pkgfiles.delete(self.pkgfile)
self.pkgfile.unlink.assert_called()