better, I guess?

This commit is contained in:
Jean-Marie Mineau 2025-05-20 11:07:45 +02:00
parent 89b5487d31
commit bb4b93167e
Signed by: histausse
GPG key ID: B66AEEDA9B645AD2
5 changed files with 313 additions and 157 deletions

View file

@ -10,4 +10,4 @@ echo "APK=${APK}"
echo "DEVICE=${DEVICE}" echo "DEVICE=${DEVICE}"
echo "OUT_DIR=${OUT_DIR}" echo "OUT_DIR=${OUT_DIR}"
"${FOLDER}/venv/bin/collect-runtime-data" --apk "${APK}" --device "${DEVICE}" --output "${OUT_DIR}/data.json" --dex-dir "${3}" "${FOLDER}/venv/bin/collect-runtime-data" --apk "${APK}" --device "${DEVICE}" --output "${OUT_DIR}/data.json" --dex-dir "${3}" --timeout 300

View file

@ -6,8 +6,9 @@ import subprocess
import threading import threading
import argparse import argparse
EMULATORS = [f"root34-{i}" for i in range(4)] EMULATORS = [f"root34-{i}" for i in range(20)]
ANDROID_IMG = "system-images;android-34;default;x86_64" ANDROID_IMG = "system-images;android-34;default;x86_64"
TIMEOUT = 400
if "ANDROID_HOME" in os.environ: if "ANDROID_HOME" in os.environ:
ANDROID_HOME = Path(os.environ["ANDROID_HOME"]) ANDROID_HOME = Path(os.environ["ANDROID_HOME"])
@ -19,6 +20,30 @@ AVDMANAGER = str(ANDROID_HOME / "cmdline-tools" / "latest" / "bin" / "avdmanager
ADB = str(ANDROID_HOME / "platform-tools" / "adb") ADB = str(ANDROID_HOME / "platform-tools" / "adb")
class AdbFailed(RuntimeError):
pass
def adb_run(emu: str, cmd: list[str], timeout: int | None = None) -> str:
"""Run an adb command,
Warning: don't use this to run a command with long output:
will hang due to deadlock on process.run when capturing output"""
cmd_l = [ADB, "-s", emu, *cmd]
cmd_txt = " ".join(cmd_l)
for i in range(3):
r = subprocess.run(
cmd_l, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout
)
if b"error: could not connect to TCP port" in r.stderr:
print(f"failled to run `{cmd_txt}`: error '{r.stderr.decode('utf-8')}'")
time.sleep(i + 1)
if i != 2:
print("retrying")
else:
return r.stdout.decode("utf-8")
raise AdbFailed("Failed to run `{cmd_txt}`")
def get_ports(emu: str) -> tuple[int, int]: def get_ports(emu: str) -> tuple[int, int]:
"""Return the console port and adb port for the emulator.""" """Return the console port and adb port for the emulator."""
i = EMULATORS.index(emu) * 2 i = EMULATORS.index(emu) * 2
@ -54,6 +79,7 @@ def gen_emulators():
"medium_phone", "medium_phone",
] ]
) )
make_snapshot(emu)
def del_emulators(): def del_emulators():
@ -71,42 +97,145 @@ def del_emulators():
) )
# def make_snapshot(folder: Path): FRIDA_SETUP_SCRIPT = (
# for emu in EMULATORS: Path(__file__).parent.parent / "frida" / "theseus_frida" / "setup_frida.py"
# console_port, adb_port = get_ports(emu) )
# proc = subprocess.Popen(
# [
# EMULATOR, def make_snapshot(emu: str):
# "-avd", console_port, adb_port = get_ports(emu)
# emu, # First run with debug stuff, for because android emulator black magic fuckery ? probably ?
# "-no-window", proc = subprocess.Popen(
# "-no-metrics", [
# "-debug-init", EMULATOR,
# "-logcat", "-avd",
# "*:v", emu,
# "-ports", "-no-window",
# f"{console_port},{adb_port}", "-no-metrics",
# ] "-debug-init",
# ) "-logcat",
# subprocess.run([ADB, "-s", f"emulator-{console_port}", "wait-for-device"]) "*:v",
# subprocess.run( "-ports",
# [ f"{console_port},{adb_port}",
# ADB, ]
# "-s", )
# f"emulator-{console_port}", adb_run(f"emulator-{console_port}", ["wait-for-device"])
# "emu", time.sleep(10)
# "avd", # stop emulator
# "snapshot", try:
# "save", adb_run(f"emulator-{console_port}", ["emu", "kill"], timeout=25)
# "baseline", time.sleep(25)
# ] except subprocess.TimeoutExpired:
# ) pass
if proc.poll() is None:
proc.kill()
time.sleep(3)
# start the emulator without the debug stuff
proc = subprocess.Popen(
[
EMULATOR,
"-avd",
emu,
"-no-window",
"-no-metrics",
"-ports",
f"{console_port},{adb_port}",
]
)
adb_run(f"emulator-{console_port}", ["wait-for-device"])
time.sleep(1)
# setup frida, uggly, but meh, at this point
import importlib.util
spec = importlib.util.spec_from_file_location(
"setup_frida", str(FRIDA_SETUP_SCRIPT)
)
assert spec is not None
setup_frida = importlib.util.module_from_spec(spec)
assert spec.loader is not None
spec.loader.exec_module(setup_frida)
setup_frida.setup_frida(f"emulator-{console_port}", os.environ, ADB)
time.sleep(10)
adb_run(
f"emulator-{console_port}",
[
"emu",
"avd",
"snapshot",
"save",
"baseline",
],
)
# stop emulator
try:
adb_run(
f"emulator-{console_port}",
[
"emu",
"kill",
],
timeout=25,
)
time.sleep(25)
except subprocess.TimeoutExpired:
pass
if proc.poll() is None:
proc.kill()
time.sleep(3)
def restore_emu(emu: str, proc: None | subprocess.Popen) -> subprocess.Popen:
console_port, adb_port = get_ports(emu)
if proc is not None and proc.poll() is None:
adb_run(
f"emulator-{console_port}",
[
"emu",
"avd",
"snapshot",
"save",
"baseline",
],
)
time.sleep(3)
return proc
proc = subprocess.Popen(
[
EMULATOR,
"-avd",
emu,
"-no-window",
"-no-metrics",
"-ports",
f"{console_port},{adb_port}",
],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
adb_run(f"emulator-{console_port}", ["wait-for-device"])
time.sleep(3)
adb_run(
f"emulator-{console_port}",
[
"emu",
"avd",
"snapshot",
"load",
"baseline",
],
)
time.sleep(3)
return proc
def worker(emu: str, apklist: list[str], out_folder: Path, script: Path): def worker(emu: str, apklist: list[str], out_folder: Path, script: Path):
console_port, adb_port = get_ports(emu) console_port, adb_port = get_ports(emu)
script_env = os.environ.copy() script_env = os.environ.copy()
script_env["ANDROID_HOME"] = str(ANDROID_HOME) script_env["ANDROID_HOME"] = str(ANDROID_HOME)
proc_emu = restore_emu(emu, None)
while apklist: while apklist:
apk = apklist.pop() apk = apklist.pop()
folder_name = apk.split("/")[-1].removesuffix(".apk") folder_name = apk.split("/")[-1].removesuffix(".apk")
@ -116,8 +245,6 @@ def worker(emu: str, apklist: list[str], out_folder: Path, script: Path):
folder.mkdir(parents=True) folder.mkdir(parents=True)
with ( with (
(folder / "emu.out").open("w") as fp_emu_stdout,
(folder / "emu.err").open("w") as fp_emu_stderr,
(folder / "analysis.out").open("w") as fp_anly_stdout, (folder / "analysis.out").open("w") as fp_anly_stdout,
(folder / "analysis.err").open("w") as fp_anly_stderr, (folder / "analysis.err").open("w") as fp_anly_stderr,
): ):
@ -131,29 +258,14 @@ def worker(emu: str, apklist: list[str], out_folder: Path, script: Path):
print( print(
f"Warning: tried to start emulator-{console_port} (avd {emu}) for the {i}th time without success" f"Warning: tried to start emulator-{console_port} (avd {emu}) for the {i}th time without success"
) )
proc = subprocess.Popen( proc_emu = restore_emu(emu, proc_emu)
[ adb_run(
EMULATOR, f"emulator-{console_port}",
"-avd", ["wait-for-device"],
emu,
"-wipe-data",
"-no-window",
"-no-metrics",
"-debug-init", # dunno why but sometime needed
"-ports",
f"{console_port},{adb_port}",
],
stdout=fp_emu_stdout,
stderr=fp_emu_stderr,
)
subprocess.run(
[ADB, "-s", f"emulator-{console_port}", "wait-for-device"],
stdout=fp_anly_stdout,
stderr=fp_anly_stderr,
) )
j = 0 j = 0
while not started: while not started:
started = f"emulator-{console_port}\t device" not in subprocess.run( started = f"emulator-{console_port}\tdevice" in subprocess.run(
[ADB, "devices"], stdout=subprocess.PIPE [ADB, "devices"], stdout=subprocess.PIPE
).stdout.decode("utf-8") ).stdout.decode("utf-8")
if not started: if not started:
@ -162,7 +274,7 @@ def worker(emu: str, apklist: list[str], out_folder: Path, script: Path):
print( print(
f"emulator-{console_port} has been offline for 10s, restarting it now" f"emulator-{console_port} has been offline for 10s, restarting it now"
) )
proc.kill() proc_emu.kill()
break break
j += 1 j += 1
i += 1 i += 1
@ -173,34 +285,20 @@ def worker(emu: str, apklist: list[str], out_folder: Path, script: Path):
stdout=fp_anly_stdout, stdout=fp_anly_stdout,
stderr=fp_anly_stderr, stderr=fp_anly_stderr,
) )
print(f"FINISHED ANALYSIS: {apk=}, emulator-{console_port}")
# Run script # Run script
subprocess.run(
["bash", str(script), apk, f"emulator-{console_port}", str(folder)],
env=script_env,
stdout=fp_anly_stdout,
stderr=fp_anly_stderr,
)
# stop emulator
try: try:
subprocess.run( subprocess.run(
[ ["bash", str(script), apk, f"emulator-{console_port}", str(folder)],
ADB, env=script_env,
"-s", stdout=fp_anly_stdout,
f"emulator-{console_port}", stderr=fp_anly_stderr,
"emu", timeout=TIMEOUT,
"kill",
],
timeout=3,
) )
print(f"FINISHED ANALYSIS: {apk=}, emulator-{console_port}")
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
pass with (folder / "TIMEOUT").open("w") as fp:
if proc.poll() is None: fp.write("Process timedout")
proc.kill() print(f"TIMEOUT ANALYSIS: {apk=}, emulator-{console_port}")
time.sleep(3)
print(f"emulator-{console_port} stoped")
def run(apklist: list[str], out_folder: Path, script: Path): def run(apklist: list[str], out_folder: Path, script: Path):

View file

@ -15,6 +15,7 @@ from typing import TextIO, Any
from collections.abc import Callable from collections.abc import Callable
from .app_exploration import explore_app from .app_exploration import explore_app
from .setup_frida import setup_frida
import frida # type: ignore import frida # type: ignore
from androguard.core.apk import get_apkid # type: ignore from androguard.core.apk import get_apkid # type: ignore
@ -24,21 +25,12 @@ logger.remove() # remove androguard logs
FRIDA_SCRIPT = Path(__file__).parent / "hook.js" FRIDA_SCRIPT = Path(__file__).parent / "hook.js"
STACK_CONSUMER_B64 = Path(__file__).parent / "StackConsumer.dex.b64" STACK_CONSUMER_B64 = Path(__file__).parent / "StackConsumer.dex.b64"
FRIDA_SERVER_BIN = Path(__file__).parent / "frida-server-16.7.4-android-x86_64.xz"
FRIDA_SERVER_ANDROID_PATH = "/data/local/tmp/frida-server"
# The number of bytes used to encode a java hash (from Object.hashCode or System.identiyHashCode) # The number of bytes used to encode a java hash (from Object.hashCode or System.identiyHashCode)
# The type is 'int', so it sould be a 32bit signed value? # The type is 'int', so it sould be a 32bit signed value?
HASH_NB_BYTES = 4 HASH_NB_BYTES = 4
def spinner(symbs: str = "-\\|/"):
while True:
for s in symbs:
yield s
CLASSLOADER_DONE = False CLASSLOADER_DONE = False
@ -253,7 +245,35 @@ def handle_app_info(data, data_storage: dict):
print(f" {k}: {data[k]}") print(f" {k}: {data[k]}")
def setup_frida(device_name: str, env: dict[str, str], adb: str) -> frida.core.Device: def app_reinstall(
device: frida.core.Device, app: str, apk: Path, adb: str, env: dict[str, str]
):
if device.enumerate_applications([app]):
subprocess.run([adb, "uninstall", app], env=env)
i = 0
while not device.enumerate_applications([app]):
time.sleep(i)
subprocess.run([adb, "install", "-g", str(apk.absolute())], env=env)
i += 1
if i == 10:
print("[!] Failled to install apk")
e = RuntimeError("Failled to install apk")
e.add_note(f"apk: {app} ({str(apk.absolute())})")
e.add_note(
f"installed apk: {' '.join(map(str, device.enumerate_applications()))}"
)
raise e
def spinner(symbs: str = "-\\|/"):
while True:
for s in symbs:
yield s
def get_frida_device(
device_name: str, env: dict[str, str], adb: str
) -> frida.core.Device:
if device_name != "": if device_name != "":
device = frida.get_device(device_name) device = frida.get_device(device_name)
env["ANDROID_SERIAL"] = device_name env["ANDROID_SERIAL"] = device_name
@ -266,58 +286,15 @@ def setup_frida(device_name: str, env: dict[str, str], adb: str) -> frida.core.D
return device return device
except frida.ServerNotRunningError: except frida.ServerNotRunningError:
pass pass
# Start server
proc: subprocess.CompletedProcess[str] | subprocess.CompletedProcess[bytes] = (
subprocess.run(
[adb, "shell", "whoami"],
encoding="utf-8",
stdout=subprocess.PIPE,
env=env,
)
)
if proc.stdout.strip() != "root":
proc = subprocess.run([adb, "root"], env=env)
# Rooting adb will disconnect the device
if device_name != "":
device = frida.get_device(device_name)
else:
device = frida.get_usb_device()
perm = subprocess.run(
[adb, "shell", "stat", "-c", "%a", FRIDA_SERVER_ANDROID_PATH],
encoding="utf-8",
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
).stdout.strip()
need_perm_resset = (perm == "") or perm[0] not in [
"1",
"3",
"5",
"7",
] # int(perm[0]) & 1 == 1
if perm == "":
with tempfile.TemporaryDirectory() as tmpdname:
tmpd = Path(tmpdname)
with (
lzma.open(str(FRIDA_SERVER_BIN.absolute())) as fin,
(tmpd / "frida-server").open("wb") as fout,
):
shutil.copyfileobj(fin, fout)
subprocess.run( setup_frida(device_name, env, adb)
[ # setup_frida may disconnect the device
adb, if device_name != "":
"push", device = frida.get_device(device_name)
str((tmpd / "frida-server").absolute()), env["ANDROID_SERIAL"] = device_name
FRIDA_SERVER_ANDROID_PATH, else:
], device = frida.get_usb_device()
env=env,
)
if need_perm_resset:
subprocess.run(
[adb, "shell", "chmod", "755", FRIDA_SERVER_ANDROID_PATH], env=env
)
subprocess.Popen([adb, "shell", FRIDA_SERVER_ANDROID_PATH], env=env)
# The server take some time to start # The server take some time to start
# time.sleep(3) # time.sleep(3)
t = spinner() t = spinner()
@ -340,7 +317,18 @@ def collect_runtime(
adb_path: Path | None = None, adb_path: Path | None = None,
android_sdk_path: Path | None = None, android_sdk_path: Path | None = None,
apk_explorer: None | Callable[[], None] = None, apk_explorer: None | Callable[[], None] = None,
timeout: None | int = None,
): ):
"""Collect runtime data from an apk.
- apk: the path off the apk to analyze
- device_name: name of the device to use
- file_storage: path where to store collected files
- output: textio where to write json data
- adb_path: path to the adb executable
- android_sdk_path: path to the Android SDK folder (usually ~/Android/Sdk)
- apk_explorer: callable called to explore the apk
- timeout: timeout in s for the exploration of the apk, only used with grodd runner.
"""
data_storage: dict[str, Any] = { data_storage: dict[str, Any] = {
"invoke_data": [], "invoke_data": [],
"class_new_inst_data": [], "class_new_inst_data": [],
@ -367,23 +355,11 @@ def collect_runtime(
print("[!] file_storage must be a directory") print("[!] file_storage must be a directory")
exit() exit()
device = setup_frida(device_name, env, adb) device = get_frida_device(device_name, env, adb)
app = get_apkid(apk)[0] app = get_apkid(apk)[0]
i = 0 app_reinstall(device, app, apk, adb, env)
while not device.enumerate_applications([app]):
time.sleep(i)
subprocess.run([adb, "install", "-r", "-g", str(apk.absolute())], env=env)
i += 1
if i == 10:
print("[!] Failled to install apk")
e = RuntimeError("Failled to install apk")
e.add_note(f"apk: {app} ({str(apk.absolute())})")
e.add_note(
f"installed apk: {' '.join(map(str, device.enumerate_applications()))}"
)
raise e
with FRIDA_SCRIPT.open("r") as file: with FRIDA_SCRIPT.open("r") as file:
jsscript = file.read() jsscript = file.read()
@ -393,7 +369,20 @@ def collect_runtime(
file.read().replace("\n", "").strip(), file.read().replace("\n", "").strip(),
) )
pid = device.spawn([app]) for i in range(10):
try:
pid = device.spawn([app])
except frida.NotSupportedError as e:
if str(e) == "unable to find a front-door activity":
print(f"[!] Failed to start frida ({e}), reinstalling apk")
pid = None
app_reinstall(device, app, apk, adb, env)
else:
raise e
if pid is None:
raise RuntimeError(
"Failed to start frida ('unable to find a front-door activity' error)"
)
session = device.attach(pid) session = device.attach(pid)
try: try:
script = session.create_script(jsscript) script = session.create_script(jsscript)
@ -432,8 +421,8 @@ def collect_runtime(
# print(f"[*] Classloader list received" + " " * 20) # print(f"[*] Classloader list received" + " " * 20)
if apk_explorer is None: if apk_explorer is None:
exploration_data = explore_app( exploration_data: dict | None = explore_app(
app, device=device.id, android_sdk=android_sdk_path app, device=device.id, android_sdk=android_sdk_path, timeout=timeout
) )
else: else:
exploration_data = apk_explorer() exploration_data = apk_explorer()
@ -542,6 +531,9 @@ def main():
help="where to store dynamically loaded bytecode", help="where to store dynamically loaded bytecode",
type=Path, type=Path,
) )
parser.add_argument(
"-t", "--timeout", default=None, type=int, help="timeout for grodd runner"
)
args = parser.parse_args() args = parser.parse_args()
if args.output is None: if args.output is None:
collect_runtime( collect_runtime(
@ -557,4 +549,5 @@ def main():
device_name=args.device, device_name=args.device,
file_storage=args.dex_dir, file_storage=args.dex_dir,
output=fp, output=fp,
timeout=args.timeout,
) )

View file

@ -13,13 +13,14 @@ def explore_app(
package: str, package: str,
device: str = "emulator-5554", device: str = "emulator-5554",
android_sdk: Path | None = None, android_sdk: Path | None = None,
timeout: int | None = None,
) -> dict: ) -> dict:
if USE_GRODD: if USE_GRODD:
time.sleep(1) # let the app load time.sleep(1) # let the app load
return grodd_runner( return grodd_runner(
"grodd", "grodd",
device, device,
timeout=300, timeout=timeout,
package=package, package=package,
android_sdk=android_sdk, android_sdk=android_sdk,
slowdown=1.0, slowdown=1.0,

View file

@ -0,0 +1,64 @@
import subprocess
import tempfile
import lzma
import shutil
import time
from pathlib import Path
FRIDA_SERVER_BIN = Path(__file__).parent / "frida-server-16.7.4-android-x86_64.xz"
FRIDA_SERVER_ANDROID_PATH = "/data/local/tmp/frida-server"
def setup_frida(device_name: str, env: dict[str, str], adb: str):
env = env.copy()
if "ANDROID_SERIAL" not in env and device_name != "":
env["ANDROID_SERIAL"] = device_name
# Start server
proc: subprocess.CompletedProcess[str] | subprocess.CompletedProcess[bytes] = (
subprocess.run(
[adb, "shell", "whoami"],
encoding="utf-8",
stdout=subprocess.PIPE,
env=env,
)
)
if proc.stdout.strip() != "root":
proc = subprocess.run([adb, "root"], env=env)
perm = subprocess.run(
[adb, "shell", "stat", "-c", "%a", FRIDA_SERVER_ANDROID_PATH],
encoding="utf-8",
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
).stdout.strip()
need_perm_resset = (perm == "") or perm[0] not in [
"1",
"3",
"5",
"7",
] # int(perm[0]) & 1 == 1
if perm == "":
with tempfile.TemporaryDirectory() as tmpdname:
tmpd = Path(tmpdname)
with (
lzma.open(str(FRIDA_SERVER_BIN.absolute())) as fin,
(tmpd / "frida-server").open("wb") as fout,
):
shutil.copyfileobj(fin, fout)
subprocess.run(
[
adb,
"push",
str((tmpd / "frida-server").absolute()),
FRIDA_SERVER_ANDROID_PATH,
],
env=env,
)
if need_perm_resset:
subprocess.run(
[adb, "shell", "chmod", "755", FRIDA_SERVER_ANDROID_PATH], env=env
)
subprocess.Popen([adb, "shell", "nohup", FRIDA_SERVER_ANDROID_PATH], env=env)