Skip to content

Commit 03bafb4

Browse files
committed
vendor and patch containerd/pkg/shim for Windows
The existing runhcs shim management logic is tightly coupled, making it difficult to reuse for new shim implementations. Additionally, aligning with upstream's move toward BootstrapParams requires additional effort in existing logic. To maintain compatibility and reduce technical debt, this commit vendors the containerd/pkg/shim implementation. Since Windows requires specific considerations not yet present upstream, we are applying local patches while those changes are prepared for upstream contribution. Source: https://github.com/containerd/containerd/tree/main/pkg/shim Signed-off-by: Harsh Rawat <harshrawat@microsoft.com>
1 parent 6dea86c commit 03bafb4

9 files changed

Lines changed: 1949 additions & 2 deletions

File tree

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ require (
3636
github.com/containerd/errdefs v1.0.0
3737
github.com/containerd/errdefs/pkg v0.3.0
3838
github.com/containerd/go-runc v1.1.0
39+
github.com/containerd/log v0.1.0
3940
github.com/containerd/platforms v1.0.0-rc.1
41+
github.com/containerd/plugin v1.0.0
4042
github.com/containerd/ttrpc v1.2.7
4143
github.com/containerd/typeurl/v2 v2.2.3
4244
github.com/google/go-cmp v0.7.0
@@ -76,8 +78,6 @@ require (
7678
github.com/checkpoint-restore/go-criu/v6 v6.3.0 // indirect
7779
github.com/containerd/continuity v0.4.5 // indirect
7880
github.com/containerd/fifo v1.1.0 // indirect
79-
github.com/containerd/log v0.1.0 // indirect
80-
github.com/containerd/plugin v1.0.0 // indirect
8181
github.com/containerd/protobuild v0.3.0 // indirect
8282
github.com/containerd/stargz-snapshotter/estargz v0.15.1 // indirect
8383
github.com/coreos/go-systemd/v22 v22.5.0 // indirect

pkg/shim/publisher.go

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
//go:build windows
2+
3+
/*
4+
Copyright The containerd Authors.
5+
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
*/
18+
19+
package shim
20+
21+
import (
22+
"context"
23+
"sync"
24+
"time"
25+
26+
v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1"
27+
"github.com/containerd/containerd/api/types"
28+
"github.com/containerd/containerd/v2/core/events"
29+
"github.com/containerd/containerd/v2/pkg/namespaces"
30+
"github.com/containerd/containerd/v2/pkg/protobuf"
31+
"github.com/containerd/containerd/v2/pkg/ttrpcutil"
32+
"github.com/containerd/log"
33+
"github.com/containerd/ttrpc"
34+
"github.com/containerd/typeurl/v2"
35+
)
36+
37+
const (
38+
queueSize = 2048
39+
maxRequeue = 5
40+
)
41+
42+
type item struct {
43+
ev *types.Envelope
44+
ctx context.Context
45+
count int
46+
}
47+
48+
type publisherConfig struct {
49+
ttrpcOpts []ttrpc.ClientOpts
50+
}
51+
52+
type PublisherOpts func(*publisherConfig)
53+
54+
func WithPublishTTRPCOpts(opts ...ttrpc.ClientOpts) PublisherOpts {
55+
return func(cfg *publisherConfig) {
56+
cfg.ttrpcOpts = append(cfg.ttrpcOpts, opts...)
57+
}
58+
}
59+
60+
// NewPublisher creates a new remote events publisher
61+
func NewPublisher(address string, opts ...PublisherOpts) (*RemoteEventsPublisher, error) {
62+
client, err := ttrpcutil.NewClient(address)
63+
if err != nil {
64+
return nil, err
65+
}
66+
67+
l := &RemoteEventsPublisher{
68+
client: client,
69+
closed: make(chan struct{}),
70+
requeue: make(chan *item, queueSize),
71+
}
72+
73+
go l.processQueue()
74+
return l, nil
75+
}
76+
77+
// RemoteEventsPublisher forwards events to a ttrpc server
78+
type RemoteEventsPublisher struct {
79+
client *ttrpcutil.Client
80+
closed chan struct{}
81+
closer sync.Once
82+
requeue chan *item
83+
}
84+
85+
// Done returns a channel which closes when done
86+
func (l *RemoteEventsPublisher) Done() <-chan struct{} {
87+
return l.closed
88+
}
89+
90+
// Close closes the remote connection and closes the done channel
91+
func (l *RemoteEventsPublisher) Close() (err error) {
92+
err = l.client.Close()
93+
l.closer.Do(func() {
94+
close(l.closed)
95+
})
96+
return err
97+
}
98+
99+
func (l *RemoteEventsPublisher) processQueue() {
100+
for i := range l.requeue {
101+
if i.count > maxRequeue {
102+
log.L.Errorf("evicting %s from queue because of retry count", i.ev.Topic)
103+
// drop the event
104+
continue
105+
}
106+
107+
if err := l.forwardRequest(i.ctx, &v1.ForwardRequest{Envelope: i.ev}); err != nil {
108+
log.L.WithError(err).Error("forward event")
109+
l.queue(i)
110+
}
111+
}
112+
}
113+
114+
func (l *RemoteEventsPublisher) queue(i *item) {
115+
go func() {
116+
i.count++
117+
// re-queue after a short delay
118+
time.Sleep(time.Duration(1*i.count) * time.Second)
119+
l.requeue <- i
120+
}()
121+
}
122+
123+
// Publish publishes the event by forwarding it to the configured ttrpc server
124+
func (l *RemoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
125+
ns, err := namespaces.NamespaceRequired(ctx)
126+
if err != nil {
127+
return err
128+
}
129+
evt, err := typeurl.MarshalAnyToProto(event)
130+
if err != nil {
131+
return err
132+
}
133+
i := &item{
134+
ev: &types.Envelope{
135+
Timestamp: protobuf.ToTimestamp(time.Now()),
136+
Namespace: ns,
137+
Topic: topic,
138+
Event: evt,
139+
},
140+
ctx: ctx,
141+
}
142+
143+
if err := l.forwardRequest(i.ctx, &v1.ForwardRequest{Envelope: i.ev}); err != nil {
144+
l.queue(i)
145+
return err
146+
}
147+
148+
return nil
149+
}
150+
151+
func (l *RemoteEventsPublisher) forwardRequest(ctx context.Context, req *v1.ForwardRequest) error {
152+
service, err := l.client.EventsService()
153+
if err == nil {
154+
fCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
155+
_, err = service.Forward(fCtx, req)
156+
cancel()
157+
if err == nil {
158+
return nil
159+
}
160+
}
161+
162+
if err != ttrpc.ErrClosed {
163+
return err
164+
}
165+
166+
// Reconnect and retry request
167+
if err = l.client.Reconnect(); err != nil {
168+
return err
169+
}
170+
171+
service, err = l.client.EventsService()
172+
if err != nil {
173+
return err
174+
}
175+
176+
// try again with a fresh context, otherwise we may get a context timeout unexpectedly.
177+
fCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
178+
_, err = service.Forward(fCtx, req)
179+
cancel()
180+
if err != nil {
181+
return err
182+
}
183+
184+
return nil
185+
}

0 commit comments

Comments
 (0)