from ..models import CronJob class CronJobService: @staticmethod def create_cron_job(agent_id, data): cron_job = CronJob( agent_id=agent_id, name=data["name"], cron_expression=data["cron_expression"], prompt=data["prompt"], is_active=data.get("is_active", True), ) cron_job.save() return cron_job @staticmethod def update_cron_job(cron_job, data): if "name" in data: cron_job.name = data["name"] if "cron_expression" in data: cron_job.cron_expression = data["cron_expression"] if "prompt" in data: cron_job.prompt = data["prompt"] if "is_active" in data: cron_job.is_active = data["is_active"] cron_job.save() return cron_job @staticmethod def delete_cron_job(cron_job_id): cron_job = CronJob.query.get(cron_job_id) if not cron_job: raise ValueError("定时任务不存在") cron_job.delete()