Source code for odooly

#!/usr/bin/env python
""" odooly.py -- Odoo client library and command line tool

Author: Florent Xicluna
"""
import _ast
import argparse
import atexit
import datetime
import functools
import json
import os
import re
import shlex
import sys
import time
import traceback

from configparser import ConfigParser
from getpass import getpass
from pathlib import Path
from string import Formatter
from threading import current_thread
from urllib.parse import urlencode, urljoin, urlsplit

try:
    import requests
except ImportError:
    requests = None

# Package version, also embedded in USER_AGENT below
__version__ = '2.6.5'
# Public names of the module
__all__ = ['Client', 'Env', 'HTTPSession', 'WebAPI', 'Service', 'Json2',
           'Printer', 'Error', 'ServerError',
           'BaseModel', 'Model', 'BaseRecord', 'Record', 'RecordList',
           'format_exception', 'read_config', 'start_odoo_services']

# Relative path: resolved against the current working directory
CONF_FILE = Path('odooly.ini')
# "~" is expanded once, when the module is imported
HIST_FILE = Path('~/.odooly_history').expanduser()
DEFAULT_URL = 'http://localhost:8069/'
# Logins chosen by Env.sudo() when no user is given
ADMIN_USER = 'admin'
SYSTEM_USER = '__system__'
MAXCOL = [79, 179, 9999]    # Line length in verbose mode
# NOTE(review): looks like keyword arguments for pprint -- confirm at the call site
PP_FORMAT = {'sort_dicts': False, 'width': 120}
# Value of the "User-Agent" header set by HTTPSession
USER_AGENT = f'Mozilla/5.0 (X11) odooly.py/{__version__}'

# Help text summarizing the main commands (the text is used as-is at runtime)
USAGE = """\
Usage (some commands):
    env[name]                       # Return a Model instance
    env[name].keys()                # List field names of the model
    env[name].fields(names=None)    # Return details for the fields
    env[name].field(name)           # Return details for the field
    env[name].browse(ids=())
    env[name].search(domain)
    env[name].search(domain, offset=0, limit=None, order=None)
                                    # Return a RecordList

    rec = env[name].get(domain)     # Get the Record matching domain
    rec.some_field                  # Return the value of this field
    rec.read(fields=None)           # Return values for the fields

    client.login(user)              # Login with another user
    client.connect(env_name)        # Connect to another env.
    client.connect(server=None)     # Connect to another server
    env.models(name)                # List models matching pattern
    env.modules(name)               # List modules matching pattern
    env.install(module1, module2, ...)
    env.upgrade(module1, module2, ...)
                                    # Install or upgrade the modules
    env.upgrade_cancel()            # Reset failed upgrade/install
"""

# Single-character terms which searchargs() leaves untouched in a domain
DOMAIN_OPERATORS = frozenset('!|&')
# Supported operators are:
#   =, !=, >, >=, <, <=, like, ilike, in, not like, not ilike, not in,
#   =like, =ilike, =?, child_of, parent_of,
#   any, not any,                             # Odoo 17
#   not =like, not =ilike,                    # Odoo 19
# Pattern for a domain term written as a string (example: "state != draft"):
#   group 1 is the field name (dots allowed), group 2 the operator and
#   group 3 the raw value (everything after the operator).
# The negative lookahead rejects an operator immediately followed by
# one of the characters "?", "!", "=", "<", ">".
_term_re = re.compile(
    r'([\w._]+)\s*'   r'(=like\b|=ilike\b|=\?|[<>]=?|!?=|'
    r'\b(?:like|ilike|in|any|not (?:=?like|=?ilike|in|any)|child_of|parent_of)\b)'
    r'(?![?!=<>])\s*(.+)')

# Web methods (not exhaustive)
# Each key is an endpoint name, each value the list of method names
_web_methods = {
    'database': ['backup', 'change_password', 'create',
                 'drop', 'duplicate', 'list', 'restore'],
    'dataset': ['call_button', 'call_kw'],
    'session': ['authenticate', 'check', 'destroy', 'get_lang_list', 'get_session_info'],
    'webclient': ['version_info'],
}

# RPC methods
_rpc_methods = {
    'common': ['login', 'authenticate', 'version'],
    'object': ['execute', 'execute_kw'],
}
# Separator inserted by format_exception() between the remote and the local error
_cause_message = ("\nThe above exception was the direct cause "
                  "of the following exception:\n\n")
# Domain term selecting the modules whose state is none of the three listed
_pending_state = ('state', 'not in',
                  ['uninstallable', 'uninstalled', 'installed'])
# Positional argument names of the common model methods.
# Json2 turns this list into a dict, to convert positional arguments
# into named parameters.
_base_method_params = [
    ('action_archive', ['ids']),
    ('action_unarchive', ['ids']),
    ('copy', ['ids', 'default']),
    ('create', ['vals_list']),
    ('get_external_id', ['ids']),
    ('get_metadata', ['ids']),
    ('read', ['ids', 'fields', 'load']),
    ('search', ['domain', 'offset', 'limit', 'order']),
    ('search_count', ['domain', 'limit']),
    ('search_read', ['domain', 'fields', 'offset', 'limit', 'order']),
    ('unlink', ['ids']),
    ('write', ['ids', 'vals']),
]
# Python source kept as text: it is not executed by this module.
# NOTE(review): presumably the code of a server action run on the Odoo side,
# with the queries passed through the "__sql" context key -- confirm at the
# place where this constant is used.
_sql_action_code = """\
sql_queries = env.context.get("__sql") or []
result = env.cr.connection.notices

if not env.is_system():
    raise UserError("Not allowed")

for query in sql_queries:
    env.cr.execute(query)
    if not env.cr.description:
        result.append(env.cr.statusmessage)
    elif not env.cr.rowcount:
        result.append({c.name: () for c in env.cr.description})
    else:
        columns = [c.name for c in env.cr.description]
        result.extend(dict(zip(columns, values)) for values in env.cr.fetchall())

log(str({'queries': sql_queries, 'result': result}))
result[:] = []
"""
# Colour helpers: ``str`` by default, which returns the text unchanged
color_bold = color_comment = color_py = color_repr = str
# SSL context given to the urllib HTTPSHandler by HTTPSession
http_context = None

if os.getenv('ODOOLY_SSL_UNVERIFIED'):
    import ssl

    http_context = ssl._create_unverified_context()
    # Select the urllib backend: it is the one which receives `http_context`
    requests = None

# The urllib names are only needed when the ``requests`` backend is not used
if not requests:
    from urllib.request import HTTPCookieProcessor, HTTPSHandler, Request, build_opener


class HTTPSession:
    """HTTP client used by the API wrappers, with two interchangeable backends.

    The backend is selected once, when the class body is executed:
    ``requests`` when that module is available, ``urllib.request`` otherwise.
    Both variants define the same hooks (``__init__``, ``set_auth``,
    ``_request``, ``_parse_response`` and ``_parse_error``) which are
    used by the public method :meth:`request`.
    """

    if requests:  # requests.Session
        def __init__(self):
            self._session = requests.Session()
            self._session.headers.update({'User-Agent': USER_AGENT, 'Accept': 'application/json'})

        def set_auth(self, uri, username, password):
            """Set the credentials of the session (`uri` is not used here)."""
            self._session.auth = (username, password)

        def _request(self, url, method, data, json, headers, **kw):
            resp = self._session.request(method, url, data=data, json=json, headers=headers, **kw)
            # raise_for_status() returns None, unless it raises
            return resp.raise_for_status() or resp

        def _parse_response(self, resp):
            is_json = 'json' in resp.headers.get('content-type', '')
            return resp.json() if is_json else resp.text

        def _parse_error(self, err):
            # Return (status, body), or (0, 0) if there's no HTTP response.
            # A plain OSError has no 'response' attribute at all.
            resp = getattr(err, 'response', None)
            return (resp.status_code, self._parse_response(resp)) if resp is not None else (0, 0)

    else:  # urllib.request
        def __init__(self):
            self._session = build_opener(HTTPCookieProcessor(), HTTPSHandler(context=http_context))
            self._session.addheaders = [('User-Agent', USER_AGENT), ('Accept', 'application/json')]

        def set_auth(self, uri, username, password):
            """Register the credentials for HTTP Basic authentication on `uri`."""
            from urllib.request import HTTPBasicAuthHandler
            auth = HTTPBasicAuthHandler()
            auth.add_password(None, uri, username, password)
            self._session.add_handler(auth)

        def _request(self, url, method, data, json, headers, _json=json, **kw):
            # The default of `_json` is the ``json`` module, which is shadowed
            # by the `json` argument inside this function.
            headers = dict(headers or ())
            if json is not None:
                headers.setdefault('Content-Type', 'application/json')
            if method == 'POST':
                # A POST without `data` and without `json` sends an empty body,
                # like the ``requests`` backend does
                body = urlencode(data or ()) if json is None else _json.dumps(json)
                data = body.encode()
            elif data is not None:
                # Other methods send `data` in the query string
                url, data = f'{url}?{urlencode(data)}', None
            return self._session.open(Request(url, data=data, headers=headers, method=method))

        def _parse_response(self, resp):
            is_json = 'json' in resp.headers.get('content-type', '')
            return json.load(resp) if is_json else resp.read().decode()

        def _parse_error(self, err):
            # Return (status, body), or (0, 0) if the error carries no status code
            return (err.code, self._parse_response(err)) if hasattr(err, 'code') else (0, 0)

    def request(self, url, *, method='POST', data=None, json=None, headers=None):
        """Send the HTTP request and return the decoded response.

        The result is the decoded JSON document when the content type of the
        response mentions ``json``, else the text of the response.
        With `method` ``'HEAD'``, the response object itself is returned.

        Raise :class:`ServerError` if the request fails with status 401,
        403, 404, 422 or 500 and a non-empty body.  Other errors are
        re-raised unchanged.
        """
        try:
            with self._request(url, method=method, data=data, json=json, headers=headers) as resp:
                return resp if method == 'HEAD' else self._parse_response(resp)
        except OSError as exc:
            status_code, result = self._parse_error(exc)
            if result and status_code in (401, 403, 404, 422, 500):
                # Unauthorized, Forbidden, NotFound, UnprocessableContent, InternalServerError
                if isinstance(result, str):
                    # Not JSON: keep the first and the last text found between tags
                    lines = re.findall(r'>([^>\n]+)<', result) or (status_code, result)
                    result = {'name': exc.__class__.__name__, 'debug': None,
                              'arguments': (f'{lines[0]} - {lines[-1]}',)}
                raise ServerError({'code': status_code, 'data': result})
            raise


# Marker subclasses of list and int: when the only positional argument has
# exactly one of these classes, Json2 sends it as the "ids" parameter
Ids, Id1 = type('ids', (list,), {'__slots__': ()}), type('id1', (int,), {'__slots__': ()})


def _memoize(inst, attr, value, doc_values=None):
    if hasattr(value, '__get__') and not hasattr(value, '__self__'):
        value.__name__ = attr
        if doc_values is not None:
            value.__doc__.format(*doc_values)
        value = value.__get__(inst, type(inst))
    inst.__dict__[attr] = value
    return value


# Simplified ast.literal_eval which does not parse operators
def _convert(node):
    if isinstance(node, _ast.Constant) and node.value.__class__ is not complex:
        return node.value
    if isinstance(node, _ast.Tuple):
        return tuple(map(_convert, node.elts))
    if isinstance(node, _ast.List):
        return [*map(_convert, node.elts)]
    if isinstance(node, _ast.Dict):
        return {_convert(k): _convert(v)
                for (k, v) in zip(node.keys, node.values)}
    if isinstance(node, _ast.UnaryOp):
        if isinstance(node.op, _ast.USub):
            return -_convert(node.operand)
        if isinstance(node.op, _ast.UAdd):
            return +_convert(node.operand)
    raise ValueError('malformed or disallowed expression')


def literal_eval(expression):
    """Evaluate the Python literal written in the string `expression`.

    The expression is compiled to an AST only, then converted by
    ``_convert`` which rejects anything but simple literals.
    """
    tree = compile(expression, '<unknown>', 'eval', flags=_ast.PyCF_ONLY_AST)
    return _convert(tree.body)


def format_params(params, hide=('passw', 'pwd')):
    """Format the mapping `params` as a list of ``key=value`` strings.

    The value is replaced with ``*`` if the key contains one of the
    substrings listed in `hide`.  The other values are formatted with
    ``repr()``.  The order of `params` is preserved.
    """
    # The decision is taken on the key only: the values are never compared
    return [f'{key}=*' if any(sub in key for sub in hide) else f'{key}={value!r}'
            for (key, value) in params.items()]


def format_exception(exc_type, exc, tb, limit=None, chain=True,
                     _format_exception=traceback.format_exception, **kw):
    """Format a stack trace and the exception information.

    Replacement for ``traceback.format_exception`` which knows how to
    render the errors and the tracebacks received from the API.
    If `chain` is true, the local exception is printed after the
    remote one.  Return a list of strings.
    """
    lines = _format_exception(exc_type, exc, tb, limit=limit, **kw)
    if issubclass(exc_type, Error):
        # Client-side
        return [f"{exc}\n"]
    if issubclass(exc_type, OSError):
        # HTTPError (requests or urllib)
        return [f"{exc_type.__name__}: {exc}\n"]
    if not issubclass(exc_type, ServerError):
        return lines

    # JSON-RPC or Web API
    data = exc.args[0]['data']
    hide_remote_tb = data.get('name', '').startswith(('odoo.', 'werkzeug.'))
    if not data:
        return lines

    # Format readable API errors
    try:
        message = str(data['arguments'][0])
    except Exception:
        message = str(data['arguments'])
    fault = f"{data['name']}: {message}"
    remote_tb = (not hide_remote_tb) and not message.startswith('FATAL:') and data['debug']
    head = remote_tb or fault
    if not chain:
        return [head]
    lines = [head, _cause_message] + lines
    lines[-1] = fault
    return lines
def read_config(section=None):
    """Read the environment settings from the configuration file.

    The configuration file contains a `section` for each environment.
    Each section provides the parameters of the connection: ``server``,
    ``username`` and (optional) ``database``, ``password`` and ``api_key``.
    As an alternative, the server can be declared with 4 parameters:
    ``scheme / host / port / protocol``.  With the scheme ``local``, the
    ``server`` (or ``options``) value is split like a command line.
    Default values are read from the ``[DEFAULT]`` section.

    Return the tuple ``(server, database, username, password, api_key)``
    where ``database`` defaults to ``''`` while ``password`` and
    ``api_key`` default to ``None``.
    Without argument, return the list of the configured environments.
    """
    parser = ConfigParser()
    with Client._config_file.open() as stream:
        parser.read_file(stream)
    if section is None:
        return parser.sections()

    env = dict(parser.items(section))
    server = env.get('server')
    scheme = env.get('scheme', 'http')
    if scheme == 'local':
        server = shlex.split(server or env.get('options', ''))
    elif not server:
        protocol = env.get('protocol', 'web')
        server = f"{scheme}://{env['host']}:{env['port']}/{protocol}"
    database = env.get('database', '')
    return (server, database, env['username'], env.get('password'), env.get('api_key'))
def start_odoo_services(options=None, appname=None):
    """Initialize the Odoo services.

    Import the ``odoo`` Python package and load the Odoo services.
    The argument `options` receives the command line arguments
    for ``odoo``.  Example:

      ``['-c', '/path/to/odoo-server.conf', '--without-demo', 'all']``.

    If `appname` is provided, it is exported as ``PGAPPNAME``.
    The initialization is done once: it is skipped if the attribute
    ``odoo._get_pool`` is already set.

    Return the ``odoo`` package.
    """
    import odoo
    # The attribute "_get_pool" is set below: it marks an initialized package
    if not hasattr(odoo, "_get_pool"):
        os.putenv('TZ', 'UTC')
        if appname is not None:
            os.putenv('PGAPPNAME', appname)
        odoo.tools.config.parse_config(options or [])
        # Version-specific setup
        if odoo.release.version_info < (15,):
            odoo.api.Environment.reset()
        elif odoo.release.version_info > (19, 2):
            import odoo.http.router
            odoo.http.dispatch_rpc = odoo.http.router.dispatch_rpc
        odoo._get_pool = odoo.modules.registry.Registry

        def close_all():
            # Close the database of each registry
            for db in odoo._get_pool.registries.keys():
                odoo.sql_db.close_db(db)
        # Run the cleanup when the interpreter exits
        atexit.register(close_all)

    return odoo
def issearchdomain(arg):
    """Check if the argument is a search domain.

    Examples:
        - ``[('name', '=', 'mushroom'), ('state', '!=', 'draft')]``
        - ``['name = mushroom', 'state != draft']``
        - ``[]``

    Only the type of `arg` and its first item are inspected.
    """
    if not isinstance(arg, list):
        return False
    if not arg:
        return True
    head = arg[0]
    if isinstance(head, int):
        # Not a list of ids: [1, 2, 3]
        return False
    # Not a list of ids as str: ['1', '2', '3']
    return not (isinstance(head, str) and head.isdigit())
def searchargs(params, kwargs=None):
    """Compute the 'search' parameters.

    The first item of `params` is the domain.  Each term written as a
    string (example: ``'state != draft'``) is replaced, in the domain list
    itself, with the tuple ``(field, operator, value)``.  The value is
    parsed as a Python literal if possible, else it is kept as a string.
    If the domain is the only parameter, the keys ``offset``, ``limit``
    and ``order`` are removed from `kwargs` and appended to the parameters
    when at least one of them is set.

    Raise ValueError if a term cannot be parsed.
    """
    if not params:
        return ([],)
    domain = params[0]
    if not isinstance(domain, list):
        return params

    for (position, term) in enumerate(domain):
        if not isinstance(term, str) or term in DOMAIN_OPERATORS:
            continue
        match = _term_re.match(term.strip())
        if match is None:
            raise ValueError(f"Cannot parse term {term!r}")
        (field, operator, raw_value) = match.groups()
        try:
            parsed = literal_eval(raw_value)
        except Exception:
            # Interpret the value as a string
            parsed = raw_value
        domain[position] = (field, operator, parsed)

    params = (domain,) + params[1:]
    if kwargs and len(params) == 1:
        defaults = (('offset', 0), ('limit', None), ('order', None))
        paging = tuple([kwargs.pop(key, default) for (key, default) in defaults])
        if any(paging):
            params += paging
    return params
def extract_http_response(method, result, regex):
    """Extract the useful part of an HTTP response.

    For a ``HEAD`` request, return the URL of the response `result`.
    Otherwise `result` is a text which is searched with `regex`: return
    the first group as text.  Without `regex`, search the assignment of
    the session information and return it decoded from JSON.
    Return ``None`` if nothing is found.
    """
    if method == 'HEAD':
        return result.url
    pattern = regex or r'odoo._*session_info_* = (.*);'
    found = re.search(pattern, result)
    if not found:
        return found
    payload = found.group(1)
    return payload if regex else json.loads(payload)


class partial(functools.partial):
    """A ``functools.partial`` which does not show its arguments in repr()."""
    __slots__ = ()

    def __repr__(self):
        # Hide arguments
        return "{}({!r}, ...)".format(self.__class__.__name__, self.func)


class Error(Exception):
    """An Odooly error."""


class ServerError(Exception):
    """An error received from the server."""


class Printer:
    """Print the messages sent and received, truncated to `width` columns.

    The instance is false as long as `width` is not set.  Used as a
    context manager, it prints the exception raised in the block, if any.
    """
    width = None

    def _print_(self, message, _prefix):
        cols = self.width - len(_prefix) - 1
        if len(message) > cols:
            # Too long: cut the message and append its full length
            suffix = f"... L={len(message)}"
        else:
            suffix = ""
        xch = message[:cols - len(suffix)] + suffix
        print(color_comment(f"{_prefix} {xch}"))

    # NOTE(review): the prefix for sent messages is ' >' while the one for
    # received messages is '<--': confirm that it is not meant to be '-->'
    print_sent = functools.partialmethod(_print_, _prefix=' >')
    print_recv = functools.partialmethod(_print_, _prefix='<--')

    def __bool__(self):
        return bool(self.width)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if not exc_type:
            return
        if issubclass(exc_type, ServerError):
            # Only print the name of the remote exception
            exc = exc.args[0]["data"]["name"]
        self.print_recv(f"{exc_type.__name__}: {exc}")
class WebAPI:
    """A wrapper around Web endpoints.

    The connected endpoints are exposed on the Client instance.
    Argument `client` is the connected Client.
    Argument `endpoint` is the name of the service
    (examples: ``"web/database"``, ``"web/session"``).
    Argument `methods` is the list of methods which should be
    exposed on this endpoint.  Use ``dir(...)`` on the
    instance to list them.

    Any other attribute name is accepted too: it is turned into a method
    which calls the path of the same name, except for the special names
    (``__name__``) which raise AttributeError.
    """
    _methods = ()

    def __init__(self, client, endpoint, methods):
        self._dispatch = client._proxy_web(endpoint)
        self._server = urljoin(client._server, '/')
        self._endpoint = f'/{endpoint}'
        self._methods = [*methods]
        self._printer = client._printer

    def __repr__(self):
        return f"<WebAPI '{self._server[:-1]}{self._endpoint}'>"

    def __dir__(self):
        return sorted(self._methods)

    def __getattr__(self, name):
        # Special names are looked up by Python protocols (copy.deepcopy,
        # inspect...): they must not become remote calls
        if name[:2] == '__' == name[-2:]:
            raise AttributeError(f"'WebAPI' object has no attribute {name!r}")

        def wrapper(self, _func=None, **params):
            return self._request(f'{name}/{_func}' if _func else name, params)
        return _memoize(self, name, wrapper)

    def _request(self, path, params=None):
        """Call `path` on the endpoint; print the exchange if verbose."""
        if not self._printer:
            return self._dispatch(path, params)
        if self._endpoint == '/doc':
            snt = [f'GET /doc/{path}.json']
        else:
            # `params` is None when the method is called without parameters
            snt = [f'POST {self._endpoint}/{path}'] + (format_params(params) if params else [])
        with self._printer as log:
            log.print_sent(' '.join(snt))
            log.print_recv(repr(res := self._dispatch(path, params)))
        return res
class Service:
    """A wrapper around RPC endpoints.

    The connected endpoints are exposed on the Client instance.
    The `client` argument is the connected Client.
    The `endpoint` argument is the name of the service
    (examples: ``"object"``, ``"common"``).
    The `methods` argument is the list of methods which are exposed on
    this endpoint: any other name raises AttributeError.
    Use ``dir(...)`` on the instance to list them.
    """
    _methods = ()

    def __init__(self, client, endpoint, methods):
        self._dispatch = client._proxy(endpoint)
        self._rpcpath, self._endpoint = client._server, endpoint
        self._methods = methods
        self._printer = client._printer

    def __repr__(self):
        return "<Service '{}|{}'>".format(self._rpcpath, self._endpoint)

    def __dir__(self):
        return sorted(self._methods)

    def __getattr__(self, name):
        if name not in self._methods:
            raise AttributeError(f"'Service' object has no attribute {name!r}")

        def wrapper(self, *args):
            if not self._printer:
                return self._dispatch(name, args)
            with self._printer as log:
                # The third argument is replaced with '*' in the output
                shown = args if len(args) <= 2 else (*args[:2], '*', *args[3:])
                call_args = ', '.join([repr(arg) for arg in shown])
                log.print_sent(f"{self._endpoint}.{name}({call_args})")
                res = self._dispatch(name, args)
                log.print_recv(repr(res))
            return res
        return _memoize(self, name, wrapper)
class Json2:
    """A connection to Json-2 API

    Added in Odoo 19.

    The requests are authenticated with the `api_key`, sent as a bearer
    token, and target the `database` named in a request header.
    """
    _protocol_name = 'JSON-2'
    _endpoint = '/json/2'
    _doc_endpoint = '/doc-bearer'

    def __init__(self, client, database, api_key):
        self._http = HTTPSession()
        self._server = urljoin(client._server, '/')
        self._headers = {
            'Authorization': f'Bearer {api_key}',
            'Content-Type': 'application/json',
            'X-Odoo-Database': database or '',
        }
        # Names of the positional arguments, for each model and method
        self._method_params = {'base': dict(_base_method_params)}
        self._printer = client._printer

    def doc(self, model):
        """Documentation of the `model`."""
        model_doc = self._request(f'{self._doc_endpoint}/{model}.json')
        if model not in self._method_params:
            # Remember the signature of the methods of this model
            signatures = dict(Model._parse_doc_methods(model_doc))
            self._method_params[model] = signatures
        return model_doc

    def _methods(self, model):
        # Load the documentation of the model on first use
        if model not in self._method_params:
            self.doc(model)
        return self._method_params[model]

    def _prepare_params(self, model, method, args, kwargs):
        # Convert the positional arguments to named parameters
        if not args:
            return dict(kwargs)
        if len(args) == 1 and args[0].__class__ in (Ids, Id1):
            return {'ids': args[0], **kwargs}
        arg_names = self._method_params['base'].get(method)
        if not arg_names:
            arg_names = self._methods(model).setdefault(method, ())
        params = {**dict(zip(arg_names, args)), **kwargs}
        if len(args) > len(arg_names) and self._printer:
            extra = args[len(arg_names):]
            print(f"Method {method!r} on {model!r} called with extra args: {extra}")
        return params

    def __call__(self, model, method, args, kw=None):
        """Execute API call on the `model`."""
        path = f'{self._endpoint}/{model}/{method}'
        return self._request(path, self._prepare_params(model, method, args, kw or {}))

    def __repr__(self):
        return "<Json2 '{}{}'>".format(self._server[:-1], self._endpoint)

    def _check(self, uid=None):
        # Return self if the API key is accepted (and matches `uid`), else False
        url = urljoin(self._server, f'{self._endpoint}/res.users/context_get')
        try:
            context = self._http.request(url, json={}, headers=self._headers)
        except ServerError:
            return False
        if uid and uid != context['uid']:
            return False
        return self

    def _request(self, path, params=None):
        # GET without parameters, else POST with the parameters as JSON
        url = urljoin(self._server, path)
        verb = 'POST' if params is not None else 'GET'

        def send():
            return self._http.request(url, method=verb, json=params, headers=self._headers)

        if not self._printer:
            return send()
        with self._printer as log:
            log.print_sent(' '.join([verb, path, *format_params(params or {})]))
            res = send()
            log.print_recv(repr(res))
        return res
class Env:
    """An environment wraps data for Odoo models and records:

        - :attr:`db_name`, the current database;
        - :attr:`uid`, the current user id;
        - :attr:`context`, the current context dictionary.

    To retrieve an instance of ``some.model``:

    >>> env["some.model"]
    """

    name = uid = user = session_info = _api_key = _doc = _json2 = _access_models = None
    _class_ids = Ids, Id1
    # Shared by all the instances; the keys are (key, db_name, server)
    _cache = {}

    def __new__(cls, client, db_name=()):
        # Reuse the cached environment of this database, for the same client
        if db_name:
            env = cls._cache.get((Env, db_name, client._server))
            if env and env.client == client:
                return env
        if not db_name or client.env.db_name:
            env = object.__new__(cls)
            env.client, env.db_name, env.context = client, db_name, {}
        else:
            # The environment of the client has no database yet: attach it
            env, env.db_name = client.env, db_name
        if db_name:
            env._model_names = env._cache_get('model_names', dict)
            env._models = {}
        return env

    def __contains__(self, name):
        """Test whether this model exists."""
        return name in self.models(name)
def __getitem__(self, name):
    """Return the :class:`Model` for the given ``name``.

    Delegate to ``_get`` with its default arguments.
    """
    return self._get(name)
def __iter__(self):
    """Return an iterator on model names."""
    return iter(self.models())

def __len__(self):
    """Return the size of the model registry."""
    return len(self.models())

def __bool__(self):
    # Always true, even if __len__() returns 0
    return True

# Compare and hash by identity
__eq__ = object.__eq__
__ne__ = object.__ne__
__hash__ = object.__hash__

def __repr__(self):
    return f"<Env '{self.user.login if self.uid else ''}@{self.db_name}'>"

def _check_user_password(self, user, password, api_key):
    # Authenticate `user` and return the tuple (uid, password, info).
    # Raise Error if not connected or if the authentication fails.
    if self.client._object and not self.db_name:
        raise Error('Error: Not connected')
    assert isinstance(user, str) and user
    if user == SYSTEM_USER and self.uid and not password:
        info = self.client._authenticate_system()
        return info['uid'], password, info
    # Read from cache
    auth_cache = self._cache_get('auth', dict)
    (uid, pwcache) = auth_cache.get(user) or (None, None)
    password = password or pwcache
    # Ask for password
    if not password and not api_key:
        password = getpass(f"Password for '{color_bold(user)}': ")
    if not self.client._object:
        try:  # Standard Web or JSON-2 authentication
            info = self.client._authenticate(self.db_name, user, password, api_key)
            uid = info['uid']
        except Exception as exc:
            if 'does not exist' in str(exc):    # Heuristic
                raise Error('Error: Database does not exist')
            raise
    else:  # Login through RPC Service (deprecated)
        if not uid:
            uid = self.client.common.login(self.db_name, user, api_key or password)
        if uid:
            args = self.db_name, uid, api_key or password, 'res.users', 'context_get', ()
            info = {'uid': uid, 'user_context': self.client._object.execute_kw(*args)}
    if not uid:  # Failed
        if user in auth_cache:
            del auth_cache[user]
        raise Error('Error: Invalid username or password')
    # Discovered database name
    # NOTE(review): the nesting of the next statements is reconstructed from
    # a source without indentation -- confirm against the original file
    if not self.db_name:
        self.db_name = info.get('db') or ()
        # Set the cache for authentication
        self._cache_set('auth', auth_cache)
        self.refresh()
        if not self.uid:
            # Cache the unauthenticated Env and the client
            self._cache_set(Env, self)
    # Update credentials in cache
    auth_cache[user] = uid, password
    return uid, password, info
def set_api_key(self, api_key, store=True):
    """Configure methods to use an API key.

    Select the function used to execute the methods of the models:
    the RPC endpoint if the client has one, else the JSON-2 connection
    if its check succeeds, else the Web API.
    The key is remembered on the environment only if `store` is true.
    Return `api_key`.
    """
    def env_auth(method):
        # Authenticated endpoints
        return partial(method, self.db_name, self.uid, api_key)

    if self.client.web and self.client.version_info >= 19.0:
        # _check() returns False if the key is rejected
        self._json2 = Json2(self.client, self.db_name, api_key)._check(self.uid)
    self._doc = (self._json2 or self.client.web).doc
    try:
        # Probe the documentation endpoint, disable it if the call fails
        self._doc('res.device')
    except ServerError:
        self._doc = None
    # NOTE(review): this is the previous callable (or Ellipsis), while it is
    # compared with a protocol *name* below -- confirm the intent
    prev_protocol = getattr(self, '_execute_kw', ...)
    if self.client._object:
        # RPC endpoint if available
        self._execute_kw = env_auth(self.client._object.execute_kw)
        self._execute_kw._protocol_name = self.client._proxy._protocol_name
    else:
        # Otherwise, use JSON-2 or WebAPI
        self._execute_kw = self._json2 or self._call_kw
    if prev_protocol not in (self._execute_kw._protocol_name, ...):
        self.client.connect()
    self._api_key = api_key if store else None
    return api_key
def _configure(self, uid, user, password, api_key, context, session):
    # Build a new environment on the same database, for the user `uid`
    env = Env(self.client)
    (env.db_name, env.name) = (self.db_name, self.name)
    # The dictionary of model names is shared with the current environment
    env._model_names = self._model_names
    env._models = {}
    # Setup uid and user
    if isinstance(user, Record):
        user = user.login
    env.uid = uid
    env.user = Record(env._get('res.users', False), uid)
    env.context = dict(context)
    env.session_info = session
    if user:
        assert isinstance(user, str), repr(user)
        # Store the login on the record directly
        env.user.__dict__['login'] = user
    # Set API methods
    if uid != self.uid or (api_key and api_key != self._api_key):
        env.set_api_key(api_key or password, bool(api_key))
    else:
        # Copy methods
        for attr in 'access_models', 'api_key', 'doc', 'execute_kw', 'json2':
            setattr(env, f'_{attr}', getattr(self, f'_{attr}'))
    return env

@property
def odoo_env(self):
    """Return a server Environment."""
    return self.client._server.api.Environment(self.cr, self.uid, self.context)

@property
def cr(self):
    """Return a cursor on the database."""
    # The cursor is created on first access, then read from the instance dict
    return self.__dict__.get('cr') or _memoize(self, 'cr', self.registry.cursor())

@property
def registry(self):
    """Return the environment's registry."""
    return self.client._server._get_pool(self.db_name)

def __call__(self, user=None, password=None, api_key=None, context=None):
    """Return an environment based on ``self`` with modified parameters.

    If `user` is provided, it is authenticated first.  Without `user`
    and without `context`, return ``self``.
    """
    if user is not None:
        (uid, password, session) = self._check_user_password(user, password, api_key)
        if context is None:
            context = session.get('user_context') or {}
    elif context is not None:
        (uid, user, session) = (self.uid, self.user, self.session_info)
    else:
        return self
    # The cache key combines the uid and a hash of the context
    env_key = bytes.fromhex(f"{uid:08x}{hash(json.dumps(context, sort_keys=1)) % 2**32:08x}")
    if (env := self._cache_get(env_key)) is None:
        env = self._configure(uid, user, password, api_key, context, session)
        env._cache_set(env_key, env)
    return env
def sudo(self, user=None):
    """Attach to the provided user, or Superuser.

    Without `user`, the system user is selected if the client has no RPC
    endpoint and the session belongs to a system user, else the
    administrator.
    """
    if user is not None:
        return self(user=user)
    use_system = not self.client._object and (self.session_info or {}).get('is_system')
    return self(user=SYSTEM_USER if use_system else ADMIN_USER)
def ref(self, xml_id):
    """Return the record for the given ``xml_id`` external ID.

    The external ID is written ``module.name``.
    Return ``None`` if the external ID is not found.
    Raise ValueError if `xml_id` does not contain a dot.
    """
    # Split on the first dot only: the name itself may contain dots
    (module, name) = xml_id.split('.', 1)
    data = self._get('ir.model.data', False).read(
        [('module', '=', module), ('name', '=', name)], 'model res_id')
    if data:
        assert len(data) == 1
        return Record(self[data[0]['model']], data[0]['res_id'])
    return None
@property
def lang(self):
    """Return the current language code."""
    return self.context.get('lang')

def refresh(self):
    # Drop the cached entries of this database, except the authentication
    # cache, the entry stored with the key Env, and this environment itself
    db_key, preserve = (self.db_name, self.client._server), ('auth', Env)
    for key in [*self._cache]:
        if key[1:] == db_key and key[0] not in preserve and self._cache[key] != self:
            del self._cache[key]
    # Forget the models
    self._access_models = None
    self._model_names = self._cache_set('model_names', {})
    self._models = {}
    if self._json2:
        self._json2._method_params = {'base': dict(_base_method_params)}

def _cache_get(self, key, func=None):
    # Return the cached value for this database and server.  If it is
    # missing, store and return the result of `func()`, or return None
    # when `func` is not provided.
    try:
        return self._cache[key, self.db_name, self.client._server]
    except KeyError:
        pass
    if func is not None:
        return self._cache_set(key, func())

def _cache_set(self, key, value, db_name=None):
    # Store the value for this server and `db_name` (default: current database)
    self._cache[key, db_name or self.db_name, self.client._server] = value
    return value

def _is_identitycheck(self, result):
    # True if the result is a mapping which refers to the identity check model
    return hasattr(result, 'get') and result.get('res_model') == 'res.users.identitycheck'

def _identitycheck(self, result):
    # Submit the password to the identity check described by `result`,
    # until the check returns a result.  Raise Error if the password
    # prompt is interrupted.
    assert self.client.version_info >= 14.0, f'Not supported: Odoo {self.client.version_info}'
    idcheck = self._get(result['res_model'], transient=True).get(result['res_id'])
    password = self._cache_get('auth')[self.user.login][1]
    result = None
    while not result:
        try:
            password = password or getpass(f"Password for '{color_bold(self.user.login)}': ")
        except (KeyboardInterrupt, EOFError):
            print()
            raise Error("Security Control - FAILED")
        if self.client.version_info < 19.0:
            idcheck.password = password
        try:  # Odoo >= 19 read from context
            result = idcheck.with_context(password=password).run_check()
        except ServerError as exc:
            error = exc.args[0]['data']
            if not self.client._is_interactive() or error['name'] != 'odoo.exceptions.UserError':
                raise
            # Forget the password, it is asked again on next iteration
            password = None
            print(error['message'])
    if self.client._is_interactive():
        print("Security Control - PASSED")
    return result
[docs] def _call_kw(self, model, method, args, kw=None): if self.uid != self.client._session_uid: password = self._cache_get('auth')[self.user.login][1] if self.user.login == SYSTEM_USER and not password: self.client._authenticate_system() else: self.client._authenticate_session(self.db_name, self.user.login, password) return self.client.web_dataset.call_kw(model=model, method=method, args=args, kwargs=kw or {})
_call_kw._protocol_name = 'Web API'
def execute(self, obj, method, *params, **kwargs):
    """Wrapper around ``/web/dataset/call_kw`` Webclient endpoint,
    or ``/json/2`` API endpoint or ``object.execute_kw`` RPC method.

    Argument `method` is the name of a standard ``Model`` method
    or a specific method available on this `obj`.
    Method `params` are accepted.  If needed, keyword
    arguments are collected in `kwargs`.

    For the method ``read``, the first parameter is a single id, a list of
    ids or a search domain.  With a single id, a single result is returned.
    With a list of ids and the keyword ``order``, the results follow the
    order of the ids, with ``False`` for each missing record.
    """
    assert self.uid, 'Not connected'
    assert isinstance(obj, str) and isinstance(method, str) and method != 'browse'
    order_ids = single_id = False
    if method == 'read':
        assert params, 'Missing parameter'
        if not isinstance(params[0], list):
            single_id = True
            ids = [params[0]] if params[0] else False
        elif params[0] and issearchdomain(params[0]):
            # Combine search+read
            method, [ids] = 'search_read', searchargs(params[:1])
        else:
            order_ids = kwargs.pop('order', False) and params[0]
            ids = sorted({*params[0]} - {False})
            if not ids and order_ids:
                return [False] * len(order_ids)
        if not ids:
            # Nothing to read
            return ids
        params = (ids,) + params[1:]
    elif method == 'search':
        # Accept keyword arguments for the search method
        params = searchargs(params, kwargs)
    elif method == 'search_count':
        params = searchargs(params)
    elif method == 'search_read':
        params = searchargs(params[:1]) + params[1:]

    # Tuple with zero or one dictionary of keyword arguments
    if self.context:
        kw = ({**kwargs, 'context': self.context},)
    elif kwargs:
        kw = ({**kwargs},)
    else:
        kw = ()
    if self.client._use_call_button and method in ('create', 'write', 'unlink'):
        # `kw` is empty when there is no context and no keyword argument
        return self.client.web_dataset.call_button(
            model=obj, method=method, args=params, kwargs=kw[0] if kw else {})
    res = self._execute_kw(obj, method, params, *kw)
    if self._is_identitycheck(res):
        res = self._identitycheck(res)
    if order_ids:
        # Results were not in the same order as the IDs
        # in case of missing records or duplicate ID
        resdic = {val['id']: val for val in res}
        res = [resdic.get(id_, False) for id_ in order_ids]
    return (res or [None])[0] if single_id else res
def access(self, model_name, mode="read"):
    """Check if the user has access to this model.

    Optional argument `mode` is the access mode to check.  Valid values
    are ``read``, ``write``, ``create`` and ``unlink``. If omitted,
    the ``read`` mode is checked.  Return a boolean.

    Any error raised by the check is reported as ``False``.
    """
    try:
        self.execute('ir.model.access', 'check', model_name, mode)
    except Exception:
        return False
    return True
def _models_get(self, name, check, transient): if name not in self._model_names: if check: raise KeyError(name) self._model_names[name] = transient try: return self._models[name] except KeyError: self._models[name] = Model._new(self, name) return self._models[name]
def models(self, name='', transient=False):
    """Search Odoo models.

    The argument `name` is a pattern to filter the models returned.
    If omitted, all models are returned.
    Only the models whose transient flag equals `transient` are listed.

    The return value is a sorted list of model names.
    """
    if self._access_models is None:
        # First call: load the list of models from the server
        ir_model = self._get('ir.model', False)
        if 'abstract' in ir_model._keys:
            # Odoo 19
            domain = [('abstract', '=', False)]
        else:
            domain = []
        try:
            found = ir_model.search_read(domain, ('model', 'transient'))
        except ServerError:
            # Only Odoo 15 prevents non-admin user to retrieve models
            found = ir_model.get_available_models() if self.client.version_info >= 16.0 else {}
        self._model_names.update({m['model']: m.get('transient', False) for m in found})
        self._access_models = bool(found)

    selected = [model_name for (model_name, is_transient) in self._model_names.items()
                if name in model_name and transient == is_transient]
    selected.sort()
    return selected
def _get(self, name, check=True, transient=False):
    """Return a :class:`Model` instance.

    The argument `name` is the name of the model.  If the optional
    argument `check` is :const:`False`, no validity check is done.
    The check is also skipped for a `transient` model, or when the
    list of models could not be retrieved.
    """
    check = check and not transient and self._access_models is not False
    try:
        return self._models_get(name, check, transient=transient)
    except KeyError:
        # Unknown name: look for the models matching this pattern
        model_names = self.models(name)
        if name in self._model_names or not self._access_models:
            return self._models_get(name, self._access_models, transient=transient)
        if model_names:
            header = 'Model not found. These models exist:'
        else:
            header = f'Model not found: {name}'
        raise Error('\n * '.join([header] + model_names))
def modules(self, name='', installed=None):
    """Return a dictionary of modules.

    The optional argument `name` is a pattern to filter the modules.
    It can also be a search domain, which is not altered.
    If the boolean argument `installed` is :const:`True`, the modules
    which are "Not Installed" or "Not Installable" are omitted.  If
    the argument is :const:`False`, only these modules are returned.
    If argument `installed` is omitted, all modules are returned.

    The return value is a dictionary where module names are grouped in
    lists according to their ``state``.  If no module is found, the
    return value is :const:`None`.
    """
    if isinstance(name, str):
        domain = [('name', 'like', name)]
    else:
        # Copy the domain: the list of the caller must not be modified
        domain = [*name]
    if installed is not None:
        op = 'not in' if installed else 'in'
        domain.append(('state', op, ['uninstalled', 'uninstallable']))
    ir_module = self._get('ir.module.module', False)
    mods = ir_module.read(domain, 'name state')
    if not mods:
        return None
    res = {}
    for mod in mods:
        res.setdefault(mod['state'], []).append(mod['name'])
    return res
def _upgrade(self, modules, button, quiet):
    """Run a module operation: upgrade, install, uninstall or cancel.

    Argument `modules` is a sequence of module names.  Argument `button`
    is the name of the ``ir.module.module`` method to call on the
    selected modules, or ``'cancel'`` to reset the pending operations.
    Unless `quiet` is true, a confirmation is asked when the operation
    impacts modules which were not selected.
    """
    # First, update the list of modules
    ir_module = self._get('ir.module.module', False)
    updated, added = ir_module.update_list()
    if added:
        print(f'{added} module(s) added to the list')
    # Find modules
    sel = ir_module.search([('name', 'in', modules)])
    # NOTE(review): `_pending_state` is defined elsewhere in the module;
    # presumably it matches the modules with a pending operation.
    mods = ir_module.read([_pending_state], 'name state')
    if sel:
        # Safety check
        if any(mod['name'] not in modules for mod in mods):
            raise Error('Pending actions:\n' + '\n'.join(
                f" {mod['state']}\t{mod['name']}" for mod in mods))
        if button == 'button_uninstall':
            # Safety check
            names = ir_module.read([('id', 'in', sel.ids),
                                    'state != installed',
                                    'state != to upgrade',
                                    'state != to remove'], 'name')
            if names:
                raise Error(f"Not installed: {', '.join(names)}")
        # Click upgrade/install/uninstall button
        if button != 'cancel':
            self.execute('ir.module.module', button, sel.ids)
    # Read again: the button may have changed the state of some modules
    mods = ir_module.read([_pending_state], 'name state')
    if not mods:
        if sel:
            print(f"Already up-to-date: {self.modules([('id', 'in', sel.ids)])}")
        elif modules:
            raise Error(f"Module(s) not found: {', '.join(modules)}")
        print(f'{updated} module(s) updated')
        return
    print(f'{len(sel)} module(s) selected')
    print(f'{len(mods)} module(s) to process:')
    for mod in mods:
        print(f" {mod['state']}\t{mod['name']}")
    # Confirm?
    if not quiet and any(mod['id'] not in sel.ids for mod in mods):
        assert self.client._is_interactive(), "Cannot continue"
        # Any answer which does not start with 'y' cancels the operation
        if not (ans := input('Confirm? [y/N] ')) or ans[:1].lower() != 'y':
            button = 'cancel'
    if button == 'cancel':
        # Reset module state
        if self.client.version_info < 19.0:
            installed = [mod['id'] for mod in mods if mod['state'] != 'to install']
            uninstalled = [mod['id'] for mod in mods if mod['state'] == 'to install']
            if uninstalled:
                self.execute('ir.module.module', 'button_install_cancel', uninstalled)
            if installed:
                self.execute('ir.module.module', 'button_upgrade_cancel', installed)
        else:  # Odoo >= 19
            self.execute('ir.module.module', 'button_reset_state')
    else:
        # Apply scheduled upgrades
        self.execute('base.module.upgrade', 'upgrade_module', [])
    # Empty the cache for this database
    self.refresh()
def upgrade(self, *modules, quiet=False):
    """Press button ``Upgrade``."""
    button = 'button_upgrade'
    return self._upgrade(modules, button=button, quiet=quiet)
def install(self, *modules, quiet=False):
    """Press button ``Install``."""
    button = 'button_install'
    return self._upgrade(modules, button=button, quiet=quiet)
def uninstall(self, *modules, quiet=False):
    """Press button ``Uninstall``."""
    button = 'button_uninstall'
    return self._upgrade(modules, button=button, quiet=quiet)
def upgrade_cancel(self):
    """Press button ``Cancel Upgrade/Install/Uninstall``."""
    no_modules = ()
    return self._upgrade(no_modules, button='cancel', quiet=True)
def session_authenticate(self, login=None, password=None):
    """Create a Webclient session for current user.

    If `login` is omitted, the login of the current user is used.
    If the `password` is not provided, it is asked.
    The outcome is printed and the session information is stored
    in the attribute ``session_info``.
    """
    login = login or self.user.login
    db_name = self.db_name
    if not password:
        password = getpass(f"Password for '{color_bold(login)}': ")
    self.session_info = self.client._authenticate_session(
        db=db_name, login=login, password=password)
    # When database name is discovered, copy cached data
    if (not self.db_name and (not self.uid or login == self.user.login)
            and self.session_info.get('db')):
        empty_db_key = (self.db_name, self.client._server)
        self.db_name = self.session_info['db']
        # Select the keys first, because the cache is updated in the loop
        to_copy = [key for key in self._cache if key[1:] == empty_db_key]
        for key in to_copy:
            self._cache_set(key[0], self._cache[key])
    if self.session_info['uid']:
        print(f'Session authenticated for {login!r}')
    else:
        print('Failed')
def session_destroy(self):
    """Terminate current Webclient session."""
    client = self.client
    client._session_uid = None
    try:
        outcome = client.web_session.destroy()
    except ServerError as exc:
        # Ignore: odoo.http.SessionExpiredException
        if exc.args[0]['code'] == 100:
            return None
        raise
    return outcome
def generate_api_key(self):
    """Generate an API Key and configure environment to use it.

    Caution: API Key is not saved.
    It can be set in the configuration: ``api_key = ...``.
    """
    assert self.client.version_info >= 14.0, f'Not supported: Odoo {self.client.version_info}'
    key_name = f'Created by Odooly {__version__}'
    wiz_model = self._get("res.users.apikeys.description", transient=True)
    wizard = wiz_model.create({'name': key_name})
    action = wizard.make_key()
    self.user._invalidate_cache()
    self.refresh()  # Remove cached Envs
    # The new key is found in the context of the returned action
    assert action['res_model'] == "res.users.apikeys.show"
    new_key = action['context']['default_key']
    return self.set_api_key(new_key)
def _get_sql_action(self, _external_id="__odooly__.sql"):
    """Return the server action used to execute SQL, without ``lang``.

    The action is retrieved by its External ID.  If it is not found,
    it is created on model ``ir.logging`` and this External ID is set.
    """
    act_model = self._get('ir.actions.server', False)
    action = act_model.get(_external_id)
    if not action:
        logg_model = self._get('ir.model', False).get('base.model_ir_logging')
        values = {'name': 'SQL Execute', 'state': 'code', 'model_id': logg_model.id}
        action = act_model.create(values).ensure_one()
        action._set_external_id(_external_id)
    return action.with_context(lang=None)
def sql(self, queries):
    """Execute SQL commands on the PostgreSQL server.

    Argument `queries` is a string of statements separated by ``;``.
    The statements are run through a server action, and the result is
    read back from the ``ir.logging`` entry written for this run.
    Return :const:`None` if there is no statement or no log entry.
    """
    qlist = []
    # NOTE(review): the split is naive, a ";" inside a string literal
    # also separates statements.
    for query in queries.split(";"):
        # Skip the chunks which contain only blanks or "--" comments
        if any(li.split('--', 1)[0].strip() for li in query.splitlines()):
            qlist.append(query.strip())
    if not qlist:
        return None
    # The timestamp in the name is used below to find the log entry
    vals = {"name": f"SQL Execute - {datetime.datetime.now()}"}
    if (sql_action := self._get_sql_action()).code != _sql_action_code:
        vals["code"] = _sql_action_code
    sql_action.write(vals)
    sql_action.with_context(__sql=qlist).run()
    logg = self._get('ir.logging', False).get([f"func = {vals['name']}"])
    # NOTE(review): `eval` is applied to a message read from the server;
    # only use this method with a trusted server.
    return eval(logg.message, {"datetime": datetime}) if logg else None
class Client:
    """Connection to an Odoo instance.

    This is the top level object.
    The `server` is the URL of the instance, like ``http://localhost:8069``.
    If `server` is an ``odoo`` Python package, it is used to connect to the
    local server.

    The `db` is the name of the database and the `user` should exist in the
    table ``res.users``.  If the `password` is not provided, it will be
    asked on login.
    """
    _config_file = CONF_FILE
    # Settings of the saved environments, shared by all the clients
    _saved_config = {}
    # Global namespace of the interactive session, or None
    _globals = None
    _use_call_button = False

    def __init__(self, server, db=None, user=None, password=None, api_key=None, verbose=False):
        self._http = HTTPSession()
        self._printer = Printer()
        self._session_uid = None
        self.verbose = verbose
        self._set_services(server, db)
        self.env = Env(self)
        if user:  # Try to login
            self.login(user, password=password, api_key=api_key, database=db)

    @property
    def verbose(self):
        """Width of the printer output, or None."""
        return self._printer.width

    @verbose.setter
    def verbose(self, cols):
        # A value below 9 is a verbosity level, converted with MAXCOL
        cols = MAXCOL[min(3, cols) - 1] if (cols or 9) < 9 else cols
        # The width is at least 36.  It is None if `cols` is false.
        self._printer.width = cols and max(36, cols) or None

    def _set_services(self, server, db):
        """Select the protocol for this `server` and create the services.

        It sets the attributes ``server_version`` and ``version_info``.
        """
        if isinstance(server, list):
            appname = Path(__file__).name.rstrip('co')
            server = start_odoo_services(server, appname=appname)
        elif isinstance(server, str):
            rsvr = urlsplit(server)
            if "@" in rsvr.netloc:
                # Credentials in the URL are set on the HTTP session
                [username, password] = rsvr._userinfo
                rsvr = rsvr._replace(netloc=rsvr.netloc.rsplit("@", 1)[1])
                self._http.set_auth(server, username, password)
            if rsvr.path[-1:] == '/':
                rsvr = rsvr._replace(path=rsvr.path.rstrip('/'))
            server = rsvr.geturl()
        self._server = server
        if not isinstance(server, str):
            self._proxy = self._proxy_odoo
        elif '/jsonrpc' in server:
            self._proxy = self._proxy_jsonrpc
        else:
            if not db:
                # Resolve redirects
                __, server = self._request_parse(server, method='HEAD')
            if not server.endswith('/web'):
                server = urljoin(server, '/web')
            self._server = server
            # No RPC proxy: only the Web API is used
            self._proxy = self.common = self._object = None
        if isinstance(server, str):
            self.web = WebAPI(self, 'web', ())
            self.web.doc = WebAPI(self, 'doc', ())._request  # Odoo 19

            def get_web_api(name):
                return WebAPI(self, f'web/{name}', _web_methods[name][:])
            self.database = get_web_api('database')
            self.web_dataset = get_web_api('dataset')
            self.web_session = get_web_api('session')
            self.web_webclient = get_web_api('webclient')
        else:
            self.web = None

        if self._proxy is None:
            self.server_version = self.web_webclient.version_info()['server_version']
        else:
            # Create the RPC services
            self.common = Service(self, 'common', _rpc_methods['common'])
            self._object = Service(self, 'object', _rpc_methods['object'])
            self.server_version = self.common.version()['server_version']
        # Parse server version
        major_minor = re.search(r'\d+\.?\d*', self.server_version).group()
        self.version_info = float(major_minor)
        assert self.version_info > 8.0, f'Not supported: Odoo {major_minor}'

    def _request_parse(self, path, *, method=None, data=None, headers=None, regex=None):
        """Send an HTTP request, return the response and its parsed value.

        The default `method` is ``GET``, or ``POST`` if `data` is provided.
        """
        verb = method or ('GET' if data is None else 'POST')
        url = urljoin(self._server, path)
        if not self._printer:
            res = self._http.request(url, data=data, headers=headers, method=verb)
            return res, extract_http_response(verb, res, regex)
        # Same request, with the exchange reported to the printer
        with self._printer as log:
            log.print_sent(' '.join([verb, path] + format_params(data or {})))
            res = self._http.request(url, data=data, headers=headers, method=verb)
            parsed = extract_http_response(verb, res, regex)
            log.print_recv(str(parsed))
        return res, parsed

    def _post_jsonrpc(self, endpoint='', params=None):
        """Send a JSON-RPC call to `endpoint` and return its ``result``.

        Raise a ``ServerError`` if the response contains an ``error``.
        """
        # Request id: process id, then a timestamp in microseconds
        req_id = f"{os.getpid():04x}{int(time.time() * 1E6) % 2**40:010x}"
        payload = {'jsonrpc': '2.0', 'method': 'call', 'params': params or {}, 'id': req_id}
        resp = self._http.request(urljoin(self._server, endpoint), json=payload)
        if r_error := resp.get('error'):
            raise ServerError(r_error)
        return resp.get('result')

    def _proxy_odoo(self, name):
        return partial(self._server.http.dispatch_rpc, name)
    _proxy_odoo._protocol_name = 'Odoo'

    def _proxy_jsonrpc(self, name):
        def dispatch_jsonrpc(method, args):
            return self._post_jsonrpc(params={'service': name, 'method': method, 'args': args})
        return dispatch_jsonrpc
    _proxy_jsonrpc._protocol_name = 'JSON-RPC'

    def _proxy_web(self, name):
        """Return the function which dispatches the calls to Web API `name`."""
        if name == 'doc':
            def dispatch_web(model, params):
                return self._http.request(urljoin(self._server, f"doc/{model}.json"), method='GET')
        elif name == 'web/database':
            def dispatch_web(method, params):
                # Only the method `list` is called with JSON-RPC
                if method != 'list':
                    return self._http.request(urljoin(self._server, f"{name}/{method}"), data=params)
                return self._post_jsonrpc(f"{name}/{method}", params=params)
        else:
            def dispatch_web(method, params):
                if method == 'call_kw' and name == 'web/dataset':
                    # Model and method names are appended to the endpoint
                    method = f"{method}/{params['model']}/{params['method']}"
                return self._post_jsonrpc(f"{name}/{method}", params=params)
        return dispatch_web
def save(self, environment=None, skip=False):
    """Save environment settings with this name, or current name

    The settings are only kept in memory, and only if the user is
    logged in and `skip` is false.  Return the client itself.
    """
    env_name = environment or self.env.name or self.env.db_name
    self.env.name = env_name
    if not skip and self.env.uid:
        self._saved_config[self.env.name] = (
            self._server,
            self.env.db_name,
            self.env.user.login,
            None,
            self.env._api_key,
        )
    shell_vars = self._globals
    # Refresh the prompt if this client is the one of the interactive session
    if shell_vars and shell_vars.get('client', self) is self:
        self._set_prompt()
    return self
@classmethod
def get_config(cls, environment):
    """Retrieve the settings for this environment.

    It can be in memory, if it was saved before with :meth:`Client.save`.
    If not, it will parse ``odooly.ini`` file, where it searches for the
    section ``[ <environment> ]``.

    See :func:`read_config` for details of the configuration file format.
    """
    assert environment
    if environment in cls._saved_config:
        return cls._saved_config[environment]
    try:
        cls._saved_config[environment] = read_config(environment)
    except Exception:
        # List the known environments in the error message
        envs = [*cls._saved_config]
        if cls._config_file.exists():
            envs.extend(env for env in read_config() if env not in cls._saved_config)
        raise Error("No such environment.\nAvailable: " + ", ".join(envs))
    return cls._saved_config[environment]
@classmethod
def from_config(cls, environment, user=None, verbose=False):
    """Create a connection to a defined environment.

    See :meth:`Client.get_config`

    If `user` is not the user of the configuration, the password of
    the configuration is not used and the settings are not saved.
    Return a connected :class:`Client`.
    """
    server, db, conf_user, password, api_key = cls.get_config(environment)
    skip_save = user and user != conf_user
    if skip_save:
        password = None
    login = user or conf_user
    try:
        # Reuse the client of the cached environment, if any
        client = Env._cache[Env, db, server].client
        client.verbose = verbose
        client.login(login, password=password, api_key=api_key)
    except KeyError:
        client = cls(server, db, login, password=password,
                     api_key=api_key, verbose=verbose)
    return client.save(environment, skip=skip_save)
def __repr__(self):
    return f"<Client '{self._server}?db={self.env.db_name or ''}'>"

def _authenticate(self, db, login, password, api_key):
    """Authenticate and return a dictionary of session information.

    The ``/json/2`` API is used with an `api_key` and no `password`, for
    Odoo >= 19.  Else a Webclient session is created, if available.
    """
    if api_key and not password and self.version_info >= 19.0:
        json2_api = Json2(self, db, api_key)
        context = json2_api('res.users', 'context_get', ())
        info = {'uid': context['uid'], 'user_context': context, 'db': db}
    elif self.web:
        info = self._authenticate_session(db, login, password)
    else:
        raise Error("Error: Cannot authenticate")
    return info

def _authenticate_session(self, db, login, password):
    """Create a Webclient session and return the session information.

    If the authentication fails, the ``uid`` of the result is None.
    """
    info = {'uid': None}
    try:
        if db:
            info = self.web_session.authenticate(db=db, login=login, password=password)
            if self.version_info >= 15.0 and info['uid'] is None:
                # Is it 2FA?
                info = self._authenticate_web(db=db, login=login, password=password)
        else:
            info = self._authenticate_web(login=login, password=password)
        self._session_uid = info.get('uid')
    except TypeError:
        pass    # Cannot extract `csrf_token` or `session_info` with Regex
    except ServerError as exc:
        # Ignore: odoo.exceptions.AccessDenied
        if exc.args[0]['code'] not in (0, 200):
            raise
    return info

def _authenticate_web(self, **kw):
    """Login with the web form, and ask the 2FA code if it is needed.

    Keyword arguments are ``login``, ``password`` and optional ``db``.
    Return the session information, or ``{'uid': None}`` if the session
    does not belong to this ``login``.
    """
    # 1. Get CSRF token
    qs = f"?{urlencode(dict(db=kw['db']))}" if "db" in kw else ""
    __, csrf = self._request_parse('/web' + qs, regex=r'csrf_token: "(\w+)"')
    # 2. Login
    rv, session_info = self._request_parse('/web/login', data={'csrf_token': csrf, **kw})
    for retry in range(4):
        # 3. Parse 'session_info'
        if 'user_id' in session_info and 'uid' not in session_info:  # Odoo < 18
            session_info['uid'] = session_info['user_id']
        if retry and not session_info['uid']:
            print('Verification failed')
        if session_info['uid'] or 'totp_token' not in rv or retry == 3:
            break
        # 4. Ask TOTP code
        token = getpass(f"Authentication Code for '{color_bold(kw['login'])}' (2FA 6-digits): ")
        # 5. Submit TOTP
        params = {'csrf_token': csrf, 'totp_token': token, 'remember': 1}
        rv, session_info = self._request_parse('/web/login/totp', data=params)
    return session_info if session_info.get('username') == kw['login'] else {'uid': None}

def _authenticate_system(self):
    """Switch the Webclient session to the Superuser (uid 1)."""
    __, session_info = self._request_parse('/web/become')
    self._session_uid = session_info.get('uid')
    if self._session_uid != 1:
        raise Error("Cannot become Superuser")
    return session_info

def _select_database(self, db_list, limit=20):
    """Return the database to use, chosen in `db_list`.

    A single database is returned directly.  Else the user is asked to
    select a database by its number, if the session is interactive.
    At most `limit` names are printed.  Return None if there is no
    database, or if the session is not interactive.
    """
    if len(db_list) == 1:
        return db_list[0]
    if not db_list or not self._is_interactive():
        return
    print('Available databases:')
    for idx, name in enumerate(db_list[:limit], start=1):
        print(f' {idx}. {name!r}')
    if len(db_list) > limit:
        print(' ...')
    print()
    while db_list:
        # EOFError is not caught here: it would repeat the prompt forever
        ans = input('Select a database: ')
        try:
            index = int(ans)
        except ValueError:
            continue    # Not a number: ask again
        # Reject 0 and negative numbers: they would index from the end
        if 1 <= index <= len(db_list):
            return db_list[index - 1]

def _login(self, user, password=None, database=None, api_key=None):
    """Switch `user` and (optionally) `database`.

    If the `password` is not available, it will be asked.
    """
    env = self.env
    if not env.db_name or (database and env.db_name != database):
        try:
            dbs = self.database.list()
        except Exception:
            pass    # AccessDenied: simply ignore this check
        else:
            if not database:
                # Database selector page
                database = self._select_database(dbs)
            elif dbs and database not in dbs:
                raise Error(f"Database {database!r} does not exist: {dbs}")
        if database and env.db_name != database:
            if self._session_uid:
                env.session_destroy()
            env = Env(self, database)
    try:
        self.env = env(user=user, password=password, api_key=api_key)
    finally:
        # Used for logging, copied from odoo.sql_db.db_connect
        current_thread().dbname = self.env.db_name
    return self.env.uid
def login(self, user, password=None, database=None, api_key=None):
    """Switch `user` and (optionally) `database`.

    The return value is the result of the login, if the session is not
    interactive.  In an interactive session an error is printed instead
    of being raised, the globals are registered if needed, and nothing
    is returned.
    """
    credentials = {'password': password, 'database': database, 'api_key': api_key}
    if not self._is_interactive():
        return self._login(user, **credentials)
    try:
        register = self._login(user, **credentials)
    except Error as exc:
        print(exc)
        register = 'client' not in self._globals
    # Register the globals()
    if register:
        self.connect()
def connect(self, env_name=None, *, server=None, database=None, user=None):
    """Connect to another environment and replace the globals().

    Argument `env_name` is the name of a saved environment.  Else a new
    client is created for the `server`, if provided.  Without these
    arguments, this client is registered in the globals.
    """
    assert self._is_interactive(), 'Not available'
    if env_name:
        self.from_config(env_name, user=user, verbose=self.verbose)
    elif server:
        if not user and self.env.uid:
            user = self.env.user.login
        Client(server, db=database, user=user, verbose=self.verbose)
    else:
        assert not user, f"Use client.login({user!r}) instead"
        self._globals['client'] = self.env.client
        self._globals['env'] = env = self.env
        self._set_prompt()
        # Logged in?
        if env.uid:
            print(f"Logged in as '{color_bold(env.user.login)}' with {env._execute_kw._protocol_name}")

def _set_prompt(self):
    # Tweak prompt
    sys.ps1 = f'{self.env.name or self.env.db_name} >>> '
    sys.ps2 = '... '.rjust(len(sys.ps1))

@classmethod
def _set_interactive(cls, global_vars=None):
    """Enable the interactive mode and return the global namespace.

    The optional dictionary `global_vars` is updated in place.
    If omitted, a new dictionary is created.
    """
    global color_bold, color_comment, color_py, color_repr
    # Don't call multiple times
    del Client._set_interactive
    assert not cls._is_interactive()
    if global_vars is None:
        # Do not share a mutable default value between calls
        global_vars = {}

    for name in ['__name__', '__version__', '__doc__', 'Client']:
        global_vars[name] = globals()[name]
    try:    # Python >= 3.14
        from _pyrepl.utils import disp_str, gen_colors, _colorize
        color_py = color_repr = lambda v: "".join(disp_str(v, colors=[*gen_colors(v)])[0])
        color_py.__name__ = color_py.__qualname__ = 'color_python'
        # Build the formatters from the colors of a keyword and a comment
        color_bold = color_py('def _').split()[1].replace('_', '{}').format
        color_comment = color_py('#').replace('#', '{}').format
        global_vars |= {'color_py': color_py, 'decolor': _colorize.decolor}
    except ImportError:
        pass
    cls._globals = global_vars
    return global_vars

@classmethod
def _is_interactive(cls):
    return cls._globals is not None
def create_database(self, passwd, database, demo=False, lang='en_US',
                    user_password=ADMIN_USER, login=ADMIN_USER, country_code=None):
    """Create a new database.

    The superadmin `passwd` and the `database` name are mandatory.
    By default, `demo` data are not loaded, `lang` is ``en_US``
    and no country is set into the database.
    Login if successful.
    """
    options = {
        'master_pwd': passwd,
        'name': database,
        'lang': lang,
        'password': user_password,
        'demo': demo,
        'login': login,
        'country_code': country_code,
        'phone': '',
    }
    self.database.create(**options)
    return self.login(login, user_password, database=database)
def clone_database(self, passwd, database, neutralize_database=False):
    """Clone the current database.

    The superadmin `passwd` and `database` are mandatory.
    The option `neutralize_database` is not supported before Odoo 16.
    Login if successful.
    """
    extra = {}
    if neutralize_database:
        if self.version_info < 16.0:
            raise Error("Argument 'neutralize_database' is not supported")
        extra["neutralize_database"] = neutralize_database
    self.database.duplicate(
        master_pwd=passwd, name=self.env.db_name, new_name=database, **extra)

    # Copy the cache for authentication
    auth_cache = self.env._cache_get('auth')
    self.env._cache_set('auth', {**auth_cache}, db_name=database)

    # Login with the current user into the new database
    current_login = self.env.user.login
    return self.login(
        current_login,
        password=auth_cache[current_login][1],
        api_key=self.env._api_key,
        database=database,
    )
def drop_database(self, passwd, database):
    """Drop the database.

    The superadmin `passwd` and `database` are mandatory.
    The active database cannot be dropped.
    """
    if not database or database == self.env.db_name:
        raise Error("Failed - Cannot delete active database")
    self.database.drop(master_pwd=passwd, name=database)
    # Verify that the database is not listed anymore
    remaining = self.database.list()
    if database in remaining:
        raise Error("Failed - Database was not deleted")
class BaseModel:
    """Methods shared by the models and the records.

    They return the same object, attached to another environment.
    """

    def sudo(self, user=None):
        """Attach to the provided user, or Superuser."""
        sudo_env = self.env.sudo(user=user)
        return self.with_env(sudo_env)

    def with_context(self, *args, **kwargs):
        """Attach to an extended context."""
        # The first positional argument replaces the current context
        base_context = args[0] if args else self.env.context
        context = dict(base_context, **kwargs)
        new_env = self.env(context=context)
        return self.with_env(new_env)

    def with_odoo(self):
        """Attach to an ``odoo.api.Environment``.

        Use same (db_name, uid, context) as current ``Env``.
        Only available in ``local`` mode.
        """
        odoo_env = self.env.odoo_env
        return self.with_env(odoo_env)
class Model(BaseModel):
    """The class for Odoo models."""

    def __new__(cls, env, name):
        # Models are retrieved from the environment
        return env[name]

    @classmethod
    def _new(cls, env, name):
        """Create the instance for model `name`, attached to `env`."""
        model = object.__new__(cls)
        model.env = env
        model._name = name
        model._execute = partial(env.execute, name)
        return model

    def __repr__(self):
        model_name = self._name
        return f"<Model '{model_name}'>"
def with_env(self, env):
    """Attach to the provided environment."""
    model_name = self._name
    return env[model_name]
def keys(self):
    """Return the keys of the model."""
    model_keys = self._keys
    return model_keys
def fields(self, names=None, attributes=None):
    """Return a dictionary of the fields of the model.

    Optional argument `names` is a sequence of field names or a space
    separated string of these names.  If omitted, all fields are returned.
    Optional argument `attributes` is a sequence of attributes or a space
    separated string of these attributes.  If omitted, all attributes are
    returned.
    """
    if isinstance(names, str):
        names = names.split()
    if isinstance(attributes, str):
        attributes = attributes.split()
    all_fields = self._fields
    if names is None and attributes is None:
        # No filter: return the definition of the fields as is
        return all_fields
    selection = {}
    for fld, vals in all_fields.items():
        if names is not None and fld not in names:
            continue
        if attributes is not None:
            vals = {att: val for (att, val) in vals.items() if att in attributes}
        selection[fld] = vals
    return selection
def field(self, name):
    """Return the field properties for field `name`."""
    all_fields = self._fields
    return all_fields[name]
@staticmethod
def _parse_doc_methods(doc_dict):
    """Return a list of ``(method name, argument names)``.

    The methods are read from key ``methods`` of `doc_dict`.  The argument
    ``ids`` is inserted first, unless ``model`` is found in the ``api``
    of the method.
    """
    def argument_names(vals):
        arg_names = [*vals['parameters']]
        if 'model' in vals.get('api', ()):
            return arg_names
        return ['ids'] + arg_names

    methods = doc_dict.get('methods') or {}
    return [(key, argument_names(vals)) for (key, vals) in methods.items()]
def _methods(self, name=''):
    """List methods and arguments.

    Argument `name` is a pattern to filter the methods returned.
    If omitted, all methods are returned.
    """
    try:
        method_params = self._parse_doc_methods(self._doc)
    except Exception:
        # The documentation cannot be used: list the base methods
        method_params = _base_method_params
    matching = {}
    for key, args in method_params:
        if name in key:
            matching[key] = args
    return matching
def access(self, mode="read"):
    """Check if the user has access to this model.

    Optional argument `mode` is the access mode to check.  Valid values
    are ``read``, ``write``, ``create`` and ``unlink``. If omitted,
    the ``read`` mode is checked.  Return a boolean.
    """
    env = self.env
    return env.access(self._name, mode)
def browse(self, ids=()):
    """Return a :class:`Record` or a :class:`RecordList`.

    The argument `ids` accepts a single integer ``id`` or a list of ids.
    If it is a single integer, the return value is a :class:`Record`.
    Otherwise, the return value is a :class:`RecordList`.
    """
    # A pair (id, name) is a single record too
    if isinstance(ids, int):
        is_single = True
    else:
        is_single = len(ids) == 2 and isinstance(ids[1], str)
    if is_single:
        return Record(self, ids)
    return RecordList(self, ids)
def search(self, domain, **kwargs):
    """Search for records in the `domain`.

    With a true keyword ``count``, return the number of records.
    """
    if not kwargs.get('count'):
        return RecordList._prepared(self, domain, kwargs)
    return self.search_count(domain)
def search_count(self, domain=None):
    """Count the records in the `domain`."""
    if not domain:
        domain = []
    return self._execute('search_count', domain)
def search_read(self, domain=None, fields=None, **kwargs):
    """Combine search and read."""
    field_names, fmt = self._parse_format(fields, browse=False)
    if not domain:
        domain = []
    rows = self._execute('search_read', domain, field_names, **kwargs)
    return fmt(rows)
def get(self, domain, *args, **kwargs):
    """Return a single :class:`Record`.

    The argument `domain` accepts a single integer ``id`` or a search
    domain, or an external ID ``xml_id``.  The return value is a
    :class:`Record` or None.  If multiple records are found,
    a ``ValueError`` is raised.
    """
    if args or kwargs:
        # Passthrough for env['ir.default'].get and alike
        return self._execute('get', domain, *args, **kwargs)
    if isinstance(domain, int):     # a single id
        return Record(self, domain)
    if isinstance(domain, str):     # lookup the xml_id
        rec = self.env.ref(domain)
        if rec:
            assert rec._model is self, f'Model mismatch {rec!r} {self!r}'
            return rec
        return None
    assert issearchdomain(domain)   # a search domain
    ids = self._execute('search', domain)
    if len(ids) > 1:
        raise ValueError(f'domain matches too many records ({len(ids)})')
    if ids:
        return Record(self, ids[0])
    return None
def create(self, values):
    """Create one :class:`Record` or many.

    The argument `values` is a dictionary of values which are used to
    create the record.  Relationship fields `one2many` and `many2many`
    accept either a list of ids or a RecordList or the extended Odoo
    syntax.  Relationship fields `many2one` and `reference` accept
    either a Record or the Odoo syntax.
    It can create multiple records.

    The newly created :class:`Record` is returned, or :class:`RecordList`.
    """
    if hasattr(values, "items"):
        nb = 1
        values = self._unbrowse_values(values)
    else:   # Odoo >= 12
        nb = len(values)
        values = [self._unbrowse_values(vals) for vals in values]
    new_ids = self._execute('create', values)
    if new_ids or not self.env.client._use_call_button:
        return self.browse(new_ids)
    # No id was returned: retrieve the latest records
    return self.search([], order='id DESC', limit=nb)
def read(self, *params, **kwargs):
    """Wrapper for ``client.execute(model, 'read', [...], ('a', 'b'))``.

    The first argument is a list of ids ``[1, 3]`` or a single id ``42``
    or a search domain.

    The second argument, `fields`, accepts:
     - a single field: ``'first_name'``
     - a tuple of fields: ``('street', 'city')``
     - a space separated list: ``'street city'``
     - a format string: ``'{street} {city}'``

    If `fields` is omitted, all fields are read.

    If `domain` is a single id, then:
     - return a single value if a single field is requested.
     - return a string if a format spec is passed in the `fields` argument.
     - else, return a dictionary.

    If `domain` is not a single id, the returned value is a list of items.
    Each item complies with the rules of the previous paragraph.

    The optional keyword arguments `offset`, `limit` and `order` are
    used to restrict the search.
    """
    arg = None
    if len(params) > 1:
        arg = params[1]
    fields, fmt = self._parse_format(arg, browse=False)
    if arg is not None:
        # Replace the second argument with the list of field names
        params = (params[0], fields) + params[2:]
    res = self._execute('read', *params, **kwargs)
    if isinstance(res, list):
        return fmt(res)
    # Single result: format it like a list of one item
    return fmt([res])[0]
def _parse_format(self, arg, browse=True):
    """Return the field names, and a function to format a list of values.

    The argument `arg` is the `fields` argument of a read: a list of
    names, a space separated string, a format string or None.
    If `browse` is true, relational values are wrapped in records.
    """
    if not isinstance(arg, str):
        fields, formatter = arg, None
    elif '}' in arg:
        # A format string: collect the field names which are used
        fields = [re.match(r'\w+', tup[1]).group(0)
                  for tup in Formatter().parse(arg) if tup[1]]
        formatter = arg.format_map
    else:
        # transform: "zip city" --> ["zip", "city"]
        fields = arg.split()
        # A single field: return its value instead of a dictionary
        formatter = (lambda d: d[fields[0]]) if len(fields) == 1 else None

    # The lambda reads `formatter` when it is called, not when it is defined
    lst_format = lambda values: [(val and formatter(val)) for val in values]
    if browse:
        if not formatter:
            formatter = self._browse_values
        elif fields == arg.split():
            # A single field name is requested
            if 'relation' in (fspec := self.field(fields[0])):
                rel_model = self.env._get(fspec['relation'], False)
                if fspec['type'] == 'many2one':
                    m_browse = partial(RecordList, rel_model)
                else:
                    def m_browse(values):
                        if not values:
                            return RecordList(rel_model, ())
                        return [RecordList(rel_model, val or ()) for val in values]

                def lst_format(values):
                    return m_browse([(val and formatter(val)) for val in values])
            elif fspec['type'] == 'reference':
                def formatter(dic):
                    if not (value := dic[fields[0]]):
                        return value
                    # The value of a reference is "model,id"
                    (res_model, res_id) = value.split(',')
                    rel_model = self.env._get(res_model, False)
                    return Record(rel_model, int(res_id))
    elif not formatter:
        # Nothing to format: return the values unchanged
        lst_format = lambda values: values
    return fields, lst_format

def _browse_values(self, values):
    """Wrap the values of a Record.

    The argument `values` is a dictionary of values read from a Record.
    When the field type is relational (many2one, one2many or many2many),
    the value is wrapped in a Record or a RecordList.
    Return a dictionary with the same keys as the `values` argument.
    The dictionary `values` is updated in place.
    """
    for (key, value) in values.items():
        # Skip the id, the empty values and the values already wrapped
        if key == 'id' or value is False or hasattr(value, 'id'):
            continue
        if (field := self._fields[key])['type'] == 'reference':
            (res_model, res_id) = value.split(',')
            value = int(res_id)
        elif 'relation' in field:
            res_model = field['relation']
        else:
            continue
        rel_model = self.env._get(res_model, False)
        values[key] = rel_model.browse(value)
    return values

def _unbrowse_values(self, values):
    """Unwrap the id of Record and RecordList.

    Return a copy of `values`.  Raise a ``TypeError`` if the model of
    a record is not the model of the relation.
    """
    new_values = values.copy()
    for (key, value) in values.items():
        many = self._fields[key]['type'] in ('one2many', 'many2many')
        if hasattr(value, 'id'):
            if self._fields[key]['type'] == 'reference':
                new_values[key] = f'{value._name},{value.id}'
            else:
                fld_model = self._fields[key].get('relation')
                if fld_model and value._name != fld_model:
                    raise TypeError(f"Mixing apples and oranges: {fld_model} <> {value._name}")
                new_values[key] = value = value.id and (value.ids if many else int(value))
        if many:
            # A list of ids is converted to the command (6, 0, ids)
            if not value:
                new_values[key] = [(6, 0, [])]
            elif isinstance(value[0], int):
                new_values[key] = [(6, 0, value)]
    return new_values
def _get_external_ids(self, ids=None):
    """Retrieve the External IDs of the records.

    Return a dictionary with keys being the fully qualified
    External IDs, and values the ``Record`` entries.
    """
    search_domain = [('model', '=', self._name)]
    if ids is not None:
        search_domain.append(('res_id', 'in', ids))
    imd_model = self.env._get('ir.model.data', False)
    existing = imd_model.read(search_domain, 'module name res_id')
    external_ids = {}
    for rec in existing:
        xml_id = f"{rec['module']}.{rec['name']}"
        external_ids[xml_id] = self.get(rec['res_id'])
    return external_ids
def __getattr__(self, attr):
    # Called for the attributes which are not found on the instance.
    # NOTE(review): `_memoize` is defined elsewhere; presumably it stores
    # the value on the instance and returns it.
    if attr == '_fields':
        # The definition of the fields is cached on the environment
        if (vals := self.env._cache_get((attr, self._name))) is None:
            vals = self._doc['fields'] if self.__dict__.get('_doc') else self._execute('fields_get')
            self.env._cache_set((attr, self._name), vals)
        return _memoize(self, attr, vals)
    if attr == '_keys':
        return _memoize(self, attr, sorted(self._fields))
    if attr == '_doc':
        return _memoize(self, attr, self.env._doc(self._name) if self.env._doc else None)
    if attr.startswith('_'):
        raise AttributeError(f"'Model' object has no attribute {attr!r}")

    # Any other name is a method of the model, executed on the server
    def wrapper(self, *params, **kwargs):
        """Wrapper for client.execute({!r}, {!r}, *params, **kwargs)."""
        return self._execute(attr, *params, **kwargs)
    return _memoize(self, attr, wrapper, (self._name, attr))
class BaseRecord(BaseModel):
    """Behaviour shared by :class:`Record` and :class:`RecordList`.

    The methods rely on the attributes ``id``, ``ids`` and ``_idnames``
    which are provided by the subclasses.
    """

    def __init__(self, res_model, arg):
        # Bypass __setattr__ method: store the attributes directly
        self.__dict__.update({
            '_name': res_model._name,
            '_model': res_model,
            'env': res_model.env,
            '_execute': res_model._execute,
        })

    def __repr__(self):
        count = len(self.ids)
        # Long lists are summarized by their length
        ids = f'length={count}' if count > 6 else self.id
        return f"<{self.__class__.__name__} '{self._name},{ids}'>"

    def __bool__(self):
        return bool(self.ids)

    def __len__(self):
        return len(self.ids)

    def __getitem__(self, key):
        idname = self._idnames[key]
        if idname is False:
            return False
        return self._model.browse(idname)

    def __iter__(self):
        for idname in self._idnames:
            yield Record(self._model, idname)

    def __contains__(self, item):
        if not isinstance(item, BaseRecord):
            return None
        item._check_model(self, 'in')
        return len(item) == 1 and item.ids[0] in self.ids

    def __add__(self, other):
        return self.concat(other)

    def __sub__(self, other):
        self._check_model(other, '-')
        excluded = set(other.ids)
        kept = [idname
                for (res_id, idname) in zip(self.ids, self._idnames)
                if res_id not in excluded]
        return RecordList(self._model, kept)

    def __and__(self, other):
        self._check_model(other, '&')
        wanted = set(other.ids)
        unique = self.union()
        kept = [idname
                for (res_id, idname) in zip(unique.ids, unique._idnames)
                if res_id in wanted]
        return RecordList(self._model, kept)

    def __or__(self, other):
        return self.union(other)

    def __eq__(self, other):
        if self.__class__ is not other.__class__:
            return False
        return self.ids == other.ids and self._model is other._model

    def __ne__(self, other):
        return not (self == other)

    def __lt__(self, other):
        self._check_model(other, '<')
        return set(self.ids) < set(other.ids)

    def __le__(self, other):
        self._check_model(other, '<=')
        return set(self.ids).issubset(other.ids)

    def __gt__(self, other):
        self._check_model(other, '>')
        return set(self.ids) > set(other.ids)

    def __ge__(self, other):
        self._check_model(other, '>=')
        return set(self.ids).issuperset(other.ids)

    def __int__(self):
        return int(self.ensure_one().id)

    @property
    def _keys(self):
        return self._model._keys

    @property
    def _fields(self):
        return self._model._fields

    def _methods(self, name=''):
        return self._model._methods(name)

    def _invalidate_cache(self):
        # Nothing is cached at this level
        pass

    def ensure_one(self):
        """Return the single record in this recordset.

        Raise a ValueError if the recordset has more records or is empty.
        """
        if self.id and not isinstance(self.id, list):
            return self
        recs = self.union()
        if len(recs.id) == 1:
            return recs[0]
        raise ValueError(f"Expected singleton: {self}")

    def exists(self):
        """Return a subset of records that exist."""
        if self.env.client.version_info < 19.0:
            method = 'exists'
            arg = self.union().ids
        else:
            # Beware that it might be wrong, if `search` method is overloaded.
            # This is the case for 'ir.attachment' for example.
            method = 'search'
            arg = [('id', 'in', self.union().ids)]
        ids = self.ids and self._execute(method, arg, context={'active_test': False})
        if ids and not isinstance(self.id, list):
            # Called on a single record: keep a single id
            ids = ids[0]
        return self._model.browse(ids)

    def get_metadata(self):
        """Read the metadata of the record(s).

        Return the result of the remote ``get_metadata`` call.
        """
        return self._execute('get_metadata', self.ids)

    def with_env(self, env):
        """Return the same record(s), browsed through ``env``."""
        return env[self._name].browse(self.id)

    def _check_model(self, other, oper):
        """Raise a TypeError unless ``other`` is bound to the same model."""
        if not isinstance(other, BaseRecord) or self._model is not other._model:
            raise TypeError(f"Mixing apples and oranges: {self} {oper} {other}")

    def _concat_ids(self, args):
        """Return the ``_idnames`` of ``self`` followed by those of ``args``."""
        merged = list(self._idnames)
        for other in args:
            self._check_model(other, '+')
            merged.extend(other._idnames)
        return merged

    def concat(self, *args):
        """Return the concatenation of all records."""
        return RecordList(self._model, self._concat_ids(args))

    def union(self, *args):
        """Return the union of all records.

        Preserve first occurrence order.
        """
        ids = self._concat_ids(args)
        if len(ids) > 1:
            seen = set()
            unique = []
            for idn in ids:
                if isinstance(idn, (list, tuple)):
                    res_id, name = idn
                else:
                    res_id, name = idn, None
                if res_id in seen:
                    continue
                seen.add(res_id)
                if res_id:
                    unique.append((res_id, name) if name else res_id)
            ids = unique
        return RecordList(self._model, ids)

    @classmethod
    def _union(cls, args):
        """Merge ``args`` into one recordset when it holds records."""
        if hasattr(args, 'union'):
            return args.union()
        holds_records = args and isinstance(args, list) and hasattr(args[0], 'union')
        return cls.union(*args) if holds_records else args

    def _filter(self, attrs):
        """Return the idnames of the records with a truthy ``attrs`` path.

        ``attrs`` is a list of field names; it is consumed by this method.
        """
        values = self.read(attrs.pop(0))
        ids, rels = [], []
        for (rec, rel) in zip(self, values):
            if not rel:
                continue
            if hasattr(rel, 'ids') and not rel.id:
                continue
            ids.append(rec._idnames[0])
            rels.append(rel)
        if ids and attrs:
            # Follow the remaining path on the related records
            relids = {idn[0] for idn in BaseRecord.union(*rels)._filter(attrs)}
            ids = [rec_id
                   for (rec_id, rel) in zip(ids, rels)
                   if any(rel_id in relids for rel_id in rel.ids)]
        return ids

    def mapped(self, func):
        """Apply ``func`` on all records."""
        if callable(func):
            return self._union([func(rec) for rec in self])
        # func is a path
        result = self[:]
        for field_name in func.split('.'):
            result = self._union(result.read(field_name))
        return result

    def filtered(self, func):
        """Select the records such that ``func(rec)`` is true.

        As an alternative ``func`` can be a search domain (list)
        to search among the records.
        """
        if callable(func):
            ids = [rec._idnames[0] for rec in self if func(rec)]
            return RecordList(self._model, ids)
        if isinstance(func, list):
            return self & self._model.search([('id', 'in', self.ids)] + func)
        if func:
            ids = self[:]._filter(func.split('.'))
        else:
            ids = self._idnames
        return RecordList(self._model, ids)

    def sorted(self, key=None, reverse=False):
        """Return the records sorted by ``key``."""
        recs = self.union()
        if len(recs.ids) < 2:
            return recs
        if key is None:
            # Use the order returned by the remote `search`
            by_id = dict(zip(recs.ids, recs._idnames))
            recs = self._model.search([('id', 'in', recs.ids)])
            ids = [by_id[res_id] for res_id in recs.ids]
        elif isinstance(key, str):
            pairs = list(zip(recs.read(key), recs._idnames))
            pairs.sort()
            ids = [idname for (__, idname) in pairs]
        else:
            ids = [rec._idnames[0] for rec in sorted(recs, key=key)]
        if reverse:
            ids = ids[::-1]
        return RecordList(self._model, ids)

    def write(self, values):
        """Write the `values` in the record(s).

        `values` is a dictionary of values.
        See :meth:`Model.create` for details.
        """
        if not self.id:
            return True
        raw_values = self._model._unbrowse_values(values)
        self._invalidate_cache()
        return self._execute('write', self.ids, raw_values)

    def unlink(self):
        """Delete the record(s) from the database."""
        if not self.id:
            return True
        self._invalidate_cache()
        return self._execute('unlink', self.ids)
class RecordList(BaseRecord):
    """A sequence of Odoo :class:`Record`.

    It has a similar API as the :class:`Record` class, but for a list of
    records.  The attributes of the ``RecordList`` are read-only, and they
    return list of attribute values in the same order.  The ``many2one``,
    ``one2many`` and ``many2many`` attributes are wrapped in ``RecordList``
    and list of ``RecordList`` objects.  Use the method ``RecordList.write``
    to assign a single value to all the selected records.
    """

    def __init__(self, res_model, arg, search=None):
        """Build the list from ids (``arg``) or from ``search`` arguments.

        With ``search``, nothing is fetched here: ``id``, ``ids`` and
        ``_idnames`` are resolved later, on first access.
        """
        super().__init__(res_model, arg)
        if search is not None:
            # `res_model` must win over any 'model' entry copied from
            # another list, otherwise `with_env` keeps the previous model.
            self.__dict__['_search_args'] = {**search, 'model': res_model}
            return
        Ids = self.env._class_ids[0]
        idnames = arg or ()
        ids = Ids(idnames)
        # Iterate `idnames`, not `arg`: a falsy `arg` (None, False) is
        # an empty list, it must not be passed to enumerate().
        for index, id_ in enumerate(idnames):
            if isinstance(id_, (list, tuple)):
                # (id, name) pair: keep the id only in `ids`
                ids[index] = id_ = id_[0]
            assert isinstance(id_, int), repr(id_)
        self.__dict__.update({'id': ids, 'ids': ids, '_idnames': idnames,
                              '_search_args': None})

    @classmethod
    def _prepared(cls, res_model, domain, params):
        """Return a lazy list for ``domain`` and search ``params``."""
        [domain] = searchargs((domain,))
        return cls(res_model, None, search={'domain': domain, **params})

    def refresh(self):
        """Reset :class:`RecordList` content."""
        if self._search_args:
            # Forget the ids: they are searched again on next access
            for key in 'id', 'ids', '_idnames':
                self.__dict__.pop(key, None)

    def __getitem__(self, key):
        if 'id' not in self.__dict__ and isinstance(key, int) and key >= 0:
            # Not loaded yet: search a single record, then take it
            self, key = self[key:key+1], 0
        if 'id' in self.__dict__ or (getattr(key, 'start', -1) or 0) < 0:
            # Already loaded, or integer index, or negative slice start
            return super().__getitem__(key)

        # Translate the slice into 'offset' and 'limit' search arguments
        is_stop_positive = key.stop is not None and key.stop >= 0
        search_args = {**self._search_args}
        new_offset, new_stop = key.start or 0, search_args.get('limit', None)
        if is_stop_positive:
            new_stop = key.stop if new_stop is None else min(new_stop, key.stop)
        if new_stop is not None:
            search_args['limit'] = limit = new_stop - new_offset
            if limit <= 0:
                return RecordList(self._model, [])
        if new_offset:
            search_args['offset'] = (search_args.get('offset') or 0) + new_offset
        result = RecordList(self._model, None, search=search_args)
        if (is_stop_positive or key.stop is None) and key.step in (1, None):
            return result
        # Negative stop or explicit step: finish the slicing locally
        key = slice(None, limit if is_stop_positive else key.stop, key.step)
        return super(RecordList, result).__getitem__(key)

    def with_env(self, env):
        """Return the same list, attached to ``env``."""
        if 'id' in self.__dict__:
            return super().with_env(env)
        # Drop the previous model: the constructor sets the new one
        search = {name: value for (name, value) in self._search_args.items()
                  if name != 'model'}
        return RecordList(env[self._name], None, search)

    def read(self, fields=None):
        """Read the `fields` of the :class:`RecordList`.

        The argument `fields` accepts different kinds of values.
        See :meth:`Model.read` for details.
        """
        fields, fmt = self._model._parse_format(fields)
        if fields == ['id']:
            values = [{'id': res_id} for res_id in self.ids]
        elif 'id' not in self.__dict__:
            # Not loaded yet: search and read with a single call
            params = {**self._search_args}
            values = params.pop('model').search_read(params.pop('domain'), fields, **params)
            ids = idnames = [val['id'] for val in values]
            if values and 'display_name' in values[0]:
                idnames = [(val['id'], val['display_name']) for val in values]
            Ids, __ = self.env._class_ids
            self.__dict__.update({'id': Ids(ids), 'ids': Ids(ids), '_idnames': idnames})
        else:
            values = self._model.read(self.ids, fields, order=True) if self.ids else []
        return fmt(values)

    def copy(self, default=None):
        """Copy records and return :class:`RecordList`.

        The optional argument `default` is a mapping which overrides some
        values of the new records.

        Supported since Odoo 18.
        """
        default = default and self._model._unbrowse_values(default)
        new_ids = self._execute('copy', self.ids, default)
        return RecordList(self._model, new_ids)

    @property
    def _external_id(self):
        """Retrieve the External IDs of the :class:`RecordList`.

        Return the fully qualified External IDs with default value False
        if there's none.  If multiple IDs exist for a record, only one
        of them is returned (randomly).
        """
        xml_ids = {r.id: xml_id for (xml_id, r) in
                   self._model._get_external_ids(self.ids).items()}
        return [xml_ids.get(res_id, False) for res_id in self.id]

    def __getattr__(self, attr):
        if attr in ('id', 'ids', '_idnames'):
            # First access on a lazy list: run the search now
            params = {**self._search_args}
            ids = params.pop('model')._execute('search', params.pop('domain'), **params)
            Ids, __ = self.env._class_ids
            self.__dict__.update({'id': Ids(ids), 'ids': Ids(ids), '_idnames': ids})
            return self.__dict__[attr]
        if attr in self._model._keys:
            return self.read(attr)
        if attr.startswith('_'):
            errmsg = f"'RecordList' object has no attribute {attr!r}"
            raise AttributeError(errmsg)

        # NOTE(review): the docstring below looks like a format template
        # consumed by `_memoize` -- keep it unchanged.
        def wrapper(self, *params, **kwargs):
            """Wrapper for client.execute({!r}, {!r}, [...], *params, **kwargs)."""
            return self._execute(attr, self.ids, *params, **kwargs)
        return _memoize(self, attr, wrapper, (self._name, attr))

    def __setattr__(self, attr, value):
        # Attributes are read-only: every assignment is rejected
        if attr in self._model._keys or attr == 'id':
            msg = f"attribute {attr!r} is read-only; use 'RecordList.write' instead."
        else:
            msg = f"has no attribute {attr!r}"
        raise AttributeError("'RecordList' object " + msg)
class Record(BaseRecord):
    """A class for all Odoo records.

    It maps any Odoo object.
    The fields can be accessed through attributes.  The changes are immediately
    sent to the server.
    The ``many2one``, ``one2many`` and ``many2many`` attributes are wrapped in
    ``Record`` and ``RecordList`` objects.  These attributes support writing
    too.
    The attributes are evaluated lazily, and they are cached in the record.
    The Record's cache is invalidated if any attribute is changed.
    """

    def __init__(self, res_model, arg):
        """Build the record from an id, or from an ``(id, name)`` pair."""
        super().__init__(res_model, arg)
        if isinstance(arg, int):
            res_id, name, idnames = arg, None, [arg]
        else:
            idnames = [arg]
            res_id, name = arg
        Ids, Id1 = self.env._class_ids
        attrs = {
            'id': Id1(res_id),
            'ids': Ids([res_id]),
            '_idnames': idnames,
            '_cached_keys': set(),
        }
        if name is not None:
            # The name is known already: no need to read it later
            attrs['_Record__name'] = attrs['display_name'] = name
        self.__dict__.update(attrs)

    def __str__(self):
        if not self.id:
            return 'False'
        return self.__name

    def _get_name(self):
        """Compute the name of the record, and update ``_idnames``."""
        try:
            name = self.display_name
        except Exception:
            # Fall back on "model,id"
            name = f'{self._name},{self.id}'
        self.__dict__['_idnames'] = [(int(self.id), str(name))]
        return _memoize(self, '_Record__name', str(name))

    def refresh(self):
        """Force refreshing the record's data."""
        self._invalidate_cache()

    def _invalidate_cache(self):
        """Remove from the record the values tracked in ``_cached_keys``."""
        cached = self._cached_keys
        cached.discard('id')    # the id is never removed
        for name in cached:
            delattr(self, name)
        cached.clear()

    def _update(self, values):
        """Store ``values`` on the record, after conversion by the model."""
        browsed = self._model._browse_values(values)
        self.__dict__.update(browsed)
        self._cached_keys.update(browsed)
        return browsed

    def read(self, fields=None):
        """Read the `fields` of the :class:`Record`.

        The argument `fields` accepts different kinds of values.
        See :meth:`Model.read` for details.
        """
        rv = self._model.read(self.id, fields)
        single_field = isinstance(fields, str) and fields in self._model._keys
        if rv is not None and single_field:
            return self._update({fields: rv})[fields]
        if isinstance(rv, dict):
            return self._update(rv)
        return rv

    def copy(self, default=None):
        """Copy a record and return the new :class:`Record`.

        The optional argument `default` is a mapping which overrides some
        values of the new record.
        """
        default = default and self._model._unbrowse_values(default)
        new_id = self._execute('copy', self.id, default)
        if isinstance(new_id, list):
            # A list is unpacked to its single id (False if empty)
            [new_id] = new_id or [False]
        return Record(self._model, new_id)

    @property
    def _external_id(self):
        """Retrieve the External ID of the :class:`Record`.

        Return the fully qualified External ID of the :class:`Record`,
        with default value False if there's none.  If multiple IDs exist,
        only one of them is returned (randomly).
        """
        xml_ids = self._model._get_external_ids(self.ids)
        if not xml_ids:
            return False
        return next(iter(xml_ids))

    def _set_external_id(self, xml_id):
        """Set the External ID of this record."""
        (mod, name) = xml_id.split('.')
        # Either the xml_id is taken, or the record has an xml_id already
        domain = ['|',
                  '&', ('module', '=', mod), ('name', '=', name),
                  '&', ('model', '=', self._name), ('res_id', '=', self.id)]
        if self.env._get('ir.model.data', False).search(domain):
            raise ValueError(f'ID {xml_id!r} collides with another entry')
        values = {
            'model': self._name,
            'res_id': self.id,
            'module': mod,
            'name': name,
        }
        self.env._get('ir.model.data', False).create(values)

    def __getattr__(self, attr):
        if attr in self._model._keys:
            return self.read(attr)
        if attr == '_Record__name':
            return self._get_name()
        if attr.startswith('_'):
            raise AttributeError(f"'Record' object has no attribute {attr!r}")

        # NOTE(review): the docstring below looks like a format template
        # consumed by `_memoize` -- keep it unchanged.
        def wrapper(self, *params, **kwargs):
            """Wrapper for client.execute({!r}, {!r}, {:d}, *params, **kwargs)."""
            res = self._execute(attr, self.ids, *params, **kwargs)
            self._invalidate_cache()
            # A list with a single item is unpacked
            single = isinstance(res, list) and len(res) == 1
            return res[0] if single else res
        return _memoize(self, attr, wrapper, (self._name, attr, self.id))

    def __setattr__(self, attr, value):
        if attr == '_external_id':
            return self._set_external_id(value)
        if attr not in self._model._keys:
            raise AttributeError(f"'Record' object has no attribute {attr!r}")
        if attr == 'id':
            raise AttributeError("'Record' object attribute 'id' is read-only")
        # Assigning a field writes it through `write`
        self.write({attr: value})
def _interact(global_vars, use_pprint=True, usage=USAGE):
    """Run an interactive console with ``global_vars`` as namespace.

    It installs ``sys.excepthook``, and ``sys.displayhook`` too if
    ``use_pprint`` is true.  It adds the ``usage`` helper to the builtins.
    The new REPL (``_pyrepl``) is used when it is available, unless the
    environment variable ``PYTHON_BASIC_REPL`` is set.
    """
    import builtins
    import pprint

    use_pyrepl = not os.getenv("PYTHON_BASIC_REPL")
    if use_pyrepl:
        try:
            from _pyrepl.main import CAN_USE_PYREPL as use_pyrepl
        except ImportError:
            use_pyrepl = False

    if use_pyrepl:
        # Python >= 3.13
        from _pyrepl.console import InteractiveColoredConsole as Console
        from _pyrepl.simple_interact import run_multiline_interactive_console as run
        from _pyrepl import readline
    else:
        from code import InteractiveConsole as Console
        # Typing "clear" shows this object: its repr clears the screen
        builtins.clear = type('', (), {'__repr__': lambda s: os.system('clear') or ''})()
        try:
            import readline
            import rlcompleter
            readline.parse_and_bind('tab: complete')
        except ImportError:
            pass

    # Best effort: any failure disables the persistent history
    try:
        readline.read_history_file(HIST_FILE)
        if readline.get_history_length() < 0:
            readline.set_history_length(int(os.getenv('HISTSIZE', 500)))
        # better append instead of replace?
        atexit.register(readline.write_history_file, HIST_FILE)
    except Exception:
        pass    # IOError if file missing, or other error

    if use_pprint:
        def pp(obj):
            print(color_repr(pprint.pformat(obj, **PP_FORMAT)))

        def displayhook(value, _printer=pp, _builtins=builtins):
            # Pretty-format the output
            if value is not None:
                _printer(value)
                _builtins._ = value
        sys.displayhook = displayhook

    def excepthook(exc_type, exc, tb):
        # Print readable errors
        # (`exfmt` is assigned below, before the console is started)
        msg = ''.join(format_exception(exc_type, exc, tb, chain=False, **exfmt))
        print(color_repr(msg.rstrip()))
    sys.excepthook = excepthook

    builtins.usage = type('Usage', (), {'__call__': lambda s: print(usage),
                                        '__repr__': lambda s: usage})()

    console = Console(global_vars, filename="<stdin>")
    exfmt = {"colorize": console.can_colorize} if use_pyrepl else {}
    if use_pyrepl:
        run(console)
    else:
        console.interact('', '')


def get_parser():
    """Return the parser for the command line arguments."""
    ap = argparse.ArgumentParser(description='Interact with Odoo')
    add = ap.add_argument

    add('args', nargs='*', help='URL of the server')
    add('-l', '--list', action='store_true', dest='list_env',
        help='list sections of the configuration')
    add('--env', help='read connection settings from the given section')
    add('-c', '--config', default=None,
        help=f'specify alternate config file (default: {CONF_FILE})')
    add('--server', default=None,
        help=f'full URL of the server (default: {DEFAULT_URL})')
    add('-d', '--db', default=None, help='database')
    add('-u', '--user', default=None, help='username')
    add('-p', '--password', default=None, metavar='PASS',
        help='password, or it will be requested on login')
    add('--api-key', dest='api_key', default=None,
        help='API Key for JSON-2 or JSON-RPC')
    add('-v', '--verbose', default=0, action='count', help='verbose')
    add('--version', action='version', version=__version__)
    return ap


def connect_client(args):
    """Create the :class:`Client` described by the parsed ``args``.

    The attributes ``server`` and ``user`` of ``args`` are updated when
    ``--env`` is not used.
    """
    if args.env:
        return Client.from_config(args.env, user=args.user, verbose=args.verbose)

    if not args.server:
        args.server = ['-c', args.config] if args.config else DEFAULT_URL
    if args.args:
        if args.config:
            # Extra arguments are appended after the config option
            args.server = args.server + args.args
        else:
            args.server = ' '.join(args.args)
    if not args.user:
        args.user = ADMIN_USER
    return Client(args.server, args.db, args.user, password=args.password,
                  api_key=args.api_key, verbose=args.verbose)


def main(interact=_interact):
    """Entry point of the command line tool.

    Return the result of ``interact(global_vars)``, or the global
    variables if ``interact`` is false.  With ``--list``, print the
    available settings and return None.
    """
    args = get_parser().parse_args()
    Client._config_file = Path.cwd() / (args.config or CONF_FILE)

    if args.list_env:
        print('Available settings: ' + ' '.join(read_config()))
        return

    global_vars = Client._set_interactive()
    print(color_repr(USAGE))
    connect_client(args)
    if not interact:
        return global_vars
    return interact(global_vars)


if __name__ == '__main__':
    main()