
There are many factors that limit the available bandwidth of a network link from point A to point B. Knowing and expecting something reasonably close to the theoretical maximum bandwidth is one thing. However, the latency of the link can vastly affect available throughput. The key quantity here is the Bandwidth Delay Product. It can be thought of as the “memory” of the link (in practice, that memory is the send and receive buffers). This came into play when I was setting up a global monitoring system for my client’s geographically diverse data centers.

Warning: Math

The Bandwidth Delay Product, $BDP$ is found by multiplying the theoretical max throughput of the link, $BW$ by the Round Trip Time, $RTT$.

$$BW \times RTT = BDP$$

For example, a 100Mbps link at 80ms latency:

$$\frac{12.5 MiB}{seconds} \times 0.08 seconds = 1 MiB$$

This indicates that the TCP buffers on either side of the link must be able to store 1 MiB of data. There are many, many places in modern code where the default TCP buffer size is set to 64 KiB.

$$\frac{64 KiB}{0.08 seconds} = 800 KiB/s = 6.25 Mbps$$

Ouch. That 64 KiB buffer size really hurt the throughput on the 100 Mbps link. In fact, 64 KiB is really painful for trans-continental links.
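
To play with the arithmetic, here is a small Python sketch of the calculation above. The link speed, RTT, and buffer size are simply the example numbers from this post, not measurements.

# Rough Bandwidth Delay Product arithmetic using the example numbers
# from this post: a 100 Mbps link, 80 ms RTT, and 64 KiB buffers.

def bdp(bandwidth_bytes_per_sec, rtt_seconds):
    # Buffer size (bytes) needed to keep the link full.
    return bandwidth_bytes_per_sec * rtt_seconds

def max_throughput(buffer_bytes, rtt_seconds):
    # Best case throughput (bytes/sec) with a fixed TCP buffer.
    return float(buffer_bytes) / rtt_seconds

link = 100 * 1000 * 1000 / 8.0   # 100 Mbps in bytes per second
rtt = 0.080                      # 80 ms round trip time

print("BDP: %.0f KiB" % (bdp(link, rtt) / 1024.0))
print("Throughput with 64 KiB buffers: %.2f Mbps"
      % (max_throughput(64 * 1024, rtt) * 8 / 1e6))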

In Pictures

Below is a logarithmic graph of bandwidth vs latency at 64 KiB buffer sizes. The curve is available bandwidth and the blue vertical line is at the 80ms point.

Solutions

Check your socket code. There may be assumptions, limitations, or a configuration option in the code.

Kernel-wise, most modern Linux distributions come fairly well tuned for TCP performance. You may need to tune the default and maximum memory that can be allocated to a socket:

  • net.core.rmem_default
  • net.core.rmem_max
  • net.core.wmem_default
  • net.core.wmem_max

Also check the TCP minimum, initial, and maximum buffer sizes. These settings also apply to TCP over IPv6.

  • net.ipv4.tcp_rmem
  • net.ipv4.tcp_wmem
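
From userspace you can also see what buffer sizes your application is actually granted. Below is a hedged Python sketch: the 4 MiB request is an arbitrary illustrative value, and the kernel still caps it at the rmem_max/wmem_max settings above.

import socket

# Ask for larger TCP buffers from userspace.  The kernel silently caps
# the request at net.core.rmem_max / net.core.wmem_max, and explicitly
# setting SO_RCVBUF disables receive buffer autotuning for this socket.
WANT = 4 * 1024 * 1024   # 4 MiB, an arbitrary example value

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, WANT)
s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, WANT)

# Linux reports roughly double the requested value to account for
# bookkeeping overhead, so don't be surprised by the numbers.
print("SO_RCVBUF: %d" % s.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF))
print("SO_SNDBUF: %d" % s.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF))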

In Conclusion

I enjoy technical solutions to problems. If there’s a little math involved and a trick or two, I tend to follow that path for a solution. But there is one thing that will affect your TCP throughput even more than the Bandwidth Delay Product. Be absolutely certain that the other side of the connection is reading fast enough. TCP is designed to put back pressure on the writer if the reader isn’t fast enough or purposely slows down.

Armed with this knowledge and a few tuning options, I chased a problem for too long. In reality, there was some network congestion on a trans-continental link (no surprise there) and the tool I was using had a bug. It would block on TCP writes in its event loop, which caused it to slow down on reads, which meant the daemon on the other side would block on TCP writes.

Now, if only finding that bug had been as simple.

Notes

f(x) = (64*1024)/x
set ylabel "Bandwidth (Bytes Per Second)"
set xlabel "Latency (Seconds)"
set title "Throughput with 64K TCP Window / Buffer Size"
set logscale y 2
set key off
set label "100 Mbps" at .2,18107200
set label "1 Gbps" at .2,1.298576e9
set label "80 ms" at 0.1,1.04858e6
set arrow from 0.08,65535 to 0.08,2.097152e9 nohead lc rgb 'blue'
set terminal png size 800,600
set output 'output.png'
plot [0:1] [65536:2.097152e9] f(x), 13107200 with dots, 1.048576e9 with dots


I’ve been experimenting with Cyanite to make my Graphite cluster more reliable. The main problem I face is that when a data node goes down, the Graphite web app more or less stops responding to requests. Cyanite is a daemon written in Clojure that runs on the JVM. The daemon is stateless and stores timeseries data in Cassandra.

I found the documentation a bit lacking, so here’s how to set up Cyanite to build a scalable Graphite storage backend.

  1. Acquire a Cassandra database cluster. You will need at least Cassandra 3.4. The Makefile tests use Cassandra 3.5. I used Cassandra 3.7 in my experiments, which is the current release as of this writing. (Note Cassandra’s new Tick-Tock based release cycle.)

    Parts of the documentation indicated that Elasticsearch was required. That is no longer the case. Cyanite must store a searchable index of the metrics it has data points for so that it can resolve glob requests into a list of metrics. Example:

    carbon.agents.*.metricsReceived
    

    This is now done in Cassandra using SASI indexes, which enable CQL SELECT statements to use the LIKE operator. This is the feature that requires a more recent Cassandra version than you may be running in production.

  2. Clone the Cyanite Git repository. There are no tags or releases. However, the rumor at Monitorama 2016 is that Cyanite is a stable and scalable platform. So I just grabbed the master branch.

    git clone https://github.com/pyr/cyanite.git
    
  3. Create a Cassandra user depending on your local policy. Import the schema to initially create the keyspace you will use. The schema is found in the repository:

    doc/schema.cql
    

    Here, I altered the schema to set the replication factor I wanted. So I created my keyspace like this:

    CREATE KEYSPACE IF NOT EXISTS metric WITH replication =
    {'class': 'SimpleStrategy', 'replication_factor': '3'}
    AND durable_writes = true;
    

    I’m only replicating in a Cassandra database that lives in a single data center. No cross data center replication strategies here…yet.

  4. Install Leiningen. This is the build system tool used by the Cyanite project. It’s very friendly-seeming and installs locally into your home directory. This allows you to build JARs and other distributable versions of the code.

  5. I need to distribute code as Debian packages for Ubuntu. Fortunately, we have a target to build just that.

    $ cd path/to/cyanite/repo
    $ lein fatdeb
    

    This should produce artifacts in the target/ directory.

  6. Install the Cyanite packages. Configure /etc/cyanite.yaml to match your storage schema file (from carbon-cache.py) and with the connection information for your Cassandra cluster.

    An example configuration with additional documentation can be found in the Cyanite repo.

    doc/cyanite.yaml
    

    Here is a sanitized version of my config. This required some parsing of the source to find needed options.

    # Retention rules from storage-schema.conf
    engine:
      rules:
        '^1sec\.*': [ "1s:14d" ]
        '^1min\.*': [ "60s:760d" ]
        '^carbon\..*': [ "60s:30d", "15m:2y" ]
        default: [ "60s:30d" ]

    # IP and PORT where the Cyanite REST API will bind
    api:
      port: 8080
      host: 0.0.0.0

    # An input, carbon line protocol
    input:
      - type: carbon
        port: 2003
        host: 0.0.0.0

    # Store the metric index in Cassandra SASI indexes
    index:
      type: cassandra
      keyspace: 'metric'
      username: XXXXXX
      password: YYYYYY
      cluster:
        - cas-000.foobar.com
        - cas-001.foobar.com
        - cas-002.foobar.com

    # Time drift calculations.  I use / trust NTP.
    drift:
      type: no-op

    # Timeseries are stored in Cassandra
    store:
      keyspace: 'metric'
      username: XXXXXX
      password: YYYYYY
      cluster:
        - cas-000.foobar.com
        - cas-001.foobar.com
        - cas-002.foobar.com

    # Logging configuration.  See: https://github.com/pyr/unilog
    logging:
      level: info
      console: true
      files:
        - "/var/log/cyanite/cyanite.log"
      overrides:
        io.cyanite: "debug"
    
  7. Cyanite should be startable at this point. You can test that it accepts carbon line protocol metrics and that they are returned by the Cyanite REST API (a short test sketch follows this list).

  8. Package and install Graphite-API along with the Cyanite Python module. Graphite-API is a stripped-down version of the Graphite web application that uses pluggable finders to search different storage backends as a Flask application. Python’s Pip can easily find these packages. This is a WSGI application, so deploy it however you normally deploy such applications. I use mod_wsgi with Apache to run this on port 80.

    Here is a sample /etc/graphite-api.yaml that configures Graphite-API to use the Cyanite plugin and query the local Cyanite daemon.

    # Where the graphite-api search index is built
    search_index: /var/tmp/graphite-index

    # Plugins to use to find metrics
    finders:
      - cyanite.CyaniteFinder

    # Additional Graphite functions
    functions:
      - graphite_api.functions.SeriesFunctions
      - graphite_api.functions.PieFunctions

    # Cyanite Specific options
    cyanite:
      urls:
        - http://127.0.0.1:8080

    time_zone: UTC
    

    My plan here is that I can deploy many of these Cyanite / Graphite-API machines in a load balanced fashion to support my query and write loads. They are completely stateless like any good web application so choose your favorite load balancing technique.
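
As a quick sanity check for step 7, a test metric can be pushed at the Cyanite carbon listener with a few lines of Python. The host, port, and metric name below are placeholders for my setup; the port matches the carbon input block in cyanite.yaml above.

import socket
import time

CYANITE_HOST = "127.0.0.1"   # placeholder: your Cyanite host
CARBON_PORT = 2003           # matches the carbon input in cyanite.yaml

# Carbon line protocol: "<metric path> <value> <unix timestamp>\n"
line = "test.cyanite.hello %d %d\n" % (42, int(time.time()))

sock = socket.create_connection((CYANITE_HOST, CARBON_PORT), timeout=5)
sock.sendall(line.encode("ascii"))
sock.close()

The metric should then be visible through the Cyanite REST API (and through Graphite-API once step 8 is in place); check the documentation in the repository for the exact query endpoints.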

At this point you should have a basic Cyanite setup that is able to answer normal Graphite queries and ingest carbon metrics. You might want to use a tool like carbon-c-relay to route metrics into the Cyanite pool. You could point Grafana directly to the load balanced Graphite-API or use the normal Graphite web application (if you like the Graphite composer) and list the Graphite-API load balanced VIP as the single CLUSTER_SERVERS entry.

This should at least get you going with Cyanite as a Graphite storage backend. There will be much tuning and testing to transform this into a scalable system depending on your exact setup. I am just starting down this path and may have more to share in the future. Or it may blow up on me. Time will tell.

Update 2016/07/19: There are several other Graphite storage backends that I’m aware of. All are Cassandra based.

What am I missing?


I’ve updated Buckytools, my suite for managing consistent hashing Graphite clusters at scale, with a few minor changes.

Sparse File Support

The buckyd daemon now supports working with sparse Whisper DB files on disk. In this case it’s assumed that you have carbon-cache.py daemons running with:

WHISPER_SPARSE_CREATE = True

Any new Whisper files that buckyd copies into place will also be checked, in 4 KiB blocks, for areas that can be made sparse. Therefore, when running bucky rebalance, files that were sparse on one server can be moved to a new server and recreated as sparse files.
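
The underlying idea is simple enough to sketch. Buckytools itself is written in Go, so the Python fragment below is only an illustration of the technique: copy a Whisper file in 4 KiB blocks and seek over the blocks that are entirely zero, leaving holes the filesystem never allocates.

BLOCK = 4096

def copy_sparse(src_path, dst_path):
    # Copy a file, skipping all-zero 4 KiB blocks so the destination
    # ends up sparse on filesystems that support holes.
    zero = b"\x00" * BLOCK
    with open(src_path, "rb") as src, open(dst_path, "wb") as dst:
        while True:
            block = src.read(BLOCK)
            if not block:
                break
            if block == zero:
                # Seek forward instead of writing: this leaves a hole.
                dst.seek(len(block), 1)
            else:
                dst.write(block)
        # Ensure the file has its full length even if it ends in holes.
        dst.truncate(src.tell())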

Using bucky tar works as before but the generated archives do not have the GNU sparse types set and if expanded by hand will not automatically result in sparse files.

The bucky du command works as before and reports the apparent size of the Whisper files on disk. Similar to:

du -hs --apparent-size

Restoring tarballs with bucky restore attempts to create sparse files in the cluster.

To enable support for sparse Whisper DB files, run the daemon with the -sparse option:

description "Buckyd, the Buckytools daemon"
author      "Jack Neely <[email protected]>"

start on startup
stop on shutdown

setuid graphite

exec /usr/bin/buckyd --sparse \
    graphite010:2104:a \
    graphite011:2104:a \
    graphite012:2104:a

Bucky Restore Bug Fixes

I can tell that we restore tarballs a lot using these tools. Oops! I’ve corrected bucky restore to properly ignore directories in the tarballs rather than create 0 length Whisper DBs in the cluster.

bucky-pickle-relay

This tool, which listens for Graphite’s pickle protocol and emits Graphite’s text protocol, has had some more verbose debugging added. It is on my short list for improvements, such as not storing things in memory as strings, since Go strings are resource and memory intensive for this workload.


In Go 1.5 we have the beginnings of vendoring support. The easiest way to incorporate other projects into your Git repo is by using the following command. Short and dirty, but it works:

$ git subtree add --prefix vendor/gopkg.in/check.v1  \
    https://gopkg.in/check.v1 master --squash

Since I won’t remember the command, it’s now here in the blog. Also, don’t forget to set your environment:

$ export GO15VENDOREXPERIMENT=1


I’ve been researching quite a few algorithms for my client, Bruce, as I continue to scale out certain systems. I thought that getting them on my blog would be very useful for a future version of myself and many others. I suspect, and hope, that most folks that work in Systems Administration / Operations will find these at least familiar.

Flap Detection

Flap detection is the ability to detect rapid state changes so that one can take corrective action. The textbook example is BGP route flapping, where a new route is advertised and then withdrawn (a flap) multiple times in a short period. In general, this “scores” an event, with higher scores for more frequent events.

Warning: Math

For each event, a “penalty” $P$, a “timestamp” $t$, and a boolean “isFlapping” variable are stored. $P$ is the current “score” for this event and is $0$ for an event that has not happened. $t$ is the timestamp, usually Unix Epoch time, of the last event.

Each time an event occurs we add a penalty value to the current penalty. The penalty then decays exponentially over time. A suppress limit is defined: the event is considered flapping once its penalty exceeds the suppress limit. A lower reuse limit is also defined: once the penalty decays below the reuse limit, the event is no longer considered flapping.

Summary

  • Penalty ($P$): An event's current score.
  • Half-Life ($h$): How much time for half of the penalty to decay.
  • Timestamp ($t$): Unix Epoch time of last event.
  • Suppress Limit: Penalty > Suppress Limit == Flapping
  • Reuse Limit: Penalty < Reuse Limit && Flapping == Recovery

Calculating for $P$

$$P(t_2) = P(t_1) \times e^{-(t_2 - t_1) \times ln(2) / h}$$
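
Here is a minimal Python sketch of the algorithm as described above. The penalty increment, half-life, and limits mirror the example values used in the gnuplot notes below; they are illustrations, not tuned recommendations.

import math
import time

PENALTY_PER_EVENT = 10.0
HALF_LIFE = 60.0        # seconds for half of the penalty to decay
SUPPRESS_LIMIT = 25.0   # above this the event is flapping
REUSE_LIMIT = 10.0      # below this a flapping event recovers

class FlapDetector(object):
    def __init__(self):
        self.penalty = 0.0
        self.timestamp = None
        self.is_flapping = False

    def _decay(self, now):
        # P(t2) = P(t1) * e^(-(t2 - t1) * ln(2) / h)
        if self.timestamp is not None:
            dt = now - self.timestamp
            self.penalty *= math.exp(-dt * math.log(2) / HALF_LIFE)
        self.timestamp = now

    def event(self, now=None):
        # Record an occurrence of the event and add to its penalty.
        now = time.time() if now is None else now
        self._decay(now)
        self.penalty += PENALTY_PER_EVENT
        if self.penalty > SUPPRESS_LIMIT:
            self.is_flapping = True

    def check(self, now=None):
        # Decay the penalty and report whether the event is flapping.
        now = time.time() if now is None else now
        self._decay(now)
        if self.is_flapping and self.penalty < REUSE_LIMIT:
            self.is_flapping = False
        return self.is_flapping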

In Pictures

Notes

f(x) = 2.71828 ** (-x*log(2) / 60)
set ylabel "Penalty"
set xlabel "Time"
set key off
set terminal png size 800,600
set output 'output.png'
set label "Reuse Limit" at 40,11
set label "Suppress Limit" at 100,26
plot [0:240] x < 10 ? f(x) : x < 20 ? f(x-10)*10 : x < 30 ? f(x-20)*20 : f(x-30)*30, 10 with dots, 25 with dots


This is an update of an old post, so it’s back at the top of the blog. The original posting was 2015-08-19.

I’m considering swapping out Statsd with Bitly’s statsdaemon for better performance. But, because Bitly’s version only accepts integer data, I wanted to analyze our Statsd traffic. I figured I’d use my friend tcpdump to capture some traffic samples and replay them through a test box for analysis. Also, figuring out what our hot metrics are is very handy.

# tcpdump -s0 -w /tmp/statsd.pcap udp port 9125

Wireshark confirmed that this was the traffic I was looking for, and a spot check showed good integer data. Now, how do I dump out the traffic data so I can at least run grep and other common Unix tools on the text?

The Tcpreplay tools look very powerful. However, they can’t replay TCP traffic at a server daemon because they cannot synchronize the SYN/ACK numbers with the real client. But this is UDP traffic! UDP does provide checksums for data integrity, so after changing the IP and MAC addresses via tcprewrite I had packets that my Linux box dropped because the checksums no longer matched.

Back to my friend Wireshark:

$ tshark -r /tmp/statsd.pcap -T fields -e data > data

This produces a newline-separated dump of the data field of each packet, which is exactly what I need, except that it is hex-encoded binary data.

import binascii
import sys

for s in open(sys.argv[1], "r").readlines():
    print binascii.unhexlify(s.strip())

Finally, I have a newline-separated list of the Statsd metrics from the pcap data and can run grep!

$ python unhex.py data | gawk -F: '/.+/ { print $1 }' | sort | uniq -c | sort -n

Now I also have a frequency distribution chart of the packet capture showing me what the most common metrics are.
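
For what it’s worth, the same decode-and-count pass can also be done in one short Python script. This is a sketch over the same hex dump file and assumes the usual name:value|type statsd line format.

import binascii
from collections import Counter

counts = Counter()
with open("data") as fh:
    for line in fh:
        line = line.strip()
        if not line:
            continue
        payload = binascii.unhexlify(line)
        # One UDP packet may carry several newline separated metrics.
        for metric in payload.decode("ascii", "replace").splitlines():
            name = metric.split(":", 1)[0]
            if name:
                counts[name] += 1

# Most frequent metric names first.
for name, count in counts.most_common():
    print("%8d %s" % (count, name))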


Life is busy around the holidays with much family and new family additions. If there is one thing I wish I could remind the world it is that Christmas does last 12 days from Christmas Day until January 6th. So, Merry Christmas and a Happy 2016 to all! Here are some updates in no particular order.

Google Jump Hash Support in Buckytools

It looks like my Graphite cluster will be used for long term storage as we migrate toward Prometheus, which means I get no relief in my ever-scaling cluster. So, I’m adding Google’s Jump Hash to Buckytools. With this hashing algorithm you cannot remove a node in the middle of the hash ring without affecting all nodes after it in the ring, so full support of Graphite’s replication factor will find its way into Buckytools as well. If I’ve not merged it yet, take a look at the jump branch. The plan here is to be directly compatible with the FNV1a-Jump hashing now implemented in carbon-c-relay.
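
For reference, Google’s jump consistent hash is tiny; here is a Python sketch of the core algorithm. Exactly how carbon-c-relay combines it with FNV1a to hash metric names is defined by its implementation, so treat the key derivation below as an assumption.

# Jump consistent hash (Lamping & Veach).  Maps a 64-bit key to one of
# num_buckets buckets with minimal movement when buckets are added.
def jump_hash(key, num_buckets):
    b, j = -1, 0
    while j < num_buckets:
        b = j
        key = (key * 2862933555777941757 + 1) & 0xFFFFFFFFFFFFFFFF
        j = int(float(b + 1) * (float(1 << 31) / float((key >> 33) + 1)))
    return b

# One way to derive the 64-bit key from a metric name: FNV1a-64.
# (Whether this matches carbon-c-relay's exact scheme is an assumption.)
def fnv1a_64(data):
    h = 0xcbf29ce484222325
    for byte in bytearray(data.encode("utf-8")):
        h ^= byte
        h = (h * 0x100000001b3) & 0xFFFFFFFFFFFFFFFF
    return h

print(jump_hash(fnv1a_64("carbon.agents.foo.metricsReceived"), 3))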

New Graphite Storage Backend

To support my Graphite work I’m moving toward a new storage backend that uses a Ceres-like approach. One or multiple files per metric using a columnar data format so the files grow based on the number of data points stored. Implementing this in Go will give incredible performance improvements and the new file format will give a marked reduction in storage space used. Some code for this is in my new Journal project.

Also key to this project is being able to regularly compress older data. There are some interesting choices here that ought to help a lot with my storage requirements, but they make the archives not randomly accessible. Doing this for data past a certain age probably makes sense.

This is probably the most challenging aspect of my plans for better Graphite scaling. Will I get the time? Will a better solution present itself? Will an entirely different method of handling telemetry get in the way of my plans to use Prometheus for our operational telemetry?

Graphite Metric Generator

Quick and dirty scaling tests for Graphite. This small tool will generate random metrics with random data, or just random data for a known list of metric names. The gentestmetrics command is part of Buckytools.

Usage of ./gentestmetrics:
Copyright 2015 42 Lines, Inc.
Original Author: Jack Neely

  -c int
        Number of random metrics to generate. (default 1000)
  -f string
        Use metric names found in this file.  Overrides -c.
  -i int
        Seconds between each timestamp/value pair per metric. (default 60)
  -l int
        Upper limit to randomly generated integer values. (default 100)
  -p string
        Prefix prepended to each metric name. (default "test.")
  -s int
        Time stamp to start -- iterations go BACKWARDS from here (default now)
  -v int
        Use the given value rather than a random integer.
  -x int
        How many timestamp/value pairs for each generated metric name. (default 1)

Metrics are printed to STDOUT. The most interesting usage is generating data for a list of known metrics delimited by newlines in a text file. Say with a 1 minute interval and 5 data points each:

./gentestmetrics -f metrics.txt -i 60 -x 5

The generated metrics are output in chronological order. However, the timestamp given via -s dictates the timestamp of the last data point rather than the first.

Is There Anyone Not Podcasting?

A few friends and I started the Practical Operations Podcast. We talk about the practical side of operations work in small to large scale environments and the not so obvious implications of various tools and how you might use them in real world conditions. Check us out!


I’ve been thinking about the “future” and how I can move my metrics system from what I have now to what I’d like to have. I run a large Graphite cluster. In the 26 million metrics per minute range with a quarter petabyte of provisioned storage. I integrate with a Naemon (Nagios fork) and Merlin setup for alerting.

I’ve been following Prometheus for a year and wondering about what the future might be like. Turns out, my fellow Operations team members and the Developers are also highly interested in Prometheus or a tool that offers Prometheus-like features. Specifically:

  • Support ephemeral hosts: Be smarter about how metrics are managed so that each host adds metric data without polluting the namespace with thousands of host entries.
  • Scale storage: No more Whisper files, storage needs to scale based on the timestamp/value pairs we store rather than a pre-allocated chunk of disk space.
  • Scale to a multi-data-center environment: Graphite isn’t designed to make multiple clusters in different data centers or regions work well together, although modern versions of Grafana can really help there. Prometheus handles this style of sharding natively.
  • Ability to tag or label metrics: This makes ephemeral hosts work well combined with storage allocated as needed (rather than allocating all possible storage at once).
  • Support advanced metric based alerting: A strength of Prometheus and we can funnel through our Nagios-based monitoring to deal with pager groups etc.

So, how does one get from a monolithic Graphite setup to something like the above? A good question that I’m still trying to work out. Here’s what I’m thinking.

Global Systems:

  • Keep our Nagios based alerting system. It routes alerts, handles paging time periods, and, most importantly, handles alerts from many different sources via checks. Uses PagerDuty, email, etc. as appropriate.
  • Keep the current check_graphite code we are using to do metric based alerting. It enables us to transition when we can and roll back if needed.
  • Setup a Prometheus / AlertManager instance for any global aggregation and handle routing of alerts from Prometheus metric based checks to Nagios.
  • Upgrade Grafana to 2.5 (or better) to be the global user interface to metrics and be able to pull data from many different sources: Graphite, Prometheus, and Elasticsearch.
  • Scale Graphite storage with some form of black magic.

Sharded Systems: These systems are the infrastructure setup as part of each data center or region.

  • A Prometheus server to scrape metrics from local systems and services. Each Prometheus server maps and forwards data points to Graphite. Perhaps an identical second server for redundancy. Alerts and aggregate metrics flow upward toward the global Prometheus service.
  • A local Graphite/Statsd ingestion service found by service discovery to handle and route old school metrics.

The design of this gives me a Prometheus system we can use for advanced alerting and short term monitoring of metrics with great support for ephemeral hosts and labeling. Graphite still collects unconverted metrics and holds our historical or long term data. Graphite also serves as a long term storage option for Prometheus. (See this patch.)

What’s left unsolved? The hard parts:

  • Long term metric storage must scale, and Whisper files aren’t cutting it. I need to spend some time with alternate Graphite backends or in writing one. Many of the existing options bring along their own challenges. I am required to keep full resolution data for years.

I have some ideas here. I had hopes for InfluxDB but it does not appear stable. But, I’m thinking something far simpler. More to come here.

Will this work? Will this scale to 20 million metrics or more? Perhaps it’s worth finding out.


If you run Graphite at scale you are interested in applying this patch.

I was tracing performance issues in my Graphite cluster and saw that for some queries the backend storage nodes were sending abnormally large pickle objects back to the Graphite web frontends. Python’s httplib was taking several minutes to download the pickle objects causing query times to skyrocket.

Testing against my backend storage nodes, I found that with carefully crafted time ranges the whisper.py code would adjust the from and until times so that they were equal. This case was not detected and resulted in a read of the entire Whisper database. Only one valid point was returned, along with a list of many, many None values. Example:

curl -v -o /tmp/out.pkl 'http://storage-backendXXX/render/?local=1&format=pickle&from=1444249200&until=1444249440&target=<simple metric glob target>'

The query I was testing with (identifying bits removed) was returning pickle objects that were just shy of 50 MiB. With an M. With the above patch those pickle objects shrink down to about 40 KiB. This matched the size of the pickle objects generated with time ranges that included only one data point and did not trigger the bug.
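
The shape of the fix is a small guard in the fetch path so that an empty window is never mistaken for a wrap around the whole archive. The Python sketch below is only my illustration of the idea with hypothetical names, not the actual patch.

def clamp_fetch_window(from_interval, until_interval, seconds_per_point):
    # If rounding to interval boundaries collapsed the window to zero
    # width, widen it to a single point instead of letting it be read
    # as the entire archive.
    if from_interval == until_interval:
        until_interval += seconds_per_point
    return from_interval, until_interval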

These long queries were affecting response times for other queries as well. The following graph shows the difference in performance the patch achieved. The scale on the left is time to retrieve pickle objects from the backend storage nodes in seconds. The scale on the right is the number of retrievals per second.

Graphite Retrieval Times


The most difficult bit about running a Graphite cluster is handling queries or graph rendering during a cluster rebalance. Or after a partitioning event when you use replication in your consistent hashing cluster. Suddenly, graphs under report, have partial data, or might even be completely different when you reload the graph. Generally, your Graphite cluster becomes useless until sanity is restored.

I upgraded my Graphite setup in May to Graphite 0.9.13-ish. It’s very close to the top of the 0.9.x branch of the Git repo. This has a bulk-fetch patch that drastically speeds up queries and rendering. It also changes how the webapp decides which metric TimeSeries to use if it gets more than one.

Getting more than one answer for a specific metric is what causes all the pain. This is caused by duplicate Whisper files for the same metric that do not have identical data in them. Exactly what happens during a rebalance. It also happens with replication set higher than 1, but without an outage the Whisper DBs are identical.

In these cases, instead of choosing the “most complete” TimeSeries to use (which causes partial results or under reported results) why not merge them together? Why hasn’t this been done before?

I patched the bulk-fetch CRDT query resolver to do just this. Now I wonder if I can continue to scale Graphite into the petabytes without having to replace the backend with a Cassandra or Riak database?
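
As an illustration of the merge idea (not the actual webapp patch; the data layout is simplified to plain lists), combining duplicate series boils down to preferring whichever copy has a real value at each point:

def merge_series(series_list):
    # Merge duplicate TimeSeries for the same metric: at each point,
    # take the first non-None value found among the copies.
    if not series_list:
        return []
    length = max(len(s) for s in series_list)
    merged = []
    for i in range(length):
        value = None
        for s in series_list:
            if i < len(s) and s[i] is not None:
                value = s[i]
                break
        merged.append(value)
    return merged

# Two partial copies of the same metric, as seen during a rebalance:
a = [1, None, 3, None]
b = [None, 2, None, 4]
print(merge_series([a, b]))   # [1, 2, 3, 4]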
