diff --git a/cache/cache.go b/cache/cache.go index 451c38e4..ed8b221c 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -126,6 +126,20 @@ func (c *Cache[V]) Cleanup() uint64 { return totalFreed } +func (c *Cache[V]) Evict(key uint32) { + c.mu.Lock() + defer c.mu.Unlock() + + e, ok := c.payload[key] + if !ok { + // no key in cache + return + } + + delete(c.payload, key) + e.deleted = true +} + // Recreates the payload map. If len is too small, fraction is probably out of date and useless func (c *Cache[V]) recreatePayload() { if c.maxPayloadSize < recreateThreshold { // not large enough diff --git a/cmd/seq-db/seq-db.go b/cmd/seq-db/seq-db.go index 27712ee1..b0a7c47e 100644 --- a/cmd/seq-db/seq-db.go +++ b/cmd/seq-db/seq-db.go @@ -33,6 +33,8 @@ import ( "github.com/ozontech/seq-db/proxy/search" "github.com/ozontech/seq-db/proxy/stores" "github.com/ozontech/seq-db/proxyapi" + "github.com/ozontech/seq-db/seq" + "github.com/ozontech/seq-db/skipmaskmanager" "github.com/ozontech/seq-db/storage/s3" "github.com/ozontech/seq-db/storeapi" "github.com/ozontech/seq-db/tracing" @@ -316,10 +318,15 @@ func startStore( From: cfg.Filtering.From, }, }, + SkipMaskManagerConfig: skipmaskmanager.Config{ + DataDir: cfg.SkipMaskManager.DataDir, + Workers: cfg.SkipMaskManager.Workers, + CacheSizeLimit: uint64(cfg.SkipMaskManager.CacheSize), + }, } s3cli := initS3Client(cfg) - store, err := storeapi.NewStore(ctx, sconfig, s3cli, mp) + store, err := storeapi.NewStore(ctx, sconfig, s3cli, mp, skipMaskParamsFromCfg(cfg.SkipMaskManager.SkipMasks)) if err != nil { logger.Fatal("initializing store", zap.Error(err)) } @@ -361,3 +368,15 @@ func initS3Client(cfg config.Config) *s3.Client { func enableIndexingForAllFields(mappingPath string) bool { return mappingPath == "auto" } + +func skipMaskParamsFromCfg(in []config.SkipMaskParams) []skipmaskmanager.SkipMaskParams { + out := make([]skipmaskmanager.SkipMaskParams, 0, len(in)) + for _, f := range in { + out = append(out, 
skipmaskmanager.SkipMaskParams{ + Query: f.Query, + From: seq.TimeToMID(f.From), + To: seq.TimeToMID(f.To), + }) + } + return out +} diff --git a/config/config.go b/config/config.go index 00df0d94..28a67dc0 100644 --- a/config/config.go +++ b/config/config.go @@ -277,13 +277,14 @@ type Config struct { } `config:"tracing"` // Additional filtering options - Filtering struct { - // If a search query time range overlaps with the [from; to] range - // the search query will be `AND`-ed with an additional predicate with the provided query expression - Query string `config:"query"` - From time.Time `config:"from"` - To time.Time `config:"to"` - } `config:"filtering"` + Filtering SkipMaskParams `config:"filtering"` + + SkipMaskManager struct { + DataDir string `config:"data_dir"` + Workers int `config:"workers" default:"1"` + SkipMasks []SkipMaskParams `config:"skip_masks"` + CacheSize Bytes `config:"cache_size" default:"100MiB"` + } `config:"skip_mask_manager"` // Experimental provides flags // For configuring experimental features. 
@@ -295,6 +296,12 @@ type Config struct { } `config:"experimental"` } +type SkipMaskParams struct { + Query string `config:"query"` + From time.Time `config:"from"` + To time.Time `config:"to"` +} + type Bytes units.Base2Bytes func (b *Bytes) UnmarshalString(s string) error { diff --git a/filtermanager/filter_manager.go b/filtermanager/filter_manager.go deleted file mode 100644 index 30805d17..00000000 --- a/filtermanager/filter_manager.go +++ /dev/null @@ -1,493 +0,0 @@ -package filtermanager - -import ( - "context" - "fmt" - "math" - "os" - "path" - "runtime" - "strings" - "sync" - "time" - - "go.uber.org/zap" - - "github.com/ozontech/seq-db/cache" - "github.com/ozontech/seq-db/frac" - "github.com/ozontech/seq-db/frac/processor" - "github.com/ozontech/seq-db/fracmanager" - "github.com/ozontech/seq-db/logger" - "github.com/ozontech/seq-db/node" - "github.com/ozontech/seq-db/parser" - "github.com/ozontech/seq-db/seq" - "github.com/ozontech/seq-db/util" -) - -const ( - fracInQueueExt = ".queue" - fracDoneExt = ".filter" - tmpExt = ".tmp" - - tmpDirSuffix = "_tmp" -) - -const ( - defaultMaintenanceInterval = 30 * time.Second - defaultCacheCleanInterval = 10 * time.Millisecond - defaultCacheGCDelay = 1 * time.Second -) - -type MappingProvider interface { - GetMapping() seq.Mapping -} - -type Config struct { - DataDir string - Workers int - CacheSizeLimit uint64 -} - -type FilterManager struct { - ctx context.Context - - config Config - filters map[string]*Filter - - fracs map[string][]string - fracsMu *sync.RWMutex - - mp MappingProvider - - rateLimit chan struct{} - - maintenanceWG *sync.WaitGroup - maintenanceInterval time.Duration - maintenanceStop context.CancelFunc - - cacheCleanInterval time.Duration - cacheGCDelay time.Duration - - headersCache *cache.Cache[[]lidsBlockHeader] - headersCacheCleaner *cache.Cleaner -} - -func New( - ctx context.Context, - cfg Config, - params []Params, - mp MappingProvider, -) *FilterManager { - workers := cfg.Workers - if workers 
<= 0 { - workers = runtime.GOMAXPROCS(0) - } - - filtersMap := make(map[string]*Filter, len(params)) - - for _, p := range params { - f := NewFilter(p) - filtersMap[f.Hash()] = f - } - - cacheCleaner := cache.NewCleaner(cfg.CacheSizeLimit, nil) - - return &FilterManager{ - ctx: ctx, - config: cfg, - filters: filtersMap, - fracs: make(map[string][]string), - fracsMu: &sync.RWMutex{}, - mp: mp, - rateLimit: make(chan struct{}, workers), - maintenanceInterval: defaultMaintenanceInterval, - cacheCleanInterval: defaultCacheCleanInterval, - cacheGCDelay: defaultCacheGCDelay, - headersCache: cache.NewCache[[]lidsBlockHeader](cacheCleaner, nil), - headersCacheCleaner: cacheCleaner, - } -} - -func (fm *FilterManager) Start(ctx context.Context, fracs fracmanager.List) { - fm.createDataDir() - - err := fm.loadFilters() - if err != nil { - logger.Fatal("failed to load previous docs filters", zap.Error(err)) - } - - err = fm.buildQueue(fracs) - if err != nil { - logger.Fatal("failed to build docs filters queue", zap.Error(err)) - } - - ctx, cancel := context.WithCancel(ctx) - fm.maintenanceStop = cancel - fm.startMaintenance(ctx) - - go fm.cacheCleanLoop() - - mapping := fm.mp.GetMapping() - - go func() { - for _, f := range fm.filters { - ast, err := parser.ParseSeqQL(f.params.Query, mapping) - if err != nil { - panic(fmt.Errorf("BUG: search query must be valid: %s", err)) - } - f.ast = ast - - fm.processFilter(ctx, f, fracs.FilterInRange(f.params.From, f.params.To)) - } - }() -} - -func (fm *FilterManager) Stop() { - fm.maintenanceStop() - fm.maintenanceWG.Wait() -} - -func (fm *FilterManager) GetHideFlagIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, error) { - fm.fracsMu.RLock() - defer fm.fracsMu.RUnlock() - - fracFiles, has := fm.fracs[fracName] - if !has { - return &EmptyIterator{}, nil - } - - iterators := make([]node.Node, 0, len(fracFiles)) - for _, f := range fracFiles { - loader, err := newLoader(f, fm.headersCache) - if err != nil { 
- logger.Error("can't open filtered lids file", zap.String("path", f), zap.Error(err)) - return nil, err - } - if reverse { - iterators = append(iterators, (*IteratorAsc)(NewIterator(loader, minLID, maxLID))) - } else { - iterators = append(iterators, (*IteratorDesc)(NewIterator(loader, minLID, maxLID))) - } - } - - return NewNMergedIterators(iterators), nil -} - -// RefreshFrac replaces frac's filter files with newly found results. Used after active frac is sealed. -func (fm *FilterManager) RefreshFrac(fraction frac.Fraction) { - fm.fracsMu.RLock() - fracsFiles, has := fm.fracs[fraction.Info().Name()] - fm.fracsMu.RUnlock() - - if !has { - return - } - - for _, fileName := range fracsFiles { - filter := fm.filters[filterNameFromPath(fileName)] - - queueFilePath := path.Join(filter.dirPath, makeFileName(fraction.Info().Name(), fracInQueueExt)) - util.MustWriteFileAtomic(queueFilePath, []byte{}, 0o666, tmpExt) - - fm.rateLimit <- struct{}{} - go func() { - defer func() { <-fm.rateLimit }() - if err := fm.processFrac(fraction, filter, false); err != nil { - panic(fmt.Errorf("docs filter refresh frac err: %s", err)) - } - }() - } -} - -// RemoveFrac removes fraction's filter files. 
Used after frac is deleted -func (fm *FilterManager) RemoveFrac(fracName string) { - fm.fracsMu.RLock() - fracsFiles, has := fm.fracs[fracName] - fm.fracsMu.RUnlock() - - if !has { - return - } - - fm.fracsMu.Lock() - delete(fm.fracs, fracName) - fm.fracsMu.Unlock() - - for _, fileName := range fracsFiles { - util.RemoveFile(fileName) - } -} - -func filterNameFromPath(p string) string { - return path.Base(path.Dir(p)) -} - -func (fm *FilterManager) addDoneFrac(fracName, fracPath string) { - fm.fracsMu.Lock() - defer fm.fracsMu.Unlock() - - fm.fracs[fracName] = append(fm.fracs[fracName], fracPath) -} - -// loadFilters loads existing filters -func (fm *FilterManager) loadFilters() error { - des, err := os.ReadDir(fm.config.DataDir) - if err != nil { - return err - } - - var anyRemove bool - - for _, de := range des { - if !de.IsDir() { - continue - } - - if _, ok := fm.filters[de.Name()]; !ok { - logger.Info("there is filter folder on disk, but not in config. need to delete it.") - err := os.RemoveAll(path.Join(fm.config.DataDir, de.Name())) - if err != nil && !os.IsNotExist(err) { - return err - } - anyRemove = true - continue - } - - f := fm.filters[de.Name()] - f.status = StatusInProgress - f.dirPath = path.Join(fm.config.DataDir, de.Name()) - - filterDes, err := os.ReadDir(f.dirPath) - if err != nil { - return fmt.Errorf("reading directory: %s", err) - } - - var hasFracsInQueue bool - - for _, fde := range filterDes { - if fde.IsDir() { - continue - } - name := fde.Name() - - switch path.Ext(name) { - case fracInQueueExt: - hasFracsInQueue = true - case fracDoneExt: - fm.addDoneFrac(fracNameFromFilePath(name), path.Join(f.dirPath, name)) - } - } - - if !hasFracsInQueue { - f.status = StatusDone - } - } - - if anyRemove { - util.MustFsyncFile(fm.config.DataDir) - } - - return nil -} - -// buildQueue creates a directory for each of unprocessed filters and creates .queue files -func (fm *FilterManager) buildQueue(fracs fracmanager.List) error { - for _, filter := 
range fm.filters { - if filter.status != StatusCreated { - continue - } - - // create tmp dir - tmpDir := path.Join(fm.config.DataDir, fmt.Sprintf("%s%s", filter.Hash(), tmpDirSuffix)) - util.MustCreateDir(tmpDir) - - filterFracs := fracs.FilterInRange(seq.MID(filter.params.From), seq.MID(filter.params.To)) - for _, f := range filterFracs { - queueFilePath := path.Join(tmpDir, makeFileName(f.Info().Name(), fracInQueueExt)) - util.MustWriteFileAtomic(queueFilePath, []byte{}, 0o666, tmpExt) - } - - // rename tmp dir - dir := path.Join(fm.config.DataDir, filter.Hash()) - if err := os.Rename(tmpDir, dir); err != nil { - return err - } - util.MustFsyncFile(fm.config.DataDir) - filter.dirPath = dir - } - - return nil -} - -// handleFilter finds docs and writes to fs -func (fm *FilterManager) processFilter(ctx context.Context, filter *Filter, fracs fracmanager.List) { - if len(fracs) == 0 { - return - } - - fracsByName := make(map[string]frac.Fraction) - for _, f := range fracs { - fracsByName[f.Info().Name()] = f - } - - filterDes, err := os.ReadDir(filter.dirPath) - if err != nil { - panic(fmt.Errorf("BUG: reading directory must be successful: %s", err)) - } - - inProgressFilters.Add(1) - - processFracInQueue := func(name string) error { - f, ok := fracsByName[fracNameFromFilePath(name)] - if !ok { // skip missing fracs - return nil - } - - select { - case <-ctx.Done(): - return nil - case fm.rateLimit <- struct{}{}: - filter.processWg.Go(func() { - defer func() { <-fm.rateLimit }() - if err := fm.processFrac(f, filter, false); err != nil { - panic(fmt.Errorf("docs filter process frac err: %s", err)) - } - }) - } - return nil - } - _ = util.VisitFilesWithExt(filterDes, fracInQueueExt, processFracInQueue) - - go func() { - filter.processWg.Wait() - filter.markAsDone() - inProgressFilters.Add(-1) - }() -} - -func (fm *FilterManager) processFrac(f frac.Fraction, filter *Filter, refresh bool) error { - qpr, err := f.Search(fm.ctx, processor.SearchParams{ - AST: 
filter.ast.Root, - From: seq.MID(filter.params.From), - To: seq.MID(filter.params.To), - Limit: math.MaxInt64, - }) - if err != nil { - return err - } - - queueFilePath := path.Join(filter.dirPath, makeFileName(f.Info().Name(), fracInQueueExt)) - doneFilePath := path.Join(filter.dirPath, makeFileName(f.Info().Name(), fracDoneExt)) - - if len(qpr.IDs) == 0 { - util.RemoveFile(queueFilePath) - return nil - } - - // TODO: here we doing part of the work twice: - // first time we find LIDs inside f.Search() and then find IDs by these LIDs. - // Then we again find LIDs by earlier found IDs in f.FindLIDs(). - // We did it like this because otherwise we had to do serious f.Search() rewrite. - // For now we're ok with some performance penalty. - lids, err := f.FindLIDs(fm.ctx, qpr.IDs.IDs()) - if err != nil { - return err - } - - docsFilterBin := DocsFilterBinIn{LIDs: lids} - if err := writeDocsFilter(&docsFilterBin, queueFilePath, doneFilePath); err != nil { - return err - } - - if !refresh { - fm.addDoneFrac(f.Info().Name(), doneFilePath) - } - - return nil -} - -func (fm *FilterManager) startMaintenance(ctx context.Context) { - fm.maintenanceWG.Go(func() { - logger.Info("start docs filter maintenance") - util.RunEvery(ctx.Done(), fm.maintenanceInterval, func() { - logger.Info("docs filter maintenance iteration") - fm.checkDiskUsage() - }) - }) -} - -func (fm *FilterManager) cacheCleanLoop() { - runs := 0 - gcRunsCount := int(fm.cacheGCDelay / fm.cacheCleanInterval) - - for { - runs++ - fm.headersCacheCleaner.Cleanup(&cache.CleanStat{}) - fm.headersCacheCleaner.Rotate() - - if runs >= gcRunsCount { - runs = 0 - fm.headersCacheCleaner.CleanEmptyGenerations() - fm.headersCacheCleaner.ReleaseBuckets() - } - - time.Sleep(fm.cacheCleanInterval) - } -} - -func (fm *FilterManager) checkDiskUsage() { - du := int64(0) - - for _, f := range fm.filters { - des, err := os.ReadDir(f.dirPath) - if err != nil { - logger.Error("docs filter: can't read filter's dir", - 
zap.String("filter", f.String()), zap.Error(err)) - return - } - - for _, fde := range des { - if fde.IsDir() { - continue - } - info, err := fde.Info() - if err != nil { - logger.Error("docs filter: can't read filter file info", - zap.String("filter", f.String()), zap.Error(err)) - return - } - du += info.Size() - } - } - - diskUsage.Set(float64(du)) - storedFilters.Set(float64(len(fm.filters))) -} - -func makeFileName(name, ext string) string { - return name + ext -} - -func fracNameFromFilePath(filterFilePath string) string { - return strings.Split(path.Base(filterFilePath), ".")[0] -} - -var marshalBufferPool util.BufferPool - -func writeDocsFilter(df *DocsFilterBinIn, queueFilePath, doneFilePath string) error { - rawDocsFilter := marshalBufferPool.Get() - defer marshalBufferPool.Put(rawDocsFilter) - - rawDocsFilter.B = marshalDocsFilter(rawDocsFilter.B, df) - util.MustWriteFileAtomic(doneFilePath, rawDocsFilter.B, 0o666, tmpExt) - util.RemoveFile(queueFilePath) - - return nil -} - -// createDataDir creates data dir. 
-func (fm *FilterManager) createDataDir() { - if err := os.MkdirAll(fm.config.DataDir, 0o777); err != nil { - panic(err) - } -} diff --git a/frac/active.go b/frac/active.go index 3e890d01..7c3691c1 100644 --- a/frac/active.go +++ b/frac/active.go @@ -60,6 +60,8 @@ type Active struct { writer *ActiveWriter indexer *ActiveIndexer + + skipMaskProvider skipMaskProvider } const ( @@ -79,6 +81,7 @@ func NewActive( docsCache *cache.Cache[[]byte], sortCache *cache.Cache[[]byte], cfg *Config, + skipMaskProvider skipMaskProvider, ) *Active { docsFile, docsStats := mustOpenFile(baseFileName+consts.DocsFileSuffix, config.SkipFsync) @@ -108,6 +111,8 @@ func NewActive( BaseFileName: baseFileName, info: common.NewInfo(baseFileName, uint64(docsStats.Size()), metaSize), Config: cfg, + + skipMaskProvider: skipMaskProvider, } // use of 0 as keys in maps is prohibited – it's system key, so add first element @@ -412,6 +417,8 @@ func (f *Active) createDataProvider(ctx context.Context) *activeDataProvider { docsPositions: f.DocsPositions, idsToLids: f.IDsToLIDs, docsReader: &f.docsReader, + + skipMaskProvider: f.skipMaskProvider, } } diff --git a/frac/active_index.go b/frac/active_index.go index 71831c26..27e4e464 100644 --- a/frac/active_index.go +++ b/frac/active_index.go @@ -2,6 +2,8 @@ package frac import ( "context" + "math" + "slices" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/processor" @@ -29,6 +31,8 @@ type activeDataProvider struct { docsReader *storage.DocsReader idsIndex *activeIDsIndex + + skipMaskProvider skipMaskProvider } func (dp *activeDataProvider) release() { @@ -75,9 +79,12 @@ func (dp *activeDataProvider) Fetch(ids []seq.ID) ([][]byte, error) { res := make([][]byte, len(ids)) indexes := []activeFetchIndex{{ - blocksOffsets: dp.blocksOffsets, - docsPositions: dp.docsPositions, - docsReader: dp.docsReader, + blocksOffsets: dp.blocksOffsets, + docsPositions: dp.docsPositions, + idsToLids: dp.idsToLids, + docsReader: dp.docsReader, + 
skipMaskProvider: dp.skipMaskProvider, + fracName: dp.info.Name(), }} for _, fi := range indexes { @@ -117,6 +124,8 @@ func (dp *activeDataProvider) Search(params processor.SearchParams) (*seq.QPR, e indexes := []activeSearchIndex{{ activeIDsIndex: dp.getIDsIndex(), activeTokenIndex: dp.getTokenIndex(), + skipMaskProvider: dp.skipMaskProvider, + fracName: dp.info.Name(), }} m.Stop() @@ -178,6 +187,35 @@ func (p *activeIDsIndex) LessOrEqual(lid seq.LID, id seq.ID) bool { type activeSearchIndex struct { *activeIDsIndex *activeTokenIndex + skipMaskProvider skipMaskProvider + fracName string +} + +func (si *activeSearchIndex) GetSkipLIDs(minLID, maxLID uint32, reverse bool) (node.Node, bool, error) { + // active fraction doesn't meet min and max lid + minLID, maxLID = uint32(0), uint32(math.MaxUint32) + + iterator, has, err := si.skipMaskProvider.GetIDsIteratorByFrac(si.fracName, minLID, maxLID, reverse) + if err != nil { + return nil, false, err + } + + res := make([]uint32, 0) + for { + // traverse iterator to inverse and sort lids + lid := iterator.Next() + if lid.IsNull() { + break + } + if inversed, ok := si.activeIDsIndex.inverser.Inverse(lid.Unpack()); ok { + res = append(res, uint32(inversed)) + } + } + + // we need to sort inversed values since they may be out of order after replay of active fraction + slices.Sort(res) + + return node.NewStatic(res, reverse), has, nil } type activeTokenIndex struct { @@ -221,21 +259,57 @@ func inverseLIDs(unmapped []uint32, inv *inverser, minLID, maxLID uint32) []uint } type activeFetchIndex struct { - blocksOffsets []uint64 - docsPositions *DocsPositions - docsReader *storage.DocsReader + blocksOffsets []uint64 + docsPositions *DocsPositions + idsToLids *ActiveLIDs + docsReader *storage.DocsReader + skipMaskProvider skipMaskProvider + fracName string } func (di *activeFetchIndex) GetBlocksOffsets(num uint32) uint64 { return di.blocksOffsets[num] } -func (di *activeFetchIndex) GetDocPos(ids []seq.ID) []seq.DocPos { +func (di 
*activeFetchIndex) GetDocPos(ids []seq.ID) ([]seq.DocPos, error) { docsPos := make([]seq.DocPos, len(ids)) for i, id := range ids { docsPos[i] = di.docsPositions.GetSync(id) } - return docsPos + + minLID, maxLID := uint32(0), uint32(math.MaxUint32) + skipLIDsIterator, has, err := di.skipMaskProvider.GetIDsIteratorByFrac(di.fracName, minLID, maxLID, false) + if err != nil { + return nil, err + } + + if !has { + return docsPos, nil + } + + allLids := make([]uint32, len(ids)) + for i, id := range ids { + if lid, ok := di.idsToLids.Get(id); ok { + allLids[i] = uint32(lid) + } + } + + skipLIDs := make(map[uint32]struct{}) + for { + lid := skipLIDsIterator.Next() + if lid.IsNull() { + break + } + skipLIDs[lid.Unpack()] = struct{}{} + } + + for i, lid := range allLids { + if _, ok := skipLIDs[lid]; ok { + docsPos[i] = seq.DocPosNotFound + } + } + + return docsPos, nil } func (di *activeFetchIndex) ReadDocs(blockOffset uint64, docOffsets []uint64) ([][]byte, error) { diff --git a/frac/active_indexer_test.go b/frac/active_indexer_test.go index 0b75b59e..a1200a7c 100644 --- a/frac/active_indexer_test.go +++ b/frac/active_indexer_test.go @@ -90,6 +90,7 @@ func BenchmarkIndexer(b *testing.B) { cache.NewCache[[]byte](nil, nil), cache.NewCache[[]byte](nil, nil), &Config{}, + testSkipMaskProvider{}, ) processor := getTestProcessor() diff --git a/frac/fraction_concurrency_test.go b/frac/fraction_concurrency_test.go index 70af8f46..a5c19b22 100644 --- a/frac/fraction_concurrency_test.go +++ b/frac/fraction_concurrency_test.go @@ -52,6 +52,7 @@ func TestConcurrentAppendAndQuery(t *testing.T) { cache.NewCache[[]byte](nil, nil), cache.NewCache[[]byte](nil, nil), &Config{}, + testSkipMaskProvider{}, ) mapping := seq.Mapping{ @@ -368,6 +369,7 @@ func seal(active *Active) (*Sealed, error) { indexCache, cache.NewCache[[]byte](nil, nil), &Config{}, + testSkipMaskProvider{}, ) active.Release() return sealed, nil diff --git a/frac/fraction_test.go b/frac/fraction_test.go index 
8e44f566..ec5f3d85 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -27,6 +27,7 @@ import ( "github.com/ozontech/seq-db/frac/sealed/seqids" "github.com/ozontech/seq-db/frac/sealed/token" "github.com/ozontech/seq-db/indexer" + "github.com/ozontech/seq-db/node" "github.com/ozontech/seq-db/parser" "github.com/ozontech/seq-db/seq" "github.com/ozontech/seq-db/storage" @@ -34,6 +35,13 @@ import ( "github.com/ozontech/seq-db/tokenizer" ) +type testSkipMaskProvider struct{} + +func (testSkipMaskProvider) GetIDsIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, bool, error) { + return node.NewStatic([]uint32{}, false), false, nil +} +func (testSkipMaskProvider) RemoveFrac(_ string) {} + type FractionTestSuite struct { suite.Suite tmpDir string @@ -2039,6 +2047,7 @@ func (s *FractionTestSuite) newActive(bulks ...[]string) *Active { cache.NewCache[[]byte](nil, nil), cache.NewCache[[]byte](nil, nil), s.config, + testSkipMaskProvider{}, ) var wg sync.WaitGroup @@ -2101,6 +2110,7 @@ func (s *FractionTestSuite) newSealed(bulks ...[]string) *Sealed { indexCache, cache.NewCache[[]byte](nil, nil), s.config, + testSkipMaskProvider{}, ) active.Release() return sealed @@ -2177,7 +2187,9 @@ func (s *ActiveReplayedFractionTestSuite) Replay(frac *Active) Fraction { storage.NewReadLimiter(1, nil), cache.NewCache[[]byte](nil, nil), cache.NewCache[[]byte](nil, nil), - &Config{}) + &Config{}, + testSkipMaskProvider{}, + ) err := replayedFrac.Replay(context.Background()) s.Require().NoError(err, "replay failed") return replayedFrac @@ -2288,7 +2300,9 @@ func (s *SealedLoadedFractionTestSuite) newSealedLoaded(bulks ...[]string) *Seal indexCache, cache.NewCache[[]byte](nil, nil), nil, - s.config) + s.config, + testSkipMaskProvider{}, + ) s.fraction = sealed return sealed } @@ -2356,7 +2370,9 @@ func (s *RemoteFractionTestSuite) SetupTest() { cache.NewCache[[]byte](nil, nil), sealed.info, s.config, - s3cli) + s3cli, + testSkipMaskProvider{}, + ) 
s.fraction = remoteFrac } } diff --git a/frac/processor/eval_tree.go b/frac/processor/eval_tree.go index 0f260053..b152b3bc 100644 --- a/frac/processor/eval_tree.go +++ b/frac/processor/eval_tree.go @@ -88,6 +88,11 @@ func evalLeaf( return node.BuildORTree(lidsTids), nil } +func evalSkipLIDs(root, skipLIDsIterator node.Node, stats *searchStats) node.Node { + stats.NodesTotal++ + return node.NewNAnd(skipLIDsIterator, root) +} + type Aggregator interface { // Next iterates to count the next lid. Next(lid node.LID) error diff --git a/frac/processor/fetch.go b/frac/processor/fetch.go index 41d46152..e2267780 100644 --- a/frac/processor/fetch.go +++ b/frac/processor/fetch.go @@ -7,13 +7,16 @@ import ( type fetchIndex interface { GetBlocksOffsets(uint32) uint64 - GetDocPos([]seq.ID) []seq.DocPos + GetDocPos([]seq.ID) ([]seq.DocPos, error) ReadDocs(blockOffset uint64, docOffsets []uint64) ([][]byte, error) } func IndexFetch(ids []seq.ID, sw *stopwatch.Stopwatch, fetchIndex fetchIndex, res [][]byte) error { m := sw.Start("get_docs_pos") - docsPos := fetchIndex.GetDocPos(ids) + docsPos, err := fetchIndex.GetDocPos(ids) + if err != nil { + return err + } blocks, offsets, index := seq.GroupDocsOffsets(docsPos) m.Stop() diff --git a/frac/processor/search.go b/frac/processor/search.go index 21fb4e8a..3c08eae0 100644 --- a/frac/processor/search.go +++ b/frac/processor/search.go @@ -38,6 +38,7 @@ type tokenIndex interface { type searchIndex interface { tokenIndex idsIndex + GetSkipLIDs(minLID, maxLID uint32, reverse bool) (node.Node, bool, error) } func IndexSearch( @@ -93,6 +94,19 @@ func IndexSearch( } } + m = sw.Start("get_skip_lids") + skipLIDs, hasSkipLIDs, err := index.GetSkipLIDs(minLID, maxLID, params.Order.IsReverse()) + m.Stop() + if err != nil { + return nil, err + } + + if hasSkipLIDs { + m = sw.Start("eval_skip_lids") + evalTree = evalSkipLIDs(evalTree, skipLIDs, stats) + m.Stop() + } + m = sw.Start("iterate_eval_tree") total, ids, histogram, aggs, err := 
iterateEvalTree(ctx, params, index, evalTree, aggSupplier, sw) m.Stop() diff --git a/frac/remote.go b/frac/remote.go index e15aa73f..7658e80e 100644 --- a/frac/remote.go +++ b/frac/remote.go @@ -55,6 +55,8 @@ type Remote struct { s3cli *s3.Client readLimiter *storage.ReadLimiter + + skipMaskProvider skipMaskProvider } func NewRemote( @@ -66,6 +68,7 @@ func NewRemote( info *common.Info, config *Config, s3cli *s3.Client, + skipMaskProvider skipMaskProvider, ) *Remote { f := &Remote{ ctx: ctx, @@ -81,6 +84,8 @@ func NewRemote( Config: config, s3cli: s3cli, + + skipMaskProvider: skipMaskProvider, } // Fast path if fraction-info cache exists AND it has valid index size. @@ -170,6 +175,7 @@ func (f *Remote) createDataProvider(ctx context.Context) (*sealedDataProvider, e &f.blocksData.IDsTable, f.info.BinaryDataVer, ), + skipMaskProvider: f.skipMaskProvider, }, nil } @@ -201,6 +207,8 @@ func (f *Remote) Suicide() { zap.Error(err), ) } + + f.skipMaskProvider.RemoveFrac(f.info.Name()) } func (f *Remote) String() string { diff --git a/frac/sealed.go b/frac/sealed.go index ddae8ccf..bda4fc72 100644 --- a/frac/sealed.go +++ b/frac/sealed.go @@ -51,6 +51,8 @@ type Sealed struct { // shit for testing PartialSuicideMode PSD + + skipMaskProvider skipMaskProvider } type PSD int // emulates hard shutdown on different stages of fraction deletion, used for tests @@ -68,6 +70,7 @@ func NewSealed( docsCache *cache.Cache[[]byte], info *common.Info, config *Config, + skipMaskProvider skipMaskProvider, ) *Sealed { f := &Sealed{ loadMu: &sync.RWMutex{}, @@ -81,6 +84,8 @@ func NewSealed( Config: config, PartialSuicideMode: Off, + + skipMaskProvider: skipMaskProvider, } // fast path if fraction-info cache exists AND it has valid index size @@ -130,6 +135,7 @@ func NewSealedPreloaded( indexCache *IndexCache, docsCache *cache.Cache[[]byte], config *Config, + skipMaskProvider skipMaskProvider, ) *Sealed { f := &Sealed{ blocksData: preloaded.BlocksData, @@ -144,6 +150,8 @@ func 
NewSealedPreloaded( info: preloaded.Info, BaseFileName: baseFile, Config: config, + + skipMaskProvider: skipMaskProvider, } // put the token table built during sealing into the cache of the sealed fraction @@ -292,6 +300,8 @@ func (f *Sealed) Suicide() { zap.Error(err), ) } + + f.skipMaskProvider.RemoveFrac(f.info.Name()) } func (f *Sealed) String() string { @@ -343,6 +353,8 @@ func (f *Sealed) createDataProvider(ctx context.Context) *sealedDataProvider { &f.blocksData.IDsTable, f.info.BinaryDataVer, ), + + skipMaskProvider: f.skipMaskProvider, } } diff --git a/frac/sealed_index.go b/frac/sealed_index.go index 3899124a..7c62713c 100644 --- a/frac/sealed_index.go +++ b/frac/sealed_index.go @@ -22,6 +22,11 @@ import ( "github.com/ozontech/seq-db/util" ) +type skipMaskProvider interface { + GetIDsIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, bool, error) + RemoveFrac(fracName string) +} + type sealedDataProvider struct { ctx context.Context info *common.Info @@ -42,6 +47,8 @@ type sealedDataProvider struct { // fractionTypeLabel can be either 'sealed' or 'remote'. // This value is used in metrics to distinguish between operations over local and remote fractions. 
fractionTypeLabel string + + skipMaskProvider skipMaskProvider } func (dp *sealedDataProvider) getIDsIndex() *sealedIDsIndex { @@ -54,9 +61,11 @@ func (dp *sealedDataProvider) getIDsIndex() *sealedIDsIndex { func (dp *sealedDataProvider) getFetchIndex() *sealedFetchIndex { return &sealedFetchIndex{ - idsIndex: dp.getIDsIndex(), - docsReader: dp.docsReader, - blocksOffsets: dp.blocksOffsets, + fracName: dp.info.Name(), + idsIndex: dp.getIDsIndex(), + docsReader: dp.docsReader, + blocksOffsets: dp.blocksOffsets, + skipMaskProvider: dp.skipMaskProvider, } } @@ -74,6 +83,7 @@ func (dp *sealedDataProvider) getSearchIndex() *sealedSearchIndex { return &sealedSearchIndex{ sealedIDsIndex: dp.getIDsIndex(), sealedTokenIndex: dp.getTokenIndex(), + skipMaskProvider: dp.skipMaskProvider, } } @@ -259,17 +269,56 @@ func (ti *sealedTokenIndex) GetLIDsFromTIDs(tids []uint32, stats lids.Counter, m } type sealedFetchIndex struct { - idsIndex *sealedIDsIndex - docsReader *storage.DocsReader - blocksOffsets []uint64 + fracName string + idsIndex *sealedIDsIndex + docsReader *storage.DocsReader + blocksOffsets []uint64 + skipMaskProvider skipMaskProvider } func (fi *sealedFetchIndex) GetBlocksOffsets(num uint32) uint64 { return fi.blocksOffsets[num] } -func (fi *sealedFetchIndex) GetDocPos(ids []seq.ID) []seq.DocPos { - return fi.getDocPosByLIDs(fi.findLIDs(ids)) +func (fi *sealedFetchIndex) GetDocPos(ids []seq.ID) ([]seq.DocPos, error) { + allLids := fi.findLIDs(ids) + + minLID, maxLID := uint32(0), uint32(math.MaxUint32) + if len(allLids) > 0 { + // allLids can be not sorted + minVal, maxVal := allLids[0], allLids[0] + for i := 1; i < len(allLids); i++ { + minVal = min(minVal, allLids[i]) + maxVal = max(maxVal, allLids[i]) + } + minLID, maxLID = uint32(minVal), uint32(maxVal) + } + + skipLIDsIterator, has, err := fi.skipMaskProvider.GetIDsIteratorByFrac(fi.fracName, minLID, maxLID, false) + if err != nil { + return nil, err + } + + if !has { + return fi.getDocPosByLIDs(allLids), nil + 
} + + skipLIDs := make(map[uint32]struct{}) + for { + lid := skipLIDsIterator.Next() + if lid.IsNull() { + break + } + skipLIDs[lid.Unpack()] = struct{}{} + } + + for i, lid := range allLids { + if _, ok := skipLIDs[uint32(lid)]; ok { + allLids[i] = 0 + } + } + + return fi.getDocPosByLIDs(allLids), nil } func (fi *sealedFetchIndex) ReadDocs(blockOffset uint64, docOffsets []uint64) ([][]byte, error) { @@ -324,4 +373,9 @@ func (fi *sealedFetchIndex) getDocPosByLIDs(localIDs []seq.LID) []seq.DocPos { type sealedSearchIndex struct { *sealedIDsIndex *sealedTokenIndex + skipMaskProvider skipMaskProvider +} + +func (si *sealedSearchIndex) GetSkipLIDs(minLID, maxLID uint32, reverse bool) (node.Node, bool, error) { + return si.skipMaskProvider.GetIDsIteratorByFrac(si.fracName, minLID, maxLID, reverse) } diff --git a/fracmanager/fracmanager.go b/fracmanager/fracmanager.go index bd5b4f19..b35066a9 100644 --- a/fracmanager/fracmanager.go +++ b/fracmanager/fracmanager.go @@ -35,13 +35,13 @@ var defaultStorageState = StorageState{ // - stats updating // // Returns the manager instance and a stop function to gracefully shutdown -func New(ctx context.Context, cfg *Config, s3cli *s3.Client) (*FracManager, func(), error) { +func New(ctx context.Context, cfg *Config, s3cli *s3.Client, skipMaskProvider skipMaskProvider) (*FracManager, func(), error) { FillConfigWithDefault(cfg) readLimiter := storage.NewReadLimiter(config.ReaderWorkers, storeBytesRead) idx, stopIdx := frac.NewActiveIndexer(config.IndexWorkers, config.IndexWorkers) cache := NewCacheMaintainer(cfg.CacheSize, cfg.SortCacheSize, newDefaultCacheMetrics()) - provider := newFractionProvider(cfg, s3cli, cache, readLimiter, idx) + provider := newFractionProvider(cfg, s3cli, cache, readLimiter, idx, skipMaskProvider) infoCache := NewFracInfoCache(filepath.Join(cfg.DataDir, consts.FracCacheFileSuffix)) // Load existing fractions into registry diff --git a/fracmanager/fracmanager_test.go b/fracmanager/fracmanager_test.go index 
89437904..663dedec 100644 --- a/fracmanager/fracmanager_test.go +++ b/fracmanager/fracmanager_test.go @@ -8,9 +8,18 @@ import ( "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/indexer" + "github.com/ozontech/seq-db/node" "github.com/ozontech/seq-db/seq" ) +type testSkipMaskProvider struct{} + +func (testSkipMaskProvider) GetIDsIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, bool, error) { + return node.NewStatic([]uint32{}, reverse), false, nil +} +func (testSkipMaskProvider) RefreshFrac(_ frac.Fraction) {} +func (testSkipMaskProvider) RemoveFrac(_ string) {} + func setupDataDir(t testing.TB, cfg *Config) *Config { if cfg == nil { cfg = &Config{ @@ -25,7 +34,7 @@ func setupDataDir(t testing.TB, cfg *Config) *Config { func setupFracManager(t testing.TB, cfg *Config) (*Config, *FracManager, func()) { cfg = setupDataDir(t, cfg) - fm, stop, err := New(t.Context(), cfg, nil) + fm, stop, err := New(t.Context(), cfg, nil, testSkipMaskProvider{}) assert.NoError(t, err) return cfg, fm, stop } diff --git a/fracmanager/fraction_provider.go b/fracmanager/fraction_provider.go index e2915598..73deb907 100644 --- a/fracmanager/fraction_provider.go +++ b/fracmanager/fraction_provider.go @@ -13,34 +13,44 @@ import ( "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/sealed" "github.com/ozontech/seq-db/frac/sealed/sealing" + "github.com/ozontech/seq-db/node" "github.com/ozontech/seq-db/storage" "github.com/ozontech/seq-db/storage/s3" ) const fileBasePattern = "seq-db-" +type skipMaskProvider interface { + GetIDsIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, bool, error) + RefreshFrac(frac frac.Fraction) + RemoveFrac(fracName string) +} + // fractionProvider is a factory for creating different types of fractions // Contains all necessary dependencies for creating and managing fractions type fractionProvider struct { - s3cli *s3.Client // Client for S3 storage operations - config 
*Config // Fraction manager configuration - cacheProvider *CacheMaintainer // Cache provider for data access optimization - activeIndexer *frac.ActiveIndexer // Indexer for active fractions - readLimiter *storage.ReadLimiter // Read rate limiter - ulidEntropy io.Reader // Entropy source for ULID generation + s3cli *s3.Client // Client for S3 storage operations + config *Config // Fraction manager configuration + cacheProvider *CacheMaintainer // Cache provider for data access optimization + activeIndexer *frac.ActiveIndexer // Indexer for active fractions + readLimiter *storage.ReadLimiter // Read rate limiter + ulidEntropy io.Reader // Entropy source for ULID generation + skipMaskProvider skipMaskProvider } func newFractionProvider( cfg *Config, s3cli *s3.Client, cp *CacheMaintainer, readLimiter *storage.ReadLimiter, indexer *frac.ActiveIndexer, + skipMaskProvider skipMaskProvider, ) *fractionProvider { return &fractionProvider{ - s3cli: s3cli, - config: cfg, - cacheProvider: cp, - activeIndexer: indexer, - readLimiter: readLimiter, - ulidEntropy: ulid.Monotonic(rand.New(rand.NewSource(time.Now().UnixNano())), 0), + s3cli: s3cli, + config: cfg, + cacheProvider: cp, + activeIndexer: indexer, + readLimiter: readLimiter, + ulidEntropy: ulid.Monotonic(rand.New(rand.NewSource(time.Now().UnixNano())), 0), + skipMaskProvider: skipMaskProvider, } } @@ -52,6 +62,7 @@ func (fp *fractionProvider) NewActive(name string) *frac.Active { fp.cacheProvider.CreateDocBlockCache(), fp.cacheProvider.CreateSortDocsCache(), &fp.config.Fraction, + fp.skipMaskProvider, ) } @@ -63,6 +74,7 @@ func (fp *fractionProvider) NewSealed(name string, cachedInfo *common.Info) *fra fp.cacheProvider.CreateDocBlockCache(), cachedInfo, // Preloaded meta information &fp.config.Fraction, + fp.skipMaskProvider, ) } @@ -74,6 +86,7 @@ func (fp *fractionProvider) NewSealedPreloaded(name string, preloadedData *seale fp.cacheProvider.CreateIndexCache(), fp.cacheProvider.CreateDocBlockCache(), 
&fp.config.Fraction, + fp.skipMaskProvider, ) } @@ -87,6 +100,7 @@ func (fp *fractionProvider) NewRemote(ctx context.Context, name string, cachedIn cachedInfo, &fp.config.Fraction, fp.s3cli, + fp.skipMaskProvider, ) } @@ -117,7 +131,9 @@ func (fp *fractionProvider) Seal(active *frac.Active) (*frac.Sealed, error) { return nil, err } - return fp.NewSealedPreloaded(active.BaseFileName, preloaded), nil + sealedFrac := fp.NewSealedPreloaded(active.BaseFileName, preloaded) + fp.skipMaskProvider.RefreshFrac(sealedFrac) + return sealedFrac, nil } // Offload uploads fraction to S3 storage and returns a remote fraction diff --git a/fracmanager/fraction_provider_test.go b/fracmanager/fraction_provider_test.go index 5dffeeee..aae4e820 100644 --- a/fracmanager/fraction_provider_test.go +++ b/fracmanager/fraction_provider_test.go @@ -38,7 +38,7 @@ func setupFractionProvider(t testing.TB, cfg *Config) (*fractionProvider, func() s3cli, stopS3 := setupS3Client(t) idx, stopIdx := frac.NewActiveIndexer(1, 1) cache := NewCacheMaintainer(uint64(units.MB), uint64(units.MB), nil) - provider := newFractionProvider(cfg, s3cli, cache, rl, idx) + provider := newFractionProvider(cfg, s3cli, cache, rl, idx, testSkipMaskProvider{}) return provider, func() { stopIdx() stopS3() @@ -46,7 +46,7 @@ func setupFractionProvider(t testing.TB, cfg *Config) (*fractionProvider, func() } func TestFractionID(t *testing.T) { - fp := newFractionProvider(nil, nil, nil, nil, nil) + fp := newFractionProvider(nil, nil, nil, nil, nil, nil) ulid1 := fp.nextFractionID() ulid2 := fp.nextFractionID() assert.NotEqual(t, ulid1, ulid2, "ULIDs should be different") diff --git a/filtermanager/encoding.go b/skipmaskmanager/encoding.go similarity index 58% rename from filtermanager/encoding.go rename to skipmaskmanager/encoding.go index 26a6fa46..c0bd9c81 100644 --- a/filtermanager/encoding.go +++ b/skipmaskmanager/encoding.go @@ -1,4 +1,4 @@ -package filtermanager +package skipmaskmanager import ( "encoding/binary" @@ -12,40 
+12,51 @@ import ( "github.com/ozontech/seq-db/zstd" ) -type DocsFilterBinIn struct { +// SkipMaskBinIn is the input structure for serializing a skip mask. +// It contains a slice of Local IDs (LIDs) that correspond to documents +// matching the skip mask query criteria. +type SkipMaskBinIn struct { LIDs []seq.LID } -type DocsFilterBinOut struct { +// SkipMaskBinOut is the output structure for deserialized skip mask data. +// After unmarshaling, LIDs are converted to uint32 array. +type SkipMaskBinOut struct { LIDs []uint32 } -type docsFilterBinVersion uint8 +type skipMaskBinVersion uint8 const ( - docsFilterBinVersion1 docsFilterBinVersion = iota + 1 + skipMaskBinVersion1 skipMaskBinVersion = iota + 1 ) -var availableVersions = map[docsFilterBinVersion]struct{}{ - docsFilterBinVersion1: {}, +var availableVersions = map[skipMaskBinVersion]struct{}{ + skipMaskBinVersion1: {}, } +// lidsCodec represents the compression codec used for LIDs block encoding. type lidsCodec byte const ( - lidsCodecDelta = 1 - lidsCodecDeltaZstd = 2 + lidsCodecDelta = 1 // Delta-encoded varints without compression + lidsCodecDeltaZstd = 2 // Delta-encoded varints with zstd compression ) +// lidsBlockHeader contains metadata for a block of LIDs. +// Each block stores a subset of LIDs (up to maxLIDsBlockLen) along with +// information needed to decode and locate the block data. type lidsBlockHeader struct { - Codec lidsCodec - Length uint32 // Number of LIDs in block - MinLID uint32 - MaxLID uint32 - Size uint32 // Size of ids block in bytes. 
- Offset uint64 // block's offset in file + Codec lidsCodec // Compression codec used for this block (delta or delta+zstd) + Length uint32 // Number of LIDs in this block + MinLID uint32 // Minimum LID value in the block + MaxLID uint32 // Maximum LID value in the block + Size uint32 // Size of the compressed block data in bytes + Offset uint64 // Offset of the block data in the file } +// marshal serializes the block header into the provided byte slice. +// The format is: Codec (1 byte) + Length (4 bytes) + MinLID (4 bytes) + MaxLID (4 bytes) + Size (4 bytes) + Offset (8 bytes) = 25 bytes. func (h *lidsBlockHeader) marshal(dst []byte) { if len(dst) < int(lidsBlockHeaderSizeBytes) { panic("BUG: marshal lidsBlockHeader: len(dst) is less than header size") @@ -65,6 +76,8 @@ func (h *lidsBlockHeader) marshal(dst []byte) { dst = dst[sizeOfUint64:] } +// unmarshal deserializes a block header from the provided byte slice. +// Returns the remaining unconsumed bytes and any error encountered. func (h *lidsBlockHeader) unmarshal(src []byte) ([]byte, error) { if len(src) < int(lidsBlockHeaderSizeBytes) { return src, errors.New("too few bytes") @@ -86,24 +99,31 @@ func (h *lidsBlockHeader) unmarshal(src []byte) ([]byte, error) { return src, nil } -func marshalDocsFilter(dst []byte, in *DocsFilterBinIn) []byte { - dst = append(dst, uint8(docsFilterBinVersion1)) +// marshalSkipMask serializes a skip mask into binary format. +// Returns the serialized data with the version byte prepended. 
+func marshalSkipMask(dst []byte, in *SkipMaskBinIn) []byte { + dst = append(dst, uint8(skipMaskBinVersion1)) dst = marshalLIDsBlocks(dst, in.LIDs) return dst } const ( - sizeOfUint32 = unsafe.Sizeof(uint32(0)) - sizeOfUint64 = unsafe.Sizeof(uint64(0)) + sizeOfUint32 = unsafe.Sizeof(uint32(0)) // 4 bytes + sizeOfUint64 = unsafe.Sizeof(uint64(0)) // 8 bytes ) const ( + // lidsBlockHeaderSizeBytes is the size of a single block header in bytes: 1 (Codec) + 4*4 (Length, MinLID, MaxLID, Size) + 8 (Offset) = 25 lidsBlockHeaderSizeBytes = 1 + (4 * sizeOfUint32) + sizeOfUint64 - maxLIDsBlockLen = 1024 + // maxLIDsBlockLen is the maximum number of LIDs stored in a single block + maxLIDsBlockLen = 1024 ) var lidsBlockBufPool util.BufferPool +// marshalLIDsBlocks splits the input LIDs into blocks and serializes them. +// Each block contains up to maxLIDsBlockLen LIDs. The output format is: +// [number of blocks: 4 bytes] [block 1 header] [block 2 header] ... [block 1 data] [block 2 data] ... func marshalLIDsBlocks(dst []byte, in []seq.LID) []byte { b := lidsBlockBufPool.Get() defer lidsBlockBufPool.Put(b) @@ -144,6 +164,10 @@ func marshalLIDsBlocks(dst []byte, in []seq.LID) []byte { return dst } +// marshalLIDsBlock encodes a slice of LIDs using delta compression. +// It first computes delta-encoded varints, then attempts zstd compression. +// If zstd provides at least 5% compression, it uses zstd; otherwise, it stores +// the raw delta-encoded data. Returns the encoded data and the codec used. 
func marshalLIDsBlock(dst []byte, in []seq.LID) ([]byte, lidsCodec) { b := lidsBlockBufPool.Get() defer lidsBlockBufPool.Put(b) @@ -168,17 +192,19 @@ func marshalLIDsBlock(dst []byte, in []seq.LID) ([]byte, lidsCodec) { return dst, lidsCodecDeltaZstd } -const minLIDsFIlterBytesLen = 10 // 1 byte lidsBinVersion + 8 byte number of LIDs + N (min 1) bytes varint + delta encoded LIDs +const minSkipMaskBytesLen = 10 // 1 byte skipMaskBinVersion + 8 byte number of LIDs + N (min 1) bytes varint + delta encoded LIDs -func unmarshalDocsFilter(dst *DocsFilterBinOut, src []byte) (_ []byte, err error) { - if len(src) < minLIDsFIlterBytesLen { - return nil, fmt.Errorf("invalid LIDs filter format; want %d bytes, got %d", minLIDsFIlterBytesLen, len(src)) +// unmarshalSkipMask deserializes a skip mask from binary format. +// Validates the version and delegates to unmarshalLIDsBlocks for block processing. +func unmarshalSkipMask(dst *SkipMaskBinOut, src []byte) (_ []byte, err error) { + if len(src) < minSkipMaskBytesLen { + return nil, fmt.Errorf("invalid skip mask format; want %d bytes, got %d", minSkipMaskBytesLen, len(src)) } - version := docsFilterBinVersion(src[0]) + version := skipMaskBinVersion(src[0]) src = src[1:] if _, ok := availableVersions[version]; !ok { - return nil, fmt.Errorf("invalid LIDs binary version: %d", version) + return nil, fmt.Errorf("invalid skip mask binary version: %d", version) } dst.LIDs, src, err = unmarshalLIDsBlocks(dst.LIDs, src) @@ -189,6 +215,9 @@ func unmarshalDocsFilter(dst *DocsFilterBinOut, src []byte) (_ []byte, err error return src, nil } +// unmarshalLIDsBlocks reads all LIDs blocks from the source data. +// First reads the number of blocks, then parses each block header, +// and finally decodes each block's data. 
func unmarshalLIDsBlocks(dst []uint32, src []byte) ([]uint32, []byte, error) { numberOfBlocks := binary.LittleEndian.Uint32(src) src = src[sizeOfUint32:] @@ -219,6 +248,8 @@ func unmarshalLIDsBlocks(dst []uint32, src []byte) ([]uint32, []byte, error) { return dst, src, nil } +// unmarshalLIDsBlock decodes a single LIDs block based on its header. +// Handles both compressed (zstd) and uncompressed codec types. func unmarshalLIDsBlock(dst []uint32, src []byte, header lidsBlockHeader) ([]uint32, []byte, error) { if len(src) == 0 { return dst, src, fmt.Errorf("empty LIDs block") @@ -274,6 +305,9 @@ func unmarshalLIDsDelta(dst []uint32, block []byte, header lidsBlockHeader) ([]u return dst, nil } +// getCompressLevel returns the appropriate zstd compression level based on data size. +// Higher compression levels are used for larger data to achieve better ratios. +// Returns: 1 for <=512 bytes, 2 for <=4KB, 3 for larger data. func getCompressLevel(size int) int { level := 3 if size <= 512 { diff --git a/filtermanager/encoding_test.go b/skipmaskmanager/encoding_test.go similarity index 58% rename from filtermanager/encoding_test.go rename to skipmaskmanager/encoding_test.go index 8af0b7a3..38647571 100644 --- a/filtermanager/encoding_test.go +++ b/skipmaskmanager/encoding_test.go @@ -1,4 +1,4 @@ -package filtermanager +package skipmaskmanager import ( "testing" @@ -9,28 +9,28 @@ import ( "github.com/ozontech/seq-db/seq" ) -func TestMarshalUnmarshalLIDsFilter(t *testing.T) { - test := func(df DocsFilterBinIn) { +func TestMarshalUnmarshalSkipMask(t *testing.T) { + test := func(df SkipMaskBinIn) { t.Helper() - rawDocsFilter := marshalDocsFilter(nil, &df) - var out DocsFilterBinOut - tail, err := unmarshalDocsFilter(&out, rawDocsFilter) + rawSkipMask := marshalSkipMask(nil, &df) + var out SkipMaskBinOut + tail, err := unmarshalSkipMask(&out, rawSkipMask) require.NoError(t, err) require.Equal(t, 0, len(tail)) assert.Equal(t, lidsToUint32s(df.LIDs), out.LIDs) } - 
test(DocsFilterBinIn{LIDs: []seq.LID{0, 1, 2, 3}}) - test(DocsFilterBinIn{LIDs: []seq.LID{10, 15, 22, 18, 105, 1010}}) - test(DocsFilterBinIn{LIDs: []seq.LID{11}}) + test(SkipMaskBinIn{LIDs: []seq.LID{0, 1, 2, 3}}) + test(SkipMaskBinIn{LIDs: []seq.LID{10, 15, 22, 18, 105, 1010}}) + test(SkipMaskBinIn{LIDs: []seq.LID{11}}) multipleBlocksSize := maxLIDsBlockLen*3 + 15 multipleBlocksLIDs := make([]seq.LID, 0, multipleBlocksSize) for i := range multipleBlocksSize { multipleBlocksLIDs = append(multipleBlocksLIDs, seq.LID(i)) } - test(DocsFilterBinIn{LIDs: multipleBlocksLIDs}) + test(SkipMaskBinIn{LIDs: multipleBlocksLIDs}) } func lidsToUint32s(in []seq.LID) []uint32 { diff --git a/filtermanager/iterator.go b/skipmaskmanager/iterator.go similarity index 97% rename from filtermanager/iterator.go rename to skipmaskmanager/iterator.go index eff9fbb6..b8a4de12 100644 --- a/filtermanager/iterator.go +++ b/skipmaskmanager/iterator.go @@ -1,4 +1,4 @@ -package filtermanager +package skipmaskmanager import "sort" diff --git a/filtermanager/iterator_asc.go b/skipmaskmanager/iterator_asc.go similarity index 93% rename from filtermanager/iterator_asc.go rename to skipmaskmanager/iterator_asc.go index db0e0c15..a50b6dcb 100644 --- a/filtermanager/iterator_asc.go +++ b/skipmaskmanager/iterator_asc.go @@ -1,4 +1,4 @@ -package filtermanager +package skipmaskmanager import ( "go.uber.org/zap" @@ -17,7 +17,7 @@ func (it *IteratorAsc) Next() node.LID { if it.loader.headers == nil { headers, err := it.loader.getHeaders() if err != nil { - logger.Panic("can't load filter file headers", zap.Error(err)) + logger.Panic("can't load skip mask file headers", zap.Error(err)) } it.loader.headers = headers it.blockIndex = len(it.loader.headers) - 1 diff --git a/filtermanager/iterator_asc_test.go b/skipmaskmanager/iterator_asc_test.go similarity index 88% rename from filtermanager/iterator_asc_test.go rename to skipmaskmanager/iterator_asc_test.go index b9e6ce93..72ab47ff 100644 --- 
a/filtermanager/iterator_asc_test.go +++ b/skipmaskmanager/iterator_asc_test.go @@ -1,4 +1,4 @@ -package filtermanager +package skipmaskmanager import ( "math" @@ -53,9 +53,9 @@ func TestIteratorAsc(t *testing.T) { for _, tc := range tests { t.Run(tc.title, func(t *testing.T) { - rawDocsFilter := marshalDocsFilter(nil, &DocsFilterBinIn{LIDs: multipleBlocksLIDs}) - filePath := filepath.Join(t.TempDir(), "some.filter") - err := os.WriteFile(filePath, rawDocsFilter, 0o644) + rawSkipMask := marshalSkipMask(nil, &SkipMaskBinIn{LIDs: multipleBlocksLIDs}) + filePath := filepath.Join(t.TempDir(), "some.skipmask") + err := os.WriteFile(filePath, rawSkipMask, 0o644) require.NoError(t, err) loader, err := newLoader(filePath, cache.NewCache[[]lidsBlockHeader](nil, nil)) diff --git a/filtermanager/iterator_desc.go b/skipmaskmanager/iterator_desc.go similarity index 93% rename from filtermanager/iterator_desc.go rename to skipmaskmanager/iterator_desc.go index e5951948..c207d671 100644 --- a/filtermanager/iterator_desc.go +++ b/skipmaskmanager/iterator_desc.go @@ -1,4 +1,4 @@ -package filtermanager +package skipmaskmanager import ( "go.uber.org/zap" @@ -17,7 +17,7 @@ func (it *IteratorDesc) Next() node.LID { if it.loader.headers == nil { headers, err := it.loader.getHeaders() if err != nil { - logger.Panic("can't load filter file headers", zap.Error(err)) + logger.Panic("can't load skip mask file headers", zap.Error(err)) } it.loader.headers = headers } diff --git a/filtermanager/iterator_desc_test.go b/skipmaskmanager/iterator_desc_test.go similarity index 87% rename from filtermanager/iterator_desc_test.go rename to skipmaskmanager/iterator_desc_test.go index e2736160..749ced7d 100644 --- a/filtermanager/iterator_desc_test.go +++ b/skipmaskmanager/iterator_desc_test.go @@ -1,4 +1,4 @@ -package filtermanager +package skipmaskmanager import ( "math" @@ -48,9 +48,9 @@ func TestIteratorDesc(t *testing.T) { for _, tc := range tests { t.Run(tc.title, func(t *testing.T) { - 
rawDocsFilter := marshalDocsFilter(nil, &DocsFilterBinIn{LIDs: multipleBlocksLIDs}) - filePath := filepath.Join(t.TempDir(), "some.filter") - err := os.WriteFile(filePath, rawDocsFilter, 0o644) + rawSkipMask := marshalSkipMask(nil, &SkipMaskBinIn{LIDs: multipleBlocksLIDs}) + filePath := filepath.Join(t.TempDir(), "some.skipmask") + err := os.WriteFile(filePath, rawSkipMask, 0o644) require.NoError(t, err) loader, err := newLoader(filePath, cache.NewCache[[]lidsBlockHeader](nil, nil)) diff --git a/filtermanager/loader.go b/skipmaskmanager/loader.go similarity index 92% rename from filtermanager/loader.go rename to skipmaskmanager/loader.go index dff30926..0a0717b3 100644 --- a/filtermanager/loader.go +++ b/skipmaskmanager/loader.go @@ -1,9 +1,8 @@ -package filtermanager +package skipmaskmanager import ( "encoding/binary" "fmt" - "hash/fnv" "io" "os" @@ -22,13 +21,10 @@ type loader struct { } func newLoader(filePath string, headersCache *cache.Cache[[]lidsBlockHeader]) (*loader, error) { - hash := fnv.New32a() - hash.Write([]byte(filterNameFromPath(filePath) + fracNameFromFilePath(filePath))) - return &loader{ filePath: filePath, headersCache: headersCache, - cashKey: hash.Sum32(), + cashKey: hashFilePath(filePath), }, nil } @@ -69,7 +65,7 @@ func (l *loader) loadHeaders() ([]lidsBlockHeader, error) { return nil, fmt.Errorf("can't read headers from disk: n=0") } - version := docsFilterBinVersion(numBuf[0]) + version := skipMaskBinVersion(numBuf[0]) if _, ok := availableVersions[version]; !ok { return nil, fmt.Errorf("invalid LIDs binary version: %d", version) } @@ -148,7 +144,7 @@ func (l *loader) loadBlock(index int) ([]uint32, error) { func (l *loader) release() error { if l.file != nil { if err := l.file.Close(); err != nil { - logger.Error("can't close filter file", zap.Error(err)) + logger.Error("can't close skip mask file", zap.Error(err)) return err } l.file = nil diff --git a/filtermanager/loader_test.go b/skipmaskmanager/loader_test.go similarity index 79% 
rename from filtermanager/loader_test.go rename to skipmaskmanager/loader_test.go index 3a13464f..eb49a472 100644 --- a/filtermanager/loader_test.go +++ b/skipmaskmanager/loader_test.go @@ -1,4 +1,4 @@ -package filtermanager +package skipmaskmanager import ( "os" @@ -18,9 +18,9 @@ func TestLoader(t *testing.T) { multipleBlocksLIDs = append(multipleBlocksLIDs, seq.LID(i)) } - rawDocsFilter := marshalDocsFilter(nil, &DocsFilterBinIn{LIDs: multipleBlocksLIDs}) - filePath := filepath.Join(t.TempDir(), "some.filter") - err := os.WriteFile(filePath, rawDocsFilter, 0o644) + rawSkipMask := marshalSkipMask(nil, &SkipMaskBinIn{LIDs: multipleBlocksLIDs}) + filePath := filepath.Join(t.TempDir(), "some.skipmask") + err := os.WriteFile(filePath, rawSkipMask, 0o644) require.NoError(t, err) loader, err := newLoader(filePath, cache.NewCache[[]lidsBlockHeader](nil, nil)) diff --git a/filtermanager/merged_iterator.go b/skipmaskmanager/merged_iterator.go similarity index 96% rename from filtermanager/merged_iterator.go rename to skipmaskmanager/merged_iterator.go index 7affabf5..e949c12f 100644 --- a/filtermanager/merged_iterator.go +++ b/skipmaskmanager/merged_iterator.go @@ -1,4 +1,4 @@ -package filtermanager +package skipmaskmanager import "github.com/ozontech/seq-db/node" diff --git a/filtermanager/merged_iterator_test.go b/skipmaskmanager/merged_iterator_test.go similarity index 95% rename from filtermanager/merged_iterator_test.go rename to skipmaskmanager/merged_iterator_test.go index c70d178c..fe0af2af 100644 --- a/filtermanager/merged_iterator_test.go +++ b/skipmaskmanager/merged_iterator_test.go @@ -1,4 +1,4 @@ -package filtermanager +package skipmaskmanager import ( "fmt" @@ -69,7 +69,7 @@ func (it *testIteratorDesc) Next() node.LID { } func (it *testIteratorDesc) NextGeq(nextID node.LID) node.LID { - return node.NullLID() // TODO: ??? 
+ return node.NullLID() } type testIteratorAsc struct { @@ -87,7 +87,7 @@ func (it *testIteratorAsc) Next() node.LID { lid := it.lids[0] it.lids = it.lids[1:] - return node.NewAscLID(lid) // TODO: ??? + return node.NewAscLID(lid) } func (it *testIteratorAsc) NextGeq(nextID node.LID) node.LID { diff --git a/filtermanager/metrics.go b/skipmaskmanager/metrics.go similarity index 50% rename from filtermanager/metrics.go rename to skipmaskmanager/metrics.go index ed47171e..be39505b 100644 --- a/filtermanager/metrics.go +++ b/skipmaskmanager/metrics.go @@ -1,4 +1,4 @@ -package filtermanager +package skipmaskmanager import ( "github.com/prometheus/client_golang/prometheus" @@ -6,22 +6,22 @@ import ( ) var ( - inProgressFilters = promauto.NewGauge(prometheus.GaugeOpts{ + inProgress = promauto.NewGauge(prometheus.GaugeOpts{ Namespace: "seq_db_store", - Subsystem: "filters", + Subsystem: "skip_masks", Name: "in_progress", - Help: "Number of doc filters in progress", + Help: "Number of skip masks in progress", }) diskUsage = promauto.NewGauge(prometheus.GaugeOpts{ Namespace: "seq_db_store", - Subsystem: "filters", + Subsystem: "skip_masks", Name: "disk_usage_bytes", - Help: "Disk space used by filter files in bytes", + Help: "Disk space used by skip mask files in bytes", }) - storedFilters = promauto.NewGauge(prometheus.GaugeOpts{ + stored = promauto.NewGauge(prometheus.GaugeOpts{ Namespace: "seq_db_store", - Subsystem: "filters", + Subsystem: "skip_masks", Name: "stored", - Help: "Number of active doc filters", + Help: "Number of active skip masks", }) ) diff --git a/filtermanager/filter.go b/skipmaskmanager/skip_mask.go similarity index 51% rename from filtermanager/filter.go rename to skipmaskmanager/skip_mask.go index f7d03566..1ebccdc2 100644 --- a/filtermanager/filter.go +++ b/skipmaskmanager/skip_mask.go @@ -1,4 +1,4 @@ -package filtermanager +package skipmaskmanager import ( "crypto/sha256" @@ -10,47 +10,49 @@ import ( "github.com/ozontech/seq-db/seq" ) -type 
FilterStatus byte +type SkipMaskStatus byte const ( - StatusCreated FilterStatus = iota + StatusCreated SkipMaskStatus = iota StatusInProgress StatusDone StatusError ) -type Params struct { +type SkipMaskParams struct { Query string From seq.MID To seq.MID } -type Filter struct { - params Params +type SkipMask struct { + params SkipMaskParams - status FilterStatus + status SkipMaskStatus ast parser.SeqQLQuery hash string dirPath string + mu *sync.RWMutex processWg *sync.WaitGroup } -func NewFilter(params Params) *Filter { - return &Filter{ +func NewSkipMask(params SkipMaskParams) *SkipMask { + return &SkipMask{ params: params, status: StatusCreated, + mu: &sync.RWMutex{}, processWg: &sync.WaitGroup{}, } } -func (f *Filter) String() string { +func (f *SkipMask) String() string { return fmt.Sprintf("%s_%d_%d", f.params.Query, f.params.From, f.params.To) } -func (f *Filter) Hash() string { +func (f *SkipMask) Hash() string { if f.hash == "" { h := sha256.New() h.Write([]byte(f.String())) @@ -60,6 +62,16 @@ func (f *Filter) Hash() string { return f.hash } -func (f *Filter) markAsDone() { - f.status = StatusDone +func (f *SkipMask) setStatus(status SkipMaskStatus) { + f.mu.Lock() + defer f.mu.Unlock() + + f.status = status +} + +func (f *SkipMask) getStatus() SkipMaskStatus { + f.mu.RLock() + defer f.mu.RUnlock() + + return f.status } diff --git a/skipmaskmanager/skip_mask_manager.go b/skipmaskmanager/skip_mask_manager.go new file mode 100644 index 00000000..055cca33 --- /dev/null +++ b/skipmaskmanager/skip_mask_manager.go @@ -0,0 +1,631 @@ +package skipmaskmanager + +import ( + "context" + "errors" + "fmt" + "hash/fnv" + "math" + "os" + "path" + "runtime" + "strings" + "sync" + "time" + + "go.uber.org/zap" + + "github.com/ozontech/seq-db/cache" + "github.com/ozontech/seq-db/frac" + "github.com/ozontech/seq-db/frac/processor" + "github.com/ozontech/seq-db/fracmanager" + "github.com/ozontech/seq-db/logger" + "github.com/ozontech/seq-db/node" + 
"github.com/ozontech/seq-db/parser" + "github.com/ozontech/seq-db/seq" + "github.com/ozontech/seq-db/util" +) + +const ( + fracInQueueExt = ".queue" + fracDoneExt = ".skipmask" + tmpExt = ".tmp" + + tmpDirSuffix = "_tmp" +) + +const ( + defaultMaintenanceInterval = 30 * time.Second + defaultCacheCleanInterval = 10 * time.Millisecond + defaultCacheGCDelay = 1 * time.Second +) + +type MappingProvider interface { + GetMapping() seq.Mapping +} + +// Config holds configuration parameters for SkipMaskManager. +type Config struct { + DataDir string // Directory to store skip mask files + Workers int // Number of concurrent workers for processing + CacheSizeLimit uint64 // Maximum size of the headers cache in bytes +} + +// SkipMaskManager manages the lifecycle of skip masks across all fractions. +// It processes skip mask queries, stores results to disk, and provides +// iteration over matching document IDs. +// +// Skip masks are organized by query parameters (query string, from/to MID range). +// Each skip mask maintains a directory containing files for each fraction: +// - .queue file: fraction is currently being processed +// - .skipmask file: processing complete, contains matching document LIDs +// +// The manager runs background maintenance tasks for disk usage monitoring +// and cache cleanup. +type SkipMaskManager struct { + ctx context.Context + ctxCancel context.CancelFunc + + config Config + skipMasks map[string]*SkipMask + + fracs map[string][]string + fracsMu *sync.RWMutex + + mp MappingProvider + + rateLimit chan struct{} + + bgWG *sync.WaitGroup + maintenanceInterval time.Duration + + cacheCleanInterval time.Duration + cacheGCDelay time.Duration + + headersCache *cache.Cache[[]lidsBlockHeader] + headersCacheCleaner *cache.Cleaner +} + +// New creates a new SkipMaskManager with the given configuration. +// If Workers is not set (0), it defaults to GOMAXPROCS. 
+func New( + ctx context.Context, + cfg Config, + params []SkipMaskParams, + mp MappingProvider, +) *SkipMaskManager { + fmCtx, ctxCancel := context.WithCancel(ctx) + + workers := cfg.Workers + if workers <= 0 { + workers = runtime.GOMAXPROCS(0) + } + + skipMasksMap := make(map[string]*SkipMask, len(params)) + + for _, p := range params { + sm := NewSkipMask(p) + skipMasksMap[sm.Hash()] = sm + } + + cacheCleaner := cache.NewCleaner(cfg.CacheSizeLimit, nil) + + return &SkipMaskManager{ + ctx: fmCtx, + ctxCancel: ctxCancel, + config: cfg, + skipMasks: skipMasksMap, + fracs: make(map[string][]string), + fracsMu: &sync.RWMutex{}, + mp: mp, + rateLimit: make(chan struct{}, workers), + bgWG: &sync.WaitGroup{}, + maintenanceInterval: defaultMaintenanceInterval, + cacheCleanInterval: defaultCacheCleanInterval, + cacheGCDelay: defaultCacheGCDelay, + headersCache: cache.NewCache[[]lidsBlockHeader](cacheCleaner, nil), + headersCacheCleaner: cacheCleaner, + } +} + +// Start initializes and starts the skip mask manager. +// It performs the following steps: +// - Creates the data directory if it doesn't exist +// - Loads any existing skip masks from previous sessions +// - Builds the processing queue for all fractions +// - Starts background maintenance and cache cleanup loops +// - Begins asynchronous processing of all skip mask queries +// +// This method must be called before using the manager. 
+func (smm *SkipMaskManager) Start(fracs fracmanager.List) { + smm.createDataDir() + + err := smm.loadSkipMasks() + if err != nil { + logger.Fatal("failed to load previous skip masks", zap.Error(err)) + } + + err = smm.buildQueue(fracs) + if err != nil { + logger.Fatal("failed to build skip mask manager queue", zap.Error(err)) + } + + smm.startMaintenance() + smm.cacheCleanLoop() + + mapping := smm.mp.GetMapping() + + smm.bgWG.Add(1) + go func() { + defer smm.bgWG.Done() + + for _, sm := range smm.skipMasks { + ast, err := parser.ParseSeqQL(sm.params.Query, mapping) + if err != nil { + panic(fmt.Errorf("BUG: search query must be valid: %s", err)) + } + sm.ast = ast + + smm.processSkipMask(sm, fracs.FilterInRange(sm.params.From, sm.params.To)) + } + }() +} + +// Stop gracefully stops the skip mask manager. +// It cancels the context, waits for all background goroutines to complete, +// and logs a message when fully stopped. +func (smm *SkipMaskManager) Stop() { + smm.ctxCancel() + smm.bgWG.Wait() + logger.Info("skip mask manager stopped") +} + +// GetIDsIteratorByFrac returns an iterator over document IDs that match +// the skip mask queries for a specific fraction, within the given LID range. 
+// +// Parameters: +// - fracName: the name of the fraction to query +// - minLID: minimum local ID (inclusive) +// - maxLID: maximum local ID (inclusive) +// - reverse: if true, iterates IDs in descending order +// +// Returns: +// - node.Node: iterator over matching document IDs +// - bool: true if the fraction has any skip mask files, false otherwise +// - error: any error encountered while opening the skip mask files +func (smm *SkipMaskManager) GetIDsIteratorByFrac( + fracName string, + minLID, maxLID uint32, + reverse bool, +) (node.Node, bool, error) { + smm.fracsMu.RLock() + defer smm.fracsMu.RUnlock() + + fracFiles, has := smm.fracs[fracName] + if !has { + return &EmptyIterator{}, has, nil + } + + iterators := make([]node.Node, 0, len(fracFiles)) + for _, f := range fracFiles { + loader, err := newLoader(f, smm.headersCache) + if err != nil { + logger.Error("can't open skip mask file", zap.String("path", f), zap.Error(err)) + return nil, has, err + } + if reverse { + iterators = append(iterators, (*IteratorAsc)(NewIterator(loader, minLID, maxLID))) + } else { + iterators = append(iterators, (*IteratorDesc)(NewIterator(loader, minLID, maxLID))) + } + } + + return NewNMergedIterators(iterators), has, nil +} + +// RefreshFrac recomputes skip mask files for a fraction after it has been sealed. +// This is called when an active fraction becomes sealed. 
+// The method: +// - Removes existing skip mask files for the fraction +// - Marks relevant skip masks as in-progress +// - Queues the fraction for reprocessing +// - Asynchronously processes the fraction through all matching skip masks +func (smm *SkipMaskManager) RefreshFrac(fraction frac.Fraction) { + smm.fracsMu.Lock() + fracsFiles, has := smm.fracs[fraction.Info().Name()] + delete(smm.fracs, fraction.Info().Name()) + smm.fracsMu.Unlock() + + if !has { + return + } + + // mark skip masks as InProgress + for _, fileName := range fracsFiles { + smm.skipMasks[skipMaskNameFromPath(fileName)].setStatus(StatusInProgress) + } + + smm.bgWG.Add(1) + go func() { + defer smm.bgWG.Done() + + for _, fileName := range fracsFiles { + util.RemoveFile(fileName) + smm.headersCache.Evict(hashFilePath(fileName)) + + skipMask := smm.skipMasks[skipMaskNameFromPath(fileName)] + + queueFilePath := path.Join(skipMask.dirPath, makeFileName(fraction.Info().Name(), fracInQueueExt)) + util.MustWriteFileAtomic(queueFilePath, []byte{}, 0o666, tmpExt) + + select { + case <-smm.ctx.Done(): + // do not return because we have to create a .queue file for each of frac files to handle it on startup + continue + case smm.rateLimit <- struct{}{}: + go func() { + defer func() { <-smm.rateLimit }() + if err := smm.processFrac(fraction, skipMask); err != nil { + if errors.Is(err, context.Canceled) { + logger.Info("skip mask manager refresh frac context cancelled") + return + } + panic(fmt.Errorf("skip mask manager refresh frac err: %s", err)) + } + skipMask.setStatus(StatusDone) + }() + } + } + }() +} + +// RemoveFrac removes all skip mask files associated with a fraction. +// This should be called when a fraction is deleted from the system. +// The removal is performed asynchronously in the background. 
+func (smm *SkipMaskManager) RemoveFrac(fracName string) {
+	// TODO: we might want to have some kind of GC on startup to clean up missed files
+	smm.bgWG.Go(func() {
+		// Take the write lock once for the read-check-delete sequence.
+		// The previous RLock -> RUnlock -> Lock pattern left a window in
+		// which a concurrent caller could observe or mutate the same entry.
+		smm.fracsMu.Lock()
+		fracsFiles, has := smm.fracs[fracName]
+		delete(smm.fracs, fracName) // no-op when the key is absent
+		smm.fracsMu.Unlock()
+
+		if !has {
+			return
+		}
+
+		for _, fileName := range fracsFiles {
+			util.RemoveFile(fileName)
+			smm.headersCache.Evict(hashFilePath(fileName))
+		}
+	})
+}
+
+// IsDone returns true if all skip masks have been processed and are in StatusDone state.
+// This is useful for determining when initial skip mask computation is complete.
+func (smm *SkipMaskManager) IsDone() bool {
+	for _, sm := range smm.skipMasks {
+		if sm.getStatus() != StatusDone {
+			return false
+		}
+	}
+	return true
+}
+
+func skipMaskNameFromPath(p string) string {
+	return path.Base(path.Dir(p))
+}
+
+// addDoneFrac registers a completed fraction's skip mask file path.
+// Called when a fraction's skip mask processing finishes successfully.
+func (smm *SkipMaskManager) addDoneFrac(fracName, fracPath string) {
+	smm.fracsMu.Lock()
+	defer smm.fracsMu.Unlock()
+
+	smm.fracs[fracName] = append(smm.fracs[fracName], fracPath)
+}
+
+// loadSkipMasks loads skip masks from a previous session.
+// It scans the data directory for existing skip mask files and:
+//   - Removes directories that are not in the current configuration
+//   - Marks in-progress skip masks based on .queue files
+//   - Registers completed skip masks (.skipmask files)
+//
+// This allows the manager to resume processing after a restart.
+func (smm *SkipMaskManager) loadSkipMasks() error {
+	des, err := os.ReadDir(smm.config.DataDir)
+	if err != nil {
+		return err
+	}
+
+	var anyRemove bool
+
+	for _, de := range des {
+		if !de.IsDir() {
+			continue
+		}
+
+		if _, ok := smm.skipMasks[de.Name()]; !ok {
+			logger.Info("skip mask folder exists on disk but not in config, deleting it", zap.String("dir", de.Name()))
+			err := os.RemoveAll(path.Join(smm.config.DataDir, de.Name()))
+			if err != nil && !os.IsNotExist(err) {
+				return err
+			}
+			anyRemove = true
+			continue
+		}
+
+		sm := smm.skipMasks[de.Name()]
+		sm.setStatus(StatusInProgress)
+		sm.dirPath = path.Join(smm.config.DataDir, de.Name())
+
+		skipMaskDes, err := os.ReadDir(sm.dirPath)
+		if err != nil {
+			return fmt.Errorf("reading directory: %w", err)
+		}
+
+		var hasFracsInQueue bool
+
+		for _, smde := range skipMaskDes {
+			if smde.IsDir() {
+				continue
+			}
+			name := smde.Name()
+
+			switch path.Ext(name) {
+			case fracInQueueExt:
+				hasFracsInQueue = true
+			case fracDoneExt:
+				smm.addDoneFrac(fracNameFromFilePath(name), path.Join(sm.dirPath, name))
+			}
+		}
+
+		if !hasFracsInQueue {
+			sm.setStatus(StatusDone)
+		}
+	}
+
+	if anyRemove {
+		util.MustFsyncFile(smm.config.DataDir)
+	}
+
+	return nil
+}
+
+// buildQueue initializes the processing queue for newly created skip masks.
+// For each skip mask in StatusCreated state, it:
+//   - Creates a temporary directory
+//   - Generates .queue files for all fractions in the mask's range
+//   - Atomically renames the temp directory to the final name
+//
+// This sets up the files needed for parallel processing.
+func (smm *SkipMaskManager) buildQueue(fracs fracmanager.List) error { + for _, skipMask := range smm.skipMasks { + if skipMask.getStatus() != StatusCreated { + continue + } + + // create tmp dir + tmpDir := path.Join(smm.config.DataDir, fmt.Sprintf("%s%s", skipMask.Hash(), tmpDirSuffix)) + util.MustCreateDir(tmpDir) + + skipMaskFracs := fracs.FilterInRange(skipMask.params.From, skipMask.params.To) + for _, f := range skipMaskFracs { + queueFilePath := path.Join(tmpDir, makeFileName(f.Info().Name(), fracInQueueExt)) + util.MustWriteFileAtomic(queueFilePath, []byte{}, 0o666, tmpExt) + } + + // rename tmp dir + dir := path.Join(smm.config.DataDir, skipMask.Hash()) + if err := os.Rename(tmpDir, dir); err != nil { + return err + } + util.MustFsyncFile(smm.config.DataDir) + skipMask.dirPath = dir + } + + return nil +} + +// processSkipMask executes the skip mask query against all fractions in range. +// It processes each fraction with a .queue file, running search queries in parallel +// (limited by the rate limiter). Each successful search writes results to a .skipmask +// file. The skip mask status is set to Done when all fractions are processed. 
+func (smm *SkipMaskManager) processSkipMask(skipMask *SkipMask, fracs fracmanager.List) {
+	if len(fracs) == 0 {
+		skipMask.setStatus(StatusDone)
+		return
+	}
+
+	fracsByName := make(map[string]frac.Fraction)
+	for _, f := range fracs {
+		fracsByName[f.Info().Name()] = f
+	}
+
+	skipMaskDes, err := os.ReadDir(skipMask.dirPath)
+	if err != nil {
+		panic(fmt.Errorf("BUG: reading directory must be successful: %s", err))
+	}
+
+	inProgress.Add(1)
+
+	processFracInQueue := func(name string) error {
+		f, ok := fracsByName[fracNameFromFilePath(name)]
+		if !ok { // skip missing fracs
+			return nil
+		}
+
+		select {
+		case <-smm.ctx.Done():
+			return nil
+		case smm.rateLimit <- struct{}{}:
+			skipMask.processWg.Add(1)
+			go func() {
+				defer skipMask.processWg.Done()
+				defer func() { <-smm.rateLimit }()
+				if err := smm.processFrac(f, skipMask); err != nil {
+					if errors.Is(err, context.Canceled) {
+						logger.Info("skip mask manager process frac context cancelled")
+						return
+					}
+					panic(fmt.Errorf("skip mask manager process frac err: %s", err))
+				}
+			}()
+		}
+		return nil
+	}
+	_ = util.VisitFilesWithExt(skipMaskDes, fracInQueueExt, processFracInQueue)
+
+	go func() {
+		skipMask.processWg.Wait()
+		skipMask.setStatus(StatusDone)
+		inProgress.Add(-1)
+	}()
+}
+
+// processFrac executes the skip mask query against a single fraction.
+// It performs a search to find matching document IDs, then converts them
+// to local IDs (LIDs), and writes the serialized skip mask to disk.
+// The .queue file is replaced with a .skipmask file upon completion.
+func (smm *SkipMaskManager) processFrac(f frac.Fraction, skipMask *SkipMask) error {
+	qpr, err := f.Search(smm.ctx, processor.SearchParams{
+		AST:   skipMask.ast.Root,
+		From:  skipMask.params.From,
+		To:    skipMask.params.To,
+		Limit: math.MaxInt64,
+	})
+	if err != nil {
+		return err
+	}
+
+	queueFilePath := path.Join(skipMask.dirPath, makeFileName(f.Info().Name(), fracInQueueExt))
+	doneFilePath := path.Join(skipMask.dirPath, makeFileName(f.Info().Name(), fracDoneExt))
+
+	if len(qpr.IDs) == 0 {
+		util.RemoveFile(queueFilePath)
+		return nil
+	}
+
+	// TODO: here we do part of the work twice:
+	// the first pass finds LIDs inside f.Search() and resolves IDs from those LIDs;
+	// afterwards f.FindLIDs() maps the same IDs back to LIDs again.
+	// We did it like this because otherwise we had to do serious f.Search() rewrite.
+	// For now we're ok with some performance penalty.
+	lids, err := f.FindLIDs(smm.ctx, qpr.IDs.IDs())
+	if err != nil {
+		return err
+	}
+
+	skipMaskBin := SkipMaskBinIn{LIDs: lids}
+	if err := writeSkipMask(&skipMaskBin, queueFilePath, doneFilePath); err != nil {
+		return err
+	}
+
+	smm.addDoneFrac(f.Info().Name(), doneFilePath)
+
+	return nil
+}
+
+// startMaintenance runs a background goroutine that periodically checks
+// disk usage metrics. It logs the total size of all skip mask files.
+func (smm *SkipMaskManager) startMaintenance() {
+	smm.bgWG.Go(func() {
+		logger.Info("start skip mask manager maintenance")
+		util.RunEvery(smm.ctx.Done(), smm.maintenanceInterval, func() {
+			logger.Info("skip mask manager maintenance iteration")
+			smm.checkDiskUsage()
+		})
+	})
+}
+
+// cacheCleanLoop runs a background goroutine that periodically cleans up
+// the headers cache. It performs incremental cleanup on each tick and
+// full GC periodically (based on cacheGCDelay).
+func (smm *SkipMaskManager) cacheCleanLoop() { + smm.bgWG.Go(func() { + runs := 0 + gcRunsCount := int(smm.cacheGCDelay / smm.cacheCleanInterval) + + util.RunEvery(smm.ctx.Done(), smm.cacheCleanInterval, func() { + runs++ + smm.headersCacheCleaner.Cleanup(&cache.CleanStat{}) + smm.headersCacheCleaner.Rotate() + + if runs >= gcRunsCount { + runs = 0 + smm.headersCacheCleaner.CleanEmptyGenerations() + smm.headersCacheCleaner.ReleaseBuckets() + } + }) + }) +} + +// checkDiskUsage calculates and reports the total disk space used by all +// skip mask files. Updates the diskUsage and stored metrics. +func (smm *SkipMaskManager) checkDiskUsage() { + du := int64(0) + + for _, sm := range smm.skipMasks { + des, err := os.ReadDir(sm.dirPath) + if err != nil { + logger.Error("skip mask manager: can't read skip mask's dir", + zap.String("skip mask", sm.String()), zap.Error(err)) + return + } + + for _, smde := range des { + if smde.IsDir() { + continue + } + info, err := smde.Info() + if err != nil { + logger.Error("skip mask manager: can't read skip mask file info", + zap.String("skip mask", sm.String()), zap.Error(err)) + return + } + du += info.Size() + } + } + + diskUsage.Set(float64(du)) + stored.Set(float64(len(smm.skipMasks))) +} + +func makeFileName(name, ext string) string { + return name + ext +} + +func fracNameFromFilePath(skipMaskFilePath string) string { + return strings.Split(path.Base(skipMaskFilePath), ".")[0] +} + +func hashFilePath(filePath string) uint32 { + hash := fnv.New32a() + hash.Write([]byte(skipMaskNameFromPath(filePath) + fracNameFromFilePath(filePath))) + return hash.Sum32() +} + +var marshalBufferPool util.BufferPool + +// writeSkipMask serializes the skip mask data and atomically writes it to disk. +// It removes the .queue file and creates the .skipmask file. 
+func writeSkipMask(df *SkipMaskBinIn, queueFilePath, doneFilePath string) error { + rawSkipMask := marshalBufferPool.Get() + defer marshalBufferPool.Put(rawSkipMask) + + rawSkipMask.B = marshalSkipMask(rawSkipMask.B, df) + util.MustWriteFileAtomic(doneFilePath, rawSkipMask.B, 0o666, tmpExt) + util.RemoveFile(queueFilePath) + + return nil +} + +// createDataDir ensures the data directory exists, creating it if necessary. +func (smm *SkipMaskManager) createDataDir() { + if err := os.MkdirAll(smm.config.DataDir, 0o777); err != nil { + panic(err) + } +} diff --git a/storeapi/grpc_v1_test.go b/storeapi/grpc_v1_test.go index ced60053..2dba2fc2 100644 --- a/storeapi/grpc_v1_test.go +++ b/storeapi/grpc_v1_test.go @@ -17,6 +17,7 @@ import ( "github.com/ozontech/seq-db/mappingprovider" "github.com/ozontech/seq-db/pkg/storeapi" "github.com/ozontech/seq-db/seq" + "github.com/ozontech/seq-db/skipmaskmanager" "github.com/ozontech/seq-db/tests/common" ) @@ -67,11 +68,16 @@ func getTestGrpc(t *testing.T) (*GrpcV1, func(), func()) { dataDir := common.GetTestTmpDir(t) common.RecreateDir(dataDir) + mappingProvider, err := mappingprovider.New("", mappingprovider.WithMapping(seq.TestMapping)) + assert.NoError(t, err) + + skipMaskManager := skipmaskmanager.New(t.Context(), skipmaskmanager.Config{}, nil, mappingProvider) + fm, stop, err := fracmanager.New(t.Context(), &fracmanager.Config{ FracSize: 500, TotalSize: 5000, DataDir: dataDir, - }, nil) + }, nil, skipMaskManager) assert.NoError(t, err) config := APIConfig{ @@ -92,9 +98,6 @@ func getTestGrpc(t *testing.T) (*GrpcV1, func(), func()) { }, } - mappingProvider, err := mappingprovider.New("", mappingprovider.WithMapping(seq.TestMapping)) - assert.NoError(t, err) - g := NewGrpcV1(config, fm, mappingProvider) release := func() { diff --git a/storeapi/store.go b/storeapi/store.go index 857be4e2..5ea0c5a7 100644 --- a/storeapi/store.go +++ b/storeapi/store.go @@ -12,6 +12,7 @@ import ( "github.com/ozontech/seq-db/fracmanager" 
"github.com/ozontech/seq-db/logger" "github.com/ozontech/seq-db/metric" + "github.com/ozontech/seq-db/skipmaskmanager" "github.com/ozontech/seq-db/storage/s3" ) @@ -29,12 +30,15 @@ type Store struct { FracManager *fracmanager.FracManager fracManagerStop func() + SkipMaskManager *skipmaskmanager.SkipMaskManager + isStopped atomic.Bool } type StoreConfig struct { - API APIConfig - FracManager fracmanager.Config + API APIConfig + FracManager fracmanager.Config + SkipMaskManagerConfig skipmaskmanager.Config } func (c *StoreConfig) setDefaults() error { @@ -44,19 +48,32 @@ func (c *StoreConfig) setDefaults() error { if c.API.Search.Async.DataDir == "" { c.API.Search.Async.DataDir = path.Join(c.FracManager.DataDir, "async_searches") } + if c.SkipMaskManagerConfig.DataDir == "" { + c.SkipMaskManagerConfig.DataDir = path.Join(c.FracManager.DataDir, "skipmasks") + } return nil } -func NewStore(ctx context.Context, c StoreConfig, s3cli *s3.Client, mappingProvider MappingProvider) (*Store, error) { +func NewStore( + ctx context.Context, + c StoreConfig, + s3cli *s3.Client, + mappingProvider MappingProvider, + skipMaskParams []skipmaskmanager.SkipMaskParams, +) (*Store, error) { if err := c.setDefaults(); err != nil { return nil, err } - fracManager, stop, err := fracmanager.New(ctx, &c.FracManager, s3cli) + skipMaskManager := skipmaskmanager.New(ctx, c.SkipMaskManagerConfig, skipMaskParams, mappingProvider) + + fracManager, stop, err := fracmanager.New(ctx, &c.FracManager, s3cli, skipMaskManager) if err != nil { return nil, fmt.Errorf("loading fractions error: %w", err) } + skipMaskManager.Start(fracManager.Fractions()) + return &Store{ Config: c, // We will set grpcAddr later in Start() @@ -64,6 +81,7 @@ func NewStore(ctx context.Context, c StoreConfig, s3cli *s3.Client, mappingProvi grpcServer: newGRPCServer(c.API, fracManager, mappingProvider), FracManager: fracManager, fracManagerStop: stop, + SkipMaskManager: skipMaskManager, isStopped: atomic.Bool{}, }, nil } @@ -88,6 
+106,7 @@ func (s *Store) Stop() { s.grpcServer.Stop(ctx) s.fracManagerStop() + s.SkipMaskManager.Stop() logger.Info("store stopped") } diff --git a/tests/integration_tests/integration_test.go b/tests/integration_tests/integration_test.go index cab874cf..d2abf818 100644 --- a/tests/integration_tests/integration_test.go +++ b/tests/integration_tests/integration_test.go @@ -31,6 +31,7 @@ import ( "github.com/ozontech/seq-db/pkg/storeapi" "github.com/ozontech/seq-db/proxy/search" "github.com/ozontech/seq-db/seq" + "github.com/ozontech/seq-db/skipmaskmanager" "github.com/ozontech/seq-db/tests/common" "github.com/ozontech/seq-db/tests/setup" "github.com/ozontech/seq-db/tests/suites" @@ -1751,3 +1752,91 @@ func (s *IntegrationTestSuite) TestPaginationWithOffsetId() { r.Equal(totalDocs, len(fetchedDocs), "count of unique docs does not match") } } + +func (s *IntegrationTestSuite) TestSkipMaskManager() { + t := s.T() + r := require.New(t) + + cfg := *s.Config + env := setup.NewTestingEnv(&cfg) + + docs := []string{ + `{"service":"visible", "message":"doc1"}`, + `{"service":"hidden", "message":"doc2"}`, + `{"service":"visible", "message":"doc3"}`, + `{"service":"hidden", "message":"doc4"}`, + } + setup.Bulk(t, env.IngestorBulkAddr(), docs) + env.WaitIdle() + env.SealAll() + + // bulk docs one more time to have sealed and active fracs + setup.Bulk(t, env.IngestorBulkAddr(), docs) + + // save hidden doc ids to test fetch later + qpr, _, _, err := env.Search(`service:hidden`, 10, setup.WithTotal(true)) + r.NoError(err) + hiddenDocIDs := qpr.IDs.IDs() + + env.WaitIdle() + env.StopAll() + + cfg.SkipMaskParams = []skipmaskmanager.SkipMaskParams{ + { + Query: "service:hidden", + From: 0, + To: seq.TimeToMID(time.Now()), + }, + } + env = setup.NewTestingEnv(&cfg) + defer env.StopAll() + + var checkSkipMasksStatus = func(stores setup.Stores) bool { + for _, ss := range stores { + for _, s := range ss { + if !s.SkipMaskManager.IsDone() { + return false + } + } + } + return true + } + 
+ // wait for skip masks processing + r.Eventually(func() bool { + return checkSkipMasksStatus(env.HotStores) && checkSkipMasksStatus(env.ColdStores) + }, 5*time.Second, 100*time.Millisecond) + + // test search + + qpr, _, _, err = env.Search(`service:hidden`, 10, setup.WithTotal(true)) + r.NoError(err) + r.Equal(uint64(0), qpr.Total) + + qpr, _, _, err = env.Search(`service:*`, 10, setup.WithTotal(true)) + r.NoError(err) + r.Equal(uint64(4), qpr.Total) + + // test fetch + + fetchedDocs, err := env.Fetch(hiddenDocIDs) + r.NoError(err) + r.Len(fetchedDocs, len(hiddenDocIDs)) + for _, doc := range fetchedDocs { + r.Len(doc, 0) // fetch hiddenID returns nothing + } + + // refresh frac + + env.WaitIdle() + env.SealAll() + + // wait for skip masks processing + r.Eventually(func() bool { + return checkSkipMasksStatus(env.HotStores) && checkSkipMasksStatus(env.ColdStores) + }, 5*time.Second, 100*time.Millisecond) + + qpr, _, _, err = env.Search(`service:hidden`, 10, setup.WithTotal(true)) + r.NoError(err) + r.Equal(uint64(0), qpr.Total) +} diff --git a/tests/setup/env.go b/tests/setup/env.go index fbc66018..a42ba006 100644 --- a/tests/setup/env.go +++ b/tests/setup/env.go @@ -32,6 +32,7 @@ import ( "github.com/ozontech/seq-db/proxy/stores" "github.com/ozontech/seq-db/proxyapi" "github.com/ozontech/seq-db/seq" + "github.com/ozontech/seq-db/skipmaskmanager" seqs3 "github.com/ozontech/seq-db/storage/s3" "github.com/ozontech/seq-db/storeapi" testscommon "github.com/ozontech/seq-db/tests/common" @@ -48,6 +49,7 @@ type TestingEnvConfig struct { HotModeEnabled bool QueryRateLimit *float64 FracManagerConfig *fracmanager.Config + SkipMaskParams []skipmaskmanager.SkipMaskParams Mapping seq.Mapping IndexAllFields bool @@ -122,6 +124,9 @@ func (cfg *TestingEnvConfig) GetStoreConfig(replicaID string, cold bool) storeap LogThreshold: 0, }, }, + SkipMaskManagerConfig: skipmaskmanager.Config{ + DataDir: filepath.Join(cfg.DataDir, replicaID, "skipmasks"), + }, } } @@ -275,7 +280,7 @@ func 
(cfg *TestingEnvConfig) MakeStores( logger.Fatal("can't create mapping", zap.Error(err)) } - store, err := storeapi.NewStore(context.Background(), confs[i], s3cli, mappingProvider) + store, err := storeapi.NewStore(context.Background(), confs[i], s3cli, mappingProvider, cfg.SkipMaskParams) if err != nil { panic(err) }