Sélectionner une révision Git
Bifurcation depuis
Le Filament / Confédération Générale des SCOP / cgscop_partner
Le projet source a une visibilité limitée.
acc_operation.py 20,11 Kio
# Copyright 2021- Le Filament (https://le-filament.com)
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html)
import logging
from datetime import date, datetime
from dateutil.relativedelta import relativedelta
from odoo import _, fields, models
from odoo.exceptions import UserError, ValidationError
_logger = logging.getLogger(__name__)
class AccOperation(models.Model):
_name = "acc.operation"
_inherit = ["acc.operation", "api.enedis"]
# ------------------------------------------------------
# Fields declaration
# ------------------------------------------------------
client_id = fields.Char("Client ID Enedis")
secret_id = fields.Char("Secret ID Enedis")
# ------------------------------------------------------
# SQL Constraints
# ------------------------------------------------------
# ------------------------------------------------------
# Default methods
# ------------------------------------------------------
# ------------------------------------------------------
# Computed fields / Search Fields
# ------------------------------------------------------
# ------------------------------------------------------
# Onchange / Constraints
# ------------------------------------------------------
# ------------------------------------------------------
# CRUD methods (ORM overrides)
# ------------------------------------------------------
# ------------------------------------------------------
# Actions
# ------------------------------------------------------
def get_perimeter(self):
self.ensure_one()
message = self.perimeter()
# Logs information logs
log_id = self.env["acc.logs"].create(
{
"name": "Appel API Enedis Périmètre "
+ self.name
+ " du "
+ str(fields.Date.today()),
"date_launched": fields.Datetime.now(),
"type_log": "api",
"message": message,
"acc_operation_id": self.id,
}
)
view_id = self.env.ref("oacc.acc_logs_form").id
return {
"name": "LOGS",
"view_type": "form",
"view_mode": "form",
"views": [(view_id, "form")],
"res_model": "acc.logs",
"view_id": view_id,
"type": "ir.actions.act_window",
"res_id": log_id.id,
"target": "new",
"flags": {"initial_mode": "view"},
}
# ------------------------------------------------------
# API functions
# ------------------------------------------------------
def curves(
self, start_date, end_date, usage_point_cons_ids=None, usage_point_prod_ids=None
):
"""
Vérifie les périodes, crée un log et appelle les courbes de conso / prod
à partir de l'API Enedis (cet appel passe par un job queue)
:param date start_date : date de début à récupérer (incluse)
:param date end_date : date de fin à récupérer (exclue)
:param recordset usage_point_cons_ids : liste de PRMs de soutirage à récupérer
:param recordset usage_point_prod_ids : liste de PRMs d'injection à récupérer
Les PRMs sont optionnels,
si non renseignés, tous les PRMs liés à l'opération seront récupérés
"""
self._check_access_api()
# TODO: Update to get curves only for existing periods
# Si pas de PRM sélectionnés
if not usage_point_cons_ids and not usage_point_prod_ids:
usage_point_cons_ids = self.acc_delivery_period_ids.mapped("acc_counter_id")
usage_point_prod_ids = self.acc_injection_period_ids.mapped(
"acc_counter_id"
)
message = ""
message += (
"<h1>API Enedis OACC - Création des jobs de récupération des Courbes "
+ str(fields.Datetime.now())
+ "</h1>"
"Appels API pour la période "
"" + str(start_date) + " " + str(end_date) + "<br/>"
)
if usage_point_cons_ids:
# Traitement données de cons
message += (
"<br/><strong>Création des jobs de récupération des courbes "
"de consommation</strong><br/>"
)
for usage_point_id in usage_point_cons_ids:
# Vérification que le PRM est actif sur la période demandée
period_ids = self.env["acc.counter.period"]._get_periods_from_interval(
[
("acc_operation_id", "=", self.id),
("acc_counter_id", "=", usage_point_id.id),
("prm_type", "=", "delivery"),
],
start_date,
end_date,
)
# Si pas de période de PRM, on ne fait pas d'appel API
if not period_ids:
message += (
"Opération: "
+ self.name
+ " - PRM: "
+ usage_point_id.name
+ " - aucune période trouvée du "
+ str(start_date)
+ " au "
+ str(end_date)
+ "<br/>"
)
# Si période de PRM on vérifie les dates puis on appelle l'API
else:
start_date_upd = max(start_date, period_ids[0].start_date)
if period_ids[-1].end_date:
end_date_upd = min(end_date, period_ids[-1].end_date)
else:
end_date_upd = end_date
desc = (
"Opération: "
+ self.name
+ " - PRM: "
+ usage_point_id.name
+ " - Date: "
+ str(fields.Datetime.today())
+ " - Période: du "
+ str(start_date_upd)
+ " au "
+ str(end_date_upd)
)
message += desc + "<br/>"
self.with_delay(description=desc).get_definitive_load_curves(
start_date_upd, end_date_upd, usage_point_id, "cons"
)
if usage_point_prod_ids:
# Traitement données de prod
message += (
"<br/><strong>Création des jobs de récupération des courbes "
"de production</strong><br/>"
)
for usage_point_id in usage_point_prod_ids:
# Vérification que le PRM est actif sur la période demandée
period_ids = self.env["acc.counter.period"]._get_periods_from_interval(
[
("acc_operation_id", "=", self.id),
("acc_counter_id", "=", usage_point_id.id),
("prm_type", "=", "injection"),
],
start_date,
end_date,
)
# Si pas de période de PRM, on ne fait pas d'appel API
if not period_ids:
message += (
"Opération: "
+ self.name
+ " - PRM: "
+ usage_point_id.name
+ " - aucune période trouvée du "
+ str(start_date)
+ " au "
+ str(end_date)
+ "<br/>"
)
# Si période de PRM on vérifie les dates puis on appelle l'API
else:
start_date_upd = max(start_date, period_ids[0].start_date)
if period_ids[-1].end_date:
end_date_upd = min(end_date, period_ids[-1].end_date)
else:
end_date_upd = end_date
desc = (
"Opération: "
+ self.name
+ " - PRM: "
+ usage_point_id.name
+ " - Date: "
+ str(fields.Datetime.today())
+ " - Période: du "
+ str(start_date_upd)
+ " au "
+ str(end_date_upd)
)
message += desc + "<br/>"
self.with_delay(description=desc).get_definitive_load_curves(
start_date_upd,
end_date_upd,
usage_point_id,
"prod",
)
message += (
"<br/><h1>Fin de création des jobs de récupération des courbes: "
+ str(fields.Datetime.now())
+ "</h1>"
)
if not self.is_cdc_data_exists:
self.is_cdc_data_exists = True
# Logs information
self.env["acc.logs"].create(
{
"name": "Appel API Enedis Courbes du "
+ str(fields.Date.today())
+ " - Période "
+ str(start_date)
+ " "
+ str(end_date),
"date_launched": fields.Datetime.now(),
"type_log": "api",
"message": message,
"acc_operation_id": self.id,
}
)
def get_definitive_load_curves(
self, start_date, end_date, usage_point_id, type_prm
):
"""
Appelle les courbes de conso / prod à partir de l'API Enedis
:param date start_date : date de début à récupérer (incluse)
:param date end_date : date de fin à récupérer (exclue)
:param record usage_point_id : PRM à récupérer
:param char type_prm : type de PRM à récupérer ("cons" ou "prod")
"""
message = ""
message += "PRM " + usage_point_id.name + "\n"
message += "Appel API ...\n"
# Transformation des dates naives en datetime UTC
start_datetime = self._convert_time(start_date)
end_datetime = self._convert_time(end_date)
curves_data = self._get_definitive_load_curves(
self.name,
start_datetime,
end_datetime,
usage_point_id.name,
self.client_id,
self.secret_id,
type_prm,
)
message += "Appel API terminé. Traitement des données ...\n"
curves = curves_data.get("curves")
if curves:
name = usage_point_id.name + "_" + str(start_date) + "_" + str(end_date)
for curve in curves:
type_curve = curve["type"]
for point in curve["interval_reading"]:
date_slot = datetime.strptime(point["timestamp"], "%Y-%m-%dT%H:%M:%SZ")
self.env["acc.enedis.cdc"].create(
{
"name": name,
"acc_operation_id": self.id,
"acc_counter_id": usage_point_id.id or False,
"comp_data_type": type_curve,
"power": point["value"],
"date_slot": date_slot,
}
)
# Update partner_id for retrieved cdc
domain = [
("acc_operation_id", "=", self.id),
("acc_counter_id", "=", usage_point_id.id or False),
("prm_type", "=", "delivery" if type_prm == "cons" else "injection"),
]
self.env["acc.counter.period"]._get_periods_from_interval(
domain, start_date, end_date
)._update_cdc_partner_id()
message += "Fin du traitement des données\n"
_logger.info(message)
return message
def get_curves_all(self, usage_point_cons_ids=None, usage_point_prod_ids=None):
"""
Récupère les données de l'opération depuis le début de la mise en place
de l'opération. A partir de la date de début de contrat, calcul du nombre
de mois entre la date de début de contrat et la date du jour. Puis
récupération des données de tous les mois
"""
nb_months = (date.today().year - self.date_start_contract.year) * 12 + (
date.today().month - self.date_start_contract.month
)
start_date_it = self.date_start_contract
end_date_it = self.date_start_contract + relativedelta(months=1)
for _i in range(nb_months):
self.curves(
start_date_it, end_date_it, usage_point_cons_ids, usage_point_prod_ids
)
start_date_it = start_date_it + relativedelta(months=1)
end_date_it = start_date_it + relativedelta(months=1)
def perimeter(self):
"""
Récupère les données de l'opération concernant le périmètre:
- liste des PRM
- date de début opération
"""
# TODO : refactor, too complex
self._check_access_api()
message = ""
message += (
"<h1>Appel Enedis Périmètre "
+ self.name
+ " du "
+ str(fields.Datetime.now())
+ "</h1>"
)
message += "<p><strong>Appel API ...<br/>"
perimeter_data = self._get_perimeter(self.name, self.client_id, self.secret_id)
message += "<p><strong>Appel API terminé<br/>" "Traitement des données ...<br/>"
usage_points = perimeter_data.get("usage_points")
list_injection = []
list_soutirage = []
for usage_point in sorted(
usage_points, key=lambda p: date.fromisoformat(p["start"])
):
usage_point_start = date.fromisoformat(usage_point["start"])
usage_point_end = (
date.fromisoformat(usage_point["end"])
if usage_point["end"] != "9999-12-31"
else False
)
usage_point_prm_type = False
if usage_point["type"] == "CONS":
usage_point_prm_type = "delivery"
elif usage_point["type"] == "PROD":
usage_point_prm_type = "injection"
message += (
"<br/>PRM "
+ usage_point["type"]
+ " : "
+ usage_point["usage_point_id"]
+ " - Dates Enedis : "
+ usage_point["start"]
+ " - "
+ usage_point["end"]
+ "<br/>"
)
counter_id = self.env["acc.counter"].search(
[
("name", "=", usage_point["usage_point_id"]),
]
)
if counter_id and len(counter_id) == 1:
message += "PRM existe dans Odoo<br/>"
counter_period_ids = counter_id.period_ids.filtered(
lambda p: p.prm_type == usage_point_prm_type
)
if counter_period_ids.filtered(
lambda p: p.start_date == usage_point_start
and p.end_date == usage_point_end
):
message += (
"période existante avec les mêmes dates, pas de modif<br/>"
)
elif counter_period_ids.filtered(
lambda p: p.start_date == usage_point_start
):
counter_period_ids.filtered(
lambda p: p.start_date == usage_point_start
).end_date = usage_point_end
message += (
"période existante avec la même date de début, mais date de "
"fin différente, mise à jour date de fin<br/>"
)
else:
try:
self.env["acc.counter.period"].create(
{
"acc_counter_id": counter_id.id,
"prm_type": usage_point_prm_type,
"acc_operation_id": self.id,
"start_date": usage_point_start,
"end_date": usage_point_end,
}
)
message += (
"période inexistante avec les mêmes dates : création<br/>"
)
except ValidationError as e:
message += (
"<strong>erreur lors de la tentative de création d'une "
"nouvelle période, à vérifier manuellement :</strong><br/>"
+ str(e)
+ "<br/>"
)
elif len(counter_id) > 1:
message += "Plusieurs PRMs trouvés avec ce numéro - pas de modif<br/>"
else:
message += "PRM n'existe pas : Création ...<br/>"
if usage_point_prm_type == "injection":
# Si la date de l'opération n'est pas renseignée ou
# après la date de démarrage du point d'injection
# alors on force la date à celle du point d'injection
if (
not self.date_start_contract
or self.date_start_contract > usage_point_start
):
self.date_start_contract = usage_point_start
try:
counter_id = self.env["acc.counter"].create(
{
"name": usage_point["usage_point_id"],
}
)
self.env["acc.counter.period"].create(
{
"acc_counter_id": counter_id.id,
"prm_type": usage_point_prm_type,
"acc_operation_id": self.id,
"start_date": usage_point_start,
"end_date": usage_point_end,
}
)
message += "Fin de la création du PRM<br/>"
except ValidationError as e:
message += (
"<strong>erreur lors de la tentative de création du PRM et/ou "
"de la nouvelle période, à vérifier manuellement :<strong><br/>"
+ str(e)
+ "<br/>"
)
if usage_point_prm_type == "injection":
if usage_point["usage_point_id"] not in list_injection:
list_injection.append(usage_point["usage_point_id"])
if usage_point_prm_type == "delivery":
if usage_point["usage_point_id"] not in list_soutirage:
list_soutirage.append(usage_point["usage_point_id"])
message += "<p>LISTE TOTAL DE PRMs: </br>PRM Injection</br>"
i = 1
for inj in list_injection:
message += str(i) + " - " + inj + "<br/>"
i += 1
message += "Total: " + str(len(list_injection)) + "</br>"
message += "<br/>PRM Soutirage<br/>"
i = 1
for inj in list_soutirage:
message += str(i) + " - " + inj + "<br/>"
i += 1
message += "Total: " + str(len(list_soutirage)) + "</br>"
message += (
"<h1>Fin appel API Périmètre: " + str(fields.Datetime.now()) + "</h1>"
)
return message
# ------------------------------------------------------
# Business methods
# ------------------------------------------------------
def _check_access_api(self):
if not self.client_id and not self.secret_id:
raise UserError(
_(
"L'identifiant et la clé de l'opération pour l'utilisation de "
"l'API Enedis ne sont pas renseignées. "
"Veuillez les renseigner dans l'onglet API Enedis."
)
)