Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/streamnative/pulsarctl
go 1.25.8

require (
github.com/apache/pulsar-client-go v0.18.0-candidate-1.0.20251222030102-3bb7d4eff361
github.com/apache/pulsar-client-go v0.19.0-candidate-1
github.com/docker/go-connections v0.5.0
github.com/fatih/color v1.7.0
github.com/ghodss/yaml v1.0.0
Expand All @@ -24,6 +24,7 @@ require (

require (
dario.cat/mergo v1.0.0 // indirect
github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6 // indirect
github.com/AthenZ/athenz v1.12.31 // indirect
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/DataDog/zstd v1.5.7 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk=
dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9vkmnHYOMsOr4WLk+Vo07yKIzd94sVoIqshQ4bU=
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8=
github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6 h1:He8afgbRMd7mFxO99hRNu+6tazq8nFF9lIwo9JFroBk=
github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8=
github.com/AthenZ/athenz v1.12.31 h1:GQnRDLgivPlVvklSpH9gp+t/dho9DJTtt+hlLYo5TX8=
github.com/AthenZ/athenz v1.12.31/go.mod h1:6Siq4JOA4OjgYVgtTVIeHrb4HB2hEL8i4fx7aOFrgfY=
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8=
Expand All @@ -14,8 +14,8 @@ github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERo
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
github.com/RoaringBitmap/roaring/v2 v2.14.4 h1:4aKySrrg9G/5oRtJ3TrZLObVqxgQ9f1znCRBwEwjuVw=
github.com/RoaringBitmap/roaring/v2 v2.14.4/go.mod h1:oMvV6omPWr+2ifRdeZvVJyaz+aoEUopyv5iH0u/+wbY=
github.com/apache/pulsar-client-go v0.18.0-candidate-1.0.20251222030102-3bb7d4eff361 h1:Fb4j4v85TPq64FRp+QMLWaW3/Hg1Jg7TBWaZwPcSO9Y=
github.com/apache/pulsar-client-go v0.18.0-candidate-1.0.20251222030102-3bb7d4eff361/go.mod h1:/Zf8Q8bSSc6ndEJ8V1muIHf6ZWsMrHoQU+98Ww9pOeI=
github.com/apache/pulsar-client-go v0.19.0-candidate-1 h1:FtOmcndcFzmleMZmkQEO1OEZ0HroCchqPMpIflKC1lY=
github.com/apache/pulsar-client-go v0.19.0-candidate-1/go.mod h1:/Zf8Q8bSSc6ndEJ8V1muIHf6ZWsMrHoQU+98Ww9pOeI=
github.com/ardielle/ardielle-go v1.5.2 h1:TilHTpHIQJ27R1Tl/iITBzMwiUGSlVfiVhwDNGM3Zj4=
github.com/ardielle/ardielle-go v1.5.2/go.mod h1:I4hy1n795cUhaVt/ojz83SNVCYIGsAFAONtv2Dr7HUI=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
Expand Down
1 change: 1 addition & 0 deletions pkg/ctl/brokers/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command {
"broker")

cmdutils.AddVerbCmd(flagGrouping, resourceCmd, getBrokerListCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, leaderBrokerCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, getDynamicConfigListNameCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, getOwnedNamespacesCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, updateDynamicConfig)
Expand Down
67 changes: 67 additions & 0 deletions pkg/ctl/brokers/leader_broker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package brokers

import "github.com/streamnative/pulsarctl/pkg/cmdutils"

func leaderBrokerCmd(vc *cmdutils.VerbCmd) {
desc := cmdutils.LongDescription{}
desc.CommandUsedFor = "Get the information of the leader broker"
desc.CommandPermission = "This command requires super-user permissions."

var examples []cmdutils.Example
get := cmdutils.Example{
Desc: desc.CommandUsedFor,
Command: "pulsarctl brokers leader-broker",
}
examples = append(examples, get)
desc.CommandExamples = examples

var out []cmdutils.Output
successOut := cmdutils.Output{
Desc: "normal output",
Out: "{\n" +
" \"brokerId\": \"broker-1\",\n" +
" \"serviceUrl\": \"http://127.0.0.1:8080\"\n" +
"}",
}
out = append(out, successOut)
desc.CommandOutput = out

vc.SetDescription(
"leader-broker",
desc.CommandUsedFor,
desc.ToString(),
desc.ExampleToString(),
"leader-broker")

vc.SetRunFunc(func() error {
return doGetLeaderBroker(vc)
})
vc.EnableOutputFlagSet()
}

func doGetLeaderBroker(vc *cmdutils.VerbCmd) error {
admin := cmdutils.NewPulsarClient()
info, err := admin.Brokers().GetLeaderBroker()
if err == nil {
oc := cmdutils.NewOutputContent().WithObject(info)
err = vc.OutputConfig.WriteOutput(vc.Command.OutOrStdout(), oc)
}
return err
}
57 changes: 57 additions & 0 deletions pkg/ctl/brokers/leader_broker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package brokers

import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
"github.com/stretchr/testify/assert"

"github.com/streamnative/pulsarctl/pkg/cmdutils"
)

func TestLeaderBroker(t *testing.T) {
oldURL := cmdutils.PulsarCtlConfig.WebServiceURL
defer func() {
cmdutils.PulsarCtlConfig.WebServiceURL = oldURL
}()

srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, http.MethodGet, r.Method)
assert.Equal(t, "/admin/v2/brokers/leaderBroker", r.URL.Path)
_, _ = w.Write([]byte(`{"brokerId":"broker-1","serviceUrl":"http://127.0.0.1:8080"}`))
}))
defer srv.Close()

cmdutils.PulsarCtlConfig.WebServiceURL = srv.URL

args := []string{"leader-broker"}
out, execErr, _, err := TestBrokersCommands(leaderBrokerCmd, args)
assert.Nil(t, err)
assert.Nil(t, execErr)

var info utils.BrokerInfo
err = json.Unmarshal(out.Bytes(), &info)
assert.Nil(t, err)
assert.Equal(t, "broker-1", info.BrokerID)
assert.Equal(t, "http://127.0.0.1:8080", info.ServiceURL)
}
103 changes: 103 additions & 0 deletions pkg/ctl/namespace/admin_http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package namespace

import (
"io"
"net/http"
"net/url"
"path"
"strconv"
"strings"

pulsaradmin "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/auth"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"

"github.com/streamnative/pulsarctl/pkg/cmdutils"
)

func newNamespaceRESTClient() (*rest.Client, error) {
cfg := config.Config(*cmdutils.PulsarCtlConfig)
cfg.PulsarAPIVersion = config.V2

authProvider, err := auth.GetAuthProvider(&cfg)
if err != nil {
return nil, err
}

return &rest.Client{
ServiceURL: cfg.WebServiceURL,
VersionInfo: pulsaradmin.ReleaseVersion,
HTTPClient: &http.Client{
Timeout: pulsaradmin.DefaultHTTPTimeOutDuration,
Transport: authProvider,
},
}, nil
}

func namespaceAdminEndpoint(parts ...string) string {
escapedParts := make([]string, len(parts))
for i, part := range parts {
escapedParts[i] = url.PathEscape(part)
Comment thread
freeznet marked this conversation as resolved.
Outdated
}

return path.Join(
utils.MakeHTTPPath(config.V2.String(), "/namespaces"),
path.Join(escapedParts...),
)
}

func readOptionalStringResponse(resp *http.Response) (*string, error) {
defer func() {
_ = resp.Body.Close()
}()

body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

value := strings.TrimSpace(string(body))
if value == "" || value == "null" {
return nil, nil
}

if unquoted, err := strconv.Unquote(value); err == nil {
value = unquoted
}

return &value, nil
}

func removeNamespaceProperty(ns utils.NameSpaceName, key string) (*string, error) {
client, err := newNamespaceRESTClient()
if err != nil {
return nil, err
}

endpoint := namespaceAdminEndpoint(ns.String(), "property", key)
resp, err := client.MakeRequest(http.MethodDelete, endpoint)
if err != nil {
return nil, err
}

return readOptionalStringResponse(resp)
}
Loading
Loading