mirror of
https://github.com/tuna/tunasync-scripts.git
synced 2025-04-20 04:12:42 +00:00
add sha256 validation
This commit is contained in:
parent
3fd629ef61
commit
f3dfbeab10
102
apt-sync.py
102
apt-sync.py
@ -99,6 +99,51 @@ def mkdir_with_dot_tmp(folder: Path)->Tuple[Path, Path]:
|
|||||||
tmpdir.mkdir(parents=True, exist_ok=True)
|
tmpdir.mkdir(parents=True, exist_ok=True)
|
||||||
return (folder, tmpdir)
|
return (folder, tmpdir)
|
||||||
|
|
||||||
|
def parse_packages_info(content: str, package_info = None, is_new = True):
|
||||||
|
|
||||||
|
if package_info is None:
|
||||||
|
package_info = {}
|
||||||
|
|
||||||
|
for pkg in content.split('\n\n'):
|
||||||
|
if len(pkg) < 10: # ignore blanks
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
pkg_filename = pattern_package_name.search(pkg).group(1)
|
||||||
|
pkg_size = int(pattern_package_size.search(pkg).group(1))
|
||||||
|
pkg_checksum = pattern_package_sha256.search(pkg).group(1)
|
||||||
|
if pkg_filename not in package_info:
|
||||||
|
if is_new:
|
||||||
|
pkg_info = {
|
||||||
|
'size': pkg_size,
|
||||||
|
'sha256': {
|
||||||
|
'new': pkg_checksum,
|
||||||
|
'old': None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
pkg_info = {
|
||||||
|
'size': None,
|
||||||
|
'sha256': {
|
||||||
|
'new': None,
|
||||||
|
'old': pkg_checksum
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
pkg_info = package_info[pkg_filename]
|
||||||
|
if is_new:
|
||||||
|
pkg_info['size'] = pkg_size
|
||||||
|
pkg_info['sha256']['new'] = pkg_checksum
|
||||||
|
else:
|
||||||
|
pkg_info['sha256']['old'] = pkg_checksum
|
||||||
|
package_info.update({
|
||||||
|
pkg_filename: pkg_info
|
||||||
|
})
|
||||||
|
except:
|
||||||
|
print("Failed to parse one package description", flush=True)
|
||||||
|
traceback.print_exc()
|
||||||
|
return package_info
|
||||||
|
return package_info
|
||||||
|
|
||||||
def move_files_in(src: Path, dst: Path):
|
def move_files_in(src: Path, dst: Path):
|
||||||
empty = True
|
empty = True
|
||||||
for file in src.glob('*'):
|
for file in src.glob('*'):
|
||||||
@ -110,6 +155,9 @@ def move_files_in(src: Path, dst: Path):
|
|||||||
print(f"{src} is empty")
|
print(f"{src} is empty")
|
||||||
|
|
||||||
def apt_mirror(base_url: str, dist: str, repo: str, arch: str, dest_base_dir: Path, deb_set: Dict[str, int])->int:
|
def apt_mirror(base_url: str, dist: str, repo: str, arch: str, dest_base_dir: Path, deb_set: Dict[str, int])->int:
|
||||||
|
|
||||||
|
package_info = {}
|
||||||
|
|
||||||
if not dest_base_dir.is_dir():
|
if not dest_base_dir.is_dir():
|
||||||
print("Destination directory is empty, cannot continue")
|
print("Destination directory is empty, cannot continue")
|
||||||
return 1
|
return 1
|
||||||
@ -134,6 +182,7 @@ def apt_mirror(base_url: str, dist: str, repo: str, arch: str, dest_base_dir: Pa
|
|||||||
pkgidx_dir,pkgidx_tmp_dir = mkdir_with_dot_tmp(comp_dir / arch_dir)
|
pkgidx_dir,pkgidx_tmp_dir = mkdir_with_dot_tmp(comp_dir / arch_dir)
|
||||||
with open(release_file, "r") as fd:
|
with open(release_file, "r") as fd:
|
||||||
pkgidx_content=None
|
pkgidx_content=None
|
||||||
|
pkgidx_content_old = None
|
||||||
cnt_start=False
|
cnt_start=False
|
||||||
for line in fd:
|
for line in fd:
|
||||||
if cnt_start:
|
if cnt_start:
|
||||||
@ -146,6 +195,8 @@ def apt_mirror(base_url: str, dist: str, repo: str, arch: str, dest_base_dir: Pa
|
|||||||
filename.startswith(f"Contents-{arch}"):
|
filename.startswith(f"Contents-{arch}"):
|
||||||
fn = Path(filename)
|
fn = Path(filename)
|
||||||
pkgidx_file = dist_dir / fn.parent / ".tmp" / fn.name
|
pkgidx_file = dist_dir / fn.parent / ".tmp" / fn.name
|
||||||
|
if pkgidx_file.stem == 'Packages':
|
||||||
|
pkgidx_file_old = Path(f'{dist_dir}/{filename}')
|
||||||
else:
|
else:
|
||||||
print(f"Ignore the file {filename}")
|
print(f"Ignore the file {filename}")
|
||||||
continue
|
continue
|
||||||
@ -176,12 +227,38 @@ def apt_mirror(base_url: str, dist: str, repo: str, arch: str, dest_base_dir: Pa
|
|||||||
pkgidx_content = content.decode('utf-8')
|
pkgidx_content = content.decode('utf-8')
|
||||||
else:
|
else:
|
||||||
print("unsupported format")
|
print("unsupported format")
|
||||||
|
continue
|
||||||
|
|
||||||
|
package_info = parse_packages_info(pkgidx_content, package_info = package_info, is_new = True)
|
||||||
|
|
||||||
|
if not os.path.exists(pkgidx_file_old):
|
||||||
|
continue
|
||||||
|
|
||||||
|
with pkgidx_file_old.open('rb') as t: content = t.read()
|
||||||
|
if pkgidx_content_old is None and pkgidx_file_old.stem == 'Packages':
|
||||||
|
print(f"getting packages index content from old {pkgidx_file_old.name}", flush=True)
|
||||||
|
suffix = pkgidx_file.suffix
|
||||||
|
if suffix == '.xz':
|
||||||
|
pkgidx_content_old = lzma.decompress(content).decode('utf-8')
|
||||||
|
elif suffix == '.bz2':
|
||||||
|
pkgidx_content_old = bz2.decompress(content).decode('utf-8')
|
||||||
|
elif suffix == '.gz':
|
||||||
|
pkgidx_content_old = gzip.decompress(content).decode('utf-8')
|
||||||
|
elif suffix == '':
|
||||||
|
pkgidx_content_old = content.decode('utf-8')
|
||||||
|
else:
|
||||||
|
print("unsupported format")
|
||||||
|
continue
|
||||||
|
|
||||||
|
package_info = parse_packages_info(pkgidx_content_old, package_info = package_info, is_new = False)
|
||||||
|
|
||||||
|
|
||||||
# Currently only support SHA-256 checksum, because
|
# Currently only support SHA-256 checksum, because
|
||||||
# "Clients may not use the MD5Sum and SHA1 fields for security purposes, and must require a SHA256 or a SHA512 field."
|
# "Clients may not use the MD5Sum and SHA1 fields for security purposes, and must require a SHA256 or a SHA512 field."
|
||||||
# from https://wiki.debian.org/DebianRepository/Format#A.22Release.22_files
|
# from https://wiki.debian.org/DebianRepository/Format#A.22Release.22_files
|
||||||
if line.startswith('SHA256:'):
|
if line.startswith('SHA256:'):
|
||||||
cnt_start = True
|
cnt_start = True
|
||||||
|
|
||||||
if not cnt_start:
|
if not cnt_start:
|
||||||
print("Cannot find SHA-256 checksum")
|
print("Cannot find SHA-256 checksum")
|
||||||
return 1
|
return 1
|
||||||
@ -216,18 +293,13 @@ def apt_mirror(base_url: str, dist: str, repo: str, arch: str, dest_base_dir: Pa
|
|||||||
err = 0
|
err = 0
|
||||||
deb_count = 0
|
deb_count = 0
|
||||||
deb_size = 0
|
deb_size = 0
|
||||||
for pkg in pkgidx_content.split('\n\n'):
|
for pkg_filename, pkg_info in package_info.items():
|
||||||
if len(pkg) < 10: # ignore blanks
|
pkg_size = pkg_info['size']
|
||||||
continue
|
pkg_checksum = pkg_info['sha256']
|
||||||
try:
|
|
||||||
pkg_filename = pattern_package_name.search(pkg).group(1)
|
if pkg_checksum['new'] is None and pkg_size is None:
|
||||||
pkg_size = int(pattern_package_size.search(pkg).group(1))
|
|
||||||
pkg_checksum = pattern_package_sha256.search(pkg).group(1)
|
|
||||||
except:
|
|
||||||
print("Failed to parse one package description", flush=True)
|
|
||||||
traceback.print_exc()
|
|
||||||
err = 1
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
deb_count += 1
|
deb_count += 1
|
||||||
deb_size += pkg_size
|
deb_size += pkg_size
|
||||||
|
|
||||||
@ -237,8 +309,8 @@ def apt_mirror(base_url: str, dist: str, repo: str, arch: str, dest_base_dir: Pa
|
|||||||
dest_dir.mkdir(parents=True, exist_ok=True)
|
dest_dir.mkdir(parents=True, exist_ok=True)
|
||||||
if dest_filename.suffix == '.deb':
|
if dest_filename.suffix == '.deb':
|
||||||
deb_set[str(dest_filename.relative_to(dest_base_dir))] = pkg_size
|
deb_set[str(dest_filename.relative_to(dest_base_dir))] = pkg_size
|
||||||
if dest_filename.is_file() and dest_filename.stat().st_size == pkg_size:
|
if dest_filename.is_file() and dest_filename.stat().st_size == pkg_size and pkg_checksum['old'] == pkg_checksum['new']:
|
||||||
print(f"Skipping {pkg_filename}, size {pkg_size}")
|
print(f"Skipping {pkg_filename}, size {pkg_size}, sha256 {pkg_checksum['new']}")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
pkg_url=f"{base_url}/{pkg_filename}"
|
pkg_url=f"{base_url}/{pkg_filename}"
|
||||||
@ -253,8 +325,8 @@ def apt_mirror(base_url: str, dist: str, repo: str, arch: str, dest_base_dir: Pa
|
|||||||
with dest_tmp_filename.open("rb") as f:
|
with dest_tmp_filename.open("rb") as f:
|
||||||
for block in iter(lambda: f.read(1024**2), b""):
|
for block in iter(lambda: f.read(1024**2), b""):
|
||||||
sha.update(block)
|
sha.update(block)
|
||||||
if sha.hexdigest() != pkg_checksum:
|
if sha.hexdigest() != pkg_checksum['new']:
|
||||||
print(f"Invalid checksum of {dest_filename}, expected {pkg_checksum}")
|
print(f"Invalid checksum of {dest_filename}, expected {pkg_checksum['new']}")
|
||||||
dest_tmp_filename.unlink()
|
dest_tmp_filename.unlink()
|
||||||
continue
|
continue
|
||||||
dest_tmp_filename.rename(dest_filename)
|
dest_tmp_filename.rename(dest_filename)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user