diff options
Diffstat (limited to 'client')
24 files changed, 236 insertions, 785 deletions
diff --git a/client/go/go.mod b/client/go/go.mod index 18e3853868d..94f69c8286a 100644 --- a/client/go/go.mod +++ b/client/go/go.mod @@ -13,8 +13,8 @@ require ( github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.7.0 github.com/zalando/go-keyring v0.1.1 - golang.org/x/net v0.8.0 - golang.org/x/sys v0.6.0 + golang.org/x/net v0.9.0 + golang.org/x/sys v0.7.0 gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b ) @@ -28,7 +28,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/stretchr/objx v0.1.1 // indirect - golang.org/x/text v0.8.0 // indirect + golang.org/x/text v0.9.0 // indirect gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/client/go/go.sum b/client/go/go.sum index 2af8bb1e4c0..ac662c9fd43 100644 --- a/client/go/go.sum +++ b/client/go/go.sum @@ -47,16 +47,16 @@ github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5Cc github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/zalando/go-keyring v0.1.1 h1:w2V9lcx/Uj4l+dzAf1m9s+DJ1O8ROkEHnynonHjTcYE= github.com/zalando/go-keyring v0.1.1/go.mod h1:OIC+OZ28XbmwFxU/Rp9V7eKzZjamBJwRzC8UFJH9+L8= -golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= -golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= +golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= +golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210616045830-e2b7044e8c71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= -golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= -golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= +golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/client/go/internal/admin/prog/common_env.go b/client/go/internal/admin/prog/common_env.go deleted file mode 100644 index f743716a64e..00000000000 --- a/client/go/internal/admin/prog/common_env.go +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -// Author: arnej - -package prog - -import ( - "os" - "strings" - - "github.com/vespa-engine/vespa/client/go/internal/admin/envvars" - "github.com/vespa-engine/vespa/client/go/internal/vespa" -) - -func (spec *Spec) configureCommonEnv() { - os.Unsetenv(envvars.LD_PRELOAD) - spec.Setenv(envvars.STD_THREAD_PREVENT_TRY_CATCH, "true") - spec.Setenv(envvars.GLIBCXX_FORCE_NEW, "1") - spec.Setenv(envvars.LD_LIBRARY_PATH, vespa.FindHome()+"/lib64:/opt/vespa-deps/lib64") - spec.Setenv(envvars.MALLOC_ARENA_MAX, "1") - - // fallback from old env.vars: - spec.considerEnvFallback(envvars.VESPA_USE_HUGEPAGES_LIST, envvars.HUGEPAGES_LIST) - spec.considerEnvFallback(envvars.VESPA_USE_MADVISE_LIST, envvars.MADVISE_LIST) - spec.considerEnvFallback(envvars.VESPA_USE_VESPAMALLOC, envvars.VESPAMALLOC_LIST) - spec.considerEnvFallback(envvars.VESPA_USE_VESPAMALLOC_D, envvars.VESPAMALLOCD_LIST) - spec.considerEnvFallback(envvars.VESPA_USE_VESPAMALLOC_DST, envvars.VESPAMALLOCDST_LIST) - spec.considerEnvFallback(envvars.VESPA_USE_NO_VESPAMALLOC, envvars.NO_VESPAMALLOC_LIST) - // other fallbacks: - spec.considerFallback(envvars.ROOT, vespa.FindHome()) - spec.considerFallback(envvars.VESPA_USER, vespa.FindVespaUser()) - spec.considerFallback(envvars.VESPA_USE_HUGEPAGES_LIST, "all") - spec.considerFallback(envvars.VESPA_USE_VESPAMALLOC, "all") - spec.considerFallback(envvars.VESPA_USE_NO_VESPAMALLOC, strings.Join([]string{ - "vespa-rpc-invoke", - "vespa-get-config", - "vespa-sentinel-cmd", - "vespa-route", - "vespa-proton-cmd", - "vespa-configproxy-cmd", - "vespa-config-status", - }, " ")) - -} diff --git a/client/go/internal/admin/prog/hugepages.go b/client/go/internal/admin/prog/hugepages.go index c6f019937ff..b66d512d4c9 100644 --- a/client/go/internal/admin/prog/hugepages.go +++ b/client/go/internal/admin/prog/hugepages.go @@ -9,7 +9,7 @@ import ( ) func (spec *Spec) ConfigureHugePages() { - if spec.matchesListEnv(envvars.VESPA_USE_HUGEPAGES_LIST) { + if spec.MatchesListEnv(envvars.VESPA_USE_HUGEPAGES_LIST) { trace.Debug("setting", envvars.VESPA_USE_HUGEPAGES, "= 'yes'") spec.Setenv(envvars.VESPA_USE_HUGEPAGES, "yes") } diff --git a/client/go/internal/admin/prog/madvise.go b/client/go/internal/admin/prog/madvise.go index 48986a12182..967823d956b 100644 --- a/client/go/internal/admin/prog/madvise.go +++ b/client/go/internal/admin/prog/madvise.go @@ -9,7 +9,7 @@ import ( ) func (spec *Spec) ConfigureUseMadvise() { - limit := spec.valueFromListEnv(envvars.VESPA_USE_MADVISE_LIST) + limit := spec.ValueFromListEnv(envvars.VESPA_USE_MADVISE_LIST) if limit != "" { trace.Trace("shall use madvise with limit", limit, "as set in", envvars.VESPA_USE_MADVISE_LIST) spec.Setenv(envvars.VESPA_MALLOC_MADVISE_LIMIT, limit) diff --git a/client/go/internal/admin/prog/spec_env.go b/client/go/internal/admin/prog/spec_env.go index 4fa40695acb..c88ec963812 100644 --- a/client/go/internal/admin/prog/spec_env.go +++ b/client/go/internal/admin/prog/spec_env.go @@ -50,17 +50,17 @@ func (spec *Spec) EffectiveEnv() []string { return envVec } -func (spec *Spec) considerFallback(varName, varValue string) { +func (spec *Spec) ConsiderFallback(varName, varValue string) { if spec.Getenv(varName) == "" && varValue != "" { spec.Setenv(varName, varValue) } } -func (spec *Spec) considerEnvFallback(targetVar, fallbackVar string) { - spec.considerFallback(targetVar, spec.Getenv(fallbackVar)) +func (spec *Spec) ConsiderEnvFallback(targetVar, fallbackVar string) { + spec.ConsiderFallback(targetVar, spec.Getenv(fallbackVar)) } -func (p *Spec) matchesListEnv(envVarName string) bool { +func (p *Spec) MatchesListEnv(envVarName string) bool { return p.matchesListString(p.Getenv(envVarName)) } @@ -80,7 +80,7 @@ func (p *Spec) matchesListString(env string) bool { return false } -func (p *Spec) valueFromListEnv(envVarName string) string { +func (p *Spec) ValueFromListEnv(envVarName string) string { return p.valueFromListString(p.Getenv(envVarName)) } diff --git a/client/go/internal/admin/prog/valgrind.go b/client/go/internal/admin/prog/valgrind.go index 2d7f0a597d9..b949102d6bd 100644 --- a/client/go/internal/admin/prog/valgrind.go +++ b/client/go/internal/admin/prog/valgrind.go @@ -21,25 +21,18 @@ const ( func (p *Spec) ConfigureValgrind() { p.shouldUseValgrind = false p.shouldUseCallgrind = false - env := p.Getenv(envvars.VESPA_USE_VALGRIND) - allValgrind := env == "all" - parts := strings.Split(env, " ") - for _, part := range parts { - if p.BaseName == part || allValgrind { - trace.Trace("using valgrind as", p.Program, "has basename in", envvars.VESPA_USE_VALGRIND, "=>", env) - backticks := util.BackTicksWithStderr - out, err := backticks.Run(VALGRIND_PROG, "--help") - if err != nil { - trace.Trace("trial run of valgrind fails:", err, "=>", out) - return - } - if opts := p.Getenv(envvars.VESPA_VALGRIND_OPT); strings.Contains(opts, "callgrind") { - p.shouldUseCallgrind = true - } - p.shouldUseValgrind = true + if p.MatchesListEnv(envvars.VESPA_USE_VALGRIND) { + trace.Trace("using valgrind as", p.Program, "has basename in", envvars.VESPA_USE_VALGRIND) + backticks := util.BackTicksWithStderr + out, err := backticks.Run(VALGRIND_PROG, "--help") + if err != nil { + trace.Trace("trial run of valgrind fails:", err, "=>", out) return } - trace.Debug("checking", envvars.VESPA_USE_VALGRIND, ":", p.BaseName, "!=", part) + if opts := p.Getenv(envvars.VESPA_VALGRIND_OPT); strings.Contains(opts, "callgrind") { + p.shouldUseCallgrind = true + } + p.shouldUseValgrind = true } } diff --git a/client/go/internal/admin/prog/vespamalloc.go b/client/go/internal/admin/prog/vespamalloc.go index 439935770d7..e66c9e5d966 100644 --- a/client/go/internal/admin/prog/vespamalloc.go +++ b/client/go/internal/admin/prog/vespamalloc.go @@ -27,7 +27,7 @@ func vespaMallocLib(suf string) string { func (p *Spec) ConfigureVespaMalloc() { p.shouldUseVespaMalloc = false - if p.matchesListEnv(envvars.VESPA_USE_NO_VESPAMALLOC) { + if p.MatchesListEnv(envvars.VESPA_USE_NO_VESPAMALLOC) { trace.Trace("use no vespamalloc:", p.BaseName) return } @@ -36,11 +36,11 @@ func (p *Spec) ConfigureVespaMalloc() { return } var useFile string - if p.matchesListEnv(envvars.VESPA_USE_VESPAMALLOC_DST) { + if p.MatchesListEnv(envvars.VESPA_USE_VESPAMALLOC_DST) { useFile = vespaMallocLib("libvespamallocdst16.so") - } else if p.matchesListEnv(envvars.VESPA_USE_VESPAMALLOC_D) { + } else if p.MatchesListEnv(envvars.VESPA_USE_VESPAMALLOC_D) { useFile = vespaMallocLib("libvespamallocd.so") - } else if p.matchesListEnv(envvars.VESPA_USE_VESPAMALLOC) { + } else if p.MatchesListEnv(envvars.VESPA_USE_VESPAMALLOC) { useFile = vespaMallocLib("libvespamalloc.so") } trace.Trace("use file:", useFile) @@ -51,7 +51,7 @@ func (p *Spec) ConfigureVespaMalloc() { otherFile := vespaMallocLib("libvespa_load_as_huge.so") useFile = fmt.Sprintf("%s:%s", useFile, otherFile) } - p.considerEnvFallback(envvars.VESPA_MALLOC_HUGEPAGES, envvars.VESPA_USE_HUGEPAGES) + p.ConsiderEnvFallback(envvars.VESPA_MALLOC_HUGEPAGES, envvars.VESPA_USE_HUGEPAGES) p.vespaMallocPreload = useFile p.shouldUseVespaMalloc = true } diff --git a/client/go/internal/admin/vespa-wrapper/startcbinary/common_env.go b/client/go/internal/admin/vespa-wrapper/startcbinary/common_env.go index 6bc730b5119..07ec19bf7e5 100644 --- a/client/go/internal/admin/vespa-wrapper/startcbinary/common_env.go +++ b/client/go/internal/admin/vespa-wrapper/startcbinary/common_env.go @@ -8,40 +8,30 @@ import ( "strings" "github.com/vespa-engine/vespa/client/go/internal/admin/envvars" - "github.com/vespa-engine/vespa/client/go/internal/admin/trace" + "github.com/vespa-engine/vespa/client/go/internal/admin/prog" "github.com/vespa-engine/vespa/client/go/internal/vespa" ) -func (spec *ProgSpec) considerFallback(varName, varValue string) { - if spec.getenv(varName) == "" && varValue != "" { - spec.setenv(varName, varValue) - } -} - -func (spec *ProgSpec) considerEnvFallback(targetVar, fallbackVar string) { - spec.considerFallback(targetVar, spec.getenv(fallbackVar)) -} - -func (spec *ProgSpec) configureCommonEnv() { +func configureCommonEnv(spec *prog.Spec) { os.Unsetenv(envvars.LD_PRELOAD) - spec.setenv(envvars.STD_THREAD_PREVENT_TRY_CATCH, "true") - spec.setenv(envvars.GLIBCXX_FORCE_NEW, "1") - spec.setenv(envvars.LD_LIBRARY_PATH, vespa.FindHome()+"/lib64:/opt/vespa-deps/lib64") - spec.setenv(envvars.MALLOC_ARENA_MAX, "1") + spec.Setenv(envvars.STD_THREAD_PREVENT_TRY_CATCH, "true") + spec.Setenv(envvars.GLIBCXX_FORCE_NEW, "1") + spec.Setenv(envvars.LD_LIBRARY_PATH, vespa.FindHome()+"/lib64:/opt/vespa-deps/lib64") + spec.Setenv(envvars.MALLOC_ARENA_MAX, "1") // fallback from old env.vars: - spec.considerEnvFallback(envvars.VESPA_USE_HUGEPAGES_LIST, envvars.HUGEPAGES_LIST) - spec.considerEnvFallback(envvars.VESPA_USE_MADVISE_LIST, envvars.MADVISE_LIST) - spec.considerEnvFallback(envvars.VESPA_USE_VESPAMALLOC, envvars.VESPAMALLOC_LIST) - spec.considerEnvFallback(envvars.VESPA_USE_VESPAMALLOC_D, envvars.VESPAMALLOCD_LIST) - spec.considerEnvFallback(envvars.VESPA_USE_VESPAMALLOC_DST, envvars.VESPAMALLOCDST_LIST) - spec.considerEnvFallback(envvars.VESPA_USE_NO_VESPAMALLOC, envvars.NO_VESPAMALLOC_LIST) + spec.ConsiderEnvFallback(envvars.VESPA_USE_HUGEPAGES_LIST, envvars.HUGEPAGES_LIST) + spec.ConsiderEnvFallback(envvars.VESPA_USE_MADVISE_LIST, envvars.MADVISE_LIST) + spec.ConsiderEnvFallback(envvars.VESPA_USE_VESPAMALLOC, envvars.VESPAMALLOC_LIST) + spec.ConsiderEnvFallback(envvars.VESPA_USE_VESPAMALLOC_D, envvars.VESPAMALLOCD_LIST) + spec.ConsiderEnvFallback(envvars.VESPA_USE_VESPAMALLOC_DST, envvars.VESPAMALLOCDST_LIST) + spec.ConsiderEnvFallback(envvars.VESPA_USE_NO_VESPAMALLOC, envvars.NO_VESPAMALLOC_LIST) // other fallbacks: - spec.considerFallback(envvars.ROOT, vespa.FindHome()) - spec.considerFallback(envvars.VESPA_USER, vespa.FindVespaUser()) - spec.considerFallback(envvars.VESPA_USE_HUGEPAGES_LIST, "all") - spec.considerFallback(envvars.VESPA_USE_VESPAMALLOC, "all") - spec.considerFallback(envvars.VESPA_USE_NO_VESPAMALLOC, strings.Join([]string{ + spec.ConsiderFallback(envvars.ROOT, vespa.FindHome()) + spec.ConsiderFallback(envvars.VESPA_USER, vespa.FindVespaUser()) + spec.ConsiderFallback(envvars.VESPA_USE_HUGEPAGES_LIST, "all") + spec.ConsiderFallback(envvars.VESPA_USE_VESPAMALLOC, "all") + spec.ConsiderFallback(envvars.VESPA_USE_NO_VESPAMALLOC, strings.Join([]string{ "vespa-rpc-invoke", "vespa-get-config", "vespa-sentinel-cmd", @@ -53,31 +43,16 @@ func (spec *ProgSpec) configureCommonEnv() { } -func (spec *ProgSpec) configureHugePages() { - if spec.matchesListEnv(envvars.VESPA_USE_HUGEPAGES_LIST) { - spec.setenv(envvars.VESPA_USE_HUGEPAGES, "yes") - } -} - -func (spec *ProgSpec) configureUseMadvise() { - limit := spec.valueFromListEnv(envvars.VESPA_USE_MADVISE_LIST) - if limit != "" { - trace.Trace("shall use madvise with limit", limit, "as set in", envvars.VESPA_USE_MADVISE_LIST) - spec.setenv(envvars.VESPA_MALLOC_MADVISE_LIMIT, limit) - return - } -} - -func (spec *ProgSpec) configurePath() { +func configurePath(spec *prog.Spec) { // Prefer newer gdb and pstack: - spec.prependPath("/opt/rh/gcc-toolset-12/root/usr/bin") + prependPath("/opt/rh/gcc-toolset-12/root/usr/bin", spec) // Maven is needed for tester applications: - spec.prependPath(vespa.FindHome() + "/local/maven/bin") - spec.prependPath(vespa.FindHome() + "/bin64") - spec.prependPath(vespa.FindHome() + "/bin") + prependPath(vespa.FindHome()+"/local/maven/bin", spec) + prependPath(vespa.FindHome()+"/bin64", spec) + prependPath(vespa.FindHome()+"/bin", spec) // how to find the "java" program? // should be available in $VESPA_HOME/bin or JAVA_HOME - if javaHome := spec.getenv(envvars.JAVA_HOME); javaHome != "" { - spec.prependPath(javaHome + "/bin") + if javaHome := spec.Getenv(envvars.JAVA_HOME); javaHome != "" { + prependPath(javaHome+"/bin", spec) } } diff --git a/client/go/internal/admin/vespa-wrapper/startcbinary/numactl.go b/client/go/internal/admin/vespa-wrapper/startcbinary/numactl.go deleted file mode 100644 index fe091dedba9..00000000000 --- a/client/go/internal/admin/vespa-wrapper/startcbinary/numactl.go +++ /dev/null @@ -1,72 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -// Author: arnej - -package startcbinary - -import ( - "fmt" - "strconv" - "strings" - - "github.com/vespa-engine/vespa/client/go/internal/admin/envvars" - "github.com/vespa-engine/vespa/client/go/internal/admin/trace" - "github.com/vespa-engine/vespa/client/go/internal/util" -) - -func (p *ProgSpec) configureNumaCtl() { - p.shouldUseNumaCtl = false - p.numaSocket = -1 - if p.getenv(envvars.VESPA_NO_NUMACTL) != "" { - return - } - backticks := util.BackTicksIgnoreStderr - out, err := backticks.Run("numactl", "--hardware") - trace.Debug("numactl --hardware says:", out) - if err != nil { - trace.Trace("numactl error:", err) - return - } - outfoo, errfoo := backticks.Run("numactl", "--interleave", "all", "echo", "foo") - if errfoo != nil { - trace.Trace("cannot run with numactl:", errfoo) - return - } - if outfoo != "foo\n" { - trace.Trace("bad numactl output:", outfoo) - return - } - p.shouldUseNumaCtl = true - if affinity := p.getenv(envvars.VESPA_AFFINITY_CPU_SOCKET); affinity != "" { - wantSocket, _ := strconv.Atoi(affinity) - trace.Debug("want socket:", wantSocket) - parts := strings.Fields(out) - for idx := 0; idx+2 < len(parts); idx++ { - if parts[idx] == "available:" && parts[idx+2] == "nodes" { - numSockets, _ := strconv.Atoi(parts[idx+1]) - trace.Debug("numSockets:", numSockets) - if numSockets > 1 { - p.numaSocket = (wantSocket % numSockets) - return - } - } - } - } -} - -func (p *ProgSpec) numaCtlBinary() string { - return "numactl" -} - -func (p *ProgSpec) prependNumaCtl(args []string) []string { - v := util.NewArrayList[string](5 + len(args)) - v.Append("numactl") - if p.numaSocket >= 0 { - v.Append(fmt.Sprintf("--cpunodebind=%d", p.numaSocket)) - v.Append(fmt.Sprintf("--membind=%d", p.numaSocket)) - } else { - v.Append("--interleave") - v.Append("all") - } - v.AppendAll(args...) - return v -} diff --git a/client/go/internal/admin/vespa-wrapper/startcbinary/numactl_test.go b/client/go/internal/admin/vespa-wrapper/startcbinary/numactl_test.go deleted file mode 100644 index 65f52be988e..00000000000 --- a/client/go/internal/admin/vespa-wrapper/startcbinary/numactl_test.go +++ /dev/null @@ -1,89 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package startcbinary - -import ( - "fmt" - "os" - "runtime" - "strings" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/vespa-engine/vespa/client/go/internal/admin/trace" -) - -func setup(t *testing.T, testFileName string) { - trace.AdjustVerbosity(1) - mockBinParent = strings.TrimSuffix(testFileName, "/numactl_test.go") - tmpBin = t.TempDir() + "/mock.bin.numactl_test" - err := os.MkdirAll(tmpBin, 0755) - assert.Nil(t, err) - t.Setenv("PATH", fmt.Sprintf("%s:%s", tmpBin, os.Getenv("PATH"))) -} - -func TestNumaCtlDetection(t *testing.T) { - if runtime.GOOS == "windows" { - return - } - _, tfn, _, _ := runtime.Caller(0) - setup(t, tfn) - orig := []string{"/bin/myprog", "-c", "cfgid"} - spec := NewProgSpec(orig) - - useMock("no-numactl", "numactl") - spec.configureNumaCtl() - assert.Equal(t, false, spec.shouldUseNumaCtl) - - useMock("bad-numactl", "numactl") - spec.configureNumaCtl() - assert.Equal(t, false, spec.shouldUseNumaCtl) - - t.Setenv("VESPA_AFFINITY_CPU_SOCKET", "") - useMock("good-numactl", "numactl") - spec.configureNumaCtl() - assert.Equal(t, true, spec.shouldUseNumaCtl) - assert.Equal(t, -1, spec.numaSocket) - argv := spec.prependNumaCtl(orig) - trace.Trace("argv:", argv) - assert.Equal(t, 6, len(argv)) - assert.Equal(t, "numactl", argv[0]) - assert.Equal(t, "--interleave", argv[1]) - assert.Equal(t, "all", argv[2]) - assert.Equal(t, "/bin/myprog-bin", argv[3]) - assert.Equal(t, "-c", argv[4]) - assert.Equal(t, "cfgid", argv[5]) - - t.Setenv("VESPA_AFFINITY_CPU_SOCKET", "0") - spec.configureNumaCtl() - assert.Equal(t, true, spec.shouldUseNumaCtl) - assert.Equal(t, 0, spec.numaSocket) - argv = spec.prependNumaCtl(orig) - trace.Trace("argv:", argv) - assert.Equal(t, 6, len(argv)) - assert.Equal(t, "numactl", argv[0]) - assert.Equal(t, "--cpunodebind=0", argv[1]) - assert.Equal(t, "--membind=0", argv[2]) - assert.Equal(t, "/bin/myprog-bin", argv[3]) - assert.Equal(t, "-c", argv[4]) - assert.Equal(t, "cfgid", argv[5]) - - t.Setenv("VESPA_AFFINITY_CPU_SOCKET", "1") - spec.configureNumaCtl() - assert.Equal(t, true, spec.shouldUseNumaCtl) - assert.Equal(t, 1, spec.numaSocket) - argv = spec.prependNumaCtl(orig) - trace.Trace("argv:", argv) - assert.Equal(t, 6, len(argv)) - assert.Equal(t, "numactl", argv[0]) - assert.Equal(t, "--cpunodebind=1", argv[1]) - assert.Equal(t, "--membind=1", argv[2]) - assert.Equal(t, "/bin/myprog-bin", argv[3]) - assert.Equal(t, "-c", argv[4]) - assert.Equal(t, "cfgid", argv[5]) - - t.Setenv("VESPA_AFFINITY_CPU_SOCKET", "2") - spec.configureNumaCtl() - assert.Equal(t, true, spec.shouldUseNumaCtl) - assert.Equal(t, 0, spec.numaSocket) - -} diff --git a/client/go/internal/admin/vespa-wrapper/startcbinary/progspec.go b/client/go/internal/admin/vespa-wrapper/startcbinary/progspec.go index 9975f6c3c90..b0dcc402893 100644 --- a/client/go/internal/admin/vespa-wrapper/startcbinary/progspec.go +++ b/client/go/internal/admin/vespa-wrapper/startcbinary/progspec.go @@ -8,56 +8,21 @@ import ( "strings" "github.com/vespa-engine/vespa/client/go/internal/admin/envvars" - "github.com/vespa-engine/vespa/client/go/internal/admin/trace" + "github.com/vespa-engine/vespa/client/go/internal/admin/prog" ) -type ProgSpec struct { - Program string - Args []string - BaseName string - Env map[string]string - numaSocket int - shouldUseCallgrind bool - shouldUseValgrind bool - shouldUseNumaCtl bool - shouldUseVespaMalloc bool - vespaMallocPreload string -} - -func NewProgSpec(argv []string) *ProgSpec { +func NewProgSpec(argv []string) *prog.Spec { progName := argv[0] binProg := progName + "-bin" - p := ProgSpec{ - Program: binProg, - Args: argv, - BaseName: baseNameOf(progName), - Env: make(map[string]string), - numaSocket: -1, - } + p := prog.NewSpec(argv) + p.Program = binProg p.Args[0] = binProg - return &p -} - -func baseNameOf(s string) string { - idx := strings.LastIndex(s, "/") - idx++ - return s[idx:] -} - -func (p *ProgSpec) setenv(k, v string) { - p.Env[k] = v -} - -func (p *ProgSpec) getenv(k string) string { - if v, ok := p.Env[k]; ok { - return v - } - return os.Getenv(k) + return p } -func (p *ProgSpec) prependPath(dirName string) { +func prependPath(dirName string, p *prog.Spec) { pathList := []string{dirName} - oldPath := p.getenv(envvars.PATH) + oldPath := p.Getenv(envvars.PATH) if oldPath == "" { oldPath = "/usr/bin" } @@ -67,78 +32,6 @@ func (p *ProgSpec) prependPath(dirName string) { } } newPath := strings.Join(pathList, ":") - p.setenv(envvars.PATH, newPath) + p.Setenv(envvars.PATH, newPath) os.Setenv(envvars.PATH, newPath) } - -func (p *ProgSpec) matchesListEnv(envVarName string) bool { - return p.matchesListString(p.getenv(envVarName)) -} - -func (p *ProgSpec) matchesListString(env string) bool { - if env == "all" { - trace.Debug(p.Program, "always matching in:", env) - return true - } - parts := strings.Fields(env) - for _, part := range parts { - if p.BaseName == part { - trace.Debug(p.Program, "has basename matching in:", env) - return true - } - trace.Debug("checking matching:", p.BaseName, "!=", part) - } - return false -} - -func (p *ProgSpec) valueFromListEnv(envVarName string) string { - return p.valueFromListString(p.getenv(envVarName)) -} - -func (p *ProgSpec) valueFromListString(env string) string { - parts := strings.Fields(env) - for _, part := range parts { - idx := strings.Index(part, "=") - if idx <= 0 { - trace.Trace("expected key=value, but got:", part) - continue - } - partName := part[:idx] - idx++ - partValue := part[idx:] - if p.BaseName == partName || partName == "all" { - trace.Debug(p.Program, "has basename matching in:", env) - return partValue - } - trace.Debug("checking matching:", p.BaseName, "!=", part) - } - return "" -} - -func (spec *ProgSpec) effectiveEnv() []string { - env := make(map[string]string) - for _, entry := range os.Environ() { - addInMap := func(kv string) bool { - for idx, elem := range kv { - if elem == '=' { - k := kv[:idx] - env[k] = kv - return true - } - } - return false - } - if !addInMap(entry) { - env[entry] = "" - } - } - for k, v := range spec.Env { - trace.Trace("add to environment:", k, "=", v) - env[k] = k + "=" + v - } - envv := make([]string, 0, len(env)) - for _, v := range env { - envv = append(envv, v) - } - return envv -} diff --git a/client/go/internal/admin/vespa-wrapper/startcbinary/progspec_test.go b/client/go/internal/admin/vespa-wrapper/startcbinary/progspec_test.go deleted file mode 100644 index be113e4e350..00000000000 --- a/client/go/internal/admin/vespa-wrapper/startcbinary/progspec_test.go +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package startcbinary - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestProgSpec(t *testing.T) { - spec := NewProgSpec([]string{"/opt/vespa/bin/foobar"}) - var b bool - - b = spec.matchesListString("") - assert.Equal(t, false, b) - b = spec.matchesListString("foobar") - assert.Equal(t, true, b) - b = spec.matchesListString("foo bar") - assert.Equal(t, false, b) - b = spec.matchesListString("one foobar") - assert.Equal(t, true, b) - b = spec.matchesListString("foobar two") - assert.Equal(t, true, b) - b = spec.matchesListString("one foobar two") - assert.Equal(t, true, b) - b = spec.matchesListString("all") - assert.Equal(t, true, b) - - var s string - s = spec.valueFromListString("") - assert.Equal(t, "", s) - s = spec.valueFromListString("foobar=123") - assert.Equal(t, "123", s) - s = spec.valueFromListString("one=1 foobar=123 two=2") - assert.Equal(t, "123", s) - s = spec.valueFromListString("one=1 all=123") - assert.Equal(t, "123", s) -} diff --git a/client/go/internal/admin/vespa-wrapper/startcbinary/startcbinary.go b/client/go/internal/admin/vespa-wrapper/startcbinary/startcbinary.go index f5e58e59808..a062f631b2c 100644 --- a/client/go/internal/admin/vespa-wrapper/startcbinary/startcbinary.go +++ b/client/go/internal/admin/vespa-wrapper/startcbinary/startcbinary.go @@ -7,20 +7,19 @@ import ( "fmt" "os" - "github.com/vespa-engine/vespa/client/go/internal/admin/envvars" - "github.com/vespa-engine/vespa/client/go/internal/util" + "github.com/vespa-engine/vespa/client/go/internal/admin/prog" ) -func startCbinary(spec *ProgSpec) int { - spec.configureCommonEnv() - spec.configurePath() - spec.configureTuning() - spec.configureValgrind() - spec.configureNumaCtl() - spec.configureHugePages() - spec.configureUseMadvise() - spec.configureVespaMalloc() - err := spec.run() +func startCbinary(spec *prog.Spec) int { + configureCommonEnv(spec) + configurePath(spec) + configureTuning() + spec.ConfigureValgrind() + spec.ConfigureNumaCtl() + spec.ConfigureHugePages() + spec.ConfigureUseMadvise() + spec.ConfigureVespaMalloc() + err := spec.Run() if err != nil { fmt.Fprintln(os.Stderr, err) return 1 @@ -28,20 +27,3 @@ func startCbinary(spec *ProgSpec) int { return 0 } } - -func (spec *ProgSpec) run() error { - prog := spec.Program - args := spec.Args - if spec.shouldUseValgrind { - args = spec.prependValgrind(args) - prog = spec.valgrindBinary() - } else if spec.shouldUseNumaCtl { - args = spec.prependNumaCtl(args) - prog = spec.numaCtlBinary() - } - if spec.shouldUseVespaMalloc { - spec.setenv(envvars.LD_PRELOAD, spec.vespaMallocPreload) - } - envv := spec.effectiveEnv() - return util.Execvpe(prog, args, envv) -} diff --git a/client/go/internal/admin/vespa-wrapper/startcbinary/tuning.go b/client/go/internal/admin/vespa-wrapper/startcbinary/tuning.go index 57230629d7a..f839d6a0946 100644 --- a/client/go/internal/admin/vespa-wrapper/startcbinary/tuning.go +++ b/client/go/internal/admin/vespa-wrapper/startcbinary/tuning.go @@ -7,7 +7,7 @@ import ( "github.com/vespa-engine/vespa/client/go/internal/util" ) -func (spec *ProgSpec) configureTuning() { +func configureTuning() { util.OptionallyReduceTimerFrequency() util.TuneResourceLimits() } diff --git a/client/go/internal/admin/vespa-wrapper/startcbinary/valgrind.go b/client/go/internal/admin/vespa-wrapper/startcbinary/valgrind.go deleted file mode 100644 index cccb37df8e5..00000000000 --- a/client/go/internal/admin/vespa-wrapper/startcbinary/valgrind.go +++ /dev/null @@ -1,83 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -// Author: arnej - -package startcbinary - -import ( - "fmt" - "os" - "strings" - - "github.com/vespa-engine/vespa/client/go/internal/admin/envvars" - "github.com/vespa-engine/vespa/client/go/internal/admin/trace" - "github.com/vespa-engine/vespa/client/go/internal/util" - "github.com/vespa-engine/vespa/client/go/internal/vespa" -) - -func (p *ProgSpec) configureValgrind() { - p.shouldUseValgrind = false - p.shouldUseCallgrind = false - env := p.getenv(envvars.VESPA_USE_VALGRIND) - allValgrind := env == "all" - parts := strings.Split(env, " ") - for _, part := range parts { - if p.BaseName == part || allValgrind { - trace.Trace("using valgrind as", p.Program, "has basename in", envvars.VESPA_USE_VALGRIND, "=>", env) - backticks := util.BackTicksWithStderr - out, err := backticks.Run("which", "valgrind") - if err != nil { - trace.Trace("no valgrind, 'which' fails:", err, "=>", out) - return - } - if opts := p.getenv(envvars.VESPA_VALGRIND_OPT); strings.Contains(opts, "callgrind") { - p.shouldUseCallgrind = true - } - p.shouldUseValgrind = true - return - } - trace.Debug("checking", envvars.VESPA_USE_VALGRIND, ":", p.BaseName, "!=", part) - } -} - -func (p *ProgSpec) valgrindBinary() string { - return "valgrind" -} - -func (p *ProgSpec) valgrindOptions() []string { - env := p.getenv(envvars.VESPA_VALGRIND_OPT) - if env != "" { - return strings.Fields(env) - } - result := []string{ - "--num-callers=32", - "--run-libc-freeres=yes", - "--track-origins=yes", - "--freelist-vol=1000000000", - "--leak-check=full", - "--show-reachable=yes", - } - result = addValgrindSuppression(result, "etc/vespa/valgrind-suppressions.txt") - result = addValgrindSuppression(result, "etc/vespa/suppressions.txt") - return result -} - -func addValgrindSuppression(r []string, fn string) []string { - existsOk, fileName := vespa.HasFileUnderVespaHome(fn) - if existsOk { - r = append(r, fmt.Sprintf("--suppressions=%s", fileName)) - } - return r -} - -func (p *ProgSpec) valgrindLogOption() string { - return fmt.Sprintf("--log-file=%s/tmp/valgrind.%s.log.%d", vespa.FindHome(), p.BaseName, os.Getpid()) -} - -func (p *ProgSpec) prependValgrind(args []string) []string { - v := util.NewArrayList[string](15 + len(args)) - v.Append(p.valgrindBinary()) - v.AppendAll(p.valgrindOptions()...) - v.Append(p.valgrindLogOption()) - v.AppendAll(args...) - return v -} diff --git a/client/go/internal/admin/vespa-wrapper/startcbinary/valgrind_test.go b/client/go/internal/admin/vespa-wrapper/startcbinary/valgrind_test.go deleted file mode 100644 index 1a105d66c4a..00000000000 --- a/client/go/internal/admin/vespa-wrapper/startcbinary/valgrind_test.go +++ /dev/null @@ -1,97 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package startcbinary - -import ( - "fmt" - "os" - "runtime" - "strings" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/vespa-engine/vespa/client/go/internal/admin/trace" - "github.com/vespa-engine/vespa/client/go/internal/util" -) - -var tmpBin string -var mockBinParent string - -func useMock(prog, target string) { - mock := fmt.Sprintf("%s/mockbin/%s", mockBinParent, prog) - symlink := fmt.Sprintf("%s/%s", tmpBin, target) - os.Remove(symlink) - err := os.Symlink(mock, symlink) - if err != nil { - util.JustExitWith(err) - } -} - -func setupValgrind(t *testing.T, testFileName string) { - trace.AdjustVerbosity(1) - t.Setenv("VESPA_HOME", mockBinParent+"/mock_vespahome") - mockBinParent = strings.TrimSuffix(testFileName, "/valgrind_test.go") - tmpBin = t.TempDir() + "/mock.bin.valgrind_test" - err := os.MkdirAll(tmpBin, 0755) - assert.Nil(t, err) - t.Setenv("PATH", fmt.Sprintf("%s:%s", tmpBin, os.Getenv("PATH"))) -} - -func TestValgrindDetection(t *testing.T) { - if runtime.GOOS == "windows" { - return - } - _, tfn, _, _ := runtime.Caller(0) - setupValgrind(t, tfn) - spec := NewProgSpec([]string{"/opt/vespa/bin/foobar"}) - var argv []string - - useMock("has-valgrind", "which") - - t.Setenv("VESPA_USE_VALGRIND", "") - spec.configureValgrind() - assert.Equal(t, false, spec.shouldUseValgrind) - assert.Equal(t, false, spec.shouldUseCallgrind) - - t.Setenv("VESPA_USE_VALGRIND", "all") - spec.configureValgrind() - assert.Equal(t, true, spec.shouldUseValgrind) - assert.Equal(t, false, spec.shouldUseCallgrind) - - t.Setenv("VESPA_USE_VALGRIND", "foo bar") - spec.configureValgrind() - assert.Equal(t, false, spec.shouldUseValgrind) - assert.Equal(t, false, spec.shouldUseCallgrind) - - t.Setenv("VESPA_USE_VALGRIND", "foobar") - spec.configureValgrind() - assert.Equal(t, true, spec.shouldUseValgrind) - assert.Equal(t, false, spec.shouldUseCallgrind) - - argv = spec.prependValgrind([]string{"/bin/myprog", "-c", "cfgid"}) - trace.Trace("argv:", argv) - assert.Equal(t, 11, len(argv)) - assert.Equal(t, "valgrind", argv[0]) - assert.Equal(t, "/bin/myprog", argv[8]) - - t.Setenv("VESPA_USE_VALGRIND", "another foobar yetmore") - spec.configureValgrind() - assert.Equal(t, true, spec.shouldUseValgrind) - assert.Equal(t, false, spec.shouldUseCallgrind) - - t.Setenv("VESPA_VALGRIND_OPT", "--tool=callgrind") - spec.configureValgrind() - assert.Equal(t, true, spec.shouldUseValgrind) - assert.Equal(t, true, spec.shouldUseCallgrind) - - argv = spec.prependValgrind([]string{"/bin/myprog", "-c", "cfgid"}) - trace.Trace("argv:", argv) - assert.Equal(t, 6, len(argv)) - assert.Equal(t, "valgrind", argv[0]) - assert.Equal(t, "--tool=callgrind", argv[1]) - assert.Equal(t, "/bin/myprog", argv[3]) - - useMock("no-valgrind", "which") - spec.configureValgrind() - assert.Equal(t, false, spec.shouldUseValgrind) - assert.Equal(t, false, spec.shouldUseCallgrind) -} diff --git a/client/go/internal/admin/vespa-wrapper/startcbinary/vespamalloc.go b/client/go/internal/admin/vespa-wrapper/startcbinary/vespamalloc.go deleted file mode 100644 index c6d53e1d03c..00000000000 --- a/client/go/internal/admin/vespa-wrapper/startcbinary/vespamalloc.go +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -// Author: arnej - -package startcbinary - -import ( - "fmt" - - "github.com/vespa-engine/vespa/client/go/internal/admin/envvars" - "github.com/vespa-engine/vespa/client/go/internal/admin/trace" - "github.com/vespa-engine/vespa/client/go/internal/vespa" -) - -func vespaMallocLib(suf string) string { - prefixes := []string{"lib64", "lib"} - for _, pre := range prefixes { - fn := fmt.Sprintf("%s/vespa/malloc/%s", pre, suf) - existsOk, fileName := vespa.HasFileUnderVespaHome(fn) - if existsOk { - trace.Debug("found library:", fileName) - return fileName - } - trace.Debug("bad or missing library:", fn) - } - return "" -} - -func (p *ProgSpec) configureVespaMalloc() { - p.shouldUseVespaMalloc = false - if p.matchesListEnv(envvars.VESPA_USE_NO_VESPAMALLOC) { - trace.Trace("use no vespamalloc:", p.BaseName) - return - } - if p.shouldUseValgrind && !p.shouldUseCallgrind { - trace.Trace("use valgrind, so no vespamalloc:", p.BaseName) - return - } - var useFile string - if p.matchesListEnv(envvars.VESPA_USE_VESPAMALLOC_DST) { - useFile = vespaMallocLib("libvespamallocdst16.so") - } else if p.matchesListEnv(envvars.VESPA_USE_VESPAMALLOC_D) { - useFile = vespaMallocLib("libvespamallocd.so") - } else if p.matchesListEnv(envvars.VESPA_USE_VESPAMALLOC) { - useFile = vespaMallocLib("libvespamalloc.so") - } - trace.Trace("use file:", useFile) - if useFile == "" { - return - } - if loadAsHuge := p.getenv(envvars.VESPA_LOAD_CODE_AS_HUGEPAGES); loadAsHuge != "" { - otherFile := vespaMallocLib("libvespa_load_as_huge.so") - useFile = fmt.Sprintf("%s:%s", useFile, otherFile) - } - p.considerEnvFallback(envvars.VESPA_MALLOC_HUGEPAGES, envvars.VESPA_USE_HUGEPAGES) - p.vespaMallocPreload = useFile - p.shouldUseVespaMalloc = true -} diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go index 06568dd35c3..a6447ef8d2e 100644 --- a/client/go/internal/cli/cmd/feed.go +++ b/client/go/internal/cli/cmd/feed.go @@ -18,10 +18,11 @@ import ( func addFeedFlags(cmd *cobra.Command, options *feedOptions) { cmd.PersistentFlags().IntVar(&options.connections, "connections", 8, "The number of connections to use") cmd.PersistentFlags().StringVar(&options.compression, "compression", "auto", `Compression mode to use. Default is "auto" which compresses large documents. Must be "auto", "gzip" or "none"`) + cmd.PersistentFlags().IntVar(&options.timeoutSecs, "timeout", 0, "Invididual feed operation timeout in seconds. 0 to disable") + cmd.PersistentFlags().IntVar(&options.doomSecs, "max-failure-seconds", 0, "Exit if given number of seconds elapse without any successful operations. 0 to disable") + cmd.PersistentFlags().BoolVar(&options.verbose, "verbose", false, "Verbose mode. Print successful operations in addition to errors") cmd.PersistentFlags().StringVar(&options.route, "route", "", "Target Vespa route for feed operations") cmd.PersistentFlags().IntVar(&options.traceLevel, "trace", 0, "The trace level of network traffic. 0 to disable") - cmd.PersistentFlags().IntVar(&options.timeoutSecs, "timeout", 0, "Feed operation timeout in seconds. 0 to disable") - cmd.PersistentFlags().BoolVar(&options.verbose, "verbose", false, "Verbose mode. Print successful operations in addition to errors") memprofile := "memprofile" cpuprofile := "cpuprofile" cmd.PersistentFlags().StringVar(&options.memprofile, memprofile, "", "Write a heap profile to given file") @@ -38,44 +39,34 @@ type feedOptions struct { verbose bool traceLevel int timeoutSecs int - memprofile string - cpuprofile string + doomSecs int + + memprofile string + cpuprofile string } func newFeedCmd(cli *CLI) *cobra.Command { var options feedOptions cmd := &cobra.Command{ - Use: "feed FILE", + Use: "feed FILE [FILE]...", Short: "Feed documents to a Vespa cluster", Long: `Feed documents to a Vespa cluster. -A high performance feeding client. This can be used to feed large amounts of -documents to a Vespa cluster efficiently. +This command can be used to feed large amounts of documents to a Vespa cluster +efficiently. The contents of FILE must be either a JSON array or JSON objects separated by newline (JSONL). If FILE is a single dash ('-'), documents will be read from standard input. `, - Example: `$ vespa feed documents.jsonl -$ cat documents.jsonl | vespa feed - -`, - Args: cobra.ExactArgs(1), + Example: `$ vespa feed docs.jsonl moredocs.json +$ cat docs.jsonl | vespa feed -`, + Args: cobra.MinimumNArgs(1), DisableAutoGenTag: true, SilenceUsage: true, Hidden: true, // TODO(mpolden): Remove when ready for public use RunE: func(cmd *cobra.Command, args []string) error { - var r io.Reader - if args[0] == "-" { - r = cli.Stdin - } else { - f, err := os.Open(args[0]) - if err != nil { - return err - } - defer f.Close() - r = f - } if options.cpuprofile != "" { f, err := os.Create(options.cpuprofile) if err != nil { @@ -84,7 +75,7 @@ $ cat documents.jsonl | vespa feed - pprof.StartCPUProfile(f) defer pprof.StopCPUProfile() } - err := feed(r, cli, options) + err := feed(args, options, cli) if options.memprofile != "" { f, err := os.Create(options.memprofile) if err != nil { @@ -123,7 +114,7 @@ func (opts feedOptions) compressionMode() (document.Compression, error) { return 0, errHint(fmt.Errorf("invalid compression mode: %s", opts.compression), `Must be "auto", "gzip" or "none"`) } -func feed(r io.Reader, cli *CLI, options feedOptions) error { +func feed(files []string, options feedOptions, cli *CLI) error { service, err := documentService(cli) if err != nil { return err @@ -139,25 +130,37 @@ func feed(r io.Reader, cli *CLI, options feedOptions) error { Route: options.route, TraceLevel: options.traceLevel, BaseURL: service.BaseURL, + NowFunc: cli.now, }, clients) throttler := document.NewThrottler(options.connections) - // TODO(mpolden): Make doom duration configurable - circuitBreaker := document.NewCircuitBreaker(10*time.Second, 0) + circuitBreaker := document.NewCircuitBreaker(10*time.Second, time.Duration(options.doomSecs)*time.Second) dispatcher := document.NewDispatcher(client, throttler, circuitBreaker, cli.Stderr, options.verbose) - dec := document.NewDecoder(r) - start := cli.now() - for { - doc, err := dec.Decode() - if err == io.EOF { - break - } - if err != nil { - cli.printErr(fmt.Errorf("failed to decode document: %w", err)) + for _, name := range files { + var r io.ReadCloser + if len(files) == 1 && name == "-" { + r = io.NopCloser(cli.Stdin) + } else { + f, err := os.Open(name) + if err != nil { + return err + } + r = f } - if err := dispatcher.Enqueue(doc); err != nil { - cli.printErr(err) + dec := document.NewDecoder(r) + for { + doc, err := dec.Decode() + if err == io.EOF { + break + } + if err != nil { + cli.printErr(fmt.Errorf("failed to decode document: %w", err)) + } + if err := dispatcher.Enqueue(doc); err != nil { + cli.printErr(err) + } } + r.Close() } if err := dispatcher.Close(); err != nil { return err diff --git a/client/go/internal/cli/cmd/feed_test.go b/client/go/internal/cli/cmd/feed_test.go index 521d2b2abd0..467d55a0a6e 100644 --- a/client/go/internal/cli/cmd/feed_test.go +++ b/client/go/internal/cli/cmd/feed_test.go @@ -31,47 +31,50 @@ func TestFeed(t *testing.T) { cli.now = clock.now td := t.TempDir() - jsonFile := filepath.Join(td, "docs.jsonl") - err := os.WriteFile(jsonFile, []byte(`{ + doc := []byte(`{ "put": "id:ns:type::doc1", "fields": {"foo": "123"} -}`), 0644) - - require.Nil(t, err) +}`) + jsonFile1 := filepath.Join(td, "docs1.jsonl") + jsonFile2 := filepath.Join(td, "docs2.jsonl") + require.Nil(t, os.WriteFile(jsonFile1, doc, 0644)) + require.Nil(t, os.WriteFile(jsonFile2, doc, 0644)) httpClient.NextResponseString(200, `{"message":"OK"}`) - require.Nil(t, cli.Run("feed", jsonFile)) + httpClient.NextResponseString(200, `{"message":"OK"}`) + require.Nil(t, cli.Run("feed", jsonFile1, jsonFile2)) assert.Equal(t, "", stderr.String()) want := `{ - "feeder.seconds": 1.000, - "feeder.ok.count": 1, - "feeder.ok.rate": 1.000, + "feeder.seconds": 5.000, + "feeder.ok.count": 2, + "feeder.ok.rate": 0.400, "feeder.error.count": 0, "feeder.inflight.count": 0, - "http.request.count": 1, - "http.request.bytes": 25, + "http.request.count": 2, + "http.request.bytes": 50, "http.request.MBps": 0.000, "http.exception.count": 0, - "http.response.count": 1, - "http.response.bytes": 16, + "http.response.count": 2, + "http.response.bytes": 32, "http.response.MBps": 0.000, "http.response.error.count": 0, - "http.response.latency.millis.min": 0, - "http.response.latency.millis.avg": 0, - "http.response.latency.millis.max": 0, + "http.response.latency.millis.min": 1000, + "http.response.latency.millis.avg": 1000, + "http.response.latency.millis.max": 1000, "http.response.code.counts": { - "200": 1 + "200": 2 } } ` assert.Equal(t, want, stdout.String()) stdout.Reset() - cli.Stdin = bytes.NewBuffer([]byte(`{ - "put": "id:ns:type::doc1", - "fields": {"foo": "123"} -}`)) + var stdinBuf bytes.Buffer + stdinBuf.Write(doc) + stdinBuf.Write(doc) + cli.Stdin = &stdinBuf + httpClient.NextResponseString(200, `{"message":"OK"}`) httpClient.NextResponseString(200, `{"message":"OK"}`) require.Nil(t, cli.Run("feed", "-")) assert.Equal(t, want, stdout.String()) diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go index 5c99f3bf056..51e60e4e131 100644 --- a/client/go/internal/vespa/document/dispatcher.go +++ b/client/go/internal/vespa/document/dispatcher.go @@ -128,7 +128,6 @@ func (d *Dispatcher) start() { return } d.listPool.New = func() any { return list.New() } - d.ready = make(chan Id, 4096) d.results = make(chan Result, 4096) d.msgs = make(chan string, 4096) d.started = true @@ -164,27 +163,35 @@ func (d *Dispatcher) enqueue(op documentOp) error { } d.mu.Unlock() group.add(op, op.attempts > 0) - d.enqueueWithSlot(op.document.Id) + d.dispatch(op.document.Id, group) return nil } -func (d *Dispatcher) enqueueWithSlot(id Id) { +func (d *Dispatcher) dispatch(id Id, group *documentGroup) { + if !d.canDispatch() { + d.msgs <- fmt.Sprintf("refusing to dispatch document %s: too many errors", id) + return + } d.acquireSlot() - d.ready <- id - d.throttler.Sent() - d.dispatch() -} - -func (d *Dispatcher) dispatch() { d.workerWg.Add(1) go func() { defer d.workerWg.Done() - id := <-d.ready - d.mu.RLock() - group := d.inflight[id.String()] - d.mu.RUnlock() d.sendDocumentIn(group) }() + d.throttler.Sent() +} + +func (d *Dispatcher) canDispatch() bool { + switch d.circuitBreaker.State() { + case CircuitClosed: + return true + case CircuitHalfOpen: + time.Sleep(time.Second) + return true + case CircuitOpen: + return false + } + panic("invalid circuit state") } func (d *Dispatcher) acquireSlot() { diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go index d066f5bc9ae..2e2e9a5abbd 100644 --- a/client/go/internal/vespa/document/dispatcher_test.go +++ b/client/go/internal/vespa/document/dispatcher_test.go @@ -36,6 +36,12 @@ func (f *mockFeeder) Send(doc Document) Result { return result } +type mockCircuitBreaker struct{ state CircuitState } + +func (c *mockCircuitBreaker) Success() {} +func (c *mockCircuitBreaker) Error(err error) {} +func (c *mockCircuitBreaker) State() CircuitState { return c.state } + func TestDispatcher(t *testing.T) { feeder := &mockFeeder{} clock := &manualClock{tick: time.Second} @@ -130,3 +136,32 @@ func TestDispatcherOrderingWithFailures(t *testing.T) { assert.Equal(t, int64(2), dispatcher.Stats().Errors) assert.Equal(t, 6, len(feeder.documents)) } + +func TestDispatcherOpenCircuit(t *testing.T) { + feeder := &mockFeeder{} + doc := Document{Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut} + clock := &manualClock{tick: time.Second} + throttler := newThrottler(8, clock.now) + breaker := &mockCircuitBreaker{} + dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard, false) + dispatcher.Enqueue(doc) + breaker.state = CircuitOpen + dispatcher.Enqueue(doc) + dispatcher.Close() + assert.Equal(t, 1, len(feeder.documents)) +} + +func BenchmarkDocumentDispatching(b *testing.B) { + feeder := &mockFeeder{} + clock := &manualClock{tick: time.Second} + throttler := newThrottler(8, clock.now) + breaker := NewCircuitBreaker(time.Second, 0) + dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard, false) + doc := Document{Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Body: []byte(`{"fields":{"foo": "123"}}`)} + b.ResetTimer() // ignore setup time + + for n := 0; n < b.N; n++ { + dispatcher.enqueue(documentOp{document: doc}) + dispatcher.workerWg.Wait() + } +} diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go index 51b6fa4de39..877bcc5edce 100644 --- a/client/go/internal/vespa/document/http.go +++ b/client/go/internal/vespa/document/http.go @@ -11,6 +11,7 @@ import ( "net/url" "strconv" "strings" + "sync" "sync/atomic" "time" @@ -31,6 +32,7 @@ type Client struct { httpClients []countingHTTPClient now func() time.Time sendCount int32 + gzippers sync.Pool } // ClientOptions specifices the configuration options of a feed client. @@ -40,6 +42,7 @@ type ClientOptions struct { Route string TraceLevel int Compression Compression + NowFunc func() time.Time } type countingHTTPClient struct { @@ -73,11 +76,17 @@ func NewClient(options ClientOptions, httpClients []util.HTTPClient) *Client { for _, client := range httpClients { countingClients = append(countingClients, countingHTTPClient{client: client}) } - return &Client{ + nowFunc := options.NowFunc + if nowFunc == nil { + nowFunc = time.Now + } + c := &Client{ options: options, httpClients: countingClients, - now: time.Now, + now: nowFunc, } + c.gzippers.New = func() any { return gzip.NewWriter(io.Discard) } + return c } func (c *Client) queryParams() url.Values { @@ -162,18 +171,25 @@ func (c *Client) leastBusyClient() *countingHTTPClient { return &leastBusy } +func (c *Client) gzipWriter(w io.Writer) *gzip.Writer { + gzipWriter := c.gzippers.Get().(*gzip.Writer) + gzipWriter.Reset(w) + return gzipWriter +} + func (c *Client) createRequest(method, url string, body []byte) (*http.Request, error) { var r io.Reader useGzip := c.options.Compression == CompressionGzip || (c.options.Compression == CompressionAuto && len(body) > 512) if useGzip { var buf bytes.Buffer - w := gzip.NewWriter(&buf) + w := c.gzipWriter(&buf) if _, err := w.Write(body); err != nil { return nil, err } if err := w.Close(); err != nil { return nil, err } + c.gzippers.Put(w) r = &buf } else { r = bytes.NewReader(body) diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go index 314113c53be..f67368b5128 100644 --- a/client/go/internal/vespa/document/http_test.go +++ b/client/go/internal/vespa/document/http_test.go @@ -293,3 +293,26 @@ func TestClientFeedURL(t *testing.T) { } } } + +func benchmarkClientSend(b *testing.B, compression Compression, document Document) { + httpClient := mock.HTTPClient{} + client := NewClient(ClientOptions{ + Compression: compression, + BaseURL: "https://example.com:1337", + Timeout: time.Duration(5 * time.Second), + }, []util.HTTPClient{&httpClient}) + b.ResetTimer() // ignore setup + for n := 0; n < b.N; n++ { + client.Send(document) + } +} + +func BenchmarkClientSend(b *testing.B) { + doc := Document{Create: true, Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Body: []byte(`{"fields":{"foo": "my document"}}`)} + benchmarkClientSend(b, CompressionNone, doc) +} + +func BenchmarkClientSendCompressed(b *testing.B) { + doc := Document{Create: true, Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Body: []byte(`{"fields":{"foo": "my document"}}`)} + benchmarkClientSend(b, CompressionGzip, doc) +} |