#!/usr/bin/env python
""" odooly.py -- Odoo client library and command line tool
Author: Florent Xicluna
"""
import _ast
import argparse
import atexit
import datetime
import functools
import json
import os
import re
import shlex
import sys
import time
import traceback
from configparser import ConfigParser
from getpass import getpass
from pathlib import Path
from string import Formatter
from threading import current_thread
from urllib.parse import urlencode, urljoin, urlsplit
try:
import requests
except ImportError:
requests = None
__version__ = '2.6.5'
__all__ = ['Client', 'Env', 'HTTPSession', 'WebAPI', 'Service', 'Json2',
'Printer', 'Error', 'ServerError',
'BaseModel', 'Model', 'BaseRecord', 'Record', 'RecordList',
'format_exception', 'read_config', 'start_odoo_services']
CONF_FILE = Path('odooly.ini')
HIST_FILE = Path('~/.odooly_history').expanduser()
DEFAULT_URL = 'http://localhost:8069/'
ADMIN_USER = 'admin'
SYSTEM_USER = '__system__'
MAXCOL = [79, 179, 9999] # Line length in verbose mode
PP_FORMAT = {'sort_dicts': False, 'width': 120}
USER_AGENT = f'Mozilla/5.0 (X11) odooly.py/{__version__}'
USAGE = """\
Usage (some commands):
env[name] # Return a Model instance
env[name].keys() # List field names of the model
env[name].fields(names=None) # Return details for the fields
env[name].field(name) # Return details for the field
env[name].browse(ids=())
env[name].search(domain)
env[name].search(domain, offset=0, limit=None, order=None)
# Return a RecordList
rec = env[name].get(domain) # Get the Record matching domain
rec.some_field # Return the value of this field
rec.read(fields=None) # Return values for the fields
client.login(user) # Login with another user
client.connect(env_name) # Connect to another env.
client.connect(server=None) # Connect to another server
env.models(name) # List models matching pattern
env.modules(name) # List modules matching pattern
env.install(module1, module2, ...)
env.upgrade(module1, module2, ...)
# Install or upgrade the modules
env.upgrade_cancel() # Reset failed upgrade/install
"""
DOMAIN_OPERATORS = frozenset('!|&')
# Supported operators are:
# =, !=, >, >=, <, <=, like, ilike, in, not like, not ilike, not in,
# =like, =ilike, =?, child_of, parent_of,
# any, not any, # Odoo 17
# not =like, not =ilike, # Odoo 19
_term_re = re.compile(
r'([\w._]+)\s*' r'(=like\b|=ilike\b|=\?|[<>]=?|!?=|'
r'\b(?:like|ilike|in|any|not (?:=?like|=?ilike|in|any)|child_of|parent_of)\b)'
r'(?![?!=<>])\s*(.+)')
# Web methods (not exhaustive)
_web_methods = {
'database': ['backup', 'change_password', 'create',
'drop', 'duplicate', 'list', 'restore'],
'dataset': ['call_button', 'call_kw'],
'session': ['authenticate', 'check', 'destroy', 'get_lang_list', 'get_session_info'],
'webclient': ['version_info'],
}
# RPC methods
_rpc_methods = {
'common': ['login', 'authenticate', 'version'],
'object': ['execute', 'execute_kw'],
}
_cause_message = ("\nThe above exception was the direct cause "
"of the following exception:\n\n")
_pending_state = ('state', 'not in',
['uninstallable', 'uninstalled', 'installed'])
_base_method_params = [
('action_archive', ['ids']),
('action_unarchive', ['ids']),
('copy', ['ids', 'default']),
('create', ['vals_list']),
('get_external_id', ['ids']),
('get_metadata', ['ids']),
('read', ['ids', 'fields', 'load']),
('search', ['domain', 'offset', 'limit', 'order']),
('search_count', ['domain', 'limit']),
('search_read', ['domain', 'fields', 'offset', 'limit', 'order']),
('unlink', ['ids']),
('write', ['ids', 'vals']),
]
_sql_action_code = """\
sql_queries = env.context.get("__sql") or []
result = env.cr.connection.notices
if not env.is_system():
raise UserError("Not allowed")
for query in sql_queries:
env.cr.execute(query)
if not env.cr.description:
result.append(env.cr.statusmessage)
elif not env.cr.rowcount:
result.append({c.name: () for c in env.cr.description})
else:
columns = [c.name for c in env.cr.description]
result.extend(dict(zip(columns, values)) for values in env.cr.fetchall())
log(str({'queries': sql_queries, 'result': result}))
result[:] = []
"""
color_bold = color_comment = color_py = color_repr = str
http_context = None
if os.getenv('ODOOLY_SSL_UNVERIFIED'):
import ssl
http_context = ssl._create_unverified_context()
requests = None
if not requests:
from urllib.request import HTTPCookieProcessor, HTTPSHandler, Request, build_opener
class HTTPSession:
if requests: # requests.Session
def __init__(self):
self._session = requests.Session()
self._session.headers.update({'User-Agent': USER_AGENT, 'Accept': 'application/json'})
def set_auth(self, uri, username, password):
self._session.auth = (username, password)
def _request(self, url, method, data, json, headers, **kw):
resp = self._session.request(method, url, data=data, json=json, headers=headers, **kw)
return resp.raise_for_status() or resp
def _parse_response(self, resp):
is_json = 'json' in resp.headers.get('content-type', '')
return resp.json() if is_json else resp.text
def _parse_error(self, err):
resp = err.response
return (resp.status_code, self._parse_response(resp)) if resp is not None else (0, 0)
else: # urllib.request
def __init__(self):
self._session = build_opener(HTTPCookieProcessor(), HTTPSHandler(context=http_context))
self._session.addheaders = [('User-Agent', USER_AGENT), ('Accept', 'application/json')]
def set_auth(self, uri, username, password):
from urllib.request import HTTPBasicAuthHandler
auth = HTTPBasicAuthHandler()
auth.add_password(None, uri, username, password)
self._session.add_handler(auth)
def _request(self, url, method, data, json, headers, _json=json, **kw):
headers = dict(headers or ())
if json is not None:
headers.setdefault('Content-Type', 'application/json')
if method == 'POST':
data = (urlencode(data) if json is None else _json.dumps(json)).encode()
elif data is not None:
url, data = f'{url}?{urlencode(data)}', None
return self._session.open(Request(url, data=data, headers=headers, method=method))
def _parse_response(self, resp):
is_json = 'json' in resp.headers.get('content-type', '')
return json.load(resp) if is_json else resp.read().decode()
def _parse_error(self, err):
return (err.code, self._parse_response(err)) if hasattr(err, 'code') else (0, 0)
def request(self, url, *, method='POST', data=None, json=None, headers=None):
try:
with self._request(url, method=method, data=data, json=json, headers=headers) as resp:
return resp if method == 'HEAD' else self._parse_response(resp)
except OSError as exc:
status_code, result = self._parse_error(exc)
if result and status_code in (401, 403, 404, 422, 500):
# Unauthorized, Forbidden, NotFound, UnprocessableContent, InternalServerError
if isinstance(result, str):
lines = re.findall(r'>([^>\n]+)<', result) or (status_code, result)
result = {'name': exc.__class__.__name__, 'debug': None,
'arguments': (f'{lines[0]} - {lines[-1]}',)}
raise ServerError({'code': status_code, 'data': result})
raise
Ids, Id1 = type('ids', (list,), {'__slots__': ()}), type('id1', (int,), {'__slots__': ()})
def _memoize(inst, attr, value, doc_values=None):
if hasattr(value, '__get__') and not hasattr(value, '__self__'):
value.__name__ = attr
if doc_values is not None:
value.__doc__.format(*doc_values)
value = value.__get__(inst, type(inst))
inst.__dict__[attr] = value
return value
# Simplified ast.literal_eval which does not parse operators
def _convert(node):
if isinstance(node, _ast.Constant) and node.value.__class__ is not complex:
return node.value
if isinstance(node, _ast.Tuple):
return tuple(map(_convert, node.elts))
if isinstance(node, _ast.List):
return [*map(_convert, node.elts)]
if isinstance(node, _ast.Dict):
return {_convert(k): _convert(v)
for (k, v) in zip(node.keys, node.values)}
if isinstance(node, _ast.UnaryOp):
if isinstance(node.op, _ast.USub):
return -_convert(node.operand)
if isinstance(node.op, _ast.UAdd):
return +_convert(node.operand)
raise ValueError('malformed or disallowed expression')
def literal_eval(expression):
node = compile(expression, '<unknown>', 'eval', _ast.PyCF_ONLY_AST)
return _convert(node.body)
def format_params(params, hide=('passw', 'pwd')):
secret = {key: ... for key in params if any(sub in key for sub in hide)}
return [f'{key}={v!r}' if v != ... else f'{key}=*'
for (key, v) in {**params, **secret}.items()]
[docs]
def read_config(section=None):
"""Read the environment settings from the configuration file.
Config file ``odooly.ini`` contains a `section` for each environment.
Each section provides parameters for the connection: ``server``,
``username`` and (optional) ``database``, ``password`` and ``api_key``.
As an alternative, server can be declared with 4 parameters:
``scheme / host / port / protocol``.
Default values are read from the ``[DEFAULT]`` section. If the ``password``
is not set or is empty, it is requested on login.
Return tuple ``(server, db or None, user, password or None, api_key or None)``.
Without argument, it returns the list of configured environments.
"""
with Client._config_file.open() as f:
(p := ConfigParser()).read_file(f)
if section is None:
return p.sections()
server = (env := dict(p.items(section))).get('server')
if (scheme := env.get('scheme', 'http')) == 'local':
server = shlex.split(server or env.get('options', ''))
elif not server:
protocol = env.get('protocol', 'web')
server = f"{scheme}://{env['host']}:{env['port']}/{protocol}"
return (server, env.get('database', ''), env['username'], env.get('password'), env.get('api_key'))
[docs]
def start_odoo_services(options=None, appname=None):
"""Initialize the Odoo services.
Import the ``odoo`` Python package and load the Odoo services.
The argument `options` receives the command line arguments
for ``odoo``. Example:
``['-c', '/path/to/odoo-server.conf', '--without-demo', 'all']``.
Return the ``odoo`` package.
"""
import odoo
if not hasattr(odoo, "_get_pool"):
os.putenv('TZ', 'UTC')
if appname is not None:
os.putenv('PGAPPNAME', appname)
odoo.tools.config.parse_config(options or [])
if odoo.release.version_info < (15,):
odoo.api.Environment.reset()
elif odoo.release.version_info > (19, 2):
import odoo.http.router
odoo.http.dispatch_rpc = odoo.http.router.dispatch_rpc
odoo._get_pool = odoo.modules.registry.Registry
def close_all():
for db in odoo._get_pool.registries.keys():
odoo.sql_db.close_db(db)
atexit.register(close_all)
return odoo
[docs]
def issearchdomain(arg):
"""Check if the argument is a search domain.
Examples:
- ``[('name', '=', 'mushroom'), ('state', '!=', 'draft')]``
- ``['name = mushroom', 'state != draft']``
- ``[]``
"""
return isinstance(arg, list) and not (arg and (
# Not a list of ids: [1, 2, 3]
isinstance(arg[0], int) or
# Not a list of ids as str: ['1', '2', '3']
(isinstance(arg[0], str) and arg[0].isdigit())))
[docs]
def searchargs(params, kwargs=None):
"""Compute the 'search' parameters."""
if not params:
return ([],)
if not isinstance(domain := params[0], list):
return params
for (idx, term) in enumerate(domain):
if isinstance(term, str) and term not in DOMAIN_OPERATORS:
if not (m := _term_re.match(term.strip())):
raise ValueError(f"Cannot parse term {term!r}")
(field, operator, value) = m.groups()
try:
value = literal_eval(value)
except Exception:
pass # Interpret the value as a string
domain[idx] = (field, operator, value)
params = (domain,) + params[1:]
if kwargs and len(params) == 1:
args = (kwargs.pop('offset', 0),
kwargs.pop('limit', None),
kwargs.pop('order', None))
if any(args):
params += args
return params
def extract_http_response(method, result, regex):
if method == 'HEAD':
return result.url
found = re.search(regex or r'odoo._*session_info_* = (.*);', result)
return found and (found.group(1) if regex else json.loads(found.group(1)))
class partial(functools.partial):
__slots__ = ()
def __repr__(self):
# Hide arguments
return f"{self.__class__.__name__}({self.func!r}, ...)"
class Error(Exception):
"""An Odooly error."""
class ServerError(Exception):
"""An error received from the server."""
class Printer:
width = None
def _print_(self, message, _prefix):
cols = self.width - len(_prefix) - 1
suffix = f"... L={len(message)}" if len(message) > cols else ""
xch = message[:cols - len(suffix)] + suffix
print(color_comment(f"{_prefix} {xch}"))
print_sent = functools.partialmethod(_print_, _prefix=' >')
print_recv = functools.partialmethod(_print_, _prefix='<--')
def __bool__(self):
return bool(self.width)
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
if exc_type:
if issubclass(exc_type, ServerError):
exc = exc.args[0]["data"]["name"]
self.print_recv(f"{exc_type.__name__}: {exc}")
[docs]
class WebAPI:
"""A wrapper around Web endpoints.
The connected endpoints are exposed on the Client instance.
Argument `client` is the connected Client.
Argument `endpoint` is the name of the service
(examples: ``"web/database"``, ``"web/session"``).
Argument `methods` is the list of methods which should be
exposed on this endpoint. Use ``dir(...)`` on the
instance to list them.
"""
_methods = ()
def __init__(self, client, endpoint, methods):
self._dispatch = client._proxy_web(endpoint)
self._server = urljoin(client._server, '/')
self._endpoint = f'/{endpoint}'
self._methods = [*methods]
self._printer = client._printer
def __repr__(self):
return f"<WebAPI '{self._server[:-1]}{self._endpoint}'>"
def __dir__(self):
return sorted(self._methods)
def __getattr__(self, name):
def wrapper(self, _func=None, **params):
return self._request(f'{name}/{_func}' if _func else name, params)
return _memoize(self, name, wrapper)
def _request(self, path, params=None):
if not self._printer:
return self._dispatch(path, params)
if self._endpoint == '/doc':
snt = [f'GET /doc/{path}.json']
else:
snt = [f'POST {self._endpoint}/{path}'] + format_params(params)
with self._printer as log:
log.print_sent(' '.join(snt))
log.print_recv(repr(res := self._dispatch(path, params)))
return res
[docs]
class Service:
"""A wrapper around RPC endpoints.
The connected endpoints are exposed on the Client instance.
The `client` argument is the connected Client.
The `endpoint` argument is the name of the service
(examples: ``"object"``, ``"common"``). The `methods` is the list of methods
which should be exposed on this endpoint. Use ``dir(...)`` on the
instance to list them.
"""
_methods = ()
def __init__(self, client, endpoint, methods):
self._dispatch = client._proxy(endpoint)
self._rpcpath = client._server
self._endpoint = endpoint
self._methods = methods
self._printer = client._printer
def __repr__(self):
return f"<Service '{self._rpcpath}|{self._endpoint}'>"
def __dir__(self):
return sorted(self._methods)
def __getattr__(self, name):
if name not in self._methods:
raise AttributeError(f"'Service' object has no attribute {name!r}")
def sanitize(args):
if len(args) > 2:
args = args[:2] + ('*',) + args[3:]
return ', '.join(repr(arg) for arg in args)
def wrapper(self, *args):
if not self._printer:
return self._dispatch(name, args)
with self._printer as log:
log.print_sent(f"{self._endpoint}.{name}({sanitize(args)})")
log.print_recv(repr(res := self._dispatch(name, args)))
return res
return _memoize(self, name, wrapper)
[docs]
class Json2:
"""A connection to Json-2 API
Added in Odoo 19.
"""
_protocol_name = 'JSON-2'
_endpoint = '/json/2'
_doc_endpoint = '/doc-bearer'
def __init__(self, client, database, api_key):
self._http = HTTPSession()
self._server = urljoin(client._server, '/')
self._headers = {
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json',
'X-Odoo-Database': database or '',
}
self._method_params = {'base': dict(_base_method_params)}
self._printer = client._printer
[docs]
def doc(self, model):
"""Documentation of the `model`."""
model_doc = self._request(f'{self._doc_endpoint}/{model}.json')
if model not in self._method_params:
method_params = Model._parse_doc_methods(model_doc)
self._method_params[model] = dict(method_params)
return model_doc
def _methods(self, model):
if model not in self._method_params:
self.doc(model)
return self._method_params[model]
def _prepare_params(self, model, method, args, kwargs):
if not args:
return {**kwargs}
if len(args) == 1 and args[0].__class__ in (Ids, Id1):
return {'ids': args[0], **kwargs}
if not (arg_names := self._method_params['base'].get(method)):
arg_names = self._methods(model).setdefault(method, ())
params = dict(zip(arg_names, args))
params.update(kwargs)
if len(args) > len(arg_names) and self._printer:
print(f"Method {method!r} on {model!r} called with extra args: {args[len(arg_names):]}")
return params
[docs]
def __call__(self, model, method, args, kw=None):
"""Execute API call on the `model`."""
params = self._prepare_params(model, method, args, kw or {})
return self._request(f'{self._endpoint}/{model}/{method}', params)
def __repr__(self):
return f"<Json2 '{self._server[:-1]}{self._endpoint}'>"
def _check(self, uid=None):
url = urljoin(self._server, f'{self._endpoint}/res.users/context_get')
try:
context = self._http.request(url, json={}, headers=self._headers)
except ServerError:
return False
return self if (not uid or uid == context['uid']) else False
def _request(self, path, params=None):
url = urljoin(self._server, path)
verb = 'GET' if params is None else 'POST'
if not self._printer:
return self._http.request(url, method=verb, json=params, headers=self._headers)
with self._printer as log:
log.print_sent(' '.join([verb, path] + format_params(params or {})))
res = self._http.request(url, method=verb, json=params, headers=self._headers)
log.print_recv(repr(res))
return res
[docs]
class Env:
"""An environment wraps data for Odoo models and records:
- :attr:`db_name`, the current database;
- :attr:`uid`, the current user id;
- :attr:`context`, the current context dictionary.
To retrieve an instance of ``some.model``:
>>> env["some.model"]
"""
name = uid = user = session_info = _api_key = _doc = _json2 = _access_models = None
_class_ids = Ids, Id1
_cache = {}
def __new__(cls, client, db_name=()):
if db_name:
env = cls._cache.get((Env, db_name, client._server))
if env and env.client == client:
return env
if not db_name or client.env.db_name:
env = object.__new__(cls)
env.client, env.db_name, env.context = client, db_name, {}
else:
env, env.db_name = client.env, db_name
if db_name:
env._model_names = env._cache_get('model_names', dict)
env._models = {}
return env
def __contains__(self, name):
"""Test wether this model exists."""
return name in self.models(name)
[docs]
def __getitem__(self, name):
"""Return the :class:`Model` for the given ``name``."""
return self._get(name)
def __iter__(self):
"""Return an iterator on model names."""
return iter(self.models())
def __len__(self):
"""Return the size of the model registry."""
return len(self.models())
def __bool__(self):
return True
__eq__ = object.__eq__
__ne__ = object.__ne__
__hash__ = object.__hash__
def __repr__(self):
return f"<Env '{self.user.login if self.uid else ''}@{self.db_name}'>"
def _check_user_password(self, user, password, api_key):
if self.client._object and not self.db_name:
raise Error('Error: Not connected')
assert isinstance(user, str) and user
if user == SYSTEM_USER and self.uid and not password:
info = self.client._authenticate_system()
return info['uid'], password, info
# Read from cache
auth_cache = self._cache_get('auth', dict)
(uid, pwcache) = auth_cache.get(user) or (None, None)
password = password or pwcache
# Ask for password
if not password and not api_key:
password = getpass(f"Password for '{color_bold(user)}': ")
if not self.client._object:
try: # Standard Web or JSON-2 authentication
info = self.client._authenticate(self.db_name, user, password, api_key)
uid = info['uid']
except Exception as exc:
if 'does not exist' in str(exc): # Heuristic
raise Error('Error: Database does not exist')
raise
else: # Login through RPC Service (deprecated)
if not uid:
uid = self.client.common.login(self.db_name, user, api_key or password)
if uid:
args = self.db_name, uid, api_key or password, 'res.users', 'context_get', ()
info = {'uid': uid, 'user_context': self.client._object.execute_kw(*args)}
if not uid: # Failed
if user in auth_cache:
del auth_cache[user]
raise Error('Error: Invalid username or password')
# Discovered database name
if not self.db_name:
self.db_name = info.get('db') or ()
# Set the cache for authentication
self._cache_set('auth', auth_cache)
self.refresh()
if not self.uid:
# Cache the unauthenticated Env and the client
self._cache_set(Env, self)
# Update credentials in cache
auth_cache[user] = uid, password
return uid, password, info
[docs]
def set_api_key(self, api_key, store=True):
"""Configure methods to use an API key."""
def env_auth(method): # Authenticated endpoints
return partial(method, self.db_name, self.uid, api_key)
if self.client.web and self.client.version_info >= 19.0:
self._json2 = Json2(self.client, self.db_name, api_key)._check(self.uid)
self._doc = (self._json2 or self.client.web).doc
try:
self._doc('res.device')
except ServerError:
self._doc = None
prev_protocol = getattr(self, '_execute_kw', ...)
if self.client._object: # RPC endpoint if available
self._execute_kw = env_auth(self.client._object.execute_kw)
self._execute_kw._protocol_name = self.client._proxy._protocol_name
else: # Otherwise, use JSON-2 or WebAPI
self._execute_kw = self._json2 or self._call_kw
if prev_protocol not in (self._execute_kw._protocol_name, ...):
self.client.connect()
self._api_key = api_key if store else None
return api_key
def _configure(self, uid, user, password, api_key, context, session):
env = Env(self.client)
(env.db_name, env.name) = (self.db_name, self.name)
env._model_names = self._model_names
env._models = {}
# Setup uid and user
if isinstance(user, Record):
user = user.login
env.uid = uid
env.user = Record(env._get('res.users', False), uid)
env.context = dict(context)
env.session_info = session
if user:
assert isinstance(user, str), repr(user)
env.user.__dict__['login'] = user
# Set API methods
if uid != self.uid or (api_key and api_key != self._api_key):
env.set_api_key(api_key or password, bool(api_key))
else: # Copy methods
for attr in 'access_models', 'api_key', 'doc', 'execute_kw', 'json2':
setattr(env, f'_{attr}', getattr(self, f'_{attr}'))
return env
@property
def odoo_env(self):
"""Return a server Environment."""
return self.client._server.api.Environment(self.cr, self.uid, self.context)
@property
def cr(self):
"""Return a cursor on the database."""
return self.__dict__.get('cr') or _memoize(self, 'cr', self.registry.cursor())
@property
def registry(self):
"""Return the environment's registry."""
return self.client._server._get_pool(self.db_name)
def __call__(self, user=None, password=None, api_key=None, context=None):
"""Return an environment based on ``self`` with modified parameters."""
if user is not None:
(uid, password, session) = self._check_user_password(user, password, api_key)
if context is None:
context = session.get('user_context') or {}
elif context is not None:
(uid, user, session) = (self.uid, self.user, self.session_info)
else:
return self
env_key = bytes.fromhex(f"{uid:08x}{hash(json.dumps(context, sort_keys=1)) % 2**32:08x}")
if (env := self._cache_get(env_key)) is None:
env = self._configure(uid, user, password, api_key, context, session)
env._cache_set(env_key, env)
return env
[docs]
def sudo(self, user=None):
"""Attach to the provided user, or Superuser."""
if user is None:
if self.client._object or not (self.session_info or {}).get('is_system'):
user = ADMIN_USER
else:
user = SYSTEM_USER
return self(user=user)
[docs]
def ref(self, xml_id):
"""Return the record for the given ``xml_id`` external ID."""
(module, name) = xml_id.split('.')
data = self._get('ir.model.data', False).read(
[('module', '=', module), ('name', '=', name)], 'model res_id')
if data:
assert len(data) == 1
return Record(self[data[0]['model']], data[0]['res_id'])
@property
def lang(self):
"""Return the current language code."""
return self.context.get('lang')
def refresh(self):
db_key, preserve = (self.db_name, self.client._server), ('auth', Env)
for key in [*self._cache]:
if key[1:] == db_key and key[0] not in preserve and self._cache[key] != self:
del self._cache[key]
self._access_models = None
self._model_names = self._cache_set('model_names', {})
self._models = {}
if self._json2:
self._json2._method_params = {'base': dict(_base_method_params)}
def _cache_get(self, key, func=None):
try:
return self._cache[key, self.db_name, self.client._server]
except KeyError:
pass
if func is not None:
return self._cache_set(key, func())
def _cache_set(self, key, value, db_name=None):
self._cache[key, db_name or self.db_name, self.client._server] = value
return value
def _is_identitycheck(self, result):
return hasattr(result, 'get') and result.get('res_model') == 'res.users.identitycheck'
def _identitycheck(self, result):
assert self.client.version_info >= 14.0, f'Not supported: Odoo {self.client.version_info}'
idcheck = self._get(result['res_model'], transient=True).get(result['res_id'])
password = self._cache_get('auth')[self.user.login][1]
result = None
while not result:
try:
password = password or getpass(f"Password for '{color_bold(self.user.login)}': ")
except (KeyboardInterrupt, EOFError):
print()
raise Error("Security Control - FAILED")
if self.client.version_info < 19.0:
idcheck.password = password
try:
# Odoo >= 19 read from context
result = idcheck.with_context(password=password).run_check()
except ServerError as exc:
error = exc.args[0]['data']
if not self.client._is_interactive() or error['name'] != 'odoo.exceptions.UserError':
raise
password = None
print(error['message'])
if self.client._is_interactive():
print("Security Control - PASSED")
return result
[docs]
def _call_kw(self, model, method, args, kw=None):
if self.uid != self.client._session_uid:
password = self._cache_get('auth')[self.user.login][1]
if self.user.login == SYSTEM_USER and not password:
self.client._authenticate_system()
else:
self.client._authenticate_session(self.db_name, self.user.login, password)
return self.client.web_dataset.call_kw(model=model, method=method, args=args, kwargs=kw or {})
_call_kw._protocol_name = 'Web API'
[docs]
def execute(self, obj, method, *params, **kwargs):
"""Wrapper around ``/web/dataset/call_kw`` Webclient endpoint,
or ``/json/2`` API endpoint or ``object.execute_kw`` RPC method.
Argument `method` is the name of a standard ``Model`` method
or a specific method available on this `obj`.
Method `params` are accepted. If needed, keyword
arguments are collected in `kwargs`.
"""
assert self.uid, 'Not connected'
assert isinstance(obj, str) and isinstance(method, str) and method != 'browse'
order_ids = single_id = False
if method == 'read':
assert params, 'Missing parameter'
if not isinstance(params[0], list):
single_id = True
ids = [params[0]] if params[0] else False
elif params[0] and issearchdomain(params[0]):
# Combine search+read
method, [ids] = 'search_read', searchargs(params[:1])
else:
order_ids = kwargs.pop('order', False) and params[0]
ids = sorted({*params[0]} - {False})
if not ids and order_ids:
return [False] * len(order_ids)
if not ids:
return ids
params = (ids,) + params[1:]
elif method == 'search':
# Accept keyword arguments for the search method
params = searchargs(params, kwargs)
elif method == 'search_count':
params = searchargs(params)
elif method == 'search_read':
params = searchargs(params[:1]) + params[1:]
kw = (({**kwargs, 'context': self.context},)
if self.context else (kwargs and ({**kwargs},) or ()))
if self.client._use_call_button and method in ('create', 'write', 'unlink'):
return self.client.web_dataset.call_button(model=obj, method=method, args=params, kwargs=kw[0])
res = self._execute_kw(obj, method, params, *kw)
if self._is_identitycheck(res):
res = self._identitycheck(res)
if order_ids:
# Results were not in the same order as the IDs
# in case of missing records or duplicate ID
resdic = {val['id']: val for val in res}
res = [resdic.get(id_, False) for id_ in order_ids]
return (res or [None])[0] if single_id else res
[docs]
def access(self, model_name, mode="read"):
"""Check if the user has access to this model.
Optional argument `mode` is the access mode to check. Valid values
are ``read``, ``write``, ``create`` and ``unlink``. If omitted,
the ``read`` mode is checked. Return a boolean.
"""
try:
self.execute('ir.model.access', 'check', model_name, mode)
return True
except Exception:
return False
def _models_get(self, name, check, transient):
if name not in self._model_names:
if check:
raise KeyError(name)
self._model_names[name] = transient
try:
return self._models[name]
except KeyError:
self._models[name] = Model._new(self, name)
return self._models[name]
[docs]
def models(self, name='', transient=False):
"""Search Odoo models.
The argument `name` is a pattern to filter the models returned.
If omitted, all models are returned.
The return value is a sorted list of model names.
"""
if self._access_models is None:
ir_model = self._get('ir.model', False)
domain = [('abstract', '=', False)] if 'abstract' in ir_model._keys else [] # Odoo 19
try:
models = ir_model.search_read(domain, ('model', 'transient'))
except ServerError:
# Only Odoo 15 prevents non-admin user to retrieve models
models = ir_model.get_available_models() if self.client.version_info >= 16.0 else {}
self._model_names.update({m['model']: m.get('transient', False) for m in models})
self._access_models = bool(models)
return sorted(mod for mod, is_transient in self._model_names.items()
if name in mod and transient == is_transient)
def _get(self, name, check=True, transient=False):
"""Return a :class:`Model` instance.
The argument `name` is the name of the model. If the optional
argument `check` is :const:`False`, no validity check is done.
"""
check = check and not transient and self._access_models is not False
try:
return self._models_get(name, check, transient=transient)
except KeyError:
model_names = self.models(name)
if name in self._model_names or not self._access_models:
return self._models_get(name, self._access_models, transient=transient)
if model_names:
errmsg = 'Model not found. These models exist:'
else:
errmsg = f'Model not found: {name}'
raise Error('\n * '.join([errmsg] + model_names))
[docs]
def modules(self, name='', installed=None):
"""Return a dictionary of modules.
The optional argument `name` is a pattern to filter the modules.
If the boolean argument `installed` is :const:`True`, the modules
which are "Not Installed" or "Not Installable" are omitted. If
the argument is :const:`False`, only these modules are returned.
If argument `installed` is omitted, all modules are returned.
The return value is a dictionary where module names are grouped in
lists according to their ``state``.
"""
if isinstance(name, str):
domain = [('name', 'like', name)]
else:
domain = name
if installed is not None:
op = 'not in' if installed else 'in'
domain.append(('state', op, ['uninstalled', 'uninstallable']))
ir_module = self._get('ir.module.module', False)
if mods := ir_module.read(domain, 'name state'):
res = {}
for mod in mods:
if mod['state'] not in res:
res[mod['state']] = []
res[mod['state']].append(mod['name'])
return res
def _upgrade(self, modules, button, quiet):
# First, update the list of modules
ir_module = self._get('ir.module.module', False)
updated, added = ir_module.update_list()
if added:
print(f'{added} module(s) added to the list')
# Find modules
sel = ir_module.search([('name', 'in', modules)])
mods = ir_module.read([_pending_state], 'name state')
if sel:
# Safety check
if any(mod['name'] not in modules for mod in mods):
raise Error('Pending actions:\n' + '\n'.join(
f" {mod['state']}\t{mod['name']}" for mod in mods))
if button == 'button_uninstall':
# Safety check
names = ir_module.read([('id', 'in', sel.ids),
'state != installed',
'state != to upgrade',
'state != to remove'], 'name')
if names:
raise Error(f"Not installed: {', '.join(names)}")
# Click upgrade/install/uninstall button
if button != 'cancel':
self.execute('ir.module.module', button, sel.ids)
mods = ir_module.read([_pending_state], 'name state')
if not mods:
if sel:
print(f"Already up-to-date: {self.modules([('id', 'in', sel.ids)])}")
elif modules:
raise Error(f"Module(s) not found: {', '.join(modules)}")
print(f'{updated} module(s) updated')
return
print(f'{len(sel)} module(s) selected')
print(f'{len(mods)} module(s) to process:')
for mod in mods:
print(f" {mod['state']}\t{mod['name']}")
# Confirm?
if not quiet and any(mod['id'] not in sel.ids for mod in mods):
assert self.client._is_interactive(), "Cannot continue"
if not (ans := input('Confirm? [y/N] ')) or ans[:1].lower() != 'y':
button = 'cancel'
if button == 'cancel':
# Reset module state
if self.client.version_info < 19.0:
installed = [mod['id'] for mod in mods if mod['state'] != 'to install']
uninstalled = [mod['id'] for mod in mods if mod['state'] == 'to install']
if uninstalled:
self.execute('ir.module.module', 'button_install_cancel', uninstalled)
if installed:
self.execute('ir.module.module', 'button_upgrade_cancel', installed)
else: # Odoo >= 19
self.execute('ir.module.module', 'button_reset_state')
else:
# Apply scheduled upgrades
self.execute('base.module.upgrade', 'upgrade_module', [])
# Empty the cache for this database
self.refresh()
[docs]
def upgrade(self, *modules, quiet=False):
"""Press button ``Upgrade``."""
return self._upgrade(modules, button='button_upgrade', quiet=quiet)
[docs]
def install(self, *modules, quiet=False):
"""Press button ``Install``."""
return self._upgrade(modules, button='button_install', quiet=quiet)
[docs]
def uninstall(self, *modules, quiet=False):
"""Press button ``Uninstall``."""
return self._upgrade(modules, button='button_uninstall', quiet=quiet)
[docs]
def upgrade_cancel(self):
"""Press button ``Cancel Upgrade/Install/Uninstall``."""
return self._upgrade((), button='cancel', quiet=True)
[docs]
def session_authenticate(self, login=None, password=None):
"""Create a Webclient session for current user."""
if not login:
login = self.user.login
params = {
'db': self.db_name,
'login': login,
'password': password or getpass(f"Password for '{color_bold(login)}': "),
}
self.session_info = self.client._authenticate_session(**params)
# When database name is discovered, copy cached data
if not self.db_name and (not self.uid or login == self.user.login) and self.session_info.get('db'):
empty_db_key = (self.db_name, self.client._server)
self.db_name = self.session_info['db']
for key in [*self._cache]:
if key[1:] == empty_db_key:
self._cache_set(key[0], self._cache[key])
print(f'Session authenticated for {login!r}' if self.session_info['uid'] else 'Failed')
[docs]
def session_destroy(self):
"""Terminate current Webclient session."""
self.client._session_uid = None
try:
return self.client.web_session.destroy()
except ServerError as exc:
# Ignore: odoo.http.SessionExpiredException
if exc.args[0]['code'] != 100:
raise
[docs]
def generate_api_key(self):
"""Generate an API Key and configure environment to use it.
Caution: API Key is not saved. It can be set in the
configuration: ``api_key = ...``.
"""
assert self.client.version_info >= 14.0, f'Not supported: Odoo {self.client.version_info}'
key_vals = {'name': f'Created by Odooly {__version__}'}
wiz_model = self._get("res.users.apikeys.description", transient=True)
res = wiz_model.create(key_vals).make_key()
self.user._invalidate_cache()
self.refresh() # Remove cached Envs
assert res['res_model'] == "res.users.apikeys.show"
return self.set_api_key(res['context']['default_key'])
def _get_sql_action(self, _external_id="__odooly__.sql"):
act_model = self._get('ir.actions.server', False)
if not (action := act_model.get(_external_id)):
logg_model = self._get('ir.model', False).get('base.model_ir_logging')
values = {'name': 'SQL Execute', 'state': 'code', 'model_id': logg_model.id}
(action := act_model.create(values).ensure_one())._set_external_id(_external_id)
return action.with_context(lang=None)
[docs]
def sql(self, queries):
"""Execute SQL commands on the PostgreSQL server."""
qlist = []
for query in queries.split(";"):
if any(li.split('--', 1)[0].strip() for li in query.splitlines()):
qlist.append(query.strip())
if not qlist:
return None
vals = {"name": f"SQL Execute - {datetime.datetime.now()}"}
if (sql_action := self._get_sql_action()).code != _sql_action_code:
vals["code"] = _sql_action_code
sql_action.write(vals)
sql_action.with_context(__sql=qlist).run()
logg = self._get('ir.logging', False).get([f"func = {vals['name']}"])
return eval(logg.message, {"datetime": datetime}) if logg else None
[docs]
class Client:
"""Connection to an Odoo instance.
This is the top level object.
The `server` is the URL of the instance, like ``http://localhost:8069``.
If `server` is an ``odoo`` Python package, it is used to connect to the local server.
The `db` is the name of the database and the `user` should exist in the
table ``res.users``. If the `password` is not provided, it will be
asked on login.
"""
_config_file = CONF_FILE
_saved_config = {}
_globals = None
_use_call_button = False
def __init__(self, server, db=None, user=None, password=None, api_key=None, verbose=False):
self._http = HTTPSession()
self._printer = Printer()
self._session_uid = None
self.verbose = verbose
self._set_services(server, db)
self.env = Env(self)
if user: # Try to login
self.login(user, password=password, api_key=api_key, database=db)
@property
def verbose(self):
return self._printer.width
@verbose.setter
def verbose(self, cols):
cols = MAXCOL[min(3, cols) - 1] if (cols or 9) < 9 else cols
self._printer.width = cols and max(36, cols) or None
def _set_services(self, server, db):
if isinstance(server, list):
appname = Path(__file__).name.rstrip('co')
server = start_odoo_services(server, appname=appname)
elif isinstance(server, str):
rsvr = urlsplit(server)
if "@" in rsvr.netloc:
[username, password] = rsvr._userinfo
rsvr = rsvr._replace(netloc=rsvr.netloc.rsplit("@", 1)[1])
self._http.set_auth(server, username, password)
if rsvr.path[-1:] == '/':
rsvr = rsvr._replace(path=rsvr.path.rstrip('/'))
server = rsvr.geturl()
self._server = server
if not isinstance(server, str):
self._proxy = self._proxy_odoo
elif '/jsonrpc' in server:
self._proxy = self._proxy_jsonrpc
else:
if not db:
# Resolve redirects
__, server = self._request_parse(server, method='HEAD')
if not server.endswith('/web'):
server = urljoin(server, '/web')
self._server = server
self._proxy = self.common = self._object = None
if isinstance(server, str):
self.web = WebAPI(self, 'web', ())
self.web.doc = WebAPI(self, 'doc', ())._request # Odoo 19
def get_web_api(name):
return WebAPI(self, f'web/{name}', _web_methods[name][:])
self.database = get_web_api('database')
self.web_dataset = get_web_api('dataset')
self.web_session = get_web_api('session')
self.web_webclient = get_web_api('webclient')
else:
self.web = None
if self._proxy is None:
self.server_version = self.web_webclient.version_info()['server_version']
else:
# Create the RPC services
self.common = Service(self, 'common', _rpc_methods['common'])
self._object = Service(self, 'object', _rpc_methods['object'])
self.server_version = self.common.version()['server_version']
# Parse server version
major_minor = re.search(r'\d+\.?\d*', self.server_version).group()
self.version_info = float(major_minor)
assert self.version_info > 8.0, f'Not supported: Odoo {major_minor}'
def _request_parse(self, path, *, method=None, data=None, headers=None, regex=None):
verb = method or ('GET' if data is None else 'POST')
url = urljoin(self._server, path)
if not self._printer:
res = self._http.request(url, data=data, headers=headers, method=verb)
return res, extract_http_response(verb, res, regex)
with self._printer as log:
log.print_sent(' '.join([verb, path] + format_params(data or {})))
res = self._http.request(url, data=data, headers=headers, method=verb)
parsed = extract_http_response(verb, res, regex)
log.print_recv(str(parsed))
return res, parsed
def _post_jsonrpc(self, endpoint='', params=None):
req_id = f"{os.getpid():04x}{int(time.time() * 1E6) % 2**40:010x}"
payload = {'jsonrpc': '2.0', 'method': 'call', 'params': params or {}, 'id': req_id}
resp = self._http.request(urljoin(self._server, endpoint), json=payload)
if r_error := resp.get('error'):
raise ServerError(r_error)
return resp.get('result')
def _proxy_odoo(self, name):
return partial(self._server.http.dispatch_rpc, name)
_proxy_odoo._protocol_name = 'Odoo'
def _proxy_jsonrpc(self, name):
def dispatch_jsonrpc(method, args):
return self._post_jsonrpc(params={'service': name, 'method': method, 'args': args})
return dispatch_jsonrpc
_proxy_jsonrpc._protocol_name = 'JSON-RPC'
def _proxy_web(self, name):
if name == 'doc':
def dispatch_web(model, params):
return self._http.request(urljoin(self._server, f"doc/{model}.json"), method='GET')
elif name == 'web/database':
def dispatch_web(method, params):
if method != 'list':
return self._http.request(urljoin(self._server, f"{name}/{method}"), data=params)
return self._post_jsonrpc(f"{name}/{method}", params=params)
else:
def dispatch_web(method, params):
if method == 'call_kw' and name == 'web/dataset':
method = f"{method}/{params['model']}/{params['method']}"
return self._post_jsonrpc(f"{name}/{method}", params=params)
return dispatch_web
[docs]
def save(self, environment=None, skip=False):
"""Save environment settings with this name, or current name"""
self.env.name = environment or self.env.name or self.env.db_name
if not skip and self.env.uid:
config = (self._server, self.env.db_name, self.env.user.login, None, self.env._api_key)
self._saved_config[self.env.name] = config
if self._globals and self._globals.get('client', self) is self:
self._set_prompt()
return self
[docs]
@classmethod
def get_config(cls, environment):
"""Retrieve the settings for this environment.
It can be in memory, if it was saved before with :meth:`Client.save`.
If not, it will parse ``odooly.ini`` file, where it searches for the
section ``[ <environment> ]``.
See :func:`read_config` for details of the configuration file format.
"""
assert environment
if environment not in cls._saved_config:
try:
cls._saved_config[environment] = read_config(environment)
except Exception:
envs = [*cls._saved_config]
if cls._config_file.exists():
envs += [env for env in read_config() if env not in cls._saved_config]
raise Error("No such environment.\nAvailable: " + ", ".join(envs))
return cls._saved_config[environment]
[docs]
@classmethod
def from_config(cls, environment, user=None, verbose=False):
"""Create a connection to a defined environment.
See :meth:`Client.get_config`
Return a connected :class:`Client`.
"""
(server, db, conf_user, password, api_key) = cls.get_config(environment)
if skip_save := user and user != conf_user:
password = None
try:
client = Env._cache[Env, db, server].client
client.verbose = verbose
client.login(user or conf_user, password=password, api_key=api_key)
except KeyError:
client = cls(server, db, user or conf_user, password=password, api_key=api_key, verbose=verbose)
return client.save(environment, skip=skip_save)
def __repr__(self):
return f"<Client '{self._server}?db={self.env.db_name or ''}'>"
def _authenticate(self, db, login, password, api_key):
if api_key and not password and self.version_info >= 19.0:
json2_api = Json2(self, db, api_key)
context = json2_api('res.users', 'context_get', ())
info = {'uid': context['uid'], 'user_context': context, 'db': db}
elif self.web:
info = self._authenticate_session(db, login, password)
else:
raise Error("Error: Cannot authenticate")
return info
def _authenticate_session(self, db, login, password):
info = {'uid': None}
try:
if db:
info = self.web_session.authenticate(db=db, login=login, password=password)
if self.version_info >= 15.0 and info['uid'] is None: # Is it 2FA?
info = self._authenticate_web(db=db, login=login, password=password)
else:
info = self._authenticate_web(login=login, password=password)
self._session_uid = info.get('uid')
except TypeError:
pass # Cannot extract `csrf_token` or `session_info` with Regex
except ServerError as exc:
# Ignore: odoo.exceptions.AccessDenied
if exc.args[0]['code'] not in (0, 200):
raise
return info
def _authenticate_web(self, **kw):
# 1. Get CSRF token
qs = f"?{urlencode(dict(db=kw['db']))}" if "db" in kw else ""
__, csrf = self._request_parse('/web' + qs, regex=r'csrf_token: "(\w+)"')
# 2. Login
rv, session_info = self._request_parse('/web/login', data={'csrf_token': csrf, **kw})
for retry in range(4):
# 3. Parse 'session_info'
if 'user_id' in session_info and 'uid' not in session_info: # Odoo < 18
session_info['uid'] = session_info['user_id']
if retry and not session_info['uid']:
print('Verification failed')
if session_info['uid'] or 'totp_token' not in rv or retry == 3:
break
# 4. Ask TOTP code
token = getpass(f"Authentication Code for '{color_bold(kw['login'])}' (2FA 6-digits): ")
# 5. Submit TOTP
params = {'csrf_token': csrf, 'totp_token': token, 'remember': 1}
rv, session_info = self._request_parse('/web/login/totp', data=params)
return session_info if session_info.get('username') == kw['login'] else {'uid': None}
def _authenticate_system(self):
__, session_info = self._request_parse('/web/become')
self._session_uid = session_info.get('uid')
if self._session_uid != 1:
raise Error("Cannot become Superuser")
return session_info
def _select_database(self, db_list, limit=20):
if len(db_list) == 1:
return db_list[0]
if not db_list or not self._is_interactive():
return
print('Available databases:')
for idx, name in enumerate(db_list[:limit], start=1):
print(f' {idx}. {name!r}')
if len(db_list) > limit:
print(' ...')
print()
while db_list:
ans = input('Select a database: ')
try:
return db_list[int(ans) - 1]
except Exception:
pass
def _login(self, user, password=None, database=None, api_key=None):
"""Switch `user` and (optionally) `database`.
If the `password` is not available, it will be asked.
"""
env = self.env
if not env.db_name or (database and env.db_name != database):
try:
dbs = self.database.list()
except Exception:
pass # AccessDenied: simply ignore this check
else:
if not database:
# Database selector page
database = self._select_database(dbs)
elif dbs and database not in dbs:
raise Error(f"Database {database!r} does not exist: {dbs}")
if database and env.db_name != database:
if self._session_uid:
env.session_destroy()
env = Env(self, database)
try:
self.env = env(user=user, password=password, api_key=api_key)
except Exception:
raise
finally:
# Used for logging, copied from odoo.sql_db.db_connect
current_thread().dbname = self.env.db_name
return self.env.uid
[docs]
def login(self, user, password=None, database=None, api_key=None):
"""Switch `user` and (optionally) `database`."""
if not self._is_interactive():
return self._login(user, password=password, database=database, api_key=api_key)
try:
register = self._login(user, password=password, database=database, api_key=api_key)
except Error as exc:
print(exc)
register = 'client' not in self._globals
# Register the globals()
register and self.connect()
def connect(self, env_name=None, *, server=None, database=None, user=None):
"""Connect to another environment and replace the globals()."""
assert self._is_interactive(), 'Not available'
if env_name:
self.from_config(env_name, user=user, verbose=self.verbose)
elif server:
if not user and self.env.uid:
user = self.env.user.login
Client(server, db=database, user=user, verbose=self.verbose)
else:
assert not user, f"Use client.login({user!r}) instead"
self._globals['client'] = self.env.client
self._globals['env'] = env = self.env
self._set_prompt()
# Logged in?
if env.uid:
print(f"Logged in as '{color_bold(env.user.login)}' with {env._execute_kw._protocol_name}")
def _set_prompt(self):
# Tweak prompt
sys.ps1 = f'{self.env.name or self.env.db_name} >>> '
sys.ps2 = '... '.rjust(len(sys.ps1))
@classmethod
def _set_interactive(cls, global_vars={}):
global color_bold, color_comment, color_py, color_repr
# Don't call multiple times
del Client._set_interactive
assert not cls._is_interactive()
for name in ['__name__', '__version__', '__doc__', 'Client']:
global_vars[name] = globals()[name]
try: # Python >= 3.14
from _pyrepl.utils import disp_str, gen_colors, _colorize
color_py = color_repr = lambda v: "".join(disp_str(v, colors=[*gen_colors(v)])[0])
color_py.__name__ = color_py.__qualname__ = 'color_python'
color_bold = color_py('def _').split()[1].replace('_', '{}').format
color_comment = color_py('#').replace('#', '{}').format
global_vars |= {'color_py': color_py, 'decolor': _colorize.decolor}
except ImportError:
pass
cls._globals = global_vars
return global_vars
@classmethod
def _is_interactive(cls):
return cls._globals is not None
[docs]
def create_database(self, passwd, database, demo=False, lang='en_US',
user_password=ADMIN_USER, login=ADMIN_USER,
country_code=None):
"""Create a new database.
The superadmin `passwd` and the `database` name are mandatory.
By default, `demo` data are not loaded, `lang` is ``en_US``
and no country is set into the database.
Login if successful.
"""
self.database.create(master_pwd=passwd, name=database, lang=lang,
password=user_password, demo=demo, login=login,
country_code=country_code, phone='')
return self.login(login, user_password, database=database)
[docs]
def clone_database(self, passwd, database, neutralize_database=False):
"""Clone the current database.
The superadmin `passwd` and `database` are mandatory.
Login if successful.
"""
extra = {"neutralize_database": neutralize_database} if neutralize_database else {}
if extra and self.version_info < 16.0:
raise Error("Argument 'neutralize_database' is not supported")
self.database.duplicate(master_pwd=passwd, name=self.env.db_name,
new_name=database, **extra)
# Copy the cache for authentication
auth_cache = self.env._cache_get('auth')
self.env._cache_set('auth', {**auth_cache}, db_name=database)
# Login with the current user into the new database
auth_args = {
'password': auth_cache[self.env.user.login][1],
'api_key': self.env._api_key,
'database': database,
}
return self.login(self.env.user.login, **auth_args)
[docs]
def drop_database(self, passwd, database):
"""Drop the database.
The superadmin `passwd` and `database` are mandatory.
"""
if not database or database == self.env.db_name:
raise Error("Failed - Cannot delete active database")
self.database.drop(master_pwd=passwd, name=database)
if database in self.database.list():
raise Error("Failed - Database was not deleted")
class BaseModel:
def sudo(self, user=None):
"""Attach to the provided user, or Superuser."""
return self.with_env(self.env.sudo(user=user))
def with_context(self, *args, **kwargs):
"""Attach to an extended context."""
context = dict(args[0] if args else self.env.context, **kwargs)
return self.with_env(self.env(context=context))
def with_odoo(self):
"""Attach to an ``odoo.api.Environment``.
Use same (db_name, uid, context) as current ``Env``.
Only available in ``local`` mode.
"""
return self.with_env(self.env.odoo_env)
[docs]
class Model(BaseModel):
"""The class for Odoo models."""
def __new__(cls, env, name):
return env[name]
@classmethod
def _new(cls, env, name):
m = object.__new__(cls)
(m.env, m._name) = (env, name)
m._execute = partial(env.execute, name)
return m
def __repr__(self):
return f"<Model '{self._name}'>"
[docs]
def with_env(self, env):
"""Attach to the provided environment."""
return env[self._name]
[docs]
def keys(self):
"""Return the keys of the model."""
return self._keys
[docs]
def fields(self, names=None, attributes=None):
"""Return a dictionary of the fields of the model.
Optional argument `names` is a sequence of field names or
a space separated string of these names.
If omitted, all fields are returned.
Optional argument `attributes` is a sequence of attributes
or a space separated string of these attributes.
If omitted, all attributes are returned.
"""
if isinstance(names, str):
names = names.split()
if isinstance(attributes, str):
attributes = attributes.split()
if names is None:
if attributes is None:
return self._fields
return {fld: {att: val
for (att, val) in vals.items() if att in attributes}
for (fld, vals) in self._fields.items()}
if attributes is None:
return {fld: vals
for (fld, vals) in self._fields.items() if fld in names}
return {fld: {att: val
for (att, val) in vals.items() if att in attributes}
for (fld, vals) in self._fields.items() if fld in names}
[docs]
def field(self, name):
"""Return the field properties for field `name`."""
return self._fields[name]
@staticmethod
def _parse_doc_methods(doc_dict):
methods = doc_dict.get('methods') or {}
result = []
for key, vals in methods.items():
arg_names = [*vals['parameters']]
if 'model' not in vals.get('api', ()):
arg_names.insert(0, 'ids')
result.append((key, arg_names))
return result
[docs]
def _methods(self, name=''):
"""List methods and arguments.
Argument `name` is a pattern to filter the methods returned.
If omitted, all methods are returned.
"""
try:
method_params = self._parse_doc_methods(self._doc)
except Exception:
method_params = _base_method_params
return {key: args for (key, args) in method_params if name in key}
[docs]
def access(self, mode="read"):
"""Check if the user has access to this model.
Optional argument `mode` is the access mode to check. Valid values
are ``read``, ``write``, ``create`` and ``unlink``. If omitted,
the ``read`` mode is checked. Return a boolean.
"""
return self.env.access(self._name, mode)
[docs]
def browse(self, ids=()):
"""Return a :class:`Record` or a :class:`RecordList`.
The argument `ids` accepts a single integer ``id`` or a list of ids.
If it is a single integer, the return value is a :class:`Record`.
Otherwise, the return value is a :class:`RecordList`.
"""
if isinstance(ids, int) or (len(ids) == 2 and isinstance(ids[1], str)):
return Record(self, ids)
return RecordList(self, ids)
[docs]
def search(self, domain, **kwargs):
"""Search for records in the `domain`."""
if kwargs.get('count'):
return self.search_count(domain)
return RecordList._prepared(self, domain, kwargs)
[docs]
def search_count(self, domain=None):
"""Count the records in the `domain`."""
return self._execute('search_count', domain or [])
def search_read(self, domain=None, fields=None, **kwargs):
"""Combine search and read."""
fields, fmt = self._parse_format(fields, browse=False)
res = self._execute('search_read', domain or [], fields, **kwargs)
return fmt(res)
[docs]
def get(self, domain, *args, **kwargs):
"""Return a single :class:`Record`.
The argument `domain` accepts a single integer ``id`` or a search
domain, or an external ID ``xml_id``. The return value is a
:class:`Record` or None. If multiple records are found,
a ``ValueError`` is raised.
"""
if args or kwargs:
# Passthrough for env['ir.default'].get and alike
return self._execute('get', domain, *args, **kwargs)
if isinstance(domain, int): # a single id
return Record(self, domain)
if isinstance(domain, str): # lookup the xml_id
rec = self.env.ref(domain)
if not rec:
return None
assert rec._model is self, f'Model mismatch {rec!r} {self!r}'
return rec
assert issearchdomain(domain) # a search domain
if len(ids := self._execute('search', domain)) > 1:
raise ValueError(f'domain matches too many records ({len(ids)})')
return Record(self, ids[0]) if ids else None
[docs]
def create(self, values):
"""Create one :class:`Record` or many.
The argument `values` is a dictionary of values which are used to
create the record. Relationship fields `one2many` and `many2many`
accept either a list of ids or a RecordList or the extended Odoo
syntax. Relationship fields `many2one` and `reference` accept
either a Record or the Odoo syntax. It can create multiple records.
The newly created :class:`Record` is returned, or :class:`RecordList`.
"""
if hasattr(values, "items"):
values, nb = self._unbrowse_values(values), 1
else: # Odoo >= 12
values, nb = [self._unbrowse_values(vals) for vals in values], len(values)
new_ids = self._execute('create', values)
if not new_ids and self.env.client._use_call_button:
return self.search([], order='id DESC', limit=nb)
return self.browse(new_ids)
[docs]
def read(self, *params, **kwargs):
"""Wrapper for ``client.execute(model, 'read', [...], ('a', 'b'))``.
The first argument is a list of ids ``[1, 3]`` or a single id ``42``
or a search domain.
The second argument, `fields`, accepts:
- a single field: ``'first_name'``
- a tuple of fields: ``('street', 'city')``
- a space separated list: ``'street city'``
- a format string: ``'{street} {city}'``
If `fields` is omitted, all fields are read.
If `domain` is a single id, then:
- return a single value if a single field is requested.
- return a string if a format spec is passed in the `fields` argument.
- else, return a dictionary.
If `domain` is not a single id, the returned value is a list of items.
Each item complies with the rules of the previous paragraph.
The optional keyword arguments `offset`, `limit` and `order` are
used to restrict the search.
"""
arg = params[1] if len(params) > 1 else None
fields, fmt = self._parse_format(arg, browse=False)
if arg is not None:
params = (params[0], fields) + params[2:]
res = self._execute('read', *params, **kwargs)
return fmt(res) if isinstance(res, list) else fmt([res])[0]
def _parse_format(self, arg, browse=True):
if not isinstance(arg, str):
fields, formatter = arg, None
elif '}' in arg:
fields = [re.match(r'\w+', tup[1]).group(0)
for tup in Formatter().parse(arg) if tup[1]]
formatter = arg.format_map
else: # transform: "zip city" --> ["zip", "city"]
fields = arg.split()
formatter = (lambda d: d[fields[0]]) if len(fields) == 1 else None
lst_format = lambda values: [(val and formatter(val)) for val in values]
if browse:
if not formatter:
formatter = self._browse_values
elif fields == arg.split():
if 'relation' in (fspec := self.field(fields[0])):
rel_model = self.env._get(fspec['relation'], False)
if fspec['type'] == 'many2one':
m_browse = partial(RecordList, rel_model)
else:
def m_browse(values):
if not values:
return RecordList(rel_model, ())
return [RecordList(rel_model, val or ()) for val in values]
def lst_format(values):
return m_browse([(val and formatter(val)) for val in values])
elif fspec['type'] == 'reference':
def formatter(dic):
if not (value := dic[fields[0]]):
return value
(res_model, res_id) = value.split(',')
rel_model = self.env._get(res_model, False)
return Record(rel_model, int(res_id))
elif not formatter:
lst_format = lambda values: values
return fields, lst_format
def _browse_values(self, values):
"""Wrap the values of a Record.
The argument `values` is a dictionary of values read from a Record.
When the field type is relational (many2one, one2many or many2many),
the value is wrapped in a Record or a RecordList.
Return a dictionary with the same keys as the `values` argument.
"""
for (key, value) in values.items():
if key == 'id' or value is False or hasattr(value, 'id'):
continue
if (field := self._fields[key])['type'] == 'reference':
(res_model, res_id) = value.split(',')
value = int(res_id)
elif 'relation' in field:
res_model = field['relation']
else:
continue
rel_model = self.env._get(res_model, False)
values[key] = rel_model.browse(value)
return values
def _unbrowse_values(self, values):
"""Unwrap the id of Record and RecordList."""
new_values = values.copy()
for (key, value) in values.items():
many = self._fields[key]['type'] in ('one2many', 'many2many')
if hasattr(value, 'id'):
if self._fields[key]['type'] == 'reference':
new_values[key] = f'{value._name},{value.id}'
else:
fld_model = self._fields[key].get('relation')
if fld_model and value._name != fld_model:
raise TypeError(f"Mixing apples and oranges: {fld_model} <> {value._name}")
new_values[key] = value = value.id and (value.ids if many else int(value))
if many:
if not value:
new_values[key] = [(6, 0, [])]
elif isinstance(value[0], int):
new_values[key] = [(6, 0, value)]
return new_values
[docs]
def _get_external_ids(self, ids=None):
"""Retrieve the External IDs of the records.
Return a dictionary with keys being the fully qualified
External IDs, and values the ``Record`` entries.
"""
search_domain = [('model', '=', self._name)]
if ids is not None:
search_domain.append(('res_id', 'in', ids))
existing = self.env._get('ir.model.data', False).read(search_domain, 'module name res_id')
return {f"{rec['module']}.{rec['name']}": self.get(rec['res_id']) for rec in existing}
def __getattr__(self, attr):
if attr == '_fields':
if (vals := self.env._cache_get((attr, self._name))) is None:
vals = self._doc['fields'] if self.__dict__.get('_doc') else self._execute('fields_get')
self.env._cache_set((attr, self._name), vals)
return _memoize(self, attr, vals)
if attr == '_keys':
return _memoize(self, attr, sorted(self._fields))
if attr == '_doc':
return _memoize(self, attr, self.env._doc(self._name) if self.env._doc else None)
if attr.startswith('_'):
raise AttributeError(f"'Model' object has no attribute {attr!r}")
def wrapper(self, *params, **kwargs):
"""Wrapper for client.execute({!r}, {!r}, *params, **kwargs)."""
return self._execute(attr, *params, **kwargs)
return _memoize(self, attr, wrapper, (self._name, attr))
class BaseRecord(BaseModel):
def __init__(self, res_model, arg):
attrs = {'_name': res_model._name, '_model': res_model,
'env': res_model.env, '_execute': res_model._execute}
# Bypass __setattr__ method
self.__dict__.update(attrs)
def __repr__(self):
ids = f'length={len(self.ids)}' if len(self.ids) > 6 else self.id
return f"<{self.__class__.__name__} '{self._name},{ids}'>"
def __bool__(self):
return bool(self.ids)
def __len__(self):
return len(self.ids)
def __getitem__(self, key):
idname = self._idnames[key]
return self._model.browse(idname) if idname is not False else False
def __iter__(self):
yield from (Record(self._model, idname) for idname in self._idnames)
def __contains__(self, item):
if isinstance(item, BaseRecord):
item._check_model(self, 'in')
return len(item) == 1 and item.ids[0] in self.ids
def __add__(self, other):
return self.concat(other)
def __sub__(self, other):
self._check_model(other, '-')
other_ids = {*other.ids}
ids = [idn for (id_, idn) in zip(self.ids, self._idnames)
if id_ not in other_ids]
return RecordList(self._model, ids)
def __and__(self, other):
self._check_model(other, '&')
other_ids = {*other.ids}
self_set = self.union()
ids = [idn for (id_, idn) in zip(self_set.ids, self_set._idnames)
if id_ in other_ids]
return RecordList(self._model, ids)
def __or__(self, other):
return self.union(other)
def __eq__(self, other):
return (self.__class__ is other.__class__ and
self.ids == other.ids and self._model is other._model)
def __ne__(self, other):
return not self == other
def __lt__(self, other):
self._check_model(other, '<')
return {*self.ids} < {*other.ids}
def __le__(self, other):
self._check_model(other, '<=')
return {*self.ids}.issubset(other.ids)
def __gt__(self, other):
self._check_model(other, '>')
return {*self.ids} > {*other.ids}
def __ge__(self, other):
self._check_model(other, '>=')
return {*self.ids}.issuperset(other.ids)
def __int__(self):
return int(self.ensure_one().id)
@property
def _keys(self):
return self._model._keys
@property
def _fields(self):
return self._model._fields
def _methods(self, name=''):
return self._model._methods(name)
def _invalidate_cache(self):
pass
def ensure_one(self):
"""Return the single record in this recordset.
Raise a ValueError it recordset has more records or is empty.
"""
if self.id and not isinstance(self.id, list):
return self
if len((recs := self.union()).id) == 1:
return recs[0]
raise ValueError(f"Expected singleton: {self}")
def exists(self):
"""Return a subset of records that exist."""
if self.env.client.version_info < 19.0:
method, arg = 'exists', self.union().ids
else:
# Beware that it might be wrong, if `search` method is overloaded.
# This is the case for 'ir.attachment' for example.
method, arg = 'search', [('id', 'in', self.union().ids)]
ids = self.ids and self._execute(method, arg, context={'active_test': False})
if ids and not isinstance(self.id, list):
ids = ids[0]
return self._model.browse(ids)
def get_metadata(self):
"""Read the metadata of the record(s)
Return a dictionary of values.
"""
return self._execute('get_metadata', self.ids)
def with_env(self, env):
return env[self._name].browse(self.id)
def _check_model(self, other, oper):
if not (isinstance(other, BaseRecord) and self._model is other._model):
raise TypeError(f"Mixing apples and oranges: {self} {oper} {other}")
def _concat_ids(self, args):
ids = [*self._idnames]
for other in args:
self._check_model(other, '+')
ids.extend(other._idnames)
return ids
def concat(self, *args):
"""Return the concatenation of all records."""
ids = self._concat_ids(args)
return RecordList(self._model, ids)
def union(self, *args):
"""Return the union of all records.
Preserve first occurence order.
"""
ids = self._concat_ids(args)
if len(ids) > 1:
seen = {*()}
uniqids = []
for idn in ids:
id_, name = idn if isinstance(idn, (list, tuple)) else (idn, None)
if id_ not in seen and not seen.add(id_) and id_:
uniqids.append((id_, name) if name else id_)
ids = uniqids
return RecordList(self._model, ids)
@classmethod
def _union(cls, args):
if hasattr(args, 'union'):
return args.union()
if args and isinstance(args, list) and hasattr(args[0], 'union'):
return cls.union(*args)
return args
def _filter(self, attrs):
ids, rels = [], []
for (rec, rel) in zip(self, self.read(attrs.pop(0))):
if rel and (not hasattr(rel, 'ids') or rel.id):
ids.append(rec._idnames[0])
rels.append(rel)
if ids and attrs:
relids = {idn[0] for idn in BaseRecord.union(*rels)._filter(attrs)}
ids = [rec_id for (rec_id, rel) in zip(ids, rels)
if any(rel_id in relids for rel_id in rel.ids)]
return ids
def mapped(self, func):
"""Apply ``func`` on all records."""
if callable(func):
return self._union([func(rec) for rec in self])
# func is a path
vals = self[:]
for name in func.split('.'):
vals = self._union(vals.read(name))
return vals
def filtered(self, func):
"""Select the records such that ``func(rec)`` is true.
As an alternative ``func`` can be a search domain (list)
to search among the records.
"""
if callable(func):
ids = [rec._idnames[0] for rec in self if func(rec)]
elif isinstance(func, list):
return self & self._model.search([('id', 'in', self.ids)] + func)
else:
ids = self[:]._filter(func.split('.')) if func else self._idnames
return RecordList(self._model, ids)
def sorted(self, key=None, reverse=False):
"""Return the records sorted by ``key``."""
if len((recs := self.union()).ids) < 2:
return recs
if key is None:
idnames = dict(zip(recs.ids, recs._idnames))
recs = self._model.search([('id', 'in', recs.ids)])
ids = [idnames[id_] for id_ in recs.ids]
elif isinstance(key, str):
vals = sorted(zip(recs.read(key), recs._idnames))
ids = [idn for (__, idn) in vals]
else:
ids = [rec._idnames[0] for rec in sorted(recs, key=key)]
return RecordList(self._model, ids[::-1] if reverse else ids)
def write(self, values):
"""Write the `values` in the record(s).
`values` is a dictionary of values.
See :meth:`Model.create` for details.
"""
if not self.id:
return True
values = self._model._unbrowse_values(values)
self._invalidate_cache()
return self._execute('write', self.ids, values)
def unlink(self):
"""Delete the record(s) from the database."""
if not self.id:
return True
self._invalidate_cache()
return self._execute('unlink', self.ids)
[docs]
class RecordList(BaseRecord):
"""A sequence of Odoo :class:`Record`.
It has a similar API as the :class:`Record` class, but for a list of
records. The attributes of the ``RecordList`` are read-only, and they
return list of attribute values in the same order. The ``many2one``,
``one2many`` and ``many2many`` attributes are wrapped in ``RecordList``
and list of ``RecordList`` objects. Use the method ``RecordList.write``
to assign a single value to all the selected records.
"""
def __init__(self, res_model, arg, search=None):
super().__init__(res_model, arg)
if search is None:
Ids, idnames = self.env._class_ids[0], arg or ()
ids = Ids(idnames)
for index, id_ in enumerate(arg):
if isinstance(id_, (list, tuple)):
ids[index] = id_ = id_[0]
assert isinstance(id_, int), repr(id_)
self.__dict__.update({'id': ids, 'ids': ids, '_idnames': idnames, '_search_args': None})
else:
self.__dict__['_search_args'] = {'model': res_model, **search}
@classmethod
def _prepared(cls, res_model, domain, params):
[domain] = searchargs((domain,))
return cls(res_model, None, search={'domain': domain, **params})
def refresh(self):
"""Reset :class:`RecordList` content."""
if self._search_args:
for key in 'id', 'ids', '_idnames':
self.__dict__.pop(key, None)
def __getitem__(self, key):
if 'id' not in self.__dict__ and isinstance(key, int) and key >= 0:
self, key = self[key:key+1], 0
if 'id' in self.__dict__ or (getattr(key, 'start', -1) or 0) < 0:
return super().__getitem__(key)
is_stop_positive = key.stop is not None and key.stop >= 0
search_args = {**self._search_args}
new_offset, new_stop = key.start or 0, search_args.get('limit', None)
if is_stop_positive:
new_stop = key.stop if new_stop is None else min(new_stop, key.stop)
if new_stop is not None:
search_args['limit'] = limit = new_stop - new_offset
if limit <= 0:
return RecordList(self._model, [])
if new_offset:
search_args['offset'] = (search_args.get('offset') or 0) + new_offset
result = RecordList(self._model, None, search=search_args)
if (is_stop_positive or key.stop is None) and key.step in (1, None):
return result
key = slice(None, limit if is_stop_positive else key.stop, key.step)
return super(RecordList, result).__getitem__(key)
[docs]
def with_env(self, env):
if 'id' in self.__dict__:
return super().with_env(env)
return RecordList(env[self._name], None, {**self._search_args})
[docs]
def read(self, fields=None):
"""Read the `fields` of the :class:`RecordList`.
The argument `fields` accepts different kinds of values.
See :meth:`Model.read` for details.
"""
fields, fmt = self._model._parse_format(fields)
if fields == ['id']:
values = [{'id': res_id} for res_id in self.ids]
elif 'id' not in self.__dict__:
params = {**self._search_args}
values = params.pop('model').search_read(params.pop('domain'), fields, **params)
ids = idnames = [val['id'] for val in values]
if values and 'display_name' in values[0]:
idnames = [(val['id'], val['display_name']) for val in values]
Ids, __ = self.env._class_ids
self.__dict__.update({'id': Ids(ids), 'ids': Ids(ids), '_idnames': idnames})
else:
values = self._model.read(self.ids, fields, order=True) if self.ids else []
return fmt(values)
[docs]
def copy(self, default=None):
"""Copy records and return :class:`RecordList`.
The optional argument `default` is a mapping which overrides some
values of the new records.
Supported since Odoo 18.
"""
default = default and self._model._unbrowse_values(default)
new_ids = self._execute('copy', self.ids, default)
return RecordList(self._model, new_ids)
@property
def _external_id(self):
"""Retrieve the External IDs of the :class:`RecordList`.
Return the fully qualified External IDs with default value
False if there's none. If multiple IDs exist for a record,
only one of them is returned (randomly).
"""
xml_ids = {r.id: xml_id for (xml_id, r) in
self._model._get_external_ids(self.ids).items()}
return [xml_ids.get(res_id, False) for res_id in self.id]
def __getattr__(self, attr):
if attr in ('id', 'ids', '_idnames'):
params = {**self._search_args}
ids = params.pop('model')._execute('search', params.pop('domain'), **params)
Ids, __ = self.env._class_ids
self.__dict__.update({'id': Ids(ids), 'ids': Ids(ids), '_idnames': ids})
return self.__dict__[attr]
if attr in self._model._keys:
return self.read(attr)
if attr.startswith('_'):
errmsg = f"'RecordList' object has no attribute {attr!r}"
raise AttributeError(errmsg)
def wrapper(self, *params, **kwargs):
"""Wrapper for client.execute({!r}, {!r}, [...], *params, **kwargs)."""
return self._execute(attr, self.ids, *params, **kwargs)
return _memoize(self, attr, wrapper, (self._name, attr))
def __setattr__(self, attr, value):
if attr in self._model._keys or attr == 'id':
msg = f"attribute {attr!r} is read-only; use 'RecordList.write' instead."
else:
msg = f"has no attribute {attr!r}"
raise AttributeError("'RecordList' object " + msg)
[docs]
class Record(BaseRecord):
"""A class for all Odoo records.
It maps any Odoo object.
The fields can be accessed through attributes. The changes are immediately
sent to the server.
The ``many2one``, ``one2many`` and ``many2many`` attributes are wrapped in
``Record`` and ``RecordList`` objects. These attributes support writing
too.
The attributes are evaluated lazily, and they are cached in the record.
The Record's cache is invalidated if any attribute is changed.
"""
def __init__(self, res_model, arg):
super().__init__(res_model, arg)
if isinstance(arg, int):
name, idnames = None, [arg]
else:
idnames = [(arg, name)] = [arg]
Ids, Id1 = self.env._class_ids
attrs = {'id': Id1(arg), 'ids': Ids([arg]), '_idnames': idnames, '_cached_keys': {*()}}
if name is not None:
attrs['_Record__name'] = attrs['display_name'] = name
self.__dict__.update(attrs)
def __str__(self):
return self.__name if self.id else 'False'
def _get_name(self):
try:
name = self.display_name
except Exception:
name = f'{self._name},{self.id}'
self.__dict__['_idnames'] = [(int(self.id), str(name))]
return _memoize(self, '_Record__name', str(name))
[docs]
def refresh(self):
"""Force refreshing the record's data."""
self._invalidate_cache()
def _invalidate_cache(self):
self._cached_keys.discard('id')
for key in self._cached_keys:
delattr(self, key)
self._cached_keys.clear()
def _update(self, values):
new_values = self._model._browse_values(values)
self.__dict__.update(new_values)
self._cached_keys.update(new_values)
return new_values
[docs]
def read(self, fields=None):
"""Read the `fields` of the :class:`Record`.
The argument `fields` accepts different kinds of values.
See :meth:`Model.read` for details.
"""
rv = self._model.read(self.id, fields)
if rv is not None and isinstance(fields, str) and fields in self._model._keys:
return self._update({fields: rv})[fields]
if isinstance(rv, dict):
return self._update(rv)
return rv
[docs]
def copy(self, default=None):
"""Copy a record and return the new :class:`Record`.
The optional argument `default` is a mapping which overrides some
values of the new record.
"""
default = default and self._model._unbrowse_values(default)
new_id = self._execute('copy', self.id, default)
if isinstance(new_id, list):
[new_id] = new_id or [False]
return Record(self._model, new_id)
@property
def _external_id(self):
"""Retrieve the External ID of the :class:`Record`.
Return the fully qualified External ID of the :class:`Record`,
with default value False if there's none. If multiple IDs
exist, only one of them is returned (randomly).
"""
xml_ids = self._model._get_external_ids(self.ids)
return [*xml_ids][0] if xml_ids else False
def _set_external_id(self, xml_id):
"""Set the External ID of this record."""
(mod, name) = xml_id.split('.')
domain = ['|', '&', ('module', '=', mod), ('name', '=', name),
'&', ('model', '=', self._name), ('res_id', '=', self.id)]
if self.env._get('ir.model.data', False).search(domain):
raise ValueError(f'ID {xml_id!r} collides with another entry')
values = {'model': self._name, 'res_id': self.id, 'module': mod, 'name': name}
self.env._get('ir.model.data', False).create(values)
def __getattr__(self, attr):
if attr in self._model._keys:
return self.read(attr)
if attr == '_Record__name':
return self._get_name()
if attr.startswith('_'):
raise AttributeError(f"'Record' object has no attribute {attr!r}")
def wrapper(self, *params, **kwargs):
"""Wrapper for client.execute({!r}, {!r}, {:d}, *params, **kwargs)."""
res = self._execute(attr, self.ids, *params, **kwargs)
self._invalidate_cache()
if isinstance(res, list) and len(res) == 1:
return res[0]
return res
return _memoize(self, attr, wrapper, (self._name, attr, self.id))
def __setattr__(self, attr, value):
if attr == '_external_id':
return self._set_external_id(value)
if attr not in self._model._keys:
raise AttributeError(f"'Record' object has no attribute {attr!r}")
if attr == 'id':
raise AttributeError("'Record' object attribute 'id' is read-only")
self.write({attr: value})
def _interact(global_vars, use_pprint=True, usage=USAGE):
import builtins
import pprint
if use_pyrepl := not os.getenv("PYTHON_BASIC_REPL"):
try:
from _pyrepl.main import CAN_USE_PYREPL as use_pyrepl
except ImportError:
use_pyrepl = False
if use_pyrepl: # Python >= 3.13
from _pyrepl.console import InteractiveColoredConsole as Console
from _pyrepl.simple_interact import run_multiline_interactive_console as run
from _pyrepl import readline
else:
from code import InteractiveConsole as Console
builtins.clear = type('', (), {'__repr__': lambda s: os.system('clear') or ''})()
try:
import readline
import rlcompleter
readline.parse_and_bind('tab: complete')
except ImportError:
pass
try:
readline.read_history_file(HIST_FILE)
if readline.get_history_length() < 0:
readline.set_history_length(int(os.getenv('HISTSIZE', 500)))
# better append instead of replace?
atexit.register(readline.write_history_file, HIST_FILE)
except Exception:
pass # IOError if file missing, or other error
if use_pprint:
pp = lambda obj: print(color_repr(pprint.pformat(obj, **PP_FORMAT)))
def displayhook(value, _printer=pp, _builtins=builtins):
# Pretty-format the output
if value is not None:
_printer(value)
_builtins._ = value
sys.displayhook = displayhook
def excepthook(exc_type, exc, tb):
# Print readable errors
msg = ''.join(format_exception(exc_type, exc, tb, chain=False, **exfmt))
print(color_repr(msg.rstrip()))
sys.excepthook = excepthook
builtins.usage = type('Usage', (), {'__call__': lambda s: print(usage),
'__repr__': lambda s: usage})()
console = Console(global_vars, filename="<stdin>")
exfmt = {"colorize": console.can_colorize} if use_pyrepl else {}
run(console) if use_pyrepl else console.interact('', '')
def get_parser():
parser = argparse.ArgumentParser(description='Interact with Odoo')
parser.add_argument('args', nargs='*', help='URL of the server')
parser.add_argument(
'-l', '--list', action='store_true', dest='list_env',
help='list sections of the configuration')
parser.add_argument(
'--env',
help='read connection settings from the given section')
parser.add_argument(
'-c', '--config', default=None,
help=f'specify alternate config file (default: {CONF_FILE})')
parser.add_argument(
'--server', default=None,
help=f'full URL of the server (default: {DEFAULT_URL})')
parser.add_argument('-d', '--db', default=None, help='database')
parser.add_argument('-u', '--user', default=None, help='username')
parser.add_argument(
'-p', '--password', default=None, metavar='PASS',
help='password, or it will be requested on login')
parser.add_argument(
'--api-key', dest='api_key', default=None,
help='API Key for JSON-2 or JSON-RPC')
parser.add_argument(
'-v', '--verbose', default=0, action='count',
help='verbose')
parser.add_argument('--version', action='version', version=__version__)
return parser
def connect_client(args):
if args.env:
client = Client.from_config(args.env, user=args.user, verbose=args.verbose)
else:
if not args.server:
args.server = ['-c', args.config] if args.config else DEFAULT_URL
if args.args:
args.server = args.server + args.args if args.config else ' '.join(args.args)
if not args.user:
args.user = ADMIN_USER
client = Client(args.server, args.db, args.user, password=args.password,
api_key=args.api_key, verbose=args.verbose)
return client
def main(interact=_interact):
args = get_parser().parse_args()
Client._config_file = Path.cwd() / (args.config or CONF_FILE)
if args.list_env:
print('Available settings: ' + ' '.join(read_config()))
return
global_vars = Client._set_interactive()
print(color_repr(USAGE))
connect_client(args)
return interact(global_vars) if interact else global_vars
if __name__ == '__main__':
main()