| 1 | require 'federation/connection_pool' |
|---|
| 2 | |
|---|
| 3 | # Manages the federation of datasources: distributes queries to the right |
|---|
| 4 | # datasources and merges their results. |
|---|
| 5 | |
|---|
class FederationManager
  # Adds the triple (s, p, o) to the currently selected write-adapter.
  #
  # Raises ActiveRdfError when ConnectionPool has no write-adapter.
  # Returns whatever the adapter's +add+ returns.
  def self.add(s, p, o)
    # TODO: allow addition of full graphs
    adapter = ConnectionPool.write_adapter
    raise ActiveRdfError, "cannot write without a write-adapter" unless adapter

    adapter.add(s, p, o)
  end

  # Deletes the triple (s, p, o) from the currently selected write-adapter.
  # s and p are mandatory, o is optional (defaults to :all); symbols are
  # interpreted as wildcards.
  #
  # Raises ActiveRdfError when ConnectionPool has no write-adapter.
  # Returns whatever the adapter's +delete+ returns.
  def self.delete(s, p, o = :all)
    adapter = ConnectionPool.write_adapter
    raise ActiveRdfError, "cannot write without a write-adapter" unless adapter

    adapter.delete(s, p, o)
  end

  # Enables or disables a specific adapter by assigning +status+ to its
  # +enabled+ attribute. Also resets the adapter's own cache and the
  # RDFS::Resource cache.
  def self.enable(adapter, status)
    adapter.enabled = status
    adapter.reset_cache
    RDFS::Resource.reset_cache
  end

  # Executes a read-only query by distributing it over the complete read-pool
  # and aggregating the results.
  #
  # With a block: every result produced by every read adapter is yielded
  # consecutively and nil is returned.
  #
  # Without a block: the results of all read adapters are collected into one
  # Array, then
  # * empty results are dropped,
  # * duplicates are removed when +q+ is a distinct query,
  # * the Array is flattened when +q+ has a single select clause or is an
  #   ask query,
  # * when options[:flatten] is set, or +q+ is a count query, an empty Array
  #   becomes nil and a one-element Array becomes that element.
  #
  # Raises ActiveRdfError when the read-pool is empty.
  def self.query(q, options = {:flatten => true})
    if ConnectionPool.read_adapters.empty?
      raise ActiveRdfError, "cannot execute query without data sources"
    end

    # ask each adapter for query results and yield them consecutively
    if block_given?
      ConnectionPool.read_adapters.each do |source|
        source.query(q) { |*clauses| yield(*clauses) }
      end
      return nil
    end

    # build Array of results from all sources
    # TODO: write test for sebastian's select problem
    # (without distinct, should get duplicates, they
    # were filtered out when doing results.union)
    results = []
    ConnectionPool.read_adapters.each do |source|
      source.query(q).each { |clauses| results << clauses }
    end

    # filter the empty results; this must be the in-place variant, because
    # plain reject returns a new Array and leaves +results+ untouched
    results.reject! { |ary| ary.empty? }

    # remove duplicate results from multiple adapters if asked for a distinct
    # query (adapters return only distinct results, but they cannot check
    # duplicates against each other)
    results.uniq! if q.distinct?

    # flatten results array if only one select clause
    # to prevent unnecessarily nested array [[eyal],[renaud],...]
    results.flatten! if q.select_clauses.size == 1 || q.ask?

    # remove array (return single value or nil) if asked to
    if options[:flatten] || q.count?
      case results.size
      when 0 then results = nil
      when 1 then results = results.first
      end
    end

    results
  end
end
|---|