520 lines
17 KiB
Python
520 lines
17 KiB
Python
import argparse
|
|
import base64
|
|
import os
|
|
import hashlib
|
|
import subprocess
|
|
import time
|
|
import json
|
|
import sys
|
|
import tempfile
|
|
import shutil
|
|
import lzma
|
|
from pathlib import Path
|
|
from typing import TextIO, Any
|
|
from collections.abc import Callable
|
|
|
|
from .app_exploration import explore_app
|
|
|
|
import frida # type: ignore
|
|
from androguard.core.apk import get_apkid # type: ignore
|
|
from loguru import logger # type: ignore
|
|
|
|
logger.remove()  # drop the loguru handlers so that androguard's logs are not printed
|
|
|
|
# Resources stored in the same directory as this module.
FRIDA_SCRIPT = Path(__file__).with_name("hook.js")
STACK_CONSUMER_B64 = Path(__file__).with_name("StackConsumer.dex.b64")
FRIDA_SERVER_BIN = Path(__file__).with_name("frida-server-16.7.4-android-x86_64.xz")
# Location of the frida server on the Android device.
FRIDA_SERVER_ANDROID_PATH = "/data/local/tmp/frida-server"
|
|
|
|
|
|
# The number of bytes used to encode a Java hash (from Object.hashCode or System.identityHashCode).
# The Java type is 'int', so it should be a 32-bit signed value.
|
|
HASH_NB_BYTES = 4
|
|
|
|
|
|
def spinner(symbs: str = "-\\|/"):
    """Yield the characters of *symbs* one after the other, forever.

    Once the last character has been yielded the generator starts again
    from the first one.
    """
    while True:
        yield from symbs
|
|
|
|
|
|
# Set to True by on_message() when the script sends a "classloader-done" message.
CLASSLOADER_DONE = False
|
|
|
|
|
|
# Handler for the events generated by the Frida script
def on_message(message, data, data_storage: dict, file_storage: Path):
    """Route a message coming from the Frida script to the matching handler.

    Error messages are printed, followed by their stack when one is provided.
    ``send`` messages are dispatched on ``payload["type"]``; a
    ``classloader-done`` payload sets the module-level ``CLASSLOADER_DONE``
    flag. Every other message, including ``send`` messages whose payload is
    not a dict, is printed as-is instead of raising inside the callback.

    Args:
        message: the message dict delivered to the callback.
        data: second argument of the message callback (unused here).
        data_storage: dict in which the handlers accumulate the collected data.
        file_storage: directory given to ``handle_load_dex`` to store bytecode.
    """
    global CLASSLOADER_DONE

    msg_type = message.get("type")
    if msg_type == "error":
        print(f"[!] {message['description']}")
        stack = message.get("stack")
        if stack is not None:
            print(" " + stack.replace("\n", "\n "))
        return

    # Only dict payloads carry a "type" tag; anything else is reported below.
    payload = message.get("payload") if msg_type == "send" else None
    kind = payload.get("type") if isinstance(payload, dict) else None

    if kind == "invoke":
        handle_invoke_data(payload["data"], data_storage)
    elif kind == "class-new-inst":
        handle_class_new_inst_data(payload["data"], data_storage)
    # NOTE(review): "isnt" looks like a typo for "inst"; it presumably matches
    # the tag emitted by the script -- confirm before changing it.
    elif kind == "cnstr-new-isnt":
        handle_cnstr_new_inst_data(payload["data"], data_storage)
    elif kind == "load-dex":
        handle_load_dex(payload["data"], data_storage, file_storage)
    elif kind == "classloader":
        handle_classloader_data(payload["data"], data_storage)
    elif kind == "classloader-done":
        CLASSLOADER_DONE = True
    else:
        print("[-] message:", message)
|
|
|
|
|
|
def print_stack(stack, prefix: str):
    """Print one line per frame of *stack*, each one preceded by *prefix*.

    A line shows the method of the frame and its bytecode index, followed by
    `` (native)`` when the frame is flagged as native.
    """
    for frame in stack:
        native_tag = " (native)" if frame["is_native"] else ""
        location = f"{frame['method']}:{frame['bytecode_index']}"
        print(f" {prefix}{location}{native_tag}")
|
|
|
|
|
|
def cl_id_to_string(classloader: int) -> str | None:
|
|
if classloader == 0: # 0 is the hash of java Null
|
|
return None
|
|
if classloader < 0:
|
|
classloader += 2 << (HASH_NB_BYTES * 8 - 1)
|
|
return classloader.to_bytes(HASH_NB_BYTES).hex()
|
|
|
|
|
|
def handle_classloader_data(data: dict, data_storage: dict):
    """Store the description of a classloader sent by the script.

    The ``id`` and ``parent_id`` entries of *data* are replaced in place by
    the value returned by ``cl_id_to_string``, then *data* is appended to
    ``data_storage["classloaders"]``.
    """
    for key in ("id", "parent_id"):
        data[key] = cl_id_to_string(data[key])
    print(f"[+] Got classloader {data['id']}({data['str']})")
    data_storage["classloaders"].append(data)
|
|
|
|
|
|
def handle_invoke_data(data, data_storage: dict):
    """Print and store an "invoke" event sent by the script.

    The caller is taken from the first frame of ``data["stack"]``. The event
    is dropped silently when the called method is one of the ignored
    ``android.view.View`` getters or when the stack is empty. It is printed
    but not stored when the bytecode index of the caller is negative.
    Otherwise a record is appended to ``data_storage["invoke_data"]``.
    """
    ignored_methods = (
        "Landroid/view/View;->getTranslationZ()F",
        "Landroid/view/View;->getElevation()F",
    )

    method = data["method"]
    method_cl_id = cl_id_to_string(data["method_cl_id"])
    # TODO: good idea?
    if method in ignored_methods:
        return

    stack = data["stack"]
    if len(stack) == 0:
        return
    caller_frame = stack[0]
    caller_method = caller_frame["method"]
    caller_cl_id = cl_id_to_string(caller_frame["cl_id"])
    addr = caller_frame["bytecode_index"]

    is_static = data["is_static"]
    static_tag = " (static)" if is_static else ""

    print("[+] Method.Invoke:")
    print(f" called: [{method_cl_id}]{method}{static_tag}")
    print(f" by: [{caller_cl_id}]{caller_method}")
    print(f" at: 0x{addr:08x}")
    # print(f" stack:")
    # print_stack(data["stack"], " ")

    if addr < 0:
        return

    record = {
        "method": method,
        "method_cl_id": method_cl_id,
        "renamed_method": None,
        "caller_method": caller_method,
        "caller_cl_id": caller_cl_id,
        "renamed_caller_method": None,
        "addr": addr,
        "is_static": is_static,
    }
    data_storage["invoke_data"].append(record)
|
|
|
|
|
|
def handle_class_new_inst_data(data, data_storage: dict):
    """Print and store a "class-new-inst" event sent by the script.

    The caller is the first frame of ``data["stack"]``, unless this frame is
    ``Class.newInstance()`` itself, in which case the second frame is used.
    The event is dropped when no such frame exists. It is printed but not
    stored when the bytecode index of the caller is negative. Otherwise a
    record is appended to ``data_storage["class_new_inst_data"]``.
    """
    new_instance = "Ljava/lang/Class;->newInstance()Ljava/lang/Object;"

    constructor = data["constructor"]
    constructor_cl_id = cl_id_to_string(data["constructor_cl_id"])

    stack = data["stack"]
    if len(stack) == 0:
        return
    # Skip the frame of Class.newInstance() when it is on top of the stack.
    frame_index = 1 if stack[0]["method"] == new_instance else 0
    if frame_index >= len(stack):
        return
    frame = stack[frame_index]

    caller_method = frame["method"]
    caller_cl_id = cl_id_to_string(frame["cl_id"])
    addr = frame["bytecode_index"]

    print("[+] Class.NewInstance:")
    print(f" called: [{constructor_cl_id}]{constructor}")
    print(f" by: [{caller_cl_id}]{caller_method}")
    print(f" at: 0x{addr:08x}")
    # print(f" stack:")
    # print_stack(data["stack"], " ")

    if addr < 0:
        return

    record = {
        "constructor": constructor,
        "constructor_cl_id": constructor_cl_id,
        "renamed_constructor": None,
        "caller_method": caller_method,
        "caller_cl_id": caller_cl_id,
        "renamed_caller_method": None,
        "addr": addr,
    }
    data_storage["class_new_inst_data"].append(record)
|
|
|
|
|
|
def handle_cnstr_new_inst_data(data, data_storage: dict):
    """Print and store a constructor "new instance" event sent by the script.

    The caller is taken from the first frame of ``data["stack"]``. The event
    is dropped silently when the constructor does not start with
    ``Lcom/example/theseus`` or when the stack is empty. It is printed but
    not stored when the bytecode index of the caller is negative. Otherwise
    a record is appended to ``data_storage["cnstr_new_inst_data"]``.
    """
    constructor = data["constructor"]
    constructor_cl_id = cl_id_to_string(data["constructor_cl_id"])
    # NOTE(review): this hard-coded package prefix looks like a leftover test
    # filter -- confirm that discarding every other constructor is intended.
    if not constructor.startswith("Lcom/example/theseus"):
        return

    stack = data["stack"]
    if len(stack) == 0:
        return
    caller_frame = stack[0]
    caller_method = caller_frame["method"]
    caller_cl_id = cl_id_to_string(caller_frame["cl_id"])
    addr = caller_frame["bytecode_index"]

    print("[+] Constructor.newInstance:")
    print(f" called: [{constructor_cl_id}]{constructor}")
    print(f" by: [{caller_cl_id}]{caller_method}")
    print(f" at: 0x{addr:08x}")
    # print(f" stack:")
    # print_stack(data["stack"], " ")

    if addr < 0:
        return

    record = {
        "constructor": constructor,
        "constructor_cl_id": constructor_cl_id,
        "renamed_constructor": None,
        "caller_method": caller_method,
        "caller_cl_id": caller_cl_id,
        "renamed_caller_method": None,
        "addr": addr,
    }
    data_storage["cnstr_new_inst_data"].append(record)
|
|
|
|
|
|
def handle_load_dex(data, data_storage: dict, file_storage: Path):
    """Save the bytecode files of a "load-dex" event and record the event.

    Each entry of ``data["dex"]`` is base64-decoded and written in
    *file_storage* under a name built from the short class name of the
    classloader, the classloader id and the first 16 hexadecimal characters
    of the SHA-1 of the content. A numeric suffix is added when a file with
    that name already exists. The event, with the resolved paths of the
    written files, is then appended to ``data_storage["dyn_code_load"]``.
    """
    classloader_class = data["classloader_class"]
    encoded_files = data["dex"]
    classloader = cl_id_to_string(data["classloader"])
    classloader_parent = cl_id_to_string(data["classloader_parent"])
    short_class = classloader_class.rsplit("/", 1)[-1].removesuffix(";")

    print("[+] DEX file loaded:")
    print(f" by: {classloader_class} ({classloader})")

    stored_files = []
    for encoded in encoded_files:
        content = base64.b64decode(encoded)
        digest = hashlib.sha1(content).hexdigest()
        print(f" hash: {digest}")

        # not .dex, can also be .jar or .apk or .oat or ...
        stem = f"{short_class}_{classloader}_{digest[:16]}"
        target = file_storage / f"{stem}.bytecode"
        attempt = 0
        while target.exists():
            attempt += 1
            target = file_storage / f"{stem}_{attempt}.bytecode"
        target = target.absolute().resolve()

        target.write_bytes(content)
        stored_path = str(target)
        print(f" stored: {stored_path}")
        stored_files.append(stored_path)

    record = {
        "classloader_class": classloader_class,
        "classloader": classloader,
        "files": stored_files,
        "classloader_parent": classloader_parent,
    }
    data_storage["dyn_code_load"].append(record)
|
|
|
|
|
|
def setup_frida(device_name: str, env: dict[str, str], adb: str) -> frida.core.Device:
    """Return a frida device on which a frida server is running.

    When the server does not answer, adb is switched to root if needed, the
    bundled server binary is pushed to the device if it is not there yet,
    made executable if it is not, and started. The function then waits until
    the server answers.

    Args:
        device_name: id of the device to use, or ``""`` for the USB device.
            When not empty it is also stored in ``env["ANDROID_SERIAL"]``.
        env: environment given to the adb sub-processes (modified in place).
        adb: the adb executable to run.

    Returns:
        The frida device.
    """

    def acquire_device():
        # Named device when one was requested, else the USB device.
        if device_name != "":
            return frida.get_device(device_name)
        return frida.get_usb_device()

    def server_is_up(dev) -> bool:
        # Attaching to pid 0 only succeeds when the server is running.
        try:
            dev.attach(0).detach()
        except frida.ServerNotRunningError:
            return False
        return True

    device = acquire_device()
    if device_name != "":
        env["ANDROID_SERIAL"] = device_name

    if server_is_up(device):
        return device

    # Start server
    whoami = subprocess.run(
        [adb, "shell", "whoami"],
        encoding="utf-8",
        stdout=subprocess.PIPE,
        env=env,
    )
    if whoami.stdout.strip() != "root":
        subprocess.run([adb, "root"], env=env)
        # Rooting adb will disconnect the device
        device = acquire_device()

    stat_proc = subprocess.run(
        [adb, "shell", "stat", "-c", "%a", FRIDA_SERVER_ANDROID_PATH],
        encoding="utf-8",
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=env,
    )
    perm = stat_proc.stdout.strip()
    # An empty output means that stat did not report any mode for the file.
    server_missing = perm == ""
    # The first digit of the mode must be odd, i.e. int(perm[0]) & 1 == 1
    needs_chmod = server_missing or perm[0] not in ("1", "3", "5", "7")

    if server_missing:
        with tempfile.TemporaryDirectory() as tmp_name:
            local_server = Path(tmp_name) / "frida-server"
            with lzma.open(str(FRIDA_SERVER_BIN.absolute())) as packed:
                with local_server.open("wb") as unpacked:
                    shutil.copyfileobj(packed, unpacked)
            push_cmd = [
                adb,
                "push",
                str(local_server.absolute()),
                FRIDA_SERVER_ANDROID_PATH,
            ]
            subprocess.run(push_cmd, env=env)

    if needs_chmod:
        subprocess.run(
            [adb, "shell", "chmod", "755", FRIDA_SERVER_ANDROID_PATH], env=env
        )

    # The server is left running in the background.
    subprocess.Popen([adb, "shell", FRIDA_SERVER_ANDROID_PATH], env=env)

    # The server takes some time to start
    frames = spinner()
    while not server_is_up(device):
        print(f"[{next(frames)}] Waiting for frida server to start", end="\r")
        time.sleep(0.3)
    print("[*] Server started: begin analysis ")
    return device
|
|
|
|
|
|
def collect_runtime(
|
|
apk: Path,
|
|
device_name: str,
|
|
file_storage: Path,
|
|
output: TextIO,
|
|
adb_path: Path | None = None,
|
|
android_sdk_path: Path | None = None,
|
|
apk_explorer: None | Callable[[], None] = None,
|
|
):
|
|
env = dict(os.environ)
|
|
|
|
if adb_path is not None:
|
|
adb = str(adb_path)
|
|
elif adb_path is None and android_sdk_path is None:
|
|
adb = "adb"
|
|
elif not (android_sdk_path / "platform-tools" / "adb").exists():
|
|
adb = "adb"
|
|
else:
|
|
adb = str(android_sdk_path / "platform-tools" / "adb")
|
|
|
|
if not file_storage.exists():
|
|
file_storage.mkdir(parents=True)
|
|
if not file_storage.is_dir():
|
|
print("[!] file_storage must be a directory")
|
|
exit()
|
|
|
|
device = setup_frida(device_name, env, adb)
|
|
|
|
app = get_apkid(apk)[0]
|
|
|
|
if device.enumerate_applications([app]):
|
|
# Uninstall the APK if it already exist
|
|
subprocess.run([adb, "uninstall", app], env=env)
|
|
subprocess.run([adb, "install", "-g", str(apk.absolute())], env=env)
|
|
|
|
with FRIDA_SCRIPT.open("r") as file:
|
|
jsscript = file.read()
|
|
with STACK_CONSUMER_B64.open("r") as file:
|
|
jsscript = jsscript.replace(
|
|
"<PYTHON REPLACE StackConsumer.dex.b64>",
|
|
file.read().replace("\n", "").strip(),
|
|
)
|
|
|
|
pid = device.spawn([app])
|
|
session = device.attach(pid)
|
|
try:
|
|
script = session.create_script(jsscript)
|
|
except frida.InvalidArgumentError as e:
|
|
print("[!] Error:")
|
|
print(
|
|
" "
|
|
+ "\n ".join(
|
|
map(lambda v: f"{v[0]+1: 3} {v[1]}", enumerate(script.split("\n")))
|
|
)
|
|
)
|
|
raise e
|
|
|
|
data_storage: dict[str, Any] = {
|
|
"invoke_data": [],
|
|
"class_new_inst_data": [],
|
|
"cnstr_new_inst_data": [],
|
|
"dyn_code_load": [],
|
|
"classloaders": [],
|
|
}
|
|
|
|
script.on(
|
|
"message",
|
|
lambda msg, data: on_message(msg, data, data_storage, file_storage),
|
|
)
|
|
|
|
# Load script
|
|
script.load()
|
|
# Resume the execution of the APK
|
|
device.resume(pid)
|
|
|
|
# Dump all known classloaders
|
|
# Don't wait for confirmation that all cl were sended
|
|
# global CLASSLOADER_DONE
|
|
# CLASSLOADER_DONE = False
|
|
# script.post({"type": "dump-class-loaders"})
|
|
# t = spinner()
|
|
# while not CLASSLOADER_DONE:
|
|
# print(
|
|
# f"[{t.__next__()}] Waiting for the list of classloaders to be sent",
|
|
# end="\r",
|
|
# )
|
|
# time.sleep(0.3)
|
|
# print(f"[*] Classloader list received" + " " * 20)
|
|
|
|
if apk_explorer is None:
|
|
explore_app(app, device=device.id, android_sdk=android_sdk_path)
|
|
else:
|
|
apk_explorer()
|
|
|
|
# Try to find the Main class loader
|
|
main_class_loader: str | None = None
|
|
# cls = {d["id"]: d for d in data_storage["classloaders"]}
|
|
# for load_data in data_storage["dyn_code_load"]:
|
|
# if load_data["classloader"] in cls:
|
|
# del cls[load_data["classloader"]]
|
|
# for id_ in list(cls.keys()):
|
|
# if (
|
|
# 'dalvik.system.PathClassLoader[DexPathList[[directory "."],'
|
|
# in cls[id_]["str"]
|
|
# ):
|
|
# del cls[id_]
|
|
# elif cls[id_]["cname"] == "java.lang.BootClassLoader":
|
|
# del cls[id_]
|
|
cls = {}
|
|
for cl in data_storage["classloaders"]:
|
|
# This is verry doubious
|
|
if cl["cname"] == "dalvik.system.PathClassLoader":
|
|
zip_files = list(
|
|
map(
|
|
lambda s: s.removeprefix('zip file "').removesuffix('"'),
|
|
filter(
|
|
lambda s: s.startswith('zip file "'),
|
|
(
|
|
w
|
|
for b in cl["str"].split("]")
|
|
for a in b.split("[")
|
|
for w in a.split(",")
|
|
),
|
|
),
|
|
)
|
|
)
|
|
if len(zip_files) == 1:
|
|
zip_path = Path(zip_files[0])
|
|
if (
|
|
len(zip_path.parts) == 6
|
|
and zip_path.parts[0] == "/"
|
|
and zip_path.parts[1] == "data"
|
|
and zip_path.parts[2] == "app"
|
|
and zip_path.parts[4].startswith(app + "-")
|
|
and zip_path.parts[5] == "base.apk"
|
|
):
|
|
cls[cl["id"]] = cl
|
|
if len(cls) == 0:
|
|
print("[!] No classloader found for the main APK")
|
|
elif len(cls) > 1:
|
|
print(
|
|
"[!] Multiple classloader found that could be the main APK, try to guess the right one"
|
|
)
|
|
nb_occ = {k: 0 for k in cls.keys()}
|
|
for data in data_storage["class_new_inst_data"]:
|
|
if data["caller_cl_id"] in nb_occ:
|
|
nb_occ[data["caller_cl_id"]] += 1
|
|
for data in data_storage["invoke_data"]:
|
|
if data["caller_cl_id"] in nb_occ:
|
|
nb_occ[data["caller_cl_id"]] += 1
|
|
for data in data_storage["cnstr_new_inst_data"]:
|
|
if data["caller_cl_id"] in nb_occ:
|
|
nb_occ[data["caller_cl_id"]] += 1
|
|
main_class_loader = max(cls.keys(), key=lambda x: nb_occ[x])
|
|
else:
|
|
main_class_loader = list(cls.keys())[0]
|
|
data_storage["apk_cl_id"] = main_class_loader
|
|
|
|
json.dump(data_storage, output, indent=" ")
|
|
|
|
|
|
def main():
    """Command line entry point: parse the arguments and call ``collect_runtime``.

    The collected data is written to the file given with ``--output``, or to
    the standard output when the option is not used.
    """
    parser = argparse.ArgumentParser(prog="Android Theseus project")
    parser.add_argument(
        "-a",
        "--apk",
        required=True,
        help="Target application",
        type=Path,
    )
    parser.add_argument(
        "-s",
        "--device",
        default="",
        help="The android device to connect to, eg: 'emulator-5554'",
        type=str,
    )
    parser.add_argument(
        "-o",
        "--output",
        default=None,
        help="where to dump the collected data, default is stdout",
        type=Path,
    )
    parser.add_argument(
        "-d",
        "--dex-dir",
        default=Path("."),
        help="where to store dynamically loaded bytecode",
        type=Path,
    )
    args = parser.parse_args()

    # Arguments shared by both output modes.
    common = {
        "apk": args.apk,
        "device_name": args.device,
        "file_storage": args.dex_dir,
    }
    if args.output is None:
        collect_runtime(output=sys.stdout, **common)
        return
    with args.output.open("w") as fp:
        collect_runtime(output=fp, **common)
|