# Copyright 2021- Le Filament (https://le-filament.com)
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html)
import base64
import csv
import math
from collections import OrderedDict
from datetime import date, datetime
from io import StringIO

from odoo import _, api, fields, models
from odoo.exceptions import UserError, ValidationError

from odoo.addons.api_connector.tools.date_utils import utc_to_local
def float_to_str_auto(number):
    s = format(number, ".15f")
    s = s.rstrip("0").rstrip(".") if "." in s else s
    return s
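# Illustrative behaviour (assumed example values, not taken from real data):
#   float_to_str_auto(0.125) -> "0.125"
#   float_to_str_auto(5.0)   -> "5"
# The fixed ".15f" format keeps very small weights out of the scientific
# notation that str() would produce, before trailing zeros are stripped.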
def get_algo_description(algo):
    desc = {
        "prorata": "La clé de répartition est calculée automatiquement chaque mois,"
        " au prorata de la consommation de chacun des consommateurs.",
        "static": "La clé de répartition est calculée automatiquement chaque mois, "
        "en fonction des coefficients de répartition communiqués "
        "initialement à Enedis.",
        "dyn_perso_send": "",
        "dyn_perso_compute": "",
    }
    return desc.get(algo, f"Pas de description associée à {algo}")
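# For instance (hypothetical call), get_algo_description("static") returns the
# static-key explanation above; any unknown code falls back to the
# "Pas de description associée à ..." message.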
class AccOperation(models.Model):
    _inherit = ["acc.operation"]

    # ------------------------------------------------------
    # Fields declaration
    # ------------------------------------------------------
    keys_file_repartition_ids = fields.One2many(
        comodel_name="acc.repartition.keys.file",
        inverse_name="operation_id",
        string="Repartition keys files",
        required=False,
        groups="oacc.group_operation_admin",
    )
    repartition_keys_id = fields.One2many(
        comodel_name="acc.repartition.keys",
        inverse_name="operation_id",
        string="Repartition keys",
        required=False,
        groups="oacc.group_operation_admin",
    )
    acc_priority_group_ids = fields.One2many(
        comodel_name="acc.priority.group",
        inverse_name="acc_operation_id",
        string="Groupes de priorité",
    )
    # for display purposes only
    type_algo = fields.Selection(
        [
            ("prorata", "Dynamique par défaut"),
            ("static", "Statique"),
            ("dyn_perso_send", "Dynamique simple - envoi de fichier"),
            ("dyn_perso_compute", "Dynamique simple - calcul automatisé"),
        ],
        string="Type de clé de répartition",
        default="prorata",
        required=True,
    )
    algo_description = fields.Text(
        string="Description de l'algorithme",
        default="",
    )
    # ------------------------------------------------------
    # SQL Constraints
    # ------------------------------------------------------

    # ------------------------------------------------------
    # Default methods
    # ------------------------------------------------------
    def compute_repartition(self):
        """
        Compute task which computes the keys and generates the CSV file.
        """
        self.generate()

    def delete_keys(self):
        repartitions = self.env["acc.repartition.keys"].search(
            [("operation_id", "=", self.id)]
        )
        for repartition in repartitions:
            keys = self.env["acc.repartition.counter"].search(
                [("acc_repartition_id", "=", repartition.id)]
            )
            keys.unlink()
        for repartition in repartitions:
            repartition.unlink()

    def get_affect_sum(self, data_slot):
        affect_sum = 0.0
        for counter in data_slot.get("affect"):
            affect_sum += data_slot.get("affect").get(counter, 0)
        return affect_sum
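    # Shape of ``data_slot`` as inferred from ``generate()`` below (assumed,
    # illustrative values only):
    #   {"prod_initiale": 3.0, "prod_totale": 2.0,
    #    "affect": {<acc.counter record>: 1.0, ...}}
    # where "affect" maps each consumer counter to the energy allocated to it
    # and "prod_totale" is the production left over after the priority groups
    # have run.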
    def generate(self):
        """
        Generate the repartition keys.
        """
        if not self.acc_priority_group_ids:
            raise ValidationError(
                _("Aucune priorité n'est définie pour cette opération.")
            )
        self.delete_keys()
        repartition = self.env["acc.repartition.keys"].create(
            {"operation_id": self.id}
        )
        data = self.env["acc.enedis.raw.cdc"].get_repartition_data(operation_id=self)
        # keep the initial total production so that, at the end of processing,
        # the percentage assigned to each counter can be computed
        for slot in data:
            data[slot]["prod_initiale"] = data.get(slot).get("prod_totale")
        for priority in self.acc_priority_group_ids:
            data = priority.compute(data)
        if not data:
            raise ValidationError(
                _("Pas de données brutes pour le mois précédent")
            )
        # consistency check: the sum of the allocations must not
        # exceed the production available for allocation
        for slot in data:
            item = data.get(slot)
            affect = item.get("affect")
            if affect:
                total_prod = item.get("prod_initiale")
                remaining_prod = item.get("prod_totale")
                affect_sum = self.get_affect_sum(item)
                if abs(affect_sum + remaining_prod - total_prod) > 1e-3:
                    raise ValidationError(
                        _(
                            "Une erreur s'est produite lors de "
                            "l'affectation de l'auto-consommation"
                        )
                    )
        # store the keys in the database
        for slot in data:
            item = data.get(slot)
            affect = item.get("affect")
            if affect:
                total_prod = item.get("prod_initiale")
                # percentage assigned to each counter, relative to the
                # total production (truncated to six decimal places)
                weights = {}
                for counter_id in affect:
                    affecte_counter = affect.get(counter_id)
                    if total_prod == 0:
                        weight = 0.0
                    else:
                        weight = (
                            math.floor((affecte_counter * 100 / total_prod) * 1e6) / 1e6
                        )
                    weights[counter_id] = weight
                slot_line = []
                for counter in weights:
                    slot_line.append(
                        {
                            "acc_repartition_id": repartition.id,
                            "time_slot": slot,
                            "weight": weights[counter],
                            "acc_counter_id": counter.id,
                        }
                    )
                self.env["acc.repartition.counter"].create(slot_line)
    # ------------------------------------------------------
    # Computed fields / Search Fields
    # ------------------------------------------------------

    # ------------------------------------------------------
    # Onchange / Constraints
    # ------------------------------------------------------
    @api.onchange("type_algo")
    def on_change_algo(self):
        self.algo_description = get_algo_description(self.type_algo)

    # ------------------------------------------------------
    # CRUD methods (ORM overrides)
    # ------------------------------------------------------
    def write(self, vals):
        if vals.get("type_algo"):
            vals.update(
                {"algo_description": get_algo_description(vals.get("type_algo"))}
            )
        res = super().write(vals)
        return res

    # ------------------------------------------------------
    # Actions
    # ------------------------------------------------------
    def action_view_repartition_algo_priority_group(self):
        """
        Action to open the computation algorithms defined for this operation.
        """
        action = self.env["ir.actions.actions"]._for_xml_id(
            "oacc_repartition_keys.acc_priority_group_counter_act_window"
        )
        action["context"] = {
            "default_acc_operation_id": self.id,
        }
        action["domain"] = [("acc_operation_id", "=", self.id)]
        action["res_id"] = self.id
        return action
    def action_send_repartition_keys(self):
        """
        Send the computed repartition keys to Enedis.
        """
        repartition = self.env["acc.repartition.keys"].search(
            [("operation_id", "=", self.id)]
        )
        if not repartition:
            raise UserError(_("Clés de répartition non générées"))
        keys = (
            self.env["acc.repartition.counter"]
            .search([("acc_repartition_id", "=", repartition.id)])
            .mapped("time_slot")
        )
        horodatages = list(OrderedDict.fromkeys(keys))
        counter_period = self.get_counter_by_period_day(date_list=horodatages)
        for time_slot in horodatages:
            keys = self.env["acc.repartition.counter"].search(
                [
                    ("acc_repartition_id", "=", repartition.id),
                    ("time_slot", "=", time_slot),
                ]
            )
            body = []
            for key in keys:
                if key.acc_counter_id.name in counter_period.get(
                    utc_to_local(time_slot, "Europe/Paris").date()
                ):
                    body.append(
                        {
                            "id": key.acc_counter_id.name,
                            "key": float_to_str_auto(key.weight),
                        }
                    )
            if body:
                data = {"timestamp": time_slot.strftime("%Y%m%dT%H%M%SZ"), "body": body}
                job_description = (
                    f"{self.name} - Send repartition key at {data.get('timestamp')}"
                )
                try:
                    self.with_delay(description=job_description).send_repartition_key(
                        key=data
                    )
                except ValidationError as exc:
                    raise UserError(_(str(exc))) from exc
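    # Shape of the payload queued for ``send_repartition_key`` (illustrative
    # values only; the PRM id and key below are made up):
    #   {"timestamp": "20240701T000000Z",
    #    "body": [{"id": "12345678901234", "key": "33.333333"}, ...]}
    # ``send_repartition_key`` itself is assumed to be defined on acc.operation
    # in another module.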
    def get_repartition_data_for_csv(self):
        """
        Generate the data rows for the CSV export.
        """
        repartition = self.env["acc.repartition.keys"].search(
            [("operation_id", "=", self.id)]
        )
        keys = self.env["acc.repartition.counter"].search(
            [("acc_repartition_id", "=", repartition.id)]
        )
        if not keys:
            raise ValidationError(_("Clés de répartition non générées"))
        counters = keys.mapped("acc_counter_id")
        keys = keys.mapped("time_slot")
        horodatages = list(OrderedDict.fromkeys(keys))
        headers = ["Horodate"] + [c.name for c in counters]
        data = [headers]
        for time_slot in horodatages:
            line = [utc_to_local(time_slot, "Europe/Paris")]
            for counter in counters:
                w = (
                    self.env["acc.repartition.counter"]
                    .search(
                        [
                            ("acc_counter_id", "=", counter.id),
                            ("time_slot", "=", time_slot),
                        ]
                    )
                    .weight
                )
                line.append(str(w).replace(".", ","))
            data.append(line)
        return data
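    # Returned structure (hypothetical counter names and weights): a header row
    # followed by one row per time slot, with French decimal commas, e.g.
    #   [["Horodate", "PRM-A", "PRM-B"],
    #    [<local datetime>, "33,333333", "66,666666"], ...]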
    def export_repartition(self):
        """
        Generate a CSV file of the repartition keys.
        :return: file
        """
        csv_data = self.get_repartition_data_for_csv()
        try:
            start_date = csv_data[1][0]
            end_date = csv_data[-1][0]
            d = csv_data[2][0] - start_date
        except IndexError as e:
            raise ValidationError(_("Pas de données")) from e
        # time step between two slots, in minutes
        ts = int(d.total_seconds() / 60)
        if isinstance(start_date, datetime) and isinstance(end_date, datetime):
            filename = (
                f"{self.name}_{ts}_"
                f"{start_date.strftime('%d%m%Y')}_{end_date.strftime('%d%m%Y')}"
            )
        else:
            filename = f"cle_de_repartition_{self.name}"
        return self.create_csv(filename, csv_data)
    # ------------------------------------------------------
    # Common function
    # ------------------------------------------------------
    def create_csv(self, filename, lines_to_export):
        fp = StringIO()
        export_file = csv.writer(fp, delimiter=";", quoting=csv.QUOTE_NONE)
        for line in lines_to_export:
            # format date values before writing each line
            line_values = [
                value
                if not isinstance(value, date)
                else value.strftime("%d-%m-%Y %H:%M")
                for value in line
            ]
            export_file.writerow(line_values)
        fp.seek(0)
        data = fp.read()
        fp.close()
        filename = filename + ".csv"
        # store the generated keys file on the operation
        self.env["acc.repartition.keys.file"].create(
            {
                "csv_file": base64.b64encode(str.encode(data)),
                "filename": filename,
                "operation_id": self.id,
                "date_send": datetime.now(),
            }
        )
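    # Resulting CSV content (illustrative rows; PRM names and keys are made up):
    #   Horodate;PRM-A;PRM-B
    #   01-07-2024 00:00;33,333333;66,666666
    # Note that the file is only stored as an acc.repartition.keys.file record;
    # create_csv() does not return it.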
    def get_counter_by_period_day(self, date_list):
        """
        Return the delivery counter names active on each day of the period:
        {
            day1: [counter_name, ...],
            day2: [counter_name, ...],
        }
        """
        counter_period = {}
        res = [
            d.date()
            for d in list(
                OrderedDict.fromkeys(
                    [utc_to_local(h, "Europe/Paris") for h in date_list]
                )
            )
        ]
        for date_item in res:
            counter_period[date_item] = (
                self.env["acc.counter.period"]
                ._get_periods_from_date(
                    [
                        ("acc_operation_id", "=", self.id),
                        ("prm_type", "=", "delivery"),
                    ],
                    date_item,
                )
                .mapped("acc_counter_id.name")
            )
        return counter_period
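    # Example of the returned mapping (hypothetical dates and PRM names):
    #   {date(2024, 7, 1): ["PRM-A", "PRM-B"],
    #    date(2024, 7, 2): ["PRM-A"]}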