#!/bin/python3 # © 2024 Le Filament (<https://le-filament.com>)# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). import os import pathlib import psycopg import shutil import sys from psycopg import sql DEFAULT_DB_HOST = '127.0.0.1' DEFAULT_DB_PORT = '5432' DEFAULT_DB_DBNAME = 'odoo' DEFAULT_DB_USER = 'odoo' DEFAULT_DB_PASSWORD = 'odoo' MODELS = { 'account.invoice': { 'model': 'account.invoice', 'table': 'account_invoice', 'column': 'number' }, 'hr.expense': { 'model': 'hr.expense', 'table': 'hr_expense', 'column': 'date' }, 'sale.order': { 'model': 'sale.order', 'table': 'sale_order', 'column': 'name' }, } def init_store(fn): def wrapper(*args, **kwargs): self = args[0] args_names = fn.__code__.co_varnames[1:fn.__code__.co_argcount + 1] args_dict = dict(zip(args_names, args[1:])) args_dict |= {k: kwargs[k] for k in args_names if k in kwargs} for arg, value in args_dict.items(): self.__dict__[arg] = value fn(*args, **kwargs) return wrapper class Config: def __init__(self, argv, prog_name): self.dry_run = False self.extract_file = True self.filestore = '' self.filter = None self.output_dir = '' self.quiet = False self.remove_db_entry = False self.remove_file = False self.replace_slash = False self.verbose = False self.db_config = DatabaseConfig( host=DEFAULT_DB_HOST, port=DEFAULT_DB_PORT, dbname=DEFAULT_DB_DBNAME, user=DEFAULT_DB_USER, password=DEFAULT_DB_PASSWORD ) self.models = [] match len(argv): case x if x < 1: raise Exception("bad arguments number") argv = iter(argv) while (arg := next(argv, None)) is not None: match arg: case '--db-dbname': self.db_config.dbname = next(argv, None) case '--db-host': self.db_config.host = next(argv, None) case '--db-port': self.db_config.port = next(argv, None) case '--db-password': self.db_config.password = next(argv, None) case '--db-user': self.db_config.user = next(argv, None) case '-d' | '--dry-run': self.dry_run = True case '-n' | '--no-extract': self.extract_file = False case '-f' | '--filter': self.filter = next(argv, None) case '-h' | '--help': Config.print_help(prog_name) case '-i' | '--filestore': self.filestore = next(argv, None) case '-o' | '--output-dir': self.output_dir = next(argv, None) case '-q' | '--quiet': self.quiet = True case '-r' | '--replace-slash': self.replace_slash = True case '--rm-db-entry': self.remove_db_entry = True case '--rm-file': self.remove_file = True case '-v' | '--verbose': self.verbose = True case model_str: model = model_str.split(':') match len(model): case 1: if model[0] in MODELS: self.models.append(Model(**MODELS[model[0]])) else: raise Exception(f"model '{model_str}' not in default models. Try with '<model>:<table>:<column>' syntax.") case 3: self.models.append(Model( model=model[0], table=model[1], column=model[2] )) case _: raise Exception(f"bad syntax for model '{model_str}'. Syntax accepted: '<model>' or '<model>:<table>:<column>'.") if not self.models: raise Exception("model is needed.") if not self.dry_run: if not self.filestore: raise Exception("filestore path ('-f' or '--filestore') is needed.") if not self.output_dir: raise Exception("output directory ('-o' or '--output-dir') is needed.") self.filestore = pathlib.Path(self.filestore) self.output_dir = pathlib.Path(self.output_dir) def check(self): if self.extract_file or self.remove_file: if not self.filestore.is_dir(): raise Exception(f"filestore '{self.filestore}' is not a directory.") if not self.dry_run and self.extract_file: if not self.output_dir.is_dir(): raise Exception(f"output '{self.output_dir}' is not a directory.") def print_help(prog_name): print(f"usage: {prog_name} [OPTIONS] <MODEL|model:table:column>\n\n" "predifined models:\n" "MODEL: <account.invoice|hr.expense|sale.order>\n\n" "for other models:\n" "<model_name>:<database_table>:<column_that_contain_object_name>\n\n" "OPTIONS:\n" "--db-dbname <dbname> Odoo database name (default: 'odoo')\n" "--db-port <port> Odoo database port (default: '5432')\n" "--db-host <host> Odoo database port (default: '127.0.0.1')\n" "--db-password <password> Odoo database user password (default: 'odoo')\n" "--db-user <user> Odoo database user (default: 'odoo')\n" "-d, --dry-run run without actually doing action\n" "-n, --no-extract no extract file from filestore\n" "-f, --filter <SQL tests> add extra conditions to SQL command (e.g.: \"rm.type='in_invoice'\")\n" "-h, --help print this help\n" "-i, --filestore <path> path to filestore directory\n" "-o, --output-dir <path> path to output directory\n" "-q, --quiet less verbose\n" "-r, --replace-slash replace '/' with '_' in filename\n" "--rm-db-entry delete found database entries\n" "--rm-file delete found file from filestore\n" "-v, --verbose more verbose\n") sys.exit(0) class Model: @init_store def __init__(self, model, table, column): pass class Attachment: @init_store def __init__(self, id, model, name, filename, mime_type, store_fname, export_path=None): pass def export_path(self, replace_slash=False): if replace_slash: name = self.name.replace('/', '_') filename = self.filename.replace('/', '_') else: name = self.name filename = self.filename return pathlib.Path(self.model.table, name, filename) class DatabaseConfig: @init_store def __init__(self, host, port, dbname, user, password): pass def export(self): return { 'host': self.host, 'port': self.port, 'dbname': self.dbname, 'user': self.user, 'password': self.password } class Database: def __init__(self, config): self.conn = psycopg.connect(**config.export()) self.cur = self.conn.cursor() def validate(self): self.cur.close() self.conn.commit() self.conn.close() class Status: DeletedFromDB = 1 << 0 Extracted = 1 << 1 Found = 1 << 2 Removed = 1 << 3 class Style: """Set of console display styles. """ fgblack = '\033[30m' fgred = '\033[31m' fggreen = '\033[32m' fgyellow = '\033[33m' fgblue = '\033[34m' fgmagenta = '\033[35m' fgcyan = '\033[36m' fglightgray = '\033[37m' bgblack = '\033[40m' bgred = '\033[41m' bggreen = '\033[42m' bgyellow = '\033[43m' bgblue = '\033[44m' bgmagenta = '\033[45m' bgcyan = '\033[46m' bglightgray = '\033[47m' default = '\033[39m' bold = '\033[1m' normal = '\033[0m' def get_attachments(db, model, filter=None): return [Attachment(model=model, **attachment_values) for attachment_values in sql_select(db, model, filter)] def sql_select(db, model, filter=None): columns = ['id', 'filename', 'mime_type', 'store_fname', 'name'] query = sql.SQL( """ SELECT at.id, at.name, at.mimetype, at.store_fname, {column} FROM ir_attachment AS at INNER JOIN {table} AS rm ON at.res_id = rm.id WHERE res_model = (%(model)s) AND at.index_content != 'image' """ + (f" AND ({filter});" if filter else ';')).format( column=sql.Identifier('rm', model.column), table=sql.Identifier(model.table)) db.cur.execute(query, {'model': model.model}) return [dict(zip(columns, map(lambda e: str(e), record))) for record in db.cur] def sql_delete(db, id): query = sql.SQL('DELETE FROM ir_attachment WHERE id=(%(id)s)') db.cur.execute(query, {'id': id}) def handle_attachment(config, db, attachment, prog_name=None): src = config.filestore / attachment.store_fname dst = config.output_dir / attachment.export_path(replace_slash=True) result = [] prefix = f"[id: {attachment.id}] " if config.verbose: if config.remove_file: print_info(f"{prefix}mv {src} {dst}", prog_name=prog_name) else: print_info(f"{prefix}cp {src} {dst}", prog_name=prog_name) if config.extract_file: if not src.is_file(): if not config.quiet: print_err(f"{prefix}file '{src}' do not exist in filestore.") return result result.append(Status.Found) if not config.dry_run: if config.extract_file: if dst.exists(): if not config.quiet: print_err(f"{prefix}output file '{dst}' already exist.", prog_name=prog_name) else: if not dst.parent.exists(): try: os.makedirs(dst.parent) except Exception: return result if config.remove_file: try: shutil.move(src, dst) except Exception: return result result.append(Status.Extracted) result.append(Status.Removed) else: try: shutil.copy2(src, dst) except Exception: return result result.append(Status.Extracted) elif config.remove_file: try: os.remove(src) except Exception: return result result.append(Status.Removed) if config.remove_db_entry: id = attachment.id if config.verbose: print_info(f"{prefix}DELETE FROM ir_attachment WHERE id={id};", prog_name=prog_name) if not config.dry_run: try: sql_delete(db, id) except Exception: return result result.append(Status.DeletedFromDB) return result def print_err(message, prog_name=None, exit=None): print(f"{Style.bold}{Style.fgred}" f"{prog_name + ': ' if prog_name else ''}err: {message}" f"{Style.default}{Style.normal}", file=sys.stderr, flush=True) if exit: sys.exit(exit) def print_info(message, prog_name=None): print(f"{Style.fgblue}" f"{prog_name + ': ' if prog_name else ''}info: {message}" f"{Style.default}", file=sys.stdout, flush=True) def main(argv): prog_name = argv[0] # Read argument and build config try: config = Config(argv[1:], prog_name) config.check() except Exception as e: print_err(e, prog_name=prog_name, exit=1) try: db = Database(config.db_config) except Exception: print_err(f"failed to connect to database " f"'{config.db_config.user}@{config.db_config.host}" f":{config.db_config.port}/{config.db_config.dbname}'.", prog_name=prog_name, exit=1) # Get all selected attachments from database. attachments = [] for model in config.models: try: attachments += get_attachments(db, model, config.filter) except Exception as e: print_err(f"failed to get attachments from model '{model.model}'" f"database: {e}", prog_name=prog_name, exit=1) # Statistics. count_found = 0 count_extracted = 0 count_removed = 0 count_delete_from_db = 0 count_attachments = len(attachments) # Act on selected attachments. for attachment in attachments: result = handle_attachment(config, db, attachment) if Status.Found in result: count_found += 1 if Status.Extracted in result: count_extracted += 1 if Status.Removed in result: count_removed += 1 if Status.DeletedFromDB in result: count_delete_from_db += 1 db.validate() print(f"\n{count_attachments} attachments:\n" f" {count_found} files found in filestore\n" f" {count_extracted} files extracted\n" f" {count_removed} files removed\n" f" {count_delete_from_db} entries deleted from database") if __name__ == '__main__': main(sys.argv)