Documentation Index
Fetch the complete documentation index at: https://developer.kallglot.com/llms.txt
Use this file to discover all available pages before exploring further.
Webhook Signatures
Kallglot signs all webhook events so you can verify they’re authentic. This prevents attackers from sending fake events to your endpoint.Signature Header Format
Each webhook request includes aKallglot-Signature header with the timestamp and signature:
Kallglot-Signature: t=1710072360,v1=5257a869e7ecebeda32affa62cdca3fa51cad7e77a0e56ff536d0ce8e108d8bd
| Component | Description |
|---|---|
t | Unix timestamp when the event was sent |
v1 | HMAC-SHA256 signature of the payload |
Verification Process
1. Parse the Signature Header
Extract the timestamp and signature from the header:# Ruby
parts = signature_header.split(',').map { |p| p.split('=', 2) }.to_h
timestamp = parts['t']
signature = parts['v1']
// Go
func parseSignature(header string) (timestamp, signature string) {
parts := make(map[string]string)
for _, part := range strings.Split(header, ",") {
kv := strings.SplitN(part, "=", 2)
if len(kv) == 2 {
parts[kv[0]] = kv[1]
}
}
return parts["t"], parts["v1"]
}
// JavaScript
const parts = signatureHeader.split(',').reduce((acc, part) => {
const [key, value] = part.split('=', 2);
acc[key] = value;
return acc;
}, {});
const timestamp = parts['t'];
const signature = parts['v1'];
# Python
parts = dict(p.split('=', 1) for p in signature_header.split(','))
timestamp = parts['t']
signature = parts['v1']
2. Prepare the Signed Payload
The signature is computed over:{timestamp}.{body}
signed_payload = "#{timestamp}.#{raw_body}"
signedPayload := fmt.Sprintf("%s.%s", timestamp, rawBody)
3. Compute Expected Signature
computed_sig = OpenSSL::HMAC.hexdigest('SHA256', webhook_secret, signed_payload)
mac := hmac.New(sha256.New, []byte(webhookSecret))
mac.Write([]byte(signedPayload))
computedSig := hex.EncodeToString(mac.Sum(nil))
4. Compare Signatures
Use a constant-time comparison to prevent timing attacks:# Pure Ruby (2.5+)
OpenSSL.secure_compare(computed_sig, signature)
# ActiveSupport (Rails app)
ActiveSupport::SecurityUtils.secure_compare(computed_sig, signature)
hmac.Equal([]byte(computedSig), []byte(signature))
5. Check Timestamp
Reject events older than 5 minutes to prevent replay attacks:event_time = timestamp.to_i
current_time = Time.now.to_i
return false if (current_time - event_time).abs > 300
eventTime, _ := strconv.ParseInt(timestamp, 10, 64)
currentTime := time.Now().Unix()
if abs(currentTime-eventTime) > 300 {
return false
}
Complete Examples
Ruby (Pure)
require 'json'
require 'openssl'
require 'webrick'
WEBHOOK_SECRET = ENV['KALLGLOT_WEBHOOK_SECRET']
TIMESTAMP_TOLERANCE = 300 # 5 minutes
def verify_webhook(payload, signature_header, secret)
return false if signature_header.nil? || signature_header.empty?
# Parse signature header
parts = signature_header.split(',').map { |p| p.split('=', 2) }.to_h
timestamp = parts['t']
expected_sig = parts['v1']
return false if timestamp.nil? || expected_sig.nil?
# Check timestamp
event_time = timestamp.to_i
current_time = Time.now.to_i
return false if (current_time - event_time).abs > TIMESTAMP_TOLERANCE
# Compute and compare signature
signed_payload = "#{timestamp}.#{payload}"
computed_sig = OpenSSL::HMAC.hexdigest('SHA256', secret, signed_payload)
# Constant-time comparison (Ruby 2.5+)
OpenSSL.secure_compare(computed_sig, expected_sig)
end
# Example with WEBrick
server = WEBrick::HTTPServer.new(Port: 3000)
server.mount_proc '/webhooks/kallglot' do |req, res|
signature_header = req['Kallglot-Signature']
unless verify_webhook(req.body, signature_header, WEBHOOK_SECRET)
res.status = 401
res.body = 'Invalid signature'
next
end
event = JSON.parse(req.body)
puts "Received event: #{event['type']}"
case event['type']
when 'session.ended'
handle_session_ended(event['data'])
when 'transcript.ready'
handle_transcript_ready(event['data'])
when 'recording.ready'
handle_recording_ready(event['data'])
end
res.status = 200
res.body = 'OK'
end
Ruby on Rails (receiver example)
class WebhooksController < ApplicationController
skip_before_action :verify_authenticity_token
WEBHOOK_SECRET = ENV['KALLGLOT_WEBHOOK_SECRET']
TIMESTAMP_TOLERANCE = 300 # 5 minutes
def create
payload = request.raw_post
signature_header = request.headers['Kallglot-Signature']
unless verify_webhook(payload, signature_header, WEBHOOK_SECRET)
head :unauthorized
return
end
event = JSON.parse(payload)
Rails.logger.info "Received event: #{event['type']}"
case event['type']
when 'session.ended'
handle_session_ended(event['data'])
when 'transcript.ready'
handle_transcript_ready(event['data'])
when 'recording.ready'
handle_recording_ready(event['data'])
end
head :ok
end
private
def verify_webhook(payload, signature_header, secret)
return false if signature_header.blank?
# Parse signature header
parts = signature_header.split(',').map { |p| p.split('=', 2) }.to_h
timestamp = parts['t']
expected_sig = parts['v1']
return false if timestamp.blank? || expected_sig.blank?
# Check timestamp
event_time = timestamp.to_i
current_time = Time.now.to_i
return false if (current_time - event_time).abs > TIMESTAMP_TOLERANCE
# Compute and compare signature
signed_payload = "#{timestamp}.#{payload}"
computed_sig = OpenSSL::HMAC.hexdigest('SHA256', secret, signed_payload)
ActiveSupport::SecurityUtils.secure_compare(computed_sig, expected_sig)
end
end
Go
package main
import (
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"io"
"net/http"
"strconv"
"strings"
"time"
)
const (
webhookSecret = "whsec_your_secret"
timestampTolerance = 300 // 5 minutes
)
func webhookHandler(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "Failed to read body", http.StatusBadRequest)
return
}
signatureHeader := r.Header.Get("Kallglot-Signature")
if !verifyWebhook(body, signatureHeader, webhookSecret) {
http.Error(w, "Invalid signature", http.StatusUnauthorized)
return
}
var event map[string]interface{}
if err := json.Unmarshal(body, &event); err != nil {
http.Error(w, "Invalid JSON", http.StatusBadRequest)
return
}
eventType := event["type"].(string)
switch eventType {
case "session.ended":
handleSessionEnded(event["data"])
case "transcript.ready":
handleTranscriptReady(event["data"])
case "recording.ready":
handleRecordingReady(event["data"])
}
w.WriteHeader(http.StatusOK)
}
func verifyWebhook(payload []byte, signatureHeader, secret string) bool {
if signatureHeader == "" {
return false
}
// Parse signature header
parts := make(map[string]string)
for _, part := range strings.Split(signatureHeader, ",") {
kv := strings.SplitN(part, "=", 2)
if len(kv) == 2 {
parts[kv[0]] = kv[1]
}
}
timestamp := parts["t"]
expectedSig := parts["v1"]
if timestamp == "" || expectedSig == "" {
return false
}
// Check timestamp
eventTime, err := strconv.ParseInt(timestamp, 10, 64)
if err != nil {
return false
}
currentTime := time.Now().Unix()
if abs(currentTime-eventTime) > timestampTolerance {
return false
}
// Compute signature
signedPayload := timestamp + "." + string(payload)
mac := hmac.New(sha256.New, []byte(secret))
mac.Write([]byte(signedPayload))
computedSig := hex.EncodeToString(mac.Sum(nil))
return hmac.Equal([]byte(computedSig), []byte(expectedSig))
}
func abs(n int64) int64 {
if n < 0 {
return -n
}
return n
}
Node.js / Express
import express from 'express';
import crypto from 'crypto';
const app = express();
const WEBHOOK_SECRET = process.env.KALLGLOT_WEBHOOK_SECRET;
const TIMESTAMP_TOLERANCE = 300; // 5 minutes
function verifyWebhook(payload, signatureHeader, secret) {
if (!signatureHeader) return false;
// Parse signature header
const parts = signatureHeader.split(',').reduce((acc, part) => {
const [key, value] = part.split('=', 2);
acc[key] = value;
return acc;
}, {});
const timestamp = parts['t'];
const expectedSig = parts['v1'];
if (!timestamp || !expectedSig) return false;
// Check timestamp
const eventTime = parseInt(timestamp, 10);
const currentTime = Math.floor(Date.now() / 1000);
if (Math.abs(currentTime - eventTime) > TIMESTAMP_TOLERANCE) {
return false;
}
// Compute signature
const signedPayload = `${timestamp}.${payload}`;
const computedSig = crypto
.createHmac('sha256', secret)
.update(signedPayload)
.digest('hex');
// Constant-time comparison
try {
return crypto.timingSafeEqual(
Buffer.from(computedSig),
Buffer.from(expectedSig)
);
} catch {
return false;
}
}
app.post('/webhooks/kallglot',
express.raw({ type: 'application/json' }),
(req, res) => {
const signatureHeader = req.headers['kallglot-signature'];
if (!verifyWebhook(req.body.toString(), signatureHeader, WEBHOOK_SECRET)) {
console.error('Invalid webhook signature');
return res.status(401).send('Invalid signature');
}
const event = JSON.parse(req.body);
console.log('Received event:', event.type);
switch (event.type) {
case 'session.ended':
handleSessionEnded(event.data);
break;
case 'transcript.ready':
handleTranscriptReady(event.data);
break;
case 'recording.ready':
handleRecordingReady(event.data);
break;
}
res.status(200).send('OK');
}
);
Python / Flask
import hmac
import hashlib
import time
import os
from flask import Flask, request, abort
app = Flask(__name__)
WEBHOOK_SECRET = os.environ['KALLGLOT_WEBHOOK_SECRET']
TIMESTAMP_TOLERANCE = 300 # 5 minutes
def verify_webhook(payload, signature_header, secret):
if not signature_header:
return False
# Parse signature header
parts = dict(p.split('=', 1) for p in signature_header.split(','))
timestamp = parts.get('t')
expected_sig = parts.get('v1')
if not timestamp or not expected_sig:
return False
# Check timestamp
event_time = int(timestamp)
current_time = int(time.time())
if abs(current_time - event_time) > TIMESTAMP_TOLERANCE:
return False
# Compute signature
signed_payload = f"{timestamp}.{payload}"
computed_sig = hmac.new(
secret.encode(),
signed_payload.encode(),
hashlib.sha256
).hexdigest()
# Constant-time comparison
return hmac.compare_digest(computed_sig, expected_sig)
@app.route('/webhooks/kallglot', methods=['POST'])
def webhook():
payload = request.get_data(as_text=True)
signature_header = request.headers.get('Kallglot-Signature')
if not verify_webhook(payload, signature_header, WEBHOOK_SECRET):
abort(401, 'Invalid signature')
event = request.get_json()
print(f"Received event: {event['type']}")
if event['type'] == 'session.ended':
handle_session_ended(event['data'])
elif event['type'] == 'transcript.ready':
handle_transcript_ready(event['data'])
elif event['type'] == 'recording.ready':
handle_recording_ready(event['data'])
return 'OK', 200
Webhook Secret
Your webhook signing secret starts withwhsec_ and is displayed once when you create a webhook endpoint in the Developer Portal.
If you lose your secret:
- Go to Developer Portal > Webhooks
- Select your endpoint
- Click Roll Secret
- Update your application with the new secret
When you roll a secret, the old secret remains valid for 24 hours to allow for deployment. After that, only the new secret works.
Troubleshooting
”Invalid signature” errors
- Check the secret: Ensure you’re using the correct webhook secret (starts with
whsec_) - Check body parsing: The signature is computed on the raw body, not parsed JSON
- Check header parsing: Ensure you’re splitting by
,then by=
”Timestamp outside tolerance” errors
- Check server time: Ensure your server’s clock is synchronized (NTP)
- Check for delays: If processing takes too long, the timestamp may expire
Testing signatures locally
# Ruby - Generate a test signature
timestamp = Time.now.to_i.to_s
body = { type: 'test', data: {} }.to_json
signed_payload = "#{timestamp}.#{body}"
signature = OpenSSL::HMAC.hexdigest('SHA256', 'whsec_test', signed_payload)
puts "Kallglot-Signature: t=#{timestamp},v1=#{signature}"
// Go - Generate a test signature
timestamp := strconv.FormatInt(time.Now().Unix(), 10)
body := `{"type":"test","data":{}}`
signedPayload := timestamp + "." + body
mac := hmac.New(sha256.New, []byte("whsec_test"))
mac.Write([]byte(signedPayload))
signature := hex.EncodeToString(mac.Sum(nil))
fmt.Printf("Kallglot-Signature: t=%s,v1=%s\n", timestamp, signature)