List and extract files from Odoo filestore
The snippet can be accessed without any authentication.
Authored by
Théo - Le Filament
Usage
usage: odoo-at-extract.py [OPTIONS] <MODEL|model:table:column>
predefined models:
MODEL: <account.invoice|hr.expense|sale.order>
for other models:
<model_name>:<database_table>:<column_that_contain_object_name>
OPTIONS:
--db-dbname <dbname> Odoo database name (default: 'odoo')
--db-port <port> Odoo database port (default: '5432')
--db-host <host> Odoo database host (default: '127.0.0.1')
--db-password <password> Odoo database user password (default: 'odoo')
--db-user <user> Odoo database user (default: 'odoo')
-d, --dry-run run without actually doing action
-n, --no-extract no extract file from filestore
-f, --filter <SQL tests> add extra conditions to SQL command (e.g.: "rm.type='in_invoice'")
-h, --help print this help
-i, --filestore <path> path to filestore directory
-o, --output-dir <path> path to output directory
-q, --quiet less verbose
-r, --replace-slash replace '/' with '_' in filename
--rm-db-entry delete found database entries
--rm-file delete found file from filestore
-v, --verbose more verbose
Examples
Get number of attachments linked to a specific model (here account.invoice):
./odoo-at-extract.py 'account.invoice' --dry-run --quiet
Get number of attachments linked to a specific model (here account.invoice) and present in filestore:
./odoo-at-extract.py --filestore <path_to_filestore> 'account.invoice' --dry-run
Print information and output file path for each attachment linked to a specific model (here account.invoice):
./odoo-at-extract.py --filestore <path_to_filestore> --output-dir <path_to_output_dir> 'account.invoice' --dry-run --verbose
Extract attachments linked to a specific model (here account.invoice) from filestore to output directory:
./odoo-at-extract.py --filestore <path_to_filestore> --output-dir <path_to_output_dir> 'account.invoice'
Same as above, but with the deletion of attachments in filestore and in database:
./odoo-at-extract.py --filestore <path_to_filestore> --output-dir <path_to_output_dir> 'account.invoice' --rm-db-entry --rm-file
Extract suppliers invoices (thanks to the filter option) from filestore to output directory:
./odoo-at-extract.py --filestore <path_to_filestore> --output-dir <path_to_output_dir> --filter "rm.type='in_invoice'" 'account.invoice'
odoo-at-extract.py 14.04 KiB
#!/bin/python3
# © 2024 Le Filament (<https://le-filament.com>)
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
import os
import pathlib
import psycopg
import shutil
import sys
from psycopg import sql
# Database connection defaults, used when no --db-* option overrides them.
DEFAULT_DB_HOST = '127.0.0.1'
DEFAULT_DB_PORT = '5432'
DEFAULT_DB_DBNAME = 'odoo'
DEFAULT_DB_USER = 'odoo'
DEFAULT_DB_PASSWORD = 'odoo'

# Predefined models selectable by name on the command line. Each entry gives
# the Odoo model name, its database table and the column whose value names
# the record.
MODELS = {
    'account.invoice': dict(
        model='account.invoice',
        table='account_invoice',
        column='number',
    ),
    'hr.expense': dict(
        model='hr.expense',
        table='hr_expense',
        column='date',
    ),
    'sale.order': dict(
        model='sale.order',
        table='sale_order',
        column='name',
    ),
}
def init_store(fn):
    """Decorate an ``__init__``-style method so passed arguments become attributes.

    Every named parameter of *fn* (positional or keyword-only, ``self``
    excluded) that the caller actually supplies is written to
    ``self.__dict__`` under the parameter's name before *fn* runs.
    Parameters left at their default value are NOT stored, so a default
    never shadows a method of the same name on the class.

    Args:
        fn: The method to wrap; its first parameter must be the instance.

    Returns:
        A wrapper with the same metadata as *fn* that stores the arguments,
        calls *fn* and returns its result.
    """
    import functools  # local import keeps the block self-contained

    code = fn.__code__
    # co_varnames lists positional parameters first, then keyword-only ones,
    # then *args/**kwargs names and locals. Index 0 is ``self``.
    positional_names = code.co_varnames[1:code.co_argcount]
    keyword_only_names = code.co_varnames[
        code.co_argcount:code.co_argcount + code.co_kwonlyargcount]
    named_parameters = positional_names + keyword_only_names

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        self = args[0]
        supplied = dict(zip(positional_names, args[1:]))
        supplied |= {name: kwargs[name]
                     for name in named_parameters if name in kwargs}
        self.__dict__.update(supplied)
        return fn(*args, **kwargs)

    return wrapper
class Config:
def __init__(self, argv, prog_name):
self.dry_run = False
self.extract_file = True
self.filestore = ''
self.filter = None
self.output_dir = ''
self.quiet = False
self.remove_db_entry = False
self.remove_file = False
self.replace_slash = False
self.verbose = False
self.db_config = DatabaseConfig(
host=DEFAULT_DB_HOST,
port=DEFAULT_DB_PORT,
dbname=DEFAULT_DB_DBNAME,
user=DEFAULT_DB_USER,
password=DEFAULT_DB_PASSWORD
)
self.models = []
match len(argv):
case x if x < 1:
raise Exception("bad arguments number")
argv = iter(argv)
while (arg := next(argv, None)) is not None:
match arg:
case '--db-dbname':
self.db_config.dbname = next(argv, None)
case '--db-host':
self.db_config.host = next(argv, None)
case '--db-port':
self.db_config.port = next(argv, None)
case '--db-password':
self.db_config.password = next(argv, None)
case '--db-user':
self.db_config.user = next(argv, None)
case '-d' | '--dry-run':
self.dry_run = True
case '-n' | '--no-extract':
self.extract_file = False
case '-f' | '--filter':
self.filter = next(argv, None)
case '-h' | '--help':
Config.print_help(prog_name)
case '-i' | '--filestore':
self.filestore = next(argv, None)
case '-o' | '--output-dir':
self.output_dir = next(argv, None)
case '-q' | '--quiet':
self.quiet = True
case '-r' | '--replace-slash':
self.replace_slash = True
case '--rm-db-entry':
self.remove_db_entry = True
case '--rm-file':
self.remove_file = True
case '-v' | '--verbose':
self.verbose = True
case model_str:
model = model_str.split(':')
match len(model):
case 1:
if model[0] in MODELS:
self.models.append(Model(**MODELS[model[0]]))
else:
raise Exception(f"model '{model_str}' not in default models. Try with '<model>:<table>:<column>' syntax.")
case 3:
self.models.append(Model(
model=model[0],
table=model[1],
column=model[2]
))
case _:
raise Exception(f"bad syntax for model '{model_str}'. Syntax accepted: '<model>' or '<model>:<table>:<column>'.")
if not self.models:
raise Exception("model is needed.")
if not self.dry_run:
if not self.filestore:
raise Exception("filestore path ('-f' or '--filestore') is needed.")
if not self.output_dir:
raise Exception("output directory ('-o' or '--output-dir') is needed.")
self.filestore = pathlib.Path(self.filestore)
self.output_dir = pathlib.Path(self.output_dir)
def check(self):
if self.extract_file or self.remove_file:
if not self.filestore.is_dir():
raise Exception(f"filestore '{self.filestore}' is not a directory.")
if not self.dry_run and self.extract_file:
if not self.output_dir.is_dir():
raise Exception(f"output '{self.output_dir}' is not a directory.")
def print_help(prog_name):
print(f"usage: {prog_name} [OPTIONS] <MODEL|model:table:column>\n\n"
"predifined models:\n"
"MODEL: <account.invoice|hr.expense|sale.order>\n\n"
"for other models:\n"
"<model_name>:<database_table>:<column_that_contain_object_name>\n\n"
"OPTIONS:\n"
"--db-dbname <dbname> Odoo database name (default: 'odoo')\n"
"--db-port <port> Odoo database port (default: '5432')\n"
"--db-host <host> Odoo database port (default: '127.0.0.1')\n"
"--db-password <password> Odoo database user password (default: 'odoo')\n"
"--db-user <user> Odoo database user (default: 'odoo')\n"
"-d, --dry-run run without actually doing action\n"
"-n, --no-extract no extract file from filestore\n"
"-f, --filter <SQL tests> add extra conditions to SQL command (e.g.: \"rm.type='in_invoice'\")\n"
"-h, --help print this help\n"
"-i, --filestore <path> path to filestore directory\n"
"-o, --output-dir <path> path to output directory\n"
"-q, --quiet less verbose\n"
"-r, --replace-slash replace '/' with '_' in filename\n"
"--rm-db-entry delete found database entries\n"
"--rm-file delete found file from filestore\n"
"-v, --verbose more verbose\n")
sys.exit(0)
class Model:
    """Description of one Odoo model whose attachments are processed.

    The ``init_store`` decorator copies the constructor arguments onto the
    instance, giving the attributes ``model``, ``table`` and ``column``.
    """

    @init_store
    def __init__(self, model, table, column):
        """Keep the model name, its database table and its naming column."""
class Attachment:
    """One attachment row together with the model it is linked to.

    The ``init_store`` decorator copies the supplied constructor arguments
    onto the instance: ``id``, ``model`` (an object exposing ``table``),
    ``name``, ``filename``, ``mime_type`` and ``store_fname``.
    """

    @init_store
    def __init__(self, id, model, name, filename, mime_type,
                 store_fname, export_path=None):
        """Store the supplied arguments as attributes.

        NOTE(review): passing ``export_path`` stores an instance attribute
        that hides the ``export_path()`` method below; no caller in this
        file passes it.
        """

    def export_path(self, replace_slash=False):
        """Return the relative output path ``<table>/<name>/<filename>``.

        Args:
            replace_slash: When true, every '/' in ``name`` and ``filename``
                is replaced with '_', so each contributes exactly one path
                component. When false, slashes are kept and produce nested
                directories.

        Returns:
            A relative ``pathlib.Path`` starting with the model's table.
        """
        if replace_slash:
            name = self.name.replace('/', '_')
            filename = self.filename.replace('/', '_')
        else:
            # pathlib discards every earlier segment when a later one is
            # absolute, so a leading '/' would drop the table directory and
            # yield an absolute path; strip it to stay relative.
            name = self.name.lstrip('/')
            filename = self.filename.lstrip('/')
        return pathlib.Path(self.model.table, name, filename)
class DatabaseConfig:
    """Connection settings of the Odoo database.

    The ``init_store`` decorator copies the constructor arguments onto the
    instance as ``host``, ``port``, ``dbname``, ``user`` and ``password``.
    """

    @init_store
    def __init__(self, host, port, dbname, user, password):
        """Keep the five connection settings as attributes."""

    def export(self):
        """Return the current settings as a dict keyed by setting name."""
        setting_names = ('host', 'port', 'dbname', 'user', 'password')
        return {setting: getattr(self, setting) for setting in setting_names}
class Database:
    """Open database connection together with a cursor on it.

    Attributes:
        conn: Connection returned by ``psycopg.connect``.
        cur: Cursor created from ``conn``.
    """

    def __init__(self, config):
        """Connect using the keyword arguments returned by ``config.export()``."""
        self.conn = psycopg.connect(**config.export())
        self.cur = self.conn.cursor()

    def validate(self):
        """Close the cursor, commit the pending work and close the connection.

        The connection is closed even when closing the cursor or committing
        raises, so a failure does not leave it open; the error still
        propagates to the caller.
        """
        try:
            self.cur.close()
            self.conn.commit()
        finally:
            self.conn.close()
class Status:
    """Outcome markers collected for each handled attachment.

    Every value is a distinct single-bit integer.
    """

    DeletedFromDB = 0b0001  # database entry deleted
    Extracted = 0b0010      # file written to the output directory
    Found = 0b0100          # file present in the filestore
    Removed = 0b1000        # file removed from the filestore
class Style:
    """ANSI escape sequences used to style console output."""

    # Foreground colours.
    fgblack = '\033[30m'
    fgred = '\033[31m'
    fggreen = '\033[32m'
    fgyellow = '\033[33m'
    fgblue = '\033[34m'
    fgmagenta = '\033[35m'
    fgcyan = '\033[36m'
    fglightgray = '\033[37m'

    # Background colours.
    bgblack = '\033[40m'
    bgred = '\033[41m'
    bggreen = '\033[42m'
    bgyellow = '\033[43m'
    bgblue = '\033[44m'
    bgmagenta = '\033[45m'
    bgcyan = '\033[46m'
    bglightgray = '\033[47m'

    # Attributes and resets.
    default = '\033[39m'
    bold = '\033[1m'
    normal = '\033[0m'
def get_attachments(db, model, filter=None):
    """Return one ``Attachment`` per row that ``sql_select`` yields for *model*.

    Args:
        db: Database wrapper handed to ``sql_select``.
        model: Model the attachments are linked to; also stored on each
            ``Attachment``.
        filter: Optional extra SQL condition forwarded to ``sql_select``.
    """
    attachments = []
    for row in sql_select(db, model, filter):
        attachments.append(Attachment(model=model, **row))
    return attachments
def sql_select(db, model, filter=None):
    """Select the attachments linked to *model* and return them as dicts.

    Args:
        db: Object whose ``cur`` attribute is the cursor to execute on.
        model: Object exposing ``model`` (value matched against
            ``res_model``), ``table`` (joined as ``rm``) and ``column``
            (selected from ``rm``).
        filter: Optional raw SQL condition ANDed to the WHERE clause. It is
            inserted verbatim, so it must come from a trusted operator.

    Returns:
        A list of dicts with the keys 'id', 'filename', 'mime_type',
        'store_fname' and 'name'. Every value is converted with ``str``,
        so a NULL column becomes the string 'None'.
    """
    # Keys in the order of the selected columns: at.name is the file name,
    # the model's column provides the record name.
    columns = ('id', 'filename', 'mime_type', 'store_fname', 'name')

    if filter:
        # Wrapping the filter in its own sql.SQL keeps '{' / '}' inside it
        # away from format(). '%' is doubled because the query is executed
        # with parameters, where a lone '%' would be read as a placeholder
        # (e.g. in "at.name LIKE '%.pdf'").
        extra = sql.SQL(" AND ({})").format(
            sql.SQL(filter.replace('%', '%%')))
    else:
        extra = sql.SQL('')

    query = sql.SQL(
        """
        SELECT at.id, at.name, at.mimetype, at.store_fname,
        {column}
        FROM ir_attachment AS at
        INNER JOIN {table} AS rm
        ON at.res_id = rm.id
        WHERE res_model = (%(model)s)
        AND at.index_content != 'image'{extra};
        """).format(
        column=sql.Identifier('rm', model.column),
        table=sql.Identifier(model.table),
        extra=extra)
    db.cur.execute(query, {'model': model.model})
    return [dict(zip(columns, (str(value) for value in record)))
            for record in db.cur]
def sql_delete(db, id):
    """Execute a DELETE on ``ir_attachment`` for the row whose id is *id*.

    The statement runs on ``db.cur``; nothing is committed here.
    """
    parameters = {'id': id}
    statement = sql.SQL('DELETE FROM ir_attachment WHERE id=(%(id)s)')
    db.cur.execute(statement, parameters)
def handle_attachment(config, db, attachment, prog_name=None):
    """Apply the configured actions to one attachment.

    Depending on *config*, the attachment file is copied or moved from the
    filestore to the output directory, or only removed from the filestore,
    and its database entry is deleted. With ``config.dry_run`` nothing is
    changed; with ``config.verbose`` the planned actions are printed.

    Args:
        config: Run configuration (paths and action switches).
        db: Database wrapper passed to ``sql_delete``.
        attachment: Attachment to handle.
        prog_name: Optional program name prefixed to printed messages.

    Returns:
        A list of ``Status`` values describing what happened. Processing
        stops at the first failure and returns what was collected so far.
    """
    prefix = f"[id: {attachment.id}] "
    result = []

    def report(message):
        # Failures are shown unless --quiet was given.
        if not config.quiet:
            print_err(f"{prefix}{message}", prog_name=prog_name)

    src = config.filestore / attachment.store_fname
    # Honour --replace-slash instead of always replacing.
    relative = attachment.export_path(replace_slash=config.replace_slash)
    # With slashes kept, a crafted name could point outside the output
    # directory; refuse such paths rather than writing there.
    if relative.is_absolute() or '..' in relative.parts:
        report(f"output path '{relative}' would leave the output directory.")
        return result
    dst = config.output_dir / relative

    if config.verbose:
        if config.remove_file:
            print_info(f"{prefix}mv {src} {dst}", prog_name=prog_name)
        else:
            print_info(f"{prefix}cp {src} {dst}", prog_name=prog_name)

    if config.extract_file:
        if not src.is_file():
            report(f"file '{src}' do not exist in filestore.")
            return result
        result.append(Status.Found)

    if not config.dry_run:
        if config.extract_file:
            if dst.exists():
                # The existing file is never overwritten. Processing still
                # continues to the database step, as it always did.
                report(f"output file '{dst}' already exist.")
            else:
                try:
                    os.makedirs(dst.parent, exist_ok=True)
                except Exception as err:
                    report(f"failed to create directory '{dst.parent}': {err}")
                    return result
                if config.remove_file:
                    try:
                        shutil.move(src, dst)
                    except Exception as err:
                        report(f"failed to move '{src}' to '{dst}': {err}")
                        return result
                    # A move both extracts the file and removes it.
                    result.append(Status.Extracted)
                    result.append(Status.Removed)
                else:
                    try:
                        shutil.copy2(src, dst)
                    except Exception as err:
                        report(f"failed to copy '{src}' to '{dst}': {err}")
                        return result
                    result.append(Status.Extracted)
        elif config.remove_file:
            try:
                os.remove(src)
            except Exception as err:
                report(f"failed to remove '{src}': {err}")
                return result
            result.append(Status.Removed)

    if config.remove_db_entry:
        attachment_id = attachment.id
        if config.verbose:
            print_info(f"{prefix}DELETE FROM ir_attachment WHERE id={attachment_id};",
                       prog_name=prog_name)
        if not config.dry_run:
            try:
                sql_delete(db, attachment_id)
            except Exception as err:
                report(f"failed to delete database entry: {err}")
                return result
            result.append(Status.DeletedFromDB)

    return result
def print_err(message, prog_name=None, exit=None):
    """Write *message* to stderr in bold red, then optionally exit.

    Args:
        message: Text (or any object) to display after the 'err: ' tag.
        prog_name: Optional program name shown before the tag.
        exit: When truthy, the process exits with this value as status.
    """
    origin = prog_name + ': ' if prog_name else ''
    opening = f"{Style.bold}{Style.fgred}"
    closing = f"{Style.default}{Style.normal}"
    print(f"{opening}{origin}err: {message}{closing}",
          file=sys.stderr, flush=True)
    if not exit:
        return
    sys.exit(exit)
def print_info(message, prog_name=None):
    """Write *message* to stdout in blue, after an 'info: ' tag.

    Args:
        message: Text (or any object) to display.
        prog_name: Optional program name shown before the tag.
    """
    origin = prog_name + ': ' if prog_name else ''
    line = f"{Style.fgblue}{origin}info: {message}{Style.default}"
    print(line, file=sys.stdout, flush=True)
def main(argv):
    """Run the tool: parse arguments, select attachments, act, report.

    Args:
        argv: Full argument vector; ``argv[0]`` is the program name.

    Exits with status 1 (through ``print_err``) when the configuration is
    invalid, the database connection fails or the attachments cannot be
    selected.
    """
    prog_name = argv[0]

    # Read arguments and build the configuration.
    try:
        config = Config(argv[1:], prog_name)
        config.check()
    except Exception as e:
        print_err(e, prog_name=prog_name, exit=1)

    try:
        db = Database(config.db_config)
    except Exception:
        print_err(f"failed to connect to database "
                  f"'{config.db_config.user}@{config.db_config.host}"
                  f":{config.db_config.port}/{config.db_config.dbname}'.",
                  prog_name=prog_name, exit=1)

    # Get all selected attachments from the database.
    attachments = []
    for model in config.models:
        try:
            attachments += get_attachments(db, model, config.filter)
        except Exception as e:
            # The space before 'database' was missing in the message.
            print_err(f"failed to get attachments from model '{model.model}' "
                      f"database: {e}",
                      prog_name=prog_name, exit=1)

    # Statistics.
    count_found = 0
    count_extracted = 0
    count_removed = 0
    count_delete_from_db = 0
    count_attachments = len(attachments)

    # Act on the selected attachments; forward prog_name so every message
    # printed while handling them carries the same prefix.
    for attachment in attachments:
        result = handle_attachment(config, db, attachment,
                                   prog_name=prog_name)
        if Status.Found in result:
            count_found += 1
        if Status.Extracted in result:
            count_extracted += 1
        if Status.Removed in result:
            count_removed += 1
        if Status.DeletedFromDB in result:
            count_delete_from_db += 1

    # Commit the database changes and close the connection.
    db.validate()

    print(f"\n{count_attachments} attachments:\n"
          f" {count_found} files found in filestore\n"
          f" {count_extracted} files extracted\n"
          f" {count_removed} files removed\n"
          f" {count_delete_from_db} entries deleted from database")


if __name__ == '__main__':
    main(sys.argv)
Please register or sign in to comment