<p>I'm Go noob, and trying to write some functions associated with message handler callbacks for the Paho MQTT client (i.e. what to do when a message is received by the client). This is a two part question, and I've tried to clearly indicate both parts:</p>
<p>I need to check the payload data against a whitelist <code>map</code> of allowed data before deciding to save to a database or not. I'm trying to accomplish two things by this:</p>
<ul>
<li>Detect bad data with a few cheap CPU cycles rather than a comparatively expensive database hit.</li>
<li>Store the associated prepared <code>INSERT</code> statements in my <code>map</code> so that they <code>Exec</code> faster.</li>
</ul>
<p><strong>Q1: Is this a sensible way of doing this?</strong> I anticipate having perhaps a few thousand entries in that <code>map</code>, and never call <code>Close()</code> on the statements.</p>
<p>I also need to poll the database occasionally to update the <code>map</code> (it's called 'topics'), as what constitutes valid data is subject to change. At the moment I've got something like this (removed error checking for simplicity here):</p>
<pre><code>topics := make(map[string]*sql.Stmt)
tc := time.NewTicker(time.Duration(freq)*time.Second).C // set up a ticker channel to poll every 'freq' seconds
go func() {
for {
select {
case <- tc:
// wipe the topics map and create a new one. TODO: does this need a mutex?!
topics = make(map[string]*sql.Stmt)
// poll the devices and make their prepared statements in a separate goroutine
go func() {
var mk string // to store the map keys, referred to as 'identifier' in the SQL query
rows, _ := db.Query("SELECT identifier FROM message_publishers WHERE active;")
defer rows.Close()
for rows.Next() {
_ := rows.Scan(&mk)
// in below SQL the table name is derived from the getter getTableName
insert_stmt, _ := m.tsdb.Prepare(fmt.Sprintf("INSERT INTO \"%s\"(colName) VALUES($1);", getTableName(mk)))
topics[mk] = insert_stmt
}
}()
}
}
}()
</code></pre>
<p><strong>Q2: Do I need a mutex lock on that <code>map</code>? (on the last line)</strong> I'm going to want to read from it (when a message arrives) while the goroutine is populating it with updated <code>sql.Stmt</code> values. I'm also two goroutines deep; am I doing this right, or should I take another approach?</p>
<hr/>**评论:**<br/><br/>kapoof_euw: <pre><p>A few things:</p>
<ul>
<li>You can use a 'for range' statement instead of a for with a select inside. Just 'for range tc' if I'm not mistaken.</li>
<li>I would be careful with spawning new go-routines on every tick. If your DB access would be slow or you have to process a lot of rows, your go-routine might not be done before the next tick. Having 2 of those work simultaneously might give incorrect results. You can get away with omitting the inner "go func" and just executing it in the same thread as your for-loop.</li>
<li>Instead of populating your "active" map, I would suggest you populate a different map stored in a temporary var first and then replacing your "active map" with the one stored in the temporary var. So basically instead of the "wipe the topics map", just use a different map and replace it when it's filled up. That way you'll never have concurrent read-write to the same map and you don't need a mutex.</li>
</ul></pre>yorbit: <pre><p>Great advice; thanks so much for reading the code through.</p></pre>metamatic: <pre><p>A map and a mutex is a perfectly reasonable way to implement some caching. Yes, you need to lock read and well as write. You might fall under the preconditions for which <a href="https://golang.org/pkg/sync/#Map" rel="nofollow">sync.Map</a> is optimized.</p>
<p>Assuming you know you're going to use all those SQL statements repeatedly for every new publisher, it probably makes sense to prepare and cache the statements. However, I'm thinking you'd get a much bigger boost in performance from buffering and inserting data in batches rather than paying the overhead of journal and index updates for every individual insert. That's harder work from a programming point of view, of course.</p></pre>yorbit: <pre><p>Thanks for the tip on <code>sync.Map</code>, I didn't know about that type. Regarding batching, that's a good idea, but as you pointed out, it's extra work to buffer for each table. I'll look into it when I feel it could become a bottleneck.</p></pre>MrPhatBob: <pre><p>I can't remember where I got this Mutex protected Map code from but if you combine the Mutex with the Map in a struct:</p>
<pre><code>var index = struct {
sync.RWMutex
m map[string]string
}
{m: make(map[string]string)}
</code></pre>
<p>Then you can protect writes with:</p>
<pre><code>index.Lock()
index.m[idx] = thing you're writing
index.Unlock()
</code></pre>
<p>And reading with</p>
<pre><code>index.RLock()
thing = index.m[idx]
index.RUnlock()
</code></pre></pre>
这是一个分享于 的资源,其中的信息可能已经有所发展或是发生改变。
入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889
- 请尽量让自己的回复能够对别人有帮助
- 支持 Markdown 格式, **粗体**、~~删除线~~、
`单行代码`
- 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
- 图片支持拖拽、截图粘贴等方式上传