Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
* [ENHANCEMENT] Ingester: Add fetch timeout for Ingester expanded postings cache. #7185
* [ENHANCEMENT] Ingester: Add feature flag to collect metrics of how expensive an unoptimized regex matcher is and new limits to protect Ingester query path against expensive unoptimized regex matchers. #7194 #7210
* [ENHANCEMENT] Compactor: Add partition group creation time to visit marker. #7217
* [ENHANCEMENT] Compactor: Add concurrency for partition cleanup and mark block for deletion #7246
* [BUGFIX] Distributor: If remote write v2 is disabled, explicitly return HTTP 415 (Unsupported Media Type) for Remote Write V2 requests instead of attempting to parse them as V1. #7238
* [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088
* [BUGFIX] Ruler: Add XFunctions validation support. #7111
Expand Down
24 changes: 17 additions & 7 deletions pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ import (
)

const (
defaultDeleteBlocksConcurrency = 16
reasonValueRetention = "retention"
activeStatus = "active"
deletedStatus = "deleted"
defaultDeleteBlocksConcurrency = 16
defaultDeletePartitionedGroupInfoConcurrency = 5
reasonValueRetention = "retention"
activeStatus = "active"
deletedStatus = "deleted"
)

type BlocksCleanerConfig struct {
Expand Down Expand Up @@ -785,7 +786,15 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke
level.Warn(userLogger).Log("msg", "error return when going through partitioned group directory", "err", err)
}

for partitionedGroupInfo, extraInfo := range existentPartitionedGroupInfo {
partitionsToClean := make([]any, 0, len(existentPartitionedGroupInfo))
for partitionedGroupInfo := range existentPartitionedGroupInfo {
partitionsToClean = append(partitionsToClean, partitionedGroupInfo)
}

_ = concurrency.ForEach(ctx, partitionsToClean, defaultDeletePartitionedGroupInfoConcurrency, func(ctx context.Context, partitionToClean any) error {
partitionedGroupInfo := partitionToClean.(*PartitionedGroupInfo)
extraInfo := existentPartitionedGroupInfo[partitionedGroupInfo]

partitionedGroupInfoFile := extraInfo.path
deletedBlocksCount := 0
partitionedGroupLogger := log.With(userLogger, "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString())
Expand All @@ -799,7 +808,7 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke
// if one block can not be marked for deletion, we should
// skip delete this partitioned group. next iteration
// would try it again.
continue
return nil
}
}

Expand Down Expand Up @@ -832,7 +841,8 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke
}
}
}
}
return nil
})
}

func (c *BlocksCleaner) emitUserPartitionMetrics(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket, userID string) {
Expand Down
23 changes: 17 additions & 6 deletions pkg/compactor/partitioned_group_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/util/concurrency"
"github.com/cortexproject/cortex/pkg/util/runutil"
)

Expand Down Expand Up @@ -238,22 +240,31 @@ func (p *PartitionedGroupInfo) isBlockNoCompact(ctx context.Context, userBucket

func (p *PartitionedGroupInfo) markAllBlocksForDeletion(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, blocksMarkedForDeletion *prometheus.CounterVec, userID string) (int, error) {
blocks := p.getAllBlocks()
deleteBlocksCount := 0
var deleteBlocksCount atomic.Int64
partitionedGroupLogger := log.With(userLogger, "partitioned_group_id", p.PartitionedGroupID, "partitioned_group_creation_time", p.CreationTimeString())
defer func() {
level.Info(partitionedGroupLogger).Log("msg", "total number of blocks marked for deletion during partitioned group info clean up", "count", deleteBlocksCount)
level.Info(partitionedGroupLogger).Log("msg", "total number of blocks marked for deletion during partitioned group info clean up", "count", deleteBlocksCount.Load())
}()

blocksForDeletion := make([]any, 0, len(blocks))
for _, blockID := range blocks {
blocksForDeletion = append(blocksForDeletion, blockID)
}

err := concurrency.ForEach(ctx, blocksForDeletion, defaultDeleteBlocksConcurrency, func(ctx context.Context, blockForDeletion any) error {
blockID := blockForDeletion.(ulid.ULID)
if p.doesBlockExist(ctx, userBucket, partitionedGroupLogger, blockID) && !p.isBlockDeleted(ctx, userBucket, partitionedGroupLogger, blockID) && !p.isBlockNoCompact(ctx, userBucket, partitionedGroupLogger, blockID) {
if err := block.MarkForDeletion(ctx, partitionedGroupLogger, userBucket, blockID, "delete block during partitioned group completion check", blocksMarkedForDeletion.WithLabelValues(userID, reasonValueRetention)); err != nil {
level.Warn(partitionedGroupLogger).Log("msg", "unable to mark block for deletion", "block", blockID.String())
return deleteBlocksCount, err
return err
}
deleteBlocksCount++
deleteBlocksCount.Add(1)
level.Debug(partitionedGroupLogger).Log("msg", "marked block for deletion during partitioned group info clean up", "block", blockID.String())
}
}
return deleteBlocksCount, nil
return nil
})

return int(deleteBlocksCount.Load()), err
}

func (p *PartitionedGroupInfo) String() string {
Expand Down
Loading