This commit is contained in:
binwiederhier 2026-03-17 16:25:45 -04:00
parent a47d692cbf
commit cffa57950a
3 changed files with 23 additions and 6 deletions

View file

@ -19,6 +19,12 @@ import (
"strconv"
"strings"
"time"
"heckel.io/ntfy/v2/log"
)
const (
tagS3Client = "s3_client"
)
// Client is a minimal S3-compatible client. It supports PutObject, GetObject, DeleteObjects,
@ -60,11 +66,13 @@ func (c *Client) PutObject(ctx context.Context, key string, body io.Reader) erro
first := make([]byte, partSize)
n, err := io.ReadFull(body, first)
if errors.Is(err, io.ErrUnexpectedEOF) || err == io.EOF {
log.Tag(tagS3Client).Debug("PutObject key=%s size=%d (simple)", key, n)
return c.putObject(ctx, key, bytes.NewReader(first[:n]), int64(n))
}
if err != nil {
return fmt.Errorf("s3: PutObject read: %w", err)
}
log.Tag(tagS3Client).Debug("PutObject key=%s (multipart)", key)
combined := io.MultiReader(bytes.NewReader(first), body)
return c.putObjectMultipart(ctx, key, combined)
}
@ -72,6 +80,7 @@ func (c *Client) PutObject(ctx context.Context, key string, body io.Reader) erro
// GetObject downloads an object. The key is automatically prefixed with the client's configured
// prefix. The caller must close the returned ReadCloser.
func (c *Client) GetObject(ctx context.Context, key string) (io.ReadCloser, int64, error) {
log.Tag(tagS3Client).Debug("GetObject key=%s", key)
fullKey := c.objectKey(key)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.objectURL(fullKey), nil)
if err != nil {
@ -97,6 +106,7 @@ func (c *Client) GetObject(ctx context.Context, key string) (io.ReadCloser, int6
// Even when S3 returns HTTP 200, individual keys may fail. If any per-key errors are present
// in the response, they are returned as a combined error.
func (c *Client) DeleteObjects(ctx context.Context, keys []string) error {
log.Tag(tagS3Client).Debug("DeleteObjects keys=%d", len(keys))
var body bytes.Buffer
body.WriteString("<Delete><Quiet>true</Quiet>")
for _, key := range keys {
@ -152,6 +162,7 @@ func (c *Client) DeleteObjects(ctx context.Context, keys []string) error {
// ListObjects performs a single ListObjectsV2 request using the client's configured prefix.
// Use continuationToken for pagination. Set maxKeys to 0 for the server default (typically 1000).
func (c *Client) ListObjects(ctx context.Context, continuationToken string, maxKeys int) (*ListResult, error) {
log.Tag(tagS3Client).Debug("ListObjects continuation=%s maxKeys=%d", continuationToken, maxKeys)
query := url.Values{"list-type": {"2"}}
if prefix := c.prefixForList(); prefix != "" {
query.Set("prefix", prefix)
@ -197,7 +208,6 @@ func (c *Client) ListObjects(ctx context.Context, continuationToken string, maxK
// ListAllObjects returns all objects under the client's configured prefix by paginating through
// ListObjectsV2 results automatically. It stops after 10,000 pages as a safety valve.
func (c *Client) ListAllObjects(ctx context.Context) ([]Object, error) {
const maxPages = 10000
var all []Object
var token string
for page := 0; page < maxPages; page++ {
@ -299,11 +309,13 @@ func (c *Client) initiateMultipartUpload(ctx context.Context, fullKey string) (s
if err := xml.Unmarshal(respBody, &result); err != nil {
return "", fmt.Errorf("s3: InitiateMultipartUpload XML: %w", err)
}
log.Tag(tagS3Client).Debug("InitiateMultipartUpload key=%s uploadId=%s", fullKey, result.UploadID)
return result.UploadID, nil
}
// uploadPart uploads a single part of a multipart upload and returns the ETag.
func (c *Client) uploadPart(ctx context.Context, fullKey, uploadID string, partNumber int, data []byte) (string, error) {
log.Tag(tagS3Client).Debug("UploadPart key=%s part=%d size=%d", fullKey, partNumber, len(data))
reqURL := fmt.Sprintf("%s?partNumber=%d&uploadId=%s", c.objectURL(fullKey), partNumber, url.QueryEscape(uploadID))
req, err := http.NewRequestWithContext(ctx, http.MethodPut, reqURL, bytes.NewReader(data))
if err != nil {
@ -325,6 +337,7 @@ func (c *Client) uploadPart(ctx context.Context, fullKey, uploadID string, partN
// completeMultipartUpload finalizes a multipart upload with the given parts.
func (c *Client) completeMultipartUpload(ctx context.Context, fullKey, uploadID string, parts []completedPart) error {
log.Tag(tagS3Client).Debug("CompleteMultipartUpload key=%s uploadId=%s parts=%d", fullKey, uploadID, len(parts))
var body bytes.Buffer
body.WriteString("<CompleteMultipartUpload>")
for _, p := range parts {
@ -366,6 +379,7 @@ func (c *Client) completeMultipartUpload(ctx context.Context, fullKey, uploadID
// abortMultipartUpload cancels an in-progress multipart upload. Called on error to clean up.
func (c *Client) abortMultipartUpload(ctx context.Context, fullKey, uploadID string) {
log.Tag(tagS3Client).Debug("AbortMultipartUpload key=%s uploadId=%s", fullKey, uploadID)
reqURL := fmt.Sprintf("%s?uploadId=%s", c.objectURL(fullKey), url.QueryEscape(uploadID))
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, reqURL, nil)
if err != nil {

View file

@ -27,6 +27,9 @@ const (
// above which PutObject switches from a simple PUT to multipart upload. S3 requires a minimum
// part size of 5 MB for all parts except the last.
partSize = 5 * 1024 * 1024
// maxPages is the max number of pages to iterate through when listing objects
maxPages = 10000
)
// ParseURL parses an S3 URL of the form: