add recursive fetching of a href links found in content
Jan Haslik authored and Jan Haslik committed Sep 5, 2024
1 parent 037b254 commit 3e0060b
Showing 3 changed files with 128 additions and 55 deletions.
166 changes: 112 additions & 54 deletions crawler/crawler/crawler.go
@@ -6,17 +6,22 @@ import (
"errors"
"fmt"
"golang.org/x/net/html"
"io"
"net/http"
"net/url"
"strings"
"sync"
"web-crawler/data"
)

type document struct {
Url string `json:"url"`
Content string `json:"content"`
}

+ // Create a global map to track visited URLs and a mutex to ensure thread-safe access
+ var visitedUrls = make(map[string]bool)
+ var mu sync.Mutex

/**
* @brief Crawl multiple URLs concurrently using a specified number of workers.
*
@@ -28,50 +33,44 @@ func Crawl(urls []string, numWorkers int) error {
if len(urls) == 0 {
return errors.New("no urls provided")
}
- urlChan := make(chan string, len(urls))
-
- // Fill the channel with URLs
- for _, url := range urls {
- urlChan <- url
+ urlChan := make(chan string, len(urls)) // Channel for URLs to process
+ errChan := make(chan error, numWorkers) // Channel for errors
+
+ // Fill the channel with initial URLs
+ for _, u := range urls {
+ urlChan <- u
}
close(urlChan)

var wg sync.WaitGroup
wg.Add(numWorkers)

- type crawlErr struct {
- err error
- url string
- worker int
- }
- errChan := make(chan crawlErr, numWorkers)

// Start worker goroutines
- for i := 1; i <= numWorkers; i++ {
+ for i := 0; i < numWorkers; i++ {
go func(workerID int) {
defer wg.Done()
for url := range urlChan {
fmt.Printf("Crawling %s\n", url)
err := fetch(url)
fmt.Printf("Worker %d crawling: %s\n", workerID, url)
err := fetch(url, urlChan)
if err != nil {
- errChan <- crawlErr{err, url, workerID}
+ errChan <- fmt.Errorf("worker %d: %w", workerID, err)
} else {
fmt.Printf("Successfully Crawled %s and sent document to indexer server, channel worker: %d\n", url, workerID)
fmt.Printf("Worker %d successfully crawled: %s\n", workerID, url)
}
}
}(i)
}

- // Close error channel after all workers are done
+ // Close the error channel after all workers are done
go func() {
wg.Wait()
close(errChan)
}()

// Handle errors from workers
- for channel := range errChan {
- if channel.err != nil {
- fmt.Printf("Error fetching url: %s, channel worker: %d, error: %s\n", channel.url, channel.worker, channel.err)
+ for err := range errChan {
+ if err != nil {
+ fmt.Printf("Error: %s\n", err)
}
}

Expand All @@ -80,31 +79,38 @@ func Crawl(urls []string, numWorkers int) error {

/**
* @brief Fetches the content of a URL and sends it to the indexer server.
+ * Discovered links are passed to other workers through the urlChan.
*
* @param url The URL to fetch.
+ * @param urlChan The channel to send newly discovered URLs.
* @return error An error if any issue occurs during fetching or sending.
*/
- func fetch(url string) error {
+ func fetch(url string, urlChan chan<- string) error {
+ // Ensure the URL has not been visited before
+ mu.Lock()
+ if visitedUrls[url] {
+ mu.Unlock()
+ fmt.Printf("Skipping already visited URL: %s\n", url)
+ return nil
+ }
+ visitedUrls[url] = true // Mark URL as visited
+ mu.Unlock()

res, err := http.Get(url)
if err != nil {
- fmt.Println(err)
return err
}
- defer func(Body io.ReadCloser) {
- err := Body.Close()
- if err != nil {
- fmt.Println("Error closing body:", err)
- }
- }(res.Body)
+ defer res.Body.Close()

// Parse the HTML content
tokenizer := html.NewTokenizer(res.Body)
content := ""
appendHTML := true

// Tokenize HTML content
for {
tokenType := tokenizer.Next()
switch tokenType {

case html.ErrorToken:
content = ExtractStrings(content)
content = CleanContent(content)
Expand All @@ -118,39 +124,62 @@ func fetch(url string) error {
if err != nil {
return err
}

req, err := http.NewRequest("POST", "http://localhost:7001/index", bytes.NewBuffer(docBytes))
if err != nil {
return err
}

req.Header.Set("Content-Type", "application/json")
req.Header.Set("Host", "localhost:7002")

fmt.Println("Sending request to indexer")

client := &http.Client{}
res, err := client.Do(req)
if err != nil {
fmt.Printf("Failed to request indexer server: %s\n", err)
return err
return fmt.Errorf("failed to request indexer server: %w", err)
}
- defer func(Body io.ReadCloser) {
- err := Body.Close()
- if err != nil {
- fmt.Println("Error closing body:", err)
- }
- }(res.Body)
+ defer res.Body.Close()

+ // Check the response status code
if res.StatusCode != http.StatusOK {
fmt.Printf("Error: Status %s\n", res.Status)
return errors.New("non-OK HTTP status")
return fmt.Errorf("non-OK HTTP status: %s", res.Status)
}
return nil

case html.StartTagToken, html.SelfClosingTagToken:
token := tokenizer.Token()
tagName := token.Data

if tagName == "a" {
for _, attr := range token.Attr {
if attr.Key == "href" {
href := attr.Val
resolvedUrl, err := resolveUrl(href, url)
if err != nil {
fmt.Printf("Error resolving URL: %s\n", err)
continue
}
if resolvedUrl == "" {
continue
}

conn, err := data.Connect()

if err != nil {
fmt.Printf("Error connecting to database: %s\n", err)
}

err = conn.InsertUrl(resolvedUrl)

if err != nil {
fmt.Printf("Error inserting new URL %s: %s\n", resolvedUrl, err)
}

// Send discovered URL to the channel for workers to process
go func() {
urlChan <- resolvedUrl
}()
}
}
}

if tagName == "script" || tagName == "style" {
appendHTML = false
} else {
@@ -167,10 +196,44 @@ func fetch(url string) error {
}

/**
- * @brief Cleans the content by removing blacklisted characters and turning it into lowercase.
+ * @brief Resolves a relative or protocol-less URL to an absolute URL.
*
- * @param content The raw content to clean.
- * @return string The cleaned content.
+ * @param href The URL found in the <a> tag.
+ * @param baseUrl The base URL of the current page.
+ * @return string The resolved absolute URL.
*/
+ func resolveUrl(href string, baseUrl string) (string, error) {
+ if strings.HasPrefix(href, "#") {
+ return "", nil
+ }
+
+ baseParsed, err := url.Parse(baseUrl)
+ if err != nil {
+ return "", err
+ }
+
+ hrefParsed, err := url.Parse(href)
+ if err != nil {
+ return "", err
+ }
+
+ // If the URL is relative, resolve it against the base URL
+ if !hrefParsed.IsAbs() {
+ //return baseParsed.ResolveReference(hrefParsed).String(), nil
+ // for now skip relative urls
+ return "", nil
+ }
+
+ // If the URL starts with "//", add the scheme from the base URL (e.g., https:)
+ if strings.HasPrefix(href, "//") {
+ return baseParsed.Scheme + ":" + href, nil
+ }
+
+ return href, nil
+ }
+
+ /**
+ * @brief Cleans the content by removing blacklisted characters and turning it into lowercase.
+ */
func CleanContent(content string) string {
blacklist := map[string]bool{"and": true, ";": true, ":": true, ".": true, "{": true, "}": true, "[": true, "]": true, "\\": true, "%": true, "$": true, ",": true, "'": true, "<": true, ">": true, "!": true, "\"": true, "with": true, "or": true, "-": true}
@@ -194,20 +257,15 @@ func CleanContent(content string) string {

/**
* @brief Extracts strings by removing unnecessary whitespace characters from the content.
- *
- * @param content The raw content to extract strings from.
- * @return string The extracted content.
*/
func ExtractStrings(content string) string {
lines := strings.Split(content, "\n")
var result []string

for _, line := range lines {
trimmed := strings.TrimSpace(line)
if trimmed != "" {
result = append(result, trimmed)
}
}

return strings.Join(result, " ")
}
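
The new resolveUrl helper skips in-page fragments and, for now, relative links, passing absolute hrefs through unchanged; the ResolveReference call that would handle relative links is left commented out. As a point of reference, the short sketch below (hypothetical URLs, not part of this commit) shows what the standard net/url ResolveReference path would produce if relative links were resolved against the page URL:

    package main

    import (
        "fmt"
        "net/url"
    )

    func main() {
        // Hypothetical base page and hrefs, used only to illustrate RFC 3986 resolution.
        base, _ := url.Parse("https://example.com/blog/post.html")
        hrefs := []string{
            "about.html",               // relative path
            "/contact",                 // root-relative path
            "//cdn.example.com/app.js", // protocol-relative
            "https://example.org/",     // already absolute
        }
        for _, href := range hrefs {
            ref, err := url.Parse(href)
            if err != nil {
                fmt.Println("parse error:", err)
                continue
            }
            // ResolveReference resolves ref against base, e.g. "about.html"
            // becomes "https://example.com/blog/about.html".
            fmt.Printf("%-26s -> %s\n", href, base.ResolveReference(ref))
        }
    }
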
15 changes: 15 additions & 0 deletions crawler/data/db.go
@@ -78,3 +78,18 @@ func (collection *COLLECTION) GetWebsiteUrls() ([]string, error) {

return websitesUrls, nil
}

+ func (collection *COLLECTION) InsertUrl(url string) error {
+ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ defer cancel()
+
+ doc := bson.D{{Key: "url", Value: url}}
+
+ _, err := collection.collection.InsertOne(ctx, doc)
+
+ if err != nil {
+ return err
+ }
+
+ return nil
+ }
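
A minimal usage sketch for the new InsertUrl helper, assuming data.Connect() returns the same collection handle the crawler uses above (the URL is a hypothetical example):

    package main

    import (
        "log"

        "web-crawler/data"
    )

    func main() {
        conn, err := data.Connect()
        if err != nil {
            log.Fatalf("connect: %v", err)
        }
        // Insert a {url: ...} document into the collection.
        if err := conn.InsertUrl("https://example.com/"); err != nil {
            log.Printf("insert failed: %v", err)
        }
    }
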
2 changes: 1 addition & 1 deletion crawler/main.go
@@ -13,7 +13,7 @@ import (
*/
func main() {
// Create a ticker to trigger the crawling process every 5 minutes
- ticker := time.NewTicker(time.Minute * 5)
+ ticker := time.NewTicker(time.Second * 5)
defer ticker.Stop()

var wg sync.WaitGroup
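
Only the ticker interval changes here; the rest of main.go is not shown in this diff. For context, a ticker-driven loop of this shape might look roughly like the sketch below; the import paths, seed-loading via GetWebsiteUrls, and the worker count are assumptions, not part of the commit:

    package main

    import (
        "fmt"
        "sync"
        "time"

        "web-crawler/crawler"
        "web-crawler/data"
    )

    func main() {
        // Trigger the crawling process on every tick (5 seconds after this change).
        ticker := time.NewTicker(time.Second * 5)
        defer ticker.Stop()

        var wg sync.WaitGroup // mirrors the declaration visible in the diff
        for range ticker.C {
            wg.Add(1)
            go func() {
                defer wg.Done()
                conn, err := data.Connect()
                if err != nil {
                    fmt.Println("db connect error:", err)
                    return
                }
                urls, err := conn.GetWebsiteUrls()
                if err != nil {
                    fmt.Println("error loading seed urls:", err)
                    return
                }
                if err := crawler.Crawl(urls, 4); err != nil {
                    fmt.Println("crawl error:", err)
                }
            }()
        }
    }
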
