# Copyright 2023 Le Filament (<http://www.le-filament.com>) # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). from datetime import datetime from dateutil.relativedelta import relativedelta from odoo import _, api, fields, models from odoo.exceptions import ValidationError from odoo.tools import date_utils DEFAULT_MONTH_RANGE = 3 class AccOperation(models.Model): _inherit = "acc.operation" def prepare_values(self): last_record = self.get_the_last_day() date_day_start, date_day_end = self.get_last_day("day", last_record) last_day = date_day_start.strftime("%d %B %Y") date_day_start = date_day_start.strftime("%Y-%m-%d") date_day_end = date_day_end.strftime("%Y-%m-%d") date_week_start, date_week_end = self.get_last_day("week", last_record) last_week = ( date_week_start.strftime("%d %B %Y") + "-" + date_week_end.strftime("%d %B %Y") ) date_week_start = date_week_start.strftime("%Y-%m-%d") date_week_end = date_week_end.strftime("%Y-%m-%d") date_month_start, date_month_end = self.get_last_day("month", last_record) last_month = ( date_month_start.strftime("%d %B %Y") + "-" + date_month_end.strftime("%d %B %Y") ) date_month_start = date_month_start.strftime("%Y-%m-%d") date_month_end = date_month_end.strftime("%Y-%m-%d") date_semestre_start, date_semestre_end = self.get_last_day("semestre", last_record) last_semester = ( date_semestre_start.strftime("%d %B %Y") + "-" + date_semestre_end.strftime("%d %B %Y") ) date_semestre_start = date_semestre_start.strftime("%Y-%m-%d") date_semestre_end = date_semestre_end.strftime("%Y-%m-%d") date_year_start, date_year_end = self.get_last_day("year", last_record) last_year = ( date_year_start.strftime("%d %B %Y") + "- " + date_year_end.strftime("%d %B %Y") ) date_year_start = date_year_start.strftime("%Y-%m-%d") date_year_end = date_year_end.strftime("%Y-%m-%d") data_values = { "last_day": last_day, "last_week": last_week, "last_month": last_month, "last_semestre": last_semester, "last_year": last_year, # 'range_date': range_date, "date_day_start": date_day_start, "date_day_end": date_day_end, "date_week_start": date_week_start, "date_week_end": date_week_end, "date_month_start": date_month_start, "date_month_end": date_month_end, "date_semestre_start": date_semestre_start, "date_semestre_end": date_semestre_end, "date_year_start": date_year_start, "date_year_end": date_year_end, } return data_values def get_overview(self): return self.prepare_values() # ------------------------------------------------------ # Actions # ------------------------------------------------------ def action_view_courbes(self): """ Action qui ouvre la vue Qweb :return: Vue Qweb """ self.ensure_one() if self.id: return { 'type': 'ir.actions.act_url', 'url': '/operation/%s' % (self.id), 'target': 'new', } # ------------------------------------------------------ # Business methods # ------------------------------------------------------ def get_auto_power_tot(self): """ Fonction retournant l autoconsommation totale du dernier mois. :return: la somme de l'autoconso totale de tous les consommateurs """ # Récupère les dates du dernier mois disponible last_record = self.get_the_last_day() date_start, date_end = self.get_last_day("month", last_record) # Conso Totale autoconso_ids = self.env["acc.enedis.cdc"].search( [ ("acc_operation_id", "=", self.id), ("comp_data_type", "=", "autocons"), ("date_slot", ">=", date_start), ("date_slot", "<", date_end), ] ) autoconso_tot = (sum(autoconso_ids.mapped("power")) / 1000) / 2 return autoconso_tot def get_power_tot(self): """ Fonction retournant la consommation totale, l'autoconsommation totale et la production totale du dernier mois. :return: la somme de la conso totale de tous les consommateurs, la somme de l autoconso totale de tous les consommateurs, la sommme de la prod totale de tous les producteurs la date de début de mois """ # Get last date slot recorded last_record = self.get_the_last_day() date_start, date_end = self.get_last_day("month", last_record) query = """ SELECT date_trunc('month', A.date_slot) AS date_slot, (SUM( (CASE WHEN comp_data_type = 'cons' THEN A.power ELSE 0 END) )/2) / 1000 as conso_tot, (SUM( (CASE WHEN comp_data_type = 'prod' THEN A.power ELSE 0 END) )/2) / 1000 as prod_tot, (SUM( (CASE WHEN comp_data_type = 'autocons' THEN A.power ELSE 0 END) )/2) / 1000 as autoconso_tot FROM acc_enedis_cdc A JOIN acc_operation E ON E.id = A.acc_operation_id WHERE A.acc_operation_id IS NOT NULL AND A.acc_operation_id = %s AND A.date_slot >= %s AND A.date_slot < %s GROUP BY date_trunc('month', A.date_slot); """ query_params = ( self.id, date_start, date_end, ) self.env.cr.execute(query, query_params) raw_data = self.env.cr.fetchall() return raw_data[0][1], raw_data[0][2], raw_data[0][3], date_start def get_power_install_tot(self): """ Fonction retournant la puissance totale installée. :return: la somme de toutes les puissances d'injection """ puiss_tot = 0.0 for counter in self.acc_injection_ids: puiss_tot += counter.acc_counter_id.power_instal return puiss_tot def get_the_last_day(self): """ Fonction retournant la date du dernier enregistrement """ # Get last date slot recorded last_record = self.env["acc.enedis.cdc"].sudo().search( [ ("acc_operation_id", "in", self.ids), ], limit=1, order="date_slot DESC", ) if not last_record: raise ValidationError(_("L'opération ne possède pas de données")) return last_record def get_last_day(self, scale, last_record): """ Fonction retournant une date de début et une date de fin. Ces dates sont calculées en fonction de l'échelle choisie et du dernier élément enregistré - day: la date de début est égale à la dernière date avec un enreisgtrement pour l'opération donnée la date de fin ets la date de début + 1 - week: la date de début est égale à la dernière date avec un enreisgtrement pour l'opération donnée moins 7 jours la date de fin est égale à la dernière date avec un enreisgtrement pour l'opération donnée - month: la date de début est égale à la dernière date avec un enreisgtrement pour l'opération donnée moins 1 mois la date de fin est égale à la dernière date avec un enreisgtrement pour l'opération donnée - semestre: la date de début est égale à la dernière date avec un enreisgtrement pour l'opération donnée moins 6 mois la date de fin est égale à la dernière date avec un enreisgtrement pour l'opération donnée - year: la date de début est égale à la dernière date avec un enreisgtrement pour l'opération donnée moins 1 an la date de fin est égale à la dernière date avec un enreisgtrement pour l'opération donnée :param scale: type d'affichage des graphes (day/week/month/semestre/year) last_record: Dernier enregistrement dans la base :return: une date de début et une date de fin """ # Convert end datetime to timezone last_day_start = last_record.date_slot.replace(hour=0, minute=0, second=0) last_day_end = last_day_start + relativedelta(days=1) # Get end time slot for previous month with timezone # Manage if date is the last day of the month if fields.Date.to_date(last_day_start) == fields.Date.to_date( date_utils.end_of(last_record.date_slot, "month") ): end_month = last_day_end.replace(day=1) else: end_month = last_day_start.replace(day=1) if scale == "semestre": date_end = end_month date_start = end_month - relativedelta(months=6) elif scale == "year": date_end = end_month date_start = end_month.replace(month=1, day=1) elif scale == "week": date_start = last_day_end - relativedelta(days=7) date_end = last_day_end elif scale == "day": date_start = last_day_start date_end = last_day_end # month by default else: date_start = end_month - relativedelta(months=1) date_end = end_month return date_start, date_end def get_step_from_date(self, date_start, date_end, scale=None): """ Fonction retournant le pas des courbes en fonction de 2 dates. :return: step: hour/day/month/year """ # Calculate delta between 2 dates delta = (date_end - date_start).days if delta <= 1: step_display_courbe = "hour" step = "hour" elif delta < 32: step_display_courbe = "day" step = "hour" scale = "week" elif delta > 365: step = "year" step_display_courbe = "year" else: step = "month" step_display_courbe = "month" return scale, step, step_display_courbe def get_cdc_by_query_cons(self, slot_type, date_start, date_end, prm_ids=None): """ Fonction permettant de récupérer les données pour la construction des chart pour une ou des opérations données pour les consommateurs :param slot_type: type de slot pour la query ('month' ou 'hour' ou 'year') date_start: date début date_end: date de fin @returns: un dictionnaire de données (labels et data pour les charts à afficher) """ label = [] data_autocons = [] data_allocons = [] data_cons = [] data_prod = [] data_surplus = [] query = """ SELECT date_trunc(%s, A.date_slot) AS date_slot, (SUM( (CASE WHEN comp_data_type = 'autocons' THEN A.power ELSE 0 END) )/2) / 1000 as autocons, (SUM( (CASE WHEN comp_data_type = 'prod' THEN A.power ELSE 0 END) )/2) / 1000 as prod, ((SUM( (CASE WHEN comp_data_type = 'cons' THEN A.power ELSE 0 END)) - SUM(CASE WHEN comp_data_type = 'autocons' THEN A.power ELSE 0 END) ) / 2) / 1000 as allocons, (SUM( (CASE WHEN comp_data_type = 'cons' THEN A.power ELSE 0 END) )/2) / 1000 as cons FROM acc_enedis_cdc A JOIN acc_operation E ON E.id = A.acc_operation_id WHERE A.acc_operation_id IS NOT NULL AND A.acc_operation_id IN %s AND ( A.acc_counter_id IN %s OR A.comp_data_type = 'prod' ) AND A.date_slot >= %s AND A.date_slot < %s GROUP BY date_trunc(%s, A.date_slot) ORDER BY date_slot ASC; """ query_params = ( slot_type, tuple(self.ids), tuple(prm_ids.ids), date_start, date_end, slot_type, ) self.env.cr.execute(query, query_params) raw_data = self.env.cr.fetchall() for row in raw_data: # if slot_type == "month" or slot_type == "year": # data_autocons.append(int(row[1])) # data_prod.append(int(row[2])) # data_allocons.append(int(row[3])) # data_cons.append(int(row[4])) # label.append(row[0]) # else: data_autocons.append({"x": row[0], "y": round(row[1], 2)}) data_prod.append({"x": row[0], "y": round(row[2], 2)}) data_allocons.append({"x": row[0], "y": round(row[3], 2)}) data_cons.append({"x": row[0], "y": round(row[4], 2)}) label.append(row[0]) cdc_jour = { "autocons": data_autocons, "prod": data_prod, "allocons": data_allocons, "label": label, "cons": data_cons, "surplus": data_surplus, } return cdc_jour def get_cdc_by_query_histo_cons( self, slot_type, date_start, date_end, prm_ids=None ): """ Fonction permettant de récupérer les données pour la construction des chart pour une ou des opérations données pour les consommateurs :param slot_type: type de slot pour la query ('month' ou 'hour' ou 'year') date_start: date début date_end: date de fin :return: un dictionnaire de données (labels et data pour les charts à afficher) """ label_histo = [] data_autocons_histo = [] data_allocons_histo = [] query = """ SELECT date_trunc(%s, A.date_slot) AS date_slot, (SUM( (CASE WHEN comp_data_type = 'autocons' THEN A.power ELSE 0 END) )/2) / 1000 as autocons, ((SUM( (CASE WHEN comp_data_type = 'cons' THEN A.power ELSE 0 END)) - SUM(CASE WHEN comp_data_type = 'autocons' THEN A.power ELSE 0 END) ) / 2) / 1000 as allocons FROM acc_enedis_cdc A JOIN acc_operation E ON E.id = A.acc_operation_id WHERE A.acc_operation_id IS NOT NULL AND A.acc_operation_id IN %s AND A.acc_counter_id IN %s AND A.date_slot >= %s AND A.date_slot < %s GROUP BY date_trunc(%s, A.date_slot) ORDER BY date_slot ASC; """ query_params = ( slot_type, tuple(self.ids), tuple(prm_ids.ids), date_start, date_end, slot_type, ) self.env.cr.execute(query, query_params) raw_data = self.env.cr.fetchall() for row in raw_data: data_autocons_histo.append(round(row[1], 2)) data_allocons_histo.append(round(row[2], 2)) label_histo.append(row[0]) cdc_jour = { "autocons_histo": data_autocons_histo, "allocons_histo": data_allocons_histo, "label_histo": label_histo, } return cdc_jour def get_cdc_by_query_prod(self, slot_type, date_start, date_end, prm_ids=None): """ Fonction permettant de récupérer les données pour la construction des chart pour une ou des opérations données pour les consommateurs :param slot_type: type de slot pour la query ('month' ou 'hour' ou 'year') date_start: date début date_end: date de fin :return: un dictionnaire de données (labels et data pour les charts à afficher) """ label = [] data_autocons = [] data_surplus = [] label_histo = [] data_autocons_histo = [] data_surplus_histo = [] query = """ SELECT date_trunc(%s, A.date_slot) AS date_slot, ((SUM((CASE WHEN comp_data_type = 'prod' THEN A.power ELSE 0 END)) - SUM(CASE WHEN comp_data_type = 'surplus' THEN A.power ELSE 0 END)) / 2) / 1000 as autocons, (SUM( (CASE WHEN comp_data_type = 'surplus' THEN A.power ELSE 0 END) )/2) / 1000 as surplus FROM acc_enedis_cdc A JOIN acc_operation E ON E.id = A.acc_operation_id WHERE A.acc_operation_id IS NOT NULL AND A.acc_counter_id IN %s AND A.acc_operation_id IN %s AND A.date_slot >= %s AND A.date_slot < %s GROUP BY date_trunc(%s, A.date_slot) ORDER BY date_slot ASC; """ query_params = ( slot_type, tuple(prm_ids.ids), tuple(self.ids), date_start, date_end, slot_type, ) self.env.cr.execute(query, query_params) raw_data = self.env.cr.fetchall() for row in raw_data: # if slot_type == "month" or slot_type == "year": # data_autocons.append(int(row[1])) # data_surplus.append(int(row[2])) # label.append(row[0]) if slot_type == "day": data_autocons_histo.append(round(row[1], 2)) data_surplus_histo.append(round(row[2], 2)) label_histo.append(row[0]) # else: data_autocons.append({"x": row[0], "y": round(row[1], 2)}) data_surplus.append({"x": row[0], "y": round(row[2], 2)}) label.append(row[0]) cdc_jour = { "autocons_prod": data_autocons, "label": label, "surplus": data_surplus, "autocons_prod_histo": data_autocons_histo, "label_histo": label_histo, "surplus_histo": data_surplus_histo, } return cdc_jour def get_cdc_by_query_histo_prod( self, slot_type, date_start, date_end, prm_ids=None ): """ Fonction permettant de récupérer les données pour la construction des chart pour une ou des opérations données pour les consommateurs :param slot_type: type de slot pour la query ('month' ou 'hour' ou 'year') date_start: date début date_end: date de fin :return: un dictionnaire de données (labels et data pour les charts à afficher) """ label_histo = [] data_autocons_prod_histo = [] data_surplus_histo = [] query = """ SELECT date_trunc(%s, A.date_slot) AS date_slot, ((SUM((CASE WHEN comp_data_type = 'prod' THEN A.power ELSE 0 END)) - SUM(CASE WHEN comp_data_type = 'surplus' THEN A.power ELSE 0 END)) / 2) / 1000 as autocons, (SUM( (CASE WHEN comp_data_type = 'surplus' THEN A.power ELSE 0 END) )/2) / 1000 as surplus FROM acc_enedis_cdc A JOIN acc_operation E ON E.id = A.acc_operation_id WHERE A.acc_operation_id IS NOT NULL AND A.acc_counter_id IN %s AND A.acc_operation_id IN %s AND A.date_slot >= %s AND A.date_slot < %s GROUP BY date_trunc(%s, A.date_slot) ORDER BY date_slot ASC; """ query_params = ( slot_type, tuple(prm_ids.ids), tuple(self.ids), date_start, date_end, slot_type, ) self.env.cr.execute(query, query_params) raw_data = self.env.cr.fetchall() for row in raw_data: data_autocons_prod_histo.append(round(row[1], 2)) data_surplus_histo.append(round(row[2], 2)) label_histo.append(row[0]) cdc_jour = { "autocons_prod_histo": data_autocons_prod_histo, "label_histo": label_histo, "surplus_histo": data_surplus_histo, } return cdc_jour # ------------------------------------------------------ # Functions to manage route # ------------------------------------------------------ def graph_view_global( self, date_start=None, date_end=None, partner_id=None, prm_id=None, data_type=None, ): """ Fonction appelée pour l'affichage des courbes consommation sur le portail :param scale: type d'affichage des graphes (day/week/month/semestre/year) défini par le clic bouton :return: dictionnaire pour la construction des graphes """ result_graph = {} date_end = date_utils.end_of(date_end, "day") date_start = datetime.strptime(date_start, "%d/%m/%Y") date_end = datetime.strptime(date_end, "%d/%m/%Y") scale, step_courbe, step_display_courbe = self.get_step_from_date( date_start=date_start, date_end=date_end, scale=False ) if data_type == "pmo": acc_counter_ids = ( self.env["acc.counter"] .sudo() .search([("acc_operation_id", "=", self.id)]) ) else: # Get PRM ids if prm_id: acc_counter_ids = self.env["acc.counter"].browse(prm_id) elif partner_id: acc_counter_ids = self.env["acc.counter"].search( [("partner_id", "=", partner_id)] ) else: acc_counter_ids = self.acc_delivery_ids + self.acc_injection_ids if scale == "week" or scale == "day": is_curve_line = True else: is_curve_line = False chart_data = {} if data_type == "cons" or data_type == "pmo": chart_data_cons = self.get_cdc_by_query_cons( step_courbe, date_start, date_end, acc_counter_ids ) if scale == "week": chart_data_histo = self.get_cdc_by_query_histo_cons( "day", date_start, date_end, acc_counter_ids ) chart_data_cons.update(chart_data_histo) chart_data.update(chart_data_cons) if data_type == "prod" or data_type == "pmo": chart_data_prod = self.get_cdc_by_query_prod( step_courbe, date_start, date_end, acc_counter_ids ) if scale == "week": chart_data_histo = self.get_cdc_by_query_histo_prod( "day", date_start, date_end, acc_counter_ids ) chart_data_prod.update(chart_data_histo) chart_data.update(chart_data_prod) result_graph["chart_data"] = chart_data last_record = self.get_the_last_day() date_deb, date_max = self.get_last_day("day", last_record) date_max = date_max.strftime("%d/%m/%Y") date_min = self.date_start_contract date_min = date_min.strftime("%d/%m/%Y") result_graph.update( { "date_start": date_start, "date_end": date_end, "date_min": date_min, "date_max": date_max, "scale": step_display_courbe, "is_curve_line": is_curve_line, } ) return result_graph