diff --git a/message/cache.go b/message/cache.go index 76ba7926..870eb599 100644 --- a/message/cache.go +++ b/message/cache.go @@ -246,7 +246,7 @@ func (c *Cache) MessagesDue() ([]*model.Message, error) { return readMessages(rows) } -// MessagesExpired returns a list of IDs for messages that have expired (should be deleted) +// MessagesExpired returns a list of message IDs that have expired and should be deleted func (c *Cache) MessagesExpired() ([]string, error) { rows, err := c.db.Query(c.queries.selectMessagesExpired, time.Now().Unix()) if err != nil { @@ -262,10 +262,10 @@ func (c *Cache) Message(id string) (*model.Message, error) { if err != nil { return nil, err } + defer rows.Close() if !rows.Next() { return nil, model.ErrMessageNotFound } - defer rows.Close() return readMessage(rows) } diff --git a/message/cache_postgres_schema.go b/message/cache_postgres_schema.go index 4a539d0e..99e18c2c 100644 --- a/message/cache_postgres_schema.go +++ b/message/cache_postgres_schema.go @@ -5,6 +5,7 @@ import ( "fmt" "heckel.io/ntfy/v2/db" + "heckel.io/ntfy/v2/log" ) // Initial PostgreSQL schema @@ -41,6 +42,7 @@ const ( CREATE INDEX IF NOT EXISTS idx_message_sequence_id ON message (sequence_id); CREATE INDEX IF NOT EXISTS idx_message_topic_published_time ON message (topic, published, time, id); CREATE INDEX IF NOT EXISTS idx_message_published_expires ON message (published, expires); + CREATE INDEX IF NOT EXISTS idx_message_attachment_expires ON message (attachment_expires) WHERE attachment_deleted = FALSE; CREATE INDEX IF NOT EXISTS idx_message_sender_attachment_expires ON message (sender, attachment_expires) WHERE user_id = ''; CREATE INDEX IF NOT EXISTS idx_message_user_id_attachment_expires ON message (user_id, attachment_expires); CREATE TABLE IF NOT EXISTS message_stats ( @@ -57,21 +59,57 @@ const ( // PostgreSQL schema management queries const ( - postgresCurrentSchemaVersion = 14 + postgresCurrentSchemaVersion = 15 postgresInsertSchemaVersionQuery = `INSERT INTO schema_version (store, version) VALUES ('message', $1)` + postgresUpdateSchemaVersionQuery = `UPDATE schema_version SET version = $1 WHERE store = 'message'` postgresSelectSchemaVersionQuery = `SELECT version FROM schema_version WHERE store = 'message'` ) -func setupPostgres(db *sql.DB) error { +// PostgreSQL schema migrations +const ( + // 14 -> 15 + postgresMigrate14To15CreateIndexQuery = ` + CREATE INDEX IF NOT EXISTS idx_message_attachment_expires ON message (attachment_expires) WHERE attachment_deleted = FALSE; + ` +) + +var postgresMigrations = map[int]func(db *sql.DB) error{ + 14: postgresMigrateFrom14, +} + +func setupPostgres(sqlDB *sql.DB) error { var schemaVersion int - if err := db.QueryRow(postgresSelectSchemaVersionQuery).Scan(&schemaVersion); err != nil { - return setupNewPostgresDB(db) + if err := sqlDB.QueryRow(postgresSelectSchemaVersionQuery).Scan(&schemaVersion); err != nil { + return setupNewPostgresDB(sqlDB) + } else if schemaVersion == postgresCurrentSchemaVersion { + return nil } else if schemaVersion > postgresCurrentSchemaVersion { return fmt.Errorf("unexpected schema version: version %d is higher than current version %d", schemaVersion, postgresCurrentSchemaVersion) } + for i := schemaVersion; i < postgresCurrentSchemaVersion; i++ { + fn, ok := postgresMigrations[i] + if !ok { + return fmt.Errorf("cannot find migration step from schema version %d to %d", i, i+1) + } else if err := fn(sqlDB); err != nil { + return err + } + } return nil } +func postgresMigrateFrom14(sqlDB *sql.DB) error { + log.Tag(tagMessageCache).Info("Migrating message cache database schema: from 14 to 15") + return db.ExecTx(sqlDB, func(tx *sql.Tx) error { + if _, err := tx.Exec(postgresMigrate14To15CreateIndexQuery); err != nil { + return err + } + if _, err := tx.Exec(postgresUpdateSchemaVersionQuery, 15); err != nil { + return err + } + return nil + }) +} + func setupNewPostgresDB(sqlDB *sql.DB) error { return db.ExecTx(sqlDB, func(tx *sql.Tx) error { if _, err := tx.Exec(postgresCreateTablesQuery); err != nil { diff --git a/message/cache_sqlite_schema.go b/message/cache_sqlite_schema.go index 8c68bad8..b19bfca1 100644 --- a/message/cache_sqlite_schema.go +++ b/message/cache_sqlite_schema.go @@ -57,7 +57,7 @@ const ( // Schema version management for SQLite const ( - sqliteCurrentSchemaVersion = 14 + sqliteCurrentSchemaVersion = 15 sqliteCreateSchemaVersionTableQuery = ` CREATE TABLE IF NOT EXISTS schemaVersion ( id INT PRIMARY KEY, @@ -208,6 +208,7 @@ var ( 11: sqliteMigrateFrom11, 12: sqliteMigrateFrom12, 13: sqliteMigrateFrom13, + 14: sqliteMigrateFrom14, } ) @@ -451,3 +452,15 @@ func sqliteMigrateFrom13(sqlDB *sql.DB, _ time.Duration) error { return nil }) } + +// sqliteMigrateFrom14 is a no-op; the corresponding Postgres migration adds +// idx_message_attachment_expires, which SQLite already has from the initial schema. +func sqliteMigrateFrom14(sqlDB *sql.DB, _ time.Duration) error { + log.Tag(tagMessageCache).Info("Migrating cache database schema: from 14 to 15") + return db.ExecTx(sqlDB, func(tx *sql.Tx) error { + if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 15); err != nil { + return err + } + return nil + }) +} diff --git a/server/server.go b/server/server.go index dc56d57f..508d9a70 100644 --- a/server/server.go +++ b/server/server.go @@ -1426,6 +1426,9 @@ func (s *Server) handleBodyAsAttachment(r *http.Request, v *visitor, m *model.Me return err } attachmentExpiry := time.Now().Add(vinfo.Limits.AttachmentExpiryDuration).Unix() + if m.Expires > 0 && attachmentExpiry > m.Expires { + attachmentExpiry = m.Expires // Attachment must never outlive the message + } if m.Time > attachmentExpiry { return errHTTPBadRequestAttachmentsExpiryBeforeDelivery.With(m) } diff --git a/server/server_manager.go b/server/server_manager.go index 89ff38c2..11fa4dfe 100644 --- a/server/server_manager.go +++ b/server/server_manager.go @@ -3,7 +3,6 @@ package server import ( "heckel.io/ntfy/v2/log" "heckel.io/ntfy/v2/util" - "strings" ) func (s *Server) execManager() { @@ -151,11 +150,11 @@ func (s *Server) pruneAttachments() { log.Tag(tagManager).Err(err).Warn("Error retrieving expired attachments") } else if len(ids) > 0 { if log.Tag(tagManager).IsDebug() { - log.Tag(tagManager).Debug("Deleting attachments %s", strings.Join(ids, ", ")) - } - if err := s.attachment.Remove(ids...); err != nil { - log.Tag(tagManager).Err(err).Warn("Error deleting attachments") + log.Tag(tagManager).Debug("Marking %d expired attachment(s) as deleted", len(ids)) } + // Only mark as deleted in DB. The actual storage files are cleaned up + // by the attachment store's sync() loop, which periodically reconciles + // storage with the database and removes orphaned files. if err := s.messageCache.MarkAttachmentsDeleted(ids...); err != nil { log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted") } @@ -174,13 +173,11 @@ func (s *Server) pruneMessages() { if err != nil { log.Tag(tagManager).Err(err).Warn("Error retrieving expired messages") } else if len(expiredMessageIDs) > 0 { - if s.attachment != nil { - if err := s.attachment.Remove(expiredMessageIDs...); err != nil { - log.Tag(tagManager).Err(err).Warn("Error deleting attachments for expired messages") - } - } + // Only delete DB rows. Attachment storage files are cleaned up by the + // attachment store's sync() loop, which periodically reconciles storage + // with the database and removes orphaned files. if err := s.messageCache.DeleteMessages(expiredMessageIDs...); err != nil { - log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted") + log.Tag(tagManager).Err(err).Warn("Error deleting expired messages") } } else { log.Tag(tagManager).Debug("No expired messages to delete") diff --git a/web/package-lock.json b/web/package-lock.json index 175ef11b..b04b2b41 100644 --- a/web/package-lock.json +++ b/web/package-lock.json @@ -9514,24 +9514,6 @@ "dev": true, "license": "ISC" }, - "node_modules/yaml": { - "version": "2.8.3", - "resolved": "https://registry.npmjs.org/yaml/-/yaml-2.8.3.tgz", - "integrity": "sha512-AvbaCLOO2Otw/lW5bmh9d/WEdcDFdQp2Z2ZUH3pX9U2ihyUY0nvLv7J6TrWowklRGPYbB/IuIMfYgxaCPg5Bpg==", - "dev": true, - "license": "ISC", - "optional": true, - "peer": true, - "bin": { - "yaml": "bin.mjs" - }, - "engines": { - "node": ">= 14.6" - }, - "funding": { - "url": "https://github.com/sponsors/eemeli" - } - }, "node_modules/yocto-queue": { "version": "0.1.0", "resolved": "https://registry.npmjs.org/yocto-queue/-/yocto-queue-0.1.0.tgz",