"""Run an APK under Frida and collect the invoke / newInstance events sent by hook.js."""

import argparse
import os
import subprocess
import time
import json
from pathlib import Path

import frida  # type: ignore
from androguard.core.apk import get_apkid  # type: ignore

FRIDA_SCRIPT = Path(__file__).parent / "hook.js"
STACK_CONSUMER_B64 = Path(__file__).parent / "StackConsumer.dex.b64"

# Descriptor of Class.newInstance(); when it is the top stack frame, the real
# caller is the frame just below it.
_CLASS_NEW_INSTANCE = "Ljava/lang/Class;->newInstance()Ljava/lang/Object;"


# Define handler to event generated by the scripts
def on_message(message, data, data_storage: dict) -> None:
    """Dispatch one message coming from the Frida script.

    Args:
        message: message dict delivered by Frida. ``"error"`` messages are
            printed; ``"send"`` messages are routed on ``payload["type"]``.
        data: extra data attached to the message (unused here).
        data_storage: dict of lists the handlers append their records to.

    Any message that is not recognised (including a ``"send"`` whose payload
    is not a dict or carries no known ``"type"``) is printed verbatim rather
    than raising.
    """
    msg_type = message.get("type")

    if msg_type == "error":
        print(f"[error] {message.get('description')}")
        stack = message.get("stack")
        if stack is not None:
            print(stack)
        return

    if msg_type == "send":
        payload = message.get("payload")
        payload_type = payload.get("type") if isinstance(payload, dict) else None
        handler = (
            _PAYLOAD_HANDLERS.get(payload_type)
            if isinstance(payload_type, str)
            else None
        )
        if handler is not None:
            handler(payload["data"], data_storage)
            return

    print("[on_message] message:", message)


def print_stack(stack, prefix: str) -> None:
    """Print every frame of *stack*, one per line, each preceded by *prefix*."""
    for frame in stack:
        native = " (native)" if frame["is_native"] else ""
        print(f"{prefix}{frame['method']}:{frame['bytecode_index']}{native}")


# def get_ty(java_name: str) -> str:
#     """Return the android name from the java name of a class / type"""
#     # TODO: array
#     # TODO: scalar
#     if java_name == "V":  # tmp stub
#         return "V"
#     return f"L{java_name.replace('.', '/')};"


# def get_method_id(method_data) -> str:
#     """Get a method descriptor from the different elements collected from the methods."""
#     name = method_data["name"]
#     ret = get_ty(method_data["ret"])
#     cls = get_ty(method_data["class"])
#     args = "".join(map(get_ty, method_data["args"]))
#     return f"{cls}->{name}({args}){ret}"


def _report_and_store(label: str, key: str, called, frame, storage: list) -> None:
    """Print one call event and append it to *storage*.

    The event is always printed; it is only stored when the caller's bytecode
    index is non-negative.
    """
    caller_method = frame["method"]
    addr = frame["bytecode_index"]
    print(f"{label}:")
    print(f" called: {called}")
    print(f" by: {caller_method}")
    print(f" at: 0x{addr:08x}")
    # print(f" stack:")
    # print_stack(data["stack"], " ")
    if addr < 0:
        return
    storage.append(
        {
            key: called,
            "caller_method": caller_method,
            "addr": addr,
        }
    )


def handle_invoke_data(data, data_storage: dict) -> None:
    """Record a "invoke" event: the called method and its top stack frame.

    Events with an empty stack are ignored.
    """
    method = data["method"]
    if not data["stack"]:
        return
    _report_and_store(
        "Method.Invoke",
        "method",
        method,
        data["stack"][0],
        data_storage["invoke_data"],
    )


def handle_class_new_inst_data(data, data_storage: dict) -> None:
    """Record a "class-new-inst" event.

    The caller is the top stack frame, unless that frame is
    ``Class.newInstance()`` itself, in which case the next frame is used.
    Events without a usable frame are ignored.
    """
    constructor = data["constructor"]
    stack = data["stack"]
    if not stack:
        return
    if stack[0]["method"] != _CLASS_NEW_INSTANCE:
        frame = stack[0]
    elif len(stack) > 1:
        frame = stack[1]
    else:
        return
    _report_and_store(
        "Class.NewInstance",
        "constructor",
        constructor,
        frame,
        data_storage["class_new_inst_data"],
    )


def handle_cnstr_new_inst_data(data, data_storage: dict) -> None:
    """Record a "cnstr-new-isnt" event for constructors under Lcom/example/theseus.

    Constructors with any other prefix, and events with an empty stack, are
    ignored.
    """
    constructor = data["constructor"]
    # NOTE(review): hard-coded package filter; presumably a debugging leftover,
    # confirm before relying on this data for other applications.
    if not constructor.startswith("Lcom/example/theseus"):
        return
    if not data["stack"]:
        return
    _report_and_store(
        "Constructor.newInstance",
        "constructor",
        constructor,
        data["stack"][0],
        data_storage["cnstr_new_inst_data"],
    )


# Maps payload["type"] sent by the script to its handler. The keys must match
# the strings emitted by the script exactly (including "cnstr-new-isnt").
_PAYLOAD_HANDLERS = {
    "invoke": handle_invoke_data,
    "class-new-inst": handle_class_new_inst_data,
    "cnstr-new-isnt": handle_cnstr_new_inst_data,
}


def main() -> None:
    """Install the target APK, run it under the Frida script and dump the data.

    The collected records are printed as JSON on stdout, or written to the
    file given with ``--output``, once the user presses ENTER.

    Raises:
        subprocess.CalledProcessError: if ``adb install`` fails.
    """
    parser = argparse.ArgumentParser(
        prog="Android Theseus project",
    )
    parser.add_argument(
        "-a", "--apk", required=True, help="Target application", type=Path
    )
    parser.add_argument(
        "-s",
        "--device",
        default="",
        help="The android device to connect to, eg: 'emulator-5554'",
        type=str,
    )
    parser.add_argument(
        "-o",
        "--output",
        default=None,
        help="where to dump the collected data, default is stdout",
        type=Path,
    )
    args = parser.parse_args()

    env = dict(os.environ)
    if args.device != "":
        device = frida.get_device(args.device)
        # Make the adb commands below target the same device as Frida.
        env["ANDROID_SERIAL"] = args.device
    else:
        device = frida.get_usb_device()

    app = get_apkid(args.apk)[0]
    if device.enumerate_applications([app]):
        # Uninstall the APK if it already exist (best effort).
        subprocess.run(["adb", "uninstall", app], env=env)
    # Spawning the app below requires the package to be installed, so stop
    # here if adb reports a failure.
    subprocess.run(
        ["adb", "install", str(args.apk.absolute())], env=env, check=True
    )

    with FRIDA_SCRIPT.open("r") as file:
        script = file.read()
    with STACK_CONSUMER_B64.open("r") as file:
        # NOTE(review): the placeholder searched for is an empty string as
        # written here; str.replace("", x) inserts x between every character
        # of the script. Presumably hook.js contains a named placeholder that
        # should be used instead -- confirm against hook.js and restore it.
        script = script.replace(
            "",
            file.read().replace("\n", "").strip(),
        )

    pid = device.spawn([app])
    session = device.attach(pid)
    script = session.create_script(script)

    data_storage = {
        "invoke_data": [],
        "class_new_inst_data": [],
        "cnstr_new_inst_data": [],
    }

    script.on(
        "message",
        lambda msg, data: on_message(msg, data, data_storage),
    )

    # Load script
    script.load()
    # Resume the execution of the APK
    device.resume(pid)

    print("Press ENTER to finish the analysis")
    input()

    if args.output is None:
        print(json.dumps(data_storage, indent=" "))
    else:
        with args.output.open("w") as fp:
            json.dump(data_storage, fp)