add scan collection
This commit is contained in:
parent
decac18a0d
commit
48ec30204d
3 changed files with 83 additions and 3 deletions
|
|
@ -109,13 +109,13 @@ def main():
|
||||||
# Case 2: apk from SHA256
|
# Case 2: apk from SHA256
|
||||||
sha256s = []
|
sha256s = []
|
||||||
if args.sha256:
|
if args.sha256:
|
||||||
sha256s.append(args.sha256)
|
sha256s.append(args.sha256.upper())
|
||||||
if args.sha256_list:
|
if args.sha256_list:
|
||||||
with args.sha256_list.open("r") as file:
|
with args.sha256_list.open("r") as file:
|
||||||
for line in file:
|
for line in file:
|
||||||
if not line.strip():
|
if not line.strip():
|
||||||
continue
|
continue
|
||||||
sha256s.append(line.strip())
|
sha256s.append(line.strip().upper())
|
||||||
|
|
||||||
api_key = ""
|
api_key = ""
|
||||||
if args.api_key:
|
if args.api_key:
|
||||||
|
|
@ -151,3 +151,31 @@ def main():
|
||||||
else:
|
else:
|
||||||
with (args.output_dir / sha256).open("w") as file:
|
with (args.output_dir / sha256).open("w") as file:
|
||||||
file.write(entry)
|
file.write(entry)
|
||||||
|
|
||||||
|
|
||||||
|
def collect_to_db():
|
||||||
|
parser = ArgumentParser(
|
||||||
|
prog="Android Class Shadowing Scan Collector",
|
||||||
|
description="Collect Scan results into a database",
|
||||||
|
)
|
||||||
|
apk_parser = parser.add_mutually_exclusive_group(required=True)
|
||||||
|
parser.add_argument(
|
||||||
|
"--dir",
|
||||||
|
help="The directory where the scan results are",
|
||||||
|
type=Path,
|
||||||
|
required=True,
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--db",
|
||||||
|
help="Path to the database",
|
||||||
|
type=Path,
|
||||||
|
required=True,
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--androzoo-list",
|
||||||
|
help="The file 'latest.csv' or 'latest.csv.gz' from androzoo",
|
||||||
|
type=Path,
|
||||||
|
required=False,
|
||||||
|
)
|
||||||
|
args = parser.parse_args()
|
||||||
|
load_from_directory(args.dir, args.db, args.androzoo_list)
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,10 @@
|
||||||
|
import sqlite3
|
||||||
|
import gzip
|
||||||
|
import csv
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
from dataclasses import dataclass, astuple, fields
|
from dataclasses import dataclass, astuple, fields
|
||||||
from typing import Self
|
from typing import Self, Iterator
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
|
|
@ -24,6 +29,8 @@ class ApkData:
|
||||||
has_classes_dex_over_10: bool
|
has_classes_dex_over_10: bool
|
||||||
has_non_numeric_classes_dex: bool
|
has_non_numeric_classes_dex: bool
|
||||||
has_non_consecutive_classes_dex: bool
|
has_non_consecutive_classes_dex: bool
|
||||||
|
year: int = -1
|
||||||
|
vt_detection: int = -1
|
||||||
|
|
||||||
def to_string(self) -> str:
|
def to_string(self) -> str:
|
||||||
return "|".join(map(str, astuple(self)))
|
return "|".join(map(str, astuple(self)))
|
||||||
|
|
@ -33,3 +40,47 @@ class ApkData:
|
||||||
return ApkData(
|
return ApkData(
|
||||||
*(map(lambda f_v: f_v[1] == "True" if f_v[0].type is bool else f_v[0].type(f_v[1]), zip(fields(ApkData), val.strip().split("|")))) # type: ignore
|
*(map(lambda f_v: f_v[1] == "True" if f_v[0].type is bool else f_v[0].type(f_v[1]), zip(fields(ApkData), val.strip().split("|")))) # type: ignore
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def load_from_directory(
|
||||||
|
directory: Path, database: Path, androzoo_csv: Path | None = None
|
||||||
|
):
|
||||||
|
androzoo_data: dict[str, None | tuple[int, int]] = {
|
||||||
|
path.name: None for path in directory.glob("*")
|
||||||
|
}
|
||||||
|
|
||||||
|
def open_zoofile(androzoo_csv):
|
||||||
|
if androzoo_csv.name.endswith(".gz"):
|
||||||
|
return gzip.open(androzoo_csv, mode="rt", encoding="utf-8")
|
||||||
|
return androzoo_csv.open("r")
|
||||||
|
|
||||||
|
if androzoo_csv is not None:
|
||||||
|
with open_zoofile(androzoo_csv) as file:
|
||||||
|
reader = csv.DictReader(file)
|
||||||
|
for row in reader:
|
||||||
|
if row["sha256"] in androzoo_data:
|
||||||
|
androzoo_data[row["sha256"]] = (
|
||||||
|
int(row["first_seen_year"]),
|
||||||
|
int(row["vt_detection"]),
|
||||||
|
)
|
||||||
|
|
||||||
|
def data_it():
|
||||||
|
for sha256, zoo_data in androzoo_data.items():
|
||||||
|
with (directory / sha256).open("r") as file:
|
||||||
|
data = ApkData.from_string(file.read().strip())
|
||||||
|
if zoo_data is not None:
|
||||||
|
data.year, data.vt_detection = zoo_data
|
||||||
|
yield data
|
||||||
|
|
||||||
|
save_data_in_db(database, data_it())
|
||||||
|
|
||||||
|
|
||||||
|
def save_data_in_db(database: Path, data: Iterator[ApkData]):
|
||||||
|
with sqlite3.connect(database) as conn:
|
||||||
|
conn.execute(
|
||||||
|
f"CREATE TABLE IF NOT EXISTS data({', '.join(map(lambda f: f.name,fields(ApkData)))})"
|
||||||
|
)
|
||||||
|
conn.executemany(
|
||||||
|
f"INSERT INTO data VALUES({', '.join(['?' for _ in fields(ApkData)])})",
|
||||||
|
map(astuple, data),
|
||||||
|
)
|
||||||
|
|
|
||||||
|
|
@ -20,3 +20,4 @@ build-backend = "poetry.core.masonry.api"
|
||||||
|
|
||||||
[tool.poetry.scripts]
|
[tool.poetry.scripts]
|
||||||
scan = 'android_class_shadowing_scanner.__init__:main'
|
scan = 'android_class_shadowing_scanner.__init__:main'
|
||||||
|
collect-scan = 'android_class_shadowing_scanner.__init__:collect_to_db'
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue