T O P

[资源分享]     【odoo】【知识杂谈】单一实例多库模式下定时任务的问题分析

  • By - 楼主

  • 2021-09-06 10:00:17
  • 欢迎转载,但需标注出处,谢谢!

    背景:

    有客户反应有个别模块下的定时任务没有正常执行,是否是新装的模块哪些有问题?排查后发现,客户是在一台服务器上跑着一个odoo容器,对应多个数据库。个别库的定时任务是正常的,但是一个对接其他平台的库的定时任务没有正常跑起来。

    先说结论,看官没时间支持按说明处理即可,分析过程在下面。

    结论

    在odoo的配置文件db_name字段配置希望后台一直跑着的库名称字符串,以英文“,”分割。

    分析

    直接源码

    1. 看odoo日志,我们知道odoo的任务正常执行时会打印Starting Job 任务名称,直接vscode全局查找,定位到ir_cron.py文件的_process_jobs函数。
        @classmethod
        def _process_jobs(cls, db_name):
            """ Try to process all cron jobs.
    
            This selects in database all the jobs that should be processed. It then
            tries to lock each of them and, if it succeeds, run the cron job (if it
            doesn't succeed, it means the job was already locked to be taken care
            of by another thread) and return.
    
            :raise BadVersion: if the version is different from the worker's
            :raise BadModuleState: if modules are to install/upgrade/remove
            """
            db = odoo.sql_db.db_connect(db_name)
            threading.current_thread().dbname = db_name
            try:
                with db.cursor() as cr:
                    # Make sure the database has the same version as the code of
                    # base and that no module must be installed/upgraded/removed
                    cr.execute("SELECT latest_version FROM ir_module_module WHERE name=%s", ['base'])
                    (version,) = cr.fetchone()
                    cr.execute("SELECT COUNT(*) FROM ir_module_module WHERE state LIKE %s", ['to %'])
                    (changes,) = cr.fetchone()
                    if version is None:
                        raise BadModuleState()
                    elif version != BASE_VERSION:
                        raise BadVersion()
                    # Careful to compare timestamps with 'UTC' - everything is UTC as of v6.1.
                    cr.execute("""SELECT * FROM ir_cron
                                  WHERE numbercall != 0
                                      AND active AND nextcall <= (now() at time zone 'UTC')
                                  ORDER BY priority""")
                    jobs = cr.dictfetchall()
    
                if changes:
                    if not jobs:
                        raise BadModuleState()
                    # nextcall is never updated if the cron is not executed,
                    # it is used as a sentinel value to check whether cron jobs
                    # have been locked for a long time (stuck)
                    parse = fields.Datetime.from_string
                    oldest = min([parse(job['nextcall']) for job in jobs])
                    if datetime.now() - oldest > MAX_FAIL_TIME:
                        odoo.modules.reset_modules_state(db_name)
                    else:
                        raise BadModuleState()
    
                for job in jobs:
                    lock_cr = db.cursor()
                    try:
                        # Try to grab an exclusive lock on the job row from within the task transaction
                        # Restrict to the same conditions as for the search since the job may have already
                        # been run by an other thread when cron is running in multi thread
                        lock_cr.execute("""SELECT *
                                           FROM ir_cron
                                           WHERE numbercall != 0
                                              AND active
                                              AND nextcall <= (now() at time zone 'UTC')
                                              AND id=%s
                                           FOR UPDATE NOWAIT""",
                                       (job['id'],), log_exceptions=False)
    
                        locked_job = lock_cr.fetchone()
                        if not locked_job:
                            _logger.debug("Job `%s` already executed by another process/thread. skipping it", job['cron_name'])
                            continue
                        # Got the lock on the job row, run its code
                        _logger.info('Starting job `%s`.', job['cron_name'])
                        job_cr = db.cursor()
                        try:
                            registry = odoo.registry(db_name)
                            registry[cls._name]._process_job(job_cr, job, lock_cr)
                            _logger.info('Job `%s` done.', job['cron_name'])
                        except Exception:
                            _logger.exception('Unexpected exception while processing cron job %r', job)
                        finally:
                            job_cr.close()
    
                    except psycopg2.OperationalError as e:
                        if e.pgcode == '55P03':
                            # Class 55: Object not in prerequisite state; 55P03: lock_not_available
                            _logger.debug('Another process/thread is already busy executing job `%s`, skipping it.', job['cron_name'])
                            continue
                        else:
                            # Unexpected OperationalError
                            raise
                    finally:
                        # we're exiting due to an exception while acquiring the lock
                        lock_cr.close()
    
            finally:
                if hasattr(threading.current_thread(), 'dbname'):
                    del threading.current_thread().dbname
    
    1. 看到上面这个函数已经是执行的具体内容了。我们继续在该文件查找_process_jobs函数 => _acquire_job函数 => server.py文件中的cron_thread函数 => cron_spawn函数。
      其中cron_spawn定了开启几个cron线程,由max_cron_threads决定。
      在cron_thread函数中,我们可以看到定时任务的调用过程
        def cron_thread(self, number):
            from odoo.addons.base.models.ir_cron import ir_cron
            while True:
                time.sleep(SLEEP_INTERVAL + number)     # Steve Reich timing style
                registries = odoo.modules.registry.Registry.registries
                _logger.debug('cron%d polling for jobs', number)
                for db_name, registry in registries.d.items():
                    if registry.ready:
                        thread = threading.currentThread()
                        thread.start_time = time.time()
                        try:
                            ir_cron._acquire_job(db_name)
                        except Exception:
                            _logger.warning('cron%d encountered an Exception:', number, exc_info=True)
                        thread.start_time = None
    
    1. 核心内容是registries.d.items(),cron线程将循环调用registries中的数据库信息,那么这个变量中到底有哪些内容,如何添加的呢?可以全局搜索registries,定位到Registry.py文件(具体的registry类对象)以及server.py中的preload_registries函数以及调用该函数的run函数。看名称可以了解将预加载registry信息。
    
        def run(self, preload=None, stop=False):
            """ Start the http server and the cron thread then wait for a signal.
    
            The first SIGINT or SIGTERM signal will initiate a graceful shutdown while
            a second one if any will force an immediate exit.
            """
            self.start(stop=stop)
    
            rc = preload_registries(preload)
    
            if stop:
                if config['test_enable']:
                    logger = odoo.tests.runner._logger
                    with Registry.registries._lock:
                        for db, registry in Registry.registries.d.items():
                            report = registry._assertion_report
                            log = logger.error if not report.wasSuccessful() \
                             else logger.warning if not report.testsRun \
                             else logger.info
                            log("%s when loading database %r", report, db)
                self.stop()
                return rc
    
    1. 核心是run函数汇总的preload变量,记录着将初始化哪些数据库的对象;

    2. 查找上级为,server.py中的main函数,至此所有思路都清晰了,看源码

    
    def main(args):
        check_root_user()
        odoo.tools.config.parse_config(args)
        check_postgres_user()
        report_configuration()
    
        config = odoo.tools.config
    
        # the default limit for CSV fields in the module is 128KiB, which is not
        # quite sufficient to import images to store in attachment. 500MiB is a
        # bit overkill, but better safe than sorry I guess
        csv.field_size_limit(500 * 1024 * 1024)
    
        preload = []
        if config['db_name']:
            preload = config['db_name'].split(',')
            for db_name in preload:
                try:
                    odoo.service.db._create_empty_database(db_name)
                    config['init']['base'] = True
                except ProgrammingError as err:
                    if err.pgcode == errorcodes.INSUFFICIENT_PRIVILEGE:
                        # We use an INFO loglevel on purpose in order to avoid
                        # reporting unnecessary warnings on build environment
                        # using restricted database access.
                        _logger.info("Could not determine if database %s exists, "
                                     "skipping auto-creation: %s", db_name, err)
                    else:
                        raise err
                except odoo.service.db.DatabaseExists:
                    pass
    
        if config["translate_out"]:
            export_translation()
            sys.exit(0)
    
        if config["translate_in"]:
            import_translation()
            sys.exit(0)
    
        # This needs to be done now to ensure the use of the multiprocessing
        # signaling mecanism for registries loaded with -d
        if config['workers']:
            odoo.multi_process = True
    
        stop = config["stop_after_init"]
    
        setup_pid_file()
        rc = odoo.service.server.start(preload=preload, stop=stop)
        sys.exit(rc)
    

    在上面,我们preload为配置文件中的db_name的值,那么正向梳理回去就是
    a) odoo在启动的时候加载odoo.conf配置文件,并读取db_name的值
    b) 加载完成后将通过db_name的值初始化数据库对象;
    c) 并在完成cron线程初始化后循环调用库对象,执行相关定时任务。

    本帖子中包含资源

    您需要 登录 才可以下载,没有帐号?立即注册