dump data in json

This commit is contained in:
Jean-Marie Mineau 2024-10-30 11:50:40 +01:00
parent 0d524d6a3e
commit 3d7764d958
2 changed files with 45 additions and 69 deletions

View file

@ -1,7 +1,7 @@
import zipfile import zipfile
import io import io
import hashlib import hashlib
import pprint import json
from argparse import ArgumentParser from argparse import ArgumentParser
from pathlib import Path from pathlib import Path
@ -53,8 +53,8 @@ def main():
type=Path, type=Path,
) )
parser.add_argument( parser.add_argument(
"--pprint", "--json",
help="Print the result with pprint and add more information when available", help="Print the results in json format with additionnal data",
action="store_true", action="store_true",
) )
@ -82,6 +82,8 @@ def main():
else: else:
logfile = None logfile = None
json_data = {}
if args.output_dir: if args.output_dir:
if not args.output_dir.exists(): if not args.output_dir.exists():
args.output_dir.mkdir(parents=True) args.output_dir.mkdir(parents=True)
@ -107,12 +109,15 @@ def main():
if args.output_dir and (args.output_dir / sha256).exists(): if args.output_dir and (args.output_dir / sha256).exists():
continue continue
if args.pprint:
print(f"APK: {str(apk_path)}")
with apk_path.open("rb") as file: with apk_path.open("rb") as file:
with zipfile.ZipFile(file) as apk: with zipfile.ZipFile(file) as apk:
try: try:
entry = analyze(apk, sha256, verbose=args.pprint) if args.json:
json_data[sha256] = {"file": str(apk_path)}
json_out = json_data[sha256]
else:
json_out = None
entry = analyze(apk, sha256, json_out=json_out)
except Exception as e: except Exception as e:
log = f"[{datetime.today().strftime('%Y-%m-%d %H:%M:%S')}] Failed to analyzed {sha256}: {e}, abort" log = f"[{datetime.today().strftime('%Y-%m-%d %H:%M:%S')}] Failed to analyzed {sha256}: {e}, abort"
if logfile: if logfile:
@ -121,8 +126,6 @@ def main():
else: else:
print(log) print(log)
continue continue
if args.pprint:
pprint.pprint(entry)
if not args.output_dir: if not args.output_dir:
print(entry.to_string()) print(entry.to_string())
else: else:
@ -130,6 +133,8 @@ def main():
file.write(entry) file.write(entry)
if apks: if apks:
if args.json:
print(json.dumps(json_data))
exit() exit()
# Case 2: apk from SHA256 # Case 2: apk from SHA256
@ -173,7 +178,11 @@ def main():
continue continue
with zipfile.ZipFile(io.BytesIO(apk_bin)) as apk: with zipfile.ZipFile(io.BytesIO(apk_bin)) as apk:
try: try:
entry = analyze(apk, sha256, verbose=args.pprint) if args.json:
json_out = json_data[sha256]
else:
json_out = None
entry = analyze(apk, sha256, json_out=json_out)
except Exception as e: except Exception as e:
log = f"[{datetime.today().strftime('%Y-%m-%d %H:%M:%S')}] Failed to analyzed {sha256}: {e}, abort" log = f"[{datetime.today().strftime('%Y-%m-%d %H:%M:%S')}] Failed to analyzed {sha256}: {e}, abort"
if logfile: if logfile:
@ -183,8 +192,6 @@ def main():
print(log) print(log)
continue continue
if args.pprint:
pprint.pprint(entry)
if not args.output_dir: if not args.output_dir:
print(entry.to_string()) print(entry.to_string())
else: else:
@ -195,6 +202,8 @@ def main():
print(f"finished {args.sha256_list}") print(f"finished {args.sha256_list}")
else: else:
print(f"finished {args.sha256}") print(f"finished {args.sha256}")
if args.json:
print(json.dumps(json_data))
def collect_to_db(): def collect_to_db():

View file

@ -78,7 +78,7 @@ class PlatformClassesData:
def scan_classes( def scan_classes(
apk: zipfile.ZipFile, file_names: set[str], verbose: bool = False apk: zipfile.ZipFile, file_names: set[str], json_out: dict | None = None
) -> PlatformClassesData: ) -> PlatformClassesData:
all_classes = set() all_classes = set()
duplicated_classes = set() duplicated_classes = set()
@ -144,61 +144,8 @@ def scan_classes(
and ty_name not in D8_CLASSES and ty_name not in D8_CLASSES
): ):
ref_platform_non_sdk_34_classes.add(ty_name) ref_platform_non_sdk_34_classes.add(ty_name)
if verbose:
if duplicated_classes:
print("Duplicated classes:")
for cl in duplicated_classes:
print(f" {cl}")
if platform_32_classes:
print("Redefined Platform Classes (v32):")
for cl in platform_32_classes:
print(f" {cl}")
if sdk_32_classes:
print("Redefined SDK Classes (v32):")
for cl in sdk_32_classes:
print(f" {cl}")
if platform_non_sdk_32_classes:
print("Redefined non-SDK Platform Classes (v32):")
for cl in platform_non_sdk_32_classes:
print(f" {cl}")
if ref_platform_non_sdk_32_classes:
print("Reference to non-SDK Platform Classes (v32):")
for cl in ref_platform_non_sdk_32_classes:
print(f" {cl}")
if platform_33_classes:
print("Redefined Platform Classes (v33):")
for cl in platform_33_classes:
print(f" {cl}")
if sdk_33_classes:
print("Redefined SDK Classes (v33):")
for cl in sdk_33_classes:
print(f" {cl}")
if platform_non_sdk_33_classes:
print("Redefined non-SDK Platform Classes (v33):")
for cl in platform_non_sdk_33_classes:
print(f" {cl}")
if ref_platform_non_sdk_33_classes:
print("Reference to non-SDK Platform Classes (v33):")
for cl in ref_platform_non_sdk_33_classes:
print(f" {cl}")
if platform_34_classes:
print("Redefined Platform Classes (v34):")
for cl in platform_34_classes:
print(f" {cl}")
if sdk_34_classes:
print("Redefined SDK Classes (v34):")
for cl in sdk_34_classes:
print(f" {cl}")
if platform_non_sdk_34_classes:
print("Redefined non-SDK Platform Classes (v34):")
for cl in platform_non_sdk_34_classes:
print(f" {cl}")
if ref_platform_non_sdk_34_classes:
print("Reference to non-SDK Platform Classes (v34):")
for cl in ref_platform_non_sdk_34_classes:
print(f" {cl}")
return PlatformClassesData( entry = PlatformClassesData(
nb_duplicate_classes=len(duplicated_classes), nb_duplicate_classes=len(duplicated_classes),
nb_def_platform_32_classes=len(platform_32_classes), nb_def_platform_32_classes=len(platform_32_classes),
nb_def_platform_non_sdk_32_classes=len(platform_non_sdk_32_classes), nb_def_platform_non_sdk_32_classes=len(platform_non_sdk_32_classes),
@ -213,9 +160,25 @@ def scan_classes(
nb_def_sdk_34_classes=len(sdk_34_classes), nb_def_sdk_34_classes=len(sdk_34_classes),
nb_ref_platform_non_sdk_34_classes=len(ref_platform_non_sdk_34_classes), nb_ref_platform_non_sdk_34_classes=len(ref_platform_non_sdk_34_classes),
) )
if json_out is not None:
data = json_out
data["duplicated_classes"] = list(duplicated_classes)
data["platform_32_classes"] = list(platform_32_classes)
data["sdk_32_classes"] = list(sdk_32_classes)
data["platform_non_sdk_32_classes"] = list(platform_non_sdk_32_classes)
data["ref_platform_non_sdk_32_classes"] = list(ref_platform_non_sdk_32_classes)
data["platform_33_classes"] = list(platform_33_classes)
data["sdk_33_classes"] = list(sdk_33_classes)
data["platform_non_sdk_33_classes"] = list(platform_non_sdk_33_classes)
data["ref_platform_non_sdk_33_classes"] = list(ref_platform_non_sdk_33_classes)
data["platform_34_classes"] = list(platform_34_classes)
data["sdk_34_classes"] = list(sdk_34_classes)
data["platform_non_sdk_34_classes"] = list(platform_non_sdk_34_classes)
data["ref_platform_non_sdk_34_classes"] = list(ref_platform_non_sdk_34_classes)
return entry
def analyze(apk: zipfile.ZipFile, sha256: str, verbose: bool = False) -> ApkData: def analyze(apk: zipfile.ZipFile, sha256: str, json_out: dict | None = None) -> ApkData:
classes_dex = set( classes_dex = set(
filter( filter(
lambda name: name.startswith("classes") and name.endswith(".dex"), lambda name: name.startswith("classes") and name.endswith(".dex"),
@ -263,9 +226,9 @@ def analyze(apk: zipfile.ZipFile, sha256: str, verbose: bool = False) -> ApkData
has_non_consecutive_classes_dex = True has_non_consecutive_classes_dex = True
break break
platform_classes_data = scan_classes(apk, classes_dex, verbose=verbose) platform_classes_data = scan_classes(apk, classes_dex, json_out=json_out)
return ApkData( entry = ApkData(
sha256=sha256, sha256=sha256,
**asdict(platform_classes_data), **asdict(platform_classes_data),
has_classes0_dex="classes0.dex" in classes_dex, has_classes0_dex="classes0.dex" in classes_dex,
@ -277,3 +240,7 @@ def analyze(apk: zipfile.ZipFile, sha256: str, verbose: bool = False) -> ApkData
has_non_numeric_classes_dex=has_non_numeric_classes_dex, has_non_numeric_classes_dex=has_non_numeric_classes_dex,
has_non_consecutive_classes_dex=has_non_consecutive_classes_dex, has_non_consecutive_classes_dex=has_non_consecutive_classes_dex,
) )
if json_out is not None:
json_out["entry"] = asdict(entry)
json_out["class_dex"] = list(classes_dex)
return entry