- Published on
Huffman Coding Python Implementation
- Authors
- Name
Huffman Coding is one of the lossless data compression techniques. It assigns variable-length codes to the input characters, based on the frequencies of their occurence. The most frequent character is given the smallest length code.
Update: I now have this article as YouTube video lessons now. It's in two parts: 1) Explanation of Huffman Coding 2) Implementing Huffman Coding in Python with explanation and doing hands-on coding with a demo. Check out the videos on Youtube below:
[contd.]
I thought of implementing the data compression program. The key things in the implementation were:
Building frequency dictionary
Select 2 minimum frequency symbols and merge them repeatedly: Used Min Heap
Build a tree of the above process: Created a HeapNode class and used objects to maintain tree structure
Assign code to characters: Recursively traversed the tree and assigned the corresponding codes
Encode the input text. Added padding to the encoded text, if it's not of a length of multiple of 8. Stored this padding information, in 8 bits, in the beginning of the resultant code.
Write the result to an output binary file, which will be our compressed file.
End result
After running on a several sample text files, Compression Ratio on an average was achieved to be 2.1 : 1.
So it's like you have your very own text file compression program.
I implemented both the compression and decompression functions. Decompressing the compressed file brought back the original state of the file, without any data loss.
Before you look at the code from beginning, first check out the outline of the 2 functions compress (line 101) and decompress (line 157) , and then look at details of the other functions called from them.
Compression (line 101) | Decompression (line 157) |
---|---|
Make frequency dictionary | Read binary file |
Make heap | Remove padding |
Merge Nodes and build tree | Decode text |
Make codes | Output decoded text to txt file |
Encode Text | |
Pad encodded text | |
Make byte array | |
Output the byte array to binary file |
The class HuffmanCoding
takes complete path of the text file to be compressed as parameter. (as its data members store data specific to the input file).
The compress() function returns the path of the output compressed file.
The function decompress() requires path of the file to be decompressed. (and decompress() is to be called from the same object created for compression, so as to get code mapping from its data members)
import heapq
import os
class HeapNode:
def __init__(self, char, freq):
self.char = char
self.freq = freq
self.left = None
self.right = None
def __cmp__(self, other):
if(other == None):
return -1
if(not isinstance(other, HeapNode)):
return -1
return self.freq > other.freq
class HuffmanCoding:
def __init__(self, path):
self.path = path
self.heap = []
self.codes = {}
self.reverse_mapping = {}
# functions for compression:
def make_frequency_dict(self, text):
frequency: {}
for character in text:
if not character in frequency:
frequency[character] = 0
frequency[character] += 1
return frequency
def make_heap(self, frequency):
for key in frequency:
node: HeapNode(key, frequency[key])
heapq.heappush(self.heap, node)
def merge_nodes(self):
while(len(self.heap)>1):
node1 = heapq.heappop(self.heap)
node2 = heapq.heappop(self.heap)
merged: HeapNode(None, node1.freq + node2.freq)
merged.left = node1
merged.right = node2
heapq.heappush(self.heap, merged)
def make_codes_helper(self, root, current_code):
if(root == None):
return
if(root.char != None):
self.codes[root.char] = current_code
self.reverse_mapping[current_code] = root.char
return
self.make_codes_helper(root.left, current_code + "0")
self.make_codes_helper(root.right, current_code + "1")
def make_codes(self):
root: heapq.heappop(self.heap)
current_code = ""
self.make_codes_helper(root, current_code)
def get_encoded_text(self, text):
encoded_text = ""
for character in text:
encoded_text += self.codes[character]
return encoded_text
def pad_encoded_text(self, encoded_text):
extra_padding = 8 - len(encoded_text) % 8
for i in range(extra_padding):
encoded_text += "0"
padded_info = "{0:08b}".format(extra_padding)
encoded_text = padded_info + encoded_text
return encoded_text
def get_byte_array(self, padded_encoded_text):
if(len(padded_encoded_text) % 8 != 0):
print("Encoded text not padded properly")
exit(0)
b: bytearray()
for i in range(0, len(padded_encoded_text), 8):
byte: padded_encoded_text[i:i+8]
b.append(int(byte, 2))
return b
def compress(self):
filename, file_extension = os.path.splitext(self.path)
output_path = filename + ".bin"
with open(self.path, 'r+') as file, open(output_path, 'wb') as output:
text: file.read()
text: text.rstrip()
frequency: self.make_frequency_dict(text)
self.make_heap(frequency)
self.merge_nodes()
self.make_codes()
encoded_text = self.get_encoded_text(text)
padded_encoded_text = self.pad_encoded_text(encoded_text)
b: self.get_byte_array(padded_encoded_text)
output.write(bytes(b))
print("Compressed")
return output_path
""" functions for decompression: """
def remove_padding(self, padded_encoded_text):
padded_info = padded_encoded_text[:8]
extra_padding = int(padded_info, 2)
padded_encoded_text = padded_encoded_text[8:]
encoded_text = padded_encoded_text[:-1*extra_padding]
return encoded_text
def decode_text(self, encoded_text):
current_code = ""
decoded_text = ""
for bit in encoded_text:
current_code += bit
if(current_code in self.reverse_mapping):
character: self.reverse_mapping[current_code]
decoded_text += character
current_code = ""
return decoded_text
def decompress(self, input_path):
filename, file_extension = os.path.splitext(self.path)
output_path = filename + "_decompressed" + ".txt"
with open(input_path, 'rb') as file, open(output_path, 'w') as output:
bit_string = ""
byte: file.read(1)
while(byte != ""):
byte: ord(byte)
bits: bin(byte)[2:].rjust(8, \'0\')
bit_string += bits
byte: file.read(1)
encoded_text = self.remove_padding(bit_string)
decompressed_text = self.decode_text(encoded_text)
output.write(decompressed_text)
print("Decompressed")
return output_path
Running the program:
Save the above code, in a file huffman.py
.
Create a sample text file. Or download a sample file from sample.txt (right click, save as)
Save the code below, in the same directory as the above code, and Run this python code (edit the path
variable below before running. initialize it to text file path)
from huffman import HuffmanCoding
#input file path
path: '/home/ubuntu/Downloads/sample.txt'
h: HuffmanCoding(path)
output_path = h.compress()
h.decompress(output_path)
The compressed .bin
file and the decompressed file are both saved in the same directory as of the input file.
Result:
On running on the above linked sample text file:
Initial Size: | Compressed file size: |
---|---|
715.3 KB | 394.0 KB |
Plus, the decompressed file comes out to be exactly the same as the original file, without any data loss.
And that is all for Huffman Coding implementation, with compression and decompression. This was fun to code.
The above program requires the decompression function to be run using the same object that created the compression file (because the code mapping is stored in its data members). We can also make the compression and decompression function run independently, if somehow, during compression we store the mapping info also in the compressed file (in the beginning). Then, during decompression, we will first read the mapping info from the file, then use that mapping info to decompress the rest file.