From 29132f5a58b2d38dad2242a207218cadc9e85d07 Mon Sep 17 00:00:00 2001 From: Jean-Marie Mineau Date: Wed, 14 May 2025 15:39:44 +0200 Subject: [PATCH] wip --- experiment/frida_collect.sh | 10 +- experiment/orchestrator.py | 8 +- frida/theseus_frida/__init__.py | 282 ++++++++++++++++---------------- 3 files changed, 160 insertions(+), 140 deletions(-) diff --git a/experiment/frida_collect.sh b/experiment/frida_collect.sh index 1ea985a..ed9d7e4 100644 --- a/experiment/frida_collect.sh +++ b/experiment/frida_collect.sh @@ -2,4 +2,12 @@ FOLDER=$(dirname "$(realpath $0)") -"${FOLDER}/venv/bin/collect-runtime-data" --apk "${1}" --device "${2}" --output "${3}/data.json" --dex-dir "${3}" +APK="${1}" +DEVICE="${2}" +OUT_DIR="${3}" + +echo "APK=${APK}" +echo "DEVICE=${DEVICE}" +echo "OUT_DIR=${OUT_DIR}" + +"${FOLDER}/venv/bin/collect-runtime-data" --apk "${APK}" --device "${DEVICE}" --output "${OUT_DIR}/data.json" --dex-dir "${3}" diff --git a/experiment/orchestrator.py b/experiment/orchestrator.py index ccaeb9a..1324aa7 100644 --- a/experiment/orchestrator.py +++ b/experiment/orchestrator.py @@ -6,7 +6,7 @@ import subprocess import threading import argparse -EMULATORS = [f"root34-{i}" for i in range(20)] +EMULATORS = [f"root34-{i}" for i in range(1)] ANDROID_IMG = "system-images;android-34;default;x86_64" if "ANDROID_HOME" in os.environ: @@ -105,6 +105,8 @@ def del_emulators(): def worker(emu: str, apklist: list[str], out_folder: Path, script: Path): console_port, adb_port = get_ports(emu) + script_env = os.environ.copy() + script_env["ANDROID_HOME"] = str(ANDROID_HOME) while apklist: apk = apklist.pop() folder_name = apk.split("/")[-1].removesuffix(".apk") @@ -127,10 +129,12 @@ def worker(emu: str, apklist: list[str], out_folder: Path, script: Path): f"{console_port},{adb_port}", ] ) + subprocess.run([ADB, "-s", f"emulator-{console_port}", "wait-for-device"]) # Run script subprocess.run( - ["bash", str(script), f"emulator-{console_port}", apk, str(out_folder)] + ["bash", str(script), apk, f"emulator-{console_port}", str(folder)], + env=script_env, ) # stop emulator diff --git a/frida/theseus_frida/__init__.py b/frida/theseus_frida/__init__.py index edb516b..5371515 100644 --- a/frida/theseus_frida/__init__.py +++ b/frida/theseus_frida/__init__.py @@ -341,54 +341,6 @@ def collect_runtime( android_sdk_path: Path | None = None, apk_explorer: None | Callable[[], None] = None, ): - env = dict(os.environ) - - if adb_path is not None: - adb = str(adb_path) - elif adb_path is None and android_sdk_path is None: - adb = "adb" - elif not (android_sdk_path / "platform-tools" / "adb").exists(): - adb = "adb" - else: - adb = str(android_sdk_path / "platform-tools" / "adb") - - if not file_storage.exists(): - file_storage.mkdir(parents=True) - if not file_storage.is_dir(): - print("[!] file_storage must be a directory") - exit() - - device = setup_frida(device_name, env, adb) - - app = get_apkid(apk)[0] - - if device.enumerate_applications([app]): - # Uninstall the APK if it already exist - subprocess.run([adb, "uninstall", app], env=env) - subprocess.run([adb, "install", "-g", str(apk.absolute())], env=env) - - with FRIDA_SCRIPT.open("r") as file: - jsscript = file.read() - with STACK_CONSUMER_B64.open("r") as file: - jsscript = jsscript.replace( - "", - file.read().replace("\n", "").strip(), - ) - - pid = device.spawn([app]) - session = device.attach(pid) - try: - script = session.create_script(jsscript) - except frida.InvalidArgumentError as e: - print("[!] Error:") - print( - " " - + "\n ".join( - map(lambda v: f"{v[0]+1: 3} {v[1]}", enumerate(script.split("\n"))) - ) - ) - raise e - data_storage: dict[str, Any] = { "invoke_data": [], "class_new_inst_data": [], @@ -397,103 +349,159 @@ def collect_runtime( "classloaders": {}, "app_info": {}, } + try: + env = dict(os.environ) - script.on( - "message", - lambda msg, data: on_message(msg, data, data_storage, file_storage), - ) + if adb_path is not None: + adb = str(adb_path) + elif adb_path is None and android_sdk_path is None: + adb = "adb" + elif not (android_sdk_path / "platform-tools" / "adb").exists(): + adb = "adb" + else: + adb = str(android_sdk_path / "platform-tools" / "adb") - # Load script - script.load() - # Resume the execution of the APK - device.resume(pid) + if not file_storage.exists(): + file_storage.mkdir(parents=True) + if not file_storage.is_dir(): + print("[!] file_storage must be a directory") + exit() - # Dump all known classloaders - # Don't wait for confirmation that all cl were sended - # global CLASSLOADER_DONE - # CLASSLOADER_DONE = False - # script.post({"type": "dump-class-loaders"}) - # t = spinner() - # while not CLASSLOADER_DONE: - # print( - # f"[{t.__next__()}] Waiting for the list of classloaders to be sent", - # end="\r", - # ) - # time.sleep(0.3) - # print(f"[*] Classloader list received" + " " * 20) + device = setup_frida(device_name, env, adb) - if apk_explorer is None: - exploration_data = explore_app( - app, device=device.id, android_sdk=android_sdk_path - ) - else: - exploration_data = apk_explorer() + app = get_apkid(apk)[0] - # Try to find the Main class loader - main_class_loader: str | None = None - # cls = {d["id"]: d for d in data_storage["classloaders"]} - # for load_data in data_storage["dyn_code_load"]: - # if load_data["classloader"] in cls: - # del cls[load_data["classloader"]] - # for id_ in list(cls.keys()): - # if ( - # 'dalvik.system.PathClassLoader[DexPathList[[directory "."],' - # in cls[id_]["str"] - # ): - # del cls[id_] - # elif cls[id_]["cname"] == "java.lang.BootClassLoader": - # del cls[id_] - cls = {} - for cl in data_storage["classloaders"].values(): - # This is verry doubious - if cl["cname"] == "Ldalvik/system/PathClassLoader;": - zip_files = list( - map( - lambda s: s.removeprefix('zip file "').removesuffix('"'), - filter( - lambda s: s.startswith('zip file "'), - ( - w - for b in cl["str"].split("]") - for a in b.split("[") - for w in a.split(",") - ), - ), + if device.enumerate_applications([app]): + # Uninstall the APK if it already exist + subprocess.run([adb, "uninstall", app], env=env) + subprocess.run([adb, "install", "-g", str(apk.absolute())], env=env) + + with FRIDA_SCRIPT.open("r") as file: + jsscript = file.read() + with STACK_CONSUMER_B64.open("r") as file: + jsscript = jsscript.replace( + "", + file.read().replace("\n", "").strip(), + ) + + pid = device.spawn([app]) + session = device.attach(pid) + try: + script = session.create_script(jsscript) + except frida.InvalidArgumentError as e: + print("[!] Error:") + print( + " " + + "\n ".join( + map(lambda v: f"{v[0]+1: 3} {v[1]}", enumerate(script.split("\n"))) ) ) - if len(zip_files) == 1: - zip_path = Path(zip_files[0]) - if ( - len(zip_path.parts) == 6 - and zip_path.parts[0] == "/" - and zip_path.parts[1] == "data" - and zip_path.parts[2] == "app" - and zip_path.parts[4].startswith(app + "-") - and zip_path.parts[5] == "base.apk" - ): - cls[cl["id"]] = cl - if len(cls) == 0: - print("[!] No classloader found for the main APK") - elif len(cls) > 1: - print( - "[!] Multiple classloader found that could be the main APK, try to guess the right one" - ) - nb_occ = {k: 0 for k in cls.keys()} - for data in data_storage["class_new_inst_data"]: - if data["caller_cl_id"] in nb_occ: - nb_occ[data["caller_cl_id"]] += 1 - for data in data_storage["invoke_data"]: - if data["caller_cl_id"] in nb_occ: - nb_occ[data["caller_cl_id"]] += 1 - for data in data_storage["cnstr_new_inst_data"]: - if data["caller_cl_id"] in nb_occ: - nb_occ[data["caller_cl_id"]] += 1 - main_class_loader = max(cls.keys(), key=lambda x: nb_occ[x]) - else: - main_class_loader = list(cls.keys())[0] - data_storage["apk_cl_id"] = main_class_loader - data_storage["exploration_data"] = exploration_data + raise e + script.on( + "message", + lambda msg, data: on_message(msg, data, data_storage, file_storage), + ) + + # Load script + script.load() + # Resume the execution of the APK + device.resume(pid) + + # Dump all known classloaders + # Don't wait for confirmation that all cl were sended + # global CLASSLOADER_DONE + # CLASSLOADER_DONE = False + # script.post({"type": "dump-class-loaders"}) + # t = spinner() + # while not CLASSLOADER_DONE: + # print( + # f"[{t.__next__()}] Waiting for the list of classloaders to be sent", + # end="\r", + # ) + # time.sleep(0.3) + # print(f"[*] Classloader list received" + " " * 20) + + if apk_explorer is None: + exploration_data = explore_app( + app, device=device.id, android_sdk=android_sdk_path + ) + else: + exploration_data = apk_explorer() + + # Try to find the Main class loader + main_class_loader: str | None = None + # cls = {d["id"]: d for d in data_storage["classloaders"]} + # for load_data in data_storage["dyn_code_load"]: + # if load_data["classloader"] in cls: + # del cls[load_data["classloader"]] + # for id_ in list(cls.keys()): + # if ( + # 'dalvik.system.PathClassLoader[DexPathList[[directory "."],' + # in cls[id_]["str"] + # ): + # del cls[id_] + # elif cls[id_]["cname"] == "java.lang.BootClassLoader": + # del cls[id_] + cls = {} + for cl in data_storage["classloaders"].values(): + # This is verry doubious + if cl["cname"] == "Ldalvik/system/PathClassLoader;": + zip_files = list( + map( + lambda s: s.removeprefix('zip file "').removesuffix('"'), + filter( + lambda s: s.startswith('zip file "'), + ( + w + for b in cl["str"].split("]") + for a in b.split("[") + for w in a.split(",") + ), + ), + ) + ) + if len(zip_files) == 1: + zip_path = Path(zip_files[0]) + if ( + len(zip_path.parts) == 6 + and zip_path.parts[0] == "/" + and zip_path.parts[1] == "data" + and zip_path.parts[2] == "app" + and zip_path.parts[4].startswith(app + "-") + and zip_path.parts[5] == "base.apk" + ): + cls[cl["id"]] = cl + if len(cls) == 0: + print("[!] No classloader found for the main APK") + elif len(cls) > 1: + print( + "[!] Multiple classloader found that could be the main APK, try to guess the right one" + ) + nb_occ = {k: 0 for k in cls.keys()} + for data in data_storage["class_new_inst_data"]: + if data["caller_cl_id"] in nb_occ: + nb_occ[data["caller_cl_id"]] += 1 + for data in data_storage["invoke_data"]: + if data["caller_cl_id"] in nb_occ: + nb_occ[data["caller_cl_id"]] += 1 + for data in data_storage["cnstr_new_inst_data"]: + if data["caller_cl_id"] in nb_occ: + nb_occ[data["caller_cl_id"]] += 1 + main_class_loader = max(cls.keys(), key=lambda x: nb_occ[x]) + else: + main_class_loader = list(cls.keys())[0] + data_storage["apk_cl_id"] = main_class_loader + data_storage["exploration_data"] = exploration_data + + except Exception as e: + import traceback + + data_storage["error"] = { + "msg": str(e), + "all": traceback.format_exception(e), + } + print(traceback.format_exception(e)) json.dump(data_storage, output, indent=" ")