Skip to content
Extraits de code Groupes Projets

Cgscop Endpoint for carto subject

Tous les fils de conversation ont été résolus !
Fusionnées
Thibaud - Le Filamentrequested to merge
carto_fastapi into 14.0
Tous les fils de conversation ont été résolus !
5 files
+ 176
108
Comparer les modifications
  • Côte à côte
  • En ligne

Fichiers

+ 34
107
import hashlib
import json
from typing import Annotated
from datetime import datetime
import requests
from fastapi import APIRouter, Depends, HTTPException
from odoo import api, fields, models
from odoo import fields, models
from odoo.api import Environment
from odoo.addons.fastapi.dependencies import odoo_env
from ..schemas import PartnerInfo
from ..routers.cgscop_incub_api import cg_router
class FastapiEndpoint(models.Model):
_inherit = "fastapi.endpoint"
alter_event_route = fields.Char()
api_secret_key = fields.Char(readonly=True)
@api.model
def create(self, vals):
"""
Store the generated api key in the database for more confidentiality
"""
record = super().create(vals)
record.api_secret_key = self.generate_key()
return record
def generate_key(self) -> str:
"""
Generate an API key based on
The user generating the key
The date and time when the key is generated
All information are hashed, this hash will then be stored in the database and used as API key
"""
# TODO: Use something else than the user because it comes from data files the first time
datas = {"usr": "ozifzjhefuiozejhiozefg", "date": datetime.now()}
encoded_data = json.dumps(datas, sort_keys=True, default=str).encode()
h = hashlib.sha256()
h.update(encoded_data)
hashed = h.hexdigest()
return hashed
def refresh_api_key(self):
self.api_secret_key = self.generate_key()
app: str = fields.Selection(
selection_add=[("cgincub", "cgscop_alter_endpoint")],
@@ -25,102 +49,5 @@ class FastapiEndpoint(models.Model):
def _get_fastapi_routers(self):
if self.app == "cgincub":
return [cg_incub_router]
return [cg_router]
return super()._get_fastapi_routers()
# create a router
cg_incub_router = APIRouter()
@cg_incub_router.get("/partners", response_model=list[PartnerInfo])
def get_partners(
env: Annotated[Environment, Depends(odoo_env)], aggreg_aura: bool = True
) -> list[PartnerInfo]:
domain = [
("incub_status", "in", ["2_pre-incubation", "3_incubation", "4_sortie_incub"]),
("dissolution_date", "=", False),
]
all_partners = []
for partner in env["res.partner"].sudo().search(domain):
all_partners.append(
PartnerInfo(
email=partner.email,
title=partner.name,
description=partner.social_object,
theme=partner.incub_thematique_ids.mapped(lambda t: t.name),
start_date=fields.Datetime.to_string(partner.incub_incubation_deb),
department=partner.state_id.name,
region=partner.region.name,
city=partner.city,
website=partner.website,
logo=partner.image_1920,
tel=partner.phone,
latitude=partner.partner_latitude,
longitude=partner.partner_longitude,
)
)
if aggreg_aura:
try:
# TODO: Import URL from DB
fa = env["fastapi.endpoint"].sudo().search([("app", "=", "cgincub")])
alter_event_route = fa.alter_event_route
if not alter_event_route:
raise ConnectionError("Alter_event_route not found")
req = requests.get(alter_event_route)
if req.status_code != 200:
raise HTTPException(
status_code=req.status_code,
detail=f"Error from alter event route: {req.text}",
)
r_json = req.json()
count_before_merge = len(all_partners)
print(f"{count_before_merge} elements from CGscops")
for el in r_json:
all_partners.append(
PartnerInfo(
email=el["email"],
title=el["title"],
description=el["description"],
theme=[el["theme"]],
start_date=el["start_date"],
department=el["department"],
region=el["region"],
city=el["city"],
website=el["website"],
logo=el["logo"],
tel=el["tel"],
latitude=el["latitude"],
longitude=el["longitude"],
)
)
print(f"{len(all_partners)-count_before_merge} elements from Alterincub")
except ConnectionError as e:
print(f"Connexion issue: {e}")
return all_partners
@cg_incub_router.get("/partners/checksum", response_model=dict)
def get_partners_checksum(
env: Annotated[Environment, Depends(odoo_env)], aggreg_aura: bool = True
) -> dict:
data = get_partners(env, aggreg_aura)
encoded_data = json.dumps(data, sort_keys=True, default=str).encode()
h = hashlib.sha256()
h.update(encoded_data)
checksum_dict = {"checksum": h.hexdigest()}
return checksum_dict
# @cg_incub_router.get("/partners")
# def get_partners(env: Annotated[Environment, Depends(odoo_env)]):
# search_ = [p for p in env["res.partner"].sudo().search([])]
# return search_
Chargement en cours