개요

최근 influx DB를 다루게 되었다. 사이트에 대한 로그를 수집하는 프로젝트를 진행하고 있는데 로그를 일정 시간마다 꾸준히 수집하고 있다. 관계형 데이터베이스도 사용할 수 있지만 시간에 관계되는 데이터다 보니 시계열 데이터베이스를 사용하는게 낫겠다 싶었다.
시계열 데이터베이스에는 여러가지가 존재하는데 IBM Informix, Prometheus, InfluxDB등 이 존재한다. 이 중에 InfluxDB를 선택한 이유는 Open-source임과 동시에 익숙한 언어인 Go로 작성되어 있었고, Go client library도 문서화가 잘 되어있었다. Prometheus도 비슷한 조건에서 사용할 수 있는 DB였으나 모니터링보다는 주기적으로 로깅만 할 땐 InfluxDB가 더 적합할 것 같아서 InfluxDB를 선택했다.

InfluxDB

Influx DB를 간단히 소개하면 open-source time series database(TSDB)중 하나이다. 실제로는 Open-source 버전과 Enterprise(유료)버전이 따로 존재한다.
Influx DB는 Attribute, Tuple, Table이 존재하는 일반적인 관계형 데이터베이스와는 다르게 시계열 데이터베이스는 다음과 같은 구조를 갖는다.

Bucket: 시계열 데이터가 저장된 곳을 의미한다. 초기에 Bucket을 만들고 bucket에 데이터를 저장하게 된다.
Measurement: 데이터를 구분짓는 문자열이다. 관계형 데이터베이스에서 테이블 명이라고 생각하면 편하다.
Tag: Key와 Value쌍으로 저장되는 데이터이다. 주로 메타데이터를 저장하는데 활용한다. 예를 들어 온도라고 하면 위치 (거실, 주방 등)에 대한 정보를 저장하는데 활용할 수 있다.
Field: Key와 Value 쌍으로 저장되는 데이터 이지만 실제 데이터를 저장하는 곳이다. 예를 들어 온도를 저장하는데 활용될 수 있다.
Timestamp: 데이터가 저장될 때 시간을 나타낸다.

Tag와 Field의 큰 차이점은 index가 되는 냐의 여부이다. Tag에 메타데이터를 주로 저장한다는 이유도 index로 인한 성능 향상 때문에 메타데이터로 데이터를 찾기 용이하기 때문이다.

문제점

Influx DB는 go로 구현된 DB답게 go client 라이브리러리도 잘 문서화 되있는 편이다. Influx DB를 set up할 때에도 step by step으로 잘 따라하면 누구나 구현할 수 있다.
go에서 influx DB에 데이터를 쓰기 위해선 다음과 같은 절차를 따른다.

  1. bucket, org, token, url등 변수를 설정한다.
  2. NewClient로 새로운 client를 만든다.
  3. WriteAPIBlocking(WriteAPI) 또는 QueryAPI를 통해 API를 만든다.

변수를 각자 선언하고 필요할 때 마다 NewClient를 통해 새로운 클라이언트를 만들어도 되지만 이렇게 할 경우 나중에 변수를 관리하기 어려워질 것 같았다. 그래서 기존의 API에 wrapper를 씌어 변수를 관리하기 더 편리하게 구조체로 구현해 보기로 했다.

구현

우선 구현하기 위해 기본적으로 필요한 모듈을 import한다.

import (
	"context"
	"time"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
	"github.com/influxdata/influxdb-client-go/v2/api"
	"github.com/influxdata/influxdb-client-go/v2/api/write"
)

새로운 구조체를 반환하는 코드이다. 구조체 내부에 새로운 client를 만드는 메서드를 구현할 수도 있었지만 따로 함수로 구현한 이유는 influxDB client코드를 보았을 때 관행적으로 새로운 구조체를 반환할 떄에는 따로 함수로 구현하는 경우가 많았고 Go의 경우에 :=를 사용하여 내부의 구조체 형식을 알지 못하더라도 사용이 반환값을 사용가능하기에 이렇게 구현하였다.
참고로 influxdb-client-go에서는 다음과 같이 Client를 반환하는 함수를 만든다.

func NewClient(serverURL string, authToken string) Client {
	return NewClientWithOptions(serverURL, authToken, DefaultOptions())
}

따라서 다음과 같이 org, bucket, url, token을 받고 해당 값에 해당하는 client를 만들어 새로운 구조체 포인터를 반환하도록 구현했다.

influxdb-client-go

func GetNewInfluxClient(org string, bucket string, url string, token string) *InfluxClient {
	return &InfluxClient{
		token:  token,
		url:    url,
		org:    org,
		bucket: bucket,
		client: influxdb2.NewClient(url, token),
	}
}

NewClient코드는 사실 자체적으로 만든 http 통신을 위한 구조체에 불과하다. 구조체에 저장된 정보를 바탕으로 api를 통해 Query를 보내게 된다. 더 자세한 코드는 여기에서 NewService 항목을 보면 된다.

구조체의 field값들을 다음과 같이 정하였다. query_table은 query 요청후 받은 query table이 저장되고 query_result에는 table로 부터 결과 한줄 씩 받아 저장하게 된다.

type InfluxClient struct {
	token        string
	url          string
	org          string
	bucket       string
	client       influxdb2.Client
	query_table  *api.QueryTableResult
	query_result map[string]interface{}
}

measurement, tag, fields를 받고 influx DB에 값을 쓰게 된다.

// Write a line of protocol to the database
func (ic *InfluxClient) Write(measurement string, tags map[string]string, fields map[string]interface{}) error {
	writeAPI := ic.client.WriteAPIBlocking(ic.org, ic.bucket)
	point := write.NewPoint(measurement, tags, fields, time.Now())
	time.Sleep(1 * time.Second)

	if err := writeAPI.WritePoint(context.Background(), point); err != nil {
		return err
	}
	return nil
}

Flux query를 받아 해당 query에 대한 결과를 query_table에 저장한다.

// Send a query and print the results
func (ic *InfluxClient) Query(queryString string) error {
	queryAPI := ic.client.QueryAPI(ic.org)
	results, err := queryAPI.Query(context.Background(), queryString)

	if err != nil {
		return err
	}
	ic.query_table = results
	if err := results.Err(); err != nil {
		return err
	}
	return nil
}

query table로 부터 다음 결과를 받아 query_result에 저장한다.

// Get and save the following results
func (ic *InfluxClient) NextResult() bool {
	if ic.query_table.Next() {
		ic.query_result = ic.query_table.Record().Values()
		return true
	}
	return false
}

query_result에 저장된 결과를 반환한다.

// Return the retrieved results
func (ic *InfluxClient) GetResult() (map[string]interface{}, error) {
	if err := ic.query_table.Err(); err != nil {
		return nil, err
	} else {
		return ic.query_result, nil
	}
}

전체 코드

package main

import (
	"context"
	"time"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
	"github.com/influxdata/influxdb-client-go/v2/api"
	"github.com/influxdata/influxdb-client-go/v2/api/write"
)

func GetNewInfluxClient(org string, bucket string, url string, token string) *InfluxClient {
	return &InfluxClient{
		token:  token,
		url:    url,
		org:    org,
		bucket: bucket,
		client: influxdb2.NewClient(url, token),
	}
}

type InfluxClient struct {
	token        string
	url          string
	org          string
	bucket       string
	client       influxdb2.Client
	query_table  *api.QueryTableResult
	query_result map[string]interface{}
}

// Write a line of protocol to the database
func (ic *InfluxClient) Write(measurement string, tags map[string]string, fields map[string]interface{}) error {
	writeAPI := ic.client.WriteAPIBlocking(ic.org, ic.bucket)
	point := write.NewPoint(measurement, tags, fields, time.Now())
	time.Sleep(1 * time.Second)

	if err := writeAPI.WritePoint(context.Background(), point); err != nil {
		return err
	}
	return nil
}

// Send a query and print the results
func (ic *InfluxClient) Query(queryString string) error {
	queryAPI := ic.client.QueryAPI(ic.org)
	results, err := queryAPI.Query(context.Background(), queryString)

	if err != nil {
		return err
	}
	ic.query_table = results
	if err := results.Err(); err != nil {
		return err
	}
	return nil
}

// Get and save the following results
func (ic *InfluxClient) NextResult() bool {
	if ic.query_table.Next() {
		ic.query_result = ic.query_table.Record().Values()
		return true
	}
	return false
}

// Return the retrieved results
func (ic *InfluxClient) GetResult() (map[string]interface{}, error) {
	if err := ic.query_table.Err(); err != nil {
		return nil, err
	} else {
		return ic.query_result, nil
	}
}

Reference