# Copyright 2023 Le Filament (<http://www.le-filament.com>) # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). from datetime import datetime from dateutil.relativedelta import relativedelta from odoo import _, fields, models from odoo.exceptions import ValidationError from odoo.tools import date_utils DEFAULT_MONTH_RANGE = 3 class AccOperation(models.Model): _inherit = "acc.operation" # ------------------------------------------------------ # Actions # ------------------------------------------------------ def action_view_courbes(self): """ Action qui ouvre le portail sur l'opération :return: Vue Qweb """ self.ensure_one() if self.id: return { "type": "ir.actions.act_url", "url": "/operation/%s" % (self.id), "target": "new", } # ------------------------------------------------------ # Business methods # ------------------------------------------------------ def get_power_tot(self): """ Fonction retournant la consommation totale, l'autoconsommation totale et la production totale du dernier mois. :return: la somme de la conso totale de tous les consommateurs, la somme de l autoconso totale de tous les consommateurs, la sommme de la prod totale de tous les producteurs la date de début de mois """ # Get last date slot recorded last_record = self.get_the_last_day() date_start, date_end = self.get_last_day("month", last_record) query = """ SELECT date_trunc('month', A.date_slot) AS date_slot, (SUM( (CASE WHEN comp_data_type = 'cons' THEN A.power ELSE 0 END) )/2) / 1000 as conso_tot, (SUM( (CASE WHEN comp_data_type = 'prod' THEN A.power ELSE 0 END) )/2) / 1000 as prod_tot, (SUM( (CASE WHEN comp_data_type = 'autocons' THEN A.power ELSE 0 END) )/2) / 1000 as autoconso_tot FROM acc_enedis_cdc A JOIN acc_operation E ON E.id = A.acc_operation_id WHERE A.acc_operation_id IS NOT NULL AND A.acc_operation_id = %s AND A.date_slot >= %s AND A.date_slot <= %s GROUP BY date_trunc('month', A.date_slot); """ query_params = ( self.id, date_start, date_end, ) self.env.cr.execute(query, query_params) raw_data = self.env.cr.fetchall() return raw_data[0][1], raw_data[0][2], raw_data[0][3], date_start def get_the_last_day(self): """ Fonction retournant la date du dernier enregistrement """ # Get last date slot recorded last_record = ( self.env["acc.enedis.cdc"] .sudo() .search( [ ("acc_operation_id", "in", self.ids), ], limit=1, order="date_slot DESC", ) ) if not last_record: raise ValidationError(_("L'opération ne possède pas de données")) return last_record def get_last_day(self, scale, last_record): """ Fonction retournant une date de début et une date de fin. Ces dates sont calculées en fonction de l'échelle choisie et du dernier élément enregistré - day: la date de début est égale à la dernière date avec un enreisgtrement pour l'opération donnée la date de fin ets la date de début + 1 - week: la date de début est égale à la dernière date avec un enreisgtrement pour l'opération donnée moins 7 jours la date de fin est égale à la dernière date avec un enreisgtrement pour l'opération donnée - month: la date de début est égale à la dernière date avec un enreisgtrement pour l'opération donnée moins 1 mois la date de fin est égale à la dernière date avec un enreisgtrement pour l'opération donnée - semestre: la date de début est égale à la dernière date avec un enreisgtrement pour l'opération donnée moins 6 mois la date de fin est égale à la dernière date avec un enreisgtrement pour l'opération donnée - year: la date de début est égale à la dernière date avec un enreisgtrement pour l'opération donnée moins 1 an la date de fin est égale à la dernière date avec un enreisgtrement pour l'opération donnée :param scale: type d'affichage des graphes (day/week/month/semestre/year) last_record: Dernier enregistrement dans la base :return: une date de début et une date de fin """ # Convert end datetime to timezone last_day_start = last_record.date_slot.replace(hour=0, minute=0, second=0) last_day_end = last_day_start + relativedelta(days=1) # Get end time slot for previous month with timezone # Manage if date is the last day of the month if fields.Date.to_date(last_day_start) == fields.Date.to_date( date_utils.end_of(last_record.date_slot, "month") ): end_month = last_day_end.replace(day=1) else: end_month = last_day_start.replace(day=1) if scale == "semestre": date_end = end_month date_start = end_month - relativedelta(months=6) elif scale == "year": date_end = end_month date_start = end_month.replace(month=1, day=1) elif scale == "week": date_start = last_day_end - relativedelta(days=7) date_end = last_day_end elif scale == "day": date_start = last_day_start date_end = last_day_end # month by default else: date_start = end_month - relativedelta(months=1) date_end = end_month return date_start, date_end def get_step_from_date(self, date_start, date_end, scale=None): """ Fonction retournant le pas des courbes en fonction de 2 dates. :return: step: hour/day/month/year """ # Calculate delta between 2 dates delta = (date_end - date_start).days if delta <= 1: step_display_courbe = "hour" step = "hour" elif delta < 32: step_display_courbe = "day" step = "hour" scale = "week" elif delta > 365: step = "year" step_display_courbe = "year" else: step = "month" step_display_courbe = "month" return scale, step, step_display_courbe def get_cdc_by_query_cons(self, slot_type, date_start, date_end, prm_ids=None): """ Fonction permettant de récupérer les données pour la construction des chart pour une ou des opérations données pour les consommateurs :param slot_type: type de slot pour la query ('month' ou 'hour' ou 'year') date_start: date début date_end: date de fin @returns: un dictionnaire de données (labels et data pour les charts à afficher) """ label = [] data_autocons = [] data_allocons = [] data_cons = [] data_prod = [] data_surplus = [] query = """ SELECT date_trunc(%s, A.date_slot) AS date_slot, (SUM( (CASE WHEN comp_data_type = 'autocons' THEN A.power ELSE 0 END) )/2) / 1000 as autocons, (SUM( (CASE WHEN comp_data_type = 'prod' THEN A.power ELSE 0 END) )/2) / 1000 as prod, ((SUM( (CASE WHEN comp_data_type = 'cons' THEN A.power ELSE 0 END)) - SUM(CASE WHEN comp_data_type = 'autocons' THEN A.power ELSE 0 END) ) / 2) / 1000 as allocons, (SUM( (CASE WHEN comp_data_type = 'cons' THEN A.power ELSE 0 END) )/2) / 1000 as cons FROM acc_enedis_cdc A JOIN acc_operation E ON E.id = A.acc_operation_id WHERE A.acc_operation_id IS NOT NULL AND A.acc_operation_id IN %s AND ( A.acc_counter_id IN %s OR A.comp_data_type = 'prod' ) AND A.date_slot >= %s AND A.date_slot <= %s GROUP BY date_trunc(%s, A.date_slot) ORDER BY date_slot ASC; """ query_params = ( slot_type, tuple(self.ids), tuple(prm_ids.ids), date_start, date_end, slot_type, ) self.env.cr.execute(query, query_params) raw_data = self.env.cr.fetchall() for row in raw_data: data_autocons.append({"x": row[0], "y": round(row[1], 2)}) data_prod.append({"x": row[0], "y": round(row[2], 2)}) data_allocons.append({"x": row[0], "y": round(row[3], 2)}) data_cons.append({"x": row[0], "y": round(row[4], 2)}) label.append(row[0]) cdc_jour = { "autocons": data_autocons, "prod": data_prod, "allocons": data_allocons, "label": label, "cons": data_cons, "surplus": data_surplus, } return cdc_jour def get_cdc_by_query_histo_cons( self, slot_type, date_start, date_end, prm_ids=None ): """ Fonction permettant de récupérer les données pour la construction des chart pour une ou des opérations données pour les consommateurs :param slot_type: type de slot pour la query ('month' ou 'hour' ou 'year') date_start: date début date_end: date de fin :return: un dictionnaire de données (labels et data pour les charts à afficher) """ label_histo = [] data_autocons_histo = [] data_allocons_histo = [] query = """ SELECT date_trunc(%s, A.date_slot) AS date_slot, (SUM( (CASE WHEN comp_data_type = 'autocons' THEN A.power ELSE 0 END) )/2) / 1000 as autocons, ((SUM( (CASE WHEN comp_data_type = 'cons' THEN A.power ELSE 0 END)) - SUM(CASE WHEN comp_data_type = 'autocons' THEN A.power ELSE 0 END) ) / 2) / 1000 as allocons FROM acc_enedis_cdc A JOIN acc_operation E ON E.id = A.acc_operation_id WHERE A.acc_operation_id IS NOT NULL AND A.acc_operation_id IN %s AND A.acc_counter_id IN %s AND A.date_slot >= %s AND A.date_slot <= %s GROUP BY date_trunc(%s, A.date_slot) ORDER BY date_slot ASC; """ query_params = ( slot_type, tuple(self.ids), tuple(prm_ids.ids), date_start, date_end, slot_type, ) self.env.cr.execute(query, query_params) raw_data = self.env.cr.fetchall() for row in raw_data: data_autocons_histo.append(round(row[1], 2)) data_allocons_histo.append(round(row[2], 2)) label_histo.append(row[0]) cdc_jour = { "autocons_histo": data_autocons_histo, "allocons_histo": data_allocons_histo, "label_histo": label_histo, } return cdc_jour def get_cdc_by_query_prod(self, slot_type, date_start, date_end, prm_ids=None): """ Fonction permettant de récupérer les données pour la construction des chart pour une ou des opérations données pour les consommateurs :param slot_type: type de slot pour la query ('month' ou 'hour' ou 'year') date_start: date début date_end: date de fin :return: un dictionnaire de données (labels et data pour les charts à afficher) """ label = [] data_autocons = [] data_surplus = [] label_histo = [] data_autocons_histo = [] data_surplus_histo = [] query = """ SELECT date_trunc(%s, A.date_slot) AS date_slot, ((SUM((CASE WHEN comp_data_type = 'prod' THEN A.power ELSE 0 END)) - SUM(CASE WHEN comp_data_type = 'surplus' THEN A.power ELSE 0 END)) / 2) / 1000 as autocons, (SUM( (CASE WHEN comp_data_type = 'surplus' THEN A.power ELSE 0 END) )/2) / 1000 as surplus FROM acc_enedis_cdc A JOIN acc_operation E ON E.id = A.acc_operation_id WHERE A.acc_operation_id IS NOT NULL AND A.acc_counter_id IN %s AND A.acc_operation_id IN %s AND A.date_slot >= %s AND A.date_slot <= %s GROUP BY date_trunc(%s, A.date_slot) ORDER BY date_slot ASC; """ query_params = ( slot_type, tuple(prm_ids.ids), tuple(self.ids), date_start, date_end, slot_type, ) self.env.cr.execute(query, query_params) raw_data = self.env.cr.fetchall() for row in raw_data: if slot_type == "day": data_autocons_histo.append(round(row[1], 2)) data_surplus_histo.append(round(row[2], 2)) label_histo.append(row[0]) data_autocons.append({"x": row[0], "y": round(row[1], 2)}) data_surplus.append({"x": row[0], "y": round(row[2], 2)}) label.append(row[0]) cdc_jour = { "autocons_prod": data_autocons, "label": label, "surplus": data_surplus, "autocons_prod_histo": data_autocons_histo, "label_histo": label_histo, "surplus_histo": data_surplus_histo, } return cdc_jour def get_cdc_by_query_histo_prod( self, slot_type, date_start, date_end, prm_ids=None ): """ Fonction permettant de récupérer les données pour la construction des chart pour une ou des opérations données pour les consommateurs :param slot_type: type de slot pour la query ('month' ou 'hour' ou 'year') date_start: date début date_end: date de fin :return: un dictionnaire de données (labels et data pour les charts à afficher) """ label_histo = [] data_autocons_prod_histo = [] data_surplus_histo = [] query = """ SELECT date_trunc(%s, A.date_slot) AS date_slot, ((SUM((CASE WHEN comp_data_type = 'prod' THEN A.power ELSE 0 END)) - SUM(CASE WHEN comp_data_type = 'surplus' THEN A.power ELSE 0 END)) / 2) / 1000 as autocons, (SUM( (CASE WHEN comp_data_type = 'surplus' THEN A.power ELSE 0 END) )/2) / 1000 as surplus FROM acc_enedis_cdc A JOIN acc_operation E ON E.id = A.acc_operation_id WHERE A.acc_operation_id IS NOT NULL AND A.acc_counter_id IN %s AND A.acc_operation_id IN %s AND A.date_slot >= %s AND A.date_slot <= %s GROUP BY date_trunc(%s, A.date_slot) ORDER BY date_slot ASC; """ query_params = ( slot_type, tuple(prm_ids.ids), tuple(self.ids), date_start, date_end, slot_type, ) self.env.cr.execute(query, query_params) raw_data = self.env.cr.fetchall() for row in raw_data: data_autocons_prod_histo.append(round(row[1], 2)) data_surplus_histo.append(round(row[2], 2)) label_histo.append(row[0]) cdc_jour = { "autocons_prod_histo": data_autocons_prod_histo, "label_histo": label_histo, "surplus_histo": data_surplus_histo, } return cdc_jour # ------------------------------------------------------ # Functions to manage route # ------------------------------------------------------ def graph_view_global( self, date_start=None, date_end=None, partner_id=None, prm_id=None, data_type=None, ): """ Fonction appelée pour l'affichage des courbes consommation sur le portail :param scale: type d'affichage des graphes (day/week/month/semestre/year) défini par le clic bouton @returns: dictionnaire pour la construction des graphes """ result_graph = {} date_start = datetime.strptime(date_start, "%d/%m/%Y") date_end = datetime.strptime(date_end, "%d/%m/%Y") date_end = date_utils.end_of(date_end, "day") scale, step_courbe, step_display_courbe = self.get_step_from_date( date_start=date_start, date_end=date_end, scale=False ) if data_type == "pmo": acc_counter_ids = ( self.env["acc.counter"] .sudo() .search([("acc_operation_id", "=", self.id)]) ) else: # Get PRM ids if prm_id: acc_counter_ids = self.env["acc.counter"].browse(prm_id) elif partner_id: acc_counter_ids = self.env["acc.counter"].search( [("partner_id", "=", partner_id)] ) else: acc_counter_ids = self.acc_delivery_ids + self.acc_injection_ids if scale == "week" or scale == "day": is_curve_line = True else: is_curve_line = False chart_data = {} if data_type == "cons" or data_type == "pmo": chart_data_cons = self.get_cdc_by_query_cons( step_courbe, date_start, date_end, acc_counter_ids ) if scale == "week": chart_data_histo = self.get_cdc_by_query_histo_cons( "day", date_start, date_end, acc_counter_ids ) chart_data_cons.update(chart_data_histo) chart_data.update(chart_data_cons) if data_type == "prod" or data_type == "pmo": chart_data_prod = self.get_cdc_by_query_prod( step_courbe, date_start, date_end, acc_counter_ids ) if scale == "week": chart_data_histo = self.get_cdc_by_query_histo_prod( "day", date_start, date_end, acc_counter_ids ) chart_data_prod.update(chart_data_histo) chart_data.update(chart_data_prod) result_graph["chart_data"] = chart_data last_record = self.get_the_last_day() date_deb, date_max = self.get_last_day("day", last_record) date_max = date_max.strftime("%d/%m/%Y") date_min = self.date_start_contract date_min = date_min.strftime("%d/%m/%Y") result_graph.update( { "date_start": date_start, "date_end": date_end, "date_min": date_min, "date_max": date_max, "scale": step_display_courbe, "is_curve_line": is_curve_line, } ) return result_graph