Skip to content
Snippets Groups Projects
main.py 7.13 KiB
Newer Older
  • Learn to ignore specific revisions
  • # Copyright 2020-2022 Le Filament (<https://le-filament.com>)
    
    # License AGPL-3 or later (http://www.gnu.org/licenses/agpl.html).
    
    import csv
    
    from datetime import date, datetime
    
    from io import BytesIO, StringIO
    
    from odoo import http
    from odoo.http import request
    from odoo.tools.misc import xlwt
    
    from odoo.tools.safe_eval import safe_eval
    
    Rémi - Le Filament's avatar
    Rémi - Le Filament committed
    from odoo.addons.web.controllers.main import content_disposition
    
    
    class JournalDatasExport(http.Controller):
        # ------------------------------------------------------
        # Routes
        # ------------------------------------------------------
    
        @http.route("/web/export_journal/", type="http", auth="user")
        # NOTE(review): the `def <name>(` line that opens this signature is absent
        # from the reviewed copy of the file -- restore it from the repository.
            self, export_format, export, export_type, date_start, date_end, **kwargs
        ):

            """
            Select the account.move.line records matching the journals and the
            date range of the export configuration, build the rows of values
            and return them as a downloadable file.

            :param export_format: xls/csv
            :param export: id of the export.journal.type record (converted
                with int() before browsing)
            :param export_type: empty (not already exported lines) / all
            :param date_start: date
            :param date_end: date
            :return: file; nothing is returned explicitly when export_format
                is neither "csv" nor "xls"
            """

            export_id = request.env["export.journal.type"].browse(int(export))

            # Build the search domain from the export configuration and the request
            domain = self._get_domain(export_id, export_type, date_start, date_end)

            # Fall back to ordering by id when the export defines no order_by
            order_by = export_id.order_by if export_id.order_by else "id"


            # Retrieve the lines and build the rows
            lines_to_export = []
            if not export_id.group_ids:
                # No grouping configured: one search, one flat list of rows
                aml_ids = request.env["account.move.line"].search(domain, order=order_by)

                lines_to_export = self._get_lines(aml_ids, export_id)
            else:
                group_fields = export_id.group_ids.mapped("field_id.name")
                group_agg = export_id.group_ids.mapped(
                    # "<field>:<agg>" when field_agg is set, bare field name otherwise
                    lambda f: f"{f.field_id.name}:{f.field_agg}"
                    if f.field_agg
                    else f.field_id.name
                )

                grouped_lines = request.env["account.move.line"].read_group(

                    domain,
                    fields=group_fields,
                    groupby=group_agg,
                    orderby=order_by,
                    lazy=False,
                )

                for group in grouped_lines:
                    if export_id.is_group_header:
                        # The group header is a row holding a single cell
                        header = [self._get_group_name(export_id, group)]
                        lines_to_export.append(header)
                    # Re-read the lines of this group through the group's "__domain".
                    # NOTE(review): unlike the ungrouped branch, no `order` is
                    # passed to this search -- confirm this is intended.
                    aml_ids = request.env["account.move.line"].search(group.get("__domain"))
                    lines_to_export += self._get_lines(aml_ids, export_id)

            # File name (without extension): title-cased company name without
            # spaces, then both dates with their dashes removed, joined by "_"
            filename_ = (
                export_id.company_id.name.title().replace(" ", "")
                + date_start.replace("-", "")
                + "_"
                + date_end.replace("-", "")
            )

            if export_format == "csv":
                return self.export_csv(export_id, lines_to_export, filename_)
            elif export_format == "xls":
                return self.export_xls(export_id, lines_to_export, filename_)
            # Any other export_format falls through and returns None implicitly.
    
        # ------------------------------------------------------
        # Common function
        # ------------------------------------------------------
        def _get_domain(self, export_id, export_type, date_start, date_end):
    
                ("date", ">=", date_start),
                ("date", "<=", date_end),
                ("journal_id", "in", export_id.journal_ids.ids),
                ("company_id", "=", export_id.company_id.id),
    
                ("move_id.state", "=", "posted"),
    
                ("display_type", "not in", ["line_section", "line_note"]),
    
            if export_type == "empty":
                domain += [("date_export", "=", False)]
    
            if export_id.export_domain:
                domain += safe_eval(export_id.export_domain)
            return domain
    
        def _get_lines(self, aml_ids, export_id):
            """
            Build the rows to export for the given move lines.

            One row is produced per line, holding one value per entry of
            ``export_id.fields_ids``; each value comes from evaluating the
            entry's ``field_name`` expression with safe_eval. Entries flagged
            ``is_sum`` are accumulated into an extra totals row, appended after
            the line rows when at least one total is truthy.

            Every processed line is written with the current datetime in
            ``date_export``.

            :param aml_ids: move lines to export
            :param export_id: export configuration providing ``fields_ids``
            :return: list of rows (lists of values)
            """

            lines_to_export = []

            # One slot per exported field; stays None for fields that are not summed
            sum_row = [None for f in range(0, len(export_id.fields_ids))]
            for line in aml_ids:

                row = []

                for index, field in enumerate(export_id.fields_ids):
                    # NOTE(review): the `if <condition>:` line paired with the `else:`
                    # below is absent from the reviewed copy -- restore it from the
                    # repository. This branch evaluates the expression with the
                    # current line available as `line`.
                        value = safe_eval(field["field_name"], {"line": line}, mode="eval")

                    else:
                        # Expression evaluated without any context
                        value = safe_eval(field["field_name"])

                    row.append(value)

                    if field.is_sum:
                        # Start from `value` while the running total is None or falsy
                        sum_row[index] = sum_row[index] + value if sum_row[index] else value

                lines_to_export.append(row)

                # Stamp the line with the export time (one write per line)
                line.write({"date_export": datetime.now()})

            # The totals row is skipped when every total is None or zero
            if any(sum_row):
                lines_to_export.append(sum_row)
            return lines_to_export
    
        def _get_group_name(self, export_id, group):
            header_list = []
            for f in export_id.group_ids:
                if f.field_agg:
                    header_list.append(group[f"{f.field_id.name}:{f.field_agg}"])
                elif f.field_id.ttype == "many2one":
                    header_list.append(group[f.field_id.name][1]._value)
                else:
                    header_list.append(group[f.field_id.name])
            return " - ".join(header_list)
    
    
        def export_csv(self, export_id, lines_to_export, filename_):
            """
            Serialize the rows as CSV and return them as an HTTP response.

            Every value is quoted; ``date`` values are rendered with
            ``export_id.csv_datestyle``.

            :param export_id: export configuration; ``delimiter``, ``is_header``,
                ``fields_ids`` and ``csv_datestyle`` are read here
            :param lines_to_export: iterable of rows, each an iterable of values
            :param filename_: file name without extension
            :return: response carrying the CSV text as an attachment
            """

            def _as_cell(value):
                # Dates are rendered as text; everything else is left to csv.
                if isinstance(value, date):
                    return value.strftime(export_id.csv_datestyle)
                return value

            buffer = StringIO()
            writer = csv.writer(
                buffer, delimiter=export_id.delimiter, quoting=csv.QUOTE_ALL
            )

            # Optional header line made of the field labels
            if export_id.is_header:
                writer.writerow(list(export_id.fields_ids.mapped("name")))

            for values in lines_to_export:
                writer.writerow([_as_cell(value) for value in values])

            content = buffer.getvalue()
            buffer.close()

            response_headers = [
                ("Content-Type", "text/csv;charset=utf8"),
                ("Content-Disposition", content_disposition(filename_ + ".csv")),
            ]
            return request.make_response(content, headers=response_headers)
    
        def export_xls(self, export_id, lines_to_export, filename_):
            """
            Build an XLS workbook from the rows and return it as an HTTP response.

            :param export_id: export configuration; ``is_header``, ``fields_ids``
                and ``xls_datestyle`` are read here
            :param lines_to_export: iterable of rows, each an iterable of values
            :param filename_: file name without extension, also used (shortened
                if needed) as the worksheet name
            :return: response carrying the workbook as a ``.xls`` attachment
            """
            workbook = xlwt.Workbook()
            # xlwt rejects worksheet names longer than 31 characters, which a long
            # company name would exceed; the download name itself is not shortened.
            worksheet = workbook.add_sheet(filename_[:31])

            base_style = xlwt.easyxf("align: wrap yes")
            date_style = xlwt.XFStyle()
            date_style.num_format_str = export_id.xls_datestyle

            if export_id.is_header:
                for col, title in enumerate(export_id.fields_ids.mapped("name")):
                    worksheet.write(0, col, title)
                    worksheet.col(col).width = 4000  # around 220 pixels

            # Data rows always start on the second sheet row, with or without header.
            for row_index, line in enumerate(lines_to_export, start=1):
                for cell_index, value in enumerate(line):
                    style = date_style if isinstance(value, date) else base_style
                    worksheet.write(row_index, cell_index, value, style)

            # The buffer is released even if saving the workbook fails.
            with BytesIO() as fp:
                workbook.save(fp)
                data = fp.getvalue()

            xlshttpheaders = [
                # The payload is a binary Excel workbook, not CSV text.
                ("Content-Type", "application/vnd.ms-excel"),
                ("Content-Disposition", content_disposition(filename_ + ".xls")),
            ]
            return request.make_response(data, headers=xlshttpheaders)