Skip to content

Commit b9ee88f

Browse files
authored
Add namespace-level inactive topic policies commands (#2004)
1 parent ae25e7d commit b9ee88f

5 files changed

Lines changed: 334 additions & 0 deletions

File tree

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package namespace
19+
20+
import (
21+
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
22+
23+
"github.com/streamnative/pulsarctl/pkg/cmdutils"
24+
)
25+
26+
func GetInactiveTopicCmd(vc *cmdutils.VerbCmd) {
27+
var desc cmdutils.LongDescription
28+
desc.CommandUsedFor = "Get the inactive topic policies on a namespace"
29+
desc.CommandPermission = "This command requires tenant admin permissions."
30+
31+
var examples []cmdutils.Example
32+
examples = append(examples, cmdutils.Example{
33+
Desc: desc.CommandUsedFor,
34+
Command: "pulsarctl namespaces get-inactive-topic-policies tenant/namespace",
35+
})
36+
desc.CommandExamples = examples
37+
38+
var out []cmdutils.Output
39+
out = append(out, NsErrors...)
40+
desc.CommandOutput = out
41+
42+
vc.SetDescription(
43+
"get-inactive-topic-policies",
44+
desc.CommandUsedFor,
45+
desc.ToString(),
46+
desc.ExampleToString())
47+
48+
vc.EnableOutputFlagSet()
49+
50+
vc.SetRunFuncWithNameArg(func() error {
51+
return doGetInactiveTopicPolicies(vc)
52+
}, "the namespace name is not specified or the namespace name is specified more than one")
53+
}
54+
55+
func doGetInactiveTopicPolicies(vc *cmdutils.VerbCmd) error {
56+
ns, err := utils.GetNamespaceName(vc.NameArg)
57+
if err != nil {
58+
return err
59+
}
60+
61+
admin := cmdutils.NewPulsarClient()
62+
response, err := admin.Namespaces().GetInactiveTopicPolicies(*ns)
63+
if err == nil {
64+
cmdutils.PrintJSON(vc.Command.OutOrStdout(), &response)
65+
}
66+
67+
return err
68+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package namespace
19+
20+
import (
21+
"encoding/json"
22+
"fmt"
23+
"testing"
24+
25+
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
26+
"github.com/onsi/gomega"
27+
28+
"github.com/streamnative/pulsarctl/pkg/test"
29+
)
30+
31+
func TestInactiveTopicCmd(t *testing.T) {
32+
g := gomega.NewWithT(t)
33+
34+
nsName := fmt.Sprintf("public/test-inactive-topic-ns-%s", test.RandomSuffix())
35+
createArgs := []string{"create", nsName}
36+
_, execErr, _, _ := TestNamespaceCommands(createNs, createArgs)
37+
g.Expect(execErr).Should(gomega.BeNil())
38+
39+
setArgs := []string{"set-inactive-topic-policies", nsName,
40+
"-e=true",
41+
"-t", "1h",
42+
"-m", "delete_when_no_subscriptions"}
43+
out, execErr, _, _ := TestNamespaceCommands(SetInactiveTopicCmd, setArgs)
44+
g.Expect(execErr).Should(gomega.BeNil())
45+
g.Expect(out.String()).Should(gomega.Equal(fmt.Sprintf("Set inactive topic policies successfully for [%s]",
46+
nsName)))
47+
48+
getArgs := []string{"get-inactive-topic-policies", nsName}
49+
g.Eventually(func(g gomega.Gomega) {
50+
out, execErr, _, _ = TestNamespaceCommands(GetInactiveTopicCmd, getArgs)
51+
g.Expect(execErr).Should(gomega.BeNil())
52+
var inactiveTopic utils.InactiveTopicPolicies
53+
err := json.Unmarshal(out.Bytes(), &inactiveTopic)
54+
g.Expect(err).Should(gomega.BeNil())
55+
g.Expect(inactiveTopic.DeleteWhileInactive).Should(gomega.Equal(true))
56+
g.Expect(inactiveTopic.MaxInactiveDurationSeconds).Should(gomega.Equal(3600))
57+
g.Expect(inactiveTopic.InactiveTopicDeleteMode.String()).Should(gomega.Equal("delete_when_no_subscriptions"))
58+
}).Should(gomega.Succeed())
59+
60+
removeArgs := []string{"remove-inactive-topic-policies", nsName}
61+
out, execErr, _, _ = TestNamespaceCommands(RemoveInactiveTopicCmd, removeArgs)
62+
g.Expect(execErr).Should(gomega.BeNil())
63+
g.Expect(out.String()).Should(gomega.Equal(fmt.Sprintf("Remove inactive topic policies successfully from [%s]",
64+
nsName)))
65+
66+
g.Eventually(func(g gomega.Gomega) {
67+
out, execErr, _, _ = TestNamespaceCommands(GetInactiveTopicCmd, getArgs)
68+
g.Expect(execErr).Should(gomega.BeNil())
69+
var inactiveTopic utils.InactiveTopicPolicies
70+
err := json.Unmarshal(out.Bytes(), &inactiveTopic)
71+
g.Expect(err).Should(gomega.BeNil())
72+
g.Expect(inactiveTopic.DeleteWhileInactive).Should(gomega.Equal(false))
73+
g.Expect(inactiveTopic.MaxInactiveDurationSeconds).Should(gomega.Equal(0))
74+
g.Expect(inactiveTopic.InactiveTopicDeleteMode).Should(gomega.BeNil())
75+
}).Should(gomega.Succeed())
76+
}

pkg/ctl/namespace/namespace.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,5 +94,8 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command {
9494
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, SetPublishRateCmd)
9595
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, SetIsAllowAutoUpdateSchemaCmd)
9696
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, GetIsAllowAutoUpdateSchemaCmd)
97+
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, GetInactiveTopicCmd)
98+
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, SetInactiveTopicCmd)
99+
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, RemoveInactiveTopicCmd)
97100
return resourceCmd
98101
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package namespace
19+
20+
import (
21+
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
22+
23+
"github.com/streamnative/pulsarctl/pkg/cmdutils"
24+
)
25+
26+
func RemoveInactiveTopicCmd(vc *cmdutils.VerbCmd) {
27+
var desc cmdutils.LongDescription
28+
desc.CommandUsedFor = "Remove inactive topic policies from a namespace"
29+
desc.CommandPermission = "This command requires tenant admin permissions."
30+
31+
var examples []cmdutils.Example
32+
examples = append(examples, cmdutils.Example{
33+
Desc: desc.CommandUsedFor,
34+
Command: "pulsarctl namespaces remove-inactive-topic-policies tenant/namespace",
35+
})
36+
desc.CommandExamples = examples
37+
38+
var out []cmdutils.Output
39+
out = append(out, NsErrors...)
40+
desc.CommandOutput = out
41+
42+
vc.SetDescription(
43+
"remove-inactive-topic-policies",
44+
desc.CommandUsedFor,
45+
desc.ToString(),
46+
desc.ExampleToString())
47+
48+
vc.SetRunFuncWithNameArg(func() error {
49+
return doRemoveInactiveTopicPolicies(vc)
50+
}, "the namespace name is not specified or the namespace name is specified more than one")
51+
}
52+
53+
func doRemoveInactiveTopicPolicies(vc *cmdutils.VerbCmd) error {
54+
ns, err := utils.GetNamespaceName(vc.NameArg)
55+
if err != nil {
56+
return err
57+
}
58+
59+
admin := cmdutils.NewPulsarClient()
60+
err = admin.Namespaces().RemoveInactiveTopicPolicies(*ns)
61+
if err == nil {
62+
vc.Command.Printf("Remove inactive topic policies successfully from [%s]", ns.String())
63+
}
64+
return err
65+
}
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package namespace
19+
20+
import (
21+
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
22+
"github.com/spf13/cobra"
23+
"github.com/spf13/pflag"
24+
25+
"github.com/streamnative/pulsarctl/pkg/cmdutils"
26+
ctlutils "github.com/streamnative/pulsarctl/pkg/ctl/utils"
27+
)
28+
29+
type setInactiveTopicPoliciesArgs struct {
30+
deleteWhileInactive bool
31+
deleteInactiveTopicsMaxInactiveDuration string
32+
inactiveTopicDeleteMode string
33+
}
34+
35+
func SetInactiveTopicCmd(vc *cmdutils.VerbCmd) {
36+
var desc cmdutils.LongDescription
37+
desc.CommandUsedFor = "Set the inactive topic policies on a namespace"
38+
desc.CommandPermission = "This command requires tenant admin permissions."
39+
40+
var examples []cmdutils.Example
41+
set := cmdutils.Example{
42+
Desc: desc.CommandUsedFor,
43+
Command: "pulsarctl namespaces set-inactive-topic-policies tenant/namespace \n" +
44+
"\t--enable-delete-while-inactive true \n" +
45+
"\t--max-inactive-duration 1h \n" +
46+
"\t--delete-mode delete_when_no_subscriptions",
47+
}
48+
49+
examples = append(examples, set)
50+
desc.CommandExamples = examples
51+
52+
var out []cmdutils.Output
53+
out = append(out, NsErrors...)
54+
desc.CommandOutput = out
55+
56+
vc.SetDescription(
57+
"set-inactive-topic-policies",
58+
desc.CommandUsedFor,
59+
desc.ToString(),
60+
desc.ExampleToString())
61+
62+
args := setInactiveTopicPoliciesArgs{}
63+
64+
vc.FlagSetGroup.InFlagSet("Set Inactive Topic", func(flagSet *pflag.FlagSet) {
65+
flagSet.BoolVarP(&args.deleteWhileInactive,
66+
"enable-delete-while-inactive",
67+
"e",
68+
false,
69+
"Control whether deletion is enabled while inactive")
70+
71+
flagSet.StringVarP(&args.deleteInactiveTopicsMaxInactiveDuration,
72+
"max-inactive-duration",
73+
"t",
74+
"",
75+
"Max duration of topic inactivity in seconds, "+
76+
"topics that are inactive for longer than this value will be deleted (eg: 1s, 10s, 1m, 5h, 3d)")
77+
flagSet.StringVarP(&args.inactiveTopicDeleteMode,
78+
"delete-mode",
79+
"m",
80+
"",
81+
"Mode of delete inactive topic, "+
82+
"Valid options are: [delete_when_no_subscriptions, delete_when_subscriptions_caught_up]")
83+
84+
_ = cobra.MarkFlagRequired(flagSet, "delete-mode")
85+
_ = cobra.MarkFlagRequired(flagSet, "max-inactive-duration")
86+
})
87+
vc.EnableOutputFlagSet()
88+
89+
vc.SetRunFuncWithNameArg(func() error {
90+
return doSetInactiveTopicPolicies(vc, args)
91+
}, "the namespace name is not specified or the namespace name is specified more than one")
92+
}
93+
94+
func doSetInactiveTopicPolicies(vc *cmdutils.VerbCmd, args setInactiveTopicPoliciesArgs) error {
95+
inactiveTopicDeleteMode, err := utils.ParseInactiveTopicDeleteMode(args.inactiveTopicDeleteMode)
96+
if err != nil {
97+
return err
98+
}
99+
100+
maxInactiveDuration, err := ctlutils.ParseRelativeTimeInSeconds(args.deleteInactiveTopicsMaxInactiveDuration)
101+
if err != nil {
102+
return err
103+
}
104+
105+
body := utils.NewInactiveTopicPolicies(
106+
&inactiveTopicDeleteMode,
107+
int(maxInactiveDuration.Seconds()),
108+
args.deleteWhileInactive)
109+
110+
ns, err := utils.GetNamespaceName(vc.NameArg)
111+
if err != nil {
112+
return err
113+
}
114+
115+
admin := cmdutils.NewPulsarClient()
116+
err = admin.Namespaces().SetInactiveTopicPolicies(*ns, body)
117+
if err == nil {
118+
vc.Command.Printf("Set inactive topic policies successfully for [%s]", ns.String())
119+
}
120+
121+
return err
122+
}

0 commit comments

Comments
 (0)