Architecture

The service is Internet-connected. It exposes HTTP and SSH endpoints, all running on standard ports. Connections to these endpoints pass through a load balancer.

HTTP requests to port 80 always receive an HTTP 301 redirect to HTTPS and come back in via port 443.

HTTPS connections to port 443 are routed to a pool of read-only mirrors. We call these the hgweb servers.

(Server names are representative.)

Each server is identically configured and behaves like the others. HTTP connections/requests are routed to any available hgweb server.

SSH connections to port 22 are routed to a single primary server. There is a warm standby for the primary server that is made active should the primary server fail. We collectively refer to these as the hgssh servers.
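The port-based routing described above can be summarized in a small sketch. The server names, the `route` function, and the round-robin choice are assumptions made for illustration only; the real load balancer configuration is not shown in this document, and failover to the standby may well be a manual step.

```python
# Illustrative model of the port-based routing described above.
# Server names and the function itself are assumptions, not real configuration.
HGWEB_POOL = ["hgweb1", "hgweb2", "hgweb3"]  # hypothetical read-only mirrors
HGSSH_PRIMARY = "hgssh-primary"              # hypothetical primary server
HGSSH_STANDBY = "hgssh-standby"              # hypothetical warm standby


def route(port, primary_healthy=True, request_number=0):
    """Return an (action, target) pair for a connection to the given port."""
    if port == 80:
        # Plain HTTP is never served; clients are redirected to HTTPS.
        return ("redirect-301", "https")
    if port == 443:
        # Any available hgweb server may answer; round-robin is one option.
        return ("forward", HGWEB_POOL[request_number % len(HGWEB_POOL)])
    if port == 22:
        # All SSH traffic goes to one server; the standby takes over on failure.
        return ("forward", HGSSH_PRIMARY if primary_healthy else HGSSH_STANDBY)
    return ("reject", None)
```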

SSH connections use public key authentication. User access control data and SSH public keys are stored in an LDAP server.

An authenticated SSH connection spawns the script from the version-control-tools repository. This script performs additional checking of the incoming request before eventually spawning an hg serve process, which allows the Mercurial client to communicate with a repository on the server.

The hgssh servers hold the canonical repository data on a network appliance exporting a mountable read-write filesystem.

All repository write operations are performed via SSH and are handled by the primary hgssh server.

Various Mercurial hooks and extensions run on the hgssh server when repository events occur. Some of these verify incoming changes are acceptable and reject them if not. Others perform actions in reaction to the incoming change.

In terms of service architecture, the most important action taken in reaction to pushes is writing events into the replication system.

Repository Replication Mechanism

The hgweb and hgssh servers comprising the service run a Kafka cluster. Kafka is a distributed message service. It allows you to publish and consume an ordered stream of messages with robust consistency and durability guarantees.

The cluster consists of multiple network services all logically behaving as a single service.

When a repository event occurs, we publish a message into Kafka describing that change.

For example, we publish one message when a repository is created and another when a new push occurs.

The published message contains the repository name and details about the change that occurred.

We treat publishing to Kafka as a transaction: if we cannot write messages to Kafka, the current repository change operation is prevented or rolled back.
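The publish-or-roll-back behavior can be sketched as follows. This is a minimal illustration, not the actual hook code: `apply_change`, `PublishError`, the list standing in for repository state, and the message shape are all hypothetical.

```python
class PublishError(Exception):
    """Raised when a message cannot be written to the message bus."""


def apply_change(repo_state, change, publish):
    """Apply `change` to `repo_state` only if `publish` succeeds.

    `repo_state` is a list standing in for repository history, and `publish`
    is any callable that raises PublishError on failure. Both are
    hypothetical stand-ins used for illustration.
    """
    snapshot = list(repo_state)      # remember the state before the change
    repo_state.append(change)        # tentatively apply the change
    try:
        publish({"change": change})  # must succeed for the change to stick
    except PublishError:
        repo_state[:] = snapshot     # roll back to the previous state
        return False
    return True
```

The point of the design is that a change never becomes visible on the primary without a corresponding message that the mirrors can replay.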

At a lower level, messages are written into 1 of 8 partitions in the pushdata Kafka topic.

Each repository deterministically writes to the same partition via a name-based routing mechanism plus hashing. For example, the foo repository may always write to partition 1 but the bar repository may always write to partition 6.
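One way to obtain a stable name-to-partition mapping is to hash the repository name and reduce it modulo the partition count. The sketch below shows only that hashing step; the choice of SHA-256 and the function name `partition_for` are assumptions, and the name-based routing rules the service applies before hashing are not modeled.

```python
import hashlib

PARTITION_COUNT = 8  # the pushdata topic has 8 partitions


def partition_for(repo_name, partition_count=PARTITION_COUNT):
    """Map a repository name to a stable partition number.

    A digest from hashlib is used rather than Python's built-in hash(),
    which is randomized per process for strings and so is not stable
    across runs.
    """
    digest = hashlib.sha256(repo_name.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % partition_count
```

Because the mapping depends only on the name, every message for a given repository lands in the same partition and therefore shares one ordered stream.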

Messages published to Kafka topics/partitions are ordered: if message A is published before message B, consumers will always see A before B.

On each hgweb server, we run a vcsreplicator-consumer daemon process bound to each partition in the pushdata Kafka topic. These processes essentially monitor each partition in the topic for new messages.

When a new message is written to the partition, the consumer reacts to that message, typically by invoking an hg process to complete the action.

Sometimes the replicated data is too large to fit in a Kafka message. In that case, the consumer will connect to the hgssh server to obtain data.

Consumers react to new messages within milliseconds of the message being written. The same activity occurs on each hgweb server independently.

When the consumer successfully performs the action in reaction to a received message, it tells Kafka. Once a consumer has acknowledged that it processed a message, that message will never be delivered to that consumer again.

Consumers typically fully process a repository event / message within a few seconds. Events corresponding to big changes (such as cloning a repository, large pushes, etc) can take longer - sometimes minutes.

We rely on repository change messages having deterministic side effects: independent consumers that start in the same state and apply the same stream of messages should end up in an identical state.

Consumers process only one message per topic+partition at a time. This ensures that message ordering is respected and that no message is successfully processed more than once (Kafka tracks message delivery by recording the consumer’s message offset within a logical queue).
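The one-at-a-time, acknowledge-after-success behavior can be sketched with a plain list standing in for a partition. The function name `consume` and its parameters are hypothetical; the real vcsreplicator-consumer talks to Kafka rather than to an in-memory list.

```python
def consume(partition_log, committed_offset, apply_message):
    """Process messages one at a time, starting at `committed_offset`.

    `partition_log` is a list standing in for one Kafka partition and
    `apply_message` is a callable that raises on failure; both are
    hypothetical. Returns the new committed offset.
    """
    offset = committed_offset
    while offset < len(partition_log):
        try:
            apply_message(partition_log[offset])
        except Exception:
            # Do not advance: the same message is retried on the next run,
            # and later messages are never applied out of order.
            break
        offset += 1  # acknowledge only after the action succeeded
    return offset
```

Since the offset only moves forward after a successful action, a message that has been acknowledged is never seen again, and a failed message blocks everything behind it in that partition.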

This all means that there can be a lag, and a backlog of messages, between the hgssh server producing a Kafka message and the hgweb servers applying it. In addition, there is a window in which each hgweb server may have a slightly different state of a repository, since each hgweb server consumes and applies messages at its own rate.

See Replication for more on this replication system.

Aggregated Push Data and Notifications

The primary hgssh server runs a service that monitors the state of the replication consumers on all hgweb servers. It repeatedly polls Kafka, asking it for the topic+partition offsets of all known hgweb topic consumers.

When all hgweb consumers have acknowledged that they’ve processed a message, this service re-publishes that fully consumed message in the pushdataaggregator Kafka topic.
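The "fully consumed" test amounts to taking the lowest acknowledged offset across all consumers. A minimal sketch, assuming an offset is the index of the next message a consumer will read; the function name, parameters, and in-memory list are hypothetical stand-ins for the real Kafka queries.

```python
def fully_replicated(messages, consumer_offsets, already_copied):
    """Return messages every consumer has acknowledged but that have not
    yet been re-published.

    `messages` is a list standing in for one partition, `consumer_offsets`
    maps a consumer name to its committed offset (the index of the next
    message it will read), and `already_copied` is how many messages were
    re-published previously. All names are hypothetical.
    """
    if not consumer_offsets:
        return []
    # The slowest consumer determines what is available everywhere.
    low_water_mark = min(consumer_offsets.values())
    return messages[already_copied:low_water_mark]
```

With offsets of 4, 2, and 3 on three consumers, only the first two messages qualify, because the slowest consumer has acknowledged only those.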

The stream of messages in the pushdataaggregator Kafka topic represents all fully replicated repository changes available on the hgweb servers.

Various services on the active primary hgssh server consume this aggregate topic and act on the messages. One consumer notifies Pulse of repository changes. Another sends messages to AWS SNS.

See Change Notifications for more.

From there, various other services not part of the infrastructure react to events. For example, pulsebot creates IRC notifications, Taskcluster schedules Firefox CI, and Treeherder records the push.

HTTP Repository Serving

Nearly every client consumes the service via HTTP; only pushes should use SSH.

The HTTP service is pretty standard for a Python-based service: there’s an HTTP server running on each hgweb server that converts the requests to WSGI and sends them to a Python process running Mercurial’s built-in hgweb server. hgweb handles processing the request and generating a response.
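For readers unfamiliar with WSGI, the interface the HTTP server hands requests to is a single callable. The sketch below is a generic stand-in that only illustrates that interface; it is not Mercurial's hgweb application, and the response body is invented for the example.

```python
def application(environ, start_response):
    """Minimal WSGI callable standing in for the hgweb application."""
    # The HTTP server translates each request into the `environ` dict.
    path = environ.get("PATH_INFO", "/")
    body = ("request for " + path).encode("utf-8")
    headers = [
        ("Content-Type", "text/plain; charset=utf-8"),
        ("Content-Length", str(len(body))),
    ]
    # Status and headers go back through the callback; the body is returned
    # as an iterable of bytes.
    start_response("200 OK", headers)
    return [body]
```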

hgweb serves mixed content types (HTML, JSON, etc) for web browsers and other agents. In addition, hgweb also services Mercurial’s custom wire protocol for communicating with Mercurial clients.

When a client executes an hg command that needs to talk to the server, the client process establishes an HTTP connection with it and issues commands to a repository there. Run hg help internals.wireproto for details of how this works.

Clone Offload

To reduce server-side CPU and network load, frequently accessed repositories on the service use Mercurial’s clone bundles feature, so hg clone operations download most repository data from a pre-generated static bundle file hosted on a scalable HTTP server.

Most Mercurial clients will fetch a bundle from the CloudFront CDN.

If the client can’t connect to CloudFront (requires SNI) or if the client is connecting from an AWS IP belonging to an AWS region where we have an S3 bucket containing repository data, the client will connect to S3 instead.
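The choice between CloudFront and S3 can be restated as a small decision function. This is only a restatement of the rules above under assumed inputs: the function name and parameters are hypothetical, and the real selection happens through Mercurial's clone bundles mechanism rather than code like this.

```python
def bundle_source(supports_sni, client_region, s3_regions):
    """Pick where a client downloads a clone bundle from.

    `client_region` is the AWS region the client's IP belongs to, or None
    if the client is outside AWS; `s3_regions` is the set of regions that
    have a bucket containing repository data. All names are hypothetical.
    """
    if client_region is not None and client_region in s3_regions:
        return "s3"      # a bucket exists in the client's own AWS region
    if not supports_sni:
        return "s3"      # CloudFront requires SNI
    return "cloudfront"  # the default for most clients
```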

Offloading the bulk of expensive hg clone operations to pre-generated files hosted on highly scalable services results in faster clones for clients and drastically reduces the server requirements for the service.

See Cloning from Pre-Generated Bundles and the Cloning from S3 blog post for more on clone bundles.