Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/streamnative/pulsarctl
go 1.25.8

require (
github.com/apache/pulsar-client-go v0.18.0-candidate-1.0.20251222030102-3bb7d4eff361
github.com/apache/pulsar-client-go v0.19.0-candidate-1
github.com/docker/go-connections v0.5.0
github.com/fatih/color v1.7.0
github.com/ghodss/yaml v1.0.0
Expand All @@ -24,6 +24,7 @@ require (

require (
dario.cat/mergo v1.0.0 // indirect
github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6 // indirect
github.com/AthenZ/athenz v1.12.31 // indirect
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/DataDog/zstd v1.5.7 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk=
dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9vkmnHYOMsOr4WLk+Vo07yKIzd94sVoIqshQ4bU=
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8=
github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6 h1:He8afgbRMd7mFxO99hRNu+6tazq8nFF9lIwo9JFroBk=
github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8=
github.com/AthenZ/athenz v1.12.31 h1:GQnRDLgivPlVvklSpH9gp+t/dho9DJTtt+hlLYo5TX8=
github.com/AthenZ/athenz v1.12.31/go.mod h1:6Siq4JOA4OjgYVgtTVIeHrb4HB2hEL8i4fx7aOFrgfY=
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8=
Expand All @@ -14,8 +14,8 @@ github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERo
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
github.com/RoaringBitmap/roaring/v2 v2.14.4 h1:4aKySrrg9G/5oRtJ3TrZLObVqxgQ9f1znCRBwEwjuVw=
github.com/RoaringBitmap/roaring/v2 v2.14.4/go.mod h1:oMvV6omPWr+2ifRdeZvVJyaz+aoEUopyv5iH0u/+wbY=
github.com/apache/pulsar-client-go v0.18.0-candidate-1.0.20251222030102-3bb7d4eff361 h1:Fb4j4v85TPq64FRp+QMLWaW3/Hg1Jg7TBWaZwPcSO9Y=
github.com/apache/pulsar-client-go v0.18.0-candidate-1.0.20251222030102-3bb7d4eff361/go.mod h1:/Zf8Q8bSSc6ndEJ8V1muIHf6ZWsMrHoQU+98Ww9pOeI=
github.com/apache/pulsar-client-go v0.19.0-candidate-1 h1:FtOmcndcFzmleMZmkQEO1OEZ0HroCchqPMpIflKC1lY=
github.com/apache/pulsar-client-go v0.19.0-candidate-1/go.mod h1:/Zf8Q8bSSc6ndEJ8V1muIHf6ZWsMrHoQU+98Ww9pOeI=
github.com/ardielle/ardielle-go v1.5.2 h1:TilHTpHIQJ27R1Tl/iITBzMwiUGSlVfiVhwDNGM3Zj4=
github.com/ardielle/ardielle-go v1.5.2/go.mod h1:I4hy1n795cUhaVt/ojz83SNVCYIGsAFAONtv2Dr7HUI=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
Expand Down
1 change: 1 addition & 0 deletions pkg/ctl/brokers/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command {
"broker")

cmdutils.AddVerbCmd(flagGrouping, resourceCmd, getBrokerListCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, leaderBrokerCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, getDynamicConfigListNameCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, getOwnedNamespacesCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, updateDynamicConfig)
Expand Down
67 changes: 67 additions & 0 deletions pkg/ctl/brokers/leader_broker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package brokers

import "github.com/streamnative/pulsarctl/pkg/cmdutils"

func leaderBrokerCmd(vc *cmdutils.VerbCmd) {
desc := cmdutils.LongDescription{}
desc.CommandUsedFor = "Get the information of the leader broker"
desc.CommandPermission = "This command requires super-user permissions."

var examples []cmdutils.Example
get := cmdutils.Example{
Desc: desc.CommandUsedFor,
Command: "pulsarctl brokers leader-broker",
}
examples = append(examples, get)
desc.CommandExamples = examples

var out []cmdutils.Output
successOut := cmdutils.Output{
Desc: "normal output",
Out: "{\n" +
" \"brokerId\": \"broker-1\",\n" +
" \"serviceUrl\": \"http://127.0.0.1:8080\"\n" +
"}",
}
out = append(out, successOut)
desc.CommandOutput = out

vc.SetDescription(
"leader-broker",
desc.CommandUsedFor,
desc.ToString(),
desc.ExampleToString(),
"leader-broker")

vc.SetRunFunc(func() error {
return doGetLeaderBroker(vc)
})
vc.EnableOutputFlagSet()
}

func doGetLeaderBroker(vc *cmdutils.VerbCmd) error {
admin := cmdutils.NewPulsarClient()
info, err := admin.Brokers().GetLeaderBroker()
if err == nil {
oc := cmdutils.NewOutputContent().WithObject(info)
err = vc.OutputConfig.WriteOutput(vc.Command.OutOrStdout(), oc)
}
return err
}
57 changes: 57 additions & 0 deletions pkg/ctl/brokers/leader_broker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package brokers

import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
"github.com/stretchr/testify/assert"

"github.com/streamnative/pulsarctl/pkg/cmdutils"
)

func TestLeaderBroker(t *testing.T) {
oldURL := cmdutils.PulsarCtlConfig.WebServiceURL
defer func() {
cmdutils.PulsarCtlConfig.WebServiceURL = oldURL
}()

srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, http.MethodGet, r.Method)
assert.Equal(t, "/admin/v2/brokers/leaderBroker", r.URL.Path)
_, _ = w.Write([]byte(`{"brokerId":"broker-1","serviceUrl":"http://127.0.0.1:8080"}`))
}))
defer srv.Close()

cmdutils.PulsarCtlConfig.WebServiceURL = srv.URL

args := []string{"leader-broker"}
out, execErr, _, err := TestBrokersCommands(leaderBrokerCmd, args)
assert.Nil(t, err)
assert.Nil(t, execErr)

var info utils.BrokerInfo
err = json.Unmarshal(out.Bytes(), &info)
assert.Nil(t, err)
assert.Equal(t, "broker-1", info.BrokerID)
assert.Equal(t, "http://127.0.0.1:8080", info.ServiceURL)
}
1 change: 1 addition & 0 deletions pkg/ctl/brokerstats/load_report_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,6 @@ func TestDumpLoadReport(t *testing.T) {
t.FailNow()
}
defaultBrokerData := utils.NewLocalBrokerData()
defaultBrokerData.LastStats = nil
assert.Equal(t, defaultBrokerData, getBrokerData)
}
166 changes: 166 additions & 0 deletions pkg/ctl/namespace/bookie_affinity_group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package namespace

import (
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
"github.com/spf13/cobra"
"github.com/spf13/pflag"

"github.com/streamnative/pulsarctl/pkg/cmdutils"
)

func GetBookieAffinityGroupCmd(vc *cmdutils.VerbCmd) {
var desc cmdutils.LongDescription
desc.CommandUsedFor = "Get bookie affinity group configured for a namespace"
desc.CommandPermission = "This command requires super-user permissions."

var examples []cmdutils.Example
get := cmdutils.Example{
Desc: "Get bookie affinity group configured for a namespace",
Command: "pulsarctl namespaces get-bookie-affinity-group tenant/namespace",
}
examples = append(examples, get)
desc.CommandExamples = examples

var out []cmdutils.Output
successOut := cmdutils.Output{
Desc: "normal output",
Out: "{\n \"bookkeeperAffinityGroupPrimary\": \"primary\",\n \"bookkeeperAffinityGroupSecondary\": \"secondary\"\n}",
}
out = append(out, successOut, ArgError, NsNotExistError)
out = append(out, NsErrors...)
desc.CommandOutput = out

vc.SetDescription(
"get-bookie-affinity-group",
"Get bookie affinity group configured for a namespace",
desc.ToString(),
desc.ExampleToString(),
)

vc.EnableOutputFlagSet()
vc.SetRunFuncWithNameArg(func() error {
return doGetBookieAffinityGroup(vc)
}, "the namespace name is not specified or the namespace name is specified more than one")
}

func doGetBookieAffinityGroup(vc *cmdutils.VerbCmd) error {
admin := cmdutils.NewPulsarClient()
group, err := admin.Namespaces().GetBookieAffinityGroup(vc.NameArg)
if err == nil {
oc := cmdutils.NewOutputContent().WithObject(group)
err = vc.OutputConfig.WriteOutput(vc.Command.OutOrStdout(), oc)
}
return err
}

func SetBookieAffinityGroupCmd(vc *cmdutils.VerbCmd) {
var desc cmdutils.LongDescription
desc.CommandUsedFor = "Set bookie affinity group configured for a namespace"
desc.CommandPermission = "This command requires super-user permissions."

var examples []cmdutils.Example
set := cmdutils.Example{
Desc: "Set bookie affinity group configured for a namespace",
Command: "pulsarctl namespaces set-bookie-affinity-group tenant/namespace \n" +
"\t--primary-group primary-group \n" +
"\t--secondary-group secondary-group",
}
examples = append(examples, set)
desc.CommandExamples = examples

var out []cmdutils.Output
successOut := cmdutils.Output{
Desc: "normal output",
Out: "Set bookie affinity group successfully for [tenant/namespace]",
}
out = append(out, successOut, ArgError, NsNotExistError)
out = append(out, NsErrors...)
desc.CommandOutput = out

vc.SetDescription(
"set-bookie-affinity-group",
"Set bookie affinity group configured for a namespace",
desc.ToString(),
desc.ExampleToString(),
)

data := utils.BookieAffinityGroupData{}
vc.FlagSetGroup.InFlagSet("Bookie Affinity Group", func(set *pflag.FlagSet) {
set.StringVar(&data.BookkeeperAffinityGroupPrimary, "primary-group", "", "primary affinity group")
set.StringVar(&data.BookkeeperAffinityGroupSecondary, "secondary-group", "", "secondary affinity group")
_ = cobra.MarkFlagRequired(set, "primary-group")
})

vc.SetRunFuncWithNameArg(func() error {
return doSetBookieAffinityGroup(vc, data)
}, "the namespace name is not specified or the namespace name is specified more than one")
}

func doSetBookieAffinityGroup(vc *cmdutils.VerbCmd, data utils.BookieAffinityGroupData) error {
admin := cmdutils.NewPulsarClient()
err := admin.Namespaces().SetBookieAffinityGroup(vc.NameArg, data)
if err == nil {
vc.Command.Printf("Set bookie affinity group successfully for [%s]\n", vc.NameArg)
}
return err
}

func DeleteBookieAffinityGroupCmd(vc *cmdutils.VerbCmd) {
var desc cmdutils.LongDescription
desc.CommandUsedFor = "Delete bookie affinity group configured for a namespace"
desc.CommandPermission = "This command requires super-user permissions."

var examples []cmdutils.Example
del := cmdutils.Example{
Desc: "Delete bookie affinity group configured for a namespace",
Command: "pulsarctl namespaces delete-bookie-affinity-group tenant/namespace",
}
examples = append(examples, del)
desc.CommandExamples = examples

var out []cmdutils.Output
successOut := cmdutils.Output{
Desc: "normal output",
Out: "Deleted bookie affinity group successfully for [tenant/namespace]",
}
out = append(out, successOut, ArgError, NsNotExistError)
out = append(out, NsErrors...)
desc.CommandOutput = out

vc.SetDescription(
"delete-bookie-affinity-group",
"Delete bookie affinity group configured for a namespace",
desc.ToString(),
desc.ExampleToString(),
)

vc.SetRunFuncWithNameArg(func() error {
return doDeleteBookieAffinityGroup(vc)
}, "the namespace name is not specified or the namespace name is specified more than one")
}

func doDeleteBookieAffinityGroup(vc *cmdutils.VerbCmd) error {
admin := cmdutils.NewPulsarClient()
err := admin.Namespaces().DeleteBookieAffinityGroup(vc.NameArg)
if err == nil {
vc.Command.Printf("Deleted bookie affinity group successfully for [%s]\n", vc.NameArg)
}
return err
}
Loading
Loading