Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ require (
github.com/containerd/errdefs v1.0.0
github.com/containerd/errdefs/pkg v0.3.0
github.com/containerd/go-runc v1.1.0
github.com/containerd/log v0.1.0
github.com/containerd/platforms v1.0.0-rc.1
github.com/containerd/plugin v1.0.0
github.com/containerd/ttrpc v1.2.7
github.com/containerd/typeurl/v2 v2.2.3
github.com/google/go-cmp v0.7.0
Expand Down Expand Up @@ -76,8 +78,6 @@ require (
github.com/checkpoint-restore/go-criu/v6 v6.3.0 // indirect
github.com/containerd/continuity v0.4.5 // indirect
github.com/containerd/fifo v1.1.0 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/containerd/plugin v1.0.0 // indirect
github.com/containerd/protobuild v0.3.0 // indirect
github.com/containerd/stargz-snapshotter/estargz v0.15.1 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
Expand Down
186 changes: 186 additions & 0 deletions pkg/shim/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
//go:build windows

/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package shim

import (
"context"
"errors"
"sync"
"time"

v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1"
"github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/v2/core/events"
"github.com/containerd/containerd/v2/pkg/namespaces"
"github.com/containerd/containerd/v2/pkg/protobuf"
"github.com/containerd/containerd/v2/pkg/ttrpcutil"
"github.com/containerd/log"
"github.com/containerd/ttrpc"
"github.com/containerd/typeurl/v2"
)

const (
queueSize = 2048
maxRequeue = 5
)

type item struct {
ev *types.Envelope
ctx context.Context
count int
}

type publisherConfig struct {
ttrpcOpts []ttrpc.ClientOpts
}

type PublisherOpts func(*publisherConfig)

func WithPublishTTRPCOpts(opts ...ttrpc.ClientOpts) PublisherOpts {
return func(cfg *publisherConfig) {
cfg.ttrpcOpts = append(cfg.ttrpcOpts, opts...)
}
}

// NewPublisher creates a new remote events publisher
func NewPublisher(address string, opts ...PublisherOpts) (*RemoteEventsPublisher, error) {
client, err := ttrpcutil.NewClient(address)
if err != nil {
return nil, err
}

l := &RemoteEventsPublisher{
client: client,
closed: make(chan struct{}),
requeue: make(chan *item, queueSize),
}

go l.processQueue()
return l, nil
}

// RemoteEventsPublisher forwards events to a ttrpc server
type RemoteEventsPublisher struct {
client *ttrpcutil.Client
closed chan struct{}
closer sync.Once
requeue chan *item
}

// Done returns a channel which closes when done
func (l *RemoteEventsPublisher) Done() <-chan struct{} {
return l.closed
}

// Close closes the remote connection and closes the done channel
func (l *RemoteEventsPublisher) Close() (err error) {
err = l.client.Close()
l.closer.Do(func() {
close(l.closed)
})
return err
}

func (l *RemoteEventsPublisher) processQueue() {
for i := range l.requeue {
if i.count > maxRequeue {
log.L.Errorf("evicting %s from queue because of retry count", i.ev.Topic)
// drop the event
continue
}

if err := l.forwardRequest(i.ctx, &v1.ForwardRequest{Envelope: i.ev}); err != nil {
log.L.WithError(err).Error("forward event")
l.queue(i)
}
}
}

func (l *RemoteEventsPublisher) queue(i *item) {
go func() {
i.count++
// re-queue after a short delay
time.Sleep(time.Duration(1*i.count) * time.Second)
l.requeue <- i
}()
}

// Publish publishes the event by forwarding it to the configured ttrpc server
func (l *RemoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
evt, err := typeurl.MarshalAnyToProto(event)
if err != nil {
return err
}
i := &item{
ev: &types.Envelope{
Timestamp: protobuf.ToTimestamp(time.Now()),
Namespace: ns,
Topic: topic,
Event: evt,
},
ctx: ctx,
}

if err := l.forwardRequest(i.ctx, &v1.ForwardRequest{Envelope: i.ev}); err != nil {
l.queue(i)
return err
}

return nil
}

func (l *RemoteEventsPublisher) forwardRequest(ctx context.Context, req *v1.ForwardRequest) error {
service, err := l.client.EventsService()
if err == nil {
fCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
_, err = service.Forward(fCtx, req)
cancel()
if err == nil {
return nil
}
}

if !errors.Is(err, ttrpc.ErrClosed) {
return err
}

// Reconnect and retry request
if err = l.client.Reconnect(); err != nil {
return err
}

service, err = l.client.EventsService()
if err != nil {
return err
}

// try again with a fresh context, otherwise we may get a context timeout unexpectedly.
fCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
_, err = service.Forward(fCtx, req)
cancel()
if err != nil {
return err
}

return nil
}
Loading