24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263 | class JSONReportOutput(ReportOutput):
export_extension = "json"
is_interactive = False
def __init__(self, *args, **kwargs):
self.document: List[ReportElement] = kwargs.get("document", [])
def add_heading(self, text: str, **kwargs):
element = ReportElement(name="Heading", data=text, parameters=kwargs)
self.document.append(element)
log.debug(f"Heading {text} added.")
return self.document
def add_page_break(self):
element = ReportElement(name="Page break", data=None, parameters={})
self.document.append(element)
log.debug("Page Break added.")
return self.document
def add_hyperlink(self, text: str, url: str, **kwargs):
element = ReportElement(name="Hyperlink", data={"text": text, "url": url}, parameters=kwargs)
self.document.append(element)
log.debug(f"Hyperlink ({url})[{text}] added.")
return self.document
def add_comment_box(self, **kwargs) -> None:
"""
This method's implementation in JSONReportOutput is some kind of the response
for recommendations section needs. We need to show text that is an initial text in
DashReportOutput.
Keyword Args:
initial_value (str): value that should be shown as the paragraph text in report output
"""
initial_text: str = kwargs.get("initial_text", "")
if initial_text:
self.add_text(text=initial_text)
def add_text(self, text: str, **kwargs) -> List:
"""
Adds text to paragraphs using markdown to search for heading number, variables in line and hyperlinks.
:param text: str, text to add to document
:param kwargs:
dictionary: dict, defines values of variables in string
:return: list, list of ReportElements
"""
dictionary = kwargs.get("dictionary", {})
for line in text.split("\n"):
heading_text, heading_level = get_heading_from_line(line)
if heading_level is not None:
self.add_heading(text=heading_text, level=heading_level)
continue
new_line = replace_variables_in_line(line, dictionary)
# Search for hyperlinks
elements = []
for sub_line in split_line_on_hyperlinks(line=new_line):
# add text to current paragraph if it is not a hyperlink
if sub_line.get("type") == "text" and sub_line.get("text"):
_text = ReportElement(name="Span", data=sub_line.get("text"), parameters=kwargs)
elements.append(_text)
# add hyperlink
elif sub_line.get("type") == "link" and sub_line.get("text") and sub_line.get("url"):
_hyperlink = ReportElement(
name="Hyperlink",
data={"text": sub_line.get("text"), "url": sub_line.get("url")},
parameters=kwargs,
)
elements.append(_hyperlink)
paragraph = ReportElement(name="Paragraph", data=elements, parameters={})
self.document.append(paragraph)
# Trim Line for logging
trimmed_line = trim_text(new_line, max_length=40)
if trimmed_line:
log.debug(f'Paragraph "{trimmed_line}" added.')
return self.document
def add_toc(self, **kwargs):
element = ReportElement(name="TOC", data=None, parameters=kwargs)
self.document.append(element)
log.debug("TOC added.")
return self.document
def add_table(self, table_data: Sequence[Sequence[Any]], style: TableStyle = TableStyle.DEFAULT, **kwargs):
t_rows, t_cols = get_table_size(table_data)
if t_rows is None or t_cols is None:
return self.document
table_dict = defaultdict(list)
for row in table_data[1:]:
for name, cell in zip(table_data[0], row):
# We drop all the cell formatting
# and just keep the text value of the cell.
# This is because the JSONReportOutput is not interactive and does not support formatting.
# We also need to check if the cell is a HeaderCell or TableCell
# and get the text value accordingly.
if isinstance(cell, HeaderCell):
value = "\n".join(cell.text) if isinstance(cell.text, list) else cell.text
# Use float_val if it is a number, otherwise use str(cell)
elif isinstance(cell, TableCell):
value = cell.float_val if cell.float_val else str(cell)
else:
value = str(cell)
table_dict[str(name)].append(value)
element = ReportElement(name="Table", data=dict(table_dict), parameters=kwargs)
self.document.append(element)
log.debug(f"Table ({t_rows}, {t_cols}) added.")
return self.document
def add_table_legend(self, legend_cells: Sequence, **kwargs):
"""
Not Implemented
"""
pass
def add_figures(self, figures: Sequence[Figure], widths: Optional[Sequence[float]] = None, **kwargs) -> None:
element = ReportElement(name="Figures", data=figures, parameters=kwargs)
self.document.append(element)
log.debug("Data plot added.")
return None
def export(self, filepath: Union[str, Path], **kwargs) -> None:
"""
Exports a report document to a file path provided in the `filepath` argument.
Args:
filepath: Path of the exported document file.
Keyword Args:
encoding (str): Data encoding. Default: `utf-8`
Note:
Even if you provide an extension in the filepath it will be overwritten to `json`.
"""
filepath = Path(filepath).with_suffix(f".{self.export_extension}")
report_bytes = self.to_bytes(**kwargs)
try:
filepath.write_bytes(report_bytes)
log.info(f"Report saved to file: {filepath}")
except PermissionError:
new_filepath = filepath.with_stem(f"{filepath.stem}_1")
self.export(filepath=new_filepath, **kwargs)
def to_bytes(self, **kwargs) -> bytes:
"""
Converts content of the report document to bytes.
Keyword Args:
encoding (str): Data encoding. Default: `utf-8`
Returns:
Bytes that contains the binary content of the report document.
"""
encoding = kwargs.get("encoding", "utf-8")
# data = json.dumps(self.document, cls=ReportOutputJSONEncoder, **kwargs)
# The line `data = json.dumps(ReportOutputJSONEncoder().default(self.document))` is
# commented out in the code snippet provided. If uncommented, it would call the `ReportElement.to_dict()`
# for each element in the document list, which is not necessary since the `default()` method of the encoder
data = json.dumps(self.document, cls=ReportOutputJSONEncoder, **kwargs)
report_bytes = bytes(data, encoding=encoding)
return report_bytes
def add_pictures(
self, pictures: Sequence[Union[Path, str, bytes]], widths: Sequence[float] = None, **kwargs
) -> None:
pictures_data = []
for pic in pictures:
b64_picture = self._pic_to_b64(picture=pic)
if b64_picture is None:
continue
pictures_data.append(b64_picture)
element = ReportElement(name="Pictures", data=[str(p) for p in pictures_data], parameters=kwargs)
self.document.append(element)
log.debug("Data plot added.")
return None
def reset(self) -> None:
"""Resets the document to the initial state"""
self.document = []
@contextmanager
def column_section(self, no_of_columns=2) -> Generator[Callable[[], None], None, None]:
"""
Creates a context in which the Report Output works in column layout.
It is not supported yet for the JSONReportOutput
"""
_column_id = 1
_no_effect = "IT HAS NO EFFECT"
log.debug(f"Working in column: {_column_id} / {no_of_columns} {_no_effect}")
def move_to_next_column():
"""
Function that moves the context to the next column.
"""
nonlocal _column_id
# Check if we still have columns available to jump to
if _column_id >= no_of_columns:
log.warning(f"There is no next column to move to. ({_column_id} / {no_of_columns})")
return None
_column_id += 1
log.debug(f"Working in column: {_column_id} / {no_of_columns} {_no_effect}")
try:
yield move_to_next_column
finally:
# Nothing to clean up here
pass
@staticmethod
def read_from_file(path: str):
with open(path, "r") as f:
document = json.load(f, cls=ReportOutputJSONDecoder)
return JSONReportOutput(document=document)
@staticmethod
def _pic_to_b64(picture: Union[bytes, str, Path]) -> Optional[str]:
"""
Converts picture input to base64 with UTF-8 encoding
Args:
picture: Sequence of bytes to be encoded, or picture path
Returns:
Optional Encoded B64 string
"""
if isinstance(picture, bytes):
pic_bytes = picture
elif Path(picture).exists():
pic_bytes = picture.read_bytes()
else:
return None
return base64.b64encode(pic_bytes).decode("utf-8")
|