# Copyright 2020-2022 Le Filament (<https://le-filament.com>)
# License AGPL-3 or later (http://www.gnu.org/licenses/agpl.html).
import csv
from datetime import date, datetime
from io import BytesIO, StringIO

from odoo import http
from odoo.http import request
from odoo.tools.misc import xlwt
from odoo.tools.safe_eval import safe_eval

from odoo.addons.web.controllers.main import content_disposition


class JournalDatasExport(http.Controller):
# ------------------------------------------------------
# Routes
# ------------------------------------------------------
@http.route("/web/export_journal/", type="http", auth="user")
def export_journal(
self, export_format, export, export_type, date_start, date_end, **kwargs
):
"""
Sélectionne les account.move.line correspondants aux journaux
et à la plage de date définis
Crée le lignes de valeurs
:param export_format: xls/csv
:param export_type: empty (not already exported lines) / all
:param date_start: date
:param date_end: date
:return: file
"""
export_id = request.env["export.journal.type"].browse(int(export))
# Set domain
domain = self._get_domain(export_id, export_type, date_start, date_end)
order_by = export_id.order_by if export_id.order_by else "id"
# Retrieve lines & set datas
lines_to_export = []
if not export_id.group_ids:
aml_ids = request.env["account.move.line"].search(domain, order=order_by)
lines_to_export = self._get_lines(aml_ids, export_id)
else:
group_fields = export_id.group_ids.mapped("field_id.name")
group_agg = export_id.group_ids.mapped(
lambda f: f"{f.field_id.name}:{f.field_agg}"
if f.field_agg
else f.field_id.name
)
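            # For date fields the suffix is typically a read_group() granularity
            # such as "month" (yielding e.g. "date:month"); plain field names
            # are used otherwise.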
grouped_lines = request.env["account.move.line"].read_group(
domain,
fields=group_fields,
groupby=group_agg,
orderby=order_by,
lazy=False,
)
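            # Each read_group() result is a dict of grouped values plus a
            # "__domain" key narrowing the search to that group's lines (used below).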
for group in grouped_lines:
if export_id.is_group_header:
header = [self._get_group_name(export_id, group)]
lines_to_export.append(header)
aml_ids = request.env["account.move.line"].search(group.get("__domain"))
lines_to_export += self._get_lines(aml_ids, export_id)
filename_ = (
export_id.company_id.name.title().replace(" ", "")
+ date_start.replace("-", "")
+ "_"
+ date_end.replace("-", "")
)
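        # e.g. "LeFilament20220101_20221231" for an export covering 2022
        # (company name and dates shown are examples only)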
if export_format == "csv":
return self.export_csv(export_id, lines_to_export, filename_)
elif export_format == "xls":
return self.export_xls(export_id, lines_to_export, filename_)

    # ------------------------------------------------------
    # Common functions
    # ------------------------------------------------------
    def _get_domain(self, export_id, export_type, date_start, date_end):
        domain = [
            ("date", ">=", date_start),
            ("date", "<=", date_end),
            ("journal_id", "in", export_id.journal_ids.ids),
            ("company_id", "=", export_id.company_id.id),
            ("display_type", "not in", ["line_section", "line_note"]),
        ]
        if export_type == "empty":
            domain += [("date_export", "=", False)]
        if export_id.export_domain:
            domain += safe_eval(export_id.export_domain)
        return domain

    def _get_lines(self, aml_ids, export_id):
        lines_to_export = []
        # One accumulator per exported column, only filled for fields flagged is_sum
        sum_row = [None for f in range(0, len(export_id.fields_ids))]
        for line in aml_ids:
            row = []
            for index, field in enumerate(export_id.fields_ids):
                if "line" in field["field_name"]:
                    # Expression evaluated against the current move line
                    value = safe_eval(field["field_name"], {"line": line}, mode="eval")
                else:
                    # Constant expression, no evaluation context needed
                    value = safe_eval(field["field_name"])
                row.append(value)
                if field.is_sum:
                    sum_row[index] = sum_row[index] + value if sum_row[index] else value
            lines_to_export.append(row)
            line.write({"date_export": datetime.now()})
        if any(sum_row):
            lines_to_export.append(sum_row)
        return lines_to_export

    def _get_group_name(self, export_id, group):
        header_list = []
        for f in export_id.group_ids:
            if f.field_agg:
                header_list.append(str(group[f"{f.field_id.name}:{f.field_agg}"]))
            elif f.field_id.ttype == "many2one":
                # read_group() returns many2one group values as (id, display_name)
                header_list.append(group[f.field_id.name][1]._value)
            else:
                header_list.append(str(group[f.field_id.name]))
        return " - ".join(header_list)

    def export_csv(self, export_id, lines_to_export, filename_):
        fp = StringIO()
        export_file = csv.writer(
            fp, delimiter=export_id.delimiter, quoting=csv.QUOTE_ALL
        )
# Add header line
if export_id.is_header:
row = []
for head in export_id.fields_ids.mapped("name"):
row.append(head)
export_file.writerow(row)
for line in lines_to_export:
# Format date value
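            # csv_datestyle is assumed to hold a strftime() pattern, e.g. "%d/%m/%Y"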
line_values = [
value
if not isinstance(value, date)
else value.strftime(export_id.csv_datestyle)
for value in line
]
export_file.writerow(line_values)
fp.seek(0)
data = fp.read()
fp.close()
filename = filename_ + ".csv"
("Content-Type", "text/csv;charset=utf8"),
("Content-Disposition", content_disposition(filename)),
]
return request.make_response(data, headers=csvhttpheaders)

    def export_xls(self, export_id, lines_to_export, filename_):
workbook = xlwt.Workbook()
worksheet = workbook.add_sheet(filename_)
base_style = xlwt.easyxf("align: wrap yes")
date_style = xlwt.XFStyle()
date_style.num_format_str = export_id.xls_datestyle
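        # xls_datestyle is assumed to hold an Excel number format, e.g. "DD/MM/YYYY"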
if export_id.is_header:
for i, fieldname in enumerate(export_id.fields_ids.mapped("name")):
worksheet.write(0, i, fieldname)
worksheet.col(i).width = 4000 # around 220 pixels
for row_index, line in enumerate(lines_to_export):
for cell_index, value in enumerate(line):
cell_style = base_style
if isinstance(value, date):
cell_style = date_style
worksheet.write(row_index + 1, cell_index, value, cell_style)
fp = BytesIO()
workbook.save(fp)
fp.seek(0)
data = fp.read()
fp.close()
filename = filename_ + ".xls"
("Content-Type", "text/csv;charset=utf8"),
("Content-Disposition", content_disposition(filename)),
]
return request.make_response(data, headers=xlshttpheaders)