From bb2f97bbc82c315060f64e6b75684ee1a18a8048 Mon Sep 17 00:00:00 2001 From: Neng Lu Date: Wed, 11 Feb 2026 10:49:09 -0800 Subject: [PATCH] Add namespace-level inactive topic policies commands --- pkg/ctl/namespace/get_inactive_topic.go | 68 ++++++++++++ pkg/ctl/namespace/inactive_topic_test.go | 76 +++++++++++++ pkg/ctl/namespace/namespace.go | 3 + pkg/ctl/namespace/remove_inactive_topic.go | 65 +++++++++++ pkg/ctl/namespace/set_inactive_topic.go | 122 +++++++++++++++++++++ 5 files changed, 334 insertions(+) create mode 100644 pkg/ctl/namespace/get_inactive_topic.go create mode 100644 pkg/ctl/namespace/inactive_topic_test.go create mode 100644 pkg/ctl/namespace/remove_inactive_topic.go create mode 100644 pkg/ctl/namespace/set_inactive_topic.go diff --git a/pkg/ctl/namespace/get_inactive_topic.go b/pkg/ctl/namespace/get_inactive_topic.go new file mode 100644 index 00000000..f1235dd7 --- /dev/null +++ b/pkg/ctl/namespace/get_inactive_topic.go @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package namespace + +import ( + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" + + "github.com/streamnative/pulsarctl/pkg/cmdutils" +) + +func GetInactiveTopicCmd(vc *cmdutils.VerbCmd) { + var desc cmdutils.LongDescription + desc.CommandUsedFor = "Get the inactive topic policies on a namespace" + desc.CommandPermission = "This command requires tenant admin permissions." + + var examples []cmdutils.Example + examples = append(examples, cmdutils.Example{ + Desc: desc.CommandUsedFor, + Command: "pulsarctl namespaces get-inactive-topic-policies tenant/namespace", + }) + desc.CommandExamples = examples + + var out []cmdutils.Output + out = append(out, NsErrors...) + desc.CommandOutput = out + + vc.SetDescription( + "get-inactive-topic-policies", + desc.CommandUsedFor, + desc.ToString(), + desc.ExampleToString()) + + vc.EnableOutputFlagSet() + + vc.SetRunFuncWithNameArg(func() error { + return doGetInactiveTopicPolicies(vc) + }, "the namespace name is not specified or the namespace name is specified more than one") +} + +func doGetInactiveTopicPolicies(vc *cmdutils.VerbCmd) error { + ns, err := utils.GetNamespaceName(vc.NameArg) + if err != nil { + return err + } + + admin := cmdutils.NewPulsarClient() + response, err := admin.Namespaces().GetInactiveTopicPolicies(*ns) + if err == nil { + cmdutils.PrintJSON(vc.Command.OutOrStdout(), &response) + } + + return err +} diff --git a/pkg/ctl/namespace/inactive_topic_test.go b/pkg/ctl/namespace/inactive_topic_test.go new file mode 100644 index 00000000..b569723a --- /dev/null +++ b/pkg/ctl/namespace/inactive_topic_test.go @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package namespace + +import ( + "encoding/json" + "fmt" + "testing" + + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" + "github.com/onsi/gomega" + + "github.com/streamnative/pulsarctl/pkg/test" +) + +func TestInactiveTopicCmd(t *testing.T) { + g := gomega.NewWithT(t) + + nsName := fmt.Sprintf("public/test-inactive-topic-ns-%s", test.RandomSuffix()) + createArgs := []string{"create", nsName} + _, execErr, _, _ := TestNamespaceCommands(createNs, createArgs) + g.Expect(execErr).Should(gomega.BeNil()) + + setArgs := []string{"set-inactive-topic-policies", nsName, + "-e=true", + "-t", "1h", + "-m", "delete_when_no_subscriptions"} + out, execErr, _, _ := TestNamespaceCommands(SetInactiveTopicCmd, setArgs) + g.Expect(execErr).Should(gomega.BeNil()) + g.Expect(out.String()).Should(gomega.Equal(fmt.Sprintf("Set inactive topic policies successfully for [%s]", + nsName))) + + getArgs := []string{"get-inactive-topic-policies", nsName} + g.Eventually(func(g gomega.Gomega) { + out, execErr, _, _ = TestNamespaceCommands(GetInactiveTopicCmd, getArgs) + g.Expect(execErr).Should(gomega.BeNil()) + var inactiveTopic utils.InactiveTopicPolicies + err := json.Unmarshal(out.Bytes(), &inactiveTopic) + g.Expect(err).Should(gomega.BeNil()) + g.Expect(inactiveTopic.DeleteWhileInactive).Should(gomega.Equal(true)) + g.Expect(inactiveTopic.MaxInactiveDurationSeconds).Should(gomega.Equal(3600)) + g.Expect(inactiveTopic.InactiveTopicDeleteMode.String()).Should(gomega.Equal("delete_when_no_subscriptions")) + }).Should(gomega.Succeed()) + + removeArgs := []string{"remove-inactive-topic-policies", nsName} + out, execErr, _, _ = TestNamespaceCommands(RemoveInactiveTopicCmd, removeArgs) + g.Expect(execErr).Should(gomega.BeNil()) + g.Expect(out.String()).Should(gomega.Equal(fmt.Sprintf("Remove inactive topic policies successfully from [%s]", + nsName))) + + g.Eventually(func(g gomega.Gomega) { + out, execErr, _, _ = TestNamespaceCommands(GetInactiveTopicCmd, getArgs) + g.Expect(execErr).Should(gomega.BeNil()) + var inactiveTopic utils.InactiveTopicPolicies + err := json.Unmarshal(out.Bytes(), &inactiveTopic) + g.Expect(err).Should(gomega.BeNil()) + g.Expect(inactiveTopic.DeleteWhileInactive).Should(gomega.Equal(false)) + g.Expect(inactiveTopic.MaxInactiveDurationSeconds).Should(gomega.Equal(0)) + g.Expect(inactiveTopic.InactiveTopicDeleteMode).Should(gomega.BeNil()) + }).Should(gomega.Succeed()) +} diff --git a/pkg/ctl/namespace/namespace.go b/pkg/ctl/namespace/namespace.go index 3183d629..116f260f 100644 --- a/pkg/ctl/namespace/namespace.go +++ b/pkg/ctl/namespace/namespace.go @@ -94,5 +94,8 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command { cmdutils.AddVerbCmd(flagGrouping, resourceCmd, SetPublishRateCmd) cmdutils.AddVerbCmd(flagGrouping, resourceCmd, SetIsAllowAutoUpdateSchemaCmd) cmdutils.AddVerbCmd(flagGrouping, resourceCmd, GetIsAllowAutoUpdateSchemaCmd) + cmdutils.AddVerbCmd(flagGrouping, resourceCmd, GetInactiveTopicCmd) + cmdutils.AddVerbCmd(flagGrouping, resourceCmd, SetInactiveTopicCmd) + cmdutils.AddVerbCmd(flagGrouping, resourceCmd, RemoveInactiveTopicCmd) return resourceCmd } diff --git a/pkg/ctl/namespace/remove_inactive_topic.go b/pkg/ctl/namespace/remove_inactive_topic.go new file mode 100644 index 00000000..980fce98 --- /dev/null +++ b/pkg/ctl/namespace/remove_inactive_topic.go @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package namespace + +import ( + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" + + "github.com/streamnative/pulsarctl/pkg/cmdutils" +) + +func RemoveInactiveTopicCmd(vc *cmdutils.VerbCmd) { + var desc cmdutils.LongDescription + desc.CommandUsedFor = "Remove inactive topic policies from a namespace" + desc.CommandPermission = "This command requires tenant admin permissions." + + var examples []cmdutils.Example + examples = append(examples, cmdutils.Example{ + Desc: desc.CommandUsedFor, + Command: "pulsarctl namespaces remove-inactive-topic-policies tenant/namespace", + }) + desc.CommandExamples = examples + + var out []cmdutils.Output + out = append(out, NsErrors...) + desc.CommandOutput = out + + vc.SetDescription( + "remove-inactive-topic-policies", + desc.CommandUsedFor, + desc.ToString(), + desc.ExampleToString()) + + vc.SetRunFuncWithNameArg(func() error { + return doRemoveInactiveTopicPolicies(vc) + }, "the namespace name is not specified or the namespace name is specified more than one") +} + +func doRemoveInactiveTopicPolicies(vc *cmdutils.VerbCmd) error { + ns, err := utils.GetNamespaceName(vc.NameArg) + if err != nil { + return err + } + + admin := cmdutils.NewPulsarClient() + err = admin.Namespaces().RemoveInactiveTopicPolicies(*ns) + if err == nil { + vc.Command.Printf("Remove inactive topic policies successfully from [%s]", ns.String()) + } + return err +} diff --git a/pkg/ctl/namespace/set_inactive_topic.go b/pkg/ctl/namespace/set_inactive_topic.go new file mode 100644 index 00000000..9a3b51e9 --- /dev/null +++ b/pkg/ctl/namespace/set_inactive_topic.go @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package namespace + +import ( + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" + "github.com/spf13/cobra" + "github.com/spf13/pflag" + + "github.com/streamnative/pulsarctl/pkg/cmdutils" + ctlutils "github.com/streamnative/pulsarctl/pkg/ctl/utils" +) + +type setInactiveTopicPoliciesArgs struct { + deleteWhileInactive bool + deleteInactiveTopicsMaxInactiveDuration string + inactiveTopicDeleteMode string +} + +func SetInactiveTopicCmd(vc *cmdutils.VerbCmd) { + var desc cmdutils.LongDescription + desc.CommandUsedFor = "Set the inactive topic policies on a namespace" + desc.CommandPermission = "This command requires tenant admin permissions." + + var examples []cmdutils.Example + set := cmdutils.Example{ + Desc: desc.CommandUsedFor, + Command: "pulsarctl namespaces set-inactive-topic-policies tenant/namespace \n" + + "\t--enable-delete-while-inactive true \n" + + "\t--max-inactive-duration 1h \n" + + "\t--delete-mode delete_when_no_subscriptions", + } + + examples = append(examples, set) + desc.CommandExamples = examples + + var out []cmdutils.Output + out = append(out, NsErrors...) + desc.CommandOutput = out + + vc.SetDescription( + "set-inactive-topic-policies", + desc.CommandUsedFor, + desc.ToString(), + desc.ExampleToString()) + + args := setInactiveTopicPoliciesArgs{} + + vc.FlagSetGroup.InFlagSet("Set Inactive Topic", func(flagSet *pflag.FlagSet) { + flagSet.BoolVarP(&args.deleteWhileInactive, + "enable-delete-while-inactive", + "e", + false, + "Control whether deletion is enabled while inactive") + + flagSet.StringVarP(&args.deleteInactiveTopicsMaxInactiveDuration, + "max-inactive-duration", + "t", + "", + "Max duration of topic inactivity in seconds, "+ + "topics that are inactive for longer than this value will be deleted (eg: 1s, 10s, 1m, 5h, 3d)") + flagSet.StringVarP(&args.inactiveTopicDeleteMode, + "delete-mode", + "m", + "", + "Mode of delete inactive topic, "+ + "Valid options are: [delete_when_no_subscriptions, delete_when_subscriptions_caught_up]") + + _ = cobra.MarkFlagRequired(flagSet, "delete-mode") + _ = cobra.MarkFlagRequired(flagSet, "max-inactive-duration") + }) + vc.EnableOutputFlagSet() + + vc.SetRunFuncWithNameArg(func() error { + return doSetInactiveTopicPolicies(vc, args) + }, "the namespace name is not specified or the namespace name is specified more than one") +} + +func doSetInactiveTopicPolicies(vc *cmdutils.VerbCmd, args setInactiveTopicPoliciesArgs) error { + inactiveTopicDeleteMode, err := utils.ParseInactiveTopicDeleteMode(args.inactiveTopicDeleteMode) + if err != nil { + return err + } + + maxInactiveDuration, err := ctlutils.ParseRelativeTimeInSeconds(args.deleteInactiveTopicsMaxInactiveDuration) + if err != nil { + return err + } + + body := utils.NewInactiveTopicPolicies( + &inactiveTopicDeleteMode, + int(maxInactiveDuration.Seconds()), + args.deleteWhileInactive) + + ns, err := utils.GetNamespaceName(vc.NameArg) + if err != nil { + return err + } + + admin := cmdutils.NewPulsarClient() + err = admin.Namespaces().SetInactiveTopicPolicies(*ns, body) + if err == nil { + vc.Command.Printf("Set inactive topic policies successfully for [%s]", ns.String()) + } + + return err +}