from pathlib import Path import os import json import shutil import time import subprocess import threading import argparse EMULATORS = [f"root34-{i}" for i in range(20)] ANDROID_IMG = "system-images;android-34;default;x86_64" TIMEOUT = 400 if "ANDROID_HOME" in os.environ: ANDROID_HOME = Path(os.environ["ANDROID_HOME"]) else: ANDROID_HOME = Path.home() / "Android" / "Sdk" EMULATOR = str(ANDROID_HOME / "emulator" / "emulator") AVDMANAGER = str(ANDROID_HOME / "cmdline-tools" / "latest" / "bin" / "avdmanager") ADB = str(ANDROID_HOME / "platform-tools" / "adb") class AdbFailed(RuntimeError): pass def adb_run(emu: str, cmd: list[str], timeout: int | None = None) -> str: """Run an adb command, Warning: don't use this to run a command with long output: will hang due to deadlock on process.run when capturing output""" cmd_l = [ADB, "-s", emu, *cmd] cmd_txt = " ".join(cmd_l) for i in range(3): r = subprocess.run( cmd_l, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout ) if b"error: could not connect to TCP port" in r.stderr: print(f"failled to run `{cmd_txt}`: error '{r.stderr.decode('utf-8')}'") time.sleep(i + 1) if i != 2: print("retrying") else: return r.stdout.decode("utf-8") raise AdbFailed("Failed to run `{cmd_txt}`") def get_ports(emu: str) -> tuple[int, int]: """Return the console port and adb port for the emulator.""" i = EMULATORS.index(emu) * 2 return (5554 + i, 5554 + i + 1) def get_installed_emu() -> set[str]: """List name of installed emulators""" return set( subprocess.run([EMULATOR, "-list-avds"], stdout=subprocess.PIPE) .stdout.decode("utf-8") .strip() .split("\n") ) def gen_emulators(): emu_lst = get_installed_emu() for emu in EMULATORS: if emu not in emu_lst: subprocess.run( [ AVDMANAGER, "create", "avd", "--name", emu, "--package", ANDROID_IMG, "--sdcard", "512M", "--device", "medium_phone", ] ) make_snapshot(emu) def del_emulators(): emu_lst = get_installed_emu() for emu in EMULATORS: if emu in emu_lst: subprocess.run( [ AVDMANAGER, "delete", "avd", "--name", emu, ] ) FRIDA_SETUP_SCRIPT = ( Path(__file__).parent.parent / "frida" / "theseus_frida" / "setup_frida.py" ) def make_snapshot(emu: str): console_port, adb_port = get_ports(emu) # First run with debug stuff, for because android emulator black magic fuckery ? probably ? proc = subprocess.Popen( [ EMULATOR, "-avd", emu, "-no-window", "-no-metrics", "-debug-init", "-logcat", "*:v", "-ports", f"{console_port},{adb_port}", ] ) adb_run(f"emulator-{console_port}", ["wait-for-device"]) time.sleep(10) # stop emulator try: adb_run(f"emulator-{console_port}", ["emu", "kill"], timeout=25) time.sleep(25) except subprocess.TimeoutExpired: pass if proc.poll() is None: proc.kill() time.sleep(3) # start the emulator without the debug stuff proc = subprocess.Popen( [ EMULATOR, "-avd", emu, "-no-window", "-no-metrics", "-ports", f"{console_port},{adb_port}", ] ) adb_run(f"emulator-{console_port}", ["wait-for-device"]) time.sleep(1) # setup frida, uggly, but meh, at this point import importlib.util spec = importlib.util.spec_from_file_location( "setup_frida", str(FRIDA_SETUP_SCRIPT) ) assert spec is not None setup_frida = importlib.util.module_from_spec(spec) assert spec.loader is not None spec.loader.exec_module(setup_frida) setup_frida.setup_frida(f"emulator-{console_port}", os.environ, ADB) time.sleep(10) adb_run( f"emulator-{console_port}", [ "emu", "avd", "snapshot", "save", "baseline", ], ) # stop emulator try: adb_run( f"emulator-{console_port}", [ "emu", "kill", ], timeout=25, ) time.sleep(25) except subprocess.TimeoutExpired: pass if proc.poll() is None: proc.kill() time.sleep(3) def restore_emu(emu: str, proc: None | subprocess.Popen) -> subprocess.Popen: console_port, adb_port = get_ports(emu) if proc is not None and proc.poll() is None: adb_run( f"emulator-{console_port}", [ "emu", "avd", "snapshot", "save", "baseline", ], ) time.sleep(3) return proc proc = subprocess.Popen( [ EMULATOR, "-avd", emu, "-no-window", "-no-metrics", "-ports", f"{console_port},{adb_port}", ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) adb_run(f"emulator-{console_port}", ["wait-for-device"]) time.sleep(3) adb_run( f"emulator-{console_port}", [ "emu", "avd", "snapshot", "load", "baseline", ], ) time.sleep(3) return proc def worker(emu: str, apklist: list[str], out_folder: Path, script: Path): console_port, adb_port = get_ports(emu) script_env = os.environ.copy() script_env["ANDROID_HOME"] = str(ANDROID_HOME) proc_emu = restore_emu(emu, None) while apklist: apk = apklist.pop() folder_name = apk.split("/")[-1].removesuffix(".apk") folder = out_folder / folder_name if folder.exists() and (folder / "data.json").exists(): has_error = False with (folder / "data.json").open() as fp: data = json.load(fp) if "error" in data: has_error = True if (folder / "TIMEOUT").exists(): has_error = True if has_error: print( f"Previous result for {apk=} found but with error of timeout, remove old result and rerun it" ) shutil.rmtree(str(folder)) else: continue folder.mkdir(parents=True) with ( (folder / "analysis.out").open("w") as fp_anly_stdout, (folder / "analysis.err").open("w") as fp_anly_stderr, ): # Start emulator with wipped data print(f"START ANALYSIS: {apk=}, emulator-{console_port}") i = 0 started = False while not started: if i != 0 and i % 10 == 0: print( f"Warning: tried to start emulator-{console_port} (avd {emu}) for the {i}th time without success" ) proc_emu = restore_emu(emu, proc_emu) adb_run( f"emulator-{console_port}", ["wait-for-device"], ) j = 0 while not started: started = f"emulator-{console_port}\tdevice" in subprocess.run( [ADB, "devices"], stdout=subprocess.PIPE ).stdout.decode("utf-8") if not started: time.sleep(1) if j != 0 and j % 10 == 0: print( f"emulator-{console_port} has been offline for 10s, restarting it now" ) proc_emu.kill() break j += 1 i += 1 print(f"emulator-{console_port} running") fp_anly_stdout.write(f"START ANALYSIS: {apk=}, emulator-{console_port}\n") subprocess.run( [ADB, "devices"], stdout=fp_anly_stdout, stderr=fp_anly_stderr, ) # Run script try: subprocess.run( ["bash", str(script), apk, f"emulator-{console_port}", str(folder)], env=script_env, stdout=fp_anly_stdout, stderr=fp_anly_stderr, timeout=TIMEOUT, ) print(f"FINISHED ANALYSIS: {apk=}, emulator-{console_port}") except subprocess.TimeoutExpired: with (folder / "TIMEOUT").open("w") as fp: fp.write("Process timedout") print(f"TIMEOUT ANALYSIS: {apk=}, emulator-{console_port}") with (folder / "emu").open("w") as fp: fp.write(f"Used emulator {emu}: emulator-{console_port}") def run(apklist: list[str], out_folder: Path, script: Path): gen_emulators() workers = [] for emu in EMULATORS: workers.append( threading.Thread(target=lambda: worker(emu, apklist, out_folder, script)) ) workers[-1].start() for w in workers: w.join() def main(): parser = argparse.ArgumentParser( prog="orchestrator", description="Run several android emulators en run analysis on applications", ) parser.add_argument( "applist", type=Path, help="File containing the path to applications, one by line", ) parser.add_argument( "out_folder", type=Path, help="The folder where to store the results of the analysis, a folder for each application will be created in it", ) parser.add_argument( "analysis_script", type=Path, help=( "The script to run the analysis. The script will be invoke with " "`bash analysis_script.sh path/of/app.apk emulator-5554 path/of/out_folder/app/`" ), ) args = parser.parse_args() with args.applist.open("r") as fp: apklist = list(map(str.strip, fp.readlines())) run(apklist, args.out_folder, args.analysis_script) if __name__ == "__main__": main()