diff --git a/s3/client.go b/s3/client.go index 754a1bfb..83d8195e 100644 --- a/s3/client.go +++ b/s3/client.go @@ -118,9 +118,16 @@ func (c *Client) ListObjectsV2(ctx context.Context) ([]*Object, error) { if err != nil { return nil, err } - for _, obj := range result.Objects { - obj.Key = c.config.StripPrefix(obj.Key) - all = append(all, obj) + for _, obj := range result.Contents { + var lastModified time.Time + if obj.LastModified != "" { + lastModified, _ = time.Parse(time.RFC3339, obj.LastModified) + } + all = append(all, &Object{ + Key: c.config.StripPrefix(obj.Key), + Size: obj.Size, + LastModified: lastModified, + }) } if !result.IsTruncated { return all, nil @@ -148,27 +155,11 @@ func (c *Client) listObjectsV2(ctx context.Context, continuationToken string, ma if err != nil { return nil, err } - var result listObjectsV2Response + var result listObjectsV2Result if err := xml.Unmarshal(respBody, &result); err != nil { return nil, fmt.Errorf("failed to unmarshal list object response: %w", err) } - objects := make([]*Object, len(result.Contents)) - for i, obj := range result.Contents { - var lastModified time.Time - if obj.LastModified != "" { - lastModified, _ = time.Parse(time.RFC3339, obj.LastModified) - } - objects[i] = &Object{ - Key: obj.Key, - Size: obj.Size, - LastModified: lastModified, - } - } - return &listObjectsV2Result{ - Objects: objects, - IsTruncated: result.IsTruncated, - NextContinuationToken: result.NextContinuationToken, - }, nil + return &result, nil } // DeleteObjects removes multiple objects in a single batch request. Keys are automatically @@ -222,6 +213,7 @@ func (c *Client) DeleteObjects(ctx context.Context, keys []string) error { // If body is nil, the request is sent with an empty payload. If body is non-nil, it is sent // with a computed SHA-256 payload hash and Content-Type: application/xml. func (c *Client) do(ctx context.Context, op, method, reqURL string, body []byte, headers map[string]string) ([]byte, error) { + log.Tag(tagS3Client).Trace("Performing request %s %s %s (body: %d bytes)", op, method, reqURL, len(body)) var reader io.Reader var hash string if body != nil { diff --git a/s3/client_multipart.go b/s3/client_multipart.go index f6b68784..5e98db38 100644 --- a/s3/client_multipart.go +++ b/s3/client_multipart.go @@ -26,7 +26,6 @@ func (c *Client) AbortIncompleteUploads(ctx context.Context, cutoff time.Time) e } for _, u := range uploads { if !u.Initiated.IsZero() && u.Initiated.Before(cutoff) { - log.Tag(tagS3Client).Debug("DeleteIncomplete key=%s uploadId=%s initiated=%s", u.Key, u.UploadID, u.Initiated) c.abortMultipartUpload(ctx, u.Key, u.UploadID) } } @@ -47,13 +46,13 @@ func (c *Client) listMultipartUploads(ctx context.Context) ([]*multipartUpload, query.Set("key-marker", keyMarker) query.Set("upload-id-marker", uploadIDMarker) } - respBody, err := c.do(ctx, "listMultipartUploads", http.MethodGet, c.config.BucketURL()+"?"+query.Encode(), nil, nil) + respBody, err := c.do(ctx, "ListMultipartUploads", http.MethodGet, c.config.BucketURL()+"?"+query.Encode(), nil, nil) if err != nil { return nil, err } var result listMultipartUploadsResult if err := xml.Unmarshal(respBody, &result); err != nil { - return nil, fmt.Errorf("s3: listMultipartUploads XML: %w", err) + return nil, fmt.Errorf("error unmarshalling multipart upload result: %w", err) } for _, u := range result.Uploads { var initiated time.Time @@ -75,6 +74,22 @@ func (c *Client) listMultipartUploads(ctx context.Context) ([]*multipartUpload, return nil, fmt.Errorf("s3: listMultipartUploads exceeded %d pages", maxPages) } +// abortMultipartUpload cancels an in-progress multipart upload. Called on error to clean up. +func (c *Client) abortMultipartUpload(ctx context.Context, key, uploadID string) { + log.Tag(tagS3Client).Info("Aborting multipart upload for object %s", key) + reqURL := fmt.Sprintf("%s?uploadId=%s", c.config.ObjectURL(key), url.QueryEscape(uploadID)) + req, err := http.NewRequestWithContext(ctx, http.MethodDelete, reqURL, nil) + if err != nil { + return + } + c.signV4(req, emptyPayloadHash) + resp, err := c.http.Do(req) + if err != nil { + return + } + resp.Body.Close() +} + // putObjectMultipart uploads body using S3 multipart upload. It reads the body in partSize // chunks, uploading each as a separate part. This allows uploading without knowing the total // body size in advance. @@ -88,9 +103,9 @@ func (c *Client) putObjectMultipart(ctx context.Context, key string, body io.Rea } // Step 2: Upload parts - var parts []completedPart - buf := make([]byte, partSize) partNumber := 1 + buf := make([]byte, partSize) + var parts []*completedPart for { n, err := io.ReadFull(body, buf) if n > 0 { @@ -99,7 +114,10 @@ func (c *Client) putObjectMultipart(ctx context.Context, key string, body io.Rea c.abortMultipartUpload(ctx, key, uploadID) return uploadErr } - parts = append(parts, completedPart{PartNumber: partNumber, ETag: etag}) + parts = append(parts, &completedPart{ + PartNumber: partNumber, + ETag: etag, + }) partNumber++ } if err == io.EOF || errors.Is(err, io.ErrUnexpectedEOF) { @@ -123,38 +141,36 @@ func (c *Client) initiateMultipartUpload(ctx context.Context, key string) (strin } var result initiateMultipartUploadResult if err := xml.Unmarshal(respBody, &result); err != nil { - return "", fmt.Errorf("s3: InitiateMultipartUpload XML: %w", err) + return "", fmt.Errorf("error unmarshalling initiate multipart upload response: %w", err) } - log.Tag(tagS3Client).Debug("InitiateMultipartUpload key=%s uploadId=%s", key, result.UploadID) return result.UploadID, nil } // uploadPart uploads a single part of a multipart upload and returns the ETag. func (c *Client) uploadPart(ctx context.Context, key, uploadID string, partNumber int, data []byte) (string, error) { - log.Tag(tagS3Client).Debug("UploadPart key=%s part=%d size=%d", key, partNumber, len(data)) + log.Tag(tagS3Client).Debug("Uploading multipart part for object %s, part %d, size %d", key, partNumber, len(data)) reqURL := fmt.Sprintf("%s?partNumber=%d&uploadId=%s", c.config.ObjectURL(key), partNumber, url.QueryEscape(uploadID)) req, err := http.NewRequestWithContext(ctx, http.MethodPut, reqURL, bytes.NewReader(data)) if err != nil { - return "", fmt.Errorf("s3: UploadPart request: %w", err) + return "", fmt.Errorf("error creating multipart upload part request for object %s: %w", key, err) } req.ContentLength = int64(len(data)) c.signV4(req, unsignedPayload) resp, err := c.http.Do(req) if err != nil { - return "", fmt.Errorf("s3: UploadPart: %w", err) + return "", fmt.Errorf("error uploading multipart part for object %s: %w", key, err) } defer resp.Body.Close() if !isHTTPSuccess(resp) { return "", parseError(resp) } - etag := resp.Header.Get("ETag") - return etag, nil + return resp.Header.Get("ETag"), nil } // completeMultipartUpload finalizes a multipart upload with the given parts. -func (c *Client) completeMultipartUpload(ctx context.Context, key, uploadID string, parts []completedPart) error { - log.Tag(tagS3Client).Debug("CompleteMultipartUpload key=%s uploadId=%s parts=%d", key, uploadID, len(parts)) - bodyBytes, err := xml.Marshal(completeMultipartUploadRequest{Parts: parts}) +func (c *Client) completeMultipartUpload(ctx context.Context, key, uploadID string, parts []*completedPart) error { + log.Tag(tagS3Client).Debug("Completing multipart upload for object %s, %d parts", key, len(parts)) + bodyBytes, err := xml.Marshal(&completeMultipartUploadRequest{Parts: parts}) if err != nil { return fmt.Errorf("s3: CompleteMultipartUpload marshal: %w", err) } @@ -170,19 +186,3 @@ func (c *Client) completeMultipartUpload(ctx context.Context, key, uploadID stri } return nil } - -// abortMultipartUpload cancels an in-progress multipart upload. Called on error to clean up. -func (c *Client) abortMultipartUpload(ctx context.Context, key, uploadID string) { - log.Tag(tagS3Client).Debug("AbortMultipartUpload key=%s uploadId=%s", key, uploadID) - reqURL := fmt.Sprintf("%s?uploadId=%s", c.config.ObjectURL(key), url.QueryEscape(uploadID)) - req, err := http.NewRequestWithContext(ctx, http.MethodDelete, reqURL, nil) - if err != nil { - return - } - c.signV4(req, emptyPayloadHash) - resp, err := c.http.Do(req) - if err != nil { - return - } - resp.Body.Close() -} diff --git a/s3/client_test.go b/s3/client_test.go index d488c832..d267a6a8 100644 --- a/s3/client_test.go +++ b/s3/client_test.go @@ -514,13 +514,13 @@ func TestClient_ListObjects(t *testing.T) { // List with prefix client: should only see 3 result, err := client.listObjectsV2(ctx, "", 0) require.Nil(t, err) - require.Len(t, result.Objects, 3) + require.Len(t, result.Contents, 3) require.False(t, result.IsTruncated) // List with no-prefix client: should see all 4 result, err = clientNoPrefix.listObjectsV2(ctx, "", 0) require.Nil(t, err) - require.Len(t, result.Objects, 4) + require.Len(t, result.Contents, 4) } func TestClient_ListObjects_Pagination(t *testing.T) { @@ -539,20 +539,20 @@ func TestClient_ListObjects_Pagination(t *testing.T) { // List with max-keys=2 result, err := client.listObjectsV2(ctx, "", 2) require.Nil(t, err) - require.Len(t, result.Objects, 2) + require.Len(t, result.Contents, 2) require.True(t, result.IsTruncated) require.NotEmpty(t, result.NextContinuationToken) // Get next page result2, err := client.listObjectsV2(ctx, result.NextContinuationToken, 2) require.Nil(t, err) - require.Len(t, result2.Objects, 2) + require.Len(t, result2.Contents, 2) require.True(t, result2.IsTruncated) // Get last page result3, err := client.listObjectsV2(ctx, result2.NextContinuationToken, 2) require.Nil(t, err) - require.Len(t, result3.Objects, 1) + require.Len(t, result3.Contents, 1) require.False(t, result3.IsTruncated) } diff --git a/s3/types.go b/s3/types.go index a3694bd4..1782b88d 100644 --- a/s3/types.go +++ b/s3/types.go @@ -91,20 +91,13 @@ func (e *ErrorResponse) Error() string { return fmt.Sprintf("s3: HTTP %d: %s", e.StatusCode, e.Body) } -// listObjectsV2Response is the XML response from S3 ListObjectsV2 -type listObjectsV2Response struct { +// listObjectsV2Result is the XML response from S3 ListObjectsV2 +type listObjectsV2Result struct { Contents []*listObject `xml:"Contents"` IsTruncated bool `xml:"IsTruncated"` NextContinuationToken string `xml:"NextContinuationToken"` } -// listObjectsV2Result holds the response from a single ListObjectsV2 page. -type listObjectsV2Result struct { - Objects []*Object - IsTruncated bool - NextContinuationToken string -} - type listObject struct { Key string `xml:"Key"` Size int64 `xml:"Size"` @@ -124,7 +117,7 @@ type deleteObject struct { // deleteObjectsResult is the XML response from S3 DeleteObjects type deleteObjectsResult struct { - Errors []deleteError `xml:"Error"` + Errors []*deleteError `xml:"Error"` } type deleteError struct { @@ -133,13 +126,6 @@ type deleteError struct { Message string `xml:"Message"` } -// multipartUpload represents an in-progress multipart upload returned by listMultipartUploads. -type multipartUpload struct { - Key string - UploadID string - Initiated time.Time -} - // listMultipartUploadsResult is the XML response from S3 listMultipartUploads type listMultipartUploadsResult struct { Uploads []*listUpload `xml:"Upload"` @@ -154,6 +140,13 @@ type listUpload struct { Initiated string `xml:"Initiated"` } +// multipartUpload represents an in-progress multipart upload returned by listMultipartUploads. +type multipartUpload struct { + Key string + UploadID string + Initiated time.Time +} + // initiateMultipartUploadResult is the XML response from S3 InitiateMultipartUpload type initiateMultipartUploadResult struct { UploadID string `xml:"UploadId"` @@ -161,8 +154,8 @@ type initiateMultipartUploadResult struct { // completeMultipartUploadRequest is the XML request body for S3 CompleteMultipartUpload type completeMultipartUploadRequest struct { - XMLName xml.Name `xml:"CompleteMultipartUpload"` - Parts []completedPart `xml:"Part"` + XMLName xml.Name `xml:"CompleteMultipartUpload"` + Parts []*completedPart `xml:"Part"` } // completedPart represents a successfully uploaded part for CompleteMultipartUpload