Friday, October 12, 2012

c# - Managed Multithreading to archive text files

I have a process that generates large text files on a daily basis. The data for these files comes from different sources. The files are then loaded into the warehouse. Once the data is loaded, the files are of no further use, and we usually delete them after a certain number of days based on the retention policy. To save space on the file server in the meantime, I have an archival process that zips up these files.

I had used GZipStream from the .NET Framework for most of my archival programs; however, the .NET Framework GZipStream cannot handle files larger than 4 GB. More about the .NET GZipStream here: http://msdn.microsoft.com/en-us/library/system.io.compression.gzipstream.aspx

For this archival program I use the ICSharpCode SharpZipLib library (ICSharpCode.SharpZipLib.dll), a free library available for download here:
http://www.icsharpcode.net/opensource/sharpziplib/. There are plenty of examples on the site as well.

Below is the code for a managed multithreaded archival program that zips up all the text files in a given folder. The number of threads should be chosen based on the number of processors the server has and is controlled from a configuration file (the MAX_THREADS setting). Make sure to add a reference to the DLL in your project and import its namespace with a using directive.


using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Configuration;
using System.IO;
using ICSharpCode.SharpZipLib.Zip;
using System.Threading;

namespace ZipFiles

{
    class Program
    {
        public static int iZipFileQueue = 0;
        public static int iMaxThreads = int.Parse(ConfigurationSettings.AppSettings["MAX_THREADS"]);
        static void Main(string[] args)
        {
            string sInputFolderPath = ConfigurationSettings.AppSettings["INPUT_FOLDER_PATH"];
            string sLogFilePath = ConfigurationSettings.AppSettings["LOG_FILE_PATH"];
            using (StreamWriter oWriter = new StreamWriter(sLogFilePath))
            {
                try
                {
                    Console.WriteLine("TIMESTAMP:{0} INFO:{1}", DateTime.Now.ToString(), "Zip file started.");
                    oWriter.WriteLine("TIMESTAMP:{0} INFO:{1}", DateTime.Now.ToString(), "Zip file started.");
                    DirectoryInfo oDirectoryInfo = new DirectoryInfo(sInputFolderPath);
                    foreach (FileInfo oFileInfo in oDirectoryInfo.GetFiles())
                    {
                        if (oFileInfo.Extension.Equals(".txt"))
                        {
                            oWriter.WriteLine("TIMESTAMP:{0} INFO:{1}", DateTime.Now.ToString(), "Started zipping file " + oFileInfo.Name);
                            Console.WriteLine("TIMESTAMP:{0} INFO:{1}", DateTime.Now.ToString(), "Started zipping file " + oFileInfo.Name);
                            ZipFile(oFileInfo);
                            oWriter.WriteLine("TIMESTAMP:{0} INFO:{1}", DateTime.Now.ToString(), "Finished zipping file " + oFileInfo.Name);
                            Console.WriteLine("TIMESTAMP:{0} INFO:{1}", DateTime.Now.ToString(), "Finished zipping file " + oFileInfo.Name);
                        }
                    }

                }

                catch (Exception ex)
                {
                    oWriter.WriteLine("TIMESTAMP:{0} INFO:{1}", DateTime.Now.ToString(), ex.Message);
                    Console.WriteLine("TIMESTAMP:{0} INFO:{1}", DateTime.Now.ToString(), ex.Message);
                }
                Console.WriteLine("TIMESTAMP:{0} INFO:{1}", DateTime.Now.ToString(), "Zip file finished.");
                oWriter.WriteLine("TIMESTAMP:{0} INFO:{1}", DateTime.Now.ToString(), "Zip file finished.");
            }
            while (iZipFileQueue >0)
            {
                Thread.Sleep(30000);
            }
        }

        private static void ZipFile(FileInfo oFileInfo)

        {
            while (iZipFileQueue >= iMaxThreads)
            {
                Thread.Sleep(30000);
            }
            FileCompress oFileCompress = new FileCompress(oFileInfo);
            if (ThreadPool.QueueUserWorkItem(new WaitCallback(CompressFile), oFileCompress))
            {
                Console.WriteLine("TIMESTAMP:{0} INFO:{1}", DateTime.Now.ToString(), "Started compressing " + oFileInfo.Name);
                iZipFileQueue++;
            }
        }
        public static void CompressFile(object p_oFileCompress)
        {
            try
            {
                FileCompress oFileCompress = (FileCompress)p_oFileCompress;
                oFileCompress.CompressFile();
            }
            catch (Exception ex)
            {
                throw ex;
            }
            iZipFileQueue--;
        }
        
    }
    public class FileCompress
    {
        FileInfo oFileInfo;
        public FileCompress() { }
        public FileCompress(FileInfo p_oFileInfo)
        {
            oFileInfo = p_oFileInfo;
        }

        public void CompressFile()

        {
            byte[] buffer = new byte[int.Parse(ConfigurationSettings.AppSettings["BUFFER_SIZE"])];
            ZipOutputStream oZipStream = new ZipOutputStream(File.Create(oFileInfo.FullName.Replace(".txt", ".zip")));
            oZipStream.SetLevel(9);
            ZipEntry zipEntry = new ZipEntry(oFileInfo.Name);
            try
            {
                oZipStream.PutNextEntry(zipEntry);

                using (FileStream stream = File.OpenRead(oFileInfo.FullName))

                {
                    int data = 0;
                    while ((data = stream.Read(buffer, 0, buffer.Length)) > 0)
                        oZipStream.Write(buffer, 0, data);
                }
            }
            catch (Exception ex)
            {
                throw ex;
            }
            finally
            {
                oZipStream.Close();
            }
        }
    }
}

No comments: