40 lines
1.0 KiB
Python
40 lines
1.0 KiB
Python
from ..models import CronJob
|
|
|
|
|
|
class CronJobService:
    """Service helpers for creating, updating and deleting ``CronJob`` records."""

    @staticmethod
    def create_cron_job(agent_id, data):
        """Build a ``CronJob`` from ``data``, save it and return it.

        Args:
            agent_id: Value stored in the new job's ``agent_id`` field.
            data: Mapping providing ``"name"``, ``"cron_expression"`` and
                ``"prompt"``. ``"is_active"`` is optional and falls back to
                ``True`` when absent.

        Returns:
            The ``CronJob`` instance after ``save()`` has been called on it.

        Raises:
            KeyError: If ``data`` is a dict lacking one of the required keys.
        """
        fields = {
            "agent_id": agent_id,
            "name": data["name"],
            "cron_expression": data["cron_expression"],
            "prompt": data["prompt"],
            "is_active": data.get("is_active", True),
        }
        created = CronJob(**fields)
        created.save()
        return created

    @staticmethod
    def update_cron_job(cron_job, data):
        """Copy the recognised keys present in ``data`` onto ``cron_job``.

        Only ``"name"``, ``"cron_expression"``, ``"prompt"`` and
        ``"is_active"`` are considered; any other key in ``data`` is ignored.
        ``save()`` is called even when ``data`` contains none of them.

        Args:
            cron_job: Object to modify; it must expose a ``save()`` method.
            data: Mapping holding the new values for a subset of the fields.

        Returns:
            The same ``cron_job`` object that was passed in.
        """
        for field in ("name", "cron_expression", "prompt", "is_active"):
            if field in data:
                setattr(cron_job, field, data[field])
        cron_job.save()
        return cron_job

    @staticmethod
    def delete_cron_job(cron_job_id):
        """Look up a job via ``CronJob.query.get`` and call ``delete()`` on it.

        Args:
            cron_job_id: Identifier handed to ``CronJob.query.get``
                (presumably the primary key -- confirm against the model).

        Raises:
            ValueError: If the lookup returns a falsy result, i.e. no job was
                found.
        """
        found = CronJob.query.get(cron_job_id)
        if found:
            found.delete()
            return
        # The message is Chinese for "scheduled task does not exist".
        raise ValueError("定时任务不存在")
|