android_of_theseus/frida/theseus_frida/__init__.py
2025-06-16 14:59:07 +02:00

553 lines
18 KiB
Python

import argparse
import base64
import os
import hashlib
import subprocess
import time
import json
import sys
import tempfile
import shutil
import lzma
import re
from pathlib import Path
from typing import TextIO, Any
from collections.abc import Callable
from .app_exploration import explore_app
from .setup_frida import setup_frida
import frida # type: ignore
from androguard.core.apk import get_apkid # type: ignore
from loguru import logger # type: ignore
logger.remove() # remove androguard logs
# JavaScript source that is loaded into the target process through Frida.
FRIDA_SCRIPT = Path(__file__).parent / "hook.js"
# Text file whose content (newlines removed) is substituted for the
# "<PYTHON REPLACE StackConsumer.dex.b64>" placeholder of the Frida script.
STACK_CONSUMER_B64 = Path(__file__).parent / "StackConsumer.dex.b64"
# The number of bytes used to encode a Java hash (from Object.hashCode or
# System.identityHashCode).
# The type is 'int', so it should be a 32-bit signed value?
HASH_NB_BYTES = 4
# Set to True by on_message() when a "classloader-done" message is received.
CLASSLOADER_DONE = False
# Handler for the events generated by the Frida script
def on_message(message, data, data_storage: dict, file_storage: Path):
    """Dispatch one message emitted by the Frida script to its handler.

    - message: dict delivered by Frida; its "type" entry selects the branch
      ("error", or "send" with a typed payload)
    - data: second argument of the Frida callback (unused here)
    - data_storage: dict in which the handlers accumulate the collected data
    - file_storage: directory handed to handle_load_dex() for stored bytecode

    Anything that is not recognised is printed and otherwise ignored, so an
    unexpected message never raises from inside the Frida callback.
    """
    global CLASSLOADER_DONE

    if message["type"] == "error":
        print(f"[!] {message['description']}")
        # Not every error message carries a stack trace.
        stack = message.get("stack")
        if stack is not None:
            print(" " + stack.replace("\n", "\n "))
        return

    # Only "send" messages carry a payload, and a script is free to send a
    # payload that is not a dict (string, list, ...) or that has no "type".
    payload = message.get("payload") if message["type"] == "send" else None
    kind = payload.get("type") if isinstance(payload, dict) else None

    if kind == "invoke":
        handle_invoke_data(payload["data"], data_storage)
    elif kind == "class-new-inst":
        handle_class_new_inst_data(payload["data"], data_storage)
    # NOTE(review): "isnt" spelling presumably mirrors the string used by the
    # hook script -- check hook.js before correcting it.
    elif kind == "cnstr-new-isnt":
        handle_cnstr_new_inst_data(payload["data"], data_storage)
    elif kind == "load-dex":
        handle_load_dex(payload["data"], data_storage, file_storage)
    elif kind == "classloader":
        handle_classloader_data(payload["data"], data_storage)
    elif kind == "classloader-done":
        CLASSLOADER_DONE = True
    elif kind == "app_info":
        handle_app_info(payload["data"], data_storage)
    else:
        print("[-] message:", message)
def print_stack(stack, prefix: str):
    """Print one line per frame of `stack`, each line preceded by `prefix`.

    Every frame is a dict providing "method", "bytecode_index" and
    "is_native"; native frames are tagged with " (native)".
    """
    for entry in stack:
        tag = " (native)" if entry["is_native"] else ""
        location = f"{entry['method']}:{entry['bytecode_index']}"
        print(f" {prefix}{location}{tag}")
def cl_id_to_string(classloader: int) -> str | None:
if classloader == 0: # 0 is the hash of java Null
return None
if classloader < 0:
classloader += 2 << (HASH_NB_BYTES * 8 - 1)
return classloader.to_bytes(HASH_NB_BYTES).hex()
def handle_classloader_data(data: dict, data_storage: dict):
    """Register a classloader description in data_storage["classloaders"].

    The "id" and "parent_id" entries of `data` are rewritten in place to
    their string form, and the dict is stored under its own id.
    """
    own_id = cl_id_to_string(data["id"])
    data["id"] = own_id
    data["parent_id"] = cl_id_to_string(data["parent_id"])
    print(f"[+] Got classloader {own_id}({data['str']})")
    data_storage["classloaders"][own_id] = data
def handle_invoke_data(data, data_storage: dict):
    """Log a "Method.Invoke" event and append it to data_storage["invoke_data"].

    The caller is taken from the first frame of data["stack"]. Events with
    an empty stack, for one of the ignored View getters, or with a negative
    bytecode index are not stored.
    """
    callee = data["method"]
    callee_cl_id = cl_id_to_string(data["method_cl_id"])
    # TODO: good idea?
    ignored_methods = (
        "Landroid/view/View;->getTranslationZ()F",
        "Landroid/view/View;->getElevation()F",
    )
    if callee in ignored_methods:
        return
    frames = data["stack"]
    if len(frames) == 0:
        return

    top = frames[0]
    caller = top["method"]
    caller_id = cl_id_to_string(top["cl_id"])
    offset = top["bytecode_index"]
    static_call = data["is_static"]
    static_suffix = " (static)" if static_call else ""

    print("[+] Method.Invoke:")
    print(f" called: [{callee_cl_id}]{callee}{static_suffix}")
    print(f" by: [{caller_id}]{caller}")
    print(f" at: 0x{offset:08x}")

    # Logged above, but not recorded when the call site has a negative index.
    if offset < 0:
        return
    record = {
        "method": callee,
        "method_cl_id": callee_cl_id,
        "renamed_method": None,
        "caller_method": caller,
        "caller_cl_id": caller_id,
        "renamed_caller_method": None,
        "addr": offset,
        "is_static": static_call,
    }
    data_storage["invoke_data"].append(record)
def handle_class_new_inst_data(data, data_storage: dict):
    """Log a "Class.NewInstance" event and append it to
    data_storage["class_new_inst_data"].

    The caller is the first stack frame, unless that frame is
    Class.newInstance() itself, in which case the next frame is used.
    Events without a usable frame or with a negative bytecode index are
    not stored.
    """
    target = data["constructor"]
    target_cl_id = cl_id_to_string(data["constructor_cl_id"])
    frames = data["stack"]
    if len(frames) == 0:
        return

    new_instance = "Ljava/lang/Class;->newInstance()Ljava/lang/Object;"
    # Skip the newInstance() frame itself when it sits on top of the stack.
    skip = 1 if frames[0]["method"] == new_instance else 0
    if len(frames) <= skip:
        return
    frame = frames[skip]

    caller = frame["method"]
    caller_id = cl_id_to_string(frame["cl_id"])
    offset = frame["bytecode_index"]

    print("[+] Class.NewInstance:")
    print(f" called: [{target_cl_id}]{target}")
    print(f" by: [{caller_id}]{caller}")
    print(f" at: 0x{offset:08x}")

    # Logged above, but not recorded when the call site has a negative index.
    if offset < 0:
        return
    record = {
        "constructor": target,
        "constructor_cl_id": target_cl_id,
        "renamed_constructor": None,
        "caller_method": caller,
        "caller_cl_id": caller_id,
        "renamed_caller_method": None,
        "addr": offset,
    }
    data_storage["class_new_inst_data"].append(record)
def handle_cnstr_new_inst_data(data, data_storage: dict):
    """Log a "Constructor.newInstance" event and append it to
    data_storage["cnstr_new_inst_data"].

    The caller is taken from the first frame of data["stack"]. Events with
    an empty stack or a negative bytecode index are not stored.
    """
    target = data["constructor"]
    target_cl_id = cl_id_to_string(data["constructor_cl_id"])
    # NOTE(review): only constructors under this prefix are kept; this looks
    # like a filter for a test application -- confirm it is intended.
    if not target.startswith("Lcom/example/theseus"):
        return
    frames = data["stack"]
    if len(frames) == 0:
        return

    top = frames[0]
    caller = top["method"]
    caller_id = cl_id_to_string(top["cl_id"])
    offset = top["bytecode_index"]

    print("[+] Constructor.newInstance:")
    print(f" called: [{target_cl_id}]{target}")
    print(f" by: [{caller_id}]{caller}")
    print(f" at: 0x{offset:08x}")

    # Logged above, but not recorded when the call site has a negative index.
    if offset < 0:
        return
    record = {
        "constructor": target,
        "constructor_cl_id": target_cl_id,
        "renamed_constructor": None,
        "caller_method": caller,
        "caller_cl_id": caller_id,
        "renamed_caller_method": None,
        "addr": offset,
    }
    data_storage["cnstr_new_inst_data"].append(record)
def handle_load_dex(data, data_storage: dict, file_storage: Path):
    """Store dynamically loaded bytecode on disk and record the load event.

    Each entry of data["dex"] is base64-decoded and written under
    `file_storage` with a name built from the classloader class, the
    classloader id and the first 16 hex digits of the SHA-1 of the content.
    An existing file is never overwritten: a numeric suffix is appended
    instead. The event is appended to data_storage["dyn_code_load"].
    """
    encoded_files = data["dex"]
    loader_class = data["classloader_class"]
    loader_id = cl_id_to_string(data["classloader"])
    parent_id = cl_id_to_string(data["classloader_parent"])
    short_name = loader_class.split("/")[-1].removesuffix(";")

    print("[+] DEX file loaded:")
    print(f" by: {loader_class} ({loader_id})")

    stored_paths = []
    for encoded in encoded_files:
        raw = base64.b64decode(encoded)
        digest = hashlib.sha1(raw).hexdigest()
        print(f" hash: {digest}")

        # not .dex, can also be .jar or .apk or .oat or ...
        stem = f"{short_name}_{loader_id}_{digest[:16]}"
        target = file_storage / f"{stem}.bytecode"
        suffix = 1
        while target.exists():
            target = file_storage / f"{stem}_{suffix}.bytecode"
            suffix += 1

        target = target.absolute().resolve()
        target.write_bytes(raw)
        print(f" stored: {str(target)}")
        stored_paths.append(str(target))

    event = {
        "classloader_class": loader_class,
        "classloader": loader_id,
        "files": stored_paths,
        "classloader_parent": parent_id,
    }
    data_storage["dyn_code_load"].append(event)
# A lowercase letter immediately followed by an uppercase letter.
caml_pattern = re.compile(r"([a-z])([A-Z])")


def caml_to_snake_case(string: str) -> str:
    """Convert a camelCase name to snake_case.

    An underscore is inserted at every lowercase-to-uppercase transition,
    then the whole string is lowercased.
    """
    separated = caml_pattern.sub(
        lambda found: f"{found.group(1)}_{found.group(2)}", string
    )
    return separated.lower()
def handle_app_info(data, data_storage: dict):
    """Store the application info under data_storage["app_info"].

    An "actualSourceDir" entry (the "sourceDir" value without its trailing
    "/base.apk") is added to `data`; every key is then stored in snake_case
    and printed with its original name.
    """
    data["actualSourceDir"] = data["sourceDir"].removesuffix("/base.apk")
    info = {}
    data_storage["app_info"] = info
    print("[+] Received app info:")
    for key, value in data.items():
        info[caml_to_snake_case(key)] = value
        print(f" {key}: {value}")
def app_reinstall(
    device: frida.core.Device, app: str, apk: Path, adb: str, env: dict[str, str]
):
    """Uninstall `app` if present, then install `apk` until the device lists it.

    - device: Frida device used to check whether the application is installed
    - app: application identifier passed to `adb uninstall`
    - apk: path of the apk passed to `adb install -g`
    - adb: adb executable to run
    - env: environment passed to the adb subprocesses

    Up to 10 installations are attempted, sleeping `n` seconds before the
    n-th retry. The device is checked again after the last attempt, so a
    late success is not reported as a failure.

    Raises RuntimeError when the application is still missing afterwards.
    """
    if device.enumerate_applications([app]):
        subprocess.run([adb, "uninstall", app], env=env)

    apk_path = str(apk.absolute())
    max_attempts = 10
    for attempt in range(max_attempts):
        if device.enumerate_applications([app]):
            return
        # Back off a little longer before each new attempt (0s, 1s, 2s, ...).
        time.sleep(attempt)
        subprocess.run([adb, "install", "-g", apk_path], env=env)

    # The last attempt may have succeeded: check before giving up.
    if device.enumerate_applications([app]):
        return

    print("[!] Failled to install apk")
    e = RuntimeError("Failled to install apk")
    e.add_note(f"apk: {app} ({apk_path})")
    e.add_note(
        f"installed apk: {' '.join(map(str, device.enumerate_applications()))}"
    )
    raise e
def spinner(symbs: str = "-\\|/"):
    """Yield the characters of `symbs` one after the other, forever."""
    while True:
        yield from symbs
def get_frida_device(
    device_name: str, env: dict[str, str], adb: str
) -> frida.core.Device:
    """Return a Frida device whose server answers, starting the server if needed.

    - device_name: device to use; the USB device is used when empty
    - env: environment dict; "ANDROID_SERIAL" is set in it when a device
      name is given
    - adb: adb executable, forwarded to setup_frida()

    When the server is not running, setup_frida() is called and the function
    polls every 0.3s until the server accepts an attach.
    """

    def lookup():
        # Resolve the device, pinning adb to it when it is named explicitly.
        if device_name == "":
            return frida.get_usb_device()
        found = frida.get_device(device_name)
        env["ANDROID_SERIAL"] = device_name
        return found

    def server_is_up(candidate) -> bool:
        # Attaching to pid 0 is only used to probe the server.
        try:
            candidate.attach(0).detach()
        except frida.ServerNotRunningError:
            return False
        return True

    device = lookup()
    if server_is_up(device):
        return device

    setup_frida(device_name, env, adb)
    # setup_frida may disconnect the device
    device = lookup()

    # The server takes some time to start
    frames = spinner()
    while not server_is_up(device):
        print(f"[{next(frames)}] Waiting for frida server to start", end="\r")
        time.sleep(0.3)
    print("[*] Server started: begin analysis ")
    return device
def collect_runtime(
apk: Path,
device_name: str,
file_storage: Path,
output: TextIO,
adb_path: Path | None = None,
android_sdk_path: Path | None = None,
apk_explorer: None | Callable[[], None] = None,
timeout: None | int = None,
):
"""Collect runtime data from an apk.
- apk: the path off the apk to analyze
- device_name: name of the device to use
- file_storage: path where to store collected files
- output: textio where to write json data
- adb_path: path to the adb executable
- android_sdk_path: path to the Android SDK folder (usually ~/Android/Sdk)
- apk_explorer: callable called to explore the apk
- timeout: timeout in s for the exploration of the apk, only used with grodd runner.
"""
data_storage: dict[str, Any] = {
"invoke_data": [],
"class_new_inst_data": [],
"cnstr_new_inst_data": [],
"dyn_code_load": [],
"classloaders": {},
"app_info": None,
}
try:
env = dict(os.environ)
if adb_path is not None:
adb = str(adb_path)
elif adb_path is None and android_sdk_path is None:
adb = "adb"
elif not (android_sdk_path / "platform-tools" / "adb").exists():
adb = "adb"
else:
adb = str(android_sdk_path / "platform-tools" / "adb")
if not file_storage.exists():
file_storage.mkdir(parents=True)
if not file_storage.is_dir():
print("[!] file_storage must be a directory")
exit()
device = get_frida_device(device_name, env, adb)
app = get_apkid(apk)[0]
app_reinstall(device, app, apk, adb, env)
with FRIDA_SCRIPT.open("r") as file:
jsscript = file.read()
with STACK_CONSUMER_B64.open("r") as file:
jsscript = jsscript.replace(
"<PYTHON REPLACE StackConsumer.dex.b64>",
file.read().replace("\n", "").strip(),
)
for i in range(10):
try:
pid = device.spawn([app])
except frida.NotSupportedError as e:
if str(e) == "unable to find a front-door activity":
print(f"[!] Failed to start frida ({e}), reinstalling apk")
pid = None
app_reinstall(device, app, apk, adb, env)
else:
raise e
if pid is None:
raise RuntimeError(
"Failed to start frida ('unable to find a front-door activity' error)"
)
session = device.attach(pid)
try:
script = session.create_script(jsscript)
except frida.InvalidArgumentError as e:
print("[!] Error:")
print(
" "
+ "\n ".join(
map(lambda v: f"{v[0]+1: 3} {v[1]}", enumerate(script.split("\n")))
)
)
raise e
script.on(
"message",
lambda msg, data: on_message(msg, data, data_storage, file_storage),
)
# Load script
script.load()
# Resume the execution of the APK
device.resume(pid)
# Dump all known classloaders
# Don't wait for confirmation that all cl were sended
# global CLASSLOADER_DONE
# CLASSLOADER_DONE = False
# script.post({"type": "dump-class-loaders"})
# t = spinner()
# while not CLASSLOADER_DONE:
# print(
# f"[{t.__next__()}] Waiting for the list of classloaders to be sent",
# end="\r",
# )
# time.sleep(0.3)
# print(f"[*] Classloader list received" + " " * 20)
if apk_explorer is None:
exploration_data: dict | None = explore_app(
app, device=device.id, android_sdk=android_sdk_path, timeout=timeout
)
else:
exploration_data = apk_explorer()
# Try to find the Main class loader
main_class_loader: str | None = None
# cls = {d["id"]: d for d in data_storage["classloaders"]}
# for load_data in data_storage["dyn_code_load"]:
# if load_data["classloader"] in cls:
# del cls[load_data["classloader"]]
# for id_ in list(cls.keys()):
# if (
# 'dalvik.system.PathClassLoader[DexPathList[[directory "."],'
# in cls[id_]["str"]
# ):
# del cls[id_]
# elif cls[id_]["cname"] == "java.lang.BootClassLoader":
# del cls[id_]
cls = {}
for cl in data_storage["classloaders"].values():
# This is verry doubious
if cl["cname"] == "Ldalvik/system/PathClassLoader;":
zip_files = list(
map(
lambda s: s.removeprefix('zip file "').removesuffix('"'),
filter(
lambda s: s.startswith('zip file "'),
(
w
for b in cl["str"].split("]")
for a in b.split("[")
for w in a.split(",")
),
),
)
)
if len(zip_files) == 1:
zip_path = Path(zip_files[0])
if (
len(zip_path.parts) == 6
and zip_path.parts[0] == "/"
and zip_path.parts[1] == "data"
and zip_path.parts[2] == "app"
and zip_path.parts[4].startswith(app + "-")
and zip_path.parts[5] == "base.apk"
):
cls[cl["id"]] = cl
if len(cls) == 0:
print("[!] No classloader found for the main APK")
elif len(cls) > 1:
print(
"[!] Multiple classloader found that could be the main APK, try to guess the right one"
)
nb_occ = {k: 0 for k in cls.keys()}
for data in data_storage["class_new_inst_data"]:
if data["caller_cl_id"] in nb_occ:
nb_occ[data["caller_cl_id"]] += 1
for data in data_storage["invoke_data"]:
if data["caller_cl_id"] in nb_occ:
nb_occ[data["caller_cl_id"]] += 1
for data in data_storage["cnstr_new_inst_data"]:
if data["caller_cl_id"] in nb_occ:
nb_occ[data["caller_cl_id"]] += 1
main_class_loader = max(cls.keys(), key=lambda x: nb_occ[x])
else:
main_class_loader = list(cls.keys())[0]
data_storage["apk_cl_id"] = main_class_loader
data_storage["exploration_data"] = exploration_data
except Exception as e:
import traceback
data_storage["error"] = {
"msg": str(e),
"all": traceback.format_exception(e),
}
print(traceback.format_exception(e))
json.dump(data_storage, output, indent=" ")
def main():
    """Command line entry point: parse the arguments and run collect_runtime().

    The json document is written to the file given by --output, or to
    stdout when the option is omitted. --timeout is forwarded in both cases.
    """
    parser = argparse.ArgumentParser(
        prog="Android Theseus project",
    )
    parser.add_argument(
        "-a", "--apk", required=True, help="Target application", type=Path
    )
    parser.add_argument(
        "-s",
        "--device",
        default="",
        help="The android device to connect to, eg: 'emulator-5554'",
        type=str,
    )
    parser.add_argument(
        "-o",
        "--output",
        default=None,
        help="where to dump the collected data, default is stdout",
        type=Path,
    )
    parser.add_argument(
        "-d",
        "--dex-dir",
        default=Path("."),
        help="where to store dynamically loaded bytecode",
        type=Path,
    )
    parser.add_argument(
        "-t", "--timeout", default=None, type=int, help="timeout for grodd runner"
    )
    args = parser.parse_args()

    # Arguments shared by both output modes, so that none is forgotten in
    # one of them.
    common = {
        "apk": args.apk,
        "device_name": args.device,
        "file_storage": args.dex_dir,
        "timeout": args.timeout,
    }
    if args.output is None:
        collect_runtime(output=sys.stdout, **common)
        return
    with args.output.open("w") as fp:
        collect_runtime(output=fp, **common)