Webhook Signatures
Kallglot signs all webhook events so you can verify they’re authentic. This prevents attackers from sending fake events to your endpoint.Signature Header Format
Each webhook request includes aKallglot-Signature header with the timestamp and signature:
Copy
Ask AI
Kallglot-Signature: t=1710072360,v1=5257a869e7ecebeda32affa62cdca3fa51cad7e77a0e56ff536d0ce8e108d8bd
| Component | Description |
|---|---|
t | Unix timestamp when the event was sent |
v1 | HMAC-SHA256 signature of the payload |
Verification Process
1. Parse the Signature Header
Extract the timestamp and signature from the header:Copy
Ask AI
# Ruby
parts = signature_header.split(',').map { |p| p.split('=', 2) }.to_h
timestamp = parts['t']
signature = parts['v1']
Copy
Ask AI
// Go
func parseSignature(header string) (timestamp, signature string) {
parts := make(map[string]string)
for _, part := range strings.Split(header, ",") {
kv := strings.SplitN(part, "=", 2)
if len(kv) == 2 {
parts[kv[0]] = kv[1]
}
}
return parts["t"], parts["v1"]
}
Copy
Ask AI
// JavaScript
const parts = signatureHeader.split(',').reduce((acc, part) => {
const [key, value] = part.split('=', 2);
acc[key] = value;
return acc;
}, {});
const timestamp = parts['t'];
const signature = parts['v1'];
Copy
Ask AI
# Python
parts = dict(p.split('=', 1) for p in signature_header.split(','))
timestamp = parts['t']
signature = parts['v1']
2. Prepare the Signed Payload
The signature is computed over:{timestamp}.{body}
Copy
Ask AI
signed_payload = "#{timestamp}.#{raw_body}"
Copy
Ask AI
signedPayload := fmt.Sprintf("%s.%s", timestamp, rawBody)
3. Compute Expected Signature
Copy
Ask AI
computed_sig = OpenSSL::HMAC.hexdigest('SHA256', webhook_secret, signed_payload)
Copy
Ask AI
mac := hmac.New(sha256.New, []byte(webhookSecret))
mac.Write([]byte(signedPayload))
computedSig := hex.EncodeToString(mac.Sum(nil))
4. Compare Signatures
Use a constant-time comparison to prevent timing attacks:Copy
Ask AI
ActiveSupport::SecurityUtils.secure_compare(computed_sig, signature)
Copy
Ask AI
hmac.Equal([]byte(computedSig), []byte(signature))
5. Check Timestamp
Reject events older than 5 minutes to prevent replay attacks:Copy
Ask AI
event_time = timestamp.to_i
current_time = Time.now.to_i
return false if (current_time - event_time).abs > 300
Copy
Ask AI
eventTime, _ := strconv.ParseInt(timestamp, 10, 64)
currentTime := time.Now().Unix()
if abs(currentTime-eventTime) > 300 {
return false
}
Complete Examples
Ruby / Rails
Copy
Ask AI
class WebhooksController < ApplicationController
skip_before_action :verify_authenticity_token
WEBHOOK_SECRET = ENV['KALLGLOT_WEBHOOK_SECRET']
TIMESTAMP_TOLERANCE = 300 # 5 minutes
def create
payload = request.raw_post
signature_header = request.headers['Kallglot-Signature']
unless verify_webhook(payload, signature_header, WEBHOOK_SECRET)
head :unauthorized
return
end
event = JSON.parse(payload)
Rails.logger.info "Received event: #{event['type']}"
case event['type']
when 'session.ended'
handle_session_ended(event['data'])
when 'transcript.ready'
handle_transcript_ready(event['data'])
when 'recording.ready'
handle_recording_ready(event['data'])
end
head :ok
end
private
def verify_webhook(payload, signature_header, secret)
return false if signature_header.blank?
# Parse signature header
parts = signature_header.split(',').map { |p| p.split('=', 2) }.to_h
timestamp = parts['t']
expected_sig = parts['v1']
return false if timestamp.blank? || expected_sig.blank?
# Check timestamp
event_time = timestamp.to_i
current_time = Time.now.to_i
return false if (current_time - event_time).abs > TIMESTAMP_TOLERANCE
# Compute and compare signature
signed_payload = "#{timestamp}.#{payload}"
computed_sig = OpenSSL::HMAC.hexdigest('SHA256', secret, signed_payload)
ActiveSupport::SecurityUtils.secure_compare(computed_sig, expected_sig)
end
end
Go
Copy
Ask AI
package main
import (
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"io"
"net/http"
"strconv"
"strings"
"time"
)
const (
webhookSecret = "whsec_your_secret"
timestampTolerance = 300 // 5 minutes
)
func webhookHandler(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "Failed to read body", http.StatusBadRequest)
return
}
signatureHeader := r.Header.Get("Kallglot-Signature")
if !verifyWebhook(body, signatureHeader, webhookSecret) {
http.Error(w, "Invalid signature", http.StatusUnauthorized)
return
}
var event map[string]interface{}
if err := json.Unmarshal(body, &event); err != nil {
http.Error(w, "Invalid JSON", http.StatusBadRequest)
return
}
eventType := event["type"].(string)
switch eventType {
case "session.ended":
handleSessionEnded(event["data"])
case "transcript.ready":
handleTranscriptReady(event["data"])
case "recording.ready":
handleRecordingReady(event["data"])
}
w.WriteHeader(http.StatusOK)
}
func verifyWebhook(payload []byte, signatureHeader, secret string) bool {
if signatureHeader == "" {
return false
}
// Parse signature header
parts := make(map[string]string)
for _, part := range strings.Split(signatureHeader, ",") {
kv := strings.SplitN(part, "=", 2)
if len(kv) == 2 {
parts[kv[0]] = kv[1]
}
}
timestamp := parts["t"]
expectedSig := parts["v1"]
if timestamp == "" || expectedSig == "" {
return false
}
// Check timestamp
eventTime, err := strconv.ParseInt(timestamp, 10, 64)
if err != nil {
return false
}
currentTime := time.Now().Unix()
if abs(currentTime-eventTime) > timestampTolerance {
return false
}
// Compute signature
signedPayload := timestamp + "." + string(payload)
mac := hmac.New(sha256.New, []byte(secret))
mac.Write([]byte(signedPayload))
computedSig := hex.EncodeToString(mac.Sum(nil))
return hmac.Equal([]byte(computedSig), []byte(expectedSig))
}
func abs(n int64) int64 {
if n < 0 {
return -n
}
return n
}
Node.js / Express
Copy
Ask AI
import express from 'express';
import crypto from 'crypto';
const app = express();
const WEBHOOK_SECRET = process.env.KALLGLOT_WEBHOOK_SECRET;
const TIMESTAMP_TOLERANCE = 300; // 5 minutes
function verifyWebhook(payload, signatureHeader, secret) {
if (!signatureHeader) return false;
// Parse signature header
const parts = signatureHeader.split(',').reduce((acc, part) => {
const [key, value] = part.split('=', 2);
acc[key] = value;
return acc;
}, {});
const timestamp = parts['t'];
const expectedSig = parts['v1'];
if (!timestamp || !expectedSig) return false;
// Check timestamp
const eventTime = parseInt(timestamp, 10);
const currentTime = Math.floor(Date.now() / 1000);
if (Math.abs(currentTime - eventTime) > TIMESTAMP_TOLERANCE) {
return false;
}
// Compute signature
const signedPayload = `${timestamp}.${payload}`;
const computedSig = crypto
.createHmac('sha256', secret)
.update(signedPayload)
.digest('hex');
// Constant-time comparison
try {
return crypto.timingSafeEqual(
Buffer.from(computedSig),
Buffer.from(expectedSig)
);
} catch {
return false;
}
}
app.post('/webhooks/kallglot',
express.raw({ type: 'application/json' }),
(req, res) => {
const signatureHeader = req.headers['kallglot-signature'];
if (!verifyWebhook(req.body.toString(), signatureHeader, WEBHOOK_SECRET)) {
console.error('Invalid webhook signature');
return res.status(401).send('Invalid signature');
}
const event = JSON.parse(req.body);
console.log('Received event:', event.type);
switch (event.type) {
case 'session.ended':
handleSessionEnded(event.data);
break;
case 'transcript.ready':
handleTranscriptReady(event.data);
break;
case 'recording.ready':
handleRecordingReady(event.data);
break;
}
res.status(200).send('OK');
}
);
Python / Flask
Copy
Ask AI
import hmac
import hashlib
import time
import os
from flask import Flask, request, abort
app = Flask(__name__)
WEBHOOK_SECRET = os.environ['KALLGLOT_WEBHOOK_SECRET']
TIMESTAMP_TOLERANCE = 300 # 5 minutes
def verify_webhook(payload, signature_header, secret):
if not signature_header:
return False
# Parse signature header
parts = dict(p.split('=', 1) for p in signature_header.split(','))
timestamp = parts.get('t')
expected_sig = parts.get('v1')
if not timestamp or not expected_sig:
return False
# Check timestamp
event_time = int(timestamp)
current_time = int(time.time())
if abs(current_time - event_time) > TIMESTAMP_TOLERANCE:
return False
# Compute signature
signed_payload = f"{timestamp}.{payload}"
computed_sig = hmac.new(
secret.encode(),
signed_payload.encode(),
hashlib.sha256
).hexdigest()
# Constant-time comparison
return hmac.compare_digest(computed_sig, expected_sig)
@app.route('/webhooks/kallglot', methods=['POST'])
def webhook():
payload = request.get_data(as_text=True)
signature_header = request.headers.get('Kallglot-Signature')
if not verify_webhook(payload, signature_header, WEBHOOK_SECRET):
abort(401, 'Invalid signature')
event = request.get_json()
print(f"Received event: {event['type']}")
if event['type'] == 'session.ended':
handle_session_ended(event['data'])
elif event['type'] == 'transcript.ready':
handle_transcript_ready(event['data'])
elif event['type'] == 'recording.ready':
handle_recording_ready(event['data'])
return 'OK', 200
Webhook Secret
Your webhook signing secret starts withwhsec_ and is displayed once when you create a webhook endpoint in the Developer Portal.
If you lose your secret:
- Go to Developer Portal > Webhooks
- Select your endpoint
- Click Roll Secret
- Update your application with the new secret
When you roll a secret, the old secret remains valid for 24 hours to allow for deployment. After that, only the new secret works.
Troubleshooting
”Invalid signature” errors
- Check the secret: Ensure you’re using the correct webhook secret (starts with
whsec_) - Check body parsing: The signature is computed on the raw body, not parsed JSON
- Check header parsing: Ensure you’re splitting by
,then by=
”Timestamp outside tolerance” errors
- Check server time: Ensure your server’s clock is synchronized (NTP)
- Check for delays: If processing takes too long, the timestamp may expire
Testing signatures locally
Copy
Ask AI
# Ruby - Generate a test signature
timestamp = Time.now.to_i.to_s
body = { type: 'test', data: {} }.to_json
signed_payload = "#{timestamp}.#{body}"
signature = OpenSSL::HMAC.hexdigest('SHA256', 'whsec_test', signed_payload)
puts "Kallglot-Signature: t=#{timestamp},v1=#{signature}"
Copy
Ask AI
// Go - Generate a test signature
timestamp := strconv.FormatInt(time.Now().Unix(), 10)
body := `{"type":"test","data":{}}`
signedPayload := timestamp + "." + body
mac := hmac.New(sha256.New, []byte("whsec_test"))
mac.Write([]byte(signedPayload))
signature := hex.EncodeToString(mac.Sum(nil))
fmt.Printf("Kallglot-Signature: t=%s,v1=%s\n", timestamp, signature)