Java MapReduce for top N Twitter Hashtags

Back in 2015 I had to implement a MapReduce job to extract the top 15 hashtags from raw Twitter data in Hadoop. This was part of an exercise for the Business Intelligence lecture at the Vienna University of Technology.

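The regex used to pull the tweet text out of each record (not the best one, in my opinion) has the following format:
String regex = "text\":\\\"(.*)\\\",\"source";
The 'source' field always comes after the 'text' field, so the pattern captures everything between the two. The hashtags are then picked out of the captured text.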
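
To make the pattern easier to follow, here is a minimal standalone sketch that applies it to a single record outside of Hadoop. The sample line is a hypothetical, heavily shortened tweet record and the class name HashtagRegexDemo is only for illustration; the real input has many more fields.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class HashtagRegexDemo {

    // Same pattern as above: capture everything between "text":" and ","source
    static final Pattern TEXT_PATTERN =
            Pattern.compile("text\":\\\"(.*)\\\",\"source");

    /** Returns the lower-cased, whitespace-separated tokens that start with '#'. */
    static List<String> hashtags(String rawJsonLine) {
        List<String> result = new ArrayList<>();
        Matcher matcher = TEXT_PATTERN.matcher(rawJsonLine);
        while (matcher.find()) {
            for (String token : matcher.group(1).split("\\s+")) {
                if (token.startsWith("#")) {
                    result.add(token.toLowerCase(Locale.ROOT));
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical sample record, not taken from the real data set
        String line = "{\"id\":1,\"text\":\"Now #Hiring in #Vienna\",\"source\":\"web\"}";
        System.out.println(hashtags(line)); // prints [#hiring, #vienna]
    }
}
```

Because (.*) is greedy, a record that contains the sequence ","source more than once would be captured up to the last occurrence, which is one reason the pattern is not ideal.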

The final solution is implemented as a single MapReduce job with an overridden cleanup method in the reducer.
The drawback of this implementation is that it uses more RAM than usual, because one variable (a map in the reducer) holds all hashtags together with their counts. It is also less scalable than a two-job (job chaining) solution.
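
One possible way to reduce the memory cost, which is not what the code in this post does, is to keep only the current top N in a bounded min-heap instead of storing every hashtag. The following is a plain-Java sketch of that idea; the class TopNHeap and its methods are hypothetical names, and in a reducer offer() would be called once per key with the summed count.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

class TopNHeap {
    private final int limit;

    // Min-heap ordered by count: the head is the weakest of the current top N.
    private final PriorityQueue<Map.Entry<String, Integer>> heap =
            new PriorityQueue<>(Map.Entry.<String, Integer>comparingByValue());

    TopNHeap(int limit) {
        this.limit = limit;
    }

    /** Offers one (hashtag, total count) pair and evicts the smallest if over the limit. */
    void offer(String hashtag, int count) {
        heap.add(new SimpleEntry<>(hashtag, count));
        if (heap.size() > limit) {
            heap.poll(); // drop the entry with the smallest count
        }
    }

    /** Returns the kept entries, highest count first. */
    List<Map.Entry<String, Integer>> descending() {
        List<Map.Entry<String, Integer>> out = new ArrayList<>(heap);
        out.sort(Map.Entry.<String, Integer>comparingByValue().reversed());
        return out;
    }

    public static void main(String[] args) {
        TopNHeap top = new TopNHeap(2);
        top.offer("#job", 264);
        top.offer("#vienna", 301);
        top.offer("#mtvstars", 945);
        top.offer("#art", 213);
        System.out.println(top.descending()); // prints [#mtvstars=945, #vienna=301]
    }
}
```

With this approach the memory used for ranking is bounded by N entries, at the price of a slightly more involved reducer.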

The number of map tasks is usually driven by the number of DFS blocks the input file occupies, that is, by the file size and the block size. The number of reducers can be set manually and defaults to 1. An MR job can also be executed without a reducer.
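
As a rough illustration of that relationship, the number of map tasks for one splittable file can be estimated as the file size divided by the block size, rounded up. This is only a back-of-the-envelope sketch: the file sizes and the 128 MB block size are assumed values, not the ones from this exercise, and Hadoop's real split logic has more details (for example non-splittable compressed files).

```java
class MapTaskEstimate {

    /** Rough number of map tasks for one splittable file: ceil(fileSize / splitSize). */
    static long estimateMapTasks(long fileSizeBytes, long splitSizeBytes) {
        if (fileSizeBytes <= 0) {
            return 0;
        }
        return (fileSizeBytes + splitSizeBytes - 1) / splitSizeBytes;
    }

    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024; // assumed block size of 128 MB
        System.out.println(estimateMapTasks(300L * 1024 * 1024, blockSize)); // prints 3
        System.out.println(estimateMapTasks(200L * 1024 * 1024, blockSize)); // prints 2
    }
}
```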

Most of the time is spent in the "mapping" part of the MR job (as seen below). Distributing the file and the job over multiple machines speeds up mapping, because the mappers run in parallel.

AT – 2 Mappers and 1 Reducer per job

CAN – 3 Mappers and 1 Reducer
Total time spent by all maps in occupied slots (ms)=149603
Total time spent by all reduces in occupied slots (ms)=35271

NYC – 3 Mappers and 1 Reducer
Total time spent by all maps in occupied slots (ms)=69959
Total time spent by all reduces in occupied slots (ms)=9528

SF – 3 Mappers and 1 Reducer
Total time spent by all maps in occupied slots (ms)=134493
Total time spent by all reduces in occupied slots (ms)=10695

UK – 3 Mappers and 1 Reducer
Total time spent by all maps in occupied slots (ms)=72951
Total time spent by all reduces in occupied slots (ms)=13101
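
To put the counters above into proportion, this small sketch computes the share of the total slot time that went into the map phase for each data set. The numbers are copied from the job counters listed above; the class name MapShare is only for illustration.

```java
import java.util.Locale;

class MapShare {

    /** Percentage of the total (map + reduce) slot time that was spent in map tasks. */
    static double mapSharePercent(long mapMillis, long reduceMillis) {
        return 100.0 * mapMillis / (mapMillis + reduceMillis);
    }

    public static void main(String[] args) {
        String[] names = {"CAN", "NYC", "SF", "UK"};
        // {map ms, reduce ms} as reported by the job counters
        long[][] millis = {
            {149603, 35271},
            {69959, 9528},
            {134493, 10695},
            {72951, 13101}
        };
        for (int i = 0; i < names.length; i++) {
            double share = mapSharePercent(millis[i][0], millis[i][1]);
            System.out.println(String.format(Locale.ROOT, "%s: %.1f%% map", names[i], share));
        }
    }
}
```

For these four runs the map phase accounts for roughly 81% to 93% of the reported slot time.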

So here is the actual source code:


/**
 * @author Dzenan Hamzic
 *
 */
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

	public static class TokenizerMapper extends
			Mapper<Object, Text, Text, IntWritable> {

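		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();

		// Captures the tweet body between the "text" and "source" fields.
		// Compiled once instead of once per input record.
		private static final Pattern TEXT_PATTERN = Pattern
				.compile("text\":\\\"(.*)\\\",\"source");

		@Override
		public void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {
			Matcher matcher = TEXT_PATTERN.matcher(value.toString());

			while (matcher.find()) {
				// split the tweet text on whitespace and keep tokens starting with '#'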
				StringTokenizer itr = new StringTokenizer(matcher.group(1));
				while (itr.hasMoreTokens()) {

					String token = itr.nextToken();
					token = token.toLowerCase();
					if (token.startsWith("#")) {
						word.set(token);
						context.write(word, one);
					}
				}
			}

		}
	}

	/**
	 * Reducer:
	 * sums the counts for every hashtag and stores the totals in a HashMap.
	 */
	public static class IntSumReducer extends
			Reducer<Text, IntWritable, Text, IntWritable> {

		private Map<Text, IntWritable> countMap = new HashMap<>();

		@Override
		public void reduce(Text key, Iterable<IntWritable> values,
				Context context) throws IOException, InterruptedException {

			// compute the number of occurrences of a single hashtag
			int sum = 0;
			for (IntWritable val : values) {
				sum += val.get();
			}

			// Store the total for this hashtag in the map.
			// The key must be copied into a new Text object because Hadoop
			// reuses the same Text instance for every call to reduce().
			countMap.put(new Text(key), new IntWritable(sum));
		}

		/**
		 * This method runs once, after the reducer has processed all keys.
		 * It writes the top 15 hashtags to the output file.
		 */
		@Override
		protected void cleanup(Context context) throws IOException,
				InterruptedException {

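			Map<Text, IntWritable> sortedMap = sortByValues(countMap);

			// emit only the first 15 entries of the descending-sorted map
			int counter = 0;
			for (Map.Entry<Text, IntWritable> entry : sortedMap.entrySet()) {
				if (counter++ == 15) {
					break;
				}
				context.write(entry.getKey(), entry.getValue());
			}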
		}
	}

	/**
	 * Sorts a map by its values in descending order.
	 * @param map the map to sort
	 * @return a LinkedHashMap whose iteration order is highest value first
	 */
	private static <K, V extends Comparable<? super V>> Map<K, V> sortByValues(
			Map<K, V> map) {
		List<Map.Entry<K, V>> entries = new LinkedList<Map.Entry<K, V>>(
				map.entrySet());

		Collections.sort(entries, new Comparator<Map.Entry<K, V>>() {

			@Override
			public int compare(Map.Entry<K, V> o1, Map.Entry<K, V> o2) {
				return o2.getValue().compareTo(o1.getValue());
			}
		});

		// LinkedHashMap keeps the keys in insertion order,
		// which here is descending by value
		Map<K, V> sortedMap = new LinkedHashMap<K, V>();

		for (Map.Entry<K, V> entry : entries) {
			sortedMap.put(entry.getKey(), entry.getValue());
		}

		return sortedMap;
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "top hashtags");
		job.setJarByClass(WordCount.class);
		job.setMapperClass(TokenizerMapper.class);
		// job.setCombinerClass(TopNCombiner.class);
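		job.setReducerClass(IntSumReducer.class);
		// the top 15 selection in cleanup() is only global with a single reducer
		job.setNumReduceTasks(1);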
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

Here are some resulting top hashtags.

tweetsAT (Austria)
#mtvstars 945
#geln\u00e4gel 347
#vienna 301
#job 264
#munich 228
#hiring 228
#like 214
#art 213
#retweet 208
#austria 205
#survivorseries 201
#quiz 196
#m\u00fcnchen 186
#jobs 180
#ikwydlsonitunes 178

 
tweetsCAN (Canada)
#hiring 4867
#job 4654
#jobs 3045
#careerarc 2832
#veterans 1041
#job: 955
#retail 899
#hiring! 826
#job? 825
#hospitality 775
#sales 604
#nursing 549
#healthcare 471
#it 330
#realestate 285

 
tweetsNYC (New York City)
#hiring 1283
#job 1262
#nyc 932
#jobs 801
#careerarc 670
#newyork, 490
#1dsv 398
#retail 303
#newyork 299
#aldubtherivalry 288
#job: 262
#hiring! 251
#ikwydlsatmidnight 249
#job? 227
#veterans 225

 
tweetsSF (San Francisco)
#hiring 2754
#job 2714
#jobs 1698
#careerarc 1448
#job: 611
#job? 546
#hiring! 530
#sanfrancisco, 528
#it 520
#veterans 428
#hospitality 427
#dubnation 412
#paloalto, 391
#mtvstars 388
#sales 350

 
tweetsUK (United Kingdom)
#london 420
#lovetheatreday 387
#job 362
#hiring 351
#ourday 305
#ktt 291
#tides 291
#nowplaying 265
#trndnl 264
#win 258
#jobs 254
#christmas 236
#pmqs 203
#winitwednesday 190
#housingday 177
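
The lists above contain entries such as #job, #job: and #job? as separate hashtags, because the mapper splits the tweet text on whitespace only and keeps any trailing punctuation. A possible refinement, sketched here in plain Java and not part of the job that produced these results, is to strip trailing ASCII punctuation before counting. The class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

class HashtagNormalizer {

    /** Lower-cases a token and strips trailing ASCII punctuation; empty if no hashtag remains. */
    static Optional<String> normalize(String token) {
        String cleaned = token.toLowerCase(Locale.ROOT).replaceAll("\\p{Punct}+$", "");
        if (cleaned.startsWith("#") && cleaned.length() > 1) {
            return Optional.of(cleaned);
        }
        return Optional.empty();
    }

    /** Counts the normalized hashtags in one tweet text, in order of first appearance. */
    static Map<String, Integer> count(String text) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String token : text.split("\\s+")) {
            normalize(token).ifPresent(tag -> counts.merge(tag, 1, Integer::sum));
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical tweet text
        System.out.println(count("#Job: apply now #job? #hiring! #job")); // prints {#job=3, #hiring=1}
    }
}
```

With such a step the variants of #job and #hiring would be merged, so the rankings would look somewhat different from the ones shown here.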

 

Enjoy the source …
