exp 0.5: dynamic analysis

This commit is contained in:
Jean-Marie Mineau 2025-05-13 17:52:34 +02:00
parent ca38a9ada6
commit 80f7fe75cf
Signed by: histausse
GPG key ID: B66AEEDA9B645AD2
5 changed files with 241 additions and 6 deletions

View file

@ -0,0 +1,5 @@
#!/usr/bin/bash
FOLDER=$(dirname "$(realpath $0)")
"${FOLDER}/venv/bin/collect-runtime-data" --apk "${1}" --device "${2}" --output "${3}/data.json" --dex-dir "${3}"

196
experiment/orchestrator.py Normal file
View file

@ -0,0 +1,196 @@
from pathlib import Path
import os
import time
import subprocess
import threading
import argparse
EMULATORS = ["root34-1", "root34-2"]
ANDROID_IMG = "system-images;android-34;default;x86_64"
if "ANDROID_HOME" in os.environ:
ANDROID_HOME = Path(os.environ["ANDROID_HOME"])
else:
ANDROID_HOME = Path.home() / "Android" / "Sdk"
EMULATOR = str(ANDROID_HOME / "emulator" / "emulator")
AVDMANAGER = str(ANDROID_HOME / "cmdline-tools" / "latest" / "bin" / "avdmanager")
ADB = str(ANDROID_HOME / "platform-tools" / "adb")
def get_ports(emu: str) -> tuple[int, int]:
"""Return the console port and adb port for the emulator."""
i = EMULATORS.index(emu) * 2
return (5554 + i, 5554 + i + 1)
def get_installed_emu() -> set[str]:
"""List name of installed emulators"""
return set(
subprocess.run([EMULATOR, "-list-avds"], stdout=subprocess.PIPE)
.stdout.decode("utf-8")
.strip()
.split("\n")
)
def gen_emulators():
emu_lst = get_installed_emu()
for emu in EMULATORS:
if emu not in emu_lst:
subprocess.run(
[
AVDMANAGER,
"create",
"avd",
"--name",
emu,
"--package",
ANDROID_IMG,
"--sdcard",
"512M",
"--device",
"medium_phone",
]
)
def del_emulators():
emu_lst = get_installed_emu()
for emu in EMULATORS:
if emu in emu_lst:
subprocess.run(
[
AVDMANAGER,
"delete",
"avd",
"--name",
emu,
]
)
# def make_snapshot(folder: Path):
# for emu in EMULATORS:
# console_port, adb_port = get_ports(emu)
# proc = subprocess.Popen(
# [
# EMULATOR,
# "-avd",
# emu,
# "-no-window",
# "-no-metrics",
# "-debug-init",
# "-logcat",
# "*:v",
# "-ports",
# f"{console_port},{adb_port}",
# ]
# )
# subprocess.run([ADB, "-s", f"emulator-{console_port}", "wait-for-device"])
# subprocess.run(
# [
# ADB,
# "-s",
# f"emulator-{console_port}",
# "emu",
# "avd",
# "snapshot",
# "save",
# "baseline",
# ]
# )
def worker(emu: str, apklist: list[str], out_folder: Path, script: Path):
console_port, adb_port = get_ports(emu)
while apklist:
apk = apklist.pop()
folder_name = apk.split("/")[-1].removesuffix(".apk")
folder = out_folder / folder_name
if folder.exists():
continue
folder.mkdir(parents=True)
# Start emulator with wipped data
proc = subprocess.Popen(
[
EMULATOR,
"-avd",
emu,
"-wipe-data",
"-no-window",
"-no-metrics",
"-debug-init", # dunno why but sometime needed
"-ports",
f"{console_port},{adb_port}",
]
)
# Run script
subprocess.run(
["bash", str(script), f"emulator-{console_port}", apk, str(out_folder)]
)
# stop emulator
try:
subprocess.run(
[
ADB,
"-s",
f"emulator-{console_port}",
"emu",
"kill",
],
timeout=3,
)
except subprocess.TimeoutExpired:
pass
if proc.poll() is None:
proc.kill()
time.sleep(3)
def run(apklist: list[str], out_folder: Path, script: Path):
workers = []
for emu in EMULATORS:
workers.append(
threading.Thread(target=lambda: worker(emu, apklist, out_folder, script))
)
workers[-1].start()
for w in workers:
w.join()
def main():
parser = argparse.ArgumentParser(
prog="orchestrator",
description="Run several android emulators en run analysis on applications",
)
parser.add_argument(
"applist",
type=Path,
help="File containing the path to applications, one by line",
)
parser.add_argument(
"out_folder",
type=Path,
help="The folder where to store the results of the analysis, a folder for each application will be created in it",
)
parser.add_argument(
"analysis_script",
type=Path,
help=(
"The script to run the analysis. The script will be invoke with "
"`bash analysis_script.sh path/of/app.apk emulator-5554 path/of/out_folder/app/`"
),
)
args = parser.parse_args()
with args.applist.open("r") as fp:
apklist = fp.readlines()
run(apklist, args.out_folder, args.analysis_script)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,30 @@
#!/usr/bin/bash
FOLDER=$(dirname "$(realpath $0)")
APK_DIR="${1}"
RES_DIR="${2}"
if [ ! -f "${APK_DIR}" ]; then
echo "Usage: bash ${0} /path/to/apk/dir /path/to/result/dir"
echo " /path/to/apk/dir is the folder where to store the application downloaded"
echo " /path/to/result/dir is the folder where to store the analysis results"
exit
fi
if [ ! -f "${RES_DIR}" ]; then
echo "Usage: bash ${0} /path/to/apk/dir /path/to/result/dir"
echo " /path/to/apk/dir is the folder where to store the application downloaded"
echo " /path/to/result/dir is the folder where to store the analysis results"
exit
fi
TMP_DIR=$(mktemp -d)
python3 -m venv "${FOLDER}/venv"
"${FOLDER}/venv/bin/pip" install "${FOLDER}/../frida"
"${FOLDER}/venv/bin/pip" install "git+ssh://git@gitlab.inria.fr/CIDRE/malware/grodd-runner.git"
ls "${APK_DIR}"/*.apk > "${TMP_DIR}/apklist.txt"
python3 "${FOLDER}/orchestrator.py" "${TMP_DIR}/apklist.txt" "${RES_DIR}" "${FOLDER}/frida_collect.sh"

View file

@ -423,9 +423,11 @@ def collect_runtime(
# print(f"[*] Classloader list received" + " " * 20)
if apk_explorer is None:
explore_app(app, device=device.id, android_sdk=android_sdk_path)
exploration_data = explore_app(
app, device=device.id, android_sdk=android_sdk_path
)
else:
apk_explorer()
exploration_data = apk_explorer()
# Try to find the Main class loader
main_class_loader: str | None = None
@ -490,6 +492,7 @@ def collect_runtime(
else:
main_class_loader = list(cls.keys())[0]
data_storage["apk_cl_id"] = main_class_loader
data_storage["exploration_data"] = exploration_data
json.dump(data_storage, output, indent=" ")

View file

@ -13,10 +13,10 @@ def explore_app(
package: str,
device: str = "emulator-5554",
android_sdk: Path | None = None,
):
) -> dict:
if USE_GRODD:
time.sleep(1) # let the app load
grodd_runner(
return grodd_runner(
"grodd",
device,
timeout=300,
@ -29,9 +29,10 @@ def explore_app(
print(
"\033[31mGrodd is not available, you need to explore the app manually\033[0m"
)
manual_exploration()
return manual_exploration()
def manual_exploration():
def manual_exploration() -> dict:
print("==> Press ENTER to end the analysis <==")
input()
return {}