mirror of
https://github.com/binwiederhier/ntfy.git
synced 2026-05-15 07:35:49 -06:00
S3 WIP
This commit is contained in:
parent
4487299a80
commit
790ba243c7
10 changed files with 1917 additions and 226 deletions
325
s3/client.go
Normal file
325
s3/client.go
Normal file
|
|
@ -0,0 +1,325 @@
|
|||
// Package s3 provides a minimal S3-compatible client that works with AWS S3, DigitalOcean Spaces,
|
||||
// GCP Cloud Storage, MinIO, Backblaze B2, and other S3-compatible providers. It uses raw HTTP
|
||||
// requests with AWS Signature V4 signing, no AWS SDK dependency required.
|
||||
package s3
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/md5" //nolint:gosec // MD5 is required by the S3 protocol for Content-MD5 headers
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"encoding/xml"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Client is a minimal S3-compatible client. It supports PutObject, GetObject, DeleteObjects,
|
||||
// and ListObjectsV2 operations using AWS Signature V4 signing. The bucket and optional key prefix
|
||||
// are fixed at construction time. All operations target the same bucket and prefix.
|
||||
//
|
||||
// Fields must not be modified after the Client is passed to any method or goroutine.
|
||||
type Client struct {
|
||||
AccessKey string // AWS access key ID
|
||||
SecretKey string // AWS secret access key
|
||||
Region string // e.g. "us-east-1"
|
||||
Endpoint string // host[:port] only, e.g. "s3.amazonaws.com" or "nyc3.digitaloceanspaces.com"
|
||||
Bucket string // S3 bucket name
|
||||
Prefix string // optional key prefix (e.g. "attachments"); prepended to all keys automatically
|
||||
PathStyle bool // if true, use path-style addressing; otherwise virtual-hosted-style
|
||||
HTTPClient *http.Client // if nil, http.DefaultClient is used
|
||||
}
|
||||
|
||||
// New creates a new S3 client from the given Config.
|
||||
func New(config *Config) *Client {
|
||||
return &Client{
|
||||
AccessKey: config.AccessKey,
|
||||
SecretKey: config.SecretKey,
|
||||
Region: config.Region,
|
||||
Endpoint: config.Endpoint,
|
||||
Bucket: config.Bucket,
|
||||
Prefix: config.Prefix,
|
||||
PathStyle: config.PathStyle,
|
||||
}
|
||||
}
|
||||
|
||||
// PutObject uploads body to the given key. The key is automatically prefixed with the client's
|
||||
// configured prefix. The body size must be known in advance. The payload is sent as
|
||||
// UNSIGNED-PAYLOAD, which is supported by all major S3-compatible providers over HTTPS.
|
||||
func (c *Client) PutObject(ctx context.Context, key string, body io.Reader, size int64) error {
|
||||
fullKey := c.objectKey(key)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPut, c.objectURL(fullKey), body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("s3: PutObject request: %w", err)
|
||||
}
|
||||
req.ContentLength = size
|
||||
c.signV4(req, unsignedPayload)
|
||||
resp, err := c.httpClient().Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("s3: PutObject: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode/100 != 2 {
|
||||
return parseError(resp)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetObject downloads an object. The key is automatically prefixed with the client's configured
|
||||
// prefix. The caller must close the returned ReadCloser.
|
||||
func (c *Client) GetObject(ctx context.Context, key string) (io.ReadCloser, int64, error) {
|
||||
fullKey := c.objectKey(key)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.objectURL(fullKey), nil)
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("s3: GetObject request: %w", err)
|
||||
}
|
||||
c.signV4(req, emptyPayloadHash)
|
||||
resp, err := c.httpClient().Do(req)
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("s3: GetObject: %w", err)
|
||||
}
|
||||
if resp.StatusCode/100 != 2 {
|
||||
err := parseError(resp)
|
||||
resp.Body.Close()
|
||||
return nil, 0, err
|
||||
}
|
||||
return resp.Body, resp.ContentLength, nil
|
||||
}
|
||||
|
||||
// DeleteObjects removes multiple objects in a single batch request. Keys are automatically
|
||||
// prefixed with the client's configured prefix. S3 supports up to 1000 keys per call; the
|
||||
// caller is responsible for batching if needed.
|
||||
//
|
||||
// Even when S3 returns HTTP 200, individual keys may fail. If any per-key errors are present
|
||||
// in the response, they are returned as a combined error.
|
||||
func (c *Client) DeleteObjects(ctx context.Context, keys []string) error {
|
||||
var body bytes.Buffer
|
||||
body.WriteString("<Delete><Quiet>true</Quiet>")
|
||||
for _, key := range keys {
|
||||
body.WriteString("<Object><Key>")
|
||||
xml.EscapeText(&body, []byte(c.objectKey(key)))
|
||||
body.WriteString("</Key></Object>")
|
||||
}
|
||||
body.WriteString("</Delete>")
|
||||
bodyBytes := body.Bytes()
|
||||
payloadHash := sha256Hex(bodyBytes)
|
||||
|
||||
// Content-MD5 is required by the S3 protocol for DeleteObjects requests.
|
||||
md5Sum := md5.Sum(bodyBytes) //nolint:gosec
|
||||
contentMD5 := base64.StdEncoding.EncodeToString(md5Sum[:])
|
||||
|
||||
reqURL := c.bucketURL() + "?delete="
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, reqURL, bytes.NewReader(bodyBytes))
|
||||
if err != nil {
|
||||
return fmt.Errorf("s3: DeleteObjects request: %w", err)
|
||||
}
|
||||
req.ContentLength = int64(len(bodyBytes))
|
||||
req.Header.Set("Content-Type", "application/xml")
|
||||
req.Header.Set("Content-MD5", contentMD5)
|
||||
c.signV4(req, payloadHash)
|
||||
resp, err := c.httpClient().Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("s3: DeleteObjects: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode/100 != 2 {
|
||||
return parseError(resp)
|
||||
}
|
||||
|
||||
// S3 may return HTTP 200 with per-key errors in the response body
|
||||
respBody, err := io.ReadAll(io.LimitReader(resp.Body, maxResponseBytes))
|
||||
if err != nil {
|
||||
return fmt.Errorf("s3: DeleteObjects read response: %w", err)
|
||||
}
|
||||
var result deleteResult
|
||||
if err := xml.Unmarshal(respBody, &result); err != nil {
|
||||
return nil // If we can't parse, assume success (Quiet mode returns empty body on success)
|
||||
}
|
||||
if len(result.Errors) > 0 {
|
||||
var msgs []string
|
||||
for _, e := range result.Errors {
|
||||
msgs = append(msgs, fmt.Sprintf("%s: %s", e.Key, e.Message))
|
||||
}
|
||||
return fmt.Errorf("s3: DeleteObjects partial failure: %s", strings.Join(msgs, "; "))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListObjects performs a single ListObjectsV2 request using the client's configured prefix.
|
||||
// Use continuationToken for pagination. Set maxKeys to 0 for the server default (typically 1000).
|
||||
func (c *Client) ListObjects(ctx context.Context, continuationToken string, maxKeys int) (*ListResult, error) {
|
||||
query := url.Values{"list-type": {"2"}}
|
||||
if prefix := c.prefixForList(); prefix != "" {
|
||||
query.Set("prefix", prefix)
|
||||
}
|
||||
if continuationToken != "" {
|
||||
query.Set("continuation-token", continuationToken)
|
||||
}
|
||||
if maxKeys > 0 {
|
||||
query.Set("max-keys", strconv.Itoa(maxKeys))
|
||||
}
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.bucketURL()+"?"+query.Encode(), nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("s3: ListObjects request: %w", err)
|
||||
}
|
||||
c.signV4(req, emptyPayloadHash)
|
||||
resp, err := c.httpClient().Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("s3: ListObjects: %w", err)
|
||||
}
|
||||
respBody, err := io.ReadAll(io.LimitReader(resp.Body, maxResponseBytes))
|
||||
resp.Body.Close()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("s3: ListObjects read: %w", err)
|
||||
}
|
||||
if resp.StatusCode/100 != 2 {
|
||||
return nil, parseErrorFromBytes(resp.StatusCode, respBody)
|
||||
}
|
||||
var result listObjectsV2Response
|
||||
if err := xml.Unmarshal(respBody, &result); err != nil {
|
||||
return nil, fmt.Errorf("s3: ListObjects XML: %w", err)
|
||||
}
|
||||
objects := make([]Object, len(result.Contents))
|
||||
for i, obj := range result.Contents {
|
||||
objects[i] = Object(obj)
|
||||
}
|
||||
return &ListResult{
|
||||
Objects: objects,
|
||||
IsTruncated: result.IsTruncated,
|
||||
NextContinuationToken: result.NextContinuationToken,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ListAllObjects returns all objects under the client's configured prefix by paginating through
|
||||
// ListObjectsV2 results automatically. It stops after 10,000 pages as a safety valve.
|
||||
func (c *Client) ListAllObjects(ctx context.Context) ([]Object, error) {
|
||||
const maxPages = 10000
|
||||
var all []Object
|
||||
var token string
|
||||
for page := 0; page < maxPages; page++ {
|
||||
result, err := c.ListObjects(ctx, token, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
all = append(all, result.Objects...)
|
||||
if !result.IsTruncated {
|
||||
return all, nil
|
||||
}
|
||||
token = result.NextContinuationToken
|
||||
}
|
||||
return nil, fmt.Errorf("s3: ListAllObjects exceeded %d pages", maxPages)
|
||||
}
|
||||
|
||||
// signV4 signs req in place using AWS Signature V4. payloadHash is the hex-encoded SHA-256
|
||||
// of the request body, or the literal string "UNSIGNED-PAYLOAD" for streaming uploads.
|
||||
func (c *Client) signV4(req *http.Request, payloadHash string) {
|
||||
now := time.Now().UTC()
|
||||
datestamp := now.Format("20060102")
|
||||
amzDate := now.Format("20060102T150405Z")
|
||||
|
||||
// Required headers
|
||||
req.Header.Set("Host", c.hostHeader())
|
||||
req.Header.Set("X-Amz-Date", amzDate)
|
||||
req.Header.Set("X-Amz-Content-Sha256", payloadHash)
|
||||
|
||||
// Canonical headers (all headers we set, sorted by lowercase key)
|
||||
signedKeys := make([]string, 0, len(req.Header))
|
||||
canonHeaders := make(map[string]string, len(req.Header))
|
||||
for k := range req.Header {
|
||||
lk := strings.ToLower(k)
|
||||
signedKeys = append(signedKeys, lk)
|
||||
canonHeaders[lk] = strings.TrimSpace(req.Header.Get(k))
|
||||
}
|
||||
sort.Strings(signedKeys)
|
||||
signedHeadersStr := strings.Join(signedKeys, ";")
|
||||
var chBuf strings.Builder
|
||||
for _, k := range signedKeys {
|
||||
chBuf.WriteString(k)
|
||||
chBuf.WriteByte(':')
|
||||
chBuf.WriteString(canonHeaders[k])
|
||||
chBuf.WriteByte('\n')
|
||||
}
|
||||
|
||||
// Canonical request
|
||||
canonicalRequest := strings.Join([]string{
|
||||
req.Method,
|
||||
canonicalURI(req.URL),
|
||||
canonicalQueryString(req.URL.Query()),
|
||||
chBuf.String(),
|
||||
signedHeadersStr,
|
||||
payloadHash,
|
||||
}, "\n")
|
||||
|
||||
// String to sign
|
||||
credentialScope := datestamp + "/" + c.Region + "/s3/aws4_request"
|
||||
stringToSign := "AWS4-HMAC-SHA256\n" + amzDate + "\n" + credentialScope + "\n" + sha256Hex([]byte(canonicalRequest))
|
||||
|
||||
// Signing key
|
||||
signingKey := hmacSHA256(hmacSHA256(hmacSHA256(hmacSHA256(
|
||||
[]byte("AWS4"+c.SecretKey), []byte(datestamp)),
|
||||
[]byte(c.Region)),
|
||||
[]byte("s3")),
|
||||
[]byte("aws4_request"))
|
||||
|
||||
signature := hex.EncodeToString(hmacSHA256(signingKey, []byte(stringToSign)))
|
||||
req.Header.Set("Authorization", fmt.Sprintf(
|
||||
"AWS4-HMAC-SHA256 Credential=%s/%s, SignedHeaders=%s, Signature=%s",
|
||||
c.AccessKey, credentialScope, signedHeadersStr, signature,
|
||||
))
|
||||
}
|
||||
|
||||
func (c *Client) httpClient() *http.Client {
|
||||
if c.HTTPClient != nil {
|
||||
return c.HTTPClient
|
||||
}
|
||||
return http.DefaultClient
|
||||
}
|
||||
|
||||
// objectKey prepends the configured prefix to the given key.
|
||||
func (c *Client) objectKey(key string) string {
|
||||
if c.Prefix != "" {
|
||||
return c.Prefix + "/" + key
|
||||
}
|
||||
return key
|
||||
}
|
||||
|
||||
// prefixForList returns the prefix to use in ListObjectsV2 requests,
|
||||
// with a trailing slash so that only objects under the prefix directory are returned.
|
||||
func (c *Client) prefixForList() string {
|
||||
if c.Prefix != "" {
|
||||
return c.Prefix + "/"
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// bucketURL returns the base URL for bucket-level operations.
|
||||
func (c *Client) bucketURL() string {
|
||||
if c.PathStyle {
|
||||
return fmt.Sprintf("https://%s/%s", c.Endpoint, c.Bucket)
|
||||
}
|
||||
return fmt.Sprintf("https://%s.%s", c.Bucket, c.Endpoint)
|
||||
}
|
||||
|
||||
// objectURL returns the full URL for an object (key should already include the prefix).
|
||||
// Each path segment is URI-encoded to handle special characters in keys.
|
||||
func (c *Client) objectURL(key string) string {
|
||||
segments := strings.Split(key, "/")
|
||||
for i, seg := range segments {
|
||||
segments[i] = uriEncode(seg)
|
||||
}
|
||||
return c.bucketURL() + "/" + strings.Join(segments, "/")
|
||||
}
|
||||
|
||||
// hostHeader returns the value for the Host header.
|
||||
func (c *Client) hostHeader() string {
|
||||
if c.PathStyle {
|
||||
return c.Endpoint
|
||||
}
|
||||
return c.Bucket + "." + c.Endpoint
|
||||
}
|
||||
727
s3/client_test.go
Normal file
727
s3/client_test.go
Normal file
|
|
@ -0,0 +1,727 @@
|
|||
package s3
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/xml"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// --- Mock S3 server ---
|
||||
//
|
||||
// A minimal S3-compatible HTTP server that supports PutObject, GetObject, DeleteObjects, and
|
||||
// ListObjectsV2. Uses path-style addressing: /{bucket}/{key}. Objects are stored in memory.
|
||||
|
||||
type mockS3Server struct {
|
||||
objects map[string][]byte // full key (bucket/key) -> body
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
func newMockS3Server() (*httptest.Server, *mockS3Server) {
|
||||
m := &mockS3Server{objects: make(map[string][]byte)}
|
||||
return httptest.NewTLSServer(m), m
|
||||
}
|
||||
|
||||
func (m *mockS3Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
// Path is /{bucket}[/{key...}]
|
||||
path := strings.TrimPrefix(r.URL.Path, "/")
|
||||
|
||||
switch {
|
||||
case r.Method == http.MethodPut:
|
||||
m.handlePut(w, r, path)
|
||||
case r.Method == http.MethodGet && r.URL.Query().Get("list-type") == "2":
|
||||
m.handleList(w, r, path)
|
||||
case r.Method == http.MethodGet:
|
||||
m.handleGet(w, r, path)
|
||||
case r.Method == http.MethodPost && r.URL.Query().Has("delete"):
|
||||
m.handleDelete(w, r, path)
|
||||
default:
|
||||
http.Error(w, "not implemented", http.StatusNotImplemented)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockS3Server) handlePut(w http.ResponseWriter, r *http.Request, path string) {
|
||||
body, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
m.mu.Lock()
|
||||
m.objects[path] = body
|
||||
m.mu.Unlock()
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
func (m *mockS3Server) handleGet(w http.ResponseWriter, r *http.Request, path string) {
|
||||
m.mu.RLock()
|
||||
body, ok := m.objects[path]
|
||||
m.mu.RUnlock()
|
||||
if !ok {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
w.Write([]byte(`<?xml version="1.0" encoding="UTF-8"?><Error><Code>NoSuchKey</Code><Message>The specified key does not exist.</Message></Error>`))
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Length", fmt.Sprintf("%d", len(body)))
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write(body)
|
||||
}
|
||||
|
||||
type listObjectsResponse struct {
|
||||
XMLName xml.Name `xml:"ListBucketResult"`
|
||||
Contents []listObject `xml:"Contents"`
|
||||
// Pagination support
|
||||
IsTruncated bool `xml:"IsTruncated"`
|
||||
NextContinuationToken string `xml:"NextContinuationToken"`
|
||||
}
|
||||
|
||||
func (m *mockS3Server) handleDelete(w http.ResponseWriter, r *http.Request, bucketPath string) {
|
||||
// bucketPath is just the bucket name
|
||||
body, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
var req struct {
|
||||
Objects []struct {
|
||||
Key string `xml:"Key"`
|
||||
} `xml:"Object"`
|
||||
}
|
||||
if err := xml.Unmarshal(body, &req); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
m.mu.Lock()
|
||||
for _, obj := range req.Objects {
|
||||
delete(m.objects, bucketPath+"/"+obj.Key)
|
||||
}
|
||||
m.mu.Unlock()
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write([]byte(`<?xml version="1.0" encoding="UTF-8"?><DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/"></DeleteResult>`))
|
||||
}
|
||||
|
||||
func (m *mockS3Server) handleList(w http.ResponseWriter, r *http.Request, bucketPath string) {
|
||||
prefix := r.URL.Query().Get("prefix")
|
||||
contToken := r.URL.Query().Get("continuation-token")
|
||||
|
||||
m.mu.RLock()
|
||||
var allKeys []string
|
||||
for key := range m.objects {
|
||||
objKey := strings.TrimPrefix(key, bucketPath+"/")
|
||||
if objKey == key {
|
||||
continue // different bucket
|
||||
}
|
||||
if prefix == "" || strings.HasPrefix(objKey, prefix) {
|
||||
allKeys = append(allKeys, objKey)
|
||||
}
|
||||
}
|
||||
m.mu.RUnlock()
|
||||
sort.Strings(allKeys)
|
||||
|
||||
// Simple continuation token: it's the key to start after
|
||||
startIdx := 0
|
||||
if contToken != "" {
|
||||
for i, k := range allKeys {
|
||||
if k == contToken {
|
||||
startIdx = i + 1
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
maxKeys := 1000
|
||||
if mk := r.URL.Query().Get("max-keys"); mk != "" {
|
||||
fmt.Sscanf(mk, "%d", &maxKeys)
|
||||
}
|
||||
|
||||
endIdx := startIdx + maxKeys
|
||||
truncated := false
|
||||
nextToken := ""
|
||||
if endIdx < len(allKeys) {
|
||||
truncated = true
|
||||
nextToken = allKeys[endIdx-1]
|
||||
allKeys = allKeys[startIdx:endIdx]
|
||||
} else {
|
||||
allKeys = allKeys[startIdx:]
|
||||
}
|
||||
|
||||
m.mu.RLock()
|
||||
var contents []listObject
|
||||
for _, objKey := range allKeys {
|
||||
body := m.objects[bucketPath+"/"+objKey]
|
||||
contents = append(contents, listObject{Key: objKey, Size: int64(len(body))})
|
||||
}
|
||||
m.mu.RUnlock()
|
||||
|
||||
resp := listObjectsResponse{
|
||||
Contents: contents,
|
||||
IsTruncated: truncated,
|
||||
NextContinuationToken: nextToken,
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/xml")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
xml.NewEncoder(w).Encode(resp)
|
||||
}
|
||||
|
||||
func (m *mockS3Server) objectCount() int {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
return len(m.objects)
|
||||
}
|
||||
|
||||
// --- Helper to create a test client pointing at mock server ---
|
||||
|
||||
func newTestClient(server *httptest.Server, bucket, prefix string) *Client {
|
||||
// httptest.NewTLSServer URL is like "https://127.0.0.1:PORT"
|
||||
host := strings.TrimPrefix(server.URL, "https://")
|
||||
return &Client{
|
||||
AccessKey: "AKID",
|
||||
SecretKey: "SECRET",
|
||||
Region: "us-east-1",
|
||||
Endpoint: host,
|
||||
Bucket: bucket,
|
||||
Prefix: prefix,
|
||||
PathStyle: true,
|
||||
HTTPClient: server.Client(),
|
||||
}
|
||||
}
|
||||
|
||||
// --- URL parsing tests ---
|
||||
|
||||
func TestParseURL_Success(t *testing.T) {
|
||||
cfg, err := ParseURL("s3://AKID:SECRET@my-bucket/attachments?region=us-east-1")
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, "my-bucket", cfg.Bucket)
|
||||
require.Equal(t, "attachments", cfg.Prefix)
|
||||
require.Equal(t, "us-east-1", cfg.Region)
|
||||
require.Equal(t, "AKID", cfg.AccessKey)
|
||||
require.Equal(t, "SECRET", cfg.SecretKey)
|
||||
require.Equal(t, "s3.us-east-1.amazonaws.com", cfg.Endpoint)
|
||||
require.False(t, cfg.PathStyle)
|
||||
}
|
||||
|
||||
func TestParseURL_NoPrefix(t *testing.T) {
|
||||
cfg, err := ParseURL("s3://AKID:SECRET@my-bucket?region=us-east-1")
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, "my-bucket", cfg.Bucket)
|
||||
require.Equal(t, "", cfg.Prefix)
|
||||
}
|
||||
|
||||
func TestParseURL_WithEndpoint(t *testing.T) {
|
||||
cfg, err := ParseURL("s3://AKID:SECRET@my-bucket/prefix?region=us-east-1&endpoint=https://s3.example.com")
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, "my-bucket", cfg.Bucket)
|
||||
require.Equal(t, "prefix", cfg.Prefix)
|
||||
require.Equal(t, "s3.example.com", cfg.Endpoint)
|
||||
require.True(t, cfg.PathStyle)
|
||||
}
|
||||
|
||||
func TestParseURL_EndpointHTTP(t *testing.T) {
|
||||
cfg, err := ParseURL("s3://AKID:SECRET@my-bucket?region=us-east-1&endpoint=http://localhost:9000")
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, "localhost:9000", cfg.Endpoint)
|
||||
require.True(t, cfg.PathStyle)
|
||||
}
|
||||
|
||||
func TestParseURL_EndpointTrailingSlash(t *testing.T) {
|
||||
cfg, err := ParseURL("s3://AKID:SECRET@my-bucket?region=us-east-1&endpoint=https://s3.example.com/")
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, "s3.example.com", cfg.Endpoint)
|
||||
}
|
||||
|
||||
func TestParseURL_NestedPrefix(t *testing.T) {
|
||||
cfg, err := ParseURL("s3://AKID:SECRET@my-bucket/a/b/c?region=us-east-1")
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, "my-bucket", cfg.Bucket)
|
||||
require.Equal(t, "a/b/c", cfg.Prefix)
|
||||
}
|
||||
|
||||
func TestParseURL_MissingRegion(t *testing.T) {
|
||||
_, err := ParseURL("s3://AKID:SECRET@my-bucket")
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "region")
|
||||
}
|
||||
|
||||
func TestParseURL_MissingCredentials(t *testing.T) {
|
||||
_, err := ParseURL("s3://my-bucket?region=us-east-1")
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "access key")
|
||||
}
|
||||
|
||||
func TestParseURL_MissingSecretKey(t *testing.T) {
|
||||
_, err := ParseURL("s3://AKID@my-bucket?region=us-east-1")
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "secret key")
|
||||
}
|
||||
|
||||
func TestParseURL_WrongScheme(t *testing.T) {
|
||||
_, err := ParseURL("http://AKID:SECRET@my-bucket?region=us-east-1")
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "scheme")
|
||||
}
|
||||
|
||||
func TestParseURL_EmptyBucket(t *testing.T) {
|
||||
_, err := ParseURL("s3://AKID:SECRET@?region=us-east-1")
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "bucket")
|
||||
}
|
||||
|
||||
// --- Unit tests: URL construction ---
|
||||
|
||||
func TestClient_BucketURL_PathStyle(t *testing.T) {
|
||||
c := &Client{Endpoint: "s3.example.com", Bucket: "my-bucket", PathStyle: true}
|
||||
require.Equal(t, "https://s3.example.com/my-bucket", c.bucketURL())
|
||||
}
|
||||
|
||||
func TestClient_BucketURL_VirtualHosted(t *testing.T) {
|
||||
c := &Client{Endpoint: "s3.us-east-1.amazonaws.com", Bucket: "my-bucket", PathStyle: false}
|
||||
require.Equal(t, "https://my-bucket.s3.us-east-1.amazonaws.com", c.bucketURL())
|
||||
}
|
||||
|
||||
func TestClient_ObjectURL_PathStyle(t *testing.T) {
|
||||
c := &Client{Endpoint: "s3.example.com", Bucket: "my-bucket", PathStyle: true}
|
||||
require.Equal(t, "https://s3.example.com/my-bucket/prefix/obj", c.objectURL("prefix/obj"))
|
||||
}
|
||||
|
||||
func TestClient_ObjectURL_VirtualHosted(t *testing.T) {
|
||||
c := &Client{Endpoint: "s3.us-east-1.amazonaws.com", Bucket: "my-bucket", PathStyle: false}
|
||||
require.Equal(t, "https://my-bucket.s3.us-east-1.amazonaws.com/prefix/obj", c.objectURL("prefix/obj"))
|
||||
}
|
||||
|
||||
func TestClient_HostHeader_PathStyle(t *testing.T) {
|
||||
c := &Client{Endpoint: "s3.example.com", Bucket: "my-bucket", PathStyle: true}
|
||||
require.Equal(t, "s3.example.com", c.hostHeader())
|
||||
}
|
||||
|
||||
func TestClient_HostHeader_VirtualHosted(t *testing.T) {
|
||||
c := &Client{Endpoint: "s3.us-east-1.amazonaws.com", Bucket: "my-bucket", PathStyle: false}
|
||||
require.Equal(t, "my-bucket.s3.us-east-1.amazonaws.com", c.hostHeader())
|
||||
}
|
||||
|
||||
func TestClient_ObjectKey(t *testing.T) {
|
||||
c := &Client{Prefix: "attachments"}
|
||||
require.Equal(t, "attachments/file123", c.objectKey("file123"))
|
||||
|
||||
c2 := &Client{Prefix: ""}
|
||||
require.Equal(t, "file123", c2.objectKey("file123"))
|
||||
}
|
||||
|
||||
func TestClient_PrefixForList(t *testing.T) {
|
||||
c := &Client{Prefix: "attachments"}
|
||||
require.Equal(t, "attachments/", c.prefixForList())
|
||||
|
||||
c2 := &Client{Prefix: ""}
|
||||
require.Equal(t, "", c2.prefixForList())
|
||||
}
|
||||
|
||||
// --- Integration tests using mock S3 server ---
|
||||
|
||||
func TestClient_PutGetObject(t *testing.T) {
|
||||
server, _ := newMockS3Server()
|
||||
defer server.Close()
|
||||
client := newTestClient(server, "my-bucket", "")
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Put
|
||||
err := client.PutObject(ctx, "test-key", strings.NewReader("hello world"), 11)
|
||||
require.Nil(t, err)
|
||||
|
||||
// Get
|
||||
reader, size, err := client.GetObject(ctx, "test-key")
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, int64(11), size)
|
||||
data, err := io.ReadAll(reader)
|
||||
reader.Close()
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, "hello world", string(data))
|
||||
}
|
||||
|
||||
func TestClient_PutGetObject_WithPrefix(t *testing.T) {
|
||||
server, _ := newMockS3Server()
|
||||
defer server.Close()
|
||||
client := newTestClient(server, "my-bucket", "pfx")
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
err := client.PutObject(ctx, "test-key", strings.NewReader("hello"), 5)
|
||||
require.Nil(t, err)
|
||||
|
||||
reader, _, err := client.GetObject(ctx, "test-key")
|
||||
require.Nil(t, err)
|
||||
data, _ := io.ReadAll(reader)
|
||||
reader.Close()
|
||||
require.Equal(t, "hello", string(data))
|
||||
}
|
||||
|
||||
func TestClient_GetObject_NotFound(t *testing.T) {
|
||||
server, _ := newMockS3Server()
|
||||
defer server.Close()
|
||||
client := newTestClient(server, "my-bucket", "")
|
||||
|
||||
_, _, err := client.GetObject(context.Background(), "nonexistent")
|
||||
require.Error(t, err)
|
||||
var errResp *ErrorResponse
|
||||
require.ErrorAs(t, err, &errResp)
|
||||
require.Equal(t, 404, errResp.StatusCode)
|
||||
require.Equal(t, "NoSuchKey", errResp.Code)
|
||||
}
|
||||
|
||||
func TestClient_DeleteObjects(t *testing.T) {
|
||||
server, mock := newMockS3Server()
|
||||
defer server.Close()
|
||||
client := newTestClient(server, "my-bucket", "")
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Put several objects
|
||||
for i := 0; i < 5; i++ {
|
||||
err := client.PutObject(ctx, fmt.Sprintf("key-%d", i), bytes.NewReader([]byte("data")), 4)
|
||||
require.Nil(t, err)
|
||||
}
|
||||
require.Equal(t, 5, mock.objectCount())
|
||||
|
||||
// Delete some
|
||||
err := client.DeleteObjects(ctx, []string{"key-1", "key-3"})
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, 3, mock.objectCount())
|
||||
|
||||
// Verify deleted ones are gone
|
||||
_, _, err = client.GetObject(ctx, "key-1")
|
||||
require.Error(t, err)
|
||||
_, _, err = client.GetObject(ctx, "key-3")
|
||||
require.Error(t, err)
|
||||
|
||||
// Verify remaining ones are still there
|
||||
reader, _, err := client.GetObject(ctx, "key-0")
|
||||
require.Nil(t, err)
|
||||
reader.Close()
|
||||
}
|
||||
|
||||
func TestClient_ListObjects(t *testing.T) {
|
||||
server, _ := newMockS3Server()
|
||||
defer server.Close()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Client with prefix "pfx": list should only return objects under pfx/
|
||||
client := newTestClient(server, "my-bucket", "pfx")
|
||||
for i := 0; i < 3; i++ {
|
||||
err := client.PutObject(ctx, fmt.Sprintf("%d", i), bytes.NewReader([]byte("x")), 1)
|
||||
require.Nil(t, err)
|
||||
}
|
||||
|
||||
// Also put an object outside the prefix using a no-prefix client
|
||||
clientNoPrefix := newTestClient(server, "my-bucket", "")
|
||||
err := clientNoPrefix.PutObject(ctx, "other", bytes.NewReader([]byte("y")), 1)
|
||||
require.Nil(t, err)
|
||||
|
||||
// List with prefix client: should only see 3
|
||||
result, err := client.ListObjects(ctx, "", 0)
|
||||
require.Nil(t, err)
|
||||
require.Len(t, result.Objects, 3)
|
||||
require.False(t, result.IsTruncated)
|
||||
|
||||
// List with no-prefix client: should see all 4
|
||||
result, err = clientNoPrefix.ListObjects(ctx, "", 0)
|
||||
require.Nil(t, err)
|
||||
require.Len(t, result.Objects, 4)
|
||||
}
|
||||
|
||||
func TestClient_ListObjects_Pagination(t *testing.T) {
|
||||
server, _ := newMockS3Server()
|
||||
defer server.Close()
|
||||
client := newTestClient(server, "my-bucket", "")
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Put 5 objects
|
||||
for i := 0; i < 5; i++ {
|
||||
err := client.PutObject(ctx, fmt.Sprintf("key-%02d", i), bytes.NewReader([]byte("x")), 1)
|
||||
require.Nil(t, err)
|
||||
}
|
||||
|
||||
// List with max-keys=2
|
||||
result, err := client.ListObjects(ctx, "", 2)
|
||||
require.Nil(t, err)
|
||||
require.Len(t, result.Objects, 2)
|
||||
require.True(t, result.IsTruncated)
|
||||
require.NotEmpty(t, result.NextContinuationToken)
|
||||
|
||||
// Get next page
|
||||
result2, err := client.ListObjects(ctx, result.NextContinuationToken, 2)
|
||||
require.Nil(t, err)
|
||||
require.Len(t, result2.Objects, 2)
|
||||
require.True(t, result2.IsTruncated)
|
||||
|
||||
// Get last page
|
||||
result3, err := client.ListObjects(ctx, result2.NextContinuationToken, 2)
|
||||
require.Nil(t, err)
|
||||
require.Len(t, result3.Objects, 1)
|
||||
require.False(t, result3.IsTruncated)
|
||||
}
|
||||
|
||||
func TestClient_ListAllObjects(t *testing.T) {
|
||||
server, _ := newMockS3Server()
|
||||
defer server.Close()
|
||||
client := newTestClient(server, "my-bucket", "pfx")
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
err := client.PutObject(ctx, fmt.Sprintf("key-%02d", i), bytes.NewReader([]byte("x")), 1)
|
||||
require.Nil(t, err)
|
||||
}
|
||||
|
||||
objects, err := client.ListAllObjects(ctx)
|
||||
require.Nil(t, err)
|
||||
require.Len(t, objects, 10)
|
||||
}
|
||||
|
||||
func TestClient_PutObject_LargeBody(t *testing.T) {
|
||||
server, _ := newMockS3Server()
|
||||
defer server.Close()
|
||||
client := newTestClient(server, "my-bucket", "")
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// 1 MB object
|
||||
data := make([]byte, 1024*1024)
|
||||
for i := range data {
|
||||
data[i] = byte(i % 256)
|
||||
}
|
||||
err := client.PutObject(ctx, "large", bytes.NewReader(data), int64(len(data)))
|
||||
require.Nil(t, err)
|
||||
|
||||
reader, size, err := client.GetObject(ctx, "large")
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, int64(1024*1024), size)
|
||||
got, err := io.ReadAll(reader)
|
||||
reader.Close()
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, data, got)
|
||||
}
|
||||
|
||||
func TestClient_PutObject_NestedKey(t *testing.T) {
|
||||
server, _ := newMockS3Server()
|
||||
defer server.Close()
|
||||
client := newTestClient(server, "my-bucket", "")
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
err := client.PutObject(ctx, "deep/nested/prefix/file.txt", strings.NewReader("nested"), 6)
|
||||
require.Nil(t, err)
|
||||
|
||||
reader, _, err := client.GetObject(ctx, "deep/nested/prefix/file.txt")
|
||||
require.Nil(t, err)
|
||||
data, _ := io.ReadAll(reader)
|
||||
reader.Close()
|
||||
require.Equal(t, "nested", string(data))
|
||||
}
|
||||
|
||||
// --- Scale test: 20k objects (ntfy-adjacent) ---
|
||||
|
||||
func TestClient_ListAllObjects_20k(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping 20k object test in short mode")
|
||||
}
|
||||
|
||||
server, _ := newMockS3Server()
|
||||
defer server.Close()
|
||||
client := newTestClient(server, "my-bucket", "attachments")
|
||||
|
||||
ctx := context.Background()
|
||||
const numObjects = 20000
|
||||
const batchSize = 500
|
||||
|
||||
// Insert 20k objects in batches to keep it fast
|
||||
for batch := 0; batch < numObjects/batchSize; batch++ {
|
||||
for i := 0; i < batchSize; i++ {
|
||||
idx := batch*batchSize + i
|
||||
key := fmt.Sprintf("%08d", idx)
|
||||
err := client.PutObject(ctx, key, bytes.NewReader([]byte("x")), 1)
|
||||
require.Nil(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
// List all 20k objects with pagination
|
||||
objects, err := client.ListAllObjects(ctx)
|
||||
require.Nil(t, err)
|
||||
require.Len(t, objects, numObjects)
|
||||
|
||||
// Verify total size
|
||||
var totalSize int64
|
||||
for _, obj := range objects {
|
||||
totalSize += obj.Size
|
||||
}
|
||||
require.Equal(t, int64(numObjects), totalSize)
|
||||
|
||||
// Delete 1000 objects (simulating attachment expiry cleanup)
|
||||
keys := make([]string, 1000)
|
||||
for i := range keys {
|
||||
keys[i] = fmt.Sprintf("%08d", i)
|
||||
}
|
||||
err = client.DeleteObjects(ctx, keys)
|
||||
require.Nil(t, err)
|
||||
|
||||
// List again: should have 19000
|
||||
objects, err = client.ListAllObjects(ctx)
|
||||
require.Nil(t, err)
|
||||
require.Len(t, objects, numObjects-1000)
|
||||
}
|
||||
|
||||
// --- Real S3 integration test ---
|
||||
//
|
||||
// Set the following environment variables to run this test against a real S3 bucket:
|
||||
//
|
||||
// S3_ACCESS_KEY, S3_SECRET_KEY, S3_REGION, S3_BUCKET
|
||||
//
|
||||
// Optional:
|
||||
//
|
||||
// S3_ENDPOINT: host[:port] for S3-compatible providers (e.g. "nyc3.digitaloceanspaces.com")
|
||||
// S3_PATH_STYLE: set to "true" for path-style addressing
|
||||
// S3_PREFIX: key prefix to use (default: "ntfy-s3-test")
|
||||
func TestClient_RealBucket(t *testing.T) {
|
||||
accessKey := os.Getenv("S3_ACCESS_KEY")
|
||||
secretKey := os.Getenv("S3_SECRET_KEY")
|
||||
region := os.Getenv("S3_REGION")
|
||||
bucket := os.Getenv("S3_BUCKET")
|
||||
|
||||
if accessKey == "" || secretKey == "" || region == "" || bucket == "" {
|
||||
t.Skip("skipping real S3 test: set S3_ACCESS_KEY, S3_SECRET_KEY, S3_REGION, S3_BUCKET")
|
||||
}
|
||||
|
||||
endpoint := os.Getenv("S3_ENDPOINT")
|
||||
if endpoint == "" {
|
||||
endpoint = fmt.Sprintf("s3.%s.amazonaws.com", region)
|
||||
}
|
||||
pathStyle := os.Getenv("S3_PATH_STYLE") == "true"
|
||||
prefix := os.Getenv("S3_PREFIX")
|
||||
if prefix == "" {
|
||||
prefix = "ntfy-s3-test"
|
||||
}
|
||||
|
||||
client := &Client{
|
||||
AccessKey: accessKey,
|
||||
SecretKey: secretKey,
|
||||
Region: region,
|
||||
Endpoint: endpoint,
|
||||
Bucket: bucket,
|
||||
Prefix: prefix,
|
||||
PathStyle: pathStyle,
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Clean up any leftover objects from previous runs
|
||||
existing, err := client.ListAllObjects(ctx)
|
||||
require.Nil(t, err)
|
||||
if len(existing) > 0 {
|
||||
keys := make([]string, len(existing))
|
||||
for i, obj := range existing {
|
||||
// Strip the prefix since DeleteObjects will re-add it
|
||||
keys[i] = strings.TrimPrefix(obj.Key, prefix+"/")
|
||||
}
|
||||
// Batch delete in groups of 1000
|
||||
for i := 0; i < len(keys); i += 1000 {
|
||||
end := i + 1000
|
||||
if end > len(keys) {
|
||||
end = len(keys)
|
||||
}
|
||||
err := client.DeleteObjects(ctx, keys[i:end])
|
||||
require.Nil(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
t.Run("PutGetDelete", func(t *testing.T) {
|
||||
key := "test-object"
|
||||
content := "hello from ntfy s3 test"
|
||||
|
||||
// Put
|
||||
err := client.PutObject(ctx, key, strings.NewReader(content), int64(len(content)))
|
||||
require.Nil(t, err)
|
||||
|
||||
// Get
|
||||
reader, size, err := client.GetObject(ctx, key)
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, int64(len(content)), size)
|
||||
data, err := io.ReadAll(reader)
|
||||
reader.Close()
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, content, string(data))
|
||||
|
||||
// Delete
|
||||
err = client.DeleteObjects(ctx, []string{key})
|
||||
require.Nil(t, err)
|
||||
|
||||
// Get after delete should fail
|
||||
_, _, err = client.GetObject(ctx, key)
|
||||
require.Error(t, err)
|
||||
var errResp *ErrorResponse
|
||||
require.ErrorAs(t, err, &errResp)
|
||||
require.Equal(t, 404, errResp.StatusCode)
|
||||
})
|
||||
|
||||
t.Run("ListObjects", func(t *testing.T) {
|
||||
// Use a sub-prefix client for isolation
|
||||
listClient := &Client{
|
||||
AccessKey: accessKey,
|
||||
SecretKey: secretKey,
|
||||
Region: region,
|
||||
Endpoint: endpoint,
|
||||
Bucket: bucket,
|
||||
Prefix: prefix + "/list-test",
|
||||
PathStyle: pathStyle,
|
||||
}
|
||||
|
||||
// Put 10 objects
|
||||
for i := 0; i < 10; i++ {
|
||||
err := listClient.PutObject(ctx, fmt.Sprintf("%d", i), strings.NewReader("x"), 1)
|
||||
require.Nil(t, err)
|
||||
}
|
||||
|
||||
// List
|
||||
objects, err := listClient.ListAllObjects(ctx)
|
||||
require.Nil(t, err)
|
||||
require.Len(t, objects, 10)
|
||||
|
||||
// Clean up
|
||||
keys := make([]string, 10)
|
||||
for i := range keys {
|
||||
keys[i] = fmt.Sprintf("%d", i)
|
||||
}
|
||||
err = listClient.DeleteObjects(ctx, keys)
|
||||
require.Nil(t, err)
|
||||
})
|
||||
|
||||
t.Run("LargeObject", func(t *testing.T) {
|
||||
key := "large-object"
|
||||
data := make([]byte, 5*1024*1024) // 5 MB
|
||||
for i := range data {
|
||||
data[i] = byte(i % 256)
|
||||
}
|
||||
|
||||
err := client.PutObject(ctx, key, bytes.NewReader(data), int64(len(data)))
|
||||
require.Nil(t, err)
|
||||
|
||||
reader, size, err := client.GetObject(ctx, key)
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, int64(len(data)), size)
|
||||
got, err := io.ReadAll(reader)
|
||||
reader.Close()
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, data, got)
|
||||
|
||||
err = client.DeleteObjects(ctx, []string{key})
|
||||
require.Nil(t, err)
|
||||
})
|
||||
}
|
||||
65
s3/types.go
Normal file
65
s3/types.go
Normal file
|
|
@ -0,0 +1,65 @@
|
|||
package s3
|
||||
|
||||
import "fmt"
|
||||
|
||||
// Config holds the parsed fields from an S3 URL. Use ParseURL to create one from a URL string.
|
||||
type Config struct {
|
||||
Endpoint string // host[:port] only, e.g. "s3.us-east-1.amazonaws.com"
|
||||
PathStyle bool
|
||||
Bucket string
|
||||
Prefix string
|
||||
Region string
|
||||
AccessKey string
|
||||
SecretKey string
|
||||
}
|
||||
|
||||
// Object represents an S3 object returned by list operations.
|
||||
type Object struct {
|
||||
Key string
|
||||
Size int64
|
||||
}
|
||||
|
||||
// ListResult holds the response from a ListObjectsV2 call.
|
||||
type ListResult struct {
|
||||
Objects []Object
|
||||
IsTruncated bool
|
||||
NextContinuationToken string
|
||||
}
|
||||
|
||||
// ErrorResponse is returned when S3 responds with a non-2xx status code.
|
||||
type ErrorResponse struct {
|
||||
StatusCode int
|
||||
Code string `xml:"Code"`
|
||||
Message string `xml:"Message"`
|
||||
Body string `xml:"-"` // raw response body
|
||||
}
|
||||
|
||||
func (e *ErrorResponse) Error() string {
|
||||
if e.Code != "" {
|
||||
return fmt.Sprintf("s3: %s (HTTP %d): %s", e.Code, e.StatusCode, e.Message)
|
||||
}
|
||||
return fmt.Sprintf("s3: HTTP %d: %s", e.StatusCode, e.Body)
|
||||
}
|
||||
|
||||
// listObjectsV2Response is the XML response from S3 ListObjectsV2
|
||||
type listObjectsV2Response struct {
|
||||
Contents []listObject `xml:"Contents"`
|
||||
IsTruncated bool `xml:"IsTruncated"`
|
||||
NextContinuationToken string `xml:"NextContinuationToken"`
|
||||
}
|
||||
|
||||
type listObject struct {
|
||||
Key string `xml:"Key"`
|
||||
Size int64 `xml:"Size"`
|
||||
}
|
||||
|
||||
// deleteResult is the XML response from S3 DeleteObjects
|
||||
type deleteResult struct {
|
||||
Errors []deleteError `xml:"Error"`
|
||||
}
|
||||
|
||||
type deleteError struct {
|
||||
Key string `xml:"Key"`
|
||||
Code string `xml:"Code"`
|
||||
Message string `xml:"Message"`
|
||||
}
|
||||
161
s3/util.go
Normal file
161
s3/util.go
Normal file
|
|
@ -0,0 +1,161 @@
|
|||
package s3
|
||||
|
||||
import (
|
||||
"crypto/hmac"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/xml"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"sort"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const (
|
||||
// SHA-256 hash of the empty string, used as the payload hash for bodiless requests
|
||||
emptyPayloadHash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
|
||||
|
||||
// Sent as the payload hash for streaming uploads where the body is not buffered in memory
|
||||
unsignedPayload = "UNSIGNED-PAYLOAD"
|
||||
|
||||
// maxResponseBytes caps the size of S3 response bodies we read into memory (10 MB)
|
||||
maxResponseBytes = 10 * 1024 * 1024
|
||||
)
|
||||
|
||||
// ParseURL parses an S3 URL of the form:
|
||||
//
|
||||
// s3://ACCESS_KEY:SECRET_KEY@BUCKET[/PREFIX]?region=REGION[&endpoint=ENDPOINT]
|
||||
//
|
||||
// When endpoint is specified, path-style addressing is enabled automatically.
|
||||
func ParseURL(s3URL string) (*Config, error) {
|
||||
u, err := url.Parse(s3URL)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("s3: invalid URL: %w", err)
|
||||
}
|
||||
if u.Scheme != "s3" {
|
||||
return nil, fmt.Errorf("s3: URL scheme must be 's3', got '%s'", u.Scheme)
|
||||
}
|
||||
if u.Host == "" {
|
||||
return nil, fmt.Errorf("s3: bucket name must be specified as host")
|
||||
}
|
||||
bucket := u.Host
|
||||
prefix := strings.TrimPrefix(u.Path, "/")
|
||||
accessKey := u.User.Username()
|
||||
secretKey, _ := u.User.Password()
|
||||
if accessKey == "" || secretKey == "" {
|
||||
return nil, fmt.Errorf("s3: access key and secret key must be specified in URL")
|
||||
}
|
||||
region := u.Query().Get("region")
|
||||
if region == "" {
|
||||
return nil, fmt.Errorf("s3: region query parameter is required")
|
||||
}
|
||||
endpointParam := u.Query().Get("endpoint")
|
||||
var endpoint string
|
||||
var pathStyle bool
|
||||
if endpointParam != "" {
|
||||
// Custom endpoint: strip scheme prefix to extract host[:port]
|
||||
ep := strings.TrimRight(endpointParam, "/")
|
||||
ep = strings.TrimPrefix(ep, "https://")
|
||||
ep = strings.TrimPrefix(ep, "http://")
|
||||
endpoint = ep
|
||||
pathStyle = true
|
||||
} else {
|
||||
endpoint = fmt.Sprintf("s3.%s.amazonaws.com", region)
|
||||
pathStyle = false
|
||||
}
|
||||
return &Config{
|
||||
Endpoint: endpoint,
|
||||
PathStyle: pathStyle,
|
||||
Bucket: bucket,
|
||||
Prefix: prefix,
|
||||
Region: region,
|
||||
AccessKey: accessKey,
|
||||
SecretKey: secretKey,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// parseError reads an S3 error response and returns an *ErrorResponse.
|
||||
func parseError(resp *http.Response) error {
|
||||
body, err := io.ReadAll(io.LimitReader(resp.Body, maxResponseBytes))
|
||||
if err != nil {
|
||||
return fmt.Errorf("s3: reading error response: %w", err)
|
||||
}
|
||||
return parseErrorFromBytes(resp.StatusCode, body)
|
||||
}
|
||||
|
||||
func parseErrorFromBytes(statusCode int, body []byte) error {
|
||||
errResp := &ErrorResponse{
|
||||
StatusCode: statusCode,
|
||||
Body: string(body),
|
||||
}
|
||||
// Try to parse XML error; if it fails, we still have StatusCode and Body
|
||||
_ = xml.Unmarshal(body, errResp)
|
||||
return errResp
|
||||
}
|
||||
|
||||
// canonicalURI returns the URI-encoded path for the canonical request. Each path segment is
|
||||
// percent-encoded per RFC 3986; forward slashes are preserved.
|
||||
func canonicalURI(u *url.URL) string {
|
||||
p := u.Path
|
||||
if p == "" {
|
||||
return "/"
|
||||
}
|
||||
segments := strings.Split(p, "/")
|
||||
for i, seg := range segments {
|
||||
segments[i] = uriEncode(seg)
|
||||
}
|
||||
return strings.Join(segments, "/")
|
||||
}
|
||||
|
||||
// canonicalQueryString builds the query string for the canonical request. Keys and values
|
||||
// are URI-encoded per RFC 3986 (using %20, not +) and sorted lexically by key.
|
||||
func canonicalQueryString(values url.Values) string {
|
||||
if len(values) == 0 {
|
||||
return ""
|
||||
}
|
||||
keys := make([]string, 0, len(values))
|
||||
for k := range values {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
sort.Strings(keys)
|
||||
var pairs []string
|
||||
for _, k := range keys {
|
||||
ek := uriEncode(k)
|
||||
vs := make([]string, len(values[k]))
|
||||
copy(vs, values[k])
|
||||
sort.Strings(vs)
|
||||
for _, v := range vs {
|
||||
pairs = append(pairs, ek+"="+uriEncode(v))
|
||||
}
|
||||
}
|
||||
return strings.Join(pairs, "&")
|
||||
}
|
||||
|
||||
// uriEncode percent-encodes a string per RFC 3986, encoding everything except unreserved
|
||||
// characters (A-Z a-z 0-9 - _ . ~).
|
||||
func uriEncode(s string) string {
|
||||
var buf strings.Builder
|
||||
for i := 0; i < len(s); i++ {
|
||||
b := s[i]
|
||||
if (b >= 'A' && b <= 'Z') || (b >= 'a' && b <= 'z') || (b >= '0' && b <= '9') ||
|
||||
b == '-' || b == '_' || b == '.' || b == '~' {
|
||||
buf.WriteByte(b)
|
||||
} else {
|
||||
fmt.Fprintf(&buf, "%%%02X", b)
|
||||
}
|
||||
}
|
||||
return buf.String()
|
||||
}
|
||||
|
||||
func sha256Hex(data []byte) string {
|
||||
h := sha256.Sum256(data)
|
||||
return hex.EncodeToString(h[:])
|
||||
}
|
||||
|
||||
func hmacSHA256(key, data []byte) []byte {
|
||||
h := hmac.New(sha256.New, key)
|
||||
h.Write(data)
|
||||
return h.Sum(nil)
|
||||
}
|
||||
181
s3/util_test.go
Normal file
181
s3/util_test.go
Normal file
|
|
@ -0,0 +1,181 @@
|
|||
package s3
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/url"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestURIEncode(t *testing.T) {
|
||||
// Unreserved characters are not encoded
|
||||
require.Equal(t, "abcdefghijklmnopqrstuvwxyz", uriEncode("abcdefghijklmnopqrstuvwxyz"))
|
||||
require.Equal(t, "ABCDEFGHIJKLMNOPQRSTUVWXYZ", uriEncode("ABCDEFGHIJKLMNOPQRSTUVWXYZ"))
|
||||
require.Equal(t, "0123456789", uriEncode("0123456789"))
|
||||
require.Equal(t, "-_.~", uriEncode("-_.~"))
|
||||
|
||||
// Spaces use %20, not +
|
||||
require.Equal(t, "hello%20world", uriEncode("hello world"))
|
||||
|
||||
// Slashes are encoded (canonicalURI handles slash splitting separately)
|
||||
require.Equal(t, "a%2Fb", uriEncode("a/b"))
|
||||
|
||||
// Special characters
|
||||
require.Equal(t, "%2B", uriEncode("+"))
|
||||
require.Equal(t, "%3D", uriEncode("="))
|
||||
require.Equal(t, "%26", uriEncode("&"))
|
||||
require.Equal(t, "%40", uriEncode("@"))
|
||||
require.Equal(t, "%23", uriEncode("#"))
|
||||
|
||||
// Mixed
|
||||
require.Equal(t, "test~file-name_1.txt", uriEncode("test~file-name_1.txt"))
|
||||
require.Equal(t, "key%20with%20spaces%2Fand%2Fslashes", uriEncode("key with spaces/and/slashes"))
|
||||
|
||||
// Empty string
|
||||
require.Equal(t, "", uriEncode(""))
|
||||
}
|
||||
|
||||
func TestCanonicalURI(t *testing.T) {
|
||||
// Simple path
|
||||
u, _ := url.Parse("https://example.com/bucket/key")
|
||||
require.Equal(t, "/bucket/key", canonicalURI(u))
|
||||
|
||||
// Root path
|
||||
u, _ = url.Parse("https://example.com/")
|
||||
require.Equal(t, "/", canonicalURI(u))
|
||||
|
||||
// Empty path
|
||||
u, _ = url.Parse("https://example.com")
|
||||
require.Equal(t, "/", canonicalURI(u))
|
||||
|
||||
// Path with special characters
|
||||
u, _ = url.Parse("https://example.com/bucket/key%20with%20spaces")
|
||||
require.Equal(t, "/bucket/key%20with%20spaces", canonicalURI(u))
|
||||
|
||||
// Nested path
|
||||
u, _ = url.Parse("https://example.com/bucket/a/b/c/file.txt")
|
||||
require.Equal(t, "/bucket/a/b/c/file.txt", canonicalURI(u))
|
||||
}
|
||||
|
||||
func TestCanonicalQueryString(t *testing.T) {
|
||||
// Multiple keys sorted alphabetically
|
||||
vals := url.Values{
|
||||
"prefix": {"test/"},
|
||||
"list-type": {"2"},
|
||||
}
|
||||
require.Equal(t, "list-type=2&prefix=test%2F", canonicalQueryString(vals))
|
||||
|
||||
// Empty values
|
||||
require.Equal(t, "", canonicalQueryString(url.Values{}))
|
||||
|
||||
// Single key
|
||||
require.Equal(t, "key=value", canonicalQueryString(url.Values{"key": {"value"}}))
|
||||
|
||||
// Key with multiple values (sorted)
|
||||
vals = url.Values{"key": {"b", "a"}}
|
||||
require.Equal(t, "key=a&key=b", canonicalQueryString(vals))
|
||||
|
||||
// Keys requiring encoding
|
||||
vals = url.Values{"continuation-token": {"abc+def"}}
|
||||
require.Equal(t, "continuation-token=abc%2Bdef", canonicalQueryString(vals))
|
||||
}
|
||||
|
||||
func TestSHA256Hex(t *testing.T) {
|
||||
// SHA-256 of empty string
|
||||
require.Equal(t, emptyPayloadHash, sha256Hex([]byte("")))
|
||||
|
||||
// SHA-256 of known value
|
||||
require.Equal(t, "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824", sha256Hex([]byte("hello")))
|
||||
}
|
||||
|
||||
func TestHmacSHA256(t *testing.T) {
|
||||
// Known test vector: HMAC-SHA256("key", "message")
|
||||
result := hmacSHA256([]byte("key"), []byte("message"))
|
||||
require.Len(t, result, 32) // SHA-256 produces 32 bytes
|
||||
require.NotEqual(t, make([]byte, 32), result)
|
||||
|
||||
// Same inputs should produce same output
|
||||
result2 := hmacSHA256([]byte("key"), []byte("message"))
|
||||
require.Equal(t, result, result2)
|
||||
|
||||
// Different inputs should produce different output
|
||||
result3 := hmacSHA256([]byte("different-key"), []byte("message"))
|
||||
require.NotEqual(t, result, result3)
|
||||
}
|
||||
|
||||
func TestSignV4_SetsRequiredHeaders(t *testing.T) {
|
||||
c := &Client{
|
||||
AccessKey: "AKID",
|
||||
SecretKey: "SECRET",
|
||||
Region: "us-east-1",
|
||||
Endpoint: "s3.us-east-1.amazonaws.com",
|
||||
Bucket: "my-bucket",
|
||||
}
|
||||
|
||||
req, _ := http.NewRequest(http.MethodGet, "https://my-bucket.s3.us-east-1.amazonaws.com/test-key", nil)
|
||||
c.signV4(req, emptyPayloadHash)
|
||||
|
||||
// All required SigV4 headers must be set
|
||||
require.NotEmpty(t, req.Header.Get("Host"))
|
||||
require.NotEmpty(t, req.Header.Get("X-Amz-Date"))
|
||||
require.Equal(t, emptyPayloadHash, req.Header.Get("X-Amz-Content-Sha256"))
|
||||
|
||||
// Authorization header must have correct format
|
||||
auth := req.Header.Get("Authorization")
|
||||
require.Contains(t, auth, "AWS4-HMAC-SHA256")
|
||||
require.Contains(t, auth, "Credential=AKID/")
|
||||
require.Contains(t, auth, "/us-east-1/s3/aws4_request")
|
||||
require.Contains(t, auth, "SignedHeaders=")
|
||||
require.Contains(t, auth, "Signature=")
|
||||
}
|
||||
|
||||
func TestSignV4_UnsignedPayload(t *testing.T) {
|
||||
c := &Client{
|
||||
AccessKey: "AKID",
|
||||
SecretKey: "SECRET",
|
||||
Region: "us-east-1",
|
||||
Endpoint: "s3.us-east-1.amazonaws.com",
|
||||
Bucket: "my-bucket",
|
||||
}
|
||||
|
||||
req, _ := http.NewRequest(http.MethodPut, "https://my-bucket.s3.us-east-1.amazonaws.com/test-key", nil)
|
||||
c.signV4(req, unsignedPayload)
|
||||
|
||||
require.Equal(t, unsignedPayload, req.Header.Get("X-Amz-Content-Sha256"))
|
||||
}
|
||||
|
||||
func TestSignV4_DifferentRegions(t *testing.T) {
|
||||
c1 := &Client{AccessKey: "AKID", SecretKey: "SECRET", Region: "us-east-1", Endpoint: "s3.us-east-1.amazonaws.com", Bucket: "b"}
|
||||
c2 := &Client{AccessKey: "AKID", SecretKey: "SECRET", Region: "eu-west-1", Endpoint: "s3.eu-west-1.amazonaws.com", Bucket: "b"}
|
||||
|
||||
req1, _ := http.NewRequest(http.MethodGet, "https://b.s3.us-east-1.amazonaws.com/key", nil)
|
||||
c1.signV4(req1, emptyPayloadHash)
|
||||
|
||||
req2, _ := http.NewRequest(http.MethodGet, "https://b.s3.eu-west-1.amazonaws.com/key", nil)
|
||||
c2.signV4(req2, emptyPayloadHash)
|
||||
|
||||
// Different regions should produce different signatures
|
||||
require.NotEqual(t, req1.Header.Get("Authorization"), req2.Header.Get("Authorization"))
|
||||
}
|
||||
|
||||
func TestParseError_XMLResponse(t *testing.T) {
|
||||
xmlBody := []byte(`<?xml version="1.0" encoding="UTF-8"?><Error><Code>NoSuchKey</Code><Message>The specified key does not exist.</Message></Error>`)
|
||||
err := parseErrorFromBytes(404, xmlBody)
|
||||
|
||||
var errResp *ErrorResponse
|
||||
require.ErrorAs(t, err, &errResp)
|
||||
require.Equal(t, 404, errResp.StatusCode)
|
||||
require.Equal(t, "NoSuchKey", errResp.Code)
|
||||
require.Equal(t, "The specified key does not exist.", errResp.Message)
|
||||
}
|
||||
|
||||
func TestParseError_NonXMLResponse(t *testing.T) {
|
||||
err := parseErrorFromBytes(500, []byte("internal server error"))
|
||||
|
||||
var errResp *ErrorResponse
|
||||
require.ErrorAs(t, err, &errResp)
|
||||
require.Equal(t, 500, errResp.StatusCode)
|
||||
require.Equal(t, "", errResp.Code) // XML parsing failed, no code
|
||||
require.Contains(t, errResp.Body, "internal server error")
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue