From 4445b82062e33e7a5886535bca91664a083eb3fa Mon Sep 17 00:00:00 2001 From: carddev81 Date: Sun, 7 Jun 2026 09:46:54 -0500 Subject: [PATCH] feat: add batch process for updating status of classes and enrolled_at per ticket 607 --- backend/src/models/jobs.go | 26 ++++--- backend/src/tasks/scheduler.go | 45 ++++++++++++ provider-middleware/handlers.go | 1 + provider-middleware/program_classes.go | 94 ++++++++++++++++++++++++++ 4 files changed, 157 insertions(+), 9 deletions(-) create mode 100644 provider-middleware/program_classes.go diff --git a/backend/src/models/jobs.go b/backend/src/models/jobs.go index b6902b86e..d44621edf 100644 --- a/backend/src/models/jobs.go +++ b/backend/src/models/jobs.go @@ -33,6 +33,8 @@ func (cj *CronJob) BeforeCreate(tx *gorm.DB) error { cj.Category = OpenContentJob } else if slices.Contains(AllDefaultProviderJobs, JobType(cj.Name)) { cj.Category = ProviderPlatformJob + } else if slices.Contains(AllSystemJobs, JobType(cj.Name)) { + cj.Category = SystemJob } switch cj.Name { case string(RetryVideoDownloadsJob): @@ -41,6 +43,8 @@ func (cj *CronJob) BeforeCreate(tx *gorm.DB) error { schedule = EveryDaytimeHour } cj.Schedule = schedule + case string(ActivateScheduledClassesJob): + cj.Schedule = EveryMorningAt5AM default: cj.Schedule = os.Getenv("MIDDLEWARE_CRON_SCHEDULE") } @@ -83,24 +87,28 @@ func (RunnableTask) TableName() string { return "runnable_tasks" } const ( ProviderPlatformJob = 1 OpenContentJob = 2 + SystemJob = 3 GetMilestonesJob JobType = "get_milestones" GetCoursesJob JobType = "get_courses" GetActivityJob JobType = "get_activity" - ScrapeKiwixJob JobType = "scrape_kiwix" - RetryVideoDownloadsJob JobType = "retry_video_downloads" - RetryManualDownloadJob JobType = "retry_manual_download" - SyncVideoMetadataJob JobType = "sync_video_metadata" - AddVideosJob JobType = "add_videos" - EveryDaytimeHour string = "0 6-20 * * *" - EverySundayAt8PM string = "0 20 * * 6" - StatusPending JobStatus = "pending" - StatusRunning JobStatus = "running" + ScrapeKiwixJob JobType = "scrape_kiwix" + RetryVideoDownloadsJob JobType = "retry_video_downloads" + RetryManualDownloadJob JobType = "retry_manual_download" + SyncVideoMetadataJob JobType = "sync_video_metadata" + AddVideosJob JobType = "add_videos" + ActivateScheduledClassesJob JobType = "activate_scheduled_classes" + EveryDaytimeHour string = "0 6-20 * * *" + EverySundayAt8PM string = "0 20 * * 6" + EveryMorningAt5AM string = "0 5 * * *" + StatusPending JobStatus = "pending" + StatusRunning JobStatus = "running" ) var AllDefaultProviderJobs = []JobType{GetCoursesJob, GetMilestonesJob, GetActivityJob} var AllContentProviderJobs = []JobType{ScrapeKiwixJob, RetryVideoDownloadsJob, SyncVideoMetadataJob} +var AllSystemJobs = []JobType{ActivateScheduledClassesJob} func (jt JobType) IsVideoJob() bool { switch jt { diff --git a/backend/src/tasks/scheduler.go b/backend/src/tasks/scheduler.go index 1da332137..dd900803b 100644 --- a/backend/src/tasks/scheduler.go +++ b/backend/src/tasks/scheduler.go @@ -45,9 +45,54 @@ func (s *Scheduler) generateTasks() ([]models.RunnableTask, error) { } else { log.Println("Failed to generate provider tasks") } + if systemTasks, err := s.generateSystemTasks(); err == nil { + allTasks = append(allTasks, systemTasks...) + } else { + log.Println("Failed to generate system tasks") + } return allTasks, nil } +// generateSystemTasks builds the platform-wide batch tasks that are not tied to +// any provider (e.g. activating classes whose scheduled start date has arrived). +func (s *Scheduler) generateSystemTasks() ([]models.RunnableTask, error) { + systemTasks := make([]models.RunnableTask, 0, len(models.AllSystemJobs)) + for _, jobType := range models.AllSystemJobs { + created, err := s.createIfNotExists(jobType) + if err != nil { + log.Errorf("failed to create system job %s: %v", jobType, err) + return nil, err + } + task := models.RunnableTask{JobID: created.ID, Status: models.StatusPending} + if err := s.intoSystemTask(created, &task); err != nil { + log.Errorf("failed to create task for system job %s: %v", jobType, err) + return nil, err + } + systemTasks = append(systemTasks, task) + } + return systemTasks, nil +} + +// intoSystemTask finds or creates the single provider-less RunnableTask for a +// system job and prepares its parameters for publishing. +func (s *Scheduler) intoSystemTask(cj *models.CronJob, task *models.RunnableTask) error { + if task.ID == 0 { + if err := s.db.Model(&models.RunnableTask{}). + Where("job_id = ? AND provider_platform_id IS NULL AND open_content_provider_id IS NULL", cj.ID). + FirstOrCreate(&task).Error; err != nil { + log.Errorf("failed to create task for job: %v. error: %v", cj.Name, err) + return err + } + if err := s.db.Model(&models.RunnableTask{}).Preload("Job").First(&task, task.ID).Error; err != nil { + log.Errorf("failed to reload task for job: %v. error: %v", cj.Name, err) + return err + } + } + task.Job = cj + task.Prepare(nil) + return nil +} + func (s *Scheduler) generateOpenContentProviderTasks() ([]models.RunnableTask, error) { otherTasks := make([]models.RunnableTask, 0, 5) providers := make([]models.OpenContentProvider, 0, 2) diff --git a/provider-middleware/handlers.go b/provider-middleware/handlers.go index 76137402d..ec95e06f6 100644 --- a/provider-middleware/handlers.go +++ b/provider-middleware/handlers.go @@ -36,6 +36,7 @@ func (sh *ServiceHandler) initSubscription() error { {models.RetryVideoDownloadsJob.PubName(), sh.handleRetryFailedVideos}, {models.RetryManualDownloadJob.PubName(), sh.handleManualRetryDownload}, {models.SyncVideoMetadataJob.PubName(), sh.handleSyncVideoMetadata}, + {models.ActivateScheduledClassesJob.PubName(), sh.handleActivateScheduledClasses}, } for _, sub := range subscriptions { timeout := CANCEL_TIMEOUT diff --git a/provider-middleware/program_classes.go b/provider-middleware/program_classes.go new file mode 100644 index 000000000..d604fd302 --- /dev/null +++ b/provider-middleware/program_classes.go @@ -0,0 +1,94 @@ +package main + +import ( + "UnlockEdv2/src/models" + "context" + "encoding/json" + "fmt" + "time" + + "github.com/nats-io/nats.go" + "gorm.io/gorm" +) + +// handleActivateScheduledClasses is the entrypoint for the daily +// `tasks.activate_scheduled_classes` job. It flips any class whose scheduled +// start date has is the current day of the job run (in its facility's local timezone) from Scheduled to +// Active. The status update goes through the same map-based update the user interface uses +// so that ProgramClass.AfterUpdate fires and backfills enrolled_at on the +// class's enrollments. +func (sh *ServiceHandler) handleActivateScheduledClasses(ctx context.Context, msg *nats.Msg) { + var body map[string]any + if err := json.Unmarshal(msg.Data, &body); err != nil { + logger().Errorf("failed to unmarshal activate_scheduled_classes message: %v", err) + return + } + jobId, ok := body["job_id"].(string) + if !ok { + logger().Errorf("job_id not found in activate_scheduled_classes message: %v", body) + return + } + success := sh.activateScheduledClasses(ctx) == nil + sh.cleanupJob(ctx, nil, jobId, success) +} + +func (sh *ServiceHandler) activateScheduledClasses(ctx context.Context) error { + batchUserID, err := sh.systemBatchUserID(ctx) + if err != nil { //batch id doesn't exist then fail + logger().Errorf("cannot activate scheduled classes: %v", err) + return err + } + + var classIDs []int + if err := sh.db.WithContext(ctx). + Model(&models.ProgramClass{}). + Joins("JOIN facilities f ON f.id = program_classes.facility_id"). + Where("program_classes.status = ?", models.Scheduled). + Where("program_classes.archived_at IS NULL"). + Where("program_classes.start_dt <= (now() AT TIME ZONE f.timezone)::date"). + Pluck("program_classes.id", &classIDs).Error; err != nil { + logger().Errorf("failed to query scheduled classes to activate: %v", err) + return err + } + + if len(classIDs) == 0 { + logger().Infoln("no scheduled classes are due for activation") + return nil + } + logger().Infof("activating %d scheduled class(es): %v", len(classIDs), classIDs) + + batchCtx := context.WithValue(ctx, models.UserIDKey, batchUserID) + enrolledAt := time.Now().UTC() + if err := sh.db.WithContext(batchCtx).Transaction(func(tx *gorm.DB) error { + if err := tx. + Model(&models.ProgramClass{}). + Where("id IN ?", classIDs). + Set("class_ids", classIDs). + Updates(map[string]any{"status": models.Active}).Error; err != nil { + return err + } + return tx. + Model(&models.ProgramClassEnrollment{}). + Where("class_id IN ?", classIDs). + Where("enrollment_status = ?", models.Enrolled). + Where("enrolled_at IS NULL"). + Updates(map[string]any{ + "enrolled_at": enrolledAt, + "update_user_id": batchUserID, + }).Error + }); err != nil { + logger().Errorf("failed to activate scheduled classes %v: %v", classIDs, err) + return err + } + return nil +} + +func (sh *ServiceHandler) systemBatchUserID(ctx context.Context) (uint, error) { + var user models.User + if err := sh.db.WithContext(ctx). + Where("username = ?", "system_batch"). + First(&user).Error; err != nil { + return 0, fmt.Errorf("system_batch user not found: %w", err) + } + return user.ID, nil +}