All Implemented Interfaces: AbstractSieve.NewFlowReceiver<ByteArrayList>, it.unimi.dsi.jai4j.JobListener<BubingJob>
public class Frontier extends java.lang.Object implements it.unimi.dsi.jai4j.JobListener<BubingJob>, AbstractSieve.NewFlowReceiver<ByteArrayList>
[…] VisitState first.
Every BUbiNG agent contains an instance of this class, which is responsible for starting and orchestrating the mutual interaction of many different elements: the Workbench, which holds the VisitStates to be visited next; the Distributor, which fills the workbench, possibly virtualizing part of it through a WorkbenchVirtualizer; the TodoThread and the DoneThread, which move VisitStates around during the crawl; and the DNSThread, FetchingThread and ParsingThread instances.
URLs in BUbiNG belong to one of four states:
[…] readyURLs queue, which happens only the first time for every given URL);
[…] RuntimeConfiguration.maxUrlsPerSchemeAuthority) forbid it;
Note that waiting URLs that happen to be ready or visited will never come out of the sieve: the logic of the sieve itself prevents the same object from being emitted twice.
All ready URLs are initially stored in a ByteArrayDiskQueue called readyURLs, from which they are moved to the FIFO queue of their VisitState by the Distributor. Inside a VisitState, we store only a byte-array representation of the path+query of ready URLs. Some of them may be stored outside of the visit state, through virtualization. Keeping path+queries in byte-array form in all components greatly reduces object creation and provides a simple form of compression by prefix omission.
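The prefix-omission idea can be illustrated with a minimal sketch. It is not BUbiNG code: the class PathQueryQueues and its methods are hypothetical, plain strings stand in for ByteArrayList keys, and a HashMap of in-memory queues stands in for visit states. Each scheme+authority is stored once, and only the path+query bytes are queued.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: one FIFO queue of path+query byte arrays per scheme+authority. */
final class PathQueryQueues {
    private final Map<String, ArrayDeque<byte[]>> queues = new HashMap<>();

    /** Index of the first '/' after "://", or the URL length if there is none. */
    static int pathQueryStart(String url) {
        int schemeEnd = url.indexOf("://");
        int from = schemeEnd < 0 ? 0 : schemeEnd + 3;
        int slash = url.indexOf('/', from);
        return slash < 0 ? url.length() : slash;
    }

    /** Stores only the path+query bytes; the scheme+authority prefix is kept once, as the map key. */
    void enqueue(String url) {
        int start = pathQueryStart(url);
        String schemeAuthority = url.substring(0, start);
        String pathQuery = start == url.length() ? "/" : url.substring(start);
        queues.computeIfAbsent(schemeAuthority, k -> new ArrayDeque<>())
              .add(pathQuery.getBytes(StandardCharsets.ISO_8859_1));
    }

    /** Rebuilds the full URL of the oldest path+query for the given scheme+authority, or returns null. */
    String dequeue(String schemeAuthority) {
        ArrayDeque<byte[]> q = queues.get(schemeAuthority);
        if (q == null || q.isEmpty()) return null;
        return schemeAuthority + new String(q.remove(), StandardCharsets.ISO_8859_1);
    }

    /** Total number of bytes held in all queues (prefixes excluded). */
    int storedBytes() {
        int total = 0;
        for (ArrayDeque<byte[]> q : queues.values())
            for (byte[] b : q) total += b.length;
        return total;
    }
}
```

With two URLs on the same host, the host prefix is stored once and the queue holds only the bytes of the two path+queries.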
URLs are enqueued to the frontier either because they are part of the visit seed, or because a ParsingThread has found them, or because they have been submitted using JAI4J (this includes both manual submission and URLs sent by other agents). The method enqueue(ByteArrayList) will first check whether we have already stored too many URLs for the URL scheme+authority, in which case the URL is discarded; then, it will check whether the URL already appears in the URL cache (in this case, too, the URL is discarded); finally, it will check whether another agent is responsible for the URL, and in this case the URL will be sent to the appropriate agent using the JAI4J infrastructure. If all checks have passed, the URL will be put in the sieve.
The URL might never be dequeued from the sieve (because we have seen it already), but if it is, it will become ready by going through the append(long, ByteArrayList) method, which will again check that we do not have too many URLs for the URL authority, and then will enqueue the URL to the disk queue of ready URLs.
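The order of the checks described above can be sketched as follows. This is an illustration under stated assumptions, not the actual implementation: EnqueueSketch, Outcome, ownerOf and recordStored are hypothetical names, a HashSet stands in for both the approximate URL cache and the sieve, and agent assignment is modeled as a simple hash modulo the number of agents.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of the three checks performed before a URL reaches the sieve. */
final class EnqueueSketch {
    enum Outcome { TOO_MANY_URLS, ALREADY_CACHED, SENT_TO_OTHER_AGENT, PUT_IN_SIEVE }

    private final int maxUrlsPerSchemeAuthority;
    private final int numberOfAgents;
    private final int thisAgent;
    private final Map<String, Integer> schemeAuthority2Count = new HashMap<>();
    private final Set<String> urlCache = new HashSet<>(); // stand-in for an approximate cache
    final Set<String> sieve = new HashSet<>();             // stand-in for the real sieve

    EnqueueSketch(int maxUrlsPerSchemeAuthority, int numberOfAgents, int thisAgent) {
        this.maxUrlsPerSchemeAuthority = maxUrlsPerSchemeAuthority;
        this.numberOfAgents = numberOfAgents;
        this.thisAgent = thisAgent;
    }

    /** Assumed assignment rule: hash of the scheme+authority modulo the number of agents. */
    static int ownerOf(String schemeAuthority, int numberOfAgents) {
        return Math.floorMod(schemeAuthority.hashCode(), numberOfAgents);
    }

    /** Records that one more URL was stored for this scheme+authority. */
    void recordStored(String schemeAuthority) {
        schemeAuthority2Count.merge(schemeAuthority, 1, Integer::sum);
    }

    Outcome enqueue(String schemeAuthority, String pathQuery) {
        // Check 1: too many URLs already stored for this scheme+authority.
        if (schemeAuthority2Count.getOrDefault(schemeAuthority, 0) >= maxUrlsPerSchemeAuthority)
            return Outcome.TOO_MANY_URLS;
        // Check 2: the URL is already in the cache (add returns false if present).
        String url = schemeAuthority + pathQuery;
        if (!urlCache.add(url)) return Outcome.ALREADY_CACHED;
        // Check 3: another agent is responsible; a real crawler would forward the URL here.
        if (ownerOf(schemeAuthority, numberOfAgents) != thisAgent) return Outcome.SENT_TO_OTHER_AGENT;
        sieve.add(url);
        return Outcome.PUT_IN_SIEVE;
    }
}
```

The cheap local checks run first, so a URL is forwarded to another agent only if it survives them.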
Note that we expect that the vast majority of URLs will be of the first kind. This is very important, as there is much less contention on visit state locks than on the frontier lock.
Modifier and Type | Class | Description |
---|---|---|
static class | Frontier.PropertyKeys | Names of the scalar fields saved by snap(). |
Modifier and Type | Field | Description |
---|---|---|
protected Agent | agent | The agent that created this frontier. |
java.util.concurrent.atomic.AtomicLong[] | archetypesStatus | In position i, with 0 < i < 6, the number of pages stored (does not include duplicates) having status ixx. |
protected double | averageSpeed | The average speed of all visit states. |
java.util.concurrent.atomic.AtomicLong | brokenVisitStates | The number of broken visit states. |
static it.unimi.dsi.sux4j.mph.AbstractHashFunction<byte[]> | BYTE_ARRAY_HASHING_STRATEGY | A hash function using MurmurHash3. |
static it.unimi.dsi.sux4j.mph.AbstractHashFunction<ByteArrayList> | BYTE_ARRAY_LIST_HASHING_STRATEGY | A hash function using MurmurHash3. |
SummaryStats | contentLength | Statistics about the content length of each archetype. |
java.util.concurrent.atomic.AtomicLong | contentTypeApplication | Number of archetypes whose indicated content type starts with application (case insensitive). |
java.util.concurrent.atomic.AtomicLong | contentTypeImage | Number of archetypes whose indicated content type starts with image (case insensitive). |
java.util.concurrent.atomic.AtomicLong | contentTypeOthers | Number of archetypes whose indicated content type does not start with text, image, or application (case insensitive). |
java.util.concurrent.atomic.AtomicLong | contentTypeText | Number of archetypes whose indicated content type starts with text (case insensitive). |
org.apache.http.client.config.RequestConfig | defaultRequestConfig | The default configuration for a non-robots.txt request. |
BloomFilter<java.lang.Void> | digests | A Bloom filter storing page digests for duplicate detection. |
protected Distributor | distributor | The thread constantly moving ready URLs into the workbench. |
protected ObjectArrayList<DNSThread> | dnsThreads | The threads resolving DNS for new visit states. |
LockFreeQueue<VisitState> | done | A lock-free list of visit states ready to be released; it is filled by fetching threads and emptied by the DoneThread. |
java.util.concurrent.atomic.AtomicLong | duplicates | The number of duplicate pages. |
SummaryStats | externalOutdegree | Statistics about the number of out-links of each archetype, without considering the links to the same corresponding host. |
java.util.concurrent.atomic.AtomicLong | fetchedResources | The number of fetched resources (updated by ParsingThread instances). |
java.util.concurrent.atomic.AtomicLong | fetchedRobots | The number of fetched robots.txt files (updated by ParsingThread instances). |
java.util.concurrent.atomic.AtomicLong | fetchingThreadWaitingTimeSum | The sum of the waiting time of the waiting fetching threads: every time a fetching thread waits this sum is updated; every time the statistics are printed this value is reset. |
java.util.concurrent.atomic.AtomicLong | fetchingThreadWaits | The number of waits performed by fetching threads; every time the statistics are printed this value is reset. |
static long | FRONT_INCREASE | The increase of the front size used by updateRequestedFrontSize(). |
static java.net.InetAddress[] | LOOPBACK | The loopback address, cached. |
static long | MIN_FLUSH_INTERVAL | The minimum number of milliseconds between two flushes. |
java.util.concurrent.LinkedBlockingQueue<VisitState> | newVisitStates | |
protected long | nextFlush | The time at which the next flush can happen. |
java.util.concurrent.atomic.AtomicLong | numberOfReceivedURLs | The overall number of URLs sent by other agents. |
SummaryStats | outdegree | Statistics about the number of out-links of each archetype. |
ObjectArrayList<ParsingThread> | parsingThreads | The parsing threads. |
java.util.concurrent.atomic.AtomicLong | pathQueriesInQueues | The overall number of path+queries stored in VisitState queues. |
java.util.concurrent.ArrayBlockingQueue<ByteArrayList> | quickReceivedURLs | A queue to quickly buffer URLs communicated by receive(BubingJob). |
RuntimeConfiguration | rc | The runtime configuration. |
static int | READY_URLS_BUFFER_SIZE | The size of the buffer used for readyURLs. |
ByteArrayDiskQueue | readyURLs | A queue to store URLs coming out of the sieve. |
ByteArrayDiskQueue | receivedURLs | A queue to buffer in the long run URLs communicated by receive(BubingJob). |
protected LockFreeQueue<VisitState> | refill | A queue of visit states ready to be refilled; it is filled by fetching threads and emptied by the Distributor. |
java.util.concurrent.atomic.AtomicLong | requiredFrontSize | The current estimation for the size of the front in IP addresses. |
LockFreeQueue<FetchData> | results | A lock-free list of FetchData to be parsed; it is filled by fetching threads and emptied by the parsing threads. |
org.apache.http.client.config.RequestConfig | robotsRequestConfig | The default configuration for a robots.txt request. |
ParallelBufferedWarcWriter | robotsWarcParallelOutputStream | The WARC file to which the downloaded robots.txt files are written (if so requested). |
ConcurrentCountingMap | schemeAuthority2Count | A synchronized, highly concurrent map from scheme+authorities to number of stored URLs. |
AbstractSieve<ByteArrayList,java.lang.Void> | sieve | An instance of a MercatorSieve. |
java.util.concurrent.atomic.AtomicLongArray | speedDist | The logarithmically binned statistics of download speed in bits/s. |
protected Store | store | The store. |
LockFreeQueue<VisitState> | todo | A lock-free list of visit states ready to be visited; it is filled by the TodoThread and emptied by the fetching threads. |
java.util.concurrent.atomic.AtomicLong | transferredBytes | The overall number of transferred bytes. |
java.util.concurrent.DelayQueue<VisitState> | unknownHosts | The queue of unknown hosts. |
FastApproximateByteArrayCache | urlCache | The URL cache. |
protected WorkbenchVirtualizer | virtualizer | The workbench virtualizer used by this frontier. |
java.util.concurrent.atomic.AtomicLong | weightOfpathQueriesInQueues | The overall memory (in bytes) used by path+queries stored in VisitState queues. |
Workbench | workbench | The workbench. |
long | workbenchSizeInPathQueries | An estimation of the number of path+query objects that the workbench can store. |
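As an illustration of what "logarithmically binned" statistics such as speedDist might look like, here is a minimal sketch. The base-2 binning, the array length of 40 and the names SpeedDistSketch, bin and record are assumptions made for the example; the source does not state which binning BUbiNG uses.

```java
import java.util.concurrent.atomic.AtomicLongArray;

/** Hypothetical sketch: download speeds counted in base-2 logarithmic bins. */
final class SpeedDistSketch {
    final AtomicLongArray speedDist = new AtomicLongArray(40); // assumed number of bins

    /** Returns floor(log2(bitsPerSecond)) clamped to [0, bins - 1]; speeds below 1 bit/s go to bin 0. */
    static int bin(double bitsPerSecond, int bins) {
        if (bitsPerSecond < 1) return 0;
        int b = 63 - Long.numberOfLeadingZeros((long) bitsPerSecond);
        return Math.min(b, bins - 1);
    }

    /** Records one download of the given size and duration; a zero duration counts as speed 0. */
    void record(long bytes, long millis) {
        double bitsPerSecond = millis == 0 ? 0 : bytes * 8.0 * 1000 / millis;
        speedDist.incrementAndGet(bin(bitsPerSecond, speedDist.length()));
    }
}
```

Because each bin covers a power-of-two range, a small fixed array can summarize speeds spanning many orders of magnitude, and AtomicLongArray lets many threads update it without locking.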
Constructor | Description |
---|---|
Frontier(RuntimeConfiguration rc, Store store, Agent agent) | Creates the frontier. |
Modifier and Type | Method | Description |
---|---|---|
void | append(long hash, ByteArrayList list) | A new key is appended. |
long | archetypes() | The number of pages stored (does not include duplicates). |
void | close() | Closes the frontier: threads are stopped (if necessary, aborted), sieve and store and robots stream are closed. |
void | dnsThreads(int newDnsThreads) | Changes the number of DNS threads. |
void | enqueue(ByteArrayList url) | Enqueues a URL to the BUbiNG crawl. |
void | enqueueLocal(ByteArrayList url) | Enqueues a local URL represented by a byte array to the crawl of this agent. |
void | fetchingThreads(int numFetchingThreads) | Changes the number of fetching threads. |
void | finishedAppending() | The new flow of keys is over. |
StatsThread | getStatsThread() | Returns the StatsThread. |
void | noMoreAppend() | There will be no more new flows (because the sieve that is calling this method was closed). |
void | parsingThreads(int newParsingThreads) | Changes the number of parsing threads. |
void | prepareToAppend() | A new flow of keys is ready and will start being appended. |
void | receive(BubingJob job) | |
void | resetFetchingThreadsWaitingStats() | Resets the statistics relative to the wait time of FetchingThreads. |
void | restore() | Restores data from the given directory. |
void | snap() | Snaps fields to files in the given directory. |
void | updateFetchingThreadsWaitingStats(long waitTime) | Updates the statistics relative to the wait time of FetchingThreads. |
void | updateRequestedFrontSize() | Updates, if necessary, the requiredFrontSize. |
boolean | workbenchIsFull() | Returns whether the workbench is full. |
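public static final int READY_URLS_BUFFER_SIZE
The size of the buffer used for readyURLs.
public static final java.net.InetAddress[] LOOPBACK
The loopback address, cached.
public static final long MIN_FLUSH_INTERVAL
The minimum number of milliseconds between two flushes.
public static final long FRONT_INCREASE
The increase of the front size used by updateRequestedFrontSize().
public static final it.unimi.dsi.sux4j.mph.AbstractHashFunction<byte[]> BYTE_ARRAY_HASHING_STRATEGY
A hash function using MurmurHash3.
public static final it.unimi.dsi.sux4j.mph.AbstractHashFunction<ByteArrayList> BYTE_ARRAY_LIST_HASHING_STRATEGY
A hash function using MurmurHash3.
protected final Store store
The store.
protected final Agent agent
The agent that created this frontier.
public final RuntimeConfiguration rc
The runtime configuration.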
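public AbstractSieve<ByteArrayList,java.lang.Void> sieve
An instance of a MercatorSieve.
public ByteArrayDiskQueue readyURLs
A queue to store URLs coming out of the sieve.
public java.util.concurrent.ArrayBlockingQueue<ByteArrayList> quickReceivedURLs
A queue to quickly buffer URLs communicated by receive(BubingJob).
public ByteArrayDiskQueue receivedURLs
A queue to buffer in the long run URLs communicated by receive(BubingJob).
public final ObjectArrayList<ParsingThread> parsingThreads
The parsing threads.
public final ParallelBufferedWarcWriter robotsWarcParallelOutputStream
The WARC file to which the downloaded robots.txt files are written (if so requested).
public final java.util.concurrent.atomic.AtomicLong pathQueriesInQueues
The overall number of path+queries stored in VisitState queues.
public final java.util.concurrent.atomic.AtomicLong weightOfpathQueriesInQueues
The overall memory (in bytes) used by path+queries stored in VisitState queues.
public final java.util.concurrent.atomic.AtomicLong brokenVisitStates
The number of broken visit states.
public final java.util.concurrent.atomic.AtomicLong numberOfReceivedURLs
The overall number of URLs sent by other agents.
protected volatile long nextFlush
The time at which the next flush can happen.
public final Workbench workbench
The workbench.
public final java.util.concurrent.DelayQueue<VisitState> unknownHosts
The queue of unknown hosts.
public final java.util.concurrent.LinkedBlockingQueue<VisitState> newVisitStates
public BloomFilter<java.lang.Void> digests
A Bloom filter storing page digests for duplicate detection.
protected final ObjectArrayList<DNSThread> dnsThreads
The threads resolving DNS for new visit states.
protected final Distributor distributor
The thread constantly moving ready URLs into the workbench.
public final FastApproximateByteArrayCache urlCache
The URL cache.
protected final WorkbenchVirtualizer virtualizer
The workbench virtualizer used by this frontier.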
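public final LockFreeQueue<VisitState> todo
A lock-free list of visit states ready to be visited; it is filled by the TodoThread and emptied by the fetching threads.
public final LockFreeQueue<VisitState> done
A lock-free list of visit states ready to be released; it is filled by fetching threads and emptied by the DoneThread.
protected final LockFreeQueue<VisitState> refill
A queue of visit states ready to be refilled; it is filled by fetching threads and emptied by the Distributor.
public final java.util.concurrent.atomic.AtomicLong requiredFrontSize
The current estimation for the size of the front in IP addresses. […] FetchingThread has to wait to retrieve a VisitState from the todo queue. It is never more than half the number of path+queries that the workbench can hold.
public final java.util.concurrent.atomic.AtomicLong fetchingThreadWaits
The number of waits performed by fetching threads; every time the statistics are printed this value is reset.
public final java.util.concurrent.atomic.AtomicLong fetchingThreadWaitingTimeSum
The sum of the waiting time of the waiting fetching threads: every time a fetching thread waits this sum is updated; every time the statistics are printed this value is reset.
public final LockFreeQueue<FetchData> results
A lock-free list of FetchData to be parsed; it is filled by fetching threads and emptied by the parsing threads.
public final java.util.concurrent.atomic.AtomicLong[] archetypesStatus
In position i, with 0 < i < 6, the number of pages stored (does not include duplicates) having status ixx.
public final SummaryStats outdegree
Statistics about the number of out-links of each archetype.
public final SummaryStats externalOutdegree
Statistics about the number of out-links of each archetype, without considering the links to the same corresponding host.
public final SummaryStats contentLength
Statistics about the content length of each archetype.
public final java.util.concurrent.atomic.AtomicLong contentTypeText
Number of archetypes whose indicated content type starts with text (case insensitive).
public final java.util.concurrent.atomic.AtomicLong contentTypeImage
Number of archetypes whose indicated content type starts with image (case insensitive).
public final java.util.concurrent.atomic.AtomicLong contentTypeApplication
Number of archetypes whose indicated content type starts with application (case insensitive).
public final java.util.concurrent.atomic.AtomicLong contentTypeOthers
Number of archetypes whose indicated content type does not start with text, image, or application (case insensitive).
public final java.util.concurrent.atomic.AtomicLong duplicates
The number of duplicate pages.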
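public final java.util.concurrent.atomic.AtomicLong fetchedResources
The number of fetched resources (updated by ParsingThread instances).
public final java.util.concurrent.atomic.AtomicLong fetchedRobots
The number of fetched robots.txt files (updated by ParsingThread instances).
public final java.util.concurrent.atomic.AtomicLong transferredBytes
The overall number of transferred bytes.
public ConcurrentCountingMap schemeAuthority2Count
A synchronized, highly concurrent map from scheme+authorities to number of stored URLs.
public final java.util.concurrent.atomic.AtomicLongArray speedDist
The logarithmically binned statistics of download speed in bits/s.
protected double averageSpeed
The average speed of all visit states.
public volatile long workbenchSizeInPathQueries
An estimation of the number of path+query objects that the workbench can store.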
public final org.apache.http.client.config.RequestConfig defaultRequestConfig
The default configuration for a non-robots.txt request.
public final org.apache.http.client.config.RequestConfig robotsRequestConfig
The default configuration for a robots.txt request.
public Frontier(RuntimeConfiguration rc, Store store, Agent agent) throws java.io.IOException, java.lang.IllegalArgumentException, ConfigurationException, java.lang.ClassNotFoundException, java.lang.InterruptedException
Creates the frontier.
Parameters:
rc - the configuration to be used to set all parameters.
store - the place where fetched pages will be stored.
agent - the BUbiNG agent possessing this frontier.
Throws:
java.io.IOException
java.lang.IllegalArgumentException
ConfigurationException
java.lang.ClassNotFoundException
java.lang.InterruptedException
public void dnsThreads(int newDnsThreads) throws java.lang.IllegalArgumentException
Changes the number of DNS threads.
Note that when the number of threads is reduced, the stopped threads will actually terminate their execution as soon as they check the DNSThread.stop field.
Parameters:
newDnsThreads - the new number of threads.
Throws:
java.lang.IllegalArgumentException
public void fetchingThreads(int numFetchingThreads) throws java.lang.IllegalArgumentException, java.security.NoSuchAlgorithmException, java.io.IOException
Changes the number of fetching threads.
Note that when the number of threads is reduced, the stopped threads will actually terminate their execution as soon as they check the FetchingThread.stop field.
Parameters:
numFetchingThreads - the new number of threads.
Throws:
java.lang.IllegalArgumentException
java.security.NoSuchAlgorithmException
java.io.IOException
public void parsingThreads(int newParsingThreads) throws java.lang.IllegalArgumentException
Changes the number of parsing threads.
Note that when the number of threads is reduced, the stopped threads will actually terminate their execution as soon as they check the ParsingThread.stop field.
Parameters:
newParsingThreads - the new number of threads.
Throws:
java.lang.IllegalArgumentException
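The cooperative shutdown described in these notes (a thread terminates only when it next checks its stop field) can be sketched as follows. StoppableWorkers, Worker, resize and awaitTermination are hypothetical names for this illustration; the worker body is a placeholder sleep rather than real DNS, fetching or parsing work.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: resizing a pool of threads that stop cooperatively via a volatile flag. */
final class StoppableWorkers {
    static final class Worker extends Thread {
        volatile boolean stop; // checked by the thread itself; nothing interrupts it

        @Override
        public void run() {
            while (!stop) {
                try {
                    Thread.sleep(1); // placeholder for one unit of work
                } catch (InterruptedException e) {
                    return;
                }
            }
        }
    }

    private final List<Worker> workers = new ArrayList<>();

    /** Grows or shrinks the pool; returns the workers that were asked to stop. */
    synchronized List<Worker> resize(int newCount) {
        if (newCount < 0) throw new IllegalArgumentException("Negative thread count: " + newCount);
        List<Worker> stopped = new ArrayList<>();
        while (workers.size() > newCount) {
            Worker w = workers.remove(workers.size() - 1);
            w.stop = true; // the thread exits the next time it checks the flag
            stopped.add(w);
        }
        while (workers.size() < newCount) {
            Worker w = new Worker();
            w.setDaemon(true);
            workers.add(w);
            w.start();
        }
        return stopped;
    }

    synchronized int size() {
        return workers.size();
    }

    /** Waits up to the given time per worker and reports whether all of them terminated. */
    static boolean awaitTermination(List<Worker> stopped, long millis) {
        try {
            for (Worker w : stopped) w.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        for (Worker w : stopped) if (w.isAlive()) return false;
        return true;
    }
}
```

The method that shrinks the pool returns immediately; a caller that needs the stopped threads to be gone has to wait for them explicitly.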
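public void enqueue(ByteArrayList url) throws java.io.IOException, java.lang.InterruptedException
Enqueues a URL to the BUbiNG crawl.
Before enqueueing the URL to the sieve we perform a number of checks: whether too many URLs have already been stored for its scheme+authority, whether the URL already appears in the URL cache, and whether another agent is responsible for it.
The difference between this method and enqueueLocal(ByteArrayList) is that the latter does not check whether the argument is local.
Parameters:
url - a BUbiNG URL to be enqueued to the BUbiNG crawl.
Throws:
java.lang.InterruptedException - from AbstractSieve.enqueue(Object, Object).
java.io.IOException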
public boolean workbenchIsFull()
Returns whether the workbench is full.
public void enqueueLocal(ByteArrayList url) throws java.io.IOException, java.lang.InterruptedException
Enqueues a local URL represented by a byte array to the crawl of this agent.
Before enqueueing the URL to the sieve we perform a number of checks.
The difference between this method and enqueue(ByteArrayList) is that the latter checks whether the argument is local.
Parameters:
url - a BUbiNG URL to be enqueued to the BUbiNG crawl, in byte-array representation.
Throws:
java.lang.InterruptedException - from AbstractSieve.enqueue(Object, Object).
java.io.IOException
public void receive(BubingJob job)
Specified by:
receive in interface it.unimi.dsi.jai4j.JobListener<BubingJob>
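The field summary mentions two buffers fed by receive(BubingJob): a bounded quickReceivedURLs queue and a long-run receivedURLs disk queue. How they cooperate is not described here, so the following is only one plausible arrangement, offered as an assumption: receive offers the URL to the bounded queue without blocking, and a separate step pours the quick buffer into the long-run one. ReceiveBufferSketch and drainToLongRun are hypothetical names, and an in-memory ArrayDeque stands in for the disk queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

/** Hypothetical sketch: a small bounded buffer in front of a larger long-run queue. */
final class ReceiveBufferSketch {
    final ArrayBlockingQueue<byte[]> quickReceivedURLs;
    final ArrayDeque<byte[]> receivedURLs = new ArrayDeque<>(); // stand-in for a disk queue

    ReceiveBufferSketch(int quickCapacity) {
        quickReceivedURLs = new ArrayBlockingQueue<>(quickCapacity);
    }

    /** Fast path used by the receiving side: never blocks, returns false if the buffer is full. */
    boolean receive(byte[] url) {
        return quickReceivedURLs.offer(url);
    }

    /** Slow path, meant to run on a separate thread: pours the quick buffer into the long-run queue. */
    int drainToLongRun() {
        List<byte[]> batch = new ArrayList<>();
        int moved = quickReceivedURLs.drainTo(batch);
        synchronized (receivedURLs) {
            receivedURLs.addAll(batch);
        }
        return moved;
    }
}
```

Under this arrangement the receiving side only ever touches an in-memory queue, and the slower storage is accessed from a single place.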
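public void close() throws java.io.IOException, java.lang.InterruptedException
Closes the frontier: threads are stopped (if necessary, aborted), sieve and store and robots stream are closed.
Throws:
java.io.IOException
java.lang.InterruptedException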
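public void prepareToAppend() throws java.io.IOException
Description copied from interface: AbstractSieve.NewFlowReceiver
A new flow of keys is ready and will start being appended.
Specified by:
prepareToAppend in interface AbstractSieve.NewFlowReceiver<ByteArrayList>
Throws:
java.io.IOException
public void append(long hash, ByteArrayList list) throws java.io.IOException
Description copied from interface: AbstractSieve.NewFlowReceiver
A new key is appended.
Specified by:
append in interface AbstractSieve.NewFlowReceiver<ByteArrayList>
Parameters:
hash - the key hash.
list - the key itself.
Throws:
java.io.IOException
public void finishedAppending() throws java.io.IOException
Description copied from interface: AbstractSieve.NewFlowReceiver
The new flow of keys is over.
Specified by:
finishedAppending in interface AbstractSieve.NewFlowReceiver<ByteArrayList>
Throws:
java.io.IOException
public void noMoreAppend() throws java.io.IOException
Description copied from interface: AbstractSieve.NewFlowReceiver
There will be no more new flows (because the sieve that is calling this method was closed).
Specified by:
noMoreAppend in interface AbstractSieve.NewFlowReceiver<ByteArrayList>
Throws:
java.io.IOException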
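The four callbacks above suggest a calling sequence: prepareToAppend, then one append per new key, then finishedAppending, and finally noMoreAppend when the sieve is closed. The sketch below illustrates that sequence with a toy in-memory sieve that emits each key at most once. TinySieve, FlowReceiver and RecordingReceiver are hypothetical; the real AbstractSieve and MercatorSieve work on disk and are not reproduced here.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch: a toy sieve that emits each key once, in flows, to a receiver. */
final class TinySieve {
    /** Mirrors the four callbacks described above, with String keys for simplicity. */
    interface FlowReceiver {
        void prepareToAppend();
        void append(long hash, String key);
        void finishedAppending();
        void noMoreAppend();
    }

    /** Receiver that simply records the order of the callbacks. */
    static final class RecordingReceiver implements FlowReceiver {
        final List<String> events = new ArrayList<>();
        public void prepareToAppend() { events.add("prepareToAppend"); }
        public void append(long hash, String key) { events.add("append:" + key); }
        public void finishedAppending() { events.add("finishedAppending"); }
        public void noMoreAppend() { events.add("noMoreAppend"); }
    }

    private final Set<String> seen = new HashSet<>();          // keys already emitted
    private final Set<String> pending = new LinkedHashSet<>(); // keys waiting for the next flow
    private final FlowReceiver receiver;

    TinySieve(FlowReceiver receiver) {
        this.receiver = receiver;
    }

    /** Keys already emitted are dropped; duplicates within a flow collapse in the set. */
    void enqueue(String key) {
        if (!seen.contains(key)) pending.add(key);
    }

    /** Emits one flow containing every pending key that has never been emitted before. */
    void flush() {
        receiver.prepareToAppend();
        for (String key : pending)
            if (seen.add(key)) receiver.append(key.hashCode(), key);
        pending.clear();
        receiver.finishedAppending();
    }

    /** Emits a last flow, then signals that no more flows will follow. */
    void close() {
        flush();
        receiver.noMoreAppend();
    }
}
```

A key enqueued again after it has been emitted never reaches append a second time, which matches the behaviour the class description attributes to the sieve.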
public void updateRequestedFrontSize()
Updates, if necessary, the requiredFrontSize. The current front size is the number of visit states present in the workbench and in the todo queue. If this quantity is larger than the currently-required front size, the latter is increased by FRONT_INCREASE, although it will never be set to a value larger than half of the workbench (two queries per visit state).
public void updateFetchingThreadsWaitingStats(long waitTime)
Updates the statistics relative to the wait time of FetchingThreads.
Parameters:
waitTime - the length of a new pause by one of the fetching threads.
public void resetFetchingThreadsWaitingStats()
Resets the statistics relative to the wait time of FetchingThreads.
public void snap() throws ConfigurationException, java.lang.IllegalArgumentException, java.io.IOException
Snaps fields to files in the given directory. […] frontier.data. Other fields are each written to a file of their own, named after the field.
Throws:
ConfigurationException
java.lang.IllegalArgumentException
java.io.IOException
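The update rule stated for updateRequestedFrontSize() can be sketched as below. This follows the prose literally and is not taken from the BUbiNG source: FrontSizeSketch is a hypothetical class, the value of FRONT_INCREASE and the initial required size are invented for the example, and the current front size is passed in as a parameter instead of being read from the workbench and the todo queue.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of the "increase by a step, cap at half the workbench" rule. */
final class FrontSizeSketch {
    static final long FRONT_INCREASE = 1000; // assumed value, for illustration only

    final AtomicLong requiredFrontSize = new AtomicLong(1000); // assumed initial value
    volatile long workbenchSizeInPathQueries;

    FrontSizeSketch(long workbenchSizeInPathQueries) {
        this.workbenchSizeInPathQueries = workbenchSizeInPathQueries;
    }

    /** Raises the required front size by one step if the current front exceeds it, never past the cap. */
    void updateRequestedFrontSize(long currentFrontSize) {
        long required = requiredFrontSize.get();
        if (currentFrontSize <= required) return;
        long cap = workbenchSizeInPathQueries / 2; // two path+queries per visit state
        long next = Math.min(required + FRONT_INCREASE, cap);
        // Only ever increase; a concurrent update by another thread simply wins.
        if (next > required) requiredFrontSize.compareAndSet(required, next);
    }
}
```

With a workbench of 5000 path+queries the cap is 2500, so the required size can grow from 1000 to 2000 and then to 2500, and stays there.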
public void restore() throws ConfigurationException, java.lang.IllegalArgumentException, java.io.IOException, java.lang.ClassNotFoundException, java.lang.InterruptedException
Restores data from the given directory.
Throws:
java.lang.InterruptedException
ConfigurationException
java.lang.IllegalArgumentException
java.io.IOException
java.lang.ClassNotFoundException
See Also:
snap()
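A snapshot that keeps scalar fields together in a file named frontier.data and can be restored later might look like the following sketch. It uses java.util.Properties as a stand-in for whatever configuration format BUbiNG actually uses; SnapSketch, roundTrip and the two property keys are assumptions made for the example, and only two counters are saved.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: saving and restoring scalar counters through a properties file. */
final class SnapSketch {
    static final String FILE_NAME = "frontier.data";

    final AtomicLong duplicates = new AtomicLong();
    final AtomicLong transferredBytes = new AtomicLong();

    /** Writes the scalar fields as key=value pairs, one key per field name. */
    void snap(Path dir) throws IOException {
        Properties p = new Properties();
        p.setProperty("duplicates", Long.toString(duplicates.get()));
        p.setProperty("transferredBytes", Long.toString(transferredBytes.get()));
        try (Writer w = Files.newBufferedWriter(dir.resolve(FILE_NAME))) {
            p.store(w, null);
        }
    }

    /** Reads the scalar fields back from the same file. */
    void restore(Path dir) throws IOException {
        Properties p = new Properties();
        try (Reader r = Files.newBufferedReader(dir.resolve(FILE_NAME))) {
            p.load(r);
        }
        duplicates.set(Long.parseLong(p.getProperty("duplicates")));
        transferredBytes.set(Long.parseLong(p.getProperty("transferredBytes")));
    }

    /** Snaps one instance into a temporary directory and restores it into a fresh one. */
    static long[] roundTrip(long duplicates, long transferredBytes) {
        try {
            Path dir = Files.createTempDirectory("frontier-snap");
            SnapSketch saved = new SnapSketch();
            saved.duplicates.set(duplicates);
            saved.transferredBytes.set(transferredBytes);
            saved.snap(dir);
            SnapSketch restored = new SnapSketch();
            restored.restore(dir);
            Files.delete(dir.resolve(FILE_NAME));
            Files.delete(dir);
            return new long[] { restored.duplicates.get(), restored.transferredBytes.get() };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Larger structures would be written to separate files, as the description of snap() indicates; that part is omitted here.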
public StatsThread getStatsThread()
Returns the StatsThread.
public long archetypes()
The number of pages stored (does not include duplicates).
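One way to relate archetypes() to the archetypesStatus array is to sum the per-status-class counters. That relationship is an assumption made for this sketch, not something the page states; ArchetypeCounters and recordPage are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: per-status-class counters for stored pages, with duplicates counted apart. */
final class ArchetypeCounters {
    /** Position i, with 0 < i < 6, counts stored pages with status ixx; position 0 is unused. */
    final AtomicLong[] archetypesStatus = new AtomicLong[6];
    final AtomicLong duplicates = new AtomicLong();

    ArchetypeCounters() {
        for (int i = 0; i < archetypesStatus.length; i++) archetypesStatus[i] = new AtomicLong();
    }

    /** Counts a page either as a duplicate or under its status class (1xx to 5xx). */
    void recordPage(int httpStatus, boolean duplicate) {
        if (duplicate) {
            duplicates.incrementAndGet();
            return;
        }
        int statusClass = httpStatus / 100;
        if (statusClass > 0 && statusClass < 6) archetypesStatus[statusClass].incrementAndGet();
    }

    /** Assumed definition: the sum of the counters in positions 1 to 5. */
    long archetypes() {
        long total = 0;
        for (int i = 1; i < 6; i++) total += archetypesStatus[i].get();
        return total;
    }
}
```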