274 lines
8.2 KiB
Python
274 lines
8.2 KiB
Python
import argparse
|
|
import base64
|
|
import os
|
|
import hashlib
|
|
import subprocess
|
|
import time
|
|
import json
|
|
from pathlib import Path
|
|
|
|
import frida # type: ignore
|
|
from androguard.core.apk import get_apkid # type: ignore
|
|
from loguru import logger # type: ignore
|
|
|
|
logger.remove() # remove androguard logs
|
|
|
|
# Resources shipped next to this module.
_MODULE_DIR = Path(__file__).parent
FRIDA_SCRIPT = _MODULE_DIR / "hook.js"
STACK_CONSUMER_B64 = _MODULE_DIR / "StackConsumer.dex.b64"

# Number of bytes used to encode a Java hash (as returned by Object.hashCode
# or System.identityHashCode). Both return a Java 'int', i.e. a 32-bit signed
# value, hence 4 bytes.
HASH_NB_BYTES = 4
|
|
|
|
|
|
# Define the handler for the events generated by the scripts
|
|
def on_message(message, data, data_storage: dict, file_storage: Path):
    """Dispatch one message received from the Frida script to its handler.

    Args:
        message: the message dict. Its "type" is either "error" (with a
            "description" and, when available, a "stack") or "send" (with a
            "payload"). A payload that is a dict with a known "type" is
            forwarded, together with its "data", to the matching handler.
        data: the binary blob attached to the message (unused here).
        data_storage: dict of lists the handlers append their records to.
        file_storage: directory where dynamically loaded bytecode is written.

    Anything that is not recognised (unknown message type, payload that is
    not a dict, unknown payload type) is printed as-is instead of raising, so
    that a stray `send()` in the script cannot break the callback.
    """
    msg_type = message.get("type")
    if msg_type == "error":
        print(f"[error] {message.get('description', message)}")
        # Not every error message is guaranteed to carry a stack trace.
        if "stack" in message:
            print(message["stack"])
        return

    payload = message.get("payload") if msg_type == "send" else None
    kind = payload.get("type") if isinstance(payload, dict) else None

    if kind == "invoke":
        handle_invoke_data(payload["data"], data_storage)
    elif kind == "class-new-inst":
        handle_class_new_inst_data(payload["data"], data_storage)
    elif kind == "cnstr-new-isnt":
        # NOTE: the spelling "isnt" is kept as-is; it has to match the tag
        # emitted by the script side.
        handle_cnstr_new_inst_data(payload["data"], data_storage)
    elif kind == "load-dex":
        handle_load_dex(payload["data"], data_storage, file_storage)
    else:
        print("[on_message] message:", message)
|
|
|
|
|
|
def print_stack(stack, prefix: str):
    """Print one line per frame of `stack`, each one preceded by `prefix`.

    Each frame is a dict with the keys "is_native", "method" and
    "bytecode_index"; native frames get a " (native)" suffix.
    """
    for entry in stack:
        suffix = " (native)" if entry["is_native"] else ""
        location = f"{entry['method']}:{entry['bytecode_index']}"
        print(f"{prefix}{location}{suffix}")
|
|
|
|
|
|
def handle_invoke_data(data, data_storage: dict):
    """Report a reflective method invocation and record it.

    `data` provides the invoked "method", the "stack" (list of frames) and
    the "is_static" flag. The call site is taken from the first stack frame.
    The event is printed, then appended to data_storage["invoke_data"]
    unless the bytecode index of the call site is negative.
    """
    callee = data["method"]
    # TODO: good idea?
    ignored_methods = (
        "Landroid/view/View;->getTranslationZ()F",
        "Landroid/view/View;->getElevation()F",
    )
    if callee in ignored_methods:
        return

    frames = data["stack"]
    if len(frames) == 0:
        return

    call_site = frames[0]
    caller = call_site["method"]
    bytecode_idx = call_site["bytecode_index"]
    static = data["is_static"]
    static_tag = " (static)" if static else ""

    print("Method.Invoke:")
    print(f" called: {callee}{static_tag}")
    print(f" by: {caller}")
    print(f" at: 0x{bytecode_idx:08x}")

    # A negative index does not designate a usable call site: report it but
    # do not store it.
    if bytecode_idx < 0:
        return

    record = {
        "method": callee,
        "caller_method": caller,
        "addr": bytecode_idx,
        "is_static": static,
    }
    data_storage["invoke_data"].append(record)
|
|
|
|
|
|
def handle_class_new_inst_data(data, data_storage: dict):
    """Report a Class.newInstance() call and record it.

    `data` provides the "constructor" that was run and the "stack" (list of
    frames). When the first frame is Class.newInstance() itself, the call
    site is the frame just below it; otherwise it is the first frame. The
    event is printed, then appended to data_storage["class_new_inst_data"]
    unless the bytecode index of the call site is negative.
    """
    ctor = data["constructor"]
    frames = data["stack"]
    if len(frames) == 0:
        return

    new_instance = "Ljava/lang/Class;->newInstance()Ljava/lang/Object;"
    # Skip the newInstance() frame when it sits on top of the stack.
    depth = 1 if frames[0]["method"] == new_instance else 0
    if len(frames) <= depth:
        return
    call_site = frames[depth]

    caller = call_site["method"]
    bytecode_idx = call_site["bytecode_index"]

    print("Class.NewInstance:")
    print(f" called: {ctor}")
    print(f" by: {caller}")
    print(f" at: 0x{bytecode_idx:08x}")

    if bytecode_idx < 0:
        return

    record = {
        "constructor": ctor,
        "caller_method": caller,
        "addr": bytecode_idx,
    }
    data_storage["class_new_inst_data"].append(record)
|
|
|
|
|
|
def handle_cnstr_new_inst_data(data, data_storage: dict):
    """Report a Constructor.newInstance() call and record it.

    `data` provides the "constructor" that was run and the "stack" (list of
    frames); the call site is taken from the first frame. The event is
    printed, then appended to data_storage["cnstr_new_inst_data"] unless the
    bytecode index of the call site is negative.
    """
    ctor = data["constructor"]
    # NOTE(review): only constructors under this prefix are kept. This looks
    # like a filter for a test application -- confirm it is still wanted.
    if not ctor.startswith("Lcom/example/theseus"):
        return

    frames = data["stack"]
    if len(frames) == 0:
        return

    call_site = frames[0]
    caller = call_site["method"]
    bytecode_idx = call_site["bytecode_index"]

    print("Constructor.newInstance:")
    print(f" called: {ctor}")
    print(f" by: {caller}")
    print(f" at: 0x{bytecode_idx:08x}")

    if bytecode_idx < 0:
        return

    record = {
        "constructor": ctor,
        "caller_method": caller,
        "addr": bytecode_idx,
    }
    data_storage["cnstr_new_inst_data"].append(record)
|
|
|
|
|
|
def handle_load_dex(data, data_storage: dict, file_storage: Path):
    """Store dynamically loaded bytecode on disk and record the event.

    Args:
        data: event payload with the keys
            "dex": iterable of base64-encoded files,
            "classloader_class": name of the class of the class loader,
            "classloader": signed integer identifying the class loader
                (presumably its Java hash, see HASH_NB_BYTES -- confirm
                against the script side).
        data_storage: dict whose "dyn_code_load" list receives one record
            per event (class loader class, class loader id as hex, and the
            list of files written).
        file_storage: directory in which the decoded files are written.

    Each file is named `<class>_<classloader>_<first 16 hex digits of its
    SHA-1>.bytecode`; a numeric suffix is appended when the name is already
    taken, so existing files are never overwritten.
    """
    classloader_class = data["classloader_class"]
    classloader = data["classloader"]
    if classloader < 0:
        # Map the signed value to its unsigned two's-complement counterpart.
        classloader += 1 << (HASH_NB_BYTES * 8)
    # The byte order is given explicitly: `to_bytes` only has a default
    # (big-endian) byte order since Python 3.11.
    classloader = classloader.to_bytes(HASH_NB_BYTES, byteorder="big").hex()
    short_class = classloader_class.split("/")[-1].removesuffix(";")

    print("DEX file loaded:")
    print(f" by: {classloader_class} ({classloader})")

    files = []
    for encoded in data["dex"]:
        file_bin = base64.b64decode(encoded)
        digest = hashlib.sha1(file_bin).hexdigest()
        print(f" hash: {digest}")

        # Not .dex: the file can also be a .jar, .apk, .oat, ...
        stem = f"{short_class}_{classloader}_{digest[:16]}"
        fname = file_storage / f"{stem}.bytecode"
        i = 1
        while fname.exists():
            fname = file_storage / f"{stem}_{i}.bytecode"
            i += 1
        fname = fname.absolute().resolve()

        with fname.open("wb") as fp:
            fp.write(file_bin)
        print(f" stored: {fname}")
        files.append(str(fname))

    data_storage["dyn_code_load"].append(
        {
            "classloader_class": classloader_class,
            "classloader": classloader,
            "files": files,
        }
    )
|
|
|
|
|
|
def main():
    """Command line entry point: run an APK under Frida and collect data.

    Steps:
      1. parse the command line (`--apk`, `--device`, `--output`,
         `--dex-dir`);
      2. make sure the directory for dynamically loaded bytecode exists;
      3. select the Frida device and (re)install the APK with adb;
      4. build the Frida script from `hook.js`, inlining the base64 content
         of `StackConsumer.dex.b64`;
      5. spawn the application, attach, load the script and resume;
      6. wait for ENTER, then dump the collected data as JSON to `--output`
         or to stdout.

    Raises:
        SystemExit: when `--dex-dir` exists but is not a directory.
        subprocess.CalledProcessError: when `adb install` fails.
        frida.InvalidArgumentError: when the script is rejected by Frida
            (the numbered script source is printed first).
    """
    parser = argparse.ArgumentParser(
        prog="Android Theseus project",
    )
    parser.add_argument(
        "-a", "--apk", required=True, help="Target application", type=Path
    )
    parser.add_argument(
        "-s",
        "--device",
        default="",
        help="The android device to connect to, eg: 'emulator-5554'",
        type=str,
    )
    parser.add_argument(
        "-o",
        "--output",
        default=None,
        help="where to dump the collected data, default is stdout",
        type=Path,
    )
    parser.add_argument(
        "-d",
        "--dex-dir",
        default=Path("."),
        help="where to store dynamically loaded bytecode",
        type=Path,
    )
    args = parser.parse_args()
    env = dict(os.environ)

    file_storage = args.dex_dir
    if not file_storage.exists():
        file_storage.mkdir(parents=True)
    if not file_storage.is_dir():
        # Exit with a non-zero status; the `exit()` helper injected by the
        # `site` module is meant for interactive use and reported success.
        raise SystemExit("--dex-dir must be a directory")

    if args.device != "":
        device = frida.get_device(args.device)
        # Make the adb commands below target the same device.
        env["ANDROID_SERIAL"] = args.device
    else:
        device = frida.get_usb_device()

    app = get_apkid(args.apk)[0]

    if device.enumerate_applications([app]):
        # Uninstall the APK if it already exists (best effort).
        subprocess.run(["adb", "uninstall", app], env=env)
    # Without the application there is nothing to spawn: fail right away.
    subprocess.run(
        ["adb", "install", str(args.apk.absolute())], env=env, check=True
    )

    with FRIDA_SCRIPT.open("r") as file:
        script_src = file.read()
    with STACK_CONSUMER_B64.open("r") as file:
        script_src = script_src.replace(
            "<PYTHON REPLACE StackConsumer.dex.b64>",
            file.read().replace("\n", "").strip(),
        )

    pid = device.spawn([app])
    session = device.attach(pid)
    try:
        script = session.create_script(script_src)
    except frida.InvalidArgumentError:
        # Print the script with line numbers to help locate the error.
        print(
            "\n".join(
                f"{lineno: 3} {line}"
                for lineno, line in enumerate(script_src.split("\n"), start=1)
            )
        )
        raise

    data_storage = {
        "invoke_data": [],
        "class_new_inst_data": [],
        "cnstr_new_inst_data": [],
        "dyn_code_load": [],
    }

    script.on(
        "message",
        lambda msg, data: on_message(msg, data, data_storage, file_storage),
    )

    # Load script
    script.load()
    # Resume the execution of the APK
    device.resume(pid)

    print("Press ENTER to finish the analysis")
    input()
    if args.output is None:
        print(json.dumps(data_storage, indent=" "))
    else:
        with args.output.open("w") as fp:
            json.dump(data_storage, fp)
|