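Stream data processing is usually executed interpretively, in the same way as database query processing. Here, however, in order to see how queries are actually executed, we deliberately simulate it with Perl programs ("compiled code"). The example taken up is the online auction, which is also used in STREAM (Stanford Stream Data Manager) and ATLaS (Aggregate & Table Language and System). This page shows a translation into unstructured Perl. A structured program is shown in "Simulation of an Online Auction by Stream Data Processing (Structured Version)".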
Contents
- Introduction
- Data Used
- Currency Conversion
- Query Selecting Specific Auctions
- Local Item Suggestion Query
- Query on Currently Open Auctions
- Query on Closing Prices
- Query on Average Closing Price per Category
- Others
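Introduction
The site "Stream Query Repository" lists a variety of CQL (Continuous Query Language) statements together with examples of their execution. Here we take the queries given there, modify them slightly, and translate them into Perl. All of the programs are included on this page, and the complete source code, with everything joined together, is also available. (There is also source code that is more structured than this code, though ad hoc.)
Data Used
The data shown here is that used in "Stream Query Repository: Online Auctions", with only small modifications.
Relations
The relations handled here are represented as arrays. The items offered in an auction can be regarded as elements (records or tuples) of a relation Item. An item has attributes such as an identifier (id), a name (name), a category identifier (categoryID), and a registration time (registrationTime). The test data below is a very small part of the NEXMark benchmark test data, partly modified.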
    my @Item =
        ({id => 0, name => 'Item 1', description => '', categoryID => 1, registrationTime => 0},
         {id => 1, name => 'Item 2', description => '', categoryID => 2, registrationTime => 0},
         {id => 2, name => 'Item 3', description => '', categoryID => 2, registrationTime => 0},
         {id => 3, name => 'Item 4', description => '', categoryID => 1, registrationTime => 0});
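Of the many items handled in the original benchmark, only four are used here.
Sellers, buyers, and bidders are people, and each can be regarded as an element of a relation Person. Each person has attributes such as a name, an e-mail address, a country, and a city, although the country and city may be unknown.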
    my @Person =
        ({id => 0, name => 'Luitpold Martucci', emailAddress => 'Martucci@toronto.edu', city => '', country => ''},
         {id => 1, name => 'Takakazu Schonegge', emailAddress => 'Schonegge@panasonic.com', city => 'Gothenburg', country => 'United States'},
         {id => 9, name => 'Leonid Nour', emailAddress => 'Nour@llnl.gov', city => '', country => ''},
         {id => 11, name => 'Ravindra Abdelmoty', emailAddress => 'Abdelmoty@whizbang.com', city => 'Manchester', country => 'United States'},
         {id => 13, name => 'Annette Klaiber', emailAddress => 'Klaiber@propel.com', city => '', country => ''},
         {id => 14, name => 'Jacinto Ceccarelli', emailAddress => 'Ceccarelli@unbc.ca', city => 'Corpus', country => 'United States'},
         {id => 15, name => 'Janick Blaauw', emailAddress => 'Blaauw@ogi.edu', city => '', country => ''},
         {id => 16, name => 'Warwich Marsiglia', emailAddress => 'Marsiglia@cmu.edu', city => 'South', country => 'United States'},
         {id => 18, name => 'Justus Binkley', emailAddress => 'Binkley@ogi.edu', city => '', country => ''},
         {id => 19, name => 'Kunsoo Raghavendran', emailAddress => 'Raghavendran@umass.edu', city => 'Paris', country => 'Tuvalu'},
         {id => 20, name => 'Takuji Liedekerke', emailAddress => 'Liedekerke@cnr.it', city => '', country => ''},
         {id => 22, name => 'Shiyi Polster', emailAddress => 'Polster@ucd.ie', city => '', country => ''},
         {id => 30, name => 'Keiko Nastansky', emailAddress => 'Nastansky@co.jp', city => '', country => ''},
         {id => 31, name => 'Keumog Vuskovic', emailAddress => 'Vuskovic@filemaker.com', city => 'Cancun', country => 'Tajikistan'},
         {id => 33, name => 'Armand Impagliazzo', emailAddress => 'Impagliazzo@cwi.nl', city => 'Oakland', country => 'Heard and Mcdonald Island'},
         {id => 36, name => 'Zoe Holmback', emailAddress => 'Holmback@ucsb.edu', city => '', country => ''},
         {id => 39, name => 'Rajamani Pinzani', emailAddress => 'Pinzani@mitre.org', city => 'Gothenburg', country => 'United States'},
         {id => 44, name => 'Jianying Parikh', emailAddress => 'Parikh@sleepycat.com', city => 'Torreon', country => 'United States'},
         {id => 45, name => 'Edmond Rajcani', emailAddress => 'Rajcani@okcu.edu', city => '', country => ''},
         {id => 53, name => 'Guijun Cosette', emailAddress => 'Cosette@unl.edu', city => '', country => ''},
         {id => 55, name => 'Fabio Denna', emailAddress => 'Denna@cmu.edu', city => 'Wilmington', country => 'Australia'});
A subroutine print_result() is provided for printing the contents of a relation. It is defined as follows.
    # Print the fields named in $keys for every record of the relation $result.
    sub print_result(\@$) {
        my ($result, $keys) = @_;
        foreach my $key (@$keys) {
            print "\t$key";
        }
        print "\n\t----------------\n";
        foreach my $record (@$result) {
            foreach my $key (@$keys) {
                print "\t$record->{$key}";
            }
            print "\n";
        }
    }
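Streams
In this simulation, stream elements are placed in arrays in advance and taken out when they are needed. The elements of the stream OpenAuction, which represents the opening of auctions, are placed in the array @OpenAuction.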
    my @OpenAuction =
        ({itemID => 3, sellerID => 12, start_price => 80, timestamp => 3100},
         {itemID => 2, sellerID => 39, start_price => 110, timestamp => 3150},
         {itemID => 1, sellerID => 18, start_price => 150, timestamp => 3300},
         {itemID => 0, sellerID => 30, start_price => 30, timestamp => 3600});
Likewise, the elements of the stream ClosedAuction, which represents the closing of auctions, are placed in the array @ClosedAuction.
    my @ClosedAuction =
        ({itemID => 3, buyerID => 31, timestamp => 5400},
         {itemID => 2, buyerID => 16, timestamp => 6500},
         {itemID => 1, buyerID => 55, timestamp => 7050},
         {itemID => 0, buyerID => 30, timestamp => 7200});
The elements of the stream Bid, which carries the bids made during the auctions, are placed in the array @Bid.
    my @Bid =
        ({itemID => 3, bidPrice => 82, bidderID => 22, timestamp => 3105},
         {itemID => 2, bidPrice => 115, bidderID => 19, timestamp => 3175},
         {itemID => 2, bidPrice => 120, bidderID => 39, timestamp => 3196},
         {itemID => 1, bidPrice => 155, bidderID => 45, timestamp => 3306},
         {itemID => 3, bidPrice => 103, bidderID => 0, timestamp => 3380},
         {itemID => 2, bidPrice => 122, bidderID => 38, timestamp => 3497},
         {itemID => 0, bidPrice => 33, bidderID => 41, timestamp => 3662},
         {itemID => 3, bidPrice => 130, bidderID => 19, timestamp => 3663},
         {itemID => 1, bidPrice => 171, bidderID => 11, timestamp => 3852},
         {itemID => 1, bidPrice => 181, bidderID => 9, timestamp => 4395},
         {itemID => 1, bidPrice => 201, bidderID => 20, timestamp => 4396},
         {itemID => 3, bidPrice => 148, bidderID => 33, timestamp => 4573},
         {itemID => 3, bidPrice => 152, bidderID => 15, timestamp => 4574},
         {itemID => 3, bidPrice => 166, bidderID => 44, timestamp => 4758},
         {itemID => 3, bidPrice => 169, bidderID => 1, timestamp => 4972},
         {itemID => 3, bidPrice => 191, bidderID => 53, timestamp => 5227},
         {itemID => 3, bidPrice => 216, bidderID => 31, timestamp => 5360},
         {itemID => 1, bidPrice => 226, bidderID => 33, timestamp => 5506},
         {itemID => 1, bidPrice => 249, bidderID => 36, timestamp => 5536},
         {itemID => 0, bidPrice => 43, bidderID => 18, timestamp => 6040},
         {itemID => 1, bidPrice => 260, bidderID => 33, timestamp => 6041},
         {itemID => 0, bidPrice => 53, bidderID => 9, timestamp => 6282},
         {itemID => 0, bidPrice => 58, bidderID => 13, timestamp => 6391},
         {itemID => 2, bidPrice => 126, bidderID => 16, timestamp => 6462},
         {itemID => 1, bidPrice => 280, bidderID => 14, timestamp => 6570},
         {itemID => 1, bidPrice => 302, bidderID => 55, timestamp => 6975},
         {itemID => 0, bidPrice => 80, bidderID => 30, timestamp => 7171});
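The original data contained bids with identical timestamps, but to simplify their handling in the programs the timestamps have been shifted so that they do not coincide.
The passage of time is simulated by a variable $clock. Each call to tick() advances the time by 1 (second). (In practice timestamps need a finer unit, but one-second units are used here for simplicity.) Also for simplicity, it is assumed that two stream elements never arrive within the same second.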
    my $clock = 0;

    # Advance the simulated clock by one second and return the new time.
    sub tick () {
        return ++$clock;
    }
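The stream elements stored in the arrays, which would normally arrive from outside, are taken out one at a time by the function instream_get(). For initialization, instream_open() is called beforehand. These functions are defined as follows.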
    # Open an input stream over an array of pre-loaded elements.
    sub instream_open(\@) {
        my ($array) = @_;
        my $stream = {array => $array, index => 0};
        return $stream;
    }

    # Return the next element if it has arrived (timestamp <= $clock), else ''.
    sub instream_get($) {
        my ($stream) = @_;
        my $array = $stream->{array};
        my $next = $array->[$stream->{index}];
        if ($stream->{index} >= @$array || $next->{timestamp} > $clock) {
            return '';
        } else {
            $stream->{index}++;
            return $next;
        }
    }
instream_open() returns a stream handle, which is then passed as the first argument of instream_get().
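The timing behaviour described above can be tried out in isolation with a minimal sketch such as the following. It is a stand-alone variant written for illustration, not part of the page's source code: the names demo_get() and demo_run() and the two-element test array are hypothetical, and the clock is passed as an argument instead of being read from the global $clock.

```perl
use strict;
use warnings;

# Hypothetical stand-alone variant of instream_get(): the clock is an argument.
# Returns the next element once its timestamp has been reached, otherwise ''.
sub demo_get {
    my ($stream, $clock) = @_;
    my $array = $stream->{array};
    return '' if $stream->{index} >= @$array;        # stream exhausted
    my $next = $array->[ $stream->{index} ];
    return '' if $next->{timestamp} > $clock;        # not yet arrived
    $stream->{index}++;
    return $next;
}

# Advance a local clock from 1 to $until, one tick at a time, and collect
# [arrival time, value] pairs for every element that is delivered.
sub demo_run {
    my ($array, $until) = @_;
    my $stream = { array => $array, index => 0 };
    my @arrivals;
    for (my $clock = 1; $clock <= $until; $clock++) {
        if (my $record = demo_get($stream, $clock)) {
            push @arrivals, [ $clock, $record->{value} ];
        }
    }
    return @arrivals;
}

# Illustrative data only (not the auction test data)
my @demo = ({ value => 'a', timestamp => 2 }, { value => 'b', timestamp => 5 });
foreach my $arrival (demo_run(\@demo, 6)) {
    print "$arrival->[0]\t$arrival->[1]\n";
}
```

Because at most one element is taken per tick, two elements that shared a timestamp would be delivered on consecutive ticks in this sketch, which is one reason the test data on this page avoids duplicate timestamps.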
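A stream that is output to the outside (in this simulation, printed) is opened with outstream_open() and written with outstream_put(). The first argument of outstream_open() should properly specify the stream, but it is currently unused because output is restricted to standard output. The second argument is the list of field names to be output. outstream_open() returns a stream handle, which is passed as the first argument of outstream_put().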
    # Print the header line and return the field-name list as the stream handle.
    sub outstream_open($$) {
        my ($stream, $keys) = @_;
        foreach my $key (@$keys) {
            print "\t$key";
        }
        print "\n";
        return $keys;
    }

    # Print the selected fields of one record.
    sub outstream_put($$) {
        my ($stream, $record) = @_;
        my $keys = $stream;
        foreach my $key (@$keys) {
            print "\t$record->{$key}";
        }
        print "\n";
    }
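Currency Conversion
The prices of bids made in US dollars are converted to euros. Since the bids are input as a stream, the result is also output as a stream. The CQL program is as follows.
    stream EuroBid is
      Select itemID, DolToEuro(bidPrice) euroBidPrice, bidderID
      From Bid;
Translated into Perl, this becomes the following.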
    sub DolToEuro($) {
        my ($dollar) = @_;
        return 0.71 * $dollar;
    }

    sub euroBid() {
        my $dolBid = instream_open(@Bid);
        my $euroBid = outstream_open('', ['itemID', 'euroBidPrice', 'bidderID']);
        # $Max_timestamp (the end time of the simulation) is not defined in this excerpt.
        for (my $t = 0; $t < $Max_timestamp; $t++) {
            tick();
            my $record;
            if ($record = instream_get($dolBid)) {      # From Bid
                outstream_put($euroBid,
                              {itemID => $record->{itemID},
                               euroBidPrice => DolToEuro($record->{bidPrice}),
                               bidderID => $record->{bidderID}});   # Select ...
            }
        }
    }
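Each time the subroutine euroBid() reads one element of the input stream, it converts the value and writes it to the output stream. In other words, this processing can be carried out with a single pipe. Running euroBid() gives the following result.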
        itemID  euroBidPrice    bidderID
        3       58.22           22
        2       81.65           19
        2       85.2            39
        1       110.05          45
        3       73.13           0
        2       86.62           38
        0       23.43           41
        3       92.3            19
        1       121.41          11
        1       128.51          9
        1       142.71          20
        3       105.08          33
        3       107.92          15
        3       117.86          44
        3       119.99          1
        3       135.61          53
        3       153.36          31
        1       160.46          33
        1       176.79          36
        0       30.53           18
        1       184.6           33
        0       37.63           9
        0       41.18           13
        2       89.46           16
        1       198.8           14
        1       214.42          55
        0       56.8            30
Query Selecting Specific Auctions
To extract only the bids whose item identifier is 1 or 2, a CQL query such as the following can be run.
    stream Bid_1_2 is
      Select itemID, bidPrice
      From Bid
      where itemID = 1 or itemID = 2;
Translated into Perl, this becomes the following.
    sub bid_1_2() {
        my $bid = instream_open(@Bid);
        my $selected = outstream_open('', ['itemID', 'bidPrice']);
        for (my $t = 0; $t < $Max_timestamp; $t++) {
            tick();
            my $record;
            if ($record = instream_get($bid)) {
                # where itemID = 1 or itemID = 2
                if ($record->{itemID} == 1 || $record->{itemID} == 2) {
                    outstream_put($selected, $record);
                }
            }
        }
    }
This processing, too, can be carried out with a single pipe. Running bid_1_2() gives the following result.
        itemID  bidPrice
        2       115
        2       120
        1       155
        2       122
        1       171
        1       181
        1       201
        1       226
        1       249
        1       260
        2       126
        1       280
        1       302
[TBD]
Local Item Suggestion Query
To report every item of category 2 that is put up for sale by a seller in the United States, the following query can be run.
    stream Items_cat2_US is
      Select Istream(P.name, P.city, O.itemID)
      From OpenAuction [Now] O, Person P, Item I
      Where O.sellerID = P.id and P.country = 'United States' and
            O.itemID = I.id and I.categoryID = 2;
Translated into Perl, this becomes the following.
    sub items_cat2_US() {
        # Generate index to relation: id -> person
        my %personByID = ();
        foreach my $record (@Person) {
            $personByID{$record->{id}} = $record;   # id is assumed to be unique
        }
        # print %{$personByID{39}}, "\n";

        # Generate index to relation: id -> item
        my %itemByID = ();
        foreach my $record (@Item) {
            $itemByID{$record->{id}} = $record;     # id is assumed to be unique
        }
        # print %{$itemByID{2}}, "\n";

        my $is_auction = instream_open(@OpenAuction);
        my $selected = outstream_open('', ['name', 'city', 'itemID']);
        for (my $t = 0; $t < $Max_timestamp; $t++) {
            tick();
            my $auction;
            if ($auction = instream_get($is_auction)) {
                my $person = $personByID{$auction->{sellerID}};
                my $item = $itemByID{$auction->{itemID}};
                if ($person->{country} eq 'United States' &&
                    $item->{categoryID} == 2) {
                    outstream_put($selected, {name => $person->{name},
                                              city => $person->{city},
                                              itemID => $auction->{itemID}});
                }
            }
        }
    }
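Indexes (primary indexes) on the relations Person and Item are generated and used. In both cases id is assumed to be unique. Apart from the handling of these relations, the input, processing, and output of the stream do not differ greatly from the two preceding examples.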
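The index-based join can be separated out as a small sketch. The helper names build_index() and select_matches() and the miniature data are hypothetical and serve only to illustrate the idea. Unlike items_cat2_US(), the sketch explicitly skips stream elements that refer to a record missing from a relation; the test data on this page does contain such a case (sellerID 12 in @OpenAuction has no entry in @Person).

```perl
use strict;
use warnings;

# Build a hash index over a relation (an array of hash refs).
# The key field is assumed to be unique, as on this page.
sub build_index {
    my ($relation, $key) = @_;
    my %index;
    foreach my $record (@$relation) {
        $index{ $record->{$key} } = $record;
    }
    return \%index;
}

# Join each auction element with its seller and item through the indexes and
# keep the US sellers of category-2 items.
sub select_matches {
    my ($opened, $personByID, $itemByID) = @_;
    my @out;
    foreach my $auction (@$opened) {
        my $person = $personByID->{ $auction->{sellerID} };
        my $item   = $itemByID->{ $auction->{itemID} };
        next unless $person && $item;               # skip dangling references
        next unless $person->{country} eq 'United States'
                 && $item->{categoryID} == 2;
        push @out, { name => $person->{name}, itemID => $auction->{itemID} };
    }
    return @out;
}

# Hypothetical miniature data (not the test data of this page)
my @people = ({ id => 7, name => 'Ann', country => 'United States' },
              { id => 8, name => 'Bob', country => '' });
my @things = ({ id => 1, categoryID => 2 }, { id => 2, categoryID => 1 });
my @opened = ({ itemID => 1, sellerID => 7 },
              { itemID => 2, sellerID => 7 },
              { itemID => 1, sellerID => 99 });     # seller 99 is unknown

my @matches = select_matches(\@opened, build_index(\@people, 'id'),
                             build_index(\@things, 'id'));
print "$_->{name}\t$_->{itemID}\n" foreach @matches;
```

Each lookup costs one hash access, so every arriving stream element is joined in constant time regardless of the size of the relations.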
Calling items_cat2_US() displays the following result.
        name                city        itemID
        Rajamani Pinzani    Gothenburg  2
Query on Currently Open Auctions
To maintain a table (relation) consisting of the auctions that are currently open, queries such as the following can be used.
    temporal relation OpenAuctionItemID is
      Select itemID From OpenAuction [range .. now];
    temporal relation ClosedAuctionItemID is
      Select itemID From ClosedAuction [range .. now];
    temporal relation CurrentAuctions is
      OpenAuctionItemID Minus ClosedAuctionItemID;
Translated into Perl, this becomes the following. Because several pipes are combined in this program, each pipe is given a label, and the pipes are connected (control moves between them) by goto statements.
    sub currentAuctions() {
        my $openAuction = instream_open(@OpenAuction);
        my $closedAuction = instream_open(@ClosedAuction);
        my %closedAuction_itemID = ();
        my %minus_itemID = ();
        for (my $t = 0; $t < $Max_timestamp; $t++) {
            tick();
            my $record;
            my $minus_input;

            # Dispatcher:
            if ($record = instream_get($openAuction)) {
                goto OpenAuctionItemID;
            } elsif ($record = instream_get($closedAuction)) {
                goto ClosedAuctionItemID;
            }
            goto Next;

          OpenAuctionItemID: {
                my $itemID = $record->{itemID};
                if (!$closedAuction_itemID{$itemID}) {
                    $minus_itemID{$itemID} = $record;
                    $minus_input = 1;
                    goto CurrentAuctions;
                };
                goto Next;
            }

          ClosedAuctionItemID: {
                my $itemID = $record->{itemID};
                $closedAuction_itemID{$itemID} = $record;
                delete $minus_itemID{$itemID};
                $minus_input = -1;
                goto CurrentAuctions;
            }

          CurrentAuctions: {
                my @result = ();
                foreach my $itemID (keys %minus_itemID) {
                    push(@result, {itemID => $itemID});
                }
                print "$minus_input $clock\n";
                print_result(@result, ['itemID']);
                print "\n";
                goto Next;
            }

          Next:
        }
    }
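In this program, the part marked with the comment "# Dispatcher" takes each stream element, which would normally arrive from outside, and routes it to one of the two pipes ("OpenAuctionItemID" and "ClosedAuctionItemID") according to the stream on which it arrived.
Calling currentAuctions() displays the following result. The first line of each group shows the sign of the change (1 for an insertion, -1 for a deletion) and the time.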
    1 3100
        itemID
        ----------------
        3

    1 3150
        itemID
        ----------------
        3
        2

    1 3300
        itemID
        ----------------
        1
        3
        2

    1 3600
        itemID
        ----------------
        1
        0
        3
        2

    -1 5400
        itemID
        ----------------
        1
        0
        2

    -1 6500
        itemID
        ----------------
        1
        0

    -1 7050
        itemID
        ----------------
        0

    -1 7200
        itemID
        ----------------
A program with the same meaning can also be written as follows.
    stream CurrentAuction2 is
      Select * From OpenAuction
      Where itemID Not in (Select itemID From ClosedAuction);
In Perl, this program can be expressed as follows.
    sub currentAuctions2() {
        my $openAuction = instream_open(@OpenAuction);
        my $closedAuction = instream_open(@ClosedAuction);
        my %closedAuction = ();
        my %result = ();
        for (my $t = 0; $t < $Max_timestamp; $t++) {
            tick();
            my $record;
            my $changed = 0;

            if ($record = instream_get($openAuction)) {
                goto OpenAuctionItemID;
            } elsif ($record = instream_get($closedAuction)) {
                goto ClosedAuctionItemID;
            }
            goto Next;

          OpenAuctionItemID: {
                my $itemID = $record->{itemID};
                if (!$closedAuction{$itemID}) {
                    $result{$itemID} = $record;
                    $changed = 1;
                    goto Result;
                };
                goto Next;
            }

          ClosedAuctionItemID: {
                my $itemID = $record->{itemID};
                $closedAuction{$itemID} = $record;
                delete $result{$itemID};    # where itemID Not in ...
                $changed = -1;
                goto Result;
            }

          Result: {
                my @result = ();
                foreach my $itemID (keys %result) {
                    push(@result, {itemID => $itemID});
                }
                print "$changed $clock\n";
                print_result(@result, ['itemID']);
                print "\n";
                goto Next;
            }

          Next:
        }
    }
[TBD]
The result obtained by running currentAuctions2() is the same as that of currentAuctions().
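The way the two programs above maintain the difference "opened minus closed" incrementally can also be sketched without goto, by dispatching through a table of code references. This is only one possible arrangement, not the structured version referred to at the top of this page. The helper apply_event(), the event kinds 'open' and 'close', and the layout of the state hash are all assumptions made for this illustration.

```perl
use strict;
use warnings;

# Apply one event to the state and return the sign of the change:
#   1 = an item was inserted into the current set, -1 = an item was deleted,
#   0 = no change (an item that is already closed was opened).
sub apply_event {
    my ($state, $kind, $itemID) = @_;
    my %pipe = (
        open => sub {
            return 0 if $state->{closed}{$itemID};
            $state->{current}{$itemID} = 1;
            return 1;
        },
        close => sub {
            $state->{closed}{$itemID} = 1;
            delete $state->{current}{$itemID};
            return -1;
        },
    );
    my $handler = $pipe{$kind} or die "unknown event kind: $kind";
    return $handler->();
}

# Illustrative event sequence: open 3, open 2, close 3
my $state = { current => {}, closed => {} };
foreach my $event (['open', 3], ['open', 2], ['close', 3]) {
    my $sign = apply_event($state, @$event);
    my @current = sort { $a <=> $b } keys %{ $state->{current} };
    print "$sign\t@current\n";
}
```

As in the programs above, nothing is recomputed from scratch: each event touches only the entry for its own itemID.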
Query on Closing Prices
To report the closing price of each auction, the following query can be run.
    stream BidPrice is
      Select itemID, bidPrice as price From Bid;
    stream OpenPrice is
      Select itemID, start_price as price From OpenAuction;
    temporal relation P is
      BidPrice Union All OpenPrice;
    temporal relation CurrentPrice is
      Select P.itemID, Max(P.price) as price
      From P [range .. now]
      Group By P.itemID;
    stream Result is
      Select Rstream(C.itemID, P.price)
      From ClosedAuction [Now] C, CurrentPrice P
      Where C.itemID = P.itemID;
Translated into Perl, this becomes the following. Here, too, the pipes are connected to each other by goto statements.
    sub closedPrices() {
        my $openAuction = instream_open(@OpenAuction);
        my $closedAuction = instream_open(@ClosedAuction);
        my $bid = instream_open(@Bid);
        my %CurrentPrice = ();
        my $selected = outstream_open('', ['itemID', 'price', 'timestamp']);
        for (my $t = 0; $t < $Max_timestamp; $t++) {
            tick();
            my $record;
            my $itemID;
            my $price;

            # Input: external stream $closedAuction, $bid, $openAuction ->
            #        stream ($record)
            if ($record = instream_get($closedAuction)) {
                goto Result;
            } elsif ($record = instream_get($bid)) {
                goto BidPrice;
            } elsif ($record = instream_get($openAuction)) {
                goto OpenPrice;
            }
            goto Next;

            # BidPrice: stream $record->{itemID, bidPrice} ->
            #           stream ($itemID, $price)
          BidPrice: {
                $itemID = $record->{itemID};
                $price = $record->{bidPrice};
                goto CurrentPrice;  # implicit union
            }

            # OpenPrice: stream $record->{itemID, start_price} ->
            #            stream ($itemID, $price)
          OpenPrice: {
                $itemID = $record->{itemID};
                $price = $record->{start_price};
                goto CurrentPrice;  # implicit union
            }

            # CurrentPrice: stream ($itemID, $price) -> hashed relation (%CurrentPrice)
          CurrentPrice: {
                # Max(price) & Group By itemID:
                if (!$CurrentPrice{$itemID}) {
                    $CurrentPrice{$itemID} = {itemID => $itemID, price => $price};
                    # goto Result;
                } elsif ($price > $CurrentPrice{$itemID}->{price}) {
                    $CurrentPrice{$itemID}->{price} = $price;
                    # goto Result;
                }
                goto Next;
            }

            # Result: hashed relation (%CurrentPrice) -> external stream $selected
          Result: {
                my $itemID = $record->{itemID};
                my $currentPrice = $CurrentPrice{$itemID};
                my $Rstream_record = {itemID => $itemID,
                                      price => $currentPrice->{price},
                                      timestamp => $clock};
                outstream_put($selected, $Rstream_record);
                goto Next;
            }

          Next:
        }
    }

    ## [Original CQL Query]
    # CurrentPrice:
    #   Select P.itemID, Max(P.price) as price
    #   From ((Select itemID, bidPrice as price
    #          From Bid) Union All
    #         (Select itemID, start_price as price
    #          From OpenAuction)) P
    #   Group By P.itemID;
    #
    # Select Rstream(C.itemID, I.categoryID, P.price)
    # From ClosedAuction [Now] C, CurrentPrice P, Item I
    # Where C.itemID = P.itemID and C.itemID = I.id
Calling closedPrices() displays the following result.
        itemID  price   timestamp
        3       216     5400
        2       126     6500
        1       302     7050
        0       80      7200
Query on Average Closing Price per Category
Monitor the average closing price across items in each category over the last hour.
    temporal relation CurrentPrice is
      Select P.itemID, Max(P.price) as price
      From ((Select itemID, bidPrice as price
             From Bid) Union All
            (Select itemID, start_price as price
             From OpenAuction)) [range .. now] P
      Group By P.itemID
    stream ClosingPriceStream is
      Select Rstream(I.categoryID as categoryID, P.price as price)
      From ClosedAuction [Now] C, CurrentPrice P, Item I
      Where C.itemID = P.itemID and C.itemID = I.id
    temporal relation AvgPrice is
      Select categoryID, Avg(price)
      From ClosingPriceStream [Range 1 Hour]
      Group By categoryID

    ## [Original]
    # ClosingPriceStream:
    #   Select Rstream(T.id as catID, P.price as price)
    #   From ClosedAuction [Now] C, CurrentPrice P,
    #        Item I, Category T
    #   Where C.itemID = P.itemID and C.itemID = I.id and I.categoryID = T.id
In Perl this can be expressed as follows. Note, however, that the window size is set to 1500 sec rather than 1 hour (3600 sec).
    sub averageClosingPrice() {
        my $openAuction = instream_open(@OpenAuction);
        my $closedAuction = instream_open(@ClosedAuction);
        my $bid = instream_open(@Bid);
        my %CurrentPrice = ();
        my @ClosingPriceWindow = ();
        my %AvgPrice = ();

        # Generate Item primary index: relation (@Item) -> hashed relation (%ItemById)
        my %ItemById = ();
        foreach my $record (@Item) {
            $ItemById{$record->{id}} = $record;
        }

        for (my $t = 0; $t < $Max_timestamp; $t++) {
            my $changed = 0;
            my $closedAuction_record;
            my $bid_record;
            my $openAuction_record;
            my $categoryID;
            my $itemID;
            my $price;

            tick();

            my $changed1 = 0;
          ClosingPriceWindow_remove: {
                if (@ClosingPriceWindow > 0 &&
                    $ClosingPriceWindow[0]->{timestamp} <= $clock - 1500) {
                    # $ClosingPriceWindow[0]->{timestamp} <= $clock - 3600) {
                    my $removedFromClosingPrice = shift @ClosingPriceWindow;
                    $categoryID = $removedFromClosingPrice->{categoryID};
                    $price = $removedFromClosingPrice->{price};
                    $changed1 = 1;
                }
            }

          AvgPrice_remove: {
                if ($changed1) {
                    $AvgPrice{$categoryID}->{priceSum} -= $price;
                    $AvgPrice{$categoryID}->{count}--;
                    $changed = -1;
                }
            }

          Result_remove: {
                if ($changed) {
                    print "$changed $clock\n";
                    my @result = ();
                    foreach my $categoryID (keys %AvgPrice) {
                        my $record = $AvgPrice{$categoryID};
                        if ($record->{count} > 0) {
                            push(@result,
                                 {categoryID => $categoryID,
                                  Avg_price => $record->{priceSum} / $record->{count}});
                        }
                    }
                    print_result(@result, ['categoryID', 'Avg_price']);
                    print "\n";
                }
            }

            # Input: external stream $closedAuction, $bid, $openAuction ->
            #        stream ($record)
            if ($closedAuction_record = instream_get($closedAuction)) {
                goto ClosingPriceStream;
            } elsif ($bid_record = instream_get($bid)) {
                goto BidPrice;
            } elsif ($openAuction_record = instream_get($openAuction)) {
                goto OpenPrice;
            }
            goto Next;

            # BidPrice: stream $record->{itemID, bidPrice} ->
            #           stream ($itemID, $price)
          BidPrice: {
                $itemID = $bid_record->{itemID};
                $price = $bid_record->{bidPrice};
                goto CurrentPrice;
            }

            # OpenPrice: stream $record->{itemID, start_price} ->
            #            stream ($itemID, $price)
          OpenPrice: {
                $itemID = $openAuction_record->{itemID};
                $price = $openAuction_record->{start_price};
                goto CurrentPrice;
            }

            # CurrentPrice: stream ($itemID, $price) -> hashed relation (%CurrentPrice)
          CurrentPrice: {
                # Max(price) & Group By itemID:
                if (!$CurrentPrice{$itemID}) {
                    $CurrentPrice{$itemID} = {itemID => $itemID, price => $price};
                    goto ClosingPriceStream;
                } elsif ($price > $CurrentPrice{$itemID}->{price}) {
                    $CurrentPrice{$itemID}->{price} = $price;
                    goto ClosingPriceStream;
                }
                goto Next;
            }

            # ClosingPriceStream:
            #   external stream ($closedAuction), hashed relation (%CurrentPrice) ->
            #   stream ($categoryID, $price)
          ClosingPriceStream: {
                if ($closedAuction_record) {
                    my $itemID = $closedAuction_record->{itemID};
                    my $currentPrice = $CurrentPrice{$itemID};
                    if ($currentPrice) {
                        $categoryID = $ItemById{$itemID}->{categoryID};
                        $price = $currentPrice->{price};
                        goto ClosingPriceWindow_add;
                    }
                }
                goto Next;
            }

            # ClosingPriceWindow: stream ($categoryID, $price) ->
            #                     relation (@ClosingPriceWindow)
          ClosingPriceWindow_add: {
                my $ClosingPrice_record =
                    {categoryID => $categoryID, price => $price, timestamp => $clock};
                # Get ClosingPriceStream [Range 1 Hour] as ClosingPrice
                push(@ClosingPriceWindow, $ClosingPrice_record);
                goto AvgPrice_add;
            }

            # AvgPrice: signed stream ($categoryID, $price) ->
            #           hashed relation (%AvgPrice)
          AvgPrice_add: {
                if (!$AvgPrice{$categoryID}) {
                    $AvgPrice{$categoryID} =
                        {categoryID => $categoryID, priceSum => $price, count => 1};
                } else {
                    $AvgPrice{$categoryID}->{priceSum} += $price;
                    $AvgPrice{$categoryID}->{count}++;
                }
                $changed = 1;
                goto Result;
            }

            # Result: hashed relation (%AvgPrice) -> view of relation @result
          Result: {
                print "$changed $clock\n";
                my @result = ();
                foreach my $categoryID (keys %AvgPrice) {
                    my $record = $AvgPrice{$categoryID};
                    if ($record->{count} > 0) {
                        push(@result,
                             {categoryID => $categoryID,
                              Avg_price => $record->{priceSum} / $record->{count}});
                    }
                }
                print_result(@result, ['categoryID', 'Avg_price']);
                print "\n";
                goto Next;
            }

          Next:
        }
    }
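The biggest difference between this program and the preceding ones is that old elements drop out of the sliding window [Range 1 Hour], which makes recomputation necessary. The recomputation is performed by the two pipes ClosingPriceWindow_remove and AvgPrice_remove, and the recomputed values are printed by Result_remove. These pipes are placed at the beginning of the for loop so that, when tick() advances the time by one second, an element can be removed from the window immediately if necessary.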
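The expiry-and-recompute mechanism can be isolated in a minimal sketch of a time-based sliding window that keeps a running sum. The helpers window_new(), window_expire(), window_add(), and window_avg() are hypothetical names introduced for this illustration. Unlike averageClosingPrice(), which removes at most one element per tick, the sketch drains every expired element in a loop, and it keeps a single sum rather than one per category.

```perl
use strict;
use warnings;

# Create a window that retains elements for $range seconds.
sub window_new {
    my ($range) = @_;
    return { range => $range, items => [], sum => 0 };
}

# Remove every element whose timestamp is <= $now - range (the same condition
# as in ClosingPriceWindow_remove) and return the number of removed elements.
sub window_expire {
    my ($w, $now) = @_;
    my $dropped = 0;
    while (@{ $w->{items} } && $w->{items}[0]{timestamp} <= $now - $w->{range}) {
        my $old = shift @{ $w->{items} };
        $w->{sum} -= $old->{price};      # undo the contribution of the old element
        $dropped++;
    }
    return $dropped;
}

# Append a new element stamped with the current time.
sub window_add {
    my ($w, $now, $price) = @_;
    push @{ $w->{items} }, { price => $price, timestamp => $now };
    $w->{sum} += $price;
    return;
}

# Average of the elements currently in the window, or undef if it is empty.
sub window_avg {
    my ($w) = @_;
    my $n = @{ $w->{items} };
    return $n ? $w->{sum} / $n : undef;
}

# The two category-2 closing prices from the test data, with a 1500 sec window
my $w = window_new(1500);
window_add($w, 6500, 126);
window_add($w, 7050, 302);
my $before = window_avg($w);             # (126 + 302) / 2
window_expire($w, 8000);                 # 6500 <= 8000 - 1500, so 126 leaves
my $after = window_avg($w);
print "$before\t$after\n";
```

With these inputs the sketch prints 214 and 302, the values that the category-2 average takes at times 7050 and 8000 in the run of averageClosingPrice().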
Calling averageClosingPrice() displays the following result.
    1 5400
        categoryID  Avg_price
        ----------------
        1           216

    1 6500
        categoryID  Avg_price
        ----------------
        1           216
        2           126

    -1 6900
        categoryID  Avg_price
        ----------------
        2           126

    1 7050
        categoryID  Avg_price
        ----------------
        2           214

    1 7200
        categoryID  Avg_price
        ----------------
        1           80
        2           214

    -1 8000
        categoryID  Avg_price
        ----------------
        1           80
        2           302
Others
The online auction examples include five further problems, listed below, but these are omitted for the time being.
    # 7. Short Auctions Query: Report all auctions that closed within five hours of their opening.
    #
    #   Select Rstream(OpenAuction.*)
    #   From OpenAuction [Range 5 Hour] O, ClosedAuction [Now] C
    #   Where O.itemID = C.itemID

    # 8. Hot Item Query: Select the item(s) with the most bids in the past hour. Update the results every minute.
    #
    #   HotItemStream:
    #   Select Rstream(itemID)
    #   From (Select B1.itemID as itemID, Count(*) as num
    #         From Bid [Range 60 Minute
    #                   Slide 1 Minute] B1
    #         Group By B1.itemID)
    #   Where num >= All (Select Count(*)
    #                     From Bid [Range 60 Minute
    #                               Slide 1 Minute] B2
    #                     Group By B2.itemID)
    #
    #   Select *
    #   From HotItemStream [Range 1 Minute]

    # 9. Average Selling Price By Seller Query: For each seller, maintain the average selling price over the last 10 items sold.
    #
    #   CurrentPrice:
    #   Select P.itemID, Max(P.price) as price
    #   From ((Select itemID, bid_price as price
    #          From Bid) Union All
    #         (Select itemID, start_price as price
    #          From OpenAuction)) P
    #   Group By P.itemID
    #
    #   ClosingPriceStream:
    #   Select Rstream(O.sellerID as sellerID, P.price as price)
    #   From ClosedAuction [Now] C, CurrentPrice P,
    #        OpenAuction O
    #   Where C.itemID = P.itemID and C.itemID = O.itemID
    #
    #   AvgSellingPrice:
    #   Select sellerID, Avg(price)
    #   From ClosingPriceStream [Partition By sellerID Rows 10]
    #   Group By sellerID

    # 10. Highest Bid Query: Every 10 minutes return the highest bid(s) in the recent 10 minutes.
    #
    #   Select Rstream(itemID, bid_price)
    #   From Bid [Range 10 Minute
    #             Slide 10 Minute]
    #   Where bid_price = (Select Max(bid_price)
    #                      From Bid [Range 10 Minute
    #                                Slide 10 Minute])

    # 11. Monitor New Users Query: Find people who put up something for sale within 12 hours of registering to use the auction service.
    #
    #   NewPersonStream:
    #   Select Istream(P.id, P.name)
    #   From Person P
    #
    #   Select Distinct(P.id, P.name)
    #   From Select Rstream(P.id, P.name)
    #        From NewPersonStream [Range 12 Hour] P, OpenAuction A [Now]
    #        Where P.id = A.sellerID