We’re live-blogging MongoSV today. This is the last post, but here’s a link to all of the posts from the event.
This talk is being given by Eliot (the first of his that I’ve seen all day!)
Going to go over HA best practices: keeping data online and safe.
What about a single node? This will have downtime. If the node crashes, intervention might be necessary. If it disappears, you’ll need a backup.
Replica set v1: Single datacenter, single switch, single power source. But automatically recovers from a single node crash. A good start but not great.
The next step up is still single datacenter, but w/ multiple power/network zones. Like EC2 in a single region but multiple AZs. Still some points of failure (datacenter / two node failure). With an arbiter (two data nodes plus the arbiter), we can’t do w=2 writes and remain up once a data node is down. With 3 non-arbiters we can use w=2 but are still vulnerable to datacenter failure.
The next step up is multi datacenter w/ a single DR (disaster recovery) node in a different DC. We can’t always stay up, but we at least have a DR option now.
Now let’s look at the ideal: three datacenters, five nodes. One has a single delayed slave (that helps recover from fat-finger incidents like accidental db.drop). The other two DCs each have 2 active nodes. We can lose an entire DC and still have a majority w/ the other two DCs. Can do `w={dc: 2}` to guarantee write in 2 DCs.
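To convince myself of the "lose an entire DC" claim, here is a minimal sketch of the majority arithmetic. The member and datacenter names are made up, and it assumes all five members (including the delayed one) vote.

```python
# Hypothetical layout from the talk: two active nodes in each of two DCs,
# plus one delayed member in a third DC. All names are invented.
MEMBERS = {
    "a1": "dc-a", "a2": "dc-a",
    "b1": "dc-b", "b2": "dc-b",
    "c1-delayed": "dc-c",
}

def has_majority(members, failed_dcs):
    """True if the members outside the failed DCs form a strict majority."""
    alive = [m for m, dc in members.items() if dc not in failed_dcs]
    return len(alive) > len(members) // 2

def survives_any_single_dc_loss(members):
    """True if a majority remains no matter which single DC goes away."""
    return all(has_majority(members, {dc}) for dc in set(members.values()))
```

Losing any one DC leaves at least three of five members, which is still a majority; losing both two-node DCs does not.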
Moving on to HA sharding
Each shard needs to be a replica set - same rules apply as above. Balancing can be run in a window (this is cool!): you can set an activeWindow to only run the balancer at night. Sweet!
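The only subtle part of a nightly window is that it usually wraps past midnight. This is a sketch of that check in plain Python, not MongoDB's implementation; the function name and the half-open interval are my own assumptions.

```python
from datetime import time

def in_active_window(now, start, stop):
    """True if `now` falls in [start, stop), allowing windows that wrap midnight."""
    if start <= stop:
        return start <= now < stop
    # Window wraps midnight, e.g. 23:00 -> 06:00.
    return now >= start or now < stop
```

With a 23:00 to 06:00 window, 02:00 is inside the window and noon is not.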
Config servers need to be on at least 2 different power/network zones. Ideally just put in three separate DCs. Use host names rather than IP addresses: much easier to move a config server. Take backups of config servers. Important note (saw this earlier too in Richard’s talk): not a replica set. To bring new nodes online you need to manually move the data. Not a problem if a config server is down for a day or something: just won’t do splits/migrates.
Run one mongos per app-server. Don’t need to worry about scaling mongos. Saves a network hop for many ops. If you really don’t like this, run a pool per power region with a load-balancer in front.
Application-level tips:
Handle spikes: queue non-synchronous writes, isolate components and features. Can your site handle going into a read-only mode? That helps a lot when dealing w/ issues.
Monitor! Load, disk, CPU, but most importantly I/O (iostat). Alerts go hand in hand w/ monitoring.
Have good procedures for backups, adding replica set members, adding shards, etc. Practice (in staging) (which ought to be the same as prod). Randomly shut down boxes & load test as much as possible.
Recap
That’s all for today. Another great event by 10gen; I think my favorite talks of the day were Kyle’s and Richard’s, but all of them were great. Hope you enjoyed the blog posts!
We’re live-blogging from MongoSV today. Here’s a link to the entire series of posts.
Presented by Greg Brockman from Stripe
The actual title of this talk is “There’s a Monster in My Closet”, but I thought the subtitle would be more elucidating. This talk is packed! Actually, all of the talks so far today have been pretty packed - great crowd here.
Monster is the name of the event processing system Greg built for Stripe. Been using it in production for a few months now, and it’s built on top of MongoDB. The concept of event processing is that you want to glean some information from lots of real-time events that are happening (incremental stats, real time analytics, trending topics, etc.). Stripe uses it for fraud detection, dashboards, and more. Now we’re going to get a live demo!
He’s showing a blog-post generator that he’s written, going to use Monster to monitor the content of the posts that it’s spitting out. Live coding a “model”, which looks like sort of a quantum of reporting. Logging a new event per-sentence that gets generated. Now we need a consumer to actually do something with the events. The consumer gets streamed events and just needs to “do something”. Doesn’t worry about storage, generation, etc. Registers for classes of events and has a `consume()` method. Pretty simple, but flexible. Consumer is logging when generated sentences are “too long”.
Question: Monster vs celery/beanstalkd/resque? Answer: when using a job queue the act of logging implies an “action”/job. With Monster/event queuing the goal is to totally decouple logging from performing actions on logs. Can add new consumers later, etc. Events persist, not ephemeral.
Consumer uses polling to get new events.
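Here is my rough reconstruction of the consumer idea in Python. It is not Monster's actual API: the class names, the `event_classes` attribute, and the in-memory `EventLog` are all invented for illustration. The only parts taken from the talk are that consumers register for classes of events, expose a `consume()` method, poll for new events, and that events persist so consumers can be added later.

```python
class Consumer:
    event_classes = ()          # classes of events this consumer registers for

    def consume(self, event):
        raise NotImplementedError


class LongSentenceConsumer(Consumer):
    event_classes = ("sentence",)

    def __init__(self, max_words=20):
        self.max_words = max_words
        self.flagged = []

    def consume(self, event):
        # "Do something" with the event: here, remember sentences that are too long.
        if len(event["text"].split()) > self.max_words:
            self.flagged.append(event["text"])


class EventLog:
    """In-memory stand-in for a persistent event store."""

    def __init__(self):
        self.events = []        # append-only; events are never removed
        self.offsets = {}       # consumer -> index of the next unread event

    def log(self, event_class, **fields):
        self.events.append({"class": event_class, **fields})

    def poll(self, consumer):
        start = self.offsets.get(consumer, 0)
        for event in self.events[start:]:
            if event["class"] in consumer.event_classes:
                consumer.consume(event)
        self.offsets[consumer] = len(self.events)
```

Because logging never references a consumer, a consumer created after the fact still sees every persisted event on its first poll.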
Now we’re hearing why they chose MongoDB. Replica sets are a major reason, for HA. They also wanted a document store: easy to use, so developers will all use it. They need atomic operations (talking about things like findAndModify). Seems like a lot of the talks today have been mentioning findAndModify. They like automatic collection creation, from a deployment perspective. No migrations, etc. Finally, background index building is really important for Stripe. Can create new indexes w/o compromising availability.
Tradeoff: no transactions. This is the one thing they’d really like for Monster (mainly for DR). The particular case that they need it for is what they call a Stateful Consumer - can modify the state of an event while consuming it. They basically build transactions at the application layer here.
Like the previous talk, they aren’t using capped collections. They don’t expire old events. They also aren’t using sharding (these are in response to audience questions again). Environment is a 3-node replica set on AWS (large instances). Not using EBS except for on one of the secondaries.
Presented by Luke Gotszling
About.me uses MongoDB for different pieces of infrastructure, but this talk is just about queuing.
Originally ran a 3-node RabbitMQ cluster, without disk persistence. Were having trouble diagnosing issues at scale. Looked at some other AMQP options, but decided on MongoDB.
Benefits: async ops, per-message (document) atomicity, batch processing, periodic processing, durability, sharding, operational familiarity (n.b. that would be the big one for me!). One drawback: AMQP push model needs to be emulated with MongoDB polling. To model topic matching, they’re using a regex. One thing they don’t (can’t) do with Mongo: fanout.
Use a capped collection? It has better performance but is limited to a single node and FIFO. They use an uncapped collection: can shard. Can get semi-FIFO but not strict.
Implementation:
Each message is a document. To create a message, just insert. The document has a queue field (string id) and a payload (serialized data).
To consume a message they use a findAndModify to grab and remove a document atomically. They index on (queue, _id).
That’s pretty much it! This would be pretty simple to implement in any language (he’s showing an example in the shell + in Python).
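I didn't copy down the code from the slides, so here is a sketch of the same shape using an in-memory stand-in rather than a real driver. The class and method names are made up; the lock plays the role that findAndModify-with-remove plays on the server, making "find the oldest message for this queue and remove it" a single atomic step.

```python
import itertools
import threading


class InMemoryQueueStore:
    """Stand-in for a MongoDB collection; NOT a real driver API."""

    def __init__(self):
        self._docs = []                      # kept in insertion (_id) order
        self._ids = itertools.count(1)
        self._lock = threading.Lock()

    def publish(self, queue, payload):
        """Creating a message is just an insert."""
        with self._lock:
            doc = {"_id": next(self._ids), "queue": queue, "payload": payload}
            self._docs.append(doc)
            return doc["_id"]

    def consume(self, queue):
        """Atomically find and remove the oldest message for `queue`."""
        with self._lock:
            for i, doc in enumerate(self._docs):
                if doc["queue"] == queue:
                    return self._docs.pop(i)
            return None                      # nothing queued: caller polls again
```

A consumer loop would call `consume()` and sleep briefly whenever it returns `None`, which is the polling emulation of AMQP push mentioned above.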
Benchmarks they ran showed MongoDB outperforming RabbitMQ for message creation by 19% (this is a single-node benchmark on a laptop, FYI). For consumption MongoDB again does very well (outperforming RabbitMQ for different levels of concurrency).
FindAndModify is blocking, so you will see high lock % w/ lots of concurrent consumers.
Pros and Cons
Pro: familiar, sharding, durability/persistence, low operational overhead, optional use of advanced queries.
Cons: Not AMQP, needs to poll, performance depends on polling frequency + concurrency, fewer libraries available (for Python there’s a library called Kombu), locking for findAndModify.
Presented by Richard Kreuter
These are cautionary tales, but also an opportunity for schadenfreude. All true stories.
1. The first example is of a user who ran w/o journaling. Important to note that journaling isn’t a panacea: OSes can have bugs, hardware can fail, etc. Journaling is good, but replication is even more good.
So. No journaling. All of the mongod processes were in the same data center. Lost power and everything shut down. When things came up they needed to run `repair`. (This is like fsck for MongoDB). They were warned that it would take a long time (about 12 hrs in their case). They decided to go back into production w/o repairing to avoid the 12 hr wait.
After a few days, things weren’t happy. Mongod was stable but some cursors were aborting when they saw corruption (they were seeing “Invalid BSON” in the log files). They didn’t notice the log file problem but just saw queries aborting. One of the cursors that was aborting was the replication cursor. The corruption was such that mongod could run, but queries were failing after a certain corruption point.
Note: this isn’t guaranteed operational behavior, just the way this particular corruption played out. Ultimately, the fix was to take the system offline and repair. It fixed the problem but took 12 hrs.
tl;dr Use journaling unless you’re absolutely sure you don’t need it.
2. A user decided to shut down a config server (in a sharded deployment) and delete all of its data files. The config server is one of three independent servers that use 2PC to maintain identical copies of their data.
Once the config server was down (because 2 were still up) reads and writes still worked but data rebalancing stopped. At some point they brought down the second and third config servers as well. At the time they had some mongos servers running that probably had config server data cached, but that data isn’t persisted. They had two options:
1. Dump each shard’s data. Set up a new cluster and re-load all of the data.
2. Analyze data on each shard to figure out what the actual ranges of data were on each shard. That could reconstruct the data that was lost. This is a very tricky/hard program to write.
Either of these approaches could lead to a problem because sometimes there are multiple copies of data across shards (during migrations from one shard to another, after an aborted migration, etc.). This is normally the job of mongos/config servers. They ended up having to resolve those conflicts manually.
tl;dr Don’t delete your config servers!
3. User was seeing replication lagging and write ops taking forever on the primary. Application was basically unresponsive. Primary was showing large #s of page faults and a high lock % (time spent holding the write lock).
As it turned out, 100% of their write load consisted of pushing elements onto lists. Number of pushes grew with the square of the number of users. They also didn’t have indexes on userId. So 100% of writes and reads were table scans, and each write action was N^2. This load was saturating the primary, so secondaries couldn’t keep up.
Adding an index fixed immediate issues, but they needed schema/application logic changes for long-term fix.
tl;dr Pay attention to your indexes!
4. A user deployed but vastly underestimated their uptake. Needed to shard within 3 hours of launch.
Going from N shards to N+1 shards is easier for large N. In any case, it’s hard if you’re already overloaded - can’t migrate data. Luckily, they had a few collections, each equally written to.
All they needed to do was set up some replica sets and dump out specific collections to move them off. Then they set up config servers manually. Once they had headroom, they added shards and used a more conventional sharding setup.
tl;dr Get reasonable performance measurements in advance, and a reasonable set of requirements. Need to do capacity planning in advance or over-provision: too hard to add capacity when it’s absolutely needed.
Presented by Kyle Banker
Kyle’s strategy is to start with a normalized representation and then embed for simplicity and optimization. This reminds me of our data-modeling post.
1. Hierarchies
Two strategies; first is to store the canonical hierarchy in a single document. That allows atomic updates of the hierarchy, but can make other ops difficult. The second is to store a list of ancestors in each document; that’s the one we’ll talk about. Each node has a parent_id with the immediate parent. Each also has an array of ancestors. It’s easy to display a single node, but it’s also easy to display all descendants of a node (by looking inside the ancestors array). What about updates?
There’s a new helper to compute ancestors for a node and overwrite the ancestors array (in case we insert in the middle of the hierarchy). When we do an insert in the middle we’ll need to run the helper on each descendant, since they all have a new ancestor.
tl;dr use arrays and multi-key indexes (and the positional operator) to deal with hierarchies.
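A small sketch of the ancestors-array approach, using plain dicts in place of a collection. The function names and the `nodes` layout are my own; only the `parent_id` and `ancestors` fields come from the talk. In MongoDB the descendants lookup would be a query on the `ancestors` array backed by a multi-key index.

```python
def compute_ancestors(nodes, node_id):
    """Walk parent_id links up to the root; return ancestors, root first."""
    ancestors = []
    parent_id = nodes[node_id]["parent_id"]
    while parent_id is not None:
        ancestors.append(parent_id)
        parent_id = nodes[parent_id]["parent_id"]
    ancestors.reverse()
    return ancestors


def descendants(nodes, node_id):
    """All nodes that list node_id somewhere in their ancestors array."""
    return sorted(nid for nid, n in nodes.items() if node_id in n["ancestors"])


def insert_between(nodes, new_id, parent_id, child_id):
    """Insert new_id between parent_id and child_id, then fix up ancestors."""
    affected = [child_id] + descendants(nodes, child_id)
    nodes[new_id] = {"parent_id": parent_id, "ancestors": []}
    nodes[child_id]["parent_id"] = new_id
    nodes[new_id]["ancestors"] = compute_ancestors(nodes, new_id)
    # Every descendant of the new node has gained an ancestor.
    for nid in affected:
        nodes[nid]["ancestors"] = compute_ancestors(nodes, nid)
```

The mid-hierarchy insert is the expensive case: the helper has to be rerun for the moved child and everything below it.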
2. Ticketing
What about ticketing, don’t you need transactions? How can you do transactions w/o RDBMS style transactions? Distributed systems, long-running transactions, and contentious environments might require different strategies (see paper: your coffee shop doesn’t use two-phase commit). Let’s see an example:
Two collections. One is a map of available seats for an event. The second has one document for each seat, with a state (available or not) and price.
Use find-and-modify as a TAS (test-and-set) operation to update the state of seats to “Cart”. If it succeeds for all of the seats then we’re good to go. If not, we can manually roll back state. Basically do each state-change manually and be prepared to roll back if needed.
This isn’t right for everything but it works for some.
3. Feed reader example
Four collections: users, feeds, entries and buckets.
Users have username and an array of feeds. Each feed in that array is a document with an ID and denormalized name. We’ll index on username.
Each document in the feeds collection has a URL, name, subscriber count (denormalizing here again) and date of last entry.
To add a feed, do an upsert to insert if missing or increment subscriber count otherwise (good use of upsert). Also need to do an $addToSet on the user to add the feed to the feeds array.
Removing means a $pull from the user’s feed array and a $inc of -1 on the feed’s subscriber count.
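To make the bookkeeping concrete, here is the subscribe/unsubscribe logic over plain dicts. Assumptions on my part: feeds are keyed by URL, the function names are invented, and I only bump the counter when the user was not already subscribed (the talk didn't say how repeat subscriptions are handled). The comments note which MongoDB operation each step stands in for.

```python
def subscribe(users, feeds, username, url, name):
    # Upsert: create the feed document if it is missing.
    feed = feeds.setdefault(url, {"name": name, "subscribers": 0})
    user_feeds = users[username]["feeds"]
    # $addToSet: only add the feed if it is not already in the array.
    if not any(f["url"] == url for f in user_feeds):
        user_feeds.append({"url": url, "name": feed["name"]})  # denormalized name
        feed["subscribers"] += 1                               # $inc by 1


def unsubscribe(users, feeds, username, url):
    user_feeds = users[username]["feeds"]
    remaining = [f for f in user_feeds if f["url"] != url]     # $pull
    if len(remaining) != len(user_feeds):
        users[username]["feeds"] = remaining
        feeds[url]["subscribers"] -= 1                         # $inc by -1
```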
Entries documents each have content for a single entry, with ID of feed it belongs to.
We need a way to show an individual user’s feed: that query can be expensive if done naively. That’s what bucketing is for: only query for latest entries and then store them in a bucket.
A bucket has a user_id, timestamp, and list of entries. We can just query for buckets to get a user’s feed. Again, we’re selectively denormalizing here to get good locality and performance. Use rich documents for caching. This gives good sharded locality, too.
Presented by Kenny Gorman
Kenny is getting started, talking about performance tuning based on experience at Shutterfly. They have 8 MongoDB clusters in production with ~30 servers. Not cloud based: all own hardware and datacenters.
MongoDB performance tuning is similar to traditional RDBMS tuning. Looking at queries, indexes, etc. If performance isn’t good on a single server then don’t look to sharding, reading from replicas, etc. Single server performance is critical.
Modeling is key. Schema design can be really important for performance (recommends talks later on by Eliot & Kyle).
Know when to stop tuning: prioritize what is important/adequate for the business/application. What needs to be fast? Build tuning into dev. lifecycle, don’t wait until there’s an issue. Tuning is “personal”: need to know your problem/domain.
MongoDB is really fast when read only, writes start to impact performance. Important consideration during design phase.
The profiler. Writes to db.system.profile collection. Recommendation is to turn it on and leave it on: low overhead. Look for full scans (nreturned vs nscanned) and updates (ideally you want fastmod, i.e. in-place updates; look for moved & key updates).
Should graph response times over time (from the system.profile collection). Shows performance over time of db. To look at the profiling data just do `show profile` from the shell.
Showing examples of data from the profiler: here’s an example where nscanned is 10000 and nreturned is 1: we need an index! Another example where need to move the document due to an update (keyword “moved” in the profile doc.). Now showing an example using $inc - you’ll see “fastmod” in the profile document - that’s good!
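The nscanned vs nreturned gap is easy to reproduce in a toy model. This sketch counts how many documents are examined with and without an index. It uses a dict as the "index", which is a hash lookup rather than MongoDB's B-tree, and the function names are mine; the point is only the ratio.

```python
from collections import defaultdict


def full_scan(docs, userid):
    """No index: every document is examined. Returns (results, nscanned)."""
    nscanned = 0
    results = []
    for doc in docs:
        nscanned += 1
        if doc["userid"] == userid:
            results.append(doc)
    return results, nscanned


def build_index(docs, field):
    """Toy index: field value -> list of matching documents."""
    index = defaultdict(list)
    for doc in docs:
        index[doc[field]].append(doc)
    return index


def indexed_lookup(index, userid):
    """With an index only matching documents are touched."""
    results = index.get(userid, [])
    return results, len(results)
```

With 10,000 documents and one match, the scan examines all 10,000 while the indexed lookup examines one, which is the pattern to look for in the profiler output.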
Now talking about explain(). Use during development, don’t wait. This actually runs the query when you call it. When you find a bad op using the profiler, run explain on it to get more info: shows index usage, yields, covered indexes, nscanned vs nreturned. Another recommendation: run explain() twice to see difference when data is in memory. Showing the difference between a query w/ and w/o an index in terms of explain.
Now talking about covered indexes: need to do a projection that says we don’t need _id: `db.test.find({userid: 10}, {_id: 0, userid: 1})`. When you don’t need _id it’s possible to respond to the query using the index only.
Architecture tips: split on functional areas first to different replica set clusters, then worry about sharding those (possibly). Do reads off of slaves when you can, but be sure your app can handle inconsistent reads first. Also, use slaves for maintenance (index compaction, etc.). Move reports & backups to slaves, too. One mongod instance per machine: keeps things simple for introspection.
Emphasizing the importance of minimizing writes.
Now we’re talking about data locality. When you’re doing a query it’s best if the results are as dense as possible (as few blocks on disk). How do you maintain this? Here’s an example of how to see this: need to include `$diskLoc` in your query document, and finish with a `.showDiskLoc()` (analogous to `.explain()`).
Total performance is a function of write performance. Keep an eye on lock % and queue size: how much is the DB waiting for writes. A trick (for pre 2.0 when data > RAM) is to do read before write: spend more time in read lock rather than write lock. Tune for fastmods: reduce moves (maybe by pre-padding documents). Evaluate indexes for key changes, minimize # of indexes if unused. Look for places to do inserts instead of updates.
What about scaling reads? They scale easily if writes are tuned. Identify reads that can be performed on slaves. Make sure you have enough RAM for indexes - can check the mongostat “faults” column for cache misses. Minimize I/O per query (back to data locality).
Tools: mongostat (look for faults & lock % / queue length). currentOp() to see what’s waiting. mtop to get a picture of current session level information. iostat to see how much physical I/O is going on. Do load testing before going live. Use MMS (or some other monitoring system).
What if you still need more performance after doing all of this tuning? One option is to use SSDs. Shutterfly uses Facebook’s flashcache: kernel module to cache data on SSD. Designed for MySQL/InnoDB. SSD in front of a disk, but exposed as a single mount point. This only makes sense when you have lots of physical I/O. Shutterfly saw a speedup of 500% w/ flashcache. A benefit is that you can delay sharding: less complexity.
I made the trip to Santa Clara for 10gen’s big MongoDB conference, MongoSV, which starts in just a few minutes. It should be a great event, and I thought it’d be fun to report back on the event as it happens for those of you who couldn’t make the trip.
I’ll be trying to update this blog with posts from each session I attend, so subscribe or refresh to get the latest. If there’s a specific session you’d like to hear about, let me know in the comments and I’ll make an effort to get to that one!
Here’s a link to the entire series of posts.
MongoDB’s Storage Engine Bit by Bit
Presented by Mathias Stearn
Mathias is set to present about MongoDB storage internals and the room is filling up. Somebody asks a question: what is his favorite Mongo feature? He’s partial to find-and-modify but his favorite is just general ease-of-use. He’s also talking a bit about the new aggregation framework which should be coming out in 2.2. Declarative aggregations: much easier than map reduce and better performance. Another question: what’s the most common mistake? One: thinking MongoDB is the same as a relational DB. Two: thinking MongoDB is totally different from a relational DB. Basic concepts still matter but also can’t think “too relational”. The room is now SRO and the talk is starting.
Mathias is talking about the structure of the files in /data/db. You’ll see mongod.lock, which is a lock file that just contains the PID (a few bytes). There’s also a .ns file per DB, which is 16MB and is basically just a big hash table (more details later). There are also data files: dbname.0, dbname.1, etc. Those files grow up to 2GB, and there’s always a pre-allocated empty file: this is why an almost empty DB will still use >200MB of space. Trading off space for speed.
Now we’re hearing about mmap: an OS feature that maps a file on disk into virtual memory. A common confusion point is that “MongoDB is an in-memory DB”. Mongo maps to virtual memory: doesn’t need to fit in RAM. Now he’s showing a diagram of virtual address space: kernel space, stack, heap, program text, etc. Program text is mmap’ed just like Mongo data files. Showing where the mapped data files live in the virtual address space.
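If mmap is new to you, this is a tiny demonstration using Python's standard `mmap` module. It has nothing to do with MongoDB's code; the function name is mine. It pre-allocates a one-page file, maps it, writes through the mapping as if it were ordinary memory, and then reads the bytes back through the regular file API.

```python
import mmap
import os
import tempfile


def mmap_roundtrip(payload: bytes) -> bytes:
    """Write `payload` through a memory mapping and read it back from the file."""
    if len(payload) > mmap.PAGESIZE:
        raise ValueError("payload must fit in one page for this sketch")
    fd, path = tempfile.mkstemp()
    try:
        os.write(fd, b"\x00" * mmap.PAGESIZE)        # pre-allocate one page
        with mmap.mmap(fd, mmap.PAGESIZE) as view:
            view[:len(payload)] = payload            # looks like a memory write
            view.flush()                             # push dirty pages to the file
        os.close(fd)
        fd = None
        with open(path, "rb") as f:
            return f.read(len(payload))
    finally:
        if fd is not None:
            os.close(fd)
        os.remove(path)
```

The OS decides which pages of the mapping actually live in RAM, which is why a mapped file can be larger than physical memory.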
Now Mathias is talking about the 32-bit limitation caused by relying on mmap: roughly 2.5GB of space left over in the address space after kernel, heap, stack, program text, etc. (start w/ 4GB: 2^32). With journaling this is effectively even smaller: don’t use 32-bit. With journaling you still effectively have 64TB of capacity w/ 64-bit.
Describing a doubly-linked list: data structure used a lot in internals. Each element has both a next & prev pointer to allow traversal in either direction.
Now we’re talking about the .ns file: Namespace Details. 16MB divided up into ~1KB buckets. Inside each bucket we have the name of a collection, some stats (size, count, etc.), pointers to first extent & last extent (doubly-linked list), free list (actually an array of bucketed free-lists), and index data. There are namespaces for collections and also one for each index. There’s one special namespace: $freelist for free extents. ~16000 namespaces per database, which is usually enough but can be configured if more is needed.
The pointers are stored as DiskLocs, which is a data structure with a fileNum and an offset: two ints (a 64-bit struct). Note that max offset is 2^31 - 1 (the offset is a signed int): that lines up with the 2GB max file size.
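A quick way to picture a DiskLoc is as two 32-bit ints packed into 8 bytes. This uses Python's `struct` module; the field order, signedness, and little-endian byte order are assumptions for illustration, not something read out of the MongoDB source.

```python
import struct

# Assumed layout: (fileNum, offset) as two little-endian signed 32-bit ints.
DISKLOC = struct.Struct("<ii")


def pack_diskloc(file_num, offset):
    """Encode a (fileNum, offset) pair into 8 bytes."""
    return DISKLOC.pack(file_num, offset)


def unpack_diskloc(raw):
    """Decode 8 bytes back into a (fileNum, offset) tuple."""
    return DISKLOC.unpack(raw)
```

Following a pointer then means: open data file number `fileNum` and jump `offset` bytes in.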
Now we’re talking about data files (.0, .1, etc.). Each data file is broken up into extents: contiguous blocks w/in a file that are owned by a single namespace. Each extent has a location, pointers to the next and previous extents, pointers to the first and last records in the extent, and the length of the extent. Then, obviously, there’s a data section. The first and last record pointers are just 4-byte offsets into the data section. Extents are nice because they keep data local per namespace.
Now on to records. Each record has a length, offset w/in the extent, and then offsets of the next and previous record. Lastly there’s data (BSON objects, b-tree buckets, etc.). How big is the data section? In the BSON case it’s bigger than the object it contains: there’s some padding that gets pre-allocated to try to avoid moves when documents grow. That padding factor is dynamic and calculated by how often moves end up occurring. The more documents move, the larger the padding factor will be. It’s local to the namespace (i.e. index collections never grow). Padding factor can vary between 1 (no padding) and 2 (double data size), starts at 1. The extent offset is necessary to go from a record back to the extent header data.
Raw queries ($natural order) just walk over the next/prev links directly.
Talking about the “swiss-cheese problem”. If documents change sizes a lot you can end up with gaps that are too small to be used by any new documents: hence the new “compact” option to remove those gaps. This is a blocking operation: best to run on a secondary node and then step-down the master. Right now there aren’t great metrics about fragmentation: can get some clue by looking at dataSize vs storageSize. If storageSize - lastExtentSize >> dataSize then there is a lot of empty space that compact would clean up.
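The rule of thumb translates into a one-line ratio. The function names and the 2.0 threshold are my own choices, since "much greater than" wasn't quantified in the talk; the three inputs are the sizes mentioned above, in bytes.

```python
def fragmentation_ratio(data_size, storage_size, last_extent_size):
    """(storageSize - lastExtentSize) / dataSize; around 1.0 means little waste."""
    if data_size <= 0:
        raise ValueError("data_size must be positive")
    return (storage_size - last_extent_size) / data_size


def looks_fragmented(data_size, storage_size, last_extent_size, threshold=2.0):
    """Hypothetical cutoff: flag collections whose ratio exceeds `threshold`."""
    return fragmentation_ratio(data_size, storage_size, last_extent_size) > threshold
```

The last extent is subtracted because it is typically still being filled, so its free space isn't fragmentation.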
On initial RS sync or re-sync you’ll basically get compaction too.
Now we’re talking about the B-tree implementation. A regular B-tree w/ a few modifications. Each bucket is 8KB, w/ keys in sorted order. One modification is the way splits happen: rather than always splitting in the middle of the node Mongo will do a 90/10 split if you’re doing incremental / ordered insertions. That can save a lot of space.
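Why does the split point matter for ordered inserts? Here is a toy simulation that only tracks how many keys sit in each leaf bucket. It is not MongoDB's B-tree code: the capacity is a made-up key count rather than 8KB, and the function name is mine. With ascending keys every insert lands in the right-most bucket, so whatever stays behind in the left bucket after a split is never filled further.

```python
def buckets_after_ordered_inserts(n_keys, capacity, left_percent):
    """Count leaf buckets after inserting n_keys strictly ascending keys.

    left_percent is the share of a full bucket that stays in the left
    (old) bucket when it splits: 50 for a middle split, 90 for 90/10.
    """
    buckets = [0]                       # key count per leaf bucket
    for _ in range(n_keys):
        if buckets[-1] == capacity:     # right-most bucket is full: split it
            keep = capacity * left_percent // 100
            buckets[-1] = keep
            buckets.append(capacity - keep)
        buckets[-1] += 1                # ascending key goes to the last bucket
    return len(buckets)
```

Comparing `left_percent=50` with `left_percent=90` for the same inserts shows the 90/10 variant ending up with noticeably fewer, fuller buckets.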
Bucket data structure: parent link, link to the right-most child, and key nodes. Key nodes are fixed size blocks, each with a link to the left child, data record, and a key offset. The offset points into a key objects section (BSON object w/ stripped key names and values). That key objects section actually has a new format in 2.0 (v1 index format): more compact than BSON. Key nodes are sorted (can do binary search), but key objects are unsorted and not fixed size.