rasta/rasta_exp/error_collector.py
Jean-Marie Mineau cd1e91bb99
first commit
2023-11-16 14:30:24 +01:00

507 lines
17 KiB
Python

import re
from pathlib import Path
from more_itertools import peekable
from typing import Any, Optional, Type
class LoggedError:
    """Base class for an error record extracted from a log file.

    The rendering, dictionary and parsing methods defined here are
    placeholders that return ``None``; the concrete error classes of this
    module override them.
    """

    def __str__(self):
        # Placeholder: concrete classes build the textual form of the error.
        return None

    def get_dict(self) -> dict:  # type: ignore
        # Placeholder: concrete classes return the error fields as a dict.
        return None

    def set_logfile_name(self, file: str):
        """Remember the name of the log file the error was read from."""
        self.logfile_name = file

    @staticmethod
    def parse_error(logs: "peekable[tuple[int, str]]") -> Optional["LoggedError"]:  # type: ignore
        # Placeholder: concrete classes try to parse an error at the head of
        # `logs`; the base class never finds one.
        return None
class JavaError(LoggedError):
    """A Java exception (header line followed by its stack frames) found in a log."""

    # Header line: `Exception in thread "<name>" <ExceptionClass>[: <msg>]` or
    # `Caused by: <ExceptionClass>[: <msg>]`, optionally followed by `~[...]`.
    java_error_re = re.compile(
        r"(?:Exception in thread \".+?\"|Caused by:) ([a-zA-Z0-9.$]+?)(?:: *(.*))?(?: *~\[.*\])?$"
    )
    # Stack frame line: `at <method>(<source>[:<line>])`, optionally followed
    # by `~[...]`.
    java_stack_re = re.compile(r"[ \t]*at (.*)\((.*?)(?::(\d*))?\)(?: *~\[.*\])?$")
    # TODO: link "Caused by:" to exception?

    def __init__(
        self,
        first_line_nb: int,
        last_line_nb: int,
        error: str,
        msg: str,
        stack: list,
        logfile_name: str = "",
    ):
        # Numbers of the first and last log lines that belong to this error.
        self.first_line_nb = first_line_nb
        self.last_line_nb = last_line_nb
        # Exception class name and message captured from the header line.
        self.error = error
        self.msg = msg
        # List of {"method", "class", "line"} dicts, one per stack frame.
        self.stack = stack
        self.logfile_name = logfile_name

    def __str__(self):
        """Render the header followed by one ` at ...` line per stack frame."""

        def render(frame: dict) -> str:
            # Frames such as `(Native Method)` carry no line number.
            if frame["line"] is None:
                location = frame["class"]
            else:
                location = f"{frame['class']}:{frame['line']}"
            return f"\n at {frame['method']}({location})"

        # Every frame, including the first one, starts on its own line so the
        # first frame is not glued to the message.
        frames = "".join(render(frame) for frame in self.stack)
        return f"{self.error}: {self.msg}{frames}\n"

    def get_dict(self) -> dict:
        """Return the fields of the error as a plain dictionary."""
        return {
            "error_type": "Java",
            "error": self.error,
            "msg": self.msg,
            "stack": self.stack,
            "first_line": self.first_line_nb,
            "last_line": self.last_line_nb,
            "logfile_name": self.logfile_name,
        }

    @staticmethod
    def parse_error(logs: "peekable[tuple[int, str]]") -> Optional["JavaError"]:
        """Return the JavaError at the beginning of the logs if there is one, else return None.

        If there is a JavaError at the beginning of the logs, the iterator of the logs will
        consume all the lines of the error, else the iterator will not be modified.
        """
        line_nb, line = logs.peek((None, None))
        if line is None or line_nb is None:
            return None
        header = JavaError.java_error_re.match(line)
        if header is None:
            return None
        # A header without a message yields an empty string.
        error = JavaError(line_nb, line_nb, header.group(1), header.group(2) or "", [])
        next(logs)
        while True:
            line_nb, line = logs.peek((None, None))
            if line is None or line_nb is None:
                return error
            frame = JavaError.java_stack_re.match(line)
            if frame is None:
                # First line that is not a stack frame: leave it in the iterator.
                return error
            line_dsc = {
                "method": frame.group(1),
                "class": frame.group(2),
                "line": frame.group(3),
            }
            # Collapse consecutive frames that name the same method in the
            # same source (line numbers are deliberately not compared).
            previous = error.stack[-1] if error.stack else None
            if previous is None or (previous["method"], previous["class"]) != (
                line_dsc["method"],
                line_dsc["class"],
            ):
                error.stack.append(line_dsc)
            # The line is consumed and counted even when the frame is collapsed.
            error.last_line_nb = line_nb
            next(logs)
class NoPrefixJavaError(JavaError):
    """A Java exception whose header has no `Exception in thread`/`Caused by:` prefix."""

    # Header line: `<ExceptionClass>[: <msg>]`, optionally followed by `~[...]`.
    java_error_re = re.compile(r"([a-zA-Z0-9.$]+?)(?:: *(.*))?(?: *~\[.*\])?$")
    # Stack frame line: `at <method>(<source>[:<line>])`, optionally followed
    # by `~[...]`.
    java_stack_re = re.compile(r"[ \t]*at (.*)\((.*?)(?::(\d*))?\)(?: *~\[.*\])?$")

    @staticmethod
    def parse_error(
        logs: "peekable[tuple[int, str]]",
    ) -> Optional["NoPrefixJavaError"]:
        """Return the NoPrefixJavaError at the beginning of the logs, or None.

        The header pattern is very permissive, so a header is only accepted
        when the line right after it is a stack frame. When an error is
        returned its lines have been consumed from `logs`; otherwise `logs`
        is left untouched.
        """
        line_nb, line = logs.peek((None, None))
        if line is None or line_nb is None:
            return None
        header = NoPrefixJavaError.java_error_re.match(line)
        if header is None:
            return None
        # Check that the next line matches java_stack_re to reduce false positives.
        try:
            _, following_line = logs[1]
        except IndexError:
            return None
        if NoPrefixJavaError.java_stack_re.match(following_line) is None:
            return None
        # A header without a message yields an empty string.
        error = NoPrefixJavaError(
            line_nb, line_nb, header.group(1), header.group(2) or "", []
        )
        next(logs)
        while True:
            line_nb, line = logs.peek((None, None))
            if line is None or line_nb is None:
                return error
            frame = NoPrefixJavaError.java_stack_re.match(line)
            if frame is None:
                # First line that is not a stack frame: leave it in the iterator.
                return error
            line_dsc = {
                "method": frame.group(1),
                "class": frame.group(2),
                "line": frame.group(3),
            }
            # Collapse consecutive frames that name the same method in the
            # same source (line numbers are deliberately not compared).
            previous = error.stack[-1] if error.stack else None
            if previous is None or (previous["method"], previous["class"]) != (
                line_dsc["method"],
                line_dsc["class"],
            ):
                error.stack.append(line_dsc)
            # The line is consumed and counted even when the frame is collapsed.
            error.last_line_nb = line_nb
            next(logs)
class PythonError(LoggedError):
    """A Python traceback found in a log."""

    python_error_traceback_re = re.compile(r"Traceback \(most recent call last\):$")
    # Frame line: `File "<file>", line <n>, in <name>`, indented by one or more
    # spaces so that the pattern does not depend on the exact indent width.
    python_error_file_re = re.compile(r" +File \"(.+?)\", line (\d+?), in (.*)$")
    # Source line printed under a frame; the indentation is not captured.
    python_error_code_re = re.compile(r" +(.*)$")
    # Summary line: `<ExceptionClass>[: <msg>]`.
    python_error_msg_re = re.compile(r"(.*?)(?:: (.*))?$")

    def __init__(
        self,
        first_line_nb: int,
        last_line_nb: int,
        error: str,
        msg: str,
        stack: list,
        logfile_name: str = "",
    ):
        # Numbers of the first and last log lines that belong to this error.
        self.first_line_nb = first_line_nb
        self.last_line_nb = last_line_nb
        # Exception class name and message captured from the summary line.
        self.error = error
        self.msg = msg
        # List of {"file", "line", "module", "code"} dicts, one per frame.
        self.stack = stack
        self.logfile_name = logfile_name

    def __str__(self):
        """Render the error in a traceback-like layout."""
        stack = "\n".join(
            f" File \"{d['file']}\", line {d['line']}, in {d['module']}\n"
            f" {d['code']}"
            for d in self.stack
        )
        return (
            "Traceback (most recent call last):\n"
            f"{stack}\n"
            f"{self.error}: {self.msg}\n"
        )

    def get_dict(self) -> dict:
        """Return the fields of the error as a plain dictionary."""
        return {
            "error_type": "Python",
            "error": self.error,
            "msg": self.msg,
            "stack": self.stack,
            "first_line": self.first_line_nb,
            "last_line": self.last_line_nb,
            "logfile_name": self.logfile_name,
        }

    # TODO: why peekable[str] crashes?
    @staticmethod
    def parse_error(logs: "peekable[tuple[int, str]]") -> Optional["PythonError"]:
        """Return the PythonError at the beginning of the logs if there is one, else return None.

        If there is a PythonError at the beginning of the logs, the header and
        frame lines of the traceback are consumed from the iterator; the final
        summary line is only peeked, not consumed. If there is no PythonError
        the iterator is not modified.

        Raises:
            RuntimeError: if the logs end before the summary line.
        """
        line_nb, line = logs.peek((None, None))
        if line is None or line_nb is None:
            return None
        if PythonError.python_error_traceback_re.match(line) is None:
            return None
        error = PythonError(line_nb, line_nb, "", "", [])
        next(logs)
        while True:
            line_nb, line = logs.peek((None, None))
            if line is None or line_nb is None:
                break
            frame = PythonError.python_error_file_re.match(line)
            if frame is None:
                break
            new_stack_line = {
                "file": frame.group(1),
                "line": frame.group(2),
                "module": frame.group(3),
                "code": "",
            }
            if len(error.stack) == 0 or error.stack[-1] != new_stack_line:
                error.stack.append(new_stack_line)
            error.last_line_nb = line_nb
            next(logs)

            line_nb, line = logs.peek((None, None))
            if line is None or line_nb is None:
                break
            if PythonError.python_error_file_re.match(line) is not None:
                # The frame has no source line (the interpreter omits it when
                # the source is unavailable): go on with the next frame
                # instead of mistaking it for code or for the summary line.
                continue
            code = PythonError.python_error_code_re.match(line)
            if code is None:
                break
            new_stack_line["code"] = code.group(1)
            error.last_line_nb = line_nb
            next(logs)

        line_nb, line = logs.peek((None, None))
        if line is None:
            raise RuntimeError("Found EOF before en of Python trackback")
        summary = PythonError.python_error_msg_re.match(line)
        if summary is None:
            raise RuntimeError("Last line of python traceback not found")
        error.error = summary.group(1)
        # An exception without a message yields "" (not the string "None").
        error.msg = summary.group(2) or ""
        return error
class Python311Error(PythonError):
    """A Python traceback that may contain `~~~^^^` marker lines under the code lines."""

    # Marker line: optional spaces, optional `~`, at least one `^`, optional `~`.
    python_code_marker_re = re.compile(r"^ *~*\^+~* *$")

    @staticmethod
    def parse_error(logs: "peekable[tuple[int, str]]") -> Optional["Python311Error"]:
        """Return the Python311Error at the beginning of the logs if there is one, else return None.

        If there is an error at the beginning of the logs, the header, frame,
        code and marker lines of the traceback are consumed from the iterator;
        the final summary line is only peeked, not consumed. If there is no
        error the iterator is not modified.

        Raises:
            RuntimeError: if the logs end before the summary line.
        """
        line_nb, line = logs.peek((None, None))
        if line is None or line_nb is None:
            return None
        if PythonError.python_error_traceback_re.match(line) is None:
            return None
        # Build an instance of this class (a PythonError subclass), as the
        # return annotation promises.
        error = Python311Error(line_nb, line_nb, "", "", [])
        next(logs)
        while True:
            line_nb, line = logs.peek((None, None))
            if line is None or line_nb is None:
                break
            frame = PythonError.python_error_file_re.match(line)
            if frame is None:
                break
            new_stack_line = {
                "file": frame.group(1),
                "line": frame.group(2),
                "module": frame.group(3),
                "code": "",
            }
            if len(error.stack) == 0 or error.stack[-1] != new_stack_line:
                error.stack.append(new_stack_line)
            error.last_line_nb = line_nb
            next(logs)

            line_nb, line = logs.peek((None, None))
            if line is None or line_nb is None:
                break
            if PythonError.python_error_file_re.match(line) is not None:
                # The frame has no source line (the interpreter omits it when
                # the source is unavailable): go on with the next frame
                # instead of mistaking it for code or for the summary line.
                continue
            code = PythonError.python_error_code_re.match(line)
            if code is None:
                break
            new_stack_line["code"] = code.group(1)
            error.last_line_nb = line_nb
            next(logs)

            line_nb, line = logs.peek((None, None))
            if line is None or line_nb is None:
                break
            if Python311Error.python_code_marker_re.match(line) is not None:
                # Skip the marker line; it carries no information we keep.
                next(logs)

        line_nb, line = logs.peek((None, None))
        if line is None:
            raise RuntimeError("Found EOF before en of Python trackback")
        summary = PythonError.python_error_msg_re.match(line)
        if summary is None:
            raise RuntimeError("Last line of python traceback not found")
        error.error = summary.group(1)
        # An exception without a message yields "" (not the string "None").
        error.msg = summary.group(2) or ""
        return error
class RubyError(LoggedError):
    """A Ruby error (header line followed by `from ...` backtrace lines) found in a log."""

    # Header line: `<file>.rb:<line>:in `<function>'[: <msg>]`.
    ruby_error_re = re.compile(r"(.*?\.rb):(\d*):in `(.*?)'(?:: (.*))?$")
    # Backtrace line: `from <file>.rb:<line>:in `<function>'`.
    ruby_stack_re = re.compile(r"[ \t]*from (.*?\.rb):(\d*):in `(.*?)'")

    def __init__(
        self,
        first_line_nb: int,
        last_line_nb: int,
        file: str,
        line: str,
        function: str,
        msg: str,
        stack: list,
        logfile_name: str = "",
    ):
        # Numbers of the first and last log lines that belong to this error.
        self.first_line_nb = first_line_nb
        self.last_line_nb = last_line_nb
        # Location captured from the header line.
        self.file = file
        self.line = line
        self.function = function
        self.msg = msg
        # List of {"file", "line", "function"} dicts, one per backtrace line.
        self.stack = stack
        self.logfile_name = logfile_name

    def __str__(self):
        """Render the error in the layout accepted by the class patterns.

        The header is followed by one `from ...` line per backtrace entry, so
        the output matches `ruby_error_re` and `ruby_stack_re` again.
        """
        header = f"{self.file}:{self.line}:in `{self.function}'"
        if self.msg:
            header += f": {self.msg}"
        frames = "".join(
            f"\n\tfrom {e['file']}:{e['line']}:in `{e['function']}'"
            for e in self.stack
        )
        return f"{header}{frames}\n"

    def get_dict(self) -> dict:
        """Return the fields of the error as a plain dictionary."""
        return {
            "error_type": "Ruby",
            "file": self.file,
            "line": self.line,
            "function": self.function,
            "msg": self.msg,
            "stack": self.stack,
            "first_line": self.first_line_nb,
            "last_line": self.last_line_nb,
            "logfile_name": self.logfile_name,
        }

    @staticmethod
    def parse_error(logs: "peekable[tuple[int, str]]") -> Optional["RubyError"]:
        """Return the RubyError at the beginning of the logs if there is one, else return None.

        When an error is returned, its header and backtrace lines have been
        consumed from `logs`; otherwise `logs` is left untouched.
        """
        line_nb, line = logs.peek((None, None))
        if line is None or line_nb is None:
            return None
        header = RubyError.ruby_error_re.match(line)
        if header is None:
            return None
        file, line_in_file, function, msg = header.groups()
        # A header without a message yields an empty string.
        error = RubyError(line_nb, line_nb, file, line_in_file, function, msg or "", [])
        next(logs)
        while True:
            line_nb, line = logs.peek((None, None))
            if line is None or line_nb is None:
                return error
            frame = RubyError.ruby_stack_re.match(line)
            if frame is None:
                # First line that is not a backtrace line: leave it in the iterator.
                return error
            line_dsc = {
                "file": frame.group(1),
                "line": frame.group(2),
                "function": frame.group(3),
            }
            # Consecutive identical entries are stored only once.
            if len(error.stack) == 0 or error.stack[-1] != line_dsc:
                error.stack.append(line_dsc)
            error.last_line_nb = line_nb
            next(logs)
class FlowdroidLog4jError(LoggedError):
    """A single `[...] ERROR|FATAL <origin> - <msg>` log line."""

    error_re = re.compile(r"\[.*?\] (ERROR|FATAL) (.*?) - (.*)$")

    def __init__(
        self,
        first_line_nb: int,
        last_line_nb: int,
        level: str,
        origin: str,
        msg: str,
        logfile_name: str = "",
    ):
        # Numbers of the first and last log lines that belong to this error.
        self.first_line_nb = first_line_nb
        self.last_line_nb = last_line_nb
        # The three groups captured by `error_re`.
        self.level = level
        self.origin = origin
        self.msg = msg
        self.logfile_name = logfile_name

    def __str__(self) -> str:
        return " ".join((f"{self.level}", f"{self.origin}", f"{self.msg}"))

    def get_dict(self) -> dict:
        """Return the fields of the error as a plain dictionary."""
        return dict(
            error_type="Log4j",
            level=self.level,
            origin=self.origin,
            msg=self.msg,
            first_line=self.first_line_nb,
            last_line=self.last_line_nb,
            logfile_name=self.logfile_name,
        )

    @staticmethod
    def parse_error(logs: peekable) -> Optional["FlowdroidLog4jError"]:
        """Return the error on the first line of `logs`, or None.

        The line is consumed from `logs` only when it matches `error_re`.
        """
        line_nb, line = logs.peek((None, None))
        if line_nb is None or line is None:
            return None
        found = FlowdroidLog4jError.error_re.match(line)
        if found is None:
            return None
        level, origin, msg = found.groups()
        parsed = FlowdroidLog4jError(line_nb, line_nb, level, origin, msg)
        next(logs)
        return parsed
class DroidsafeLog4jError(LoggedError):
    """A single log line starting with `ERROR: <msg>` or `FATAL: <msg>`."""

    error_re = re.compile(r"(ERROR|FATAL): (.*)")

    def __init__(
        self,
        first_line_nb: int,
        last_line_nb: int,
        level: str,
        msg: str,
        logfile_name: str = "",
    ):
        # Numbers of the first and last log lines that belong to this error.
        self.first_line_nb = first_line_nb
        self.last_line_nb = last_line_nb
        # The two groups captured by `error_re`.
        self.level = level
        self.msg = msg
        self.logfile_name = logfile_name

    def __str__(self) -> str:
        return ": ".join((f"{self.level}", f"{self.msg}"))

    def get_dict(self) -> dict:
        """Return the fields of the error as a plain dictionary."""
        return dict(
            error_type="Log4jSimpleMsg",
            level=self.level,
            msg=self.msg,
            first_line=self.first_line_nb,
            last_line=self.last_line_nb,
            logfile_name=self.logfile_name,
        )

    @staticmethod
    def parse_error(logs: peekable) -> Optional["DroidsafeLog4jError"]:
        """Return the error on the first line of `logs`, or None.

        The line is consumed from `logs` only when it matches `error_re`.
        """
        line_nb, line = logs.peek((None, None))
        if line_nb is None or line is None:
            return None
        found = DroidsafeLog4jError.error_re.match(line)
        if found is None:
            return None
        level, msg = found.groups()
        parsed = DroidsafeLog4jError(line_nb, line_nb, level, msg)
        next(logs)
        return parsed
# def get_errors(path: Path, error_types: list[Type[LoggedError]]) -> list[LoggedError]:
def get_errors(path: Path, error_types: list) -> list:
    """Collect the errors found in the log file located at `path`.

    `error_types` lists the error classes to look for. At each position of
    the log, every class is given a chance, in order, to parse an error with
    its `parse_error` method; when none of them finds one, the current line
    is skipped. Line numbers handed to the parsers are the zero-based indices
    produced by `enumerate`.

    Returns the list of parsed errors, each tagged with the file name of
    `path`. An empty `error_types` yields an empty list without reading the
    file.

    Raises:
        RuntimeError: if `path` does not exist.
    """
    if not path.exists():
        raise RuntimeError(f"Error log {path} not found")
    if not error_types:
        return []

    collected = []
    with path.open("r", errors="replace") as log_file:
        logs = peekable(enumerate(log_file))
        while logs.peek(None) is not None:
            # Each parser consumes the lines of the error it recognises, so
            # the next parser in the list starts where the previous one stopped.
            attempts = [error_type.parse_error(logs) for error_type in error_types]
            found_here = [parsed for parsed in attempts if parsed is not None]
            if found_here:
                collected.extend(found_here)
            else:
                # Nobody recognised the current line: drop it and move on.
                next(logs)

    logfile_name = path.name
    for parsed in collected:
        parsed.set_logfile_name(logfile_name)
    return collected