This commit is contained in:
Jean-Marie Mineau 2024-10-28 14:58:27 +01:00
parent 48ec30204d
commit 6142719403
4 changed files with 68 additions and 24 deletions

View file

@ -8,7 +8,7 @@ from pathlib import Path
from getpass import getpass from getpass import getpass
from .androzoo import download_apk from .androzoo import download_apk
from .data import ApkData from .data import ApkData, load_from_directory
from .analysis import analyze from .analysis import analyze
@ -32,6 +32,11 @@ def main():
help="A file containing a list of application sha256s (one by line)", help="A file containing a list of application sha256s (one by line)",
type=Path, type=Path,
) )
apk_parser.add_argument(
"--logfile",
help="A file to store logs",
type=Path,
)
key_parser = parser.add_mutually_exclusive_group(required=False) key_parser = parser.add_mutually_exclusive_group(required=False)
key_parser.add_argument( key_parser.add_argument(
"--api-key-file", "--api-key-file",
@ -67,7 +72,18 @@ def main():
pass pass
args = parser.parse_args() args = parser.parse_args()
if args.logfile is not None:
logfile = args.logfile
elif args.apk_list is not None:
logfile = Path(".") / f"scan-{args.apk_list.name}.log"
elif args.sha256_list is not None:
logfile = Path(".") / f"scan-{args.sha256_list.name}.log"
else:
logfile = None
if args.output_dir: if args.output_dir:
if not args.output_dir.exists():
args.output_dir.mkdir(parents=True)
if not args.output_dir.is_dir(): if not args.output_dir.is_dir():
raise RuntimeError("--output-dir must be a directory") raise RuntimeError("--output-dir must be a directory")
args.output_dir.mkdir(parents=True, exist_ok=True) args.output_dir.mkdir(parents=True, exist_ok=True)
@ -142,7 +158,9 @@ def main():
for sha256 in sha256s: for sha256 in sha256s:
if args.output_dir and (args.output_dir / sha256).exists(): if args.output_dir and (args.output_dir / sha256).exists():
continue continue
with zipfile.ZipFile(io.BytesIO(download_apk(sha256, api_key))) as apk: with zipfile.ZipFile(
io.BytesIO(download_apk(sha256, api_key, logfile=logfile))
) as apk:
entry = analyze(apk, sha256, verbose=args.pprint) entry = analyze(apk, sha256, verbose=args.pprint)
if args.pprint: if args.pprint:
pprint.pprint(entry) pprint.pprint(entry)
@ -150,7 +168,12 @@ def main():
print(entry.to_string()) print(entry.to_string())
else: else:
with (args.output_dir / sha256).open("w") as file: with (args.output_dir / sha256).open("w") as file:
file.write(entry) file.write(entry.to_string())
if args.output_dir:
if args.sha256_list:
print(f"finished {args.sha256_list}")
else:
print(f"finished {args.sha256}")
def collect_to_db(): def collect_to_db():
@ -158,7 +181,6 @@ def collect_to_db():
prog="Android Class Shadowing Scan Collector", prog="Android Class Shadowing Scan Collector",
description="Collect Scan results into a database", description="Collect Scan results into a database",
) )
apk_parser = parser.add_mutually_exclusive_group(required=True)
parser.add_argument( parser.add_argument(
"--dir", "--dir",
help="The directory where the scan results are", help="The directory where the scan results are",
@ -173,7 +195,7 @@ def collect_to_db():
) )
parser.add_argument( parser.add_argument(
"--androzoo-list", "--androzoo-list",
help="The file 'latest.csv' or 'latest.csv.gz' from androzoo", help="The file 'latest_with-added-date.csv' or 'latest_with-added-date.csv.gz' from androzoo",
type=Path, type=Path,
required=False, required=False,
) )

View file

@ -222,7 +222,7 @@ def analyze(apk: zipfile.ZipFile, sha256: str, verbose: bool = False) -> ApkData
apk.namelist(), apk.namelist(),
) )
) )
dex_numbers = list( dex_numbers = set(
map( map(
int, int,
filter( filter(
@ -236,7 +236,6 @@ def analyze(apk: zipfile.ZipFile, sha256: str, verbose: bool = False) -> ApkData
), ),
) )
) )
dex_numbers.sort()
has_non_numeric_classes_dex = False has_non_numeric_classes_dex = False
for name in classes_dex: for name in classes_dex:
@ -246,14 +245,21 @@ def analyze(apk: zipfile.ZipFile, sha256: str, verbose: bool = False) -> ApkData
has_non_numeric_classes_dex = True has_non_numeric_classes_dex = True
has_non_consecutive_classes_dex = False has_non_consecutive_classes_dex = False
if "classes.dex" in classes_dex and dex_numbers: dex_numbers.discard(1)
has_non_consecutive_classes_dex = True dex_numbers.discard(0)
last_number = 1 if dex_numbers:
for i in range(len(dex_numbers)): max_dex_num = max(dex_numbers)
if dex_numbers[i] == 0 or dex_numbers[i] == 1: else:
continue if "classes.dex" in classes_dex:
# the list is sorted max_dex_num = 1
if dex_numbers[i] != last_number + 1: else:
max_dex_num = 0
for i in range(max_dex_num):
if i == 0:
name = "classes.dex"
else:
name = f"classes{i+1}.dex"
if name not in classes_dex:
has_non_consecutive_classes_dex = True has_non_consecutive_classes_dex = True
break break

View file

@ -1,10 +1,26 @@
import http.client import http.client
import time
import random
from datetime import datetime
from pathlib import Path
def download_apk(
    sha256: str,
    api_key: str,
    logfile: Path | None = None,
    max_retries: int | None = None,
) -> bytes:
    """Download an APK from AndroZoo, retrying after a random delay on failure.

    Args:
        sha256: The sha256 of the application to download.
        api_key: The AndroZoo API key sent with the request.
        logfile: When set, failure messages are appended to this file;
            otherwise they are printed to stdout.
        max_retries: Maximum number of retries after the first failed
            attempt. ``None`` (the default) retries indefinitely.

    Returns:
        The raw body of the HTTP response.

    Raises:
        Exception: The error of the last attempt, only when ``max_retries``
            is set and has been exhausted.
    """
    failures = 0
    while True:
        conn = None
        try:
            conn = http.client.HTTPSConnection("androzoo.uni.lu")
            conn.request("GET", f"/api/download?apikey={api_key}&sha256={sha256}")
            resp = conn.getresponse()
            if resp.status != 200:
                raise RuntimeError(f"Failled to download APK {sha256}: {resp.reason}")
            # The body is fully read before the `finally` clause closes the
            # connection.
            return resp.read()
        except Exception as e:
            failures += 1
            if max_retries is not None and failures > max_retries:
                # Retry budget exhausted: surface the last error to the caller
                # instead of logging a retry that will never happen.
                raise
            delay = random.randint(1, 6)
            log = f"[{datetime.today().strftime('%Y-%m-%d %H:%M:%S')}] Failed to download {sha256}: {e}, retry in {delay}s"
            if logfile:
                with logfile.open("a") as file:
                    file.write(f"{log}\n")
            else:
                print(log)
        finally:
            # Every attempt opens a new connection; close it so that repeated
            # retries do not accumulate open sockets.
            if conn is not None:
                conn.close()
        time.sleep(delay)

View file

@ -60,8 +60,8 @@ def load_from_directory(
for row in reader: for row in reader:
if row["sha256"] in androzoo_data: if row["sha256"] in androzoo_data:
androzoo_data[row["sha256"]] = ( androzoo_data[row["sha256"]] = (
int(row["first_seen_year"]), int(row["added"].split("-")[0]), # not worth parsing the date
int(row["vt_detection"]), int(row["vt_detection"]) if row["vt_detection"] else -1,
) )
def data_it(): def data_it():