Pub/Sub

Lightweight, dependency-free fake of Google Cloud Pub/Sub that speaks the real Pub/Sub v1 REST API (https://pubsub.googleapis.com/v1), so application code using @google-cloud/pubsub can run against it with zero cost and zero side effects.

| Key | Value |
| --- | --- |
| Port | 4582 |
| Protocol | Pub/Sub v1 REST API (HTTP + JSON) |
| Compatible client | @google-cloud/pubsub (v4) |
| Size | ~70 KB |
| Startup | < 100 ms |
| State | In-memory, ephemeral, resettable |

Quick Start

Start the server:

import { PubsubServer } from "./services/pubsub/src/server.js";

const server = new PubsubServer(4582);
await server.start();
// ... use it ...
await server.stop();

Connect with the real @google-cloud/pubsub client. The fake speaks the HTTP/1.1 REST transport (the google-gax fallback mode), so the client must be constructed with fallback: true and protocol: "http". Point it at the fake via the PUBSUB_EMULATOR_HOST environment variable:

export PUBSUB_EMULATOR_HOST=127.0.0.1:4582

Then construct the client in your application code:

import { PubSub } from "@google-cloud/pubsub";

const pubsub = new PubSub({
  projectId: "parlel",
  fallback: true,       // use the HTTP/1.1 REST transport instead of gRPC
  protocol: "http",     // talk plain HTTP to the local fake
  // Any credentials work — the fake does not verify them.
  credentials: {
    client_email: "parlel@parlel.iam.gserviceaccount.com",
    private_key: "<any valid PEM>",
  },
});

// Create a topic and a subscription.
const [topic] = await pubsub.createTopic("orders");
const [subscription] = await topic.createSubscription("orders-worker");

// Publish a message.
const messageId = await topic.publishMessage({
  data: Buffer.from("hello"),
  attributes: { tier: "gold" },
});

Pulling messages

The high-level subscription.on("message", ...) streaming API uses bidi gRPC StreamingPull, which is not available over the REST transport. Use the low-level synchronous Pull RPC instead (this is exactly what the real service exposes over REST):

import { v1 } from "@google-cloud/pubsub";

const subClient = new v1.SubscriberClient({
  projectId: "parlel",
  fallback: true,
  protocol: "http",
  apiEndpoint: "127.0.0.1", // low-level gapic clients need the host explicitly
  port: 4582,
  credentials: { client_email: "parlel@parlel.iam.gserviceaccount.com", private_key: "<PEM>" },
});

const subscriptionPath = subClient.subscriptionPath("parlel", "orders-worker");

const [response] = await subClient.pull({ subscription: subscriptionPath, maxMessages: 10 });
for (const received of response.receivedMessages) {
  console.log(Buffer.from(received.message.data, "base64").toString());
  await subClient.acknowledge({ subscription: subscriptionPath, ackIds: [received.ackId] });
}
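
Because the event-based streaming API is unavailable, a consumer has to poll. The following is a minimal sketch of such a loop, not part of the fake or of the client library: drainSubscription and its pullOnce, ack, and handle callbacks are hypothetical names, and the stop condition (a few consecutive empty pulls) is an assumption that suits tests rather than long-running workers.

```javascript
// Sketch: poll-based consumer built on the unary Pull RPC.
// pullOnce, ack, and handle are hypothetical callbacks supplied by the caller:
//   pullOnce() -> Promise<Array<{ ackId, message }>>
//   ack(ackIds) -> Promise<void>
//   handle(message) -> Promise<void>
async function drainSubscription({ pullOnce, ack, handle, maxEmptyPolls = 3, idleMs = 50 }) {
  let emptyPolls = 0;
  let processed = 0;
  while (emptyPolls < maxEmptyPolls) {
    const received = (await pullOnce()) ?? [];
    if (received.length === 0) {
      // Nothing available: count the empty poll and wait before retrying.
      emptyPolls += 1;
      await new Promise((resolve) => setTimeout(resolve, idleMs));
      continue;
    }
    emptyPolls = 0;
    for (const item of received) {
      await handle(item.message);
      // Acknowledge only after the handler finished without throwing.
      await ack([item.ackId]);
      processed += 1;
    }
  }
  return processed;
}

// Possible wiring with the subClient and subscriptionPath shown above:
// await drainSubscription({
//   pullOnce: async () =>
//     (await subClient.pull({ subscription: subscriptionPath, maxMessages: 10 }))[0].receivedMessages,
//   ack: (ackIds) => subClient.acknowledge({ subscription: subscriptionPath, ackIds }),
//   handle: async (message) => console.log(message.attributes),
// });
```

If the handler throws, the message is left unacknowledged and becomes eligible for redelivery once its ack deadline expires, which matches the at-least-once behavior described under Behavior notes.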

Authentication

Google credentials and OAuth tokens are accepted but not verified (any syntactically valid credentials work). No network calls leave the process.

Internal (parlel) endpoints

These are not part of Pub/Sub; they exist to manage the fake.

| Method | Path | Description |
| --- | --- | --- |
| GET | /_parlel/health | Health check + resource counts |
| POST | /_parlel/reset | Wipe all in-memory state |
| GET | /_parlel/dump | Dump topics/subscriptions/snapshots/schemas |

You can also call server.reset() directly in process.
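
When the fake runs in another process, the same reset can be done over HTTP. This is a rough sketch under a few assumptions: the fake listens on 127.0.0.1:4582, Node 18+ provides a global fetch, and the helper names (parlelUrl, resetFake, healthCheck) are made up for illustration. The response body format is not specified here, so the health helper returns it as raw text.

```javascript
// Assumed base URL: the fake listening locally on the documented port.
const PARLEL_BASE_URL = "http://127.0.0.1:4582";

// Resolve an internal endpoint path against the fake's base URL.
function parlelUrl(path, baseUrl = PARLEL_BASE_URL) {
  return new URL(path, baseUrl).toString();
}

// Wipe all in-memory state, e.g. from a beforeEach hook.
async function resetFake(baseUrl = PARLEL_BASE_URL) {
  const res = await fetch(parlelUrl("/_parlel/reset", baseUrl), { method: "POST" });
  if (!res.ok) throw new Error(`reset failed with HTTP ${res.status}`);
}

// Fetch the health report; returned unparsed because its shape is not documented here.
async function healthCheck(baseUrl = PARLEL_BASE_URL) {
  const res = await fetch(parlelUrl("/_parlel/health", baseUrl));
  if (!res.ok) throw new Error(`health check failed with HTTP ${res.status}`);
  return res.text();
}
```

Calling resetFake() before each test keeps topics and subscriptions from leaking between cases.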

Implemented operations / endpoints

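The 34 unary Pub/Sub v1 RPCs listed below, plus the 3 IAM RPCs, are implemented. The one remaining v1 RPC, StreamingPull, is a bidirectional gRPC stream with no REST mapping and is not available.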

Publisher (topics)

| RPC | HTTP |
| --- | --- |
| CreateTopic | PUT /v1/{name=projects/*/topics/*} |
| UpdateTopic | PATCH /v1/{topic.name=projects/*/topics/*} |
| GetTopic | GET /v1/{topic=projects/*/topics/*} |
| ListTopics | GET /v1/{project=projects/*}/topics |
| ListTopicSubscriptions | GET /v1/{topic=projects/*/topics/*}/subscriptions |
| ListTopicSnapshots | GET /v1/{topic=projects/*/topics/*}/snapshots |
| DeleteTopic | DELETE /v1/{topic=projects/*/topics/*} |
| Publish | POST /v1/{topic=projects/*/topics/*}:publish |
| DetachSubscription | POST /v1/{subscription=projects/*/subscriptions/*}:detach |
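
For callers that skip the client library, the Publish row above maps to a plain JSON request. The sketch below assumes the standard Pub/Sub v1 JSON encoding (message data sent as base64, message IDs returned in messageIds) and a Node 18+ global fetch; buildPublishRequest and publishOverRest are hypothetical helper names.

```javascript
// Build the method, path, and JSON body for a Publish call.
// Message data is base64-encoded, as the JSON mapping of bytes fields requires.
function buildPublishRequest(project, topic, messages) {
  return {
    method: "POST",
    path: `/v1/projects/${project}/topics/${topic}:publish`,
    body: {
      messages: messages.map(({ data, attributes = {}, orderingKey }) => ({
        data: Buffer.from(data).toString("base64"),
        attributes,
        // Only include orderingKey when the caller set one.
        ...(orderingKey ? { orderingKey } : {}),
      })),
    },
  };
}

// Send the request to the fake (baseUrl such as "http://127.0.0.1:4582").
async function publishOverRest(baseUrl, project, topic, messages) {
  const { method, path, body } = buildPublishRequest(project, topic, messages);
  const res = await fetch(new URL(path, baseUrl), {
    method,
    headers: { "content-type": "application/json" },
    body: JSON.stringify(body),
  });
  if (!res.ok) throw new Error(`publish failed with HTTP ${res.status}`);
  return (await res.json()).messageIds;
}
```

For example, publishOverRest("http://127.0.0.1:4582", "parlel", "orders", [{ data: "hello", attributes: { tier: "gold" } }]) sends the same message as the Quick Start.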

Subscriber (subscriptions)

| RPC | HTTP |
| --- | --- |
| CreateSubscription | PUT /v1/{name=projects/*/subscriptions/*} |
| GetSubscription | GET /v1/{subscription=projects/*/subscriptions/*} |
| UpdateSubscription | PATCH /v1/{subscription.name=projects/*/subscriptions/*} |
| ListSubscriptions | GET /v1/{project=projects/*}/subscriptions |
| DeleteSubscription | DELETE /v1/{subscription=projects/*/subscriptions/*} |
| ModifyAckDeadline | POST /v1/{subscription=...}:modifyAckDeadline |
| Acknowledge | POST /v1/{subscription=...}:acknowledge |
| Pull | POST /v1/{subscription=...}:pull |
| ModifyPushConfig | POST /v1/{subscription=...}:modifyPushConfig |
| Seek | POST /v1/{subscription=...}:seek |
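
When calling Pull and Acknowledge as raw JSON, the response needs a little decoding. The sketch below assumes the standard Pub/Sub v1 JSON shapes (receivedMessages entries with ackId and a message whose data is base64, and an ackIds array for Acknowledge); decodeReceivedMessages and buildAcknowledgeRequest are hypothetical helpers.

```javascript
// Turn a raw Pull response body into decoded, easy-to-assert records.
function decodeReceivedMessages(pullResponse) {
  return (pullResponse.receivedMessages ?? []).map(({ ackId, message }) => ({
    ackId,
    messageId: message.messageId,
    // Message data arrives base64-encoded in the JSON mapping.
    text: Buffer.from(message.data ?? "", "base64").toString("utf8"),
    attributes: message.attributes ?? {},
  }));
}

// Build the method, path, and JSON body for an Acknowledge call.
function buildAcknowledgeRequest(project, subscription, ackIds) {
  return {
    method: "POST",
    path: `/v1/projects/${project}/subscriptions/${subscription}:acknowledge`,
    body: { ackIds },
  };
}
```

An empty Pull response (no receivedMessages field) decodes to an empty array, so callers can loop over the result without a null check.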

Snapshots

| RPC | HTTP |
| --- | --- |
| CreateSnapshot | PUT /v1/{name=projects/*/snapshots/*} |
| GetSnapshot | GET /v1/{snapshot=projects/*/snapshots/*} |
| UpdateSnapshot | PATCH /v1/{snapshot.name=projects/*/snapshots/*} |
| ListSnapshots | GET /v1/{project=projects/*}/snapshots |
| DeleteSnapshot | DELETE /v1/{snapshot=projects/*/snapshots/*} |

Schemas

| RPC | HTTP |
| --- | --- |
| CreateSchema | POST /v1/{parent=projects/*}/schemas |
| GetSchema | GET /v1/{name=projects/*/schemas/*} |
| ListSchemas | GET /v1/{parent=projects/*}/schemas |
| ListSchemaRevisions | GET /v1/{name=projects/*/schemas/*}:listRevisions |
| CommitSchema | POST /v1/{name=projects/*/schemas/*}:commit |
| RollbackSchema | POST /v1/{name=projects/*/schemas/*}:rollback |
| DeleteSchemaRevision | DELETE /v1/{name=projects/*/schemas/*}:deleteRevision |
| DeleteSchema | DELETE /v1/{name=projects/*/schemas/*} |
| ValidateSchema | POST /v1/{parent=projects/*}/schemas:validate |
| ValidateMessage | POST /v1/{parent=projects/*}/schemas:validateMessage |

IAM (google.iam.v1)

| RPC | HTTP |
| --- | --- |
| GetIamPolicy | POST /v1/{resource=**}:getIamPolicy |
| SetIamPolicy | POST /v1/{resource=**}:setIamPolicy |
| TestIamPermissions | POST /v1/{resource=**}:testIamPermissions |

Behavior notes

Surface coverage

This emulator replicates the API surface that most application code and agents exercise. Every gap is listed explicitly in the table below: features whose configuration is accepted but not enforced (⚠️), intentional design choices for a fast, zero-cost local emulator (✓ By design), and candidates for a future release (⟳ Roadmap).

Legend: ✅ fully supported · ⚠️ accepted (stored, not strictly enforced) · ✓ by design · ⟳ on the roadmap.

| Feature | Status |
| --- | --- |
| Topic CRUD + list + update | ✅ Supported |
| Subscription CRUD + list + update | ✅ Supported |
| Publish (single + batch, attributes, ordering key) | ✅ Supported |
| Pull / Acknowledge / ModifyAckDeadline (lease + nack) | ✅ Supported |
| Push config (set via create/update/modifyPushConfig) | ✅ Stored (no actual HTTP push delivery) |
| Snapshots + Seek (by snapshot and by time) | ✅ Supported |
| Schemas (create/get/list/commit/rollback/revisions/validate) | ✅ Supported |
| ValidateMessage (JSON payloads) | ✅ Supported (JSON well-formedness) |
| IAM get/set/test policy | ✅ Supported (permissive: grants all) |
| DetachSubscription | ✅ Supported |
| Message filtering (filter evaluated at delivery) | ⚠️ Stored on the subscription, not enforced at pull time |
| Exactly-once delivery semantics | ⚠️ Flag stored; delivery is at-least-once |
| Ordering guarantees | ⚠️ orderingKey is stored and returned; strict per-key ordering is not enforced |
| Avro/protobuf payload schema enforcement | ⚠️ Structural validation only (JSON well-formedness / record shape) |
| StreamingPull (subscription.on("message")) | ⟳ Roadmap: currently unsupported (bidi gRPC stream, not available over REST). Use Pull. |
| BigQuery / Cloud Storage subscriptions | ⚠️ Config stored; no actual export |
| Real push HTTP delivery to endpoints | ✓ By design: not delivered |

Error codes / shapes

Errors are returned in the standard Google REST error envelope:

{
  "error": {
    "code": 404,
    "message": "Topic not found: projects/parlel/topics/missing",
    "status": "NOT_FOUND"
  }
}

The @google-cloud/pubsub client (over the gax REST transport) decodes the canonical gRPC status code from the HTTP status.

| Condition | HTTP | gRPC code (as decoded by the client) |
| --- | --- | --- |
| Invalid argument (bad name, bad ack deadline, empty message) | 400 | INVALID_ARGUMENT (3) |
| Resource not found (topic/subscription/snapshot/schema) | 404 | NOT_FOUND (5) |
| Duplicate create (topic/subscription/snapshot/schema already exists) | 412 | FAILED_PRECONDITION (9) † |
| Unimplemented verb | 501 | UNIMPLEMENTED (12) |
| Internal error | 500 | INTERNAL (13) |

† The underlying service semantic is ALREADY_EXISTS (6). Over the REST fallback transport there is no HTTP status that decodes back to code 6, and HTTP 409 decodes to ABORTED — which the client's create-subscription retry policy would retry. The fake therefore surfaces create-conflicts as a non-retryable FAILED_PRECONDITION, so a duplicate create rejects immediately.
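
Test code that must run against both the fake and the live service may want to treat both duplicate-create codes alike. The sketch below copies the table above into a lookup and adds a hypothetical isAlreadyExists helper; it assumes the thrown error carries the numeric gRPC status in a code property, which is how the client's errors are described above. Treating FAILED_PRECONDITION as "already exists" is only reasonable in test setup code, since the real service can return code 9 for other reasons.

```javascript
// HTTP status -> gRPC status, exactly as listed in the table above.
const HTTP_TO_GRPC = {
  400: { name: "INVALID_ARGUMENT", code: 3 },
  404: { name: "NOT_FOUND", code: 5 },
  412: { name: "FAILED_PRECONDITION", code: 9 },
  500: { name: "INTERNAL", code: 13 },
  501: { name: "UNIMPLEMENTED", code: 12 },
};

// Summarize a raw REST error response (status plus parsed JSON envelope).
function describeError(httpStatus, body) {
  return {
    httpStatus,
    grpc: HTTP_TO_GRPC[httpStatus] ?? null,
    status: body?.error?.status ?? null,
    message: body?.error?.message ?? "",
  };
}

// True for a duplicate create on either target:
// ALREADY_EXISTS (6) is the service semantic, FAILED_PRECONDITION (9) is what the fake surfaces.
function isAlreadyExists(err) {
  return err?.code === 6 || err?.code === 9;
}

// Possible use in test setup, with the pubsub client from the Quick Start:
// try { await pubsub.createTopic("orders"); }
// catch (err) { if (!isAlreadyExists(err)) throw err; }
```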

Resource naming rules

Topic / subscription / snapshot IDs must be 3–255 characters, start with a letter, contain only letters, digits, and -._~%+, and must not start with goog. These match the real Pub/Sub constraints.
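
The rules above can be checked client-side before a request is sent. This is a small sketch, not the fake's own validator: isValidResourceId and RESOURCE_ID_PATTERN are hypothetical names, "letters" is assumed to mean ASCII letters, and the goog prefix check is assumed to be case-sensitive.

```javascript
// One leading letter followed by 2 to 254 allowed characters: 3 to 255 in total.
const RESOURCE_ID_PATTERN = /^[A-Za-z][A-Za-z0-9\-._~%+]{2,254}$/;

// Check a topic, subscription, or snapshot ID against the documented rules.
function isValidResourceId(id) {
  return (
    typeof id === "string" &&
    RESOURCE_ID_PATTERN.test(id) &&
    !id.startsWith("goog") // reserved prefix
  );
}
```

For instance, "orders-worker" passes, while "ab" (too short), "1orders" (starts with a digit), and "goog-orders" (reserved prefix) are rejected.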

<!-- parlel:testenv:start -->

Configuration — test.env

Copy these into your test.env (used by the bridge sidecar flow). Tokens are Parlel's seeded test credentials — any non-empty value is accepted by the emulator, so you rarely need to change them. Swap in real credentials only when pointing at the live service in prod.env.

PUBSUB_EMULATOR_HOST=parlel-bridge:4582
PUBSUB_PROJECT_ID=parlel
GOOGLE_CLOUD_PROJECT=parlel
GCLOUD_PROJECT=parlel
<!-- parlel:testenv:end -->
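
The low-level gapic clients shown under Pulling messages take the host and port as separate options, so a PUBSUB_EMULATOR_HOST value has to be split first. The helper below is one possible way to do that; parseEmulatorHost is a hypothetical name, and stripping an optional http:// or https:// prefix is a defensive assumption rather than documented behavior.

```javascript
// Split a "host:port" value into the apiEndpoint/port options used by gapic clients.
function parseEmulatorHost(value) {
  const stripped = String(value ?? "").replace(/^https?:\/\//, "");
  const separator = stripped.lastIndexOf(":");
  if (separator === -1) {
    throw new Error(`expected "host:port", got "${stripped}"`);
  }
  const host = stripped.slice(0, separator);
  const port = Number(stripped.slice(separator + 1));
  if (!host || !Number.isInteger(port) || port <= 0 || port > 65535) {
    throw new Error(`invalid emulator host "${stripped}"`);
  }
  return { apiEndpoint: host, port };
}

// Possible use with the SubscriberClient options shown earlier:
// const subClient = new v1.SubscriberClient({
//   ...parseEmulatorHost(process.env.PUBSUB_EMULATOR_HOST),
//   projectId: process.env.PUBSUB_PROJECT_ID,
//   fallback: true,
//   protocol: "http",
// });
```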