diff --git a/attachment/store.go b/attachment/store.go index 70fb55c0..14b39f81 100644 --- a/attachment/store.go +++ b/attachment/store.go @@ -14,9 +14,8 @@ import ( ) const ( - tagStore = "attachment_store" - syncInterval = 15 * time.Minute // How often to run the background sync loop - orphanGracePeriod = time.Hour // Don't delete orphaned objects younger than this to avoid races with in-flight uploads + tagStore = "attachment_store" + syncInterval = 15 * time.Minute // How often to run the background sync loop ) var errInvalidFileID = errors.New("invalid file ID") @@ -29,36 +28,38 @@ type Store struct { size int64 // Current size of the store in bytes sizes map[string]int64 // File ID -> size, for subtracting on Remove attachmentsWithSizes func() (map[string]int64, error) // Returns file ID -> size for active attachments + orphanGracePeriod time.Duration // Don't delete orphaned objects younger than this closeChan chan struct{} mu sync.RWMutex // Protects size and sizes } // NewFileStore creates a new file-system backed attachment cache -func NewFileStore(dir string, totalSizeLimit int64, attachmentsWithSizes func() (map[string]int64, error)) (*Store, error) { +func NewFileStore(dir string, totalSizeLimit int64, orphanGracePeriod time.Duration, attachmentsWithSizes func() (map[string]int64, error)) (*Store, error) { b, err := newFileBackend(dir) if err != nil { return nil, err } - return newStore(b, totalSizeLimit, attachmentsWithSizes) + return newStore(b, totalSizeLimit, orphanGracePeriod, attachmentsWithSizes) } // NewS3Store creates a new S3-backed attachment cache. The s3URL must be in the format: // // s3://ACCESS_KEY:SECRET_KEY@BUCKET[/PREFIX]?region=REGION[&endpoint=ENDPOINT] -func NewS3Store(s3URL string, totalSizeLimit int64, attachmentsWithSizes func() (map[string]int64, error)) (*Store, error) { +func NewS3Store(s3URL string, totalSizeLimit int64, orphanGracePeriod time.Duration, attachmentsWithSizes func() (map[string]int64, error)) (*Store, error) { config, err := s3.ParseURL(s3URL) if err != nil { return nil, err } - return newStore(newS3Backend(s3.New(config)), totalSizeLimit, attachmentsWithSizes) + return newStore(newS3Backend(s3.New(config)), totalSizeLimit, orphanGracePeriod, attachmentsWithSizes) } -func newStore(backend backend, totalSizeLimit int64, attachmentsWithSizes func() (map[string]int64, error)) (*Store, error) { +func newStore(backend backend, totalSizeLimit int64, orphanGracePeriod time.Duration, attachmentsWithSizes func() (map[string]int64, error)) (*Store, error) { c := &Store{ backend: backend, limit: totalSizeLimit, sizes: make(map[string]int64), attachmentsWithSizes: attachmentsWithSizes, + orphanGracePeriod: orphanGracePeriod, closeChan: make(chan struct{}), } // Hydrate sizes from the database immediately so that Size()/Remaining()/Remove() @@ -140,9 +141,14 @@ func (c *Store) Remove(ids ...string) error { return nil } +// Sync triggers an immediate reconciliation of storage with the database. +func (c *Store) Sync() error { + return c.sync() +} + // sync reconciles the backend storage with the database. It lists all objects, -// deletes orphans (not in the valid ID set and older than 1 hour), and recomputes -// the total size from the existing attachments in the database. +// deletes orphans (not in the valid ID set and older than the grace period), and +// recomputes the total size from the existing attachments in the database. func (c *Store) sync() error { if c.attachmentsWithSizes == nil { return nil @@ -157,7 +163,7 @@ func (c *Store) sync() error { } // Calculate total cache size and collect orphaned attachments, excluding objects younger // than the grace period to account for races, and skipping objects with invalid IDs. - cutoff := time.Now().Add(-orphanGracePeriod) + cutoff := time.Now().Add(-c.orphanGracePeriod) var orphanIDs []string var count, totalSize int64 sizes := make(map[string]int64, len(remoteObjects)) diff --git a/attachment/store_file_test.go b/attachment/store_file_test.go index d0b6e135..0f7495b4 100644 --- a/attachment/store_file_test.go +++ b/attachment/store_file_test.go @@ -2,6 +2,7 @@ package attachment import ( "testing" + "time" "github.com/stretchr/testify/require" ) @@ -9,7 +10,7 @@ import ( func newTestFileStore(t *testing.T, totalSizeLimit int64) (dir string, cache *Store) { t.Helper() dir = t.TempDir() - cache, err := NewFileStore(dir, totalSizeLimit, nil) + cache, err := NewFileStore(dir, totalSizeLimit, time.Hour, nil) require.Nil(t, err) t.Cleanup(func() { cache.Close() }) return dir, cache diff --git a/attachment/store_s3_test.go b/attachment/store_s3_test.go index 6615f4e9..22c1d6bf 100644 --- a/attachment/store_s3_test.go +++ b/attachment/store_s3_test.go @@ -24,7 +24,7 @@ func TestS3Store_WriteWithPrefix(t *testing.T) { client := s3.New(cfg) deleteAllObjects(t, client) backend := newS3Backend(client) - cache, err := newStore(backend, 10*1024, nil) + cache, err := newStore(backend, 10*1024, time.Hour, nil) require.Nil(t, err) t.Cleanup(func() { deleteAllObjects(t, client) @@ -62,7 +62,7 @@ func newTestRealS3Store(t *testing.T, totalSizeLimit int64) (*Store, *modTimeOve inner := newS3Backend(client) wrapper := &modTimeOverrideBackend{backend: inner, modTimes: make(map[string]time.Time)} deleteAllObjects(t, client) - store, err := newStore(wrapper, totalSizeLimit, nil) + store, err := newStore(wrapper, totalSizeLimit, time.Hour, nil) require.Nil(t, err) t.Cleanup(func() { deleteAllObjects(t, client) diff --git a/message/cache.go b/message/cache.go index 870eb599..90fbf51d 100644 --- a/message/cache.go +++ b/message/cache.go @@ -24,7 +24,6 @@ var errNoRows = errors.New("no rows found") // queries holds the database-specific SQL queries type queries struct { insertMessage string - deleteMessage string selectScheduledMessageIDsBySeqID string deleteScheduledBySequenceID string updateMessagesForTopicExpiry string @@ -35,12 +34,11 @@ type queries struct { selectMessagesSinceIDScheduled string selectMessagesLatest string selectMessagesDue string - selectMessagesExpired string + deleteExpiredMessages string updateMessagePublished string selectMessagesCount string selectTopics string - updateAttachmentDeleted string - selectAttachmentsExpired string + markExpiredAttachmentsDeleted string selectAttachmentsSizeBySender string selectAttachmentsSizeByUserID string selectAttachmentsWithSizes string @@ -246,14 +244,16 @@ func (c *Cache) MessagesDue() ([]*model.Message, error) { return readMessages(rows) } -// MessagesExpired returns a list of message IDs that have expired and should be deleted -func (c *Cache) MessagesExpired() ([]string, error) { - rows, err := c.db.Query(c.queries.selectMessagesExpired, time.Now().Unix()) +// DeleteExpiredMessages deletes up to `limit` expired messages in a single query +// and returns the number of deleted rows. +func (c *Cache) DeleteExpiredMessages(limit int) (int64, error) { + c.maybeLock() + defer c.maybeUnlock() + result, err := c.db.Exec(c.queries.deleteExpiredMessages, time.Now().Unix(), limit) if err != nil { - return nil, err + return 0, err } - defer rows.Close() - return readStrings(rows) + return result.RowsAffected() } // Message returns the message with the given ID, or ErrMessageNotFound if not found @@ -312,20 +312,6 @@ func (c *Cache) Topics() ([]string, error) { return readStrings(rows) } -// DeleteMessages deletes the messages with the given IDs -func (c *Cache) DeleteMessages(ids ...string) error { - c.maybeLock() - defer c.maybeUnlock() - return db.ExecTx(c.db, func(tx *sql.Tx) error { - for _, id := range ids { - if _, err := tx.Exec(c.queries.deleteMessage, id); err != nil { - return err - } - } - return nil - }) -} - // DeleteScheduledBySequenceID deletes unpublished (scheduled) messages with the given topic and sequence ID. // It returns the message IDs of the deleted messages, which can be used to clean up attachment files. func (c *Cache) DeleteScheduledBySequenceID(topic, sequenceID string) ([]string, error) { @@ -363,28 +349,16 @@ func (c *Cache) ExpireMessages(topics ...string) error { }) } -// AttachmentsExpired returns message IDs with expired attachments that have not been deleted -func (c *Cache) AttachmentsExpired() ([]string, error) { - rows, err := c.db.Query(c.queries.selectAttachmentsExpired, time.Now().Unix()) - if err != nil { - return nil, err - } - defer rows.Close() - return readStrings(rows) -} - -// MarkAttachmentsDeleted marks the attachments for the given message IDs as deleted -func (c *Cache) MarkAttachmentsDeleted(ids ...string) error { +// MarkExpiredAttachmentsDeleted marks up to `limit` expired attachments as deleted in a single +// query and returns the number of updated rows. +func (c *Cache) MarkExpiredAttachmentsDeleted(limit int) (int64, error) { c.maybeLock() defer c.maybeUnlock() - return db.ExecTx(c.db, func(tx *sql.Tx) error { - for _, id := range ids { - if _, err := tx.Exec(c.queries.updateAttachmentDeleted, id); err != nil { - return err - } - } - return nil - }) + result, err := c.db.Exec(c.queries.markExpiredAttachmentsDeleted, time.Now().Unix(), limit) + if err != nil { + return 0, err + } + return result.RowsAffected() } // AttachmentBytesUsedBySender returns the total size of active attachments sent by the given sender diff --git a/message/cache_postgres.go b/message/cache_postgres.go index f0a32036..4d7c3f93 100644 --- a/message/cache_postgres.go +++ b/message/cache_postgres.go @@ -12,7 +12,6 @@ const ( INSERT INTO message (mid, sequence_id, time, event, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_deleted, sender, user_id, content_type, encoding, published) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24) ` - postgresDeleteMessageQuery = `DELETE FROM message WHERE mid = $1` postgresSelectScheduledMessageIDsBySeqIDQuery = `SELECT mid FROM message WHERE topic = $1 AND sequence_id = $2 AND published = FALSE` postgresDeleteScheduledBySequenceIDQuery = `DELETE FROM message WHERE topic = $1 AND sequence_id = $2 AND published = FALSE` postgresUpdateMessagesForTopicExpiryQuery = `UPDATE message SET expires = $1 WHERE topic = $2` @@ -61,13 +60,12 @@ const ( WHERE time <= $1 AND published = FALSE ORDER BY time, id ` - postgresSelectMessagesExpiredQuery = `SELECT mid FROM message WHERE expires <= $1 AND published = TRUE` postgresUpdateMessagePublishedQuery = `UPDATE message SET published = TRUE WHERE mid = $1` postgresSelectMessagesCountQuery = `SELECT COUNT(*) FROM message` postgresSelectTopicsQuery = `SELECT topic FROM message GROUP BY topic` - postgresUpdateAttachmentDeletedQuery = `UPDATE message SET attachment_deleted = TRUE WHERE mid = $1` - postgresSelectAttachmentsExpiredQuery = `SELECT mid FROM message WHERE attachment_expires > 0 AND attachment_expires <= $1 AND attachment_deleted = FALSE` + postgresDeleteExpiredMessagesQuery = `DELETE FROM message WHERE mid IN (SELECT mid FROM message WHERE expires <= $1 AND published = TRUE LIMIT $2)` + postgresMarkExpiredAttachmentsDeletedQuery = `UPDATE message SET attachment_deleted = TRUE WHERE mid IN (SELECT mid FROM message WHERE attachment_expires > 0 AND attachment_expires <= $1 AND attachment_deleted = FALSE LIMIT $2)` postgresSelectAttachmentsSizeBySenderQuery = `SELECT COALESCE(SUM(attachment_size), 0) FROM message WHERE user_id = '' AND sender = $1 AND attachment_expires >= $2` postgresSelectAttachmentsSizeByUserIDQuery = `SELECT COALESCE(SUM(attachment_size), 0) FROM message WHERE user_id = $1 AND attachment_expires >= $2` postgresSelectAttachmentsWithSizesQuery = `SELECT mid, attachment_size FROM message WHERE attachment_expires > $1 AND attachment_deleted = FALSE` @@ -79,7 +77,6 @@ const ( var postgresQueries = queries{ insertMessage: postgresInsertMessageQuery, - deleteMessage: postgresDeleteMessageQuery, selectScheduledMessageIDsBySeqID: postgresSelectScheduledMessageIDsBySeqIDQuery, deleteScheduledBySequenceID: postgresDeleteScheduledBySequenceIDQuery, updateMessagesForTopicExpiry: postgresUpdateMessagesForTopicExpiryQuery, @@ -90,12 +87,11 @@ var postgresQueries = queries{ selectMessagesSinceIDScheduled: postgresSelectMessagesSinceIDIncludeScheduledQuery, selectMessagesLatest: postgresSelectMessagesLatestQuery, selectMessagesDue: postgresSelectMessagesDueQuery, - selectMessagesExpired: postgresSelectMessagesExpiredQuery, + deleteExpiredMessages: postgresDeleteExpiredMessagesQuery, updateMessagePublished: postgresUpdateMessagePublishedQuery, selectMessagesCount: postgresSelectMessagesCountQuery, selectTopics: postgresSelectTopicsQuery, - updateAttachmentDeleted: postgresUpdateAttachmentDeletedQuery, - selectAttachmentsExpired: postgresSelectAttachmentsExpiredQuery, + markExpiredAttachmentsDeleted: postgresMarkExpiredAttachmentsDeletedQuery, selectAttachmentsSizeBySender: postgresSelectAttachmentsSizeBySenderQuery, selectAttachmentsSizeByUserID: postgresSelectAttachmentsSizeByUserIDQuery, selectAttachmentsWithSizes: postgresSelectAttachmentsWithSizesQuery, diff --git a/message/cache_postgres_schema.go b/message/cache_postgres_schema.go index 99e18c2c..994df9b0 100644 --- a/message/cache_postgres_schema.go +++ b/message/cache_postgres_schema.go @@ -73,14 +73,14 @@ const ( ` ) -var postgresMigrations = map[int]func(db *sql.DB) error{ +var postgresMigrations = map[int]func(d *sql.DB) error{ 14: postgresMigrateFrom14, } -func setupPostgres(sqlDB *sql.DB) error { +func setupPostgres(d *sql.DB) error { var schemaVersion int - if err := sqlDB.QueryRow(postgresSelectSchemaVersionQuery).Scan(&schemaVersion); err != nil { - return setupNewPostgresDB(sqlDB) + if err := d.QueryRow(postgresSelectSchemaVersionQuery).Scan(&schemaVersion); err != nil { + return setupNewPostgresDB(d) } else if schemaVersion == postgresCurrentSchemaVersion { return nil } else if schemaVersion > postgresCurrentSchemaVersion { @@ -90,16 +90,16 @@ func setupPostgres(sqlDB *sql.DB) error { fn, ok := postgresMigrations[i] if !ok { return fmt.Errorf("cannot find migration step from schema version %d to %d", i, i+1) - } else if err := fn(sqlDB); err != nil { + } else if err := fn(d); err != nil { return err } } return nil } -func postgresMigrateFrom14(sqlDB *sql.DB) error { +func postgresMigrateFrom14(d *sql.DB) error { log.Tag(tagMessageCache).Info("Migrating message cache database schema: from 14 to 15") - return db.ExecTx(sqlDB, func(tx *sql.Tx) error { + return db.ExecTx(d, func(tx *sql.Tx) error { if _, err := tx.Exec(postgresMigrate14To15CreateIndexQuery); err != nil { return err } diff --git a/message/cache_sqlite.go b/message/cache_sqlite.go index b39095e0..b9d7394f 100644 --- a/message/cache_sqlite.go +++ b/message/cache_sqlite.go @@ -18,7 +18,6 @@ const ( INSERT INTO messages (mid, sequence_id, time, event, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_deleted, sender, user, content_type, encoding, published) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ` - sqliteDeleteMessageQuery = `DELETE FROM messages WHERE mid = ?` sqliteSelectScheduledMessageIDsBySeqIDQuery = `SELECT mid FROM messages WHERE topic = ? AND sequence_id = ? AND published = 0` sqliteDeleteScheduledBySequenceIDQuery = `DELETE FROM messages WHERE topic = ? AND sequence_id = ? AND published = 0` sqliteUpdateMessagesForTopicExpiryQuery = `UPDATE messages SET expires = ? WHERE topic = ?` @@ -64,13 +63,12 @@ const ( WHERE time <= ? AND published = 0 ORDER BY time, id ` - sqliteSelectMessagesExpiredQuery = `SELECT mid FROM messages WHERE expires <= ? AND published = 1` sqliteUpdateMessagePublishedQuery = `UPDATE messages SET published = 1 WHERE mid = ?` sqliteSelectMessagesCountQuery = `SELECT COUNT(*) FROM messages` sqliteSelectTopicsQuery = `SELECT topic FROM messages GROUP BY topic` - sqliteUpdateAttachmentDeletedQuery = `UPDATE messages SET attachment_deleted = 1 WHERE mid = ?` - sqliteSelectAttachmentsExpiredQuery = `SELECT mid FROM messages WHERE attachment_expires > 0 AND attachment_expires <= ? AND attachment_deleted = 0` + sqliteDeleteExpiredMessagesQuery = `DELETE FROM messages WHERE mid IN (SELECT mid FROM messages WHERE expires <= ? AND published = 1 LIMIT ?)` + sqliteMarkExpiredAttachmentsDeletedQuery = `UPDATE messages SET attachment_deleted = 1 WHERE mid IN (SELECT mid FROM messages WHERE attachment_expires > 0 AND attachment_expires <= ? AND attachment_deleted = 0 LIMIT ?)` sqliteSelectAttachmentsSizeBySenderQuery = `SELECT IFNULL(SUM(attachment_size), 0) FROM messages WHERE user = '' AND sender = ? AND attachment_expires >= ?` sqliteSelectAttachmentsSizeByUserIDQuery = `SELECT IFNULL(SUM(attachment_size), 0) FROM messages WHERE user = ? AND attachment_expires >= ?` sqliteSelectAttachmentsWithSizesQuery = `SELECT mid, attachment_size FROM messages WHERE attachment_expires > ? AND attachment_deleted = 0` @@ -82,7 +80,6 @@ const ( var sqliteQueries = queries{ insertMessage: sqliteInsertMessageQuery, - deleteMessage: sqliteDeleteMessageQuery, selectScheduledMessageIDsBySeqID: sqliteSelectScheduledMessageIDsBySeqIDQuery, deleteScheduledBySequenceID: sqliteDeleteScheduledBySequenceIDQuery, updateMessagesForTopicExpiry: sqliteUpdateMessagesForTopicExpiryQuery, @@ -93,12 +90,11 @@ var sqliteQueries = queries{ selectMessagesSinceIDScheduled: sqliteSelectMessagesSinceIDIncludeScheduledQuery, selectMessagesLatest: sqliteSelectMessagesLatestQuery, selectMessagesDue: sqliteSelectMessagesDueQuery, - selectMessagesExpired: sqliteSelectMessagesExpiredQuery, + deleteExpiredMessages: sqliteDeleteExpiredMessagesQuery, updateMessagePublished: sqliteUpdateMessagePublishedQuery, selectMessagesCount: sqliteSelectMessagesCountQuery, selectTopics: sqliteSelectTopicsQuery, - updateAttachmentDeleted: sqliteUpdateAttachmentDeletedQuery, - selectAttachmentsExpired: sqliteSelectAttachmentsExpiredQuery, + markExpiredAttachmentsDeleted: sqliteMarkExpiredAttachmentsDeletedQuery, selectAttachmentsSizeBySender: sqliteSelectAttachmentsSizeBySenderQuery, selectAttachmentsSizeByUserID: sqliteSelectAttachmentsSizeByUserIDQuery, selectAttachmentsWithSizes: sqliteSelectAttachmentsWithSizesQuery, diff --git a/message/cache_sqlite_test.go b/message/cache_sqlite_test.go index e69488e6..95ff7e48 100644 --- a/message/cache_sqlite_test.go +++ b/message/cache_sqlite_test.go @@ -209,7 +209,7 @@ func TestSqliteStore_Migration_From9(t *testing.T) { require.True(t, rows.Next()) var version int require.Nil(t, rows.Scan(&version)) - require.Equal(t, 14, version) + require.Equal(t, 15, version) require.Nil(t, rows.Close()) messages, err := s.Messages("mytopic", model.SinceAllMessages, false) @@ -287,6 +287,6 @@ func checkSqliteSchemaVersion(t *testing.T, filename string) { require.True(t, rows.Next()) var schemaVersion int require.Nil(t, rows.Scan(&schemaVersion)) - require.Equal(t, 14, schemaVersion) + require.Equal(t, 15, schemaVersion) require.Nil(t, rows.Close()) } diff --git a/message/cache_test.go b/message/cache_test.go index 0fddc88b..059a1f62 100644 --- a/message/cache_test.go +++ b/message/cache_test.go @@ -3,7 +3,6 @@ package message_test import ( "net/netip" "path/filepath" - "sort" "sync" "testing" "time" @@ -274,9 +273,9 @@ func TestStore_Prune(t *testing.T) { require.Nil(t, err) require.Equal(t, 3, count) - expiredMessageIDs, err := s.MessagesExpired() + deleted, err := s.DeleteExpiredMessages(10) require.Nil(t, err) - require.Nil(t, s.DeleteMessages(expiredMessageIDs...)) + require.Equal(t, int64(2), deleted) count, err = s.MessagesCount() require.Nil(t, err) @@ -414,10 +413,9 @@ func TestStore_AttachmentsExpired(t *testing.T) { } require.Nil(t, s.AddMessage(m)) - ids, err := s.AttachmentsExpired() + count, err := s.MarkExpiredAttachmentsDeleted(10) require.Nil(t, err) - require.Equal(t, 1, len(ids)) - require.Equal(t, "m4", ids[0]) + require.Equal(t, int64(1), count) }) } @@ -583,13 +581,9 @@ func TestStore_ExpireMessages(t *testing.T) { require.Nil(t, s.ExpireMessages("topic1")) // topic1 messages should now be expired (expires set to past) - expiredIDs, err := s.MessagesExpired() + deleted, err := s.DeleteExpiredMessages(100) require.Nil(t, err) - require.Equal(t, 2, len(expiredIDs)) - sort.Strings(expiredIDs) - expectedIDs := []string{m1.ID, m2.ID} - sort.Strings(expectedIDs) - require.Equal(t, expectedIDs, expiredIDs) + require.Equal(t, int64(2), deleted) // topic2 should be unaffected messages, err = s.Messages("topic2", model.SinceAllMessages, false) @@ -629,27 +623,15 @@ func TestStore_MarkAttachmentsDeleted(t *testing.T) { } require.Nil(t, s.AddMessage(m2)) - // Both should show as expired attachments needing cleanup - ids, err := s.AttachmentsExpired() + // Both should be marked as deleted in one batch + count, err := s.MarkExpiredAttachmentsDeleted(10) require.Nil(t, err) - require.Equal(t, 2, len(ids)) - - // Mark msg1's attachment as deleted (file cleaned up) - require.Nil(t, s.MarkAttachmentsDeleted("msg1")) - - // Now only msg2 should show as needing cleanup - ids, err = s.AttachmentsExpired() - require.Nil(t, err) - require.Equal(t, 1, len(ids)) - require.Equal(t, "msg2", ids[0]) - - // Mark msg2 too - require.Nil(t, s.MarkAttachmentsDeleted("msg2")) + require.Equal(t, int64(2), count) // No more expired attachments to clean up - ids, err = s.AttachmentsExpired() + count, err = s.MarkExpiredAttachmentsDeleted(10) require.Nil(t, err) - require.Equal(t, 0, len(ids)) + require.Equal(t, int64(0), count) // Messages themselves still exist messages, err := s.Messages("mytopic", model.SinceAllMessages, false) diff --git a/server/config.go b/server/config.go index 8ead312c..8497b18e 100644 --- a/server/config.go +++ b/server/config.go @@ -20,6 +20,7 @@ const ( DefaultCacheBatchTimeout = time.Duration(0) DefaultKeepaliveInterval = 45 * time.Second // Not too frequently to save battery (Android read timeout used to be 77s!) DefaultManagerInterval = time.Minute + DefaultManagerBatchSize = 30000 DefaultDelayedSenderInterval = 10 * time.Second DefaultMessageDelayMin = 10 * time.Second DefaultMessageDelayMax = 3 * 24 * time.Hour @@ -46,11 +47,13 @@ const ( // - total topic limit: max number of topics overall // - various attachment limits const ( - DefaultMessageSizeLimit = 4096 // Bytes; note that FCM/APNS have a limit of ~4 KB for the entire message - DefaultTotalTopicLimit = 15000 - DefaultAttachmentTotalSizeLimit = int64(5 * 1024 * 1024 * 1024) // 5 GB - DefaultAttachmentFileSizeLimit = int64(15 * 1024 * 1024) // 15 MB - DefaultAttachmentExpiryDuration = 3 * time.Hour + DefaultMessageSizeLimit = 4096 // Bytes; note that FCM/APNS have a limit of ~4 KB for the entire message + DefaultTotalTopicLimit = 15000 + DefaultAttachmentTotalSizeLimit = int64(5 * 1024 * 1024 * 1024) // 5 GB + DefaultAttachmentFileSizeLimit = int64(15 * 1024 * 1024) // 15 MB + DefaultAttachmentExpiryDuration = 3 * time.Hour + DefaultAttachmentOrphanGracePeriod = time.Hour // Don't delete orphaned objects younger than this to avoid races with in-flight uploads + ) // Defines all per-visitor limits @@ -115,9 +118,11 @@ type Config struct { AttachmentTotalSizeLimit int64 AttachmentFileSizeLimit int64 AttachmentExpiryDuration time.Duration + AttachmentOrphanGracePeriod time.Duration TemplateDir string // Directory to load named templates from KeepaliveInterval time.Duration ManagerInterval time.Duration + ManagerBatchSize int DisallowedTopics []string WebRoot string // empty to disable DelayedSenderInterval time.Duration @@ -217,9 +222,11 @@ func NewConfig() *Config { AttachmentTotalSizeLimit: DefaultAttachmentTotalSizeLimit, AttachmentFileSizeLimit: DefaultAttachmentFileSizeLimit, AttachmentExpiryDuration: DefaultAttachmentExpiryDuration, + AttachmentOrphanGracePeriod: DefaultAttachmentOrphanGracePeriod, TemplateDir: DefaultTemplateDir, KeepaliveInterval: DefaultKeepaliveInterval, ManagerInterval: DefaultManagerInterval, + ManagerBatchSize: DefaultManagerBatchSize, DisallowedTopics: DefaultDisallowedTopics, WebRoot: "/", DelayedSenderInterval: DefaultDelayedSenderInterval, diff --git a/server/server.go b/server/server.go index 508d9a70..71d08b01 100644 --- a/server/server.go +++ b/server/server.go @@ -302,9 +302,9 @@ func createMessageCache(conf *Config, pool *db.DB) (*message.Cache, error) { func createAttachmentStore(conf *Config, messageCache *message.Cache) (*attachment.Store, error) { if strings.HasPrefix(conf.AttachmentCacheDir, "s3://") { - return attachment.NewS3Store(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit, messageCache.AttachmentsWithSizes) + return attachment.NewS3Store(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit, conf.AttachmentOrphanGracePeriod, messageCache.AttachmentsWithSizes) } else if conf.AttachmentCacheDir != "" { - return attachment.NewFileStore(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit, messageCache.AttachmentsWithSizes) + return attachment.NewFileStore(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit, conf.AttachmentOrphanGracePeriod, messageCache.AttachmentsWithSizes) } return nil, nil } diff --git a/server/server_account_test.go b/server/server_account_test.go index 0360fcd4..58f4d5d4 100644 --- a/server/server_account_test.go +++ b/server/server_account_test.go @@ -673,6 +673,7 @@ func TestAccount_Reservation_Delete_Messages_And_Attachments(t *testing.T) { t.Parallel() conf := newTestConfigWithAuthFile(t, databaseURL) conf.AuthDefault = user.PermissionReadWrite + conf.AttachmentOrphanGracePeriod = 0 // For testing: delete orphans immediately s := newTestServer(t, conf) // Create user with tier @@ -742,6 +743,7 @@ func TestAccount_Reservation_Delete_Messages_And_Attachments(t *testing.T) { // Verify that messages and attachments were deleted // This does not explicitly call the manager! waitFor(t, func() bool { + s.attachment.Sync() // File cleanup is done by sync, not by the manager ms, err := s.messageCache.Messages("mytopic1", model.SinceAllMessages, false) require.Nil(t, err) return len(ms) == 0 && !util.FileExists(filepath.Join(s.config.AttachmentCacheDir, m1.ID)) diff --git a/server/server_manager.go b/server/server_manager.go index 11fa4dfe..d0eefe72 100644 --- a/server/server_manager.go +++ b/server/server_manager.go @@ -142,22 +142,17 @@ func (s *Server) pruneAttachments() { if s.attachment == nil { return } + // Only mark as deleted in DB. The actual storage files are cleaned up + // by the attachment store's sync() loop, which periodically reconciles + // storage with the database and removes orphaned files. log. Tag(tagManager). Timing(func() { - ids, err := s.messageCache.AttachmentsExpired() + count, err := s.messageCache.MarkExpiredAttachmentsDeleted(s.config.ManagerBatchSize) if err != nil { - log.Tag(tagManager).Err(err).Warn("Error retrieving expired attachments") - } else if len(ids) > 0 { - if log.Tag(tagManager).IsDebug() { - log.Tag(tagManager).Debug("Marking %d expired attachment(s) as deleted", len(ids)) - } - // Only mark as deleted in DB. The actual storage files are cleaned up - // by the attachment store's sync() loop, which periodically reconciles - // storage with the database and removes orphaned files. - if err := s.messageCache.MarkAttachmentsDeleted(ids...); err != nil { - log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted") - } + log.Tag(tagManager).Err(err).Warn("Error marking expired attachments as deleted") + } else if count > 0 { + log.Tag(tagManager).Debug("Marked %d expired attachment(s) as deleted", count) } else { log.Tag(tagManager).Debug("No expired attachments to delete") } @@ -166,19 +161,17 @@ func (s *Server) pruneAttachments() { } func (s *Server) pruneMessages() { + // Only delete DB rows. Attachment storage files are cleaned up by the + // attachment store's sync() loop, which periodically reconciles storage + // with the database and removes orphaned files. log. Tag(tagManager). Timing(func() { - expiredMessageIDs, err := s.messageCache.MessagesExpired() + count, err := s.messageCache.DeleteExpiredMessages(s.config.ManagerBatchSize) if err != nil { - log.Tag(tagManager).Err(err).Warn("Error retrieving expired messages") - } else if len(expiredMessageIDs) > 0 { - // Only delete DB rows. Attachment storage files are cleaned up by the - // attachment store's sync() loop, which periodically reconciles storage - // with the database and removes orphaned files. - if err := s.messageCache.DeleteMessages(expiredMessageIDs...); err != nil { - log.Tag(tagManager).Err(err).Warn("Error deleting expired messages") - } + log.Tag(tagManager).Err(err).Warn("Error deleting expired messages") + } else if count > 0 { + log.Tag(tagManager).Debug("Deleted %d expired message(s)", count) } else { log.Tag(tagManager).Debug("No expired messages to delete") } diff --git a/server/server_payments_test.go b/server/server_payments_test.go index 9873d6d8..30b1a22e 100644 --- a/server/server_payments_test.go +++ b/server/server_payments_test.go @@ -443,6 +443,7 @@ func TestPayments_Webhook_Subscription_Updated_Downgrade_From_PastDue_To_Active( c := newTestConfigWithAuthFile(t, databaseURL) c.StripeSecretKey = "secret key" c.StripeWebhookKey = "webhook key" + c.AttachmentOrphanGracePeriod = 0 // For testing: delete orphans immediately s := newTestServer(t, c) s.stripe = stripeMock @@ -546,6 +547,7 @@ func TestPayments_Webhook_Subscription_Updated_Downgrade_From_PastDue_To_Active( // Verify that messages and attachments were deleted time.Sleep(time.Second) s.execManager() + s.attachment.Sync() // File cleanup is done by sync, not by the manager ms, err := s.messageCache.Messages("atopic", model.SinceAllMessages, false) require.Nil(t, err) diff --git a/server/server_test.go b/server/server_test.go index 44b9ac94..3f5f5b06 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -2285,6 +2285,7 @@ func TestServer_PublishAttachmentAndExpire(t *testing.T) { c := newTestConfig(t, databaseURL) c.AttachmentExpiryDuration = time.Millisecond // Hack + c.AttachmentOrphanGracePeriod = 0 // For testing: delete orphans immediately s := newTestServer(t, c) // Publish and make sure we can retrieve it @@ -2301,7 +2302,8 @@ func TestServer_PublishAttachmentAndExpire(t *testing.T) { // Prune and makes sure it's gone waitFor(t, func() bool { - s.execManager() // May run many times + s.execManager() + s.attachment.Sync() // File cleanup is done by sync, not by the manager return !util.FileExists(file) }) response = request(t, s, "GET", path, "", nil)