Events, Pub/Sub and Systems Research Challenges at Google

These are notes taken by Michael Olson and Fredrik Heintz (Michael had the most detailed notes so I (Fredrik) just added a few extra comments).

Q: One of the challenges with event processing is efficiency. If you have one publisher publishing a variety of stuff in one channel and it's forwarded on to ten subscribers and they all want the same filter applied, does that filter get applied at the last broker in the chain or pushed back to the source?

A: Pushed back to the source. A logical OR is performed on the filters. There is a limit on the total length of the expression that will be sent upstream. Anything longer is sent downstream.
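
As a rough illustration of the answer above, here is a minimal Python sketch of OR-merging subscriber filters before pushing them upstream. The `Filter` type, the `merge_for_upstream` function and the 64-character limit are hypothetical names and values chosen for the example, not the real system's API. It also assumes one reading of the notes: when the merged expression is too long, nothing is pushed upstream and filtering stays at the downstream broker.

```python
from dataclasses import dataclass
from typing import Callable, Optional, Sequence

MAX_UPSTREAM_EXPR_LEN = 64  # hypothetical limit; the notes give no number


@dataclass(frozen=True)
class Filter:
    expr: str                        # textual form, used only for the size check
    matches: Callable[[dict], bool]  # predicate applied to a message


def merge_for_upstream(filters: Sequence[Filter],
                       limit: int = MAX_UPSTREAM_EXPR_LEN) -> Optional[Filter]:
    """OR the filters together; return None if the result is too long to push upstream."""
    # Identical filters from several subscribers collapse into one term.
    expr = " OR ".join(sorted({f.expr for f in filters}))
    if len(expr) > limit:
        return None  # assumption: caller keeps filtering at the downstream broker
    predicates = [f.matches for f in filters]
    return Filter(expr, lambda msg: any(p(msg) for p in predicates))
```

With ten subscribers that all want the same filter, the merged expression is just that one filter, so the source applies it once.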

Q: Can you tell us more about the topology of the brokers?

A: The first level of the broker topology is always the cluster. The clients, publishers or subscribers, first connect within the cluster. Networking is great inside a cluster, so we don't want to have connections across cluster boundaries because they traverse additional network gear. The level of connectivity depends on what the cluster topology is. If I'm a publisher in cluster A, and I have cluster B nearby, with subscribers in that cluster, and a cluster C closer to B than A, then I have a connection from A to B and then the routing would compute that B is closer to C, so there is a leg from B to C. There would also be a link from A to C directly for redundancy. In a larger graph, redundancy can be achieved without the direct link.

Q: So this isn't a tree, but a mesh on which you do routing.

A: Not really, but it depends on the underlying network topology. The same physical link shouldn't carry the same message twice. At least two path-disjoint trees must be formed. The routing graph is based on network distance, which determines how links are added to the tree.

Q: Are routes predetermined?

A: Routing trees are calculated on the fly. If we have an origin A, and all these destinations that I have to reach, we pick the cheapest link that we can add to extend the diameter of the tree. Similar to shortest-path coverage.
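
The greedy step described above (repeatedly add the cheapest link that extends the tree) can be sketched in Python roughly as follows. This is a Prim-style illustration under assumptions, not the actual routing code: `build_routing_tree`, the link table format and the example costs are all hypothetical, and it builds a single tree rather than the two path-disjoint trees mentioned earlier.

```python
import heapq


def build_routing_tree(links, origin, destinations):
    """Grow a tree from origin by always adding the cheapest link that reaches a new node.

    links: dict mapping an undirected pair (u, v) to its cost.
    Returns the chosen links as (parent, child) pairs, in the order they were added.
    """
    adjacency = {}
    for (u, v), cost in links.items():
        adjacency.setdefault(u, []).append((cost, v))
        adjacency.setdefault(v, []).append((cost, u))

    in_tree = {origin}
    remaining = set(destinations) - in_tree
    frontier = [(cost, origin, v) for cost, v in adjacency.get(origin, [])]
    heapq.heapify(frontier)
    edges = []
    # Stop as soon as every destination is covered (or nothing more is reachable).
    while remaining and frontier:
        cost, u, v = heapq.heappop(frontier)
        if v in in_tree:
            continue
        in_tree.add(v)
        remaining.discard(v)
        edges.append((u, v))
        for next_cost, w in adjacency.get(v, []):
            if w not in in_tree:
                heapq.heappush(frontier, (next_cost, v, w))
    return edges
```

For the A, B, C example from the topology answer, with assumed costs of 10 for A-B and B-C and 100 for A-C, the tree from A comes out as A to B, then B to C.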

Q: If the underlying network is totally connected do you send information to everyone?

A: Previously everyone could talk to everybody else. The problem is then that the same message is sent several times over the same network. This works for LANs but not that well for scarce network links.

Q: How do you define the costs of links?

A: It's rather simplistic; within a cluster, the cost is 1. With every additional layer in the network hierarchy, it's an order of magnitude higher (1, 10, 100). For each layer, you get an order of magnitude more machines, but not an order of magnitude more network bandwidth.
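
A small Python sketch of that cost rule, assuming each broker's location is written as a path through the network hierarchy. The tuple representation and the `link_cost` name are hypothetical; only the 1, 10, 100 progression comes from the answer.

```python
def link_cost(a, b):
    """Cost of a link between two locations given as hierarchy paths.

    Example path (hypothetical): ("eu", "dc1", "cluster7").
    Same cluster costs 1; each extra layer crossed multiplies the cost by 10.
    """
    common = 0
    for x, y in zip(a, b):
        if x != y:
            break
        common += 1
    layers_crossed = max(len(a), len(b)) - common
    return 10 ** layers_crossed
```

Two machines in the same cluster get cost 1, two clusters in the same datacenter get 10, and two datacenters in the same region get 100.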

Q: What are some of the challenges in guaranteeing FIFO order?

A: The reason why it's hard is that in order to guarantee it, you have to reorder messages. One of the forwarders/brokers gets multiple message streams coming into it, with one FIFO stream being sent to the subscriber. Each flow has a sequence space. When delivery gets interrupted, the data gets logged to disk; data is written to disk in the order in which it arrived. Links fail asynchronously. If a link carrying messages 101-200 fails, those messages are written to the log. If messages 1-100, already sent towards the subscriber, are on a link that fails after 101-200 have been written to the log, then 1-100 are appended to the log out of order. Sorting is not done at the broker because it's such a heavyweight operation, so rearrangement of messages is done at the subscriber. Megabytes worth of data are being sent while the subscriber client is trying to close the 1-100 gap, and no data is being delivered to the application. The initial design assumption was that data should move so quickly that an insert operation can't be done on disk (since you might have to rewrite the pages on disk). There is no guarantee that an out-of-order beginning will be found by the client. There is a stabilization time before the system guarantees FIFO order. FIFO could be easily done using a random access data structure, but it is too expensive.

Q: Could you have multiple logs?

A: There is a log per topic. We don't log data that we've sent to the network right away. Data is sent to the network and a timer queue; if the ACK arrives before the timer goes off, the timer is canceled and no write to the disk is made. If the timer fires, a write to disk is made and it goes into the queue.
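
The send-then-maybe-log behaviour can be sketched in Python as below. It is an illustration only: `DeferredLog` and its methods are hypothetical, time is passed in explicitly instead of using real timers or threads, a list stands in for the on-disk log, and the queue that logged messages go into afterwards is not modelled.

```python
import heapq


class DeferredLog:
    """Write a message to the log only if its ACK has not arrived before the timer fires."""

    def __init__(self, ack_timeout):
        self.ack_timeout = ack_timeout
        self.pending = {}    # seq -> payload, sent but not yet ACKed
        self.timers = []     # min-heap of (deadline, seq)
        self.disk_log = []   # stand-in for the per-topic log on disk

    def send(self, seq, payload, now):
        # The real system would also hand the payload to the network here.
        self.pending[seq] = payload
        heapq.heappush(self.timers, (now + self.ack_timeout, seq))

    def ack(self, seq):
        # Cancelling is lazy: the timer entry stays in the heap but finds nothing to log.
        self.pending.pop(seq, None)

    def tick(self, now):
        # Fire every timer whose deadline has passed.
        while self.timers and self.timers[0][0] <= now:
            _, seq = heapq.heappop(self.timers)
            if seq in self.pending:
                self.disk_log.append((seq, self.pending.pop(seq)))
```

In the common case the ACK beats the timer, so `disk_log` stays empty and no disk write happens.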

Q: Can you send just that data to a different log?

A: We try to avoid having too many threads contending for the same locks. There are already two threads; the incoming thread and the thread that has the timer queue. In the best case there is only one thread. These are high performance optimizations.

Q: You don't care about out-of-order messages?

A: We do enforce FIFO, but at the subscriber. It's bounded by the amount of memory at the subscriber. With enough memory, nothing should ever be out of order.
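
A minimal Python sketch of subscriber-side reordering with a memory bound. The `ReorderBuffer` class is hypothetical, and what happens when the bound is exceeded (here: give up on the gap and resume from the oldest buffered message) is an assumption; the notes only say that ordering is bounded by subscriber memory.

```python
class ReorderBuffer:
    """Hold out-of-order messages until the gap before them is closed."""

    def __init__(self, first_seq=1, max_buffered=1000):
        self.next_seq = first_seq
        self.max_buffered = max_buffered  # stand-in for the subscriber's memory limit
        self.held = {}                    # seq -> payload

    def receive(self, seq, payload):
        """Accept one message; return the payloads that can now be delivered in order."""
        if seq < self.next_seq or seq in self.held:
            return []  # duplicate, or older than anything already delivered
        self.held[seq] = payload
        if len(self.held) > self.max_buffered and self.next_seq not in self.held:
            # Assumption: out of memory, so skip the gap rather than stall forever.
            self.next_seq = min(self.held)
        delivered = []
        while self.next_seq in self.held:
            delivered.append(self.held.pop(self.next_seq))
            self.next_seq += 1
        return delivered
```

While the gap is open, `receive` returns nothing, which matches the observation that no data reaches the application until the missing range arrives.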

Q: What about in the broker network?

A: We try to process everything in order, but we won't create delay to do so. There is a flag setting that a user can use that says that if there is ever something out of order at the subscriber, then crash.

Q: Sometimes we need order to do CEP?

A: No processing is done within the broker network.

Q: Why not make the brokers more intelligent?

A: The brokers are running on our resources; it's a shared resource, so if people give us arbitrary code to run, we have no way to verify if the code is sound, how much memory it consumes, etc, so we're at risk that one of our clients giving us complex code to run will really hurt the performance of our brokers and our throughput.

Q: Could there be a predefined set of services inside the brokers?

A: Why not just write it as a second layer, apply the transformation, and resend the message through the network? Why push it into the dissemination structure?

Q: If you have common services, things are processed only in one place. Many people can use the output from the broker.

A: Let's say F is a function you want to compute. Why not implement F as a subscriber? You compute F off of the stream that you receive, and republish it. Our setup is basically that we publish the sensor readings and let developers choose what to do with the readings. There was a paper about pushing functionality into sensor networks, but we don't want to expose ourselves, as the owner of the distribution network, to running all this code and logic that interprets all of this code. We made a conscious choice not to do anything that looks like active networks.
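
The "implement F as a subscriber" pattern can be sketched in Python with a toy in-process bus. `Bus` and `attach_function` are hypothetical stand-ins for illustration, not the real client API; the point is only that F runs outside the brokers and republishes its result on another topic.

```python
from collections import defaultdict


class Bus:
    """Toy in-process pub/sub: delivers each message to every callback on the topic."""

    def __init__(self):
        self.subscribers = defaultdict(list)

    def subscribe(self, topic, callback):
        self.subscribers[topic].append(callback)

    def publish(self, topic, message):
        for callback in list(self.subscribers[topic]):
            callback(message)


def attach_function(bus, source_topic, result_topic, f):
    """Run f as an ordinary subscriber and republish its output on result_topic."""
    bus.subscribe(source_topic, lambda message: bus.publish(result_topic, f(message)))
```

Anyone interested in the derived stream subscribes to `result_topic`, so F is still computed in one place without the brokers running user code.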

Q: If you consider this as the base layer, do you have another layer within Google that works on this?

A: There are other groups in Google that utilize this process. They work on another layer that runs on top of this layer. They have their own notion of what a subscription means. They are subscribers to our system, and they publish something more sophisticated in terms of their own logic.

Q: Given that there's lots of standards, why go out and build your own?

A: It was faster to write this than to read all of the standards. For many use cases we could have used one of the standard implementations; it would, however, have required us to do a lot more work. We run all of our software inside of a giant job scheduler. In order to use something from somewhere else, we would have had to wrap it to be able to be taken off of one machine and moved to another. Some systems depend on a backend database, which gets even messier (not just messaging component has to be moved, but database too). This results in lots of glue code, and we still can't ensure that it's safe to run that way. Those are practical reasons. Many API specifications were read, but most were found to be too cumbersome. Easy things should be easy was our guiding principle, and it didn't seem that it would be easy to do things with other people's APIs. 15 lines of code instead of 2-3 lines. The more code lines that are required, the easier it is to make a mistake, and the less adoption we'd see.

Q: Let's say I'm a bank and I have the same problem that you have. What should I do? What messaging spec should I use?

A: There are vendors here, so I'm not going to recommend one over another. What I can say is that rolling your own is always harder than you think. We expected 6-12 months to finish it. It took 12 months before first release, then more time to make the code stable. In spite of that, it's not a tiny effort to roll your own, but it's not a gigantic effort either. If you have specialized needs, it is definitely feasible to roll your own infrastructure. You can certainly use some of the messaging code infrastructure pieces, and you can cut corners depending on what workloads you expect to see. Should there be something better for the public? Definitely. There should be more powerful reusable components. If there had been something like that, we probably would have used it.

Q: What is the recommendation for improving the standards? How could they move closer to what would have been reusable?

A: Get rid of all the crud. Standardize on something simple, with a very simple API. Unix you can argue is not the best API, but it's quite simple and concise. Standards have the problem of volumes of documentation and all of this "stuff" that you have to buy into. Why do you have to say this in a complicated way? Get rid of all the excess and layer the standard so you don't have to buy into the way that you find and locate your broker network, maybe just have a standard for forwarding. As soon as you add one feature, you end up bloating the standard because everyone has to support it. You start reading all of the documentation and it gets confusing. Total order sounds great, but then half a year later you realize everything is running too slow.

Q: The broker server is a logical server. If I publish, how can I find the best available broker to publish my messages?

A: In the current system you go to the directory, and it assigns you to someone.

Q: How do you store predicates and subscriptions at the brokers, and do you generate false positives?

A: No, we don't generate false positives. We have very simple filters: equality, prefix, hash values, and so on. While delivery is going, a client may have crashed; what happens if it comes back with changed filters? We have dynamic filters and filters change on the fly. When we're replaying messages, we apply the current filter, not the old filter. It's the closest to a false positive that we have. When you change filters dynamically, you cannot have the same guarantees as when you start a subscription. The guarantee that you see all of the messages goes out of the window, because there is an interval between changes of filters when you do not have this guarantee.
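
A short Python sketch of simple filters and of replay using whatever filter is current. The helper names and the idea of keeping an unfiltered backlog per subscription are assumptions made for the example.

```python
def equality(field, value):
    """Filter that matches when the field equals the value exactly."""
    return lambda msg: msg.get(field) == value


def prefix(field, value):
    """Filter that matches when the field starts with the given prefix."""
    return lambda msg: str(msg.get(field, "")).startswith(value)


class Subscription:
    def __init__(self, current_filter):
        self.current_filter = current_filter
        self.backlog = []  # assumption: messages retained for replay, stored unfiltered

    def set_filter(self, new_filter):
        self.current_filter = new_filter  # filters may change on the fly

    def replay(self):
        # Replay always uses the filter in force now, not the one in force at publish time.
        return [msg for msg in self.backlog if self.current_filter(msg)]
```

If a client crashes with one filter and returns with another, the replayed set is whatever the new filter selects, which is why the see-all-messages guarantee does not survive a filter change.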

Q: Are your consumers asking for additional features, and if so what? Do they want more expressive features?

A: Some people have asked for more expressive filters. Early on they asked for different kinds of filters, but that has mostly died down. People have accepted what is available; there is a range of things that the infrastructure provides, and they have to build everything else. What they still ask about is the delays that we inject to make this scale. Sometimes there's a delay starting a client back up and resynchronizing these streams. They just want things to be crisper. One feature request was the ability to load balance across multiple clusters; when multiple clusters subscribe to the same data, they will all get the message. So even if the subscribers are part of the same group, a logical subscriber cannot extend across cluster boundaries. The rationale for that original choice was that it wasn't clear what would happen if you only get partial data in a cluster; what do you do with that partial data? How do you reconstruct your full state? If this is an update propagation mechanism, you have incomplete state in the clusters. How useful would that be? Now, however, there's been progress in the storage systems and people know how to do cross-cluster storage updates. You can write to a global cloud-like storage backend that is accessible from all of the machines, so sending to one out of many clusters is not that far off. We could do this, and it would make some sense, but it requires changes in our internal logic because as the message propagates, it gets sent to a cluster, and it's persistent in a cluster. If we do load balancing across many clusters, then instead of replicating n1, we load balance and n1 goes to cluster 1, n2 to cluster 2, n3 to cluster 3, etc. But if cluster 1 fails, the messages to cluster 1 are lost. In order for the scenario to be supported fully, the storage has to be centralized to be durable in the face of n1 dropping off.

Q: Since you have multiple users sharing the overlay, how do you ensure that a resource-intensive user doesn't affect other users? That is, how do you provide resource isolation?

A: If the user = tom, then map this user to resource partition video. The partition video has been configured to allow 100 messages per second, 10 megabytes per second, and a maximum of 50 megabytes of RAM. If additional messages beyond this limit are received, then we stop reading from the incoming link. You only add the socket back to the watch list at the point when resources are released and you fall under the limits again. The system internally doesn't have any magic to isolate things, the isolation comes from partitioning the resources inside of the system. Initially we were quite lazy and let things go as a free for all, but when the system got larger and the users were more unequal, then boundaries had to be placed. The reservation is work-conserving, in that if no one is contending for the resources then we will take more work from users that have already exceeded their limit. If your resources are exhausted lower in the hierarchy, you can move up in the hierarchy and borrow from the root. If all the resources at the root are used up, then you'll have to be quiet until resources are freed up. The isolation comes through partitioning. Use moving average to compute msg/s, MB/s.
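
The partitioning and borrowing described above can be sketched in Python as follows. The `Partition` class is hypothetical and tracks a single abstract resource (in-flight message slots); the real system limits messages per second, bytes per second and RAM, and computes rates with a moving average, none of which is modelled here.

```python
class Partition:
    """A resource partition that can borrow from its parent when its own budget is used up."""

    def __init__(self, name, capacity, parent=None):
        self.name = name
        self.capacity = capacity
        self.in_use = 0
        self.parent = parent

    def acquire(self):
        """Charge one unit to this partition or the nearest ancestor with spare capacity.

        Returns the partition that was charged, or None if even the root is exhausted,
        in which case the caller should stop reading from the incoming link.
        """
        node = self
        while node is not None:
            if node.in_use < node.capacity:
                node.in_use += 1
                return node
            node = node.parent
        return None

    def release(self):
        """Give back one unit; call this on the partition that acquire() returned."""
        self.in_use -= 1
```

A user mapped to the `video` partition first consumes its own budget, then borrows from the root, and is throttled only when the root is also full, which is the work-conserving behaviour the answer describes.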

Q: Are your brokers CPU or network limited?

A: I/O limited; they're neither CPU nor network limited, but rather we bound the load on them based on what we know the I/O subsystem can provide. We don't try to service more work than we know the underlying I/O system can provide. It's more likely to break because of CPU than network resources.

Q: Load balancing; you said yesterday that you use fingerprints and hashes of keywords to load balance. How can you avoid hot spots of overloaded storage?

A: We can't avoid it in that case. If we feel the problem, the user will feel it even more. We're just a dumb forwarder with no logic, so the subscriber will fail before we do. We have no internal logic to define hotspots.

Q: You talked yesterday that databases are very slow; however, there are still kinds of data that must be stored in databases. Which kind of data do you store in databases?

A: In Google we have a database system that is not terribly slow, it's rather fast: BigTable. You can store a lot of data in BigTable, and you can do that very quickly. You add an additional layer of complexity, but overall it's very fast. We store almost everything in (BigTable) databases somewhere. A common use case of our system is to connect databases. Most data is persistent, was persistent or will be persistent. The publisher has a database with information which the subscriber only wants some of, or wants in a different format. The subscriber uses that result and stores it in a different format. Therefore we do not need to store the data.

Q: What about providing exactly once? I remember that it's a big challenge; what are the challenges?

A: Exactly once gets you into this problem where you have to have a transaction with the client library; the client library's code that we don't own. The ACK is not just an ACK, it's in the context of a transaction. The subscriber has to have a log that persists across invocations of what messages he's already acknowledged. The broker himself cannot make the choice. If the message fails while en route to the subscriber, we can replay. If it fails at the subscriber, it depends (i.e. what code has been executed so far matters). If it fails on the way back to the broker, we cannot replay. This is invisible from the pubsub system itself. The commit logic must be a commit with the application logic. To include this in the messaging service we force the application to buy in to a larger framework. There wasn't an easy transparent way of putting in the logic to include the application transaction.
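
To make the "commit with the application logic" point concrete, here is a Python sketch of what a subscriber application could do on its own side: record the message ID and apply the application's effect in one local transaction, so a replayed message is detected and skipped. It uses the standard `sqlite3` module as a stand-in for the subscriber's persistent log; the table names and the `handle` function are hypothetical, and this is not something the pub/sub system itself provides.

```python
import sqlite3


def open_store(path=":memory:"):
    """Open the subscriber's local store (in memory here; a file path would persist it)."""
    db = sqlite3.connect(path)
    db.execute("CREATE TABLE IF NOT EXISTS acked (msg_id TEXT PRIMARY KEY)")
    db.execute("CREATE TABLE IF NOT EXISTS totals (k TEXT PRIMARY KEY, v INTEGER)")
    return db


def handle(db, msg_id, key, amount):
    """Apply a message at most once; return True if applied, False if it was a replay."""
    try:
        with db:  # one transaction: commits on success, rolls back on any exception
            # A replayed msg_id violates the primary key and aborts the whole transaction.
            db.execute("INSERT INTO acked (msg_id) VALUES (?)", (msg_id,))
            db.execute("INSERT OR IGNORE INTO totals (k, v) VALUES (?, 0)", (key,))
            db.execute("UPDATE totals SET v = v + ? WHERE k = ?", (amount, key))
        return True
    except sqlite3.IntegrityError:
        return False
```

The ACK back to the broker would be sent only after `handle` returns; if that ACK is lost and the broker replays, the duplicate is rejected without touching the totals.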

Q: It looks like from your experience that customers actually want less than you'd initially planned, that is, they use fewer filters than you provide. Going forward, you've got a subset of initial goals, and the customers are now used to not having FIFO. They have different concerns, for instance the startup concerns. If you were to start again, what's the major change?

A: At the conceptual level, the big change is the shifting of the storage responsibility. Right now the pub/sub is responsible; the publisher sends a message, and the ACK that he receives tells him that we've stored the message and we will deliver it. The new meaning is that the ACK means that if you still have the data available in some way, we can guarantee that we will deliver it. That is, we will replay the message if it is in our caches, and if it is not there, we will go to the publisher to re-retrieve the message; if it's not available, then delivery of that message fails. Now things like message timeouts become the responsibility of the producer, since that's where we'll go to get the message to try and deliver it later. We no longer care what the timeout period is since we're not storing the message. It's the biggest change at the conceptual level, and at the operational level you don't have to apply for quotas. The system also degrades more gracefully; if components start failing, you can always revert to the source.
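
The new replay path (cache first, then the publisher, otherwise fail) can be sketched in Python as below. `fetch_for_replay`, `DeliveryFailed` and the `publisher_lookup` callback are hypothetical names; the notes do not describe the actual interface the publisher exposes for re-retrieval.

```python
class DeliveryFailed(Exception):
    """Raised when neither the cache nor the publisher still has the message."""


def fetch_for_replay(msg_id, cache, publisher_lookup):
    """Find a message to replay under the new ACK semantics.

    cache: dict of msg_id -> payload held by the pub/sub layer.
    publisher_lookup: hypothetical callback that returns the payload, or None
    if the publisher no longer has it (for example, its own timeout expired).
    """
    if msg_id in cache:
        return cache[msg_id]
    payload = publisher_lookup(msg_id)
    if payload is None:
        raise DeliveryFailed(msg_id)
    cache[msg_id] = payload  # keep it around in case another subscriber needs it
    return payload
```

Retention policy now lives entirely in `publisher_lookup`: the messaging layer never decides how long a message must be kept.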

Q: The publisher's API has changed, because the publisher has to provide the ability to replay a message.

A: Yes.

Q: Different applications have different requirements from the messaging system, i.e. prioritizing on throughput or latency. Is this the case, and does the messaging system enable this kind of configuration?

A: Right now the system is tuned for high performance. We've relaxed the configuration to be more and more for low maintenance. We provide special use cases, so some people want queueing logic, that is, they pull items off the queue. They don't want the push logic, but rather pull. We ran it with a different configuration since no ACKing is the normal case with a queueing system. That requires different code paths. With testing, it started getting nightmarish to test all of these different code paths. You can optimize the system for different use cases, but I recommend avoiding it. Don't get into the configuration. Look at the use case, look at what the use case requires. It's easier to generalize it later than to implement so many flags that you don't know what your code is doing anymore.

Q: Do you have any client applications for which persistence or fault tolerance is critical?

A: Yes, many customers say that it's critical for them. There are some applications that get data that relates to click fraud; there are robots that click on links, and we don't want to charge our customers for anything that's not legitimate. It's in the application pipeline, and a lost message could mean, for instance, that a document is lost. You want very high reliability.

Q: Since the publish API is going to change in the next version, how do you transition, since you can't guarantee compatibility?

A: The old API will remain valid, but it will defer to the new API's implementation of some of these functions. Even though it's a stripped-down API in some ways, you can still express most of the things you did. The functions that aren't 100% the same in the revised API will be emulated on the old system.

Q: An old application will still work with the new version?

A: That's the intent. There's intermediate code to handle the change in where storage happens.

Events, Pub/Sub and Systems Research Challenges at Google (last edited 2009-09-15 21:49:12 by localhost)