I'm a Go noob, and I'm trying to write some functions for the message handler callbacks of the Paho MQTT client (i.e. what to do when a message is received by the client). This is a two-part question, and I've tried to clearly indicate both parts:
I need to check the payload data against a whitelist `map` of allowed data before deciding whether to save it to a database. I'm trying to accomplish two things with this:
- Detect bad data with a few cheap CPU cycles rather than a comparatively expensive database hit.
- Store the associated prepared `INSERT` statements in `mymap` so that they `Exec` faster.
Q1: Is this a sensible way of doing this? I anticipate having perhaps a few thousand entries in that `map`, and I never call `Close()` on the statements.
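For Q1, the whitelist check can be a single map lookup that happens before any database work. A minimal sketch, assuming the map key is an identifier you derive from the incoming message (in Paho's Go client the handler receives an `mqtt.Message`, whose `Topic()` and `Payload()` methods give you the topic string and the raw bytes). `storeIfAllowed`, `execer`, and `fakeStmt` are hypothetical names; `execer` declares only the `Exec` method that `*sql.Stmt` already has, so the logic can be tried without a database.

```go
package main

import (
	"database/sql"
	"fmt"
)

// execer is the one method of *sql.Stmt this sketch needs; using an
// interface lets the whitelist logic run without a real database.
type execer interface {
	Exec(args ...any) (sql.Result, error)
}

// storeIfAllowed (hypothetical) looks key up in the whitelist map and only
// executes the cached prepared INSERT when the key is present.
// It reports whether the payload was stored.
func storeIfAllowed(topics map[string]execer, key string, payload []byte) (bool, error) {
	stmt, ok := topics[key]
	if !ok {
		// Unknown key: rejected by a map lookup, with no database round trip.
		return false, nil
	}
	if _, err := stmt.Exec(payload); err != nil {
		return false, fmt.Errorf("insert for %q: %w", key, err)
	}
	return true, nil
}

// fakeStmt counts Exec calls instead of talking to a database.
type fakeStmt struct{ calls int }

func (f *fakeStmt) Exec(args ...any) (sql.Result, error) {
	f.calls++
	return nil, nil
}

func main() {
	s := &fakeStmt{}
	topics := map[string]execer{"sensor-1": s}

	ok1, _ := storeIfAllowed(topics, "sensor-1", []byte("21.5"))
	ok2, _ := storeIfAllowed(topics, "unknown", []byte("junk"))
	fmt.Println(ok1, ok2, s.calls) // true false 1
}
```

This lookup reads the map, so if another goroutine fills or replaces the map while handlers are running, you have exactly the situation Q2 asks about.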
I also need to poll the database occasionally to update the `map` (it's called `topics`), as what constitutes valid data is subject to change. At the moment I've got something like this (error checking removed for simplicity):
```go
topics := make(map[string]*sql.Stmt)
tc := time.NewTicker(time.Duration(freq) * time.Second).C // set up a ticker channel to poll every 'freq' seconds
go func() {
	for {
		select {
		case <-tc:
			// wipe the topics map and create a new one. TODO: does this need a mutex?!
			topics = make(map[string]*sql.Stmt)
			// poll the devices and make their prepared statements in a separate goroutine
			go func() {
				var mk string // to store the map keys, referred to as 'identifier' in the SQL query
				rows, _ := db.Query("SELECT identifier FROM message_publishers WHERE active;")
				defer rows.Close()
				for rows.Next() {
					_ = rows.Scan(&mk)
					// in the SQL below, the table name is derived from the getter getTableName
					insertStmt, _ := m.tsdb.Prepare(fmt.Sprintf("INSERT INTO \"%s\"(colName) VALUES($1);", getTableName(mk)))
					topics[mk] = insertStmt
				}
			}()
		}
	}
}()
```
Q2: Do I need a mutex lock on that `map` (for the `topics[mk] = ...` assignment in the inner loop)? I'm going to want to read from it (when a message arrives) while the goroutine is populating it with updated `sql.Stmt` values. I'm also two goroutines deep; am I doing this right, or should I take another approach?
Comments:
kapoof_euw:
A few things:
- You can use a `for range` statement instead of a `for` loop with a `select` inside. Just `for range tc`, if I'm not mistaken.
- I would be careful about spawning a new goroutine on every tick. If your DB access is slow or you have to process a lot of rows, your goroutine might not be done before the next tick, and having two of them working simultaneously might give incorrect results. You can get away with omitting the inner `go func` and just executing that code in the same goroutine as your `for` loop.
- Instead of populating your "active" map, I would suggest you first populate a different map stored in a temporary variable and then replace your "active" map with it. So instead of "wipe the topics map", build a separate map and swap it in once it's filled. That way you'll never have concurrent reads and writes to the same map, and you don't need a mutex.
metamatic: Great advice; thanks so much for reading the code through.
yorbit: A map and a mutex is a perfectly reasonable way to implement some caching. Yes, you need to lock reads as well as writes. You might fall under the preconditions for which `sync.Map` is optimized.
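The `sync.Map` documentation describes it as optimized for two cases: an entry that is written once but read many times (as in caches that only grow), and goroutines that read, write, and overwrite entries for disjoint sets of keys. A minimal sketch of keeping the whitelist in one, assuming the poller is the only writer and message handlers only call `Load`; `refreshSyncMap` and `keys` are hypothetical helpers, and a string stands in for `*sql.Stmt`.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// topics maps an identifier (string) to its cached statement (a string here).
var topics sync.Map

// refreshSyncMap adds newly active identifiers and deletes inactive ones.
// It assumes it is the only writer, so Load followed by Store cannot race
// with another writer adding the same key.
func refreshSyncMap(active []string) {
	keep := make(map[string]bool, len(active))
	for _, id := range active {
		keep[id] = true
		if _, ok := topics.Load(id); !ok {
			topics.Store(id, "prepared INSERT for "+id) // stand-in for Prepare
		}
	}
	// Range allows the callback to call other methods, including Delete.
	topics.Range(func(k, _ any) bool {
		if !keep[k.(string)] {
			topics.Delete(k)
		}
		return true
	})
}

// keys returns the current identifiers in sorted order.
func keys() []string {
	var out []string
	topics.Range(func(k, _ any) bool {
		out = append(out, k.(string))
		return true
	})
	sort.Strings(out)
	return out
}

func main() {
	refreshSyncMap([]string{"a", "b"})
	refreshSyncMap([]string{"b", "c"})
	fmt.Println(keys()) // [b c]

	// A message handler would only read:
	if v, ok := topics.Load("c"); ok {
		fmt.Println(v.(string)) // prepared INSERT for c
	}
}
```

Unlike swapping in a whole new map, handlers here may see a mix of old and new entries while a refresh is in progress; whether that matters depends on the application.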
Assuming you know you're going to use all those SQL statements repeatedly for every new publisher, it probably makes sense to prepare and cache the statements. However, I'm thinking you'd get a much bigger boost in performance from buffering and inserting data in batches rather than paying the overhead of journal and index updates for every individual insert. That's harder work from a programming point of view, of course.
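The batching idea can be sketched by buffering values per table and sending one multi-row `INSERT` when a buffer fills. A minimal sketch, assuming PostgreSQL-style `$n` placeholders (as in the question's `VALUES($1)`) and the same single `colName` column; `batchInsertSQL` and `batcher` are hypothetical, and `flush` stands in for something like `db.Exec(query, args...)`. As written it is not safe for concurrent use, and a real version would also need a timer to flush partially filled batches.

```go
package main

import (
	"fmt"
	"strings"
)

// batchInsertSQL builds e.g. INSERT INTO "t"(colName) VALUES ($1), ($2);
// The table name is interpolated, not bound as a parameter, so it must come
// from a trusted source (getTableName in the question).
func batchInsertSQL(table string, rows int) string {
	var b strings.Builder
	fmt.Fprintf(&b, "INSERT INTO \"%s\"(colName) VALUES ", table)
	for i := 1; i <= rows; i++ {
		if i > 1 {
			b.WriteString(", ")
		}
		fmt.Fprintf(&b, "($%d)", i)
	}
	b.WriteString(";")
	return b.String()
}

// batcher buffers values per table and flushes a table once it holds limit values.
// Not goroutine-safe: guard it with a mutex or feed it from a single goroutine.
type batcher struct {
	limit int
	buf   map[string][]any
	flush func(query string, args []any) error
}

// add appends v to the table's buffer and flushes the buffer when it is full.
func (b *batcher) add(table string, v any) error {
	b.buf[table] = append(b.buf[table], v)
	if len(b.buf[table]) < b.limit {
		return nil
	}
	vals := b.buf[table]
	b.buf[table] = nil // start a fresh buffer; vals keeps the filled one
	return b.flush(batchInsertSQL(table, len(vals)), vals)
}

func main() {
	b := &batcher{limit: 2, buf: map[string][]any{}, flush: func(q string, args []any) error {
		fmt.Println(q, args)
		return nil
	}}
	_ = b.add("readings", 21.5)
	_ = b.add("readings", 22.0) // fills the batch and triggers a flush
}
```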
MrPhatBob: Thanks for the tip on `sync.Map`; I didn't know about that type. Regarding batching, that's a good idea, but as you pointed out, it's extra work to buffer for each table. I'll look into it when I feel it could become a bottleneck.
I can't remember where I got this mutex-protected map code from, but if you combine the mutex with the map in a struct:
```go
var index = struct {
	sync.RWMutex
	m map[string]string
}{m: make(map[string]string)}
```
Then you can protect writes with:
```go
index.Lock()
index.m[idx] = thing // thing is the value you're writing
index.Unlock()
```
And reads with:
```go
index.RLock()
thing = index.m[idx]
index.RUnlock()
```
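To keep callers from forgetting the lock, the same struct can be wrapped in small helpers, and a periodic refresh like the one in the question can swap in a whole new map under the write lock. A minimal sketch; `set`, `get`, and `replaceAll` are hypothetical names, and `string` values stand in for `*sql.Stmt`.

```go
package main

import (
	"fmt"
	"sync"
)

// index pairs a map with the RWMutex that guards it, in the same shape as
// the struct in the comment above.
var index = struct {
	sync.RWMutex
	m map[string]string
}{m: make(map[string]string)}

// set writes one entry under the exclusive (write) lock.
func set(k, v string) {
	index.Lock()
	defer index.Unlock()
	index.m[k] = v
}

// get reads one entry under the shared (read) lock; many readers may hold it at once.
func get(k string) (string, bool) {
	index.RLock()
	defer index.RUnlock()
	v, ok := index.m[k]
	return v, ok
}

// replaceAll publishes a freshly built map under the write lock. The caller
// must not modify next after handing it over.
func replaceAll(next map[string]string) {
	index.Lock()
	index.m = next
	index.Unlock()
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			key := fmt.Sprint(i)
			set(key, "value")
			get(key)
		}(i)
	}
	wg.Wait()
	fmt.Println(len(index.m)) // 100
}
```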
