hugo/cache/dynacache/dynacache.go
2024-04-11 09:31:33 +02:00

584 lines
14 KiB
Go

// Copyright 2024 The Hugo Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dynacache
import (
"context"
"fmt"
"math"
"path"
"regexp"
"runtime"
"sync"
"time"
"github.com/bep/lazycache"
"github.com/bep/logg"
"github.com/gohugoio/hugo/common/collections"
"github.com/gohugoio/hugo/common/herrors"
"github.com/gohugoio/hugo/common/loggers"
"github.com/gohugoio/hugo/common/paths"
"github.com/gohugoio/hugo/common/rungroup"
"github.com/gohugoio/hugo/config"
"github.com/gohugoio/hugo/helpers"
"github.com/gohugoio/hugo/identity"
"github.com/gohugoio/hugo/resources/resource"
)
const minMaxSize = 10
// New creates a new cache.
func New(opts Options) *Cache {
if opts.CheckInterval == 0 {
opts.CheckInterval = time.Second * 2
}
if opts.MaxSize == 0 {
opts.MaxSize = 100000
}
if opts.Log == nil {
panic("nil Log")
}
if opts.MinMaxSize == 0 {
opts.MinMaxSize = 30
}
stats := &stats{
opts: opts,
adjustmentFactor: 1.0,
currentMaxSize: opts.MaxSize,
availableMemory: config.GetMemoryLimit(),
}
infol := opts.Log.InfoCommand("dynacache")
evictedIdentities := collections.NewStack[identity.Identity]()
onEvict := func(k, v any) {
if !opts.Running {
return
}
identity.WalkIdentitiesShallow(v, func(level int, id identity.Identity) bool {
evictedIdentities.Push(id)
return false
})
resource.MarkStale(v)
}
c := &Cache{
partitions: make(map[string]PartitionManager),
onEvict: onEvict,
evictedIdentities: evictedIdentities,
opts: opts,
stats: stats,
infol: infol,
}
c.stop = c.start()
return c
}
// Options for the cache.
type Options struct {
Log loggers.Logger
CheckInterval time.Duration
MaxSize int
MinMaxSize int
Running bool
}
// Options for a partition.
type OptionsPartition struct {
// When to clear the this partition.
ClearWhen ClearWhen
// Weight is a number between 1 and 100 that indicates how, in general, how big this partition may get.
Weight int
}
func (o OptionsPartition) WeightFraction() float64 {
return float64(o.Weight) / 100
}
func (o OptionsPartition) CalculateMaxSize(maxSizePerPartition int) int {
return int(math.Floor(float64(maxSizePerPartition) * o.WeightFraction()))
}
// A dynamic partitioned cache.
type Cache struct {
mu sync.RWMutex
partitions map[string]PartitionManager
onEvict func(k, v any)
evictedIdentities *collections.Stack[identity.Identity]
opts Options
infol logg.LevelLogger
stats *stats
stopOnce sync.Once
stop func()
}
// DrainEvictedIdentities drains the evicted identities from the cache.
func (c *Cache) DrainEvictedIdentities() []identity.Identity {
return c.evictedIdentities.Drain()
}
// ClearMatching clears all partition for which the predicate returns true.
func (c *Cache) ClearMatching(predicate func(k, v any) bool) {
g := rungroup.Run[PartitionManager](context.Background(), rungroup.Config[PartitionManager]{
NumWorkers: len(c.partitions),
Handle: func(ctx context.Context, partition PartitionManager) error {
partition.clearMatching(predicate)
return nil
},
})
for _, p := range c.partitions {
g.Enqueue(p)
}
g.Wait()
}
// ClearOnRebuild prepares the cache for a new rebuild taking the given changeset into account.
func (c *Cache) ClearOnRebuild(changeset ...identity.Identity) {
g := rungroup.Run[PartitionManager](context.Background(), rungroup.Config[PartitionManager]{
NumWorkers: len(c.partitions),
Handle: func(ctx context.Context, partition PartitionManager) error {
partition.clearOnRebuild(changeset...)
return nil
},
})
for _, p := range c.partitions {
g.Enqueue(p)
}
g.Wait()
// Clear any entries marked as stale above.
g = rungroup.Run[PartitionManager](context.Background(), rungroup.Config[PartitionManager]{
NumWorkers: len(c.partitions),
Handle: func(ctx context.Context, partition PartitionManager) error {
partition.clearStale()
return nil
},
})
for _, p := range c.partitions {
g.Enqueue(p)
}
g.Wait()
}
type keysProvider interface {
Keys() []string
}
// Keys returns a list of keys in all partitions.
func (c *Cache) Keys(predicate func(s string) bool) []string {
if predicate == nil {
predicate = func(s string) bool { return true }
}
var keys []string
for pn, g := range c.partitions {
pkeys := g.(keysProvider).Keys()
for _, k := range pkeys {
p := path.Join(pn, k)
if predicate(p) {
keys = append(keys, p)
}
}
}
return keys
}
func calculateMaxSizePerPartition(maxItemsTotal, totalWeightQuantity, numPartitions int) int {
if numPartitions == 0 {
panic("numPartitions must be > 0")
}
if totalWeightQuantity == 0 {
panic("totalWeightQuantity must be > 0")
}
avgWeight := float64(totalWeightQuantity) / float64(numPartitions)
return int(math.Floor(float64(maxItemsTotal) / float64(numPartitions) * (100.0 / avgWeight)))
}
// Stop stops the cache.
func (c *Cache) Stop() {
c.stopOnce.Do(func() {
c.stop()
})
}
func (c *Cache) adjustCurrentMaxSize() {
c.mu.RLock()
defer c.mu.RUnlock()
if len(c.partitions) == 0 {
return
}
var m runtime.MemStats
runtime.ReadMemStats(&m)
s := c.stats
s.memstatsCurrent = m
// fmt.Printf("\n\nAvailable = %v\nAlloc = %v\nTotalAlloc = %v\nSys = %v\nNumGC = %v\nMaxSize = %d\nAdjustmentFactor=%f\n\n", helpers.FormatByteCount(s.availableMemory), helpers.FormatByteCount(m.Alloc), helpers.FormatByteCount(m.TotalAlloc), helpers.FormatByteCount(m.Sys), m.NumGC, c.stats.currentMaxSize, s.adjustmentFactor)
if s.availableMemory >= s.memstatsCurrent.Alloc {
if s.adjustmentFactor <= 1.0 {
s.adjustmentFactor += 0.2
}
} else {
// We're low on memory.
s.adjustmentFactor -= 0.4
}
if s.adjustmentFactor <= 0 {
s.adjustmentFactor = 0.05
}
if !s.adjustCurrentMaxSize() {
return
}
totalWeight := 0
for _, pm := range c.partitions {
totalWeight += pm.getOptions().Weight
}
maxSizePerPartition := calculateMaxSizePerPartition(c.stats.currentMaxSize, totalWeight, len(c.partitions))
evicted := 0
for _, p := range c.partitions {
evicted += p.adjustMaxSize(p.getOptions().CalculateMaxSize(maxSizePerPartition))
}
if evicted > 0 {
c.infol.
WithFields(
logg.Fields{
{Name: "evicted", Value: evicted},
{Name: "numGC", Value: m.NumGC},
{Name: "limit", Value: helpers.FormatByteCount(c.stats.availableMemory)},
{Name: "alloc", Value: helpers.FormatByteCount(m.Alloc)},
{Name: "totalAlloc", Value: helpers.FormatByteCount(m.TotalAlloc)},
},
).Logf("adjusted partitions' max size")
}
}
func (c *Cache) start() func() {
ticker := time.NewTicker(c.opts.CheckInterval)
quit := make(chan struct{})
go func() {
for {
select {
case <-ticker.C:
c.adjustCurrentMaxSize()
// Reset the ticker to avoid drift.
ticker.Reset(c.opts.CheckInterval)
case <-quit:
ticker.Stop()
return
}
}
}()
return func() {
close(quit)
}
}
var partitionNameRe = regexp.MustCompile(`^\/[a-zA-Z0-9]{4}(\/[a-zA-Z0-9]+)?(\/[a-zA-Z0-9]+)?`)
// GetOrCreatePartition gets or creates a partition with the given name.
func GetOrCreatePartition[K comparable, V any](c *Cache, name string, opts OptionsPartition) *Partition[K, V] {
if c == nil {
panic("nil Cache")
}
if opts.Weight < 1 || opts.Weight > 100 {
panic("invalid Weight, must be between 1 and 100")
}
if partitionNameRe.FindString(name) != name {
panic(fmt.Sprintf("invalid partition name %q", name))
}
c.mu.RLock()
p, found := c.partitions[name]
c.mu.RUnlock()
if found {
return p.(*Partition[K, V])
}
c.mu.Lock()
defer c.mu.Unlock()
// Double check.
p, found = c.partitions[name]
if found {
return p.(*Partition[K, V])
}
// At this point, we don't know the number of partitions or their configuration, but
// this will be re-adjusted later.
const numberOfPartitionsEstimate = 10
maxSize := opts.CalculateMaxSize(c.opts.MaxSize / numberOfPartitionsEstimate)
onEvict := func(k K, v V) {
c.onEvict(k, v)
}
// Create a new partition and cache it.
partition := &Partition[K, V]{
c: lazycache.New(lazycache.Options[K, V]{MaxEntries: maxSize, OnEvict: onEvict}),
maxSize: maxSize,
trace: c.opts.Log.Logger().WithLevel(logg.LevelTrace).WithField("partition", name),
opts: opts,
}
c.partitions[name] = partition
return partition
}
// Partition is a partition in the cache.
type Partition[K comparable, V any] struct {
c *lazycache.Cache[K, V]
zero V
trace logg.LevelLogger
opts OptionsPartition
maxSize int
}
// GetOrCreate gets or creates a value for the given key.
func (p *Partition[K, V]) GetOrCreate(key K, create func(key K) (V, error)) (V, error) {
v, _, err := p.c.GetOrCreate(key, create)
return v, err
}
// GetOrCreateWitTimeout gets or creates a value for the given key and times out if the create function
// takes too long.
func (p *Partition[K, V]) GetOrCreateWitTimeout(key K, duration time.Duration, create func(key K) (V, error)) (V, error) {
resultch := make(chan V, 1)
errch := make(chan error, 1)
go func() {
v, _, err := p.c.GetOrCreate(key, create)
if err != nil {
errch <- err
return
}
resultch <- v
}()
select {
case v := <-resultch:
return v, nil
case err := <-errch:
return p.zero, err
case <-time.After(duration):
return p.zero, &herrors.TimeoutError{
Duration: duration,
}
}
}
func (p *Partition[K, V]) clearMatching(predicate func(k, v any) bool) {
p.c.DeleteFunc(func(key K, v V) bool {
if predicate(key, v) {
p.trace.Log(
logg.StringFunc(
func() string {
return fmt.Sprintf("clearing cache key %v", key)
},
),
)
return true
}
return false
})
}
func (p *Partition[K, V]) clearOnRebuild(changeset ...identity.Identity) {
opts := p.getOptions()
if opts.ClearWhen == ClearNever {
return
}
if opts.ClearWhen == ClearOnRebuild {
// Clear all.
p.Clear()
return
}
depsFinder := identity.NewFinder(identity.FinderConfig{})
shouldDelete := func(key K, v V) bool {
// We always clear elements marked as stale.
if resource.IsStaleAny(v) {
return true
}
// Now check if this entry has changed based on the changeset
// based on filesystem events.
if len(changeset) == 0 {
// Nothing changed.
return false
}
var probablyDependent bool
identity.WalkIdentitiesShallow(v, func(level int, id2 identity.Identity) bool {
for _, id := range changeset {
if r := depsFinder.Contains(id, id2, -1); r > 0 {
// It's probably dependent, evict from cache.
probablyDependent = true
return true
}
}
return false
})
return probablyDependent
}
// First pass.
// Second pass needs to be done in a separate loop to catch any
// elements marked as stale in the other partitions.
p.c.DeleteFunc(func(key K, v V) bool {
if shouldDelete(key, v) {
p.trace.Log(
logg.StringFunc(
func() string {
return fmt.Sprintf("first pass: clearing cache key %v", key)
},
),
)
return true
}
return false
})
}
func (p *Partition[K, V]) Keys() []K {
var keys []K
p.c.DeleteFunc(func(key K, v V) bool {
keys = append(keys, key)
return false
})
return keys
}
func (p *Partition[K, V]) clearStale() {
p.c.DeleteFunc(func(key K, v V) bool {
isStale := resource.IsStaleAny(v)
if isStale {
p.trace.Log(
logg.StringFunc(
func() string {
return fmt.Sprintf("second pass: clearing cache key %v", key)
},
),
)
}
return isStale
})
}
// adjustMaxSize adjusts the max size of the and returns the number of items evicted.
func (p *Partition[K, V]) adjustMaxSize(newMaxSize int) int {
if newMaxSize < minMaxSize {
newMaxSize = minMaxSize
}
oldMaxSize := p.maxSize
if newMaxSize == oldMaxSize {
return 0
}
p.maxSize = newMaxSize
// fmt.Println("Adjusting max size of partition from", oldMaxSize, "to", newMaxSize)
return p.c.Resize(newMaxSize)
}
func (p *Partition[K, V]) getMaxSize() int {
return p.maxSize
}
func (p *Partition[K, V]) getOptions() OptionsPartition {
return p.opts
}
func (p *Partition[K, V]) Clear() {
p.c.DeleteFunc(func(key K, v V) bool {
return true
})
}
func (p *Partition[K, V]) Get(ctx context.Context, key K) (V, bool) {
return p.c.Get(key)
}
type PartitionManager interface {
adjustMaxSize(addend int) int
getMaxSize() int
getOptions() OptionsPartition
clearOnRebuild(changeset ...identity.Identity)
clearMatching(predicate func(k, v any) bool)
clearStale()
}
const (
ClearOnRebuild ClearWhen = iota + 1
ClearOnChange
ClearNever
)
type ClearWhen int
type stats struct {
opts Options
memstatsCurrent runtime.MemStats
currentMaxSize int
availableMemory uint64
adjustmentFactor float64
}
func (s *stats) adjustCurrentMaxSize() bool {
newCurrentMaxSize := int(math.Floor(float64(s.opts.MaxSize) * s.adjustmentFactor))
if newCurrentMaxSize < s.opts.MinMaxSize {
newCurrentMaxSize = int(s.opts.MinMaxSize)
}
changed := newCurrentMaxSize != s.currentMaxSize
s.currentMaxSize = newCurrentMaxSize
return changed
}
// CleanKey turns s into a format suitable for a cache key for this package.
// The key will be a Unix-styled path with a leading slash but no trailing slash.
func CleanKey(s string) string {
return path.Clean(paths.ToSlashPreserveLeading(s))
}