Skip to main content

Webhook Signatures

Kallglot signs all webhook events so you can verify they’re authentic. This prevents attackers from sending fake events to your endpoint.

Signature Header Format

Each webhook request includes a Kallglot-Signature header with the timestamp and signature:
Kallglot-Signature: t=1710072360,v1=5257a869e7ecebeda32affa62cdca3fa51cad7e77a0e56ff536d0ce8e108d8bd
Component | Description
t | Unix timestamp (in seconds) when the event was sent
v1 | HMAC-SHA256 signature (hex-encoded) of the signed payload `{timestamp}.{body}`

Verification Process

1. Parse the Signature Header

Extract the timestamp and signature from the header:
# Ruby
# Split "t=<unix>,v1=<hex>" into key/value pairs. Elements without "=" are
# dropped first: Array#to_h raises ArgumentError on a one-element pair, so a
# malformed header would otherwise crash instead of yielding nil fields.
parts = signature_header.split(',')
                        .map { |p| p.split('=', 2) }
                        .select { |kv| kv.length == 2 }
                        .to_h
timestamp = parts['t']
signature = parts['v1']
// Go
// parseSignature extracts the "t" (timestamp) and "v1" (signature) fields
// from a header of the form "t=<unix>,v1=<hex>".
// Elements without "=" are ignored, a missing field yields an empty string,
// and when a key is repeated the last occurrence wins.
func parseSignature(header string) (timestamp, signature string) {
    for _, field := range strings.Split(header, ",") {
        sep := strings.Index(field, "=")
        if sep < 0 {
            continue
        }
        // Split on the first "=" only, so the value may itself contain "=".
        switch key, value := field[:sep], field[sep+1:]; key {
        case "t":
            timestamp = value
        case "v1":
            signature = value
        }
    }
    return timestamp, signature
}
// JavaScript
// Parse "t=<unix>,v1=<hex>" into its fields. Each element is split on the
// FIRST "=" only: in JavaScript, 'a=b=c'.split('=', 2) returns ['a', 'b'] and
// silently drops the rest, unlike the Ruby, Go and Python versions.
// A null-prototype object keeps header-supplied keys from colliding with
// Object.prototype members.
const parts = Object.create(null);
for (const part of signatureHeader.split(',')) {
  const sep = part.indexOf('=');
  if (sep === -1) {
    continue; // ignore malformed elements
  }
  parts[part.slice(0, sep).trim()] = part.slice(sep + 1).trim();
}
const timestamp = parts['t'];
const signature = parts['v1'];
# Python
# Build a dict from "t=<unix>,v1=<hex>". str.partition never raises, so
# elements without "=" are skipped instead of crashing dict() with ValueError,
# and .get() yields None (rather than KeyError) when a field is absent.
parts = {}
for item in signature_header.split(','):
    key, sep, value = item.partition('=')
    if sep:
        parts[key] = value
timestamp = parts.get('t')
signature = parts.get('v1')

2. Prepare the Signed Payload

The signature is computed over: {timestamp}.{body}
signed_payload = "#{timestamp}.#{raw_body}"
signedPayload := fmt.Sprintf("%s.%s", timestamp, rawBody)

3. Compute Expected Signature

computed_sig = OpenSSL::HMAC.hexdigest('SHA256', webhook_secret, signed_payload)
mac := hmac.New(sha256.New, []byte(webhookSecret))
mac.Write([]byte(signedPayload))
computedSig := hex.EncodeToString(mac.Sum(nil))

4. Compare Signatures

Use a constant-time comparison to prevent timing attacks:
ActiveSupport::SecurityUtils.secure_compare(computed_sig, signature)
hmac.Equal([]byte(computedSig), []byte(signature))

5. Check Timestamp

Reject events whose timestamp is more than 5 minutes away from the current time (in either direction) to prevent replay attacks:
event_time = timestamp.to_i
current_time = Time.now.to_i
return false if (current_time - event_time).abs > 300
// A timestamp that is not a base-10 integer is rejected outright instead of
// being silently treated as 0 (matches the complete Go example).
eventTime, err := strconv.ParseInt(timestamp, 10, 64)
if err != nil {
    return false
}
currentTime := time.Now().Unix()
// The check is symmetric: timestamps too far in the future are rejected too.
// abs is the int64 helper defined in the complete Go example.
if abs(currentTime-eventTime) > 300 {
    return false
}

Complete Examples

Ruby / Rails

# Receives Kallglot webhook events, verifies the Kallglot-Signature header,
# and dispatches verified events by type.
class WebhooksController < ApplicationController
  # Webhook requests are authenticated by their HMAC signature, so the CSRF
  # token check is skipped for this controller.
  skip_before_action :verify_authenticity_token

  WEBHOOK_SECRET = ENV['KALLGLOT_WEBHOOK_SECRET']
  TIMESTAMP_TOLERANCE = 300 # 5 minutes

  # Responds 401 when the signature check fails, 400 when a verified body is
  # not valid JSON, and 200 once the event has been dispatched.
  def create
    # The signature covers the exact bytes sent, so use the raw body.
    payload = request.raw_post
    signature_header = request.headers['Kallglot-Signature']

    unless verify_webhook(payload, signature_header, WEBHOOK_SECRET)
      head :unauthorized
      return
    end

    begin
      event = JSON.parse(payload)
    rescue JSON::ParserError
      head :bad_request
      return
    end
    Rails.logger.info "Received event: #{event['type']}"

    case event['type']
    when 'session.ended'
      handle_session_ended(event['data'])
    when 'transcript.ready'
      handle_transcript_ready(event['data'])
    when 'recording.ready'
      handle_recording_ready(event['data'])
    end

    head :ok
  end

  private

  # Returns true only when the header is well-formed, its timestamp is within
  # TIMESTAMP_TOLERANCE seconds of now, and its v1 value equals the
  # HMAC-SHA256 of "<timestamp>.<payload>" keyed with +secret+.
  # Never raises on malformed input; it returns false instead.
  def verify_webhook(payload, signature_header, secret)
    # Fail closed when the secret is missing: a nil key would raise inside
    # OpenSSL and an empty key would make signatures forgeable.
    return false if signature_header.blank? || secret.blank?

    # Parse signature header. Elements without "=" are dropped because
    # Array#to_h raises ArgumentError on a one-element pair.
    parts = signature_header.split(',')
                            .map { |p| p.split('=', 2) }
                            .select { |kv| kv.length == 2 }
                            .to_h
    timestamp = parts['t']
    expected_sig = parts['v1']

    return false if timestamp.blank? || expected_sig.blank?

    # Check timestamp (String#to_i yields 0 for non-numeric input, which is
    # then rejected by the tolerance check).
    event_time = timestamp.to_i
    current_time = Time.now.to_i
    return false if (current_time - event_time).abs > TIMESTAMP_TOLERANCE

    # Compute and compare signature in constant time
    signed_payload = "#{timestamp}.#{payload}"
    computed_sig = OpenSSL::HMAC.hexdigest('SHA256', secret, signed_payload)

    ActiveSupport::SecurityUtils.secure_compare(computed_sig, expected_sig)
  end
end

Go

package main

import (
    "crypto/hmac"
    "crypto/sha256"
    "encoding/hex"
    "encoding/json"
    "io"
    "net/http"
    "strconv"
    "strings"
    "time"
)

const (
    webhookSecret      = "whsec_your_secret"
    timestampTolerance = 300 // 5 minutes
)

// webhookHandler verifies and dispatches an incoming Kallglot webhook.
//
// It responds 400 when the body cannot be read, is not valid JSON, or has no
// string "type" field; 401 when signature verification fails; 200 otherwise.
func webhookHandler(w http.ResponseWriter, r *http.Request) {
    // Read the raw bytes: the signature is computed over the body exactly as
    // it was sent, not over re-serialized JSON.
    body, err := io.ReadAll(r.Body)
    if err != nil {
        http.Error(w, "Failed to read body", http.StatusBadRequest)
        return
    }

    signatureHeader := r.Header.Get("Kallglot-Signature")
    if !verifyWebhook(body, signatureHeader, webhookSecret) {
        http.Error(w, "Invalid signature", http.StatusUnauthorized)
        return
    }

    var event map[string]interface{}
    if err := json.Unmarshal(body, &event); err != nil {
        http.Error(w, "Invalid JSON", http.StatusBadRequest)
        return
    }

    // Use the comma-ok form: a bare event["type"].(string) panics when the
    // field is absent or is not a string.
    eventType, ok := event["type"].(string)
    if !ok {
        http.Error(w, "Missing event type", http.StatusBadRequest)
        return
    }

    switch eventType {
    case "session.ended":
        handleSessionEnded(event["data"])
    case "transcript.ready":
        handleTranscriptReady(event["data"])
    case "recording.ready":
        handleRecordingReady(event["data"])
    }

    w.WriteHeader(http.StatusOK)
}

// verifyWebhook reports whether signatureHeader ("t=<unix>,v1=<hex>") carries
// a valid signature for payload.
//
// It returns true only when both fields are present, the timestamp parses as
// a base-10 integer within timestampTolerance seconds of the current time,
// and v1 equals the hex HMAC-SHA256 of "<timestamp>.<payload>" keyed with
// secret.
func verifyWebhook(payload []byte, signatureHeader, secret string) bool {
    // Pull the two fields out of the header. An empty header yields no
    // fields and is rejected by the emptiness check below.
    var timestamp, expectedSig string
    for _, field := range strings.Split(signatureHeader, ",") {
        sep := strings.Index(field, "=")
        if sep < 0 {
            continue
        }
        switch field[:sep] {
        case "t":
            timestamp = field[sep+1:]
        case "v1":
            expectedSig = field[sep+1:]
        }
    }
    if timestamp == "" || expectedSig == "" {
        return false
    }

    // Reject timestamps that are malformed or too far from now (either way).
    sentAt, err := strconv.ParseInt(timestamp, 10, 64)
    if err != nil {
        return false
    }
    if drift := abs(time.Now().Unix() - sentAt); drift > timestampTolerance {
        return false
    }

    // HMAC over "<timestamp>.<payload>", written in pieces to avoid copying
    // the payload into a new string.
    mac := hmac.New(sha256.New, []byte(secret))
    mac.Write([]byte(timestamp))
    mac.Write([]byte{'.'})
    mac.Write(payload)
    want := hex.EncodeToString(mac.Sum(nil))

    // hmac.Equal compares without leaking timing information.
    return hmac.Equal([]byte(want), []byte(expectedSig))
}

// abs returns the absolute value of n.
func abs(n int64) int64 {
    if n >= 0 {
        return n
    }
    return -n
}

Node.js / Express

import express from 'express';
import crypto from 'crypto';

const app = express();
const WEBHOOK_SECRET = process.env.KALLGLOT_WEBHOOK_SECRET;
const TIMESTAMP_TOLERANCE = 300; // 5 minutes

/**
 * Verifies the signature of a Kallglot webhook request.
 *
 * @param {string|Buffer} payload - Raw request body, exactly as received.
 * @param {string|undefined} signatureHeader - Value of the Kallglot-Signature
 *   header, in the form "t=<unix>,v1=<hex>".
 * @param {string} secret - Webhook signing secret.
 * @returns {boolean} true only when the header is well-formed, the timestamp
 *   is a plain integer within TIMESTAMP_TOLERANCE seconds of now, and v1
 *   equals the hex HMAC-SHA256 of "<timestamp>.<payload>". Never throws on
 *   malformed input.
 */
function verifyWebhook(payload, signatureHeader, secret) {
  // Fail closed on a missing secret: createHmac throws on undefined, and an
  // empty key would make signatures forgeable.
  if (typeof signatureHeader !== 'string' || !secret) {
    return false;
  }

  // Parse signature header. Split on the first "=" only, because
  // 'a=b=c'.split('=', 2) would silently drop everything after "b".
  const parts = new Map();
  for (const part of signatureHeader.split(',')) {
    const sep = part.indexOf('=');
    if (sep !== -1) {
      parts.set(part.slice(0, sep).trim(), part.slice(sep + 1).trim());
    }
  }

  const timestamp = parts.get('t');
  const expectedSig = parts.get('v1');
  if (!timestamp || !expectedSig) {
    return false;
  }

  // Check timestamp. Require digits only: parseInt('abc') is NaN, and
  // Math.abs(NaN) > tolerance is false, which would skip this check.
  if (!/^\d+$/.test(timestamp)) {
    return false;
  }
  const eventTime = Number(timestamp);
  const currentTime = Math.floor(Date.now() / 1000);
  if (Math.abs(currentTime - eventTime) > TIMESTAMP_TOLERANCE) {
    return false;
  }

  // Compute signature. Buffers are hashed as raw bytes rather than being
  // decoded to a string first; any other type is stringified as before.
  const body =
    typeof payload === 'string' || Buffer.isBuffer(payload)
      ? payload
      : String(payload);
  const computedSig = crypto
    .createHmac('sha256', secret)
    .update(`${timestamp}.`)
    .update(body)
    .digest('hex');

  // Constant-time comparison. timingSafeEqual throws on unequal lengths, so
  // check the length first instead of relying on a catch-all.
  const computed = Buffer.from(computedSig, 'utf8');
  const expected = Buffer.from(expectedSig, 'utf8');
  return (
    computed.length === expected.length &&
    crypto.timingSafeEqual(computed, expected)
  );
}

// Kallglot webhook endpoint. express.raw() keeps the body as a Buffer so the
// signature is checked against the exact bytes that were signed.
// Responds 400 for a missing/unparseable body, 401 for a bad signature, and
// 200 once the event has been dispatched.
app.post('/webhooks/kallglot',
  express.raw({ type: 'application/json' }),
  (req, res) => {
    const signatureHeader = req.headers['kallglot-signature'];

    // express.raw() only produces a Buffer when the Content-Type matched;
    // otherwise req.body is not the raw payload and cannot be verified.
    if (!Buffer.isBuffer(req.body)) {
      console.error('Webhook body was not parsed as raw application/json');
      return res.status(400).send('Expected application/json body');
    }
    const rawBody = req.body.toString('utf8');

    if (!verifyWebhook(rawBody, signatureHeader, WEBHOOK_SECRET)) {
      console.error('Invalid webhook signature');
      return res.status(401).send('Invalid signature');
    }

    let event;
    try {
      event = JSON.parse(rawBody);
    } catch (err) {
      console.error('Invalid webhook JSON:', err.message);
      return res.status(400).send('Invalid JSON');
    }
    console.log('Received event:', event.type);

    switch (event.type) {
      case 'session.ended':
        handleSessionEnded(event.data);
        break;
      case 'transcript.ready':
        handleTranscriptReady(event.data);
        break;
      case 'recording.ready':
        handleRecordingReady(event.data);
        break;
    }

    res.status(200).send('OK');
  }
);

Python / Flask

import hmac
import hashlib
import time
import os
from flask import Flask, request, abort

app = Flask(__name__)
WEBHOOK_SECRET = os.environ['KALLGLOT_WEBHOOK_SECRET']
TIMESTAMP_TOLERANCE = 300  # 5 minutes

def verify_webhook(payload, signature_header, secret):
    """Return True if ``signature_header`` is a valid signature for ``payload``.

    Args:
        payload: Raw request body exactly as received, as ``str`` or ``bytes``.
        signature_header: Value of the Kallglot-Signature header, in the form
            ``t=<unix>,v1=<hex>``; may be ``None`` or empty.
        secret: Webhook signing secret (``str``).

    Returns:
        True only when both header fields are present, the timestamp is an
        integer within ``TIMESTAMP_TOLERANCE`` seconds of now, and ``v1``
        equals the hex HMAC-SHA256 of ``<timestamp>.<payload>``. Malformed
        input yields False rather than an exception.
    """
    # Fail closed on a missing secret: an empty key would be forgeable.
    if not signature_header or not secret:
        return False

    # Parse signature header. str.partition never raises, so elements
    # without "=" are skipped instead of crashing dict() with ValueError.
    parts = {}
    for item in signature_header.split(','):
        key, sep, value = item.partition('=')
        if sep:
            parts[key.strip()] = value.strip()
    timestamp = parts.get('t')
    expected_sig = parts.get('v1')

    if not timestamp or not expected_sig:
        return False

    # Check timestamp; a non-numeric value is a bad header, not a server error.
    try:
        event_time = int(timestamp)
    except ValueError:
        return False
    current_time = int(time.time())
    if abs(current_time - event_time) > TIMESTAMP_TOLERANCE:
        return False

    # Compute signature over the raw bytes. Formatting a bytes payload into
    # an f-string would sign its repr ("b'...'") instead of its content.
    body = payload if isinstance(payload, bytes) else str(payload).encode()
    computed_sig = hmac.new(
        secret.encode(),
        timestamp.encode() + b'.' + body,
        hashlib.sha256
    ).hexdigest()

    # Constant-time comparison. Compare bytes: compare_digest raises
    # TypeError when given a str containing non-ASCII characters.
    return hmac.compare_digest(computed_sig.encode(), expected_sig.encode())

@app.route('/webhooks/kallglot', methods=['POST'])
def webhook():
    """Handle a Kallglot webhook POST.

    Aborts with 401 when the Kallglot-Signature header does not verify against
    the raw request body; otherwise dispatches on the event's ``type`` and
    returns ``('OK', 200)``. Unrecognized event types are acknowledged and
    ignored.
    """
    # Verify against the raw text of the body, before any JSON parsing.
    raw_body = request.get_data(as_text=True)
    header_value = request.headers.get('Kallglot-Signature')
    is_authentic = verify_webhook(raw_body, header_value, WEBHOOK_SECRET)
    if not is_authentic:
        abort(401, 'Invalid signature')

    event = request.get_json()
    kind = event['type']
    print(f"Received event: {kind}")

    if kind == 'session.ended':
        handle_session_ended(event['data'])
    elif kind == 'transcript.ready':
        handle_transcript_ready(event['data'])
    elif kind == 'recording.ready':
        handle_recording_ready(event['data'])

    return 'OK', 200

Webhook Secret

Your webhook signing secret starts with whsec_ and is displayed once when you create a webhook endpoint in the Developer Portal. If you lose your secret:
  1. Go to Developer Portal > Webhooks
  2. Select your endpoint
  3. Click Roll Secret
  4. Update your application with the new secret
When you roll a secret, the old secret remains valid for 24 hours to allow for deployment. After that, only the new secret works.

Troubleshooting

“Invalid signature” errors

  1. Check the secret: Ensure you’re using the correct webhook secret (starts with whsec_)
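  2. Check body parsing: The signature is computed on the raw request body, not parsed JSON. Re-serializing parsed JSON can change whitespace or key order, so the result will not match what was signed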
  3. Check header parsing: Ensure you’re splitting by , then by =

“Timestamp outside tolerance” errors

  1. Check server time: Ensure your server’s clock is synchronized (NTP)
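  2. Check for delays: Verify the signature as soon as the request arrives. If verification is deferred (for example, the request is queued before being checked), the timestamp may fall outside the 5-minute tolerance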

Testing signatures locally

# Ruby - Generate a test signature
# Outside Rails these libraries are not loaded automatically.
require 'json'    # Hash#to_json
require 'openssl' # OpenSSL::HMAC

timestamp = Time.now.to_i.to_s
body = { type: 'test', data: {} }.to_json
# The signature covers "<timestamp>.<body>", so it is only valid for this
# exact body string.
signed_payload = "#{timestamp}.#{body}"
signature = OpenSSL::HMAC.hexdigest('SHA256', 'whsec_test', signed_payload)

puts "Kallglot-Signature: t=#{timestamp},v1=#{signature}"
# Send exactly these bytes as the request body for the signature to verify.
puts "Body: #{body}"
// Go - Generate a test signature
timestamp := strconv.FormatInt(time.Now().Unix(), 10)
body := `{"type":"test","data":{}}`
signedPayload := timestamp + "." + body

mac := hmac.New(sha256.New, []byte("whsec_test"))
mac.Write([]byte(signedPayload))
signature := hex.EncodeToString(mac.Sum(nil))

fmt.Printf("Kallglot-Signature: t=%s,v1=%s\n", timestamp, signature)