Source code for pyabsa.utils.exception_utils.exception_utils

# -*- coding: utf-8 -*-
# file: exception_utils.py
# time: 02/11/2022 17:15
# author: YANG, HENG <hy345@exeter.ac.uk> (杨恒)
# github: https://github.com/yangheng95
# GScholar: https://scholar.google.com/citations?user=NPq5a_0AAAAJ&hl=en
# ResearchGate: https://www.researchgate.net/profile/Heng-Yang-17/research
# Copyright (C) 2022. All Rights Reserved.
import functools
import multiprocessing


[docs] def time_out(max_timeout): """Timeout decorator, parameter in seconds.""" def timeout_decorator(item): """Wrap the original function.""" @functools.wraps(item) def func_wrapper(*args, **kwargs): """Closure for function.""" try: pool = multiprocessing.Pool(processes=1) async_result = pool.apply_async(item, args=args, kwds=kwargs) return async_result.get(max_timeout) except Exception as e: return None return func_wrapper return timeout_decorator