Home   About Me   Blog  

Store application logs in timescaleDB/postgres.(23 November 2018)

  1. Intro
  2. Opinion
  3. timescaleDB & PostgreSQL
  4. Our application
  5. Querying
  6. Conclusion
Intro
If the title alone has already turned you off, I understand. It's almost unheard of, these days, to store application logs in a structured datastore.
Whenever people talk about logs and their logging pipeline, it usually involves ELK(Elasticsearch), MongoDB or such other NoSQL databases.

And no such talk is complete without a tale of how people have to stay up at night feeding their Elasticsearch monstrous JVM with the blood of unblemished year-old sheep(or whatever JVM's feed on).

And it does make a lot of sense to store logs in an unstructured database, because after all, logs are unstructured by nature. Are they?
In this blogpost, I'll try & make a case for storing logs in an SQL database. I'll then go ahead and implement a proto-type logging pipeline using PostgreSQL/timescaleDB.

Opinion
I hold a differing opinion, I think if you look at logs hard enough; a structure starts to emerge.
In this section, I'll lay out my opinion about logs and logging in general.
This are my opinions and I hold no brief for anybody. I'm not going to try and convince you to adopt my worldview, but atleast to consider it.

opinion 1: logs are actually structured.
If you look at an example HTTP log of the popular tcp/http load balancer, haproxy:

Mar 9 15:08:05 LB1 local0.info haproxy[21843]: 10.0.0.1:1028 [09/Mar/2012:15:08:05.179] FT BK/SRV 0/0/1/8/9 304 12 - - ”GET / HTTP/1.1”
                
You can see a lot of structure in it, there's; timestamp, client IP address, backend name, response time, status code etc
And all this things will be available in every log event; that's a lot of structure.

opinion 2: You should annotate your logs with more structured information to make them much more useful.
Take the haproxy log above, it would be much more useful if we added metadata like; opinion 3: logs should be primarily used to help debug issues in production.

opinion 4: logs should only be persisted for a short period of time(~7days).
This is a corollary of opinion 3 above. If we buy into the argument that logs are for debugging purposes, then we can also argue that if an issue occurs in production, you ought to debug it asap. Which means, you only need to store logs for a max of 7days.
What you do with the logs after 7days(7 is just a rough estimate) is upto you; you can send them to /dev/null, or AWS s3 or some other cold storage but they should not be lying around in your primary log store.

opinion 5: you should not lose logs, but it should not be a big deal if you do.

opinion 6: you should err on the side of logging more meta/data than you think you'll need.

opinion 7: logs can be time-series data.
Take a deep breathe; time-series data at this point in time in our industry is a loaded term that means different things to different people.
The definition of what time-series data is that I like asscociating myself with is; data that collectively represents how a system/process/behavior changes over time. - taken from the blogpost; What the heck is time-series data (go read it)

ie, I'm making the case that, the following log events are time-series data:


time                          | application_name |application_version | environment_name |     log_event      | trace_id |            file_path        |  host_ip   |      data
------------------------------+------------------+--------------------+------------------+--------------------+----------+-----------------------------+------------+------------------------------------------------------------------------
2018-11-18 14:07:18.936522+00 | ETL_app          | v0.5.0             | production       | video_process_job3 | caf72697 | /usr/src/app/code/etl.py    | 172.23.0.2 | {"job_id": "658d31dd85fd", "jobType": "batch"}
2018-11-18 14:14:58.223893+00 | ETL_app          | v0.6.0             | canary           | video_process_job3 | 17603a0  | /usr/src/app/code/etl.py    | 172.23.0.3 | {"error": "Traceback (most recent call last):\n  File \"/usr/src/app/code/etl.py\", line 129, in job3\n    ourJobs[5]\nIndexError: list index out of range\n", "job_id": "c1a164c2-86c5-43e6-a0e5-3ca2377abe95", "jobType": "batch"}
2018-11-18 14:09:09.581655+00 | WebApp           | v0.5.0             | production       | login              | 45a7dc73 | /usr/src/app/code/web.py    | 172.23.0.2 | {"user": "Shawn Corey Carter", "email": "someemail@email.com", "status_code": 504, "response_time": 95}
2018-11-18 14:09:09.580918+00 | WebApp           | v0.5.0             | production       | login              | 0f776af0 | /usr/src/app/code/web.py    | 172.23.0.2 | {"user": "Shawn Corey Carter", "email": "someemail@email.com", "status_code": 504, "response_time": 6}
2018-11-18 14:09:10.552307+00 | WorkerApp        | v1.6.8             | staging          | process_work       | 6d07fb95 | /usr/src/app/code/worker.py | 172.23.0.2 | {"worker_id": "97b44537", "datacenter": "us-west"}
2018-11-18 14:09:10.551532+00 | WorkerApp        | v0.5.6             | production       | process_work       | 8b2daa49 | /usr/src/app/code/woker.py  | 172.23.0.2 | {"worker_id": "a6035461, "datacenter": "us-west"}
                
you can(and probably should) fashion your logs as time-series events.

When we consider all that, we see that an SQL database is not that bad of an idea as a medium for storing logs.
And on top of that you gain a lot of other added advantages: timescaleDB/postgres
PostgreSQL is a popular SQL datatabse.
TimescaleDB, despite the name, is actually a postgres extension that enhances postgres for time-series data.
    side note: it is possible to use PostgreSQL to store time-series data without having to install the timescaleDB extension.
In this blogpost, however, we are going to use postgres + timescaleDB extension for storing logs.

You can go through the TimescaleDB documentation to learn how to set it up.
But assuming you already have PostgreSQL installed, the following bash script should get you going;

#!/usr/bin/env bash

create_extension() {
    psql -U "${POSTGRES_USER}" "${POSTGRES_DB}" -c "CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;"   
}

create_table() {
    psql -U "${POSTGRES_USER}" "${POSTGRES_DB}" -c "CREATE TABLE logs (
    time                   TIMESTAMPTZ       NOT NULL,
    application_name       TEXT              NOT NULL,
    application_version    TEXT              NOT NULL,
    environment_name       TEXT              NOT NULL,
    log_event              TEXT              NOT NULL,
    trace_id               TEXT              NOT NULL,
    file_path              TEXT              NOT NULL,
    host_ip                TEXT              NOT NULL,
    data                   JSONB             NULL,
    PRIMARY KEY(time, trace_id)
    );"  
}

create_table_indices() {
    # read:
    # 1. https://www.postgresql.org/docs/11/indexes.html
    # 2. https://blog.timescale.com/use-composite-indexes-to-speed-up-time-series-queries-sql-8ca2df6b3aaa
    # 3. https://docs.timescale.com/v1.0/using-timescaledb/schema-management#indexing-best-practices

    psql -U "${POSTGRES_USER}" "${POSTGRES_DB}" -c "CREATE INDEX idxgin ON logs USING GIN (data);"
    psql -U "${POSTGRES_USER}" "${POSTGRES_DB}" -c "CREATE INDEX ON logs (log_event, trace_id, time DESC) WHERE log_event IS NOT NULL AND trace_id IS NOT NULL;"
}

create_hypertable() {
    psql -U "${POSTGRES_USER}" "${POSTGRES_DB}" -c "SELECT create_hypertable('logs', 'time');"
}

# 1. create timescaledb extension
# 2. create database table
# 3. create indices
# 4. create  hypertable
create_extension
create_table
create_table_indices
create_hypertable
                
That creates the timescaleDB exension in postgres, creates a table, indices and a timescaledb hypertable

Our application
In our application, we want our log events to have a few mandatory items; time, application_name, application_version, environment_name, log_event, trace_id, host_ip etc
This will allow us to be able to ask more meaningful questions when debugging our app/service, we can ask questions like; That's why we created a database table with those fields in the bash script above. We also added a JSONB field so that we can be able to store any other fields that are not mandatory in all log events eg errors, http status code etc

Our app will write logs with the said structure to a linux named pipe, then there will be another agent/daemon running continuously that reads from the same pipe and sends the logs to timescaleDB.

So, the application code to write logs to a named pipe would look like the following:

import os
import json
import uuid
import errno
import random
import asyncio
import datetime

import get_ip


def makeFifo(fifo_directory="/tmp/namedPipes", fifo_file="komusNamedPipe"):
    fifo_file = os.path.join(fifo_directory, fifo_file)
    try:
        os.mkdir(fifo_directory, mode=0o777)
    except OSError as e:
        if e.errno == 17:
            pass
        else:
            raise e
    try:
        os.mkfifo(fifo_file, mode=0o777)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise e
    return fifo_file


def log_structure(log_event, trace_id, application_name, environment_name, file_path, data):
    now = datetime.datetime.now(datetime.timezone.utc)
    return {
        "log_event": log_event,
        "trace_id": trace_id,
        "application_name": application_name,
        "application_version": "v2.1.0",
        "environment_name": environment_name,
        "file_path": file_path,
        "data": data,
        "time": str(now),
        "host_ip": get_ip(),
    }


async def emmit_logs(
    log_event, trace_id, application_name, environment_name, file_path, log_event_data
):
    try:
        pipe = os.open(makeFifo(), os.O_WRONLY | os.O_NONBLOCK | os.O_ASYNC)
        log = log_structure(
            log_event=log_event,
            trace_id=trace_id,
            application_name=application_name,
            environment_name=environment_name,
            file_path=file_path,
            data=log_event_data,
        )
        # we use newline to demarcate where one log event ends.
        write_data = json.dumps(log) + "\n"
        write_data = write_data.encode()
        os.write(pipe, write_data)
        os.close(pipe)
    except OSError as e:
        if e.errno == 6:
            pass
        else:
            pass
    finally:
        await asyncio.sleep(1)

async def web_app(app_name):
    while True:
        await emmit_logs(
            log_event="login",
            trace_id=str(uuid.uuid4()),
            application_name=app_name,
            environment_name=random.choice(["production", "canary", "staging"]),
            file_path=os.path.realpath(__file__),
            log_event_data={
                "user": "Shawn Corey Carter",
                "age": 48,
                "email": "someemail@email.com",
                "status_code": random.choice([200, 202, 307, 400, 404, 500, 504]),
                "response_time": random.randint(1, 110),
            },
        )

loop = asyncio.get_event_loop()
loop.run_until_complete(web_app(app_name="web_app"))
loop.close()
            

Now we need to write code for the agent that will read code from the named pipe and send it to timescaleDB/postgres.
This will be in two parts, in part one we read from the named pipe and buffer the logs in memory, the second part takes whatever is buffered in memory and sends it to timescaleDB.
In the following code, we read from the named pipe and buffer log events in memory:

import os
import json
import random
import asyncio
import asyncpg
import datetime

loop = asyncio.get_event_loop()

class Buffer:
    def __init__(self, loop, interval=6):
        self.loop = loop
        self.interval = interval
        self.lock = asyncio.Semaphore(value=1, loop=self.loop)
        self.buf = []

    def send_logs_every(self):
        jitter = random.randint(1, 9) * 0.1
        return self.interval + jitter


bufferedLogs = Buffer(loop=loop)


async def send_log_to_remote_storage(logs):
    # todo: to be implemented later
    pass

async def schedule_log_sending():
    # todo: to be implemented later
    pass


class PIPE:
    fifo_file = None

    def __enter__(self):
        self.fifo_file = open("/tmp/namedPipes/komusNamedPipe", mode="r")
        os.set_blocking(self.fifo_file.fileno(), False)
        return self.fifo_file

    def __exit__(self, type, value, traceback):
        if self.fifo_file:
            self.fifo_file.close()

    async def __aenter__(self):
        self.fifo_file = open("/tmp/namedPipes/komusNamedPipe", mode="r")
        os.set_blocking(self.fifo_file.fileno(), False)
        return await asyncio.sleep(-1, result=self.fifo_file)

    async def __aexit__(self, exc_type, exc, tb):
        if self.fifo_file:
            await asyncio.sleep(-1, result=self.fifo_file.close())

async def collect_logs():
    async with PIPE() as pipe:
        while True:
            try:
                data = pipe.readline()
                if len(data) == 0:
                    await asyncio.sleep(1)
                    continue
                log = json.loads(data)
                if log:
                    # buffer log events in memory
                    async with bufferedLogs.lock:
                        bufferedLogs.buf.append(log)
            except OSError as e:
                if e.errno == 6:
                    pass
                else:
                    pass

tasks = asyncio.gather(collect_logs(), schedule_log_sending(), loop=loop)
loop.run_until_complete(tasks)
loop.close()         
                

In the following code, we now take log events from the in memory buffer every X seconds and send them to timescaleDB/postgres:

async def send_log_to_remote_storage(logs):
    IN_DOCKER = os.environ.get("IN_DOCKER")
    try:
        host = "localhost" # replace with your postgres server IP address
        if IN_DOCKER:
            host = "timescale_db"
        conn = await asyncpg.connect(
            host=host,
            port=5432,
            user="myuser",
            password="hey_NSA",
            database="mydb",
            timeout=6.0,
            command_timeout=8.0,
        )

        all_logs = []
        for i in logs:
            time = datetime.datetime.strptime(i["time"], "%Y-%m-%d %H:%M:%S.%f%z")
            application_name = i["application_name"]
            application_version = i["application_version"]
            environment_name = i["environment_name"]
            log_event = i["log_event"]
            trace_id = i["trace_id"]
            file_path = i["file_path"]
            host_ip = i["host_ip"]
            data = i.get("data")
            if data:
                data = json.dumps(data)

            all_logs.append(
                (
                    time,
                    application_name,
                    environment_name,
                    application_version,
                    log_event,
                    trace_id,
                    file_path,
                    host_ip,
                    data,
                )
            )

        # batch insert
        await conn.executemany(
            """
            INSERT INTO logs(time, application_name, application_version, environment_name, log_event, trace_id, file_path, host_ip, data)
                      VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9)
            """,
            all_logs,
            timeout=8.0,
        )
        print("sent")
        await conn.close()
    except Exception:
        pass


async def schedule_log_sending():
    while True:
        async with bufferedLogs.lock:
            if len(bufferedLogs.buf) > 0:
                await send_log_to_remote_storage(logs=bufferedLogs.buf)
                bufferedLogs.buf = []
        await asyncio.sleep(bufferedLogs.send_logs_every())    
                

Querying
To badly misquote The O'Jays, now that we have logs in our database, what are we going to do with it?

Let's run some queries:
1. what are the latest five events where an exception/error has been raised?

SELECT
    log_event,
    data -> 'error' AS error
FROM
    logs
WHERE
    data -> 'error' IS NOT NULL
ORDER BY
    time DESC
LIMIT 5;
                

log_event          |                                                                            error
-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------
video_process_job3 | "Traceback (most recent call last):\n  File \"/usr/src/app/code/log_emmiter.py\", line 133, in job3\n    ourJobs[5]\nIndexError: list index out of range\n"
video_process_job3 | "Traceback (most recent call last):\n  File \"/usr/src/app/code/log_emmiter.py\", line 133, in job3\n    ourJobs[5]\nIndexError: list index out of range\n"
video_process_job3 | "Traceback (most recent call last):\n  File \"/usr/src/app/code/log_emmiter.py\", line 133, in job3\n    ourJobs[5]\nIndexError: list index out of range\n"
video_process_job3 | "Traceback (most recent call last):\n  File \"/usr/src/app/code/log_emmiter.py\", line 133, in job3\n    ourJobs[5]\nIndexError: list index out of range\n"
video_process_job3 | "Traceback (most recent call last):\n  File \"/usr/src/app/code/log_emmiter.py\", line 133, in job3\n    ourJobs[5]\nIndexError: list index out of range\n"
                

2. what is the path/trace taken by the last request which resulted in an exception occuring?

SELECT
    log_event,
    trace_id,
    file_path,
    data -> 'error' AS error
FROM
    logs
WHERE
    logs.environment_name = 'production'
    AND logs.trace_id = (
        SELECT
            trace_id
        FROM
            logs
        WHERE
            data -> 'error' IS NOT NULL
        ORDER BY
            time DESC
        LIMIT 1);
                

log_event          | trace_id |            file_path             |                                                                            error
-------------------+----------+----------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------
video_process_job1 | f552320e | /usr/src/app/code/log_emmiter.py |
video_process_job2 | f552320e | /usr/src/app/code/log_emmiter.py |
video_process_job3 | f552320e | /usr/src/app/code/log_emmiter.py |
video_process_job3 | f552320e | /usr/src/app/code/log_emmiter.py | "Traceback (most recent call last):\n  File \"/usr/src/app/code/log_emmiter.py\", line 133, in job3\n    ourJobs[5]\nIndexError: list index out of range\n"
                

3. what are the log events in which the web application has returned a http 5XX status code in production?

SELECT
    log_event,
    trace_id,
    host_ip,
    data -> 'status_code' AS status_code
FROM
    logs
WHERE
    logs.environment_name = 'production'
    AND logs.application_name = 'web_app'
    AND data -> 'status_code' IN ('500', '504');
                

log_event | trace_id |  host_ip   | status_code
----------+----------+------------+-------------
login     | aa9951dc | 172.24.0.4 | 504
login     | 63898b0d | 172.24.0.4 | 500
login     | 6154aa3e | 172.24.0.4 | 504
login     | 053a9820 | 172.24.0.4 | 504
login     | 29e6644c | 172.24.0.4 | 504
                
All those instances were login events, maybe we should go and have a look at our login code.
But why are all of them emanating from the same server with IP adress 172.24.0.4? Maybe the bug is a combination of the login code and that particular server and not necessarily a fault of the login code alone.

4. what is the average, 75th and 99th percentile response time(in milliseconds) of the web application in our canary servers?


SELECT
    percentile_disc(0.5)
    WITHIN GROUP (ORDER BY data -> 'response_time') AS average,
    percentile_disc(0.75)
    WITHIN GROUP (ORDER BY data -> 'response_time') AS seven_five_percentile,
    percentile_disc(0.99)
    WITHIN GROUP (ORDER BY data -> 'response_time') AS nine_nine_percentile
FROM
    logs
WHERE
    logs.environment_name = 'canary'
    AND logs.application_name = 'web_app'
    AND data -> 'status_code' IS NOT NULL
    AND data -> 'response_time' IS NOT NULL;
                

average | seven_five_percentile | nine_nine_percentile
--------+-----------------------+----------------------
43      | 86                    | 89
                

5. which application version introduced the most errors/exceptions?

SELECT
    application_version,
    COUNT(data -> 'error') AS error_count
FROM
    logs
WHERE
    data -> 'error' IS NOT NULL
GROUP BY
    application_version
ORDER BY
    error_count DESC;
            

application_version | error_count
--------------------+-------------
v3.5.0              |           5
v3.4.0              |           1
v2.3.0              |           1
v1.7.0              |           1
            
Maybe we should roll-back the deployment of version v3.5.0 as we figure out why it has introduced so many regressions.

Conclusion
All this is fine and dandy, but does the solution scale?
This is a hard question to answer conclusively. What I can encourage you to do, is to run your own benchmarks.
You could also look at literature of people who have ran some benchmarks.
But ultimately, it comes down to your own individual application and set of circumstances. Run your benchmarks and adopt the solution that seems to fit your app.

And maybe a NoSQL database is exactly what fits your needs. Who knows? Explore and find out.
If you are a small shop and are already using an SQL database for your other business processes, it might turn out that there's no need to introduce a new database for logs.

All the code in this blogpost can be found at: https://github.com/komuw/komu.engineer/tree/master/blogs/timeScaleDB