Sélectionner une révision Git
acc_operation.py 19,62 Kio
# Copyright 2023 Le Filament (<http://www.le-filament.com>)
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
from datetime import datetime
from dateutil.relativedelta import relativedelta
from odoo import _, fields, models
from odoo.exceptions import ValidationError
from odoo.tools import date_utils
DEFAULT_MONTH_RANGE = 3
class AccOperation(models.Model):
_inherit = "acc.operation"
# ------------------------------------------------------
# Actions
# ------------------------------------------------------
def action_view_courbes(self):
"""
Action qui ouvre le portail sur l'opération
:return: Vue Qweb
"""
self.ensure_one()
if self.id:
return {
"type": "ir.actions.act_url",
"url": "/operation/%s" % (self.id),
"target": "new",
}
# ------------------------------------------------------
# Business methods
# ------------------------------------------------------
def get_power_tot(self):
"""
Fonction retournant la consommation totale, l'autoconsommation totale
et la production totale du dernier mois.
:return: la somme de la conso totale de tous les consommateurs,
la somme de l autoconso totale de tous les consommateurs,
la sommme de la prod totale de tous les producteurs
la date de début de mois
"""
# Get last date slot recorded
last_record = self.get_last_cdc_record()
date_start, date_end = self.get_last_day("month", last_record)
query = """
SELECT
date_trunc('month', A.date_slot) AS date_slot,
(SUM( (CASE
WHEN comp_data_type = 'cons' THEN A.power
ELSE 0 END) )/2) / 1000 as conso_tot,
(SUM( (CASE
WHEN comp_data_type = 'prod'
THEN A.power ELSE 0 END) )/2) / 1000 as prod_tot,
(SUM( (CASE WHEN
comp_data_type = 'autocons' THEN A.power
ELSE 0 END) )/2) / 1000 as autoconso_tot
FROM
acc_enedis_cdc A
JOIN
acc_operation E ON E.id = A.acc_operation_id
WHERE
A.acc_operation_id IS NOT NULL
AND A.acc_operation_id = %s
AND A.date_slot >= %s
AND A.date_slot <= %s
GROUP BY date_trunc('month', A.date_slot);
"""
query_params = (
self.id,
date_start,
date_end,
)
self.env.cr.execute(query, query_params)
raw_data = self.env.cr.fetchall()
return raw_data[0][1], raw_data[0][2], raw_data[0][3], date_start
def get_last_cdc_record(self):
"""
@returns: last acc.enedis.cdc record in operations
"""
# Get last date slot recorded
last_record = (
self.env["acc.enedis.cdc"]
.sudo()
.search(
[
("acc_operation_id", "in", self.ids),
],
limit=1,
order="date_slot DESC",
)
)
if not last_record:
raise ValidationError(_("L'opération ne possède pas de données"))
return last_record
def get_last_day(self, scale, last_record):
"""
Fonction retournant une date de début et une date de fin.
Ces dates sont calculées en fonction de l'échelle choisie et du dernier
élément enregistré
- day: la date de début est égale à la dernière date
avec un enreisgtrement pour l'opération donnée
la date de fin ets la date de début + 1
- week: la date de début est égale à la dernière date
avec un enreisgtrement pour l'opération donnée moins 7 jours
la date de fin est égale à la dernière date
avec un enreisgtrement pour l'opération donnée
- month: la date de début est égale à la dernière date
avec un enreisgtrement pour l'opération donnée moins 1 mois
la date de fin est égale à la dernière date
avec un enreisgtrement pour l'opération donnée
- semestre: la date de début est égale à la dernière date
avec un enreisgtrement pour l'opération donnée moins 6 mois
la date de fin est égale à la dernière date
avec un enreisgtrement pour l'opération donnée
- year: la date de début est égale à la dernière date
avec un enreisgtrement pour l'opération donnée moins 1 an
la date de fin est égale à la dernière date
avec un enreisgtrement pour l'opération donnée
:param str scale: type d'affichage des graphes
(day/week/month/semestre/year)
:param object last_record: Dernier enregistrement dans la base
@returns: une date de début et une date de fin
"""
# Convert end datetime to timezone
last_day_start = date_utils.start_of(last_record.date_slot, "day")
last_day_end = date_utils.end_of(last_record.date_slot, "day")
end_month = date_utils.end_of(last_record.date_slot, "month")
if scale == "semestre":
date_end = end_month
date_start = end_month - relativedelta(months=6)
elif scale == "year":
date_end = end_month
date_start = end_month.replace(month=1, day=1)
elif scale == "week":
date_start = last_day_end - relativedelta(days=7)
date_end = last_day_end
elif scale == "day":
date_start = last_day_start
date_end = last_day_end
# month by default
else:
date_start = date_utils.start_of(end_month, "month")
date_end = end_month
return date_start, date_end
def get_step_from_date(self, date_start, date_end, scale=None):
"""
Fonction retournant le pas des courbes en fonction de 2 dates.
:return: step: hour/day/month/year
"""
# Calculate delta between 2 dates
delta = (date_end - date_start).days
if delta <= 1:
step_display_courbe = "hour"
step = "hour"
elif delta < 32:
step_display_courbe = "day"
step = "hour"
scale = "week"
elif delta > 365:
step = "year"
step_display_courbe = "year"
else:
step = "month"
step_display_courbe = "month"
return scale, step, step_display_courbe
def get_cdc_by_query_cons(self, slot_type, date_start, date_end, prm_ids=None):
"""
Fonction permettant de récupérer les données pour la
construction des chart pour une ou des opérations données
pour les consommateurs
:param slot_type: type de slot pour la query ('month' ou 'hour' ou 'year')
date_start: date début
date_end: date de fin
@returns: un dictionnaire de données
(labels et data pour les charts à afficher)
"""
label = []
data_autocons = []
data_allocons = []
data_cons = []
data_prod = []
data_surplus = []
query = """
SELECT
date_trunc(%s, A.date_slot) AS date_slot,
(SUM( (CASE WHEN
comp_data_type = 'autocons' THEN A.power
ELSE 0 END) )/2) / 1000 as autocons,
(SUM( (CASE
WHEN comp_data_type = 'prod'
THEN A.power ELSE 0 END) )/2) / 1000 as prod,
((SUM( (CASE WHEN comp_data_type = 'cons' THEN A.power ELSE 0 END)) -
SUM(CASE WHEN comp_data_type = 'autocons' THEN A.power ELSE 0 END) ) / 2) / 1000 as allocons,
(SUM( (CASE
WHEN comp_data_type = 'cons' THEN A.power
ELSE 0 END) )/2) / 1000 as cons
FROM
acc_enedis_cdc A
JOIN
acc_operation E ON E.id = A.acc_operation_id
WHERE
A.acc_operation_id IS NOT NULL
AND A.acc_operation_id IN %s
AND ( A.acc_counter_id IN %s OR A.comp_data_type = 'prod' )
AND A.date_slot >= %s
AND A.date_slot <= %s
GROUP BY date_trunc(%s, A.date_slot)
ORDER BY date_slot ASC;
"""
query_params = (
slot_type,
tuple(self.ids),
tuple(prm_ids.ids),
date_start,
date_end,
slot_type,
)
self.env.cr.execute(query, query_params)
raw_data = self.env.cr.fetchall()
for row in raw_data:
data_autocons.append({"x": row[0], "y": round(row[1], 2)})
data_prod.append({"x": row[0], "y": round(row[2], 2)})
data_allocons.append({"x": row[0], "y": round(row[3], 2)})
data_cons.append({"x": row[0], "y": round(row[4], 2)})
label.append(row[0])
cdc_jour = {
"autocons": data_autocons,
"prod": data_prod,
"allocons": data_allocons,
"label": label,
"cons": data_cons,
"surplus": data_surplus,
}
return cdc_jour
def get_cdc_by_query_histo_cons(
self, slot_type, date_start, date_end, prm_ids=None
):
"""
Fonction permettant de récupérer les données pour la construction
des chart pour une ou des opérations données pour les consommateurs
:param slot_type: type de slot pour la query ('month' ou 'hour' ou 'year')
date_start: date début
date_end: date de fin
:return: un dictionnaire de données
(labels et data pour les charts à afficher)
"""
label_histo = []
data_autocons_histo = []
data_allocons_histo = []
query = """
SELECT
date_trunc(%s, A.date_slot) AS date_slot,
(SUM( (CASE
WHEN comp_data_type = 'autocons'
THEN A.power ELSE 0 END) )/2) / 1000 as autocons,
((SUM( (CASE WHEN comp_data_type = 'cons' THEN A.power ELSE 0 END)) -
SUM(CASE WHEN comp_data_type = 'autocons' THEN A.power ELSE 0 END) ) / 2) / 1000 as allocons
FROM acc_enedis_cdc A
JOIN acc_operation E ON E.id = A.acc_operation_id
WHERE A.acc_operation_id IS NOT NULL
AND A.acc_operation_id IN %s
AND A.acc_counter_id IN %s
AND A.date_slot >= %s
AND A.date_slot <= %s
GROUP BY date_trunc(%s, A.date_slot)
ORDER BY date_slot ASC;
"""
query_params = (
slot_type,
tuple(self.ids),
tuple(prm_ids.ids),
date_start,
date_end,
slot_type,
)
self.env.cr.execute(query, query_params)
raw_data = self.env.cr.fetchall()
for row in raw_data:
data_autocons_histo.append(round(row[1], 2))
data_allocons_histo.append(round(row[2], 2))
label_histo.append(row[0])
cdc_jour = {
"autocons_histo": data_autocons_histo,
"allocons_histo": data_allocons_histo,
"label_histo": label_histo,
}
return cdc_jour
def get_cdc_by_query_prod(self, slot_type, date_start, date_end, prm_ids=None):
"""
Fonction permettant de récupérer les données pour la construction des chart
pour une ou des opérations données pour les consommateurs
:param slot_type: type de slot pour la query ('month' ou 'hour' ou 'year')
date_start: date début
date_end: date de fin
:return: un dictionnaire de données
(labels et data pour les charts à afficher)
"""
label = []
data_autocons = []
data_surplus = []
label_histo = []
data_autocons_histo = []
data_surplus_histo = []
query = """
SELECT
date_trunc(%s, A.date_slot) AS date_slot,
((SUM((CASE
WHEN comp_data_type = 'prod' THEN A.power
ELSE 0 END))
- SUM(CASE
WHEN comp_data_type = 'surplus' THEN A.power
ELSE 0 END)) / 2) / 1000 as autocons,
(SUM( (CASE
WHEN comp_data_type = 'surplus' THEN A.power
ELSE 0 END) )/2) / 1000 as surplus
FROM acc_enedis_cdc A
JOIN acc_operation E ON E.id = A.acc_operation_id
WHERE A.acc_operation_id IS NOT NULL
AND A.acc_counter_id IN %s
AND A.acc_operation_id IN %s
AND A.date_slot >= %s
AND A.date_slot <= %s
GROUP BY date_trunc(%s, A.date_slot)
ORDER BY date_slot ASC;
"""
query_params = (
slot_type,
tuple(prm_ids.ids),
tuple(self.ids),
date_start,
date_end,
slot_type,
)
self.env.cr.execute(query, query_params)
raw_data = self.env.cr.fetchall()
for row in raw_data:
if slot_type == "day":
data_autocons_histo.append(round(row[1], 2))
data_surplus_histo.append(round(row[2], 2))
label_histo.append(row[0])
data_autocons.append({"x": row[0], "y": round(row[1], 2)})
data_surplus.append({"x": row[0], "y": round(row[2], 2)})
label.append(row[0])
cdc_jour = {
"autocons_prod": data_autocons,
"label": label,
"surplus": data_surplus,
"autocons_prod_histo": data_autocons_histo,
"label_histo": label_histo,
"surplus_histo": data_surplus_histo,
}
return cdc_jour
def get_cdc_by_query_histo_prod(
self, slot_type, date_start, date_end, prm_ids=None
):
"""
Fonction permettant de récupérer les données pour la construction des chart
pour une ou des opérations données pour les consommateurs
:param slot_type: type de slot pour la query ('month' ou 'hour' ou 'year')
date_start: date début
date_end: date de fin
:return: un dictionnaire de données
(labels et data pour les charts à afficher)
"""
label_histo = []
data_autocons_prod_histo = []
data_surplus_histo = []
query = """
SELECT
date_trunc(%s, A.date_slot) AS date_slot,
((SUM((CASE
WHEN comp_data_type = 'prod' THEN A.power
ELSE 0 END))
- SUM(CASE
WHEN comp_data_type = 'surplus' THEN A.power
ELSE 0 END)) / 2) / 1000 as autocons,
(SUM( (CASE
WHEN comp_data_type = 'surplus' THEN A.power
ELSE 0 END) )/2) / 1000 as surplus
FROM acc_enedis_cdc A
JOIN acc_operation E ON E.id = A.acc_operation_id
WHERE A.acc_operation_id IS NOT NULL
AND A.acc_counter_id IN %s
AND A.acc_operation_id IN %s
AND A.date_slot >= %s
AND A.date_slot <= %s
GROUP BY date_trunc(%s, A.date_slot)
ORDER BY date_slot ASC;
"""
query_params = (
slot_type,
tuple(prm_ids.ids),
tuple(self.ids),
date_start,
date_end,
slot_type,
)
self.env.cr.execute(query, query_params)
raw_data = self.env.cr.fetchall()
for row in raw_data:
data_autocons_prod_histo.append(round(row[1], 2))
data_surplus_histo.append(round(row[2], 2))
label_histo.append(row[0])
cdc_jour = {
"autocons_prod_histo": data_autocons_prod_histo,
"label_histo": label_histo,
"surplus_histo": data_surplus_histo,
}
return cdc_jour
# ------------------------------------------------------
# Functions to manage route
# ------------------------------------------------------
def graph_view_global(
self,
date_start=None,
date_end=None,
partner_id=None,
prm_id=None,
data_type=None,
):
"""
Fonction appelée pour l'affichage des courbes consommation
sur le portail
:param scale: type d'affichage des graphes
(day/week/month/semestre/year)
défini par le clic bouton
@returns: dictionnaire pour la construction des graphes
"""
result_graph = {}
date_start = datetime.strptime(date_start, "%d/%m/%Y")
date_end = datetime.strptime(date_end, "%d/%m/%Y")
date_end = date_utils.end_of(date_end, "day")
scale, step_courbe, step_display_courbe = self.get_step_from_date(
date_start=date_start, date_end=date_end, scale=False
)
if data_type == "pmo":
acc_counter_ids = (
self.env["acc.counter"]
.sudo()
.search([("acc_operation_id", "=", self.id)])
)
else:
# Get PRM ids
if prm_id:
acc_counter_ids = self.env["acc.counter"].browse(prm_id)
elif partner_id:
acc_counter_ids = self.env["acc.counter"].search(
[("partner_id", "=", partner_id)]
)
else:
acc_counter_ids = self.acc_delivery_ids + self.acc_injection_ids
if scale == "week" or scale == "day":
is_curve_line = True
else:
is_curve_line = False
chart_data = {}
if data_type == "cons" or data_type == "pmo":
chart_data_cons = self.get_cdc_by_query_cons(
step_courbe, date_start, date_end, acc_counter_ids
)
if scale == "week":
chart_data_histo = self.get_cdc_by_query_histo_cons(
"day", date_start, date_end, acc_counter_ids
)
chart_data_cons.update(chart_data_histo)
chart_data.update(chart_data_cons)
if data_type == "prod" or data_type == "pmo":
chart_data_prod = self.get_cdc_by_query_prod(
step_courbe, date_start, date_end, acc_counter_ids
)
if scale == "week":
chart_data_histo = self.get_cdc_by_query_histo_prod(
"day", date_start, date_end, acc_counter_ids
)
chart_data_prod.update(chart_data_histo)
chart_data.update(chart_data_prod)
result_graph["chart_data"] = chart_data
last_record = self.get_last_cdc_record()
date_deb, date_max = self.get_last_day("day", last_record)
date_max = date_max.strftime("%d/%m/%Y")
date_min = self.date_start_contract
date_min = date_min.strftime("%d/%m/%Y")
result_graph.update(
{
"date_start": date_start,
"date_end": date_end,
"date_min": date_min,
"date_max": date_max,
"scale": step_display_courbe,
"is_curve_line": is_curve_line,
}
)
return result_graph