Skip to content

gRPC Services

capiscio-core exposes gRPC services for validation, scoring, and badge operations. The gRPC server is automatically started and managed by the SDK when using the Python or Node.js SDKs.

No Manual Server Start Required

There is no capiscio rpc CLI command. The SDKs include a process manager that automatically spawns the capiscio-core binary and connects via gRPC. You don't need to start the server manually.

Overview

The gRPC API provides four core services:

Service Description
ScoringService Validate and score agent cards
BadgeService Issue, verify, and manage trust badges
MCPService MCP tool access control and server identity (RFC-006/007)
ValidationService Schema validation for agent cards
SimpleGuardService Authority envelope operations (RFC-008)

Connecting to the Server

Python SDK

from capiscio_sdk._rpc.client import CapiscioRPCClient

# Connect to local server
client = CapiscioRPCClient(address='localhost:50051', auto_start=False)

# Or auto-start embedded server
client = CapiscioRPCClient()  # Starts server automatically

Go

import (
    pb "github.com/capiscio/capiscio-core/pkg/rpc/gen/capiscio/v1"
    "google.golang.org/grpc"
)

conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
if err != nil {
    log.Fatal(err)
}
defer conn.Close()

scoringClient := pb.NewScoringServiceClient(conn)
badgeClient := pb.NewBadgeServiceClient(conn)

ScoringService

Validates agent cards and generates trust scores.

ScoreAgentCard

Score an agent card and get detailed category breakdowns.

rpc ScoreAgentCard(ScoreAgentCardRequest) returns (ScoreAgentCardResponse);

Request:

response = client.score_agent_card(
    agent_card_json='{"name": "My Agent", ...}',
    rule_set_id='default',  # Optional
)

Response:

{
  "result": {
    "overall_score": 0.85,
    "rating": "RATING_GOOD",
    "categories": [
      {
        "category": "SCORE_CATEGORY_COMPLIANCE",
        "score": 0.95,
        "rules_passed": 18,
        "rules_failed": 2
      },
      {
        "category": "SCORE_CATEGORY_SECURITY",
        "score": 0.75,
        "rules_passed": 8,
        "rules_failed": 4
      }
    ]
  }
}

Score Categories

Category Description
SCORE_CATEGORY_IDENTITY Agent identity and DID validation
SCORE_CATEGORY_CAPABILITIES Capability declarations
SCORE_CATEGORY_SECURITY Security practices and authentication
SCORE_CATEGORY_COMPLIANCE A2A protocol compliance
SCORE_CATEGORY_TRANSPARENCY Documentation and provider info

BadgeService

Issue and verify trust badges per RFC-002.

SignBadge

Sign a new badge with a private key.

rpc SignBadge(SignBadgeRequest) returns (SignBadgeResponse);

Request:

response = client.sign_badge(
    claims={
        "sub": "did:key:z6Mkf5rGMoatrSj1f4CyvuHBeXJELe9RPdzo2PKGNCKVtZxP",
        "domain": "my-agent.example.com",
        "trust_level": 1,  # TRUST_LEVEL_DV
    },
    private_key_jwk='{"kty": "OKP", "crv": "Ed25519", ...}',
    key_id="key-1",
)
print(response.token)  # Signed JWT

VerifyBadge

Verify a badge signature.

rpc VerifyBadge(VerifyBadgeRequest) returns (VerifyBadgeResponse);

Request:

response = client.verify_badge(
    token="eyJhbGciOiJFZERTQSIsInR5cCI6IkpXVCJ9...",
    public_key_jwk='{"kty": "OKP", ...}',  # Optional if JWKS URL used
)

if response.valid:
    print(f"Subject: {response.claims.sub}")
    print(f"Trust Level: {response.claims.trust_level}")
else:
    print(f"Invalid: {response.error_message}")

VerifyBadgeWithOptions

Full verification with online checks.

rpc VerifyBadgeWithOptions(VerifyBadgeWithOptionsRequest) returns (VerifyBadgeResponse);

Options:

response = client.verify_badge_with_options(
    token="eyJhbGciOiJFZERTQSI...",
    options={
        "mode": "VERIFY_MODE_ONLINE",
        "trusted_issuers": ["https://registry.capisc.io"],
        "audience": "https://my-service.example.com",
        "accept_self_signed": False,  # Reject Level 0 in production
    },
)

RequestBadge

Request a badge from a Certificate Authority (CA).

rpc RequestBadge(RequestBadgeRequest) returns (RequestBadgeResponse);

Request:

response = client.request_badge(
    agent_id="550e8400-e29b-41d4-a716-446655440000",
    ca_url="https://registry.capisc.io",
    api_key="cpsc_live_xxx",
    domain="my-agent.example.com",
    trust_level=2,  # TRUST_LEVEL_OV
    ttl_seconds=300,
)

if response.success:
    print(f"Badge: {response.token}")
    print(f"Expires: {response.expires_at}")

StartKeeper

Start a background daemon that auto-renews badges.

rpc StartKeeper(StartKeeperRequest) returns (stream KeeperEvent);

Request:

# Stream keeper events
for event in client.start_keeper(
    mode="KEEPER_MODE_CA",
    agent_id="550e8400...",
    ca_url="https://registry.capisc.io",
    api_key="cpsc_live_xxx",
    output_file="./badge.jwt",
    ttl_seconds=300,
    renew_before_seconds=60,
):
    if event.type == "KEEPER_EVENT_RENEWED":
        print(f"Badge renewed: {event.badge_jti}")
    elif event.type == "KEEPER_EVENT_ERROR":
        print(f"Error: {event.error}")

MCPService

Model Context Protocol security enforcement implementing RFC-006 (Tool Authority) and RFC-007 (Server Identity).

EvaluateToolAccess

Evaluate whether a caller is authorized to invoke an MCP tool.

rpc EvaluateToolAccess(EvaluateToolAccessRequest) returns (EvaluateToolAccessResponse);

Request:

result = client.mcp.evaluate_tool_access(
    tool_name="write_file",
    params_hash="sha256:abc123...",
    server_origin="https://files.example.com",
    badge_jws=caller_badge,
    min_trust_level=2,  # Require OV
    trusted_issuers=["https://registry.capisc.io"],
)

Response:

{
  "decision": "allow",
  "agent_did": "did:web:example.com:agents:alice",
  "auth_level": "badge",
  "trust_level": 2,
  "evidence_id": "ev_abc123",
  "timestamp": "2025-01-15T12:00:00Z"
}

VerifyServerIdentity

Verify an MCP server's identity before trusting responses.

rpc VerifyServerIdentity(VerifyServerIdentityRequest) returns (VerifyServerIdentityResponse);

Request:

result = client.mcp.verify_server_identity(
    server_did="did:web:files.example.com:mcp:files",
    server_badge=server_badge_token,
    transport_origin="https://files.example.com",
    min_trust_level=1,
)

Response:

{
  "state": "verified_principal",
  "trust_level": 2,
  "server_did": "did:web:files.example.com:mcp:files",
  "badge_jti": "badge_xyz789"
}

EvaluatePolicyDecision

Evaluate an authorization decision via the PDP integration (RFC-005). The server-side PEP calls this RPC to enforce policy decisions from an external PDP.

rpc EvaluatePolicyDecision(PolicyDecisionRequest) returns (PolicyDecisionResponse);

Request:

Field Type Description
subject PolicySubject Badge-derived identity (did, badge_jti, ial, trust_level, badge_exp)
action PolicyAction Operation being attempted (operation, capability_class)
resource PolicyResource Target resource (identifier)
config PolicyConfig PEP/PDP configuration (pdp_endpoint, pdp_timeout_ms, enforcement_mode, pep_id, workspace, breakglass_public_key)
breakglass_token string Optional break-glass JWS for emergency override

Response:

Field Type Description
decision string ALLOW, DENY, or ALLOW_OBSERVE
decision_id string Unique evaluation ID
reason string Human-readable reason
ttl int32 Cache TTL in seconds
obligations MCPObligation[] Obligations to enforce (type, params_json)
enforcement_mode string Mode that was applied
cache_hit bool Whether the decision came from cache
breakglass_override bool Whether a break-glass override was applied
breakglass_jti string Break-glass token JTI (if override)
error_code string pdp_unavailable, pdp_timeout, pdp_invalid_response, or empty
pdp_latency_ms int64 PDP query latency
txn_id string Transaction ID

Error Handling

This RPC does not return gRPC errors for PDP unavailability. All outcomes (including PDP failures) are encoded in the response fields so SDKs can handle them uniformly.


ParseServerIdentity

Extract server identity from HTTP headers or JSON-RPC _meta.

rpc ParseServerIdentity(ParseServerIdentityRequest) returns (ParseServerIdentityResponse);

HTTP Headers:

identity = client.mcp.parse_server_identity_http(
    capiscio_server_did=response.headers.get("Capiscio-Server-DID", ""),
    capiscio_server_badge=response.headers.get("Capiscio-Server-Badge", ""),
)

JSON-RPC _meta:

identity = client.mcp.parse_server_identity_jsonrpc(
    meta_json=json.dumps(response.get("_meta", {}))
)

Server States

State Description Action
verified_principal DID verified, badge valid, origin matches Trust responses
declared_principal DID provided but not fully verified Prompt user
unverified_origin Origin doesn't match DID Reject or warn

Deny Reasons

Reason Description
badge_missing No credential provided
badge_invalid Signature verification failed
badge_expired Badge has expired
trust_insufficient Trust level below minimum
tool_not_allowed Tool not in allowlist
issuer_untrusted Badge issuer not trusted

SimpleGuardService — Authority Envelopes

Create and verify delegated authority chains per RFC-008.

CreateEnvelope

Create a root authority envelope delegating authority to another agent.

rpc CreateEnvelope(CreateEnvelopeRequest) returns (CreateEnvelopeResponse);

Request:

Field Type Required Description
key_id string Yes Key ID for signing
subject_did string Yes DID of the agent receiving authority
capability_class string Yes Dot-notation capability (e.g., tools.database.read)
delegation_depth_remaining int32 Yes How many further delegations allowed
issuer_badge_jti string No JTI of the issuer's badge
txn_id string No Transaction ID (auto-generated if empty)
expires_in_seconds int64 No TTL from now (default: 3600)
constraints_json string No JSON-serialized constraints object
subject_badge_jti string No JTI of the subject's badge
enforcement_mode_min string No Minimum enforcement mode

Response:

Field Type Description
envelope_jws string JWS Compact Serialization of the signed envelope
error_message string Error description (empty on success)

DeriveEnvelope

Derive a child authority envelope from a parent, with hash linking and narrowing validation.

rpc DeriveEnvelope(DeriveEnvelopeRequest) returns (DeriveEnvelopeResponse);

Request:

Field Type Required Description
key_id string Yes Key ID for signing
parent_envelope_jws string Yes Parent envelope JWS to derive from
subject_did string Yes DID of the next delegate
capability_class string Yes Must be equal or narrower than parent's
delegation_depth_remaining int32 Yes Must be less than parent's depth
issuer_badge_jti string No JTI of the child issuer's own badge
expires_in_seconds int64 No TTL from now (default: 1800)
constraints_json string No Must be equal or more restrictive than parent's
subject_badge_jti string No JTI of the subject's badge
enforcement_mode_min string No Cannot relax parent's mode

Response:

Field Type Description
envelope_jws string JWS Compact Serialization of the signed child envelope
error_message string Error description (empty on success)

Narrowing Violations

DeriveEnvelope returns an error if the child violates monotonic narrowing rules (capability, depth, constraints, or enforcement mode).

BuildTransportHeaders

Encode a delegation chain into HTTP transport headers for use in requests.

rpc BuildTransportHeaders(BuildTransportHeadersRequest) returns (BuildTransportHeadersResponse);

Request:

Field Type Required Description
chain string[] Yes Ordered list of envelope JWS strings [root, ..., leaf]
badge_map_json string No JSON object mapping DID → badge JWS for intermediate agents

Response:

Field Type Description
authority string Value for X-Capiscio-Authority header (leaf JWS)
authority_chain string Value for X-Capiscio-Authority-Chain header (base64url JSON array)
badge_map string Value for X-Capiscio-Badge-Map header (JSON object)
error_message string Error description (empty on success)

VerifyEnvelopeChain

Verify the integrity of an authority envelope chain.

rpc VerifyEnvelopeChain(VerifyEnvelopeChainRequest) returns (VerifyEnvelopeChainResponse);

Request:

Field Type Required Description
chain string[] Yes Ordered list of envelope JWS strings to verify
badge_map_json string No JSON object mapping DID → badge JWS for badge-issuer lookups
enforcement_mode string No Enforcement mode to apply during verification

Response:

Field Type Description
valid bool Whether the chain is valid
leaf_capability string Capability class of the leaf envelope
leaf_subject string Subject DID of the leaf envelope
chain_depth int32 Number of envelopes in the chain
error_message string Error description (empty when valid)
error_code string Machine-readable error code

Error Codes:

Code Description
ENVELOPE_SIGNATURE_INVALID Envelope signature verification failed
ENVELOPE_EXPIRED Envelope has expired
ENVELOPE_CHAIN_BROKEN Hash chain integrity failure
ENVELOPE_NARROWING_VIOLATION Child is wider than parent
ENVELOPE_DEPTH_EXCEEDED Delegation depth remaining is negative
ENVELOPE_CHAIN_TOO_DEEP Chain exceeds maximum allowed depth

Trust Levels

The TrustLevel enum maps to badge trust levels (RFC-002 §5):

Enum Value Level Name Description
TRUST_LEVEL_UNSPECIFIED - - Not specified
TRUST_LEVEL_SS 0 Self-Signed (SS) Self-signed (did:key), iss = sub
TRUST_LEVEL_REG 1 Registered (REG) Account registration with CapiscIO CA
TRUST_LEVEL_DV 2 Domain Validated (DV) DNS TXT or HTTP challenge
TRUST_LEVEL_OV 3 Organization Validated (OV) DV + legal entity verification
TRUST_LEVEL_EV 4 Extended Validated (EV) OV + manual security audit

Proto Files

The protobuf definitions are located at:

capiscio-core/proto/capiscio/v1/
├── badge.proto      # BadgeService
├── mcp.proto        # MCPService (RFC-006/007)
├── scoring.proto    # ScoringService
├── simpleguard.proto # SimpleGuardService (RFC-008)
├── common.proto     # Shared types
├── did.proto        # DID operations
├── registry.proto   # Registry operations
├── revocation.proto # Badge revocation
└── trust.proto      # Trust model types

Generating Client Code

# Install protoc and plugins
brew install protobuf
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

# Generate Go code
cd capiscio-core/proto
buf generate

Error Handling

gRPC errors use standard status codes:

Code Description
OK Success
INVALID_ARGUMENT Bad request parameters
NOT_FOUND Resource not found
UNAUTHENTICATED Missing or invalid credentials
PERMISSION_DENIED Insufficient permissions
INTERNAL Server error

Python example:

from grpc import RpcError, StatusCode

try:
    response = client.verify_badge(token=invalid_token)
except RpcError as e:
    if e.code() == StatusCode.INVALID_ARGUMENT:
        print(f"Invalid token: {e.details()}")
    elif e.code() == StatusCode.UNAUTHENTICATED:
        print("Authentication required")

See Also