diff --git a/CHANGELOG.md b/CHANGELOG.md index d19f007b63..de6eac6380 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,7 @@ * [ENHANCEMENT] Ingester: Add fetch timeout for Ingester expanded postings cache. #7185 * [ENHANCEMENT] Ingester: Add feature flag to collect metrics of how expensive an unoptimized regex matcher is and new limits to protect Ingester query path against expensive unoptimized regex matchers. #7194 #7210 * [ENHANCEMENT] Compactor: Add partition group creation time to visit marker. #7217 +* [ENHANCEMENT] Compactor: Add concurrency for partition cleanup and for marking blocks for deletion. #7246 * [BUGFIX] Distributor: If remote write v2 is disabled, explicitly return HTTP 415 (Unsupported Media Type) for Remote Write V2 requests instead of attempting to parse them as V1. #7238 * [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088 * [BUGFIX] Ruler: Add XFunctions validation support. #7111 diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index 563febc11b..88bc3d3bad 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -29,10 +29,11 @@ import ( ) const ( - defaultDeleteBlocksConcurrency = 16 - reasonValueRetention = "retention" - activeStatus = "active" - deletedStatus = "deleted" + defaultDeleteBlocksConcurrency = 16 + defaultDeletePartitionedGroupInfoConcurrency = 5 + reasonValueRetention = "retention" + activeStatus = "active" + deletedStatus = "deleted" ) type BlocksCleanerConfig struct { @@ -785,7 +786,15 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke level.Warn(userLogger).Log("msg", "error return when going through partitioned group directory", "err", err) } - for partitionedGroupInfo, extraInfo := range existentPartitionedGroupInfo { + partitionsToClean := make([]any, 0, len(existentPartitionedGroupInfo)) + for partitionedGroupInfo := range existentPartitionedGroupInfo { + partitionsToClean = append(partitionsToClean, 
partitionedGroupInfo) + } + + _ = concurrency.ForEach(ctx, partitionsToClean, defaultDeletePartitionedGroupInfoConcurrency, func(ctx context.Context, partitionToClean any) error { + partitionedGroupInfo := partitionToClean.(*PartitionedGroupInfo) + extraInfo := existentPartitionedGroupInfo[partitionedGroupInfo] + partitionedGroupInfoFile := extraInfo.path deletedBlocksCount := 0 partitionedGroupLogger := log.With(userLogger, "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString()) @@ -799,7 +808,7 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke // if one block can not be marked for deletion, we should // skip delete this partitioned group. next iteration // would try it again. - continue + return nil } } @@ -832,7 +841,8 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke } } } - } + return nil + }) } func (c *BlocksCleaner) emitUserPartitionMetrics(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket, userID string) { diff --git a/pkg/compactor/partitioned_group_info.go b/pkg/compactor/partitioned_group_info.go index cdc66d1027..e3d90a4e30 100644 --- a/pkg/compactor/partitioned_group_info.go +++ b/pkg/compactor/partitioned_group_info.go @@ -18,7 +18,9 @@ import ( "github.com/thanos-io/objstore" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" + "go.uber.org/atomic" + "github.com/cortexproject/cortex/pkg/util/concurrency" "github.com/cortexproject/cortex/pkg/util/runutil" ) @@ -238,22 +240,31 @@ func (p *PartitionedGroupInfo) isBlockNoCompact(ctx context.Context, userBucket func (p *PartitionedGroupInfo) markAllBlocksForDeletion(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, blocksMarkedForDeletion *prometheus.CounterVec, userID string) (int, error) { blocks := p.getAllBlocks() - deleteBlocksCount := 0 + var 
deleteBlocksCount atomic.Int64 partitionedGroupLogger := log.With(userLogger, "partitioned_group_id", p.PartitionedGroupID, "partitioned_group_creation_time", p.CreationTimeString()) defer func() { - level.Info(partitionedGroupLogger).Log("msg", "total number of blocks marked for deletion during partitioned group info clean up", "count", deleteBlocksCount) + level.Info(partitionedGroupLogger).Log("msg", "total number of blocks marked for deletion during partitioned group info clean up", "count", deleteBlocksCount.Load()) }() + + blocksForDeletion := make([]any, 0, len(blocks)) for _, blockID := range blocks { + blocksForDeletion = append(blocksForDeletion, blockID) + } + + err := concurrency.ForEach(ctx, blocksForDeletion, defaultDeleteBlocksConcurrency, func(ctx context.Context, blockForDeletion any) error { + blockID := blockForDeletion.(ulid.ULID) if p.doesBlockExist(ctx, userBucket, partitionedGroupLogger, blockID) && !p.isBlockDeleted(ctx, userBucket, partitionedGroupLogger, blockID) && !p.isBlockNoCompact(ctx, userBucket, partitionedGroupLogger, blockID) { if err := block.MarkForDeletion(ctx, partitionedGroupLogger, userBucket, blockID, "delete block during partitioned group completion check", blocksMarkedForDeletion.WithLabelValues(userID, reasonValueRetention)); err != nil { level.Warn(partitionedGroupLogger).Log("msg", "unable to mark block for deletion", "block", blockID.String()) - return deleteBlocksCount, err + return err } - deleteBlocksCount++ + deleteBlocksCount.Add(1) level.Debug(partitionedGroupLogger).Log("msg", "marked block for deletion during partitioned group info clean up", "block", blockID.String()) } - } - return deleteBlocksCount, nil + return nil + }) + + return int(deleteBlocksCount.Load()), err } func (p *PartitionedGroupInfo) String() string {