android_of_theseus/frida/theseus_frida/__init__.py

354 lines
10 KiB
Python

import argparse
import base64
import os
import hashlib
import subprocess
import time
import json
import sys
from pathlib import Path
from typing import TextIO
import frida # type: ignore
from androguard.core.apk import get_apkid # type: ignore
from loguru import logger # type: ignore
logger.remove() # remove androguard logs
# JavaScript source, stored next to this module, that is handed to Frida as the script to run
FRIDA_SCRIPT = Path(__file__).parent / "hook.js"
# Text file whose content replaces the "<PYTHON REPLACE StackConsumer.dex.b64>"
# placeholder inside the script before it is loaded
STACK_CONSUMER_B64 = Path(__file__).parent / "StackConsumer.dex.b64"
# The number of bytes used to encode a Java hash (from Object.hashCode or System.identityHashCode).
# The type is 'int', so it should be a 32-bit signed value (TODO confirm).
HASH_NB_BYTES = 4
# Handler for the events generated by the Frida script
def on_message(message, data, data_storage: dict, file_storage: Path):
    """Dispatch one message received from the Frida script.

    Args:
        message: The message dict delivered by Frida. ``message["type"]`` is
            either ``"error"`` or ``"send"``; for ``"send"`` the event is
            described by ``message["payload"]``.
        data: Binary data attached to the message (unused).
        data_storage: Dict of lists in which the handlers accumulate events.
        file_storage: Directory where dynamically loaded bytecode is written.

    Anything that is not a recognised event is printed instead of raising,
    so an unexpected payload cannot break the message callback.
    """
    if message["type"] == "error":
        print(f"[!] {message['description']}")
        # Not every error message is guaranteed to carry a stack trace.
        stack = message.get("stack")
        if stack:
            print(" " + stack.replace("\n", "\n "))
        return

    payload = message.get("payload") if message["type"] == "send" else None
    # A script can send any JSON value; only dict payloads carry a "type" tag.
    kind = payload.get("type") if isinstance(payload, dict) else None

    if kind == "invoke":
        handle_invoke_data(payload["data"], data_storage)
    elif kind == "class-new-inst":
        handle_class_new_inst_data(payload["data"], data_storage)
    # NOTE(review): the tag really is spelled "cnstr-new-isnt" here; it has to
    # match whatever the script sends, so confirm against hook.js before renaming.
    elif kind == "cnstr-new-isnt":
        handle_cnstr_new_inst_data(payload["data"], data_storage)
    elif kind == "load-dex":
        handle_load_dex(payload["data"], data_storage, file_storage)
    else:
        print("[-] message:", message)
def print_stack(stack, prefix: str):
    """Print every frame of ``stack`` on its own line, preceded by ``prefix``.

    Each frame is a mapping with the keys ``is_native``, ``method`` and
    ``bytecode_index``; frames flagged as native get a " (native)" suffix.
    """
    for entry in stack:
        suffix = " (native)" if entry["is_native"] else ""
        location = f"{entry['method']}:{entry['bytecode_index']}"
        print(f" {prefix}{location}{suffix}")
def handle_invoke_data(data, data_storage: dict):
    """Log a reflective method invocation and record it in ``data_storage``.

    ``data`` provides the invoked ``method``, the ``is_static`` flag and the
    call ``stack``; the first stack frame is taken as the caller. Events for
    a couple of View getters, events without a stack, and events whose
    bytecode index is negative are not recorded (the latter are still logged).
    """
    called = data["method"]
    # TODO: good idea?
    ignored_methods = (
        "Landroid/view/View;->getTranslationZ()F",
        "Landroid/view/View;->getElevation()F",
    )
    if called in ignored_methods:
        return

    frames = data["stack"]
    if len(frames) == 0:
        return

    top = frames[0]
    caller = top["method"]
    offset = top["bytecode_index"]
    static = data["is_static"]
    static_marker = " (static)" if static else ""

    print("[+] Method.Invoke:")
    print(f" called: {called}{static_marker}")
    print(f" by: {caller}")
    print(f" at: 0x{offset:08x}")
    # The full stack can be dumped here with print_stack(frames, " ").

    # A negative index does not designate a usable call site: log only.
    if offset >= 0:
        record = {
            "method": called,
            "caller_method": caller,
            "addr": offset,
            "is_static": static,
        }
        data_storage["invoke_data"].append(record)
def handle_class_new_inst_data(data, data_storage: dict):
    """Log a ``Class.newInstance()`` event and record it in ``data_storage``.

    The call site is the first stack frame, unless that frame is
    ``Class.newInstance()`` itself, in which case the second frame is used
    (and the event is dropped when there is no second frame). Events with an
    empty stack are dropped; events with a negative bytecode index are
    logged but not recorded.
    """
    ctor = data["constructor"]
    frames = data["stack"]
    if len(frames) == 0:
        return

    new_instance = "Ljava/lang/Class;->newInstance()Ljava/lang/Object;"
    if frames[0]["method"] == new_instance:
        # The hook frame is on top: the real caller is the one below it.
        if len(frames) <= 1:
            return
        site = frames[1]
    else:
        site = frames[0]

    caller = site["method"]
    offset = site["bytecode_index"]
    print("[+] Class.NewInstance:")
    print(f" called: {ctor}")
    print(f" by: {caller}")
    print(f" at: 0x{offset:08x}")
    # The full stack can be dumped here with print_stack(frames, " ").

    if offset >= 0:
        record = {
            "constructor": ctor,
            "caller_method": caller,
            "addr": offset,
        }
        data_storage["class_new_inst_data"].append(record)
def handle_cnstr_new_inst_data(data, data_storage: dict):
    """Log a ``Constructor.newInstance()`` event and record it in ``data_storage``.

    Only constructors whose descriptor starts with "Lcom/example/theseus"
    are considered. The first stack frame is taken as the caller; events
    with an empty stack are dropped and events with a negative bytecode
    index are logged but not recorded.
    """
    ctor = data["constructor"]
    if not ctor.startswith("Lcom/example/theseus"):
        return

    frames = data["stack"]
    if len(frames) == 0:
        return

    top = frames[0]
    caller = top["method"]
    offset = top["bytecode_index"]
    print("[+] Constructor.newInstance:")
    print(f" called: {ctor}")
    print(f" by: {caller}")
    print(f" at: 0x{offset:08x}")
    # The full stack can be dumped here with print_stack(frames, " ").

    if offset >= 0:
        record = {
            "constructor": ctor,
            "caller_method": caller,
            "addr": offset,
        }
        data_storage["cnstr_new_inst_data"].append(record)
def handle_load_dex(data, data_storage: dict, file_storage: Path):
    """Store dynamically loaded bytecode on disk and record the event.

    Args:
        data: Event payload with the keys ``dex`` (list of base64-encoded
            files), ``classloader_class`` (class descriptor of the class
            loader) and ``classloader`` (signed integer identifying the class
            loader instance, encoded on ``HASH_NB_BYTES`` bytes).
        data_storage: Dict whose ``"dyn_code_load"`` list receives the record.
        file_storage: Directory in which the decoded files are written.

    Raises:
        OverflowError: If the class loader identifier does not fit in
            ``HASH_NB_BYTES`` bytes.
    """
    dex = data["dex"]
    classloader_class = data["classloader_class"]
    classloader = data["classloader"]
    if classloader < 0:
        # Map the signed value to its unsigned two's-complement representation.
        classloader += 1 << (HASH_NB_BYTES * 8)
    # The byte order is given explicitly: int.to_bytes() only has a default
    # for it since Python 3.11 (where the default is "big").
    classloader = classloader.to_bytes(HASH_NB_BYTES, "big").hex()
    short_class = classloader_class.split("/")[-1].removesuffix(";")
    files = []
    print("[+] DEX file loaded:")
    print(f" by: {classloader_class} ({classloader})")
    for file in dex:
        file_bin = base64.b64decode(file)
        h = hashlib.sha1(file_bin).hexdigest()
        print(f" hash: {h}")
        stem = f"{short_class}_{classloader}_{h[:16]}"
        # not .dex, can also be .jar or .apk or .oat or ...
        fname = file_storage / f"{stem}.bytecode"
        i = 1
        # Never overwrite a previous dump: append a counter until the name is free.
        while fname.exists():
            fname = file_storage / f"{stem}_{i}.bytecode"
            i += 1
        fname = fname.absolute().resolve()
        with fname.open("wb") as fp:
            fp.write(file_bin)
        print(f" stored: {str(fname)}")
        files.append(str(fname))
    data_storage["dyn_code_load"].append(
        {
            "classloader_class": classloader_class,
            "classloader": classloader,
            "files": files,
        }
    )
# Local frida-server file, stored next to this module, that is pushed to the
# device when no file exists at FRIDA_SERVER_ANDROID_PATH.
# NOTE(review): the name ends in ".xz" and the file is pushed as-is — confirm
# whether it has to be decompressed before it can be executed on the device.
FRIDA_SERVER_BIN = Path(__file__).parent / "frida-server-16.7.0-android-x86_64.xz"
# Path of the frida-server executable on the Android device
FRIDA_SERVER_ANDROID_PATH = "/data/local/tmp/frida-server"
def _get_frida_device(serial: str) -> frida.core.Device:
    """Return the Frida device matching ``serial``, or the USB device if it is empty."""
    if serial != "":
        return frida.get_device(serial)
    return frida.get_usb_device()


def _frida_server_is_running(frida_device: frida.core.Device) -> bool:
    """Tell whether a frida-server answers on ``frida_device`` (probe: attach to pid 0)."""
    try:
        session = frida_device.attach(0)
        session.detach()
    except frida.ServerNotRunningError:
        return False
    return True


def setup_frida(device: str, env: dict[str, str]) -> frida.core.Device:
    """Return a Frida device with a running frida-server, starting it if needed.

    Args:
        device: Serial of the Android device (eg: 'emulator-5554'), or an
            empty string to use the USB device.
        env: Environment passed to every ``adb`` command. When a serial is
            given, ``ANDROID_SERIAL`` is set in this dict so that adb targets
            the same device.

    Returns:
        The Frida device, once a frida-server is reachable on it.

    If the server is not running, adb is switched to root when necessary, the
    server binary is pushed when it is missing, made executable when it is
    not, and started. The function then polls until the server answers; there
    is no timeout.
    """
    # Keep the serial apart from the Frida device object: the serial is
    # needed again to re-acquire the device after `adb root`.
    serial = device
    frida_device = _get_frida_device(serial)
    if serial != "":
        env["ANDROID_SERIAL"] = serial

    if _frida_server_is_running(frida_device):
        return frida_device

    # Start server
    proc = subprocess.run(
        ["adb", "shell", "whoami"], encoding="utf-8", stdout=subprocess.PIPE, env=env
    )
    if proc.stdout.strip() != "root":
        subprocess.run(["adb", "root"], env=env)
        # Rooting adb will disconnect the device: wait for it to come back
        # before looking it up again.
        subprocess.run(["adb", "wait-for-device"], env=env)
        frida_device = _get_frida_device(serial)

    # `stat` prints the octal permissions, or nothing on stdout when the
    # file does not exist.
    perm = subprocess.run(
        ["adb", "shell", "stat", "-c", "%a", FRIDA_SERVER_ANDROID_PATH],
        encoding="utf-8",
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=env,
    ).stdout.strip()
    # An odd first digit means the owner execute bit is set: int(perm[0]) & 1 == 1
    need_perm_reset = (perm == "") or perm[0] not in ("1", "3", "5", "7")
    if perm == "":
        # NOTE(review): FRIDA_SERVER_BIN is pushed as-is although its name
        # ends in ".xz" — confirm whether it must be decompressed first.
        subprocess.run(
            [
                "adb",
                "push",
                str(FRIDA_SERVER_BIN.absolute()),
                FRIDA_SERVER_ANDROID_PATH,
            ],
            env=env,
        )
    if need_perm_reset:
        # chmod is a device-side command: it has to go through `adb shell`.
        subprocess.run(
            ["adb", "shell", "chmod", "755", FRIDA_SERVER_ANDROID_PATH], env=env
        )
    # The server keeps running in the background for the whole analysis.
    subprocess.Popen(["adb", "shell", FRIDA_SERVER_ANDROID_PATH], env=env)

    # The server takes some time to start: poll until it answers.
    while not _frida_server_is_running(frida_device):
        print("[-] Waiting for frida server to start", end="\r")
        time.sleep(0.3)
    print("[*] Server started: begin analysis ")
    return frida_device
def collect_runtime(apk: Path, device: str, file_storage: Path, output: TextIO):
    """Run ``apk`` under Frida and dump the collected runtime data as JSON.

    Args:
        apk: Path of the application to install and analyse.
        device: Serial of the Android device, or an empty string.
        file_storage: Directory receiving the dynamically loaded bytecode;
            created if it does not exist.
        output: Text stream in which the collected data is written as JSON.

    The application is (re)installed, spawned with the Frida script injected,
    and resumed. Events are collected until the user presses ENTER.

    Raises:
        SystemExit: With status 1 if ``file_storage`` exists but is not a
            directory.
        frida.InvalidArgumentError: If Frida rejects the script; the script
            is printed with line numbers before the error is re-raised.
    """
    env = dict(os.environ)
    if file_storage.exists() and not file_storage.is_dir():
        print("[!] file_storage must be a directory")
        # sys.exit rather than the interactive `exit` helper, and a non-zero
        # status so that callers can detect the failure.
        sys.exit(1)
    file_storage.mkdir(parents=True, exist_ok=True)

    frida_device = setup_frida(device, env)

    app = get_apkid(apk)[0]

    if frida_device.enumerate_applications([app]):
        # Uninstall the APK if it already exists
        subprocess.run(["adb", "uninstall", app], env=env)
    subprocess.run(["adb", "install", str(apk.absolute())], env=env)

    with FRIDA_SCRIPT.open("r") as file:
        source = file.read()
    # Embed the base64 blob in the script in place of its placeholder.
    with STACK_CONSUMER_B64.open("r") as file:
        source = source.replace(
            "<PYTHON REPLACE StackConsumer.dex.b64>",
            file.read().replace("\n", "").strip(),
        )

    pid = frida_device.spawn([app])
    session = frida_device.attach(pid)
    try:
        script = session.create_script(source)
    except frida.InvalidArgumentError:
        # Show the numbered script to help locate the error reported by Frida.
        print("[!] Error:")
        print(
            " "
            + "\n ".join(
                f"{lineno: 3} {line}"
                for lineno, line in enumerate(source.split("\n"), start=1)
            )
        )
        raise

    data_storage = {
        "invoke_data": [],
        "class_new_inst_data": [],
        "cnstr_new_inst_data": [],
        "dyn_code_load": [],
    }

    script.on(
        "message",
        lambda msg, data: on_message(msg, data, data_storage, file_storage),
    )

    # Load script
    script.load()
    # Resume the execution of the APK
    frida_device.resume(pid)

    print("==> Press ENTER to finish the analysis <==")
    input()
    json.dump(data_storage, output, indent=" ")
def main():
    """Command-line entry point: parse the arguments and run the collection.

    The collected data goes to the file given with ``-o/--output`` when
    present, and to standard output otherwise.
    """
    cli = argparse.ArgumentParser(prog="Android Theseus project")
    cli.add_argument(
        "-a", "--apk", required=True, help="Target application", type=Path
    )
    cli.add_argument(
        "-s",
        "--device",
        default="",
        help="The android device to connect to, eg: 'emulator-5554'",
        type=str,
    )
    cli.add_argument(
        "-o",
        "--output",
        default=None,
        help="where to dump the collected data, default is stdout",
        type=Path,
    )
    cli.add_argument(
        "-d",
        "--dex-dir",
        default=Path("."),
        help="where to store dynamically loaded bytecode",
        type=Path,
    )
    options = cli.parse_args()

    def run(sink):
        # Single call site shared by the two output destinations.
        collect_runtime(
            apk=options.apk,
            device=options.device,
            file_storage=options.dex_dir,
            output=sink,
        )

    if options.output is not None:
        with options.output.open("w") as fp:
            run(fp)
    else:
        run(sys.stdout)