Skip to content

Commit

Permalink
cleanup(server.Handler): hide public fields
Browse files Browse the repository at this point in the history
We initialize these fields using the constructor and then
there's no real need to expose them to any consumer.

While there, fix a log message string.
  • Loading branch information
bassosimone committed Jan 29, 2024
1 parent ea928aa commit f03a731
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 66 deletions.
3 changes: 1 addition & 2 deletions cmd/dash-client/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,8 @@ func testhelper(t *testing.T, f func(int, testconfig)) {
t.Skip("Skipping this test in short mode")
}
mux := http.NewServeMux()
handler := server.NewHandler("../../testdata")
handler := server.NewHandler("../../testdata", log.Log)
ctx, cancel := context.WithCancel(context.Background())
handler.Logger = log.Log
handler.StartReaper(ctx)
handler.RegisterHandlers(mux)
server := httptest.NewServer(mux)
Expand Down
15 changes: 7 additions & 8 deletions cmd/dash-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
//
// Usage:
//
// dash-server [-datadir <dirpath>]
// [-http-listen-address <endpoint>]
// [-https-listen-address <endpoint>]
// [-prometheusx.listen-address <endpoint>]
// [-tls-cert <filepath>]
// [-tls-key <filepath>]
// dash-server [-datadir <dirpath>]
// [-http-listen-address <endpoint>]
// [-https-listen-address <endpoint>]
// [-prometheusx.listen-address <endpoint>]
// [-tls-cert <filepath>]
// [-tls-key <filepath>]
//
// The server will listen for incoming DASH experiment requests and
// will keep serving them until it is interrupted.
Expand Down Expand Up @@ -78,10 +78,9 @@ func main() {
promServer := prometheusx.MustServeMetrics()
defer promServer.Close()
mux := http.NewServeMux()
handler := server.NewHandler(*flagDatadir)
handler := server.NewHandler(*flagDatadir, log.Log)
handler.StartReaper(context.Background())
handler.RegisterHandlers(mux)
handler.Logger = log.Log
rootHandler := handlers.LoggingHandler(os.Stdout, mux)
go func() {
rtx.Must(http.ListenAndServeTLS(
Expand Down
59 changes: 29 additions & 30 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"time"

"github.com/google/uuid"
"github.com/neubot/dash/internal"
"github.com/neubot/dash/model"
"github.com/neubot/dash/spec"
)
Expand Down Expand Up @@ -55,16 +54,16 @@ type dependencies struct {
// get rid of sessions that have been running for too much. If you don't
// call StartReaper, you will eventually run out of RAM.
type Handler struct {
// Datadir is the directory where to save measurements.
Datadir string

// Logger is the logger to use. This field is initialized by the
// NewHandler constructor to a do-nothing logger.
Logger model.Logger
// datadir is the directory where to save measurements.
datadir string

// deps contains the [*Handler] dependencies.
deps dependencies

// logger is the logger to use. This field is initialized by the
// NewHandler constructor to a do-nothing logger.
logger model.Logger

// maxIterations is the maximum allowed number of iterations.
maxIterations int64

Expand All @@ -79,11 +78,11 @@ type Handler struct {
}

// NewHandler creates a new [*Handler] instance.
func NewHandler(datadir string) *Handler {
func NewHandler(datadir string, logger model.Logger) *Handler {
handler := &Handler{
Datadir: datadir,
Logger: internal.NoLogger{},
datadir: datadir,
deps: dependencies{}, // initialized later
logger: logger,
maxIterations: 17,
mtx: sync.Mutex{},
sessions: make(map[string]*sessionInfo),
Expand Down Expand Up @@ -199,7 +198,7 @@ func (h *Handler) CountSessions() (count int) {
func (h *Handler) reapStaleSessions() {
h.mtx.Lock()
defer h.mtx.Unlock()
h.Logger.Debugf("reapStaleSessions: inspecting %d sessions", len(h.sessions))
h.logger.Debugf("reapStaleSessions: inspecting %d sessions", len(h.sessions))
now := time.Now()
var stale []string
for UUID, session := range h.sessions {
Expand All @@ -208,7 +207,7 @@ func (h *Handler) reapStaleSessions() {
stale = append(stale, UUID)
}
}
h.Logger.Debugf("reapStaleSessions: reaping %d stale sessions", len(stale))
h.logger.Debugf("reapStaleSessions: reaping %d stale sessions", len(stale))
for _, UUID := range stale {
delete(h.sessions, UUID)
}
Expand All @@ -229,7 +228,7 @@ func (h *Handler) negotiate(w http.ResponseWriter, r *http.Request) {
// Obtain the client's remote address.
address, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
h.Logger.Warnf("negotiate: net.SplitHostPort: %s", err.Error())
h.logger.Warnf("negotiate: net.SplitHostPort: %s", err.Error())
w.WriteHeader(500)
return
}
Expand All @@ -239,7 +238,7 @@ func (h *Handler) negotiate(w http.ResponseWriter, r *http.Request) {
// We assume we're not going to have UUID conflicts.
UUID, err := h.deps.UUIDNewRandom()
if err != nil {
h.Logger.Warnf("negotiate: uuid.NewRandom: %s", err.Error())
h.logger.Warnf("negotiate: uuid.NewRandom: %s", err.Error())
w.WriteHeader(500)
return
}
Expand All @@ -264,7 +263,7 @@ func (h *Handler) negotiate(w http.ResponseWriter, r *http.Request) {

// Make sure we can properly marshal the response.
if err != nil {
h.Logger.Warnf("negotiate: json.Marshal: %s", err.Error())
h.logger.Warnf("negotiate: json.Marshal: %s", err.Error())
w.WriteHeader(500)
return
}
Expand Down Expand Up @@ -320,7 +319,7 @@ func (h *Handler) download(w http.ResponseWriter, r *http.Request) {
sessionID := r.Header.Get(authorization)
state := h.getSessionState(sessionID)
if state == sessionMissing {
h.Logger.Warn("download: session missing")
h.logger.Warn("download: session missing")
w.WriteHeader(400)
return
}
Expand All @@ -333,7 +332,7 @@ func (h *Handler) download(w http.ResponseWriter, r *http.Request) {
// the original implementation returning a value that seems to be much
// more useful and actionable to the client.
if state == sessionExpired {
h.Logger.Warn("download: session expired")
h.logger.Warn("download: session expired")
w.WriteHeader(429)
return
}
Expand All @@ -347,7 +346,7 @@ func (h *Handler) download(w http.ResponseWriter, r *http.Request) {
}
count, err := strconv.Atoi(siz)
if err != nil {
h.Logger.Warnf("download: strconv.Atoi: %s", err.Error())
h.logger.Warnf("download: strconv.Atoi: %s", err.Error())
w.WriteHeader(400)
return
}
Expand All @@ -356,7 +355,7 @@ func (h *Handler) download(w http.ResponseWriter, r *http.Request) {
// the acceptable bounds for the response size.
data, err := h.genbody(&count)
if err != nil {
h.Logger.Warnf("download: genbody: %s", err.Error())
h.logger.Warnf("download: genbody: %s", err.Error())
w.WriteHeader(500)
return
}
Expand All @@ -373,12 +372,12 @@ func (h *Handler) download(w http.ResponseWriter, r *http.Request) {
// savedata is an utility function saving information about this session.
func (h *Handler) savedata(session *sessionInfo) error {
// obtain the directory path where to write
name := path.Join(h.Datadir, "dash", session.stamp.Format("2006/01/02"))
name := path.Join(h.datadir, "dash", session.stamp.Format("2006/01/02"))

// make sure we have the correct directory hierarchy
err := h.deps.OSMkdirAll(name, 0755)
if err != nil {
h.Logger.Warnf("savedata: os.MkdirAll: %s", err.Error())
h.logger.Warnf("savedata: os.MkdirAll: %s", err.Error())
return err
}

Expand All @@ -393,23 +392,23 @@ func (h *Handler) savedata(session *sessionInfo) error {
// unlikely to have conflicts. If I'm wrong, O_EXCL will let us know.
filep, err := h.deps.OSOpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0644)
if err != nil {
h.Logger.Warnf("savedata: os.OpenFile: %s", err.Error())
h.logger.Warnf("savedata: os.OpenFile: %s", err.Error())
return err
}
defer filep.Close()

// wrap the output file with a gzipper
zipper, err := h.deps.GzipNewWriterLevel(filep, gzip.BestSpeed)
if err != nil {
h.Logger.Warnf("savedata: gzip.NewWriterLevel: %s", err.Error())
h.logger.Warnf("savedata: gzip.NewWriterLevel: %s", err.Error())
return err
}
defer zipper.Close()

// marshal the measurement to JSON
data, err := h.deps.JSONMarshal(session.serverSchema)
if err != nil {
h.Logger.Warnf("savedata: json.Marshal: %s", err.Error())
h.logger.Warnf("savedata: json.Marshal: %s", err.Error())
return err
}

Expand All @@ -423,31 +422,31 @@ func (h *Handler) collect(w http.ResponseWriter, r *http.Request) {
// make sure we have a session
session := h.popSession(r.Header.Get(authorization))
if session == nil {
h.Logger.Warn("collect: session missing")
h.logger.Warn("collect: session missing")
w.WriteHeader(400)
return
}

// read the incoming measurements collected by the client
data, err := h.deps.IOReadAll(r.Body)
if err != nil {
h.Logger.Warnf("collect: ioutil.ReadAll: %s", err.Error())
h.logger.Warnf("collect: io.ReadAll: %s", err.Error())
w.WriteHeader(400)
return
}

// unmarshal client data from JSON into the server data structure
err = json.Unmarshal(data, &session.serverSchema.Client)
if err != nil {
h.Logger.Warnf("collect: json.Unmarshal: %s", err.Error())
h.logger.Warnf("collect: json.Unmarshal: %s", err.Error())
w.WriteHeader(400)
return
}

// serialize all
data, err = h.deps.JSONMarshal(session.serverSchema.Server)
if err != nil {
h.Logger.Warnf("collect: json.Marshal: %s", err.Error())
h.logger.Warnf("collect: json.Marshal: %s", err.Error())
w.WriteHeader(500)
return
}
Expand Down Expand Up @@ -489,8 +488,8 @@ func (h *Handler) RegisterHandlers(mux *http.ServeMux) {

// reaperLoop is the goroutine that periodically reaps expired sessions.
func (h *Handler) reaperLoop(ctx context.Context) {
h.Logger.Debug("reaperLoop: start")
defer h.Logger.Debug("reaperLoop: done")
h.logger.Debug("reaperLoop: start")
defer h.logger.Debug("reaperLoop: done")
defer close(h.stop)
for ctx.Err() == nil {
const reapInterval = 14 * time.Second
Expand Down
Loading

0 comments on commit f03a731

Please sign in to comment.