Influxdb client를 구조체로 구현하기
개요
최근 influx DB를 다루게 되었다. 사이트에 대한 로그를 수집하는 프로젝트를 진행하고 있는데 로그를 일정 시간마다 꾸준히 수집하고 있다. 관계형 데이터베이스도 사용할 수 있지만 시간에 관계되는 데이터다 보니 시계열 데이터베이스를 사용하는게 낫겠다 싶었다.
시계열 데이터베이스에는 여러가지가 존재하는데 IBM Informix, Prometheus, InfluxDB등 이 존재한다. 이 중에 InfluxDB를 선택한 이유는 Open-source임과 동시에 익숙한 언어인 Go로 작성되어 있었고, Go client library도 문서화가 잘 되어있었다. Prometheus도 비슷한 조건에서 사용할 수 있는 DB였으나 모니터링보다는 주기적으로 로깅만 할 땐 InfluxDB가 더 적합할 것 같아서 InfluxDB를 선택했다.
InfluxDB
Influx DB를 간단히 소개하면 open-source time series database(TSDB)중 하나이다. 실제로는 Open-source 버전과 Enterprise(유료)버전이 따로 존재한다.
Influx DB는 Attribute, Tuple, Table이 존재하는 일반적인 관계형 데이터베이스와는 다르게 시계열 데이터베이스는 다음과 같은 구조를 갖는다.
Bucket: 시계열 데이터가 저장된 곳을 의미한다. 초기에 Bucket을 만들고 bucket에 데이터를 저장하게 된다.
Measurement: 데이터를 구분짓는 문자열이다. 관계형 데이터베이스에서 테이블 명이라고 생각하면 편하다.
Tag: Key와 Value쌍으로 저장되는 데이터이다. 주로 메타데이터를 저장하는데 활용한다. 예를 들어 온도라고 하면 위치 (거실, 주방 등)에 대한 정보를 저장하는데 활용할 수 있다.
Field: Key와 Value 쌍으로 저장되는 데이터 이지만 실제 데이터를 저장하는 곳이다. 예를 들어 온도를 저장하는데 활용될 수 있다.
Timestamp: 데이터가 저장될 때 시간을 나타낸다.
Tag와 Field의 큰 차이점은 index가 되는 냐의 여부이다. Tag에 메타데이터를 주로 저장한다는 이유도 index로 인한 성능 향상 때문에 메타데이터로 데이터를 찾기 용이하기 때문이다.
문제점
Influx DB는 go로 구현된 DB답게 go client 라이브리러리도 잘 문서화 되있는 편이다. Influx DB를 set up할 때에도 step by step으로 잘 따라하면 누구나 구현할 수 있다.
go에서 influx DB에 데이터를 쓰기 위해선 다음과 같은 절차를 따른다.
- bucket, org, token, url등 변수를 설정한다.
- NewClient로 새로운 client를 만든다.
- WriteAPIBlocking(WriteAPI) 또는 QueryAPI를 통해 API를 만든다.
변수를 각자 선언하고 필요할 때 마다 NewClient를 통해 새로운 클라이언트를 만들어도 되지만 이렇게 할 경우 나중에 변수를 관리하기 어려워질 것 같았다. 그래서 기존의 API에 wrapper를 씌어 변수를 관리하기 더 편리하게 구조체로 구현해 보기로 했다.
구현
우선 구현하기 위해 기본적으로 필요한 모듈을 import한다.
import (
"context"
"time"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
"github.com/influxdata/influxdb-client-go/v2/api/write"
)
새로운 구조체를 반환하는 코드이다. 구조체 내부에 새로운 client를 만드는 메서드를 구현할 수도 있었지만 따로 함수로 구현한 이유는 influxDB client코드를 보았을 때 관행적으로 새로운 구조체를 반환할 떄에는 따로 함수로 구현하는 경우가 많았고 Go의 경우에 :=
를 사용하여 내부의 구조체 형식을 알지 못하더라도 사용이 반환값을 사용가능하기에 이렇게 구현하였다.
참고로 influxdb-client-go에서는 다음과 같이 Client를 반환하는 함수를 만든다.
func NewClient(serverURL string, authToken string) Client {
return NewClientWithOptions(serverURL, authToken, DefaultOptions())
}
따라서 다음과 같이 org, bucket, url, token을 받고 해당 값에 해당하는 client를 만들어 새로운 구조체 포인터를 반환하도록 구현했다.
influxdb-client-go
func GetNewInfluxClient(org string, bucket string, url string, token string) *InfluxClient {
return &InfluxClient{
token: token,
url: url,
org: org,
bucket: bucket,
client: influxdb2.NewClient(url, token),
}
}
NewClient코드는 사실 자체적으로 만든 http 통신을 위한 구조체에 불과하다. 구조체에 저장된 정보를 바탕으로 api를 통해 Query를 보내게 된다. 더 자세한 코드는 여기에서 NewService 항목을 보면 된다.
구조체의 field값들을 다음과 같이 정하였다. query_table은 query 요청후 받은 query table이 저장되고 query_result에는 table로 부터 결과 한줄 씩 받아 저장하게 된다.
type InfluxClient struct {
token string
url string
org string
bucket string
client influxdb2.Client
query_table *api.QueryTableResult
query_result map[string]interface{}
}
measurement, tag, fields를 받고 influx DB에 값을 쓰게 된다.
// Write a line of protocol to the database
func (ic *InfluxClient) Write(measurement string, tags map[string]string, fields map[string]interface{}) error {
writeAPI := ic.client.WriteAPIBlocking(ic.org, ic.bucket)
point := write.NewPoint(measurement, tags, fields, time.Now())
time.Sleep(1 * time.Second)
if err := writeAPI.WritePoint(context.Background(), point); err != nil {
return err
}
return nil
}
Flux query를 받아 해당 query에 대한 결과를 query_table에 저장한다.
// Send a query and print the results
func (ic *InfluxClient) Query(queryString string) error {
queryAPI := ic.client.QueryAPI(ic.org)
results, err := queryAPI.Query(context.Background(), queryString)
if err != nil {
return err
}
ic.query_table = results
if err := results.Err(); err != nil {
return err
}
return nil
}
query table로 부터 다음 결과를 받아 query_result에 저장한다.
// Get and save the following results
func (ic *InfluxClient) NextResult() bool {
if ic.query_table.Next() {
ic.query_result = ic.query_table.Record().Values()
return true
}
return false
}
query_result에 저장된 결과를 반환한다.
// Return the retrieved results
func (ic *InfluxClient) GetResult() (map[string]interface{}, error) {
if err := ic.query_table.Err(); err != nil {
return nil, err
} else {
return ic.query_result, nil
}
}
전체 코드
package main
import (
"context"
"time"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
"github.com/influxdata/influxdb-client-go/v2/api/write"
)
func GetNewInfluxClient(org string, bucket string, url string, token string) *InfluxClient {
return &InfluxClient{
token: token,
url: url,
org: org,
bucket: bucket,
client: influxdb2.NewClient(url, token),
}
}
type InfluxClient struct {
token string
url string
org string
bucket string
client influxdb2.Client
query_table *api.QueryTableResult
query_result map[string]interface{}
}
// Write a line of protocol to the database
func (ic *InfluxClient) Write(measurement string, tags map[string]string, fields map[string]interface{}) error {
writeAPI := ic.client.WriteAPIBlocking(ic.org, ic.bucket)
point := write.NewPoint(measurement, tags, fields, time.Now())
time.Sleep(1 * time.Second)
if err := writeAPI.WritePoint(context.Background(), point); err != nil {
return err
}
return nil
}
// Send a query and print the results
func (ic *InfluxClient) Query(queryString string) error {
queryAPI := ic.client.QueryAPI(ic.org)
results, err := queryAPI.Query(context.Background(), queryString)
if err != nil {
return err
}
ic.query_table = results
if err := results.Err(); err != nil {
return err
}
return nil
}
// Get and save the following results
func (ic *InfluxClient) NextResult() bool {
if ic.query_table.Next() {
ic.query_result = ic.query_table.Record().Values()
return true
}
return false
}
// Return the retrieved results
func (ic *InfluxClient) GetResult() (map[string]interface{}, error) {
if err := ic.query_table.Err(); err != nil {
return nil, err
} else {
return ic.query_result, nil
}
}