Huffman Coding Python Implementation

Huffman coding is a lossless data compression technique. It assigns variable-length codes to the input characters based on their frequencies of occurrence: the most frequent character gets the shortest code.
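To make the frequency-to-code-length relationship concrete, here is a minimal sketch that computes only the code lengths for a short string. The function name huffman_code_lengths and the sample text are illustrative assumptions, not part of the program described in this article.

```python
import heapq
from collections import Counter


def huffman_code_lengths(text):
    """Return {symbol: code length in bits} for a Huffman code built from `text`."""
    counts = Counter(text)
    if not counts:
        return {}
    if len(counts) == 1:
        # Assumption: a single distinct symbol gets a 1-bit code by convention.
        return {symbol: 1 for symbol in counts}

    # Heap entries are (frequency, tie_breaker, {symbol: depth so far}).
    # The unique tie_breaker keeps Python from ever comparing the dicts.
    heap = [(freq, i, {symbol: 0}) for i, (symbol, freq) in enumerate(counts.items())]
    heapq.heapify(heap)
    next_id = len(heap)

    while len(heap) > 1:
        freq1, _, depths1 = heapq.heappop(heap)
        freq2, _, depths2 = heapq.heappop(heap)
        # Merging two subtrees pushes every symbol in them one level deeper.
        merged = {s: depth + 1 for s, depth in {**depths1, **depths2}.items()}
        heapq.heappush(heap, (freq1 + freq2, next_id, merged))
        next_id += 1

    return heap[0][2]


# 'a' occurs 8 times, 'b' 4 times, 'c' twice, 'd' once.
lengths = huffman_code_lengths("aaaaaaaabbbbccd")
print(lengths)
```

For this input the most frequent character, 'a', ends up with a 1-bit code, 'b' with 2 bits, and the rare 'c' and 'd' with 3 bits each.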


Update: This article is now also available as YouTube video lessons, in two parts: 1) an explanation of Huffman coding, and 2) a hands-on implementation of Huffman coding in Python, with explanation and a demo. Check out the videos on YouTube below:

  1. Huffman Coding - Explanation and Example
  2. Huffman Coding - Python Implementation and Demo

I decided to implement a data compression program based on it. The key steps in the implementation were:

  • Building frequency dictionary

  • Select 2 minimum frequency symbols and merge them repeatedly: Used Min Heap

  • Build a tree of the above process: Created a HeapNode class and used objects to maintain tree structure

  • Assign code to characters: Recursively traversed the tree and assigned the corresponding codes

  • Encode the input text. Added padding to the encoded text if its length is not a multiple of 8, and stored the padding information, in 8 bits, at the beginning of the resulting code.

  • Write the result to an output binary file, which will be our compressed file.
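The padding step is the least obvious one in the list above, so here is a small standalone sketch of it. It mirrors the scheme used in the code further down (an 8-bit header holding the number of padding bits, followed by the padded bit string). The helper names pad_bits, to_bytes and from_bytes are assumptions made for this illustration.

```python
def pad_bits(bits):
    """Prefix an 8-bit padding count and pad with zeros to a multiple of 8."""
    extra = 8 - len(bits) % 8  # between 1 and 8 padding bits
    return "{0:08b}".format(extra) + bits + "0" * extra


def to_bytes(padded_bits):
    """Pack a bit string whose length is a multiple of 8 into bytes."""
    return bytes(int(padded_bits[i:i + 8], 2) for i in range(0, len(padded_bits), 8))


def from_bytes(data):
    """Undo to_bytes and pad_bits, returning the original bit string."""
    bit_string = "".join("{0:08b}".format(byte) for byte in data)
    extra = int(bit_string[:8], 2)
    return bit_string[8:-extra]


encoded = "10110"              # 5 bits, so 3 padding bits are needed
packed = to_bytes(pad_bits(encoded))
print(packed)                  # b'\x03\xb0'
print(from_bytes(packed))      # 10110
```

The first byte (3) records how many zero bits were appended, which is what lets the decoder drop them again.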

End result

After running it on several sample text files, the average compression ratio achieved was 2.1 : 1.

So it's like you have your very own text file compression program.

I implemented both the compression and decompression functions. Decompressing the compressed file brought back the original state of the file, without any data loss.

Before reading the code from the beginning, first look at the outline of the two functions compress (line 101) and decompress (line 149), and then at the details of the other functions they call.

| Compression (line 101) | Decompression (line 149) |
| --- | --- |
| Make frequency dictionary | Read binary file |
| Make heap | Remove padding |
| Merge nodes and build tree | Decode text |
| Make codes | Output decoded text to txt file |
| Encode text | |
| Pad encoded text | |
| Make byte array | |
| Output the byte array to binary file | |

The HuffmanCoding class takes the complete path of the text file to be compressed as a parameter, since its data members store data specific to the input file.

The compress() function returns the path of the compressed output file.

The decompress() function takes the path of the file to be decompressed. It must be called on the same object that was used for compression, because the code mapping is stored in that object's data members.

import heapq
import os

class HeapNode:
	def __init__(self, char, freq):
		self.char = char
		self.freq = freq
		self.left = None
		self.right = None

	# heapq orders nodes with "<" (Python 3 ignores __cmp__), so define __lt__
	def __lt__(self, other):
		return self.freq < other.freq

	def __eq__(self, other):
		return isinstance(other, HeapNode) and self.freq == other.freq


class HuffmanCoding:
	def __init__(self, path):
		self.path = path
		self.heap = []
		self.codes = {}
		self.reverse_mapping = {}

	# functions for compression:

	def make_frequency_dict(self, text):
		frequency = {}
		for character in text:
			if not character in frequency:
				frequency[character] = 0
			frequency[character] += 1
		return frequency

	def make_heap(self, frequency):
		for key in frequency:
			node = HeapNode(key, frequency[key])
			heapq.heappush(self.heap, node)

	def merge_nodes(self):
		while(len(self.heap)>1):
			node1 = heapq.heappop(self.heap)
			node2 = heapq.heappop(self.heap)

			merged = HeapNode(None, node1.freq + node2.freq)
			merged.left = node1
			merged.right = node2

			heapq.heappush(self.heap, merged)


	def make_codes_helper(self, root, current_code):
		if(root == None):
			return

		if(root.char != None):
			self.codes[root.char] = current_code
			self.reverse_mapping[current_code] = root.char
			return

		self.make_codes_helper(root.left, current_code + "0")
		self.make_codes_helper(root.right, current_code + "1")


	def make_codes(self):
		root = heapq.heappop(self.heap)
		current_code = ""
		self.make_codes_helper(root, current_code)


	def get_encoded_text(self, text):
		encoded_text = ""
		for character in text:
			encoded_text += self.codes[character]
		return encoded_text


	def pad_encoded_text(self, encoded_text):
		extra_padding = 8 - len(encoded_text) % 8
		for i in range(extra_padding):
			encoded_text += "0"

		padded_info = "{0:08b}".format(extra_padding)
		encoded_text = padded_info + encoded_text
		return encoded_text


	def get_byte_array(self, padded_encoded_text):
		if(len(padded_encoded_text) % 8 != 0):
			# fail loudly instead of exiting with a success status
			raise ValueError("Encoded text not padded properly")

		b = bytearray()
		for i in range(0, len(padded_encoded_text), 8):
			byte = padded_encoded_text[i:i+8]
			b.append(int(byte, 2))
		return b


	def compress(self):
		filename, file_extension = os.path.splitext(self.path)
		output_path = filename + ".bin"

		with open(self.path, 'r') as file, open(output_path, 'wb') as output:
			text = file.read()
			text = text.rstrip()

			frequency = self.make_frequency_dict(text)
			self.make_heap(frequency)
			self.merge_nodes()
			self.make_codes()

			encoded_text = self.get_encoded_text(text)
			padded_encoded_text = self.pad_encoded_text(encoded_text)

			b = self.get_byte_array(padded_encoded_text)
			output.write(bytes(b))

		print("Compressed")
		return output_path


	""" functions for decompression: """

	def remove_padding(self, padded_encoded_text):
		padded_info = padded_encoded_text[:8]
		extra_padding = int(padded_info, 2)

		padded_encoded_text = padded_encoded_text[8:] 
		encoded_text = padded_encoded_text[:-1*extra_padding]

		return encoded_text

	def decode_text(self, encoded_text):
		current_code = ""
		decoded_text = ""

		for bit in encoded_text:
			current_code += bit
			if(current_code in self.reverse_mapping):
				character = self.reverse_mapping[current_code]
				decoded_text += character
				current_code = ""

		return decoded_text


	def decompress(self, input_path):
		filename, file_extension = os.path.splitext(self.path)
		output_path = filename + "_decompressed" + ".txt"

		with open(input_path, 'rb') as file, open(output_path, 'w') as output:
			bit_string = ""

			byte = file.read(1)
			while(len(byte) > 0):  # read(1) returns empty bytes at end of file
				byte = ord(byte)
				bits = bin(byte)[2:].rjust(8, '0')
				bit_string += bits
				byte = file.read(1)

			encoded_text = self.remove_padding(bit_string)

			decompressed_text = self.decode_text(encoded_text)
			
			output.write(decompressed_text)

		print("Decompressed")
		return output_path

Running the program:

Save the above code in a file named huffman.py.

Create a sample text file, or download the sample file sample.txt (right click, save as).

Save the code below in the same directory as huffman.py and run it. Before running, set the path variable to the path of your text file.

from huffman import HuffmanCoding

#input file path
path = '/home/ubuntu/Downloads/sample.txt'

h = HuffmanCoding(path)

output_path = h.compress()
h.decompress(output_path)

The compressed .bin file and the decompressed file are both saved in the same directory as the input file.

Result:

Running it on the sample text file linked above:

Initial size: 715.3 KB
Compressed file size: 394.0 KB
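For reference, the two sizes above can be turned into a compression ratio and a space saving with a couple of lines. The helper names compression_ratio and space_saving are assumptions for this sketch. To work from real files you could pass in the values returned by os.path.getsize() instead of the rounded figures.

```python
def compression_ratio(original_size, compressed_size):
    """Original size divided by compressed size (higher is better)."""
    return original_size / compressed_size


def space_saving(original_size, compressed_size):
    """Fraction of the original size that was saved."""
    return 1 - compressed_size / original_size


# Sizes in KB, taken from the result above.
ratio = compression_ratio(715.3, 394.0)
saving = space_saving(715.3, 394.0)
print(f"{ratio:.2f} : 1, {saving:.1%} smaller")
```

For this particular file that works out to roughly 1.82 : 1, or about 44.9% smaller.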

Plus, the decompressed file comes out to be exactly the same as the original file, without any data loss.
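One way to check this yourself is to compare the two files programmatically. The sketch below assumes a hypothetical helper named round_trip_matches. Because compress() calls text.rstrip() before encoding, trailing whitespace in the original file is not preserved, so the comparison strips it from the original as well.

```python
def round_trip_matches(original_path, decompressed_path):
    """Return True if the decompressed text equals the original text.

    Trailing whitespace in the original is ignored, because compress()
    strips it with rstrip() before encoding.
    """
    with open(original_path, "r") as original_file:
        original = original_file.read()
    with open(decompressed_path, "r") as decompressed_file:
        restored = decompressed_file.read()
    return original.rstrip() == restored
```

With the driver script above, this could be called as round_trip_matches(path, h.decompress(output_path)).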

And that is all for Huffman Coding implementation, with compression and decompression. This was fun to code.

The program above requires decompression to be run on the same object that performed the compression, because the code mapping is stored in its data members. Compression and decompression could be made independent by also storing the mapping at the beginning of the compressed file during compression. During decompression, we would then first read the mapping from the file and use it to decode the rest of the file.
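A minimal sketch of that idea, under stated assumptions: the mapping is serialized as JSON and prefixed with a 4-byte big-endian length, followed by the compressed bytes. The function names pack_with_mapping and unpack_with_mapping and this header layout are illustrative choices, not part of the program above.

```python
import json
import struct


def pack_with_mapping(reverse_mapping, payload):
    """Prepend a length-prefixed JSON copy of the code mapping to the payload."""
    header = json.dumps(reverse_mapping).encode("utf-8")
    # 4-byte big-endian header length, then the header, then the compressed data.
    return struct.pack(">I", len(header)) + header + payload


def unpack_with_mapping(blob):
    """Split a blob produced by pack_with_mapping into (mapping, payload)."""
    (header_len,) = struct.unpack(">I", blob[:4])
    header = blob[4:4 + header_len]
    reverse_mapping = json.loads(header.decode("utf-8"))
    payload = blob[4 + header_len:]
    return reverse_mapping, payload


# Example with a tiny, made-up mapping (code -> character) and payload.
mapping = {"0": "a", "10": "b", "11": "c"}
blob = pack_with_mapping(mapping, b"\x03\xb0")
restored_mapping, restored_payload = unpack_with_mapping(blob)
```

The header adds to the file size, so for very small inputs the compressed file may end up larger than the original. A more compact header format would reduce that overhead.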
