507 lines
17 KiB
Python
507 lines
17 KiB
Python
import re
|
|
from pathlib import Path
|
|
from more_itertools import peekable
|
|
from typing import Any, Optional, Type
|
|
|
|
|
|
class LoggedError:
|
|
"""A class representing an error parsed from a log file."""
|
|
|
|
def __str__(self):
|
|
pass
|
|
|
|
def get_dict(self) -> dict: # type: ignore
|
|
pass
|
|
|
|
def set_logfile_name(self, file: str):
|
|
"""Register the name of the file containing the error log"""
|
|
self.logfile_name = file
|
|
|
|
@staticmethod
|
|
def parse_error(logs: "peekable[tuple[int, str]]") -> Optional["LoggedError"]: # type: ignore
|
|
pass
|
|
|
|
|
|
class JavaError(LoggedError):
|
|
java_error_re = re.compile(
|
|
r"(?:Exception in thread \".+?\"|Caused by:) ([a-zA-Z0-9.$]+?)(?:: *(.*))?(?: *~\[.*\])?$"
|
|
)
|
|
java_stack_re = re.compile(r"[ \t]*at (.*)\((.*?)(?::(\d*))?\)(?: *~\[.*\])?$")
|
|
# TODO: link "Caused by:" to exception?
|
|
|
|
def __init__(
|
|
self,
|
|
first_line_nb: int,
|
|
last_line_nb: int,
|
|
error: str,
|
|
msg: str,
|
|
stack: list,
|
|
logfile_name: str = "",
|
|
):
|
|
self.first_line_nb = first_line_nb
|
|
self.last_line_nb = last_line_nb
|
|
self.error = error
|
|
self.msg = msg
|
|
self.stack = stack
|
|
self.logfile_name = logfile_name
|
|
|
|
def __str__(self):
|
|
stack = "\n at ".join(
|
|
map(lambda e: f"{e['method']}({e['class']}:{e['line']})", self.stack)
|
|
)
|
|
return f"{self.error}: {self.msg}{stack}\n"
|
|
|
|
def get_dict(self) -> dict:
|
|
return {
|
|
"error_type": "Java",
|
|
"error": self.error,
|
|
"msg": self.msg,
|
|
"stack": self.stack,
|
|
"first_line": self.first_line_nb,
|
|
"last_line": self.last_line_nb,
|
|
"logfile_name": self.logfile_name,
|
|
}
|
|
|
|
@staticmethod
|
|
def parse_error(logs: "peekable[tuple[int, str]]") -> Optional["JavaError"]:
|
|
"""Return the JavaError at the begenning of the logs if there is one, else return None.
|
|
If there is a JavaError at the begenning of the logs, the iterator of the logs will
|
|
consume all the lines of the error, else the iterator will not be modified."""
|
|
line_nb, line = logs.peek((None, None))
|
|
if line is None or line_nb is None:
|
|
return None
|
|
match = JavaError.java_error_re.match(line)
|
|
if match is None:
|
|
return None
|
|
if match.group(2) is None:
|
|
msg = ""
|
|
else:
|
|
msg = match.group(2)
|
|
error = JavaError(line_nb, line_nb, match.group(1), msg, [])
|
|
next(logs)
|
|
while True:
|
|
line_nb, line = logs.peek((None, None))
|
|
if line is None or line_nb is None:
|
|
return error
|
|
match = JavaError.java_stack_re.match(line)
|
|
if match is None:
|
|
return error
|
|
line_dsc = {
|
|
"method": match.group(1),
|
|
"class": match.group(2),
|
|
"line": match.group(3),
|
|
}
|
|
if len(error.stack) == 0 or (
|
|
error.stack[-1]["method"] != line_dsc["method"]
|
|
and error.stack[-1]["class"] != line_dsc["method"]
|
|
):
|
|
error.stack.append(line_dsc)
|
|
error.last_line_nb = line_nb
|
|
next(logs)
|
|
|
|
|
|
class NoPrefixJavaError(JavaError):
|
|
java_error_re = re.compile(r"([a-zA-Z0-9.$]+?)(?:: *(.*))?(?: *~\[.*\])?$")
|
|
java_stack_re = re.compile(r"[ \t]*at (.*)\((.*?)(?::(\d*))?\)(?: *~\[.*\])?$")
|
|
|
|
@staticmethod
|
|
def parse_error(
|
|
logs: "peekable[tuple[int, str]]",
|
|
) -> Optional["NoPrefixJavaError"]:
|
|
line_nb, line = logs.peek((None, None))
|
|
if line is None or line_nb is None:
|
|
return None
|
|
match = NoPrefixJavaError.java_error_re.match(line)
|
|
if match is None:
|
|
return None
|
|
if match.group(2) is None:
|
|
msg = ""
|
|
else:
|
|
msg = match.group(2)
|
|
error = NoPrefixJavaError(line_nb, line_nb, match.group(1), msg, [])
|
|
# Check that the next line match java_stack_re to reduce false possitives
|
|
try:
|
|
line_nb, line = logs[1]
|
|
except IndexError:
|
|
return None
|
|
if NoPrefixJavaError.java_stack_re.match(line) is None:
|
|
return None
|
|
next(logs)
|
|
while True:
|
|
line_nb, line = logs.peek((None, None))
|
|
if line is None or line_nb is None:
|
|
return error
|
|
match = NoPrefixJavaError.java_stack_re.match(line)
|
|
if match is None:
|
|
return error
|
|
line_dsc = {
|
|
"method": match.group(1),
|
|
"class": match.group(2),
|
|
"line": match.group(3),
|
|
}
|
|
|
|
if len(error.stack) == 0 or (
|
|
error.stack[-1]["method"] != line_dsc["method"]
|
|
and error.stack[-1]["class"] != line_dsc["method"]
|
|
):
|
|
error.stack.append(line_dsc)
|
|
error.last_line_nb = line_nb
|
|
next(logs)
|
|
return None
|
|
|
|
|
|
class PythonError(LoggedError):
|
|
python_error_traceback_re = re.compile(r"Traceback \(most recent call last\):$")
|
|
python_error_file_re = re.compile(r" File \"(.+?)\", line (\d+?), in (.*)$")
|
|
python_error_code_re = re.compile(r" (.*)$")
|
|
python_error_msg_re = re.compile(r"(.*?)(?:: (.*))?$")
|
|
|
|
def __init__(
|
|
self,
|
|
first_line_nb: int,
|
|
last_line_nb: int,
|
|
error: str,
|
|
msg: str,
|
|
stack: list,
|
|
logfile_name: str = "",
|
|
):
|
|
self.first_line_nb = first_line_nb
|
|
self.last_line_nb = last_line_nb
|
|
self.error = error
|
|
self.msg = msg
|
|
self.stack = stack
|
|
self.logfile_name = logfile_name
|
|
|
|
def __str__(self):
|
|
stack = "\n".join(
|
|
map(
|
|
lambda d: (
|
|
f" File \"{d['file']}\", line {d['line']}, in {d['module']}\n"
|
|
f" {d['code']}"
|
|
),
|
|
self.stack,
|
|
)
|
|
)
|
|
return (
|
|
"Traceback (most recent call last):\n"
|
|
f"{stack}\n"
|
|
f"{self.error}: {self.msg}\n"
|
|
)
|
|
|
|
def get_dict(self) -> dict:
|
|
return {
|
|
"error_type": "Python",
|
|
"error": self.error,
|
|
"msg": self.msg,
|
|
"stack": self.stack,
|
|
"first_line": self.first_line_nb,
|
|
"last_line": self.last_line_nb,
|
|
"logfile_name": self.logfile_name,
|
|
}
|
|
|
|
# TODO: why peekable[str] crashes?
|
|
@staticmethod
|
|
def parse_error(logs: "peekable[tuple[int, str]]") -> Optional["PythonError"]:
|
|
"""Return the PythonError at the begenning of the logs if there is one, else return None.
|
|
If there is a PythonError at the begenning of the logs, the iterator of the logs will
|
|
consume all the lines of the error, else the iterator will not be modified."""
|
|
line_nb, line = logs.peek((None, None))
|
|
if line is None or line_nb is None:
|
|
return None
|
|
match = PythonError.python_error_traceback_re.match(line)
|
|
if match is None:
|
|
return None
|
|
error = PythonError(line_nb, line_nb, "", "", [])
|
|
next(logs)
|
|
while True:
|
|
line_nb, line = logs.peek((None, None))
|
|
if line is None or line_nb is None:
|
|
break
|
|
match = PythonError.python_error_file_re.match(line)
|
|
if match is None:
|
|
break
|
|
new_stack_line = {
|
|
"file": match.group(1),
|
|
"line": match.group(2),
|
|
"module": match.group(3),
|
|
"code": "",
|
|
}
|
|
if len(error.stack) == 0 or error.stack[-1] != new_stack_line:
|
|
error.stack.append(new_stack_line)
|
|
error.last_line_nb = line_nb
|
|
next(logs)
|
|
line_nb, line = logs.peek((None, None))
|
|
if line is None or line_nb is None:
|
|
break
|
|
match = PythonError.python_error_code_re.match(line)
|
|
if match is None:
|
|
break
|
|
new_stack_line["code"] = match.group(1)
|
|
error.last_line_nb = line_nb
|
|
next(logs)
|
|
line_nb, line = logs.peek((None, None))
|
|
if line is None:
|
|
raise RuntimeError("Found EOF before en of Python trackback")
|
|
match = PythonError.python_error_msg_re.match(line)
|
|
if match is None:
|
|
raise RuntimeError("Last line of python traceback not found")
|
|
error.error = match.group(1)
|
|
error.msg = str(match.group(2))
|
|
return error
|
|
|
|
|
|
class Python311Error(PythonError):
|
|
python_code_marker_re = re.compile(r"^ *~*\^+~* *$")
|
|
|
|
@staticmethod
|
|
def parse_error(logs: "peekable[tuple[int, str]]") -> Optional["Python311Error"]:
|
|
"""Return the PythonError at the begenning of the logs if there is one, else return None.
|
|
If there is a PythonError at the begenning of the logs, the iterator of the logs will
|
|
consume all the lines of the error, else the iterator will not be modified."""
|
|
line_nb, line = logs.peek((None, None))
|
|
if line is None or line_nb is None:
|
|
return None
|
|
match = PythonError.python_error_traceback_re.match(line)
|
|
if match is None:
|
|
return None
|
|
error = PythonError(line_nb, line_nb, "", "", [])
|
|
next(logs)
|
|
while True:
|
|
line_nb, line = logs.peek((None, None))
|
|
if line is None or line_nb is None:
|
|
break
|
|
match = PythonError.python_error_file_re.match(line)
|
|
if match is None:
|
|
break
|
|
new_stack_line = {
|
|
"file": match.group(1),
|
|
"line": match.group(2),
|
|
"module": match.group(3),
|
|
"code": "",
|
|
}
|
|
if len(error.stack) == 0 or error.stack[-1] != new_stack_line:
|
|
error.stack.append(new_stack_line)
|
|
error.last_line_nb = line_nb
|
|
next(logs)
|
|
line_nb, line = logs.peek((None, None))
|
|
if line is None or line_nb is None:
|
|
break
|
|
match = PythonError.python_error_code_re.match(line)
|
|
if match is None:
|
|
break
|
|
new_stack_line["code"] = match.group(1)
|
|
error.last_line_nb = line_nb
|
|
next(logs)
|
|
line_nb, line = logs.peek((None, None))
|
|
if line is None or line_nb is None:
|
|
break
|
|
match = Python311Error.python_code_marker_re.match(line)
|
|
if match is not None:
|
|
next(logs)
|
|
line_nb, line = logs.peek((None, None))
|
|
if line is None:
|
|
raise RuntimeError("Found EOF before en of Python trackback")
|
|
match = PythonError.python_error_msg_re.match(line)
|
|
if match is None:
|
|
raise RuntimeError("Last line of python traceback not found")
|
|
error.error = match.group(1)
|
|
error.msg = str(match.group(2))
|
|
return error
|
|
|
|
|
|
class RubyError(LoggedError):
|
|
ruby_error_re = re.compile(r"(.*?\.rb):(\d*):in `(.*?)'(?:: (.*))?$")
|
|
ruby_stack_re = re.compile(r"[ \t]*from (.*?\.rb):(\d*):in `(.*?)'")
|
|
|
|
def __init__(
|
|
self,
|
|
first_line_nb: int,
|
|
last_line_nb: int,
|
|
file: str,
|
|
line: str,
|
|
function: str,
|
|
msg: str,
|
|
stack: list,
|
|
logfile_name: str = "",
|
|
):
|
|
self.first_line_nb = first_line_nb
|
|
self.last_line_nb = last_line_nb
|
|
self.file = file
|
|
self.line = line
|
|
self.function = function
|
|
self.msg = msg
|
|
self.stack = stack
|
|
self.logfile_name = logfile_name
|
|
|
|
def __str__(self):
|
|
stack = "\n at ".join(
|
|
map(
|
|
lambda e: f"from {e['file']}:({e['line']}:in `{e['function']}')",
|
|
self.stack,
|
|
)
|
|
)
|
|
return f"{self.file}:{self.line}:in `{self.function}':{self.msg}{stack}\n"
|
|
|
|
def get_dict(self) -> dict:
|
|
return {
|
|
"error_type": "Ruby",
|
|
"file": self.file,
|
|
"line": self.line,
|
|
"function": self.function,
|
|
"msg": self.msg,
|
|
"stack": self.stack,
|
|
"first_line": self.first_line_nb,
|
|
"last_line": self.last_line_nb,
|
|
"logfile_name": self.logfile_name,
|
|
}
|
|
|
|
@staticmethod
|
|
def parse_error(logs: "peekable[tuple[int, str]]") -> Optional["RubyError"]:
|
|
line_nb, line = logs.peek((None, None))
|
|
if line is None or line_nb is None:
|
|
return None
|
|
match = RubyError.ruby_error_re.match(line)
|
|
if match is None:
|
|
return None
|
|
if match.group(4) is None:
|
|
msg = ""
|
|
else:
|
|
msg = match.group(4)
|
|
error = RubyError(
|
|
line_nb, line_nb, match.group(1), match.group(2), match.group(3), msg, []
|
|
)
|
|
next(logs)
|
|
while True:
|
|
line_nb, line = logs.peek((None, None))
|
|
if line is None or line_nb is None:
|
|
return error
|
|
match = RubyError.ruby_stack_re.match(line)
|
|
if match is None:
|
|
return error
|
|
line_dsc = {
|
|
"file": match.group(1),
|
|
"line": match.group(2),
|
|
"function": match.group(3),
|
|
}
|
|
if len(error.stack) == 0 or error.stack[-1] != line_dsc:
|
|
error.stack.append(line_dsc)
|
|
error.last_line_nb = line_nb
|
|
next(logs)
|
|
|
|
|
|
class FlowdroidLog4jError(LoggedError):
|
|
error_re = re.compile(r"\[.*?\] (ERROR|FATAL) (.*?) - (.*)$")
|
|
|
|
def __init__(
|
|
self,
|
|
first_line_nb: int,
|
|
last_line_nb: int,
|
|
level: str,
|
|
origin: str,
|
|
msg: str,
|
|
logfile_name: str = "",
|
|
):
|
|
self.first_line_nb = first_line_nb
|
|
self.last_line_nb = last_line_nb
|
|
self.level = level
|
|
self.origin = origin
|
|
self.msg = msg
|
|
self.logfile_name = logfile_name
|
|
|
|
def __str__(self) -> str:
|
|
return f"{self.level} {self.origin} {self.msg}"
|
|
|
|
def get_dict(self) -> dict:
|
|
return {
|
|
"error_type": "Log4j",
|
|
"level": self.level,
|
|
"origin": self.origin,
|
|
"msg": self.msg,
|
|
"first_line": self.first_line_nb,
|
|
"last_line": self.last_line_nb,
|
|
"logfile_name": self.logfile_name,
|
|
}
|
|
|
|
@staticmethod
|
|
def parse_error(logs: peekable) -> Optional["FlowdroidLog4jError"]:
|
|
line_nb, line = logs.peek((None, None))
|
|
if line is None or line_nb is None:
|
|
return None
|
|
match = FlowdroidLog4jError.error_re.match(line)
|
|
if match is None:
|
|
return None
|
|
error = FlowdroidLog4jError(
|
|
line_nb, line_nb, match.group(1), match.group(2), match.group(3)
|
|
)
|
|
next(logs)
|
|
return error
|
|
|
|
|
|
class DroidsafeLog4jError(LoggedError):
|
|
error_re = re.compile(r"(ERROR|FATAL): (.*)")
|
|
|
|
def __init__(
|
|
self,
|
|
first_line_nb: int,
|
|
last_line_nb: int,
|
|
level: str,
|
|
msg: str,
|
|
logfile_name: str = "",
|
|
):
|
|
self.first_line_nb = first_line_nb
|
|
self.last_line_nb = last_line_nb
|
|
self.level = level
|
|
self.msg = msg
|
|
self.logfile_name = logfile_name
|
|
|
|
def __str__(self) -> str:
|
|
return f"{self.level}: {self.msg}"
|
|
|
|
def get_dict(self) -> dict:
|
|
return {
|
|
"error_type": "Log4jSimpleMsg",
|
|
"level": self.level,
|
|
"msg": self.msg,
|
|
"first_line": self.first_line_nb,
|
|
"last_line": self.last_line_nb,
|
|
"logfile_name": self.logfile_name,
|
|
}
|
|
|
|
@staticmethod
|
|
def parse_error(logs: peekable) -> Optional["DroidsafeLog4jError"]:
|
|
line_nb, line = logs.peek((None, None))
|
|
if line is None or line_nb is None:
|
|
return None
|
|
match = DroidsafeLog4jError.error_re.match(line)
|
|
if match is None:
|
|
return None
|
|
error = DroidsafeLog4jError(line_nb, line_nb, match.group(1), match.group(2))
|
|
next(logs)
|
|
return error
|
|
|
|
|
|
# def get_errors(path: Path, error_types: list[Type[LoggedError]]) -> list[LoggedError]:
|
|
def get_errors(path: Path, error_types: list) -> list:
|
|
"""List the errors found in the logs collected from the analusis of an apk.
|
|
The file containing the error traces must be located at `path`, and the list
|
|
of type of error expected must be provided in `error_types`."""
|
|
if not path.exists():
|
|
raise RuntimeError(f"Error log {path} not found")
|
|
if not error_types:
|
|
return []
|
|
errors = []
|
|
with path.open("r", errors="replace") as file:
|
|
logs = peekable(enumerate(file))
|
|
while logs.peek(None) is not None:
|
|
new_errors = []
|
|
for error_type in error_types:
|
|
error = error_type.parse_error(logs)
|
|
if error is not None:
|
|
new_errors.append(error)
|
|
if new_errors:
|
|
errors.extend(new_errors)
|
|
else:
|
|
next(logs)
|
|
for error in errors:
|
|
error.set_logfile_name(path.name)
|
|
return errors
|