fix(typesense): update tasks in Typesense directly when the change happened

Resolves https://community.vikunja.io/t/no-filters-working-assignee-date-task-done-etc/1910
This commit is contained in:
kolaente 2024-01-28 11:46:52 +01:00
parent ae9b382963
commit 90ad975ca0
Signed by: konrad
GPG Key ID: F40E70337AB24C9B
4 changed files with 68 additions and 26 deletions

View File

@ -26,6 +26,7 @@ import (
func init() { func init() {
rootCmd.AddCommand(indexCmd) rootCmd.AddCommand(indexCmd)
rootCmd.AddCommand(partialReindexCmd)
} }
var indexCmd = &cobra.Command{ var indexCmd = &cobra.Command{
@ -35,8 +36,8 @@ var indexCmd = &cobra.Command{
initialize.FullInitWithoutAsync() initialize.FullInitWithoutAsync()
}, },
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
if !config.TypesenseEnabled.GetBool() { if config.TypesenseURL.GetString() == "" {
log.Error("Typesense not enabled") log.Error("Typesense not configured")
return return
} }
@ -56,3 +57,32 @@ var indexCmd = &cobra.Command{
log.Infof("Done!") log.Infof("Done!")
}, },
} }
var partialReindexCmd = &cobra.Command{
Use: "partial-index",
Short: "Reindex any tasks which were not indexed yet into Typesense. This will not remove any existing index.",
PreRun: func(cmd *cobra.Command, args []string) {
initialize.FullInitWithoutAsync()
},
Run: func(cmd *cobra.Command, args []string) {
if config.TypesenseURL.GetString() == "" {
log.Error("Typesense not configured")
return
}
log.Infof("Indexing… This may take a while.")
err := models.CreateTypesenseCollections()
if err != nil {
log.Criticalf("Could not create Typesense collections: %s", err.Error())
return
}
err = models.SyncUpdatedTasksIntoTypesense()
if err != nil {
log.Criticalf("Could not reindex all changed tasks into Typesense: %s", err.Error())
return
}
log.Infof("Done!")
},
}

View File

@ -95,7 +95,6 @@ func FullInit() {
models.RegisterUserDeletionCron() models.RegisterUserDeletionCron()
models.RegisterOldExportCleanupCron() models.RegisterOldExportCleanupCron()
openid.CleanupSavedOpenIDProviders() openid.CleanupSavedOpenIDProviders()
models.RegisterPeriodicTypesenseResyncCron()
// Start processing events // Start processing events
go func() { go func() {

View File

@ -70,6 +70,7 @@ func RegisterListeners() {
if config.TypesenseEnabled.GetBool() { if config.TypesenseEnabled.GetBool() {
events.RegisterListener((&TaskDeletedEvent{}).Name(), &RemoveTaskFromTypesense{}) events.RegisterListener((&TaskDeletedEvent{}).Name(), &RemoveTaskFromTypesense{})
events.RegisterListener((&TaskCreatedEvent{}).Name(), &AddTaskToTypesense{}) events.RegisterListener((&TaskCreatedEvent{}).Name(), &AddTaskToTypesense{})
events.RegisterListener((&TaskUpdatedEvent{}).Name(), &UpdateTaskInTypesense{})
} }
if config.WebhooksEnabled.GetBool() { if config.WebhooksEnabled.GetBool() {
RegisterEventForWebhook(&TaskCreatedEvent{}) RegisterEventForWebhook(&TaskCreatedEvent{})
@ -511,7 +512,7 @@ type RemoveTaskFromTypesense struct {
// Name defines the name for the RemoveTaskFromTypesense listener // Name defines the name for the RemoveTaskFromTypesense listener
func (s *RemoveTaskFromTypesense) Name() string { func (s *RemoveTaskFromTypesense) Name() string {
return "remove.task.from.typesense" return "typesense.task.remove"
} }
// Handle is executed when the event RemoveTaskFromTypesense listens on is fired // Handle is executed when the event RemoveTaskFromTypesense listens on is fired
@ -537,7 +538,7 @@ type AddTaskToTypesense struct {
// Name defines the name for the AddTaskToTypesense listener // Name defines the name for the AddTaskToTypesense listener
func (l *AddTaskToTypesense) Name() string { func (l *AddTaskToTypesense) Name() string {
return "add.task.to.typesense" return "typesense.task.add"
} }
// Handle is executed when the event AddTaskToTypesense listens on is fired // Handle is executed when the event AddTaskToTypesense listens on is fired
@ -563,6 +564,32 @@ func (l *AddTaskToTypesense) Handle(msg *message.Message) (err error) {
return return
} }
// UpdateTaskInTypesense represents a listener
type UpdateTaskInTypesense struct {
}
// Name defines the name for the UpdateTaskInTypesense listener
func (l *UpdateTaskInTypesense) Name() string {
return "typesense.task.update"
}
// Handle is executed when the event UpdateTaskInTypesense listens on is fired
func (l *UpdateTaskInTypesense) Handle(msg *message.Message) (err error) {
event := &TaskUpdatedEvent{}
err = json.Unmarshal(msg.Payload, event)
if err != nil {
return err
}
s := db.NewSession()
defer s.Close()
task := make(map[int64]*Task, 1)
task[event.Task.ID] = event.Task // Will be filled with all data by the Typesense connector
return reindexTasksInTypesense(s, task)
}
// IncreaseAttachmentCounter represents a listener // IncreaseAttachmentCounter represents a listener
type IncreaseAttachmentCounter struct { type IncreaseAttachmentCounter struct {
} }

View File

@ -22,7 +22,6 @@ import (
"time" "time"
"code.vikunja.io/api/pkg/config" "code.vikunja.io/api/pkg/config"
"code.vikunja.io/api/pkg/cron"
"code.vikunja.io/api/pkg/db" "code.vikunja.io/api/pkg/db"
"code.vikunja.io/api/pkg/log" "code.vikunja.io/api/pkg/log"
"code.vikunja.io/api/pkg/user" "code.vikunja.io/api/pkg/user"
@ -232,7 +231,7 @@ func ReindexAllTasks() (err error) {
return fmt.Errorf("could not index dummy task: %w", err) return fmt.Errorf("could not index dummy task: %w", err)
} }
err = reindexTasks(s, tasks) err = reindexTasksInTypesense(s, tasks)
if err != nil { if err != nil {
return fmt.Errorf("could not reindex all tasks: %s", err.Error()) return fmt.Errorf("could not reindex all tasks: %s", err.Error())
} }
@ -278,7 +277,7 @@ func getTypesenseTaskForTask(s *xorm.Session, task *Task, projectsCache map[int6
return return
} }
func reindexTasks(s *xorm.Session, tasks map[int64]*Task) (err error) { func reindexTasksInTypesense(s *xorm.Session, tasks map[int64]*Task) (err error) {
if len(tasks) == 0 { if len(tasks) == 0 {
log.Infof("No tasks to index") log.Infof("No tasks to index")
@ -314,6 +313,8 @@ func reindexTasks(s *xorm.Session, tasks map[int64]*Task) (err error) {
return err return err
} }
log.Debugf("Indexed tasks %v into Typesense", tasks)
return nil return nil
} }
@ -475,6 +476,8 @@ func convertTaskToTypesenseTask(task *Task) *typesenseTask {
return tt return tt
} }
// This function is only used to catch up with the Typesense Sync when it didn't index for some reason
func SyncUpdatedTasksIntoTypesense() (err error) { func SyncUpdatedTasksIntoTypesense() (err error) {
tasks := make(map[int64]*Task) tasks := make(map[int64]*Task)
@ -517,7 +520,7 @@ func SyncUpdatedTasksIntoTypesense() (err error) {
if len(tasks) > 0 { if len(tasks) > 0 {
log.Debugf("[Typesense Sync] Updating %d tasks", len(tasks)) log.Debugf("[Typesense Sync] Updating %d tasks", len(tasks))
err = reindexTasks(s, tasks) err = reindexTasksInTypesense(s, tasks)
if err != nil { if err != nil {
_ = s.Rollback() _ = s.Rollback()
return return
@ -539,20 +542,3 @@ func SyncUpdatedTasksIntoTypesense() (err error) {
return s.Commit() return s.Commit()
} }
func RegisterPeriodicTypesenseResyncCron() {
if !config.TypesenseEnabled.GetBool() {
log.Debugf("[Typesense Sync] Typesense is disabled, not setting up sync cron")
return
}
err := cron.Schedule("* * * * *", func() {
err := SyncUpdatedTasksIntoTypesense()
if err != nil {
log.Fatalf("[Typesense Sync] Could not sync updated tasks into typesense: %s", err)
}
})
if err != nil {
log.Fatalf("[Typesense Sync] Could not register typesense resync cron: %s", err)
}
}