android_of_theseus/frida/theseus_frida/__init__.py

339 lines
10 KiB
Python

import argparse
import base64
import os
import hashlib
import subprocess
import time
import json
from pathlib import Path
import frida # type: ignore
from androguard.core.apk import get_apkid # type: ignore
from loguru import logger # type: ignore
logger.remove()  # remove androguard logs
# JavaScript source that main() loads into the target process through Frida.
FRIDA_SCRIPT = Path(__file__).parent / "hook.js"
# Text file whose content (newlines removed) main() substitutes for the
# "<PYTHON REPLACE StackConsumer.dex.b64>" placeholder found in FRIDA_SCRIPT.
STACK_CONSUMER_B64 = Path(__file__).parent / "StackConsumer.dex.b64"
# The number of bytes used to encode a Java hash (from Object.hashCode or
# System.identityHashCode). The Java type is 'int', so it should be a 32-bit
# signed value. handle_load_dex() uses this width to render a classloader hash
# as a fixed-size hexadecimal string.
HASH_NB_BYTES = 4
# Handler for the events generated by the Frida script
def on_message(message, data, data_storage: dict, file_storage: Path):
    """Route one message received from the Frida script to its handler.

    Args:
        message: the message dict delivered by Frida. "error" messages are
            printed; "send" messages are dispatched on ``payload["type"]``.
        data: binary blob attached to the message (unused here).
        data_storage: dict of lists in which the handlers record events.
        file_storage: directory in which loaded bytecode files are written.

    Any message that is not recognised (unknown type, payload that is not a
    dict, payload without a "type") is printed verbatim instead of raising
    inside the callback.
    """
    msg_type = message.get("type")
    if msg_type == "error":
        print(f"[!] {message['description']}")
        # Only indent and print the stack when the message carries one.
        stack = message.get("stack")
        if stack is not None:
            print(" " + stack.replace("\n", "\n "))
        return

    payload = message.get("payload")
    # A script may send a bare string/number: only dicts carry a "type".
    kind = payload.get("type") if isinstance(payload, dict) else None

    if msg_type == "send" and kind == "invoke":
        handle_invoke_data(payload["data"], data_storage)
    elif msg_type == "send" and kind == "class-new-inst":
        handle_class_new_inst_data(payload["data"], data_storage)
    elif msg_type == "send" and kind == "cnstr-new-isnt":
        # NOTE(review): the tag is spelled "isnt"; it must match the string
        # emitted by the Frida script, so it is left untouched here.
        handle_cnstr_new_inst_data(payload["data"], data_storage)
    elif msg_type == "send" and kind == "load-dex":
        handle_load_dex(payload["data"], data_storage, file_storage)
    else:
        print("[-] message:", message)
def print_stack(stack, prefix: str):
    """Print one line per frame of ``stack``, each preceded by ``prefix``.

    Every frame is a dict providing "method", "bytecode_index" and
    "is_native"; native frames are flagged with a " (native)" suffix.
    """
    for entry in stack:
        suffix = " (native)" if entry["is_native"] else ""
        location = f"{entry['method']}:{entry['bytecode_index']}"
        print(f" {prefix}{location}{suffix}")
def handle_invoke_data(data, data_storage: dict):
    """Report a reflective method invocation and record its call site.

    ``data`` provides "method" (the invoked method), "is_static" and "stack"
    (list of frames, the first one being treated as the caller). The event is
    printed, then appended to ``data_storage["invoke_data"]`` unless the
    caller's bytecode index is negative.
    """
    callee = data["method"]
    # TODO: good idea?
    ignored_callees = (
        "Landroid/view/View;->getTranslationZ()F",
        "Landroid/view/View;->getElevation()F",
    )
    if callee in ignored_callees:
        return

    frames = data["stack"]
    if len(frames) == 0:
        # Without a caller frame there is no call site to report.
        return

    top_frame = frames[0]
    caller = top_frame["method"]
    offset = top_frame["bytecode_index"]
    static_flag = data["is_static"]
    static_suffix = " (static)" if static_flag else ""

    print("[+] Method.Invoke:")
    print(f" called: {callee}{static_suffix}")
    print(f" by: {caller}")
    print(f" at: 0x{offset:08x}")

    # Negative indexes are still displayed above but never stored.
    if offset >= 0:
        record = {
            "method": callee,
            "caller_method": caller,
            "addr": offset,
            "is_static": static_flag,
        }
        data_storage["invoke_data"].append(record)
def handle_class_new_inst_data(data, data_storage: dict):
    """Report a Class.newInstance() call and record its call site.

    ``data`` provides "constructor" and "stack". When the first frame is
    Class.newInstance() itself, the second frame is used as the caller (and
    the event is dropped if there is none); otherwise the first frame is the
    caller. The event is printed, then appended to
    ``data_storage["class_new_inst_data"]`` unless the bytecode index is
    negative.
    """
    ctor = data["constructor"]
    frames = data["stack"]
    if len(frames) == 0:
        return

    if frames[0]["method"] == "Ljava/lang/Class;->newInstance()Ljava/lang/Object;":
        # The top frame is the reflection API: the caller is one frame below.
        if len(frames) <= 1:
            return
        call_site = frames[1]
    else:
        call_site = frames[0]

    caller = call_site["method"]
    offset = call_site["bytecode_index"]

    print("[+] Class.NewInstance:")
    print(f" called: {ctor}")
    print(f" by: {caller}")
    print(f" at: 0x{offset:08x}")

    # Negative indexes are still displayed above but never stored.
    if offset >= 0:
        record = {
            "constructor": ctor,
            "caller_method": caller,
            "addr": offset,
        }
        data_storage["class_new_inst_data"].append(record)
def handle_cnstr_new_inst_data(
    data, data_storage: dict, class_prefix: str = "Lcom/example/theseus"
):
    """Report a Constructor.newInstance() call and record its call site.

    Args:
        data: event payload providing "constructor" and "stack" (list of
            frames, the first one being treated as the caller).
        data_storage: the event is appended to
            ``data_storage["cnstr_new_inst_data"]``.
        class_prefix: only constructors whose descriptor starts with this
            prefix are reported; pass "" to report every constructor. The
            default keeps the previously hard-coded filter.

    Events with an empty stack are ignored. Events whose bytecode index is
    negative are printed but not stored.
    """
    constructor = data["constructor"]
    if not constructor.startswith(class_prefix):
        return
    stack = data["stack"]
    if not stack:
        # Without a caller frame there is no call site to report.
        return
    caller_method = stack[0]["method"]
    addr = stack[0]["bytecode_index"]
    print("[+] Constructor.newInstance:")
    print(f" called: {constructor}")
    print(f" by: {caller_method}")
    print(f" at: 0x{addr:08x}")
    if addr < 0:
        return
    data_storage["cnstr_new_inst_data"].append(
        {
            "constructor": constructor,
            "caller_method": caller_method,
            "addr": addr,
        }
    )
def handle_load_dex(data, data_storage: dict, file_storage: Path):
    """Save dynamically loaded bytecode to disk and record the load event.

    Args:
        data: event payload providing "dex" (iterable of base64-encoded
            files), "classloader_class" (class descriptor of the loader) and
            "classloader" (integer hash of the loader, possibly negative).
        data_storage: the event is appended to
            ``data_storage["dyn_code_load"]``.
        file_storage: directory in which the decoded files are written.

    Each file is written as
    ``<loader class>_<loader hash>_<first 16 hex digits of sha1>.bytecode``;
    a numeric suffix is added when that name is already taken.
    """
    dex = data["dex"]
    classloader_class = data["classloader_class"]
    classloader = data["classloader"]
    if classloader < 0:
        # Map the signed hash onto its unsigned two's complement value.
        classloader += 1 << (HASH_NB_BYTES * 8)
    # The byte order is spelled out: it is only optional from Python 3.11 on.
    classloader = classloader.to_bytes(HASH_NB_BYTES, byteorder="big").hex()
    short_class = classloader_class.split("/")[-1].removesuffix(";")
    files = []
    print("[+] DEX file loaded:")
    print(f" by: {classloader_class} ({classloader})")
    for file in dex:
        file_bin = base64.b64decode(file)
        h = hashlib.sha1(file_bin).hexdigest()
        print(f" hash: {h}")
        stem = f"{short_class}_{classloader}_{h[:16]}"
        # not .dex, can also be .jar or .apk or .oat or ...
        fname = file_storage / f"{stem}.bytecode"
        i = 1
        # Never overwrite a previously stored file: pick the next free name.
        while fname.exists():
            fname = file_storage / f"{stem}_{i}.bytecode"
            i += 1
        fname = fname.absolute().resolve()
        with fname.open("wb") as fp:
            fp.write(file_bin)
        print(f" stored: {str(fname)}")
        files.append(str(fname))
    data_storage["dyn_code_load"].append(
        {
            "classloader_class": classloader_class,
            "classloader": classloader,
            "files": files,
        }
    )
# Frida server file located next to this module; setup_frida() pushes it to
# the device when nothing is found at FRIDA_SERVER_ANDROID_PATH.
# NOTE(review): the name ends with ".xz" and the file is pushed as-is, without
# any decompression step - confirm that it is directly executable on the device.
FRIDA_SERVER_BIN = Path(__file__).parent / "frida-server-16.7.0-android-x86_64.xz"
# Path of the frida-server executable on the Android device.
FRIDA_SERVER_ANDROID_PATH = "/data/local/tmp/frida-server"
def setup_frida(device: str, env: dict[str, str]) -> frida.core.Device:
    """Return a Frida device whose frida-server is running, starting it if needed.

    Args:
        device: id of the Android device (e.g. 'emulator-5554'), or "" to use
            the USB device reported by Frida.
        env: environment passed to every adb command. When ``device`` is not
            empty, "ANDROID_SERIAL" is set in this dict (the caller's dict is
            modified) so that adb targets the same device.

    Returns:
        The Frida device, once attaching to pid 0 succeeds on it.

    When the server is not running, adb is switched to root if needed, the
    server binary is pushed if it is absent from the device, made executable,
    and started; the function then polls until the server answers.
    """
    # Keep the id apart from the Frida object: it is needed again to
    # reconnect after `adb root`.
    device_id = device

    def connect():
        # Get the Frida handle of the selected device.
        if device_id != "":
            return frida.get_device(device_id)
        return frida.get_usb_device()

    def server_is_running(target) -> bool:
        # Attaching to pid 0 only succeeds when frida-server answers.
        try:
            target.attach(0).detach()
        except frida.ServerNotRunningError:
            return False
        return True

    if device_id != "":
        env["ANDROID_SERIAL"] = device_id
    frida_device = connect()
    if server_is_running(frida_device):
        return frida_device

    # Start server
    whoami = subprocess.run(
        ["adb", "shell", "whoami"], encoding="utf-8", stdout=subprocess.PIPE, env=env
    )
    if whoami.stdout.strip() != "root":
        subprocess.run(["adb", "root"], env=env)
        # Rooting adb will disconnect the device
        frida_device = connect()

    # An empty answer means that stat failed: the binary is treated as absent.
    perm = subprocess.run(
        ["adb", "shell", "stat", "-c", "%a", FRIDA_SERVER_ANDROID_PATH],
        encoding="utf-8",
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=env,
    ).stdout.strip()
    if perm == "":
        subprocess.run(
            [
                "adb",
                "push",
                str(FRIDA_SERVER_BIN.absolute()),
                FRIDA_SERVER_ANDROID_PATH,
            ],
            env=env,
        )
    try:
        # Owner execute bit of the octal mode (also correct for modes
        # printed with a leading special-bits digit, e.g. "4755").
        owner_can_exec = perm != "" and bool(int(perm, 8) & 0o100)
    except ValueError:
        owner_can_exec = False
    if not owner_can_exec:
        # chmod has to run on the device: `adb chmod` is not an adb command.
        subprocess.run(
            ["adb", "shell", "chmod", "755", FRIDA_SERVER_ANDROID_PATH], env=env
        )

    # Left running in the background for the duration of the analysis.
    subprocess.Popen(["adb", "shell", FRIDA_SERVER_ANDROID_PATH], env=env)

    # The server take some time to start
    while True:
        if server_is_running(frida_device):
            print("[*] Server started: begin analysis ")
            return frida_device
        print("[-] Waiting for frida server to start", end="\r")
        time.sleep(0.3)
def main():
    """Command line entry point: instrument an APK and dump the collected data.

    Parses the arguments, prepares the bytecode output directory, makes sure
    frida-server runs on the device, (re)installs the APK, spawns it with the
    Frida script loaded, then waits for ENTER before writing the collected
    events as JSON to --output (or stdout).

    Exits with status 1 when --dex-dir exists but is not a directory.
    """
    parser = argparse.ArgumentParser(
        prog="Android Theseus project",
    )
    parser.add_argument(
        "-a", "--apk", required=True, help="Target application", type=Path
    )
    parser.add_argument(
        "-s",
        "--device",
        default="",
        help="The android device to connect to, eg: 'emulator-5554'",
        type=str,
    )
    parser.add_argument(
        "-o",
        "--output",
        default=None,
        help="where to dump the collected data, default is stdout",
        type=Path,
    )
    parser.add_argument(
        "-d",
        "--dex-dir",
        default=Path("."),
        help="where to store dynamically loaded bytecode",
        type=Path,
    )
    args = parser.parse_args()
    env = dict(os.environ)

    file_storage = args.dex_dir
    if not file_storage.exists():
        file_storage.mkdir(parents=True)
    if not file_storage.is_dir():
        print("[!] --dex-dir must be a directory")
        # Report the failure to the caller with a non-zero status.
        raise SystemExit(1)

    device = setup_frida(args.device, env)

    app = get_apkid(args.apk)[0]
    if device.enumerate_applications([app]):
        # Uninstall the APK if it already exist
        subprocess.run(["adb", "uninstall", app], env=env)
    subprocess.run(["adb", "install", str(args.apk.absolute())], env=env)

    # Build the script source: the placeholder is replaced by the content of
    # the base64 file, stripped of its newlines.
    script_source = FRIDA_SCRIPT.read_text(encoding="utf-8")
    stack_consumer = STACK_CONSUMER_B64.read_text(encoding="utf-8")
    script_source = script_source.replace(
        "<PYTHON REPLACE StackConsumer.dex.b64>",
        stack_consumer.replace("\n", "").strip(),
    )

    pid = device.spawn([app])
    session = device.attach(pid)
    try:
        script = session.create_script(script_source)
    except frida.InvalidArgumentError:
        # Show the numbered source to help locating the reported error.
        print("[!] Error:")
        print(
            " "
            + "\n ".join(
                f"{lineno: 3} {line}"
                for lineno, line in enumerate(script_source.split("\n"), start=1)
            )
        )
        raise

    data_storage = {
        "invoke_data": [],
        "class_new_inst_data": [],
        "cnstr_new_inst_data": [],
        "dyn_code_load": [],
    }
    script.on(
        "message",
        lambda msg, data: on_message(msg, data, data_storage, file_storage),
    )

    # Load script
    script.load()
    # Resume the execution of the APK
    device.resume(pid)

    print("==> Press ENTER to finish the analysis <==")
    input()

    if args.output is None:
        print(json.dumps(data_storage, indent=" "))
    else:
        with args.output.open("w") as fp:
            json.dump(data_storage, fp)