# Copyright 2020-2022 Le Filament (<https://le-filament.com>) # License AGPL-3 or later (http://www.gnu.org/licenses/agpl.html). import csv from datetime import date, datetime from io import BytesIO, StringIO from odoo import http from odoo.http import request from odoo.tools.misc import xlwt from odoo.tools.safe_eval import safe_eval from odoo.addons.web.controllers.main import content_disposition class JournalDatasExport(http.Controller): # ------------------------------------------------------ # Routes # ------------------------------------------------------ @http.route("/web/export_journal/", type="http", auth="user") def export_journal( self, export_format, export, export_type, date_start, date_end, **kwargs ): """ Sélectionne les account.move.line correspondants aux journaux et à la plage de date définis Crée le lignes de valeurs :param export_format: xls/csv :param export: id export.journal.type :param export_type: empty (not already exported lines) / all :param date_start: date :param date_end: date :return: file """ export_id = request.env["export.journal.type"].browse(int(export)) # Set domain domain = self._get_domain(export_id, export_type, date_start, date_end) order_by = export_id.order_by if export_id.order_by else "id" # Retrieve lines & set datas lines_to_export = [] if not export_id.group_ids: aml_ids = request.env["account.move.line"].search(domain, order=order_by) lines_to_export = self._get_lines(aml_ids, export_id) else: group_fields = export_id.group_ids.mapped("field_id.name") group_agg = export_id.group_ids.mapped( lambda f: f"{f.field_id.name}:{f.field_agg}" if f.field_agg else f.field_id.name ) grouped_lines = request.env["account.move.line"].read_group( domain, fields=group_fields, groupby=group_agg, orderby=order_by, lazy=False, ) for group in grouped_lines: if export_id.is_group_header: header = [self._get_group_name(export_id, group)] lines_to_export.append(header) aml_ids = request.env["account.move.line"].search(group.get("__domain")) lines_to_export += self._get_lines(aml_ids, export_id) filename_ = ( export_id.company_id.name.title().replace(" ", "") + date_start.replace("-", "") + "_" + date_end.replace("-", "") ) if export_format == "csv": return self.export_csv(export_id, lines_to_export, filename_) elif export_format == "xls": return self.export_xls(export_id, lines_to_export, filename_) # ------------------------------------------------------ # Common function # ------------------------------------------------------ def _get_domain(self, export_id, export_type, date_start, date_end): domain = [ ("date", ">=", date_start), ("date", "<=", date_end), ("journal_id", "in", export_id.journal_ids.ids), ("company_id", "=", export_id.company_id.id), ("move_id.state", "=", "posted"), ("display_type", "not in", ["line_section", "line_note"]), ] if export_type == "empty": domain += [("date_export", "=", False)] if export_id.export_domain: domain += safe_eval(export_id.export_domain) return domain def _get_lines(self, aml_ids, export_id): lines_to_export = [] sum_row = [None for f in range(0, len(export_id.fields_ids))] for line in aml_ids: row = [] for index, field in enumerate(export_id.fields_ids): if field.is_python: value = safe_eval(field["field_name"], {"line": line}, mode="eval") else: value = safe_eval(field["field_name"]) row.append(value) if field.is_sum: sum_row[index] = sum_row[index] + value if sum_row[index] else value lines_to_export.append(row) line.write({"date_export": datetime.now()}) if any(sum_row): lines_to_export.append(sum_row) return lines_to_export def _get_group_name(self, export_id, group): header_list = [] for f in export_id.group_ids: if f.field_agg: header_list.append(group[f"{f.field_id.name}:{f.field_agg}"]) elif f.field_id.ttype == "many2one": header_list.append(group[f.field_id.name][1]._value) else: header_list.append(group[f.field_id.name]) return " - ".join(header_list) def export_csv(self, export_id, lines_to_export, filename_): fp = StringIO() export_file = csv.writer( fp, delimiter=export_id.delimiter, quoting=csv.QUOTE_ALL ) # Add header line if export_id.is_header: row = [] for head in export_id.fields_ids.mapped("name"): row.append(head) export_file.writerow(row) for line in lines_to_export: # Format date value line_values = [ value if not isinstance(value, date) else value.strftime(export_id.csv_datestyle) for value in line ] export_file.writerow(line_values) fp.seek(0) data = fp.read() fp.close() filename = filename_ + ".csv" csvhttpheaders = [ ("Content-Type", "text/csv;charset=utf8"), ("Content-Disposition", content_disposition(filename)), ] return request.make_response(data, headers=csvhttpheaders) def export_xls(self, export_id, lines_to_export, filename_): workbook = xlwt.Workbook() worksheet = workbook.add_sheet(filename_) base_style = xlwt.easyxf("align: wrap yes") date_style = xlwt.XFStyle() date_style.num_format_str = export_id.xls_datestyle if export_id.is_header: for i, fieldname in enumerate(export_id.fields_ids.mapped("name")): worksheet.write(0, i, fieldname) worksheet.col(i).width = 4000 # around 220 pixels for row_index, line in enumerate(lines_to_export): for cell_index, value in enumerate(line): cell_style = base_style if isinstance(value, date): cell_style = date_style worksheet.write(row_index + 1, cell_index, value, cell_style) fp = BytesIO() workbook.save(fp) fp.seek(0) data = fp.read() fp.close() filename = filename_ + ".xls" xlshttpheaders = [ ("Content-Type", "text/csv;charset=utf8"), ("Content-Disposition", content_disposition(filename)), ] return request.make_response(data, headers=xlshttpheaders)