feat(server): optional limit for poll subscriptions (rebased on upstream)

Upstream moved message storage to message/ and added PostgreSQL, so the
previous branch could not merge cleanly. Reimplement poll limit in
sendOldMessages: apply query filters first, then keep the most recent N
messages. Document limit/ aliases in subscribe API.

Made-with: Cursor
This commit is contained in:
epifeny 2026-03-28 09:54:38 +00:00
parent 266d0b9d37
commit 5e1316aff4
2 changed files with 45 additions and 8 deletions

View file

@ -245,6 +245,19 @@ combined with `since=` (defaults to `since=all`).
curl -s "ntfy.sh/mytopic/json?poll=1"
```
### Limit number of polled messages
When using `poll=1`, you can additionally use the `limit=` query parameter to restrict how many messages are
returned in a single poll. If `limit` is not set, all available messages since `since=` are returned (as before).
Filters (`tags`, `priority`, etc.) are applied first; `limit` then keeps only the **most recent** messages that match.
For example, to only fetch the most recent message:
```
curl -s "ntfy.sh/mytopic/json?poll=1&limit=1"
```
The `limit` parameter also works with the `/sse`, `/raw` and `/ws` subscription endpoints.
### Fetch cached messages
Messages may be cached for a couple of hours (see [message caching](../config.md#message-cache)) to account for network
interruptions of subscribers. If the server has configured message caching, you can read back what you missed by using
@ -429,6 +442,7 @@ and can be passed as **HTTP headers** or **query parameters in the URL**. They a
|-------------|----------------------------|---------------------------------------------------------------------------------|
| `poll` | `X-Poll`, `po` | Return cached messages and close connection |
| `since` | `X-Since`, `si` | Return cached messages since timestamp, duration or message ID |
| `limit` | `X-Limit`, `li` | With `poll=1`: return at most this many messages (most recent after filters) |
| `scheduled` | `X-Scheduled`, `sched` | Include scheduled/delayed messages in message list |
| `id` | `X-ID` | Filter: Only return messages that match this exact message ID |
| `message` | `X-Message`, `m` | Filter: Only return messages that match this exact message string |

View file

@ -1517,7 +1517,7 @@ func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request, v *
if err != nil {
return err
}
poll, since, scheduled, filters, err := parseSubscribeParams(r)
poll, since, scheduled, filters, limit, err := parseSubscribeParams(r)
if err != nil {
return err
}
@ -1563,7 +1563,7 @@ func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request, v *
for _, t := range topics {
t.Keepalive()
}
return s.sendOldMessages(topics, since, scheduled, v, sub)
return s.sendOldMessages(topics, since, scheduled, limit, filters, v, sub)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -1579,7 +1579,7 @@ func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request, v *
if err := sub(v, model.NewOpenMessage(topicsStr)); err != nil { // Send out open message
return err
}
if err := s.sendOldMessages(topics, since, scheduled, v, sub); err != nil {
if err := s.sendOldMessages(topics, since, scheduled, limit, filters, v, sub); err != nil {
return err
}
for {
@ -1620,7 +1620,7 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi
if err != nil {
return err
}
poll, since, scheduled, filters, err := parseSubscribeParams(r)
poll, since, scheduled, filters, limit, err := parseSubscribeParams(r)
if err != nil {
return err
}
@ -1714,7 +1714,7 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi
for _, t := range topics {
t.Keepalive()
}
return s.sendOldMessages(topics, since, scheduled, v, sub)
return s.sendOldMessages(topics, since, scheduled, limit, filters, v, sub)
}
subscriberIDs := make([]int, 0)
for _, t := range topics {
@ -1728,7 +1728,7 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi
if err := sub(v, model.NewOpenMessage(topicsStr)); err != nil { // Send out open message
return err
}
if err := s.sendOldMessages(topics, since, scheduled, v, sub); err != nil {
if err := s.sendOldMessages(topics, since, scheduled, limit, filters, v, sub); err != nil {
return err
}
err = g.Wait()
@ -1742,7 +1742,7 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi
return nil
}
func parseSubscribeParams(r *http.Request) (poll bool, since model.SinceMarker, scheduled bool, filters *queryFilter, err error) {
func parseSubscribeParams(r *http.Request) (poll bool, since model.SinceMarker, scheduled bool, filters *queryFilter, limit int, err error) {
poll = readBoolParam(r, false, "x-poll", "poll", "po")
scheduled = readBoolParam(r, false, "x-scheduled", "scheduled", "sched")
since, err = parseSince(r, poll)
@ -1753,6 +1753,11 @@ func parseSubscribeParams(r *http.Request) (poll bool, since model.SinceMarker,
if err != nil {
return
}
if limitStr := readParam(r, "x-limit", "limit", "li"); limitStr != "" {
if n, parseErr := strconv.Atoi(limitStr); parseErr == nil && n > 0 {
limit = n
}
}
return
}
@ -1823,7 +1828,8 @@ func (s *Server) setRateVisitors(r *http.Request, v *visitor, rateTopics []*topi
// sendOldMessages selects old messages from the messageCache and calls sub for each of them. It uses since as the
// marker, returning only messages that are newer than the marker.
func (s *Server) sendOldMessages(topics []*topic, since model.SinceMarker, scheduled bool, v *visitor, sub subscriber) error {
// If limit > 0, filters are applied first, then at most limit messages are returned (the most recent ones matching the filters).
func (s *Server) sendOldMessages(topics []*topic, since model.SinceMarker, scheduled bool, limit int, filters *queryFilter, v *visitor, sub subscriber) error {
if since.IsNone() {
return nil
}
@ -1838,6 +1844,23 @@ func (s *Server) sendOldMessages(topics []*topic, since model.SinceMarker, sched
sort.Slice(messages, func(i, j int) bool {
return messages[i].Time < messages[j].Time
})
if limit > 0 {
filtered := make([]*model.Message, 0, len(messages))
for _, m := range messages {
if filters.Pass(m) {
filtered = append(filtered, m)
}
}
if len(filtered) > limit {
filtered = filtered[len(filtered)-limit:]
}
for _, m := range filtered {
if err := sub(v, m); err != nil {
return err
}
}
return nil
}
for _, m := range messages {
if err := sub(v, m); err != nil {
return err