12 Star 102 Fork 24

MegaLab / mega-ide

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
job_scan_task.py 2.93 KB
一键复制 编辑 原始数据 按行查看 历史
小埋酱 提交于 2022-04-15 18:19 . up
from server.app.cloud_ide.model import IdeImage, Ide, IdeRegistry
from server.framework.core.database import SessionLocal
from server.framework.core.logger import logger
import requests
from server.framework.core.scheduler import scheduler
from server.framework.core.settings import settings
@scheduler.scheduled_job('interval', id='ide_status_scanner', seconds=5)
def ide_status_scanner():
db = SessionLocal()
ides = db.query(Ide).all()
for ide in ides:
try:
status = requests.get(f'http://{ide.id}.{settings.ide_domain}', timeout=2000)
if ide.status == 1 and (
status.status_code == 200 or status.status_code == 500 or status.status_code == 401):
ide.status = 2
db.commit()
if ide.status == 3 and (
status.status_code != 200 or status.status_code != 500 or status.status_code != 401):
ide.status = 0
db.commit()
logger.error(f'检查IDE[{ide.name}]运行状态,当前状态为[{status.status_code}]')
except Exception as e:
logger.error(f'检查IDE[{ide.name}]运行状态失败,异常信息:[{e}]')
db.close()
@scheduler.scheduled_job('interval', id='build_image_scanner', seconds=5)
def build_image_scanner():
argo_job_status = requests.get(f'{settings.argo_url}/api/v1/workflows/{settings.kubernetes_namespace}').json()
db = SessionLocal()
images = db.query(IdeImage).filter(IdeImage.status == 1).all()
for image in images:
for job in argo_job_status['items']:
if image.job_id == job['metadata']['name']:
if job['status']['phase'] == 'Succeeded':
image.status = 3
if job['status']['phase'] in ['Failed', 'Error']:
image.status = 2
db.commit()
logger.info(f'检查镜像[{image.name}]编译情况,状态为:[{job["status"]["phase"]}]')
db.close()
@scheduler.scheduled_job('interval', id='registry_scanner', seconds=5)
def registry_scanner():
"""
扫描镜像仓库的状态
@return:
"""
try:
db = SessionLocal()
registries = db.query(IdeRegistry).filter(IdeRegistry.enable != 0).all()
for registry in registries:
try:
status = requests.get(f'http://{registry.registry}', timeout=2)
if registry.enable == 1 and status.status_code != 200:
registry.enable = 2
if registry.enable == 2 and status.status_code == 200:
registry.enable = 1
logger.info(f'检查镜像源[{registry.name}],镜像源状态为[{status.status_code}]')
except Exception as e:
registry.enable = 2
logger.error(f'检查镜像源[{registry.name}]失败,异常信息:[{e}]')
db.commit()
finally:
db.close()
if __name__ == '__main__':
scheduler.start()
Python
1
https://gitee.com/mega-lab/mega-ide.git
git@gitee.com:mega-lab/mega-ide.git
mega-lab
mega-ide
mega-ide
master

搜索帮助

53164aa7 5694891 3bd8fe86 5694891