Alex

Concurrent File Processing Example (Video 27)

package main

import (
  "crypto/md5"
  "fmt"
  "io"
  "log"
  "os"
  "path/filepath"
  // the concurrent versions below also need "runtime" and "sync"
)

type pair struct {
  hash, path string
}
type fileList []string
type results map[string]fileList

// calculate the hash of a specific file path, returning a pair of
// (hash, path)
func hashFile(path string) pair {
  file, err := os.Open(path)
  if err != nil {
    log.Fatal(err)
  }
  defer file.Close()

  hash := md5.New()
  if _, err := io.Copy(hash, file); err != nil {
    log.Fatal(err)
  }

  return pair{fmt.Sprintf("%x", hash.Sum(nil)), path}
}

// this is a sequential implementation; it can be quite slow on a large directory tree
func walk(dir string) (results, error) {
  hashes := make(results)
  err := filepath.Walk(dir, func(path string, fi os.FileInfo, err error) error {
    if err != nil {
      return err // don't touch fi if the visit itself failed
    }
    if fi.Mode().IsRegular() && fi.Size() > 0 {
      h := hashFile(path)
      hashes[h.hash] = append(hashes[h.hash], h.path) // add the new file path to its corresponding hash entry in the map
    }
    return nil
  })
  return hashes, err
}
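
A minimal driver for the sequential version (a sketch; it borrows the argument check and the duplicate-printing loop from the final main at the end of these notes):

func main() {
  if len(os.Args) < 2 {
    log.Fatal("Missing parameter, provide dir name!")
  }

  hashes, err := walk(os.Args[1])
  if err != nil {
    log.Fatal(err)
  }

  for hash, files := range hashes {
    if len(files) > 1 {
      fmt.Println(hash[len(hash)-7:], len(files))
      for _, file := range files {
        fmt.Println("  ", file)
      }
    }
  }
}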

Concurrent Approaches

Worker Pool

Reproduced from https://www.youtube.com/watch?v=SPD7TykYy5w
func collector(pairs <-chan pair, result chan<- results) {
  hashes := make(results)

  // loop will only stop when the channel closes
  for p := range pairs {
    hashes[p.hash] = append(hashes[p.hash], p.path)
  }
  result <- hashes
}

func worker(paths <-chan string, pairs chan<- pair, done chan<- bool) {
  // process files until the paths channel is closed
  for path := range paths {
    pairs <- hashFile(path)
  }
  done <- true
}

func main() {
  if len(os.Args) < 2 {
    log.Fatal("Missing parameter, provide dir name!")
  }
  dir := os.Args[1]

  numWorkers := 2 * runtime.GOMAXPROCS(0)

  // the first model has unbuffered channels
  paths := make(chan string)
  pairs := make(chan pair)
  done := make(chan bool)
  result := make(chan results)

  for i := 0; i < numWorkers; i++ {
    go worker(paths, pairs, done)
  }

  go collector(pairs, result)

  err := filepath.Walk(dir, func(path string, fi os.FileInfo, err error) error {
    if err != nil {
      return err
    }
    if fi.Mode().IsRegular() && fi.Size() > 0 {
      paths <- path
    }
    return nil
  })

  if err != nil {
    log.Fatal(err)
  }

  // close paths so the workers' range loops stop
  close(paths)
  
  for i := 0; i < numWorkers; i++ {
    // we then read from the done channel until all workers are done
    <-done
  }

  // after all the workers are done we can close the pairs channel
  close(pairs)

  // finally we can read the hashes from the result channel
  hashes := <-result

  fmt.Println(hashes)
}
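
The comment above calls this "the first model", with unbuffered channels. A second model would buffer the hot channels so the walker and the workers rarely block on hand-offs; a sketch of just the changed lines (the buffer sizes are a guess, not taken from the video):

  paths := make(chan string, numWorkers) // walker can run ahead of the workers
  pairs := make(chan pair, numWorkers)   // workers can run ahead of the collector
  done := make(chan bool, numWorkers)    // workers never block reporting completion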

A Goroutine for Each Directory in the Tree

func searchTree(dir string, paths chan<- string, wg *sync.WaitGroup) error {
  defer wg.Done()

  visit := func(p string, fi os.FileInfo, err error) error {
    if err != nil {
      if os.IsNotExist(err) {
        return nil // the path vanished between listing and visiting; skip it
      }
      return err
    }

    // ignore dir itself to avoid an infinite loop
    if fi.Mode().IsDir() && p != dir {
      wg.Add(1)
      go searchTree(p, paths, wg) // we recursively search the tree in new goroutines to speed up listing
      return filepath.SkipDir
    }

    if fi.Mode().IsRegular() && fi.Size() > 0 {
      paths <- p
    }

    return nil
  }

  return filepath.Walk(dir, visit)
}

func run(dir string) results {
  workers := 2 * runtime.GOMAXPROCS(0)
  paths := make(chan string)
  pairs := make(chan pair)
  done := make(chan bool)
  result := make(chan results)
  wg := new(sync.WaitGroup)

  for i := 0; i < workers; i++ {
    go worker(paths, pairs, done)
  }

  go collector(pairs, result)

  // multi-threaded walk of the directory tree
  wg.Add(1)

  err := searchTree(dir, paths, wg)

  if err != nil {
    log.Fatal(err)
  }

  // wg.Wait() will block until all the directory listing work is done
  wg.Wait()
  close(paths)

  for i := 0; i < workers; i++ {
    <-done
  }

  close(pairs)
  return <-result
}
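
This version has no main of its own; it can be driven exactly like the final program at the end of these notes: check os.Args, call hashes := run(os.Args[1]), and print the entries whose file list has more than one path.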

No Workers - Just a Goroutine for Each File and Directory

Reproduced from https://www.youtube.com/watch?v=SPD7TykYy5w
func processFile(path string, pairs chan<- pair, wg *sync.WaitGroup, limits chan bool) {
  defer wg.Done()

  // sending on limits blocks while all the slots are taken; a slot frees when another goroutine finishes
  limits <- true

  // the deferred receive runs when this goroutine finishes,
  // freeing up a slot for another goroutine
  defer func() {
    <-limits
  }()

  pairs <- hashFile(path)
}

func collectHashes(pairs <-chan pair, result chan<- results) {
  hashes := make(results)

  for p := range pairs {
    hashes[p.hash] = append(hashes[p.hash], p.path)
  }

  result <- hashes
}

func walkDir(dir string, pairs chan<- pair, wg *sync.WaitGroup, limits chan bool) error {
  defer wg.Done()

  visit := func(p string, fi os.FileInfo, err error) error {
    if err != nil {
      if os.IsNotExist(err) {
        return nil // the path vanished between listing and visiting; skip it
      }
      return err
    }

    // ignore dir itself to avoid an infinite loop!
    if fi.Mode().IsDir() && p != dir {
      wg.Add(1)
      go walkDir(p, pairs, wg, limits)
      return filepath.SkipDir
    }

    if fi.Mode().IsRegular() && fi.Size() > 0 {
      wg.Add(1)
      go processFile(p, pairs, wg, limits)
    }

    return nil
  }

  // walkDir is also I/O-bound, so it too takes a slot from the
  // limits channel, blocking here until one opens
  limits <- true

  defer func() {
    <-limits
  }()

  return filepath.Walk(dir, visit)
}
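
The limits channel is the classic counting-semaphore idiom: a buffered channel whose capacity is the maximum allowed concurrency. A stripped-down sketch of the idiom on its own (runLimited and its task list are illustrative, not from the video):

func runLimited(tasks []string, maxInFlight int) {
  limits := make(chan bool, maxInFlight) // counting semaphore
  var wg sync.WaitGroup

  for _, task := range tasks {
    wg.Add(1)
    go func(task string) {
      defer wg.Done()
      limits <- true              // acquire a slot; blocks while maxInFlight goroutines hold one
      defer func() { <-limits }() // release the slot when this goroutine finishes
      fmt.Println("processing", task) // stand-in for real work such as hashFile
    }(task)
  }
  wg.Wait()
}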

func run(dir string) results {
  workers := 2 * runtime.GOMAXPROCS(0)
  limits := make(chan bool, workers)
  pairs := make(chan pair)
  result := make(chan results)
  wg := new(sync.WaitGroup)

  // we need another goroutine so we don't block here
  go collectHashes(pairs, result)

  // multi-threaded walk of the directory tree; we need a
  // waitGroup because we don't know how many to wait for
  wg.Add(1)

  err := walkDir(dir, pairs, wg, limits)

  if err != nil {
    log.Fatal(err)
  }

  // wait until every walkDir and processFile goroutine has finished
  wg.Wait()

  // closing pairs tells collectHashes that no more
  // hashes are coming; we have to do it here AFTER
  // all the workers are done
  close(pairs)

  return <-result
}

func main() {
  if len(os.Args) < 2 {
    log.Fatal("Missing parameter, provide dir name!")
  }

  if hashes := run(os.Args[1]); hashes != nil {
    for hash, files := range hashes {
      if len(files) > 1 {
        // show just 7 chars of the hash, git-style (here the last 7)
        fmt.Println(hash[len(hash)-7:], len(files))

        for _, file := range files {
          fmt.Println("  ", file)
        }
      }
    }
  }
}

Amdahl’s Law

Reproduced from https://www.youtube.com/watch?v=SPD7TykYy5w
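
Amdahl's law bounds the speedup available from parallelizing a program: if a fraction P of the runtime can be spread across N workers and the rest stays sequential, the overall speedup is

  S(N) = 1 / ((1 - P) + P / N)

As an illustration (the numbers are made up, not measured): if 95% of the duplicate finder's time went to hashing files (parallel) and 5% to walking the tree and collecting results (sequential), 8 workers would give at most 1 / (0.05 + 0.95/8) ≈ 5.9x, and no number of workers could push past 1 / 0.05 = 20x.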