Big Data Tar Pits
What is Big Data
My personal definition of Big Data is:
Any Data Processing that requires special considerations to overcome time and/or space limitations. Yours Truly
Which lines up pretty closely with Wikipedia’s definition of Big Data:
Big data primarily refers to data sets that are too large or complex to be dealt with by traditional data-processing application software. Wikipedia
In essence “Big Data” is an ambiguous term that is generally understood to mean any data that can’t processed using “traditional”, for some definition of traditional, data processing mechanisms. The need for special data processing usually stems from either the speed which the data is changing, call the data velocity; Or the size of the data, called the volume.
Managing big data usually means designing a system that will allow for distributing the data, and the processing of the data, across multiple compute and storage nodes in a cluster aka a Distributed Data Processing System. Even if the exact mechanics of distributing the data and computation is abstracted behind a framework like Hadoop, Spark, or Flink the distributed nature of the system needs to be taken into account.
The size (aka. Volume) and speed at which the data changes (aka Velocity) of a Big Data system usually break assumptions that are common in traditional applications. This means that you need to take on a different perspective when designing your apps. More importantly there are a couple tar pits people tend to fall into if they are not careful and I am hoping this post will help you avoid some of them.
Tar Pits
To help discuss our tar pits we are going to make up an example use case. Our use case is going to be a game that sends a players current location on the game map to our system. Our system needs to collect the events, store them and make them available for ad-hoc query analysis. In addition our system needs to:
- Maintain a real-time map of current player locations.
- Maintain a per-player timeline of the player’s movements around the map.
Time is an Illusion
When processing data in a distributed system, you are most likely processing multiple data items in parallel across a cluster of machines. The distributed and parallel nature of the processing means that you lose most guarantees over message ordering, especially global ordering.
In our game, multiple users are sending us messages with updates to their location. User A might live closer to our data center than user B. It is easy to imagine how an message from user B might arrive after a message from user A, even though the event occurred and was sent before user A’s message. Even further once the messages are received the are put on a queue (e.g. a Kafka queue) and as nodes on the cluster become free they will pickup the next available message form the queue. A node might pick up a message from user A, and another node pickup the message from user B. The second node might finish processing message B and send the result down to the next step in the pipeline before the first node is done processing message A. The point is even if you could receive messages in order, processing them in parallel means you lose any guarantees that the messages will finish processing in the same order.
A seemingly easy solution to this whole mess is to use timestamps to order events. Basically our game will attach a timestamp to each message before the message is sent. The issue here is that you can’t rely on client machines to have accurate timestamps. You can’t even rely on machines you control to have accurate and synchronized timestamps. Timestamps on a single machine can jitter giving subsequent messages timestamps that occur “before” previous messages.
Relying on timestamps to order events is a common tar pit I see distributed solutions fall into. The issue with using timestamps is that they work 90% of the time and might honestly be good enough for your use case. However a much more reliable and future-proof solution is to utilize and explicit ordering counter, like a sequence number. You cannot achieve global ordering, but you can achieve strong local ordering by using an event sequence number.
Going back to our example, there is no way to achieve exact ordering of messages across all player, but we don’t need global ordering either. What we need is ordering of events per-player. Like we said we can’t use timestamps to order events even for a single player session. What we can do though is use a sequence number where each time the game sends a message for a player it increments that player’s event sequence number.
When developing the map of the current player locations, whenever we receive a new location from a player we can check the sequence number to see if the “new” location is really newer than the last location we have for that player or not. Keep in mind that this check-and-update operation needs to be an atomic operation. Most distributed key-value stores, like HBase and DynamoDB, and relational databases, like Postgres, provide some sort of atomic check-and-update operation.
Along the same lines each time we receive an event from a player we can order the events by sequence number to make sure we are displaying the timeline of movements in the correct order.
Data Partitioning
By definition Big Data sets are too large to process as one single sum. In all but the most extreme cases you will want to process only a subset of the data and partitioning is how the data is sliced up into subsets. Partitioning in the distributed processing world usually comes in two flavors: Partitioning the data for processing and partitioning the data for storage and retrieval, i.e. querying and analysis.
Partitioning Data For Processing
Ideally partitioning the data during processing should allow for the uniform distribution of the data across the available computation nodes. In other words we want each of the nodes in the cluster to process a similar amount of data, for new data to be available for node to process as soon as it is done processing the current data item, and for a node to be available to process a data item as soon as the data is created. At the same time there might be ordering constraints that need to be taken into account or a need to group similar items during processing.
Partitioning the data during processing also provide some partial ordering guarantees. A partitioning scheme that guarantees that all the events for a given player will be processed by the same node means that the node can inspect the state of the current player knowing that no other node will also be inspecting and updating the same player’s state. In our example we would want a data processing partitioning scheme that will guarantees that all messages from a given player are processed by the same node, so that the processing node can inspect the current “last known” location and possibly update it without other nodes also updating the same player’s current location causing race conditions. We might not want the entire data processing pipeline to utilize the same partitioning scheme, but we do want the portion that checks and updates the current state of the player to use such a scheme.
If we do follow such a partitioning scheme when updating a users latest location, we won’t need an atomic check-and-update operation any more since we know that only one node will be updating a given players location. In other words we might be updating the latest location of multiple players in parallel, but for any given player, because of how we partitioned the data, we know that only one node will be updating that players latest location. The downside is that we might have a sub-optimal partitioning scheme from a data processing perspective. If you have 100 nodes, but only have 70 players currently playing the game, our partitioning scheme means that at-most 70 nodes will be used for processing the data from those players and that at least 30 nodes will be sitting idle without any data to process.
Partitioning Data for Storage
Partitioning the data for storage and retrieval is a little more complex because the “ideal” partitioning scheme doesn’t exist. Instead how the data is partitioned will highly depend on how the data is expected to be accessed. To illustrate that let assume you want to two types for analysis on your accumulated player data:
The first type of analysis is to build up reports and aggregates on how many players performed certain actions or reach certain milestones over a given time line. For example how many players beat level 10 this month compared to last month. In SQL that would look something like this:
Action Frequency
SELECT
actions.type, sum(1) as cnt
FROM mydb.actions
WHERE
actions.timestamp > MONTH_START and actions.timestamp < MONTH_END
and actions.type in (LIST OF EVENT TYPES I CARE ABOUT)
GROUP BY actions.type;
Case 1: No Partitioning
No partitioning is not a practical approach because if your data is all in just one big blob, that blob will eventually become too big to read and process. At the very least data is usually partitioned by date, which in this case would mean the queries above could limit their search to just the data in the date range they need but also means the queries will have to parse through all the data in that date range.
For example, the “Action Frequency” will have to parse through the all actions of each player, instead of being able to focus on specifically the actions listed in the query.
Case 2: Partitioning By Action Type - Player - Date
With this partitioning schema the “Action Frequency” query will first find the partitions for only the action types in the query, and then for each player sub-partition will only read the data for the dates within the query’s range. This partitioning schema allows the query engine to hone in and read only the data required to execute the query.
At the same time this partitioning scheme is designed to fit the queries criteria. If a different query is used, for example to find the location where each player was last seen, this partition schema would be of little help and the query engine would still have to read all the data for all the actions for all the players within the given date range. Understanding the types of queries and analysis you expect to be performed on the data, especially the attributes most likely to be used as filters, will help you design a partitioning schema to fit those queries.
Have a Reprocessing Plan
When working with a distributed system, the question isn’t if something will fail, it is when will it fail. When processing a million events a day a “1 in a million” chance is a daily occurrence. Network connections will fail, hosts in your cluster will have storage issues, you will write and push buggy code. As a result trying to predict and react to every possible issue in a distributed data processing system is impractical. A much more practical approach is to have a data reprocessing plan, that will easily allow you to identify and remove any corrupted data and then re-submit your source data for processing by the system. Generally speaking a reprocessing plan, like automated testing, is much easier to build while the system is being defined and built instead of retroactively trying to create a re-processing plan for an already operational system.
One major key when defining your data processing plan is to determine if you need to process each data element at-least-once or at-most-once. Ensuring any data element gets processed exactly-once is very difficult. In theory ensuring each data element gets processed only once sound plausible, but to do so would require surgical cleaning up of corrupt data and then reprocessing of only the data elements corresponding to the corrupt data all while the system is still actively processing other data. Another way of looking at that is the requirement to re-process data means that your system will have to be able to handle processing a given data element more than once. Finally different portions of your pipeline might have different data processing requirements.
At least once processing means that you are able to process the same data element multiple times and produce the correct result. This is also called idempotent processing. An example of this is determining a players last-seen location. For any given data element I can check the event-sequence (see: Time is an Illusion above) of the current event is greater than the event-sequence of the last known location: If the current sequence is greater I update the last known location and sequence number, otherwise I ignore the current event. This means if I see the same event multiple times, my system will still produce the correct last seen location.
At once processing processing means that your system can’t handle seeing the same event multiple times, but can handle data loss. An example of this use case is calculating averages: If I incorporate the same event multiple times into the average calculation, my average will be skewed. At the same time, if miss a few data points from my average calculation my average will still be correct within a given tolerance level. So when I am pre-computing an average I might chose to use at most once processing, but when I am computing a min or max value I would prefer at least once processing since those are idempotent operations.
Ok, Now What?
Why write this? Because I have worked with distributed data processing systems that fallen into at leas of the tar pits above and resulted in significant time and cost to try and fix the issues those tar pits caused. Making this even worse getting a system out of any of those tar pits will come with even more costs, and might not be feasible without a significant re-design of the system. My hope is that this article will help you avoid falling into those tar pits.
What is the Fediverse
By the time you are reading this you’ve probably already heard of “The Fediverse”, or at least Mastodon. At the time I am writing this post, the Fediverse is the fastest growing social media platform. At the same time, most people don’t know what the Fediverse is or what makes it different from Facebook, TikTok or X, formerly Twitter.
Lets start with what the Fediverse is. Wikipedia defines the Fediverse as:
The fediverse (a portmanteau of “federation” and “universe”) is an ensemble of federated (i.e. interconnected) servers that are used for web publishing […] and file hosting, but which, while independently hosted, can communicate with each other.
More commonly the Fediverse is defined as the set of applications (e.g. Mastodon or PeerTube) that can communicate using the ActivityPub protocol, the servers and groups that host the applications, and the users of those applications and the content they create. All of that together makes up the Fediverse. The key difference between the Fediverse and traditional social media sites like Facebook or YouTube is the addition of the ActivityPub protocol and independent servers into the mix. Anyone can setup a Mastodon server, and anyone can write a new application that uses the ActivityPub protocol, and that server or that application can, within reason, communicate with any other server or application in the Fediverse.
DNS security has been getting a lot of attention these past couple of years. This has lead to a number of DNS security-enhancing standards to be proposed, with the three big ones being DNS-over-TLS, DNSSEC and DNS-over-HTTPS. In this article we will discuss all three of those standards, the threat model they assume and what protection the provide.
DevComo Bitcoin Transactions Presentation
Here is a link to the slides to the a presentation I gave at DevComo describing Bitcoin Transactions and how they work: The Anatomy of Bitcoin Transactions
FCC Filing in Support of Net Neutrality
With the internet having become an integral part of our Americans lives, it is necessary to protect and preserve free, open and nondiscriminatory internet access for all of us. Internet Service Providers are tasked with connecting users with the content and services available on the internet, not with regulating and managing what content users are able to connect to and how they connect.
It would be unacceptable for a phone company to redirect phone calls from one business to another and would be unfair for a phone company to charge different rates for equal service to two equal businesses. It would also be scandalous for a phone company to record customers phone conversations and then sell that data in order to inject advertisements into a customers telephone conversation.
In the same manner, it is and should be unacceptable for an Internet Service Provider to redirect requests from one website to another, and for an ISP to provide more bandwidth when requesting one site over another. It should be illegal for an ISP to record a customer’s web history and later sell that history to advertisers in order to inject targeted ads into the pages a customer has requested.
Title II places restrictions on phone companies that both protect consumers and create fertile ground for a healthy and robust communication infrastructure. In the same manner the public needs restrictions on ISP to both protect consumers from ISP overreach and create a healthy and fertile internet communication infrastructure that benefits all.
Introduction
Those who would give up essential Liberty, to purchase a little temporary Safety, deserve neither Liberty nor Safety.
Benjamin Franklin
There has been a recent push by governments and government agencies to ban End-to-End Strong Encryption, all under the guise of stopping terrorists and pedophiles. My hope here is to provide simple arguments in layman’s terms as to why banning strong end-to-end encryption will not improve the government’s, nor it’s agencies’, ability to catch the “Bad Guys”. In fact banning strong encryption will only daemonize legitimate users of strong encryption and undermine the rest of the population’s security, while having almost no effect on those who wish to use it for nefarious means.
Markov Chains
Introduction
A Markov Chain is a set of transitions from one state to the next; Such that the transition from the current state to the next depends only on the current state, the previous and future states do not effect the probability of the transition. A transitions independence from future and past sates is called the Markov Property. What we are going to do is explore Markov Chains through a little story and some code.
MiniMax and Tic-Tac-Toe
What it is
Lets start this post of with some techno-babble. Minimax is a depth-first search algorithm to find the least-lost strategy for zero-sum two person turn based games.
A zero-sum game is a game where, if one player loses, than another player must win the same number of points and vice-versa. For example chess is a zero-sum game because if one player wins (Lets say winning a score of +1) than the other player must lose (getting a score of -1), or the game is a draw and both players get a score of 0. We don’t give players that lose a score of 0 and the winner +1 because that would mean than one player won the game and the other player is at a draw, which is impossible in chess. The same goes for other games such as tic-tac-toe, Go, and four in a row. Another set of zero sum games are games where the total number of points is constant (think poker where players can’t buy chips) and if someone wins (a bet) than the rest of the players must lose points (money) equal to the number of points won.
Depth-first search means that we follow the algorithm all the way down until the end, and then start moving backwards, looking for the best move as we go. The reason why we have to follow the algorithm all the way to the end will become clear soon.
What Tor Users Connect To
The other day I was playing with my Tor node’s configuration when I found out that with absolutely no trickery you can get Tor to log some interesting addresses. One of the things Tor logs is the address of edge (aka. exit) connections, which tells you what the node is connecting to but not who the connection is for.
Naturally it would be interesting to know what other people are using Tor for, so I kept a couple of days worth of logs and wrote this little perl script to parse them out for me.
The "Freedom" of Ubuntu
The Problem
I’ve been running Ubuntu as my main OS for pretty much a couple of years now and I have to say I LOVE it. The hardware support is great, tons of apps, and its free. Well “free” as in beer for sure, but not really “free” as in freedom.
This past summer I moved back to Syria, where, as you all know, the US has restricted the imports on software products for “National Security” reasons. So I’ve gotten used to Google Code being blocked, SourceForge not letting me download stuff and all that jazz. Still Ubuntu worked like a champ and I could download anything I wanted from the repos. That was until I decided to update my system to 11.04.