Working with InfluxDB in Go
InfluxDB is one of the most popular time series databases on the market. In this post you will learn its key concepts and how to interact with InfluxDB from Go, using a worked example.
InfluxDB
Conceptually, the design of InfluxDB resembles that of a relational database. Don’t get me wrong, it is not a relational database in any sense: it doesn’t impose a schema on your data and doesn’t implement SQL. But it has equivalent concepts and is operated via a DSL that looks quite similar to SQL.
Key concepts
If you are new to InfluxDB, have a look at the key concepts page, with examples, in the official InfluxDB documentation. If you need a quick brush-up, read on.
The largest entity in InfluxDB is a database. You can have multiple databases in one InfluxDB instance.
A database usually has multiple measurements. A measurement is roughly equivalent to a relational table, but it doesn’t enforce any structure on the data it stores.
Just as relational tables contain records, measurements contain points. Every point has one or more mandatory fields and a timestamp. A point may also have optional tags.
A timestamp specifies when its point was created. It is a primary index of a measurement.
A field is a key=value pair, which is similar to a column with its value in a relational table.
A tag is also a key=value pair, but it differs from a field because measurements are always indexed on tag(s) and never on fields. In that way tags are equivalent to indexed columns in a relational table.
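InfluxDB also has the concept of a series, which is a combination of a measurement and its tag(s): all points that share the same measurement and the same tag values belong to one series. A series is similar to a compound index in a relational table.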
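To make the idea of a series more concrete, here is a minimal sketch that groups points by measurement and tags. The Point type and the SeriesKey and GroupBySeries helpers are hypothetical names invented for this illustration; they are not part of InfluxDB or its Go client, and the real storage engine is of course more involved.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// Point is a hypothetical in-memory model of a point, used only for this sketch.
type Point struct {
	Measurement string
	Tags        map[string]string
	Fields      map[string]float64
	Time        int64 // Unix timestamp
}

// SeriesKey joins the measurement with its tags in sorted key order, so two
// points with the same tags always produce the same key.
func SeriesKey(p Point) string {
	keys := make([]string, 0, len(p.Tags))
	for k := range p.Tags {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	var b strings.Builder
	b.WriteString(p.Measurement)
	for _, k := range keys {
		b.WriteString("," + k + "=" + p.Tags[k])
	}
	return b.String()
}

// GroupBySeries buckets points by their series key. Fields play no part in the key.
func GroupBySeries(points []Point) map[string][]Point {
	out := make(map[string][]Point)
	for _, p := range points {
		key := SeriesKey(p)
		out[key] = append(out[key], p)
	}
	return out
}

func main() {
	points := []Point{
		{"cpu", map[string]string{"host": "server01", "region": "uswest"}, map[string]float64{"value": 1}, 1},
		{"cpu", map[string]string{"region": "uswest", "host": "server01"}, map[string]float64{"value": 2}, 2},
		{"cpu", map[string]string{"host": "server02", "region": "uswest"}, map[string]float64{"value": 3}, 3},
	}
	// The first two points land in the same series; the third starts a new one.
	for key, pts := range GroupBySeries(points) {
		fmt.Printf("%s: %d point(s)\n", key, len(pts))
	}
}
```

Note that changing a field value never creates a new series, while changing a tag value always does. That is one reason tags with many distinct values deserve some thought when designing a schema.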
There are official schema design recommendations. Make sure you read them before creating your first measurement.
Line Protocol
Every point is encoded using Line Protocol, which has a simple structure:
[key] [fields] [timestamp]
A key consists of a measurement and optional tags, separated by commas.
Line Protocol is easier to understand by looking at an example of a single point:
cpu,host=server02,region=uswest value=3 1434055562000010000
This point belongs to the measurement cpu. The point specifies two tags, host and region. It also has one field with the name value and the value 3. The timestamp at the end tells us when the point was created.
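The structure above can be reproduced with a few lines of Go. This is only a simplified sketch: EncodeLine and sortedPairs are hypothetical helpers written for this post, they handle float fields only, and they skip the escaping of spaces, commas and equals signs that a real encoder needs.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// sortedPairs renders a map as comma-separated key=value pairs in key order,
// so the output is deterministic.
func sortedPairs(m map[string]string) string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	pairs := make([]string, 0, len(keys))
	for _, k := range keys {
		pairs = append(pairs, k+"="+m[k])
	}
	return strings.Join(pairs, ",")
}

// EncodeLine builds "[key] [fields] [timestamp]" for a single point.
// Simplification: float fields only, and no escaping of special characters.
func EncodeLine(measurement string, tags map[string]string, fields map[string]float64, timestamp int64) string {
	key := measurement
	if len(tags) > 0 {
		key += "," + sortedPairs(tags)
	}

	rendered := make(map[string]string, len(fields))
	for name, v := range fields {
		rendered[name] = strconv.FormatFloat(v, 'f', -1, 64)
	}
	return fmt.Sprintf("%s %s %d", key, sortedPairs(rendered), timestamp)
}

func main() {
	line := EncodeLine(
		"cpu",
		map[string]string{"region": "uswest", "host": "server02"},
		map[string]float64{"value": 3},
		1434055562000010000,
	)
	fmt.Println(line)
	// prints: cpu,host=server02,region=uswest value=3 1434055562000010000
}
```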
InfluxDB Go client
The InfluxDB team develops and supports a Go client. This package provides convenience functions to read and write time series data. Under the hood it uses the InfluxDB HTTP API.
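To give a feel for what happens under the hood, the sketch below posts a line of Line Protocol over plain HTTP using only the standard library. It assumes the InfluxDB 1.x conventions of a /write endpoint with db and precision query parameters and a 204 status on success. To stay runnable without a database, it talks to a stand-in test server; writeLines and newFakeInflux are hypothetical helpers, not part of the official client.

```go
package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
	"net/http/httptest"
	"net/url"
	"strings"
)

// writeLines POSTs a Line Protocol body to <baseURL>/write and returns the HTTP status.
func writeLines(baseURL, db, precision, body string) (int, error) {
	q := url.Values{"db": {db}, "precision": {precision}}
	resp, err := http.Post(baseURL+"/write?"+q.Encode(), "text/plain; charset=utf-8", strings.NewReader(body))
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	io.Copy(io.Discard, resp.Body) // drain the body so the connection can be reused
	return resp.StatusCode, nil
}

// newFakeInflux starts a stand-in server that records what it receives on /write.
// It only mimics the status code; it does not parse or store anything.
func newFakeInflux(received *string) *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost || r.URL.Path != "/write" {
			http.NotFound(w, r)
			return
		}
		b, _ := io.ReadAll(r.Body)
		*received = r.URL.Query().Get("db") + ": " + string(b)
		w.WriteHeader(http.StatusNoContent)
	}))
}

func main() {
	var received string
	srv := newFakeInflux(&received)
	defer srv.Close()

	status, err := writeLines(srv.URL, "nodes", "s", "cpu,host=server02 value=3 1434055562")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(status, received)
}
```

In practice the Go client takes care of these details (authentication, precision, error decoding), so there is rarely a reason to issue the requests by hand.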
Connecting to InfluxDB
To connect to InfluxDB you need to create a new HTTP client, e.g.:
httpClient, err := client.NewHTTPClient(client.HTTPConfig{
	Addr:     "http://localhost:8086",
	Username: "username",
	Password: "password",
})
Writing data
When writing points to InfluxDB you should do it in batches to improve performance, e.g.:
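bp, _ := client.NewBatchPoints(client.BatchPointsConfig{
	Database:  "nodes",
	Precision: "us",
})
// NewPoint returns (*Point, error), so create the point first
// and add it to the batch only if there was no error.
pt, err := client.NewPoint(
	"cpu_usage",
	tags,
	fields,
	time.Now(),
)
if err != nil {
	log.Fatalln("Error: ", err)
}
bp.AddPoint(pt)
....
err = httpClient.Write(bp)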
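If you have a very large number of points, you may not want to send them all in one request. A minimal sketch of one way to cap the batch size is shown below. The chunk helper and the batch size of 3 are assumptions made for illustration only; with the real client, each chunk would become its own BatchPoints value passed to Write.

```go
package main

import "fmt"

// chunk splits lines into consecutive batches of at most size elements.
// It returns nil for a non-positive size.
func chunk(lines []string, size int) [][]string {
	if size <= 0 {
		return nil
	}
	var batches [][]string
	for start := 0; start < len(lines); start += size {
		end := start + size
		if end > len(lines) {
			end = len(lines)
		}
		batches = append(batches, lines[start:end])
	}
	return batches
}

func main() {
	// Seven made-up points in Line Protocol form (timestamps omitted).
	lines := make([]string, 0, 7)
	for i := 0; i < 7; i++ {
		lines = append(lines, fmt.Sprintf("cpu_usage,host=h%d value=%d", i, i*10))
	}

	for i, batch := range chunk(lines, 3) {
		fmt.Printf("batch %d: %d point(s)\n", i, len(batch))
	}
	// prints:
	// batch 0: 3 point(s)
	// batch 1: 3 point(s)
	// batch 2: 1 point(s)
}
```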
Reading data
To read data from InfluxDB, you need to create and execute a Query, e.g.:
q := client.Query{
	Command:  fmt.Sprintf("select mean(cpu_usage) from node_status where cluster = '%s'", cluster),
	Database: "nodes",
}
resp, err := httpClient.Query(q)
...
res, err := resp.Results[0].Series[0].Values[0][1].(json.Number).Float64()
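The long chain of indexes and the json.Number type assertion make more sense once you see the shape of the data. The sketch below decodes a hand-written sample payload, modelled on what I understand the 1.x query endpoint returns, with the standard library only. The response struct and the firstValue helper are hypothetical, and the sample JSON is not captured from a real server. Unlike the one-liner above, it checks the lengths before indexing, so an empty result produces an error instead of a panic.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"strings"
)

// response mirrors only the parts of a query response that this sketch reads.
type response struct {
	Results []struct {
		Series []struct {
			Name    string          `json:"name"`
			Columns []string        `json:"columns"`
			Values  [][]interface{} `json:"values"`
		} `json:"series"`
	} `json:"results"`
}

// firstValue returns the second column of the first row of the first series,
// which is where an aggregate such as mean() ends up (the first column is time).
func firstValue(raw string) (float64, error) {
	dec := json.NewDecoder(strings.NewReader(raw))
	dec.UseNumber() // keep numbers as json.Number instead of converting to float64

	var r response
	if err := dec.Decode(&r); err != nil {
		return 0, err
	}
	if len(r.Results) == 0 || len(r.Results[0].Series) == 0 {
		return 0, errors.New("query returned no series")
	}
	values := r.Results[0].Series[0].Values
	if len(values) == 0 || len(values[0]) < 2 {
		return 0, errors.New("query returned no values")
	}
	n, ok := values[0][1].(json.Number)
	if !ok {
		return 0, fmt.Errorf("unexpected value type %T", values[0][1])
	}
	return n.Float64()
}

func main() {
	// Hand-written sample payload, assumed to resemble a real response.
	raw := `{"results":[{"series":[{"name":"node_status","columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",47.198448]]}]}]}`

	mean, err := firstValue(raw)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%.6f\n", mean) // prints 47.198448
}
```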
Example
First you need to install InfluxDB if you haven’t done it yet.
Creating user and database
influx
> create user monitor with password 'secret'
> grant all privileges to monitor
> create database nodes
Example
The example below connects to InfluxDB, generates random metrics for two clusters covering the last 20 seconds, and calculates the mean CPU usage for each cluster.
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"math/rand"
	"time"

	"github.com/influxdata/influxdb/client/v2"
)

const (
	database = "nodes"
	username = "monitor"
	password = "secret"
)

var clusters = []string{"public", "private"}

func main() {
	c := influxDBClient()
	createMetrics(c)
	for _, cluster := range clusters {
		log.Printf("Mean values: cluster='%s', cpu_usage='%f'", cluster, meanCpuUsage(c, cluster))
	}
}

// influxDBClient connects to a local InfluxDB instance over HTTP.
func influxDBClient() client.Client {
	c, err := client.NewHTTPClient(client.HTTPConfig{
		Addr:     "http://localhost:8086",
		Username: username,
		Password: password,
	})
	if err != nil {
		log.Fatalln("Error: ", err)
	}
	return c
}

// createMetrics writes 100 random points for each of the last 20 seconds in one batch.
func createMetrics(c client.Client) {
	bp, err := client.NewBatchPoints(client.BatchPointsConfig{
		Database:  database,
		Precision: "s",
	})
	if err != nil {
		log.Fatalln("Error: ", err)
	}

	eventTime := time.Now().Add(time.Second * -20)

	var t time.Duration
	for t = 0; t < 20; t++ {
		for i := 0; i < 100; i++ {
			clusterIndex := rand.Intn(len(clusters))
			tags := map[string]string{
				"cluster": clusters[clusterIndex],
				"host":    fmt.Sprintf("192.168.%d.%d", clusterIndex, rand.Intn(100)),
			}
			fields := map[string]interface{}{
				"cpu_usage":  rand.Float64() * 100.0,
				"disk_usage": rand.Float64() * 100.0,
			}
			// Offset the timestamp by t seconds. With a constant offset every
			// point would share one timestamp, and points with identical tags
			// would overwrite each other.
			point, err := client.NewPoint(
				"node_status",
				tags,
				fields,
				eventTime.Add(time.Second*t),
			)
			if err != nil {
				log.Fatalln("Error: ", err)
			}
			bp.AddPoint(point)
		}
	}

	err = c.Write(bp)
	if err != nil {
		log.Fatal(err)
	}
}

// meanCpuUsage returns the mean cpu_usage of all points in the given cluster.
func meanCpuUsage(c client.Client, cluster string) float64 {
	q := client.Query{
		Command:  fmt.Sprintf("select mean(cpu_usage) from node_status where cluster = '%s'", cluster),
		Database: database,
	}
	resp, err := c.Query(q)
	if err != nil {
		log.Fatalln("Error: ", err)
	}
	if resp.Error() != nil {
		log.Fatalln("Error: ", resp.Error())
	}
	// Guard against an empty result, which would otherwise cause an index out of range panic.
	if len(resp.Results) == 0 || len(resp.Results[0].Series) == 0 {
		log.Fatalln("Error: no data returned for cluster", cluster)
	}
	res, err := resp.Results[0].Series[0].Values[0][1].(json.Number).Float64()
	if err != nil {
		log.Fatalln("Error: ", err)
	}
	return res
}
Sample output:
2016/04/18 19:04:24 Mean values: cluster='public', cpu_usage='47.198448'
2016/04/18 19:04:24 Mean values: cluster='private', cpu_usage='45.478159'
In summary
InfluxDB is a time series database optimised for writes and series searches. Its data manipulation DSL is designed to be very similar to SQL to ease adoption. The InfluxDB team also provides a Go client, which simplifies writing and reading data.