"""Collect runtime reflection and dynamic code loading data from an Android app.

The module spawns the target application under Frida, injects ``hook.js`` and
records the messages the script sends back (reflective method invocations,
reflective instantiations, loaded bytecode files and the initial class
loaders). The collected data is dumped as JSON.
"""

import argparse
import base64
import hashlib
import json
import lzma
import os
import shutil
import subprocess
import sys
import tempfile
import time
from pathlib import Path
from typing import Any, TextIO

import frida  # type: ignore
from androguard.core.apk import get_apkid  # type: ignore
from loguru import logger  # type: ignore

logger.remove()  # remove androguard logs

FRIDA_SCRIPT = Path(__file__).parent / "hook.js"
STACK_CONSUMER_B64 = Path(__file__).parent / "StackConsumer.dex.b64"

# The number of bytes used to encode a java hash (from Object.hashCode or
# System.identityHashCode).
# The type is 'int', so it should be a 32-bit signed value?
HASH_NB_BYTES = 4

# Reflective calls to these methods are not recorded (see handle_invoke_data).
_IGNORED_INVOKED_METHODS = (
    "Landroid/view/View;->getTranslationZ()F",
    "Landroid/view/View;->getElevation()F",
)


def spinner(symbs: str = "-\\|/"):
    """Yield the characters of ``symbs`` one by one, cycling forever."""
    while True:
        yield from symbs


# Define handler to event generated by the scripts
def on_message(message, data, data_storage: dict, file_storage: Path) -> None:
    """Dispatch one message received from the Frida script.

    Args:
        message: The message dict received from Frida. ``"error"`` messages are
            printed; ``"send"`` messages are routed on ``payload["type"]``.
        data: Binary data attached to the message (unused).
        data_storage: The dict in which the handlers accumulate their results.
        file_storage: The directory where dynamically loaded bytecode is saved.
    """
    msg_type = message.get("type")
    if msg_type == "error":
        print(f"[!] {message['description']}")
        # Not every error is guaranteed to carry a stack trace.
        stack = message.get("stack")
        if stack:
            print(" " + stack.replace("\n", "\n "))
        return

    payload = message.get("payload") if msg_type == "send" else None
    # A script may send a non-dict payload: treat it as an unknown message
    # instead of raising inside the Frida callback.
    kind = payload.get("type") if isinstance(payload, dict) else None

    if kind == "invoke":
        handle_invoke_data(payload["data"], data_storage)
    elif kind == "class-new-inst":
        handle_class_new_inst_data(payload["data"], data_storage)
    # NOTE(review): "isnt" looks like a typo for "inst", but the tag must match
    # what hook.js sends -- confirm before renaming.
    elif kind == "cnstr-new-isnt":
        handle_cnstr_new_inst_data(payload["data"], data_storage)
    elif kind == "load-dex":
        handle_load_dex(payload["data"], data_storage, file_storage)
    elif kind == "apk-cl":
        handle_classloader_data(payload["data"], data_storage)
    else:
        print("[-] message:", message)


def print_stack(stack, prefix: str) -> None:
    """Print every frame of ``stack``, one per line, prefixed with ``prefix``."""
    for frame in stack:
        native = " (native)" if frame["is_native"] else ""
        print(f" {prefix}{frame['method']}:{frame['bytecode_index']}{native}")


def _report_call(kind: str, called: str, caller_cl_id, caller_method, addr) -> None:
    """Print the summary of one reflective call reported by the script."""
    print(f"[+] {kind}:")
    print(f" called: {called}")
    print(f" by: [{caller_cl_id}]{caller_method}")
    print(f" at: 0x{addr:08x}")


def handle_classloader_data(data: dict, data_storage: dict) -> None:
    """Record a class loader reported by the script at startup."""
    data_storage["initial_classloaders"].append(data)


def handle_invoke_data(data, data_storage: dict) -> None:
    """Record a reflective method invocation.

    The caller is taken from the first frame of ``data["stack"]``. Events with
    an empty stack or for an ignored method are dropped silently; events with
    a negative bytecode index are printed but not stored.
    """
    method = data["method"]
    method_cl_id = data["method_cl_id"]
    # TODO: good idea?
    if method in _IGNORED_INVOKED_METHODS:
        return
    if not data["stack"]:
        return

    caller = data["stack"][0]
    caller_method = caller["method"]
    caller_cl_id = caller["cl_id"]
    addr = caller["bytecode_index"]
    is_static = data["is_static"]
    is_static_str = " (static)" if is_static else ""

    _report_call(
        "Method.Invoke",
        f"[{method_cl_id}]{method}{is_static_str}",
        caller_cl_id,
        caller_method,
        addr,
    )
    if addr < 0:
        return
    data_storage["invoke_data"].append(
        {
            "method": method,
            "method_cl_id": method_cl_id,
            "renamed_method": None,
            "caller_method": caller_method,
            "caller_cl_id": caller_cl_id,
            "renamed_caller_method": None,
            "addr": addr,
            "is_static": is_static,
        }
    )


def handle_class_new_inst_data(data, data_storage: dict) -> None:
    """Record an instantiation made through ``Class.newInstance()``.

    When the first stack frame is ``Class.newInstance()`` itself, the caller is
    the second frame; the event is dropped when no such frame exists. Events
    with a negative bytecode index are printed but not stored.
    """
    constructor = data["constructor"]
    constructor_cl_id = data["constructor_cl_id"]
    stack = data["stack"]
    if not stack:
        return

    if stack[0]["method"] != "Ljava/lang/Class;->newInstance()Ljava/lang/Object;":
        frame = stack[0]
    elif len(stack) > 1:
        frame = stack[1]
    else:
        return

    caller_method = frame["method"]
    caller_cl_id = frame["cl_id"]
    addr = frame["bytecode_index"]

    _report_call(
        "Class.NewInstance",
        f"[{constructor_cl_id}]{constructor}",
        caller_cl_id,
        caller_method,
        addr,
    )
    if addr < 0:
        return
    data_storage["class_new_inst_data"].append(
        {
            "constructor": constructor,
            "constructor_cl_id": constructor_cl_id,
            "renamed_constructor": None,
            "caller_method": caller_method,
            "caller_cl_id": caller_cl_id,
            "renamed_caller_method": None,
            "addr": addr,
        }
    )


def handle_cnstr_new_inst_data(data, data_storage: dict) -> None:
    """Record an instantiation made through ``Constructor.newInstance()``.

    The caller is taken from the first frame of ``data["stack"]``. Events with
    a negative bytecode index are printed but not stored.
    """
    constructor = data["constructor"]
    constructor_cl_id = data["constructor_cl_id"]
    # NOTE(review): only constructors under this hard-coded prefix are kept;
    # presumably a leftover from testing -- confirm whether it is intended.
    if not constructor.startswith("Lcom/example/theseus"):
        return
    if not data["stack"]:
        return

    caller = data["stack"][0]
    caller_method = caller["method"]
    caller_cl_id = caller["cl_id"]
    addr = caller["bytecode_index"]

    _report_call(
        "Constructor.newInstance",
        f"[{constructor_cl_id}]{constructor}",
        caller_cl_id,
        caller_method,
        addr,
    )
    if addr < 0:
        return
    data_storage["cnstr_new_inst_data"].append(
        {
            "constructor": constructor,
            "constructor_cl_id": constructor_cl_id,
            "renamed_constructor": None,
            "caller_method": caller_method,
            "caller_cl_id": caller_cl_id,
            "renamed_caller_method": None,
            "addr": addr,
        }
    )


def handle_load_dex(data, data_storage: dict, file_storage: Path) -> None:
    """Save the bytecode files loaded by a class loader and record the event.

    Each entry of ``data["dex"]`` is base64-decoded and written to
    ``file_storage`` under a name built from the class loader class, the class
    loader id and the first 16 hex digits of the SHA-1 of the content. A
    numeric suffix is appended when that name is already taken.
    """
    classloader_class = data["classloader_class"]
    classloader = data["classloader"]
    # The id is a signed java hash: map negative values to their unsigned
    # two's complement representation before encoding it.
    if classloader < 0:
        classloader += 1 << (HASH_NB_BYTES * 8)
    # The byte order is explicit: the default only exists since Python 3.11.
    classloader = classloader.to_bytes(HASH_NB_BYTES, byteorder="big").hex()
    short_class = classloader_class.split("/")[-1].removesuffix(";")

    files = []
    print("[+] DEX file loaded:")
    print(f" by: {classloader_class} ({classloader})")
    for blob_b64 in data["dex"]:
        file_bin = base64.b64decode(blob_b64)
        h = hashlib.sha1(file_bin).hexdigest()
        print(f" hash: {h}")
        # not .dex, can also be .jar or .apk or .oat or ...
        fname = file_storage / f"{short_class}_{classloader}_{h[:16]}.bytecode"
        i = 1
        while fname.exists():
            fname = file_storage / f"{short_class}_{classloader}_{h[:16]}_{i}.bytecode"
            i += 1
        fname = fname.resolve()
        with fname.open("wb") as fp:
            fp.write(file_bin)
        print(f" stored: {str(fname)}")
        files.append(str(fname))

    data_storage["dyn_code_load"].append(
        {
            "classloader_class": classloader_class,
            "classloader": classloader,
            "files": files,
        }
    )


FRIDA_SERVER_BIN = Path(__file__).parent / "frida-server-16.7.0-android-x86_64.xz"
FRIDA_SERVER_ANDROID_PATH = "/data/local/tmp/frida-server"


def _get_device(device_name: str):
    """Return the named Frida device, or the USB device when the name is empty."""
    if device_name != "":
        return frida.get_device(device_name)
    return frida.get_usb_device()


def _server_is_running(device) -> bool:
    """Tell whether a frida server answers on ``device``."""
    try:
        session = device.attach(0)
        session.detach()
    except frida.ServerNotRunningError:
        return False
    return True


def setup_frida(device_name: str, env: dict[str, str]) -> frida.core.Device:
    """Return a Frida device with a running frida server, starting it if needed.

    When no server is running, adb is switched to root if needed, the bundled
    server is pushed to ``FRIDA_SERVER_ANDROID_PATH`` if it is missing, made
    executable and started. The function then blocks until the server answers.

    Args:
        device_name: The device to connect to, ``""`` for the USB device.
        env: The environment used for the adb commands. ``ANDROID_SERIAL`` is
            set in it (in place) when ``device_name`` is not empty.

    Returns:
        The Frida device.
    """
    device = _get_device(device_name)
    if device_name != "":
        env["ANDROID_SERIAL"] = device_name
    if _server_is_running(device):
        return device

    # Start server
    whoami = subprocess.run(
        ["adb", "shell", "whoami"],
        encoding="utf-8",
        stdout=subprocess.PIPE,
        env=env,
    )
    if whoami.stdout.strip() != "root":
        subprocess.run(["adb", "root"], env=env)
        # Rooting adb will disconnect the device
        device = _get_device(device_name)

    # Empty when the server binary is not on the device (stat prints nothing
    # on stdout in that case).
    perm = subprocess.run(
        ["adb", "shell", "stat", "-c", "%a", FRIDA_SERVER_ANDROID_PATH],
        encoding="utf-8",
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=env,
    ).stdout.strip()
    # An odd first digit means the owner has the execute bit:
    # int(perm[0]) & 1 == 1
    need_perm_reset = perm == "" or perm[0] not in ("1", "3", "5", "7")

    if perm == "":
        with tempfile.TemporaryDirectory() as tmpdname:
            local_server = Path(tmpdname) / "frida-server"
            with (
                lzma.open(str(FRIDA_SERVER_BIN.absolute())) as fin,
                local_server.open("wb") as fout,
            ):
                shutil.copyfileobj(fin, fout)
            subprocess.run(
                [
                    "adb",
                    "push",
                    str(local_server.absolute()),
                    FRIDA_SERVER_ANDROID_PATH,
                ],
                env=env,
            )
    if need_perm_reset:
        subprocess.run(
            ["adb", "shell", "chmod", "755", FRIDA_SERVER_ANDROID_PATH], env=env
        )

    # Not waited for: the server must keep running during the analysis.
    subprocess.Popen(["adb", "shell", FRIDA_SERVER_ANDROID_PATH], env=env)

    # The server takes some time to start
    spin = spinner()
    while not _server_is_running(device):
        print(f"[{next(spin)}] Waiting for frida server to start", end="\r")
        time.sleep(0.3)
    print("[*] Server started: begin analysis ")
    return device


def _is_system_classloader(classloader: dict) -> bool:
    """Tell whether a class loader entry cannot be the one of the main APK."""
    return (
        'dalvik.system.PathClassLoader[DexPathList[[directory "."],'
        in classloader["str"]
        or classloader["cname"] == "java.lang.BootClassLoader"
    )


def collect_runtime(apk: Path, device_name: str, file_storage: Path, output: TextIO):
    """Run ``apk`` under Frida and dump the collected runtime data as JSON.

    The application is (re)installed, spawned with the hook script injected,
    and analysed until the user presses ENTER. The class loader of the main
    APK is then guessed and stored under ``"apk_cl_id"``.

    Args:
        apk: The application to analyse.
        device_name: The device to connect to, ``""`` for the USB device.
        file_storage: The directory where dynamically loaded bytecode is saved;
            created when it does not exist.
        output: The stream the JSON result is written to.

    Raises:
        SystemExit: When ``file_storage`` exists but is not a directory.
        frida.InvalidArgumentError: When the script cannot be created; the
            script source is printed with line numbers beforehand.
    """
    env = dict(os.environ)

    if not file_storage.exists():
        file_storage.mkdir(parents=True)
    if not file_storage.is_dir():
        print("[!] file_storage must be a directory")
        sys.exit(1)

    device = setup_frida(device_name, env)

    app = get_apkid(apk)[0]
    if device.enumerate_applications([app]):
        # Uninstall the APK if it already exist
        subprocess.run(["adb", "uninstall", app], env=env)
    subprocess.run(["adb", "install", str(apk.absolute())], env=env)

    with FRIDA_SCRIPT.open("r") as file:
        jsscript = file.read()
    with STACK_CONSUMER_B64.open("r") as file:
        # NOTE(review): the placeholder to substitute is an empty string, so
        # str.replace inserts the payload between every character of the
        # script. The original marker looks lost -- restore it from hook.js.
        jsscript = jsscript.replace(
            "",
            file.read().replace("\n", "").strip(),
        )

    pid = device.spawn([app])
    session = device.attach(pid)
    try:
        script = session.create_script(jsscript)
    except frida.InvalidArgumentError:
        # `script` is not bound when create_script fails: print the source.
        print("[!] Error:")
        print(
            " "
            + "\n ".join(
                f"{lineno: 3} {line}"
                for lineno, line in enumerate(jsscript.split("\n"), start=1)
            )
        )
        raise

    data_storage: dict[str, Any] = {
        "invoke_data": [],
        "class_new_inst_data": [],
        "cnstr_new_inst_data": [],
        "dyn_code_load": [],
        "initial_classloaders": [],
    }

    script.on(
        "message",
        lambda msg, data: on_message(msg, data, data_storage, file_storage),
    )

    # Load script
    script.load()
    # Resume the execution of the APK
    device.resume(pid)

    print("==> Press ENTER to finish the analysis <==")
    input()

    # Candidates for the main APK class loader: the initial class loaders that
    # neither loaded code dynamically nor are system class loaders. The dict
    # is rebuilt instead of deleting entries while iterating over it.
    dyn_loaders = {load["classloader"] for load in data_storage["dyn_code_load"]}
    cls = {
        d["id"]: d
        for d in data_storage["initial_classloaders"]
        if d["id"] not in dyn_loaders and not _is_system_classloader(d)
    }

    main_class_loader: str | None = None
    if len(cls) == 0:
        print("[!] No classloader found for the main APK")
    elif len(cls) > 1:
        print(
            "[!] Multiple classloader found that could be the main APK, try to guess the right one"
        )
        # Pick the candidate that made the most recorded reflective calls.
        nb_occ = {k: 0 for k in cls}
        for key in ("class_new_inst_data", "invoke_data", "cnstr_new_inst_data"):
            for entry in data_storage[key]:
                if entry["caller_cl_id"] in nb_occ:
                    nb_occ[entry["caller_cl_id"]] += 1
        main_class_loader = max(cls, key=lambda x: nb_occ[x])
    else:
        main_class_loader = next(iter(cls))

    data_storage["apk_cl_id"] = main_class_loader
    json.dump(data_storage, output, indent=" ")


def main():
    """Parse the command line and run the collection."""
    parser = argparse.ArgumentParser(
        prog="Android Theseus project",
    )
    parser.add_argument(
        "-a", "--apk", required=True, help="Target application", type=Path
    )
    parser.add_argument(
        "-s",
        "--device",
        default="",
        help="The android device to connect to, eg: 'emulator-5554'",
        type=str,
    )
    parser.add_argument(
        "-o",
        "--output",
        default=None,
        help="where to dump the collected data, default is stdout",
        type=Path,
    )
    parser.add_argument(
        "-d",
        "--dex-dir",
        default=Path("."),
        help="where to store dynamically loaded bytecode",
        type=Path,
    )
    args = parser.parse_args()

    if args.output is None:
        collect_runtime(
            apk=args.apk,
            device_name=args.device,
            file_storage=args.dex_dir,
            output=sys.stdout,
        )
    else:
        with args.output.open("w") as fp:
            collect_runtime(
                apk=args.apk,
                device_name=args.device,
                file_storage=args.dex_dir,
                output=fp,
            )