EmiliaVision Webhook Integration - Troubleshooting & Best Practices¶
Last Updated: November 2025
Version: 2.0
🔍 Common Issues & Solutions¶
Authentication Issues¶
Problem: 401 Unauthorized Error¶
Causes & Solutions:
- Incorrect Token
- Token in Wrong Place
- Inactive Token
- Contact team@emiliavision.com to verify token status
- Request token reactivation if needed
Payload Issues¶
Problem: 413 Payload Too Large¶
Solution: Reduce payload size
# Check payload size before sending
import json
import sys
payload = {...} # Your data
# Measure the UTF-8 encoded JSON body: that is the number of bytes actually
# sent over the wire. sys.getsizeof() would instead report the in-memory size
# of the Python str object, which is not the request size.
payload_size = len(json.dumps(payload).encode("utf-8"))
if payload_size > 10 * 1024 * 1024: # 10MB
    # Split into multiple webhooks or reduce data
    print(f"Payload too large: {payload_size / 1024 / 1024:.2f}MB")
Problem: Silent Failures (200 but no data in dashboard)¶
Common Causes:
- Missing Critical Fields
# Wrong - Missing table_number
payload = {
    "event": "session_complete",
    "order_id": "ORD-123",
    "timestamp": "2025-11-24T12:00:00-03:00"
}
# Correct - Include table_number (CRITICAL!)
payload = {
    "event": "session_complete",
    "order_id": "ORD-123",
    "table_number": "Mesa 8", # ✅ ESSENTIAL
    "timestamp": "2025-11-24T12:00:00-03:00"
}
- Wrong Environment
- Invalid JSON
Timing Issues¶
Problem: Incorrect Timestamps¶
Issue: Timestamps without timezone
# Wrong - No timezone
timestamp = "2025-11-24T18:30:00"
# Correct - Include timezone offset
timestamp = "2025-11-24T18:30:00-03:00" # São Paulo time
Issue: Using wrong timezone
from datetime import datetime, timezone
import pytz
# Wrong - Using UTC when restaurant is in São Paulo
timestamp = datetime.now(timezone.utc).isoformat()
# Correct - Use restaurant's local timezone
sp_tz = pytz.timezone('America/Sao_Paulo')
timestamp = datetime.now(sp_tz).isoformat()
Network Issues¶
Problem: Timeouts¶
Solution: Increase timeout and implement retry
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
def create_session_with_retries():
    """Build a requests.Session that automatically retries failed webhook POSTs.

    Returns:
        A ``requests.Session`` whose HTTP and HTTPS adapters retry up to three
        times (with exponential backoff) on connection/read errors and on
        500/502/503/504 responses.
    """
    session = requests.Session()
    retry = Retry(
        total=3,
        read=3,
        connect=3,
        backoff_factor=0.3,
        status_forcelist=(500, 502, 503, 504),
        # urllib3 only retries idempotent methods by default, which excludes
        # POST - without this the status_forcelist above is never applied to
        # webhook calls. Requires urllib3 >= 1.26 (older releases call this
        # parameter `method_whitelist`).
        # NOTE(review): a retried POST may be delivered twice; this assumes
        # the receiver de-duplicates events - confirm.
        allowed_methods=frozenset({"POST"}),
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session
# Use session with automatic retries
session = create_session_with_retries()
response = session.post(url, json=payload, timeout=30)
Problem: Connection Errors¶
Solution: Implement robust error handling
import time
import requests
def send_webhook_robust(url, payload, max_attempts=3):
    """Send webhook with comprehensive error handling.

    Retries only failures that can succeed on a later attempt (timeouts,
    connection errors, HTTP 429 and 5xx) using exponential backoff. Any other
    non-200 status is treated as permanent and is not retried, so a rejected
    request is never re-sent in a tight loop.

    Args:
        url: Full webhook URL (token included in the path).
        payload: JSON-serializable dict to send.
        max_attempts: Maximum number of delivery attempts.

    Returns:
        The parsed JSON response body on HTTP 200. If the 200 response body is
        not valid JSON, ``{"raw_response": <body text>}`` is returned instead
        (the webhook was delivered, so it must not be re-sent). ``None`` when
        delivery failed.
    """
    for attempt in range(max_attempts):
        wait_time = 2 ** attempt
        try:
            response = requests.post(
                url,
                json=payload,
                timeout=30
            )
        except requests.exceptions.Timeout:
            print(f"⏱️ Timeout on attempt {attempt + 1}")
        except requests.exceptions.ConnectionError as e:
            print(f"🔌 Connection error on attempt {attempt + 1}: {e}")
        except requests.exceptions.RequestException as e:
            print(f"❌ Unexpected error: {e}")
        except Exception as e:
            # Not a network problem (e.g. payload is not JSON-serializable):
            # retrying cannot help.
            print(f"❌ Unexpected error: {e}")
            return None
        else:
            status = response.status_code
            # Check response
            if status == 200:
                try:
                    body = response.json()
                except ValueError:
                    # Delivered, but the body is not JSON. Do NOT retry, or
                    # the same event would be sent twice.
                    body = {"raw_response": response.text}
                print(f"✅ Success: {body}")
                return body
            # Handle specific errors
            if status == 401:
                print("❌ Invalid token - not retrying")
                return None
            if status == 413:
                print("❌ Payload too large - not retrying")
                return None
            if status == 429:
                # Rate limited - wait longer; honor a numeric Retry-After
                # header when the server provides one (capped at 60s).
                retry_after = response.headers.get("Retry-After", "")
                if retry_after.isdigit():
                    wait_time = max(wait_time, int(retry_after))
                wait_time = min(60, wait_time)
                print(f"⏳ Rate limited - waiting {wait_time}s")
            elif 500 <= status < 600:
                # Server errors - retry
                print(f"⚠️ Server error {status} - retry in {wait_time}s")
            else:
                # Any other status (400, 403, 404, 422, ...) will not change
                # on a retry.
                print(f"❌ Unexpected status {status} - not retrying")
                return None
        # Back off before the next attempt, but never after the last one.
        if attempt < max_attempts - 1:
            time.sleep(wait_time)
    print(f"❌ Failed after {max_attempts} attempts")
    return None
✅ Best Practices¶
1. Data Quality¶
Always Include Timestamps for Items¶
# ❌ Bad - No timestamp for when ordered
{
"items": [
{"name": "Pasta", "price": 85.00}
]
}
# ✅ Good - Include when each item was ordered
{
"items": [
{
"timestamp": "2025-11-24T18:35:00-03:00",
"employee_id": 72,
"name": "Pasta",
"price": 85.00
}
]
}
Use Consistent Identifiers¶
# Pick one format and stick to it
table_formats = {
"good": ["Mesa 8", "Mesa 12", "Mesa 104"], # ✅ Consistent
"bad": ["Mesa 8", "Table 12", "M-104"] # ❌ Inconsistent
}
Include Employee Information¶
# The WHO is as important as the WHAT
{
"opened_by": {"id": 72, "name": "João Silva"},
"items": [
{
"employee_id": 72, # Who took this order
"timestamp": "2025-11-24T18:35:00-03:00"
}
],
"closed_by": {"id": 96, "name": "Maria Santos"}
}
2. Integration Architecture¶
Implement a Queue System¶
import queue
import threading
import time
class WebhookQueue:
    """In-memory queue that delivers webhooks from a background daemon thread.

    ``add()`` returns immediately; a single worker thread takes payloads off
    the queue and hands each one to ``_send_with_retry``.
    """

    def __init__(self, webhook_url):
        """Store the target URL and start the background worker thread."""
        self.url = webhook_url
        self.queue = queue.Queue()
        self.worker_thread = threading.Thread(target=self._worker)
        self.worker_thread.daemon = True
        self.worker_thread.start()

    def add(self, payload):
        """Add webhook to queue"""
        self.queue.put(payload)

    def _worker(self):
        """Background worker to send webhooks"""
        while True:
            try:
                payload = self.queue.get(timeout=1)
            except queue.Empty:
                continue
            try:
                self._send_with_retry(payload)
            except Exception as exc:
                # One bad payload must not kill the worker thread, otherwise
                # every later webhook would sit in the queue forever.
                print(f"❌ Webhook delivery failed: {exc}")
            finally:
                # Always mark the item done so queue.join() cannot hang.
                self.queue.task_done()

    def _send_with_retry(self, payload):
        """Send with retry logic"""
        # Implementation from above
        pass
# Usage
webhook_queue = WebhookQueue(webhook_url)
# Add to queue (non-blocking)
webhook_queue.add({
"event": "session_opened",
"table_number": "Mesa 8",
# ...
})
Log Everything¶
import json
import logging
from datetime import datetime

import requests

# Set up logging
logging.basicConfig(
    filename='webhook.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)


def send_webhook_with_logging(url, payload):
    """Send webhook with comprehensive logging.

    Logs the (truncated) outgoing payload, the response status/body and any
    failure, including the traceback for exceptions.

    Args:
        url: Full webhook URL.
        payload: JSON-serializable dict to send.

    Returns:
        The parsed JSON response on HTTP 200, otherwise ``None``. The function
        never raises: serialization, network and JSON-decoding errors are
        logged and reported as ``None``.
    """
    try:
        # Log attempt (inside the try so a non-serializable payload is logged
        # as a failure instead of escaping as an exception)
        logging.info("Sending webhook: %s", json.dumps(payload)[:200])
        response = requests.post(url, json=payload, timeout=30)
        # Log response
        logging.info("Response %s: %s", response.status_code, response.text[:200])
        if response.status_code != 200:
            logging.error("Failed with %s: %s", response.status_code, response.text)
            return None
        return response.json()
    except Exception as e:
        # logging.exception records the traceback along with the message.
        logging.exception("Exception: %s", e)
        return None
3. Performance Optimization¶
Batch When Possible¶
# Instead of sending 100 individual item updates
# Send them in logical batches
def batch_items_by_time(items, batch_window_seconds=60):
    """Group items ordered within same time window.

    Items are ordered chronologically; a batch starts at its first item and
    collects every following item whose timestamp is at most
    ``batch_window_seconds`` after that first item.

    Args:
        items: Iterable of dicts, each with an ISO 8601 ``'timestamp'`` string.
        batch_window_seconds: Maximum span of one batch, in seconds.

    Returns:
        A list of batches (lists of the original item dicts) in chronological
        order. Empty input yields an empty list.
    """
    # Local import keeps this snippet self-contained.
    from datetime import datetime

    # Sort on the parsed datetime rather than the raw string so that
    # timestamps with different UTC offsets are ordered correctly.
    timed_items = sorted(
        ((datetime.fromisoformat(item['timestamp']), item) for item in items),
        key=lambda pair: pair[0],
    )

    batches = []
    window_start = None
    for item_time, item in timed_items:
        # total_seconds() is required here: timedelta.seconds wraps around at
        # one day, so items ordered days apart would look "close".
        if (
            window_start is None
            or (item_time - window_start).total_seconds() > batch_window_seconds
        ):
            batches.append([item])
            window_start = item_time
        else:
            batches[-1].append(item)
    return batches
# Send batched updates
for batch in batch_items_by_time(all_items):
send_webhook({
"event": "items_added",
"items": batch
})
Use Connection Pooling¶
import requests
# Create a session for connection pooling; the context manager closes the
# pooled connections once all webhooks have been sent.
with requests.Session() as session:
    # Reuse the session for multiple requests
    for payload in webhooks_to_send:
        # Always set a timeout (at least 30 seconds is recommended) so a
        # stalled connection cannot block the whole batch.
        response = session.post(url, json=payload, timeout=30)
4. Monitoring & Alerting¶
Track Success Rate¶
class WebhookMonitor:
    """Running tally of webhook deliveries with a simple alert threshold."""

    def __init__(self):
        """Start with all counters at zero and no recorded error codes."""
        self.total_sent = 0
        self.successful = 0
        self.failed = 0
        self.errors = {}

    def record_success(self):
        """Count one delivered webhook."""
        self.successful += 1
        self.total_sent += 1

    def record_failure(self, error_code):
        """Count one failed webhook and tally it under its error code."""
        self.failed += 1
        self.total_sent += 1
        previous = self.errors.get(error_code, 0)
        self.errors[error_code] = previous + 1

    def get_stats(self):
        """Return the counters plus the success rate formatted as a percentage string."""
        if self.total_sent > 0:
            percent = self.successful / self.total_sent * 100
        else:
            percent = 0
        return {
            "total": self.total_sent,
            "successful": self.successful,
            "failed": self.failed,
            "success_rate": f"{percent:.2f}%",
            "errors": self.errors,
        }

    def should_alert(self):
        """Return True once more than 100 webhooks were sent and under 95% succeeded."""
        # Below this sample size the rate is not considered meaningful.
        if self.total_sent <= 100:
            return False
        return self.successful / self.total_sent < 0.95
# Usage
monitor = WebhookMonitor()
# Track results
if response.status_code == 200:
monitor.record_success()
else:
monitor.record_failure(response.status_code)
# Check if we should alert
if monitor.should_alert():
send_alert_email("Webhook success rate below 95%")
5. Testing Strategy¶
Use Test Environment First¶
class WebhookClient:
    """Small client that targets the dev environment first, then production.

    Attributes set by ``__init__``: ``token``, ``environment``, ``base_url``
    (production URL) and ``url`` (the URL currently in use).
    """

    def __init__(self, token, environment='dev'):
        """Build the webhook URL for *token*; ``'dev'`` appends ``?env=dev``."""
        self.token = token
        self.environment = environment
        self.base_url = f"https://api.emiliavision.com/webhooks/v1/pos/{token}"
        if environment == 'dev':
            self.url = f"{self.base_url}?env=dev"
        else:
            self.url = self.base_url

    def test_connection(self):
        """Test webhook connectivity.

        Returns:
            True when the current URL answers HTTP 200, False for any other
            status or when the request fails at the network level.
        """
        test_payload = {
            "event": "connection_test",
            # astimezone() attaches the local UTC offset, so the timestamp
            # carries a timezone.
            "timestamp": datetime.now().astimezone().isoformat(),
            "table_number": "Test Table"
        }
        try:
            response = requests.post(self.url, json=test_payload, timeout=30)
        except requests.exceptions.RequestException as exc:
            # A network failure is a failed test, not a crash.
            print(f"❌ Connection test error: {exc}")
            return False
        return response.status_code == 200

    def switch_to_production(self):
        """Switch from dev to production.

        The switch only happens when a connection test against the currently
        configured URL succeeds.

        Returns:
            True if the client now points at production, False otherwise.
        """
        if self.test_connection():
            self.environment = 'prod'
            self.url = self.base_url
            print("✅ Switched to production")
            return True
        else:
            print("❌ Connection test failed")
            return False
Create Test Data Generator¶
import random
from datetime import datetime, timedelta
def generate_test_session():
    """Generate realistic test data.

    Builds a ``session_complete`` payload for a two-hour session ending now,
    with 2-8 randomly generated items ordered 5-15 minutes apart.

    Returns:
        A dict with ``event``, ``order_id``, ``table_number``,
        ``session_start``, ``session_end``, ``items`` and ``totals``. All
        timestamps are ISO 8601 strings that include the local UTC offset, and
        all monetary totals are rounded to two decimals.
    """
    # astimezone() makes the timestamps timezone-aware (local offset).
    end_time = datetime.now().astimezone()
    start_time = end_time - timedelta(hours=2)
    items = []
    current_time = start_time
    # Generate realistic item ordering pattern
    for _ in range(random.randint(2, 8)):
        current_time += timedelta(minutes=random.randint(5, 15))
        items.append({
            "timestamp": current_time.isoformat(),
            "employee_id": random.choice([72, 96, 113]),
            "name": random.choice(["Pasta", "Pizza", "Salad", "Wine", "Dessert"]),
            "quantity": random.randint(1, 3),
            "price": round(random.uniform(10, 100), 2)
        })
    # Round after summing: float addition would otherwise leak artefacts such
    # as 123.45000000000002 into the payload.
    subtotal = round(sum(item["price"] * item["quantity"] for item in items), 2)
    tip = round(subtotal * 0.13, 2)
    tax = round(subtotal * 0.10, 2)
    return {
        "event": "session_complete",
        "order_id": f"TEST-{random.randint(1000, 9999)}",
        "table_number": f"Mesa {random.randint(1, 20)}",
        "session_start": start_time.isoformat(),
        "session_end": end_time.isoformat(),
        "items": items,
        "totals": {
            "subtotal": subtotal,
            "tip": tip,
            "tax": tax,
            "total": round(subtotal + tip + tax, 2)
        }
    }
# Generate and send test data
test_session = generate_test_session()
send_webhook(test_url, test_session)
📊 Debugging Checklist¶
When webhooks aren't working:
1. Check Basics¶
- Token is complete UUID (36 characters)
- Token is in URL path, not header
- Using HTTPS (not HTTP)
- Content-Type is `application/json`
2. Verify Payload¶
- Valid JSON structure
- `table_number` field present (CRITICAL!)
- Timestamps include timezone
- No null values in required fields
3. Test Environment¶
- Using `?env=dev` for testing
- Checking correct dashboard (dev vs prod)
- Allowing time for processing (5 minutes)
4. Network & Security¶
- No firewall blocking outbound HTTPS
- No proxy interfering
- SSL certificates valid
- Timeout set to at least 30 seconds
5. Response Handling¶
- Checking HTTP status code
- Reading response body for errors
- Logging all attempts
- Implementing retry logic
🔧 Diagnostic Script¶
Use this script to diagnose integration issues:
#!/usr/bin/env python3
"""
EmiliaVision Webhook Diagnostic Tool
"""
import requests
import json
import sys
from datetime import datetime
import time
def _looks_like_uuid(token):
    """Return True when *token* has the canonical 8-4-4-4-12 hexadecimal UUID layout."""
    hex_digits = set("0123456789abcdefABCDEF")
    groups = token.split('-')
    return (
        [len(group) for group in groups] == [8, 4, 4, 4, 12]
        and all(ch in hex_digits for group in groups for ch in group)
    )


def diagnose_webhook(token):
    """Run comprehensive webhook diagnostics.

    Prints a step-by-step report covering token format, API reachability, the
    dev and production endpoints, payload sizes and response times. Stops
    early when the token is malformed, the API is unreachable, the token is
    rejected (401) or the dev request fails.

    Args:
        token: Webhook token (UUID string) to test.

    Returns:
        None. All results are printed.
    """
    print("🔍 EmiliaVision Webhook Diagnostics")
    print("=" * 50)
    # Test 1: Token Format
    print("\n1. Checking token format...")
    # Checking only the length and dash count would accept strings such as
    # 36 characters with misplaced dashes or non-hex characters.
    if not _looks_like_uuid(token):
        print("❌ Token format invalid (should be UUID)")
        return
    print("✅ Token format valid")
    # Test 2: Network Connectivity
    print("\n2. Testing network connectivity...")
    try:
        response = requests.get("https://api.emiliavision.com/health", timeout=5)
        if response.status_code == 200:
            print("✅ API is reachable")
        else:
            print(f"⚠️ API returned status {response.status_code}")
    except Exception as e:
        print(f"❌ Cannot reach API: {e}")
        return
    # Test 3: Dev Environment
    print("\n3. Testing dev environment...")
    dev_url = f"https://api.emiliavision.com/webhooks/v1/pos/{token}?env=dev"
    test_payload = {
        "event": "diagnostic_test",
        # astimezone() attaches the local UTC offset to the timestamp.
        "timestamp": datetime.now().astimezone().isoformat(),
        "table_number": "Diagnostic Test",
        "diagnostic": True
    }
    try:
        response = requests.post(
            dev_url,
            json=test_payload,
            timeout=10,
            headers={"Content-Type": "application/json"}
        )
        print(f" Status Code: {response.status_code}")
        print(f" Response: {response.text[:200]}")
        if response.status_code == 200:
            print("✅ Dev environment working")
            try:
                event_id = response.json().get("event_id")
            except (ValueError, AttributeError):
                # A 200 whose body is not a JSON object is still a successful
                # delivery; it must not abort the remaining diagnostics.
                event_id = None
            print(f" Event ID: {event_id}")
        elif response.status_code == 401:
            print("❌ Token is invalid or inactive")
            return
        else:
            print(f"❌ Unexpected response: {response.status_code}")
    except Exception as e:
        print(f"❌ Request failed: {e}")
        return
    # Test 4: Production Environment
    print("\n4. Testing production environment...")
    prod_url = f"https://api.emiliavision.com/webhooks/v1/pos/{token}"
    try:
        response = requests.post(
            prod_url,
            json=test_payload,
            timeout=10,
            headers={"Content-Type": "application/json"}
        )
        if response.status_code == 200:
            print("✅ Production environment working")
        else:
            print(f"⚠️ Production returned: {response.status_code}")
    except Exception as e:
        print(f"❌ Production request failed: {e}")
    # Test 5: Payload Size
    print("\n5. Testing payload sizes...")
    # Small payload
    small_payload = {
        "event": "test",
        "table_number": "Mesa 1",
        "timestamp": datetime.now().astimezone().isoformat()
    }
    # Medium payload with items
    medium_payload = {
        "event": "session_complete",
        "table_number": "Mesa 1",
        "timestamp": datetime.now().astimezone().isoformat(),
        "items": [{"name": f"Item {i}", "price": 10.00} for i in range(100)]
    }
    for label, size_payload in (
        ("Small payload (< 1KB)", small_payload),
        ("Medium payload (~10KB)", medium_payload),
    ):
        try:
            response = requests.post(dev_url, json=size_payload, timeout=30)
        except Exception as e:
            # Report the failure and keep going instead of crashing the tool.
            print(f"❌ {label}: request failed: {e}")
            continue
        if response.status_code == 200:
            print(f"✅ {label}: OK")
        else:
            print(f"⚠️ {label}: returned {response.status_code}")
    # Test 6: Response Time
    print("\n6. Measuring response times...")
    times = []
    for i in range(3):
        start = time.perf_counter()
        try:
            requests.post(dev_url, json=test_payload, timeout=30)
        except Exception as e:
            print(f" Request {i+1}: failed ({e})")
            continue
        elapsed = (time.perf_counter() - start) * 1000
        times.append(elapsed)
        print(f" Request {i+1}: {elapsed:.0f}ms")
    if times:
        avg_time = sum(times) / len(times)
        print(f" Average: {avg_time:.0f}ms")
        if avg_time < 500:
            print("✅ Excellent response time")
        elif avg_time < 1000:
            print("✅ Good response time")
        else:
            print("⚠️ Slow response time")
    else:
        # Every timing request failed; there is nothing to average.
        print("⚠️ No successful requests to measure")
    print("\n" + "=" * 50)
    print("✅ Diagnostics complete!")
    print("\nNext steps:")
    print("1. If all tests passed, you're ready for production")
    print("2. If token is invalid, request a new one")
    print("3. If network issues, check firewall/proxy settings")
    print("4. Contact team@emiliavision.com if problems persist")


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: python diagnose.py YOUR_TOKEN")
        sys.exit(1)
    diagnose_webhook(sys.argv[1])
📞 Getting Help¶
Support Channels¶
| Issue Type | Contact | Response Time |
|---|---|---|
| Token Issues | team@emiliavision.com | < 24 hours |
| Technical Problems | team@emiliavision.com | < 4 hours |
| Urgent Issues | team@emiliavision.com (mark URGENT) | < 2 hours |
Information to Include¶
When contacting support, include:
- Your webhook token (first 8 characters only)
- Error messages (complete response)
- Sample payload that's failing
- Timestamp of failed attempt
- Environment (dev or prod)
- POS system name and version
Sample Support Request¶
Subject: Webhook Integration Issue - 401 Error
Token: 019ab6ed-xxxx (first 8 chars)
Environment: Development
POS System: CustomPOS v3.2
Error: 401 Unauthorized
Timestamp: 2025-11-24T18:30:00-03:00
Sample payload:
{
"event": "session_complete",
"table_number": "Mesa 8",
...
}
Error response:
{
"detail": "Invalid or inactive webhook token"
}
We've verified the token matches what was provided.
Please assist.
Remember: Most issues are related to missing table_number field or incorrect token placement. Always check these first!