Luan Phung
Published on

Build Scalable Database Audit log

Authors
  • avatar
    Name
    Luan Phung
    Twitter

The story

A year ago, I made a system that keeps track of changes in our databases. It records important details like who made the change, when it happened, and what data got changed. This system is really helping our DEV team. It makes their life much easier to find and fix bugs in the system.

Audit log

Now, as we're moving to a new way of organizing our systems - Microservices and event-driven architecture, i realized that we need to keep track of changes in a lot of places. So, we urgently need to create a scalable system that can handle all these changes in our growing business.

The idea

Audit log I set up this service using a few simple steps:
  1. First, each server (node) sends an event when a record has been changed. The event includes information like the data before and after the change, who made the change, and when it happened. To avoid repeating code in every server, I created a simple module that we call in the DAO layer of our code. Maybe your will have a question like why don't i use trigger to implement the changes in database layer, well the most important thing i want to track is who changed the data, that is reason i don't use them, also you can use both trigger and store updated user in database which will work as well, but i just wanna take control of the my code.

  2. I set up a queue that subscribes to the SNS topic mentioned earlier. This queue collects all the data sent by the servers.

  3. Next, I wrote a small audit log handler to pull events from the SQS queue we set up in step 2. This handler server does a few things:

  • It compares the old data and new data to find only the different fields when changes occur.
  • If the record has changed, I store those changes in a database like MongoDB or Cassandra. These databases are great for handling really big data.
  • Make sure this handler can be easily dockerized, making it simpler to scale this module across multiple nodes.

Implementation

1. Setup AWS SNS

To Setup SNS-SQS in aws, just follow these easy steps:

  1. Create SNS, I highly recommend using SNS Standard Topics, because it has an unlimited number of messages per second. (Link)
  2. Create SQS (Link)
  3. Subscribing an Amazon SQS queue to an Amazon SNS topic (Link)

2. DAO (Data Access Object) implementation

Here is example of updateById method in daoBase.ts. Note that we will implement the same logic with all update/delete/create methods.

daoBase.ts
export class DaoBase<T> implements DaoBaseInterface<T> {
  protected tableName!: string;
  protected dbAuditLog: IDbAuditLog;
  ...

  async updateById(id: string, fields: any) {
    let beforeDoc = await this.findById(id)
    await this.updateOne({ id }, fields);
    let afterDoc = await this.findById(id)
    this.logDbEvent('update', beforeDoc, afterDoc)
    return afterDoc
  }

  private logDbEvent(operation: string, before: Document, after: Document) {
    this.dbAuditLog.publish({
      tableName: this.tableName,
      operation,
      before,
      after
    }).catch(e => console.log(e))
  }
  ...
}

Here is simple DbAudilog class implementation:

DbAuditLog.ts
interface DbAuditEvent {
  tableName: string;
  operation: string;
  before: Document;
  after: Document;
}
export class DbAuditLog implements IDbAuditLog {
  user: AuthUser
  snsEvent: ISnsEvent
  ...
  
  private async publish(data: DbAuditEvent) {
    try {
      const event = {
        ...data,
        by: this.user
      }
      //publish event to sns
      return await this.snsEvent?.publish(event);
    } catch (e) {
      console.error(e);
    }
  }
}

For publishing sns event, you can use @aws-sdk/client-sns package

3. Audit Log Consumer implement:

AuditLogConsumer.ts
export class AuditLogConsumer implements IAuditLogConsumer{
  async handleEvents(messages: Message[]) {
   //compare data before and after then update to your audit log database
  }

  start(sqsOptions) {
    try {
      const sqs = new SQSClient(sqsOptions)
      const app = Consumer.create({
        queueUrl: sqsOptions?.queueUrl,
        handleMessageBatch: this.handleEvents.bind(this),
        sqs: sqs,
        visibilityTimeout: 300,
        waitTimeSeconds: 3,
        batchSize: 10
      });
      app.on('error', (err, message) => {
        console.error('on app error', err)
      });
      app.start()
      return this
    } catch (e) {
      console.error('error when start sqs consumer ', e);
    }
  }
}

To implement sqs consumer, i use the package sqs-comsumer. All done, now we just need to dockerize the consumer server above using Docker and deploy it everywhere you like.

Conclusion

I hope you enjoy this post. Thank you so much.