More manual review and refinement

This commit is contained in:
binwiederhier 2026-03-21 16:12:46 -04:00
parent 02ea09ab0f
commit 393f730d11
4 changed files with 62 additions and 77 deletions

View file

@ -118,9 +118,16 @@ func (c *Client) ListObjectsV2(ctx context.Context) ([]*Object, error) {
if err != nil {
return nil, err
}
for _, obj := range result.Objects {
obj.Key = c.config.StripPrefix(obj.Key)
all = append(all, obj)
for _, obj := range result.Contents {
var lastModified time.Time
if obj.LastModified != "" {
lastModified, _ = time.Parse(time.RFC3339, obj.LastModified)
}
all = append(all, &Object{
Key: c.config.StripPrefix(obj.Key),
Size: obj.Size,
LastModified: lastModified,
})
}
if !result.IsTruncated {
return all, nil
@ -148,27 +155,11 @@ func (c *Client) listObjectsV2(ctx context.Context, continuationToken string, ma
if err != nil {
return nil, err
}
var result listObjectsV2Response
var result listObjectsV2Result
if err := xml.Unmarshal(respBody, &result); err != nil {
return nil, fmt.Errorf("failed to unmarshal list object response: %w", err)
}
objects := make([]*Object, len(result.Contents))
for i, obj := range result.Contents {
var lastModified time.Time
if obj.LastModified != "" {
lastModified, _ = time.Parse(time.RFC3339, obj.LastModified)
}
objects[i] = &Object{
Key: obj.Key,
Size: obj.Size,
LastModified: lastModified,
}
}
return &listObjectsV2Result{
Objects: objects,
IsTruncated: result.IsTruncated,
NextContinuationToken: result.NextContinuationToken,
}, nil
return &result, nil
}
// DeleteObjects removes multiple objects in a single batch request. Keys are automatically
@ -222,6 +213,7 @@ func (c *Client) DeleteObjects(ctx context.Context, keys []string) error {
// If body is nil, the request is sent with an empty payload. If body is non-nil, it is sent
// with a computed SHA-256 payload hash and Content-Type: application/xml.
func (c *Client) do(ctx context.Context, op, method, reqURL string, body []byte, headers map[string]string) ([]byte, error) {
log.Tag(tagS3Client).Trace("Performing request %s %s %s (body: %d bytes)", op, method, reqURL, len(body))
var reader io.Reader
var hash string
if body != nil {

View file

@ -26,7 +26,6 @@ func (c *Client) AbortIncompleteUploads(ctx context.Context, cutoff time.Time) e
}
for _, u := range uploads {
if !u.Initiated.IsZero() && u.Initiated.Before(cutoff) {
log.Tag(tagS3Client).Debug("DeleteIncomplete key=%s uploadId=%s initiated=%s", u.Key, u.UploadID, u.Initiated)
c.abortMultipartUpload(ctx, u.Key, u.UploadID)
}
}
@ -47,13 +46,13 @@ func (c *Client) listMultipartUploads(ctx context.Context) ([]*multipartUpload,
query.Set("key-marker", keyMarker)
query.Set("upload-id-marker", uploadIDMarker)
}
respBody, err := c.do(ctx, "listMultipartUploads", http.MethodGet, c.config.BucketURL()+"?"+query.Encode(), nil, nil)
respBody, err := c.do(ctx, "ListMultipartUploads", http.MethodGet, c.config.BucketURL()+"?"+query.Encode(), nil, nil)
if err != nil {
return nil, err
}
var result listMultipartUploadsResult
if err := xml.Unmarshal(respBody, &result); err != nil {
return nil, fmt.Errorf("s3: listMultipartUploads XML: %w", err)
return nil, fmt.Errorf("error unmarshalling multipart upload result: %w", err)
}
for _, u := range result.Uploads {
var initiated time.Time
@ -75,6 +74,22 @@ func (c *Client) listMultipartUploads(ctx context.Context) ([]*multipartUpload,
return nil, fmt.Errorf("s3: listMultipartUploads exceeded %d pages", maxPages)
}
// abortMultipartUpload cancels an in-progress multipart upload. Called on error to clean up.
// It is best-effort: failures are logged and swallowed, because the caller is already on an
// error path and cannot act on an abort failure.
func (c *Client) abortMultipartUpload(ctx context.Context, key, uploadID string) {
	log.Tag(tagS3Client).Info("Aborting multipart upload for object %s", key)
	reqURL := fmt.Sprintf("%s?uploadId=%s", c.config.ObjectURL(key), url.QueryEscape(uploadID))
	req, err := http.NewRequestWithContext(ctx, http.MethodDelete, reqURL, nil)
	if err != nil {
		// Was silently swallowed before; log so cleanup failures are at least visible
		log.Tag(tagS3Client).Debug("Cannot create abort request for object %s: %s", key, err.Error())
		return
	}
	c.signV4(req, emptyPayloadHash)
	resp, err := c.http.Do(req)
	if err != nil {
		log.Tag(tagS3Client).Debug("Cannot abort multipart upload for object %s: %s", key, err.Error())
		return
	}
	defer resp.Body.Close()
	if !isHTTPSuccess(resp) {
		log.Tag(tagS3Client).Debug("Abort multipart upload for object %s returned HTTP %d", key, resp.StatusCode)
	}
	// Drain the body so the HTTP transport can reuse the connection
	_, _ = io.Copy(io.Discard, resp.Body)
}
// putObjectMultipart uploads body using S3 multipart upload. It reads the body in partSize
// chunks, uploading each as a separate part. This allows uploading without knowing the total
// body size in advance.
@ -88,9 +103,9 @@ func (c *Client) putObjectMultipart(ctx context.Context, key string, body io.Rea
}
// Step 2: Upload parts
var parts []completedPart
buf := make([]byte, partSize)
partNumber := 1
buf := make([]byte, partSize)
var parts []*completedPart
for {
n, err := io.ReadFull(body, buf)
if n > 0 {
@ -99,7 +114,10 @@ func (c *Client) putObjectMultipart(ctx context.Context, key string, body io.Rea
c.abortMultipartUpload(ctx, key, uploadID)
return uploadErr
}
parts = append(parts, completedPart{PartNumber: partNumber, ETag: etag})
parts = append(parts, &completedPart{
PartNumber: partNumber,
ETag: etag,
})
partNumber++
}
if err == io.EOF || errors.Is(err, io.ErrUnexpectedEOF) {
@ -123,38 +141,36 @@ func (c *Client) initiateMultipartUpload(ctx context.Context, key string) (strin
}
var result initiateMultipartUploadResult
if err := xml.Unmarshal(respBody, &result); err != nil {
return "", fmt.Errorf("s3: InitiateMultipartUpload XML: %w", err)
return "", fmt.Errorf("error unmarshalling initiate multipart upload response: %w", err)
}
log.Tag(tagS3Client).Debug("InitiateMultipartUpload key=%s uploadId=%s", key, result.UploadID)
return result.UploadID, nil
}
// uploadPart uploads a single part of a multipart upload and returns the ETag.
func (c *Client) uploadPart(ctx context.Context, key, uploadID string, partNumber int, data []byte) (string, error) {
log.Tag(tagS3Client).Debug("UploadPart key=%s part=%d size=%d", key, partNumber, len(data))
log.Tag(tagS3Client).Debug("Uploading multipart part for object %s, part %d, size %d", key, partNumber, len(data))
reqURL := fmt.Sprintf("%s?partNumber=%d&uploadId=%s", c.config.ObjectURL(key), partNumber, url.QueryEscape(uploadID))
req, err := http.NewRequestWithContext(ctx, http.MethodPut, reqURL, bytes.NewReader(data))
if err != nil {
return "", fmt.Errorf("s3: UploadPart request: %w", err)
return "", fmt.Errorf("error creating multipart upload part request for object %s: %w", key, err)
}
req.ContentLength = int64(len(data))
c.signV4(req, unsignedPayload)
resp, err := c.http.Do(req)
if err != nil {
return "", fmt.Errorf("s3: UploadPart: %w", err)
return "", fmt.Errorf("error uploading multipart part for object %s: %w", key, err)
}
defer resp.Body.Close()
if !isHTTPSuccess(resp) {
return "", parseError(resp)
}
etag := resp.Header.Get("ETag")
return etag, nil
return resp.Header.Get("ETag"), nil
}
// completeMultipartUpload finalizes a multipart upload with the given parts.
func (c *Client) completeMultipartUpload(ctx context.Context, key, uploadID string, parts []completedPart) error {
log.Tag(tagS3Client).Debug("CompleteMultipartUpload key=%s uploadId=%s parts=%d", key, uploadID, len(parts))
bodyBytes, err := xml.Marshal(completeMultipartUploadRequest{Parts: parts})
func (c *Client) completeMultipartUpload(ctx context.Context, key, uploadID string, parts []*completedPart) error {
log.Tag(tagS3Client).Debug("Completing multipart upload for object %s, %d parts", key, len(parts))
bodyBytes, err := xml.Marshal(&completeMultipartUploadRequest{Parts: parts})
if err != nil {
return fmt.Errorf("s3: CompleteMultipartUpload marshal: %w", err)
}
@ -170,19 +186,3 @@ func (c *Client) completeMultipartUpload(ctx context.Context, key, uploadID stri
}
return nil
}
// abortMultipartUpload cancels an in-progress multipart upload. Called on error to clean up.
// Best-effort: request-creation and transport errors are deliberately swallowed, since the
// caller is already handling a failure and cannot act on an abort error.
func (c *Client) abortMultipartUpload(ctx context.Context, key, uploadID string) {
	log.Tag(tagS3Client).Debug("AbortMultipartUpload key=%s uploadId=%s", key, uploadID)
	reqURL := fmt.Sprintf("%s?uploadId=%s", c.config.ObjectURL(key), url.QueryEscape(uploadID))
	req, err := http.NewRequestWithContext(ctx, http.MethodDelete, reqURL, nil)
	if err != nil {
		// best-effort cleanup: nothing useful to do if the request cannot be built
		return
	}
	c.signV4(req, emptyPayloadHash)
	resp, err := c.http.Do(req)
	if err != nil {
		// best-effort cleanup: ignore transport errors
		return
	}
	resp.Body.Close()
}

View file

@ -514,13 +514,13 @@ func TestClient_ListObjects(t *testing.T) {
// List with prefix client: should only see 3
result, err := client.listObjectsV2(ctx, "", 0)
require.Nil(t, err)
require.Len(t, result.Objects, 3)
require.Len(t, result.Contents, 3)
require.False(t, result.IsTruncated)
// List with no-prefix client: should see all 4
result, err = clientNoPrefix.listObjectsV2(ctx, "", 0)
require.Nil(t, err)
require.Len(t, result.Objects, 4)
require.Len(t, result.Contents, 4)
}
func TestClient_ListObjects_Pagination(t *testing.T) {
@ -539,20 +539,20 @@ func TestClient_ListObjects_Pagination(t *testing.T) {
// List with max-keys=2
result, err := client.listObjectsV2(ctx, "", 2)
require.Nil(t, err)
require.Len(t, result.Objects, 2)
require.Len(t, result.Contents, 2)
require.True(t, result.IsTruncated)
require.NotEmpty(t, result.NextContinuationToken)
// Get next page
result2, err := client.listObjectsV2(ctx, result.NextContinuationToken, 2)
require.Nil(t, err)
require.Len(t, result2.Objects, 2)
require.Len(t, result2.Contents, 2)
require.True(t, result2.IsTruncated)
// Get last page
result3, err := client.listObjectsV2(ctx, result2.NextContinuationToken, 2)
require.Nil(t, err)
require.Len(t, result3.Objects, 1)
require.Len(t, result3.Contents, 1)
require.False(t, result3.IsTruncated)
}

View file

@ -91,20 +91,13 @@ func (e *ErrorResponse) Error() string {
return fmt.Sprintf("s3: HTTP %d: %s", e.StatusCode, e.Body)
}
// listObjectsV2Response is the XML response from S3 ListObjectsV2
type listObjectsV2Response struct {
// listObjectsV2Result is the XML response from S3 ListObjectsV2
type listObjectsV2Result struct {
	Contents              []*listObject `xml:"Contents"`              // objects returned in this page
	IsTruncated           bool          `xml:"IsTruncated"`           // true if more pages follow
	NextContinuationToken string        `xml:"NextContinuationToken"` // token for the next page; empty on the last page
}
// listObjectsV2Result holds the response from a single ListObjectsV2 page.
type listObjectsV2Result struct {
	Objects               []*Object // parsed objects for this page
	IsTruncated           bool      // true if more pages follow
	NextContinuationToken string    // token for the next page; empty on the last page
}
type listObject struct {
Key string `xml:"Key"`
Size int64 `xml:"Size"`
@ -124,7 +117,7 @@ type deleteObject struct {
// deleteObjectsResult is the XML response from S3 DeleteObjects
type deleteObjectsResult struct {
Errors []deleteError `xml:"Error"`
Errors []*deleteError `xml:"Error"`
}
type deleteError struct {
@ -133,13 +126,6 @@ type deleteError struct {
Message string `xml:"Message"`
}
// multipartUpload represents an in-progress multipart upload returned by listMultipartUploads.
type multipartUpload struct {
	Key       string    // object key the upload targets
	UploadID  string    // S3-assigned upload identifier
	Initiated time.Time // when the upload was initiated; may be zero if absent or unparseable
}
// listMultipartUploadsResult is the XML response from S3 listMultipartUploads
type listMultipartUploadsResult struct {
Uploads []*listUpload `xml:"Upload"`
@ -154,6 +140,13 @@ type listUpload struct {
Initiated string `xml:"Initiated"`
}
// multipartUpload represents an in-progress multipart upload returned by listMultipartUploads.
type multipartUpload struct {
	Key       string    // key of the object being uploaded
	UploadID  string    // upload identifier assigned by S3
	Initiated time.Time // start time of the upload; zero value when not parseable
}
// initiateMultipartUploadResult is the XML response from S3 InitiateMultipartUpload
type initiateMultipartUploadResult struct {
UploadID string `xml:"UploadId"`
@ -161,8 +154,8 @@ type initiateMultipartUploadResult struct {
// completeMultipartUploadRequest is the XML request body for S3 CompleteMultipartUpload
type completeMultipartUploadRequest struct {
XMLName xml.Name `xml:"CompleteMultipartUpload"`
Parts []completedPart `xml:"Part"`
XMLName xml.Name `xml:"CompleteMultipartUpload"`
Parts []*completedPart `xml:"Part"`
}
// completedPart represents a successfully uploaded part for CompleteMultipartUpload